|
6 | 6 |
|
7 | 7 | package org.elasticsearch.xpack.transform.integration;
|
8 | 8 |
|
| 9 | +import org.apache.http.entity.ContentType; |
| 10 | +import org.apache.http.entity.StringEntity; |
9 | 11 | import org.elasticsearch.client.Request;
|
10 | 12 | import org.elasticsearch.client.ResponseException;
|
| 13 | +import org.elasticsearch.common.Strings; |
| 14 | +import org.elasticsearch.common.xcontent.XContentBuilder; |
11 | 15 | import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
12 | 16 | import org.junit.Before;
|
13 | 17 |
|
|
19 | 23 | import java.util.Map;
|
20 | 24 | import java.util.Set;
|
21 | 25 |
|
| 26 | +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; |
22 | 27 | import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
|
23 | 28 | import static org.hamcrest.Matchers.containsString;
|
24 | 29 | import static org.hamcrest.Matchers.equalTo;
|
@@ -1022,4 +1027,143 @@ public void testContinuousStopWaitForCheckpoint() throws Exception {
|
1022 | 1027 | assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918);
|
1023 | 1028 | deleteIndex(indexName);
|
1024 | 1029 | }
|
| 1030 | + |
| 1031 | + public void testContinuousDateNanos() throws Exception { |
| 1032 | + String indexName = "nanos"; |
| 1033 | + createDateNanoIndex(indexName, 1000); |
| 1034 | + String transformId = "nanos_continuous_pivot"; |
| 1035 | + String transformIndex = "pivot_nanos_continuous"; |
| 1036 | + setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex); |
| 1037 | + final Request createTransformRequest = createRequestWithAuth( |
| 1038 | + "PUT", |
| 1039 | + getTransformEndpoint() + transformId, |
| 1040 | + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS |
| 1041 | + ); |
| 1042 | + String config = "{" |
| 1043 | + + " \"source\": {\"index\":\"" |
| 1044 | + + indexName |
| 1045 | + + "\"}," |
| 1046 | + + " \"dest\": {\"index\":\"" |
| 1047 | + + transformIndex |
| 1048 | + + "\"}," |
| 1049 | + + " \"frequency\": \"1s\"," |
| 1050 | + + " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}}," |
| 1051 | + + " \"pivot\": {" |
| 1052 | + + " \"group_by\": {" |
| 1053 | + + " \"id\": {" |
| 1054 | + + " \"terms\": {" |
| 1055 | + + " \"field\": \"id\"" |
| 1056 | + + " } } }," |
| 1057 | + + " \"aggregations\": {" |
| 1058 | + + " \"avg_rating\": {" |
| 1059 | + + " \"avg\": {" |
| 1060 | + + " \"field\": \"rating\"" |
| 1061 | + + " } } } }" |
| 1062 | + + "}"; |
| 1063 | + createTransformRequest.setJsonEntity(config); |
| 1064 | + Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); |
| 1065 | + assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); |
| 1066 | + |
| 1067 | + startAndWaitForContinuousTransform(transformId, transformIndex, null); |
| 1068 | + assertTrue(indexExists(transformIndex)); |
| 1069 | + // get and check some ids |
| 1070 | + assertOnePivotValue(transformIndex + "/_search?q=id:id_0", 2.97); |
| 1071 | + assertOnePivotValue(transformIndex + "/_search?q=id:id_1", 2.99); |
| 1072 | + assertOnePivotValue(transformIndex + "/_search?q=id:id_7", 2.97); |
| 1073 | + assertOnePivotValue(transformIndex + "/_search?q=id:id_9", 3.01); |
| 1074 | + |
| 1075 | + String nanoResolutionTimeStamp = Instant.now().minusSeconds(1).plusNanos(randomIntBetween(1, 1000000)).toString(); |
| 1076 | + |
| 1077 | + final StringBuilder bulk = new StringBuilder(); |
| 1078 | + for (int i = 0; i < 20; i++) { |
| 1079 | + bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n"); |
| 1080 | + bulk.append("{\"id\":\"") |
| 1081 | + .append("id_") |
| 1082 | + .append(i % 5) |
| 1083 | + .append("\",\"rating\":") |
| 1084 | + .append(7) |
| 1085 | + .append(",\"timestamp\":") |
| 1086 | + .append("\"" + nanoResolutionTimeStamp + "\"") |
| 1087 | + .append("}\n"); |
| 1088 | + } |
| 1089 | + bulk.append("\r\n"); |
| 1090 | + |
| 1091 | + final Request bulkRequest = new Request("POST", "/_bulk"); |
| 1092 | + bulkRequest.addParameter("refresh", "true"); |
| 1093 | + bulkRequest.setJsonEntity(bulk.toString()); |
| 1094 | + client().performRequest(bulkRequest); |
| 1095 | + |
| 1096 | + waitForTransformCheckpoint(transformId, 2); |
| 1097 | + |
| 1098 | + stopTransform(transformId, false); |
| 1099 | + refreshIndex(transformIndex); |
| 1100 | + |
| 1101 | + // assert changes |
| 1102 | + assertOnePivotValue(transformIndex + "/_search?q=id:id_0", 3.125); |
| 1103 | + assertOnePivotValue(transformIndex + "/_search?q=id:id_1", 3.144230769); |
| 1104 | + |
| 1105 | + // assert unchanged |
| 1106 | + assertOnePivotValue(transformIndex + "/_search?q=id:id_7", 2.97); |
| 1107 | + assertOnePivotValue(transformIndex + "/_search?q=id:id_9", 3.01); |
| 1108 | + |
| 1109 | + deleteIndex(indexName); |
| 1110 | + } |
| 1111 | + |
| 1112 | + private void createDateNanoIndex(String indexName, int numDocs) throws IOException { |
| 1113 | + // create mapping |
| 1114 | + try (XContentBuilder builder = jsonBuilder()) { |
| 1115 | + builder.startObject(); |
| 1116 | + { |
| 1117 | + builder.startObject("mappings") |
| 1118 | + .startObject("properties") |
| 1119 | + .startObject("timestamp") |
| 1120 | + .field("type", "date_nanos") |
| 1121 | + .field("format", "strict_date_optional_time_nanos") |
| 1122 | + .endObject() |
| 1123 | + .startObject("id") |
| 1124 | + .field("type", "keyword") |
| 1125 | + .endObject() |
| 1126 | + .startObject("rating") |
| 1127 | + .field("type", "integer") |
| 1128 | + .endObject() |
| 1129 | + .endObject() |
| 1130 | + .endObject(); |
| 1131 | + } |
| 1132 | + builder.endObject(); |
| 1133 | + final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON); |
| 1134 | + Request req = new Request("PUT", indexName); |
| 1135 | + req.setEntity(entity); |
| 1136 | + client().performRequest(req); |
| 1137 | + } |
| 1138 | + |
| 1139 | + String randomNanos = "," + randomIntBetween(100000000, 999999999); |
| 1140 | + final StringBuilder bulk = new StringBuilder(); |
| 1141 | + for (int i = 0; i < numDocs; i++) { |
| 1142 | + bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n"); |
| 1143 | + bulk.append("{\"id\":\"") |
| 1144 | + .append("id_") |
| 1145 | + .append(i % 10) |
| 1146 | + .append("\",\"rating\":") |
| 1147 | + .append(i % 7) |
| 1148 | + .append(",\"timestamp\":") |
| 1149 | + .append("\"2020-01-27T01:59:00" + randomNanos + "Z\"") |
| 1150 | + .append("}\n"); |
| 1151 | + |
| 1152 | + if (i % 50 == 0) { |
| 1153 | + bulk.append("\r\n"); |
| 1154 | + final Request bulkRequest = new Request("POST", "/_bulk"); |
| 1155 | + bulkRequest.addParameter("refresh", "true"); |
| 1156 | + bulkRequest.setJsonEntity(bulk.toString()); |
| 1157 | + client().performRequest(bulkRequest); |
| 1158 | + // clear the builder |
| 1159 | + bulk.setLength(0); |
| 1160 | + } |
| 1161 | + } |
| 1162 | + bulk.append("\r\n"); |
| 1163 | + |
| 1164 | + final Request bulkRequest = new Request("POST", "/_bulk"); |
| 1165 | + bulkRequest.addParameter("refresh", "true"); |
| 1166 | + bulkRequest.setJsonEntity(bulk.toString()); |
| 1167 | + client().performRequest(bulkRequest); |
| 1168 | + } |
1025 | 1169 | }
|
0 commit comments