|
19 | 19 | import org.elasticsearch.common.Strings;
|
20 | 20 | import org.elasticsearch.common.breaker.CircuitBreakingException;
|
21 | 21 | import org.elasticsearch.common.xcontent.XContentBuilder;
|
22 |
| -import org.elasticsearch.index.IndexNotFoundException; |
23 | 22 | import org.elasticsearch.index.query.BoolQueryBuilder;
|
24 | 23 | import org.elasticsearch.index.query.QueryBuilder;
|
25 | 24 | import org.elasticsearch.script.ScriptException;
|
|
42 | 41 | import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
|
43 | 42 | import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
|
44 | 43 | import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
|
45 |
| -import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils; |
46 | 44 | import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
|
47 | 45 | import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;
|
48 | 46 |
|
@@ -287,7 +285,7 @@ protected void onStart(long now, ActionListener<Boolean> listener) {
|
287 | 285 | // If the transform config index or the transform config is gone, something serious occurred
|
288 | 286 | // We are in an unknown state and should fail out
|
289 | 287 | if (failure instanceof ResourceNotFoundException) {
|
290 |
| - updateConfigListener.onFailure(new TransformConfigReloadingException(msg, failure)); |
| 288 | + updateConfigListener.onFailure(new TransformConfigLostOnReloadException(msg, failure)); |
291 | 289 | } else {
|
292 | 290 | auditor.warning(getJobId(), msg);
|
293 | 291 | updateConfigListener.onResponse(null);
|
@@ -477,37 +475,54 @@ synchronized void handleFailure(Exception e) {
|
477 | 475 |
|
478 | 476 | if (unwrappedException instanceof CircuitBreakingException) {
|
479 | 477 | handleCircuitBreakingException((CircuitBreakingException) unwrappedException);
|
480 |
| - } else if (unwrappedException instanceof ScriptException) { |
| 478 | + return; |
| 479 | + } |
| 480 | + |
| 481 | + if (unwrappedException instanceof ScriptException) { |
481 | 482 | handleScriptException((ScriptException) unwrappedException);
|
482 |
| - // irrecoverable error without special handling |
483 |
| - } else if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException) unwrappedException).isIrrecoverable()) { |
| 483 | + return; |
| 484 | + } |
| 485 | + |
| 486 | + if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException) unwrappedException).isIrrecoverable()) { |
484 | 487 | handleIrrecoverableBulkIndexingException((BulkIndexingException) unwrappedException);
|
485 |
| - } else if (unwrappedException instanceof IndexNotFoundException |
486 |
| - || unwrappedException instanceof AggregationResultUtils.AggregationExtractionException |
487 |
| - || unwrappedException instanceof TransformConfigReloadingException |
488 |
| - || unwrappedException instanceof ResourceNotFoundException |
489 |
| - || unwrappedException instanceof IllegalArgumentException) { |
490 |
| - failIndexer("task encountered irrecoverable failure: " + e.getMessage()); |
491 |
| - } else if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) { |
492 |
| - failIndexer( |
493 |
| - "task encountered more than " |
494 |
| - + context.getNumFailureRetries() |
495 |
| - + " failures; latest failure: " |
496 |
| - + ExceptionRootCauseFinder.getDetailedMessage(unwrappedException) |
497 |
| - ); |
498 |
| - } else { |
499 |
| - // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous |
500 |
| - // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one |
501 |
| - if (e.getMessage().equals(lastAuditedExceptionMessage) == false) { |
502 |
| - String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException); |
| 488 | + return; |
| 489 | + } |
503 | 490 |
|
504 |
| - auditor.warning( |
505 |
| - getJobId(), |
506 |
| - "Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger." |
507 |
| - ); |
508 |
| - lastAuditedExceptionMessage = message; |
509 |
| - } |
| 491 | + // irrecoverable error without special handling |
| 492 | + if (unwrappedException instanceof ElasticsearchException) { |
| 493 | + ElasticsearchException elasticsearchException = (ElasticsearchException) unwrappedException; |
| 494 | + if (ExceptionRootCauseFinder.IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) { |
| 495 | + failIndexer("task encountered irrecoverable failure: " + elasticsearchException.getDetailedMessage()); |
| 496 | + return; |
510 | 497 | }
|
| 498 | + } |
| 499 | + |
| 500 | + if (unwrappedException instanceof IllegalArgumentException) { |
| 501 | + failIndexer("task encountered irrecoverable failure: " + e.getMessage()); |
| 502 | + return; |
| 503 | + } |
| 504 | + |
| 505 | + if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) { |
| 506 | + failIndexer( |
| 507 | + "task encountered more than " |
| 508 | + + context.getNumFailureRetries() |
| 509 | + + " failures; latest failure: " |
| 510 | + + ExceptionRootCauseFinder.getDetailedMessage(unwrappedException) |
| 511 | + ); |
| 512 | + return; |
| 513 | + } |
| 514 | + |
| 515 | + // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous |
| 516 | + // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one |
| 517 | + if (e.getMessage().equals(lastAuditedExceptionMessage) == false) { |
| 518 | + String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException); |
| 519 | + |
| 520 | + auditor.warning( |
| 521 | + getJobId(), |
| 522 | + "Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger." |
| 523 | + ); |
| 524 | + lastAuditedExceptionMessage = message; |
| 525 | + } |
511 | 526 | }
|
512 | 527 |
|
513 | 528 | /**
|
@@ -901,8 +916,12 @@ private RunState determineRunStateAtStart() {
|
901 | 916 | return RunState.PARTIAL_RUN_IDENTIFY_CHANGES;
|
902 | 917 | }
|
903 | 918 |
|
904 |
| - static class TransformConfigReloadingException extends ElasticsearchException { |
905 |
| - TransformConfigReloadingException(String msg, Throwable cause, Object... args) { |
| 919 | + /** |
| 920 | + * Thrown when the transform configuration disappeared permanently. |
| 921 | + * (not if reloading failed due to an intermittent problem) |
| 922 | + */ |
| 923 | + static class TransformConfigLostOnReloadException extends ResourceNotFoundException { |
| 924 | + TransformConfigLostOnReloadException(String msg, Throwable cause, Object... args) { |
906 | 925 | super(msg, cause, args);
|
907 | 926 | }
|
908 | 927 | }
|
|
0 commit comments