Skip to content

Performance Tuning Guide

This content is not yet available in your language.

This guide covers performance optimization strategies for Spring Batch RS applications.

Chunk Size

Increase chunk size to 500-1000 for large datasets

Connection Pooling

Configure appropriate pool sizes for database operations

Buffering

Use buffered I/O for file operations

Batch Operations

Leverage bulk inserts and batch APIs


Chunk size determines how many items are processed before writing to the destination.

Small Chunks (10-50):

  • Lower memory usage
  • More frequent I/O operations
  • Better for debugging
  • More transaction overhead

Large Chunks (500-1000):

  • Higher memory usage
  • Fewer I/O operations
  • Better throughput
  • Fewer transaction commits
| Use Case             | Recommended Size | Reason                             |
| -------------------- | ---------------- | ---------------------------------- |
| File to File         | 500-1000         | Minimize I/O overhead              |
| File to Database     | 500-1000         | Batch inserts are efficient        |
| Database to File     | 100-500          | Balance query overhead and memory  |
| Database to Database | 500-1000         | Bulk operations on both sides      |
| Complex Processing   | 100-200          | Manage memory for transformations  |
| Testing/Development  | 10-50            | Easier debugging                   |
// Example: tuning chunk size per environment (two standalone step snippets).
// Small chunks for development
let step = StepBuilder::new("dev-process")
.chunk::<Input, Output>(10) // Small for debugging
.reader(&reader)
.writer(&writer)
.build();
// Large chunks for production
let step = StepBuilder::new("prod-process")
.chunk::<Input, Output>(1000) // Large for throughput
.reader(&reader)
.writer(&writer)
.build();

use sqlx::postgres::PgPoolOptions;
// Example: sizing an sqlx Postgres pool for batch workloads.
// NOTE(review): requires `use std::time::Duration;` in scope.
let pool = PgPoolOptions::new()
.max_connections(10) // Pool size
.min_connections(2) // Minimum idle connections
.acquire_timeout(Duration::from_secs(30)) // Fail fast when the pool is exhausted
.idle_timeout(Duration::from_secs(600)) // Reclaim connections idle for 10 min
.max_lifetime(Duration::from_secs(1800)) // Recycle connections after 30 min
.connect("postgres://...").await?;

Pool Size Guidelines:

  • Single reader + single writer: 5-10 connections
  • Multiple concurrent jobs: 10-20 connections
  • High concurrency: 20-50 connections
  • Don’t exceed database max_connections limit
// Use appropriate page size
// NOTE(review): paged readers presumably re-issue the query per page, so a
// stable ORDER BY is what keeps pages consistent — confirm against the
// RdbcItemReader implementation.
let reader = RdbcItemReaderBuilder::<Record>::new()
.postgres(pool)
.query("SELECT * FROM large_table ORDER BY id") // ORDER BY for consistency
.with_page_size(500) // Match or exceed chunk size
.build_postgres();
// For very large datasets, add WHERE clause
let reader = RdbcItemReaderBuilder::<Record>::new()
.postgres(pool)
.query("SELECT * FROM large_table WHERE created_at > '2024-01-01' ORDER BY id")
.with_page_size(1000)
.build_postgres();
// Use bulk inserts with appropriate chunk size
let writer = PostgresItemWriterBuilder::new()
.pool(pool)
.table("target_table")
// The binder receives the whole chunk, so its parameter is a slice
// (&[Record]) — push_values iterates over it. A single &Record would not
// satisfy push_values' IntoIterator bound.
.binder(|query, records: &[Record]| {
// Batch bind all records at once
query.push_values(records, |mut b, record| {
b.push_bind(&record.field1)
.push_bind(&record.field2)
.push_bind(&record.field3);
});
})
.build();
// Chunk size bounds how many rows go into each bulk INSERT.
let step = StepBuilder::new("bulk-insert")
.chunk::<Record, Record>(1000) // Large chunks for bulk inserts
.reader(&reader)
.writer(&writer)
.build();
-- Good: Index on filter and order columns
CREATE INDEX idx_records_created_id ON records(created_at, id);
-- Query uses index effectively
-- (the composite index covers both the WHERE filter and the ORDER BY)
SELECT * FROM records WHERE created_at > '2024-01-01' ORDER BY id;

use std::io::BufReader;
use std::fs::File;
// Buffered vs unbuffered file access: wrapping the File cuts syscalls per read.
// Inefficient: No buffering
let file = File::open("large.csv")?;
let reader = CsvItemReaderBuilder::new()
.from_reader(file); // Direct file access
// Efficient: Buffered I/O
let file = File::open("large.csv")?;
let buffered = BufReader::with_capacity(64 * 1024, file); // 64KB buffer
let reader = CsvItemReaderBuilder::new()
.from_reader(buffered);
use std::io::BufWriter;
// Efficient: Buffered writing
// NOTE(review): BufWriter flushes on Drop but swallows errors there — make
// sure the writer is flushed/closed explicitly before the output is consumed.
let file = File::create("output.csv")?;
let buffered = BufWriter::with_capacity(64 * 1024, file);
let writer = CsvItemWriterBuilder::new()
.from_writer(buffered);

For very large files, consider memory-mapped I/O:

use memmap2::Mmap;
use std::fs::File;
let file = File::open("huge.csv")?;
// SAFETY: Mmap::map is unsafe because the mapping is only sound while no
// other process truncates or mutates the file — that invariant must be
// guaranteed by the caller for the lifetime of `mmap`.
let mmap = unsafe { Mmap::map(&file)? };
let reader = CsvItemReaderBuilder::new()
.from_reader(&mmap[..]); // The mapped bytes implement Read via the &[u8] slice

use sysinfo::{System, SystemExt};

/// Logs current system memory usage (used / total in GB, plus percentage)
/// to stdout.
fn log_memory_usage() {
    let mut sys = System::new_all();
    sys.refresh_all();
    let used_raw = sys.used_memory() as f64;
    let total_raw = sys.total_memory() as f64;
    // NOTE(review): sysinfo reported kilobytes before 0.26 and bytes after —
    // confirm the crate version before trusting the GB conversion below.
    let used_gb = used_raw / 1_000_000_000.0;
    let total_gb = total_raw / 1_000_000_000.0;
    let percent = (used_raw / total_raw) * 100.0;
    println!("Memory: {:.2} GB / {:.2} GB ({:.1}%)", used_gb, total_gb, percent);
}

1. Clone Only When Necessary:

// Inefficient: Unnecessary clone
// (anti-pattern shown on purpose: the entire Record is cloned up front just
// to rewrite one field)
impl ItemProcessor<Record, Record> for MyProcessor {
fn process(&self, item: &Record) -> ItemProcessorResult<Record> {
let mut cloned = item.clone(); // Expensive
cloned.field = transform(cloned.field);
Ok(cloned)
}
}
// Efficient: Transform in-place when possible
impl ItemProcessor<Record, Record> for MyProcessor {
fn process(&self, item: &Record) -> ItemProcessorResult<Record> {
Ok(Record {
field: transform(&item.field),
// NOTE(review): struct-update syntax still clones the whole Record here
// before `field` replaces the cloned value — the saving over the version
// above is avoiding a second mutation pass, not avoiding the clone;
// confirm this is the intended trade-off.
..item.clone()
})
}
}

2. Use References for Large Strings:

use std::borrow::Cow;

/// Uppercases items containing "special"; passes everything else through
/// unchanged.
struct Processor;

impl ItemProcessor<String, String> for Processor {
    fn process(&self, item: &String) -> ItemProcessorResult<String> {
        // Cow defers the allocation: only the "special" branch pays for an
        // owned String, the common case borrows the input.
        let output: Cow<str> = match item.contains("special") {
            true => Cow::Owned(item.to_uppercase()),
            false => Cow::Borrowed(item.as_str()),
        };
        Ok(output.into_owned())
    }
}

3. Stream Processing:

// Process items one at a time without buffering
// Chunk size 1 minimizes memory and latency at the cost of throughput:
// every item is written (and committed) individually.
let step = StepBuilder::new("stream-process")
.chunk::<Input, Output>(1) // Process immediately
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();

Spring Batch RS uses a single-threaded model per step execution. For parallel processing:

Option 1: Multiple Job Instances

use std::thread;
// Spawn one OS thread per partition; fail fast if any partition errors.
fn main() -> Result<(), Box<dyn std::error::Error>> {
// NOTE(review): thread::spawn requires the closure's return value to be Send;
// Box<dyn Error> is not Send, so process_partition likely needs to return
// Box<dyn Error + Send + Sync> for this to compile — confirm.
let handles: Vec<_> = (0..4).map(|i| {
thread::spawn(move || {
process_partition(i)
})
}).collect();
for handle in handles {
handle.join().unwrap()?; // unwrap() panics (re-raises) if a worker panicked; ? propagates its error
}
Ok(())
}
// Runs the batch pipeline for one partition of the input data.
// Partitioning here is by input file: data_part_0.csv .. data_part_3.csv.
fn process_partition(partition_id: usize) -> Result<(), Box<dyn std::error::Error>> {
// Each thread processes a partition
let reader = CsvItemReaderBuilder::new()
.from_path(format!("data_part_{}.csv", partition_id))?;
// ... process partition
Ok(())
}

Option 2: Async Steps with Tokio

use tokio::task;
// Async variant: partitions run as tokio tasks instead of OS threads.
// NOTE(review): this snippet assumes an *async* process_partition and the
// `futures` crate; task::spawn also requires the future (including its error
// type) to be Send — confirm both.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let tasks = (0..4).map(|i| {
task::spawn(async move {
process_partition(i).await
})
});
// try_join_all short-circuits on the first task error.
futures::future::try_join_all(tasks).await?;
Ok(())
}

use std::time::Instant;
// Measure wall-clock duration of a step and derive items/sec from its
// read count; write/skip counts come from the same StepExecution.
let start = Instant::now();
let mut execution = StepExecution::new("process");
step.execute(&mut execution)?;
let duration = start.elapsed();
let items = execution.read_count();
let throughput = items as f64 / duration.as_secs_f64();
println!("Processed {} items in {:?}", items, duration);
println!("Throughput: {:.2} items/sec", throughput);
println!("Write count: {}", execution.write_count());
println!("Skip count: {}", execution.skip_count());
[dev-dependencies]
criterion = "0.5"
use criterion::{black_box, criterion_group, criterion_main, Criterion};
// Micro-benchmark for a single processor call. black_box prevents the
// optimizer from const-folding the test item (and the result) away.
// NOTE(review): MyProcessor and create_test_item are placeholders — supply
// real implementations.
fn benchmark_processor(c: &mut Criterion) {
let processor = MyProcessor::new();
c.bench_function("process item", |b| {
b.iter(|| {
let item = black_box(create_test_item());
processor.process(&item)
})
});
}
criterion_group!(benches, benchmark_processor);
criterion_main!(benches);
Terminal window
# Install cargo-flamegraph
cargo install flamegraph
# Run with profiling
cargo flamegraph --bin my-batch-job
# Open flamegraph.svg in browser

// Before/after comparison: same pipeline shape, the only differences are
// chunk size, I/O buffering, pool size, and clone discipline.
// Slow: Small chunks, no buffering, unnecessary clones
let reader = CsvItemReaderBuilder::new()
.from_path("data.csv")?; // No buffering
let processor = SlowProcessor; // Clones everything
let writer = PostgresItemWriterBuilder::new()
.pool(small_pool) // 2 connections only
.table("target")
.binder(|query, record| { /* ... */ })
.build();
let step = StepBuilder::new("slow")
.chunk::<Record, Record>(10) // Tiny chunks
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
// Result: 100 items/sec
use std::io::BufReader;
// Fast: Large chunks, buffered I/O, optimized pool
let file = File::open("data.csv")?;
let buffered = BufReader::with_capacity(64 * 1024, file); // 64KB buffer cuts read syscalls
let reader = CsvItemReaderBuilder::new()
.from_reader(buffered);
let processor = FastProcessor; // Minimal cloning
let pool = PgPoolOptions::new()
.max_connections(20) // Larger pool
.connect("...").await?;
let writer = PostgresItemWriterBuilder::new()
.pool(pool)
.table("target")
.binder(|query, records| {
// Bulk insert all records
query.push_values(records, |mut b, r| {
b.push_bind(&r.field1)
.push_bind(&r.field2);
});
})
.build();
let step = StepBuilder::new("fast")
.chunk::<Record, Record>(1000) // Large chunks
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
// Result: 10,000 items/sec (100x improvement!)

Check:

  1. Chunk size too small
  2. Missing database indexes
  3. Network latency (database/FTP)
  4. Expensive processor operations
  5. Small connection pool

Check:

  1. Chunk size too large
  2. Processor accumulating state
  3. Memory leaks in custom implementations
  4. Large strings not using Cow
  5. Unbounded collections

Check:

  1. Not using bulk inserts
  2. Individual INSERT statements
  3. Transaction overhead (too frequent commits)
  4. Missing indexes on target table
  5. Constraints/triggers slowing inserts