You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: README.md
+27-28
Original file line number
Diff line number
Diff line change
@@ -747,47 +747,46 @@ end
747
747
#### Customizing Partition Assignment Strategy
748
748
749
749
In some cases, you might want to assign more partitions to some consumers. For example, in applications inserting some records to a database, the consumers running on hosts nearby the database can process more messages than the consumers running on other hosts.
750
-
You can implement a custom assignment strategy and use it by passing an object that implements `#user_data` and `#assign` as the argument `assignment_strategy` like below:
750
+
You can use a custom assignment strategy like below:
751
751
752
752
```ruby
753
-
classCustomAssignmentStrategy
754
-
definitialize(user_data)
755
-
@user_data= user_data
756
-
end
753
+
Kafka::ConsumerGroup::Assignor.register_strategy(:custom) do |cluster:, members:, partitions:|
`members` is a hash mapping member ids to metadata, and `partitions` is a list of partitions the consumer group processes. The block should return a hash mapping member ids to partitions.
760
+
For example, the following strategy assigns partitions randomly:
762
761
763
-
# Assign the topic partitions to the group members.
764
-
#
765
-
#@paramcluster[Kafka::Cluster]
766
-
#@parammembers[Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
767
-
# mapping member ids to metadata
768
-
#@parampartitions[Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of
769
-
# partitions the consumer group processes
770
-
#@return[Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>] a hash
771
-
# mapping member ids to partitions.
772
-
defassign(cluster:, members:, partitions:)
773
-
...
762
+
```ruby
763
+
Kafka::ConsumerGroup::Assignor.register_strategy(:custom) do |cluster:, members:, partitions:|
764
+
member_ids = members.keys
765
+
partitions.each_with_object(Hash.new {|h, k| h[k] = [] }) do |partition, partitions_per_member|
0 commit comments