52
52
import org .elasticsearch .xpack .sql .util .StringUtils ;
53
53
54
54
import java .io .IOException ;
55
+ import java .time .ZoneId ;
55
56
import java .util .ArrayList ;
56
57
import java .util .LinkedHashSet ;
57
58
import java .util .List ;
@@ -64,19 +65,21 @@ public class Querier {
64
65
private final Logger log = LogManager .getLogger (getClass ());
65
66
66
67
private final TimeValue keepAlive , timeout ;
68
+ private final ZoneId zoneId ;
67
69
private final int size ;
68
70
private final Client client ;
69
71
@ Nullable
70
72
private final QueryBuilder filter ;
71
73
72
74
public Querier (Client client , Configuration cfg ) {
73
- this (client , cfg .requestTimeout (), cfg .pageTimeout (), cfg .filter (), cfg .pageSize ());
75
+ this (client , cfg .requestTimeout (), cfg .pageTimeout (), cfg .zoneId (), cfg . filter (), cfg .pageSize ());
74
76
}
75
77
76
- public Querier (Client client , TimeValue keepAlive , TimeValue timeout , QueryBuilder filter , int size ) {
78
+ public Querier (Client client , TimeValue keepAlive , TimeValue timeout , ZoneId zoneId , QueryBuilder filter , int size ) {
77
79
this .client = client ;
78
80
this .keepAlive = keepAlive ;
79
81
this .timeout = timeout ;
82
+ this .zoneId = zoneId ;
80
83
this .filter = filter ;
81
84
this .size = size ;
82
85
}
@@ -98,13 +101,13 @@ public void query(Schema schema, QueryContainer query, String index, ActionListe
98
101
ActionListener <SearchResponse > l ;
99
102
if (query .isAggsOnly ()) {
100
103
if (query .aggs ().useImplicitGroupBy ()) {
101
- l = new ImplicitGroupActionListener (listener , client , timeout , schema , query , search );
104
+ l = new ImplicitGroupActionListener (listener , client , timeout , zoneId , schema , query , search );
102
105
} else {
103
- l = new CompositeActionListener (listener , client , timeout , schema , query , search );
106
+ l = new CompositeActionListener (listener , client , timeout , zoneId , schema , query , search );
104
107
}
105
108
} else {
106
109
search .scroll (keepAlive );
107
- l = new ScrollActionListener (listener , client , timeout , schema , query );
110
+ l = new ScrollActionListener (listener , client , timeout , zoneId , schema , query );
108
111
}
109
112
110
113
client .search (search , l );
@@ -149,9 +152,9 @@ public Aggregations getAggregations() {
149
152
}
150
153
});
151
154
152
- ImplicitGroupActionListener (ActionListener <SchemaRowSet > listener , Client client , TimeValue keepAlive , Schema schema ,
153
- QueryContainer query , SearchRequest request ) {
154
- super (listener , client , keepAlive , schema , query , request );
155
+ ImplicitGroupActionListener (ActionListener <SchemaRowSet > listener , Client client , TimeValue keepAlive , ZoneId zoneId ,
156
+ Schema schema , QueryContainer query , SearchRequest request ) {
157
+ super (listener , client , keepAlive , zoneId , schema , query , request );
155
158
}
156
159
157
160
@ Override
@@ -197,12 +200,11 @@ private void handleBuckets(List<? extends Bucket> buckets, SearchResponse respon
197
200
*/
198
201
static class CompositeActionListener extends BaseAggActionListener {
199
202
200
- CompositeActionListener (ActionListener <SchemaRowSet > listener , Client client , TimeValue keepAlive ,
203
+ CompositeActionListener (ActionListener <SchemaRowSet > listener , Client client , TimeValue keepAlive , ZoneId zoneId ,
201
204
Schema schema , QueryContainer query , SearchRequest request ) {
202
- super (listener , client , keepAlive , schema , query , request );
205
+ super (listener , client , keepAlive , zoneId , schema , query , request );
203
206
}
204
207
205
-
206
208
@ Override
207
209
protected void handleResponse (SearchResponse response , ActionListener <SchemaRowSet > listener ) {
208
210
// there are some results
@@ -240,9 +242,9 @@ abstract static class BaseAggActionListener extends BaseActionListener {
240
242
final QueryContainer query ;
241
243
final SearchRequest request ;
242
244
243
- BaseAggActionListener (ActionListener <SchemaRowSet > listener , Client client , TimeValue keepAlive , Schema schema ,
244
- QueryContainer query , SearchRequest request ) {
245
- super (listener , client , keepAlive , schema );
245
+ BaseAggActionListener (ActionListener <SchemaRowSet > listener , Client client , TimeValue keepAlive , ZoneId zoneId ,
246
+ Schema schema , QueryContainer query , SearchRequest request ) {
247
+ super (listener , client , keepAlive , zoneId , schema );
246
248
247
249
this .query = query ;
248
250
this .request = request ;
@@ -263,7 +265,7 @@ protected List<BucketExtractor> initBucketExtractors(SearchResponse response) {
263
265
private BucketExtractor createExtractor (FieldExtraction ref , BucketExtractor totalCount ) {
264
266
if (ref instanceof GroupByRef ) {
265
267
GroupByRef r = (GroupByRef ) ref ;
266
- return new CompositeKeyExtractor (r .key (), r .property (), r . zoneId ());
268
+ return new CompositeKeyExtractor (r .key (), r .property (), zoneId , r . isDateTimeBased ());
267
269
}
268
270
269
271
if (ref instanceof MetricAggRef ) {
@@ -297,9 +299,9 @@ private BucketExtractor createExtractor(FieldExtraction ref, BucketExtractor tot
297
299
static class ScrollActionListener extends BaseActionListener {
298
300
private final QueryContainer query ;
299
301
300
- ScrollActionListener (ActionListener <SchemaRowSet > listener , Client client , TimeValue keepAlive ,
302
+ ScrollActionListener (ActionListener <SchemaRowSet > listener , Client client , TimeValue keepAlive , ZoneId zoneId ,
301
303
Schema schema , QueryContainer query ) {
302
- super (listener , client , keepAlive , schema );
304
+ super (listener , client , keepAlive , zoneId , schema );
303
305
this .query = query ;
304
306
}
305
307
@@ -344,12 +346,12 @@ protected void handleResponse(SearchResponse response, ActionListener<SchemaRowS
344
346
private HitExtractor createExtractor (FieldExtraction ref ) {
345
347
if (ref instanceof SearchHitFieldRef ) {
346
348
SearchHitFieldRef f = (SearchHitFieldRef ) ref ;
347
- return new FieldHitExtractor (f .name (), f .getDataType (), f .useDocValue (), f .hitName ());
349
+ return new FieldHitExtractor (f .name (), f .getDataType (), zoneId , f .useDocValue (), f .hitName ());
348
350
}
349
351
350
352
if (ref instanceof ScriptFieldRef ) {
351
353
ScriptFieldRef f = (ScriptFieldRef ) ref ;
352
- return new FieldHitExtractor (f .name (), null , true );
354
+ return new FieldHitExtractor (f .name (), null , zoneId , true );
353
355
}
354
356
355
357
if (ref instanceof ComputedRef ) {
@@ -387,13 +389,15 @@ abstract static class BaseActionListener implements ActionListener<SearchRespons
387
389
388
390
final Client client ;
389
391
final TimeValue keepAlive ;
392
+ final ZoneId zoneId ;
390
393
final Schema schema ;
391
394
392
- BaseActionListener (ActionListener <SchemaRowSet > listener , Client client , TimeValue keepAlive , Schema schema ) {
395
+ BaseActionListener (ActionListener <SchemaRowSet > listener , Client client , TimeValue keepAlive , ZoneId zoneId , Schema schema ) {
393
396
this .listener = listener ;
394
397
395
398
this .client = client ;
396
399
this .keepAlive = keepAlive ;
400
+ this .zoneId = zoneId ;
397
401
this .schema = schema ;
398
402
}
399
403
0 commit comments