Skip to content

MongoDB Examples

This page provides comprehensive examples for working with MongoDB using Spring Batch RS.

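Enable the `mongodb` feature of spring-batch-rs and add the `mongodb` and `serde` crates to your Cargo.toml. The examples below use the driver's blocking API (`mongodb::sync::Client`), which the `mongodb` crate only exposes when one of its sync features is enabled. Check the driver's feature flags for your version: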

[dependencies]
spring-batch-rs = { version = "0.1", features = ["mongodb"] }
mongodb = "2.8"
serde = { version = "1.0", features = ["derive"] }

use mongodb::{
    bson::{doc, oid::ObjectId},
    sync::Client,
};
use serde::{Deserialize, Serialize};
use spring_batch_rs::{
    core::step::{StepBuilder, StepExecution},
    item::{
        logger::{LoggerWriter, LoggerWriterBuilder},
        mongodb::mongodb_reader::{MongodbItemReaderBuilder, WithObjectId},
    },
};
/// A book document stored in the `books` collection.
///
/// `id` is mapped to MongoDB's `_id` field and is left out of the serialized
/// document when it is `None`.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Book {
    #[serde(rename = "_id")]
    #[serde(skip_serializing_if = "Option::is_none")]
    id: Option<ObjectId>,
    title: String,
    author: String,
    isbn: String,
    year: i32,
}
impl WithObjectId for Book {
    /// Returns this book's `_id`.
    ///
    /// Documents stored in MongoDB always carry an `_id`. For a book that has not
    /// been saved yet (`id == None`), a fresh `ObjectId` is generated on every call,
    /// so the returned value is not stable across calls in that case.
    fn get_id(&self) -> ObjectId {
        self.id.unwrap_or_else(ObjectId::new)
    }
}
/// Streams every document from `library.books` and prints each `Book` through the
/// logger writer, processing 10 items per chunk.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mongo = Client::with_uri_str("mongodb://localhost:27017")?;
    let books = mongo.database("library").collection::<Book>("books");

    // An empty filter matches every document; documents are paged 20 at a time.
    let book_reader = MongodbItemReaderBuilder::new()
        .collection(&books)
        .filter(doc! {})
        .page_size(20)
        .build();
    let log_writer = LoggerWriterBuilder::<Book>::new().build();

    let read_step = StepBuilder::new("read-mongodb")
        .chunk::<Book, Book>(10)
        .reader(&book_reader)
        .writer(&log_writer)
        .build();

    let mut step_execution = StepExecution::new("read-mongodb");
    read_step.execute(&mut step_execution)?;
    Ok(())
}

use spring_batch_rs::item::mongodb::mongodb_writer::MongodbItemWriterBuilder;
use spring_batch_rs::item::csv::CsvItemReaderBuilder;
/// Loads `books.csv` (which has a header row) and inserts its rows into
/// `library.books`, 100 items per chunk.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mongo = Client::with_uri_str("mongodb://localhost:27017")?;
    let books = mongo.database("library").collection::<Book>("books");

    let csv_reader = CsvItemReaderBuilder::<Book>::new()
        .has_headers(true)
        .from_path("books.csv")?;
    let mongo_writer = MongodbItemWriterBuilder::new()
        .collection(&books)
        .build();

    let import_step = StepBuilder::new("csv-to-mongodb")
        .chunk::<Book, Book>(100)
        .reader(&csv_reader)
        .writer(&mongo_writer)
        .build();

    let mut step_execution = StepExecution::new("csv-to-mongodb");
    import_step.execute(&mut step_execution)?;
    Ok(())
}

// Fragment: assumes `collection` comes from a surrounding `main` like the examples above.
// Read only matching documents: books by "J.K. Rowling" published in 2000 or later.
let filter = doc! {
"author": "J.K. Rowling",
"year": { "$gte": 2000 }
};
// The filter is passed to the reader; documents are fetched in pages of 50.
let reader = MongodbItemReaderBuilder::new()
.collection(&collection)
.filter(filter)
.page_size(50)
.build();
// Combining several conditions with `$and`. Note that `price` and `in_stock` are not
// fields of `Book`; this filter is meant for a collection whose documents have them.
let filter = doc! {
"$and": [
{ "year": { "$gte": 2020 } },
{ "price": { "$lt": 30.0 } },
{ "in_stock": true }
]
};

use spring_batch_rs::item::json::JsonItemWriterBuilder;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::with_uri_str("mongodb://localhost:27017")?;
let db = client.database("library");
let collection = db.collection::<Book>("books");
let filter = doc! { "year": { "$gte": 2020 } };
let reader = MongodbItemReaderBuilder::new()
.collection(&collection)
.filter(filter)
.page_size(100)
.build();
let writer = JsonItemWriterBuilder::<Book>::new()
.pretty_formatter(true)
.from_path("books_export.json")?;
let step = StepBuilder::new("mongodb-to-json")
.chunk::<Book, Book>(100)
.reader(&reader)
.writer(&writer)
.build();
Ok(())
}

use spring_batch_rs::core::item::{ItemProcessor, ItemProcessorResult};
/// A book record as it arrives from the source, with `price` still unparsed text.
#[derive(Clone, Deserialize)]
struct RawBook {
    title: String,
    author: String,
    /// Textual price from the source; parsed into a number during processing.
    price: String,
}
/// Output of `BookProcessor`: a book with a numeric price and a derived category.
#[derive(Serialize)]
struct ProcessedBook {
    title: String,
    author: String,
    /// Price parsed from the raw text.
    price: f64,
    /// Coarse category assigned from keywords in the title.
    category: String,
}
struct BookProcessor;
impl ItemProcessor<RawBook, ProcessedBook> for BookProcessor {
fn process(&self, item: RawBook) -> ItemProcessorResult<ProcessedBook> {
let price = item.price.parse::<f64>()
.map_err(|e| spring_batch_rs::error::BatchError::ItemProcessor(
format!("Invalid price: {}", e)
))?;
let category = if item.title.to_lowercase().contains("rust") {
"Programming"
} else if item.title.to_lowercase().contains("novel") {
"Fiction"
} else {
"General"
}.to_string();
Ok(ProcessedBook {
title: item.title.clone(),
author: item.author.clone(),
price,
category,
})
}
}

use sqlx::PgPool;
use spring_batch_rs::item::rdbc::RdbcItemWriterBuilder;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Source: MongoDB
let mongo_client = Client::with_uri_str("mongodb://localhost:27017")?;
let db = mongo_client.database("library");
let collection = db.collection::<Book>("books");
let reader = MongodbItemReaderBuilder::new()
.collection(&collection)
.filter(doc! {})
.page_size(100)
.build();
// Target: PostgreSQL
let pg_pool = PgPool::connect("postgres://user:pass@localhost/library_db").await?;
let writer = RdbcItemWriterBuilder::<Book>::new()
.postgres(&pg_pool)
.table("books")
.column("title", |b: &Book| b.title.as_str().into())
.column("author", |b: &Book| b.author.as_str().into())
.column("isbn", |b: &Book| b.isbn.as_str().into())
.column("year", |b: &Book| b.year.into())
.build_postgres();
let step = StepBuilder::new("mongodb-to-postgres")
.chunk::<Book, Book>(200)
.reader(&reader)
.writer(&writer)
.build();
Ok(())
}

/// Author details embedded inside a `ComplexBook` document.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Author {
    name: String,
    bio: String,
    birth_year: i32,
}

/// A single user review, stored in a `ComplexBook`'s `reviews` array.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Review {
    user: String,
    /// Numeric rating; no range is enforced here.
    rating: i32,
    comment: String,
}
/// A book document with embedded sub-documents and arrays.
///
/// `author` is an embedded document, `reviews` and `tags` are arrays, and
/// `metadata` is an embedded document whose values are all strings.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct ComplexBook {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    id: Option<ObjectId>,
    title: String,
    author: Author,
    reviews: Vec<Review>,
    tags: Vec<String>,
    // Fully qualified because none of the imports on this page bring `HashMap` into scope.
    metadata: std::collections::HashMap<String, String>,
}
impl WithObjectId for ComplexBook {
    /// Returns this book's `_id`.
    ///
    /// Documents stored in MongoDB always carry an `_id`. For a book that has not
    /// been saved yet (`id == None`), a fresh `ObjectId` is generated on every call,
    /// so the returned value is not stable across calls in that case.
    fn get_id(&self) -> ObjectId {
        self.id.unwrap_or_else(ObjectId::new)
    }
}

/// Per-author statistics, presumably produced by a `$group` stage keyed on the
/// author name (which lands in the document's `_id`).
#[derive(Clone, Debug, Deserialize)]
struct BookSummary {
    /// Group key, read from the `_id` field.
    #[serde(rename = "_id")]
    author: String,
    total_books: i32,
    avg_rating: f64,
    newest_year: i32,
}
// The reader on this page takes a filter, not an aggregation pipeline. To read
// aggregated results, run the pipeline first and write its output to a temporary
// collection (e.g. with `$out`), then read `BookSummary` items from that collection.

use chrono::Utc;
#[derive(Deserialize, Clone)]
struct LegacyUser {
user_id: String,
full_name: String,
email_address: String,
signup_date: String,
}
/// Target schema for migrated users in the `users` collection.
///
/// When `id` is `None`, the `_id` key is omitted from the serialized document,
/// so an id is generated on insert.
#[derive(Serialize)]
struct ModernUser {
    #[serde(rename = "_id")]
    #[serde(skip_serializing_if = "Option::is_none")]
    id: Option<ObjectId>,
    user_id: String,
    first_name: String,
    last_name: String,
    email: String,
    // Timestamps are stored as strings.
    created_at: String,
    migrated_at: String,
}
/// Maps a `LegacyUser` record onto the `ModernUser` schema.
struct UserMigrationProcessor;
impl ItemProcessor<LegacyUser, ModernUser> for UserMigrationProcessor {
    /// Converts one legacy user.
    ///
    /// `full_name` is split on whitespace: the first word becomes `first_name`, and
    /// the remaining words, joined by single spaces, become `last_name`. A
    /// single-word name yields an empty `last_name`; a blank name yields two empty
    /// strings. The email is lower-cased, and `migrated_at` is stamped with the
    /// current UTC time in RFC 3339 format.
    fn process(&self, item: LegacyUser) -> ItemProcessorResult<ModernUser> {
        let mut words = item.full_name.split_whitespace();
        // Take the first word as-is (never the untrimmed original string).
        let first_name = words.next().unwrap_or_default().to_string();
        let last_name = words.collect::<Vec<_>>().join(" ");

        // `item` is owned, so its remaining fields are moved rather than cloned.
        Ok(ModernUser {
            id: None,
            user_id: item.user_id,
            first_name,
            last_name,
            email: item.email_address.to_lowercase(),
            created_at: item.signup_date,
            migrated_at: Utc::now().to_rfc3339(),
        })
    }
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::with_uri_str("mongodb://localhost:27017")?;
let db = client.database("myapp");
let source_collection = db.collection::<LegacyUser>("legacy_users");
let target_collection = db.collection::<ModernUser>("users");
let reader = MongodbItemReaderBuilder::new()
.collection(&source_collection)
.filter(doc! { "migrated": { "$ne": true } })
.page_size(100)
.build();
let processor = UserMigrationProcessor;
let writer = MongodbItemWriterBuilder::new()
.collection(&target_collection)
.build();
let step = StepBuilder::new("migrate-users")
.chunk::<LegacyUser, ModernUser>(100)
.reader(&reader)
.processor(&processor)
.writer(&writer)
.build();
Ok(())
}

Page Size

Page sizes of 100–500 are a good starting point for cursor performance; tune the value for your document size.

Indexes

Create indexes on the fields your filters query, so reads avoid full collection scans.

Projection

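Use a projection to fetch only the fields you need and reduce data transfer. A projection is separate from the query filter: the filter selects which documents are returned, while the projection selects which fields each document contains. Make sure your item struct only requires the projected fields.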

Bulk Writes

The writer uses `insert_many()` for efficient batch inserts.