diff --git a/pymodbus/datastore/context.py b/pymodbus/datastore/context.py index 369913c69..083aa8e00 100644 --- a/pymodbus/datastore/context.py +++ b/pymodbus/datastore/context.py @@ -2,6 +2,8 @@ from __future__ import annotations +from copy import deepcopy + from ..constants import ExcCodes from ..exceptions import NoSuchIdException from ..logging import Log @@ -25,36 +27,41 @@ class ModbusDeviceContext: # pylint: disable=too-few-public-methods _fx_mapper.update([(i, "h") for i in (3, 6, 16, 22, 23)]) _fx_mapper.update([(i, "c") for i in (1, 5, 15)]) + def __build_data(self, simdata): + """Build and/or correct values.""" + for entry in simdata: + entry.datatype = DataType.BITS + if not isinstance(entry.values, list): + entry.values = [entry.values] + for i, _value in enumerate(entry.values): + entry.values[i] = bool(entry.values[i]) + def __init__(self, *_args, di: ModbusSequentialDataBlock | ModbusSparseDataBlock | None = None, co: ModbusSequentialDataBlock | ModbusSparseDataBlock | None = None, ir: ModbusSequentialDataBlock | ModbusSparseDataBlock | None = None, hr: ModbusSequentialDataBlock | ModbusSparseDataBlock | None = None, ): - """Initialize the datastores.""" - self.store = { - "d": di, - "c": co, - "i": ir, - "h": hr, - } + """Define device.""" if not di: - di = ModbusSequentialDataBlock(0, values=0) + di = ModbusSequentialDataBlock(1, values=False) if not co: - co = ModbusSequentialDataBlock(0, values=0) + co = ModbusSequentialDataBlock(1, values=False) if not ir: - ir = ModbusSequentialDataBlock(0, values=0) + ir = ModbusSequentialDataBlock(1, values=0) if not hr: - hr = ModbusSequentialDataBlock(0, values=0) - for entry in co.simdata: - entry.datatype = DataType.BITS - for entry in di.simdata: - entry.datatype = DataType.BITS + hr = ModbusSequentialDataBlock(1, values=0) + co_simdata = deepcopy(co.simdata) + di_simdata = deepcopy(di.simdata) + ir_simdata = deepcopy(ir.simdata) + hr_simdata = deepcopy(hr.simdata) + self.__build_data(co_simdata) + self.__build_data(di_simdata) self.simdevice = SimDevice(0, simdata=( - co.simdata, - di.simdata, - ir.simdata, - hr.simdata)) + co_simdata, + di_simdata, + ir_simdata, + hr_simdata)) Log.warning("ModbusDeviceContext, ModbusSequentialDataBlock, " "ModbusSparseDataBlock are deprecated " "and will be removed in v4.\n" diff --git a/pymodbus/datastore/sequential.py b/pymodbus/datastore/sequential.py index 5a408098e..be9a84b6c 100644 --- a/pymodbus/datastore/sequential.py +++ b/pymodbus/datastore/sequential.py @@ -13,4 +13,4 @@ def __init__(self, address, values): :param address: The starting address of the datastore :param values: Either a list or a dictionary of values """ - self.simdata = [SimData(address, values=values, datatype=DataType.REGISTERS)] + self.simdata = [SimData(address-1, values=values, datatype=DataType.REGISTERS)] diff --git a/pymodbus/simulator/simdevice.py b/pymodbus/simulator/simdevice.py index 30a22e472..acec682d5 100644 --- a/pymodbus/simulator/simdevice.py +++ b/pymodbus/simulator/simdevice.py @@ -162,18 +162,18 @@ def __check_simple_blocks(self): if i < 2 and entry.datatype != DataType.BITS: raise TypeError(f"simdata[{inx}]=tuple[{TUPLE_NAMES[i]}] -> list[{inx}] not DataType.BITS, not allowed") - def __check_block(self, block: list[SimData], name: str): + def __check_block(self, block: list[SimData], use_bits, name: str): """Check block content.""" x_block = sorted(block, key=lambda x: x.address) last_address = x_block[0].address -1 for entry in x_block: - last_address = self.__check_block_entries(last_address, entry, name) + last_address = self.__check_block_entries(last_address, entry, use_bits, name) - def __check_block_entries(self, last_address: int, entry: SimData, _name: str) -> int: + def __check_block_entries(self, last_address: int, entry: SimData, use_bits: bool, _name: str) -> int: """Check block entries.""" if entry.address <= last_address: raise TypeError(f"SimData address {entry.address} is overlapping!") - blocks_regs = entry.build_registers(self.endian, self.string_encoding, False) * entry.count + blocks_regs = entry.build_registers(self.endian, self.string_encoding, use_bits) * entry.count return last_address + len(blocks_regs) def __check_parameters(self): @@ -182,10 +182,10 @@ def __check_parameters(self): self.__check_simple2() if isinstance(self.simdata, tuple): for i in range(4): - self.__check_block(cast(tuple,self.simdata)[i], TUPLE_NAMES[i]) + self.__check_block(cast(tuple,self.simdata)[i], (i in {0,1}), TUPLE_NAMES[i]) else: x_simdata = self.simdata if isinstance(self.simdata, list) else [self.simdata] - self.__check_block(x_simdata, "list") + self.__check_block(x_simdata, False, "list") def __post_init__(self): """Define a device.""" @@ -198,10 +198,10 @@ def __build_flags(self, simdata: SimData) -> int: flag_normal |= SimUtils.RunTimeFlag_READONLY return flag_normal - def __create_simdata(self, simdata: SimData, flag_list: list[int], reg_list: list[int]): + def __create_simdata(self, simdata: SimData, flag_list: list[int], reg_list: list[int], use_bits: bool): """Build registers for single SimData.""" flag_normal = self.__build_flags(simdata) - blocks_regs = simdata.build_registers(self.endian, self.string_encoding, False) + blocks_regs = simdata.build_registers(self.endian, self.string_encoding, use_bits) for _ in range(simdata.count): first = True for register in blocks_regs: @@ -212,7 +212,7 @@ def __create_simdata(self, simdata: SimData, flag_list: list[int], reg_list: li flag_list.append(flag_normal & ~SimUtils.RunTimeFlag_TYPE) reg_list.append(register) - def __create_block(self, simdata: list[SimData]) -> SimRegs: + def __create_block(self, simdata: list[SimData], use_bits: bool) -> SimRegs: """Create registers for device.""" flag_list: list[int] = [] reg_list: list[int] = [] @@ -223,7 +223,7 @@ def __create_block(self, simdata: list[SimData]) -> SimRegs: flag_list.append(DataType.INVALID) reg_list.append(0) next_address += 1 - self.__create_simdata(entry, flag_list, reg_list) + self.__create_simdata(entry, flag_list, reg_list, use_bits) flag_list.append(DataType.INVALID) reg_list.append(0) return (start_address, reg_list, flag_list) @@ -234,12 +234,12 @@ def build_device(self) -> SimRegs | dict[str, SimRegs]: if not isinstance(self.simdata, tuple): x_simdata = self.simdata if isinstance(self.simdata, list) else [self.simdata] x_simdata.sort(key=lambda x: x.address) - return self.__create_block(x_simdata) + return self.__create_block(x_simdata, False) b: dict[str, SimRegs] = {} # (, , , ) convert = {0: "c", 1: "d", 2: "h", 3: "i"} for i in range(4): x_simdata = cast(tuple, self.simdata)[i] x_simdata.sort(key=lambda x: x.address) - b[convert[i]] = self.__create_block(x_simdata) + b[convert[i]] = self.__create_block(x_simdata, (i in {0,1})) return b diff --git a/pymodbus/simulator/simruntime.py b/pymodbus/simulator/simruntime.py index a85411b75..fe82f3fcd 100644 --- a/pymodbus/simulator/simruntime.py +++ b/pymodbus/simulator/simruntime.py @@ -5,8 +5,7 @@ from __future__ import annotations from ..constants import ExcCodes -from ..pdu.utils import unpack_bitstring -from .simdevice import SimDevice, SimRegs +from .simdevice import SimDevice from .simutils import DataType, SimUtils @@ -36,26 +35,10 @@ def __init__(self, device: SimDevice): ) return self.shared = False - b_h = build["h"] - b_i = build["i"] - self.block = { - "c": self.convert_to_bit(build["c"]), - "d": self.convert_to_bit(build["d"]), - "h": (b_h[0], len(b_h[1]), b_h[1], b_h[2]), - "i": (b_i[0], len(b_i[1]), b_i[1], b_i[2]), - } - - @classmethod - def convert_to_bit(cls, block: SimRegs) -> tuple[int, int, list[int], list[int]]: - """Convert registers to bits.""" - new_registers: list[int] = [] - for entry in block[2]: - bool_list = unpack_bitstring(entry.to_bytes(2, byteorder="big")) - for x in bool_list: - new_registers.append(1 if x else 0) - new_flags: list[int] = [block[1][0]]*len(new_registers) - return (block[0], len(new_flags), new_flags, new_registers) - + self.block = {} + for i in ("c", "d", "h", "i"): + x = build[i] + self.block[i] = (x[0], len(x[1]), x[1], x[2]) async def get_block(self, func_code: int, address: int, count: int, values: list[int] | list[bool] | None) -> list[int] | list[bool] | ExcCodes: """Calculate offset.""" diff --git a/test/datastore/test_context.py b/test/datastore/test_context.py index 5a9f6bd9d..71c9dfdd6 100644 --- a/test/datastore/test_context.py +++ b/test/datastore/test_context.py @@ -19,7 +19,7 @@ async def test_datastore_device_Values(self): async def test_datastore_device_not_ok(self): """Test ModbusDeviceContext.""" - block = ModbusSequentialDataBlock(1, [17] * 8) + block = ModbusSequentialDataBlock(1, [17] * 16) ModbusDeviceContext(di=block, co=block, hr=block, ir=block) def test_datastore_server(self): @@ -37,9 +37,24 @@ def test_datastore_server_ids(self): srv = ModbusServerContext(devices=ModbusDeviceContext()) assert isinstance(srv.device_ids(), list) + async def test_datastore_build(self): + """Test ModbusServerContext.""" + block = ModbusSequentialDataBlock(1, 17) + dev = ModbusDeviceContext(di=block, co=block, hr=block, ir=block) + srv = ModbusServerContext(devices={1: dev}, single=False) + assert srv.device_ids() == [1] + await srv.async_setValues(1, 0x05, 0, [1]) + assert await srv.async_getValues(1, 0x03, 0) == ExcCodes.DEVICE_BUSY + with pytest.raises(NoSuchIdException): + await srv.async_getValues(15, 0, 0) + + async def test_datastore_build2(self): + """Test ModbusServerContext.""" + ModbusServerContext(devices=ModbusDeviceContext(), single=True) + async def test_datastore_server_device_id(self): """Test ModbusServerContext.""" - block = ModbusSequentialDataBlock(1, [17] * 8) + block = ModbusSequentialDataBlock(1, [17] * 16) dev = ModbusDeviceContext(di=block, co=block, hr=block, ir=block) srv = ModbusServerContext(devices={1: dev}, single=False) assert srv.device_ids() == [1] @@ -51,7 +66,7 @@ async def test_datastore_server_device_id(self): async def test_datastore_server_device_id_0(self): """Test ModbusServerContext.""" - block = ModbusSequentialDataBlock(1, [17] * 8) + block = ModbusSequentialDataBlock(1, [17] * 16) dev = ModbusDeviceContext(di=block, co=block, hr=block, ir=block) srv = ModbusServerContext(devices={0: dev}, single=False) await srv.async_getValues(15, 0x03, 0) diff --git a/test/simulator/test_simdevice.py b/test/simulator/test_simdevice.py index 04a399901..1eca9d042 100644 --- a/test/simulator/test_simdevice.py +++ b/test/simulator/test_simdevice.py @@ -194,8 +194,8 @@ def test_simdevice_build_blocks(self): [SimData(1, values=123, datatype=DataType.INT16)], [SimData(1, values=123, datatype=DataType.INT16)]) result = { - "c": (1, [123, 0], [DataType.BITS, DataType.INVALID]), - "d": (1, [123, 0], [DataType.BITS, DataType.INVALID]), + "c": (1, [0]*8 + [1]*2 + [0] + [1]*4 + [0]*2, [DataType.BITS] + [0] * 15 + [DataType.INVALID]), + "d": (1, [0]*8 + [1]*2 + [0] + [1]*4 + [0]*2, [DataType.BITS] + [0] * 15 + [DataType.INVALID]), "h": (1, [123, 0], [DataType.INT16, DataType.INVALID]), "i": (1, [123,0], [DataType.INT16, DataType.INVALID]) } @@ -216,8 +216,8 @@ def test_simdevice_build_bits(self): [SimData(1, values=123, datatype=DataType.INT16)] )) result_block = cast(dict[str, SimRegs], sd.build_device()) - assert len(result_block["c"][1]) == 2 - assert len(result_block["d"][1]) == 2 + assert len(result_block["c"][1]) == 17 + assert len(result_block["d"][1]) == 17 assert len(result_block["h"][1]) == 2 assert len(result_block["i"][1]) == 2 diff --git a/test/simulator/test_simruntime.py b/test/simulator/test_simruntime.py index ce799a995..474d7ba2b 100644 --- a/test/simulator/test_simruntime.py +++ b/test/simulator/test_simruntime.py @@ -38,16 +38,6 @@ def test_simruntime_instanciate(self, kwargs): sd = SimDevice(**kwargs) SimRuntime(sd) - @pytest.mark.parametrize(("block", "expect"), [ - ((3, [1], [0xffff]), (3, 16, [1]*16, [1]*16)), - ((3, [1], [0x0000]), (3, 16, [1]*16, [0]*16)), - ((3, [1], [0xffff, 0xffff]), (3, 32, [1]*32, [1]*32)), - ]) - async def test_simruntime_convert_bit(self, block, expect): - """Test that simdata can be objects.""" - result = SimRuntime.convert_to_bit(block) - assert result == expect - @pytest.mark.parametrize(("args", "expect"), [ ((3, 1, 1, None), -1), ((3, 200, 1, None), -1),