11
11
* and limitations under the License.
12
12
*/
13
13
14
- import { Stack } from '@aws-cdk/core' ;
15
- import * as kinesisanalytics from '@aws-cdk/aws-kinesisanalytics' ;
16
- import * as defaults from '../index' ;
17
- import { overrideProps } from '../lib/utils' ;
18
- import '@aws-cdk/assert/jest' ;
14
+ import { Stack , RemovalPolicy } from "@aws-cdk/core" ;
15
+ import * as cdk from "@aws-cdk/core" ;
16
+ import * as kinesisanalytics from "@aws-cdk/aws-kinesisanalytics" ;
17
+ import * as kinesisFirehose from "@aws-cdk/aws-kinesisfirehose" ;
18
+ import * as iam from "@aws-cdk/aws-iam" ;
19
+ import * as kms from "@aws-cdk/aws-kms" ;
20
+ import * as logs from "@aws-cdk/aws-logs" ;
21
+ import * as defaults from "../index" ;
22
+ import { overrideProps } from "../lib/utils" ;
23
+ import "@aws-cdk/assert/jest" ;
19
24
20
- test ( ' test kinesisanalytics override inputProperty' , ( ) => {
25
+ test ( " test kinesisanalytics override inputProperty" , ( ) => {
21
26
const stack = new Stack ( ) ;
22
27
23
28
const inputProperty : kinesisanalytics . CfnApplication . InputProperty = {
24
29
inputSchema : {
25
- recordColumns : [ { name : 'x' , sqlType : 'y' } ] ,
26
- recordFormat : { recordFormatType : ' csv' }
30
+ recordColumns : [ { name : "x" , sqlType : "y" } ] ,
31
+ recordFormat : { recordFormatType : " csv" } ,
27
32
} ,
28
- namePrefix : ' zzz'
33
+ namePrefix : " zzz" ,
29
34
} ;
30
35
31
- const defaultProps : kinesisanalytics . CfnApplicationProps = defaults . DefaultCfnApplicationProps ;
36
+ const defaultProps : kinesisanalytics . CfnApplicationProps =
37
+ defaults . DefaultCfnApplicationProps ;
32
38
33
39
const inProps : kinesisanalytics . CfnApplicationProps = {
34
- inputs : [ inputProperty ]
40
+ inputs : [ inputProperty ] ,
35
41
} ;
36
42
37
43
const outProps = overrideProps ( defaultProps , inProps ) ;
38
44
39
- new kinesisanalytics . CfnApplication ( stack , ' KinesisAnalytics' , outProps ) ;
45
+ new kinesisanalytics . CfnApplication ( stack , " KinesisAnalytics" , outProps ) ;
40
46
41
47
expect ( stack ) . toHaveResource ( "AWS::KinesisAnalytics::Application" , {
42
48
Inputs : [
@@ -45,15 +51,146 @@ test('test kinesisanalytics override inputProperty', () => {
45
51
RecordColumns : [
46
52
{
47
53
Name : "x" ,
48
- SqlType : "y"
49
- }
54
+ SqlType : "y" ,
55
+ } ,
50
56
] ,
51
57
RecordFormat : {
52
- RecordFormatType : "csv"
53
- }
58
+ RecordFormatType : "csv" ,
59
+ } ,
54
60
} ,
55
- NamePrefix : "zzz"
56
- }
57
- ]
61
+ NamePrefix : "zzz" ,
62
+ } ,
63
+ ] ,
58
64
} ) ;
59
- } ) ;
65
+ } ) ;
66
+
67
+ test ( "Test default implementation" , ( ) => {
68
+ const stack = new Stack ( ) ;
69
+
70
+ const newFirehose = CreateFirehose ( stack ) ;
71
+ const kinesisProps : defaults . BuildKinesisAnalyticsAppProps = {
72
+ kinesisFirehose : newFirehose ,
73
+ kinesisAnalyticsProps : {
74
+ inputs : [ {
75
+ inputSchema : {
76
+ recordColumns : [ {
77
+ name : 'ts' ,
78
+ sqlType : 'TIMESTAMP' ,
79
+ mapping : '$.timestamp'
80
+ } , {
81
+ name : 'trip_id' ,
82
+ sqlType : 'VARCHAR(64)' ,
83
+ mapping : '$.trip_id'
84
+ } ] ,
85
+ recordFormat : {
86
+ recordFormatType : 'JSON'
87
+ } ,
88
+ recordEncoding : 'UTF-8'
89
+ } ,
90
+ namePrefix : 'SOURCE_SQL_STREAM'
91
+ } ]
92
+ } ,
93
+ } ;
94
+
95
+ defaults . buildKinesisAnalyticsApp ( stack , kinesisProps ) ;
96
+
97
+ expect ( stack ) . toHaveResourceLike ( "AWS::KinesisAnalytics::Application" , {
98
+ Inputs : [ {
99
+ InputSchema : {
100
+ RecordColumns : [ {
101
+ Name : 'ts' ,
102
+ SqlType : 'TIMESTAMP' ,
103
+ Mapping : '$.timestamp'
104
+ } , {
105
+ Name : 'trip_id' ,
106
+ SqlType : 'VARCHAR(64)' ,
107
+ Mapping : '$.trip_id'
108
+ } ] ,
109
+ RecordFormat : {
110
+ RecordFormatType : 'JSON'
111
+ } ,
112
+ RecordEncoding : 'UTF-8'
113
+ } ,
114
+ NamePrefix : 'SOURCE_SQL_STREAM'
115
+ } ]
116
+ } ) ;
117
+ } ) ;
118
+
119
+ // test('Test for customer overrides', {
120
+ // test('Check policy created', {
121
+
122
+ function CreateFirehose ( stack : Stack ) : kinesisFirehose . CfnDeliveryStream {
123
+ // Creating the Firehose is kind of a big deal. FirehoseToS3 is not readily available here in core,
124
+ // so this routine pretty much replicates it. If this function ceases to work correctly, look at
125
+ // FirehoseToS3 and see if that changed.
126
+ const destinationBucket = defaults . CreateScrapBucket ( stack , {
127
+ removalPolicy : RemovalPolicy . DESTROY ,
128
+ autoDeleteObjects : true ,
129
+ } ) ;
130
+
131
+ const kinesisFirehoseLogGroup = defaults . buildLogGroup (
132
+ stack ,
133
+ "firehose-log-group" ,
134
+ { }
135
+ ) ;
136
+
137
+ const cwLogStream : logs . LogStream = kinesisFirehoseLogGroup . addStream (
138
+ "firehose-log-stream"
139
+ ) ;
140
+
141
+ const firehoseRole = new iam . Role ( stack , "test-role" , {
142
+ assumedBy : new iam . ServicePrincipal ( "firehose.amazonaws.com" ) ,
143
+ } ) ;
144
+
145
+ // Setup the IAM policy for Kinesis Firehose
146
+ const firehosePolicy = new iam . Policy ( stack , "KinesisFirehosePolicy" , {
147
+ statements : [
148
+ new iam . PolicyStatement ( {
149
+ actions : [
150
+ "s3:AbortMultipartUpload" ,
151
+ "s3:GetBucketLocation" ,
152
+ "s3:GetObject" ,
153
+ "s3:ListBucket" ,
154
+ "s3:ListBucketMultipartUploads" ,
155
+ "s3:PutObject" ,
156
+ ] ,
157
+ resources : [
158
+ `${ destinationBucket . bucketArn } ` ,
159
+ `${ destinationBucket . bucketArn } /*` ,
160
+ ] ,
161
+ } ) ,
162
+ new iam . PolicyStatement ( {
163
+ actions : [ "logs:PutLogEvents" ] ,
164
+ resources : [
165
+ `arn:${ cdk . Aws . PARTITION } :logs:${ cdk . Aws . REGION } :${ cdk . Aws . ACCOUNT_ID } :log-group:${ kinesisFirehoseLogGroup . logGroupName } :log-stream:${ cwLogStream . logStreamName } ` ,
166
+ ] ,
167
+ } ) ,
168
+ ] ,
169
+ } ) ;
170
+
171
+ // Attach policy to role
172
+ firehosePolicy . attachToRole ( firehoseRole ) ;
173
+
174
+ const awsManagedKey : kms . IKey = kms . Alias . fromAliasName (
175
+ stack ,
176
+ "aws-managed-key" ,
177
+ "alias/aws/s3"
178
+ ) ;
179
+
180
+ const defaultKinesisFirehoseProps : kinesisFirehose . CfnDeliveryStreamProps = defaults . DefaultCfnDeliveryStreamProps (
181
+ destinationBucket . bucketArn ,
182
+ firehoseRole . roleArn ,
183
+ kinesisFirehoseLogGroup . logGroupName ,
184
+ cwLogStream . logStreamName ,
185
+ awsManagedKey
186
+ ) ;
187
+
188
+ destinationBucket . grantPut ( firehoseRole ) ;
189
+
190
+ const firehose = new kinesisFirehose . CfnDeliveryStream (
191
+ stack ,
192
+ "KinesisFirehose" ,
193
+ defaultKinesisFirehoseProps
194
+ ) ;
195
+ return firehose ;
196
+ }
0 commit comments