modjam/tests/test_meshcore_radio.py

98 lines
3 KiB
Python
Raw Normal View History

"""Tests for MeshCoreRadio.send tx-complete waiting.
Uses a stub `_mc` object so we don't need the meshcore lib or real hardware.
"""
import asyncio
import sys
import types
import pytest
@pytest.fixture(autouse=True)
def _stub_meshcore_module(monkeypatch):
"""Provide a fake `meshcore` module so MeshCoreRadio import inside
`connect()` would succeed if called. The send/wait tests below don't
call connect; they construct the radio manually.
"""
fake = types.ModuleType("meshcore")
fake.MeshCore = object
fake.EventType = types.SimpleNamespace(CHANNEL_MSG_RECV="rx")
monkeypatch.setitem(sys.modules, "meshcore", fake)
class _Event:
def __init__(self, payload, type_name="OK"):
self.payload = payload
self.type = types.SimpleNamespace(name=type_name)
class _StubCommands:
def __init__(self, nb_sent_sequence, send_response=None):
# nb_sent_sequence: list of values returned by successive
# get_stats_packets calls.
self._nb_sent = list(nb_sent_sequence)
self.sends: list[tuple[int, str]] = []
self._send_response = send_response or _Event({"packet_id": 7}, "OK")
async def get_stats_packets(self):
val = self._nb_sent.pop(0) if self._nb_sent else self._nb_sent[-1]
return _Event({"nb_sent": val})
async def send_chan_msg(self, chan, msg):
self.sends.append((chan, msg))
return self._send_response
class _StubMC:
def __init__(self, commands):
self.commands = commands
def _make_radio(commands):
from modjam.radio.meshcore import MeshCoreRadio
r = MeshCoreRadio()
r._mc = _StubMC(commands)
return r
def test_send_waits_until_nb_sent_increments():
# baseline read returns 100; after send poll returns 100, 100, 101
cmds = _StubCommands(nb_sent_sequence=[100, 100, 100, 101])
radio = _make_radio(cmds)
result = asyncio.run(radio.send("hello"))
assert cmds.sends == [(0, "hello")]
assert result.duration_ms >= 0
# baseline + at least 2 polls before increment
assert cmds._nb_sent == [] # all poll values consumed
def test_send_returns_immediately_when_increment_already_observed():
cmds = _StubCommands(nb_sent_sequence=[5, 7]) # baseline=5, first poll=7
radio = _make_radio(cmds)
result = asyncio.run(radio.send("x"))
assert result.packet_id == 7 # from send_chan_msg payload
def test_send_raises_on_error_event():
cmds = _StubCommands(
nb_sent_sequence=[0],
send_response=_Event({"error": "no_radio"}, "ERROR"),
)
radio = _make_radio(cmds)
with pytest.raises(RuntimeError, match="send_chan_msg failed"):
asyncio.run(radio.send("x"))
def test_wait_for_tx_times_out():
# nb_sent never increments
cmds = _StubCommands(nb_sent_sequence=[10] * 1000)
radio = _make_radio(cmds)
async def go():
baseline = await radio._read_nb_sent()
await radio._wait_for_tx(baseline, timeout=0.1, poll_interval=0.01)
with pytest.raises(TimeoutError):
asyncio.run(go())