Skip to content

Performance Tuning Guide

This guide covers performance optimization strategies for Spring Batch RS applications.

Chunk Size

Increase chunk size to 500-1000 for large datasets

Connection Pooling

Configure appropriate pool sizes for database operations

Buffering

Use buffered I/O for file operations

Batch Operations

Leverage bulk inserts and batch APIs


Chunk size determines how many items are processed before writing to the destination.

Small Chunks (10-50):

  • Lower memory usage
  • More frequent I/O operations
  • Better for debugging
  • More transaction overhead

Large Chunks (500-1000):

  • Higher memory usage
  • Fewer I/O operations
  • Better throughput
  • Fewer transaction commits
| Use Case             | Recommended Size | Reason                             |
| -------------------- | ---------------- | ---------------------------------- |
| File to File         | 500-1000         | Minimize I/O overhead              |
| File to Database     | 500-1000         | Batch inserts are efficient        |
| Database to File     | 100-500          | Balance query overhead and memory  |
| Database to Database | 500-1000         | Bulk operations on both sides      |
| Complex Processing   | 100-200          | Manage memory for transformations  |
| Testing/Development  | 10-50            | Easier debugging                   |
// Small chunks for development
// NOTE: `reader`, `writer`, and the Input/Output types are assumed to be in scope.
let step = StepBuilder::new("dev-process")
.chunk::<Input, Output>(10) // Small for debugging
.reader(&reader)
.writer(&writer)
.build();
// Large chunks for production
// Shadowing `step` is fine here — the two builds show alternative configurations.
let step = StepBuilder::new("prod-process")
.chunk::<Input, Output>(1000) // Large for throughput
.reader(&reader)
.writer(&writer)
.build();

use sqlx::postgres::PgPoolOptions;
// Build a bounded Postgres pool. This must run in an async context (the
// `.await`), and the enclosing function must return a Result for the `?`.
// NOTE: requires `std::time::Duration` in scope.
let pool = PgPoolOptions::new()
.max_connections(10) // Pool size
.min_connections(2) // Minimum idle connections
.acquire_timeout(Duration::from_secs(30))
.idle_timeout(Duration::from_secs(600))
.max_lifetime(Duration::from_secs(1800))
.connect("postgres://...").await?;

Pool Size Guidelines:

  • Single reader + single writer: 5-10 connections
  • Multiple concurrent jobs: 10-20 connections
  • High concurrency: 20-50 connections
  • Don’t exceed database max_connections limit
// Use appropriate page size
// The reader pages through results; a stable ORDER BY keeps pages consistent
// across queries (without it, rows can repeat or be skipped between pages).
let reader = RdbcItemReaderBuilder::<Record>::new()
.postgres(pool)
.query("SELECT * FROM large_table ORDER BY id") // ORDER BY for consistency
.with_page_size(500) // Match or exceed chunk size
.build_postgres();
// For very large datasets, add WHERE clause
// Filtering server-side reduces rows fetched; pair with an index on the
// filter and order columns.
let reader = RdbcItemReaderBuilder::<Record>::new()
.postgres(pool)
.query("SELECT * FROM large_table WHERE created_at > '2024-01-01' ORDER BY id")
.with_page_size(1000)
.build_postgres();
// Use bulk inserts with appropriate chunk size.
// FIX: the binder receives the whole chunk of records (a slice), not a single
// record — `push_values` iterates over it to build one multi-row INSERT. The
// original annotated the parameter as `&Record`, which cannot be iterated;
// `&[Record]` matches the untyped binder used in the fast example below.
let writer = PostgresItemWriterBuilder::new()
    .pool(pool)
    .table("target_table")
    .binder(|query, records: &[Record]| {
        // Batch bind all records at once — one INSERT with many VALUES rows
        // instead of one round-trip per record.
        query.push_values(records, |mut b, record| {
            b.push_bind(&record.field1)
                .push_bind(&record.field2)
                .push_bind(&record.field3);
        });
    })
    .build();
let step = StepBuilder::new("bulk-insert")
    .chunk::<Record, Record>(1000) // Large chunks for bulk inserts
    .reader(&reader)
    .writer(&writer)
    .build();
-- Good: Index on filter and order columns
-- Column order matters: created_at leads so the range predicate can use the
-- index; id follows to help the ORDER BY.
-- NOTE(review): with a range predicate on created_at the index may not fully
-- satisfy ORDER BY id (a sort step can remain) — verify with EXPLAIN.
CREATE INDEX idx_records_created_id ON records(created_at, id);
-- Query uses index effectively
SELECT * FROM records WHERE created_at > '2024-01-01' ORDER BY id;

use std::io::BufReader;
use std::fs::File;
// Inefficient: No buffering
let file = File::open("large.csv")?;
let reader = CsvItemReaderBuilder::new()
.from_reader(file); // Direct file access
// Efficient: Buffered I/O
// A 64 KiB buffer amortizes each read syscall across many CSV records.
let file = File::open("large.csv")?;
let buffered = BufReader::with_capacity(64 * 1024, file); // 64KB buffer
let reader = CsvItemReaderBuilder::new()
.from_reader(buffered);
use std::io::BufWriter;
// Efficient: Buffered writing
// BufWriter flushes on drop but silently swallows flush errors — in real
// code, flush explicitly if write failures must be surfaced.
let file = File::create("output.csv")?;
let buffered = BufWriter::with_capacity(64 * 1024, file);
let writer = CsvItemWriterBuilder::new()
.from_writer(buffered);

For very large files, consider memory-mapped I/O:

use memmap2::Mmap;
use std::fs::File;
let file = File::open("huge.csv")?;
// SAFETY: Mmap::map is unsafe because the mapping's validity depends on the
// file not being truncated or modified by other processes while mapped;
// callers must ensure exclusive access for the mapping's lifetime.
let mmap = unsafe { Mmap::map(&file)? };
let reader = CsvItemReaderBuilder::new()
.from_reader(&mmap[..]);

use sysinfo::{System, SystemExt};
// Log current system memory usage as used/total in GB plus a percentage.
fn log_memory_usage() {
// new_all + refresh_all gathers a fresh snapshot of system metrics.
let mut system = System::new_all();
system.refresh_all();
let used = system.used_memory();
let total = system.total_memory();
// NOTE(review): sysinfo changed the unit of used_memory()/total_memory()
// between releases (KiB in older versions, bytes in newer ones) — confirm
// the 1e9 divisor matches the pinned sysinfo version.
println!("Memory: {:.2} GB / {:.2} GB ({:.1}%)",
used as f64 / 1_000_000_000.0,
total as f64 / 1_000_000_000.0,
(used as f64 / total as f64) * 100.0
);
}

1. Clone Only When Necessary:

// Inefficient: Unnecessary clone
// (The two impls below are alternatives — only one can exist in a real crate.)
impl ItemProcessor<Record, Record> for MyProcessor {
fn process(&self, item: &Record) -> ItemProcessorResult<Record> {
let mut cloned = item.clone(); // Expensive
cloned.field = transform(cloned.field);
Ok(cloned)
}
}
// Efficient: Transform in-place when possible
// NOTE(review): `..item.clone()` still deep-clones the whole record and then
// drops the overwritten field — this only wins when the remaining fields are
// cheap to clone; confirm against the actual Record layout.
impl ItemProcessor<Record, Record> for MyProcessor {
fn process(&self, item: &Record) -> ItemProcessorResult<Record> {
Ok(Record {
field: transform(&item.field),
..item.clone() // Only clone necessary parts
})
}
}

2. Use References for Large Strings:

use std::borrow::Cow;
struct Processor;
impl ItemProcessor<String, String> for Processor {
fn process(&self, item: &String) -> ItemProcessorResult<String> {
// Use Cow to avoid unnecessary cloning
// Cow only allocates on the Owned branch; the common path borrows.
let result: Cow<str> = if item.contains("special") {
Cow::Owned(item.to_uppercase())
} else {
Cow::Borrowed(item)
};
// NOTE(review): into_owned() clones on the Borrowed branch anyway because
// the trait returns an owned String — the saving is limited to avoiding a
// second allocation on the Owned path; confirm this matches intent.
Ok(result.into_owned())
}
}

3. Stream Processing:

// Process items one at a time without buffering
// NOTE: chunk size 1 minimizes latency and memory but maximizes per-item
// transaction/I-O overhead — use when items must be written immediately,
// not when throughput matters.
let step = StepBuilder::new("stream-process")
.chunk::<Input, Output>(1) // Process immediately
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();

Spring Batch RS uses a single-threaded model per step execution. For parallel processing:

Option 1: Multiple Job Instances

use std::thread;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let handles: Vec<_> = (0..4).map(|i| {
thread::spawn(move || {
process_partition(i)
})
}).collect();
for handle in handles {
handle.join().unwrap()?;
}
Ok(())
}
fn process_partition(partition_id: usize) -> Result<(), Box<dyn std::error::Error>> {
// Each thread processes a partition
let reader = CsvItemReaderBuilder::new()
.from_path(format!("data_part_{}.csv", partition_id))?;
// ... process partition
Ok(())
}

Option 2: Async Steps with Tokio

use tokio::task;
// Spawn one Tokio task per partition and await them all concurrently.
// NOTE(review): this assumes an async `process_partition` (the threaded
// example above uses a sync one). Also, try_join_all over JoinHandles only
// short-circuits on JoinError (panic/cancellation); the partitions' own
// Results come back inside the Vec and are discarded here — confirm whether
// inner errors should be propagated as well.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let tasks = (0..4).map(|i| {
task::spawn(async move {
process_partition(i).await
})
});
futures::future::try_join_all(tasks).await?;
Ok(())
}

use std::time::Instant;
// Measure wall-clock duration of a step and derive item throughput from the
// execution's read count.
let start = Instant::now();
let mut execution = StepExecution::new("process");
step.execute(&mut execution)?;
let duration = start.elapsed();
let items = execution.read_count();
// as_secs_f64 keeps sub-second precision for short runs
let throughput = items as f64 / duration.as_secs_f64();
println!("Processed {} items in {:?}", items, duration);
println!("Throughput: {:.2} items/sec", throughput);
println!("Write count: {}", execution.write_count());
println!("Skip count: {}", execution.skip_count());
# Criterion goes under [dev-dependencies] so it is built only for benches/tests,
# never shipped in release binaries.
[dev-dependencies]
criterion = "0.5"
use criterion::{black_box, criterion_group, criterion_main, Criterion};
// Micro-benchmark a single processor call with Criterion.
fn benchmark_processor(c: &mut Criterion) {
let processor = MyProcessor::new();
c.bench_function("process item", |b| {
b.iter(|| {
// black_box prevents the optimizer from const-folding the work away
let item = black_box(create_test_item());
processor.process(&item)
})
});
}
// Generate the benchmark harness entry point (replaces fn main).
criterion_group!(benches, benchmark_processor);
criterion_main!(benches);
Terminal window
# Install cargo-flamegraph
cargo install flamegraph
# Run with profiling
# NOTE: on Linux this drives `perf` under the hood and may need
# perf_event_paranoid permissions (or sudo) — confirm for your distro.
cargo flamegraph --bin my-batch-job
# Open flamegraph.svg in browser

// Slow: Small chunks, no buffering, unnecessary clones
// Anti-example: combines every pessimization covered in the sections above.
let reader = CsvItemReaderBuilder::new()
.from_path("data.csv")?; // No buffering
let processor = SlowProcessor; // Clones everything
let writer = PostgresItemWriterBuilder::new()
.pool(small_pool) // 2 connections only
.table("target")
.binder(|query, record| { /* ... */ })
.build();
let step = StepBuilder::new("slow")
.chunk::<Record, Record>(10) // Tiny chunks
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
// Result: 100 items/sec
use std::io::BufReader;
// Fast: Large chunks, buffered I/O, optimized pool
// Combines the optimizations above: buffered reads, a larger pool,
// multi-row inserts, and 1000-item chunks.
let file = File::open("data.csv")?;
let buffered = BufReader::with_capacity(64 * 1024, file);
let reader = CsvItemReaderBuilder::new()
.from_reader(buffered);
let processor = FastProcessor; // Minimal cloning
let pool = PgPoolOptions::new()
.max_connections(20) // Larger pool
.connect("...").await?;
let writer = PostgresItemWriterBuilder::new()
.pool(pool)
.table("target")
.binder(|query, records| {
// Bulk insert all records
// One INSERT with many VALUES rows per chunk instead of per-record round-trips.
query.push_values(records, |mut b, r| {
b.push_bind(&r.field1)
.push_bind(&r.field2);
});
})
.build();
let step = StepBuilder::new("fast")
.chunk::<Record, Record>(1000) // Large chunks
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
// Result: 10,000 items/sec (100x improvement!)

Check:

  1. Chunk size too small
  2. Missing database indexes
  3. Network latency (database/FTP)
  4. Expensive processor operations
  5. Small connection pool

Check:

  1. Chunk size too large
  2. Processor accumulating state
  3. Memory leaks in custom implementations
  4. Large strings not using Cow
  5. Unbounded collections

Check:

  1. Not using bulk inserts
  2. Individual INSERT statements
  3. Transaction overhead (too frequent commits)
  4. Missing indexes on target table
  5. Constraints/triggers slowing inserts