zookeeper选举

如何在zookeeper集群中选举出一个leader,zookeeper使用了三种算法,具体使用哪种算法,在配置文件中是可以配置的,对应的配置项是”electionAlg”,其中1对应的是LeaderElection算法,2对应的是AuthFastLeaderElection算法,3对应的是FastLeaderElection算法.默认使用FastLeaderElection算法.其他两种算法我没有研究过,就不多说了.

要理解这个算法,最好需要一些paxos算法的理论基础.

1) 数据恢复阶段
首先,每个在zookeeper服务器先读取当前保存在磁盘的数据,zookeeper中的每份数据,都有一个对应的id值,这个值是依次递增的,换言之,越新的数据,对应的ID值就越大.

2) 首次发送自己的投票值
在读取数据完毕之后,每个zookeeper服务器发送自己选举的leader,这个协议中包含了以下几部分的数据:
1)所选举leader的id(就是配置文件中写好的每个服务器的id) ,在初始阶段,每台服务器的这个值都是自己服务器的id,也就是它们都选举自己为leader.
2) 服务器最大数据的id,这个值大的服务器,说明存放了更新的数据.
3)逻辑时钟的值,这个值从0开始递增,每次选举对应一个值,也就是说:如果在同一次选举中,那么这个值应该是一致的 2)逻辑时钟值越大,说明这一次选举leader的进程更新.
4) 本机在当前选举过程中的状态,有以下几种:LOOKING,FOLLOWING,OBSERVING,LEADING,顾名思义不必解释了吧.

每台服务器将自己服务器的以上数据发送到集群中的其他服务器之后,同样的也需要接收来自其他服务器的数据,它将做以下的处理:

1) 如果所接收数据服务器的状态还是在选举阶段(LOOKING 状态),那么首先判断逻辑时钟值,又分为以下三种情况:
a) 如果发送过来的逻辑时钟大于目前的逻辑时钟,那么说明这是更新的一次选举,此时需要更新一下本机的逻辑时钟值,同时将之前收集到的来自其他服务器的选举清空,因为这些数据已经不再有效了.然后判断是否需要更新当前自己的选举情况.在这里是根据选举leader id,保存的最大数据id来进行判断的,这两种数据之间对这个选举结果的影响的权重关系是:首先看数据id,数据id大者胜出;其次再判断leader id,leader id大者胜出.然后再将自身最新的选举结果(也就是上面提到的三种数据广播给其他服务器).代码如下:

1
2
3
4
5
6
7
8
9
if (n.epoch > logicalclock) {  
logicalclock = n.epoch;
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid,getInitId(), getInitLastLoggedZxid()))
updateProposal(n.leader, n.zxid);
else
updateProposal(getInitId(),getInitLastLoggedZxid());

sendNotifications();

其中的totalOrderPredicate函数就是根据发送过来的封包中的leader id,数据id来与本机保存的相应数据进行判断的函数,返回true说明需要更新数据,于是调用updateProposal函数更新数据
b) 发送过来数据的逻辑时钟小于本机的逻辑时钟
说明对方在一个相对较早的选举进程中,这里只需要将本机的数据发送过去就是了

c) 两边的逻辑时钟相同,此时也只是调用totalOrderPredicate函数判断是否需要更新本机的数据,如果更新了再将自己最新的选举结果广播出去就是了.

实际上,在处理选票之前,还有一个预处理的动作,它发生在刚刚接收到关于vote的message的时候,具体过程如下:

1.判断message的来源是不是observer,如果是,则告诉该observer我当前认为的Leader的信息,否则进入2
2.判断message是不是vote信息,是则进入3
3.根据message创建一张vote
4.如果当前server处理LOOKING状态,将vote放入自己的投票箱,而且如果vote源server处于LOOKING状态同时vote源server的选举时旧的,则当前server通知它新的一轮投票;
5如果当前server不处于LOOKING状态而vote源server处理LOOKING状态,则当前server告诉它当前的Leader信息。

三种情况的处理完毕之后,再处理两种情况:

1)服务器判断是不是已经收集到了所有服务器的选举状态,如果是那么根据选举结果设置自己的角色(FOLLOWING还是LEADER),然后退出选举过程就是了.

2)即使没有收集到所有服务器的选举状态,也可以判断一下根据以上过程之后最新的选举leader是不是得到了超过半数以上服务器的支持,如果是,那么尝试在200ms内接收一下数据,如果没有新的数据到来,说明大家都已经默认了这个结果,同样也设置角色退出选举过程.
代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
if(self.getVotingView().containsKey(n.sid)){  
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));

//If have received from all nodes, then terminate
if((self.getVotingView().size() == recvset.size()) && (self.getQuorumVerifier().getWeight(proposedLeader) != 0)){
self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());
leaveInstance();
return new Vote(proposedLeader, proposedZxid);

} else if (termPredicate(recvset,new Vote(proposedLeader, proposedZxid,logicalclock))) {

// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid,proposedLeader, proposedZxid)){
recvqueue.put(n);
break;
}
}

/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());
if(LOG.isDebugEnabled()){
LOG.debug("About to leave FLE instance: Leader= " + proposedLeader + ", Zxid = " + proposedZxid + ", My id = " + self.getId() + ", My state = " + self.getPeerState());
}

leaveInstance();
return new Vote(proposedLeader,proposedZxid);
}
}
}

2) 如果所接收服务器不在选举状态,也就是在FOLLOWING或者LEADING状态
做以下两个判断:
a) 如果逻辑时钟相同,将该数据保存到recvset,如果所接收服务器宣称自己是leader,那么将判断是不是有半数以上的服务器选举它,如果是则设置选举状态退出选举过程
b) 否则这是一条与当前逻辑时钟不符合的消息,那么说明在另一个选举过程中已经有了选举结果,于是将该选举结果加入到outofelection集合中,再根据outofelection来判断是否可以结束选举,如果可以也是保存逻辑时钟,设置选举状态,退出选举过程.
代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
if(n.epoch == logicalclock){  
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
if((n.state == ServerState.LEADING) || (termPredicate(recvset, new Vote(n.leader,n.zxid, n.epoch, n.state))&& checkLeader(outofelection, n.leader, n.epoch)) ){
self.setPeerState((n.leader == self.getId()) ?ServerState.LEADING: learningState());
leaveInstance();
return new Vote(n.leader, n.zxid);
}
}

outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, n.state));

if(termPredicate(outofelection, new Vote(n.leader,n.zxid, n.epoch, n.state))&& checkLeader(outofelection, n.leader, n.epoch)) {
synchronized(this){
logicalclock = n.epoch;
self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState());
}
leaveInstance();
return new Vote(n.leader, n.zxid);
}

break;
}
}

以一个简单的例子来说明整个选举的过程.
假设有五台服务器组成的zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的.假设这些服务器依序启动,来看看会发生什么.

1
2
3
4
5
1) 服务器1启动,此时只有它一台服务器启动了,它发出去的报没有任何响应,所以它的选举状态一直是LOOKING状态  
2) 服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1,2还是继续保持LOOKING状态.
3) 服务器3启动,根据前面的理论分析,服务器3成为服务器1,2,3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的leader.
4) 服务器4启动,根据前面的分析,理论上服务器4应该是服务器1,2,3,4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接收当小弟的命了.
5) 服务器5启动,同4一样,当小弟.

送过来的逻辑时钟大于目前的逻辑时钟,那么说明这是更新的一次选举,此时需要更新一下本机的逻辑时钟值,同时将之前收集到的来自其他服务器的选举清空,因为这些数据已经不再有效了.然后判断是否需要更新当前自己的选举情况.在这里是根据选举leader id,保存的最大数据id来进行判断的,这两种数据之间对这个选举结果的影响的权重关系是:首先看数据id,数据id大者胜出;其次再判断leader id,leader id大者胜出.然后再将自身最新的选举结果(也就是上面提到的三种数据广播给其他服务器).代码如下:

leader选举

在分布式计算中, leader election是很重要的一个功能, 这个选举过程是这样子的: 指派一个进程作为组织者,将任务分发给各节点。 在任务开始前, 哪个节点都不知道谁是leader或者coordinator. 当选举算法开始执行后, 每个节点最终会得到一个唯一的节点作为任务leader.
除此之外, 选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来。

Curator 有两种选举recipe, 你可以根据你的需求选择合适的。

Leader latch

1
2
public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath, String id)

必须启动LeaderLatch: leaderLatch.start();

一旦启动, LeaderLatch会和其它使用相同latch path的其它LeaderLatch交涉,然后随机的选择其中一个作为leader。 你可以随时查看一个给定的实例是否是leader:

public boolean hasLeadership()

类似JDK的CountDownLatch, LeaderLatch在请求成为leadership时有block方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void await()
throws InterruptedException,
EOFException
Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.
public boolean await(long timeout,
TimeUnit unit)
throws InterruptedException
```

一旦不使用LeaderLatch了,必须调用close方法。 如果它是leader,会释放leadership, 其它的参与者将会选举一个leader。

异常处理
LeaderLatch实例可以增加ConnectionStateListener来监听网络连接问题。 当 SUSPENDED 或 LOST 时, leader不再认为自己还是leader.当LOST 连接重连后 RECONNECTED,LeaderLatch会删除先前的ZNode然后重新创建一个.
LeaderLatch用户必须考虑导致leadershi丢失的连接问题。 强烈推荐你使用ConnectionStateListener。

下面看例子:

package com.colobu.zkrecipe.leaderelection;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import com.google.common.collect.Lists;
public class LeaderLatchExample {
private static final int CLIENT_QTY = 10;
private static final String PATH = “/examples/leader”;
public static void main(String[] args) throws Exception {
List clients = Lists.newArrayList();
List examples = Lists.newArrayList();
TestingServer server = new TestingServer();
try {
for (int i = 0; i < CLIENT_QTY; ++i) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
clients.add(client);
LeaderLatch example = new LeaderLatch(client, PATH, “Client #” + i);
examples.add(example);
client.start();
example.start();
}
Thread.sleep(20000);
LeaderLatch currentLeader = null;
for (int i = 0; i < CLIENT_QTY; ++i) {
LeaderLatch example = examples.get(i);
if (example.hasLeadership())
currentLeader = example;
}
System.out.println(“current leader is “ + currentLeader.getId());
System.out.println(“release the leader “ + currentLeader.getId());
currentLeader.close();
examples.get(0).await(2, TimeUnit.SECONDS);
System.out.println(“Client #0 maybe is elected as the leader or not although it want to be”);
System.out.println(“the new leader is “ + examples.get(0).getLeader().getId());

        System.out.println("Press enter/return to quit\n");
        new BufferedReader(new InputStreamReader(System.in)).readLine();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        System.out.println("Shutting down...");
        for (LeaderLatch exampleClient : examples) {
            CloseableUtils.closeQuietly(exampleClient);
        }
        for (CuratorFramework client : clients) {
            CloseableUtils.closeQuietly(client);
        }
        CloseableUtils.closeQuietly(server);
    }
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

首先我们创建了10个LeaderLatch,启动后它们中的一个会被选举为leader。 因为选举会花费一些时间,start后并不能马上就得到leader。
通过hasLeadership查看自己是否是leader, 如果是的话返回true。
可以通过.getLeader().getId()可以得到当前的leader的ID。
只能通过close释放当前的领导权。
await是一个阻塞方法, 尝试获取leader地位,但是未必能上位。

Leader Election

Curator还提供了另外一种选举方法。
注意涉及以下四个类:

LeaderSelector
LeaderSelectorListener
LeaderSelectorListenerAdapter
CancelLeadershipException
重要的是LeaderSelector类,它的构造函数为:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

1
2
3
4
5
6
7
8
9
10
11
12
13

类似LeaderLatch,必须start: leaderSelector.start();

一旦启动,当实例取得领导权时你的listener的takeLeadership()方法被调用. 而takeLeadership()方法只有领导权被释放时才返回。
当你不再使用LeaderSelector实例时,应该调用它的close方法。

异常处理
LeaderSelectorListener类继承ConnectionStateListener.LeaderSelector必须小心连接状态的改变. 如果实例成为leader, 它应该相应SUSPENDED 或 LOST. 当 SUSPENDED 状态出现时, 实例必须假定在重新连接成功之前它可能不再是leader了。 如果LOST状态出现, 实例不再是leader, takeLeadership方法返回.

重要: 推荐处理方式是当收到SUSPENDED 或 LOST时抛出CancelLeadershipException异常. 这会导致LeaderSelector实例中断并取消执行takeLeadership方法的异常. 这非常重要, 你必须考虑扩展LeaderSelectorListenerAdapter. LeaderSelectorListenerAdapter提供了推荐的处理逻辑。

这个例子摘自官方。
首先创建一个ExampleClient类, 它继承LeaderSelectorListenerAdapter, 它实现了takeLeadership方法:

package com.colobu.zkrecipe.leaderelection;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ExampleClient extends LeaderSelectorListenerAdapter implements Closeable {
private final String name;
private final LeaderSelector leaderSelector;
private final AtomicInteger leaderCount = new AtomicInteger();
public ExampleClient(CuratorFramework client, String path, String name) {
this.name = name;
leaderSelector = new LeaderSelector(client, path, this);
leaderSelector.autoRequeue();
}
public void start() throws IOException {
leaderSelector.start();
}
@Override
public void close() throws IOException {
leaderSelector.close();
}

@Override
public void takeLeadership(CuratorFramework client) throws Exception {
    final int waitSeconds = (int) (5 * Math.random()) + 1;
    System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
    System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
    try {
        Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
    } catch (InterruptedException e) {
        System.err.println(name + " was interrupted.");
        Thread.currentThread().interrupt();
    } finally {
        System.out.println(name + " relinquishing leadership.\n");
    }
}

}

1
2
3
4
5
6
7


你可以在takeLeadership进行任务的分配等等,并且不要返回,如果你想要要此实例一直是leader的话可以加一个死循环。
leaderSelector.autoRequeue();保证在此实例释放领导权之后还可能获得领导权。
在这里我们使用AtomicInteger来记录此client获得领导权的次数, 它是"fair", 每个client有平等的机会获得领导权。

测试代码:

package com.colobu.zkrecipe.leaderelection;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import com.google.common.collect.Lists;
public class LeaderSelectorExample {
private static final int CLIENT_QTY = 10;
private static final String PATH = “/examples/leader”;
public static void main(String[] args) throws Exception {
List clients = Lists.newArrayList();
List examples = Lists.newArrayList();
TestingServer server = new TestingServer();
try {
for (int i = 0; i < CLIENT_QTY; ++i) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
clients.add(client);
ExampleClient example = new ExampleClient(client, PATH, “Client #” + i);
examples.add(example);
client.start();
example.start();
}

        System.out.println("Press enter/return to quit\n");
        new BufferedReader(new InputStreamReader(System.in)).readLine();
    } finally {
        System.out.println("Shutting down...");
        for (ExampleClient exampleClient : examples) {
            CloseableUtils.closeQuietly(exampleClient);
        }
        for (CuratorFramework client : clients) {
            CloseableUtils.closeQuietly(client);
        }
        CloseableUtils.closeQuietly(server);
    }
}

}

```

与LeaderLatch, 通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。 而LeaderLatch一根筋到死, 除非调用close方法,否则它不会释放领导权。

zhang dong wechat
关注我的微信来交流技术问题吧!