基于zookeeper实现分布式锁

欢迎查看Eetal的第二十七篇博客–基于zookeeper实现分布式锁

分布式锁设计

zookeeper的临时节点在会话断开后,则会自动删除,这个特性使其可以非常好的应用在分布式锁
而如果直接使用某一个path的临时节点作为分布式锁,会导致”惊群效应“(当锁每次释放都会唤醒全部竞争节点,而实际只能有一个竞争到)
所以结合有序节点进行优化,对于某一个特定的节点RootNode创建临时有序子节点
当一个客户端要获取锁时,就会新增一个临时有序子节点
在尝试获取锁时进行判断,查看RootNode当前的所有子节点信息,判断自己是否是最小的子节点(因为zookeeper的有序节点会从1递增)
如果是最小的子节点,则获得锁,在释放锁时,删除自己创建的这个临时有序节点
如果不是最小节点,代表还有别人在占用锁,则注册一个事件监听子节点中节点序号刚好小于自己的节点的删除事件,当监听到事件时,则代表获取锁成功
因为是临时节点,保证了出现某一台客户端占据锁但是断开连接仍然会删除其创建的临时节点
通过这个设计避免了”惊群效应“,某一个锁的释放只会触发刚好晚于他创建临时有序节点的的下一个等待锁的客户端的事件,由其获得锁

demo源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* 基于zk实现分布式锁
*/
public class DistributedLock implements Watcher {
String ROOT_LOCK;//作为锁的命名空间标识
String WAIT_LOCK;//正在等待的锁的路径
String CURRENT_LOCK;//当前请求对应的锁
ZooKeeper zk;
CountDownLatch countDownLatch;

public DistributedLock(ZooKeeper zk,String ROOT_LOCK)throws Exception{
this.zk = zk;
this.ROOT_LOCK = ROOT_LOCK;
if(zk.exists(ROOT_LOCK,false) == null){
throw new Exception("RootLock node not exists!");
}
}

public boolean tryLock() {
try {
CURRENT_LOCK = zk.create(ROOT_LOCK+"/","0".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> children = zk.getChildren(ROOT_LOCK,false);
SortedSet<String> sortChildren = new TreeSet<String>();
sortChildren.addAll(children);
//获得排序小于CURRENT_LOCK的childNode的path
SortedSet<String> lessThanMe = sortChildren.headSet(CURRENT_LOCK);
if(lessThanMe.isEmpty())
return true;
else{
//存在小于自己的节点,获取失败
WAIT_LOCK = lessThanMe.last();
return false;
}
} catch (Exception e){
e.printStackTrace();
}
return false;
}

public void lock() {
while(!tryLock()){
//出现异常则继续tryLock
try {
if(zk.exists(WAIT_LOCK,this)!=null){
countDownLatch = new CountDownLatch(1);
countDownLatch.await();
//获得锁,结束
return;
}
} catch (Exception e){
e.printStackTrace();
}
}
}

public void unlock() {
try {
zk.delete(CURRENT_LOCK,-1);
} catch (Exception e){
e.printStackTrace();
}
}

public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.NodeDeleted)
countDownLatch.countDown();
}

}

第三方工具也有提供基于zookeeper实现的更多分布式锁的设计,如curator的recipes工程

更多精彩内容

请移步

个人主页: yangyitao.top

zookeeper源码分析-事件注册及通信原理

欢迎查看Eetal的第二十六篇博客–zookeeper源码分析-事件注册及通信原理

zookeeper基本命令代码

依赖

1
2
3
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.5</version>

demo代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
final CountDownLatch coutnDownLatch = new CountDownLatch(1);
ZooKeeper zookeeper = new ZooKeeper("192.168.156.171:2181,192.168.156.171:2182,192.168.156.171:2183",
3000,
new Watcher() {

public void process(WatchedEvent event) {
//注册默认的watcher,当连接上时,会触发该watcher
//对该zookeeper实例的节点进行操作,watcher填true使用此默认watcher
System.out.println("defaultWatcher process eventType : "+event.getPath()+"-"+event.getType());
coutnDownLatch.countDown();
}

});
System.out.println(zookeeper.getState());//connecting---连接中
coutnDownLatch.await();
System.out.println(zookeeper.getState());//connected---已连接
//createMode 模式,PERSISTENT---持久化节点
Stat stat = new Stat();
/**
* 节点状态,包含version
* create、delete、setData语句会更新节点stat
* 并且create、delete子节点会更新父节点的stat
* 在create、getData语句传入stat,执行后会把最新的stat属性复制到传入的stat对象中
* setData、delete语句传入stat的version,通过乐观锁验证版本号保证数据同步,版本不对时抛出BadVersionException(如果version是-1,代表不进行版本验证)
* setData、exists语句返回值为最新的stat
*/
zookeeper.delete("/java",-1);
String path = zookeeper.create("/java", "2019-07-29".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,stat);//返回路径
System.out.println("path : "+path);
byte[] byteV = zookeeper.getData(path, null, stat);
System.out.println("version:"+stat.getVersion()+"-"+new String(byteV));
/**
*
* OPEN_ACL_UNSAFE---完全开放
* CREATOR_ALL_ACL---创建者拥有权限
* READ_ACL_UNSAFE---开放读权限
* ANYONE_ID_UNSAFE---有验证的用户就不被全部权限
* AUTH_IDS---只对指定用户开放
*/
stat = zookeeper.setData(path, "2019-07-30".getBytes(), stat.getVersion());
zookeeper.getData(path, true, stat);//true代表注册watcher,使用默认watcher即创建Zookeeper实例时注册的watcher,只响应一次事件
zookeeper.getData(path, new Watcher() {//注册专用的watcher,只响应一次事件
public void process(WatchedEvent event) {
}
}, stat);

zookeeper.setData(path, "2019-07-30".getBytes(), stat.getVersion());
byteV = zookeeper.getData(path, null, stat);
System.out.println("version:"+stat.getVersion()+"-"+new String(byteV));

源码分析

Zookeeper对于集群信息提供jmx的监控支持,详情欢迎查看本人另一篇博客——zookeeper源码分析-选举算法
而对于客户端连接使用的是netty做socket的io处理,实现类为ClientCnxnSocketNetty,还有另一个可选的nio实现类是ClientCnxnSocketNIO
Zookeeper构造函数,实例化最终会到下面的构造函数,根据默认配置,创建一个ZKWatchManager对象,将默认wathcer注入该wathcerManager的defaultWatcher成员
将该watcherManager以及包含了集群信息的zookeeper对象传递用于实例化一个ClientCnxn对象,并启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
protected ZKWatchManager defaultWatchManager() {
return new ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
}
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly, HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {

if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
hostProvider = aHostProvider;

cnxn = createConnection(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}

// @VisibleForTesting
protected ClientCnxn createConnection(String chrootPath,
HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
boolean canBeReadOnly) throws IOException {
return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
watchManager, clientCnxnSocket, canBeReadOnly);
}

ClientCnxn在实例化时会实例化两个线程成员,作为发送消息的线程以及处理事件的线程
因为zookeeper内部使用netty所以发送消息的响应式异步的,当netty收到响应时,将响应封装为一个event,数据的封装返回在evetThread处理
要发送的包由一个阻塞队列保存,已发送待确认的包由一个链表保存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();//等待响应的队列

private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();//要发送的队列

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;

connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;

sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
this.clientConfig=zooKeeper.getClientConfig();
initRequestTimeout();
}

public void start() {
sendThread.start();
eventThread.start();
}

sendThread的处理逻辑为,当服务器状态为活跃时,循环检查计算心跳间隔时间,添加pin包到待发送队列,后面发送以保持连接
每一轮最后调用clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this)去开始传输

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
serverAddress = hostProvider.next(1000);
}
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}

if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}

if (sendAuthEvent) {
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
if (state == States.AUTH_FAILED) {
eventThread.queueEventOfDeath();
}
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}

if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv()
+ "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}

// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}

clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
if (LOG.isDebugEnabled()) {
// closing so this is expected
LOG.debug("An exception was thrown while closing send thread for session 0x"
+ Long.toHexString(getSessionId())
+ " : " + e.getMessage());
}
break;
} else {
// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
LOG.info(e.getMessage() + ", closing socket connection");
} else if (e instanceof SessionTimeoutException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof EndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof RWServerFoundException) {
LOG.info(e.getMessage());
} else if (e instanceof SocketException) {
LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
} else {
LOG.warn("Session 0x{} for server {}, unexpected error{}",
Long.toHexString(getSessionId()),
serverAddress,
RETRY_CONN_MSG,
e);
}
// At this point, there might still be new packets appended to outgoingQueue.
// they will be handled in next connection or cleared up if closed.
cleanAndNotifyState();
}
}
}
synchronized (state) {
// When it comes to this point, it guarantees that later queued
// packet to outgoingQueue will be notified of death.
cleanup();
}
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Closed, null));
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x"
+ Long.toHexString(getSessionId()));
}

netty相关的类ClientCnxnSocketNetty中对doTransport的实现为:
从outgoingQueue里取出一个待发送的packet,如果不为空就调用doWrite方法去发送数据包,如果当前连接已断开,将包再次加入待发送队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void doTransport(int waitTimeOut,
List<Packet> pendingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
try {
if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
return;
}
Packet head = null;
if (needSasl.get()) {
if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
return;
}
} else {
head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS);
}
// check if being waken up on closing.
if (!sendThread.getZkState().isAlive()) {
// adding back the packet to notify of failure in conLossPacket().
addBack(head);
return;
}
// channel disconnection happened
if (disconnected.get()) {
addBack(head);
throw new EndOfStreamException("channel for sessionid 0x"
+ Long.toHexString(sessionId)
+ " is lost");
}
if (head != null) {
doWrite(pendingQueue, head, cnxn);
}
} finally {
updateNow();
}
}

doWrite方法会将除了outgoingQueue里的唤醒包和pin包以及链接登陆的auth包以外的全部包和调用方法带过来的packet加入事件队列
(因为pin包和auth包实际没有需要返回的数据信息response回调用方,pin包无处理,auth包只需要失败时的一个响应,收到auth包的失败信息,添加一个authFailed的事件到事件队列,源码里也是这么做的*在sendThread的readResponse方法)
并且outgoingQueue里的packet,除了唤醒包以外,pin包、auth包以及剩下的packet和调用方法带过来的packet都会被发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
updateNow();
boolean anyPacketsSent = false;
while (true) {
if (p != WakeupPacket.getInstance()) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
(p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
sendPktOnly(p);
anyPacketsSent = true;
}
if (outgoingQueue.isEmpty()) {
break;
}
p = outgoingQueue.remove();
}
// TODO: maybe we should flush in the loop above every N packets/bytes?
// But, how do we determine the right value for N ...
if (anyPacketsSent) {
channel.flush();
}
}

当netty的channel读到数据时,会调用sendThread的readResponse方法装载响应,最后会调用wakeupCnxn往outgoingQueue里添加唤醒包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
updateNow();
while (buf.isReadable()) {
if (incomingBuffer.remaining() > buf.readableBytes()) {
int newLimit = incomingBuffer.position()
+ buf.readableBytes();
incomingBuffer.limit(newLimit);
}
buf.readBytes(incomingBuffer);
incomingBuffer.limit(incomingBuffer.capacity());

if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount.getAndIncrement();
readLength();
} else if (!initialized) {
readConnectResult();
lenBuffer.clear();
incomingBuffer = lenBuffer;
initialized = true;
updateLastHeard();
} else {
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
wakeupCnxn();
// Note: SimpleChannelInboundHandler releases the ByteBuf for us
// so we don't need to do it.
}
private void wakeupCnxn() {
if (needSasl.get()) {
waitSasl.release();
}
outgoingQueue.add(WakeupPacket.getInstance());
}

sendThread的readResponse方法根据响应的数据报类型做处理
如果是pin包不做处理只做日志,并返回
如果是auth包,做日志处理,判断err码,如果auth失败了,就往事件队列里增加一个type为None,stat为AuthFailed的事件,并返回
如果是主动监听的服务端通知事件,创建一个WatcherEvent反序列化response的信息,并用一个WatchedEvent包装以后,加入事件队列,并返回
如果不满足以上的,先判断如果当前正在进行SASL身份验证,构造和立即发送响应数据包,而不是排队响应其他数据包
剩下的进行普通数据报处理,从pendingQueue的首部取出一个已发送待响应的packet(因为是从头拿,所以是最早发送的带响应packet)
判断取出的packet与读到的ReplyHeader数据的xid是否相等,不相等说明出现丢包,抛出异常
xid相等则将本次数据的response信息反序列化装载到packet的resonse成员
无论是否丢包,最终会调用finishPacket(packet)对packet做善后工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();

replyHdr.deserialize(bbia, "header");
if (replyHdr.getXid() == -2) {
// -2 is the xid for pings
if (LOG.isDebugEnabled()) {
LOG.debug("Got ping response for sessionid: 0x"
+ Long.toHexString(sessionId)
+ " after "
+ ((System.nanoTime() - lastPingSentNs) / 1000000)
+ "ms");
}
return;
}
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
eventThread.queueEventOfDeath();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Got auth sessionid:0x"
+ Long.toHexString(sessionId));
}
return;
}
if (replyHdr.getXid() == -1) {
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");

// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
}

WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}

eventThread.queueEvent( we );
return;
}

// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
if (tunnelAuthInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia,"token");
zooKeeperSaslClient.respondToServer(request.getToken(),
ClientCnxn.this);
return;
}

Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
packet = pendingQueue.remove();
}
/*
* Since requests are processed in order, we better get a response
* to the first request!
*/
try {
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(
KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid "
+ replyHdr.getXid() + " with err " +
+ replyHdr.getErr() +
" expected Xid "
+ packet.requestHeader.getXid()
+ " for a packet with details: "
+ packet );
}

packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}

if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
finishPacket(packet);
}
}

在eventThread的queueEvent方法里,会调用watcher.materialize(event.getState(),event.getType(), event.getPath())识别事件
构建一个WatcherSetEventPair加入事件队列waitingEvents

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private final LinkedBlockingQueue<Object> waitingEvents =
new LinkedBlockingQueue<Object>();
public void queueEvent(WatchedEvent event) {
queueEvent(event, null);
}
private void queueEvent(WatchedEvent event,
Set<Watcher> materializedWatchers) {
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();
final Set<Watcher> watchers;
if (materializedWatchers == null) {
// materialize the watchers based on the event
watchers = watcher.materialize(event.getState(),
event.getType(), event.getPath());
} else {
watchers = new HashSet<Watcher>();
watchers.addAll(materializedWatchers);
}
WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);
}

eventThread使用的watcher实际为Zookeeper类传递给ClientCnxn的watcherManager成员
也就是zookeeper本地注册watcher的对象

1
2
3
4
5
6
7
protected ClientCnxn createConnection(String chrootPath,
HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
boolean canBeReadOnly) throws IOException {
return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
watchManager, clientCnxnSocket, canBeReadOnly);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
   public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
throws IOException {
this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
clientCnxnSocket, 0, new byte[16], canBeReadOnly);
}
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;

connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;

sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
this.clientConfig=zooKeeper.getClientConfig();
initRequestTimeout();
}

ZkWatcherManager使用三个集合成员存储不同事件类型的watcher,并设置一个默认的watcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
private boolean disableAutoWatchReset;

ZKWatchManager(boolean disableAutoWatchReset) {
this.disableAutoWatchReset = disableAutoWatchReset;
}

protected volatile Watcher defaultWatcher;

其materialize方法根据事件类型和路径信息,匹配到对应的watcher事件并返回事件集合,可以看到使用remove方法获得事件集合。这也是为什么注册的事件只响应一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath)
{
Set<Watcher> result = new HashSet<Watcher>();

switch (type) {
case None:
result.add(defaultWatcher);
boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected;
synchronized(dataWatches) {
for(Set<Watcher> ws: dataWatches.values()) {
result.addAll(ws);
}
if (clear) {
dataWatches.clear();
}
}

synchronized(existWatches) {
for(Set<Watcher> ws: existWatches.values()) {
result.addAll(ws);
}
if (clear) {
existWatches.clear();
}
}

synchronized(childWatches) {
for(Set<Watcher> ws: childWatches.values()) {
result.addAll(ws);
}
if (clear) {
childWatches.clear();
}
}

return result;
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
break;
case NodeChildrenChanged:
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
break;
case NodeDeleted:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
// XXX This shouldn't be needed, but just in case
synchronized (existWatches) {
Set<Watcher> list = existWatches.remove(clientPath);
if (list != null) {
addTo(list, result);
LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
}
}
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
break;
default:
String msg = "Unhandled watch event type " + type
+ " with state " + state + " on path " + clientPath;
LOG.error(msg);
throw new RuntimeException(msg);
}

return result;
}

说回readResponse最后对数据报的处理,在完成将数据报对应的本地注册事件加入事件队列以后,执行finishPacket方法
在finishPacket中,设置packet的finished标志位true
如果该packet没有绑定异步回调函数,则直接调用packet的notify唤醒(因为是异步发送,前面获得packet以后会陷入阻塞wait)
否则调用eventThread.queuePacket方法对响应后的数据包做进一步处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
protected void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
p.watchRegistration.register(err);
}
// Add all the removed watch events to the event queue, so that the
// clients will be notified with 'Data/Child WatchRemoved' event type.
if (p.watchDeregistration != null) {
Map<EventType, Set<Watcher>> materializedWatchers = null;
try {
materializedWatchers = p.watchDeregistration.unregister(err);
for (Entry<EventType, Set<Watcher>> entry : materializedWatchers
.entrySet()) {
Set<Watcher> watchers = entry.getValue();
if (watchers.size() > 0) {
queueEvent(p.watchDeregistration.getClientPath(), err,
watchers, entry.getKey());
// ignore connectionloss when removing from local
// session
p.replyHeader.setErr(Code.OK.intValue());
}
}
} catch (KeeperException.NoWatcherException nwe) {
p.replyHeader.setErr(nwe.code().intValue());
} catch (KeeperException ke) {
p.replyHeader.setErr(ke.code().intValue());
}
}

if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}

eventThread的queuePacket方法先判断当前是否已经被要求结束,如果是,判断是否还没有停止运行,没有则将packet加入待处理队列,否则直接调用processEvent处理packet
(因为在设置wasKilled为true后,run方法里会将waitingEvents里事件处理完后才设置停止运行并结束线程)
如果wasKilled不为true,处于运行中,直接将packet加入waitingEvents

1
2
3
4
5
6
7
8
9
10
11

public void queuePacket(Packet packet) {
if (wasKilled) {
synchronized (waitingEvents) {
if (isRunning) waitingEvents.add(packet);
else processEvent(packet);
}
} else {
waitingEvents.add(packet);
}
}

eventThread的run方法,循环从waitingEvents里(事件队列,是个阻塞队列)取出待处理的事件
调用processEvent去处理事件,如果收到结束事件(eventOfDeath),设置wasKilled为true,在处理完waitingEvents的事件后,停止运行(isRunnig设置为false)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}

LOG.info("EventThread shut down for session: 0x{}",
Long.toHexString(getSessionId()));
}

在查看processEvent方法前,我们先回过头来看执行各个语句的源码

create

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public String create(final String path, byte data[], List<ACL> acl,
CreateMode createMode, Stat stat, long ttl)
throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath, createMode.isSequential());
EphemeralType.validateTTL(createMode, ttl);

final String serverPath = prependChroot(clientPath);

RequestHeader h = new RequestHeader();
setCreateHeader(createMode, h);
Create2Response response = new Create2Response();
if (acl != null && acl.size() == 0) {
throw new KeeperException.InvalidACLException();
}
Record record = makeCreateRecord(createMode, serverPath, data, acl, ttl);
ReplyHeader r = cnxn.submitRequest(h, record, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
if (cnxn.chrootPath == null) {
return response.getPath();
} else {
return response.getPath().substring(cnxn.chrootPath.length());
}
}

setData(带回调)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void setData(final String path, byte data[], int version,
StatCallback cb, Object ctx)
{
final String clientPath = path;
PathUtils.validatePath(clientPath);

final String serverPath = prependChroot(clientPath);

RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setData);
SetDataRequest request = new SetDataRequest();
request.setPath(serverPath);
request.setData(data);
request.setVersion(version);
SetDataResponse response = new SetDataResponse();
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
clientPath, serverPath, ctx, null);
}

getData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public byte[] getData(final String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);

// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}

final String serverPath = prependChroot(clientPath);

RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
.....

可以看见都是先进行下路径的校验,然后生成一个RequestHeader,为请求头设置操作的类型
对于有接受stat参数的,最后会调用DataTree.copyStat(response.getStat(), stat)把最新的stat数据复制给传入的stat对象
根据类型生成不同的Record实现类,record接口提供两个方法,序列化与反序列化,一个Reqeust实现类实例以及一个Response实现类实例
当有回调函数时,将回调函数传入
最终都是调用ClientCnxn的submitRequest方法,获得ReplyHeader(实际的response数据会装载到传入的response对象里)
ReplyHeader只保存了是否出现失败,以及错误码和路径等信息,不保存实际返回的响应数据
submitRequest实际会调用queuePacket方法去发送给服务端,并获得及时响应的packet对象
因为是异步发送,后面会根据是否设置了超时时间选择调用wait阻塞等待或者等待超时设置replyHeader失败并返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
return submitRequest(h, request, response, watchRegistration, null);
}

public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration, watchDeregistration);
synchronized (packet) {
if (requestTimeout > 0) {
// Wait for request completion with timeout
waitForPacketFinish(r, packet);
} else {
// Wait for request completion infinitely
while (!packet.finished) {
packet.wait();
}
}
}
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
sendThread.cleanAndNotifyState();
}
return r;
}

queuePacket实际将请求信息封装为一个packet,并将packet加入outgoingQueue队列,由sendThread去处理,返回packet对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) {
Packet packet = null;

// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
// The synchronized block here is for two purpose:
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
// later packet will be notified.
synchronized (state) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}

processEvent方法根据event的类型,执行不同操作
如果是本地监听事件,直接调用注册的各个watcher的process方法
如果是本地的callBack回调函数类型,则使用instanceOf判断以后,使用不同的回调函数强制转换去执行processResult(从目前的源码分析暂时没看到有使用,可能是nio的socket实现类,这里只分析了默认的实现类ClientCnxnSocketNetty)
如果是response类型,使用instanceOf判断以后,选择具体的实现类去执行processResult操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
} else if (event instanceof LocalCallback) {
LocalCallback lcb = (LocalCallback) event;
if (lcb.cb instanceof StatCallback) {
((StatCallback) lcb.cb).processResult(lcb.rc, lcb.path,
lcb.ctx, null);
} else if (lcb.cb instanceof DataCallback) {
((DataCallback) lcb.cb).processResult(lcb.rc, lcb.path,
lcb.ctx, null, null);
} else if (lcb.cb instanceof ACLCallback) {
((ACLCallback) lcb.cb).processResult(lcb.rc, lcb.path,
lcb.ctx, null, null);
} else if (lcb.cb instanceof ChildrenCallback) {
((ChildrenCallback) lcb.cb).processResult(lcb.rc,
lcb.path, lcb.ctx, null);
} else if (lcb.cb instanceof Children2Callback) {
((Children2Callback) lcb.cb).processResult(lcb.rc,
lcb.path, lcb.ctx, null, null);
} else if (lcb.cb instanceof StringCallback) {
((StringCallback) lcb.cb).processResult(lcb.rc,
lcb.path, lcb.ctx, null);
} else {
((VoidCallback) lcb.cb).processResult(lcb.rc, lcb.path,
lcb.ctx);
}
} else {
Packet p = (Packet) event;
int rc = 0;
String clientPath = p.clientPath;
if (p.replyHeader.getErr() != 0) {
rc = p.replyHeader.getErr();
}
if (p.cb == null) {
LOG.warn("Somehow a null cb got to EventThread!");
} else if (p.response instanceof ExistsResponse
|| p.response instanceof SetDataResponse
|| p.response instanceof SetACLResponse) {
StatCallback cb = (StatCallback) p.cb;
if (rc == 0) {
if (p.response instanceof ExistsResponse) {
cb.processResult(rc, clientPath, p.ctx,
((ExistsResponse) p.response)
.getStat());
} else if (p.response instanceof SetDataResponse) {
cb.processResult(rc, clientPath, p.ctx,
((SetDataResponse) p.response)
.getStat());
} else if (p.response instanceof SetACLResponse) {
cb.processResult(rc, clientPath, p.ctx,
((SetACLResponse) p.response)
.getStat());
}
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetDataResponse) {
DataCallback cb = (DataCallback) p.cb;
GetDataResponse rsp = (GetDataResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getData(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null,
null);
}
} else if (p.response instanceof GetACLResponse) {
ACLCallback cb = (ACLCallback) p.cb;
GetACLResponse rsp = (GetACLResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getAcl(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null,
null);
}
} else if (p.response instanceof GetChildrenResponse) {
ChildrenCallback cb = (ChildrenCallback) p.cb;
GetChildrenResponse rsp = (GetChildrenResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getChildren());
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetChildren2Response) {
Children2Callback cb = (Children2Callback) p.cb;
GetChildren2Response rsp = (GetChildren2Response) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getChildren(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof CreateResponse) {
StringCallback cb = (StringCallback) p.cb;
CreateResponse rsp = (CreateResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx,
(chrootPath == null
? rsp.getPath()
: rsp.getPath()
.substring(chrootPath.length())));
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof Create2Response) {
Create2Callback cb = (Create2Callback) p.cb;
Create2Response rsp = (Create2Response) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx,
(chrootPath == null
? rsp.getPath()
: rsp.getPath()
.substring(chrootPath.length())), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof MultiResponse) {
MultiCallback cb = (MultiCallback) p.cb;
MultiResponse rsp = (MultiResponse) p.response;
if (rc == 0) {
List<OpResult> results = rsp.getResultList();
int newRc = rc;
for (OpResult result : results) {
if (result instanceof ErrorResult
&& KeeperException.Code.OK.intValue() != (newRc = ((ErrorResult) result)
.getErr())) {
break;
}
}
cb.processResult(newRc, clientPath, p.ctx, results);
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.cb instanceof VoidCallback) {
VoidCallback cb = (VoidCallback) p.cb;
cb.processResult(rc, clientPath, p.ctx);
}
}
} catch (Throwable t) {
LOG.error("Caught unexpected throwable", t);
}
}

当在finishPacket执行完notify后,就会唤醒前面的对应函数,像是getData的,会判断返回头信息是否需要抛出异常
如果没有异常,则装载stat并调用response的getData获得服务端返回的数据并返回

1
2
3
4
5
6
7
8
9
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();

吐槽一下,zk里面使用netty做异步io,源码跳来跳去的,这博客1137行(–)

更多精彩内容

请移步

个人主页: yangyitao.top

zookeeper源码分析-选举算法

欢迎查看Eetal的第二十五篇博客–zookeeper源码分析-选举算法

入口类

启动zookeeper后输入java命令jps,查看到线程运行的主类为QuoRumPeerMain
QuoRumPeerMain
QuoRumPeerMain的启动方法根据启动命令传递的参数,读取配置文件去解析,根据是单机还是集群使用不同的方法运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

protected void initializeAndRun(String[] args)
throws ConfigException, IOException, AdminServerException
{
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);
}

// Start and schedule the the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();

//如果是集群模式,使用runFromConfig方法,否则使用ZooKeeperServerMain.main方法运行
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}

集群运行

runFromConfig方法根据传递进来的配置信息,绑定成员quorumPeer,加载属性
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false)这一句代码加载的QuorumVerifier包含了从配置文件解析来的集群中所有成员的信息对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public void runFromConfig(QuorumPeerConfig config)
throws IOException, AdminServerException
{
......

quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConfigFileName(config.getConfigFilename());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier()!=null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}

// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();

quorumPeer.start();
quorumPeer.join();
......
}

1
2
3
4
5
6
7
8
9
10
11
	public interface QuorumVerifier {
long getWeight(long id);
boolean containsQuorum(Set<Long> set);
long getVersion();
void setVersion(long ver);
Map<Long, QuorumServer> getAllMembers();//全部集群成员
Map<Long, QuorumServer> getVotingMembers();//参与投票成员
Map<Long, QuorumServer> getObservingMembers();//观察者成员
boolean equals(Object o);
String toString();
}

接下来调用start方法开始运行QuorumPeer线程成员
start方法被重写了,在开始运行前做了一些处理

1
2
3
4
5
6
7
8
9
10
11
12
  @Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
//getView是根据前面配置信息解析设置的QuorumVerifier,获取全部集群机器的id和ip端口信息
//此处判断当前机器是否在集群里
throw new RuntimeException("My id " + myid + " not in the peer list");
}
loadDataBase();//从数据日志文件恢复数据到内存
......
startLeaderElection();//开启选举
super.start();//线程类的start方法,开始运行run
}

选举运行,此时刚进入集群节点状态为LOOKING,第一票投票给自己

1
2
3
public class QuorumPeerConfig {
protected int electionAlg = 3;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
synchronized public void startLeaderElection() {
......
if (getPeerState() == ServerState.LOOKING) {
//投票给自己
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
......
this.electionAlg = createElectionAlgorithm(electionType);//挑选选举算法---配置文件不设置默认为3
}
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;

//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
if (oldQcm != null) {
LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
oldQcm.halt();
}
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}

zookeeper依赖jmx去实现集群操作,会将自身的运行线程quorumPeer封装为一个QuorumBean,通过MBeanRegistry注册到jmx上
MBeanRegistry是zookeeper封装了对MBeanServer的一些操作的一个类,主要完成对objectName的自动生成
MBeanRegistry注册时要指定parent,根据parent会自动生成objectName
rmi与MBean相关的源码分析请参照本人的另外两篇博客:
1.java-rmi源码解析
2.MBean与JMX

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class MBeanRegistry {
private static final Logger LOG = LoggerFactory.getLogger(MBeanRegistry.class);

private static volatile MBeanRegistry instance = new MBeanRegistry();

private final Object LOCK = new Object();

private Map<ZKMBeanInfo, String> mapBean2Path =
new ConcurrentHashMap<ZKMBeanInfo, String>();

private MBeanServer mBeanServer;

/**
* Useful for unit tests. Change the MBeanRegistry instance
*
* @param instance new instance
*/
public static void setInstance(MBeanRegistry instance) {
MBeanRegistry.instance = instance;
}

public static MBeanRegistry getInstance() {
return instance;
}

public MBeanRegistry () {
try {
mBeanServer = ManagementFactory.getPlatformMBeanServer();
} catch (Error e) {
// Account for running within IKVM and create a new MBeanServer
// if the PlatformMBeanServer does not exist.
mBeanServer = MBeanServerFactory.createMBeanServer();
}
}

/**
* Return the underlying MBeanServer that is being
* used to register MBean's. The returned MBeanServer
* may be a new empty MBeanServer if running through IKVM.
*/
public MBeanServer getPlatformMBeanServer() {
return mBeanServer;
}

/**
* Registers a new MBean with the platform MBean server.
* @param bean the bean being registered
* @param parent if not null, the new bean will be registered as a child
* node of this parent.
*/
public void register(ZKMBeanInfo bean, ZKMBeanInfo parent)
throws JMException
{
assert bean != null;
String path = null;
if (parent != null) {
path = mapBean2Path.get(parent);
assert path != null;
}
path = makeFullPath(path, parent);
if(bean.isHidden())
return;
ObjectName oname = makeObjectName(path, bean);
try {
synchronized (LOCK) {
mBeanServer.registerMBean(bean, oname);
mapBean2Path.put(bean, path);
}
} catch (JMException e) {
LOG.warn("Failed to register MBean " + bean.getName());
throw e;
}
}

run方法细节,将本地的jmxQuorumBean注册到jmx,并且根据读取配置文件的集群信息,把每个远程节点的信息封装以后,以本机节点jmxQuorumBean为parent,注册到Jmx上
接下来因为状态为欸LOOKING,进入选举方法
setCurrentVote(makeLEStrategy().lookForLeader());

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
@Override
public void run() {
updateThreadName();

jmxQuorumBean = new QuorumBean(this);
MBeanRegistry.getInstance().register(jmxQuorumBean, null);
for(QuorumServer s: getView().values()){
ZKMBeanInfo p;
if (getId() == s.id) {
p = jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxLocalPeerBean = null;
}
} else {
RemotePeerBean rBean = new RemotePeerBean(this, s);
try {
MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
jmxRemotePeerBean.put(s.id, rBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
}
}



/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");

if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");

// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk =
new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
.....
}
......
}

根据前面分析得知,使用的默认选举策略为FastLeaderElection
FastLeaderElection构造时会实例化一个Messager
Messenger实例化时会创建两个线程,一个负责接收消息,一个负责发送消息启动两个线程,一个负责接收消息,一个负责发送消息
而FastLeaderElection对象前面挑选选举算法时,构建完成就会调用start方法,对应会启动两个工作线程开始接收和发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
   public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
this.stop = false;
this.manager = manager;
starter(self, manager);
}
LinkedBlockingQueue<ToSend> sendqueue;
LinkedBlockingQueue<Notification> recvqueue;
/**
* This method is invoked by the constructor. Because it is a
* part of the starting procedure of the object that must be on
* any constructor of this class, it is probably best to keep as
* a separate method. As we have a single constructor currently,
* it is not strictly necessary to have it separate.
*
* @param self QuorumPeer that created this object
* @param manager Connection manager
*/
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;

sendqueue = new LinkedBlockingQueue<ToSend>();
recvqueue = new LinkedBlockingQueue<Notification>();
this.messenger = new Messenger(manager);
}

Messenger(QuorumCnxManager manager) {

this.ws = new WorkerSender(manager);

this.wsThread = new Thread(this.ws,
"WorkerSender[myid=" + self.getId() + "]");
this.wsThread.setDaemon(true);

this.wr = new WorkerReceiver(manager);

this.wrThread = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
this.wrThread.setDaemon(true);
}

/**
* Starts instances of WorkerSender and WorkerReceiver
*/
void start(){
this.wsThread.start();
this.wrThread.start();
}

该策略为搜寻收到的消息,如果收到的消息的epoch大于当前epoch(朝代),更新本地保存的朝代,直接清空所有的接受消息,对比消息投票属性和自己的初始化属性,如果对方胜出,则将投票改为选举对方挑选的leader,否则投票给自己(初始化时的id-myid)
以下为对比方法(epoch都是peerEpoch)
((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))))
胜出条件:
1.朝代大于当前朝代
2.朝代相等,并且zxid大的 或者 (zxid相等并且id更大的)———-因为zxid代表事务id,更大相当于对方处理了更多的leader消息
当没有新的投票消息时,则继续发送自己的投票消息(sendNotification方法实际只是将消息加入阻塞队列,真正进行发送处理的是另一个WorkerSender线程子类实例,该线程循环获取阻塞队列的消息,进行发送,对应的接受消息是WorkerReader线程子类实例)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = Time.currentElapsedTime();
}
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

int notTimeout = finalizeWait;

synchronized(this){
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}

LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
sendNotifications();

/*
* Loop in which we exchange notifications until we find a leader
*/

while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);

/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if(n == null){
if(manager.haveDelivered()){
//如果全部发送完了,再次发送一次投票信息
sendNotifications();
} else {
//没发送完,可能出现故障,重新连接每一台集群中的机器
manager.connectAll();
}

/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
else if (validVoter(n.sid) && validVoter(n.leader)) {
/*
* Only proceed if the vote comes from a replica in the current or next
* voting view for a replica in the current or next voting view.
*/
switch (n.state) {
case LOOKING:
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}

if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}

// don't care about the version if it's in LOOKING state
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {

// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}

/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
......
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}",
manager.getConnectionThreadCount());
}
}

投票以后会进入选举是否完成的判断,如果确定选举有了结果就会设置自身的状态并结束选举

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {

// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}

/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}


protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}

/*
* First make the views consistent. Sometimes peers will have different
* zxids for a server depending on timing.
*/
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
voteSet.addAck(entry.getKey());
}
}

return voteSet.hasAllQuorums();
}

SyncedLearnerTracker中投票结果的判定是这样实现的

1
2
3
4
5
6
7
public boolean hasAllQuorums() {
for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))
return false;
}
return true;
}

而 在QuorumPeerConfig加载配置文件解析时就实例化,根据使用带权重的还是不带权重,分别使用QuorumHierarchical和QuorumMaj

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
   void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
throws IOException, ConfigException {
quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
setupMyId();
setupClientPort();
setupPeerType();
checkValidity();
}
public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings,
boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
boolean isHierarchical = false;
for (Entry<Object, Object> entry : dynamicConfigProp.entrySet()) {
String key = entry.getKey().toString().trim();
if (key.startsWith("group") || key.startsWith("weight")) {
isHierarchical = true;
} else if (!configBackwardCompatibilityMode && !key.startsWith("server.") && !key.equals("version")){
LOG.info(dynamicConfigProp.toString());
throw new ConfigException("Unrecognised parameter: " + key);
}
}

QuorumVerifier qv = createQuorumVerifier(dynamicConfigProp, isHierarchical);

int numParticipators = qv.getVotingMembers().size();
int numObservers = qv.getObservingMembers().size();
if (numParticipators == 0) {
if (!standaloneEnabled) {
throw new IllegalArgumentException("standaloneEnabled = false then " +
"number of participants should be >0");
}
if (numObservers > 0) {
throw new IllegalArgumentException("Observers w/o participants is an invalid configuration");
}
} else if (numParticipators == 1 && standaloneEnabled) {
// HBase currently adds a single server line to the config, for
// b/w compatibility reasons we need to keep this here. If standaloneEnabled
// is true, the QuorumPeerMain script will create a standalone server instead
// of a quorum configuration
LOG.error("Invalid configuration, only one server specified (ignoring)");
if (numObservers > 0) {
throw new IllegalArgumentException("Observers w/o quorum is an invalid configuration");
}
} else {
if (warnings) {
if (numParticipators <= 2) {
LOG.warn("No server failure will be tolerated. " +
"You need at least 3 servers.");
} else if (numParticipators % 2 == 0) {
LOG.warn("Non-optimial configuration, consider an odd number of servers.");
}
}
/*
* If using FLE, then every server requires a separate election
* port.
*/
if (eAlg != 0) {
for (QuorumServer s : qv.getVotingMembers().values()) {
if (s.electionAddr == null)
throw new IllegalArgumentException(
"Missing election port for server: " + s.id);
}
}
}
return qv;
}
private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException{
if(isHierarchical){
return new QuorumHierarchical(dynamicConfigProp);
} else {
/*
* The default QuorumVerifier is QuorumMaj
*/
//LOG.info("Defaulting to majority quorums");
return new QuorumMaj(dynamicConfigProp);
}
}

简单分析QuorumMaj,其判断作为leader的条件就是得票数超过一半

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public QuorumMaj(Properties props) throws ConfigException {
for (Entry<Object, Object> entry : props.entrySet()) {
String key = entry.getKey().toString();
String value = entry.getValue().toString();

if (key.startsWith("server.")) {
int dot = key.indexOf('.');
long sid = Long.parseLong(key.substring(dot + 1));
QuorumServer qs = new QuorumServer(sid, value);
allMembers.put(Long.valueOf(sid), qs);
if (qs.type == LearnerType.PARTICIPANT)
votingMembers.put(Long.valueOf(sid), qs);
else {
observingMembers.put(Long.valueOf(sid), qs);
}
} else if (key.equals("version")) {
version = Long.parseLong(value, 16);
}
}
half = votingMembers.size() / 2;
}
public boolean containsQuorum(Set<Long> ackSet) {
return (ackSet.size() > half);
}

更多精彩内容

请移步

个人主页: yangyitao.top

zookeeper协调原理

欢迎查看Eetal的第二十四篇博客–zookeeper协调原理

CAP原理

CAP原则又称CAP定理,指的是在一个分布式系统中,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可兼得。
一致性表示分布式系统中各个节点数据的一致性
可用性代表数据访问的高性能
分区容错性指的是因为同步的时间问题,数据不一致导致出现了多个不同数据版本的分区现象,但系统仍能继续正常运行(容错)
很显然,三者最多只能取其二
分区容错性与一致性共存(同步需要阻塞)必会与可用性冲突
分区容错性与可用性共存(数据不同步)必会与一致性共存冲突
可用性与一致性共存必会与分区容错性冲突(实际上这个是不实际的需求,因为分布式环境下因为网络通信的延迟分区容错性是必要的)
综上,大部分分布式架构都是实现数据的最终一致性而非实现强一致性(因为分区容错性的必然存在)
zookeeper的zap协议就是对2pc进一步提高分区容错性与可用性而降低强一致性的一种协议,同时其保证最终一致性,所以在分布式环境下仍是可用的

事务请求

所有事务请求(增删改)都会转发到leader由leader处理
zookeeper内部是依赖一个改进版的2pc——ZAB协议

传统2pc

传统2pc分为两个阶段preCommit和commit阶段
preCommit阶段由leader向所有节点发送事务请求,各自执行后向leader返回执行成功或者失败,但不进行commit操作(锁资源,将undo和redo信息写到事务日志)
commit阶段可由以下几种情况触发:
1.超时没有收到所有节点反馈
2.某个节点反馈失败
3.全部节点返回成功
到了commit阶段,1和2两种情况都会触发rollback,leader向参与本次事务的所有节点发送rollback命令,每个节点读取undo,执行事务回滚
第3种情况,leader向所有参与本次事务的节点发送commit命令,每个节点读取redo,执行事务提交
commit阶段以后,每个节点才会释放资源,切这个过程是同步的,且单点影响较大,单个节点故障会导致集群事务失败

ZAB协议-事务广播

ZAB协议是一个中心化的分布式协调协议(非中心化的如redis的gossip——原理和网络协议的泛洪法类似)
ZAB协议对于事务请求,采取过半选举原则,对于分区容错性进行宽容但避免了单点问题,因为最终会进行数据同步,所以还是能保证分布式环境下数据的最终一致性
preCommit阶段,zookeeper的leader节点收到follower发送的事务请求,执行该事务不提交并封装为PROPOSAL消息(包含redo、undo等),向所有follower节点下发该PROPOSAL消息
follower节点收到PROPOSAL消息后,将对应的数据信息写到磁盘日志文件返回ack,如果写入失败则抛弃leader,重新加入集群(会等到事务结束才能成功)进入数据同步阶段
commit阶段,当超时时间到来前,leader节点收到参与投票的节点(follower节点+leader)超过半数以上的ack时,认为本次事务成功,向所有follower节点广播commit消息(只有zxid),并向observer发送INFORM消息(包含PROPOSAL消息),要求每个节点将PROPOSAL消息执行事务提交(写入内存)
如果超时没有收到半数以上的commit,认为本次事务失败,向所有follower节点广播rollback消息,因为提交事务了,执行了该操作的所有zookeeper服务器的zxid会+1
接下来,集群所有节点在与leader通信时,发现zxid比leader小的(重新加入集群那部分),leader就会发送命令进行数据同步

ZAB协议-崩溃恢复

zookeeper集群下,非leader节点出现崩溃只要不影响选举,因为重新启动以后连接集群都要与leader通信,比较zxid进行数据同步,较好解决
而leader节点如果崩溃,一般要面临重新选举,选举也是采用半数通过原则(每次选举都会将当前结点能收到消息的,收到最多投票的节点作为下一次选举对象,如果选举对象没有获得半数以上选票会重复这个过程)
leader选举后,就面临数据同步问题
集群内的有两种
1.旧leader收到的事务请求但还没有发出的
2.旧leader发出commit操作但是还没有收到全部反馈的
对于第一种情况,zookeeper采取的策略是直接丢弃,对执行了该事务未提交的机器进行rollback
实现原理基于zxid的设计,zxid是64位,高32位为epoch,低32位为PROPOSAL事务计数器
epoch为leader编号,每换一次leader会+1
对于第二种情况,zookeeper采取的策略是保证全部提交,这时如果有follower中保存有盖zxid的PROPOSAL事务而未提交,leader会发送commit命令

更多精彩内容

请移步

个人主页: yangyitao.top

zookeeper集群搭建

欢迎查看Eetal的第二十三篇博客–zookeeper集群搭建

zookeeper简介

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务。
它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
一开始是用于分布式系统下资源的同步访问
在分布式事务场景下也多作为 协调者
是一个中心化管理的分布式同步技术
zk上的数据和redis类似,存在于内存,保存快照、数据日志等文件

安装zookeeper

需要安装jdk环境——jdk1.8的u211以上版本
下载zookeeper压缩包并解压——3.5.5

复制zoo.cfg.sample为zoo1.cfg作为第一台的配置文件
复制多个配置文件作为多台zk启动的不同配置
编辑不同配置文件,使用不同客户端连接端口、数据同步端口和选举端口
server.1=127.0.0.1:2881:3881——第一个端口代表数据同步端口,第二个代表选举端口
dataDir配置到不同目录,不同服务器存放数据的快照文件到不同文件夹,还要存放不同的myid文件
myid文件中只保存一个数字代表当前服务器的id
dataLogDir配置到不同目录,使得数据的日志分开存储(不包括运行时的日志,运行时日志在log4j配置文件里配置)

1
2
3
4
5
dataDir=/tmp/zookeeper01
clientPort=2181
server.1=127.0.0.1:2881:3881
server.2=127.0.0.1:2882:3882
server.3=127.0.0.1:2883:3883

使用mvn打包

3.5.5版本的zookeeper需要最少jdk1.8 u211版本,并且默认是没有编译class文件的
需要进入zookeeper-server目录,使用maven命令
mvn package -Dmaventestskip=true
跳过测试程序并打包编译

运行zk集群

我们配置有三台zk,假设第一个的配置文件为conf目录下的zoo1.cfg,依次类推
运行过程为
使用zk的bin下的zkServer.sh命令运行

1
./zkServer.sh start conf/zoo1.cfg

启动成功后,这时使用命令查看状态

1
./zkServer.sh start conf/zoo1.cfg

应该是提示有错误,可能没有运行成功的
那是因为集群里只有一台机器无法选举(因为zk是中心化的,必须有一个协调者),不用管继续启动剩下的机器

1
./zkServer.sh start conf/zoo2.cfg

这时查看状态会发现两台机器都启动成功了
并且一台成为了follower一台成为leader
zookeeper集群下的机器有三种角色,leader、follower和observer
leader负责事务请求(增删改),任意节点都可处理查询请求,follower参与选举,observer不参与选举只做数据同步
接着启动最后一台

1
./zkServer.sh start conf/zoo3.cfg

开启会话

使用zkCli命令

1
sh bin/zkCli.sh -server host:port

host为zkServer的ip或者域名,port为配置的client端口

结束会话

1
quit

创建节点

持久节点

有序节点

创建时添加参数 -s 指定,所有有序节点会在节点,排序从0开始,每次创建有序节点会使序号加一
有序节点会在输入名称后加上序号作为节点名称创建节点

1
create -s sortNode aaa

普通节点

1
create node bbb

没有指定为临时节点的节点默认都是持久节点

临时节点。

临时节点在会话结束时就会被删除,通过创建时添加参数 -e 指定
临时节点不能存在子节点

1
create -e tmpNode tmp

临时节点也可以是有序节点

1
create -s -e sortTmpNode sortTmp

ls命令

ls命令与linux类似,展示给定路径的所有子节点
根路径为/

1
2
ls /
ls -w /

-w参数代表开启该路径的一次watch,下一次且仅一次该路径下的子节点的增删事件会被监听到

修改节点值

absPath—节点的绝对路径值,root为/

1
set [absPath] value [version]

version值可不传,用于并发控制,代表要求的为dataversion,即该节点被修改过几次,如果不符合,本次修改失败

获取节点值

absPath—节点的绝对路径值,root为/

1
2
get -w [absPath]
get -s [absPath]

-w参数代表开启一次watch,下一次且仅一次对该节点的set事件将会被监听到
-s参数代表获取节点值并展示包括版本号、子节点个数等信息
-s输出格式如下

1
2
3
4
5
6
7
8
9
10
11
cZxid = 0x10000001d	//create node时分配的id
ctime = Tue Jul 16 07:22:54 PDT 2019// 创建时间
mZxid = 0x10000001f //set修改过以后变更的id
mtime = Wed Jul 17 08:43:33 PDT 2019 //最后一次修改时间
pZxid = 0x10000001d //一开始为cZxid,子节点修改时变更
cversion = 0 //子节点版本号,一开始为0,子节点修改时变更
dataVersion = 2 //数据版本号,一开始为0,每修改一次+1
aclVersion = 0 //acl权限版本号,一开始为0[create,read,write,delete,admin]
ephemeralOwner = 0x0 //临时节点才会有,持有者会话id
dataLength = 1 //数据的字符串长度
numChildren = 0 //子节点个数

删除节点

absPath—节点的绝对路径值,root为/

1
delete [absPath]

更多精彩内容

请移步

个人主页: yangyitao.top

MBean与JMX

欢迎查看Eetal的第二十二篇博客–MBean与JMX

JMX

JMX(java Management Exetensions)在Java编程语言中定义了应用程序以及网络管理和监控的体系结构、设计模式、应用程序接口以及服务。
通常使用JMX来监控系统的运行状态或管理系统的某些方面,比如清空缓存、重新加载配置文件等
优点是可以非常容易的使应用程序被管理
伸缩性的架构使每个JMX Agent Service可以很容易的放入到Agent中,每个JMX的实现都提供几个核心的Agent Service,你也可以自己编写服务,服务可以很容易的部署,取消部署。
主要作用是提供接口,允许有不同的实现
简单来说,jmx是一个用来管理javaBean并可以进行监控的扩展规范,结合MBeanServer、rmi与http等可以作为一个服务监控和提供中心

MBeanServer

MBeanServer是JMX代理的核心组件。
它是在代理中向管理操作公开的对象的注册表。
向MBeanServer注册的任何对象都对管理应用程序可见。
MBeanServer仅公开MBean的管理接口,而不是它的直接对象引用。
您要从代理的Java VM外部管理的任何资源都必须在MBeanServer中注册为MBean。
MBeanServer还提供标准化接口,用于访问同一Java VM中的MBean,为本地对象提供操作可管理资源的所有好处。
需要注意的是,一般不使用MBeanServerFactory.createMBeanServer(),使用ManagementFactory.getPlatformMBeanServer()
后者在当前尚未在ManagementFactory中注册静态成员MBeanServer时,会先使用MBeanServerFactory.createMBeanServer()创建一个MBeanServer并将其注册到静态成员,后续每次调用会直接返回该成员实例
并且ManagementFactory.getPlatformMBeanServer()方法在第一次调用createMBeanServer()实例化MBeanServer以后,会读取PlatformComponent枚举的枚举值,将一些系统必要的MBean注册到MBeanServer
JConsole监控的MBeanServer就是在ManagementFactory中注册的静态成员MBeanServe
所以如果没有特殊配置的MBeanServer,jconsole是不会监控的

Agent

Java Management Extensions(JMX)Agent是一个在Java虚拟机(Java VM)中运行的管理实体.
充当MBean和管理应用程序(JConsole等)之间的联络人
Agent只是一个规范,一般会封装我们创建和启动MBeanServer以及注册MBean的过程在一个Agent行为里,方便启动

Agent Service

Agent Service是可以对MBeanServer中注册的MBean执行管理操作的对象。
通过将管理智能包含在代理中,JMX可帮助您构建更强大的管理解决方案。
Agent Service通常也是MBean,允许通过MBeanServer控制它们及其功能。
JMX规范定义了以下Agent Service:
通过管理applet(m-let)服务的动态类加载检索并实例化从网络动态下载的新类和本机库。
监视器观察MBean属性的数字或字符串值,并可以向其他对象通知几种类型的更改。
定时器提供调度机制,并且可以以预定间隔发送通知。
关系服务定义MBean之间的关联并维护关系的一致性。

Protocol Adaptors and Connectors

Protocol adaptors and connectors使代理可从远程(通过rmi或者http等协议)管理应用程序访问。
它们通过在MBean服务器中实例化和注册的MBean的特定协议提供视图。
它们使Java VM外部的管理应用程序能够:
获取或设置现有MBean的属性
对现有MBean执行操作
实例化并注册新的MBean
注册并接收MBean发出的通知
因此,要使JMX代理易于管理,它必须至少包含一个协议适配器或连接器。
Java SE平台包括标准RMI连接器。Agent可以包括任意数量的协议适配器和连接器,允许通过不同的协议同时远程管理和监视它

Protocol Adaptors

Protocol Adaptors通过给定协议提供JMX代理的管理视图。
它们将MBean和MBean服务器的操作调整为给定协议中的表示,
并可能调整为不同的信息模型,例如SNMP。Java SE平台不包括任何协议适配器作为标准。
连接到Protocol Adaptors的管理应用程序通常特定于给定的协议。
这通常是依赖于特定管理协议的传统管理解决方案的情况。
它们不是通过MBean服务器的远程表示来访问JMX代理,而是通过映射到MBeanServer的操作来访问JMX代理。

Connectors

Connectors用于将代理与为JMX技术启用的远程管理应用程序连接,即使用JMX规范的分布式服务开发的管理应用程序。
这种通信涉及Agent中的连接器服务器和管理器中的连接器客户端。
这些组件以特定协议的方式透明地传递管理操作。
JMX Remote API为MBeanServer提供远程接口,管理应用程序可以通过该接口执行操作。
Connectors特定于给定协议,但管理应用程序可以无差别地使用任何Connectors,因为它们具有相同的远程接口。

MBean

描述一个可管理的资源。是一个java对象,遵循以下一些规则:
1.必须是公用的,非抽象的类
2.必须有至少一个公用的构造器
3.必须实现它自己的相应的MBean接口或者实现javax.management.DynamicMBean接口
4.可选的,一个MBean可以实现javax.management.NotificationBroadcaster接口MBean的类型
MBean使用ObjectName以keyValue形式注册到MBeanServer上

Standard MBean

标准MBean,也是最简单的MBean,通过类实现的接口名称来识别MBean
接口名称以MBean结尾,实现类需要匹配接口名称的前半部分

1
2
3
4
public interface YytMBean{
public void setName(String name);
public String getName();
}

1
2
3
4
5
6
7
8
9
10
11
public class Yyt implements YytMBean{
private String name;
@Override
public void setName(String name) {
this.name = name;
}
@Override
public String getName() {
return name;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class YytAgent{
private String domainName;
private int rmiPort;
public YytAgent(String domainName,int rmiPort) {
this.domainName = domainName;
this.rmiPort = rmiPort;
}
public void start() throws MalformedObjectNameException, InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException, IOException {
//MBeanServerFactory.createMBeanServer(); //don't use this
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); //use this instead

YytMBean yytMBean = new Yyt();
ObjectName objectName = new ObjectName(domainName+":name=Yyt");
mBeanServer.registerMBean(yytMBean, objectName);

LocateRegistry.createRegistry(rmiPort);//开启rmi端口监听,在jmxConnectorServer.start()时会根据serviceUrl建立一个rmiServer监听该rmi端口

JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:"+rmiPort+"/"+domainName);
JMXConnectorServer jmxConnectorServer = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mBeanServer);
jmxConnectorServer.start();
//开启jmx协议的监控服务器,因为java自带rmi的工具和依赖,可以直接开启,通过jconsole等支持jmx协议的客户端可以监控MBeanServer
}
}

使用JConsole工具监控

JConsole是java自带的监控程序,独立jre中没有,jdk下的jre具备

运行java应用时带上参数
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=8999
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
关闭ssl和密码校验,开启远程监控端口8999,这时可以直接使用jconsole进行链接

或者在jdk\jre\lib\management目录下,复制一份jmxremote.password.template的文件,改名去掉.template后缀,并去掉文件末尾两行示例的用户名和密码注释,修改密码为自己想要的
运行加上参数
-Djava.rmi.server.hostname=应用所在服务器的ip或者域名
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=监控端口
-Dcom.sun.management.jmxremote.rmi.port=rmi监控端口(一般与监控端口保持一致)
-Dcom.sun.management.jmxremote.authenticate=true
-Dcom.sun.management.jmxremote.ssl=false
这时使用JConsole链接会提示需要输入用户名和密码

在命令行输入jconsole运行jconsole程序
jconsole
在jconsole的界面,因为是本地,直接选择本地进程,进入监控页面
点击导航栏的MBean即可看到我们注册的MBean在列表中
jconsole
还可以对Bean的属性值进行查看和设置
jconsole

源码分析

ManagementFactory.getPlatformMBeanServer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
Permission perm = new MBeanServerPermission("createMBeanServer");
sm.checkPermission(perm);
}

if (platformMBeanServer == null) {
platformMBeanServer = MBeanServerFactory.createMBeanServer();
for (PlatformComponent pc : PlatformComponent.values()) {
List<? extends PlatformManagedObject> list =
pc.getMXBeans(pc.getMXBeanInterface());
for (PlatformManagedObject o : list) {
// Each PlatformComponent represents one management
// interface. Some MXBean may extend another one.
// The MXBean instances for one platform component
// (returned by pc.getMXBeans()) might be also
// the MXBean instances for another platform component.
// e.g. com.sun.management.GarbageCollectorMXBean
//
// So need to check if an MXBean instance is registered
// before registering into the platform MBeanServer
if (!platformMBeanServer.isRegistered(o.getObjectName())) {
addMXBean(platformMBeanServer, o);
}
}
}
HashMap<ObjectName, DynamicMBean> dynmbeans =
ManagementFactoryHelper.getPlatformDynamicMBeans();
for (Map.Entry<ObjectName, DynamicMBean> e : dynmbeans.entrySet()) {
addDynamicMBean(platformMBeanServer, e.getValue(), e.getKey());
}
for (final PlatformManagedObject o :
ExtendedPlatformComponent.getMXBeans()) {
if (!platformMBeanServer.isRegistered(o.getObjectName())) {
addMXBean(platformMBeanServer, o);
}
}
}
return platformMBeanServer;

ManagementFactory中定义一个platformMBeanServer成员来装载创建好的MBeanServer
在执行该方法的第一步会先校验当前安全权限
在platformMBeanServer尚未绑定一个实例时,会先使用MBeanServerFactory.createMBeanServer()实例一个MBeanServer对象
接下来主要是读取PlatformComponent.values()以及别的枚举类和helper类的值和成员,将系统预设的MBean注册到MBeanServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
enum PlatformComponent {

/**
* Class loading system of the Java virtual machine.
*/
CLASS_LOADING(
"java.lang.management.ClassLoadingMXBean",
"java.lang", "ClassLoading", defaultKeyProperties(),
true, // singleton
new MXBeanFetcher<ClassLoadingMXBean>() {
public List<ClassLoadingMXBean> getMXBeans() {
return Collections.singletonList(ManagementFactoryHelper.getClassLoadingMXBean());
}
}),

/**
* Compilation system of the Java virtual machine.
*/
COMPILATION(
"java.lang.management.CompilationMXBean",
"java.lang", "Compilation", defaultKeyProperties(),
true, // singleton
new MXBeanFetcher<CompilationMXBean>() {
public List<CompilationMXBean> getMXBeans() {
CompilationMXBean m = ManagementFactoryHelper.getCompilationMXBean();
if (m == null) {
return Collections.emptyList();
} else {
return Collections.singletonList(m);
}
}
}),
.....
}

MBeanServerFactory.createMBeanServer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
   public static MBeanServer createMBeanServer() {
return createMBeanServer(null);
}
public static MBeanServer createMBeanServer(String domain) {
checkPermission("createMBeanServer");

final MBeanServer mBeanServer = newMBeanServer(domain);
addMBeanServer(mBeanServer);
return mBeanServer;
}
public static MBeanServer newMBeanServer(String domain) {
checkPermission("newMBeanServer");

// Get the builder. Creates a new one if necessary.
//
final MBeanServerBuilder mbsBuilder = getNewMBeanServerBuilder();
// Returned value cannot be null. NullPointerException if violated.

synchronized(mbsBuilder) {
final MBeanServerDelegate delegate =
mbsBuilder.newMBeanServerDelegate();
if (delegate == null) {
final String msg =
"MBeanServerBuilder.newMBeanServerDelegate() " +
"returned null";
throw new JMRuntimeException(msg);
}
final MBeanServer mbeanServer =
mbsBuilder.newMBeanServer(domain,null,delegate);
if (mbeanServer == null) {
final String msg =
"MBeanServerBuilder.newMBeanServer() returned null";
throw new JMRuntimeException(msg);
}
return mbeanServer;
}
}

createMBeanServer方法会调用newMBeanServer方法实例化MBeanServer
newMBeanServer方法中会先使用MBeanServerBuilder.newMBeanServerDelegate()实例化一个MBeanServerBuilder
再使用MBeanServerBuilder.newMBeanServer(domain,null,delegate)方法实例化一个MBeanServer

1
2
3
4
5
6
7
8
9
public MBeanServer newMBeanServer(String  defaultDomain,
MBeanServer outer,
MBeanServerDelegate delegate) {
// By default, MBeanServerInterceptors are disabled.
// Use com.sun.jmx.mbeanserver.MBeanServerBuilder to obtain
// MBeanServers on which MBeanServerInterceptors are enabled.
return JmxMBeanServer.newMBeanServer(defaultDomain,outer,delegate,
false);
}

newMBeanServer方法实际实例化并返回的是一个JmxMBeanServer对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//in JmxMBeanServer
public static MBeanServer newMBeanServer(String defaultDomain,
MBeanServer outer,
MBeanServerDelegate delegate,
boolean interceptors) {
// Determine whether to use fair locking for the repository.
// Default is true.
final boolean fairLock = DEFAULT_FAIR_LOCK_POLICY;

checkNewMBeanServerPermission();

// This constructor happens to disregard the value of the interceptors
// flag - that is, it always uses the default value - false.
// This is admitedly a bug, but we chose not to fix it for now
// since we would rather not have anybody depending on the Sun private
// interceptor APIs - which is most probably going to be removed and
// replaced by a public (javax) feature in the future.
//
return new JmxMBeanServer(defaultDomain,outer,delegate,null,
interceptors,fairLock);
}

JmxMBeanServer(String domain, MBeanServer outer,
MBeanServerDelegate delegate,
MBeanInstantiator instantiator,
boolean interceptors,
boolean fairLock) {

if (instantiator == null) {
final ModifiableClassLoaderRepository
clr = new ClassLoaderRepositorySupport();
instantiator = new MBeanInstantiator(clr);
}

final MBeanInstantiator fInstantiator = instantiator;
this.secureClr = new
SecureClassLoaderRepository(AccessController.doPrivileged(new PrivilegedAction<ClassLoaderRepository>() {
@Override
public ClassLoaderRepository run() {
return fInstantiator.getClassLoaderRepository();
}
})
);
if (delegate == null)
delegate = new MBeanServerDelegateImpl();
if (outer == null)
outer = this;

this.instantiator = instantiator;
this.mBeanServerDelegateObject = delegate;
this.outerShell = outer;

final Repository repository = new Repository(domain);
this.mbsInterceptor =
new DefaultMBeanServerInterceptor(outer, delegate, instantiator,
repository);
this.interceptorsEnabled = interceptors;
initialize();
}

实例化一个Repository并聚合到新生成的一个DefaultMBeanServerInterceptor
Repository就是MBeanServer存放MBean的容器类,其内部维护一个Map<String,Map<String,NamedObject>> domainTb的成员
此处由于是第一次实例化MBeanServer,传入的domain为null,最终会使用ServiceName的静态的成员值DefaultDomain作为默认域

1
2
3
4
5
6
7
8
9
10
public class ServiceName {

/**
* The default domain.
* <BR>
* The value is <CODE>DefaultDomain</CODE>.
*/
public static final String DOMAIN = "DefaultDomain";
.....
}

NamedObject类是聚合ObjectName和DynamicMBean的辅助类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class NamedObject  {


/**
* Object name.
*/
private final ObjectName name;

/**
* Object reference.
*/
private final DynamicMBean object;
.....
}

后续的注册bean和获取bean等方法实现最终都是由mbsInterceptor成员去操作

LocateRegistry.createRegistry

创建注册中心并开启端口监听,返回创建的RegistryImpl实例
源码分析请参照本人的另一篇博客——java-rmi源码解析

MBeanServer.registerMBean

在JmxMBeanServer中的实现

1
2
3
4
5
6
public ObjectInstance registerMBean(Object object, ObjectName name)
throws InstanceAlreadyExistsException, MBeanRegistrationException,
NotCompliantMBeanException {

return mbsInterceptor.registerMBean(object, cloneObjectName(name));
}

上面讲到,默认的MBeanServer的mbsInterceptor成员由new DefaultMBeanServerInterceptor(outer, delegate, instantiator,repository)实例化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public ObjectInstance registerMBean(Object object, ObjectName name)
throws InstanceAlreadyExistsException, MBeanRegistrationException,
NotCompliantMBeanException {

// ------------------------------
// ------------------------------
Class<?> theClass = object.getClass();

Introspector.checkCompliance(theClass);

final String infoClassName = getNewMBeanClassName(object);

checkMBeanPermission(infoClassName, null, name, "registerMBean");
checkMBeanTrustPermission(theClass);

return registerObject(infoClassName, object, name);
}
private ObjectInstance registerObject(String classname,
Object object, ObjectName name)
throws InstanceAlreadyExistsException,
MBeanRegistrationException,
NotCompliantMBeanException {

if (object == null) {
final RuntimeException wrapped =
new IllegalArgumentException("Cannot add null object");
throw new RuntimeOperationsException(wrapped,
"Exception occurred trying to register the MBean");
}

DynamicMBean mbean = Introspector.makeDynamicMBean(object);

return registerDynamicMBean(classname, mbean, name);
}

DefaultMBeanServerInterceptor会将要注册的MBean封装为DynamicMBean,调用registerDynamicMBean进行注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
private ObjectInstance registerDynamicMBean(String classname,
DynamicMBean mbean,
ObjectName name)
throws InstanceAlreadyExistsException,
MBeanRegistrationException,
NotCompliantMBeanException {


name = nonDefaultDomain(name);

if (MBEANSERVER_LOGGER.isLoggable(Level.FINER)) {
MBEANSERVER_LOGGER.logp(Level.FINER,
DefaultMBeanServerInterceptor.class.getName(),
"registerMBean", "ObjectName = " + name);
}

ObjectName logicalName = preRegister(mbean, server, name);

// preRegister returned successfully, so from this point on we
// must call postRegister(false) if there is any problem.
boolean registered = false;
boolean registerFailed = false;
ResourceContext context = null;

try {
if (mbean instanceof DynamicMBean2) {
try {
((DynamicMBean2) mbean).preRegister2(server, logicalName);
registerFailed = true; // until we succeed
} catch (Exception e) {
if (e instanceof RuntimeException)
throw (RuntimeException) e;
if (e instanceof InstanceAlreadyExistsException)
throw (InstanceAlreadyExistsException) e;
throw new RuntimeException(e);
}
}

if (logicalName != name && logicalName != null) {
logicalName =
ObjectName.getInstance(nonDefaultDomain(logicalName));
}

checkMBeanPermission(classname, null, logicalName, "registerMBean");

if (logicalName == null) {
final RuntimeException wrapped =
new IllegalArgumentException("No object name specified");
throw new RuntimeOperationsException(wrapped,
"Exception occurred trying to register the MBean");
}

final Object resource = getResource(mbean);

// Register the MBean with the repository.
// Returns the resource context that was used.
// The returned context does nothing for regular MBeans.
// For ClassLoader MBeans the context makes it possible to register these
// objects with the appropriate framework artifacts, such as
// the CLR, from within the repository lock.
// In case of success, we also need to call context.done() at the
// end of this method.
//
context = registerWithRepository(resource, mbean, logicalName);


registerFailed = false;
registered = true;

} finally {
try {
postRegister(logicalName, mbean, registered, registerFailed);
} finally {
if (registered && context!=null) context.done();
}
}
return new ObjectInstance(logicalName, classname);
}

registerDynamicMBean方法最终调用registerWithRepository方法将DynamicMBean对象注册到Repository里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private ResourceContext registerWithRepository(
final Object resource,
final DynamicMBean object,
final ObjectName logicalName)
throws InstanceAlreadyExistsException,
MBeanRegistrationException {

// Creates a registration context, if needed.
//
final ResourceContext context =
makeResourceContextFor(resource, logicalName);


repository.addMBean(object, logicalName, context);
// May throw InstanceAlreadyExistsException

// ---------------------
// Send create event
// ---------------------
if (MBEANSERVER_LOGGER.isLoggable(Level.FINER)) {
MBEANSERVER_LOGGER.logp(Level.FINER,
DefaultMBeanServerInterceptor.class.getName(),
"addObject", "Send create notification of object " +
logicalName.getCanonicalName());
}

sendNotification(
MBeanServerNotification.REGISTRATION_NOTIFICATION,
logicalName);

return context;
}

前面已经讲过在创建DefaultMBeanServerInterceptor实例时,会实例化一个repository对象并装载到成员
注册bean会调用repository的addMBean方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public void addMBean(final DynamicMBean object, ObjectName name,
final RegistrationContext context)
throws InstanceAlreadyExistsException {

if (MBEANSERVER_LOGGER.isLoggable(Level.FINER)) {
MBEANSERVER_LOGGER.logp(Level.FINER, Repository.class.getName(),
"addMBean", "name = " + name);
}

// Extract the domain name.
String dom = name.getDomain().intern();
boolean to_default_domain = false;

// Set domain to default if domain is empty and not already set
if (dom.length() == 0)
name = Util.newObjectName(domain + name.toString());

// Do we have default domain ?
if (dom == domain) { // ES: OK (dom & domain are interned)
to_default_domain = true;
dom = domain;
} else {
to_default_domain = false;
}

// Validate name for an object
if (name.isPattern()) {
throw new RuntimeOperationsException(
new IllegalArgumentException("Repository: cannot add mbean for " +
"pattern name " + name.toString()));
}

lock.writeLock().lock();
try {
// Domain cannot be JMImplementation if entry does not exist
if ( !to_default_domain &&
dom.equals("JMImplementation") &&
domainTb.containsKey("JMImplementation")) {
throw new RuntimeOperationsException(
new IllegalArgumentException(
"Repository: domain name cannot be JMImplementation"));
}

// If domain does not already exist, add it to the hash table
final Map<String,NamedObject> moiTb = domainTb.get(dom);
if (moiTb == null) {
addNewDomMoi(object, dom, name, context);
return;
} else {
// Add instance if not already present
String cstr = name.getCanonicalKeyPropertyListString();
NamedObject elmt= moiTb.get(cstr);
if (elmt != null) {
throw new InstanceAlreadyExistsException(name.toString());
} else {
nbElements++;
addMoiToTb(object,name,cstr,moiTb,context);
}
}

} finally {
lock.writeLock().unlock();
}
}
private void addMoiToTb(final DynamicMBean object,
final ObjectName name,
final String key,
final Map<String,NamedObject> moiTb,
final RegistrationContext context) {
registering(context);
moiTb.put(key,new NamedObject(name, object));
}

domainTb成员就是前面说过的repository中存储MBean的Map,类型是Map<String,Map<String,NamedObject>>
add方法里会通过writelock进行同步
如果获取到的moiTb,代表该MBean的域名尚未加入domainTb,会将域名加入map并将MBean也加入

JmxMBeanServer

JmxMBeanServer
不提供直接返回MBean对象的方法
但提供了很多行为如 setAttributes,getAttribute,invoke等方法,根据ObjectName参数进行MBean识别
调用成员的DefaultMBeanServerInterceptor实例去执行对应的修改属性,获取属性值,执行方法等行为
DefaultMBeanServerInterceptor最终是获取repository中的MBean去执行对应行为
通过getObjectInstance可以获得该MBean的类名和域名信息的封装对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public ObjectInstance getObjectInstance(ObjectName name)
throws InstanceNotFoundException {

return mbsInterceptor.getObjectInstance(cloneObjectName(name));
}
public Object getAttribute(ObjectName name, String attribute)
throws MBeanException, AttributeNotFoundException,
InstanceNotFoundException, ReflectionException {

return mbsInterceptor.getAttribute(cloneObjectName(name), attribute);
}
public void setAttribute(ObjectName name, Attribute attribute)
throws InstanceNotFoundException, AttributeNotFoundException,
InvalidAttributeValueException, MBeanException,
ReflectionException {

mbsInterceptor.setAttribute(cloneObjectName(name),
cloneAttribute(attribute));
}
public Object invoke(ObjectName name, String operationName,
Object params[], String signature[])
throws InstanceNotFoundException, MBeanException,
ReflectionException {
return mbsInterceptor.invoke(cloneObjectName(name), operationName,
params, signature);
}

JMXConnectorServerFactory.newJMXConnectorServer

该方法创建一个JMXConnectorServer,将MBeanServer和JMXServiceURL传递过去
最终url、MbeanServer等对象会被聚合到JMXConnectorServer,作为成员

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public static JMXConnectorServer
newJMXConnectorServer(JMXServiceURL serviceURL,
Map<String,?> environment,
MBeanServer mbeanServer)
throws IOException {
Map<String, Object> envcopy;
if (environment == null)
envcopy = new HashMap<String, Object>();
else {
EnvHelp.checkAttributes(environment);
envcopy = new HashMap<String, Object>(environment);
}

final Class<JMXConnectorServerProvider> targetInterface =
JMXConnectorServerProvider.class;
final ClassLoader loader =
JMXConnectorFactory.resolveClassLoader(envcopy);
final String protocol = serviceURL.getProtocol();
final String providerClassName = "ServerProvider";

JMXConnectorServerProvider provider =
JMXConnectorFactory.getProvider(serviceURL,
envcopy,
providerClassName,
targetInterface,
loader);

IOException exception = null;
if (provider == null) {
// Loader is null when context class loader is set to null
// and no loader has been provided in map.
// com.sun.jmx.remote.util.Service class extracted from j2se
// provider search algorithm doesn't handle well null classloader.
if (loader != null) {
try {
JMXConnectorServer connection =
getConnectorServerAsService(loader,
serviceURL,
envcopy,
mbeanServer);
if (connection != null)
return connection;
} catch (JMXProviderException e) {
throw e;
} catch (IOException e) {
exception = e;
}
}
provider =
JMXConnectorFactory.getProvider(
protocol,
PROTOCOL_PROVIDER_DEFAULT_PACKAGE,
JMXConnectorFactory.class.getClassLoader(),
providerClassName,
targetInterface);
}

if (provider == null) {
MalformedURLException e =
new MalformedURLException("Unsupported protocol: " + protocol);
if (exception == null) {
throw e;
} else {
throw EnvHelp.initCause(e, exception);
}
}

envcopy = Collections.unmodifiableMap(envcopy);

return provider.newJMXConnectorServer(serviceURL,
envcopy,
mbeanServer);
}

JMXConnectorServerFactory的newJMXConnectorServer最终是返回由provider生产的JMXConnectorServer
provider是接口,在jdk里对于该方法有两个实现类,一个是返回基于rmi协议,另一个是返回基于iiop协议的

1
2
3
4
5
6
7
8
9
10
11
//ServiceProvider for rmi
public JMXConnectorServer newJMXConnectorServer(JMXServiceURL serviceURL,
Map<String,?> environment,
MBeanServer mbeanServer)
throws IOException {
if (!serviceURL.getProtocol().equals("rmi")) {
throw new MalformedURLException("Protocol not rmi: " +
serviceURL.getProtocol());
}
return new RMIConnectorServer(serviceURL, environment, mbeanServer);
}

需要留意的是,在serviceProvider中调用生成RMIConnectorServer时,最终会调用传递(RMIServerImpl) null为形参参数的构造函数构造
也就是生成的RMIConnectorServer中是没有指定服务端对象的
RMIServerImpl会在JMXConnectorServer调用start方法后结合前面聚合到JMXConnectorServer中的rmi的url去实例化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
   public RMIConnectorServer(JMXServiceURL url, Map<String,?> environment,
MBeanServer mbeanServer)
throws IOException {
this(url, environment, (RMIServerImpl) null, mbeanServer);
}
public RMIConnectorServer(JMXServiceURL url, Map<String,?> environment,
RMIServerImpl rmiServerImpl,
MBeanServer mbeanServer)
throws IOException {
super(mbeanServer);

if (url == null) throw new
IllegalArgumentException("Null JMXServiceURL");
if (rmiServerImpl == null) {
final String prt = url.getProtocol();
if (prt == null || !(prt.equals("rmi") || prt.equals("iiop"))) {
final String msg = "Invalid protocol type: " + prt;
throw new MalformedURLException(msg);
}
final String urlPath = url.getURLPath();
if (!urlPath.equals("")
&& !urlPath.equals("/")
&& !urlPath.startsWith("/jndi/")) {
final String msg = "URL path must be empty or start with " +
"/jndi/";
throw new MalformedURLException(msg);
}
}

if (environment == null)
this.attributes = Collections.emptyMap();
else {
EnvHelp.checkAttributes(environment);
this.attributes = Collections.unmodifiableMap(environment);
}

this.address = url;
this.rmiServerImpl = rmiServerImpl;
}

jmxConnectorServer.start()

启动JMX链接服务器,该过程会完成rmiServerImpl的实例化(开启监听rmi服务端口)
所以如果前面没有调用LocateRegistry.createRegistry(rmiPort)对端口进行注册,会导致报错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
public synchronized void start() throws IOException {
final boolean tracing = logger.traceOn();

if (state == STARTED) {
if (tracing) logger.trace("start", "already started");
return;
} else if (state == STOPPED) {
if (tracing) logger.trace("start", "already stopped");
throw new IOException("The server has been stopped.");
}

if (getMBeanServer() == null)
throw new IllegalStateException("This connector server is not " +
"attached to an MBean server");

// Check the internal access file property to see
// if an MBeanServerForwarder is to be provided
//
if (attributes != null) {
// Check if access file property is specified
//
String accessFile =
(String) attributes.get("jmx.remote.x.access.file");
if (accessFile != null) {
// Access file property specified, create an instance
// of the MBeanServerFileAccessController class
//
MBeanServerForwarder mbsf;
try {
mbsf = new MBeanServerFileAccessController(accessFile);
} catch (IOException e) {
throw EnvHelp.initCause(
new IllegalArgumentException(e.getMessage()), e);
}
// Set the MBeanServerForwarder
//
setMBeanServerForwarder(mbsf);
}
}

try {
if (tracing) logger.trace("start", "setting default class loader");
defaultClassLoader = EnvHelp.resolveServerClassLoader(
attributes, getMBeanServer());
} catch (InstanceNotFoundException infc) {
IllegalArgumentException x = new
IllegalArgumentException("ClassLoader not found: "+infc);
throw EnvHelp.initCause(x,infc);
}

if (tracing) logger.trace("start", "setting RMIServer object");
final RMIServerImpl rmiServer;

if (rmiServerImpl != null)
rmiServer = rmiServerImpl;
else
rmiServer = newServer();

rmiServer.setMBeanServer(getMBeanServer());
rmiServer.setDefaultClassLoader(defaultClassLoader);
rmiServer.setRMIConnectorServer(this);
rmiServer.export();

try {
if (tracing) logger.trace("start", "getting RMIServer object to export");
final RMIServer objref = objectToBind(rmiServer, attributes);

if (address != null && address.getURLPath().startsWith("/jndi/")) {
final String jndiUrl = address.getURLPath().substring(6);

if (tracing)
logger.trace("start", "Using external directory: " + jndiUrl);

String stringBoolean = (String) attributes.get(JNDI_REBIND_ATTRIBUTE);
final boolean rebind = EnvHelp.computeBooleanFromString( stringBoolean );

if (tracing)
logger.trace("start", JNDI_REBIND_ATTRIBUTE + "=" + rebind);

try {
if (tracing) logger.trace("start", "binding to " + jndiUrl);

final Hashtable<?, ?> usemap = EnvHelp.mapToHashtable(attributes);

bind(jndiUrl, usemap, objref, rebind);

boundJndiUrl = jndiUrl;
} catch (NamingException e) {
// fit e in the nested exception if we are on 1.4
throw newIOException("Cannot bind to URL ["+jndiUrl+"]: "
+ e, e);
}
} else {
// if jndiURL is null, we must encode the stub into the URL.
if (tracing) logger.trace("start", "Encoding URL");

encodeStubInAddress(objref, attributes);

if (tracing) logger.trace("start", "Encoded URL: " + this.address);
}
} catch (Exception e) {
try {
rmiServer.close();
} catch (Exception x) {
// OK: we are already throwing another exception
}
if (e instanceof RuntimeException)
throw (RuntimeException) e;
else if (e instanceof IOException)
throw (IOException) e;
else
throw newIOException("Got unexpected exception while " +
"starting the connector server: "
+ e, e);
}

rmiServerImpl = rmiServer;

synchronized(openedServers) {
openedServers.add(this);
}

state = STARTED;

if (tracing) {
logger.trace("start", "Connector Server Address = " + address);
logger.trace("start", "started.");
}
}

前面讲过rmiServerImpl成员是空的,所以这时会调用newServer方法进行创建

1
2
3
4
5
6
7
8
9
10
11
12
RMIServerImpl newServer() throws IOException {
final boolean iiop = isIiopURL(address,true);
final int port;
if (address == null)
port = 0;
else
port = address.getPort();
if (iiop)
return newIIOPServer(attributes);
else
return newJRMPServer(attributes, port);
}

接下来会调用objectToBind
通过toStub方法创建当前的RMIServer对象的远程代理对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static RMIServer objectToBind(
RMIServerImpl rmiServer, Map<String, ?> env)
throws IOException {
return RMIConnector.
connectStub((RMIServer)rmiServer.toStub(),env);
}
public Remote toStub() throws IOException {
return RemoteObject.toStub(this);
}
public static Remote toStub(Remote obj) throws NoSuchObjectException {
if (obj instanceof RemoteStub ||
(obj != null &&
Proxy.isProxyClass(obj.getClass()) &&
Proxy.getInvocationHandler(obj) instanceof
RemoteObjectInvocationHandler))
{
return obj;
} else {
return sun.rmi.transport.ObjectTable.getStub(obj);
}
}

然后调用bind方法将RMIServer的远程代理对象注册到注册中心

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void bind(String jndiUrl, Hashtable<?, ?> attributes,
RMIServer rmiServer, boolean rebind)
throws NamingException, MalformedURLException {
// if jndiURL is not null, we nust bind the stub to a
// directory.
InitialContext ctx =
new InitialContext(attributes);

if (rebind)
ctx.rebind(jndiUrl, rmiServer);
else
ctx.bind(jndiUrl, rmiServer);
ctx.close();
}

至此,完成了JmxConnectorServer的启动,程序可以通过jmx规范,访问MBeanServer与其上的MBean
关于remote和rmi的细节可以参考本人另一篇博客——java-rmi源码解析java-rmi源码解析

更多精彩内容

请移步

个人主页: yangyitao.top

java-rmi源码解析

欢迎查看Eetal的第二十一篇博客–java-rmi源码解析

相关核心类

sun.rmi.server.UnicastServerRef
sun.rmi.server.UnicastRef
sun.rmi.server.Util
sun.rmi.transport.tcp.TCPEndpoint
sun.rmi.transport.LiveRef
java.rmi.Naming
sun.rmi.registry.RegistryImpl

rmi

RMI是Java的一组拥护开发分布式应用程序的API。
RMI使用Java语言接口定义了远程对象,它集合了Java序列化和Java远程方法协议(Java Remote Method Protocol)。
简单地说,这样使原先的程序在同一操作系统的方法调用,变成了不同操作系统之间程序的方法调用,由于J2EE是分布式程序平台,它以RMI机制实现程序组件在不同操作系统之间的通信。
比如,一个EJB可以通过RMI调用Web上另一台机器上的EJB远程方法。

简单使用rmi

要发布的服务接口

1
2
3
4
5
public interface HelloService extends Remote{
public String sayHello() throws RemoteException;
//要发布的服务类的方法必须都throws RemoteException
//在Util中,创建代理对象时会checkMethod,存在没有throws RemoteException的则抛出IllegalArgumentException
}

util中的checkMethod

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
 public static Remote createProxy(Class<?> implClass,
RemoteRef clientRef,
boolean forceStubUse)
throws StubNotFoundException
{
.....
final Class<?>[] interfaces = getRemoteInterfaces(implClass);
.....
}
private static Class<?>[] getRemoteInterfaces(Class<?> remoteClass) {
ArrayList<Class<?>> list = new ArrayList<>();
getRemoteInterfaces(list, remoteClass);
return list.toArray(new Class<?>[list.size()]);
}

private static void getRemoteInterfaces(ArrayList<Class<?>> list, Class<?> cl) {
.....
for (int i = 0; i < interfaces.length; i++) {
Class<?> intf = interfaces[i];
if (Remote.class.isAssignableFrom(intf)) {
if (!(list.contains(intf))) {
Method[] methods = intf.getMethods();
for (int j = 0; j < methods.length; j++) {
checkMethod(methods[j]);
}
list.add(intf);
}
}
}
}

private static void checkMethod(Method m) {
Class<?>[] ex = m.getExceptionTypes();
for (int i = 0; i < ex.length; i++) {
if (ex[i].isAssignableFrom(RemoteException.class))
return;
}
throw new IllegalArgumentException(
"illegal remote method encountered: " + m);
}

要发布的服务接口的实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
public class HelloServiceImpl extends UnicastRemoteObject implements HelloService {
//需要继承UnicastRemoteObject,因为Remote只是接口,UnicastRemoteObject是Remote的实现类

protected HelloServiceImpl() throws RemoteException {
super();
}

@Override
public String sayHello() throws RemoteException{
return "hello";
}

}

服务发布端与客户消费端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Server {
final static String host = "127.0.0.1";
final static int port = 8080;
public static void main(String[] args) throws RemoteException, MalformedURLException {
HelloService helloService = new HelloServiceImpl();
LocateRegistry.createRegistry(port);
Naming.rebind("rmi://"+host+":"+port+"/hello", helloService);//不写port默认是1099
System.out.println("服务启动...");
}

}
public class Client {

public static void main(String[] args) throws MalformedURLException, RemoteException, NotBoundException {
HelloService helloService = (HelloService) Naming.lookup("rmi://"+Server.host+":"+Server.port+"/hello");//默认端口1099
System.out.println(helloService.sayHello());
}

}

追踪源码

启动服务前,创建服务对象会调用父类无参构造
HelloServiceImpl

1
2
3
protected HelloServiceImpl() throws RemoteException {
super();
}

UnicastRemoteObject构造

1
2
3
4
5
6
7
8
9
protected UnicastRemoteObject() throws RemoteException
{
this(0);
}
protected UnicastRemoteObject(int port) throws RemoteException
{
this.port = port;
exportObject((Remote) this, port);
}

在构造中调用了exportObject,将服务暴露出去

1
2
3
4
5
6
7
8
9
10
11
12
13
public static Remote exportObject(Remote obj, int port)
throws RemoteException
{
return exportObject(obj, new UnicastServerRef(port));
}
private static Remote exportObject(Remote obj, UnicastServerRef sref)
throws RemoteException
{
if (obj instanceof UnicastRemoteObject) {
((UnicastRemoteObject) obj).ref = sref;
}
return sref.exportObject(obj, null, false);
}

构造UnicastServerRef服务器对象来发布服务

1
2
3
4
public UnicastServerRef(int port) {
super(new LiveRef(port));
this.filter = null;
}

UnicastServerRef调用父类带参构造

1
2
3
public UnicastRef(LiveRef liveRef) {
ref = liveRef;
}

sref对象发布服务细节

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public Remote exportObject(Remote impl, Object data,
boolean permanent)
throws RemoteException
{
Class<?> implClass = impl.getClass();
Remote stub;

try {
stub = Util.createProxy(implClass, getClientRef(), forceStubUse);
} catch (IllegalArgumentException e) {
throw new ExportException(
"remote object implements illegal remote interface", e);
}
if (stub instanceof RemoteStub) {
setSkeleton(impl);
}

Target target =
new Target(impl, this, stub, ref.getObjID(), permanent);
ref.exportObject(target);
hashToMethod_Map = hashToMethod_Maps.get(implClass);
return stub;
}

创建代理对象,并生成一个可以真正发布的target,调用ref对象(上一步构建以后传递到父类构造函数的LiveRef)发布出去
LiveRef细节

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public LiveRef(int port) {
this((new ObjID()), port);
}
public LiveRef(ObjID objID, int port) {
this(objID, TCPEndpoint.getLocalEndpoint(port), true);
}
public LiveRef(ObjID objID, Endpoint endpoint, boolean isLocal) {
ep = endpoint;
id = objID;
this.isLocal = isLocal;
}
public void exportObject(Target target) throws RemoteException {
ep.exportObject(target);
}

TCPEndpoint.getLocalEndpoint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public static TCPEndpoint getLocalEndpoint(int port) {
return getLocalEndpoint(port, null, null);
}

public static TCPEndpoint getLocalEndpoint(int port,
RMIClientSocketFactory csf,
RMIServerSocketFactory ssf)
{
/*
* Find mapping for an endpoint key to the list of local unique
* endpoints for this client/server socket factory pair (perhaps
* null) for the specific port.
*/
TCPEndpoint ep = null;

synchronized (localEndpoints) {
TCPEndpoint endpointKey = new TCPEndpoint(null, port, csf, ssf);
LinkedList<TCPEndpoint> epList = localEndpoints.get(endpointKey);
String localHost = resampleLocalHost();

if (epList == null) {
/*
* Create new endpoint list.
*/
ep = new TCPEndpoint(localHost, port, csf, ssf);
epList = new LinkedList<TCPEndpoint>();
epList.add(ep);
ep.listenPort = port;
ep.transport = new TCPTransport(epList);
localEndpoints.put(endpointKey, epList);

if (TCPTransport.tcpLog.isLoggable(Log.BRIEF)) {
TCPTransport.tcpLog.log(Log.BRIEF,
"created local endpoint for socket factory " + ssf +
" on port " + port);
}
} else {
synchronized (epList) {
ep = epList.getLast();
String lastHost = ep.host;
int lastPort = ep.port;
TCPTransport lastTransport = ep.transport;
// assert (localHost == null ^ lastHost != null)
if (localHost != null && !localHost.equals(lastHost)) {
/*
* Hostname has been updated; add updated endpoint
* to list.
*/
if (lastPort != 0) {
/*
* Remove outdated endpoints only if the
* port has already been set on those endpoints.
*/
epList.clear();
}
ep = new TCPEndpoint(localHost, lastPort, csf, ssf);
ep.listenPort = port;
ep.transport = lastTransport;
epList.add(ep);
}
}
}
}

return ep;
}

最终获得一个可以发布服务的TCPEndpoint对象,并调用该对象把服务暴露出去

服务注册与拉取源码分析

注册服务
LocateRegistry.createRegistry(port);
创建注册中心时,会创建一个RegistryImpl对象

1
2
3
public static Registry createRegistry(int port) throws RemoteException {
return new RegistryImpl(port);
}

RegistryImpl代表注册中心
RegistryImpl中使用一个HashTable来注册(bind)服务和查找(lookup)服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
private Hashtable<String, Remote> bindings = new Hashtable<>(101);
public RegistryImpl(int port)
throws RemoteException
{
if (port == Registry.REGISTRY_PORT && System.getSecurityManager() != null) {
try {
AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
public Void run() throws RemoteException {
LiveRef lref = new LiveRef(id, port);
setup(new UnicastServerRef(lref, RegistryImpl::registryFilter));
return null;
}
}, null, new SocketPermission("localhost:"+port, "listen,accept"));
} catch (PrivilegedActionException pae) {
throw (RemoteException)pae.getException();
}
} else {
LiveRef lref = new LiveRef(id, port);
setup(new UnicastServerRef(lref, RegistryImpl::registryFilter));
}
}
private void setup(UnicastServerRef uref)
throws RemoteException
{
ref = uref;
uref.exportObject(this, null, true);
}
public Remote lookup(String name)
throws RemoteException, NotBoundException
{
synchronized (bindings) {
Remote obj = bindings.get(name);
if (obj == null)
throw new NotBoundException(name);
return obj;
}
}
public void bind(String name, Remote obj)
throws RemoteException, AlreadyBoundException, AccessException
{
synchronized (bindings) {
Remote curr = bindings.get(name);
if (curr != null)
throw new AlreadyBoundException(name);
bindings.put(name, obj);
}
}
public void unbind(String name)
throws RemoteException, NotBoundException, AccessException
{
synchronized (bindings) {
Remote obj = bindings.get(name);
if (obj == null)
throw new NotBoundException(name);
bindings.remove(name);
}
}
public void rebind(String name, Remote obj)
throws RemoteException, AccessException
{
bindings.put(name, obj);
}
.....

最终和服务端暴露服务类似,会把RegistryImpl对象暴露出去

当服务提供方调用Naming.rebind(“rmi://“+host+”:”+port+”/hello”, helloService);注册服务时
会先根据端口获取到暴露的注册中心对象RegistryImpl,然后调用其方法注册
对应的客户端获取服务的过程也是类似(HelloService) Naming.lookup(“rmi://“+Server.host+”:”+Server.port+”/hello”);
会先根据端口获取到暴露的注册中心对象RegistryImpl,然后调用其方法获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void rebind(String name, Remote obj)
throws RemoteException, java.net.MalformedURLException
{
ParsedNamingURL parsed = parseURL(name);
Registry registry = getRegistry(parsed);

if (obj == null)
throw new NullPointerException("cannot bind to null");

registry.rebind(parsed.name, obj);
}
public static Remote lookup(String name)
throws NotBoundException,
java.net.MalformedURLException,
RemoteException
{
ParsedNamingURL parsed = parseURL(name);
Registry registry = getRegistry(parsed);

if (parsed.name == null)
return registry;
return registry.lookup(parsed.name);
}

更多精彩内容

请移步

个人主页: yangyitao.top

SpringCloud的LoadBalance源码分析

欢迎查看Eetal的第二十篇博客–SpringCloud的LoadBalance源码分析

相关核心类

LoadBalanced
LoadBalancerClient
LoadBalancerAutoConfiguration
LoadBalancerInterceptor
RibbonAutoConfiguration
RibbonClassesConditions
AllNestedConditions
AbstractNestedCondition
AbstractNestedCondition.MemberMatchOutcomes
AbstractNestedCondition.MemberConditions
SpringBootCondition
ConditionOutcome

LoadBalanced注解与功能

通过在创建的RestTemplate上加入LoadBalanced注解,则该RestTemplate会自动成为负载均衡的RestTeplate,请求的url的hostname会作为服务名称去解析
用法如下

1
2
3
4
5
6
7
8
9
10
@LoadBalanced
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}

public String requestTest(){
String url = "http://8781-eurekaClient/getClient2Info";
return restTemplate.getForObject(url, String.class);
}

等价于以下写法

1
2
3
4
5
6
7
8
9
10
11
12
@Autowired
RestTemplate restTemplate;

@Autowired
private LoadBalancerClient client;

@RequestMapping("/getProviderInfo")
public String getProviderInfo() {
ServiceInstance instance = client.choose("8780-eurekaClient");
String url = "http://" + instance.getHost() + ":" + instance.getPort() + "/getInfo";
return restTemplate.getForObject(url, String.class);
}

源码分析

1.LoadBalanced注解

1
2
3
4
5
6
7
8
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {

}

这里注意,该注解被标记了@Qualifier注解,后续使用@Autowired注解注入时,如果注解的属性也被LoadBalanced注解,则只装载一样被LoadBalanced注解的符合autowired的对象
2.LoadBalnceAutoConfigure

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Configuration
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();

@Autowired(required = false)
private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();

@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}

@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(
LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
}

@Configuration
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {

@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}

@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}

}
......
}

@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
这个配置类依赖于加载RestTemplate类以及创建了LoadBalancerClient对象
其中会创建一个RestTemplateCustomizer提供给上方创建智能初始化的bean,该bean初始化过程会使用这个RestTemplateCustomizer,为每个loadBalanced注解的restTemplate加上一个负载均衡的拦截器
如果配置生效,根据前面对LoadBalanced注解了解,此处的集合成员restTemplates就是IOC容器中,被LoadBalanced注解了的RestTemplate
RestTemplate是我们手动的导入,接下来查找LoadBalancerClient对象,在RibbonAutoConfiguration中
拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

private LoadBalancerClient loadBalancer;

private LoadBalancerRequestFactory requestFactory;

public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null,
"Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName,
this.requestFactory.createRequest(request, body, execution));
}

}

Ribbon自动配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Configuration
@Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
@RibbonClients
@AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
@AutoConfigureBefore({ LoadBalancerAutoConfiguration.class,
AsyncLoadBalancerAutoConfiguration.class })
@EnableConfigurationProperties({ RibbonEagerLoadProperties.class,
ServerIntrospectorProperties.class })
public class RibbonAutoConfiguration {


@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(springClientFactory());
}

/**
* {@link AllNestedConditions} that checks that either multiple classes are present.
*/
static class RibbonClassesConditions extends AllNestedConditions {

RibbonClassesConditions() {
super(ConfigurationPhase.PARSE_CONFIGURATION);
}

@ConditionalOnClass(IClient.class)
static class IClientPresent {

}

@ConditionalOnClass(RestTemplate.class)
static class RestTemplatePresent {

}

@ConditionalOnClass(AsyncRestTemplate.class)
static class AsyncRestTemplatePresent {

}

@ConditionalOnClass(Ribbon.class)
static class RibbonPresent {

}

}

......
}

当IOC容器没有LoadBalancerClient实例时,RibbonAutoConfiguration会自动创建一个Ribbon实现的负载均衡客户端,是其子类
这也是为什么springCloud的Eureka客户端会默认使用Ribbon作为负载均衡
因为spring-cloud-starter-netflix-eureka-client工程引入依赖了spring-cloud-starter-netflix-ribbon

@Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)的意思是该要求满足该类中所有condition相关注解(程序加载了对应的四个类)
public abstract class AllNestedConditions extends AbstractNestedCondition
public abstract class AbstractNestedCondition extends SpringBootCondition implements ConfigurationCondition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
public abstract class SpringBootCondition implements Condition {

private final Log logger = LogFactory.getLog(getClass());

@Override
public final boolean matches(ConditionContext context,
AnnotatedTypeMetadata metadata) {
//成立判断方法
String classOrMethodName = getClassOrMethodName(metadata);
try {
ConditionOutcome outcome = getMatchOutcome(context, metadata);//调用getMatchOutcome
logOutcome(classOrMethodName, outcome);
recordEvaluation(context, classOrMethodName, outcome);
return outcome.isMatch();//注意这里,outcome的match属性作为结果
}
catch (NoClassDefFoundError ex) {
throw new IllegalStateException(
"Could not evaluate condition on " + classOrMethodName + " due to "
+ ex.getMessage() + " not "
+ "found. Make sure your own configuration does not rely on "
+ "that class. This can also happen if you are "
+ "@ComponentScanning a springframework package (e.g. if you "
+ "put a @ComponentScan in the default package by mistake)",
ex);
}
catch (RuntimeException ex) {
throw new IllegalStateException(
"Error processing condition on " + getName(metadata), ex);
}
}
......
}

public abstract class AbstractNestedCondition extends SpringBootCondition
implements ConfigurationCondition {


@Override
public ConditionOutcome getMatchOutcome(ConditionContext context,
AnnotatedTypeMetadata metadata) {
String className = getClass().getName();
MemberConditions memberConditions = new MemberConditions(context, className);
MemberMatchOutcomes memberOutcomes = new MemberMatchOutcomes(memberConditions);
return getFinalMatchOutcome(memberOutcomes);
}

protected static class MemberMatchOutcomes {

private final List<ConditionOutcome> all;

private final List<ConditionOutcome> matches;

private final List<ConditionOutcome> nonMatches;

public MemberMatchOutcomes(MemberConditions memberConditions) {
this.all = Collections.unmodifiableList(memberConditions.getMatchOutcomes());
List<ConditionOutcome> matches = new ArrayList<>();
List<ConditionOutcome> nonMatches = new ArrayList<>();
for (ConditionOutcome outcome : this.all) {
//是否满足加到不同属性集合
(outcome.isMatch() ? matches : nonMatches).add(outcome);
}
this.matches = Collections.unmodifiableList(matches);
this.nonMatches = Collections.unmodifiableList(nonMatches);
}
......

}
}
public abstract class AllNestedConditions extends AbstractNestedCondition {


@Override
protected ConditionOutcome getFinalMatchOutcome(MemberMatchOutcomes memberOutcomes) {
boolean match = hasSameSize(memberOutcomes.getMatches(), memberOutcomes.getAll());
//match就是判断结果
List<ConditionMessage> messages = new ArrayList<>();
messages.add(ConditionMessage.forCondition("AllNestedConditions")
.because(memberOutcomes.getMatches().size() + " matched "
+ memberOutcomes.getNonMatches().size() + " did not"));
for (ConditionOutcome outcome : memberOutcomes.getAll()) {
messages.add(outcome.getConditionMessage());
}
return new ConditionOutcome(match, ConditionMessage.of(messages));
}

private boolean hasSameSize(List<?> list1, List<?> list2) {
return list1.size() == list2.size();
}
......
}

ribbon的loadBalanceClient实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
public class RibbonLoadBalancerClient implements LoadBalancerClient {

private SpringClientFactory clientFactory;

public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
this.clientFactory = clientFactory;
}

@Override
public URI reconstructURI(ServiceInstance instance, URI original) {
Assert.notNull(instance, "instance can not be null");
String serviceId = instance.getServiceId();
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);

URI uri;
Server server;
if (instance instanceof RibbonServer) {
RibbonServer ribbonServer = (RibbonServer) instance;
server = ribbonServer.getServer();
uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
}
else {
server = new Server(instance.getScheme(), instance.getHost(),
instance.getPort());
IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
uri = updateToSecureConnectionIfNeeded(original, clientConfig,
serverIntrospector, server);
}
return context.reconstructURIWithServer(server, uri);
}

@Override
public ServiceInstance choose(String serviceId) {
return choose(serviceId, null);
}

/**
* New: Select a server using a 'key'.
* @param serviceId of the service to choose an instance for
* @param hint to specify the service instance
* @return the selected {@link ServiceInstance}
*/
public ServiceInstance choose(String serviceId, Object hint) {
Server server = getServer(getLoadBalancer(serviceId), hint);
if (server == null) {
return null;
}
return new RibbonServer(serviceId, server, isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
}

@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
throws IOException {
return execute(serviceId, request, null);
}

/**
* New: Execute a request by selecting server using a 'key'. The hint will have to be
* the last parameter to not mess with the `execute(serviceId, ServiceInstance,
* request)` method. This somewhat breaks the fluent coding style when using a lambda
* to define the LoadBalancerRequest.
* @param <T> returned request execution result type
* @param serviceId id of the service to execute the request to
* @param request to be executed
* @param hint used to choose appropriate {@link Server} instance
* @return request execution result
* @throws IOException executing the request may result in an {@link IOException}
*/
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));

return execute(serviceId, ribbonServer, request);
}

@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if (serviceInstance instanceof RibbonServer) {
server = ((RibbonServer) serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}

RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

try {
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
......
}

请移步

个人主页: yangyitao.top

使用HandlerInterceptor和RateLimiter进行限流

欢迎查看Eetal的第十九篇博客–使用HandlerInterceptor和RateLimiter进行限流

为什么使用拦截器

目前网上有博客使用的是aop,但是因为我项目里使用了shiro进行权限验证
shiro的过滤器链优先级是低于spring的过滤器的
所以根据约定优于配置,直接使用过滤器就不需要去配置shiro的一些映射和过滤器的优先级
同时网上博客基于aop为注解添加令牌桶效率的属性是有bug的,因为springMVC多线程是Method级别
而在aop的切面里创建rateLimiter或者为拿到的method对象的annotation对象绑定一个rateLimiter都会有线程问题
基于以上,把ratelimiter拿到拦截器,如果要定义不同rateLimiter可以改为定义多个拦截器和注解

常见限流算法

常见的限流方法有信号量计数器(Semaphore)计数当前线程个数
漏桶算法—使用一个容器保存进来的任务,按照固定速率流出任务,桶满时新加入的任务直接流出
令牌桶算法—使用一个容器存储令牌,按照固定速率生产令牌,令牌桶满时直接丢弃新生成令牌,任务进来以后尝试获取令牌,获取成功的任务开始执行
计数器较麻烦,需要维护实时记录线程数,完成时进行维护
对比令牌桶和漏桶,漏桶算法无法解决当容量空闲了一段时间以后,大量任务一起进来时,执行的平均效率低下的问题,而令牌桶的令牌生成速率是固定的

rateLimiter介绍

RateLimiter是guava提供的基于令牌桶算法的实现类
create(Double permitsPerSecond)方法根据给定的(令牌:单位时间(1s))比例为令牌生成速率
tryAcquire()方法尝试获取一个令牌,立即返回true/false,不阻塞,重载方法具备设置获取令牌个数、获取最大等待时间等参数
acquire()方法与tryAcquire类似,但是会阻塞,尝试获取一个令牌,没有时则阻塞直到获取成功

注解创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimit {

/**
* 获取令牌的等待时间 默认0
* @return
*/
int value() default 0;


/**
* 超时时间单位
* @return
*/
TimeUnit timeOutUnit() default TimeUnit.MILLISECONDS;

}

拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class RateLimitInterceptor implements HandlerInterceptor {

private final static Logger logger = LoggerFactory.getLogger(RateLimitInterceptor.class);

//每s产生2000个令牌
RateLimiter rateLimiter = RateLimiter.create(2000);

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
Object handler) throws Exception {
//如果是SpringMVC请求
if(handler instanceof HandlerMethod){
HandlerMethod handlerMethod = (HandlerMethod) handler;
RateLimit rateLimit = handlerMethod.getMethodAnnotation(RateLimit.class);
if(rateLimit != null) {
logger.info("rateLimit intercept method...");
if (!rateLimiter.tryAcquire(rateLimit.value(), rateLimit.timeOutUnit())) {
throw new Exception("系统繁忙,新稍后再试!");
//这里抛出异常是因为我项目里对异常进行了全局处理
}
}
}
return true;
}

}

springBoot拦截器配置

1
2
3
4
5
6
7
8
9
10
@Configuration
public class WebMVCConfig implements WebMvcConfigurer {

@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new RateLimitInterceptor()).addPathPatterns("/**")
.excludePathPatterns("/static/**", "/");
//配置限流拦截器拦截以及不拦截的映射
}
}

开始使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@RateLimit(1000)	//最大等待时间1s
@RequestMapping("/login")
public ModelAndView login(User user) {
ModelAndView mv = new ModelAndView();
String username=user.getUsername();
String password=user.getPassword();
UsernamePasswordToken token = new UsernamePasswordToken(username,password,false);
Subject subject = SecurityUtils.getSubject();
try {
subject.login(token);
} catch(IncorrectCredentialsException e){
mv.addObject("msg", "密码错误");
mv.setViewName("/login.html");
return mv;
} catch (AuthenticationException e) {
mv.addObject("msg", "登录失败");
mv.setViewName("/login.html");
return mv;
}

mv.addObject("userDto",(UserDto)subject.getSession().getAttribute("userDto"));
mv.setViewName("/index.html");
return mv;
}

请移步

个人主页: yangyitao.top

字符串常量池在1.6与1.8以及String、StringBuffer、StringBuilder

欢迎查看Eetal的第十八篇博客–字符串常量池在1.6与1.8以及String、StringBuffer、StringBuilder

字符串加入常量池时机

直接使用双引号声明出来的String对象会直接存储在常量池中,相同字符串使用同一个对象
而使用构造函数创建的会新创建一个字符串对象,但是该字符串对象的字符数组value会引用常量池中的字符串对象的字符数组
使用String的concat方法拼接后的字符串使用的是share模式的构造函数进行构造,默认不会引用常量池(此处的share模式有点threadLocal的那种意思,所以是创建副本不去引用常量池)
对于不进入常量池而常量池中没有的字符串,可以显示调用String的intern方法将其加入常量池
引用自博客https://blog.csdn.net/chen1280436393/article/details/51768609
也就是说,代码里双引号直接包含的字符串会被放进常量池
比如

1
2
3
4
5
6
7
8
9
String str1 = "我爱java";//我爱java进入常量池
String str2 = "歪歪"+"梯";//因为编译器会优化,所以等价于"歪歪梯",歪歪梯会放入常量池
String str3 = new String("构造1");//构造1会放入常量池
String str4 = new String("构造2")+"测试";//构造2会放入常量池。测试会放入常量池
String str5 = new String("构造3")+new String("构造4");//构造3会放入常量池。构造4会放入常量池
/*
*最终常量池包括 我爱java 歪歪梯 构造1 构造2 构造3 构造4 测试 7个字符串
*注意非字面常量字符串拼接的结果字符串默认并不会加入常量池(因此也不会默认去引用常量池,这个下面会验证)
*/

如果是直接使用常量字符串赋值的,会直接引用常量池该字符串对象

1
2
3
String str = "123";
String str2 = "1"+"23";
System.out.println(str == str2);//true

要将非字面常量字符串存储进常量池,可以调用字符串对象的intern方法
为什么只存储字面常量,其实也不难理解,jvm虚拟机显示存储字面常量是因为字面常量是直接出现在代码里的,是确定,的同时也是一定会用到的,而且有很大概率用来进行多次拼接(循环体代码),所以放进常量池优化内存

jdk1.6 与 jdk1.8的String和常量池

jdk1.6使用(方法区)永久代存储类的静态信息和运行时常量池,字符串常量池也在其中,因为常量池不在堆内存,只有触发fullgc才会回收常量池中对象

jdk1.6字符串常量池内存示意图

jdk1.6字符串常量池内存示意图

jdk1.8字符串常量池内存示意图

jdk1.8字符串常量池移到元数据,元数据在本地内存,同时字符串加入常量池行为发生变化
jdk1.8字符串常量池内存示意图
jdk1.6在调用intern时,如果常量池没有执行的操作是复制字符串在常量池创建一个字符串对象,并返回该对象
jdk1.8在调用intern时,如果常量池没有则在常量池创建一个字符串引用,引用当前字符串对象,返回this

1
2
3
4
5
6
7
8
9
10
11
String str1 = new StringBuilder("hello").append("java").toString();
String str2 = new StringBuilder("hello").append("java").toString();
//此时常量池只有两个字符串hello和java

//jdk1.6的测试效果
System.out.println(str1.intern()==str1);//false
System.out.println(str2==str2.intern());//false

//jdk1.8的测试效果
System.out.println(str1.intern()==str1);//true
System.out.println(str2==str2.intern());//false

使用jol工具进行验证

验证代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
String str1 = new StringBuilder("hello").append("java").toString();
String str2 = new StringBuilder("hello").append("java").toString();

System.out.println(str1.intern()==str1);
System.out.println(str2==str2.intern());

System.out.println("use jol check ...");

String builderString = new StringBuilder("faker").append("大魔王").toString();
System.out.println("-------------------------------------------------------------------------------------------------------");
System.out.println("builderString info");
System.out.println(GraphLayout.parseInstance(builderString).toPrintable());
System.out.println("-------------------------------------------------------------------------------------------------------");

String sin = builderString.intern();
System.out.println("-------------------------------------------------------------------------------------------------------");
System.out.println("init builderString.intern() info");
System.out.println(GraphLayout.parseInstance(sin).toPrintable());
System.out.println("-------------------------------------------------------------------------------------------------------");

String constStr = "faker大魔王";
System.out.println("-------------------------------------------------------------------------------------------------------");
System.out.println("init String constStr = 'faker大魔王' info");
System.out.println(GraphLayout.parseInstance(constStr).toPrintable());
System.out.println("-------------------------------------------------------------------------------------------------------");

String newString = new String("faker大魔王");
System.out.println("-------------------------------------------------------------------------------------------------------");
System.out.println("init new String('faker大魔王') info");
System.out.println(GraphLayout.parseInstance(newString).toPrintable());
System.out.println("-------------------------------------------------------------------------------------------------------");

String newString2 = new String("faker大魔王");
System.out.println("-------------------------------------------------------------------------------------------------------");
System.out.println("init new String('faker大魔王') 2 info");
System.out.println(GraphLayout.parseInstance(newString2).toPrintable());
System.out.println("-------------------------------------------------------------------------------------------------------");

String newStringConcatString = new String("faker")+"大魔王";
System.out.println("-------------------------------------------------------------------------------------------------------");
System.out.println("init String newStringConcatString = new String('faker')+'大魔王' info");
System.out.println(GraphLayout.parseInstance(newStringConcatString).toPrintable());
System.out.println("-------------------------------------------------------------------------------------------------------");

String newStringConcatString2 = new String("faker")+new String("大魔王");
System.out.println("-------------------------------------------------------------------------------------------------------");
System.out.println("init String newStringConcatString2 = new String('faker')+new String('大魔王') info");
System.out.println(GraphLayout.parseInstance(newStringConcatString2).toPrintable());
System.out.println("-------------------------------------------------------------------------------------------------------");

输出结果—jdk1.6

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
false
false
use jol check ...
-------------------------------------------------------------------------------------------------------
builderString info
java.lang.String@1b17a8bdd object externals:
ADDRESS SIZE TYPE PATH VALUE
7d5efcbe0 32 java.lang.String (object)
7d5efcc00 32 [C .value [f, a, k, e, r, 大, 魔, 王]


-------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------
init builderString.intern() info
java.lang.String@60765a16d object externals:
ADDRESS SIZE TYPE PATH VALUE
77cfe3860 32 java.lang.String (object)
77cfe3880 32 [C .value [f, a, k, e, r, 大, 魔, 王]


-------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------
init String constStr = 'faker大魔王' info
java.lang.String@60765a16d object externals:
ADDRESS SIZE TYPE PATH VALUE
77cfe3860 32 java.lang.String (object)
77cfe3880 32 [C .value [f, a, k, e, r, 大, 魔, 王]


-------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------
init new String('faker大魔王') info
java.lang.String@5ae80842d object externals:
ADDRESS SIZE TYPE PATH VALUE
77cfe3880 32 [C .value [f, a, k, e, r, 大, 魔, 王]
77cfe38a0 1498006336 (something else) (somewhere else) (something else)
7d647fbe0 32 java.lang.String (object)


-------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------
init new String('faker大魔王') 2 info
java.lang.String@377653aed object externals:
ADDRESS SIZE TYPE PATH VALUE
77cfe3880 32 [C .value [f, a, k, e, r, 大, 魔, 王]
77cfe38a0 1498033856 (something else) (somewhere else) (something else)
7d6486760 32 java.lang.String (object)


-------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------
init String newStringConcatString = new String('faker')+'大魔王' info
java.lang.String@396fe0f4d object externals:
ADDRESS SIZE TYPE PATH VALUE
7d648d358 32 java.lang.String (object)
7d648d378 32 [C .value [f, a, k, e, r, 大, 魔, 王]


-------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------
init String newStringConcatString2 = new String('faker')+new String('大魔王') info
java.lang.String@3ed02b51d object externals:
ADDRESS SIZE TYPE PATH VALUE
7d6492950 32 java.lang.String (object)
7d6492970 32 [C .value [f, a, k, e, r, 大, 魔, 王]


-------------------------------------------------------------------------------------------------------

输出结果—jdk1.8

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
true
false
use jol check ...
-------------------------------------------------------------------------------------------------------
builderString info
java.lang.String@1540e19dd object externals:
ADDRESS SIZE TYPE PATH VALUE
d5ff44f8 24 java.lang.String (object)
d5ff4510 32 [C .value [f, a, k, e, r, 大, 魔, 王]


-------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------
init builderString.intern() info
java.lang.String@1540e19dd object externals:
ADDRESS SIZE TYPE PATH VALUE
d5ff44f8 24 java.lang.String (object)
d5ff4510 32 [C .value [f, a, k, e, r, 大, 魔, 王]


-------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------
init String constStr = 'faker大魔王' info
java.lang.String@1540e19dd object externals:
ADDRESS SIZE TYPE PATH VALUE
d5ff44f8 24 java.lang.String (object)
d5ff4510 32 [C .value [f, a, k, e, r, 大, 魔, 王]


-------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------
init new String('faker大魔王') info
java.lang.String@51521cc1d object externals:
ADDRESS SIZE TYPE PATH VALUE
d5ff4510 32 [C .value [f, a, k, e, r, 大, 魔, 王]
d5ff4530 8779704 (something else) (somewhere else) (something else)
d6853ce8 24 java.lang.String (object)


-------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------
init new String('faker大魔王') 2 info
java.lang.String@1b4fb997d object externals:
ADDRESS SIZE TYPE PATH VALUE
d5ff4510 32 [C .value [f, a, k, e, r, 大, 魔, 王]
d5ff4530 8805504 (something else) (somewhere else) (something else)
d685a1b0 24 java.lang.String (object)


-------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------
init String newStringConcatString = new String('faker')+'大魔王' info
java.lang.String@deb6432d object externals:
ADDRESS SIZE TYPE PATH VALUE
d6860660 24 java.lang.String (object)
d6860678 32 [C .value [f, a, k, e, r, 大, 魔, 王]


-------------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------------
init String newStringConcatString2 = new String('faker')+new String('大魔王') info
java.lang.String@694f9431d object externals:
ADDRESS SIZE TYPE PATH VALUE
d6865708 24 java.lang.String (object)
d6865720 32 [C .value [f, a, k, e, r, 大, 魔, 王]


-------------------------------------------------------------------------------------------------------

String、StringBuffer、StringBuilder

StringBuffer是StringBuilder线程安全版本的实现
所以以下着重讲StringBuilder为什么拼接会比String快,同时节约内存
以及为什么在字符串很短以及拼接次数少时,两者效率差距不大而到了次数多时,差距会变化很大

String中的拼接方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
   void getChars(char dst[], int dstBegin) {
System.arraycopy(value, 0, dst, dstBegin, value.length);
}
public String concat(String str) {
int otherLen = str.length();
if (otherLen == 0) {
return this;
}
int len = value.length;
char buf[] = Arrays.copyOf(value, len + otherLen);
str.getChars(buf, len);
return new String(buf, true);
}
/*
* Package private constructor which shares value array for speed.
* this constructor is always expected to be called with share==true.
* a separate constructor is needed because we already have a public
* String(char[]) constructor that makes a copy of the given char[].
*/
String(char[] value, boolean share) {
// assert share : "unshared not supported";
this.value = value;
}

也就是每一次拼接的流程是创建一个长度刚好可以容纳这个拼接后的字符串的新数组,把两者字符复制进去
再来看看StringBuilder的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
   public AbstractStringBuilder append(String str) {
if (str == null)
return appendNull();
int len = str.length();
ensureCapacityInternal(count + len);
str.getChars(0, len, value, count);
count += len;
return this;
}
private void ensureCapacityInternal(int minimumCapacity) {
// overflow-conscious code
if (minimumCapacity - value.length > 0) {
value = Arrays.copyOf(value,
newCapacity(minimumCapacity));
}
}
private int newCapacity(int minCapacity) {
// overflow-conscious code
int newCapacity = (value.length << 1) + 2;
if (newCapacity - minCapacity < 0) {
newCapacity = minCapacity;
}
return (newCapacity <= 0 || MAX_ARRAY_SIZE - newCapacity < 0)
? hugeCapacity(minCapacity)
: newCapacity;
}

StringBuilder和String一样在内部维护一个字符数组,但是StringBuilder在构造时会多预留16个位置
StringBuilder会调用父类AbstractStringBuilder的append方法
具体操作为如果当前数组长度不够存储,则将数组长度扩展为原来的两倍+2,如果扩展后还是不够,再直接一次性扩展到刚好满足
这意味着如果每次拼接的字符串相对当前字符串数组要小的多时,因为每次扩容是倍数增长,所以扩容次数相比String会减少
同时直接操作数组除了创建新数组没有创建别的对象,性能也更好(String的内部数组是final不可变,所以每次拼接都是创建新的String对象以及数组对象)
从这里也可以看出,如果每次拼接的字符串长度是指数增长,都要接近甚至更高于数组最新的长度,那么StringBuilder拼接的效率也会下降到和String差不多(当然这种场景是极少见的)

请移步

个人主页: yangyitao.top