Monday, October 31, 2011

Multithread Barrier (yes, it's a join!)

This post doesn't want to clearify all aspects of parallel programming. It's just an example to show how you have to thinkg when writing parallel code!

When you deal with concurrent programming, everything is going to be screwed up very easily. Debugging is a pain in the ass, and every one who has ever written a multithreaded software, although very simple as a concurrent server, has experimented with such a pain.

One fundamental operation in multithreaded programs is synchronization. In particular, one operation which I consider very important is a thread barrier, that is a line of code in the program which guarantees that if a thread is executing before it, no one else is executing after it, and if there is some thread after it, then every other thread is, at least, executing that line, but not anything before...





Yes, of course, it's a join. But i find "barrier" to be a better name for this kind of stuff.

But how can you implement such a behaviour? The answer might be sempahores or pthread_join, but they are something I don't like at all.
If you wonder why, the quick answer is that I work on High Performance Computing systems, and they involve calls to the kernel or third party libraries, which involves costs (for mode switch or long jumps) which I don't want to pay, never.

Recently, working on a multithreaded simulation kernel, I had the need to develop a thread barrier to serialize a piece of code which was mangling shared information. I thought that my solution was nice and useful, so I wanted to share it with the world. But instead of simply giving you the code (which you can nevertheless find at the end of this post), I want to go through the design process of this simple unit of code, a thing which might be useful to understand its behaviour.

So, what you have to do, is to syncrhonize threads. Suppose that I have N threads, what I have to do is have them tell every thread that they are in the barrier, and wait each other. This can done is a way similar to:

int i = N;

void thread_barrier() {
 i--;
 while(i > 0);
}

Of course this sucks! The issues involved in this are a lot. The first one you have to address is that i-- is a non-atomic statement, so that the compiler will translate it to: "load the value of i in a register, decrement it by one, store it back into i". So, if the interleaving in the execution pattern is:

  • i = N;
  • Thread 1: load i (read N);
  • Thread 2: load i (read N);
  • Thread 1: decrement i by one (have N-1);
  • Thread 2: decrement i by one (have N-1);
  • Thread 1: store back into i (i = N-1)
  • Thread 2: store back into i (i = N-1)
The result is that i now has a value of N-1 instead of N-2. In addition, the read is neither atomic, so an adversary execution pattern might be:
  • i = N;
  • Thread 1: load i (read N);
  • Thread 2: load i (read N);
  • Thread 1: decrement i by one (have N-1);
  • Thread 1: store back into i (i = N-1)
  • Thread 2: decrement i by one (have N-1);
  • Thread 2: store back into i (i = N-1)
In this case, i has the same incorrect number (this is something which in DBMS theory is called Ghost Update).

This can be overcome in a rather easy way. First of all, let's work on the definition of  the value storing the number of threads. The first change involves telling the compiler that we do not want and old value to be reused: we want to force the read from memory (that's our Bible: only in there we find the truth!) every time we need its value:

volatile int i = N;

In addition, we will have the need to use addresses of that counter. So we don't want the compiler to give us the address of some local copy which will be eventually stored back in the real variable, no: we just want the counter's variable, nothing else. This is forced using a struct, which we will use to define an "atomic" type:

typedef struct {volatile int counter; } atomic_t;

Of course, we need a way to "set" the value of the counter and to read the value of the counter. This can be easily done with:

#define atomic_read(v)          ((v)->count)
#define atomic_set(v,i)         (((v)->count) = (i))

These defines work since the counter is volatile (1) and, on x86, a write is intrinsically atomic (2), that is, it is one single instruction.

Now, we must address the issue that decrementing a variable is a non-atomic operation. To do so, we use the following (more tricky) code:

#define LOCK "lock ; "
static __inline__ void atomic_dec(atomic_t *v) {
        __asm__ __volatile__(
                LOCK "decl %0"
                :"=m" (v->count)
                :"m" (v->count));
}

This code is static, that is, it can be invoked just from the object including the header where it is defined. It's inline, which tells gcc that we prefer not to call the function, but to embed its code in the caller. Of course, this function takes one atomic_t's address.

Inside the function we have some inline assembly. In particular, what the assembly does is to execute the decl instruction, which is a single instruction which takes a variable in memory, decrements it, and stores it back into memory. In conjunction with the lock prefix, it locks the memory bus, so that the three aforementioned steps will be executed with no interleaving on memory (in architectures where there are more processors sharing the same RAM). All the rest of the code is simply used to tell gcc that %0 refers to v->count, that is v->count is both the input and the output of the instruction.

Using this nice function, we can rewrite out barrier as:

atomic_t i;

void barrier_init(int N) {
 atomic_set(&i, N);
}

void thread_barrier() {
 atomic_dec(&i);
 while(atomic_read(&i) > 0);
}
As you may notice, you now have to call the barrier_init() function before relying on the barrier itself. But again, this is somewhat limited: I can use that function just to synchronize on one barrier. Nope, I want this to be far way more general, so we can define another structure to handle multiple barriers at a time, and modifying the functions accordingly:

typedef struct { atomic_t count; } barrier_t;

void barrier_init(barrier_t *b, int N) {
 atomic_set(&b->count, N);
}

void thread_barrier(barrier_t *b) {
 atomic_dec(&b->count);
 while(atomic_read(&b->count) > 0);
}

So now you can define multiple barriers in different parts of the code, and synchronize on them always using the same function. So far, so good.

But there is one thing which in multithreaded applications must be always taken into account: loops (or similar constructs). If all the threads have to synch on a barrier, they will likely pass in that part of code a second time. Now, if a thread reaches the barrier a second time, the counter will be already <= 0, so it won't stop on the cycle and the barrier will not work. What we have in our hand is a disposable thread barrier. We have to find a way to restore the barrier to its initial value. So, first the easy and simple part: let's keep track of the initial value of the counter in the barrier applying these changes:

typedef struct {
 int num_threads;
 atomic_t count;
} barrier_t;

void barrier_init(barrier_t *b, int N) {
 b->num_threads = N;
 atomic_set(&b->count, N);
}

This was not tricky. The problem is that we have to find a way to restore the initial value. If we think of the following as a solution we are wrong:

void thread_barrier(barrier_t *b) {
 atomic_dec(&b->count);
 while(atomic_read(&b->count) > 0);
 atomic_set(&b->count, b->num_threads);
}

In fact, a possible execution pattern is:
  • Thread 1: decrement the counter;
  • Thread 2: decrement the counter;
  • Thread 1: load the counter into a register;
  • Thread 1: check if the counter is greater than zero: no, leave the loop;
  • Thread 1: store back the initial value in the counter;
  • Thread 2: load the counter into a register;
  • Thread 2: check if the counter is greater than zero: yes, stay in the loop;
This execution does not violate the "volatile" meaning, as thread 2 reads the most recent counter value from memory, but makes the barrier ineffective. In fact, Thread 2 will be stuck in the barrier until the first thread will not reach again the barrier because of the loop. But then, the value will be set to the initial one again by T2, which will eventually reach again the barrier, and both T1 and T2 will be stuck in deadlock... Far way more critical then expected, huh?!

To make this post short and avoid a lot of thinking, there is no way only with counters to solve this problem, and any attempt you will make to avoid this will result in just an equivalent thing, always ending up in deadlock. There is a proof for this.

The only way to do this is using Consensus. Consensus is a property of distributed systems, which ensures that all the nodes agree on something (here, on the fact that the counter must be restored). Consensus is not easy to implement, but fortunately the theory tells us that Consensus is formally equivalent to Leader Election (in fact: all the nodes agree on who the leader is!).

If we think of Leader Election in a less formal way which is more related to our case, we can conclude that it is equivalent to say: "I don't care who, but only one, must execute the reset of the counter". If we now think of it as smart computer engineers and not theoretical ones, that's just an atomic test and set!!
That is, one atomic operation that can read the value of a flag, if it is zero it flags it to one, and returns true, otherwise it returns false. Let's implement this and see what we can do with it! Again, some assembly:

static __inline__ int atomic_test_and_set(int *b) {
    int result = 0;

    __asm__  __volatile__ (
                LOCK "bts $0, %1;\n\t"
                "adc %0, %0"
                :"=r" (result)
                :"m" (*b), "0" (result)
                :"memory");

        return !result;
}

We rely on two instructions here: bts (bit test and set) and adc (add with carry). The first copies the bit $0 (which is the zero constant in inline assembly) from the memory address passed to the carry bit in the flags register. The second one, simply peforms a sum of two numbers, adding to them the carry flag (this instruction is used to perform sums with more bits than the normal ones, as you can sum the lower parts of the numbers and the sum the carry to the higher ones).

So, if our flag is 0, bts moves 0 to CF and sets it to 1. Then adc performs result = result + result + CF(=1). Given that at the beginning we declared result = 0, we still have result == 0. We invert it and return it: we have true as return value, when we found the flag unset, after having set it. Given that the bts instruction has the lock prefix, and given that the flags register is per-thread, any interleave will produce the same result here.

On the other hand, if our flag is 1, bts moves 0 to CF and does nothing more. Then adc performs result = 0 + 0 + CF(=1) and so we have result == 1. Therefore, the function returns false having found the bit already set. That's it, our atomic test and set!

Let's see how we can use it:

typedef struct {
        int num_threads;
        atomic_t count;
        int barr;
} barrier_t;

void thread_barrier(barrier_t *b) {
 atomic_dec(&b->count);
 while(atomic_read(&b->count) > 0);

 if(atomic_test_and_set(&barr)) {
  atomic_set(&b->count, b->num_threads);
 }
}

Guess what?! It doesn't work... In fact, we still have a pattern like the previous one: T1 reaches the while loop, load the counter and then goes to lunch. T2 gets elected as leader, restores the counter and T1 is doomed...

So how can we solve this? Well, the leader has to wait for everybody to be synchronized, and to do so we add a second counter:

typedef struct {
        int num_threads;
        atomic_t c1;
        atomic_t c2;
        int barr;
} barrier_t;

void thread_barrier(barrier_t *b) {

        atomic_dec(&b->c1);
        while(atomic_read(&b->c1));

        if(atomic_test_and_set(&b->barr)) {

                atomic_dec(&b->c2);

                while(atomic_read(&b->c2));

         atomic_set(&b->c1, b->num_threads);
         atomic_set(&b->c2, b->num_threads);
        }

 atomic_dec(&b->c2);
}
So the leader now waits for all the other threads to leave the first while, and then resets the value. Are we done? Of course no, the leader has to abdicate before leaving the function, as otherwise no one else will be able to elect itself as leader in future executions of this piece of code. As the leader code is intrinsically sinchronized (remember? Just one leader per time!), it's as easy as:

  b->barr = 0;

But again, the danger is behind you, and another adverse parallel execution is lying in ambush. Going back to our friends, T1 and T2, we have:
  • T1 synchronizes on the first while loop;
  • T2 synchronizes on the first while loop;
  • T1 becomes the leader;
  • T1 goes to lunch;
  • T2 cannot become the leader and returns;
  • T2 performs a lot of smart operations while T1 is having lunch, and reaches the barrier again;
  • T2 tries to synchronize, guess what, the barrier is not greater than zero, it thinks it's synch'ed and goes on
  • T1 resets the barrier
Although no deadlock might come in this situation, we can face an incorrect behaviour, as we are in the following situation:

(code) ----> BARRIER 1 ----> BARRIER 2 ---->

and a thread can overpass barrier 2 while another one (the leader!) is still in barrier 1. If the barriers were meant to synchronize all the threads,  the programmer would think that there are all the threads between two barriers, and this is not the case. If you think a barrier in a loop, you can have some threads performing any number of iterations while the leader is still resetting the barrier. This is not good.

So we have to enhance the barrier a bit more to tell the other threads: "hey wait! The leader is still resetting the barrier from the previous execution!". This can be done like this:

void thread_barrier(barrier_t *b) {
        while(b->barr);

        atomic_dec(&b->c1);
        while(atomic_read(&b->c1));

        if(atomic_test_and_set(&b->barr)) {

                atomic_dec(&b->c2);

                while(atomic_read(&b->c2));

  atomic_set(&b->c1, b->num_threads);
  atomic_set(&b->c2, b->num_threads);

                b->barr = 0;
        }

        atomic_dec(&b->c2);
}
The cycle at the beginning prevents this behaviour. The only thing you have to do is to declare the barrier as volatile, to ensure it's read atomically. So, the full code is here:

struct { volatile int count; } atomic_t;

typedef struct {
        int num_threads;
        atomic_t c1;
        atomic_t c2;
        atomic_t barr;
} barrier_t;

#define atomic_read(v)   ((v)->count)
#define atomic_set(v,i)  (((v)->count) = (i))

#define atomic_reset(b) do {
                                                  (atomic_set((&b->c1), (b)->num_threads)); \
                                                  (atomic_set((&b->c2), (b)->num_threads)); \
                                                  (atomic_set((&b->barr), -1)); \
                                             } while (0)


static __inline__ int atomic_test_and_set(int *b) {
        int result = 0;

        __asm__  __volatile__ (
                                LOCK "bts $0, %1;\n\t"
                                "adc %0, %0"
                                :"=r" (result)
                                :"m" (*b), "0" (result)
                                :"memory");

        return !result;
}

static __inline__ void barrier_init(barrier_t *b, int t) {
        b->num_threads = t;
        atomic_reset(b);
}

static __inline__ int thread_barrier(barrier_t *b) {

        // Wait for the leader to finish resetting the barrier
        while(atomic_read(&b->barr));

        // Wait for all threads to synchronize
        atomic_dec(&b->c1);
        while(atomic_read(&b->c1));

        // Leader election
        if(atomic_test_and_set(&b->barr)) {

                // I'm sync'ed!
                atomic_dec(&b->c2);

                // Wait all the other threads to leave the first part of the barrier
                while(atomic_read(&b->c2));

                // Reset the barrier to its initial values
                atomic_reset(b);

                return 1;
        }

        // I'm sync'ed!
        atomic_dec(&b->c2);

        return 0;
}

where I have added a new small portion of code: instead of simply returning, the thread which was the leader now returns 1, every other thread returns 0. This is because it could be useful, after having synchronized all the threads, to have only one thread performing some operations. If you don't need this, you can simply ignore the return value, otherwise you get it for free!

As a firend of mine once said, you can never tell whether a parallel code is correct or not. I strongly agree with him (with some exceptions). Nevertheless I've used this barrier in a kernel the operations of which are extremely complex, and if the task which led me to developing this routine were not synchronized, the outcome would have been incorrect, which was not. So I'm highly confident you can take this  code and use it for your own needs. Just remember to set the initial value of threads which are running on the barrier before actually having them reach it!

1 comment: