InsightFace
InsightFace: 2D and 3D Face Analysis Project
code: https://github.com/deepinsight/insightface
TensorRT
Converting an ONNX model to a TensorRT engine on the Jetson AGX Orin.
JetPack ships with TensorRT. Add trtexec to the PATH:
export PATH=/usr/src/tensorrt/bin:$PATH
Convert the model:
trtexec --onnx=model.onnx --saveEngine=model.plan
Explicitly pin the batch size (all shape profiles fixed to 1x3x640x640 here):
trtexec --onnx=det_2.5g.onnx --saveEngine=det_2.5g.plan --explicitBatch --minShapes=input.1:1x3x640x640 --optShapes=input.1:1x3x640x640 --maxShapes=input.1:1x3x640x640
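To sanity-check a generated engine, trtexec can also load and benchmark it directly:
trtexec --loadEngine=det_2.5g.plan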
Script to inspect an engine's input/output tensors:
inspect.py
import argparse
import os
import sys
import glob
import tensorrt as trt
parser = argparse.ArgumentParser(description="Inspect TensorRT engine IO tensors")
parser.add_argument(
"--model",
type=str,
default=None,
help="Path to the .plan file; if not specified, all .plan files in current directory will be used"
)
args = parser.parse_args()
def inspect_engine(path):
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
with open(path, "rb") as f, trt.Runtime(TRT_LOGGER) as runtime:
engine = runtime.deserialize_cuda_engine(f.read())
print(f"\n=== Inspecting: {path} ===")
for i in range(engine.num_io_tensors):
name = engine.get_tensor_name(i)
shape = engine.get_tensor_shape(name)
dtype = engine.get_tensor_dtype(name)
mode = engine.get_tensor_mode(name)
io_type = "Input" if mode == trt.TensorIOMode.INPUT else "Output"
print(f"{io_type} - {name}: shape={shape}, dtype={dtype}")
if args.model:
    # Single-file mode
    if not os.path.isfile(args.model):
        print(f"[Error] model file not found: {args.model}")
sys.exit(1)
inspect_engine(args.model)
else:
    # Iterate over all .plan files in the current directory
plan_files = glob.glob("*.plan")
if not plan_files:
print("[提示] 当前目录下未找到任何 .plan 文件")
sys.exit(0)
for pf in plan_files:
inspect_engine(pf)
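Usage (assuming the engines sit in the current directory):
python inspect.py --model det_2.5g.plan
Or run it with no arguments to inspect every .plan file in the directory.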
Conda TensorRT
On Jetson, TensorRT can only be used inside a conda environment by symlinking the system packages. The env's Python must match the system Python (3.8 here), and the version strings below should be adjusted to your install.
conda install mamba -c conda-forge
mamba create -n trt python=3.8
cd ~/miniconda3/envs/trt/lib/python3.8/site-packages
ln -s /usr/lib/python3.8/dist-packages/tensorrt tensorrt
ln -s /usr/lib/python3.8/dist-packages/tensorrt-8.5.2.2.dist-info tensorrt-8.5.2.2.dist-info
ln -s /usr/lib/python3.8/dist-packages/onnx_graphsurgeon onnx_graphsurgeon
ln -s /usr/lib/python3.8/dist-packages/onnx_graphsurgeon-0.3.12.dist-info onnx_graphsurgeon-0.3.12.dist-info
ln -s /usr/lib/python3.8/dist-packages/uff uff
ln -s /usr/lib/python3.8/dist-packages/uff-0.6.9.dist-info uff-0.6.9.dist-info
mamba activate trt
pip install opencv-python
pip install scikit-image
# NumPy >= 1.24 removed np.bool; patch TensorRT's Python bindings accordingly
sed -i 's/bool: np.bool/bool: bool/g' ~/miniconda3/envs/trt/lib/python3.8/site-packages/tensorrt/__init__.py
Test (this should print the symlinked version, e.g. 8.5.2.2):
python -c "import tensorrt as trt; print(trt.__version__)"
Models
There are five models (see the InsightFace link above for details):
det_2.5g: face detection
1k3d68: 3D 68-point landmarks
2d106det: 2D 106-point landmarks
genderage: gender and age
w600k_r50: feature embedding
Basic framework
common.py
import numpy as np
from numpy.linalg import norm as l2norm
from loguru import logger
import tensorrt as trt
import pycuda.driver as cuda
import os
import cv2
import pycuda.autoinit # initializes CUDA context
def affine_crop(img, bbox, out_size):
    # Square crop centered on bbox; scale chosen so max(w, h) * 1.5 maps to out_size
    x1, y1, x2, y2 = bbox[:4]
w, h = (x2 - x1), (y2 - y1)
center = ((x2 + x1) / 2.0, (y2 + y1) / 2.0)
scale = out_size / (max(w, h) * 1.5)
M = cv2.getRotationMatrix2D(center, 0, scale)
M[0, 2] += (out_size / 2.0 - center[0])
M[1, 2] += (out_size / 2.0 - center[1])
aimg = cv2.warpAffine(img, M, (out_size, out_size), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT, borderValue=0)
return aimg, M
class Face(dict):
def __init__(self, d=None, **kwargs):
if d is None:
d = {}
if kwargs:
d.update(**kwargs)
for k, v in d.items():
setattr(self, k, v)
# Class attributes
#for k in self.__class__.__dict__.keys():
# if not (k.startswith('__') and k.endswith('__')) and not k in ('update', 'pop'):
# setattr(self, k, getattr(self, k))
def __setattr__(self, name, value):
if isinstance(value, (list, tuple)):
value = [self.__class__(x)
if isinstance(x, dict) else x for x in value]
elif isinstance(value, dict) and not isinstance(value, self.__class__):
value = self.__class__(value)
super(Face, self).__setattr__(name, value)
super(Face, self).__setitem__(name, value)
__setitem__ = __setattr__
def __getattr__(self, name):
return None
@property
def embedding_norm(self):
if self.embedding is None:
return None
return l2norm(self.embedding)
@property
def normed_embedding(self):
if self.embedding is None:
return None
return self.embedding / self.embedding_norm
@property
def sex(self):
if self.gender is None:
return None
return 'M' if self.gender==1 else 'F'
# ----------------------------
# Generic TensorRT runner
# ----------------------------
class TrtRunner:
def __init__(self, engine_path: str):
assert os.path.exists(engine_path), f"Missing engine: {engine_path}"
logger.info(f"Loading TRT engine: {engine_path}")
trt_logger = trt.Logger(trt.Logger.ERROR)
with open(engine_path, "rb") as f:
engine_data = f.read()
runtime = trt.Runtime(trt_logger)
self.engine = runtime.deserialize_cuda_engine(engine_data)
self.context = self.engine.create_execution_context()
self.stream = cuda.Stream()
logger.info(f"Engine created: {engine_path}")
# Bindings
self.input_indices = [i for i in range(self.engine.num_bindings) if self.engine.binding_is_input(i)]
self.output_indices = [i for i in range(self.engine.num_bindings) if not self.engine.binding_is_input(i)]
logger.debug(f"Bindings total={self.engine.num_bindings}, inputs={self.input_indices}, outputs={self.output_indices}")
# Allocated buffers
self.bindings = [None] * self.engine.num_bindings
self._allocated = False
def _nptype(self, dt):
return trt.nptype(dt)
def allocate(self, input_shape: tuple):
# Set dynamic shape (if needed) and allocate IO buffers
inp_idx = self.input_indices[0]
self.context.set_binding_shape(inp_idx, input_shape)
logger.debug(f"Allocating buffers for input shape={input_shape}")
# Input
in_dtype = self._nptype(self.engine.get_binding_dtype(inp_idx))
in_size = int(np.prod(input_shape))
self.in_host = cuda.pagelocked_empty(in_size, in_dtype)
self.in_dev = cuda.mem_alloc(self.in_host.nbytes)
self.bindings[inp_idx] = int(self.in_dev)
logger.debug(f"Input dtype={in_dtype}, size={in_size}, bytes={self.in_host.nbytes}")
# Outputs
self.out_hosts, self.out_devs, self.out_shapes = [], [], []
for oi in self.output_indices:
oshape = tuple(self.context.get_binding_shape(oi))
odtype = self._nptype(self.engine.get_binding_dtype(oi))
osize = int(np.prod(oshape))
o_host = cuda.pagelocked_empty(osize, odtype)
o_dev = cuda.mem_alloc(o_host.nbytes)
self.out_hosts.append(o_host)
self.out_devs.append(o_dev)
self.out_shapes.append(oshape)
self.bindings[oi] = int(o_dev)
logger.debug(f"Output[{oi}] shape={oshape}, dtype={odtype}, bytes={o_host.nbytes}")
self._allocated = True
def infer(self, input_array: np.ndarray):
assert self._allocated, "Call allocate(input_shape) before infer()"
assert tuple(input_array.shape) == tuple(self.context.get_binding_shape(self.input_indices[0]))
logger.debug("Starting TRT inference")
# HtoD input
np.copyto(self.in_host, input_array.ravel())
cuda.memcpy_htod_async(self.in_dev, self.in_host, self.stream)
# Execute
self.context.execute_async_v2(self.bindings, self.stream.handle)
# DtoH outputs
for o_host, o_dev in zip(self.out_hosts, self.out_devs):
cuda.memcpy_dtoh_async(o_host, o_dev, self.stream)
self.stream.synchronize()
logger.debug("Finished TRT inference")
# Wrap outputs
return [np.array(o_host).reshape(shape) for o_host, shape in zip(self.out_hosts, self.out_shapes)]
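A minimal sketch of driving TrtRunner directly, assuming a static 1x3x112x112 float32 engine such as w600k_r50.plan:
from common import TrtRunner
import numpy as np
runner = TrtRunner("w600k_r50.plan")
runner.allocate((1, 3, 112, 112))  # resolves shapes and allocates host/device buffers
dummy = np.random.rand(1, 3, 112, 112).astype(np.float32)
outputs = runner.infer(dummy)      # list of numpy arrays, one per output binding
print([o.shape for o in outputs])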
Face detection
retinaface.py
from common import TrtRunner
from loguru import logger
import numpy as np
import cv2
import time
import pycuda.autoinit # initializes CUDA context
# ----------------------------
# RetinaFace helpers (pre/post)
# ----------------------------
def distance2bbox(points, distance, max_shape=None):
"""Decode distance prediction to bounding box.
Args:
points (Tensor): Shape (n, 2), [x, y].
distance (Tensor): Distance from the given point to 4
boundaries (left, top, right, bottom).
max_shape (tuple): Shape of the image.
Returns:
Tensor: Decoded bboxes.
"""
x1 = points[:, 0] - distance[:, 0]
y1 = points[:, 1] - distance[:, 1]
x2 = points[:, 0] + distance[:, 2]
y2 = points[:, 1] + distance[:, 3]
    if max_shape is not None:
        # numpy arrays have no .clamp(); use np.clip
        x1 = np.clip(x1, 0, max_shape[1])
        y1 = np.clip(y1, 0, max_shape[0])
        x2 = np.clip(x2, 0, max_shape[1])
        y2 = np.clip(y2, 0, max_shape[0])
return np.stack([x1, y1, x2, y2], axis=-1)
def distance2kps(points, distance, max_shape=None):
"""Decode distance prediction to bounding box.
Args:
points (Tensor): Shape (n, 2), [x, y].
distance (Tensor): Distance from the given point to 4
boundaries (left, top, right, bottom).
max_shape (tuple): Shape of the image.
Returns:
Tensor: Decoded bboxes.
"""
preds = []
for i in range(0, distance.shape[1], 2):
px = points[:, i%2] + distance[:, i]
py = points[:, i%2+1] + distance[:, i+1]
        if max_shape is not None:
            px = np.clip(px, 0, max_shape[1])
            py = np.clip(py, 0, max_shape[0])
preds.append(px)
preds.append(py)
return np.stack(preds, axis=-1)
def nms_det(dets, thresh=0.4):
if dets.size == 0:
return []
x1, y1, x2, y2, scores = dets[:, 0], dets[:, 1], dets[:, 2], dets[:, 3], dets[:, 4]
areas = (x2 - x1 + 1) * (y2 - y1 + 1)
order = scores.argsort()[::-1]
keep = []
while order.size > 0:
i = order[0]
keep.append(i)
xx1 = np.maximum(x1[i], x1[order[1:]])
yy1 = np.maximum(y1[i], y1[order[1:]])
xx2 = np.minimum(x2[i], x2[order[1:]])
yy2 = np.minimum(y2[i], y2[order[1:]])
w = np.maximum(0.0, xx2 - xx1 + 1)
h = np.maximum(0.0, yy2 - yy1 + 1)
inter = w * h
ovr = inter / (areas[i] + areas[order[1:]] - inter)
inds = np.where(ovr <= thresh)[0]
order = order[inds + 1]
return keep
class RetinaFaceTRT:
def __init__(self, engine_path, det_thresh=0.5, nms_thresh=0.4, input_size=(640, 640)):
self.runner = TrtRunner(engine_path)
self.det_thresh = det_thresh
self.nms_thresh = nms_thresh
self.input_size = tuple(input_size) # (w, h)
self.input_mean = 127.5
self.input_std = 128.0
dummy = (1, 3, self.input_size[1], self.input_size[0])
self.runner.allocate(dummy)
self.outputs_count = len(self.runner.out_shapes)
logger.debug(f"TRT outputs_count={self.outputs_count}")
# default assumptions
if self.outputs_count in (6, 9):
self.fmc = 3
self._feat_stride_fpn = [8, 16, 32]
elif self.outputs_count in (10, 15):
self.fmc = 5
self._feat_stride_fpn = [8, 16, 32, 64, 128]
else:
self.fmc = None
self._feat_stride_fpn = []
# analyze bindings to build a robust map
# self._head_map = self._analyze_outputs(self.runner.out_shapes, self.input_size)
# self.use_kps = any('kps' in heads for heads in self._head_map.values())
# logger.info(f"Head map strides={sorted(self._head_map.keys())}, use_kps={self.use_kps}")
self.center_cache = {}
logger.debug(f"fmc={self.fmc}")
logger.debug(f"_feat_stride_fpn={self._feat_stride_fpn}")
def _analyze_outputs(self, out_shapes, input_size):
# Returns: { stride: { 'scores': meta, 'bbox': meta, 'kps': meta? } }
# meta = { 'index': int, 'layout': 'NCHW'|'NHWC', 'A': int, 'dims': int, 'H': int, 'W': int }
input_w, input_h = input_size
valid_strides = {8, 16, 32, 64, 128}
head_map = {}
for idx, shp in enumerate(out_shapes):
# accept rank-4 only
if len(shp) != 4:
continue
n, d1, d2, d3 = shp
# try NCHW
cand = []
for layout in ('NCHW', 'NHWC'):
if layout == 'NCHW':
C, H, W = d1, d2, d3
else:
H, W, C = d1, d2, d3
if H <= 0 or W <= 0:
continue
if (input_h % H) != 0 or (input_w % W) != 0:
continue
stride_h = input_h // H
stride_w = input_w // W
if stride_h != stride_w or stride_h not in valid_strides:
continue
stride = stride_h
# classify by channel count
role = None
A = 1
dims = None
if C % 4 == 0:
role = 'bbox'
dims = 4
A = C // 4
if C % 10 == 0:
# prefer kps if exactly divisible by 10
role = 'kps'
dims = 10
A = C // 10
# scores can be 1 or 2 per anchor; resolve last
if role is None:
role = 'scores'
# let reshape auto-detect per-anchor dims (1 or 2) using A from bbox later
dims = 0 # auto
A = 1
cand.append((stride, layout, role, A, dims, H, W))
if not cand:
continue
# pick the first candidate; for ambiguous cases NCHW usually correct
stride, layout, role, A, dims, H, W = cand[0]
meta = {'index': idx, 'layout': layout, 'A': int(A), 'dims': int(dims), 'H': int(H), 'W': int(W)}
if stride not in head_map:
head_map[stride] = {}
# if multiple candidates claim same role, prefer one with expected dims (bbox=4, kps=10, scores<=2)
if role in head_map[stride]:
prev = head_map[stride][role]
prefer = (role == 'bbox' and dims == 4) or (role == 'kps' and dims == 10) or (role == 'scores' and dims <= 2)
if prefer:
head_map[stride][role] = meta
else:
head_map[stride][role] = meta
# sanity: ensure bbox and scores for each stride exist; drop incomplete strides
for s in list(head_map.keys()):
if 'bbox' not in head_map[s] or 'scores' not in head_map[s]:
del head_map[s]
# if user specified fmc/strides earlier, filter to them; else keep discovered ones
if self._feat_stride_fpn:
head_map = {s: head_map[s] for s in self._feat_stride_fpn if s in head_map}
logger.debug(f"Built head_map: { {s: list(head_map[s].keys()) for s in head_map} }")
return head_map
def _reshape_head(self, arr, meta):
# Returns flattened per-location per-anchor array with shape:
# scores: (K*A,) or (K*A,2)
# bbox: (K*A,4)
# kps: (K*A,kps_dim)
layout, A, dims, H, W = meta['layout'], meta['A'], meta['dims'], meta['H'], meta['W']
if layout == 'NCHW':
# arr shape: (1, C, H, W)
arr = arr.reshape(1, -1, H, W)
arr = np.transpose(arr, (0, 2, 3, 1)) # (1,H,W,C)
else:
# arr shape: (1, H, W, C)
arr = arr.reshape(1, H, W, -1)
C = arr.shape[-1]
# auto-derive per-anchor dims if requested (dims <= 0)
if dims is None or dims <= 0:
if A > 0 and C % A == 0:
dims = C // A
else:
dims = C
# final shape: (H*W*A, dims)
out = arr.reshape(-1, C)
if dims == 1:
return out.reshape(-1)
out = out.reshape(-1, dims)
return out
def _preprocess(self, img):
ih, iw = img.shape[:2]
in_w, in_h = self.input_size
im_ratio = ih / float(iw)
model_ratio = in_h / float(in_w)
if im_ratio > model_ratio:
new_h = in_h
new_w = int(round(new_h / im_ratio))
else:
new_w = in_w
new_h = int(round(new_w * im_ratio))
resized = cv2.resize(img, (new_w, new_h))
det_img = np.zeros((in_h, in_w, 3), dtype=np.uint8)
det_img[:new_h, :new_w, :] = resized
input_size = (in_w, in_h)
blob = cv2.dnn.blobFromImage(det_img, 1.0 / self.input_std, input_size,
(self.input_mean, self.input_mean, self.input_mean), swapRB=True)
det_scale = new_h / float(ih)
return blob, det_scale
def _reorder_retinaface_outputs(self, outputs):
"""
Reorder interleaved RetinaFace heads [score,bbox,kps]* into InsightFace order:
[scores_s8.., scores_s16.., scores_s32.., bboxes_*, kps_*]
Applies only when outputs match expected shapes:
- 9 heads with last-dims {1,4,10} (with kps), or
- 6 heads with last-dims {1,4} (no kps).
"""
try:
if not isinstance(outputs, (list, tuple)) or len(outputs) not in (6, 9):
return outputs
# Collect per-output (N, D) where D is last-dim (1 for scores, 4 for bbox, 10 for kps)
shapes = [o.shape for o in outputs]
# Only handle rank-2 heads: (N, D)
if any(len(s) != 2 for s in shapes):
return outputs
Ns = [s[0] for s in shapes]
Ds = [s[1] for s in shapes]
uniq_dims = sorted(set(Ds))
# Validate expected head dims
if len(outputs) == 9 and uniq_dims != [1, 4, 10]:
return outputs
if len(outputs) == 6 and uniq_dims != [1, 4]:
return outputs
# Group heads by their last-dimension (1=scores, 4=bbox, 10=kps)
groups = {d: [] for d in uniq_dims}
for o in outputs:
groups[o.shape[1]].append(o)
# Sort each group by N descending so strides are [s8, s16, s32]
for d in groups:
groups[d].sort(key=lambda a: a.shape[0], reverse=True)
order_dims = [1, 4] + ([10] if 10 in groups else [])
reordered = []
for d in order_dims:
reordered.extend(groups[d])
return reordered
except Exception:
# Fail-safe: if anything unexpected, return original order
return outputs
def detect(self, img, max_num=0, metric='default'):
blob, det_scale = self._preprocess(img)
logger.debug(f"Preprocess: blob.shape={blob.shape}, det_scale={det_scale:.6f}")
t0 = time.time()
net_outs = self.runner.infer(blob)
net_outs = self._reorder_retinaface_outputs(net_outs)
t_det = time.time() - t0
logger.debug(f"TRT detect time={t_det*1000:.2f} ms, outputs={len(net_outs)}")
for i, o in enumerate(net_outs):
logger.debug(f" output[{i}] shape={o.shape}")
# logger.debug(f"Detect net outputs={net_outs}")
input_height = blob.shape[2]
input_width = blob.shape[3]
        # det_2.5g: 3 FPN levels (strides 8/16/32), 2 anchors per location, with keypoints
        fmc = 3
        _num_anchors = 2
        _feat_stride_fpn = [8, 16, 32]
        use_kps = True
scores_list, bboxes_list, kpss_list = [], [], []
for idx, stride in enumerate(_feat_stride_fpn):
logger.debug(f"idx={idx}, stride={stride}")
scores = net_outs[idx]
bbox_preds = net_outs[idx+fmc]
bbox_preds = bbox_preds * stride
logger.debug(f"bbox_preds.spahe={bbox_preds.shape}")
if use_kps:
kps_preds = net_outs[idx+fmc*2] * stride
height = input_height // stride
width = input_width // stride
K = height * width
key = (height, width, stride)
if key in self.center_cache:
anchor_centers = self.center_cache[key]
else:
anchor_centers = np.stack(np.mgrid[:height, :width][::-1], axis=-1).astype(np.float32)
anchor_centers = (anchor_centers * stride).reshape( (-1, 2) )
if _num_anchors>1:
anchor_centers = np.stack([anchor_centers]*_num_anchors, axis=1).reshape( (-1,2) )
if len(self.center_cache)<100:
self.center_cache[key] = anchor_centers
pos_inds = np.where(scores>=self.det_thresh)[0]
bboxes = distance2bbox(anchor_centers, bbox_preds)
pos_scores = scores[pos_inds]
pos_bboxes = bboxes[pos_inds]
scores_list.append(pos_scores)
bboxes_list.append(pos_bboxes)
if use_kps:
kpss = distance2kps(anchor_centers, kps_preds)
#kpss = kps_preds
kpss = kpss.reshape( (kpss.shape[0], -1, 2) )
pos_kpss = kpss[pos_inds]
kpss_list.append(pos_kpss)
logger.debug(f"score_list={scores_list}")
logger.debug(f"bboxes_list={bboxes_list}")
logger.debug(f"kpss_list={kpss_list}")
        scores = np.vstack(scores_list) if scores_list else np.zeros((0, 1), dtype=np.float32)
        # scores_list always has one entry per stride, so check the row count instead
        if scores.shape[0] == 0:
            logger.debug("No detections above threshold")
            return np.zeros((0, 5), dtype=np.float32), None, t_det
order = scores.ravel().argsort()[::-1]
bboxes = (np.vstack(bboxes_list) / det_scale).astype(np.float32, copy=False)
pre_det = np.hstack((bboxes, scores)).astype(np.float32, copy=False)
logger.debug(f"pre_det.shape={pre_det.shape}, top_score={pre_det[:,4].max():.4f}")
pre_det = pre_det[order, :]
keep = nms_det(pre_det, self.nms_thresh)
logger.debug(f"NMS keep={len(keep)} of {pre_det.shape[0]} (nms_thresh={self.nms_thresh})")
det = pre_det[keep, :]
kpss = None
if use_kps and kpss_list:
kpss = (np.vstack(kpss_list) / det_scale).astype(np.float32, copy=False)
kpss = kpss[order, :, :][keep, :, :]
if max_num > 0 and det.shape[0] > max_num:
logger.debug(f"Limiting detections to max_num={max_num} from {det.shape[0]}")
area = (det[:, 2] - det[:, 0]) * (det[:, 3] - det[:, 1])
img_center = img.shape[0] // 2, img.shape[1] // 2
offsets = np.vstack([
(det[:, 0] + det[:, 2]) / 2 - img_center[1],
(det[:, 1] + det[:, 3]) / 2 - img_center[0]
])
offset_dist_squared = np.sum(np.power(offsets, 2.0), 0)
values = area if metric == 'max' else area - offset_dist_squared * 2.0
bindex = np.argsort(values)[::-1][:max_num]
det = det[bindex, :]
if kpss is not None:
kpss = kpss[bindex, :]
return det, kpss, t_det
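A minimal detection sketch; the engine and image paths are placeholders:
from retinaface import RetinaFaceTRT
import cv2
detector = RetinaFaceTRT("det_2.5g.plan", det_thresh=0.5)
img = cv2.imread("test.jpg")               # BGR image
dets, kpss, t_det = detector.detect(img)   # dets: (N, 5) = [x1, y1, x2, y2, score]
print(f"{dets.shape[0]} faces in {t_det*1000:.1f} ms")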
Gender and age
attribute.py
from common import TrtRunner
from loguru import logger
import numpy as np
import cv2
import pycuda.autoinit # initializes CUDA context
from skimage import transform as trans
def transform(data, center, output_size, scale, rotation):
scale_ratio = scale
rot = float(rotation) * np.pi / 180.0
#translation = (output_size/2-center[0]*scale_ratio, output_size/2-center[1]*scale_ratio)
t1 = trans.SimilarityTransform(scale=scale_ratio)
cx = center[0] * scale_ratio
cy = center[1] * scale_ratio
t2 = trans.SimilarityTransform(translation=(-1 * cx, -1 * cy))
t3 = trans.SimilarityTransform(rotation=rot)
t4 = trans.SimilarityTransform(translation=(output_size / 2,
output_size / 2))
t = t1 + t2 + t3 + t4
M = t.params[0:2]
cropped = cv2.warpAffine(data,
M, (output_size, output_size),
borderValue=0.0)
return cropped, M
class AttributeTRT:
def __init__(self, engine_path):
self.runner = TrtRunner(engine_path)
self.input_mean = 0.0
self.input_std = 1.0
        in_shape = self.runner.engine.get_binding_shape(self.runner.input_indices[0])
        # genderage expects a 96x96 input; fall back to 96 when the engine is dynamic
        h = 96 if -1 in in_shape else in_shape[2]
        w = 96 if -1 in in_shape else in_shape[3]
        self.input_size = (h, w)
def infer(self, img, face):
bbox = face.bbox
w, h = (bbox[2] - bbox[0]), (bbox[3] - bbox[1])
center = (bbox[2] + bbox[0]) / 2, (bbox[3] + bbox[1]) / 2
rotate = 0
_scale = self.input_size[0] / (max(w, h)*1.5)
aimg, M = transform(img, center, self.input_size[0], _scale, rotate)
input_size = tuple(aimg.shape[0:2][::-1])
blob = cv2.dnn.blobFromImage(aimg, 1.0/self.input_std, input_size,
(self.input_mean, self.input_mean, self.input_mean), swapRB=True)
req_shape = blob.shape
cur_shape = tuple(self.runner.context.get_binding_shape(self.runner.input_indices[0]))
if (not self.runner._allocated) or (cur_shape != req_shape):
self.runner.allocate(req_shape)
pred = self.runner.infer(blob)[0][0]
        pred = np.asarray(pred).reshape(-1)
        if pred.shape[0] == 3:
            # genderage head: pred[:2] = gender scores, pred[2] = age / 100
            gender = int(np.argmax(pred[:2]))
            age = int(np.round(pred[2] * 100))
            return gender, age
        return pred
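Usage sketch, reusing img and dets from the detection example above (engine name is a placeholder):
from attribute import AttributeTRT
from common import Face
attr = AttributeTRT("genderage.plan")
face = Face(bbox=dets[0])             # bbox from RetinaFaceTRT.detect()
gender, age = attr.infer(img, face)   # gender: 1 = 'M', 0 = 'F'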
Landmarks
landmark.py
from common import TrtRunner, affine_crop
from loguru import logger
import numpy as np
import cv2
import pycuda.autoinit # initializes CUDA context
# ----------------------------
# Landmark, Attribute, ArcFace TRT wrappers
# ----------------------------
def trans_points(pts, M):
# pts: (N,2) or (N,3) where last dim z not used for transform
pts2 = pts.copy()
xy = pts2[:, :2]
ones = np.ones((xy.shape[0], 1), dtype=xy.dtype)
xy1 = np.hstack([xy, ones])
dst = xy1 @ M.T
pts2[:, 0:2] = dst
return pts2
class LandmarkTRT:
def __init__(self, engine_path):
self.runner = TrtRunner(engine_path)
# Match InsightFace Landmark preprocessing (raw pixels)
self.input_mean = 0.0
self.input_std = 1.0
def infer(self, img, bbox):
# allocate using engine's declared input if static, else 192x192
# Read needed input size from binding shape (assume (1,3,H,W))
# If dynamic, we use 192x192; can adjust if your model differs
in_shape = self.runner.engine.get_binding_shape(self.runner.input_indices[0])
H = 192 if -1 in in_shape else in_shape[2]
W = 192 if -1 in in_shape else in_shape[3]
assert H == W, "Landmark input must be square"
aimg, M = affine_crop(img, bbox, H)
blob = cv2.dnn.blobFromImage(aimg, 1.0 / self.input_std, (W, H),
(self.input_mean, self.input_mean, self.input_mean), swapRB=True).astype(np.float32)
logger.debug(f"blob={blob}")
# Ensure allocation before inference
req_shape = blob.shape
cur_shape = tuple(self.runner.context.get_binding_shape(self.runner.input_indices[0]))
if (not self.runner._allocated) or (cur_shape != req_shape):
self.runner.allocate(req_shape)
        out = self.runner.infer(blob)[0][0]
        if out.shape[0] >= 3000:
            # 3D landmark head (e.g. 1k3d68): (x, y, z) triplets in [-1, 1]
            pred = out.reshape((-1, 3))
            pred[:, 0:2] += 1
            pred[:, 0:2] *= (H // 2)
            pred[:, 2] *= (H // 2)
        else:
            # 2D landmark head (e.g. 2d106det): (x, y) pairs in [-1, 1]
            pred = out.reshape((-1, 2))
            pred[:, 0:2] += 1
            pred[:, 0:2] *= (H // 2)
IM = cv2.invertAffineTransform(M)
pred = trans_points(pred, IM)
return pred
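Usage sketch, continuing the same example; the bbox comes straight from the detector:
from landmark import LandmarkTRT
lmk = LandmarkTRT("2d106det.plan")
pts = lmk.infer(img, dets[0])   # (106, 2) points mapped back to image coordinates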
Feature embedding
arcface.py
from common import TrtRunner
from loguru import logger
import numpy as np
import cv2
import pycuda.autoinit # initializes CUDA context
class ArcFaceTRT:
def __init__(self, engine_path):
self.runner = TrtRunner(engine_path)
self.input_mean = 127.5
self.input_std = 127.5
# arcface template for 112x112
self.dst5 = np.array([
[38.2946, 51.6963],
[73.5318, 51.5014],
[56.0252, 71.7366],
[41.5493, 92.3655],
[70.7299, 92.2041],
], dtype=np.float32)
def align_by_5p(self, img, kps5, out_size=112):
src = kps5.astype(np.float32)
dst = self.dst5.copy()
if out_size != 112:
scale = out_size / 112.0
dst *= scale
        M, _ = cv2.estimateAffinePartial2D(src, dst, method=cv2.LMEDS)
        assert M is not None, "5-point alignment failed"
aimg = cv2.warpAffine(img, M, (out_size, out_size), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT, borderValue=0)
return aimg
def embed(self, img, kps5):
in_shape = self.runner.engine.get_binding_shape(self.runner.input_indices[0])
H = 112 if -1 in in_shape else in_shape[2]
W = 112 if -1 in in_shape else in_shape[3]
aimg = self.align_by_5p(img, kps5, out_size=H)
blob = cv2.dnn.blobFromImage(aimg, 1.0 / self.input_std, (W, H),
(self.input_mean, self.input_mean, self.input_mean), swapRB=True).astype(np.float32)
# Ensure allocation before inference
req_shape = blob.shape
cur_shape = tuple(self.runner.context.get_binding_shape(self.runner.input_indices[0]))
if (not self.runner._allocated) or (cur_shape != req_shape):
self.runner.allocate(req_shape)
feat = self.runner.infer(blob)[0]
feat = feat.reshape(-1).astype(np.float32)
n = np.linalg.norm(feat) + 1e-12
return feat / n
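Putting the pieces together, a hedged end-to-end sketch (engine filenames are placeholders, error handling omitted):
import cv2
import numpy as np
from retinaface import RetinaFaceTRT
from arcface import ArcFaceTRT
detector = RetinaFaceTRT("det_2.5g.plan")
recognizer = ArcFaceTRT("w600k_r50.plan")
img = cv2.imread("test.jpg")
dets, kpss, _ = detector.detect(img)
embs = [recognizer.embed(img, kps) for kps in kpss]   # L2-normalized 512-dim embeddings
# embeddings are normalized, so cosine similarity is a plain dot product:
# sim = float(np.dot(embs[0], other_emb))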