下面是**可直接交给执行 AI 的 `plan.md`**。 目标是:**严格工程化、可自动实现、可自动测试、可验证正确性**,避免模糊描述。 该计划默认: * Python ≥ 3.10 * 使用 **SimPy** * 面向未来 STM32WL 移植(协议一致) --- # `plan.md` --- # STM32WL SubGHz 多跳汇聚网络 ## SimPy 离散事件仿真工程计划 --- # 1. 项目目标 构建一个 **协议级一致的离散事件仿真系统**,用于验证: * 多节点 → 单 Sink 汇聚 * 多跳转发 * ACK可靠传输 * 路由自动收敛 * LoRa 信道冲突行为 仿真代码必须满足: ``` routing logic 可直接迁移到 STM32 C 状态机 ``` --- # 2. 总体架构 采用 **事件驱动 + 分层设计**。 ``` sim/ │ ├── main.py ├── config.py │ ├── core/ │ ├── packet.py │ ├── event.py │ └── metrics.py │ ├── radio/ │ ├── channel.py │ ├── propagation.py │ └── airtime.py │ ├── mac/ │ └── reliable_mac.py │ ├── routing/ │ └── gradient_routing.py │ ├── node/ │ └── node.py │ └── tests/ ├── test_convergence.py ├── test_reliability.py └── test_collision.py ``` --- # 3. 核心设计原则(必须遵守) ## 3.1 禁止直接调用 模块之间只能通过接口通信: ``` Node → MAC → Radio → Channel ``` 禁止: ``` Node 直接访问 Channel ``` --- ## 3.2 状态机驱动 所有行为必须: ``` event-driven ``` 禁止: * while True busy loop * sleep() 只能: ``` yield env.timeout() ``` --- ## 3.3 STM32一致性约束(重要) 函数结构必须未来可映射: | Python | STM32 | | ------------- | ---------- | | on_receive() | OnRxDone | | send_packet() | Radio.Send | | timeout_event | UTIL_TIMER | --- # 4. 配置系统 `config.py` 必须集中定义: ```python NODE_COUNT = 12 AREA_SIZE = 800 HELLO_PERIOD = 8.0 DATA_PERIOD = 30.0 TX_POWER = 14 RSSI_THRESHOLD = -105 ACK_TIMEOUT_FACTOR = 2.5 MAX_RETRY = 3 SF = 9 BW = 125000 CR = 5 ``` 禁止硬编码参数。 --- # 5. Packet 模型 `core/packet.py` ```python class Packet: type src dst seq hop payload ``` 类型: ``` HELLO = 1 DATA = 2 ACK = 3 ``` --- # 6. 信道模型 ## 6.1 Channel (`radio/channel.py`) 职责: * 广播传播 * airtime占用 * 碰撞检测 接口: ```python transmit(packet, sender) ``` --- ## 6.2 碰撞规则 发生碰撞条件: ``` time overlap AND |RSSI1 - RSSI2| < 6 dB ``` 结果: ``` 全部丢弃 ``` --- ## 6.3 传播模型 `propagation.py` ```python RSSI = TX_POWER - 10*n*log10(distance) + gaussian_noise ``` 参数: ``` n = 2.7 noise σ = 3 ``` --- # 7. Airtime 计算 `radio/airtime.py` 必须实现真实 LoRa airtime 公式。 输出: ``` seconds ``` 用于: ``` ACK timeout channel busy time energy calculation ``` --- # 8. MAC 层 `mac/reliable_mac.py` --- ## 8.1 发送流程 ``` enqueue packet ↓ backoff ↓ transmit ↓ wait ACK ↓ retry or success ``` --- ## 8.2 Backoff ``` random(0 ~ 2s) ``` --- ## 8.3 ACK管理 维护: ``` pending_ack[seq] ``` 超时: ``` ACK_TIMEOUT = airtime × ACK_TIMEOUT_FACTOR ``` --- # 9. 路由层(核心) `routing/gradient_routing.py` --- ## 9.1 状态 ```python self.cost self.parent self.neighbors{} ``` --- ## 9.2 HELLO 处理 ``` new_cost = neighbor_cost + 1 + link_penalty ``` ``` link_penalty = max(0, (RSSI_THRESHOLD - RSSI)/8) ``` --- ## 9.3 更新条件 ``` if new_cost < cost - 1: parent = sender ``` --- ## 9.4 数据转发 ``` if not sink: send to parent ``` --- # 10. Node 对象 `node/node.py` --- ## 必须实现的协程 ```python hello_task() data_task() receive_task() ``` --- ## 生命周期 ``` start ↓ route convergence ↓ data generation ↓ forwarding ``` --- # 11. Metrics 系统 `core/metrics.py` 必须统计: ``` sent_packets received_packets delivery_ratio avg_delay avg_hop retransmissions collisions convergence_time ``` --- # 12. 仿真执行流程 `main.py` --- 步骤: ``` 1. 创建 env 2. 创建 Channel 3. 随机部署节点 4. 指定 sink 5. 启动节点进程 6. env.run(sim_time) 7. 输出 metrics ``` --- # 13. 自动测试设计(关键) 执行 AI 必须通过以下测试。 --- ## Test 1 — 路由收敛 `test_convergence.py` 检查: ``` 所有节点 parent != None cost 单调递增 无环 ``` 最大收敛时间: ``` < 120s ``` --- ## Test 2 — 数据可靠性 `test_reliability.py` 运行: ``` 1000s 仿真 ``` 断言: ``` PDR > 0.9 avg_retry < 2 ``` --- ## Test 3 — 碰撞存在性 `test_collision.py` 增加发送频率。 断言: ``` collision_count > 0 ``` 证明信道模型有效。 --- # 14. 可视化(可选但推荐) 输出: ``` topology.json ``` 包含: ``` node position parent link cost ``` 用于后续画图。 --- # 15. 日志规范 必须结构化: ``` [timestamp][node][event] ``` 例: ``` 12.4 NODE3 ROUTE_UPDATE parent=1 cost=4 ``` --- # 16. 验收标准(执行AI必须满足) 全部通过: * [ ] pytest 全绿 * [ ] 无未处理异常 * [ ] 1000s 仿真 < 10s 真实时间 * [ ] PDR ≥ 90% * [ ] 网络稳定无震荡 --- # 17. 第一阶段实施顺序(严格按序) 执行 AI 必须: ``` STEP 1 packet + config STEP 2 airtime STEP 3 channel STEP 4 node skeleton STEP 5 routing HELLO only STEP 6 convergence test STEP 7 MAC + ACK STEP 8 data forwarding STEP 9 metrics STEP 10 tests ``` 禁止跳步骤。 --- # 18. 完成标志 当以下成立: ``` 12节点随机拓扑 ↓ 自动形成树 ↓ 稳定汇聚数据 ↓ PDR > 90% ``` 则 Python 仿真阶段完成。