Skip to content

Commit 0e57077

Browse files
committed
Change stream results to indicate cancellation
Resolves #444
1 parent c261e8b commit 0e57077

File tree

3 files changed

+190
-131
lines changed

3 files changed

+190
-131
lines changed

design/mvp/CanonicalABI.md

+44-30
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ values *into* the buffer's memory. Buffers are represented by the following 3
355355
abstract Python classes:
356356
```python
357357
class Buffer:
358-
MAX_LENGTH = 2**30 - 1
358+
MAX_LENGTH = 2**28 - 1
359359
t: ValType
360360
remain: Callable[[], int]
361361

@@ -1056,7 +1056,7 @@ stream.)
10561056
```python
10571057
RevokeBuffer = Callable[[], None]
10581058
OnPartialCopy = Callable[[RevokeBuffer], None]
1059-
OnCopyDone = Callable[[], None]
1059+
OnCopyDone = Callable[[Literal['completed','cancelled']], None]
10601060

10611061
class ReadableStream:
10621062
t: ValType
@@ -1069,7 +1069,8 @@ The key operation is `read` which works as follows:
10691069
* `read` is non-blocking, returning `'blocked'` if it would have blocked.
10701070
* The `On*` callbacks are only called *after* `read` returns `'blocked'`.
10711071
* `OnCopyDone` is called to indicate that the caller has regained ownership of
1072-
the buffer.
1072+
the buffer and whether this was due to the read/write completing or
1073+
being cancelled.
10731074
* `OnPartialCopy` is called to indicate a partial write has been made to the
10741075
buffer, but there may be further writes made in the future, so the caller
10751076
has *not* regained ownership of the buffer.
@@ -1122,21 +1123,21 @@ If set, the `pending_*` fields record the `Buffer` and `On*` callbacks of a
11221123
`read`. Closing the readable or writable end of a stream or cancelling a `read`
11231124
or `write` notifies any pending `read` or `write` via its `OnCopyDone`
11241125
callback, which lets the other side know that ownership of the `Buffer` has
1125-
been returned:
1126+
been returned and why:
11261127
```python
1127-
def reset_and_notify_pending(self):
1128+
def reset_and_notify_pending(self, why):
11281129
pending_on_copy_done = self.pending_on_copy_done
11291130
self.reset_pending()
1130-
pending_on_copy_done()
1131+
pending_on_copy_done(why)
11311132

11321133
def cancel(self):
1133-
self.reset_and_notify_pending()
1134+
self.reset_and_notify_pending('cancelled')
11341135

11351136
def close(self):
11361137
if not self.closed_:
11371138
self.closed_ = True
11381139
if self.pending_buffer:
1139-
self.reset_and_notify_pending()
1140+
self.reset_and_notify_pending('completed')
11401141

11411142
def closed(self):
11421143
return self.closed_
@@ -1180,7 +1181,7 @@ but in the opposite direction. Both are implemented by a single underlying
11801181
if self.pending_buffer.remain() > 0:
11811182
self.pending_on_partial_copy(self.reset_pending)
11821183
else:
1183-
self.reset_and_notify_pending()
1184+
self.reset_and_notify_pending('completed')
11841185
return 'done'
11851186
```
11861187

@@ -1241,10 +1242,10 @@ and closing once a value has been read-from or written-to the given buffer:
12411242
class FutureEnd(StreamEnd):
12421243
def close_after_copy(self, copy_op, buffer, on_copy_done):
12431244
assert(buffer.remain() == 1)
1244-
def on_copy_done_wrapper():
1245+
def on_copy_done_wrapper(why):
12451246
if buffer.remain() == 0:
12461247
self.stream.close()
1247-
on_copy_done()
1248+
on_copy_done(why)
12481249
ret = copy_op(buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper)
12491250
if ret == 'done' and buffer.remain() == 0:
12501251
self.stream.close()
@@ -3552,7 +3553,8 @@ multiple partial copies before having to context-switch back.
35523553
```python
35533554
if opts.sync:
35543555
final_revoke_buffer = None
3555-
def on_partial_copy(revoke_buffer):
3556+
def on_partial_copy(revoke_buffer, why = 'completed'):
3557+
assert(why == 'completed')
35563558
nonlocal final_revoke_buffer
35573559
final_revoke_buffer = revoke_buffer
35583560
if not async_copy.done():
@@ -3563,6 +3565,8 @@ multiple partial copies before having to context-switch back.
35633565
await task.wait_on(async_copy, sync = True)
35643566
final_revoke_buffer()
35653567
```
3568+
(When non-cooperative threads are added, the assertion that synchronous copies
3569+
can only be `completed`, and not `cancelled`, will no longer hold.)
35663570

35673571
In the asynchronous case, the `on_*` callbacks set a pending event on the
35683572
`Waitable` which will be delivered to core wasm when core wasm calls
@@ -3573,36 +3577,46 @@ allowing multiple partial copies to complete in the interim, reducing overall
35733577
context-switching overhead.
35743578
```python
35753579
else:
3576-
def copy_event(revoke_buffer):
3580+
def copy_event(why, revoke_buffer):
35773581
revoke_buffer()
35783582
e.copying = False
3579-
return (event_code, i, pack_copy_result(task, buffer, e))
3583+
return (event_code, i, pack_copy_result(task, e, buffer, why))
35803584
def on_partial_copy(revoke_buffer):
3581-
e.set_event(partial(copy_event, revoke_buffer))
3582-
def on_copy_done():
3583-
e.set_event(partial(copy_event, revoke_buffer = lambda:()))
3585+
e.set_event(partial(copy_event, 'completed', revoke_buffer))
3586+
def on_copy_done(why):
3587+
e.set_event(partial(copy_event, why, revoke_buffer = lambda:()))
35843588
if e.copy(buffer, on_partial_copy, on_copy_done) != 'done':
35853589
e.copying = True
35863590
return [BLOCKED]
3587-
return [pack_copy_result(task, buffer, e)]
3591+
return [pack_copy_result(task, e, buffer, 'completed')]
35883592
```
35893593
However the copy completes, the results are reported to the caller via
35903594
`pack_copy_result`:
35913595
```python
3592-
BLOCKED = 0xffff_ffff
3593-
CLOSED = 0x8000_0000
3596+
BLOCKED = 0xffff_ffff
3597+
COMPLETED = 0x0
3598+
CLOSED = 0x1
3599+
CANCELLED = 0x2
35943600

3595-
def pack_copy_result(task, buffer, e):
3596-
if buffer.progress or not e.stream.closed():
3597-
assert(buffer.progress <= Buffer.MAX_LENGTH < BLOCKED)
3598-
assert(not (buffer.progress & CLOSED))
3599-
return buffer.progress
3601+
def pack_copy_result(task, e, buffer, why):
3602+
if e.stream.closed():
3603+
result = CLOSED
3604+
elif why == 'cancelled':
3605+
result = CANCELLED
36003606
else:
3601-
return CLOSED
3602-
```
3603-
The order of tests here indicates that, if some progress was made and then the
3604-
stream was closed, only the progress is reported and the `CLOSED` status is
3605-
left to be discovered next time.
3607+
assert(why == 'completed')
3608+
assert(not isinstance(e, FutureEnd))
3609+
result = COMPLETED
3610+
assert(buffer.progress <= Buffer.MAX_LENGTH < 2**28)
3611+
packed = result | (buffer.progress << 4)
3612+
assert(packed != BLOCKED)
3613+
return packed
3614+
```
3615+
The `result` indicates whether the stream was closed by the other end, the
3616+
copy was cancelled by this end (via `{stream,future}.cancel-{read,write}`) or,
3617+
otherwise, completed successfully. In all cases, any number of elements (from
3618+
`0` to `n`) may have *first* been copied into or out of the buffer passed to
3619+
the `read` or `write` and so this number is packed into the `i32` result.
36063620

36073621

36083622
### 🔀 `canon {stream,future}.cancel-{read,write}`

design/mvp/canonical-abi/definitions.py

+35-26
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ def __init__(self, impl, dtor = None, dtor_sync = True, dtor_callback = None):
303303
#### Buffer State
304304

305305
class Buffer:
306-
MAX_LENGTH = 2**30 - 1
306+
MAX_LENGTH = 2**28 - 1
307307
t: ValType
308308
remain: Callable[[], int]
309309

@@ -638,7 +638,7 @@ def drop(self):
638638

639639
RevokeBuffer = Callable[[], None]
640640
OnPartialCopy = Callable[[RevokeBuffer], None]
641-
OnCopyDone = Callable[[], None]
641+
OnCopyDone = Callable[[Literal['completed','cancelled']], None]
642642

643643
class ReadableStream:
644644
t: ValType
@@ -665,19 +665,19 @@ def reset_pending(self):
665665
self.pending_on_partial_copy = None
666666
self.pending_on_copy_done = None
667667

668-
def reset_and_notify_pending(self):
668+
def reset_and_notify_pending(self, why):
669669
pending_on_copy_done = self.pending_on_copy_done
670670
self.reset_pending()
671-
pending_on_copy_done()
671+
pending_on_copy_done(why)
672672

673673
def cancel(self):
674-
self.reset_and_notify_pending()
674+
self.reset_and_notify_pending('cancelled')
675675

676676
def close(self):
677677
if not self.closed_:
678678
self.closed_ = True
679679
if self.pending_buffer:
680-
self.reset_and_notify_pending()
680+
self.reset_and_notify_pending('completed')
681681

682682
def closed(self):
683683
return self.closed_
@@ -703,7 +703,7 @@ def copy(self, buffer, on_partial_copy, on_copy_done, src, dst):
703703
if self.pending_buffer.remain() > 0:
704704
self.pending_on_partial_copy(self.reset_pending)
705705
else:
706-
self.reset_and_notify_pending()
706+
self.reset_and_notify_pending('completed')
707707
return 'done'
708708

709709
class StreamEnd(Waitable):
@@ -734,10 +734,10 @@ def copy(self, src, on_partial_copy, on_copy_done):
734734
class FutureEnd(StreamEnd):
735735
def close_after_copy(self, copy_op, buffer, on_copy_done):
736736
assert(buffer.remain() == 1)
737-
def on_copy_done_wrapper():
737+
def on_copy_done_wrapper(why):
738738
if buffer.remain() == 0:
739739
self.stream.close()
740-
on_copy_done()
740+
on_copy_done(why)
741741
ret = copy_op(buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper)
742742
if ret == 'done' and buffer.remain() == 0:
743743
self.stream.close()
@@ -2033,7 +2033,8 @@ async def copy(EndT, BufferT, event_code, t, opts, task, i, ptr, n):
20332033
buffer = BufferT(t, cx, ptr, n)
20342034
if opts.sync:
20352035
final_revoke_buffer = None
2036-
def on_partial_copy(revoke_buffer):
2036+
def on_partial_copy(revoke_buffer, why = 'completed'):
2037+
assert(why == 'completed')
20372038
nonlocal final_revoke_buffer
20382039
final_revoke_buffer = revoke_buffer
20392040
if not async_copy.done():
@@ -2044,29 +2045,37 @@ def on_partial_copy(revoke_buffer):
20442045
await task.wait_on(async_copy, sync = True)
20452046
final_revoke_buffer()
20462047
else:
2047-
def copy_event(revoke_buffer):
2048+
def copy_event(why, revoke_buffer):
20482049
revoke_buffer()
20492050
e.copying = False
2050-
return (event_code, i, pack_copy_result(task, buffer, e))
2051+
return (event_code, i, pack_copy_result(task, e, buffer, why))
20512052
def on_partial_copy(revoke_buffer):
2052-
e.set_event(partial(copy_event, revoke_buffer))
2053-
def on_copy_done():
2054-
e.set_event(partial(copy_event, revoke_buffer = lambda:()))
2053+
e.set_event(partial(copy_event, 'completed', revoke_buffer))
2054+
def on_copy_done(why):
2055+
e.set_event(partial(copy_event, why, revoke_buffer = lambda:()))
20552056
if e.copy(buffer, on_partial_copy, on_copy_done) != 'done':
20562057
e.copying = True
20572058
return [BLOCKED]
2058-
return [pack_copy_result(task, buffer, e)]
2059-
2060-
BLOCKED = 0xffff_ffff
2061-
CLOSED = 0x8000_0000
2062-
2063-
def pack_copy_result(task, buffer, e):
2064-
if buffer.progress or not e.stream.closed():
2065-
assert(buffer.progress <= Buffer.MAX_LENGTH < BLOCKED)
2066-
assert(not (buffer.progress & CLOSED))
2067-
return buffer.progress
2059+
return [pack_copy_result(task, e, buffer, 'completed')]
2060+
2061+
BLOCKED = 0xffff_ffff
2062+
COMPLETED = 0x0
2063+
CLOSED = 0x1
2064+
CANCELLED = 0x2
2065+
2066+
def pack_copy_result(task, e, buffer, why):
2067+
if e.stream.closed():
2068+
result = CLOSED
2069+
elif why == 'cancelled':
2070+
result = CANCELLED
20682071
else:
2069-
return CLOSED
2072+
assert(why == 'completed')
2073+
assert(not isinstance(e, FutureEnd))
2074+
result = COMPLETED
2075+
assert(buffer.progress <= Buffer.MAX_LENGTH < 2**28)
2076+
packed = result | (buffer.progress << 4)
2077+
assert(packed != BLOCKED)
2078+
return packed
20702079

20712080
### 🔀 `canon {stream,future}.cancel-{read,write}`
20722081

0 commit comments

Comments
 (0)