Skip to content

Commit 8d5fa02

Browse files
committed
Decorate SchedulerState methods with @ccall
Should allow faster C API calls to these methods in addition to their existing Python APIs. Also disable exception checking when Python objects are `return`ed since these already can and are checked for an exception. Note that `@ccall` is not permitted on functions taking `**kwargs` so those have been skipped.
1 parent 902b0e0 commit 8d5fa02

File tree

1 file changed

+46
-0
lines changed

1 file changed

+46
-0
lines changed

distributed/scheduler.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1686,6 +1686,8 @@ def validate(self, v: bint):
16861686
def workers(self):
16871687
return self._workers
16881688

1689+
@ccall
1690+
@exceptval(check=False)
16891691
def _remove_from_processing(self, ts: TaskState) -> str:
16901692
"""
16911693
Remove *ts* from the set of processing tasks.
@@ -1771,6 +1773,8 @@ def _add_to_memory(
17711773
recommendations=recommendations,
17721774
)
17731775

1776+
@ccall
1777+
@exceptval(check=False)
17741778
def transition_released_waiting(self, key) -> tuple:
17751779
try:
17761780
ts: TaskState = self._tasks[key]
@@ -1826,6 +1830,8 @@ def transition_released_waiting(self, key) -> tuple:
18261830
pdb.set_trace()
18271831
raise
18281832

1833+
@ccall
1834+
@exceptval(check=False)
18291835
def transition_no_worker_waiting(self, key) -> tuple:
18301836
try:
18311837
ts: TaskState = self._tasks[key]
@@ -1873,6 +1879,8 @@ def transition_no_worker_waiting(self, key) -> tuple:
18731879
pdb.set_trace()
18741880
raise
18751881

1882+
@ccall
1883+
@exceptval(check=False)
18761884
def decide_worker(self, ts: TaskState) -> WorkerState:
18771885
"""
18781886
Decide on a worker for task *ts*. Return a WorkerState.
@@ -1916,6 +1924,8 @@ def decide_worker(self, ts: TaskState) -> WorkerState:
19161924

19171925
return ws
19181926

1927+
@ccall
1928+
@exceptval(check=False)
19191929
def transition_waiting_processing(self, key) -> tuple:
19201930
try:
19211931
ts: TaskState = self._tasks[key]
@@ -2121,6 +2131,8 @@ def transition_processing_memory(
21212131
pdb.set_trace()
21222132
raise
21232133

2134+
@ccall
2135+
@exceptval(check=False)
21242136
def transition_memory_released(self, key, safe: bint = False) -> tuple:
21252137
ws: WorkerState
21262138
try:
@@ -2194,6 +2206,8 @@ def transition_memory_released(self, key, safe: bint = False) -> tuple:
21942206
pdb.set_trace()
21952207
raise
21962208

2209+
@ccall
2210+
@exceptval(check=False)
21972211
def transition_released_erred(self, key) -> tuple:
21982212
try:
21992213
ts: TaskState = self._tasks[key]
@@ -2240,6 +2254,8 @@ def transition_released_erred(self, key) -> tuple:
22402254
pdb.set_trace()
22412255
raise
22422256

2257+
@ccall
2258+
@exceptval(check=False)
22432259
def transition_erred_released(self, key) -> tuple:
22442260
try:
22452261
ts: TaskState = self._tasks[key]
@@ -2281,6 +2297,8 @@ def transition_erred_released(self, key) -> tuple:
22812297
pdb.set_trace()
22822298
raise
22832299

2300+
@ccall
2301+
@exceptval(check=False)
22842302
def transition_waiting_released(self, key) -> tuple:
22852303
try:
22862304
ts: TaskState = self._tasks[key]
@@ -2320,6 +2338,8 @@ def transition_waiting_released(self, key) -> tuple:
23202338
pdb.set_trace()
23212339
raise
23222340

2341+
@ccall
2342+
@exceptval(check=False)
23232343
def transition_processing_released(self, key) -> tuple:
23242344
try:
23252345
ts: TaskState = self._tasks[key]
@@ -2446,6 +2466,8 @@ def transition_processing_erred(
24462466
pdb.set_trace()
24472467
raise
24482468

2469+
@ccall
2470+
@exceptval(check=False)
24492471
def transition_no_worker_released(self, key) -> tuple:
24502472
try:
24512473
ts: TaskState = self._tasks[key]
@@ -2475,6 +2497,8 @@ def transition_no_worker_released(self, key) -> tuple:
24752497
pdb.set_trace()
24762498
raise
24772499

2500+
@ccall
2501+
@exceptval(check=False)
24782502
def _propagate_forgotten(
24792503
self, ts: TaskState, recommendations: dict, worker_msgs: dict
24802504
):
@@ -2514,6 +2538,8 @@ def _propagate_forgotten(
25142538
worker_msgs[w] = {"op": "delete-data", "keys": [key], "report": False}
25152539
ts._who_has.clear()
25162540

2541+
@ccall
2542+
@exceptval(check=False)
25172543
def transition_memory_forgotten(self, key) -> tuple:
25182544
ws: WorkerState
25192545
try:
@@ -2557,6 +2583,8 @@ def transition_memory_forgotten(self, key) -> tuple:
25572583
pdb.set_trace()
25582584
raise
25592585

2586+
@ccall
2587+
@exceptval(check=False)
25602588
def transition_released_forgotten(self, key) -> tuple:
25612589
try:
25622590
ts: TaskState = self._tasks[key]
@@ -2595,6 +2623,8 @@ def transition_released_forgotten(self, key) -> tuple:
25952623
pdb.set_trace()
25962624
raise
25972625

2626+
@ccall
2627+
@exceptval(check=False)
25982628
def check_idle_saturated(self, ws: WorkerState, occ: double = -1.0):
25992629
"""Update the status of the idle and saturated state
26002630
@@ -2636,6 +2666,8 @@ def check_idle_saturated(self, ws: WorkerState, occ: double = -1.0):
26362666

26372667
saturated.discard(ws)
26382668

2669+
@ccall
2670+
@exceptval(check=False)
26392671
def _client_releases_keys(
26402672
self, keys=None, client=None, recommendations: dict = None
26412673
):
@@ -2660,6 +2692,8 @@ def _client_releases_keys(
26602692
elif ts._state != "erred" and not ts._waiters:
26612693
recommendations[ts._key] = "released"
26622694

2695+
@ccall
2696+
@exceptval(check=False)
26632697
def _task_to_msg(self, ts: TaskState, duration=None) -> dict:
26642698
""" Convert a single computational task to a message """
26652699
ws: WorkerState
@@ -2697,6 +2731,8 @@ def _task_to_msg(self, ts: TaskState, duration=None) -> dict:
26972731

26982732
return msg
26992733

2734+
@ccall
2735+
@exceptval(check=False)
27002736
def _task_to_report_msg(self, ts: TaskState) -> dict:
27012737
if ts is None:
27022738
return {"op": "cancelled-key", "key": ts._key}
@@ -2715,6 +2751,8 @@ def _task_to_report_msg(self, ts: TaskState) -> dict:
27152751
else:
27162752
return None
27172753

2754+
@ccall
2755+
@exceptval(check=False)
27182756
def _task_to_client_msgs(self, ts: TaskState) -> dict:
27192757
cs: ClientState
27202758
clients: dict = self._clients
@@ -2734,6 +2772,8 @@ def _task_to_client_msgs(self, ts: TaskState) -> dict:
27342772

27352773
return client_msgs
27362774

2775+
@ccall
2776+
@exceptval(check=False)
27372777
def _reevaluate_occupancy_worker(self, ws: WorkerState):
27382778
""" See reevaluate_occupancy """
27392779
old = ws._occupancy
@@ -2758,6 +2798,7 @@ def _reevaluate_occupancy_worker(self, ws: WorkerState):
27582798
steal.remove_key_from_stealable(ts)
27592799
steal.put_key_in_stealable(ts)
27602800

2801+
@ccall
27612802
def get_comm_cost(self, ts: TaskState, ws: WorkerState) -> double:
27622803
"""
27632804
Get the estimated communication cost (in s.) to compute the task
@@ -2771,6 +2812,7 @@ def get_comm_cost(self, ts: TaskState, ws: WorkerState) -> double:
27712812
nbytes += dts._nbytes
27722813
return nbytes / bandwidth
27732814

2815+
@ccall
27742816
def get_task_duration(self, ts: TaskState, default: double = -1) -> double:
27752817
"""
27762818
Get the estimated computation cost of the given task
@@ -2787,6 +2829,8 @@ def get_task_duration(self, ts: TaskState, default: double = -1) -> double:
27872829

27882830
return duration
27892831

2832+
@ccall
2833+
@exceptval(check=False)
27902834
def valid_workers(self, ts: TaskState) -> set:
27912835
"""Return set of currently valid workers for key
27922836
@@ -2837,6 +2881,8 @@ def valid_workers(self, ts: TaskState) -> set:
28372881

28382882
return s
28392883

2884+
@ccall
2885+
@exceptval(check=False)
28402886
def worker_objective(self, ts: TaskState, ws: WorkerState) -> tuple:
28412887
"""
28422888
Objective function to determine which worker should get the task

0 commit comments

Comments
 (0)