Started by creating a new EC2 instance, an i3.8xlarge. Pretty sweet: 16 physical cores (Xeon E5-2686 v4), 240G RAM, 4x NVMe drives.
Used this command to create a disk array:
sudo mdadm --create /dev/md4 --level=10 --raid-devices=4 /dev/disk/by-id/nvme-Amazon*
I read somewhere that Postgresql does marginally better with ext4 vs xfs, so I use that for the file system:
sudo mkfs.ext4 /dev/md4
I probably left some optimizations on the table with those two operations, but I'm not too worried -- the quadruple NVMe drives ought to make up for it.
Next, I install Postgresql 12. I've never used anything newer than 10 but thought I'd give the old elephant its best shot.
Download compressed CSV from S3, then freakin' xz takes 20 minutes to decompress the CSV file. (Next time: zstd.)
I had to convert the data from its raw form into something that makes sense for a Postgresql schema. In Rust terms, I went from this:
#[derive(Deserialize, Debug)]
struct PgBuilder<'a> {
    pub time: u64,
    pub exch: Exchange,
    pub ticker: Ticker,
    pub side: Option<&'a str>,
    pub price: f64,
    pub amount: f64,
    pub server_time: u64,
}
... to this:
#[derive(Serialize, Debug)]
struct PgRow {
    pub time: DateTime<Utc>,
    pub exch: u8,  // smallint actually maps to i16,
    pub base: u8,  // but u8 is the type I have the numeric
    pub quote: u8, // codes in the markets crate
    pub amount: f64,
    pub price: f64,
    pub side: Option<u8>,
    pub server_time: Option<DateTime<Utc>>,
}
The guts of the Postgresql schema are as follows (full details here):
CREATE TABLE exchanges (
    id smallint NOT NULL,
    symbol character varying(4) NOT NULL,
    CONSTRAINT exchanges_pkey PRIMARY KEY (id)
);

CREATE TABLE currencies (
    id smallint NOT NULL,
    symbol character varying(6) NOT NULL,
    CONSTRAINT currencies_pkey PRIMARY KEY (id)
);

CREATE TABLE trades (
    id integer NOT NULL DEFAULT nextval('trades_id_seq'::regclass),
    "time" timestamp with time zone NOT NULL,
    exch smallint NOT NULL,
    base smallint NOT NULL,
    quote smallint NOT NULL,
    amount double precision NOT NULL,
    price double precision NOT NULL,
    side smallint NULL,                        -- side has no fk ... bid=1, ask=2
    server_time timestamp with time zone NULL,
    CONSTRAINT trades_pkey PRIMARY KEY (id),
    CONSTRAINT exch_fk FOREIGN KEY (exch)
        REFERENCES exchanges (id) MATCH SIMPLE
        ON UPDATE NO ACTION ON DELETE NO ACTION
        DEFERRABLE INITIALLY DEFERRED,
    CONSTRAINT base_fk FOREIGN KEY (base)
        REFERENCES currencies (id) MATCH SIMPLE
        ON UPDATE NO ACTION ON DELETE NO ACTION
        DEFERRABLE INITIALLY DEFERRED,
    CONSTRAINT quote_fk FOREIGN KEY (quote)
        REFERENCES currencies (id) MATCH SIMPLE
        ON UPDATE NO ACTION ON DELETE NO ACTION
        DEFERRABLE INITIALLY DEFERRED
);

CREATE INDEX trades_time_abcdefg ON trades USING btree ("time");
CREATE INDEX trades_base_quote_f6b2eeda ON trades USING btree (base, quote);
CREATE INDEX trades_exchange_5d5c6971 ON trades USING btree (exch);
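For reference, the exchanges and currencies tables just hold the numeric codes from the markets crate. The only rows the queries below care about look roughly like this (a hypothetical minimal seed; the real tables have more symbols):

-- Hypothetical seed rows: just the codes referenced later
-- (gdax=3, bmex=6, btc=1, usd=100).
INSERT INTO exchanges (id, symbol) VALUES (3, 'gdax'), (6, 'bmex');
INSERT INTO currencies (id, symbol) VALUES (1, 'btc'), (100, 'usd');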
I write some Rust code to convert from the original CSV to something I can load into Postgresql with COPY. The crux of it:
let bldr: PgBuilder = row.deserialize(Some(&headers))
    .map_err(|e| format!("deser failed: {}", e))?;
let PgBuilder { time, exch, ticker, side, price, amount, server_time } = bldr;
let time = nanos_to_utc(time);
let exch = u8::from(exch); // the u8::from implementations are in markets crate
let base = u8::from(ticker.base);
let quote = u8::from(ticker.quote);
let side: Option<u8> = match side {
    Some("bid") => Some(1),
    Some("ask") => Some(2),
    _ => None,
};
let server_time = match server_time {
    0 => None,
    x => Some(nanos_to_utc(x)),
};
let pg_row = PgRow { time, exch, base, quote, amount, price, side, server_time };
wtr.serialize(&pg_row).map_err(|e| format!("serializing PgRow to csv failed: {}", e))?;
Conversion program runs in 32min, 49sec. The Postgresql-friendly CSV is 74G.
Here's loading the data with COPY:
psql -d pipelines
pipelines=# \timing on
Timing is on.
pipelines=# copy trades ("time", exch, base, "quote", amount, price, side, server_time) from '/data/pipelines/trades-pg.csv' csv header;
That's gonna take a while. Time to step away from the computer.
The Easy Part
To be honest, I was dreading writing SQL after a decent hiatus, but it didn't turn out to be too bad. After fiddling around in psql for a while, I come up with this:
SELECT hr,
       bmex / gdax AS ratio
FROM
  (SELECT hr,
          max(gdax) "gdax", -- max(..) necessary to grab the non-NULL row from two returned
          max(bmex) "bmex"  -- rows for each hr
   FROM
     (SELECT hr,
             (CASE WHEN exch=3 THEN wt_avg END) "gdax",
             (CASE WHEN exch=6 THEN wt_avg END) "bmex"
      FROM
        (SELECT hr,
                exch,
                w_sum / sum_w AS wt_avg
         FROM
           (SELECT date_trunc('hour', "time") AS hr,
                   exch,
                   sum(price * amount) AS w_sum,
                   sum(amount) AS sum_w
            FROM trades
            WHERE base=1 -- btc=1 usd=100
              AND quote=100
              AND (exch=3 OR exch=6) -- gdax=3, bmex=6
            GROUP BY 1, 2
            ORDER BY 1, 2) a) b) c
   GROUP BY hr) d;
It's ugly, but I think it does what I want for the hourly "easy" query.
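In hindsight, the CASE/max() gymnastics could probably be replaced with FILTER aggregates. I never benchmarked this version, so treat it as a sketch rather than what I actually ran:

-- Untested alternative: same hourly bmex/gdax ratio, using FILTER
-- instead of the nested CASE / max() trick.
SELECT date_trunc('hour', "time") AS hr,
       (sum(price * amount) FILTER (WHERE exch=6) / sum(amount) FILTER (WHERE exch=6))
     / (sum(price * amount) FILTER (WHERE exch=3) / sum(amount) FILTER (WHERE exch=3)) AS ratio
FROM trades
WHERE base=1          -- btc=1 usd=100
  AND quote=100
  AND exch IN (3, 6)  -- gdax=3, bmex=6
GROUP BY 1;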
At some point while I was asleep, Postgresql finally finished its COPY, one of many "what is it actually even doing that could take so long?" moments I've had with this database:
COPY 908204336
Time: 30740053.419 ms (08:32:20.053)
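In hindsight, a chunk of those eight and a half hours probably went to maintaining the three secondary indexes during the load. The standard move, which I didn't do, is to build them after the COPY, something like:

-- Hindsight sketch (not what I actually ran): drop the secondary indexes,
-- bulk load, then rebuild them once at the end.
DROP INDEX IF EXISTS trades_time_abcdefg, trades_base_quote_f6b2eeda, trades_exchange_5d5c6971;
COPY trades ("time", exch, base, "quote", amount, price, side, server_time)
    FROM '/data/pipelines/trades-pg.csv' CSV HEADER;
CREATE INDEX trades_time_abcdefg ON trades USING btree ("time");
CREATE INDEX trades_base_quote_f6b2eeda ON trades USING btree (base, quote);
CREATE INDEX trades_exchange_5d5c6971 ON trades USING btree (exch);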
While COPY was taking forever, I figured I would do some Postgresql tuning before running the queries. I'm not going to pretend to know all the ins and outs of postgresql.conf, so I went to a config generator site and got these recommended settings, which I tweaked slightly:
max_connections = 20
shared_buffers = 120GB
effective_cache_size = 180GB
maintenance_work_mem = 2GB
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 500
random_page_cost = 1.1
effective_io_concurrency = 200
work_mem = 96MB
min_wal_size = 4GB
max_wal_size = 16GB
max_worker_processes = 32
max_parallel_workers_per_gather = 32
max_parallel_workers = 32
max_parallel_maintenance_workers = 4
Now that the data is loaded, I restart the server to get all the new settings in place. Then I run vacuum analyze trades for good measure. Finally, it's time to see where things stand:
pipelines=# SELECT hr,
bmex / gdax AS ratio
FROM
(SELECT hr,
max(gdax) "gdax", -- max(..) necessary to grab the non-NULL row from two returned
max(bmex) "bmex" -- rows for each hr
FROM
(SELECT hr,
(CASE WHEN exch=3 THEN wt_avg END) "gdax",
(CASE WHEN exch=6 THEN wt_avg END) "bmex"
FROM
(SELECT hr,
exch,
w_sum / sum_w AS wt_avg
FROM
(SELECT date_trunc('hour', "time") AS hr,
exch,
sum(price * amount) AS w_sum,
sum(amount) AS sum_w
FROM trades
WHERE base=1 -- btc=1 usd=100
AND quote=100
AND (exch=3 OR exch=6) -- gdax=3, bmex=6
GROUP BY 1, 2
ORDER BY 1, 2) a) b) c
GROUP BY hr) d;
Time: 37325.751 ms (00:37.326)
[spits out drink] ... Wait, what!? 37 seconds? That's actually fast!
I run it again:
Time: 20496.159 ms (00:20.496)
Holy moly:
Query                  | Easy
Hardware               | i3.8xlarge
Notes                  | warm cache; fastest observed time
Elapsed time           | 20.5 sec
Throughput (rows/sec)  | 44.3 million
Peak memory usage      | 94.8G
Let's flush the cache and see how she handles that!
sync
sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches'
iostat --human while running the same query cold:
Device tps kB_read/s kB_wrtn/s kB_read kB_wrtn
nvme0n1 5466.40 493.7M 0.0k 2.4G 0.0k
nvme1n1 5924.80 524.9M 0.0k 2.6G 0.0k
nvme2n1 6514.60 594.1M 0.0k 2.9G 0.0k
nvme3n1 6257.00 562.9M 0.0k 2.7G 0.0k
md4 24422.20 2.4G 0.0k 11.8G 0.0k
The result:
Time: 54385.921 ms (00:54.385)
Not bad at all.
Query                  | Easy
Hardware               | i3.8xlarge
Notes                  | cold cache
Elapsed time           | 54 sec
Throughput (rows/sec)  | 16.7 million
Peak memory usage      | 94.8G
The Hard Part
True confession: I came up with the "hard" query after Postgresql did so well on the "easy" one.
Honestly, I was shocked. I have to admit, it is impressive to see time-based aggregation queries on a 900 million row table come back in less than 30 seconds, since Postgresql isn't even designed for this kind of work.
But as I started working on something more difficult, I ran into another problem: one step off the happy path and you fall straight down to Hell. I found it impossible to find any middle ground between "computes in 30 seconds" and "never finishes."
This query looked good on a small subset of the data, but then crashed and burned in epic fashion on the whole table. "Out of memory for query result" -- is that even possible with Postgresql? I could swear I've seen it write intermediate results to disk to git 'er done on like a 99¢ Digital Ocean droplet with 1kB of RAM, for crying out loud.
select "time", (bmex_wsum_5 / bmex_w_5) as bmex_5min, (gdax_wsum_5 / gdax_w_5) as gdax_5min, (bmex_wsum_5 / bmex_w_5) / (gdax_wsum_5 / gdax_w_5 ) as r5, (bmex_wsum_15 / bmex_w_15) / (gdax_wsum_15 / gdax_w_15) as r15, (bmex_wsum_60 / bmex_w_60) / (gdax_wsum_60 / gdax_w_60) as r60 from (select "time", sum(price * amount) filter(where "time" > "time" - interval '5 minutes' and "time" <= "time" and exch=3) over( order by "time" ) "gdax_wsum_5", sum(price * amount) filter(where "time" > "time" - interval '15 minutes' and "time" <= "time" and exch=3) over( order by "time" ) "gdax_wsum_15", sum(price * amount) filter(where "time" > "time" - interval '60 minutes' and "time" <= "time" and exch=3) over( order by "time" ) "gdax_wsum_60", sum(amount) filter(where "time" > "time" - interval '5 minutes' and "time" <= "time" and exch=3) over( order by "time" ) "gdax_w_5", sum(amount) filter(where "time" > "time" - interval '15 minutes' and "time" <= "time" and exch=3) over( order by "time" ) "gdax_w_15", sum(amount) filter(where "time" > "time" - interval '60 minutes' and "time" <= "time" and exch=3) over( order by "time" ) "gdax_w_60", sum(price * amount) filter(where "time" > "time" - interval '5 minutes' and "time" <= "time" and exch=6) over( order by "time" ) "bmex_wsum_5", sum(price * amount) filter(where "time" > "time" - interval '15 minutes' and "time" <= "time" and exch=6) over( order by "time" ) "bmex_wsum_15", sum(price * amount) filter(where "time" > "time" - interval '60 minutes' and "time" <= "time" and exch=6) over( order by "time" ) "bmex_wsum_60", sum(amount) filter(where "time" > "time" - interval '5 minutes' and "time" <= "time" and exch=6) over( order by "time" ) "bmex_w_5", sum(amount) filter(where "time" > "time" - interval '15 minutes' and "time" <= "time" and exch=6) over( order by "time" ) "bmex_w_15", sum(amount) filter(where "time" > "time" - interval '60 minutes' and "time" <= "time" and exch=6) over( order by "time" ) "bmex_w_60" from trades order by "time") a ;
It did not end well:
pipelines-# select
pipelines-# "time",
pipelines-# (bmex_wsum_5 / bmex_w_5) as bmex_5min,
pipelines-#
pipelines-# # ..snip
pipelines-#
pipelines(# from trades
pipelines(# order by "time") a
pipelines-# ;
out of memory for query result
Time: 2372367.480 ms (39:32.367)
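If I'm reading that right, "out of memory for query result" is psql itself failing to buffer the entire result set on the client side, not the server giving up. Streaming through a cursor would presumably have dodged that particular failure; an untested sketch, with just one of the twelve columns:

-- Untested: fetch through a cursor so psql doesn't hold all ~900 million
-- output rows in memory at once.
BEGIN;
DECLARE trades_cur CURSOR FOR
    SELECT "time",
           sum(price * amount) FILTER (WHERE exch=3) OVER (ORDER BY "time") AS gdax_wsum
    FROM trades
    ORDER BY "time";
FETCH FORWARD 100000 FROM trades_cur;
-- ...keep fetching until no rows come back...
CLOSE trades_cur;
COMMIT;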
By constraining the above query to only the 5-minute lookback, I was able to get it to complete in 20 minutes:
Query                  | Hard
Hardware               | i3.8xlarge
Notes                  | Incomplete: 5-minute lookback only
Elapsed time           | 19 min, 21 sec
Throughput (rows/sec)  | 0.8 million
Peak memory usage      | 94.8G
This one couldn't even do the first two rows:
with seconds as (
    select tm::timestamptz
    from generate_series(
        (select date_trunc('second', min("time")) from trades limit 1),
        (select date_trunc('second', max("time")) from trades limit 1),
        interval '1 second'
    ) as tm
)

select
    tm,
    (bmex_wsum_5 / bmex_w_5) / (gdax_wsum_5 / gdax_w_5 ) as r5,
    (bmex_wsum_15 / bmex_w_15) / (gdax_wsum_15 / gdax_w_15) as r15,
    (bmex_wsum_60 / bmex_w_60) / (gdax_wsum_60 / gdax_w_60) as r60
from (
    select
        seconds.tm,
        sum(price * amount) filter(where "time" > seconds.tm - interval '5 minutes'  and "time" <= seconds.tm and exch=3) over(order by "time") "gdax_wsum_5",
        sum(price * amount) filter(where "time" > seconds.tm - interval '15 minutes' and "time" <= seconds.tm and exch=3) over(order by "time") "gdax_wsum_15",
        sum(price * amount) filter(where "time" > seconds.tm - interval '60 minutes' and "time" <= seconds.tm and exch=3) over(order by "time") "gdax_wsum_60",
        sum(amount)         filter(where "time" > seconds.tm - interval '5 minutes'  and "time" <= seconds.tm and exch=3) over(order by "time") "gdax_w_5",
        sum(amount)         filter(where "time" > seconds.tm - interval '15 minutes' and "time" <= seconds.tm and exch=3) over(order by "time") "gdax_w_15",
        sum(amount)         filter(where "time" > seconds.tm - interval '60 minutes' and "time" <= seconds.tm and exch=3) over(order by "time") "gdax_w_60",
        sum(price * amount) filter(where "time" > seconds.tm - interval '5 minutes'  and "time" <= seconds.tm and exch=6) over(order by "time") "bmex_wsum_5",
        sum(price * amount) filter(where "time" > seconds.tm - interval '15 minutes' and "time" <= seconds.tm and exch=6) over(order by "time") "bmex_wsum_15",
        sum(price * amount) filter(where "time" > seconds.tm - interval '60 minutes' and "time" <= seconds.tm and exch=6) over(order by "time") "bmex_wsum_60",
        sum(amount)         filter(where "time" > seconds.tm - interval '5 minutes'  and "time" <= seconds.tm and exch=6) over(order by "time") "bmex_w_5",
        sum(amount)         filter(where "time" > seconds.tm - interval '15 minutes' and "time" <= seconds.tm and exch=6) over(order by "time") "bmex_w_15",
        sum(amount)         filter(where "time" > seconds.tm - interval '60 minutes' and "time" <= seconds.tm and exch=6) over(order by "time") "bmex_w_60"
    from seconds, trades
) a
limit 2;
[buzzer sound]
ERROR: canceling statement due to user request
Time: 7281544.083 ms (02:01:21.544)
And here is a query I let run for many, many hours before pulling the plug:
select
    tm,
    bmex_d15_wt_avg_price / gdax_d15_wt_avg_price as ratio_15,
    bmex_d60_wt_avg_price / gdax_d60_wt_avg_price as ratio_60
from (
    select
        tm,
        sum(gdax_d15.price * gdax_d15.amount) / sum(gdax_d15.amount) as gdax_d15_wt_avg_price,
        sum(bmex_d15.price * bmex_d15.amount) / sum(bmex_d15.amount) as bmex_d15_wt_avg_price,
        sum(gdax_d60.price * gdax_d60.amount) / sum(gdax_d60.amount) as gdax_d60_wt_avg_price,
        sum(bmex_d60.price * bmex_d60.amount) / sum(bmex_d60.amount) as bmex_d60_wt_avg_price
    from (
        select tm::timestamptz
        from generate_series(
            (select date_trunc('second', min("time")) from trades),
            (select date_trunc('second', max("time")) from trades),
            interval '1 second'
        ) as tm
    ) a
    left join trades gdax_d15 on (gdax_d15.time between a.tm - '15 minutes'::interval and a.tm
                                  and gdax_d15.base=1 and gdax_d15.quote=100 and gdax_d15.exch=3)
    left join trades bmex_d15 on (bmex_d15.time between a.tm - '15 minutes'::interval and a.tm
                                  and bmex_d15.base=1 and bmex_d15.quote=100 and bmex_d15.exch=6)
    left join trades gdax_d60 on (gdax_d60.time between a.tm - '60 minutes'::interval and a.tm
                                  and gdax_d60.base=1 and gdax_d60.quote=100 and gdax_d60.exch=3)
    left join trades bmex_d60 on (bmex_d60.time between a.tm - '60 minutes'::interval and a.tm
                                  and bmex_d60.base=1 and bmex_d60.quote=100 and bmex_d60.exch=6)
    group by a.tm
    order by a.tm
) b;
Womp womp:
Cancel request sent
ERROR: canceling statement due to user request
Time: 48721938.810 ms (13:32:01.939)
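For what it's worth, I suspect what I was actually trying to express is closer to a RANGE window frame, which Postgresql has supported since version 11. I never got this variant benchmarked on the full table, so it's only a sketch of the 5-minute lookback:

-- Untested sketch: trailing 5-minute volume-weighted average price per
-- exchange, as a RANGE frame instead of the FILTER contortions above.
SELECT "time", exch,
       sum(price * amount) OVER w / sum(amount) OVER w AS wt_avg_5min
FROM trades
WHERE base=1 AND quote=100 AND exch IN (3, 6)  -- btc/usd on gdax and bmex
WINDOW w AS (PARTITION BY exch
             ORDER BY "time"
             RANGE BETWEEN interval '5 minutes' PRECEDING AND CURRENT ROW);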
Notes and Further Reading
- If you're feeling morbid, you can inspect the numerous attempts I made to get the hard query to work here
- The Postgresql docs for window functions: it would be awesome if there were some examples