Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release reader immediately when shutting down a pipe #1208

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -2175,7 +2175,7 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
|source|.[=ReadableStream/[[storedError]]=].
1. Otherwise, [=shutdown=] with |source|.[=ReadableStream/[[storedError]]=].
1. <strong>Errors must be propagated backward:</strong> if |dest|.[=WritableStream/[[state]]=]
is or becomes "`errored`", then
is or becomes "`erroring`" or "`errored`", then
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's useful to keep the pipe going when dest has already become "erroring"? Any new writes will just error immediately, as per step 9 of WritableStreamDefaultWriterWrite.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

1. If |preventCancel| is false, [=shutdown with an action=] of !
[$ReadableStreamCancel$](|source|, |dest|.[=WritableStream/[[storedError]]=]) and with
|dest|.[=WritableStream/[[storedError]]=].
Expand All @@ -2198,6 +2198,17 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
|originalError|, then:
1. If |shuttingDown| is true, abort these substeps.
1. Set |shuttingDown| to true.
1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform
! [$ReadableStreamBYOBReaderRelease$](|reader|).
1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|).
MattiasBuelens marked this conversation as resolved.
Show resolved Hide resolved
1. Set |reader| to ! [$AcquireReadableStreamDefaultReader$](|source|).
<p class="note">The initial reader is released to ensure that any pending read requests
are immediately aborted, and no more chunks are pulled from |source|. A new reader is
acquired in order to keep |source| locked until the shutdown is [=finalized=], for example
to [=cancel a readable stream|cancel=] |source| if necessary.
This exchange of readers is not observable to author code and the user agent is free to
implement this differently, for example by keeping the same reader and internally aborting
its pending read requests.
1. If |dest|.[=WritableStream/[[state]]=] is "`writable`" and !
[$WritableStreamCloseQueuedOrInFlight$](|dest|) is false,
1. If any [=chunks=] have been read but not yet written, write them to |dest|.
Expand All @@ -2210,6 +2221,10 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
ask to shutdown, optionally with an error |error|, then:
1. If |shuttingDown| is true, abort these substeps.
1. Set |shuttingDown| to true.
1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform
MattiasBuelens marked this conversation as resolved.
Show resolved Hide resolved
! [$ReadableStreamBYOBReaderRelease$](|reader|).
1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|).
1. Set |reader| to ! [$AcquireReadableStreamDefaultReader$](|source|).
1. If |dest|.[=WritableStream/[[state]]=] is "`writable`" and !
[$WritableStreamCloseQueuedOrInFlight$](|dest|) is false,
1. If any [=chunks=] have been read but not yet written, write them to |dest|.
Expand Down
20 changes: 16 additions & 4 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetac
const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js');
const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js');
const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort,
WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease,
WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight } = require('./writable-streams.js');
WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterIsOrBecomesErrored,
WritableStreamDefaultWriterRelease, WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight } =
require('./writable-streams.js');
const { CancelSteps, PullSteps, ReleaseSteps } = require('./internal-methods.js');

const ReadableByteStreamController = require('../../generated/ReadableByteStreamController.js');
Expand Down Expand Up @@ -134,7 +135,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
assert(IsReadableStreamLocked(source) === false);
assert(IsWritableStreamLocked(dest) === false);

const reader = AcquireReadableStreamDefaultReader(source);
let reader = AcquireReadableStreamDefaultReader(source);
const writer = AcquireWritableStreamDefaultWriter(dest);

source._disturbed = true;
Expand Down Expand Up @@ -200,6 +201,12 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
}

return transformPromiseWith(writer._readyPromise, () => {
if (shuttingDown === true) {
return promiseResolvedWith(true);
}
if (dest._state !== 'writable' || WritableStreamCloseQueuedOrInFlight(dest) === true) {
return promiseResolvedWith(true);
}
Comment on lines +207 to +209
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implements @domenic's suggestion from #1207 (comment).

I don't know if we need to update the spec text for this. It already specifies that these checks must happen before performing any reads and writes:

Shutdown must stop activity: if shuttingDown becomes true, the user agent must not initiate further reads from reader, and must only perform writes of already-read chunks, as described below. In particular, the user agent must check the below conditions before performing any reads or writes, since they might lead to immediate shutdown.

We should still add a test for this particular case (although that might not be easy looking at the discussion in #1207).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally don't think we need to update the spec text.

return new Promise((resolveRead, rejectRead) => {
ReadableStreamDefaultReaderRead(
reader,
Expand Down Expand Up @@ -228,7 +235,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
});

// Errors must be propagated backward
isOrBecomesErrored(dest, writer._closedPromise, storedError => {
WritableStreamDefaultWriterIsOrBecomesErrored(writer, () => {
Copy link
Collaborator Author

@MattiasBuelens MattiasBuelens Jan 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new helper allows attaching a synchronous callback for when dest becomes "erroring" or "errored", following the discussion in #1207 (comment). I added a test for this in web-platform-tests/wpt@1646d65.

const storedError = dest._storedError;
if (preventCancel === false) {
shutdownWithAction(() => ReadableStreamCancel(source, storedError), true, storedError);
} else {
Expand Down Expand Up @@ -289,6 +297,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
return;
}
shuttingDown = true;
ReadableStreamDefaultReaderRelease(reader);
reader = AcquireReadableStreamDefaultReader(source);

if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) {
uponFulfillment(waitForWritesToFinish(), doTheRest);
Expand All @@ -310,6 +320,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
return;
}
shuttingDown = true;
ReadableStreamDefaultReaderRelease(reader);
reader = AcquireReadableStreamDefaultReader(source);

if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) {
uponFulfillment(waitForWritesToFinish(), () => finalize(isError, error));
Expand Down
24 changes: 24 additions & 0 deletions reference-implementation/lib/abstract-ops/writable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Object.assign(exports, {
WritableStreamDefaultWriterClose,
WritableStreamDefaultWriterCloseWithErrorPropagation,
WritableStreamDefaultWriterGetDesiredSize,
WritableStreamDefaultWriterIsOrBecomesErrored,
WritableStreamDefaultWriterRelease,
WritableStreamDefaultWriterWrite
});
Expand Down Expand Up @@ -143,6 +144,8 @@ function SetUpWritableStreamDefaultWriter(writer, stream) {
writer._stream = stream;
stream._writer = writer;

writer._errorListeners = [];

const state = stream._state;

if (state === 'writable') {
Expand Down Expand Up @@ -378,6 +381,11 @@ function WritableStreamStartErroring(stream, reason) {
const writer = stream._writer;
if (writer !== undefined) {
WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
const errorListeners = writer._errorListeners;
writer._errorListeners = [];
for (const errorListener of errorListeners) {
errorListener();
}
}

if (WritableStreamHasOperationMarkedInFlight(stream) === false && controller._started === true) {
Expand Down Expand Up @@ -475,6 +483,20 @@ function WritableStreamDefaultWriterGetDesiredSize(writer) {
return WritableStreamDefaultControllerGetDesiredSize(stream._controller);
}

function WritableStreamDefaultWriterIsOrBecomesErrored(writer, errorListener) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe since this is not part of the standard, it should start with a lower-case letter?

Copy link
Collaborator Author

@MattiasBuelens MattiasBuelens Jan 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. 👍

What do you suggest we do put in the spec text?

  • We can add a note saying that "errors must be propagated backward" must be handled synchronously as soon as that condition becomes true, in other words that it's not enough to add an asynchronous callback to writer.closed.
    • On the other hand: since the condition is now "is or becomes erroring or errored", maybe it's already clear enough that writer.closed is not good enough?
  • We can add a note in WritableStreamStartErroring below the "set stream.[[state]] to "erroring"" step to remind implementers that this is where that condition from pipeTo() can become true.
  • Alternatively, we tell implementers to look at the reference implementation for an example on how to do it... 😛

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this very specific to the one change from "writable" to "erroring"? In the reference implementation you've added a much more general listener setup.

I would suggest a general note saying "for all the 'becomes' conditions in the above, they must be processed synchronously as part of the [[state]] update, before any other web developer code can run." And then, if we anticipate that only being impactful in the one transition, we could append the extra note: "NOTE: Currently this requirement only has observable consequences for [the transition for writable stream states from from "writable" to "erroring"], and others could be done as asynchronous listeners". Or, if we think we might expand this listener usage in the future, then we should probably omit that note.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this very specific to the one change from "writable" to "erroring"?

That's the most noticeable case, since it determines whether or not we may drop a chunk (i.e. we accidentally read a chunk that we can no longer write).

I'm not sure whether it matters for the state transitions of the readable end. There might be an edge case where two shutdown conditions become true at the same time, and then it matters which condition is handled first. For example:

readableController.error(error1); // pipeTo() should immediately call writer.abort(error1)
writableController.error(error2); // should be ignored, since writable is already erroring
// => pipeTo() rejects with error1

versus:

writableController.error(error2); // pipeTo() should immediately call reader.cancel(error2)
readableController.error(error1); // should be ignored, since readable is already closed
// => pipeTo() rejects with error2

If we were to use a synchronous reaction for the writable -> erroring transition but an asynchronous reaction for the readable -> errored transition, then the first snippet would also behave like the second one... I think. 😛

I'll try to whip up some more WPTs to double check.

Copy link
Collaborator Author

@MattiasBuelens MattiasBuelens Jan 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are two possible tests for this:

promise_test(async t => {
  const rs = recordingReadableStream();
  const ws = recordingWritableStream();

  const pipeToPromise = rs.pipeTo(ws);
  await flushAsyncEvents();

  rs.controller.error(error1);
  ws.controller.error(error2);

  await promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo must reject with readable\'s error');
  assert_array_equals(rs.eventsWithoutPulls, []);
  assert_array_equals(ws.events, []);

  await promise_rejects_exactly(t, error1, rs.getReader().closed);
  await promise_rejects_exactly(t, error2, ws.getWriter().closed);
}, 'Piping: error the readable stream right before erroring the writable stream');

promise_test(async t => {
  const rs = recordingReadableStream();
  const ws = recordingWritableStream();

  const pipeToPromise = rs.pipeTo(ws);
  await flushAsyncEvents();

  ws.controller.error(error1);
  rs.controller.error(error2);

  await promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo must reject with writable\'s error');
  assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
  assert_array_equals(ws.events, []);

  await promise_rejects_exactly(t, error1, ws.getWriter().closed);
  await rs.getReader().closed;
}, 'Piping: error the writable stream right before erroring the readable stream');

The behavior might be a bit surprising though. In the first test, ws is still writable when we call rs.controller.error(), so we end up in:

uponFulfillment(waitForWritesToFinish(), doTheRest);

This adds at least one microtask of delay (even if there are no pending writes), so we will not yet call ws.abort(error1). Instead, ws.controller.error(error2) goes through, and the abort gets ignored later on.

However, in the second test, because ws immediately becomes errored, we don't wait for pending writes to complete and instead we synchronously call rs.cancel(error1). Therefore, rs.controller.error(error2) gets ignored, and the stream ends up cancelled instead of errored.


The specification is a bit vague about this. It says:

Wait until every chunk that has been read has been written (i.e. the corresponding promises have settled).

It doesn't say how long this step can take. We may want to require that if there are no pending writes (i.e. we've never started any writes, or all writes have already settled), then this step must complete synchronously. Then, in the first test, we would call ws.abort(error1) synchronously and prevent ws.controller.error(error2). However, that might be tricky to actually implement correctly... 🤔

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest limiting the sync part to as small as possible to fix the issue. This still leaves the problem of how to spec it. We've tried to give latitude for implementations to optimise in their own way, but we're increasingly constraining their behaviour. Transparent thread offloading etc. may become impossible. I'm worried about it but I don't have an answer.

Copy link
Collaborator Author

@MattiasBuelens MattiasBuelens Jan 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest limiting the sync part to as small as possible to fix the issue.

The sync part is already minimal. We have to go from "if source becomes errored" all the way to "perform WritableStreamAbort" in order to avoid ws.controller.error() from affecting the result. Thus, the entirety of step 3 in "shutdown with an action" must become synchronous (only if there are no pending writes that need to be awaited).


Anyway, I found another way to fix it. We keep track of how many ReadableStreamDefaultReaderRead() requests are still outstanding, and we only handle the source.[[state]] == "closed" transition after all those requests are settled. See MattiasBuelens@3c8b3c2.

However, this test still fails. We do call dest.abort() and source.cancel() in the correct order, but it seems like underlyingSink.abort() and underlyingSource.cancel() are being called in the wrong order. When we call WritableStreamStartErroring, the writable controller is not yet started, so we postpone calling sink.abort() until after sink.start() resolves. Previously, the writable would already have been started while we were asynchronously waiting for the writes to finish (even if there were no pending writes).

Adding await flushAsyncEvents() before calling pipeTo() in that test restores the order and fixes the problem. Good enough? 🤷‍♂️


We've tried to give latitude for implementations to optimise in their own way, but we're increasingly constraining their behaviour. Transparent thread offloading etc. may become impossible. I'm worried about it but I don't have an answer.

I agree, the reference implementation is becoming increasingly complicated in order to deal with these edge cases. 😞

I'm wondering if it's even worth trying to spec these edge cases, or instead allow some wiggle room in how quickly pipeTo() must respond to these state transitions. But then it would become impossible to test the behavior, or we'd have to allow multiple valid outcomes... 😕

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect most of the testable constraints we're imposing are in cases where the web developer controls one or both ends of the pipe, right? I'm not sure those are the ones we were planning to feasibly optimize, so starting to constrain them still seems like the right thing to do to me. But I might be missing something so please let me know.

On the larger problem, the root of the issue seems to be how imprecise "[[state]] is or becomes" is. Does that mean: (1) synchronously after the algorithm step which sets [[state]], probably interrupting other streams-implementation code; (2) synchronously after any streams-implementation code runs; (3) synchronously after any browser code runs; (4) asynchronously is OK to some degree?

My preference would be to try to resolve things like so:

  • Decide whether we're OK constraining all observable behavior, or want to allow leeway. In particular when one or both ends of a pipe are web-developer-created streams, a good bit more becomes observable.
  • Write tests reflecting the result of the previous decision. E.g. if we want to nail down all observable behavior, I think @MattiasBuelens has done a great job capturing as many scenarios as possible. (❤️!) We should analyze them for what reasonable expected behavior is, and assert that. (If we don't have strong feelings on reasonable expected behavior, then we can feel free to change the assertions in the next step.)
  • Come up with some more-rigorous formulation of "[[state]] is or becomes" for the spec which meets the expectations of those tests. This probably will make the spec more complex, and more like the reference implementation. E.g. it could be adding promise handlers (probably in combination with something like MattiasBuelens@3c8b3c2), or having separate synchronous state-change steps. Given that this will only be used for pipeTo, we can probably consider spec strategies that aren't as detailed and algorithmic as the rest of the spec, but they do need to be clear and unambigious between the (1)-(4) above.

As an example of how to apply this process,

Adding await flushAsyncEvents() before calling pipeTo() in that test restores the order and fixes the problem. Good enough? 🤷‍♂️

My preference would be that, if we decide to constrain all observable behavior, we have both variants of the test, with the version without flushAsyncEvents() having the assert for the other order.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect most of the testable constraints we're imposing are in cases where the web developer controls one or both ends of the pipe, right? I'm not sure those are the ones we were planning to feasibly optimize, so starting to constrain them still seems like the right thing to do to me. But I might be missing something so please let me know.

Correct. Streams created by the user agent will use the exported algorithms, and I think it's safe to assume that those will be called in a separate task, outside of web author code.

On the larger problem, the root of the issue seems to be how imprecise "[[state]] is or becomes" is. Does that mean: (1) synchronously after the algorithm step which sets [[state]], probably interrupting other streams-implementation code; (2) synchronously after any streams-implementation code runs; (3) synchronously after any browser code runs; (4) asynchronously is OK to some degree?

(2) may be ill-specified, since there are cases where streams code calls into author code, which can then call back into streams code. We've even had cases in the past where streams code calls back into itself, e.g. #1172.

I still prefer (1), and that's what I've been implementing. Yes, we need to be very careful when speccing, but at least any problems that arise can be fixed within the streams implementation.

My preference would be that, if we decide to constrain all observable behavior, we have both variants of the test, with the version without flushAsyncEvents() having the assert for the other order.

That seems reasonable. 👍

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coming back to this:

Adding await flushAsyncEvents() before calling pipeTo() in that test restores the order and fixes the problem. Good enough? 🤷‍♂️

It seems the main difference is that, when you call readable.cancel(), we always synchronously call source.cancel() regardless of whether source.start() has already settled. On the other hand, when you call writable.abort(), we first wait for sink.start() to settle before we call sink.abort().

IIRC the reason for this difference is so you can do e.g. an async loop in source.start():

new ReadableStream({
  async start(c) {
    for (let i = 0; i < 10; i++) {
      await new Promise(r => setTimeout(r, 1000));
      c.enqueue("chunk");
    }
    c.close();
  }
})

whereas for sink.start() this doesn't make sense.

I guess, if we really wanted to, we could have the test check when writableController.signal becomes aborted? That should happen synchronously regardless of whether sink.start() has already settled.

const stream = writer._stream;
if (stream === undefined) {
return;
}

const state = stream._state;
if (state === 'writable') {
writer._errorListeners.push(errorListener);
} else if (state === 'erroring' || state === 'errored') {
errorListener();
}
}

function WritableStreamDefaultWriterRelease(writer) {
const stream = writer._stream;
assert(stream !== undefined);
Expand All @@ -491,6 +513,8 @@ function WritableStreamDefaultWriterRelease(writer) {

stream._writer = undefined;
writer._stream = undefined;

stream._errorListeners = [];
}

function WritableStreamDefaultWriterWrite(writer, chunk) {
Expand Down
2 changes: 1 addition & 1 deletion reference-implementation/web-platform-tests