From 6d2aa58d370c265376bf89d8d3fe276c08d76038 Mon Sep 17 00:00:00 2001 From: Alec Perkins Date: Mon, 5 Jan 2026 21:50:56 -0500 Subject: [PATCH] Working cuesheet version --- .gitignore | 1 + README.md | 2 + modjam.py | 354 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 357 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 modjam.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e43b0f9 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.DS_Store diff --git a/README.md b/README.md new file mode 100644 index 0000000..379d90b --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ + +# LoRa Modulation Jam diff --git a/modjam.py b/modjam.py new file mode 100644 index 0000000..edefc39 --- /dev/null +++ b/modjam.py @@ -0,0 +1,354 @@ +from datetime import datetime, timedelta +from meshtastic.serial_interface import SerialInterface +from meshtastic.tcp_interface import TCPInterface +from pubsub import pub +from random import choice +from time import time, sleep +from typing import TypedDict, Literal +import argparse +import asyncio +import json +import string +import sys + + +logfile = None +interface: SerialInterface | None = None + + +type Bandwidth = Literal[62,125,250,500] +type SpreadFactor = Literal[5,6,7,8,9,10,11,12] +type CodingRate = Literal[5,6,7,8] +type Station = Literal['A','B','C','D'] + +class Config(TypedDict): + test_case_duration : int + test_case_padding : int + transmission_padding : int + frequency : list[float] + bandwidth : list[Bandwidth] + spread_factor : list[SpreadFactor] + coding_rate : list[CodingRate] + payload_size : list[int] + power : list[int] + start_at : int + stations : list[Station] + +class RunConfig(TypedDict): + this_station : Station + port : str + # hostname: str + +class RadioConfig(TypedDict): + bw : Bandwidth + sf : SpreadFactor + cr : CodingRate + freq : float + pow : int + +class CuesheetEntry(TypedDict): + start : int # Start case at T-s + end : int # End case at T-s + between : int # seconds between transmissions + freq : float + bw : Bandwidth + sf : SpreadFactor + cr : CodingRate + pow : int + size : int + sender : Station + +type Cuesheet = list[CuesheetEntry] + +def prepareConfig (): + parser = argparse.ArgumentParser( + prog='ModulationJam', + description='', + epilog='', + ) + + parser = argparse.ArgumentParser(prog='ModulationJam') + subparsers = parser.add_subparsers(help='') + parser_a = subparsers.add_parser('run', help='') + parser_a.add_argument('--test-case-duration', type=int, default=600, help='The total duration in seconds of each test case, including padding.') + parser_a.add_argument('--test-case-padding', type=int, default=60, help='The number of seconds between actual execution of test cases.') + parser_a.add_argument('--transmission-padding', type=int, default=2, help='The number of seconds between each test transmission.') + parser_a.add_argument('--frequency', type=float, action='append', default=[], help='The frequency in MHz to center on. Set repeatedly to test multiple frequencies.') + parser_a.add_argument('--bandwidth', type=float, action='append', choices=(62,125,250,500), default=[], help='The bandwidths in kHz to test (62 for 62.5kHz). Set repeatedly to test multiple bandwidths.') + parser_a.add_argument('--spread-factor', type=int, action='append', choices=(5,6,7,8,9,10,11,12), default=[], help='The spread factors to test. Set repeatedly to test multiple spread factors.') + parser_a.add_argument('--coding-rate', type=int, action='append', choices=(5,6,7,8), default=[], help='The coding rate to test. Set repeatedly to test multiple coding rates.') + parser_a.add_argument('--payload-size', type=int, action='append', default=[], help='The total payload size to set. Set repeatedly to test multiple payload sizes. default (22,)') + parser_a.add_argument('--power', type=int, action='append', default=[], help='The power in dBm to test. Set repeatedly to test mutliple power settings.') + parser_a.add_argument('--start-at', type=int, default=5, help='Start at the next %Nth minute of the hour; default 5, ie starts at the next minute that is a multiple of 5.') + parser_a.add_argument('--stations', type=str, action='append', choices=('A','B','C','D'), default=[], help='The identifiers of all participating stations..') + parser_a.add_argument('--this-station', type=str, required=True, choices=('A','B','C','D'), help='The identifier for this station.') + parser_a.add_argument('--port', type=str, required=False, help='The serial port of the node device.') + # parser_a.add_argument('--hostname', type=str, required=False, help='The tcp hostname of the node device') + + cli_args = parser.parse_args(sys.argv[1:]) + + LIST_DEFAULTS = { + 'frequency': [915.100], + 'bandwidth': (62,125,250,500), + 'spread_factor': (5,6,7,8,9,10,11,12), + 'coding_rate': (5,6,7,8), + 'payload_size': (40,), + 'power': (22,), + 'stations': ('A','B'), + } + + config: Config = { + 'test_case_duration': cli_args.test_case_duration, + 'test_case_padding': cli_args.test_case_padding, + 'transmission_padding': cli_args.transmission_padding, + 'frequency': cli_args.frequency, + 'bandwidth': cli_args.bandwidth, + 'spread_factor': cli_args.spread_factor, + 'coding_rate': cli_args.coding_rate, + 'payload_size': cli_args.payload_size, + 'power': cli_args.power, + 'start_at': cli_args.start_at, + 'stations': cli_args.stations, + } + + run_config: RunConfig = { + 'this_station': cli_args.this_station, + 'port': cli_args.port, + # 'hostname': cli_args.hostname, + } + + for k,v in config.items(): + if not v and k in LIST_DEFAULTS: + config[k] = LIST_DEFAULTS[k] + + return (config, run_config) + + +def buildCueSheet (config: Config): + cuesheet: Cuesheet = [] + t = 0 + num_permutations = 0 + for freq in config['frequency']: + for bw in config['bandwidth']: + for sf in config['spread_factor']: + for cr in config['coding_rate']: + for pow in config['power']: + num_permutations += 1 + for size in config['payload_size']: + for sender in config['stations']: + start_t = t + end_t = t + config['test_case_duration'] + start_padding_t = int(config['test_case_padding'] / 4 * 3) # Allow more time at the start for radio changes + end_padding_t = int(config['test_case_padding'] / 4) # Small buffer at the end in case the stations are out of sync + cuesheet.append({ + 'start': start_t + start_padding_t, # Start case at T-s + 'end': end_t - end_padding_t, # End case at T-s + 'between': config['transmission_padding'], # seconds between transmissions + 'freq': freq, + 'bw': bw, + 'sf': sf, + 'cr': cr, + 'pow': pow, + 'size': size, + 'sender': sender, + }) + t = end_t + print(num_permutations, 'permutations', len(cuesheet), 'tests') + print(t / (60.0 * 60.0), 'hours') + return cuesheet + + +def log (**parts): + if not logfile: + raise Exception('No logfile') + ts = time() + msg = '\t'.join(map(lambda x: str(x), parts)) + line = f'{ts}\t{msg}\n'; + print(line) + logfile.write(json.dumps({**parts, 'ts': ts})) + + +def configureRadio (conf: RadioConfig): + if not interface: + raise Exception('No interface connected') + node = interface.getNode('^all') + changed = False + + use_preset = False + bandwidth = conf['bw'] + spread_factor = conf['sf'] + coding_rate = conf['cr'] + override_frequency = conf['freq'] + tx_power = conf['pow'] + tx_enabled = True + hop_limit = 0 + + if use_preset != node.localConfig.lora.use_preset: + changed = True + node.localConfig.lora.use_preset = use_preset + if bandwidth != node.localConfig.lora.bandwidth: + changed = True + node.localConfig.lora.bandwidth = bandwidth + if spread_factor != node.localConfig.lora.spread_factor: + changed = True + node.localConfig.lora.spread_factor = spread_factor + if coding_rate != node.localConfig.lora.coding_rate: + changed = True + node.localConfig.lora.coding_rate = coding_rate + if override_frequency != node.localConfig.lora.override_frequency: + changed = True + node.localConfig.lora.override_frequency = override_frequency + if tx_power != node.localConfig.lora.tx_power: + changed = True + node.localConfig.lora.tx_power = tx_power + if tx_enabled != node.localConfig.lora.tx_enabled: + changed = True + node.localConfig.lora.tx_enabled = tx_enabled + if hop_limit != node.localConfig.lora.hop_limit: + changed = True + node.localConfig.lora.hop_limit = hop_limit + + if changed: + node.beginSettingsTransaction() + node.writeConfig('lora') + node.commitSettingsTransaction() + return changed + + +def reconnectRadio (): + global interface + if not interface: + raise Exception('No interface connected') + reconnected = None + reconnected_node = None + while not reconnected and not reconnected_node: + try: + reconnected = SerialInterface(interface.devPath, noNodes=True) + except: + pass + sleep(1) + if reconnected: + reconnected_node = interface.getNode('^all') + sleep(2) + interface = reconnected + + +def onReceiveText (packet, interface): + log( + event = 'received', + packet_id = packet['id'], + text = packet['decoded']['payload'].decode('utf-8'), + ) + + +txed = dict() +active_tx_id = None +active_tx_ms = None +def onStatus (line: str): + global active_tx_id + global active_tx_ms + if 'Started Tx' in line: + _, parts = line.split('Started Tx (id=') + active_tx_id = parts.split(' ')[0] + print('active_tx_id',active_tx_id) + elif active_tx_id: + # This assumes there is only ever one active transmission at a time (weird if not true!) + if 'Packet TX' in line: + active_tx_ms = line.split(':').pop().strip() + print('active_tx_ms', active_tx_ms, active_tx_id) + elif 'Completed sending' in line and active_tx_ms: + active_tx_num = int(active_tx_id, 16) + txed[active_tx_num] = int(active_tx_ms.replace('ms','')) + print('active_tx_num', active_tx_num, active_tx_id, txed[active_tx_num]) + active_tx_id = None + active_tx_ms = None + + +async def waitForTx (packet_id: int): + print('waiting for tx', packet_id) + s = time() + timed_out = False + while not packet_id in txed and not timed_out: + # sleep(0.1) + await asyncio.sleep(0.1) + if time() - s > 45: + timed_out = True + ms = txed.pop(packet_id, None) # pop to avoid a memory leak + if timed_out: + print(packet_id, 'timed out') + else: + print(packet_id, 'sent in', ms) + return ms + + +async def runCues (cuesheet: Cuesheet, run_config: RunConfig): + t = 0 + num_packets = 0 + scenario_prefix = '' + start = time() + + async def sendPacket (sender, size): + if not interface: + raise Exception('No interface connected') + text = f'{time()},{t},{num_packets}|' + while len(text) < size: + text += choice(string.ascii_letters) + print(t, sender, 'sending', text) + packet = interface.sendText(text) + log(event='queued',packet_id=packet.id,scenario=scenario_prefix) + ms = await waitForTx(packet.id) + if ms: + log(event='sent',packet_id=packet.id,duration_ms=ms,text=text) + + while cuesheet: + scenario = cuesheet.pop(0) + print(scenario) + print('configure radio') + scenario_prefix = f'{scenario['freq']},{scenario['bw']},{scenario['sf']},{scenario['cr']},{scenario['pow']}' + configureRadio(scenario) + reconnectRadio() + while t < scenario['start']: + t = time() - start + sleep(0.1) + + while t < scenario['end']: + t = time() - start + if scenario['sender'] == run_config['this_station']: + num_packets += 1 + await sendPacket(scenario['sender'], scenario['size']) + await asyncio.sleep(scenario['between']) + else: + # Wait until the next scenario and just listen + wait_for_s = max(scenario['end'] - t,0.5) + await asyncio.sleep(wait_for_s) + + +def sleepUntilStart (config): + n = datetime.now() + start_at = datetime(n.year,n.month,n.day,n.hour,n.minute) + timedelta(minutes=config['start_at'] - n.minute % config['start_at']) + print('sleeping', start_at - datetime.now(), 'until', start_at) + while datetime.now() < start_at: + sleep(0.1) + print('starting') + + +def main (): + config, run_config = prepareConfig() + cuesheet = buildCueSheet(config) # Do this before sleeping so the timing is displayed + + sleepUntilStart(config) + + pub.subscribe(onStatus, 'meshtastic.log') + pub.subscribe(onReceiveText, 'meshtastic.receive.text') + + global interface + interface = SerialInterface(run_config['port'], noNodes=True) + + global logfile + station = run_config['this_station'] + logfile = open('./' + station + '-' + str(time()) + '.jsonl', 'a') + + + asyncio.run(runCues(cuesheet, run_config)) + +main()