Skip to content

Commit a785420

Browse files
committed
Merge branch '6.2.x'
2 parents a52e9c2 + bf2d6c7 commit a785420

23 files changed

+477
-152
lines changed

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ Additional examples may be found under [src/main/](src/main/java/io/confluent/ex
6666
| MapFunction | DSL, stateless transformations, `map()` | [Java 8+ example](src/main/java/io/confluent/examples/streams/MapFunctionLambdaExample.java) | | [Scala Example](src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala) |
6767
| SessionWindows | Sessionization of user events, user behavior analysis | | [Java 7+ example](src/main/java/io/confluent/examples/streams/SessionWindowsExample.java)
6868
| GlobalKTable | `join()` between `KStream` and `GlobalKTable` | [Java 8+ example](src/main/java/io/confluent/examples/streams/GlobalKTablesExample.java) | | |
69+
| GlobalStore | "join" between `KStream` and `GlobalStore` | [Java 8+ example](src/main/java/io/confluent/examples/streams/GlobalStoresExample.java) | | |
6970
| PageViewRegion | `join()` between `KStream` and `KTable` | [Java 8+ example](src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java) | [Java 7+ example](src/main/java/io/confluent/examples/streams/PageViewRegionExample.java) | |
7071
| PageViewRegionGenericAvro | Working with data in Generic Avro format | [Java 8+ example](src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java) | [Java 7+ example](src/main/java/io/confluent/examples/streams/PageViewRegionExample.java) | |
7172
| WikipediaFeedSpecificAvro | Working with data in Specific Avro format | [Java 8+ example](src/main/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExample.java) | [Java 7+ example](src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java) | |
@@ -109,6 +110,7 @@ Additional examples may be found under [src/test/](src/test/java/io/confluent/ex
109110
| CustomStreamTableJoin | DSL, Processor API, Transformers | [Java 8+ Example](src/test/java/io/confluent/examples/streams/CustomStreamTableJoinIntegrationTest.java) | | |
110111
| EventDeduplication | DSL, Processor API, Transformers | [Java 8+ Example](src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java) | | |
111112
| GlobalKTable | DSL, global state | | [Java 7+ Example](src/test/java/io/confluent/examples/streams/GlobalKTablesExampleTest.java) | |
113+
| GlobalStore | DSL, global state, Transformers | | [Java 7+ Example](src/test/java/io/confluent/examples/streams/GlobalStoresExampleTest.java) | |
112114
| HandlingCorruptedInputRecords | DSL, `flatMap()` | [Java 8+ Example](src/test/java/io/confluent/examples/streams/HandlingCorruptedInputRecordsIntegrationTest.java) | | |
113115
| KafkaMusic (Interactive Queries) | Interactive Queries, State Stores, REST API | | [Java 7+ Example](src/test/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleTest.java) | |
114116
| MapFunction | DSL, stateless transformations, `map()` | [Java 8+ Example](src/test/java/io/confluent/examples/streams/MapFunctionLambdaIntegrationTest.java) | | |

src/main/java/io/confluent/examples/streams/AnomalyDetectionLambdaExample.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public static void main(final String[] args) {
160160
// Always (and unconditionally) clean local state prior to starting the processing topology.
161161
// We opt for this unconditional call here because this will make it easier for you to play around with the example
162162
// when resetting the application for doing a re-run (via the Application Reset Tool,
163-
// http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
163+
// https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html).
164164
//
165165
// The drawback of cleaning up local state prior is that your app must rebuilt its local state from scratch, which
166166
// will take time and will require reading all the state-relevant data from the Kafka cluster over the network.

src/main/java/io/confluent/examples/streams/ApplicationResetExample.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
/**
3030
* Demonstrates how to reset a Kafka Streams application to re-process its input data from scratch.
31-
* See also <a href='http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool'>http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool</a>
31+
* See also <a href='https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html'>https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html</a>
3232
* <p>
3333
* The main purpose of the example is to explain the usage of the "Application Reset Tool".
3434
* Thus, we don’t put the focus on what this topology is actually doing&mdash;the point is to have an example of a

src/main/java/io/confluent/examples/streams/GlobalKTablesExampleDriver.java renamed to src/main/java/io/confluent/examples/streams/GlobalKTablesAndStoresExampleDriver.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.confluent.examples.streams.avro.EnrichedOrder;
2020
import io.confluent.examples.streams.avro.Order;
2121
import io.confluent.examples.streams.avro.Product;
22-
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
22+
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
2323
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
2424
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
2525
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
@@ -46,8 +46,8 @@
4646
import static io.confluent.examples.streams.GlobalKTablesExample.PRODUCT_TOPIC;
4747

4848
/**
49-
* This is a sample driver for the {@link GlobalKTablesExample}.
50-
* To run this driver please first refer to the instructions in {@link GlobalKTablesExample}.
49+
* This is a sample driver for the {@link GlobalKTablesExample} and {@link GlobalStoresExample}.
50+
* To run this driver please first refer to the instructions in {@link GlobalKTablesExample} or {@link GlobalStoresExample}.
5151
* You can then run this class directly in your IDE or via the command line.
5252
* <p>
5353
* To run via the command line you might want to package as a fatjar first. Please refer to:
@@ -56,11 +56,11 @@
5656
* Once packaged you can then run:
5757
* <pre>
5858
* {@code
59-
* $ java -cp target/kafka-streams-examples-7.0.0-0-standalone.jar io.confluent.examples.streams.GlobalKTablesExampleDriver
59+
* $ java -cp target/kafka-streams-examples-7.0.0-0-standalone.jar io.confluent.examples.streams.GlobalKTablesAndStoresExampleDriver
6060
* }
6161
* </pre>
6262
*/
63-
public class GlobalKTablesExampleDriver {
63+
public class GlobalKTablesAndStoresExampleDriver {
6464

6565
private static final Random RANDOM = new Random();
6666
private static final int RECORDS_TO_GENERATE = 100;
@@ -85,7 +85,7 @@ private static void receiveEnrichedOrders(
8585
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
8686
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.Long().deserializer().getClass());
8787
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
88-
consumerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
88+
consumerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
8989
consumerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
9090

9191
final KafkaConsumer<Long, EnrichedOrder> consumer = new KafkaConsumer<>(consumerProps);
@@ -184,7 +184,7 @@ private static <VT extends SpecificRecord> SpecificAvroSerde<VT> createSerde(fin
184184

185185
final SpecificAvroSerde<VT> serde = new SpecificAvroSerde<>();
186186
final Map<String, String> serdeConfig = Collections.singletonMap(
187-
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
187+
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
188188
serde.configure(serdeConfig, false);
189189
return serde;
190190
}

src/main/java/io/confluent/examples/streams/GlobalKTablesExample.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@
4242
* Demonstrates how to perform joins between KStreams and GlobalKTables, i.e. joins that
4343
* don't require re-partitioning of the input streams.
4444
* <p>
45+
* The {@link GlobalStoresExample} shows another way to perform the same operation using
46+
* {@link org.apache.kafka.streams.TopologyDescription.GlobalStore} and a
47+
* {@link org.apache.kafka.streams.kstream.ValueTransformer}.
48+
* <p>
4549
* In this example, we join a stream of orders that reads from a topic named
4650
* "order" with a customers table that reads from a topic named "customer", and a products
4751
* table that reads from a topic "product". The join produces an EnrichedOrder object.
@@ -75,14 +79,14 @@
7579
* $ java -cp target/kafka-streams-examples-7.0.0-0-standalone.jar io.confluent.examples.streams.GlobalKTablesExample
7680
* }
7781
* </pre>
78-
* 4) Write some input data to the source topics (e.g. via {@link GlobalKTablesExampleDriver}). The
82+
* 4) Write some input data to the source topics (e.g. via {@link GlobalKTablesAndStoresExampleDriver}). The
7983
* already running example application (step 3) will automatically process this input data and write
8084
* the results to the output topic.
8185
* <pre>
8286
* {@code
8387
* # Here: Write input data using the example driver. The driver will exit once it has received
8488
* # all EnrichedOrders
85-
* $ java -cp target/kafka-streams-examples-7.0.0-0-standalone.jar io.confluent.examples.streams.GlobalKTablesExampleDriver
89+
* $ java -cp target/kafka-streams-examples-7.0.0-0-standalone.jar io.confluent.examples.streams.GlobalKTablesAndStoresExampleDriver
8690
* }
8791
* </pre>
8892
* <p>
@@ -108,7 +112,7 @@ public static void main(final String[] args) {
108112
// Always (and unconditionally) clean local state prior to starting the processing topology.
109113
// We opt for this unconditional call here because this will make it easier for you to play around with the example
110114
// when resetting the application for doing a re-run (via the Application Reset Tool,
111-
// http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
115+
// https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html).
112116
//
113117
// The drawback of cleaning up local state prior is that your app must rebuilt its local state from scratch, which
114118
// will take time and will require reading all the state-relevant data from the Kafka cluster over the network.

0 commit comments

Comments
 (0)