186 lines
6.7 KiB
Python
186 lines
6.7 KiB
Python
import json
|
|
import os
|
|
import time
|
|
import requests
|
|
import uuid
|
|
import socket
|
|
|
|
class PortTester:
|
|
def __init__(self, server_url="http://localhost:3000", config_path="firmware/config/device.json"):
|
|
self.server_url = server_url
|
|
self.config_path = config_path
|
|
self.config = self.load_config()
|
|
self.mac_address = self.get_mac()
|
|
self.serial_number = self.config.get("serial_number", str(uuid.uuid4())[:8].upper())
|
|
self.mode = self.config.get("mode", "DEV") # DEV or PROD
|
|
self.active_slot = self.config.get("active_slot", "A")
|
|
self.status = self.config.get("status", "PENDING")
|
|
self.save_config()
|
|
|
|
def get_mac(self):
|
|
# Simulator MAC
|
|
return ":".join(["%02x" % (uuid.getnode() >> ele & 0xff) for ele in range(0, 8*6, 8)][::-1])
|
|
|
|
def load_config(self):
|
|
if os.path.exists(self.config_path):
|
|
with open(self.config_path, 'r') as f:
|
|
return json.load(f)
|
|
return {}
|
|
|
|
def save_config(self):
|
|
os.makedirs(os.path.dirname(self.config_path), exist_ok=True)
|
|
self.config = {
|
|
"serial_number": self.serial_number,
|
|
"mode": self.mode,
|
|
"active_slot": self.active_slot,
|
|
"status": self.status,
|
|
"name": self.config.get("name", "Unnamed Device")
|
|
}
|
|
with open(self.config_path, 'w') as f:
|
|
json.dump(self.config, f, indent=4)
|
|
|
|
def register(self):
|
|
print(f"[*] Synchronising with mesh orchestration... ({self.serial_number})")
|
|
try:
|
|
resp = requests.post(f"{self.server_url}/api/devices/register", json={
|
|
"serialNumber": self.serial_number,
|
|
"macAddress": self.mac_address,
|
|
"activeSlot": self.active_slot,
|
|
"name": self.config.get("name", "Unnamed Device")
|
|
})
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
self.status = data.get("status")
|
|
|
|
# Adopt remote configuration
|
|
remote_slot = data.get("activeSlot")
|
|
if remote_slot and remote_slot != self.active_slot:
|
|
print(f"[!] Remote slot override: Switching {self.active_slot} -> {remote_slot}")
|
|
self.active_slot = remote_slot
|
|
|
|
remote_name = data.get("name")
|
|
if remote_name:
|
|
self.config["name"] = remote_name
|
|
|
|
self.save_config()
|
|
print(f"[+] Sync successful. Slot: {self.active_slot} | Status: {self.status}")
|
|
|
|
if self.status == "OFFLINE":
|
|
print("[!] Remote instruction: System decommissioned. Shutting down...")
|
|
return "SHUTDOWN"
|
|
|
|
return True
|
|
else:
|
|
print(f"[-] Synchronisation failed: {resp.text}")
|
|
except Exception as e:
|
|
print(f"[-] Connection error: {e}")
|
|
return False
|
|
|
|
def send_heartbeat(self):
|
|
# We'll use the report endpoint for heartbeats since it updates lastHeartbeat
|
|
pass
|
|
|
|
def report_test(self, test_data):
|
|
if self.status != "ENROLLED":
|
|
print("[-] Device not enrolled. Cannot report tests.")
|
|
return
|
|
|
|
print(f"[*] Reporting test result from Slot {self.active_slot} ({self.mode} mode)...")
|
|
payload = {
|
|
"serialNumber": self.serial_number,
|
|
**test_data
|
|
}
|
|
try:
|
|
resp = requests.post(f"{self.server_url}/api/tests/report", json=payload)
|
|
if resp.status_code == 200:
|
|
print("[+] Test reported successfully.")
|
|
else:
|
|
print(f"[-] Test report failed: {resp.text}")
|
|
except Exception as e:
|
|
print(f"[-] Connection error: {e}")
|
|
|
|
def switch_mode(self, new_mode):
|
|
self.mode = new_mode
|
|
self.save_config()
|
|
print(f"[*] Switched to {self.mode} mode.")
|
|
|
|
def switch_slot(self):
|
|
self.active_slot = "B" if self.active_slot == "A" else "A"
|
|
self.save_config()
|
|
print(f"[*] Switched to Slot {self.active_slot}.")
|
|
|
|
def generate_console_port_test():
|
|
"""Generates mock spectral data for a console HDMI port."""
|
|
pins = ["DDC_SCL", "DDC_SDA", "CEC", "HPD", "TMDS_CLK+", "TMDS_CLK-"]
|
|
results = {pin: round(0.400 + (random.random() * 0.2), 3) for pin in pins}
|
|
|
|
return {
|
|
"type": "PORT_DIAGNOSTIC",
|
|
"status": "PASS",
|
|
"hdmi5v": True,
|
|
"summary": "Port handshake confirmed. Pin continuity within spec.",
|
|
"diodeResults": results
|
|
}
|
|
|
|
if __name__ == "__main__":
|
|
hu = PortTester()
|
|
|
|
def print_menu():
|
|
print("\n" + "="*45)
|
|
print(f" HDMI PORT DIAGNOSTICS - HARDWARE SIMULATOR")
|
|
print("="*45)
|
|
print(f" Status: [{hu.status}]")
|
|
print(f" Mode: [{hu.mode}]")
|
|
print(f" Active: [Slot {hu.active_slot}]")
|
|
print(f" Node: [{hu.serial_number}]")
|
|
print("-" * 45)
|
|
print(" 1. Register / Sync Port Diagnostics")
|
|
print(" 2. Run Console Port Sweep")
|
|
print(" 3. Switch to DEV Mode")
|
|
print(" 4. Switch to PROD Mode")
|
|
print(" 5. Swap Active Slot (A/B)")
|
|
print(" 6. Start Automated Loop")
|
|
print(" 7. Request Re-enlistment")
|
|
print(" 0. Power Down Node")
|
|
print("-" * 45)
|
|
|
|
while True:
|
|
print_menu()
|
|
choice = input("Select operation [0-7] >> ").strip()
|
|
|
|
if choice == "1":
|
|
hu.register()
|
|
elif choice == "2":
|
|
hu.report_test(generate_console_port_test())
|
|
elif choice == "3":
|
|
hu.switch_mode("DEV")
|
|
elif choice == "4":
|
|
hu.switch_mode("PROD")
|
|
elif choice == "5":
|
|
hu.switch_slot()
|
|
elif choice == "6":
|
|
print("\n[*] Entering automated telemetry loop. Press Ctrl+C to return to menu.")
|
|
try:
|
|
while True:
|
|
result = hu.register()
|
|
if result == "SHUTDOWN":
|
|
print("[!] System shutdown confirmed. Exiting loop.")
|
|
break
|
|
|
|
if hu.status == "ENROLLED":
|
|
hu.report_test(generate_console_port_test())
|
|
time.sleep(5)
|
|
|
|
if hu.status == "OFFLINE":
|
|
break
|
|
except KeyboardInterrupt:
|
|
print("\n[!] Loop interrupted by user.")
|
|
elif choice == "7":
|
|
hu.status = "PENDING"
|
|
hu.register()
|
|
elif choice == "0":
|
|
print("[*] Powering down head unit...")
|
|
break
|
|
else:
|
|
print("[!] Invalid selection.")
|