ruby-kafka-ec2 is an extension of ruby-kafka that provides useful features for EC2, such as Kafka::EC2::MixedInstanceAssignmentStrategy.
Add this line to your application's Gemfile:
gem 'ruby-kafka-ec2'
And then execute:
$ bundle install
Or install it yourself as:
$ gem install ruby-kafka-ec2
Kafka::EC2::MixedInstanceAssignmentStrategy is an assignor for auto-scaling groups with mixed instance policies. The throughput of a consumer usually depends on its instance family and availability zone. For example, if your application writes data to a database, consumers running in the same availability zone as the writer DB instance have higher throughput.
To assign more partitions to consumers with higher throughput, first define a Kafka::EC2::MixedInstanceAssignmentStrategyFactory like below:
require "aws-sdk-rds"
require "kafka"
require "kafka/ec2"

rds = Aws::RDS::Client.new(region: "ap-northeast-1")
assignment_strategy_factory = Kafka::EC2::MixedInstanceAssignmentStrategyFactory.new(
  instance_family_weights: {
    "r4" => 1.00,
    "r5" => 1.20,
    "m5" => 1.35,
    "c5" => 1.50,
  },
  availability_zone_weights: ->() {
    db_cluster = rds.describe_db_clusters(filters: [
      { name: "db-cluster-id", values: [ENV["RDS_CLUSTER"]] },
    ]).db_clusters.first
    db_instance_id = db_cluster.db_cluster_members.find { |m| m.is_cluster_writer }.db_instance_identifier
    db_instance = rds.describe_db_instances(filters: [
      { name: "db-cluster-id", values: [ENV["RDS_CLUSTER"]] },
      { name: "db-instance-id", values: [db_instance_id] },
    ]).db_instances.first

    if db_instance.availability_zone == "ap-northeast-1a"
      {
        "ap-northeast-1a" => 1,
        "ap-northeast-1c" => 0.25,
      }
    else
      {
        "ap-northeast-1a" => 0.25,
        "ap-northeast-1c" => 1,
      }
    end
  },
)
In the preceding example, consumers running on c5 instances will be assigned 1.5x as many partitions as consumers running on r4 instances. Similarly, if the writer DB instance is in ap-northeast-1a, consumers in ap-northeast-1a will be assigned 4x as many partitions as consumers in ap-northeast-1c.
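To make the ratios above concrete, here is a minimal sketch of how weights could translate into partition counts. It is not the gem's implementation: `proportional_shares` is a hypothetical helper, the consumer names are made up, and the sketch assumes that a consumer's effective weight is the product of its instance family weight and its availability zone weight, which the text above does not state.

```ruby
# Hypothetical helper (not part of ruby-kafka-ec2): split `partition_count`
# partitions among consumers in proportion to their weights, using the
# largest-remainder method so that the counts always add up.
def proportional_shares(weights, partition_count)
  total = weights.values.sum.to_f
  quotas = weights.transform_values { |w| w * partition_count / total }
  shares = quotas.transform_values(&:floor)
  leftover = partition_count - shares.values.sum
  # Give the remaining partitions to the consumers with the largest fractions.
  quotas.sort_by { |_, q| -(q - q.floor) }.first(leftover).each do |name, _|
    shares[name] += 1
  end
  shares
end

# Assumption: effective weight = instance family weight * availability zone
# weight, with the writer DB instance located in ap-northeast-1a.
weights = {
  "c5-in-1a" => 1.50 * 1,
  "r4-in-1a" => 1.00 * 1,
  "c5-in-1c" => 1.50 * 0.25,
  "r4-in-1c" => 1.00 * 0.25,
}

shares = proportional_shares(weights, 100)
shares.each { |name, count| puts "#{name}: #{count} partitions" }
```

With 100 partitions, the c5 consumer in ap-northeast-1a ends up with 1.5x the partitions of the r4 consumer in the same zone, and each consumer in ap-northeast-1a gets 4x the partitions of its counterpart in ap-northeast-1c.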
You can use Kafka::EC2::MixedInstanceAssignmentStrategy by passing the factory to Kafka::EC2.with_assignment_strategy_factory and creating a consumer in the block (kafka is your existing Kafka client):
consumer = Kafka::EC2.with_assignment_strategy_factory(assignment_strategy_factory) do
  kafka.consumer(group_id: ENV["KAFKA_CONSUMER_GROUP_ID"])
end
After checking out the repo, run bin/setup to install dependencies. Then, run rake spec to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.
To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.
Bug reports and pull requests are welcome on GitHub at https://github.com/abicky/ruby-kafka-ec2.
The gem is available as open source under the terms of the MIT License.