5
5
require "socket" # for Socket.gethostname
6
6
require "manticore"
7
7
require "rufus/scheduler"
8
+ require "yaml" # persistence
8
9
9
10
class LogStash ::Inputs ::HTTP_Poller < LogStash ::Inputs ::Base
10
11
include LogStash ::PluginMixins ::HttpClient
@@ -57,10 +58,15 @@ def setup_requests!
57
58
@requests = Hash [ @urls . map { |name , url | [ name , normalize_request ( url ) ] } ]
58
59
end
59
60
61
+ private
62
+ def filter_dynamic_params ( allowed_keys , params )
63
+ params . slice ( *allowed_keys )
64
+ end
65
+
60
66
private
61
67
def normalize_request ( url_or_spec )
62
68
if url_or_spec . is_a? ( String )
63
- res = [ :get , url_or_spec ]
69
+ res = [ :get , url_or_spec , { } ]
64
70
elsif url_or_spec . is_a? ( Hash )
65
71
# The client will expect keys / values
66
72
spec = Hash [ url_or_spec . clone . map { |k , v | [ k . to_sym , v ] } ] # symbolize keys
@@ -77,17 +83,30 @@ def normalize_request(url_or_spec)
77
83
auth = spec [ :auth ]
78
84
user = spec . delete ( :user ) || ( auth && auth [ "user" ] )
79
85
password = spec . delete ( :password ) || ( auth && auth [ "password" ] )
80
-
86
+
81
87
if user . nil? ^ password . nil?
82
88
raise LogStash ::ConfigurationError , "'user' and 'password' must both be specified for input HTTP poller!"
83
89
end
84
90
85
91
if user && password
86
92
spec [ :auth ] = {
87
- user : user ,
93
+ user : user ,
88
94
pass : password ,
89
95
eager : true
90
- }
96
+ }
97
+ end
98
+
99
+ if spec . delete ( :use_dynamic_params )
100
+ last_dynamic_params_location = spec [ :last_dynamic_params ]
101
+ dynamic_params_map = spec [ :dynamic_params_map ]
102
+
103
+ if last_dynamic_params_location . is_a? ( String ) && File . exist? ( last_dynamic_params_location )
104
+ dynamic_params = YAML . load ( File . read ( last_dynamic_params_location ) )
105
+ allowed_keys = dynamic_params_map . is_a? ( Hash ) ? dynamic_params_map . keys : [ ]
106
+ spec [ :dynamic_params ] = filter_dynamic_params ( allowed_keys , dynamic_params )
107
+ else
108
+ spec [ :dynamic_params ] = { }
109
+ end
91
110
end
92
111
res = [ method , url , spec ]
93
112
else
@@ -133,13 +152,23 @@ def setup_schedule(queue)
133
152
134
153
@scheduler = Rufus ::Scheduler . new ( :max_work_threads => 1 )
135
154
#as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead
136
- opts = schedule_type == "every" ? { :first_in => 0.01 } : { }
155
+ opts = schedule_type == "every" ? { :first_in => 0.01 } : { }
137
156
@scheduler . send ( schedule_type , schedule_value , opts ) { run_once ( queue ) }
138
157
@scheduler . join
139
158
end
140
159
160
+ private
161
+ def assign_dynamic_params ( request )
162
+ params = request [ 2 ] [ :dynamic_params ]
163
+ request [ 2 ] [ :query ] = { } if !request [ 2 ] [ :query ]
164
+ params . keys . each do |key |
165
+ request [ 2 ] [ :query ] [ key ] = params [ key ]
166
+ end
167
+ end
168
+
141
169
def run_once ( queue )
142
170
@requests . each do |name , request |
171
+ assign_dynamic_params ( request ) if request [ 2 ] [ :dynamic_params ]
143
172
request_async ( queue , name , request )
144
173
end
145
174
@@ -175,11 +204,23 @@ def handle_success(queue, name, request, response, execution_time)
175
204
end
176
205
end
177
206
207
+ private
208
+ def update_dynamic_params ( request , event )
209
+ request [ 2 ] [ :dynamic_params_map ] . keys . each do |key |
210
+ value = request [ 2 ] [ :dynamic_params_map ] [ key ]
211
+ event_value = event . get ( value )
212
+ request [ 2 ] [ :dynamic_params ] [ key ] = event_value if event_value
213
+ end
214
+ File . write ( request [ 2 ] [ :last_dynamic_params ] , YAML . dump ( request [ 2 ] [ :dynamic_params ] ) )
215
+ end
216
+
178
217
private
179
218
def handle_decoded_event ( queue , name , request , response , event , execution_time )
180
219
apply_metadata ( event , name , request , response , execution_time )
181
220
decorate ( event )
182
221
queue << event
222
+
223
+ update_dynamic_params ( request , event ) if request [ 2 ] [ :dynamic_params ]
183
224
rescue StandardError , java . lang . Exception => e
184
225
@logger . error? && @logger . error ( "Error eventifying response!" ,
185
226
:exception => e ,
0 commit comments