-
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathgit.py
More file actions
391 lines (354 loc) · 15.2 KB
/
git.py
File metadata and controls
391 lines (354 loc) · 15.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
from . import common
import functools
import os
import re
import subprocess
import sys
import typing
# Logger shared with the rest of the package (re-exported from ``common``).
logger = common.logger
# Optional default for ``git clone --depth``, read once at import time from
# the GIT_CLONE_DEPTH environment variable (None when unset). A per-source
# "depth" entry takes precedence over it in ``git_checkout``.
GIT_CLONE_DEPTH = os.getenv("GIT_CLONE_DEPTH")
class GitError(common.WCError):
    """Raised when a git command run by this module exits with an error."""
class GitWorkingCopy(common.BaseWorkingCopy):
"""The git working copy.

Clones, updates, switches branches/revisions and initialises submodules
for a single source definition by spawning ``git`` subprocesses.
Supports git 1.5 and 1.6+ in a single codebase; anything older is
rejected by ``git_version``.
"""
# Name of the remote all operations work against: it builds the remote
# branch prefix, is queried by ``matches`` and receives the push URL.
# TODO: make this configurable? It might not make sense however, as we
# should make master and a lot of other conventional stuff configurable
_upstream_name = "origin"
def __init__(self, source: typing.Dict[str, str]):
    """Locate git and normalise the revision/branch options of *source*.

    *source* is modified in place: ``revision`` is renamed to the canonical
    ``rev`` key, and a ``branch`` equal to ``"main"`` is dropped when a
    revision is given.

    Raises:
        ValueError: if both ``rev`` and ``revision`` are present.

    Exits the process with status 1 if a revision is combined with any
    other ``branch`` entry.
    """
    self.git_executable = common.which("git")

    if "rev" in source and "revision" in source:
        raise ValueError(
            "The source definition of '%s' contains "
            "duplicate revision options." % source["name"]
        )

    if "revision" in source:
        # Fold the long spelling onto the canonical 'rev' key.
        source["rev"] = source.pop("revision")

    if "rev" in source:
        if source.get("branch") == "main":
            # "main" is treated as the default value, not as a real
            # request for a branch, so it may coexist with a revision.
            del source["branch"]
        elif "branch" in source:
            logger.error(
                "Cannot specify both branch (%s) and rev/revision "
                "(%s) in source for %s",
                source["branch"],
                source["rev"],
                source["name"],
            )
            sys.exit(1)

    super().__init__(source)
def git_version(self) -> typing.Tuple[int, ...]:
    """Return the version of the git executable as a tuple of ints.

    ``git --version`` is run at most once per instance; the parsed result
    is stored on the instance. (A ``functools.lru_cache`` on the method
    would key on ``self``, keeping every instance alive in a shared cache
    and requiring instances to be hashable.)

    Returns:
        Two to four integers, e.g. ``(2, 39, 2)``.

    Exits the process with status 1 when the command fails, its output
    cannot be parsed, or the version is older than 1.5.
    """
    cached = getattr(self, "_git_version_cache", None)
    if cached is not None:
        return cached

    cmd = self.run_git(["--version"])
    stdout, stderr = cmd.communicate()
    if cmd.returncode != 0:
        logger.error("Could not determine git version")
        logger.error(f"'git --version' output was:\n{stdout}\n{stderr}")
        sys.exit(1)

    m = re.search(r"git version (\d+)\.(\d+)(\.\d+)?(\.\d+)?", stdout)
    if m is None:
        logger.error("Unable to parse git version output")
        logger.error(f"'git --version' output was:\n{stdout}\n{stderr}")
        sys.exit(1)

    # The optional third and fourth groups carry their leading dot and are
    # None when absent; the fourth can only match if the third did.
    version = tuple(
        int(part.lstrip(".")) for part in m.groups() if part is not None
    )
    if version < (1, 5):
        logger.error(
            "Git version %s is unsupported, please upgrade",
            ".".join([str(v) for v in version]),
        )
        sys.exit(1)

    self._git_version_cache = version
    return version
@property
def _remote_branch_prefix(self):
    """Prefix under which branches of the upstream remote are referenced.

    It is the bare remote name for git older than 1.6.3 and
    ``remotes/<remote>`` otherwise. The value is used to recognise remote
    branches in ``git branch -a`` output and to name the merge source.
    """
    legacy_naming = self.git_version() < (1, 6, 3)
    upstream = self._upstream_name
    return upstream if legacy_naming else "remotes/%s" % upstream
def run_git(self, commands: typing.List[str], **kwargs) -> subprocess.Popen:
    """Start ``git`` with the given arguments and return the process.

    Args:
        commands: git arguments without the executable, e.g.
            ``["branch", "-a"]``. The list is not modified.
        **kwargs: extra ``subprocess.Popen`` options such as ``cwd``.
            ``stdout``, ``stderr``, ``bufsize`` and ``universal_newlines``
            are always overridden.

    Returns:
        The running ``Popen`` object; both pipes are opened in text mode,
        so ``communicate()`` yields ``str``.
    """
    # Build a fresh argv instead of inserting into the caller's list, so a
    # list that is reused or inspected after the call is left untouched.
    argv = [self.git_executable, *commands]
    kwargs["stdout"] = subprocess.PIPE
    kwargs["stderr"] = subprocess.PIPE
    # This should ease things up when multiple processes are trying to send
    # back to the main one large chunks of output
    kwargs["bufsize"] = -1
    kwargs["universal_newlines"] = True
    return subprocess.Popen(argv, **kwargs)
def git_merge_rbranch(
    self, stdout_in: str, stderr_in: str, accept_missing: bool = False
) -> typing.Tuple[str, str]:
    """Merge the upstream counterpart of the configured branch.

    The branch (``"master"`` when the source names none) must exist
    locally according to ``git branch -a``. If it does not, the method
    returns early when *accept_missing* is true and otherwise exits the
    process with status 1.

    Returns:
        ``(stdout, stderr)``: the incoming text extended with the output of
        the commands that were run.

    Raises:
        GitError: if ``git branch -a`` or ``git merge`` fails.
    """
    repo_dir = self.source["path"]
    branch = self.source.get("branch", "master")

    listing = self.run_git(["branch", "-a"], cwd=repo_dir)
    listed_out, listed_err = listing.communicate()
    if listing.returncode != 0:
        raise GitError("'git branch -a' failed.\n%s" % listed_err)
    out_so_far = stdout_in + listed_out
    err_so_far = stderr_in + listed_err

    local_branch = re.compile(r"^(\*| ) %s$" % re.escape(branch), re.M)
    if local_branch.search(listed_out) is None:
        # The branch is not local. We should not have reached this, unless
        # no branch was specified and the "master" guess was wrong.
        if not accept_missing:
            logger.error("No such branch %r", branch)
            sys.exit(1)
        logger.info("No such branch %r", branch)
        return (out_so_far, err_so_far)

    remote_ref = f"{self._remote_branch_prefix}/{branch}"
    merge = self.run_git(["merge", remote_ref], cwd=repo_dir)
    merged_out, merged_err = merge.communicate()
    if merge.returncode != 0:
        raise GitError(
            f"git merge of remote branch 'origin/{branch}' failed.\n{merged_err}"
        )
    return out_so_far + merged_out, err_so_far + merged_err
def git_checkout(self, **kwargs) -> typing.Union[str, None]:
    """Clone the source into its path unless that path already exists.

    After cloning, the configured revision is checked out, the push URL is
    set and submodules are initialised, each only when the source (or the
    ``submodules`` keyword) asks for it.

    Keyword Args:
        submodules: fallback submodule mode when the source has no
            ``submodules`` entry. It is looked up unconditionally, so it
            must be supplied.
        verbose: when true, return git's collected stdout.

    Returns:
        The collected stdout if ``verbose`` is set, otherwise ``None``.

    Raises:
        GitError: if ``git clone`` fails.
    """
    source = self.source
    name = source["name"]
    path = str(source["path"])
    url = source["url"]

    if os.path.exists(path):
        self.output(
            (logger.info, "Skipped cloning of existing package '%s'." % name)
        )
        return None

    # The message is emitted before the clone is actually attempted.
    announcement = ["Cloned '%s' with git" % name]
    if "branch" in source:
        announcement.append(" using branch '%s'" % source["branch"])
    announcement.append(" from '%s'." % url)
    self.output((logger.info, "".join(announcement)))

    submodule_mode = source.get("submodules", kwargs["submodules"])

    clone_args = ["clone", "--quiet"]
    if submodule_mode == "recursive":
        clone_args.append("--recurse-submodules")
    if "depth" in source or GIT_CLONE_DEPTH:
        # A per-source depth wins over the environment default.
        clone_args += ["--depth", source.get("depth", GIT_CLONE_DEPTH)]
    if "branch" in source:
        clone_args += ["-b", source["branch"]]
    clone_args += [url, path]

    clone = self.run_git(clone_args)
    out, err = clone.communicate()
    if clone.returncode != 0:
        raise GitError(f"git cloning of '{name}' failed.\n{err}")

    if "rev" in source:
        out, err = self.git_switch_branch(out, err)
    if "pushurl" in source:
        out, err = self.git_set_pushurl(out, err)

    if submodule_mode in ("always", "checkout"):
        out, err, registered = self.git_init_submodules(out, err)
        # Update only new submodules that we just registered. This is for
        # safety reasons, as git submodule update on modified submodules
        # may cause code loss.
        for submodule in registered:
            out, err = self.git_update_submodules(out, err, submodule=submodule)
            self.output(
                (
                    logger.info,
                    "Initialized '%s' submodule at '%s' with git."
                    % (name, submodule),
                )
            )

    return out if kwargs.get("verbose", False) else None
def git_switch_branch(
    self, stdout_in: str, stderr_in: str, accept_missing: bool = False
) -> typing.Tuple[str, str]:
    """Check out the configured revision or branch.

    A ``rev`` entry in the source wins. Otherwise the branch (``"master"``
    when none is configured) is checked out directly if it exists locally,
    or created from the upstream branch if it only exists there.

    If accept_missing is True, we do not switch the branch if it
    is not there. Useful for switching back to master. Without it a
    missing branch exits the process with status 1.

    Returns:
        ``(stdout, stderr)``: the incoming text extended with the output of
        each command that was run, each appended exactly once.

    Raises:
        GitError: if ``git branch -a`` or ``git checkout`` fails.
    """
    path = self.source["path"]
    branch = self.source.get("branch", "master")
    rbp = self._remote_branch_prefix
    cmd = self.run_git(["branch", "-a"], cwd=path)
    stdout, stderr = cmd.communicate()
    if cmd.returncode != 0:
        raise GitError("'git branch -a' failed.\n%s" % stderr)
    stdout_in += stdout
    stderr_in += stderr

    # What is being checked out, for the failure message below.
    target = f"branch '{branch}'"
    if "rev" in self.source:
        # A tag or revision was specified instead of a branch
        rev = self.source["rev"]
        target = f"rev '{rev}'"
        argv = ["checkout", rev]
        self.output((logger.info, "Switching to rev '%s'." % rev))
    elif re.search(r"^(\*| ) %s$" % re.escape(branch), stdout, re.M):
        # the branch is local, normal checkout will work
        argv = ["checkout", branch]
        self.output((logger.info, f"Switching to branch '{branch}'."))
    elif re.search(
        "^  " + re.escape(rbp) + r"\/" + re.escape(branch) + "$", stdout, re.M
    ):
        # the branch is not local, normal checkout won't work here
        rbranch = f"{rbp}/{branch}"
        argv = ["checkout", "-b", branch, rbranch]
        self.output((logger.info, f"Switching to remote branch '{branch}'."))
    elif accept_missing:
        self.output((logger.info, f"No such branch {branch}"))
        # The branch listing was already folded into the accumulators
        # above; appending it again would duplicate it in the output.
        return (stdout_in, stderr_in)
    else:
        self.output((logger.error, f"No such branch {branch}"))
        sys.exit(1)

    # runs the checkout with predetermined arguments
    cmd = self.run_git(argv, cwd=path)
    stdout, stderr = cmd.communicate()
    if cmd.returncode != 0:
        raise GitError(f"git checkout of {target} failed.\n{stderr}")
    return (stdout_in + stdout, stderr_in + stderr)
def git_update(self, **kwargs) -> typing.Union[str, None]:
    """Fetch from the remote and bring the working copy up to date.

    After ``git fetch`` the configured revision is checked out, or the
    configured branch is checked out and merged with its upstream branch.
    With neither configured, the default branch is tried leniently.
    Newly registered submodules are then initialised if requested.

    Keyword Args:
        submodules: fallback submodule mode when the source has no
            ``submodules`` entry. It is looked up unconditionally, so it
            must be supplied.
        verbose: when true, return git's collected stdout.

    Returns:
        The collected stdout if ``verbose`` is set, otherwise ``None``.

    Raises:
        GitError: if ``git fetch`` fails.
    """
    source = self.source
    name = source["name"]
    path = source["path"]
    # The message is emitted before any git command has run.
    self.output((logger.info, "Updated '%s' with git." % name))

    submodule_mode = source.get("submodules", kwargs["submodules"])

    # First we fetch. This should always be possible.
    fetch_args = ["fetch"]
    if submodule_mode == "recursive":
        fetch_args.append("--recurse-submodules")
    fetch = self.run_git(fetch_args, cwd=path)
    out, err = fetch.communicate()
    if fetch.returncode != 0:
        raise GitError(f"git fetch of '{name}' failed.\n{err}")

    if "rev" in source:
        out, err = self.git_switch_branch(out, err)
    elif "branch" not in source:
        # We may have specified a branch previously but not anymore.
        # In that case, we want to revert to master.
        out, err = self.git_switch_branch(out, err, accept_missing=True)
        out, err = self.git_merge_rbranch(out, err, accept_missing=True)
    else:
        out, err = self.git_switch_branch(out, err)
        out, err = self.git_merge_rbranch(out, err)

    if submodule_mode in ("always", "recursive"):
        recurse = submodule_mode == "recursive"
        out, err, registered = self.git_init_submodules(out, err)
        # Update only new submodules that we just registered. This is for
        # safety reasons, as git submodule update on modified submodules
        # may cause code loss.
        for submodule in registered:
            out, err = self.git_update_submodules(
                out,
                err,
                submodule=submodule,
                recursive=recurse,
            )
            self.output(
                (
                    logger.info,
                    "Initialized '%s' submodule at '%s' with git."
                    % (name, submodule),
                )
            )

    return out if kwargs.get("verbose", False) else None
def checkout(self, **kwargs) -> typing.Union[str, None]:
    """Clone the package, update it, or report why nothing was done.

    A missing path is cloned via ``git_checkout``. An existing one is
    updated when ``should_update`` says so. Otherwise a message is emitted:
    informational if the remote URL matches the source, a warning if not.

    Returns:
        Whatever ``git_checkout`` / ``update`` return, or ``None`` when the
        existing checkout is left alone.
    """
    name = self.source["name"]
    path = self.source["path"]
    wants_update = self.should_update(**kwargs)

    if not os.path.exists(path):
        return self.git_checkout(**kwargs)
    if wants_update:
        return self.update(**kwargs)

    if self.matches():
        notice = (logger.info, "Skipped checkout of existing package '%s'." % name)
    else:
        notice = (
            logger.warning,
            "Checkout URL for existing package '%s' differs. Expected '%s'."
            % (name, self.source["url"]),
        )
    self.output(notice)
    return None
def status(self, **kwargs) -> typing.Union[typing.Tuple[str, str], str]:
    """Classify the working copy as ``"clean"``, ``"ahead"`` or ``"dirty"``.

    The verdict is derived from ``git status -s -b``: a single output line
    means no changed files, and that line mentioning "ahead" yields
    ``"ahead"``; more than one line means ``"dirty"``. The command's exit
    status is not inspected.

    Returns:
        The state, or ``(state, raw_stdout)`` when ``verbose`` is true.
    """
    proc = self.run_git(["status", "-s", "-b"], cwd=self.source["path"])
    stdout, _ = proc.communicate()
    report = stdout.strip().split("\n")

    if len(report) > 1:
        state = "dirty"
    elif "ahead" in report[0]:
        state = "ahead"
    else:
        state = "clean"

    return (state, stdout) if kwargs.get("verbose", False) else state
def matches(self) -> bool:
    """Tell whether the upstream remote of the checkout uses the source URL.

    Returns:
        True if the configured ``url`` appears as a whitespace-separated
        token in the output of ``git remote show -n <upstream>``.

    Raises:
        GitError: if the git command fails.
    """
    name = self.source["name"]
    repo_dir = self.source["path"]
    # -n makes git report cached information instead of querying the remote.
    show = self.run_git(
        ["remote", "show", "-n", self._upstream_name], cwd=repo_dir
    )
    stdout, stderr = show.communicate()
    if show.returncode != 0:
        raise GitError(f"git remote of '{name}' failed.\n{stderr}")
    reported_tokens = stdout.split()
    return self.source["url"] in reported_tokens
def update(self, **kwargs) -> typing.Union[str, None]:
    """Update the checkout unless it has local modifications.

    Keyword Args:
        force: when true, update even if ``status`` is not ``"clean"``.
        Remaining keywords are passed on to ``git_update``.

    Returns:
        The result of ``git_update``.

    Raises:
        GitError: if the working copy is not clean and ``force`` is unset.
    """
    name = self.source["name"]

    if not self.matches():
        # NOTE(review): the message says the update cannot happen, yet
        # execution continues and the update is attempted anyway. Confirm
        # whether this should return early or the wording should change.
        self.output(
            (
                logger.warning,
                "Can't update package '%s' because its URL doesn't match." % name,
            )
        )

    has_local_changes = self.status() != "clean"
    if has_local_changes and not kwargs.get("force", False):
        raise GitError("Can't update package '%s' because it's dirty." % name)

    return self.git_update(**kwargs)
def git_set_pushurl(self, stdout_in, stderr_in) -> typing.Tuple[str, str]:
    """Store the source's ``pushurl`` as push URL of the upstream remote.

    Runs ``git config remote.<upstream>.pushurl <pushurl>`` inside the
    working copy.

    Returns:
        ``(stdout, stderr)``: the incoming text extended with the command's
        output.

    Raises:
        GitError: if ``git config`` fails; the message includes git's
            stderr so the cause is visible.
    """
    cmd = self.run_git(
        [
            "config",
            "remote.%s.pushurl" % self._upstream_name,
            self.source["pushurl"],
        ],
        cwd=self.source["path"],
    )
    stdout, stderr = cmd.communicate()
    if cmd.returncode != 0:
        # Append stderr like the other git failures in this module do; the
        # message used to end right after "failed.\n".
        raise GitError(
            "git config remote.%s.pushurl %s \nfailed.\n%s"
            % (self._upstream_name, self.source["pushurl"], stderr)
        )
    return (stdout_in + stdout, stderr_in + stderr)
def git_init_submodules(
    self, stdout_in, stderr_in
) -> typing.Tuple[str, str, typing.List]:
    """Run ``git submodule init`` and report what it newly registered.

    Returns:
        ``(stdout, stderr, initialized)``: the incoming text extended with
        the command's output, plus the quoted identifiers that precede a
        parenthesised part in git's report. The report is read from stdout,
        or from stderr when stdout is empty.

    Raises:
        GitError: if the command fails; the message includes git's stderr.
    """
    cmd = self.run_git(["submodule", "init"], cwd=self.source["path"])
    stdout, stderr = cmd.communicate()
    if cmd.returncode != 0:
        # Append stderr like the other git failures in this module do; the
        # message used to end right after "failed.\n".
        raise GitError("git submodule init failed.\n%s" % stderr)
    # Fall back to stderr when stdout carries no report.
    output = stdout or stderr
    initialized_submodules = re.findall(r'\s+[\'"](.*?)[\'"]\s+\(.+\)', output)
    return (stdout_in + stdout, stderr_in + stderr, initialized_submodules)
def git_update_submodules(
    self, stdout_in, stderr_in, submodule="all", recursive: bool = False
) -> typing.Tuple[str, str]:
    """Run ``git submodule update`` for one submodule or for all of them.

    Args:
        stdout_in: stdout collected so far.
        stderr_in: stderr collected so far.
        submodule: the submodule to update; ``"all"`` leaves the command
            unrestricted.
        recursive: add ``--init --recursive`` to the command.

    Returns:
        ``(stdout, stderr)``: the incoming text extended with the command's
        output.

    Raises:
        GitError: if the command fails; the message includes git's stderr.
    """
    params = ["submodule", "update"]
    if recursive:
        params.extend(["--init", "--recursive"])
    if submodule != "all":
        params.append(submodule)
    cmd = self.run_git(params, cwd=self.source["path"])
    stdout, stderr = cmd.communicate()
    if cmd.returncode != 0:
        # Append stderr like the other git failures in this module do; the
        # message used to end right after "failed.\n".
        raise GitError("git submodule update failed.\n%s" % stderr)
    return (stdout_in + stdout, stderr_in + stderr)