"""Head-unit simulator: registers a simulated device and reports mock test results.

Run as a script with one of the commands
``register | test | mode-dev | mode-prod | swap | loop``.
"""

import json
import os
import socket
import sys
import time
import uuid

import requests


class HeadUnit:
    """Simulated head unit with a small JSON-backed persistent state.

    The persisted state (serial number, mode, active slot, enrollment status)
    lives in the JSON file at ``config_path`` and is rewritten after every
    state change.
    """

    # Seconds to wait on each HTTP request before giving up. Without a
    # timeout, ``requests`` blocks forever on an unresponsive server.
    DEFAULT_TIMEOUT = 10.0

    def __init__(self, server_url="http://localhost:3000",
                 config_path="firmware/config/device.json",
                 timeout=DEFAULT_TIMEOUT):
        """Load (or create) the device state and persist it immediately.

        Args:
            server_url: Base URL of the server the simulator reports to.
            config_path: Path of the JSON file holding the device state.
            timeout: Per-request HTTP timeout in seconds.
        """
        self.server_url = server_url
        self.config_path = config_path
        self.timeout = timeout
        self.config = self.load_config()
        self.mac_address = self.get_mac()
        # A fresh 8-character serial is generated when none is stored yet.
        self.serial_number = self.config.get(
            "serial_number", str(uuid.uuid4())[:8].upper()
        )
        self.mode = self.config.get("mode", "DEV")  # DEV or PROD
        self.active_slot = self.config.get("active_slot", "A")
        # ``or`` also covers a stored ``null`` status, which a plain
        # ``.get(key, default)`` would return as None.
        self.status = self.config.get("status") or "PENDING"
        self.save_config()

    def get_mac(self):
        """Return the simulator MAC as six colon-separated lowercase hex bytes.

        The value is derived from ``uuid.getnode()``, most significant byte
        first.
        """
        node = uuid.getnode()
        return ":".join(
            f"{(node >> shift) & 0xff:02x}" for shift in range(40, -1, -8)
        )

    def load_config(self):
        """Read the persisted state from ``config_path``.

        Returns:
            The stored dict, or ``{}`` when the file is missing, is not valid
            JSON, or does not contain a JSON object.
        """
        try:
            with open(self.config_path, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (FileNotFoundError, NotADirectoryError):
            return {}
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            print(f"[-] Ignoring unreadable config {self.config_path}: {e}")
            return {}
        if not isinstance(loaded, dict):
            print(f"[-] Ignoring config {self.config_path}: not a JSON object")
            return {}
        return loaded

    def save_config(self):
        """Rebuild ``self.config`` from the current state and write it to disk."""
        parent = os.path.dirname(self.config_path)
        # A bare filename has an empty dirname, and os.makedirs("") raises.
        if parent:
            os.makedirs(parent, exist_ok=True)
        self.config = {
            "serial_number": self.serial_number,
            "mode": self.mode,
            "active_slot": self.active_slot,
            "status": self.status,
        }
        with open(self.config_path, "w", encoding="utf-8") as f:
            json.dump(self.config, f, indent=4)

    def register(self):
        """Register this device with the server and store the returned status.

        Returns:
            True when the server answered 200 with a JSON object, False on a
            connection error, a non-200 answer, or an unusable response body.
        """
        print(f"[*] Registering device {self.serial_number} ({self.mac_address})...")
        try:
            resp = requests.post(
                f"{self.server_url}/api/devices/register",
                json={
                    "serialNumber": self.serial_number,
                    "macAddress": self.mac_address,
                },
                timeout=self.timeout,
            )
        except requests.RequestException as e:
            print(f"[-] Connection error: {e}")
            return False

        if resp.status_code != 200:
            print(f"[-] Registration failed: {resp.text}")
            return False

        try:
            data = resp.json()
        except ValueError as e:
            print(f"[-] Registration failed: invalid JSON response ({e})")
            return False
        if not isinstance(data, dict):
            print(f"[-] Registration failed: unexpected response: {resp.text}")
            return False

        # Keep the current status when the server omits the field, so a
        # missing key never persists ``null`` into the config file.
        self.status = data.get("status") or self.status
        self.save_config()
        print(f"[+] Registration successful. Status: {self.status}")
        return True

    def send_heartbeat(self):
        """Do nothing; placeholder for a dedicated heartbeat call."""
        # We'll use the report endpoint for heartbeats since it updates lastHeartbeat
        pass

    def report_test(self, test_data):
        """Post a test result to the server if the device is enrolled.

        Args:
            test_data: Mapping merged into the payload after ``serialNumber``;
                a ``serialNumber`` key in it overrides the device's own.
        """
        if self.status != "ENROLLED":
            print("[-] Device not enrolled. Cannot report tests.")
            return

        print(f"[*] Reporting test result from Slot {self.active_slot} ({self.mode} mode)...")
        payload = {
            "serialNumber": self.serial_number,
            **test_data,
        }
        try:
            resp = requests.post(
                f"{self.server_url}/api/tests/report",
                json=payload,
                timeout=self.timeout,
            )
        except requests.RequestException as e:
            print(f"[-] Connection error: {e}")
            return

        if resp.status_code == 200:
            print("[+] Test reported successfully.")
        else:
            print(f"[-] Test report failed: {resp.text}")

    def switch_mode(self, new_mode):
        """Set the operating mode to ``new_mode`` and persist it."""
        self.mode = new_mode
        self.save_config()
        print(f"[*] Switched to {self.mode} mode.")

    def switch_slot(self):
        """Toggle the active slot between "A" and "B" and persist it."""
        self.active_slot = "B" if self.active_slot == "A" else "A"
        self.save_config()
        print(f"[*] Switched to Slot {self.active_slot}.")


def generate_mock_hdmi_test():
    """Return a canned, passing ``HDMI_OFFLINE`` test result."""
    return {
        "type": "HDMI_OFFLINE",
        "status": "PASS",
        "hdmi5v": False,
        "summary": "Offline diode check completed. All lines within tolerance.",
        "diodeResults": {
            "DDC_SCL": 0.542,
            "DDC_SDA": 0.544,
            "CEC": 0.612,
            "HPD": 0.589,
            "TMDS_CLK+": 0.498,
            "TMDS_CLK-": 0.497,
        },
    }


def main(argv=None):
    """Run one simulator command and return the process exit code.

    Args:
        argv: Command-line arguments without the program name; defaults to
            ``sys.argv[1:]``.

    Returns:
        1 when no command was given, otherwise 0.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    hu = HeadUnit()
    if not args:
        print("Usage: python simulator.py [register|test|mode-dev|mode-prod|swap|loop]")
        return 1

    cmd = args[0]
    one_shot = {
        "register": hu.register,
        "test": lambda: hu.report_test(generate_mock_hdmi_test()),
        "mode-dev": lambda: hu.switch_mode("DEV"),
        "mode-prod": lambda: hu.switch_mode("PROD"),
        "swap": hu.switch_slot,
    }

    if cmd in one_shot:
        one_shot[cmd]()
    elif cmd == "loop":
        print("[*] Starting simulator loop. Ctrl+C to exit.")
        try:
            while True:
                hu.register()
                if hu.status == "ENROLLED":
                    hu.report_test(generate_mock_hdmi_test())
                time.sleep(10)
        except KeyboardInterrupt:
            # Ctrl+C is the advertised way out; exit without a traceback.
            print("\n[*] Simulator loop stopped.")
    else:
        print("Unknown command.")
    return 0


if __name__ == "__main__":
    sys.exit(main())