Page Size
Use page sizes of 100-500 for optimal cursor performance
This page provides comprehensive examples for working with MongoDB using Spring Batch RS.
Add MongoDB features to your Cargo.toml:
[dependencies]spring-batch-rs = { version = "0.1", features = ["mongodb"] }mongodb = "2.8"serde = { version = "1.0", features = ["derive"] }use spring_batch_rs::{ core::step::{StepBuilder, StepExecution}, item::{ mongodb::mongodb_reader::{MongodbItemReaderBuilder, WithObjectId}, logger::LoggerWriter, },};use mongodb::{ bson::{doc, oid::ObjectId}, sync::Client,};use serde::{Deserialize, Serialize};
/// A book document, used below as the item type for the `books` collection.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Book {
    /// Mapped to the `_id` field; omitted from serialized output when `None`.
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    id: Option<ObjectId>,
    title: String,
    author: String,
    isbn: String,
    year: i32,
}
/// Implements the `WithObjectId` trait imported from the MongoDB reader module.
impl WithObjectId for Book {
    /// Returns the stored `id`, or a newly generated `ObjectId` when `id` is `None`.
    fn get_id(&self) -> ObjectId {
        // The constructor is passed directly; wrapping it in a closure is redundant.
        self.id.unwrap_or_else(ObjectId::new)
    }
}
/// Reads all documents from the `books` collection of the `library` database
/// and hands them to a logger writer in chunks of 10.
///
/// # Errors
/// Propagates any error raised while connecting to MongoDB or executing the step.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::with_uri_str("mongodb://localhost:27017")?;
    let db = client.database("library");
    let collection = db.collection::<Book>("books");

    // The trailing comment must end its own line; otherwise it hides the rest
    // of the builder chain.
    let reader = MongodbItemReaderBuilder::new()
        .collection(&collection)
        .filter(doc! {}) // Empty filter = all documents
        .page_size(20)
        .build();

    // NOTE(review): the imports shown earlier bring in `logger::LoggerWriter` only;
    // confirm where `LoggerWriterBuilder` is exported from and import it.
    let writer = LoggerWriterBuilder::<Book>::new().build();

    let step = StepBuilder::new("read-mongodb")
        .chunk::<Book, Book>(10)
        .reader(&reader)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new("read-mongodb");
    step.execute(&mut execution)?;

    Ok(())
}

use spring_batch_rs::item::mongodb::mongodb_writer::MongodbItemWriterBuilder;
use spring_batch_rs::item::csv::CsvItemReaderBuilder;
/// Loads `books.csv` (with a header row) and inserts its rows into the
/// `books` collection of the `library` database, 100 items per chunk.
///
/// # Errors
/// Propagates any error from the MongoDB connection, the CSV reader, or the step.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Target collection.
    let mongo = Client::with_uri_str("mongodb://localhost:27017")?;
    let library = mongo.database("library");
    let books = library.collection::<Book>("books");

    // Source file.
    let csv_reader = CsvItemReaderBuilder::<Book>::new()
        .has_headers(true)
        .from_path("books.csv")?;

    let mongo_writer = MongodbItemWriterBuilder::new()
        .collection(&books)
        .build();

    let import_step = StepBuilder::new("csv-to-mongodb")
        .chunk::<Book, Book>(100)
        .reader(&csv_reader)
        .writer(&mongo_writer)
        .build();

    let mut step_execution = StepExecution::new("csv-to-mongodb");
    import_step.execute(&mut step_execution)?;

    Ok(())
}

// Filter on an exact author match combined with a lower bound on `year`.
let filter = doc! {
    "author": "J.K. Rowling",
    "year": { "$gte": 2000 }
};
// Reader restricted to the documents matching `filter`, with a page size of 50.
let reader = MongodbItemReaderBuilder::new()
    .collection(&collection)
    .filter(filter)
    .page_size(50)
    .build();

// Compound filter: the three conditions are combined with `$and`.
let filter = doc! {
    "$and": [
        { "year": { "$gte": 2020 } },
        { "price": { "$lt": 30.0 } },
        { "in_stock": true }
    ]
};

use spring_batch_rs::item::json::JsonItemWriterBuilder;
fn main() -> Result<(), Box<dyn std::error::Error>> { let client = Client::with_uri_str("mongodb://localhost:27017")?; let db = client.database("library"); let collection = db.collection::<Book>("books");
let filter = doc! { "year": { "$gte": 2020 } };
let reader = MongodbItemReaderBuilder::new() .collection(&collection) .filter(filter) .page_size(100) .build();
let writer = JsonItemWriterBuilder::<Book>::new() .pretty_formatter(true) .from_path("books_export.json")?;
let step = StepBuilder::new("mongodb-to-json") .chunk::<Book, Book>(100) .reader(&reader) .writer(&writer) .build();
Ok(())}use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
#[derive(Deserialize, Clone)]struct RawBook { title: String, author: String, price: String, // String from source}
#[derive(Serialize)]struct ProcessedBook { title: String, author: String, price: f64, // Parsed to float category: String,}
struct BookProcessor;
impl ItemProcessor<RawBook, ProcessedBook> for BookProcessor { fn process(&self, item: RawBook) -> ItemProcessorResult<ProcessedBook> { let price = item.price.parse::<f64>() .map_err(|e| spring_batch_rs::error::BatchError::ItemProcessor( format!("Invalid price: {}", e) ))?;
let category = if item.title.to_lowercase().contains("rust") { "Programming" } else if item.title.to_lowercase().contains("novel") { "Fiction" } else { "General" }.to_string();
Ok(ProcessedBook { title: item.title.clone(), author: item.author.clone(), price, category, }) }}use sqlx::PgPool;use spring_batch_rs::item::rdbc::RdbcItemWriterBuilder;
#[tokio::main]async fn main() -> Result<(), Box<dyn std::error::Error>> { // Source: MongoDB let mongo_client = Client::with_uri_str("mongodb://localhost:27017")?; let db = mongo_client.database("library"); let collection = db.collection::<Book>("books");
let reader = MongodbItemReaderBuilder::new() .collection(&collection) .filter(doc! {}) .page_size(100) .build();
// Target: PostgreSQL let pg_pool = PgPool::connect("postgres://user:pass@localhost/library_db").await?;
let writer = RdbcItemWriterBuilder::<Book>::new() .postgres(&pg_pool) .table("books") .column("title", |b: &Book| b.title.as_str().into()) .column("author", |b: &Book| b.author.as_str().into()) .column("isbn", |b: &Book| b.isbn.as_str().into()) .column("year", |b: &Book| b.year.into()) .build_postgres();
let step = StepBuilder::new("mongodb-to-postgres") .chunk::<Book, Book>(200) .reader(&reader) .writer(&writer) .build();
Ok(())}#[derive(Debug, Serialize, Deserialize, Clone)]struct Author { name: String, bio: String, birth_year: i32,}
/// A single review; `ComplexBook` holds a list of these.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Review {
    user: String,
    rating: i32,
    comment: String,
}
/// A book document with a nested author, a list of reviews, tags, and
/// free-form string metadata.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ComplexBook {
    /// Mapped to the `_id` field; omitted from serialized output when `None`.
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    id: Option<ObjectId>,
    title: String,
    author: Author,
    reviews: Vec<Review>,
    tags: Vec<String>,
    // Fully qualified because no `HashMap` import is visible in this example.
    metadata: std::collections::HashMap<String, String>,
}
/// Implements the `WithObjectId` trait imported from the MongoDB reader module.
impl WithObjectId for ComplexBook {
    /// Returns the stored `id`, or a newly generated `ObjectId` when `id` is `None`.
    fn get_id(&self) -> ObjectId {
        // The constructor is passed directly; wrapping it in a closure is redundant.
        self.id.unwrap_or_else(ObjectId::new)
    }
}

/// Per-author summary row; `author` is deserialized from the `_id` field.
#[derive(Debug, Deserialize, Clone)]
struct BookSummary {
    #[serde(rename = "_id")]
    author: String,
    total_books: i32,
    avg_rating: f64,
    newest_year: i32,
}
// Note: For aggregation, you would typically run the pipeline first
// and write results to a temporary collection, then read from there.

use chrono::Utc;
/// User record in the old layout, read from the `legacy_users` collection.
#[derive(Deserialize, Clone)]
struct LegacyUser {
    user_id: String,
    /// Full name in a single field; split into first/last name during migration.
    full_name: String,
    email_address: String,
    signup_date: String,
}
/// User record in the new layout, written to the `users` collection.
#[derive(Serialize)]
struct ModernUser {
    /// Mapped to the `_id` field; omitted from serialized output when `None`.
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    id: Option<ObjectId>,
    user_id: String,
    first_name: String,
    last_name: String,
    email: String,
    /// Carried over from the legacy `signup_date`.
    created_at: String,
    /// RFC 3339 timestamp taken when the record is processed.
    migrated_at: String,
}
/// Stateless processor that converts a `LegacyUser` into a `ModernUser`.
struct UserMigrationProcessor;

impl ItemProcessor<LegacyUser, ModernUser> for UserMigrationProcessor {
    /// Splits the full name, lowercases the e-mail address, and stamps the
    /// migration time.
    ///
    /// The first whitespace-separated token of `full_name` becomes `first_name`;
    /// the remaining tokens, joined with single spaces, become `last_name`.
    /// A name with one token yields an empty `last_name`; a blank name yields
    /// two empty strings.
    fn process(&self, item: LegacyUser) -> ItemProcessorResult<ModernUser> {
        // Both name parts come from the same whitespace split, so single-word
        // names are trimmed exactly like multi-word ones.
        let mut name_parts = item.full_name.split_whitespace();
        let first_name = name_parts.next().unwrap_or_default().to_string();
        let last_name = name_parts.collect::<Vec<_>>().join(" ");

        // NOTE(review): the clones are only required if the trait hands the item
        // over by reference; confirm against the `ItemProcessor` definition.
        Ok(ModernUser {
            id: None,
            user_id: item.user_id.clone(),
            first_name,
            last_name,
            email: item.email_address.to_lowercase(),
            created_at: item.signup_date.clone(),
            migrated_at: Utc::now().to_rfc3339(),
        })
    }
}
/// Migrates documents from `legacy_users` to `users` in the `myapp` database,
/// converting each one with `UserMigrationProcessor`, 100 items per chunk.
///
/// Only source documents whose `migrated` field is not `true` are read.
///
/// # Errors
/// Propagates any error raised while connecting to MongoDB or executing the step.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::with_uri_str("mongodb://localhost:27017")?;
    let db = client.database("myapp");

    let source_collection = db.collection::<LegacyUser>("legacy_users");
    let target_collection = db.collection::<ModernUser>("users");

    let reader = MongodbItemReaderBuilder::new()
        .collection(&source_collection)
        .filter(doc! { "migrated": { "$ne": true } })
        .page_size(100)
        .build();

    let processor = UserMigrationProcessor;

    let writer = MongodbItemWriterBuilder::new()
        .collection(&target_collection)
        .build();

    let step = StepBuilder::new("migrate-users")
        .chunk::<LegacyUser, ModernUser>(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    // Building the step does not run it; execute it as the other examples do.
    let mut execution = StepExecution::new("migrate-users");
    step.execute(&mut execution)?;

    Ok(())
}

Page Size
Use page sizes of 100-500 for optimal cursor performance
Indexes
Ensure collections have indexes on filter fields
Projection
Use projection in queries to reduce data transfer (configure in filter)
Bulk Writes
Writer uses insert_many() for efficient batch inserts