1
1
/*
2
- * Copyright 2019 the original author or authors.
2
+ * Copyright 2019-2022 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
23
23
import java .util .concurrent .ScheduledFuture ;
24
24
import java .util .concurrent .atomic .AtomicInteger ;
25
25
26
- import org .checkerframework .checker .nullness .qual .Nullable ;
27
-
28
26
import org .springframework .context .Lifecycle ;
29
27
import org .springframework .core .convert .converter .Converter ;
30
28
import org .springframework .core .serializer .support .SerializingConverter ;
31
29
import org .springframework .expression .Expression ;
32
30
import org .springframework .expression .common .LiteralExpression ;
33
31
import org .springframework .integration .aws .support .AwsHeaders ;
34
32
import org .springframework .integration .aws .support .AwsRequestFailureException ;
33
+ import org .springframework .integration .expression .ValueExpression ;
35
34
import org .springframework .integration .handler .AbstractMessageHandler ;
36
35
import org .springframework .integration .mapping .HeaderMapper ;
37
36
import org .springframework .integration .mapping .OutboundMessageMapper ;
55
54
import com .amazonaws .services .kinesis .producer .KinesisProducer ;
56
55
import com .amazonaws .services .kinesis .producer .UserRecord ;
57
56
import com .amazonaws .services .kinesis .producer .UserRecordResult ;
57
+ import com .amazonaws .services .schemaregistry .common .Schema ;
58
58
import com .google .common .util .concurrent .FutureCallback ;
59
59
import com .google .common .util .concurrent .Futures ;
60
60
import com .google .common .util .concurrent .ListenableFuture ;
@@ -90,6 +90,8 @@ public class KplMessageHandler extends AbstractAwsMessageHandler<Void> implement
90
90
91
91
private Expression sequenceNumberExpression ;
92
92
93
+ private Expression glueSchemaExpression ;
94
+
93
95
private OutboundMessageMapper <byte []> embeddedHeadersMapper ;
94
96
95
97
private Duration flushDuration = Duration .ofMillis (0 );
@@ -201,6 +203,37 @@ public void setHeaderMapper(HeaderMapper<Void> headerMapper) {
201
203
+ "Consider to use 'OutboundMessageMapper<byte[]>' for embedding headers into the record data." );
202
204
}
203
205
206
+ /**
207
+ * Set a {@link Schema} to add into a {@link UserRecord} built from the request message.
208
+ * @param glueSchema the {@link Schema} to add into a {@link UserRecord}.
209
+ * @since 2.5.2
210
+ * @see UserRecord#setSchema(Schema)
211
+ */
212
+ public void setGlueSchema (Schema glueSchema ) {
213
+ setPartitionKeyExpression (new ValueExpression <>(glueSchema ));
214
+ }
215
+
216
+ /**
217
+ * Set a SpEL expression for {@link Schema} to add into a {@link UserRecord}
218
+ * built from the request message.
219
+ * @param glueSchemaExpression the SpEL expression to evaluate a {@link Schema}.
220
+ * @since 2.5.2
221
+ * @see UserRecord#setSchema(Schema)
222
+ */
223
+ public void setGlueSchemaExpressionString (String glueSchemaExpression ) {
224
+ setGlueSchemaExpression (EXPRESSION_PARSER .parseExpression (glueSchemaExpression ));
225
+ }
226
+
227
+ /**
228
+ * Set a SpEL expression for {@link Schema} to add into a {@link UserRecord}
229
+ * built from the request message.
230
+ * @param glueSchemaExpression the SpEL expression to evaluate a {@link Schema}.
231
+ * @since 2.5.2
232
+ * @see UserRecord#setSchema(Schema)
233
+ */
234
+ public void setGlueSchemaExpression (Expression glueSchemaExpression ) {
235
+ this .glueSchemaExpression = glueSchemaExpression ;
236
+ }
204
237
205
238
@ Override
206
239
public synchronized void start () {
@@ -247,6 +280,7 @@ else if (message.getPayload() instanceof UserRecord) {
247
280
userRecord .setData (putRecordRequest .getData ());
248
281
userRecord .setPartitionKey (putRecordRequest .getPartitionKey ());
249
282
userRecord .setStreamName (putRecordRequest .getStreamName ());
283
+ setGlueSchemaIntoUserRecordIfAny (userRecord , message );
250
284
return handleUserRecord (message , putRecordRequest , userRecord );
251
285
}
252
286
}
@@ -268,6 +302,7 @@ private Future<PutRecordsResult> handlePutRecordsRequest(Message<?> message, Put
268
302
userRecord .setData (putRecordsRequestEntry .getData ());
269
303
userRecord .setPartitionKey (putRecordsRequestEntry .getPartitionKey ());
270
304
userRecord .setStreamName (putRecordsRequest .getStreamName ());
305
+ setGlueSchemaIntoUserRecordIfAny (userRecord , message );
271
306
return userRecord ;
272
307
})
273
308
.concatMap ((userRecord ) ->
@@ -303,6 +338,13 @@ private Future<PutRecordsResult> handlePutRecordsRequest(Message<?> message, Put
303
338
return putRecordsResultFuture ;
304
339
}
305
340
341
+ private void setGlueSchemaIntoUserRecordIfAny (UserRecord userRecord , Message <?> message ) {
342
+ if (this .glueSchemaExpression != null ) {
343
+ Schema schema = this .glueSchemaExpression .getValue (getEvaluationContext (), message , Schema .class );
344
+ userRecord .setSchema (schema );
345
+ }
346
+ }
347
+
306
348
private Future <?> handleUserRecord (Message <?> message , PutRecordRequest putRecordRequest , UserRecord userRecord ) {
307
349
ListenableFuture <UserRecordResult > recordResult = this .kinesisProducer .addUserRecord (userRecord );
308
350
applyCallbackForAsyncHandler (message , putRecordRequest , recordResult );
@@ -333,71 +375,88 @@ public void onSuccess(R result) {
333
375
}
334
376
335
377
private PutRecordRequest buildPutRecordRequest (Message <?> message ) {
336
- MessageHeaders messageHeaders = message .getHeaders ();
337
- String stream = messageHeaders .get (AwsHeaders .STREAM , String .class );
338
- if (!StringUtils .hasText (stream ) && this .streamExpression != null ) {
339
- stream = this .streamExpression .getValue (getEvaluationContext (), message , String .class );
340
- }
341
- Assert .state (stream != null ,
342
- "'stream' must not be null for sending a Kinesis record. "
343
- + "Consider configuring this handler with a 'stream'( or 'streamExpression') or supply an "
344
- + "'aws_stream' message header." );
345
-
346
- String partitionKey = messageHeaders .get (AwsHeaders .PARTITION_KEY , String .class );
347
- if (!StringUtils .hasText (partitionKey ) && this .partitionKeyExpression != null ) {
348
- partitionKey = this .partitionKeyExpression .getValue (getEvaluationContext (), message , String .class );
349
- }
350
- Assert .state (partitionKey != null , "'partitionKey' must not be null for sending a Kinesis record. "
351
- + "Consider configuring this handler with a 'partitionKey'( or 'partitionKeyExpression') or supply an "
352
- + "'aws_partitionKey' message header." );
353
-
354
- String explicitHashKey = (this .explicitHashKeyExpression != null
355
- ? this .explicitHashKeyExpression .getValue (getEvaluationContext (), message , String .class ) : null );
356
-
357
- String sequenceNumber = messageHeaders .get (AwsHeaders .SEQUENCE_NUMBER , String .class );
358
- if (!StringUtils .hasText (sequenceNumber ) && this .sequenceNumberExpression != null ) {
359
- sequenceNumber = this .sequenceNumberExpression .getValue (getEvaluationContext (), message , String .class );
360
- }
361
-
362
378
Object payload = message .getPayload ();
363
379
364
380
ByteBuffer data = null ;
365
-
366
- Message <?> messageToEmbed = null ;
367
-
368
- if (payload instanceof ByteBuffer ) {
369
- data = (ByteBuffer ) payload ;
370
- if (this .embeddedHeadersMapper != null ) {
371
- messageToEmbed = new MutableMessage <>(data .array (), messageHeaders );
372
- }
381
+ String sequenceNumber = null ;
382
+ String stream ;
383
+ String partitionKey ;
384
+ String explicitHashKey ;
385
+
386
+ if (payload instanceof UserRecord ) {
387
+ UserRecord userRecord = (UserRecord ) payload ;
388
+ data = userRecord .getData ();
389
+ stream = userRecord .getStreamName ();
390
+ partitionKey = userRecord .getPartitionKey ();
391
+ explicitHashKey = userRecord .getExplicitHashKey ();
373
392
}
374
393
else {
375
- byte [] bytes =
376
- (byte []) (payload instanceof byte []
377
- ? payload
378
- : this .messageConverter .fromMessage (message , byte [].class ));
379
- Assert .notNull (bytes , "payload cannot be null" );
380
- if (this .embeddedHeadersMapper != null ) {
381
- messageToEmbed = new MutableMessage <>(bytes , messageHeaders );
394
+ MessageHeaders messageHeaders = message .getHeaders ();
395
+ stream = messageHeaders .get (AwsHeaders .STREAM , String .class );
396
+ if (!StringUtils .hasText (stream ) && this .streamExpression != null ) {
397
+ stream = this .streamExpression .getValue (getEvaluationContext (), message , String .class );
382
398
}
383
- else {
384
- data = ByteBuffer .wrap (bytes );
399
+ Assert .state (stream != null ,
400
+ "'stream' must not be null for sending a Kinesis record. "
401
+ + "Consider configuring this handler with a 'stream'( or 'streamExpression') or supply an "
402
+ + "'aws_stream' message header." );
403
+
404
+ partitionKey = messageHeaders .get (AwsHeaders .PARTITION_KEY , String .class );
405
+ if (!StringUtils .hasText (partitionKey ) && this .partitionKeyExpression != null ) {
406
+ partitionKey = this .partitionKeyExpression .getValue (getEvaluationContext (), message , String .class );
407
+ }
408
+ Assert .state (partitionKey != null , "'partitionKey' must not be null for sending a Kinesis record. "
409
+ + "Consider configuring this handler with a 'partitionKey'( or 'partitionKeyExpression') " +
410
+ "or supply an 'aws_partitionKey' message header." );
411
+
412
+ explicitHashKey = (this .explicitHashKeyExpression != null
413
+ ? this .explicitHashKeyExpression .getValue (getEvaluationContext (), message , String .class ) : null );
414
+
415
+ sequenceNumber = messageHeaders .get (AwsHeaders .SEQUENCE_NUMBER , String .class );
416
+ if (!StringUtils .hasText (sequenceNumber ) && this .sequenceNumberExpression != null ) {
417
+ sequenceNumber = this .sequenceNumberExpression .getValue (getEvaluationContext (), message , String .class );
385
418
}
386
- }
387
419
388
- if (messageToEmbed != null ) {
389
- try {
390
- byte [] bytes = this .embeddedHeadersMapper .fromMessage (messageToEmbed );
420
+ Message <?> messageToEmbed = null ;
421
+
422
+ if (payload instanceof ByteBuffer ) {
423
+ data = (ByteBuffer ) payload ;
424
+ if (this .embeddedHeadersMapper != null ) {
425
+ messageToEmbed = new MutableMessage <>(data .array (), messageHeaders );
426
+ }
427
+ }
428
+ else {
429
+ byte [] bytes =
430
+ (byte []) (payload instanceof byte []
431
+ ? payload
432
+ : this .messageConverter .fromMessage (message , byte [].class ));
391
433
Assert .notNull (bytes , "payload cannot be null" );
392
- data = ByteBuffer .wrap (bytes );
434
+ if (this .embeddedHeadersMapper != null ) {
435
+ messageToEmbed = new MutableMessage <>(bytes , messageHeaders );
436
+ }
437
+ else {
438
+ data = ByteBuffer .wrap (bytes );
439
+ }
393
440
}
394
- catch (Exception ex ) {
395
- throw new MessageConversionException (message , "Cannot embedded headers to payload" , ex );
441
+
442
+ if (messageToEmbed != null ) {
443
+ try {
444
+ byte [] bytes = this .embeddedHeadersMapper .fromMessage (messageToEmbed );
445
+ Assert .notNull (bytes , "payload cannot be null" );
446
+ data = ByteBuffer .wrap (bytes );
447
+ }
448
+ catch (Exception ex ) {
449
+ throw new MessageConversionException (message , "Cannot embedded headers to payload" , ex );
450
+ }
396
451
}
397
452
}
398
453
399
- return new PutRecordRequest ().withStreamName (stream ).withPartitionKey (partitionKey )
400
- .withExplicitHashKey (explicitHashKey ).withSequenceNumberForOrdering (sequenceNumber ).withData (data );
454
+ return new PutRecordRequest ()
455
+ .withStreamName (stream )
456
+ .withPartitionKey (partitionKey )
457
+ .withExplicitHashKey (explicitHashKey )
458
+ .withSequenceNumberForOrdering (sequenceNumber )
459
+ .withData (data );
401
460
}
402
461
403
462
@ Override
@@ -427,7 +486,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
427
486
Futures .addCallback (listenableFuture , new FutureCallback <T >() {
428
487
429
488
@ Override
430
- public void onSuccess (@ Nullable T result ) {
489
+ public void onSuccess (T result ) {
431
490
completable .complete (result );
432
491
}
433
492
0 commit comments