Skip to content

Advanced Patterns

This section covers more complex and realistic use cases that you might encounter when building batch applications. These examples demonstrate how to chain multiple steps together to create a complete data processing pipeline.

A processor can silently discard items by returning Ok(None). The item is not passed to the writer and is counted in StepExecution::filter_count — separate from errors and skips.

Ok(Some(item)) → item passed to writer → process_count++
Ok(None) → item discarded silently → filter_count++
Err(BatchError) → processing failure → process_error_count++ (skip_limit applies)
| Situation | Use |
| --- | --- |
| Business rule: discard minors, inactive records, test data | Ok(None) |
| Unexpected invalid data that should be reported | Err(BatchError) + skip_limit |
use spring_batch_rs::{
core::{
item::{ItemProcessor, ItemProcessorResult},
job::JobBuilder,
step::StepBuilder,
},
item::{csv::CsvItemReaderBuilder, json::JsonItemWriterBuilder},
BatchError,
};
use serde::{Deserialize, Serialize};
use std::env::temp_dir;
/// Record carrying the `name` and `age` columns of the CSV input used by
/// this example; it is also the type written to the JSON output.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Person {
    /// Value of the `name` column.
    name: String,
    /// Value of the `age` column.
    age: u32,
}
/// Processor that keeps people aged 18 or over and filters out everyone else.
#[derive(Default)]
struct AdultFilter;

impl ItemProcessor<Person, Person> for AdultFilter {
    /// Returns `Ok(Some(item))` when `item.age >= 18`, otherwise `Ok(None)`.
    fn process(&self, item: Person) -> ItemProcessorResult<Person> {
        let is_adult = item.age >= 18;
        // `None` discards the item: it increments filter_count and is not an error.
        Ok(is_adult.then_some(item))
    }
}
/// Runs the filtering example end to end.
///
/// Reads six people from an in-memory CSV string, drops the minors with
/// `AdultFilter`, writes the remaining records to `adults.json` in the system
/// temp directory, and prints the step's read / filter / write counters.
///
/// # Errors
///
/// Propagates the `BatchError` returned by `job.run()`.
fn main() -> Result<(), BatchError> {
    let csv_data = "name,age\nAlice,30\nBob,16\nCharlie,25\nDiana,15\nEve,42\nFrank,17\n";

    let reader = CsvItemReaderBuilder::<Person>::new()
        .has_headers(true)
        .from_reader(csv_data.as_bytes());
    let writer = JsonItemWriterBuilder::<Person>::new()
        .from_path(temp_dir().join("adults.json"));

    // Bind the processor to a local so it outlives the step that borrows it.
    // A temporary created inside the builder chain would be dropped at the
    // end of the `let step = ...;` statement while `step` is still in use.
    let processor = AdultFilter::default();

    let step = StepBuilder::new("filter-adults")
        .chunk::<Person, Person>(10)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let job = JobBuilder::new().start(&step).build();
    job.run()?;

    let exec = job
        .get_step_execution("filter-adults")
        .expect("step `filter-adults` was registered on the job and has just run");
    println!("Read: {}", exec.read_count); // 6
    println!("Filtered: {}", exec.filter_count); // 3 (minors discarded)
    println!("Written: {}", exec.write_count); // 3 (adults kept)
    Ok(())
}
Terminal window
cargo run --example filter_records_from_csv_with_processor --features csv,json

CompositeItemProcessorBuilder lets you chain multiple processors into a single pipeline. The output of each processor becomes the input of the next. If any processor returns Ok(None), the chain stops immediately and the item is filtered — subsequent processors are never called.

item → p1 → intermediate₁ → p2 → intermediate₂ → ... → pN → output
↓ Ok(None)?
stop — item filtered
use spring_batch_rs::core::item::{
ItemProcessor, ItemProcessorResult, CompositeItemProcessorBuilder,
};
use spring_batch_rs::BatchError;
/// An order as it arrives from the source: every field is still a string.
struct RawOrder {
    id: String,
    customer: String,
    amount: String,
}

/// An order whose `id` and `amount` are held as numeric types.
#[derive(Clone)]
struct ParsedOrder {
    id: u32,
    customer: String,
    amount: f64,
}

/// A parsed order extended with `tax` and `total` fields.
struct EnrichedOrder {
    id: u32,
    customer: String,
    amount: f64,
    tax: f64,
    total: f64,
}
// Step 1: parse raw strings into typed values
/// Converts a `RawOrder` into a `ParsedOrder`.
struct ParseProcessor;

impl ItemProcessor<RawOrder, ParsedOrder> for ParseProcessor {
    /// Parses the trimmed `id` as `u32` and the trimmed `amount` as `f64`.
    ///
    /// Returns `Ok(Some(ParsedOrder))` when both fields parse, and `Ok(None)`
    /// (item filtered silently) when either of them does not.
    fn process(&self, item: RawOrder) -> ItemProcessorResult<ParsedOrder> {
        let id = item.id.trim().parse::<u32>().ok();
        let amount = item.amount.trim().parse::<f64>().ok();

        // `zip` yields `Some` only when both values parsed; anything else is
        // unparseable and filtered silently.
        Ok(id.zip(amount).map(|(id, amount)| ParsedOrder {
            id,
            amount,
            // `item` is owned here, so the name is moved rather than cloned.
            customer: item.customer,
        }))
    }
}
// Step 2: filter orders below threshold
/// Filters out orders whose amount is below `min_amount`.
struct ValidateProcessor {
    /// Smallest amount an order may have and still be kept.
    min_amount: f64,
}

impl ItemProcessor<ParsedOrder, ParsedOrder> for ValidateProcessor {
    /// Returns `Ok(None)` when `item.amount < self.min_amount`; otherwise
    /// passes the order through unchanged.
    fn process(&self, item: ParsedOrder) -> ItemProcessorResult<ParsedOrder> {
        if item.amount < self.min_amount {
            return Ok(None);
        }
        Ok(Some(item))
    }
}
// Step 3: add tax and total
/// Turns a `ParsedOrder` into an `EnrichedOrder` by adding `tax` and `total`.
struct EnrichProcessor;

impl ItemProcessor<ParsedOrder, EnrichedOrder> for EnrichProcessor {
    /// Computes 20% tax on `item.amount`, rounded to two decimal places, and
    /// sets `total` to `amount + tax`. Always returns `Ok(Some(_))`.
    fn process(&self, item: ParsedOrder) -> ItemProcessorResult<EnrichedOrder> {
        const TAX_RATE: f64 = 0.20;

        // Scale to cents, round, scale back: rounds the tax to 2 decimals.
        let tax = (item.amount * TAX_RATE * 100.0).round() / 100.0;

        Ok(Some(EnrichedOrder {
            id: item.id,
            // `item` is owned here, so the name is moved rather than cloned.
            customer: item.customer,
            amount: item.amount,
            tax,
            total: item.amount + tax,
        }))
    }
}
// Wire together: RawOrder → ParsedOrder → ParsedOrder → EnrichedOrder
let parse_stage = ParseProcessor;
let validate_stage = ValidateProcessor { min_amount: 10.0 };
let enrich_stage = EnrichProcessor;
let composite = CompositeItemProcessorBuilder::new(parse_stage)
    .link(validate_stage)
    .link(enrich_stage)
    .build();

// Use with a step builder: the composite is passed like any single processor.
let step = StepBuilder::new("enrich-orders")
    .chunk::<RawOrder, EnrichedOrder>(100)
    .reader(&reader)
    .processor(&composite)
    .writer(&writer)
    .build();
Terminal window
cargo run --example chaining_processors --features csv,json
| Scenario | Result |
| --- | --- |
| Processor N returns Ok(Some(item)) | Item forwarded to processor N+1 |
| Processor N returns Ok(None) | Chain stops — filter_count++, no write |
| Processor N returns Err(e) | Chain stops — process_error_count++, skip logic applies |

CompositeItemWriterBuilder lets you send the same chunk of items to multiple writers. Writers are called one after another, in the order they were added; if any writer fails, the chain short-circuits and returns the error.

chunk → w1 → w2 → ... → wN (all receive the same slice)
↓ Err?
stop — error propagated
use spring_batch_rs::core::item::CompositeItemWriterBuilder;
use spring_batch_rs::item::{
json::json_writer::JsonItemWriterBuilder,
logger::LoggerWriterBuilder,
};
use serde::{Deserialize, Serialize};
use std::env::temp_dir;
/// Item type that flows unchanged from the reader to both writers in this
/// example.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Product {
    id: u32,
    name: String,
    price: f64,
}
// Writer 1: JSON file in the system temp directory.
let json_writer = JsonItemWriterBuilder::<Product>::new()
    .from_path(temp_dir().join("products.json"));
// Writer 2: logger.
let logger_writer = LoggerWriterBuilder::<Product>::new().build();

// Both writers receive identical item slices on every chunk.
let composite = CompositeItemWriterBuilder::new(logger_writer)
    .add(json_writer)
    .build();

// Bind the processor to a local so it outlives the step that borrows it.
// A temporary created inside the builder chain would be dropped at the end
// of the `let step = ...;` statement.
let processor = PassThroughProcessor::<Product>::new();

let step = StepBuilder::new("fan-out-products")
    .chunk::<Product, Product>(10)
    .reader(&reader)
    .processor(&processor)
    .writer(&composite)
    .build();
Terminal window
cargo run --example chaining_writers --features csv,json,logger
| Scenario | Result |
| --- | --- |
| All writers succeed | Ok(()) |
| Writer N returns Err(e) | Chain stops — error propagated, writers N+1…M not called |

This example demonstrates a complete Extract, Transform, Load (ETL) job with two distinct steps. This is a common pattern for processing data from one source, transforming it, and loading it into another destination.

The job will perform the following actions:

  1. Step 1: Extract and Filter

    • Reads records from a source CSV file.
    • Filters out any records belonging to the “Test” category.
    • Writes the resulting records to an intermediate JSON file.
  2. Step 2: Transform and Load

    • Reads records from the intermediate JSON file.
    • Transforms the data into a final OutputRecord format.
    • “Loads” the data by logging it to the console.

This showcases how to use different readers, writers, and processors across multiple, sequential steps within a single Job.

use anyhow::Result;
use serde::{Deserialize, Serialize};
use spring_batch_rs::{
    core::{
        item::{ItemProcessor, ItemProcessorResult},
        job::{Job, JobBuilder},
        step::StepBuilder,
    },
    item::{
        csv::csv_reader::CsvItemReaderBuilder,
        json::json_reader::JsonItemReaderBuilder,
        json::json_writer::JsonItemWriterBuilder,
        logger::LoggerItemWriter,
    },
    BatchError,
};
use std::env::temp_dir;
use std::fs::File;
use std::io::Write;
// --- Data Structures ---

/// Input record, deserialized from one row of the source CSV.
#[derive(Clone, Debug, Deserialize)]
struct InputRecord {
    id: u32,
    name: String,
    category: String,
    value: f64,
}

/// Intermediate record, stored in JSON between the two steps.
/// `Serialize` is needed by the step 1 writer, `Deserialize` by the step 2 reader.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct IntermediateRecord {
    id: u32,
    name: String,
    value: f64,
}

/// Final record, produced by the step 2 transformation.
#[derive(Clone, Debug, Serialize)]
struct OutputRecord {
    record_id: String,
    description: String,
    processed_value: f64,
}
// --- Step 1: CSV to JSON with Filtering ---

/// A custom processor that filters out records from the "Test" category.
struct FilterProcessor;

impl ItemProcessor<InputRecord, IntermediateRecord> for FilterProcessor {
    /// Returns `Ok(None)` (and prints a notice) for records whose `category`
    /// is exactly `"Test"`; every other record is converted into an
    /// `IntermediateRecord`, dropping the `category` field.
    fn process(&self, item: InputRecord) -> ItemProcessorResult<IntermediateRecord> {
        if item.category == "Test" {
            println!("Filtering out test record: {}", item.name);
            return Ok(None); // Returning None filters the item.
        }

        Ok(Some(IntermediateRecord {
            id: item.id,
            // `item` is owned here, so the name is moved rather than cloned.
            name: item.name,
            value: item.value,
        }))
    }
}
// --- Step 2: JSON to Console Log with Transformation ---

/// A custom processor that transforms an `IntermediateRecord` into an
/// `OutputRecord`.
struct TransformProcessor;

impl ItemProcessor<IntermediateRecord, OutputRecord> for TransformProcessor {
    /// Builds the output record:
    /// - `record_id` is `REC-<id>`,
    /// - `description` is `<name> (Value: <value>)`,
    /// - `processed_value` is `value * 1.1`.
    ///
    /// Always returns `Ok(Some(_))`; this processor never filters.
    fn process(&self, item: IntermediateRecord) -> ItemProcessorResult<OutputRecord> {
        Ok(Some(OutputRecord {
            record_id: format!("REC-{}", item.id),
            description: format!("{} (Value: {})", item.name, item.value),
            processed_value: item.value * 1.1, // Apply a 10% markup.
        }))
    }
}
/// Runs the two-step ETL example.
///
/// 1. Writes a three-row CSV file to the system temp directory.
/// 2. Step 1 reads the CSV, filters out "Test" records, and writes the rest
///    to an intermediate JSON file.
/// 3. Step 2 reads the intermediate JSON, transforms each record, and logs it.
/// 4. Asserts the read/write counters of both steps.
///
/// # Errors
///
/// Returns an error if the input CSV file cannot be created or written.
///
/// # Panics
///
/// Panics if the job fails, if a step execution cannot be found, or if a
/// counter does not have the expected value.
fn main() -> Result<()> {
    const INPUT_CSV: &[u8] = b"id,name,category,value\n\
        1,Product A,Live,100.0\n\
        2,Product B,Test,200.0\n\
        3,Product C,Live,300.0\n";

    println!("--- Starting Multi-Step ETL Job Example ---");

    // --- Create dummy CSV file for input ---
    let csv_path = temp_dir().join("input_data.csv");
    // The temporary `File` is dropped (closed) at the end of this statement,
    // before the job opens the same path for reading.
    File::create(&csv_path)?.write_all(INPUT_CSV)?;
    println!("Created dummy input file: {:?}", csv_path);

    // Define path for the intermediate JSON file.
    let intermediate_json_path = temp_dir().join("intermediate_data.json");

    // --- 1. Configure and build Step 1 ---
    println!("\n--- Configuring Step 1: CSV to JSON ---");
    let csv_reader = CsvItemReaderBuilder::<InputRecord>::new()
        .has_headers(true)
        .from_path(&csv_path);
    let json_writer =
        JsonItemWriterBuilder::<IntermediateRecord>::new().from_path(&intermediate_json_path);
    let filter_processor = FilterProcessor;
    let step1 = StepBuilder::new("csv_to_json_filter_step")
        .chunk::<InputRecord, IntermediateRecord>(10)
        .reader(&csv_reader)
        .processor(&filter_processor)
        .writer(&json_writer)
        .build();

    // --- 2. Configure and build Step 2 ---
    println!("\n--- Configuring Step 2: JSON to Console ---");
    let json_reader =
        JsonItemReaderBuilder::<IntermediateRecord>::new().from_path(&intermediate_json_path);
    let transform_processor = TransformProcessor;
    // This writer logs the final records to the console.
    let log_writer = LoggerItemWriter::new();
    let step2 = StepBuilder::new("json_to_log_transform_step")
        .chunk::<IntermediateRecord, OutputRecord>(10)
        .reader(&json_reader)
        .processor(&transform_processor)
        .writer(&log_writer)
        .build();

    // --- 3. Configure and Run the Job ---
    println!("\n--- Configuring and Running the Job ---");
    let job = JobBuilder::new()
        .start(&step1) // Start with step 1
        .next(&step2) // Proceed to step 2 after step 1 succeeds
        .build();
    let job_execution = job.run();

    // --- 4. Verify results ---
    // `run` returns a `Result`, so success is checked with `is_ok`; the error
    // is included in the panic message to make a failure diagnosable.
    assert!(
        job_execution.is_ok(),
        "job failed: {:?}",
        job_execution.as_ref().err()
    );

    let step1_execution = job
        .get_step_execution("csv_to_json_filter_step")
        .expect("step 1 was registered on the job and has run");
    assert_eq!(step1_execution.read_count, 3);
    assert_eq!(step1_execution.write_count, 2); // 1 record was filtered.

    let step2_execution = job
        .get_step_execution("json_to_log_transform_step")
        .expect("step 2 was registered on the job and has run");
    assert_eq!(step2_execution.read_count, 2);
    assert_eq!(step2_execution.write_count, 2);

    println!("\n--- Multi-Step ETL Job Finished Successfully! ---");
    println!("Intermediate file is at: {:?}", intermediate_json_path);
    Ok(())
}