171 lines
4.3 KiB
Python
171 lines
4.3 KiB
Python
"""
|
|
LoRa Airtime Calculation.
|
|
|
|
Implements the real LoRa airtime formula for accurate simulation.
|
|
Reference: Semtech SX1276/77/78/79 Datasheet
|
|
"""
|
|
|
|
import math
|
|
from sim import config
|
|
|
|
|
|
def calculate_symbol_time(sf: int, bw: int) -> float:
|
|
"""
|
|
Calculate symbol time.
|
|
|
|
T_symbol = 2^SF / BW
|
|
|
|
Args:
|
|
sf: Spreading Factor (7-12)
|
|
bw: Bandwidth in Hz
|
|
|
|
Returns:
|
|
Symbol time in seconds
|
|
"""
|
|
return (2**sf) / bw
|
|
|
|
|
|
def calculate_payload_airtime(
|
|
payload_size: int,
|
|
sf: int,
|
|
bw: int,
|
|
cr: int,
|
|
use_header: bool = True,
|
|
low_data_rate_optimize: bool = None,
|
|
) -> float:
|
|
"""
|
|
Calculate payload airtime.
|
|
|
|
Args:
|
|
payload_size: Payload size in bytes
|
|
sf: Spreading Factor (7-12)
|
|
bw: Bandwidth in Hz
|
|
cr: Coding Rate (5-8, represents 4/5 to 4/8)
|
|
use_header: Whether packet header is present
|
|
low_data_rate_optimize: Low Data Rate Optimization flag
|
|
Set to True if SF >= 11 or BW <= 125kHz
|
|
|
|
Returns:
|
|
Payload airtime in seconds
|
|
"""
|
|
# Determine DE (Low Data Rate Optimization)
|
|
if low_data_rate_optimize is None:
|
|
# Auto-detect: DE = 1 if SF >= 11 or BW <= 125 kHz
|
|
de = 1 if (sf >= 11 or bw <= 125000) else 0
|
|
else:
|
|
de = 1 if low_data_rate_optimize else 0
|
|
|
|
# H = 0 if header is present, 1 if no header
|
|
h = 0 if use_header else 1
|
|
|
|
# Calculate number of payload symbols
|
|
# N_payload = 8 + max(ceil((8*PL - 4*SF + 28 - 16 - 20*H) / (4*(SF - 2*DE))) * (CR + 4), 0)
|
|
numerator = 8 * payload_size - 4 * sf + 28 - 16 - 20 * h
|
|
denominator = 4 * (sf - 2 * de)
|
|
|
|
if denominator <= 0:
|
|
# SF - 2*DE <= 0, use minimum
|
|
n_payload = 0
|
|
else:
|
|
n_payload = 8 + max(math.ceil(numerator / denominator) * (cr + 4), 0)
|
|
|
|
# Calculate time
|
|
symbol_time = calculate_symbol_time(sf, bw)
|
|
return n_payload * symbol_time
|
|
|
|
|
|
def calculate_preamble_airtime(sf: int, bw: int, preamble: int = None) -> float:
|
|
"""
|
|
Calculate preamble airtime.
|
|
|
|
T_preamble = (PREAMBLE + 4.25) * T_symbol
|
|
|
|
Args:
|
|
sf: Spreading Factor (7-12)
|
|
bw: Bandwidth in Hz
|
|
preamble: Number of preamble symbols (default from config)
|
|
|
|
Returns:
|
|
Preamble airtime in seconds
|
|
"""
|
|
if preamble is None:
|
|
preamble = config.PREAMBLE
|
|
|
|
symbol_time = calculate_symbol_time(sf, bw)
|
|
return (preamble + 4.25) * symbol_time
|
|
|
|
|
|
def calculate_packet_airtime(
|
|
payload_size: int,
|
|
sf: int = None,
|
|
bw: int = None,
|
|
cr: int = None,
|
|
preamble: int = None,
|
|
) -> float:
|
|
"""
|
|
Calculate total packet airtime.
|
|
|
|
T_packet = T_preamble + T_payload
|
|
|
|
Args:
|
|
payload_size: Payload size in bytes
|
|
sf: Spreading Factor (default from config)
|
|
bw: Bandwidth in Hz (default from config)
|
|
cr: Coding Rate (default from config)
|
|
preamble: Number of preamble symbols (default from config)
|
|
|
|
Returns:
|
|
Total packet airtime in seconds
|
|
"""
|
|
if sf is None:
|
|
sf = config.SF
|
|
if bw is None:
|
|
bw = config.BW
|
|
if cr is None:
|
|
cr = config.CR
|
|
|
|
preamble_time = calculate_preamble_airtime(sf, bw, preamble)
|
|
payload_time = calculate_payload_airtime(payload_size, sf, bw, cr)
|
|
|
|
return preamble_time + payload_time
|
|
|
|
|
|
def calculate_ack_time(ack_seq: int = 1) -> float:
|
|
"""
|
|
Calculate ACK packet airtime.
|
|
|
|
ACK packet structure:
|
|
- 1 byte for type
|
|
- 1 byte for seq
|
|
- 1 byte for dst
|
|
- Total: 3 bytes (minimal)
|
|
|
|
Args:
|
|
ack_seq: ACK sequence number (affects total size)
|
|
|
|
Returns:
|
|
ACK airtime in seconds
|
|
"""
|
|
# ACK = type(1) + seq(1) + dst(1) + reserved(1) = 4 bytes minimum
|
|
ack_size = 4
|
|
return calculate_packet_airtime(ack_size)
|
|
|
|
|
|
# Convenience function for quick calculations
|
|
def get_hello_airtime() -> float:
|
|
"""Get airtime for HELLO packet (minimal size)."""
|
|
# HELLO = type(1) + src(1) + cost(4) + seq(1) = ~7 bytes
|
|
return calculate_packet_airtime(7)
|
|
|
|
|
|
def get_data_airtime(payload_size: int = 16) -> float:
|
|
"""Get airtime for DATA packet."""
|
|
# DATA = type(1) + src(1) + dst(1) + seq(2) + hop(1) + payload(n)
|
|
base_size = 6
|
|
return calculate_packet_airtime(base_size + payload_size)
|
|
|
|
|
|
def get_ack_airtime() -> float:
|
|
"""Get airtime for ACK packet."""
|
|
return calculate_ack_time()
|