Skip to content
This repository was archived by the owner on Jan 15, 2026. It is now read-only.

Commit a05d6c3

Browse files
connection: Rework TransferManager
* Split TransferManager and TransferHandle: * TransferManager deals with the generic monitoring. To abort a transfer, it simply cancels the transfer and raises an exception from manage(). * TransferHandle provides a way for the manager to query the state of the transfer and cancel it. It is backend-specific. * Remove most of the state in TransferManager, along with the associated background command leak etc * Use a daemonic monitor thread to behave as excpected on interpreter shutdown. * Ensure a transfer manager _always_ exists. When no management is desired, a noop object is used. This avoids using a None sentinel, which is invariably mishandled by some code leading to crashes. * Try to merge more paths in the code to uncover as many issues as possible in testing. * Fix percentage for SSHTransferHandle (transferred / (remaining + transferred) instead of transferred / remaining) * Rename total_timeout TransferManager parameter and attribute to total_transfer_timeout to match the connection name parameter.
1 parent 38d3026 commit a05d6c3

4 files changed

Lines changed: 206 additions & 178 deletions

File tree

devlib/connection.py

Lines changed: 125 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,11 @@
1414
#
1515

1616
from abc import ABC, abstractmethod
17-
from contextlib import contextmanager
17+
from contextlib import contextmanager, nullcontext
1818
from datetime import datetime
1919
from functools import partial
2020
from weakref import WeakSet
2121
from shlex import quote
22-
from time import monotonic
2322
import os
2423
import signal
2524
import socket
@@ -61,12 +60,26 @@ class ConnectionBase(InitCheckpoint):
6160
"""
6261
Base class for all connections.
6362
"""
64-
def __init__(self):
63+
def __init__(
64+
self,
65+
poll_transfers=False,
66+
start_transfer_poll_delay=30,
67+
total_transfer_timeout=3600,
68+
transfer_poll_period=30,
69+
):
6570
self._current_bg_cmds = WeakSet()
6671
self._closed = False
6772
self._close_lock = threading.Lock()
6873
self.busybox = None
6974

75+
self.transfer_mgr = TransferManager(
76+
self,
77+
start_transfer_poll_delay=start_transfer_poll_delay,
78+
total_transfer_timeout=total_transfer_timeout,
79+
transfer_poll_period=transfer_poll_period,
80+
) if poll_transfers else NoopTransferManager()
81+
82+
7083
def cancel_running_command(self):
7184
bg_cmds = set(self._current_bg_cmds)
7285
for bg_cmd in bg_cmds:
@@ -500,161 +513,147 @@ def __exit__(self, *args, **kwargs):
500513
self.adb_popen.__exit__(*args, **kwargs)
501514

502515

503-
class TransferManagerBase(ABC):
516+
class TransferManager:
517+
def __init__(self, conn, transfer_poll_period=30, start_transfer_poll_delay=30, total_transfer_timeout=3600):
518+
self.conn = conn
519+
self.transfer_poll_period = transfer_poll_period
520+
self.total_transfer_timeout = total_transfer_timeout
521+
self.start_transfer_poll_delay = start_transfer_poll_delay
504522

505-
def _pull_dest_size(self, dest):
506-
if os.path.isdir(dest):
507-
return sum(
508-
os.stat(os.path.join(dirpath, f)).st_size
509-
for dirpath, _, fnames in os.walk(dest)
510-
for f in fnames
511-
)
512-
else:
513-
return os.stat(dest).st_size
523+
self.logger = logging.getLogger('FileTransfer')
514524

515-
def _push_dest_size(self, dest):
516-
cmd = '{} du -s {}'.format(quote(self.conn.busybox), quote(dest))
517-
out = self.conn.execute(cmd)
525+
@contextmanager
526+
def manage(self, sources, dest, direction, handle):
527+
excep = None
528+
stop_thread = threading.Event()
529+
530+
def monitor():
531+
nonlocal excep
532+
533+
def cancel(reason):
534+
self.logger.warning(
535+
f'Cancelling file transfer {sources} -> {dest} due to: {reason}'
536+
)
537+
handle.cancel()
538+
539+
start_t = time.monotonic()
540+
stop_thread.wait(self.start_transfer_poll_delay)
541+
while not stop_thread.wait(self.transfer_poll_period):
542+
if not handle.isactive():
543+
cancel(reason='transfer inactive')
544+
elif monotonic() - start_t > self.total_transfer_timeout:
545+
cancel(reason='transfer timed out')
546+
excep = TimeoutError(f'{direction}: {sources} -> {dest}')
547+
548+
m_thread = threading.Thread(target=monitor, daemon=True)
518549
try:
519-
return int(out.split()[0])
520-
except ValueError:
521-
return 0
550+
m_thread.start()
551+
yield self
552+
finally:
553+
stop_thread.set()
554+
m_thread.join()
555+
if excep is not None:
556+
raise excep
522557

523-
def __init__(self, conn, poll_period, start_transfer_poll_delay, total_timeout):
524-
self.conn = conn
525-
self.poll_period = poll_period
526-
self.total_timeout = total_timeout
527-
self.start_transfer_poll_delay = start_transfer_poll_delay
528558

529-
self.logger = logging.getLogger('FileTransfer')
530-
self.managing = threading.Event()
531-
self.transfer_started = threading.Event()
532-
self.transfer_completed = threading.Event()
533-
self.transfer_aborted = threading.Event()
559+
class NoopTransferManager:
560+
def manage(self, *args, **kwargs):
561+
return nullcontext(self)
534562

535-
self.monitor_thread = None
536-
self.sources = None
537-
self.dest = None
538-
self.direction = None
563+
564+
class TransferHandleBase(ABC):
565+
def __init__(self, manager):
566+
self.manager = manager
539567

540568
@abstractmethod
541-
def _cancel(self):
569+
def isactive(self):
542570
pass
543571

544-
def cancel(self, reason=None):
545-
msg = 'Cancelling file transfer {} -> {}'.format(self.sources, self.dest)
546-
if reason is not None:
547-
msg += ' due to \'{}\''.format(reason)
548-
self.logger.warning(msg)
549-
self.transfer_aborted.set()
550-
self._cancel()
551-
552572
@abstractmethod
553-
def isactive(self):
573+
def cancel(self):
554574
pass
555575

556-
@contextmanager
557-
def manage(self, sources, dest, direction):
558-
try:
559-
self.sources, self.dest, self.direction = sources, dest, direction
560-
m_thread = threading.Thread(target=self._monitor)
561576

562-
self.transfer_completed.clear()
563-
self.transfer_aborted.clear()
564-
self.transfer_started.set()
577+
class PopenTransferHandle(TransferHandleBase):
578+
def __init__(self, bg_cmd, dest, direction, *args, **kwargs):
579+
super().__init__(*args, **kwargs)
565580

566-
m_thread.start()
567-
yield self
568-
except BaseException:
569-
self.cancel(reason='exception during transfer')
570-
raise
571-
finally:
572-
self.transfer_completed.set()
573-
self.transfer_started.set()
574-
m_thread.join()
575-
self.transfer_started.clear()
576-
self.transfer_completed.clear()
577-
self.transfer_aborted.clear()
581+
if direction == 'push':
582+
sample_size = self._push_dest_size
583+
elif direction == 'pull':
584+
sample_size = self._pull_dest_size
585+
else:
586+
raise ValueError(f'Unknown direction: {direction}')
578587

579-
def _monitor(self):
580-
start_t = monotonic()
581-
self.transfer_completed.wait(self.start_transfer_poll_delay)
582-
while not self.transfer_completed.wait(self.poll_period):
583-
if not self.isactive():
584-
self.cancel(reason='transfer inactive')
585-
elif monotonic() - start_t > self.total_timeout:
586-
self.cancel(reason='transfer timed out')
588+
self.sample_size = lambda: sample_size(dest)
587589

590+
self.bg_cmd = bg_cmd
591+
self.last_sample = 0
588592

589-
class PopenTransferManager(TransferManagerBase):
593+
@staticmethod
594+
def _pull_dest_size(dest):
595+
if os.path.isdir(dest):
596+
return sum(
597+
os.stat(os.path.join(dirpath, f)).st_size
598+
for dirpath, _, fnames in os.walk(dest)
599+
for f in fnames
600+
)
601+
else:
602+
return os.stat(dest).st_size
590603

591-
def __init__(self, conn, poll_period=30, start_transfer_poll_delay=30, total_timeout=3600):
592-
super().__init__(conn, poll_period, start_transfer_poll_delay, total_timeout)
593-
self.transfer = None
594-
self.last_sample = None
604+
def _push_dest_size(self, dest):
605+
conn = self.manager.conn
606+
cmd = '{} du -s -- {}'.format(quote(conn.busybox), quote(dest))
607+
out = conn.execute(cmd)
608+
return int(out.split()[0])
595609

596-
def _cancel(self):
597-
if self.transfer:
598-
self.transfer.cancel()
599-
self.transfer = None
600-
self.last_sample = None
610+
def cancel(self):
611+
self.bg_cmd.cancel()
601612

602613
def isactive(self):
603-
size_fn = self._push_dest_size if self.direction == 'push' else self._pull_dest_size
604-
curr_size = size_fn(self.dest)
605-
self.logger.debug('Polled file transfer, destination size {}'.format(curr_size))
606-
active = True if self.last_sample is None else curr_size > self.last_sample
607-
self.last_sample = curr_size
608-
return active
614+
try:
615+
curr_size = self.sample_size()
616+
except Exception as e:
617+
self.logger.debug(f'File size polling failed: {e}')
618+
return True
619+
else:
620+
self.logger.debug(f'Polled file transfer, destination size: {curr_size}')
621+
if curr_size:
622+
active = curr_size > self.last_sample
623+
self.last_sample = curr_size
624+
return active
625+
# If the file is empty it will never grow in size, so we assume
626+
# everything is going well.
627+
else:
628+
return True
609629

610-
def set_transfer_and_wait(self, popen_bg_cmd):
611-
self.transfer = popen_bg_cmd
612-
self.last_sample = None
613-
ret = self.transfer.wait()
614630

615-
if ret and not self.transfer_aborted.is_set():
616-
raise subprocess.CalledProcessError(ret, self.transfer.popen.args)
617-
elif self.transfer_aborted.is_set():
618-
raise TimeoutError(self.transfer.popen.args)
631+
class SSHTransferHandle(TransferHandleBase):
619632

633+
def __init__(self, handle, *args, **kwargs):
634+
super().__init__(*args, **kwargs)
620635

621-
class SSHTransferManager(TransferManagerBase):
636+
# SFTPClient or SSHClient
637+
self.handle = handle
622638

623-
def __init__(self, conn, poll_period=30, start_transfer_poll_delay=30, total_timeout=3600):
624-
super().__init__(conn, poll_period, start_transfer_poll_delay, total_timeout)
625-
self.transferer = None
626639
self.progressed = False
627-
self.transferred = None
628-
self.to_transfer = None
640+
self.transferred = 0
641+
self.to_transfer = 0
629642

630-
def _cancel(self):
631-
self.transferer.close()
643+
def cancel(self):
644+
self.handle.close()
632645

633646
def isactive(self):
634647
progressed = self.progressed
635-
self.progressed = False
636-
msg = 'Polled transfer: {}% [{}B/{}B]'
637-
pc = format((self.transferred / self.to_transfer) * 100, '.2f')
638-
self.logger.debug(msg.format(pc, self.transferred, self.to_transfer))
648+
if progressed:
649+
self.progressed = False
650+
pc = (self.transferred / (self.transferred + self.to_transfer)) * 100
651+
self.logger.debug(
652+
f'Polled transfer: {pc:.2f}% [{self.transferred}B/{self.to_transfer}B]'
653+
)
639654
return progressed
640655

641-
@contextmanager
642-
def manage(self, sources, dest, direction, transferer):
643-
with super().manage(sources, dest, direction):
644-
try:
645-
self.progressed = False
646-
self.transferer = transferer # SFTPClient or SCPClient
647-
yield self
648-
except socket.error as e:
649-
if self.transfer_aborted.is_set():
650-
self.transfer_aborted.clear()
651-
method = 'SCP' if self.conn.use_scp else 'SFTP'
652-
raise TimeoutError('{} {}: {} -> {}'.format(method, self.direction, sources, self.dest))
653-
else:
654-
raise e
655-
656656
def progress_cb(self, to_transfer, transferred):
657-
if self.transfer_started.is_set():
658-
self.progressed = True
659-
self.transferred = transferred
660-
self.to_transfer = to_transfer
657+
self.progressed = True
658+
self.transferred = transferred
659+
self.to_transfer = to_transfer

devlib/target.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2940,7 +2940,7 @@ def __init__(self,
29402940
ssh_conn_params = ['host', 'username', 'password', 'keyfile',
29412941
'port', 'timeout', 'sudo_cmd',
29422942
'strict_host_check', 'use_scp',
2943-
'total_timeout', 'poll_transfers',
2943+
'total_transfer_timeout', 'poll_transfers',
29442944
'start_transfer_poll_delay']
29452945
self.ssh_connection_settings = {}
29462946
for setting in ssh_conn_params:

0 commit comments

Comments
 (0)