Skip to content

Tasklet API

The Tasklet trait defines single-task operations that don’t fit the chunk-oriented read-process-write model. Perfect for setup, cleanup, file operations, and maintenance tasks.

pub trait Tasklet {
/// Executes the tasklet operation
///
/// # Parameters
/// - `step_execution`: The current step execution context
///
/// # Returns
/// - `Ok(RepeatStatus)` - Task completed successfully
/// - `Err(BatchError)` - An error occurred
fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError>;
}
pub enum RepeatStatus {
Finished, // Task is complete
Continuable, // Task should be repeated
}

Spring Batch RS provides 2 built-in tasklet implementations:

TaskletFeature FlagPurpose
ZipTaskletzipCompress files and directories
FtpTaskletftpTransfer files via FTP/FTPS

Compresses files and directories into ZIP archives with configurable options.

pub struct ZipTaskletBuilder { /* ... */ }
MethodTypeDefaultDescription
source_path(&Path)&PathrequiredFile or directory to compress
target_path(&Path)&PathrequiredOutput ZIP file path
compression_level(u32)u326Compression level (0-9, 0=none, 9=max)
include_pattern(&str)&str"*"Glob pattern for files to include
exclude_pattern(&str)&str""Glob pattern for files to exclude
preserve_structure(bool)booltrueKeep directory structure in archive
use spring_batch_rs::tasklet::zip::ZipTaskletBuilder;
use spring_batch_rs::core::step::StepBuilder;
use spring_batch_rs::core::job::JobBuilder;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let zip_tasklet = ZipTaskletBuilder::new()
.source_path("/data/exports")
.target_path("/backups/exports.zip")
.build()?;
let step = StepBuilder::new("zip-exports")
.tasklet(&zip_tasklet)
.build();
let job = JobBuilder::new()
.start(&step)
.build();
job.run()?;
Ok(())
}
LevelSpeedRatioUse Case
0FastestNoneStore only (archiving)
1-3FastLowQuick backups, temporary files
4-6BalancedMediumGeneral purpose (default: 6)
7-9SlowHighLong-term storage, distribution

Backup Archives

Compress data directories before backup or transfer

Log Rotation

Archive old log files to save disk space

Report Distribution

Package reports and attachments for delivery

Cleanup Tasks

Compress before deletion as part of data lifecycle


Transfers files to/from FTP or FTPS servers with authentication and connection management.

pub struct FtpTaskletBuilder { /* ... */ }
MethodTypeDefaultDescription
host(&str)&strrequiredFTP server hostname
port(u16)u1621FTP server port
username(&str)&str"anonymous"Login username
password(&str)&str""Login password
use_tls(bool)boolfalseEnable FTPS (TLS encryption)
local_path(&Path)&PathrequiredLocal file path
remote_path(&str)&strrequiredRemote file path
operation(Operation)OperationrequiredUpload or Download
use spring_batch_rs::tasklet::ftp::{FtpTaskletBuilder, FtpOperation};
use spring_batch_rs::core::step::StepBuilder;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let ftp_tasklet = FtpTaskletBuilder::new()
.host("ftp.example.com")
.port(21)
.username("user")
.password("password")
.local_path("/data/export.csv")
.remote_path("/uploads/export.csv")
.operation(FtpOperation::Upload)
.build()?;
let step = StepBuilder::new("upload-to-ftp")
.tasklet(&ftp_tasklet)
.build();
Ok(())
}
pub enum FtpOperation {
Upload, // Transfer from local to remote
Download, // Transfer from remote to local
}

The FTP tasklet handles common errors:

  • Connection failures
  • Authentication errors
  • File not found
  • Permission denied
  • Network timeouts
// Tasklet will return appropriate BatchError
match step.execute(&mut execution) {
Ok(_) => println!("Transfer successful"),
Err(BatchError::Tasklet(msg)) => eprintln!("FTP error: {}", msg),
Err(e) => eprintln!("Other error: {}", e),
}

You can create custom tasklets for any single-task operation:

use spring_batch_rs::core::step::{Tasklet, StepExecution, RepeatStatus};
use spring_batch_rs::error::BatchError;
struct HelloTasklet {
message: String,
}
impl Tasklet for HelloTasklet {
fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
println!("{}", self.message);
Ok(RepeatStatus::Finished)
}
}
impl HelloTasklet {
fn new(message: impl Into<String>) -> Self {
Self {
message: message.into(),
}
}
}
use sqlx::PgPool;
struct DatabaseCleanupTasklet {
pool: PgPool,
}
impl Tasklet for DatabaseCleanupTasklet {
fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
// Execute cleanup query
let runtime = tokio::runtime::Runtime::new()
.map_err(|e| BatchError::Tasklet(format!("Runtime error: {}", e)))?;
runtime.block_on(async {
sqlx::query("DELETE FROM logs WHERE created_at < NOW() - INTERVAL '30 days'")
.execute(&self.pool)
.await
.map_err(|e| BatchError::Tasklet(format!("Cleanup failed: {}", e)))?;
Ok(RepeatStatus::Finished)
})
}
}
use std::fs;
use std::path::Path;
use std::time::{SystemTime, Duration};
struct FileCleanupTasklet {
directory: String,
age_days: u64,
}
impl Tasklet for FileCleanupTasklet {
fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
let dir = Path::new(&self.directory);
let max_age = Duration::from_secs(self.age_days * 24 * 60 * 60);
let now = SystemTime::now();
let entries = fs::read_dir(dir)
.map_err(|e| BatchError::Tasklet(format!("Failed to read directory: {}", e)))?;
let mut deleted = 0;
for entry in entries {
let entry = entry.map_err(|e| BatchError::Tasklet(e.to_string()))?;
let metadata = entry.metadata()
.map_err(|e| BatchError::Tasklet(e.to_string()))?;
if let Ok(modified) = metadata.modified() {
if let Ok(age) = now.duration_since(modified) {
if age > max_age {
fs::remove_file(entry.path())
.map_err(|e| BatchError::Tasklet(e.to_string()))?;
deleted += 1;
}
}
}
}
println!("Deleted {} old files", deleted);
Ok(RepeatStatus::Finished)
}
}
struct ApiNotificationTasklet {
webhook_url: String,
message: String,
}
impl Tasklet for ApiNotificationTasklet {
fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
let client = reqwest::blocking::Client::new();
let response = client
.post(&self.webhook_url)
.json(&serde_json::json!({
"message": self.message,
"step": step_execution.step_name(),
"status": "completed"
}))
.send()
.map_err(|e| BatchError::Tasklet(format!("HTTP request failed: {}", e)))?;
if !response.status().is_success() {
return Err(BatchError::Tasklet(
format!("API returned error: {}", response.status())
));
}
Ok(RepeatStatus::Finished)
}
}

Use RepeatStatus::Continuable for tasks that need multiple executions:

use std::sync::Mutex;
struct PollingTasklet {
attempts: Mutex<u32>,
max_attempts: u32,
}
impl Tasklet for PollingTasklet {
fn execute(&self, _step_execution: &StepExecution) -> Result<RepeatStatus, BatchError> {
let mut attempts = self.attempts.lock().unwrap();
*attempts += 1;
println!("Poll attempt {}/{}", *attempts, self.max_attempts);
// Check some condition
if check_condition() {
println!("Condition met!");
return Ok(RepeatStatus::Finished);
}
if *attempts >= self.max_attempts {
return Err(BatchError::Tasklet(
"Max polling attempts reached".to_string()
));
}
// Wait before next attempt
std::thread::sleep(std::time::Duration::from_secs(5));
Ok(RepeatStatus::Continuable)
}
}
fn check_condition() -> bool {
// Check if some external condition is met
false
}

Combine tasklets with chunk-oriented steps:

use spring_batch_rs::core::job::JobBuilder;
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Step 1: Download file via FTP
let download_tasklet = FtpTaskletBuilder::new()
.host("ftp.example.com")
.username("user")
.password("pass")
.remote_path("/data/input.csv")
.local_path("/temp/input.csv")
.operation(FtpOperation::Download)
.build()?;
let download_step = StepBuilder::new("download")
.tasklet(&download_tasklet)
.build();
// Step 2: Process the file (chunk-oriented)
let reader = CsvItemReaderBuilder::new()
.from_path("/temp/input.csv")?;
let writer = JsonItemWriterBuilder::<Record>::new()
.from_path("/temp/output.json")?;
let process_step = StepBuilder::new("process")
.chunk(100)
.reader(&reader)
.writer(&writer)
.build();
// Step 3: Compress output
let zip_tasklet = ZipTaskletBuilder::new()
.source_path("/temp/output.json")
.target_path("/output/data.zip")
.compression_level(9)
.build()?;
let compress_step = StepBuilder::new("compress")
.tasklet(&zip_tasklet)
.build();
// Step 4: Upload result
let upload_tasklet = FtpTaskletBuilder::new()
.host("ftp.example.com")
.username("user")
.password("pass")
.local_path("/output/data.zip")
.remote_path("/results/data.zip")
.operation(FtpOperation::Upload)
.build()?;
let upload_step = StepBuilder::new("upload")
.tasklet(&upload_tasklet)
.build();
// Create job with all steps
let job = JobBuilder::new()
.start(&download_step)
.next(&process_step)
.next(&compress_step)
.next(&upload_step)
.build();
job.run()?;
Ok(())
}

Single Responsibility

Each tasklet should do one thing well. Create multiple tasklets for complex workflows.

Idempotency

Design tasklets to be safely re-runnable in case of failures.

Error Reporting

Return detailed error messages that help diagnose issues.

Resource Cleanup

Always clean up resources even if errors occur (use Drop trait if needed).

Use tasklets for:

  • File operations (copy, move, delete, compress)
  • FTP/SFTP transfers
  • API calls or webhooks
  • Database maintenance (vacuum, analyze, cleanup)
  • Email notifications
  • Directory creation/cleanup
  • Pre/post processing tasks

Use chunk-oriented steps for:

  • Reading and transforming large datasets
  • ETL operations
  • Data validation and filtering
  • Bulk inserts/updates