What I Like: Raw Event Loops

  • Event loops, what are they?
  • Juggling partially completed tasks
  • Get off my lawn, async/await!

"Event loop" is a term used far more frequently than it is defined. When I say event loop, I mean a loop that runs "forever" waiting for new data, and acting on that data as it arrives.

I started coding event loops in 2017. At the time, it was in Python, with zmq pub/sub sockets to connect several processes forked on startup:

import zmq

ctx = zmq.Context()
socket = ctx.socket(zmq.SUB)
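socket.connect('tcp://127.0.0.1:9999')
socket.setsockopt(zmq.SUBSCRIBE, b'')  # a SUB socket receives nothing until it subscribes; b'' matches every topic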

while True:
    try:
        msg = socket.recv(flags=zmq.NOBLOCK)

        # handle msg...

    except zmq.error.Again:
        pass

What people don't like about this style of coding, I think, is the need to track the state of partially completed tasks across iterations of the loop. E.g. for an HTTP server, imagine that only some of the bytes from a given request have arrived over the socket:

let n_bytes_read = socket.read(&mut buf[..])?; // -> b"GET /partially-" is what ends up in `buf`

To be able to continue to respond to all the other things that are going on in the event loop, partially-completed tasks must be stowed away, along with the state of their completion, in order to be able to resume work on the task again when new data arrives:

// storing the partial request - pseudocode. assumes something like
//
// #[derive(Default)]
// struct Pending {
//     n_bytes_read: usize,
//     data: Vec<u8>, // a `Vec`, so that `or_default()` and `extend_from_slice` work
// }
//
// let mut pending_requests: std::collections::HashMap<usize, Pending>;
//
let pending = pending_requests.entry(request_key).or_default();
pending.data.extend_from_slice(&buf[..n_bytes_read]);
pending.n_bytes_read += n_bytes_read; // 15 bytes so far
if is_finished_request(&pending.data[..pending.n_bytes_read]) { // false
    // ...
}

Later, when the rest of the request arrives, pick up right where you left off:

// new data!
let n_bytes_read = socket.read(&mut buf[..])?;
let pending = pending_requests.entry(request_key).or_default();
pending.data.extend_from_slice(&buf[..n_bytes_read]); // append after the bytes stored earlier
pending.n_bytes_read += n_bytes_read;
if is_finished_request(&pending.data[..pending.n_bytes_read]) { // *now* it's true
    // process finished request and clean up
}
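
For anyone who wants to run the idea, here is a minimal, self-contained sketch of the same bookkeeping. It is an illustration under stated assumptions, not a real HTTP parser: the `on_bytes` helper is a hypothetical name, `Pending` holds a `Vec<u8>`, and `is_finished_request` simply looks for the blank line that ends an HTTP header block.

```rust
use std::collections::HashMap;

/// Bytes received so far for one in-flight request (hypothetical type).
#[derive(Default)]
struct Pending {
    data: Vec<u8>,
}

/// Assumption: a request counts as finished once the blank line that
/// terminates an HTTP header block has arrived. Request bodies are ignored.
fn is_finished_request(data: &[u8]) -> bool {
    data.windows(4).any(|w| w == b"\r\n\r\n")
}

/// Append newly arrived bytes to the pending request stored under
/// `request_key`. Returns the complete request bytes once finished,
/// removing the entry from the map; returns `None` while still partial.
fn on_bytes(
    pending_requests: &mut HashMap<usize, Pending>,
    request_key: usize,
    chunk: &[u8],
) -> Option<Vec<u8>> {
    let pending = pending_requests.entry(request_key).or_default();
    pending.data.extend_from_slice(chunk);
    if is_finished_request(&pending.data) {
        // Clean up: the finished request no longer needs to be tracked.
        pending_requests.remove(&request_key).map(|p| p.data)
    } else {
        None
    }
}
```

Calling `on_bytes` with `b"GET /partially-"` leaves an entry in the map and returns `None`; a later call with the rest of the header for the same key returns the whole request and removes the entry.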

At some point, programming in this style was difficult and confusing for me. But I've been doing it so long now that I'm very comfortable with it. I think in event loops.

I've had lots of success, in terms of correctness, rapid development, good performance, and robustness (i.e. extremely rare crashes) using this "raw event loop" style in Rust over the past few years.

Here's the basic template I start with:

use std::time::*;
use std::thread::{self, JoinHandle};
use std::io;
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};

struct Last {
    periodic_thing_a: Instant,
    periodic_thing_b: Instant,

    // ... many more of these sometimes
}

fn event_loop(
    // initial data,
    // config,
    // rx,
    // etc
    term: Arc<AtomicBool>, // for sending exit signal
    root: &slog::Logger,
) -> Result<JoinHandle<()>, io::Error> {

    let logger = root.new(o!("thread" => "thread name"));

    thread::Builder::new().name("thread name".to_string()).spawn(move || {
        // initial setup

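        // `Instant` has no `Default` impl, so initialize each field explicitly
        let mut last = Last {
            periodic_thing_a: Instant::now(),
            periodic_thing_b: Instant::now(),
        };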
        let mut loop_time: Instant;

        loop {
            loop_time = Instant::now();

            match rx.try_recv() { // or `poll.poll(&mut events, None)`, `socket.recv()`, etc.
                // handle incoming data...
            }

            // perform periodic tasks periodically...

            // I studiously avoid "naked" subtractions between `Instant`s,
            // i.e. `loop_time - last.periodic_thing_a`, as I've had more
            // crashes from that than anything else in Rust
            if loop_time.saturating_duration_since(last.periodic_thing_a) >= Duration::from_secs(1) { // interval is illustrative
                // < do periodic thing a > ...

                last.periodic_thing_a = loop_time;
            }
            
            if term.load(Ordering::Relaxed) { break } // main thread or `Drop` impl can `term.store(true, Ordering::Relaxed)`
                                                      // when it's time to exit
        }
    })
}
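
The template leaves `rx`, the logger, and the periodic work as placeholders, so it does not compile as shown. Below is a stripped-down sketch that does, under a few assumptions of mine: a `std::sync::mpsc` channel of `u64` stands in for `rx`, `eprintln!` stands in for slog, and `spawn_event_loop`, `is_due`, and the 100 ms heartbeat are hypothetical names and values. The 1 ms sleep on an empty channel is there only to keep the sketch from spinning a core; a latency-sensitive loop may well prefer to keep polling.

```rust
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, TryRecvError};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

/// True when at least `interval` has elapsed between `last` and `now`.
/// Saturates to zero if `last` is later than `now`.
fn is_due(now: Instant, last: Instant, interval: Duration) -> bool {
    now.saturating_duration_since(last) >= interval
}

/// Spawn a loop that sums incoming numbers until `term` is set or every
/// sender has hung up. The total comes back through the join handle.
fn spawn_event_loop(
    rx: Receiver<u64>,
    term: Arc<AtomicBool>,
) -> io::Result<JoinHandle<u64>> {
    thread::Builder::new().name("event-loop".to_string()).spawn(move || {
        let heartbeat_every = Duration::from_millis(100); // hypothetical interval
        let mut last_heartbeat = Instant::now();
        let mut total = 0u64;

        loop {
            let loop_time = Instant::now();

            // Handle incoming data without blocking.
            match rx.try_recv() {
                Ok(n) => total += n,
                Err(TryRecvError::Empty) => thread::sleep(Duration::from_millis(1)),
                Err(TryRecvError::Disconnected) => break,
            }

            // Perform the periodic task when its interval has elapsed.
            if is_due(loop_time, last_heartbeat, heartbeat_every) {
                eprintln!("heartbeat: total = {}", total);
                last_heartbeat = loop_time;
            }

            // Exit when another thread has requested termination.
            if term.load(Ordering::Relaxed) {
                break;
            }
        }

        total
    })
}
```

To use it, create a channel with `std::sync::mpsc::channel()`, pass the receiver and a shared `Arc<AtomicBool>` to `spawn_event_loop`, send values from the main thread, then either drop the sender or store `true` in the flag and `join()` the handle. Note that setting the flag can end the loop before queued messages are drained; dropping the sender lets it finish the backlog first.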

One key aspect of all this, which might explain why I gravitated towards this design, is the nature of what I've been working on. I've spent the last few years building trading systems in Rust. The programs I work on receive market data from multiple sources, over both TCP (via HTTP polling, websocket subscriptions, and other protocols) and UDP, and send network data to multiple destinations, including exchanges (FIX) and multiple databases, while writing to several kinds of log and telemetry files. Meanwhile, performance, especially latency, is very important, and I'm generally maxing out the CPU capacity and always looking for more resources.

Basically, there's a lot going on! One thing Rust gives me is total control, and I love that.

So when I see the tokio attribute macro on fn main...

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // ...
}

... that has been just a complete non-starter for me -- you are not getting my !#$ing main function!

More broadly, handing your program's control flow over to an external runtime means giving up direct control over some of its most important parts, from a performance perspective and otherwise. That has just not appealed to me.

Next: why I decided it's finally time to give async/await a shot...