Query Execution: the Hard Part

Quick recap: the hard query is computing the ratio between the size-weighted average btc/usd price between bmex and gdax every 10 seconds for 5-minute, 15-minute and 60-minute lookback horizons from each row.

In order to calculate a rolling window computation like that efficiently, we'll use the std::collections::VecDeque data structure and perform the calculation incrementally in a single pass.

As we reach each row in the data set, we will 1) add price * amount and amount to our weighted sum accumulators, 2) subtract the price * amount and amount from the accumulators for trade rows that are now "expired," or outside the rolling window, relative to the current row.

The VecDeque is designed to be efficient for insertion/removal at both ends, which we'll rely on heavily.

To facilitate testing and keep the csv binary from getting too huge, I'm putting our online weighted sum code in a new data_pipelines::windows module.

Here are our main data structures:

use std::collections::VecDeque;

/// Calculates online weighted average for a rolling, time-based window
#[derive(Clone)]
pub struct WeightedMeanWindow {
    /// The size of the window. On `purge`, any `WeightedPoint` items are considered
    /// expired if the supplied `time` parameter is greater than `size` from the
    /// `time` attribute of that `WeightedPoint` item.
    size: u64,
    /// The weights and values with times that are "currently" in the aggregation
    /// window. On `push`, items are added to the "back" of the vedeque. On `purge`,
    /// items with a `time` that is > `size` difference relative to the `time` passed
    /// to `purge` are considered expired and removed. In both cases, adding and removing,
    /// the incremental accumulated sums in `w_sum` and `sum_w` are updated.
    items: VecDeque<WeightedPoint>,
    /// The sum of the value * weight for each of the `WeightedPoint`s in `items`.
    w_sum: f64,
    /// The sum of the weights of each of the `WeightedPoint`s in `items`.
    sum_w: f64,
}

/// Stores the time, value and weight for an item "currently" inside the
/// aggregation window of a `WeightedMeanWindow`, allowing its value and
/// weight to be subtracted from the accumulated sums of the window when
/// the item becomes expired.
#[derive(Debug, Clone)]
pub struct WeightedPoint {
    pub time: u64,
    /// value * weight. 
    ///
    /// when purging expired items, do not subtract `wt_val * wt`, as `wt_val`
    /// has already been multiplied by `wt`. Instead, simply substract `wt_val`
    /// from `w_sum`.
    pub wt_val: f64,
    pub wt: f64,
}

At the point we are processing a new row in trades CSV data, we add it to our current aggregation window, updating the incremental sums necessary to get a weighted average for the items currently in the window:

impl WeightedMeanWindow {
    /// Add a new item, updating incremental calculations in the process.
    ///
    /// Note: it is assumed that `time` is >= the highest `time` value for any previous
    /// item. The expiration logic `purge` relies on the items being added to a
    /// `WeightedMeanWindow` in chronological order.
    pub fn push(&mut self, time: u64, val: f64, wt: f64) {
        let wt_val: f64 = val * wt;
        self.w_sum += wt_val;
        self.sum_w += wt;
        self.items.push_back(WeightedPoint { time, wt_val, wt });
    }
}

At the point we want to get a weighted average for the current window at some time, we purge all the expired items relative to that time:

impl WeightedMeanWindow {
    /// Removes expired items and updates incremental calculations.
    ///
    /// Returns `true` if any items were removed.
    pub fn purge(&mut self, time: u64) -> bool {

        // this is somewhat awkwardly implemented, but there is not anything like
        // `drain_while` on `VecDeque` (or `Vec`) that would work like `take_while`,
        // except also removing the items. Since we need the data in the items we
        // are removing to update `sum_w` and `w_sum`, we loop over the expired
        // items first, counting them in `n_remove`, then actually remove them
        // in a second pass.

        let mut n_remove = 0;

        {
            // extra scope needed to shush the borrow checker

            let items = &self.items;
            let w_sum = &mut self.w_sum;
            let sum_w = &mut self.sum_w;
            let size = self.size;

            for expired in items.iter().take_while(|x| time - x.time > size) {
                *w_sum -= expired.wt_val;
                *sum_w -= expired.wt;
                n_remove += 1;
            }
        }

        for _ in 0..n_remove { self.items.pop_front(); }

        // when items is empty, set w_sum, sum_w to 0.0. the motive
        // of this approach, versus an if block with assignment, is
        // for the code to be "branchless" and do the same work each
        // time, in a cache- and branch predictor-friendly manner.
        let zeroer: f64 = ( ! self.items.is_empty()) as u8 as f64;
        self.w_sum *= zeroer;
        self.sum_w *= zeroer;

        n_remove > 0
    }
}

When new items have been added, and expired items have been evicted, both of which updated our incremental sums, pulling out the weighted mean is just a division away:

impl WeightedMeanWindow {
    /// Calculate the weighted mean from current state of incremental
    /// accumulators.
    ///
    /// Note; this value is not cached.
    pub fn weighted_mean(&self) -> f64 {
        self.w_sum / self.sum_w
    }
}

However, what if items was empty, meaning either that no data was added or that all items were expired after the last call to purge? In that case, weighted_mean would be 0.0 / 0.0 which returns NaN. To address this, WeightedMeanWindow also offers a checked_weighted_mean method that returns Some(f64) if there are any items in the window, and None if there are not:

impl WeightedMeanWindow {
    /// Checks whether items `is_empty` before trying to calculate.
    /// Returns None if items is empty.
    ///
    /// Note: this value is not cached.
    pub fn checked_weighted_mean(&self) -> Option<f64> {
        match self.is_empty() {
            true => None,
            false => Some(self.w_sum / self.sum_w),
        }
    }
}

Now to integrate the online weighted mean implementation into our CSV-processing binary. First, I add a new option to the CLI interface, --hard-mode:

data-pipelines 0.1.0

USAGE:
    csv [FLAGS] --output-path <output-path> --trades-csv <trades-csv>

FLAGS:
    -z, --hard-mode    
    -h, --help         Prints help information
    -V, --version      Prints version information

OPTIONS:
    -o, --output-path <output-path>    Where to save the query results (CSV output)
    -f, --trades-csv <trades-csv>      Path to CSV file with trades data

Inside the implementation, I need to store several categories of values at each of the lookback horizons (5, 15, and 60 minutes). When I started coding in Rust, I was used to storing this kind of thing in a hash map. But that's not the most convenient way to do it in Rust, nor the fastest. Instead, I define a quickie struct (inline inside the function) for handling this:

#[derive(Default, Clone)]
struct Lookbacks<T> {
    pub p5: T,
    pub p15: T,
    pub p60: T,
}

Because Lookbacks is generic over type T, I can store whatever kind of type I want in p5, p15, p60, so long as in any instance of Lookbacks, each of p5, p15, and p60 is the same type.

For each of the exchanges I'm tracking, bmex and gdax, I have a Lookbacks to store the WeightedMeanWindow for each time horizon for that exchange:

let mut bmex_windows: Lookbacks<WeightedMeanWindow> =
    Lookbacks { 
        p5:  WeightedMeanWindow::new(ONE_SECOND * 60 * 5 ),
        p15: WeightedMeanWindow::new(ONE_SECOND * 60 * 15),
        p60: WeightedMeanWindow::new(ONE_SECOND * 60 * 60),
    };
let mut gdax_windows = bwindows.clone();

And also create one to store the computed ratios for a given output point:

let mut ratios: Lookbacks<f64> = Default::default();

Then, as we process each new trade row, if the row's exch is Exchange::bmex or Exchange::gdax, we update the respective Lookbacks incrementally:

match trade.exch {
    Exchange::bmex => {
        bmex_windows.p5 .push(time, price, amount);
        bmex_windows.p15.push(time, price, amount);
        bmex_windows.p60.push(time, price, amount);
    }

    Exchange::gdax => {
        gdax_windows.p5 .push(time, price, amount);
        gdax_windows.p15.push(time, price, amount);
        gdax_windows.p60.push(time, price, amount);
    }

    _ => {}
}

The code is pretty repetitive, but it doesn't really bother me.

At each output time (i.e. every ten seconds), purge all expired items from the window and write the ratios of current weighted means for each time horizon to the output CSV:

if trade.time > cur_bucket {
    bmex_windows.p5 .purge(cur_bucket);
    bmex_windows.p15.purge(cur_bucket);
    bmex_windows.p60.purge(cur_bucket);

    gdax_windows.p5 .purge(cur_bucket);
    gdax_windows.p15.purge(cur_bucket);
    gdax_windows.p60.purge(cur_bucket);

    ratios.p5  = bmex_windows.p5 .weighted_mean() / gdax_windows.p5 .weighted_mean();
    ratios.p15 = bmex_windows.p15.weighted_mean() / gdax_windows.p15.weighted_mean();
    ratios.p60 = bmex_windows.p60.weighted_mean() / gdax_windows.p60.weighted_mean();

    wtr.write_record(&[
        &format!("{}", cur_bucket),
        &format!("{}", ratios.p5),
        &format!("{}", ratios.p15),
        &format!("{}", ratios.p60),
    ]).map_err(|e| {
        format!("writing csv row failed: {}", e)
    })?;
    n_written += 1;
    cur_bucket += ONE_SECOND * 10;
}

Performance on my workstation at home:

Query Hard
Hardware coolidge: 2x Xeon E5-2670, 128G ddr3
Elapsed time 8min, 6.3sec
Throughput (rows/sec) 1.87 million
Peak memory usage 18MB

Ok, but let's try it on a z1d.metal, which boasts the fastest single-core performance of any EC2 instance type!

z1d.metal build times, from scratch:

cargo check # 23 seconds
cargo build --bin csv --release # 59.68 seconds

zstd -d in 2 minutes:

time zstd --verbose -d --rm /xfs/trades.csv.zst
# *** zstd command line interface 64-bits v1.3.3, by Yann Collet ***
# /xfs/trades.csv.zst : 66569042526 bytes                                        
# 
# real	2m9.786s
# user	1m37.821s
# sys	0m31.952s

And, finally, the --hard-mode query runs nearly three minutes faster on a z1d.metal than it does using my aging Xeons at home:

Query Hard
Hardware z1d.metal
Elapsed time 5min, 23sec
Throughput (rows/sec) 2.81 million
Peak memory usage 18MB

Well, 5min, 23sec sure beats 30 years! But we'll get it much faster, yet.