diff --git a/click/testing.py b/click/testing.py index ae36d10144..44ad07dec7 100644 --- a/click/testing.py +++ b/click/testing.py @@ -73,12 +73,12 @@ def make_input_stream(input, charset): class Result(object): """Holds the captured result of an invoked CLI script.""" - def __init__(self, runner, output_bytes, exit_code, exception, + def __init__(self, runner, output_wrapper, exit_code, exception, exc_info=None): #: The runner that created the result self.runner = runner - #: The output as bytes. - self.output_bytes = output_bytes + #: The wrapped byte output streams. + self.output_wrapper = output_wrapper #: The exit code as integer. self.exit_code = exit_code #: The exception that happend if one did. @@ -88,9 +88,21 @@ def __init__(self, runner, output_bytes, exit_code, exception, @property def output(self): - """The output as unicode string.""" - return self.output_bytes.decode(self.runner.charset, 'replace') \ - .replace('\r\n', '\n') + """The combined output as unicode string.""" + return self.output_wrapper.getvalue().decode( + self.runner.charset, 'replace').replace('\r\n', '\n') + + @property + def stdout(self): + """The output on stdout as unicode string.""" + return self.output_wrapper.streams[0].getvalue().decode( + self.runner.charset, 'replace').replace('\r\n', '\n') + + @property + def stderr(self): + """The output on stderr as unicode string.""" + return self.output_wrapper.streams[1].getvalue().decode( + self.runner.charset, 'replace').replace('\r\n', '\n') def __repr__(self): return '' % ( @@ -163,17 +175,62 @@ def isolation(self, input=None, env=None, color=False): env = self.make_env(env) + class MultipleTextIOWrapper(object): + def __init__(self): + self.value = b'' + + def write(self, b): + self.value += b + + def getvalue(self): + return self.value + + # XXX: test + # def flush(self): + # for stream in self.streams: + # stream.flush() + if PY2: - sys.stdout = sys.stderr = bytes_output = StringIO() + class WrappedStringIO(object): + def __init__(self, wrapper): + self.wrapper = wrapper + self.stringIO = StringIO() + + def write(self, b): + self.wrapper.write(b) + self.stringIO.write(b) + + def flush(self): + # self.stringIO.flush() + pass + + output_wrapper = MultipleTextIOWrapper() + stdout = WrappedStringIO(output_wrapper) + stderr = WrappedStringIO(output_wrapper) + output_wrapper.streams = [stdout, stderr] + sys.stdout, sys.stderr = stdout, stderr + if self.echo_stdin: - input = EchoingStdin(input, bytes_output) + input = EchoingStdin(input, stdout) else: - bytes_output = io.BytesIO() + class WrappedBytesIO(io.BytesIO): + def __init__(self, wrapper): + self.wrapper = wrapper + + def write(self, b): + self.wrapper.write(b) + super(WrappedBytesIO, self).write(b) + + output_wrapper = MultipleTextIOWrapper() + stdout = WrappedBytesIO(output_wrapper) + stderr = WrappedBytesIO(output_wrapper) + output_wrapper.streams = [stdout, stderr] + sys.stdout = io.TextIOWrapper(stdout, encoding=self.charset) + sys.stderr = io.TextIOWrapper(stderr, encoding=self.charset) + if self.echo_stdin: - input = EchoingStdin(input, bytes_output) + input = EchoingStdin(input, output_wrapper.streams[0]) input = io.TextIOWrapper(input, encoding=self.charset) - sys.stdout = sys.stderr = io.TextIOWrapper( - bytes_output, encoding=self.charset) sys.stdin = input @@ -223,7 +280,7 @@ def should_strip_ansi(stream=None, color=None): pass else: os.environ[key] = value - yield bytes_output + yield output_wrapper finally: for key, value in iteritems(old_env): if value is None: @@ -307,10 +364,9 @@ def invoke(self, cli, args=None, input=None, env=None, exc_info = sys.exc_info() finally: sys.stdout.flush() - output = out.getvalue() return Result(runner=self, - output_bytes=output, + output_wrapper=out, exit_code=exit_code, exception=exception, exc_info=exc_info) diff --git a/tests/test_utils.py b/tests/test_utils.py index 88923adbc2..a73804264e 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -34,8 +34,9 @@ def test_echo(runner): @click.command() def cli(): click.echo(b'\xf6') + click.echo(b'\xf7', err=True) result = runner.invoke(cli, []) - assert result.output_bytes == b'\xf6\n' + assert result.output_wrapper.getvalue() == b'\xf6\n\xf7\n' # Ensure we do not strip for bytes. with runner.isolation() as out: