Skip to content

Commit 7c10e9b

Browse files
author
Hendrik Muhs
committed
[Transform] improve checkpoint reporting (#50369)
fixes empty checkpoints, re-factors checkpoint info creation (moves builder) and always reports last change detection relates #43201 relates #50018
1 parent de14092 commit 7c10e9b

File tree

10 files changed

+327
-211
lines changed

10 files changed

+327
-211
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java

+24-21
Original file line numberDiff line numberDiff line change
@@ -62,28 +62,27 @@ public class TransformCheckpoint implements Writeable, ToXContentObject {
6262
private final long timeUpperBoundMillis;
6363

6464
private static ConstructingObjectParser<TransformCheckpoint, Void> createParser(boolean lenient) {
65-
ConstructingObjectParser<TransformCheckpoint, Void> parser = new ConstructingObjectParser<>(NAME,
66-
lenient, args -> {
67-
String id = (String) args[0];
68-
long timestamp = (Long) args[1];
69-
long checkpoint = (Long) args[2];
65+
ConstructingObjectParser<TransformCheckpoint, Void> parser = new ConstructingObjectParser<>(NAME, lenient, args -> {
66+
String id = (String) args[0];
67+
long timestamp = (Long) args[1];
68+
long checkpoint = (Long) args[2];
7069

71-
@SuppressWarnings("unchecked")
72-
Map<String, long[]> checkpoints = (Map<String, long[]>) args[3];
70+
@SuppressWarnings("unchecked")
71+
Map<String, long[]> checkpoints = (Map<String, long[]>) args[3];
7372

74-
Long timeUpperBound = (Long) args[4];
73+
Long timeUpperBound = (Long) args[4];
7574

76-
// ignored, only for internal storage: String docType = (String) args[5];
77-
return new TransformCheckpoint(id, timestamp, checkpoint, checkpoints, timeUpperBound);
78-
});
75+
// ignored, only for internal storage: String docType = (String) args[5];
76+
return new TransformCheckpoint(id, timestamp, checkpoint, checkpoints, timeUpperBound);
77+
});
7978

8079
parser.declareString(constructorArg(), TransformField.ID);
8180

8281
// note: this is never parsed from the outside where timestamp can be formatted as date time
8382
parser.declareLong(constructorArg(), TransformField.TIMESTAMP_MILLIS);
8483
parser.declareLong(constructorArg(), CHECKPOINT);
8584

86-
parser.declareObject(constructorArg(), (p,c) -> {
85+
parser.declareObject(constructorArg(), (p, c) -> {
8786
Map<String, long[]> checkPointsByIndexName = new TreeMap<>();
8887
XContentParser.Token token = null;
8988
while ((token = p.nextToken()) != XContentParser.Token.END_OBJECT) {
@@ -108,8 +107,7 @@ private static ConstructingObjectParser<TransformCheckpoint, Void> createParser(
108107
return parser;
109108
}
110109

111-
public TransformCheckpoint(String transformId, long timestamp, long checkpoint, Map<String, long[]> checkpoints,
112-
Long timeUpperBound) {
110+
public TransformCheckpoint(String transformId, long timestamp, long checkpoint, Map<String, long[]> checkpoints, Long timeUpperBound) {
113111
this.transformId = Objects.requireNonNull(transformId);
114112
this.timestampMillis = timestamp;
115113
this.checkpoint = checkpoint;
@@ -126,7 +124,7 @@ public TransformCheckpoint(StreamInput in) throws IOException {
126124
}
127125

128126
public boolean isEmpty() {
129-
return indicesCheckpoints.isEmpty();
127+
return this.equals(EMPTY);
130128
}
131129

132130
/**
@@ -212,8 +210,10 @@ public boolean equals(Object other) {
212210
final TransformCheckpoint that = (TransformCheckpoint) other;
213211

214212
// compare the timestamp, id, checkpoint and than call matches for the rest
215-
return this.timestampMillis == that.timestampMillis && this.checkpoint == that.checkpoint
216-
&& this.timeUpperBoundMillis == that.timeUpperBoundMillis && matches(that);
213+
return this.timestampMillis == that.timestampMillis
214+
&& this.checkpoint == that.checkpoint
215+
&& this.timeUpperBoundMillis == that.timeUpperBoundMillis
216+
&& matches(that);
217217
}
218218

219219
/**
@@ -224,7 +224,7 @@ public boolean equals(Object other) {
224224
* @param that other checkpoint
225225
* @return true if checkpoints match
226226
*/
227-
public boolean matches (TransformCheckpoint that) {
227+
public boolean matches(TransformCheckpoint that) {
228228
if (this == that) {
229229
return true;
230230
}
@@ -258,7 +258,7 @@ public static String documentId(String transformId, long checkpoint) {
258258
return NAME + "-" + transformId + "-" + checkpoint;
259259
}
260260

261-
public static boolean isNullOrEmpty (TransformCheckpoint checkpoint) {
261+
public static boolean isNullOrEmpty(TransformCheckpoint checkpoint) {
262262
return checkpoint == null || checkpoint.isEmpty();
263263
}
264264

@@ -315,8 +315,11 @@ private static Map<String, long[]> readCheckpoints(Map<String, Object> readMap)
315315
if (e.getValue() instanceof long[]) {
316316
checkpoints.put(e.getKey(), (long[]) e.getValue());
317317
} else {
318-
throw new ElasticsearchParseException("expecting the checkpoints for [{}] to be a long[], but found [{}] instead",
319-
e.getKey(), e.getValue().getClass());
318+
throw new ElasticsearchParseException(
319+
"expecting the checkpoints for [{}] to be a long[], but found [{}] instead",
320+
e.getKey(),
321+
e.getValue().getClass()
322+
);
320323
}
321324
}
322325
return checkpoints;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java

+149-44
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,110 @@
3131
*/
3232
public class TransformCheckpointingInfo implements Writeable, ToXContentObject {
3333

34+
/**
35+
* Builder for collecting checkpointing information for the purpose of _stats
36+
*/
37+
public static class TransformCheckpointingInfoBuilder {
38+
private TransformIndexerPosition nextCheckpointPosition;
39+
private TransformProgress nextCheckpointProgress;
40+
private TransformCheckpoint lastCheckpoint;
41+
private TransformCheckpoint nextCheckpoint;
42+
private TransformCheckpoint sourceCheckpoint;
43+
private Instant changesLastDetectedAt;
44+
private long operationsBehind;
45+
46+
public TransformCheckpointingInfoBuilder() {}
47+
48+
public TransformCheckpointingInfo build() {
49+
if (lastCheckpoint == null) {
50+
lastCheckpoint = TransformCheckpoint.EMPTY;
51+
}
52+
if (nextCheckpoint == null) {
53+
nextCheckpoint = TransformCheckpoint.EMPTY;
54+
}
55+
if (sourceCheckpoint == null) {
56+
sourceCheckpoint = TransformCheckpoint.EMPTY;
57+
}
58+
59+
// checkpointstats requires a non-negative checkpoint number
60+
long lastCheckpointNumber = lastCheckpoint.getCheckpoint() > 0 ? lastCheckpoint.getCheckpoint() : 0;
61+
long nextCheckpointNumber = nextCheckpoint.getCheckpoint() > 0 ? nextCheckpoint.getCheckpoint() : 0;
62+
63+
return new TransformCheckpointingInfo(
64+
new TransformCheckpointStats(
65+
lastCheckpointNumber,
66+
null,
67+
null,
68+
lastCheckpoint.getTimestamp(),
69+
lastCheckpoint.getTimeUpperBound()
70+
),
71+
new TransformCheckpointStats(
72+
nextCheckpointNumber,
73+
nextCheckpointPosition,
74+
nextCheckpointProgress,
75+
nextCheckpoint.getTimestamp(),
76+
nextCheckpoint.getTimeUpperBound()
77+
),
78+
operationsBehind,
79+
changesLastDetectedAt
80+
);
81+
}
82+
83+
public TransformCheckpointingInfoBuilder setLastCheckpoint(TransformCheckpoint lastCheckpoint) {
84+
this.lastCheckpoint = lastCheckpoint;
85+
return this;
86+
}
87+
88+
public TransformCheckpoint getLastCheckpoint() {
89+
return lastCheckpoint;
90+
}
91+
92+
public TransformCheckpointingInfoBuilder setNextCheckpoint(TransformCheckpoint nextCheckpoint) {
93+
this.nextCheckpoint = nextCheckpoint;
94+
return this;
95+
}
96+
97+
public TransformCheckpoint getNextCheckpoint() {
98+
return nextCheckpoint;
99+
}
100+
101+
public TransformCheckpointingInfoBuilder setSourceCheckpoint(TransformCheckpoint sourceCheckpoint) {
102+
this.sourceCheckpoint = sourceCheckpoint;
103+
return this;
104+
}
105+
106+
public TransformCheckpoint getSourceCheckpoint() {
107+
return sourceCheckpoint;
108+
}
109+
110+
public TransformCheckpointingInfoBuilder setNextCheckpointProgress(TransformProgress nextCheckpointProgress) {
111+
this.nextCheckpointProgress = nextCheckpointProgress;
112+
return this;
113+
}
114+
115+
public TransformCheckpointingInfoBuilder setNextCheckpointPosition(TransformIndexerPosition nextCheckpointPosition) {
116+
this.nextCheckpointPosition = nextCheckpointPosition;
117+
return this;
118+
}
119+
120+
public TransformCheckpointingInfoBuilder setChangesLastDetectedAt(Instant changesLastDetectedAt) {
121+
this.changesLastDetectedAt = changesLastDetectedAt;
122+
return this;
123+
}
124+
125+
public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehind) {
126+
this.operationsBehind = operationsBehind;
127+
return this;
128+
}
129+
130+
}
131+
34132
public static final TransformCheckpointingInfo EMPTY = new TransformCheckpointingInfo(
35133
TransformCheckpointStats.EMPTY,
36134
TransformCheckpointStats.EMPTY,
37135
0L,
38-
null);
136+
null
137+
);
39138

40139
public static final ParseField LAST_CHECKPOINT = new ParseField("last");
41140
public static final ParseField NEXT_CHECKPOINT = new ParseField("next");
@@ -44,32 +143,41 @@ public class TransformCheckpointingInfo implements Writeable, ToXContentObject {
44143
private final TransformCheckpointStats last;
45144
private final TransformCheckpointStats next;
46145
private final long operationsBehind;
47-
private Instant changesLastDetectedAt;
48-
49-
private static final ConstructingObjectParser<TransformCheckpointingInfo, Void> LENIENT_PARSER =
50-
new ConstructingObjectParser<>(
51-
"data_frame_transform_checkpointing_info",
52-
true,
53-
a -> {
54-
long behind = a[2] == null ? 0L : (Long) a[2];
55-
Instant changesLastDetectedAt = (Instant)a[3];
56-
return new TransformCheckpointingInfo(
57-
a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0],
58-
a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1],
59-
behind,
60-
changesLastDetectedAt);
61-
});
146+
private final Instant changesLastDetectedAt;
147+
148+
private static final ConstructingObjectParser<TransformCheckpointingInfo, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
149+
"data_frame_transform_checkpointing_info",
150+
true,
151+
a -> {
152+
long behind = a[2] == null ? 0L : (Long) a[2];
153+
Instant changesLastDetectedAt = (Instant) a[3];
154+
return new TransformCheckpointingInfo(
155+
a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0],
156+
a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1],
157+
behind,
158+
changesLastDetectedAt
159+
);
160+
}
161+
);
62162

63163
static {
64-
LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
65-
TransformCheckpointStats.LENIENT_PARSER::apply, LAST_CHECKPOINT);
66-
LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
67-
TransformCheckpointStats.LENIENT_PARSER::apply, NEXT_CHECKPOINT);
164+
LENIENT_PARSER.declareObject(
165+
ConstructingObjectParser.optionalConstructorArg(),
166+
TransformCheckpointStats.LENIENT_PARSER::apply,
167+
LAST_CHECKPOINT
168+
);
169+
LENIENT_PARSER.declareObject(
170+
ConstructingObjectParser.optionalConstructorArg(),
171+
TransformCheckpointStats.LENIENT_PARSER::apply,
172+
NEXT_CHECKPOINT
173+
);
68174
LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND);
69-
LENIENT_PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
175+
LENIENT_PARSER.declareField(
176+
ConstructingObjectParser.optionalConstructorArg(),
70177
p -> TimeUtils.parseTimeFieldToInstant(p, CHANGES_LAST_DETECTED_AT.getPreferredName()),
71178
CHANGES_LAST_DETECTED_AT,
72-
ObjectParser.ValueType.VALUE);
179+
ObjectParser.ValueType.VALUE
180+
);
73181
}
74182

75183
/**
@@ -81,28 +189,26 @@ public class TransformCheckpointingInfo implements Writeable, ToXContentObject {
81189
* @param operationsBehind counter of operations the current checkpoint is behind source
82190
* @param changesLastDetectedAt the last time the source indices were checked for changes
83191
*/
84-
public TransformCheckpointingInfo(TransformCheckpointStats last,
85-
TransformCheckpointStats next,
86-
long operationsBehind,
87-
Instant changesLastDetectedAt) {
192+
public TransformCheckpointingInfo(
193+
TransformCheckpointStats last,
194+
TransformCheckpointStats next,
195+
long operationsBehind,
196+
Instant changesLastDetectedAt
197+
) {
88198
this.last = Objects.requireNonNull(last);
89199
this.next = Objects.requireNonNull(next);
90200
this.operationsBehind = operationsBehind;
91201
this.changesLastDetectedAt = changesLastDetectedAt == null ? null : Instant.ofEpochMilli(changesLastDetectedAt.toEpochMilli());
92202
}
93203

94-
public TransformCheckpointingInfo(TransformCheckpointStats last,
95-
TransformCheckpointStats next,
96-
long operationsBehind) {
97-
this(last, next, operationsBehind, null);
98-
}
99-
100204
public TransformCheckpointingInfo(StreamInput in) throws IOException {
101205
last = new TransformCheckpointStats(in);
102206
next = new TransformCheckpointStats(in);
103207
operationsBehind = in.readLong();
104208
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
105209
changesLastDetectedAt = in.readOptionalInstant();
210+
} else {
211+
changesLastDetectedAt = null;
106212
}
107213
}
108214

@@ -122,23 +228,22 @@ public Instant getChangesLastDetectedAt() {
122228
return changesLastDetectedAt;
123229
}
124230

125-
public TransformCheckpointingInfo setChangesLastDetectedAt(Instant changesLastDetectedAt) {
126-
this.changesLastDetectedAt = Instant.ofEpochMilli(Objects.requireNonNull(changesLastDetectedAt).toEpochMilli());
127-
return this;
128-
}
129-
130231
@Override
131232
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
132233
builder.startObject();
133234
builder.field(LAST_CHECKPOINT.getPreferredName(), last);
134235
if (next.getCheckpoint() > 0) {
135236
builder.field(NEXT_CHECKPOINT.getPreferredName(), next);
136237
}
137-
builder.field(OPERATIONS_BEHIND.getPreferredName(), operationsBehind);
238+
if (operationsBehind > 0) {
239+
builder.field(OPERATIONS_BEHIND.getPreferredName(), operationsBehind);
240+
}
138241
if (changesLastDetectedAt != null) {
139-
builder.timeField(CHANGES_LAST_DETECTED_AT.getPreferredName(),
242+
builder.timeField(
243+
CHANGES_LAST_DETECTED_AT.getPreferredName(),
140244
CHANGES_LAST_DETECTED_AT.getPreferredName() + "_string",
141-
changesLastDetectedAt.toEpochMilli());
245+
changesLastDetectedAt.toEpochMilli()
246+
);
142247
}
143248
builder.endObject();
144249
return builder;
@@ -175,10 +280,10 @@ public boolean equals(Object other) {
175280

176281
TransformCheckpointingInfo that = (TransformCheckpointingInfo) other;
177282

178-
return Objects.equals(this.last, that.last) &&
179-
Objects.equals(this.next, that.next) &&
180-
this.operationsBehind == that.operationsBehind &&
181-
Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt);
283+
return Objects.equals(this.last, that.last)
284+
&& Objects.equals(this.next, that.next)
285+
&& this.operationsBehind == that.operationsBehind
286+
&& Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt);
182287
}
183288

184289
@Override

0 commit comments

Comments
 (0)