Skip to content

Commit df32e8c

Browse files
authored
add elasticsearch db.statement sanitization (#1598)
1 parent 7af87e1 commit df32e8c

File tree

5 files changed

+228
-23
lines changed

5 files changed

+228
-23
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010
### Added
1111

1212
- `opentelemetry-instrumentation-redis` Add `sanitize_query` config option to allow query sanitization. ([#1572](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1572))
13+
- `opentelemetry-instrumentation-elasticsearch` Add optional db.statement query sanitization.
14+
([#1598](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1598))
1315
- `opentelemetry-instrumentation-celery` Record exceptions as events on the span.
1416
([#1573](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1573))
1517
- Add metric instrumentation for urllib

instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py

+18-3
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
4545
The instrument() method accepts the following keyword args:
4646
tracer_provider (TracerProvider) - an optional tracer provider
47+
sanitize_query (bool) - an optional query sanitization flag
4748
request_hook (Callable) - a function with extra user-defined logic to be performed before performing the request
4849
this function signature is:
4950
def request_hook(span: Span, method: str, url: str, kwargs)
@@ -96,6 +97,8 @@ def response_hook(span, response):
9697
from opentelemetry.semconv.trace import SpanAttributes
9798
from opentelemetry.trace import SpanKind, get_tracer
9899

100+
from .utils import sanitize_body
101+
99102
logger = getLogger(__name__)
100103

101104

@@ -135,11 +138,16 @@ def _instrument(self, **kwargs):
135138
tracer = get_tracer(__name__, __version__, tracer_provider)
136139
request_hook = kwargs.get("request_hook")
137140
response_hook = kwargs.get("response_hook")
141+
sanitize_query = kwargs.get("sanitize_query", False)
138142
_wrap(
139143
elasticsearch,
140144
"Transport.perform_request",
141145
_wrap_perform_request(
142-
tracer, self._span_name_prefix, request_hook, response_hook
146+
tracer,
147+
sanitize_query,
148+
self._span_name_prefix,
149+
request_hook,
150+
response_hook,
143151
),
144152
)
145153

@@ -154,7 +162,11 @@ def _uninstrument(self, **kwargs):
154162

155163

156164
def _wrap_perform_request(
157-
tracer, span_name_prefix, request_hook=None, response_hook=None
165+
tracer,
166+
sanitize_query,
167+
span_name_prefix,
168+
request_hook=None,
169+
response_hook=None,
158170
):
159171
# pylint: disable=R0912,R0914
160172
def wrapper(wrapped, _, args, kwargs):
@@ -213,7 +225,10 @@ def wrapper(wrapped, _, args, kwargs):
213225
if method:
214226
attributes["elasticsearch.method"] = method
215227
if body:
216-
attributes[SpanAttributes.DB_STATEMENT] = str(body)
228+
statement = str(body)
229+
if sanitize_query:
230+
statement = sanitize_body(body)
231+
attributes[SpanAttributes.DB_STATEMENT] = statement
217232
if params:
218233
attributes["elasticsearch.params"] = str(params)
219234
if doc_id:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
sanitized_keys = (
16+
"message",
17+
"should",
18+
"filter",
19+
"query",
20+
"queries",
21+
"intervals",
22+
"match",
23+
)
24+
sanitized_value = "?"
25+
26+
27+
# pylint: disable=C0103
28+
def _flatten_dict(d, parent_key=""):
29+
items = []
30+
for k, v in d.items():
31+
new_key = parent_key + "." + k if parent_key else k
32+
if isinstance(v, dict):
33+
items.extend(_flatten_dict(v, new_key).items())
34+
else:
35+
items.append((new_key, v))
36+
return dict(items)
37+
38+
39+
def _unflatten_dict(d):
40+
res = {}
41+
for k, v in d.items():
42+
keys = k.split(".")
43+
d = res
44+
for key in keys[:-1]:
45+
if key not in d:
46+
d[key] = {}
47+
d = d[key]
48+
d[keys[-1]] = v
49+
return res
50+
51+
52+
def sanitize_body(body) -> str:
53+
flatten_body = _flatten_dict(body)
54+
55+
for key in flatten_body:
56+
if key.endswith(sanitized_keys):
57+
flatten_body[key] = sanitized_value
58+
59+
return str(_unflatten_dict(flatten_body))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
interval_query = {
2+
"query": {
3+
"intervals": {
4+
"my_text": {
5+
"all_of": {
6+
"ordered": True,
7+
"intervals": [
8+
{
9+
"match": {
10+
"query": "my favorite food",
11+
"max_gaps": 0,
12+
"ordered": True,
13+
}
14+
},
15+
{
16+
"any_of": {
17+
"intervals": [
18+
{"match": {"query": "hot water"}},
19+
{"match": {"query": "cold porridge"}},
20+
]
21+
}
22+
},
23+
],
24+
}
25+
}
26+
}
27+
}
28+
}
29+
30+
match_query = {"query": {"match": {"message": {"query": "this is a test"}}}}
31+
32+
filter_query = {
33+
"query": {
34+
"bool": {
35+
"must": [
36+
{"match": {"title": "Search"}},
37+
{"match": {"content": "Elasticsearch"}},
38+
],
39+
"filter": [
40+
{"term": {"status": "published"}},
41+
{"range": {"publish_date": {"gte": "2015-01-01"}}},
42+
],
43+
}
44+
}
45+
}
46+
47+
interval_query_sanitized = {
48+
"query": {
49+
"intervals": {
50+
"my_text": {"all_of": {"ordered": True, "intervals": "?"}}
51+
}
52+
}
53+
}
54+
match_query_sanitized = {"query": {"match": {"message": {"query": "?"}}}}
55+
filter_query_sanitized = {
56+
"query": {
57+
"bool": {
58+
"must": [
59+
{"match": {"title": "Search"}},
60+
{"match": {"content": "Elasticsearch"}},
61+
],
62+
"filter": "?",
63+
}
64+
}
65+
}

instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py

+84-20
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
1415
import json
1516
import os
1617
import threading
@@ -27,10 +28,13 @@
2728
from opentelemetry.instrumentation.elasticsearch import (
2829
ElasticsearchInstrumentor,
2930
)
31+
from opentelemetry.instrumentation.elasticsearch.utils import sanitize_body
3032
from opentelemetry.semconv.trace import SpanAttributes
3133
from opentelemetry.test.test_base import TestBase
3234
from opentelemetry.trace import StatusCode
3335

36+
from . import sanitization_queries # pylint: disable=no-name-in-module
37+
3438
major_version = elasticsearch.VERSION[0]
3539

3640
if major_version == 7:
@@ -42,14 +46,29 @@
4246
else:
4347
from . import helpers_es2 as helpers # pylint: disable=no-name-in-module
4448

45-
4649
Article = helpers.Article
4750

4851

4952
@mock.patch(
5053
"elasticsearch.connection.http_urllib3.Urllib3HttpConnection.perform_request"
5154
)
5255
class TestElasticsearchIntegration(TestBase):
56+
search_attributes = {
57+
SpanAttributes.DB_SYSTEM: "elasticsearch",
58+
"elasticsearch.url": "/test-index/_search",
59+
"elasticsearch.method": helpers.dsl_search_method,
60+
"elasticsearch.target": "test-index",
61+
SpanAttributes.DB_STATEMENT: str(
62+
{"query": {"bool": {"filter": [{"term": {"author": "testing"}}]}}}
63+
),
64+
}
65+
66+
create_attributes = {
67+
SpanAttributes.DB_SYSTEM: "elasticsearch",
68+
"elasticsearch.url": "/test-index",
69+
"elasticsearch.method": "HEAD",
70+
}
71+
5372
def setUp(self):
5473
super().setUp()
5574
self.tracer = self.tracer_provider.get_tracer(__name__)
@@ -241,21 +260,36 @@ def test_dsl_search(self, request_mock):
241260
self.assertIsNotNone(span.end_time)
242261
self.assertEqual(
243262
span.attributes,
263+
self.search_attributes,
264+
)
265+
266+
def test_dsl_search_sanitized(self, request_mock):
267+
# Reset instrumentation to use sanitized query (default)
268+
ElasticsearchInstrumentor().uninstrument()
269+
ElasticsearchInstrumentor().instrument(sanitize_query=True)
270+
271+
# update expected attributes to match sanitized query
272+
sanitized_search_attributes = self.search_attributes.copy()
273+
sanitized_search_attributes.update(
244274
{
245-
SpanAttributes.DB_SYSTEM: "elasticsearch",
246-
"elasticsearch.url": "/test-index/_search",
247-
"elasticsearch.method": helpers.dsl_search_method,
248-
"elasticsearch.target": "test-index",
249-
SpanAttributes.DB_STATEMENT: str(
250-
{
251-
"query": {
252-
"bool": {
253-
"filter": [{"term": {"author": "testing"}}]
254-
}
255-
}
256-
}
257-
),
258-
},
275+
SpanAttributes.DB_STATEMENT: "{'query': {'bool': {'filter': '?'}}}"
276+
}
277+
)
278+
279+
request_mock.return_value = (1, {}, '{"hits": {"hits": []}}')
280+
client = Elasticsearch()
281+
search = Search(using=client, index="test-index").filter(
282+
"term", author="testing"
283+
)
284+
search.execute()
285+
spans = self.get_finished_spans()
286+
span = spans[0]
287+
self.assertEqual(1, len(spans))
288+
self.assertEqual(span.name, "Elasticsearch/<target>/_search")
289+
self.assertIsNotNone(span.end_time)
290+
self.assertEqual(
291+
span.attributes,
292+
sanitized_search_attributes,
259293
)
260294

261295
def test_dsl_create(self, request_mock):
@@ -264,17 +298,14 @@ def test_dsl_create(self, request_mock):
264298
Article.init(using=client)
265299

266300
spans = self.get_finished_spans()
301+
assert spans
267302
self.assertEqual(2, len(spans))
268303
span1 = spans.by_attr(key="elasticsearch.method", value="HEAD")
269304
span2 = spans.by_attr(key="elasticsearch.method", value="PUT")
270305

271306
self.assertEqual(
272307
span1.attributes,
273-
{
274-
SpanAttributes.DB_SYSTEM: "elasticsearch",
275-
"elasticsearch.url": "/test-index",
276-
"elasticsearch.method": "HEAD",
277-
},
308+
self.create_attributes,
278309
)
279310

280311
attributes = {
@@ -288,6 +319,25 @@ def test_dsl_create(self, request_mock):
288319
helpers.dsl_create_statement,
289320
)
290321

322+
def test_dsl_create_sanitized(self, request_mock):
323+
# Reset instrumentation to explicitly use sanitized query
324+
ElasticsearchInstrumentor().uninstrument()
325+
ElasticsearchInstrumentor().instrument(sanitize_query=True)
326+
request_mock.return_value = (1, {}, {})
327+
client = Elasticsearch()
328+
Article.init(using=client)
329+
330+
spans = self.get_finished_spans()
331+
assert spans
332+
333+
self.assertEqual(2, len(spans))
334+
span = spans.by_attr(key="elasticsearch.method", value="HEAD")
335+
336+
self.assertEqual(
337+
span.attributes,
338+
self.create_attributes,
339+
)
340+
291341
def test_dsl_index(self, request_mock):
292342
request_mock.return_value = helpers.dsl_index_result
293343

@@ -412,3 +462,17 @@ def response_hook(span, response):
412462
json.dumps(response_payload),
413463
spans[0].attributes[response_attribute_name],
414464
)
465+
466+
def test_body_sanitization(self, _):
467+
self.assertEqual(
468+
sanitize_body(sanitization_queries.interval_query),
469+
str(sanitization_queries.interval_query_sanitized),
470+
)
471+
self.assertEqual(
472+
sanitize_body(sanitization_queries.match_query),
473+
str(sanitization_queries.match_query_sanitized),
474+
)
475+
self.assertEqual(
476+
sanitize_body(sanitization_queries.filter_query),
477+
str(sanitization_queries.filter_query_sanitized),
478+
)

0 commit comments

Comments
 (0)