Better logging for backpressure #81648

Merged
merged 15 commits on Dec 10, 2024
31 changes: 30 additions & 1 deletion src/sentry/processing/backpressure/memory.py
@@ -13,6 +13,8 @@ class ServiceMemory:
used: int
available: int
percentage: float
host: str | None = None
port: int | None = None

def __init__(self, name: str, used: int, available: int):
self.name = name
@@ -21,6 +23,12 @@ def __init__(self, name: str, used: int, available: int):
self.percentage = used / available


@dataclass
class NodeInfo:
host: str | None
port: int | None


def query_rabbitmq_memory_usage(host: str) -> ServiceMemory:
"""Returns the currently used memory and the memory limit of a
RabbitMQ host.
@@ -51,6 +59,23 @@ def get_memory_usage(node_id: str, info: Mapping[str, Any]) -> ServiceMemory:
return ServiceMemory(node_id, memory_used, memory_available)


def get_host_port_info(node_id: str, cluster: Cluster) -> NodeInfo:
"""
Extract the host and port of the redis node in the cluster.
"""
try:
if isinstance(cluster, RedisCluster):
# RedisCluster node mapping
node = cluster.connection_pool.nodes.nodes.get(node_id)
return NodeInfo(node["host"], node["port"])
else:
# rb.Cluster node mapping
node = cluster.hosts[node_id]
return NodeInfo(node.host, node.port)
except Exception:
return NodeInfo(None, None)


def iter_cluster_memory_usage(cluster: Cluster) -> Generator[ServiceMemory, None, None]:
"""
A generator that yields redis `INFO` results for each of the nodes in the `cluster`.
@@ -65,4 +90,8 @@ def iter_cluster_memory_usage(cluster: Cluster) -> Generator[ServiceMemory, None, None]:
cluster_info = promise.value

for node_id, info in cluster_info.items():
yield get_memory_usage(node_id, info)
node_info = get_host_port_info(node_id, cluster)
memory_usage = get_memory_usage(node_id, info)
memory_usage.host = node_info.host
memory_usage.port = node_info.port
yield memory_usage
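
For context, a minimal consumption sketch of the enriched values (not part of this PR; `cluster` is assumed to be an already-configured rb.Cluster or RedisCluster instance obtained elsewhere):

from sentry.processing.backpressure.memory import iter_cluster_memory_usage

for memory in iter_cluster_memory_usage(cluster):
    # Each ServiceMemory now carries the node's host and port alongside its usage numbers.
    print(f"{memory.name} @ {memory.host}:{memory.port} -> {memory.percentage:.1%} used")
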
10 changes: 10 additions & 0 deletions src/sentry/processing/backpressure/monitor.py
@@ -85,10 +85,12 @@ def check_service_health(services: Mapping[str, Service]) -> MutableMapping[str,
reasons = []

logger.info("Checking service `%s` (configured high watermark: %s):", name, high_watermark)
memory = None
try:
for memory in check_service_memory(service):
if memory.percentage >= high_watermark:
reasons.append(memory)
logger.info("Checking node: %s:%s", memory.host, memory.port)
logger.info(
" name: %s, used: %s, available: %s, percentage: %s",
memory.name,
@@ -101,6 +103,14 @@ def check_service_health(services: Mapping[str, Service]) -> MutableMapping[str,
scope.set_tag("service", name)
sentry_sdk.capture_exception(e)
unhealthy_services[name] = e
host = memory.host if memory else "unknown"
port = memory.port if memory else "unknown"
logger.exception(
"Error while processing node %s:%s for service %s",
host,
port,
service,
)
else:
unhealthy_services[name] = reasons

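The `memory = None` initialization above supports the new exception logging: if `check_service_memory(service)` raises before yielding anything, `memory` would otherwise be unbound and the handler itself would fail. A standalone sketch of the same pattern (illustrative names only, not code from this PR):

def log_nodes_sketch(node_memories):
    memory = None  # keep the name bound even if the iterator raises immediately
    try:
        for memory in node_memories:
            print(f"Checking node: {memory.host}:{memory.port}")
    except Exception:
        # Fall back to "unknown" when the failure happened before the first node was seen.
        host = memory.host if memory else "unknown"
        port = memory.port if memory else "unknown"
        print(f"Error while processing node {host}:{port}")
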
24 changes: 24 additions & 0 deletions tests/sentry/processing/backpressure/test_redis.py
@@ -21,6 +21,8 @@ def test_rb_cluster_returns_some_usage() -> None:
assert memory.used > 0
assert memory.available > 0
assert 0.0 < memory.percentage < 1.0
assert memory.host == "localhost"
assert memory.port == 6379


@use_redis_cluster()
@@ -35,6 +37,8 @@ def test_redis_cluster_cluster_returns_some_usage() -> None:
assert memory.used > 0
assert memory.available > 0
assert 0.0 < memory.percentage < 1.0
assert memory.host == "127.0.0.1"
assert memory.port in {7000, 7001, 7002, 7003, 7004, 7005}


@use_redis_cluster(high_watermark=100)
@@ -47,6 +51,14 @@ def test_redis_health():
assert isinstance(redis_services, list)
assert len(redis_services) == 0

usage = list(iter_cluster_memory_usage(services["redis"].cluster))
for memory in usage:
assert memory.used >= 0
assert memory.available > 0
assert 0.0 < memory.percentage <= 1.0
assert memory.host == "127.0.0.1"
assert memory.port in {7000, 7001, 7002, 7003, 7004, 7005}


@use_redis_cluster(high_watermark=0)
def test_redis_unhealthy_state():
@@ -57,3 +69,15 @@ def test_redis_unhealthy_state():
redis_services = unhealthy_services.get("redis")
assert isinstance(redis_services, list)
assert len(redis_services) == 6

usage = list(iter_cluster_memory_usage(services["redis"].cluster))
for memory in usage:
assert memory.used >= 0
assert memory.available > 0
assert 0.0 < memory.percentage <= 1.0
assert memory.host == "127.0.0.1"
assert memory.port in {7000, 7001, 7002, 7003, 7004, 7005}

for memory in redis_services:
assert memory.host == "127.0.0.1"
assert memory.port in {7000, 7001, 7002, 7003, 7004, 7005}
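
The host and port assertions reflect the nodes provisioned by the `use_redis_cluster` fixture (127.0.0.1:7000-7005). Locally, the file can typically be run on its own, assuming a Sentry development environment with the redis cluster test services available:

pytest tests/sentry/processing/backpressure/test_redis.py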