Wednesday, November 3, 2010

Performance of Message Patterns

I've been faced with a dilemma in distributed processing.  I have a "client" process that needs to send a message to several servers.  The servers contain a versioning service that stamps a version on each message for concurrency management, but the client doesn't have this service.  Each of the servers must end up having the same version number for the message.  And it must be blindingly fast.

Either the servers are going to have to talk to each other to agree on a version for the message or some other trick is going to have to be used.

The simplest approach is to send a request to one of the servers to get a version number and then send the version out with the message to all of the servers in parallel.  This is nice because the message can be serialized to the network in parallel and there are mechanisms in place that will do this very quickly.
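A minimal sketch of this "version request" pattern, using in-process objects in place of real network peers. The `Server` class, its method names, and the thread pool are assumptions made for illustration; they are not GemFire APIs.

```python
import itertools
from concurrent.futures import ThreadPoolExecutor


class Server:
    """Hypothetical stand-in for a server that owns a versioning service."""

    def __init__(self, name):
        self.name = name
        self._versions = itertools.count(1)
        self.store = {}

    def next_version(self):
        # Round trip 1: the client asks one server for a version number.
        return next(self._versions)

    def receive(self, key, payload, version):
        # Round trip 2: every server stores the already-stamped message.
        self.store[key] = (payload, version)
        return "ack"


def send_with_version_request(servers, key, payload):
    """Get a version from the first server, then send to all in parallel."""
    version = servers[0].next_version()
    with ThreadPoolExecutor(max_workers=len(servers)) as pool:
        acks = list(pool.map(lambda s: s.receive(key, payload, version), servers))
    return version, acks
```

Note that no server sees the payload until the first round trip has completed, which is the cost this pattern pays for its simplicity.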

Here's a diagram of the first scenario.


Instead of sending a request for a version to the first server, the client could send the message and expect a version number in response.  It could then add this to the message and send it to the other servers.  This would require serializing the message to its on-wire format more than once, but might be faster for small messages.


Another approach is to delegate the sending of the message to one of the servers ("one hop messaging").  The client sends the message to one of the servers and it, in turn, sends the message to the other servers after adding a version stamp to it.  This also requires some level of extra serialization since the client must write the message to the network and then the selected server must forward the message to the other servers over the network.

We can simplify the acknowledgment scheme by having each server send an ack directly back to the client instead of piping them through Server1.  This would cause context switching in the client, but might be better than funneling the acks through Server1.
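The "one hop, ack client" variant can be sketched roughly as follows. This is an in-process illustration only: `HopServer`, the ack queue, and the use of threads to stand in for network sends are all assumptions.

```python
import queue
import threading


class HopServer:
    """Hypothetical server; the first one stamps and forwards the message."""

    def __init__(self, name):
        self.name = name
        self.peers = []
        self.version = 0
        self.store = {}

    def accept_from_client(self, key, payload, ack_queue):
        # Stamp the message, then forward it to the other servers.
        self.version += 1
        stamped = (payload, self.version)
        self.store[key] = stamped
        forwards = [
            threading.Thread(target=p.accept_forwarded, args=(key, stamped, ack_queue))
            for p in self.peers
        ]
        for t in forwards:
            t.start()
        ack_queue.put(self.name)  # ack the client without waiting on peers
        for t in forwards:
            t.join()

    def accept_forwarded(self, key, stamped, ack_queue):
        self.store[key] = stamped
        ack_queue.put(self.name)  # ack goes straight to the client


def one_hop_ack_client(first, key, payload, expected_acks):
    """Send to one server and collect acks directly from every server."""
    acks = queue.Queue()
    threading.Thread(target=first.accept_from_client, args=(key, payload, acks)).start()
    return sorted(acks.get(timeout=5) for _ in range(expected_acks))
```

The client blocks only on its own ack queue, so no server has to wait for another server's acknowledgement before replying.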


Yet another approach is to have the client send the message to all of the servers and include a tag that selects one of the servers to supply the version tag.  After the selected server (Server1 in the diagram below) receives the message it stamps it with a version and then sends that version to the other servers.  The other servers wait for the version number before accepting the message.  This, too, requires extra context switching because the other servers have to wait for a signal that the version number has been received.
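Here is a rough sketch of that last pattern, where the non-selected servers block until the version arrives. The class and method names are hypothetical, and a `threading.Event` stands in for whatever signal a real server would use.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class TaggedServer:
    """Hypothetical server that either stamps a message or waits for a stamp."""

    def __init__(self, name):
        self.name = name
        self.peers = []
        self.store = {}
        self._next_version = 1
        self._slots = {}
        self._lock = threading.Lock()

    def _slot(self, key):
        # The version may arrive before or after the message itself,
        # so whichever shows up first creates the slot.
        with self._lock:
            return self._slots.setdefault(
                key, {"event": threading.Event(), "version": None}
            )

    def deliver_version(self, key, version):
        slot = self._slot(key)
        slot["version"] = version
        slot["event"].set()

    def receive(self, key, payload, stamper):
        if self.name == stamper:
            version = self._next_version
            self._next_version += 1
            for peer in self.peers:
                peer.deliver_version(key, version)
        else:
            slot = self._slot(key)
            if not slot["event"].wait(timeout=5):
                raise TimeoutError("no version received for " + key)
            version = slot["version"]
        self.store[key] = (payload, version)
        return self.name


def send_to_all(servers, key, payload, stamper):
    """Client sends the message to every server in parallel."""
    with ThreadPoolExecutor(max_workers=len(servers)) as pool:
        return list(pool.map(lambda s: s.receive(key, payload, stamper), servers))
```

The `wait` call is the extra synchronization point: each waiting server parks a thread until the stamping server's signal arrives.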



I wrote a program to simulate these different approaches to see how they compared.  As in the diagrams, I used a client and three servers, all running on a fairly large and fast Linux computer.


The payload column shows the size of the message used in each test run, and each of the other columns shows how many seconds it took that approach to handle 1 million messages.  The "no versioning" column is a base-line that shows the performance of simple send-with-acknowledgement messaging.


The results were a little surprising to me.  I had expected the last approach, where the client sends the message to all servers and they wait for one to send a version number, to have the best performance.  Instead, all but one of the approaches converge to the same performance level when messages reach 10,000 bytes in size.  At this level they are only moving about 70 MB of data through the system per second, so they aren't being throttled by the network, but with the extra synchronization points and context switches, CPU was becoming a limiting factor.

The "version request" and "message returns version" scenarios (the first two diagrams above) are clear losers because server2 and server3 do not even see the message until a complete send and response cycle is performed with server1.

The "one hop" scenario had a poor showing because of the long acknowledgement chain, with both server2 and server3 sending their acks to server1.  Server1 has to wait for both acks before it finally sends its own ack back to the client.

The clear winner is the "one hop, ack client" algorithm with servers sending acknowledgements directly to the client.  It even converged with and then passed the base-line "no versioning" scenario at about 3000 bytes/message.

Thursday, September 23, 2010

Virtual Synchrony in GemFire

A virtual synchrony protocol ensures that all past operations have been delivered before a membership change is put into effect.

In our GemFire product, the equivalent of virtual synchrony is achieved in a component of the cache called a DistributionAdvisor.  These are mini membership managers that track the ownership of cache Regions across the distributed system.  When a member creates a new cache Region that others already have, the DistributionAdvisor is used to perform a flush operation (called State Flush in GemFire) to ensure that all past operations have been applied before the new member's Region is allowed to be used.  There are a number of steps involved in this operation, for which I have applied for a patent (#11/982,563).  Outside of this, we haven't seen the need for a full virtual synchrony protocol covering all communications.

Thursday, July 1, 2010

JGroups address reuse

One of the problems we worked around for years in GemFire is that its JGroups component can reuse an old address for a new member.

When a process joins a JGroups channel it creates an ephemeral datagram socket and uses its port number + the machine's IP address to create a membership ID. It then finds the membership coordinator and sends a Join message to it with this ID. The coordinator sends out a new membership view containing the new member's ID.

If the address happens to be one that was recently used by another process on the same machine, it can look like the member left and then joined again. This is actually possible for a single process if you use the JGroups merge protocols.

GemFire keeps track of recently departed members in a "shunned members" collection and has used this to keep addresses from being reused. If a member tries to join using an old ID, all messages from it are ignored. The new member then times out in its attempt to send startup messages to other members of the system and throws a SystemConnectException.
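A toy sketch of the shunning behavior, with IDs modeled as `(host, port)` tuples. The names here are hypothetical, and a missing reply stands in for the timeout that the real joining member experiences.

```python
class SystemConnectError(Exception):
    """Stand-in for the SystemConnectException mentioned above."""


class Member:
    """Hypothetical existing member that remembers departed IDs."""

    def __init__(self):
        self.shunned = set()

    def member_departed(self, member_id):
        self.shunned.add(member_id)

    def handle_startup_message(self, sender_id):
        if sender_id in self.shunned:
            return None  # message is ignored, so no reply is ever sent
        return "welcome"


def join(existing_members, new_id):
    """Send startup messages; treat any missing reply as a timeout."""
    replies = [m.handle_startup_message(new_id) for m in existing_members]
    if any(reply is None for reply in replies):
        raise SystemConnectError("startup timed out for %s:%d" % new_id)
    return replies
```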

That was fine for a long time, but new customers have brought new demands and we now allow you to restrict the range of ephemeral ports that JGroups uses. This makes ID reuse much more likely. Setting a port range of 1000-1050, for instance, only provides a pool of 51 ports to select from.

We tried several approaches to making identifiers less likely to be reused before finding a solution that worked. We tried adding the cache's peer-to-peer stream socket port number to the ID and adding randomly selected bytes to the ID before hitting on the idea of using the JGroups Lamport clock.

Now when a member starts to join the system it creates a temporary ID that contains its InetAddress and JGroups port. This address is put in the Locator's discovery set and is sent to the membership coordinator in a Join request. The coordinator responds by sending back the current membership view and a modified ID that the member should use. This ID has the JGroups Lamport timestamp embedded in it and this is then used in ID equality checks to differentiate between two IDs that have the same InetAddress and port.

In JGroups the Lamport timestamp is nothing more than the membership view number. It is usually incremented by 1 each time a new membership view is formed. Having this information in the ID not only lets us make them "more unique" but it also lets you see at a glance when a member joined the system.

Having this information in the identifier has eliminated address reuse completely. The Lamport clock is never decremented or reset, so each new member is guaranteed to have a unique ID and not be confused with an old, departed member.
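To make the idea concrete, here is a small sketch of an ID that carries the view number. `MemberId` and `Coordinator` are illustrative names rather than the real GemFire or JGroups classes, and using `-1` to mark a temporary ID is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MemberId:
    host: str
    port: int
    view_id: int = -1  # -1 marks a temporary ID that has no view number yet


class Coordinator:
    """Hypothetical membership coordinator that stamps IDs with the view number."""

    def __init__(self):
        self.view_id = 0
        self.view = []

    def join(self, temp_id):
        # Each new view increments the counter, which is never reset.
        self.view_id += 1
        final_id = MemberId(temp_id.host, temp_id.port, self.view_id)
        self.view.append(final_id)
        return final_id, list(self.view)

    def leave(self, member_id):
        self.view_id += 1
        self.view.remove(member_id)
```

Because the dataclass compares all three fields, two members that reuse the same host and port still compare as different IDs as long as they joined in different views.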

Wednesday, May 12, 2010

GemFire and Paxos

People have recently been asking me about the Paxos algorithm and how GemFire implements it. I'm not an expert on Paxos itself, but have spent years dealing with the problems that arise if you do not have something similar to it in place in a distributed computing product like GemFire.

Paxos is a family of protocols for developing consensus in a distributed system. The basic idea is that there is a Client that sends a message to effect some change. One or more Acceptors exist to service the message, and the message is sent to all of them. A Proposer works on behalf of the client to make sure the Acceptors agree on the message. The message is then acted on and a Learner is tasked to effect replication of the change.

There is also a Leader, which is selected from among the Acceptors. If there is no unique Leader, no progress is allowed in the system.

In GemFire there are Client processes that send messages to Servers. The Client forms a unique identifier for the message with respect to the Client and the originating thread. In a shortcut of the general Paxos algorithm a Server acts as both an Acceptor and a Proposer. It determines the member having primary storage for the message and forwards the message to it.

The member having primary storage plays the role of Learner. It is responsible for effecting the change in primary storage and then replicating that change in other Servers. Once it is done, any result is sent back to the first Server (Paxos Proposer) and the result is sent back to the client.

If, after sending the message, the Client deems that the Server is taking too long to process it, another Server is selected and the Client sends the message (with the same unique identifier) to it. That Server finds the member having primary storage and forwards the message to it. The member having primary storage (Paxos Learner) protects its storage by checking the message's unique identifier. If a message with that identifier has already been applied to the store, the message is marked as a possible-duplicate and then replicated to other Servers. Each of the other Servers is also responsible for performing duplicate-checks.
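The duplicate check can be sketched like this. It assumes the identifier carries a per-thread sequence number, which is my guess at one workable shape for it; `EventId`, `Store`, and `Primary` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EventId:
    client: str
    thread: int
    sequence: int  # assumed to increase with each message from a thread


class Store:
    """Hypothetical storage that remembers the newest event per client thread."""

    def __init__(self):
        self.data = {}
        self.highest = {}  # (client, thread) -> highest sequence applied

    def apply(self, event_id, key, value):
        source = (event_id.client, event_id.thread)
        if event_id.sequence <= self.highest.get(source, -1):
            return False  # already applied: a retry or an older message
        self.highest[source] = event_id.sequence
        self.data[key] = value
        return True


class Primary:
    """Hypothetical member holding primary storage."""

    def __init__(self, replicas):
        self.store = Store()
        self.replicas = replicas

    def handle(self, event_id, key, value):
        possible_duplicate = not self.store.apply(event_id, key, value)
        # Replicate either way; each replica performs its own duplicate check.
        for replica in self.replicas:
            replica.apply(event_id, key, value)
        return possible_duplicate
```

A client retry through a second Server reaches `handle` with the same `EventId`, so the second attempt changes nothing in either the primary or the replicas.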

The "timeout/retry" logic in the Client can cause duplicate processing, but in the normal case it does not and is much faster than a cumbersome back-and-forth Client/Acceptor/Proposer interaction.

The Paxos Leader role is handled in GemFire by the low-level membership system. Administrative members of the system and those marked as not being Servers are excluded from election as Leader. After this, the oldest Server is selected as the GemFire "lead" member.
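As a sketch, the selection rule amounts to a filter followed by a minimum. The `Member` fields below are assumptions, with "oldest" modeled as the smallest view number at join time.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Member:
    name: str
    joined_view: int  # lower means the member joined earlier
    is_admin: bool = False
    is_server: bool = True


def select_lead(members) -> Optional[Member]:
    """Return the oldest non-administrative Server, or None if there is none."""
    eligible = [m for m in members if m.is_server and not m.is_admin]
    return min(eligible, key=lambda m: m.joined_view, default=None)
```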

Paxos also has the notion of a Quorum. Wikipedia's definition reads: "Quorums are defined as a family of subsets of the set of Acceptors such that any two subsets from the family (that is, any two Quorums) have a non-empty intersection." In GemFire this function is handled by the Roles feature. Each Server can note that it provides one or more Roles and each cache region can require one or more Roles in order to function. If the required Roles aren't present, the cache region experiences a "loss action" which can cause it to become read-only, block access, or even shut down and restart the cache. This provides better control over the behavior of the system when there are machine or network failures than a simple Quorum mechanism.
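A rough sketch of the Roles check follows. The enum values mirror the three outcomes described above, but the names are made up for illustration and are not the GemFire API.

```python
from enum import Enum


class LossAction(Enum):
    READ_ONLY = "read-only"
    BLOCK_ACCESS = "block access"
    RESTART_CACHE = "shut down and restart the cache"


def check_region(required_roles, roles_by_server, loss_action):
    """Return None when every required Role is present, else the loss action.

    roles_by_server maps a server name to the set of Roles it provides.
    """
    present = set().union(*roles_by_server.values())
    missing = set(required_roles) - present
    return loss_action if missing else None
```

For example, a region that requires a hypothetical "backup" Role would report its loss action as soon as the last Server providing that Role drops out of `roles_by_server`.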

Paxos also has the notion of a Choice when there are conflicting values. This is essentially handled by the unique identifiers assigned to messages. The identifiers can be used to order conflicting values and determine which is newer or older than the other. This is done during duplicate-detection and is a very cheap and lightweight way to implement Choice.

So there is quite a strong mapping between GemFire's distributed correctness algorithms and Paxos. Where there are differences it is usually because we opted for higher performance at the possible cost of doing a small amount of extra work (duplicate processing).