This guide covers performance optimization strategies for Spring Batch RS applications.
- **Chunk Size**: Increase chunk size to 500-1000 for large datasets.
- **Connection Pooling**: Configure appropriate pool sizes for database operations.
- **Buffering**: Use buffered I/O for file operations.
- **Batch Operations**: Leverage bulk inserts and batch APIs.
Chunk size determines how many items are processed before writing to the destination.
**Small Chunks (10-50):** lower memory footprint and finer-grained progress, at the cost of more frequent writes and transaction overhead; best suited to development and debugging.

**Large Chunks (500-1000):** fewer write round-trips and higher throughput, at the cost of holding more items in memory per chunk; best suited to large production datasets.
| Use Case | Recommended Size | Reason |
|---|---|---|
| File to File | 500-1000 | Minimize I/O overhead |
| File to Database | 500-1000 | Batch inserts are efficient |
| Database to File | 100-500 | Balance query overhead and memory |
| Database to Database | 500-1000 | Bulk operations on both sides |
| Complex Processing | 100-200 | Manage memory for transformations |
| Testing/Development | 10-50 | Easier debugging |
```rust
// Small chunks for development
let step = StepBuilder::new("dev-process")
    .chunk::<Input, Output>(10) // Small for debugging
    .reader(&reader)
    .writer(&writer)
    .build();

// Large chunks for production
let step = StepBuilder::new("prod-process")
    .chunk::<Input, Output>(1000) // Large for throughput
    .reader(&reader)
    .writer(&writer)
    .build();
```

```rust
use std::time::Duration;
use sqlx::postgres::PgPoolOptions;

let pool = PgPoolOptions::new()
    .max_connections(10) // Pool size
    .min_connections(2)  // Minimum idle connections
    .acquire_timeout(Duration::from_secs(30))
    .idle_timeout(Duration::from_secs(600))
    .max_lifetime(Duration::from_secs(1800))
    .connect("postgres://...").await?;
```

```rust
use sqlx::mysql::MySqlPoolOptions;

let pool = MySqlPoolOptions::new()
    .max_connections(10)
    .min_connections(2)
    .connect("mysql://...").await?;
```

```rust
use sqlx::sqlite::SqlitePoolOptions;

let pool = SqlitePoolOptions::new()
    .max_connections(5) // SQLite has lower concurrency
    .connect("sqlite://...").await?;
```

**Pool Size Guidelines:**
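As a rough, hedged sketch (the helper and the numbers below are illustrative assumptions, not framework defaults), the pool is commonly sized from the number of concurrent readers and writers and capped by what the database server itself allows:

```rust
use std::cmp;

// Hypothetical rule of thumb: one connection per concurrent worker plus a
// little headroom, capped well below the server's own max_connections limit.
fn suggested_pool_size(concurrent_workers: u32, db_max_connections: u32) -> u32 {
    let headroom = 2; // spare connections for monitoring or ad-hoc queries
    cmp::min(concurrent_workers + headroom, db_max_connections.saturating_sub(10))
}

// Example: 4 partitions writing in parallel against a default PostgreSQL
// server (max_connections = 100) suggests a pool of 6 connections.
let pool_size = suggested_pool_size(4, 100);
```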
```rust
// Use appropriate page size
let reader = RdbcItemReaderBuilder::<Record>::new()
    .postgres(pool)
    .query("SELECT * FROM large_table ORDER BY id") // ORDER BY for consistency
    .with_page_size(500) // Match or exceed chunk size
    .build_postgres();

// For very large datasets, add a WHERE clause
let reader = RdbcItemReaderBuilder::<Record>::new()
    .postgres(pool)
    .query("SELECT * FROM large_table WHERE created_at > '2024-01-01' ORDER BY id")
    .with_page_size(1000)
    .build_postgres();
```

```rust
// Use bulk inserts with an appropriate chunk size
let writer = PostgresItemWriterBuilder::new()
    .pool(pool)
    .table("target_table")
    .binder(|query, records: &[Record]| {
        // Batch bind all records at once
        query.push_values(records, |mut b, record| {
            b.push_bind(&record.field1)
                .push_bind(&record.field2)
                .push_bind(&record.field3);
        });
    })
    .build();

let step = StepBuilder::new("bulk-insert")
    .chunk::<Record, Record>(1000) // Large chunks for bulk inserts
    .reader(&reader)
    .writer(&writer)
    .build();
```

```sql
-- Good: Index on filter and order columns
CREATE INDEX idx_records_created_id ON records(created_at, id);

-- Query uses the index effectively
SELECT * FROM records WHERE created_at > '2024-01-01' ORDER BY id;
```
```rust
use std::fs::File;
use std::io::BufReader;

// Inefficient: No buffering
let file = File::open("large.csv")?;
let reader = CsvItemReaderBuilder::new()
    .from_reader(file); // Direct file access

// Efficient: Buffered I/O
let file = File::open("large.csv")?;
let buffered = BufReader::with_capacity(64 * 1024, file); // 64KB buffer
let reader = CsvItemReaderBuilder::new()
    .from_reader(buffered);
```

```rust
use std::io::BufWriter;

// Efficient: Buffered writing
let file = File::create("output.csv")?;
let buffered = BufWriter::with_capacity(64 * 1024, file);
let writer = CsvItemWriterBuilder::new()
    .from_writer(buffered);
```

For very large files, consider memory-mapped I/O:
```rust
use std::fs::File;
use memmap2::Mmap;

let file = File::open("huge.csv")?;
let mmap = unsafe { Mmap::map(&file)? };
let reader = CsvItemReaderBuilder::new()
    .from_reader(&mmap[..]);
```

```rust
use sysinfo::{System, SystemExt};

fn log_memory_usage() {
    let mut system = System::new_all();
    system.refresh_all();

    let used = system.used_memory();
    let total = system.total_memory();

    println!(
        "Memory: {:.2} GB / {:.2} GB ({:.1}%)",
        used as f64 / 1_000_000_000.0,
        total as f64 / 1_000_000_000.0,
        (used as f64 / total as f64) * 100.0
    );
}
```
**1. Clone Only When Necessary:**

```rust
// Inefficient: Unnecessary clone
impl ItemProcessor<Record, Record> for MyProcessor {
    fn process(&self, item: &Record) -> ItemProcessorResult<Record> {
        let mut cloned = item.clone(); // Expensive
        cloned.field = transform(cloned.field);
        Ok(cloned)
    }
}

// Efficient: Transform in-place when possible
impl ItemProcessor<Record, Record> for MyProcessor {
    fn process(&self, item: &Record) -> ItemProcessorResult<Record> {
        Ok(Record {
            field: transform(&item.field),
            ..item.clone() // Only clone necessary parts
        })
    }
}
```

**2. Use References for Large Strings:**
```rust
use std::borrow::Cow;

struct Processor;

impl ItemProcessor<String, String> for Processor {
    fn process(&self, item: &String) -> ItemProcessorResult<String> {
        // Use Cow to avoid unnecessary cloning
        let result: Cow<str> = if item.contains("special") {
            Cow::Owned(item.to_uppercase())
        } else {
            Cow::Borrowed(item)
        };

        Ok(result.into_owned())
    }
}
```

**3. Stream Processing:**

```rust
// Process items one at a time without buffering
let step = StepBuilder::new("stream-process")
    .chunk::<Input, Output>(1) // Process immediately
    .reader(&reader)
    .processor(&processor)
    .writer(&writer)
    .build();
```

Spring Batch RS uses a single-threaded model per step execution. For parallel processing:
**Option 1: Multiple Job Instances**
```rust
use std::thread;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let handles: Vec<_> = (0..4).map(|i| {
        thread::spawn(move || process_partition(i))
    }).collect();

    for handle in handles {
        handle.join().unwrap()?;
    }

    Ok(())
}

// The error type must be Send so the result can cross the thread boundary
fn process_partition(partition_id: usize) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Each thread processes a partition
    let reader = CsvItemReaderBuilder::new()
        .from_path(format!("data_part_{}.csv", partition_id))?;

    // ... process partition
    Ok(())
}
```

**Option 2: Async Steps with Tokio**
```rust
use tokio::task;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumes an async variant of process_partition
    let tasks = (0..4).map(|i| {
        task::spawn(async move { process_partition(i).await })
    });

    futures::future::try_join_all(tasks).await?;

    Ok(())
}
```
```rust
use std::time::Instant;

let start = Instant::now();

let mut execution = StepExecution::new("process");
step.execute(&mut execution)?;

let duration = start.elapsed();
let items = execution.read_count();
let throughput = items as f64 / duration.as_secs_f64();

println!("Processed {} items in {:?}", items, duration);
println!("Throughput: {:.2} items/sec", throughput);
println!("Write count: {}", execution.write_count());
println!("Skip count: {}", execution.skip_count());
```

```toml
[dev-dependencies]
criterion = "0.5"
```

```rust
use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn benchmark_processor(c: &mut Criterion) {
    let processor = MyProcessor::new();

    c.bench_function("process item", |b| {
        b.iter(|| {
            let item = black_box(create_test_item());
            processor.process(&item)
        })
    });
}

criterion_group!(benches, benchmark_processor);
criterion_main!(benches);
```
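Criterion benchmarks placed under `benches/` run through the standard Cargo command; this is general Cargo usage, not anything specific to Spring Batch RS:

```bash
# Run all Criterion benchmarks in the crate
cargo bench
```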
```bash
# Install cargo-flamegraph
cargo install flamegraph

# Run with profiling
cargo flamegraph --bin my-batch-job

# Open flamegraph.svg in a browser
```

```rust
// Slow: Small chunks, no buffering, unnecessary clones
let reader = CsvItemReaderBuilder::new()
    .from_path("data.csv")?; // No buffering

let processor = SlowProcessor; // Clones everything

let writer = PostgresItemWriterBuilder::new()
    .pool(small_pool) // 2 connections only
    .table("target")
    .binder(|query, record| { /* ... */ })
    .build();

let step = StepBuilder::new("slow")
    .chunk::<Record, Record>(10) // Tiny chunks
    .reader(&reader)
    .processor(&processor)
    .writer(&writer)
    .build();

// Result: 100 items/sec
```
```rust
use std::io::BufReader;

// Fast: Large chunks, buffered I/O, optimized pool
let file = File::open("data.csv")?;
let buffered = BufReader::with_capacity(64 * 1024, file);
let reader = CsvItemReaderBuilder::new()
    .from_reader(buffered);

let processor = FastProcessor; // Minimal cloning

let pool = PgPoolOptions::new()
    .max_connections(20) // Larger pool
    .connect("...").await?;

let writer = PostgresItemWriterBuilder::new()
    .pool(pool)
    .table("target")
    .binder(|query, records| {
        // Bulk insert all records
        query.push_values(records, |mut b, r| {
            b.push_bind(&r.field1)
                .push_bind(&r.field2);
        });
    })
    .build();

let step = StepBuilder::new("fast")
    .chunk::<Record, Record>(1000) // Large chunks
    .reader(&reader)
    .processor(&processor)
    .writer(&writer)
    .build();

// Result: 10,000 items/sec (100x improvement!)
```