# Listing metadata: 396 lines, 12 KiB, Python
"""
|
|
Node implementation for LoRa multi-hop network simulation.
|
|
|
|
Each node runs three main coroutines:
|
|
- hello_task(): Periodic HELLO broadcast for neighbor discovery
|
|
- data_task(): Data generation and forwarding
|
|
- receive_task(): Packet reception handling
|
|
"""
|
|
|
|
import simpy
|
|
import random
|
|
from typing import Optional
|
|
from dataclasses import dataclass
|
|
|
|
from sim.core.packet import Packet, PacketType
|
|
from sim.routing import (
|
|
GradientRouting,
|
|
FloodingRouting,
|
|
RandomForwardRouting,
|
|
BROADCAST,
|
|
)
|
|
from sim.mac.reliable_mac import ReliableMAC
|
|
from sim.radio.channel import Channel, ReceivedPacket
|
|
from sim.core.metrics import MetricsCollector
|
|
from sim import config
|
|
|
|
|
|
def create_routing(node_id: int, is_sink: bool, routing_type: str = "gradient"):
    """
    Build the routing protocol object for one node.

    Args:
        node_id: Node ID handed to the protocol constructor
        is_sink: Whether this node is the sink
        routing_type: Protocol name, matched case-insensitively:
            "flooding" or "random"; every other value (including
            "gradient") selects gradient routing

    Returns:
        A FloodingRouting, RandomForwardRouting or GradientRouting instance
    """
    # Name -> class table; anything not listed falls back to gradient routing.
    protocols = {
        "flooding": FloodingRouting,
        "random": RandomForwardRouting,
    }
    routing_cls = protocols.get(routing_type.lower(), GradientRouting)
    return routing_cls(node_id, is_sink)
|
|
|
|
|
|
@dataclass
class NodeStats:
    """Per-node event counters; every counter starts at zero."""

    # HELLO traffic
    hello_sent: int = 0
    hello_received: int = 0

    # DATA traffic
    data_sent: int = 0
    data_received: int = 0
    data_forwarded: int = 0

    # ACKs accepted by the MAC layer
    ack_received: int = 0

    # Receptions discarded by the node
    packets_dropped: int = 0

    # Number of times a HELLO changed the routing state
    route_updates: int = 0
|
|
|
|
|
|
class Node:
    """
    LoRa node with routing and MAC.

    A node owns one routing protocol object (built by create_routing()) and
    one ReliableMAC send queue, and reaches other nodes only through the
    shared Channel. Call start() to launch its SimPy processes; incoming
    packets are delivered through on_receive().

    STM32 consistency:
    - on_receive() ↔ OnRxDone
    - send_packet() ↔ Radio.Send
    - timeout_event ↔ UTIL_TIMER
    """

    def __init__(
        self,
        env: simpy.Environment,
        node_id: int,
        x: float,
        y: float,
        channel: Channel,
        is_sink: bool = False,
        metrics_collector: Optional[MetricsCollector] = None,
        routing_type: str = "gradient",
    ):
        """
        Initialize node.

        Args:
            env: SimPy environment
            node_id: Node ID
            x: X coordinate
            y: Y coordinate
            channel: Wireless channel
            is_sink: Whether this is the sink node
            metrics_collector: Metrics collector for observability (optional)
            routing_type: Type of routing ("gradient", "flooding", "random")
        """
        self.env = env
        self.node_id = node_id
        self.x = x
        self.y = y
        self.channel = channel
        self.is_sink = is_sink

        # Metrics collector for hop tracking (may be None)
        self.metrics_collector = metrics_collector

        # Routing type
        self.routing_type = routing_type

        # Register position with channel
        self.channel.register_node(node_id, x, y)

        # Layers - use factory to create routing
        self.routing = create_routing(node_id, is_sink, routing_type)
        self.mac = ReliableMAC(env, node_id)

        # Sequence numbers
        self.hello_seq = 0
        self.data_seq = 0

        # Statistics
        self.stats = NodeStats()

        # Event to signal when converged; _converged mirrors its state so the
        # event is only ever succeeded once
        self.converged = env.event()
        self._converged = False

        # Process handles (set when started)
        self._hello_process: Optional[simpy.Process] = None
        self._data_process: Optional[simpy.Process] = None
        self._receive_process: Optional[simpy.Process] = None
        self._mac_process: Optional[simpy.Process] = None

    def start(self):
        """Start all node tasks as SimPy processes and keep their handles."""
        self._hello_process = self.env.process(self.hello_task())
        self._data_process = self.env.process(self.data_task())
        self._receive_process = self.env.process(self.receive_task())
        self._mac_process = self.env.process(self.mac_task())

    def hello_task(self):
        """
        Periodic HELLO broadcast task.

        Broadcasts routing information to neighbors.
        """
        while True:
            # Wait for HELLO period with small random jitter to reduce collisions
            jitter = random.uniform(0, config.HELLO_PERIOD * 0.3)
            yield self.env.timeout(config.HELLO_PERIOD + jitter)

            # Create and send HELLO packet
            packet = self.routing.create_hello_packet()
            self.stats.hello_sent += 1

            # Transmit on channel (broadcast)
            self.channel.transmit(packet, self.node_id)

    def data_task(self):
        """
        Data generation task.

        - Waits three HELLO periods so routes can form
        - Non-sink nodes with a valid route then generate data periodically
        - The sink never generates data; it only receives
        """
        # Wait for initial convergence
        yield self.env.timeout(config.HELLO_PERIOD * 3)

        # _check_convergence() does nothing unless the route is valid and the
        # event has not fired yet, so it is safe to call unconditionally.
        # (The earlier guard called it only when the route was NOT valid,
        # which made the call dead code.)
        self._check_convergence()

        while True:
            # Random jitter on the data period to avoid collisions
            jitter = random.uniform(0, config.DATA_PERIOD * 0.5)
            yield self.env.timeout(config.DATA_PERIOD + jitter)

            # Sink doesn't generate new data; others need a route to the sink
            if not self.is_sink and self.routing.is_route_valid():
                self._generate_data()

    def receive_task(self):
        """
        Receive task placeholder.

        Does no work itself: it only waits forever. Actual receives come
        through the on_receive() callback.
        """
        while True:
            yield self.env.timeout(float("inf"))

    def on_receive(self, received: ReceivedPacket):
        """
        Handle received packet (called by channel).

        This corresponds to STM32's OnRxDone callback.

        Args:
            received: Received packet info (packet, collision flag, RSSI)
        """
        packet = received.packet

        # Drop if collision
        if received.collision:
            self.stats.packets_dropped += 1
            return

        # Update packet RSSI
        # NOTE(review): this writes onto the packet object itself; if the
        # channel hands the same object to several receivers they would
        # overwrite each other's value - confirm the channel copies packets.
        packet.rssi = received.rssi

        # Process based on type
        if packet.is_hello:
            self._process_hello(packet)
        elif packet.is_data:
            self._process_data(packet)
        elif packet.is_ack:
            self._process_ack(packet)

    def _process_hello(self, packet: Packet):
        """Process received HELLO packet: update routing and convergence."""
        self.stats.hello_received += 1

        # Update routing
        if self.routing.process_hello(packet, packet.rssi):
            self.stats.route_updates += 1

        # Fires the converged event the first time the route is valid
        self._check_convergence()

    def _process_data(self, packet: Packet):
        """
        Process received DATA packet.

        The sink counts the packet, reports it to the metrics collector and
        answers with an ACK. Any other node forwards it: flooding protocols
        fan out to all neighbors, unicast protocols send to the next hop.
        """
        # If we're the sink, receive the packet
        if self.is_sink:
            self.stats.data_received += 1

            # Record unique packet received (for PDR)
            if self.metrics_collector is not None:
                self.metrics_collector.record_packet_received(packet.src, packet.seq)
                self.metrics_collector.record_hop_count(packet.hop)

            # Send ACK back to source
            self._send_ack(packet.src, packet.seq)
            return

        # For flooding: check if we've seen this packet before
        should_forward = getattr(self.routing, "should_forward", None)
        if should_forward is not None and not should_forward(packet):
            return  # Already forwarded

        # Protocols without a next-hop lookup cannot forward anything
        get_next_hop = getattr(self.routing, "get_next_hop", None)
        if get_next_hop is None:
            return
        next_hop = get_next_hop(packet)

        if next_hop == BROADCAST:
            # Handle flooding: forward to all neighbors
            self._flood_to_neighbors(packet)
        elif next_hop is not None and next_hop != self.node_id:
            # Handle regular unicast routing; pass the hop we just validated
            # so _forward_data() does not pick a different one
            self._forward_data(packet, next_hop)

    def _flood_to_neighbors(self, packet: Packet):
        """Queue a flooded packet for every neighbor, recording the hop once."""
        # NOTE(review): packet.src is set to the generating node in
        # _generate_data(); if forwarding never rewrites it, this filter skips
        # the originator rather than the previous hop - confirm intent.
        targets = [
            neighbor
            for neighbor in self.routing.get_all_neighbors()
            if neighbor != packet.src
        ]
        if not targets:
            return

        # One hop per forwarding node, no matter how many neighbors get a copy
        packet.add_hop(self.node_id)
        for neighbor in targets:
            self._forward_data_to_neighbor(packet, neighbor, record_hop=False)

    def _forward_data_to_neighbor(
        self, packet: Packet, neighbor: int, *, record_hop: bool = True
    ):
        """
        Forward a data packet to a specific neighbor (for flooding).

        Args:
            packet: Packet to forward
            neighbor: Neighbor ID to queue the packet for
            record_hop: When True (default), record this node in the packet
                path first; pass False when the caller already did so for
                this packet
        """
        if record_hop:
            # Record this node in the path and increment hop count
            packet.add_hop(self.node_id)

        # Send to specific neighbor
        self.mac.enqueue(packet, neighbor)
        self.stats.data_forwarded += 1

    def _send_ack(self, dst: int, seq: int):
        """Send ACK packet for sequence number seq to node dst."""
        ack = Packet(
            type=PacketType.ACK,
            src=self.node_id,
            dst=dst,
            seq=seq,
            hop=0,
            payload=None,
        )
        # Transmitted straight on the channel (unreliable, no MAC queue)
        self.channel.transmit(ack, self.node_id)

    def _process_ack(self, packet: Packet):
        """Process received ACK packet, ignoring ACKs meant for other nodes."""
        # ACKs are put on the shared channel, so overheard ACKs addressed to
        # another node must not match our own sequence numbers.
        if packet.dst not in (self.node_id, BROADCAST):
            return

        if self.mac.ack_received(packet.seq):
            self.stats.ack_received += 1

    def _generate_data(self):
        """Generate a new data packet and queue it towards the sink."""
        packet = Packet(
            type=PacketType.DATA,
            src=self.node_id,
            dst=config.SINK_NODE_ID,
            seq=self.data_seq,
            hop=1,  # Start at 1 hop (first link)
            payload=f"data_{self.data_seq}",
        )
        self.data_seq += 1
        self.stats.data_sent += 1

        # Get next hop and send
        next_hop = self.routing.get_next_hop(packet)

        if next_hop == BROADCAST:
            # Handle flooding (one queue entry per neighbor)
            # NOTE(review): the same packet object is queued for every
            # neighbor - confirm the MAC/channel do not rely on distinct
            # objects per entry.
            for neighbor in self.routing.get_all_neighbors():
                self.mac.enqueue(packet, neighbor)
        elif next_hop is not None:
            self.mac.enqueue(packet, next_hop)

    def _forward_data(self, packet: Packet, next_hop: Optional[int] = None):
        """
        Forward a data packet towards sink.

        Args:
            packet: Packet to forward
            next_hop: Next hop already chosen by the caller; when None the
                routing protocol is asked for one
        """
        if next_hop is None:
            next_hop = self.routing.get_next_hop(packet)

        # No route, or the route points back at ourselves: nothing to send
        if next_hop is None or next_hop == self.node_id:
            return

        # Handle flooding
        if next_hop == BROADCAST:
            self._flood_to_neighbors(packet)
            return

        # Record this node in the path and increment hop count
        packet.add_hop(self.node_id)
        self.mac.enqueue(packet, next_hop)
        self.stats.data_forwarded += 1

    def _check_forward(self):
        """Check if there's data to forward (currently a no-op stub)."""
        pass

    def _check_convergence(self):
        """Fire the converged event the first time the route is valid."""
        if self._converged or not self.routing.is_route_valid():
            return

        self._converged = True
        # Guard against the event having been triggered from outside;
        # succeeding an already-triggered SimPy event raises.
        if not self.converged.triggered:
            self.converged.succeed()

    def mac_task(self):
        """
        MAC layer task - drains the send queue.

        Simplified: No ACK waiting for DATA packets to improve throughput.
        """
        while True:
            if self.mac.has_pending():
                item = self.mac.dequeue()
                if item:
                    # NOTE(review): the queued destination is not passed to
                    # channel.transmit() - confirm the channel does not need it.
                    packet, _dst = item

                    # Wait for backoff
                    backoff = self.mac.calculate_backoff()
                    yield self.env.timeout(backoff)

                    # Send packet
                    self.channel.transmit(packet, self.node_id)
                    self.mac.record_send()

            # Small wait to prevent busy loop
            yield self.env.timeout(0.1)

    def send_packet(self, packet: Packet, dst: int):
        """
        Send a packet (called by upper layers).

        Args:
            packet: Packet to transmit
            dst: Destination node ID (accepted but not used by this method)
        """
        self.channel.transmit(packet, self.node_id)

    def get_stats(self) -> dict:
        """
        Get node statistics.

        Returns:
            Dict with node identity and position, a snapshot copy of the
            NodeStats counters, the routing table and the MAC statistics
        """
        return {
            "node_id": self.node_id,
            "is_sink": self.is_sink,
            "x": self.x,
            "y": self.y,
            # Copy so callers cannot mutate the live counters
            "stats": dict(vars(self.stats)),
            "routing": self.routing.get_routing_table(),
            "mac": self.mac.get_stats(),
        }

    def wait_converged(self):
        """Return the SimPy event that fires once routing has converged."""
        return self.converged
|