"""FastAPI router exposing an ``/export`` endpoint.

The endpoint bundles the results of ``database.fetch_all`` with node,
namespace and per-namespace resource-usage data read from the Kubernetes API.
"""

import json
import logging

import yaml
from fastapi import APIRouter, Response
from kubernetes import client, config

from database import fetch_all

router = APIRouter()

# Divisors that turn a suffixed Kubernetes CPU quantity into whole cores.
_CPU_SUFFIX_DIVISORS = {
    "n": 1e9,   # nanocores
    "u": 1e6,   # microcores
    "m": 1000,  # millicores
}

_BYTES_PER_MIB = 1024 ** 2

# Bytes represented by one unit of each Kubernetes memory-quantity suffix.
# Binary (power-of-two) suffixes are two characters, decimal ones are one.
_MEMORY_SUFFIX_BYTES = {
    "Ki": 1024,
    "Mi": 1024 ** 2,
    "Gi": 1024 ** 3,
    "Ti": 1024 ** 4,
    "Pi": 1024 ** 5,
    "Ei": 1024 ** 6,
    "m": 1e-3,
    "k": 1000,
    "M": 1000 ** 2,
    "G": 1000 ** 3,
    "T": 1000 ** 4,
    "P": 1000 ** 5,
    "E": 1000 ** 6,
}


# Helper functions for conversions
def convert_cpu_to_cores(cpu, precision=4):
    """Convert a Kubernetes CPU quantity string to a number of cores.

    Handles the suffixes ``n`` (nano), ``u`` (micro) and ``m`` (milli); a
    value without one of those suffixes is parsed as whole cores.

    Args:
        cpu: Quantity such as ``"250m"``, ``"1500000n"`` or ``"2"``.
        precision: Decimal places to round suffixed values to. Pass ``None``
            to get the unrounded value, which is what callers that sum many
            small readings should use so rounding error does not accumulate.

    Returns:
        The quantity in cores as a float. Values without a suffix are
        returned unrounded regardless of ``precision``.

    Raises:
        ValueError: If the numeric part cannot be parsed.
    """
    text = str(cpu).strip()
    # Match on the trailing character only, rather than a substring search.
    divisor = _CPU_SUFFIX_DIVISORS.get(text[-1:])
    if divisor is None:
        return float(text)  # Already in cores

    cores = float(text[:-1]) / divisor
    if precision is None:
        return cores
    return round(cores, precision)


def convert_memory_to_mib(memory):
    """Convert a Kubernetes memory quantity string to MiB (mebibytes).

    Handles the binary suffixes ``Ki`` through ``Ei``, the decimal suffixes
    ``m``, ``k``, ``M``, ``G``, ``T``, ``P`` and ``E``, and values without a
    suffix (including exponent notation such as ``"1e9"``), which Kubernetes
    defines as a plain byte count.

    Args:
        memory: Quantity such as ``"16384Ki"``, ``"512Mi"`` or ``"134217728"``.

    Returns:
        The quantity in MiB as a float.

    Raises:
        ValueError: If the numeric part cannot be parsed.
    """
    text = str(memory).strip()
    # Try two-character suffixes first so "Mi" is not mistaken for a number
    # followed by an unknown one-character suffix.
    for suffix_length in (2, 1):
        unit_bytes = _MEMORY_SUFFIX_BYTES.get(text[-suffix_length:])
        if unit_bytes is not None:
            amount = float(text[:-suffix_length])
            return amount * unit_bytes / _BYTES_PER_MIB

    # No suffix: the value is a byte count, not MiB.
    return float(text) / _BYTES_PER_MIB


def _describe_node(node):
    """Build the exported summary dict for one node object from ``list_node``."""
    capacity = node.status.capacity or {}
    allocatable = node.status.allocatable or {}
    memory = capacity.get("memory")
    return {
        "node_name": node.metadata.name,
        "cpu": capacity.get("cpu"),
        # Convert to MiB; left as None when the node reports no memory value.
        "memory": (
            round(convert_memory_to_mib(memory), 2) if memory is not None else None
        ),
        "pods_allocatable": allocatable.get("pods"),
    }


def _aggregate_namespace_usage(pod_items):
    """Sum container CPU (cores) and memory (MiB) usage per namespace.

    Args:
        pod_items: Iterable of pod-metrics dicts, each with
            ``metadata.namespace`` and a ``containers`` list whose entries
            carry ``usage.cpu`` and ``usage.memory`` quantity strings.

    Returns:
        Dict mapping namespace name to ``{"cpu": ..., "memory": ...}`` with
        CPU rounded to 4 decimal places and memory to 2.
    """
    totals = {}
    for pod in pod_items:
        bucket = totals.setdefault(
            pod["metadata"]["namespace"], {"cpu": 0, "memory": 0}
        )
        for container in pod["containers"]:
            usage = container["usage"]
            # Keep full precision while summing; rounding happens once below.
            bucket["cpu"] += convert_cpu_to_cores(usage["cpu"], precision=None)
            bucket["memory"] += convert_memory_to_mib(usage["memory"])

    # Round and format usage for readability
    return {
        namespace: {
            "cpu": round(total["cpu"], 4),
            "memory": round(total["memory"], 2),  # Memory in MiB
        }
        for namespace, total in totals.items()
    }


# Fetch Kubernetes data with namespace resource usage
def fetch_k8s_data_with_usage():
    """Collect node, namespace and per-namespace usage data from Kubernetes.

    Loads the in-cluster configuration, lists nodes and namespaces through
    the core API, and reads pod metrics from the ``metrics.k8s.io/v1beta1``
    custom-objects endpoint.

    Returns:
        Dict with keys ``"nodes"`` (list of node summaries), ``"namespaces"``
        (list of namespace names) and ``"namespace_usage"`` (namespace name
        to summed CPU cores and memory MiB).

    Exceptions raised by the Kubernetes client are not caught here.
    """
    config.load_incluster_config()
    core_api = client.CoreV1Api()
    metrics_api = client.CustomObjectsApi()

    # Fetch nodes
    nodes = [_describe_node(node) for node in core_api.list_node().items]

    # Fetch namespaces
    namespaces = [ns.metadata.name for ns in core_api.list_namespace().items]

    # Fetch pod metrics and calculate namespace resource usage
    pod_metrics = metrics_api.list_cluster_custom_object(
        group="metrics.k8s.io", version="v1beta1", plural="pods"
    )
    namespace_usage = _aggregate_namespace_usage(pod_metrics["items"])

    return {
        "nodes": nodes,
        "namespaces": namespaces,
        "namespace_usage": namespace_usage,
    }


# Export endpoint
@router.get("/export")
def export_data(format: str = "yaml"):
    """Return database and Kubernetes data as a YAML or JSON document.

    Args:
        format: Output format taken from the query string. ``"yaml"`` (case
            insensitive, the default) yields YAML; any other value yields
            JSON. The name shadows the ``format`` builtin but is kept
            because it is the public query-parameter name.

    Returns:
        A ``Response`` with media type ``text/yaml`` or ``application/json``.
    """
    data = {
        "metal_nodes": fetch_all("metal_nodes"),
        "virtual_machines": fetch_all("virtual_machines"),
        "kubernetes": fetch_k8s_data_with_usage(),
    }

    if format.lower() == "yaml":
        yaml_data = yaml.safe_dump(data, sort_keys=False)
        return Response(content=yaml_data, media_type="text/yaml")

    return Response(content=json.dumps(data, indent=2), media_type="application/json")