Type Transformation
Convert between different types: I → O
Ce contenu n’est pas encore disponible dans votre langue.
The ItemProcessor<I, O> trait defines how to transform items from input type I to output type O. Processors are optional but powerful for data transformation, validation, and filtering.
pub trait ItemProcessor<I, O> { /// Processes an item and returns the transformed result /// /// # Parameters /// - `item`: Reference to the item to process /// /// # Returns /// - `Ok(processed_item)` - Successfully processed /// - `Err(BatchError)` - Processing failed fn process(&self, item: &I) -> Result<O, BatchError>;}pub type ItemProcessorResult<O> = Result<O, BatchError>;Type Transformation
Convert between different types: I → O
Validation
Validate items and reject invalid data
Filtering
Skip items by returning errors that can be handled by fault tolerance
Stateless Design
Processors should be stateless for thread safety
A no-op processor that clones items without transformation.
use spring_batch_rs::core::item::PassThroughProcessor;

/// Example item type handed through the processor unchanged.
#[derive(Clone)]
struct Product {
    id: u32,
    name: String,
}

// The item type is fixed through the binding's annotation.
let processor: PassThroughProcessor<Product> = PassThroughProcessor::new();

// Use when:
Transform data from one structure to another:
use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};use serde::{Deserialize, Serialize};
#[derive(Deserialize, Clone)]struct RawProduct { id: u32, name: String, price_cents: i64,}
#[derive(Serialize)]struct Product { id: u32, name: String, price_dollars: f64,}
struct ProductTransformer;
impl ItemProcessor<RawProduct, Product> for ProductTransformer { fn process(&self, item: &RawProduct) -> ItemProcessorResult<Product> { Ok(Product { id: item.id, name: item.name.clone(), price_dollars: item.price_cents as f64 / 100.0, }) }}Validate items and reject invalid data:
use spring_batch_rs::error::BatchError;
#[derive(Clone)]struct User { email: String, age: i32,}
struct UserValidator;
impl ItemProcessor<User, User> for UserValidator { fn process(&self, item: &User) -> ItemProcessorResult<User> { // Validate email format if !item.email.contains('@') { return Err(BatchError::ItemProcessor( format!("Invalid email: {}", item.email) )); }
// Validate age range if item.age < 0 || item.age > 150 { return Err(BatchError::ItemProcessor( format!("Invalid age: {}", item.age) )); }
Ok(item.clone()) }}Add additional information to items:
use std::collections::HashMap;
#[derive(Clone)]struct Order { id: u32, product_id: u32, quantity: u32,}
#[derive(Serialize)]struct EnrichedOrder { id: u32, product_id: u32, product_name: String, quantity: u32, unit_price: f64, total_price: f64,}
struct OrderEnricher { product_catalog: HashMap<u32, (String, f64)>, // (name, price)}
impl ItemProcessor<Order, EnrichedOrder> for OrderEnricher { fn process(&self, item: &Order) -> ItemProcessorResult<EnrichedOrder> { let (product_name, unit_price) = self.product_catalog .get(&item.product_id) .cloned() .ok_or_else(|| BatchError::ItemProcessor( format!("Unknown product: {}", item.product_id) ))?;
let total_price = unit_price * item.quantity as f64;
Ok(EnrichedOrder { id: item.id, product_id: item.product_id, product_name, quantity: item.quantity, unit_price, total_price, }) }}Clean and normalize data:
struct DataCleanser;
impl ItemProcessor<String, String> for DataCleanser { fn process(&self, item: &String) -> ItemProcessorResult<String> { let cleaned = item .trim() // Remove whitespace .to_lowercase() // Normalize case .replace(" ", " ") // Remove double spaces .chars() .filter(|c| c.is_alphanumeric() || c.is_whitespace()) .collect(); // Remove special chars
Ok(cleaned) }}Apply different logic based on item properties:
#[derive(Clone)]struct Transaction { amount: f64, category: String,}
#[derive(Serialize)]struct ProcessedTransaction { amount: f64, category: String, tax: f64, final_amount: f64,}
struct TaxCalculator { default_tax_rate: f64,}
impl ItemProcessor<Transaction, ProcessedTransaction> for TaxCalculator { fn process(&self, item: &Transaction) -> ItemProcessorResult<ProcessedTransaction> { // Different tax rates by category let tax_rate = match item.category.as_str() { "food" => 0.05, "electronics" => 0.15, "books" => 0.0, // No tax on books _ => self.default_tax_rate, };
let tax = item.amount * tax_rate; let final_amount = item.amount + tax;
Ok(ProcessedTransaction { amount: item.amount, category: item.category.clone(), tax, final_amount, }) }}Common string operations:
struct StringProcessor;
impl ItemProcessor<String, String> for StringProcessor { fn process(&self, item: &String) -> ItemProcessorResult<String> { Ok(item.to_uppercase() .replace(" ", "_") .trim() .to_string()) }}You can chain multiple processors together:
use spring_batch_rs::core::item::ItemProcessor;
struct ProcessorChain<I, M, O> { first: Box<dyn ItemProcessor<I, M>>, second: Box<dyn ItemProcessor<M, O>>,}
impl<I, M, O> ItemProcessor<I, O> for ProcessorChain<I, M, O> { fn process(&self, item: &I) -> ItemProcessorResult<O> { let intermediate = self.first.process(item)?; self.second.process(&intermediate) }}When you need state (use carefully):
use std::sync::Mutex;
struct Counter { count: Mutex<u64>,}
impl ItemProcessor<String, String> for Counter { fn process(&self, item: &String) -> ItemProcessorResult<String> { let mut count = self.count.lock().unwrap(); *count += 1;
Ok(format!("{}: {}", *count, item)) }}If you need async operations:
use tokio::runtime::Runtime;
struct ApiEnricher { runtime: Runtime,}
impl ItemProcessor<String, String> for ApiEnricher { fn process(&self, item: &String) -> ItemProcessorResult<String> { // Block on async operation self.runtime.block_on(async { // Call external API let response = fetch_api_data(item).await?; Ok(response) }) }}
async fn fetch_api_data(input: &str) -> Result<String, BatchError> { // Async API call Ok(format!("enriched: {}", input))}Use with skip_limit() to continue processing:
impl ItemProcessor<Data, Data> for MyProcessor { fn process(&self, item: &Data) -> ItemProcessorResult<Data> { if item.is_invalid() { return Err(BatchError::ItemProcessor( "Invalid data".to_string() )); } Ok(item.clone()) }}
// In step configuration:let step = StepBuilder::new("process") .chunk(100) .reader(&reader) .processor(&processor) .writer(&writer) .skip_limit(10) // Skip up to 10 invalid items .build();Stop the job immediately:
impl ItemProcessor<Data, Data> for MyProcessor { fn process(&self, item: &Data) -> ItemProcessorResult<Data> { if critical_failure() { return Err(BatchError::Fatal( "Critical system error".to_string() )); } Ok(item.clone()) }}Processors are easy to unit test:
#[cfg(test)]
mod tests {
    use super::*;

    /// A raw product priced at 1999 cents is transformed into 19.99 dollars.
    #[test]
    fn test_product_transformer() {
        let processor = ProductTransformer;

        let raw = RawProduct {
            id: 1,
            name: "Widget".to_string(),
            price_cents: 1999,
        };

        let result = processor.process(&raw).unwrap();

        assert_eq!(result.id, 1);
        assert_eq!(result.name, "Widget");
        // Compare the computed float with a tolerance: exact `==` on `f64`
        // depends on rounding details of the conversion.
        assert!((result.price_dollars - 19.99).abs() < 1e-9);
    }

    /// An email without `'@'` is rejected by the validator.
    #[test]
    fn test_validation_failure() {
        let processor = UserValidator;

        let invalid_user = User {
            email: "not-an-email".to_string(),
            age: 25,
        };

        assert!(processor.process(&invalid_user).is_err());
    }
}

/// Source-side record in which every field is still unparsed text.
#[derive(Deserialize, Clone)]
struct SourceRecord {
    customer_id: String,
    amount: String,
    date: String,
}
#[derive(Serialize)]struct TargetRecord { customer_id: i64, amount_cents: i64, date: chrono::NaiveDate,}
struct ETLProcessor;
impl ItemProcessor<SourceRecord, TargetRecord> for ETLProcessor { fn process(&self, item: &SourceRecord) -> ItemProcessorResult<TargetRecord> { // Parse customer ID let customer_id = item.customer_id.parse::<i64>() .map_err(|e| BatchError::ItemProcessor( format!("Invalid customer_id: {}", e) ))?;
// Parse amount let amount_cents = (item.amount.parse::<f64>() .map_err(|e| BatchError::ItemProcessor( format!("Invalid amount: {}", e) ))? * 100.0) as i64;
// Parse date let date = chrono::NaiveDate::parse_from_str(&item.date, "%Y-%m-%d") .map_err(|e| BatchError::ItemProcessor( format!("Invalid date: {}", e) ))?;
Ok(TargetRecord { customer_id, amount_cents, date, }) }}use regex::Regex;
struct PIIRedactor { email_pattern: Regex, phone_pattern: Regex,}
impl PIIRedactor { fn new() -> Self { Self { email_pattern: Regex::new(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b").unwrap(), phone_pattern: Regex::new(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b").unwrap(), } }}
impl ItemProcessor<String, String> for PIIRedactor { fn process(&self, item: &String) -> ItemProcessorResult<String> { let mut redacted = item.clone();
// Redact emails redacted = self.email_pattern.replace_all(&redacted, "[EMAIL]").to_string();
// Redact phone numbers redacted = self.phone_pattern.replace_all(&redacted, "[PHONE]").to_string();
Ok(redacted) }}Keep It Simple
Processors should do one thing well. Chain multiple processors for complex transformations.
Stateless Design
Prefer stateless processors for better parallelization and testing.
Immutability
Work with references and return new instances rather than mutating input.
Error Context
Provide detailed error messages including which field or validation failed.
Use Cow<str> for strings that might not need copying.
Use Arc for shared read-only data.