final CountDownLatch coutnDownLatch = new CountDownLatch(1); ZooKeeper zookeeper = new ZooKeeper("192.168.156.171:2181,192.168.156.171:2182,192.168.156.171:2183", 3000, new Watcher() {
public void run() { clientCnxnSocket.introduce(this, sessionId, outgoingQueue); clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); int to; long lastPingRwServer = Time.currentElapsedTime(); final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds InetSocketAddress serverAddress = null; while (state.isAlive()) { try { if (!clientCnxnSocket.isConnected()) { // don't re-establish connection if we are closing if (closing) { break; } if (rwServerAddress != null) { serverAddress = rwServerAddress; rwServerAddress = null; } else { serverAddress = hostProvider.next(1000); } startConnect(serverAddress); clientCnxnSocket.updateLastSendAndHeard(); } if (state.isConnected()) { // determine whether we need to send an AuthFailed event. if (zooKeeperSaslClient != null) { boolean sendAuthEvent = false; if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) { try { zooKeeperSaslClient.initialize(ClientCnxn.this); } catch (SaslException e) { LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e); state = States.AUTH_FAILED; sendAuthEvent = true; } } KeeperState authState = zooKeeperSaslClient.getKeeperState(); if (authState != null) { if (authState == KeeperState.AuthFailed) { // An authentication error occurred during authentication with the Zookeeper Server. state = States.AUTH_FAILED; sendAuthEvent = true; } else { if (authState == KeeperState.SaslAuthenticated) { sendAuthEvent = true; } } } if (sendAuthEvent) { eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, authState,null)); if (state == States.AUTH_FAILED) { eventThread.queueEventOfDeath(); } } } to = readTimeout - clientCnxnSocket.getIdleRecv(); } else { to = connectTimeout - clientCnxnSocket.getIdleRecv(); } if (to <= 0) { String warnInfo; warnInfo = "Client session timed out, have not heard from server in " + clientCnxnSocket.getIdleRecv() + "ms" + " for sessionid 0x" + Long.toHexString(sessionId); LOG.warn(warnInfo); throw new SessionTimeoutException(warnInfo); } if (state.isConnected()) { //1000(1 second) is to prevent race condition missing to send the second ping //also make sure not to send too many pings when readTimeout is small int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0); //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) { sendPing(); clientCnxnSocket.updateLastSend(); } else { if (timeToNextPing < to) { to = timeToNextPing; } } } // If we are in read-only mode, seek for read/write server if (state == States.CONNECTEDREADONLY) { long now = Time.currentElapsedTime(); int idlePingRwServer = (int) (now - lastPingRwServer); if (idlePingRwServer >= pingRwTimeout) { lastPingRwServer = now; idlePingRwServer = 0; pingRwTimeout = Math.min(2*pingRwTimeout, maxPingRwTimeout); pingRwServer(); } to = Math.min(to, pingRwTimeout - idlePingRwServer); } clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this); } catch (Throwable e) { if (closing) { if (LOG.isDebugEnabled()) { // closing so this is expected LOG.debug("An exception was thrown while closing send thread for session 0x" + Long.toHexString(getSessionId()) + " : " + e.getMessage()); } break; } else { // this is ugly, you have a better way speak up if (e instanceof SessionExpiredException) { LOG.info(e.getMessage() + ", closing socket connection"); } else if (e instanceof SessionTimeoutException) { LOG.info(e.getMessage() + RETRY_CONN_MSG); } else if (e instanceof EndOfStreamException) { LOG.info(e.getMessage() + RETRY_CONN_MSG); } else if (e instanceof RWServerFoundException) { LOG.info(e.getMessage()); } else if (e instanceof SocketException) { LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage()); } else { LOG.warn("Session 0x{} for server {}, unexpected error{}", Long.toHexString(getSessionId()), serverAddress, RETRY_CONN_MSG, e); } // At this point, there might still be new packets appended to outgoingQueue. // they will be handled in next connection or cleared up if closed. cleanAndNotifyState(); } } } synchronized (state) { // When it comes to this point, it guarantees that later queued // packet to outgoingQueue will be notified of death. cleanup(); } clientCnxnSocket.close(); if (state.isAlive()) { eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null)); } eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null)); ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId())); }
@Override private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) { updateNow(); boolean anyPacketsSent = false; while (true) { if (p != WakeupPacket.getInstance()) { if ((p.requestHeader != null) && (p.requestHeader.getType() != ZooDefs.OpCode.ping) && (p.requestHeader.getType() != ZooDefs.OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); synchronized (pendingQueue) { pendingQueue.add(p); } } sendPktOnly(p); anyPacketsSent = true; } if (outgoingQueue.isEmpty()) { break; } p = outgoingQueue.remove(); } // TODO: maybe we should flush in the loop above every N packets/bytes? // But, how do we determine the right value for N ... if (anyPacketsSent) { channel.flush(); } }
replyHdr.deserialize(bbia, "header"); if (replyHdr.getXid() == -2) { // -2 is the xid for pings if (LOG.isDebugEnabled()) { LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(sessionId) + " after " + ((System.nanoTime() - lastPingSentNs) / 1000000) + "ms"); } return; } if (replyHdr.getXid() == -4) { // -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); eventThread.queueEventOfDeath(); } if (LOG.isDebugEnabled()) { LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId)); } return; } if (replyHdr.getXid() == -1) { // -1 means notification if (LOG.isDebugEnabled()) { LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId)); } WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response");
// convert from a server path to a client path if (chrootPath != null) { String serverPath = event.getPath(); if(serverPath.compareTo(chrootPath)==0) event.setPath("/"); elseif (serverPath.length() > chrootPath.length()) event.setPath(serverPath.substring(chrootPath.length())); else { LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + chrootPath); } }
WatchedEvent we = new WatchedEvent(event); if (LOG.isDebugEnabled()) { LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId)); }
eventThread.queueEvent( we ); return; }
// If SASL authentication is currently in progress, construct and // send a response packet immediately, rather than queuing a // response as with other packets. if (tunnelAuthInProgress()) { GetSASLRequest request = new GetSASLRequest(); request.deserialize(bbia,"token"); zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this); return; }
Packet packet; synchronized (pendingQueue) { if (pendingQueue.size() == 0) { throw new IOException("Nothing in the queue, but got " + replyHdr.getXid()); } packet = pendingQueue.remove(); } /* * Since requests are processed in order, we better get a response * to the first request! */ try { if (packet.requestHeader.getXid() != replyHdr.getXid()) { packet.replyHeader.setErr( KeeperException.Code.CONNECTIONLOSS.intValue()); throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() + " with err " + + replyHdr.getErr() + " expected Xid " + packet.requestHeader.getXid() + " for a packet with details: " + packet ); }
public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) { Set<Watcher> result = new HashSet<Watcher>();
switch (type) { case None: result.add(defaultWatcher); boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected; synchronized(dataWatches) { for(Set<Watcher> ws: dataWatches.values()) { result.addAll(ws); } if (clear) { dataWatches.clear(); } }
private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>(); public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) { Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(), // where the packet is actually sent. packet = new Packet(h, r, request, response, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; packet.watchDeregistration = watchDeregistration; // The synchronized block here is for two purpose: // 1. synchronize with the final cleanup() in SendThread.run() to avoid race // 2. synchronized against each packet. So if a closeSession packet is added, // later packet will be notified. synchronized (state) { if (!state.isAlive() || closing) { conLossPacket(packet); } else { // If the client is asking to close the session then // mark as closing if (h.getType() == OpCode.closeSession) { closing = true; } outgoingQueue.add(packet); } } sendThread.getClientCnxnSocket().packetAdded(); return packet; }
// Start and schedule the the purge task DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config .getDataDir(), config.getDataLogDir(), config .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); //如果是集群模式,使用runFromConfig方法,否则使用ZooKeeperServerMain.main方法运行 if (args.length == 1 && config.isDistributed()) { runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone ZooKeeperServerMain.main(args); } }
@Override public synchronized void start() { if (!getView().containsKey(myid)) { //getView是根据前面配置信息解析设置的QuorumVerifier,获取全部集群机器的id和ip端口信息 //此处判断当前机器是否在集群里 throw new RuntimeException("My id " + myid + " not in the peer list"); } loadDataBase();//从数据日志文件恢复数据到内存 ...... startLeaderElection();//开启选举 super.start();//线程类的start方法,开始运行run }
选举运行,此时刚进入集群节点状态为LOOKING,第一票投票给自己
1 2 3
public class QuorumPeerConfig { protected int electionAlg = 3; }
synchronized public void startLeaderElection() { ...... if (getPeerState() == ServerState.LOOKING) { //投票给自己 currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); } ...... this.electionAlg = createElectionAlgorithm(electionType);//挑选选举算法---配置文件不设置默认为3 } protected Election createElectionAlgorithm(int electionAlgorithm){ Election le=null;
//TODO: use a factory rather than a switch switch (electionAlgorithm) { case 0: le = new LeaderElection(this); break; case 1: le = new AuthFastLeaderElection(this); break; case 2: le = new AuthFastLeaderElection(this, true); break; case 3: QuorumCnxManager qcm = createCnxnManager(); QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm); if (oldQcm != null) { LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)"); oldQcm.halt(); } QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ listener.start(); FastLeaderElection fle = new FastLeaderElection(this, qcm); fle.start(); le = fle; } else { LOG.error("Null listener when initializing cnx manager"); } break; default: assert false; } return le; }
public class MBeanRegistry { private static final Logger LOG = LoggerFactory.getLogger(MBeanRegistry.class); private static volatile MBeanRegistry instance = new MBeanRegistry(); private final Object LOCK = new Object(); private Map<ZKMBeanInfo, String> mapBean2Path = new ConcurrentHashMap<ZKMBeanInfo, String>(); private MBeanServer mBeanServer;
/** * Useful for unit tests. Change the MBeanRegistry instance * * @param instance new instance */ public static void setInstance(MBeanRegistry instance) { MBeanRegistry.instance = instance; }
public static MBeanRegistry getInstance() { return instance; }
public MBeanRegistry () { try { mBeanServer = ManagementFactory.getPlatformMBeanServer(); } catch (Error e) { // Account for running within IKVM and create a new MBeanServer // if the PlatformMBeanServer does not exist. mBeanServer = MBeanServerFactory.createMBeanServer(); } }
/** * Return the underlying MBeanServer that is being * used to register MBean's. The returned MBeanServer * may be a new empty MBeanServer if running through IKVM. */ public MBeanServer getPlatformMBeanServer() { return mBeanServer; } /** * Registers a new MBean with the platform MBean server. * @param bean the bean being registered * @param parent if not null, the new bean will be registered as a child * node of this parent. */ public void register(ZKMBeanInfo bean, ZKMBeanInfo parent) throws JMException { assert bean != null; String path = null; if (parent != null) { path = mapBean2Path.get(parent); assert path != null; } path = makeFullPath(path, parent); if(bean.isHidden()) return; ObjectName oname = makeObjectName(path, bean); try { synchronized (LOCK) { mBeanServer.registerMBean(bean, oname); mapBean2Path.put(bean, path); } } catch (JMException e) { LOG.warn("Failed to register MBean " + bean.getName()); throw e; } }
jmxQuorumBean = new QuorumBean(this); MBeanRegistry.getInstance().register(jmxQuorumBean, null); for(QuorumServer s: getView().values()){ ZKMBeanInfo p; if (getId() == s.id) { p = jmxLocalPeerBean = new LocalPeerBean(this); try { MBeanRegistry.getInstance().register(p, jmxQuorumBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxLocalPeerBean = null; } } else { RemotePeerBean rBean = new RemotePeerBean(this, s); try { MBeanRegistry.getInstance().register(rBean, jmxQuorumBean); jmxRemotePeerBean.put(s.id, rBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); } } }
/* * Main loop */ while (running) { switch (getPeerState()) { case LOOKING: LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled")) { LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb); // Instead of starting roZk immediately, wait some grace // period before we decide we're partitioned. // // Thread is used here because otherwise it would require // changes in each of election strategy classes which is // unnecessary code coupling. Thread roZkMgr = new Thread() { public void run() { try { // lower-bound grace period to 2 secs sleep(Math.max(2000, tickTime)); if (ServerState.LOOKING.equals(getPeerState())) { roZk.startup(); } } catch (InterruptedException e) { LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started"); } catch (Exception e) { LOG.error("FAILED to start ReadOnlyZooKeeperServer", e); } } }; try { roZkMgr.start(); reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } finally { // If the thread is in the the grace period, interrupt // to come out of waiting. roZkMgr.interrupt(); roZk.shutdown(); } } else { try { reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } } break; ..... } ...... }
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){ this.stop = false; this.manager = manager; starter(self, manager); } LinkedBlockingQueue<ToSend> sendqueue; LinkedBlockingQueue<Notification> recvqueue; /** * This method is invoked by the constructor. Because it is a * part of the starting procedure of the object that must be on * any constructor of this class, it is probably best to keep as * a separate method. As we have a single constructor currently, * it is not strictly necessary to have it separate. * * @param self QuorumPeer that created this object * @param manager Connection manager */ private void starter(QuorumPeer self, QuorumCnxManager manager) { this.self = self; proposedLeader = -1; proposedZxid = -1;
sendqueue = new LinkedBlockingQueue<ToSend>(); recvqueue = new LinkedBlockingQueue<Notification>(); this.messenger = new Messenger(manager); }
Messenger(QuorumCnxManager manager) {
this.ws = new WorkerSender(manager);
this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]"); this.wsThread.setDaemon(true);
LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid)); sendNotifications();
/* * Loop inwhich we exchange notifications until we find a leader */
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ /* * Remove next notification from queue, times out after 2 times * the termination time */ Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
/* * Sends more notifications if haven't received enough. * Otherwise processes new notification. */ if(n == null){ if(manager.haveDelivered()){ //如果全部发送完了,再次发送一次投票信息 sendNotifications(); } else { //没发送完,可能出现故障,重新连接每一台集群中的机器 manager.connectAll(); } /* * Exponential backoff */ int tmpTimeOut = notTimeout*2; notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval); LOG.info("Notification time out: " + notTimeout); } else if (validVoter(n.sid) && validVoter(n.leader)) { /* * Only proceed if the vote comes from a replica in the current or next * voting view for a replica in the current or next voting view. */ switch (n.state) { case LOOKING: // If notification > current, replace and send messages out if (n.electionEpoch > logicalclock.get()) { logicalclock.set(n.electionEpoch); recvset.clear(); if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); } else if (n.electionEpoch < logicalclock.get()) { if(LOG.isDebugEnabled()){ LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch) + ", logicalclock=0x" + Long.toHexString(logicalclock.get())); } break; } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } if(LOG.isDebugEnabled()){ LOG.debug("Adding vote: from=" + n.sid + ", proposed leader=" + n.leader + ", proposed zxid=0x" + Long.toHexString(n.zxid) + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch)); } // don't care about the version if it's in LOOKING state recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) { // Verify if there is any change in the proposed leader while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ recvqueue.put(n); break; } } /* * This predicate is true once we don't read any new * relevant message from the reception queue */ if (n == null) { self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); leaveInstance(endVote); return endVote; } } break; ...... } } else { if (!validVoter(n.leader)) { LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid); } if (!validVoter(n.sid)) { LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid); } } } return null; } finally { try { if(self.jmxLeaderElectionBean != null){ MBeanRegistry.getInstance().unregister( self.jmxLeaderElectionBean); } } catch (Exception e) { LOG.warn("Failed to unregister with JMX", e); } self.jmxLeaderElectionBean = null; LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount()); } }
if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ recvqueue.put(n); break; } }
/* * This predicate is true once we don't read any new * relevant message from the reception queue */ if (n == null) { self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); leaveInstance(endVote); return endVote; } } protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) { SyncedLearnerTracker voteSet = new SyncedLearnerTracker(); voteSet.addQuorumVerifier(self.getQuorumVerifier()); if (self.getLastSeenQuorumVerifier() != null && self.getLastSeenQuorumVerifier().getVersion() > self .getQuorumVerifier().getVersion()) { voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier()); } /* * First make the views consistent. Sometimes peers will have different * zxids for a server depending on timing. */ for (Map.Entry<Long, Vote> entry : votes.entrySet()) { if (vote.equals(entry.getValue())) { voteSet.addAck(entry.getKey()); } } return voteSet.hasAllQuorums(); }
SyncedLearnerTracker中投票结果的判定是这样实现的
1 2 3 4 5 6 7
public boolean hasAllQuorums() { for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) { if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())) returnfalse; } returntrue; }
void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode) throws IOException, ConfigException { quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode); setupMyId(); setupClientPort(); setupPeerType(); checkValidity(); } public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings, boolean configBackwardCompatibilityMode) throws IOException, ConfigException { boolean isHierarchical = false; for (Entry<Object, Object> entry : dynamicConfigProp.entrySet()) { String key = entry.getKey().toString().trim(); if (key.startsWith("group") || key.startsWith("weight")) { isHierarchical = true; } elseif (!configBackwardCompatibilityMode && !key.startsWith("server.") && !key.equals("version")){ LOG.info(dynamicConfigProp.toString()); throw new ConfigException("Unrecognised parameter: " + key); } } QuorumVerifier qv = createQuorumVerifier(dynamicConfigProp, isHierarchical); int numParticipators = qv.getVotingMembers().size(); int numObservers = qv.getObservingMembers().size(); if (numParticipators == 0) { if (!standaloneEnabled) { throw new IllegalArgumentException("standaloneEnabled = false then " + "number of participants should be >0"); } if (numObservers > 0) { throw new IllegalArgumentException("Observers w/o participants is an invalid configuration"); } } elseif (numParticipators == 1 && standaloneEnabled) { // HBase currently adds a single server line to the config, for // b/w compatibility reasons we need to keep this here. If standaloneEnabled // is true, the QuorumPeerMain script will create a standalone server instead // of a quorum configuration LOG.error("Invalid configuration, only one server specified (ignoring)"); if (numObservers > 0) { throw new IllegalArgumentException("Observers w/o quorum is an invalid configuration"); } } else { if (warnings) { if (numParticipators <= 2) { LOG.warn("No server failure will be tolerated. " + "You need at least 3 servers."); } elseif (numParticipators % 2 == 0) { LOG.warn("Non-optimial configuration, consider an odd number of servers."); } } /* * If using FLE, then every server requires a separate election * port. */ if (eAlg != 0) { for (QuorumServer s : qv.getVotingMembers().values()) { if (s.electionAddr == null) throw new IllegalArgumentException( "Missing election port for server: " + s.id); } } } return qv; } private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException{ if(isHierarchical){ return new QuorumHierarchical(dynamicConfigProp); } else { /* * The default QuorumVerifier is QuorumMaj */ //LOG.info("Defaulting to majority quorums"); return new QuorumMaj(dynamicConfigProp); } }
SecurityManager sm = System.getSecurityManager(); if (sm != null) { Permission perm = new MBeanServerPermission("createMBeanServer"); sm.checkPermission(perm); }
if (platformMBeanServer == null) { platformMBeanServer = MBeanServerFactory.createMBeanServer(); for (PlatformComponent pc : PlatformComponent.values()) { List<? extends PlatformManagedObject> list = pc.getMXBeans(pc.getMXBeanInterface()); for (PlatformManagedObject o : list) { // Each PlatformComponent represents one management // interface. Some MXBean may extend another one. // The MXBean instances for one platform component // (returned by pc.getMXBeans()) might be also // the MXBean instances for another platform component. // e.g. com.sun.management.GarbageCollectorMXBean // // So need to check if an MXBean instance is registered // before registering into the platform MBeanServer if (!platformMBeanServer.isRegistered(o.getObjectName())) { addMXBean(platformMBeanServer, o); } } } HashMap<ObjectName, DynamicMBean> dynmbeans = ManagementFactoryHelper.getPlatformDynamicMBeans(); for (Map.Entry<ObjectName, DynamicMBean> e : dynmbeans.entrySet()) { addDynamicMBean(platformMBeanServer, e.getValue(), e.getKey()); } for (final PlatformManagedObject o : ExtendedPlatformComponent.getMXBeans()) { if (!platformMBeanServer.isRegistered(o.getObjectName())) { addMXBean(platformMBeanServer, o); } } } return platformMBeanServer;
/** * Class loading system of the Java virtual machine. */ CLASS_LOADING( "java.lang.management.ClassLoadingMXBean", "java.lang", "ClassLoading", defaultKeyProperties(), true, // singleton new MXBeanFetcher<ClassLoadingMXBean>() { public List<ClassLoadingMXBean> getMXBeans() { return Collections.singletonList(ManagementFactoryHelper.getClassLoadingMXBean()); } }),
/** * Compilation system of the Java virtual machine. */ COMPILATION( "java.lang.management.CompilationMXBean", "java.lang", "Compilation", defaultKeyProperties(), true, // singleton new MXBeanFetcher<CompilationMXBean>() { public List<CompilationMXBean> getMXBeans() { CompilationMXBean m = ManagementFactoryHelper.getCompilationMXBean(); if (m == null) { return Collections.emptyList(); } else { return Collections.singletonList(m); } } }), ..... }
public static MBeanServer createMBeanServer() { return createMBeanServer(null); } public static MBeanServer createMBeanServer(String domain) { checkPermission("createMBeanServer");
final MBeanServer mBeanServer = newMBeanServer(domain); addMBeanServer(mBeanServer); return mBeanServer; } public static MBeanServer newMBeanServer(String domain) { checkPermission("newMBeanServer");
// Get the builder. Creates a new one if necessary. // final MBeanServerBuilder mbsBuilder = getNewMBeanServerBuilder(); // Returned value cannot be null. NullPointerException if violated.
synchronized(mbsBuilder) { final MBeanServerDelegate delegate = mbsBuilder.newMBeanServerDelegate(); if (delegate == null) { final String msg = "MBeanServerBuilder.newMBeanServerDelegate() " + "returned null"; throw new JMRuntimeException(msg); } final MBeanServer mbeanServer = mbsBuilder.newMBeanServer(domain,null,delegate); if (mbeanServer == null) { final String msg = "MBeanServerBuilder.newMBeanServer() returned null"; throw new JMRuntimeException(msg); } return mbeanServer; } }
public MBeanServer newMBeanServer(String defaultDomain, MBeanServer outer, MBeanServerDelegate delegate) { // By default, MBeanServerInterceptors are disabled. // Use com.sun.jmx.mbeanserver.MBeanServerBuilder to obtain // MBeanServers on which MBeanServerInterceptors are enabled. return JmxMBeanServer.newMBeanServer(defaultDomain,outer,delegate, false); }
//in JmxMBeanServer public static MBeanServer newMBeanServer(String defaultDomain, MBeanServer outer, MBeanServerDelegate delegate, boolean interceptors) { // Determine whether to use fair locking for the repository. // Default is true. final boolean fairLock = DEFAULT_FAIR_LOCK_POLICY;
checkNewMBeanServerPermission();
// This constructor happens to disregard the value of the interceptors // flag - that is, it always uses the default value - false. // This is admitedly a bug, but we chose not to fix it for now // since we would rather not have anybody depending on the Sun private // interceptor APIs - which is most probably going to be removed and // replaced by a public (javax) feature in the future. // return new JmxMBeanServer(defaultDomain,outer,delegate,null, interceptors,fairLock); }
if (instantiator == null) { final ModifiableClassLoaderRepository clr = new ClassLoaderRepositorySupport(); instantiator = new MBeanInstantiator(clr); }
final MBeanInstantiator fInstantiator = instantiator; this.secureClr = new SecureClassLoaderRepository(AccessController.doPrivileged(new PrivilegedAction<ClassLoaderRepository>() { @Override public ClassLoaderRepository run() { return fInstantiator.getClassLoaderRepository(); } }) ); if (delegate == null) delegate = new MBeanServerDelegateImpl(); if (outer == null) outer = this;
if (object == null) { final RuntimeException wrapped = new IllegalArgumentException("Cannot add null object"); throw new RuntimeOperationsException(wrapped, "Exception occurred trying to register the MBean"); }
// preRegister returned successfully, so from this point on we // must call postRegister(false) if there is any problem. boolean registered = false; boolean registerFailed = false; ResourceContext context = null;
try { if (mbean instanceof DynamicMBean2) { try { ((DynamicMBean2) mbean).preRegister2(server, logicalName); registerFailed = true; // until we succeed } catch (Exception e) { if (e instanceof RuntimeException) throw (RuntimeException) e; if (e instanceof InstanceAlreadyExistsException) throw (InstanceAlreadyExistsException) e; throw new RuntimeException(e); } }
if (logicalName != name && logicalName != null) { logicalName = ObjectName.getInstance(nonDefaultDomain(logicalName)); }
if (logicalName == null) { final RuntimeException wrapped = new IllegalArgumentException("No object name specified"); throw new RuntimeOperationsException(wrapped, "Exception occurred trying to register the MBean"); }
final Object resource = getResource(mbean);
// Register the MBean with the repository. // Returns the resource context that was used. // The returned context does nothing for regular MBeans. // For ClassLoader MBeans the context makes it possible to register these // objects with the appropriate framework artifacts, such as // the CLR, from within the repository lock. // In case of success, we also need to call context.done() at the // end of this method. // context = registerWithRepository(resource, mbean, logicalName);
// Extract the domain name. String dom = name.getDomain().intern(); boolean to_default_domain = false;
// Set domain to default if domain is empty and not already set if (dom.length() == 0) name = Util.newObjectName(domain + name.toString());
// Do we have default domain ? if (dom == domain) { // ES: OK (dom & domain are interned) to_default_domain = true; dom = domain; } else { to_default_domain = false; }
// Validate name for an object if (name.isPattern()) { throw new RuntimeOperationsException( new IllegalArgumentException("Repository: cannot add mbean for " + "pattern name " + name.toString())); }
lock.writeLock().lock(); try { // Domain cannot be JMImplementation if entry does not exist if ( !to_default_domain && dom.equals("JMImplementation") && domainTb.containsKey("JMImplementation")) { throw new RuntimeOperationsException( new IllegalArgumentException( "Repository: domain name cannot be JMImplementation")); }
// If domain does not already exist, add it to the hash table final Map<String,NamedObject> moiTb = domainTb.get(dom); if (moiTb == null) { addNewDomMoi(object, dom, name, context); return; } else { // Add instance if not already present String cstr = name.getCanonicalKeyPropertyListString(); NamedObject elmt= moiTb.get(cstr); if (elmt != null) { throw new InstanceAlreadyExistsException(name.toString()); } else { nbElements++; addMoiToTb(object,name,cstr,moiTb,context); } }
} finally { lock.writeLock().unlock(); } } private void addMoiToTb(final DynamicMBean object, final ObjectName name, final String key, final Map<String,NamedObject> moiTb, final RegistrationContext context) { registering(context); moiTb.put(key,new NamedObject(name, object)); }
public static JMXConnectorServer newJMXConnectorServer(JMXServiceURL serviceURL, Map<String,?> environment, MBeanServer mbeanServer) throws IOException { Map<String, Object> envcopy; if (environment == null) envcopy = new HashMap<String, Object>(); else { EnvHelp.checkAttributes(environment); envcopy = new HashMap<String, Object>(environment); }
final Class<JMXConnectorServerProvider> targetInterface = JMXConnectorServerProvider.class; final ClassLoader loader = JMXConnectorFactory.resolveClassLoader(envcopy); final String protocol = serviceURL.getProtocol(); final String providerClassName = "ServerProvider";
if (url == null) throw new IllegalArgumentException("Null JMXServiceURL"); if (rmiServerImpl == null) { final String prt = url.getProtocol(); if (prt == null || !(prt.equals("rmi") || prt.equals("iiop"))) { final String msg = "Invalid protocol type: " + prt; throw new MalformedURLException(msg); } final String urlPath = url.getURLPath(); if (!urlPath.equals("") && !urlPath.equals("/") && !urlPath.startsWith("/jndi/")) { final String msg = "URL path must be empty or start with " + "/jndi/"; throw new MalformedURLException(msg); } }
public synchronized void start() throws IOException { final boolean tracing = logger.traceOn();
if (state == STARTED) { if (tracing) logger.trace("start", "already started"); return; } elseif (state == STOPPED) { if (tracing) logger.trace("start", "already stopped"); throw new IOException("The server has been stopped."); }
if (getMBeanServer() == null) throw new IllegalStateException("This connector server is not " + "attached to an MBean server");
// Check the internal access file property to see // if an MBeanServerForwarder is to be provided // if (attributes != null) { // Check if access file property is specified // String accessFile = (String) attributes.get("jmx.remote.x.access.file"); if (accessFile != null) { // Access file property specified, create an instance // of the MBeanServerFileAccessController class // MBeanServerForwarder mbsf; try { mbsf = new MBeanServerFileAccessController(accessFile); } catch (IOException e) { throw EnvHelp.initCause( new IllegalArgumentException(e.getMessage()), e); } // Set the MBeanServerForwarder // setMBeanServerForwarder(mbsf); } }
try { if (tracing) logger.trace("start", "setting default class loader"); defaultClassLoader = EnvHelp.resolveServerClassLoader( attributes, getMBeanServer()); } catch (InstanceNotFoundException infc) { IllegalArgumentException x = new IllegalArgumentException("ClassLoader not found: "+infc); throw EnvHelp.initCause(x,infc); }
if (tracing) logger.trace("start", "setting RMIServer object"); final RMIServerImpl rmiServer;
if (tracing) logger.trace("start", JNDI_REBIND_ATTRIBUTE + "=" + rebind);
try { if (tracing) logger.trace("start", "binding to " + jndiUrl);
final Hashtable<?, ?> usemap = EnvHelp.mapToHashtable(attributes);
bind(jndiUrl, usemap, objref, rebind);
boundJndiUrl = jndiUrl; } catch (NamingException e) { // fit e in the nested exception if we are on 1.4 throw newIOException("Cannot bind to URL ["+jndiUrl+"]: " + e, e); } } else { // if jndiURL is null, we must encode the stub into the URL. if (tracing) logger.trace("start", "Encoding URL");
encodeStubInAddress(objref, attributes);
if (tracing) logger.trace("start", "Encoded URL: " + this.address); } } catch (Exception e) { try { rmiServer.close(); } catch (Exception x) { // OK: we are already throwing another exception } if (e instanceof RuntimeException) throw (RuntimeException) e; elseif (e instanceof IOException) throw (IOException) e; else throw newIOException("Got unexpected exception while " + "starting the connector server: " + e, e); }
if (tracing) { logger.trace("start", "Connector Server Address = " + address); logger.trace("start", "started."); } }
前面讲过rmiServerImpl成员是空的,所以这时会调用newServer方法进行创建
1 2 3 4 5 6 7 8 9 10 11 12
RMIServerImpl newServer() throws IOException { final boolean iiop = isIiopURL(address,true); final int port; if (address == null) port = 0; else port = address.getPort(); if (iiop) return newIIOPServer(attributes); else return newJRMPServer(attributes, port); }
void bind(String jndiUrl, Hashtable<?, ?> attributes, RMIServer rmiServer, boolean rebind) throws NamingException, MalformedURLException { // if jndiURL is not null, we nust bind the stub to a // directory. InitialContext ctx = new InitialContext(attributes);
if (rebind) ctx.rebind(jndiUrl, rmiServer); else ctx.bind(jndiUrl, rmiServer); ctx.close(); }
public static Remote createProxy(Class<?> implClass, RemoteRef clientRef, boolean forceStubUse) throws StubNotFoundException { ..... final Class<?>[] interfaces = getRemoteInterfaces(implClass); ..... } private static Class<?>[] getRemoteInterfaces(Class<?> remoteClass) { ArrayList<Class<?>> list = new ArrayList<>(); getRemoteInterfaces(list, remoteClass); return list.toArray(new Class<?>[list.size()]); }
private static void getRemoteInterfaces(ArrayList<Class<?>> list, Class<?> cl) { ..... for (int i = 0; i < interfaces.length; i++) { Class<?> intf = interfaces[i]; if (Remote.class.isAssignableFrom(intf)) { if (!(list.contains(intf))) { Method[] methods = intf.getMethods(); for (int j = 0; j < methods.length; j++) { checkMethod(methods[j]); } list.add(intf); } } } }
private static void checkMethod(Method m) { Class<?>[] ex = m.getExceptionTypes(); for (int i = 0; i < ex.length; i++) { if (ex[i].isAssignableFrom(RemoteException.class)) return; } throw new IllegalArgumentException( "illegal remote method encountered: " + m); }
要发布的服务接口的实现类
1 2 3 4 5 6 7 8 9 10 11 12 13
public class HelloServiceImpl extends UnicastRemoteObject implements HelloService { //需要继承UnicastRemoteObject,因为Remote只是接口,UnicastRemoteObject是Remote的实现类
@Override public String sayHello() throws RemoteException{ return"hello"; }
}
服务发布端与客户消费端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
public class Server { final static String host = "127.0.0.1"; final static int port = 8080; public static void main(String[] args) throws RemoteException, MalformedURLException { HelloService helloService = new HelloServiceImpl(); LocateRegistry.createRegistry(port); Naming.rebind("rmi://"+host+":"+port+"/hello", helloService);//不写port默认是1099 System.out.println("服务启动..."); }
public static TCPEndpoint getLocalEndpoint(int port, RMIClientSocketFactory csf, RMIServerSocketFactory ssf) { /* * Find mapping for an endpoint key to the list of local unique * endpoints for this client/server socket factory pair (perhaps * null) for the specific port. */ TCPEndpoint ep = null;
if (epList == null) { /* * Create new endpoint list. */ ep = new TCPEndpoint(localHost, port, csf, ssf); epList = new LinkedList<TCPEndpoint>(); epList.add(ep); ep.listenPort = port; ep.transport = new TCPTransport(epList); localEndpoints.put(endpointKey, epList);
if (TCPTransport.tcpLog.isLoggable(Log.BRIEF)) { TCPTransport.tcpLog.log(Log.BRIEF, "created local endpoint for socket factory " + ssf + " on port " + port); } } else { synchronized (epList) { ep = epList.getLast(); String lastHost = ep.host; int lastPort = ep.port; TCPTransport lastTransport = ep.transport; // assert (localHost == null ^ lastHost != null) if (localHost != null && !localHost.equals(lastHost)) { /* * Hostname has been updated; add updated endpoint * to list. */ if (lastPort != 0) { /* * Remove outdated endpoints only if the * port has already been set on those endpoints. */ epList.clear(); } ep = new TCPEndpoint(localHost, lastPort, csf, ssf); ep.listenPort = port; ep.transport = lastTransport; epList.add(ep); } } } }
@Configuration @ConditionalOnClass(RestTemplate.class) @ConditionalOnBean(LoadBalancerClient.class) @EnableConfigurationProperties(LoadBalancerRetryProperties.class) public class LoadBalancerAutoConfiguration {
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) { // for backwards compatibility this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer)); }
@Override public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution)); }
@Configuration @Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class) @RibbonClients @AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration") @AutoConfigureBefore({ LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class }) @EnableConfigurationProperties({ RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class }) public class RibbonAutoConfiguration {
@Bean @ConditionalOnMissingBean(LoadBalancerClient.class) public LoadBalancerClient loadBalancerClient() { return new RibbonLoadBalancerClient(springClientFactory()); }
/** * {@link AllNestedConditions} that checks that either multiple classes are present. */ static class RibbonClassesConditions extends AllNestedConditions {
@Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)的意思是该要求满足该类中所有condition相关注解(程序加载了对应的四个类) public abstract class AllNestedConditions extends AbstractNestedCondition public abstract class AbstractNestedCondition extends SpringBootCondition implements ConfigurationCondition
public abstract class SpringBootCondition implements Condition {
private final Log logger = LogFactory.getLog(getClass());
@Override public final boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { //成立判断方法 String classOrMethodName = getClassOrMethodName(metadata); try { ConditionOutcome outcome = getMatchOutcome(context, metadata);//调用getMatchOutcome logOutcome(classOrMethodName, outcome); recordEvaluation(context, classOrMethodName, outcome); return outcome.isMatch();//注意这里,outcome的match属性作为结果 } catch (NoClassDefFoundError ex) { throw new IllegalStateException( "Could not evaluate condition on " + classOrMethodName + " due to " + ex.getMessage() + " not " + "found. Make sure your own configuration does not rely on " + "that class. This can also happen if you are " + "@ComponentScanning a springframework package (e.g. if you " + "put a @ComponentScan in the default package by mistake)", ex); } catch (RuntimeException ex) { throw new IllegalStateException( "Error processing condition on " + getName(metadata), ex); } } ...... }
public abstract class AbstractNestedCondition extends SpringBootCondition implements ConfigurationCondition {
@Override public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) { String className = getClass().getName(); MemberConditions memberConditions = new MemberConditions(context, className); MemberMatchOutcomes memberOutcomes = new MemberMatchOutcomes(memberConditions); return getFinalMatchOutcome(memberOutcomes); } protected static class MemberMatchOutcomes {
private final List<ConditionOutcome> all;
private final List<ConditionOutcome> matches;
private final List<ConditionOutcome> nonMatches;
public MemberMatchOutcomes(MemberConditions memberConditions) { this.all = Collections.unmodifiableList(memberConditions.getMatchOutcomes()); List<ConditionOutcome> matches = new ArrayList<>(); List<ConditionOutcome> nonMatches = new ArrayList<>(); for (ConditionOutcome outcome : this.all) { //是否满足加到不同属性集合 (outcome.isMatch() ? matches : nonMatches).add(outcome); } this.matches = Collections.unmodifiableList(matches); this.nonMatches = Collections.unmodifiableList(nonMatches); } ......
} } public abstract class AllNestedConditions extends AbstractNestedCondition {
@Override protected ConditionOutcome getFinalMatchOutcome(MemberMatchOutcomes memberOutcomes) { boolean match = hasSameSize(memberOutcomes.getMatches(), memberOutcomes.getAll()); //match就是判断结果 List<ConditionMessage> messages = new ArrayList<>(); messages.add(ConditionMessage.forCondition("AllNestedConditions") .because(memberOutcomes.getMatches().size() + " matched " + memberOutcomes.getNonMatches().size() + " did not")); for (ConditionOutcome outcome : memberOutcomes.getAll()) { messages.add(outcome.getConditionMessage()); } return new ConditionOutcome(match, ConditionMessage.of(messages)); }
public class RibbonLoadBalancerClient implements LoadBalancerClient {
private SpringClientFactory clientFactory;
public RibbonLoadBalancerClient(SpringClientFactory clientFactory) { this.clientFactory = clientFactory; }
@Override public URI reconstructURI(ServiceInstance instance, URI original) { Assert.notNull(instance, "instance can not be null"); String serviceId = instance.getServiceId(); RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId);
URI uri; Server server; if (instance instanceof RibbonServer) { RibbonServer ribbonServer = (RibbonServer) instance; server = ribbonServer.getServer(); uri = updateToSecureConnectionIfNeeded(original, ribbonServer); } else { server = new Server(instance.getScheme(), instance.getHost(), instance.getPort()); IClientConfig clientConfig = clientFactory.getClientConfig(serviceId); ServerIntrospector serverIntrospector = serverIntrospector(serviceId); uri = updateToSecureConnectionIfNeeded(original, clientConfig, serverIntrospector, server); } return context.reconstructURIWithServer(server, uri); }
@Override public ServiceInstance choose(String serviceId) { return choose(serviceId, null); }
/** * New: Select a server using a 'key'. * @param serviceId of the service to choose an instance for * @param hint to specify the service instance * @return the selected {@link ServiceInstance} */ public ServiceInstance choose(String serviceId, Object hint) { Server server = getServer(getLoadBalancer(serviceId), hint); if (server == null) { return null; } return new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); }
@Override public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { return execute(serviceId, request, null); }
/** * New: Execute a request by selecting server using a 'key'. The hint will have to be * the last parameter to not mess with the `execute(serviceId, ServiceInstance, * request)` method. This somewhat breaks the fluent coding style when using a lambda * to define the LoadBalancerRequest. * @param <T> returned request execution result type * @param serviceId id of the service to execute the request to * @param request to be executed * @param hint used to choose appropriate {@link Server} instance * @return request execution result * @throws IOException executing the request may result in an {@link IOException} */ public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException { ILoadBalancer loadBalancer = getLoadBalancer(serviceId); Server server = getServer(loadBalancer, hint); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server));
@Override public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { Server server = null; if (serviceInstance instanceof RibbonServer) { server = ((RibbonServer) serviceInstance).getServer(); } if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); }
void getChars(char dst[], int dstBegin) { System.arraycopy(value, 0, dst, dstBegin, value.length); } public String concat(String str) { int otherLen = str.length(); if (otherLen == 0) { return this; } int len = value.length; char buf[] = Arrays.copyOf(value, len + otherLen); str.getChars(buf, len); return new String(buf, true); } /* * Package private constructor which shares value array for speed. * this constructor is always expected to be called with share==true. * a separate constructor is needed because we already have a public * String(char[]) constructor that makes a copy of the given char[]. */ String(char[] value, boolean share) { // assert share : "unshared not supported"; this.value = value; }