Batch jobs process thousands or millions of records. Robust error handling ensures your jobs can recover from failures gracefully without losing progress.

```mermaid
graph TB
Error[Error Occurs] --> Type{Error Type?}
Type -->|Data Error| Skip[Skip Strategy<br/>Continue Processing]
Type -->|Transient Error| Retry[Retry Strategy<br/>Attempt Again]
Type -->|Fatal Error| Fail[Fail-Fast Strategy<br/>Stop Immediately]
Skip --> Log[Log Skipped Item]
Retry --> Backoff[Exponential Backoff]
Fail --> Rollback[Rollback Transaction]
Log --> Check{Skip Limit<br/>Reached?}
Check -->|No| Continue[Continue Job]
Check -->|Yes| Fail
Backoff --> RetryCheck{Retry Limit<br/>Reached?}
RetryCheck -->|No| Retry
RetryCheck -->|Yes| Fail
style Skip fill:#10b981,color:#fff
style Retry fill:#f59e0b,color:#fff
style Fail fill:#ef4444,color:#fff
```

Skip Strategy
Best for: Data quality issues, validation failures
- Skip invalid records
- Continue processing
- Log errors for review

Retry Strategy
Best for: Transient failures, network issues

Fail-Fast Strategy
Best for: Critical errors, data integrity
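The routing in the diagram above can be sketched in plain Rust. This is a minimal, framework-independent illustration under stated assumptions, not the spring-batch-rs implementation: `ErrorKind`, `Action`, and `ErrorRouter` are hypothetical names, and the limits and backoff values are placeholders.

```rust
/// Hypothetical error classification, mirroring the diagram's three branches.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ErrorKind {
    Data,
    Transient,
    Fatal,
}

/// What the step should do next (hypothetical).
#[derive(Debug, PartialEq)]
enum Action {
    Skip,
    Retry { delay_ms: u64 },
    Fail,
}

/// Tracks skip and retry counts for one step run (hypothetical).
struct ErrorRouter {
    skip_limit: usize,
    retry_limit: u32,
    initial_backoff_ms: u64,
    skips: usize,
    retries: u32,
}

impl ErrorRouter {
    fn new(skip_limit: usize, retry_limit: u32, initial_backoff_ms: u64) -> Self {
        Self { skip_limit, retry_limit, initial_backoff_ms, skips: 0, retries: 0 }
    }

    fn handle(&mut self, kind: ErrorKind) -> Action {
        match kind {
            // Data error: skip and continue until the skip limit is exceeded
            ErrorKind::Data => {
                if self.skips < self.skip_limit {
                    self.skips += 1;
                    Action::Skip
                } else {
                    Action::Fail
                }
            }
            // Transient error: retry with exponential backoff until the retry limit
            ErrorKind::Transient => {
                if self.retries < self.retry_limit {
                    let delay_ms = self
                        .initial_backoff_ms
                        .saturating_mul(2u64.saturating_pow(self.retries));
                    self.retries += 1;
                    Action::Retry { delay_ms }
                } else {
                    Action::Fail
                }
            }
            // Fatal error: stop immediately
            ErrorKind::Fatal => Action::Fail,
        }
    }

    /// Retry limits apply per item, so reset the counter when an item succeeds.
    fn item_succeeded(&mut self) {
        self.retries = 0;
    }
}
```

With a retry limit of 3 and a 100 ms initial backoff, successive transient errors on the same item yield delays of 100, 200, and 400 ms, and the fourth error fails the step.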
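A basic fault-tolerant step reads CSV records and skips invalid ones up to a fixed limit:

```rust
use serde::{Deserialize, Serialize};
use spring_batch_rs::core::step::StepBuilder;
use spring_batch_rs::item::csv::CsvItemReaderBuilder;
use spring_batch_rs::BatchError;
use sqlx::PgPool;

#[derive(Debug, Deserialize, Serialize)]
struct Transaction {
    id: u32,
    amount: f64,
    account: String,
}

// `?` requires a `Result` return type.
// `DatabaseWriter` is a placeholder for your own database writer.
fn build_fault_tolerant_step(pool: PgPool) -> Result<Step, BatchError> {
    let reader = CsvItemReaderBuilder::<Transaction>::new()
        .has_headers(true)
        .from_path("transactions.csv")?;

    let writer = DatabaseWriter::new(pool);

    Ok(StepBuilder::new("process-transactions")
        .chunk(100)
        .reader(&reader)
        .writer(&writer)
        .skip_limit(50) // Skip up to 50 invalid records
        .build())
}
```

Behavior: each record that fails is skipped and logged, and the step keeps processing; once more than 50 records have been skipped, the step fails.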
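To make the skip-limit semantics concrete, here is a framework-free sketch of the same loop over in-memory lines. It assumes "up to 50" means 50 skips are tolerated and the next invalid record fails the run; `run_with_skip_limit`, `parse`, and `SkipReport` are illustrative names, not spring-batch-rs APIs, and the `id,amount` line format is hypothetical.

```rust
/// Outcome of a run that tolerated some invalid records.
#[derive(Debug)]
struct SkipReport {
    written: Vec<u32>,
    skipped: Vec<String>,
}

/// Parse `id,amount` lines, skipping invalid ones until `skip_limit` is exceeded.
fn run_with_skip_limit(lines: &[&str], skip_limit: usize) -> Result<SkipReport, String> {
    let mut report = SkipReport { written: Vec::new(), skipped: Vec::new() };

    for (index, line) in lines.iter().enumerate() {
        match parse(line) {
            Ok(id) => report.written.push(id),
            Err(reason) => {
                // Log the skipped record for later review
                report.skipped.push(format!("line {}: {}", index + 1, reason));
                // Fail once the number of skips goes past the limit
                if report.skipped.len() > skip_limit {
                    return Err(format!(
                        "skip limit of {} exceeded at line {}",
                        skip_limit,
                        index + 1
                    ));
                }
            }
        }
    }
    Ok(report)
}

/// Validate one `id,amount` record and return its id.
fn parse(line: &str) -> Result<u32, String> {
    let mut fields = line.split(',');
    let id = fields
        .next()
        .and_then(|f| f.trim().parse::<u32>().ok())
        .ok_or_else(|| "invalid id".to_string())?;
    let amount_ok = fields.next().map_or(false, |f| f.trim().parse::<f64>().is_ok());
    if !amount_ok {
        return Err("invalid amount".to_string());
    }
    Ok(id)
}
```

For the input `["1,10.5", "x,3.0", "2,abc", "3,7.25"]`, a limit of 2 writes IDs 1 and 3 and skips two lines, while a limit of 1 fails on the second invalid line.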
Skip only specific error types:

```rust
use spring_batch_rs::core::step::{StepBuilder, SkipPolicy};
use spring_batch_rs::BatchError;

struct SelectiveSkipPolicy;

impl SkipPolicy for SelectiveSkipPolicy {
    fn should_skip(&self, error: &BatchError, skip_count: usize) -> bool {
        // Skip validation and processing errors while `skip_count` (items
        // skipped so far) is under the limit; never skip I/O or DB errors
        match error {
            BatchError::ValidationError(_) => skip_count < 100,
            BatchError::ProcessingError(_) => skip_count < 50,
            BatchError::IoError(_) => false,       // Never skip I/O errors
            BatchError::DatabaseError(_) => false, // Never skip DB errors
            _ => false,
        }
    }
}

// `reader`, `processor` and `writer` are assumed to be defined elsewhere
fn build_selective_skip_step() -> Step {
    StepBuilder::new("selective-skip")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_policy(SelectiveSkipPolicy)
        .build()
}
```

Collect and report all skipped items:
```rust
use serde::{Deserialize, Serialize};
use spring_batch_rs::core::item::ItemProcessor;
use spring_batch_rs::core::step::StepBuilder;
use spring_batch_rs::BatchError;
use std::sync::{Arc, Mutex};

#[derive(Debug, Deserialize, Serialize, Clone)]
struct Record {
    id: u32,
    data: String,
}

struct ValidatingProcessor {
    // Shared so the log can still be read after the step has run
    error_log: Arc<Mutex<Vec<String>>>,
}

impl ValidatingProcessor {
    fn new() -> Self {
        Self {
            error_log: Arc::new(Mutex::new(Vec::new())),
        }
    }

    fn get_errors(&self) -> Vec<String> {
        self.error_log.lock().unwrap().clone()
    }

    // Record the message, then return it as a validation error
    fn reject(&self, message: String) -> BatchError {
        self.error_log.lock().unwrap().push(message.clone());
        BatchError::ValidationError(message)
    }
}

impl ItemProcessor<Record, Record> for ValidatingProcessor {
    fn process(&self, record: Record) -> Result<Option<Record>, BatchError> {
        if record.data.is_empty() {
            return Err(self.reject(format!("Record {} has empty data", record.id)));
        }

        if record.data.len() > 1000 {
            return Err(self.reject(format!("Record {} data too long", record.id)));
        }

        Ok(Some(record))
    }
}

// Usage
fn process_with_error_logging() -> Result<(), Box<dyn std::error::Error>> {
    let processor = ValidatingProcessor::new();

    let step = StepBuilder::new("validate-records")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(100)
        .build();

    let execution = step.execute()?;

    // Write the error log
    let errors = processor.get_errors();
    if !errors.is_empty() {
        std::fs::write("skipped_records.log", errors.join("\n"))?;
        println!("Skipped {} records - see skipped_records.log", errors.len());
    }

    println!("Processed: {}", execution.write_count);
    println!("Skipped: {}", execution.skip_count);

    Ok(())
}
```

To retry failed items instead of skipping them, set a retry limit:

```rust
use spring_batch_rs::core::step::StepBuilder;

fn build_retry_step() -> Step {
    StepBuilder::new("process-with-retry")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .retry_limit(3) // Retry each failed item up to 3 times
        .build()
}
```

Behavior:
```mermaid
sequenceDiagram
participant Step
participant Processor
participant ErrorHandler
Step->>Processor: process(item)
Processor-->>ErrorHandler: Error
ErrorHandler->>ErrorHandler: Attempt 1 failed
ErrorHandler->>Processor: retry
Processor-->>ErrorHandler: Error
ErrorHandler->>ErrorHandler: Attempt 2 failed
ErrorHandler->>Processor: retry
Processor-->>ErrorHandler: Error
ErrorHandler->>ErrorHandler: Attempt 3 failed
ErrorHandler->>Step: Max retries reached
Step->>Step: Skip or Fail
```
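The sequence above can be modeled with a small generic helper. In this sketch `max_attempts` counts the first call plus retries, so `3` reproduces the three failed attempts in the diagram; `retry` and `RetryError` are hypothetical names, and no delay is applied between attempts.

```rust
/// Returned once every attempt has failed (hypothetical type).
#[derive(Debug, PartialEq)]
struct RetryError<E> {
    last_error: E,
    attempts: u32,
}

/// Call `op` until it succeeds or `max_attempts` calls have failed.
/// `op` receives the 1-based attempt number.
fn retry<T, E>(
    max_attempts: u32,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, RetryError<E>> {
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(e) if attempt >= max_attempts => {
                // Max retries reached: hand the last error back to the step,
                // which then skips or fails the item
                return Err(RetryError { last_error: e, attempts: attempt });
            }
            // Attempt failed but attempts remain: try again
            Err(_) => attempt += 1,
        }
    }
}
```

An operation that fails twice and succeeds on its third call returns `Ok`, while one that always fails returns a `RetryError` with `attempts: 3`.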
Add delays between retries to handle transient issues:

```rust
use spring_batch_rs::core::item::ItemProcessor;
use spring_batch_rs::core::step::StepBuilder;
use spring_batch_rs::BatchError;
use std::thread;
use std::time::Duration;

struct RetryingProcessor<P> {
    inner: P,
    max_retries: u32,
    initial_backoff_ms: u64,
}

impl<I, O, P> ItemProcessor<I, O> for RetryingProcessor<P>
where
    P: ItemProcessor<I, O>,
    I: Clone,
{
    fn process(&self, item: I) -> Result<Option<O>, BatchError> {
        let mut attempts = 0;
        let mut backoff = self.initial_backoff_ms;

        loop {
            match self.inner.process(item.clone()) {
                Ok(result) => return Ok(result),
                Err(e) if Self::is_retryable(&e) && attempts < self.max_retries => {
                    attempts += 1;
                    println!("Retry attempt {} after {} ms", attempts, backoff);

                    thread::sleep(Duration::from_millis(backoff));

                    // Exponential backoff (initial 100 ms): 100ms, 200ms, 400ms, 800ms...
                    backoff = backoff.saturating_mul(2);
                }
                // Non-retryable error, or retries exhausted
                Err(e) => return Err(e),
            }
        }
    }
}

impl<P> RetryingProcessor<P> {
    fn is_retryable(error: &BatchError) -> bool {
        matches!(
            error,
            BatchError::NetworkError(_)
                | BatchError::TemporaryError(_)
                | BatchError::TimeoutError(_)
        )
    }
}

// Usage (`MyProcessor` stands in for your own processor)
fn build_step_with_backoff() -> Step {
    let processor = RetryingProcessor {
        inner: MyProcessor::new(),
        max_retries: 5, // Up to 5 retries after the first attempt
        initial_backoff_ms: 100,
    };

    StepBuilder::new("retry-with-backoff")
        .chunk(50)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build()
}
```

Stop retrying if too many consecutive failures occur:
```rust
use spring_batch_rs::core::item::ItemProcessor;
use spring_batch_rs::BatchError;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

struct CircuitBreakerProcessor<P> {
    inner: P,
    consecutive_failures: Arc<AtomicUsize>,
    circuit_breaker_threshold: usize,
    circuit_open: Arc<AtomicBool>,
}

impl<I, O, P> ItemProcessor<I, O> for CircuitBreakerProcessor<P>
where
    P: ItemProcessor<I, O>,
{
    fn process(&self, item: I) -> Result<Option<O>, BatchError> {
        // Once open, the circuit stays open: remaining items fail fast
        // without calling the inner processor
        if self.circuit_open.load(Ordering::SeqCst) {
            return Err(BatchError::ProcessingError(
                "Circuit breaker is open - too many failures".to_string(),
            ));
        }

        match self.inner.process(item) {
            Ok(result) => {
                // Reset the failure count on success
                self.consecutive_failures.store(0, Ordering::SeqCst);
                Ok(result)
            }
            Err(e) => {
                // fetch_add returns the previous value, so add 1 for the new count
                let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;

                if failures >= self.circuit_breaker_threshold {
                    self.circuit_open.store(true, Ordering::SeqCst);
                    println!("⚠️ Circuit breaker opened after {} failures", failures);
                }

                Err(e)
            }
        }
    }
}
```

For operations where any error is unacceptable:
```rust
fn build_fail_fast_step() -> Step {
    StepBuilder::new("critical-processing")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        // No skip_limit or retry_limit: any error fails the job
        .build()
}
```

Validate all data before starting processing:
```rust
use spring_batch_rs::core::step::{Tasklet, StepExecution, RepeatStatus, StepBuilder};
use spring_batch_rs::BatchError;

struct ValidationTasklet {
    file_path: String,
}

impl Tasklet for ValidationTasklet {
    fn execute(&self, _: &StepExecution) -> Result<RepeatStatus, BatchError> {
        println!("Validating data file: {}", self.file_path);

        let mut reader = csv::Reader::from_path(&self.file_path)?;
        let mut errors = Vec::new();

        for (index, result) in reader.records().enumerate() {
            // records() skips the header row and enumerate() starts at 0,
            // so record `index` is on file line index + 2 (single-line records)
            let line = index + 2;
            match result {
                // Validate record structure: every row needs exactly 5 fields
                Ok(record) if record.len() != 5 => {
                    errors.push(format!(
                        "Line {}: Expected 5 fields, got {}",
                        line,
                        record.len()
                    ));
                }
                Ok(_) => {}
                Err(e) => errors.push(format!("Line {}: {}", line, e)),
            }
        }

        if !errors.is_empty() {
            return Err(BatchError::ValidationError(format!(
                "Validation failed with {} errors:\n{}",
                errors.len(),
                errors.join("\n")
            )));
        }

        println!("✓ Validation successful");
        Ok(RepeatStatus::Finished)
    }
}

// Usage: validate before processing
fn build_validated_job() -> Job {
    // Bind the tasklet so the step does not borrow a temporary
    let validation_tasklet = ValidationTasklet {
        file_path: "data.csv".to_string(),
    };

    let validation_step = StepBuilder::new("validate")
        .tasklet(&validation_tasklet)
        .build();

    let processing_step = StepBuilder::new("process")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    JobBuilder::new()
        .start(&validation_step) // Validate first
        .next(&processing_step) // Then process
        .build()
}
```

Each chunk runs in its own transaction: it is committed when every item succeeds and rolled back on error, after which the error handler skips or fails:

```mermaid
sequenceDiagram
participant Step
participant DB
participant ErrorHandler
loop For each chunk
Step->>DB: BEGIN TRANSACTION
Step->>Step: Read items
Step->>Step: Process items
Step->>DB: Write items
alt All successful
Step->>DB: COMMIT
else Error occurred
Step->>ErrorHandler: Handle error
ErrorHandler->>DB: ROLLBACK
ErrorHandler-->>Step: Skip or Fail
end
end
```
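The commit/rollback behavior in the diagram can be illustrated without a database. This sketch stages a chunk in a buffer and appends it to an in-memory "table" only when every item is valid; `write_chunk` and its validation rule (no empty items) are hypothetical stand-ins for a real transactional write.

```rust
/// Write one chunk atomically: either every item lands in `table` or none does.
fn write_chunk(table: &mut Vec<String>, chunk: &[&str]) -> Result<(), String> {
    // BEGIN: stage writes in a separate buffer
    let mut staged = Vec::with_capacity(chunk.len());

    for item in chunk {
        if item.is_empty() {
            // ROLLBACK: the staged writes are dropped and `table` is untouched
            return Err(format!("empty item in chunk of {}", chunk.len()));
        }
        staged.push(item.to_string());
    }

    // COMMIT: publish the whole chunk at once
    table.extend(staged);
    Ok(())
}
```

Writing `["a", "b"]`, then `["c", "", "d"]`, then `["e"]` leaves the table holding `a`, `b`, and `e`: the middle chunk is rolled back as a unit, so `c` never appears.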
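A writer that wraps each chunk in a single database transaction, using sqlx:

```rust
use serde::Serialize;
use spring_batch_rs::core::item::ItemWriter;
use spring_batch_rs::BatchError;
use sqlx::PgPool;
use tokio::runtime::Runtime;

struct TransactionalWriter {
    pool: PgPool,
    // `write` is synchronous, so the async sqlx calls run on this runtime.
    // block_on must not be called from inside another Tokio runtime.
    runtime: Runtime,
}

impl<T> ItemWriter<T> for TransactionalWriter
where
    T: Serialize,
{
    fn write(&mut self, items: &[T]) -> Result<(), BatchError> {
        let db_error = |e: sqlx::Error| BatchError::DatabaseError(e.to_string());

        self.runtime.block_on(async {
            // Start one transaction for the whole chunk
            let mut tx = self.pool.begin().await.map_err(db_error)?;

            // Attempt to write all items (`items` is a placeholder table name;
            // binding a JSON value requires sqlx's `json` feature)
            for item in items {
                let value = serde_json::to_value(item)
                    .map_err(|e| BatchError::ProcessingError(e.to_string()))?;

                // On error, `?` returns early and dropping the uncommitted
                // `tx` rolls the transaction back
                sqlx::query("INSERT INTO items (data) VALUES ($1)")
                    .bind(value)
                    .execute(&mut *tx) // sqlx 0.7+ syntax
                    .await
                    .map_err(db_error)?;
            }

            // Commit only if every insert succeeded
            tx.commit().await.map_err(db_error)
        })
    }
}
```

Save progress to resume after failure: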
```rust
use serde::{Deserialize, Serialize};
use spring_batch_rs::core::item::ItemReader;
use spring_batch_rs::BatchError;
use std::fs;

#[derive(Serialize, Deserialize)]
struct Checkpoint {
    last_processed_id: u32,
    processed_count: usize,
    error_count: usize,
}

impl Checkpoint {
    fn save(&self, path: &str) -> Result<(), BatchError> {
        let json = serde_json::to_string_pretty(self)?;
        fs::write(path, json)?;
        Ok(())
    }

    fn load(path: &str) -> Result<Self, BatchError> {
        let json = fs::read_to_string(path)?;
        Ok(serde_json::from_str(&json)?)
    }
}

struct ResumableReader {
    checkpoint_file: String,
    last_id: u32,
    read_count: usize,
    checkpoint_loaded: bool,
}

impl ItemReader<Record> for ResumableReader {
    fn read(&mut self) -> Result<Option<Record>, BatchError> {
        // Load the checkpoint once, on the first read
        if !self.checkpoint_loaded {
            self.checkpoint_loaded = true;
            if let Ok(checkpoint) = Checkpoint::load(&self.checkpoint_file) {
                self.last_id = checkpoint.last_processed_id;
                self.read_count = checkpoint.processed_count;
                println!("Resuming from ID: {}", self.last_id);
            }
        }

        // Read the next record after `last_id`
        // (`fetch_next_record` is your own data-access method)
        let record = self.fetch_next_record()?;

        if let Some(ref rec) = record {
            self.last_id = rec.id;
            self.read_count += 1;

            // Save a checkpoint every 1000 records (counted, since IDs may have gaps).
            // Caveat: this runs at read time, before the chunk is committed;
            // saving from the writer after a successful commit avoids
            // skipping uncommitted records on restart.
            if self.read_count % 1000 == 0 {
                Checkpoint {
                    last_processed_id: rec.id,
                    processed_count: self.read_count,
                    error_count: 0,
                }
                .save(&self.checkpoint_file)?;
            }
        }

        Ok(record)
    }
}
```

Move failed items to a separate queue for later review:
```rust
use serde::Serialize;
use spring_batch_rs::core::item::ItemProcessor;
use spring_batch_rs::core::step::StepBuilder;
use spring_batch_rs::BatchError;
use std::sync::{Arc, Mutex};

struct DeadLetterWriter<T> {
    file_path: String,
    failed_items: Arc<Mutex<Vec<T>>>,
}

impl<T> DeadLetterWriter<T>
where
    T: Serialize + Clone,
{
    fn new(file_path: String) -> Self {
        Self {
            file_path,
            failed_items: Arc::new(Mutex::new(Vec::new())),
        }
    }

    fn add_failed_item(&self, item: T) {
        self.failed_items.lock().unwrap().push(item);
    }

    fn flush(&self) -> Result<(), BatchError> {
        let items = self.failed_items.lock().unwrap();

        if items.is_empty() {
            return Ok(());
        }

        let json = serde_json::to_string_pretty(&*items)?;
        std::fs::write(&self.file_path, json)?;

        println!("Wrote {} failed items to {}", items.len(), self.file_path);

        Ok(())
    }
}

// `Item` is your own record type (it must be Serialize + Clone)
struct FaultTolerantProcessor<P> {
    inner: P,
    dead_letter_writer: Arc<DeadLetterWriter<Item>>,
}

impl<P> ItemProcessor<Item, Item> for FaultTolerantProcessor<P>
where
    P: ItemProcessor<Item, Item>,
{
    fn process(&self, item: Item) -> Result<Option<Item>, BatchError> {
        match self.inner.process(item.clone()) {
            Ok(result) => Ok(result),
            Err(e) => {
                // Send the failed item to the dead letter queue
                println!("Item added to dead letter queue: {}", e);
                self.dead_letter_writer.add_failed_item(item);

                // Skip the item instead of returning the error
                Ok(None)
            }
        }
    }
}

// Usage
fn process_with_dead_letter_queue() -> Result<(), Box<dyn std::error::Error>> {
    let dlq = Arc::new(DeadLetterWriter::new("failed_items.json".to_string()));

    let processor = FaultTolerantProcessor {
        inner: MyProcessor::new(),
        dead_letter_writer: dlq.clone(),
    };

    let step = StepBuilder::new("process-with-dlq")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    // Flush the dead letter queue even if the step fails, then report the result
    let result = step.execute();
    dlq.flush()?;
    result?;

    Ok(())
}
```

Choose the Right Strategy
Set Appropriate Limits
Log Everything
Test Error Scenarios
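One way to test error scenarios is to inject failures deterministically and assert on the resulting counts. The sketch below is framework-free; `FlakyProcessor`, `RunCounts`, `run_chunk`, and the failing IDs are illustrative assumptions, and the skip-limit rule mirrors the "skip until the limit is exceeded" behavior described earlier.

```rust
use std::collections::HashSet;

/// A test double that fails on a fixed set of IDs.
struct FlakyProcessor {
    fail_on: HashSet<u32>,
}

impl FlakyProcessor {
    fn process(&self, id: u32) -> Result<u32, String> {
        if self.fail_on.contains(&id) {
            Err(format!("injected failure for id {}", id))
        } else {
            Ok(id * 10)
        }
    }
}

/// Counts from a simulated run.
#[derive(Debug, PartialEq)]
struct RunCounts {
    written: usize,
    skipped: usize,
}

/// Process every id, skipping failures until `skip_limit` is exceeded.
fn run_chunk(
    processor: &FlakyProcessor,
    ids: &[u32],
    skip_limit: usize,
) -> Result<RunCounts, String> {
    let mut counts = RunCounts { written: 0, skipped: 0 };
    for &id in ids {
        match processor.process(id) {
            Ok(_) => counts.written += 1,
            Err(e) => {
                counts.skipped += 1;
                // Surface the injected error once the limit is exceeded
                if counts.skipped > skip_limit {
                    return Err(e);
                }
            }
        }
    }
    Ok(counts)
}
```

A test can then assert both sides of the limit: with failures injected at IDs 2 and 4, a limit of 2 completes with three items written and two skipped, while a limit of 1 fails on ID 4.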
```mermaid
graph TB
Start[Error Occurs] --> DataError{Data Quality<br/>Issue?}
DataError -->|Yes| Acceptable{Acceptable<br/>Data Loss?}
Acceptable -->|Yes| UseSkip[Use Skip Strategy<br/>skip_limit]
Acceptable -->|No| UseValidation[Pre-validate Data<br/>Fail-Fast]
DataError -->|No| Transient{Transient<br/>Failure?}
Transient -->|Yes| UseRetry[Use Retry Strategy<br/>retry_limit + backoff]
Transient -->|No| Critical{Critical<br/>Operation?}
Critical -->|Yes| FailFast[Fail-Fast Strategy<br/>No skip/retry]
Critical -->|No| Hybrid[Hybrid Approach<br/>Skip + Retry]
style UseSkip fill:#10b981,color:#fff
style UseRetry fill:#f59e0b,color:#fff
style FailFast fill:#ef4444,color:#fff
style UseValidation fill:#3b82f6,color:#fff
style Hybrid fill:#8b5cf6,color:#fff
```
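The decision tree can also be written as a plain function, which can help when the choice is driven by configuration. `Strategy`, `ErrorProfile`, and `choose_strategy` are illustrative names, not part of spring-batch-rs; the branches follow the diagram.

```rust
/// Strategies from the decision tree.
#[derive(Debug, PartialEq)]
enum Strategy {
    Skip,
    PreValidateFailFast,
    RetryWithBackoff,
    FailFast,
    Hybrid,
}

/// Facts about the failure mode being planned for.
#[derive(Clone, Copy)]
struct ErrorProfile {
    data_quality_issue: bool,
    acceptable_data_loss: bool,
    transient: bool,
    critical: bool,
}

fn choose_strategy(p: &ErrorProfile) -> Strategy {
    if p.data_quality_issue {
        if p.acceptable_data_loss {
            Strategy::Skip // skip_limit
        } else {
            Strategy::PreValidateFailFast // validation step, then fail fast
        }
    } else if p.transient {
        Strategy::RetryWithBackoff // retry_limit + backoff
    } else if p.critical {
        Strategy::FailFast // no skip or retry
    } else {
        Strategy::Hybrid // skip + retry
    }
}
```

Note that `acceptable_data_loss` is only consulted for data quality issues, exactly as in the diagram.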