from fastapi import FastAPI, HTTPException, Request from fastapi.responses import HTMLResponse, JSONResponse import requests import os import subprocess import sqlite3 from wakeonlan import send_magic_packet from pydantic import BaseModel from fastapi.openapi.utils import get_openapi from ping3 import ping # Database setup def init_db(): conn = sqlite3.connect("servers.db") cursor = conn.cursor() cursor.execute('''CREATE TABLE IF NOT EXISTS clusters ( name TEXT PRIMARY KEY, api_token TEXT NOT NULL)''') cursor.execute('''CREATE TABLE IF NOT EXISTS servers ( name TEXT PRIMARY KEY, ip TEXT NOT NULL, mac TEXT NOT NULL, cluster_name TEXT NOT NULL, FOREIGN KEY(cluster_name) REFERENCES clusters(name))''') conn.commit() conn.close() init_db() def execute_query(query, params=()): try: conn = sqlite3.connect("servers.db") cursor = conn.cursor() cursor.execute(query, params) conn.commit() conn.close() except sqlite3.Error as e: raise DatabaseError(str(e)) def fetch_query(query, params=()): try: conn = sqlite3.connect("servers.db") cursor = conn.cursor() cursor.execute(query, params) result = cursor.fetchall() conn.close() return result except sqlite3.Error as e: raise DatabaseError(str(e)) def get_server_info(server_name): result = fetch_query( "SELECT servers.ip, servers.mac, clusters.api_token FROM servers JOIN clusters ON servers.cluster_name = clusters.name WHERE servers.name = ?", (server_name,) ) if result: return {"ip": result[0][0], "mac": result[0][1], "api_token": result[0][2]} return None def list_servers(): servers = fetch_query( "SELECT servers.name, servers.ip, servers.mac, servers.cluster_name FROM servers" ) if servers: return [{"name": s[0], "ip": s[1], "mac": s[2], "cluster_name": s[3]} for s in servers] return None def list_clusters(): clusters = fetch_query("SELECT name FROM clusters") if clusters: return [c[0] for c in clusters] return None def add_or_update_cluster(name: str, api_token: str): execute_query("REPLACE INTO clusters (name, api_token) VALUES (?, ?)", (name, api_token)) def delete_cluster(name: str): execute_query("DELETE FROM clusters WHERE name = ?", (name,)) def add_or_update_server(name: str, ip: str, mac: str, cluster_name: str): execute_query("REPLACE INTO servers (name, ip, mac, cluster_name) VALUES (?, ?, ?, ?)", (name, ip, mac, cluster_name)) def delete_server(name: str): execute_query("DELETE FROM servers WHERE name = ?", (name,)) app = FastAPI() class DatabaseError(Exception): """Custom exception for database errors.""" def __init__(self, message: str): self.message = message class ProxmoxAPIError(Exception): """Custom exception for Proxmox API errors.""" def __init__(self, message: str): self.message = message @app.exception_handler(DatabaseError) def database_error_handler(request: Request, exc: DatabaseError): return JSONResponse( status_code=500, content={"error": "Database Error", "message": exc.message}, ) @app.exception_handler(ProxmoxAPIError) def proxmox_api_error_handler(request: Request, exc: ProxmoxAPIError): return JSONResponse( status_code=500, content={"error": "Proxmox API Error", "message": exc.message}, ) @app.exception_handler(Exception) def generic_exception_handler(request: Request, exc: Exception): return JSONResponse( status_code=500, content={"error": "Internal Server Error", "message": str(exc)}, ) class ServerModel(BaseModel): name: str ip: str mac: str cluster_name: str class ClusterModel(BaseModel): name: str api_token: str def get_proxmox_status(server_ip, api_token): """Fetch Proxmox server status using API token.""" headers = {"Authorization": f"PVEAPIToken={api_token}"} try: response = requests.get(f"https://{server_ip}:8006/api2/json/nodes", headers=headers, verify=False) response.raise_for_status() data = response.json() return {"status": data["data"][0]["status"]} # Extract node status except requests.exceptions.Timeout: raise ProxmoxAPIError("Connection to Proxmox server timed out") except requests.exceptions.ConnectionError: raise ProxmoxAPIError("Failed to connect to Proxmox server") except requests.exceptions.RequestException as e: raise ProxmoxAPIError(str(e)) @app.get("/openapi.json") def get_openapi_spec(): """Returns the OpenAPI specification.""" return get_openapi(title=app.title, version="1.0.0", routes=app.routes) @app.get("/apidoc", response_class=HTMLResponse) def api_docs(): """Returns the API documentation using RapiDoc.""" return HTMLResponse(content="""