Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 26 additions & 19 deletions pymodbus/datastore/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

from copy import deepcopy

from ..constants import ExcCodes
from ..exceptions import NoSuchIdException
from ..logging import Log
Expand All @@ -25,36 +27,41 @@ class ModbusDeviceContext: # pylint: disable=too-few-public-methods
_fx_mapper.update([(i, "h") for i in (3, 6, 16, 22, 23)])
_fx_mapper.update([(i, "c") for i in (1, 5, 15)])

def __build_data(self, simdata):
"""Build and/or correct values."""
for entry in simdata:
entry.datatype = DataType.BITS
if not isinstance(entry.values, list):
entry.values = [entry.values]
for i, _value in enumerate(entry.values):
entry.values[i] = bool(entry.values[i])

def __init__(self, *_args,
di: ModbusSequentialDataBlock | ModbusSparseDataBlock | None = None,
co: ModbusSequentialDataBlock | ModbusSparseDataBlock | None = None,
ir: ModbusSequentialDataBlock | ModbusSparseDataBlock | None = None,
hr: ModbusSequentialDataBlock | ModbusSparseDataBlock | None = None,
):
"""Initialize the datastores."""
self.store = {
"d": di,
"c": co,
"i": ir,
"h": hr,
}
"""Define device."""
if not di:
di = ModbusSequentialDataBlock(0, values=0)
di = ModbusSequentialDataBlock(1, values=False)
if not co:
co = ModbusSequentialDataBlock(0, values=0)
co = ModbusSequentialDataBlock(1, values=False)
if not ir:
ir = ModbusSequentialDataBlock(0, values=0)
ir = ModbusSequentialDataBlock(1, values=0)
if not hr:
hr = ModbusSequentialDataBlock(0, values=0)
for entry in co.simdata:
entry.datatype = DataType.BITS
for entry in di.simdata:
entry.datatype = DataType.BITS
hr = ModbusSequentialDataBlock(1, values=0)
co_simdata = deepcopy(co.simdata)
di_simdata = deepcopy(di.simdata)
ir_simdata = deepcopy(ir.simdata)
hr_simdata = deepcopy(hr.simdata)
self.__build_data(co_simdata)
self.__build_data(di_simdata)
self.simdevice = SimDevice(0, simdata=(
co.simdata,
di.simdata,
ir.simdata,
hr.simdata))
co_simdata,
di_simdata,
ir_simdata,
hr_simdata))
Log.warning("ModbusDeviceContext, ModbusSequentialDataBlock, "
"ModbusSparseDataBlock are deprecated "
"and will be removed in v4.\n"
Expand Down
2 changes: 1 addition & 1 deletion pymodbus/datastore/sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ def __init__(self, address, values):
:param address: The starting address of the datastore
:param values: Either a list or a dictionary of values
"""
self.simdata = [SimData(address, values=values, datatype=DataType.REGISTERS)]
self.simdata = [SimData(address-1, values=values, datatype=DataType.REGISTERS)]
24 changes: 12 additions & 12 deletions pymodbus/simulator/simdevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,18 @@ def __check_simple_blocks(self):
if i < 2 and entry.datatype != DataType.BITS:
raise TypeError(f"simdata[{inx}]=tuple[{TUPLE_NAMES[i]}] -> list[{inx}] not DataType.BITS, not allowed")

def __check_block(self, block: list[SimData], name: str):
def __check_block(self, block: list[SimData], use_bits, name: str):
"""Check block content."""
x_block = sorted(block, key=lambda x: x.address)
last_address = x_block[0].address -1
for entry in x_block:
last_address = self.__check_block_entries(last_address, entry, name)
last_address = self.__check_block_entries(last_address, entry, use_bits, name)

def __check_block_entries(self, last_address: int, entry: SimData, _name: str) -> int:
def __check_block_entries(self, last_address: int, entry: SimData, use_bits: bool, _name: str) -> int:
"""Check block entries."""
if entry.address <= last_address:
raise TypeError(f"SimData address {entry.address} is overlapping!")
blocks_regs = entry.build_registers(self.endian, self.string_encoding, False) * entry.count
blocks_regs = entry.build_registers(self.endian, self.string_encoding, use_bits) * entry.count
return last_address + len(blocks_regs)

def __check_parameters(self):
Expand All @@ -182,10 +182,10 @@ def __check_parameters(self):
self.__check_simple2()
if isinstance(self.simdata, tuple):
for i in range(4):
self.__check_block(cast(tuple,self.simdata)[i], TUPLE_NAMES[i])
self.__check_block(cast(tuple,self.simdata)[i], (i in {0,1}), TUPLE_NAMES[i])
else:
x_simdata = self.simdata if isinstance(self.simdata, list) else [self.simdata]
self.__check_block(x_simdata, "list")
self.__check_block(x_simdata, False, "list")

def __post_init__(self):
"""Define a device."""
Expand All @@ -198,10 +198,10 @@ def __build_flags(self, simdata: SimData) -> int:
flag_normal |= SimUtils.RunTimeFlag_READONLY
return flag_normal

def __create_simdata(self, simdata: SimData, flag_list: list[int], reg_list: list[int]):
def __create_simdata(self, simdata: SimData, flag_list: list[int], reg_list: list[int], use_bits: bool):
"""Build registers for single SimData."""
flag_normal = self.__build_flags(simdata)
blocks_regs = simdata.build_registers(self.endian, self.string_encoding, False)
blocks_regs = simdata.build_registers(self.endian, self.string_encoding, use_bits)
for _ in range(simdata.count):
first = True
for register in blocks_regs:
Expand All @@ -212,7 +212,7 @@ def __create_simdata(self, simdata: SimData, flag_list: list[int], reg_list: li
flag_list.append(flag_normal & ~SimUtils.RunTimeFlag_TYPE)
reg_list.append(register)

def __create_block(self, simdata: list[SimData]) -> SimRegs:
def __create_block(self, simdata: list[SimData], use_bits: bool) -> SimRegs:
"""Create registers for device."""
flag_list: list[int] = []
reg_list: list[int] = []
Expand All @@ -223,7 +223,7 @@ def __create_block(self, simdata: list[SimData]) -> SimRegs:
flag_list.append(DataType.INVALID)
reg_list.append(0)
next_address += 1
self.__create_simdata(entry, flag_list, reg_list)
self.__create_simdata(entry, flag_list, reg_list, use_bits)
flag_list.append(DataType.INVALID)
reg_list.append(0)
return (start_address, reg_list, flag_list)
Expand All @@ -234,12 +234,12 @@ def build_device(self) -> SimRegs | dict[str, SimRegs]:
if not isinstance(self.simdata, tuple):
x_simdata = self.simdata if isinstance(self.simdata, list) else [self.simdata]
x_simdata.sort(key=lambda x: x.address)
return self.__create_block(x_simdata)
return self.__create_block(x_simdata, False)
b: dict[str, SimRegs] = {}
# (<coils>, <discrete inputs>, <holding registers>, <input registers>)
convert = {0: "c", 1: "d", 2: "h", 3: "i"}
for i in range(4):
x_simdata = cast(tuple, self.simdata)[i]
x_simdata.sort(key=lambda x: x.address)
b[convert[i]] = self.__create_block(x_simdata)
b[convert[i]] = self.__create_block(x_simdata, (i in {0,1}))
return b
27 changes: 5 additions & 22 deletions pymodbus/simulator/simruntime.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from __future__ import annotations

from ..constants import ExcCodes
from ..pdu.utils import unpack_bitstring
from .simdevice import SimDevice, SimRegs
from .simdevice import SimDevice
from .simutils import DataType, SimUtils


Expand Down Expand Up @@ -36,26 +35,10 @@ def __init__(self, device: SimDevice):
)
return
self.shared = False
b_h = build["h"]
b_i = build["i"]
self.block = {
"c": self.convert_to_bit(build["c"]),
"d": self.convert_to_bit(build["d"]),
"h": (b_h[0], len(b_h[1]), b_h[1], b_h[2]),
"i": (b_i[0], len(b_i[1]), b_i[1], b_i[2]),
}

@classmethod
def convert_to_bit(cls, block: SimRegs) -> tuple[int, int, list[int], list[int]]:
"""Convert registers to bits."""
new_registers: list[int] = []
for entry in block[2]:
bool_list = unpack_bitstring(entry.to_bytes(2, byteorder="big"))
for x in bool_list:
new_registers.append(1 if x else 0)
new_flags: list[int] = [block[1][0]]*len(new_registers)
return (block[0], len(new_flags), new_flags, new_registers)

self.block = {}
for i in ("c", "d", "h", "i"):
x = build[i]
self.block[i] = (x[0], len(x[1]), x[1], x[2])

async def get_block(self, func_code: int, address: int, count: int, values: list[int] | list[bool] | None) -> list[int] | list[bool] | ExcCodes:
"""Calculate offset."""
Expand Down
21 changes: 18 additions & 3 deletions test/datastore/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async def test_datastore_device_Values(self):

async def test_datastore_device_not_ok(self):
"""Test ModbusDeviceContext."""
block = ModbusSequentialDataBlock(1, [17] * 8)
block = ModbusSequentialDataBlock(1, [17] * 16)
ModbusDeviceContext(di=block, co=block, hr=block, ir=block)

def test_datastore_server(self):
Expand All @@ -37,9 +37,24 @@ def test_datastore_server_ids(self):
srv = ModbusServerContext(devices=ModbusDeviceContext())
assert isinstance(srv.device_ids(), list)

async def test_datastore_build(self):
"""Test ModbusServerContext."""
block = ModbusSequentialDataBlock(1, 17)
dev = ModbusDeviceContext(di=block, co=block, hr=block, ir=block)
srv = ModbusServerContext(devices={1: dev}, single=False)
assert srv.device_ids() == [1]
await srv.async_setValues(1, 0x05, 0, [1])
assert await srv.async_getValues(1, 0x03, 0) == ExcCodes.DEVICE_BUSY
with pytest.raises(NoSuchIdException):
await srv.async_getValues(15, 0, 0)

async def test_datastore_build2(self):
"""Test ModbusServerContext."""
ModbusServerContext(devices=ModbusDeviceContext(), single=True)

async def test_datastore_server_device_id(self):
"""Test ModbusServerContext."""
block = ModbusSequentialDataBlock(1, [17] * 8)
block = ModbusSequentialDataBlock(1, [17] * 16)
dev = ModbusDeviceContext(di=block, co=block, hr=block, ir=block)
srv = ModbusServerContext(devices={1: dev}, single=False)
assert srv.device_ids() == [1]
Expand All @@ -51,7 +66,7 @@ async def test_datastore_server_device_id(self):

async def test_datastore_server_device_id_0(self):
"""Test ModbusServerContext."""
block = ModbusSequentialDataBlock(1, [17] * 8)
block = ModbusSequentialDataBlock(1, [17] * 16)
dev = ModbusDeviceContext(di=block, co=block, hr=block, ir=block)
srv = ModbusServerContext(devices={0: dev}, single=False)
await srv.async_getValues(15, 0x03, 0)
8 changes: 4 additions & 4 deletions test/simulator/test_simdevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ def test_simdevice_build_blocks(self):
[SimData(1, values=123, datatype=DataType.INT16)],
[SimData(1, values=123, datatype=DataType.INT16)])
result = {
"c": (1, [123, 0], [DataType.BITS, DataType.INVALID]),
"d": (1, [123, 0], [DataType.BITS, DataType.INVALID]),
"c": (1, [0]*8 + [1]*2 + [0] + [1]*4 + [0]*2, [DataType.BITS] + [0] * 15 + [DataType.INVALID]),
"d": (1, [0]*8 + [1]*2 + [0] + [1]*4 + [0]*2, [DataType.BITS] + [0] * 15 + [DataType.INVALID]),
"h": (1, [123, 0], [DataType.INT16, DataType.INVALID]),
"i": (1, [123,0], [DataType.INT16, DataType.INVALID])
}
Expand All @@ -216,8 +216,8 @@ def test_simdevice_build_bits(self):
[SimData(1, values=123, datatype=DataType.INT16)]
))
result_block = cast(dict[str, SimRegs], sd.build_device())
assert len(result_block["c"][1]) == 2
assert len(result_block["d"][1]) == 2
assert len(result_block["c"][1]) == 17
assert len(result_block["d"][1]) == 17
assert len(result_block["h"][1]) == 2
assert len(result_block["i"][1]) == 2

Expand Down
10 changes: 0 additions & 10 deletions test/simulator/test_simruntime.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,6 @@ def test_simruntime_instanciate(self, kwargs):
sd = SimDevice(**kwargs)
SimRuntime(sd)

@pytest.mark.parametrize(("block", "expect"), [
((3, [1], [0xffff]), (3, 16, [1]*16, [1]*16)),
((3, [1], [0x0000]), (3, 16, [1]*16, [0]*16)),
((3, [1], [0xffff, 0xffff]), (3, 32, [1]*32, [1]*32)),
])
async def test_simruntime_convert_bit(self, block, expect):
"""Test that simdata can be objects."""
result = SimRuntime.convert_to_bit(block)
assert result == expect

@pytest.mark.parametrize(("args", "expect"), [
((3, 1, 1, None), -1),
((3, 200, 1, None), -1),
Expand Down