前言
星瑞 O6 的 AI 能力依托先进的技术架构得以展现。其采用 Armv9 架构,集成了 Arm®v9 CPU 核心、Arm Immortalis™ GPU 以及安谋科技 “周易” NPU。这样的异构架构设计是其 AI 性能的基石,在处理 AI 相关任务时,CPU、GPU、NPU 能够协同工作,大大提升 AI 模型的运行效率。
其中,“周易” NPU 发挥着关键作用,它拥有高达 30 TOPS 的算力。如此强大的算力使得星瑞 O6 能够成功适配多款主流 AI 模型,从而满足生成式 AI 在多元端侧场景的应用需求。
这里我们用 NPU 做目标检测,再配合 supervision 库做跟踪,进行车辆行驶速度的估计。
1. 整体步骤
1.1 下载supervision代码
https://github.com/roboflow/supervision
下载测试视频:https://media.roboflow.com/supervision/video-examples/vehicles.mp4
1.2 代码集成
supervision 中已经提供了行车速度估计的 demo,但它使用的是其他检测框架。这里我们需要做的是将检测部分替换成星瑞 O6 的 NPU 后端,具体细节就不展开了,详细代码如下:
import argparse
from collections import defaultdict, deque
import os
import cv2
import numpy as np
import torch
import time
import sys
sys.path.insert(0, "/home/radxa/1_AI_models/supervision-0.25.0")
_abs_path = os.path.join(os.path.dirname("/home/radxa/1_AI_models/ai_model_hub/models/ComputeVision/Object_Detection/onnx_yolov8_l/inference_npu.py"), "../../../../")
sys.path.append(_abs_path)
from utils.image_process import preprocess_object_detect_method1
from utils.object_detect_postprocess import postprocess_yolo, xywh2xyxy
from utils.NOE_Engine import EngineInfer
from PIL import Image
import supervision as sv
# Road region of interest as a quadrilateral in source-frame pixel
# coordinates. The corner order pairs one-to-one with TARGET below, and
# corners are allowed to fall outside the frame (note the negative x).
SOURCE = np.array(
    [
        [1252, 787],
        [2298, 803],
        [5039, 2159],
        [-550, 2159],
    ]
)

# Extent of the rectified plane that SOURCE is warped onto.
# NOTE(review): presumably metres, because the speed derived from these
# coordinates is multiplied by 3.6 and labelled "km/h" -- confirm.
TARGET_WIDTH = 25
TARGET_HEIGHT = 250

# Corners of the rectified plane, in the same order as SOURCE:
# (0, 0), (W - 1, 0), (W - 1, H - 1), (0, H - 1).
TARGET = np.array([[0, 0], [1, 0], [1, 1], [0, 1]]) * (
    TARGET_WIDTH - 1,
    TARGET_HEIGHT - 1,
)
class CIX_MODEL(object):
    """Callable detector that runs a compiled model through ``EngineInfer``.

    Calling an instance with an image array returns the post-processed
    detections with their first four columns rewritten as corner-format
    boxes scaled back to the input image.
    """

    def __init__(self, model_path):
        """Load the compiled model found at *model_path*."""
        self.model = EngineInfer(model_path)

    def __call__(self, pilimage):
        """Run detection on one frame.

        Args:
            pilimage: Image as an array accepted by ``PIL.Image.fromarray``
                (despite the name, it is converted to a PIL image here).

        Returns:
            The array produced by ``postprocess_yolo`` with columns 0-3
            replaced by rescaled ``xyxy`` boxes.
            NOTE(review): the caller reads column 4 as confidence and
            column 5 as class id -- confirm against ``postprocess_yolo``.
        """
        image = Image.fromarray(pilimage)
        src_shape, new_shape, _, data = preprocess_object_detect_method1(
            image, target_size=(640, 640), mode="BGR"
        )

        raw_output = self.model.forward(data.astype(np.float32))[0]
        # Assumes the head emits 84 values for each of 8400 candidates
        # (4 box values + 80 class scores?) -- TODO confirm. Transposed so
        # that each row is one candidate.
        pred = np.reshape(raw_output, (84, 8400)).T

        # NOTE(review): 0.3 / 0.45 look like confidence and NMS-IoU
        # thresholds -- verify against postprocess_yolo's signature.
        results = postprocess_yolo(pred, 0.3, 0.45)

        boxes = xywh2xyxy(results[:, :4])
        # Map boxes from the network input size back to the source image.
        x_scale = src_shape[1] / new_shape[1]
        y_scale = src_shape[0] / new_shape[0]
        scale = (x_scale, y_scale, x_scale, y_scale)
        boxes *= scale
        results[:, :4] = boxes
        return results
def add_fps(image, fps, thickness):
    """Draw an ``FPS: xx.xx`` label near the top-right corner of *image*.

    The image is modified in place; nothing is returned.

    Args:
        image: Three-dimensional image array (height, width, channels).
        fps: Frame rate to display, formatted with two decimals.
        thickness: Stroke thickness used for measuring and drawing the text.
    """
    _, frame_width, _ = image.shape
    label = f"FPS: {fps:.2f}"
    font_face = cv2.FONT_HERSHEY_SIMPLEX
    scale = 2
    colour = (0, 0, 255)  # red when the image is in BGR channel order

    (label_width, label_height), _ = cv2.getTextSize(
        label, font_face, scale, thickness
    )
    # Keep a 10-pixel margin from the right and top edges.
    origin = (frame_width - label_width - 10, label_height + 10)
    cv2.putText(image, label, origin, font_face, scale, colour, thickness)
class ViewTransformer:
    """Project 2-D points from one quadrilateral onto another."""

    def __init__(self, source: np.ndarray, target: np.ndarray) -> None:
        """Compute the perspective matrix mapping *source* onto *target*.

        Args:
            source: Four corner points in the original view.
            target: The four corresponding points in the destination view.
        """
        # Both corner sets are cast to float32 before being handed to OpenCV.
        self.m = cv2.getPerspectiveTransform(
            source.astype(np.float32), target.astype(np.float32)
        )

    def transform_points(self, points: np.ndarray) -> np.ndarray:
        """Apply the stored perspective matrix to *points*.

        Args:
            points: Array of ``(x, y)`` pairs.

        Returns:
            A float32 array of shape ``(-1, 2)`` with the projected points,
            or *points* itself (untouched) when it has no elements.
        """
        if not points.size:
            return points
        # cv2.perspectiveTransform is fed an (N, 1, 2) float32 array.
        column = points.reshape(-1, 1, 2).astype(np.float32)
        return cv2.perspectiveTransform(column, self.m).reshape(-1, 2)
def parse_arguments() -> argparse.Namespace:
    """Parse the command-line options of the speed-estimation script.

    Returns:
        Namespace with ``source_video_path`` and ``target_video_path``
        (both required strings) plus ``confidence_threshold`` (float,
        default 0.3) and ``iou_threshold`` (float, default 0.7).
    """
    parser = argparse.ArgumentParser(
        description="Vehicle Speed Estimation using Ultralytics and Supervision"
    )

    required_paths = (
        ("--source_video_path", "Path to the source video file"),
        ("--target_video_path", "Path to the target video file (output)"),
    )
    for flag, help_text in required_paths:
        parser.add_argument(flag, type=str, required=True, help=help_text)

    thresholds = (
        ("--confidence_threshold", 0.3, "Confidence threshold for the model"),
        ("--iou_threshold", 0.7, "IOU threshold for the model"),
    )
    for flag, default, help_text in thresholds:
        parser.add_argument(flag, type=float, default=default, help=help_text)

    return parser.parse_args()
if __name__ == "__main__":
    # Script entry point: detect vehicles on every frame, track them, estimate
    # their speed on the rectified road plane, then write and display the
    # annotated frames.
    args = parse_arguments()
    video_info = sv.VideoInfo.from_video_path(video_path=args.source_video_path)
    model = CIX_MODEL("/home/radxa/1_AI_models/ai_model_hub/asserts/models/yolov8_l/yolov8_l.cix")

    byte_track = sv.ByteTrack(
        frame_rate=video_info.fps,
        track_activation_threshold=args.confidence_threshold,
    )

    # Drawing parameters derived from the video resolution.
    resolution_wh = video_info.resolution_wh
    thickness = sv.calculate_optimal_line_thickness(resolution_wh=resolution_wh)
    text_scale = sv.calculate_optimal_text_scale(resolution_wh=resolution_wh)

    box_annotator = sv.BoxAnnotator(thickness=thickness)
    label_annotator = sv.LabelAnnotator(
        text_scale=text_scale,
        text_thickness=thickness,
        text_position=sv.Position.BOTTOM_CENTER,
    )
    trace_annotator = sv.TraceAnnotator(
        thickness=thickness,
        trace_length=video_info.fps * 2,
        position=sv.Position.BOTTOM_CENTER,
    )

    frame_generator = sv.get_video_frames_generator(source_path=args.source_video_path)
    polygon_zone = sv.PolygonZone(polygon=SOURCE)
    view_transformer = ViewTransformer(source=SOURCE, target=TARGET)

    # Per-tracker history of rectified y coordinates, capped at `fps` samples.
    coordinates = defaultdict(lambda: deque(maxlen=video_info.fps))

    with sv.VideoSink(args.target_video_path, video_info) as sink:
        for frame in frame_generator:
            tick = time.perf_counter()

            raw = model(frame)
            detections = sv.Detections(
                xyxy=raw[:, :4],
                confidence=raw[:, 4],
                class_id=raw[:, 5].astype(int),
            )
            # Filter by confidence, then by the road polygon, then run NMS.
            confident = detections.confidence > args.confidence_threshold
            detections = detections[confident]
            inside_zone = polygon_zone.trigger(detections)
            detections = detections[inside_zone]
            detections = detections.with_nms(threshold=args.iou_threshold)
            detections = byte_track.update_with_detections(detections=detections)

            # Project each box's bottom-centre anchor onto the rectified plane
            # and remember only its y coordinate.
            anchors = detections.get_anchors_coordinates(
                anchor=sv.Position.BOTTOM_CENTER
            )
            plane_points = view_transformer.transform_points(points=anchors).astype(int)
            for tracker_id, (_, plane_y) in zip(detections.tracker_id, plane_points):
                coordinates[tracker_id].append(plane_y)

            labels = []
            for tracker_id in detections.tracker_id:
                history = coordinates[tracker_id]
                label = f"#{tracker_id}"
                # A speed is shown only once at least fps / 2 samples exist.
                if len(history) >= video_info.fps / 2:
                    travelled = abs(history[-1] - history[0])
                    # NOTE(review): n samples span n - 1 frame intervals, so
                    # dividing by n / fps slightly underestimates the speed.
                    elapsed = len(history) / video_info.fps
                    # x 3.6 converts units-per-second to km/h, presumably
                    # with plane units in metres -- confirm.
                    label += f" {int(travelled / elapsed * 3.6)} km/h"
                labels.append(label)

            # The timed span covers detection, tracking and labelling only.
            tock = time.perf_counter()

            annotated_frame = frame.copy()
            for annotator in (trace_annotator, box_annotator):
                annotated_frame = annotator.annotate(
                    scene=annotated_frame, detections=detections
                )
            annotated_frame = label_annotator.annotate(
                scene=annotated_frame, detections=detections, labels=labels
            )
            add_fps(annotated_frame, 1 / (tock - tick), thickness)

            # The same snapshot file is overwritten on every frame.
            cv2.imwrite("data/test.jpg", annotated_frame)
            sink.write_frame(annotated_frame)
            cv2.imshow("frame", annotated_frame)
            pressed_key = cv2.waitKey(1) & 0xFF
            if pressed_key == ord("q"):
                break
    cv2.destroyAllWindows()
1.3 运行demo
source /home/radxa/1_AI_models/ai_model_hub/.venv/bin/activate
python examples/speed_estimation/cix_example.py --source_video_path data/vehicles.mp4 --target_video_path data/vehicles-cixyolov8l.mp4 --confidence_threshold 0.3 --iou_threshold 0.5
会将可视化结果保存成视频。
https://www.bilibili.com/vide...
2.总结
整个 pipeline(检测 + 跟踪)跑下来只有 6.xx fps,而单独的检测部分是可以做到 20+ fps 的。