xy/kk_tfcard_save_api.py
2025-12-03 16:13:38 +08:00

253 lines
11 KiB
Python

import os
import json
import random
from datetime import datetime
import time
from PIL import Image, ImageDraw
import numpy as np
import cv2
#pip install Pillow 2_28
#python -m pip install Pillow -i https://mirrors.aliyun.com/pypi/simple/
#python -m pip install spdlog -i https://mirrors.aliyun.com/pypi/simple/
# python -m pip install paramiko -i https://mirrors.aliyun.com/pypi/simple/
class save_data_manager:
def __init__(self,base_path,robot_id):
self.base_path_ = base_path # TF卡挂载点
self.robot_id_ = robot_id
#数据库对象
def save_data_to_tfcard_json(self,json_data,dir_name,json_type):
"""生成并保存数据到TF卡"""
now = datetime.now()
date_str = now.strftime("%Y%m%d")
hour_str = now.strftime("%H")
# 1. 创建目录
save_dir = os.path.join(self.base_path_, "data", "device_info", self.robot_id_, dir_name, date_str, f"{date_str}_{hour_str}")
os.makedirs(save_dir, exist_ok=True)
# 2. 创建文件
timestamp = None
if json_data['timestamp']:
timestamp = json_data['timestamp']
else:
timestamp = datetime.now().strftime("%Y%m%d_%H_%M_%S")
filename = f"{timestamp}_{self.robot_id_}_{json_type}.json"
# 3. 写入json
json_save_path = os.path.join(save_dir, filename)
print("json_path:",json_save_path)
with open(json_save_path, "w") as f:
json.dump(json_data, f, indent=2)
print(f"Saved face data: {filename}")
return json_save_path
def save_data_to_tfcard_jpg(self,frame_data,dir_name,jpg_type):
"""生成并保存数据到TF卡"""
now = datetime.now()
date_str = now.strftime("%Y%m%d")
hour_str = now.strftime("%H")
# 1. 创建目录
save_dir = os.path.join(self.base_path_, "data", "device_info", self.robot_id_, dir_name, date_str, f"{date_str}_{hour_str}")
os.makedirs(save_dir, exist_ok=True)
# 2. 创建文件
timestamp = datetime.now().strftime("%Y%m%d_%H_%M_%S")
filename = f"{timestamp}_{self.robot_id_}_{jpg_type}.jpg"
# 3. 写入jpg
jpg_save_path = os.path.join(save_dir, filename)
print("jpg_save_path:",jpg_save_path)
cv2.imwrite(jpg_save_path,frame_data)
return jpg_save_path
"""生成模拟人脸关键点数据"""
def generate_fake_face_data(self):
"""生成模拟人脸关键点数据"""
return {
"timestamp": datetime.now().strftime("%Y%m%d_%H_%M_%S"),
"robot_id": self.robot_id_,
"results": [
{"x": round(random.uniform(0, 200),0), "y": round(random.uniform(0, 1),0)} for _ in range(68)
],
"confidence": random.uniform(0.7, 0.99)
}
"""生成模拟人体姿态数据"""
def generate_fake_pose_data(self):
"""生成模拟人体姿态数据"""
keypoints = ["nose", "left_eye", "right_eye", "left_ear", "right_ear",
"left_shoulder", "right_shoulder", "left_elbow", "right_elbow",
"left_wrist", "right_wrist", "left_hip", "right_hip",
"left_knee", "right_knee", "left_ankle", "right_ankle"]
return {
"timestamp": datetime.now().strftime("%Y%m%d_%H_%M_%S"),
"robot_id": self.robot_id_,
"results": {kp: {
"x": round(random.uniform(0, 200),0),
"y": round(random.uniform(0, 200),0),
"score": round(random.uniform(0.7, 0.99),2)
} for kp in keypoints}
}
"""生成模拟-血样-传感器数据"""
def generate_fake_sensor_001_data(self):
print("生成模拟-血样-传感器数据")
mks_data_random = {
'heart_rate': random.randint(60, 100), # 假设 心率 在60到180之间
'blood_oxygen': random.randint(85, 100), # 假设 血氧 在85到100之间
'micro_circulation': random.randint(50, 100), # 假设 微循环值 在50到100之间
'fatigue_index': random.randint(0, 100), # 假设 疲劳 指数在0到100之间
'systolic_pressure': random.randint(90, 180), # 假设 收缩压 在90到180之间
'diastolic_pressure': random.randint(60, 120), # 假设 舒张压 在60到120之间
'cardiac_output': random.randint(40, 100) # 假设 心输出 在40到100之间
}
print("mks_data_random:",mks_data_random)
return {
"timestamp": datetime.now().strftime("%Y%m%d_%H_%M_%S"),
"robot_id": self.robot_id_,
"result": mks_data_random,
}
def generate_sensor_001_data(self,mks_data_random):
print("生成模拟-血样-传感器数据")
# mks_data_random = {
# 'heart_rate': random.randint(60, 100), # 假设 心率 在60到180之间
# 'blood_oxygen': random.randint(85, 100), # 假设 血氧 在85到100之间
# 'micro_circulation': random.randint(50, 100), # 假设 微循环值 在50到100之间
# 'fatigue_index': random.randint(0, 100), # 假设 疲劳 指数在0到100之间
# 'systolic_pressure': random.randint(90, 180), # 假设 收缩压 在90到180之间
# 'diastolic_pressure': random.randint(60, 120), # 假设 舒张压 在60到120之间
# 'cardiac_output': random.randint(40, 100) # 假设 心输出 在40到100之间
# }
print("mks_data_random:",mks_data_random)
return {
"timestamp": datetime.now().strftime("%Y%m%d_%H_%M_%S"),
"robot_id": self.robot_id_,
"result": mks_data_random,
}
"""生成模拟-毫米波-传感器数据"""
def generate_fake_sensor_002_data(self):
print("生成模拟-毫米波-传感器数据")
sensor_data = {
"Part1Rst": {
"Battery": 100,
"Status": ["01"],
"CountNum": random.randint(1, 4),
"IsFall": 0
},
"Part2Rst": {
"TargetNum": 1,
"Target0": {
"ID": 16,
"PtNum": 22,
"PointCloud": [
[-0.0691750283216743, 0.13160428640200078, 1.3956739328119288],
[-0.07847705523541838, 0.15392815246426594, 1.383871249877477],
[-0.08118316058836383, 0.15254387167066272, 1.3841153359302658],
[-0.08118316058836383, 0.15254387167066272, 1.3841153359302658],
[-0.0929625, 0.17252076850321676, 1.3695799337803236],
[-0.08985277963433623, 0.1735465559405953, 1.366252506565061],
[-0.08981986049799257, 0.1729446846963683, 1.3632120794907499],
[-0.09402106608514023, 0.2008075149795186, 1.3752118342759272],
[-0.16259914164055408, 0.3659265315697133, 1.2686130237385964],
[-0.16217132883698243, 0.36379021511710175, 1.2623032883896637],
[-0.16910310730617623, 0.3631439575505753, 1.2691036666139888],
[-0.1638440883521219, 0.3831408593633806, 1.2404052475851655],
[-0.17067092536679368, 0.3803548338347067, 1.2408964990548133],
[-0.17067092536679368, 0.3803548338347067, 1.2408964990548133],
[-0.17294653770501756, 0.4044264626613462, 1.2315388724510081],
[-0.17294653770501756, 0.4044264626613462, 1.2315388724510081],
[-0.17235096722025173, 0.40169857121649716, 1.224546809430924],
[-0.16789502279593474, 0.4368924567706128, 1.236433858829858],
[-0.16789502279593474, 0.4368924567706128, 1.236433858829858],
[-0.16789502279593474, 0.4368924567706128, 1.236433858829858],
[-0.16719017622805205, 0.5141370904782002, 1.2188803542752917],
[-0.16758717901763867, 0.5169416889917281, 1.227432168376347]
],
"CenterPoint": [-0.12395, 0.34706, 1.27605]
}
},
"Part3Rst": {
"TargetNum": 1,
"Target0": {
"ID": 16,
"Point": [-0.12395, 0.32227, 1.2512599999999998],
"HeartRate": random.randint(50, 65),
"BreathRate": 4
}
}
}
print("sensor_data:",sensor_data)
return {
"timestamp": datetime.now().strftime("%Y%m%d_%H_%M_%S"),
"robot_id": self.robot_id_,
"result": sensor_data,
}
"""生成模拟图片"""
def generate_fake_image(self,width=640, height=480):
"""生成模拟图片"""
img = Image.new("RGB", (width, height), color=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)))
draw = ImageDraw.Draw(img)
# 添加一些随机形状
for _ in range(random.randint(3, 10)):
x1 = random.randint(0, width)
y1 = random.randint(0, height)
x2 = random.randint(x1, width)
y2 = random.randint(y1, height)
draw.rectangle([x1, y1, x2, y2],
fill=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)))
# 将 PIL 图像转换为 NumPy 数组
numpy_image = np.array(img)
return numpy_image
if __name__ == "__main__":
base_path = "./sftp-data_1" # TF卡挂载点
robot_id = "d80801000058"
#json写入管理(完成)
test = save_data_manager(base_path,robot_id)
#数据插入管理
#数据库同步管理(完成)
#生成人脸数据
# face_data = test.generate_fake_face_data()
# test.save_data_to_tfcard_json(face_data,dir_name="face",json_type="face")
# #TODO 插入数据库
# #人体人脸数据
# pose_data = test.generate_fake_pose_data()
# test.save_data_to_tfcard_json(pose_data,dir_name="pose",json_type="pose")
# #TODO 插入数据库
#传感器数据
sensor_data = test.generate_fake_sensor_001_data()
test.save_data_to_tfcard_json(sensor_data,dir_name="sensor_001",json_type="sensor_001")
sensor_data = test.generate_fake_sensor_002_data()
test.save_data_to_tfcard_json(sensor_data,dir_name="sensor_002",json_type="sensor_002")
#TODO 插入数据
# frame_data = test.generate_fake_image()
# test.save_data_to_tfcard_jpg(frame_data,dir_name="face_jpg",jpg_type="jpg")
#TODO 插入数据库