ZooKeeper Source Code Analysis: The Election Algorithm

Welcome to Eetal's twenty-fifth blog post – ZooKeeper source code analysis: the election algorithm

Entry Class

After starting ZooKeeper, running the jps command shows that the main class of the running process is QuorumPeerMain.

QuorumPeerMain's startup method reads and parses the configuration file named by the command-line arguments, then runs in standalone or cluster mode accordingly.
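For example, jps with the -l flag prints the fully qualified main class (the PID is illustrative):

$ jps -l
12345 org.apache.zookeeper.server.quorum.QuorumPeerMain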


protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    // In cluster mode, use runFromConfig; otherwise run via ZooKeeperServerMain.main
    if (args.length == 1 && config.isDistributed()) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}
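In practice args[0] is the path to the configuration file; the zkServer.sh wrapper script boils down to an invocation of roughly this shape (classpath and JVM flags omitted, path illustrative):

$ java -cp "$CLASSPATH" org.apache.zookeeper.server.quorum.QuorumPeerMain /path/to/zoo.cfg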

Cluster Mode

runFromConfig takes the parsed configuration, binds it to the quorumPeer member, and loads every property onto it.
The line quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false) installs a QuorumVerifier that holds the information objects for every member of the ensemble parsed from the configuration file.
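Those members come from the server.N lines of the configuration; a minimal illustrative zoo.cfg for a three-node ensemble (hostnames assumed) would be:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888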

public void runFromConfig(QuorumPeerConfig config)
        throws IOException, AdminServerException
{
    ......

    quorumPeer = getQuorumPeer();
    quorumPeer.setTxnFactory(new FileTxnSnapLog(
            config.getDataLogDir(),
            config.getDataDir()));
    quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
    quorumPeer.enableLocalSessionsUpgrading(
            config.isLocalSessionsUpgradingEnabled());
    //quorumPeer.setQuorumPeers(config.getAllMembers());
    quorumPeer.setElectionType(config.getElectionAlg());
    quorumPeer.setMyid(config.getServerId());
    quorumPeer.setTickTime(config.getTickTime());
    quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
    quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
    quorumPeer.setInitLimit(config.getInitLimit());
    quorumPeer.setSyncLimit(config.getSyncLimit());
    quorumPeer.setConfigFileName(config.getConfigFilename());
    quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
    quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
    if (config.getLastSeenQuorumVerifier() != null) {
        quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
    }
    quorumPeer.initConfigInZKDatabase();
    quorumPeer.setCnxnFactory(cnxnFactory);
    quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
    quorumPeer.setSslQuorum(config.isSslQuorum());
    quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
    quorumPeer.setLearnerType(config.getPeerType());
    quorumPeer.setSyncEnabled(config.getSyncEnabled());
    quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
    if (config.sslQuorumReloadCertFiles) {
        quorumPeer.getX509Util().enableCertFileReloading();
    }

    // sets quorum sasl authentication configurations
    quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
    if (quorumPeer.isQuorumSaslAuthEnabled()) {
        quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
        quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
        quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
        quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
        quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
    }
    quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
    quorumPeer.initialize();

    quorumPeer.start();
    quorumPeer.join();
    ......
}

public interface QuorumVerifier {
    long getWeight(long id);
    boolean containsQuorum(Set<Long> set);
    long getVersion();
    void setVersion(long ver);
    Map<Long, QuorumServer> getAllMembers();       // all ensemble members
    Map<Long, QuorumServer> getVotingMembers();    // members that participate in voting
    Map<Long, QuorumServer> getObservingMembers(); // observer members
    boolean equals(Object o);
    String toString();
}
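To make containsQuorum concrete, here is a hedged fragment showing how election code can consult the installed verifier (the verifier instance and sids are assumptions, java.util imports implied; see QuorumMaj at the end of this post for the majority implementation):

// Sketch: ask the installed verifier whether a set of acknowledging
// server ids already forms a quorum.
QuorumVerifier qv = self.getQuorumVerifier();              // e.g. a QuorumMaj over 5 voting members
Set<Long> acks = new HashSet<>(Arrays.asList(1L, 2L, 3L)); // sids that voted the same way
if (qv.containsQuorum(acks)) {
    // a majority of the voting members agree
}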

Next, start is called to run the QuorumPeer thread member.
start is overridden, so some preparation happens before the thread begins running:

@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        // getView() reads the QuorumVerifier built from the parsed config and
        // returns the id/address information of every machine in the ensemble;
        // this checks that the local machine is actually part of the ensemble
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();        // restore in-memory state from the data/log files
    ......
    startLeaderElection(); // kick off leader election
    super.start();         // Thread.start(): begin executing run()
}

When the election starts, a node that has just joined the ensemble is in the LOOKING state, and its first vote goes to itself.

public class QuorumPeerConfig {
    protected int electionAlg = 3;
}

synchronized public void startLeaderElection() {
    ......
    if (getPeerState() == ServerState.LOOKING) {
        // vote for ourselves
        currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    }
    ......
    this.electionAlg = createElectionAlgorithm(electionType); // pick the election algorithm -- defaults to 3 if not configured
}

protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

ZooKeeper relies on JMX for cluster operations: it wraps its own quorumPeer thread in a QuorumBean and registers it with JMX through MBeanRegistry.
MBeanRegistry is ZooKeeper's wrapper around MBeanServer operations; its main job is generating ObjectNames automatically.
A parent may be specified at registration time, and the ObjectName is derived from that parent's path.
For source-level analysis of RMI and MBeans, see my other two posts:
1. java-rmi source analysis
2. MBeans and JMX
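As background, everything MBeanRegistry does ultimately lands on the platform MBeanServer. A minimal self-contained JMX sketch (the Demo bean is hypothetical, and the exact ObjectName is illustrative, though ZooKeeper's names do live in the org.apache.ZooKeeperService domain):

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxSketch {
    // Trivial MBean pair, following the <Name>/<Name>MBean naming convention.
    public interface DemoMBean { int getValue(); }
    public static class Demo implements DemoMBean {
        public int getValue() { return 42; }
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // MBeanRegistry derives hierarchical names of roughly this shape
        // from the parent's registered path:
        ObjectName name = new ObjectName(
                "org.apache.ZooKeeperService:name0=ReplicatedServer_id1");
        server.registerMBean(new Demo(), name);
    }
}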

public class MBeanRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(MBeanRegistry.class);

    private static volatile MBeanRegistry instance = new MBeanRegistry();

    private final Object LOCK = new Object();

    private Map<ZKMBeanInfo, String> mapBean2Path =
        new ConcurrentHashMap<ZKMBeanInfo, String>();

    private MBeanServer mBeanServer;

    /**
     * Useful for unit tests. Change the MBeanRegistry instance
     *
     * @param instance new instance
     */
    public static void setInstance(MBeanRegistry instance) {
        MBeanRegistry.instance = instance;
    }

    public static MBeanRegistry getInstance() {
        return instance;
    }

    public MBeanRegistry () {
        try {
            mBeanServer = ManagementFactory.getPlatformMBeanServer();
        } catch (Error e) {
            // Account for running within IKVM and create a new MBeanServer
            // if the PlatformMBeanServer does not exist.
            mBeanServer = MBeanServerFactory.createMBeanServer();
        }
    }

    /**
     * Return the underlying MBeanServer that is being
     * used to register MBean's. The returned MBeanServer
     * may be a new empty MBeanServer if running through IKVM.
     */
    public MBeanServer getPlatformMBeanServer() {
        return mBeanServer;
    }

    /**
     * Registers a new MBean with the platform MBean server.
     * @param bean the bean being registered
     * @param parent if not null, the new bean will be registered as a child
     * node of this parent.
     */
    public void register(ZKMBeanInfo bean, ZKMBeanInfo parent)
        throws JMException
    {
        assert bean != null;
        String path = null;
        if (parent != null) {
            path = mapBean2Path.get(parent);
            assert path != null;
        }
        path = makeFullPath(path, parent);
        if (bean.isHidden())
            return;
        ObjectName oname = makeObjectName(path, bean);
        try {
            synchronized (LOCK) {
                mBeanServer.registerMBean(bean, oname);
                mapBean2Path.put(bean, path);
            }
        } catch (JMException e) {
            LOG.warn("Failed to register MBean " + bean.getName());
            throw e;
        }
    }

Details of the run method: it first registers the local jmxQuorumBean with JMX; then, for every remote node read from the configuration, it wraps that node's information and registers it with JMX using the local jmxQuorumBean as parent.
After that, because the state is LOOKING, it enters the election method:
setCurrentVote(makeLEStrategy().lookForLeader());

@Override
public void run() {
    updateThreadName();

    jmxQuorumBean = new QuorumBean(this);
    MBeanRegistry.getInstance().register(jmxQuorumBean, null);
    for (QuorumServer s : getView().values()) {
        ZKMBeanInfo p;
        if (getId() == s.id) {
            p = jmxLocalPeerBean = new LocalPeerBean(this);
            try {
                MBeanRegistry.getInstance().register(p, jmxQuorumBean);
            } catch (Exception e) {
                LOG.warn("Failed to register with JMX", e);
                jmxLocalPeerBean = null;
            }
        } else {
            RemotePeerBean rBean = new RemotePeerBean(this, s);
            try {
                MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
                jmxRemotePeerBean.put(s.id, rBean);
            } catch (Exception e) {
                LOG.warn("Failed to register with JMX", e);
            }
        }
    }

    /*
     * Main loop
     */
    while (running) {
        switch (getPeerState()) {
        case LOOKING:
            LOG.info("LOOKING");

            if (Boolean.getBoolean("readonlymode.enabled")) {
                LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                // Create read-only server but don't start it immediately
                final ReadOnlyZooKeeperServer roZk =
                    new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

                // Instead of starting roZk immediately, wait some grace
                // period before we decide we're partitioned.
                //
                // Thread is used here because otherwise it would require
                // changes in each of election strategy classes which is
                // unnecessary code coupling.
                Thread roZkMgr = new Thread() {
                    public void run() {
                        try {
                            // lower-bound grace period to 2 secs
                            sleep(Math.max(2000, tickTime));
                            if (ServerState.LOOKING.equals(getPeerState())) {
                                roZk.startup();
                            }
                        } catch (InterruptedException e) {
                            LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                        } catch (Exception e) {
                            LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                        }
                    }
                };
                try {
                    roZkMgr.start();
                    reconfigFlagClear();
                    if (shuttingDownLE) {
                        shuttingDownLE = false;
                        startLeaderElection();
                    }
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                    setPeerState(ServerState.LOOKING);
                } finally {
                    // If the thread is in the grace period, interrupt
                    // to come out of waiting.
                    roZkMgr.interrupt();
                    roZk.shutdown();
                }
            } else {
                try {
                    reconfigFlagClear();
                    if (shuttingDownLE) {
                        shuttingDownLE = false;
                        startLeaderElection();
                    }
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                    setPeerState(ServerState.LOOKING);
                }
            }
            break;
        .....
        }
    ......
}

From the analysis above, the default election strategy is FastLeaderElection.
Constructing a FastLeaderElection instantiates a Messenger.
The Messenger creates two threads, one for receiving messages and one for sending them.
When the election algorithm was selected earlier, start was called on the newly built FastLeaderElection, which starts those two worker threads so sending and receiving begin.

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}

LinkedBlockingQueue<ToSend> sendqueue;
LinkedBlockingQueue<Notification> recvqueue;

/**
 * This method is invoked by the constructor. Because it is a
 * part of the starting procedure of the object that must be on
 * any constructor of this class, it is probably best to keep as
 * a separate method. As we have a single constructor currently,
 * it is not strictly necessary to have it separate.
 *
 * @param self QuorumPeer that created this object
 * @param manager Connection manager
 */
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

Messenger(QuorumCnxManager manager) {

    this.ws = new WorkerSender(manager);

    this.wsThread = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);

    this.wr = new WorkerReceiver(manager);

    this.wrThread = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}

/**
 * Starts instances of WorkerSender and WorkerReceiver
 */
void start() {
    this.wsThread.start();
    this.wrThread.start();
}

The strategy scans incoming notifications. If a notification's election epoch is greater than the current one, the local epoch is updated and all received votes are cleared; the notification's vote is then compared against our own proposal, and if the other side wins, our vote switches to the leader it proposes; otherwise we keep voting for ourselves (the initial myid).
The comparison is (the epochs here are peerEpoch):

((newEpoch > curEpoch) ||
 ((newEpoch == curEpoch) &&
  ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))))

A vote wins if:
1. its epoch is greater than the current epoch, or
2. the epochs are equal and its zxid is larger, or the zxids are equal and its id is larger (the zxid is the transaction id, so a larger zxid means that peer has processed more leader transactions).
When no new vote messages arrive, the peer keeps re-sending its own vote (sendNotification only enqueues the message onto a blocking queue; the actual sending is done by the WorkerSender thread, which loops pulling messages off the queue, and receiving is handled by the corresponding WorkerReceiver thread).
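For reference, this comparison is implemented by FastLeaderElection.totalOrderPredicate, which in current versions also rejects servers whose voting weight is 0:

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                      long curId, long curZxid, long curEpoch) {
    // A server with voting weight 0 can never win the comparison.
    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }
    // Compare by peer epoch first, then zxid, then server id.
    return ((newEpoch > curEpoch)
            || ((newEpoch == curEpoch)
                && ((newZxid > curZxid)
                    || ((newZxid == curZxid) && (newId > curId)))));
}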

public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(
                self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }
    if (self.start_fle == 0) {
        self.start_fle = Time.currentElapsedTime();
    }
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = finalizeWait;

        synchronized (this) {
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id = " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        /*
         * Loop in which we exchange notifications until we find a leader
         */

        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout,
                    TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    // everything already delivered: send our vote once more
                    sendNotifications();
                } else {
                    // not everything delivered: connections may have failed,
                    // reconnect to every machine in the ensemble
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout * 2;
                notTimeout = (tmpTimeOut < maxNotificationInterval ?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }
            else if (validVoter(n.sid) && validVoter(n.leader)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view for a replica in the current or next voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    // don't care about the version if it's in LOOKING state
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {

                        // Verify if there is any change in the proposed leader
                        while ((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING : learningState());
                            Vote endVote = new Vote(proposedLeader,
                                    proposedZxid, logicalclock.get(),
                                    proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                ......
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(
                        self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}",
                manager.getConnectionThreadCount());
    }
}

After voting, the peer checks whether the election has concluded; once there is a definite result, it sets its own state and leaves the election:

if (termPredicate(recvset,
        new Vote(proposedLeader, proposedZxid,
                logicalclock.get(), proposedEpoch))) {

    // Verify if there is any change in the proposed leader
    while ((n = recvqueue.poll(finalizeWait,
            TimeUnit.MILLISECONDS)) != null) {
        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                proposedLeader, proposedZxid, proposedEpoch)) {
            recvqueue.put(n);
            break;
        }
    }

    /*
     * This predicate is true once we don't read any new
     * relevant message from the reception queue
     */
    if (n == null) {
        self.setPeerState((proposedLeader == self.getId()) ?
                ServerState.LEADING : learningState());
        Vote endVote = new Vote(proposedLeader,
                proposedZxid, logicalclock.get(),
                proposedEpoch);
        leaveInstance(endVote);
        return endVote;
    }
}


protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self
                    .getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    /*
     * First make the views consistent. Sometimes peers will have different
     * zxids for a server depending on timing.
     */
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())) {
            voteSet.addAck(entry.getKey());
        }
    }

    return voteSet.hasAllQuorums();
}

SyncedLearnerTracker decides the outcome of the vote like this:

public boolean hasAllQuorums() {
    for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
        if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))
            return false;
    }
    return true;
}
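Using only the methods visible in termPredicate above, the tracker's behavior can be sketched like this (a fragment; the installed verifier is assumed to be a QuorumMaj over five voting members):

// Sketch: tracking election acks against the configured quorum verifier.
SyncedLearnerTracker tracker = new SyncedLearnerTracker();
tracker.addQuorumVerifier(self.getQuorumVerifier()); // e.g. QuorumMaj, 5 voters
tracker.addAck(1L);
tracker.addAck(2L);
tracker.hasAllQuorums(); // false: 2 acks is not > 5/2
tracker.addAck(3L);
tracker.hasAllQuorums(); // true: 3 acks is a majority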

The QuorumVerifier itself is instantiated while QuorumPeerConfig parses the configuration file: QuorumHierarchical is used when weights are configured, QuorumMaj otherwise.

void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
        throws IOException, ConfigException {
    quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
    setupMyId();
    setupClientPort();
    setupPeerType();
    checkValidity();
}

public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings,
        boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
    boolean isHierarchical = false;
    for (Entry<Object, Object> entry : dynamicConfigProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        if (key.startsWith("group") || key.startsWith("weight")) {
            isHierarchical = true;
        } else if (!configBackwardCompatibilityMode && !key.startsWith("server.") && !key.equals("version")) {
            LOG.info(dynamicConfigProp.toString());
            throw new ConfigException("Unrecognised parameter: " + key);
        }
    }

    QuorumVerifier qv = createQuorumVerifier(dynamicConfigProp, isHierarchical);

    int numParticipators = qv.getVotingMembers().size();
    int numObservers = qv.getObservingMembers().size();
    if (numParticipators == 0) {
        if (!standaloneEnabled) {
            throw new IllegalArgumentException("standaloneEnabled = false then " +
                    "number of participants should be >0");
        }
        if (numObservers > 0) {
            throw new IllegalArgumentException("Observers w/o participants is an invalid configuration");
        }
    } else if (numParticipators == 1 && standaloneEnabled) {
        // HBase currently adds a single server line to the config, for
        // b/w compatibility reasons we need to keep this here. If standaloneEnabled
        // is true, the QuorumPeerMain script will create a standalone server instead
        // of a quorum configuration
        LOG.error("Invalid configuration, only one server specified (ignoring)");
        if (numObservers > 0) {
            throw new IllegalArgumentException("Observers w/o quorum is an invalid configuration");
        }
    } else {
        if (warnings) {
            if (numParticipators <= 2) {
                LOG.warn("No server failure will be tolerated. " +
                        "You need at least 3 servers.");
            } else if (numParticipators % 2 == 0) {
                LOG.warn("Non-optimial configuration, consider an odd number of servers.");
            }
        }
        /*
         * If using FLE, then every server requires a separate election
         * port.
         */
        if (eAlg != 0) {
            for (QuorumServer s : qv.getVotingMembers().values()) {
                if (s.electionAddr == null)
                    throw new IllegalArgumentException(
                            "Missing election port for server: " + s.id);
            }
        }
    }
    return qv;
}

private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException {
    if (isHierarchical) {
        return new QuorumHierarchical(dynamicConfigProp);
    } else {
        /*
         * The default QuorumVerifier is QuorumMaj
         */
        //LOG.info("Defaulting to majority quorums");
        return new QuorumMaj(dynamicConfigProp);
    }
}

A quick look at QuorumMaj: its criterion for leadership is simply that the vote count exceeds half of the voting members.

public QuorumMaj(Properties props) throws ConfigException {
    for (Entry<Object, Object> entry : props.entrySet()) {
        String key = entry.getKey().toString();
        String value = entry.getValue().toString();

        if (key.startsWith("server.")) {
            int dot = key.indexOf('.');
            long sid = Long.parseLong(key.substring(dot + 1));
            QuorumServer qs = new QuorumServer(sid, value);
            allMembers.put(Long.valueOf(sid), qs);
            if (qs.type == LearnerType.PARTICIPANT)
                votingMembers.put(Long.valueOf(sid), qs);
            else {
                observingMembers.put(Long.valueOf(sid), qs);
            }
        } else if (key.equals("version")) {
            version = Long.parseLong(value, 16);
        }
    }
    half = votingMembers.size() / 2;
}

public boolean containsQuorum(Set<Long> ackSet) {
    return (ackSet.size() > half);
}
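For example, with five voting members half = 5 / 2 = 2, so a candidate needs at least three acks. A hedged usage sketch (hostnames illustrative, java.util imports implied):

Properties props = new Properties();
props.setProperty("server.1", "zk1.example.com:2888:3888");
props.setProperty("server.2", "zk2.example.com:2888:3888");
props.setProperty("server.3", "zk3.example.com:2888:3888");
props.setProperty("server.4", "zk4.example.com:2888:3888");
props.setProperty("server.5", "zk5.example.com:2888:3888");
QuorumMaj qv = new QuorumMaj(props); // half = 5 / 2 = 2

qv.containsQuorum(new HashSet<>(Arrays.asList(1L, 2L)));     // false: 2 > 2 fails
qv.containsQuorum(new HashSet<>(Arrays.asList(1L, 2L, 3L))); // true:  3 > 2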

More Content

Please visit

my personal homepage: yangyitao.top