Just Use Postgresql™

Started by creating a new EC2 instance, an i3.8xlarge. Pretty sweet: 16 physical cores (Xeon E5-2686 v4), 240G RAM, 4x NVMe drives.

Used this command to create a disk array:

sudo mdadm --create /dev/md4 --level=10 --raid-devices=4 /dev/disk/by-id/nvme-Amazon*

I read somewhere that Postgresql does marginally better with ext4 vs xfs, so I use that for the file system:

sudo mkfs.ext4 /dev/md4

I probably left some optimizations on the table with those two operations, but I'm not too worried -- the quadruple NVMe drives ought to make up for it.

Next, I install Postgresql 12. I've never used anything newer than 10 but thought I'd give the old elephant its best shot.

Download compressed CSV from S3, then freakin' xz takes 20mins to decompress the CSV file. (Next time: zstd).

I had to convert the data from its raw form into something that makes sense for a Postgresql schema. In Rust terms, I went from this:

#[derive(Deserialize, Debug)]
struct PgBuilder<'a> {
    pub time: u64,
    pub exch: Exchange,
    pub ticker: Ticker,
    pub side: Option<&'a str>,
    pub price: f64,
    pub amount: f64,
    pub server_time: u64,
}

... to this:

#[derive(Serialize, Debug)]
struct PgRow {
    pub time: DateTime<Utc>,
    pub exch: u8,               // smallint actually maps to i16
    pub base: u8,               // but u8 is the type I have the numeric
    pub quote: u8,              // codes in in the markets crate
    pub amount: f64,
    pub price: f64,
    pub side: Option<u8>,
    pub server_time: Option<DateTime<Utc>>,
}

The guts of the Postgresql schema are as follows (full details here):

CREATE TABLE exchanges
(

  id smallint NOT NULL,
  symbol character varying(4) NOT NULL,

  CONSTRAINT exchanges_pkey PRIMARY KEY (id)
);

CREATE TABLE currencies
(
  id smallint NOT NULL,
  symbol character varying(6) NOT NULL,

  CONSTRAINT currencies_pkey PRIMARY KEY (id)
);

CREATE TABLE trades
(
  id integer NOT NULL DEFAULT nextval('trades_id_seq'::regclass),
  "time" timestamp with time zone NOT NULL,
  exch smallint NOT NULL,
  base smallint NOT NULL,
  quote smallint NOT NULL,
  amount double precision NOT NULL,
  price double precision NOT NULL,
  side smallint NULL,                           -- side has no fk ... bid=1, ask=2
  server_time timestamp with time zone NULL,

  CONSTRAINT trades_pkey PRIMARY KEY (id),

  CONSTRAINT exch_fk FOREIGN KEY (exch)
      REFERENCES exchanges (id) MATCH SIMPLE
      ON UPDATE NO ACTION ON DELETE NO ACTION DEFERRABLE INITIALLY DEFERRED,

  CONSTRAINT base_fk FOREIGN KEY (base)
      REFERENCES currencies (id) MATCH SIMPLE
      ON UPDATE NO ACTION ON DELETE NO ACTION DEFERRABLE INITIALLY DEFERRED,

  CONSTRAINT quote_fk FOREIGN KEY (quote)
      REFERENCES currencies (id) MATCH SIMPLE
      ON UPDATE NO ACTION ON DELETE NO ACTION DEFERRABLE INITIALLY DEFERRED
);

CREATE INDEX trades_time_abcdefg
  ON trades
  USING btree
  ("time");

CREATE INDEX trades_base_quote_f6b2eeda
  ON trades
  USING btree
  (base, quote);

CREATE INDEX trades_exchange_5d5c6971
  ON trades
  USING btree
  (exch);

I write some Rust code to convert from the original CSV to something I can load into Postgresql with COPY. The crux of it:

let bldr: PgBuilder = row.deserialize(Some(&headers)).map_err(|e| format!("deser failed: {}", e))?;
let PgBuilder { time, exch, ticker, side, price, amount, server_time } = bldr;
let time = nanos_to_utc(time);
let exch = u8::from(exch);          // the u8::from implementations are in markets crate
let base = u8::from(ticker.base);
let quote = u8::from(ticker.quote);
let side: Option<u8> = match side {
    Some("bid") => Some(1),
    Some("ask") => Some(2),
    _ => None,
};
let server_time = match server_time {
    0 => None,
    x => Some(nanos_to_utc(x)),
};
let pg_row = PgRow { time, exch, base, quote, amount, price, side, server_time };
wtr.serialize(&pg_row).map_err(|e| format!("serializing PgRow to csv failed: {}", e))?;

Conversion program runs in 32min, 49sec. The Postgresql-friendly CSV is 74G.

Here's loading the data with COPY:

psql -d pipelines
pipelines=# \timing on
Timing is on.
pipelines=# copy trades ("time", exch, base, "quote", amount, price, side, server_time) from '/data/pipelines/trades-pg.csv' csv header;

That's gonna take a while. Time to step away from the computer.

The Easy Part

To be honest, I was dreading writing SQL after a decent hiatus, but it didn't turn out to be too bad. After fiddling around in psql for a while, I come up with this:

SELECT hr,
       bmex / gdax AS ratio
FROM
  (SELECT hr,
          max(gdax) "gdax", -- max(..) necessary to grab the non-NULL row from two returned
          max(bmex) "bmex"  -- rows for each hr
   FROM
     (SELECT hr,
             (CASE WHEN exch=3 THEN wt_avg END) "gdax",
             (CASE WHEN exch=6 THEN wt_avg END) "bmex"
      FROM
        (SELECT hr,
                exch,
                w_sum / sum_w AS wt_avg
         FROM
           (SELECT date_trunc('hour', "time") AS hr,
                   exch,
                   sum(price * amount) AS w_sum,
                   sum(amount) AS sum_w
            FROM trades
            WHERE base=1             -- btc=1 usd=100
              AND quote=100
              AND (exch=3 OR exch=6) -- gdax=3, bmex=6
            GROUP BY 1, 2
            ORDER BY 1, 2) a) b) c
   GROUP BY hr) d;

It's ugly, but I think it does what I want for the hourly "easy" query.

At some point while I'm asleep, Postgresql finally finished its COPY, one of many "what is it actually even doing that could take so long?" moments I've had with this database:

COPY 908204336
Time: 30740053.419 ms (08:32:20.053)

While COPY was taking forever, I had figured I would do some Postgresql tuning before running the queries. I am not going to pretend to know all the ins and outs of postgresql.conf, I went to a config generator site and got these recommended settings, which I tweaked slightly:

max_connections = 20
shared_buffers = 120GB
effective_cache_size = 180GB
maintenance_work_mem = 2GB
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 500
random_page_cost = 1.1
effective_io_concurrency = 200
work_mem = 96MB
min_wal_size = 4GB
max_wal_size = 16GB
max_worker_processes = 32
max_parallel_workers_per_gather = 32
max_parallel_workers = 32
max_parallel_maintenance_workers = 4

Now that the data is loaded, I restart the server to get all the new settings in place. Then I run vacuum analyze trades for good measure. Finally, it's time to see where things stand:

pipelines=# SELECT hr,
       bmex / gdax AS ratio
FROM
  (SELECT hr,
          max(gdax) "gdax", -- max(..) necessary to grab the non-NULL row from two returned
          max(bmex) "bmex"  -- rows for each hr
   FROM
     (SELECT hr,
             (CASE WHEN exch=3 THEN wt_avg END) "gdax",
             (CASE WHEN exch=6 THEN wt_avg END) "bmex"
      FROM
        (SELECT hr,
                exch,
                w_sum / sum_w AS wt_avg
         FROM
           (SELECT date_trunc('hour', "time") AS hr,
                   exch,
                   sum(price * amount) AS w_sum,
                   sum(amount) AS sum_w
            FROM trades
            WHERE base=1             -- btc=1 usd=100
              AND quote=100
              AND (exch=3 OR exch=6) -- gdax=3, bmex=6
            GROUP BY 1, 2
            ORDER BY 1, 2) a) b) c
   GROUP BY hr) d;
Time: 37325.751 ms (00:37.326)

[spits out drink] ... Wait, what!? 37 seconds? That's actually fast!

I run it again:

Time: 20496.159 ms (00:20.496)

Holy, moly:

Query Easy
Hardware i3.8xlarge
Notes warm cache; fastest observed time
Elapsed time 20.5sec
Throughput (rows/sec) 44.3 million
Peak memory usage 94.8G

Lets flush the cache, see how she handles that!

sync
sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches'

iostat --human while running the same query cold:

Device             tps    kB_read/s    kB_wrtn/s    kB_read    kB_wrtn
nvme0n1        5466.40       493.7M         0.0k       2.4G       0.0k
nvme1n1        5924.80       524.9M         0.0k       2.6G       0.0k
nvme2n1        6514.60       594.1M         0.0k       2.9G       0.0k
nvme3n1        6257.00       562.9M         0.0k       2.7G       0.0k
md4           24422.20         2.4G         0.0k      11.8G       0.0k

The result:

Time: 54385.921 ms (00:54.385)

Not bad at all.

Query Easy
Hardware i3.8xlarge
Notes cold cache
Elapsed time 54sec
Throughput (rows/sec) 16.7 million
Peak memory usage 94.8G

The Hard Part

True confession: I came up with the "hard" query after Postgresql did so well on the "easy" one.

Honestly, I was shocked. I have to admit, it is impressive to see time-based aggregation queries on a 900 million row table come back in less than 30 seconds, since Postgresql isn't even designed for this kind of work.

But as I started working on something more difficult, I ran into another problem: one step off the happy path and you fall straight down to Hell. I found it impossible find any ground in between "computes in 30 seconds" and "never finishes."

This query looked good on a small subset of the data, but then crashed and burned in epic fashion on the whole table. "Out of memory for query result" -- is that even possible with Postgresql? I could swear I've seen it write intermediate results to disk to git 'er done on like a 99¢ Digital Ocean droplet with 1kB of RAM, for crying out loud.

pipelines-# select
pipelines-#     "time",
pipelines-#     (bmex_wsum_5  / bmex_w_5) as bmex_5min,
pipelines-#     (gdax_wsum_5  / gdax_w_5) as gdax_5min,
pipelines-#     (bmex_wsum_5  / bmex_w_5)  / (gdax_wsum_5  / gdax_w_5 ) as r5,
pipelines-#     (bmex_wsum_15 / bmex_w_15) / (gdax_wsum_15 / gdax_w_15) as r15,
pipelines-#     (bmex_wsum_60 / bmex_w_60) / (gdax_wsum_60 / gdax_w_60) as r60
pipelines-# from
pipelines-#     (select
pipelines(#         "time",
pipelines(#
pipelines(#         sum(price * amount) filter(where "time" > "time" - interval  '5 minutes' and "time" <= "time" and exch=3) over( order by "time" ) "gdax_wsum_5",
pipelines(#         sum(price * amount) filter(where "time" > "time" - interval '15 minutes' and "time" <= "time" and exch=3) over( order by "time" ) "gdax_wsum_15",
pipelines(#         sum(price * amount) filter(where "time" > "time" - interval '60 minutes' and "time" <= "time" and exch=3) over( order by "time" ) "gdax_wsum_60",
pipelines(#
pipelines(#         sum(amount) filter(where "time" > "time" - interval  '5 minutes' and "time" <= "time" and exch=3) over( order by "time" ) "gdax_w_5",
pipelines(#         sum(amount) filter(where "time" > "time" - interval '15 minutes' and "time" <= "time" and exch=3) over( order by "time" ) "gdax_w_15",
pipelines(#         sum(amount) filter(where "time" > "time" - interval '60 minutes' and "time" <= "time" and exch=3) over( order by "time" ) "gdax_w_60",
pipelines(#
pipelines(#         sum(price * amount) filter(where "time" > "time" - interval  '5 minutes' and "time" <= "time" and exch=6) over( order by "time" ) "bmex_wsum_5",
pipelines(#         sum(price * amount) filter(where "time" > "time" - interval '15 minutes' and "time" <= "time" and exch=6) over( order by "time" ) "bmex_wsum_15",
pipelines(#         sum(price * amount) filter(where "time" > "time" - interval '60 minutes' and "time" <= "time" and exch=6) over( order by "time" ) "bmex_wsum_60",
pipelines(#
pipelines(#         sum(amount) filter(where "time" > "time" - interval  '5 minutes' and "time" <= "time" and exch=6) over( order by "time" ) "bmex_w_5",
pipelines(#         sum(amount) filter(where "time" > "time" - interval '15 minutes' and "time" <= "time" and exch=6) over( order by "time" ) "bmex_w_15",
pipelines(#         sum(amount) filter(where "time" > "time" - interval '60 minutes' and "time" <= "time" and exch=6) over( order by "time" ) "bmex_w_60"
pipelines(#
pipelines(#     from trades
pipelines(#     order by "time") a
pipelines-# ;
out of memory for query result
Time: 2372367.480 ms (39:32.367)

By constraining the above query to only the 5-minute lookback, I was able to get it to complete in 20 minutes:

Query Hard
Hardware i3.8xlarge
Notes Incomplete: 5-minute lookback only
Elapsed time 19 min, 21sec
Throughput (rows/sec) 0.8 million
Peak memory usage 94.8G

This one couldn't even do the first two rows:

with seconds as (
    select tm::timestamptz from generate_series(
        (select date_trunc('second', min("time")) from trades limit 1),
        (select date_trunc('second', max("time")) from trades limit 1),
        interval '1 second'
    ) as tm
)
select 
tm,
(bmex_wsum_5  / bmex_w_5)  / (gdax_wsum_5  / gdax_w_5 ) as r5,
(bmex_wsum_15 / bmex_w_15) / (gdax_wsum_15 / gdax_w_15) as r15,
(bmex_wsum_60 / bmex_w_60) / (gdax_wsum_60 / gdax_w_60) as r60
from
    (select
        seconds.tm,

        sum(price * amount) filter(where "time" > seconds.tm - interval  '5 minutes' and "time" <= seconds.tm and exch=3) over( order by "time" ) "gdax_wsum_5",
        sum(price * amount) filter(where "time" > seconds.tm - interval '15 minutes' and "time" <= seconds.tm and exch=3) over( order by "time" ) "gdax_wsum_15",
        sum(price * amount) filter(where "time" > seconds.tm - interval '60 minutes' and "time" <= seconds.tm and exch=3) over( order by "time" ) "gdax_wsum_60",

        sum(amount) filter(where         "time" > seconds.tm - interval  '5 minutes' and "time" <= seconds.tm and exch=3) over( order by "time" ) "gdax_w_5",
        sum(amount) filter(where         "time" > seconds.tm - interval '15 minutes' and "time" <= seconds.tm and exch=3) over( order by "time" ) "gdax_w_15",
        sum(amount) filter(where         "time" > seconds.tm - interval '60 minutes' and "time" <= seconds.tm and exch=3) over( order by "time" ) "gdax_w_60",

        sum(price * amount) filter(where "time" > seconds.tm - interval  '5 minutes' and "time" <= seconds.tm and exch=6) over( order by "time" ) "bmex_wsum_5",
        sum(price * amount) filter(where "time" > seconds.tm - interval '15 minutes' and "time" <= seconds.tm and exch=6) over( order by "time" ) "bmex_wsum_15",
        sum(price * amount) filter(where "time" > seconds.tm - interval '60 minutes' and "time" <= seconds.tm and exch=6) over( order by "time" ) "bmex_wsum_60",

        sum(amount) filter(where         "time" > seconds.tm - interval  '5 minutes' and "time" <= seconds.tm and exch=6) over( order by "time" ) "bmex_w_5",
        sum(amount) filter(where         "time" > seconds.tm - interval '15 minutes' and "time" <= seconds.tm and exch=6) over( order by "time" ) "bmex_w_15",
        sum(amount) filter(where         "time" > seconds.tm - interval '60 minutes' and "time" <= seconds.tm and exch=6) over( order by "time" ) "bmex_w_60"

    from seconds, trades
) a
limit 2;

[buzzer sound]

ERROR:  canceling statement due to user request
Time: 7281544.083 ms (02:01:21.544)

And here is a query I let run for many, many hours before pulling the plug:

select
    tm,
    bmex_d15_wt_avg_price / gdax_d15_wt_avg_price as ratio_15,
    bmex_d60_wt_avg_price / gdax_d60_wt_avg_price as ratio_60
from (
    select tm,
           sum(gdax_d15.price * gdax_d15.amount) / sum(gdax_d15.amount) as gdax_d15_wt_avg_price,
           sum(bmex_d15.price * bmex_d15.amount) / sum(bmex_d15.amount) as bmex_d15_wt_avg_price,
           sum(gdax_d60.price * gdax_d60.amount) / sum(gdax_d60.amount) as gdax_d60_wt_avg_price,
           sum(bmex_d60.price * bmex_d60.amount) / sum(bmex_d60.amount) as bmex_d60_wt_avg_price
    from
        (select tm::timestamptz from generate_series(
            (select date_trunc('second', min("time")) from trades),
            (select date_trunc('second', max("time")) from trades),
            interval '1 second'
        ) as tm
    ) a

    left join trades gdax_d15 on (gdax_d15.time between a.tm - '15 minutes'::interval and a.tm and gdax_d15.base=1 and gdax_d15.quote=100 and gdax_d15.exch=3)
    left join trades bmex_d15 on (bmex_d15.time between a.tm - '15 minutes'::interval and a.tm and bmex_d15.base=1 and bmex_d15.quote=100 and bmex_d15.exch=6)
    left join trades gdax_d60 on (gdax_d60.time between a.tm - '60 minutes'::interval and a.tm and gdax_d60.base=1 and gdax_d60.quote=100 and gdax_d60.exch=3)
    left join trades bmex_d60 on (bmex_d60.time between a.tm - '60 minutes'::interval and a.tm and bmex_d60.base=1 and bmex_d60.quote=100 and bmex_d60.exch=6)
    group by a.tm
    order by a.tm
) b;

Womp womp:

Cancel request sent
ERROR:  canceling statement due to user request
Time: 48721938.810 ms (13:32:01.939)

Notes and Further Reading

  • If you're feeling morbid, you can inspect the numerous attempts I made to get the hard query to work here
  • The Postgresql docs for window functions: it would be awesome if there were some examples