From 46bee6ee6055abc006a1f4e5a009e0d80da91adb Mon Sep 17 00:00:00 2001 From: Nolan Darilek Date: Mon, 16 Jun 2025 16:06:15 -0400 Subject: [PATCH] Combine rayon and a larger batch size for another modest gain. --- Cargo.lock | 40 ++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/lib.rs | 17 ++++++++--------- 3 files changed, 49 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bf1444f..c5a9005 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -540,6 +540,25 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.12" @@ -1854,6 +1873,26 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.3.5" @@ -2267,6 +2306,7 @@ dependencies = [ "mysql_async", "mysql_common", "parquet", + "rayon", "testcontainers-modules", "time-test", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 687d503..10f0eaf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ futures = "0.3" 
 mysql_async = { version = "0.36", features = ["chrono"] }
 mysql_common = "0.35"
 parquet = { version = "55", features = ["async"] }
+rayon = "1"
 testcontainers-modules = { version = "0.12", features = ["mysql"] }
 time-test = "0.3"
 tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
diff --git a/src/lib.rs b/src/lib.rs
index a10c33d..793958e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,6 +10,7 @@ use arrow::{
 use futures::TryStreamExt;
 use mysql_async::{Conn, Pool, params::Params, prelude::*};
 use parquet::arrow::AsyncArrowWriter;
+use rayon::prelude::*;
 
 #[cfg(test)]
 mod test;
@@ -35,7 +36,7 @@ pub async fn convert_data(pool: &Pool, table: &str) -> Result<(), Error> {
     let schema = discover_schema(&mut conn, table).await?;
     let file = tokio::fs::File::create(format!("{table}.parquet")).await?;
     let mut writer = AsyncArrowWriter::try_new(file, schema.clone(), Default::default())?;
-    const BATCH_SIZE: usize = 10000;
+    const BATCH_SIZE: usize = 50000;
     let mut rows = Vec::with_capacity(BATCH_SIZE); // Pre-allocate for efficiency
     let mut stream = conn
         .exec_stream(format!("select * from {table}"), Params::Empty)
@@ -91,28 +92,28 @@ async fn process_parquet(
         match field.data_type() {
             DataType::Int32 => {
                 let values: Vec<Option<i32>> = rows
-                    .iter()
+                    .par_iter()
                     .map(|row| row.get::<i32, _>(name.as_str()))
                     .collect();
                 column_data.push(Box::new(Int32Array::from(values)));
             }
             DataType::Int64 => {
                 let values: Vec<Option<i64>> = rows
-                    .iter()
+                    .par_iter()
                     .map(|row| row.get::<i64, _>(name.as_str()))
                     .collect();
                 column_data.push(Box::new(Int64Array::from(values)));
             }
             DataType::Utf8 => {
                 let values: Vec<Option<String>> = rows
-                    .iter()
+                    .par_iter()
                     .map(|row| row.get::<String, _>(name.as_str()))
                     .collect();
                 column_data.push(Box::new(StringArray::from(values)));
             }
             DataType::Timestamp(TimeUnit::Microsecond, None) => {
                 let values: Vec<Option<i64>> = rows
-                    .iter()
+                    .par_iter()
                     .map(|row| {
                         // Convert MySQL datetime to microseconds since epoch
                         row.get::<chrono::NaiveDateTime, _>(name.as_str())
@@ -123,7 +124,7 @@ async fn process_parquet(
             }
             DataType::Date32 => {
                 let values: Vec<Option<i32>> = rows
-                    .iter()
+                    .par_iter()
                     .map(|row| {
                         let date = row.get::<chrono::NaiveDate, _>(name.as_str());
                         Some((date.unwrap() - epoch).num_days() as i32)
@@ -134,15 +135,13 @@ async fn process_parquet(
             _ => {
                 // Fallback to string for unknown types
                 let values: Vec<Option<String>> = rows
-                    .iter()
+                    .par_iter()
                     .map(|row| row.get::<String, _>(name.as_str()))
                     .collect();
                 column_data.push(Box::new(StringArray::from(values)));
             }
         }
     }
-
-    // Convert to Arc for RecordBatch
     let columns: Vec<Arc<dyn Array>> = column_data.into_iter().map(|arr| arr.into()).collect();
     let batch = RecordBatch::try_new(schema.clone(), columns)?;
     writer.write(&batch).await?;