Skip to content
This repository was archived by the owner on Jan 14, 2025. It is now read-only.

Add ProcessManager and sharedStdIn #6

Merged
merged 7 commits into from
Jul 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@

- Initial commit of...
- `FutureOr<bool> String isExecutable(path)`.
- `ExitCode`.
- `ExitCode`
- `ProcessManager` and `Spawn`
- `sharedStdIn` and `SharedStdIn`
58 changes: 57 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

Contains utilities for the Dart VM's `dart:io`.

[![Build Status](https://travis-ci.org/dart-lang/io.svg?branch=master)](https://travis-ci.org/dart-lang/io)

## Usage

### Files
Expand All @@ -15,4 +17,58 @@ operating system.

#### `ExitCode`

An enum-type class that contains known exit codes.
An enum-type class that contains known exit codes.

#### `ProcessManager`

A higher-level service for spawning and communicating with processes.

##### Use `spawn` to create a process with std[in|out|err] forwarded by default

```dart
/// Runs `dartfmt` commands and `pub publish`.
Future<Null> main() async {
final manager = new ProcessManager();

// Runs dartfmt --version and outputs the result via stdout.
print('Running dartfmt --version');
var spawn = await manager.spawn('dartfmt', arguments: ['--version']);
await spawn.exitCode;

// Runs dartfmt -n . and outputs the result via stdout.
print('Running dartfmt -n .');
spawn = await manager.spawn('dartfmt', arguments: ['-n', '.']);
await spawn.exitCode;

// Runs pub publish. Upon hitting a blocking stdin state, you may directly
// output to the processes's stdin via your own, similar to how a bash or
// shell script would spawn a process.
print('Running pub publish');
spawn = await manager.spawn('pub', arguments: ['publish']);
await spawn.exitCode;

// Closes stdin for the entire program.
await sharedStdIn.terminate();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this do, exactly? Practically speaking?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closes the underlying stdin stream (allowing the isolate to exit).

}
```

#### `sharedStdIn`

A safer version of the default `stdin` stream from `dart:io` that allows a
subscriber to cancel their subscription, and then allows a _new_ subscriber to
start listening. This differs from the default behavior where only a single
listener is ever allowed in the application lifecycle:

```dart
test('should allow multiple subscribers', () async {
final logs = <String>[];
final asUtf8 = sharedStdIn.transform(UTF8.decoder);
// Wait for input for the user.
logs.add(await asUtf8.first);
// Wait for more input for the user.
logs.add(await asUtf8.first);
expect(logs, ['Hello World', 'Goodbye World']);
});
```

For testing, an instance of `SharedStdIn` may be created directly.
32 changes: 32 additions & 0 deletions example/spawn_process.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:io/io.dart';

/// Runs `dartfmt` commands and `pub publish`.
Future<Null> main() async {
final manager = new ProcessManager();

// Runs dartfmt --version and outputs the result via stdout.
print('Running dartfmt --version');
var spawn = await manager.spawn('dartfmt', arguments: ['--version']);
await spawn.exitCode;

// Runs dartfmt -n . and outputs the result via stdout.
print('Running dartfmt -n .');
spawn = await manager.spawn('dartfmt', arguments: ['-n', '.']);
await spawn.exitCode;

// Runs pub publish. Upon hitting a blocking stdin state, you may directly
// output to the processes's stdin via your own, similar to how a bash or
// shell script would spawn a process.
print('Running pub publish');
spawn = await manager.spawn('pub', arguments: ['publish']);
await spawn.exitCode;

// Closes stdin for the entire program.
await sharedStdIn.terminate();
}
2 changes: 2 additions & 0 deletions lib/io.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@

export 'src/exit_codes.dart' show ExitCode;
export 'src/permissions.dart' show isExecutable;
export 'src/process_manager.dart' show ProcessManager, Spawn;
export 'src/shared_stdin.dart' show SharedStdIn, sharedStdIn;
162 changes: 162 additions & 0 deletions lib/src/process_manager.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:io' as io;

import 'package:meta/meta.dart';

import 'shared_stdin.dart';

/// A high-level abstraction around using and managing processes on the system.
abstract class ProcessManager {
/// Terminates the global `stdin` listener, making future listens impossible.
///
/// This method should be invoked only at the _end_ of a program's execution.
static Future<Null> terminateStdIn() async {
await sharedStdIn.terminate();
}

/// Create a new instance of [ProcessManager] for the current platform.
///
/// May manually specify whether the current platform [isWindows], otherwise
/// this is derived from the Dart runtime (i.e. [io.Platform.isWindows]).
factory ProcessManager({
Stream<List<int>> stdin,
io.IOSink stdout,
io.IOSink stderr,
bool isWindows,
}) {
stdin ??= sharedStdIn;
stdout ??= io.stdout;
stderr ??= io.stderr;
isWindows ??= io.Platform.isWindows;
if (isWindows) {
return new _WindowsProcessManager(stdin, stdout, stderr);
}
return new _UnixProcessManager(stdin, stdout, stderr);
}

final Stream<List<int>> _stdin;
final io.IOSink _stdout;
final io.IOSink _stderr;

const ProcessManager._(this._stdin, this._stdout, this._stderr);

/// Spawns a process by invoking [executable] with [arguments].
///
/// This is _similar_ to [io.Process.start], but all standard input and output
/// is forwarded/routed between the process and the host, similar to how a
/// shell script works.
///
/// Returns a future that completes with a handle to the spawned process.
Future<io.Process> spawn(
String executable, {
Iterable<String> arguments: const [],
}) async {
final process = io.Process.start(executable, arguments.toList());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do any error handling here? Could we run into a state where io.Process.start executes but 'await process' never resolves?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we wouldn't run into any problems Process.start doesn't already have.

return new _ForwardingSpawn(await process, _stdin, _stdout, _stderr);
}
}

/// A process instance created and managed through [ProcessManager].
///
/// Unlike one created directly by [io.Process.start] or [io.Process.run], a
/// spawned process works more like executing a command in a shell script.
class Spawn implements io.Process {
final io.Process _delegate;

Spawn._(this._delegate) {
_delegate.exitCode.then((_) => _onClosed());
}

@mustCallSuper
@visibleForOverriding
void _onClosed() {}

@override
bool kill([io.ProcessSignal signal = io.ProcessSignal.SIGTERM]) =>
_delegate.kill(signal);

@override
Future<int> get exitCode => _delegate.exitCode;

@override
int get pid => _delegate.pid;

@override
Stream<List<int>> get stderr => _delegate.stderr;

@override
io.IOSink get stdin => _delegate.stdin;

@override
Stream<List<int>> get stdout => _delegate.stdout;
}

/// Forwards `stdin`/`stdout`/`stderr` to/from the host.
class _ForwardingSpawn extends Spawn {
final StreamSubscription _stdInSub;
final StreamSubscription _stdOutSub;
final StreamSubscription _stdErrSub;

factory _ForwardingSpawn(
io.Process delegate,
Stream<List<int>> stdin,
io.IOSink stdout,
io.IOSink stderr,
) {
final stdInSub = stdin.listen(delegate.stdin.add);
final stdOutSub = delegate.stdout.listen(stdout.add);
final stdErrSub = delegate.stderr.listen(stderr.add);
return new _ForwardingSpawn._delegate(
delegate,
stdInSub,
stdOutSub,
stdErrSub,
);
}

_ForwardingSpawn._delegate(
io.Process delegate,
this._stdInSub,
this._stdOutSub,
this._stdErrSub,
)
: super._(delegate);

@override
void _onClosed() {
_stdInSub.cancel();
_stdOutSub.cancel();
_stdErrSub.cancel();
super._onClosed();
}
}

class _UnixProcessManager extends ProcessManager {
const _UnixProcessManager(
Stream<List<int>> stdin,
io.IOSink stdout,
io.IOSink stderr,
)
: super._(
stdin,
stdout,
stderr,
);
}

class _WindowsProcessManager extends ProcessManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the idea that this would do something different at some point?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep!

const _WindowsProcessManager(
Stream<List<int>> stdin,
io.IOSink stdout,
io.IOSink stderr,
)
: super._(
stdin,
stdout,
stderr,
);
}
81 changes: 81 additions & 0 deletions lib/src/shared_stdin.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:io';

import 'package:meta/meta.dart';

/// A shared singleton instance of `dart:io`'s [stdin] stream.
///
/// _Unlike_ the normal [stdin] stream, [sharedStdIn] may switch subscribers
/// as long as the previous subscriber cancels before the new subscriber starts
/// listening.
///
/// [SharedStdIn.terminate] *must* be invoked in order to close the underlying
/// connection to [stdin], allowing your program to close automatically without
/// hanging.
final SharedStdIn sharedStdIn = new SharedStdIn(stdin);

/// A singleton wrapper around `stdin` that allows new subscribers.
///
/// This class is visible in order to be used as a test harness for mock
/// implementations of `stdin`. In normal programs, [sharedStdIn] should be
/// used directly.
@visibleForTesting
class SharedStdIn extends Stream<List<int>> {
StreamController<List<int>> _current;
StreamSubscription<List<int>> _sub;

SharedStdIn([Stream<List<int>> stream]) {
_sub = (stream ??= stdin).listen(_onInput);
}

void _onInput(List<int> event) => _getCurrent().add(event);

StreamController<List<int>> _getCurrent() {
if (_current == null) {
_current = new StreamController<List<int>>(
onCancel: () {
_current = null;
},
sync: true);
}
return _current;
}

@override
StreamSubscription<List<int>> listen(
void onData(List<int> event), {
Function onError,
void onDone(),
bool cancelOnError,
}) {
if (_sub == null) {
throw new StateError('Stdin has already been terminated.');
}
final controller = _getCurrent();
if (controller.hasListener) {
throw new StateError(''
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the empty string? I assume for code readability but that's a pattern I haven't come across before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Natalie is actually fixing this in dartfmt, but it is the only way to force the next lines to line up. I'm OK with removing it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other way to force a newline is if the string is just over 80 chars when at this indent, and under 80 chars if dartfmt is forced to drop it down to the next line.

'Subscriber already listening. The existing subscriber must cancel '
'before another may be added.');
}
return controller.stream.listen(
onData,
onDone: onDone,
onError: onError,
cancelOnError: cancelOnError,
);
}

/// Terminates the connection to `stdin`, closing all subscription.
Future<Null> terminate() async {
if (_sub == null) {
throw new StateError('Stdin has already been terminated.');
}
await _sub.cancel();
await _current?.close();
_sub = null;
}
}
1 change: 1 addition & 0 deletions pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ environment:
sdk: ">=1.22.0 <2.0.0"

dev_dependencies:
path: ^1.0.0
test: ^0.12.0
7 changes: 7 additions & 0 deletions test/_files/stderr_hello.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:io';

void main() => stderr.write('Hello');
7 changes: 7 additions & 0 deletions test/_files/stdin_echo.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:io';

void main() => stdout.writeln('You said: ${stdin.readLineSync()}');
7 changes: 7 additions & 0 deletions test/_files/stdout_hello.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:io';

void main() => stdout.write('Hello');
1 change: 1 addition & 0 deletions test/permissions_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

@TestOn('vm')
import 'package:io/io.dart';
import 'package:test/test.dart';

Expand Down
Loading