"""REST service for a small fleet of Proxmox servers.

Keeps a registry of clusters (name + API token) and servers (name, IP, MAC,
cluster) in a local SQLite file, and exposes endpoints to list/edit that
registry, query Proxmox node status, check reachability via ping, and switch
servers on (Wake-on-LAN) or off (Proxmox API call).
"""

import os
import sqlite3
import subprocess
from typing import Optional

import requests
from fastapi import FastAPI, HTTPException
from fastapi.openapi.utils import get_openapi
from fastapi.responses import HTMLResponse
from ping3 import ping
from pydantic import BaseModel
from wakeonlan import send_magic_packet

# Path of the SQLite registry, relative to the process working directory.
DB_PATH = "servers.db"

# Upper bound (seconds) for any single HTTP call to a Proxmox host, so an
# unreachable host cannot block a request handler indefinitely.
REQUEST_TIMEOUT = 10


# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------

def _query(sql: str, params: tuple = ()) -> list:
    """Run a read-only statement and return all rows; always closes the connection."""
    conn = sqlite3.connect(DB_PATH)
    try:
        return conn.execute(sql, params).fetchall()
    finally:
        conn.close()


def _execute(sql: str, params: tuple = ()) -> None:
    """Run a single write statement and commit; always closes the connection."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(sql, params)
        conn.commit()
    finally:
        conn.close()


def init_db():
    """Create the ``clusters`` and ``servers`` tables if they do not exist."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('''CREATE TABLE IF NOT EXISTS clusters (
                            name TEXT PRIMARY KEY,
                            api_token TEXT NOT NULL)''')
        # NOTE: SQLite only enforces FOREIGN KEY clauses when
        # ``PRAGMA foreign_keys = ON`` is set on the connection, which this
        # module never does, so a server row may reference a missing cluster.
        conn.execute('''CREATE TABLE IF NOT EXISTS servers (
                            name TEXT PRIMARY KEY,
                            ip TEXT NOT NULL,
                            mac TEXT NOT NULL,
                            cluster_name TEXT NOT NULL,
                            FOREIGN KEY(cluster_name) REFERENCES clusters(name))''')
        conn.commit()
    finally:
        conn.close()


init_db()


def get_server_info(server_name) -> Optional[dict]:
    """Return ``{"ip", "mac", "api_token"}`` for a server, or ``None``.

    ``None`` is returned both when the server is not registered and when its
    ``cluster_name`` matches no cluster row (the lookup is an inner join).
    """
    rows = _query(
        "SELECT servers.ip, servers.mac, clusters.api_token FROM servers "
        "JOIN clusters ON servers.cluster_name = clusters.name "
        "WHERE servers.name = ?",
        (server_name,),
    )
    if not rows:
        return None
    ip, mac, api_token = rows[0]
    return {"ip": ip, "mac": mac, "api_token": api_token}


def list_servers() -> list:
    """Return every registered server as a dict (name, ip, mac, cluster_name)."""
    rows = _query(
        "SELECT servers.name, servers.ip, servers.mac, servers.cluster_name FROM servers"
    )
    return [
        {"name": name, "ip": ip, "mac": mac, "cluster_name": cluster_name}
        for name, ip, mac, cluster_name in rows
    ]


def _list_servers_with_tokens() -> list:
    """Return name, ip and cluster API token for every server in one query.

    Uses a LEFT JOIN so servers whose cluster row is missing are still
    listed, with ``api_token`` set to ``None``.
    """
    rows = _query(
        "SELECT servers.name, servers.ip, clusters.api_token FROM servers "
        "LEFT JOIN clusters ON servers.cluster_name = clusters.name"
    )
    return [
        {"name": name, "ip": ip, "api_token": api_token}
        for name, ip, api_token in rows
    ]


def list_clusters() -> list:
    """Return the names of all registered clusters (tokens are not exposed)."""
    return [row[0] for row in _query("SELECT name FROM clusters")]


def add_or_update_cluster(name: str, api_token: str):
    """Insert a cluster, or overwrite the token of an existing one."""
    _execute(
        "REPLACE INTO clusters (name, api_token) VALUES (?, ?)",
        (name, api_token),
    )


def delete_cluster(name: str):
    """Delete a cluster by name; a no-op if it does not exist."""
    _execute("DELETE FROM clusters WHERE name = ?", (name,))


def add_or_update_server(name: str, ip: str, mac: str, cluster_name: str):
    """Insert a server, or overwrite all fields of an existing one."""
    _execute(
        "REPLACE INTO servers (name, ip, mac, cluster_name) VALUES (?, ?, ?, ?)",
        (name, ip, mac, cluster_name),
    )


def delete_server(name: str):
    """Delete a server by name; a no-op if it does not exist."""
    _execute("DELETE FROM servers WHERE name = ?", (name,))


# ---------------------------------------------------------------------------
# Application and request models
# ---------------------------------------------------------------------------

app = FastAPI()


# Request body for creating/updating a server.
class ServerModel(BaseModel):
    name: str
    ip: str
    mac: str
    cluster_name: str


# Request body for creating/updating a cluster.
class ClusterModel(BaseModel):
    name: str
    api_token: str


# ---------------------------------------------------------------------------
# Proxmox / network helpers
# ---------------------------------------------------------------------------

def _proxmox_headers(api_token) -> dict:
    """Build the Authorization header for a Proxmox API token."""
    return {"Authorization": f"PVEAPIToken={api_token}"}


def get_proxmox_status(server_ip, api_token):
    """Fetch Proxmox server status using API token.

    Returns ``{"status": <value>}`` on success. Any transport error, HTTP
    error status, or unexpected response shape yields
    ``{"status": "unknown", "error": <description>}`` instead of raising.
    """
    try:
        # verify=False: certificate verification is disabled, as in the
        # original code (presumably for self-signed host certificates).
        response = requests.get(
            f"https://{server_ip}:8006/api2/json/nodes",
            headers=_proxmox_headers(api_token),
            verify=False,
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        nodes = response.json()["data"]
        # NOTE(review): reports the first node in the listing; if the host
        # belongs to a multi-node cluster this may be another node - confirm.
        return {"status": nodes[0]["status"]}
    except requests.RequestException as e:
        return {"status": "unknown", "error": str(e)}
    except (ValueError, KeyError, IndexError, TypeError) as e:
        # Body was not JSON, or lacked the expected data[0]["status"] shape.
        return {"status": "unknown", "error": f"Unexpected Proxmox response: {e!r}"}


def check_power_state(server_ip):
    """Check if the server is online by pinging it.

    Returns ``"on"`` when a reply arrives within one second, else ``"off"``.
    """
    delay = ping(server_ip, timeout=1)
    # ping3 signals failure with None (timeout) or False (error). Compare by
    # identity: a measured delay of 0.0 is falsy but is still a valid reply.
    if delay is None or delay is False:
        return "off"
    return "on"


# ---------------------------------------------------------------------------
# Documentation endpoints
# ---------------------------------------------------------------------------

# Minimal RapiDoc page rendering this service's OpenAPI document.
_RAPIDOC_HTML = """<!doctype html>
<html>
<head>
    <meta charset="utf-8">
    <title>API Documentation</title>
    <script type="module" src="https://unpkg.com/rapidoc/dist/rapidoc-min.js"></script>
</head>
<body>
    <rapi-doc spec-url="/openapi.json"></rapi-doc>
</body>
</html>
"""


# NOTE(review): FastAPI() registers its own /openapi.json route by default, so
# this handler may never be the one that answers - confirm whether it is needed.
@app.get("/openapi.json")
def get_openapi_spec():
    """Returns the OpenAPI specification."""
    return get_openapi(title=app.title, version="1.0.0", routes=app.routes)


@app.get("/apidoc", response_class=HTMLResponse)
def api_docs():
    """Returns the API documentation using RapiDoc."""
    return HTMLResponse(content=_RAPIDOC_HTML, status_code=200)


# ---------------------------------------------------------------------------
# Status endpoints
# ---------------------------------------------------------------------------

@app.get("/statuses/{server_name}")
def status(server_name: str):
    """Returns the Proxmox server status for a specific server.

    Responds 404 when the server (or its cluster) is not registered.
    """
    server = get_server_info(server_name)
    if not server:
        raise HTTPException(status_code=404, detail="Server not found")
    return get_proxmox_status(server["ip"], server["api_token"])


@app.get("/statuses")
def list_all_statuses():
    """Returns the statuses of all servers, keyed by server name."""
    statuses = {}
    for server in _list_servers_with_tokens():
        if server["api_token"] is None:
            # The server's cluster row is missing, so there is no token to
            # query with; report it rather than failing the whole listing.
            statuses[server["name"]] = {
                "status": "unknown",
                "error": "Cluster not found for server",
            }
        else:
            statuses[server["name"]] = get_proxmox_status(
                server["ip"], server["api_token"]
            )
    return statuses


# ---------------------------------------------------------------------------
# Cluster endpoints
# ---------------------------------------------------------------------------

@app.get("/clusters")
def get_clusters():
    """Returns a list of all cluster names."""
    return list_clusters()


@app.post("/clusters")
@app.put("/clusters")
def add_cluster(cluster: ClusterModel):
    """Adds or updates a cluster."""
    add_or_update_cluster(cluster.name, cluster.api_token)
    return {"message": "Cluster added/updated successfully"}


@app.delete("/clusters/{cluster_name}")
def remove_cluster(cluster_name: str):
    """Deletes a cluster."""
    delete_cluster(cluster_name)
    return {"message": "Cluster deleted successfully"}


# ---------------------------------------------------------------------------
# Power-state endpoints
# ---------------------------------------------------------------------------

@app.get("/states/{server_name}")
def get_power_status(server_name: str):
    """Returns the power status of a specific server.

    Responds 404 when the server (or its cluster) is not registered.
    """
    server = get_server_info(server_name)
    if not server:
        raise HTTPException(status_code=404, detail="Server not found")
    return {"power": check_power_state(server["ip"])}


@app.get("/states")
def list_all_states():
    """Returns the power states of all servers, keyed by server name."""
    return {
        server["name"]: check_power_state(server["ip"])
        for server in list_servers()
    }


@app.put("/states/{server_name}")
@app.patch("/states/{server_name}")
def control_power(server_name: str, state: str):
    """Controls the server power state (wake or shutdown).

    ``state`` must be ``"on"`` or ``"off"``. Responds 404 for an unknown
    server, 400 for any other ``state`` value, and 500 when the shutdown
    request to the Proxmox host fails or is rejected.
    """
    server = get_server_info(server_name)
    if not server:
        raise HTTPException(status_code=404, detail="Server not found")

    if state == "on":
        if check_power_state(server["ip"]) == "off":
            send_magic_packet(server["mac"])
            return {"message": "Wake-on-LAN signal sent"}
        return {"message": "Server is already on"}

    if state == "off":
        # Bind the IP first: reusing double quotes inside an f-string
        # replacement field is a syntax error before Python 3.12.
        server_ip = server["ip"]
        try:
            # NOTE(review): Proxmox documents node shutdown as
            # POST /nodes/{node}/status with command=shutdown. The path below
            # is kept as written because the node name is not stored in the
            # registry - confirm and adjust.
            response = requests.post(
                f"https://{server_ip}:8006/api2/json/nodes/shutdown",
                headers=_proxmox_headers(server["api_token"]),
                verify=False,
                timeout=REQUEST_TIMEOUT,
            )
            # Do not report success when the host rejected the request.
            response.raise_for_status()
        except requests.RequestException as e:
            raise HTTPException(status_code=500, detail=str(e)) from e
        return {"message": "Shutdown command sent"}

    raise HTTPException(status_code=400, detail="Invalid state. Use 'on' or 'off'")


# ---------------------------------------------------------------------------
# Server endpoints
# ---------------------------------------------------------------------------

@app.get("/servers")
def get_servers():
    """Returns the list of registered servers."""
    return list_servers()


@app.post("/servers")
@app.put("/servers")
def add_server(server: ServerModel):
    """Adds or updates a server."""
    add_or_update_server(server.name, server.ip, server.mac, server.cluster_name)
    return {"message": "Server added/updated successfully"}


@app.delete("/servers/{server_name}")
def remove_server(server_name: str):
    """Deletes a server."""
    delete_server(server_name)
    return {"message": "Server deleted successfully"}


# Run the server with: uvicorn main:app --host 0.0.0.0 --port 8000