mirror of
https://github.com/immich-app/immich.git
synced 2025-01-04 02:46:47 +01:00
1ec9a60e41
* configurable batch size, default openvino to 1 * update docs * don't add a new dependency for two lines * fix typing
86 lines
4 KiB
Python
86 lines
4 KiB
Python
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import numpy as np
|
|
import onnx
|
|
import onnxruntime as ort
|
|
from insightface.model_zoo import ArcFaceONNX
|
|
from insightface.utils.face_align import norm_crop
|
|
from numpy.typing import NDArray
|
|
from onnx.tools.update_model_dims import update_inputs_outputs_dims
|
|
from PIL import Image
|
|
|
|
from app.config import log, settings
|
|
from app.models.base import InferenceModel
|
|
from app.models.transforms import decode_cv2
|
|
from app.schemas import FaceDetectionOutput, FacialRecognitionOutput, ModelFormat, ModelSession, ModelTask, ModelType
|
|
|
|
|
|
class FaceRecognizer(InferenceModel):
|
|
depends = [(ModelType.DETECTION, ModelTask.FACIAL_RECOGNITION)]
|
|
identity = (ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION)
|
|
|
|
def __init__(self, model_name: str, min_score: float = 0.7, **model_kwargs: Any) -> None:
|
|
super().__init__(model_name, **model_kwargs)
|
|
self.min_score = model_kwargs.pop("minScore", min_score)
|
|
max_batch_size = settings.max_batch_size.facial_recognition if settings.max_batch_size else None
|
|
self.batch_size = max_batch_size if max_batch_size else self._batch_size_default
|
|
|
|
def _load(self) -> ModelSession:
|
|
session = self._make_session(self.model_path)
|
|
if (not self.batch_size or self.batch_size > 1) and str(session.get_inputs()[0].shape[0]) != "batch":
|
|
self._add_batch_axis(self.model_path)
|
|
session = self._make_session(self.model_path)
|
|
self.model = ArcFaceONNX(
|
|
self.model_path.with_suffix(".onnx").as_posix(),
|
|
session=session,
|
|
)
|
|
return session
|
|
|
|
def _predict(
|
|
self, inputs: NDArray[np.uint8] | bytes | Image.Image, faces: FaceDetectionOutput, **kwargs: Any
|
|
) -> FacialRecognitionOutput:
|
|
if faces["boxes"].shape[0] == 0:
|
|
return []
|
|
inputs = decode_cv2(inputs)
|
|
cropped_faces = self._crop(inputs, faces)
|
|
embeddings = self._predict_batch(cropped_faces)
|
|
return self.postprocess(faces, embeddings)
|
|
|
|
def _predict_batch(self, cropped_faces: list[NDArray[np.uint8]]) -> NDArray[np.float32]:
|
|
if not self.batch_size or len(cropped_faces) <= self.batch_size:
|
|
embeddings: NDArray[np.float32] = self.model.get_feat(cropped_faces)
|
|
return embeddings
|
|
|
|
batch_embeddings: list[NDArray[np.float32]] = []
|
|
for i in range(0, len(cropped_faces), self.batch_size):
|
|
batch_embeddings.append(self.model.get_feat(cropped_faces[i : i + self.batch_size]))
|
|
return np.concatenate(batch_embeddings, axis=0)
|
|
|
|
def postprocess(self, faces: FaceDetectionOutput, embeddings: NDArray[np.float32]) -> FacialRecognitionOutput:
|
|
return [
|
|
{
|
|
"boundingBox": {"x1": x1, "y1": y1, "x2": x2, "y2": y2},
|
|
"embedding": embedding,
|
|
"score": score,
|
|
}
|
|
for (x1, y1, x2, y2), embedding, score in zip(faces["boxes"], embeddings, faces["scores"])
|
|
]
|
|
|
|
def _crop(self, image: NDArray[np.uint8], faces: FaceDetectionOutput) -> list[NDArray[np.uint8]]:
|
|
return [norm_crop(image, landmark) for landmark in faces["landmarks"]]
|
|
|
|
def _add_batch_axis(self, model_path: Path) -> None:
|
|
log.debug(f"Adding batch axis to model {model_path}")
|
|
proto = onnx.load(model_path)
|
|
static_input_dims = [shape.dim_value for shape in proto.graph.input[0].type.tensor_type.shape.dim[1:]]
|
|
static_output_dims = [shape.dim_value for shape in proto.graph.output[0].type.tensor_type.shape.dim[1:]]
|
|
input_dims = {proto.graph.input[0].name: ["batch"] + static_input_dims}
|
|
output_dims = {proto.graph.output[0].name: ["batch"] + static_output_dims}
|
|
updated_proto = update_inputs_outputs_dims(proto, input_dims, output_dims)
|
|
onnx.save(updated_proto, model_path)
|
|
|
|
@property
|
|
def _batch_size_default(self) -> int | None:
|
|
providers = ort.get_available_providers()
|
|
return None if self.model_format == ModelFormat.ONNX and "OpenVINOExecutionProvider" not in providers else 1
|