Skip to content

Commit 2824e06

Browse files
ZsailerSteve Silvester
authored and
GitHub Enterprise
committed
Adding Hubble metric collection (jupyter-server#190)
* add initial hubble port from @jialin-zhang4 * refactor hubble module * add initial tests * add one more test * some more updates * add client * more refactoring * don't validate cert in tests * capture other events * cleaner matcher * Update data_studio_jupyter_extensions/configurables/hubble.py * Update data_studio_jupyter_extensions/configurables/hubble.py * Update data_studio_jupyter_extensions/configurables/hubble.py Co-authored-by: Steve Silvester <[email protected]>
1 parent 809fdca commit 2824e06

File tree

14 files changed

+643
-43
lines changed

14 files changed

+643
-43
lines changed

conftest.py

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from jupyter_server.serverapp import ServerApp
33

44
from data_studio_jupyter_extensions import constants
5+
from data_studio_jupyter_extensions.configurables.hubble import HubbleAgentConfigurable
56
from data_studio_jupyter_extensions.configurables.notebook_service import (
67
NotebookServiceClient,
78
)
@@ -60,6 +61,7 @@ def clear_singletons():
6061
ServerApp.clear_instance()
6162
NotebookServiceClient.clear_instance()
6263
TelemetryBus.clear_instance()
64+
HubbleAgentConfigurable.clear_instance()
6365

6466

6567
@pytest.fixture

data_studio_jupyter_extensions/app.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
from . import constants
1616
from .auth.authenticator import JWTAuthenticator
17+
from .configurables.hubble import HubbleAgentConfigurable
18+
from .configurables.hubble import HubbleHandler
1719
from .configurables.notebook_service import NotebookServiceClient
1820
from .traits import IntFromEnv
1921
from .traits import UnicodeFromEnv
@@ -62,7 +64,7 @@ class DataStudioJupyterExtensions(ExtensionAppJinjaMixin, ExtensionApp):
6264

6365
# Add configurables to the list of classes displayed
6466
# in the --help.
65-
classes = [JWTAuthenticator]
67+
classes = [JWTAuthenticator, HubbleAgentConfigurable]
6668

6769
aliases = aliases
6870

@@ -227,6 +229,12 @@ def initialize_settings(self):
227229

228230
def initialize_configurables(self):
229231
self.authenticator = JWTAuthenticator(parent=self, log=self.log)
232+
# Add a Hubble agent to collect Hubble metrics.
233+
self.hubble_agent = HubbleAgentConfigurable.instance(parent=self, log=self.log)
234+
hubble_handler = HubbleHandler()
235+
hubble_handler.setLevel(level=logging.DEBUG)
236+
self.serverapp.log.addHandler(hubble_handler)
237+
# Instantiate a Notebook Service Client.
230238
self.nbservice_client = self.nbservice_client_class.instance(
231239
parent=self,
232240
log=self.log,

data_studio_jupyter_extensions/auth/authenticator.py

+21-4
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
import jwcrypto.jws as jws
22
import jwcrypto.jwt as jwt
3-
import requests
43
from jupyter_core.paths import jupyter_runtime_dir
4+
from jupyter_server.utils import run_sync
55
from jwcrypto.common import json_decode
66
from jwcrypto.jwk import JWK
7+
from tornado.httpclient import AsyncHTTPClient
8+
from tornado.httpclient import HTTPRequest
79
from traitlets import default
810
from traitlets import List
911
from traitlets.config import LoggingConfigurable
1012

13+
from data_studio_jupyter_extensions.configurables.hubble import hubble
1114
from data_studio_jupyter_extensions.traits import UnicodeFromEnv
1215
from data_studio_jupyter_extensions.utils import get_ssl_cert
1316

@@ -51,6 +54,19 @@ def _default_ssl_cert_file(self): # pragma: no cover
5154
def _expected_payload(self):
5255
return {"client_id": self.client_id}
5356

57+
@hubble("jwt_public_key")
async def fetch_public_keys(self):
    """Request the public-key document from ``self.key_url``.

    Sends a GET request through Tornado's ``AsyncHTTPClient`` and returns
    the resulting response object unchanged.
    """
    # NOTE(review): certificate validation is turned off whenever no
    # ``ssl_cert_file`` is configured. Confirm this is only relied on in
    # tests; otherwise the key document is fetched without TLS verification.
    verify = bool(self.ssl_cert_file)
    key_request = HTTPRequest(
        url=self.key_url,
        method="GET",
        ca_certs=self.ssl_cert_file,
        validate_cert=verify,
    )
    client = AsyncHTTPClient()
    return await client.fetch(key_request)
69+
5470
def get_public_keys(self):
5571
"""Fetch public keys for authentication of the JWT.
5672
If public keys are set by configuration (likely only used
@@ -59,9 +75,10 @@ def get_public_keys(self):
5975
"""
6076
if not self.public_keys:
6177
public_keys = []
62-
r = requests.get(self.key_url, verify=self.ssl_cert_file)
63-
if r.status_code == 200:
64-
keys = r.json().pop("keys", [])
78+
r = run_sync(self.fetch_public_keys())
79+
response = json_decode(r.body)
80+
if r.code == 200:
81+
keys = response.pop("keys", [])
6582
for key in keys:
6683
if key["alg"] == "RS256":
6784
public_keys.append(JWK(**key))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
import re
2+
import time
3+
from collections import namedtuple
4+
from logging import Handler
5+
from logging import LogRecord
6+
from typing import Union
7+
from urllib.parse import urljoin
8+
from urllib.parse import urlparse
9+
10+
from jupyter_server.base.handlers import path_regex
11+
from jupyter_server.serverapp import ServerApp
12+
from jupyter_server.services.config.handlers import section_name_regex
13+
from jupyter_server.services.contents.handlers import _checkpoint_id_regex
14+
from jupyter_server.services.kernels.handlers import _kernel_id_regex
15+
from jupyter_server.services.sessions.handlers import _session_id_regex
16+
from pyoneer.hubble import HubbleAgent
17+
from traitlets import Int
18+
from traitlets import Unicode
19+
from traitlets.config.configurable import SingletonConfigurable
20+
21+
from data_studio_jupyter_extensions.traits import BoolFromEnv
22+
from data_studio_jupyter_extensions.traits import UnicodeFromEnv
23+
24+
25+
# Named-group regex fragment used to recognise the kernelspec segment of
# ``/api/kernelspecs/<name>`` URLs.
kernelspec_id_regex = r"(?P<kernelspec_id>[\w\.\-%]+)"


# One metric extracted from a log record. ``status``, ``http_method`` and
# ``timer`` are None for metrics that are plain event counters.
HubbleMetric = namedtuple("HubbleMetric", "name status http_method timer")
29+
30+
31+
class _EndpointMap:
    """Pair a URL regex with the Hubble metric name reported for it."""

    def __init__(self, regex: str, metric_name: Union[str, None] = None):
        self.regex = regex
        self.metric_name = metric_name
        # Compiled once; match() runs for every logged request.
        self._pattern = re.compile(regex)

    def match(self, url: str) -> Union[str, None]:
        """Return the metric name for ``url``, or None if it does not match.

        When this map has no ``metric_name``, the URL itself (with query
        string and fragment removed) is returned as the metric name.
        """
        # Remove parameters before regex matching.
        url = urljoin(url, urlparse(url).path)
        # NOTE(review): patterns are searched anywhere in the path, not
        # anchored at its start, so e.g. a contents path that contains
        # "/login" is claimed by the "/login" entry. Confirm whether
        # anchoring is wanted before changing it.
        if not self._pattern.search(url):
            return None
        return self.metric_name or url


# Ordered endpoint table, built on first use by _endpoint_maps().
_ENDPOINT_MAPS = None


def _endpoint_maps():
    """Return the ordered endpoint table, building it only once."""
    global _ENDPOINT_MAPS
    if _ENDPOINT_MAPS is None:
        # Note that the order of these endpoint maps
        # MATTERS. Do not rearrange these without thorough
        # testing.
        _ENDPOINT_MAPS = (
            _EndpointMap(r"/login"),
            _EndpointMap(r"/api/config/%s" % section_name_regex),
            _EndpointMap(r"/api/spec.yaml"),
            _EndpointMap(r"/api/status"),
            _EndpointMap(
                r"/api/contents%s/checkpoints/%s" % (path_regex, _checkpoint_id_regex),
                r"/api/contents/*/checkpoints/*",
            ),
            _EndpointMap(
                r"/api/contents%s/checkpoints" % path_regex,
                r"/api/contents/*/checkpoints",
            ),
            _EndpointMap(
                r"/api/contents%s/trust" % path_regex,
                r"/api/contents/*/trust",
            ),
            _EndpointMap(r"/api/contents%s" % path_regex, r"/api/contents/*"),
            _EndpointMap(
                r"/api/kernelspecs/%s" % kernelspec_id_regex,
                r"/api/kernelspecs/*",
            ),
            _EndpointMap(r"/api/kernelspecs"),
            _EndpointMap(
                r"/api/kernels/%s/channels" % _kernel_id_regex,
                r"/api/kernels/*/channels",
            ),
            _EndpointMap(
                r"/api/kernels/%s/interrupt" % _kernel_id_regex,
                r"/api/kernels/*/interrupt",
            ),
            _EndpointMap(
                r"/api/kernels/%s/restart" % _kernel_id_regex,
                r"/api/kernels/*/restart",
            ),
            _EndpointMap(r"/api/kernels/%s" % _kernel_id_regex, r"/api/kernels/*"),
            _EndpointMap(r"/api/kernels"),
            _EndpointMap(r"/api/sessions/%s" % _session_id_regex, r"/api/sessions/*"),
            _EndpointMap(r"/api/sessions"),
            _EndpointMap(r"/notebooks%s" % path_regex, r"/notebooks"),
        )
    return _ENDPOINT_MAPS


def get_metric_name_from_endpoint(endpoint) -> str:
    """Return the Hubble metric name for a request endpoint.

    The endpoint is checked against the ordered endpoint table and the first
    matching entry wins. Entries without an explicit metric name report the
    endpoint path itself. ``"unknown"`` is returned when nothing matches.

    The table used to be rebuilt (helper class included) on every call; it
    is now built once, since this function runs for each log record.
    """
    # Loop through endpoints and find the metric name.
    for mapping in _endpoint_maps():
        metric_name = mapping.match(endpoint)
        if metric_name:
            return metric_name
    return "unknown"
112+
113+
114+
def _match_tornado_pattern(
    record: LogRecord,
) -> Union[HubbleMetric, None]:  # pragma: no cover
    """Match requests coming in from the current running Jupyter Tornado Server.

    Expects an access-log line shaped like
    ``<status> <METHOD> <base_url><endpoint> (<ip>) <time>ms`` and returns a
    ``HubbleMetric`` built from it, or None when the record is not such a
    line.
    """
    string = record.msg
    # Loggers accept arbitrary objects as messages (e.g. an exception
    # instance); those can never be access-log lines and would make
    # re.search raise TypeError.
    if not isinstance(string, str):
        return None
    # If the log message is a formatted string, apply the formatting.
    if string == "%d %s %.2fms":
        string = string % record.args

    # Get the base url from the current running application.
    base_url = "/"
    if ServerApp._instance:
        base_url = ServerApp.instance().base_url

    tornado_regex = "".join(
        (
            r"(?P<status>[1-5][0-9][0-9])",  # HTTP Status number
            r" ",  # Space
            r"(?P<http_method>GET|POST|PUT|DELETE)",  # HTTP Methods to capture
            r" ",  # Space
            # Start with the base URL prefix. Escaped so that characters
            # such as "." or "+" in the prefix are matched literally.
            re.escape(base_url),
            r"(?P<endpoint>[\w+-\-\.%\/].*)",  # get endpoint
            r" ",  # Space
            # Dotted numeric host, optionally parenthesised. The dots are
            # escaped and each group may hold several digits, so hosts
            # such as (192.168.10.25) are recognised.
            r"(?P<host>\(?[0-9]+(?:\.[0-9]+)+\)?)",
            r" ",
            # Request time; always parseable by float().
            r"(?P<timer>[0-9]+(?:\.[0-9]+)?)ms",
        )
    )
    match = re.search(tornado_regex, string)
    if not match:
        return None
    parts = match.groupdict()
    # The endpoint loses the leading slash. Annoyingly,
    # we put it back in here.
    endpoint = "/" + parts["endpoint"]
    metric_name = get_metric_name_from_endpoint(endpoint)
    return HubbleMetric(
        name=metric_name,
        status=int(parts["status"]),
        http_method=parts["http_method"],
        timer=float(parts["timer"]),
    )
154+
155+
156+
def _match_logger_pattern(record: LogRecord) -> Union[HubbleMetric, None]:
    """Match 'interesting' log messages.

    Returns a counter-style ``HubbleMetric`` (no status, method or timer)
    for recognised messages, and None for everything else.
    """
    message = record.msg
    # Loggers accept arbitrary objects as messages (e.g. an exception
    # instance); only strings can be prefix-matched below.
    if not isinstance(message, str):
        return None
    # Capture kernel started events: https://github.com/jupyter-server/jupyter_server/blob/ac93dd8f60de54db99167016f0396959826b8ed9/jupyter_server/services/kernels/kernelmanager.py#L214
    if message.startswith("Kernel started:"):
        # NOTE(review): the second whitespace-separated token of this
        # message is "started:", so the metric below is named
        # "zmp_kernel_started:_connect_failure". That looks unintended for
        # a "kernel started" event; confirm the desired metric name.
        channel = message.split(" ")[1]
        return HubbleMetric(
            name="zmp_kernel_%s_connect_failure" % channel,
            status=None,
            http_method=None,
            timer=None,
        )
    # Capture kernel_info timeout events: https://github.com/jupyter-server/jupyter_server/blob/ac93dd8f60de54db99167016f0396959826b8ed9/jupyter_server/services/kernels/kernelmanager.py#L404
    if message.startswith("Timeout waiting for kernel_info_reply:"):
        return HubbleMetric(
            name="kernel_info_timeout",
            status=None,
            http_method=None,
            timer=None,
        )
    return None
176+
177+
178+
_hubble_matchers = [_match_tornado_pattern, _match_logger_pattern]


def log_record_to_hubble_metric(record):
    """Return the first metric any registered matcher extracts from ``record``.

    Matchers are tried in the order they appear in ``_hubble_matchers`` and
    evaluation stops at the first truthy result. None is returned when no
    matcher recognises the record.
    """
    # The generator is lazy, so later matchers are not called once an
    # earlier one has produced a metric.
    results = (match_record(record) for match_record in _hubble_matchers)
    return next((found for found in results if found), None)
186+
187+
188+
class HubbleAgentConfigurable(SingletonConfigurable):
    """Singleton that owns a ``HubbleAgent`` built from traitlets settings.

    The traits below are passed to ``HubbleAgent`` when the singleton is
    created. The agent is only enabled when ``enabled`` is true.
    """

    # Settings handed to HubbleAgent. The ``*FromEnv`` traits presumably
    # take their value from the named environment variable when it is set;
    # verify against ``data_studio_jupyter_extensions.traits``.
    app_name = UnicodeFromEnv(
        name="HUBBLE_APP_NAME",
        default_value="datastudio-notebookserver-dev",
    ).tag(config=True)
    data_center = UnicodeFromEnv(
        name="K8S_DATACENTER",
        default_value="local",
    ).tag(config=True)
    partition = UnicodeFromEnv(
        name="K8S_ENVIRONMENT",
        default_value="dev",
    ).tag(config=True)
    host = UnicodeFromEnv(
        name="K8S_APPLICATION",
        default_value="local",
    ).tag(config=True)
    instance_type = UnicodeFromEnv(
        name="K8S_INSTANCE",
        default_value="local",
    ).tag(config=True)
    base_url = UnicodeFromEnv(
        name="HUBBLE_URL",
        default_value="https://hubble-publish-pie-prod.apple.com",
    ).tag(config=True)
    port = Unicode(default_value="443").tag(config=True)
    hubble_version = Unicode(default_value="2.1").tag(config=True)
    interval = Int(default_value=60).tag(config=True)
    # NOTE(review): unlike the traits above, this one is not tagged
    # ``config=True``; confirm that is deliberate.
    enabled = BoolFromEnv(name="HUBBLE_ENABLED", default_value=False)

    def __init__(self, *args, **kwargs):  # pragma: no cover
        """Create the underlying agent and enable it if ``enabled`` is set."""
        super().__init__(*args, **kwargs)
        agent_settings = dict(
            app_name=self.app_name,
            data_center=self.data_center,
            partition=self.partition,
            host=self.host,
            instance=self.instance_type,
            base_url=self.base_url,
            port=self.port,
            hubble_version=self.hubble_version,
            interval=self.interval,
        )
        self.agent = HubbleAgent(**agent_settings)
        if self.enabled:
            self.agent.enable()

    def publish_hubble_metrics(
        self, name: str, status: int, http_method: str, latency_in_ms: float
    ):
        """Forward one metric to the agent.

        With a truthy ``status`` and ``http_method`` the metric is treated
        as an HTTP request: status and latency gauges plus a count are
        captured under ``<http_method>_<name>_*``. Otherwise only a count
        under ``name`` is captured. Nothing is captured when ``name`` is
        falsy.
        """
        if not name:
            return
        if not (status and http_method):
            # Plain event: just count it.
            self.agent.capture_count(name, 1)
            return
        prefix = "%s_%s" % (http_method, name)
        self.agent.capture_gauge(prefix + "_status", status)
        self.agent.capture_gauge(prefix + "_timer", latency_in_ms)
        self.agent.capture_count(prefix + "_count", 1)
239+
240+
241+
class HubbleHandler(Handler):
    """Logging Handler to capture raw log messages that
    can be converted to Hubble metrics.
    """

    @property
    def hubble_agent(self) -> "Union[HubbleAgentConfigurable, None]":  # pragma: no cover
        """The Hubble agent singleton, or None if it has not been created."""
        # Check ``_instance`` first so that looking the agent up never
        # creates it as a side effect.
        if HubbleAgentConfigurable._instance:
            return HubbleAgentConfigurable.instance()
        return None

    def emit(self, record):
        """Convert ``record`` to a Hubble metric and publish it, if possible.

        Records that no matcher recognises are ignored. Any exception raised
        while extracting or publishing the metric is passed to
        ``handleError`` rather than propagating into the code that issued
        the log call.
        """
        try:
            # Get an instance of the Hubble Agent.
            agent = self.hubble_agent
            if not agent:
                return
            metric = log_record_to_hubble_metric(record)
            if metric:
                agent.publish_hubble_metrics(*metric)
        except Exception:
            # Metric collection is best-effort; a failure here must not
            # break the application's logging call.
            self.handleError(record)

    def flush(self):  # pragma: no cover
        """Nothing is buffered by this handler, so there is nothing to flush."""
        pass
261+
262+
263+
def hubble(metric_name):  # pragma: no cover
    """Decorator for wrapping async methods that emit Hubble metrics.

    The wrapped coroutine method is awaited and timed. Its return value must
    expose ``code`` and ``request.method`` (assumed to be a Tornado
    ``HTTPResponse`` or similar); those, together with the elapsed time in
    milliseconds, are published under ``metric_name``. The response is
    returned unchanged. If the wrapped method raises, the exception
    propagates and no metric is published.

    Usage:

    ```python
    class A:

        @hubble("something_cool")
        async def method_that_emits_something(self, *args, **kwargs):
            ...
    ```
    """
    import functools

    def method_wrapper(method):
        # Needs to wrap a method, so adding one more layer
        # of depth to handle self.
        @functools.wraps(method)  # keep the method's name and docstring
        async def method_caller(self, *args, **kwargs):
            # A monotonic clock, so a wall-clock adjustment during the
            # call cannot distort the measured latency.
            start = time.perf_counter()
            response = await method(self, *args, **kwargs)
            elapsed_ms = (time.perf_counter() - start) * 1000
            agent = HubbleAgentConfigurable.instance()
            hubble_metric = HubbleMetric(
                name=metric_name,
                status=response.code,
                http_method=response.request.method,
                timer=elapsed_ms,
            )
            agent.publish_hubble_metrics(*hubble_metric)
            return response

        return method_caller

    return method_wrapper

0 commit comments

Comments
 (0)