Skip to content

[WIP] Starting with metrics and prometheus #1414

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proxy/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def _env_threadless_compliant() -> bool:
DEFAULT_ENABLE_SSH_TUNNEL = False
DEFAULT_ENABLE_DEVTOOLS = False
DEFAULT_ENABLE_EVENTS = False
DEFAULT_ENABLE_METRICS = False
DEFAULT_EVENTS_QUEUE = None
DEFAULT_ENABLE_STATIC_SERVER = False
DEFAULT_ENABLE_WEB_SERVER = False
Expand Down
9 changes: 8 additions & 1 deletion proxy/core/event/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from .queue import EventQueue
from .dispatcher import EventDispatcher
from ...common.flag import flags
from ...common.constants import DEFAULT_ENABLE_EVENTS
from ...common.constants import DEFAULT_ENABLE_EVENTS, DEFAULT_ENABLE_METRICS


logger = logging.getLogger(__name__)
Expand All @@ -33,6 +33,13 @@
help='Default: False. Enables core to dispatch lifecycle events. '
'Plugins can be used to subscribe for core events.',
)
flags.add_argument(
'--enable-metrics',
action='store_true',
default=DEFAULT_ENABLE_METRICS,
help='Default: False. Enables core to dispatch metrics. '
'Plugins can be used to subscribe for metrics.',
)


class EventManager:
Expand Down
3 changes: 2 additions & 1 deletion proxy/core/event/names.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
('RESPONSE_HEADERS_COMPLETE', int),
('RESPONSE_CHUNK_RECEIVED', int),
('RESPONSE_COMPLETE', int),
('METRIC', int),
],
)
eventNames = EventNames(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
eventNames = EventNames(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
59 changes: 59 additions & 0 deletions proxy/http/metric_emisor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import logging
from typing import Set, Union, Optional

from proxy import metrics
from proxy.core.event import eventNames


logger = logging.getLogger(__name__)


class MetricEmisorMixin:
"""MetricEmisorMixin provides methods to publish metrics."""

def _can_emit_metrics(self):
if self.flags.enable_events and self.flags.enable_metrics:
assert self.event_queue
return True
logging.info('Metrics disabled')
return False


def emit_metric(self, metric: metrics.Metric) -> None:
if self._can_emit_metrics():
self.event_queue.publish(
request_id=self.uid,
event_name=eventNames.METRIC,
event_payload=metric,
publisher_id=self.__class__.__qualname__,
)

def emit_metric_counter(
self,
name: str,
increment: int | float=1,
description: Optional[str]=None,
tags: Optional[set[str]]=None,
) -> None:
if self._can_emit_metrics():
self.emit_metric(metrics.Counter(name, increment, description, tags))

def emit_metric_gauge(
self,
name: str,
value: Union[int, float],
description: Optional[str]=None,
tags: Optional[Set[str]]=None,
) -> None:
if self._can_emit_metrics():
self.emit_metric(metrics.Gauge(name, value, description, tags))
4 changes: 3 additions & 1 deletion proxy/http/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
import socket
import argparse
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, List, Union, Optional
from typing import TYPE_CHECKING, Set, List, Union, Optional

from .parser import HttpParser
from .connection import HttpClientConnection
from ..core.event import EventQueue
from .descriptors import DescriptorsHandlerMixin
from ..common.utils import tls_interception_enabled
from .metric_emisor import MetricEmisorMixin


if TYPE_CHECKING: # pragma: no cover
Expand All @@ -26,6 +27,7 @@

class HttpProtocolHandlerPlugin(
DescriptorsHandlerMixin,
MetricEmisorMixin,
ABC,
):
"""Base HttpProtocolHandler Plugin class.
Expand Down
7 changes: 6 additions & 1 deletion proxy/http/server/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@
from ..descriptors import DescriptorsHandlerMixin
from ...common.types import RePattern
from ...common.utils import bytes_
from ..metric_emisor import MetricEmisorMixin
from ...http.server.protocols import httpProtocolTypes


if TYPE_CHECKING: # pragma: no cover
from ...core.connection import TcpServerConnection, UpstreamConnectionPool


class HttpWebServerBasePlugin(DescriptorsHandlerMixin, ABC):
class HttpWebServerBasePlugin(
DescriptorsHandlerMixin,
MetricEmisorMixin,
ABC,
):
"""Web Server Plugin for routing of requests."""

def __init__(
Expand Down
49 changes: 49 additions & 0 deletions proxy/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
from typing import Set, Union, Optional
from datetime import datetime


class Metric:
def __init__(
self,
name: str,
description: Optional[str]=None,
tags:Set[str] = None,
):
self.timestamp = datetime.utcnow().timestamp()
self.name = name
self.description = description
self.tags = tags


class Counter(Metric):
def __init__(
self,
name:str,
increment: Union[int, float]=1,
description: Optional[str]=None,
tags:Set[str] = None,
):
super().__init__(name, description)
self.increment = increment


class Gauge(Metric):
def __init__(
self,
name:str,
value: Union[int, float]=1,
description: Optional[str]=None,
tags:Set[str] = None,
):
super().__init__(name, description)
self.value = value
73 changes: 73 additions & 0 deletions proxy/plugin/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.

.. spelling::

ws
onmessage
"""
import logging
from typing import Any, Dict, List, Tuple

from prometheus_client import Counter
from prometheus_client.registry import REGISTRY
from prometheus_client.exposition import generate_latest

from proxy import metrics
from proxy.core.event import EventSubscriber
from proxy.http.parser import HttpParser
from proxy.http.server import HttpWebServerBasePlugin, httpProtocolTypes
from proxy.http.responses import okResponse


logger = logging.getLogger(__name__)


class Prometheus(HttpWebServerBasePlugin):
"""
Expose metrics on prometheus format.
Requires to install prometheus client (`pip install prometheus-client`)
"""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.subscriber = EventSubscriber(
self.event_queue,
callback=self.process_metric_event,
)
self.subscriber.setup()
self.metrics: Dict[str, Any] = {}

def routes(self) -> List[Tuple[int, str]]:
return [
(httpProtocolTypes.HTTP, r'/metrics$'),
(httpProtocolTypes.HTTPS, r'/metrics$'),
]

def handle_request(self, request: HttpParser) -> None:
self.emit_metric_counter('prometheus_requests')
if request.path == b'/metrics':
self.client.queue(okResponse(generate_latest()))

def process_metric_event(self, event: Dict[str, Any]) -> None:
payload = event['event_payload']
if not isinstance(payload, metrics.Metric):
return
try:
logger.info(event)
if isinstance(payload, metrics.Counter):
name = f"counter_{payload.name}"

if name not in REGISTRY._names_to_collectors:
Counter(name, payload.description)
counter = REGISTRY._names_to_collectors.get(name)
counter.inc(payload.increment)

except:
logger.exception('Problems')
Loading