This page provides comprehensive examples for working with relational databases (PostgreSQL, MySQL, SQLite) using Spring Batch RS.
## Setup

Add the database features you need to your `Cargo.toml`, enabling the `rdbc-*` feature that matches your target database:
**PostgreSQL:**

```toml
[dependencies]
spring-batch-rs = { version = "0.1", features = ["rdbc-postgres"] }
sqlx = { version = "0.7", features = ["runtime-tokio-native-tls", "postgres"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
```

**MySQL:**

```toml
[dependencies]
spring-batch-rs = { version = "0.1", features = ["rdbc-mysql"] }
sqlx = { version = "0.7", features = ["runtime-tokio-native-tls", "mysql"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
```

**SQLite:**

```toml
[dependencies]
spring-batch-rs = { version = "0.1", features = ["rdbc-sqlite"] }
sqlx = { version = "0.7", features = ["runtime-tokio-native-tls", "sqlite"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
```

## Reading from PostgreSQL

```rust
use spring_batch_rs::{
    core::{
        item::PassThroughProcessor,
        step::{StepBuilder, StepExecution},
    },
    item::{
        logger::LoggerWriterBuilder,
        rdbc::RdbcItemReaderBuilder,
    },
};
use serde::{Deserialize, Serialize};
use sqlx::{FromRow, PgPool};
#[derive(Debug, Clone, Deserialize, Serialize, FromRow)]
struct Person {
    id: i64,
    first_name: String,
    last_name: String,
    email: String,
    birth_date: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the database
    let pool = PgPool::connect("postgres://user:password@localhost:5432/mydb").await?;

    // Create the reader
    let reader = RdbcItemReaderBuilder::<Person>::new()
        .postgres(pool)
        .query("SELECT id, first_name, last_name, email, birth_date FROM persons")
        .with_page_size(50)
        .build_postgres();

    // Create the writer and processor
    let writer = LoggerWriterBuilder::<Person>::new().build();
    let processor = PassThroughProcessor::<Person>::new();

    // Build and execute the step
    let step = StepBuilder::new("read-postgres")
        .chunk::<Person, Person>(10)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new("read-postgres");
    step.execute(&mut execution)?;
    Ok(())
}
```

## Writing to PostgreSQL

Read records from a CSV file and insert them into PostgreSQL:

```rust
use spring_batch_rs::core::item::PassThroughProcessor;
use spring_batch_rs::item::csv::CsvItemReaderBuilder;
use spring_batch_rs::item::rdbc::postgres::PostgresItemWriterBuilder;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool = PgPool::connect("postgres://user:password@localhost:5432/mydb").await?;

    // Read from CSV
    let reader = CsvItemReaderBuilder::<Person>::new()
        .has_headers(true)
        .from_path("persons.csv")?;

    // Write to PostgreSQL
    let writer = PostgresItemWriterBuilder::new()
        .pool(pool)
        .table("persons")
        .binder(|query, person: &Person| {
            query.push_values([person], |mut b, p| {
                b.push_bind(&p.first_name)
                    .push_bind(&p.last_name)
                    .push_bind(&p.email)
                    .push_bind(&p.birth_date);
            });
        })
        .build();

    let processor = PassThroughProcessor::<Person>::new();

    let step = StepBuilder::new("csv-to-postgres")
        .chunk::<Person, Person>(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new("csv-to-postgres");
    step.execute(&mut execution)?;
    Ok(())
}
```

## Custom Queries

Any SQL statement works, including filtered queries:

```rust
let reader = RdbcItemReaderBuilder::<Person>::new()
    .postgres(pool)
    .query(
        "SELECT id, first_name, last_name, email, birth_date
         FROM persons
         WHERE active = true AND created_at > '2024-01-01'",
    )
    .with_page_size(100)
    .build_postgres();
```

## Reading from MySQL

```rust
use sqlx::MySqlPool;
use spring_batch_rs::core::item::PassThroughProcessor;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool = MySqlPool::connect("mysql://user:password@localhost:3306/mydb").await?;

    let reader = RdbcItemReaderBuilder::<Person>::new()
        .mysql(pool)
        .query("SELECT id, first_name, last_name, email, birth_date FROM persons")
        .with_page_size(50)
        .build_mysql();

    let writer = LoggerWriterBuilder::<Person>::new().build();
    let processor = PassThroughProcessor::<Person>::new();

    let step = StepBuilder::new("read-mysql")
        .chunk::<Person, Person>(10)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new("read-mysql");
    step.execute(&mut execution)?;
    Ok(())
}
```

## Writing to MySQL

Read records from a CSV file and insert them into MySQL:

```rust
use spring_batch_rs::item::rdbc::mysql::MysqlItemWriterBuilder;
use spring_batch_rs::core::item::PassThroughProcessor;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool = MySqlPool::connect("mysql://user:password@localhost:3306/mydb").await?;

    let reader = CsvItemReaderBuilder::<Person>::new()
        .has_headers(true)
        .from_path("persons.csv")?;

    let writer = MysqlItemWriterBuilder::new()
        .pool(pool)
        .table("persons")
        .binder(|query, person: &Person| {
            query.push_values([person], |mut b, p| {
                b.push_bind(&p.first_name)
                    .push_bind(&p.last_name)
                    .push_bind(&p.email)
                    .push_bind(&p.birth_date);
            });
        })
        .build();

    let processor = PassThroughProcessor::<Person>::new();

    let step = StepBuilder::new("csv-to-mysql")
        .chunk::<Person, Person>(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();
    let mut execution = StepExecution::new("csv-to-mysql");
    step.execute(&mut execution)?;

    Ok(())
}
```

## Reading from SQLite

```rust
use sqlx::SqlitePool;
use spring_batch_rs::core::item::PassThroughProcessor;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool = SqlitePool::connect("sqlite:./mydb.db").await?;

    let reader = RdbcItemReaderBuilder::<Person>::new()
        .sqlite(pool)
        .query("SELECT id, first_name, last_name, email, birth_date FROM persons")
        .with_page_size(50)
        .build_sqlite();

    let writer = LoggerWriterBuilder::<Person>::new().build();

    let processor = PassThroughProcessor::<Person>::new();

    let step = StepBuilder::new("read-sqlite")
        .chunk::<Person, Person>(10)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();
    let mut execution = StepExecution::new("read-sqlite");
    step.execute(&mut execution)?;

    Ok(())
}
```

## Writing to SQLite

Read records from a JSON file and insert them into SQLite:

```rust
use spring_batch_rs::core::item::PassThroughProcessor;
// Import path for the JSON reader assumed to mirror the `item::csv` module.
use spring_batch_rs::item::json::JsonItemReaderBuilder;
use spring_batch_rs::item::rdbc::sqlite::SqliteItemWriterBuilder;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool = SqlitePool::connect("sqlite:./mydb.db").await?;

    // Read from JSON
    let reader = JsonItemReaderBuilder::<Person>::new()
        .from_path("persons.json")?;

    let writer = SqliteItemWriterBuilder::new()
        .pool(pool)
        .table("persons")
        .binder(|query, person: &Person| {
            query.push_values([person], |mut b, p| {
                b.push_bind(&p.first_name)
                    .push_bind(&p.last_name)
                    .push_bind(&p.email)
                    .push_bind(&p.birth_date);
            });
        })
        .build();

    let processor = PassThroughProcessor::<Person>::new();

    let step = StepBuilder::new("json-to-sqlite")
        .chunk::<Person, Person>(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();
    let mut execution = StepExecution::new("json-to-sqlite");
    step.execute(&mut execution)?;

    Ok(())
}
```

## Database-to-Database Migration

Copy records from a PostgreSQL source into a MySQL target:

```rust
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Source: PostgreSQL
    let pg_pool = PgPool::connect("postgres://user:pass@localhost:5432/source_db").await?;
    let reader = RdbcItemReaderBuilder::<Person>::new()
        .postgres(pg_pool)
        .query("SELECT id, first_name, last_name, email, birth_date FROM persons")
        .with_page_size(100)
        .build_postgres();

    // Target: MySQL
    let mysql_pool = MySqlPool::connect("mysql://user:pass@localhost:3306/target_db").await?;

    let writer = MysqlItemWriterBuilder::new()
        .pool(mysql_pool)
        .table("persons")
        .binder(|query, person: &Person| {
            query.push_values([person], |mut b, p| {
                b.push_bind(&p.first_name)
                    .push_bind(&p.last_name)
                    .push_bind(&p.email)
                    .push_bind(&p.birth_date);
            });
        })
        .build();

    let processor = PassThroughProcessor::<Person>::new();

    let step = StepBuilder::new("pg-to-mysql")
        .chunk::<Person, Person>(500)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new("pg-to-mysql");
    step.execute(&mut execution)?;
    println!("Migration completed!");
    Ok(())
}
```

## Transforming Data During Migration

A custom `ItemProcessor` can reshape records between the read and the write:

```rust
use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
use spring_batch_rs::error::BatchError;
#[derive(Debug, Clone, FromRow)]
struct RawPerson {
    id: i64,
    name: String, // Full name
    email: String,
}

#[derive(Debug, Clone, Serialize)]
struct ProcessedPerson {
    id: i64,
    first_name: String,
    last_name: String,
    email: String,
    email_domain: String,
}

struct PersonProcessor;

impl ItemProcessor<RawPerson, ProcessedPerson> for PersonProcessor {
    fn process(&self, item: &RawPerson) -> ItemProcessorResult<ProcessedPerson> {
        // Split the full name into first and last
        let parts: Vec<&str> = item.name.split_whitespace().collect();
        if parts.len() < 2 {
            return Err(BatchError::ItemProcessor(
                format!("Invalid name format: {}", item.name),
            ));
        }

        let first_name = parts[0].to_string();
        let last_name = parts[1..].join(" ");

        // Extract the email domain
        let email_domain = item.email
            .split('@')
            .nth(1)
            .ok_or_else(|| BatchError::ItemProcessor(
                format!("Invalid email: {}", item.email),
            ))?
            .to_string();

        Ok(ProcessedPerson {
            id: item.id,
            first_name,
            last_name,
            email: item.email.clone(),
            email_domain,
        })
    }
}
```
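Because the processor is plain synchronous code, the splitting and domain-extraction logic can be checked in isolation. A minimal test sketch (not part of the original docs), assuming only the types defined above:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn splits_full_name_and_extracts_domain() {
        let processor = PersonProcessor;
        let raw = RawPerson {
            id: 1,
            name: "Ada Lovelace".to_string(),
            email: "ada@example.com".to_string(),
        };

        let processed = processor.process(&raw).unwrap();
        assert_eq!(processed.first_name, "Ada");
        assert_eq!(processed.last_name, "Lovelace");
        assert_eq!(processed.email_domain, "example.com");

        // A single-token name should be rejected by the processor.
        let bad = RawPerson {
            id: 2,
            name: "Cher".to_string(),
            email: "c@example.com".to_string(),
        };
        assert!(processor.process(&bad).is_err());
    }
}
```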
```rust
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool = PgPool::connect("postgres://user:pass@localhost/db").await?;

    let reader = RdbcItemReaderBuilder::<RawPerson>::new()
        .postgres(pool.clone())
        .query("SELECT id, name, email FROM raw_persons")
        .with_page_size(100)
        .build_postgres();

    let processor = PersonProcessor;

    let writer = PostgresItemWriterBuilder::new()
        .pool(pool)
        .table("processed_persons")
        .binder(|query, person: &ProcessedPerson| {
            query.push_values([person], |mut b, p| {
                b.push_bind(&p.id)
                    .push_bind(&p.first_name)
                    .push_bind(&p.last_name)
                    .push_bind(&p.email)
                    .push_bind(&p.email_domain);
            });
        })
        .build();

    let step = StepBuilder::new("transform-persons")
        .chunk::<RawPerson, ProcessedPerson>(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(10) // Skip up to 10 invalid records
        .build();
    let mut execution = StepExecution::new("transform-persons");
    step.execute(&mut execution)?;

    Ok(())
}
```

## Reading with Joins

```rust
#[derive(Debug, Clone, FromRow, Serialize)]
struct OrderWithCustomer {
    order_id: i64,
    order_date: String,
    customer_name: String,
    customer_email: String,
    total_amount: f64,
}
let reader = RdbcItemReaderBuilder::<OrderWithCustomer>::new()
    .postgres(pool)
    .query(
        "SELECT o.id as order_id, o.order_date,
                c.name as customer_name, c.email as customer_email,
                o.total_amount
         FROM orders o
         JOIN customers c ON o.customer_id = c.id
         WHERE o.order_date >= '2024-01-01'
         ORDER BY o.order_date DESC",
    )
    .with_page_size(100)
    .build_postgres();
```

## Aggregation Queries

```rust
#[derive(Debug, Clone, FromRow, Serialize)]
struct SalesSummary {
    product_id: i64,
    product_name: String,
    total_quantity: i64,
    total_revenue: f64,
    order_count: i64,
}
let reader = RdbcItemReaderBuilder::<SalesSummary>::new()
    .postgres(pool)
    .query(
        "SELECT p.id as product_id, p.name as product_name,
                SUM(oi.quantity) as total_quantity,
                SUM(oi.quantity * oi.unit_price) as total_revenue,
                COUNT(DISTINCT oi.order_id) as order_count
         FROM products p
         JOIN order_items oi ON p.id = oi.product_id
         GROUP BY p.id, p.name
         HAVING SUM(oi.quantity * oi.unit_price) > 1000
         ORDER BY total_revenue DESC",
    )
    .with_page_size(50)
    .build_postgres();
```

## Exporting to JSON

Export query results to a JSON file:

```rust
// Import path for the JSON writer assumed to mirror the `item::csv` module.
use spring_batch_rs::item::json::JsonItemWriterBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool = PgPool::connect("postgres://user:pass@localhost/db").await?;
    let reader = RdbcItemReaderBuilder::<Person>::new()
        .postgres(pool)
        .query("SELECT * FROM persons WHERE active = true")
        .with_page_size(100)
        .build_postgres();

    let writer = JsonItemWriterBuilder::<Person>::new()
        .pretty_formatter(true)
        .from_path("persons_export.json")?;

    let processor = PassThroughProcessor::<Person>::new();

    let step = StepBuilder::new("export-to-json")
        .chunk::<Person, Person>(500)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();
    let mut execution = StepExecution::new("export-to-json");
    step.execute(&mut execution)?;

    Ok(())
}
```

## Exporting to CSV

The same reader can feed a CSV writer instead:

```rust
// Import path for the CSV writer assumed to mirror the reader in `item::csv`.
use spring_batch_rs::item::csv::CsvItemWriterBuilder;

let writer = CsvItemWriterBuilder::new()
    .has_headers(true)
    .from_path("persons_export.csv")?;
let processor = PassThroughProcessor::<Person>::new();

let step = StepBuilder::new("export-to-csv")
    .chunk::<Person, Person>(500)
    .reader(&reader)
    .processor(&processor)
    .writer(&writer)
    .build();

let mut execution = StepExecution::new("export-to-csv");
step.execute(&mut execution)?;
```

## Data Warehouse Loading

A fact-table ETL: read transactions from an OLTP source, derive measures in a processor, and load the results into a warehouse. This example uses `chrono` for date handling:

```rust
use chrono::NaiveDate;
use std::collections::HashMap;
#[derive(Debug, Clone, FromRow)]
struct TransactionSource {
    transaction_id: i64,
    customer_id: i64,
    product_id: i64,
    quantity: i32,
    unit_price: f64,
    transaction_date: String,
}

#[derive(Debug, Clone, Serialize)]
struct TransactionFact {
    transaction_key: i64,
    customer_key: i64,
    product_key: i64,
    date_key: i32,
    quantity: i32,
    unit_price: f64,
    total_amount: f64,
    gross_profit: f64,
}

struct FactProcessor {
    product_costs: HashMap<i64, f64>,
}

impl ItemProcessor<TransactionSource, TransactionFact> for FactProcessor {
    fn process(&self, item: &TransactionSource) -> ItemProcessorResult<TransactionFact> {
        let total_amount = item.quantity as f64 * item.unit_price;

        let cost = self.product_costs
            .get(&item.product_id)
            .copied()
            .unwrap_or(0.0);

        let gross_profit = total_amount - (cost * item.quantity as f64);

        // Convert the date to a date key (YYYYMMDD format)
        let date = NaiveDate::parse_from_str(&item.transaction_date, "%Y-%m-%d")
            .map_err(|e| BatchError::ItemProcessor(e.to_string()))?;

        // YYYYMMDD always fits in an i32; propagate the error rather than panic
        let date_key = date.format("%Y%m%d")
            .to_string()
            .parse::<i32>()
            .map_err(|e| BatchError::ItemProcessor(e.to_string()))?;

        Ok(TransactionFact {
            transaction_key: item.transaction_id,
            customer_key: item.customer_id,
            product_key: item.product_id,
            date_key,
            quantity: item.quantity,
            unit_price: item.unit_price,
            total_amount,
            gross_profit,
        })
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Load product costs (hard-coded here for brevity)
    let mut product_costs = HashMap::new();
    product_costs.insert(1, 50.0);
    product_costs.insert(2, 25.0);
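    // In a real pipeline the cost lookup would likely be loaded from the
    // source database instead of hard-coded. A hedged sketch (assumes a
    // `products` table with `id` and `cost` columns; sqlx implements
    // `FromRow` for tuples, so `query_as` can fetch directly into pairs):
    //
    //   let rows: Vec<(i64, f64)> = sqlx::query_as("SELECT id, cost FROM products")
    //       .fetch_all(&source_pool)
    //       .await?;
    //   let product_costs: HashMap<i64, f64> = rows.into_iter().collect();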
    let processor = FactProcessor { product_costs };
    // Source: OLTP database
    let source_pool = PgPool::connect("postgres://user:pass@localhost/oltp_db").await?;

    let reader = RdbcItemReaderBuilder::<TransactionSource>::new()
        .postgres(source_pool)
        .query(
            "SELECT transaction_id, customer_id, product_id,
                    quantity, unit_price, transaction_date
             FROM transactions
             WHERE transaction_date >= CURRENT_DATE - INTERVAL '1 day'",
        )
        .with_page_size(1000)
        .build_postgres();

    // Target: data warehouse
    let dw_pool = PgPool::connect("postgres://user:pass@localhost/dw_db").await?;

    let writer = PostgresItemWriterBuilder::new()
        .pool(dw_pool)
        .table("fact_transactions")
        .binder(|query, fact: &TransactionFact| {
            query.push_values([fact], |mut b, f| {
                b.push_bind(&f.transaction_key)
                    .push_bind(&f.customer_key)
                    .push_bind(&f.product_key)
                    .push_bind(&f.date_key)
                    .push_bind(&f.quantity)
                    .push_bind(&f.unit_price)
                    .push_bind(&f.total_amount)
                    .push_bind(&f.gross_profit);
            });
        })
        .build();

    let step = StepBuilder::new("load-facts")
        .chunk::<TransactionSource, TransactionFact>(1000)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();
    let mut execution = StepExecution::new("load-facts");
    step.execute(&mut execution)?;

    Ok(())
}
```

## Best Practices
- **Page Size**: Use larger page sizes (500-1000) for better database performance.
- **Chunk Size**: Match the chunk size to the page size for optimal memory usage.
- **Connection Pooling**: Size the connection pool to match the number of concurrent operations.
- **Indexes**: Make sure source tables have indexes on the columns used in `WHERE` filters and `ORDER BY` clauses.
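To make the first three concrete, here is a minimal sketch (not from the original docs) that reuses the `Person` type from above. `PgPoolOptions` is sqlx's standard pool builder; the connection string, step name, query, and sizes are placeholder values:

```rust
use spring_batch_rs::{
    core::{item::PassThroughProcessor, step::{StepBuilder, StepExecution}},
    item::{logger::LoggerWriterBuilder, rdbc::RdbcItemReaderBuilder},
};
use sqlx::postgres::PgPoolOptions;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Size the pool for the expected number of concurrent operations.
    let pool = PgPoolOptions::new()
        .max_connections(10)
        .connect("postgres://user:password@localhost:5432/mydb")
        .await?;

    let reader = RdbcItemReaderBuilder::<Person>::new()
        .postgres(pool)
        // An index on `active` keeps this filter from scanning the whole table.
        .query("SELECT id, first_name, last_name, email, birth_date FROM persons WHERE active = true")
        // Page size and chunk size kept equal (500), so each chunk is
        // served by roughly one paged query.
        .with_page_size(500)
        .build_postgres();

    let writer = LoggerWriterBuilder::<Person>::new().build();
    let processor = PassThroughProcessor::<Person>::new();

    let step = StepBuilder::new("tuned-read")
        .chunk::<Person, Person>(500)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new("tuned-read");
    step.execute(&mut execution)?;
    Ok(())
}
```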