If I had to guess, I'd expect that this step -- storing and reading the data in a binary serialization format -- will be the most significant performance improvement of any we make.
(But I don't guess, I measure. Disregard!)
People like "plain text" file formats because they can open up the file and understand it -- it's easy to read. Binary serialization is the same idea, just making it easy for computers to read instead of people.
You may have heard, us programmers, we're all waiting on IO. We can't possibly improve the state of things, because, we're just waiting on our 150 database queries to return before we can do anything anyways. We need greenthreads and async/await -- it's an IO world baby!
Except, not when your IO brings back JSON, then it's CPU-bound:
```console
cat wiki-articles.json | python3 json-lines.py
# parsed 5,032,105 json documents in 118.7sec
# 8869462971 bytes (8,458.6MB, 71.3MB/sec)

time cat wiki-articles.json | wc -l
# 5032105
#
# real    0m4.060s
# user    0m0.646s
# sys     0m5.512s

python3 -c "print('cat | wc -l -> {:,.1f}MB/sec'.format(8869462971 / 4.060 / 1024 / 1024))"
# cat | wc -l -> 2,083.4MB/sec

python3 -c "print(71.3 / 2083.4)"
# 0.034222904867044254
```
Python JSON parsing (full disclosure: using the `import json` parser, not something faster like `ujson`) only achieves 3% of the throughput that `cat | wc -l` does. Both involved the exact same IO!
Now, you might remember our "first pass" Rust representation of a trade (i.e. a row in our CSV data):
```rust
use serde::Serialize;
use markets::crypto::{Exchange, Ticker, Side};

#[derive(Serialize)]
struct Trade {
    pub time: u64,
    pub price: f64,
    pub amount: f64,
    pub exch: Exchange,
    pub ticker: Ticker,
    pub side: Option<Side>,
    pub server_time: Option<u64>,
}

std::mem::size_of::<Trade>() // -> 48
```
Here's a JSON representation (133 bytes "pretty", 119 bytes condensed):
{ "time": 1527811201900505632, "price": 7492.279785, "amount": 0.048495, "exch": "bits", "server_time": 0, "side": null }
What makes JSON slow? For one thing, there are no types, so it takes some work to determine what type each item even is. Next, the numbers have to be parsed from their text representations, which is actually pretty involved. Also, you have no idea what size an object will be when you begin parsing a JSON stream, so you can't pre-allocate memory.
If you want the computer to go fast, make it easy for the computer!
In contrast, binary representations store data similarly or identically to the way it will be represented in memory upon parsing. Entire categories of work are avoided. If types are known, the exact size needed to store the data can be known. Further, the binary representation is far more compact than any plain-text format, so it takes up less space to store on disk or send over the wire.
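As a tiny illustration of the difference (a sketch, not code from the pipeline):

```rust
fn main() {
    let price: f64 = 7492.279785;

    // plain text: variable length, and getting the f64 back requires
    // digit-by-digit parsing
    let text = price.to_string();
    assert_eq!(text.parse::<f64>().unwrap(), price);

    // binary: always 8 bytes, identical to the in-memory representation,
    // so "parsing" is just a copy
    let bytes: [u8; 8] = price.to_le_bytes();
    assert_eq!(f64::from_le_bytes(bytes), price);
}
```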
How Small?
So, let's get something really small and compact. Can we do 32 bytes?
First pertinent fact: the `markets::crypto` types `Exchange`, `Currency`, and `Side` are all `#[repr(u8)]`, and each enum variant has a `u8` representation permanently associated with it, e.g. `Currency::btc` is `1u8` and `Currency::usd` is `100u8` (crypto currencies start at 1, fiat currencies start at 100). `Ticker`, meanwhile, is just two `Currency` fields, or two bytes.
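A sketch of what those declarations might look like (not the real definitions -- the actual `markets::crypto` enums have many more variants, and the `eth`/`eur` codes here are made up):

```rust
#[allow(non_camel_case_types)]
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Currency {
    btc = 1,   // cryptos start at 1
    eth = 2,   // hypothetical code
    usd = 100, // fiats start at 100
    eur = 101, // hypothetical code
}

/// Two `Currency` fields -- two bytes total
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Ticker {
    pub base: Currency,
    pub quote: Currency,
}

// std::mem::size_of::<Ticker>() -> 2
```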
Second pertinent fact: neither of our two optional fields, `side` and `server_time`, needs to store separate "tag" data for whether the value is `Some(_)` or `None`; each can instead use the storage of the data itself.

In the case of `server_time`, missing values were already stored as `0` in the source CSV file. So we know that we can use `0` as a sentinel value for `None` for `server_time`.
`Side` is a two-variant enum, with `Side::Bid` represented as `1u8` and `Side::Ask` represented as `2u8` in the `markets::crypto` encoding. So `0u8` can be `None`.
Rust actually has a special set of "nonzero" types for just this purpose.
```console
$ evcxr
>> std::mem::size_of::<Option<u64>>()
16
>> std::mem::size_of::<Option<std::num::NonZeroU64>>()
8
```
By storing `server_time` as an `Option<NonZeroU64>` and relying on the already thrifty memory layouts of the `markets` enums, we are already down to 36 bytes:
```rust
#[repr(packed)]
pub struct Trade36 {
    pub exch: Exchange,
    pub ticker: Ticker,
    pub side: Option<Side>,
    pub time: u64,
    pub price: f64,
    pub amount: f64,
    pub server_time: Option<NonZeroU64>,
}

assert_eq!(size_of::<Trade36>(), 36);
```
To cut another 4 bytes, we can represent `server_time` as an `i32` offset relative to `time`. Unfortunately, the relatively small range of `i32` cannot represent the deltas in the data at nanosecond precision -- it has to be reduced all the way to millisecond precision, which is quite coarse.
Technically, truncating the server times to milliseconds violates one of our rules, that any storage format must retain the full data represented by the source CSV file. But, since we'll return to storing the full representation down to the nanosecond in the very next subchapter, I'll allow it.
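Here's a sketch of how that conversion presumably works (the real `server_time_to_delta`, which shows up in the serialization code below, lives in the pipelines source; note that `0` stays reserved as the `None` sentinel):

```rust
/// Sketch: absolute server_time (nanoseconds) -> millisecond offset from
/// `time`, small enough to fit in an i32.
fn server_time_to_delta(time: u64, server_time: u64) -> i32 {
    let delta_nanos = server_time as i64 - time as i64;
    let delta_millis = delta_nanos / 1_000_000; // truncate to ms precision
    debug_assert!(delta_millis >= i32::MIN as i64 && delta_millis <= i32::MAX as i64);
    delta_millis as i32
}
```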
If you're wondering whether it's worth a hill of beans whether our binary representation is 32 or 36 bytes, it actually might be. A CPU's "cache line" is generally 64 bytes, which lets us fetch two rows in one cache line at 32 bytes (but not at 36 bytes). In order to benefit from this, we will also need to specify the memory alignment of the data we read in to parse, so that we aren't splitting cache lines due to alignment. More broadly, I have also found that powers of two are good for performance in many, varied contexts. Finally, this is my current, operating theory; I'm not positive any of this matters. To find out, we'll need to measure.
It is possible, using the `bincode` crate, to get Serde's `Serialize` and `Deserialize` to represent a `Trade` at 32 bytes, using the above techniques. But, it's a bit complicated. The `Trade` struct would look like this:
```rust
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct Serde32BytesTrade {
    pub time: u64,
    #[serde(with = "try_from_u8")]
    pub exch: Exchange,
    #[serde(with = "try_from_u8")]
    pub ticker: Ticker,
    pub price: f64,
    pub amount: f64,
    pub side: Option<Side>,
    pub server_time: Option<NonZeroI32>,
}
```
The `#[serde(with = "try_from_u8")]` annotations are the primary tricky part. The `with` argument allows you to specify a `mod` that has `serialize` and `deserialize` functions that will be used for the annotated field (instead of the type's `Serialize` and `Deserialize` implementations).
You can see the implementation of `try_from_u8` here -- it's fairly involved. (Prior to this, that code had been sitting in a private crate of mine, but I believe I largely based it off someone else's code.)
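To give a flavor of what a `with` module involves, here's a sketch specialized to one concrete type, assuming the markets error type implements `Display` (the real `try_from_u8` is generic over anything convertible to and from `u8`, which is where most of the complexity comes from):

```rust
mod exchange_as_u8 {
    use super::Exchange;
    use serde::{Deserialize, Deserializer, Serialize, Serializer};
    use std::convert::TryFrom;

    // serialize the enum as its u8 code
    pub fn serialize<S: Serializer>(exch: &Exchange, ser: S) -> Result<S::Ok, S::Error> {
        u8::from(*exch).serialize(ser)
    }

    // deserialize a u8, converting an invalid code into a serde error
    pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<Exchange, D::Error> {
        let byte = u8::deserialize(de)?;
        Exchange::try_from(byte).map_err(serde::de::Error::custom)
    }
}
```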
Using Serde here is totally justifiable, but I prefer to do this particular kind of serialization manually, and here's why:
- the manual serialization code documents the format. While it's certainly possible to "recover" the format that `bincode` uses by looking at its source code, that format isn't documented, and could change in a future update
- trying to force Serde (or any framework) to do things that are outside the box can be hard, in many cases harder than just doing it manually. `try_from_u8` is an example of that kind of complexity, but to get a 32-byte serialization from Serde I also spent some time chasing other rabbit holes that aren't included here
Getting Our Hands Dirty
Reminder: this is possible with Serde: just use `bincode::serialize_into` and `bincode::deserialize`. The Postcard crate also looks interesting, although I haven't had occasion to use it.
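For reference, the bincode route looks roughly like this (a sketch assuming bincode 1.x's API):

```rust
// round-trip a trade through bincode's Serde-based encoding
fn roundtrip(trade: &Serde32BytesTrade) -> Result<Serde32BytesTrade, bincode::Error> {
    let mut buf: Vec<u8> = Vec::new();
    // serialize into anything that implements io::Write...
    bincode::serialize_into(&mut buf, trade)?;
    // ...and deserialize back from a byte slice
    bincode::deserialize(&buf[..])
}
```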
But, here's the down-and-dirty layout I came up with, ASCII-diagram and all:
```rust
/// Represents the serialized form of a trades row
///
/// ```console,ignore
///                      1                   2                   3
///  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// |e|b|q|s| srvtm |   time: u64   |  price: f64   |  amount: f64  |
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
///  | | | |    |
///  | | | |    |
///  | | | |    |
///  | | | |    -> server_time: Option<i32> - 0=None, other=ms offset from `time`
///  | | | |
///  | | |  -> side: Option<Side> - 0=None, 1=Bid, 2=Ask
///  | | |
///  | |  -> quote: Currency - see markets::crypto for u8 <-> currency codes
///  | |
///  |  -> base: Currency - see markets::crypto for u8 <-> currency codes
///  |
///   -> exch: Exchange - see markets::crypto for u8 <-> exchange codes
///
/// ```
///
#[derive(Debug, Clone)]
pub struct PackedTrade {
    pub exch: u8,
    pub base: u8,
    pub quote: u8,

    /// 0=None
    pub side: u8,

    /// relative offset from `time`; 0=None
    pub server_time: i32,

    pub time: u64,
    pub price: f64,
    pub amount: f64,
}
```
A `PackedTrade` is more for conceptual purposes. It represents the data layout only in terms of primitive types (i.e. no `Option<NonZeroI32>`). In practice, you might use this as a "builder" struct, to put the data in prior to performing the logic necessary to get the actual, nice types you want to end up with.
Another thing to keep in mind is, when you go to act on data stored in this layout, it will start as a `&[u8]` slice, and you may not be using every field. So one technique I've used is to create a helper struct that knows how to pull each field out of a byte slice:
```rust
#[repr(align(32))]
pub struct PackedTradeData<'a>(&'a [u8]);
```
Implementing deserialization with `PackedTradeData` looks like this:
```rust
impl<'a> PackedTradeData<'a> {
    const EXCH_OFFSET        : usize = 0;
    const BASE_OFFSET        : usize = 1;
    const QUOTE_OFFSET       : usize = 2;
    const SIDE_OFFSET        : usize = 3;
    const SERVER_TIME_OFFSET : usize = 4;
    const TIME_OFFSET        : usize = 8;
    const PRICE_OFFSET       : usize = 16;
    const AMOUNT_OFFSET      : usize = 24;

    #[inline]
    pub fn exch(&self) -> Result<Exchange, markets::crypto::Error> {
        Exchange::try_from(self.0[Self::EXCH_OFFSET])
    }

    #[inline]
    pub fn base(&self) -> Result<Currency, markets::crypto::Error> {
        Currency::try_from(self.0[Self::BASE_OFFSET])
    }

    #[inline]
    pub fn quote(&self) -> Result<Currency, markets::crypto::Error> {
        Currency::try_from(self.0[Self::QUOTE_OFFSET])
    }

    // snip..

    #[inline]
    pub fn time(&self) -> u64 {
        u64::from_le_bytes(
            (&self.0[Self::TIME_OFFSET..(Self::TIME_OFFSET + 8)]).try_into().unwrap()
        )
    }

    #[inline]
    pub fn price(&self) -> f64 {
        f64::from_le_bytes(
            (&self.0[Self::PRICE_OFFSET..(Self::PRICE_OFFSET + 8)]).try_into().unwrap()
        )
    }

    // snip..
}
```
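Two of the snipped items are worth sketching here, since they show up in code below: a `new` constructor and the `side` accessor. (These are my guesses at their shape; the real versions are in the full `impl` block linked next.)

```rust
impl<'a> PackedTradeData<'a> {
    // presumably just wraps a 32-byte record slice
    pub fn new(data: &'a [u8]) -> Self {
        debug_assert_eq!(data.len(), SERIALIZED_SIZE);
        PackedTradeData(data)
    }

    // fallible like exch/base/quote, with 0 decoded as None
    #[inline]
    pub fn side(&self) -> Result<Option<Side>, markets::crypto::Error> {
        match self.0[Self::SIDE_OFFSET] {
            0 => Ok(None),
            byte => Side::try_from(byte).map(Some),
        }
    }
}
```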
The full `impl` block is here.
Note that `exch`, `base`, `quote` and `side` are fallible (i.e. return a `Result`), but `time`, `price` and `amount` are infallible. That's because not every `u8` corresponds to an `Exchange` or `Currency` variant, so we still perform validation.
(Arguably, validation on only some of the fields is nonsensical. We are trusting that whatever bytes are in `&self.0[8..16]` represent the `time` field. There should be, and will be, very good reasons to trust that about the slice we create a `PackedTradeData` with. So why validate (at some performance cost) the other fields?)
All numbers are encoded in little endian, which is what I always use.
Finally, we need code to serialize a trade as it comes from the CSV file into our compact binary representation:
```rust
/// A trade as it is represented in the source CSV file
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct CsvTrade {
    pub time: u64,
    pub exch: Exchange,
    pub ticker: Ticker,
    pub price: f64,
    pub amount: f64,

    // deserialize_with functions allow 0 to be treated as None
    #[serde(deserialize_with = "deserialize_csv_side")]
    pub side: Option<Side>,

    #[serde(deserialize_with = "deserialize_csv_server_time")]
    pub server_time: Option<u64>,
}

pub fn serialize<'a, 'b>(buf: &'a mut [u8], trade: &'b CsvTrade) {
    assert_eq!(buf.len(), SERIALIZED_SIZE);

    buf[EXCH_OFFSET] = u8::from(trade.exch);
    buf[BASE_OFFSET] = u8::from(trade.ticker.base);
    buf[QUOTE_OFFSET] = u8::from(trade.ticker.quote);

    match trade.side {
        Some(side) => {
            buf[SIDE_OFFSET] = u8::from(side);
        }
        None => {
            buf[SIDE_OFFSET] = 0;
        }
    }

    match trade.server_time {
        Some(st) => {
            let delta: i32 = server_time_to_delta(trade.time, st);
            (&mut buf[SERVER_TIME_OFFSET..(SERVER_TIME_OFFSET + 4)]).copy_from_slice(&delta.to_le_bytes()[..]);
        }
        None => {
            (&mut buf[SERVER_TIME_OFFSET..(SERVER_TIME_OFFSET + 4)]).copy_from_slice(&0i32.to_le_bytes()[..]);
        }
    }

    (&mut buf[TIME_OFFSET..(TIME_OFFSET + 8)]).copy_from_slice(&trade.time.to_le_bytes()[..]);
    (&mut buf[PRICE_OFFSET..(PRICE_OFFSET + 8)]).copy_from_slice(&trade.price.to_le_bytes()[..]);
    (&mut buf[AMOUNT_OFFSET..(AMOUNT_OFFSET + 8)]).copy_from_slice(&trade.amount.to_le_bytes()[..]);
}
```
If you were planning to store this binary representation of the data as the permanent, authoritative storage of the underlying data on disk, it would be wise to include some kind of header marking the format (which I am dubbing "ptf" for "pipelines trade format"), version, etc. However, since we are just doing exploratory data analysis, we'll skip that part. Instead, I just write each record to disk in order, and the size of the file can be used to derive the number of records in it -- it's just `n_bytes / 32`. Here's the crux of the code to write the binary-encoded records to disk:
```rust
let mut rdr: csv::Reader<std::io::BufReader<std::fs::File>> = // ...
let headers: csv::ByteRecord = // ...
let mut row: csv::ByteRecord = // ...
let mut wtr: std::io::BufWriter<std::fs::File> = // ...
let mut buf: [u8; 32] = [0; 32];
let mut n: usize = 0; // row counter, used in the error message below

while rdr.read_byte_record(&mut row)
    .map_err(|e| {
        format!("reading row {} failed: {}", (n + 1).thousands_sep(), e)
    })?
{
    let trade: encoding::CsvTrade = row.deserialize(Some(&headers)).map_err(|e| e.to_string())?;

    encoding::serialize(&mut buf[..], &trade);

    let bytes_written = wtr.write(&buf[..]).map_err(|e| e.to_string())?;
    assert_eq!(bytes_written, encoding::SERIALIZED_SIZE);
    n += 1;
}
```
Not too bad!
The conversion program runs in about 6.5 minutes. The resulting binary file is 27.1G (less than half the size of the 62G CSV file). Compression via `zstd -3` reduces the size to 12.7G (47%).
I'm eager to see what kind of performance we get, so I write a quick routine just to scan all the rows, counting how many are from the gdax and bmex exchanges:
```rust
// inside fn that returns Result<_, String> ...

let input_file = fs::OpenOptions::new()
    .read(true)
    .open(input_path)
    .map_err(|e| e.to_string())?;

let file_length = input_file.metadata().unwrap().len();

if file_length % encoding::SERIALIZED_SIZE as u64 != 0 {
    return Err(format!("file length is not a multiple of record size: {}", file_length))
}

let n_records: usize = file_length as usize / encoding::SERIALIZED_SIZE;

info!(logger, "parsing file"; "n_records" => %n_records.thousands_sep());

// memory map the file, mostly for convenience, since it can be treated
// like a big byte slice
let data: memmap::Mmap = unsafe {
    memmap::Mmap::map(&input_file)
        .map_err(|e| {
            format!("creating Mmap failed: {}", e)
        })?
};

let mut n_gdax = 0;
let mut n_bmex = 0;

for i in 0..n_records {
    let j = i * encoding::SERIALIZED_SIZE;
    let k = j + encoding::SERIALIZED_SIZE;
    let packed = encoding::PackedTradeData::new(&data[j..k]);
    n_gdax += (packed.exch().unwrap() == e!(gdax)) as usize;
    n_bmex += (packed.exch().unwrap() == e!(bmex)) as usize;
}
```
The results are pretty incredible. Disk IO and memory bandwidth suddenly matter, versus us being CPU-limited when parsing CSV rows, so there is a big gap between running with a cold disk cache (achieved by invoking `echo 3 > /proc/sys/vm/drop_caches` immediately prior to running) versus a second run with the disk reads cached by the operating system.

With a cold cache, we're cruising at 8.5x the throughput of CSV parsing, while keeping the single SSD I'm reading from pegged at 500 MB/sec:

| Query | No Query: parsing (and light row counting) only |
|---|---|
| Hardware | coolidge: 2x Xeon E5-2670, 128G ddr3 |
| Disk Cache | cold |
| Elapsed time | 56.3sec |
| Throughput (rows/sec) | 16.1 million |
| Peak memory usage | 28MB |

But once the disk cache is warm, we're really cooking with gas:

| Query | No Query: parsing (and light row counting) only |
|---|---|
| Hardware | coolidge: 2x Xeon E5-2670, 128G ddr3 |
| Disk Cache | warm |
| Elapsed time | 7.72sec |
| Throughput (rows/sec) | 117.6 million |
| Peak memory usage | 28MB |

That is 62x the throughput of CSV (specifically, a Rust program parsing CSV)!
Considering our design is single-threaded, row-based, and uses no compression, SIMD, or other tricks -- it's pretty fast!
On a z1d.metal instance, we top 100 million rows/second on a cold disk cache:

| Query | No Query: parsing (and light row counting) only |
|---|---|
| Hardware | z1d.metal |
| Disk Cache | cold |
| Elapsed time | 8.35sec |
| Throughput (rows/sec) | 109.1 million |
| Peak memory usage | 28MB |

The instance has dual NVMe drives, set up as an mdadm RAID 0 with an XFS file system. During the program's execution, the array is pulling down 3.5G/second from the file, according to `iostat`.
Here's z1d.metal with a warm cache:

| Query | No Query: parsing (and light row counting) only |
|---|---|
| Hardware | z1d.metal |
| Disk Cache | warm |
| Elapsed time | 5.25sec |
| Throughput (rows/sec) | 173.0 million |
| Peak memory usage | 28MB |

Woah!
Watch out, it's hot!
Query Execution
Ok, ok. Let's do the queries.
No huge changes between our first program and this one in terms of query execution. We're still performing a single pass over every row, with incremental, online computation of the aggregations. But, this time, the CPU work to perform the query execution is actually noticeable since we can scream through 173 million rows per second.
One thing worth mentioning is the code to filter rows by exchange and ticker. The first way I write it looks like this (in the context of the easy query):
```rust
let trade = encoding::PackedTradeData::new(record);

match (trade.exch(), trade.base(), trade.quote()) {
    (Ok(e!(bmex)), Ok(c!(btc)), Ok(c!(usd))) => {
        bmex_total += trade.price() * trade.amount();
        bmex_amount += trade.amount();
        n_bmex += 1;
    }

    (Ok(e!(gdax)), Ok(c!(btc)), Ok(c!(usd))) => {
        gdax_total += trade.price() * trade.amount();
        gdax_amount += trade.amount();
        n_gdax += 1;
    }

    _ => {}
}
```
You'll notice that instead of referring to `trade.time`, `trade.price` and so on (i.e. to the struct members of `trade`), we are calling accessor methods like `trade.time()`, `trade.price()`. That's because the `trade` object is just a wrapper around a byte slice, and we are only parsing the fields we need.
I'm fairly confident the compiler will merge repeated calls to the same accessor method into one, so the work is only performed once, but it does seem like something that could end up being executed repeatedly for unforeseen reasons.
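If that worry ever materialized, the fix is mechanical: hoist each accessor call into a local so each field is parsed exactly once (which the faster version below ends up doing anyway). Sketched against the bmex arm above:

```rust
// each field parsed exactly once, regardless of what the optimizer
// does with repeated accessor calls
let amount = trade.amount();
let total = trade.price() * amount;

bmex_total += total;
bmex_amount += amount;
```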
The primary performance concern I have is the `match` block on a tuple of `Result` instances. First, I've experienced poor performance from tuples in Rust before (nothing terrible, but the kind of thing that matters in the hottest part of a loop you're hitting 900 million times). Secondly, it's possible the `Result` type could introduce some indirection compared to acting on the data as primitive types.
Here's our initial performance on the easy query, cold cache:

| Query | Easy |
|---|---|
| Hardware | z1d.metal |
| Disk Cache | cold |
| Elapsed time | 17.1sec |
| Throughput (rows/sec) | 53.3 million |
| Peak memory usage | 28MB |

Easy query, warm cache:

| Query | Easy |
|---|---|
| Hardware | z1d.metal |
| Disk Cache | warm |
| Elapsed time | 14.2sec |
| Throughput (rows/sec) | 64.1 million |
| Peak memory usage | 28MB |

Hard query, cold cache:

| Query | Hard |
|---|---|
| Hardware | z1d.metal |
| Disk Cache | cold |
| Elapsed time | 21.1sec |
| Throughput (rows/sec) | 43.1 million |
| Peak memory usage | 28MB |

Hard query, warm cache:

| Query | Hard |
|---|---|
| Hardware | z1d.metal |
| Disk Cache | warm |
| Elapsed time | 18.6sec |
| Throughput (rows/sec) | 49.0 million |
| Peak memory usage | 28MB |

I have an idea that might speed up the row filtering: treat the first three bytes (which represent exchange, base and quote) as an `i32` integer (masking the 4th byte somehow), and then compare those first three bytes of each record, as an `i32`, against the two "`i32`" values we are looking for.
It's getting a bit esoteric, but here's how it works. First, a way to get the exchange/base/quote/side bytes as an "`i32`", as a method on `PackedTradeData`:
```rust
impl<'a> PackedTradeData<'a> {
    #[inline]
    pub fn meta_i32(&self) -> i32 {
        i32::from_le_bytes((&self.0[..4]).try_into().unwrap())
    }
}
```
Second, set up the exact values we are looking for, as well as a mask to always turn the final, fourth byte (representing `Option<Side>`) to 0:
```rust
/// bmex = 6
/// btc = 1
/// usd = 100
///
/// bmex:btc/usd mask -> [6, 1, 100, 0] as i32
const BMEX_BTC_USD: i32 = i32::from_le_bytes([ e!(bmex) as u8, c!(btc) as u8, c!(usd) as u8, 0 ]);

/// gdax = 3
/// btc = 1
/// usd = 100
///
/// gdax:btc/usd mask -> [3, 1, 100, 0] as i32
const GDAX_BTC_USD: i32 = i32::from_le_bytes([ e!(gdax) as u8, c!(btc) as u8, c!(usd) as u8, 0 ]);

/// this is used to leave the first three bytes intact, and wipe whatever
/// is in the fourth byte to 0
const MASK: i32 = i32::from_le_bytes([ 255, 255, 255, 0 ]);

// ... therefore
//
// we can check the exch/base/quote in one operation
// by checking whether the "meta bytes" are exactly equal
// to BMEX_BTC_USD or GDAX_BTC_USD after &-ing with MASK
```
Then, when processing each row, convert the equality comparisons between the masked `meta_i32` integer and our target filter integers into an `f64` "mask" that can be multiplied against the potential accumulation, either confirming or negating it:
```rust
let meta_sans_side: i32 = trade.meta_i32() & MASK;

let is_bmex_btc_usd: f64 = (meta_sans_side == BMEX_BTC_USD) as u8 as f64;
let is_gdax_btc_usd: f64 = (meta_sans_side == GDAX_BTC_USD) as u8 as f64;

let amount = trade.amount();
let total = trade.price() * amount;

bmex_total  += is_bmex_btc_usd * total;
bmex_amount += is_bmex_btc_usd * amount;
n_bmex      += is_bmex_btc_usd as usize * 1;

gdax_total  += is_gdax_btc_usd * total;
gdax_amount += is_gdax_btc_usd * amount;
n_gdax      += is_gdax_btc_usd as usize * 1;
```
Even though it might seem like more work to do the computation every iteration -- even when either or both of `is_bmex_btc_usd` and `is_gdax_btc_usd` could be `0.0`, leaving the accumulator values untouched -- CPUs are generally better at doing the same work over and over again than they are at doing less work that involves branching.
And boy do the results bear this out, increasing throughput 2.3x on the easy query:

| Query | Easy |
|---|---|
| Hardware | z1d.metal |
| Disk Cache | warm |
| Elapsed time | 6.23sec |
| Throughput (rows/sec) | 145.8 million |
| Peak memory usage | 28MB |

With an i7-10850H (Thinkpad laptop):

| Query | Easy |
|---|---|
| Hardware | i7-10850H |
| Disk Cache | warm |
| Elapsed time | 3.75sec |
| Throughput (rows/sec) | 242.4 million |
| Peak memory usage | 27MB |


| Query | Hard |
|---|---|
| Hardware | i7-10850H |
| Disk Cache | warm |
| Elapsed time | 14.37sec |
| Throughput (rows/sec) | 63.2 million |
| Peak memory usage | 28MB |

Even better results using a bare-metal OVH server with a Xeon E-2386G (3.50GHz base, 5.10GHz max turbo), topping out at 290.7 million rows/sec:

| Query | Easy |
|---|---|
| Hardware | Xeon E2386G |
| Disk Cache | cold |
| Elapsed time | 10.51sec |
| Throughput (rows/sec) | 86.4 million |
| Peak memory usage | 28MB |


| Query | Easy |
|---|---|
| Hardware | Xeon E2386G |
| Disk Cache | warm |
| Elapsed time | 3.12sec |
| Throughput (rows/sec) | 290.7 million |
| Peak memory usage | 28MB |


| Query | Hard |
|---|---|
| Hardware | Xeon E2386G |
| Disk Cache | warm |
| Elapsed time | 11.88sec |
| Throughput (rows/sec) | 76.5 million |
| Peak memory usage | 28MB |
