To deserialize the data, I'm using a crate I wrote a while ago with types representing currencies, exchanges, and other trading-related primitives.
The crate, `markets`, is hosted at crates.jstrong.dev. To use it as a dependency in a `Cargo.toml` file, you'd need to add an entry for the crates.jstrong.dev registry in your `~/.cargo/config` file. Instructions are here.
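The specifics are in those instructions; as a general sketch, wiring up an alternate Cargo registry involves an entry in each file along these lines. The registry name, index URL, and version below are placeholders, not the actual values.

```toml
# ~/.cargo/config -- declare the alternate registry (name and index URL are placeholders)
[registries]
jstrong-dev = { index = "https://crates.jstrong.dev/git/index" }

# Cargo.toml -- pull the crate from that registry (version is illustrative)
[dependencies]
markets = { version = "0.1", registry = "jstrong-dev" }
```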
Our `Trade` struct, representing one row of the CSV file, looks like this:
```rust
use serde::Deserialize;
use markets::crypto::{Exchange, Currency, Ticker, Side};

#[derive(Deserialize)]
struct Trade {
    /// Time of trade in unix nanoseconds
    pub time: u64,

    /// Exchange where trade executed
    pub exch: Exchange,

    /// Currency rate of trade (base/quote)
    pub ticker: Ticker,

    /// Price of trade, in quote denomination
    pub price: f64,

    /// Size/Volume of trade, in base denomination
    pub amount: f64,
}
```
Fields in the CSV that aren't part of the struct will just be skipped during deserialization.
To parse the CSV file, I'm using the `csv` crate, which provides an interface to deserialize a row as a `Deserialize`-implementing type. The core parsing loop looks like this:
```rust
use std::{io::{self, prelude::*}, fs};
use pretty_toa::ThousandsSep; // comma-separates big numbers, e.g. 1,000,000

// opening the file
let file = fs::File::open(&opt.trades_csv)
    .map_err(|e| {
        format!("opening trades csv file failed: {} (tried to open {})", e, opt.trades_csv.display())
    })?;
let buf_rdr = io::BufReader::new(file);
let mut rdr = csv::Reader::from_reader(buf_rdr);

// parsing headers
let headers: csv::ByteRecord = rdr.byte_headers()
    .map_err(|e| {
        format!("failed to parse CSV headers: {}", e)
    })?.clone();

// storage for one parsed row
let mut row = csv::ByteRecord::new();

// count the number of rows we've parsed
let mut n: usize = 0;

while rdr.read_byte_record(&mut row)
    .map_err(|e| {
        format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
    })?
{
    let trade: Trade = row.deserialize(Some(&headers))
        .map_err(|e| {
            format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row)
        })?;
    n += 1;

    // perform operations on a parsed Trade...
}
```
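The `opt` value referenced above holds the parsed command-line arguments. Its definition isn't shown in this section, but judging from the `-h` output further down, it's presumably a structopt-derived struct along these lines (a sketch; field and flag names inferred from the help text):

```rust
use std::path::PathBuf;
use structopt::StructOpt;

#[derive(StructOpt)]
struct Opt {
    /// Path to CSV file with trades data
    #[structopt(short = "f", long = "trades-csv", parse(from_os_str))]
    trades_csv: PathBuf,

    /// Where to save the query results (CSV output)
    #[structopt(short = "o", long = "output-path", parse(from_os_str))]
    output_path: PathBuf,
}
```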
We know any textual values in our CSV are ASCII, hence the use of `ByteRecord` rather than the utf-8-aware interfaces that `csv` also offers. Working with bytes instead of utf-8 should avoid encoding/validation overhead.
Before adding any code to execute our query, I run the program in parsing-only mode:
```
RUSTFLAGS='-C target-cpu=native' cargo build --bin csv --release

./target/release/csv -h
# data-pipelines 0.1.0
#
# USAGE:
#     csv --output-path <output-path> --trades-csv <trades-csv>
#
# FLAGS:
#     -h, --help       Prints help information
#     -V, --version    Prints version information
#
# OPTIONS:
#     -o, --output-path <output-path>    Where to save the query results (CSV output)
#     -f, --trades-csv <trades-csv>      Path to CSV file with trades data

./target/release/csv -f /xfs/trades.csv -o /dev/null   # no -o functionality yet anyway
# Mar 25 07:40:26.428 INFO initializing..., output-path: /dev/null, trades-csv: /xfs/trades.csv, version: 0.1.0
# Mar 25 07:40:27.217 INFO parsing csv file, elapsed: 795.603376ms, n rows: 1,048,576, version: 0.1.0
# Mar 25 07:40:27.969 INFO parsing csv file, elapsed: 1.547529254s, n rows: 2,097,152, version: 0.1.0
# Mar 25 07:40:28.587 INFO parsing csv file, elapsed: 2.165865958s, n rows: 3,145,728, version: 0.1.0
# Mar 25 07:40:29.216 INFO parsing csv file, elapsed: 2.794056101s, n rows: 4,194,304, version: 0.1.0
# ...
# Mar 25 07:48:53.718 INFO parsing csv file, elapsed: 507.296187058s, n rows: 905,969,664, version: 0.1.0
# Mar 25 07:48:54.392 INFO parsing csv file, elapsed: 507.970754347s, n rows: 907,018,240, version: 0.1.0
# Mar 25 07:48:55.070 INFO parsing csv file, elapsed: 508.648253189s, n rows: 908,066,816, version: 0.1.0
# Mar 25 07:48:55.158 INFO finished in 508.736808611s, rows/sec: 17,852,145.2, n rows: 908,204,336, version: 0.1.0
```
Parsing-only results (best of 2 runs):
| Query | No Query: CSV parsing only |
|---|---|
| Hardware | coolidge: 2x Xeon E5-2670, 128G ddr3 |
| Elapsed time | 8min, 4sec |
| Throughput (rows/sec) | 1.88 million |
| Peak memory usage | 9MB |
These results are from my old workstation at home, with dual Xeon E5-2670 CPUs (they have `avx`, but not `avx2`).
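Since the build uses `-C target-cpu=native`, it picks up whatever SIMD extensions the host CPU exposes. As a quick sanity check (not part of the original program), the standard library's runtime feature-detection macro will report what the CPU supports:

```rust
// Prints which SIMD extensions the CPU reports at runtime; on a Xeon E5-2670
// this should show avx: true, avx2: false.
fn main() {
    #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
    {
        println!("avx:  {}", is_x86_feature_detected!("avx"));
        println!("avx2: {}", is_x86_feature_detected!("avx2"));
    }
}
```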
Out of curiosity, I switched out `ByteRecord` for `StringRecord` to see what the overhead of utf-8 would be for parsing a large CSV file like this. It actually turned out to be slightly faster!
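Concretely, the switch amounts to swapping the record type and the header/read calls in the parsing loop; roughly like this (a sketch, not necessarily the exact code used):

```rust
// Same loop shape as before, but using csv's utf-8-validating StringRecord API.
let headers: csv::StringRecord = rdr.headers()
    .map_err(|e| format!("failed to parse CSV headers: {}", e))?
    .clone();

let mut row = csv::StringRecord::new();
let mut n: usize = 0;

while rdr.read_record(&mut row)
    .map_err(|e| format!("reading row {} failed: {}", (n + 1).thousands_sep(), e))?
{
    let trade: Trade = row.deserialize(Some(&headers))
        .map_err(|e| format!("deserializing row failed: {}\n\nFailing row:\n{:?}", e, row))?;
    n += 1;

    // perform operations on a parsed Trade...
}
```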
| Query | No Query: CSV parsing only |
|---|---|
| Hardware | coolidge: 2x Xeon E5-2670, 128G ddr3 |
| Elapsed time | 8min, 1sec |
| Throughput (rows/sec) | 1.89 million |
| Peak memory usage | 9MB |
The lesson here (besides: always measure) seems to be not that utf-8 validation is free, but that somewhere in the Serde `Deserialize` pipeline the `ByteRecord` text gets converted to utf-8 anyway.
Evidence backing up this theory: combining `ByteRecord` with a custom, manual deserialization function can provide a significant speedup, if you're inclined to write a function like this:
```rust
fn manual_deserialize_bytes(row: &csv::ByteRecord) -> Result<Trade, &'static str> {
    let time: u64 = atoi::atoi(row.get(0).ok_or("no time")?)
        .ok_or("parsing time failed")?;

    let amount: f64 = lexical::parse(row.get(1).ok_or("no amount")?)
        .map_err(|_| "parsing amount failed")?;

    let exch = match row.get(2).ok_or("no exch")? {
        b"bmex" => e!(bmex),
        b"bnce" => e!(bnce),
        b"btfx" => e!(btfx),
        b"gdax" => e!(gdax),
        b"okex" => e!(okex),
        b"bits" => e!(bits),
        b"plnx" => e!(plnx),
        b"krkn" => e!(krkn),
        _ => return Err("illegal exch"),
    };

    let price: f64 = lexical::parse(row.get(3).ok_or("no price")?)
        .map_err(|_| "parsing price failed")?;

    let ticker = match row.get(6).ok_or("no ticker")? {
        b"btc_usd"  => t!(btc-usd),
        b"eth_usd"  => t!(eth-usd),
        b"ltc_usd"  => t!(ltc-usd),
        b"etc_usd"  => t!(etc-usd),
        b"bch_usd"  => t!(bch-usd),
        b"xmr_usd"  => t!(xmr-usd),
        b"usdt_usd" => t!(usdt-usd),
        _ => return Err("illegal ticker"),
    };

    Ok(Trade { time, amount, exch, price, ticker })
}
```
One shortcut taken in `manual_deserialize_bytes` is that it assumes the order of the columns, which is brittle (a sketch of one way around that follows the results below). But it's pretty fast!
| Query | No Query: CSV parsing only |
|---|---|
| Hardware | coolidge: 2x Xeon E5-2670, 128G ddr3 |
| Elapsed time | 4min, 5sec |
| Throughput (rows/sec) | 3.71 million |
| Peak memory usage | 9MB |
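One way to shed that brittleness without giving up byte-level parsing would be to resolve the needed column indices from the header row once, before the loop, and index with those instead; a minimal sketch (hypothetical helper, not part of the reference implementation):

```rust
// Look up a column's position in the header row instead of hard-coding
// indices like 0, 1, 2, 3, 6 in manual_deserialize_bytes.
fn column_index(headers: &csv::ByteRecord, name: &str) -> Result<usize, String> {
    headers
        .iter()
        .position(|field| field == name.as_bytes())
        .ok_or_else(|| format!("column '{}' not found in headers", name))
}

// e.g., before the read loop:
// let time_col   = column_index(&headers, "time")?;
// let amount_col = column_index(&headers, "amount")?;
// ...and then use row.get(time_col), row.get(amount_col), etc.
```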
A custom deserialization function that uses utf-8/`StringRecord` is also faster than the Serde/csv path, but yields only about 25% of the improvement observed for the otherwise identical function that uses `ByteRecord`:
| Query | No Query: CSV parsing only |
|---|---|
| Hardware | coolidge: 2x Xeon E5-2670, 128G ddr3 |
| Elapsed time | 6min, 20sec |
| Throughput (rows/sec) | 2.38 million |
| Peak memory usage | 9MB |
While this has been an interesting experiment in CSV parsing performance, ultimately it's not what we're after (our first enhancement will be to switch to binary serialization), so I'm going to leave Serde deserialization (with `StringRecord`) in place for the final version of the reference implementation.