Files
lora_route_py/sim/tests/test_convergence.py
2026-02-24 16:21:30 +08:00

112 lines
2.7 KiB
Python

"""
Test 1: Routing Convergence

Checks (sink nodes are exempt):
- Every non-sink node has a parent (parent is not None)
- Route cost is finite (routing works)
- Convergence completes within 120 s of simulated time
"""
import pytest
import simpy
import random
from sim.node.node import Node
from sim.radio.channel import Channel
from sim.main import deploy_nodes, setup_receive_callback
from sim import config
@pytest.fixture
def seed():
    """Provide the fixed seed that the tests pass to ``random.seed``."""
    fixed_seed = 42
    return fixed_seed
def test_convergence_short(seed):
    """Run a small 5-node deployment and check the resulting route costs.

    The simulation runs for eight ``config.HELLO_PERIOD`` intervals. If any
    non-sink node still has no parent or an infinite cost afterwards, the
    test is skipped (not failed). Otherwise every collected cost must be
    below 100.
    """
    random.seed(seed)

    sim_env = simpy.Environment()
    radio = Channel(sim_env)
    deployed = deploy_nodes(sim_env, radio, num_nodes=5, area_size=300)
    setup_receive_callback(deployed, radio)
    for member in deployed:
        member.start()

    sim_env.run(until=config.HELLO_PERIOD * 8)

    stragglers = []
    route_costs = []
    for member in deployed:
        # Sinks are the routing destination and are exempt from the check.
        if member.is_sink:
            continue
        has_route = (
            member.routing.parent is not None
            and member.routing.cost != float("inf")
        )
        if has_route:
            route_costs.append(member.routing.cost)
        else:
            stragglers.append(member.node_id)

    if stragglers:
        pytest.skip(f"Nodes {stragglers} did not converge - network too sparse")

    assert all(cost < 100 for cost in route_costs), f"Costs too high: {route_costs}"
    print(f"Convergence test passed. Costs: {route_costs}")
def test_no_routing_loops(seed):
    """Assert that every non-sink node ends up with a usable route.

    An 8-node deployment is simulated for eight ``config.HELLO_PERIOD``
    intervals; afterwards each non-sink node must have a parent and a
    finite cost.

    NOTE(review): despite the name, this test does not follow parent chains
    to detect cycles - it only checks the parent and cost of each node.
    """
    random.seed(seed)

    sim_env = simpy.Environment()
    radio = Channel(sim_env)
    deployed = deploy_nodes(sim_env, radio, num_nodes=8, area_size=400)
    setup_receive_callback(deployed, radio)
    for member in deployed:
        member.start()

    sim_env.run(until=config.HELLO_PERIOD * 8)

    # Sinks are skipped; every other node must have a valid route.
    for member in (n for n in deployed if not n.is_sink):
        assert member.routing.parent is not None, (
            f"Node {member.node_id} has no parent"
        )
        assert member.routing.cost < float("inf"), (
            f"Node {member.node_id} has infinite cost"
        )
def test_convergence_time(seed):
    """Simulate a 12-node deployment for 120.0 time units and check routes.

    Any non-sink node that still has no parent or an infinite cost when the
    run ends causes the test to be skipped.

    NOTE(review): non-convergence leads to a skip rather than a failure and
    there is no assertion, so this test never reports a failure for nodes
    that miss the time limit - confirm that this is intended.
    """
    random.seed(seed)

    sim_env = simpy.Environment()
    radio = Channel(sim_env)
    deployed = deploy_nodes(sim_env, radio, num_nodes=12, area_size=800)
    setup_receive_callback(deployed, radio)
    for member in deployed:
        member.start()

    deadline = 120.0
    sim_env.run(until=deadline)

    # Collect ids of non-sink nodes that lack a parent or a finite cost.
    stragglers = [
        member.node_id
        for member in deployed
        if not member.is_sink
        and (member.routing.parent is None or member.routing.cost == float("inf"))
    ]
    if stragglers:
        pytest.skip(f"Nodes {stragglers} failed to converge")
if __name__ == "__main__":
    # Allow running this file directly: hand it to pytest in verbose mode.
    pytest_argv = [__file__, "-v"]
    pytest.main(pytest_argv)