Skip to content

Tasklet Examples

This content is not yet available in your language.

This page provides comprehensive examples for using tasklets in Spring Batch RS for file operations and single-task processing.

Add tasklet features to your Cargo.toml:

[dependencies]
spring-batch-rs = { version = "0.1", features = ["zip"] }

use spring_batch_rs::{
core::{job::JobBuilder, step::StepBuilder},
tasklet::zip::ZipTaskletBuilder,
};
/// Compresses the contents of `/data/exports` into a single ZIP archive
/// using a one-shot tasklet step.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure the compression task: everything under the source
    // directory ends up in the target archive.
    let tasklet = ZipTaskletBuilder::new()
        .source_path("/data/exports")
        .target_path("/backups/exports.zip")
        .build()?;

    // A tasklet step runs the task exactly once (no chunking involved).
    let compress_step = StepBuilder::new("compress-exports")
        .tasklet(&tasklet)
        .build();

    // Single-step job: run it and propagate any batch error via `?`.
    JobBuilder::new().start(&compress_step).build().run()?;

    println!("Compression completed!");
    Ok(())
}
// Variant 1: filtered archive — include only log files, skip temporaries,
// use the strongest compression.
// NOTE(review): include/exclude look like glob-style patterns — confirm
// against the ZipTaskletBuilder API docs.
let zip_tasklet = ZipTaskletBuilder::new()
.source_path("/logs")
.target_path("/archives/logs.zip")
.include_pattern("*.log") // Only .log files
.exclude_pattern("*.tmp") // Exclude .tmp files
.compression_level(9) // Maximum compression
.build()?;
// Variant 2: flatten a nested directory tree — every file is placed at the
// root of the archive, trading structure for a quick, low-effort compress.
let zip_tasklet = ZipTaskletBuilder::new()
.source_path("/data/nested")
.target_path("/flat-archive.zip")
.preserve_structure(false) // All files in ZIP root
.compression_level(3) // Fast compression
.build()?;
// Variant 3: compress a single file (source_path points at a file here,
// not a directory) with maximum compression.
let zip_tasklet = ZipTaskletBuilder::new()
.source_path("/reports/monthly_report.pdf")
.target_path("/compressed/monthly_report.zip")
.compression_level(9)
.build()?;

use spring_batch_rs::tasklet::ftp::{FtpTaskletBuilder, FtpOperation};
/// Uploads a local CSV export to an FTP server as a single tasklet step.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connection details plus the local source and remote destination
    // for the upload operation.
    let upload = FtpTaskletBuilder::new()
        .host("ftp.example.com")
        .port(21)
        .username("user")
        .password("password")
        .local_path("/data/export.csv")
        .remote_path("/uploads/export.csv")
        .operation(FtpOperation::Upload)
        .build()?;

    // Wrap the tasklet in a step and run it as a one-step job.
    let upload_step = StepBuilder::new("upload-to-ftp").tasklet(&upload).build();
    JobBuilder::new().start(&upload_step).build().run()?;

    Ok(())
}
// Variant 1: download — remote_path is the source on the server and
// local_path is where the file lands. Port defaults are used here
// (NOTE(review): presumably 21 when omitted — verify in the builder docs).
let ftp_tasklet = FtpTaskletBuilder::new()
.host("ftp.example.com")
.username("user")
.password("password")
.remote_path("/exports/data.csv")
.local_path("/downloads/data.csv")
.operation(FtpOperation::Download)
.build()?;
// Variant 2: secure upload over FTPS — explicit implicit-TLS port 990
// and TLS enabled on the connection.
let ftp_tasklet = FtpTaskletBuilder::new()
.host("ftps.example.com")
.port(990) // FTPS implicit port
.use_tls(true) // Enable TLS encryption
.username("user")
.password("password")
.local_path("/secure/data.xml")
.remote_path("/secure/data.xml")
.operation(FtpOperation::Upload)
.build()?;

use spring_batch_rs::item::{
csv::CsvItemReaderBuilder,
json::JsonItemWriterBuilder,
};
/// Four-step pipeline: download a CSV over FTP, convert it to JSON,
/// compress the result, and upload the archive back to the server.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Step 1: fetch the input file from the FTP server into /temp.
    let fetch = FtpTaskletBuilder::new()
        .host("ftp.example.com")
        .username("user")
        .password("password")
        .remote_path("/data/input.csv")
        .local_path("/temp/input.csv")
        .operation(FtpOperation::Download)
        .build()?;
    let download_step = StepBuilder::new("download").tasklet(&fetch).build();

    // Step 2: chunk-oriented CSV -> JSON conversion, 100 records per chunk.
    let reader = CsvItemReaderBuilder::new().from_path("/temp/input.csv")?;
    let writer = JsonItemWriterBuilder::<Record>::new()
        .pretty_formatter(true)
        .from_path("/temp/output.json")?;
    let process_step = StepBuilder::new("process")
        .chunk(100)
        .reader(&reader)
        .writer(&writer)
        .build();

    // Step 3: compress the JSON output with maximum compression.
    let archive = ZipTaskletBuilder::new()
        .source_path("/temp/output.json")
        .target_path("/output/data.zip")
        .compression_level(9)
        .build()?;
    let compress_step = StepBuilder::new("compress").tasklet(&archive).build();

    // Step 4: push the finished archive back to the server.
    let publish = FtpTaskletBuilder::new()
        .host("ftp.example.com")
        .username("user")
        .password("password")
        .local_path("/output/data.zip")
        .remote_path("/results/data.zip")
        .operation(FtpOperation::Upload)
        .build()?;
    let upload_step = StepBuilder::new("upload").tasklet(&publish).build();

    // Steps execute in declaration order; a failure aborts the job.
    JobBuilder::new()
        .start(&download_step)
        .next(&process_step)
        .next(&compress_step)
        .next(&upload_step)
        .build()
        .run()?;

    println!("Pipeline completed successfully!");
    Ok(())
}

use spring_batch_rs::{
core::{job::JobBuilder, step::StepBuilder},
tasklet::s3::put::S3PutTaskletBuilder,
};
/// Uploads a local file to an S3 bucket as a single tasklet step.
///
/// NOTE(review): `#[tokio::main]` mirrors the original example; the S3
/// tasklet presumably performs async SDK calls internally — confirm.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Target bucket/key, the local source file, and the AWS region.
    let upload = S3PutTaskletBuilder::new()
        .bucket("my-batch-bucket")
        .key("exports/data.csv")
        .local_file("./output/data.csv")
        .region("eu-west-1")
        .build()?;

    let upload_step = StepBuilder::new("s3-upload").tasklet(&upload).build();
    JobBuilder::new().start(&upload_step).build().run()?;

    println!("Upload completed!");
    Ok(())
}
use spring_batch_rs::tasklet::s3::get::S3GetTaskletBuilder;
// Download counterpart: fetch an object (bucket + key) from S3 and write
// it to the given local file path.
let get_tasklet = S3GetTaskletBuilder::new()
.bucket("my-batch-bucket")
.key("imports/data.csv")
.local_file("./input/data.csv")
.region("eu-west-1")
.build()?;

S3-Compatible Services (MinIO / LocalStack)

Section titled “S3-Compatible Services (MinIO / LocalStack)”
// S3-compatible services: point the tasklet at a custom endpoint and pass
// static credentials instead of relying on the AWS environment/config chain.
// NOTE(review): "minioadmin"/"minioadmin" are MinIO's well-known defaults —
// fine for local dev only, never for real deployments.
let put_tasklet = S3PutTaskletBuilder::new()
.bucket("my-bucket")
.key("file.csv")
.local_file("./output/file.csv")
.endpoint_url("http://localhost:9000") // MinIO
.access_key_id("minioadmin")
.secret_access_key("minioadmin")
.build()?;
use spring_batch_rs::{
core::{job::JobBuilder, step::StepBuilder},
tasklet::s3::{get::S3GetTaskletBuilder, put::S3PutTaskletBuilder},
};
use std::env::temp_dir;
/// Round-trip against LocalStack: upload a sample file to S3, then
/// download it back, as two tasklet steps in one job.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Work inside the OS temp directory so the example leaves no residue.
    let upload_path = temp_dir().join("sample.csv");
    let download_path = temp_dir().join("downloaded.csv");

    // Step 1: push the sample file to the LocalStack S3 endpoint using
    // LocalStack's dummy "test"/"test" credentials.
    let put_tasklet = S3PutTaskletBuilder::new()
        .bucket("my-batch-bucket")
        .key("exports/sample.csv")
        .local_file(&upload_path)
        .endpoint_url("http://localhost:4566") // LocalStack
        .access_key_id("test")
        .secret_access_key("test")
        .region("us-east-1")
        .build()?;
    let upload_step = StepBuilder::new("s3-upload")
        .tasklet(&put_tasklet)
        .build();

    // Step 2: fetch the same object back to a different local path.
    let get_tasklet = S3GetTaskletBuilder::new()
        .bucket("my-batch-bucket")
        .key("exports/sample.csv")
        .local_file(&download_path)
        .endpoint_url("http://localhost:4566")
        .access_key_id("test")
        .secret_access_key("test")
        .region("us-east-1")
        .build()?;
    let download_step = StepBuilder::new("s3-download")
        .tasklet(&get_tasklet)
        .build();

    // Upload first, then download; report the measured duration.
    let job = JobBuilder::new()
        .start(&upload_step)
        .next(&download_step)
        .build();
    let result = job.run()?;
    println!("Job status: completed, duration: {:?}", result.duration);
    Ok(())
}

Run this example with:

Terminal window
# Start LocalStack first
docker run --rm -p 4566:4566 localstack/localstack
aws --endpoint-url=http://localhost:4566 s3 mb s3://my-batch-bucket
# Run example
cargo run --example tasklet_s3 --features s3

use spring_batch_rs::core::step::{Tasklet, StepExecution, RepeatStatus};
use spring_batch_rs::error::BatchError;
use sqlx::PgPool;
// Deletes log rows older than `days_to_keep` days from the `logs` table.
struct DatabaseCleanupTasklet {
// Postgres connection pool shared with the rest of the application.
pool: PgPool,
// Retention window in days; rows older than this are removed.
days_to_keep: i32,
}
impl Tasklet for DatabaseCleanupTasklet {
// Bridges the sync Tasklet API to async sqlx by spinning up a runtime
// and blocking on the DELETE.
fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
// NOTE(review): a fresh tokio Runtime is created on every execution —
// acceptable for a one-shot cleanup step, but consider sharing a
// runtime/Handle if this tasklet runs often.
let runtime = tokio::runtime::Runtime::new()
.map_err(|e| BatchError::Tasklet(e.to_string()))?;
runtime.block_on(async {
// Interpolating `days_to_keep` is safe from injection here because
// it is an i32, not user-supplied text.
let query = format!(
"DELETE FROM logs WHERE created_at < NOW() - INTERVAL '{} days'",
self.days_to_keep
);
sqlx::query(&query)
.execute(&self.pool)
.await
.map_err(|e| BatchError::Tasklet(format!("Cleanup failed: {}", e)))?;
// One-shot task: signal the framework not to call execute again.
Ok(RepeatStatus::Finished)
})
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect("postgres://user:pass@localhost/db").await?;
let cleanup_tasklet = DatabaseCleanupTasklet {
pool,
days_to_keep: 30,
};
let step = StepBuilder::new("cleanup")
.tasklet(&cleanup_tasklet)
.build();
Ok(())
}
use std::fs;
use std::path::Path;
/// Deletes every file in `directory` whose name contains `pattern`.
struct FileCleanupTasklet {
    directory: String, // directory scanned for matching files
    pattern: String,   // substring matched against file names
}

impl Tasklet for FileCleanupTasklet {
    fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
        // A missing or unreadable directory is a hard failure for the step.
        let dir_iter = fs::read_dir(&self.directory)
            .map_err(|e| BatchError::Tasklet(format!("Cannot read directory: {}", e)))?;

        let mut removed_count = 0;
        for item in dir_iter {
            let path = item.map_err(|e| BatchError::Tasklet(e.to_string()))?.path();
            // Only names that are valid UTF-8 and contain the pattern qualify;
            // anything else is silently left in place.
            let is_match = path
                .file_name()
                .and_then(|n| n.to_str())
                .map_or(false, |name| name.contains(&self.pattern));
            if is_match {
                fs::remove_file(&path).map_err(|e| BatchError::Tasklet(e.to_string()))?;
                removed_count += 1;
            }
        }

        println!("Deleted {} files matching '{}'", removed_count, self.pattern);
        Ok(RepeatStatus::Finished)
    }
}
/// Posts a JSON notification to a webhook endpoint when the step runs.
struct WebhookTasklet {
    url: String,     // webhook endpoint to POST to
    message: String, // human-readable message included in the payload
}

impl Tasklet for WebhookTasklet {
    fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
        // Notification body: step name, configured message, UTC timestamp.
        let body = serde_json::json!({
            "step": step_execution.step_name(),
            "message": self.message,
            "timestamp": chrono::Utc::now().to_rfc3339()
        });

        // A blocking client is fine here: tasklets execute synchronously.
        let response = reqwest::blocking::Client::new()
            .post(&self.url)
            .json(&body)
            .send()
            .map_err(|e| BatchError::Tasklet(format!("HTTP request failed: {}", e)))?;

        // Any non-2xx status fails the step.
        if response.status().is_success() {
            Ok(RepeatStatus::Finished)
        } else {
            Err(BatchError::Tasklet(format!(
                "API returned error: {}",
                response.status()
            )))
        }
    }
}

// Daily report pipeline: purge stale reports, regenerate the CSV, compress
// it, ship it over FTP, then notify via webhook — five steps in one job.
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Step 1: Cleanup old reports
// Removes any file under /reports/daily whose name contains ".old".
let cleanup_tasklet = FileCleanupTasklet {
directory: "/reports/daily".to_string(),
pattern: ".old".to_string(),
};
let cleanup_step = StepBuilder::new("cleanup-old-reports")
.tasklet(&cleanup_tasklet)
.build();
// Step 2: Generate report (chunk-oriented step)
// NOTE(review): the reader is an intentional placeholder — this snippet
// does not compile as-is; substitute a real database reader.
let reader = /* database reader */;
let writer = CsvItemWriterBuilder::new()
.from_path("/reports/daily/report.csv")?;
let generate_step = StepBuilder::new("generate-report")
.chunk(1000)
.reader(&reader)
.writer(&writer)
.build();
// Step 3: Compress report
// Maximum compression (level 9), suitable for archival transfer.
let zip_tasklet = ZipTaskletBuilder::new()
.source_path("/reports/daily/report.csv")
.target_path("/reports/daily/report.zip")
.compression_level(9)
.build()?;
let compress_step = StepBuilder::new("compress-report")
.tasklet(&zip_tasklet)
.build();
// Step 4: Upload to FTP
// NOTE(review): a hard-coded password is example-only; load credentials
// from the environment or a secrets store in real code.
let upload_tasklet = FtpTaskletBuilder::new()
.host("ftp.reports.com")
.username("reports_user")
.password("secret")
.local_path("/reports/daily/report.zip")
.remote_path("/daily_reports/report.zip")
.operation(FtpOperation::Upload)
.build()?;
let upload_step = StepBuilder::new("upload-report")
.tasklet(&upload_tasklet)
.build();
// Step 5: Send notification
let notify_tasklet = WebhookTasklet {
url: "https://api.company.com/notifications".to_string(),
message: "Daily report generated and uploaded".to_string(),
};
let notify_step = StepBuilder::new("notify")
.tasklet(&notify_tasklet)
.build();
// Create and run job
// Steps run in the order they are chained; a failure aborts the job.
let job = JobBuilder::new()
.start(&cleanup_step)
.next(&generate_step)
.next(&compress_step)
.next(&upload_step)
.next(&notify_step)
.build();
job.run()?;
Ok(())
}

use std::sync::Mutex;
/// Polls a URL until it responds successfully, retrying up to
/// `max_attempts` times with a 10-second pause between attempts.
struct PollingTasklet {
    url: String,
    attempts: Mutex<u32>, // interior mutability: execute only gets &self
    max_attempts: u32,
}

impl Tasklet for PollingTasklet {
    fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
        // Bump the shared attempt counter under the lock.
        let mut count = self.attempts.lock().unwrap();
        *count += 1;
        println!("Polling attempt {}/{}", *count, self.max_attempts);

        // Probe the endpoint; a transport error fails the step outright.
        let response = reqwest::blocking::Client::new()
            .get(&self.url)
            .send()
            .map_err(|e| BatchError::Tasklet(e.to_string()))?;

        if response.status().is_success() {
            println!("Condition met!");
            Ok(RepeatStatus::Finished)
        } else if *count >= self.max_attempts {
            // Budget exhausted without a successful response.
            Err(BatchError::Tasklet("Max polling attempts reached".to_string()))
        } else {
            // Back off, then ask the framework to invoke execute again.
            std::thread::sleep(std::time::Duration::from_secs(10));
            Ok(RepeatStatus::Continuable)
        }
    }
}

Compression Level

Use level 3-6 for balance between speed and size. Level 9 for archival.

FTP Connections

Reuse connections when possible. Use connection pooling for multiple transfers.

Error Handling

Implement retry logic for network-related tasklets

Idempotency

Design tasklets to be safely re-runnable