-
Notifications
You must be signed in to change notification settings - Fork 421
/
Copy pathlogger.py
1275 lines (1074 loc) · 47.9 KB
/
logger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Logger utility
!!! abstract "Usage Documentation"
[`Logger`](../../core/logger.md)
"""
from __future__ import annotations
import functools
import inspect
import logging
import os
import random
import sys
import warnings
from contextlib import contextmanager
from typing import IO, TYPE_CHECKING, Any, Callable, Generator, Iterable, Mapping, TypeVar, cast, overload
from aws_lambda_powertools.logging.buffer.cache import LoggerBufferCache
from aws_lambda_powertools.logging.buffer.functions import _check_minimum_buffer_log_level, _create_buffer_record
from aws_lambda_powertools.logging.constants import (
LOGGER_ATTRIBUTE_HANDLER,
LOGGER_ATTRIBUTE_POWERTOOLS_HANDLER,
LOGGER_ATTRIBUTE_PRECONFIGURED,
)
from aws_lambda_powertools.logging.exceptions import (
InvalidLoggerSamplingRateError,
OrphanedChildLoggerError,
)
from aws_lambda_powertools.logging.filters import SuppressFilter
from aws_lambda_powertools.logging.formatter import (
RESERVED_FORMATTER_CUSTOM_KEYS,
BasePowertoolsFormatter,
LambdaPowertoolsFormatter,
)
from aws_lambda_powertools.logging.lambda_context import build_lambda_context_model
from aws_lambda_powertools.shared import constants
from aws_lambda_powertools.shared.functions import (
extract_event_from_common_models,
get_tracer_id,
resolve_env_var_choice,
resolve_truthy_env_var_choice,
)
from aws_lambda_powertools.utilities import jmespath_utils
from aws_lambda_powertools.warnings import PowertoolsUserWarning
if TYPE_CHECKING:
from aws_lambda_powertools.logging.buffer.config import LoggerBufferConfig
from aws_lambda_powertools.shared.types import AnyCallableT
# Standard-library logger used by this module for its own diagnostic `logger.debug(...)` messages
logger = logging.getLogger(__name__)
# Module-level flag read and flipped by `_is_cold_start()`; starts as True when the module is imported
is_cold_start = True
# Type variable bound to BasePowertoolsFormatter; used to annotate the `logger_formatter` parameter of Logger
PowertoolsFormatter = TypeVar("PowertoolsFormatter", bound=BasePowertoolsFormatter)
def _is_cold_start() -> bool:
    """Report whether the current call is the first (cold) one and update the module flag.

    The module-level `is_cold_start` flag is cleared as a side effect, so only the
    first call in a non-provisioned environment returns True.

    Returns
    -------
    bool
        True only on the first call when the initialization type is not
        "provisioned-concurrency"; False otherwise
    """
    global is_cold_start

    # The initialization-type env var equals "provisioned-concurrency" when
    # Provisioned Concurrency is in use; such environments never report a cold start
    provisioned = os.getenv(constants.LAMBDA_INITIALIZATION_TYPE) == "provisioned-concurrency"
    first_invocation = not provisioned and bool(is_cold_start)

    # Clear the flag both for provisioned environments and after the first cold invocation
    if provisioned or first_invocation:
        is_cold_start = False

    return first_invocation
class Logger:
"""Creates and setups a logger to format statements in JSON.
Includes service name and any additional key=value into logs
It also accepts both service name or level explicitly via env vars
Environment variables
---------------------
POWERTOOLS_SERVICE_NAME : str
service name
POWERTOOLS_LOG_LEVEL: str
logging level (e.g. INFO, DEBUG)
POWERTOOLS_LOGGER_SAMPLE_RATE: float
sampling rate ranging from 0 to 1, 1 being 100% sampling
Parameters
----------
service : str, optional
service name to be appended in logs, by default "service_undefined"
level : str, int, optional
The level to set. Can be a string representing the level name: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
or an integer representing the level value: 10 for 'DEBUG', 20 for 'INFO', 30 for 'WARNING', 40 for 'ERROR', 50 for 'CRITICAL'.
by default "INFO"
child: bool, optional
create a child Logger named <service>.<caller_file_name>, False by default
sampling_rate: float, optional
sample rate for debug calls within execution context defaults to 0.0
stream: sys.stdout, optional
valid output for a logging stream, by default sys.stdout
logger_formatter: PowertoolsFormatter, optional
custom logging formatter that implements PowertoolsFormatter
logger_handler: logging.Handler, optional
custom logging handler e.g. logging.FileHandler("file.log")
log_uncaught_exceptions: bool, by default False
logs uncaught exception using sys.excepthook
buffer_config: LoggerBufferConfig, optional
logger buffer configuration
See: https://docs.python.org/3/library/sys.html#sys.excepthook
Parameters propagated to LambdaPowertoolsFormatter
--------------------------------------------------
datefmt: str, optional
String directives (strftime) to format log timestamp using `time`, by default it uses 2021-05-03 11:47:12,494+0200.
use_datetime_directive: bool, optional
Interpret `datefmt` as a format string for `datetime.datetime.strftime`, rather than
`time.strftime`.
See https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior . This
also supports a custom %F directive for milliseconds.
use_rfc3339: bool, optional
Whether to use a popular date format that complies with both RFC3339 and ISO8601.
e.g., 2022-10-27T16:27:43.738+02:00.
json_serializer : Callable, optional
function to serialize `obj` to a JSON formatted `str`, by default json.dumps
json_deserializer : Callable, optional
function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`,
by default json.loads
json_default : Callable, optional
function to coerce unserializable values, by default `str()`
Only used when no custom formatter is set
utc : bool, optional
set logging timestamp to UTC, by default False to continue to use local time as per stdlib
log_record_order : list, optional
set order of log keys when logging, by default ["level", "location", "message", "timestamp"]
Example
-------
**Sets up structured logging in JSON for Lambda functions with explicit service name**
>>> from aws_lambda_powertools import Logger
>>> logger = Logger(service="payment")
>>>
>>> def handler(event, context):
logger.info("Hello")
**Sets up structured logging in JSON for Lambda functions using env vars**
$ export POWERTOOLS_SERVICE_NAME="payment"
$ export POWERTOOLS_LOGGER_SAMPLE_RATE=0.01 # 1% debug sampling
>>> from aws_lambda_powertools import Logger
>>> logger = Logger()
>>>
>>> def handler(event, context):
logger.info("Hello")
**Append payment_id to previously setup logger**
>>> from aws_lambda_powertools import Logger
>>> logger = Logger(service="payment")
>>>
>>> def handler(event, context):
logger.append_keys(payment_id=event["payment_id"])
logger.info("Hello")
**Create child Logger using logging inheritance via child param**
>>> # app.py
>>> import another_file
>>> from aws_lambda_powertools import Logger
>>> logger = Logger(service="payment")
>>>
>>> # another_file.py
>>> from aws_lambda_powertools import Logger
>>> logger = Logger(service="payment", child=True)
**Logging in UTC timezone**
>>> # app.py
>>> import logging
>>> from aws_lambda_powertools import Logger
>>>
>>> logger = Logger(service="payment", utc=True)
**Brings message as the first key in log statements**
>>> # app.py
>>> import logging
>>> from aws_lambda_powertools import Logger
>>>
>>> logger = Logger(service="payment", log_record_order=["message"])
**Logging to a file instead of standard output for testing**
>>> # app.py
>>> import logging
>>> from aws_lambda_powertools import Logger
>>>
>>> logger = Logger(service="payment", logger_handler=logging.FileHandler("log.json"))
Raises
------
InvalidLoggerSamplingRateError
When sampling rate provided is not a float
""" # noqa: E501
def __init__(
    self,
    service: str | None = None,
    level: str | int | None = None,
    child: bool = False,
    sampling_rate: float | None = None,
    stream: IO[str] | None = None,
    logger_formatter: PowertoolsFormatter | None = None,
    logger_handler: logging.Handler | None = None,
    log_uncaught_exceptions: bool = False,
    json_serializer: Callable[[dict], str] | None = None,
    json_deserializer: Callable[[dict | str | bool | int | float], str] | None = None,
    json_default: Callable[[Any], Any] | None = None,
    datefmt: str | None = None,
    use_datetime_directive: bool = False,
    log_record_order: list[str] | None = None,
    utc: bool = False,
    use_rfc3339: bool = False,
    serialize_stacktrace: bool = True,
    buffer_config: LoggerBufferConfig | None = None,
    **kwargs,
) -> None:
    """Resolve settings, obtain the underlying stdlib logger and handler, then configure them.

    Parameters are described in the class docstring. Extra `**kwargs` are forwarded to
    `_init_logger`, which passes them on to `structure_logs`.
    """
    # Used in case of sampling
    # (this is the level `refresh_sample_rate_calculation` restores before re-sampling)
    self.initial_log_level = self._determine_log_level(level)
    # Service name is resolved by `resolve_env_var_choice` from the explicit argument and the
    # service-name env var; "service_undefined" is the value used when the env var is unset
    self.service = resolve_env_var_choice(
        choice=service,
        env=os.getenv(constants.SERVICE_NAME_ENV, "service_undefined"),
    )
    # May still be a string here when it comes from the environment;
    # conversion to float happens in `_configure_sampling`
    self.sampling_rate = resolve_env_var_choice(
        choice=sampling_rate,
        env=os.getenv(constants.LOGGER_LOG_SAMPLING_RATE),
    )
    # Keys merged into every `structure_logs` call and re-applied by `clear_state`
    self._default_log_keys: dict[str, Any] = {"service": self.service, "sampling_rate": self.sampling_rate}
    self.child = child
    self.logger_formatter = logger_formatter
    # Fall back to standard output when no stream is given
    self._stream = stream or sys.stdout
    self.log_uncaught_exceptions = log_uncaught_exceptions
    # Read from the environment only; when truthy, `_init_logger` skips adding SuppressFilter to root handlers
    self._is_deduplication_disabled = resolve_truthy_env_var_choice(
        env=os.getenv(constants.LOGGER_LOG_DEDUPLICATION_ENV, "false"),
    )
    # NOTE: `_logger` must be assigned before any unknown attribute is looked up on `self`,
    # because `__getattr__` proxies to `self._logger`
    self._logger = self._get_logger()
    # An explicitly provided handler wins; otherwise `_get_handler` reuses or creates one
    self.logger_handler = logger_handler or self._get_handler()
    # NOTE: This is primarily to improve UX, so IDEs can autocomplete LambdaPowertoolsFormatter options
    # previously, we masked all of them as kwargs thus limiting feature discovery
    formatter_options = {
        "json_serializer": json_serializer,
        "json_deserializer": json_deserializer,
        "json_default": json_default,
        "datefmt": datefmt,
        "use_datetime_directive": use_datetime_directive,
        "log_record_order": log_record_order,
        "utc": utc,
        "use_rfc3339": use_rfc3339,
        "serialize_stacktrace": serialize_stacktrace,
    }
    self._buffer_config = buffer_config
    # The buffer cache is only created when a buffer configuration was provided,
    # which is why it is read back with `getattr(..., None)` below
    if self._buffer_config:
        self._buffer_cache = LoggerBufferCache(max_size_bytes=self._buffer_config.max_bytes)
    self._init_logger(
        formatter_options=formatter_options,
        log_level=level,
        buffer_config=self._buffer_config,
        buffer_cache=getattr(self, "_buffer_cache", None),
        **kwargs,
    )
    # Opt-in: replaces the process-wide `sys.excepthook` with `log_uncaught_exception_hook`
    # bound to this Logger instance
    if self.log_uncaught_exceptions:
        logger.debug("Replacing exception hook")
        sys.excepthook = functools.partial(log_uncaught_exception_hook, logger=self)
# Prevent __getattr__ from shielding unknown attribute errors in type checkers
# https://github.com/aws-powertools/powertools-lambda-python/issues/1660
if not TYPE_CHECKING:  # pragma: no cover

    def __getattr__(self, name):
        """Proxy attributes not found on this object to the underlying stdlib logger.

        Raises
        ------
        AttributeError
            When `_logger` itself is missing, or the underlying logger lacks `name`
        """
        # `__getattr__` only runs when normal lookup fails. If the missing name is `_logger`
        # itself, the instance was created without running `__init__` (e.g. while being
        # reconstructed by copy/pickle); delegating would re-enter this method until
        # RecursionError, so fail with the AttributeError that callers of getattr/hasattr expect.
        if name == "_logger":
            raise AttributeError(name)
        # Proxy attributes not found to actual logger to support backward compatibility
        # https://github.com/aws-powertools/powertools-lambda-python/issues/97
        return getattr(self._logger, name)
def _get_logger(self) -> logging.Logger:
    """Return the stdlib logger backing this instance.

    The logger is named `{self.service}`; child loggers are named
    `{self.service}.{filename}` using the value returned by `_get_caller_filename()`.
    """
    if not self.child:
        return logging.getLogger(self.service)

    return logging.getLogger(f"{self.service}.{_get_caller_filename()}")
def _get_handler(self) -> logging.Handler:
    """Resolve the handler this Logger should use.

    Resolution order:
    1. the handler already stored on this instance;
    2. the first handler of the underlying logger, when that logger carries a `powertools_handler` attribute;
    3. for child loggers, the handler recorded on the parent logger (may be None if the parent is not set up);
    4. a new `logging.StreamHandler` writing to the configured stream.
    """
    # 1. A handler was already configured on this instance
    if getattr(self, LOGGER_ATTRIBUTE_HANDLER, None):
        return self.logger_handler

    underlying = self._logger

    # 2. The underlying logger was set up by Powertools before: reuse its first handler
    if getattr(underlying, "powertools_handler", None):
        return underlying.handlers[0]

    # 3. Children borrow whatever handler the parent recorded
    if self.child:
        return getattr(underlying.parent, LOGGER_ATTRIBUTE_POWERTOOLS_HANDLER, None)  # type: ignore[return-value] # always checked in formatting

    # 4. First time init: create a new stream handler
    return logging.StreamHandler(self._stream)
def _init_logger(
    self,
    formatter_options: dict | None = None,
    log_level: str | int | None = None,
    buffer_config: LoggerBufferConfig | None = None,
    buffer_cache: LoggerBufferCache | None = None,
    **kwargs,
) -> None:
    """Configure the underlying logger the first time it is seen.

    Child loggers and loggers already marked as preconfigured are not reconfigured.
    This avoids attaching multiple handlers, running different sampling mechanisms,
    and emitting duplicated messages through duplicated handlers.

    Parameters
    ----------
    formatter_options : dict, optional
        options forwarded to `structure_logs`
    log_level : str | int, optional
        level applied through `setLevel`
    buffer_config : LoggerBufferConfig, optional
        buffer configuration recorded on the underlying logger
    buffer_cache : LoggerBufferCache, optional
        buffer cache recorded on the underlying logger
    **kwargs
        extra keys forwarded to `structure_logs`
    """
    underlying = self._logger

    if self.child:
        self.setLevel(log_level)
        parent_buffer_config = getattr(underlying.parent, "powertools_buffer_config", None)
        if parent_buffer_config:
            # The child keeps the parent's buffer configuration but starts with its own empty cache
            self._buffer_config = parent_buffer_config
            self._buffer_cache = LoggerBufferCache(parent_buffer_config.max_bytes)
        return

    if getattr(underlying, LOGGER_ATTRIBUTE_PRECONFIGURED, False):
        # Same logger name was configured earlier: share its buffer configuration and cache
        # so buffer settings stay consistent across Logger instances of the same service
        self._buffer_config = underlying.powertools_buffer_config  # type: ignore[attr-defined]
        self._buffer_cache = underlying.powertools_buffer_cache  # type: ignore[attr-defined]
        return

    # First-time configuration; these steps run before the logger is marked as preconfigured
    self.setLevel(log_level)
    self._configure_sampling()
    self.addHandler(self.logger_handler)
    self.structure_logs(formatter_options=formatter_options, **kwargs)

    # Pytest Live Log duplicates records for colored output, so by default a filter is added
    # for deduplication; the flag lets users keep the duplicates on purpose (#262)
    if not self._is_deduplication_disabled:
        logger.debug("Adding filter in root logger to suppress child logger records to bubble up")
        for root_handler in logging.root.handlers:
            # e.g. Logger(service="order") rejects loggers starting with `order`
            # such as order.checkout, order.shared
            root_handler.addFilter(SuppressFilter(self.service))

    # As per bug #249, an existing logger must not be configured twice. The stdlib returns
    # the same logger object for a reused name, so the markers are stored on that object.
    logger.debug(f"Marking logger {self.service} as preconfigured")
    preconfigured_markers = {
        "init": True,
        "powertools_handler": self.logger_handler,
        "powertools_buffer_config": buffer_config,
        "powertools_buffer_cache": buffer_cache,
    }
    for attribute, value in preconfigured_markers.items():
        setattr(underlying, attribute, value)
def refresh_sample_rate_calculation(self) -> None:
    """Re-run the sampling decision starting from the initial log level.

    The underlying logger is first reset to `initial_log_level`, then
    `_configure_sampling` decides again whether to switch to DEBUG.

    Returns
    -------
    None
    """
    baseline_level = self.initial_log_level
    self._logger.setLevel(baseline_level)
    self._configure_sampling()
def _configure_sampling(self) -> None:
    """Switch the underlying logger to DEBUG when this execution is sampled.

    Does nothing when no sampling rate is set (None, 0, or an empty value).

    Raises
    ------
    InvalidLoggerSamplingRateError
        When the sampling rate cannot be converted to a float
    """
    if not self.sampling_rate:
        return

    try:
        # No explicit range check on purpose: random.random() is in [0, 1), so a negative
        # rate never samples and a rate >= 1 always samples.
        is_sampled = random.random() <= float(self.sampling_rate)
    except ValueError as exc:
        # Chain the original conversion error so the offending value is traceable
        raise InvalidLoggerSamplingRateError(
            f"Expected a float value ranging 0 to 1, but received {self.sampling_rate} instead. "
            "Please review POWERTOOLS_LOGGER_SAMPLE_RATE environment variable or `sampling_rate` parameter.",
        ) from exc

    if is_sampled:
        self._logger.setLevel(logging.DEBUG)
        logger.debug("Setting log level to DEBUG due to sampling rate")
@overload
def inject_lambda_context(
    self,
    lambda_handler: AnyCallableT,
    log_event: bool | None = None,
    correlation_id_path: str | None = None,
    clear_state: bool | None = False,
    flush_buffer_on_uncaught_error: bool = False,
) -> AnyCallableT: ...

@overload
def inject_lambda_context(
    self,
    lambda_handler: None = None,
    log_event: bool | None = None,
    correlation_id_path: str | None = None,
    clear_state: bool | None = False,
    flush_buffer_on_uncaught_error: bool = False,
) -> Callable[[AnyCallableT], AnyCallableT]: ...

def inject_lambda_context(
    self,
    lambda_handler: AnyCallableT | None = None,
    log_event: bool | None = None,
    correlation_id_path: str | None = None,
    clear_state: bool | None = False,
    flush_buffer_on_uncaught_error: bool = False,
) -> Any:
    """Decorator that adds Lambda context information to the logger on every invocation

    Works both bare (`@logger.inject_lambda_context`) and with options
    (`@logger.inject_lambda_context(log_event=True)`).

    Parameters
    ----------
    lambda_handler : Callable
        Handler to decorate; None when the decorator is used with options
    log_event : bool, optional
        Log the incoming event, by default False
    correlation_id_path : str, optional
        JMESPath expression used to extract the correlation id from the event
    clear_state : bool, optional
        Replace previously added keys instead of appending to them
    flush_buffer_on_uncaught_error : bool, optional
        Flush the log buffer when the handler raises, by default False

    Environment variables
    ---------------------
    POWERTOOLS_LOGGER_LOG_EVENT : str
        instruct logger to log Lambda Event (e.g. `"true", "True", "TRUE"`)

    Example
    -------
    **Captures Lambda contextual runtime info (e.g memory, arn, req_id)**

        from aws_lambda_powertools import Logger

        logger = Logger(service="payment")

        @logger.inject_lambda_context
        def handler(event, context):
            logger.info("Hello")

    **Captures Lambda contextual runtime info and logs incoming request**

        from aws_lambda_powertools import Logger

        logger = Logger(service="payment")

        @logger.inject_lambda_context(log_event=True)
        def handler(event, context):
            logger.info("Hello")

    Returns
    -------
    decorate : Callable
        Decorated lambda handler
    """
    if lambda_handler is None:
        # Called with options only: bind them and wait for the handler
        logger.debug("Decorator called with parameters")
        decorator_options = {
            "log_event": log_event,
            "correlation_id_path": correlation_id_path,
            "clear_state": clear_state,
            "flush_buffer_on_uncaught_error": flush_buffer_on_uncaught_error,
        }
        return functools.partial(self.inject_lambda_context, **decorator_options)

    should_log_event = resolve_truthy_env_var_choice(
        env=os.getenv(constants.LOGGER_LOG_EVENT_ENV, "false"),
        choice=log_event,
    )

    @functools.wraps(lambda_handler)
    def decorate(event, context, *args, **kwargs):
        context_keys = build_lambda_context_model(context).__dict__
        cold_start = _is_cold_start()

        # clear_state replaces existing keys through structure_logs; otherwise keys are appended
        register_keys = self.structure_logs if clear_state else self.append_keys
        register_keys(cold_start=cold_start, **context_keys)

        if correlation_id_path:
            correlation_id = jmespath_utils.query(envelope=correlation_id_path, data=event)
            self.set_correlation_id(correlation_id)

        if should_log_event:
            logger.debug("Event received")
            self.info(extract_event_from_common_models(event))

        # With a sampling rate set, every invocation after the cold start
        # re-runs the sampling decision
        # See: https://github.com/aws-powertools/powertools-lambda-python/issues/6141
        if self.sampling_rate and not cold_start:
            self.refresh_sample_rate_calculation()

        try:
            return lambda_handler(event, context, *args, **kwargs)
        except BaseException:
            # Any uncaught error: optionally flush buffered logs, then let the error propagate
            if flush_buffer_on_uncaught_error:
                logger.debug("Uncaught error detected, flushing log buffer before exit")
                self.flush_buffer()
            raise
        finally:
            # The buffer cache is cleared once the invocation is over, whether it succeeded or not
            if self._buffer_config:
                self._buffer_cache.clear()

    return decorate
def debug(
    self,
    msg: object,
    *args: object,
    exc_info: logging._ExcInfoType = None,
    stack_info: bool = False,
    stacklevel: int = 2,
    extra: Mapping[str, object] | None = None,
    **kwargs: object,
) -> None:
    """Log `msg` at DEBUG level; extra keyword arguments are merged into `extra`.

    Without a buffer configuration the record is logged immediately.
    With buffering enabled a DEBUG record is always stored in the buffer.
    """
    merged_extra = {**(extra or {}), **kwargs}

    # MAINTAINABILITY_DECISION: each level method keeps its own explicit flow, clarity over complexity
    if self._buffer_config:
        # Buffering is active: keep the record for potential later processing
        self._add_log_record_to_buffer(
            level=logging.DEBUG,
            msg=msg,
            args=args,
            exc_info=exc_info,
            stack_info=stack_info,
            extra=merged_extra,
        )
        return None

    # Buffering is disabled: log right away
    return self._logger.debug(
        msg,
        *args,
        exc_info=exc_info,
        stack_info=stack_info,
        stacklevel=stacklevel,
        extra=merged_extra,
    )
def info(
    self,
    msg: object,
    *args: object,
    exc_info: logging._ExcInfoType = None,
    stack_info: bool = False,
    stacklevel: int = 2,
    extra: Mapping[str, object] | None = None,
    **kwargs: object,
) -> None:
    """Log `msg` at INFO level; extra keyword arguments are merged into `extra`.

    The record is logged immediately when buffering is disabled, or when
    `_check_minimum_buffer_log_level` says INFO bypasses the buffer for the configured
    `buffer_at_verbosity`. Otherwise it is stored in the buffer.
    """
    merged_extra = {**(extra or {}), **kwargs}
    buffer_config = self._buffer_config

    # MAINTAINABILITY_DECISION: each level method keeps its own explicit flow, clarity over complexity
    if not buffer_config or _check_minimum_buffer_log_level(buffer_config.buffer_at_verbosity, "INFO"):
        return self._logger.info(
            msg,
            *args,
            exc_info=exc_info,
            stack_info=stack_info,
            stacklevel=stacklevel,
            extra=merged_extra,
        )

    # Neither condition met: keep the record for potential later processing
    self._add_log_record_to_buffer(
        level=logging.INFO,
        msg=msg,
        args=args,
        exc_info=exc_info,
        stack_info=stack_info,
        extra=merged_extra,
    )
def warning(
    self,
    msg: object,
    *args: object,
    exc_info: logging._ExcInfoType = None,
    stack_info: bool = False,
    stacklevel: int = 2,
    extra: Mapping[str, object] | None = None,
    **kwargs: object,
) -> None:
    """Log `msg` at WARNING level; extra keyword arguments are merged into `extra`.

    The record is logged immediately when buffering is disabled, or when
    `_check_minimum_buffer_log_level` says WARNING bypasses the buffer for the configured
    `buffer_at_verbosity`. Otherwise it is stored in the buffer.
    """
    merged_extra = {**(extra or {}), **kwargs}
    buffer_config = self._buffer_config

    # MAINTAINABILITY_DECISION: each level method keeps its own explicit flow, clarity over complexity
    if not buffer_config or _check_minimum_buffer_log_level(buffer_config.buffer_at_verbosity, "WARNING"):
        return self._logger.warning(
            msg,
            *args,
            exc_info=exc_info,
            stack_info=stack_info,
            stacklevel=stacklevel,
            extra=merged_extra,
        )

    # Neither condition met: keep the record for potential later processing
    self._add_log_record_to_buffer(
        level=logging.WARNING,
        msg=msg,
        args=args,
        exc_info=exc_info,
        stack_info=stack_info,
        extra=merged_extra,
    )
def error(
    self,
    msg: object,
    *args: object,
    exc_info: logging._ExcInfoType = None,
    stack_info: bool = False,
    stacklevel: int = 2,
    extra: Mapping[str, object] | None = None,
    **kwargs: object,
) -> None:
    """Log `msg` at ERROR level; extra keyword arguments are merged into `extra`.

    ERROR records are never buffered. When a buffer configuration with
    `flush_on_error_log` is active, the buffer is flushed before this record is logged.
    """
    merged_extra = {**(extra or {}), **kwargs}

    buffer_config = self._buffer_config
    if buffer_config and buffer_config.flush_on_error_log:
        self.flush_buffer()

    log_options = {
        "exc_info": exc_info,
        "stack_info": stack_info,
        "stacklevel": stacklevel,
        "extra": merged_extra,
    }
    return self._logger.error(msg, *args, **log_options)
def critical(
    self,
    msg: object,
    *args: object,
    exc_info: logging._ExcInfoType = None,
    stack_info: bool = False,
    stacklevel: int = 2,
    extra: Mapping[str, object] | None = None,
    **kwargs: object,
) -> None:
    """Log `msg` at CRITICAL level; extra keyword arguments are merged into `extra`.

    CRITICAL records are never buffered. When a buffer configuration with
    `flush_on_error_log` is active, the buffer is flushed before this record is logged.
    """
    merged_extra = {**(extra or {}), **kwargs}

    buffer_config = self._buffer_config
    if buffer_config and buffer_config.flush_on_error_log:
        self.flush_buffer()

    log_options = {
        "exc_info": exc_info,
        "stack_info": stack_info,
        "stacklevel": stacklevel,
        "extra": merged_extra,
    }
    return self._logger.critical(msg, *args, **log_options)
def exception(
    self,
    msg: object,
    *args: object,
    exc_info: logging._ExcInfoType = True,
    stack_info: bool = False,
    stacklevel: int = 2,
    extra: Mapping[str, object] | None = None,
    **kwargs: object,
) -> None:
    """Log `msg` through the underlying logger's `exception`, with exception info by default.

    Extra keyword arguments are merged into `extra`. Exception records are never buffered.
    When a buffer configuration with `flush_on_error_log` is active, the buffer is flushed
    before this record is logged.
    """
    merged_extra = {**(extra or {}), **kwargs}

    buffer_config = self._buffer_config
    if buffer_config and buffer_config.flush_on_error_log:
        self.flush_buffer()

    log_options = {
        "exc_info": exc_info,
        "stack_info": stack_info,
        "stacklevel": stacklevel,
        "extra": merged_extra,
    }
    return self._logger.exception(msg, *args, **log_options)
def append_keys(self, **additional_keys: object) -> None:
    """Forward the given key=value pairs to the registered formatter's `append_keys`."""
    formatter = self.registered_formatter
    formatter.append_keys(**additional_keys)
def get_current_keys(self) -> dict[str, Any]:
    """Return whatever the registered formatter reports as its current keys."""
    formatter = self.registered_formatter
    return formatter.get_current_keys()
def remove_keys(self, keys: Iterable[str]) -> None:
    """Forward the given key names to the registered formatter's `remove_keys`."""
    formatter = self.registered_formatter
    formatter.remove_keys(keys)
@contextmanager
def append_context_keys(self, **additional_keys: Any) -> Generator[None, None, None]:
    """
    Context manager that adds logging keys for the duration of a `with` block.

    The work is delegated to the registered formatter's `append_context_keys`.

    Parameters
    -----------
    **additional_keys: Any
        Key-value pairs to include in the log context during the lifespan of the context manager.

    Example
    --------
    **Logging with contextual keys**

        logger = Logger(service="example_service")
        with logger.append_context_keys(user_id="123", operation="process"):
            logger.info("Log with context")

        logger.info("Log without context")
    """
    scoped_keys = self.registered_formatter.append_context_keys(**additional_keys)
    with scoped_keys:
        yield
def clear_state(self) -> None:
    """Drop every custom key appended to the Logger and restore the default keys."""
    # First wipe the formatter's custom keys...
    self.registered_formatter.clear_state()
    # ...then re-register the defaults captured at construction time
    defaults = self._default_log_keys
    self.structure_logs(**defaults)
# These specific thread-safe methods are necessary to manage shared context in concurrent environments.
# They prevent race conditions and ensure data consistency across multiple threads.
def thread_safe_append_keys(self, **additional_keys: object) -> None:
    """Forward the given key=value pairs to the registered formatter's `thread_safe_append_keys`."""
    formatter = self.registered_formatter
    formatter.thread_safe_append_keys(**additional_keys)
def thread_safe_get_current_keys(self) -> dict[str, Any]:
    """Return the result of the registered formatter's `thread_safe_get_current_keys`."""
    formatter = self.registered_formatter
    return formatter.thread_safe_get_current_keys()
def thread_safe_remove_keys(self, keys: Iterable[str]) -> None:
    """Forward the given key names to the registered formatter's `thread_safe_remove_keys`."""
    formatter = self.registered_formatter
    formatter.thread_safe_remove_keys(keys)
def thread_safe_clear_keys(self) -> None:
    """Invoke the registered formatter's `thread_safe_clear_keys`."""
    formatter = self.registered_formatter
    formatter.thread_safe_clear_keys()
def structure_logs(self, append: bool = False, formatter_options: dict | None = None, **keys) -> None:
    """Set the JSON formatter, or update the keys held by the registered formatter.

    The method operates in one of three modes:

    1. The underlying logger is not yet marked as preconfigured: register a formatter
       on the handler and append the non-reserved keys.
    2. `append=True` (legacy, superseded by `append_keys`): append `keys` to the current formatter.
    3. Otherwise: discard the existing keys and register the default keys plus `keys`.

    Last keyword argument and value wins if duplicated.

    Parameters
    ----------
    append : bool, optional
        append keys provided to logger formatter, by default False
    formatter_options : dict, optional
        LambdaPowertoolsFormatter options to be propagated, by default {}
    """
    formatter_options = formatter_options or {}
    log_keys = {**self._default_log_keys, **keys}

    # Mode 1
    if not getattr(self._logger, LOGGER_ATTRIBUTE_PRECONFIGURED, False):
        formatter = self.logger_formatter or LambdaPowertoolsFormatter(**formatter_options, **log_keys)
        self.registered_handler.setFormatter(formatter)

        # With a custom formatter, keys that are also formatter parameters
        # (for example `json_default=<callable>`) must not be appended as custom keys;
        # see https://github.com/aws-powertools/powertools-lambda-python/issues/1263
        custom_keys = {
            key: value for key, value in log_keys.items() if key not in RESERVED_FORMATTER_CUSTOM_KEYS
        }
        return self.registered_formatter.append_keys(**custom_keys)

    # Mode 2 (legacy)
    if append:
        # Maintenance: Add deprecation warning for major version
        return self.append_keys(**keys)

    # Mode 3: clear both the regular and the thread-safe keys, then register the new set
    self.registered_formatter.clear_state()
    self.registered_formatter.thread_safe_clear_keys()
    self.registered_formatter.append_keys(**log_keys)
def set_correlation_id(self, value: str | None) -> None:
    """Store `value` under the `correlation_id` key through `append_keys`

    Parameters
    ----------
    value : str, optional
        Value for the correlation id. None is forwarded as-is; the original documentation
        states that None removes the correlation_id.
    """
    correlation_key = {"correlation_id": value}
    self.append_keys(**correlation_key)
def get_correlation_id(self) -> str | None:
    """Read the correlation_id from the registered formatter

    Returns
    -------
    str, optional
        Value stored under "correlation_id" in the formatter's `log_format`;
        None when the key is absent or the formatter is not a LambdaPowertoolsFormatter
    """
    formatter = self.registered_formatter
    if not isinstance(formatter, LambdaPowertoolsFormatter):
        return None
    return formatter.log_format.get("correlation_id")
def setLevel(self, level: str | int | None) -> None:
    """Resolve `level` with `_determine_log_level` and apply it to the underlying logger."""
    resolved_level = self._determine_log_level(level)
    return self._logger.setLevel(resolved_level)
def addHandler(self, handler: logging.Handler) -> None:
    """Attach `handler` to the underlying logger."""
    underlying = self._logger
    return underlying.addHandler(handler)
def addFilter(self, filter: logging._FilterType) -> None:  # noqa: A002 # filter built-in usage
    """Attach `filter` to the underlying logger."""
    underlying = self._logger
    return underlying.addFilter(filter)
def removeFilter(self, filter: logging._FilterType) -> None:  # noqa: A002 # filter built-in usage
    """Detach `filter` from the underlying logger."""
    underlying = self._logger
    return underlying.removeFilter(filter)
@property
def registered_handler(self) -> logging.Handler:
    """Convenience property returning the handler resolved by `_get_handler`"""
    # Resolved on every access rather than cached
    resolved_handler = self._get_handler()
    return resolved_handler
@property
def registered_formatter(self) -> BasePowertoolsFormatter:
    """Convenience property to access the formatter of the registered handler

    Raises
    ------
    OrphanedChildLoggerError
        When no handler is available yet (`registered_handler` is None)
    """
    handler = self.registered_handler
    if handler is None:
        # Sentences are joined by implicit string concatenation, so each one needs
        # its own trailing space
        raise OrphanedChildLoggerError(
            "Orphan child loggers cannot append nor remove keys until a parent is initialized first. "
            "To solve this issue, you can A) make sure a parent logger is initialized first, or B) move append/remove keys operations to a later stage. "  # noqa: E501
            "Reference: https://docs.powertools.aws.dev/lambda/python/latest/core/logger/#reusing-logger-across-your-code",
        )

    # cast() is a typing-only no-op at runtime; the string form names the same type
    return cast("BasePowertoolsFormatter", handler.formatter)
@property
def log_level(self) -> int:
    """Level currently set on the underlying logger"""
    underlying = self._logger
    return underlying.level
@property
def name(self) -> str:
    """Name of the underlying logger"""
    underlying = self._logger
    return underlying.name
@property
def handlers(self) -> list[logging.Handler]:
    """Handlers attached to the underlying logger

    Notes
    -----
    Looking for the first configured handler? Use registered_handler property instead.
    """
    underlying = self._logger
    return underlying.handlers
def _get_aws_lambda_log_level(self) -> str | None:
    """
    Look up the log level set through the AWS Lambda Advanced Logging Controls feature.

    Returns:
        str | None: The entry of `constants.LAMBDA_ADVANCED_LOGGING_LEVELS` matching the
        Lambda log level env var, or None when there is no matching entry.
    """
    lambda_level = os.getenv(constants.LAMBDA_LOG_LEVEL_ENV)
    return constants.LAMBDA_ADVANCED_LOGGING_LEVELS.get(lambda_level)
def _get_powertools_log_level(self, level: str | int | None) -> str | None:
"""Retrieve the log level for Powertools from the environment variable or level parameter.
If log level is an integer, we convert to its respective string level `logging.getLevelName()`.
If no log level is provided, we check env vars for the log level: POWERTOOLS_LOG_LEVEL_ENV and POWERTOOLS_LOG_LEVEL_LEGACY_ENV.
Parameters:
-----------
level : str | int | None
The specified log level as a string, integer, or None.
Environment variables
---------------------
POWERTOOLS_LOG_LEVEL : str
log level (e.g: INFO, DEBUG, WARNING, ERROR, CRITICAL)
LOG_LEVEL (Legacy) : str
log level (e.g: INFO, DEBUG, WARNING, ERROR, CRITICAL)
Returns: