Chunk Size
Use larger chunks (500-1000) for CSV files to reduce I/O overhead
This content is not yet available in your language.
This page provides comprehensive examples for working with CSV files using Spring Batch RS.
use spring_batch_rs::{ core::{job::JobBuilder, step::StepBuilder, item::PassThroughProcessor}, item::csv::CsvItemReaderBuilder, BatchError,};use serde::Deserialize;
#[derive(Debug, Deserialize, Clone)]struct Product { id: u32, name: String, price: f64, category: String,}
fn main() -> Result<(), BatchError> { // Create CSV reader let reader = CsvItemReaderBuilder::<Product>::new() .has_headers(true) // First row contains headers .delimiter(b',') // Comma delimiter (default) .from_path("products.csv")?; // Read from file
// Use logger writer to see the output let writer = spring_batch_rs::item::logger::LoggerItemWriterBuilder::new() .log_level(log::Level::Info) .build();
let processor = PassThroughProcessor::<Product>::new();
let step = StepBuilder::new("read-csv") .chunk(10) .reader(&reader) .processor(&processor) .writer(&writer) .build();
let job = JobBuilder::new().start(&step).build(); job.run().map(|_| ())}use spring_batch_rs::item::csv::CsvItemReaderBuilder;
// Inline CSV data: the header row plus one record per line. (The records
// were fused onto a single line by formatting loss; a raw string must
// contain real line breaks to be valid CSV.)
let csv_data = r#"id,name,price,category
1,Laptop,999.99,Electronics
2,Coffee Mug,12.99,Kitchen
3,Notebook,5.99,Office"#;

// Read directly from in-memory bytes instead of a file.
let reader = CsvItemReaderBuilder::<Product>::new()
    .has_headers(true)
    .from_reader(csv_data.as_bytes());

// --- Writing CSV ---
use serde::Serialize;
use spring_batch_rs::item::csv::CsvItemWriterBuilder;

/// Record shape serialized to the output CSV.
#[derive(Serialize)]
struct Product {
    id: u32,
    name: String,
    price: f64,
}
// Writer: produce `output.csv` with a header row.
let writer = CsvItemWriterBuilder::new()
    .has_headers(true) // write header row
    .delimiter(b',') // comma delimiter
    .from_path("output.csv")?; // write to file

// --- CSV to JSON conversion ---
use serde::{Deserialize, Serialize};
use spring_batch_rs::{
    core::{item::PassThroughProcessor, job::JobBuilder, step::StepBuilder},
    item::{csv::CsvItemReaderBuilder, json::JsonItemWriterBuilder},
    BatchError,
};

/// Record type shared by the CSV reader (Deserialize) and JSON writer (Serialize).
#[derive(Debug, Clone, Deserialize, Serialize)]
struct Product {
    id: u32,
    name: String,
    price: f64,
    category: String,
}

fn main() -> Result<(), BatchError> {
    // Header row plus one record per line; the raw string needs real line
    // breaks to parse as CSV (they were lost when the page was flattened).
    let csv_data = r#"id,name,price,category
1,Laptop,999.99,Electronics
2,Coffee Mug,12.99,Kitchen
3,Notebook,5.99,Office
4,Wireless Mouse,29.99,Electronics"#;

    let reader = CsvItemReaderBuilder::<Product>::new()
        .has_headers(true)
        .from_reader(csv_data.as_bytes());

    let writer = JsonItemWriterBuilder::<Product>::new()
        .pretty_formatter(true)
        .from_path("products.json")?;

    let processor = PassThroughProcessor::<Product>::new();

    let step = StepBuilder::new("csv-to-json")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run().map(|_| ())
}

// --- Custom processing with ItemProcessor ---
use spring_batch_rs::core::item::ItemProcessor;
/// Input record as read from CSV.
#[derive(Deserialize, Clone)]
struct RawProduct {
    id: u32,
    name: String,
    price: f64,
    category: String,
}

/// Output record with the discount applied.
#[derive(Serialize)]
struct DiscountedProduct {
    id: u32,
    name: String,
    original_price: f64,
    discounted_price: f64,
    discount_percent: u32,
    category: String,
}

struct DiscountProcessor;

impl ItemProcessor<RawProduct, DiscountedProduct> for DiscountProcessor {
    fn process(&self, item: RawProduct) -> ItemProcessorResult<DiscountedProduct> {
        // Discount rate is chosen per product category.
        let discount_percent = match item.category.as_str() {
            "Electronics" => 15,
            "Kitchen" => 10,
            "Office" => 5,
            _ => 0,
        };

        let discount_multiplier = 1.0 - (discount_percent as f64 / 100.0);
        let discounted_price = item.price * discount_multiplier;

        Ok(Some(DiscountedProduct {
            id: item.id,
            name: item.name,
            original_price: item.price,
            discounted_price,
            discount_percent,
            category: item.category,
        }))
    }
}

// --- Custom CSV format options ---
let reader = CsvItemReaderBuilder::<Product>::new()
    .has_headers(true)
    .delimiter(b';') // semicolon delimiter
    .quote(b'"') // custom quote character
    .flexible(true) // allow variable number of fields
    .from_path("products.csv")?;

// --- Headerless CSV: map fields by column position ---
#[derive(Deserialize)]
struct Product {
    #[serde(rename = "0")] // map to first column
    id: u32,
    #[serde(rename = "1")] // map to second column
    name: String,
    #[serde(rename = "2")] // map to third column
    price: f64,
}
let reader = CsvItemReaderBuilder::<Product>::new()
    .has_headers(false) // no header row
    .from_path("products.csv")?;

// --- Data validation example ---
use spring_batch_rs::{
    core::{item::ItemProcessor, job::JobBuilder, step::StepBuilder},
    item::{csv::CsvItemReaderBuilder, json::JsonItemWriterBuilder},
    BatchError,
};

/// Raw input: every field is read as a string so parsing can be validated.
#[derive(Deserialize, Clone)]
struct RawData {
    id: String,
    value: String,
    amount: String,
}

/// Parsed and validated output record.
#[derive(Serialize)]
struct ValidatedData {
    id: u32,
    value: f64,
    amount: f64,
}

struct ValidationProcessor;

impl ItemProcessor<RawData, ValidatedData> for ValidationProcessor {
    fn process(&self, item: RawData) -> ItemProcessorResult<ValidatedData> {
        // Parse each string field, converting failures into processor errors.
        let id = item.id.parse::<u32>().map_err(|e| {
            BatchError::ItemProcessor(format!("Invalid ID '{}': {}", item.id, e))
        })?;

        let value = item.value.parse::<f64>().map_err(|e| {
            BatchError::ItemProcessor(format!("Invalid value '{}': {}", item.value, e))
        })?;

        let amount = item.amount.parse::<f64>().map_err(|e| {
            BatchError::ItemProcessor(format!("Invalid amount '{}': {}", item.amount, e))
        })?;

        // Business rule: negative numbers are rejected.
        if value < 0.0 || amount < 0.0 {
            return Err(BatchError::ItemProcessor(
                "Negative values not allowed".to_string(),
            ));
        }

        Ok(Some(ValidatedData { id, value, amount }))
    }
}
fn main() -> Result<(), BatchError> { let reader = CsvItemReaderBuilder::<RawData>::new() .has_headers(true) .from_path("raw_data.csv")?;
let writer = JsonItemWriterBuilder::<Product>::new() .pretty_formatter(true) .from_path("validated_data.json")?;
let processor = ValidationProcessor;
let step = StepBuilder::new("validate-csv") .chunk(50) .reader(&reader) .processor(&processor) .writer(&writer) .skip_limit(10) // Skip up to 10 invalid records .build();
let job = JobBuilder::new().start(&step).build(); job.run().map(|_| ())}Read CSV, enrich with external data, write back to CSV:
use std::collections::HashMap;
#[derive(Deserialize, Clone)]struct CustomerOrder { order_id: u32, customer_id: u32, product_id: u32, quantity: u32,}
#[derive(Serialize)]struct EnrichedOrder { order_id: u32, customer_name: String, product_name: String, quantity: u32, total_price: f64,}
struct EnrichmentProcessor { customers: HashMap<u32, String>, products: HashMap<u32, (String, f64)>, // (name, price)}
impl ItemProcessor<CustomerOrder, EnrichedOrder> for EnrichmentProcessor { fn process(&self, item: CustomerOrder) -> ItemProcessorResult<EnrichedOrder> { let customer_name = self.customers .get(&item.customer_id) .cloned() .unwrap_or_else(|| "Unknown".to_string());
let (product_name, unit_price) = self.products .get(&item.product_id) .cloned() .unwrap_or_else(|| ("Unknown".to_string(), 0.0));
let total_price = unit_price * item.quantity as f64;
Ok(Some(EnrichedOrder { order_id: item.order_id, customer_name, product_name, quantity: item.quantity, total_price, })) }}
fn main() -> Result<(), BatchError> {
    // Build the in-memory reference data used to enrich each order.
    let mut customers = HashMap::new();
    customers.insert(1, "Alice".to_string());
    customers.insert(2, "Bob".to_string());

    let mut products = HashMap::new();
    products.insert(101, ("Laptop".to_string(), 999.99));
    products.insert(102, ("Mouse".to_string(), 29.99));

    let processor = EnrichmentProcessor { customers, products };

    let reader = CsvItemReaderBuilder::<CustomerOrder>::new()
        .has_headers(true)
        .from_path("orders.csv")?;

    let writer = CsvItemWriterBuilder::new()
        .has_headers(true)
        .from_path("enriched_orders.csv")?;

    let step = StepBuilder::new("enrich-orders")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run().map(|_| ())
}

Return Ok(None) from a processor to silently discard items. Filtered items are counted in
StepExecution::filter_count and are not passed to the writer.
use spring_batch_rs::{ core::{ item::{ItemProcessor, ItemProcessorResult}, job::JobBuilder, step::StepBuilder, }, item::{csv::CsvItemReaderBuilder, json::JsonItemWriterBuilder}, BatchError,};use serde::{Deserialize, Serialize};use std::env::temp_dir;
#[derive(Debug, Deserialize, Serialize, Clone)]struct Person { name: String, age: u32,}
/// Keeps only adults (age >= 18). Minors are filtered with `Ok(None)`.#[derive(Default)]struct AdultFilter;
impl ItemProcessor<Person, Person> for AdultFilter { fn process(&self, item: &Person) -> ItemProcessorResult<Person> { if item.age >= 18 { Ok(Some(item.clone())) // keep } else { Ok(None) // discard — counted in filter_count } }}
fn main() -> Result<(), BatchError> {
    let csv_data = "name,age\nAlice,30\nBob,16\nCharlie,25\nDiana,15\nEve,42\nFrank,17\n";

    // 1. Build reader from inline CSV
    let reader = CsvItemReaderBuilder::<Person>::new()
        .has_headers(true)
        .from_reader(csv_data.as_bytes());

    // 2. Build JSON writer targeting a temp-directory file
    let output_path = temp_dir().join("adults.json");
    let writer = JsonItemWriterBuilder::<Person>::new().from_path(&output_path);

    // 3. Build and run step with filter processor
    let processor = AdultFilter::default();
    let step = StepBuilder::new("filter-adults")
        .chunk::<Person, Person>(10)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run()?;

    // 4. Inspect filter statistics
    let step_exec = job.get_step_execution("filter-adults").unwrap();
    println!("Read: {}", step_exec.read_count); // 6
    println!("Filtered: {}", step_exec.filter_count); // 3 (minors)
    println!("Written: {}", step_exec.write_count); // 3 (adults)

    Ok(())
}

Run the standalone example:
cargo run --example filter_records_from_csv_with_processor --features csv,json

Chunk Size
Use larger chunks (500-1000) for CSV files to reduce I/O overhead
Buffering
CSV readers use buffered I/O by default for optimal performance
Memory Usage
For very large CSV files, use appropriate chunk sizes to control memory
This example demonstrates a complete sales processing pipeline:
use serde::{Deserialize, Serialize};
use spring_batch_rs::{
    core::{item::ItemProcessor, job::JobBuilder, step::StepBuilder},
    item::{csv::CsvItemReaderBuilder, csv::CsvItemWriterBuilder},
    BatchError,
};

/// One sales row from the input CSV.
#[derive(Deserialize, Clone)]
struct SaleRecord {
    date: String,
    product_id: u32,
    product_name: String,
    quantity: u32,
    unit_price: f64,
    region: String,
}

/// Sales row extended with totals, tax, and a size tier.
#[derive(Serialize)]
struct ProcessedSale {
    date: String,
    product_id: u32,
    product_name: String,
    quantity: u32,
    unit_price: f64,
    total_amount: f64,
    tax: f64,
    final_amount: f64,
    region: String,
    sales_tier: String,
}

/// Applies a configurable tax rate and classifies each sale by size.
struct SalesProcessor {
    tax_rate: f64,
}

impl ItemProcessor<SaleRecord, ProcessedSale> for SalesProcessor {
    fn process(&self, item: SaleRecord) -> ItemProcessorResult<ProcessedSale> {
        let total_amount = item.unit_price * item.quantity as f64;
        let tax = total_amount * self.tax_rate;
        let final_amount = total_amount + tax;

        // Tier thresholds: < 100 Small, < 1000 Medium, otherwise Large.
        let sales_tier = if total_amount < 100.0 {
            "Small"
        } else if total_amount < 1000.0 {
            "Medium"
        } else {
            "Large"
        }
        .to_string();

        Ok(Some(ProcessedSale {
            date: item.date,
            product_id: item.product_id,
            product_name: item.product_name,
            quantity: item.quantity,
            unit_price: item.unit_price,
            total_amount,
            tax,
            final_amount,
            region: item.region,
            sales_tier,
        }))
    }
}
fn main() -> Result<(), BatchError> {
    let reader = CsvItemReaderBuilder::<SaleRecord>::new()
        .has_headers(true)
        .from_path("sales_input.csv")?;

    let writer = CsvItemWriterBuilder::new()
        .has_headers(true)
        .from_path("sales_processed.csv")?;

    let processor = SalesProcessor { tax_rate: 0.08 }; // 8% tax

    let step = StepBuilder::new("process-sales")
        .chunk(500)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let job = JobBuilder::new().start(&step).build();

    println!("🚀 Starting sales processing job...");
    job.run()?;
    println!("✅ Sales processing completed!");

    Ok(())
}