Compression Level
Use compression levels 3-6 for a balance between speed and size; use level 9 for archival.
This page provides comprehensive examples for using tasklets in Spring Batch RS for file operations and single-task processing.
Add tasklet features to your Cargo.toml:
[dependencies]
spring-batch-rs = { version = "0.1", features = ["zip"] }

[dependencies]
spring-batch-rs = { version = "0.1", features = ["ftp"] }

[dependencies]
spring-batch-rs = { version = "0.1", features = ["s3"] }

[dependencies]
spring-batch-rs = { version = "0.1", features = ["zip", "ftp", "s3"] }

use spring_batch_rs::{
    core::{job::JobBuilder, step::StepBuilder},
    tasklet::zip::ZipTaskletBuilder,
};
/// Builds a ZIP tasklet from `/data/exports` to `/backups/exports.zip` and runs it
/// as a single-step job.
///
/// # Errors
/// Propagates any error returned by the tasklet builder or by `job.run()`.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let zip_tasklet = ZipTaskletBuilder::new()
        .source_path("/data/exports")
        .target_path("/backups/exports.zip")
        .build()?;

    let step = StepBuilder::new("compress-exports")
        .tasklet(&zip_tasklet)
        .build();

    let job = JobBuilder::new()
        .start(&step)
        .build();

    job.run()?;

    println!("Compression completed!");
    Ok(())
}

// Example: include/exclude patterns with maximum compression.
let zip_tasklet = ZipTaskletBuilder::new()
    .source_path("/logs")
    .target_path("/archives/logs.zip")
    .include_pattern("*.log") // Only .log files
    .exclude_pattern("*.tmp") // Exclude .tmp files
    .compression_level(9) // Maximum compression
    .build()?;

// Example: archive without preserving the directory structure.
let zip_tasklet = ZipTaskletBuilder::new()
    .source_path("/data/nested")
    .target_path("/flat-archive.zip")
    .preserve_structure(false) // All files in ZIP root
    .compression_level(3) // Fast compression
    .build()?;

// Example: a single file path as the source.
let zip_tasklet = ZipTaskletBuilder::new()
    .source_path("/reports/monthly_report.pdf")
    .target_path("/compressed/monthly_report.zip")
    .compression_level(9)
    .build()?;

use spring_batch_rs::tasklet::ftp::{FtpTaskletBuilder, FtpOperation};
/// Builds an FTP upload tasklet for `/data/export.csv` and runs it as a single-step job.
///
/// # Errors
/// Propagates any error returned by the tasklet builder or by `job.run()`.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ftp_tasklet = FtpTaskletBuilder::new()
        .host("ftp.example.com")
        .port(21)
        .username("user")
        .password("password")
        .local_path("/data/export.csv")
        .remote_path("/uploads/export.csv")
        .operation(FtpOperation::Upload)
        .build()?;

    let step = StepBuilder::new("upload-to-ftp")
        .tasklet(&ftp_tasklet)
        .build();

    let job = JobBuilder::new()
        .start(&step)
        .build();

    job.run()?;

    Ok(())
}

// Example: download a remote file to a local path.
let ftp_tasklet = FtpTaskletBuilder::new()
    .host("ftp.example.com")
    .username("user")
    .password("password")
    .remote_path("/exports/data.csv")
    .local_path("/downloads/data.csv")
    .operation(FtpOperation::Download)
    .build()?;

// Example: upload with TLS enabled.
let ftp_tasklet = FtpTaskletBuilder::new()
    .host("ftps.example.com")
    .port(990) // FTPS implicit port
    .use_tls(true) // Enable TLS encryption
    .username("user")
    .password("password")
    .local_path("/secure/data.xml")
    .remote_path("/secure/data.xml")
    .operation(FtpOperation::Upload)
    .build()?;

use spring_batch_rs::item::{
    csv::CsvItemReaderBuilder,
    json::JsonItemWriterBuilder,
};
fn main() -> Result<(), Box<dyn std::error::Error>> { // Step 1: Download file from FTP let download_tasklet = FtpTaskletBuilder::new() .host("ftp.example.com") .username("user") .password("password") .remote_path("/data/input.csv") .local_path("/temp/input.csv") .operation(FtpOperation::Download) .build()?;
let download_step = StepBuilder::new("download") .tasklet(&download_tasklet) .build();
// Step 2: Process the file let reader = CsvItemReaderBuilder::new() .from_path("/temp/input.csv")?;
let writer = JsonItemWriterBuilder::<Record>::new() .pretty_formatter(true) .from_path("/temp/output.json")?;
let process_step = StepBuilder::new("process") .chunk(100) .reader(&reader) .writer(&writer) .build();
// Step 3: Compress output let zip_tasklet = ZipTaskletBuilder::new() .source_path("/temp/output.json") .target_path("/output/data.zip") .compression_level(9) .build()?;
let compress_step = StepBuilder::new("compress") .tasklet(&zip_tasklet) .build();
// Step 4: Upload result let upload_tasklet = FtpTaskletBuilder::new() .host("ftp.example.com") .username("user") .password("password") .local_path("/output/data.zip") .remote_path("/results/data.zip") .operation(FtpOperation::Upload) .build()?;
let upload_step = StepBuilder::new("upload") .tasklet(&upload_tasklet) .build();
// Create job let job = JobBuilder::new() .start(&download_step) .next(&process_step) .next(&compress_step) .next(&upload_step) .build();
job.run()?;
println!("Pipeline completed successfully!"); Ok(())}use spring_batch_rs::{ core::{job::JobBuilder, step::StepBuilder}, tasklet::s3::put::S3PutTaskletBuilder,};
/// Uploads `./output/data.csv` to the `my-batch-bucket` bucket through a
/// single-step job built around an S3 put tasklet.
///
/// # Errors
/// Propagates any error returned by the tasklet builder or by the job run.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let uploader = S3PutTaskletBuilder::new()
        .bucket("my-batch-bucket")
        .key("exports/data.csv")
        .local_file("./output/data.csv")
        .region("eu-west-1")
        .build()?;

    let upload_step = StepBuilder::new("s3-upload").tasklet(&uploader).build();

    let upload_job = JobBuilder::new().start(&upload_step).build();
    upload_job.run()?;

    println!("Upload completed!");
    Ok(())
}

use spring_batch_rs::tasklet::s3::get::S3GetTaskletBuilder;
// Example: download an object to a local file.
let get_tasklet = S3GetTaskletBuilder::new()
    .bucket("my-batch-bucket")
    .key("imports/data.csv")
    .local_file("./input/data.csv")
    .region("eu-west-1")
    .build()?;

// Example: upload against a custom endpoint with explicit credentials.
let put_tasklet = S3PutTaskletBuilder::new()
    .bucket("my-bucket")
    .key("file.csv")
    .local_file("./output/file.csv")
    .endpoint_url("http://localhost:9000") // MinIO
    .access_key_id("minioadmin")
    .secret_access_key("minioadmin")
    .build()?;

use spring_batch_rs::{
    core::{job::JobBuilder, step::StepBuilder},
    tasklet::s3::{get::S3GetTaskletBuilder, put::S3PutTaskletBuilder},
};
use std::env::temp_dir;
/// Runs a two-step job against `http://localhost:4566`: uploads `sample.csv` from
/// the system temp directory, then downloads the same key to `downloaded.csv`.
///
/// # Errors
/// Propagates any error returned by the tasklet builders or by `job.run()`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let upload_path = temp_dir().join("sample.csv");
    let download_path = temp_dir().join("downloaded.csv");

    // Step 1: Upload
    let put_tasklet = S3PutTaskletBuilder::new()
        .bucket("my-batch-bucket")
        .key("exports/sample.csv")
        .local_file(&upload_path)
        .endpoint_url("http://localhost:4566") // LocalStack
        .access_key_id("test")
        .secret_access_key("test")
        .region("us-east-1")
        .build()?;

    let upload_step = StepBuilder::new("s3-upload").tasklet(&put_tasklet).build();

    // Step 2: Download
    let get_tasklet = S3GetTaskletBuilder::new()
        .bucket("my-batch-bucket")
        .key("exports/sample.csv")
        .local_file(&download_path)
        .endpoint_url("http://localhost:4566")
        .access_key_id("test")
        .secret_access_key("test")
        .region("us-east-1")
        .build()?;

    let download_step = StepBuilder::new("s3-download").tasklet(&get_tasklet).build();

    // Run job
    let job = JobBuilder::new()
        .start(&upload_step)
        .next(&download_step)
        .build();

    let result = job.run()?;
    println!("Job status: completed, duration: {:?}", result.duration);
    Ok(())
}

Run this example with:
# Start LocalStack first
docker run --rm -p 4566:4566 localstack/localstack
aws --endpoint-url=http://localhost:4566 s3 mb s3://my-batch-bucket

# Run example
cargo run --example tasklet_s3 --features s3

use spring_batch_rs::core::step::{Tasklet, StepExecution, RepeatStatus};
use spring_batch_rs::error::BatchError;
use sqlx::PgPool;
/// State for the log-cleanup tasklet example: `pool` is the Postgres connection pool the
/// `DELETE` statement runs against, and `days_to_keep` is the age cutoff in days.
struct DatabaseCleanupTasklet { pool: PgPool, days_to_keep: i32,}
impl Tasklet for DatabaseCleanupTasklet {
    /// Deletes every row of `logs` whose `created_at` is older than `days_to_keep` days.
    ///
    /// # Errors
    /// Returns `BatchError::Tasklet` if the Tokio runtime cannot be created or if the
    /// `DELETE` statement fails.
    fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
        // NOTE(review): blocking on a freshly created runtime panics when this is called
        // from inside an existing Tokio runtime — confirm how the job runner invokes it.
        let runtime = tokio::runtime::Runtime::new()
            .map_err(|e| BatchError::Tasklet(e.to_string()))?;

        runtime.block_on(async {
            // Bind the retention period as a query parameter rather than formatting it
            // into the SQL text.
            sqlx::query("DELETE FROM logs WHERE created_at < NOW() - make_interval(days => $1)")
                .bind(self.days_to_keep)
                .execute(&self.pool)
                .await
                .map_err(|e| BatchError::Tasklet(format!("Cleanup failed: {}", e)))?;

            Ok(RepeatStatus::Finished)
        })
    }
}
/// Connects to Postgres and wires a `DatabaseCleanupTasklet` (30-day retention) into a
/// step named `cleanup`. The step is only built here; no job is run.
///
/// # Errors
/// Propagates the connection error if the pool cannot be created.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let cleanup_tasklet = DatabaseCleanupTasklet {
        pool: PgPool::connect("postgres://user:pass@localhost/db").await?,
        days_to_keep: 30,
    };

    let step = StepBuilder::new("cleanup").tasklet(&cleanup_tasklet).build();

    Ok(())
}

use std::fs;
use std::path::Path;
/// State for the file-cleanup tasklet example: `directory` is the directory that gets
/// listed, and `pattern` is the substring a file name must contain to be deleted.
struct FileCleanupTasklet { directory: String, pattern: String,}
impl Tasklet for FileCleanupTasklet {
    /// Deletes every non-directory entry of `directory` whose file name contains `pattern`,
    /// then prints how many entries were removed.
    ///
    /// # Errors
    /// Returns `BatchError::Tasklet` if the directory cannot be read, if an entry or its
    /// file type cannot be inspected, or if a matching entry cannot be removed.
    fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
        let entries = fs::read_dir(&self.directory)
            .map_err(|e| BatchError::Tasklet(format!("Cannot read directory: {}", e)))?;

        let mut deleted = 0;

        for entry in entries {
            let entry = entry.map_err(|e| BatchError::Tasklet(e.to_string()))?;

            // `fs::remove_file` fails on directories; skip them so one matching
            // subdirectory does not abort the whole cleanup.
            let file_type = entry
                .file_type()
                .map_err(|e| BatchError::Tasklet(e.to_string()))?;
            if file_type.is_dir() {
                continue;
            }

            let path = entry.path();

            // Names that are not valid UTF-8 are never matched.
            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
                if name.contains(&self.pattern) {
                    fs::remove_file(&path)
                        .map_err(|e| BatchError::Tasklet(e.to_string()))?;
                    deleted += 1;
                }
            }
        }

        println!("Deleted {} files matching '{}'", deleted, self.pattern);
        Ok(RepeatStatus::Finished)
    }
}

struct WebhookTasklet {
    url: String,
    message: String,
}
impl Tasklet for WebhookTasklet { fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> { let client = reqwest::blocking::Client::new();
let payload = serde_json::json!({ "step": step_execution.step_name(), "message": self.message, "timestamp": chrono::Utc::now().to_rfc3339() });
let response = client .post(&self.url) .json(&payload) .send() .map_err(|e| BatchError::Tasklet(format!("HTTP request failed: {}", e)))?;
if !response.status().is_success() { return Err(BatchError::Tasklet( format!("API returned error: {}", response.status()) )); }
Ok(RepeatStatus::Finished) }}fn main() -> Result<(), Box<dyn std::error::Error>> { // Step 1: Cleanup old reports let cleanup_tasklet = FileCleanupTasklet { directory: "/reports/daily".to_string(), pattern: ".old".to_string(), };
let cleanup_step = StepBuilder::new("cleanup-old-reports") .tasklet(&cleanup_tasklet) .build();
// Step 2: Generate report (chunk-oriented step) let reader = /* database reader */; let writer = CsvItemWriterBuilder::new() .from_path("/reports/daily/report.csv")?;
let generate_step = StepBuilder::new("generate-report") .chunk(1000) .reader(&reader) .writer(&writer) .build();
// Step 3: Compress report let zip_tasklet = ZipTaskletBuilder::new() .source_path("/reports/daily/report.csv") .target_path("/reports/daily/report.zip") .compression_level(9) .build()?;
let compress_step = StepBuilder::new("compress-report") .tasklet(&zip_tasklet) .build();
// Step 4: Upload to FTP let upload_tasklet = FtpTaskletBuilder::new() .host("ftp.reports.com") .username("reports_user") .password("secret") .local_path("/reports/daily/report.zip") .remote_path("/daily_reports/report.zip") .operation(FtpOperation::Upload) .build()?;
let upload_step = StepBuilder::new("upload-report") .tasklet(&upload_tasklet) .build();
// Step 5: Send notification let notify_tasklet = WebhookTasklet { url: "https://api.company.com/notifications".to_string(), message: "Daily report generated and uploaded".to_string(), };
let notify_step = StepBuilder::new("notify") .tasklet(¬ify_tasklet) .build();
// Create and run job let job = JobBuilder::new() .start(&cleanup_step) .next(&generate_step) .next(&compress_step) .next(&upload_step) .next(¬ify_step) .build();
job.run()?;
Ok(())}use std::sync::Mutex;
/// State for the polling tasklet example: `url` is the endpoint that is requested,
/// `attempts` counts executions so far (behind a `Mutex` because `execute` takes `&self`),
/// and `max_attempts` is the limit after which polling fails.
struct PollingTasklet { url: String, attempts: Mutex<u32>, max_attempts: u32,}
impl Tasklet for PollingTasklet {
    /// Performs one polling attempt against `url`.
    ///
    /// Returns `RepeatStatus::Finished` when the response status is a success status.
    /// Otherwise sleeps for 10 seconds and returns `RepeatStatus::Continuable`.
    ///
    /// # Errors
    /// Returns `BatchError::Tasklet` if the attempt counter's lock is poisoned, if the
    /// request cannot be sent, or if `max_attempts` is reached without a success status.
    fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
        // Increment the counter in its own scope so the lock is released before the
        // blocking HTTP request and the sleep below.
        let attempt = {
            let mut attempts = self
                .attempts
                .lock()
                .map_err(|e| BatchError::Tasklet(format!("Attempt counter lock poisoned: {}", e)))?;
            *attempts += 1;
            *attempts
        };

        println!("Polling attempt {}/{}", attempt, self.max_attempts);

        // Check condition
        let client = reqwest::blocking::Client::new();
        let response = client.get(&self.url)
            .send()
            .map_err(|e| BatchError::Tasklet(e.to_string()))?;

        if response.status().is_success() {
            println!("Condition met!");
            return Ok(RepeatStatus::Finished);
        }

        if attempt >= self.max_attempts {
            return Err(BatchError::Tasklet(
                "Max polling attempts reached".to_string()
            ));
        }

        // Wait before next attempt
        std::thread::sleep(std::time::Duration::from_secs(10));

        Ok(RepeatStatus::Continuable)
    }
}

Compression Level
Use compression levels 3-6 for a balance between speed and size; use level 9 for archival.
FTP Connections
Reuse connections when possible. Use connection pooling for multiple transfers.
Error Handling
Implement retry logic for network-related tasklets.
Idempotency
Design tasklets to be safely re-runnable.