mirror of
https://github.com/immich-app/immich.git
synced 2025-01-11 06:10:28 +02:00
328a58ac0d
added models to config dropdown; fixed downloading; updated tests; use hf for face models; formatting
101 lines
3.1 KiB
Python
101 lines
3.1 KiB
Python
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import onnxruntime as ort
|
|
from insightface.model_zoo import ArcFaceONNX, RetinaFace
|
|
from insightface.utils.face_align import norm_crop
|
|
|
|
from app.config import clean_name
|
|
from app.schemas import ModelType, ndarray_f32
|
|
|
|
from .base import InferenceModel
|
|
|
|
|
|
class FaceRecognizer(InferenceModel):
    """Facial-recognition model pairing a RetinaFace detector with an ArcFace recognizer.

    Both networks are read from ONNX files under ``cache_dir``
    (``detection/model.onnx`` and ``recognition/model.onnx``).
    """

    _model_type = ModelType.FACIAL_RECOGNITION

    def __init__(
        self,
        model_name: str,
        min_score: float = 0.7,
        cache_dir: Path | str | None = None,
        **model_kwargs: Any,
    ) -> None:
        """Store the detection threshold and delegate the rest to the base class.

        Args:
            model_name: Model name; passed through ``clean_name`` before
                being handed to the base initializer.
            min_score: Detection threshold used when ``minScore`` is not
                supplied in ``model_kwargs``.
            cache_dir: Forwarded unchanged to the base initializer.
            **model_kwargs: Extra options. A ``minScore`` entry, if present,
                is removed and takes precedence over ``min_score``; the
                remainder is forwarded to the base initializer.
        """
        # The threshold is set before the base initializer runs.
        threshold = model_kwargs.pop("minScore", min_score)
        self.min_score = threshold

        cleaned_name = clean_name(model_name)
        super().__init__(cleaned_name, cache_dir, **model_kwargs)

    def _load(self) -> None:
        """Create the ONNX Runtime sessions and prepare both models."""

        def new_session(model_path: Path) -> ort.InferenceSession:
            # Both sessions share the same runtime options; only the file differs.
            return ort.InferenceSession(
                model_path.as_posix(),
                sess_options=self.sess_options,
                providers=self.providers,
                provider_options=self.provider_options,
            )

        self.det_model = RetinaFace(session=new_session(self.det_file))

        recognition_path = self.rec_file.as_posix()
        self.rec_model = ArcFaceONNX(recognition_path, session=new_session(self.rec_file))

        self.det_model.prepare(ctx_id=0, det_thresh=self.min_score, input_size=(640, 640))
        self.rec_model.prepare(ctx_id=0)

    def _predict(self, image: ndarray_f32 | bytes) -> list[dict[str, Any]]:
        """Detect faces in ``image`` and compute an embedding for each one.

        Args:
            image: Either an image array or encoded image bytes; bytes are
                decoded with OpenCV (``cv2.IMREAD_COLOR``) first.

        Returns:
            One dict per detection with the keys ``imageWidth``,
            ``imageHeight``, ``boundingBox`` (``x1``, ``y1``, ``x2``, ``y2``),
            ``score`` and ``embedding``. Empty when nothing is detected.
        """
        if isinstance(image, bytes):
            encoded = np.frombuffer(image, np.uint8)
            image = cv2.imdecode(encoded, cv2.IMREAD_COLOR)

        detections, kpss = self.det_model.detect(image)
        if not detections.size:
            return []

        # Narrow the types for the static checker before indexing/unpacking.
        assert isinstance(image, np.ndarray)
        assert isinstance(kpss, np.ndarray)

        # Column 4 holds the score; the first four columns are the rounded box corners.
        face_scores = detections[:, 4].tolist()
        face_boxes = detections[:, :4].round().tolist()

        height, width, _ = image.shape

        faces: list[dict[str, Any]] = []
        for box, face_score, keypoints in zip(face_boxes, face_scores, kpss):
            x1, y1, x2, y2 = box
            aligned_face = norm_crop(image, keypoints)
            features = self.rec_model.get_feat(aligned_face)
            bounding_box = {"x1": x1, "y1": y1, "x2": x2, "y2": y2}
            faces.append(
                {
                    "imageWidth": width,
                    "imageHeight": height,
                    "boundingBox": bounding_box,
                    "score": face_score,
                    "embedding": features[0].tolist(),
                }
            )
        return faces

    @property
    def cached(self) -> bool:
        """Whether both the detection and the recognition model files exist."""
        if not self.det_file.is_file():
            return False
        return self.rec_file.is_file()

    @property
    def det_file(self) -> Path:
        """Path of the detection model file inside the cache directory."""
        detection_dir = self.cache_dir / "detection"
        return detection_dir / "model.onnx"

    @property
    def rec_file(self) -> Path:
        """Path of the recognition model file inside the cache directory."""
        recognition_dir = self.cache_dir / "recognition"
        return recognition_dir / "model.onnx"

    def configure(self, **model_kwargs: Any) -> None:
        """Update the detector threshold from ``minScore`` when it is supplied.

        The current threshold is kept when ``minScore`` is absent. The
        detector must already be loaded, since ``self.det_model`` is read
        unconditionally.
        """
        current_threshold = self.det_model.det_thresh
        self.det_model.det_thresh = model_kwargs.pop("minScore", current_threshold)
|