Files
metalcheck/app/main.py
Aleksandr Tcitlionok acef6cb9a1
Some checks failed
CI Pipeline / Build and Push Docker Image (push) Failing after 2m48s
update(app): add helm chart and cleanup repo
2024-12-11 09:05:15 +00:00

85 lines
2.5 KiB
Python

import json
import os
import logging
from fastapi import FastAPI
from database import init_db, insert_metal_node, insert_virtual_machine
from fastapi.middleware.cors import CORSMiddleware
from routes import metal, vm, k8s, export, think
from pydantic import BaseModel, ValidationError
class MetalNode(BaseModel):
name: str
location: str
vendor: str
cpu: int
memory: str
storage: str
time_on_duty: int
initial_cost: float
class VirtualMachine(BaseModel):
name: str
location: str
cpu: int
memory: str
storage: str
vm_type: str
app = FastAPI()
METAL_JSON = "data/metal.json"
VM_JSON = "data/vm.json"
logger = logging.getLogger("uvicorn")
@app.on_event("startup")
async def startup_event():
init_db()
if os.path.exists(METAL_JSON):
with open(METAL_JSON, "r") as file:
metal_nodes = json.load(file)
for node in metal_nodes:
try:
validated_node = MetalNode(**node)
insert_metal_node(
name=validated_node.name,
location=validated_node.location,
vendor=validated_node.vendor,
cpu=validated_node.cpu,
memory=validated_node.memory,
storage=validated_node.storage,
time_on_duty=validated_node.time_on_duty,
initial_cost=validated_node.initial_cost
)
except ValidationError as e:
logger.error(f"Invalid metal node data: {e}")
if os.path.exists(VM_JSON):
with open(VM_JSON, "r") as file:
vms = json.load(file)
for vm in vms:
try:
validated_vm = VirtualMachine(**vm)
insert_virtual_machine(
name=validated_vm.name,
location=validated_vm.location,
cpu=validated_vm.cpu,
memory=validated_vm.memory,
storage=validated_vm.storage,
vm_type=validated_vm.vm_type
)
except ValidationError as e:
logger.error(f"Invalid VM data: {e}")
# Include routes
app.include_router(metal.router)
app.include_router(vm.router)
app.include_router(k8s.router)
app.include_router(export.router)
app.include_router(think.router)
@app.get("/")
def root():
return {"message": "Welcome to Metal Check API"}