一. 前言
我们决定使用oak相机采集rgb图像,识别人体的姿态,然后将姿态转换为英文字母,然后作为控制信号利用socket发送到unity3d中对小车进行控制。
二. 代码功能模块
1. socket通信
函数 socket.socket 创建一个 socket,该函数带有Address Family和Type两个参数。
Address Family:可以选择AF_INET(用于 Internet 进程间通信) 或者AF_UNIX(用于同一台机器进程间通信),实际工作中常用AF_INET。
Type:套接字类型,可以是SOCK_STREAM(流式套接字,主要用于 TCP 协议)或者SOCK_DGRAM(数据报套接字,主要用于 UDP 协议)
以下是使用到的主要方法:
sk.bind(address)将套接字绑定到地址。address地址的格式取决于地址族。在AF_INET下,以元组(host, port)的形式表示地址。
sk.listen(backlog)开始监听传入连接。backlog指定在拒绝连接之前,可以挂起的最大连接数量。backlog等于5,表示内核已经接到了连接请求,但服务器还没有调用accept进行处理的连接个数最大为5。这个值不能无限大,因为要在内核中维护连接队列。
sk.accept()接受连接并返回(conn, address),其中conn是新的套接字对象,可以用来接收和发送数据。address是连接客户端的地址。接收TCP 客户的连接(阻塞式)等待连接的到来。
def connectServer():
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print("Socket created")
try:
sk.bind((HOST, PORT)) # 绑定端口
except socket.error as msg:
print(msg)
sys.exit()
print("Socket bind complete")
sk.listen(10) # 开始监听传入连接
conn, addr = sk.accept() # 接受连接并返回
print("Connected with " + addr[0] + ":" + str(addr[1]))
return conn, sk
2. 将人体姿态转换为英文字母
MovenetDepthai.py模块使用了COCO数据集使用的关键点评估指标。
我们要识别手部动作,关键在于利用5、6、7、8四个特征点,分别对应left_shoulder、right_shoulder、left_elbow、right_elbow,也就是左右肩膀和左右手肘。
手臂在不同位置时,赋予不同的值,思路如下:
- 将特征点坐标由以图像坐标原点转为换为以肩部特征点为原点
调用body.keypoints[KEYPOINT_DICT['xxx']]可以获得特定关键点在图像中的坐标。
然后简单地利用相减转换一下坐标。
- 计算连线与竖直方向的夹角

- 将以肩膀为圆心,肩膀特征点与手肘特征点为半径的圆分成八等份,计算手臂落在哪个区域内

- 制定手肘位置和字字母之间的映射关系
由于控制小车运动只需要几个方向以及一些控制的操作,因此无需要定义所有字母。
semaphore_flag = {
(3, 4): 'W', (2, 4): '-', (1, 4): 'A', (0, 4): '-',
(4, 7): 'S', (4, 6): '-', (4, 5): 'D', (2, 3): '-',
(0, 3): 'Q', (0, 6): '-', (3, 0): 'E', (3, 7): '-',
(3, 6): 'J', (3, 5): '-', (2, 1): 'K', (2, 0): '-',
(2, 7): 'P', (2, 6): '-', (2, 5): 'L', (1, 0): '-',
(1, 7): 'U', (0, 5): '-', (7, 6): 'F', (7, 5): '-',
(1, 6): '-', (5, 6): '-',
}
- 将思路转换为代码
def recognize_gesture(body):
def angle_with_y(v):
# v: 平面坐标(x, y)
# 返回v与图像平面y轴的角度
if v[1] == 0:
return 90
angle = atan2(v[0], v[1]) # tan(angle) = v[0] / v[1]
return degrees(angle)
# 识别左右两边的肩和肘部的坐标,若没有识别到就返回None
# 利用COCO数据集使用的关键点评估指标
# body.scores[] 关键点的信任度
if body.scores[KEYPOINT_DICT['right_elbow']] < body.score_thresh or \
body.scores[KEYPOINT_DICT['right_shoulder']] < body.score_thresh or \
body.scores[KEYPOINT_DICT['left_elbow']] < body.score_thresh or \
body.scores[KEYPOINT_DICT['left_shoulder']] < body.score_thresh:
return None
# body.keypoints[] 关键点在图像中的坐标
# 将肩部的关键点视为坐标原点,计算肩部与肘部连线与y轴的夹角
right_arm_angle = angle_with_y(body.keypoints[KEYPOINT_DICT['right_elbow']] - body.keypoints[KEYPOINT_DICT['right_shoulder']])
left_arm_angle = angle_with_y(body.keypoints[KEYPOINT_DICT['left_elbow']] - body.keypoints[KEYPOINT_DICT['left_shoulder']])
# 分成八等份
right_pose = int((right_arm_angle + 202.5) / 45) % 8
left_pose = int((left_arm_angle + 202.5) / 45) % 8
letter = semaphore_flag.get((right_pose, left_pose), None)
if letter is not None:
letter = letter + str(right_pose) + str(left_pose)
return letter
3. 对视频的每一帧进行识别
使用MovenetDepthai模块进行人体姿态识别,并将识别结果实时渲染到采集的视频画面中,然后在画面中加上文字表示的识别结果。
while True:
frame, body = pose.next_frame() # 在下一帧中运行blazepose算法
if frame is None:
break
frame = renderer.draw(frame, body) # 绘制2D骨骼轮廓
letter = recognize_gesture(body) # 进行手势识别
if letter:
cv2.putText(frame, letter, (frame.shape[1] // 2, 100), cv2.FONT_HERSHEY_PLAIN, 5, (0, 190, 255), 3) # 在画面上添加文字
conn.sendall(letter[0:1].encode()) # 将数据发送到连接的客户端上
if letter == "F":
conn.close()
sk.close()
break
key = renderer.waitKey(delay=1)
三. 整体代码
import cv2
import sys
import socket
import argparse
from math import atan2, degrees
from MovenetDepthai import MovenetDepthai, KEYPOINT_DICT
from MovenetRenderer import MovenetRenderer
sys.path.append("")
# HOST = "10.27.130.226" # 本机的ip地址
HOST = "10.27.209.121"
# HOST = "192.168.112.43"
PORT = 8888
semaphore_flag = {
(3, 4): 'W', (2, 4): '-', (1, 4): 'A', (0, 4): '-',
(4, 7): 'S', (4, 6): '-', (4, 5): 'D', (2, 3): '-',
(0, 3): 'Q', (0, 6): '-', (3, 0): 'E', (3, 7): '-',
(3, 6): 'J', (3, 5): '-', (2, 1): 'K', (2, 0): '-',
(2, 7): 'P', (2, 6): '-', (2, 5): 'L', (1, 0): '-',
(1, 7): 'U', (0, 5): '-', (7, 6): 'F', (7, 5): '-',
(1, 6): '-', (5, 6): '-',
}
def connectServer():
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print("Socket created")
try:
sk.bind((HOST, PORT)) # 绑定端口
except socket.error as msg:
print(msg)
sys.exit()
print("Socket bind complete")
sk.listen(10) # 开始监听传入连接
conn, addr = sk.accept() # 接受连接并返回
print("Connected with " + addr[0] + ":" + str(addr[1]))
return conn, sk
def recognize_gesture(body):
def angle_with_y(v):
# v: 平面坐标(x, y)
# 返回v与图像平面y轴的角度
if v[1] == 0:
return 90
angle = atan2(v[0], v[1]) # tan(angle) = v[0] / v[1]
return degrees(angle)
# 识别左右两边的肩和肘部的坐标,若没有识别到就返回None
# 利用COCO数据集使用的关键点评估指标
# body.scores[] 关键点的信任度
if body.scores[KEYPOINT_DICT['right_elbow']] < body.score_thresh or \
body.scores[KEYPOINT_DICT['right_shoulder']] < body.score_thresh or \
body.scores[KEYPOINT_DICT['left_elbow']] < body.score_thresh or \
body.scores[KEYPOINT_DICT['left_shoulder']] < body.score_thresh:
return None
# body.keypoints[] 关键点在图像中的坐标
# 将肩部的关键点视为坐标原点,计算肩部与肘部连线与y轴的夹角
right_arm_angle = angle_with_y(body.keypoints[KEYPOINT_DICT['right_elbow']] - body.keypoints[KEYPOINT_DICT['right_shoulder']])
left_arm_angle = angle_with_y(body.keypoints[KEYPOINT_DICT['left_elbow']] - body.keypoints[KEYPOINT_DICT['left_shoulder']])
# 分成八等份
right_pose = int((right_arm_angle + 202.5) / 45) % 8
left_pose = int((left_arm_angle + 202.5) / 45) % 8
letter = semaphore_flag.get((right_pose, left_pose), None)
if letter is not None:
letter = letter + str(right_pose) + str(left_pose)
return letter
# 默认参数input='rgb', model='thunder', output=None
parser = argparse.ArgumentParser()
parser.add_argument("-m", "--model", type=str, choices=['lightning', 'thunder'], default='thunder',
help="Model to use (default=%(default)s")
parser.add_argument('-i', '--input', type=str, default='rgb',
help="'rgb' or 'rgb_laconic' or path to video/image file to use as input (default: %(default)s)")
parser.add_argument("-o", "--output",
help="Path to output video file")
args = parser.parse_args()
pose = MovenetDepthai(input_src=args.input, model=args.model)
renderer = MovenetRenderer(pose, output=args.output)
conn, sk = connectServer() # 创建socket连接
while True:
frame, body = pose.next_frame() # 在下一帧中运行blazepose算法
if frame is None:
break
frame = renderer.draw(frame, body) # 绘制2D骨骼轮廓
letter = recognize_gesture(body) # 进行手势识别
if letter:
cv2.putText(frame, letter, (frame.shape[1] // 2, 100), cv2.FONT_HERSHEY_PLAIN, 5, (0, 190, 255), 3) # 在画面上添加文字
conn.sendall(letter[0:1].encode()) # 将数据发送到连接的客户端上
if letter == "F":
conn.close()
sk.close()
break
key = renderer.waitKey(delay=1)
renderer.exit()
pose.exit()