-
Notifications
You must be signed in to change notification settings - Fork 66
/
Copy pathhttp_poller.rb
274 lines (230 loc) · 9.57 KB
/
http_poller.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/plugin_mixins/http_client"
require "socket" # for Socket.gethostname
require "manticore"
require "rufus/scheduler"
# Logstash input that polls a set of HTTP endpoints on a rufus-scheduler
# schedule and emits one event per decoded response (or one failure event
# per failed request).
class LogStash::Inputs::HTTP_Poller2 < LogStash::Inputs::Base
  include LogStash::PluginMixins::HttpClient

  config_name "http_poller2"

  default :codec, "json"

  # A Hash of urls in this format : `"name" => "url"`.
  # The name and the url will be passed in the output event.
  # A url may contain the placeholders `{{%y}}` (current year) and `{{%m}}`
  # (current month number, not zero-padded); they are expanded on every poll.
  config :urls, :validate => :hash, :required => true

  # Schedule of when to periodically poll from the urls
  # Format: A hash with
  #   + key: "cron" | "every" | "in" | "at"
  #   + value: string
  # Examples:
  #   a) { "every" => "1h" }
  #   b) { "cron" => "* * * * * UTC" }
  # See: rufus/scheduler for details about different schedule options and value string format
  config :schedule, :validate => :hash, :required => true

  # Define the target field for placing the received data. If this setting is omitted, the data will be stored at the root (top level) of the event.
  config :target, :validate => :string

  # If you'd like to work with the request/response metadata.
  # Set this value to the name of the field you'd like to store a nested
  # hash of metadata.
  config :metadata_target, :validate => :string, :default => '@metadata'

  # Keys accepted in the `schedule` hash; exactly one of them must be given.
  Schedule_types = %w(cron every at in).freeze

  public

  # Records the local hostname (added to event metadata) and normalizes and
  # validates every configured request.
  def register
    @host = Socket.gethostname.force_encoding(Encoding::UTF_8)
    @logger.info("Registering http_poller2 Input", :type => @type, :schedule => @schedule, :timeout => @timeout)
    setup_requests!
  end

  # Stops the scheduler (and the legacy interval thread, if one was set).
  def stop
    Stud.stop!(@interval_thread) if @interval_thread
    @scheduler.stop if @scheduler
  end

  def run(queue)
    setup_schedule(queue)
  end

  # Validates @schedule, registers `run_once(queue)` with a single-worker
  # rufus scheduler and blocks until the scheduler is stopped.
  #
  # @raise [LogStash::ConfigurationError] unless @schedule has exactly one key
  #   and that key is one of Schedule_types.
  def setup_schedule(queue)
    # schedule hash must contain exactly one of the allowed keys
    msg_invalid_schedule = "Invalid config. schedule hash must contain " +
      "exactly one of the following keys - cron, at, every or in"
    raise LogStash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length != 1

    schedule_type = @schedule.keys.first
    schedule_value = @schedule[schedule_type]
    raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type)

    @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
    # as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead
    opts = schedule_type == "every" ? { :first_in => 0.01 } : {}
    # schedule_type was checked against Schedule_types above, so this send is bounded.
    @scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) }
    @scheduler.join
  end

  # Queues one async request per configured url, then executes them all.
  def run_once(queue)
    @requests.each { |name, request| request_async(queue, name, request) }
    client.execute!
  end

  private

  # Builds @requests as `name => [method, url]` or `name => [method, url, spec]`.
  def setup_requests!
    @requests = Hash[@urls.map { |name, url| [name, normalize_request(url)] }]
  end

  # Converts a configured url into a request array for the HTTP client.
  #
  # @param url_or_spec [String, Hash] a plain url (fetched with GET) or a
  #   request spec hash with optional "method", "url", "user", "password",
  #   "auth" and further client options.
  # @return [Array] `[method, url]` for a String, `[method, url, spec]` for a Hash.
  # @raise [LogStash::ConfigurationError] on an unsupported type, a missing or
  #   invalid url, or incomplete credentials.
  def normalize_request(url_or_spec)
    res =
      case url_or_spec
      when String
        # Date placeholders are NOT expanded here: they are expanded per poll
        # (see #resolve_request) so long-running pollers see the current date.
        [:get, url_or_spec]
      when Hash
        # The client will expect keys / values
        spec = Hash[url_or_spec.map { |k, v| [k.to_sym, v] }] # symbolize keys

        # method and url aren't really part of the options, so we pull them out
        method = (spec.delete(:method) || :get).to_sym.downcase
        url = spec.delete(:url)

        # Manticore wants auth options that are like {:auth => {:user => u, :pass => p}}
        # We allow that because earlier versions of this plugin documented that as the main way to
        # to do things, but now prefer top level "user", and "password" options
        # So, if the top level user/password are defined they are moved to the :auth key for manticore
        # if those attributes are already in :auth they still need to be transformed to symbols
        auth = spec[:auth]
        user = spec.delete(:user) || (auth && auth["user"])
        password = spec.delete(:password) || (auth && auth["password"])

        if user.nil? ^ password.nil?
          raise LogStash::ConfigurationError, "'user' and 'password' must both be specified for input HTTP poller!"
        end

        if user && password
          spec[:auth] = {
            user: user,
            pass: password,
            eager: true
          }
        end

        [method, url, spec]
      else
        raise LogStash::ConfigurationError, "Invalid URL or request spec: '#{url_or_spec}', expected a String or Hash!"
      end

    validate_request!(url_or_spec, res)
    res
  end

  # Checks that a normalized request has a url, that the url (with date
  # placeholders expanded) is absolute, and that any :auth has both parts.
  #
  # @return [Array] the request, unchanged.
  def validate_request!(url_or_spec, request)
    _method, url, spec = request

    # The nil check must come first: Regexp#match(nil) returns nil, which would
    # otherwise report a missing url as "Invalid URL ".
    raise LogStash::ConfigurationError, "No URL provided for request! #{url_or_spec}" unless url
    raise LogStash::ConfigurationError, "Invalid URL #{url}" unless URI::DEFAULT_PARSER.regexp[:ABS_URI].match(expand_url_placeholders(url))

    if spec && spec[:auth]
      unless spec[:auth][:user]
        raise LogStash::ConfigurationError, "Auth was specified, but 'user' was not!"
      end
      unless spec[:auth][:pass]
        raise LogStash::ConfigurationError, "Auth was specified, but 'password' was not!"
      end
    end

    request
  end

  # Replaces `{{%y}}` with the year and `{{%m}}` with the month number of
  # `now`. Non-String values (e.g. nil) are returned unchanged.
  def expand_url_placeholders(url, now = Time.now)
    return url unless url.is_a?(String)
    url.gsub("{{%y}}", now.year.to_s).gsub("{{%m}}", now.month.to_s)
  end

  # Returns a copy of `request` whose url has its date placeholders expanded
  # for the current poll; the stored request template is left untouched.
  def resolve_request(request)
    resolved = request.dup
    resolved[1] = expand_url_placeholders(resolved[1])
    resolved
  end

  # Monotonic timestamp in seconds, for measuring request durations without
  # being affected by wall-clock adjustments.
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Registers an async request with the client; success and failure
  # callbacks receive the resolved request and the elapsed time in seconds.
  def request_async(queue, name, request)
    resolved = resolve_request(request)
    @logger.debug? && @logger.debug("Fetching URL", :name => name, :url => resolved)
    started = monotonic_now

    method, *request_opts = resolved
    # public_send: the HTTP method comes from config, so never let it reach
    # private Kernel methods on the client proxy.
    client.async.public_send(method, *request_opts).
      on_success { |response| handle_success(queue, name, resolved, response, monotonic_now - started) }.
      on_failure { |exception| handle_failure(queue, name, resolved, exception, monotonic_now - started) }
  end

  # Decodes a response body with the codec and enqueues each resulting event.
  # An empty or nil body still produces one (empty) event carrying metadata.
  def handle_success(queue, name, request, response, execution_time)
    body = response.body
    # If there is a usable response. HEAD requests are `nil` and empty get
    # responses come up as "" which will cause the codec to not yield anything
    if body && !body.empty?
      decode_and_flush(@codec, body) do |decoded|
        event = @target ? LogStash::Event.new(@target => decoded.to_hash) : decoded
        handle_decoded_event(queue, name, request, response, event, execution_time)
      end
    else
      event = ::LogStash::Event.new
      handle_decoded_event(queue, name, request, response, event, execution_time)
    end
  end

  # Feeds `body` to the codec, then flushes it, yielding every decoded event.
  def decode_and_flush(codec, body, &yielder)
    codec.decode(body, &yielder)
    codec.flush(&yielder)
  end

  # Adds metadata, decorates and enqueues one event; errors are logged, not raised.
  def handle_decoded_event(queue, name, request, response, event, execution_time)
    apply_metadata(event, name, request, response, execution_time)
    decorate(event)
    queue << event
  rescue StandardError, java.lang.Exception => e
    @logger.error? && @logger.error("Error eventifying response!",
                                    :exception => e,
                                    :exception_message => e.message,
                                    :name => name,
                                    :url => request,
                                    :response => response
    )
  end

  # Enqueues an event tagged `_http_request_failure` describing the failure.
  # Beware, on old versions of manticore some uncommon failures are not handled
  def handle_failure(queue, name, request, exception, execution_time)
    event = LogStash::Event.new
    apply_metadata(event, name, request)

    event.tag("_http_request_failure")

    # This is also in the metadata, but we send it anyone because we want this
    # persisted by default, whereas metadata isn't. People don't like mysterious errors
    event.set("http_request_failure", {
      "request" => structure_request(request),
      "name" => name,
      "error" => exception.to_s,
      "backtrace" => exception.backtrace,
      "runtime_seconds" => execution_time
    })

    queue << event
  rescue StandardError, java.lang.Exception => e
    @logger.error? && @logger.error("Cannot read URL or send the error as an event!",
                                    :exception => e,
                                    :exception_message => e.message,
                                    :exception_backtrace => e.backtrace,
                                    :name => name)

    # If we are running in debug mode we can display more information about the
    # specific request which could give more details about the connection.
    @logger.debug? && @logger.debug("Cannot read URL or send the error as an event!",
                                    :exception => e,
                                    :exception_message => e.message,
                                    :exception_backtrace => e.backtrace,
                                    :name => name,
                                    :url => request)
  end

  # Stores the request/response metadata hash under @metadata_target, if set.
  def apply_metadata(event, name, request, response = nil, execution_time = nil)
    return unless @metadata_target
    event.set(@metadata_target, event_metadata(name, request, response, execution_time))
  end

  # Builds the metadata hash: name, host, request and runtime, plus response
  # code, headers, message and retry count when a response is given.
  def event_metadata(name, request, response = nil, execution_time = nil)
    m = {
      "name" => name,
      "host" => @host,
      "request" => structure_request(request),
    }

    m["runtime_seconds"] = execution_time

    if response
      m["code"] = response.code
      m["response_headers"] = response.headers
      m["response_message"] = response.message
      m["times_retried"] = response.times_retried
    end

    m
  end

  # Turn [method, url, spec] requests into a hash for friendlier logging / ES indexing
  def structure_request(request)
    method, url, spec = request
    # Flatten everything into the 'spec' hash, also stringify any keys to normalize
    Hash[(spec || {}).merge({
      "method" => method.to_s,
      "url" => url,
    }).map { |k, v| [k.to_s, v] }]
  end
end