Performance Optimization Guide
This guide covers performance optimization strategies for Spring Batch RS applications.
Chunk Size
Increase chunk size to 500-1000 for large datasets
Connection Pooling
Configure appropriate pool sizes for database operations
Buffering
Use buffered I/O for file operations
Batch Operations
Leverage bulk inserts and batch APIs
Chunk size determines how many items are processed before writing to the destination.
Small Chunks (10-50): lower memory footprint and finer-grained debugging/restarts, but more frequent writes increase per-transaction overhead.
Large Chunks (500-1000): fewer write round-trips and higher throughput, but more memory is held per chunk and failure recovery is coarser.
| Use Case | Recommended Size | Reason |
|---|---|---|
| File to File | 500-1000 | Minimize I/O overhead |
| File to Database | 500-1000 | Batch inserts are efficient |
| Database to File | 100-500 | Balance query overhead and memory |
| Database to Database | 500-1000 | Bulk operations on both sides |
| Complex Processing | 100-200 | Manage memory for transformations |
| Testing/Development | 10-50 | Easier debugging |
// Small chunks for development
let step = StepBuilder::new("dev-process")
    .chunk::<Input, Output>(10) // Small for debugging
    .reader(&reader)
    .writer(&writer)
    .build();

// Large chunks for production
let step = StepBuilder::new("prod-process")
    .chunk::<Input, Output>(1000) // Large for throughput
    .reader(&reader)
    .writer(&writer)
    .build();

use sqlx::postgres::PgPoolOptions;
let pool = PgPoolOptions::new()
    .max_connections(10)                      // Pool size
    .min_connections(2)                       // Minimum idle connections
    .acquire_timeout(Duration::from_secs(30))
    .idle_timeout(Duration::from_secs(600))
    .max_lifetime(Duration::from_secs(1800))
    .connect("postgres://...").await?;

use sqlx::mysql::MySqlPoolOptions;

let pool = MySqlPoolOptions::new()
    .max_connections(10)
    .min_connections(2)
    .connect("mysql://...").await?;

use sqlx::sqlite::SqlitePoolOptions;

let pool = SqlitePoolOptions::new()
    .max_connections(5) // SQLite has lower concurrency
    .connect("sqlite://...").await?;

Pool Size Guidelines:
// Use appropriate page size
let reader = RdbcItemReaderBuilder::<Record>::new()
    .postgres(pool)
    .query("SELECT * FROM large_table ORDER BY id") // ORDER BY for consistency
    .with_page_size(500) // Match or exceed chunk size
    .build_postgres();

// For very large datasets, add WHERE clause
let reader = RdbcItemReaderBuilder::<Record>::new()
    .postgres(pool)
    .query("SELECT * FROM large_table WHERE created_at > '2024-01-01' ORDER BY id")
    .with_page_size(1000)
    .build_postgres();

// Use bulk inserts with appropriate chunk size
let writer = PostgresItemWriterBuilder::new()
    .pool(pool)
    .table("target_table")
    .binder(|query, records: &Record| {
        // Batch bind all records at once
        query.push_values(records, |mut b, record| {
            b.push_bind(&record.field1)
                .push_bind(&record.field2)
                .push_bind(&record.field3);
        });
    })
    .build();

let step = StepBuilder::new("bulk-insert")
    .chunk::<Record, Record>(1000) // Large chunks for bulk inserts
    .reader(&reader)
    .writer(&writer)
    .build();

-- Good: Index on filter and order columns
CREATE INDEX idx_records_created_id ON records(created_at, id);

-- Query uses index effectively
SELECT * FROM records WHERE created_at > '2024-01-01' ORDER BY id;

use std::io::BufReader;
use std::fs::File;
// Inefficient: No buffering
let file = File::open("large.csv")?;
let reader = CsvItemReaderBuilder::new()
    .from_reader(file); // Direct file access

// Efficient: Buffered I/O
let file = File::open("large.csv")?;
let buffered = BufReader::with_capacity(64 * 1024, file); // 64KB buffer
let reader = CsvItemReaderBuilder::new()
    .from_reader(buffered);

use std::io::BufWriter;

// Efficient: Buffered writing
let file = File::create("output.csv")?;
let buffered = BufWriter::with_capacity(64 * 1024, file);
let writer = CsvItemWriterBuilder::new()
    .from_writer(buffered);

For very large files, consider memory-mapped I/O:

use memmap2::Mmap;
use std::fs::File;

let file = File::open("huge.csv")?;
let mmap = unsafe { Mmap::map(&file)? };
let reader = CsvItemReaderBuilder::new()
    .from_reader(&mmap[..]);

use sysinfo::{System, SystemExt};
fn log_memory_usage() {
    let mut system = System::new_all();
    system.refresh_all();

    let used = system.used_memory();
    let total = system.total_memory();

    println!("Memory: {:.2} GB / {:.2} GB ({:.1}%)",
        used as f64 / 1_000_000_000.0,
        total as f64 / 1_000_000_000.0,
        (used as f64 / total as f64) * 100.0
    );
}

1. Clone Only When Necessary:

// Inefficient: Unnecessary clone
impl ItemProcessor<Record, Record> for MyProcessor {
    fn process(&self, item: &Record) -> ItemProcessorResult<Record> {
        let mut cloned = item.clone(); // Expensive
        cloned.field = transform(cloned.field);
        Ok(cloned)
    }
}

// Efficient: Transform in-place when possible
impl ItemProcessor<Record, Record> for MyProcessor {
    fn process(&self, item: &Record) -> ItemProcessorResult<Record> {
        Ok(Record {
            field: transform(&item.field),
            ..item.clone() // Only clone necessary parts
        })
    }
}

2. Use References for Large Strings:
use std::borrow::Cow;
struct Processor;
impl ItemProcessor<String, String> for Processor {
    fn process(&self, item: &String) -> ItemProcessorResult<String> {
        // Use Cow to avoid unnecessary cloning
        let result: Cow<str> = if item.contains("special") {
            Cow::Owned(item.to_uppercase())
        } else {
            Cow::Borrowed(item)
        };

        Ok(result.into_owned())
    }
}

3. Stream Processing:

// Process items one at a time without buffering
let step = StepBuilder::new("stream-process")
    .chunk::<Input, Output>(1) // Process immediately
    .reader(&reader)
    .processor(&processor)
    .writer(&writer)
    .build();

Spring Batch RS uses a single-threaded model per step execution. For parallel processing:
Option 1: Multiple Job Instances
use std::thread;
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let handles: Vec<_> = (0..4).map(|i| {
        thread::spawn(move || {
            process_partition(i)
        })
    }).collect();

    for handle in handles {
        handle.join().unwrap()?;
    }

    Ok(())
}

fn process_partition(partition_id: usize) -> Result<(), Box<dyn std::error::Error>> {
    // Each thread processes a partition
    let reader = CsvItemReaderBuilder::new()
        .from_path(format!("data_part_{}.csv", partition_id))?;

    // ... process partition
    Ok(())
}

Option 2: Async Steps with Tokio
use tokio::task;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let tasks = (0..4).map(|i| {
        task::spawn(async move {
            process_partition(i).await
        })
    });

    futures::future::try_join_all(tasks).await?;

    Ok(())
}

use std::time::Instant;
let start = Instant::now();
let mut execution = StepExecution::new("process");
step.execute(&mut execution)?;

let duration = start.elapsed();
let items = execution.read_count();
let throughput = items as f64 / duration.as_secs_f64();

println!("Processed {} items in {:?}", items, duration);
println!("Throughput: {:.2} items/sec", throughput);
println!("Write count: {}", execution.write_count());
println!("Skip count: {}", execution.skip_count());

[dev-dependencies]
criterion = "0.5"

use criterion::{black_box, criterion_group, criterion_main, Criterion};
fn benchmark_processor(c: &mut Criterion) {
    let processor = MyProcessor::new();

    c.bench_function("process item", |b| {
        b.iter(|| {
            let item = black_box(create_test_item());
            processor.process(&item)
        })
    });
}

criterion_group!(benches, benchmark_processor);
criterion_main!(benches);

# Install cargo-flamegraph
cargo install flamegraph

# Run with profiling
cargo flamegraph --bin my-batch-job

# Open flamegraph.svg in browser

// Slow: Small chunks, no buffering, unnecessary clones
let reader = CsvItemReaderBuilder::new()
    .from_path("data.csv")?; // No buffering
let processor = SlowProcessor; // Clones everything
let writer = PostgresItemWriterBuilder::new()
    .pool(small_pool) // 2 connections only
    .table("target")
    .binder(|query, record| { /* ... */ })
    .build();

let step = StepBuilder::new("slow")
    .chunk::<Record, Record>(10) // Tiny chunks
    .reader(&reader)
    .processor(&processor)
    .writer(&writer)
    .build();

// Result: 100 items/sec

use std::io::BufReader;
// Fast: Large chunks, buffered I/O, optimized pool
let file = File::open("data.csv")?;
let buffered = BufReader::with_capacity(64 * 1024, file);
let reader = CsvItemReaderBuilder::new()
    .from_reader(buffered);

let processor = FastProcessor; // Minimal cloning

let pool = PgPoolOptions::new()
    .max_connections(20) // Larger pool
    .connect("...").await?;

let writer = PostgresItemWriterBuilder::new()
    .pool(pool)
    .table("target")
    .binder(|query, records| {
        // Bulk insert all records
        query.push_values(records, |mut b, r| {
            b.push_bind(&r.field1)
                .push_bind(&r.field2);
        });
    })
    .build();

let step = StepBuilder::new("fast")
    .chunk::<Record, Record>(1000) // Large chunks
    .reader(&reader)
    .processor(&processor)
    .writer(&writer)
    .build();

// Result: 10,000 items/sec (100x improvement!)

If throughput is low, check:
- Chunk size matches the workload (see the recommendation table above)
- File readers and writers are wrapped in BufReader/BufWriter
- Database pool size, reader page size, and bulk inserts are configured