Friday, January 29, 2021

Integrating Apache Geode with the Rapid cluster membership system

In 2020 I set a goal of integrating an alternative Membership Service with Apache Geode.  A team of engineers (including myself) broke that service out into a separate module (link) in 2019 and I wanted to see if it was possible to use a different implementation.


All distributed systems are built on a Membership Service.  Geode, Cassandra, Hazelcast, Coherence and the like all depend on one to know which processes (nodes) are part of the cluster and to detect when there are failures.  There also needs to be a way to introduce new nodes into the cluster and a way to remove current nodes when the need arises.
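

Boiled down, the contract such a service provides looks something like the hypothetical interface below.  This is just a sketch for illustration, not Geode's actual membership API; the names are placeholders.

import java.util.List;

// Hypothetical sketch of the minimal contract a membership service provides.
// Not Geode's actual API - just the operations every implementation needs.
public interface MembershipService<ID> {
  void join();                              // introduce this process into the cluster
  void leave();                             // remove this process gracefully
  List<ID> getCurrentView();                // the processes currently in the cluster
  void addListener(Listener<ID> listener);  // be notified when the view changes

  interface Listener<ID> {
    void viewChanged(List<ID> newView);     // nodes joined, left, or were declared dead
  }
}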


Apache Geode's Membership Service (which I'll call GMS) grew out of a heavily modified fork of the JGroups project.  Though it's been rewritten to not depend on JGroups, Geode still uses the same concept of a membership coordinator (usually the oldest node in the cluster) that accepts join/remove requests and decides whether a node should be kicked out of the cluster.  It also has the other components you'd expect in a Membership Service like a message-sender/receiver and a failure detector.


Halfway through the year I read a paper on the Rapid membership service claiming it could spin up a large cluster in record time and detect failures with a high probability of success.  There was also a Java implementation of the service (as well as a Go impl, which is interesting from a K8s perspective) which made it a nice fit for an integration project.  Perfect!  So I cloned the Rapid repo and started integrating it with Geode.


As an initial goal I wanted to demonstrate that the integration would at least pass a non-trivial integration test and an even more complicated distributed unit test, which orchestrates a cluster of JVMs to test distributed algorithms.  I also wanted to try out a regression test that kills off processes and asserts that the membership service detects and resolves the node failures.


Another goal was to see what dependencies Geode has on its membership implementation that make it difficult or impossible to use a different implementation. I've highlighted these findings in the sections below.


The modularization effort that I mentioned created a nice API for Geode's membership service but hardcoded the implementation into its Builder.  I didn't bother with creating a pluggable SPI during the integration; instead I modified the Builder to create a Membership based on Rapid.  The Rapid project includes a sample Netty-based messaging component and a ping-pong based failure detector that are similar enough to what Geode has that I decided to use them in the integration effort.


Node Identifiers


In a distributed system nodes have identifiers that let you know how to contact them.  Rapid uses UUIDs and these map to physical addresses that contain the Netty host and port.  Other systems like JGroups use a similar approach, but not Geode.  It has an overgrown membership identifier that includes a lot of information about each node.  That was the first problem I ran into.  How could I disseminate the metadata about each node to other members of the cluster?  That metadata included things like the address used for non-membership communications, the type of node (locator, server) and the name assigned to it.  Fortunately Rapid's Cluster implementation allows you to provide this data when joining the cluster and makes it available to other nodes in its callback API.  I merely serialized the Geode identifier and set that as the metadata for the process joining the cluster.  In the callbacks I could then deserialize the ID and pass that up the line.


--> This points out that Geode requires a way to transmit metadata about a node in membership views and node identifiers.  The metadata is static and is known when joining the cluster.


// Serialize Geode's member identifier and attach it as Rapid node metadata
Map<String, ByteString> metadata = new HashMap<>();
metadata.put(GEODE_ID, objectToByteString(localAddress, serializer));

// Build the Rapid cluster using the Netty-based messenger for both the client
// and server roles, and subscribe to view-change and "kicked" notifications
clusterBuilder = new Cluster.Builder(listenAddress)
    .setMessagingClientAndServer(messenger, messenger)
    .setMetadata(metadata);
clusterBuilder.addSubscription(com.vrg.rapid.ClusterEvents.VIEW_CHANGE_PROPOSAL,
    this::onViewChangeProposal);
clusterBuilder.addSubscription(com.vrg.rapid.ClusterEvents.VIEW_CHANGE,
    this::onViewChange);
clusterBuilder.addSubscription(com.vrg.rapid.ClusterEvents.KICKED,
    this::onKicked);
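

For illustration, a simplified version of the serialization helpers might look like the sketch below.  The actual helper takes the Geode serializer passed in above; this sketch ignores that and just uses plain Java serialization to show the round trip to and from the protobuf ByteString that Rapid stores as metadata.

import com.google.protobuf.ByteString;
import java.io.*;

// Simplified sketch: convert an object to and from a protobuf ByteString
// using plain Java serialization (the real code uses Geode's serializer).
static ByteString objectToByteString(Object obj) throws IOException {
  ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
    out.writeObject(obj);
  }
  return ByteString.copyFrom(bytes.toByteArray());
}

static Object byteStringToObject(ByteString bytes) throws IOException, ClassNotFoundException {
  try (ObjectInputStream in = new ObjectInputStream(bytes.newInput())) {
    return in.readObject();
  }
}

In the view-change callbacks the second helper runs in reverse: pull the GEODE_ID entry out of each node's metadata and deserialize it back into a Geode member identifier.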

 

Joining the cluster


Next I needed to reconcile the discovery mechanisms of Rapid and Geode.


Like many other systems Rapid relies on there being Seed Nodes that new processes can contact and ask to join the cluster.  Any process that's already in the cluster can act as a Seed Node; you just need to know its host and port.


Geode's membership service, on the other hand, doesn't use Seed Nodes. Instead it uses a Locator service to find the node that is currently the membership coordinator and uses that node to join the cluster.  All of the Geode unit tests that create a cluster expect this behavior, so if I wanted to use those tests as-is I couldn't use Seed Nodes.


The Locator service is a simple request/reply server that keeps track of who is in the cluster and gives new processes the address of the membership coordinator.  I decided to write an alternative plugin to Geode's GMSLocator class that would allow multiple Rapid nodes to start up concurrently.


A new process contacts the new RapidSeedLocator and asks for the ID of a seed node.  Normally there is a seed node available and the new process starts the Rapid cluster using the seed node's address.


// Ask the locators for a seed node; if one other than this process is
// available, join the existing cluster through it
seedAddress = findSeedAddressFromLocators();
if (!seedAddress.equals(listenAddress)) {
  cluster = clusterBuilder.join(seedAddress);
}
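

For illustration, the discovery call might look something like the sketch below.  The one-line "get-seed" text protocol, the HostAndPort type and the locatorAddresses field are placeholders for this sketch, not the actual RapidSeedLocator protocol.

import com.google.common.net.HostAndPort;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Illustrative only: poll each configured locator over a plain socket and ask
// it for a seed node, falling back to this process's own listen address if
// none of the locators knows of an existing cluster member.
HostAndPort findSeedAddressFromLocators() {
  for (InetSocketAddress locator : locatorAddresses) {
    try (Socket socket = new Socket(locator.getAddress(), locator.getPort());
         BufferedReader in = new BufferedReader(
             new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
         PrintWriter out = new PrintWriter(
             new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true)) {
      out.println("get-seed");            // placeholder request
      String reply = in.readLine();       // e.g. "host:port", or "none" if the cluster is empty
      if (reply != null && !reply.equals("none")) {
        return HostAndPort.fromString(reply);
      }
    } catch (IOException e) {
      // this locator isn't reachable yet - try the next one
    }
  }
  return listenAddress;                   // no seed found; the caller may bootstrap a new cluster
}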


If there are no nodes in the cluster the new process loops a few times to see if one will show up. If none does it starts a new cluster. It's a little more complicated than that because there may be multiple processes starting up at the same time and they need to work out which one will bootstrap the cluster.


// The agreed-upon "lead registrant" bootstraps a brand new cluster;
// everyone else joins through it
if (bestChoice.equals(localAddress)) {
  logger.info("I am the lead registrant - starting new cluster");
  cluster = clusterBuilder.start();
} else {
  logger.info("Joining with lead registrant");
  cluster = clusterBuilder.join(seedAddress);
}
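

The snippet above doesn't show how bestChoice is computed.  One simple, deterministic scheme is to have every concurrently starting node register with the locators and then have each of them pick the same winner, for example the lexicographically smallest address.  The sketch below is a simplification along those lines, not the exact logic in the integration branch.

import com.google.common.net.HostAndPort;
import java.util.Collection;
import java.util.Comparator;

// Illustrative only: given the addresses that registered with the locators
// during concurrent startup, sort them and treat the smallest as the lead
// registrant. Every node computes the same answer with no extra messaging.
static HostAndPort chooseLeadRegistrant(Collection<HostAndPort> registrants) {
  return registrants.stream()
      .min(Comparator.comparing(HostAndPort::toString))
      .orElseThrow(() -> new IllegalStateException("no registrants"));
}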


This Locator behavior is something we took from the JGroups project and have used in Geode since its inception.  Any seed-node-based system could be fit into the same pattern.


Other than that, all I needed to do was implement leaving the cluster, the callbacks announcing new/left/crashed nodes and a callback that handles being kicked out of the cluster.  That can happen if a node becomes unresponsive and the other nodes (observers) declare it dead.


--> Geode uses a Locator service to find the node that's the membership coordinator.

--> Geode needs to handle concurrent startup of the initial nodes of a cluster.


Integration Testing


There was an existing unit test for Geode's Membership class that I used to shake out problems with the integration, MembershipIntegrationTest.  The simplest test in that class just boots up a cluster of one and verifies that the cluster started okay.  It passed, so I went on to the next test, one that boots up a two-node cluster.  That one failed because the view identifiers in Rapid are not monotonically increasing integers as they are in Geode's GMS.  Instead they are a hash of the node IDs and endpoints in the membership view.  Geode will refuse to accept a membership view with a view ID lower than the current view's and, since it's just a hash, Rapid's identifiers jump all over the place.


The only way to get around that problem was to comment out all of the checks in Geode that pay attention to view identifiers.  After doing that the two-node test passed.


Other tests in MembershipIntegrationTest were variants of that second test and passed.  That was encouraging.


--> Geode expects monotonically increasing membership view identifiers.


Distributed Unit Testing


I moved on to a multi-JVM test using a Cache, DistributedAckRegionDUnitTest.  For this kind of test a Locator and four satellite JVMs are started by the test framework and then all of the test methods are executed in the unit test JVM.  The unit tests assign tasks to the satellite JVMs via RMI.  For instance, a test might assign tasks to create a Cache, populate it and then verify that all of the nodes have the correct data.  At the end of each test method the framework cleans up any Caches created in the JVMs to make them ready for the next test method.


First I focused on getting one test to pass.  The test assigned tasks to two of the satellite JVMs to create a Cache and then had them create incompatible cache Regions.  This test hung during startup because, doh!, the nodes had membership views with identifiers in different orders.  One had [nodeA, nodeB, Locator] and the other had [nodeB, nodeA, Locator].  The problem with this is that Geode's distributed lock service depends on a stable ordering of the identifiers, using the left-most node as the Elder in its distributed algorithms.  The two nodes disagreed on which node should be the Elder and that screwed up the algorithms, causing the Caches to hang.


In order for Geode to use a membership service that doesn't provide a stable ordering of the node identifiers some other mechanism would be needed to determine the Elder for the distributed lock service.  For my project I didn't want to get into that so I stored the millisecond and nanosecond clock values in the metadata when initializing each node and used that to sort the identifiers.  That's good enough for unit testing but it's not something that could be used IRL.
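

As a rough sketch of that stopgap, the ordering could be derived like the code below, with the startup timestamps stashed in the same Rapid metadata map that carries the serialized Geode ID.  The STARTUP_MILLIS/STARTUP_NANOS keys, the string encoding and the helper itself are made up for this example.

import com.google.protobuf.ByteString;
import java.util.*;
import java.util.stream.Collectors;

// Illustrative only: sort member IDs by the startup timestamps each node put
// into its Rapid metadata, so every node derives the same "oldest first"
// ordering and agrees on the Elder. Good enough for tests; wall clocks on
// different hosts aren't reliably comparable in production.
static <ID> List<ID> sortByStartupTime(Map<ID, Map<String, ByteString>> metadataByMember) {
  return metadataByMember.entrySet().stream()
      .sorted(Comparator
          .comparingLong((Map.Entry<ID, Map<String, ByteString>> e) ->
              Long.parseLong(e.getValue().get("STARTUP_MILLIS").toStringUtf8()))
          .thenComparingLong(e ->
              Long.parseLong(e.getValue().get("STARTUP_NANOS").toStringUtf8())))
      .map(Map.Entry::getKey)
      .collect(Collectors.toList());
}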


--> Geode requires stable ordering of node IDs in a membership view or it needs an alternative way of choosing an Elder for the distributed lock service.


With that change the distributed test passed, as did other tests that I selected at random in DistributedAckRegionDUnitTest.  However, when I fired off all of the tests to run sequentially things started to go wrong.  A run would get part way through the tests and then hang.


Looking at the logs I could see the first satellite JVM try to join with the Locator node and some other node that the test hadn't started.


   [vm0] [info 2021/01/13 11:52:29.455 PST <RMI TCP Connection(1)-192.168.1.218> tid=0x14] mac-a01:63410 is sending a join-p2 to mac-a01:63200 (Locator) for config 4530718538436871917


  [vm0] [info 2021/01/13 11:52:29.455 PST <RMI TCP Connection(1)-192.168.1.218> tid=0x14] mac-a01:63410 is sending a join-p2 to mac-a01:63383 (?????) for config 4530718538436871917


and then Rapid's Paxos phase2 would fail:


  [vm0] [error 2021/01/13 11:52:34.455 PST <RMI TCP Connection(1)-192.168.1.218> tid=0x14] Join message to seed mac-a01:63200 (Locator) returned an exception: com.vrg.rapid.Cluster$JoinPhaseTwoException


but there was no logging for this mac-a01:63383 node.


I finally looked at the previous test's logs and found that this node had been used in that test and had shut down, but the remaining node (the Locator) never installed a new view removing it.  A quick talk with the Rapid developer confirmed that Rapid requires at least two nodes to make any decisions, so I modified the test to use two Locators instead of one.  That way there are always at least two nodes available to agree on the removal of lost nodes.

--> Geode assumes the cluster can devolve to one node.


When running all 101 tests in DistributedAckRegionDUnitTest I found that shutting down a Rapid node doesn't seem to inform all of the other nodes in a small cluster, leaving them to figure out on their own that the node is gone.  This caused new nodes to take 20 seconds or more to join the cluster while the seed nodes worked out the loss of the old nodes: attempts to join would fail and then be retried.  Decreasing the timeout in Rapid's NettyClientServer to 2 seconds helped, but joining the cluster still usually took a full 2 seconds or more, compared to 50 to 250ms in Geode's GMS.  A better failure detector and a UDP-based heartbeat might give tighter response times.


--> Geode has tight SLAs concerning failure detection and needs to know whether a node has crashed or left gracefully.


Regression Testing


VMware has a large collection of regression tests that it uses to harden its own releases of Geode.  These are similar to the Distributed Unit Tests I described above but use a different framework, can run on multiple machines and typically run for much longer periods of time.  I chose one of the tests from the Membership regression to test the Rapid integration's handling of lost & new members.  After the poor performance of the default ping-pong failure detector with Netty in the Distributed Unit Tests I didn't expect this to go well, and it didn't.  Though there were only 10 nodes in the test, all of them failed in the first few minutes with processes unable to join the cluster.


Looking through the logs I could see some nodes in the cluster accept a new node while others didn't, and the join attempt would then fail with a JoinPhaseTwoException.  The accepting nodes would subsequently remove the new node.


--> Like Geode's Distributed Unit Tests, VMware's regression tests require a membership service that can handle frequent shutdown and startup of nodes.


Scalability Testing


I ran a small scalability test with Geode's membership implementation and then with the Rapid implementation.  The test doesn't spin up a Geode Cache, just membership instances.  For these tests I used my 16 GB Mac laptop and ran the tests under IntelliJ.  Sometime I'll grab a bunch of machines in AWS or GCP and do a larger test.


Rapid was unable to create a cluster of more than 71 nodes before running out of file descriptors in this single-machine test, probably due to the use of Netty instead of the connectionless JGroups UDP messaging used by Geode's membership implementation.


Using Rapid, nodes were able to join faster than when using Geode's membership system but the time to join was more erratic. The time required to shut down the cluster was much higher with Rapid.


Conclusions


Integration of Rapid and Geode was fairly straightforward but pointed out dependencies that Geode has on its existing membership system that go beyond what you might expect.  Untangling these so that the membership module is truly pluggable will take work.


These are the dependencies that I noticed:

  • Geode requires a way to transmit metadata about a node in membership views and node identifiers.  The metadata is static and is known when joining the cluster.
  • Geode uses a Locator service to find the node that's the membership coordinator.
  • Geode needs to handle concurrent startup of the initial nodes of a cluster.
  • Geode expects monotonically increasing membership view identifiers.
  • Geode requires stable ordering of node IDs in a membership view or it needs an alternative way of choosing an Elder for the distributed lock service.
  • Geode assumes the cluster can devolve to one node.
  • Geode has tight SLAs concerning failure detection and needs to know whether a node has crashed or left gracefully.
  • Like Geode's Distributed Unit Tests, VMware's regression tests require a membership service that can handle frequent shutdown and startup of nodes.


Integration code can be found here:
https://github.com/bschuchardt/geode/tree/feature/rapid_integration
https://github.com/bschuchardt/rapid/tree/geode_integration

