-
Notifications
You must be signed in to change notification settings - Fork 682
/
Copy pathpatch.py
196 lines (146 loc) · 6.1 KB
/
patch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# 3p
import psycopg2
from ddtrace.vendor import wrapt
# project
from ddtrace import Pin, config
from ddtrace.contrib import dbapi
from ddtrace.ext import sql, net, db
# Keep a handle on the unpatched connect function; unpatch() assigns it
# back to psycopg2.connect.
_connect = psycopg2.connect

# The first whitespace-separated token of psycopg2.__version__ is parsed as
# dot-separated integers. Versions can end in `-betaN` (where `N` is a
# number); any parse failure leaves the (0, 0, 0) fallback in place, which
# skips the version specific patching below.
try:
    PSYCOPG2_VERSION = tuple(int(part) for part in psycopg2.__version__.split()[0].split('.'))
except Exception:
    PSYCOPG2_VERSION = (0, 0, 0)

# Composable is only imported for psycopg2 >= 2.7
if PSYCOPG2_VERSION >= (2, 7):
    from psycopg2.sql import Composable
def patch():
    """ Monkey patch psycopg2.connect so that the connections it returns
        are traced.

        The `_datadog_patch` flag on the psycopg2 module makes repeated
        calls a no-op.
    """
    already_patched = getattr(psycopg2, '_datadog_patch', False)
    if not already_patched:
        psycopg2._datadog_patch = True

        wrapt.wrap_function_wrapper(psycopg2, 'connect', patched_connect)
        # do this early just in case
        _patch_extensions(_psycopg2_extensions)
def unpatch():
    """ Restore the original psycopg2.connect if patch() was applied.

        Only the connect function is restored here; the extension hooks are
        left in place.
    """
    if not getattr(psycopg2, '_datadog_patch', False):
        return

    psycopg2._datadog_patch = False
    psycopg2.connect = _connect
class Psycopg2TracedCursor(dbapi.TracedCursor):
    """ TracedCursor specialised for psycopg2 """

    def _trace_method(self, method, name, resource, extra_tags, *args, **kwargs):
        """ Delegate to the parent `_trace_method`, first rendering a
            psycopg2.sql.Composable resource to a string.
        """
        # Composable is only imported for psycopg2 >= 2.7, so the version
        # check must come before the isinstance test.
        if PSYCOPG2_VERSION >= (2, 7):
            if isinstance(resource, Composable):
                resource = resource.as_string(self.__wrapped__)

        parent = super(Psycopg2TracedCursor, self)
        return parent._trace_method(method, name, resource, extra_tags, *args, **kwargs)
class Psycopg2FetchTracedCursor(Psycopg2TracedCursor, dbapi.FetchTracedCursor):
    """ psycopg2 cursor combining Psycopg2TracedCursor with dbapi.FetchTracedCursor """
class Psycopg2TracedConnection(dbapi.TracedConnection):
    """ TracedConnection wrapping a psycopg2 connection with tracing code. """

    def __init__(self, conn, pin=None, cursor_cls=None):
        """ Wrap `conn`, choosing a default cursor class when none is given.

            :param conn: the connection to wrap
            :param pin: forwarded unchanged to the parent constructor
            :param cursor_cls: cursor class to use; when falsy the default
                depends on `config.dbapi2.trace_fetch_methods`
        """
        if not cursor_cls:
            # `fetch*` methods are only traced when explicitly enabled
            cursor_cls = (
                Psycopg2FetchTracedCursor
                if config.dbapi2.trace_fetch_methods
                else Psycopg2TracedCursor
            )

        super(Psycopg2TracedConnection, self).__init__(conn, pin, cursor_cls=cursor_cls)
def patch_conn(conn, traced_conn_cls=Psycopg2TracedConnection):
    """ Wrap `conn` in `traced_conn_cls` so that its queries are traced.

        :param conn: connection to wrap; its `dsn` attribute is parsed for tags
        :param traced_conn_cls: class used to wrap the connection
        :returns: the wrapping traced connection, with a Pin attached
    """
    # the extensions must be patched even when only some connections are
    # traced; _patch_extensions is idempotent so calling it again is safe.
    _patch_extensions(_psycopg2_extensions)

    traced = traced_conn_cls(conn)

    # build the tags from the parsed dsn; absent dsn fields yield None
    dsn_parts = sql.parse_pg_dsn(conn.dsn)
    tag_sources = (
        (net.TARGET_HOST, 'host'),
        (net.TARGET_PORT, 'port'),
        (db.NAME, 'dbname'),
        (db.USER, 'user'),
        ('db.application', 'application_name'),
    )
    tags = {tag: dsn_parts.get(field) for tag, field in tag_sources}

    pin = Pin(service='postgres', app='postgres', tags=tags)
    pin.onto(traced)

    return traced
def _patch_extensions(_extensions):
# we must patch extensions all the time (it's pretty harmless) so split
# from global patching of connections. must be idempotent.
for _, module, func, wrapper in _extensions:
if not hasattr(module, func) or isinstance(getattr(module, func), wrapt.ObjectProxy):
continue
wrapt.wrap_function_wrapper(module, func, wrapper)
def _unpatch_extensions(_extensions):
# we must patch extensions all the time (it's pretty harmless) so split
# from global patching of connections. must be idempotent.
for original, module, func, _ in _extensions:
setattr(module, func, original)
#
# monkeypatch targets
#
def patched_connect(connect_func, _, args, kwargs):
    """ Wrapper for psycopg2.connect: call the original connect function and
        return its connection wrapped by patch_conn.
    """
    return patch_conn(connect_func(*args, **kwargs))
def _extensions_register_type(func, _, args, kwargs):
def _unroll_args(obj, scope=None):
return obj, scope
obj, scope = _unroll_args(*args, **kwargs)
# register_type performs a c-level check of the object
# type so we must be sure to pass in the actual db connection
if scope and isinstance(scope, wrapt.ObjectProxy):
scope = scope.__wrapped__
return func(obj, scope) if scope else func(obj)
def _extensions_quote_ident(func, _, args, kwargs):
def _unroll_args(obj, scope=None):
return obj, scope
obj, scope = _unroll_args(*args, **kwargs)
# register_type performs a c-level check of the object
# type so we must be sure to pass in the actual db connection
if scope and isinstance(scope, wrapt.ObjectProxy):
scope = scope.__wrapped__
return func(obj, scope) if scope else func(obj)
def _extensions_adapt(func, _, args, kwargs):
adapt = func(*args, **kwargs)
if hasattr(adapt, 'prepare'):
return AdapterWrapper(adapt)
return adapt
class AdapterWrapper(wrapt.ObjectProxy):
    """ Proxy around an adapter whose `prepare` unwraps a proxied connection. """

    def prepare(self, *args, **kwargs):
        """ Call the wrapped `prepare`, replacing a proxied first positional
            argument with the object it wraps.
        """
        wrapped_prepare = self.__wrapped__.prepare

        if args:
            first, rest = args[0], args[1:]
            # prepare performs a c-level check of the object type so
            # we must be sure to pass in the actual db connection
            if isinstance(first, wrapt.ObjectProxy):
                first = first.__wrapped__
            return wrapped_prepare(first, *rest, **kwargs)

        # no positional argument to unwrap
        return wrapped_prepare(**kwargs)
# extension hooks
# each entry is (original callable, owning module, attribute name, wrapper)
_psycopg2_extensions = [
    (
        psycopg2.extensions.register_type,
        psycopg2.extensions,
        'register_type',
        _extensions_register_type,
    ),
    (
        psycopg2._psycopg.register_type,
        psycopg2._psycopg,
        'register_type',
        _extensions_register_type,
    ),
    (
        psycopg2.extensions.adapt,
        psycopg2.extensions,
        'adapt',
        _extensions_adapt,
    ),
]

# `_json` attribute is only available for psycopg >= 2.5
if getattr(psycopg2, '_json', None):
    _psycopg2_extensions.append(
        (
            psycopg2._json.register_type,
            psycopg2._json,
            'register_type',
            _extensions_register_type,
        )
    )

# `quote_ident` attribute is only available for psycopg >= 2.7
if getattr(psycopg2, 'extensions', None) and getattr(psycopg2.extensions, 'quote_ident', None):
    _psycopg2_extensions.append(
        (
            psycopg2.extensions.quote_ident,
            psycopg2.extensions,
            'quote_ident',
            _extensions_quote_ident,
        )
    )