final long startTime = relativeTime();
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
// needToCheck() defaults to true
if (needToCheck()) {
    // Attempt to create all the indices that we're going to need during the bulk before we start.
    // Step 1: collect all the indices in the request
    // collect the index names referenced by the bulk
    final Set<String> indices = bulkRequest.requests.stream()
            // delete requests should not attempt to create the index (if the index does not
            // exists), unless an external versioning is used
            .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
                    || request.versionType() == VersionType.EXTERNAL
                    || request.versionType() == VersionType.EXTERNAL_GTE)
            .map(DocWriteRequest::index)
            .collect(Collectors.toSet());
    /* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create
     * that we'll use when we try to run the requests. */
    final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
    Set<String> autoCreateIndices = new HashSet<>();
    ClusterState state = clusterService.state();
    for (String index : indices) {
        boolean shouldAutoCreate;
        try {
            // decide whether to auto-create the index: look it up in the routing table,
            // which is kept in the cluster state; if it does not exist there, it is a candidate for auto-creation
            shouldAutoCreate = shouldAutoCreate(index, state);
        } catch (IndexNotFoundException e) {
            shouldAutoCreate = false;
            indicesThatCannotBeCreated.put(index, e);
        }
        if (shouldAutoCreate) {
            autoCreateIndices.add(index);
        }
    }
    // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
    if (autoCreateIndices.isEmpty()) {
        executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
    } else {
        // some indices need to be auto-created first
        final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
        for (String index : autoCreateIndices) {
            // 2: this call creates the missing index
            createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
                @Override
                public void onResponse(CreateIndexResponse result) {
                    if (counter.decrementAndGet() == 0) {
                        // run the bulk once every index creation has come back
                        executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
                    }
                }
                @Override
                public void onFailure(Exception e) {
                    if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) {
                        // fail all requests involving this index, if create didn't work
                        for (int i = 0; i < bulkRequest.requests.size(); i++) {
                            DocWriteRequest request = bulkRequest.requests.get(i);
                            if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
                                bulkRequest.requests.set(i, null);
                            }
                        }
                    }
                    if (counter.decrementAndGet() == 0) {
                        executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
                            inner.addSuppressed(e);
                            listener.onFailure(inner);
                        }), responses, indicesThatCannotBeCreated);
                    }
                }
            });
        }
    }
} else {
    executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
}
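For orientation, this is the server-side path behind an ordinary bulk call. A minimal client-side sketch (index name, type, ids and the helper method name are made up for illustration; it uses the Java client API of the same era) that would exercise both branches above: the index operations can put a missing index into autoCreateIndices, while a delete with default (internal) versioning is filtered out in Step 1 and never forces index creation.

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

// Hypothetical example; "logs-demo", the type "doc" and the ids are placeholders.
BulkRequest buildExampleBulk() {
    BulkRequest bulkRequest = new BulkRequest();
    // index operations: if "logs-demo" does not exist yet, it ends up in autoCreateIndices above
    bulkRequest.add(new IndexRequest("logs-demo", "doc", "1")
            .source("{\"msg\":\"hello\"}", XContentType.JSON));
    bulkRequest.add(new IndexRequest("logs-demo", "doc", "2")
            .source("{\"msg\":\"world\"}", XContentType.JSON));
    // a delete with internal versioning is excluded from the auto-create set,
    // so it never triggers creation of a missing index
    bulkRequest.add(new DeleteRequest("logs-demo", "doc", "3"));
    return bulkRequest;
}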
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener,
                 final AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
    new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos, indicesThatCannotBeCreated).run();
}
private final class BulkOperation extends AbstractRunnable {

    @Override
    protected void doRun() throws Exception {
        final ClusterState clusterState = observer.setAndGetObservedState();
        if (handleBlockExceptions(clusterState)) {
            return;
        }
        final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
        MetaData metaData = clusterState.metaData();
        // iterate over all requests in the bulk
        for (int i = 0; i < bulkRequest.requests.size(); i++) {
            DocWriteRequest docWriteRequest = bulkRequest.requests.get(i);
            // the request can only be null because we set it to null in the previous step, so it gets ignored
            if (docWriteRequest == null) {
                continue;
            }
            // check whether the index state allows the operation
            if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metaData)) {
                continue;
            }
            Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
            try {
                // opType defaults to INDEX
                switch (docWriteRequest.opType()) {
                    case CREATE:
                    case INDEX:
                        IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                        final IndexMetaData indexMetaData = metaData.index(concreteIndex);
                        MappingMetaData mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
                        Version indexCreated = indexMetaData.getCreationVersion();
                        // 3: resolve routing: take _routing from the request URL or from the mapping; if neither is set, _id is used
                        indexRequest.resolveRouting(metaData);
                        // generate an id if none was supplied
                        indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
                        break;
                    case UPDATE:
                        TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
                        break;
                    case DELETE:
                        docWriteRequest.routing(metaData.resolveIndexRouting(docWriteRequest.parent(), docWriteRequest.routing(), docWriteRequest.index()));
                        // check if routing is required, if so, throw error if routing wasn't specified
                        if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName(), docWriteRequest.type())) {
                            throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
                        }
                        break;
                    default:
                        throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
                }
            } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
                BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id(), e);
                BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
                responses.set(i, bulkItemResponse);
                // make sure the request gets never processed again
                bulkRequest.requests.set(i, null);
            }
        }
        // first, go over all the requests and create a ShardId -> Operations mapping
        Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
        for (int i = 0; i < bulkRequest.requests.size(); i++) {
            DocWriteRequest request = bulkRequest.requests.get(i);
            if (request == null) {
                continue;
            }
            // resolve the concrete index name
            String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
            // compute the target shard id from the id or _routing
            ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
            List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
            // group requests that target the same shard together
            shardRequests.add(new BulkItemRequest(i, request));
        }
        if (requestsByShard.isEmpty()) {
            listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
            return;
        }
        final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
        String nodeId = clusterService.localNode().getId();
        // iterate over the per-shard groups of requests
        for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
            final ShardId shardId = entry.getKey();
            final List<BulkItemRequest> requests = entry.getValue();
            // 4: build the BulkShardRequest
            BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
                    requests.toArray(new BulkItemRequest[requests.size()]));
            bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
            bulkShardRequest.timeout(bulkRequest.timeout());
            if (task != null) {
                bulkShardRequest.setParentTask(nodeId, task.getId());
            }
            shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
                @Override
                public void onResponse(BulkShardResponse bulkShardResponse) {
                    for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                        // we may have no response if item failed
                        if (bulkItemResponse.getResponse() != null) {
                            bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                        }
                        responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                    }
                    if (counter.decrementAndGet() == 0) {
                        finishHim();
                    }
                }
                @Override
                public void onFailure(Exception e) {
                    // create failures for all relevant requests
                    for (BulkItemRequest request : requests) {
                        final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
                        DocWriteRequest docWriteRequest = request.request();
                        responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
                                new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e)));
                    }
                    if (counter.decrementAndGet() == 0) {
                        finishHim();
                    }
                }
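The shardId computed by clusterService.operationRouting() in the grouping loop follows the documented routing rule: hash the effective routing value (_routing, or _id when no routing was given, which is why resolveRouting/process ran in the earlier loop) and map it onto the primary shards of the index. Below is a simplified, self-contained sketch of that rule, not the actual OperationRouting code: Elasticsearch really uses a Murmur3 hash plus a routing factor for shrunk/split indices, both omitted here, and the method name shardFor is made up.

// Simplified illustration of "shard = hash(_routing or _id) % number_of_primary_shards".
static int shardFor(String routingOrId, int numberOfPrimaryShards) {
    int hash = routingOrId.hashCode();            // stand-in for Elasticsearch's Murmur3 hash
    return Math.floorMod(hash, numberOfPrimaryShards);
}

// e.g. shardFor("user-42", 5) always maps the document with _id "user-42"
// (and no explicit _routing) to the same one of the 5 primary shards.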
@Override
protected void doRun() {
    setPhase(task, "routing");
    final ClusterState state = observer.setAndGetObservedState();
    if (handleBlockExceptions(state)) {
        return;
    }
    // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
    final String concreteIndex = concreteIndex(state);
    final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
    if (indexMetaData == null) {
        retry(new IndexNotFoundException(concreteIndex));
        return;
    }
    if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
        throw new IndexClosedException(indexMetaData.getIndex());
    }
    // resolve all derived request fields, so we can route and apply it
    resolveRequest(indexMetaData, request);
    assert request.shardId() != null : "request shardId must be set in resolveRequest";
    assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
    // look up the primary shard for the request's shard id
    final ShardRouting primary = primary(state);
    if (retryIfUnavailable(state, primary)) {
        return;
    }
    // find the node that currently holds the primary shard
    final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
    // 5: this is a write, so it can only be carried out on the primary; route the request to the node hosting the primary shard
    if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
        // the primary is on the local node
        performLocalAction(state, primary, node, indexMetaData);
    } else {
        // the primary is on another node
        performRemoteAction(state, primary, node);
    }
}
private void performLocalAction(ClusterState state, ShardRouting primary, DiscoveryNode node, IndexMetaData indexMetaData) {
    setPhase(task, "waiting_on_primary");
    if (logger.isTraceEnabled()) {
        logger.trace("send action [{}] to local primary [{}] for request [{}] with cluster state version [{}] to [{}] ",
                transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId());
    }
    performAction(node, transportPrimaryAction, true,
            new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetaData.primaryTerm(primary.id())));
}
private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
                           final TransportRequest requestToPerform) {
    transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {
        @Override
        public Response newInstance() {
            return newResponseInstance();
        }
        @Override
        public String executor() {
            return ThreadPool.Names.SAME;
        }
        @Override
        public void handleException(TransportException exp) {
            try {
                // if we got disconnected from the node, or the node / shard is not in the right state (being closed)
                final Throwable cause = exp.unwrapCause();
                if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException ||
                        (isPrimaryAction && retryPrimaryException(cause))) {
                    logger.trace(
                            (org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
                                    "received an error from node [{}] for request [{}], scheduling a retry",
                                    node.getId(), requestToPerform),
                            exp);
                    retry(exp);
                } else {
                    finishAsFailed(exp);
                }
            } catch (Exception e) {
                e.addSuppressed(exp);
                finishWithUnexpectedFailure(e);
            }
        }
    });
}
// fragment from the field-mapper layer: the parsed value is added both as an indexed Field and as doc values
Field field = new Field(fieldType().name(), binaryValue, fieldType());
fields.add(new SortedSetDocValuesField(fieldType().name(), binaryValue));
BulkItemResponse primaryResponse = new BulkItemResponse(replicaRequest.id(), opType, response);
// set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though.
primaryResponse.getResponse().setShardInfo(new ShardInfo());
return primaryResponse;
return new WritePrimaryResult<>(request, response, location, null, primary, logger);
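On the caller's side, the per-item responses collected into the AtomicArray eventually surface as a BulkResponse whose items either carry the primary response (with the ShardInfo set as above) or a per-item failure. A minimal sketch of how a client typically inspects them; bulkResponse is assumed to be whatever the bulk call returned, and the method name inspect is made up.

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;

void inspect(BulkResponse bulkResponse) {
    if (bulkResponse.hasFailures()) {
        for (BulkItemResponse item : bulkResponse.getItems()) {
            if (item.isFailed()) {
                // item-level failures, e.g. the RoutingMissingException raised earlier in BulkOperation
                System.err.println(item.getOpType() + " [" + item.getId() + "] failed: "
                        + item.getFailureMessage());
            }
        }
    }
}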