A Borrowck Battle to Remember

  • Rust's post-NLL borrow checker generally works pretty well, but there are a few situations that can be excruciating
  • In this case, I was trying to keep n log files open for reading, iterating over their data in order
  • Another hard case is reusing a Vec in the body of a loop

In four years of developing in Rust, I've long since made peace with the borrow checker. I'm even fond of it, especially in the post-Non-Lexical Lifetimes era. Rust's borrow checker has saved me from countless bugs, and I am grateful for that.

But, last week on a personal project, I experienced a wicked, bloody, dastardly battle with "borrowck." "Worst fight with borrow checker in years" is the commit message I wrote shortly afterwards.

There are a few kinds of situations in code that are particularly difficult in Rust's ownership. One I'd previously noticed having trouble with is in reusing a Vec as temporary storage in the body of a loop, something like this:

let mut stream: std::net::TcpStream = // ...
let mut buf: Vec<u8> = Vec::new();
let msgs: Vec<Vec<u8>> = // ...
for msg in msgs {
    let parsed: T = serde_json::from_slice(&msg[..])?;
    buf.clear();
    serialize_in_anothre_format(&parseed, &mut buf)?;
    stream.write_all(&buf[..])?;
}

The point of the above example is, the fact that we .clear() the contents of buf in the body of the loop, prior to reading anything from it, does not seem to matter to the borrow checker even though, we don't need to care about whatever was previously in there, since it will never be read from. I've run into this kind of situation many times before, and usually I just create a new Vec in each loop, because I have other things to do.

In my recent "battle to remember" with borrowck, I was trying to iterate over the data in multiple log files with overlapping time ranges. My goal was to keep a set of files open at any given time, and then pull the next entry from the reader with the earliest message, and replacing exhausted readers with the next file in the set.

The time ranges of the log files might be depicted like this...

A     x------x
B        x-------x
C       x---x
D               x------x

     <---  time  --->

...where the xs mark the beginning and ending of a file's range. What I wanted to do was keep A, B and C open, iteratively pulling the earliest (i.e. lowest) next entry from the currently open files, and replacing exhausted readers with ones for the unopened/unread file with the next earliest start time. E.g. in the example above, if you were keeping three files open at a time, the C reader would be the first to become exhausted, and it would be replaced by a D reader.

The log files were generated by the same program running on multiple nodes, each of which saved a log of the data it received.

Importantly, the size of the data contained in the files is such that it would not be possible to hold even one day's worth of files in memory at the same time. In general, the size created a premium on a solution that could stream incrementally over the files, without bringing very many of them into memory at any one time.

Also, did I mention there is a lot of data to get through? I knew heading into this that performance would be critical. This is the hottest of hot loops, and would be hit billions of times during a run of the program.

The files were in a binary format, and an incremental, zero-copy reader existed, something like...

struct BinaryReader<'a> {
    pos: usize,
    data: &'a [u8],
}

...but a bit more involved. Ultimately, to satisfy the borrow checker in this instance I had to abandon the hope of using the BinaryReader and mimic its operation in the body of my loop, as its reference to data was just not possible for me to overcome.

In general, everywhere I might have stored a byte slice or other reference to my current progress ended up being replaced by integer indices.

Here is a simplified and abbreviated version of what I came up with:


/// a segment of data in the binary file that corresponds to an unparsed `ParsedEntry`
/// and the time it was `rcvd`
struct RawEntry<'a> {
    rcvd: u64,
    data: &'a [u8],
}

/// a parsed log entry
struct ParsedEntry {
    // ...
}

pub fn sorted_log_iter<F>(paths: &[&Path], mut apply_fn: F) -> Result<(), io::Error>
    where F: FnMut(&ParsedEntry) -> ()
{
    let max_open_files = 4;
    let n = max_open_files.min(paths.len());
    let mut stack: Vec<&Path> = paths.iter()
        .rev()      // `.rev()` so that initial stack.pop() is paths[0]
        .collect();

    /// stores usize cursor (vs slice) to avoid borrow checker issues
    struct State {                                                 
        cursor: usize,
        bytes: Vec<u8>,
    }

    // helper to construct a new `State` from a `&Path`
    let new_state = |path: &Path| -> Result<State, io::Error> {     
        let raw_bytes = std::fs::read(path)?;
        let validated_bytes = validate_data(&raw_bytes[..])?;
        Ok(State { cursor: 0, bytes: validated_bytes.to_vec() })
    };

    let mut states = Vec::with_capacity(paths.len());

    // `done` is `[false, false, .., false]` initially, 
    // then when `states[i]` becomes exhausted, `done[i]` = true`
    let mut done = vec![false; paths.len()];                        

    macro_rules! n_done {
        () => { (0..states.len()).filter(|i| done[*i]).count() }
    }

    'a: loop {
        let mut open_states: ArrayVec<usize, MAX_N> =
             (0..states.len())
                .filter(|i| ! done[*i])
                .collect();

        open_states.retain(|i| {
            let exhausted = states[*i].cursor >= states[*i].bytes.len();
            if exhausted {
                done[*i] = true;
                std::mem::swap(&mut states[*i].bytes, &mut Vec::new()); // free memory in bytes
            }
            ! exhausted
        });

        while open_states.len() < n && ! stack.is_empty() {
            let path = stack.pop().unwrap();
            let next = new_state(path)?;
            states.push(next);
            open_states.push(states.len() - 1);
        }

        if open_states.is_empty() { break 'a }

        // setup state to track progress in 'b

        let m                                               = open_states.len();
        let mut cursor_offsets  : ArrayVec<usize, MAX_N>    = (0..m).map(|_| 0).collect();
        let mut ix              : ArrayVec<usize, MAX_N>    = (0..m).collect();
        let mut entries         : Vec<Option<RawEntry<'a>>> = Vec::with_capacity(m);

        for _ in 0..m { entries.push(None); }

        'b: loop { // pull sorted entries until one reader is exhausted,
                   // then break out of 'b to reset at top of 'a

            macro_rules! get_next { // macros used to avoid borrowck issues
                ($i:expr) => {{
                    let k = states[open_states[$i]].cursor + cursor_offsets[$i];

                    if k >= states[open_states[$i]].bytes.len() { // states[open_states[i]] exhausted
                        break 'b
                    }    

                    if entries[$i].is_none() {
                        entries[$i] = Some(parse_raw_entry(&states[open_states[$i]].bytes[k..])?);
                    }
                }}
            }

            for i in 0..m { get_next!(i); }
            
            // sort indices to determine which reader has the next message
            ix.sort_unstable_by_key(|&i| {
                entries[i].as_ref()
                    .map(|raw_entry| raw_entry.rcvd)
                    .unwrap_or(u64::MAX)
            });

            // since there are likely to be stretches where one reader (i.e. ix[0]) is
            // repeatedly the lowest, we keep replacing the entry at entries[ix[0]] and
            // comparing it to entries[ix[1]] until it is no longer lowest, then reset
            // by jumping back to the top of 'b

            macro_rules! still_sorted {
                () => {{
                    ix.len() < 2
                    || {
                        let a = entries[ix[0]].as_ref().map(|x| x.rcvd).unwrap_or(u64::MAX);
                        let b = entries[ix[1]].as_ref().map(|x| x.rcvd).unwrap_or(0);
                        a <= b
                    }
                }}
            }

            'c: while still_sorted!() {
                let j = ix[0];
                match entries[j].take() { // consume next entry
                    Some(raw_entry) => {
                        // advance cursor to reflect consumed entry
                        cursor_offsets[j] += raw_entry.n_bytes;
                        // parse entry
                        let parsed_entry = parse_next_entry(raw_entry)?
                        // and now, finally, call the closure on the data
                        apply_fn(&parsed_entry);
                    }

                    None => break 'c,
                };

                get_next!(j);
            }
        }

        // update states to reflect how far each reader was advanced inside of 'b
        // `cursor_offsets` is the marginal progress inside of 'b that needed to
        // be tracked in a different place than states[open_states[i]] for ownership reasons
        //
        for i in 0..m {
            let j = open_states[i];
            states[j].cursor += cursor_offsets[i];
        }
    }

    assert_eq!(states.len(), n_done!());                        // verify all readers exhausted
    assert!(states.iter().all(|x| x.bytes.capacity() == 0));    // verify all memory was freed

    Ok(())
}

What I tried to do initially was keep a list of n open readers, and when one was exhausted (say at states[0]), swap out the value of states[0] with a new reader (the equivalent of xs[0] = 2 but with a reader. This proved impossible to implement in Rust, the borrow checker just would not go for it.

The solution I came up with is to 1) leave the exhausted reader/State at states[0] where it is, 2) push a new reader/State at the end of states, 3) keep track of which indices into states are the currently "open" (i.e. the list of indices in open_states), and 4) swap out the bytes field of the exhausted reader/State (in e.g. states[0]) to free the memory (an entire decompressed file's worth). The bytes field of State is 99.9% of the used memory, as long as we can free that, the cursor, etc. can just stay in states[0] until we are done.

Another major "innovation" (i.e. means of getting around the borrow checker) I came up with is the careful dance between the outer ('a) and inner ('b) loops.

The top of the outer loop identifies exhausted readers in the currently open slots of states, and performs the necessary bookkeeping to open new readers and mark the old slots as done (i.e. done[i] = true). Then the inner loop ('b) proceeds until one reader becomes exhausted, necessitating a reset at the top of 'a.

You might notice that each iteration of 'a sets up a bunch of state to track progress in 'b, something I couldn't get around.

One particularly jaw-dropping moment I encountered was realizing that replacing a Vec with an ArrayVec would, in some cases (but not others), make it fail to compile.

This line, for example:

let mut entries: Vec<Option<RawEntry<'a>>> = Vec::with_capacity(m);

I tried switching it to...

let mut entries: ArrayVec<Option<RawEntry<'a>>, 8> = ArrayVec::new();

...but it absolutely would not compile. This was only true for some of the collections setup in the body of 'a, not all of them.

I am interested to hear from any others regarding, 1) whether they have found a better way to handle this specific situation in Rust, and 2) if they have other borrowck war stories to share!