Skip to content

Commit 2fa3b81

Browse files
committed
[Java] Update consumer position timestamp even if it has not moved but producer is not in a potentially blocked position. Issue #377.
1 parent 1cff0ce commit 2fa3b81

File tree

3 files changed

+10
-19
lines changed

3 files changed

+10
-19
lines changed

aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1070,8 +1070,8 @@ private IpcPublication addIpcPublication(
10701070
PublisherLimit.allocate(countersManager, registrationId, sessionId, streamId, channel),
10711071
rawLog,
10721072
publicationUnblockTimeoutNs,
1073+
nanoClock.nanoTime(),
10731074
context.systemCounters(),
1074-
nanoClock,
10751075
isExclusive);
10761076

10771077
ipcPublications.add(publication);

aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java

+7-15
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import io.aeron.logbuffer.LogBufferDescriptor;
2121
import io.aeron.logbuffer.LogBufferUnblocker;
2222
import org.agrona.collections.ArrayUtil;
23-
import org.agrona.concurrent.NanoClock;
2423
import org.agrona.concurrent.UnsafeBuffer;
2524
import org.agrona.concurrent.status.AtomicCounter;
2625
import org.agrona.concurrent.status.Position;
@@ -54,7 +53,7 @@ enum Status
5453
private long tripLimit;
5554
private long consumerPosition;
5655
private long lastConsumerPosition;
57-
private long timeOfLastConsumerPositionChangeNs;
56+
private long timeOfLastConsumerPositionUpdateNs;
5857
private long cleanPosition;
5958
private long timeOfLastStatusChangeNs;
6059
private int refCount = 0;
@@ -65,7 +64,6 @@ enum Status
6564
private ReadablePosition[] subscriberPositions = EMPTY_POSITIONS;
6665
private final Position publisherLimit;
6766
private final UnsafeBuffer metaDataBuffer;
68-
private final NanoClock nanoClock;
6967
private final RawLog rawLog;
7068
private final AtomicCounter unblockedPublications;
7169

@@ -76,8 +74,8 @@ public IpcPublication(
7674
final Position publisherLimit,
7775
final RawLog rawLog,
7876
final long unblockTimeoutNs,
77+
final long nowNs,
7978
final SystemCounters systemCounters,
80-
final NanoClock nanoClock,
8179
final boolean isExclusive)
8280
{
8381
this.registrationId = registrationId;
@@ -86,7 +84,6 @@ public IpcPublication(
8684
this.isExclusive = isExclusive;
8785
this.termBuffers = rawLog.termBuffers();
8886
this.initialTermId = initialTermId(rawLog.metaData());
89-
this.nanoClock = nanoClock;
9087

9188
final int termLength = rawLog.termLength();
9289
this.termBufferLength = termLength;
@@ -102,7 +99,7 @@ public IpcPublication(
10299
consumerPosition = producerPosition();
103100
lastConsumerPosition = consumerPosition;
104101
cleanPosition = consumerPosition;
105-
timeOfLastConsumerPositionChangeNs = nanoClock.nanoTime();
102+
timeOfLastConsumerPositionUpdateNs = nowNs;
106103
}
107104

108105
public int sessionId()
@@ -148,11 +145,6 @@ public void close()
148145

149146
public void addSubscriber(final ReadablePosition subscriberPosition)
150147
{
151-
if (subscriberPositions.length == 0)
152-
{
153-
timeOfLastConsumerPositionChangeNs = nanoClock.nanoTime();
154-
}
155-
156148
subscriberPositions = ArrayUtil.add(subscriberPositions, subscriberPosition);
157149
}
158150

@@ -326,10 +318,10 @@ private boolean isDrained()
326318

327319
private void checkForBlockedPublisher(final long timeNs)
328320
{
329-
if (consumerPosition == lastConsumerPosition)
321+
final long consumerPosition = this.consumerPosition;
322+
if (consumerPosition == lastConsumerPosition && producerPosition() > consumerPosition)
330323
{
331-
if (timeNs > (timeOfLastConsumerPositionChangeNs + unblockTimeoutNs) &&
332-
producerPosition() > consumerPosition)
324+
if (timeNs > (timeOfLastConsumerPositionUpdateNs + unblockTimeoutNs))
333325
{
334326
if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, consumerPosition))
335327
{
@@ -339,7 +331,7 @@ private void checkForBlockedPublisher(final long timeNs)
339331
}
340332
else
341333
{
342-
timeOfLastConsumerPositionChangeNs = timeNs;
334+
timeOfLastConsumerPositionUpdateNs = timeNs;
343335
lastConsumerPosition = consumerPosition;
344336
}
345337
}

aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -564,10 +564,9 @@ private void cleanBuffer(final long publisherLimit)
564564

565565
private void checkForBlockedPublisher(final long timeNs, final long senderPosition)
566566
{
567-
if (senderPosition == lastSenderPosition)
567+
if (senderPosition == lastSenderPosition && producerPosition() > senderPosition)
568568
{
569-
if (timeNs > (timeOfLastActivityNs + unblockTimeoutNs) &&
570-
producerPosition() > senderPosition)
569+
if (timeNs > (timeOfLastActivityNs + unblockTimeoutNs))
571570
{
572571
if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, senderPosition))
573572
{

0 commit comments

Comments
 (0)