1
1
require "pdsdk/version"
2
2
require "pdsdk/logger"
3
3
require "json"
4
- require 'concurrent'
5
4
6
5
module Pdsdk
7
6
class Error < StandardError
@@ -27,41 +26,48 @@ def bootstrap!
27
26
if !@secret_key
28
27
logger . warn "no $#{ ENV_SECRET_KEY_KEY } detected, will not sign payloads"
29
28
end
29
+
30
+ @hostname = ENV [ "PD_SDK_HOST" ] || "sdk.m.pipedream.net"
31
+ @proto = ENV [ "PD_SDK_PROTO" ] || "https"
30
32
end
31
33
32
- # XXX self.send_message for string and becomes { message } ?
33
- def send_event ( api_key , raw_event , opts = { } , include_response = false )
34
- hostname = ENV [ "PD_SDK_HOST" ] || "sdk.m.pipedream.net"
35
- proto = ENV [ "PD_SDK_PROTO" ] || "https"
34
+ def sync_send_event ( api_key , raw_event , opts = { } , include_response = false )
36
35
event = opts [ :exports ] || { }
37
36
event [ :raw_event ] = raw_event
37
+ # logger.info "going to send event: #{event} to #{api_key}"
38
38
if opts [ :deployment ]
39
- _uri = "#{ proto } ://#{ hostname } /pipelines/#{ api_key } /deployments/#{ opts [ :deployment ] } /events"
39
+ _uri = "#{ @ proto} ://#{ @ hostname} /pipelines/#{ api_key } /deployments/#{ opts [ :deployment ] } /events"
40
40
else
41
- _uri = "#{ proto } ://#{ hostname } /pipelines/#{ api_key } /events"
41
+ _uri = "#{ @ proto} ://#{ @ hostname} /pipelines/#{ api_key } /events"
42
42
end
43
43
uri = URI ( _uri )
44
- use_ssl = uri . scheme == "https"
45
- # TODO clean up old connections
46
- # TODO ensure reconnects if client disconnects
47
- @http_connection ||= Concurrent ::ThreadLocalVar . new { Net ::HTTP . start ( uri . host , uri . port , use_ssl : use_ssl , open_timeout : 1 ) }
48
- logger . info "going to send event: #{ event } to #{ api_key } " # TODO remove
49
- payload = event . to_json
50
- headers = {
51
- "user-agent" => "pipedream-sdk:ruby/1" ,
52
- "content-type" => "application/json" ,
53
- "accept" => "application/json" ,
54
- "x-pd-sdk-version" => Pdsdk ::VERSION ,
55
- }
56
- headers [ "x-pd-sig" ] = "sha256=#{ OpenSSL ::HMAC . hexdigest ( OpenSSL ::Digest . new ( 'sha256' ) , @secret_key , payload ) } " if @secret_key
57
- req = Net ::HTTP ::Post . new ( uri . request_uri , headers )
58
- req . body = payload
59
- resp = @http_connection . value . request ( req )
60
- logger . info "received response: #{ resp } " # TODO remove
61
- if include_response
62
- { 'code' => resp . code . to_i , 'body' => resp . body }
63
- else
64
- { 'code' => resp . code . to_i }
44
+ Net ::HTTP . start ( uri . host , uri . port , use_ssl : uri . scheme == "https" , open_timeout : 1 ) do |http |
45
+ payload = event . to_json
46
+ headers = {
47
+ "user-agent" => "pipedream-sdk:ruby/1" ,
48
+ "content-type" => "application/json" ,
49
+ "accept" => "application/json" ,
50
+ "x-pd-sdk-version" => Pdsdk ::VERSION ,
51
+ }
52
+ headers [ "x-pd-sig" ] = "sha256=#{ OpenSSL ::HMAC . hexdigest ( OpenSSL ::Digest . new ( 'sha256' ) , @secret_key , payload ) } " if @secret_key
53
+ req = Net ::HTTP ::Post . new ( uri . request_uri , headers )
54
+ req . body = payload
55
+ resp = http . request ( req )
56
+ # logger.info "received response: #{resp}"
57
+ if include_response
58
+ { 'code' => resp . code . to_i , 'body' => resp . body }
59
+ else
60
+ { 'code' => resp . code . to_i }
61
+ end
62
+ end
63
+ end
64
+
65
+ # XXX self.send_message for string and becomes { message } ?
66
+ def send_event ( api_key , raw_event , opts = { } )
67
+ # TODO make a proper worker with queue in separate thread rather than making new every time
68
+ # ... mostly so we can connect http client once
69
+ Thread . new do
70
+ sync_send_event ( api_key , raw_event , opts )
65
71
end
66
72
end
67
73
0 commit comments