March 22, 2016

Implementing Leader Election and automatic fail over

Leader election and automatic fail over are two common tasks in almost every distributed environment. There are various reasons why multiple instances of the same software run at the same time, for example load balancing, hot standby or dedicated instances for different tasks (same software, but a different configuration, etc.). Depending on your architecture, some of these aspects matter more than others. Furthermore, it is easier to develop services that have one master and one or more slaves than to develop setups in which every instance is equal.

One common scenario is having multiple service instances running for load balancing, but only one of them executing batch jobs such as database cleanup. This can be done in different ways, but one smart way is to give all service instances the same configuration and let them decide at runtime which one executes the batch jobs (the typical master/slave split: the master executes the batch jobs and the slaves do not execute any batch jobs at all).


The figure below shows the setup we want to achieve: one to n instances of our service run on different machines connected via the network, and they need to be coordinated so that only one of them executes the batch job.

Seems easy, but several questions make this task more complicated:
  • How is the master (leader) elected?
  • What happens if the leader dies? (automatic fail over)
  • What happens if the leader just disconnects from the other services?
  • What happens if it reconnects?
  • What happens if the leader is not responsive anymore? (deadlock?)

These are a lot of corner cases that need to be considered to get a robust and reliable system. The most complicated part in practice is connection handling: dealing with issues on the network level, such as broken connections and timeouts, is difficult to test, and implementing a correct solution is not an easy task at all. Using a technology stack that handles these issues for us is the best way to be up and running in reasonable time. This is where Apache Zookeeper can help.

Apache Zookeeper

Apache Zookeeper is a solution for a wide range of issues that come with distributing services, e.g. naming, configuration and service synchronization. It is used in cluster environments like SolrCloud and others. It can be seen as a simple distributed file system with some extras: there is a root node that can have child nodes, and each node can have its own properties. There are different types of nodes; for our use case these two are important:

Ephemeral Node: a temporary node. As soon as the client that created it disconnects or its session times out, the node disappears.

Sequential Node: a node whose name automatically gets a monotonically growing sequence number as a suffix.
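
To make this concrete, here is a minimal sketch with the plain Zookeeper Java API (the connection string, session timeout and the /ELECTION path are just example values; /ELECTION corresponds to the election root node used in the recipe below):

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class NodeTypesExample {
    public static void main(String[] args) throws Exception {
        // Connect to a local Zookeeper instance and wait until the session is established.
        CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();

        // A persistent node that serves as parent for the election nodes.
        // (A concurrent create by another instance would additionally need
        // a NodeExistsException check.)
        if (zk.exists("/ELECTION", false) == null) {
            zk.create("/ELECTION", new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        // An ephemeral sequential node: Zookeeper appends a growing sequence
        // number (e.g. /ELECTION/p_0000000001) and deletes the node
        // automatically when this client's session ends.
        String createdPath = zk.create("/ELECTION/p_", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Created " + createdPath);

        zk.close();
    }
}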

The next step is to use these nodes for our leader election.

Apache Zookeeper provides a recipe for that (another good article that covers that topic can be found here). The figure below shows a quick overview:




  1. Create the root node ELECTION if it does not exist.
  2. Create a node, e.g. ELECTION/guid-n_001, with the sequence and ephemeral flags set.
  3. Get all children of ELECTION.
  4. If the node created by this instance has the smallest sequence number, this instance is the leader.
  5. If not, watch the node with the next smaller sequence number for delete events (see the sketch after this list). If such an event occurs, check the above condition again.
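
A rough sketch of steps 3 to 5, again with the plain Zookeeper API, could look like the following (it reuses the zk handle and the ephemeral sequential node from the snippet above; error handling is omitted and the method name checkLeadership is only an illustrative choice):

import java.util.Collections;
import java.util.List;

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ElectionCheck {

    // ourPath is the full path returned by the create() call above,
    // e.g. /ELECTION/p_0000000005
    static boolean checkLeadership(ZooKeeper zk, String ourPath,
                                   Watcher deleteWatcher) throws Exception {
        List<String> children = zk.getChildren("/ELECTION", false);
        Collections.sort(children);

        String ourName = ourPath.substring(ourPath.lastIndexOf('/') + 1);
        int ourIndex = children.indexOf(ourName);

        if (ourIndex == 0) {
            // Our node has the smallest sequence number: we are the leader.
            return true;
        }

        // Otherwise watch the node with the next smaller sequence number;
        // its deletion triggers the watcher and a new leadership check.
        // (A production implementation would re-check if exists() returns null,
        // because the watched node may already be gone at this point.)
        String nodeToWatch = "/ELECTION/" + children.get(ourIndex - 1);
        zk.exists(nodeToWatch, deleteWatcher);
        return false;
    }
}
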
A service with two running instances, for example, will look like this on the Zookeeper server with dev-messagingService-election as root node (the screenshot shows zkui):



As shown, there is a root node

/dev-messagingService-election

with two child nodes (one for each running service):

P_0*1052
P_0*1072

In this scenario, the instance that created P_0*1052 is the leader. Schematically, the setup looks like the figure below: each service instance is connected to Zookeeper and owns one node that exists only as long as its connection is active. This is ensured by Zookeeper.



There are frameworks, such as Apache Curator, that already implement this and other Zookeeper recipes. Using these frameworks is almost plug and play. Nevertheless, they come with some shortcomings, e.g. the dependencies they pull in and their error handling. We decided to implement our own solution based on the recipe suggested by Zookeeper.
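
Just for comparison, a minimal leader election built on Curator's LeaderLatch recipe might look roughly like the following (connection string, retry policy and latch path are placeholder values, not taken from our implementation):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorElectionExample {
    public static void main(String[] args) throws Exception {
        // Placeholder connection string and retry policy.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // All instances using the same latch path take part in the election.
        LeaderLatch latch = new LeaderLatch(client, "/dev-messagingService-election");
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                System.out.println("Going to master mode");
            }

            @Override
            public void notLeader() {
                System.out.println("Going to slave mode");
            }
        });
        latch.start();

        Thread.sleep(Long.MAX_VALUE); // keep the demo process alive
    }
}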

Our solution is as simple as shown in this figure:





All services that have master and slave modes simply implement the ClusterService interface. It has two callback methods: one for going to master mode and one for going to slave mode. These callbacks are invoked on every state change. The service keeps its internal state in an AtomicBoolean; depending on whether it is in master mode or not, the batch job either processes something or just skips the work. It is as simple as that; everything else is done in the ClusterManagerService. Each service therefore does not have to deal with any cluster functionality at all, and we could easily switch the actual implementation of how the leader is elected, and even the technology it is based on. The ClusterManagerService talks to Zookeeper and handles any changes that may occur, as well as errors like session timeouts and connection problems.
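
A minimal sketch of this idea (the callback names goToMasterMode/goToSlaveMode follow the log output below; the exact signatures in our repository may differ in details):

// ClusterService.java - implemented by every service with a master and a slave mode.
public interface ClusterService {
    void goToMasterMode();
    void goToSlaveMode();
}

// TestService.java - keeps its state in an AtomicBoolean and only does the
// batch work while it is the master.
import java.util.concurrent.atomic.AtomicBoolean;

public class TestService implements ClusterService {

    private final AtomicBoolean master = new AtomicBoolean(false);

    @Override
    public void goToMasterMode() {
        master.set(true);
    }

    @Override
    public void goToSlaveMode() {
        master.set(false);
    }

    // Called periodically, e.g. by a scheduler.
    public void batchJob() {
        if (master.get()) {
            System.out.println("Doing some work ...");
        } else {
            System.out.println("Doing nothing ...");
        }
    }
}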

To test this, we have provided some example code.

How to run our example

Our implementation and some code for demonstration can be found on our github account:


Furthermore, a Zookeeper instance is needed. It can be downloaded here. There are startup scripts for various platforms provided in the bin directory. Simply start the server and this step is done - no additional configuration is needed.

Normally, no changes need to be made to the configuration of our example code (leader-election\src\main\resources\application.yml). But you might want to double-check the IP and port of your Zookeeper instance.

cluster:
  name: testCluster1
  leaderElectionRootNode: /
  zkhost: localhost:2181


For testing purposes, two shells are needed to start two different service instances. A service can be started with:

mvn spring-boot:run

After the startup, this output will be shown:

INFO 16336 --- [pool-2-thread-1]a.w.b.c.s.i.ClusterManagerService        : Process node created. Path: /testCluster1Election/p_0000000005
INFO 16336 --- [pool-2-thread-1]a.w.b.c.s.i.ClusterManagerService        : Going to master mode
INFO 16336 --- [pool-2-thread-1]a.w.b.c.s.i.ClusterManagerService        : Going to master mode ...ClusterManagerService{PROCESS_NODE_PREFIX='/p_', clusterName='testCluster1', leaderElectionRootNode='/testCluster1Election', processNodePath='/testCluster1Election/p_0000000005', watchedNodePath='null', zookeeperURL='localhost:2181', status=SLAVE}
INFO 16336 --- [pool-2-thread-1]a.w.b.c.service.example.TestService      : goToMasterMode
INFO 16336 --- [           main]a.w.b.c.service.example.TestService      : Doing some work ...
INFO 16336 --- [           main]a.w.b.c.service.example.TestService      : Doing some work ...
INFO 16336 --- [           main]a.w.b.c.service.example.TestService      : Doing some work ...
INFO 16336 --- [           main]a.w.b.c.service.example.TestService      : Doing some work ...

The service is up and running and doing some work in master mode. Now start another instance - the output will look like this:


INFO 16020 --- [pool-2-thread-1]a.w.b.c.s.i.ClusterManagerService        : Process node created. Path: /testCluster1Election/p_0000000007
INFO 16020 --- [pool-2-thread-1]a.w.b.c.s.i.ClusterManagerService        : Watching path: /testCluster1Election/p_0000000006
INFO 16020 --- [           main]o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
INFO 16020 --- [           main]at.willhaben.blog.cluster.service.Main   : Started Main in 2.724 seconds (JVM running for 3.923)
INFO 16020 --- [           main]a.w.b.c.service.example.TestService      : Doing nothing ...
INFO 16020 --- [           main]a.w.b.c.service.example.TestService      : Doing nothing ...
INFO 16020 --- [           main]a.w.b.c.service.example.TestService      : Doing nothing ...

Because it is in slave mode, nothing will be done. Now stop the first node and watch the output of instance 2:
INFO 16020 --- [ain-EventThread]a.w.b.c.s.i.ClusterManagerService        : Going to master mode ...ClusterManagerService{PROCESS_NODE_PREFIX='/p_', clusterName='testCluster1', leaderElectionRootNode='/testCluster1Election', processNodePath='/testCluster1Election/p_0000000007', watchedNodePath='/testCluster1Election/p_0000000006', zookeeperURL='localhost:2181', status=SLAVE}
INFO 16020 --- [ain-EventThread]a.w.b.c.service.example.TestService      : goToMasterMode
INFO 16020 --- [ain-EventThread]a.w.b.c.s.i.ClusterManagerService        : Going to master mode
INFO 16020 --- [ain-EventThread]a.w.b.c.s.i.ClusterManagerService        : Going to master mode ...ClusterManagerService{PROCESS_NODE_PREFIX='/p_', clusterName='testCluster1', leaderElectionRootNode='/testCluster1Election', processNodePath='/testCluster1Election/p_0000000007', watchedNodePath='/testCluster1Election/p_0000000006', zookeeperURL='localhost:2181', status=MASTER}
INFO 16020 --- [ain-EventThread]a.w.b.c.service.example.TestService      : goToMasterMode
INFO 16020 --- [           main]a.w.b.c.service.example.TestService      : Doing some work ...

It seamlessly takes over the work from instance one. At this point we have automatic leader election and fail over in place just by implementing one interface. It couldn't be simpler than that.
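
Under the hood, this fail over is simply the watcher on the predecessor node firing a NodeDeleted event. A rough sketch of how such a watcher could react (assuming a re-election method along the lines of the checkLeadership sketch above; all names are illustrative, not the actual implementation):

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

// Hypothetical sketch: the watcher registered on the predecessor node
// (via zk.exists(nodeToWatch, deleteWatcher) in the election sketch above).
public class PredecessorWatcher implements Watcher {

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
            // The watched node is gone because its owner's session ended
            // (the leader stopped, crashed or lost its connection).
            // Re-run the election check; if our own node now has the smallest
            // sequence number, goToMasterMode() is called on the services.
            reElect();
        }
    }

    // Placeholder for the re-election logic (see checkLeadership above).
    private void reElect() {
        // re-run steps 3 to 5 of the recipe and notify the ClusterServices
    }
}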

Summary

Dealing with issues in a distributed environment is difficult due to the many different errors that may occur on each level of the software and even the hardware stack. Using production-ready recipes in combination with proven infrastructure components like Zookeeper really facilitates the task of creating a robust distributed system. Apache Zookeeper provides a wide range of solutions for such systems, such as distributed locks, queues, barriers, etc. Using this existing infrastructure really simplifies creating distributed services.
