|
| 1 | +/* |
| 2 | + * Licensed to Elasticsearch under one or more contributor |
| 3 | + * license agreements. See the NOTICE file distributed with |
| 4 | + * this work for additional information regarding copyright |
| 5 | + * ownership. Elasticsearch licenses this file to you under |
| 6 | + * the Apache License, Version 2.0 (the "License"); you may |
| 7 | + * not use this file except in compliance with the License. |
| 8 | + * You may obtain a copy of the License at |
| 9 | + * |
| 10 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | + * |
| 12 | + * Unless required by applicable law or agreed to in writing, |
| 13 | + * software distributed under the License is distributed on an |
| 14 | + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | + * KIND, either express or implied. See the License for the |
| 16 | + * specific language governing permissions and limitations |
| 17 | + * under the License. |
| 18 | + */ |
| 19 | + |
| 20 | +package org.elasticsearch.cluster.coordination; |
| 21 | + |
| 22 | +import org.apache.logging.log4j.message.ParameterizedMessage; |
| 23 | +import org.elasticsearch.ElasticsearchException; |
| 24 | +import org.elasticsearch.action.ActionListener; |
| 25 | +import org.elasticsearch.cluster.node.DiscoveryNode; |
| 26 | +import org.elasticsearch.common.component.AbstractComponent; |
| 27 | +import org.elasticsearch.common.settings.Settings; |
| 28 | +import org.elasticsearch.common.unit.TimeValue; |
| 29 | +import org.elasticsearch.discovery.Discovery; |
| 30 | +import org.elasticsearch.discovery.Discovery.AckListener; |
| 31 | +import org.elasticsearch.transport.TransportException; |
| 32 | +import org.elasticsearch.transport.TransportResponse; |
| 33 | + |
| 34 | +import java.util.ArrayList; |
| 35 | +import java.util.List; |
| 36 | +import java.util.Optional; |
| 37 | +import java.util.Set; |
| 38 | +import java.util.function.LongSupplier; |
| 39 | + |
| 40 | +public abstract class Publication extends AbstractComponent { |
| 41 | + |
| 42 | + private final List<PublicationTarget> publicationTargets; |
| 43 | + private final PublishRequest publishRequest; |
| 44 | + private final AckListener ackListener; |
| 45 | + private final LongSupplier currentTimeSupplier; |
| 46 | + private final long startTime; |
| 47 | + |
| 48 | + private Optional<ApplyCommitRequest> applyCommitRequest; // set when state is committed |
| 49 | + private boolean isCompleted; // set when publication is completed |
| 50 | + private boolean timedOut; // set when publication timed out |
| 51 | + |
| 52 | + public Publication(Settings settings, PublishRequest publishRequest, AckListener ackListener, LongSupplier currentTimeSupplier) { |
| 53 | + super(settings); |
| 54 | + this.publishRequest = publishRequest; |
| 55 | + this.ackListener = ackListener; |
| 56 | + this.currentTimeSupplier = currentTimeSupplier; |
| 57 | + startTime = currentTimeSupplier.getAsLong(); |
| 58 | + applyCommitRequest = Optional.empty(); |
| 59 | + publicationTargets = new ArrayList<>(publishRequest.getAcceptedState().getNodes().getNodes().size()); |
| 60 | + publishRequest.getAcceptedState().getNodes().iterator().forEachRemaining(n -> publicationTargets.add(new PublicationTarget(n))); |
| 61 | + } |
| 62 | + |
| 63 | + public void start(Set<DiscoveryNode> faultyNodes) { |
| 64 | + logger.trace("publishing {} to {}", publishRequest, publicationTargets); |
| 65 | + |
| 66 | + for (final DiscoveryNode faultyNode : faultyNodes) { |
| 67 | + onFaultyNode(faultyNode); |
| 68 | + } |
| 69 | + onPossibleCommitFailure(); |
| 70 | + publicationTargets.forEach(PublicationTarget::sendPublishRequest); |
| 71 | + } |
| 72 | + |
| 73 | + public void onTimeout() { |
| 74 | + assert timedOut == false; |
| 75 | + timedOut = true; |
| 76 | + if (applyCommitRequest.isPresent() == false) { |
| 77 | + logger.debug("onTimeout: [{}] timed out before committing", this); |
| 78 | + // fail all current publications |
| 79 | + final Exception e = new ElasticsearchException("publication timed out before committing"); |
| 80 | + publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e)); |
| 81 | + } |
| 82 | + onPossibleCompletion(); |
| 83 | + } |
| 84 | + |
| 85 | + public void onFaultyNode(DiscoveryNode faultyNode) { |
| 86 | + publicationTargets.forEach(t -> t.onFaultyNode(faultyNode)); |
| 87 | + onPossibleCompletion(); |
| 88 | + } |
| 89 | + |
| 90 | + private void onPossibleCompletion() { |
| 91 | + if (isCompleted) { |
| 92 | + return; |
| 93 | + } |
| 94 | + |
| 95 | + if (timedOut == false) { |
| 96 | + for (final PublicationTarget target : publicationTargets) { |
| 97 | + if (target.isActive()) { |
| 98 | + return; |
| 99 | + } |
| 100 | + } |
| 101 | + } |
| 102 | + |
| 103 | + if (applyCommitRequest.isPresent() == false) { |
| 104 | + logger.debug("onPossibleCompletion: [{}] commit failed", this); |
| 105 | + assert isCompleted == false; |
| 106 | + isCompleted = true; |
| 107 | + onCompletion(false); |
| 108 | + return; |
| 109 | + } |
| 110 | + |
| 111 | + assert isCompleted == false; |
| 112 | + isCompleted = true; |
| 113 | + onCompletion(true); |
| 114 | + assert applyCommitRequest.isPresent(); |
| 115 | + logger.trace("onPossibleCompletion: [{}] was successful", this); |
| 116 | + } |
| 117 | + |
| 118 | + // For assertions only: verify that this invariant holds |
| 119 | + private boolean publicationCompletedIffAllTargetsInactiveOrTimedOut() { |
| 120 | + if (timedOut == false) { |
| 121 | + for (final PublicationTarget target : publicationTargets) { |
| 122 | + if (target.isActive()) { |
| 123 | + return isCompleted == false; |
| 124 | + } |
| 125 | + } |
| 126 | + } |
| 127 | + return isCompleted; |
| 128 | + } |
| 129 | + |
| 130 | + private void onPossibleCommitFailure() { |
| 131 | + if (applyCommitRequest.isPresent()) { |
| 132 | + onPossibleCompletion(); |
| 133 | + return; |
| 134 | + } |
| 135 | + |
| 136 | + final CoordinationState.VoteCollection possiblySuccessfulNodes = new CoordinationState.VoteCollection(); |
| 137 | + for (PublicationTarget publicationTarget : publicationTargets) { |
| 138 | + if (publicationTarget.mayCommitInFuture()) { |
| 139 | + possiblySuccessfulNodes.addVote(publicationTarget.discoveryNode); |
| 140 | + } else { |
| 141 | + assert publicationTarget.isFailed() : publicationTarget; |
| 142 | + } |
| 143 | + } |
| 144 | + |
| 145 | + if (isPublishQuorum(possiblySuccessfulNodes) == false) { |
| 146 | + logger.debug("onPossibleCommitFailure: non-failed nodes {} do not form a quorum, so {} cannot succeed", |
| 147 | + possiblySuccessfulNodes, this); |
| 148 | + Exception e = new Discovery.FailedToCommitClusterStateException("non-failed nodes do not form a quorum"); |
| 149 | + publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e)); |
| 150 | + onPossibleCompletion(); |
| 151 | + } |
| 152 | + } |
| 153 | + |
| 154 | + protected abstract void onCompletion(boolean committed); |
| 155 | + |
| 156 | + protected abstract boolean isPublishQuorum(CoordinationState.VoteCollection votes); |
| 157 | + |
| 158 | + protected abstract Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse); |
| 159 | + |
| 160 | + protected abstract void onPossibleJoin(DiscoveryNode sourceNode, PublishWithJoinResponse response); |
| 161 | + |
| 162 | + protected abstract void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, |
| 163 | + ActionListener<PublishWithJoinResponse> responseActionListener); |
| 164 | + |
| 165 | + protected abstract void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit, |
| 166 | + ActionListener<TransportResponse.Empty> responseActionListener); |
| 167 | + |
| 168 | + @Override |
| 169 | + public String toString() { |
| 170 | + return "Publication{term=" + publishRequest.getAcceptedState().term() + |
| 171 | + ", version=" + publishRequest.getAcceptedState().version() + '}'; |
| 172 | + } |
| 173 | + |
| 174 | + enum PublicationTargetState { |
| 175 | + NOT_STARTED, |
| 176 | + FAILED, |
| 177 | + SENT_PUBLISH_REQUEST, |
| 178 | + WAITING_FOR_QUORUM, |
| 179 | + SENT_APPLY_COMMIT, |
| 180 | + APPLIED_COMMIT, |
| 181 | + } |
| 182 | + |
| 183 | + class PublicationTarget { |
| 184 | + private final DiscoveryNode discoveryNode; |
| 185 | + private boolean ackIsPending = true; |
| 186 | + private PublicationTargetState state = PublicationTargetState.NOT_STARTED; |
| 187 | + |
| 188 | + PublicationTarget(DiscoveryNode discoveryNode) { |
| 189 | + this.discoveryNode = discoveryNode; |
| 190 | + } |
| 191 | + |
| 192 | + @Override |
| 193 | + public String toString() { |
| 194 | + return "PublicationTarget{" + |
| 195 | + "discoveryNode=" + discoveryNode + |
| 196 | + ", state=" + state + |
| 197 | + ", ackIsPending=" + ackIsPending + |
| 198 | + '}'; |
| 199 | + } |
| 200 | + |
| 201 | + void sendPublishRequest() { |
| 202 | + if (isFailed()) { |
| 203 | + return; |
| 204 | + } |
| 205 | + assert state == PublicationTargetState.NOT_STARTED : state + " -> " + PublicationTargetState.SENT_PUBLISH_REQUEST; |
| 206 | + state = PublicationTargetState.SENT_PUBLISH_REQUEST; |
| 207 | + Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublishResponseHandler()); |
| 208 | + // TODO Can this ^ fail with an exception? Target should be failed if so. |
| 209 | + assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); |
| 210 | + } |
| 211 | + |
| 212 | + void handlePublishResponse(PublishResponse publishResponse) { |
| 213 | + assert isWaitingForQuorum() : this; |
| 214 | + logger.trace("handlePublishResponse: handling [{}] from [{}])", publishResponse, discoveryNode); |
| 215 | + if (applyCommitRequest.isPresent()) { |
| 216 | + sendApplyCommit(); |
| 217 | + } else { |
| 218 | + Publication.this.handlePublishResponse(discoveryNode, publishResponse).ifPresent(applyCommit -> { |
| 219 | + assert applyCommitRequest.isPresent() == false; |
| 220 | + applyCommitRequest = Optional.of(applyCommit); |
| 221 | + ackListener.onCommit(TimeValue.timeValueMillis(currentTimeSupplier.getAsLong() - startTime)); |
| 222 | + publicationTargets.stream().filter(PublicationTarget::isWaitingForQuorum).forEach(PublicationTarget::sendApplyCommit); |
| 223 | + }); |
| 224 | + } |
| 225 | + } |
| 226 | + |
| 227 | + void sendApplyCommit() { |
| 228 | + assert state == PublicationTargetState.WAITING_FOR_QUORUM : state + " -> " + PublicationTargetState.SENT_APPLY_COMMIT; |
| 229 | + state = PublicationTargetState.SENT_APPLY_COMMIT; |
| 230 | + assert applyCommitRequest.isPresent(); |
| 231 | + Publication.this.sendApplyCommit(discoveryNode, applyCommitRequest.get(), new ApplyCommitResponseHandler()); |
| 232 | + assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); |
| 233 | + } |
| 234 | + |
| 235 | + void setAppliedCommit() { |
| 236 | + assert state == PublicationTargetState.SENT_APPLY_COMMIT : state + " -> " + PublicationTargetState.APPLIED_COMMIT; |
| 237 | + state = PublicationTargetState.APPLIED_COMMIT; |
| 238 | + ackOnce(null); |
| 239 | + } |
| 240 | + |
| 241 | + void setFailed(Exception e) { |
| 242 | + assert state != PublicationTargetState.APPLIED_COMMIT : state + " -> " + PublicationTargetState.FAILED; |
| 243 | + state = PublicationTargetState.FAILED; |
| 244 | + ackOnce(e); |
| 245 | + } |
| 246 | + |
| 247 | + void onFaultyNode(DiscoveryNode faultyNode) { |
| 248 | + if (isActive() && discoveryNode.equals(faultyNode)) { |
| 249 | + logger.debug("onFaultyNode: [{}] is faulty, failing target in publication {}", faultyNode, Publication.this); |
| 250 | + setFailed(new ElasticsearchException("faulty node")); |
| 251 | + onPossibleCommitFailure(); |
| 252 | + } |
| 253 | + } |
| 254 | + |
| 255 | + private void ackOnce(Exception e) { |
| 256 | + if (ackIsPending) { |
| 257 | + ackIsPending = false; |
| 258 | + ackListener.onNodeAck(discoveryNode, e); |
| 259 | + } |
| 260 | + } |
| 261 | + |
| 262 | + boolean isActive() { |
| 263 | + return state != PublicationTargetState.FAILED |
| 264 | + && state != PublicationTargetState.APPLIED_COMMIT; |
| 265 | + } |
| 266 | + |
| 267 | + boolean isWaitingForQuorum() { |
| 268 | + return state == PublicationTargetState.WAITING_FOR_QUORUM; |
| 269 | + } |
| 270 | + |
| 271 | + boolean mayCommitInFuture() { |
| 272 | + return (state == PublicationTargetState.NOT_STARTED |
| 273 | + || state == PublicationTargetState.SENT_PUBLISH_REQUEST |
| 274 | + || state == PublicationTargetState.WAITING_FOR_QUORUM); |
| 275 | + } |
| 276 | + |
| 277 | + boolean isFailed() { |
| 278 | + return state == PublicationTargetState.FAILED; |
| 279 | + } |
| 280 | + |
| 281 | + private class PublishResponseHandler implements ActionListener<PublishWithJoinResponse> { |
| 282 | + |
| 283 | + @Override |
| 284 | + public void onResponse(PublishWithJoinResponse response) { |
| 285 | + if (isFailed()) { |
| 286 | + logger.debug("PublishResponseHandler.handleResponse: already failed, ignoring response from [{}]", discoveryNode); |
| 287 | + assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); |
| 288 | + return; |
| 289 | + } |
| 290 | + |
| 291 | + // TODO: check if we need to pass the full response here or if it's sufficient to just pass the optional join. |
| 292 | + onPossibleJoin(discoveryNode, response); |
| 293 | + |
| 294 | + assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + PublicationTargetState.WAITING_FOR_QUORUM; |
| 295 | + state = PublicationTargetState.WAITING_FOR_QUORUM; |
| 296 | + handlePublishResponse(response.getPublishResponse()); |
| 297 | + |
| 298 | + assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); |
| 299 | + } |
| 300 | + |
| 301 | + @Override |
| 302 | + public void onFailure(Exception e) { |
| 303 | + assert e instanceof TransportException; |
| 304 | + final TransportException exp = (TransportException) e; |
| 305 | + if (exp.getRootCause() instanceof CoordinationStateRejectedException) { |
| 306 | + logger.debug("PublishResponseHandler: [{}] failed: {}", discoveryNode, exp.getRootCause().getMessage()); |
| 307 | + } else { |
| 308 | + logger.debug(() -> new ParameterizedMessage("PublishResponseHandler: [{}] failed", discoveryNode), exp); |
| 309 | + } |
| 310 | + assert ((TransportException) e).getRootCause() instanceof Exception; |
| 311 | + setFailed((Exception) exp.getRootCause()); |
| 312 | + onPossibleCommitFailure(); |
| 313 | + assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); |
| 314 | + } |
| 315 | + |
| 316 | + } |
| 317 | + |
| 318 | + private class ApplyCommitResponseHandler implements ActionListener<TransportResponse.Empty> { |
| 319 | + |
| 320 | + @Override |
| 321 | + public void onResponse(TransportResponse.Empty ignored) { |
| 322 | + if (isFailed()) { |
| 323 | + logger.debug("ApplyCommitResponseHandler.handleResponse: already failed, ignoring response from [{}]", |
| 324 | + discoveryNode); |
| 325 | + return; |
| 326 | + } |
| 327 | + setAppliedCommit(); |
| 328 | + onPossibleCompletion(); |
| 329 | + assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); |
| 330 | + } |
| 331 | + |
| 332 | + @Override |
| 333 | + public void onFailure(Exception e) { |
| 334 | + assert e instanceof TransportException; |
| 335 | + final TransportException exp = (TransportException) e; |
| 336 | + if (exp.getRootCause() instanceof CoordinationStateRejectedException) { |
| 337 | + logger.debug("ApplyCommitResponseHandler: [{}] failed: {}", discoveryNode, exp.getRootCause().getMessage()); |
| 338 | + } else { |
| 339 | + logger.debug(() -> new ParameterizedMessage("ApplyCommitResponseHandler: [{}] failed", discoveryNode), exp); |
| 340 | + } |
| 341 | + assert ((TransportException) e).getRootCause() instanceof Exception; |
| 342 | + setFailed((Exception) exp.getRootCause()); |
| 343 | + onPossibleCompletion(); |
| 344 | + assert publicationCompletedIffAllTargetsInactiveOrTimedOut(); |
| 345 | + } |
| 346 | + } |
| 347 | + } |
| 348 | +} |
0 commit comments