import json import os import time import requests import uuid import socket class PortTester: def __init__(self, server_url="http://localhost:3000", config_path="firmware/config/device.json"): self.server_url = server_url self.config_path = config_path self.config = self.load_config() self.mac_address = self.get_mac() self.serial_number = self.config.get("serial_number", str(uuid.uuid4())[:8].upper()) self.mode = self.config.get("mode", "DEV") # DEV or PROD self.active_slot = self.config.get("active_slot", "A") self.status = self.config.get("status", "PENDING") self.save_config() def get_mac(self): # Simulator MAC return ":".join(["%02x" % (uuid.getnode() >> ele & 0xff) for ele in range(0, 8*6, 8)][::-1]) def load_config(self): if os.path.exists(self.config_path): with open(self.config_path, 'r') as f: return json.load(f) return {} def save_config(self): os.makedirs(os.path.dirname(self.config_path), exist_ok=True) self.config = { "serial_number": self.serial_number, "mode": self.mode, "active_slot": self.active_slot, "status": self.status, "name": self.config.get("name", "Unnamed Device") } with open(self.config_path, 'w') as f: json.dump(self.config, f, indent=4) def register(self): print(f"[*] Synchronising with mesh orchestration... ({self.serial_number})") try: resp = requests.post(f"{self.server_url}/api/devices/register", json={ "serialNumber": self.serial_number, "macAddress": self.mac_address, "activeSlot": self.active_slot, "name": self.config.get("name", "Unnamed Device") }) if resp.status_code == 200: data = resp.json() self.status = data.get("status") # Adopt remote configuration remote_slot = data.get("activeSlot") if remote_slot and remote_slot != self.active_slot: print(f"[!] Remote slot override: Switching {self.active_slot} -> {remote_slot}") self.active_slot = remote_slot remote_name = data.get("name") if remote_name: self.config["name"] = remote_name self.save_config() print(f"[+] Sync successful. Slot: {self.active_slot} | Status: {self.status}") if self.status == "OFFLINE": print("[!] Remote instruction: System decommissioned. Shutting down...") return "SHUTDOWN" return True else: print(f"[-] Synchronisation failed: {resp.text}") except Exception as e: print(f"[-] Connection error: {e}") return False def send_heartbeat(self): # We'll use the report endpoint for heartbeats since it updates lastHeartbeat pass def report_test(self, test_data): if self.status != "ENROLLED": print("[-] Device not enrolled. Cannot report tests.") return print(f"[*] Reporting test result from Slot {self.active_slot} ({self.mode} mode)...") payload = { "serialNumber": self.serial_number, **test_data } try: resp = requests.post(f"{self.server_url}/api/tests/report", json=payload) if resp.status_code == 200: print("[+] Test reported successfully.") else: print(f"[-] Test report failed: {resp.text}") except Exception as e: print(f"[-] Connection error: {e}") def switch_mode(self, new_mode): self.mode = new_mode self.save_config() print(f"[*] Switched to {self.mode} mode.") def switch_slot(self): self.active_slot = "B" if self.active_slot == "A" else "A" self.save_config() print(f"[*] Switched to Slot {self.active_slot}.") def generate_console_port_test(): """Generates mock spectral data for a console HDMI port.""" pins = ["DDC_SCL", "DDC_SDA", "CEC", "HPD", "TMDS_CLK+", "TMDS_CLK-"] results = {pin: round(0.400 + (random.random() * 0.2), 3) for pin in pins} return { "type": "PORT_DIAGNOSTIC", "status": "PASS", "hdmi5v": True, "summary": "Port handshake confirmed. Pin continuity within spec.", "diodeResults": results } if __name__ == "__main__": hu = PortTester() def print_menu(): print("\n" + "="*45) print(f" HDMI PORT DIAGNOSTICS - HARDWARE SIMULATOR") print("="*45) print(f" Status: [{hu.status}]") print(f" Mode: [{hu.mode}]") print(f" Active: [Slot {hu.active_slot}]") print(f" Node: [{hu.serial_number}]") print("-" * 45) print(" 1. Register / Sync Port Diagnostics") print(" 2. Run Console Port Sweep") print(" 3. Switch to DEV Mode") print(" 4. Switch to PROD Mode") print(" 5. Swap Active Slot (A/B)") print(" 6. Start Automated Loop") print(" 7. Request Re-enlistment") print(" 0. Power Down Node") print("-" * 45) while True: print_menu() choice = input("Select operation [0-7] >> ").strip() if choice == "1": hu.register() elif choice == "2": hu.report_test(generate_console_port_test()) elif choice == "3": hu.switch_mode("DEV") elif choice == "4": hu.switch_mode("PROD") elif choice == "5": hu.switch_slot() elif choice == "6": print("\n[*] Entering automated telemetry loop. Press Ctrl+C to return to menu.") try: while True: result = hu.register() if result == "SHUTDOWN": print("[!] System shutdown confirmed. Exiting loop.") break if hu.status == "ENROLLED": hu.report_test(generate_console_port_test()) time.sleep(5) if hu.status == "OFFLINE": break except KeyboardInterrupt: print("\n[!] Loop interrupted by user.") elif choice == "7": hu.status = "PENDING" hu.register() elif choice == "0": print("[*] Powering down head unit...") break else: print("[!] Invalid selection.")