Skip to content

Add shutdown requested implemention in python client. #53

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 23, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions amazon_kclpy/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class MalformedAction(Exception):
"processRecords": messages.ProcessRecordsInput,
"shutdown": messages.ShutdownInput,
"checkpoint": messages.CheckpointInput,
"record": messages.Record
"record": messages.Record,
"shutdownRequested": messages.ShutdownRequestedInput
}


Expand Down Expand Up @@ -56,4 +57,4 @@ def message_decode(json_dict):
raise MalformedAction("Received an action which couldn't be understood. Action was '{action}' -- Allowed {keys}"
.format(action=action, keys=_format_serializer_names()))

return serializer(json_dict)
return serializer(json_dict)
11 changes: 11 additions & 0 deletions amazon_kclpy/kcl.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,17 @@ def shutdown(self, checkpointer, reason):
'''
raise NotImplementedError

@abc.abstractmethod
def shutdownRequested(self, checkpointer):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should follow Python PEP8 style e.g. shutdown_requested.

For the v2 interface this should also use intermediate type, even though I did that wrong with the interface in the Java KCL. See v2/processor.py#L51 for an example

'''
Called by a KCLProcess instance to indicate that this record processor is being shutdown.
And it gives an opportunity for record processor to checkpoint before shutdown.

:type checkpointer: amazon_kclpy.kcl.Checkpointer
:param checkpointer: A checkpointer which accepts a sequence number or no parameters.
'''
raise NotImplementedError
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably do nothing. If someone upgrades to the newer version without implementing the shutdownRequested it would cause their application to fail.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally this should be placed on the v2 version of the record processor in v2/processor.py


version = 1


Expand Down
32 changes: 32 additions & 0 deletions amazon_kclpy/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,38 @@ def dispatch(self, checkpointer, record_processor):
self._checkpointer = checkpointer
record_processor.shutdown(self)

class ShutdownRequestedInput(MessageDispatcher):
"""
Used to tell the record processor it will be shutdown.
"""
def __init__(self, json_dict):
self._checkpointer = None
self._action = json_dict['action']

@property
def checkpointer(self):
"""
The checkpointer that can be used to checkpoint before actual shutdown.

:return: the checkpointer
:rtype: amazon_kclpy.kcl.Checkpointer
"""
return self._checkpointer

@property
def action(self):
"""
The action that spawned this message

:return: the original action value
:rtype: str
"""
return self._action

def dispatch(self, checkpointer, record_processor):
self._checkpointer = checkpointer
record_processor.shutdown_requested(self)


class CheckpointInput(object):
"""
Expand Down