Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 71 additions & 15 deletions click/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ def make_input_stream(input, charset):
class Result(object):
"""Holds the captured result of an invoked CLI script."""

def __init__(self, runner, output_bytes, exit_code, exception,
def __init__(self, runner, output_wrapper, exit_code, exception,
exc_info=None):
#: The runner that created the result
self.runner = runner
#: The output as bytes.
self.output_bytes = output_bytes
#: The wrapped byte output streams.
self.output_wrapper = output_wrapper
#: The exit code as integer.
self.exit_code = exit_code
#: The exception that happend if one did.
Expand All @@ -88,9 +88,21 @@ def __init__(self, runner, output_bytes, exit_code, exception,

@property
def output(self):
"""The output as unicode string."""
return self.output_bytes.decode(self.runner.charset, 'replace') \
.replace('\r\n', '\n')
"""The combined output as unicode string."""
return self.output_wrapper.getvalue().decode(
self.runner.charset, 'replace').replace('\r\n', '\n')

@property
def stdout(self):
"""The output on stdout as unicode string."""
return self.output_wrapper.streams[0].getvalue().decode(
self.runner.charset, 'replace').replace('\r\n', '\n')

@property
def stderr(self):
"""The output on stderr as unicode string."""
return self.output_wrapper.streams[1].getvalue().decode(
self.runner.charset, 'replace').replace('\r\n', '\n')

def __repr__(self):
return '<Result %s>' % (
Expand Down Expand Up @@ -163,17 +175,62 @@ def isolation(self, input=None, env=None, color=False):

env = self.make_env(env)

class MultipleTextIOWrapper(object):
def __init__(self):
self.value = b''

def write(self, b):
self.value += b

def getvalue(self):
return self.value

# XXX: test
# def flush(self):
# for stream in self.streams:
# stream.flush()

if PY2:
sys.stdout = sys.stderr = bytes_output = StringIO()
class WrappedStringIO(object):
def __init__(self, wrapper):
self.wrapper = wrapper
self.stringIO = StringIO()

def write(self, b):
self.wrapper.write(b)
self.stringIO.write(b)

def flush(self):
# self.stringIO.flush()
pass

output_wrapper = MultipleTextIOWrapper()
stdout = WrappedStringIO(output_wrapper)
stderr = WrappedStringIO(output_wrapper)
output_wrapper.streams = [stdout, stderr]
sys.stdout, sys.stderr = stdout, stderr

if self.echo_stdin:
input = EchoingStdin(input, bytes_output)
input = EchoingStdin(input, stdout)
else:
bytes_output = io.BytesIO()
class WrappedBytesIO(io.BytesIO):
def __init__(self, wrapper):
self.wrapper = wrapper

def write(self, b):
self.wrapper.write(b)
super(WrappedBytesIO, self).write(b)

output_wrapper = MultipleTextIOWrapper()
stdout = WrappedBytesIO(output_wrapper)
stderr = WrappedBytesIO(output_wrapper)
output_wrapper.streams = [stdout, stderr]
sys.stdout = io.TextIOWrapper(stdout, encoding=self.charset)
sys.stderr = io.TextIOWrapper(stderr, encoding=self.charset)

if self.echo_stdin:
input = EchoingStdin(input, bytes_output)
input = EchoingStdin(input, output_wrapper.streams[0])
input = io.TextIOWrapper(input, encoding=self.charset)
sys.stdout = sys.stderr = io.TextIOWrapper(
bytes_output, encoding=self.charset)

sys.stdin = input

Expand Down Expand Up @@ -223,7 +280,7 @@ def should_strip_ansi(stream=None, color=None):
pass
else:
os.environ[key] = value
yield bytes_output
yield output_wrapper
finally:
for key, value in iteritems(old_env):
if value is None:
Expand Down Expand Up @@ -307,10 +364,9 @@ def invoke(self, cli, args=None, input=None, env=None,
exc_info = sys.exc_info()
finally:
sys.stdout.flush()
output = out.getvalue()

return Result(runner=self,
output_bytes=output,
output_wrapper=out,
exit_code=exit_code,
exception=exception,
exc_info=exc_info)
Expand Down
3 changes: 2 additions & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ def test_echo(runner):
@click.command()
def cli():
click.echo(b'\xf6')
click.echo(b'\xf7', err=True)
result = runner.invoke(cli, [])
assert result.output_bytes == b'\xf6\n'
assert result.output_wrapper.getvalue() == b'\xf6\n\xf7\n'

# Ensure we do not strip for bytes.
with runner.isolation() as out:
Expand Down