Skip to content

Commit 263349f

Browse files
authored
Decouple TimeValue from Elasticsearch server classes (#29454)
* Decouple TimeValue from Elasticsearch server classes This commit decouples the `TimeValue` class from the other server classes. This is in preperation to move `TimeValue` into the `elasticsearch-core` jar, allowing us to use it from projects that cannot depend on the elasticsearch-core library. Relates to #28504
1 parent 0a21533 commit 263349f

27 files changed

+180
-129
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public ClusterHealthRequest(StreamInput in) throws IOException {
6464
indices[i] = in.readString();
6565
}
6666
}
67-
timeout = new TimeValue(in);
67+
timeout = in.readTimeValue();
6868
if (in.readBoolean()) {
6969
waitForStatus = ClusterHealthStatus.fromValue(in.readByte());
7070
}
@@ -90,7 +90,7 @@ public void writeTo(StreamOutput out) throws IOException {
9090
out.writeString(index);
9191
}
9292
}
93-
timeout.writeTo(out);
93+
out.writeTimeValue(timeout);
9494
if (waitForStatus == null) {
9595
out.writeBoolean(false);
9696
} else {

server/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthResponse.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public void readFrom(StreamInput in) throws IOException {
184184
timedOut = in.readBoolean();
185185
numberOfInFlightFetch = in.readInt();
186186
delayedUnassignedShards= in.readInt();
187-
taskMaxWaitingTime = new TimeValue(in);
187+
taskMaxWaitingTime = in.readTimeValue();
188188
}
189189

190190
@Override
@@ -197,7 +197,7 @@ public void writeTo(StreamOutput out) throws IOException {
197197
out.writeBoolean(timedOut);
198198
out.writeInt(numberOfInFlightFetch);
199199
out.writeInt(delayedUnassignedShards);
200-
taskMaxWaitingTime.writeTo(out);
200+
out.writeTimeValue(taskMaxWaitingTime);
201201
}
202202

203203
@Override

server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public void readFrom(StreamInput in) throws IOException {
9999
threads = in.readInt();
100100
ignoreIdleThreads = in.readBoolean();
101101
type = in.readString();
102-
interval = new TimeValue(in);
102+
interval = in.readTimeValue();
103103
snapshots = in.readInt();
104104
}
105105

@@ -109,7 +109,7 @@ public void writeTo(StreamOutput out) throws IOException {
109109
out.writeInt(threads);
110110
out.writeBoolean(ignoreIdleThreads);
111111
out.writeString(type);
112-
interval.writeTo(out);
112+
out.writeTimeValue(interval);
113113
out.writeInt(snapshots);
114114
}
115115
}

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,15 @@ public ActionRequestValidationException validate() {
105105
public void readFrom(StreamInput in) throws IOException {
106106
super.readFrom(in);
107107
taskId = TaskId.readFromStream(in);
108-
timeout = in.readOptionalWriteable(TimeValue::new);
108+
timeout = in.readOptionalTimeValue();
109109
waitForCompletion = in.readBoolean();
110110
}
111111

112112
@Override
113113
public void writeTo(StreamOutput out) throws IOException {
114114
super.writeTo(out);
115115
taskId.writeTo(out);
116-
out.writeOptionalWriteable(timeout);
116+
out.writeOptionalTimeValue(timeout);
117117
out.writeBoolean(waitForCompletion);
118118
}
119119
}

server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ public void readFrom(StreamInput in) throws IOException {
588588
requests.add(DocWriteRequest.readDocumentRequest(in));
589589
}
590590
refreshPolicy = RefreshPolicy.readFrom(in);
591-
timeout = new TimeValue(in);
591+
timeout = in.readTimeValue();
592592
}
593593

594594
@Override
@@ -600,7 +600,7 @@ public void writeTo(StreamOutput out) throws IOException {
600600
DocWriteRequest.writeDocumentRequest(out, request);
601601
}
602602
refreshPolicy.writeTo(out);
603-
timeout.writeTo(out);
603+
out.writeTimeValue(timeout);
604604
}
605605

606606
@Override

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ public void readFrom(StreamInput in) throws IOException {
511511
}
512512
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
513513
in.readOptionalString(); // timestamp
514-
in.readOptionalWriteable(TimeValue::new); // ttl
514+
in.readOptionalTimeValue(); // ttl
515515
}
516516
source = in.readBytesReference();
517517
opType = OpType.fromId(in.readByte());

server/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,13 @@ public TimeValue ackTimeout() {
7878
@Override
7979
public void readFrom(StreamInput in) throws IOException {
8080
super.readFrom(in);
81-
timeout = new TimeValue(in);
81+
timeout = in.readTimeValue();
8282
}
8383

8484
@Override
8585
public void writeTo(StreamOutput out) throws IOException {
8686
super.writeTo(out);
87-
timeout.writeTo(out);
87+
out.writeTimeValue(timeout);
8888
}
8989

9090
}

server/src/main/java/org/elasticsearch/action/support/master/MasterNodeRequest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@ protected MasterNodeRequest() {
4040

4141
protected MasterNodeRequest(StreamInput in) throws IOException {
4242
super(in);
43-
masterNodeTimeout = new TimeValue(in);
43+
masterNodeTimeout = in.readTimeValue();
4444
}
4545

4646
@Override
4747
public void writeTo(StreamOutput out) throws IOException {
4848
super.writeTo(out);
49-
masterNodeTimeout.writeTo(out);
49+
out.writeTimeValue(masterNodeTimeout);
5050
}
5151

5252
/**
@@ -74,6 +74,6 @@ public void readFrom(StreamInput in) throws IOException {
7474
// TODO(talevy): throw exception once all MasterNodeRequest
7575
// subclasses have been migrated to Writeable Readers
7676
super.readFrom(in);
77-
masterNodeTimeout = new TimeValue(in);
77+
masterNodeTimeout = in.readTimeValue();
7878
}
7979
}

server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,14 +107,14 @@ public void readFrom(StreamInput in) throws IOException {
107107
super.readFrom(in);
108108
nodesIds = in.readStringArray();
109109
concreteNodes = in.readOptionalArray(DiscoveryNode::new, DiscoveryNode[]::new);
110-
timeout = in.readOptionalWriteable(TimeValue::new);
110+
timeout = in.readOptionalTimeValue();
111111
}
112112

113113
@Override
114114
public void writeTo(StreamOutput out) throws IOException {
115115
super.writeTo(out);
116116
out.writeStringArrayNullable(nodesIds);
117117
out.writeOptionalArray(concreteNodes);
118-
out.writeOptionalWriteable(timeout);
118+
out.writeOptionalTimeValue(timeout);
119119
}
120120
}

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ public void readFrom(StreamInput in) throws IOException {
187187
shardId = null;
188188
}
189189
waitForActiveShards = ActiveShardCount.readFrom(in);
190-
timeout = new TimeValue(in);
190+
timeout = in.readTimeValue();
191191
index = in.readString();
192192
routedBasedOnClusterVersion = in.readVLong();
193193
}
@@ -202,7 +202,7 @@ public void writeTo(StreamOutput out) throws IOException {
202202
out.writeBoolean(false);
203203
}
204204
waitForActiveShards.writeTo(out);
205-
timeout.writeTo(out);
205+
out.writeTimeValue(timeout);
206206
out.writeString(index);
207207
out.writeVLong(routedBasedOnClusterVersion);
208208
}

server/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public void readFrom(StreamInput in) throws IOException {
118118
} else {
119119
shardId = null;
120120
}
121-
timeout = new TimeValue(in);
121+
timeout = in.readTimeValue();
122122
concreteIndex = in.readOptionalString();
123123
}
124124

@@ -127,7 +127,7 @@ public void writeTo(StreamOutput out) throws IOException {
127127
super.writeTo(out);
128128
out.writeString(index);
129129
out.writeOptionalStreamable(shardId);
130-
timeout.writeTo(out);
130+
out.writeTimeValue(timeout);
131131
out.writeOptionalString(concreteIndex);
132132
}
133133

server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public void readFrom(StreamInput in) throws IOException {
144144
parentTaskId = TaskId.readFromStream(in);
145145
nodes = in.readStringArray();
146146
actions = in.readStringArray();
147-
timeout = in.readOptionalWriteable(TimeValue::new);
147+
timeout = in.readOptionalTimeValue();
148148
}
149149

150150
@Override
@@ -154,7 +154,7 @@ public void writeTo(StreamOutput out) throws IOException {
154154
parentTaskId.writeTo(out);
155155
out.writeStringArrayNullable(nodes);
156156
out.writeStringArrayNullable(actions);
157-
out.writeOptionalWriteable(timeout);
157+
out.writeOptionalTimeValue(timeout);
158158
}
159159

160160
public boolean match(Task task) {

server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.common.bytes.BytesReference;
3737
import org.elasticsearch.common.geo.GeoPoint;
3838
import org.elasticsearch.common.text.Text;
39+
import org.elasticsearch.common.unit.TimeValue;
3940
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
4041
import org.joda.time.DateTime;
4142
import org.joda.time.DateTimeZone;
@@ -65,6 +66,7 @@
6566
import java.util.List;
6667
import java.util.Locale;
6768
import java.util.Map;
69+
import java.util.concurrent.TimeUnit;
6870
import java.util.function.IntFunction;
6971
import java.util.function.Supplier;
7072

@@ -82,6 +84,26 @@
8284
* on {@link StreamInput}.
8385
*/
8486
public abstract class StreamInput extends InputStream {
87+
88+
private static final Map<Byte, TimeUnit> BYTE_TIME_UNIT_MAP;
89+
90+
static {
91+
final Map<Byte, TimeUnit> byteTimeUnitMap = new HashMap<>();
92+
byteTimeUnitMap.put((byte)0, TimeUnit.NANOSECONDS);
93+
byteTimeUnitMap.put((byte)1, TimeUnit.MICROSECONDS);
94+
byteTimeUnitMap.put((byte)2, TimeUnit.MILLISECONDS);
95+
byteTimeUnitMap.put((byte)3, TimeUnit.SECONDS);
96+
byteTimeUnitMap.put((byte)4, TimeUnit.MINUTES);
97+
byteTimeUnitMap.put((byte)5, TimeUnit.HOURS);
98+
byteTimeUnitMap.put((byte)6, TimeUnit.DAYS);
99+
100+
for (TimeUnit value : TimeUnit.values()) {
101+
assert byteTimeUnitMap.containsValue(value) : value;
102+
}
103+
104+
BYTE_TIME_UNIT_MAP = Collections.unmodifiableMap(byteTimeUnitMap);
105+
}
106+
85107
private Version version = Version.CURRENT;
86108

87109
/**
@@ -971,4 +993,24 @@ private int readArraySize() throws IOException {
971993
* be a no-op depending on the underlying implementation if the information of the remaining bytes is not present.
972994
*/
973995
protected abstract void ensureCanReadBytes(int length) throws EOFException;
996+
997+
/**
998+
* Read a {@link TimeValue} from the stream
999+
*/
1000+
public TimeValue readTimeValue() throws IOException {
1001+
long duration = readZLong();
1002+
TimeUnit timeUnit = BYTE_TIME_UNIT_MAP.get(readByte());
1003+
return new TimeValue(duration, timeUnit);
1004+
}
1005+
1006+
/**
1007+
* Read an optional {@link TimeValue} from the stream, returning null if no TimeValue was written.
1008+
*/
1009+
public @Nullable TimeValue readOptionalTimeValue() throws IOException {
1010+
if (readBoolean()) {
1011+
return readTimeValue();
1012+
} else {
1013+
return null;
1014+
}
1015+
}
9741016
}

server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.common.geo.GeoPoint;
3636
import org.elasticsearch.common.io.stream.Writeable.Writer;
3737
import org.elasticsearch.common.text.Text;
38+
import org.elasticsearch.common.unit.TimeValue;
3839
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
3940
import org.joda.time.DateTimeZone;
4041
import org.joda.time.ReadableInstant;
@@ -54,11 +55,13 @@
5455
import java.time.ZonedDateTime;
5556
import java.util.Collections;
5657
import java.util.Date;
58+
import java.util.EnumMap;
5759
import java.util.HashMap;
5860
import java.util.Iterator;
5961
import java.util.LinkedHashMap;
6062
import java.util.List;
6163
import java.util.Map;
64+
import java.util.concurrent.TimeUnit;
6265
import java.util.function.IntFunction;
6366

6467
/**
@@ -74,6 +77,25 @@
7477
*/
7578
public abstract class StreamOutput extends OutputStream {
7679

80+
private static final Map<TimeUnit, Byte> TIME_UNIT_BYTE_MAP;
81+
82+
static {
83+
final Map<TimeUnit, Byte> timeUnitByteMap = new EnumMap<>(TimeUnit.class);
84+
timeUnitByteMap.put(TimeUnit.NANOSECONDS, (byte)0);
85+
timeUnitByteMap.put(TimeUnit.MICROSECONDS, (byte)1);
86+
timeUnitByteMap.put(TimeUnit.MILLISECONDS, (byte)2);
87+
timeUnitByteMap.put(TimeUnit.SECONDS, (byte)3);
88+
timeUnitByteMap.put(TimeUnit.MINUTES, (byte)4);
89+
timeUnitByteMap.put(TimeUnit.HOURS, (byte)5);
90+
timeUnitByteMap.put(TimeUnit.DAYS, (byte)6);
91+
92+
for (TimeUnit value : TimeUnit.values()) {
93+
assert timeUnitByteMap.containsKey(value) : value;
94+
}
95+
96+
TIME_UNIT_BYTE_MAP = Collections.unmodifiableMap(timeUnitByteMap);
97+
}
98+
7799
private Version version = Version.CURRENT;
78100

79101
/**
@@ -973,4 +995,24 @@ public <E extends Enum<E>> void writeEnum(E enumValue) throws IOException {
973995
writeVInt(enumValue.ordinal());
974996
}
975997

998+
/**
999+
* Write a {@link TimeValue} to the stream
1000+
*/
1001+
public void writeTimeValue(TimeValue timeValue) throws IOException {
1002+
writeZLong(timeValue.duration());
1003+
writeByte(TIME_UNIT_BYTE_MAP.get(timeValue.timeUnit()));
1004+
}
1005+
1006+
/**
1007+
* Write an optional {@link TimeValue} to the stream.
1008+
*/
1009+
public void writeOptionalTimeValue(@Nullable TimeValue timeValue) throws IOException {
1010+
if (timeValue == null) {
1011+
writeBoolean(false);
1012+
} else {
1013+
writeBoolean(true);
1014+
writeTimeValue(timeValue);
1015+
}
1016+
}
1017+
9761018
}

0 commit comments

Comments
 (0)