Deferred Reclamation
A piece on reclaiming shared allocated resources in Rust.
Today I'd like to write about an issue I encountered at work not so long ago: what do you do with allocated data you can't free?
The Problem
A quick rundown of the problem first: say you're writing an audio app, and you've got two threads. One fetches the data, which we'll call the main thread, and one feeds the data to your I/O, which we'll call the audio thread:
// My Audio App
struct Data {
    inner_data: Vec<f32>,
}

impl Data { /* ... */ }

fn main() {
    // Audio Thread
    std::thread::spawn(move || {
        loop {
            // Decode data and feed IO here...
        }
    });

    // Main Thread
    // Send data...
}
Now we need the two threads to talk to each other, and for this we'll need a bit of scaffolding code. The de facto standard for communicating between threads is a channel, and for our purposes we'll use an SPSC, single-producer (main thread), single-consumer (audio thread) lock-free channel. ringbuf promises just that, so let's set it all up:
use ringbuf::traits::{Consumer, Producer, Split};

// ...

fn main() {
    let (mut tx, mut rx) = ringbuf::HeapRb::<Data>::new(2).split();

    // Audio Thread
    std::thread::spawn(move || {
        loop {
            // Decode data and feed IO here...
            while let Some(data) = rx.try_pop() {
                // Feed data to IO...
            }
        }
    });

    // Main Thread
    // Send data...
    let _ = tx.try_push(Data::new());
}
Our threads can now communicate without locking, but this design has some issues. First off, backpressure may build up on the main thread, and calling try_push would then fail. As it stands this is more of an inconvenience: we can bump up the channel's capacity, or write the amenities to handle the failure case on main and retry once the audio thread catches up. But it points to a core limitation of channels in the context of real-time programming, lock-free or not; I'll come back to this later.
There's another problem, can you spot it?
To fully illustrate this, let's look at the constraints under which our audio program is running. Everything happening on the audio thread has to be 100% lock-free in order to comply with the I/O's tight delivery schedule. You cannot wait for some process to resolve: that rules out mutexes, allocation, any kind of syscall, and the list goes on. It's safe to say there are a million ways our code, or someone else's, can stall on a blocking call and thus fail to finish within schedule, so it's not reasonable to try to cover every pitfall ourselves. Instead, we'll make use of RTSan, a real-time LLVM sanitizer that detects real-time violations for us. There have been a couple of attempts at adding this sanitizer to Rust, see here and here, but nothing has landed in stable as of the time of writing. Thankfully, there's a standalone version we can use instead, named rtsan_standalone. Mind you, it only works on Linux, macOS, and iOS.
For those still figuring it out, let's run our program with RTSan enabled by adding the #[nonblocking] attribute above our real-time functions (the audio thread) and running it with RTSAN_ENABLE=1 cargo run:
==59289==ERROR: RealtimeSanitizer: unsafe-library-call
Intercepted call to real-time unsafe function `free` in real-time context!
#0 0x0001051c5b58 in free+0x84 (libclang_rt.rtsan_osx_dynamic.dylib:arm64+0x5b58)
#1 0x000104f66390 in _RNvXs1_NtCsc55tWW7md0m_5alloc7raw_vecINtB5_6RawVecfENtNtNtCsiDBv732ZEpx_4core3ops4drop4Drop4dropCs1fiPlq23UsI_20deferred_reclamation mod.rs:404
...
#17 0x00018e220ba4 in thread_start+0x4 (libsystem_pthread.dylib:arm64+0x1ba4)
SUMMARY: RealtimeSanitizer: unsafe-library-call mod.rs:404 in _RNvXs1_...
fish: Job 1, 'RTSAN_ENABLE=1 cargo run' terminated by signal SIGABRT (Abort)
// Audio Thread
std::thread::spawn(move || {
    loop {
        while let Some(data) = rx.try_pop() {
            // ...
        } // <-- Oopsie Daisy
    }
});
Data gets dropped, the deallocator gets called, and in turn free gets called, which may or may not make a syscall. RTSan spotted the unsafe call and aborted.
This also underlines how valuable a sanitizer is: Rust's implicit drop semantics make those calls hard to spot at a glance.
Now, to address this issue, we simply send Data back. It's pretty straightforward: we instantiate a second channel, and once we're done processing Data on the audio thread, we send it back to main to be freed.
This is, in essence, deferred reclamation:
use ringbuf::traits::{Consumer, Producer, Split};

// ...

fn main() {
    let (mut main_tx, mut audio_rx) = ringbuf::HeapRb::<Data>::new(2).split();
    let (mut audio_tx, mut main_rx) = ringbuf::HeapRb::<Data>::new(2).split();

    // Audio Thread
    std::thread::spawn(move || {
        loop {
            // Decode data and feed IO here...
            while let Some(data) = audio_rx.try_pop() {
                // Feed data to IO...
                // ...
                // Defer drop to main
                let _ = audio_tx.try_push(data);
            }
        }
    });

    // Main Thread
    // Send data...
    let _ = main_tx.try_push(Data::new());
    // Drop data
    let _ = main_rx.try_pop();
}
Now, coming back to our earlier point: remember how the channel is bounded and try_push fallible? That minor inconvenience just turned into a real problem, because we have no way to account for it while staying real-time compliant. If try_push were to fail, our only option would be to silently drop Data on the audio thread and hope the I/O doesn't notice.
Linked-List
To solve this, we need something unbounded that performs just as well and doesn't interact with the OS scheduler: a wait-free SPSC linked-list queue of sorts. There happens to be one, named llq, that addresses exactly this issue. The idea is that you can add an unbounded number of nodes (pointers) to the queue, making push infallible at the mere cost of a couple of atomic operations. Let's see it in action:
use llq::Node;

fn main() {
    let (mut main_tx, mut audio_rx) = llq::Queue::<Data>::new().split();
    let (mut audio_tx, mut main_rx) = llq::Queue::<Data>::new().split();

    // Audio Thread
    std::thread::spawn(move || {
        loop {
            // Decode data and feed IO here...
            while let Some(node) = audio_rx.pop() {
                // Feed data to IO...
                // ...
                // Defer drop to main
                audio_tx.push(node);
            }
        }
    });

    // Main Thread
    // Send data...
    main_tx.push(Node::new(Data::new()));
    // Drop data
    let _ = main_rx.pop();
}
And that's it. The pattern extends naturally: you can wrap this in a Drop impl for better ergonomics, but the core idea stays the same. Until next time, take care.