immich/machine-learning/app/main.py

import asyncio
import gc
import os
import signal
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Iterator
from zipfile import BadZipFile
import orjson
from fastapi import Depends, FastAPI, Form, HTTPException, UploadFile
from fastapi.responses import ORJSONResponse
from onnxruntime.capi.onnxruntime_pybind11_state import InvalidProtobuf, NoSuchFile
from starlette.formparsers import MultiPartParser
from app.models.base import InferenceModel
from .config import log, settings
from .models.cache import ModelCache
from .schemas import (
    MessageResponse,
    ModelType,
    TextResponse,
)

MultiPartParser.max_file_size = 2**26 # spools to disk if payload is 64 MiB or larger

app = FastAPI()

model_cache = ModelCache(ttl=settings.model_ttl, revalidate=settings.model_ttl > 0)
thread_pool: ThreadPoolExecutor | None = None
lock = threading.Lock()
active_requests = 0
last_called: float | None = None

@app.on_event("startup")
def startup() -> None:
global thread_pool
log.info(
(
"Created in-memory cache with unloading "
f"{f'after {settings.model_ttl}s of inactivity' if settings.model_ttl > 0 else 'disabled'}."
)
)
# asyncio is a huge bottleneck for performance, so we use a thread pool to run blocking code
2023-12-14 20:51:24 +01:00
thread_pool = ThreadPoolExecutor(settings.request_threads) if settings.request_threads > 0 else None
if settings.model_ttl > 0 and settings.model_ttl_poll_s > 0:
asyncio.ensure_future(idle_shutdown_task())
log.info(f"Initialized request thread pool with {settings.request_threads} threads.")
2023-12-14 20:51:24 +01:00
@app.on_event("shutdown")
def shutdown() -> None:
log.handlers.clear()
for model in model_cache.cache._cache.values():
del model
if thread_pool is not None:
thread_pool.shutdown()
gc.collect()
def update_state() -> Iterator[None]:
global active_requests, last_called
active_requests += 1
last_called = time.time()
try:
yield
finally:
active_requests -= 1
@app.get("/", response_model=MessageResponse)
async def root() -> dict[str, str]:
2023-04-26 12:39:24 +02:00
return {"message": "Immich ML"}
@app.get("/ping", response_model=TextResponse)
def ping() -> str:
return "pong"
2023-12-14 20:51:24 +01:00
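
# A minimal in-process smoke test for the two endpoints above; a sketch
# assuming FastAPI's TestClient, which drives the app without a server:
#
#   from fastapi.testclient import TestClient
#
#   client = TestClient(app)
#   assert client.get("/ping").status_code == 200
#   assert client.get("/").json() == {"message": "Immich ML"}
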
@app.post("/predict", dependencies=[Depends(update_state)])
async def predict(
model_name: str = Form(alias="modelName"),
model_type: ModelType = Form(alias="modelType"),
options: str = Form(default="{}"),
text: str | None = Form(default=None),
image: UploadFile | None = None,
) -> Any:
if image is not None:
inputs: str | bytes = await image.read()
elif text is not None:
inputs = text
else:
raise HTTPException(400, "Either image or text must be provided")
try:
kwargs = orjson.loads(options)
except orjson.JSONDecodeError:
raise HTTPException(400, f"Invalid options JSON: {options}")
2023-12-14 20:51:24 +01:00
model = await load(await model_cache.get(model_name, model_type, **kwargs))
model.configure(**kwargs)
outputs = await run(model, inputs)
return ORJSONResponse(outputs)
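
# A client-side sketch for /predict. The host, port, and model name are
# assumptions: immich's machine-learning container typically listens on port
# 3003, and "ViT-B-32__openai" is one of its CLIP model names.
#
#   import httpx
#
#   response = httpx.post(
#       "http://localhost:3003/predict",
#       data={
#           "modelName": "ViT-B-32__openai",
#           "modelType": "clip",
#           "options": "{}",
#           "text": "a photo of a cat",
#       },
#   )
#   response.raise_for_status()
#   embedding = response.json()
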
async def run(model: InferenceModel, inputs: Any) -> Any:
    if thread_pool is None:
        return model.predict(inputs)
    return await asyncio.get_running_loop().run_in_executor(thread_pool, model.predict, inputs)

async def load(model: InferenceModel) -> InferenceModel:
    if model.loaded:
        return model

    def _load() -> None:
        with lock:
            model.load()

    loop = asyncio.get_running_loop()
    try:
        if thread_pool is None:
            model.load()
        else:
            await loop.run_in_executor(thread_pool, _load)
        return model
    except (OSError, InvalidProtobuf, BadZipFile, NoSuchFile):
        # The model file on disk is missing or corrupt; clear it and retry once
        log.warning(
            (
                f"Failed to load {model.model_type.replace('_', ' ')} model '{model.model_name}'. "
                "Clearing cache and retrying."
            )
        )
        model.clear_cache()
        if thread_pool is None:
            model.load()
        else:
            await loop.run_in_executor(thread_pool, _load)
        return model

async def idle_shutdown_task() -> None:
    while True:
        log.debug("Checking for inactivity...")
        if (
            last_called is not None
            and not active_requests
            and not lock.locked()
            and time.time() - last_called > settings.model_ttl
        ):
            log.info("Shutting down due to inactivity.")
            os.kill(os.getpid(), signal.SIGINT)
            break
        await asyncio.sleep(settings.model_ttl_poll_s)
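
# Configuration sketch: the settings fields used above are typically supplied
# via MACHINE_LEARNING_* environment variables; values here are illustrative:
#
#   MACHINE_LEARNING_REQUEST_THREADS=4    # size of the inference thread pool
#   MACHINE_LEARNING_MODEL_TTL=300        # unload models idle for 300 seconds
#   MACHINE_LEARNING_MODEL_TTL_POLL_S=10  # how often to check for inactivity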