Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions dissect/vmfs/descriptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@
address_type,
)
from dissect.vmfs.c_vmfs import FS3_AddrType, FS3_DescriptorType, FS3_ZeroLevelAddrType, FS6_DirBlockType, c_vmfs
from dissect.vmfs.exception import NotAnRDMFileError, NotASymlinkError, VolumeNotAvailableError
from dissect.vmfs.exception import (
FileNotFoundError,
NotADirectoryError,
NotAnRDMFileError,
NotASymlinkError,
VolumeNotAvailableError,
)
from dissect.vmfs.util import vmfs_uuid

if TYPE_CHECKING:
Expand Down Expand Up @@ -137,8 +143,8 @@ def debug(self) -> str:
Lock [type {li.type:x} offset {li.addr.offset} v {li.token}, hb offset {li.hbAddr.offset}
gen {li.hbGen.gen}, mode {li.mode}, owner {vmfs_uuid(li.owner)} mtime {li.mtime}
num {li.numHolders} gblnum {li.gblNumHolders} gblgen {li.gblGen} gblbrk {li.gblBrk}]
Addr <{address_type(self.address)}, {cluster}, {resource}>, gen {fd.generation}, links {fd.numLinks}, type {type_str}, flags {fd.flags:#x}, uid {fd.uid}, gid {fd.gid}, mode {fd.mode:o}
len {fd.length}, nb {fd.numBlocks} tbz {fd.numTBZBlocksLo | fd.numTBZBlocksHi << 32}, cow {fd.numCOWBlocksLo | fd.numCOWBlocksHi << 32}, newSinceEpoch {fd.newSinceEpochLo | fd.newSinceEpochHi << 32}, zla {fd.zla}, bs {fd.blockSize}
Addr <{address_type(self.address)}, {cluster}, {resource}>, gen {fd.generation}, links {fd.linkCount}, type {type_str}, flags {fd.flags:#x}, uid {fd.uid}, gid {fd.gid}, mode {fd.mode:o}
len {fd.fileLength}, nb {fd.numBlocks} tbz {fd.numTBZBlocksLo | fd.numTBZBlocksHi << 32}, cow {fd.numCOWBlocksLo | fd.numCOWBlocksHi << 32}, newSinceEpoch {fd.newSinceEpochLo | fd.newSinceEpochHi << 32}, zla {fd.zeroLevelAddrType}, bs {fd.blockSize}
affinityFD <{address_type(fd.affinityFD)},{affinity_cluster},{affinity_resource}>, parentFD <{address_type(fd.parentFD)},{parent_cluster},{parent_resource}>, tbzGranularityShift {fd.tbzGranularityShift}, numLFB {fd.numLFB}
lastSFBClusterNum {fd.lastSFBClusterNum}, numPreAllocBlocks {fd.numPreAllocBlocks}, numPointerBlocks {fd.numPointerBlocks}
""").strip() # noqa: E501
Expand Down Expand Up @@ -232,13 +238,19 @@ def mode(self) -> int:

Access the mode through the :attr:`metadata` attribute to get the raw mode value.
"""
if stat.S_IFMT(self.metadata.mode) == stat.S_IFDIR:
if stat.S_IFMT(self.metadata.mode):
# If the mode already has a type bit set, return it as is
return self.metadata.mode

if self.type == FS3_DescriptorType.SYMLINK:
if self.is_dir():
return self.metadata.mode | stat.S_IFDIR

if self.is_symlink():
return self.metadata.mode | stat.S_IFLNK
if self.type == FS3_DescriptorType.RDM:

if self.is_rdm():
return self.metadata.mode | stat.S_IFBLK

return self.metadata.mode | stat.S_IFREG

@property
Expand Down Expand Up @@ -272,11 +284,11 @@ def link(self) -> str:

def is_dir(self) -> bool:
"""Return whether this file descriptor is a directory."""
return self.type == FS3_DescriptorType.DIRECTORY
return self.type == FS3_DescriptorType.DIRECTORY or (self.is_system() and stat.S_ISDIR(self.metadata.mode))

def is_file(self) -> bool:
"""Return whether this file descriptor is a regular file."""
return self.type == FS3_DescriptorType.REGFILE
return self.type == FS3_DescriptorType.REGFILE or (self.is_system() and not stat.S_ISDIR(self.metadata.mode))

def is_symlink(self) -> bool:
"""Return whether this file descriptor is a symlink."""
Expand Down
22 changes: 20 additions & 2 deletions tests/test_vmfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import contextlib
import gzip
import stat

import pytest

Expand Down Expand Up @@ -73,11 +74,20 @@ def test_vmfs_basic(path: str | list[str], type: str, request: pytest.FixtureReq
assert FS3_Config.DENSE_SBPC in fs.descriptor.config

root_dir = fs.get("/").listdir()
for name in (".vh.sf", ".pb2.sf", ".pbc.sf", ".fbb.sf", ".fdc.sf", ".sbc.sf") + (
for name in (".vh.sf", ".pb2.sf", ".pbc.sf", ".fbb.sf", ".fdc.sf", ".sbc.sf", ".sdd.sf") + (
(".jbc.sf",) if fs.is_vmfs6 else ()
):
assert name in root_dir, f"Expected {name} in root directory"
assert root_dir[name].fd.is_system(), f"{name} should be a system file"

fd = root_dir[name].fd
assert fd.is_system(), f"{name} should be a system file"

if name == ".sdd.sf":
assert fd.is_dir(), f"{name} should be a directory"
assert stat.S_ISDIR(fd.mode), f"{name} should be a directory"
else:
assert fd.is_file(), f"{name} should be a regular file"
assert stat.S_ISREG(fd.mode), f"{name} should be a regular file"


@pytest.mark.parametrize(
Expand All @@ -93,6 +103,11 @@ def test_vmfs_content(path: str) -> None:
vs = lvm.LVM(fh)
fs = vmfs.VMFS(vs.volumes[0].open())

# Test root
fd = fs.get("/")
assert fd.is_dir()
assert stat.S_ISDIR(fd.mode)

# Test a small file
fd = fs.get("small")
_assert_is_file(fd, 5, FS3_ZeroLevelAddrType.FILE_DESCRIPTOR_RESIDENT)
Expand All @@ -111,11 +126,13 @@ def test_vmfs_content(path: str) -> None:
# Test a symlink
fd = fs.get("symlink")
assert fd.is_symlink()
assert stat.S_ISLNK(fd.mode)
assert fd.link == f"/vmfs/volumes/{fs.label}/small"

# Test a directory
fd = fs.get("directory")
assert fd.is_dir()
assert stat.S_ISDIR(fd.mode)
contents = fd.listdir()

assert set(contents.keys()) == {f"file{i}" for i in range(1, 101)} | {".", ".."}, (
Expand Down Expand Up @@ -213,5 +230,6 @@ def _assert_is_file(fd: vmfs.FileDescriptor, size: int, zla: FS3_ZeroLevelAddrTy
assert not fd.is_symlink()
assert not fd.is_rdm()
assert not fd.is_system()
assert stat.S_ISREG(fd.mode)
assert fd.size == size
assert fd.zla == zla
Loading