Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions pymodbus/transaction/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from threading import RLock

from pymodbus.exceptions import ConnectionException, ModbusIOException
from pymodbus.framer import FramerBase
from pymodbus.framer import FramerAscii, FramerBase, FramerRTU
from pymodbus.logging import Log
from pymodbus.pdu import ExceptionResponse, ModbusPDU
from pymodbus.transport import CommParams, ModbusProtocol
Expand Down Expand Up @@ -49,6 +49,7 @@ def __init__(
self.retries = retries
self.next_tid: int = 0
self.request_dev_id: int = 0
self.request_transaction_id: int = 0
self.trace_packet = trace_packet or self.dummy_trace_packet
self.trace_pdu = trace_pdu or self.dummy_trace_pdu
self.trace_connect = trace_connect or self.dummy_trace_connect
Expand Down Expand Up @@ -191,6 +192,7 @@ async def execute(self, no_response_expected: bool, request: ModbusPDU) -> Modbu
def pdu_send(self, pdu: ModbusPDU, addr: tuple | None = None) -> None:
"""Build byte stream and send."""
self.request_dev_id = pdu.dev_id
self.request_transaction_id = pdu.transaction_id
packet = self.framer.buildFrame(self.trace_pdu(True, pdu))
if self.is_sync and self.comm_params.handle_local_echo:
self.sent_buffer = packet
Expand Down Expand Up @@ -218,19 +220,20 @@ def callback_data(self, data: bytes, addr: tuple | None = None) -> int:
self.last_addr = addr
if not self.is_server:
if pdu.dev_id != self.request_dev_id:
raise ModbusIOException(
f"ERROR: request ask for id={self.request_dev_id} but got id={pdu.dev_id}, CLOSING CONNECTION."
)
if self.response_future.done():
raise ModbusIOException("received pdu without a corresponding request")
self.response_future.set_result(self.last_pdu

)
Log.warning(f"ERROR: expected id {self.request_dev_id} but got {pdu.dev_id}, IGNORING.")
elif pdu.transaction_id != self.request_transaction_id:
Log.warning(f"ERROR: expected transaction {self.request_transaction_id} but got {pdu.transaction_id}, IGNORING.")
elif self.response_future.done():
Log.warning("ERROR: received pdu without a corresponding request, IGNORING")
else:
self.response_future.set_result(self.last_pdu)
return used_len

def getNextTID(self) -> int:
"""Retrieve the next transaction identifier."""
if self.next_tid >= 65000:
if isinstance(self.framer, (FramerAscii, FramerRTU)):
self.next_tid = 0
elif self.next_tid >= 65000:
self.next_tid = 1
else:
self.next_tid += 1
Expand Down
64 changes: 58 additions & 6 deletions test/transaction/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def test_transaction_manager_tid(self, use_clc):
"""Test next TID."""
transact = TransactionManager(
use_clc,
FramerRTU(DecodePDU(False)),
FramerSocket(DecodePDU(False)),
5,
False,
None,
Expand Down Expand Up @@ -161,8 +161,7 @@ async def test_transaction_data_2(self, use_clc, test):
else:
pdu.dev_id = 0
transact.response_future.set_result((1, pdu))
with pytest.raises(ModbusIOException):
transact.callback_data(packet)
transact.callback_data(packet)

@pytest.mark.parametrize("scenario", range(8))
async def test_transaction_execute(self, use_clc, scenario):
Expand Down Expand Up @@ -264,6 +263,7 @@ async def test_client_protocol_execute_outside(self, use_clc, no_resp):
None,
)
transact.send = mock.Mock()
transact.comm_params.timeout_connect = 0.1
request = ReadCoilsRequest(address=117, count=5, dev_id=1)
transact.retries = 0
transact.connection_made(mock.AsyncMock())
Expand All @@ -272,13 +272,13 @@ async def test_client_protocol_execute_outside(self, use_clc, no_resp):
await asyncio.sleep(0.2)
data = b"\x00\x00\x12\x34\x00\x06\x01\x01\x01\x02\x00\x04"
transact.data_received(data)
result = await resp
if no_resp:
result = await resp
assert result.isError()
assert isinstance(result, ExceptionResponse)
else:
assert not result.isError()
assert isinstance(result, ReadCoilsResponse)
with pytest.raises(ModbusIOException):
await resp

async def test_transaction_id0(self, use_clc):
"""Test tracers in disconnect."""
Expand Down Expand Up @@ -315,6 +315,58 @@ async def test_transaction_id0(self, use_clc):
await asyncio.sleep(0.1)
assert response == await resp

@pytest.mark.parametrize(("framer"), [FramerRTU, FramerSocket])
@pytest.mark.parametrize("scenario", range(2))
async def test_delayed_response(self, use_clc, framer, scenario):
"""Test delayed rtu response combined with retries."""
transact = TransactionManager(
use_clc,
framer(DecodePDU(False)),
5,
False,
None,
None,
None,
)
transact.send = mock.Mock()
request1 = ReadCoilsRequest(address=117, count=5, dev_id=1)
request2 = ReadCoilsRequest(address=118, count=2, dev_id=1)
response1 = ReadCoilsResponse(bits=[True, False, True, True] + [False]*4, dev_id=1)
response2 = ReadCoilsResponse(bits=[True] + [False]*7, dev_id=1)
if framer == FramerRTU:
cb_response1 = b'\x01\x01\x01\r\x90M'
cb_response2 = b'\x01\x01\x01\x01\x90H'
else:
cb_response1 = b'\x00\x01\x00\x00\x00\x04\x01\x01\x01\r'
cb_response2 = b'\x00\x02\x00\x00\x00\x04\x01\x01\x01\x01'
transact.retries = 1
transact.connection_made(mock.AsyncMock())
transact.transport.write = mock.Mock()
transact.comm_params.timeout_connect = 0.1

if scenario == 0: # timeout + double response
resp = asyncio.create_task(transact.execute(False, request1))
await asyncio.sleep(0.15)
transact.callback_data(cb_response1, None)
transact.callback_data(cb_response1, None)
result = await resp
assert result.bits == response1.bits
elif scenario == 1: # timeout + new request + double response
resp = asyncio.create_task(transact.execute(False, request1))
await asyncio.sleep(0.25)
with pytest.raises(ModbusIOException):
await resp
resp = asyncio.create_task(transact.execute(False, request2))
await asyncio.sleep(0.05)
transact.callback_data(cb_response1, None)
transact.callback_data(cb_response2, None)
result = await resp
if framer == FramerRTU:
# Return WRONG response
assert result.bits == response1.bits
else:
# Return CORRECT response
assert result.bits == response2.bits

@pytest.mark.parametrize("use_port", [5098])
class TestSyncTransaction:
Expand Down