Skip to content

Add more tests, fix compliance to semantics #236

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Dec 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#237](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/237))
- Add Prometheus Remote Write Exporter integration tests in opentelemetry-docker-tests
([#216](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/216))
- `opentelemetry-instrumentation-grpc` Add tests for grpc span attributes, grpc `abort()` conditions
([#236](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/236))

### Changed
- `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-wsgi` Return `None` for `CarrierGetter` if key not found
([#1374](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/233))
- `opentelemetry-instrumentation-grpc` Comply with updated spec, rework tests
([#236](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/236))

## [0.16b1](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.16b1) - 2020-11-26

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,12 @@ def set_trailing_metadata(self, *args, **kwargs):
def abort(self, code, details):
self.code = code
self.details = details
self._active_span.set_attribute("rpc.grpc.status_code", code.name)
self._active_span.set_attribute("rpc.grpc.status_code", code.value[0])
self._active_span.set_status(
Status(status_code=StatusCode.ERROR, description=details)
Status(
status_code=StatusCode.ERROR,
description="{}:{}".format(code, details),
)
)
return self._servicer_context.abort(code, details)

Expand All @@ -126,17 +129,25 @@ def set_code(self, code):
self.code = code
# use details if we already have it, otherwise the status description
details = self.details or code.value[1]
self._active_span.set_attribute("rpc.grpc.status_code", code.name)
self._active_span.set_status(
Status(status_code=StatusCode.ERROR, description=details)
)
self._active_span.set_attribute("rpc.grpc.status_code", code.value[0])
if code != grpc.StatusCode.OK:
self._active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description="{}:{}".format(code, details),
)
)
return self._servicer_context.set_code(code)

def set_details(self, details):
self.details = details
self._active_span.set_status(
Status(status_code=StatusCode.ERROR, description=details)
)
if self.code != grpc.StatusCode.OK:
self._active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description="{}:{}".format(self.code, details),
)
)
return self._servicer_context.set_details(details)


Expand Down Expand Up @@ -181,12 +192,20 @@ def _set_remote_context(self, servicer_context):

def _start_span(self, handler_call_details, context):

# standard attributes
attributes = {
"rpc.method": handler_call_details.method,
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK,
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
}

# if we have details about the call, split into service and method
if handler_call_details.method:
service, method = handler_call_details.method.lstrip("/").split(
"/", 1
)
attributes.update({"rpc.method": method, "rpc.service": service})

# add some attributes from the metadata
metadata = dict(context.invocation_metadata())
if "user-agent" in metadata:
attributes["rpc.user_agent"] = metadata["user-agent"]
Expand All @@ -198,15 +217,15 @@ def _start_span(self, handler_call_details, context):
# * ipv4:10.2.1.1:57284,127.0.0.1:57284
#
try:
host, port = (
ip, port = (
context.peer().split(",")[0].split(":", 1)[1].rsplit(":", 1)
)
attributes.update({"net.peer.ip": ip, "net.peer.port": port})

# other telemetry sources convert this, so we will too
if host in ("[::1]", "127.0.0.1"):
host = "localhost"
# other telemetry sources add this, so we will too
if ip in ("[::1]", "127.0.0.1"):
attributes["net.peer.name"] = "localhost"

attributes.update({"net.peer.name": host, "net.peer.port": port})
except IndexError:
logger.warning("Failed to parse peer address '%s'", context.peer())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
)
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace.status import StatusCode


class UnaryUnaryMethodHandler(grpc.RpcMethodHandler):
Expand Down Expand Up @@ -67,20 +68,37 @@ def handler(request, context):
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

rpc_call = "TestServicer/handler"
try:
server.start()
channel.unary_unary("test")(b"test")
channel.unary_unary(rpc_call)(b"test")
finally:
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "test")
self.assertEqual(span.name, rpc_call)
self.assertIs(span.kind, trace.SpanKind.SERVER)

# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
span, opentelemetry.instrumentation.grpc
)

# Check attributes
self.assert_span_has_attributes(
span,
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "handler",
"rpc.service": "TestServicer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

grpc_server_instrumentor.uninstrument()

def test_uninstrument(self):
Expand All @@ -100,9 +118,10 @@ def handler(request, context):
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

rpc_call = "TestServicer/test"
try:
server.start()
channel.unary_unary("test")(b"test")
channel.unary_unary(rpc_call)(b"test")
finally:
server.stop(None)

Expand Down Expand Up @@ -130,24 +149,38 @@ def handler(request, context):
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

rpc_call = "TestServicer/handler"
try:
server.start()
channel.unary_unary("")(b"")
channel.unary_unary(rpc_call)(b"")
finally:
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]

self.assertEqual(span.name, "")
self.assertEqual(span.name, rpc_call)
self.assertIs(span.kind, trace.SpanKind.SERVER)

# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
span, opentelemetry.instrumentation.grpc
)

# Check attributes
self.assert_span_has_attributes(
span,
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "handler",
"rpc.service": "TestServicer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def test_span_lifetime(self):
"""Check that the span is active for the duration of the call."""

Expand All @@ -174,7 +207,7 @@ def handler(request, context):
active_span_before_call = trace.get_current_span()
try:
server.start()
channel.unary_unary("")(b"")
channel.unary_unary("TestServicer/handler")(b"")
finally:
server.stop(None)
active_span_after_call = trace.get_current_span()
Expand Down Expand Up @@ -208,20 +241,34 @@ def handler(request, context):

try:
server.start()
channel.unary_unary("")(b"")
channel.unary_unary("")(b"")
channel.unary_unary("TestServicer/handler")(b"")
channel.unary_unary("TestServicer/handler")(b"")
finally:
server.stop(None)

self.assertEqual(len(active_spans_in_handler), 2)
# pylint:disable=unbalanced-tuple-unpacking
span1, span2 = active_spans_in_handler
# Spans should belong to separate traces, and each should be a root
# span
# Spans should belong to separate traces
self.assertNotEqual(span1.context.span_id, span2.context.span_id)
self.assertNotEqual(span1.context.trace_id, span2.context.trace_id)
self.assertIsNone(span1.parent)
self.assertIsNone(span1.parent)

for span in (span1, span2):
# each should be a root span
self.assertIsNone(span2.parent)

# check attributes
self.assert_span_has_attributes(
span,
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "handler",
"rpc.service": "TestServicer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def test_concurrent_server_spans(self):
"""Check that concurrent RPC calls don't interfere with each other.
Expand Down Expand Up @@ -258,21 +305,107 @@ def handler(request, context):
# Interleave calls so spans are active on each thread at the same
# time
with futures.ThreadPoolExecutor(max_workers=2) as tpe:
f1 = tpe.submit(channel.unary_unary(""), b"")
f2 = tpe.submit(channel.unary_unary(""), b"")
f1 = tpe.submit(
channel.unary_unary("TestServicer/handler"), b""
)
f2 = tpe.submit(
channel.unary_unary("TestServicer/handler"), b""
)
futures.wait((f1, f2))
finally:
server.stop(None)

self.assertEqual(len(active_spans_in_handler), 2)
# pylint:disable=unbalanced-tuple-unpacking
span1, span2 = active_spans_in_handler
# Spans should belong to separate traces, and each should be a root
# span
# Spans should belong to separate traces
self.assertNotEqual(span1.context.span_id, span2.context.span_id)
self.assertNotEqual(span1.context.trace_id, span2.context.trace_id)
self.assertIsNone(span1.parent)
self.assertIsNone(span1.parent)

for span in (span1, span2):
# each should be a root span
self.assertIsNone(span2.parent)

# check attributes
self.assert_span_has_attributes(
span,
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "handler",
"rpc.service": "TestServicer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def test_abort(self):
"""Check that we can catch an abort properly"""

# Intercept gRPC calls...
interceptor = server_interceptor()

# our detailed failure message
failure_message = "This is a test failure"

# aborting RPC handler
def handler(request, context):
context.abort(grpc.StatusCode.FAILED_PRECONDITION, failure_message)

server = grpc.server(
futures.ThreadPoolExecutor(max_workers=1),
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)

server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))

port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel("localhost:{:d}".format(port))

rpc_call = "TestServicer/handler"

server.start()
# unfortunately, these are just bare exceptions in grpc...
with self.assertRaises(Exception):
channel.unary_unary(rpc_call)(b"")
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]

self.assertEqual(span.name, rpc_call)
self.assertIs(span.kind, trace.SpanKind.SERVER)

# Check version and name in span's instrumentation info
self.check_span_instrumentation_info(
span, opentelemetry.instrumentation.grpc
)

# make sure this span errored, with the right status and detail
self.assertEqual(span.status.status_code, StatusCode.ERROR)
self.assertEqual(
span.status.description,
"{}:{}".format(
grpc.StatusCode.FAILED_PRECONDITION, failure_message
),
)

# Check attributes
self.assert_span_has_attributes(
span,
{
"net.peer.ip": "[::1]",
"net.peer.name": "localhost",
"rpc.method": "handler",
"rpc.service": "TestServicer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.FAILED_PRECONDITION.value[
0
],
},
)


def get_latch(num):
Expand Down