Aller au contenu

Database Examples

Ce contenu n’est pas encore disponible dans votre langue.

This page provides comprehensive examples for working with relational databases (PostgreSQL, MySQL, SQLite) using Spring Batch RS.

Add database features to your Cargo.toml:

[dependencies]
spring-batch-rs = { version = "0.1", features = ["rdbc-postgres"] }
sqlx = { version = "0.7", features = ["runtime-tokio-native-tls", "postgres"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }

use serde::{Deserialize, Serialize};
use spring_batch_rs::{
    core::{
        item::PassThroughProcessor,
        step::{StepBuilder, StepExecution},
    },
    item::{
        logger::{LoggerWriter, LoggerWriterBuilder},
        rdbc::RdbcItemReaderBuilder,
    },
};
use sqlx::{FromRow, PgPool};
// Row model shared by the examples below: one record of the `persons` table.
// Serialize/Deserialize cover batch item transport; FromRow maps sqlx result
// rows, so the field names must match the SELECTed column names/aliases.
#[derive(Debug, Clone, Deserialize, Serialize, FromRow)]
struct Person {
id: i64,
first_name: String,
last_name: String,
email: String,
birth_date: String, // kept as text; format not enforced here — TODO confirm ISO 8601
}
// Example: page through the PostgreSQL `persons` table and log every row.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect to database
let pool = PgPool::connect("postgres://user:password@localhost:5432/mydb").await?;
// Create reader
// Reads in pages of 50 rows; the column list must match Person's fields.
let reader = RdbcItemReaderBuilder::<Person>::new()
.postgres(pool)
.query("SELECT id, first_name, last_name, email, birth_date FROM persons")
.with_page_size(50)
.build_postgres();
// Create writer and processor
// LoggerWriter just logs each item; PassThroughProcessor is the identity.
let writer = LoggerWriterBuilder::<Person>::new().build();
let processor = PassThroughProcessor::<Person>::new();
// Build and execute step
// Chunk size 10: items are processed and written ten at a time.
let step = StepBuilder::new("read-postgres")
.chunk::<Person, Person>(10)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut execution = StepExecution::new("read-postgres");
step.execute(&mut execution)?;
Ok(())
}
use spring_batch_rs::item::rdbc::postgres::PostgresItemWriterBuilder;
use spring_batch_rs::item::csv::CsvItemReaderBuilder;
use spring_batch_rs::core::item::PassThroughProcessor;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect("postgres://user:password@localhost:5432/mydb").await?;
// Read from CSV
let reader = CsvItemReaderBuilder::<Person>::new()
.has_headers(true)
.from_path("persons.csv")?;
// Write to PostgreSQL
let writer = PostgresItemWriterBuilder::new()
.pool(pool)
.table("persons")
.binder(|query, person: &Person| {
query.push_values([person], |mut b, p| {
b.push_bind(&p.first_name)
.push_bind(&p.last_name)
.push_bind(&p.email)
.push_bind(&p.birth_date);
});
})
.build();
let processor = PassThroughProcessor::<Person>::new();
let step = StepBuilder::new("csv-to-postgres")
.chunk::<Person, Person>(100)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut execution = StepExecution::new("csv-to-postgres");
step.execute(&mut execution)?;
Ok(())
}
// Fragment: same paged reader, but restricted by a WHERE clause so only
// active, recently-created rows are streamed. Assumes `pool` is in scope.
let reader = RdbcItemReaderBuilder::<Person>::new()
.postgres(pool)
.query(
"SELECT id, first_name, last_name, email, birth_date
FROM persons
WHERE active = true AND created_at > '2024-01-01'"
)
.with_page_size(100)
.build_postgres();

use sqlx::MySqlPool;
use spring_batch_rs::core::item::PassThroughProcessor;
/// Streams `persons` rows out of MySQL in pages of 50 and logs each item.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open the connection pool first; the reader takes ownership of it.
    let db_pool = MySqlPool::connect("mysql://user:password@localhost:3306/mydb").await?;

    // Identity processor and a writer that simply logs every item.
    let pass_through = PassThroughProcessor::<Person>::new();
    let log_writer = LoggerWriterBuilder::<Person>::new().build();

    // Paged reader over the persons table.
    let person_reader = RdbcItemReaderBuilder::<Person>::new()
        .mysql(db_pool)
        .query("SELECT id, first_name, last_name, email, birth_date FROM persons")
        .with_page_size(50)
        .build_mysql();

    // Chunk size 10: items are processed and written ten at a time.
    let read_step = StepBuilder::new("read-mysql")
        .chunk::<Person, Person>(10)
        .reader(&person_reader)
        .processor(&pass_through)
        .writer(&log_writer)
        .build();

    let mut step_execution = StepExecution::new("read-mysql");
    read_step.execute(&mut step_execution)?;
    Ok(())
}
use spring_batch_rs::item::rdbc::mysql::MysqlItemWriterBuilder;
use spring_batch_rs::core::item::PassThroughProcessor;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = MySqlPool::connect("mysql://user:password@localhost:3306/mydb").await?;
let reader = CsvItemReaderBuilder::<Person>::new()
.has_headers(true)
.from_path("persons.csv")?;
let writer = MysqlItemWriterBuilder::new()
.pool(pool)
.table("persons")
.binder(|query, person: &Person| {
query.push_values([person], |mut b, p| {
b.push_bind(&p.first_name)
.push_bind(&p.last_name)
.push_bind(&p.email)
.push_bind(&p.birth_date);
});
})
.build();
let processor = PassThroughProcessor::<Person>::new();
let step = StepBuilder::new("csv-to-mysql")
.chunk::<Person, Person>(100)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
Ok(())
}

use sqlx::SqlitePool;
use spring_batch_rs::core::item::PassThroughProcessor;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = SqlitePool::connect("sqlite:./mydb.db").await?;
let reader = RdbcItemReaderBuilder::<Person>::new()
.sqlite(pool)
.query("SELECT id, first_name, last_name, email, birth_date FROM persons")
.with_page_size(50)
.build_sqlite();
let writer = LoggerWriterBuilder::<Person>::new().build();
let processor = PassThroughProcessor::<Person>::new();
let step = StepBuilder::new("read-sqlite")
.chunk::<Person, Person>(10)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
Ok(())
}
use spring_batch_rs::item::rdbc::sqlite::SqliteItemWriterBuilder;
use spring_batch_rs::core::item::PassThroughProcessor;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = SqlitePool::connect("sqlite:./mydb.db").await?;
let reader = JsonItemReaderBuilder::<Person>::new()
.from_path("persons.json")?;
let writer = SqliteItemWriterBuilder::new()
.pool(pool)
.table("persons")
.binder(|query, person: &Person| {
query.push_values([person], |mut b, p| {
b.push_bind(&p.first_name)
.push_bind(&p.last_name)
.push_bind(&p.email)
.push_bind(&p.birth_date);
});
})
.build();
let processor = PassThroughProcessor::<Person>::new();
let step = StepBuilder::new("json-to-sqlite")
.chunk::<Person, Person>(100)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
Ok(())
}

// Example: cross-database migration — read from PostgreSQL, write to MySQL.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Source: PostgreSQL
let pg_pool = PgPool::connect("postgres://user:pass@localhost:5432/source_db").await?;
let reader = RdbcItemReaderBuilder::<Person>::new()
.postgres(pg_pool)
.query("SELECT id, first_name, last_name, email, birth_date FROM persons")
.with_page_size(100)
.build_postgres();
// Target: MySQL
let mysql_pool = MySqlPool::connect("mysql://user:pass@localhost:3306/target_db").await?;
// The binder pushes one VALUES tuple per item. `id` is not bound, so the
// target column is presumably auto-generated — TODO confirm schema.
let writer = MysqlItemWriterBuilder::new()
.pool(mysql_pool)
.table("persons")
.binder(|query, person: &Person| {
query.push_values([person], |mut b, p| {
b.push_bind(&p.first_name)
.push_bind(&p.last_name)
.push_bind(&p.email)
.push_bind(&p.birth_date);
});
})
.build();
let processor = PassThroughProcessor::<Person>::new();
// Large chunks (500) amortize round-trips during bulk migration.
let step = StepBuilder::new("pg-to-mysql")
.chunk::<Person, Person>(500)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
let mut execution = StepExecution::new("pg-to-mysql");
step.execute(&mut execution)?;
println!("Migration completed!");
Ok(())
}

use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
use spring_batch_rs::error::BatchError;
// Source shape as stored in `raw_persons`: a single free-form name column.
#[derive(Debug, Clone, FromRow)]
struct RawPerson {
id: i64,
name: String, // Full name
email: String,
}
// Target shape after processing: name split in two, plus the e-mail domain.
#[derive(Debug, Clone, Serialize)]
struct ProcessedPerson {
id: i64,
first_name: String,
last_name: String,
email: String,
email_domain: String, // text after the '@' in `email`
}
/// Splits a raw full name into first/last parts and derives the e-mail domain.
struct PersonProcessor;

impl ItemProcessor<RawPerson, ProcessedPerson> for PersonProcessor {
    fn process(&self, item: &RawPerson) -> ItemProcessorResult<ProcessedPerson> {
        // A valid name has at least two whitespace-separated tokens: the
        // first becomes first_name, the rest are joined into last_name.
        let tokens: Vec<&str> = item.name.split_whitespace().collect();
        let (head, tail) = match tokens.as_slice() {
            [head, tail @ ..] if !tail.is_empty() => (*head, tail),
            _ => {
                return Err(BatchError::ItemProcessor(
                    format!("Invalid name format: {}", item.name)
                ))
            }
        };

        // The domain is whatever follows the first '@'.
        let email_domain = match item.email.split('@').nth(1) {
            Some(domain) => domain.to_string(),
            None => {
                return Err(BatchError::ItemProcessor(
                    format!("Invalid email: {}", item.email)
                ))
            }
        };

        Ok(ProcessedPerson {
            id: item.id,
            first_name: head.to_string(),
            last_name: tail.join(" "),
            email: item.email.clone(),
            email_domain,
        })
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect("postgres://user:pass@localhost/db").await?;
let reader = RdbcItemReaderBuilder::<RawPerson>::new()
.postgres(pool.clone())
.query("SELECT id, name, email FROM raw_persons")
.with_page_size(100)
.build_postgres();
let processor = PersonProcessor;
let writer = PostgresItemWriterBuilder::new()
.pool(pool)
.table("processed_persons")
.binder(|query, person: &ProcessedPerson| {
query.push_values([person], |mut b, p| {
b.push_bind(&p.id)
.push_bind(&p.first_name)
.push_bind(&p.last_name)
.push_bind(&p.email)
.push_bind(&p.email_domain);
});
})
.build();
let step = StepBuilder::new("transform-persons")
.chunk::<RawPerson, ProcessedPerson>(100)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.skip_limit(10) // Skip up to 10 invalid records
.build();
Ok(())
}

// Flattened order + customer row produced by the JOIN query below; the
// SELECT aliases must match these field names for FromRow mapping.
#[derive(Debug, Clone, FromRow, Serialize)]
struct OrderWithCustomer {
order_id: i64,
order_date: String,
customer_name: String,
customer_email: String,
total_amount: f64,
}
// Fragment: the paged reader can run arbitrary SQL — here a two-table JOIN
// restricted and ordered by order date. Assumes `pool` is in scope.
let reader = RdbcItemReaderBuilder::<OrderWithCustomer>::new()
.postgres(pool)
.query(
"SELECT
o.id as order_id,
o.order_date,
c.name as customer_name,
c.email as customer_email,
o.total_amount
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.order_date >= '2024-01-01'
ORDER BY o.order_date DESC"
)
.with_page_size(100)
.build_postgres();
// Aggregated sales row: one per product passing the HAVING revenue threshold.
#[derive(Debug, Clone, FromRow, Serialize)]
struct SalesSummary {
product_id: i64,
product_name: String,
total_quantity: i64,
total_revenue: f64,
order_count: i64,
}
// Fragment: aggregation queries work too; each SELECT alias must match a
// SalesSummary field name. Assumes `pool` is in scope.
let reader = RdbcItemReaderBuilder::<SalesSummary>::new()
.postgres(pool)
.query(
"SELECT
p.id as product_id,
p.name as product_name,
SUM(oi.quantity) as total_quantity,
SUM(oi.quantity * oi.unit_price) as total_revenue,
COUNT(DISTINCT oi.order_id) as order_count
FROM products p
JOIN order_items oi ON p.id = oi.product_id
GROUP BY p.id, p.name
HAVING SUM(oi.quantity * oi.unit_price) > 1000
ORDER BY total_revenue DESC"
)
.with_page_size(50)
.build_postgres();

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pool = PgPool::connect("postgres://user:pass@localhost/db").await?;
let reader = RdbcItemReaderBuilder::<Person>::new()
.postgres(pool)
.query("SELECT * FROM persons WHERE active = true")
.with_page_size(100)
.build_postgres();
let writer = JsonItemWriterBuilder::<Person>::new()
.pretty_formatter(true)
.from_path("persons_export.json")?;
let processor = PassThroughProcessor::<Person>::new();
let step = StepBuilder::new("export-to-json")
.chunk::<Person, Person>(500)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
Ok(())
}
// Fragment: variant of the export above that writes CSV instead of JSON.
// Assumes `reader` (and a CsvItemWriterBuilder import) come from the
// surrounding example.
let writer = CsvItemWriterBuilder::new()
.has_headers(true)
.from_path("persons_export.csv")?;
let processor = PassThroughProcessor::<Person>::new();
let step = StepBuilder::new("export-to-csv")
.chunk::<Person, Person>(500)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();

use chrono::NaiveDate;
// OLTP source row for one sale.
#[derive(Debug, Clone, FromRow)]
struct TransactionSource {
transaction_id: i64,
customer_id: i64,
product_id: i64,
quantity: i32,
unit_price: f64,
transaction_date: String, // parsed as "%Y-%m-%d" by the fact processor
}
// Star-schema fact row derived from the source transaction.
#[derive(Debug, Clone, Serialize)]
struct TransactionFact {
transaction_key: i64,
customer_key: i64,
product_key: i64,
date_key: i32, // YYYYMMDD surrogate date key
quantity: i32,
unit_price: f64,
total_amount: f64, // quantity * unit_price
gross_profit: f64, // total_amount minus quantity * unit cost
}
/// Turns a source transaction row into a warehouse fact, enriching it with
/// cost data from an in-memory product-cost lookup.
struct FactProcessor {
    // product_id -> unit cost; products missing here are costed at 0.0.
    product_costs: HashMap<i64, f64>,
}

impl ItemProcessor<TransactionSource, TransactionFact> for FactProcessor {
    fn process(&self, item: &TransactionSource) -> ItemProcessorResult<TransactionFact> {
        let total_amount = item.quantity as f64 * item.unit_price;
        // Unknown products fall back to zero cost instead of failing the item.
        let cost = self
            .product_costs
            .get(&item.product_id)
            .copied()
            .unwrap_or(0.0);
        let gross_profit = total_amount - (cost * item.quantity as f64);
        // Convert date to date key (YYYYMMDD format)
        let date = NaiveDate::parse_from_str(&item.transaction_date, "%Y-%m-%d")
            .map_err(|e| BatchError::ItemProcessor(e.to_string()))?;
        // Fix: report a bad key as an item error instead of `.unwrap()`, which
        // would panic — aborting the whole job — if the formatted key ever
        // failed to parse as i32 (e.g. out-of-range years).
        let date_key = date
            .format("%Y%m%d")
            .to_string()
            .parse::<i32>()
            .map_err(|e| BatchError::ItemProcessor(e.to_string()))?;
        Ok(TransactionFact {
            transaction_key: item.transaction_id,
            customer_key: item.customer_id,
            product_key: item.product_id,
            date_key,
            quantity: item.quantity,
            unit_price: item.unit_price,
            total_amount,
            gross_profit,
        })
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Load product costs
let mut product_costs = HashMap::new();
product_costs.insert(1, 50.0);
product_costs.insert(2, 25.0);
let processor = FactProcessor { product_costs };
// Source: OLTP database
let source_pool = PgPool::connect("postgres://user:pass@localhost/oltp_db").await?;
let reader = RdbcItemReaderBuilder::<TransactionSource>::new()
.postgres(source_pool)
.query(
"SELECT transaction_id, customer_id, product_id, quantity,
unit_price, transaction_date
FROM transactions
WHERE transaction_date >= CURRENT_DATE - INTERVAL '1 day'"
)
.with_page_size(1000)
.build_postgres();
// Target: Data warehouse
let dw_pool = PgPool::connect("postgres://user:pass@localhost/dw_db").await?;
let writer = PostgresItemWriterBuilder::new()
.pool(dw_pool)
.table("fact_transactions")
.binder(|query, fact: &TransactionFact| {
query.push_values([fact], |mut b, f| {
b.push_bind(&f.transaction_key)
.push_bind(&f.customer_key)
.push_bind(&f.product_key)
.push_bind(&f.date_key)
.push_bind(&f.quantity)
.push_bind(&f.unit_price)
.push_bind(&f.total_amount)
.push_bind(&f.gross_profit);
});
})
.build();
let step = StepBuilder::new("load-facts")
.chunk::<TransactionSource, TransactionFact>(1000)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
Ok(())
}

Page Size

Use larger page sizes (500-1000) for better database performance

Chunk Size

Match chunk size to page size so each page fetched from the database maps to one processed-and-written chunk, keeping at most one page of items buffered in memory at a time

Connection Pooling

Use appropriate pool size based on concurrent operations

Indexes

Ensure source tables have indexes on query columns