Skip to content
This repository was archived by the owner on Jan 15, 2026. It is now read-only.

Commit 8b92f55

Browse files
douglas-raillard-armmarcbonnici
authored andcommitted
connection: Add BackgroundCommand.communicate()
Add a communicate() method in the style of Popen.communicate(). Unlike Popen.communicate, it will raise a CalledProcessError if the command exit with non-zero code.
1 parent ad5a97a commit 8b92f55

1 file changed

Lines changed: 121 additions & 0 deletions

File tree

devlib/connection.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import threading
2828
import time
2929
import logging
30+
import select
31+
import fcntl
3032

3133
from devlib.utils.misc import InitCheckpoint
3234

@@ -36,6 +38,24 @@
3638
def _kill_pgid_cmd(pgid, sig, busybox):
3739
return '{} kill -{} -{}'.format(busybox, sig.value, pgid)
3840

41+
def _popen_communicate(bg, popen, input, timeout):
42+
try:
43+
stdout, stderr = popen.communicate(input=input, timeout=timeout)
44+
except subprocess.TimeoutExpired:
45+
bg.cancel()
46+
raise
47+
48+
ret = popen.returncode
49+
if ret:
50+
raise subprocess.CalledProcessError(
51+
ret,
52+
popen.args,
53+
stdout,
54+
stderr,
55+
)
56+
else:
57+
return (stdout, stderr)
58+
3959

4060
class ConnectionBase(InitCheckpoint):
4161
"""
@@ -126,6 +146,21 @@ def wait(self):
126146
Block until the background command completes, and return its exit code.
127147
"""
128148

149+
def communicate(self, input=b'', timeout=None):
150+
"""
151+
Block until the background command completes while reading stdout and stderr.
152+
Return ``tuple(stdout, stderr)``. If the return code is non-zero,
153+
raises a :exc:`subprocess.CalledProcessError` exception.
154+
"""
155+
try:
156+
return self._communicate(input=input, timeout=timeout)
157+
finally:
158+
self.close()
159+
160+
@abstractmethod
161+
def _communicate(self, input, timeout):
162+
pass
163+
129164
@abstractmethod
130165
def poll(self):
131166
"""
@@ -214,6 +249,9 @@ def pid(self):
214249
def wait(self):
215250
return self.popen.wait()
216251

252+
def _communicate(self, input, timeout):
253+
return _popen_communicate(self, self.popen, input, timeout)
254+
217255
def poll(self):
218256
return self.popen.poll()
219257

@@ -273,6 +311,85 @@ def wait(self):
273311
self.redirect_thread.join()
274312
return status
275313

314+
def _communicate(self, input, timeout):
315+
stdout = self._stdout
316+
stderr = self._stderr
317+
stdin = self._stdin
318+
chan = self.chan
319+
320+
# For some reason, file descriptors in the read-list of select() can
321+
# still end up blocking in .read(), so make the non-blocking to avoid a
322+
# deadlock. Since _communicate() will consume all input and all output
323+
# until the command dies, we can do whatever we want with the pipe
324+
# without affecting external users.
325+
for s in (stdout, stderr):
326+
fcntl.fcntl(s.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
327+
328+
out = {stdout: [], stderr: []}
329+
ret = None
330+
can_send = True
331+
332+
select_timeout = 1
333+
if timeout is not None:
334+
select_timeout = min(select_timeout, 1)
335+
336+
def create_out():
337+
return (
338+
b''.join(out[stdout]),
339+
b''.join(out[stderr])
340+
)
341+
342+
start = monotonic()
343+
344+
while ret is None:
345+
# Even if ret is not None anymore, we need to drain the streams
346+
ret = self.poll()
347+
348+
if timeout is not None and ret is None and monotonic() - start >= timeout:
349+
self.cancel()
350+
_stdout, _stderr = create_out()
351+
raise subprocess.TimeoutExpired(self.cmd, timeout, _stdout, _stderr)
352+
353+
can_send &= (not chan.closed) & bool(input)
354+
wlist = [chan] if can_send else []
355+
356+
if can_send and chan.send_ready():
357+
try:
358+
n = chan.send(input)
359+
# stdin might have been closed already
360+
except OSError:
361+
can_send = False
362+
chan.shutdown_write()
363+
else:
364+
input = input[n:]
365+
if not input:
366+
# Send EOF on stdin
367+
chan.shutdown_write()
368+
369+
rs, ws, _ = select.select(
370+
[x for x in (stdout, stderr) if not x.closed],
371+
wlist,
372+
[],
373+
select_timeout,
374+
)
375+
376+
for r in rs:
377+
chunk = r.read()
378+
if chunk:
379+
out[r].append(chunk)
380+
381+
_stdout, _stderr = create_out()
382+
383+
if ret:
384+
raise subprocess.CalledProcessError(
385+
ret,
386+
self.cmd,
387+
_stdout,
388+
_stderr,
389+
)
390+
else:
391+
return (_stdout, _stderr)
392+
276393
def poll(self):
277394
# Wait for the redirection thread to finish, otherwise we would
278395
# indicate the caller that the command is finished and that the streams
@@ -356,6 +473,10 @@ def pid(self):
356473
def wait(self):
357474
return self.adb_popen.wait()
358475

476+
def _communicate(self, input, timeout):
477+
return _popen_communicate(self, self.adb_popen, input, timeout)
478+
479+
359480
def poll(self):
360481
return self.adb_popen.poll()
361482

0 commit comments

Comments
 (0)