You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
The assignor is responsible for getting partition information
and building an assignment from the result of its strategy.
That simplifies assignment strategies.
Copy file name to clipboardExpand all lines: README.md
+10-11
Original file line number
Diff line number
Diff line change
@@ -727,11 +727,11 @@ end
727
727
#### Customizing Partition Assignment Strategy
728
728
729
729
In some cases, you might want to assign more partitions to some consumers. For example, in applications inserting some records to a database, the consumers running on hosts nearby the database can process more messages than the consumers running on other hosts.
730
-
You can implement a custom assignment strategy and use it by passing a proc object that create the strategy as the argument `assignment_strategy_builder` like below:
730
+
You can implement a custom assignment strategy and use it by passing an object that implements `#protocol_name`, `#user_data`, and `#assign`as the argument `assignment_strategy` like below:
731
731
732
732
```ruby
733
733
classCustomAssignmentStrategy
734
-
definitialize(cluster, user_data)
734
+
definitialize(user_data)
735
735
@cluster= cluster
736
736
@user_data= user_data
737
737
end
@@ -748,20 +748,19 @@ class CustomAssignmentStrategy
748
748
749
749
# Assign the topic partitions to the group members.
750
750
#
751
-
#@parammembers[Hash<String, Protocol::JoinGroupResponse::Metadata>] a hash
751
+
#@parammembers[Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
752
752
# mapping member ids to metadata
753
-
#@paramtopics[Array<String>] topics
754
-
#@return[Hash<String, Protocol::MemberAssignment>] a hash mapping member
755
-
# ids to assignments.
756
-
defassign(members:, topics:)
753
+
#@parampartitions[Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of
754
+
# partitions the consumer group processes
755
+
#@return[Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>] a hash
0 commit comments