Skip Strategy
Best for: Data quality issues, validation failures
- Skip invalid records
- Continue processing
- Log errors for review
Batch jobs process thousands or millions of records. Robust error handling ensures your jobs can recover from failures gracefully without losing progress.
```mermaid
graph TB
    Error[Error Occurs] --> Type{Error Type?}
    Type -->|Data Error| Skip[Skip Strategy<br/>Continue Processing]
    Type -->|Transient Error| Retry[Retry Strategy<br/>Attempt Again]
    Type -->|Fatal Error| Fail[Fail-Fast Strategy<br/>Stop Immediately]

    Skip --> Log[Log Skipped Item]
    Retry --> Backoff[Exponential Backoff]
    Fail --> Rollback[Rollback Transaction]

    Log --> Check{Skip Limit<br/>Reached?}
    Check -->|No| Continue[Continue Job]
    Check -->|Yes| Fail

    Backoff --> RetryCheck{Retry Limit<br/>Reached?}
    RetryCheck -->|No| Retry
    RetryCheck -->|Yes| Fail

    style Skip fill:#10b981,color:#fff
    style Retry fill:#f59e0b,color:#fff
    style Fail fill:#ef4444,color:#fff
```
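The decision flow above can be sketched as a small dispatcher. The enums and function here are illustrative only, not spring-batch-rs types; they simply mirror the branches of the diagram:

```rust
// Illustrative error classification: data errors are skipped up to a limit,
// transient errors are retried up to a limit, everything else fails the step.
#[derive(Debug)]
enum JobError {
    Data(String),      // bad record: candidate for skipping
    Transient(String), // network blip: candidate for retrying
    Fatal(String),     // integrity problem: always fail
}

#[derive(Debug, PartialEq)]
enum Action {
    Skip,
    Retry,
    Fail,
}

fn dispatch(
    error: &JobError,
    skip_count: usize,
    skip_limit: usize,
    attempts: u32,
    retry_limit: u32,
) -> Action {
    match error {
        // Skip data errors until the skip limit is reached, then fail
        JobError::Data(_) if skip_count < skip_limit => Action::Skip,
        // Retry transient errors until the retry limit is reached, then fail
        JobError::Transient(_) if attempts < retry_limit => Action::Retry,
        _ => Action::Fail,
    }
}
```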
**Skip Strategy**: best for data quality issues and validation failures.

**Retry Strategy**: best for transient failures and network issues.

**Fail-Fast Strategy**: best for critical errors and data integrity.
```rust
use serde::{Deserialize, Serialize};
use spring_batch_rs::core::step::StepBuilder;
use spring_batch_rs::item::csv::CsvItemReaderBuilder;
use spring_batch_rs::BatchError;

#[derive(Debug, Deserialize, Serialize)]
struct Transaction {
    id: u32,
    amount: f64,
    account: String,
}

fn build_fault_tolerant_step() -> Result<Step, BatchError> {
    let reader = CsvItemReaderBuilder::<Transaction>::new()
        .has_headers(true)
        .from_path("transactions.csv")?;

    // DatabaseWriter and pool are application-specific
    let writer = DatabaseWriter::new(pool);

    Ok(StepBuilder::new("process-transactions")
        .chunk(100)
        .reader(&reader)
        .writer(&writer)
        .skip_limit(50) // Skip up to 50 invalid records
        .build())
}
```

Behavior: invalid records are skipped and logged, and the step keeps processing; once more than 50 records have failed, the skip limit is exceeded and the step fails.
Skip only specific error types:
```rust
use spring_batch_rs::core::step::{SkipPolicy, StepBuilder};
use spring_batch_rs::BatchError;

struct SelectiveSkipPolicy;

impl SkipPolicy for SelectiveSkipPolicy {
    fn should_skip(&self, error: &BatchError, skip_count: usize) -> bool {
        // Skip data validation errors, but not I/O errors
        match error {
            BatchError::ValidationError(_) => skip_count < 100,
            BatchError::ProcessingError(_) => skip_count < 50,
            BatchError::IoError(_) => false,       // Never skip I/O errors
            BatchError::DatabaseError(_) => false, // Never skip DB errors
            _ => false,
        }
    }
}

fn build_selective_skip_step() -> Step {
    StepBuilder::new("selective-skip")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_policy(SelectiveSkipPolicy)
        .build()
}
```

Collect and report all skipped items:
```rust
use serde::{Deserialize, Serialize};
use spring_batch_rs::core::item::ItemProcessor;
use spring_batch_rs::core::step::StepBuilder;
use spring_batch_rs::BatchError;
use std::sync::{Arc, Mutex};

#[derive(Debug, Deserialize, Serialize, Clone)]
struct Record {
    id: u32,
    data: String,
}

struct ValidatingProcessor {
    error_log: Arc<Mutex<Vec<String>>>,
}

impl ValidatingProcessor {
    fn new() -> Self {
        Self {
            error_log: Arc::new(Mutex::new(Vec::new())),
        }
    }

    fn get_errors(&self) -> Vec<String> {
        self.error_log.lock().unwrap().clone()
    }
}

impl ItemProcessor<Record, Record> for ValidatingProcessor {
    fn process(&self, record: Record) -> Result<Option<Record>, BatchError> {
        // Validation logic
        if record.data.is_empty() {
            let error_msg = format!("Record {} has empty data", record.id);
            self.error_log.lock().unwrap().push(error_msg.clone());
            return Err(BatchError::ValidationError(error_msg));
        }

        if record.data.len() > 1000 {
            let error_msg = format!("Record {} data too long", record.id);
            self.error_log.lock().unwrap().push(error_msg.clone());
            return Err(BatchError::ValidationError(error_msg));
        }

        Ok(Some(record))
    }
}

// Usage
fn process_with_error_logging() -> Result<(), Box<dyn std::error::Error>> {
    let processor = ValidatingProcessor::new();
    let error_log = processor.error_log.clone();

    let step = StepBuilder::new("validate-records")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(100)
        .build();

    let execution = step.execute()?;

    // Write error log
    let errors = error_log.lock().unwrap();
    if !errors.is_empty() {
        std::fs::write("skipped_records.log", errors.join("\n"))?;
        println!("Skipped {} records - see skipped_records.log", errors.len());
    }

    println!("Processed: {}", execution.write_count);
    println!("Skipped: {}", execution.skip_count);

    Ok(())
}
```

Configure a retry limit on the step:

```rust
use spring_batch_rs::core::step::StepBuilder;

fn build_retry_step() -> Step {
    StepBuilder::new("process-with-retry")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .retry_limit(3) // Retry each failed item up to 3 times
        .build()
}
```

Behavior: each failed item is retried up to 3 times; once the retry limit is reached, the item is skipped or the step fails:
```mermaid
sequenceDiagram
    participant Step
    participant Processor
    participant ErrorHandler

    Step->>Processor: process(item)
    Processor-->>ErrorHandler: Error
    ErrorHandler->>ErrorHandler: Attempt 1 failed
    ErrorHandler->>Processor: retry
    Processor-->>ErrorHandler: Error
    ErrorHandler->>ErrorHandler: Attempt 2 failed
    ErrorHandler->>Processor: retry
    Processor-->>ErrorHandler: Error
    ErrorHandler->>ErrorHandler: Attempt 3 failed
    ErrorHandler->>Step: Max retries reached
    Step->>Step: Skip or Fail
```
Add delays between retries to handle transient issues:
```rust
use spring_batch_rs::core::item::ItemProcessor;
use spring_batch_rs::BatchError;
use std::thread;
use std::time::Duration;

struct RetryingProcessor<P> {
    inner: P,
    max_retries: u32,
    initial_backoff_ms: u64,
}

impl<I, O, P> ItemProcessor<I, O> for RetryingProcessor<P>
where
    P: ItemProcessor<I, O>,
    I: Clone,
{
    fn process(&self, item: I) -> Result<Option<O>, BatchError> {
        let mut attempts = 0;
        let mut backoff = self.initial_backoff_ms;

        loop {
            match self.inner.process(item.clone()) {
                Ok(result) => return Ok(result),
                Err(e) if Self::is_retryable(&e) && attempts < self.max_retries => {
                    attempts += 1;
                    println!("Retry attempt {} after {} ms", attempts, backoff);

                    thread::sleep(Duration::from_millis(backoff));

                    // Exponential backoff: 100ms, 200ms, 400ms, 800ms...
                    backoff *= 2;
                }
                Err(e) => return Err(e),
            }
        }
    }
}

impl<P> RetryingProcessor<P> {
    fn is_retryable(error: &BatchError) -> bool {
        matches!(
            error,
            BatchError::NetworkError(_)
                | BatchError::TemporaryError(_)
                | BatchError::TimeoutError(_)
        )
    }
}

// Usage
fn build_step_with_backoff() -> Step {
    let processor = RetryingProcessor {
        inner: MyProcessor::new(),
        max_retries: 5,
        initial_backoff_ms: 100,
    };

    StepBuilder::new("retry-with-backoff")
        .chunk(50)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build()
}
```

Stop retrying if too many consecutive failures occur:
```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

struct CircuitBreakerProcessor<P> {
    inner: P,
    consecutive_failures: Arc<AtomicUsize>,
    circuit_breaker_threshold: usize,
    circuit_open: Arc<AtomicBool>,
}

impl<I, O, P> ItemProcessor<I, O> for CircuitBreakerProcessor<P>
where
    P: ItemProcessor<I, O>,
{
    fn process(&self, item: I) -> Result<Option<O>, BatchError> {
        // Check if circuit is open
        if self.circuit_open.load(Ordering::SeqCst) {
            return Err(BatchError::ProcessingError(
                "Circuit breaker is open - too many failures".to_string(),
            ));
        }

        match self.inner.process(item) {
            Ok(result) => {
                // Reset failure count on success
                self.consecutive_failures.store(0, Ordering::SeqCst);
                Ok(result)
            }
            Err(e) => {
                // Increment failure count
                let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1;

                if failures >= self.circuit_breaker_threshold {
                    self.circuit_open.store(true, Ordering::SeqCst);
                    println!("⚠️ Circuit breaker opened after {} failures", failures);
                }

                Err(e)
            }
        }
    }
}
```

For operations where any error is unacceptable:
```rust
fn build_fail_fast_step() -> Step {
    StepBuilder::new("critical-processing")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        // No skip_limit - any error fails the job
        .build()
}
```

Validate all data before starting processing:
```rust
use spring_batch_rs::core::step::{RepeatStatus, StepExecution, Tasklet};
use spring_batch_rs::BatchError;

struct ValidationTasklet {
    file_path: String,
}

impl Tasklet for ValidationTasklet {
    fn execute(&self, _: &StepExecution) -> Result<RepeatStatus, BatchError> {
        println!("Validating data file: {}", self.file_path);

        let mut reader = csv::Reader::from_path(&self.file_path)?;
        let mut error_count = 0;
        let mut errors = Vec::new();

        for (line_num, result) in reader.records().enumerate() {
            match result {
                Ok(record) => {
                    // Validate record structure
                    if record.len() != 5 {
                        errors.push(format!(
                            "Line {}: Expected 5 fields, got {}",
                            line_num + 1,
                            record.len()
                        ));
                        error_count += 1;
                    }
                }
                Err(e) => {
                    errors.push(format!("Line {}: {}", line_num + 1, e));
                    error_count += 1;
                }
            }
        }

        if error_count > 0 {
            let error_message = format!(
                "Validation failed with {} errors:\n{}",
                error_count,
                errors.join("\n")
            );
            return Err(BatchError::ValidationError(error_message));
        }

        println!("✓ Validation successful");
        Ok(RepeatStatus::Finished)
    }
}

// Usage: Validate before processing
fn build_validated_job() -> Job {
    let validation_step = StepBuilder::new("validate")
        .tasklet(&ValidationTasklet {
            file_path: "data.csv".to_string(),
        })
        .build();

    let processing_step = StepBuilder::new("process")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    JobBuilder::new()
        .start(&validation_step) // Validate first
        .next(&processing_step)  // Then process
        .build()
}
```

Each chunk is written inside a transaction that is committed on success and rolled back on error:

```mermaid
sequenceDiagram
    participant Step
    participant DB
    participant ErrorHandler

    Step->>DB: BEGIN TRANSACTION

    loop For each chunk
        Step->>Step: Read items
        Step->>Step: Process items
        Step->>DB: Write items

        alt All successful
            Step->>DB: COMMIT
        else Error occurred
            Step->>ErrorHandler: Handle error
            ErrorHandler->>DB: ROLLBACK
            ErrorHandler-->>Step: Skip or Fail
        end
    end
```
```rust
use serde::Serialize;
use spring_batch_rs::core::item::ItemWriter;
use spring_batch_rs::BatchError;
use sqlx::PgPool;

struct TransactionalWriter {
    pool: PgPool,
}

impl<T> ItemWriter<T> for TransactionalWriter
where
    T: Serialize,
{
    fn write(&mut self, items: &[T]) -> Result<(), BatchError> {
        // ItemWriter is synchronous, so block on the async sqlx calls
        futures::executor::block_on(async {
            // Start transaction
            let mut tx = self.pool.begin().await?;

            // Attempt to write all items
            for item in items {
                let result = sqlx::query("INSERT INTO table (data) VALUES ($1)")
                    .bind(serde_json::to_value(item)?)
                    .execute(&mut *tx)
                    .await;

                if let Err(e) = result {
                    // Rollback on any error
                    tx.rollback().await?;
                    return Err(BatchError::DatabaseError(e.to_string()));
                }
            }

            // Commit if all succeeded
            tx.commit().await?;

            Ok(())
        })
    }
}
```

Save progress to resume after failure:
```rust
use serde::{Deserialize, Serialize};
use spring_batch_rs::core::item::ItemReader;
use spring_batch_rs::BatchError;
use std::fs;

#[derive(Serialize, Deserialize)]
struct Checkpoint {
    last_processed_id: u32,
    processed_count: usize,
    error_count: usize,
}

impl Checkpoint {
    fn save(&self, path: &str) -> Result<(), BatchError> {
        let json = serde_json::to_string_pretty(self)?;
        fs::write(path, json)?;
        Ok(())
    }

    fn load(path: &str) -> Result<Self, BatchError> {
        let json = fs::read_to_string(path)?;
        Ok(serde_json::from_str(&json)?)
    }
}

struct ResumableReader {
    checkpoint_file: String,
    last_id: u32,
}

impl ItemReader<Record> for ResumableReader {
    fn read(&mut self) -> Result<Option<Record>, BatchError> {
        // Load checkpoint if exists
        if self.last_id == 0 {
            if let Ok(checkpoint) = Checkpoint::load(&self.checkpoint_file) {
                self.last_id = checkpoint.last_processed_id;
                println!("Resuming from ID: {}", self.last_id);
            }
        }

        // Read next record after last checkpoint
        // (fetch_next_record is application-specific, e.g. a keyset-paged query)
        let record = self.fetch_next_record()?;

        if let Some(ref rec) = record {
            // Save checkpoint periodically
            if rec.id % 1000 == 0 {
                Checkpoint {
                    last_processed_id: rec.id,
                    processed_count: rec.id as usize,
                    error_count: 0,
                }
                .save(&self.checkpoint_file)?;
            }
        }

        Ok(record)
    }
}
```

Move failed items to a separate queue for later review:
```rust
use serde::Serialize;
use spring_batch_rs::core::item::ItemProcessor;
use spring_batch_rs::BatchError;
use std::sync::{Arc, Mutex};

struct DeadLetterWriter<T> {
    file_path: String,
    failed_items: Arc<Mutex<Vec<T>>>,
}

impl<T> DeadLetterWriter<T>
where
    T: Serialize + Clone,
{
    fn new(file_path: String) -> Self {
        Self {
            file_path,
            failed_items: Arc::new(Mutex::new(Vec::new())),
        }
    }

    fn add_failed_item(&self, item: T) {
        self.failed_items.lock().unwrap().push(item);
    }

    fn flush(&self) -> Result<(), BatchError> {
        let items = self.failed_items.lock().unwrap();

        if items.is_empty() {
            return Ok(());
        }

        let json = serde_json::to_string_pretty(&*items)?;
        std::fs::write(&self.file_path, json)?;

        println!("Wrote {} failed items to {}", items.len(), self.file_path);

        Ok(())
    }
}

// `Item` is the application's record type
struct FaultTolerantProcessor<P> {
    inner: P,
    dead_letter_writer: Arc<DeadLetterWriter<Item>>,
}

impl<P> ItemProcessor<Item, Item> for FaultTolerantProcessor<P>
where
    P: ItemProcessor<Item, Item>,
{
    fn process(&self, item: Item) -> Result<Option<Item>, BatchError> {
        match self.inner.process(item.clone()) {
            Ok(result) => Ok(result),
            Err(e) => {
                // Log to dead letter queue
                self.dead_letter_writer.add_failed_item(item);
                println!("Item added to dead letter queue: {}", e);

                // Skip the item
                Ok(None)
            }
        }
    }
}
```
```rust
// Usage
fn process_with_dead_letter_queue() -> Result<(), Box<dyn std::error::Error>> {
    let dlq = Arc::new(DeadLetterWriter::new("failed_items.json".to_string()));

    let processor = FaultTolerantProcessor {
        inner: MyProcessor::new(),
        dead_letter_writer: dlq.clone(),
    };

    let step = StepBuilder::new("process-with-dlq")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    step.execute()?;

    // Flush dead letter queue
    dlq.flush()?;

    Ok(())
}
```

Best practices:

- Choose the right strategy
- Set appropriate limits
- Log everything
- Test error scenarios
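For testing error scenarios, a minimal sketch of a unit-style check for validation logic; it uses a standalone function with the same rules as the validating processor shown earlier, rather than the crate's traits, so the names here are illustrative:

```rust
// Standalone validation rules mirroring ValidatingProcessor above;
// the 1000-character limit is illustrative.
fn validate(id: u32, data: &str) -> Result<(), String> {
    if data.is_empty() {
        return Err(format!("Record {} has empty data", id));
    }
    if data.len() > 1000 {
        return Err(format!("Record {} data too long", id));
    }
    Ok(())
}
```

Feeding known-bad inputs through such a function (empty data, oversized data, a valid record) verifies that each error path produces the message your skip logs and dead-letter queue will rely on.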
```mermaid
graph TB
    Start[Error Occurs] --> DataError{Data Quality<br/>Issue?}
    DataError -->|Yes| Acceptable{Acceptable<br/>Data Loss?}
    Acceptable -->|Yes| UseSkip[Use Skip Strategy<br/>skip_limit]
    Acceptable -->|No| UseValidation[Pre-validate Data<br/>Fail-Fast]

    DataError -->|No| Transient{Transient<br/>Failure?}
    Transient -->|Yes| UseRetry[Use Retry Strategy<br/>retry_limit + backoff]
    Transient -->|No| Critical{Critical<br/>Operation?}
    Critical -->|Yes| FailFast[Fail-Fast Strategy<br/>No skip/retry]
    Critical -->|No| Hybrid[Hybrid Approach<br/>Skip + Retry]

    style UseSkip fill:#10b981,color:#fff
    style UseRetry fill:#f59e0b,color:#fff
    style FailFast fill:#ef4444,color:#fff
    style UseValidation fill:#3b82f6,color:#fff
    style Hybrid fill:#8b5cf6,color:#fff
```
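The hybrid branch of the diagram combines both limits: retry transient failures first, then skip the item once retries are exhausted, counting the skip against a budget. A self-contained sketch of that control flow; the closure-based function is illustrative, not the spring-batch-rs API:

```rust
/// Process one item: retry up to `retry_limit` times, then skip it,
/// charging the skip against `skip_limit`. Returns Ok(None) for a
/// skipped item and Err once the skip budget is exhausted.
fn process_hybrid<T, O>(
    item: &T,
    mut op: impl FnMut(&T) -> Result<O, String>,
    retry_limit: u32,
    skip_count: &mut usize,
    skip_limit: usize,
) -> Result<Option<O>, String> {
    let mut attempts = 0;
    loop {
        match op(item) {
            Ok(out) => return Ok(Some(out)),
            // Transient failure: retry until the retry limit is reached
            Err(_) if attempts < retry_limit => attempts += 1,
            Err(e) => {
                // Retries exhausted: skip if the budget allows, else fail
                *skip_count += 1;
                if *skip_count > skip_limit {
                    return Err(e);
                }
                return Ok(None);
            }
        }
    }
}
```

This keeps transient blips from consuming the skip budget while still bounding how many genuinely bad items the job will tolerate.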