Skip to content

Commit 75c760c

Browse files
committed
Add a RunnerSuite.channel() call that can connect to a test worker
Partially addresses flutter#711
1 parent 093a59f commit 75c760c

6 files changed

Lines changed: 197 additions & 62 deletions

File tree

lib/src/runner/load_suite.dart

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import 'dart:async';
66

77
import 'package:stack_trace/stack_trace.dart';
8+
import 'package:stream_channel/stream_channel.dart';
89

910
import '../../test.dart';
1011
import '../backend/group.dart';
@@ -193,5 +194,8 @@ class LoadSuite extends Suite implements RunnerSuite {
193194
return new LoadSuite._filtered(this, filtered);
194195
}
195196

197+
StreamChannel channel(String name) =>
198+
throw new UnsupportedError("LoadSuite.channel() is not supported.");
199+
196200
Future close() async {}
197201
}

lib/src/runner/plugin/platform_helpers.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ Future<RunnerSuiteController> deserializeSuite(
4848
var suiteChannel = new MultiChannel(channel.transform(disconnector));
4949

5050
suiteChannel.sink.add({
51+
'type': 'initial',
5152
'platform': platform.serialize(),
5253
'metadata': suiteConfig.metadata.serialize(),
5354
'os': (platform == TestPlatform.vm || platform == TestPlatform.nodeJS)
@@ -112,7 +113,7 @@ Future<RunnerSuiteController> deserializeSuite(
112113
});
113114

114115
return new RunnerSuiteController(
115-
environment, suiteConfig, await completer.future,
116+
environment, suiteConfig, suiteChannel, await completer.future,
116117
path: path,
117118
platform: platform,
118119
os: currentOS,

lib/src/runner/plugin/remote_platform_helpers.dart

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import 'package:stream_channel/stream_channel.dart';
66

77
import '../remote_listener.dart';
8+
import '../suite_channel_manager.dart';
89

910
/// Returns a channel that will emit a serialized representation of the tests
1011
/// defined in [getMain].
@@ -23,3 +24,20 @@ import '../remote_listener.dart';
2324
/// where they'll be visible in the development console.
2425
StreamChannel serializeSuite(Function getMain(), {bool hidePrints: true}) =>
2526
RemoteListener.start(getMain, hidePrints: hidePrints);
27+
28+
/// Returns a channel that communicates with a plugin in the test runner.
29+
///
30+
/// This connects to a channel created by code in the test runner calling
31+
/// `RunnerSuite.channel()` with the same name. It can be used used to send and
32+
/// receive any JSON-serializable object.
33+
///
34+
/// Throws a [StateError] if [name] has already been used for a channel.
35+
StreamChannel suiteChannel(String name) {
36+
var manager = SuiteChannelManager.current;
37+
if (manager == null) {
38+
throw new StateError(
39+
'suiteChannel() may only be called within a test worker.');
40+
}
41+
42+
return manager.connectOut(name);
43+
}

lib/src/runner/remote_listener.dart

Lines changed: 73 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import 'dart:async';
66

7+
import 'package:async/async.dart';
78
import 'package:stream_channel/stream_channel.dart';
89
import 'package:term_glyph/term_glyph.dart' as glyph;
910

@@ -20,6 +21,7 @@ import '../frontend/test_chain.dart';
2021
import '../util/remote_exception.dart';
2122
import '../util/stack_trace_mapper.dart';
2223
import '../utils.dart';
24+
import 'suite_channel_manager.dart';
2325

2426
class RemoteListener {
2527
/// The test suite to run.
@@ -52,66 +54,79 @@ class RemoteListener {
5254
var verboseChain = true;
5355

5456
var printZone = hidePrints ? null : Zone.current;
55-
runZoned(() async {
56-
var main;
57-
try {
58-
main = getMain();
59-
} on NoSuchMethodError catch (_) {
60-
_sendLoadException(channel, "No top-level main() function defined.");
61-
return;
62-
} catch (error, stackTrace) {
63-
_sendError(channel, error, stackTrace, verboseChain);
64-
return;
65-
}
66-
67-
if (main is! Function) {
68-
_sendLoadException(channel, "Top-level main getter is not a function.");
69-
return;
70-
} else if (main is! AsyncFunction) {
71-
_sendLoadException(
72-
channel, "Top-level main() function takes arguments.");
73-
return;
74-
}
75-
76-
var message = await channel.stream.first;
77-
78-
if (message['asciiGlyphs'] ?? false) glyph.ascii = true;
79-
var metadata = new Metadata.deserialize(message['metadata']);
80-
verboseChain = metadata.verboseTrace;
81-
var declarer = new Declarer(
82-
metadata: metadata,
83-
platformVariables: new Set.from(message['platformVariables']),
84-
collectTraces: message['collectTraces'],
85-
noRetry: message['noRetry']);
86-
87-
configureTestChaining(
88-
mapper: StackTraceMapper.deserialize(message['stackTraceMapper']),
89-
exceptPackages: _deserializeSet(message['foldTraceExcept']),
90-
onlyPackages: _deserializeSet(message['foldTraceOnly']));
91-
92-
await declarer.declare(main);
93-
94-
var suite = new Suite(declarer.build(),
95-
platform: new TestPlatform.deserialize(message['platform']),
96-
os: message['os'] == null
97-
? null
98-
: OperatingSystem.find(message['os']),
99-
path: message['path']);
100-
101-
runZoned(() {
102-
Invoker.guard(
103-
() => new RemoteListener._(suite, printZone)._listen(channel));
104-
},
105-
// Make the declarer visible to running tests so that they'll throw
106-
// useful errors when calling `test()` and `group()` within a test,
107-
// and so they can add to the declarer's `tearDownAll()` list.
108-
zoneValues: {#test.declarer: declarer});
109-
}, onError: (error, stackTrace) {
110-
_sendError(channel, error, stackTrace, verboseChain);
111-
}, zoneSpecification: new ZoneSpecification(print: (_, __, ___, line) {
57+
var spec = new ZoneSpecification(print: (_, __, ___, line) {
11258
if (printZone != null) printZone.print(line);
11359
channel.sink.add({"type": "print", "line": line});
114-
}));
60+
});
61+
62+
runZoned(() {
63+
new SuiteChannelManager().asCurrent(() async {
64+
var main;
65+
try {
66+
main = getMain();
67+
} on NoSuchMethodError catch (_) {
68+
_sendLoadException(channel, "No top-level main() function defined.");
69+
return;
70+
} catch (error, stackTrace) {
71+
_sendError(channel, error, stackTrace, verboseChain);
72+
return;
73+
}
74+
75+
if (main is! Function) {
76+
_sendLoadException(
77+
channel, "Top-level main getter is not a function.");
78+
return;
79+
} else if (main is! AsyncFunction) {
80+
_sendLoadException(
81+
channel, "Top-level main() function takes arguments.");
82+
return;
83+
}
84+
85+
var queue = new StreamQueue(channel.stream);
86+
var message = await queue.next;
87+
assert(message['type'] == 'initial');
88+
89+
queue.rest.listen((message) {
90+
assert(message["type"] == "suiteChannel");
91+
SuiteChannelManager.current.connectIn(
92+
message['name'], channel.virtualChannel(message['id']));
93+
});
94+
95+
if (message['asciiGlyphs'] ?? false) glyph.ascii = true;
96+
var metadata = new Metadata.deserialize(message['metadata']);
97+
verboseChain = metadata.verboseTrace;
98+
var declarer = new Declarer(
99+
metadata: metadata,
100+
platformVariables: new Set.from(message['platformVariables']),
101+
collectTraces: message['collectTraces'],
102+
noRetry: message['noRetry']);
103+
104+
configureTestChaining(
105+
mapper: StackTraceMapper.deserialize(message['stackTraceMapper']),
106+
exceptPackages: _deserializeSet(message['foldTraceExcept']),
107+
onlyPackages: _deserializeSet(message['foldTraceOnly']));
108+
109+
await declarer.declare(main);
110+
111+
var suite = new Suite(declarer.build(),
112+
platform: new TestPlatform.deserialize(message['platform']),
113+
os: message['os'] == null
114+
? null
115+
: OperatingSystem.find(message['os']),
116+
path: message['path']);
117+
118+
runZoned(() {
119+
Invoker.guard(
120+
() => new RemoteListener._(suite, printZone)._listen(channel));
121+
},
122+
// Make the declarer visible to running tests so that they'll throw
123+
// useful errors when calling `test()` and `group()` within a test,
124+
// and so they can add to the declarer's `tearDownAll()` list.
125+
zoneValues: {#test.declarer: declarer});
126+
});
127+
}, onError: (error, stackTrace) {
128+
_sendError(channel, error, stackTrace, verboseChain);
129+
}, zoneSpecification: spec);
115130

116131
return controller.foreign;
117132
}

lib/src/runner/runner_suite.dart

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import 'dart:async';
66

77
import 'package:async/async.dart';
8+
import 'package:stream_channel/stream_channel.dart';
89

910
import '../backend/group.dart';
1011
import '../backend/operating_system.dart';
@@ -45,15 +46,22 @@ class RunnerSuite extends Suite {
4546
/// The event is `true` when debugging starts and `false` when it ends.
4647
Stream<bool> get onDebugging => _controller._onDebuggingController.stream;
4748

49+
/// Returns a channel that communicates with the remote suite.
50+
///
51+
/// This connects to a channel created by code in the test worker calling
52+
/// `suiteChannel()` from `remote_platform_helpers.dart` with the same name.
53+
/// It can be used used to send and receive any JSON-serializable object.
54+
StreamChannel channel(String name) => _controller.channel(name);
55+
4856
/// A shortcut constructor for creating a [RunnerSuite] that never goes into
49-
/// debugging mode.
57+
/// debugging mode and doesn't support suite channels.
5058
factory RunnerSuite(
5159
Environment environment, SuiteConfiguration config, Group group,
5260
{String path,
5361
TestPlatform platform,
5462
OperatingSystem os,
5563
AsyncFunction onClose}) {
56-
var controller = new RunnerSuiteController(environment, config, group,
64+
var controller = new RunnerSuiteController(environment, config, null, group,
5765
path: path, platform: platform, os: os, onClose: onClose);
5866
return controller.suite;
5967
}
@@ -84,6 +92,9 @@ class RunnerSuiteController {
8492
/// The configuration for this suite.
8593
final SuiteConfiguration _config;
8694

95+
/// A channel that communicates with the remote suite.
96+
final MultiChannel _suiteChannel;
97+
8798
/// The function to call when the suite is closed.
8899
final AsyncFunction _onClose;
89100

@@ -93,7 +104,11 @@ class RunnerSuiteController {
93104
/// The controller for [suite.onDebugging].
94105
final _onDebuggingController = new StreamController<bool>.broadcast();
95106

96-
RunnerSuiteController(this._environment, this._config, Group group,
107+
/// The channel names that have already been used.
108+
final _channelNames = new Set<String>();
109+
110+
RunnerSuiteController(
111+
this._environment, this._config, this._suiteChannel, Group group,
97112
{String path,
98113
TestPlatform platform,
99114
OperatingSystem os,
@@ -112,6 +127,27 @@ class RunnerSuiteController {
112127
_onDebuggingController.add(debugging);
113128
}
114129

130+
/// Returns a channel that communicates with the remote suite.
131+
///
132+
/// This connects to a channel created by code in the test worker calling
133+
/// `suiteChannel()` from `remote_platform_helpers.dart` with the same name.
134+
/// It can be used used to send and receive any JSON-serializable object.
135+
///
136+
/// This is exposed on the [RunnerSuiteController] so that runner plugins can
137+
/// communicate with the workers they spawn before the associated [suite] is
138+
/// fully loaded.
139+
StreamChannel channel(String name) {
140+
if (!_channelNames.add(name)) {
141+
throw new StateError(
142+
'Duplicate RunnerSuite.channel() connection "$name".');
143+
}
144+
145+
var channel = _suiteChannel.virtualChannel();
146+
_suiteChannel.sink
147+
.add({"type": "suiteChannel", "name": name, "id": channel.id});
148+
return channel;
149+
}
150+
115151
/// The backing function for [suite.close].
116152
Future _close() => _closeMemo.runOnce(() async {
117153
_onDebuggingController.close();
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
7+
import 'package:stream_channel/stream_channel.dart';
8+
9+
/// The key used to look up [SuiteChannelManager.current] in a zone.
10+
final _currentKey = new Object();
11+
12+
/// A class that connects incoming and outgoing channels with the same names.
13+
class SuiteChannelManager {
14+
/// Connections from the test runner that have yet to connect to corresponding
15+
/// calls to [suiteChannel] within this worker.
16+
final _incomingConnections = <String, StreamChannel>{};
17+
18+
/// Connections from calls to [suiteChannel] that have yet to connect to
19+
/// corresponding connections from the test runner.
20+
final _outgoingConnections = <String, StreamChannelCompleter>{};
21+
22+
/// The channel names that have already been used.
23+
final _names = new Set<String>();
24+
25+
/// Returns the current manager, or `null` if this isn't called within a call
26+
/// to [asCurrent].
27+
static SuiteChannelManager get current =>
28+
Zone.current[_currentKey] as SuiteChannelManager;
29+
30+
/// Runs [body] with [this] as [SuiteChannelManager.current].
31+
///
32+
/// This is zone-scoped, so [this] will be the current configuration in any
33+
/// asynchronous callbacks transitively created by [body].
34+
T asCurrent<T>(T body()) => runZoned(body, zoneValues: {_currentKey: this});
35+
36+
/// Creates a connection to the test runnner's channel with the given [name].
37+
StreamChannel connectOut(String name) {
38+
if (_incomingConnections.containsKey(name)) {
39+
return _incomingConnections[name];
40+
} else if (_names.contains(name)) {
41+
throw new StateError('Duplicate suiteChannel() connection "$name".');
42+
} else {
43+
_names.add(name);
44+
var completer = new StreamChannelCompleter();
45+
_outgoingConnections[name] = completer;
46+
return completer.channel;
47+
}
48+
}
49+
50+
/// Connects [channel] to this worker's channel with the given [name].
51+
void connectIn(String name, StreamChannel channel) {
52+
if (_outgoingConnections.containsKey(name)) {
53+
_outgoingConnections.remove(name).setChannel(channel);
54+
} else if (_incomingConnections.containsKey(name)) {
55+
throw new StateError(
56+
'Duplicate RunnerSuite.channel() connection "$name".');
57+
} else {
58+
_incomingConnections[name] = channel;
59+
}
60+
}
61+
}

0 commit comments

Comments
 (0)