All checks were successful
tests / pytest (push) Successful in 1m24s
Two related changes to make sender/receiver logs analyzable:
1. MeshCoreRadio.send no longer returns until the radio firmware
reports the packet was actually transmitted. send_chan_msg only
waits for EventType.OK (firmware accepted bytes) — no
tx-complete event is pushed for channel messages, so we poll
commands.get_stats_packets() and wait for nb_sent to strictly
increment past a pre-send baseline. Times out after 30s.
2. Each test packet now embeds a random 8-hex-char token as the
4th comma-separated field of the prefix, distinct from any
radio-assigned packet id. Receivers extract it from the text
and write it on every TSV row, so a sender's queued/sent rows
tie 1:1 to each receiver's record without depending on
whatever packet id the radio surfaces on each side.
TSV gains a new column (3rd, after packet_id):
queued: ts queued <id> <token> <freq,bw,sf,cr,pow>
sent: ts sent <id> <token> <duration_ms> <text>
received: ts received <id> <token> <text>
Missing token writes "-".
UDP simulator regex updated to accept both legacy 3-field test
payloads and the new 4-field format with token.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
97 lines
3 KiB
Python
97 lines
3 KiB
Python
"""Tests for MeshCoreRadio.send tx-complete waiting.
|
|
|
|
Uses a stub `_mc` object so we don't need the meshcore lib or real hardware.
|
|
"""
|
|
import asyncio
|
|
import sys
|
|
import types
|
|
|
|
import pytest
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _stub_meshcore_module(monkeypatch):
|
|
"""Provide a fake `meshcore` module so MeshCoreRadio import inside
|
|
`connect()` would succeed if called. The send/wait tests below don't
|
|
call connect; they construct the radio manually.
|
|
"""
|
|
fake = types.ModuleType("meshcore")
|
|
fake.MeshCore = object
|
|
fake.EventType = types.SimpleNamespace(CHANNEL_MSG_RECV="rx")
|
|
monkeypatch.setitem(sys.modules, "meshcore", fake)
|
|
|
|
|
|
class _Event:
|
|
def __init__(self, payload, type_name="OK"):
|
|
self.payload = payload
|
|
self.type = types.SimpleNamespace(name=type_name)
|
|
|
|
|
|
class _StubCommands:
|
|
def __init__(self, nb_sent_sequence, send_response=None):
|
|
# nb_sent_sequence: list of values returned by successive
|
|
# get_stats_packets calls.
|
|
self._nb_sent = list(nb_sent_sequence)
|
|
self.sends: list[tuple[int, str]] = []
|
|
self._send_response = send_response or _Event({"packet_id": 7}, "OK")
|
|
|
|
async def get_stats_packets(self):
|
|
val = self._nb_sent.pop(0) if self._nb_sent else self._nb_sent[-1]
|
|
return _Event({"nb_sent": val})
|
|
|
|
async def send_chan_msg(self, chan, msg):
|
|
self.sends.append((chan, msg))
|
|
return self._send_response
|
|
|
|
|
|
class _StubMC:
|
|
def __init__(self, commands):
|
|
self.commands = commands
|
|
|
|
|
|
def _make_radio(commands):
|
|
from modjam.radio.meshcore import MeshCoreRadio
|
|
r = MeshCoreRadio()
|
|
r._mc = _StubMC(commands)
|
|
return r
|
|
|
|
|
|
def test_send_waits_until_nb_sent_increments():
|
|
# baseline read returns 100; after send poll returns 100, 100, 101
|
|
cmds = _StubCommands(nb_sent_sequence=[100, 100, 100, 101])
|
|
radio = _make_radio(cmds)
|
|
result = asyncio.run(radio.send("hello"))
|
|
assert cmds.sends == [(0, "hello")]
|
|
assert result.duration_ms >= 0
|
|
# baseline + at least 2 polls before increment
|
|
assert cmds._nb_sent == [] # all poll values consumed
|
|
|
|
|
|
def test_send_returns_immediately_when_increment_already_observed():
|
|
cmds = _StubCommands(nb_sent_sequence=[5, 7]) # baseline=5, first poll=7
|
|
radio = _make_radio(cmds)
|
|
result = asyncio.run(radio.send("x"))
|
|
assert result.packet_id == 7 # from send_chan_msg payload
|
|
|
|
|
|
def test_send_raises_on_error_event():
|
|
cmds = _StubCommands(
|
|
nb_sent_sequence=[0],
|
|
send_response=_Event({"error": "no_radio"}, "ERROR"),
|
|
)
|
|
radio = _make_radio(cmds)
|
|
with pytest.raises(RuntimeError, match="send_chan_msg failed"):
|
|
asyncio.run(radio.send("x"))
|
|
|
|
|
|
def test_wait_for_tx_times_out():
|
|
# nb_sent never increments
|
|
cmds = _StubCommands(nb_sent_sequence=[10] * 1000)
|
|
radio = _make_radio(cmds)
|
|
|
|
async def go():
|
|
baseline = await radio._read_nb_sent()
|
|
await radio._wait_for_tx(baseline, timeout=0.1, poll_interval=0.01)
|
|
|
|
with pytest.raises(TimeoutError):
|
|
asyncio.run(go())
|