Skip to content

Commit 505f94b

Browse files
authored
fix: Add better logging for scheduler (#24)
1 parent 17fee27 commit 505f94b

File tree

4 files changed

+34
-12
lines changed

4 files changed

+34
-12
lines changed

cloudquery/sdk/scheduler/scheduler.py

+18-5
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
SyncMigrateTableMessage,
1111
)
1212
from cloudquery.sdk.schema import Resource
13-
from .table_resolver import TableResolver
13+
from .table_resolver import TableResolver, Client
1414

1515
QUEUE_PER_WORKER = 100
1616

@@ -68,7 +68,7 @@ def shutdown(self):
6868
pool.shutdown()
6969

7070
def resolve_resource(
71-
self, resolver: TableResolver, client, parent: Resource, item: Any
71+
self, resolver: TableResolver, client: Client, parent: Resource, item: Any
7272
) -> Resource:
7373
resource = Resource(resolver.table, parent, item)
7474
resolver.pre_resource_resolve(client, resource)
@@ -81,18 +81,24 @@ def resolve_table(
8181
self,
8282
resolver: TableResolver,
8383
depth: int,
84-
client,
84+
client: Client,
8585
parent_item: Resource,
8686
res: queue.Queue,
8787
):
8888
try:
8989
if depth == 0:
9090
self._logger.info(
91-
"table resolver started", table=resolver.table.name, depth=depth
91+
"table resolver started",
92+
client_id=client.id(),
93+
table=resolver.table.name,
94+
depth=depth,
9295
)
9396
else:
9497
self._logger.debug(
95-
"table resolver started", table=resolver.table.name, depth=depth
98+
"table resolver started",
99+
client_id=client.id(),
100+
table=resolver.table.name,
101+
depth=depth,
96102
)
97103
total_resources = 0
98104
for item in resolver.resolve(client, parent_item):
@@ -103,6 +109,7 @@ def resolve_table(
103109
except Exception as e:
104110
self._logger.error(
105111
"failed to resolve resource",
112+
client_id=client.id(),
106113
table=resolver.table.name,
107114
depth=depth,
108115
exc_info=True,
@@ -123,19 +130,25 @@ def resolve_table(
123130
if depth == 0:
124131
self._logger.info(
125132
"table resolver finished successfully",
133+
client_id=client.id(),
126134
table=resolver.table.name,
135+
resources=total_resources,
127136
depth=depth,
128137
)
129138
else:
130139
self._logger.debug(
131140
"table resolver finished successfully",
141+
client_id=client.id(),
132142
table=resolver.table.name,
143+
resources=total_resources,
133144
depth=depth,
134145
)
135146
except Exception as e:
136147
self._logger.error(
137148
"table resolver finished with error",
149+
client_id=client.id(),
138150
table=resolver.table.name,
151+
resources=total_resources,
139152
depth=depth,
140153
exc_info=True,
141154
)

cloudquery/sdk/scheduler/table_resolver.py

+13-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
1-
from typing import Any, Generator
1+
from cloudquery.sdk.schema.table import Table
2+
from cloudquery.sdk.schema import Resource
3+
from typing import Any, Generator, List
4+
5+
6+
class Client:
7+
def id(self) -> str:
8+
raise NotImplementedError()
9+
210

311
from cloudquery.sdk.schema import Resource
412
from cloudquery.sdk.schema.table import Table
@@ -17,22 +25,22 @@ def table(self) -> Table:
1725
def child_resolvers(self):
1826
return self._child_resolvers
1927

20-
def multiplex(self, client):
28+
def multiplex(self, client: Client) -> List[Client]:
2129
return [client]
2230

2331
def resolve(self, client, parent_resource) -> Generator[Any, None, None]:
2432
raise NotImplementedError()
2533

26-
def pre_resource_resolve(self, client, resource):
34+
def pre_resource_resolve(self, client: Client, resource):
2735
return
2836

29-
def resolve_column(self, client, resource: Resource, column_name: str):
37+
def resolve_column(self, client: Client, resource: Resource, column_name: str):
3038
if type(resource.item) is dict:
3139
if column_name in resource.item:
3240
resource.set(column_name, resource.item[column_name])
3341
else:
3442
if hasattr(resource.item, column_name):
3543
resource.set(column_name, getattr(resource.item, column_name))
3644

37-
def post_resource_resolve(self, client, resource):
45+
def post_resource_resolve(self, client: Client, resource):
3846
return

cloudquery/sdk/serve/plugin.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717

1818
def get_logger(args):
19-
log = structlog.get_logger()
19+
log = structlog.get_logger(processors=[structlog.processors.JSONRenderer()])
2020
return log
2121

2222

tests/scheduler/scheduler.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ def resolve(self, client, parent_resource) -> Generator[Any, None, None]:
3737

3838

3939
class TestClient:
40-
pass
40+
def id(self):
41+
return "test_client"
4142

4243

4344
def test_scheduler():

0 commit comments

Comments
 (0)