modjam/tests/test_protocol.py
Alec Perkins 0f478bf720 Implement test/control stations, simulator, tests
Adds the modjam package: a MeshCore-backed test station service for
Pi (IDLE + RUNNING states, cuesheet-driven), a control station REPL
for the Mac, and a UDP simulator that swaps in for the radio when
SIMULATOR=true (drops cross-config packets and a configurable
fraction of test-payload datagrams to mimic real radio loss).
docker-compose runs three sim stations + control on a bridge net.
TSV log format matches the reference traces.

Pytest suite covers protocol codec, cuesheet builder, TSV logger,
config loader, and UDP radio packet routing/loss; .forgejo/workflows
runs it on push to main and on PRs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 21:27:41 -04:00

69 lines
1.9 KiB
Python

from modjam import protocol
def test_parse_simple():
    """A command with a single ``key:value`` argument parses with no target."""
    parsed = protocol.parse("START|name:foo")
    expected_args = {"name": ["foo"]}
    assert parsed.cmd == "START"
    assert parsed.target is None
    assert parsed.args == expected_args
def test_parse_with_target():
    """A ``CMD:TARGET`` head is split into the command and its target."""
    message = protocol.parse("RESULTS:A|name:t1")
    expected_args = {"name": ["t1"]}
    assert message.cmd == "RESULTS"
    assert message.target == "A"
    assert message.args == expected_args
def test_parse_multi_value():
    """Comma-separated argument values come back as lists of strings."""
    wire = "START|bw:62.5,500|sf:7,8,9|name:x"
    parsed = protocol.parse(wire)
    assert parsed.args["bw"] == ["62.5", "500"]
    assert parsed.args["sf"] == ["7", "8", "9"]
def test_parse_empty_returns_none():
    """Empty and whitespace-only input both parse to ``None``."""
    for blank in ("", " \n"):
        assert protocol.parse(blank) is None
def test_parse_lowercase_cmd_normalized():
    """A lowercase command name is reported in upper case."""
    parsed = protocol.parse("stop")
    assert parsed.cmd == "STOP"
def test_parse_arg_without_colon_skipped():
    """A ``|`` segment with no colon is left out of the parsed args."""
    parsed = protocol.parse("START|name:x|brokenpart|sf:7")
    surviving_args = {"name": ["x"], "sf": ["7"]}
    assert parsed.args == surviving_args
def test_encode_scalar_and_list():
    """Scalars are written as-is and list values are comma-joined."""
    encoded = protocol.encode("START", name="x", sf=[7, 8], bw=500)
    expected = "START|name:x|sf:7,8|bw:500"
    assert encoded == expected
def test_encode_with_target():
    """A ``target`` keyword is emitted as ``CMD:TARGET`` ahead of the args."""
    encoded = protocol.encode("RESULTS", target="A", name="t1")
    assert encoded == "RESULTS:A|name:t1"
def test_encode_skips_none():
    """A ``None`` target and ``None``-valued arguments add nothing to the output."""
    encoded = protocol.encode("STOP", target=None, foo=None)
    assert encoded == "STOP"
def test_encode_roundtrip():
    """Encoding then parsing keeps the command and args; values return as strings."""
    wire = protocol.encode(
        "START",
        name="x",
        stations=["A", "B", "C"],
        sf=[7, 8],
    )
    decoded = protocol.parse(wire)
    assert decoded.cmd == "START"

    # Checked in the same order as the keyword arguments above.
    expected_args = {
        "name": ["x"],
        "stations": ["A", "B", "C"],
        "sf": ["7", "8"],
    }
    for key, values in expected_args.items():
        assert decoded.args[key] == values
def test_heartbeat_format():
    """``encode_heartbeat`` joins its four arguments with ``|`` in call order."""
    line = protocol.encode_heartbeat(123, "A", "IDLE", -100)
    assert line == "123|A|IDLE|-100"
def test_next_format():
    """``encode_next`` emits two ``|`` fields, then ``next:`` with six ``/``-joined values."""
    line = protocol.encode_next(123, "A", 915.1, 500, 7, 5, 22, 40)
    expected = "123|A|next:915.1/500/7/5/22/40"
    assert line == expected
def test_done_format():
    """``encode_done`` matches the ``next`` layout under ``done:``, plus a trailing ``a/b`` pair after a space."""
    line = protocol.encode_done(123, "A", 915.1, 500, 7, 5, 22, 40, 100, 88)
    expected = "123|A|done:915.1/500/7/5/22/40 100/88"
    assert line == expected