timeout = {TimeValue@8576} “30s”
masterNodeTimeout = {TimeValue@8463} “1m”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
public class A{
public static void main(String[] args) {
return channel ->
client.index(indexRequest, new RestStatusToXContentListener<>(channel, r -> r.getLocation(indexRequest.routing())));
}

ActionListener<BulkResponse> wrapBulkResponse(ActionListener<Response> listener) {
return ActionListener.wrap(bulkItemResponses -> {
assert bulkItemResponses.getItems().length == 1 : "expected only one item in bulk request";
BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0];
if (bulkItemResponse.isFailed() == false) {
final DocWriteResponse response = bulkItemResponse.getResponse();
listener.onResponse((Response) response);
} else {
listener.onFailure(bulkItemResponse.getFailure().getCause());
}
}, listener::onFailure);
}

void k(){
createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
}
}

@Override
public void onFailure(Exception e) {
if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) {
// fail all requests involving this index, if create didn't work
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest request = bulkRequest.requests.get(i);
if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
bulkRequest.requests.set(i, null);
}
}
}
if (counter.decrementAndGet() == 0) {
executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
inner.addSuppressed(e);
listener.onFailure(inner);
}), responses, indicesThatCannotBeCreated);
}
}
});
}

ActionListener<Response> delegate = new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
listener.onResponse(response);
}

@Override
public void onFailure(Exception t) {
if (t instanceof Discovery.FailedToCommitClusterStateException
|| (t instanceof NotMasterException)) {
logger.debug((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t);
retry(t, masterChangePredicate);
} else {
listener.onFailure(t);
}
}
};

//开启线程

void a(){
createIndexService.createIndex(updateRequest, ActionListener.wrap(response ->
listener.onResponse(new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcked(), indexName)),
listener::onFailure));


listenerDJR = onlyCreateIndex(request, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
activeShardsObserver.waitForActiveShards(new String[]{request.index()}, request.waitForActiveShards(), request.ackTimeout(),
shardsAcked -> {
if (shardsAcked == false) {
logger.debug("[{}] index created, but the operation timed out while waiting for " +
"enough shards to be started.", request.index());
}
listener.onResponse(new CreateIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcked));
}, listener::onFailure);
} else {
listener.onResponse(new CreateIndexClusterStateUpdateResponse(false, false));
}
}, listener::onFailure));


clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]",
new IndexCreationTask(logger, allocationService, request, listenerDJR, indicesService, aliasValidator, xContentRegistry, settings,
this::validate));


}

class UpdateTask extends BatchedTask {
//SafeAckedClusterStateTaskListener.listener = IndexCreationTask
final ClusterStateTaskListener listener;

UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener,
ClusterStateTaskExecutor<?> executor) {
super(priority, source, executor, task);
this.listener = listener;
}

@Override
public String describeTasks(List<? extends BatchedTask> tasks) {
return ((ClusterStateTaskExecutor<Object>) batchingKey).describeTasks(
tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()));
}
}

//线程池

//ackListener = DelegetingAckListener

private static class DelegetingAckListener implements Discovery.AckListener {

////ackedListener = (SafeAckedClusterStateTaskListener.listener = IndexCreationTask)
//ackListeners.add(new AckCountDownListener(ackedListener, newClusterState.version(), newClusterState.nodes(),threadPool));
private final List<Discovery.AckListener> listeners;

private DelegetingAckListener(List<Discovery.AckListener> listeners) {
this.listeners = listeners;
}

@Override
public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {
for (Discovery.AckListener listener : listeners) {
listener.onNodeAck(node, e);
}
}

@Override
public void onTimeout() {
throw new UnsupportedOperationException("no timeout delegation");
}
}

void aa(){
Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
TransportResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
clientHandlers.put(requestId, new RequestHolder<>(responseHandler, connection, action, timeoutHandler));
FailedToCommitClusterStateException

sendingController.getPublishResponseHandler().onResponse(node);sendingController.getPublishResponseHandler().onResponse(node);
}


void aaaa(){
ShardStartedClusterStateTaskExecutor;
clusterService.submitStateUpdateTask(
"shard-started " + request,
request,
ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedClusterStateTaskExecutor,
shardStartedClusterStateTaskExecutor);

request = new StartRecoveryRequest(
recoveryTarget.shardId(),
recoveryTarget.indexShard().routingEntry().allocationId().getId(),
recoveryTarget.sourceNode(),
clusterService.localNode(),
Store.MetadataSnapshot.EMPTY,
recoveryTarget.state().getPrimary() = false,
recoveryTarget.recoveryId(),
SequenceNumbers.UNASSIGNED_SEQ_NO);
}
}

return new Checkpoint(0, 0, 1, minSeqNo, maxSeqNo, -2, 1);
return new Checkpoint(offset, 0, 1, minSeqNo, maxSeqNo, -2, 1);

fields.add(new Field(NAME, id, fieldType));

public static SequenceIDFields emptySeqID() {
return new SequenceIDFields(new LongPoint(NAME, SequenceNumbers.UNASSIGNED_SEQ_NO),
new NumericDocValuesField(NAME, SequenceNumbers.UNASSIGNED_SEQ_NO),
new NumericDocValuesField(PRIMARY_TERM_NAME, 0));
context.seqID(seqID);
fields.add(seqID.seqNo);
fields.add(seqID.seqNoDocValue);
fields.add(seqID.primaryTerm);

fields.add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));

final Field version = new NumericDocValuesField(NAME, -1L);

builder = new TextFieldMapper.Builder(currentFieldName)
.addMultiField(new KeywordFieldMapper.Builder(“keyword”).ignoreAbove(256));