Skip to content

Support Unix domain socket in Kafka::Datadog #827

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion lib/kafka/datadog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module Datadog

class << self
def statsd
@statsd ||= ::Datadog::Statsd.new(host, port, namespace: namespace, tags: tags)
@statsd ||= ::Datadog::Statsd.new(host, port, namespace: namespace, tags: tags, socket_path: socket_path)
end

def statsd=(statsd)
Expand All @@ -57,6 +57,15 @@ def port=(port)
clear
end

def socket_path
@socket_path
end

def socket_path=(socket_path)
@socket_path = socket_path
clear
end

def namespace
@namespace ||= STATSD_NAMESPACE
end
Expand Down
38 changes: 29 additions & 9 deletions spec/datadog_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,40 @@
agent.stop
end

it "emits metrics to the Datadog agent" do
Kafka::Datadog.host = agent.host
Kafka::Datadog.port = agent.port
context "when host and port are specified" do
it "emits metrics to the Datadog agent" do
Kafka::Datadog.host = agent.host
Kafka::Datadog.port = agent.port

client = Kafka::Datadog.statsd
client = Kafka::Datadog.statsd

client.increment("greetings")
client.increment("greetings")

agent.wait_for_metrics
agent.wait_for_metrics

expect(agent.metrics.count).to eq 1
expect(agent.metrics.count).to eq 1

metric = agent.metrics.first
metric = agent.metrics.first

expect(metric).to eq "ruby_kafka.greetings"
expect(metric).to eq "ruby_kafka.greetings"
end
end

context "when socket_path is specified" do
it "emits metrics to the Datadog agent" do
Kafka::Datadog.socket_path = agent.socket_path

client = Kafka::Datadog.statsd

client.increment("greetings")

agent.wait_for_metrics

expect(agent.metrics.count).to eq 1

metric = agent.metrics.first

expect(metric).to eq "ruby_kafka.greetings"
end
end
end
40 changes: 30 additions & 10 deletions spec/fake_datadog_agent.rb
Original file line number Diff line number Diff line change
@@ -1,27 +1,39 @@
# frozen_string_literal: true

require "socket"
require "tempfile"

class FakeDatadogAgent
attr_reader :host, :port, :metrics
attr_reader :host, :port, :socket_path, :metrics

def initialize
@host = "127.0.0.1"
@port = 9999
@socket = UDPSocket.new
@thread = nil
@socket_path = Tempfile.open("fake_datadog_agent_sock") do |f|
path = f.path
f.unlink
path
end
@udp_socket = UDPSocket.new
@uds_socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
@threads = []
@metrics = []

@socket.bind(@host, @port)
end

def start
@thread = Thread.new { loop { receive } }
@thread.abort_on_exception = true
@udp_socket.bind(@host, @port)
@uds_socket.bind(Addrinfo.unix(@socket_path))

@threads << Thread.new { loop { receive_from_udp_socket } }
@threads << Thread.new { loop { receive_from_uds_socket } }
@threads.each { |th| th.abort_on_exception = true }
end

def stop
@thread.kill
@threads.each(&:kill)
@udp_socket.close
@uds_socket.close
@threads = []
end

def wait_for_metrics(count: 1)
Expand All @@ -34,9 +46,17 @@ def wait_for_metrics(count: 1)

private

def receive
data, sender = @socket.recvfrom(512)
def receive_from_udp_socket
data, _ = @udp_socket.recvfrom(512)
add_metrics(data)
end

def receive_from_uds_socket
data, _ = @uds_socket.recvfrom(512)
add_metrics(data)
end

def add_metrics(data)
data.split("\n").each do |message|
metric = message.split(":").first
@metrics << metric if metric
Expand Down
25 changes: 22 additions & 3 deletions spec/functional/datadog_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,31 @@
require "fake_datadog_agent"

describe "Reporting metrics to Datadog", functional: true do
example "reporting connection metrics" do
agent = FakeDatadogAgent.new
let(:agent) { FakeDatadogAgent.new }

before do
agent.start
end

after do
agent.stop
end

example "reporting connection metrics using UDP socket" do
Kafka::Datadog.port = agent.port

agent.start
kafka.topics

agent.wait_for_metrics(count: 4)

expect(agent.metrics).to include("ruby_kafka.api.calls")
expect(agent.metrics).to include("ruby_kafka.api.latency")
expect(agent.metrics).to include("ruby_kafka.api.request_size")
expect(agent.metrics).to include("ruby_kafka.api.response_size")
end

example "reporting connection metrics using Unix domain socket" do
Kafka::Datadog.socket_path = agent.socket_path

kafka.topics

Expand Down