connectorConfigs) {
+ return true;
+ }
+}
diff --git a/src/main/java/io/confluent/connect/elasticsearch/IndexableRecord.java b/src/main/java/io/confluent/connect/elasticsearch/IndexableRecord.java
deleted file mode 100644
index 2f1d16c57..000000000
--- a/src/main/java/io/confluent/connect/elasticsearch/IndexableRecord.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2018 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.connect.elasticsearch;
-
-import java.util.Objects;
-
-public class IndexableRecord {
-
- public final Key key;
- public final String payload;
- public final Long version;
-
- public IndexableRecord(Key key, String payload, Long version) {
- this.key = key;
- this.version = version;
- this.payload = payload;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- IndexableRecord that = (IndexableRecord) o;
- return Objects.equals(key, that.key)
- && Objects.equals(payload, that.payload)
- && Objects.equals(version, that.version);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(key, version, payload);
- }
-}
diff --git a/src/main/java/io/confluent/connect/elasticsearch/Key.java b/src/main/java/io/confluent/connect/elasticsearch/Key.java
deleted file mode 100644
index f59d572af..000000000
--- a/src/main/java/io/confluent/connect/elasticsearch/Key.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2018 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.connect.elasticsearch;
-
-import java.util.Objects;
-
-public class Key {
-
- public final String index;
- public final String type;
- public final String id;
-
- public Key(String index, String type, String id) {
- this.index = index;
- this.type = type;
- this.id = id;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Key that = (Key) o;
- return Objects.equals(index, that.index)
- && Objects.equals(type, that.type)
- && Objects.equals(id, that.id);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(index, type, id);
- }
-
- @Override
- public String toString() {
- return String.format("Key{%s/%s/%s}", index, type, id);
- }
-
-}
diff --git a/src/main/java/io/confluent/connect/elasticsearch/LogContext.java b/src/main/java/io/confluent/connect/elasticsearch/LogContext.java
deleted file mode 100644
index f8bd365ef..000000000
--- a/src/main/java/io/confluent/connect/elasticsearch/LogContext.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright [2018 - 2018] Confluent Inc.
- */
-
-package io.confluent.connect.elasticsearch;
-
-import org.slf4j.MDC;
-
-/**
- * Utility that works with Connect's MDC logging when it is enabled, so that the threads
- * created by this connector also include the same connector task specific MDC context plus
- * additional information that distinguishes those threads from each other and from the task thread.
- */
-public class LogContext implements AutoCloseable {
-
- /**
- * We can't reference Connect's constant, since this connector could be deployed to Connect
- * runtimes that don't yet have it.
- */
- private static final String CONNECTOR_CONTEXT = "connector.context";
-
- private final String previousContext;
- private final String currentContext;
-
- public LogContext() {
- this(MDC.get(CONNECTOR_CONTEXT), null);
- }
-
- protected LogContext(String currentContext, String suffix) {
- this.previousContext = currentContext;
- if (currentContext != null && suffix != null && !suffix.trim().isEmpty()) {
- this.currentContext = currentContext + suffix.trim() + " ";
- MDC.put(CONNECTOR_CONTEXT, this.currentContext);
- } else {
- this.currentContext = null;
- }
- }
-
- public LogContext create(String suffix) {
- if (previousContext != null && suffix != null && !suffix.trim().isEmpty()) {
- return new LogContext(previousContext, suffix);
- }
- return this;
- }
-
- @Override
- public void close() {
- if (currentContext != null) {
- if (previousContext != null) {
- MDC.put(CONNECTOR_CONTEXT, previousContext);
- } else {
- MDC.remove(CONNECTOR_CONTEXT);
- }
- }
- }
-}
diff --git a/src/main/java/io/confluent/connect/elasticsearch/Mapping.java b/src/main/java/io/confluent/connect/elasticsearch/Mapping.java
index fc0ee87fb..cd14dfadd 100644
--- a/src/main/java/io/confluent/connect/elasticsearch/Mapping.java
+++ b/src/main/java/io/confluent/connect/elasticsearch/Mapping.java
@@ -15,10 +15,6 @@
package io.confluent.connect.elasticsearch;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.gson.JsonObject;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
@@ -27,94 +23,207 @@
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.BINARY_TYPE;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.BOOLEAN_TYPE;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.BYTE_TYPE;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.DOUBLE_TYPE;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.FLOAT_TYPE;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.INTEGER_TYPE;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.KEYWORD_TYPE;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.LONG_TYPE;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_KEY;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.MAP_VALUE;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.SHORT_TYPE;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.STRING_TYPE;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConstants.TEXT_TYPE;
public class Mapping {
+ // Elasticsearch types
+ public static final String BOOLEAN_TYPE = "boolean";
+ public static final String BYTE_TYPE = "byte";
+ public static final String BINARY_TYPE = "binary";
+ public static final String SHORT_TYPE = "short";
+ public static final String INTEGER_TYPE = "integer";
+ public static final String LONG_TYPE = "long";
+ public static final String FLOAT_TYPE = "float";
+ public static final String DOUBLE_TYPE = "double";
+ public static final String STRING_TYPE = "string";
+ public static final String TEXT_TYPE = "text";
+ public static final String KEYWORD_TYPE = "keyword";
+ public static final String DATE_TYPE = "date";
+
+ // Elasticsearch mapping fields
+ private static final String DEFAULT_VALUE_FIELD = "null_value";
+ private static final String FIELDS_FIELD = "fields";
+ private static final String IGNORE_ABOVE_FIELD = "ignore_above";
+ public static final String KEY_FIELD = "key";
+ private static final String KEYWORD_FIELD = "keyword";
+ private static final String PROPERTIES_FIELD = "properties";
+ private static final String TYPE_FIELD = "type";
+ public static final String VALUE_FIELD = "value";
+
/**
- * Create an explicit mapping.
+ * Build mapping from the provided schema.
*
- * @param client The client to connect to Elasticsearch.
- * @param index The index to write to Elasticsearch.
- * @param type The type to create mapping for.
- * @param schema The schema used to infer mapping.
- * @throws IOException from underlying JestClient
+ * @param schema The schema used to build the mapping.
+ * @return the schema as a JSON mapping
*/
- public static void createMapping(
- ElasticsearchClient client,
- String index,
- String type,
- Schema schema
- )
- throws IOException {
- client.createMapping(index, type, schema);
+ public static XContentBuilder buildMapping(Schema schema) {
+ try {
+ XContentBuilder builder = XContentFactory.jsonBuilder();
+ builder.startObject();
+ {
+ buildMapping(schema, builder);
+ }
+ builder.endObject();
+ return builder;
+ } catch (IOException e) {
+ throw new ConnectException("Failed to build mapping for schema " + schema, e);
+ }
}
- /**
- * Get the JSON mapping for given index and type. Returns {@code null} if it does not exist.
- */
- public static JsonObject getMapping(ElasticsearchClient client, String index, String type)
+ private static XContentBuilder buildMapping(Schema schema, XContentBuilder builder)
throws IOException {
- return client.getMapping(index, type);
- }
- /**
- * Infer mapping from the provided schema.
- *
- * @param schema The schema used to infer mapping.
- */
- public static JsonNode inferMapping(ElasticsearchClient client, Schema schema) {
if (schema == null) {
throw new DataException("Cannot infer mapping without schema.");
}
// Handle logical types
- JsonNode logicalConversion = inferLogicalMapping(schema);
+ XContentBuilder logicalConversion = inferLogicalMapping(builder, schema);
if (logicalConversion != null) {
return logicalConversion;
}
Schema.Type schemaType = schema.type();
- ObjectNode properties = JsonNodeFactory.instance.objectNode();
- ObjectNode fields = JsonNodeFactory.instance.objectNode();
- switch (schemaType) {
+ switch (schema.type()) {
case ARRAY:
- return inferMapping(client, schema.valueSchema());
+ return buildMapping(schema.valueSchema(), builder);
+
case MAP:
- properties.set("properties", fields);
- fields.set(MAP_KEY, inferMapping(client, schema.keySchema()));
- fields.set(MAP_VALUE, inferMapping(client, schema.valueSchema()));
- return properties;
+ return buildMap(schema, builder);
+
case STRUCT:
- properties.set("properties", fields);
- for (Field field : schema.fields()) {
- fields.set(field.name(), inferMapping(client, field.schema()));
+ return buildStruct(schema, builder);
+
+ default:
+ return inferPrimitive(builder, getElasticsearchType(schemaType), schema.defaultValue());
+ }
+ }
+
+ private static void addTextMapping(XContentBuilder builder) throws IOException {
+ // Add additional mapping for indexing, per https://www.elastic.co/blog/strings-are-dead-long-live-strings
+ builder.startObject(FIELDS_FIELD);
+ {
+ builder.startObject(KEYWORD_FIELD);
+ {
+ builder.field(TYPE_FIELD, KEYWORD_TYPE);
+ builder.field(IGNORE_ABOVE_FIELD, 256);
+ }
+ builder.endObject();
+ }
+ builder.endObject();
+ }
+
+ private static XContentBuilder buildMap(Schema schema, XContentBuilder builder)
+ throws IOException {
+
+ builder.startObject(PROPERTIES_FIELD);
+ {
+ builder.startObject(KEY_FIELD);
+ {
+ buildMapping(schema.keySchema(), builder);
+ }
+ builder.endObject();
+ builder.startObject(VALUE_FIELD);
+ {
+ buildMapping(schema.valueSchema(), builder);
+ }
+ builder.endObject();
+ }
+ return builder.endObject();
+ }
+
+ private static XContentBuilder buildStruct(Schema schema, XContentBuilder builder)
+ throws IOException {
+
+ builder.startObject(PROPERTIES_FIELD);
+ {
+ for (Field field : schema.fields()) {
+ builder.startObject(field.name());
+ {
+ buildMapping(field.schema(), builder);
}
- return properties;
+ builder.endObject();
+ }
+ }
+ return builder.endObject();
+ }
+
+ private static XContentBuilder inferPrimitive(
+ XContentBuilder builder,
+ String type,
+ Object defaultValue
+ ) throws IOException {
+
+ if (type == null) {
+ throw new DataException(String.format("Invalid primitive type %s.", type));
+ }
+
+ builder.field(TYPE_FIELD, type);
+ if (type.equals(TEXT_TYPE)) {
+ addTextMapping(builder);
+ }
+
+ if (defaultValue == null) {
+ return builder;
+ }
+
+ switch (type) {
+ case BYTE_TYPE:
+ return builder.field(DEFAULT_VALUE_FIELD, (byte) defaultValue);
+ case SHORT_TYPE:
+ return builder.field(DEFAULT_VALUE_FIELD, (short) defaultValue);
+ case INTEGER_TYPE:
+ return builder.field(DEFAULT_VALUE_FIELD, (int) defaultValue);
+ case LONG_TYPE:
+ return builder.field(DEFAULT_VALUE_FIELD, (long) defaultValue);
+ case FLOAT_TYPE:
+ return builder.field(DEFAULT_VALUE_FIELD, (float) defaultValue);
+ case DOUBLE_TYPE:
+ return builder.field(DEFAULT_VALUE_FIELD, (double) defaultValue);
+ case BOOLEAN_TYPE:
+ return builder.field(DEFAULT_VALUE_FIELD, (boolean) defaultValue);
+ case DATE_TYPE:
+ return builder.field(DEFAULT_VALUE_FIELD, ((java.util.Date) defaultValue).getTime());
+ /*
+ * IGNORE default values for text and binary types as this is not supported by ES side.
+ * see https://www.elastic.co/guide/en/elasticsearch/reference/current/text.html and
+ * https://www.elastic.co/guide/en/elasticsearch/reference/current/binary.html for details.
+ */
+ case STRING_TYPE:
+ case TEXT_TYPE:
+ case BINARY_TYPE:
+ return builder;
default:
- String esType = getElasticsearchType(client, schemaType);
- return inferPrimitive(esType, schema.defaultValue());
+ throw new DataException("Invalid primitive type " + type + ".");
+ }
+ }
+
+ private static XContentBuilder inferLogicalMapping(XContentBuilder builder, Schema schema)
+ throws IOException {
+
+ if (schema.name() == null) {
+ return null;
+ }
+
+ switch (schema.name()) {
+ case Date.LOGICAL_NAME:
+ case Time.LOGICAL_NAME:
+ case Timestamp.LOGICAL_NAME:
+ return inferPrimitive(builder, DATE_TYPE, schema.defaultValue());
+ case Decimal.LOGICAL_NAME:
+ return inferPrimitive(builder, DOUBLE_TYPE, schema.defaultValue());
+ default:
+ // User-defined type or unknown built-in
+ return null;
}
}
// visible for testing
- protected static String getElasticsearchType(ElasticsearchClient client, Schema.Type schemaType) {
+ protected static String getElasticsearchType(Schema.Type schemaType) {
switch (schemaType) {
case BOOLEAN:
return BOOLEAN_TYPE;
@@ -131,119 +240,11 @@ protected static String getElasticsearchType(ElasticsearchClient client, Schema.
case FLOAT64:
return DOUBLE_TYPE;
case STRING:
- switch (client.getVersion()) {
- case ES_V1:
- case ES_V2:
- return STRING_TYPE;
- case ES_V5:
- case ES_V6:
- default:
- return TEXT_TYPE;
- }
+ return TEXT_TYPE;
case BYTES:
return BINARY_TYPE;
default:
return null;
}
}
-
- private static JsonNode inferLogicalMapping(Schema schema) {
- String schemaName = schema.name();
- Object defaultValue = schema.defaultValue();
- if (schemaName == null) {
- return null;
- }
-
- switch (schemaName) {
- case Date.LOGICAL_NAME:
- case Time.LOGICAL_NAME:
- case Timestamp.LOGICAL_NAME:
- return inferPrimitive(ElasticsearchSinkConnectorConstants.DATE_TYPE, defaultValue);
- case Decimal.LOGICAL_NAME:
- return inferPrimitive(ElasticsearchSinkConnectorConstants.DOUBLE_TYPE, defaultValue);
- default:
- // User-defined type or unknown built-in
- return null;
- }
- }
-
- private static JsonNode inferPrimitive(String type, Object defaultValue) {
- if (type == null) {
- throw new ConnectException("Invalid primitive type.");
- }
-
- ObjectNode obj = JsonNodeFactory.instance.objectNode();
- obj.set("type", JsonNodeFactory.instance.textNode(type));
- if (type.equals(TEXT_TYPE)) {
- addTextMapping(obj);
- }
- JsonNode defaultValueNode = null;
- if (defaultValue != null) {
- switch (type) {
- case ElasticsearchSinkConnectorConstants.BYTE_TYPE:
- defaultValueNode = JsonNodeFactory.instance.numberNode((byte) defaultValue);
- break;
- case ElasticsearchSinkConnectorConstants.SHORT_TYPE:
- defaultValueNode = JsonNodeFactory.instance.numberNode((short) defaultValue);
- break;
- case ElasticsearchSinkConnectorConstants.INTEGER_TYPE:
- defaultValueNode = JsonNodeFactory.instance.numberNode((int) defaultValue);
- break;
- case ElasticsearchSinkConnectorConstants.LONG_TYPE:
- defaultValueNode = JsonNodeFactory.instance.numberNode((long) defaultValue);
- break;
- case ElasticsearchSinkConnectorConstants.FLOAT_TYPE:
- defaultValueNode = JsonNodeFactory.instance.numberNode((float) defaultValue);
- break;
- case ElasticsearchSinkConnectorConstants.DOUBLE_TYPE:
- defaultValueNode = JsonNodeFactory.instance.numberNode((double) defaultValue);
- break;
- case ElasticsearchSinkConnectorConstants.STRING_TYPE:
- case ElasticsearchSinkConnectorConstants.TEXT_TYPE:
- case ElasticsearchSinkConnectorConstants.BINARY_TYPE:
- // IGNORE default values for text and binary types as this is not supported by ES side.
- // see https://www.elastic.co/guide/en/elasticsearch/reference/current/text.html
- // https://www.elastic.co/guide/en/elasticsearch/reference/current/binary.html
- // for more details.
- //defaultValueNode = null;
- break;
- case ElasticsearchSinkConnectorConstants.BOOLEAN_TYPE:
- defaultValueNode = JsonNodeFactory.instance.booleanNode((boolean) defaultValue);
- break;
- case ElasticsearchSinkConnectorConstants.DATE_TYPE:
- long value = ((java.util.Date) defaultValue).getTime();
- defaultValueNode = JsonNodeFactory.instance.numberNode(value);
- break;
- default:
- throw new DataException("Invalid primitive type.");
- }
- }
- if (defaultValueNode != null) {
- obj.set("null_value", defaultValueNode);
- }
- return obj;
- }
-
- private static void addTextMapping(ObjectNode obj) {
- // Add additional mapping for indexing, per https://www.elastic.co/blog/strings-are-dead-long-live-strings
- ObjectNode keyword = JsonNodeFactory.instance.objectNode();
- keyword.set("type", JsonNodeFactory.instance.textNode(KEYWORD_TYPE));
- keyword.set("ignore_above", JsonNodeFactory.instance.numberNode(256));
- ObjectNode fields = JsonNodeFactory.instance.objectNode();
- fields.set("keyword", keyword);
- obj.set("fields", fields);
- }
-
- private static byte[] bytes(Object value) {
- final byte[] bytes;
- if (value instanceof ByteBuffer) {
- final ByteBuffer buffer = ((ByteBuffer) value).slice();
- bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- } else {
- bytes = (byte[]) value;
- }
- return bytes;
- }
-
}
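
Reviewer note: a minimal sketch of how the reworked `Mapping.buildMapping(Schema)` entry point could be exercised. The `SchemaBuilder` fields and the `Strings.toString` rendering are illustrative assumptions, not part of this patch.

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;

import io.confluent.connect.elasticsearch.Mapping;

public class MappingSketch {
  public static void main(String[] args) {
    // Hypothetical record schema: one text field and one numeric field.
    Schema schema = SchemaBuilder.struct().name("example")
        .field("message", Schema.STRING_SCHEMA)
        .field("count", Schema.INT64_SCHEMA)
        .build();

    // Build the Elasticsearch mapping and render it as JSON for inspection.
    XContentBuilder mapping = Mapping.buildMapping(schema);
    System.out.println(Strings.toString(mapping));
    // Roughly: {"properties":{"message":{"type":"text","fields":{"keyword":{...}}},
    //           "count":{"type":"long"}}}
  }
}
```
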
diff --git a/src/main/java/io/confluent/connect/elasticsearch/RetryUtil.java b/src/main/java/io/confluent/connect/elasticsearch/RetryUtil.java
index 8d0212770..8d968e99c 100644
--- a/src/main/java/io/confluent/connect/elasticsearch/RetryUtil.java
+++ b/src/main/java/io/confluent/connect/elasticsearch/RetryUtil.java
@@ -15,9 +15,15 @@
package io.confluent.connect.elasticsearch;
+import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Utility to compute the retry times for a given attempt, using exponential backoff.
*
@@ -31,6 +37,8 @@
*/
public class RetryUtil {
+ private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
/**
* An arbitrary absolute maximum practical retry time.
*/
@@ -86,5 +94,84 @@ public static long computeRetryWaitTimeInMillis(int retryAttempts,
return result < 0L ? MAX_RETRY_TIME_MS : Math.min(MAX_RETRY_TIME_MS, result);
}
+ /**
+ * Call the supplied function up to the {@code maxTotalAttempts}.
+ *
+ * The description of the function should be a succinct, human-readable present tense phrase
+ * that summarizes the function, such as "read tables" or "connect to database" or
+ * "make remote request". This description will be used within exception and log messages.
+ *
+ * @param description present tense description of the action, used to create the error
+ * message; may not be null
+ * @param function the function to call; may not be null
+ * @param maxTotalAttempts maximum number of total attempts, including the first call
+ * @param initialBackoff the initial backoff in ms before retrying
+ * @param <T> the return type of the function to retry
+ * @return the function's return value
+ * @throws ConnectException if the function failed after retries
+ */
+ public static <T> T callWithRetries(
+ String description,
+ Callable<T> function,
+ int maxTotalAttempts,
+ long initialBackoff
+ ) {
+ return callWithRetries(description, function, maxTotalAttempts, initialBackoff, Time.SYSTEM);
+ }
+ /**
+ * Call the supplied function up to the {@code maxTotalAttempts}.
+ *
+ * The description of the function should be a succinct, human-readable present tense phrase
+ * that summarizes the function, such as "read tables" or "connect to database" or
+ * "make remote request". This description will be used within exception and log messages.
+ *
+ * @param description present tense description of the action, used to create the error
+ * message; may not be null
+ * @param function the function to call; may not be null
+ * @param maxTotalAttempts maximum number of attempts
+ * @param initialBackoff the initial backoff in ms before retrying
+ * @param clock the clock to use for waiting
+ * @param <T> the return type of the function to retry
+ * @return the function's return value
+ * @throws ConnectException if the function failed after retries
+ */
+ protected static <T> T callWithRetries(
+ String description,
+ Callable<T> function,
+ int maxTotalAttempts,
+ long initialBackoff,
+ Time clock
+ ) {
+ assert description != null;
+ assert function != null;
+ int attempt = 0;
+ while (true) {
+ ++attempt;
+ try {
+ log.trace(
+ "Try {} (attempt {} of {})",
+ description,
+ attempt,
+ maxTotalAttempts
+ );
+ T call = function.call();
+ return call;
+ } catch (Exception e) {
+ if (attempt >= maxTotalAttempts) {
+ String msg = String.format("Failed to %s due to '%s' after %d attempt(s)",
+ description, e, attempt);
+ log.error(msg, e);
+ throw new ConnectException(msg, e);
+ }
+
+ // Otherwise it is retriable and we should retry
+ long backoff = computeRandomRetryWaitTimeInMillis(attempt, initialBackoff);
+
+ log.warn("Failed to {} due to {}. Retrying attempt ({}/{}) after backoff of {} ms",
+ description, e.getCause(), attempt, maxTotalAttempts, backoff);
+ clock.sleep(backoff);
+ }
+ }
+ }
}
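
Reviewer note: usage sketch for the new `callWithRetries` helper; the flaky `Callable` below is a made-up example to show the retry/backoff path.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

import io.confluent.connect.elasticsearch.RetryUtil;

public class RetrySketch {
  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();

    // A callable that fails twice before succeeding, to exercise the backoff path.
    Callable<String> flaky = () -> {
      if (calls.incrementAndGet() < 3) {
        throw new RuntimeException("transient failure " + calls.get());
      }
      return "ok";
    };

    // Up to 5 total attempts (including the first call), 100 ms initial backoff.
    String result = RetryUtil.callWithRetries("call flaky service", flaky, 5, 100L);
    System.out.println(result); // prints "ok" after two retried failures
  }
}
```
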
diff --git a/src/main/java/io/confluent/connect/elasticsearch/SecurityProtocol.java b/src/main/java/io/confluent/connect/elasticsearch/SecurityProtocol.java
deleted file mode 100644
index 359ed4dae..000000000
--- a/src/main/java/io/confluent/connect/elasticsearch/SecurityProtocol.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2019 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.connect.elasticsearch;
-
-public enum SecurityProtocol {
-
- /**
- * Un-authenticated, non-encrypted channel
- */
- PLAINTEXT(0, "PLAINTEXT"),
- /**
- * SSL channel
- */
- SSL(1, "SSL");
-
- private final int id;
- private final String name;
-
- SecurityProtocol(int id, String name) {
- this.id = id;
- this.name = name;
- }
-}
diff --git a/src/main/java/io/confluent/connect/elasticsearch/Validator.java b/src/main/java/io/confluent/connect/elasticsearch/Validator.java
index 14747aa56..671861611 100644
--- a/src/main/java/io/confluent/connect/elasticsearch/Validator.java
+++ b/src/main/java/io/confluent/connect/elasticsearch/Validator.java
@@ -1,60 +1,83 @@
/*
* Copyright 2020 Confluent Inc.
*
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- **/
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
package io.confluent.connect.elasticsearch;
+import org.apache.http.HttpHost;
+import org.apache.kafka.common.config.Config;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.common.config.SslConfigs;
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SecurityProtocol;
+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_TOPICS_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_TOPICS_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.KERBEROS_KEYTAB_PATH_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.KERBEROS_PRINCIPAL_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_HOST_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_PASSWORD_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_PORT_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_USERNAME_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import org.apache.kafka.common.config.Config;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.config.ConfigValue;
-import org.apache.kafka.common.config.SslConfigs;
-
public class Validator {
+ private static final Logger log = LoggerFactory.getLogger(Validator.class);
+
private ElasticsearchSinkConnectorConfig config;
private Map<String, ConfigValue> values;
private List<ConfigValue> validations;
+ private ClientFactory clientFactory;
public Validator(Map<String, String> props) {
+ this(props, null);
+ }
+
+ // Exposed for testing
+ protected Validator(Map<String, String> props, ClientFactory clientFactory) {
try {
this.config = new ElasticsearchSinkConnectorConfig(props);
} catch (ConfigException e) {
// some configs are invalid
}
+ this.clientFactory = clientFactory == null ? this::createClient : clientFactory;
validations = ElasticsearchSinkConnectorConfig.CONFIG.validate(props);
values = validations.stream().collect(Collectors.toMap(ConfigValue::name, Function.identity()));
}
@@ -65,17 +88,26 @@ public Config validate() {
return new Config(validations);
}
- validateCredentials();
- validateIgnoreConfigs();
- validateLingerMs();
- validateMaxBufferedRecords();
- validateProxy();
- validateSsl();
+ try (RestHighLevelClient client = clientFactory.client()) {
+ validateCredentials();
+ validateIgnoreConfigs();
+ validateKerberos();
+ validateLingerMs();
+ validateMaxBufferedRecords();
+ validateProxy();
+ validateSsl();
+
+ if (!hasErrors()) {
+ // no point if previous configs are invalid
+ validateConnection(client);
+ }
+ } catch (IOException e) {
+ log.warn("Closing the client failed.", e);
+ }
return new Config(validations);
}
-
private void validateCredentials() {
boolean onlyOneSet = config.username() != null ^ config.password() != null;
if (onlyOneSet) {
@@ -105,6 +137,50 @@ private void validateIgnoreConfigs() {
}
}
+ private void validateKerberos() {
+ boolean onlyOneSet = config.kerberosUserPrincipal() != null ^ config.keytabPath() != null;
+ if (onlyOneSet) {
+ String errorMessage = String.format(
+ "Either both or neither '%s' and '%s' must be set.",
+ KERBEROS_PRINCIPAL_CONFIG,
+ KERBEROS_KEYTAB_PATH_CONFIG
+ );
+ addErrorMessage(KERBEROS_PRINCIPAL_CONFIG, errorMessage);
+ addErrorMessage(KERBEROS_KEYTAB_PATH_CONFIG, errorMessage);
+ }
+
+ if (config.isKerberosEnabled()) {
+ // currently do not support Kerberos with regular auth
+ if (config.isAuthenticatedConnection()) {
+ String errorMessage = String.format(
+ "Either only Kerberos (%s, %s) or connection credentials (%s, %s) must be set.",
+ KERBEROS_PRINCIPAL_CONFIG,
+ KERBEROS_KEYTAB_PATH_CONFIG,
+ CONNECTION_USERNAME_CONFIG,
+ CONNECTION_PASSWORD_CONFIG
+ );
+ addErrorMessage(KERBEROS_PRINCIPAL_CONFIG, errorMessage);
+ addErrorMessage(KERBEROS_KEYTAB_PATH_CONFIG, errorMessage);
+ addErrorMessage(CONNECTION_USERNAME_CONFIG, errorMessage);
+ addErrorMessage(CONNECTION_PASSWORD_CONFIG, errorMessage);
+ }
+
+ // currently do not support Kerberos with proxy
+ if (config.isBasicProxyConfigured()) {
+ String errorMessage = String.format(
+ "Kerberos (%s, %s) is not supported with proxy settings (%s).",
+ KERBEROS_PRINCIPAL_CONFIG,
+ KERBEROS_KEYTAB_PATH_CONFIG,
+ PROXY_HOST_CONFIG
+ );
+ addErrorMessage(KERBEROS_PRINCIPAL_CONFIG, errorMessage);
+ addErrorMessage(KERBEROS_KEYTAB_PATH_CONFIG, errorMessage);
+ addErrorMessage(PROXY_HOST_CONFIG, errorMessage);
+ }
+ }
+
+ }
+
private void validateLingerMs() {
if (config.lingerMs() > config.flushTimeoutMs()) {
String errorMessage = String.format(
@@ -164,7 +240,7 @@ private void validateProxy() {
private void validateSsl() {
Map<String, Object> sslConfigs = config.originalsWithPrefix(SSL_CONFIG_PREFIX);
- if (!config.secured()) {
+ if (!config.isSslEnabled()) {
if (!sslConfigs.isEmpty()) {
String errorMessage = String.format(
"'%s' must be set to '%s' to use SSL configs.",
@@ -190,7 +266,108 @@ private void validateSsl() {
}
}
+ private void validateConnection(RestHighLevelClient client) {
+ boolean successful;
+ String exceptionMessage = "";
+ try {
+ successful = client.ping(RequestOptions.DEFAULT);
+ } catch (ElasticsearchStatusException e) {
+ switch (e.status()) {
+ case FORBIDDEN:
+ // ES is up, but user is not authorized to ping server
+ successful = true;
+ break;
+ default:
+ successful = false;
+ exceptionMessage = String.format("Error message: %s", e.getMessage());
+ }
+ } catch (Exception e) {
+ successful = false;
+ exceptionMessage = String.format("Error message: %s", e.getMessage());
+ }
+ if (!successful) {
+ String errorMessage = String.format(
+ "Could not connect to Elasticsearch. %s",
+ exceptionMessage
+ );
+ addErrorMessage(CONNECTION_URL_CONFIG, errorMessage);
+
+ if (config.isAuthenticatedConnection()) {
+ errorMessage = String.format(
+ "Could not authenticate the user. Check the '%s' and '%s'. %s",
+ CONNECTION_USERNAME_CONFIG,
+ CONNECTION_PASSWORD_CONFIG,
+ exceptionMessage
+ );
+ addErrorMessage(CONNECTION_USERNAME_CONFIG, errorMessage);
+ addErrorMessage(CONNECTION_PASSWORD_CONFIG, errorMessage);
+ }
+
+ if (config.isSslEnabled()) {
+ errorMessage = String.format(
+ "Could not connect to Elasticsearch. Check your SSL settings.%s",
+ exceptionMessage
+ );
+
+ addErrorMessage(SECURITY_PROTOCOL_CONFIG, errorMessage);
+ }
+
+ if (config.isKerberosEnabled()) {
+ errorMessage = String.format(
+ "Could not connect to Elasticsearch. Check your Kerberos settings. %s",
+ exceptionMessage
+ );
+
+ addErrorMessage(KERBEROS_PRINCIPAL_CONFIG, errorMessage);
+ addErrorMessage(KERBEROS_KEYTAB_PATH_CONFIG, errorMessage);
+ }
+
+ if (config.isBasicProxyConfigured()) {
+ errorMessage = String.format(
+ "Could not connect to Elasticsearch. Check your proxy settings. %s",
+ exceptionMessage
+ );
+ addErrorMessage(PROXY_HOST_CONFIG, errorMessage);
+ addErrorMessage(PROXY_PORT_CONFIG, errorMessage);
+
+ if (config.isProxyWithAuthenticationConfigured()) {
+ addErrorMessage(PROXY_USERNAME_CONFIG, errorMessage);
+ addErrorMessage(PROXY_PASSWORD_CONFIG, errorMessage);
+ }
+ }
+ }
+ }
+
private void addErrorMessage(String property, String error) {
values.get(property).addErrorMessage(error);
}
+
+ private RestHighLevelClient createClient() {
+ ConfigCallbackHandler configCallbackHandler = new ConfigCallbackHandler(config);
+ return new RestHighLevelClient(
+ RestClient
+ .builder(
+ config.connectionUrls()
+ .stream()
+ .map(HttpHost::create)
+ .collect(Collectors.toList())
+ .toArray(new HttpHost[config.connectionUrls().size()])
+ )
+ .setHttpClientConfigCallback(configCallbackHandler)
+ );
+ }
+
+ private boolean hasErrors() {
+ for (ConfigValue config : validations) {
+ if (!config.errorMessages().isEmpty()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ interface ClientFactory {
+ RestHighLevelClient client();
+ }
}
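
Reviewer note: a sketch of how the extended `validate()` behaves; the property values are assumptions for illustration (`connection.url` shown as the minimal required setting) and this class is not part of the diff.

```java
package io.confluent.connect.elasticsearch;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.Config;

public class ValidatorSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("connection.url", "http://localhost:9200"); // assumed minimal config
    props.put("linger.ms", "1000");
    props.put("flush.timeout.ms", "500"); // deliberately smaller than linger.ms

    // Because linger.ms exceeds flush.timeout.ms, the cheap checks fail first,
    // the new connection ping is skipped (hasErrors() short-circuit), and the
    // offending properties carry the error messages.
    Config result = new Validator(props).validate();
    result.configValues().forEach(v -> {
      if (!v.errorMessages().isEmpty()) {
        System.out.println(v.name() + " -> " + v.errorMessages());
      }
    });
  }
}
```
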
diff --git a/src/main/java/io/confluent/connect/elasticsearch/bulk/BulkClient.java b/src/main/java/io/confluent/connect/elasticsearch/bulk/BulkClient.java
deleted file mode 100644
index d21837678..000000000
--- a/src/main/java/io/confluent/connect/elasticsearch/bulk/BulkClient.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright 2018 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.connect.elasticsearch.bulk;
-
-import java.io.IOException;
-import java.util.List;
-
-public interface BulkClient<R, B> {
-
- B bulkRequest(List<R> batch);
-
- BulkResponse execute(B req) throws IOException;
-
-}
diff --git a/src/main/java/io/confluent/connect/elasticsearch/bulk/BulkProcessor.java b/src/main/java/io/confluent/connect/elasticsearch/bulk/BulkProcessor.java
deleted file mode 100644
index 34239f7c7..000000000
--- a/src/main/java/io/confluent/connect/elasticsearch/bulk/BulkProcessor.java
+++ /dev/null
@@ -1,667 +0,0 @@
-/*
- * Copyright 2018 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.connect.elasticsearch.bulk;
-
-import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
-import io.confluent.connect.elasticsearch.LogContext;
-import io.confluent.connect.elasticsearch.RetryUtil;
-
-import io.searchbox.core.BulkResult.BulkResultItem;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.sink.ErrantRecordReporter;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.List;
-import java.util.Locale;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * @param <R> record type
- * @param <B> bulk request type
- */
-public class BulkProcessor<R, B> {
-
- private static final Logger log = LoggerFactory.getLogger(BulkProcessor.class);
-
- private static final AtomicLong BATCH_ID_GEN = new AtomicLong();
-
- private final Time time;
- private final BulkClient<R, B> bulkClient;
- private final int maxBufferedRecords;
- private final int batchSize;
- private final long lingerMs;
- private final int maxRetries;
- private final long retryBackoffMs;
- private final BehaviorOnMalformedDoc behaviorOnMalformedDoc;
- private final ErrantRecordReporter reporter;
-
- private final Thread farmer;
- private final ExecutorService executor;
-
- // thread-safe state, can be mutated safely without synchronization,
- // but may be part of synchronized(this) wait() conditions so need to notifyAll() on changes
- private volatile boolean stopRequested = false;
- private volatile boolean flushRequested = false;
- private final AtomicReference<ConnectException> error = new AtomicReference<>();
-
- // shared state, synchronized on (this), may be part of wait() conditions so need notifyAll() on
- // changes
- private final Deque<R> unsentRecords;
- protected final ConcurrentMap<R, SinkRecord> recordsToReportOnError; // visible for tests
- private int inFlightRecords = 0;
- private final LogContext logContext = new LogContext();
-
- public BulkProcessor(
- Time time,
- BulkClient<R, B> bulkClient,
- int maxBufferedRecords,
- int maxInFlightRequests,
- int batchSize,
- long lingerMs,
- int maxRetries,
- long retryBackoffMs,
- BehaviorOnMalformedDoc behaviorOnMalformedDoc,
- ErrantRecordReporter reporter
- ) {
- this.time = time;
- this.bulkClient = bulkClient;
- this.maxBufferedRecords = maxBufferedRecords;
- this.batchSize = batchSize;
- this.lingerMs = lingerMs;
- this.maxRetries = maxRetries;
- this.retryBackoffMs = retryBackoffMs;
- this.behaviorOnMalformedDoc = behaviorOnMalformedDoc;
- this.reporter = reporter;
-
- unsentRecords = new ArrayDeque<>(maxBufferedRecords);
- recordsToReportOnError = reporter != null
- ? new ConcurrentHashMap<>(maxBufferedRecords)
- : null;
-
- final ThreadFactory threadFactory = makeThreadFactory();
- farmer = threadFactory.newThread(farmerTask());
- executor = Executors.newFixedThreadPool(maxInFlightRequests, threadFactory);
- }
-
- private ThreadFactory makeThreadFactory() {
- final AtomicInteger threadCounter = new AtomicInteger();
- final Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
- new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- log.error("Uncaught exception in BulkProcessor thread {}", t, e);
- failAndStop(e);
- }
- };
- return new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- final int threadId = threadCounter.getAndIncrement();
- final int objId = System.identityHashCode(this);
- final Thread t = new BulkProcessorThread(logContext, r, objId, threadId);
- t.setDaemon(true);
- t.setUncaughtExceptionHandler(uncaughtExceptionHandler);
- return t;
- }
- };
- }
-
- // visible for testing
- Runnable farmerTask() {
- return () -> {
- try (LogContext context = logContext.create("Farmer1")) {
- log.debug("Starting farmer task");
- try {
- List<Future<BulkResponse>> futures = new ArrayList<>();
- while (!stopRequested) {
- // submitBatchWhenReady waits for lingerMs so we won't spin here unnecessarily
- futures.add(submitBatchWhenReady());
-
- // after we submit, look at any previous futures that were completed and call get() on
- // them so that exceptions are propagated.
- List<Future<BulkResponse>> unfinishedFutures = new ArrayList<>();
- for (Future<BulkResponse> f : futures) {
- if (f.isDone()) {
- BulkResponse resp = f.get();
- log.debug("Bulk request completed with status {}", resp);
- } else {
- unfinishedFutures.add(f);
- }
- }
- log.debug("Processing next batch with {} outstanding batch requests in flight",
- unfinishedFutures.size());
- futures = unfinishedFutures;
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new ConnectException(e);
- }
- log.debug("Finished farmer task");
- }
- };
- }
-
- // Visible for testing
- synchronized Future<BulkResponse> submitBatchWhenReady() throws InterruptedException {
- for (long waitStartTimeMs = time.milliseconds(), elapsedMs = 0;
- !stopRequested && !canSubmit(elapsedMs);
- elapsedMs = time.milliseconds() - waitStartTimeMs) {
- // when linger time has already elapsed, we still have to ensure the other submission
- // conditions hence the wait(0) in that case
- wait(Math.max(0, lingerMs - elapsedMs));
- }
-
- // at this point, either stopRequested or canSubmit
- return stopRequested
- ? CompletableFuture.completedFuture(
- BulkResponse.failure(
- false,
- "request not submitted during shutdown",
- Collections.emptyMap()
- )
- )
- : submitBatch();
- }
-
- private synchronized Future<BulkResponse> submitBatch() {
- final int numUnsentRecords = unsentRecords.size();
- assert numUnsentRecords > 0;
- final int batchableSize = Math.min(batchSize, numUnsentRecords);
- final List<R> batch = new ArrayList<>(batchableSize);
- for (int i = 0; i < batchableSize; i++) {
- batch.add(unsentRecords.removeFirst());
- }
- inFlightRecords += batchableSize;
- log.debug(
- "Submitting batch of {} records; {} unsent and {} total in-flight records",
- batchableSize,
- numUnsentRecords,
- inFlightRecords
- );
- return executor.submit(new BulkTask(batch));
- }
-
- /**
- * Submission is possible when there are unsent records and:
- *
- * - flush is called, or
- * - the linger timeout passes, or
- * - there are sufficient records to fill a batch
- *
- */
- private synchronized boolean canSubmit(long elapsedMs) {
- return !unsentRecords.isEmpty()
- && (flushRequested || elapsedMs >= lingerMs || unsentRecords.size() >= batchSize);
- }
-
- /**
- * Start concurrently creating and sending batched requests using the client.
- */
- public void start() {
- farmer.start();
- }
-
- /**
- * Initiate shutdown.
- *
- * Pending buffered records are not automatically flushed, so call {@link #flush(long)} before
- * this method if this is desirable.
- */
- public void stop() {
- log.debug("stop");
- stopRequested = true; // this stops the farmer task
- synchronized (this) {
- // shutdown the pool under synchronization to avoid rejected submissions
- executor.shutdown();
- notifyAll();
- }
- }
-
- /**
- * Block upto {@code timeoutMs} till shutdown is complete.
- *
- * <p>This should only be called after a previous {@link #stop()} invocation.
- */
- public void awaitStop(long timeoutMs) {
- assert stopRequested;
- try {
- if (!executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
- throw new ConnectException("Timed-out waiting for executor termination");
- }
- } catch (InterruptedException e) {
- throw new ConnectException(e);
- } finally {
- executor.shutdownNow();
- }
- }
-
- /**
- * @return whether {@link #stop()} has been requested
- */
- public boolean isStopping() {
- return stopRequested;
- }
-
- /**
- * @return whether any task failed with an error
- */
- public boolean isFailed() {
- return error.get() != null;
- }
-
- /**
- * @return {@link #isTerminal()} or {@link #isFailed()}
- */
- public boolean isTerminal() {
- return isStopping() || isFailed();
- }
-
- /**
- * Throw a {@link ConnectException} if {@link #isStopping()}.
- */
- public void throwIfStopping() {
- if (stopRequested) {
- throw new ConnectException("Stopping");
- }
- }
-
- /**
- * Throw the relevant {@link ConnectException} if {@link #isFailed()}.
- */
- public void throwIfFailed() {
- if (isFailed()) {
- throw error.get();
- }
- }
-
- /**
- * {@link #throwIfFailed()} and {@link #throwIfStopping()}
- */
- public void throwIfTerminal() {
- throwIfFailed();
- throwIfStopping();
- }
-
- /**
- * Add a record, may block upto {@code timeoutMs} if at capacity with respect to
- * {@code maxBufferedRecords}.
- *
- *
- * <p>If any task has failed prior to or while blocked in the add, or if the timeout expires
- * while blocked, {@link ConnectException} will be thrown.
- */
- public synchronized void add(R record, SinkRecord original, long timeoutMs) {
- throwIfTerminal();
-
- int numBufferedRecords = bufferedRecords();
- if (numBufferedRecords >= maxBufferedRecords) {
- log.trace(
- "Buffer full at {} records, so waiting up to {} ms before adding",
- numBufferedRecords,
- timeoutMs
- );
- final long addStartTimeMs = time.milliseconds();
- for (long elapsedMs = time.milliseconds() - addStartTimeMs;
- !isTerminal() && elapsedMs < timeoutMs && bufferedRecords() >= maxBufferedRecords;
- elapsedMs = time.milliseconds() - addStartTimeMs) {
- try {
- wait(timeoutMs - elapsedMs);
- } catch (InterruptedException e) {
- throw new ConnectException(e);
- }
- }
- throwIfTerminal();
- if (bufferedRecords() >= maxBufferedRecords) {
- throw new ConnectException("Add timeout expired before buffer availability");
- }
- log.debug(
- "Adding record to queue after waiting {} ms",
- time.milliseconds() - addStartTimeMs
- );
- } else {
- log.trace("Adding record to queue");
- }
-
- unsentRecords.addLast(record);
- addRecordToReport(record, original);
- notifyAll();
- }
-
- /**
- * Request a flush and block upto {@code timeoutMs} until all pending records have been flushed.
- *
- * <p>If any task has failed prior to or during the flush, {@link ConnectException} will be
- * thrown with that error.
- */
- public void flush(long timeoutMs) {
- final long flushStartTimeMs = time.milliseconds();
- try {
- flushRequested = true;
- synchronized (this) {
- notifyAll();
- for (long elapsedMs = time.milliseconds() - flushStartTimeMs;
- !isTerminal() && elapsedMs < timeoutMs && bufferedRecords() > 0;
- elapsedMs = time.milliseconds() - flushStartTimeMs) {
- wait(timeoutMs - elapsedMs);
- }
- throwIfTerminal();
- if (bufferedRecords() > 0) {
- throw new ConnectException("Flush timeout expired with unflushed records: "
- + bufferedRecords());
- }
- }
- } catch (InterruptedException e) {
- throw new ConnectException(e);
- } finally {
- flushRequested = false;
- }
- log.debug("Flushed bulk processor (total time={} ms)", time.milliseconds() - flushStartTimeMs);
- }
-
- private void addRecordToReport(R record, SinkRecord original) {
- if (reporter != null) {
- // avoid unnecessary operations if not using the reporter
- recordsToReportOnError.put(record, original);
- }
- }
-
- private void removeReportedRecords(List<R> batch) {
- if (reporter != null) {
- // avoid unnecessary operations if not using the reporter
- recordsToReportOnError.keySet().removeAll(batch);
- }
- }
-
- private static final class BulkProcessorThread extends Thread {
-
- private final LogContext parentContext;
- private final int threadId;
-
- public BulkProcessorThread(
- LogContext parentContext,
- Runnable target,
- int objId,
- int threadId
- ) {
- super(target, String.format("BulkProcessor@%d-%d", objId, threadId));
- this.parentContext = parentContext;
- this.threadId = threadId;
- }
-
- @Override
- public void run() {
- try (LogContext context = parentContext.create("Thread" + threadId)) {
- super.run();
- }
- }
- }
-
- private final class BulkTask implements Callable<BulkResponse> {
-
- final long batchId = BATCH_ID_GEN.incrementAndGet();
-
- final List<R> batch;
-
- BulkTask(List<R> batch) {
- this.batch = batch;
- }
-
- @Override
- public BulkResponse call() throws Exception {
- final BulkResponse rsp;
- try {
- rsp = execute();
- } catch (Exception e) {
- failAndStop(e);
- throw e;
- }
- onBatchCompletion(batch.size());
- return rsp;
- }
-
- private BulkResponse execute() throws Exception {
- final long startTime = System.currentTimeMillis();
- final B bulkReq;
- try {
- bulkReq = bulkClient.bulkRequest(batch);
- } catch (Exception e) {
- log.error(
- "Failed to create bulk request from batch {} of {} records",
- batchId,
- batch.size(),
- e
- );
- removeReportedRecords(batch);
- throw e;
- }
- final int maxAttempts = maxRetries + 1;
- for (int attempts = 1, retryAttempts = 0; true; ++attempts, ++retryAttempts) {
- boolean retriable = true;
- try {
- log.trace("Executing batch {} of {} records with attempt {}/{}",
- batchId, batch.size(), attempts, maxAttempts);
- final BulkResponse bulkRsp = bulkClient.execute(bulkReq);
- if (bulkRsp.isSucceeded()) {
- if (log.isDebugEnabled()) {
- log.debug(
- "Completed batch {} of {} records with attempt {}/{} in {} ms",
- batchId,
- batch.size(),
- attempts,
- maxAttempts,
- System.currentTimeMillis() - startTime
- );
- }
- removeReportedRecords(batch);
- return bulkRsp;
- } else if (responseContainsMalformedDocError(bulkRsp)) {
- retriable = bulkRsp.isRetriable();
- handleMalformedDoc(bulkRsp);
- if (reporter != null) {
- for (R record : batch) {
- SinkRecord original = recordsToReportOnError.get(record);
- BulkResultItem result = bulkRsp.failedRecords.get(record);
- String error = result != null ? result.error : null;
- if (error != null && original != null) {
- reporter.report(
- original, new ReportingException("Bulk request failed: " + error)
- );
- }
- }
- }
- removeReportedRecords(batch);
- return bulkRsp;
- } else {
- // for all other errors, throw the error up
- retriable = bulkRsp.isRetriable();
- throw new ConnectException("Bulk request failed: " + bulkRsp.getErrorInfo());
- }
- } catch (Exception e) {
- if (retriable && attempts < maxAttempts) {
- long sleepTimeMs = RetryUtil.computeRandomRetryWaitTimeInMillis(retryAttempts,
- retryBackoffMs);
- log.warn("Failed to execute batch {} of {} records with attempt {}/{}, "
- + "will attempt retry after {} ms. Failure reason: {}",
- batchId, batch.size(), attempts, maxAttempts, sleepTimeMs, e.getMessage());
- time.sleep(sleepTimeMs);
- if (Thread.interrupted()) {
- log.error(
- "Retrying batch {} of {} records interrupted after attempt {}/{}",
- batchId, batch.size(), attempts, maxAttempts, e);
- removeReportedRecords(batch);
- throw e;
- }
- } else {
- log.error("Failed to execute batch {} of {} records after total of {} attempt(s)",
- batchId, batch.size(), attempts, e);
- removeReportedRecords(batch);
- throw e;
- }
- }
- }
- }
-
- private void handleMalformedDoc(BulkResponse bulkRsp) {
- // if the elasticsearch request failed because of a malformed document,
- // the behavior is configurable.
- switch (behaviorOnMalformedDoc) {
- case IGNORE:
- log.debug("Encountered an illegal document error when executing batch {} of {}"
- + " records. Ignoring and will not index record. Error was {}",
- batchId, batch.size(), bulkRsp.getErrorInfo());
- return;
- case WARN:
- log.warn("Encountered an illegal document error when executing batch {} of {}"
- + " records. Ignoring and will not index record. Error was {}",
- batchId, batch.size(), bulkRsp.getErrorInfo());
- return;
- case FAIL:
- log.error("Encountered an illegal document error when executing batch {} of {}"
- + " records. Error was {} (to ignore future records like this"
- + " change the configuration property '{}' from '{}' to '{}').",
- batchId, batch.size(), bulkRsp.getErrorInfo(),
- ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG,
- BehaviorOnMalformedDoc.FAIL,
- BehaviorOnMalformedDoc.IGNORE);
- throw new ConnectException("Bulk request failed: " + bulkRsp.getErrorInfo());
- default:
- throw new RuntimeException(String.format(
- "Unknown value for %s enum: %s",
- BehaviorOnMalformedDoc.class.getSimpleName(),
- behaviorOnMalformedDoc
- ));
- }
- }
- }
-
- private boolean responseContainsMalformedDocError(BulkResponse bulkRsp) {
- return bulkRsp.getErrorInfo().contains("mapper_parsing_exception")
- || bulkRsp.getErrorInfo().contains("illegal_argument_exception")
- || bulkRsp.getErrorInfo().contains("action_request_validation_exception");
- }
-
- private synchronized void onBatchCompletion(int batchSize) {
- inFlightRecords -= batchSize;
- assert inFlightRecords >= 0;
- notifyAll();
- }
-
- private void failAndStop(Throwable t) {
- error.compareAndSet(null, toConnectException(t));
- stop();
- }
-
- /**
- * @return sum of unsent and in-flight record counts
- */
- public synchronized int bufferedRecords() {
- return unsentRecords.size() + inFlightRecords;
- }
-
- private static ConnectException toConnectException(Throwable t) {
- if (t instanceof ConnectException) {
- return (ConnectException) t;
- } else {
- return new ConnectException(t);
- }
- }
-
- public enum BehaviorOnMalformedDoc {
- IGNORE,
- WARN,
- FAIL;
-
- public static final BehaviorOnMalformedDoc DEFAULT = FAIL;
-
- // Want values for "behavior.on.malformed.doc" property to be case-insensitive
- public static final ConfigDef.Validator VALIDATOR = new ConfigDef.Validator() {
- private final ConfigDef.ValidString validator = ConfigDef.ValidString.in(names());
-
- @Override
- public void ensureValid(String name, Object value) {
- if (value instanceof String) {
- value = ((String) value).toLowerCase(Locale.ROOT);
- }
- validator.ensureValid(name, value);
- }
-
- // Overridden here so that ConfigDef.toEnrichedRst shows possible values correctly
- @Override
- public String toString() {
- return validator.toString();
- }
-
- };
-
- public static String[] names() {
- BehaviorOnMalformedDoc[] behaviors = values();
- String[] result = new String[behaviors.length];
-
- for (int i = 0; i < behaviors.length; i++) {
- result[i] = behaviors[i].toString();
- }
-
- return result;
- }
-
- @Override
- public String toString() {
- return name().toLowerCase(Locale.ROOT);
- }
- }
-
- /**
- * Exception that hides the stack trace used for reporting errors from Elasticsearch
- * (mapper_parser_exception, illegal_argument_exception, and action_request_validation_exception)
- * resulting from bad records using the AK 2.6 reporter DLQ interface because the error did not
- * come from that line due to multithreading.
- */
- @SuppressWarnings("serial")
- public static class ReportingException extends RuntimeException {
-
- public ReportingException(String message) {
- super(message);
- }
-
- /**
- * This method is overriden to swallow the stack trace.
- *
- * @return Throwable
- */
- @Override
- public synchronized Throwable fillInStackTrace() {
- return this;
- }
- }
-}
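For reference, a minimal sketch of how a case-insensitive enum validator like BehaviorOnMalformedDoc.VALIDATOR above is typically registered on a Kafka ConfigDef. The key name, default, and documentation string below are illustrative, not taken from this codebase.

    import org.apache.kafka.common.config.ConfigDef;

    public class MalformedDocConfigSketch {
      // Sketch only: the validator lower-cases string values before checking them,
      // so "IGNORE", "Warn", and "fail" are all accepted.
      static ConfigDef baseConfigDef() {
        return new ConfigDef().define(
            "behavior.on.malformed.documents",          // hypothetical key name
            ConfigDef.Type.STRING,
            BehaviorOnMalformedDoc.DEFAULT.toString(),  // "fail"
            BehaviorOnMalformedDoc.VALIDATOR,
            ConfigDef.Importance.LOW,
            "How to handle records that Elasticsearch rejects as malformed."
        );
      }
    }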
diff --git a/src/main/java/io/confluent/connect/elasticsearch/bulk/BulkRequest.java b/src/main/java/io/confluent/connect/elasticsearch/bulk/BulkRequest.java
deleted file mode 100644
index 4bfc59765..000000000
--- a/src/main/java/io/confluent/connect/elasticsearch/bulk/BulkRequest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2018 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.connect.elasticsearch.bulk;
-
-
-import java.util.List;
-
-/**
- * BulkRequest is a marker interface for use with
- * {@link io.confluent.connect.elasticsearch.ElasticsearchClient#createBulkRequest(List)} and
- * {@link io.confluent.connect.elasticsearch.ElasticsearchClient#executeBulk(BulkRequest)}.
- * Implementations will typically hold state comprised of instances of classes that are
- * specific to the client library.
- */
-public interface BulkRequest {
-}
diff --git a/src/main/java/io/confluent/connect/elasticsearch/bulk/BulkResponse.java b/src/main/java/io/confluent/connect/elasticsearch/bulk/BulkResponse.java
deleted file mode 100644
index 6d5ca5902..000000000
--- a/src/main/java/io/confluent/connect/elasticsearch/bulk/BulkResponse.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright 2018 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.connect.elasticsearch.bulk;
-
-import io.confluent.connect.elasticsearch.IndexableRecord;
-import io.searchbox.core.BulkResult.BulkResultItem;
-import java.util.Collections;
-import java.util.Map;
-
-public class BulkResponse {
-
- private static final BulkResponse SUCCESS_RESPONSE =
- new BulkResponse(true, false, "", Collections.emptyMap());
-
- public final boolean succeeded;
- public final boolean retriable;
- public final String errorInfo;
- public final Map<IndexableRecord, BulkResultItem> failedRecords;
-
- /**
- * Creates a BulkResponse.
- * @param succeeded whether the bulk request was successful or not.
- * @param retriable whether the bulk request should be retried.
- * @param errorInfo the error string
- * @param failedRecords map of failed records and their results. Never null.
- */
- private BulkResponse(
- boolean succeeded,
- boolean retriable,
- String errorInfo,
- Map<IndexableRecord, BulkResultItem> failedRecords
- ) {
- this.succeeded = succeeded;
- this.retriable = retriable;
- this.errorInfo = errorInfo;
- this.failedRecords = failedRecords;
- }
-
- public static BulkResponse success() {
- return SUCCESS_RESPONSE;
- }
-
- /**
- * Creates a failed BulkResponse.
- * @param retriable whether the error is retriable
- * @param errorInfo the error string
- * @param failedRecords map of failed records and their results. Never null.
- * @return
- */
- public static BulkResponse failure(
- boolean retriable,
- String errorInfo,
- Map<IndexableRecord, BulkResultItem> failedRecords
- ) {
- return new BulkResponse(false, retriable, errorInfo, failedRecords);
- }
-
- public boolean isSucceeded() {
- return succeeded;
- }
-
- public boolean isRetriable() {
- return retriable;
- }
-
- public String getErrorInfo() {
- return errorInfo;
- }
-
- @Override
- public String toString() {
- return "BulkResponse{"
- + "succeeded=" + succeeded
- + ", retriable=" + retriable
- + ", errorInfo='" + (errorInfo == null ? "" : errorInfo) + '\''
- + '}';
- }
-}
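A caller is expected to branch on the success and retriable flags; the following is a hedged, simplified sketch of such handling, not the removed BulkProcessor's exact logic (the exception choices below are assumptions).

    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.errors.RetriableException;

    // Sketch only: branching on a BulkResponse returned by executeBulk().
    BulkResponse response = client.executeBulk(request);
    if (response.isSucceeded()) {
      // every record was written, or the only failures were ignorable version conflicts
    } else if (response.isRetriable()) {
      // transient failure: let the framework back off and resend the same batch
      throw new RetriableException("Retriable bulk failure: " + response.getErrorInfo());
    } else {
      // permanent failure, e.g. mapper_parse_exception: report the bad records or fail the task
      throw new ConnectException("Bulk request failed: " + response.getErrorInfo());
    }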
diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/JestBulkRequest.java b/src/main/java/io/confluent/connect/elasticsearch/jest/JestBulkRequest.java
deleted file mode 100644
index e8270f219..000000000
--- a/src/main/java/io/confluent/connect/elasticsearch/jest/JestBulkRequest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2018 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.connect.elasticsearch.jest;
-
-import io.confluent.connect.elasticsearch.IndexableRecord;
-import io.confluent.connect.elasticsearch.bulk.BulkRequest;
-import io.searchbox.core.Bulk;
-import java.util.List;
-
-public class JestBulkRequest implements BulkRequest {
-
- private final Bulk bulk;
- private final List<IndexableRecord> records;
-
- public JestBulkRequest(Bulk bulk, List<IndexableRecord> records) {
- this.bulk = bulk;
- this.records = records;
- }
-
- public Bulk getBulk() {
- return bulk;
- }
-
- public List<IndexableRecord> records() {
- return records;
- }
-}
diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java
deleted file mode 100644
index e6a2a79d4..000000000
--- a/src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java
+++ /dev/null
@@ -1,716 +0,0 @@
-/*
- * Copyright 2018 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.connect.elasticsearch.jest;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.gson.JsonObject;
-import io.confluent.connect.elasticsearch.ElasticsearchClient;
-import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
-import io.confluent.connect.elasticsearch.IndexableRecord;
-import io.confluent.connect.elasticsearch.Key;
-import io.confluent.connect.elasticsearch.Mapping;
-import io.confluent.connect.elasticsearch.RetryUtil;
-import io.confluent.connect.elasticsearch.bulk.BulkRequest;
-import io.confluent.connect.elasticsearch.bulk.BulkResponse;
-import io.confluent.connect.elasticsearch.jest.actions.PortableJestCreateIndexBuilder;
-import io.confluent.connect.elasticsearch.jest.actions.PortableJestGetMappingBuilder;
-import io.confluent.connect.elasticsearch.jest.actions.PortableJestPutMappingBuilder;
-import io.searchbox.action.Action;
-import io.searchbox.action.BulkableAction;
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestClientFactory;
-import io.searchbox.client.JestResult;
-import io.searchbox.client.config.HttpClientConfig;
-import io.searchbox.cluster.NodesInfo;
-import io.searchbox.core.Bulk;
-import io.searchbox.core.BulkResult;
-import io.searchbox.core.BulkResult.BulkResultItem;
-import io.searchbox.core.Delete;
-import io.searchbox.core.DocumentResult;
-import io.searchbox.core.Index;
-import io.searchbox.core.Search;
-import io.searchbox.core.SearchResult;
-import io.searchbox.core.Update;
-import io.searchbox.indices.CreateIndex;
-import io.searchbox.indices.DeleteIndex;
-import io.searchbox.indices.IndicesExists;
-import io.searchbox.indices.Refresh;
-import io.searchbox.indices.mapping.PutMapping;
-import java.util.HashMap;
-import java.util.Objects;
-import javax.net.ssl.HostnameVerifier;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
-import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.security.ssl.SslFactory;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import javax.net.ssl.SSLContext;
-
-public class JestElasticsearchClient implements ElasticsearchClient {
- private static final Logger log = LoggerFactory.getLogger(JestElasticsearchClient.class);
-
- // visible for testing
- protected static final String MAPPER_PARSE_EXCEPTION
- = "mapper_parse_exception";
- protected static final String VERSION_CONFLICT_ENGINE_EXCEPTION
- = "version_conflict_engine_exception";
- protected static final String ALL_FIELD_PARAM
- = "_all";
- protected static final String RESOURCE_ALREADY_EXISTS_EXCEPTION
- = "resource_already_exists_exception";
-
- private static final Logger LOG = LoggerFactory.getLogger(JestElasticsearchClient.class);
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
- private final JestClient client;
- private final Version version;
- private long timeout;
- private WriteMethod writeMethod = WriteMethod.DEFAULT;
-
- private final Set<String> indexCache = new HashSet<>();
-
- private int maxRetries;
- private long retryBackoffMs;
- private final Time time = new SystemTime();
- private int retryOnConflict;
-
- // visible for testing
- public JestElasticsearchClient(JestClient client) {
- try {
- this.client = client;
- this.version = getServerVersion();
- } catch (IOException e) {
- throw new ConnectException(
- "Couldn't start ElasticsearchSinkTask due to connection error:",
- e
- );
- }
- }
-
- // visible for testing
- public JestElasticsearchClient(String address) {
- try {
- JestClientFactory factory = new JestClientFactory();
- factory.setHttpClientConfig(new HttpClientConfig.Builder(address)
- .multiThreaded(true)
- .build()
- );
- this.client = factory.getObject();
- this.version = getServerVersion();
- } catch (IOException e) {
- throw new ConnectException(
- "Couldn't start ElasticsearchSinkTask due to connection error:",
- e
- );
- } catch (ConfigException e) {
- throw new ConnectException(
- "Couldn't start ElasticsearchSinkTask due to configuration error:",
- e
- );
- }
- }
-
- public JestElasticsearchClient(Map<String, String> props) {
- this(props, new JestClientFactory());
- }
-
- // visible for testing
- protected JestElasticsearchClient(Map<String, String> props, JestClientFactory factory) {
- try {
- ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
- factory.setHttpClientConfig(getClientConfig(config));
- this.client = factory.getObject();
- this.version = getServerVersion();
- this.writeMethod = config.writeMethod();
- this.retryBackoffMs = config.retryBackoffMs();
- this.maxRetries = config.maxRetries();
- this.timeout = config.readTimeoutMs();
- this.retryOnConflict = config.maxInFlightRequests();
- } catch (IOException e) {
- throw new ConnectException(
- "Couldn't start ElasticsearchSinkTask due to connection error:",
- e
- );
- } catch (ConfigException e) {
- throw new ConnectException(
- "Couldn't start ElasticsearchSinkTask due to configuration error:",
- e
- );
- }
- }
-
- // Visible for Testing
- public static HttpClientConfig getClientConfig(ElasticsearchSinkConnectorConfig config) {
-
- Set<String> addresses = config.connectionUrls();
- HttpClientConfig.Builder builder =
- new HttpClientConfig.Builder(addresses)
- .connTimeout(config.connectionTimeoutMs())
- .readTimeout(config.readTimeoutMs())
- .requestCompressionEnabled(config.compression())
- .defaultMaxTotalConnectionPerRoute(config.maxInFlightRequests())
- .maxConnectionIdleTime(config.maxIdleTimeMs(), TimeUnit.MILLISECONDS)
- .multiThreaded(true);
- if (config.isAuthenticatedConnection()) {
- builder.defaultCredentials(config.username(), config.password().value())
- .preemptiveAuthTargetHosts(
- addresses.stream().map(addr -> HttpHost.create(addr)).collect(Collectors.toSet())
- );
- }
-
- configureProxy(config, builder);
-
- if (config.secured()) {
- log.info("Using secured connection to {}", addresses);
- configureSslContext(builder, config);
- } else {
- log.info("Using unsecured connection to {}", addresses);
- }
- return builder.build();
- }
-
- private static void configureProxy(
- ElasticsearchSinkConnectorConfig config,
- HttpClientConfig.Builder builder
- ) {
-
- if (config.isBasicProxyConfigured()) {
- HttpHost proxy = new HttpHost(config.proxyHost(), config.proxyPort());
- builder.proxy(proxy);
-
- if (config.isProxyWithAuthenticationConfigured()) {
-
- CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- if (config.isAuthenticatedConnection()) {
- config.connectionUrls().forEach(
- addr ->
- credentialsProvider.setCredentials(
- new AuthScope(new HttpHost(addr)),
- new UsernamePasswordCredentials(config.username(), config.password().value())
- )
- );
- }
-
- credentialsProvider.setCredentials(
- new AuthScope(proxy),
- new UsernamePasswordCredentials(config.proxyUsername(), config.proxyPassword().value())
- );
-
- builder.credentialsProvider(credentialsProvider);
- }
- }
- }
-
- private static void configureSslContext(
- HttpClientConfig.Builder builder,
- ElasticsearchSinkConnectorConfig config
- ) {
- SslFactory kafkaSslFactory = new SslFactory(Mode.CLIENT, null, false);
- kafkaSslFactory.configure(config.sslConfigs());
-
- SSLContext sslContext;
- try {
- // try AK <= 2.2 first
- sslContext =
- (SSLContext) SslFactory.class.getDeclaredMethod("sslContext").invoke(kafkaSslFactory);
- log.debug("Using AK 2.2 SslFactory methods.");
- } catch (Exception e) {
- // must be running AK 2.3+
- log.debug("Could not find AK 2.2 SslFactory methods. Trying AK 2.3+ methods for SslFactory.");
-
- Object sslEngine;
- try {
- // try AK <= 2.6 second
- sslEngine = SslFactory.class.getDeclaredMethod("sslEngineBuilder").invoke(kafkaSslFactory);
- log.debug("Using AK 2.2-2.5 SslFactory methods.");
-
- } catch (Exception ex) {
- // must be running AK 2.6+
- log.debug(
- "Could not find Ak 2.3-2.5 methods for SslFactory."
- + " Trying AK 2.6+ methods for SslFactory."
- );
- try {
- sslEngine =
- SslFactory.class.getDeclaredMethod("sslEngineFactory").invoke(kafkaSslFactory);
- log.debug("Using AK 2.6+ SslFactory methods.");
- } catch (Exception exc) {
- throw new ConnectException("Failed to find methods for SslFactory.", exc);
- }
- }
-
- try {
- sslContext =
- (SSLContext) sslEngine.getClass().getDeclaredMethod("sslContext").invoke(sslEngine);
- } catch (Exception ex) {
- throw new ConnectException("Could not create SSLContext.", ex);
- }
- }
-
- HostnameVerifier hostnameVerifier = config.shouldDisableHostnameVerification()
- ? (hostname, session) -> true
- : SSLConnectionSocketFactory.getDefaultHostnameVerifier();
-
- // Sync calls
- SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(
- sslContext, hostnameVerifier);
- builder.sslSocketFactory(sslSocketFactory);
-
- // Async calls
- SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext, hostnameVerifier);
- builder.httpsIOSessionStrategy(sessionStrategy);
- }
-
- // visible for testing
- protected void setWriteMethod(WriteMethod writeMethod) {
- this.writeMethod = writeMethod;
- }
-
- /*
- * This method uses the NodesInfo request to get the server version, which is expected to work
- * with all versions of Elasticsearch.
- */
- private Version getServerVersion() throws IOException {
- // Default to newest version for forward compatibility
- Version defaultVersion = Version.ES_V6;
-
- NodesInfo info = new NodesInfo.Builder().addCleanApiParameter("version").build();
- JsonObject result = client.execute(info).getJsonObject();
- if (result == null) {
- LOG.warn("Couldn't get Elasticsearch version (result is null); assuming {}", defaultVersion);
- return defaultVersion;
- }
- if (!result.has("nodes")) {
- LOG.warn("Couldn't get Elasticsearch version from result {} (result has no nodes). "
- + "Assuming {}.", result, defaultVersion);
- return defaultVersion;
- }
-
- checkForError(result);
-
- JsonObject nodesRoot = result.get("nodes").getAsJsonObject();
- if (nodesRoot == null || nodesRoot.entrySet().size() == 0) {
- LOG.warn(
- "Couldn't get Elasticsearch version (response nodesRoot is null or empty); assuming {}",
- defaultVersion
- );
- return defaultVersion;
- }
-
- JsonObject nodeRoot = nodesRoot.entrySet().iterator().next().getValue().getAsJsonObject();
- if (nodeRoot == null) {
- LOG.warn(
- "Couldn't get Elasticsearch version (response nodeRoot is null); assuming {}",
- defaultVersion
- );
- return defaultVersion;
- }
-
- String esVersion = nodeRoot.get("version").getAsString();
- Version version;
- if (esVersion == null) {
- version = defaultVersion;
- LOG.warn(
- "Couldn't get Elasticsearch version (response version is null); assuming {}",
- version
- );
- } else if (esVersion.startsWith("1.")) {
- version = Version.ES_V1;
- log.info("Detected Elasticsearch version is {}", version);
- } else if (esVersion.startsWith("2.")) {
- version = Version.ES_V2;
- log.info("Detected Elasticsearch version is {}", version);
- } else if (esVersion.startsWith("5.")) {
- version = Version.ES_V5;
- log.info("Detected Elasticsearch version is {}", version);
- } else if (esVersion.startsWith("6.")) {
- version = Version.ES_V6;
- log.info("Detected Elasticsearch version is {}", version);
- } else if (esVersion.startsWith("7.")) {
- version = Version.ES_V7;
- log.info("Detected Elasticsearch version is {}", version);
- } else {
- version = defaultVersion;
- log.info("Detected unexpected Elasticsearch version {}, using {}", esVersion, version);
- }
- return version;
- }
-
- private void checkForError(JsonObject result) {
- if (result.has("error") && result.get("error").isJsonObject()) {
- final JsonObject errorObject = result.get("error").getAsJsonObject();
- String errorType = errorObject.has("type") ? errorObject.get("type").getAsString() : "";
- String errorReason = errorObject.has("reason") ? errorObject.get("reason").getAsString() : "";
- throw new ConnectException("Couldn't connect to Elasticsearch, error: "
- + errorType + ", reason: " + errorReason);
- }
- }
-
- public Version getVersion() {
- return version;
- }
-
- private boolean indexExists(String index) {
- if (indexCache.contains(index)) {
- return true;
- }
- Action<JestResult> action = new IndicesExists.Builder(index).build();
- try {
- log.info("Index '{}' not found in local cache; checking for existence", index);
- JestResult result = client.execute(action);
- log.debug("Received response for checking existence of index '{}'", index);
- boolean exists = result.isSucceeded();
- if (exists) {
- indexCache.add(index);
- log.info("Index '{}' exists in Elasticsearch; adding to local cache", index);
- } else {
- log.info("Index '{}' not found in Elasticsearch. Error message: {}",
- index, result.getErrorMessage());
- }
- return exists;
- } catch (IOException e) {
- throw new ConnectException(e);
- }
- }
-
- public void createIndices(Set<String> indices) {
- log.trace("Attempting to discover or create indexes in Elasticsearch: {}", indices);
- for (String index : indices) {
- if (!indexExists(index)) {
- final int maxAttempts = maxRetries + 1;
- int attempts = 1;
- CreateIndex createIndex =
- new PortableJestCreateIndexBuilder(index, version, timeout).build();
- boolean indexed = false;
- while (!indexed) {
- try {
- createIndex(index, createIndex);
- indexed = true;
- } catch (ConnectException e) {
- if (attempts < maxAttempts) {
- long sleepTimeMs = RetryUtil.computeRandomRetryWaitTimeInMillis(attempts - 1,
- retryBackoffMs);
- log.warn("Failed to create index {} with attempt {}/{}, "
- + "will attempt retry after {} ms. Failure reason: {}",
- index, attempts, maxAttempts, sleepTimeMs, e.getMessage());
- time.sleep(sleepTimeMs);
- } else {
- throw e;
- }
- attempts++;
- }
- }
- }
- }
- }
-
- private void createIndex(String index, CreateIndex createIndex) throws ConnectException {
- try {
- log.info("Requesting Elasticsearch create index '{}'", index);
- JestResult result = client.execute(createIndex);
- log.debug("Received response for request to create index '{}'", index);
- if (!result.isSucceeded()) {
- boolean exists = result.getErrorMessage().contains(RESOURCE_ALREADY_EXISTS_EXCEPTION)
- || indexExists(index);
-
- // Check if index was created by another client
- if (!exists) {
- String msg =
- result.getErrorMessage() != null ? ": " + result.getErrorMessage() : "";
- throw new ConnectException("Could not create index '" + index + "'" + msg);
- }
- log.info("Index '{}' exists in Elasticsearch; adding to local cache", index);
- } else {
- log.info("Index '{}' created in Elasticsearch; adding to local cache", index);
- }
- indexCache.add(index);
- } catch (IOException e) {
- throw new ConnectException(e);
- }
- }
-
- public void createMapping(String index, String type, Schema schema) throws IOException {
- ObjectNode obj = JsonNodeFactory.instance.objectNode();
- obj.set(type, Mapping.inferMapping(this, schema));
- PutMapping putMapping = new PortableJestPutMappingBuilder(index, type, obj.toString(), version)
- .build();
- log.info("Submitting put mapping (type={}) for index '{}' and schema {}", type, index, schema);
- JestResult result = client.execute(putMapping);
- if (!result.isSucceeded()) {
- throw new ConnectException(
- "Cannot create mapping " + obj + " -- " + result.getErrorMessage()
- );
- }
- log.info("Completed put mapping (type={}) for index '{}' and schema {}", type, index, schema);
- }
-
- /**
- * Get the JSON mapping for given index and type. Returns {@code null} if it does not exist.
- */
- public JsonObject getMapping(String index, String type) throws IOException {
- log.info("Get mapping (type={}) for index '{}'", type, index);
- final JestResult result = client.execute(
- new PortableJestGetMappingBuilder(version)
- .addIndex(index)
- .addType(type)
- .build()
- );
- final JsonObject indexRoot = result.getJsonObject().getAsJsonObject(index);
- if (indexRoot == null) {
- log.debug("Received null (root) mapping (type={}) for index '{}'", type, index);
- return null;
- }
- final JsonObject mappingsJson = indexRoot.getAsJsonObject("mappings");
- if (mappingsJson == null) {
- log.debug("Received null mapping (type={}) for index '{}'", type, index);
- return null;
- }
- log.debug("Received mapping (type={}) for index '{}'", type, index);
- return mappingsJson.getAsJsonObject(type);
- }
-
- /**
- * Delete all indexes in Elasticsearch (useful for test)
- */
- // For testing purposes
- public void deleteAll() throws IOException {
- log.info("Request deletion of all indexes");
- final JestResult result = client.execute(new DeleteIndex
- .Builder(ALL_FIELD_PARAM)
- .build());
- if (result.isSucceeded()) {
- log.info("Deletion of all indexes succeeded");
- } else {
- String msg = result.getErrorMessage() != null ? ": " + result.getErrorMessage() : "";
- log.warn("Could not delete all indexes: {}", msg);
- }
- }
-
- /**
- * Refresh all data in elasticsearch, making it available for search (useful for testing)
- */
- // For testing purposes
- public void refresh() throws IOException {
- log.info("Request refresh");
- final JestResult result = client.execute(
- new Refresh.Builder().build()
- );
- if (result.isSucceeded()) {
- log.info("Refresh completed");
- } else {
- String msg = result.getErrorMessage() != null ? ": " + result.getErrorMessage() : "";
- log.warn("Could not refresh: {}", msg);
- }
- }
-
- public BulkRequest createBulkRequest(List<IndexableRecord> batch) {
- final Bulk.Builder builder = new Bulk.Builder();
- for (IndexableRecord record : batch) {
- builder.addAction(toBulkableAction(record));
- }
- return new JestBulkRequest(builder.build(), batch);
- }
-
- // visible for testing
- protected BulkableAction<DocumentResult> toBulkableAction(IndexableRecord record) {
- // If payload is null, the record was a tombstone and we should delete from the index.
- if (record.payload == null) {
- return toDeleteRequest(record);
- }
- return writeMethod == WriteMethod.INSERT
- ? toIndexRequest(record)
- : toUpdateRequest(record);
- }
-
- private Delete toDeleteRequest(IndexableRecord record) {
- Delete.Builder req = new Delete.Builder(record.key.id)
- .index(record.key.index)
- .type(record.key.type);
-
- // TODO: Should version information be set here?
- return req.build();
- }
-
- private Index toIndexRequest(IndexableRecord record) {
- Index.Builder req = new Index.Builder(record.payload)
- .index(record.key.index)
- .type(record.key.type)
- .id(record.key.id);
- if (record.version != null) {
- req.setParameter("version_type", "external").setParameter("version", record.version);
- }
- return req.build();
- }
-
- private Update toUpdateRequest(IndexableRecord record) {
- String payload = "{\"doc\":" + record.payload
- + ", \"doc_as_upsert\":true}";
- return new Update.Builder(payload)
- .index(record.key.index)
- .type(record.key.type)
- .id(record.key.id)
- .setParameter("retry_on_conflict", retryOnConflict)
- .build();
- }
-
- public BulkResponse executeBulk(BulkRequest bulk) throws IOException {
- final BulkResult result = client.execute(((JestBulkRequest) bulk).getBulk());
-
- if (result.isSucceeded()) {
- return BulkResponse.success();
- }
- log.debug("Bulk request failed; collecting error(s)");
-
- boolean retriable = true;
-
- final List<Key> versionConflicts = new ArrayList<>();
- final List<String> errors = new ArrayList<>();
-
- for (BulkResult.BulkResultItem item : result.getItems()) {
- if (item.error != null) {
- final ObjectNode parsedError = (ObjectNode) OBJECT_MAPPER.readTree(item.error);
- final String errorType = parsedError.get("type").asText("");
- if ("version_conflict_engine_exception".equals(errorType)) {
- versionConflicts.add(new Key(item.index, item.type, item.id));
- } else if ("mapper_parse_exception".equals(errorType)) {
- retriable = false;
- errors.add(item.error);
- } else {
- errors.add(item.error);
- }
- }
- }
-
- if (!versionConflicts.isEmpty()) {
- LOG.warn("Ignoring version conflicts for items: {}", versionConflicts);
- if (errors.isEmpty()) {
- // The only errors were version conflicts
- return BulkResponse.success();
- }
- }
-
- final String errorInfo = errors.isEmpty() ? result.getErrorMessage() : errors.toString();
-
- Map<IndexableRecord, BulkResultItem> failedResponses = new HashMap<>();
- List<BulkResultItem> items = result.getItems();
- List<IndexableRecord> records = ((JestBulkRequest) bulk).records();
- for (int i = 0; i < items.size() && i < records.size() ; i++) {
- BulkResultItem item = items.get(i);
- IndexableRecord record = records.get(i);
- if (item.error != null && Objects.equals(item.id, record.key.id)) {
- // sanity check matching IDs
- failedResponses.put(record, item);
- }
- }
-
- if (items.size() != records.size()) {
- log.error(
- "Elasticsearch bulk response size ({}) does not correspond to records sent ({})",
- ((JestBulkRequest) bulk).records().size(),
- items.size()
- );
- }
-
- return BulkResponse.failure(retriable, errorInfo, failedResponses);
- }
-
- // For testing purposes
- public JsonObject search(String query, String index, String type) throws IOException {
- final Search.Builder search = new Search.Builder(query);
- if (index != null) {
- search.addIndex(index);
- }
- if (type != null) {
- search.addType(type);
- }
-
- log.info("Executing search on index '{}' (type={}): {}", index, type, query);
- final SearchResult result = client.execute(search.build());
- if (result.isSucceeded()) {
- log.info("Executing search succeeded: {}", result);
- } else {
- String msg = result.getErrorMessage() != null ? ": " + result.getErrorMessage() : "";
- log.warn("Failed to execute search: {}", msg);
- }
- return result.getJsonObject();
- }
-
- public void close() {
- try {
- log.debug("Closing Elasticsearch client");
- client.close();
- } catch (IOException e) {
- LOG.error("Exception while closing the JEST client", e);
- }
- }
-
- public enum WriteMethod {
- INSERT,
- UPSERT,
- ;
-
- public static final WriteMethod DEFAULT = INSERT;
- public static final ConfigDef.Validator VALIDATOR = new ConfigDef.Validator() {
- private final ConfigDef.ValidString validator = ConfigDef.ValidString.in(names());
-
- @Override
- public void ensureValid(String name, Object value) {
- validator.ensureValid(name, value);
- }
-
- // Overridden here so that ConfigDef.toEnrichedRst shows possible values correctly
- @Override
- public String toString() {
- return "One of " + INSERT.toString() + " or " + UPSERT.toString();
- }
-
- };
-
- public static String[] names() {
- return new String[] {INSERT.toString(), UPSERT.toString()};
- }
-
- @Override
- public String toString() {
- return name().toLowerCase(Locale.ROOT);
- }
- }
-}
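Taken together, the removed Jest-based classes above were used roughly as follows. This is a hedged sketch of the old write path, not code from the repository: the connection property and record values are illustrative, and any other required connector properties are assumed to be present.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: pre-refactor Jest write path (values are illustrative).
    Map<String, String> props = new HashMap<>();
    props.put("connection.url", "http://localhost:9200");
    // ...plus whatever other properties the connector config requires

    JestElasticsearchClient client = new JestElasticsearchClient(props);
    client.createIndices(Collections.singleton("orders"));    // cached after the first existence check

    IndexableRecord record = new IndexableRecord(
        new Key("orders", "_doc", "order-1"),                 // index / type / document id
        "{\"amount\": 42}",                                    // JSON payload; null means delete
        null                                                   // external version, if any
    );

    BulkRequest bulkRequest = client.createBulkRequest(Collections.singletonList(record));
    BulkResponse bulkResponse = client.executeBulk(bulkRequest);
    client.close();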
diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/actions/PortableJestCreateIndexBuilder.java b/src/main/java/io/confluent/connect/elasticsearch/jest/actions/PortableJestCreateIndexBuilder.java
deleted file mode 100644
index dc980a82a..000000000
--- a/src/main/java/io/confluent/connect/elasticsearch/jest/actions/PortableJestCreateIndexBuilder.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2018 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.connect.elasticsearch.jest.actions;
-
-import io.confluent.connect.elasticsearch.ElasticsearchClient.Version;
-import io.searchbox.indices.CreateIndex;
-
-/**
- * Portable Jest action builder, across ES versions, to create indexes.
- * This builder adds support for ES version 7 while keeping type support enabled; this is
- * done by passing the include_type_name parameter. That parameter will no longer be required
- * with ES 8, as types are expected to be gone by the time of the ES 8 release.
- */
-public class PortableJestCreateIndexBuilder extends CreateIndex.Builder {
-
- public static final String INCLUDE_TYPE_NAME_PARAM = "include_type_name";
- public static final String TIMEOUT_PARAM = "timeout";
-
- private final Version version;
- private final long timeout;
-
- public PortableJestCreateIndexBuilder(String index, Version version, long timeout) {
- super(index);
- this.version = version;
- this.timeout = timeout;
- }
-
- @Override
- public CreateIndex build() {
- if (version.equals(Version.ES_V7)) {
- setParameter(INCLUDE_TYPE_NAME_PARAM, true);
- }
- if (timeout > 0) {
- setParameter(TIMEOUT_PARAM, timeout + "ms");
- }
-
- return super.build();
- }
-}
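A short sketch of what this builder produces against an ES 7 cluster; the index name and timeout below are illustrative.

    import io.confluent.connect.elasticsearch.ElasticsearchClient.Version;
    import io.searchbox.indices.CreateIndex;

    // Sketch only: the resulting action carries ?include_type_name=true&timeout=30000ms,
    // keeping legacy type mappings usable on ES 7.
    CreateIndex createIndex =
        new PortableJestCreateIndexBuilder("orders", Version.ES_V7, 30_000L).build();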
diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/actions/PortableJestGetMappingBuilder.java b/src/main/java/io/confluent/connect/elasticsearch/jest/actions/PortableJestGetMappingBuilder.java
deleted file mode 100644
index b26fe833b..000000000
--- a/src/main/java/io/confluent/connect/elasticsearch/jest/actions/PortableJestGetMappingBuilder.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright 2018 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.connect.elasticsearch.jest.actions;
-
-import io.confluent.connect.elasticsearch.ElasticsearchClient.Version;
-import io.searchbox.indices.mapping.GetMapping;
-
-/**
- * Portable Jest action builder, across ES versions, to get an existing mapping.
- * This builder adds support for ES version 7 while keeping type support enabled; this is
- * done by passing the include_type_name parameter. That parameter will no longer be required
- * with ES 8, as types are expected to be gone by the time of the ES 8 release.
- */
-public class PortableJestGetMappingBuilder extends GetMapping.Builder {
-
- public static final String INCLUDE_TYPE_NAME_PARAM = "include_type_name";
-
- private final Version version;
-
- public PortableJestGetMappingBuilder(Version version) {
- this.version = version;
- }
-
- @Override
- public GetMapping build() {
- if (version.equals(Version.ES_V7)) {
- setParameter(INCLUDE_TYPE_NAME_PARAM, true);
- }
- return super.build();
- }
-}
diff --git a/src/main/java/io/confluent/connect/elasticsearch/jest/actions/PortableJestPutMappingBuilder.java b/src/main/java/io/confluent/connect/elasticsearch/jest/actions/PortableJestPutMappingBuilder.java
deleted file mode 100644
index 5e08ef337..000000000
--- a/src/main/java/io/confluent/connect/elasticsearch/jest/actions/PortableJestPutMappingBuilder.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2018 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.connect.elasticsearch.jest.actions;
-
-import io.confluent.connect.elasticsearch.ElasticsearchClient.Version;
-import io.searchbox.indices.mapping.PutMapping;
-
-/**
- * Portable Jest action builder to put a new mapping.
- * This builder adds support for ES version 7 while keeping type support enabled; this is
- * done by passing the include_type_name parameter. That parameter will no longer be required
- * with ES 8, as types are expected to be gone by the time of the ES 8 release.
- */
-public class PortableJestPutMappingBuilder extends PutMapping.Builder {
-
- public static final String INCLUDE_TYPE_NAME_PARAM = "include_type_name";
-
- private final Version version;
-
- public PortableJestPutMappingBuilder(String index, String type, Object source, Version version) {
- super(index, type, source);
- this.version = version;
- }
-
- @Override
- public PutMapping build() {
- if (version.equals(Version.ES_V7)) {
- setParameter(INCLUDE_TYPE_NAME_PARAM, true);
- }
- return super.build();
- }
-}
diff --git a/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java b/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java
index 469eaa023..6d46e6555 100644
--- a/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java
+++ b/src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java
@@ -15,9 +15,12 @@
package io.confluent.connect.elasticsearch;
+import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;
import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
import org.junit.Before;
import org.junit.Test;
@@ -29,8 +32,8 @@
import java.util.List;
import java.util.Map;
-import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;
-
+import static io.confluent.connect.elasticsearch.DataConverter.MAP_KEY;
+import static io.confluent.connect.elasticsearch.DataConverter.MAP_VALUE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
@@ -38,23 +41,27 @@
public class DataConverterTest {
private DataConverter converter;
+ private Map<String, String> props;
+
private String key;
private String topic;
private int partition;
private long offset;
private String index;
- private String type;
private Schema schema;
@Before
public void setUp() {
- converter = new DataConverter(true, BehaviorOnNullValues.DEFAULT);
+ props = ElasticsearchSinkConnectorConfigTest.addNecessaryProps(new HashMap<>());
+ props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "true");
+ props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
+
+ converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
key = "key";
topic = "topic";
partition = 0;
offset = 0;
index = "index";
- type = "type";
schema = SchemaBuilder
.struct()
.name("struct")
@@ -150,8 +157,8 @@ public void map() {
assertEquals(
SchemaBuilder.array(
SchemaBuilder.struct().name(Schema.INT32_SCHEMA.type().name() + "-" + Decimal.LOGICAL_NAME)
- .field(ElasticsearchSinkConnectorConstants.MAP_KEY, Schema.INT32_SCHEMA)
- .field(ElasticsearchSinkConnectorConstants.MAP_VALUE, Schema.FLOAT64_SCHEMA)
+ .field(MAP_KEY, Schema.INT32_SCHEMA)
+ .field(MAP_VALUE, Schema.FLOAT64_SCHEMA)
.build()
).build(),
preProcessedSchema
@@ -163,11 +170,11 @@ public void map() {
assertEquals(
new HashSet<>(Arrays.asList(
new Struct(preProcessedSchema.valueSchema())
- .put(ElasticsearchSinkConnectorConstants.MAP_KEY, 1)
- .put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 0.02),
+ .put(MAP_KEY, 1)
+ .put(MAP_VALUE, 0.02),
new Struct(preProcessedSchema.valueSchema())
- .put(ElasticsearchSinkConnectorConstants.MAP_KEY, 2)
- .put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 0.42)
+ .put(MAP_KEY, 2)
+ .put(MAP_VALUE, 0.42)
)),
new HashSet<>((List<Struct>) converter.preProcessValue(origValue, origSchema, preProcessedSchema))
);
@@ -194,14 +201,15 @@ public void stringKeyedMapNonCompactFormat() {
origValue.put("field2", 2);
// Use the older non-compact format for map entries with string keys
- converter = new DataConverter(false, BehaviorOnNullValues.DEFAULT);
+ props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "false");
+ converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
Schema preProcessedSchema = converter.preProcessSchema(origSchema);
assertEquals(
SchemaBuilder.array(
SchemaBuilder.struct().name(Schema.STRING_SCHEMA.type().name() + "-" + Schema.INT32_SCHEMA.type().name())
- .field(ElasticsearchSinkConnectorConstants.MAP_KEY, Schema.STRING_SCHEMA)
- .field(ElasticsearchSinkConnectorConstants.MAP_VALUE, Schema.INT32_SCHEMA)
+ .field(MAP_KEY, Schema.STRING_SCHEMA)
+ .field(MAP_VALUE, Schema.INT32_SCHEMA)
.build()
).build(),
preProcessedSchema
@@ -209,11 +217,11 @@ public void stringKeyedMapNonCompactFormat() {
assertEquals(
new HashSet<>(Arrays.asList(
new Struct(preProcessedSchema.valueSchema())
- .put(ElasticsearchSinkConnectorConstants.MAP_KEY, "field1")
- .put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 1),
+ .put(MAP_KEY, "field1")
+ .put(MAP_VALUE, 1),
new Struct(preProcessedSchema.valueSchema())
- .put(ElasticsearchSinkConnectorConstants.MAP_KEY, "field2")
- .put(ElasticsearchSinkConnectorConstants.MAP_VALUE, 2)
+ .put(MAP_KEY, "field2")
+ .put(MAP_VALUE, 2)
)),
new HashSet<>((List<Struct>) converter.preProcessValue(origValue, origSchema, preProcessedSchema))
);
@@ -228,7 +236,8 @@ public void stringKeyedMapCompactFormat() {
origValue.put("field2", 2);
// Use the newer compact format for map entries with string keys
- converter = new DataConverter(true, BehaviorOnNullValues.DEFAULT);
+ props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
+ converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
Schema preProcessedSchema = converter.preProcessSchema(origSchema);
assertEquals(
SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(),
@@ -272,7 +281,8 @@ public void optionalFieldsWithoutDefaults() {
testOptionalFieldWithoutDefault(SchemaBuilder.struct().field("innerField", Schema.BOOLEAN_SCHEMA));
testOptionalFieldWithoutDefault(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BOOLEAN_SCHEMA));
// Have to test maps with useCompactMapEntries set to true and set to false
- converter = new DataConverter(false, BehaviorOnNullValues.DEFAULT);
+ props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "false");
+ converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
testOptionalFieldWithoutDefault(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BOOLEAN_SCHEMA));
}
@@ -293,39 +303,104 @@ private void testOptionalFieldWithoutDefault(
@Test
public void ignoreOnNullValue() {
- converter = new DataConverter(true, BehaviorOnNullValues.IGNORE);
+
+ props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
+ props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
+ props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false");
+ props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.IGNORE.name());
+ converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
SinkRecord sinkRecord = createSinkRecordWithValue(null);
- assertNull(converter.convertRecord(sinkRecord, index, type, false, false));
+ assertNull(converter.convertRecord(sinkRecord, index));
}
@Test
public void deleteOnNullValue() {
- converter = new DataConverter(true, BehaviorOnNullValues.DELETE);
+ props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
+ props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
+ props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false");
+ props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.DELETE.name());
+ converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
+ SinkRecord sinkRecord = createSinkRecordWithValue(null);
+ DeleteRequest actualRecord = (DeleteRequest) converter.convertRecord(sinkRecord, index);
+
+ assertEquals(key, actualRecord.id());
+ assertEquals(index, actualRecord.index());
+ assertEquals(sinkRecord.kafkaOffset(), actualRecord.version());
+ }
+
+ @Test
+ public void externalVersionHeaderOnDelete() {
+ String externalVersionHeader = "version";
+ long expectedExternalVersion = 123l;
+
+ props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
+ props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
+ props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false");
+ props.put(ElasticsearchSinkConnectorConfig.EXTERNAL_VERSION_HEADER_CONFIG, externalVersionHeader);
+ props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.DELETE.name());
+ converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
SinkRecord sinkRecord = createSinkRecordWithValue(null);
- IndexableRecord expectedRecord = createIndexableRecordWithPayload(null);
- IndexableRecord actualRecord = converter.convertRecord(sinkRecord, index, type, false, false);
+ sinkRecord.headers().addLong(externalVersionHeader, expectedExternalVersion);
+
+ DeleteRequest actualRecord = (DeleteRequest) converter.convertRecord(sinkRecord, index);
- assertEquals(expectedRecord, actualRecord);
+ assertEquals(key, actualRecord.id());
+ assertEquals(index, actualRecord.index());
+ assertEquals(expectedExternalVersion, actualRecord.version());
+ }
+
+ @Test
+ public void externalVersionHeaderOnIndex() {
+ String externalVersionHeader = "version";
+ long expectedExternalVersion = 123l;
+
+ props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
+ props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
+ props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false");
+ props.put(ElasticsearchSinkConnectorConfig.EXTERNAL_VERSION_HEADER_CONFIG, externalVersionHeader);
+ props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.DELETE.name());
+ converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
+
+
+ Schema preProcessedSchema = converter.preProcessSchema(schema);
+ Struct struct = new Struct(preProcessedSchema).put("string", "myValue");
+ SinkRecord sinkRecord = createSinkRecordWithValue(struct);
+ sinkRecord.headers().addLong(externalVersionHeader, expectedExternalVersion);
+
+ IndexRequest actualRecord = (IndexRequest) converter.convertRecord(sinkRecord, index);
+
+ assertEquals(key, actualRecord.id());
+ assertEquals(index, actualRecord.index());
+ assertEquals(expectedExternalVersion, actualRecord.version());
}
@Test
public void ignoreDeleteOnNullValueWithNullKey() {
- converter = new DataConverter(true, BehaviorOnNullValues.DELETE);
+ props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
+ props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
+ props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false");
+ props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.DELETE.name());
+ converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
+
key = null;
SinkRecord sinkRecord = createSinkRecordWithValue(null);
- assertNull(converter.convertRecord(sinkRecord, index, type, false, false));
+ assertNull(converter.convertRecord(sinkRecord, index));
}
@Test
public void failOnNullValue() {
- converter = new DataConverter(true, BehaviorOnNullValues.FAIL);
+ props.put(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG, "true");
+ props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "false");
+ props.put(ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG, "false");
+ props.put(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.FAIL.name());
+ converter = new DataConverter(new ElasticsearchSinkConnectorConfig(props));
SinkRecord sinkRecord = createSinkRecordWithValue(null);
try {
- converter.convertRecord(sinkRecord, index, type, false, false);
+ converter.convertRecord(sinkRecord, index);
fail("should fail on null-valued record with behaviorOnNullValues = FAIL");
} catch (DataException e) {
// expected
@@ -335,9 +410,4 @@ public void failOnNullValue() {
public SinkRecord createSinkRecordWithValue(Object value) {
return new SinkRecord(topic, partition, Schema.STRING_SCHEMA, key, schema, value, offset);
}
-
- public IndexableRecord createIndexableRecordWithPayload(String payload) {
- return new IndexableRecord(new Key(index, type, key), payload, offset);
- }
-
}
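The new assertions above verify that the converter now produces Elasticsearch high-level-client requests whose version comes from the configured header, or from the Kafka offset for deletes. Below is a hedged sketch of equivalent requests built directly with the ES client API; the use of VersionType.EXTERNAL and the literal values are illustrative assumptions, not taken from the converter.

    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.common.xcontent.XContentType;
    import org.elasticsearch.index.VersionType;

    // Sketch only: requests shaped like the ones the tests above assert on.
    IndexRequest indexRequest = new IndexRequest("index")
        .id("key")
        .source("{\"string\":\"myValue\"}", XContentType.JSON)
        .versionType(VersionType.EXTERNAL)    // assumed; the tests only assert version()
        .version(123L);                        // value of the configured version header

    DeleteRequest deleteRequest = new DeleteRequest("index")
        .id("key")
        .versionType(VersionType.EXTERNAL)    // assumed
        .version(3L);                          // falls back to the record's Kafka offset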
diff --git a/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchClientTest.java b/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchClientTest.java
new file mode 100644
index 000000000..9dabb00e6
--- /dev/null
+++ b/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchClientTest.java
@@ -0,0 +1,713 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.elasticsearch;
+
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnMalformedDoc;
+import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;
+import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SecurityProtocol;
+import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod;
+import io.confluent.connect.elasticsearch.helper.ElasticsearchContainer;
+import io.confluent.connect.elasticsearch.helper.ElasticsearchHelperClient;
+import io.confluent.connect.elasticsearch.helper.NetworkErrorContainer;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.test.TestUtils;
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.index.VersionType;
+import org.elasticsearch.search.SearchHit;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ElasticsearchClientTest {
+
+ private static final String INDEX = "index";
+ private static final String ELASTIC_SUPERUSER_NAME = "elastic";
+ private static final String ELASTIC_SUPERUSER_PASSWORD = "elastic";
+
+ private static ElasticsearchContainer container;
+
+ private DataConverter converter;
+ private ElasticsearchHelperClient helperClient;
+ private ElasticsearchSinkConnectorConfig config;
+ private Map<String, String> props;
+
+ @BeforeClass
+ public static void setupBeforeAll() {
+ container = ElasticsearchContainer.fromSystemProperties();
+ container.start();
+ }
+
+ @AfterClass
+ public static void cleanupAfterAll() {
+ container.close();
+ }
+
+ @Before
+ public void setup() {
+ props = ElasticsearchSinkConnectorConfigTest.addNecessaryProps(new HashMap<>());
+ props.put(CONNECTION_URL_CONFIG, container.getConnectionUrl());
+ props.put(IGNORE_KEY_CONFIG, "true");
+ props.put(LINGER_MS_CONFIG, "1000");
+ config = new ElasticsearchSinkConnectorConfig(props);
+ converter = new DataConverter(config);
+ helperClient = new ElasticsearchHelperClient(container.getConnectionUrl(), config);
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ if (helperClient != null && helperClient.indexExists(INDEX)){
+ helperClient.deleteIndex(INDEX);
+ }
+ }
+
+ @Test
+ public void testClose() {
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ client.close();
+ }
+
+ @Test
+ public void testCloseFails() throws Exception {
+ props.put(BATCH_SIZE_CONFIG, "1");
+ props.put(MAX_IN_FLIGHT_REQUESTS_CONFIG, "1");
+ ElasticsearchClient client = new ElasticsearchClient(config, null) {
+ @Override
+ public void close() {
+ try {
+ if (!bulkProcessor.awaitClose(1, TimeUnit.MILLISECONDS)) {
+ throw new ConnectException("Failed to process all outstanding requests in time.");
+ }
+ } catch (InterruptedException e) {}
+ }
+ };
+
+ writeRecord(sinkRecord(0), client);
+ assertThrows(
+ "Failed to process all outstanding requests in time.",
+ ConnectException.class,
+ () -> client.close()
+ );
+ waitUntilRecordsInES(1);
+ }
+
+ @Test
+ public void testCreateIndex() throws IOException {
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ assertFalse(helperClient.indexExists(INDEX));
+
+ client.createIndex(INDEX);
+ assertTrue(helperClient.indexExists(INDEX));
+ client.close();
+ }
+
+ @Test
+ public void testDoesNotCreateAlreadyExistingIndex() throws IOException {
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ assertFalse(helperClient.indexExists(INDEX));
+
+ assertTrue(client.createIndex(INDEX));
+ assertTrue(helperClient.indexExists(INDEX));
+
+ assertFalse(client.createIndex(INDEX));
+ assertTrue(helperClient.indexExists(INDEX));
+ client.close();
+ }
+
+ @Test
+ public void testIndexExists() throws IOException {
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ assertFalse(helperClient.indexExists(INDEX));
+
+ assertTrue(client.createIndex(INDEX));
+ assertTrue(client.indexExists(INDEX));
+ client.close();
+ }
+
+ @Test
+ public void testIndexDoesNotExist() throws IOException {
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ assertFalse(helperClient.indexExists(INDEX));
+
+ assertFalse(client.indexExists(INDEX));
+ client.close();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCreateMapping() throws IOException {
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ client.createIndex(INDEX);
+
+ client.createMapping(INDEX, schema());
+
+ assertTrue(client.hasMapping(INDEX));
+
+ Map<String, Object> mapping = helperClient.getMapping(INDEX).sourceAsMap();
+ assertTrue(mapping.containsKey("properties"));
+ Map<String, Object> props = (Map<String, Object>) mapping.get("properties");
+ assertTrue(props.containsKey("offset"));
+ assertTrue(props.containsKey("another"));
+ Map<String, Object> offset = (Map<String, Object>) props.get("offset");
+ assertEquals("integer", offset.get("type"));
+ assertEquals(0, offset.get("null_value"));
+ Map<String, Object> another = (Map<String, Object>) props.get("another");
+ assertEquals("integer", another.get("type"));
+ assertEquals(0, another.get("null_value"));
+ client.close();
+ }
+
+ @Test
+ public void testHasMapping() {
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ client.createIndex(INDEX);
+
+ client.createMapping(INDEX, schema());
+
+ assertTrue(client.hasMapping(INDEX));
+ client.close();
+ }
+
+ @Test
+ public void testDoesNotHaveMapping() {
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ client.createIndex(INDEX);
+
+ assertFalse(client.hasMapping(INDEX));
+ client.close();
+ }
+
+ @Test
+ public void testBuffersCorrectly() throws Exception {
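+ // with max buffered records set to 1, writing a second unflushed record blocks until the previous one completes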
+ props.put(MAX_IN_FLIGHT_REQUESTS_CONFIG, "1");
+ props.put(MAX_BUFFERED_RECORDS_CONFIG, "1");
+ config = new ElasticsearchSinkConnectorConfig(props);
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ client.createIndex(INDEX);
+
+ writeRecord(sinkRecord(0), client);
+ assertEquals(1, client.numRecords.get());
+ client.flush();
+
+ waitUntilRecordsInES(1);
+ assertEquals(1, helperClient.getDocCount(INDEX));
+ assertEquals(0, client.numRecords.get());
+
+ writeRecord(sinkRecord(1), client);
+ assertEquals(1, client.numRecords.get());
+
+ // will block until the previous record is flushed
+ writeRecord(sinkRecord(2), client);
+ assertEquals(1, client.numRecords.get());
+
+ waitUntilRecordsInES(3);
+ client.close();
+ }
+
+ @Test
+ public void testFlush() throws Exception {
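+ // linger is set to one day so nothing is written until flush() is called explicitly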
+ props.put(LINGER_MS_CONFIG, String.valueOf(TimeUnit.DAYS.toMillis(1)));
+ config = new ElasticsearchSinkConnectorConfig(props);
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ client.createIndex(INDEX);
+
+ writeRecord(sinkRecord(0), client);
+ assertEquals(0, helperClient.getDocCount(INDEX)); // should be empty before flush
+
+ client.flush();
+
+ waitUntilRecordsInES(1);
+ assertEquals(1, helperClient.getDocCount(INDEX));
+ client.close();
+ }
+
+ @Test
+ public void testIndexRecord() throws Exception {
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ client.createIndex(INDEX);
+
+ writeRecord(sinkRecord(0), client);
+ client.flush();
+
+ waitUntilRecordsInES(1);
+ assertEquals(1, helperClient.getDocCount(INDEX));
+ client.close();
+ }
+
+ @Test
+ public void testDeleteRecord() throws Exception {
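+ // with BEHAVIOR_ON_NULL_VALUES set to DELETE, a record with a null value deletes the matching document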
+ props.put(BEHAVIOR_ON_NULL_VALUES_CONFIG, BehaviorOnNullValues.DELETE.name());
+ props.put(IGNORE_KEY_CONFIG, "false");
+ config = new ElasticsearchSinkConnectorConfig(props);
+ converter = new DataConverter(config);
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ client.createIndex(INDEX);
+
+ writeRecord(sinkRecord("key0", 0), client);
+ writeRecord(sinkRecord("key1", 1), client);
+ client.flush();
+
+ waitUntilRecordsInES(2);
+
+ // delete the record with key0 by writing a null value
+ SinkRecord deleteRecord = new SinkRecord(INDEX, 0, Schema.STRING_SCHEMA, "key0", null, null, 3);
+ writeRecord(deleteRecord, client);
+
+ waitUntilRecordsInES(1);
+ client.close();
+ }
+
+ @Test
+ public void testUpsertRecords() throws Exception {
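+ // with WRITE_METHOD set to UPSERT, later records for the same key update the existing document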
+ props.put(WRITE_METHOD_CONFIG, WriteMethod.UPSERT.name());
+ props.put(IGNORE_KEY_CONFIG, "false");
+ config = new ElasticsearchSinkConnectorConfig(props);
+ converter = new DataConverter(config);
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ client.createIndex(INDEX);
+
+ writeRecord(sinkRecord("key0", 0), client);
+ writeRecord(sinkRecord("key1", 1), client);
+ client.flush();
+
+ waitUntilRecordsInES(2);
+
+ // create modified record for upsert
+ Schema schema = SchemaBuilder
+ .struct()
+ .name("record")
+ .field("offset", SchemaBuilder.int32().defaultValue(0).build())
+ .field("another", SchemaBuilder.int32().defaultValue(0).build())
+ .build();
+
+ Struct value = new Struct(schema).put("offset", 2);
+ SinkRecord upsertRecord = new SinkRecord(INDEX, 0, Schema.STRING_SCHEMA, "key0", schema, value, 2);
+ Struct value2 = new Struct(schema).put("offset", 3);
+ SinkRecord upsertRecord2 = new SinkRecord(INDEX, 0, Schema.STRING_SCHEMA, "key0", schema, value2, 3);
+
+ // upsert the two modified records for key0, then write a new record for key2
+ writeRecord(upsertRecord, client);
+ writeRecord(upsertRecord2, client);
+ writeRecord(sinkRecord("key2", 4), client);
+ client.flush();
+
+ waitUntilRecordsInES(3);
+ for (SearchHit hit : helperClient.search(INDEX)) {
+ if (hit.getId().equals("key0")) {
+ assertEquals(3, hit.getSourceAsMap().get("offset"));
+ assertEquals(0, hit.getSourceAsMap().get("another"));
+ }
+ }
+
+ client.close();
+ }
+
+ @Test
+ public void testIgnoreBadRecord() throws Exception {
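+ // with BEHAVIOR_ON_MALFORMED_DOCS set to IGNORE, a record that violates the mapping is dropped silently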
+ props.put(BEHAVIOR_ON_MALFORMED_DOCS_CONFIG, BehaviorOnMalformedDoc.IGNORE.name());
+ config = new ElasticsearchSinkConnectorConfig(props);
+ converter = new DataConverter(config);
+
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ client.createIndex(INDEX);
+ client.createMapping(INDEX, schema());
+
+ Schema schema = SchemaBuilder
+ .struct()
+ .name("record")
+ .field("not_mapped_field", SchemaBuilder.int32().defaultValue(0).build())
+ .build();
+ Struct value = new Struct(schema).put("not_mapped_field", 420);
+ SinkRecord badRecord = new SinkRecord(INDEX, 0, Schema.STRING_SCHEMA, "key", schema, value, 0);
+
+ writeRecord(sinkRecord(0), client);
+ client.flush();
+
+ writeRecord(badRecord, client);
+ client.flush();
+
+ writeRecord(sinkRecord(1), client);
+ client.flush();
+
+ waitUntilRecordsInES(2);
+ assertEquals(2, helperClient.getDocCount(INDEX));
+ client.close();
+ }
+
+ @Test(expected = ConnectException.class)
+ public void testFailOnBadRecord() throws Exception {
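+ // with the default malformed-doc behavior, a mapping conflict should eventually surface as a ConnectException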
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ client.createIndex(INDEX);
+ client.createMapping(INDEX, schema());
+
+ Schema schema = SchemaBuilder
+ .struct()
+ .name("record")
+ .field("offset", SchemaBuilder.bool().defaultValue(false).build())
+ .build();
+ Struct value = new Struct(schema).put("offset", false);
+ SinkRecord badRecord = new SinkRecord(INDEX, 0, Schema.STRING_SCHEMA, "key", schema, value, 0);
+
+ writeRecord(sinkRecord(0), client);
+ client.flush();
+
+ waitUntilRecordsInES(1);
+ writeRecord(badRecord, client);
+ client.flush();
+
+ // consecutive index calls should cause exception
+ try {
+ for (int i = 0; i < 5; i++) {
+ writeRecord(sinkRecord(i + 1), client);
+ client.flush();
+ waitUntilRecordsInES(i + 2);
+ }
+ } catch (ConnectException e) {
+ client.close();
+ throw e;
+ }
+ }
+
+ @Test
+ public void testRetryRecordsOnFailure() throws Exception {
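+ // generous retry settings so the record survives a temporary ES outage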
+ props.put(LINGER_MS_CONFIG, "60000");
+ props.put(BATCH_SIZE_CONFIG, "2");
+ props.put(MAX_RETRIES_CONFIG, "100");
+ props.put(RETRY_BACKOFF_MS_CONFIG, "1000");
+ props.put(MAX_IN_FLIGHT_REQUESTS_CONFIG, "1");
+ config = new ElasticsearchSinkConnectorConfig(props);
+ converter = new DataConverter(config);
+
+ // failures are induced below by temporarily taking down the ES service
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ client.createIndex(INDEX);
+
+ // bring down ES service
+ NetworkErrorContainer delay = new NetworkErrorContainer(container.getContainerName());
+ delay.start();
+
+ // attempt a write
+ writeRecord(sinkRecord(0), client);
+ client.flush();
+
+ // keep the ES service down long enough for several read timeouts to elapse
+ Thread.sleep(config.readTimeoutMs() * 4L);
+
+ // bring up ES service
+ delay.stop();
+
+ waitUntilRecordsInES(1);
+ }
+
+ @Test
+ public void testReporter() throws Exception {
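+ // malformed records should be handed to the errant record reporter instead of failing the task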
+ props.put(IGNORE_KEY_CONFIG, "false");
+ props.put(BEHAVIOR_ON_MALFORMED_DOCS_CONFIG, BehaviorOnMalformedDoc.IGNORE.name());
+ config = new ElasticsearchSinkConnectorConfig(props);
+ converter = new DataConverter(config);
+
+ ErrantRecordReporter reporter = mock(ErrantRecordReporter.class);
+ ElasticsearchClient client = new ElasticsearchClient(config, reporter);
+ client.createIndex(INDEX);
+ client.createMapping(INDEX, schema());
+
+ Schema schema = SchemaBuilder
+ .struct()
+ .name("record")
+ .field("offset", SchemaBuilder.bool().defaultValue(false).build())
+ .build();
+ Struct value = new Struct(schema).put("offset", false);
+ SinkRecord badRecord = new SinkRecord(INDEX, 0, Schema.STRING_SCHEMA, "key0", schema, value, 1);
+
+ writeRecord(sinkRecord("key0", 0), client);
+ client.flush();
+ waitUntilRecordsInES(1);
+
+ writeRecord(badRecord, client);
+ client.flush();
+
+ // failed requests take a bit longer to be reported, so keep writing records until the reporter has been called
+ for (int i = 2; i < 7; i++) {
+ writeRecord(sinkRecord("key" + i, i + 1), client);
+ client.flush();
+ waitUntilRecordsInES(i);
+ }
+
+ verify(reporter, times(1)).report(eq(badRecord), any(Throwable.class));
+ client.close();
+ }
+
+ @Test
+ public void testReporterNotCalled() throws Exception {
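+ // well-formed records should never be sent to the errant record reporter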
+ ErrantRecordReporter reporter = mock(ErrantRecordReporter.class);
+ ElasticsearchClient client = new ElasticsearchClient(config, reporter);
+ client.createIndex(INDEX);
+
+ writeRecord(sinkRecord(0), client);
+ writeRecord(sinkRecord(1), client);
+ writeRecord(sinkRecord(2), client);
+ client.flush();
+
+ waitUntilRecordsInES(3);
+ assertEquals(3, helperClient.getDocCount(INDEX));
+ verify(reporter, never()).report(eq(sinkRecord(0)), any(Throwable.class));
+ client.close();
+ }
+
+ /**
+ * Cause a version conflict error.
+ * Assumes that the Elasticsearch VersionType is 'EXTERNAL' for the records.
+ * @param client The Elasticsearch client object to which to send records
+ */
+ private void causeExternalVersionConflictError(ElasticsearchClient client) throws InterruptedException {
+ client.createIndex(INDEX);
+
+ // Sequentially increase our record version (which comes from the Kafka offset)
+ writeRecord(sinkRecord(0), client);
+ writeRecord(sinkRecord(1), client);
+ writeRecord(sinkRecord(2), client);
+ client.flush();
+
+ // All three writes target the same document, so only one record ends up in ES
+ waitUntilRecordsInES(1);
+
+ // Re-send the last record and then the one before it to trigger external version conflicts
+ writeRecord(sinkRecord(2), client);
+ writeRecord(sinkRecord(1), client);
+ client.flush();
+ }
+
+ /**
+ * If the record version is set to VersionType.EXTERNAL (normal case for non-streaming),
+ * then writing the same or a lower version number will throw a version conflict exception.
+ * @throws Exception will be thrown if the test fails
+ */
+ @Test
+ public void testExternalVersionConflictReporterNotCalled() throws Exception {
+ props.put(IGNORE_KEY_CONFIG, "false");
+ // Suppress asynchronous operations
+ props.put(MAX_IN_FLIGHT_REQUESTS_CONFIG, "1");
+ config = new ElasticsearchSinkConnectorConfig(props);
+ converter = new DataConverter(config);
+
+ ErrantRecordReporter reporter = mock(ErrantRecordReporter.class);
+ ElasticsearchClient client = new ElasticsearchClient(config, reporter);
+
+ causeExternalVersionConflictError(client);
+
+ // Make sure that no error was reported for the records at offsets 1 and 2
+ verify(reporter, never()).report(eq(sinkRecord(1)), any(Throwable.class));
+ verify(reporter, never()).report(eq(sinkRecord(2)), any(Throwable.class));
+ client.close();
+ }
+
+ /**
+ * If the record version is set to VersionType.INTERNAL (normal case streaming/logging),
+ * then writing the same or a lower version number will throw a version conflict exception.
+ * In this test, we check that the client's handleResponse method
+ * properly reports an error when it sees the version conflict together with a
+ * VersionType of INTERNAL. We still cause the error via an external
+ * version conflict, but flip the version type to internal before it is interpreted.
+ * @throws Exception will be thrown if the test fails
+ */
+ @Test
+ public void testHandleResponseInternalVersionConflictReporterCalled() throws Exception {
+ props.put(IGNORE_KEY_CONFIG, "false");
+ // Suppress asynchronous operations
+ props.put(MAX_IN_FLIGHT_REQUESTS_CONFIG, "1");
+ config = new ElasticsearchSinkConnectorConfig(props);
+ converter = new DataConverter(config);
+
+ ErrantRecordReporter reporter = mock(ErrantRecordReporter.class);
+
+ // We will cause a version conflict error, but test that handleResponse()
+ // correctly reports the error when it interprets the version conflict as
+ // "INTERNAL" (version maintained by Elasticsearch) rather than
+ // "EXTERNAL" (version maintained by the connector as kafka offset)
+ ElasticsearchClient client = new ElasticsearchClient(config, reporter) {
+ @Override
+ protected void handleResponse(BulkItemResponse response, DocWriteRequest<?> request,
+ long executionId) {
+ // Make it think it was an internal version conflict.
+ // Note that we don't make any attempt to reset the response version number,
+ // which will be -1 here.
+ request.versionType(VersionType.INTERNAL);
+ super.handleResponse(response, request, executionId);
+ }
+ };
+
+ causeExternalVersionConflictError(client);
+
+ // Make sure that an error was reported for both the records at offsets 1 and 2
+ verify(reporter, times(1)).report(eq(sinkRecord(1)), any(Throwable.class));
+ verify(reporter, times(1)).report(eq(sinkRecord(2)), any(Throwable.class));
+ client.close();
+ }
+
+ @Test
+ public void testNoVersionConflict() throws Exception {
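+ // two clients upserting the same key should not report version conflicts to either reporter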
+ props.put(IGNORE_KEY_CONFIG, "false");
+ props.put(WRITE_METHOD_CONFIG, WriteMethod.UPSERT.name());
+ config = new ElasticsearchSinkConnectorConfig(props);
+ converter = new DataConverter(config);
+
+ ErrantRecordReporter reporter = mock(ErrantRecordReporter.class);
+ ErrantRecordReporter reporter2 = mock(ErrantRecordReporter.class);
+ ElasticsearchClient client = new ElasticsearchClient(config, reporter);
+ ElasticsearchClient client2 = new ElasticsearchClient(config, reporter2);
+
+ client.createIndex(INDEX);
+
+ writeRecord(sinkRecord(0), client);
+ writeRecord(sinkRecord(1), client2);
+ writeRecord(sinkRecord(2), client);
+ writeRecord(sinkRecord(3), client2);
+ writeRecord(sinkRecord(4), client);
+ writeRecord(sinkRecord(5), client2);
+
+ waitUntilRecordsInES(1);
+ assertEquals(1, helperClient.getDocCount(INDEX));
+ verify(reporter, never()).report(any(SinkRecord.class), any(Throwable.class));
+ verify(reporter2, never()).report(any(SinkRecord.class), any(Throwable.class));
+ client.close();
+ client2.close();
+ }
+
+ @Test
+ public void testSsl() throws Exception {
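+ // restart the container with SSL enabled and verify that writes succeed over https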
+ container.close();
+ container = ElasticsearchContainer.fromSystemProperties().withSslEnabled(true);
+ container.start();
+
+ String address = container.getConnectionUrl(false);
+ props.put(CONNECTION_URL_CONFIG, address);
+ props.put(CONNECTION_USERNAME_CONFIG, ELASTIC_SUPERUSER_NAME);
+ props.put(CONNECTION_PASSWORD_CONFIG, ELASTIC_SUPERUSER_PASSWORD);
+ props.put(SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
+ props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, container.getKeystorePath());
+ props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, container.getKeystorePassword());
+ props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, container.getTruststorePath());
+ props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, container.getTruststorePassword());
+ props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEY_PASSWORD_CONFIG, container.getKeyPassword());
+ config = new ElasticsearchSinkConnectorConfig(props);
+ converter = new DataConverter(config);
+
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ helperClient = new ElasticsearchHelperClient(address, config);
+ client.createIndex(INDEX);
+
+ writeRecord(sinkRecord(0), client);
+ client.flush();
+
+ waitUntilRecordsInES(1);
+ assertEquals(1, helperClient.getDocCount(INDEX));
+ client.close();
+ helperClient = null;
+
+ container.close();
+ container = ElasticsearchContainer.fromSystemProperties();
+ container.start();
+ }
+
+ @Test
+ public void testConnectionUrlExtraSlash() {
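+ // a trailing slash on the connection URL should not break client creation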
+ props.put(CONNECTION_URL_CONFIG, container.getConnectionUrl() + "/");
+ config = new ElasticsearchSinkConnectorConfig(props);
+ ElasticsearchClient client = new ElasticsearchClient(config, null);
+ client.close();
+ }
+
+ private static Schema schema() {
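+ // simple schema with two int32 fields, both defaulting to 0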
+ return SchemaBuilder
+ .struct()
+ .name("record")
+ .field("offset", SchemaBuilder.int32().defaultValue(0).build())
+ .field("another", SchemaBuilder.int32().defaultValue(0).build())
+ .build();
+ }
+
+ private static SinkRecord sinkRecord(int offset) {
+ return sinkRecord("key", offset);
+ }
+
+ private static SinkRecord sinkRecord(String key, int offset) {
+ Struct value = new Struct(schema()).put("offset", offset).put("another", offset + 1);
+ return new SinkRecord(INDEX, 0, Schema.STRING_SCHEMA, key, schema(), value, offset);
+ }
+
+ private void waitUntilRecordsInES(int expectedRecords) throws InterruptedException {
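+ // poll ES until the expected document count is reached; a missing index just means no records yet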
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ return helperClient.getDocCount(INDEX) == expectedRecords;
+ } catch (ElasticsearchStatusException e) {
+ if (e.getMessage().contains("index_not_found_exception")) {
+ return false;
+ }
+
+ throw e;
+ }
+ },
+ TimeUnit.MINUTES.toMillis(1),
+ String.format("Could not find expected documents (%d) in time.", expectedRecords)
+ );
+ }
+
+ private void writeRecord(SinkRecord record, ElasticsearchClient client) {
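+ // convert the record and hand it to the client's buffered bulk processor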
+ client.index(record, converter.convertRecord(record, record.topic()));
+ }
+}
diff --git a/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java b/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java
index 4950e9e31..77686844a 100644
--- a/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java
+++ b/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java
@@ -1,40 +1,31 @@
package io.confluent.connect.elasticsearch;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_HOST_CONFIG;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_PASSWORD_CONFIG;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_PORT_CONFIG;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.PROXY_USERNAME_CONFIG;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX;
-import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG;
+import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.*;
+import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SecurityProtocol;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.junit.Before;
import org.junit.Test;
-import java.util.HashMap;
-import java.util.Map;
-
public class ElasticsearchSinkConnectorConfigTest {
private Map<String, String> props;
@Before
public void setup() {
- props = new HashMap<>();
- props.put(TYPE_NAME_CONFIG, ElasticsearchSinkTestBase.TYPE);
- props.put(CONNECTION_URL_CONFIG, "localhost");
- props.put(IGNORE_KEY_CONFIG, "true");
+ props = addNecessaryProps(new HashMap<>());
}
@Test
@@ -49,6 +40,7 @@ public void testSetHttpTimeoutsConfig() {
props.put(READ_TIMEOUT_MS_CONFIG, "10000");
props.put(CONNECTION_TIMEOUT_MS_CONFIG, "15000");
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
+
assertEquals(config.readTimeoutMs(), 10000);
assertEquals(config.connectionTimeoutMs(), 15000);
}
@@ -66,6 +58,7 @@ public void testSslConfigs() {
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path2");
props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "opensesame2");
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
+
Map<String, Object> sslConfigs = config.sslConfigs();
assertTrue(sslConfigs.size() > 0);
assertEquals(
@@ -83,30 +76,31 @@ public void testSslConfigs() {
@Test
public void testSecured() {
props.put(CONNECTION_URL_CONFIG, "http://host:9999");
- assertFalse(new ElasticsearchSinkConnectorConfig(props).secured());
+ assertFalse(new ElasticsearchSinkConnectorConfig(props).isSslEnabled());
props.put(CONNECTION_URL_CONFIG, "https://host:9999");
- assertFalse(new ElasticsearchSinkConnectorConfig(props).secured());
+ assertFalse(new ElasticsearchSinkConnectorConfig(props).isSslEnabled());
props.put(CONNECTION_URL_CONFIG, "http://host1:9992,https://host:9999");
- assertFalse(new ElasticsearchSinkConnectorConfig(props).secured());
+ assertFalse(new ElasticsearchSinkConnectorConfig(props).isSslEnabled());
// Default behavior should be backwards compat
props.put(CONNECTION_URL_CONFIG, "host1:9992");
- assertFalse(new ElasticsearchSinkConnectorConfig(props).secured());
+ assertFalse(new ElasticsearchSinkConnectorConfig(props).isSslEnabled());
props.put(SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
- assertTrue(new ElasticsearchSinkConnectorConfig(props).secured());
+ assertTrue(new ElasticsearchSinkConnectorConfig(props).isSslEnabled());
props.put(SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name());
props.put(CONNECTION_URL_CONFIG, "https://host:9999");
- assertFalse(new ElasticsearchSinkConnectorConfig(props).secured());
+ assertFalse(new ElasticsearchSinkConnectorConfig(props).isSslEnabled());
}
@Test
public void shouldAcceptValidBasicProxy() {
props.put(PROXY_HOST_CONFIG, "proxy host");
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
+
assertNotNull(config);
assertTrue(config.isBasicProxyConfigured());
assertFalse(config.isProxyWithAuthenticationConfigured());
@@ -115,12 +109,18 @@ public void shouldAcceptValidBasicProxy() {
@Test
public void shouldAcceptValidProxyWithAuthentication() {
props.put(PROXY_HOST_CONFIG, "proxy host");
+ props.put(PROXY_PORT_CONFIG, "1010");
props.put(PROXY_USERNAME_CONFIG, "username");
props.put(PROXY_PASSWORD_CONFIG, "password");
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
+
assertNotNull(config);
assertTrue(config.isBasicProxyConfigured());
assertTrue(config.isProxyWithAuthenticationConfigured());
+ assertEquals("proxy host", config.proxyHost());
+ assertEquals(1010, config.proxyPort());
+ assertEquals("username", config.proxyUsername());
+ assertEquals("password", config.proxyPassword().value());
}
@Test(expected = ConfigException.class)
@@ -128,4 +128,74 @@ public void shouldNotAllowInvalidProxyPort() {
props.put(PROXY_PORT_CONFIG, "-666");
new ElasticsearchSinkConnectorConfig(props);
}
+
+ @Test(expected = ConfigException.class)
+ public void shouldNotAllowInvalidUrl() {
+ props.put(CONNECTION_URL_CONFIG, ".com:/bbb/dfs,http://valid.com");
+ new ElasticsearchSinkConnectorConfig(props);
+ }
+
+ @Test(expected = ConfigException.class)
+ public void shouldNotAllowInvalidSecurityProtocol() {
+ props.put(SECURITY_PROTOCOL_CONFIG, "unsecure");
+ new ElasticsearchSinkConnectorConfig(props);
+ }
+
+ @Test
+ public void shouldDisableHostnameVerification() {
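+ // an empty endpoint identification algorithm disables hostname verification; leaving it unset keeps verification on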
+ props.put(SSL_CONFIG_PREFIX + SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https");
+ ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
+ assertFalse(config.shouldDisableHostnameVerification());
+
+ props.put(SSL_CONFIG_PREFIX + SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+ config = new ElasticsearchSinkConnectorConfig(props);
+ assertTrue(config.shouldDisableHostnameVerification());
+
+ props.put(SSL_CONFIG_PREFIX + SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, null);
+ config = new ElasticsearchSinkConnectorConfig(props);
+ assertFalse(config.shouldDisableHostnameVerification());
+ }
+
+ @Test(expected = ConfigException.class)
+ public void shouldNotAllowInvalidExtensionKeytab() {
+ props.put(KERBEROS_KEYTAB_PATH_CONFIG, "keytab.wrongextension");
+ new ElasticsearchSinkConnectorConfig(props);
+ }
+
+ @Test(expected = ConfigException.class)
+ public void shouldNotAllowNonExistingKeytab() {
+ props.put(KERBEROS_KEYTAB_PATH_CONFIG, "idontexist.keytab");
+ new ElasticsearchSinkConnectorConfig(props);
+ }
+
+ @Test
+ public void shouldAllowValidKeytab() throws IOException {
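+ // the keytab path is accepted only if the file exists and has a .keytab extension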
+ Path keytab = Files.createTempFile("iexist", ".keytab");
+ props.put(KERBEROS_KEYTAB_PATH_CONFIG, keytab.toString());
+
+ new ElasticsearchSinkConnectorConfig(props);
+
+ keytab.toFile().delete();
+ }
+
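+ // minimal properties required for a valid config; shared with other test classes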
+ public static Map<String, String> addNecessaryProps(Map<String, String> props) {
+ if (props == null) {
+ props = new HashMap<>();
+ }
+ props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost:8080");
+ return props;
+ }
+
+ @Test
+ public void testLogSensitiveData() {
+ ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
+ assertFalse(config.shouldLogSensitiveData());
+
+ props.put(LOG_SENSITIVE_DATA_CONFIG, "true");
+ config = new ElasticsearchSinkConnectorConfig(props);
+ assertTrue(config.shouldLogSensitiveData());
+
+ props.put(LOG_SENSITIVE_DATA_CONFIG, "false");
+ config = new ElasticsearchSinkConnectorConfig(props);
+ assertFalse(config.shouldLogSensitiveData());
+ }
}
diff --git a/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorTest.java b/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorTest.java
new file mode 100644
index 000000000..a22dc656a
--- /dev/null
+++ b/src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorTest.java
@@ -0,0 +1,73 @@
+package io.confluent.connect.elasticsearch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.config.Config;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ElasticsearchSinkConnectorTest {
+
+ private ElasticsearchSinkConnector connector;
+ private Map<String, String> settings;
+
+ @Before
+ public void before() {
+ settings = ElasticsearchSinkConnectorConfigTest.addNecessaryProps(new HashMap<>());
+ connector = new ElasticsearchSinkConnector();
+ }
+
+ @Test(expected = ConnectException.class)
+ public void shouldCatchInvalidConfigs() {
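+ // starting the connector with an empty config should be rejected with a ConnectException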
+ connector.start(new HashMap<>());
+ }
+
+ @Test
+ public void shouldGenerateValidTaskConfigs() {
+ connector.start(settings);
+ List