Skip to content

Commit 20b62b8

Browse files
committed
Support Unix domain socket in Kafka::Datadog
dogstatsd-ruby has supported Unix domain socket. cf. DataDog/dogstatsd-ruby#61
1 parent 1b2ed7c commit 20b62b8

File tree

4 files changed

+91
-23
lines changed

4 files changed

+91
-23
lines changed

lib/kafka/datadog.rb

+10-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ module Datadog
3131

3232
class << self
3333
def statsd
34-
@statsd ||= ::Datadog::Statsd.new(host, port, namespace: namespace, tags: tags)
34+
@statsd ||= ::Datadog::Statsd.new(host, port, namespace: namespace, tags: tags, socket_path: socket_path)
3535
end
3636

3737
def statsd=(statsd)
@@ -57,6 +57,15 @@ def port=(port)
5757
clear
5858
end
5959

60+
def socket_path
61+
@socket_path
62+
end
63+
64+
def socket_path=(socket_path)
65+
@socket_path = socket_path
66+
clear
67+
end
68+
6069
def namespace
6170
@namespace ||= STATSD_NAMESPACE
6271
end

spec/datadog_spec.rb

+29-9
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,40 @@
1414
agent.stop
1515
end
1616

17-
it "emits metrics to the Datadog agent" do
18-
Kafka::Datadog.host = agent.host
19-
Kafka::Datadog.port = agent.port
17+
context "when host and port are specified" do
18+
it "emits metrics to the Datadog agent" do
19+
Kafka::Datadog.host = agent.host
20+
Kafka::Datadog.port = agent.port
2021

21-
client = Kafka::Datadog.statsd
22+
client = Kafka::Datadog.statsd
2223

23-
client.increment("greetings")
24+
client.increment("greetings")
2425

25-
agent.wait_for_metrics
26+
agent.wait_for_metrics
2627

27-
expect(agent.metrics.count).to eq 1
28+
expect(agent.metrics.count).to eq 1
2829

29-
metric = agent.metrics.first
30+
metric = agent.metrics.first
3031

31-
expect(metric).to eq "ruby_kafka.greetings"
32+
expect(metric).to eq "ruby_kafka.greetings"
33+
end
34+
end
35+
36+
context "when host and port are specified" do
37+
it "emits metrics to the Datadog agent" do
38+
Kafka::Datadog.socket_path = agent.socket_path
39+
40+
client = Kafka::Datadog.statsd
41+
42+
client.increment("greetings")
43+
44+
agent.wait_for_metrics
45+
46+
expect(agent.metrics.count).to eq 1
47+
48+
metric = agent.metrics.first
49+
50+
expect(metric).to eq "ruby_kafka.greetings"
51+
end
3252
end
3353
end

spec/fake_datadog_agent.rb

+30-10
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,39 @@
11
# frozen_string_literal: true
22

33
require "socket"
4+
require "tempfile"
45

56
class FakeDatadogAgent
6-
attr_reader :host, :port, :metrics
7+
attr_reader :host, :port, :socket_path, :metrics
78

89
def initialize
910
@host = "127.0.0.1"
1011
@port = 9999
11-
@socket = UDPSocket.new
12-
@thread = nil
12+
@socket_path = Tempfile.open("fake_datadog_agent_sock") do |f|
13+
path = f.path
14+
f.unlink
15+
path
16+
end
17+
@udp_socket = UDPSocket.new
18+
@uds_socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
19+
@threads = []
1320
@metrics = []
14-
15-
@socket.bind(@host, @port)
1621
end
1722

1823
def start
19-
@thread = Thread.new { loop { receive } }
20-
@thread.abort_on_exception = true
24+
@udp_socket.bind(@host, @port)
25+
@uds_socket.bind(Addrinfo.unix(@socket_path))
26+
27+
@threads << Thread.new { loop { receive_from_udp_socket } }
28+
@threads << Thread.new { loop { receive_from_uds_socket } }
29+
@threads.each { |th| th.abort_on_exception = true }
2130
end
2231

2332
def stop
24-
@thread.kill
33+
@threads.each(&:kill)
34+
@udp_socket.close
35+
@uds_socket.close
36+
@threads = []
2537
end
2638

2739
def wait_for_metrics(count: 1)
@@ -34,9 +46,17 @@ def wait_for_metrics(count: 1)
3446

3547
private
3648

37-
def receive
38-
data, sender = @socket.recvfrom(512)
49+
def receive_from_udp_socket
50+
data, _ = @udp_socket.recvfrom(512)
51+
add_metrics(data)
52+
end
53+
54+
def receive_from_uds_socket
55+
data, _ = @uds_socket.recvfrom(512)
56+
add_metrics(data)
57+
end
3958

59+
def add_metrics(data)
4060
data.split("\n").each do |message|
4161
metric = message.split(":").first
4262
@metrics << metric if metric

spec/functional/datadog_spec.rb

+22-3
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,31 @@
55
require "fake_datadog_agent"
66

77
describe "Reporting metrics to Datadog", functional: true do
8-
example "reporting connection metrics" do
9-
agent = FakeDatadogAgent.new
8+
let(:agent) { FakeDatadogAgent.new }
109

10+
before do
11+
agent.start
12+
end
13+
14+
after do
15+
agent.stop
16+
end
17+
18+
example "reporting connection metrics using UDP socket" do
1119
Kafka::Datadog.port = agent.port
1220

13-
agent.start
21+
kafka.topics
22+
23+
agent.wait_for_metrics(count: 4)
24+
25+
expect(agent.metrics).to include("ruby_kafka.api.calls")
26+
expect(agent.metrics).to include("ruby_kafka.api.latency")
27+
expect(agent.metrics).to include("ruby_kafka.api.request_size")
28+
expect(agent.metrics).to include("ruby_kafka.api.response_size")
29+
end
30+
31+
example "reporting connection metrics using UDS socket" do
32+
Kafka::Datadog.socket_path = agent.socket_path
1433

1534
kafka.topics
1635

0 commit comments

Comments
 (0)