Chunk Size
Use larger chunks (100-500) for JSON to reduce I/O overhead
This page provides comprehensive examples for working with JSON files using Spring Batch RS.
Add the JSON feature to your Cargo.toml:
[dependencies]
spring-batch-rs = { version = "0.1", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }

use spring_batch_rs::{
    core::{item::PassThroughProcessor, step::StepBuilder},
    item::json::JsonItemReaderBuilder,
    item::logger::LoggerWriter,
};
use serde::{Deserialize, Serialize};
/// A user record as it appears in the JSON input/output files.
#[derive(Debug, Deserialize, Serialize, Clone)]
struct User {
    id: u32,
    name: String,
    email: String,
    active: bool,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Stream users out of a JSON array file.
    let reader = JsonItemReaderBuilder::<User>::new()
        .from_path("users.json")?;

    // Log each item instead of persisting it — handy for smoke tests.
    // NOTE(review): the import above brings in `LoggerWriter`; confirm the
    // builder type name matches the crate version in use.
    let writer = LoggerWriterBuilder::<User>::new().build();

    // No transformation: items pass through unchanged.
    let processor = PassThroughProcessor::<User>::new();

    let step = StepBuilder::new("read-json")
        .chunk::<User, User>(10)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    // Run the step directly, without wrapping it in a job.
    let mut execution = spring_batch_rs::core::step::StepExecution::new("read-json");
    step.execute(&mut execution)?;

    Ok(())
}

Input file (users.json):
[
  {"id": 1, "name": "Alice", "email": "alice@example.com", "active": true},
  {"id": 2, "name": "Bob", "email": "bob@example.com", "active": false},
  {"id": 3, "name": "Charlie", "email": "charlie@example.com", "active": true}
]

// FIX: `User` declares `active: bool` with no serde default, so the inline
// sample data must include the "active" field — otherwise deserialization
// of every item fails at read time.
let json_data = r#"[
    {"id": 1, "name": "Alice", "email": "alice@example.com", "active": true},
    {"id": 2, "name": "Bob", "email": "bob@example.com", "active": false}
]"#;

// Read directly from any `impl Read` — here, an in-memory byte slice.
let reader = JsonItemReaderBuilder::<User>::new()
    .from_reader(json_data.as_bytes());

use spring_batch_rs::item::json::JsonItemWriterBuilder;
// Pretty output: indented, human-readable JSON.
let writer = JsonItemWriterBuilder::<User>::new()
    .pretty_formatter(true) // Human-readable with indentation
    .from_path("output.json")?;

Output:

[
  {
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com",
    "active": true
  },
  {
    "id": 2,
    "name": "Bob",
    "email": "bob@example.com",
    "active": false
  }
]

// Compact output: the whole array on a single line.
let writer = JsonItemWriterBuilder::<User>::new()
    .pretty_formatter(false) // Minified, one line
    .from_path("output.json")?;

Output:

[{"id":1,"name":"Alice","email":"alice@example.com","active":true},{"id":2,"name":"Bob","email":"bob@example.com","active":false}]

Convert CSV data to JSON format:
use spring_batch_rs::{
    core::{item::PassThroughProcessor, job::JobBuilder, step::StepBuilder},
    item::{
        csv::CsvItemReaderBuilder,
        json::JsonItemWriterBuilder,
    },
};
use serde::{Deserialize, Serialize};

/// One catalog entry read from CSV and written back out as JSON.
#[derive(Debug, Clone, Deserialize, Serialize)]
struct Product {
    id: u32,
    name: String,
    price: f64,
    category: String,
    in_stock: bool,
}
fn main() -> Result<(), Box<dyn std::error::Error>> { let reader = CsvItemReaderBuilder::<Product>::new() .has_headers(true) .from_path("products.csv")?;
let writer = JsonItemWriterBuilder::<User>::new() .pretty_formatter(true) .from_path("products.json")?;
let processor = PassThroughProcessor::<Product>::new();
let step = StepBuilder::new("csv-to-json") .chunk::<Product, Product>(100) .reader(&reader) .processor(&processor) .writer(&writer) .build();
let job = JobBuilder::new().start(&step).build(); job.run()?;
Ok(())}use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};use spring_batch_rs::error::BatchError;
/// Incoming record shape, including the `active` flag used for filtering.
#[derive(Deserialize, Clone)]
struct InputUser {
    id: u32,
    name: String,
    email: String,
    active: bool,
}

/// Outgoing record shape: only active users, `active` flag dropped.
#[derive(Serialize)]
struct ActiveUser {
    id: u32,
    name: String,
    email: String,
}

/// Rejects inactive users by returning an error from `process`;
/// the step's skip limit turns those errors into skipped items.
struct ActiveUserFilter;

impl ItemProcessor<InputUser, ActiveUser> for ActiveUserFilter {
    fn process(&self, item: &InputUser) -> ItemProcessorResult<ActiveUser> {
        if !item.active {
            return Err(BatchError::ItemProcessor(
                format!("User {} is not active", item.id)
            ));
        }

        Ok(ActiveUser {
            id: item.id,
            name: item.name.clone(),
            email: item.email.clone(),
        })
    }
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let reader = JsonItemReaderBuilder::<InputUser>::new()
        .from_path("all_users.json")?;

    let processor = ActiveUserFilter;

    // FIX: the writer's item type must match the step's output type
    // (`ActiveUser`); the original used `User`, which does not type-check.
    let writer = JsonItemWriterBuilder::<ActiveUser>::new()
        .pretty_formatter(true)
        .from_path("active_users.json")?;

    let step = StepBuilder::new("filter-users")
        .chunk::<InputUser, ActiveUser>(50)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        // Inactive users are rejected by the processor and skipped;
        // more than 100 rejections fails the step.
        .skip_limit(100)
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run()?;

    Ok(())
}

use spring_batch_rs::core::item::PassThroughProcessor;
/// Nested postal address embedded in each customer record.
#[derive(Debug, Deserialize, Serialize, Clone)]
struct Address {
    street: String,
    city: String,
    state: String,
    zip: String,
}

/// Customer with a nested `Address` object and a list of order ids,
/// demonstrating that nested JSON structures deserialize transparently.
#[derive(Debug, Deserialize, Serialize, Clone)]
struct Customer {
    id: u32,
    name: String,
    email: String,
    address: Address,
    orders: Vec<u32>,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let reader = JsonItemReaderBuilder::<Customer>::new()
        .from_path("customers.json")?;

    // FIX: the writer's item type must match the step's output type
    // (`Customer`); the original used `User`, which does not type-check.
    let writer = JsonItemWriterBuilder::<Customer>::new()
        .pretty_formatter(true)
        .from_path("processed_customers.json")?;

    let processor = PassThroughProcessor::<Customer>::new();

    let step = StepBuilder::new("process-customers")
        .chunk::<Customer, Customer>(20)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    // FIX: the original built the step but never ran it; execute it like
    // the other examples so the pipeline actually does something.
    let mut execution = spring_batch_rs::core::step::StepExecution::new("process-customers");
    step.execute(&mut execution)?;

    Ok(())
}

Input:
[ { "id": 1, "name": "John Doe", "email": "john@example.com", "address": { "street": "123 Main St", "city": "Springfield", "state": "IL", "zip": "62701" }, "orders": [1001, 1002, 1003] }]Enrich JSON data with additional information:
use std::collections::HashMap;

/// Raw order line as read from JSON.
#[derive(Deserialize, Clone)]
struct OrderInput {
    order_id: u32,
    product_id: u32,
    quantity: u32,
}

/// Order enriched with catalog data (name, price) and a computed total.
#[derive(Serialize)]
struct EnrichedOrder {
    order_id: u32,
    product_id: u32,
    product_name: String,
    quantity: u32,
    unit_price: f64,
    total: f64,
}

/// Looks up each order's product in an in-memory catalog and attaches
/// pricing information; unknown product ids become processor errors.
struct OrderEnricher {
    product_catalog: HashMap<u32, (String, f64)>,
}

impl ItemProcessor<OrderInput, EnrichedOrder> for OrderEnricher {
    fn process(&self, item: &OrderInput) -> ItemProcessorResult<EnrichedOrder> {
        // Unknown products are surfaced as errors rather than silently dropped.
        let (product_name, unit_price) = self.product_catalog
            .get(&item.product_id)
            .cloned()
            .ok_or_else(|| BatchError::ItemProcessor(
                format!("Unknown product: {}", item.product_id)
            ))?;

        let total = unit_price * item.quantity as f64;

        Ok(EnrichedOrder {
            order_id: item.order_id,
            product_id: item.product_id,
            product_name,
            quantity: item.quantity,
            unit_price,
            total,
        })
    }
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build product catalog
    let mut catalog = HashMap::new();
    catalog.insert(101, ("Laptop".to_string(), 999.99));
    catalog.insert(102, ("Mouse".to_string(), 29.99));
    catalog.insert(103, ("Keyboard".to_string(), 79.99));

    let processor = OrderEnricher { product_catalog: catalog };

    let reader = JsonItemReaderBuilder::<OrderInput>::new()
        .from_path("orders.json")?;

    // FIX: the writer's item type must match the step's output type
    // (`EnrichedOrder`); the original used `User`, which does not type-check.
    let writer = JsonItemWriterBuilder::<EnrichedOrder>::new()
        .pretty_formatter(true)
        .from_path("enriched_orders.json")?;

    let step = StepBuilder::new("enrich-orders")
        .chunk::<OrderInput, EnrichedOrder>(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run()?;

    Ok(())
}

Validate and clean JSON data:
/// Unvalidated input: every field arrives as a string.
#[derive(Deserialize, Clone)]
struct RawUser {
    id: String,
    name: String,
    email: String,
    age: String,
}

/// Output after parsing and validation: numeric fields are real numbers.
#[derive(Serialize)]
struct ValidatedUser {
    id: u32,
    name: String,
    email: String,
    age: u32,
}
/// Parses and validates raw string fields into a `ValidatedUser`;
/// any failed check becomes an `ItemProcessor` error (skippable by the step).
struct UserValidator;

impl ItemProcessor<RawUser, ValidatedUser> for UserValidator {
    fn process(&self, item: &RawUser) -> ItemProcessorResult<ValidatedUser> {
        // Validate and parse ID
        let id = item.id.parse::<u32>()
            .map_err(|_| BatchError::ItemProcessor(
                format!("Invalid ID: {}", item.id)
            ))?;

        // Cheap structural email check ('@' and '.' present);
        // not a full RFC 5322 validation.
        if !item.email.contains('@') || !item.email.contains('.') {
            return Err(BatchError::ItemProcessor(
                format!("Invalid email: {}", item.email)
            ));
        }

        // Validate and parse age
        let age = item.age.parse::<u32>()
            .map_err(|_| BatchError::ItemProcessor(
                format!("Invalid age: {}", item.age)
            ))?;

        // Reject ages outside the accepted 18..=120 range.
        if !(18..=120).contains(&age) {
            return Err(BatchError::ItemProcessor(
                format!("Age out of range: {}", age)
            ));
        }

        // Clean name: trim surrounding whitespace.
        // FIX: the original comment claimed title-casing too, but the code
        // only trims — the comment now matches the behavior.
        let name = item.name.trim().to_string();

        Ok(ValidatedUser {
            id,
            name,
            email: item.email.trim().to_lowercase(),
            age,
        })
    }
}
fn main() -> Result<(), Box<dyn std::error::Error>> { let reader = JsonItemReaderBuilder::<RawUser>::new() .from_path("raw_users.json")?;
let processor = UserValidator;
let writer = JsonItemWriterBuilder::<User>::new() .pretty_formatter(true) .from_path("validated_users.json")?;
let step = StepBuilder::new("validate-users") .chunk::<RawUser, ValidatedUser>(50) .reader(&reader) .processor(&processor) .writer(&writer) .skip_limit(10) // Skip up to 10 invalid records .build();
let job = JobBuilder::new().start(&step).build(); job.run()?;
Ok(())}Create test data using the fake data generator:
use spring_batch_rs::{
    item::fake::person_reader::{PersonReaderBuilder, Person},
    core::item::PassThroughProcessor,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Generate 1000 synthetic person records.
    let reader = PersonReaderBuilder::new()
        .number_of_items(1000)
        .build();

    // FIX: the writer's item type must match the step's output type
    // (`Person`); the original used `User`, which does not type-check.
    let writer = JsonItemWriterBuilder::<Person>::new()
        .pretty_formatter(true)
        .from_path("fake_persons.json")?;

    let processor = PassThroughProcessor::<Person>::new();

    let step = StepBuilder::new("generate-fake-data")
        .chunk::<Person, Person>(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run()?;

    println!("Generated 1000 fake persons in fake_persons.json");
    Ok(())
}

Process API responses and transform to internal format:
/// External API payload: JSON keys are camelCase (e.g. "userId"),
/// mapped onto snake_case Rust fields via `rename_all`.
#[derive(Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
struct ApiResponse {
    user_id: u32,
    full_name: String,
    email_address: String,
    is_active: bool,
    created_at: String,
}

/// Internal representation with shorter field names.
/// `rename_all = "snake_case"` is a no-op here (fields are already
/// snake_case); it is kept to make the output convention explicit.
#[derive(Serialize)]
#[serde(rename_all = "snake_case")]
struct InternalUser {
    id: u32,
    name: String,
    email: String,
    active: bool,
    created: String,
}

/// Maps API field names onto the internal schema, lowercasing emails.
struct ApiTransformer;

impl ItemProcessor<ApiResponse, InternalUser> for ApiTransformer {
    fn process(&self, item: &ApiResponse) -> ItemProcessorResult<InternalUser> {
        Ok(InternalUser {
            id: item.user_id,
            name: item.full_name.clone(),
            email: item.email_address.to_lowercase(),
            active: item.is_active,
            created: item.created_at.clone(),
        })
    }
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let reader = JsonItemReaderBuilder::<ApiResponse>::new()
        .from_path("api_response.json")?;

    let processor = ApiTransformer;

    // FIX: the writer's item type must match the step's output type
    // (`InternalUser`); the original used `User`, which does not type-check.
    let writer = JsonItemWriterBuilder::<InternalUser>::new()
        .pretty_formatter(true)
        .from_path("internal_users.json")?;

    let step = StepBuilder::new("transform-api-data")
        .chunk::<ApiResponse, InternalUser>(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run()?;

    Ok(())
}

use std::sync::Mutex;
/// Accumulates a running price total and item count as items flow through.
/// Both mutexes are locked together for each item, so the two counters
/// stay consistent with one another during an update.
struct SummaryAggregator {
    total: Mutex<f64>,
    count: Mutex<u32>,
}

impl ItemProcessor<Product, Product> for SummaryAggregator {
    fn process(&self, item: &Product) -> ItemProcessorResult<Product> {
        let mut total = self.total.lock().unwrap();
        let mut count = self.count.lock().unwrap();

        *total += item.price;
        *count += 1;

        // Pass the item through unchanged.
        Ok(item.clone())
    }
}

/// Rejects any product priced below `min_price`; rejected items become
/// processor errors (skippable via the step's skip limit).
struct PriceFilter {
    min_price: f64,
}

impl ItemProcessor<Product, Product> for PriceFilter {
    fn process(&self, item: &Product) -> ItemProcessorResult<Product> {
        if item.price < self.min_price {
            return Err(BatchError::ItemProcessor(
                format!("Price {} below minimum {}", item.price, self.min_price)
            ));
        }
        Ok(item.clone())
    }
}

Chunk Size
Use larger chunks (100-500) for JSON to reduce I/O overhead
Pretty Formatting
Disable pretty formatting for production to reduce file size
Memory Usage
Each chunk of items is held in memory while it is processed and written — choose chunk sizes appropriate to your item size
Streaming
JSON reader uses streaming parser for efficient memory usage