diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 82aaa68e9..cba8a0451 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -12,7 +12,7 @@ from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \ ACLResourcePatternType from kafka.client_async import KafkaClient, selectors -from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol +from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0, ConsumerProtocol_v0 import kafka.errors as Errors from kafka.errors import ( IncompatibleBrokerVersion, KafkaConfigurationError, UnknownTopicOrPartitionError, @@ -1316,7 +1316,7 @@ def _describe_consumer_groups_process_response(self, response): for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields): if group_information_name == 'protocol_type': protocol_type = described_group_information - protocol_type_is_consumer = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type) + protocol_type_is_consumer = (protocol_type == ConsumerProtocol_v0.PROTOCOL_TYPE or not protocol_type) if isinstance(group_information_field, Array): member_information_list = [] member_schema = group_information_field.array_of @@ -1325,9 +1325,9 @@ def _describe_consumer_groups_process_response(self, response): for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names): if protocol_type_is_consumer: if member_name == 'member_metadata' and member: - member_information.append(ConsumerProtocolMemberMetadata.decode(member)) + member_information.append(ConsumerProtocolMemberMetadata_v0.decode(member)) elif member_name == 'member_assignment' and member: - member_information.append(ConsumerProtocolMemberAssignment.decode(member)) + 
member_information.append(ConsumerProtocolMemberAssignment_v0.decode(member)) else: member_information.append(member) member_info_tuple = MemberInformation._make(member_information) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index ce3cf9203..8d9ea9f17 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -2,6 +2,7 @@ import copy import logging +import re import socket import time @@ -57,6 +58,14 @@ class KafkaConsumer(six.Iterator): committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None + group_instance_id (str): A unique identifier of the consumer instance + provided by end user. Only non-empty strings are permitted. If set, + the consumer is treated as a static member, which means that only + one instance with this ID is allowed in the consumer group at any + time. This can be used in combination with a larger session timeout + to avoid group rebalances caused by transient unavailability (e.g. + process restarts). If not set, the consumer will join the group as + a dynamic member, which is the traditional behavior. Default: None key_deserializer (callable): Any callable that takes a raw message key and returns a deserialized key. 
value_deserializer (callable): Any callable that takes a @@ -276,6 +285,7 @@ class KafkaConsumer(six.Iterator): 'bootstrap_servers': 'localhost', 'client_id': 'kafka-python-' + __version__, 'group_id': None, + 'group_instance_id': None, 'key_deserializer': None, 'value_deserializer': None, 'enable_incremental_fetch_sessions': True, @@ -408,6 +418,10 @@ def __init__(self, *topics, **configs): "Request timeout (%s) must be larger than session timeout (%s)" % (self.config['request_timeout_ms'], self.config['session_timeout_ms'])) + if self.config['group_instance_id'] is not None: + if self.config['group_id'] is None: + raise KafkaConfigurationError("group_instance_id requires group_id") + self._subscription = SubscriptionState(self.config['auto_offset_reset']) self._fetcher = Fetcher( self._client, self._subscription, metrics=self._metrics, **self.config) @@ -423,6 +437,16 @@ def __init__(self, *topics, **configs): self._subscription.subscribe(topics=topics) self._client.set_topics(topics) + def _validate_group_instance_id(self, group_instance_id): + if not group_instance_id or not isinstance(group_instance_id, str): + raise KafkaConfigurationError("group_instance_id must be non-empty string") + if group_instance_id in (".", ".."): + raise KafkaConfigurationError("group_instance_id cannot be \".\" or \"..\"") + if len(group_instance_id) > 249: + raise KafkaConfigurationError("group_instance_id can't be longer than 249 characters") + if not re.match(r'^[A-Za-z0-9\.\_\-]+$', group_instance_id): + raise KafkaConfigurationError("group_instance_id is illegal: it contains a character other than ASCII alphanumerics, '.', '_' and '-'") + def bootstrap_connected(self): """Return True if the bootstrap is connected.""" return self._client.bootstrap_connected() diff --git a/kafka/coordinator/assignors/abstract.py b/kafka/coordinator/assignors/abstract.py index a1fef3840..a6fe970d2 100644 --- a/kafka/coordinator/assignors/abstract.py +++ b/kafka/coordinator/assignors/abstract.py 
@@ -23,8 +23,9 @@ def assign(self, cluster, members): Arguments: cluster (ClusterMetadata): metadata for use in assignment - members (dict of {member_id: MemberMetadata}): decoded metadata for - each member in the group. + members (dict of {member_id: Subscription}): decoded metadata + for each member in the group, including group_instance_id + when available. Returns: dict: {member_id: MemberAssignment} diff --git a/kafka/coordinator/assignors/range.py b/kafka/coordinator/assignors/range.py index 299e39c48..d639d5b75 100644 --- a/kafka/coordinator/assignors/range.py +++ b/kafka/coordinator/assignors/range.py @@ -1,12 +1,13 @@ from __future__ import absolute_import import collections +import itertools import logging from kafka.vendor import six from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor -from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment +from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0 log = logging.getLogger(__name__) @@ -32,37 +33,41 @@ class RangePartitionAssignor(AbstractPartitionAssignor): version = 0 @classmethod - def assign(cls, cluster, member_metadata): + def assign(cls, cluster, group_subscriptions): consumers_per_topic = collections.defaultdict(list) - for member, metadata in six.iteritems(member_metadata): - for topic in metadata.subscription: - consumers_per_topic[topic].append(member) + for member_id, subscription in six.iteritems(group_subscriptions): + for topic in subscription.topics: + consumers_per_topic[topic].append((subscription.group_instance_id, member_id)) # construct {member_id: {topic: [partition, ...]}} assignment = collections.defaultdict(dict) + for topic in consumers_per_topic: + # group by static members (True) v dynamic members (False) + grouped = {k: list(g) for k, g in itertools.groupby(consumers_per_topic[topic], key=lambda ids: ids[0] is not None)} + consumers_per_topic[topic] = 
sorted(grouped.get(True, [])) + sorted(grouped.get(False, [])) # sorted static members first, then sorted dynamic + for topic, consumers_for_topic in six.iteritems(consumers_per_topic): partitions = cluster.partitions_for_topic(topic) if partitions is None: log.warning('No partition metadata for topic %s', topic) continue partitions = sorted(partitions) - consumers_for_topic.sort() partitions_per_consumer = len(partitions) // len(consumers_for_topic) consumers_with_extra = len(partitions) % len(consumers_for_topic) - for i, member in enumerate(consumers_for_topic): + for i, (_group_instance_id, member_id) in enumerate(consumers_for_topic): start = partitions_per_consumer * i start += min(i, consumers_with_extra) length = partitions_per_consumer if not i + 1 > consumers_with_extra: length += 1 - assignment[member][topic] = partitions[start:start+length] + assignment[member_id][topic] = partitions[start:start+length] protocol_assignment = {} - for member_id in member_metadata: - protocol_assignment[member_id] = ConsumerProtocolMemberAssignment( + for member_id in group_subscriptions: + protocol_assignment[member_id] = ConsumerProtocolMemberAssignment_v0( cls.version, sorted(assignment[member_id].items()), b'') @@ -70,7 +75,7 @@ def assign(cls, cluster, member_metadata): @classmethod def metadata(cls, topics): - return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'') + return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'') @classmethod def on_assignment(cls, assignment): diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py index 2d24a5c8b..8d83972cc 100644 --- a/kafka/coordinator/assignors/roundrobin.py +++ b/kafka/coordinator/assignors/roundrobin.py @@ -7,7 +7,7 @@ from kafka.vendor import six from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor -from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment +from 
kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0 from kafka.structs import TopicPartition log = logging.getLogger(__name__) @@ -49,10 +49,10 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor): version = 0 @classmethod - def assign(cls, cluster, member_metadata): + def assign(cls, cluster, group_subscriptions): all_topics = set() - for metadata in six.itervalues(member_metadata): - all_topics.update(metadata.subscription) + for subscription in six.itervalues(group_subscriptions): + all_topics.update(subscription.topics) all_topic_partitions = [] for topic in all_topics: @@ -67,21 +67,26 @@ def assign(cls, cluster, member_metadata): # construct {member_id: {topic: [partition, ...]}} assignment = collections.defaultdict(lambda: collections.defaultdict(list)) - member_iter = itertools.cycle(sorted(member_metadata.keys())) + # Sort static and dynamic members separately to maintain stable static assignments + ungrouped = [(subscription.group_instance_id, member_id) for member_id, subscription in six.iteritems(group_subscriptions)] + grouped = {k: list(g) for k, g in itertools.groupby(ungrouped, key=lambda ids: ids[0] is not None)} + member_list = sorted(grouped.get(True, [])) + sorted(grouped.get(False, [])) # sorted static members first, then sorted dynamic + member_iter = itertools.cycle(member_list) + for partition in all_topic_partitions: - member_id = next(member_iter) + _group_instance_id, member_id = next(member_iter) # Because we constructed all_topic_partitions from the set of # member subscribed topics, we should be safe assuming that # each topic in all_topic_partitions is in at least one member # subscription; otherwise this could yield an infinite loop - while partition.topic not in member_metadata[member_id].subscription: + while partition.topic not in group_subscriptions[member_id].topics: member_id = next(member_iter) assignment[member_id][partition.topic].append(partition.partition) 
protocol_assignment = {} - for member_id in member_metadata: - protocol_assignment[member_id] = ConsumerProtocolMemberAssignment( + for member_id in group_subscriptions: + protocol_assignment[member_id] = ConsumerProtocolMemberAssignment_v0( cls.version, sorted(assignment[member_id].items()), b'') @@ -89,7 +94,7 @@ def assign(cls, cluster, member_metadata): @classmethod def metadata(cls, topics): - return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'') + return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'') @classmethod def on_assignment(cls, assignment): diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py index 69f68f564..3166356fe 100644 --- a/kafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -5,7 +5,7 @@ from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements from kafka.coordinator.assignors.sticky.sorted_set import SortedSet -from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment +from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0 from kafka.coordinator.protocol import Schema from kafka.protocol.struct import Struct from kafka.protocol.types import String, Array, Int32 @@ -66,6 +66,7 @@ class StickyAssignorUserDataV1(Struct): class StickyAssignmentExecutor: def __init__(self, cluster, members): + # a mapping of member_id => StickyAssignorMemberMetadataV1 self.members = members # a mapping between consumers and their assigned partitions that is updated during assignment procedure self.current_assignment = defaultdict(list) @@ -603,7 +604,7 @@ def assign(cls, cluster, members): assignment = {} for member_id in members: - assignment[member_id] = ConsumerProtocolMemberAssignment( + 
assignment[member_id] = ConsumerProtocolMemberAssignment_v0( cls.version, sorted(executor.get_final_assignment(member_id)), b'' ) return assignment @@ -625,16 +626,16 @@ def parse_member_metadata(cls, metadata): user_data = metadata.user_data if not user_data: return StickyAssignorMemberMetadataV1( - partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription + partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.topics ) try: decoded_user_data = StickyAssignorUserDataV1.decode(user_data) - except Exception as e: + except Exception: # ignore the consumer's previous assignment if it cannot be parsed - log.error("Could not parse member data", e) # pylint: disable=logging-too-many-args + log.exception("Could not parse member data") return StickyAssignorMemberMetadataV1( - partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription + partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.topics ) member_partitions = [] @@ -642,7 +643,7 @@ def parse_member_metadata(cls, metadata): member_partitions.extend([TopicPartition(topic, partition) for partition in partitions]) return StickyAssignorMemberMetadataV1( # pylint: disable=no-member - partitions=member_partitions, generation=decoded_user_data.generation, subscription=metadata.subscription + partitions=member_partitions, generation=decoded_user_data.generation, subscription=metadata.topics ) @classmethod @@ -661,7 +662,7 @@ def _metadata(cls, topics, member_assignment_partitions, generation=-1): partitions_by_topic[topic_partition.topic].append(topic_partition.partition) data = StickyAssignorUserDataV1(list(partitions_by_topic.items()), generation) user_data = data.encode() - return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data) + return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), user_data) @classmethod def on_assignment(cls, assignment): diff --git a/kafka/coordinator/base.py 
b/kafka/coordinator/base.py index 9fffd19c6..c29ad0dee 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -16,7 +16,10 @@ from kafka.metrics import AnonMeasurable from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.find_coordinator import FindCoordinatorRequest -from kafka.protocol.group import HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest, DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID +from kafka.protocol.group import ( + HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest, + DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, GroupMember, +) from kafka.util import Timer log = logging.getLogger('kafka.coordinator') @@ -35,9 +38,12 @@ def __init__(self, generation_id, member_id, protocol): self.member_id = member_id self.protocol = protocol - @property - def is_valid(self): - return self.generation_id != DEFAULT_GENERATION_ID + def has_member_id(self): + """ + True if this generation has a valid member id, False otherwise. + A member might have an id before it becomes part of a group generation. 
+ """ + return self.member_id != UNKNOWN_MEMBER_ID def __eq__(self, other): return (self.generation_id == other.generation_id and @@ -94,6 +100,7 @@ class BaseCoordinator(object): DEFAULT_CONFIG = { 'group_id': 'kafka-python-default-group', + 'group_instance_id': None, 'session_timeout_ms': 10000, 'heartbeat_interval_ms': 3000, 'max_poll_interval_ms': 300000, @@ -135,7 +142,6 @@ def __init__(self, client, **configs): "and session_timeout_ms") self._client = client - self.group_id = self.config['group_id'] self.heartbeat = Heartbeat(**self.config) self._heartbeat_thread = None self._lock = threading.Condition() @@ -152,6 +158,14 @@ def __init__(self, client, **configs): else: self._sensors = None + @property + def group_id(self): + return self.config['group_id'] + + @property + def group_instance_id(self): + return self.config['group_instance_id'] + @abc.abstractmethod def protocol_type(self): """ @@ -205,10 +219,10 @@ def _perform_assignment(self, leader_id, protocol, members): Arguments: leader_id (str): The id of the leader (which is this member) protocol (str): the chosen group protocol (assignment strategy) - members (list): [(member_id, metadata_bytes)] from - JoinGroupResponse. metadata_bytes are associated with the chosen - group protocol, and the Coordinator subclass is responsible for - decoding metadata_bytes based on that protocol. + members (list): [GroupMember] from JoinGroupResponse. + metadata_bytes are associated with the chosen group protocol, + and the Coordinator subclass is responsible for decoding + metadata_bytes based on that protocol. 
Returns: dict: {member_id: assignment}; assignment must either be bytes @@ -534,7 +548,7 @@ def _send_join_group_request(self): (protocol, metadata if isinstance(metadata, bytes) else metadata.encode()) for protocol, metadata in self.group_protocols() ] - version = self._client.api_version(JoinGroupRequest, max_version=4) + version = self._client.api_version(JoinGroupRequest, max_version=5) if version == 0: request = JoinGroupRequest[version]( self.group_id, @@ -542,12 +556,21 @@ def _send_join_group_request(self): self._generation.member_id, self.protocol_type(), member_metadata) + elif version <= 4: + request = JoinGroupRequest[version]( + self.group_id, + self.config['session_timeout_ms'], + self.config['max_poll_interval_ms'], + self._generation.member_id, + self.protocol_type(), + member_metadata) else: request = JoinGroupRequest[version]( self.group_id, self.config['session_timeout_ms'], self.config['max_poll_interval_ms'], self._generation.member_id, + self.group_instance_id, self.protocol_type(), member_metadata) @@ -620,16 +643,17 @@ def _handle_join_group_response(self, future, send_time, response): future.failure(error_type()) elif error_type in (Errors.InconsistentGroupProtocolError, Errors.InvalidSessionTimeoutError, - Errors.InvalidGroupIdError): + Errors.InvalidGroupIdError, + Errors.GroupAuthorizationFailedError, + Errors.GroupMaxSizeReachedError, + Errors.FencedInstanceIdError): # log the error and re-throw the exception - error = error_type(response) log.error("Attempt to join group %s failed due to fatal error: %s", - self.group_id, error) - future.failure(error) - elif error_type is Errors.GroupAuthorizationFailedError: - log.error("Attempt to join group %s failed due to group authorization error", - self.group_id) - future.failure(error_type(self.group_id)) + self.group_id, error_type.__name__) + if error_type in (Errors.GroupAuthorizationFailedError, Errors.GroupMaxSizeReachedError): + future.failure(error_type(self.group_id)) + else: + 
future.failure(error_type()) elif error_type is Errors.MemberIdRequiredError: # Broker requires a concrete member id to be allowed to join the group. Update member id # and send another join group request in next cycle. @@ -650,12 +674,20 @@ def _handle_join_group_response(self, future, send_time, response): def _on_join_follower(self): # send follower's sync group with an empty assignment - version = self._client.api_version(SyncGroupRequest, max_version=2) - request = SyncGroupRequest[version]( - self.group_id, - self._generation.generation_id, - self._generation.member_id, - {}) + version = self._client.api_version(SyncGroupRequest, max_version=3) + if version <= 2: + request = SyncGroupRequest[version]( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + []) + else: + request = SyncGroupRequest[version]( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + self.group_instance_id, + []) log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s", self.group_id, self.coordinator_id, request) return self._send_sync_group_request(request) @@ -672,21 +704,32 @@ def _on_join_leader(self, response): Future: resolves to member assignment encoded-bytes """ try: + members = [GroupMember(*member) if response.API_VERSION >= 5 else GroupMember(member[0], None, member[1]) + for member in response.members] group_assignment = self._perform_assignment(response.leader_id, response.group_protocol, - response.members) + members) + for member_id, assignment in six.iteritems(group_assignment): + if not isinstance(assignment, bytes): + group_assignment[member_id] = assignment.encode() + except Exception as e: return Future().failure(e) - version = self._client.api_version(SyncGroupRequest, max_version=2) - request = SyncGroupRequest[version]( - self.group_id, - self._generation.generation_id, - self._generation.member_id, - [(member_id, - assignment if isinstance(assignment, bytes) else assignment.encode()) - 
for member_id, assignment in six.iteritems(group_assignment)]) - + version = self._client.api_version(SyncGroupRequest, max_version=3) + if version <= 2: + request = SyncGroupRequest[version]( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + group_assignment.items()) + else: + request = SyncGroupRequest[version]( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + self.group_instance_id, + group_assignment.items()) log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s", self.group_id, self.coordinator_id, request) return self._send_sync_group_request(request) @@ -726,6 +769,10 @@ def _handle_sync_group_response(self, future, send_time, response): log.info("SyncGroup for group %s failed due to coordinator" " rebalance", self.group_id) future.failure(error_type(self.group_id)) + elif error_type is Errors.FencedInstanceIdError: + log.error("SyncGroup for group %s failed due to fenced id error: %s", + self.group_id, self.group_instance_id) + future.failure(error_type((self.group_id, self.group_instance_id))) elif error_type in (Errors.UnknownMemberIdError, Errors.IllegalGenerationError): error = error_type() @@ -878,20 +925,28 @@ def close(self, timeout_ms=None): if self.config['api_version'] >= (0, 9): self.maybe_leave_group(timeout_ms=timeout_ms) + def is_dynamic_member(self): + return self.group_instance_id is None or self.config['api_version'] < (2, 3) + def maybe_leave_group(self, timeout_ms=None): """Leave the current group and reset local generation/memberId.""" if self.config['api_version'] < (0, 9): raise Errors.UnsupportedVersionError('Group Coordinator APIs require 0.9+ broker') with self._client._lock, self._lock: - if (not self.coordinator_unknown() - and self.state is not MemberState.UNJOINED - and self._generation.is_valid): + # Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker, + # consumer with valid group.instance.id is viewed as static 
member that never sends LeaveGroup, + # and the membership expiration is only controlled by session timeout. + if (self.is_dynamic_member() and not self.coordinator_unknown() + and self.state is not MemberState.UNJOINED and self._generation.has_member_id()): # this is a minimal effort attempt to leave the group. we do not # attempt any resending if the request fails or times out. - log.info('Leaving consumer group (%s).', self.group_id) - version = self._client.api_version(LeaveGroupRequest, max_version=2) - request = LeaveGroupRequest[version](self.group_id, self._generation.member_id) + log.info('Leaving consumer group %s (member %s).', self.group_id, self._generation.member_id) + version = self._client.api_version(LeaveGroupRequest, max_version=3) + if version <= 2: + request = LeaveGroupRequest[version](self.group_id, self._generation.member_id) + else: + request = LeaveGroupRequest[version](self.group_id, [(self._generation.member_id, self.group_instance_id)]) log.debug('Sending LeaveGroupRequest to %s: %s', self.coordinator_id, request) future = self._client.send(self.coordinator_id, request) future.add_callback(self._handle_leave_group_response) @@ -909,6 +964,15 @@ def _handle_leave_group_response(self, response): else: log.error("LeaveGroup request for group %s failed with error: %s", self.group_id, error_type()) + if response.API_VERSION >= 3: + for member_id, group_instance_id, error_code in response.members: + error_type = Errors.for_code(error_code) + if error_type is Errors.NoError: + log.debug("LeaveGroup request for member %s / group instance %s returned successfully", + member_id, group_instance_id) + else: + log.error("LeaveGroup request for member %s / group instance %s failed with error: %s", + member_id, group_instance_id, error_type()) def _send_heartbeat_request(self): """Send a heartbeat request""" @@ -921,10 +985,20 @@ def _send_heartbeat_request(self): e = Errors.NodeNotReadyError(self.coordinator_id) return Future().failure(e) - version = 
self._client.api_version(HeartbeatRequest, max_version=2) - request = HeartbeatRequest[version](self.group_id, - self._generation.generation_id, - self._generation.member_id) + version = self._client.api_version(HeartbeatRequest, max_version=3) + if version <=2: + request = HeartbeatRequest[version]( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + ) + else: + request = HeartbeatRequest[version]( + self.group_id, + self._generation.generation_id, + self._generation.member_id, + self.group_instance_id, + ) heartbeat_log.debug("Sending HeartbeatRequest to %s: %s", self.coordinator_id, request) future = Future() _f = self._client.send(self.coordinator_id, request) @@ -958,6 +1032,10 @@ def _handle_heartbeat_response(self, future, send_time, response): " current.", self.group_id) self.reset_generation() future.failure(error_type()) + elif error_type is Errors.FencedInstanceIdError: + heartbeat_log.error("Heartbeat failed for group %s due to fenced id error: %s", + self.group_id, self.group_instance_id) + future.failure(error_type((self.group_id, self.group_instance_id))) elif error_type is Errors.UnknownMemberIdError: heartbeat_log.warning("Heartbeat: local member_id was not recognized;" " this consumer needs to re-join") @@ -1177,6 +1255,11 @@ def _handle_heartbeat_failure(self, exception): # then the session timeout may expire before we can rejoin. 
heartbeat_log.debug('Treating RebalanceInProgressError as successful heartbeat') self.coordinator.heartbeat.received_heartbeat() + elif isinstance(exception, Errors.FencedInstanceIdError): + heartbeat_log.error("Heartbeat thread caught fenced group_instance_id %s error", + self.coordinator.group_instance_id) + self.failed = exception + self.disable() else: heartbeat_log.debug('Heartbeat failure: %s', exception) self.coordinator.heartbeat.fail_heartbeat() diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index ddd413b82..fcb0a422e 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -13,6 +13,7 @@ from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor from kafka.coordinator.protocol import ConsumerProtocol +from kafka.coordinator.subscription import Subscription import kafka.errors as Errors from kafka.future import Future from kafka.metrics import AnonMeasurable @@ -29,6 +30,7 @@ class ConsumerCoordinator(BaseCoordinator): """This class manages the coordination process with the consumer coordinator.""" DEFAULT_CONFIG = { 'group_id': 'kafka-python-default-group', + 'group_instance_id': None, 'enable_auto_commit': True, 'auto_commit_interval_ms': 5000, 'default_offset_commit_callback': None, @@ -50,6 +52,14 @@ def __init__(self, client, subscription, **configs): group_id (str): name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. Default: 'kafka-python-default-group' + group_instance_id (str): A unique identifier of the consumer instance + provided by end user. Only non-empty strings are permitted. If set, + the consumer is treated as a static member, which means that only + one instance with this ID is allowed in the consumer group at any + time. 
This can be used in combination with a larger session timeout + to avoid group rebalances caused by transient unavailability (e.g. + process restarts). If not set, the consumer will join the group as + a dynamic member, which is the traditional behavior. Default: None enable_auto_commit (bool): If true the consumer's offset will be periodically committed in the background. Default: True. auto_commit_interval_ms (int): milliseconds between automatic @@ -96,6 +106,7 @@ def __init__(self, client, subscription, **configs): self.next_auto_commit_deadline = None self.completed_offset_commits = collections.deque() self._offset_fetch_futures = dict() + self._async_commit_fenced = False if self.config['default_offset_commit_callback'] is None: self.config['default_offset_commit_callback'] = self._default_offset_commit_callback @@ -140,7 +151,7 @@ def __del__(self): super(ConsumerCoordinator, self).__del__() def protocol_type(self): - return ConsumerProtocol.PROTOCOL_TYPE + return ConsumerProtocol[0].PROTOCOL_TYPE def group_protocols(self): """Returns list of preferred (protocols, metadata)""" @@ -228,7 +239,7 @@ def _on_join_complete(self, generation, member_id, protocol, assignor = self._lookup_assignor(protocol) assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,) - assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes) + assignment = ConsumerProtocol[0].ASSIGNMENT.decode(member_assignment_bytes) try: self._subscription.assign_from_subscribed(assignment.partitions()) @@ -320,12 +331,15 @@ def time_to_next_poll(self): def _perform_assignment(self, leader_id, assignment_strategy, members): assignor = self._lookup_assignor(assignment_strategy) assert assignor, 'Invalid assignment protocol: %s' % (assignment_strategy,) - member_metadata = {} + member_subscriptions = {} all_subscribed_topics = set() - for member_id, metadata_bytes in members: - metadata = ConsumerProtocol.METADATA.decode(metadata_bytes) - 
member_metadata[member_id] = metadata - all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member + for member in members: + subscription = Subscription( + ConsumerProtocol[0].METADATA.decode(member.metadata_bytes), + member.group_instance_id + ) + member_subscriptions[member.member_id] = subscription + all_subscribed_topics.update(subscription.topics) # the leader will begin watching for changes to any of the topics # the group is interested in, which ensures that all metadata changes @@ -343,9 +357,9 @@ def _perform_assignment(self, leader_id, assignment_strategy, members): log.debug("Performing assignment for group %s using strategy %s" " with subscriptions %s", self.group_id, assignor.name, - member_metadata) + member_subscriptions) - assignments = assignor.assign(self._cluster, member_metadata) + assignments = assignor.assign(self._cluster, member_subscriptions) log.debug("Finished assignment for group %s: %s", self.group_id, assignments) @@ -470,6 +484,8 @@ def close(self, autocommit=True, timeout_ms=None): super(ConsumerCoordinator, self).close(timeout_ms=timeout_ms) def _invoke_completed_offset_commit_callbacks(self): + if self._async_commit_fenced: + raise Errors.FencedInstanceIdError("Get fenced exception for group_instance_id %s", self.group_instance_id) while self.completed_offset_commits: callback, offsets, res_or_exc = self.completed_offset_commits.popleft() callback(offsets, res_or_exc) @@ -521,6 +537,10 @@ def _do_commit_offsets_async(self, offsets, callback=None): callback = self.config['default_offset_commit_callback'] future = self._send_offset_commit_request(offsets) future.add_both(lambda res: self.completed_offset_commits.appendleft((callback, offsets, res))) + def _maybe_set_async_commit_fenced(exc): + if isinstance(exc, Errors.FencedInstanceIdError): + self._async_commit_fenced = True + future.add_errback(_maybe_set_async_commit_fenced) return future def commit_offsets_sync(self, offsets, timeout_ms=None): @@ -619,7 
+639,7 @@ def _send_offset_commit_request(self, offsets): for tp, offset in six.iteritems(offsets): offset_data[tp.topic][tp.partition] = offset - version = self._client.api_version(OffsetCommitRequest, max_version=6) + version = self._client.api_version(OffsetCommitRequest, max_version=7) if version > 1 and self._subscription.partitions_auto_assigned(): generation = self.generation_if_stable() else: @@ -697,11 +717,26 @@ def _send_offset_commit_request(self, offsets): ) for partition, offset in six.iteritems(partitions)] ) for topic, partitions in six.iteritems(offset_data)] ) + elif version <= 6: + request = OffsetCommitRequest[version]( + self.group_id, + generation.generation_id, + generation.member_id, + [( + topic, [( + partition, + offset.offset, + offset.leader_epoch, + offset.metadata + ) for partition, offset in six.iteritems(partitions)] + ) for topic, partitions in six.iteritems(offset_data)] + ) else: request = OffsetCommitRequest[version]( self.group_id, generation.generation_id, generation.member_id, + self.group_instance_id, [( topic, [( partition, @@ -774,6 +809,11 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response): self.request_rejoin() future.failure(Errors.CommitFailedError(error_type())) return + elif error_type is Errors.FencedInstanceIdError: + log.error("OffsetCommit for group %s failed due to fenced id error: %s", + self.group_id, self.group_instance_id) + future.failure(error_type()) + return elif error_type in (Errors.UnknownMemberIdError, Errors.IllegalGenerationError): # need reset generation and re-join group diff --git a/kafka/coordinator/protocol.py b/kafka/coordinator/protocol.py index 56a390159..bfa1c4695 100644 --- a/kafka/coordinator/protocol.py +++ b/kafka/coordinator/protocol.py @@ -5,14 +5,14 @@ from kafka.structs import TopicPartition -class ConsumerProtocolMemberMetadata(Struct): +class ConsumerProtocolMemberMetadata_v0(Struct): SCHEMA = Schema( ('version', Int16), - ('subscription', 
Array(String('utf-8'))), + ('topics', Array(String('utf-8'))), ('user_data', Bytes)) -class ConsumerProtocolMemberAssignment(Struct): +class ConsumerProtocolMemberAssignment_v0(Struct): SCHEMA = Schema( ('version', Int16), ('assignment', Array( @@ -26,8 +26,10 @@ def partitions(self): for partition in partitions] -class ConsumerProtocol(object): +class ConsumerProtocol_v0(object): PROTOCOL_TYPE = 'consumer' - ASSIGNMENT_STRATEGIES = ('range', 'roundrobin') - METADATA = ConsumerProtocolMemberMetadata - ASSIGNMENT = ConsumerProtocolMemberAssignment + METADATA = ConsumerProtocolMemberMetadata_v0 + ASSIGNMENT = ConsumerProtocolMemberAssignment_v0 + + +ConsumerProtocol = [ConsumerProtocol_v0] diff --git a/kafka/coordinator/subscription.py b/kafka/coordinator/subscription.py new file mode 100644 index 000000000..ca49c1bc0 --- /dev/null +++ b/kafka/coordinator/subscription.py @@ -0,0 +1,37 @@ +from __future__ import absolute_import + + +class Subscription(object): + __slots__ = ('_metadata', '_group_instance_id') + def __init__(self, metadata, group_instance_id): + self._metadata = metadata + self._group_instance_id = group_instance_id + + @property + def version(self): + return self._metadata.version + + @property + def user_data(self): + return self._metadata.user_data + + @property + def topics(self): + return self._metadata.topics + + # Alias for old interface / name + subscription = topics + + @property + def group_instance_id(self): + return self._group_instance_id + + def encode(self): + return self._metadata.encode() + + def __eq__(self, other): + return ( + isinstance(other, Subscription) and + self._metadata == other._metadata and + self._group_instance_id == other._group_instance_id + ) diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index a0439e7ef..c1a65bb96 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -59,6 +59,12 @@ class OffsetCommitResponse_v6(Response): SCHEMA = OffsetCommitResponse_v5.SCHEMA +class 
OffsetCommitResponse_v7(Response): + API_KEY = 8 + API_VERSION = 7 + SCHEMA = OffsetCommitResponse_v6.SCHEMA + + class OffsetCommitRequest_v0(Request): API_KEY = 8 API_VERSION = 0 # Zookeeper-backed storage @@ -162,17 +168,34 @@ class OffsetCommitRequest_v6(Request): ) + +class OffsetCommitRequest_v7(Request): + API_KEY = 8 + API_VERSION = 7 + RESPONSE_TYPE = OffsetCommitResponse_v7 + SCHEMA = Schema( + ('group_id', String('utf-8')), + ('generation_id', Int32), + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')), # added for static membership / kip-345 + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('leader_epoch', Int32), + ('metadata', String('utf-8')))))) + ) + + OffsetCommitRequest = [ - OffsetCommitRequest_v0, OffsetCommitRequest_v1, - OffsetCommitRequest_v2, OffsetCommitRequest_v3, - OffsetCommitRequest_v4, OffsetCommitRequest_v5, - OffsetCommitRequest_v6, + OffsetCommitRequest_v0, OffsetCommitRequest_v1, OffsetCommitRequest_v2, + OffsetCommitRequest_v3, OffsetCommitRequest_v4, OffsetCommitRequest_v5, + OffsetCommitRequest_v6, OffsetCommitRequest_v7, ] OffsetCommitResponse = [ - OffsetCommitResponse_v0, OffsetCommitResponse_v1, - OffsetCommitResponse_v2, OffsetCommitResponse_v3, - OffsetCommitResponse_v4, OffsetCommitResponse_v5, - OffsetCommitResponse_v6, + OffsetCommitResponse_v0, OffsetCommitResponse_v1, OffsetCommitResponse_v2, + OffsetCommitResponse_v3, OffsetCommitResponse_v4, OffsetCommitResponse_v5, + OffsetCommitResponse_v6, OffsetCommitResponse_v7, ] diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py index 74e19c94b..383f3cd2a 100644 --- a/kafka/protocol/group.py +++ b/kafka/protocol/group.py @@ -1,5 +1,7 @@ from __future__ import absolute_import +import collections + from kafka.protocol.api import Request, Response from kafka.protocol.struct import Struct from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String @@ -8,6 +10,9
@@ DEFAULT_GENERATION_ID = -1 UNKNOWN_MEMBER_ID = '' +GroupMember = collections.namedtuple("GroupMember", ["member_id", "group_instance_id", "metadata_bytes"]) +GroupMember.__new__.__defaults__ = (None,) * len(GroupMember._fields) + class JoinGroupResponse_v0(Response): API_KEY = 11 @@ -58,6 +63,23 @@ class JoinGroupResponse_v4(Response): SCHEMA = JoinGroupResponse_v3.SCHEMA +class JoinGroupResponse_v5(Response): + API_KEY = 11 + API_VERSION = 5 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('generation_id', Int32), + ('group_protocol', String('utf-8')), + ('leader_id', String('utf-8')), + ('member_id', String('utf-8')), + ('members', Array( + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')), + ('member_metadata', Bytes))) + ) + + class JoinGroupRequest_v0(Request): API_KEY = 11 API_VERSION = 0 @@ -110,13 +132,31 @@ class JoinGroupRequest_v4(Request): SCHEMA = JoinGroupRequest_v3.SCHEMA +class JoinGroupRequest_v5(Request): + API_KEY = 11 + API_VERSION = 5 + RESPONSE_TYPE = JoinGroupResponse_v5 + SCHEMA = Schema( + ('group', String('utf-8')), + ('session_timeout', Int32), + ('rebalance_timeout', Int32), + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')), + ('protocol_type', String('utf-8')), + ('group_protocols', Array( + ('protocol_name', String('utf-8')), + ('protocol_metadata', Bytes))) + ) + + JoinGroupRequest = [ JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2, - JoinGroupRequest_v3, JoinGroupRequest_v4, + JoinGroupRequest_v3, JoinGroupRequest_v4, JoinGroupRequest_v5, + ] JoinGroupResponse = [ JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2, - JoinGroupResponse_v3, JoinGroupResponse_v4, + JoinGroupResponse_v3, JoinGroupResponse_v4, JoinGroupResponse_v5, ] @@ -153,6 +193,12 @@ class SyncGroupResponse_v2(Response): SCHEMA = SyncGroupResponse_v1.SCHEMA +class SyncGroupResponse_v3(Response): + API_KEY = 14 + API_VERSION = 3 + SCHEMA = 
SyncGroupResponse_v2.SCHEMA + + class SyncGroupRequest_v0(Request): API_KEY = 14 API_VERSION = 0 @@ -181,8 +227,29 @@ class SyncGroupRequest_v2(Request): SCHEMA = SyncGroupRequest_v1.SCHEMA -SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1, SyncGroupRequest_v2] -SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1, SyncGroupResponse_v2] +class SyncGroupRequest_v3(Request): + API_KEY = 14 + API_VERSION = 3 + RESPONSE_TYPE = SyncGroupResponse_v3 + SCHEMA = Schema( + ('group', String('utf-8')), + ('generation_id', Int32), + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')), + ('group_assignment', Array( + ('member_id', String('utf-8')), + ('member_metadata', Bytes))) + ) + + +SyncGroupRequest = [ + SyncGroupRequest_v0, SyncGroupRequest_v1, SyncGroupRequest_v2, + SyncGroupRequest_v3, +] +SyncGroupResponse = [ + SyncGroupResponse_v0, SyncGroupResponse_v1, SyncGroupResponse_v2, + SyncGroupResponse_v3, +] class MemberAssignment(Struct): @@ -218,6 +285,12 @@ class HeartbeatResponse_v2(Response): SCHEMA = HeartbeatResponse_v1.SCHEMA +class HeartbeatResponse_v3(Response): + API_KEY = 12 + API_VERSION = 3 + SCHEMA = HeartbeatResponse_v2.SCHEMA + + class HeartbeatRequest_v0(Request): API_KEY = 12 API_VERSION = 0 @@ -243,8 +316,26 @@ class HeartbeatRequest_v2(Request): SCHEMA = HeartbeatRequest_v1.SCHEMA -HeartbeatRequest = [HeartbeatRequest_v0, HeartbeatRequest_v1, HeartbeatRequest_v2] -HeartbeatResponse = [HeartbeatResponse_v0, HeartbeatResponse_v1, HeartbeatResponse_v2] +class HeartbeatRequest_v3(Request): + API_KEY = 12 + API_VERSION = 3 + RESPONSE_TYPE = HeartbeatResponse_v3 + SCHEMA = Schema( + ('group', String('utf-8')), + ('generation_id', Int32), + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')) + ) + + +HeartbeatRequest = [ + HeartbeatRequest_v0, HeartbeatRequest_v1, HeartbeatRequest_v2, + HeartbeatRequest_v3, +] +HeartbeatResponse = [ + HeartbeatResponse_v0, HeartbeatResponse_v1, 
HeartbeatResponse_v2, + HeartbeatResponse_v3, +] class LeaveGroupResponse_v0(Response): @@ -270,6 +361,19 @@ class LeaveGroupResponse_v2(Response): SCHEMA = LeaveGroupResponse_v1.SCHEMA +class LeaveGroupResponse_v3(Response): + API_KEY = 13 + API_VERSION = 3 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('members', Array( + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')), + ('error_code', Int16))) + ) + + class LeaveGroupRequest_v0(Request): API_KEY = 13 API_VERSION = 0 @@ -294,5 +398,23 @@ class LeaveGroupRequest_v2(Request): SCHEMA = LeaveGroupRequest_v1.SCHEMA -LeaveGroupRequest = [LeaveGroupRequest_v0, LeaveGroupRequest_v1, LeaveGroupRequest_v2] -LeaveGroupResponse = [LeaveGroupResponse_v0, LeaveGroupResponse_v1, LeaveGroupResponse_v2] +class LeaveGroupRequest_v3(Request): + API_KEY = 13 + API_VERSION = 3 + RESPONSE_TYPE = LeaveGroupResponse_v3 + SCHEMA = Schema( + ('group', String('utf-8')), + ('members', Array( + ('member_id', String('utf-8')), + ('group_instance_id', String('utf-8')))) + ) + + +LeaveGroupRequest = [ + LeaveGroupRequest_v0, LeaveGroupRequest_v1, LeaveGroupRequest_v2, + LeaveGroupRequest_v3, +] +LeaveGroupResponse = [ + LeaveGroupResponse_v0, LeaveGroupResponse_v1, LeaveGroupResponse_v2, + LeaveGroupResponse_v3, +] diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index f95f367e8..e1ec139a6 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -231,7 +231,7 @@ def consumer_thread(i, group_id): else: assert(len(consumer_group.members) == 1) for member in consumer_group.members: - assert(member.member_metadata.subscription[0] == topic) + assert(member.member_metadata.topics[0] == topic) assert(member.member_assignment.assignment[0][0] == topic) consumer_groups.add(consumer_group.group) assert(sorted(list(consumer_groups)) == group_id_list) diff --git a/test/test_assignors.py 
b/test/test_assignors.py index 858ef426d..78f424c8e 100644 --- a/test/test_assignors.py +++ b/test/test_assignors.py @@ -10,7 +10,8 @@ from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor, StickyAssignorUserDataV1 -from kafka.coordinator.protocol import ConsumerProtocolMemberAssignment, ConsumerProtocolMemberMetadata +from kafka.coordinator.protocol import ConsumerProtocolMemberAssignment_v0, ConsumerProtocolMemberMetadata_v0 +from kafka.coordinator.subscription import Subscription from kafka.vendor import six @@ -34,17 +35,17 @@ def create_cluster(mocker, topics, topics_partitions=None, topic_partitions_lamb def test_assignor_roundrobin(mocker): assignor = RoundRobinPartitionAssignor - member_metadata = { - 'C0': assignor.metadata({'t0', 't1'}), - 'C1': assignor.metadata({'t0', 't1'}), + group_subscriptions = { + 'C0': Subscription(assignor.metadata({'t0', 't1'}), None), + 'C1': Subscription(assignor.metadata({'t0', 't1'}), None), } cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2}) - ret = assignor.assign(cluster, member_metadata) + ret = assignor.assign(cluster, group_subscriptions) expected = { - 'C0': ConsumerProtocolMemberAssignment( + 'C0': ConsumerProtocolMemberAssignment_v0( assignor.version, [('t0', [0, 2]), ('t1', [1])], b''), - 'C1': ConsumerProtocolMemberAssignment( + 'C1': ConsumerProtocolMemberAssignment_v0( assignor.version, [('t0', [1]), ('t1', [0, 2])], b'') } assert ret == expected @@ -56,17 +57,17 @@ def test_assignor_roundrobin(mocker): def test_assignor_range(mocker): assignor = RangePartitionAssignor - member_metadata = { - 'C0': assignor.metadata({'t0', 't1'}), - 'C1': assignor.metadata({'t0', 't1'}), + group_subscriptions = { + 'C0': Subscription(assignor.metadata({'t0', 't1'}), None), + 'C1': Subscription(assignor.metadata({'t0', 
't1'}), None), } cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2}) - ret = assignor.assign(cluster, member_metadata) + ret = assignor.assign(cluster, group_subscriptions) expected = { - 'C0': ConsumerProtocolMemberAssignment( + 'C0': ConsumerProtocolMemberAssignment_v0( assignor.version, [('t0', [0, 1]), ('t1', [0, 1])], b''), - 'C1': ConsumerProtocolMemberAssignment( + 'C1': ConsumerProtocolMemberAssignment_v0( assignor.version, [('t0', [2]), ('t1', [2])], b'') } assert ret == expected @@ -102,9 +103,9 @@ def test_sticky_assignor1(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C0': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t3', [0])], b''), - 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [1]), ('t2', [0]), ('t3', [1])], b''), - 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0]), ('t2', [1])], b''), + 'C0': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t3', [0])], b''), + 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t0', [1]), ('t2', [0]), ('t3', [1])], b''), + 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [0]), ('t2', [1])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -115,10 +116,10 @@ def test_sticky_assignor1(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C0': ConsumerProtocolMemberAssignment( + 'C0': ConsumerProtocolMemberAssignment_v0( StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t2', [0]), ('t3', [0])], b'' ), - 'C2': ConsumerProtocolMemberAssignment( + 'C2': ConsumerProtocolMemberAssignment_v0( StickyPartitionAssignor.version, [('t0', [1]), ('t1', [0]), ('t2', [1]), ('t3', [1])], b'' ), } @@ -158,9 +159,9 @@ def 
test_sticky_assignor2(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C0': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [0])], b''), - 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 1])], b''), - 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''), + 'C0': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t0', [0])], b''), + 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [0, 1])], b''), + 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -171,8 +172,8 @@ def test_sticky_assignor2(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [0, 1])], b''), - 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''), + 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [0, 1])], b''), + 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -187,7 +188,7 @@ def test_sticky_one_consumer_no_topic(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''), + 'C': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -202,7 +203,7 @@ def test_sticky_one_consumer_nonexisting_topic(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) 
expected_assignment = { - 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''), + 'C': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -217,7 +218,7 @@ def test_sticky_one_consumer_one_topic(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), + 'C': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -232,7 +233,7 @@ def test_sticky_should_only_assign_partitions_from_subscribed_topics(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), + 'C': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -247,7 +248,7 @@ def test_sticky_one_consumer_multiple_topics(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 1, 2]), ('t2', [0, 1, 2])], b''), + 'C': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [0, 1, 2]), ('t2', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -263,8 +264,8 @@ def test_sticky_two_consumers_one_topic_one_partition(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0])], b''), - 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''), + 'C1': 
ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t', [0])], b''), + 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -280,8 +281,8 @@ def test_sticky_two_consumers_one_topic_two_partitions(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0])], b''), - 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [1])], b''), + 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t', [0])], b''), + 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t', [1])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -299,9 +300,9 @@ def test_sticky_multiple_consumers_mixed_topic_subscriptions(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 2])], b''), - 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 1])], b''), - 'C3': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [1])], b''), + 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [0, 2])], b''), + 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t2', [0, 1])], b''), + 'C3': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [1])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -316,7 +317,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker): assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), + 'C1': 
ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), } assert_assignment(assignment, expected_assignment) @@ -356,8 +357,8 @@ def test_sticky_add_remove_topic_two_consumers(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 2])], b''), - 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [1])], b''), + 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [0, 2])], b''), + 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [1])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -371,8 +372,8 @@ def test_sticky_add_remove_topic_two_consumers(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 2]), ('t2', [1])], b''), - 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [1]), ('t2', [0, 2])], b''), + 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [0, 2]), ('t2', [1])], b''), + 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [1]), ('t2', [0, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -386,8 +387,8 @@ def test_sticky_add_remove_topic_two_consumers(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [1])], b''), - 'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 2])], b''), + 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t2', [1])], b''), + 'C2': 
ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t2', [0, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -601,7 +602,7 @@ def topic_partitions(topic): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0]), ('t3', list(range(100)))], b''), + 'C': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [0]), ('t3', list(range(100)))], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -616,7 +617,7 @@ def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), + 'C': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -630,7 +631,7 @@ def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker): cluster = create_cluster(mocker, topics={}, topics_partitions={}) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''), + 'C': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [], b''), } assert_assignment(sticky_assignment, expected_assignment) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 4ffe1d28c..b7db5ad19 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -13,13 +13,15 @@ from kafka.coordinator.base import Generation, MemberState, HeartbeatThread from kafka.coordinator.consumer import ConsumerCoordinator from kafka.coordinator.protocol import ( - ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment) + 
ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0) +from kafka.coordinator.subscription import Subscription import kafka.errors as Errors from kafka.future import Future from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.protocol.commit import ( OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse) +from kafka.protocol.group import GroupMember from kafka.protocol.metadata import MetadataResponse from kafka.structs import OffsetAndMetadata, TopicPartition from kafka.util import WeakMethod @@ -73,15 +75,15 @@ def test_group_protocols(coordinator): coordinator._subscription.subscribe(topics=['foobar']) assert coordinator.group_protocols() == [ - ('range', ConsumerProtocolMemberMetadata( + ('range', ConsumerProtocolMemberMetadata_v0( RangePartitionAssignor.version, ['foobar'], b'')), - ('roundrobin', ConsumerProtocolMemberMetadata( + ('roundrobin', ConsumerProtocolMemberMetadata_v0( RoundRobinPartitionAssignor.version, ['foobar'], b'')), - ('sticky', ConsumerProtocolMemberMetadata( + ('sticky', ConsumerProtocolMemberMetadata_v0( StickyPartitionAssignor.version, ['foobar'], b'')), @@ -134,7 +136,7 @@ def test_join_complete(mocker, coordinator): coordinator.config['assignors'] = (assignor,) mocker.spy(assignor, 'on_assignment') assert assignor.on_assignment.call_count == 0 - assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') + assignment = ConsumerProtocolMemberAssignment_v0(0, [('foobar', [0, 1])], b'') coordinator._on_join_complete(0, 'member-foo', 'roundrobin', assignment.encode()) assert assignor.on_assignment.call_count == 1 assignor.on_assignment.assert_called_with(assignment) @@ -148,7 +150,7 @@ def test_join_complete_with_sticky_assignor(mocker, coordinator): mocker.spy(assignor, 'on_generation_assignment') assert assignor.on_assignment.call_count == 0 assert assignor.on_generation_assignment.call_count == 0 - assignment = ConsumerProtocolMemberAssignment(0, 
[('foobar', [0, 1])], b'') + assignment = ConsumerProtocolMemberAssignment_v0(0, [('foobar', [0, 1])], b'') coordinator._on_join_complete(0, 'member-foo', 'sticky', assignment.encode()) assert assignor.on_assignment.call_count == 1 assert assignor.on_generation_assignment.call_count == 1 @@ -166,7 +168,7 @@ def test_subscription_listener(mocker, coordinator): assert listener.on_partitions_revoked.call_count == 1 listener.on_partitions_revoked.assert_called_with(set([])) - assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') + assignment = ConsumerProtocolMemberAssignment_v0(0, [('foobar', [0, 1])], b'') coordinator._on_join_complete( 0, 'member-foo', 'roundrobin', assignment.encode()) assert listener.on_partitions_assigned.call_count == 1 @@ -184,7 +186,7 @@ def test_subscription_listener_failure(mocker, coordinator): coordinator._on_join_prepare(0, 'member-foo') assert listener.on_partitions_revoked.call_count == 1 - assignment = ConsumerProtocolMemberAssignment(0, [('foobar', [0, 1])], b'') + assignment = ConsumerProtocolMemberAssignment_v0(0, [('foobar', [0, 1])], b'') coordinator._on_join_complete( 0, 'member-foo', 'roundrobin', assignment.encode()) assert listener.on_partitions_assigned.call_count == 1 @@ -192,14 +194,14 @@ def test_subscription_listener_failure(mocker, coordinator): def test_perform_assignment(mocker, coordinator): coordinator._subscription.subscribe(topics=['foo1']) - member_metadata = { - 'member-foo': ConsumerProtocolMemberMetadata(0, ['foo1'], b''), - 'member-bar': ConsumerProtocolMemberMetadata(0, ['foo1'], b'') + group_subscriptions = { + 'member-foo': Subscription(ConsumerProtocolMemberMetadata_v0(0, ['foo1'], b''), None), + 'member-bar': Subscription(ConsumerProtocolMemberMetadata_v0(0, ['foo1'], b''), None), } assignments = { - 'member-foo': ConsumerProtocolMemberAssignment( + 'member-foo': ConsumerProtocolMemberAssignment_v0( 0, [('foo1', [0])], b''), - 'member-bar': ConsumerProtocolMemberAssignment( + 
'member-bar': ConsumerProtocolMemberAssignment_v0( 0, [('foo1', [1])], b'') } @@ -208,12 +210,12 @@ def test_perform_assignment(mocker, coordinator): ret = coordinator._perform_assignment( 'member-foo', 'roundrobin', - [(member, metadata.encode()) - for member, metadata in member_metadata.items()]) + [GroupMember(member, None, subscription.encode()) + for member, subscription in group_subscriptions.items()]) assert RoundRobinPartitionAssignor.assign.call_count == 1 RoundRobinPartitionAssignor.assign.assert_called_with( - coordinator._client.cluster, member_metadata) + coordinator._client.cluster, group_subscriptions) assert ret == assignments