/** * self member obj. * 本机 member */ privatevolatile Member self;
/** * here is always the node information of the "UP" state. * 节点都是UP状态列表 */ privatevolatile Set<String> memberAddressInfos = new ConcurrentHashSet<>();
/** * Broadcast this node element information task. * 广播此节点元素信息任务。 */ privatefinal MemberInfoReportTask infoReportTask = new MemberInfoReportTask();
publicServerMemberManager(ServletContext servletContext)throws Exception { this.serverList = new ConcurrentSkipListMap<>(); EnvUtil.setContextPath(servletContext.getContextPath());
if (isContainSelfIp) { isInIpList = true; } else { isInIpList = false; members.add(this.self); Loggers.CLUSTER.warn("[serverlist] self ip {} not in serverlist {}", self, members); }
// If the old and new member lists differ in size, the cluster membership must have
// changed. If the sizes are equal, compare the entries: a member whose address is
// not in the current server list also counts as a change. When a change is
// detected, all recipients need to be notified with a member-change event.
boolean hasChange = members.size() != serverList.size(); ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>(); Set<String> tmpAddressInfo = new ConcurrentHashSet<>(); for (Member member : members) { final String address = member.getAddress();
if (!serverList.containsKey(address)) { hasChange = true; }
// Ensure that the node is created only once tmpMap.put(address, member); if (NodeState.UP.equals(member.getState())) { tmpAddressInfo.add(address); } }
Loggers.CLUSTER.warn("[serverlist] updated to : {}", finalMembers);
// Persist the current cluster node information to cluster.conf // <important> need to put the event publication into a synchronized block to ensure // that the event publication is sequential // 将当前集群节点信息持久化为cluster.conf <important>需要将事件发布放入同步块中,以确保事件发布是连续的 //如果节点发生了变化 if (hasChange) { //同步到文件 MemberUtil.syncToFile(finalMembers); Event event = MembersChangeEvent.builder().members(finalMembers).build(); //发送事件 NotifyCenter.publishEvent(event); }
/** * List of service nodes, you must ensure that the order of healthyList is the same for all nodes. * 服务节点列表,必须确保所有节点的healthyList顺序相同。 */ privatevolatile List<String> healthyList = new ArrayList<>();
@Override publicvoidonEvent(MembersChangeEvent event){ // Here, the node list must be sorted to ensure that all nacos-server's // node list is in the same order //过滤出是 NodeState.UP 或 NodeState.SUSPICIOUS 状态的节点,原因在 ServerMemberManager 定时任务简介 List<String> list = MemberUtil.simpleMembers(MemberUtil.selectTargetMembers(event.getMembers(), member -> NodeState.UP.equals(member.getState()) || NodeState.SUSPICIOUS.equals(member.getState()))); //必须排序,确保所有服务节点列表顺序都是一样,原因后面 DistroFilter 讲解 Collections.sort(list); Collection<String> old = healthyList; healthyList = Collections.unmodifiableList(list); Loggers.SRV_LOG.info("[NACOS-DISTRO] healthy server list changed, old: {}, new: {}", old, healthyList); }
Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());
final String url = HttpUtils .buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT, "/cluster/report");
try { asyncRestTemplate .post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version), Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() { @Override publicvoidonReceive(RestResult<String> result){ if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value() || result.getCode() == HttpStatus.NOT_FOUND.value()) { //警告解释了 Loggers.CLUSTER .warn("{} version is too low, it is recommended to upgrade the version : {}", target, VersionUtils.version); return; } if (result.ok()) { //成功逻辑 MemberUtil.onSuccess(ServerMemberManager.this, target); } else { Loggers.CLUSTER .warn("failed to report new info to target node : {}, result : {}", target.getAddress(), result); //失败 MemberUtil.onFail(ServerMemberManager.this, target); } }
@Override publicvoidonError(Throwable throwable){ Loggers.CLUSTER .error("failed to report new info to target node : {}, error : {}", target.getAddress(), ExceptionUtil.getAllExceptionMsg(throwable)); //失败 MemberUtil.onFail(ServerMemberManager.this, target, throwable); }
@Override publicvoidonCancel(){
} }); } catch (Throwable ex) { Loggers.CLUSTER.error("failed to report new info to target node : {}, error : {}", target.getAddress(), ExceptionUtil.getAllExceptionMsg(ex)); } }
publicclassMemberUtil{ publicstaticvoidonSuccess(final ServerMemberManager manager, final Member member){ //请求成功,证明被请求Server能够正常通信 final NodeState old = member.getState(); //设置被请求Server状态 NodeState.UP manager.getMemberAddressInfos().add(member.getAddress()); member.setState(NodeState.UP); member.setFailAccessCnt(0); if (!Objects.equals(old, member.getState())) { //如果状态不一致,发送事件 manager.notifyMemberChange(); } }
publicstaticvoidonFail(final ServerMemberManager manager, final Member member, Throwable ex){ //请求失败,被请求节点不能通信 //移除被请求Server manager.getMemberAddressInfos().remove(member.getAddress()); final NodeState old = member.getState(); //设置状态 NodeState.SUSPICIOUS member.setState(NodeState.SUSPICIOUS); //失败次数加1 member.setFailAccessCnt(member.getFailAccessCnt() + 1); int maxFailAccessCnt = EnvUtil.getProperty("nacos.core.member.fail-access-cnt", Integer.class, 3);
// If the number of consecutive failures to access the target node reaches // a maximum, or the link request is rejected, the state is directly down //失败次数超过 maxFailAccessCnt 设置状态 NodeState.DOWN if (member.getFailAccessCnt() > maxFailAccessCnt || StringUtils .containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) { member.setState(NodeState.DOWN); } if (!Objects.equals(old, member.getState())) { //状态改变,发送事件 manager.notifyMemberChange(); } } }
Loggers.SRV_LOG.info("open empty service auto clean job, initialDelay : {} ms, period : {} ms", cleanEmptyServiceDelay, cleanEmptyServicePeriod);
// delay 60s, period 20s;
// This task is not recommended to be performed frequently in order to avoid // the possibility that the service cache information may just be deleted // and then created due to the heartbeat mechanism
try { Loggers.SRV_LOG.info("listen for service meta change"); consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this); } catch (NacosException e) { Loggers.SRV_LOG.error("listen for service meta change failed!"); } } }