|
| 1 | +"""Bzip2 header parsing and brute-force generation utilities tests.""" |
| 2 | + |
| 3 | +import hashlib |
| 4 | +import shutil |
| 5 | +import subprocess |
| 6 | +from pathlib import Path |
| 7 | + |
| 8 | +import pytest |
| 9 | + |
| 10 | +from torrent_compress_recovery.bz2 import ( |
| 11 | + BZIP2_MAGIC, |
| 12 | + BZIP2_MAX_LEVEL, |
| 13 | + BZIP2_MIN_LEVEL, |
| 14 | + Bzip2Header, |
| 15 | + _get_block_size_description, |
| 16 | + format_bzip2_header, |
| 17 | + parse_bzip2_header, |
| 18 | + patch_bzip2_header, |
| 19 | + sha1_piece, |
| 20 | +) |
| 21 | + |
| 22 | + |
def test_parse_bzip2_header(temp_dir: Path):
    """Parse the header of a real bzip2 stream and check the reported level.

    The stream is produced by the ``bzip2`` command-line tool with an explicit
    ``-6`` so the expected level does not depend on the tool's default or on
    options supplied through the ``BZIP2``/``BZIP`` environment variables.

    Args:
        temp_dir: Directory fixture the compressed sample is written to.
    """
    if not shutil.which("bzip2"):
        pytest.skip("bzip2 tool not available")

    expected_level = 6

    # Create the test file from the tool's stdout.
    result = subprocess.run(
        ["bzip2", f"-{expected_level}", "-c"],
        input=b"Hello world",
        capture_output=True,
        check=True,
    )
    bz2_file = temp_dir / "test.txt.bz2"
    bz2_file.write_bytes(result.stdout)

    header = parse_bzip2_header(bz2_file)
    assert header is not None
    assert isinstance(header.level, int)
    assert isinstance(header.block_size, int)
    assert BZIP2_MIN_LEVEL <= header.level <= BZIP2_MAX_LEVEL
    # Pin the actual value: type and range checks alone would accept any level.
    assert header.level == expected_level
| 38 | + |
| 39 | + |
def test_format_bzip2_header():
    """Check the display text produced for headers at several levels."""
    # (level, block size in bytes, expected level text, expected block-size text)
    cases = (
        (6, 600000, "compression level: 6", "block size: 600,000 bytes (600 KB)"),
        (1, 100000, "compression level: 1", "block size: 100,000 bytes (100 KB)"),
        (9, 900000, "compression level: 9", "block size: 900,000 bytes (900 KB)"),
    )
    for level, block_size, level_text, size_text in cases:
        rendered = format_bzip2_header(Bzip2Header(level=level, block_size=block_size))
        assert level_text in rendered
        assert size_text in rendered
| 57 | + |
| 58 | + |
def test_patch_bzip2_header(temp_dir: Path):
    """Patch level-1 bzip2-like data with a level-9 header and check the result."""
    # Minimal bzip2-like payload: magic, the level digit "1", then filler bytes.
    original = b"BZh" + b"1" + b"some compressed data"

    replacement = Bzip2Header(level=9, block_size=900000)
    result = patch_bzip2_header(original, replacement)

    # The byte right after the magic must now carry the new level.
    level_byte = result[3:4]
    assert level_byte == b"9"
    assert result.startswith(BZIP2_MAGIC)
| 73 | + |
| 74 | + |
def test_generate_bzip2_candidates(temp_dir: Path):
    """Generate bzip2 candidates for a small file and check their shape.

    Skipped when the ``bzip2`` tool is missing or when
    ``generate_bzip2_candidates`` cannot be imported.

    Args:
        temp_dir: Directory fixture in which the sample input is created.
    """
    if not shutil.which("bzip2"):
        pytest.skip("bzip2 tool not available")

    # Guard only the import: an ImportError raised while the function itself
    # runs must fail the test instead of being reported as a skip.
    try:
        from torrent_compress_recovery.bz2 import generate_bzip2_candidates
    except ImportError:
        pytest.skip("generate_bzip2_candidates not available")

    raw = temp_dir / "raw"
    raw.mkdir()
    src = raw / "sample.txt"
    src.write_text("sample content")

    header = Bzip2Header(level=6, block_size=600000)
    candidates = generate_bzip2_candidates(src, header)

    # Should have at least some candidates
    assert len(candidates) > 0

    # Each candidate is a (label, data) pair with non-empty bytes.
    for label, data in candidates:
        assert isinstance(label, str)
        assert isinstance(data, bytes)
        assert len(data) > 0
| 103 | + |
| 104 | + |
def test_sha1_piece():
    """Check that ``sha1_piece`` returns the 20-byte SHA-1 digest of its input."""
    data = b"test data for sha1"
    hash_result = sha1_piece(data)

    # Verify it's a 20-byte SHA-1 hash
    assert isinstance(hash_result, bytes)
    assert len(hash_result) == 20

    # Verify the value itself: length and repeatability alone would also be
    # satisfied by a function that returns a constant.
    assert hash_result == hashlib.sha1(data).digest()

    # Verify consistency
    assert sha1_piece(data) == hash_result
| 117 | + |
| 118 | + |
def test_sha256_piece():
    """Check that ``sha256_piece`` returns the 32-byte SHA-256 digest of its input.

    Skipped when ``sha256_piece`` cannot be imported.
    """
    # Guard only the import so that an ImportError raised by the function
    # under test is a failure, not a skip.
    try:
        from torrent_compress_recovery.bz2 import sha256_piece
    except ImportError:
        pytest.skip("sha256_piece not available")

    data = b"test data for sha256"
    hash_result = sha256_piece(data)

    # Verify it's a 32-byte SHA-256 hash
    assert isinstance(hash_result, bytes)
    assert len(hash_result) == 32

    # Verify the value itself, not just its length.
    assert hash_result == hashlib.sha256(data).digest()

    # Verify consistency
    assert sha256_piece(data) == hash_result
| 138 | + |
| 139 | + |
def test_find_matching_candidate():
    """Look up a candidate by SHA-1 digest, for both a hit and a miss.

    Skipped when ``find_matching_candidate`` cannot be imported.
    """
    # Guard only the import so that an ImportError raised by the function
    # under test is a failure, not a skip.
    try:
        from torrent_compress_recovery.bz2 import find_matching_candidate as find_bzip2_candidate
    except ImportError:
        pytest.skip("find_matching_candidate not available")

    data = b"test data for matching"
    hash_result = hashlib.sha1(data).digest()

    # Test with matching candidate
    candidates = [("test_candidate", data)]
    result = find_bzip2_candidate(candidates, hash_result, piece_length=len(data))
    assert result == ("test_candidate", data)

    # Test with no matching candidate
    candidates = [("wrong_candidate", b"wrong data")]
    result = find_bzip2_candidate(candidates, hash_result, piece_length=len(data))
    assert result is None
| 160 | + |
| 161 | + |
def test_find_matching_candidate_sha256():
    """Look up a candidate by SHA-256 digest, for both a hit and a miss.

    Skipped when ``find_matching_candidate_sha256`` cannot be imported.
    """
    # Guard only the import so that an ImportError raised by the function
    # under test is a failure, not a skip.
    try:
        from torrent_compress_recovery.bz2 import find_matching_candidate_sha256
    except ImportError:
        pytest.skip("find_matching_candidate_sha256 not available")

    data = b"test data for sha256 matching"
    hash_result = hashlib.sha256(data).digest()

    # Test with matching candidate
    # NOTE(review): this expects the bare data, while the SHA-1 lookup test in
    # this file expects a (label, data) tuple - confirm against the implementation.
    candidates = [("test_candidate", data)]
    result = find_matching_candidate_sha256(candidates, hash_result)
    assert result == data

    # Test with no matching candidate
    candidates = [("wrong_candidate", b"wrong data")]
    result = find_matching_candidate_sha256(candidates, hash_result)
    assert result is None
| 182 | + |
| 183 | + |
def test_parse_bzip2_header_invalid_file(temp_dir: Path):
    """A file without the bzip2 magic must yield no header."""
    bogus = temp_dir / "invalid.bz2"
    bogus.write_bytes(b"not a bzip2 file")

    assert parse_bzip2_header(bogus) is None
| 191 | + |
| 192 | + |
def test_parse_bzip2_header_too_small(temp_dir: Path):
    """A file holding only part of the magic must yield no header."""
    truncated = temp_dir / "small.bz2"
    # Two bytes only: shorter than the "BZh" magic used elsewhere in these tests.
    truncated.write_bytes(b"BZ")

    assert parse_bzip2_header(truncated) is None
| 200 | + |
| 201 | + |
def test_parse_bzip2_header_invalid_level(temp_dir: Path):
    """A level digit of "0" after the magic must yield no header."""
    # bzip2-like bytes whose level digit is "0", which this test treats as invalid.
    payload = b"BZh" + b"0" + b"some compressed data"

    target = temp_dir / "invalid_level.bz2"
    target.write_bytes(payload)

    parsed = parse_bzip2_header(target)
    assert parsed is None
| 214 | + |
| 215 | + |
def test_bzip2_header_usage_in_compression():
    """Patching the same data with different headers must give different bytes."""
    # Minimal bzip2-like payload: magic, the level digit "1", then filler bytes.
    payload = b"BZh" + b"1" + b"some compressed data"

    low_header = Bzip2Header(level=1, block_size=100000)
    high_header = Bzip2Header(level=9, block_size=900000)

    low_patched = patch_bzip2_header(payload, low_header)
    high_patched = patch_bzip2_header(payload, high_header)

    # The outputs differ, and each carries its own level digit after the magic.
    assert low_patched != high_patched
    assert low_patched[3:4] == b"1"
    assert high_patched[3:4] == b"9"
| 234 | + |
| 235 | + |
def test_get_block_size_description():
    """Check the block-size text for valid and out-of-range levels."""
    expected_by_level = {
        # Valid levels
        1: "100,000 bytes (100 KB)",
        6: "600,000 bytes (600 KB)",
        9: "900,000 bytes (900 KB)",
        # Out-of-range levels are expected to be described as zero-sized
        0: "0 bytes (0 KB)",
        10: "0 bytes (0 KB)",
    }
    for level, expected in expected_by_level.items():
        assert _get_block_size_description(level) == expected
0 commit comments