Skip to content

ItemProcessor API

The ItemProcessor<I, O> trait defines how to transform items from input type I to output type O. Processors are optional but powerful for data transformation, validation, and filtering.

/// Transforms one item of input type `I` into one item of output type `O`.
///
/// Processors sit between a reader and a writer. They are optional and can be
/// used for type conversion, validation, enrichment, or filtering (by
/// returning an error that the step's fault-tolerance settings can skip).
pub trait ItemProcessor<I, O> {
    /// Processes a single item and returns the transformed result.
    ///
    /// # Parameters
    /// - `item`: the item to process, borrowed rather than consumed
    ///
    /// # Returns
    /// - `Ok(processed_item)` when processing succeeded
    /// - `Err(BatchError)` when processing failed
    fn process(&self, item: &I) -> Result<O, BatchError>;
}

/// Convenience alias for the value returned by [`ItemProcessor::process`].
pub type ItemProcessorResult<O> = Result<O, BatchError>;

Type Transformation

Convert between different types: `I` → `O`

Validation

Validate items and reject invalid data

Filtering

Skip items by returning errors that the step's fault-tolerance settings (for example `skip_limit()`) can handle

Stateless Design

Processors should be stateless for thread safety


`PassThroughProcessor` is a no-op processor: it returns a clone of each item without transforming it.

use spring_batch_rs::core::item::PassThroughProcessor;
/// Minimal item type for the pass-through example. `Clone` is derived because
/// the pass-through processor hands back a clone of every item.
#[derive(Clone)]
struct Product {
    id: u32,
    name: String,
}

// Input and output are both `Product`; the type annotation drives inference
// of the processor's item type.
let processor: PassThroughProcessor<Product> = PassThroughProcessor::new();

Use when:

  • You don’t need transformation
  • You want to use the same type for input and output
  • Testing reader → writer pipelines

Transform data from one structure to another:

use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
use serde::{Deserialize, Serialize};
/// Input record as read (deserialized) from the source; the price is an
/// integer number of cents.
#[derive(Deserialize, Clone)]
struct RawProduct {
    id: u32,
    name: String,
    price_cents: i64,
}
/// Output record to be written (serialized); the price is expressed in dollars.
#[derive(Serialize)]
struct Product {
    id: u32,
    name: String,
    price_dollars: f64,
}
struct ProductTransformer;
impl ItemProcessor<RawProduct, Product> for ProductTransformer {
fn process(&self, item: &RawProduct) -> ItemProcessorResult<Product> {
Ok(Product {
id: item.id,
name: item.name.clone(),
price_dollars: item.price_cents as f64 / 100.0,
})
}
}

Validate items and reject invalid data:

use spring_batch_rs::error::BatchError;
/// A user record to be validated.
#[derive(Clone)]
struct User {
    email: String,
    age: i32,
}
/// Passes through users whose email contains `@` and whose age lies in
/// `0..=150`. Any other user is rejected with `BatchError::ItemProcessor`.
/// The email is checked before the age.
struct UserValidator;
impl ItemProcessor<User, User> for UserValidator {
    fn process(&self, item: &User) -> ItemProcessorResult<User> {
        // Find the first failed rule, if any; email takes precedence over age.
        let problem = if !item.email.contains('@') {
            Some(format!("Invalid email: {}", item.email))
        } else if !(0..=150).contains(&item.age) {
            Some(format!("Invalid age: {}", item.age))
        } else {
            None
        };
        match problem {
            Some(reason) => Err(BatchError::ItemProcessor(reason)),
            None => Ok(item.clone()),
        }
    }
}

Add additional information to items:

use std::collections::HashMap;
/// Incoming order line that references its product only by `product_id`.
#[derive(Clone)]
struct Order {
    id: u32,
    product_id: u32,
    quantity: u32,
}
/// Order line joined with product details (name and unit price) plus the
/// computed line total.
#[derive(Serialize)]
struct EnrichedOrder {
    id: u32,
    product_id: u32,
    product_name: String,
    quantity: u32,
    unit_price: f64,
    total_price: f64,
}
/// Joins each [`Order`] with product details looked up in an in-memory catalog.
///
/// Orders whose `product_id` is missing from the catalog are rejected with
/// `BatchError::ItemProcessor`.
struct OrderEnricher {
    product_catalog: HashMap<u32, (String, f64)>, // product_id -> (name, unit price)
}
impl ItemProcessor<Order, EnrichedOrder> for OrderEnricher {
    fn process(&self, item: &Order) -> ItemProcessorResult<EnrichedOrder> {
        let (product_name, unit_price) = match self.product_catalog.get(&item.product_id) {
            Some((name, price)) => (name.clone(), *price),
            None => {
                return Err(BatchError::ItemProcessor(format!(
                    "Unknown product: {}",
                    item.product_id
                )))
            }
        };
        Ok(EnrichedOrder {
            id: item.id,
            product_id: item.product_id,
            product_name,
            quantity: item.quantity,
            unit_price,
            total_price: unit_price * f64::from(item.quantity),
        })
    }
}

Clean and normalize data:

/// Normalizes free text:
/// - lower-cases it,
/// - removes every character that is neither alphanumeric nor whitespace,
/// - collapses each run of whitespace into a single space, with no leading or
///   trailing space.
///
/// Example: `"  Hello,   World!! "` becomes `"hello world"`.
struct DataCleanser;
impl ItemProcessor<String, String> for DataCleanser {
    fn process(&self, item: &String) -> ItemProcessorResult<String> {
        // Strip special characters first. Removing e.g. the `-` in "a - b" must
        // not leave a double space (or a trailing space) behind after the
        // whitespace has already been normalized.
        let kept: String = item
            .to_lowercase()
            .chars()
            .filter(|c| c.is_alphanumeric() || c.is_whitespace())
            .collect();
        // `split_whitespace` drops leading/trailing whitespace and treats any run
        // of whitespace (spaces, tabs, newlines) as one separator. Re-joining
        // with " " therefore collapses runs of any length, unlike a single
        // `replace` pass, which at best halves them.
        let cleaned = kept.split_whitespace().collect::<Vec<_>>().join(" ");
        Ok(cleaned)
    }
}

Apply different logic based on item properties:

/// A transaction before tax; `category` selects the tax rate applied to it.
#[derive(Clone)]
struct Transaction {
    amount: f64,
    category: String,
}
/// A transaction after tax: the original amount and category, the tax charged,
/// and `final_amount` (amount plus tax).
#[derive(Serialize)]
struct ProcessedTransaction {
    amount: f64,
    category: String,
    tax: f64,
    final_amount: f64,
}
/// Adds sales tax to a [`Transaction`] using a per-category rate.
///
/// Rates: `"food"` 5 %, `"electronics"` 15 %, `"books"` 0 %, anything else
/// `default_tax_rate`.
struct TaxCalculator {
    /// Rate used for categories without a dedicated rate.
    default_tax_rate: f64,
}
impl ItemProcessor<Transaction, ProcessedTransaction> for TaxCalculator {
    fn process(&self, item: &Transaction) -> ItemProcessorResult<ProcessedTransaction> {
        let category = item.category.as_str();
        let rate = if category == "food" {
            0.05
        } else if category == "electronics" {
            0.15
        } else if category == "books" {
            0.0 // books are tax-exempt
        } else {
            self.default_tax_rate
        };
        let amount = item.amount;
        let tax = amount * rate;
        Ok(ProcessedTransaction {
            amount,
            category: item.category.clone(),
            tax,
            final_amount: amount + tax,
        })
    }
}

Common string operations:

/// Normalizes a string into an upper-case, underscore-separated token,
/// e.g. `"  hello world "` becomes `"HELLO_WORLD"`.
struct StringProcessor;
impl ItemProcessor<String, String> for StringProcessor {
    fn process(&self, item: &String) -> ItemProcessorResult<String> {
        // Trim *before* replacing spaces. Otherwise leading/trailing spaces are
        // already underscores by the time `trim()` runs and are never removed.
        Ok(item.trim().to_uppercase().replace(' ', "_"))
    }
}

You can chain multiple processors together:

use spring_batch_rs::core::item::ItemProcessor;
/// Composes two processors: `first` maps `I -> M`, then `second` maps `M -> O`.
///
/// If `first` fails, its error is returned and `second` is never called.
struct ProcessorChain<I, M, O> {
    first: Box<dyn ItemProcessor<I, M>>,
    second: Box<dyn ItemProcessor<M, O>>,
}
impl<I, M, O> ItemProcessor<I, O> for ProcessorChain<I, M, O> {
    fn process(&self, item: &I) -> ItemProcessorResult<O> {
        self.first
            .process(item)
            .and_then(|middle| self.second.process(&middle))
    }
}

Stateful Processing with Interior Mutability

Section titled “Stateful Processing with Interior Mutability”

When you need state (use carefully):

use std::sync::Mutex;
/// Prefixes each item with a running sequence number, incremented before use
/// (e.g. `"1: a"`, `"2: b"`, … when `count` starts at 0).
///
/// The count lives behind a `Mutex` so that `process` can update it through
/// `&self`.
struct Counter {
    count: Mutex<u64>,
}
impl ItemProcessor<String, String> for Counter {
    fn process(&self, item: &String) -> ItemProcessorResult<String> {
        let sequence = {
            // A poisoned lock only means some thread panicked while holding it.
            // The guarded value is a plain `u64` with no multi-step invariant,
            // so it is still usable: recover it instead of panicking forever
            // via `unwrap()`.
            let mut count = self
                .count
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            *count += 1;
            *count
        }; // guard dropped here: the lock is not held while formatting
        Ok(format!("{}: {}", sequence, item))
    }
}

If you need to call async code, block on it from a runtime owned by the processor:

use tokio::runtime::Runtime;
/// Enriches each string by driving an async lookup to completion on a Tokio
/// runtime owned by the processor. This bridges the synchronous
/// `ItemProcessor::process` API to async code.
///
/// Note: Tokio's `Runtime::block_on` panics when called from within an
/// asynchronous execution context, so `process` must be invoked from a plain
/// (non-async) thread.
struct ApiEnricher {
    runtime: Runtime,
}
impl ItemProcessor<String, String> for ApiEnricher {
    fn process(&self, item: &String) -> ItemProcessorResult<String> {
        // The future already yields `Result<String, BatchError>`, so its output
        // is returned as-is; no need to unwrap and re-wrap it.
        self.runtime.block_on(fetch_api_data(item))
    }
}
/// Stand-in for an async API call; currently it only prefixes the input.
async fn fetch_api_data(input: &str) -> Result<String, BatchError> {
    // Async API call
    Ok(format!("enriched: {}", input))
}

Return a recoverable `BatchError::ItemProcessor` and configure `skip_limit()` on the step so that processing continues past invalid items:

/// Rejects items that report themselves invalid with a recoverable
/// `BatchError::ItemProcessor` (intended to be absorbed by the step's
/// `skip_limit`). Valid items are passed through as clones.
impl ItemProcessor<Data, Data> for MyProcessor {
    fn process(&self, item: &Data) -> ItemProcessorResult<Data> {
        if item.is_invalid() {
            Err(BatchError::ItemProcessor(String::from("Invalid data")))
        } else {
            Ok(item.clone())
        }
    }
}
// In step configuration: wire reader, processor and writer into one step.
let step = StepBuilder::new("process")
    .chunk(100) // NOTE(review): presumably the number of items per chunk — confirm against StepBuilder docs
    .reader(&reader)
    .processor(&processor)
    .writer(&writer)
    .skip_limit(10) // Skip up to 10 invalid items
    .build();

Return a fatal error to stop the job immediately:

impl ItemProcessor<Data, Data> for MyProcessor {
fn process(&self, item: &Data) -> ItemProcessorResult<Data> {
if critical_failure() {
return Err(BatchError::Fatal(
"Critical system error".to_string()
));
}
Ok(item.clone())
}
}

Processors are easy to unit test:

#[cfg(test)]
mod tests {
    use super::*;

    /// 1999 cents must become 19.99 dollars, with id and name copied over.
    #[test]
    fn test_product_transformer() {
        let raw = RawProduct {
            id: 1,
            name: String::from("Widget"),
            price_cents: 1999,
        };
        let product = ProductTransformer
            .process(&raw)
            .expect("a well-formed product should transform");
        assert_eq!(product.id, 1);
        assert_eq!(product.name, "Widget");
        assert_eq!(product.price_dollars, 19.99);
    }

    /// An email address without '@' must be rejected.
    #[test]
    fn test_validation_failure() {
        let user = User {
            email: String::from("not-an-email"),
            age: 25,
        };
        let outcome = UserValidator.process(&user);
        assert!(outcome.is_err());
    }
}

/// Source row (deserialized) in which every field is still a string.
#[derive(Deserialize, Clone)]
struct SourceRecord {
    customer_id: String,
    amount: String,
    date: String,
}
/// Typed target row (serialized): numeric customer id, amount in whole cents,
/// and a calendar date.
#[derive(Serialize)]
struct TargetRecord {
    customer_id: i64,
    amount_cents: i64,
    date: chrono::NaiveDate,
}
struct ETLProcessor;
impl ItemProcessor<SourceRecord, TargetRecord> for ETLProcessor {
fn process(&self, item: &SourceRecord) -> ItemProcessorResult<TargetRecord> {
// Parse customer ID
let customer_id = item.customer_id.parse::<i64>()
.map_err(|e| BatchError::ItemProcessor(
format!("Invalid customer_id: {}", e)
))?;
// Parse amount
let amount_cents = (item.amount.parse::<f64>()
.map_err(|e| BatchError::ItemProcessor(
format!("Invalid amount: {}", e)
))? * 100.0) as i64;
// Parse date
let date = chrono::NaiveDate::parse_from_str(&item.date, "%Y-%m-%d")
.map_err(|e| BatchError::ItemProcessor(
format!("Invalid date: {}", e)
))?;
Ok(TargetRecord {
customer_id,
amount_cents,
date,
})
}
}
use regex::Regex;
/// Masks personally identifiable information in free text:
/// - email addresses become `[EMAIL]`,
/// - 3-3-4 digit phone numbers become `[PHONE]`.
struct PIIRedactor {
    email_pattern: Regex,
    phone_pattern: Regex,
}
impl PIIRedactor {
    /// Compiles the redaction patterns.
    ///
    /// # Panics
    /// Only if one of the constant patterns below were invalid, which would be a
    /// programming error rather than a runtime condition.
    fn new() -> Self {
        Self {
            // The TLD class is `[A-Za-z]`. Inside a character class `|` is a
            // literal pipe, not alternation, so `[A-Z|a-z]` would also accept
            // `|` as a TLD character.
            email_pattern: Regex::new(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
                .expect("email pattern is a valid constant regex"),
            // Three, three and four digits, optionally separated by '-' or '.'.
            phone_pattern: Regex::new(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b")
                .expect("phone pattern is a valid constant regex"),
        }
    }
}
impl ItemProcessor<String, String> for PIIRedactor {
    fn process(&self, item: &String) -> ItemProcessorResult<String> {
        // Emails are masked before phone numbers, so digits that are part of an
        // email address are replaced by `[EMAIL]` rather than `[PHONE]`.
        // `replace_all` returns a borrowed `Cow` when nothing matched, so no
        // copy is made until the final `into_owned`.
        let redacted = self.email_pattern.replace_all(item, "[EMAIL]");
        let redacted = self.phone_pattern.replace_all(&redacted, "[PHONE]");
        Ok(redacted.into_owned())
    }
}

Keep It Simple

Processors should do one thing well. Chain multiple processors for complex transformations.

Stateless Design

Prefer stateless processors for better parallelization and testing.

Immutability

Work with references and return new instances rather than mutating input.

Error Context

Provide detailed error messages including which field or validation failed.

  • Avoid cloning large structures unnecessarily
  • Use Cow<str> for strings that might not need copying
  • Consider Arc for shared read-only data