94 lines
2.5 KiB
Python
94 lines
2.5 KiB
Python
"""
|
|
Topology Analysis Tools.
|
|
|
|
Functions for analyzing and exporting network topology.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
from typing import List, Dict, Any
|
|
|
|
|
|
def export_topology_json(
|
|
nodes: List[Any], filepath: str = "analysis/topology_export.json"
|
|
):
|
|
"""
|
|
Export topology to JSON file.
|
|
|
|
Args:
|
|
nodes: List of Node objects
|
|
filepath: Output file path
|
|
"""
|
|
topology = {"nodes": []}
|
|
|
|
for node in nodes:
|
|
node_info = {
|
|
"id": node.node_id,
|
|
"x": round(node.x, 2),
|
|
"y": round(node.y, 2),
|
|
"cost": int(node.routing.cost) if node.routing.cost != float("inf") else -1,
|
|
"parent": node.routing.parent,
|
|
"is_sink": node.is_sink,
|
|
}
|
|
topology["nodes"].append(node_info)
|
|
|
|
# Ensure directory exists
|
|
os.makedirs(os.path.dirname(filepath), exist_ok=True)
|
|
|
|
with open(filepath, "w") as f:
|
|
json.dump(topology, f, indent=2)
|
|
|
|
return topology
|
|
|
|
|
|
def analyze_parent_tree(nodes: List[Any]) -> Dict[str, Any]:
|
|
"""
|
|
Analyze the parent tree structure.
|
|
|
|
Returns:
|
|
Dictionary with tree analysis
|
|
"""
|
|
# Build parent map
|
|
parent_map = {}
|
|
for node in nodes:
|
|
if node.routing.parent is not None:
|
|
parent_map[node.node_id] = node.routing.parent
|
|
|
|
# Count children per node
|
|
children_count = {}
|
|
for node_id, parent_id in parent_map.items():
|
|
if parent_id not in children_count:
|
|
children_count[parent_id] = 0
|
|
children_count[parent_id] += 1
|
|
|
|
# Find root (sink)
|
|
sink = next((n for n in nodes if n.is_sink), None)
|
|
|
|
return {
|
|
"parent_map": parent_map,
|
|
"children_count": children_count,
|
|
"sink_id": sink.node_id if sink else None,
|
|
"total_links": len(parent_map),
|
|
}
|
|
|
|
|
|
def find_unreachable_nodes(nodes: List[Any]) -> List[int]:
|
|
"""Find nodes without a valid route to sink."""
|
|
unreachable = []
|
|
for node in nodes:
|
|
if not node.is_sink:
|
|
if node.routing.parent is None or node.routing.cost == float("inf"):
|
|
unreachable.append(node.node_id)
|
|
return unreachable
|
|
|
|
|
|
def calculate_hop_distribution(nodes: List[Any]) -> Dict[int, int]:
|
|
"""Calculate hop count distribution."""
|
|
hop_dist = {}
|
|
for node in nodes:
|
|
if not node.is_sink:
|
|
cost = int(node.routing.cost) if node.routing.cost != float("inf") else -1
|
|
if cost >= 0:
|
|
hop_dist[cost] = hop_dist.get(cost, 0) + 1
|
|
return hop_dist
|