Fast Data Pipelines in Rust

  • This is the story of how I rewrote a Python script that took 68 minutes to run into a Rust program that did the same work in less than one second
  • Writing the Rust program took a few hours. I had been coding in Rust for just over one year at the time
  • In data analysis, the problem I've run into isn't running a slow program once -- it's the painful feedback loop of iterating on something that's distractingly slow

I've done a ton of data analysis in Python. I don't want to rag on it too badly: a lot of the time, Python is fast enough, generally because the heavy lifting in the code you write is executed by a program written in C or Fortran a long time ago and optimized heavily ever since.

But, unfortunately, there are performance cliffs all over the place. My own opinion is that many of the people who are so adamant that Python performance is a trade-off for productivity never seem to grapple with just how much effort is spent, all the time, getting acceptable performance out of Python code. The battle is real.

Another thing that's often overlooked, specifically for data analysis, is the pain of iterating on something as execution time goes up.

If I have to run a program one time in my whole life, I don't really care if it takes a long time to run.

Even if it runs every day -- or every hour -- if the results are useful passively, it's ok.

On the other hand, if I have to run the program, check the output, and then improve the program, over and over again, slowness moves from annoying to distracting to excruciating pretty fast.

For me, the barrier to iterating on something that takes hours to execute is extremely high. Maybe some people can keep 27 long-running Python balls up in the air; I personally have trouble with that.

It also raises the cost of errors. Who among us has not seen the results of a long-running Jupyter notebook computation wasted due to a bug in the code that runs at the very end?

So, with all this in mind, let me take you back in time for a Python performance story. It was a warm afternoon in June 2018, and I was very excited to see the results of an analysis that was taking a very long time to run.

What I Was Trying to Do

At the time I was running an automated trading operation and doing lots of market analysis.

I had come across a very cool visualization on Mechanical Markets, a blog authored by Kipp Rogers, and wanted to replicate it.

In financial analysis, an "event study" is a statistical technique for examining what happens (in aggregate) before and after some specific kind of catalyst event.

The technique is a clever means of isolating the aggregate trend observed for some category of events from the random or otherwise unrelated noise present in the measurements of each individual event.

For example, for an event study of stock splits (a code sketch of the normalization and aggregation steps follows the list),

  1. find a whole bunch of stock splits
  2. each event (stock split) is a row
  3. the columns represent the price of the stock on the days (or other time buckets) before and after the split. If the study window is +/- 7 days relative to the event, the leftmost column might represent the price seven days before the event, and the rightmost column seven days after the event. The middle column would be "day 0" -- the day of the event
  4. normalize the values by row (e.g. percent change from the previous day or a similar technique) so all rows can be analyzed on a common scale
  5. aggregate the values by column (e.g. the mean of the "+1 days" column represents the average price change one day after a stock split across the set of stocks included in the analysis)
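
Here's a minimal sketch of steps 4 and 5 in Numpy -- random data stands in for real prices, and the names are mine, for illustration only:

import numpy as np

# one row per split event, one column per day in the +/- 7 day window;
# column 7 is "day 0", the day of the split itself
prices = 10.0 + np.random.rand(1000, 15).cumsum(axis=1)

# step 4: normalize by row -- percent change from the previous day
returns = np.diff(prices, axis=1) / prices[:, :-1]

# step 5: aggregate by column -- the mean of each column is the average
# move at that day-offset across all 1,000 events
trend = returns.mean(axis=0)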
[figure: stock split event study chart]

The actual chart from the 1969 Fama, Fisher, Jensen and Roll paper: it shows that, on average, the price of a stock slowly increases up to the day of a stock split, after which it levels off

An event study of stock splits was actually the centerpiece of a famous 1969 paper that invented the idea. The paper remains a highly influential pillar of the "Efficient Market Hypothesis" school.

The analysis I found at Mechanical Markets was the same idea, except conducted over a much higher-frequency window of +/- 100 seconds. It also featured a logarithmic x-axis, which allowed for very detailed analysis around "time 0" (the first tick is at +/- 10μs) as well as the trend over longer time frames.

[figure: high-frequency event study from Mechanical Markets]

The high frequency event study I was trying to replicate, published by Mechanical Markets

I wanted to apply this type of analysis to my data. The first prototype I set out to build would use my own btc/usd trades as the study events, and measure the price changes across the whole market in the minutes before and after those trades.

Here's an example of the type of high-frequency event study I ended up producing. This one shows the aggregate trend at several exchanges relative to each time a statistical "signal" fired (i.e. was present):

[figure: high-frequency event study across several exchanges]

My take on high frequency event studies, produced some months after the day I wrote the first Rust prototype. You may notice the first tick comes at 1sec, instead of 10μs - crypto markets aren't there yet

What's So Hard About It?

When using Python libraries like Numpy for CPU-intensive numerical analysis, you want to write "vectorized" code as much as possible. Vectorized, in this sense, means code that utilizes Numpy-style array broadcasting (in systems programming, somewhat confusingly, the same term often refers to something else: whether a program's generated machine code utilizes SIMD instructions).

For instance, in zs = xs + ys, where xs and ys are Numpy arrays of the same shape, zs is assigned the elementwise addition of xs and ys.

The "vectorized" xs + ys is much faster than the equivalent Python list comprehension [xs[i] + ys[i] for i in range(len(xs))]. For two arrays of length 16,384, the list comprehension (13ms) is approximately 750x slower than the vectorized code (17μs).

There are two main reasons for such a dramatic performance difference:

  1. the C or Fortran code Numpy is using under the hood is highly optimized
  2. operating on the entire array in one statement (xs + ys) greatly reduces the context switching between Python and C/Fortran code, which has its own overhead distinct from the cost of performing the actual calculations

Now, if you think about what calculations are involved in performing an event study-type analysis, you might start to see why it is not a good match for Numpy and Pandas. For each event, you need to perform calculations relative to that event's time. That means many trips back and forth between the fast C code and the sloooowwwww Python code.

To put it another way: to maximize throughput, you want each operation to run over the entire length of your data. What an event study requires is many operations, each over a small section of the data.
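
In Python-flavored pseudocode, the access pattern an event study demands looks something like this (buckets_for is a hypothetical helper yielding each bucket's time bounds; times is a sorted array of trade timestamps):

# one interpreter-level iteration per event, per bucket, each doing only
# a tiny amount of work on a small slice of the global arrays
for event_time in event_times:                  # ~1,100 events
    for lo, hi in buckets_for(event_time):      # ~40 buckets each
        i = times.searchsorted(lo)
        j = times.searchsorted(hi)
        vwap = totals[i:j].sum() / amounts[i:j].sum()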

The Python Implementation

That June day, I spent more time in Python than I did in Rust. I tried a lot of different tricks. I was working in a Jupyter notebook, using %time and %timeit to get microbenchmarks for all kinds of things.

The trades were in a Pandas DataFrame with 4,369,412 rows -- the global data set. I was performing the analysis on 1,100 of my own trades (the study event set).

For each event, I was calculating the volume-weighted average price in 40 time buckets relative to the event time.

The final Python implementation I wrote is below, unaltered from the original, which I found preserved in a Jupyter notebook. Fair warning, it's ugly:

import numpy as np
import pandas as pd

def to_tframe(version, df, trades, start):
    d = {'bid': {}, 'ask': {}}
    cursor = 0
    n = 0
    n_periods = 40
    xs = np.concatenate([periods(n_periods)[:0:-1] * -1, periods(n_periods)]) * 1000000 # mult to convert to nanos
    mask = df['version'] == version
    my_trades = sorted(list(zip(df.loc[mask].index.values.astype(np.int64), df.loc[mask, 'side'], df.loc[mask, 'gid'])))
    idx = trades.index.values.astype(np.int64)
    amts = trades['amount']
    totals = trades['total']
    assert len(idx) == len(amts)
    assert len(idx) == len(totals)
    for tm, side, gid in my_trades:
        print '{} to_tfame {} {} (cursor = {})'.format(time.time() - start, version, n, cursor)
        min_time = tm + xs[0]
        max_time = tm + xs[1]
        if idx[cursor] > min_time:
            print 'warning: idx[cursor] ({}) > min_time ({})'.format(idx[cursor], min_time)
            while idx[cursor] > min_time and cursor > 0:
                cursor -= 1
        else:
            while idx[cursor] < min_time and cursor < len(idx) - 1:
                cursor += 1
        i = 1
        j = cursor
        d[side][gid] = {}
        while i < len(xs) - 1:
            wsum = 0.0
            w = 0.0
            while idx[j] < max_time:
                wsum += totals[j]
                w += amts[j]
                j += 1
            if w > 0.0:
                d[side][gid][xs[i]] = wsum / w
            else:
                d[side][gid][xs[i]] = np.nan
            i += 1
            min_time = max_time
            max_time = tm + xs[i]
        n += 1
    d['bid'] = sort_cols(pd.DataFrame.from_dict(d['bid'], orient='index'))
    d['ask'] = sort_cols(pd.DataFrame.from_dict(d['ask'], orient='index'))
    return d

If you know Python, you may notice this isn't a very "vectorized" implementation. All I can say is, I have distinct memories of attempting several vectorized approaches prior to this one.

Undoubtedly, there are faster ways to do this. Undoubtedly, there are cleaner, more maintainable ways to do this. All I'm claiming about that ambiguously-named, inscrutably implemented function to_tframe is that it's the fastest thing I could come up with on June 9, 2018, and that I was, at that time, highly experienced at getting Python numeric code to run fast.

But, to_tframe wasn't fast -- it was glacial. At some point, I realized it was going to be far too slow to iterate on, so I just set it up to keep running and switched tracks.

Later, I went back to check the damage. Here's what I found at the end of the output:

4083.80027103 to_tfame 0.3.72-rc.7 1095 (cursor = 1236447)
4084.54927397 to_tfame 0.3.72-rc.7 1096 (cursor = 1236482)
4085.27251816 to_tfame 0.3.72-rc.7 1097 (cursor = 1236487)
4085.98719311 to_tfame 0.3.72-rc.7 1098 (cursor = 1236517)
4086.77543807 to_tfame 0.3.72-rc.7 1099 (cursor = 1236522)
4087.52675414 to_tfame 0.3.72-rc.7 1100 (cursor = 1236528)
4089.26303911 finished

4,089 seconds (or 68min, 9sec) later, it was finally done.

The Rust Implementation

I started a new binary called time-explorer to perform the analysis. My plan was to do the heavy lifting in Rust, then write the aggregations out to CSV, which I could analyze further in Python.

The first task was parsing trades data from a CSV file I was using.

Thanks to the wonderful Serde and CSV crates, that part was a cinch. First I defined a Trade struct that derived Deserialize:

#[derive(Deserialize)]
struct Trade {
    pub time: i64,
    pub price: f64,
    pub amount: f64,
}

Since for this analysis I only cared about those three fields (time, price, and amount), I simply left the other columns out of the struct, and they were ignored.

If you don't know Serde, what a #[derive(Serialize, Deserialize)] attribute gets you is fast, automatic serialization and deserialization in 16 different formats.

JSON:

let json = r#"{"time":1527811201900505632,"price":7492.279785,"amount":0.048495,"exch":"bits","server_time":0,"side":null}"#;

serde_json::from_str::<Trade>(json) // -> Ok(Trade { time: 1527811201900505632, price: 7492.279785, amount: 0.048495 })

CSV:

let csv = "time,amount,exch,price,server_time,side\n\
           1527811201900505632,0.048495,bits,7492.279785,0,";

let mut csv_reader = csv::Reader::from_reader(csv.as_bytes());

let headers = csv_reader
    .headers()
    .expect("parsing row headers failed")
    .clone();

let mut row = csv::StringRecord::new();

csv_reader.read_record(&mut row);           // -> Ok(true)

row.deserialize::<Trade>(Some(&headers));   // -> Ok(Trade { time: 1527811201900505632, price: 7492.279785, amount: 0.048495 })

Other supported formats include YAML, TOML, x-www-form-urlencoded, MessagePack, BSON, Pickle, CBOR, Bincode, and others. Serde is fantastic, and an awesome part of using Rust.

The CSV crate can parse full rows into structs implementing Deserialize (like Trade), so the parsing code went like this:

let mut trades: Vec<Trade> =
    csv::Reader::from_reader(trades_csv)
        .deserialize()
        .map(|x| x.unwrap())
        .collect();

In a production system, calling .map(|x| x.unwrap()) on each row would be extremely unwise. If one row failed to parse, it would crash the program. In a short-running data munging script, it's not a big deal.

Similarly, in a program like this, I generally pepper the code with assert! statements to verify the integrity of the data, especially things like matching array lengths that aren't (currently) encoded in the type system. In this situation, silent data corruption is far worse than a crash.

Once the CSV file was parsed, I sorted the data by time and switched its memory layout from rows to columns, so to speak:

trades.sort_by_key(|k| k.time);

let mut times: Vec<i64> = Vec::with_capacity(trades.len());
let mut amounts: Vec<f64> = Vec::with_capacity(trades.len());
let mut totals: Vec<f64> = Vec::with_capacity(trades.len());

for Trade { time, price, amount } in trades {
    times.push(time);
    totals.push(price * amount);
    amounts.push(amount);
}

Notice that instead of storing a prices vec, I store a totals vec. The motivation is that, to compute the weighted average inside each time bucket, I will be calculating

sum(p * a for p, a in zip(prices[i:j], amounts[i:j])) / sum(amounts[i:j])

... for some range of trades rows i..j. By storing totals rather than prices, I can instead compute

sum(totals[i:j]) / sum(amounts[i:j])

... potentially saving many multiplications, as the prices[i] * amounts[i] multiplication is only done once per row, and I am likely to hit the same rows many times over the course of the analysis.
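
As a toy check of that equivalence (with made-up numbers):

import numpy as np

prices  = np.array([7490.25, 7491.50, 7492.00, 7490.75])
amounts = np.array([0.50, 0.25, 1.00, 0.10])
totals  = prices * amounts       # one multiply per row, done at parse time

i, j = 1, 4                      # index bounds of some time bucket
slow = (prices[i:j] * amounts[i:j]).sum() / amounts[i:j].sum()
fast = totals[i:j].sum() / amounts[i:j].sum()
assert np.isclose(slow, fast)    # same weighted average, fewer multiplications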

I also parsed a separate file with the timestamps for my study events into an Event struct:

#[derive(Deserialize)]
struct Event {
    pub time: i64,
    pub data: Vec<f64>, // holds the computed values for each time bucket for this study event
}

With the study events parsed, it's time to do some event studying.

The Hard Part

The nested loops I came up with to do the analysis are admittedly a bit gnarly (it's actually a relatively faithful port of to_tframe):

let mut cursor: usize = 0; // current position in `times`/`totals`/`amounts` vecs

let n: usize =             // [..] number of time buckets. set by cli option.

let buckets: Vec<i64> =    // [..] the time buckets to compute the weighted average for, in
                           //      nanoseconds delta from study event time. buckets[0] is a
                           //      negative value, so the range buckets[0]..buckets[1] is
                           //      a period before the study event occurs (and the earliest bucket).
                           //      the range buckets[buckets.len()-2]..buckets[buckets.len()-1]
                           //      is two positive values, and the furthest bucket into the future
                           //      relative to the study event.

'a: for (i, event) in events.iter_mut().enumerate() {

    let mut min_time: i64 = event.time + buckets[0]; // current "left edge" of time bucket we are computing for
    let mut max_time: i64 = event.time + buckets[1]; // current "right edge" of time bucket we are computing for

    'b: while times[cursor] > min_time && cursor > 0 { cursor -= 1; } // see note below for details

    // find the start of this event's time buckets
    'c: while times[cursor] < min_time {
        if cursor >= times.len() - 1 { break 'a }
        cursor += 1;
    }

    let mut j: usize = cursor;

    'd: for k in 0..(n - 2) {

        let mut wsum: f64 = 0.0; // will store sum(  totals[cursor..j] ) where cursor,j are index bounds of this time bucket
        let mut w   : f64 = 0.0; // will store sum( amounts[cursor..j] ) where cursor,j are index bounds of this time bucket

        'e: while j < times.len() - 1 && times[j] < max_time {
            wsum +=  totals[j];
            w    += amounts[j];
            j    += 1;
        }

        // store weighted average for this time bucket
        event.data[k] = if w > 0.0 { wsum / w } else { std::f64::NAN };

        // increment time bucket
        min_time = max_time;
        max_time = event.time + buckets[k + 2];
    }
}

This implementation relies on both the trades arrays and the study events array being sorted. Since they are sorted, each study event always lies at or past the previous one, and the bounds of its time buckets, computed relative to the event's time, will be near the spot the loop has already iterated to (and likely still in the CPU cache).

The hardest part of writing this code was the off-by-one errors from all the delicate indexing operations. It took some effort to get right. It's very easy to get confused when you have one array (buckets) that represents the edges of the values in another array, and then you need to find the indices in the second array corresponding to slots with values that are adjacent to the bucket edge values in the first array (phew!).

Nowadays, the unused loop labels (i.e. 'b, 'c, 'd, 'e) get an unused_labels warning from rustc -- they didn't when I wrote this. Personally, they help me think about the structure of some very dense, nested loops, whether or not the code actually uses them. Maybe that's just me.

While cleaning this up, I spotted a bug. The line 'b: while times[cursor] > min_time && cursor > 0 { cursor -= 1; } wasn't in the original. Without it, it's possible for the previous loop iteration to advance past the earliest time buckets for the next event (i.e. cursor may be positioned such that times[cursor] > min_time at the top of loop 'a). The way the code is designed, the consequences are potentially incomplete analysis for some time buckets if two study events are close enough together, or, if cursor had also advanced past max_time, buckets skipped altogether. I added some counters to the loop to diagnose the impact: using a larger data set than when I built the first time-explorer, the bug resulted in 215 skipped time buckets and 730 buckets with incomplete data, out of 899,856 total time buckets. While the impact was quite limited, it's a good reminder to stay sharp.

After completing the analysis, the program writes the output to a CSV file, which is pretty routine. The final thing I added was a CLI interface, built with the Clap crate:

let args: clap::ArgMatches = clap::App::new("time-explorer")
    .version("0.1")
    .arg(clap::Arg::with_name("trades")
         .long("trades-csv")
         .short("t")
         .help("Path of csv with time (integer nanoseconds timestamp), \
                price (f64), and amount (f64) columns.")
         .takes_value(true)
         .required(true))
    .arg(clap::Arg::with_name("events")
         .long("events-csv")
         .short("e")
         .help("Path of csv file with a time (integer nanoseconds timestamp) as column 0, \
                along with any other metadata columns that will be included in results")
         .takes_value(true)
         .required(true))
    .arg(clap::Arg::with_name("output")
         .long("output-file")
         .short("o")
         .help("Path to save results csv to")
         .takes_value(true)
         .required(true))
    .arg(clap::Arg::with_name("verbose")
         .long("verbose")
         .short("v"))
    .arg(clap::Arg::with_name("n-periods")
         .long("n-periods")
         .short("n")
         .help("Controls how many time buckets are evaluated")
         .takes_value(true)
         .default_value("50"))
    .get_matches();

The --help output:

time-explorer 0.1

USAGE:
    time-explorer [FLAGS] [OPTIONS] --events-csv <events> --output-file <output> --trades-csv <trades>

FLAGS:
    -h, --help       Prints help information
    -V, --version    Prints version information
    -v, --verbose    

OPTIONS:
    -e, --events-csv <events>      Path of csv file with a time (integer nanoseconds timestamp) as column 0, along with
                                   any other metadata columns that will be included in results
    -n, --n-periods <n-periods>    Controls how many time buckets are evaluated [default: 50]
    -o, --output-file <output>     Path to save results csv to
    -t, --trades-csv <trades>      Path of csv with time (integer nanoseconds timestamp), price (f64), and amount (f64)
                                   columns.

Pretty nice! I love how easy that is.

When I ran this analysis for the first time in June 2018, the global trades CSV file had 4,369,412 rows and there were 1,100 events in the study set.

Revisiting it for this article, I was only able to find larger data files for both inputs: a trades CSV with 11,836,155 rows (2.7x the original data) and an events CSV with 18,747 rows (17x the original data).

But even with the larger files, when I fired up time-explorer v0.1, the core analysis still took less than one second:

./target/release/time-explorer -t ../var/time-explorer/btc-usd/trades.csv -e ../var/time-explorer/btc-usd/my_trades.csv -o var/teout.csv --verbose 
#    0.00s reading...
#    6.32s calculating...
#    6.32s     No.     0          NaN,      7637.32,      7643.62 ...
#    6.33s     No.   512      7701.29,      7706.13,      7708.57 ...
#    6.34s     No.  1024      7498.03,      7488.73,      7492.05 ...
#    6.35s     No.  1536      7438.60,      7452.81,      7433.43 ...
#    6.36s     No.  2048      7632.81,      7604.25,      7598.79 ...
#    6.37s     No.  2560      7608.89,      7624.22,      7628.91 ...
#    6.38s     No.  3072      7654.30,      7652.55,      7655.41 ...
#    6.39s     No.  3584      7720.13,      7723.60,      7715.98 ...
#    6.39s     No.  4096      7694.29,      7695.07,      7693.11 ...
#    6.40s     No.  4608      7663.73,      7657.47,      7656.74 ...
#    6.41s     No.  5120      7681.48,      7675.49,      7673.05 ...
#    6.42s     No.  5632      7568.57,      7557.18,      7571.07 ...
#    6.43s     No.  6144      7644.18,      7645.71,      7649.90 ...
#    6.43s     No.  6656      7645.69,      7646.39,      7653.64 ...
#    6.44s     No.  7168      7652.45,      7655.90,      7646.96 ...
#    6.45s     No.  7680      7664.94,      7671.65,      7669.66 ...
#    6.45s     No.  8192      7614.52,      7611.79,      7626.52 ...
#    6.47s     No.  8704      7297.09,      7310.70,      7310.43 ...
#    6.48s     No.  9216      7235.21,      7241.13,      7239.70 ...
#    6.51s     No.  9728      6830.22,      6818.21,      6857.73 ...
#    6.52s     No. 10240      6732.01,      6724.39,      6699.79 ...
#    6.53s     No. 10752      6536.09,      6517.65,      6425.04 ...
#    6.55s     No. 11264      6450.31,      6445.63,      6443.42 ...
#    6.58s     No. 11776      6366.25,      6410.68,      6559.79 ...
#    6.60s     No. 12288      6473.03,      6463.63,      6443.70 ...
#    6.61s     No. 12800      6500.16,      6498.74,      6510.93 ...
#    6.62s     No. 13312      6408.23,      6413.47,      6428.04 ...
#    6.64s     No. 13824      6706.12,      6720.87,      6732.83 ...
#    6.65s     No. 14336      6714.93,      6723.90,      6720.23 ...
#    6.66s     No. 14848      6745.45,      6743.03,      6744.32 ...
#    6.68s     No. 15360      5883.81,      5893.97,      5874.75 ...
#    6.69s     No. 15872      5930.25,      5885.04,      5835.80 ...
#    6.70s     No. 16384      6165.85,      6159.55,      6147.91 ...
#    6.71s     No. 16896      6279.92,      6290.18,      6306.97 ...
#    6.72s     No. 17408      6134.21,      6063.72,      6058.22 ...
#    6.73s     No. 17920      6068.31,      6080.82,      6082.38 ...
#    6.75s     No. 18432      6593.90,      6602.86,      6584.98 ...
#    6.76 writing... (n_time_buckets=899856, n_incomplete_buckets=0, n_skipped_buckets=0)
#    7.05 finished.

At 6.32sec it was "calculating" and at 6.76sec it was "writing" -- that leaves 0.44sec spent performing the event study analysis.

4089.26 / 0.44 # -> 9293.78

# python / rust = 9,294x slower!

Guido, call your office!

Conclusion

Was it worth it? This kind of analysis wasn't a run-once thing -- it was something I would iterate on heavily and go on to run hundreds of times in the following months, eventually on much, much larger data sets (hundreds of millions of rows). None of that would have been possible in Python. It just wouldn't have.

Another point is, this wasn't a case of going to the ends of the earth to eke out a tiny bit more performance. This was code I wrote in one sitting, in one afternoon, with a little over one year's experience writing Rust under my belt. And the first pass was just shockingly fast, compared to my expectations.

Finally, forget speed for a second -- which of these programs would you rather grok if you had zero familiarity with either one? As someone who just returned to this code long after I had forgotten all pertinent details, I can tell you it wasn't a difficult choice. The Rust code is far, far easier to follow given its static types, precise scoping rules, and overall explicitness. In contrast, diving into to_tframe to make changes sounds like a major headache to me, and I wrote it!

For me, this was an eye-opening experience. I knew it would be faster, but not that much faster. The amount of math a modern CPU can do when it isn't sitting there twiddling its thumbs, waiting for memory to arrive, is pretty astonishing. It also cemented my view that there are times and places in exploratory data analysis work where Rust can be an extremely valuable tool.

The Python code in the Jupyter notebook had already parsed the CSV, so I don't count the parsing time against my "less than one second" claim. For the record, Pandas reads the same file in around 10sec. With 10 minutes of work (knowing what I know today), I was able to get the CSV parsing down to 3.64sec, which brought the total runtime down to 4.38sec.

Notes and Further Reading

  • I posted briefly about this at r/rust shortly after it happened
  • Unadulterated, original time-explorer code is here. Cleaned up version used while writing this article is here.
  • Original Python to_tframe is here, and log output here (for the people in the r/rust comments who suggested I was making this all up)