Skip to content

Commit 1cff0ce

Browse files
committed
[Java] Set the time of last consumer position change to be construction of the publication and on transition from 0 to 1 subscribers for IPC. This should avoid the publisher getting unblocked unexpectedly. Issue #377.
1 parent 39fa50d commit 1cff0ce

File tree

3 files changed

+28
-16
lines changed

3 files changed

+28
-16
lines changed

aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java

+1
Original file line numberDiff line numberDiff line change
@@ -1071,6 +1071,7 @@ private IpcPublication addIpcPublication(
10711071
rawLog,
10721072
publicationUnblockTimeoutNs,
10731073
context.systemCounters(),
1074+
nanoClock,
10741075
isExclusive);
10751076

10761077
ipcPublications.add(publication);

aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java

+24-14
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.aeron.logbuffer.LogBufferDescriptor;
2121
import io.aeron.logbuffer.LogBufferUnblocker;
2222
import org.agrona.collections.ArrayUtil;
23+
import org.agrona.concurrent.NanoClock;
2324
import org.agrona.concurrent.UnsafeBuffer;
2425
import org.agrona.concurrent.status.AtomicCounter;
2526
import org.agrona.concurrent.status.Position;
@@ -50,12 +51,12 @@ enum Status
5051
private final int termWindowLength;
5152
private final int positionBitsToShift;
5253
private final int initialTermId;
53-
private long tripLimit = 0;
54-
private long consumerPosition = 0;
55-
private long lastConsumerPosition = 0;
56-
private long timeOfLastConsumerPositionChange = 0;
57-
private long cleanPosition = 0;
58-
private long timeOfLastStatusChange = 0;
54+
private long tripLimit;
55+
private long consumerPosition;
56+
private long lastConsumerPosition;
57+
private long timeOfLastConsumerPositionChangeNs;
58+
private long cleanPosition;
59+
private long timeOfLastStatusChangeNs;
5960
private int refCount = 0;
6061
private boolean reachedEndOfLife = false;
6162
private final boolean isExclusive;
@@ -64,6 +65,7 @@ enum Status
6465
private ReadablePosition[] subscriberPositions = EMPTY_POSITIONS;
6566
private final Position publisherLimit;
6667
private final UnsafeBuffer metaDataBuffer;
68+
private final NanoClock nanoClock;
6769
private final RawLog rawLog;
6870
private final AtomicCounter unblockedPublications;
6971

@@ -75,6 +77,7 @@ public IpcPublication(
7577
final RawLog rawLog,
7678
final long unblockTimeoutNs,
7779
final SystemCounters systemCounters,
80+
final NanoClock nanoClock,
7881
final boolean isExclusive)
7982
{
8083
this.registrationId = registrationId;
@@ -83,6 +86,7 @@ public IpcPublication(
8386
this.isExclusive = isExclusive;
8487
this.termBuffers = rawLog.termBuffers();
8588
this.initialTermId = initialTermId(rawLog.metaData());
89+
this.nanoClock = nanoClock;
8690

8791
final int termLength = rawLog.termLength();
8892
this.termBufferLength = termLength;
@@ -98,6 +102,7 @@ public IpcPublication(
98102
consumerPosition = producerPosition();
99103
lastConsumerPosition = consumerPosition;
100104
cleanPosition = consumerPosition;
105+
timeOfLastConsumerPositionChangeNs = nanoClock.nanoTime();
101106
}
102107

103108
public int sessionId()
@@ -143,6 +148,11 @@ public void close()
143148

144149
public void addSubscriber(final ReadablePosition subscriberPosition)
145150
{
151+
if (subscriberPositions.length == 0)
152+
{
153+
timeOfLastConsumerPositionChangeNs = nanoClock.nanoTime();
154+
}
155+
146156
subscriberPositions = ArrayUtil.add(subscriberPositions, subscriberPosition);
147157
}
148158

@@ -236,13 +246,13 @@ public void onTimeEvent(final long timeNs, final long timeMs, final DriverConduc
236246
if (isDrained())
237247
{
238248
status = Status.LINGER;
239-
timeOfLastStatusChange = timeNs;
249+
timeOfLastStatusChangeNs = timeNs;
240250
conductor.transitionToLinger(this);
241251
}
242252
break;
243253

244254
case LINGER:
245-
if (timeNs > (timeOfLastStatusChange + PUBLICATION_LINGER_NS))
255+
if (timeNs > (timeOfLastStatusChangeNs + PUBLICATION_LINGER_NS))
246256
{
247257
reachedEndOfLife = true;
248258
conductor.cleanupIpcPublication(this);
@@ -256,14 +266,14 @@ public boolean hasReachedEndOfLife()
256266
return reachedEndOfLife;
257267
}
258268

259-
public void timeOfLastStateChange(final long time)
269+
public void timeOfLastStateChange(final long timeNs)
260270
{
261-
timeOfLastStatusChange = time;
271+
timeOfLastStatusChangeNs = timeNs;
262272
}
263273

264274
public long timeOfLastStateChange()
265275
{
266-
return timeOfLastStatusChange;
276+
return timeOfLastStatusChangeNs;
267277
}
268278

269279
public void delete()
@@ -318,8 +328,8 @@ private void checkForBlockedPublisher(final long timeNs)
318328
{
319329
if (consumerPosition == lastConsumerPosition)
320330
{
321-
if (producerPosition() > consumerPosition &&
322-
timeNs > (timeOfLastConsumerPositionChange + unblockTimeoutNs))
331+
if (timeNs > (timeOfLastConsumerPositionChangeNs + unblockTimeoutNs) &&
332+
producerPosition() > consumerPosition)
323333
{
324334
if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, consumerPosition))
325335
{
@@ -329,7 +339,7 @@ private void checkForBlockedPublisher(final long timeNs)
329339
}
330340
else
331341
{
332-
timeOfLastConsumerPositionChange = timeNs;
342+
timeOfLastConsumerPositionChangeNs = timeNs;
333343
lastConsumerPosition = consumerPosition;
334344
}
335345
}

aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ public NetworkPublication(
198198

199199
lastSenderPosition = senderPosition.get();
200200
cleanPosition = lastSenderPosition;
201+
timeOfLastActivityNs = nowNs;
201202
}
202203

203204
public void close()
@@ -565,8 +566,8 @@ private void checkForBlockedPublisher(final long timeNs, final long senderPositi
565566
{
566567
if (senderPosition == lastSenderPosition)
567568
{
568-
if (producerPosition() > senderPosition &&
569-
timeNs > (timeOfLastActivityNs + unblockTimeoutNs))
569+
if (timeNs > (timeOfLastActivityNs + unblockTimeoutNs) &&
570+
producerPosition() > senderPosition)
570571
{
571572
if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, senderPosition))
572573
{

0 commit comments

Comments
 (0)