|
9 | 9 | from functools import partial
|
10 | 10 | from typing import Literal
|
11 | 11 |
|
12 |
| -import msgpack |
13 | 12 | import sentry_sdk
|
14 | 13 | from arroyo.backends.kafka.consumer import KafkaPayload
|
15 | 14 | from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
|
@@ -864,11 +863,7 @@ def process_batch(executor: ThreadPoolExecutor, message: Message[ValuesBatch[Kaf
|
864 | 863 | assert isinstance(item, BrokerValue)
|
865 | 864 |
|
866 | 865 | try:
|
867 |
| - try: |
868 |
| - wrapper: IngestMonitorMessage = MONITOR_CODEC.decode(item.payload.value) |
869 |
| - except ValidationError: |
870 |
| - wrapper = msgpack.unpackb(item.payload.value) |
871 |
| - logger.exception("Failed to unpack message payload via sentry_kafka_schemas") |
| 866 | + wrapper: IngestMonitorMessage = MONITOR_CODEC.decode(item.payload.value) |
872 | 867 | except Exception:
|
873 | 868 | logger.exception("Failed to unpack message payload")
|
874 | 869 | continue
|
@@ -913,12 +908,7 @@ def process_batch(executor: ThreadPoolExecutor, message: Message[ValuesBatch[Kaf
|
913 | 908 | def process_single(message: Message[KafkaPayload]):
|
914 | 909 | assert isinstance(message.value, BrokerValue)
|
915 | 910 | try:
|
916 |
| - try: |
917 |
| - wrapper: IngestMonitorMessage = MONITOR_CODEC.decode(message.payload.value) |
918 |
| - except ValidationError: |
919 |
| - logger.exception("Failed to unpack message payload via sentry_kafka_schemas") |
920 |
| - wrapper = msgpack.unpackb(message.payload.value) |
921 |
| - |
| 911 | + wrapper: IngestMonitorMessage = MONITOR_CODEC.decode(message.payload.value) |
922 | 912 | ts = message.value.timestamp
|
923 | 913 | partition = message.value.partition.index
|
924 | 914 |
|
|
0 commit comments