1
0
Fork 0
mirror of https://github.com/immich-app/immich.git synced 2025-01-17 01:06:46 +01:00
immich/machine-learning/app/models/facial_recognition.py
renovate[bot] 20be42cec0
chore(deps): update machine-learning (#6302)
* chore(deps): update machine-learning

* fix typing, use new lifespan syntax

* wrap in try / finally

* move log

---------

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Co-authored-by: mertalev <101130780+mertalev@users.noreply.github.com>
2024-01-13 05:00:09 +00:00

87 lines
2.8 KiB
Python

from pathlib import Path
from typing import Any
import cv2
import numpy as np
from insightface.model_zoo import ArcFaceONNX, RetinaFace
from insightface.utils.face_align import norm_crop
from numpy.typing import NDArray
from app.config import clean_name
from app.schemas import Face, ModelType, is_ndarray
from .base import InferenceModel
class FaceRecognizer(InferenceModel):
_model_type = ModelType.FACIAL_RECOGNITION
def __init__(
self,
model_name: str,
min_score: float = 0.7,
cache_dir: Path | str | None = None,
**model_kwargs: Any,
) -> None:
self.min_score = model_kwargs.pop("minScore", min_score)
super().__init__(clean_name(model_name), cache_dir, **model_kwargs)
def _load(self) -> None:
self.det_model = RetinaFace(session=self._make_session(self.det_file))
self.rec_model = ArcFaceONNX(self.rec_file.as_posix(), session=self._make_session(self.rec_file))
self.det_model.prepare(
ctx_id=0,
det_thresh=self.min_score,
input_size=(640, 640),
)
self.rec_model.prepare(ctx_id=0)
def _predict(self, image: NDArray[np.uint8] | bytes) -> list[Face]:
if isinstance(image, bytes):
decoded_image = cv2.imdecode(np.frombuffer(image, np.uint8), cv2.IMREAD_COLOR)
else:
decoded_image = image
assert is_ndarray(decoded_image, np.uint8)
bboxes, kpss = self.det_model.detect(decoded_image)
if bboxes.size == 0:
return []
assert is_ndarray(kpss, np.float32)
scores = bboxes[:, 4].tolist()
bboxes = bboxes[:, :4].round().tolist()
results = []
height, width, _ = decoded_image.shape
for (x1, y1, x2, y2), score, kps in zip(bboxes, scores, kpss):
cropped_img = norm_crop(decoded_image, kps)
embedding: NDArray[np.float32] = self.rec_model.get_feat(cropped_img)[0]
face: Face = {
"imageWidth": width,
"imageHeight": height,
"boundingBox": {
"x1": x1,
"y1": y1,
"x2": x2,
"y2": y2,
},
"score": score,
"embedding": embedding,
}
results.append(face)
return results
@property
def cached(self) -> bool:
return self.det_file.is_file() and self.rec_file.is_file()
@property
def det_file(self) -> Path:
return self.cache_dir / "detection" / "model.onnx"
@property
def rec_file(self) -> Path:
return self.cache_dir / "recognition" / "model.onnx"
def configure(self, **model_kwargs: Any) -> None:
self.det_model.det_thresh = model_kwargs.pop("minScore", self.det_model.det_thresh)