Skip to content

create a more pythonic 🐍 exceptions treatment #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 53 additions & 26 deletions amazon_kclpy/kcl.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'''
Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copyright 2014-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Amazon Software License (the "License").
You may not use this file except in compliance with the License.
Expand All @@ -12,7 +12,41 @@
express or implied. See the License for the specific language governing
permissions and limitations under the License.
'''
import abc, base64, io, json, os, random, sys, time, traceback
import abc
import json
import sys
import traceback


class CheckpointError(Exception):
    '''
    Exception that carries the name of an error passed through the input file while checkpointing.
    '''

    def __init__(self, value):
        '''
        :type value: str
        :param value: Name of the exception that was received while checkpointing. The MultiLangDaemon may answer
            a checkpoint action with the name of any of the exceptions listed at
            https://github.com/awslabs/amazon-kinesis-client/tree/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/exceptions
        '''
        self.value = value

    def __str__(self):
        # The printable form is the repr of the stored name, so a str value shows up quoted.
        printable = repr(self.value)
        return printable


class InvalidStateException(CheckpointError):
    '''
    Checkpoint error for the 'InvalidStateException' error name.
    '''

    def __init__(self, value='InvalidStateException'):
        '''
        :type value: str
        :param value: The exception name to carry; defaults to 'InvalidStateException' so the error can be
            raised without arguments.
        '''
        # super() expects the class first and the instance second. With the arguments swapped,
        # every construction of this exception fails with a TypeError instead of producing the error.
        super(InvalidStateException, self).__init__(value)


class ShutdownException(CheckpointError):
    '''
    Checkpoint error for the 'ShutdownException' error name.
    '''


class ThrottlingException(CheckpointError):
    '''
    Checkpoint error for the 'ThrottlingException' error name.
    '''


class _IOHandler(object):
'''
Expand Down Expand Up @@ -87,22 +121,6 @@ def write_action(self, response):
self.write_line(json.dumps(response))


class CheckpointError(Exception):
    '''
    Exception that carries the name of an error passed through the input file while checkpointing.
    '''

    def __init__(self, value):
        '''
        :type value: str
        :param value: Name of the exception that was received while checkpointing. The MultiLangDaemon may answer
            a checkpoint action with the name of any of the exceptions listed at
            https://github.com/awslabs/amazon-kinesis-client/tree/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/exceptions
        '''
        self.value = value

    def __str__(self):
        # The printable form is the repr of the stored name, so a str value shows up quoted.
        printable = repr(self.value)
        return printable

class Checkpointer(object):
'''
A checkpointer class which allows you to make checkpoint requests. A checkpoint marks a point in a shard
Expand All @@ -118,6 +136,15 @@ def __init__(self, io_handler):
'''
self.io_handler = io_handler

def _raise_exception_by_str(self, error):
    '''
    Raises the checkpoint exception whose class name matches the given error name.

    :type error: str
    :param error: The error name to translate into an exception.

    :raises InvalidStateException: if error is 'InvalidStateException'.
    :raises ShutdownException: if error is 'ShutdownException'.
    :raises ThrottlingException: if error is 'ThrottlingException'.
    :raises CheckpointError: for any other error name.
    '''
    specific_errors = (InvalidStateException, ShutdownException, ThrottlingException)
    # Each specific class is keyed by its own name, which is the string it is matched against.
    classes_by_name = dict((cls.__name__, cls) for cls in specific_errors)
    matched_class = classes_by_name.get(error, CheckpointError)
    raise matched_class(error)

def _get_action(self):
'''
Gets the next json message from STDIN
Expand All @@ -139,12 +166,12 @@ def checkpoint(self, sequenceNumber=None):
:type sequenceNumber: str
:param sequenceNumber: The sequence number to checkpoint at or None if you want to checkpoint at the farthest record
'''
response = {"action" : "checkpoint", "checkpoint" : sequenceNumber}
response = {"action": "checkpoint", "checkpoint": sequenceNumber}
self.io_handler.write_action(response)
action = self._get_action()
if action.get('action') == 'checkpoint':
if action.get('error') != None:
raise CheckpointError(action.get('error'))
if action.get('error') is not None:
self._raise_exception_by_str(action.get('error'))
else:
'''
We are in an invalid state. We will raise a checkpoint exception
Expand All @@ -153,7 +180,8 @@ def checkpoint(self, sequenceNumber=None):
exception. Note that the documented guidance is that this exception
is NOT retryable so the client code should exit.
'''
raise CheckpointError('InvalidStateException')
raise InvalidStateException()


# RecordProcessor base class
class RecordProcessorBase(object):
Expand Down Expand Up @@ -212,12 +240,14 @@ def shutdown(self, checkpointer, reason):
'''
return


class MalformedAction(Exception):
    '''
    Signals that an action given by the MultiLangDaemon is missing one or more of its appropriate attributes.
    '''


class KCLProcess(object):

def __init__(self, record_processor, inputfile=sys.stdin, outputfile=sys.stdout, errorfile=sys.stderr):
Expand Down Expand Up @@ -282,7 +312,7 @@ def _report_done(self, response_for=None):

:param response_for: Required parameter; the action that this status message is confirming completed.
'''
self.io_handler.write_action({"action" : "status", "responseFor" : response_for})
self.io_handler.write_action({"action": "status", "responseFor": response_for})

def _handle_a_line(self, line):
'''
Expand All @@ -298,7 +328,6 @@ def _handle_a_line(self, line):
self._perform_action(action)
self._report_done(action.get('action'))


def run(self):
'''
Starts this KCL processor's main loop.
Expand All @@ -314,5 +343,3 @@ def run(self):
line = self.io_handler.read_line()
if line:
self._handle_a_line(line)


47 changes: 23 additions & 24 deletions samples/sample_kclpy_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
permissions and limitations under the License.
'''
from __future__ import print_function
import sys, time, json, base64
import sys
import time
import base64
from amazon_kclpy import kcl


class RecordProcessor(kcl.RecordProcessorBase):
'''
A RecordProcessor processes a shard in a stream. Its methods will be called with this pattern:
Expand Down Expand Up @@ -54,28 +57,24 @@ def checkpoint(self, checkpointer, sequence_number=None):
try:
checkpointer.checkpoint(sequence_number)
return
except kcl.CheckpointError as e:
if 'ShutdownException' == e.value:
'''
A ShutdownException indicates that this record processor should be shutdown. This is due to
some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
'''
print('Encountered shutdown execption, skipping checkpoint')
except kcl.ShutdownException:
'''
A ShutdownException indicates that this record processor should be shutdown. This is due to
some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
'''
print('Encountered shutdown execption, skipping checkpoint')
return
except kcl.InvalidStateException:
sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
except kcl.ThrottlingException:
if self.CHECKPOINT_RETRIES - 1 == n:
sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
return
elif 'ThrottlingException' == e.value:
'''
A ThrottlingException indicates that one of our dependencies is overburdened, e.g. too many
dynamo writes. We will sleep temporarily to let it recover.
'''
if self.CHECKPOINT_RETRIES - 1 == n:
sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n))
return
else:
print('Was throttled while checkpointing, will attempt again in {s} seconds'.format(s=self.SLEEP_SECONDS))
elif 'InvalidStateException' == e.value:
sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
else: # Some other error
sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
else:
print('Was throttled while checkpointing, will attempt again in {s} seconds'.format(s=self.SLEEP_SECONDS))
except kcl.CheckpointError as e:
sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))

time.sleep(self.SLEEP_SECONDS)

def process_record(self, data, partition_key, sequence_number):
Expand Down Expand Up @@ -118,7 +117,7 @@ def process_records(self, records, checkpointer):
seq = int(seq)
key = record.get('partitionKey')
self.process_record(data, key, seq)
if self.largest_seq == None or seq > self.largest_seq:
if self.largest_seq is None or seq > self.largest_seq:
self.largest_seq = seq
# Checkpoints every 60 seconds
if time.time() - self.last_checkpoint_time > self.CHECKPOINT_FREQ_SECONDS:
Expand Down Expand Up @@ -149,7 +148,7 @@ def shutdown(self, checkpointer, reason):
# shard id
print('Was told to terminate, will attempt to checkpoint.')
self.checkpoint(checkpointer, None)
else: # reason == 'ZOMBIE'
else: # reason == 'ZOMBIE'
print('Shutting down due to failover. Will not checkpoint.')
except:
pass
Expand Down