import os
import json
import random
from datetime import datetime
import time

from PIL import Image, ImageDraw
import numpy as np
import cv2

# Environment setup notes:
#   pip install Pillow 2_28
#   python -m pip install Pillow -i https://mirrors.aliyun.com/pypi/simple/
#   python -m pip install spdlog -i https://mirrors.aliyun.com/pypi/simple/
#   python -m pip install paramiko -i https://mirrors.aliyun.com/pypi/simple/


class save_data_manager:
    """Write JSON / JPG records for one robot under a base directory.

    Files are stored under::

        <base_path>/data/device_info/<robot_id>/<dir_name>/<YYYYMMDD>/<YYYYMMDD>_<HH>/

    The class also provides generators of simulated (fake) payloads that can
    be fed to the save methods.
    """

    # strftime pattern used for record timestamps and file-name prefixes.
    _TIMESTAMP_FORMAT = "%Y%m%d_%H_%M_%S"

    def __init__(self, base_path, robot_id):
        """Remember where to write and which robot the data belongs to.

        Args:
            base_path: Root directory for all output (the TF card mount point).
            robot_id: Robot identifier; used as a directory name and embedded
                in every file name and generated payload.
        """
        self.base_path_ = base_path  # TF card mount point
        self.robot_id_ = robot_id

    def _make_save_dir(self, dir_name, now):
        """Create (if needed) and return the per-hour directory for `now`."""
        date_str = now.strftime("%Y%m%d")
        hour_str = now.strftime("%H")
        save_dir = os.path.join(
            self.base_path_,
            "data",
            "device_info",
            self.robot_id_,
            dir_name,
            date_str,
            f"{date_str}_{hour_str}",
        )
        os.makedirs(save_dir, exist_ok=True)
        return save_dir

    def save_data_to_tfcard_json(self, json_data, dir_name, json_type):
        """Serialize `json_data` to a JSON file on the TF card.

        Args:
            json_data: JSON-serializable dict. If it has a truthy
                ``"timestamp"`` entry, that value prefixes the file name;
                otherwise the current time is used.
            dir_name: Sub-directory (below the robot id) to write into.
            json_type: Suffix embedded in the file name.

        Returns:
            Path of the written JSON file.
        """
        # A single "now" is used for both the directory and the fallback
        # timestamp so the two cannot disagree across an hour boundary.
        now = datetime.now()

        # 1. Create the directory.
        save_dir = self._make_save_dir(dir_name, now)

        # 2. Build the file name. ``.get`` (instead of indexing) lets a
        #    payload without a "timestamp" key reach the fallback instead of
        #    raising KeyError.
        timestamp = json_data.get("timestamp") or now.strftime(
            self._TIMESTAMP_FORMAT
        )
        filename = f"{timestamp}_{self.robot_id_}_{json_type}.json"

        # 3. Write the JSON.
        json_save_path = os.path.join(save_dir, filename)
        print("json_path:", json_save_path)
        with open(json_save_path, "w", encoding="utf-8") as f:
            json.dump(json_data, f, indent=2)

        # Report the actual data type; this method is not face-specific.
        print(f"Saved {json_type} data: {filename}")
        return json_save_path

    def save_data_to_tfcard_jpg(self, frame_data, dir_name, jpg_type):
        """Write an image to a JPG file on the TF card via ``cv2.imwrite``.

        Args:
            frame_data: Image passed unchanged to ``cv2.imwrite``.
            dir_name: Sub-directory (below the robot id) to write into.
            jpg_type: Suffix embedded in the file name.

        Returns:
            Path the image was written to. The path is returned even when
            ``cv2.imwrite`` reports failure (a message is printed then).
        """
        now = datetime.now()

        # 1. Create the directory.
        save_dir = self._make_save_dir(dir_name, now)

        # 2. Build the file name from the same instant as the directory.
        timestamp = now.strftime(self._TIMESTAMP_FORMAT)
        filename = f"{timestamp}_{self.robot_id_}_{jpg_type}.jpg"

        # 3. Write the JPG.
        jpg_save_path = os.path.join(save_dir, filename)
        print("jpg_save_path:", jpg_save_path)
        # cv2.imwrite signals failure through its boolean return value, so
        # surface it instead of dropping it silently.
        if not cv2.imwrite(jpg_save_path, frame_data):
            print("jpg_save_failed:", jpg_save_path)
        return jpg_save_path

    def generate_fake_face_data(self):
        """Generate simulated face keypoint data (68 random points)."""
        return {
            "timestamp": datetime.now().strftime(self._TIMESTAMP_FORMAT),
            "robot_id": self.robot_id_,
            "results": [
                {
                    "x": round(random.uniform(0, 200), 0),
                    # NOTE(review): y is drawn from 0..1 while x (and the
                    # pose generator) use 0..200 - confirm this is intended.
                    "y": round(random.uniform(0, 1), 0),
                }
                for _ in range(68)
            ],
            "confidence": random.uniform(0.7, 0.99),
        }

    def generate_fake_pose_data(self):
        """Generate simulated body-pose data: one random point per keypoint."""
        keypoints = [
            "nose", "left_eye", "right_eye", "left_ear", "right_ear",
            "left_shoulder", "right_shoulder", "left_elbow", "right_elbow",
            "left_wrist", "right_wrist", "left_hip", "right_hip",
            "left_knee", "right_knee", "left_ankle", "right_ankle",
        ]
        return {
            "timestamp": datetime.now().strftime(self._TIMESTAMP_FORMAT),
            "robot_id": self.robot_id_,
            "results": {
                kp: {
                    "x": round(random.uniform(0, 200), 0),
                    "y": round(random.uniform(0, 200), 0),
                    "score": round(random.uniform(0.7, 0.99), 2),
                }
                for kp in keypoints
            },
        }

    def generate_fake_sensor_001_data(self):
        """Generate simulated readings for sensor 001 (blood-related vitals)."""
        print("生成模拟-血样-传感器数据")
        mks_data_random = {
            'heart_rate': random.randint(60, 100),           # heart rate, 60 to 100
            'blood_oxygen': random.randint(85, 100),         # blood oxygen, 85 to 100
            'micro_circulation': random.randint(50, 100),    # micro-circulation value, 50 to 100
            'fatigue_index': random.randint(0, 100),         # fatigue index, 0 to 100
            'systolic_pressure': random.randint(90, 180),    # systolic pressure, 90 to 180
            'diastolic_pressure': random.randint(60, 120),   # diastolic pressure, 60 to 120
            'cardiac_output': random.randint(40, 100),       # cardiac output, 40 to 100
        }
        print("mks_data_random:", mks_data_random)
        return {
            "timestamp": datetime.now().strftime(self._TIMESTAMP_FORMAT),
            "robot_id": self.robot_id_,
            "result": mks_data_random,
        }

    def generate_sensor_001_data(self, mks_data_random):
        """Wrap caller-supplied sensor 001 readings in the record envelope.

        Args:
            mks_data_random: Readings to store under ``"result"``; presumably
                the same keys as produced by
                ``generate_fake_sensor_001_data`` - verify against callers.

        Returns:
            Dict with ``timestamp``, ``robot_id`` and ``result`` entries.
        """
        print("生成模拟-血样-传感器数据")
        print("mks_data_random:", mks_data_random)
        return {
            "timestamp": datetime.now().strftime(self._TIMESTAMP_FORMAT),
            "robot_id": self.robot_id_,
            "result": mks_data_random,
        }

    def generate_fake_sensor_002_data(self):
        """Generate simulated readings for sensor 002 (millimeter-wave radar).

        Everything is a fixed sample except ``Part1Rst.CountNum`` and
        ``Part3Rst.Target0.HeartRate``, which are randomized.
        """
        print("生成模拟-毫米波-传感器数据")
        sensor_data = {
            "Part1Rst": {
                "Battery": 100,
                "Status": ["01"],
                "CountNum": random.randint(1, 4),
                "IsFall": 0,
            },
            "Part2Rst": {
                "TargetNum": 1,
                "Target0": {
                    "ID": 16,
                    "PtNum": 22,
                    "PointCloud": [
                        [-0.0691750283216743, 0.13160428640200078, 1.3956739328119288],
                        [-0.07847705523541838, 0.15392815246426594, 1.383871249877477],
                        [-0.08118316058836383, 0.15254387167066272, 1.3841153359302658],
                        [-0.08118316058836383, 0.15254387167066272, 1.3841153359302658],
                        [-0.0929625, 0.17252076850321676, 1.3695799337803236],
                        [-0.08985277963433623, 0.1735465559405953, 1.366252506565061],
                        [-0.08981986049799257, 0.1729446846963683, 1.3632120794907499],
                        [-0.09402106608514023, 0.2008075149795186, 1.3752118342759272],
                        [-0.16259914164055408, 0.3659265315697133, 1.2686130237385964],
                        [-0.16217132883698243, 0.36379021511710175, 1.2623032883896637],
                        [-0.16910310730617623, 0.3631439575505753, 1.2691036666139888],
                        [-0.1638440883521219, 0.3831408593633806, 1.2404052475851655],
                        [-0.17067092536679368, 0.3803548338347067, 1.2408964990548133],
                        [-0.17067092536679368, 0.3803548338347067, 1.2408964990548133],
                        [-0.17294653770501756, 0.4044264626613462, 1.2315388724510081],
                        [-0.17294653770501756, 0.4044264626613462, 1.2315388724510081],
                        [-0.17235096722025173, 0.40169857121649716, 1.224546809430924],
                        [-0.16789502279593474, 0.4368924567706128, 1.236433858829858],
                        [-0.16789502279593474, 0.4368924567706128, 1.236433858829858],
                        [-0.16789502279593474, 0.4368924567706128, 1.236433858829858],
                        [-0.16719017622805205, 0.5141370904782002, 1.2188803542752917],
                        [-0.16758717901763867, 0.5169416889917281, 1.227432168376347],
                    ],
                    "CenterPoint": [-0.12395, 0.34706, 1.27605],
                },
            },
            "Part3Rst": {
                "TargetNum": 1,
                "Target0": {
                    "ID": 16,
                    "Point": [-0.12395, 0.32227, 1.2512599999999998],
                    "HeartRate": random.randint(50, 65),
                    "BreathRate": 4,
                },
            },
        }
        print("sensor_data:", sensor_data)
        return {
            "timestamp": datetime.now().strftime(self._TIMESTAMP_FORMAT),
            "robot_id": self.robot_id_,
            "result": sensor_data,
        }

    def generate_fake_image(self, width=640, height=480):
        """Generate a simulated image: random background plus 3-10 rectangles.

        Args:
            width: Image width in pixels.
            height: Image height in pixels.

        Returns:
            NumPy array converted from a PIL "RGB" image.
        """
        img = Image.new(
            "RGB",
            (width, height),
            color=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)),
        )
        draw = ImageDraw.Draw(img)

        # Add a few random filled rectangles.
        for _ in range(random.randint(3, 10)):
            x1 = random.randint(0, width)
            y1 = random.randint(0, height)
            # Drawing x2/y2 from [x1, width] / [y1, height] keeps the
            # rectangle corners correctly ordered.
            x2 = random.randint(x1, width)
            y2 = random.randint(y1, height)
            draw.rectangle(
                [x1, y1, x2, y2],
                fill=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)),
            )

        # Convert the PIL image to a NumPy array.
        # NOTE(review): the array is in RGB channel order, while cv2.imwrite
        # treats 3-channel input as BGR - colors will be swapped if this is
        # passed to save_data_to_tfcard_jpg. Harmless for random test images.
        numpy_image = np.array(img)
        return numpy_image


if __name__ == "__main__":
    base_path = "./sftp-data_1"  # TF card mount point
    robot_id = "d80801000058"

    # JSON write management (done)
    test = save_data_manager(base_path, robot_id)

    # Data insertion management
    # Database sync management (done)

    # Generate face data
    # face_data = test.generate_fake_face_data()
    # test.save_data_to_tfcard_json(face_data,dir_name="face",json_type="face")
    # #TODO insert into database

    # #Body pose data
    # pose_data = test.generate_fake_pose_data()
    # test.save_data_to_tfcard_json(pose_data,dir_name="pose",json_type="pose")
    # #TODO insert into database

    # Sensor data
    sensor_data = test.generate_fake_sensor_001_data()
    test.save_data_to_tfcard_json(sensor_data, dir_name="sensor_001", json_type="sensor_001")

    sensor_data = test.generate_fake_sensor_002_data()
    test.save_data_to_tfcard_json(sensor_data, dir_name="sensor_002", json_type="sensor_002")
    # TODO insert data

    # frame_data = test.generate_fake_image()
    # test.save_data_to_tfcard_jpg(frame_data,dir_name="face_jpg",jpg_type="jpg")
    # TODO insert into database