Advanced Patterns
This section covers more complex, realistic use cases you are likely to encounter when building batch applications: filtering items in a processor, chaining processors and writers, and combining multiple steps into a complete data processing pipeline.
Item Filtering with a Processor
A processor can silently discard an item by returning Ok(None). The discarded item is not passed to the writer; instead it is counted in StepExecution::filter_count, which is tracked separately from errors and skips.
Three-State Return
```text
Ok(Some(item))   → item passed to writer   → process_count++
Ok(None)         → item discarded silently → filter_count++
Err(BatchError)  → processing failure      → process_error_count++ (skip_limit applies)
```
Filtering vs Error Handling
| Situation | Use |
|---|---|
| Business rule: discard minors, inactive records, test data | Ok(None) |
| Unexpected invalid data that should be reported | Err(BatchError) + skip_limit |
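The three outcomes and their counters can be modelled without the library. The following is a minimal, self-contained sketch that assumes hypothetical stand-ins (BatchError, ItemProcessorResult, Counts, run_processor) mirroring only the shapes described above. It is not spring_batch_rs's implementation, and it does not model skip_limit.

```rust
// Hypothetical stand-ins that mirror the shapes described above;
// these are NOT the spring_batch_rs types.
#[derive(Debug)]
struct BatchError(String);

type ItemProcessorResult<O> = Result<Option<O>, BatchError>;

#[derive(Debug, Default, PartialEq)]
struct Counts {
    process_count: usize,
    filter_count: usize,
    process_error_count: usize,
}

/// Applies `process` to every item and tallies the three outcomes.
/// Returns the items that would reach the writer, plus the counters.
/// skip_limit is not modelled: every error is simply counted.
fn run_processor<I, O>(
    items: Vec<I>,
    process: impl Fn(I) -> ItemProcessorResult<O>,
) -> (Vec<O>, Counts) {
    let mut counts = Counts::default();
    let mut to_write = Vec::new();
    for item in items {
        match process(item) {
            Ok(Some(out)) => {
                counts.process_count += 1;
                to_write.push(out);
            }
            Ok(None) => counts.filter_count += 1,
            Err(e) => {
                eprintln!("processing error: {}", e.0);
                counts.process_error_count += 1;
            }
        }
    }
    (to_write, counts)
}

fn main() {
    // Minors are a business-rule filter; an impossible age is an error.
    let ages = vec![30u32, 16, 200, 42];
    let (kept, counts) = run_processor(ages, |age: u32| {
        if age > 150 {
            Err(BatchError(format!("invalid age {age}")))
        } else if age >= 18 {
            Ok(Some(age))
        } else {
            Ok(None)
        }
    });
    println!("kept: {kept:?}, counts: {counts:?}");
}
```

Returning Err for minors instead would inflate process_error_count and count against skip_limit, which is why the table reserves Err for data that is genuinely unexpected.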
Example: Keep Only Adults
```rust
use spring_batch_rs::{
    core::{
        item::{ItemProcessor, ItemProcessorResult},
        job::{Job, JobBuilder},
        step::StepBuilder,
    },
    item::{csv::CsvItemReaderBuilder, json::JsonItemWriterBuilder},
    BatchError,
};
use serde::{Deserialize, Serialize};
use std::env::temp_dir;

#[derive(Debug, Deserialize, Serialize, Clone)]
struct Person {
    name: String,
    age: u32,
}

#[derive(Default)]
struct AdultFilter;

impl ItemProcessor<Person, Person> for AdultFilter {
    fn process(&self, item: Person) -> ItemProcessorResult<Person> {
        if item.age >= 18 {
            Ok(Some(item)) // keep
        } else {
            Ok(None) // discard: increments filter_count, not an error
        }
    }
}

fn main() -> Result<(), BatchError> {
    let csv_data = "name,age\nAlice,30\nBob,16\nCharlie,25\nDiana,15\nEve,42\nFrank,17\n";

    let reader = CsvItemReaderBuilder::<Person>::new()
        .has_headers(true)
        .from_reader(csv_data.as_bytes());

    let writer = JsonItemWriterBuilder::<Person>::new()
        .from_path(temp_dir().join("adults.json"));

    // Bind the processor to a variable so the borrow outlives the step.
    let processor = AdultFilter::default();

    let step = StepBuilder::new("filter-adults")
        .chunk::<Person, Person>(10)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run()?;

    let exec = job.get_step_execution("filter-adults").unwrap();
    println!("Read: {}", exec.read_count); // 6
    println!("Filtered: {}", exec.filter_count); // 3 (minors discarded)
    println!("Written: {}", exec.write_count); // 3 (adults kept)

    Ok(())
}
```

```shell
cargo run --example filter_records_from_csv_with_processor --features csv,json
```

Chaining Item Processors
CompositeItemProcessorBuilder lets you chain multiple processors into a single pipeline.
The output of each processor becomes the input of the next. If any processor returns
Ok(None), the chain stops immediately and the item is filtered; subsequent processors
are never called.
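The chaining rule (each output feeds the next input, and Ok(None) or Err stops the chain) can be expressed as plain function composition. This is a library-free sketch that assumes a hypothetical ProcessResult alias and chain helper; it illustrates the same semantics and is not how CompositeItemProcessorBuilder is implemented internally.

```rust
/// Hypothetical stand-in for a processor's result:
/// Ok(Some) = keep, Ok(None) = filtered, Err = processing failure.
type ProcessResult<O> = Result<Option<O>, String>;

/// Composes two processors so the output of `first` feeds `second`.
/// The intermediate type `M` may differ from both `I` and `O`.
fn chain<I, M, O>(
    first: impl Fn(I) -> ProcessResult<M>,
    second: impl Fn(M) -> ProcessResult<O>,
) -> impl Fn(I) -> ProcessResult<O> {
    move |item: I| -> ProcessResult<O> {
        // `?` stops the chain on Err and returns the error unchanged.
        match first(item)? {
            Some(mid) => second(mid),
            // Filtered: `second` is never called.
            None => Ok(None),
        }
    }
}

fn main() {
    // String -> u32: unparseable input is filtered.
    let parse = |raw: String| -> ProcessResult<u32> { Ok(raw.trim().parse::<u32>().ok()) };
    // u32 -> u32: drop amounts below 10.
    let validate = |n: u32| -> ProcessResult<u32> { Ok(if n >= 10 { Some(n) } else { None }) };
    // u32 -> String: change type for the writer.
    let label = |n: u32| -> ProcessResult<String> { Ok(Some(format!("ORDER-{n}"))) };

    let pipeline = chain(chain(parse, validate), label);
    for raw in ["25", "abc", "5"] {
        println!("{raw} -> {:?}", pipeline(raw.to_string()));
    }
}
```

Nesting chain calls gives the same left-to-right order as repeated .link(...) calls, and the type checker enforces that each stage's output type matches the next stage's input type.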
How It Works
```text
item → p1 → intermediate₁ → p2 → intermediate₂ → ... → pN → output
        ↓
   Ok(None)? stop, item filtered
```
Chaining Processors with Different Types
```rust
use serde::{Deserialize, Serialize};
use spring_batch_rs::core::{
    item::{CompositeItemProcessorBuilder, ItemProcessor, ItemProcessorResult},
    step::StepBuilder,
};

#[derive(Deserialize)]
struct RawOrder { id: String, customer: String, amount: String }

#[derive(Clone)]
struct ParsedOrder { id: u32, customer: String, amount: f64 }

#[derive(Serialize)]
struct EnrichedOrder { id: u32, customer: String, amount: f64, tax: f64, total: f64 }

// Step 1: parse raw strings into typed values
struct ParseProcessor;

impl ItemProcessor<RawOrder, ParsedOrder> for ParseProcessor {
    fn process(&self, item: RawOrder) -> ItemProcessorResult<ParsedOrder> {
        let id = item.id.trim().parse().ok();
        let amount = item.amount.trim().parse().ok();
        match (id, amount) {
            (Some(id), Some(amount)) => Ok(Some(ParsedOrder {
                id,
                amount,
                customer: item.customer.clone(),
            })),
            _ => Ok(None), // unparseable: filter silently
        }
    }
}

// Step 2: filter orders below the threshold
struct ValidateProcessor { min_amount: f64 }

impl ItemProcessor<ParsedOrder, ParsedOrder> for ValidateProcessor {
    fn process(&self, item: ParsedOrder) -> ItemProcessorResult<ParsedOrder> {
        if item.amount < self.min_amount { Ok(None) } else { Ok(Some(item)) }
    }
}

// Step 3: add 20% tax (rounded to cents) and the total
struct EnrichProcessor;

impl ItemProcessor<ParsedOrder, EnrichedOrder> for EnrichProcessor {
    fn process(&self, item: ParsedOrder) -> ItemProcessorResult<EnrichedOrder> {
        let tax = (item.amount * 0.20 * 100.0).round() / 100.0;
        Ok(Some(EnrichedOrder {
            id: item.id,
            customer: item.customer.clone(),
            amount: item.amount,
            tax,
            total: item.amount + tax,
        }))
    }
}

// Inside main(), with `reader` (RawOrder items) and `writer` (EnrichedOrder items) already built.
// Wire together: RawOrder → ParsedOrder → ParsedOrder → EnrichedOrder
let composite = CompositeItemProcessorBuilder::new(ParseProcessor)
    .link(ValidateProcessor { min_amount: 10.0 })
    .link(EnrichProcessor)
    .build();

// Use the composite with a step builder
let step = StepBuilder::new("enrich-orders")
    .chunk::<RawOrder, EnrichedOrder>(100)
    .reader(&reader)
    .processor(&composite)
    .writer(&writer)
    .build();
```

```shell
cargo run --example chaining_processors --features csv,json
```

Filter Propagation in a Chain
| Scenario | Result |
|---|---|
| Processor N returns Ok(Some(item)) | Item forwarded to processor N+1 |
| Processor N returns Ok(None) | Chain stops; filter_count++, no write |
| Processor N returns Err(e) | Chain stops; process_error_count++, skip logic applies |
Chaining Item Writers (Fan-out)
CompositeItemWriterBuilder lets you send the same chunk of items to multiple writers.
Writers are called sequentially, in the order they were added; if any writer fails, the chain
short-circuits and returns the error.
How It Works
```text
chunk → w1 → w2 → ... → wN   (all receive the same slice)
         ↓
    Err? stop, error propagated
```
Fan-out to Logger and JSON File
```rust
use serde::{Deserialize, Serialize};
use spring_batch_rs::core::{
    item::{CompositeItemWriterBuilder, PassThroughProcessor},
    step::StepBuilder,
};
use spring_batch_rs::item::{
    json::json_writer::JsonItemWriterBuilder,
    logger::LoggerWriterBuilder,
};
use std::env::temp_dir;

#[derive(Debug, Deserialize, Serialize, Clone)]
struct Product { id: u32, name: String, price: f64 }

// Inside main(), with a `reader` of Product items already built.
let json_writer = JsonItemWriterBuilder::<Product>::new()
    .from_path(temp_dir().join("products.json"));
let logger_writer = LoggerWriterBuilder::<Product>::new().build();

// Both writers receive identical item slices on every chunk.
let composite = CompositeItemWriterBuilder::new(logger_writer)
    .add(json_writer)
    .build();

// Bind the processor to a variable so the borrow outlives the step.
let processor = PassThroughProcessor::<Product>::new();

let step = StepBuilder::new("fan-out-products")
    .chunk::<Product, Product>(10)
    .reader(&reader)
    .processor(&processor)
    .writer(&composite)
    .build();
```

```shell
cargo run --example chaining_writers --features csv,json,logger
```

Error Behaviour
| Scenario | Result |
|---|---|
| All writers succeed | Ok(()) |
| Writer N returns Err(e) | Chain stops; the error is propagated and the writers after N are not called |
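The ordered, short-circuiting fan-out in the table above can be sketched in plain Rust. The Writer trait and the Composite and Recording types below are hypothetical and exist only for illustration; CompositeItemWriterBuilder's real types and signatures may differ.

```rust
use std::cell::RefCell;
use std::fmt::Debug;
use std::rc::Rc;

/// Hypothetical minimal writer trait (not the spring_batch_rs API).
trait Writer<T> {
    fn write(&self, items: &[T]) -> Result<(), String>;
}

/// Sends the same slice to every writer, in insertion order,
/// and stops at the first error.
struct Composite<T> {
    writers: Vec<Box<dyn Writer<T>>>,
}

impl<T> Writer<T> for Composite<T> {
    fn write(&self, items: &[T]) -> Result<(), String> {
        for writer in &self.writers {
            // `?` short-circuits: writers after a failing one are not called.
            writer.write(items)?;
        }
        Ok(())
    }
}

/// Test double: logs every call into a shared log and optionally fails.
struct Recording {
    name: &'static str,
    fail: bool,
    log: Rc<RefCell<Vec<String>>>,
}

impl<T: Debug> Writer<T> for Recording {
    fn write(&self, items: &[T]) -> Result<(), String> {
        self.log.borrow_mut().push(format!("{}:{:?}", self.name, items));
        if self.fail {
            Err(format!("{} failed", self.name))
        } else {
            Ok(())
        }
    }
}

fn main() {
    let log = Rc::new(RefCell::new(Vec::new()));
    let make = |name: &'static str, fail: bool| -> Box<dyn Writer<u32>> {
        Box::new(Recording { name, fail, log: Rc::clone(&log) })
    };

    // w2 fails, so w3 never sees the chunk.
    let composite = Composite { writers: vec![make("w1", false), make("w2", true), make("w3", false)] };
    let result = composite.write(&[1, 2]);
    println!("result: {result:?}, calls: {:?}", log.borrow());
}
```

Because writers run in order rather than concurrently, a writer placed earlier in the chain (here w1) may already have written the chunk when a later one fails, so the order in which writers are added matters.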
Multi-Step ETL Job
This example demonstrates a complete Extract, Transform, Load (ETL) job with two distinct steps. This is a common pattern for processing data from one source, transforming it, and loading it into another destination.
The job performs the following actions:

- Step 1: Extract and Filter
  - Reads records from a source CSV file.
  - Filters out any records belonging to the “Test” category.
  - Writes the resulting records to an intermediate JSON file.
- Step 2: Transform and Load
  - Reads records from the intermediate JSON file.
  - Transforms the data into the final OutputRecord format.
  - “Loads” the data by logging it to the console.

This shows how to combine different readers, writers, and processors across multiple sequential steps within a single Job.
```rust
use anyhow::Result;
use serde::{Deserialize, Serialize};
use spring_batch_rs::{
    core::{
        item::{ItemProcessor, ItemProcessorResult},
        job::{Job, JobBuilder},
        step::StepBuilder,
    },
    item::{
        csv::csv_reader::CsvItemReaderBuilder,
        json::json_reader::JsonItemReaderBuilder,
        json::json_writer::JsonItemWriterBuilder,
        logger::LoggerItemWriter,
    },
};
use std::env::temp_dir;
use std::fs::File;
use std::io::Write;

// --- Data Structures ---

// Input record from CSV.
#[derive(Deserialize, Clone, Debug)]
struct InputRecord { id: u32, name: String, category: String, value: f64 }

// Intermediate record, stored in JSON. `Serialize` is needed for the writer.
#[derive(Deserialize, Serialize, Clone, Debug)]
struct IntermediateRecord { id: u32, name: String, value: f64 }

// Final record, after transformation.
#[derive(Serialize, Clone, Debug)]
struct OutputRecord { record_id: String, description: String, processed_value: f64 }

// --- Step 1: CSV to JSON with Filtering ---

// A custom processor that filters out records from the "Test" category.
struct FilterProcessor;

impl ItemProcessor<InputRecord, IntermediateRecord> for FilterProcessor {
    fn process(&self, item: InputRecord) -> ItemProcessorResult<IntermediateRecord> {
        if item.category == "Test" {
            println!("Filtering out test record: {}", item.name);
            return Ok(None); // Returning None filters the item.
        }
        Ok(Some(IntermediateRecord { id: item.id, name: item.name.clone(), value: item.value }))
    }
}

// --- Step 2: JSON to Console Log with Transformation ---

// A custom processor that transforms an IntermediateRecord into an OutputRecord.
struct TransformProcessor;

impl ItemProcessor<IntermediateRecord, OutputRecord> for TransformProcessor {
    fn process(&self, item: IntermediateRecord) -> ItemProcessorResult<OutputRecord> {
        Ok(Some(OutputRecord {
            record_id: format!("REC-{}", item.id),
            description: format!("{} (Value: {})", item.name, item.value),
            processed_value: item.value * 1.1, // Apply a 10% markup.
        }))
    }
}

fn main() -> Result<()> {
    println!("--- Starting Multi-Step ETL Job Example ---");

    // --- Create a dummy CSV file for input ---
    let csv_path = temp_dir().join("input_data.csv");
    let mut file = File::create(&csv_path)?;
    file.write_all(b"id,name,category,value\n")?;
    file.write_all(b"1,Product A,Live,100.0\n")?;
    file.write_all(b"2,Product B,Test,200.0\n")?;
    file.write_all(b"3,Product C,Live,300.0\n")?;
    println!("Created dummy input file: {:?}", csv_path);

    // Define the path for the intermediate JSON file.
    let intermediate_json_path = temp_dir().join("intermediate_data.json");

    // --- 1. Configure and build Step 1 ---
    println!("\n--- Configuring Step 1: CSV to JSON ---");
    let csv_reader = CsvItemReaderBuilder::<InputRecord>::new()
        .has_headers(true)
        .from_path(&csv_path);
    let json_writer = JsonItemWriterBuilder::<IntermediateRecord>::new()
        .from_path(&intermediate_json_path);
    let filter_processor = FilterProcessor;

    let step1 = StepBuilder::new("csv_to_json_filter_step")
        .chunk::<InputRecord, IntermediateRecord>(10)
        .reader(&csv_reader)
        .processor(&filter_processor)
        .writer(&json_writer)
        .build();

    // --- 2. Configure and build Step 2 ---
    println!("\n--- Configuring Step 2: JSON to Console ---");
    let json_reader = JsonItemReaderBuilder::<IntermediateRecord>::new()
        .from_path(&intermediate_json_path);
    let transform_processor = TransformProcessor;
    // This writer logs the final records to the console.
    let log_writer = LoggerItemWriter::new();

    let step2 = StepBuilder::new("json_to_log_transform_step")
        .chunk::<IntermediateRecord, OutputRecord>(10)
        .reader(&json_reader)
        .processor(&transform_processor)
        .writer(&log_writer)
        .build();

    // --- 3. Configure and run the Job ---
    println!("\n--- Configuring and Running the Job ---");
    let job = JobBuilder::new()
        .start(&step1) // Start with step 1
        .next(&step2) // Proceed to step 2 after step 1 succeeds
        .build();

    let job_execution = job.run();

    // --- 4. Verify results ---
    assert!(job_execution.is_ok());
    let step1_execution = job.get_step_execution("csv_to_json_filter_step").unwrap();
    assert_eq!(step1_execution.read_count, 3);
    assert_eq!(step1_execution.write_count, 2); // 1 record was filtered.

    let step2_execution = job.get_step_execution("json_to_log_transform_step").unwrap();
    assert_eq!(step2_execution.read_count, 2);
    assert_eq!(step2_execution.write_count, 2);

    println!("\n--- Multi-Step ETL Job Finished Successfully! ---");
    println!("Intermediate file is at: {:?}", intermediate_json_path);

    Ok(())
}
```
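The sequencing semantics of start(...).next(...) can be illustrated without the library. This sketch assumes a hypothetical Step struct holding a closure and a hypothetical run_job helper; a shared Vec stands in for the intermediate JSON file. It mirrors only the behaviour described above (steps run in order, and the next step runs only after the previous one succeeds) and is not spring_batch_rs's job implementation.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical step: a name plus the work to do, returning the number of items written.
struct Step {
    name: &'static str,
    run: Box<dyn Fn() -> Result<usize, String>>,
}

/// Runs steps in order and stops at the first failure, so a later step
/// never runs after an earlier one fails. Returns the completed step names.
fn run_job(steps: &[Step]) -> Result<Vec<&'static str>, String> {
    let mut completed = Vec::new();
    for step in steps {
        let written = (step.run)().map_err(|e| format!("step '{}' failed: {e}", step.name))?;
        println!("step '{}' wrote {written} items", step.name);
        completed.push(step.name);
    }
    Ok(completed)
}

fn main() {
    // A shared Vec stands in for the intermediate JSON file.
    let intermediate: Rc<RefCell<Vec<(u32, &'static str)>>> = Rc::new(RefCell::new(Vec::new()));
    let input: Vec<(u32, &'static str)> = vec![(1, "Live"), (2, "Test"), (3, "Live")];

    // Step 1: drop "Test" records and store the rest.
    let store = Rc::clone(&intermediate);
    let step1 = Step {
        name: "csv_to_json_filter_step",
        run: Box::new(move || {
            let kept: Vec<_> = input.iter().copied().filter(|(_, c)| *c != "Test").collect();
            let n = kept.len();
            *store.borrow_mut() = kept;
            Ok(n)
        }),
    };

    // Step 2: read the stored records and "load" them by printing.
    let store = Rc::clone(&intermediate);
    let step2 = Step {
        name: "json_to_log_transform_step",
        run: Box::new(move || {
            let records = store.borrow();
            for (id, _) in records.iter() {
                println!("REC-{id}");
            }
            Ok(records.len())
        }),
    };

    match run_job(&[step1, step2]) {
        Ok(done) => println!("completed: {done:?}"),
        Err(e) => eprintln!("job failed: {e}"),
    }
}
```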