forked from awslabs/amazon-kinesis-client-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_amazon_kclpy_input_output_integration.py
107 lines (88 loc) · 3.83 KB
/
test_amazon_kclpy_input_output_integration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import json
import re
from amazon_kclpy import kcl
from utils import make_io_obj
# Dummy record processor
class RecordProcessor(kcl.RecordProcessorBase):
    """Dummy record processor that asserts on the values the KCL process hands it.

    The expected shard id and sequence number are supplied up front; the
    callbacks compare what they receive against them and exercise the
    checkpointer, including one checkpoint attempt that is expected to fail.
    """

    def __init__(self, expected_shard_id, expected_sequence_number):
        """Store the values the callbacks will compare against.

        :param expected_shard_id: shard id that ``initialize`` must receive.
        :param expected_sequence_number: sequence number the first record
            passed to ``process_records`` must carry.
        """
        self.expected_shard_id = expected_shard_id
        self.expected_sequence_number = expected_sequence_number

    def initialize(self, shard_id):
        """Check that the shard id passed in is the expected one."""
        assert shard_id == self.expected_shard_id

    def process_records(self, records, checkpointer):
        """Check the first record's sequence number, then checkpoint with one retry.

        :param records: sequence of records; only the first one is inspected,
            via ``.get('sequenceNumber')`` (assumes dict-like records).
        :param checkpointer: object whose ``checkpoint(seq)`` is called; the
            first call is expected to raise, the second to succeed.
        :raises AssertionError: if the sequence number differs from the
            expected one, or if the first checkpoint attempt does not raise.
        """
        seq = records[0].get('sequenceNumber')
        assert seq == self.expected_sequence_number
        try:
            checkpointer.checkpoint(seq)
        except Exception:
            # Try it one more time (this time it'll work)
            checkpointer.checkpoint(seq)
        else:
            # Raised outside the try body on purpose: AssertionError is itself
            # an Exception, so asserting inside the guarded region would be
            # swallowed by the handler above and could never fail the test.
            raise AssertionError("First checkpoint should fail")

    def shutdown(self, checkpointer, reason):
        """Checkpoint (with no explicit sequence number) when the reason is 'TERMINATE'."""
        if 'TERMINATE' == reason:
            checkpointer.checkpoint()
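'''
The input which we feed to kcl.py is not written out as a literal string here; it is built from
test_input_messages (defined below) by serializing each message as one line of JSON.
'''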
# Raw-text rendering of an expected output stream: one JSON document per line.
# The test in this module does not read this string; it compares parsed
# messages against test_output_messages below.
test_output_string = """
{"action": "status", "responseFor": "initialize"}
{"action": "checkpoint", "checkpoint": "456"}
{"action": "checkpoint", "checkpoint": "456"}
{"action": "status", "responseFor": "processRecords"}
{"action": "checkpoint", "checkpoint": null}
{"action": "status", "responseFor": "shutdown"}
"""

# Expected output, already parsed: the messages the process should write, in order.
test_output_messages = [
    # Acknowledgement of the initialize request.
    {
        "action": "status",
        "responseFor": "initialize",
    },
    # First checkpoint attempt.
    {
        "action": "checkpoint",
        "sequenceNumber": "456",
        "subSequenceNumber": None,
    },
    # Second checkpoint attempt for the same sequence number.
    {
        "action": "checkpoint",
        "sequenceNumber": "456",
        "subSequenceNumber": None,
    },
    # Acknowledgement of the processRecords request.
    {
        "action": "status",
        "responseFor": "processRecords",
    },
    # Checkpoint without an explicit sequence number.
    {
        "action": "checkpoint",
        "sequenceNumber": None,
        "subSequenceNumber": None,
    },
    # Acknowledgement of the shardEnded request.
    {
        "action": "status",
        "responseFor": "shardEnded",
    },
]
def _strip_all_whitespace(s):
return re.sub('\\s*', '', s)
# Identifiers shared by the input messages below and by the record processor's checks.
test_shard_id = "shardId-123"
test_sequence_number = "456"

# Messages fed to the process, in order. The test serializes each one as a line of JSON.
test_input_messages = [
    # Initialization request for the shard.
    {
        "action": "initialize",
        "shardId": test_shard_id,
        "sequenceNumber": test_sequence_number,
        "subSequenceNumber": 0,
    },
    # A batch containing a single record.
    {
        "action": "processRecords",
        "millisBehindLatest": 1476889708000,
        "records": [
            {
                "action": "record",
                "data": "bWVvdw==",
                "partitionKey": "cat",
                "sequenceNumber": test_sequence_number,
                "subSequenceNumber": 0,
                "approximateArrivalTimestamp": 1476889707000,
            },
        ],
    },
    # Reply to the first checkpoint attempt: carries an error.
    {
        "action": "checkpoint",
        "sequenceNumber": test_sequence_number,
        "subSequenceNumber": 0,
        "error": "Exception",
    },
    # Reply to the retried checkpoint: no error.
    {
        "action": "checkpoint",
        "sequenceNumber": test_sequence_number,
        "subSequenceNumber": 0,
    },
    # Shard-ended notification.
    {
        "action": "shardEnded",
    },
    # Reply to the final checkpoint.
    {
        "action": "checkpoint",
        "sequenceNumber": test_sequence_number,
        "subSequenceNumber": 0,
    },
]
def test_kcl_py_integration_test_perfect_input():
    """Run a KCLProcess over well-formed input and check everything it writes.

    The input messages are serialized one JSON document per line, the process
    is run against objects produced by make_io_obj (presumably in-memory
    streams; see utils), and the resulting output and error contents are
    compared against the expected values.
    """
    serialized_input = "\n".join(json.dumps(message) for message in test_input_messages)
    input_file = make_io_obj(serialized_input)
    output_file = make_io_obj()
    error_file = make_io_obj()

    processor = RecordProcessor(test_shard_id, test_sequence_number)
    process = kcl.KCLProcess(processor,
                             input_file=input_file, output_file=output_file, error_file=error_file)
    process.run()

    # Compare parsed JSON rather than raw text, so formatting differences in
    # the written lines do not matter. Empty lines are skipped.
    responses = [json.loads(line) for line in output_file.getvalue().split("\n") if line != ""]
    assert len(responses) == len(test_output_messages)
    for actual, expected in zip(responses, test_output_messages):
        assert actual == expected

    # Nothing at all may have been written to the error stream.
    assert error_file.getvalue() == ""