Skip to content

Commit ac69732

Browse files
committed
[Java] Tidy up after merge of PR #684.
1 parent 28b4fcd commit ac69732

File tree

4 files changed

+53
-37
lines changed

4 files changed

+53
-37
lines changed

aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -139,20 +139,20 @@ public static State get(final AtomicCounter counter)
139139
}
140140

141141
/**
142-
* Get the {@link State} corresponding to a particular value.
142+
* Get the {@link State} corresponding to a particular code.
143143
*
144-
* @param value of the State.
145-
* @return the {@link State} corresponding to the provided value.
146-
* @throws ClusterException if the value does not correspond to a valid State.
144+
* @param code representing a {@link State}.
145+
* @return the {@link State} corresponding to the provided code.
146+
* @throws ClusterException if the code does not correspond to a valid State.
147147
*/
148-
public static State get(final int value)
148+
public static State get(final int code)
149149
{
150-
if (value < 0 || value > (STATES.length - 1))
150+
if (code < 0 || code > (STATES.length - 1))
151151
{
152-
throw new ClusterException("invalid state counter code: " + value);
152+
throw new ClusterException("invalid state counter code: " + code);
153153
}
154154

155-
return STATES[value];
155+
return STATES[code];
156156
}
157157
}
158158

aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModuleAgent.java

-1
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,6 @@ public void onSnapshotRecordings(final long correlationId, final SnapshotRecordi
615615
}
616616
}
617617

618-
@SuppressWarnings("unused")
619618
public void onJoinCluster(final long leadershipTermId, final int memberId)
620619
{
621620
final ClusterMember member = clusterMemberByIdMap.get(memberId);

aeron-samples/src/main/java/io/aeron/samples/AeronStat.java

+7-13
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,7 @@ public static void main(final String[] args) throws Exception
160160
final CncFileReader cncFileReader = CncFileReader.map();
161161

162162
final CounterFilter counterFilter = new CounterFilter(
163-
typeFilter,
164-
identityFilter,
165-
sessionFilter,
166-
streamFilter,
167-
channelFilter);
163+
typeFilter, identityFilter, sessionFilter, streamFilter, channelFilter);
168164

169165
if (watch)
170166
{
@@ -176,17 +172,15 @@ public static void main(final String[] args) throws Exception
176172
}
177173
}
178174

179-
private static void workLoop(final long delayMs, final Runnable printOutput) throws Exception
175+
private static void workLoop(final long delayMs, final Runnable outputPrinter) throws Exception
180176
{
181177
final AtomicBoolean running = new AtomicBoolean(true);
182178
SigInt.register(() -> running.set(false));
183179

184180
do
185181
{
186182
clearScreen();
187-
188-
printOutput.run();
189-
183+
outputPrinter.run();
190184
Thread.sleep(delayMs);
191185
}
192186
while (running.get());
@@ -200,7 +194,7 @@ private static void printOutput(final CncFileReader cncFileReader, final Counter
200194
System.out.println(
201195
" - Aeron Stat (CnC v" + cncFileReader.semanticVersion() + ")" +
202196
", pid " + SystemUtil.getPid() +
203-
", heartbeat " + cncFileReader.driverHeartbeatAge() + "ms");
197+
", heartbeat age " + cncFileReader.driverHeartbeatAgeMs() + "ms");
204198
System.out.println("======================================================================");
205199

206200
final CountersReader counters = cncFileReader.countersReader();
@@ -296,9 +290,9 @@ else if ((typeId >= PUBLISHER_LIMIT_TYPE_ID && typeId <= RECEIVER_POS_TYPE_ID) |
296290
{
297291
return
298292
match(identityFilter, () -> Long.toString(keyBuffer.getLong(REGISTRATION_ID_OFFSET))) &&
299-
match(sessionFilter, () -> Integer.toString(keyBuffer.getInt(SESSION_ID_OFFSET))) &&
300-
match(streamFilter, () -> Integer.toString(keyBuffer.getInt(STREAM_ID_OFFSET))) &&
301-
match(channelFilter, () -> keyBuffer.getStringAscii(CHANNEL_OFFSET));
293+
match(sessionFilter, () -> Integer.toString(keyBuffer.getInt(SESSION_ID_OFFSET))) &&
294+
match(streamFilter, () -> Integer.toString(keyBuffer.getInt(STREAM_ID_OFFSET))) &&
295+
match(channelFilter, () -> keyBuffer.getStringAscii(CHANNEL_OFFSET));
302296
}
303297
else if (typeId >= SEND_CHANNEL_STATUS_TYPE_ID && typeId <= RECEIVE_CHANNEL_STATUS_TYPE_ID)
304298
{

aeron-samples/src/main/java/io/aeron/samples/CncFileReader.java

+38-15
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/*
2+
* Copyright 2014-2019 Real Logic Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.aeron.samples;
217

318
import io.aeron.CncFileDescriptor;
@@ -6,7 +21,8 @@
621
import org.agrona.DirectBuffer;
722
import org.agrona.IoUtil;
823
import org.agrona.SemanticVersion;
9-
import org.agrona.concurrent.ringbuffer.ManyToOneRingBuffer;
24+
import org.agrona.concurrent.UnsafeBuffer;
25+
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;
1026
import org.agrona.concurrent.status.CountersReader;
1127

1228
import java.io.File;
@@ -16,15 +32,16 @@
1632
import static io.aeron.samples.SamplesUtil.mapExistingFileReadOnly;
1733

1834
/**
19-
* A utility class for interpreting the cnc file.
35+
* Reader for Aeron CnC file represented by {@link CncFileDescriptor} which can be used for observability.
2036
*/
2137
public final class CncFileReader implements AutoCloseable
2238
{
23-
private final MappedByteBuffer cncByteBuffer;
39+
private boolean isClosed = false;
2440
private final int cncVersion;
25-
private final CountersReader countersReader;
26-
private final ManyToOneRingBuffer toDriverBuffer;
2741
private final String cncSemanticVersion;
42+
private final MappedByteBuffer cncByteBuffer;
43+
private final CountersReader countersReader;
44+
private final UnsafeBuffer toDriverBuffer;
2845

2946
private CncFileReader(final MappedByteBuffer cncByteBuffer)
3047
{
@@ -46,8 +63,7 @@ private CncFileReader(final MappedByteBuffer cncByteBuffer)
4663
this.cncVersion = cncVersion;
4764
this.cncSemanticVersion = SemanticVersion.toString(cncVersion);
4865

49-
this.toDriverBuffer = new ManyToOneRingBuffer(
50-
CncFileDescriptor.createToDriverBuffer(cncByteBuffer, cncMetaDataBuffer));
66+
this.toDriverBuffer = CncFileDescriptor.createToDriverBuffer(cncByteBuffer, cncMetaDataBuffer);
5167

5268
this.countersReader = new CountersReader(
5369
createCountersMetaDataBuffer(cncByteBuffer, cncMetaDataBuffer),
@@ -64,6 +80,7 @@ public static CncFileReader map()
6480
{
6581
final File cncFile = CommonContext.newDefaultCncFile();
6682
final MappedByteBuffer cncByteBuffer = mapExistingFileReadOnly(cncFile);
83+
6784
return new CncFileReader(cncByteBuffer);
6885
}
6986

@@ -98,28 +115,34 @@ public CountersReader countersReader()
98115
}
99116

100117
/**
101-
* Get the timestamp (ms) of the last driver heartbeat.
118+
* Get the epoch timestamp (ms) of the last driver heartbeat.
102119
*
103-
* @return the timestamp (ms) of the last driver heartbeat.
120+
* @return the epoch timestamp (ms) of the last driver heartbeat.
104121
*/
105-
public long driverHeartbeat()
122+
public long driverHeartbeatMs()
106123
{
107-
return toDriverBuffer.consumerHeartbeatTime();
124+
final int timestampOffset = (toDriverBuffer.capacity() - RingBufferDescriptor.TRAILER_LENGTH) +
125+
RingBufferDescriptor.CONSUMER_HEARTBEAT_OFFSET;
126+
127+
return toDriverBuffer.getLongVolatile(timestampOffset);
108128
}
109129

110130
/**
111131
* Get the number of milliseconds since the last driver heartbeat.
112132
*
113133
* @return the number of milliseconds since the last driver heartbeat.
114134
*/
115-
public long driverHeartbeatAge()
135+
public long driverHeartbeatAgeMs()
116136
{
117-
return System.currentTimeMillis() - driverHeartbeat();
137+
return System.currentTimeMillis() - driverHeartbeatMs();
118138
}
119139

120-
@Override
121140
public void close()
122141
{
123-
IoUtil.unmap(cncByteBuffer);
142+
if (!isClosed)
143+
{
144+
isClosed = true;
145+
IoUtil.unmap(cncByteBuffer);
146+
}
124147
}
125148
}

0 commit comments

Comments
 (0)