File Readers
- CsvItemReader
- JsonItemReader
- XmlItemReader
Spring Batch RS is built on proven batch processing patterns from the Java Spring Batch framework, adapted for Rust’s unique strengths in performance and safety.
```mermaid
graph TB
    subgraph "Application Layer"
        App[Your Application]
    end
    subgraph "Spring Batch RS Core"
        Job[Job]
        JobExec[JobExecution]
        Step[Step]
        StepExec[StepExecution]
    end
    subgraph "Processing Layer"
        ChunkOrient[Chunk-Oriented Processing]
        TaskletProc[Tasklet Processing]
    end
    subgraph "I/O Layer"
        Reader[ItemReader]
        Processor[ItemProcessor]
        Writer[ItemWriter]
        Tasklet[Tasklet]
    end
    subgraph "Data Sources"
        Files[Files<br/>CSV, JSON, XML]
        DB[Databases<br/>PostgreSQL, MySQL, SQLite]
        NoSQL[NoSQL<br/>MongoDB]
        Network[Network<br/>FTP, FTPS]
    end
    App --> Job
    Job --> JobExec
    JobExec --> Step
    Step --> StepExec
    StepExec --> ChunkOrient
    StepExec --> TaskletProc
    ChunkOrient --> Reader
    ChunkOrient --> Processor
    ChunkOrient --> Writer
    TaskletProc --> Tasklet
    Reader -.-> Files
    Reader -.-> DB
    Reader -.-> NoSQL
    Writer -.-> Files
    Writer -.-> DB
    Writer -.-> NoSQL
    Tasklet -.-> Network
    Tasklet -.-> Files
    style Job fill:#3b82f6,stroke:#1e40af,color:#fff
    style Step fill:#10b981,stroke:#059669,color:#fff
    style ChunkOrient fill:#f59e0b,stroke:#d97706,color:#fff
    style TaskletProc fill:#f59e0b,stroke:#d97706,color:#fff
```
A Job represents the entire batch process. It’s the top-level container that orchestrates one or more steps.
```rust
use spring_batch_rs::core::job::JobBuilder;

let job = JobBuilder::new()
    .start(&step1)
    .next(&step2)
    .next(&step3)
    .build();

let result = job.run()?;
```

Key Characteristics:
A Step is an independent, sequential phase of a Job. Each step can either process data in chunks or execute a single task.
```mermaid
graph LR
    Step[Step] --> Type{Step Type?}
    Type -->|Chunk-Oriented| Chunk[Read → Process → Write]
    Type -->|Tasklet| Task[Single Task Execution]
    style Step fill:#10b981,color:#fff
    style Chunk fill:#3b82f6,color:#fff
    style Task fill:#f59e0b,color:#fff
```
The read-process-write pattern for handling large datasets efficiently.
```mermaid
sequenceDiagram
    participant Step
    participant Reader
    participant Processor
    participant Writer
    loop For each chunk
        Step->>Reader: read(chunk_size)
        Reader-->>Step: items[1..N]
        loop For each item
            Step->>Processor: process(item)
            Processor-->>Step: transformed_item
        end
        Step->>Writer: write(chunk)
        Writer-->>Step: success
    end
```
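The loop in the diagram can be sketched in plain Rust. This is a simplified illustration using closures in place of the real `ItemReader`/`ItemProcessor`/`ItemWriter` traits, not the library's actual implementation:

```rust
// Sketch of the chunk-oriented loop: read up to chunk_size items,
// process each (None = skip), then write the whole chunk at once.
fn run_chunk_step<T, U>(
    mut read: impl FnMut() -> Option<T>,
    process: impl Fn(T) -> Option<U>,
    mut write: impl FnMut(&[U]),
    chunk_size: usize,
) {
    loop {
        // Fill one chunk buffer from the reader.
        let mut chunk = Vec::with_capacity(chunk_size);
        while chunk.len() < chunk_size {
            match read() {
                Some(item) => {
                    // A processor returning None skips the item.
                    if let Some(out) = process(item) {
                        chunk.push(out);
                    }
                }
                None => break, // reader exhausted
            }
        }
        if chunk.is_empty() {
            break; // no more data: step is done
        }
        // One write call per chunk keeps I/O batched.
        write(&chunk);
    }
}
```

Because writes happen once per chunk rather than once per item, expensive operations like database inserts are amortized across the whole buffer.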
Architecture Benefits:
Abstracts data retrieval from various sources.
```rust
pub trait ItemReader<T> {
    fn read(&mut self) -> Result<Option<T>, BatchError>;
}
```

Design Pattern: Iterator-like pattern with error handling
- File Readers
- Database Readers
- Utility Readers
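As a minimal illustration of the trait shape, here is a reader over an in-memory `Vec`. The `BatchError` type is stubbed out for the sketch; the real library's error type differs:

```rust
// Stand-in error type for the sketch only.
#[derive(Debug)]
struct BatchError(String);

trait ItemReader<T> {
    fn read(&mut self) -> Result<Option<T>, BatchError>;
}

// A reader that yields items from an in-memory Vec.
struct VecItemReader<T> {
    items: std::vec::IntoIter<T>,
}

impl<T> VecItemReader<T> {
    fn new(items: Vec<T>) -> Self {
        Self { items: items.into_iter() }
    }
}

impl<T> ItemReader<T> for VecItemReader<T> {
    // Ok(Some(item)) while data remains, Ok(None) once exhausted --
    // the None signals the step that the input is finished.
    fn read(&mut self) -> Result<Option<T>, BatchError> {
        Ok(self.items.next())
    }
}
```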
Transforms and validates items during processing.
```rust
pub trait ItemProcessor<I, O> {
    fn process(&self, item: I) -> Result<Option<O>, BatchError>;
}
```

Key Features:
- Transforms items from type `I` to type `O`
- Return `None` to skip items
- Return `Err` for invalid items

Outputs processed items to destinations.
```rust
pub trait ItemWriter<T> {
    fn write(&mut self, items: &[T]) -> Result<(), BatchError>;
}
```

Batch Writing: Receives chunks of items for efficient I/O
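To make the chunk-per-call contract concrete, here is a sketch of a writer that collects chunks into a buffer and counts write calls. As above, `BatchError` is a stand-in, not the library type:

```rust
// Stand-in error type for the sketch only.
#[derive(Debug)]
struct BatchError(String);

trait ItemWriter<T> {
    fn write(&mut self, items: &[T]) -> Result<(), BatchError>;
}

// Collects written items in memory; useful for tests and for seeing
// that write() is invoked once per chunk, not once per item.
struct CollectingWriter<T> {
    written: Vec<T>,
    write_calls: usize,
}

impl<T: Clone> ItemWriter<T> for CollectingWriter<T> {
    fn write(&mut self, items: &[T]) -> Result<(), BatchError> {
        self.write_calls += 1;
        self.written.extend_from_slice(items);
        Ok(())
    }
}
```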
Single-task operations that don’t fit the chunk pattern.
```rust
pub trait Tasklet {
    fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError>;
}
```

Common Use Cases:
```mermaid
stateDiagram-v2
    [*] --> JobStarting
    JobStarting --> StepExecution
    state StepExecution {
        state "More Data?" as MoreData
        [*] --> ReadChunk
        ReadChunk --> ProcessItems
        ProcessItems --> WriteChunk
        WriteChunk --> MoreData
        MoreData --> ReadChunk: Yes
        MoreData --> [*]: No
    }
    state "Next Step?" as NextStep
    StepExecution --> NextStep
    NextStep --> StepExecution: More Steps
    NextStep --> JobComplete: Done
    JobComplete --> [*]
```
```mermaid
graph TB
    Read[Read Item] --> Process[Process Item]
    Process --> Success{Success?}
    Success -->|Yes| Write[Write to Chunk]
    Success -->|No| SkipCheck{Skip Limit<br/>Reached?}
    SkipCheck -->|No| Skip[Skip Item & Continue]
    SkipCheck -->|Yes| Fail[Fail Job]
    Write --> MoreItems{More Items?}
    MoreItems -->|Yes| Read
    MoreItems -->|No| Commit[Commit Chunk]
    Skip --> MoreItems
    style Success fill:#10b981,color:#fff
    style SkipCheck fill:#f59e0b,color:#fff
    style Fail fill:#ef4444,color:#fff
    style Commit fill:#3b82f6,color:#fff
```
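The skip-limit decision in the flow above can be sketched as a small standalone function (simplified: a closure stands in for the processor, and `String` for the error type):

```rust
// Sketch of skip-limit semantics: tolerate up to `skip_limit` item
// failures; one more failure than that fails the whole step.
fn process_with_skip_limit<T, U>(
    items: Vec<T>,
    process: impl Fn(T) -> Result<U, String>,
    skip_limit: usize,
) -> Result<Vec<U>, String> {
    let mut out = Vec::new();
    let mut skipped = 0;
    for item in items {
        match process(item) {
            Ok(v) => out.push(v),
            // Below the limit: record the skip and keep going.
            Err(_) if skipped < skip_limit => skipped += 1,
            // Limit reached: propagate the error and fail the job.
            Err(e) => return Err(e),
        }
    }
    Ok(out)
}
```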
All components use the builder pattern for flexible, type-safe construction.
```rust
let reader = CsvItemReaderBuilder::<Product>::new()
    .has_headers(true)
    .delimiter(b',')
    .from_path("products.csv")?;

let step = StepBuilder::new("process-products")
    .chunk(100)
    .reader(&reader)
    .processor(&processor)
    .writer(&writer)
    .skip_limit(10)
    .build();
```

Benefits:
Readers, processors, and writers are interchangeable strategies.
```mermaid
graph LR
    Step[Step] --> IReader[ItemReader Trait]
    IReader --> CSV[CsvReader]
    IReader --> JSON[JsonReader]
    IReader --> DB[DatabaseReader]
    style IReader fill:#3b82f6,color:#fff
    style CSV fill:#10b981,color:#fff
    style JSON fill:#10b981,color:#fff
    style DB fill:#10b981,color:#fff
```
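The key point of the strategy pattern is that step logic depends only on the trait, never on a concrete reader. A minimal sketch (trait and reader stubbed for illustration, not the library's actual types):

```rust
// Stand-in error and trait, mirroring the shapes shown earlier.
#[derive(Debug)]
struct BatchError;

trait ItemReader<T> {
    fn read(&mut self) -> Result<Option<T>, BatchError>;
}

// One concrete strategy; a CSV or database reader would implement
// the same trait and be equally usable below.
struct InMemoryReader(Vec<String>);

impl ItemReader<String> for InMemoryReader {
    fn read(&mut self) -> Result<Option<String>, BatchError> {
        Ok(if self.0.is_empty() { None } else { Some(self.0.remove(0)) })
    }
}

// Generic over any ItemReader: swapping strategies needs no changes here.
fn drain<R: ItemReader<String>>(reader: &mut R) -> Result<Vec<String>, BatchError> {
    let mut all = Vec::new();
    while let Some(item) = reader.read()? {
        all.push(item);
    }
    Ok(all)
}
```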
Job and Step execution follows a template with customizable steps.
```rust
// Framework provides the template
pub fn run(&self) -> Result<JobExecution, BatchError> {
    self.before_job()?;                 // Hook
    let result = self.execute_steps()?;
    self.after_job()?;                  // Hook
    Ok(result)
}
```

```mermaid
graph TB
    subgraph "Memory Usage Per Chunk"
        Input[Input Buffer<br/>~chunk_size items]
        Processing[Processing Buffer<br/>~chunk_size items]
        Output[Output Buffer<br/>~chunk_size items]
    end
    subgraph "Total Memory"
        Total[~3 × chunk_size × item_size]
    end
    Input --> Processing
    Processing --> Output
    Output --> Total
    style Total fill:#f59e0b,color:#fff
```
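The bound in the diagram is a rough heuristic, not a library guarantee: three buffers (input, processing, output) of about `chunk_size` items each. As a back-of-the-envelope helper:

```rust
// Rough upper bound from the diagram: three chunk-sized buffers.
// This is an estimate for capacity planning, not an exact figure.
fn estimated_chunk_memory_bytes(chunk_size: usize, item_size_bytes: usize) -> usize {
    3 * chunk_size * item_size_bytes
}
```

For example, 1,000 items of ~256 bytes each stays under ~1 MB per chunk.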
Memory Optimization:
- Choose `chunk_size` based on available memory

```rust
// Resources are automatically cleaned up
{
    let reader = CsvItemReaderBuilder::new()
        .from_path("large_file.csv")?;
    // File handle opened

    let step = StepBuilder::new("process")
        .chunk(1000) // Only 1000 items in memory
        .reader(&reader)
        .build();

    job.run()?;
    // File handle automatically closed
}
```

Spring Batch RS is designed for single-threaded execution by default, but supports parallelization strategies.
```mermaid
sequenceDiagram
    participant J as Job
    participant S1 as Step 1
    participant S2 as Step 2
    participant S3 as Step 3
    J->>S1: Execute
    S1-->>J: Complete
    J->>S2: Execute
    S2-->>J: Complete
    J->>S3: Execute
    S3-->>J: Complete
```
```mermaid
graph TB
    Job[Job] --> Split{Split}
    Split --> Step1[Step 1]
    Split --> Step2[Step 2]
    Split --> Step3[Step 3]
    Step1 --> Join{Join}
    Step2 --> Join
    Step3 --> Join
    Join --> Next[Next Step]
    style Split fill:#3b82f6,color:#fff
    style Join fill:#10b981,color:#fff
```
```mermaid
sequenceDiagram
    participant Step
    participant Reader
    participant Writer
    participant DB
    Step->>DB: BEGIN TRANSACTION
    loop Each chunk
        Step->>Reader: read(chunk_size)
        Reader->>DB: SELECT...
        DB-->>Reader: rows
        Reader-->>Step: items
        Step->>Writer: write(items)
        Writer->>DB: INSERT/UPDATE...
    end
    Step->>DB: COMMIT
    Note over Step,DB: If error: ROLLBACK
```
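The commit-or-rollback boundary can be sketched with a toy transaction type. `Tx` here is a stand-in for illustration only, not a spring_batch_rs or database API:

```rust
// Toy transaction: writes go to `pending`; commit moves them to
// `committed`, rollback discards them.
struct Tx {
    committed: Vec<i32>,
    pending: Vec<i32>,
}

impl Tx {
    fn write(&mut self, items: &[i32]) -> Result<(), String> {
        self.pending.extend_from_slice(items);
        // Simulated constraint: -1 is an invalid value.
        if self.pending.iter().any(|&x| x == -1) {
            return Err("constraint violation".to_string());
        }
        Ok(())
    }
    fn commit(&mut self) {
        self.committed.append(&mut self.pending);
    }
    fn rollback(&mut self) {
        self.pending.clear(); // discard everything since BEGIN
    }
}

// One chunk = one transaction: COMMIT on success, ROLLBACK on error,
// so a failed chunk leaves no partial writes behind.
fn write_chunk_transactional(tx: &mut Tx, chunk: &[i32]) -> Result<(), String> {
    match tx.write(chunk) {
        Ok(()) => {
            tx.commit();
            Ok(())
        }
        Err(e) => {
            tx.rollback();
            Err(e)
        }
    }
}
```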
File operations are not transactional by default. Use staging directories:
```rust
// Write to temporary location
let temp_writer = JsonItemWriterBuilder::<TempData>::new()
    .from_path("/tmp/output.json")?;

// On success, move to final location
std::fs::rename("/tmp/output.json", "/final/output.json")?;
```

```rust
let step = StepBuilder::new("fault-tolerant")
    .chunk(100)
    .reader(&reader)
    .processor(&processor)
    .writer(&writer)
    .skip_limit(10) // Skip up to 10 errors
    .build();
```

Use when: Individual item failures shouldn’t stop the job
```rust
let step = StepBuilder::new("critical-process")
    .chunk(100)
    .reader(&reader)
    .processor(&processor)
    .writer(&writer)
    // No skip_limit - fail on first error
    .build();
```

Use when: Data integrity is critical
```rust
use std::time::Duration;

struct RetryProcessor<P> {
    inner: P,
    max_retries: u32,
}

impl<I, O, P> ItemProcessor<I, O> for RetryProcessor<P>
where
    P: ItemProcessor<I, O>,
    I: Clone, // each attempt consumes its own copy of the item
{
    fn process(&self, item: I) -> Result<Option<O>, BatchError> {
        let mut attempts = 0;
        loop {
            match self.inner.process(item.clone()) {
                Ok(result) => return Ok(result),
                Err(_) if attempts < self.max_retries => {
                    attempts += 1;
                    // Linear backoff between attempts
                    std::thread::sleep(Duration::from_millis(100 * u64::from(attempts)));
                }
                Err(e) => return Err(e),
            }
        }
    }
}
```

Use when: Transient failures are expected (network, locks)
```mermaid
graph LR
    Small[Small Chunks<br/>10-50 items] -->|Lower Memory<br/>Lower Throughput| Result1[Safe for<br/>Large Items]
    Medium[Medium Chunks<br/>100-500 items] -->|Balanced| Result2[Recommended<br/>Default]
    Large[Large Chunks<br/>1000+ items] -->|Higher Memory<br/>Higher Throughput| Result3[High-Performance<br/>Small Items]
    style Medium fill:#10b981,color:#fff
```
| Operation | Small Chunks (10) | Medium Chunks (100) | Large Chunks (1000) |
|---|---|---|---|
| CSV Read | 5,000/sec | 45,000/sec | 180,000/sec |
| JSON Write | 3,000/sec | 28,000/sec | 95,000/sec |
| DB Insert | 500/sec | 4,000/sec | 12,000/sec |
```rust
use spring_batch_rs::core::item::ItemReader;
use spring_batch_rs::BatchError;

struct ApiItemReader {
    url: String,
    page: usize,
    buffer: Vec<Item>,
}

impl ItemReader<Item> for ApiItemReader {
    fn read(&mut self) -> Result<Option<Item>, BatchError> {
        if self.buffer.is_empty() {
            // Fetch next page
            self.fetch_page()?;
        }
        Ok(self.buffer.pop())
    }
}
```

```rust
use spring_batch_rs::core::step::{Tasklet, StepExecution, RepeatStatus};
use std::path::PathBuf;

struct CleanupTasklet {
    directory: PathBuf,
    days_old: u32,
}

impl Tasklet for CleanupTasklet {
    fn execute(&self, execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
        // Custom cleanup logic
        self.delete_old_files()?;
        Ok(RepeatStatus::Finished)
    }
}
```

1. Size Your Chunks Wisely
2. Handle Errors Gracefully
3. Optimize I/O
4. Monitor & Measure
Spring Batch RS architecture provides:
- ✅ Separation of Concerns: Clear separation between reading, processing, and writing
- ✅ Flexibility: Multiple processing models (chunk vs tasklet)
- ✅ Extensibility: Easy to add custom components
- ✅ Reliability: Built-in error handling and recovery
- ✅ Performance: Optimized for throughput and memory efficiency
- ✅ Type Safety: Rust’s strong type system prevents common errors