Files
HDMI-Tester/firmware/simulator.py

189 lines
6.6 KiB
Python

import json
import os
import time
import requests
import uuid
import socket
class HeadUnit:
    """Simulated HDMI-tester head unit that talks to the orchestration server.

    The unit keeps a small JSON config file on disk (serial number, mode,
    active slot, status, name) and offers operations to register with the
    server, report test results and switch mode or slot.
    """

    # Seconds to wait on any HTTP request. Without a timeout ``requests``
    # waits indefinitely, so a stalled server would freeze the simulator.
    REQUEST_TIMEOUT = 10

    def __init__(self, server_url="http://localhost:3000", config_path="firmware/config/device.json"):
        """Load (or create) the device config and derive the unit's identity.

        Args:
            server_url: Base URL of the orchestration server.
            config_path: Path of the JSON config file; created if missing.
        """
        self.server_url = server_url
        self.config_path = config_path
        self.config = self.load_config()
        self.mac_address = self.get_mac()
        # A random 8-character serial is generated only when none is stored.
        self.serial_number = self.config.get("serial_number", str(uuid.uuid4())[:8].upper())
        self.mode = self.config.get("mode", "DEV")  # DEV or PROD
        self.active_slot = self.config.get("active_slot", "A")
        self.status = self.config.get("status", "PENDING")
        # Persist immediately so a freshly generated serial survives restarts.
        self.save_config()

    def get_mac(self):
        """Return ``uuid.getnode()`` as six colon-separated lowercase hex octets.

        The most significant octet comes first.
        """
        # Simulator MAC
        node = uuid.getnode()
        return ":".join(f"{(node >> shift) & 0xff:02x}" for shift in range(40, -8, -8))

    def load_config(self):
        """Read the config file and return its contents as a dict.

        Returns an empty dict when the file does not exist, cannot be parsed
        as JSON, or does not hold a JSON object. In the latter two cases a
        warning is printed; the caller then falls back to defaults.
        """
        try:
            with open(self.config_path, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except FileNotFoundError:
            return {}
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            print(f"[!] Ignoring unreadable config {self.config_path}: {e}")
            return {}
        if not isinstance(loaded, dict):
            print(f"[!] Ignoring config {self.config_path}: expected a JSON object.")
            return {}
        return loaded

    def save_config(self):
        """Rebuild ``self.config`` from the current state and write it to disk.

        Raises:
            OSError: If the directory or file cannot be written.
        """
        # dirname is "" for a bare filename, and os.makedirs("") raises.
        directory = os.path.dirname(self.config_path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        self.config = {
            "serial_number": self.serial_number,
            "mode": self.mode,
            "active_slot": self.active_slot,
            "status": self.status,
            "name": self.config.get("name", "Unnamed Device"),
        }
        with open(self.config_path, "w", encoding="utf-8") as f:
            json.dump(self.config, f, indent=4)

    def register(self):
        """Register with the server and adopt the configuration it returns.

        Returns:
            ``"SHUTDOWN"`` if the sync succeeded and the resulting status is
            ``"OFFLINE"``; ``True`` on any other successful sync; ``False``
            if the request, the response or the local save failed.
        """
        print(f"[*] Synchronising with mesh orchestration... ({self.serial_number})")
        try:
            resp = requests.post(
                f"{self.server_url}/api/devices/register",
                json={
                    "serialNumber": self.serial_number,
                    "macAddress": self.mac_address,
                    "activeSlot": self.active_slot,
                    "name": self.config.get("name", "Unnamed Device"),
                },
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException as e:
            print(f"[-] Connection error: {e}")
            return False

        if resp.status_code != 200:
            print(f"[-] Synchronisation failed: {resp.text}")
            return False

        try:
            data = resp.json()
        except ValueError as e:
            print(f"[-] Synchronisation failed: invalid JSON response ({e})")
            return False
        if not isinstance(data, dict):
            print("[-] Synchronisation failed: unexpected response payload.")
            return False

        # Keep the previous status when the server omits it, so a partial
        # response does not persist a null status.
        self.status = data.get("status") or self.status

        # Adopt remote configuration
        remote_slot = data.get("activeSlot")
        if remote_slot and remote_slot != self.active_slot:
            print(f"[!] Remote slot override: Switching {self.active_slot} -> {remote_slot}")
            self.active_slot = remote_slot
        remote_name = data.get("name")
        if remote_name:
            self.config["name"] = remote_name

        try:
            self.save_config()
        except OSError as e:
            print(f"[-] Could not persist configuration: {e}")
            return False

        print(f"[+] Sync successful. Slot: {self.active_slot} | Status: {self.status}")
        if self.status == "OFFLINE":
            print("[!] Remote instruction: System decommissioned. Shutting down...")
            return "SHUTDOWN"
        return True

    def send_heartbeat(self):
        """Placeholder; currently does nothing."""
        # We'll use the report endpoint for heartbeats since it updates lastHeartbeat
        pass

    def report_test(self, test_data):
        """Post a test result to the server; a no-op unless status is ENROLLED.

        Args:
            test_data: Mapping merged into the payload after ``serialNumber``,
                so a ``serialNumber`` key in ``test_data`` takes precedence.

        Returns:
            None. Success or failure is reported on stdout only.
        """
        if self.status != "ENROLLED":
            print("[-] Device not enrolled. Cannot report tests.")
            return
        print(f"[*] Reporting test result from Slot {self.active_slot} ({self.mode} mode)...")
        payload = {
            "serialNumber": self.serial_number,
            **test_data,
        }
        try:
            resp = requests.post(
                f"{self.server_url}/api/tests/report",
                json=payload,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException as e:
            print(f"[-] Connection error: {e}")
            return
        if resp.status_code == 200:
            print("[+] Test reported successfully.")
        else:
            print(f"[-] Test report failed: {resp.text}")

    def switch_mode(self, new_mode):
        """Set the operating mode to ``new_mode`` and persist it."""
        self.mode = new_mode
        self.save_config()
        print(f"[*] Switched to {self.mode} mode.")

    def switch_slot(self):
        """Toggle the active slot (A becomes B, anything else becomes A) and persist it."""
        self.active_slot = "B" if self.active_slot == "A" else "A"
        self.save_config()
        print(f"[*] Switched to Slot {self.active_slot}.")
def generate_mock_hdmi_test():
    """Build a canned, passing offline HDMI diode-check report.

    Returns:
        dict: A fresh report with the keys ``type``, ``status``, ``hdmi5v``,
        ``summary`` and ``diodeResults``; the last maps each HDMI line name
        to a fixed mock reading.
    """
    line_names = ("DDC_SCL", "DDC_SDA", "CEC", "HPD", "TMDS_CLK+", "TMDS_CLK-")
    mock_readings = (0.542, 0.544, 0.612, 0.589, 0.498, 0.497)

    report = dict(
        type="HDMI_OFFLINE",
        status="PASS",
        hdmi5v=False,
        summary="Offline diode check completed. All lines within tolerance.",
    )
    # Pair each line with its reading, keeping the original key order.
    report["diodeResults"] = dict(zip(line_names, mock_readings))
    return report
def _print_menu(hu):
    """Print the simulator banner, the unit's current state and the menu."""
    print("\n" + "=" * 45)
    print(" HDMI TESTER - HARDWARE SIMULATOR (v1.2.0)")
    print("=" * 45)
    print(f" Status: [{hu.status}]")
    print(f" Mode: [{hu.mode}]")
    print(f" Active: [Slot {hu.active_slot}]")
    print(f" Serial: [{hu.serial_number}]")
    print("-" * 45)
    print(" 1. Register / Sync Device")
    print(" 2. Report Mock HDMI Test")
    print(" 3. Switch to DEV Mode")
    print(" 4. Switch to PROD Mode")
    print(" 5. Swap Active Slot (A/B)")
    print(" 6. Start Automated Loop")
    print(" 7. Request Re-enlistment")
    print(" 0. Exit Orchestrator")
    print("-" * 45)


def _run_automated_loop(hu, interval=5):
    """Sync and report every ``interval`` seconds until shutdown or Ctrl+C."""
    print("\n[*] Entering automated telemetry loop. Press Ctrl+C to return to menu.")
    try:
        while True:
            result = hu.register()
            if result == "SHUTDOWN":
                print("[!] System shutdown confirmed. Exiting loop.")
                break
            if hu.status == "ENROLLED":
                hu.report_test(generate_mock_hdmi_test())
            # Checked before sleeping so an already-offline unit (for example
            # when the sync itself failed) leaves the loop without a delay.
            if hu.status == "OFFLINE":
                break
            time.sleep(interval)
    except KeyboardInterrupt:
        print("\n[!] Loop interrupted by user.")


def main():
    """Run the interactive simulator menu until the user exits."""
    hu = HeadUnit()

    def request_reenlistment():
        # Reset the local status first, then let the server assign a new one.
        hu.status = "PENDING"
        hu.register()

    actions = {
        "1": hu.register,
        "2": lambda: hu.report_test(generate_mock_hdmi_test()),
        "3": lambda: hu.switch_mode("DEV"),
        "4": lambda: hu.switch_mode("PROD"),
        "5": hu.switch_slot,
        "6": lambda: _run_automated_loop(hu),
        "7": request_reenlistment,
    }

    while True:
        _print_menu(hu)
        try:
            choice = input("Select operation [0-7] >> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl+C at the prompt: exit cleanly, no traceback.
            print("\n[*] Powering down head unit...")
            break

        if choice == "0":
            print("[*] Powering down head unit...")
            break

        action = actions.get(choice)
        if action is None:
            print("[!] Invalid selection.")
        else:
            action()


if __name__ == "__main__":
    main()