# modjam/tests/test_udp_radio.py
#
# Repository-viewer listing (as reported by the viewer): 154 lines, 4.6 KiB, Python.

import os
import pytest
from modjam.radio.udp import UDPRadio, _params_match, _psk_hash
@pytest.fixture
def radio(monkeypatch):
    """Return a ``UDPRadio`` for station "A" with peers "B" and "C".

    ``SIM_PACKET_LOSS`` and ``SIM_SEED`` are removed from the environment
    before construction so values inherited from the shell cannot leak into
    tests using this fixture. Constructing the radio is not expected to bind
    any sockets.
    """
    for sim_var in ("SIM_PACKET_LOSS", "SIM_SEED"):
        # raising=False: a variable that is already absent is not an error.
        monkeypatch.delenv(sim_var, raising=False)
    return UDPRadio(station="A", peers=["B", "C"])
# Radio parameters shared by the fake inbound message (`_msg`) and the tuned
# radio state (`_tune`). Keeping a single copy guarantees that an
# un-overridden message matches an un-overridden radio; the two helpers
# previously each carried their own literal copy, which could drift apart.
_DEFAULT_RADIO_PARAMS = {
    "freq": 915.1,
    "bw": 500.0,
    "sf": 7,
    "cr": 5,
    "pow": 22,
    "channel": "modjam",
    "psk_hash": "abc123",
}


def _msg(**overrides):
    """Build an inbound message dict as passed to ``UDPRadio._handle_incoming``.

    The default message comes from sender "B", carries the shared default
    radio parameters, packet id 42 and a test-packet style ``text``.

    Args:
        **overrides: Keys to replace in (or add to) the default message.

    Returns:
        A new dict on every call; the shared defaults are never mutated.
    """
    return {
        "sender": "B",
        **_DEFAULT_RADIO_PARAMS,
        "packet_id": 42,
        "text": "1234567890.0,1.0,1,deadbeef|payload",
        "ts": 0.0,
        # Overrides go last so they win; an overridden key keeps its
        # original position, new keys are appended.
        **overrides,
    }


def _tune(radio, **overrides):
    """Write radio parameters into ``radio._cur`` in place.

    The shared defaults are applied first and ``overrides`` second, so an
    override wins. Keys already in ``radio._cur`` that are neither defaults
    nor overrides are left untouched.

    Args:
        radio: Object exposing a dict-like ``_cur`` attribute.
        **overrides: Parameter values that replace the defaults.
    """
    radio._cur.update(_DEFAULT_RADIO_PARAMS)
    radio._cur.update(overrides)
def test_params_match_exact():
    """Two dicts carrying identical parameters are reported as a match."""
    params = {
        "freq": 915.1,
        "bw": 500.0,
        "sf": 7,
        "cr": 5,
        "channel": "x",
        "psk_hash": "h",
    }
    # Compare against an equal-but-distinct dict, not the same object.
    twin = params.copy()
    assert _params_match(params, twin) is True
@pytest.mark.parametrize("field,bad", [
    ("freq", 916.0),
    ("bw", 250.0),
    ("sf", 8),
    ("cr", 6),
    ("channel", "other"),
    ("psk_hash", "different"),
])
def test_params_match_rejects_mismatch(field, bad):
    """Changing any single listed field makes ``_params_match`` return False."""
    reference = {
        "freq": 915.1,
        "bw": 500.0,
        "sf": 7,
        "cr": 5,
        "channel": "x",
        "psk_hash": "h",
    }
    # Same parameters as the reference except for the one field under test.
    altered = {**reference, field: bad}
    assert _params_match(reference, altered) is False
def test_psk_hash_deterministic_and_short():
    """``_psk_hash`` is repeatable, 16 characters long, and input-sensitive."""
    first = _psk_hash("hello")
    second = _psk_hash("hello")
    # Separate asserts so a failure shows which property broke.
    assert first == second
    assert len(first) == 16
    assert _psk_hash("hello") != _psk_hash("world")
def test_handle_incoming_drops_self(radio):
    """A message whose sender is this station ("A") is not delivered."""
    _tune(radio)
    delivered = []
    radio.on_message(lambda packet: delivered.append(packet))

    # The fixture radio is station "A", so this is its own transmission.
    own_message = _msg(sender="A")
    radio._handle_incoming(own_message)

    assert delivered == []
def test_handle_incoming_drops_param_mismatch(radio):
    """A message sent with sf=7 is not delivered to a radio tuned to sf=8."""
    _tune(radio, sf=8)
    delivered = []
    radio.on_message(lambda packet: delivered.append(packet))

    mismatched = _msg(sf=7)
    radio._handle_incoming(mismatched)

    assert delivered == []
def test_handle_incoming_delivers_match(radio):
    """A message matching the tuned parameters reaches the callback once."""
    _tune(radio)
    delivered = []
    radio.on_message(lambda packet: delivered.append(packet))

    radio._handle_incoming(_msg())

    assert len(delivered) == 1
    packet = delivered[0]
    # The delivered object exposes the sender and text of the raw message.
    assert packet.sender == "B"
    assert packet.text.startswith("1234567890.0,")
def test_packet_loss_drops_test_packets(monkeypatch):
    """With ``SIM_PACKET_LOSS=1.0`` no test-format packet is delivered."""
    # Set the variables before constructing the radio, which presumably reads
    # them at construction time.
    monkeypatch.setenv("SIM_PACKET_LOSS", "1.0")  # drop everything
    monkeypatch.setenv("SIM_SEED", "fixed")
    lossy = UDPRadio(station="A", peers=["B"])
    _tune(lossy)

    delivered = []
    lossy.on_message(lambda packet: delivered.append(packet))
    for seq in range(20):
        incoming = _msg(packet_id=seq, text=f"1234.5,1.0,{seq},aabbccdd|x")
        lossy._handle_incoming(incoming)

    assert delivered == []  # 100% drop
def test_packet_loss_does_not_drop_protocol_messages(monkeypatch):
    """Protocol messages are delivered even with ``SIM_PACKET_LOSS=1.0``."""
    monkeypatch.setenv("SIM_PACKET_LOSS", "1.0")
    monkeypatch.setenv("SIM_SEED", "fixed")
    lossy = UDPRadio(station="A", peers=["B"])
    _tune(lossy)

    delivered = []
    lossy.on_message(lambda packet: delivered.append(packet))

    # None of these should ever drop, regardless of the loss setting.
    protocol_texts = (
        "1234567890|B|IDLE|-100",                 # heartbeat
        "START|name:foo",                         # START
        "STOP",                                   # STOP
        "1234567890|B|next:915.1/500/7/5/22/40",  # "next:" message
    )
    for text in protocol_texts:
        lossy._handle_incoming(_msg(text=text))

    assert len(delivered) == 4
def test_packet_loss_partial_with_seed_is_deterministic(monkeypatch):
    """With 50% loss and a fixed seed, two fresh radios keep the same packets."""
    monkeypatch.setenv("SIM_PACKET_LOSS", "0.5")
    monkeypatch.setenv("SIM_SEED", "fixed-seed")

    def surviving_ids():
        """Feed 50 test packets to a new radio; return the delivered ids."""
        lossy = UDPRadio(station="A", peers=["B"])
        _tune(lossy)
        kept = []
        lossy.on_message(lambda packet: kept.append(packet.packet_id))
        for seq in range(50):
            incoming = _msg(packet_id=seq, text=f"100.0,1.0,{seq},11223344|x")
            lossy._handle_incoming(incoming)
        return kept

    first_run = surviving_ids()
    second_run = surviving_ids()

    assert first_run == second_run  # deterministic
    assert 0 < len(first_run) < 50  # some dropped, some kept
def test_packet_loss_zero_keeps_all_test_packets(monkeypatch):
    """With ``SIM_PACKET_LOSS=0`` all ten test-format packets are delivered."""
    monkeypatch.setenv("SIM_PACKET_LOSS", "0")
    lossless = UDPRadio(station="A", peers=["B"])
    _tune(lossless)

    delivered = []
    lossless.on_message(lambda packet: delivered.append(packet))
    for seq in range(10):
        incoming = _msg(packet_id=seq, text=f"100.0,1.0,{seq},55667788|y")
        lossless._handle_incoming(incoming)

    assert len(delivered) == 10
def test_test_packet_regex_matches_legacy_and_token_formats():
    """``_TEST_PACKET_RE`` accepts both test-packet layouts, not protocol text."""
    from modjam.radio.udp import _TEST_PACKET_RE

    test_packet_texts = (
        "1234.5,1.0,42|abc",           # legacy 3-field
        "1234.5,1.0,42,deadbeef|abc",  # with token
    )
    protocol_texts = (
        "1234567890|A|IDLE|-100",  # heartbeat
        "START|name:foo",
    )

    for text in test_packet_texts:
        assert _TEST_PACKET_RE.match(text)
    for text in protocol_texts:
        assert not _TEST_PACKET_RE.match(text)