Skip to content

Commit 17fee27

Browse files
authored
fix: Fix race in scheduler (#25)
This fixes a data race in the scheduler that caused some child resources to be missed. The implementation is now correct, but it could be made more efficient. The key point is that we have to submit the sentinel indicating the start of a resolver before submitting the job; otherwise, the job might send its "finished" signal before the "start" signal is sent, causing the started and finished counts to be equal before all resources have actually been processed.
1 parent a53bb0e commit 17fee27

File tree

1 file changed

+11
-21
lines changed

1 file changed

+11
-21
lines changed

cloudquery/sdk/scheduler/scheduler.py

+11-21
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,8 @@ def __init__(self, maxsize, *args, **kwargs):
2222

2323

2424
class TableResolverStarted:
25-
def __init__(self, count=1) -> None:
26-
self._count = count
27-
28-
@property
29-
def count(self):
30-
return self._count
25+
def __init__(self) -> None:
26+
pass
3127

3228

3329
class TableResolverFinished:
@@ -89,7 +85,6 @@ def resolve_table(
8985
parent_item: Resource,
9086
res: queue.Queue,
9187
):
92-
table_resolvers_started = 0
9388
try:
9489
if depth == 0:
9590
self._logger.info(
@@ -115,6 +110,7 @@ def resolve_table(
115110
continue
116111
res.put(SyncInsertMessage(resource.to_arrow_record()))
117112
for child_resolvers in resolver.child_resolvers:
113+
res.put(TableResolverStarted())
118114
self._pools[depth + 1].submit(
119115
self.resolve_table,
120116
child_resolvers,
@@ -123,7 +119,6 @@ def resolve_table(
123119
resource,
124120
res,
125121
)
126-
table_resolvers_started += 1
127122
total_resources += 1
128123
if depth == 0:
129124
self._logger.info(
@@ -145,7 +140,6 @@ def resolve_table(
145140
exc_info=True,
146141
)
147142
finally:
148-
res.put(TableResolverStarted(count=table_resolvers_started))
149143
res.put(TableResolverFinished())
150144

151145
def _sync(
@@ -155,17 +149,13 @@ def _sync(
155149
res: queue.Queue,
156150
deterministic_cq_id=False,
157151
):
158-
total_table_resolvers = 0
159-
try:
160-
for resolver in resolvers:
161-
clients = resolver.multiplex(client)
162-
for client in clients:
163-
self._pools[0].submit(
164-
self.resolve_table, resolver, 0, client, None, res
165-
)
166-
total_table_resolvers += 1
167-
finally:
168-
res.put(TableResolverStarted(total_table_resolvers))
152+
for resolver in resolvers:
153+
clients = resolver.multiplex(client)
154+
for client in clients:
155+
res.put(TableResolverStarted())
156+
self._pools[0].submit(
157+
self.resolve_table, resolver, 0, client, None, res
158+
)
169159

170160
def sync(
171161
self, client, resolvers: List[TableResolver], deterministic_cq_id=False
@@ -180,7 +170,7 @@ def sync(
180170
while True:
181171
message = res.get()
182172
if type(message) == TableResolverStarted:
183-
total_table_resolvers += message.count
173+
total_table_resolvers += 1
184174
if total_table_resolvers == finished_table_resolvers:
185175
break
186176
continue

0 commit comments

Comments
 (0)