Consensus Protocols: A Paxos Implementation

It’s one thing to wax lyrical about an algorithm or protocol having simply read the paper it appeared in. It’s another to have actually taken the time to build an implementation. There’s many a slip ’twixt cup and lip, and the little details that you’ve abstracted away at the point of reading come back to bite you hard at the point of writing.

I’m a big fan of building things to understand them – this blog is essentially an expression of that idea, as the act of constructing an explanation of something helps me understand it better. Still, I felt that in order to be properly useful, this blog probably needed more code.

So when, yesterday, it was suggested that I back up my previous post on Paxos with a toy implementation, I had plenty of motivation to pick up the gauntlet. However, I’m super-pressed for time at the moment while I write my PhD thesis, so I gave myself a deadline of a few hours, just to keep it interesting.

A few hours later, I’d written this from-scratch implementation of Paxos. There’s enough interesting stuff in it, I think, to warrant this post on how it works. Hopefully some of you will find it useful, and perhaps a springboard for your own implementations. You can run an example by simply invoking python toy_paxos.py.

I’m not going to go over how Paxos works again – in fact I’m going to assume that you’ve read and understood my previous post on the subject. However, there are a couple of extra bits of detail that you’ll need in order to understand some of the things in the code.

It might help to clarify the abstraction this code is trying to present. I’m using Paxos to simulate a state machine, where the leader proposes a value for the next command for the acceptors to agree upon. Therefore, rather than run just one instance of Paxos, we’re running many consecutive instances, the nth of which is concerned with agreeing on the nth command in sequence. In the code, this index is identified as the instanceID. So now every proposal carries with it a value, a proposal ID and an instance ID.
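
To make that concrete, here’s a minimal sketch of the sort of message object the actors pass around. The field names and command constants are illustrative assumptions for this post, not necessarily those used in toy_paxos.py.

class Message(object):
    # Command types the actors exchange; the names are assumptions
    # for illustration, not necessarily those in toy_paxos.py
    MSG_PROPOSE = 0    # client -> leader: please get this value agreed
    MSG_PREPARE = 1    # leader -> acceptors: Paxos phase 1a
    MSG_PROMISE = 2    # acceptor -> leader: Paxos phase 1b
    MSG_ACCEPT = 3     # leader -> acceptors: Paxos phase 2a
    MSG_ACCEPTED = 4   # acceptor -> all leaders: Paxos phase 2b
    MSG_HEARTBEAT = 5  # leader -> leaders: "I think I'm the primary"

    def __init__(self, command, proposalID=None, instanceID=None, value=None):
        self.command = command        # one of the MSG_* constants above
        self.proposalID = proposalID  # (count, port): unique and totally ordered
        self.instanceID = instanceID  # which slot in the command history
        self.value = value            # the state machine command itself
        self.source = None            # port of the sender
        self.to = None                # port of the recipient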

The idea is to agree upon an ordered history of state machine commands, one command per instance. Clients propose a value for a command they’d like to execute. The consensus service orders these commands and notifies any interested parties. Occasionally, it might happen that there are holes in the history that a leader knows about. In that case, the leader can simply propose a ‘no-op’ value (here 0), and will learn about the real value if there is one, as Paxos ensures that a previously accepted value will stay accepted. Otherwise, 0 gets inserted in the history with no ill-effect.

The code is structured into four main classes. Two are ‘actor’ classes, PaxosLeader and PaxosAcceptor. These two classes receive messages from each other and the outside world (in the leader’s case), kick off protocol instances, get notified when protocols have finished and so on. The other two important classes are PaxosLeaderProtocol and PaxosAcceptorProtocol. These are state-machine classes that keep track of one actor’s view of the state of a single protocol instance. The important code for these two is mainly in doTransition, which takes a message object, looks at the current protocol state, and figures out what to do with it.
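
To give a feel for the shape of those state-machine classes, here’s a pared-down sketch of the acceptor side. The state names and the helper methods it calls on its acceptor are assumptions made for illustration; the real PaxosAcceptorProtocol in toy_paxos.py differs in its details.

class PaxosAcceptorProtocol(object):
    # Possible protocol states (names are illustrative)
    STATE_UNDEFINED = 0
    STATE_AGREED_TO_PROPOSAL = 1
    STATE_PROPOSAL_ACCEPTED = 2

    def __init__(self, acceptor):
        self.acceptor = acceptor
        self.state = self.STATE_UNDEFINED
        self.proposalID = None

    def recvProposal(self, msg):
        # Phase 1: promise only if this proposal ID beats anything we
        # have already promised for this instance
        if msg.proposalID > self.acceptor.highestPromised(msg.instanceID):
            self.proposalID = msg.proposalID
            self.state = self.STATE_AGREED_TO_PROPOSAL
            self.acceptor.sendPromise(msg)   # assumed helper: reply to the leader

    def doTransition(self, msg):
        # Phase 2: accept the value only if we are still bound to the
        # same proposal we promised to in phase 1
        if (self.state == self.STATE_AGREED_TO_PROPOSAL
                and msg.proposalID == self.proposalID):
            self.state = self.STATE_PROPOSAL_ACCEPTED
            self.acceptor.sendAccepted(msg)  # assumed helper: notify every leader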

Of the two actor classes, PaxosAcceptor is much the simpler. Both classes are attached to a MessagePump class, which listens on a socket and copies messages to a queue for the actor class to consume one by one.

(Note the convoluted thread-within-thread copying from the socket onto a synchronised queue inside the MessagePump – the problem is that I was filling the UDP receive buffer quicker than the actor could pull messages from the socket, so I needed to empty the buffer as soon as possible, asynchronously. I don’t actually know if TCP would have helped here – does the acknowledgement of a packet get sent when the socket is read?)
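
For reference, a stripped-down version of that arrangement might look like the sketch below: an inner listener thread drains the UDP socket into a synchronised queue as fast as it can, while the outer loop feeds messages (or None, on a timeout) to the owning actor. The constructor arguments, timeouts and the absence of any (de)serialisation are simplifications.

import socket
import threading

try:
    import Queue as queue  # Python 2
except ImportError:
    import queue           # Python 3

class MessagePump(threading.Thread):

    class Listener(threading.Thread):
        def __init__(self, sock, q):
            threading.Thread.__init__(self)
            self.sock = sock
            self.queue = q
            self.daemon = True

        def run(self):
            # Empty the UDP receive buffer as quickly as possible so
            # that bursts of traffic are not dropped
            while True:
                data, addr = self.sock.recvfrom(65536)
                self.queue.put(data)

    def __init__(self, owner, port):
        threading.Thread.__init__(self)
        self.owner = owner
        self.queue = queue.Queue()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(("localhost", port))
        self.daemon = True
        self.listener = MessagePump.Listener(self.sock, self.queue)

    def run(self):
        self.listener.start()
        while True:
            try:
                msg = self.queue.get(timeout=2)
            except queue.Empty:
                msg = None  # gives the owner a chance to do periodic work
            self.owner.recvMessage(msg)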

MessagePump calls back into recvMessage, which in the PaxosAcceptor case simply checks whether the message contains a new proposal. If it does, a new PaxosAcceptorProtocol object is created and set loose. Otherwise, the corresponding protocol is looked up through the InstanceRecord structure (of which there is one per instance, and which simply contains a table of all known proposals for that instance), and doTransition is called on it.
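
In outline, that dispatch looks something like the sketch below, written against the Message and PaxosAcceptorProtocol sketches above; the InstanceRecord helpers and attribute names are likewise assumptions.

def recvMessage(self, message):
    # Sketch of PaxosAcceptor's dispatch; attribute names are assumed
    if message is None:
        return  # just a timeout tick from the MessagePump
    if self.failed:
        return  # simulating a crashed acceptor: ignore everything
    if message.command == Message.MSG_PREPARE:
        # A brand new proposal: spin up a protocol instance for it
        protocol = PaxosAcceptorProtocol(self)
        protocol.recvProposal(message)
        self.instances[message.instanceID].addProtocol(protocol, message.proposalID)
    else:
        # Route to the protocol already recorded for this proposal
        record = self.instances[message.instanceID]
        record.getProtocol(message.proposalID).doTransition(message)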

For PaxosLeader, recvMessage is a bit more complicated. The first chunk of code:

if message is None:
    # Only run every 15s, otherwise you run the risk of
    # cutting good protocols off in their prime
    if self.isPrimary and time.time() - self.lasttime > 15.0:
        # If no message is received, we take the chance
        # to do a little cleanup
        for i in xrange(1, self.highestInstance):
            if self.getInstanceValue(i) is None:
                print "Filling in gap", i
                self.newProposal(0, i)
        self.lasttime = time.time()

only gets run if there is no message to process (and even then only if it’s been more than 15s since we last ran this code). The idea is to run through all the instances we know about and check whether we’ve got a value for each. If we haven’t, we propose 0 as a no-op value, as above.

Otherwise, we look at the type of message we’ve received. Heartbeat messages come from other leaders, and are used to indicate that a leader thinks it is the current primary. If a leader thinks it is the primary and receives a heartbeat from a leader whose port number is higher, it yields to that leader. However, if no heartbeats have been heard for a period of time, any leader can assert itself as the primary, and start issuing heartbeats.
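
The heartbeat handling amounts to a very crude leader election: ties are broken by port number and silence is treated as failure. A sketch of that logic, with the timeout value and attribute names as assumptions:

import time

def recvHeartbeat(self, message):
    # Another leader claims to be primary; yield if it outranks us
    # (crudely, by port number), otherwise carry on
    if self.isPrimary and message.source > self.port:
        self.isPrimary = False
    self.lastHeartbeat = time.time()

def checkPrimary(self):
    # If nobody has heartbeated for a while, promote ourselves and
    # start sending heartbeats of our own
    if not self.isPrimary and time.time() - self.lastHeartbeat > 10.0:
        self.isPrimary = True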

If the message comes from a client, the leader constructs a new protocol object and passes control to it, after saving it in the InstanceRecord for the current instance. This is done in newProposal. The instance number defaults to one larger than that of the previous proposal; however, if we are filling in gaps then we may want to force the instance to an earlier value.
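
A sketch of how newProposal might hang together (the real method in toy_paxos.py differs in detail; PaxosLeaderProtocol’s entry point and the InstanceRecord helpers are assumed):

def newProposal(self, value, instance=None):
    # Default to opening a brand new instance, one past the highest we
    # know about; gap-filling passes an explicit earlier instance
    if instance is None:
        self.highestInstance += 1
        instanceID = self.highestInstance
    else:
        instanceID = instance
    self.proposalCount += 1
    proposalID = (self.proposalCount, self.port)  # unique and totally ordered
    protocol = PaxosLeaderProtocol(self)
    protocol.propose(value, proposalID, instanceID)  # assumed entry point
    self.instances[instanceID].addProtocol(protocol, proposalID)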

If the message belongs to a protocol we know about, we call through to doTransition as before. However, leaders also want to know about accepted proposals so that, if they need to take over, they can be reasonably up to date. Therefore every leader, whether or not it is the primary, runs the leader protocol for any proposal that has reached the accept phase – that is, it listens for a quorum of accept messages and so learns when a proposal has been accepted and with what value.
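
The effect is just to tally accept messages per proposal and treat the value as chosen once a majority of acceptors has confirmed it. A simplified sketch, collapsed into a single leader-side method for brevity (in toy_paxos.py this bookkeeping lives in the protocol state machine, and the per-instance record structure here is assumed):

def recvAccepted(self, message):
    # Every leader, primary or not, counts accepts so it knows which
    # values have been chosen and can take over reasonably up to date
    record = self.instances[message.instanceID]
    record.accepts.setdefault(message.proposalID, set()).add(message.source)
    quorum = len(self.acceptors) // 2 + 1
    if len(record.accepts[message.proposalID]) >= quorum:
        record.value = message.value  # this slot in the history is now fixed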

Some other niceties: you can substitute in AdversarialMessagePump to arbitrarily delay messages and simulate asynchrony. You can simulate an acceptor failure by calling fail() on it – it then stops responding to messages. In a proper implementation, you’d want to log every state change in both leader and acceptor to persistent storage, so that if they were to genuinely crash and fail they could recover even having lost their memory.
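
The adversarial pump can be as simple as a subclass that holds messages back and releases them in an arbitrary order. A sketch, written against the MessagePump sketched earlier (and reusing its imports); the probabilities and timeouts are arbitrary choices:

import random

class AdversarialMessagePump(MessagePump):
    # Holds delivered messages in a pool and releases them to the owner
    # at random times and in random order, simulating an asynchronous network
    def __init__(self, owner, port):
        MessagePump.__init__(self, owner, port)
        self.pending = []

    def run(self):
        self.listener.start()
        while True:
            try:
                self.pending.append(self.queue.get(timeout=2))
            except queue.Empty:
                pass
            if self.pending and random.random() < 0.7:
                # Deliver a randomly chosen waiting message
                msg = self.pending.pop(random.randrange(len(self.pending)))
                self.owner.recvMessage(msg)
            else:
                self.owner.recvMessage(None)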

This toy implementation has been designed to run on a single machine in one Python process. Therefore every message pump binds to localhost on a unique port (it would not be hard at all to generalise this to use proper IP addresses). There are doubtless a few bugs, and more than a few bits of ugliness. Let me know what you find!

9 thoughts on “Consensus Protocols: A Paxos Implementation”

  1. Great blog, and great articles.

    There is some work comparing quorum-based systems and ROWA-based systems, and the conclusion was that ROWA systems win with regard to scalability. Do you have any work/opinion on the following conclusions?

    (1) “Quorum systems in replicated databases: science or fiction” says quorum systems can probably give us higher scalability (but maybe not higher availability).
    (2) “Are quorums an alternative for data replication?” (TODS’03) says that if you are considering scalability, you should simply choose ROWAA in most cases.

    (1) doesn’t sound completely true nowadays, because you can put machines on different racks and so on. But (2) seems to make all the quorum-based work in the database area stay in papers 🙁

  2. @Sean

    A quorum system is actually used in Amazon’s Dynamo.

    While ROWA clearly scores poorly on availability, it does make better use of systems when performing writes. My concern about ROWAA is consistency. I hope the node that didn’t get the write went to sleep.

  3. This was my response to Sean, after a quick read of the papers he mentioned:

    1. It seems that ROWAA requires an efficient group membership protocol, and the cost of this isn’t built into their analysis. The nice thing about majority quorums is that you don’t really need this – if you can’t assemble n/2 + 1 replicas, you don’t make progress. The same obviously applies to ROWA, but tolerating 0 failures makes this useless.

    2. If you have a very nicely behaved network, with a uniform totally ordered multicast primitive and maybe some synchronicity guarantees, then I suppose their analysis holds water. However, it’s arguable whether such networks really exist – given that Google saw fit to use Paxos for a service that consists of about 5 machines running in the same datacentre, which you would expect to be near-ideal conditions, I’m sceptical.

    3. I’m not in love with analyses that quote analytic closed forms for message overheads when talking about practical considerations. The actual overhead the network sees depends a bit on the network topology. Again, if every machine is on the same subnet listening to the same physical bit of wire, then the idealised conditions hold. But when we’re talking about the difference between 5 and 10 messages, it’s difficult to be convinced that this is a win in real life.

    4. I can’t see that they’ve considered the difference in the actual time it takes to assemble quorums. Majority systems just need you to wait for the fastest n/2 + 1 replies from n replicas. Therefore you’re isolated from the poor performance of stragglers (replica heterogeneity is becoming a big deal – several papers have identified and measured the difference between supposedly equivalent replicas). Of course, ROWAA means you get very fast reads, and this might be what the application is looking for. However, you pay for that with really slow writes, even when the entire quorum is available.

    5. No matter what quorum mechanism you choose, you’ve still got to layer a commit protocol over it. The message counts for these protocols might affect their analysis. It seems they only use 2PC – there are significant issues here already!

  4. I think I found an error in your implementation.

    In line 477 you get the count and port of the highestID. But in the next line, you compare ‘count’ to the port number of the proposalID and ‘port’ to the count of the proposalID.

    This can lead to different values being accepted for the same instance (When two processes believe to be leader and start the protocol with the same instanceID, but different values)

  5. rvock –

    Good catch! You’re absolutely right. I really should have used Python’s built-in tuple comparison to avoid making these mistakes.

    There’s a new version up which also includes some garbage collection stuff.

    Henry

  6. Hi. Thanks for your great articles.
    These articles really helped me a lot to understand what Paxos is.

    I think the bug that rvock reported is still not fixed.
    At line 500 of toy_paxos.py, the if statement compares count with proposalID[0], but proposalID[0] is a port number. The proposalID is created at line 329 in newProposal().

    And I don’t want to bother you, but would you please fix the image links in the previous Paxos article?

    Thanks!
