Skip to content

[WIP] Add support for AsyncElasticsearch #1007

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- `opentelemetry-instrumentation-elasticsearch` added support for AsyncElasticsearch
([#1007](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1007))
- `opentelemetry-instrumentation-psycopg2` extended the sql commenter support of dbapi into psycopg2
([#940](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/940))

### Fixed

- `opentelemetry-instrumentation-flask` Fix non-recording span bug
([#999](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/999))
- `opentelemetry-instrumentation-tornado` Fix non-recording span bug
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def instrumentation_dependencies(self) -> Collection[str]:

def _instrument(self, **kwargs):
"""
Instruments elasticsarch module
Instruments elasticsearch module
"""
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)
Expand All @@ -143,49 +143,124 @@ def _instrument(self, **kwargs):
tracer, self._span_name_prefix, request_hook, response_hook
),
)
_wrap(
elasticsearch,
"AsyncTransport.perform_request",
_wrap_perform_async_request(
tracer, self._span_name_prefix, request_hook, response_hook
),
)

def _uninstrument(self, **kwargs):
    """
    Removes the wrappers from the elasticsearch transports' perform_request
    """
    unwrap(elasticsearch.Transport, "perform_request")
    # AsyncTransport is not exported by every supported elasticsearch
    # install (it depends on the release and on optional async
    # dependencies), so look it up defensively instead of failing with
    # AttributeError while uninstrumenting.
    async_transport = getattr(elasticsearch, "AsyncTransport", None)
    if async_transport is not None:
        unwrap(async_transport, "perform_request")


# Matches a "/_doc/<id>" segment of a request URL; group 1 captures the
# document ID (every character up to the next "/").
_regex_doc_url = re.compile(r"/_doc/([^/]+)")


def _extract(args, kwargs, span_name_prefix):
method = url = None
try:
method, url, *_ = args
except IndexError:
logger.warning(
"expected perform_request to receive two positional arguments. "
"Got %d",
len(args),
)
op_name = span_name_prefix + (url or method or _DEFAULT_OP_NAME)
doc_id = None
if url:
# TODO: This regex-based solution avoids creating an unbounded number of span names,
# but should be replaced by instrumenting individual Elasticsearch methods instead of
# Transport.perform_request()
# A limitation of the regex is that only the '_doc' mapping type is supported.
# Mapping types are deprecated since Elasticsearch 7
# https://github.com/open-telemetry/opentelemetry-python-contrib/issues/708
match = _regex_doc_url.search(url)
if match is not None:
# Remove the full document ID from the URL
doc_span = match.span()
op_name = (
span_name_prefix
+ url[: doc_span[0]]
+ "/_doc/:id"
+ url[doc_span[1] :]
)
# Put the document ID in attributes
doc_id = match.group(1)
params = kwargs.get("params", {})
body = kwargs.get("body", None)
return method, url, op_name, body, params, doc_id


def _set_span_attributes(span, url, method, body, params, doc_id):
    """Record the request details of an Elasticsearch call on ``span``.

    The database system attribute is always set; every other attribute is
    only set when its source value is truthy. ``body`` and ``params`` are
    stored as their ``str()`` representation.
    """
    collected = {SpanAttributes.DB_SYSTEM: "elasticsearch"}

    # (attribute name, source value, optional converter), in emission order
    optional = (
        ("elasticsearch.url", url, None),
        ("elasticsearch.method", method, None),
        (SpanAttributes.DB_STATEMENT, body, str),
        ("elasticsearch.params", params, str),
        ("elasticsearch.id", doc_id, None),
    )
    for name, source, convert in optional:
        if not source:
            continue
        collected[name] = convert(source) if convert is not None else source

    for name, value in collected.items():
        span.set_attribute(name, value)


def _set_span_attributes_from_rv(span, return_value):
    """Copy selected members of a response onto ``span``.

    For each name in ``_ATTRIBUTES_FROM_RESULT`` that is contained in
    ``return_value``, the stringified value is stored under
    ``elasticsearch.<name>``.
    """
    available = (
        key for key in _ATTRIBUTES_FROM_RESULT if key in return_value
    )
    for member in available:
        attribute_value = str(return_value[member])
        span.set_attribute(f"elasticsearch.{member}", attribute_value)


def _wrap_perform_request(
    tracer, span_name_prefix, request_hook=None, response_hook=None
):
    """Build the wrapper applied to the synchronous ``perform_request``.

    Args:
        tracer: Tracer used to start a client span around each request.
        span_name_prefix: String prepended to every span name.
        request_hook: Optional callable invoked as
            ``request_hook(span, method, url, kwargs)`` before the request.
        response_hook: Optional callable invoked as
            ``response_hook(span, return_value)`` after the request.

    Returns:
        A ``wrapper(wrapped, _, args, kwargs)`` function that runs the
        wrapped call inside a span and returns its result unchanged.
    """

    def wrapper(wrapped, _, args, kwargs):
        # Argument parsing lives in _extract; it is not repeated inline here.
        method, url, op_name, body, params, doc_id = _extract(
            args, kwargs, span_name_prefix
        )

        with tracer.start_as_current_span(
            op_name,
            kind=SpanKind.CLIENT,
        ) as span:

            if callable(request_hook):
                request_hook(span, method, url, kwargs)

            if span.is_recording():
                _set_span_attributes(span, url, method, body, params, doc_id)

            return_value = wrapped(*args, **kwargs)
            # Only dict responses are inspected for result attributes
            if isinstance(return_value, dict) and span.is_recording():
                _set_span_attributes_from_rv(span, return_value)

            if callable(response_hook):
                response_hook(span, return_value)
            return return_value

    return wrapper


def _wrap_perform_async_request(
tracer, span_name_prefix, request_hook=None, response_hook=None
):
# pylint: disable=R0912
async def wrapper(wrapped, _, args, kwargs):
method, url, op_name, body, params, doc_id = _extract(
args, kwargs, span_name_prefix
)

with tracer.start_as_current_span(
op_name,
Expand All @@ -196,33 +271,14 @@ def wrapper(wrapped, _, args, kwargs):
request_hook(span, method, url, kwargs)

if span.is_recording():
attributes = {
SpanAttributes.DB_SYSTEM: "elasticsearch",
}
if url:
attributes["elasticsearch.url"] = url
if method:
attributes["elasticsearch.method"] = method
if body:
attributes[SpanAttributes.DB_STATEMENT] = str(body)
if params:
attributes["elasticsearch.params"] = str(params)
if doc_id:
attributes["elasticsearch.id"] = doc_id
for key, value in attributes.items():
span.set_attribute(key, value)

rv = wrapped(*args, **kwargs)
if isinstance(rv, dict) and span.is_recording():
for member in _ATTRIBUTES_FROM_RESULT:
if member in rv:
span.set_attribute(
f"elasticsearch.{member}",
str(rv[member]),
)
_set_span_attributes(span, url, method, body, params, doc_id)

return_value = await wrapped(*args, **kwargs)
if isinstance(return_value, dict) and span.is_recording():
_set_span_attributes_from_rv(span, return_value)

if callable(response_hook):
response_hook(span, rv)
return rv
response_hook(span, return_value)
return return_value

return wrapper
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# limitations under the License.


_instruments = ("elasticsearch >= 2.0",)
_instruments = ("elasticsearch >= 2.0, < 8.0.0",)