
CSV Examples


This page provides comprehensive examples for working with CSV files using Spring Batch RS.
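
All snippets on this page assume the relevant Cargo features are enabled (the cargo run command near the end of this page uses csv and json). A minimal manifest might look like the following sketch; the version numbers are illustrative, not pinned recommendations:

[dependencies]
# Enable the item readers/writers used on this page.
spring-batch-rs = { version = "*", features = ["csv", "json"] } # pin a real version in practice
serde = { version = "1", features = ["derive"] }
log = "0.4" # used by the logger writer example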

Basic CSV Reading

use spring_batch_rs::{
    core::{job::JobBuilder, step::StepBuilder, item::PassThroughProcessor},
    item::csv::CsvItemReaderBuilder,
    BatchError,
};
use serde::Deserialize;

#[derive(Debug, Deserialize, Clone)]
struct Product {
    id: u32,
    name: String,
    price: f64,
    category: String,
}

fn main() -> Result<(), BatchError> {
    // Create the CSV reader
    let reader = CsvItemReaderBuilder::<Product>::new()
        .has_headers(true)  // First row contains headers
        .delimiter(b',')    // Comma delimiter (default)
        .from_path("products.csv")?; // Read from file

    // Use the logger writer to see the output
    let writer = spring_batch_rs::item::logger::LoggerItemWriterBuilder::new()
        .log_level(log::Level::Info)
        .build();

    let processor = PassThroughProcessor::<Product>::new();

    let step = StepBuilder::new("read-csv")
        .chunk(10)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run().map(|_| ())
}

Reading CSV from a String

use spring_batch_rs::item::csv::CsvItemReaderBuilder;

let csv_data = r#"id,name,price,category
1,Laptop,999.99,Electronics
2,Coffee Mug,12.99,Kitchen
3,Notebook,5.99,Office"#;

let reader = CsvItemReaderBuilder::<Product>::new()
    .has_headers(true)
    .from_reader(csv_data.as_bytes());

Writing CSV Files

use spring_batch_rs::item::csv::CsvItemWriterBuilder;
use serde::Serialize;

#[derive(Serialize)]
struct Product {
    id: u32,
    name: String,
    price: f64,
}

let writer = CsvItemWriterBuilder::new()
    .has_headers(true)  // Write a header row
    .delimiter(b',')    // Comma delimiter
    .from_path("output.csv")?; // Write to file

CSV to JSON Conversion

use spring_batch_rs::{
    core::{job::JobBuilder, step::StepBuilder, item::PassThroughProcessor},
    item::{
        csv::CsvItemReaderBuilder,
        json::JsonItemWriterBuilder,
    },
    BatchError,
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Deserialize, Serialize)]
struct Product {
    id: u32,
    name: String,
    price: f64,
    category: String,
}

fn main() -> Result<(), BatchError> {
    let csv_data = r#"id,name,price,category
1,Laptop,999.99,Electronics
2,Coffee Mug,12.99,Kitchen
3,Notebook,5.99,Office
4,Wireless Mouse,29.99,Electronics"#;

    let reader = CsvItemReaderBuilder::<Product>::new()
        .has_headers(true)
        .from_reader(csv_data.as_bytes());

    let writer = JsonItemWriterBuilder::<Product>::new()
        .pretty_formatter(true)
        .from_path("products.json")?;

    let processor = PassThroughProcessor::<Product>::new();

    let step = StepBuilder::new("csv-to-json")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run().map(|_| ())
}

Transforming Records with a Processor

use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};

#[derive(Deserialize, Clone)]
struct RawProduct {
    id: u32,
    name: String,
    price: f64,
    category: String,
}

#[derive(Serialize)]
struct DiscountedProduct {
    id: u32,
    name: String,
    original_price: f64,
    discounted_price: f64,
    discount_percent: u32,
    category: String,
}

struct DiscountProcessor;

impl ItemProcessor<RawProduct, DiscountedProduct> for DiscountProcessor {
    fn process(&self, item: &RawProduct) -> ItemProcessorResult<DiscountedProduct> {
        // Category-based discounts
        let discount_percent = match item.category.as_str() {
            "Electronics" => 15,
            "Kitchen" => 10,
            "Office" => 5,
            _ => 0,
        };
        let discount_multiplier = 1.0 - (discount_percent as f64 / 100.0);
        let discounted_price = item.price * discount_multiplier;
        Ok(Some(DiscountedProduct {
            id: item.id,
            name: item.name.clone(),
            original_price: item.price,
            discounted_price,
            discount_percent,
            category: item.category.clone(),
        }))
    }
}

Custom CSV Options

let reader = CsvItemReaderBuilder::<Product>::new()
    .has_headers(true)
    .delimiter(b';')  // Semicolon delimiter
    .quote(b'"')      // Custom quote character
    .flexible(true)   // Allow a variable number of fields
    .from_path("products.csv")?;

CSV Without Headers

#[derive(Deserialize)]
struct Product {
    // With headers disabled, the csv deserializer maps fields to columns
    // positionally, in declaration order: id, name, price.
    id: u32,
    name: String,
    price: f64,
}

let reader = CsvItemReaderBuilder::<Product>::new()
    .has_headers(false) // No header row
    .from_path("products.csv")?;

Validation and Error Handling

use spring_batch_rs::{
    core::{
        item::{ItemProcessor, ItemProcessorResult},
        job::JobBuilder,
        step::StepBuilder,
    },
    item::{csv::CsvItemReaderBuilder, json::JsonItemWriterBuilder},
    BatchError,
};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Clone)]
struct RawData {
    id: String,
    value: String,
    amount: String,
}

#[derive(Serialize)]
struct ValidatedData {
    id: u32,
    value: f64,
    amount: f64,
}

struct ValidationProcessor;

impl ItemProcessor<RawData, ValidatedData> for ValidationProcessor {
    fn process(&self, item: &RawData) -> ItemProcessorResult<ValidatedData> {
        // Parse and validate
        let id = item.id.parse::<u32>()
            .map_err(|e| BatchError::ItemProcessor(
                format!("Invalid ID '{}': {}", item.id, e)
            ))?;
        let value = item.value.parse::<f64>()
            .map_err(|e| BatchError::ItemProcessor(
                format!("Invalid value '{}': {}", item.value, e)
            ))?;
        let amount = item.amount.parse::<f64>()
            .map_err(|e| BatchError::ItemProcessor(
                format!("Invalid amount '{}': {}", item.amount, e)
            ))?;
        // Business validation
        if value < 0.0 || amount < 0.0 {
            return Err(BatchError::ItemProcessor(
                "Negative values not allowed".to_string()
            ));
        }
        Ok(Some(ValidatedData { id, value, amount }))
    }
}

fn main() -> Result<(), BatchError> {
    let reader = CsvItemReaderBuilder::<RawData>::new()
        .has_headers(true)
        .from_path("raw_data.csv")?;

    let writer = JsonItemWriterBuilder::<ValidatedData>::new()
        .pretty_formatter(true)
        .from_path("validated_data.json")?;

    let processor = ValidationProcessor;

    let step = StepBuilder::new("validate-csv")
        .chunk(50)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(10) // Skip up to 10 invalid records
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run().map(|_| ())
}

Data Enrichment

Read CSV, enrich each record with external lookup data, and write the results back to CSV:

use spring_batch_rs::{
    core::{
        item::{ItemProcessor, ItemProcessorResult},
        job::JobBuilder,
        step::StepBuilder,
    },
    item::csv::{CsvItemReaderBuilder, CsvItemWriterBuilder},
    BatchError,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

#[derive(Deserialize, Clone)]
struct CustomerOrder {
    order_id: u32,
    customer_id: u32,
    product_id: u32,
    quantity: u32,
}

#[derive(Serialize)]
struct EnrichedOrder {
    order_id: u32,
    customer_name: String,
    product_name: String,
    quantity: u32,
    total_price: f64,
}

struct EnrichmentProcessor {
    customers: HashMap<u32, String>,
    products: HashMap<u32, (String, f64)>, // (name, price)
}

impl ItemProcessor<CustomerOrder, EnrichedOrder> for EnrichmentProcessor {
    fn process(&self, item: &CustomerOrder) -> ItemProcessorResult<EnrichedOrder> {
        let customer_name = self.customers
            .get(&item.customer_id)
            .cloned()
            .unwrap_or_else(|| "Unknown".to_string());
        let (product_name, unit_price) = self.products
            .get(&item.product_id)
            .cloned()
            .unwrap_or_else(|| ("Unknown".to_string(), 0.0));
        let total_price = unit_price * item.quantity as f64;
        Ok(Some(EnrichedOrder {
            order_id: item.order_id,
            customer_name,
            product_name,
            quantity: item.quantity,
            total_price,
        }))
    }
}

fn main() -> Result<(), BatchError> {
    // Build lookup tables
    let mut customers = HashMap::new();
    customers.insert(1, "Alice".to_string());
    customers.insert(2, "Bob".to_string());

    let mut products = HashMap::new();
    products.insert(101, ("Laptop".to_string(), 999.99));
    products.insert(102, ("Mouse".to_string(), 29.99));

    let processor = EnrichmentProcessor { customers, products };

    let reader = CsvItemReaderBuilder::<CustomerOrder>::new()
        .has_headers(true)
        .from_path("orders.csv")?;

    let writer = CsvItemWriterBuilder::new()
        .has_headers(true)
        .from_path("enriched_orders.csv")?;

    let step = StepBuilder::new("enrich-orders")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run().map(|_| ())
}

Filtering Items

Return Ok(None) from a processor to silently discard items. Filtered items are counted in StepExecution::filter_count and are not passed to the writer.

use spring_batch_rs::{
    core::{
        item::{ItemProcessor, ItemProcessorResult},
        job::JobBuilder,
        step::StepBuilder,
    },
    item::{csv::CsvItemReaderBuilder, json::JsonItemWriterBuilder},
    BatchError,
};
use serde::{Deserialize, Serialize};
use std::env::temp_dir;

#[derive(Debug, Deserialize, Serialize, Clone)]
struct Person {
    name: String,
    age: u32,
}

/// Keeps only adults (age >= 18). Minors are filtered with `Ok(None)`.
#[derive(Default)]
struct AdultFilter;

impl ItemProcessor<Person, Person> for AdultFilter {
    fn process(&self, item: &Person) -> ItemProcessorResult<Person> {
        if item.age >= 18 {
            Ok(Some(item.clone())) // keep
        } else {
            Ok(None) // discard; counted in filter_count
        }
    }
}

fn main() -> Result<(), BatchError> {
    let csv_data = "name,age\nAlice,30\nBob,16\nCharlie,25\nDiana,15\nEve,42\nFrank,17\n";

    // 1. Build a reader from the inline CSV
    let reader = CsvItemReaderBuilder::<Person>::new()
        .has_headers(true)
        .from_reader(csv_data.as_bytes());

    // 2. Build the JSON writer
    let output_path = temp_dir().join("adults.json");
    let writer = JsonItemWriterBuilder::<Person>::new().from_path(&output_path);

    // 3. Build and run the step with the filter processor
    let processor = AdultFilter::default();
    let step = StepBuilder::new("filter-adults")
        .chunk::<Person, Person>(10)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run()?;

    // 4. Inspect the filter statistics
    let step_exec = job.get_step_execution("filter-adults").unwrap();
    println!("Read: {}", step_exec.read_count);       // 6
    println!("Filtered: {}", step_exec.filter_count); // 3 (minors)
    println!("Written: {}", step_exec.write_count);   // 3 (adults)
    Ok(())
}

Run the standalone example:

cargo run --example filter_records_from_csv_with_processor --features csv,json

Performance Tips

Chunk Size

Use larger chunks (500-1000) for CSV files to reduce I/O overhead; see the sketch below.

Buffering

CSV readers use buffered I/O by default, so you rarely need to add an extra buffering layer.

Memory Usage

For very large CSV files, the chunk size is what bounds memory: only one chunk of deserialized items is held at a time, so choose a size that fits your item size and available memory.
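
As a minimal sketch of chunk-size tuning, using the same builders shown in the examples above (the file names, function name, and chunk value are illustrative, not recommendations from the library):

use spring_batch_rs::{
    core::{job::JobBuilder, step::StepBuilder, item::PassThroughProcessor},
    item::csv::{CsvItemReaderBuilder, CsvItemWriterBuilder},
    BatchError,
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Deserialize, Serialize)]
struct Product {
    id: u32,
    name: String,
    price: f64,
    category: String,
}

fn run_large_file_job() -> Result<(), BatchError> {
    let reader = CsvItemReaderBuilder::<Product>::new()
        .has_headers(true)
        .from_path("large_products.csv")?; // hypothetical input file

    let writer = CsvItemWriterBuilder::new()
        .has_headers(true)
        .from_path("large_products_out.csv")?; // hypothetical output file

    let processor = PassThroughProcessor::<Product>::new();

    // A larger chunk (1000 here) means fewer write/flush cycles, at the cost
    // of holding up to 1000 deserialized items in memory at once.
    let step = StepBuilder::new("large-csv")
        .chunk(1000)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    JobBuilder::new().start(&step).build().run().map(|_| ())
}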

Complete Example: Sales Processing

This example demonstrates a complete sales-processing pipeline:

use spring_batch_rs::{
    core::{
        item::{ItemProcessor, ItemProcessorResult},
        job::JobBuilder,
        step::StepBuilder,
    },
    item::csv::{CsvItemReaderBuilder, CsvItemWriterBuilder},
    BatchError,
};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Clone)]
struct SaleRecord {
    date: String,
    product_id: u32,
    product_name: String,
    quantity: u32,
    unit_price: f64,
    region: String,
}

#[derive(Serialize)]
struct ProcessedSale {
    date: String,
    product_id: u32,
    product_name: String,
    quantity: u32,
    unit_price: f64,
    total_amount: f64,
    tax: f64,
    final_amount: f64,
    region: String,
    sales_tier: String,
}

struct SalesProcessor {
    tax_rate: f64,
}

impl ItemProcessor<SaleRecord, ProcessedSale> for SalesProcessor {
    fn process(&self, item: &SaleRecord) -> ItemProcessorResult<ProcessedSale> {
        let total_amount = item.unit_price * item.quantity as f64;
        let tax = total_amount * self.tax_rate;
        let final_amount = total_amount + tax;
        let sales_tier = if total_amount < 100.0 {
            "Small"
        } else if total_amount < 1000.0 {
            "Medium"
        } else {
            "Large"
        }.to_string();
        Ok(Some(ProcessedSale {
            date: item.date.clone(),
            product_id: item.product_id,
            product_name: item.product_name.clone(),
            quantity: item.quantity,
            unit_price: item.unit_price,
            total_amount,
            tax,
            final_amount,
            region: item.region.clone(),
            sales_tier,
        }))
    }
}

fn main() -> Result<(), BatchError> {
    let reader = CsvItemReaderBuilder::<SaleRecord>::new()
        .has_headers(true)
        .from_path("sales_input.csv")?;

    let writer = CsvItemWriterBuilder::new()
        .has_headers(true)
        .from_path("sales_processed.csv")?;

    let processor = SalesProcessor { tax_rate: 0.08 }; // 8% tax

    let step = StepBuilder::new("process-sales")
        .chunk(500)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let job = JobBuilder::new().start(&step).build();

    println!("🚀 Starting sales processing job...");
    job.run()?;
    println!("✅ Sales processing completed!");
    Ok(())
}