Binary Serialization: a First Step Towards Data-Oriented Design

If I had to guess, I'd expect that this step -- storing and reading in the data in a binary serialization format -- will be the most significant performance improvement of any.

(But I don't guess, I measure. Disregard!)

People like "plain text" file formats because they can open up the file and understand it -- it's easy to read. Binary serialization is the same idea, just making it easy for computers to read instead of people.

You may have heard, us programmers, we're all waiting on IO. We can't possibly improve the state of things, because, we're just waiting on our 150 database queries to return before we can do anything anyways. We need greenthreads and async/await -- it's an IO world baby!

Except, not when your IO brings back JSON, then it's CPU-bound:

cat wiki-articles.json | python3 json-lines.py 
# parsed 5,032,105 json documents in 118.7sec
# 8869462971 bytes (8,458.6MB, 71.3MB/sec)

time cat wiki-articles.json | wc -l
# 5032105
# 
# real	0m4.060s
# user	0m0.646s
# sys	0m5.512s

python3 -c "print('cat | wc -l -> {:,.1f}MB/sec'.format(8869462971 / 4.060 / 1024 / 1024))"
# cat | wc -l -> 2,083.4MB/sec

python3 -c "print(71.3 / 2083.4)"
# 0.034222904867044254

Python JSON parsing (full disclosure: using the import json parser, not something faster like ujson) only acheives 3% of the throughput that cat | wc -l does. Both involved the exact same IO!

Now, you might remember our "first pass" Rust representation of a trade (i.e. a row in our CSV data):

use serde::Serialize;
use markets::crypto::{Exchange, Ticker, Side};

#[derive(Serialize)]
struct Trade {
    pub time: u64,
    pub price: f64,
    pub amount: f64,
    pub exch: Exchange,
    pub ticker: Ticker,
    pub side: Option<Side>,
    pub server_time: Option<u64>,
}

std::mem::size_of::<Trade>() // -> 48

Here's a JSON representation (133 bytes "pretty", 119 bytes condensed):

{
  "time": 1527811201900505632,
  "price": 7492.279785,
  "amount": 0.048495,
  "exch": "bits",
  "server_time": 0,
  "side": null
}

What makes JSON slow? For one thing, there's no types, so it takes some work to determine what type each item is. Next, the numbers have to be parsed from their text representations, which is actually pretty involved. Also, you have no idea what size an object will be when you begin parsing a JSON stream, so you can't pre-allocate memory.

If you want the computer to go fast, make it easy for the computer!

In contrast, binary representations store data similarly or identically to the way it will be represented in memory upon parsing. Entire categories of work are avoided. If types are known, the exact size needed to store the data can be known. Further, the binary representation is far more compact than any plain-text format, so it takes up less space to store on disk or send over the wire.

How Small?

So, lets get something really small and compact. Can we do 32 bytes?

First pertinent fact: markets::crypto types Exchange, Currency, Side are all #[repr(u8)] and each enum variant has a u8 representation permanently associated with it, e.g. Currency::btc is 1u8 and Currency::usd is 100u8 (crypto currencies start at 1, fiat currencies start at 100). Ticker, meanwhile, is just two Currency fields, or two bytes.

Second pertinent fact: neither of our two optional fields, side and server_time, need to store separate "tag" data of whether the value is Some(_) or None, but can instead use the storage of the data itself.

In the case of server_time, missing values were already stored as 0 in the source CSV file. So we know that we can use 0 as a sentinel value for None for server_time.

Side is a two-variant enum, with Side::Bid represented as 1u8 and Side::Ask represented as 2u8 in the markets::crypto encoding. So 0u8 can be None.

Rust actually has a special set of "nonzero" types for just this purpose.

$ evcxr
>> std::mem::size_of::<Option<u64>>()
16
>> std::mem::size_of::<Option<std::num::NonZeroU64>>()
8

Storing server_time as an Option<NonZeroU64> and relying on the already thrifty memory layouts of the markets enums, and we are already down to 36 bytes:

#[repr(packed)]
pub struct Trade36 {
    pub exch: Exchange,
    pub ticker: Ticker,
    pub side: Option<Side>,

    pub time: u64,
    pub price: f64,
    pub amount: f64,

    pub server_time: Option<NonZeroU64>,
}

assert_eq!(size_of::<Trade36>(), 36);

To cut another 4 bytes, we can represent server_time as an i32 offset relative to time. Unfortunately, the relatively small range of i32 cannot represent the deltas in the data at nanoseconds precision -- it has to be reduced all the way to millisecond precision, which is quite coarse.

Technically, truncating the server times to milliseconds violates one of our rules, that any storage format must retain the full data represented by the source CSV file. But, since we'll return to storing the full representation down to the nanosecond in the very next subchapter, I'll allow it.

If you're wondering whether it's worth a hill of beans whether our binary representation is 32 or 36 bytes, it actually might be. A CPU's "cache line" is generally 64 bytes, which lets us fetch two rows in one cache line at 32 bytes (but not at 36 bytes). In order to benefit from this, we will also need to specify the memory alignment of the data we read in to parse, so that we aren't splitting cache lines due to alignment. More broadly, I have also found that powers of two are good for performance in many, varied contexts. Finally, this is my current, operating theory; I'm not positive any of this matters. To find out, we'll need to measure.

It is possible, using the bincode crate, to get Serde's Serialize and Deserialize to represent a Trade at 32 bytes, using the above techniques. But, it's a bit complicated. The Trade struct would look like this:

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct Serde32BytesTrade {
    pub time: u64,
    #[serde(with = "try_from_u8")]
    pub exch: Exchange,
    #[serde(with = "try_from_u8")]
    pub ticker: Ticker,
    pub price: f64,
    pub amount: f64,
    pub side: Option<Side>,
    pub server_time: Option<NonZeroI32>,
}

The #[serde(with = "try_from_u8")] annotations are the primary tricky part. The with argument allows you to specify a mod that has serialize and deserialize functions that will be used to the annotated field (instead of the type's Serialize and Deserialize implementations).

You can see the implementation of try_from_u8 here -- it's fairly involved. (Prior to this, that code had been sitting in a private crate of mine, but I believe I largely based it off someone else's code.)

Using Serde here is totally justifiable, but I prefer to do this particular kind of serializtion manually, and here's why:

  • the manual serialization code documents the format. While it's certainly possible to "recover" the format that bincode uses by looking at its source code, that format isn't documented, and could change in a future update
  • trying to force Serde (or any framework) to do things that are outside the box can be hard, in many cases harder than just doing it manually. try_from_u8 is an example of that kind of complexity, but to get a 32 bytes serialization from Serde I also spent some time chasing other rabbit holes that aren't included here

Getting Our Hands Dirty

Reminder: this is possible with Serde: just use bincode::serialize_into and bincode::deserialize. The Postcard crate also looks interesting, although I haven't had occassion to use it.

But, here's the down-and-dirty layout I came up with, ASCII-diagram and all:

/// Represents the serialized form of a trades row
///
/// ```console,ignore
///                      1                   2                   3
///  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// |e|b|q|s| srvtm | time: u64     | price: f64    | amount: f64   |
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
///  | | | | |
///  | | | | |
///  | | | | |
///  | | | | -> server_time: Option<i32> - 0=None, other=nano offset from `time`
///  | | | |
///  | | | -> side: Option<Side> - 0=None, 1=Bid, 2=Ask
///  | | |  
///  | | -> quote: Currency - see markets::crypto for u8 <-> currency codes
///  | |    
///  | -> base: Currency - see markets::crypto for u8 <-> currency codes
///  |     
///  -> exch: Exchange - see markets::crypto for u8 <-> exchange codes
///
///  ```
///
#[derive(Debug, Clone)]
pub struct PackedTrade {
    pub exch: u8,
    pub base: u8,
    pub quote: u8,

    /// 0=None
    pub side: u8,

    /// relative offset from `time`; 0=None
    pub server_time: i32,

    pub time: u64,
    pub price: f64,
    pub amount: f64,
}

A PackedTrade is more for conceptual purposes. It represents data layout only in terms of primitive types (i.e. no Option<NonZeroI32>. In practice, you might use this as a "builder" struct, to put the data in prior to performing the logic necessary to get the actual, nice types you want to end up with.

Another thing to keep in mind is, when you go to act on data stored in this layout, it will start as a &[u8] slice, and you may not be using every field. So one technique I've used is to create a helper struct that knows how to pull out each field out of a byte slice:

#[repr(align(32))]
pub struct PackedTradeData<'a>(&'a [u8]);

Implementing deserialization with PackedTradeData looks like this:

impl<'a> PackedTradeData<'a> {

    const EXCH_OFFSET           : usize = 0;
    const BASE_OFFSET           : usize = 1;
    const QUOTE_OFFSET          : usize = 2;
    const SIDE_OFFSET           : usize = 3;
    const SERVER_TIME_OFFSET    : usize = 4;
    const TIME_OFFSET           : usize = 8;
    const PRICE_OFFSET          : usize = 16;
    const AMOUNT_OFFSET         : usize = 24;

    #[inline]
    pub fn exch(&self) -> Result<Exchange, markets::crypto::Error> {
        Exchange::try_from(self.0[Self::EXCH_OFFSET])
    }

    #[inline]
    pub fn base(&self) -> Result<Currency, markets::crypto::Error> {
        Currency::try_from(self.0[Self::BASE_OFFSET])
    }

    #[inline]
    pub fn quote(&self) -> Result<Currency, markets::crypto::Error> {
        Currency::try_from(self.0[Self::QUOTE_OFFSET])
    }

    // snip..

    #[inline]
    pub fn time(&self) -> u64 {
        u64::from_le_bytes(
            (&self.0[Self::TIME_OFFSET..(Self::TIME_OFFSET + 8)]).try_into().unwrap()
        )
    }

    #[inline]
    pub fn price(&self) -> f64 {
        f64::from_le_bytes(
            (&self.0[Self::PRICE_OFFSET..(Self::PRICE_OFFSET + 8)]).try_into().unwrap()
        )
    }

    // snip..
}

The full impl block is here.

Note that exch, base, quote and side are fallible (i.e. return a Result, but time, price and amount are infallible. That's because not every u8 corresponds to a Exchange or Currency variant, so we still perform validation.

(Arguably, validation on only some of the fields is nonsensical. We are trusting that whatever bytes are in &self.0[8..16] represent the time field. There should be, and will be, very good reasons to trust that about the slice we create a PackedTradeData with. So why validate (at some performance cost) the other fields?)

All numbers are all encoded in Little Endian, which is what I always use.

Finally, we need code to serialize a trade as it comes from the CSV file into our compact binary representation:

/// A trade as it is represented in the source CSV file
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct CsvTrade {
    pub time: u64,
    pub exch: Exchange,
    pub ticker: Ticker,
    pub price: f64,
    pub amount: f64,

    // deserialize_with functions allow 0 to be treated as None

    #[serde(deserialize_with = "deserialize_csv_side")]
    pub side: Option<Side>,
    #[serde(deserialize_with = "deserialize_csv_server_time")]
    pub server_time: Option<u64>,
}

pub fn serialize<'a, 'b>(buf: &'a mut [u8], trade: &'b CsvTrade) {
    assert_eq!(buf.len(), SERIALIZED_SIZE);

    buf[EXCH_OFFSET] = u8::from(trade.exch);
    buf[BASE_OFFSET] = u8::from(trade.ticker.base);
    buf[QUOTE_OFFSET] = u8::from(trade.ticker.quote);

    match trade.side {
        Some(side) => {
            buf[SIDE_OFFSET] = u8::from(side);
        }

        None => {
            buf[SIDE_OFFSET] = 0;
        }
    }

    match trade.server_time {
        Some(st) => {
            let delta: i32 = server_time_to_delta(trade.time, st);
            (&mut buf[SERVER_TIME_OFFSET..(SERVER_TIME_OFFSET + 4)]).copy_from_slice(&delta.to_le_bytes()[..]);
        }

        None => {
            (&mut buf[SERVER_TIME_OFFSET..(SERVER_TIME_OFFSET + 4)]).copy_from_slice(&0i32.to_le_bytes()[..]);
        }
    }
    
    (&mut buf[TIME_OFFSET..(TIME_OFFSET + 8)]).copy_from_slice(&trade.time.to_le_bytes()[..]);
    (&mut buf[PRICE_OFFSET..(PRICE_OFFSET + 8)]).copy_from_slice(&trade.price.to_le_bytes()[..]);
    (&mut buf[AMOUNT_OFFSET..(AMOUNT_OFFSET + 8)]).copy_from_slice(&trade.amount.to_le_bytes()[..]);
}

If you were planning to store this binary representation of the data as the permanent, authoritative storage of the underlying data on disk, it would be a wise idea to include some kind of header marking the format (which I am dubbing "ptf" for "pipelines trade format"), version, etc. However, since we are just doing exploratory data analysis, we'll skip that part. Instead, I just write each record to disk in order, and the size of the file can be used to derive the number of records in it -- it's just n_bytes / 32. Here's the crux of the code to write the binary encoded records to disk:

let mut rdr: csv::Reader<std::io::BufWriter<std::fs::File>> = // ...
let headers: csv::ByteRecord = // ...
let mut row: csv::ByteRecord = // ...
let mut wtr: std::io::BufWriter<std::fs::File> = // ...
let mut buf: [u8; 32] = [0; 32];

while rdr.read_byte_record(&mut row)
    .map_err(|e| {
        format!("reading row {} failed: {}", (n+1).thousands_sep(), e)
    })?
{
    let trade: encoding::CsvTrade = row.deserialize(Some(&headers)).map_err(|e| e.to_string())?;

    encoding::serialize(&mut buf[..], &trade);

    let bytes_written = wtr.write(&buf[..]).map_err(|e| e.to_string())?;
    assert_eq!(bytes_written, encoding::SERIALIZED_SIZE);
}

Not too bad!

The conversion program runs in about 6.5 minutes. The resulting binary file is 27.1G (less than half the size of the 62G CSV file). Compression via zstd -3 reduces the size to 12.7G (47%).

I'm eager to see what kind of performance we get, so I write a quick routine just to scan all the rows, counting how many are from the gdax and bmex exchanges:

// inside fn that returns Result<_, String> ...

let input_file =
    fs::OpenOptions::new()
        .read(true)
        .open(input_path)
        .map_err(|e| e.to_string())?;

let file_length = input_file.metadata().unwrap().len();

if file_length % encoding::SERIALIZED_SIZE as u64 != 0 { 
    return Err(format!("file length is not a multiple of record size: {}", file_length))
}

let n_records: usize = file_length as usize / encoding::SERIALIZED_SIZE;

info!(logger, "parsing file"; "n_records" => %n_records.thousands_sep());

// memory map the file, mostly for convenience, since it can be treated
// like a big byte slice

let data: memmap::Mmap = unsafe {
    memmap::Mmap::map(&input_file)
        .map_err(|e| {
            format!("creating Mmap failed: {}", e)
        })?
};

let mut n_gdax = 0;
let mut n_bmex = 0;

for i in 0..n_records {
    let j = i * encoding::SERIALIZED_SIZE;
    let k = j + encoding::SERIALIZED_SIZE;
    let packed = encoding::PackedTradeData::new(&data[j..k]);
    n_gdax += (packed.exch().unwrap() == e!(gdax)) as usize;
    n_bmex += (packed.exch().unwrap() == e!(bmex)) as usize;
    n += 1;
}

The results are pretty incredible. Disk IO and memory bandwidth suddenly matter, versus us being CPU-limited when parsing CSV rows, so there is a big gap between running with a cold disk cache (acheived by invoking echo 3 > /proc/sys/vm/drop_caches immediately prior to running) versus a second run with the disk reads cached by the operating system.

With a cold cache, we're cruising at 8.5x the throughput of CSV parsing, while keeping the single SSD I'm reading from pegged at 500 MB/sec:

Query No Query: parsing (and light row counting) only
Hardware coolidge: 2x Xeon E5-2670, 128G ddr3
Disk Cache cold
Elapsed time 56.3sec
Throughput (rows/sec) 16.1 million
Peak memory usage 28MB

But once the disk cache is warm, we're really cooking with gas:

Query No Query: parsing (and light row counting) only
Hardware coolidge: 2x Xeon E5-2670, 128G ddr3
Disk Cache warm
Elapsed time 7.72sec
Throughput (rows/sec) 117.6 million
Peak memory usage 28MB

That is 62x the throughput of CSV (specifically, a Rust program parsing CSV)!

Considering our design is single-threaded, row-based, and uses no compression, SIMD, or other tricks -- it's pretty fast!

On a z1d.metal instance, we top 100 million rows/second on a cold disk cache:

Query No Query: parsing (and light row counting) only
Hardware z1d.metal
Disk Cache cold
Elapsed time 8.35sec
Throughput (rows/sec) 109.1 million
Peak memory usage 28MB

The instance has dual NVMe drives, setup as a mdadm raid 0 with an xfs file system. During the program's execution, the array is pulling down 3.5G/second from the file, accroding to iostat.

Here's z1d.metal with a warm cache:

Query No Query: parsing (and light row counting) only
Hardware z1d.metal
Disk Cache warm
Elapsed time 5.25sec
Throughput (rows/sec) 173.0 million
Peak memory usage 28MB

Woah!

ovechkin hot stick

Watch out, it's hot!

Query Execution

Ok, ok. Lets do the queries.

No huge changes between our first program and this one in terms of query execution. We're still performing a single pass over every row, with incremental, online computation of the aggregations. But, this time, the CPU work to perform the query execution is actually noticeable since we can scream through 173 million rows per second.

One thing worth mentioning is the code to filter rows by exchange and ticker. The first way I write it looks like this (in the context of the easy query):

let trade: encoding::PackedTradeData::new(record);

match (trade.exch(), trade.base(), trade.quote()) {
    (Ok(e!(bmex)), Ok(c!(btc)), Ok(c!(usd))) => {
        bmex_total += trade.price() * trade.amount();
        bmex_amount += trade.amount();
        n_bmex += 1;
    }

    (Ok(e!(gdax)), Ok(c!(btc)), Ok(c!(usd))) => {
        gdax_total += trade.price() * trade.amount();
        gdax_amount += trade.amount();
        n_gdax += 1;

    }
    
    _ => {}
}

You'll notice that instead of refering to trade.time, trade.price and so on (i.e. to the struct members of trade), we are calling accessor methods like trade.time(), trade.price(). That's because the trade object is just a wrapper around a byte slice, and we are only parsing the fields we need.

I'm fairly confident the compiler will merge the repeated calls to a single accessor method into one, so any work is only done once, but it does seem like something that could possibly end up being called repeatedly for unforseen reasons.

The primary performance concern I have is the match block on a tuple of Result instances. First, I've experienced poor performance from tuples in Rust before (nothing terrible, but the kind of thing that matters in the hottest part of a loop you're hitting 900 million times). Secondly, it's possible the Result type could introduce some indirection compared to acting on the data as primitive types.

Here's our initial performance on the easy query, cold cache:

Query Easy
Hardware z1d.metal
Disk Cache cold
Elapsed time 17.1sec
Throughput (rows/sec) 53.3 million
Peak memory usage 28MB

Easy query, warm cache:

Query Easy
Hardware z1d.metal
Disk Cache warm
Elapsed time 14.2sec
Throughput (rows/sec) 64.1 million
Peak memory usage 28MB

Hard query, cold cache:

Query Easy
Hardware z1d.metal
Disk Cache cold
Elapsed time 21.1sec
Throughput (rows/sec) 43.1 million
Peak memory usage 28MB

Hard query, warm cache:

Query Easy
Hardware z1d.metal
Disk Cache warm
Elapsed time 18.6sec
Throughput (rows/sec) 49.0 million
Peak memory usage 28MB

I have an idea that might speed up the row filtering: to treat the first three bytes (which represent exchange, base and quote) as an i32 integer (masking the 4th byte somehow), and then comparing the first three bytes of each record as an i32 against the two "i32" values we are looking for.

It's getting a bit esoteric, but here's how it works. First, a way to get the exchange/base/quote/side bytes as an "i32" as a method on PackedTradeData:

impl<'a> PackedTradeData<'a> {
    #[inline]
    pub fn meta_i32(&self) -> i32 {
        i32::from_le_bytes((&self.0[..4]).try_into().unwrap())
    }
}

Secondly, setup the exact values we are looking for, as well as a mask to always turn the final, fourth byte (representing Option<Side>) to 0:

const MASK          : i32 = i32::from_le_bytes([ 255,            255,           255,           0 ]);
const BMEX_BTC_USD  : i32 = i32::from_le_bytes([ e!(bmex) as u8, c!(btc) as u8, c!(usd) as u8, 0 ]);
const GDAX_BTC_USD  : i32 = i32::from_le_bytes([ e!(gdax) as u8, c!(btc) as u8, c!(usd) as u8, 0 ]);

Then, when processing each row, convert the equality comparisons between the masked meta_i32 integer and our target filter integers into an f64 "mask" that can be multiplied against the potential accumulation, either confirming or negating it:

let meta_sans_side: i32 = trade.meta_i32() & MASK; 

let is_bmex_btc_usd: f64 = (meta_sans_side == BMEX_BTC_USD) as u8 as f64;

let is_gdax_btc_usd: f64 = (meta_sans_side == GDAX_BTC_USD) as u8 as f64;

let amount = trade.amount();
let total = trade.price() * amount;

bmex_total += is_bmex_btc_usd * total;
bmex_amount += is_bmex_btc_usd * amount;
n_bmex += is_bmex_btc_usd as usize * 1;

gdax_total += is_gdax_btc_usd * total;
gdax_amount += is_gdax_btc_usd * amount;
n_gdax += is_gdax_btc_usd as usize * 1;

Even though it might seem like more work to do the computation every iteration, even when either or both of the values of is_bmex_btc_usd and is_gdax_btc_usd could be 0.0, leaving the accumulator values untouched, CPUs are generally better at doing the same work over and over again then they are doing less work that involves branching.

And boy do the results bear this out, increasing throughput 2.3x on the easy query:

Query Easy
Hardware z1d.metal
Disk Cache warm
Elapsed time 6.23sec
Throughput (rows/sec) 145.8 million
Peak memory usage 28MB