Programming my own locking and why I shouldn't have

Part 3: the (broken) solution


Designing the API

Ok, so now that we know how the Linux kernel does it, let's implement a wait queue in RIOT!

A first challenge that we have to overcome is a matter of ergonomics. In the previous episode, when analyzing both RIOT's condition variable and Linux's wait queue, I flattened out the code, i.e. the locking code was in the same scope as the application code. Let's have a look at that again, this time with the application code highlighted:


WAKER:
  cond_data = VALUE  
  wake_up_waiters(wq)

WAITER:
  enqueue(wq)
  while (!(cond_data == VALUE)):
    yield()
    enqueue(wq)
  dequeue(wq)
      

First things first, we want to put the wait queue code neatly behind some API. For the waker part, this is de facto already done. For the waiter, however, the condition value check cond_data == VALUE is interleaved with code that we want to hide from the user. What we want to achieve is something like this:


WAKER:
  cond_data = VALUE  
  wake_up_waiters(wq)

WAITER:
  if (cond_data != VALUE):
    queue_wait(wq, (cond_data == VALUE))
      

This will not work, simply because we're just passing the value of the expression (cond_data == VALUE) at the time of calling queue_wait(). We need to pass the expression itself s.t. it can be evaluated each time the thread is woken up. As C doesn't feature anything like closures, we're left with two options:

  1. Pass the condition check as a function pointer.
  2. Use macro magic to inline queue_wait() in the application code.

Let's get back to C and try to design the API of the first variant:


extern void queue_wait(wait_queue_t *wq, bool (*cond)(void *), void *arg);
wait_queue_t wq;

struct wait_value_t {
    uint64_t *cond_data;
    uint64_t value;
};

bool check_value(void *arg)
{
    struct wait_value_t *wait_value = (struct wait_value_t *)arg;
    return atomic_load_u64(wait_value->cond_data) == wait_value->value;
}

void wait_for_value(uint64_t *cond_data, uint64_t value)
{
    if (atomic_load_u64(cond_data) != value) {
        struct wait_value_t wait_value = { cond_data, value };
        queue_wait(&wq, check_value, &wait_value);
    }
}
      

This is horrible. The praised simplicity of the C language backfires and forces us into boilerplate acrobatics. Looks like we have to go for macros:


#define QUEUE_WAIT(wq, cond_expr) /* omit implementation for now */
wait_queue_t wq;

void wait_for_value(uint64_t *cond_data, uint64_t value)
{
    if (atomic_load_u64(cond_data) != value) {
        QUEUE_WAIT(&wq, atomic_load_u64(cond_data) == value);
    }
}
      

Much, much better. And this is not my idea: the Linux kernel's wait queue API looks exactly like this, and now we know why.
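
For comparison, this is roughly how the same waiter/waker pair looks with the Linux kernel's wait queue API (a sketch from memory, not lifted from the kernel sources; cond_data stands for some shared flag):

static DECLARE_WAIT_QUEUE_HEAD(wq);

/* Waker: update the condition data, then wake the waiters. */
cond_data = VALUE;
wake_up(&wq);

/* Waiter: wait_event() re-evaluates the expression on every wake-up
 * and only returns once it is true. */
wait_event(wq, cond_data == VALUE);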

Implementing the API

Since the wait queue is so similar in scope and semantics to the condition variable, let's start with the condition variable code and build on that. As of release 2024.10, the locking code for the condition variable in core/cond.c looks like this (with the original comments removed and my own added):


void cond_wait(cond_t *cond, mutex_t *mutex)
{
    /* Disable interrupts to ensure exclusive access to scheduler. */
    unsigned irqstate = irq_disable();
    thread_t *me = thread_get_active();

    /* Unlock the mutex associated with the condition variable in preparation
     * for sleep. */
    mutex_unlock(mutex);

    /* Dequeue from the scheduler runqueue. */
    sched_set_status(me, STATUS_COND_BLOCKED);

    /* Enqueue in the condition variable's queue. */
    thread_add_to_list(&cond->queue, me);

    /* Restore previous interrupt enable state */
    irq_restore(irqstate);

    /* Schedule some other thread */
    thread_yield_higher();

    /* Lock back the mutex associated with the condition variable */
    mutex_lock(mutex);
}
      

Ok, so we have all the building blocks required, it's just a matter of re-ordering them. Let's start by defining the wait queue struct. Same as with the condition variable, we only need a queue of threads. Another similarity is the wake-up code, which, as we've seen, isn't any different. So let's just re-use the cond_t struct in our definition:


typedef struct {
    cond_t cond;
} wait_queue_t;
      

Then we get the wake-up code for free:


static inline void queue_wake(wait_queue_t *wq)
{
    cond_signal(&wq->cond);
}

static inline void queue_wake_all(wait_queue_t *wq)
{
    cond_broadcast(&wq->cond);
}
      

All that's left to implement is the waiting macro. For now, we'll assume that the user is well-behaved: QUEUE_WAIT() is not called with interrupts disabled, and the condition expression doesn't call anything that might block. We'll come back to both assumptions later.


#define QUEUE_WAIT(wq, cond_expr) \
  do { \
    thread_t *me = thread_get_active(); \
 \
    irq_disable(); \
    sched_set_status(me, STATUS_COND_BLOCKED); \
    thread_add_to_list(&(wq)->cond.queue, me); \
    /* Interrupts must be enabled because the condition check is user code. */ \
    irq_enable(); \
    while (!(cond_expr)) { \
      thread_yield_higher(); \
      irq_disable(); \
 \
      sched_set_status(me, STATUS_COND_BLOCKED); \
      thread_add_to_list(&(wq)->cond.queue, me); \
      irq_enable(); \
    } \
    irq_disable(); \
    list_remove(&(wq)->cond.queue, &me->rq_entry); \
    sched_set_status(me, STATUS_RUNNING); \
    irq_enable(); \
  } while (0)
      

There's one more optimization we can do. In the wait_for_value() function above, the condition expression is checked before calling queue_wait(). That check isn't needed for correctness, because queue_wait() performs the same check. It is, however, an optimization for the case where the condition is expected to already be true, as it skips unnecessarily queuing the current thread. We can pull that check into the macro as an early exit:


#define QUEUE_WAIT(wq, cond_expr) \
  do { \
    if (cond_expr) { \
        break; \
    } \
 \
    thread_t *me = thread_get_active(); \
 \
    irq_disable(); \
    sched_set_status(me, STATUS_COND_BLOCKED); \
    thread_add_to_list(&(wq)->cond.queue, me); \
    /* Interrupts must be enabled because the condition check is user code. */ \
    irq_enable(); \
    while (!(cond_expr)) { \
      thread_yield_higher(); \
      irq_disable(); \
      sched_set_status(me, STATUS_COND_BLOCKED); \
      thread_add_to_list(&(wq)->cond.queue, me); \
      irq_enable(); \
    } \
    irq_disable(); \
    list_remove(&(wq)->cond.queue, &me->rq_entry); \
    sched_set_status(me, STATUS_RUNNING); \
    irq_enable(); \
  } while (0)
      

The user code then simplifies to:


#define QUEUE_WAIT(wq, cond_expr) /* as implemented above */
wait_queue_t wq;

void wait_for_value(uint64_t *cond_data, uint64_t value)
{
    QUEUE_WAIT(&wq, atomic_load_u64(cond_data) == value);
}
      

This should do it. But man, what a macro. I'm not that versed in the code size costs of inlining, but I do expect that macro to generate a lot more machine code than a simple function call. Let's try to factor out some code, also for better readability.


static inline void _wait_enqueue(wait_queue_t *wq, thread_t *me)
{
    irq_disable();
    sched_set_status(me, STATUS_COND_BLOCKED);
    thread_add_to_list(&wq->cond.queue, me);
    irq_enable();
}

static inline void _wait_dequeue(wait_queue_t *wq, thread_t *me)
{
    irq_disable();
    list_remove(&wq->cond.queue, &me->rq_entry);
    sched_set_status(me, STATUS_RUNNING);
    irq_enable();
}

#define QUEUE_WAIT(wq, cond_expr) \
  do { \
    if (cond_expr) { \
        break; \
    } \
 \
    thread_t *me = thread_get_active(); \
 \
    _wait_enqueue(wq, me); \
    while (!(cond_expr)) { \
      thread_yield_higher(); \
      _wait_enqueue(wq, me); \
    } \
    _wait_dequeue(wq, me); \
  } while (0)
      

I don't know if this leads to a smaller binary footprint, but it's a lot cleaner.

Condition data check atomicity

Until now, we assumed that the condition data check is trivially atomic (up to machine word size) or easily made so (e.g. by using the atomic_*() helpers). But what if the condition expression is something like a == VAL_1 && b == VAL_2? Depending on the context, the two checks might need to be atomic w.r.t. writes to these variables. We could try to pass this responsibility to the user, but then the C language refuses to cooperate once again. For the user to be able to lock, the condition check expression will most likely have to span multiple statements, e.g.:


QUEUE_WAIT(&wq, ({
    lock_something();
    bool ret = a == VAL_1 && b == VAL_2;
    unlock_something();
    ret;
}));
      

While code blocks may return values in languages like Rust, the snippet above is not valid standard C (GCC does support it as a statement expression extension, but we shouldn't assume a specific compiler). So the user will need to wrap this in a function:


bool check_vars(int *a, int *b)
{
    lock_something();
    bool ret = *a == VAL_1 && *b == VAL_2;
    unlock_something();
    return ret;
}

void wait_for_value(int *a, int *b)
{
    QUEUE_WAIT(&wq, check_vars(a, b));
}
      

This is not as bad as the first attempt at designing the API, but still unpleasant. So leaving the locking completely to the user is not a great idea. But there's a middle ground. Recall that a wait queue only makes sense when the waker is in ISR context (or maybe both thread and ISR context); otherwise we can just go for a condition variable. Therefore, the only locking option left for the user is to disable interrupts. With this assumption, we can let the user call QUEUE_WAIT() with interrupts disabled and have the macro manage the interrupt state from there.

Of course, the user must be aware that the macro is going to enable interrupts, and must not rely, for whatever reason, on them staying disabled until it returns. And this is the correct assumption too: QUEUE_WAIT() may put the thread to sleep, i.e. may reschedule. And any code that reschedules must, sooner or later, enable interrupts (alas, RIOT doesn't seem to have a proper model for this, but that's a topic for another time). To support this, we only have to change _wait_enqueue() and _wait_dequeue() to restore the caller's interrupt state, and to enable interrupts before calling thread_yield_higher():


static inline void _wait_enqueue(wait_queue_t *wq, thread_t *me, unsigned irq_state)
{
    irq_disable();
    sched_set_status(me, STATUS_COND_BLOCKED);
    thread_add_to_list(&wq->cond.queue, me);
    irq_restore(irq_state);
}

static inline void _wait_dequeue(wait_queue_t *wq, thread_t *me, unsigned irq_state)
{
    irq_disable();
    list_remove(&wq->cond.queue, &me->rq_entry);
    sched_set_status(me, STATUS_RUNNING);
    irq_restore(irq_state);
}

#define QUEUE_WAIT(wq, cond_expr) \
  do { \
    if (cond_expr) { \
        break; \
    } \
 \
    thread_t *me = thread_get_active(); \
    unsigned irq_state = irq_disable(); \
 \
    _wait_enqueue(wq, me, irq_state); \
    while (!(cond_expr)) { \
      irq_enable(); \
      thread_yield_higher(); \
      _wait_enqueue(wq, me, irq_state); \
    } \
    _wait_dequeue(wq, me, irq_state); \
  } while (0)
      

User code can now do:


void wait_for_value(int *a, int *b)
{
    irq_disable();
    QUEUE_WAIT(&wq, *a == VAL_1 && *b == VAL_2);
    irq_enable();
}
      

Pretty. But we're not done.

Sleepwalking

There's still one more edge case that we haven't accounted for: the condition expression itself might call into something that puts the thread to sleep. And that's big trouble, because at that moment, as seen by the rest of the system, we are actually already sleeping. The following chain of events is possible:

  1. QUEUE_WAIT() dequeues the current thread from the runqueue and enqueues it in the wait queue. The condition expression is executed and blocks on a mutex, putting the thread to sleep.
  2. Some other thread decides the condition to wake up the waiter thread is met and calls queue_wake().
  3. The waiter thread wakes up and enters the critical section guarded by the mutex, although the mutex is actually still locked.
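
To make this concrete, here's a hypothetical condition check that triggers exactly this chain of events (state_lock, shared_state and state_is_ready() are made-up names):

static mutex_t state_lock;
static int shared_state;

static bool state_is_ready(void)
{
    /* This may block and put the thread to sleep, even though
     * QUEUE_WAIT() has already marked the thread as blocked and
     * enqueued it in the wait queue. */
    mutex_lock(&state_lock);
    bool ret = (shared_state == 1);
    mutex_unlock(&state_lock);
    return ret;
}

void wait_for_ready(void)
{
    QUEUE_WAIT(&wq, state_is_ready());
}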

So how does the Linux kernel handle this problem? Well, it doesn't have it in the first place. From what I gather from the mutex implementation (which is arcane, and of which I understand about 10%), it just assumes the thread may wake up at any time, and I extrapolate that this holds for the other locking primitives too. mutex_lock() does something like this (extremely oversimplified):


mutex_lock(mutex):
    enqueue(mutex)
    while (!mutex_trylock(mutex)):
        yield()
        enqueue(mutex)
    dequeue(mutex)
      

This is strikingly similar to the condition variable locking code: it keeps trying until the condition check is true, the condition here being that we have actually obtained the lock. In contrast, RIOT's mutex assumes that, as soon as we wake up from sleep, it is because the mutex was unlocked.
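
In the same pseudocode style, RIOT's mutex_lock() behaves more like this (again heavily simplified, as I read core/mutex.c):

mutex_lock(mutex):
    if (!mutex_trylock(mutex)):
        enqueue(mutex)
        yield()
        # woken up => the unlocker handed the mutex to us, no re-check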

And there's another, closely related bug: if the thread is preempted during the condition check (because a higher-priority thread becomes runnable), it will not get back on the runqueue unless the wait queue gets signaled once again. From the scheduler's point of view, the thread is still STATUS_COND_BLOCKED while it executes the condition expression, so nothing will make it runnable again after the preemption.

As it is neither my intention, nor within my competence, to rethink RIOT's concurrency model, the fix will have to be contained within the wait queue code. One thing is clear though: we can't reuse the condition variable code anymore. We need to re-implement the wait queue from scratch.