-
Notifications
You must be signed in to change notification settings - Fork 221
Expand file tree
/
Copy pathborg_job.py
More file actions
373 lines (311 loc) · 14.5 KB
/
borg_job.py
File metadata and controls
373 lines (311 loc) · 14.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
import json
import logging
import os
import select
import shlex
import shutil
import signal
import sys
import time
from collections import namedtuple
from datetime import datetime as dt
from subprocess import PIPE, Popen, TimeoutExpired
from threading import Lock
from PyQt6 import QtCore
from PyQt6.QtWidgets import QApplication
from vorta import application
from vorta.borg.jobs_manager import JobInterface
from vorta.i18n import trans_late, translate
from vorta.inhibitor.abc import Inhibitor
from vorta.keyring.abc import VortaKeyring
from vorta.keyring.db import VortaDBKeyring
from vorta.store.models import BackupProfileMixin, EventLogModel
from vorta.utils import borg_compat, pretty_bytes
keyring_lock = Lock()
db_lock = Lock()
logger = logging.getLogger(__name__)
FakeRepo = namedtuple('Repo', ['url', 'name', 'id', 'extra_borg_arguments', 'encryption'])
FakeProfile = namedtuple('FakeProfile', ['id', 'repo', 'name', 'ssh_key'])
"""
All methods in this class must be thread safe. Particularly,
I strongly unadvised global variable and class variables.
Sqlite access are thread-safe because peewee is thread-safe.
The method prepare is not thread-safe because of keyring and I don't know why. That's why I added a
temporary mutex.
"""
class BorgJob(JobInterface, BackupProfileMixin):
"""
Base class to run `borg` command line jobs. If a command needs more pre- or post-processing
it should subclass `BorgJob`.
"""
updated = QtCore.pyqtSignal(str)
result = QtCore.pyqtSignal(dict)
keyring = None # Store keyring to minimize imports
def __init__(self, cmd, params, site="default"):
"""
Thread to run Borg operations in.
:param cmd: Borg command line
:param params: Pass options that were used to build cmd and may be needed to
process the result.
:param site: For scheduler. Only one job can run per site at one time. Site is
usually the repository ID, or 'default' for misc Borg commands.
"""
super().__init__()
self.site_id = site
self.app: application.VortaApp = QApplication.instance()
# Default state is should_inhibit false. Subclasses can set should_inhibit in their own
# constructors
self.should_inhibit = False
# Declare labels here for translation
self.category_label = {
"files": trans_late("BorgJob", "Files"),
"original": trans_late("BorgJob", "Original"),
"deduplicated": trans_late("BorgJob", "Deduplicated"),
"compressed": trans_late("BorgJob", "Compressed"),
}
cmd[0] = self.prepare_bin()
# Add extra Borg args to command. Never pass None.
extra_args_str = params.get('extra_borg_arguments')
if extra_args_str is not None and len(extra_args_str) > 0:
extra_args = shlex.split(extra_args_str)
cmd = cmd[:2] + extra_args + cmd[2:]
env = os.environ.copy()
env['BORG_HOSTNAME_IS_UNIQUE'] = '1'
env['BORG_RELOCATED_REPO_ACCESS_IS_OK'] = '1'
env['BORG_RSH'] = 'ssh'
if 'additional_env' in params:
env = {**env, **params['additional_env']}
password = params.get('password')
if password is not None:
env['BORG_PASSPHRASE'] = password
else:
env['BORG_PASSPHRASE'] = '9999999' # Set dummy password to avoid prompt.
if env.get('BORG_PASSCOMMAND', False):
env.pop('BORG_PASSPHRASE', None) # Unset passphrase
ssh_key = params.get('ssh_key')
if ssh_key is not None:
ssh_key_path = os.path.expanduser(f'~/.ssh/{ssh_key}')
env['BORG_RSH'] += f' -i {ssh_key_path}'
self.env = env
self.cmd = cmd
self.cwd = params.get('cwd', None)
self.params = params
self.process = None
self.cleanup_files = params.get('cleanup_files', [])
def repo_id(self):
return self.site_id
def cancel(self):
logger.debug("Cancel job on site %s", self.site_id)
if self.process is not None:
self.process.send_signal(signal.SIGINT)
try:
self.process.wait(timeout=3)
except TimeoutExpired:
try:
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
except ProcessLookupError:
pass
@classmethod
def prepare(cls, profile):
"""
Prepare for running Borg. This function in the base class should be called from all
subclasses and calls that define their own `cmd`.
The `prepare()` step does these things:
- validate if all conditions to run command are met
- build borg command
`prepare()` is run 2x. First at the global level and then for each subcommand.
:return: dict(ok: book, message: str)
"""
ret = {'ok': False}
if cls.prepare_bin() is None:
ret['message'] = trans_late('messages', 'Borg binary was not found.')
return ret
if profile.repo is None:
ret['message'] = trans_late('messages', 'Select a backup repository first.')
return ret
if not borg_compat.check('JSON_LOG'):
ret['message'] = trans_late('messages', 'Your Borg version is too old. >=1.1.0 is required.')
return ret
# Try to get password from chosen keyring backend.
with keyring_lock:
cls.keyring = VortaKeyring.get_keyring()
logger.debug("Using %s keyring to store passwords.", cls.keyring.__class__.__name__)
ret['password'] = cls.keyring.get_password('vorta-repo', profile.repo.url)
# Check if keyring is locked
if profile.repo.encryption != 'none' and not cls.keyring.is_unlocked:
ret['message'] = trans_late(
'messages',
'Please unlock your system password manager or disable it under Misc',
)
return ret
# Try to fall back to DB Keyring, if we use the system keychain.
if ret['password'] is None and cls.keyring.is_system:
logger.debug('Password not found in primary keyring. Falling back to VortaDBKeyring.')
ret['password'] = VortaDBKeyring().get_password('vorta-repo', profile.repo.url)
# Give warning and continue if password is found there.
if ret['password'] is not None:
logger.warning(
'Found password in database, but secure storage was available. '
'Consider re-adding the repo to use it.'
)
# Password is required for encryption, cannot continue
if ret['password'] is None and not isinstance(profile.repo, FakeRepo) and profile.repo.encryption != 'none':
ret['message'] = trans_late(
'messages',
"Your repo passphrase was stored in a password manager which is no longer available.\n"
"Try unlinking and re-adding your repo.",
)
return ret
ret['ssh_key'] = profile.ssh_key
ret['repo_id'] = profile.repo.id
ret['repo_url'] = profile.repo.url
ret['repo_name'] = profile.repo.name
ret['extra_borg_arguments'] = profile.repo.extra_borg_arguments
ret['profile_name'] = profile.name
ret['profile_id'] = profile.id
ret['ok'] = True
ret['cleanup_files'] = []
return ret
@classmethod
def prepare_bin(cls):
"""Find packaged borg binary. Prefer globally installed."""
# On MacOS, the PATH environment variable does not seem to be set when run as a pyinstaller binary.
# More info at https://github.com/borgbase/vorta/issues/2100
# Set the path to also find homebrew installs of Borg, and avoid falling back to the embedded binary.
if sys.platform == 'darwin':
current_path = os.environ.get("PATH", "/usr/bin:/bin")
os.environ["PATH"] = f"{current_path}:/opt/homebrew/bin:/usr/local/bin"
# Now continue looking for the borg binary to use
borg_in_path = shutil.which('borg')
if borg_in_path:
return borg_in_path
elif sys.platform == 'darwin':
# macOS: Look in pyinstaller bundle
from Foundation import NSBundle
mainBundle = NSBundle.mainBundle()
bundled_borg = os.path.join(mainBundle.bundlePath(), 'Contents', 'Resources', 'borg-dir', 'borg.exe')
if os.path.isfile(bundled_borg):
return bundled_borg
return None
def run(self):
self.started_event()
with db_lock:
log_entry = EventLogModel(
category=self.params.get('category', 'user'),
subcommand=self.cmd[1],
profile=self.params.get('profile_id', None),
)
log_entry.save()
# logs: put cmd arguments with special strings in quotation marks
quote_strings = [' ', '*', '?', 're:']
cmd_args_to_log = self.cmd[:]
for i, arg in enumerate(cmd_args_to_log):
if any(quotestr in arg for quotestr in quote_strings):
cmd_args_to_log[i] = "'" + arg + "'" # add quotes
logger.info('Running command: %s', ' '.join(cmd_args_to_log))
del cmd_args_to_log
p = Popen(
self.cmd,
stdout=PIPE,
stderr=PIPE,
bufsize=1,
universal_newlines=True,
env=self.env,
cwd=self.cwd,
start_new_session=True,
)
error_messages = [] # List of error messages included in the result
self.process = p
# Prevent blocking of stdout/err. Via https://stackoverflow.com/a/7730201/3983708
os.set_blocking(p.stdout.fileno(), False)
os.set_blocking(p.stderr.fileno(), False)
def read_async(fd):
try:
return fd.read()
except (IOError, TypeError):
return ''
with self.get_inhibitor():
stdout = []
while True:
# Wait for new output
select.select([p.stdout, p.stderr], [], [], 0.1)
stdout.append(read_async(p.stdout))
stderr = read_async(p.stderr)
if stderr:
for line in stderr.split('\n'):
try:
parsed = json.loads(line)
if parsed['type'] == 'log_message':
context = {
'msgid': parsed.get('msgid'),
'repo_url': self.params['repo_url'],
'profile_name': self.params.get('profile_name'),
'cmd': self.params['cmd'][1],
}
self.app.backup_log_event.emit(
f'[{self.params["profile_name"]}] {parsed["levelname"]}: {parsed["message"]}',
context,
)
level_int = getattr(logging, parsed["levelname"])
logger.log(level_int, parsed["message"])
if level_int >= logging.WARNING:
# Append log to list of error messages
error_messages.append((level_int, parsed["message"]))
elif parsed['type'] == 'file_status':
self.app.backup_log_event.emit(
f'[{self.params["profile_name"]}] {parsed["path"]} ({parsed["status"]})', {}
)
elif parsed['type'] == 'progress_percent' and parsed.get("message"):
self.app.backup_log_event.emit(
f'[{self.params["profile_name"]}] {parsed["message"]}', {}
)
elif parsed['type'] == 'archive_progress' and not parsed.get('finished', False):
msg = (
f"{translate('BorgJob', 'Files')}: {parsed['nfiles']}, "
f"{translate('BorgJob', 'Original')}: {pretty_bytes(parsed['original_size'])}, "
# f"{translate('BorgJob','Compressed')}: {pretty_bytes(parsed['compressed_size'])}, " # noqa: E501
f"{translate('BorgJob', 'Deduplicated')}: {pretty_bytes(parsed.get('deduplicated_size', 0))}" # noqa: E501
)
self.app.backup_progress_event.emit(f"[{self.params['profile_name']}] {msg}")
except json.decoder.JSONDecodeError:
msg = line.strip()
if msg: # Log only if there is something to log.
self.app.backup_log_event.emit(f'[{self.params["profile_name"]}] {msg}', {})
logger.warning(msg)
if p.poll() is not None:
time.sleep(0.1)
stdout.append(read_async(p.stdout))
break
result = {
'params': self.params,
'returncode': self.process.returncode,
'cmd': self.cmd,
'errors': error_messages,
}
stdout = ''.join(stdout)
try:
result['data'] = json.loads(stdout)
except ValueError:
result['data'] = stdout
log_entry.returncode = p.returncode
log_entry.repo_url = self.params.get('repo_url', None)
log_entry.end_time = dt.now()
with db_lock:
log_entry.save()
self.process_result(result)
self.finished_event(result)
for tmpfile in self.cleanup_files:
tmpfile.close()
def process_result(self, result):
pass
def started_event(self):
self.updated.emit(self.tr('Task started'))
def finished_event(self, result):
self.result.emit(result)
def get_inhibitor(self) -> Inhibitor:
if self.should_inhibit:
logger.debug("job will inhibit")
return Inhibitor.get_inhibitor(f"borg {self.cmd[1]}")
else:
logger.debug("job will not inhibit")
return Inhibitor.get_noop_inhibitor()