diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 7d1314e8..0da03c7e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -4,6 +4,7 @@ CHANGELOG unreleased ========== +* feature: Stream Django ORM SQL queries and add flag to toggle their streaming * feature: Recursively patch any given module functions with capture 2.2.0 diff --git a/README.md b/README.md index 947824ca..bd9b495e 100644 --- a/README.md +++ b/README.md @@ -251,6 +251,19 @@ with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: pass ``` +### Trace SQL queries +By default, if no other value is provided to `.configure()`, SQL trace streaming is enabled +for all the supported DB engines. Those currently are: +- Any engine attached to the Django ORM. +- Any engine attached to SQLAlchemy. + +The behaviour can be toggled by passing the appropriate `stream_sql` value, for example: +```python +from aws_xray_sdk.core import xray_recorder + +xray_recorder.configure(service='fallback_name', stream_sql=True) +``` + ### Patch third-party libraries ```python @@ -297,6 +310,10 @@ MIDDLEWARE = [ # ... other middlewares ] ``` +#### SQL tracing +If Django's ORM is patched - either by using `AUTO_INSTRUMENT = True` in your settings file +or by explicitly calling `patch_db()` - the SQL query trace streaming can then be enabled or +disabled by updating the `STREAM_SQL` variable in your settings file. It is enabled by default. #### Automatic patching The automatic module patching can also be configured through Django settings. 
diff --git a/aws_xray_sdk/core/recorder.py b/aws_xray_sdk/core/recorder.py index 26e8c61d..4662d206 100644 --- a/aws_xray_sdk/core/recorder.py +++ b/aws_xray_sdk/core/recorder.py @@ -72,6 +72,7 @@ def __init__(self): self._dynamic_naming = None self._aws_metadata = copy.deepcopy(XRAY_META) self._origin = None + self._stream_sql = True if type(self.sampler).__name__ == 'DefaultSampler': self.sampler.load_settings(DaemonConfig(), self.context) @@ -81,7 +82,8 @@ def configure(self, sampling=None, plugins=None, daemon_address=None, service=None, context=None, emitter=None, streaming=None, dynamic_naming=None, streaming_threshold=None, - max_trace_back=None, sampler=None): + max_trace_back=None, sampler=None, + stream_sql=True): """Configure global X-Ray recorder. Configure needs to run before patching thrid party libraries @@ -130,6 +132,7 @@ class to have your own implementation of the streaming process. maximum number of subsegments within a segment. :param int max_trace_back: The maxinum number of stack traces recorded by auto-capture. Lower this if a single document becomes too large. + :param bool stream_sql: Whether SQL query texts should be streamed. Environment variables AWS_XRAY_DAEMON_ADDRESS, AWS_XRAY_CONTEXT_MISSING and AWS_XRAY_TRACING_NAME respectively overrides arguments @@ -159,6 +162,8 @@ class to have your own implementation of the streaming process. 
self.streaming_threshold = streaming_threshold if max_trace_back: self.max_trace_back = max_trace_back + if stream_sql is not None: + self.stream_sql = stream_sql if plugins: plugin_modules = get_plugin_modules(plugins) @@ -548,3 +553,11 @@ def max_trace_back(self): @max_trace_back.setter def max_trace_back(self, value): self._max_trace_back = value + + @property + def stream_sql(self): + return self._stream_sql + + @stream_sql.setter + def stream_sql(self, value): + self._stream_sql = value diff --git a/aws_xray_sdk/ext/django/apps.py b/aws_xray_sdk/ext/django/apps.py index 69101c9f..1abf72ee 100644 --- a/aws_xray_sdk/ext/django/apps.py +++ b/aws_xray_sdk/ext/django/apps.py @@ -36,6 +36,7 @@ def ready(self): dynamic_naming=settings.DYNAMIC_NAMING, streaming_threshold=settings.STREAMING_THRESHOLD, max_trace_back=settings.MAX_TRACE_BACK, + stream_sql=settings.STREAM_SQL, ) if settings.PATCH_MODULES: diff --git a/aws_xray_sdk/ext/django/conf.py b/aws_xray_sdk/ext/django/conf.py index bb8d3119..6313ab3c 100644 --- a/aws_xray_sdk/ext/django/conf.py +++ b/aws_xray_sdk/ext/django/conf.py @@ -14,6 +14,7 @@ 'DYNAMIC_NAMING': None, 'STREAMING_THRESHOLD': None, 'MAX_TRACE_BACK': None, + 'STREAM_SQL': True, 'PATCH_MODULES': [], 'AUTO_PATCH_PARENT_SEGMENT_NAME': None, 'IGNORE_MODULE_PATTERNS': [], diff --git a/aws_xray_sdk/ext/django/db.py b/aws_xray_sdk/ext/django/db.py index 0a2c80d6..fdf7e27a 100644 --- a/aws_xray_sdk/ext/django/db.py +++ b/aws_xray_sdk/ext/django/db.py @@ -1,29 +1,62 @@ +import copy import logging import importlib from django.db import connections +from aws_xray_sdk.core import xray_recorder from aws_xray_sdk.ext.dbapi2 import XRayTracedCursor log = logging.getLogger(__name__) def patch_db(): - for conn in connections.all(): module = importlib.import_module(conn.__module__) _patch_conn(getattr(module, conn.__class__.__name__)) -def _patch_conn(conn): - - attr = '_xray_original_cursor' +class DjangoXRayTracedCursor(XRayTracedCursor): + def execute(self, 
query, *args, **kwargs): + if xray_recorder.stream_sql: + _previous_meta = copy.copy(self._xray_meta) + self._xray_meta['sanitized_query'] = query + result = super(DjangoXRayTracedCursor, self).execute(query, *args, **kwargs) + if xray_recorder.stream_sql: + self._xray_meta = _previous_meta + return result + + def executemany(self, query, *args, **kwargs): + if xray_recorder.stream_sql: + _previous_meta = copy.copy(self._xray_meta) + self._xray_meta['sanitized_query'] = query + result = super(DjangoXRayTracedCursor, self).executemany(query, *args, **kwargs) + if xray_recorder.stream_sql: + self._xray_meta = _previous_meta + return result + + def callproc(self, proc, args): + if xray_recorder.stream_sql: + _previous_meta = copy.copy(self._xray_meta) + self._xray_meta['sanitized_query'] = proc + result = super(DjangoXRayTracedCursor, self).callproc(proc, args) + if xray_recorder.stream_sql: + self._xray_meta = _previous_meta + return result + + +def _patch_cursor(cursor_name, conn): + attr = '_xray_original_{}'.format(cursor_name) if hasattr(conn, attr): - log.debug('django built-in db already patched') + log.debug('django built-in db {} already patched'.format(cursor_name)) + return + + if not hasattr(conn, cursor_name): + log.debug('django built-in db does not have {}'.format(cursor_name)) return - setattr(conn, attr, conn.cursor) + setattr(conn, attr, getattr(conn, cursor_name)) meta = {} @@ -45,7 +78,12 @@ def cursor(self, *args, **kwargs): if user: meta['user'] = user - return XRayTracedCursor( - self._xray_original_cursor(*args, **kwargs), meta) + original_cursor = getattr(self, attr)(*args, **kwargs) + return DjangoXRayTracedCursor(original_cursor, meta) + + setattr(conn, cursor_name, cursor) - conn.cursor = cursor + +def _patch_conn(conn): + _patch_cursor('cursor', conn) + _patch_cursor('chunked_cursor', conn) diff --git a/aws_xray_sdk/ext/sqlalchemy/util/decorators.py b/aws_xray_sdk/ext/sqlalchemy/util/decorators.py index a32a891c..afad1280 100644 --- 
a/aws_xray_sdk/ext/sqlalchemy/util/decorators.py +++ b/aws_xray_sdk/ext/sqlalchemy/util/decorators.py @@ -47,7 +47,8 @@ def wrapper(*args, **kw): if isinstance(arg, XRayQuery): try: sql = parse_bind(arg.session.bind) - sql['sanitized_query'] = str(arg) + if xray_recorder.stream_sql: + sql['sanitized_query'] = str(arg) except Exception: sql = None if sql is not None: diff --git a/tests/ext/django/test_db.py b/tests/ext/django/test_db.py new file mode 100644 index 00000000..1c3e5439 --- /dev/null +++ b/tests/ext/django/test_db.py @@ -0,0 +1,87 @@ +import django + +import pytest + +from aws_xray_sdk.core import xray_recorder +from aws_xray_sdk.core.context import Context +from aws_xray_sdk.ext.django.db import patch_db + + +@pytest.fixture(scope='module', autouse=True) +def setup(): + django.setup() + xray_recorder.configure(context=Context(), + context_missing='LOG_ERROR') + patch_db() + + +@pytest.fixture(scope='module') +def user_class(setup): + from django.db import models + from django_fake_model import models as f + + class User(f.FakeModel): + name = models.CharField(max_length=255) + password = models.CharField(max_length=255) + + return User + + +@pytest.fixture( + autouse=True, + params=[ + False, + True, + ] +) +@pytest.mark.django_db +def func_setup(request, user_class): + xray_recorder.stream_sql = request.param + xray_recorder.clear_trace_entities() + xray_recorder.begin_segment('name') + try: + user_class.create_table() + yield + finally: + xray_recorder.clear_trace_entities() + try: + user_class.delete_table() + finally: + xray_recorder.end_segment() + + +def _assert_query(sql_meta): + if xray_recorder.stream_sql: + assert 'sanitized_query' in sql_meta + assert sql_meta['sanitized_query'] + assert sql_meta['sanitized_query'].startswith('SELECT') + else: + if 'sanitized_query' in sql_meta: + assert sql_meta['sanitized_query'] + # Django internally executes queries for table checks, ignore those + assert not 
sql_meta['sanitized_query'].startswith('SELECT') + + +def test_all(user_class): + """ Test calling all() on get all records. + Verify we run the query and return the SQL as metadata""" + # Materialising the query executes the SQL + list(user_class.objects.all()) + subsegment = xray_recorder.current_segment().subsegments[-1] + sql = subsegment.sql + assert sql['database_type'] == 'sqlite' + _assert_query(sql) + + +def test_filter(user_class): + """ Test calling filter() to get filtered records. + Verify we run the query and return the SQL as metadata""" + # Materialising the query executes the SQL + list(user_class.objects.filter(password='mypassword!').all()) + subsegment = xray_recorder.current_segment().subsegments[-1] + sql = subsegment.sql + assert sql['database_type'] == 'sqlite' + _assert_query(sql) + if xray_recorder.stream_sql: + assert 'mypassword!' not in sql['sanitized_query'] + assert '"password" = %s' in sql['sanitized_query'] diff --git a/tests/ext/flask_sqlalchemy/test_query.py b/tests/ext/flask_sqlalchemy/test_query.py index efb13aac..69f0bd7a 100644 --- a/tests/ext/flask_sqlalchemy/test_query.py +++ b/tests/ext/flask_sqlalchemy/test_query.py @@ -22,10 +22,15 @@ class User(db.Model): password = db.Column(db.String(255), nullable=False) -@pytest.fixture() -def session(): +@pytest.fixture( + params=[ + False, + True, + ], +) +def session(request): """Test Fixture to Create DataBase Tables and start a trace segment""" - xray_recorder.configure(service='test', sampling=False, context=Context()) + xray_recorder.configure(service='test', sampling=False, context=Context(), stream_sql=request.param) xray_recorder.clear_trace_entities() xray_recorder.begin_segment('SQLAlchemyTest') db.create_all() @@ -41,8 +46,8 @@ def test_all(capsys, session): User.query.all() subsegment = find_subsegment_by_annotation(xray_recorder.current_segment(), 'sqlalchemy', 'sqlalchemy.orm.query.all') assert subsegment['annotations']['sqlalchemy'] == 'sqlalchemy.orm.query.all' - 
assert subsegment['sql']['sanitized_query'] assert subsegment['sql']['url'] + assert bool(subsegment['sql'].get('sanitized_query', None)) is xray_recorder.stream_sql def test_add(capsys, session): diff --git a/tox.ini b/tox.ini index 1bd78b23..0dbbe0be 100644 --- a/tox.ini +++ b/tox.ini @@ -18,6 +18,7 @@ deps = future # the sdk doesn't support earlier version of django django >= 1.10, <2.0 + django-fake-model pynamodb >= 3.3.1 psycopg2 pg8000