
Error Handling

Batch jobs process thousands or millions of records. Robust error handling ensures your jobs can recover from failures gracefully without losing progress.

graph TB
    Error[Error Occurs] --> Type{Error Type?}

    Type -->|Data Error| Skip[Skip Strategy<br/>Continue Processing]
    Type -->|Transient Error| Retry[Retry Strategy<br/>Attempt Again]
    Type -->|Fatal Error| Fail[Fail-Fast Strategy<br/>Stop Immediately]

    Skip --> Log[Log Skipped Item]
    Retry --> Backoff[Exponential Backoff]
    Fail --> Rollback[Rollback Transaction]

    Log --> Check{Skip Limit<br/>Reached?}
    Check -->|No| Continue[Continue Job]
    Check -->|Yes| Fail

    Backoff --> RetryCheck{Retry Limit<br/>Reached?}
    RetryCheck -->|No| Retry
    RetryCheck -->|Yes| Fail

    style Skip fill:#10b981,color:#fff
    style Retry fill:#f59e0b,color:#fff
    style Fail fill:#ef4444,color:#fff

Skip Strategy

Best for: Data quality issues, validation failures

  • Skip invalid records
  • Continue processing
  • Log errors for review

Retry Strategy

Best for: Transient failures, network issues

  • Retry failed operations
  • Exponential backoff
  • Limit retry attempts

Fail-Fast Strategy

Best for: Critical errors, data integrity

  • Stop immediately
  • Rollback transaction
  • Preserve data consistency
Configure a skip limit on the step to tolerate a bounded number of invalid records:

use spring_batch_rs::core::step::{Step, StepBuilder};
use spring_batch_rs::item::csv::CsvItemReaderBuilder;
use spring_batch_rs::BatchError;
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
struct Transaction {
    id: u32,
    amount: f64,
    account: String,
}

fn build_fault_tolerant_step() -> Result<Step, BatchError> {
    let reader = CsvItemReaderBuilder::<Transaction>::new()
        .has_headers(true)
        .from_path("transactions.csv")?;

    // `DatabaseWriter` and `pool` stand in for your own writer and connection pool.
    let writer = DatabaseWriter::new(pool);

    Ok(StepBuilder::new("process-transactions")
        .chunk(100)
        .reader(&reader)
        .writer(&writer)
        .skip_limit(50) // Skip up to 50 invalid records
        .build())
}

Behavior:

  • First 50 errors → skip and continue
  • 51st error → job fails
  • All skipped items are logged
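
The cut-off can be sketched as a self-contained loop. This toy `run_with_skip_limit` function is not part of spring_batch_rs; it only models the step's skip bookkeeping:

```rust
/// Toy model of skip-limit bookkeeping: count errors, abort once the limit is exceeded.
fn run_with_skip_limit(outcomes: &[Result<(), &str>], skip_limit: usize) -> Result<usize, String> {
    let mut skipped = 0;
    for outcome in outcomes {
        if outcome.is_err() {
            skipped += 1;
            if skipped > skip_limit {
                return Err(format!("skip limit {} exceeded", skip_limit));
            }
            // Otherwise: log the item and continue with the next one.
        }
    }
    Ok(skipped)
}

fn main() {
    // Exactly 50 errors with a limit of 50: the job completes.
    let fifty_errors: Vec<Result<(), &str>> = vec![Err("bad record"); 50];
    assert_eq!(run_with_skip_limit(&fifty_errors, 50), Ok(50));

    // A 51st error exceeds the limit and fails the job.
    let fifty_one: Vec<Result<(), &str>> = vec![Err("bad record"); 51];
    assert!(run_with_skip_limit(&fifty_one, 50).is_err());
}
```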

Skip only specific error types:

use spring_batch_rs::core::step::{SkipPolicy, Step, StepBuilder};
use spring_batch_rs::BatchError;

struct SelectiveSkipPolicy;

impl SkipPolicy for SelectiveSkipPolicy {
    fn should_skip(&self, error: &BatchError, skip_count: usize) -> bool {
        // Skip data validation errors, but never I/O or database errors
        match error {
            BatchError::ValidationError(_) => skip_count < 100,
            BatchError::ProcessingError(_) => skip_count < 50,
            BatchError::IoError(_) => false,
            BatchError::DatabaseError(_) => false,
            _ => false,
        }
    }
}

// Assumes `reader`, `processor`, and `writer` are in scope.
fn build_selective_skip_step() -> Step {
    StepBuilder::new("selective-skip")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_policy(SelectiveSkipPolicy)
        .build()
}

Collect and report all skipped items:

use std::sync::{Arc, Mutex};
use spring_batch_rs::core::item::ItemProcessor;
use spring_batch_rs::core::step::StepBuilder;
use spring_batch_rs::BatchError;
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize, Clone)]
struct Record {
    id: u32,
    data: String,
}

struct ValidatingProcessor {
    error_log: Arc<Mutex<Vec<String>>>,
}

impl ValidatingProcessor {
    fn new() -> Self {
        Self {
            error_log: Arc::new(Mutex::new(Vec::new())),
        }
    }

    fn get_errors(&self) -> Vec<String> {
        self.error_log.lock().unwrap().clone()
    }
}

impl ItemProcessor<Record, Record> for ValidatingProcessor {
    fn process(&self, record: Record) -> Result<Option<Record>, BatchError> {
        // Validation logic
        if record.data.is_empty() {
            let error_msg = format!("Record {} has empty data", record.id);
            self.error_log.lock().unwrap().push(error_msg.clone());
            return Err(BatchError::ValidationError(error_msg));
        }
        if record.data.len() > 1000 {
            let error_msg = format!("Record {} data too long", record.id);
            self.error_log.lock().unwrap().push(error_msg.clone());
            return Err(BatchError::ValidationError(error_msg));
        }
        Ok(Some(record))
    }
}

// Usage (assumes `reader` and `writer` are in scope)
fn process_with_error_logging() -> Result<(), Box<dyn std::error::Error>> {
    let processor = ValidatingProcessor::new();
    let error_log = processor.error_log.clone();

    let step = StepBuilder::new("validate-records")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(100)
        .build();

    let execution = step.execute()?;

    // Write error log
    let errors = error_log.lock().unwrap();
    if !errors.is_empty() {
        std::fs::write("skipped_records.log", errors.join("\n"))?;
        println!("Skipped {} records - see skipped_records.log", errors.len());
    }

    println!("Processed: {}", execution.write_count);
    println!("Skipped: {}", execution.skip_count);
    Ok(())
}
Configure a retry limit on the step:

use spring_batch_rs::core::step::{Step, StepBuilder};

// Assumes `reader`, `processor`, and `writer` are in scope.
fn build_retry_step() -> Step {
    StepBuilder::new("process-with-retry")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .retry_limit(3) // Retry each failed item up to 3 times
        .build()
}

Behavior:

sequenceDiagram
    participant Step
    participant Processor
    participant ErrorHandler

    Step->>Processor: process(item)
    Processor-->>ErrorHandler: Error
    ErrorHandler->>ErrorHandler: Attempt 1 failed
    ErrorHandler->>Processor: retry
    Processor-->>ErrorHandler: Error
    ErrorHandler->>ErrorHandler: Attempt 2 failed
    ErrorHandler->>Processor: retry
    Processor-->>ErrorHandler: Error
    ErrorHandler->>ErrorHandler: Attempt 3 failed
    ErrorHandler->>Step: Max retries reached
    Step->>Step: Skip or Fail

Add delays between retries to handle transient issues:

use spring_batch_rs::core::item::ItemProcessor;
use spring_batch_rs::core::step::{Step, StepBuilder};
use spring_batch_rs::BatchError;
use std::thread;
use std::time::Duration;

struct RetryingProcessor<P> {
    inner: P,
    max_retries: u32,
    initial_backoff_ms: u64,
}

impl<P> RetryingProcessor<P> {
    fn is_retryable(error: &BatchError) -> bool {
        matches!(
            error,
            BatchError::NetworkError(_)
                | BatchError::TemporaryError(_)
                | BatchError::TimeoutError(_)
        )
    }
}

impl<I, O, P> ItemProcessor<I, O> for RetryingProcessor<P>
where
    P: ItemProcessor<I, O>,
    I: Clone,
{
    fn process(&self, item: I) -> Result<Option<O>, BatchError> {
        let mut attempts = 0;
        let mut backoff = self.initial_backoff_ms;
        loop {
            match self.inner.process(item.clone()) {
                Ok(result) => return Ok(result),
                Err(e) if Self::is_retryable(&e) && attempts < self.max_retries => {
                    attempts += 1;
                    println!("Retry attempt {} after {} ms", attempts, backoff);
                    thread::sleep(Duration::from_millis(backoff));
                    // Exponential backoff: 100 ms, 200 ms, 400 ms, 800 ms...
                    backoff *= 2;
                }
                Err(e) => return Err(e),
            }
        }
    }
}

// Usage (assumes `MyProcessor`, `reader`, and `writer` are in scope)
fn build_step_with_backoff() -> Step {
    let processor = RetryingProcessor {
        inner: MyProcessor::new(),
        max_retries: 5,
        initial_backoff_ms: 100,
    };

    StepBuilder::new("retry-with-backoff")
        .chunk(50)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build()
}

Stop retrying if too many consecutive failures occur:

use spring_batch_rs::core::item::ItemProcessor;
use spring_batch_rs::BatchError;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

struct CircuitBreakerProcessor<P> {
    inner: P,
    consecutive_failures: Arc<AtomicUsize>,
    circuit_breaker_threshold: usize,
    circuit_open: Arc<AtomicBool>,
}

impl<I, O, P> ItemProcessor<I, O> for CircuitBreakerProcessor<P>
where
    P: ItemProcessor<I, O>,
{
    fn process(&self, item: I) -> Result<Option<O>, BatchError> {
        // Reject immediately while the circuit is open
        if self.circuit_open.load(Ordering::SeqCst) {
            return Err(BatchError::ProcessingError(
                "Circuit breaker is open - too many failures".to_string(),
            ));
        }

        match self.inner.process(item) {
            Ok(result) => {
                // Reset failure count on success
                self.consecutive_failures.store(0, Ordering::SeqCst);
                Ok(result)
            }
            Err(e) => {
                // Increment failure count and trip the breaker at the threshold
                let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;
                if failures >= self.circuit_breaker_threshold {
                    self.circuit_open.store(true, Ordering::SeqCst);
                    println!("⚠️ Circuit breaker opened after {} failures", failures);
                }
                Err(e)
            }
        }
    }
}

For operations where any error is unacceptable:

// Assumes `reader`, `processor`, and `writer` are in scope.
fn build_fail_fast_step() -> Step {
    StepBuilder::new("critical-processing")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        // No skip_limit - any error fails the job
        .build()
}

Validate all data before starting processing:

use spring_batch_rs::core::step::{RepeatStatus, StepBuilder, StepExecution, Tasklet};
use spring_batch_rs::BatchError;

struct ValidationTasklet {
    file_path: String,
}

impl Tasklet for ValidationTasklet {
    fn execute(&self, _: &StepExecution) -> Result<RepeatStatus, BatchError> {
        println!("Validating data file: {}", self.file_path);

        let mut reader = csv::Reader::from_path(&self.file_path)?;
        let mut error_count = 0;
        let mut errors = Vec::new();

        for (line_num, result) in reader.records().enumerate() {
            match result {
                Ok(record) => {
                    // Validate record structure
                    if record.len() != 5 {
                        errors.push(format!(
                            "Line {}: Expected 5 fields, got {}",
                            line_num + 1,
                            record.len()
                        ));
                        error_count += 1;
                    }
                }
                Err(e) => {
                    errors.push(format!("Line {}: {}", line_num + 1, e));
                    error_count += 1;
                }
            }
        }

        if error_count > 0 {
            let error_message = format!(
                "Validation failed with {} errors:\n{}",
                error_count,
                errors.join("\n")
            );
            return Err(BatchError::ValidationError(error_message));
        }

        println!("✓ Validation successful");
        Ok(RepeatStatus::Finished)
    }
}

// Usage: validate before processing (assumes `Job`, `JobBuilder`,
// `reader`, `processor`, and `writer` are in scope)
fn build_validated_job() -> Job {
    let validation_step = StepBuilder::new("validate")
        .tasklet(&ValidationTasklet {
            file_path: "data.csv".to_string(),
        })
        .build();

    let processing_step = StepBuilder::new("process")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    JobBuilder::new()
        .start(&validation_step) // Validate first
        .next(&processing_step) // Then process
        .build()
}
Chunk writes are transactional: when an error occurs, the current chunk is rolled back before the error handler decides whether to skip or fail:

sequenceDiagram
    participant Step
    participant DB
    participant ErrorHandler

    Step->>DB: BEGIN TRANSACTION

    loop For each chunk
        Step->>Step: Read items
        Step->>Step: Process items
        Step->>DB: Write items

        alt All successful
            Step->>DB: COMMIT
        else Error occurred
            Step->>ErrorHandler: Handle error
            ErrorHandler->>DB: ROLLBACK
            ErrorHandler-->>Step: Skip or Fail
        end
    end
A writer can make each chunk atomic by wrapping it in a database transaction. This is a sketch using sqlx; the `records` table, the error mapping, and the blocking bridge to async code are illustrative:

use serde::Serialize;
use spring_batch_rs::core::item::ItemWriter;
use spring_batch_rs::BatchError;
use sqlx::PgPool;

struct TransactionalWriter {
    pool: PgPool,
}

impl<T> ItemWriter<T> for TransactionalWriter
where
    T: Serialize,
{
    fn write(&mut self, items: &[T]) -> Result<(), BatchError> {
        // The writer API here is synchronous, so block on the async sqlx calls.
        // Assumes a Tokio runtime is available; adapt to your async setup.
        tokio::runtime::Handle::current().block_on(async {
            // Start transaction
            let mut tx = self
                .pool
                .begin()
                .await
                .map_err(|e| BatchError::DatabaseError(e.to_string()))?;

            // Attempt to write all items
            for item in items {
                let value = serde_json::to_value(item)
                    .map_err(|e| BatchError::ProcessingError(e.to_string()))?;
                let result = sqlx::query("INSERT INTO records (data) VALUES ($1)")
                    .bind(value)
                    .execute(&mut *tx)
                    .await;

                if let Err(e) = result {
                    // Rollback on any error
                    tx.rollback()
                        .await
                        .map_err(|e| BatchError::DatabaseError(e.to_string()))?;
                    return Err(BatchError::DatabaseError(e.to_string()));
                }
            }

            // Commit if all succeeded
            tx.commit()
                .await
                .map_err(|e| BatchError::DatabaseError(e.to_string()))
        })
    }
}

Save progress to resume after failure:

use spring_batch_rs::core::item::ItemReader;
use spring_batch_rs::BatchError;
use serde::{Deserialize, Serialize};
use std::fs;

#[derive(Serialize, Deserialize)]
struct Checkpoint {
    last_processed_id: u32,
    processed_count: usize,
    error_count: usize,
}

impl Checkpoint {
    fn save(&self, path: &str) -> Result<(), BatchError> {
        let json = serde_json::to_string_pretty(self)?;
        fs::write(path, json)?;
        Ok(())
    }

    fn load(path: &str) -> Result<Self, BatchError> {
        let json = fs::read_to_string(path)?;
        Ok(serde_json::from_str(&json)?)
    }
}

struct ResumableReader {
    checkpoint_file: String,
    last_id: u32,
}

impl ItemReader<Record> for ResumableReader {
    fn read(&mut self) -> Result<Option<Record>, BatchError> {
        // Load the checkpoint on the first read, if one exists
        if self.last_id == 0 {
            if let Ok(checkpoint) = Checkpoint::load(&self.checkpoint_file) {
                self.last_id = checkpoint.last_processed_id;
                println!("Resuming from ID: {}", self.last_id);
            }
        }

        // Read the next record after the last checkpoint
        // (`fetch_next_record` is an assumed helper on this reader)
        let record = self.fetch_next_record()?;

        if let Some(ref rec) = record {
            // Save a checkpoint periodically
            if rec.id % 1000 == 0 {
                Checkpoint {
                    last_processed_id: rec.id,
                    processed_count: rec.id as usize,
                    error_count: 0,
                }
                .save(&self.checkpoint_file)?;
            }
        }

        Ok(record)
    }
}

Move failed items to a separate queue for later review:

use spring_batch_rs::core::item::ItemProcessor;
use spring_batch_rs::core::step::StepBuilder;
use spring_batch_rs::BatchError;
use serde::Serialize;
use std::sync::{Arc, Mutex};

struct DeadLetterWriter<T> {
    file_path: String,
    failed_items: Arc<Mutex<Vec<T>>>,
}

impl<T> DeadLetterWriter<T>
where
    T: Serialize + Clone,
{
    fn new(file_path: String) -> Self {
        Self {
            file_path,
            failed_items: Arc::new(Mutex::new(Vec::new())),
        }
    }

    fn add_failed_item(&self, item: T) {
        self.failed_items.lock().unwrap().push(item);
    }

    fn flush(&self) -> Result<(), BatchError> {
        let items = self.failed_items.lock().unwrap();
        if items.is_empty() {
            return Ok(());
        }
        let json = serde_json::to_string_pretty(&*items)?;
        std::fs::write(&self.file_path, json)?;
        println!("Wrote {} failed items to {}", items.len(), self.file_path);
        Ok(())
    }
}

// `Item` stands in for your record type.
struct FaultTolerantProcessor<P> {
    inner: P,
    dead_letter_writer: Arc<DeadLetterWriter<Item>>,
}

impl<P> ItemProcessor<Item, Item> for FaultTolerantProcessor<P>
where
    P: ItemProcessor<Item, Item>,
{
    fn process(&self, item: Item) -> Result<Option<Item>, BatchError> {
        match self.inner.process(item.clone()) {
            Ok(result) => Ok(result),
            Err(e) => {
                // Route the failed item to the dead letter queue, then skip it
                self.dead_letter_writer.add_failed_item(item);
                println!("Item added to dead letter queue: {}", e);
                Ok(None)
            }
        }
    }
}

// Usage (assumes `MyProcessor`, `reader`, and `writer` are in scope)
fn process_with_dead_letter_queue() -> Result<(), Box<dyn std::error::Error>> {
    let dlq = Arc::new(DeadLetterWriter::new("failed_items.json".to_string()));
    let processor = FaultTolerantProcessor {
        inner: MyProcessor::new(),
        dead_letter_writer: dlq.clone(),
    };

    let step = StepBuilder::new("process-with-dlq")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    step.execute()?;

    // Flush dead letter queue
    dlq.flush()?;
    Ok(())
}

Choose the Right Strategy

  • Skip: Data quality issues, optional validations
  • Retry: Network errors, temporary failures
  • Fail-Fast: Critical operations, data integrity

Set Appropriate Limits

  • Skip Limit: Based on acceptable data loss
  • Retry Limit: 3-5 attempts is typical
  • Timeout: Prevent infinite retries
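
To pick sensible limits, it helps to know the worst-case time a single item can spend retrying. A small sketch in plain Rust (independent of the framework), assuming a doubling backoff like the `RetryingProcessor` shown earlier:

```rust
/// Delay before each retry, doubling from `initial_ms`: 100, 200, 400, ...
fn backoff_schedule(initial_ms: u64, retries: u32) -> Vec<u64> {
    (0..retries).map(|attempt| initial_ms << attempt).collect()
}

/// Worst-case total time an item spends sleeping across all retries.
fn worst_case_delay_ms(initial_ms: u64, retries: u32) -> u64 {
    backoff_schedule(initial_ms, retries).iter().sum()
}

fn main() {
    assert_eq!(backoff_schedule(100, 4), vec![100, 200, 400, 800]);
    // 5 retries at 100 ms initial backoff: at most 3.1 s of sleeping per item.
    assert_eq!(worst_case_delay_ms(100, 5), 3100);
    println!("worst case: {} ms", worst_case_delay_ms(100, 5));
}
```

Multiplying this per-item bound by the chunk size gives a rough upper bound on how long a chunk can stall, which is what a timeout should guard against.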

Log Everything

  • Log all skipped items with reasons
  • Track error patterns over time
  • Generate error reports
  • Monitor error rates
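
A minimal error-rate tracker along these lines (a hypothetical helper, not part of spring_batch_rs) can feed dashboards or alerts:

```rust
/// Running counters for monitoring a job's error rate.
#[derive(Default)]
struct ErrorStats {
    processed: usize,
    failed: usize,
}

impl ErrorStats {
    /// Record one item outcome.
    fn record(&mut self, ok: bool) {
        self.processed += 1;
        if !ok {
            self.failed += 1;
        }
    }

    /// Fraction of items that failed, in [0.0, 1.0].
    fn error_rate(&self) -> f64 {
        if self.processed == 0 {
            0.0
        } else {
            self.failed as f64 / self.processed as f64
        }
    }
}

fn main() {
    let mut stats = ErrorStats::default();
    for i in 0..100 {
        stats.record(i % 20 != 0); // every 20th item fails
    }
    assert_eq!(stats.failed, 5);
    assert!((stats.error_rate() - 0.05).abs() < 1e-9);
    println!("error rate: {:.1}%", stats.error_rate() * 100.0);
}
```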

Test Error Scenarios

  • Simulate failures in tests
  • Verify retry behavior
  • Test transaction rollback
  • Validate error recovery
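
Retry behavior in particular is easy to unit-test with a stub that fails a fixed number of times before succeeding. A self-contained sketch (the `retry` helper mimics a `retry_limit`-style wrapper; it is not a framework API):

```rust
use std::cell::Cell;

/// Stub operation that fails `failures` times, then succeeds - a simulated transient error.
struct Flaky {
    failures: Cell<u32>,
}

impl Flaky {
    fn call(&self) -> Result<&'static str, &'static str> {
        if self.failures.get() > 0 {
            self.failures.set(self.failures.get() - 1);
            Err("transient failure")
        } else {
            Ok("done")
        }
    }
}

/// One initial attempt plus up to `limit` retries.
fn retry<T, E>(limit: u32, mut op: impl FnMut() -> Result<T, E>) -> Result<T, E> {
    let mut last = op();
    for _ in 0..limit {
        if last.is_ok() {
            break;
        }
        last = op();
    }
    last
}

fn main() {
    // Two transient failures recover within a retry limit of 3.
    let flaky = Flaky { failures: Cell::new(2) };
    assert_eq!(retry(3, || flaky.call()), Ok("done"));

    // Five failures exhaust the same limit, so the error surfaces.
    let very_flaky = Flaky { failures: Cell::new(5) };
    assert!(retry(3, || very_flaky.call()).is_err());
}
```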
The following decision tree summarizes how to choose a strategy:

graph TB
    Start[Error Occurs] --> DataError{Data Quality<br/>Issue?}

    DataError -->|Yes| Acceptable{Acceptable<br/>Data Loss?}
    Acceptable -->|Yes| UseSkip[Use Skip Strategy<br/>skip_limit]
    Acceptable -->|No| UseValidation[Pre-validate Data<br/>Fail-Fast]

    DataError -->|No| Transient{Transient<br/>Failure?}
    Transient -->|Yes| UseRetry[Use Retry Strategy<br/>retry_limit + backoff]
    Transient -->|No| Critical{Critical<br/>Operation?}

    Critical -->|Yes| FailFast[Fail-Fast Strategy<br/>No skip/retry]
    Critical -->|No| Hybrid[Hybrid Approach<br/>Skip + Retry]

    style UseSkip fill:#10b981,color:#fff
    style UseRetry fill:#f59e0b,color:#fff
    style FailFast fill:#ef4444,color:#fff
    style UseValidation fill:#3b82f6,color:#fff
    style Hybrid fill:#8b5cf6,color:#fff