Files
lora_route_py/docs/py_plan.md
sinlatansen 6c0526b2ef init
2026-02-24 15:47:53 +08:00

6.2 KiB
Raw Permalink Blame History

下面是可直接交给执行 AI 的 plan.md。 目标是:严格工程化、可自动实现、可自动测试、可验证正确性,避免模糊描述。

该计划默认:

  • Python ≥ 3.10
  • 使用 SimPy
  • 面向未来 STM32WL 移植(协议一致)

plan.md


STM32WL SubGHz 多跳汇聚网络

SimPy 离散事件仿真工程计划


1. 项目目标

构建一个 协议级一致的离散事件仿真系统,用于验证:

  • 多节点 → 单 Sink 汇聚
  • 多跳转发
  • ACK可靠传输
  • 路由自动收敛
  • LoRa 信道冲突行为

仿真代码必须满足:

routing logic 可直接迁移到 STM32 C 状态机

2. 总体架构

采用 事件驱动 + 分层设计

sim/
│
├── main.py
├── config.py
│
├── core/
│   ├── packet.py
│   ├── event.py
│   └── metrics.py
│
├── radio/
│   ├── channel.py
│   ├── propagation.py
│   └── airtime.py
│
├── mac/
│   └── reliable_mac.py
│
├── routing/
│   └── gradient_routing.py
│
├── node/
│   └── node.py
│
└── tests/
    ├── test_convergence.py
    ├── test_reliability.py
    └── test_collision.py

3. 核心设计原则(必须遵守)

3.1 禁止直接调用

模块之间只能通过接口通信:

Node → MAC → Radio → Channel

禁止:

Node 直接访问 Channel

3.2 状态机驱动

所有行为必须:

event-driven

禁止:

  • while True busy loop
  • sleep()

只能:

yield env.timeout()

3.3 STM32一致性约束重要

函数结构必须未来可映射:

Python STM32
on_receive() OnRxDone
send_packet() Radio.Send
timeout_event UTIL_TIMER

4. 配置系统

config.py

必须集中定义:

NODE_COUNT = 12
AREA_SIZE = 800

HELLO_PERIOD = 8.0
DATA_PERIOD = 30.0

TX_POWER = 14
RSSI_THRESHOLD = -105

ACK_TIMEOUT_FACTOR = 2.5
MAX_RETRY = 3

SF = 9
BW = 125000
CR = 5

禁止硬编码参数。


5. Packet 模型

core/packet.py

class Packet:
    type
    src
    dst
    seq
    hop
    payload

类型:

HELLO = 1
DATA  = 2
ACK   = 3

6. 信道模型

6.1 Channel (radio/channel.py)

职责:

  • 广播传播
  • airtime占用
  • 碰撞检测

接口:

transmit(packet, sender)

6.2 碰撞规则

发生碰撞条件:

time overlap
AND
|RSSI1 - RSSI2| < 6 dB

结果:

全部丢弃

6.3 传播模型

propagation.py

RSSI =
    TX_POWER
    - 10*n*log10(distance)
    + gaussian_noise

参数:

n = 2.7
noise σ = 3

7. Airtime 计算

radio/airtime.py

必须实现真实 LoRa airtime 公式。

输出:

seconds

用于:

ACK timeout
channel busy time
energy calculation

8. MAC 层

mac/reliable_mac.py


8.1 发送流程

enqueue packet
↓
backoff
↓
transmit
↓
wait ACK
↓
retry or success

8.2 Backoff

random(0 ~ 2s)

8.3 ACK管理

维护:

pending_ack[seq]

超时:

ACK_TIMEOUT = airtime × ACK_TIMEOUT_FACTOR

9. 路由层(核心)

routing/gradient_routing.py


9.1 状态

self.cost
self.parent
self.neighbors{}

9.2 HELLO 处理

new_cost =
    neighbor_cost + 1 + link_penalty
link_penalty =
max(0, (RSSI_THRESHOLD - RSSI)/8)

9.3 更新条件

if new_cost < cost - 1:
    parent = sender

9.4 数据转发

if not sink:
    send to parent

10. Node 对象

node/node.py


必须实现的协程

hello_task()
data_task()
receive_task()

生命周期

start
 ↓
route convergence
 ↓
data generation
 ↓
forwarding

11. Metrics 系统

core/metrics.py

必须统计:

sent_packets
received_packets
delivery_ratio
avg_delay
avg_hop
retransmissions
collisions
convergence_time

12. 仿真执行流程

main.py


步骤:

1. 创建 env
2. 创建 Channel
3. 随机部署节点
4. 指定 sink
5. 启动节点进程
6. env.run(sim_time)
7. 输出 metrics

13. 自动测试设计(关键)

执行 AI 必须通过以下测试。


Test 1 — 路由收敛

test_convergence.py

检查:

所有节点 parent != None
cost 单调递增
无环

最大收敛时间:

< 120s

Test 2 — 数据可靠性

test_reliability.py

运行:

1000s 仿真

断言:

PDR > 0.9
avg_retry < 2

Test 3 — 碰撞存在性

test_collision.py

增加发送频率。

断言:

collision_count > 0

证明信道模型有效。


14. 可视化(可选但推荐)

输出:

topology.json

包含:

node position
parent link
cost

用于后续画图。


15. 日志规范

必须结构化:

[timestamp][node][event]

例:

12.4 NODE3 ROUTE_UPDATE parent=1 cost=4

16. 验收标准执行AI必须满足

全部通过:

  • pytest 全绿
  • 无未处理异常
  • 1000s 仿真 < 10s 真实时间
  • PDR ≥ 90%
  • 网络稳定无震荡

17. 第一阶段实施顺序(严格按序)

执行 AI 必须:

STEP 1 packet + config
STEP 2 airtime
STEP 3 channel
STEP 4 node skeleton
STEP 5 routing HELLO only
STEP 6 convergence test
STEP 7 MAC + ACK
STEP 8 data forwarding
STEP 9 metrics
STEP 10 tests

禁止跳步骤。


18. 完成标志

当以下成立:

12节点随机拓扑
↓
自动形成树
↓
稳定汇聚数据
↓
PDR > 90%

则 Python 仿真阶段完成。