forked from open-telemetry/opentelemetry-python-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
39 lines (28 loc) · 927 Bytes
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from confluent_kafka import Consumer
class MockConsumer(Consumer):
    """Consumer subclass whose ``consume`` and ``poll`` read from a supplied queue.

    Both overridden methods return items taken from the ``queue`` object
    passed to the constructor rather than delegating to the parent class.
    """

    def __init__(self, queue, config):
        """Store *queue* and initialise the parent ``Consumer`` with *config*.

        The queue is stored before the parent initialiser runs.
        """
        self._queue = queue
        super().__init__(config)

    def consume(self, num_messages=1, *args, **kwargs):
        """Return up to *num_messages* items from the front of the queue.

        The returned items are dropped from the consumer's queue by rebinding
        ``self._queue`` to the remaining slice; the queue is not mutated in
        place here. Extra positional and keyword arguments are accepted and
        ignored.
        """
        head = self._queue[:num_messages]
        tail = self._queue[num_messages:]
        self._queue = tail
        return head

    def poll(self, timeout=None):
        """Remove and return the first queued item, or ``None`` if empty.

        Unlike ``consume``, this removes the item in place via ``pop(0)``.
        *timeout* is accepted and ignored.
        """
        if len(self._queue) == 0:
            return None
        return self._queue.pop(0)
class MockedMessage:
    """Simple message stand-in exposing its fields through accessor methods.

    The values given at construction are stored unchanged and returned by
    the ``topic()``, ``partition()``, ``offset()`` and ``headers()`` methods.
    """

    def __init__(self, topic: str, partition: int, offset: int, headers):
        """Remember the topic, partition, offset and headers of the message."""
        (
            self._topic,
            self._partition,
            self._offset,
            self._headers,
        ) = (topic, partition, offset, headers)

    def topic(self):
        """Return the topic given at construction."""
        return self._topic

    def partition(self):
        """Return the partition given at construction."""
        return self._partition

    def offset(self):
        """Return the offset given at construction."""
        return self._offset

    def headers(self):
        """Return the headers object given at construction (same object)."""
        return self._headers