Combine rayon and a larger batch size for another modest gain.

Nolan Darilek 2025-06-16 16:06:15 -04:00
parent 75144e21ee
commit 46bee6ee60
3 changed files with 49 additions and 9 deletions
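
The change itself is mechanical: every per-column extraction loop in process_parquet switches from a sequential .iter() to rayon's .par_iter(), and the row buffer grows from 10,000 to 50,000 rows per batch. A minimal, self-contained sketch of the pattern — the Row struct here is a hypothetical stand-in for mysql_async::Row, whose real accessor is the generic get shown in the diff below:

    use rayon::prelude::*;

    // Hypothetical stand-in for mysql_async::Row, just to keep the sketch runnable.
    struct Row {
        id: Option<i32>,
    }

    fn main() {
        let rows: Vec<Row> = (0..50_000).map(|i| Row { id: Some(i) }).collect();

        // Before: a single thread walks every row for each column.
        let sequential: Vec<Option<i32>> = rows.iter().map(|row| row.id).collect();

        // After: rayon splits the slice across its worker threads.
        // collect() on an indexed parallel iterator preserves row order.
        let parallel: Vec<Option<i32>> = rows.par_iter().map(|row| row.id).collect();

        assert_eq!(sequential, parallel);
    }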

Cargo.lock (generated, 40 lines changed)

@@ -540,6 +540,25 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "crossbeam-deque"
+version = "0.8.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
+dependencies = [
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.9.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-queue"
 version = "0.3.12"
@@ -1854,6 +1873,26 @@ dependencies = [
  "getrandom 0.3.3",
 ]
 
+[[package]]
+name = "rayon"
+version = "1.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
+dependencies = [
+ "either",
+ "rayon-core",
+]
+
+[[package]]
+name = "rayon-core"
+version = "1.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
+dependencies = [
+ "crossbeam-deque",
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "redox_syscall"
 version = "0.3.5"
@@ -2267,6 +2306,7 @@ dependencies = [
  "mysql_async",
  "mysql_common",
  "parquet",
+ "rayon",
  "testcontainers-modules",
  "time-test",
  "tokio",

Cargo.toml

@@ -10,6 +10,7 @@ futures = "0.3"
 mysql_async = { version = "0.36", features = ["chrono"] }
 mysql_common = "0.35"
 parquet = { version = "55", features = ["async"] }
+rayon = "1"
 testcontainers-modules = { version = "0.12", features = ["mysql"] }
 time-test = "0.3"
 tokio = { version = "1", features = ["macros", "rt-multi-thread"] }


@@ -10,6 +10,7 @@ use arrow::{
 use futures::TryStreamExt;
 use mysql_async::{Conn, Pool, params::Params, prelude::*};
 use parquet::arrow::AsyncArrowWriter;
+use rayon::prelude::*;
 
 #[cfg(test)]
 mod test;
@@ -35,7 +36,7 @@ pub async fn convert_data(pool: &Pool, table: &str) -> Result<(), Error> {
     let schema = discover_schema(&mut conn, table).await?;
     let file = tokio::fs::File::create(format!("{table}.parquet")).await?;
     let mut writer = AsyncArrowWriter::try_new(file, schema.clone(), Default::default())?;
-    const BATCH_SIZE: usize = 10000;
+    const BATCH_SIZE: usize = 50000;
     let mut rows = Vec::with_capacity(BATCH_SIZE); // Pre-allocate for efficiency
     let mut stream = conn
         .exec_stream(format!("select * from {table}"), Params::Empty)
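
Raising BATCH_SIZE trades memory for throughput: fewer, larger record batches amortize per-batch overhead and give rayon more rows to split per column. As a rough estimate only (the per-row figure is an assumption, not a measurement): at ~200 bytes of buffered row data, 50,000 rows hold roughly 10 MB in memory per batch, up from roughly 2 MB at the old 10,000-row setting.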
@@ -91,28 +92,28 @@ async fn process_parquet(
         match field.data_type() {
             DataType::Int32 => {
                 let values: Vec<Option<i32>> = rows
-                    .iter()
+                    .par_iter()
                     .map(|row| row.get::<i32, _>(name.as_str()))
                     .collect();
                 column_data.push(Box::new(Int32Array::from(values)));
             }
             DataType::Int64 => {
                 let values: Vec<Option<i64>> = rows
-                    .iter()
+                    .par_iter()
                     .map(|row| row.get::<i64, _>(name.as_str()))
                     .collect();
                 column_data.push(Box::new(Int64Array::from(values)));
             }
             DataType::Utf8 => {
                 let values: Vec<Option<String>> = rows
-                    .iter()
+                    .par_iter()
                     .map(|row| row.get::<String, _>(name.as_str()))
                     .collect();
                 column_data.push(Box::new(StringArray::from(values)));
             }
             DataType::Timestamp(TimeUnit::Microsecond, None) => {
                 let values: Vec<Option<i64>> = rows
-                    .iter()
+                    .par_iter()
                     .map(|row| {
                         // Convert MySQL datetime to microseconds since epoch
                         row.get::<mysql_common::chrono::NaiveDateTime, _>(name.as_str())
@@ -123,7 +124,7 @@
             }
             DataType::Date32 => {
                 let values: Vec<Option<i32>> = rows
-                    .iter()
+                    .par_iter()
                     .map(|row| {
                         let date = row.get::<mysql_common::chrono::NaiveDate, _>(name.as_str());
                         Some((date.unwrap() - epoch).num_days() as i32)
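
One pre-existing wrinkle in this arm, untouched by the commit: date.unwrap() will panic if the column contains a NULL date, whereas every other arm propagates Option. A null-safe mapping is possible; a minimal sketch using chrono directly (hypothetical, not part of the diff):

    use chrono::NaiveDate;

    fn main() {
        let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();

        // Propagate None instead of unwrapping, mirroring the other match arms.
        let date: Option<NaiveDate> = None; // stand-in for row.get::<NaiveDate, _>(...)
        let days: Option<i32> = date.map(|d| (d - epoch).num_days() as i32);
        assert_eq!(days, None);
    }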
@@ -134,15 +135,13 @@
             _ => {
                 // Fallback to string for unknown types
                 let values: Vec<Option<String>> = rows
-                    .iter()
+                    .par_iter()
                     .map(|row| row.get::<String, _>(name.as_str()))
                     .collect();
                 column_data.push(Box::new(StringArray::from(values)));
             }
         }
     }
-
-    // Convert to Arc<dyn Array> for RecordBatch
     let columns: Vec<Arc<dyn Array>> = column_data.into_iter().map(|arr| arr.into()).collect();
     let batch = RecordBatch::try_new(schema.clone(), columns)?;
     writer.write(&batch).await?;
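
A closing operational note, beyond what this commit changes: rayon's global pool defaults to one worker per logical CPU and runs alongside tokio's multi-threaded runtime, so the two can contend under load. If profiling shows contention, the pool can be capped before its first use; a sketch, where the count of 4 is an arbitrary assumption to tune against tokio's worker count:

    use rayon::prelude::*;
    use rayon::ThreadPoolBuilder;

    fn main() {
        // Must run before any parallel iterator touches the global pool.
        ThreadPoolBuilder::new()
            .num_threads(4) // assumed value, not a recommendation
            .build_global()
            .expect("global rayon pool was already initialized");

        let sum: i64 = (0..1_000_000i64).into_par_iter().sum();
        println!("{sum}");
    }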