diff --git a/modjam/log.py b/modjam/log.py index 1bddce6..15a771a 100644 --- a/modjam/log.py +++ b/modjam/log.py @@ -4,11 +4,15 @@ from typing import IO class TsvLogger: - """Append-only TSV logger matching reference/A.tsv + reference/B.tsv format. + """Append-only TSV logger. - queued: queued ,,,, - sent: sent - received: received + Columns: + queued: queued ,,,, + sent: sent + received: received + + `token` is the in-payload identifier we control (see modjam.payload), + written as `-` when absent (e.g. for non-test traffic). """ def __init__(self, station: str, log_dir: str | Path = ".", test_name: str | None = None): @@ -18,14 +22,18 @@ class TsvLogger: self._fh: IO = open(path, "a", buffering=1) # line-buffered self.path = path - def queued(self, packet_id: int | str, freq: float, bw: float, sf: int, cr: int, pw: int) -> None: - self._fh.write(f"{time()}\tqueued\t{packet_id}\t{freq},{bw},{sf},{cr},{pw}\n") + @staticmethod + def _tok(token: str | None) -> str: + return token if token else "-" - def sent(self, packet_id: int | str, duration_ms: int, text: str) -> None: - self._fh.write(f"{time()}\tsent\t{packet_id}\t{duration_ms}\t{text}\n") + def queued(self, packet_id: int | str, token: str | None, freq: float, bw: float, sf: int, cr: int, pw: int) -> None: + self._fh.write(f"{time()}\tqueued\t{packet_id}\t{self._tok(token)}\t{freq},{bw},{sf},{cr},{pw}\n") - def received(self, packet_id: int | str, text: str) -> None: - self._fh.write(f"{time()}\treceived\t{packet_id}\t{text}\n") + def sent(self, packet_id: int | str, token: str | None, duration_ms: int, text: str) -> None: + self._fh.write(f"{time()}\tsent\t{packet_id}\t{self._tok(token)}\t{duration_ms}\t{text}\n") + + def received(self, packet_id: int | str, token: str | None, text: str) -> None: + self._fh.write(f"{time()}\treceived\t{packet_id}\t{self._tok(token)}\t{text}\n") def close(self) -> None: try: diff --git a/modjam/payload.py b/modjam/payload.py new file mode 100644 index 0000000..0a3ad5b --- /dev/null +++ b/modjam/payload.py @@ -0,0 +1,44 @@ +"""Test-packet payload encoding. + +Format: `,,,|` + +The trailing token is a random 8-hex-char identifier we control, distinct +from any radio-assigned packet id. It lets us correlate a sender's queued +log entry with each receiver's record of the same payload, without relying +on whatever packet id the radio happens to surface on each side. +""" +from __future__ import annotations + +import secrets +import string +from random import choice + + +TOKEN_LEN = 8 + + +def make_test_payload(size: int, seq: int, elapsed: float, ts: float) -> tuple[str, str]: + """Build a test-packet text payload of approximately `size` bytes. + + Returns (text, token). + """ + token = secrets.token_hex(TOKEN_LEN // 2) + prefix = f"{ts},{elapsed},{seq},{token}|" + if len(prefix) >= size: + return prefix, token + pad = "".join(choice(string.ascii_letters) for _ in range(size - len(prefix))) + return prefix + pad, token + + +def extract_token(text: str) -> str | None: + """Extract the token from a test-packet text, or None if not present.""" + if not text or "|" not in text: + return None + head = text.split("|", 1)[0] + parts = head.split(",") + if len(parts) < 4: + return None + candidate = parts[3] + if len(candidate) != TOKEN_LEN or not all(c in "0123456789abcdef" for c in candidate): + return None + return candidate diff --git a/modjam/radio/meshcore.py b/modjam/radio/meshcore.py index 67fad50..cb140cc 100644 --- a/modjam/radio/meshcore.py +++ b/modjam/radio/meshcore.py @@ -71,14 +71,47 @@ class MeshCoreRadio(Radio): self._configured_channel = (name, psk_b64) async def send(self, text: str) -> SendResult: + """Send a channel message and return only after the radio reports tx complete. + + meshcore's `send_chan_msg` resolves on EventType.OK, which signals the + firmware accepted the bytes — the LoRa modem may still be queuing or + transmitting. There is no firmware-emitted tx-complete event for + channel messages, so we poll `get_stats_packets` and wait for + `nb_sent` to increment past its pre-send baseline. (See + reference/meshcore-py/src/meshcore/commands/{messaging,device}.py and + parsing.py for the field definitions.) + """ assert self._mc + baseline = await self._read_nb_sent() t0 = time.perf_counter() result = await self._mc.commands.send_chan_msg(self._channel_idx, text) - ms = int((time.perf_counter() - t0) * 1000) payload = getattr(result, "payload", {}) or {} + rtype = getattr(getattr(result, "type", None), "name", "") + if rtype == "ERROR" or "error" in payload: + raise RuntimeError(f"send_chan_msg failed: {payload}") + await self._wait_for_tx(baseline) + ms = int((time.perf_counter() - t0) * 1000) pid = payload.get("packet_id") or payload.get("id") or next(self._counter) return SendResult(packet_id=pid, duration_ms=ms) + async def _read_nb_sent(self) -> int: + assert self._mc + ev = await self._mc.commands.get_stats_packets() + return int((getattr(ev, "payload", {}) or {}).get("nb_sent", 0)) + + async def _wait_for_tx(self, baseline: int, timeout: float = 30.0, + poll_interval: float = 0.05) -> None: + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + cur = await self._read_nb_sent() + if cur > baseline: + return + await asyncio.sleep(poll_interval) + raise TimeoutError( + f"radio did not report tx complete within {timeout}s " + f"(nb_sent stuck at {baseline})" + ) + def on_message(self, cb: MessageHandler) -> None: self._handlers.append(cb) diff --git a/modjam/radio/udp.py b/modjam/radio/udp.py index 5af2a2f..259613e 100644 --- a/modjam/radio/udp.py +++ b/modjam/radio/udp.py @@ -15,10 +15,10 @@ from .base import MessageHandler, Radio, RxPacket, SendResult SIM_PORT = int(os.environ.get("SIM_PORT", "9000")) -# Test packet payload format: `,,|`. +# Test packet payload format: `,,[,]|`. # Heartbeat/next/done payloads start with `||...` — no comma # before the first `|` — so this regex matches test packets only. -_TEST_PACKET_RE = re.compile(r"^\d+(?:\.\d+)?,\d+(?:\.\d+)?,\d+\|") +_TEST_PACKET_RE = re.compile(r"^\d+(?:\.\d+)?,\d+(?:\.\d+)?,\d+(?:,[0-9a-f]+)?\|") def _psk_hash(psk_b64: str) -> str: diff --git a/modjam/station.py b/modjam/station.py index 15b3677..f2dd1ad 100644 --- a/modjam/station.py +++ b/modjam/station.py @@ -1,12 +1,10 @@ from __future__ import annotations import asyncio -import random -import string import time from datetime import datetime, timedelta -from . import cuesheet, protocol +from . import cuesheet, payload, protocol from .config import RadioConfig, StationConfig from .log import TsvLogger from .radio.base import Radio, RxPacket @@ -179,13 +177,16 @@ class TestStation: while time.time() < cue.end and not self._stop_run.is_set(): n += 1 self._sent_count = n - text = self._make_payload(cue.size, n, time.time() - case_start) + text, token = payload.make_test_payload(cue.size, n, time.time() - case_start, time.time()) assert self._logger try: - # cannot know packet_id before send; log queued with placeholder, then sent + # At queue time we don't yet have a radio packet id, so reuse + # the token (our cross-station correlator) for the id column. + self._logger.queued(token, token, cue.freq, cue.bw, cue.sf, cue.cr, cue.pow) + # radio.send returns only after the radio firmware reports the + # packet has actually been transmitted (see MeshCoreRadio). result = await self.radio.send(text) - self._logger.queued(result.packet_id, cue.freq, cue.bw, cue.sf, cue.cr, cue.pow) - self._logger.sent(result.packet_id, result.duration_ms, text) + self._logger.sent(result.packet_id, token, result.duration_ms, text) except Exception as e: print(f"[send] error: {e}") try: @@ -203,14 +204,6 @@ class TestStation: except asyncio.TimeoutError: pass - @staticmethod - def _make_payload(size: int, n: int, t: float) -> str: - prefix = f"{time.time()},{t},{n}|" - if len(prefix) >= size: - return prefix - pad = "".join(random.choice(string.ascii_letters) for _ in range(size - len(prefix))) - return prefix + pad - # ---- helpers ---- async def _tune(self, rc: RadioConfig) -> None: @@ -256,4 +249,5 @@ class TestStation: return if self._case_active and self._logger: self._rcvd_count += 1 - self._logger.received(pkt.packet_id, text) + token = payload.extract_token(text) + self._logger.received(pkt.packet_id, token, text) diff --git a/tests/test_log.py b/tests/test_log.py index 771f903..9a490e4 100644 --- a/tests/test_log.py +++ b/tests/test_log.py @@ -3,9 +3,9 @@ from modjam.log import TsvLogger def test_tsv_logger_writes_all_event_types(tmp_path): logger = TsvLogger("Z", tmp_path) - logger.queued(42, 915.1, 500.0, 7, 5, 22) - logger.sent(42, 13, "1234.5,0.0,1|abc") - logger.received(99, "1234.5,1.0,2|xyz") + logger.queued(42, "deadbeef", 915.1, 500.0, 7, 5, 22) + logger.sent(42, "deadbeef", 13, "1234.5,0.0,1,deadbeef|abc") + logger.received(99, "cafe1234", "1234.5,1.0,2,cafe1234|xyz") logger.close() rows = logger.path.read_text().strip().split("\n") @@ -14,24 +14,35 @@ def test_tsv_logger_writes_all_event_types(tmp_path): parts0 = rows[0].split("\t") assert parts0[1] == "queued" assert parts0[2] == "42" - assert parts0[3] == "915.1,500.0,7,5,22" + assert parts0[3] == "deadbeef" + assert parts0[4] == "915.1,500.0,7,5,22" parts1 = rows[1].split("\t") assert parts1[1] == "sent" assert parts1[2] == "42" - assert parts1[3] == "13" - assert parts1[4] == "1234.5,0.0,1|abc" + assert parts1[3] == "deadbeef" + assert parts1[4] == "13" + assert parts1[5] == "1234.5,0.0,1,deadbeef|abc" parts2 = rows[2].split("\t") assert parts2[1] == "received" assert parts2[2] == "99" - assert parts2[3] == "1234.5,1.0,2|xyz" + assert parts2[3] == "cafe1234" + assert parts2[4] == "1234.5,1.0,2,cafe1234|xyz" + + +def test_tsv_logger_renders_missing_token_as_dash(tmp_path): + logger = TsvLogger("Z", tmp_path) + logger.received(99, None, "no-token-text") + logger.close() + parts = logger.path.read_text().strip().split("\t") + assert parts[3] == "-" def test_tsv_logger_creates_log_dir(tmp_path): sub = tmp_path / "nested" / "logs" logger = TsvLogger("A", sub) - logger.queued(1, 0, 0, 0, 0, 0) + logger.queued(1, "tok", 0, 0, 0, 0, 0) logger.close() assert sub.is_dir() assert logger.path.exists() diff --git a/tests/test_meshcore_radio.py b/tests/test_meshcore_radio.py new file mode 100644 index 0000000..3ae2ed5 --- /dev/null +++ b/tests/test_meshcore_radio.py @@ -0,0 +1,97 @@ +"""Tests for MeshCoreRadio.send tx-complete waiting. + +Uses a stub `_mc` object so we don't need the meshcore lib or real hardware. +""" +import asyncio +import sys +import types + +import pytest + + +@pytest.fixture(autouse=True) +def _stub_meshcore_module(monkeypatch): + """Provide a fake `meshcore` module so MeshCoreRadio import inside + `connect()` would succeed if called. The send/wait tests below don't + call connect; they construct the radio manually. + """ + fake = types.ModuleType("meshcore") + fake.MeshCore = object + fake.EventType = types.SimpleNamespace(CHANNEL_MSG_RECV="rx") + monkeypatch.setitem(sys.modules, "meshcore", fake) + + +class _Event: + def __init__(self, payload, type_name="OK"): + self.payload = payload + self.type = types.SimpleNamespace(name=type_name) + + +class _StubCommands: + def __init__(self, nb_sent_sequence, send_response=None): + # nb_sent_sequence: list of values returned by successive + # get_stats_packets calls. + self._nb_sent = list(nb_sent_sequence) + self.sends: list[tuple[int, str]] = [] + self._send_response = send_response or _Event({"packet_id": 7}, "OK") + + async def get_stats_packets(self): + val = self._nb_sent.pop(0) if self._nb_sent else self._nb_sent[-1] + return _Event({"nb_sent": val}) + + async def send_chan_msg(self, chan, msg): + self.sends.append((chan, msg)) + return self._send_response + + +class _StubMC: + def __init__(self, commands): + self.commands = commands + + +def _make_radio(commands): + from modjam.radio.meshcore import MeshCoreRadio + r = MeshCoreRadio() + r._mc = _StubMC(commands) + return r + + +def test_send_waits_until_nb_sent_increments(): + # baseline read returns 100; after send poll returns 100, 100, 101 + cmds = _StubCommands(nb_sent_sequence=[100, 100, 100, 101]) + radio = _make_radio(cmds) + result = asyncio.run(radio.send("hello")) + assert cmds.sends == [(0, "hello")] + assert result.duration_ms >= 0 + # baseline + at least 2 polls before increment + assert cmds._nb_sent == [] # all poll values consumed + + +def test_send_returns_immediately_when_increment_already_observed(): + cmds = _StubCommands(nb_sent_sequence=[5, 7]) # baseline=5, first poll=7 + radio = _make_radio(cmds) + result = asyncio.run(radio.send("x")) + assert result.packet_id == 7 # from send_chan_msg payload + + +def test_send_raises_on_error_event(): + cmds = _StubCommands( + nb_sent_sequence=[0], + send_response=_Event({"error": "no_radio"}, "ERROR"), + ) + radio = _make_radio(cmds) + with pytest.raises(RuntimeError, match="send_chan_msg failed"): + asyncio.run(radio.send("x")) + + +def test_wait_for_tx_times_out(): + # nb_sent never increments + cmds = _StubCommands(nb_sent_sequence=[10] * 1000) + radio = _make_radio(cmds) + + async def go(): + baseline = await radio._read_nb_sent() + await radio._wait_for_tx(baseline, timeout=0.1, poll_interval=0.01) + + with pytest.raises(TimeoutError): + asyncio.run(go()) diff --git a/tests/test_payload.py b/tests/test_payload.py new file mode 100644 index 0000000..bf9fa65 --- /dev/null +++ b/tests/test_payload.py @@ -0,0 +1,51 @@ +from modjam import payload + + +def test_make_payload_pads_to_size(): + text, token = payload.make_test_payload(size=80, seq=1, elapsed=0.5, ts=1234.5) + assert len(text) == 80 + assert text.startswith("1234.5,0.5,1,") + assert f",{token}|" in text + + +def test_make_payload_short_when_prefix_exceeds_size(): + text, token = payload.make_test_payload(size=5, seq=99, elapsed=12.34, ts=1234567890.0) + # prefix already > 5; payload returns prefix as-is + assert text.endswith("|") + assert token in text + + +def test_token_is_hex_and_correct_length(): + _, token = payload.make_test_payload(size=80, seq=1, elapsed=0.0, ts=0.0) + assert len(token) == payload.TOKEN_LEN + assert all(c in "0123456789abcdef" for c in token) + + +def test_tokens_are_unique(): + tokens = {payload.make_test_payload(80, 1, 0, 0)[1] for _ in range(50)} + assert len(tokens) == 50 # vanishingly unlikely to collide + + +def test_extract_token_roundtrip(): + text, token = payload.make_test_payload(size=120, seq=42, elapsed=3.0, ts=999.0) + assert payload.extract_token(text) == token + + +def test_extract_token_returns_none_on_protocol_message(): + assert payload.extract_token("1234567890|A|IDLE|-100") is None + assert payload.extract_token("START|name:foo") is None + assert payload.extract_token("STOP") is None + + +def test_extract_token_returns_none_on_legacy_format(): + # 3-field legacy format (no token) — should not falsely extract + assert payload.extract_token("1234.5,1.0,42|abcdef") is None + + +def test_extract_token_rejects_non_hex(): + assert payload.extract_token("1234.5,1.0,42,nothex!!|abc") is None + + +def test_extract_token_handles_empty(): + assert payload.extract_token("") is None + assert payload.extract_token("nopipe") is None diff --git a/tests/test_udp_radio.py b/tests/test_udp_radio.py index b4a5eac..34c86d0 100644 --- a/tests/test_udp_radio.py +++ b/tests/test_udp_radio.py @@ -19,7 +19,7 @@ def _msg(**overrides): "freq": 915.1, "bw": 500.0, "sf": 7, "cr": 5, "pow": 22, "channel": "modjam", "psk_hash": "abc123", "packet_id": 42, - "text": "1234567890.0,1.0,1|payload", + "text": "1234567890.0,1.0,1,deadbeef|payload", "ts": 0.0, } base.update(overrides) @@ -96,7 +96,7 @@ def test_packet_loss_drops_test_packets(monkeypatch): received = [] r.on_message(lambda p: received.append(p)) for i in range(20): - r._handle_incoming(_msg(packet_id=i, text=f"1234.5,1.0,{i}|x")) + r._handle_incoming(_msg(packet_id=i, text=f"1234.5,1.0,{i},aabbccdd|x")) assert received == [] # 100% drop @@ -125,7 +125,7 @@ def test_packet_loss_partial_with_seed_is_deterministic(monkeypatch): out = [] r.on_message(lambda p: out.append(p.packet_id)) for i in range(50): - r._handle_incoming(_msg(packet_id=i, text=f"100.0,1.0,{i}|x")) + r._handle_incoming(_msg(packet_id=i, text=f"100.0,1.0,{i},11223344|x")) return out a = run_once() @@ -141,5 +141,13 @@ def test_packet_loss_zero_keeps_all_test_packets(monkeypatch): received = [] r.on_message(lambda p: received.append(p)) for i in range(10): - r._handle_incoming(_msg(packet_id=i, text=f"100.0,1.0,{i}|y")) + r._handle_incoming(_msg(packet_id=i, text=f"100.0,1.0,{i},55667788|y")) assert len(received) == 10 + + +def test_test_packet_regex_matches_legacy_and_token_formats(): + from modjam.radio.udp import _TEST_PACKET_RE + assert _TEST_PACKET_RE.match("1234.5,1.0,42|abc") # legacy 3-field + assert _TEST_PACKET_RE.match("1234.5,1.0,42,deadbeef|abc") # with token + assert not _TEST_PACKET_RE.match("1234567890|A|IDLE|-100") # heartbeat + assert not _TEST_PACKET_RE.match("START|name:foo")