Skip to content

Commit 5d04552

Browse files
authored
Update MultiLangDaemon to Support New Features (#118)
Updated the MultiLangDaemon to use the v2 record processor interfaces, and added features to messages passed to MultiLangDaemon clients. These changes will require updates to the various MultiLangDaemon clients. The changes for the Python version are complete, and other versions will be updated later.
1 parent ed7d069 commit 5d04552

18 files changed

+476
-362
lines changed

Diff for: pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,13 @@
6363
<version>2.6</version>
6464
</dependency>
6565

66+
<dependency>
67+
<groupId>org.projectlombok</groupId>
68+
<artifactId>lombok</artifactId>
69+
<version>1.16.10</version>
70+
<scope>provided</scope>
71+
</dependency>
72+
6673
<!-- Test -->
6774
<dependency>
6875
<groupId>junit</groupId>

Diff for: src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public class KinesisClientLibConfiguration {
120120
/**
121121
* User agent set when Amazon Kinesis Client Library makes AWS requests.
122122
*/
123-
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.1-SNAPSHOT";
123+
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.7.2";
124124

125125
/**
126126
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls

Diff for: src/main/java/com/amazonaws/services/kinesis/multilang/MessageWriter.java

+19-12
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.io.IOException;
1919
import java.io.OutputStream;
2020
import java.io.OutputStreamWriter;
21-
import java.util.List;
2221
import java.util.concurrent.Callable;
2322
import java.util.concurrent.ExecutorService;
2423
import java.util.concurrent.Future;
@@ -27,7 +26,8 @@
2726
import org.apache.commons.logging.LogFactory;
2827

2928
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
30-
import com.amazonaws.services.kinesis.model.Record;
29+
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
30+
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
3131
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
3232
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
3333
import com.amazonaws.services.kinesis.multilang.messages.Message;
@@ -119,19 +119,21 @@ private Future<Boolean> writeMessage(Message message) {
119119
/**
120120
* Writes an {@link InitializeMessage} to the subprocess.
121121
*
122-
* @param shardIdToWrite The shard id.
122+
* @param initializationInput
123+
* contains information about the shard being initialized
123124
*/
124-
Future<Boolean> writeInitializeMessage(String shardIdToWrite) {
125-
return writeMessage(new InitializeMessage(shardIdToWrite));
125+
Future<Boolean> writeInitializeMessage(InitializationInput initializationInput) {
126+
return writeMessage(new InitializeMessage(initializationInput));
126127
}
127128

128129
/**
129130
* Writes a {@link ProcessRecordsMessage} message to the subprocess.
130131
*
131-
* @param records The records to be processed.
132+
* @param processRecordsInput
133+
* the records, and associated metadata to be processed.
132134
*/
133-
Future<Boolean> writeProcessRecordsMessage(List<Record> records) {
134-
return writeMessage(new ProcessRecordsMessage(records));
135+
Future<Boolean> writeProcessRecordsMessage(ProcessRecordsInput processRecordsInput) {
136+
return writeMessage(new ProcessRecordsMessage(processRecordsInput));
135137
}
136138

137139
/**
@@ -146,11 +148,16 @@ Future<Boolean> writeShutdownMessage(ShutdownReason reason) {
146148
/**
147149
* Writes a {@link CheckpointMessage} to the subprocess.
148150
*
149-
* @param sequenceNumber The sequence number that was checkpointed.
150-
* @param throwable The exception that was thrown by a checkpoint attempt. Null if one didn't occur.
151+
* @param sequenceNumber
152+
* The sequence number that was checkpointed.
153+
* @param subSequenceNumber
154+
* the sub sequence number to checkpoint at.
155+
* @param throwable
156+
* The exception that was thrown by a checkpoint attempt. Null if one didn't occur.
151157
*/
152-
Future<Boolean> writeCheckpointMessageWithError(String sequenceNumber, Throwable throwable) {
153-
return writeMessage(new CheckpointMessage(sequenceNumber, throwable));
158+
Future<Boolean> writeCheckpointMessageWithError(String sequenceNumber, Long subSequenceNumber,
159+
Throwable throwable) {
160+
return writeMessage(new CheckpointMessage(sequenceNumber, subSequenceNumber, throwable));
154161
}
155162

156163
/**

Diff for: src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.commons.logging.Log;
2525
import org.apache.commons.logging.LogFactory;
2626

27+
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
2728
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
2829
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
2930

@@ -71,7 +72,13 @@ public class MultiLangDaemon implements Callable<Integer> {
7172
public MultiLangDaemon(KinesisClientLibConfiguration configuration,
7273
MultiLangRecordProcessorFactory recordProcessorFactory,
7374
ExecutorService workerThreadPool) {
74-
this(new Worker(recordProcessorFactory, configuration, workerThreadPool));
75+
this(buildWorker(recordProcessorFactory, configuration, workerThreadPool));
76+
}
77+
78+
private static Worker buildWorker(IRecordProcessorFactory recordProcessorFactory,
79+
KinesisClientLibConfiguration configuration, ExecutorService workerThreadPool) {
80+
return new Worker.Builder().recordProcessorFactory(recordProcessorFactory).config(configuration)
81+
.execService(workerThreadPool).build();
7582
}
7683

7784
/**

Diff for: src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemonConfig.java

+34-5
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,24 @@
1414
*/
1515
package com.amazonaws.services.kinesis.multilang;
1616

17+
import java.io.File;
18+
import java.io.FileInputStream;
19+
import java.io.FileNotFoundException;
1720
import java.io.IOException;
1821
import java.io.InputStream;
1922
import java.util.Properties;
2023
import java.util.concurrent.ExecutorService;
21-
import java.util.concurrent.Executors;
24+
import java.util.concurrent.LinkedBlockingQueue;
25+
import java.util.concurrent.SynchronousQueue;
26+
import java.util.concurrent.ThreadPoolExecutor;
27+
import java.util.concurrent.TimeUnit;
2228

2329
import org.apache.commons.logging.Log;
2430
import org.apache.commons.logging.LogFactory;
2531

2632
import com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfigurator;
2733
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
34+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2835

2936
/**
3037
* This class captures the configuration needed to run the MultiLangDaemon.
@@ -131,10 +138,29 @@ private void prepare(String processingLanguage) {
131138

132139
private static Properties loadProperties(ClassLoader classLoader, String propertiesFileName) throws IOException {
133140
Properties properties = new Properties();
134-
try (InputStream propertiesStream = classLoader.getResourceAsStream(propertiesFileName)) {
135-
properties.load(propertiesStream);
141+
InputStream propertyStream = null;
142+
try {
143+
propertyStream = classLoader.getResourceAsStream(propertiesFileName);
144+
if (propertyStream == null) {
145+
File propertyFile = new File(propertiesFileName);
146+
if (propertyFile.exists()) {
147+
propertyStream = new FileInputStream(propertyFile);
148+
}
149+
}
150+
151+
if (propertyStream == null) {
152+
throw new FileNotFoundException(
153+
"Unable to find property file in classpath, or file system: '" + propertiesFileName + "'");
154+
}
155+
156+
properties.load(propertyStream);
136157
return properties;
158+
} finally {
159+
if (propertyStream != null) {
160+
propertyStream.close();
161+
}
137162
}
163+
138164
}
139165

140166
private static boolean validateProperties(Properties properties) {
@@ -147,13 +173,16 @@ private static int getMaxActiveThreads(Properties properties) {
147173

148174
private static ExecutorService buildExecutorService(Properties properties) {
149175
int maxActiveThreads = getMaxActiveThreads(properties);
176+
ThreadFactoryBuilder builder = new ThreadFactoryBuilder().setNameFormat("multi-lang-daemon-%04d");
150177
LOG.debug(String.format("Value for %s property is %d", PROP_MAX_ACTIVE_THREADS, maxActiveThreads));
151178
if (maxActiveThreads <= 0) {
152179
LOG.info("Using a cached thread pool.");
153-
return Executors.newCachedThreadPool();
180+
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
181+
builder.build());
154182
} else {
155183
LOG.info(String.format("Using a fixed thread pool with %d max active threads.", maxActiveThreads));
156-
return Executors.newFixedThreadPool(maxActiveThreads);
184+
return new ThreadPoolExecutor(maxActiveThreads, maxActiveThreads, 0L, TimeUnit.MILLISECONDS,
185+
new LinkedBlockingQueue<Runnable>(), builder.build());
157186
}
158187
}
159188

0 commit comments

Comments
 (0)