Implement test/control stations, simulator, tests

Adds the modjam package: a MeshCore-backed test station service for
Pi (IDLE + RUNNING states, cuesheet-driven), a control station REPL
for the Mac, and a UDP simulator that swaps in for the radio when
SIMULATOR=true (drops cross-config packets and a configurable
fraction of test-payload datagrams to mimic real radio loss).
docker-compose runs three sim stations + control on a bridge net.
TSV log format matches the reference traces.

Pytest suite covers protocol codec, cuesheet builder, TSV logger,
config loader, and UDP radio packet routing/loss; .forgejo/workflows
runs it on push to main and on PRs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alec K2XAP 2026-05-07 21:27:41 -04:00
parent 7424416b58
commit 0f478bf720
31 changed files with 1708 additions and 0 deletions

View file

@ -0,0 +1,20 @@
name: tests
on:
push:
branches: [main]
pull_request:
jobs:
pytest:
runs-on: docker
container:
image: python:3.12-slim
steps:
- uses: actions/checkout@v4
- name: Install package + test deps
run: pip install --no-cache-dir -e '.[test]'
- name: Run pytest
run: pytest -v

7
.gitignore vendored
View file

@ -1 +1,8 @@
.DS_Store .DS_Store
__pycache__/
*.pyc
*.egg-info/
.venv/
sim/logs/
*.tsv
!reference/*.tsv

93
README.md Normal file
View file

@ -0,0 +1,93 @@
# mesh-control
Operator quickstart for the LoRa modulation jam test framework. Protocol/spec lives in [reference/concept.md](reference/concept.md).
## Components
- `modjam-station` — runs on each Test Station (Raspberry Pi or simulator container) talking to a MeshCore node over USB serial.
- `modjam-control` — interactive REPL on the macbook (or simulator container) that sends START/STOP commands.
- Simulator — `docker compose` with three test stations (A, B, C) and one control container, swapping the radio for UDP.
## Install (host: Pi or Mac)
```sh
python3.11 -m venv .venv && source .venv/bin/activate
pip install -e .
```
Drop a config at `~/modjam-config.json`. Use [sim/station-a.json](sim/station-a.json) as a template; add `"port": "/dev/ttyUSB0"` (Pi) or `"/dev/tty.usbmodem1301"` (Mac) for the attached MeshCore node.
Run:
```sh
modjam-station # on each Pi
modjam-control # on the Mac
```
## Simulator
Three stations + control, no hardware:
```sh
docker compose build
docker compose up -d station-a station-b station-c
docker compose run --rm control
```
Inside the control REPL:
```
> start name=sim1 stations=A,B,C bw=500 sf=8 cr=5 pow=22 duration=20 padding=10 spacing=2 size=40 at=1
> stop
> quit
```
TSV logs appear in `./sim/logs/` (one per station per run, format matches [reference/A.tsv](reference/A.tsv) / [reference/B.tsv](reference/B.tsv)).
`SIMULATOR=true` is what flips the radio backend from MeshCore-USB to UDP. The UDP backend drops cross-config datagrams, so receivers only "hear" senders whose freq/bw/sf/cr/channel match — same behavior as real radios.
The simulator also drops a fraction of received **test packets** at random to mimic real-world packet loss. Default is 15%; override with `SIM_PACKET_LOSS=0.0` (no loss) up to `1.0`. Protocol traffic (START/STOP, heartbeats, next/done) is never dropped — only payloads matching the test-packet pattern. Each station gets a deterministic per-station RNG seed so reruns are repeatable; set `SIM_SEED=...` to vary.
```sh
SIM_PACKET_LOSS=0.3 docker compose up -d station-a station-b station-c
```
## REPL commands
| Command | Effect |
|---|---|
| `start [k=v ...]` | encode and broadcast a START command (see [reference/concept.md](reference/concept.md) for keys: `name`, `f`, `bw`, `sf`, `cr`, `pow`, `size`, `stations`, `duration`, `padding`, `spacing`, `at`) |
| `stop` | multicast STOP — running stations return to IDLE |
| `help` | show command list |
| `quit` | exit |
## Layout
```
modjam/
├── cli.py # entrypoints
├── config.py # ~/modjam-config.json loader
├── protocol.py # `<cmd>[:<station>]|k:v,…` codec + heartbeat/next/done formatters
├── cuesheet.py # build cuesheet from START params
├── station.py # IDLE/RUNNING state machine
├── control.py # REPL + rx tail
├── log.py # TSV logger
└── radio/
├── base.py # Radio ABC
├── meshcore.py # MeshCore over USB serial
├── udp.py # simulator radio
└── factory.py # picks backend from SIMULATOR env
```
## Tests
```sh
pip install -e '.[test]'
pytest
```
CI runs the same suite via [.forgejo/workflows/test.yml](.forgejo/workflows/test.yml) on push to `main` and on PRs.
## Scope
v1: IDLE + RUNNING only. RESULTS / DOWNLINKING is not implemented yet (see [reference/concept.md](reference/concept.md) and [notes.md](notes.md) for the protocol).

54
docker-compose.yml Normal file
View file

@ -0,0 +1,54 @@
services:
station-a:
build: { context: ., dockerfile: docker/station.Dockerfile }
environment:
SIMULATOR: "true"
PEERS: "station-a,station-b,station-c,control"
MODJAM_CONFIG: "/etc/modjam/config.json"
SIM_PACKET_LOSS: "${SIM_PACKET_LOSS:-0.15}"
volumes:
- ./sim/station-a.json:/etc/modjam/config.json:ro
- ./sim/logs:/var/log/modjam
networks: [modjam]
station-b:
build: { context: ., dockerfile: docker/station.Dockerfile }
environment:
SIMULATOR: "true"
PEERS: "station-a,station-b,station-c,control"
MODJAM_CONFIG: "/etc/modjam/config.json"
SIM_PACKET_LOSS: "${SIM_PACKET_LOSS:-0.15}"
volumes:
- ./sim/station-b.json:/etc/modjam/config.json:ro
- ./sim/logs:/var/log/modjam
networks: [modjam]
station-c:
build: { context: ., dockerfile: docker/station.Dockerfile }
environment:
SIMULATOR: "true"
PEERS: "station-a,station-b,station-c,control"
MODJAM_CONFIG: "/etc/modjam/config.json"
SIM_PACKET_LOSS: "${SIM_PACKET_LOSS:-0.15}"
volumes:
- ./sim/station-c.json:/etc/modjam/config.json:ro
- ./sim/logs:/var/log/modjam
networks: [modjam]
control:
build: { context: ., dockerfile: docker/control.Dockerfile }
environment:
SIMULATOR: "true"
PEERS: "station-a,station-b,station-c,control"
MODJAM_CONFIG: "/etc/modjam/config.json"
SIM_PACKET_LOSS: "${SIM_PACKET_LOSS:-0.15}"
volumes:
- ./sim/control.json:/etc/modjam/config.json:ro
- ./sim/logs:/var/log/modjam
stdin_open: true
tty: true
networks: [modjam]
networks:
modjam:
driver: bridge

View file

@ -0,0 +1,9 @@
FROM python:3.12-slim
WORKDIR /app
COPY pyproject.toml requirements.txt /app/
COPY modjam /app/modjam
RUN pip install --no-cache-dir -e .
ENV PYTHONUNBUFFERED=1
CMD ["modjam-control"]

View file

@ -0,0 +1,9 @@
FROM python:3.12-slim
WORKDIR /app
COPY pyproject.toml requirements.txt /app/
COPY modjam /app/modjam
RUN pip install --no-cache-dir -e .
ENV PYTHONUNBUFFERED=1
CMD ["modjam-station"]

0
modjam/__init__.py Normal file
View file

44
modjam/cli.py Normal file
View file

@ -0,0 +1,44 @@
from __future__ import annotations
import argparse
import asyncio
import os
import sys
from .config import StationConfig
from .control import ControlStation
from .radio.factory import make_radio
from .station import TestStation
def station_main() -> int:
parser = argparse.ArgumentParser(prog="modjam-station")
parser.add_argument("--config", help="Path to modjam-config.json (default: $MODJAM_CONFIG or ~/modjam-config.json)")
args = parser.parse_args()
cfg = StationConfig.load(args.config)
if cfg.data_radio is None:
print("station config must include data_radio", file=sys.stderr)
return 2
radio = make_radio(cfg.this_station_name, port=cfg.port)
station = TestStation(cfg, radio)
try:
asyncio.run(station.run())
except KeyboardInterrupt:
pass
return 0
def control_main() -> int:
parser = argparse.ArgumentParser(prog="modjam-control")
parser.add_argument("--config", help="Path to control-config.json (default: $MODJAM_CONFIG or ~/modjam-config.json)")
args = parser.parse_args()
cfg = StationConfig.load(args.config)
radio = make_radio(cfg.this_station_name, port=cfg.port)
ctrl = ControlStation(cfg, radio)
try:
asyncio.run(ctrl.run())
except KeyboardInterrupt:
pass
return 0

51
modjam/config.py Normal file
View file

@ -0,0 +1,51 @@
from dataclasses import dataclass
from pathlib import Path
import json
import os
@dataclass
class RadioConfig:
frequency_mhz: float
bandwidth_khz: float
spread_factor: int
coding_rate: int
tx_power_dbm: int
@classmethod
def from_dict(cls, d: dict) -> "RadioConfig":
return cls(
frequency_mhz=float(d["frequency_mhz"]),
bandwidth_khz=float(d["bandwidth_khz"]),
spread_factor=int(d["spread_factor"]),
coding_rate=int(d["coding_rate"]),
tx_power_dbm=int(d["tx_power_dbm"]),
)
@dataclass
class StationConfig:
this_station_name: str
channel_name: str
channel_psk: str
idle_heartbeat_min: int
control_radio: RadioConfig
data_radio: RadioConfig | None
port: str | None = None
log_dir: str = "."
@classmethod
def load(cls, path: str | Path | None = None) -> "StationConfig":
path = Path(path or os.environ.get("MODJAM_CONFIG") or Path.home() / "modjam-config.json")
with open(path) as f:
d = json.load(f)
return cls(
this_station_name=d["this_station_name"],
channel_name=d["channel_name"],
channel_psk=d["channel_psk"],
idle_heartbeat_min=int(d.get("idle_heartbeat_min", 15)),
control_radio=RadioConfig.from_dict(d["control_radio"]),
data_radio=RadioConfig.from_dict(d["data_radio"]) if "data_radio" in d else None,
port=d.get("port"),
log_dir=d.get("log_dir", "."),
)

108
modjam/control.py Normal file
View file

@ -0,0 +1,108 @@
from __future__ import annotations
import asyncio
import shlex
import sys
import time
from . import protocol
from .config import StationConfig
from .radio.base import Radio, RxPacket
HELP = """\
Commands:
start [name=<n>] [f=<f1,f2>] [bw=<...>] [sf=<...>] [cr=<...>]
[pow=<...>] [size=<...>] [stations=A,B,C]
[duration=<s>] [padding=<s>] [spacing=<s>] [at=<min>]
stop multicast STOP to all stations
results <station> <name> (not yet implemented)
help show this
quit exit
"""
class ControlStation:
def __init__(self, cfg: StationConfig, radio: Radio):
self.cfg = cfg
self.radio = radio
async def run(self) -> None:
await self.radio.connect()
self.radio.on_message(self._on_message)
await self.radio.ensure_channel(self.cfg.channel_name, self.cfg.channel_psk)
await self.radio.set_radio(
self.cfg.control_radio.frequency_mhz,
self.cfg.control_radio.bandwidth_khz,
self.cfg.control_radio.spread_factor,
self.cfg.control_radio.coding_rate,
self.cfg.control_radio.tx_power_dbm,
)
print(f"[ctrl] online on {self.cfg.channel_name}; type `help`")
try:
await self._repl()
finally:
await self.radio.disconnect()
def _on_message(self, pkt: RxPacket) -> None:
ts = time.strftime("%H:%M:%S")
print(f"\r[{ts}] rx{f' {pkt.sender}' if pkt.sender else ''}: {pkt.text}")
sys.stdout.write("> ")
sys.stdout.flush()
async def _repl(self) -> None:
loop = asyncio.get_running_loop()
while True:
try:
sys.stdout.write("> ")
sys.stdout.flush()
line = await loop.run_in_executor(None, sys.stdin.readline)
except (EOFError, KeyboardInterrupt):
return
if not line:
return
line = line.strip()
if not line:
continue
try:
if not await self._dispatch(line):
return
except Exception as e:
print(f"[ctrl] error: {e}")
async def _dispatch(self, line: str) -> bool:
parts = shlex.split(line)
head = parts[0].lower()
if head in ("quit", "exit"):
return False
if head == "help":
print(HELP)
return True
if head == "stop":
await self.radio.send(protocol.encode("STOP"))
print("[ctrl] STOP sent")
return True
if head == "start":
args = self._parse_kv(parts[1:])
if "name" not in args:
print("start requires name=<...>")
return True
msg = protocol.encode("START", **args)
await self.radio.send(msg)
print(f"[ctrl] sent: {msg}")
return True
if head == "results":
print("[ctrl] results: not implemented in v1")
return True
print(f"unknown command: {head}")
return True
@staticmethod
def _parse_kv(tokens: list[str]) -> dict[str, str]:
args: dict[str, str] = {}
for t in tokens:
if "=" not in t:
continue
k, v = t.split("=", 1)
args[k.strip()] = v.strip()
return args

99
modjam/cuesheet.py Normal file
View file

@ -0,0 +1,99 @@
from dataclasses import dataclass
VALID_BW = (62.5, 125.0, 250.0, 500.0)
VALID_SF = (6, 7, 8, 9, 10, 11, 12)
VALID_CR = (5, 6, 7, 8)
@dataclass
class CueEntry:
start: float # absolute unix ts: tune-and-send start
end: float # absolute unix ts: stop sending
next_at: float # absolute unix ts: send "next" message (start - 10)
done_at: float # absolute unix ts: send "done" message (end + 10)
tune_at: float # absolute unix ts: actually call set_radio (next_at + 10 = start)
spacing: float # seconds between transmissions
freq: float
bw: float
sf: int
cr: int
pow: int
size: int
sender: str
@dataclass
class CueParams:
name: str
freq: list[float]
bw: list[float]
sf: list[int]
cr: list[int]
pow: list[int]
size: list[int]
stations: list[str]
duration: int # seconds per case (sender airtime window)
padding: int # seconds between cases
spacing: int # seconds between transmissions
at: int # %nth-minute boundary to start
base_t: float # absolute start unix ts (computed from `at`)
def build(params: CueParams) -> list[CueEntry]:
entries: list[CueEntry] = []
t = params.base_t
for freq in params.freq:
for bw in params.bw:
for sf in params.sf:
for cr in params.cr:
for pw in params.pow:
for size in params.size:
for sender in params.stations:
next_at = t
tune_at = next_at + 10
start = tune_at
end = start + params.duration
done_at = end + 10
entries.append(CueEntry(
start=start,
end=end,
next_at=next_at,
done_at=done_at,
tune_at=tune_at,
spacing=float(params.spacing),
freq=freq,
bw=bw,
sf=sf,
cr=cr,
pow=pw,
size=size,
sender=sender,
))
t = done_at + params.padding
return entries
def parse_start_args(args: dict[str, list[str]]) -> dict:
"""Convert string arg lists from a START command into typed values + defaults."""
def floats(key, default):
return [float(x) for x in args.get(key, [])] or default
def ints(key, default):
return [int(x) for x in args.get(key, [])] or default
name = (args.get("name") or [None])[0]
if not name:
raise ValueError("START requires name")
return dict(
name=name,
freq=floats("f", [916.1]),
bw=floats("bw", list(VALID_BW)),
sf=ints("sf", list(VALID_SF)),
cr=ints("cr", list(VALID_CR)),
pow=ints("pow", [22]),
size=ints("size", [40]),
stations=args.get("stations") or ["A", "B"],
duration=int((args.get("duration") or ["300"])[0]),
padding=int((args.get("padding") or ["60"])[0]),
spacing=int((args.get("spacing") or ["2"])[0]),
at=int((args.get("at") or ["5"])[0]),
)

33
modjam/log.py Normal file
View file

@ -0,0 +1,33 @@
from pathlib import Path
from time import time
from typing import IO
class TsvLogger:
"""Append-only TSV logger matching reference/A.tsv + reference/B.tsv format.
queued: <ts> queued <packet_id> <freq>,<bw>,<sf>,<cr>,<pow>
sent: <ts> sent <packet_id> <duration_ms> <text>
received: <ts> received <packet_id> <text>
"""
def __init__(self, station: str, log_dir: str | Path = "."):
Path(log_dir).mkdir(parents=True, exist_ok=True)
path = Path(log_dir) / f"{station}-{int(time())}.tsv"
self._fh: IO = open(path, "a", buffering=1) # line-buffered
self.path = path
def queued(self, packet_id: int | str, freq: float, bw: float, sf: int, cr: int, pw: int) -> None:
self._fh.write(f"{time()}\tqueued\t{packet_id}\t{freq},{bw},{sf},{cr},{pw}\n")
def sent(self, packet_id: int | str, duration_ms: int, text: str) -> None:
self._fh.write(f"{time()}\tsent\t{packet_id}\t{duration_ms}\t{text}\n")
def received(self, packet_id: int | str, text: str) -> None:
self._fh.write(f"{time()}\treceived\t{packet_id}\t{text}\n")
def close(self) -> None:
try:
self._fh.close()
except Exception:
pass

57
modjam/protocol.py Normal file
View file

@ -0,0 +1,57 @@
from dataclasses import dataclass, field
from typing import Iterable
@dataclass
class Command:
cmd: str
target: str | None
args: dict[str, list[str]] = field(default_factory=dict)
def get(self, key: str, default: list[str] | None = None) -> list[str] | None:
return self.args.get(key, default)
def parse(text: str) -> Command | None:
text = text.strip()
if not text:
return None
parts = text.split("|")
head = parts[0]
if ":" in head:
cmd, target = head.split(":", 1)
else:
cmd, target = head, None
args: dict[str, list[str]] = {}
for part in parts[1:]:
if ":" not in part:
continue
k, v = part.split(":", 1)
args[k.strip()] = [x.strip() for x in v.split(",") if x.strip()]
return Command(cmd=cmd.strip().upper(), target=target, args=args)
def encode(cmd: str, target: str | None = None, **args: Iterable) -> str:
head = f"{cmd}:{target}" if target else cmd
parts = [head]
for k, v in args.items():
if v is None:
continue
if isinstance(v, (list, tuple, set)):
v = ",".join(str(x) for x in v)
parts.append(f"{k}:{v}")
return "|".join(parts)
# Heartbeat / status messages — pipe-separated positional, no key:value structure.
def encode_heartbeat(ts: int, station: str, state: str, noise_floor: int) -> str:
return f"{ts}|{station}|{state}|{noise_floor}"
def encode_next(ts: int, station: str, freq: float, bw: float, sf: int, cr: int, pow_: int, size: int) -> str:
return f"{ts}|{station}|next:{freq}/{bw}/{sf}/{cr}/{pow_}/{size}"
def encode_done(ts: int, station: str, freq: float, bw: float, sf: int, cr: int, pow_: int, size: int, n_sent: int, n_rcvd: int) -> str:
return f"{ts}|{station}|done:{freq}/{bw}/{sf}/{cr}/{pow_}/{size} {n_sent}/{n_rcvd}"

0
modjam/radio/__init__.py Normal file
View file

45
modjam/radio/base.py Normal file
View file

@ -0,0 +1,45 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Awaitable, Callable
@dataclass
class RxPacket:
packet_id: int | str
text: str
sender: str | None = None
rssi: int | None = None
snr: float | None = None
@dataclass
class SendResult:
packet_id: int | str
duration_ms: int
MessageHandler = Callable[[RxPacket], Awaitable[None] | None]
class Radio(ABC):
@abstractmethod
async def connect(self) -> None: ...
@abstractmethod
async def set_radio(self, freq_mhz: float, bw_khz: float,
sf: int, cr: int, tx_power_dbm: int) -> None: ...
@abstractmethod
async def ensure_channel(self, name: str, psk_b64: str) -> None: ...
@abstractmethod
async def send(self, text: str) -> SendResult: ...
@abstractmethod
def on_message(self, cb: MessageHandler) -> None: ...
@abstractmethod
async def get_noise_floor(self) -> int: ...
@abstractmethod
async def disconnect(self) -> None: ...

15
modjam/radio/factory.py Normal file
View file

@ -0,0 +1,15 @@
from __future__ import annotations
import os
from .base import Radio
def make_radio(station: str, port: str | None = None) -> Radio:
if os.environ.get("SIMULATOR", "").lower() in ("1", "true", "yes"):
from .udp import UDPRadio
peers = [p.strip() for p in os.environ.get("PEERS", "").split(",") if p.strip()]
peers = [p for p in peers if p != station and p.lower() != station.lower()]
return UDPRadio(station=station, peers=peers)
from .meshcore import MeshCoreRadio
return MeshCoreRadio(port=port)

100
modjam/radio/meshcore.py Normal file
View file

@ -0,0 +1,100 @@
from __future__ import annotations
import asyncio
import base64
import hashlib
import time
from itertools import count
from .base import MessageHandler, Radio, RxPacket, SendResult
def _psk_bytes(psk_b64: str) -> bytes:
"""Decode the channel PSK from base64; pad/truncate to 16 bytes."""
try:
raw = base64.b64decode(psk_b64 + "=" * (-len(psk_b64) % 4), validate=False)
except Exception:
raw = psk_b64.encode("utf-8")
if len(raw) >= 16:
return raw[:16]
return hashlib.sha256(raw).digest()[:16]
class MeshCoreRadio(Radio):
"""Wraps the `meshcore` Python library over USB serial."""
def __init__(self, port: str | None = None, baud: int = 115200):
self._port = port
self._baud = baud
self._mc = None
self._handlers: list[MessageHandler] = []
self._channel_idx = 0
self._configured_channel: tuple[str, str] | None = None
self._counter = count(1)
async def connect(self) -> None:
from meshcore import MeshCore, EventType
if self._port:
self._mc = await MeshCore.create_serial(self._port, self._baud)
else:
self._mc = await MeshCore.create_serial()
self._EventType = EventType
self._mc.subscribe(EventType.CHANNEL_MSG_RECV, self._on_chan_msg)
await self._mc.start_auto_message_fetching()
async def _on_chan_msg(self, event) -> None:
p = event.payload or {}
text = p.get("text") or (p.get("payload", b"").decode("utf-8", errors="replace") if isinstance(p.get("payload"), (bytes, bytearray)) else "")
pkt = RxPacket(
packet_id=p.get("packet_id") or p.get("id") or next(self._counter),
text=text,
sender=str(p.get("pubkey_prefix") or p.get("from") or ""),
rssi=p.get("rssi"),
snr=p.get("snr"),
)
for cb in self._handlers:
res = cb(pkt)
if asyncio.iscoroutine(res):
await res
async def set_radio(self, freq_mhz: float, bw_khz: float,
sf: int, cr: int, tx_power_dbm: int) -> None:
assert self._mc
await self._mc.commands.set_radio(freq_mhz, bw_khz, sf, cr)
await self._mc.commands.set_tx_power(tx_power_dbm)
async def ensure_channel(self, name: str, psk_b64: str) -> None:
assert self._mc
if self._configured_channel == (name, psk_b64):
return
await self._mc.commands.set_channel(self._channel_idx, name, _psk_bytes(psk_b64))
self._configured_channel = (name, psk_b64)
async def send(self, text: str) -> SendResult:
assert self._mc
t0 = time.perf_counter()
result = await self._mc.commands.send_chan_msg(self._channel_idx, text)
ms = int((time.perf_counter() - t0) * 1000)
payload = getattr(result, "payload", {}) or {}
pid = payload.get("packet_id") or payload.get("id") or next(self._counter)
return SendResult(packet_id=pid, duration_ms=ms)
def on_message(self, cb: MessageHandler) -> None:
self._handlers.append(cb)
async def get_noise_floor(self) -> int:
assert self._mc
try:
ev = await self._mc.commands.get_stats_radio()
payload = getattr(ev, "payload", {}) or {}
return int(payload.get("noise_floor", payload.get("noise", -100)))
except Exception:
return -100
async def disconnect(self) -> None:
if self._mc:
try:
await self._mc.disconnect()
except Exception:
pass
self._mc = None

159
modjam/radio/udp.py Normal file
View file

@ -0,0 +1,159 @@
from __future__ import annotations
import asyncio
import hashlib
import json
import os
import random
import re
import socket
import time
from itertools import count
from .base import MessageHandler, Radio, RxPacket, SendResult
SIM_PORT = int(os.environ.get("SIM_PORT", "9000"))
# Test packet payload format: `<float_ts>,<float_t>,<int_n>|<padding>`.
# Heartbeat/next/done payloads start with `<int_ts>|<station>|...` — no comma
# before the first `|` — so this regex matches test packets only.
_TEST_PACKET_RE = re.compile(r"^\d+(?:\.\d+)?,\d+(?:\.\d+)?,\d+\|")
def _psk_hash(psk_b64: str) -> str:
return hashlib.sha256(psk_b64.encode("utf-8")).hexdigest()[:16]
def _params_match(a: dict, b: dict) -> bool:
"""Real radios only hear if freq/bw/sf/cr/channel agree."""
return (
abs(a["freq"] - b["freq"]) < 1e-3
and abs(a["bw"] - b["bw"]) < 1e-3
and a["sf"] == b["sf"]
and a["cr"] == b["cr"]
and a["channel"] == b["channel"]
and a["psk_hash"] == b["psk_hash"]
)
class _UdpProtocol(asyncio.DatagramProtocol):
def __init__(self, radio: "UDPRadio"):
self.radio = radio
def datagram_received(self, data: bytes, addr) -> None:
try:
msg = json.loads(data.decode("utf-8"))
except Exception:
return
self.radio._handle_incoming(msg)
class UDPRadio(Radio):
"""Simulator radio: broadcasts JSON datagrams to listed peers.
Drops incoming packets whose sender radio params don't match this radio's
current settings mimics real radios not hearing off-config transmissions.
"""
def __init__(self, station: str, peers: list[str], port: int = SIM_PORT):
self._station = station
self._peers = [p for p in peers if p]
self._port = port
self._handlers: list[MessageHandler] = []
self._counter = count(1)
self._loop: asyncio.AbstractEventLoop | None = None
self._transport: asyncio.DatagramTransport | None = None
self._cur = {
"freq": 0.0, "bw": 0.0, "sf": 0, "cr": 0, "pow": 0,
"channel": "", "psk_hash": "",
}
self._noise_floor = int(os.environ.get("SIM_NOISE", "-100"))
loss = float(os.environ.get("SIM_PACKET_LOSS", "0") or 0)
self._test_packet_loss = max(0.0, min(1.0, loss))
self._rng = random.Random(os.environ.get("SIM_SEED") or station)
async def connect(self) -> None:
self._loop = asyncio.get_running_loop()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.setblocking(False)
sock.bind(("0.0.0.0", self._port))
self._transport, _ = await self._loop.create_datagram_endpoint(
lambda: _UdpProtocol(self), sock=sock
)
async def set_radio(self, freq_mhz: float, bw_khz: float,
sf: int, cr: int, tx_power_dbm: int) -> None:
self._cur["freq"] = float(freq_mhz)
self._cur["bw"] = float(bw_khz)
self._cur["sf"] = int(sf)
self._cur["cr"] = int(cr)
self._cur["pow"] = int(tx_power_dbm)
async def ensure_channel(self, name: str, psk_b64: str) -> None:
self._cur["channel"] = name
self._cur["psk_hash"] = _psk_hash(psk_b64)
async def send(self, text: str) -> SendResult:
assert self._transport
pid = next(self._counter)
msg = {
**self._cur,
"sender": self._station,
"packet_id": pid,
"text": text,
"ts": time.time(),
}
data = json.dumps(msg).encode("utf-8")
t0 = time.perf_counter()
for peer in self._peers:
try:
addrs = socket.getaddrinfo(peer, self._port, type=socket.SOCK_DGRAM)
except socket.gaierror:
continue
for *_, sockaddr in addrs:
try:
self._transport.sendto(data, sockaddr)
break
except OSError:
continue
ms = int((time.perf_counter() - t0) * 1000) or 1
return SendResult(packet_id=pid, duration_ms=ms)
def _handle_incoming(self, msg: dict) -> None:
if msg.get("sender") == self._station:
return
if not _params_match(msg, self._cur):
return
text = msg.get("text", "") or ""
if (
self._test_packet_loss > 0
and _TEST_PACKET_RE.match(text)
and self._rng.random() < self._test_packet_loss
):
return
pkt = RxPacket(
packet_id=msg.get("packet_id"),
text=text,
sender=msg.get("sender"),
rssi=msg.get("rssi"),
snr=msg.get("snr"),
)
for cb in self._handlers:
res = cb(pkt)
if asyncio.iscoroutine(res):
assert self._loop
asyncio.run_coroutine_threadsafe(res, self._loop)
def on_message(self, cb: MessageHandler) -> None:
self._handlers.append(cb)
async def get_noise_floor(self) -> int:
return self._noise_floor
async def disconnect(self) -> None:
if self._transport:
self._transport.close()
self._transport = None

255
modjam/station.py Normal file
View file

@ -0,0 +1,255 @@
from __future__ import annotations
import asyncio
import random
import string
import time
from datetime import datetime, timedelta
from . import cuesheet, protocol
from .config import RadioConfig, StationConfig
from .log import TsvLogger
from .radio.base import Radio, RxPacket
def _now_ts() -> int:
return int(time.time())
def _next_minute_boundary(at: int) -> float:
n = datetime.now()
floor = datetime(n.year, n.month, n.day, n.hour, n.minute)
delta_min = at - n.minute % at
if delta_min == 0:
delta_min = at
return (floor + timedelta(minutes=delta_min)).timestamp()
class TestStation:
HEARTBEAT_STARTUP_DELAY_S = 2
def __init__(self, cfg: StationConfig, radio: Radio):
self.cfg = cfg
self.radio = radio
self.state: str = "IDLE"
self._cmd_queue: asyncio.Queue = asyncio.Queue()
self._stop_run = asyncio.Event()
self._case_active = False
self._cur_radio_label: tuple[float, float, int, int, int] | None = None
self._rcvd_count = 0
self._sent_count = 0
self._logger: TsvLogger | None = None
# ---- lifecycle ----
async def run(self) -> None:
await self.radio.connect()
self.radio.on_message(self._on_message)
await self.radio.ensure_channel(self.cfg.channel_name, self.cfg.channel_psk)
await self._tune(self.cfg.control_radio)
hb_task = asyncio.create_task(self._heartbeat_loop(), name="heartbeat")
try:
while True:
await self._idle_until_start()
if self._pending_start:
await self._run_test(self._pending_start)
self._pending_start = None
finally:
hb_task.cancel()
await self.radio.disconnect()
if self._logger:
self._logger.close()
# ---- IDLE ----
_pending_start = None
async def _idle_until_start(self) -> None:
self.state = "IDLE"
await self._tune(self.cfg.control_radio)
while True:
cmd = await self._cmd_queue.get()
if cmd.cmd != "START":
continue
if not self._addressed_to_us(cmd):
continue
stations = cmd.get("stations") or ["A", "B"]
if self.cfg.this_station_name not in stations:
continue
self._pending_start = cmd
return
def _addressed_to_us(self, cmd: protocol.Command) -> bool:
if cmd.target is None:
return True # multicast
return cmd.target.upper() == self.cfg.this_station_name.upper()
async def _heartbeat_loop(self) -> None:
await asyncio.sleep(self.HEARTBEAT_STARTUP_DELAY_S)
interval = max(1, self.cfg.idle_heartbeat_min) * 60
while True:
try:
if self.state == "IDLE":
nf = await self.radio.get_noise_floor()
msg = protocol.encode_heartbeat(_now_ts(), self.cfg.this_station_name, self.state, nf)
await self.radio.send(msg)
except Exception as e:
print(f"[hb] error: {e}")
await asyncio.sleep(interval)
# ---- RUNNING ----
async def _run_test(self, start_cmd: protocol.Command) -> None:
self.state = "RUNNING"
self._stop_run.clear()
try:
params = cuesheet.parse_start_args(start_cmd.args)
except ValueError as e:
print(f"[run] bad START: {e}")
return
if self.cfg.this_station_name not in params["stations"]:
print(f"[run] station {self.cfg.this_station_name} not in {params['stations']}; ignoring")
return
base_t = _next_minute_boundary(params["at"])
cue_params = cuesheet.CueParams(base_t=base_t, **params)
cues = cuesheet.build(cue_params)
print(f"[run] {len(cues)} cases, ~{(cues[-1].done_at - base_t) / 3600:.2f}h, base_t={base_t}")
self._logger = TsvLogger(self.cfg.this_station_name, self.cfg.log_dir)
try:
for cue in cues:
if self._stop_run.is_set() or self._check_stop_in_queue():
print("[run] STOP received, returning to IDLE")
return
await self._run_case(cue)
finally:
await self._tune(self.cfg.control_radio)
if self._logger:
self._logger.close()
self._logger = None
self.state = "IDLE"
async def _run_case(self, cue: cuesheet.CueEntry) -> None:
# T-10: announce next
await self._sleep_until(cue.next_at)
if self._stop_run.is_set() or self._check_stop_in_queue():
return
await self._tune(self.cfg.control_radio)
await self.radio.send(protocol.encode_next(
_now_ts(), self.cfg.this_station_name,
cue.freq, cue.bw, cue.sf, cue.cr, cue.pow, cue.size,
))
# T0: tune to data radio
await self._sleep_until(cue.tune_at)
if self._stop_run.is_set():
return
await self.radio.set_radio(cue.freq, cue.bw, cue.sf, cue.cr, cue.pow)
self._cur_radio_label = (cue.freq, cue.bw, cue.sf, cue.cr, cue.pow)
self._sent_count = 0
self._rcvd_count = 0
self._case_active = True
try:
if cue.sender == self.cfg.this_station_name:
await self._send_loop(cue)
else:
await self._listen_until(cue.end)
finally:
self._case_active = False
# post-case: control radio + done
await self._tune(self.cfg.control_radio)
await self._sleep_until(cue.done_at)
await self.radio.send(protocol.encode_done(
_now_ts(), self.cfg.this_station_name,
cue.freq, cue.bw, cue.sf, cue.cr, cue.pow, cue.size,
self._sent_count, self._rcvd_count,
))
async def _send_loop(self, cue: cuesheet.CueEntry) -> None:
n = 0
case_start = time.time()
while time.time() < cue.end and not self._stop_run.is_set():
n += 1
self._sent_count = n
text = self._make_payload(cue.size, n, time.time() - case_start)
assert self._logger
try:
# cannot know packet_id before send; log queued with placeholder, then sent
result = await self.radio.send(text)
self._logger.queued(result.packet_id, cue.freq, cue.bw, cue.sf, cue.cr, cue.pow)
self._logger.sent(result.packet_id, result.duration_ms, text)
except Exception as e:
print(f"[send] error: {e}")
try:
await asyncio.wait_for(self._stop_run.wait(), timeout=cue.spacing)
return # stop_run set
except asyncio.TimeoutError:
pass
async def _listen_until(self, until_ts: float) -> None:
while time.time() < until_ts and not self._stop_run.is_set():
remaining = until_ts - time.time()
try:
await asyncio.wait_for(self._stop_run.wait(), timeout=max(0.1, remaining))
return
except asyncio.TimeoutError:
pass
@staticmethod
def _make_payload(size: int, n: int, t: float) -> str:
prefix = f"{time.time()},{t},{n}|"
if len(prefix) >= size:
return prefix
pad = "".join(random.choice(string.ascii_letters) for _ in range(size - len(prefix)))
return prefix + pad
# ---- helpers ----
async def _tune(self, rc: RadioConfig) -> None:
await self.radio.set_radio(
rc.frequency_mhz, rc.bandwidth_khz, rc.spread_factor, rc.coding_rate, rc.tx_power_dbm,
)
async def _sleep_until(self, ts: float) -> None:
while True:
now = time.time()
if now >= ts:
return
try:
await asyncio.wait_for(self._stop_run.wait(), timeout=ts - now)
return
except asyncio.TimeoutError:
return
def _check_stop_in_queue(self) -> bool:
# Drain queue checking for STOP without blocking.
found = False
while not self._cmd_queue.empty():
cmd = self._cmd_queue.get_nowait()
if cmd.cmd == "STOP":
found = True
self._stop_run.set()
return found
# ---- rx routing ----
def _on_message(self, pkt: RxPacket) -> None:
text = pkt.text or ""
cmd = None
if text:
try:
cmd = protocol.parse(text)
except Exception:
cmd = None
if cmd and cmd.cmd in ("START", "STOP"):
if cmd.cmd == "STOP" and self.state == "RUNNING":
self._stop_run.set()
self._cmd_queue.put_nowait(cmd)
return
if self._case_active and self._logger:
self._rcvd_count += 1
self._logger.received(pkt.packet_id, text)

46
notes.md Normal file
View file

@ -0,0 +1,46 @@
Review the @README.md and implement a python process for the test stations that can run on a Raspberry Pi and a process for the control station that can run on a macbook.
Also set up a simulator using docker-compose with Docker containers for each Test Station process and a control station, with a `SIMULATOR=true` env variable. When this is true, the process uses UDP on the Docker network between the containers to "transmit" and "listen" for packets instead of looking for a USB serial attached node device.
Reference a prototype in @reference/modjam-prototype.py and its output tsv files.
#### DOWNLINKING
The control node sends a `RESULTS:<station>|<testname>` command on the `control_radio` settings. This is received by all Test Stations in `IDLE` state but only the station addressed by the command acts on the command.
Some arbitrary time after, the control station sends a `RESULTS:A|test1` command. The Control Station and Test Station A tune to the data radio settings and Test Station A enters `DOWNLINKING` state. Each beacons a "ready" message until they hear the other station's "ready", at which point Test Station A
Simulator using Docker containers for each Test Station process, with a `SIMULATOR=true` env variable. When this is true, the process uses UDP on the Docker network between the containers to "transmit" and "listen" for packets.
Commands:
- START
- STOP
- RESULTS
<cmd>[:<station>]|<attr1:val1,val2,>|<attr2:val1,val2,>
Listens for START command from control node
`START|name:<testname>|f:915.1|bw:62.5,125,250,500|sf:6,7,8,9,10,11,12|cr:4,5,6,7,8|pow:10,22`
RESULTS command:
`RESULTS:<station>\tname:<testname>`
RESULTS:A|name
sends summary of each trial
<station>|<name>|<freq>|<bw>|<sf>|<cr>|<pow>|<A>|<B>|<C>
A|test1|915.1|500|7|8|22|100|88|84
B|test1|915.1|500|7|8|22|85|100|80
C|test1|915.1|500|7|8|22|85|100|80

27
pyproject.toml Normal file
View file

@ -0,0 +1,27 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "modjam"
version = "0.1.0"
description = "LoRa modulation jam — packet delivery rate measurement across MeshCore radio settings"
requires-python = ">=3.11"
dependencies = [
"meshcore==2.3.7",
]
[project.optional-dependencies]
test = [
"pytest>=8",
]
[project.scripts]
modjam-station = "modjam.cli:station_main"
modjam-control = "modjam.cli:control_main"
[tool.setuptools.packages.find]
include = ["modjam*"]
[tool.pytest.ini_options]
testpaths = ["tests"]

14
sim/control.json Normal file
View file

@ -0,0 +1,14 @@
{
"this_station_name": "CTRL",
"channel_name": "modjam",
"channel_psk": "MTIzNGFiY2RlZmdoaWprbA==",
"idle_heartbeat_min": 60,
"log_dir": "/var/log/modjam",
"control_radio": {
"frequency_mhz": 915.125,
"bandwidth_khz": 500,
"spread_factor": 12,
"coding_rate": 8,
"tx_power_dbm": 22
}
}

21
sim/station-a.json Normal file
View file

@ -0,0 +1,21 @@
{
"this_station_name": "A",
"channel_name": "modjam",
"channel_psk": "MTIzNGFiY2RlZmdoaWprbA==",
"idle_heartbeat_min": 1,
"log_dir": "/var/log/modjam",
"control_radio": {
"frequency_mhz": 915.125,
"bandwidth_khz": 500,
"spread_factor": 12,
"coding_rate": 8,
"tx_power_dbm": 22
},
"data_radio": {
"frequency_mhz": 915.125,
"bandwidth_khz": 500,
"spread_factor": 7,
"coding_rate": 5,
"tx_power_dbm": 1
}
}

21
sim/station-b.json Normal file
View file

@ -0,0 +1,21 @@
{
"this_station_name": "B",
"channel_name": "modjam",
"channel_psk": "MTIzNGFiY2RlZmdoaWprbA==",
"idle_heartbeat_min": 1,
"log_dir": "/var/log/modjam",
"control_radio": {
"frequency_mhz": 915.125,
"bandwidth_khz": 500,
"spread_factor": 12,
"coding_rate": 8,
"tx_power_dbm": 22
},
"data_radio": {
"frequency_mhz": 915.125,
"bandwidth_khz": 500,
"spread_factor": 7,
"coding_rate": 5,
"tx_power_dbm": 1
}
}

21
sim/station-c.json Normal file
View file

@ -0,0 +1,21 @@
{
"this_station_name": "C",
"channel_name": "modjam",
"channel_psk": "MTIzNGFiY2RlZmdoaWprbA==",
"idle_heartbeat_min": 1,
"log_dir": "/var/log/modjam",
"control_radio": {
"frequency_mhz": 915.125,
"bandwidth_khz": 500,
"spread_factor": 12,
"coding_rate": 8,
"tx_power_dbm": 22
},
"data_radio": {
"frequency_mhz": 915.125,
"bandwidth_khz": 500,
"spread_factor": 7,
"coding_rate": 5,
"tx_power_dbm": 1
}
}

0
tests/__init__.py Normal file
View file

63
tests/test_config.py Normal file
View file

@ -0,0 +1,63 @@
import json
import pytest
from modjam.config import StationConfig
SAMPLE = {
"this_station_name": "A",
"channel_name": "modjam",
"channel_psk": "MTIzNGFiY2RlZmdoaWprbA==",
"idle_heartbeat_min": 2,
"control_radio": {
"frequency_mhz": 915.125,
"bandwidth_khz": 500,
"spread_factor": 12,
"coding_rate": 8,
"tx_power_dbm": 22,
},
"data_radio": {
"frequency_mhz": 915.125,
"bandwidth_khz": 500,
"spread_factor": 7,
"coding_rate": 5,
"tx_power_dbm": 1,
},
}
def _write(tmp_path, data):
p = tmp_path / "modjam-config.json"
p.write_text(json.dumps(data))
return p
def test_load_full_config(tmp_path):
cfg = StationConfig.load(_write(tmp_path, SAMPLE))
assert cfg.this_station_name == "A"
assert cfg.idle_heartbeat_min == 2
assert cfg.control_radio.spread_factor == 12
assert cfg.data_radio is not None
assert cfg.data_radio.tx_power_dbm == 1
def test_load_defaults_heartbeat(tmp_path):
d = dict(SAMPLE)
d.pop("idle_heartbeat_min")
cfg = StationConfig.load(_write(tmp_path, d))
assert cfg.idle_heartbeat_min == 15
def test_load_without_data_radio(tmp_path):
d = dict(SAMPLE)
d.pop("data_radio")
cfg = StationConfig.load(_write(tmp_path, d))
assert cfg.data_radio is None
def test_load_missing_required(tmp_path):
d = dict(SAMPLE)
d.pop("channel_name")
with pytest.raises(KeyError):
StationConfig.load(_write(tmp_path, d))

86
tests/test_cuesheet.py Normal file
View file

@ -0,0 +1,86 @@
import pytest
from modjam import cuesheet
def test_parse_start_args_defaults():
args = cuesheet.parse_start_args({"name": ["x"]})
assert args["name"] == "x"
assert args["freq"] == [916.1]
assert args["bw"] == list(cuesheet.VALID_BW)
assert args["sf"] == list(cuesheet.VALID_SF)
assert args["cr"] == list(cuesheet.VALID_CR)
assert args["pow"] == [22]
assert args["size"] == [40]
assert args["stations"] == ["A", "B"]
assert args["duration"] == 300
assert args["padding"] == 60
assert args["spacing"] == 2
assert args["at"] == 5
def test_parse_start_args_overrides():
args = cuesheet.parse_start_args({
"name": ["t1"],
"f": ["910.2", "915.1"],
"bw": ["62.5", "500"],
"sf": ["7", "8"],
"cr": ["5"],
"pow": ["10", "22"],
"stations": ["A", "B", "C"],
"duration": ["60"],
"padding": ["5"],
"spacing": ["1"],
"size": ["40", "200"],
"at": ["1"],
})
assert args["freq"] == [910.2, 915.1]
assert args["bw"] == [62.5, 500.0]
assert args["sf"] == [7, 8]
assert args["pow"] == [10, 22]
assert args["stations"] == ["A", "B", "C"]
assert args["duration"] == 60
assert args["size"] == [40, 200]
def test_parse_start_args_requires_name():
with pytest.raises(ValueError):
cuesheet.parse_start_args({})
def test_build_case_count():
args = cuesheet.parse_start_args({
"name": ["x"],
"f": ["915.1"],
"bw": ["500"],
"sf": ["7", "8"],
"cr": ["5"],
"pow": ["22"],
"size": ["40"],
"stations": ["A", "B"],
"duration": ["10"],
"padding": ["2"],
"spacing": ["1"],
"at": ["1"],
})
cues = cuesheet.build(cuesheet.CueParams(base_t=0.0, **args))
# 1 freq * 1 bw * 2 sf * 1 cr * 1 pow * 1 size * 2 stations
assert len(cues) == 4
def test_build_timing_invariants():
args = cuesheet.parse_start_args({"name": ["x"], "f": ["915.1"], "bw": ["500"], "sf": ["7"], "cr": ["5"], "pow": ["22"], "size": ["40"], "stations": ["A"], "duration": ["20"], "padding": ["5"], "spacing": ["1"], "at": ["1"]})
cues = cuesheet.build(cuesheet.CueParams(base_t=1000.0, **args))
c = cues[0]
assert c.next_at == 1000.0
assert c.tune_at == 1010.0
assert c.start == c.tune_at
assert c.end == c.start + 20
assert c.done_at == c.end + 10
assert c.spacing == 1.0
def test_build_consecutive_cases_separated_by_padding():
args = cuesheet.parse_start_args({"name": ["x"], "f": ["915.1"], "bw": ["500"], "sf": ["7"], "cr": ["5"], "pow": ["22"], "size": ["40"], "stations": ["A", "B"], "duration": ["10"], "padding": ["3"], "spacing": ["1"], "at": ["1"]})
cues = cuesheet.build(cuesheet.CueParams(base_t=0.0, **args))
assert cues[1].next_at == cues[0].done_at + 3

37
tests/test_log.py Normal file
View file

@ -0,0 +1,37 @@
from modjam.log import TsvLogger
def test_tsv_logger_writes_all_event_types(tmp_path):
logger = TsvLogger("Z", tmp_path)
logger.queued(42, 915.1, 500.0, 7, 5, 22)
logger.sent(42, 13, "1234.5,0.0,1|abc")
logger.received(99, "1234.5,1.0,2|xyz")
logger.close()
rows = logger.path.read_text().strip().split("\n")
assert len(rows) == 3
parts0 = rows[0].split("\t")
assert parts0[1] == "queued"
assert parts0[2] == "42"
assert parts0[3] == "915.1,500.0,7,5,22"
parts1 = rows[1].split("\t")
assert parts1[1] == "sent"
assert parts1[2] == "42"
assert parts1[3] == "13"
assert parts1[4] == "1234.5,0.0,1|abc"
parts2 = rows[2].split("\t")
assert parts2[1] == "received"
assert parts2[2] == "99"
assert parts2[3] == "1234.5,1.0,2|xyz"
def test_tsv_logger_creates_log_dir(tmp_path):
sub = tmp_path / "nested" / "logs"
logger = TsvLogger("A", sub)
logger.queued(1, 0, 0, 0, 0, 0)
logger.close()
assert sub.is_dir()
assert logger.path.exists()

69
tests/test_protocol.py Normal file
View file

@ -0,0 +1,69 @@
from modjam import protocol
def test_parse_simple():
c = protocol.parse("START|name:foo")
assert c.cmd == "START"
assert c.target is None
assert c.args == {"name": ["foo"]}
def test_parse_with_target():
c = protocol.parse("RESULTS:A|name:t1")
assert c.cmd == "RESULTS"
assert c.target == "A"
assert c.args == {"name": ["t1"]}
def test_parse_multi_value():
c = protocol.parse("START|bw:62.5,500|sf:7,8,9|name:x")
assert c.args["bw"] == ["62.5", "500"]
assert c.args["sf"] == ["7", "8", "9"]
def test_parse_empty_returns_none():
assert protocol.parse("") is None
assert protocol.parse(" \n") is None
def test_parse_lowercase_cmd_normalized():
assert protocol.parse("stop").cmd == "STOP"
def test_parse_arg_without_colon_skipped():
c = protocol.parse("START|name:x|brokenpart|sf:7")
assert c.args == {"name": ["x"], "sf": ["7"]}
def test_encode_scalar_and_list():
out = protocol.encode("START", name="x", sf=[7, 8], bw=500)
assert out == "START|name:x|sf:7,8|bw:500"
def test_encode_with_target():
assert protocol.encode("RESULTS", target="A", name="t1") == "RESULTS:A|name:t1"
def test_encode_skips_none():
assert protocol.encode("STOP", target=None, foo=None) == "STOP"
def test_encode_roundtrip():
src = protocol.encode("START", name="x", stations=["A", "B", "C"], sf=[7, 8])
parsed = protocol.parse(src)
assert parsed.cmd == "START"
assert parsed.args["name"] == ["x"]
assert parsed.args["stations"] == ["A", "B", "C"]
assert parsed.args["sf"] == ["7", "8"]
def test_heartbeat_format():
assert protocol.encode_heartbeat(123, "A", "IDLE", -100) == "123|A|IDLE|-100"
def test_next_format():
assert protocol.encode_next(123, "A", 915.1, 500, 7, 5, 22, 40) == "123|A|next:915.1/500/7/5/22/40"
def test_done_format():
assert protocol.encode_done(123, "A", 915.1, 500, 7, 5, 22, 40, 100, 88) == "123|A|done:915.1/500/7/5/22/40 100/88"

145
tests/test_udp_radio.py Normal file
View file

@ -0,0 +1,145 @@
import os
import pytest
from modjam.radio.udp import UDPRadio, _params_match, _psk_hash
@pytest.fixture
def radio(monkeypatch):
"""Construct a UDPRadio without binding sockets."""
monkeypatch.delenv("SIM_PACKET_LOSS", raising=False)
monkeypatch.delenv("SIM_SEED", raising=False)
return UDPRadio(station="A", peers=["B", "C"])
def _msg(**overrides):
base = {
"sender": "B",
"freq": 915.1, "bw": 500.0, "sf": 7, "cr": 5, "pow": 22,
"channel": "modjam", "psk_hash": "abc123",
"packet_id": 42,
"text": "1234567890.0,1.0,1|payload",
"ts": 0.0,
}
base.update(overrides)
return base
def _tune(radio, **overrides):
radio._cur.update({
"freq": 915.1, "bw": 500.0, "sf": 7, "cr": 5, "pow": 22,
"channel": "modjam", "psk_hash": "abc123",
})
radio._cur.update(overrides)
def test_params_match_exact():
a = {"freq": 915.1, "bw": 500.0, "sf": 7, "cr": 5, "channel": "x", "psk_hash": "h"}
b = dict(a)
assert _params_match(a, b) is True
@pytest.mark.parametrize("field,bad", [
("freq", 916.0),
("bw", 250.0),
("sf", 8),
("cr", 6),
("channel", "other"),
("psk_hash", "different"),
])
def test_params_match_rejects_mismatch(field, bad):
a = {"freq": 915.1, "bw": 500.0, "sf": 7, "cr": 5, "channel": "x", "psk_hash": "h"}
b = dict(a)
b[field] = bad
assert _params_match(a, b) is False
def test_psk_hash_deterministic_and_short():
h1 = _psk_hash("hello")
h2 = _psk_hash("hello")
assert h1 == h2 and len(h1) == 16
assert _psk_hash("hello") != _psk_hash("world")
def test_handle_incoming_drops_self(radio):
_tune(radio)
received = []
radio.on_message(lambda p: received.append(p))
radio._handle_incoming(_msg(sender="A"))
assert received == []
def test_handle_incoming_drops_param_mismatch(radio):
_tune(radio, sf=8)
received = []
radio.on_message(lambda p: received.append(p))
radio._handle_incoming(_msg(sf=7))
assert received == []
def test_handle_incoming_delivers_match(radio):
_tune(radio)
received = []
radio.on_message(lambda p: received.append(p))
radio._handle_incoming(_msg())
assert len(received) == 1
assert received[0].sender == "B"
assert received[0].text.startswith("1234567890.0,")
def test_packet_loss_drops_test_packets(monkeypatch):
monkeypatch.setenv("SIM_PACKET_LOSS", "1.0") # drop everything
monkeypatch.setenv("SIM_SEED", "fixed")
r = UDPRadio(station="A", peers=["B"])
_tune(r)
received = []
r.on_message(lambda p: received.append(p))
for i in range(20):
r._handle_incoming(_msg(packet_id=i, text=f"1234.5,1.0,{i}|x"))
assert received == [] # 100% drop
def test_packet_loss_does_not_drop_protocol_messages(monkeypatch):
monkeypatch.setenv("SIM_PACKET_LOSS", "1.0")
monkeypatch.setenv("SIM_SEED", "fixed")
r = UDPRadio(station="A", peers=["B"])
_tune(r)
received = []
r.on_message(lambda p: received.append(p))
# heartbeat + START + STOP should never drop, regardless of loss.
r._handle_incoming(_msg(text="1234567890|B|IDLE|-100"))
r._handle_incoming(_msg(text="START|name:foo"))
r._handle_incoming(_msg(text="STOP"))
r._handle_incoming(_msg(text="1234567890|B|next:915.1/500/7/5/22/40"))
assert len(received) == 4
def test_packet_loss_partial_with_seed_is_deterministic(monkeypatch):
monkeypatch.setenv("SIM_PACKET_LOSS", "0.5")
monkeypatch.setenv("SIM_SEED", "fixed-seed")
def run_once():
r = UDPRadio(station="A", peers=["B"])
_tune(r)
out = []
r.on_message(lambda p: out.append(p.packet_id))
for i in range(50):
r._handle_incoming(_msg(packet_id=i, text=f"100.0,1.0,{i}|x"))
return out
a = run_once()
b = run_once()
assert a == b # deterministic
assert 0 < len(a) < 50 # some dropped, some kept
def test_packet_loss_zero_keeps_all_test_packets(monkeypatch):
monkeypatch.setenv("SIM_PACKET_LOSS", "0")
r = UDPRadio(station="A", peers=["B"])
_tune(r)
received = []
r.on_message(lambda p: received.append(p))
for i in range(10):
r._handle_incoming(_msg(packet_id=i, text=f"100.0,1.0,{i}|y"))
assert len(received) == 10