Naive single-threaded Parquet writing.

This commit is contained in:
Nolan Darilek 2025-06-16 15:26:56 -04:00
parent e7bb6ea08f
commit 8f10dff3a0
6 changed files with 834 additions and 45 deletions

1
.gitignore vendored
View file

@ -1,2 +1,3 @@
/target /target
.direnv/ .direnv/
*.parquet

654
Cargo.lock generated
View file

@ -17,6 +17,20 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
name = "ahash"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
dependencies = [
"cfg-if",
"const-random",
"getrandom 0.3.3",
"once_cell",
"version_check",
"zerocopy",
]
[[package]] [[package]]
name = "aho-corasick" name = "aho-corasick"
version = "1.1.3" version = "1.1.3"
@ -26,6 +40,21 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3"
[[package]]
name = "alloc-stdlib"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece"
dependencies = [
"alloc-no-stdlib",
]
[[package]] [[package]]
name = "allocator-api2" name = "allocator-api2"
version = "0.2.21" version = "0.2.21"
@ -47,6 +76,214 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "arrow"
version = "55.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1bb018b6960c87fd9d025009820406f74e83281185a8bdcb44880d2aa5c9a87"
dependencies = [
"arrow-arith",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-csv",
"arrow-data",
"arrow-ipc",
"arrow-json",
"arrow-ord",
"arrow-row",
"arrow-schema",
"arrow-select",
"arrow-string",
]
[[package]]
name = "arrow-arith"
version = "55.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44de76b51473aa888ecd6ad93ceb262fb8d40d1f1154a4df2f069b3590aa7575"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"chrono",
"num",
]
[[package]]
name = "arrow-array"
version = "55.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29ed77e22744475a9a53d00026cf8e166fe73cf42d89c4c4ae63607ee1cfcc3f"
dependencies = [
"ahash",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"chrono",
"half",
"hashbrown 0.15.4",
"num",
]
[[package]]
name = "arrow-buffer"
version = "55.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0391c96eb58bf7389171d1e103112d3fc3e5625ca6b372d606f2688f1ea4cce"
dependencies = [
"bytes",
"half",
"num",
]
[[package]]
name = "arrow-cast"
version = "55.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f39e1d774ece9292697fcbe06b5584401b26bd34be1bec25c33edae65c2420ff"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-select",
"atoi",
"base64 0.22.1",
"chrono",
"half",
"lexical-core",
"num",
"ryu",
]
[[package]]
name = "arrow-csv"
version = "55.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9055c972a07bf12c2a827debfd34f88d3b93da1941d36e1d9fee85eebe38a12a"
dependencies = [
"arrow-array",
"arrow-cast",
"arrow-schema",
"chrono",
"csv",
"csv-core",
"lazy_static",
"regex",
]
[[package]]
name = "arrow-data"
version = "55.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf75ac27a08c7f48b88e5c923f267e980f27070147ab74615ad85b5c5f90473d"
dependencies = [
"arrow-buffer",
"arrow-schema",
"half",
"num",
]
[[package]]
name = "arrow-ipc"
version = "55.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a222f0d93772bd058d1268f4c28ea421a603d66f7979479048c429292fac7b2e"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"flatbuffers",
]
[[package]]
name = "arrow-json"
version = "55.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9085342bbca0f75e8cb70513c0807cc7351f1fbf5cb98192a67d5e3044acb033"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data",
"arrow-schema",
"chrono",
"half",
"indexmap 2.9.0",
"lexical-core",
"memchr",
"num",
"serde",
"serde_json",
"simdutf8",
]
[[package]]
name = "arrow-ord"
version = "55.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab2f1065a5cad7b9efa9e22ce5747ce826aa3855766755d4904535123ef431e7"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-select",
]
[[package]]
name = "arrow-row"
version = "55.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3703a0e3e92d23c3f756df73d2dc9476873f873a76ae63ef9d3de17fda83b2d8"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"half",
]
[[package]]
name = "arrow-schema"
version = "55.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73a47aa0c771b5381de2b7f16998d351a6f4eb839f1e13d48353e17e873d969b"
[[package]]
name = "arrow-select"
version = "55.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24b7b85575702b23b85272b01bc1c25a01c9b9852305e5d0078c79ba25d995d4"
dependencies = [
"ahash",
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"num",
]
[[package]]
name = "arrow-string"
version = "55.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9260fddf1cdf2799ace2b4c2fc0356a9789fa7551e0953e35435536fecefebbd"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-select",
"memchr",
"num",
"regex",
"regex-syntax",
]
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.88" version = "0.1.88"
@ -58,6 +295,15 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "atoi"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528"
dependencies = [
"num-traits",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.4.0" version = "1.4.0"
@ -162,6 +408,27 @@ dependencies = [
"serde_with", "serde_with",
] ]
[[package]]
name = "brotli"
version = "8.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9991eea70ea4f293524138648e41ee89b0b2b12ddef3b255effa43c8056e0e0d"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
"brotli-decompressor",
]
[[package]]
name = "brotli-decompressor"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
]
[[package]] [[package]]
name = "btoi" name = "btoi"
version = "0.4.3" version = "0.4.3"
@ -195,6 +462,8 @@ version = "1.2.26"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "956a5e21988b87f372569b66183b78babf23ebc2e744b733e4350a752c4dafac" checksum = "956a5e21988b87f372569b66183b78babf23ebc2e744b733e4350a752c4dafac"
dependencies = [ dependencies = [
"jobserver",
"libc",
"shlex", "shlex",
] ]
@ -217,6 +486,26 @@ dependencies = [
"windows-link", "windows-link",
] ]
[[package]]
name = "const-random"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359"
dependencies = [
"const-random-macro",
]
[[package]]
name = "const-random-macro"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e"
dependencies = [
"getrandom 0.2.16",
"once_cell",
"tiny-keccak",
]
[[package]] [[package]]
name = "core-foundation" name = "core-foundation"
version = "0.10.1" version = "0.10.1"
@ -266,6 +555,12 @@ version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]]
name = "crunchy"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43da5946c66ffcc7745f48db692ffbb10a83bfe0afd96235c5c2a4fb23994929"
[[package]] [[package]]
name = "crypto-common" name = "crypto-common"
version = "0.1.6" version = "0.1.6"
@ -276,6 +571,27 @@ dependencies = [
"typenum", "typenum",
] ]
[[package]]
name = "csv"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf"
dependencies = [
"csv-core",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "csv-core"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d02f3b0da4c6504f86e9cd789d8dbafab48c2321be74e9987593de5a894d93d"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "darling" name = "darling"
version = "0.20.11" version = "0.20.11"
@ -418,6 +734,16 @@ dependencies = [
"windows-sys 0.59.0", "windows-sys 0.59.0",
] ]
[[package]]
name = "flatbuffers"
version = "25.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1045398c1bfd89168b5fd3f1fc11f6e70b34f6f66300c87d44d3de849463abf1"
dependencies = [
"bitflags 2.9.1",
"rustc_version",
]
[[package]] [[package]]
name = "flate2" name = "flate2"
version = "1.1.2" version = "1.1.2"
@ -425,6 +751,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d" checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d"
dependencies = [ dependencies = [
"crc32fast", "crc32fast",
"libz-rs-sys",
"libz-sys", "libz-sys",
"miniz_oxide", "miniz_oxide",
] ]
@ -578,6 +905,17 @@ version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "half"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "459196ed295495a68f7d7fe1d84f6c4b7ff0e21fe3017b2f283c6fac3ad803c9"
dependencies = [
"cfg-if",
"crunchy",
"num-traits",
]
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.12.3" version = "0.12.3"
@ -908,12 +1246,28 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "integer-encoding"
version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
[[package]] [[package]]
name = "itoa" name = "itoa"
version = "1.0.15" version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
[[package]]
name = "jobserver"
version = "0.1.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a"
dependencies = [
"getrandom 0.3.3",
"libc",
]
[[package]] [[package]]
name = "js-sys" name = "js-sys"
version = "0.3.77" version = "0.3.77"
@ -933,12 +1287,88 @@ dependencies = [
"indexmap 2.9.0", "indexmap 2.9.0",
] ]
[[package]]
name = "lazy_static"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "lexical-core"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b765c31809609075565a70b4b71402281283aeda7ecaf4818ac14a7b2ade8958"
dependencies = [
"lexical-parse-float",
"lexical-parse-integer",
"lexical-util",
"lexical-write-float",
"lexical-write-integer",
]
[[package]]
name = "lexical-parse-float"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de6f9cb01fb0b08060209a057c048fcbab8717b4c1ecd2eac66ebfe39a65b0f2"
dependencies = [
"lexical-parse-integer",
"lexical-util",
"static_assertions",
]
[[package]]
name = "lexical-parse-integer"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72207aae22fc0a121ba7b6d479e42cbfea549af1479c3f3a4f12c70dd66df12e"
dependencies = [
"lexical-util",
"static_assertions",
]
[[package]]
name = "lexical-util"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a82e24bf537fd24c177ffbbdc6ebcc8d54732c35b50a3f28cc3f4e4c949a0b3"
dependencies = [
"static_assertions",
]
[[package]]
name = "lexical-write-float"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5afc668a27f460fb45a81a757b6bf2f43c2d7e30cb5a2dcd3abf294c78d62bd"
dependencies = [
"lexical-util",
"lexical-write-integer",
"static_assertions",
]
[[package]]
name = "lexical-write-integer"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "629ddff1a914a836fb245616a7888b62903aae58fa771e1d83943035efa0f978"
dependencies = [
"lexical-util",
"static_assertions",
]
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.172" version = "0.2.172"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
[[package]]
name = "libm"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de"
[[package]] [[package]]
name = "libredox" name = "libredox"
version = "0.1.3" version = "0.1.3"
@ -950,6 +1380,15 @@ dependencies = [
"redox_syscall 0.5.12", "redox_syscall 0.5.12",
] ]
[[package]]
name = "libz-rs-sys"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "172a788537a2221661b480fee8dc5f96c580eb34fa88764d3205dc356c7e4221"
dependencies = [
"zlib-rs",
]
[[package]] [[package]]
name = "libz-sys" name = "libz-sys"
version = "1.1.22" version = "1.1.22"
@ -988,6 +1427,15 @@ dependencies = [
"hashbrown 0.15.4", "hashbrown 0.15.4",
] ]
[[package]]
name = "lz4_flex"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
dependencies = [
"twox-hash 1.6.3",
]
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.7.5" version = "2.7.5"
@ -1056,7 +1504,7 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-util", "tokio-util",
"twox-hash", "twox-hash 2.1.1",
"url", "url",
] ]
@ -1071,6 +1519,7 @@ dependencies = [
"btoi", "btoi",
"byteorder", "byteorder",
"bytes", "bytes",
"chrono",
"crc32fast", "crc32fast",
"flate2", "flate2",
"getrandom 0.3.3", "getrandom 0.3.3",
@ -1087,6 +1536,20 @@ dependencies = [
"uuid", "uuid",
] ]
[[package]]
name = "num"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23"
dependencies = [
"num-bigint",
"num-complex",
"num-integer",
"num-iter",
"num-rational",
"num-traits",
]
[[package]] [[package]]
name = "num-bigint" name = "num-bigint"
version = "0.4.6" version = "0.4.6"
@ -1097,6 +1560,15 @@ dependencies = [
"num-traits", "num-traits",
] ]
[[package]]
name = "num-complex"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495"
dependencies = [
"num-traits",
]
[[package]] [[package]]
name = "num-conv" name = "num-conv"
version = "0.1.0" version = "0.1.0"
@ -1112,6 +1584,28 @@ dependencies = [
"num-traits", "num-traits",
] ]
[[package]]
name = "num-iter"
version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-rational"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824"
dependencies = [
"num-bigint",
"num-integer",
"num-traits",
]
[[package]] [[package]]
name = "num-traits" name = "num-traits"
version = "0.2.19" version = "0.2.19"
@ -1119,6 +1613,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"libm",
] ]
[[package]] [[package]]
@ -1142,6 +1637,50 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
name = "ordered-float"
version = "2.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c"
dependencies = [
"num-traits",
]
[[package]]
name = "parquet"
version = "55.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be7b2d778f6b841d37083ebdf32e33a524acde1266b5884a8ca29bf00dfa1231"
dependencies = [
"ahash",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data",
"arrow-ipc",
"arrow-schema",
"arrow-select",
"base64 0.22.1",
"brotli",
"bytes",
"chrono",
"flate2",
"futures",
"half",
"hashbrown 0.15.4",
"lz4_flex",
"num",
"num-bigint",
"paste",
"seq-macro",
"simdutf8",
"snap",
"thrift",
"tokio",
"twox-hash 2.1.1",
"zstd",
]
[[package]] [[package]]
name = "parse-display" name = "parse-display"
version = "0.9.1" version = "0.9.1"
@ -1167,6 +1706,12 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "paste"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
[[package]] [[package]]
name = "pem" name = "pem"
version = "3.0.5" version = "3.0.5"
@ -1376,6 +1921,15 @@ version = "0.1.25"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f" checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f"
[[package]]
name = "rustc_version"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92"
dependencies = [
"semver",
]
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "1.0.7" version = "1.0.7"
@ -1494,6 +2048,18 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "semver"
version = "1.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0"
[[package]]
name = "seq-macro"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.219" version = "1.0.219"
@ -1607,6 +2173,12 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "simdutf8"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e"
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.9" version = "0.4.9"
@ -1622,6 +2194,12 @@ version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
[[package]]
name = "snap"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.5.10" version = "0.5.10"
@ -1638,6 +2216,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]] [[package]]
name = "strsim" name = "strsim"
version = "0.11.1" version = "0.11.1"
@ -1677,8 +2261,12 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
name = "supermetal_assignment" name = "supermetal_assignment"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"arrow",
"derive_more", "derive_more",
"futures",
"mysql_async", "mysql_async",
"mysql_common",
"parquet",
"testcontainers-modules", "testcontainers-modules",
"time-test", "time-test",
"tokio", "tokio",
@ -1773,6 +2361,17 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "thrift"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09"
dependencies = [
"byteorder",
"integer-encoding",
"ordered-float",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.3.41" version = "0.3.41"
@ -1813,6 +2412,15 @@ dependencies = [
"time", "time",
] ]
[[package]]
name = "tiny-keccak"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237"
dependencies = [
"crunchy",
]
[[package]] [[package]]
name = "tinystr" name = "tinystr"
version = "0.8.1" version = "0.8.1"
@ -1947,6 +2555,16 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "twox-hash"
version = "1.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [
"cfg-if",
"static_assertions",
]
[[package]] [[package]]
name = "twox-hash" name = "twox-hash"
version = "2.1.1" version = "2.1.1"
@ -2402,3 +3020,37 @@ dependencies = [
"quote", "quote",
"syn", "syn",
] ]
[[package]]
name = "zlib-rs"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "626bd9fa9734751fc50d6060752170984d7053f5a39061f524cda68023d4db8a"
[[package]]
name = "zstd"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "7.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d"
dependencies = [
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.15+zstd.1.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237"
dependencies = [
"cc",
"pkg-config",
]

View file

@ -4,8 +4,12 @@ version = "0.1.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]
arrow = "55"
derive_more = { version = "2", features = ["deref", "deref_mut"] } derive_more = { version = "2", features = ["deref", "deref_mut"] }
mysql_async = "0.36" futures = "0.3"
mysql_async = { version = "0.36", features = ["chrono"] }
mysql_common = "0.35"
parquet = { version = "55", features = ["async"] }
testcontainers-modules = { version = "0.12", features = ["mysql"] } testcontainers-modules = { version = "0.12", features = ["mysql"] }
time-test = "0.3" time-test = "0.3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

View file

@ -27,6 +27,7 @@
rustfmt rustfmt
rustPackages.clippy rustPackages.clippy
llvmPackages.bintools llvmPackages.bintools
parquet-tools
]; ];
shellHook = '' shellHook = ''
pre-commit install pre-commit install

View file

@ -1,8 +1,22 @@
use std::error::Error; use std::sync::Arc;
use mysql_async::{Pool, prelude::*}; use arrow::{
array::{
Array, Date32Array, Int32Array, Int64Array, RecordBatch, StringArray,
TimestampMicrosecondArray,
},
datatypes::{DataType, Field, Schema, TimeUnit},
};
use futures::TryStreamExt;
use mysql_async::{Conn, Pool, params::Params, prelude::*};
use parquet::arrow::AsyncArrowWriter;
pub async fn load_data(pool: &Pool) -> Result<(), Box<dyn Error>> { #[cfg(test)]
mod test;
type Error = Box<dyn std::error::Error + Send + Sync>;
pub async fn load_data(pool: &Pool) -> Result<(), Error> {
// Obviously use something better/more robust here if you're a) loading // Obviously use something better/more robust here if you're a) loading
// other data sources and b) aren't so lucky as to have a single `;` after // other data sources and b) aren't so lucky as to have a single `;` after
// each block of SQL. :) // each block of SQL. :)
@ -10,54 +24,128 @@ pub async fn load_data(pool: &Pool) -> Result<(), Box<dyn Error>> {
for stmt in data.split(";") { for stmt in data.split(";") {
if !stmt.trim().is_empty() { if !stmt.trim().is_empty() {
let mut conn = pool.get_conn().await?; let mut conn = pool.get_conn().await?;
conn.exec_drop(stmt, params::Params::Empty).await?; conn.exec_drop(stmt, Params::Empty).await?;
} }
} }
Ok(()) Ok(())
} }
#[cfg(test)] pub async fn convert_data(pool: &Pool, table: &str) -> Result<(), Error> {
mod test { let mut conn = pool.get_conn().await?;
use std::error::Error; let schema = discover_schema(&mut conn, table).await?;
let file = tokio::fs::File::create(format!("{table}.parquet")).await?;
use derive_more::{Deref, DerefMut}; let mut writer = AsyncArrowWriter::try_new(file, schema.clone(), Default::default())?;
const BATCH_SIZE: usize = 10000;
use mysql_async::Pool; let mut rows = Vec::with_capacity(BATCH_SIZE); // Pre-allocate for efficiency
use testcontainers_modules::testcontainers::ContainerAsync; let mut stream = conn
.exec_stream(format!("select * from {table}"), Params::Empty)
// This is a bit YOLO but is the quickest/cleanest way I can think of to a) get
// a connection per database and b) ensure `Drop` cleans it up.
#[derive(Deref, DerefMut)]
struct TestPool(
#[deref]
#[deref_mut]
Pool,
// Never read, only needed for `Drop`.
#[allow(dead_code)] ContainerAsync<testcontainers_modules::mysql::Mysql>,
);
impl TestPool {
async fn try_new() -> Result<TestPool, Box<dyn Error>> {
use testcontainers_modules::{mysql, testcontainers::runners::AsyncRunner};
let mysql = mysql::Mysql::default().start().await?;
let url = format!(
"mysql://root@{}:{}/test",
mysql.get_host().await?,
mysql.get_host_port_ipv4(3306).await?
);
let pool = Pool::from_url(&url)?;
Ok(TestPool(pool, mysql))
}
}
#[tokio::test]
async fn test_mysql() -> Result<(), Box<dyn std::error::Error>> {
let pool = TestPool::try_new().await?;
time_test::time_test!();
crate::load_data(&pool).await?;
<mysql_async::Pool as Clone>::clone(&pool)
.disconnect()
.await?; .await?;
while let Some(row) = stream.try_next().await? {
rows.push(row);
if rows.len() >= BATCH_SIZE {
process_parquet(&mut rows, schema.clone(), &mut writer).await?;
}
}
if !rows.is_empty() {
process_parquet(&mut rows, schema.clone(), &mut writer).await?;
}
writer.close().await?;
Ok(()) Ok(())
} }
async fn discover_schema(conn: &mut Conn, table: &str) -> Result<Arc<Schema>, Error> {
let query = format!("DESCRIBE {}", table);
let rows: Vec<mysql_async::Row> = conn.exec(query, ()).await?;
let mut fields = Vec::new();
for row in rows {
let name: String = row.get("Field").unwrap();
let mysql_type: String = row.get("Type").unwrap();
let is_nullable: String = row.get("Null").unwrap();
let arrow_type = match mysql_type.as_str() {
t if t.starts_with("int") => DataType::Int32,
t if t.starts_with("bigint") => DataType::Int64,
t if t.starts_with("varchar") => DataType::Utf8,
t if t.starts_with("datetime") => DataType::Timestamp(TimeUnit::Microsecond, None),
t if t.starts_with("date") => DataType::Date32,
_ => DataType::Utf8,
};
let nullable = is_nullable == "YES";
fields.push(Field::new(name, arrow_type, nullable));
}
Ok(Arc::new(Schema::new(fields)))
}
async fn process_parquet(
rows: &mut Vec<mysql_async::Row>,
schema: Arc<Schema>,
writer: &mut AsyncArrowWriter<tokio::fs::File>,
) -> Result<(), Error> {
if rows.is_empty() {
return Ok(());
}
let epoch = mysql_common::chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
let mut column_data: Vec<Box<dyn Array>> = Vec::new();
for field in schema.fields() {
let name = field.name();
println!("Importing {name}");
match field.data_type() {
DataType::Int32 => {
let values: Vec<Option<i32>> = rows
.iter()
.map(|row| row.get::<i32, _>(name.as_str()))
.collect();
column_data.push(Box::new(Int32Array::from(values)));
}
DataType::Int64 => {
let values: Vec<Option<i64>> = rows
.iter()
.map(|row| row.get::<i64, _>(name.as_str()))
.collect();
column_data.push(Box::new(Int64Array::from(values)));
}
DataType::Utf8 => {
let values: Vec<Option<String>> = rows
.iter()
.map(|row| row.get::<String, _>(name.as_str()))
.collect();
column_data.push(Box::new(StringArray::from(values)));
}
DataType::Timestamp(TimeUnit::Microsecond, None) => {
let values: Vec<Option<i64>> = rows
.iter()
.map(|row| {
// Convert MySQL datetime to microseconds since epoch
row.get::<mysql_common::chrono::NaiveDateTime, _>(name.as_str())
.map(|dt| dt.and_utc().timestamp_micros())
})
.collect();
column_data.push(Box::new(TimestampMicrosecondArray::from(values)));
}
DataType::Date32 => {
let values: Vec<Option<i32>> = rows
.iter()
.map(|row| {
let date = row.get::<mysql_common::chrono::NaiveDate, _>(name.as_str());
Some((date.unwrap() - epoch).num_days() as i32)
})
.collect();
column_data.push(Box::new(Date32Array::from(values)));
}
_ => {
// Fallback to string for unknown types
let values: Vec<Option<String>> = rows
.iter()
.map(|row| row.get::<String, _>(name.as_str()))
.collect();
column_data.push(Box::new(StringArray::from(values)));
}
}
}
// Convert to Arc<dyn Array> for RecordBatch
let columns: Vec<Arc<dyn Array>> = column_data.into_iter().map(|arr| arr.into()).collect();
let batch = RecordBatch::try_new(schema.clone(), columns)?;
writer.write(&batch).await?;
rows.clear();
Ok(())
} }

43
src/test.rs Normal file
View file

@ -0,0 +1,43 @@
use derive_more::{Deref, DerefMut};
use mysql_async::Pool;
use testcontainers_modules::testcontainers::ContainerAsync;
use crate::Error;
// This is a bit YOLO but is the quickest/cleanest way I can think of to a) get
// a connection per database and b) ensure `Drop` cleans it up.
#[derive(Deref, DerefMut)]
struct TestPool(
#[deref]
#[deref_mut]
Pool,
// Never read, only needed for `Drop`.
#[allow(dead_code)] ContainerAsync<testcontainers_modules::mysql::Mysql>,
);
impl TestPool {
async fn try_new() -> Result<TestPool, Error> {
use testcontainers_modules::{mysql, testcontainers::runners::AsyncRunner};
let mysql = mysql::Mysql::default().start().await?;
let url = format!(
"mysql://root@{}:{}/test",
mysql.get_host().await?,
mysql.get_host_port_ipv4(3306).await?
);
let pool = Pool::from_url(&url)?;
Ok(TestPool(pool, mysql))
}
}
#[tokio::test]
async fn test_mysql() -> Result<(), Error> {
let pool = TestPool::try_new().await?;
crate::load_data(&pool).await?;
time_test::time_test!();
crate::convert_data(&pool, "employees").await?;
crate::convert_data(&pool, "departments").await?;
<mysql_async::Pool as Clone>::clone(&pool)
.disconnect()
.await?;
Ok(())
}