forked from xenserver/python-libs
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_cpio.py
More file actions
158 lines (136 loc) · 4.96 KB
/
test_cpio.py
File metadata and controls
158 lines (136 loc) · 4.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
from __future__ import print_function
import os
import shutil
import subprocess
import unittest
import warnings
from xcp.cpiofile import CpioFile, CpioInfo, CpioFileCompat, CPIO_PLAIN, CPIO_GZIPPED
try:
from hashlib import md5
except:
from md5 import md5
def writeRandomFile(fn, size, start='', add='a'):
f = open(fn, 'wb')
m = md5()
m.update(start)
assert(len(add) != 0)
while size > 0:
d = m.digest()
if size < len(d):
d=d[:size]
f.write(d)
size -= len(d)
m.update(add)
f.close()
def check_call(cmd):
r = subprocess.call(cmd, shell=True)
if r != 0:
raise Exception('error executing command')
class TestCpio(unittest.TestCase):
def setUp(self):
self.doXZ = False
self.md5data = ''
# create some archive from scratch
shutil.rmtree('archive', True)
os.mkdir('archive')
writeRandomFile('archive/data', 10491)
self.md5data = md5(open('archive/data').read()).hexdigest()
check_call("find archive | cpio -o -H newc > archive.cpio")
check_call("gzip -c < archive.cpio > archive.cpio.gz")
check_call("bzip2 -c < archive.cpio > archive.cpio.bz2")
try:
import lzma
self.doXZ = subprocess.call("xz --check=crc32 --lzma2=dict=1MiB < archive.cpio > archive.cpio.xz", shell=True) == 0
except Exception as ex:
# FIXME will issue warning even if test_xz is not requested
warnings.warn("will not test cpio.xz: %s" % ex)
self.doXZ = False
def tearDown(self):
check_call("rm -rf archive archive.cpio* archive2")
# TODO check with file (like 'r:*')
# TODO use cat to check properly for pipes
def archiveExtract(self, fn, fmt='r|*'):
arc = CpioFile.open(fn, fmt)
found = False
for f in arc:
if f.isfile():
data = arc.extractfile(f).read()
self.assertEqual(len(data), f.size)
self.assertEqual(self.md5data, md5(data).hexdigest())
found = True
arc.close()
self.assertTrue(found)
# extract with extractall and compare
arc = CpioFile.open(fn, fmt)
check_call("rm -rf archive2")
os.rename('archive', 'archive2')
arc.extractall()
check_call("diff -rq archive2 archive")
arc.close()
def archiveCreate(self, fn, fmt='w'):
os.unlink(fn)
arc = CpioFile.open(fn, fmt)
f = arc.getcpioinfo('archive/data')
arc.addfile(f, open('archive/data'))
# test recursively add "."
os.chdir('archive')
arc.add(".")
os.chdir("..")
# TODO add self crafted file
arc.close()
# special case for XZ, test check type (crc32)
if fmt.endswith('xz'):
f = open(fn, 'rb')
f.seek(6)
self.assertEqual(f.read(2), '\x00\x01')
f.close()
self.archiveExtract(fn)
def doArchive(self, fn, fmt=None):
self.archiveExtract(fn)
self.archiveCreate(fn, fmt is None and 'w' or 'w|%s' % fmt )
if not fmt is None:
self.archiveExtract(fn, 'r|%s' % fmt)
def test_plain(self):
self.doArchive('archive.cpio')
def test_gz(self):
self.doArchive('archive.cpio.gz', 'gz')
def test_bz2(self):
self.doArchive('archive.cpio.bz2', 'bz2')
def test_xz(self):
if not self.doXZ:
raise unittest.SkipTest("lzma package or xz tool not available")
print('Running test for XZ')
self.doArchive('archive.cpio.xz', 'xz')
# CpioFileCompat testing
def archiveExtractCompat(self, fn, comp):
arc = CpioFileCompat(fn, mode="r", compression={"": CPIO_PLAIN,
"gz": CPIO_GZIPPED}[comp])
found = False
for f in arc.namelist():
info = arc.getinfo(f)
if info.isfile():
data = arc.read(f)
self.assertEqual(len(data), info.size)
self.assertEqual(self.md5data, md5(data).hexdigest())
found = True
arc.close()
self.assertTrue(found)
def archiveCreateCompat(self, fn, comp):
if os.path.exists(fn):
os.unlink(fn)
arc = CpioFileCompat(fn, mode="w", compression={"": CPIO_PLAIN,
"gz": CPIO_GZIPPED}[comp])
arc.write('archive/data')
arc.close()
self.archiveExtract(fn)
def doArchiveCompat(self, fn, fmt):
self.archiveExtractCompat(fn, fmt)
fn2 = "archive2" + fn[len("archive"):]
self.archiveCreateCompat(fn2, fmt)
self.archiveExtractCompat(fn2, fmt)
def test_compat_plain(self):
self.doArchiveCompat('archive.cpio', '')
def test_compat_gz(self):
# FIXME: this test exhibits "unclosed file" warnings when run
# under `-Wd`
self.doArchiveCompat('archive.cpio.gz', 'gz')