nacos 版本 1.4.1
DistroProtocol 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 /* Excerpt of DistroProtocol. startDistroTask(): in standalone mode it only sets isInitialized to true and returns; otherwise it starts the verify task and then the load task. */ @Component public class DistroProtocol { private void startDistroTask () { if (EnvUtil.getStandaloneMode()) { isInitialized = true ; return ; } startVerifyTask(); startLoadTask(); } /* Hands a new DistroVerifyTask to GlobalExecutor.schedulePartitionDataTimedSync together with the interval read from distroConfig.getVerifyIntervalMillis(). */ private void startVerifyTask () { GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTask(memberManager, distroComponentHolder), distroConfig.getVerifyIntervalMillis()); } }
定时任务 每5秒循环执行DistroVerifyTask
每5秒向其他 Server 广播自己负责的 Datum checksum;其他 Server 如果发现收到的 checksum 与本地保存的不一致,就会向源 Server 请求最新数据,获取成功后更新本地 Datum 和本地实例表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 /* Runnable that sends this node's verify data to every other cluster member, once per data storage type returned by distroComponentHolder.getDataStorageTypes(). */ public class DistroVerifyTask implements Runnable { private final ServerMemberManager serverMemberManager; private final DistroComponentHolder distroComponentHolder; public DistroVerifyTask (ServerMemberManager serverMemberManager, DistroComponentHolder distroComponentHolder) { this .serverMemberManager = serverMemberManager; this .distroComponentHolder = distroComponentHolder; } /* Any exception raised while verifying is caught and only logged, so it is not propagated to the caller of run(). */ @Override public void run () { try { List<Member> targetServer = serverMemberManager.allMembersWithoutSelf(); if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("server list is: {}" , targetServer); } for (String each : distroComponentHolder.getDataStorageTypes()) { verifyForDataStorage(each, targetServer); } } catch (Exception e) { Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed." , e); } } /* Does nothing when the storage returns null verify data. Otherwise marks the data as VERIFY and sends it to each member; a failure for one member is logged and the loop moves on to the next member. */ private void verifyForDataStorage (String type, List<Member> targetServer) { DistroData distroData = distroComponentHolder.findDataStorage(type).getVerifyData(); if (null == distroData) { return ; } distroData.setType(DataOperation.VERIFY); for (Member member : targetServer) { try { distroComponentHolder.findTransportAgent(type).syncVerifyData(distroData, member.getAddress()); } catch (Exception e) { Loggers.DISTRO.error(String .format("[DISTRO-FAILED] verify data for type %s to %s failed." 
, type, member.getAddress()), e); } } } } /* getVerifyData(): builds a key -> checksum map from the data store, skipping keys whose service this node is not responsible for (distroMapper.responsible) and keys whose datum is null. Returns null when the map ends up empty; otherwise returns the serialized map under a DistroKey with resource key "checksum". */ public class DistroDataStorageImpl implements DistroDataStorage { @Override public DistroData getVerifyData () { Map<String, String> keyChecksums = new HashMap<>(64 ); for (String key : dataStore.keys()) { if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) { continue ; } Datum datum = dataStore.get(key); if (datum == null ) { continue ; } keyChecksums.put(key, datum.value.getChecksum()); } if (keyChecksums.isEmpty()) { return null ; } DistroKey distroKey = new DistroKey("checksum" , KeyBuilder.INSTANCE_LIST_KEY_PREFIX); return new DistroData(distroKey, ApplicationUtils.getBean(Serializer.class).serialize(keyChecksums)); } } /* syncVerifyData(): returns true without sending anything when targetServer is not a known member; otherwise passes the content to NamingProxy.syncCheckSums and returns true. */ public class DistroHttpAgent implements DistroTransportAgent { @Override public boolean syncVerifyData (DistroData verifyData, String targetServer) { if (!memberManager.hasMember(targetServer)) { return true ; } NamingProxy.syncCheckSums(verifyData.getContent(), targetServer); return true ; } }
DistroConsistencyServiceImpl 默认情况下,延时1秒后把数据广播同步到其他 Server;如果发送给某个 Server 失败,则会重试,重试的延时为5秒
key 代表 Service 的 key:在这1秒内,无论这个 Service 下的实例发生了多少次变化,相同 key 的任务都会被合并,等到1秒延时结束后才执行同步,而不是实例一发生改变就立即同步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 /* put(): applies the change locally through onPut, then asks distroProtocol to sync the key as a CHANGE with a delay of half of globalConfig.getTaskDispatchPeriod(). */ @DependsOn("ProtocolManager") @org .springframework.stereotype.Service("distroConsistencyService" )public class DistroConsistencyServiceImpl implements EphemeralConsistencyService , DistroDataProcessor { @Override public void put (String key, Record value) throws NacosException { onPut(key, value); distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2 ); } } /* sync(): for every member except this node, builds a DistroKey that also carries the member's address and adds a DistroDelayTask under that key to the delay task engine. */ @Component public class DistroProtocol { public void sync (DistroKey distroKey, DataOperation action, long delay) { for (Member each : memberManager.allMembersWithoutSelf()) { DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress()); DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay); distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask); if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}" , distroKey, each.getAddress()); } } } } /* addTask(): while holding the lock, lets the new task merge an already queued task with the same key, then stores the new task under that key (replacing the old one). */ public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine <AbstractDelayTask > { @Override public void addTask (Object key, AbstractDelayTask newTask) { lock.lock(); try { AbstractDelayTask existTask = tasks.get(key); if (null != existTask) { newTask.merge(existTask); } tasks.put(key, newTask); } finally { 
lock.unlock(); } } /* processTasks(): for each known key, takes the task out if removeTask returns it, looks up its processor (logging and discarding the task when none exists), and re-queues the task through retryFailedTask when process() returns false or throws. */ protected void processTasks () { Collection<Object> keys = getAllTaskKeys(); for (Object taskKey : keys) { AbstractDelayTask task = removeTask(taskKey); if (null == task) { continue ; } NacosTaskProcessor processor = getProcessor(taskKey); if (null == processor) { getEngineLog().error("processor not found for task, so discarded. " + task); continue ; } try { if (!processor.process(task)) { retryFailedTask(taskKey, task); } } catch (Throwable e) { getEngineLog().error("Nacos task execute error : " + e.toString(), e); retryFailedTask(taskKey, task); } } } /* removeTask(): removes and returns the task only when it exists and shouldProcess() is true; otherwise returns null and leaves the task in the map. */ @Override public AbstractDelayTask removeTask (Object key) { lock.lock(); try { AbstractDelayTask task = tasks.get(key); if (null != task && task.shouldProcess()) { return tasks.remove(key); } else { return null ; } } finally { lock.unlock(); } } } /* merge(): ignores arguments that are not DistroDelayTask. Note that the local variable named newTask is the task passed in (in addTask above that is the already queued one). Its action and createTime are taken over only when the actions differ and its createTime is later; its lastProcessTime is always copied. */ public class DistroDelayTask extends AbstractDelayTask { @Override public void merge (AbstractDelayTask task) { if (!(task instanceof DistroDelayTask)) { return ; } DistroDelayTask newTask = (DistroDelayTask) task; if (!action.equals(newTask.getAction()) && createTime < newTask.getCreateTime()) { action = newTask.getAction(); createTime = newTask.getCreateTime(); } setLastProcessTime(newTask.getLastProcessTime()); } } /* process(): a task that is not a DistroDelayTask is reported as handled (true). A CHANGE task is wrapped in a DistroSyncChangeTask and handed to the execute workers, returning true; any other action returns false. */ public class DistroDelayTaskProcessor implements NacosTaskProcessor { @Override public boolean process (NacosTask task) { if (!(task instanceof DistroDelayTask)) { return true ; } DistroDelayTask distroDelayTask = (DistroDelayTask) task; DistroKey distroKey = distroDelayTask.getDistroKey(); if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) { DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder); distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask); return true ; } return false ; } } /* run(): loads the data for the distro key from the storage of its resource type, marks it as CHANGE and sends it to the key's target server; handleFailedTask() is called when syncData returns false or when an exception is thrown. */ public class DistroSyncChangeTask extends AbstractDistroExecuteTask { @Override public void run () { Loggers.DISTRO.info("[DISTRO-START] {}" , toString()); try { String type 
= getDistroKey().getResourceType(); DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey()); distroData.setType(DataOperation.CHANGE); boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer()); if (!result) { handleFailedTask(); } Loggers.DISTRO.info("[DISTRO-END] {} result: {}" , toString(), result); } catch (Exception e) { Loggers.DISTRO.warn("[DISTRO] Sync data change failed." , e); handleFailedTask(); } } /* handleFailedTask(): delegates the retry to the failed-task handler registered for the resource type; when none is found it logs a warning and the task is dropped. */ private void handleFailedTask () { String type = getDistroKey().getResourceType(); DistroFailedTaskHandler failedTaskHandler = distroComponentHolder.findFailedTaskHandler(type); if (null == failedTaskHandler) { Loggers.DISTRO.warn("[DISTRO] Can't find failed task for type {}, so discarded" , type); return ; } failedTaskHandler.retry(getDistroKey(), DataOperation.CHANGE); } } /* syncData(): returns true without sending when targetServer is not a known member; otherwise returns the result of NamingProxy.syncData, addressed to the target server stored in the data's distro key. */ public class DistroHttpAgent implements DistroTransportAgent { @Override public boolean syncData (DistroData data, String targetServer) { if (!memberManager.hasMember(targetServer)) { return true ; } byte [] dataContent = data.getContent(); return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer()); } } /* retry(): casts the key to DistroHttpCombinedKey (no instanceof check is made here) and queues one new DistroDelayTask per entry of getActualResourceTypes(), each delayed by globalConfig.getSyncRetryDelay(). */ public class DistroHttpCombinedKeyTaskFailedHandler implements DistroFailedTaskHandler { @Override public void retry (DistroKey distroKey, DataOperation action) { DistroHttpCombinedKey combinedKey = (DistroHttpCombinedKey) distroKey; for (String each : combinedKey.getActualResourceTypes()) { DistroKey newKey = new DistroKey(each, KeyBuilder.INSTANCE_LIST_KEY_PREFIX, distroKey.getTargetServer()); DistroDelayTask newTask = new DistroDelayTask(newKey, action, globalConfig.getSyncRetryDelay()); distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(newKey, newTask); } } }
/v1/ns/distro/checksum 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 /* syncChecksum(): wraps the received checksum map in a DistroHttpData keyed by the source parameter, passes it to distroProtocol.onVerify and answers "ok". */ @RestController @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/distro") public class DistroController {@PutMapping("/checksum") public ResponseEntity syncChecksum (@RequestParam String source, @RequestBody Map<String, String> dataMap) { DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(source), dataMap); distroProtocol.onVerify(distroHttpData); return ResponseEntity.ok("ok" ); } } /* onReceive(): looks up the data processor for the data's resource type, returns false (after a warning) when none is registered, otherwise returns the result of processData. NOTE(review): the controller above calls onVerify, which is not part of this listing; presumably it routes to processVerifyData in the same way - confirm against the Nacos source. */ @Component public class DistroProtocol { public boolean onReceive (DistroData distroData) { String resourceType = distroData.getDistroKey().getResourceType(); DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType); if (null == dataProcessor) { Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}" , resourceType); return false ; } return dataProcessor.processData(distroData); } } /* processVerifyData(): takes the distro key's resource key as the source server, reads the deserialized checksum map from the DistroHttpData and forwards both to onReceiveChecksums; always returns true. */ @DependsOn("ProtocolManager") @org .springframework.stereotype.Service("distroConsistencyService" )public class DistroConsistencyServiceImpl implements EphemeralConsistencyService , DistroDataProcessor { @Override public boolean processVerifyData (DistroData distroData) { DistroHttpData distroHttpData = (DistroHttpData) distroData; 
String sourceServer = distroData.getDistroKey().getResourceKey(); Map<String, String> verifyData = (Map<String, String>) distroHttpData.getDeserializedContent(); onReceiveChecksums(verifyData, sourceServer); return true ; } /* onReceiveChecksums(): returns immediately when a checksum task for this server is already registered in syncChecksumTasks; the entry is removed again in the finally block. Aborts the whole call if the sender reports a key this node itself is responsible for. Keys that are missing locally, have a null value, or have a different checksum are collected for update; local keys that map to the sender but are absent from its checksum map are removed via onRemove. If there are keys to update, their data is queried from the sender and applied with processData; a failure of that query is only logged. */ public void onReceiveChecksums (Map<String, String> checksumMap, String server) { if (syncChecksumTasks.containsKey(server)) { Loggers.DISTRO.warn("sync checksum task already in process with {}" , server); return ; } syncChecksumTasks.put(server, "1" ); try { List<String> toUpdateKeys = new ArrayList<>(); List<String> toRemoveKeys = new ArrayList<>(); for (Map.Entry<String, String> entry : checksumMap.entrySet()) { if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) { Loggers.DISTRO.error("receive responsible key timestamp of " + entry.getKey() + " from " + server); return ; } if (!dataStore.contains(entry.getKey()) || dataStore.get(entry.getKey()).value == null || !dataStore .get(entry.getKey()).value.getChecksum().equals(entry.getValue())) { toUpdateKeys.add(entry.getKey()); } } for (String key : dataStore.keys()) { if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) { continue ; } if (!checksumMap.containsKey(key)) { toRemoveKeys.add(key); } } Loggers.DISTRO .info("to remove keys: {}, to update keys: {}, source: {}" , toRemoveKeys, toUpdateKeys, server); for (String key : toRemoveKeys) { onRemove(key); } if (toUpdateKeys.isEmpty()) { return ; } try { DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, server); distroKey.getActualResourceTypes().addAll(toUpdateKeys); DistroData remoteData = distroProtocol.queryFromRemote(distroKey); if (null != remoteData) { processData(remoteData.getContent()); } } catch (Exception e) { Loggers.DISTRO.error("get data from " + server + " failed!" 
, e); } } finally { syncChecksumTasks.remove(server); } } } /* queryFromRemote(): returns null (after a warning) when the key has no target server or when no transport agent exists for its resource type; otherwise returns what the agent's getData yields for the key's target server. */ @Component public class DistroProtocol { public DistroData queryFromRemote (DistroKey distroKey) { if (null == distroKey.getTargetServer()) { Loggers.DISTRO.warn("[DISTRO] Can't query data from empty server" ); return null ; } String resourceType = distroKey.getResourceType(); DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType); if (null == transportAgent) { Loggers.DISTRO.warn("[DISTRO] Can't find transport agent for key {}" , resourceType); return null ; } return transportAgent.getData(distroKey, distroKey.getTargetServer()); } } /* getData(): queries the actual keys of a DistroHttpCombinedKey, or just the single resource key for any other key, through NamingProxy.getData; any exception is rethrown wrapped in a DistroException. The request goes to key.getTargetServer(); the targetServer parameter is not read in this listing. */ public class DistroHttpAgent implements DistroTransportAgent { @Override public DistroData getData (DistroKey key, String targetServer) { try { List<String> toUpdateKeys = null ; if (key instanceof DistroHttpCombinedKey) { toUpdateKeys = ((DistroHttpCombinedKey) key).getActualResourceTypes(); } else { toUpdateKeys = new ArrayList<>(1 ); toUpdateKeys.add(key.getResourceKey()); } byte [] queriedData = NamingProxy.getData(toUpdateKeys, key.getTargetServer()); return new DistroData(key, queriedData); } catch (Exception e) { throw new DistroException(String.format("Get data from %s failed." 
, key.getTargetServer()), e); } } } /* processData(byte[]): does nothing but return true for an empty array. Otherwise it deserializes the datum map and makes two passes. First pass: stores every datum and, for keys without a listener while default instances are ephemeral, builds a Service and announces it through the first service-meta listener (returning false if that listener is null). Second pass: for keys that have listeners, notifies each listener and stores the datum again; keys without listeners are skipped with a warning, and a listener exception is logged and skips the second store for that key. */ @DependsOn("ProtocolManager") @org .springframework.stereotype.Service("distroConsistencyService" )public class DistroConsistencyServiceImpl implements EphemeralConsistencyService , DistroDataProcessor { private boolean processData (byte [] data) throws Exception { if (data.length > 0 ) { Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class); for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) { dataStore.put(entry.getKey(), entry.getValue()); if (!listeners.containsKey(entry.getKey())) { if (switchDomain.isDefaultInstanceEphemeral()) { Loggers.DISTRO.info("creating service {}" , entry.getKey()); Service service = new Service(); String serviceName = KeyBuilder.getServiceName(entry.getKey()); String namespaceId = KeyBuilder.getNamespace(entry.getKey()); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(Constants.DEFAULT_GROUP); service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek(); if (Objects.isNull(listener)) { return false ; } listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service); } } } for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) { if (!listeners.containsKey(entry.getKey())) { Loggers.DISTRO.warn("listener of {} not found." , entry.getKey()); continue ; } try { for (RecordListener listener : listeners.get(entry.getKey())) { listener.onChange(entry.getKey(), entry.getValue().value); } } catch (Exception e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}" , entry.getKey(), e); continue ; } dataStore.put(entry.getKey(), entry.getValue()); } } return true ; } }
GET /v1/ns/distro/datum 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 /* get(): parses the request body as JSON, splits its "keys" field on "," into the actual keys of a DistroHttpCombinedKey (created with an empty target server), queries distroProtocol and returns the content of the result. */ @RestController @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/distro") public class DistroController { @GetMapping("/datum") public ResponseEntity get (@RequestBody String body) throws Exception { JsonNode bodyNode = JacksonUtils.toObj(body); String keys = bodyNode.get("keys" ).asText(); String keySplitter = "," ; DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, "" ); for (String key : keys.split(keySplitter)) { distroKey.getActualResourceTypes().add(key); } DistroData distroData = distroProtocol.onQuery(distroKey); return ResponseEntity.ok(distroData.getContent()); } } /* onQuery(): returns a DistroData with an empty byte array (after a warning) when no data storage is found for the key's resource type; otherwise returns the storage's getDistroData result. */ @Component public class DistroProtocol { public DistroData onQuery (DistroKey distroKey) { String resourceType = distroKey.getResourceType(); DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(resourceType); if (null == distroDataStorage) { Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}" , resourceType); return new DistroData(distroKey, new byte [0 ]); } return distroDataStorage.getDistroData(distroKey); } } /* getDistroData(): batch-reads the actual keys of a DistroHttpCombinedKey, or reads the single resource key for any other key (the datum is put into the map even if it is null), then serializes the resulting map. */ public class DistroDataStorageImpl implements DistroDataStorage { @Override public DistroData getDistroData (DistroKey distroKey) { Map<String, Datum> result = new HashMap<>(1 ); if (distroKey instanceof DistroHttpCombinedKey) { result = dataStore.batchGet(((DistroHttpCombinedKey) distroKey).getActualResourceTypes()); } else { Datum datum = dataStore.get(distroKey.getResourceKey()); result.put(distroKey.getResourceKey(), datum); } byte [] dataContent = ApplicationUtils.getBean(Serializer.class).serialize(result); return new DistroData(distroKey, dataContent); } }
PUT /v1/ns/distro/datum 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 /* onSyncDatum(): rejects an empty map with a NacosException (INVALID_PARAM). Entries whose key is not an ephemeral instance-list key are ignored. For the others, an empty service is created first when the service does not exist yet and default instances are ephemeral; the datum is then handed to distroProtocol.onReceive. The return value of onReceive is not checked, and the response is always "ok". */ @RestController @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/distro") public class DistroController {@PutMapping("/datum") public ResponseEntity onSyncDatum (@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception { if (dataMap.isEmpty()) { Loggers.DISTRO.error("[onSync] receive empty entity!" ); throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!" ); } for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) { if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) { String namespaceId = KeyBuilder.getNamespace(entry.getKey()); String serviceName = KeyBuilder.getServiceName(entry.getKey()); if (!serviceManager.containService(namespaceId, serviceName) && switchDomain .isDefaultInstanceEphemeral()) { serviceManager.createEmptyService(namespaceId, serviceName, true ); } DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), entry.getValue()); distroProtocol.onReceive(distroHttpData); } } return ResponseEntity.ok("ok" ); } } /* processData(DistroData): casts the data to DistroHttpData, reads the deserialized Datum and applies it locally through onPut; always returns true. */ @DependsOn("ProtocolManager") @org .springframework.stereotype.Service("distroConsistencyService" )public class DistroConsistencyServiceImpl implements EphemeralConsistencyService , DistroDataProcessor { @Override public boolean processData (DistroData distroData) { DistroHttpData distroHttpData = (DistroHttpData) distroData; Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent(); onPut(datum.key, datum.value); return true ; } }
总结
client 可以向任意 Server 发送心跳或注册请求:请求到达 Server 后会根据服务名计算 hash,如果该服务不归当前 Server 负责,请求就会被转发,因此同一个服务最终总是由同一个 Server 处理
client,可以去任意Server订阅服务列表,因为 Server 和 Server 之间通过定时任务广播复制服务列表,服务列表最终一致性
client订阅的服务列表本地缓存每10秒刷新一次,这是拉模式,如果在10秒内有实例表更新了,Server 会推送新的数据过来,这是推模式
client 请求 Server 如果只填了一个 server-ip 会重试3次,如果填了多个,会重试所有你填的 server-ip
todo 有时间重新画一下上面那幅大图
todo 持久化实例使用了 CP 模式,基于 SOFAJRaft 实现,暂时没有时间看
todo 配置中心
todo 和安全有关
todo istio