Skip to content

Commit

Permalink
Add tests for FSCTL_QUERY_FILE_REGIONS
Browse files Browse the repository at this point in the history
  • Loading branch information
anodos325 committed Nov 19, 2024
1 parent 16cccbe commit 4f7d9b0
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 0 deletions.
141 changes: 141 additions & 0 deletions tests/api2/test_smb_ioctl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import pytest
import random

from dataclasses import asdict
from middlewared.test.integration.assets.account import user, group
from middlewared.test.integration.assets.pool import dataset

from protocols import smb_connection
from protocols.SMB import (
FsctlQueryFileRegionsRequest,
FileRegionInfo,
FileUsage,
)

from samba import ntstatus
from samba import NTSTATUSError

SHARE_NAME = 'ioctl_share'


@pytest.fixture(scope='module')
def setup_smb_tests(request):
with dataset('smbclient-testing', data={'share_type': 'SMB'}) as ds:
with user({
'username': 'smbuser',
'full_name': 'smbuser',
'group_create': True,
'password': 'Abcd1234'
}) as u:
with smb_share(os.path.join('/mnt', ds), SHARE_NAME) as s:
try:
call('service.start', 'cifs')
yield {'dataset': ds, 'share': s, 'user': u}
finally:
call('service.stop', 'cifs')


def test__query_file_regions_normal(setup_smb_tests):
ds, share, smb_user = setup_smb_tests
with smb_connection(
share=SHARE_NAME,
username=smbuser['username'],
password='Abcd1234',
smb1=False
) as c:
fd = c.create_file("file_regions_normal", "w")
buf = random.randbytes(1024)

for offset in range(0, 128):
c.write(fd, offset=offset * 1024, data=buf)

# First get with region omitted. This should return entire file
fsctl_request_null_region = FsctlQueryFileRegionsRequest(region=None)
fsctl_resp = c.fsctl(fd, fsctl_request_null_region)

assert fsctl_resp.flags == 0
assert fsctl_resp.total_region_entry_count == 1
assert fsctl_resp.region_entry_count == 1
assert fsctl_resp.reserved == 1
assert fsctl_resp.region is not None

assert fsctl_resp.region.offset == 0
assert fsctl_resp.region.length == 128 * 1024
assert fsctl_resp.region.desired_usage == FileUsage.VALID_CACHED_DATA
assert fsctl_resp.region.reserved == 0

# Take same region we retrieved from server and use with new request
fsctl_request_with_region = FsctlQueryFileRegionsRequest(region=fsctl_resp.region)
fsctl_resp2 = c.fsctl(fd, fsctl_request_with_region)

assert asdict(fsctl_resp) == asdict(fsctl_resp2)


def test__query_file_regions_with_holes(setup_smb_tests):
ds, share, smb_user = setup_smb_tests
with smb_connection(
share=SHARE_NAME,
username=smbuser['username'],
password='Abcd1234',
smb1=False
) as c:
fd = c.create_file("file_regions_normal", "w")
buf = random.randbytes(4096)

# insert some holes in file
for offset in range(0, 130):
if offset % 2 == 0:
c.write(fd, offset=offset * 4096, data=buf)

fsctl_request_null_region = FsctlQueryFileRegionsRequest(region=None)
fsctl_resp = c.fsctl(fd, fsctl_request_null_region)

assert fsctl_resp.flags == 0
assert fsctl_resp.total_region_entry_count == 1
assert fsctl_resp.region_entry_count == 1
assert fsctl_resp.reserved == 1
assert fsctl_resp.region is not None

assert fsctl_resp.region.offset == 0
assert fsctl_resp.region.length == 128 * 4096
assert fsctl_resp.region.desired_usage == FileUsage.VALID_CACHED_DATA
assert fsctl_resp.region.reserved == 0

# Take same region we retrieved from server and use with new request
fsctl_request_with_region = FsctlQueryFileRegionsRequest(region=fsctl_resp.region)
fsctl_resp2 = c.fsctl(fd, fsctl_request_with_region)

assert asdict(fsctl_resp) == asdict(fsctl_resp2)


def test__query_file_regions_trailing_zeroes(setup_smb_tests):
"""
FileRegionInfo should contain Valid Data Length which is length in bytes of data
that has been written to the file in the specified region, from the beginning of
the region untile the last byte that has not been zeroed or uninitialized
"""
ds, share, smb_user = setup_smb_tests
with smb_connection(
share=SHARE_NAME,
username=smbuser['username'],
password='Abcd1234',
smb1=False
) as c:
fd = c.create_file("file_regions_normal", "w")
buf = random.randbytes(4096)

# insert a hole in file
c.write(fd, offset=0, data=buf)
c.write(fd, offset=8192, data=buf)

# requesting entire file should give full length
fsctl_request_null_region = FsctlQueryFileRegionsRequest(region=None)
fsctl_resp = c.fsctl(fd, fsctl_request_null_region)
assert fsctl_resp.region.length == 12288

# requesting region that has hole at end of it should only give data length
limited_region = FileRegionInfo(offset=0, length=8192)
fsctl_request_limited_region = FsctlQueryFileRegionsRequest(region=limited_region)
fsctl_resp = c.fsctl(fd, fsctl_request_limited_region)
assert fsctl_resp.region.offset == 0
assert fsctl_resp.region.length == 4096
63 changes: 63 additions & 0 deletions tests/protocols/smb_proto.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import sys
import enum
import struct
import subprocess
from dataclasses import dataclass
from functions import SRVTarget, get_host_ip
from platform import system

Expand Down Expand Up @@ -32,6 +34,60 @@
libsmb_has_rename = 'rename' in dir(libsmb.Conn)


class Fsctl(enum.IntEnum):
QUERY_FILE_REGIONS = 0x00090284


class FileUsage(enum.IntEnum):
VALID_CACHED_DATA = 0x00000001 # NTFS
VALID_NONCACHED_DATA = 0x00000002 # REFS


@dataclass(frozen=True)
class FileRegionInfo:
""" MS-FSCC 2.3.56.1 """
offset: int
length: int
desired_usage: FileUsage = FileUsage.VALID_CACHED_DATA
reserved: int = 0 # by protocol must be zero


@dataclass(frozen=True)
class FsctlQueryFileRegionsReply:
""" MS-FSCC 2.3.56 """
flags: int # by protocol must be zero
total_region_entry_count: int
region_entry_count: int
reserved: int # by protocol must be zero
region: FileRegionInfo


@dataclass(frozen=True)
class FsctlQueryFileRegionsRequest:
""" MS-FSCC 2.3.55 """
region_info: FileRegionInfo | None

def __post_init__(self):
self.fsctl = Fsctl.QUERY_FILE_REGIONS

def pack(self):
if self.region_info is None:
return b''

return struct.pack(
'<qqII',
self.region_info.offset,
self.region_info.length,
self.region_info.desired_usage,
self.region_info.reserved
)

def unpack(self, buf):
unpacked_resp = list(struct.unpack('<IIII', buf[0:16])
unpacked_resp.append(FileRegionInfo(struct.unpack('<qqII', buf[16:])))
return FsctlQueryFileRegionsReply(*unpacked_resp)


class ACLControl(enum.IntFlag):
SEC_DESC_OWNER_DEFAULTED = 0x0001
SEC_DESC_GROUP_DEFAULTED = 0x0002
Expand Down Expand Up @@ -353,3 +409,10 @@ def inherit_acl(self, path, action):
cl = subprocess.run(cmd, capture_output=True)
if cl.returncode != 0:
raise RuntimeError(cl.stdout.decode() or cl.stderr.decode())

def fsctl(self, idx, fsctl_request):
resp = self._connection.fsctl(
self._open_files[idx]["fh"], fsctl_request.fsctl, fsctl_request.pack()
)

return fsctl_request.unpack(resp)

0 comments on commit 4f7d9b0

Please sign in to comment.