Backup Archives
Compress data directories before backup or transfer
The Tasklet trait defines single-task operations that don’t fit the chunk-oriented read-process-write model. Perfect for setup, cleanup, file operations, and maintenance tasks.
pub trait Tasklet { /// Executes the tasklet operation /// /// # Parameters /// - `step_execution`: The current step execution context /// /// # Returns /// - `Ok(RepeatStatus)` - Task completed successfully /// - `Err(BatchError)` - An error occurred fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError>;}pub enum RepeatStatus { Finished, // Task is complete Continuable, // Task should be repeated}Spring Batch RS provides 2 built-in tasklet implementations:
| Tasklet | Feature Flag | Purpose |
|---|---|---|
ZipTasklet | zip | Compress files and directories |
FtpTasklet | ftp | Transfer files via FTP/FTPS |
Compresses files and directories into ZIP archives with configurable options.
pub struct ZipTaskletBuilder { /* ... */ }| Method | Type | Default | Description |
|---|---|---|---|
source_path(&Path) | &Path | required | File or directory to compress |
target_path(&Path) | &Path | required | Output ZIP file path |
compression_level(u32) | u32 | 6 | Compression level (0-9, 0=none, 9=max) |
include_pattern(&str) | &str | "*" | Glob pattern for files to include |
exclude_pattern(&str) | &str | "" | Glob pattern for files to exclude |
preserve_structure(bool) | bool | true | Keep directory structure in archive |
use spring_batch_rs::tasklet::zip::ZipTaskletBuilder;use spring_batch_rs::core::step::StepBuilder;use spring_batch_rs::core::job::JobBuilder;
fn main() -> Result<(), Box<dyn std::error::Error>> { let zip_tasklet = ZipTaskletBuilder::new() .source_path("/data/exports") .target_path("/backups/exports.zip") .build()?;
let step = StepBuilder::new("zip-exports") .tasklet(&zip_tasklet) .build();
let job = JobBuilder::new() .start(&step) .build();
job.run()?; Ok(())}let zip_tasklet = ZipTaskletBuilder::new() .source_path("/logs") .target_path("/archives/logs.zip") .include_pattern("*.log") // Only .log files .exclude_pattern("*.tmp") // Exclude .tmp files .compression_level(9) // Maximum compression .build()?;let zip_tasklet = ZipTaskletBuilder::new() .source_path("/data") .target_path("/flat-archive.zip") .preserve_structure(false) // All files in root of ZIP .compression_level(3) // Fast compression .build()?;let zip_tasklet = ZipTaskletBuilder::new() .source_path("/data/report.pdf") .target_path("/compressed/report.zip") .compression_level(9) .build()?;| Level | Speed | Ratio | Use Case |
|---|---|---|---|
| 0 | Fastest | None | Store only (archiving) |
| 1-3 | Fast | Low | Quick backups, temporary files |
| 4-6 | Balanced | Medium | General purpose (default: 6) |
| 7-9 | Slow | High | Long-term storage, distribution |
Backup Archives
Compress data directories before backup or transfer
Log Rotation
Archive old log files to save disk space
Report Distribution
Package reports and attachments for delivery
Cleanup Tasks
Compress before deletion as part of data lifecycle
Transfers files to/from FTP or FTPS servers with authentication and connection management.
pub struct FtpTaskletBuilder { /* ... */ }| Method | Type | Default | Description |
|---|---|---|---|
host(&str) | &str | required | FTP server hostname |
port(u16) | u16 | 21 | FTP server port |
username(&str) | &str | "anonymous" | Login username |
password(&str) | &str | "" | Login password |
use_tls(bool) | bool | false | Enable FTPS (TLS encryption) |
local_path(&Path) | &Path | required | Local file path |
remote_path(&str) | &str | required | Remote file path |
operation(Operation) | Operation | required | Upload or Download |
use spring_batch_rs::tasklet::ftp::{FtpTaskletBuilder, FtpOperation};use spring_batch_rs::core::step::StepBuilder;
fn main() -> Result<(), Box<dyn std::error::Error>> { let ftp_tasklet = FtpTaskletBuilder::new() .host("ftp.example.com") .port(21) .username("user") .password("password") .local_path("/data/export.csv") .remote_path("/uploads/export.csv") .operation(FtpOperation::Upload) .build()?;
let step = StepBuilder::new("upload-to-ftp") .tasklet(&ftp_tasklet) .build();
Ok(())}let ftp_tasklet = FtpTaskletBuilder::new() .host("ftp.example.com") .username("user") .password("password") .local_path("/downloads/data.csv") .remote_path("/exports/data.csv") .operation(FtpOperation::Download) .build()?;let ftp_tasklet = FtpTaskletBuilder::new() .host("ftps.example.com") .port(990) // FTPS implicit port .use_tls(true) // Enable TLS encryption .username("user") .password("password") .local_path("/secure/data.xml") .remote_path("/secure/data.xml") .operation(FtpOperation::Upload) .build()?;pub enum FtpOperation { Upload, // Transfer from local to remote Download, // Transfer from remote to local}The FTP tasklet handles common errors:
// Tasklet will return appropriate BatchErrormatch step.execute(&mut execution) { Ok(_) => println!("Transfer successful"), Err(BatchError::Tasklet(msg)) => eprintln!("FTP error: {}", msg), Err(e) => eprintln!("Other error: {}", e),}You can create custom tasklets for any single-task operation:
use spring_batch_rs::core::step::{Tasklet, StepExecution, RepeatStatus};use spring_batch_rs::error::BatchError;
struct HelloTasklet { message: String,}
impl Tasklet for HelloTasklet { fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> { println!("{}", self.message); Ok(RepeatStatus::Finished) }}
impl HelloTasklet { fn new(message: impl Into<String>) -> Self { Self { message: message.into(), } }}use sqlx::PgPool;
struct DatabaseCleanupTasklet { pool: PgPool,}
impl Tasklet for DatabaseCleanupTasklet { fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> { // Execute cleanup query let runtime = tokio::runtime::Runtime::new() .map_err(|e| BatchError::Tasklet(format!("Runtime error: {}", e)))?;
runtime.block_on(async { sqlx::query("DELETE FROM logs WHERE created_at < NOW() - INTERVAL '30 days'") .execute(&self.pool) .await .map_err(|e| BatchError::Tasklet(format!("Cleanup failed: {}", e)))?;
Ok(RepeatStatus::Finished) }) }}use std::fs;use std::path::Path;use std::time::{SystemTime, Duration};
struct FileCleanupTasklet { directory: String, age_days: u64,}
impl Tasklet for FileCleanupTasklet { fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> { let dir = Path::new(&self.directory); let max_age = Duration::from_secs(self.age_days * 24 * 60 * 60); let now = SystemTime::now();
let entries = fs::read_dir(dir) .map_err(|e| BatchError::Tasklet(format!("Failed to read directory: {}", e)))?;
let mut deleted = 0;
for entry in entries { let entry = entry.map_err(|e| BatchError::Tasklet(e.to_string()))?; let metadata = entry.metadata() .map_err(|e| BatchError::Tasklet(e.to_string()))?;
if let Ok(modified) = metadata.modified() { if let Ok(age) = now.duration_since(modified) { if age > max_age { fs::remove_file(entry.path()) .map_err(|e| BatchError::Tasklet(e.to_string()))?; deleted += 1; } } } }
println!("Deleted {} old files", deleted); Ok(RepeatStatus::Finished) }}struct ApiNotificationTasklet { webhook_url: String, message: String,}
impl Tasklet for ApiNotificationTasklet { fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> { let client = reqwest::blocking::Client::new();
let response = client .post(&self.webhook_url) .json(&serde_json::json!({ "message": self.message, "step": step_execution.step_name(), "status": "completed" })) .send() .map_err(|e| BatchError::Tasklet(format!("HTTP request failed: {}", e)))?;
if !response.status().is_success() { return Err(BatchError::Tasklet( format!("API returned error: {}", response.status()) )); }
Ok(RepeatStatus::Finished) }}Use RepeatStatus::Continuable for tasks that need multiple executions:
use std::sync::Mutex;
struct PollingTasklet { attempts: Mutex<u32>, max_attempts: u32,}
impl Tasklet for PollingTasklet { fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> { let mut attempts = self.attempts.lock().unwrap(); *attempts += 1;
println!("Poll attempt {}/{}", *attempts, self.max_attempts);
// Check some condition if check_condition() { println!("Condition met!"); return Ok(RepeatStatus::Finished); }
if *attempts >= self.max_attempts { return Err(BatchError::Tasklet( "Max polling attempts reached".to_string() )); }
// Wait before next attempt std::thread::sleep(std::time::Duration::from_secs(5));
Ok(RepeatStatus::Continuable) }}
fn check_condition() -> bool { // Check if some external condition is met false}Combine tasklets with chunk-oriented steps:
use spring_batch_rs::core::job::JobBuilder;
fn main() -> Result<(), Box<dyn std::error::Error>> { // Step 1: Download file via FTP let download_tasklet = FtpTaskletBuilder::new() .host("ftp.example.com") .username("user") .password("pass") .remote_path("/data/input.csv") .local_path("/temp/input.csv") .operation(FtpOperation::Download) .build()?;
let download_step = StepBuilder::new("download") .tasklet(&download_tasklet) .build();
// Step 2: Process the file (chunk-oriented) let reader = CsvItemReaderBuilder::new() .from_path("/temp/input.csv")?;
let writer = JsonItemWriterBuilder::<Record>::new() .from_path("/temp/output.json")?;
let process_step = StepBuilder::new("process") .chunk(100) .reader(&reader) .writer(&writer) .build();
// Step 3: Compress output let zip_tasklet = ZipTaskletBuilder::new() .source_path("/temp/output.json") .target_path("/output/data.zip") .compression_level(9) .build()?;
let compress_step = StepBuilder::new("compress") .tasklet(&zip_tasklet) .build();
// Step 4: Upload result let upload_tasklet = FtpTaskletBuilder::new() .host("ftp.example.com") .username("user") .password("pass") .local_path("/output/data.zip") .remote_path("/results/data.zip") .operation(FtpOperation::Upload) .build()?;
let upload_step = StepBuilder::new("upload") .tasklet(&upload_tasklet) .build();
// Create job with all steps let job = JobBuilder::new() .start(&download_step) .next(&process_step) .next(&compress_step) .next(&upload_step) .build();
job.run()?; Ok(())}Single Responsibility
Each tasklet should do one thing well. Create multiple tasklets for complex workflows.
Idempotency
Design tasklets to be safely re-runnable in case of failures.
Error Reporting
Return detailed error messages that help diagnose issues.
Resource Cleanup
Always clean up resources even if errors occur (use Drop trait if needed).
Use tasklets for:
Use chunk-oriented steps for: