modjam/modjam/control.py
Alec Perkins 0f478bf720 Implement test/control stations, simulator, tests
Adds the modjam package: a MeshCore-backed test station service for
Pi (IDLE + RUNNING states, cuesheet-driven), a control station REPL
for the Mac, and a UDP simulator that swaps in for the radio when
SIMULATOR=true (drops cross-config packets and a configurable
fraction of test-payload datagrams to mimic real radio loss).
docker-compose runs three sim stations + control on a bridge net.
TSV log format matches the reference traces.

Pytest suite covers protocol codec, cuesheet builder, TSV logger,
config loader, and UDP radio packet routing/loss; .forgejo/workflows
runs it on push to main and on PRs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 21:27:41 -04:00

108 lines
3.4 KiB
Python

from __future__ import annotations
import asyncio
import shlex
import sys
import time
from . import protocol
from .config import StationConfig
from .radio.base import Radio, RxPacket
HELP = """\
Commands:
start [name=<n>] [f=<f1,f2>] [bw=<...>] [sf=<...>] [cr=<...>]
[pow=<...>] [size=<...>] [stations=A,B,C]
[duration=<s>] [padding=<s>] [spacing=<s>] [at=<min>]
stop multicast STOP to all stations
results <station> <name> (not yet implemented)
help show this
quit exit
"""
class ControlStation:
def __init__(self, cfg: StationConfig, radio: Radio):
self.cfg = cfg
self.radio = radio
async def run(self) -> None:
await self.radio.connect()
self.radio.on_message(self._on_message)
await self.radio.ensure_channel(self.cfg.channel_name, self.cfg.channel_psk)
await self.radio.set_radio(
self.cfg.control_radio.frequency_mhz,
self.cfg.control_radio.bandwidth_khz,
self.cfg.control_radio.spread_factor,
self.cfg.control_radio.coding_rate,
self.cfg.control_radio.tx_power_dbm,
)
print(f"[ctrl] online on {self.cfg.channel_name}; type `help`")
try:
await self._repl()
finally:
await self.radio.disconnect()
def _on_message(self, pkt: RxPacket) -> None:
ts = time.strftime("%H:%M:%S")
print(f"\r[{ts}] rx{f' {pkt.sender}' if pkt.sender else ''}: {pkt.text}")
sys.stdout.write("> ")
sys.stdout.flush()
async def _repl(self) -> None:
loop = asyncio.get_running_loop()
while True:
try:
sys.stdout.write("> ")
sys.stdout.flush()
line = await loop.run_in_executor(None, sys.stdin.readline)
except (EOFError, KeyboardInterrupt):
return
if not line:
return
line = line.strip()
if not line:
continue
try:
if not await self._dispatch(line):
return
except Exception as e:
print(f"[ctrl] error: {e}")
async def _dispatch(self, line: str) -> bool:
parts = shlex.split(line)
head = parts[0].lower()
if head in ("quit", "exit"):
return False
if head == "help":
print(HELP)
return True
if head == "stop":
await self.radio.send(protocol.encode("STOP"))
print("[ctrl] STOP sent")
return True
if head == "start":
args = self._parse_kv(parts[1:])
if "name" not in args:
print("start requires name=<...>")
return True
msg = protocol.encode("START", **args)
await self.radio.send(msg)
print(f"[ctrl] sent: {msg}")
return True
if head == "results":
print("[ctrl] results: not implemented in v1")
return True
print(f"unknown command: {head}")
return True
@staticmethod
def _parse_kv(tokens: list[str]) -> dict[str, str]:
args: dict[str, str] = {}
for t in tokens:
if "=" not in t:
continue
k, v = t.split("=", 1)
args[k.strip()] = v.strip()
return args