Skip to content

Commit 7a4722b

Browse files
authored
Merge pull request #740 from wlukowicz/fix-archive-instrumentation
[Java] Fix archive instrumentation.
2 parents 7a70718 + df4731b commit 7a4722b

File tree

2 files changed

+121
-1
lines changed

2 files changed

+121
-1
lines changed

aeron-agent/src/main/java/io/aeron/agent/EventLogAgent.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ private static AgentBuilder addDriverInstrumentation(final AgentBuilder agentBui
226226
private static AgentBuilder addArchiveInstrumentation(final AgentBuilder agentBuilder)
227227
{
228228
return agentBuilder
229-
.type(nameEndsWith("ControlRequestAdapter"))
229+
.type(nameEndsWith("ControlSessionDemuxer"))
230230
.transform(((builder, typeDescription, classLoader, module) -> builder
231231
.visit(to(ControlRequestInterceptor.ControlRequest.class)
232232
.on(named("onFragment")))));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package io.aeron.agent;
2+
3+
import static io.aeron.agent.EventConfiguration.EVENT_READER_FRAME_LIMIT;
4+
import static io.aeron.agent.EventConfiguration.EVENT_RING_BUFFER;
5+
6+
import io.aeron.archive.Archive;
7+
import io.aeron.archive.ArchiveThreadingMode;
8+
import io.aeron.archive.ArchivingMediaDriver;
9+
import io.aeron.archive.client.AeronArchive;
10+
import io.aeron.driver.MediaDriver;
11+
import io.aeron.driver.ThreadingMode;
12+
13+
import java.io.File;
14+
import java.nio.file.Paths;
15+
import java.util.concurrent.CountDownLatch;
16+
17+
import net.bytebuddy.agent.ByteBuddyAgent;
18+
import org.agrona.CloseHelper;
19+
import org.agrona.IoUtil;
20+
import org.agrona.MutableDirectBuffer;
21+
import org.agrona.concurrent.Agent;
22+
import org.agrona.concurrent.MessageHandler;
23+
import org.junit.After;
24+
import org.junit.Before;
25+
import org.junit.Test;
26+
27+
public class ArchiveLoggingAgentTest
28+
{
29+
private static final CountDownLatch LATCH = new CountDownLatch(1);
30+
31+
private String testDirName;
32+
private ArchivingMediaDriver archivingMediaDriver;
33+
private AeronArchive aeronArchive;
34+
35+
@Before
36+
public void before()
37+
{
38+
System.setProperty(EventConfiguration.ENABLED_ARCHIVE_EVENT_CODES_PROP_NAME, "all");
39+
System.setProperty(EventLogAgent.READER_CLASSNAME_PROP_NAME, StubEventLogReaderAgent.class.getName());
40+
EventLogAgent.agentmain("", ByteBuddyAgent.install());
41+
}
42+
43+
@After
44+
public void after()
45+
{
46+
EventLogAgent.removeTransformer();
47+
System.clearProperty(EventConfiguration.ENABLED_ARCHIVE_EVENT_CODES_PROP_NAME);
48+
System.clearProperty(EventLogAgent.READER_CLASSNAME_PROP_NAME);
49+
50+
CloseHelper.close(aeronArchive);
51+
CloseHelper.close(archivingMediaDriver);
52+
53+
if (testDirName != null)
54+
{
55+
IoUtil.delete(new File(testDirName), false);
56+
}
57+
}
58+
59+
@Test(timeout = 10_000L)
60+
public void shouldLogMessages() throws Exception
61+
{
62+
testDirName = Paths.get(IoUtil.tmpDirName(), "archive-test").toString();
63+
final File testDir = new File(testDirName);
64+
if (testDir.exists())
65+
{
66+
IoUtil.delete(testDir, false);
67+
}
68+
69+
final String aeronDirectoryName = Paths.get(testDirName, "media").toString();
70+
71+
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
72+
.errorHandler(Throwable::printStackTrace)
73+
.aeronDirectoryName(aeronDirectoryName)
74+
.threadingMode(ThreadingMode.SHARED);
75+
76+
final AeronArchive.Context aeronArchiveContext = new AeronArchive.Context()
77+
.aeronDirectoryName(aeronDirectoryName)
78+
.controlRequestChannel("aeron:udp?term-length=64k|endpoint=localhost:8010")
79+
.controlRequestStreamId(100)
80+
.controlResponseChannel("aeron:udp?term-length=64k|endpoint=localhost:8020")
81+
.controlResponseStreamId(101)
82+
.recordingEventsChannel("aeron:udp?control-mode=dynamic|control=localhost:8030");
83+
84+
final Archive.Context archiveCtx = new Archive.Context()
85+
.aeronDirectoryName(aeronDirectoryName)
86+
.errorHandler(Throwable::printStackTrace)
87+
.archiveDir(new File(testDirName, "archive"))
88+
.controlChannel(aeronArchiveContext.controlRequestChannel())
89+
.controlStreamId(aeronArchiveContext.controlRequestStreamId())
90+
.localControlStreamId(aeronArchiveContext.controlRequestStreamId())
91+
.recordingEventsChannel(aeronArchiveContext.recordingEventsChannel())
92+
.threadingMode(ArchiveThreadingMode.SHARED);
93+
94+
archivingMediaDriver = ArchivingMediaDriver.launch(mediaDriverCtx, archiveCtx);
95+
aeronArchive = AeronArchive.connect(aeronArchiveContext);
96+
97+
LATCH.await();
98+
}
99+
100+
public static class StubEventLogReaderAgent implements Agent, MessageHandler
101+
{
102+
public String roleName()
103+
{
104+
return "event-log-reader";
105+
}
106+
107+
public int doWork()
108+
{
109+
return EVENT_RING_BUFFER.read(this, EVENT_READER_FRAME_LIMIT);
110+
}
111+
112+
public void onMessage(final int msgTypeId, final MutableDirectBuffer buffer, final int index, final int length)
113+
{
114+
if (ArchiveEventLogger.toEventCodeId(ArchiveEventCode.CMD_IN_CONNECT) == msgTypeId)
115+
{
116+
LATCH.countDown();
117+
}
118+
}
119+
}
120+
}

0 commit comments

Comments
 (0)