diff --git a/edge_hypertension_system_beta.py b/edge_hypertension_system_beta.py new file mode 100644 index 0000000..8e833cb --- /dev/null +++ b/edge_hypertension_system_beta.py @@ -0,0 +1,824 @@ +# 整合MQTT数据接收与高血压风险评估系统 +from kk_tools.kk_mysql_save_api import save_oxygen_mysql_data,save_ceramic_mysql_data,save_radar_mysql_data,save_oxygen_stroke_mysql_data +from kk_tools.kk_tfcard_save_api import save_data_manager +from kk_tools.kk_sftp_save_api import sftp_upload_one_file +import os +import paho.mqtt.client as mqtt +import json +import time +from datetime import datetime +import threading +import numpy as np +from scipy import signal +import warnings +import logging +import sys +import traceback +import requests +from collections import deque + +# ==================== 基础配置 ==================== +# MQTT配置 +MQTT_BROKER = "60.204.151.68" # MQTT 代理服务器 +MQTT_PORT = 1883 +MQTT_SUBSCRIBE_TOPIC = "/Radar60SP/E43819258520/sys/property/post" # 数据接收主题 +MQTT_PUBLISH_TOPIC = "HypertensionRisk/mm" # 结果发布主题 + +# 设备配置 +SENSER_ID = "H11223344S" +BASE_PATH = "./temp" # TF卡挂载点 +ROBOT_ID = "d100000001" + +# 数据整合配置 +INTEGRATION_INTERVAL = 10 # 10秒整合一次 + +# 禁用警告 +warnings.filterwarnings("ignore") + +# ==================== 日志配置(来自第二段) ==================== +# 设置基础路径 +_container_path = "/config/workspace/Hypertension_Risk_Assessment" +script_dir = os.path.abspath(os.path.dirname(__file__)) + +def _find_cloud_dir(start_dir): + cur = start_dir + while True: + candidate = os.path.join(cur, 'cloud_hypertension_system.py') + if os.path.exists(candidate): + return cur + parent = os.path.dirname(cur) + if parent == cur: + return None + cur = parent + +cloud_dir = _find_cloud_dir(script_dir) +if cloud_dir: + BASE_PATH_CLOUD = cloud_dir +elif os.getenv('USE_CONTAINER_PATH', '0') == '1' and os.path.exists(_container_path) and os.name != 'nt': + BASE_PATH_CLOUD = _container_path +else: + BASE_PATH_CLOUD = script_dir + +EDGE_PATH = os.path.join(BASE_PATH_CLOUD, "artifacts", "edge") +LOG_PATH = os.path.join(BASE_PATH_CLOUD, "logs") + +# 创建目录 +os.makedirs(EDGE_PATH, exist_ok=True) +os.makedirs(LOG_PATH, exist_ok=True) + +# 日志文件路径 +SYSTEM_STATUS_LOG = os.path.join(EDGE_PATH, "edge_system_status.log") + +# 配置控制台日志 +console_logger = logging.getLogger("HypertensionSystem") +console_logger.setLevel(logging.INFO) +console_handler = logging.StreamHandler(sys.stdout) +console_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') +console_handler.setFormatter(console_formatter) +console_logger.addHandler(console_handler) +console_logger.propagate = False + +# ==================== 高血压风险评估核心组件(来自第二段) ==================== +class EdgeConfig: + """边缘端配置""" + def __init__(self): + # 版本标识 + self.version = "v1.0_edge_realtime" + self.device_id = "edge_rk3588_001" + + # BCG传感器配置 + self.bcg_config = { + 'sampling_rate': 250, + 'window_size': 30, + 'slide_step': 5, + 'hr_range': (40, 120), + 'rr_range': (8, 25), + 'quality_threshold': 0.5, + 'bed_exit_threshold': 0.3 + } + + # 雷达传感器配置 + self.radar_config = { + 'sampling_rate': 20, + 'presence_threshold': 0.6, + 'bathroom_zone': { + 'x_range': (0.5, 2.5), + 'y_range': (0.5, 2.0), + 'z_range': (0.3, 1.8) + }, + 'min_visit_duration': 30, + 'event_merge_gap': 60, + 'motion_threshold': 0.3 + } + + # 风险评估配置 + self.risk_config = { + 'rmssd_high_risk': 20, + 'rmssd_medium_risk': 30, + 'rmssd_low_risk': 40, + 'hr_high_risk': 85, + 'hr_medium_risk': 75, + 'bathroom_high_risk': 3, + 'bathroom_medium_risk': 2, + 'hr_surge_high_risk': 20, + 'hr_surge_medium_risk': 12, + 'weights': { + 'rmssd': 0.35, + 'sdnn': 0.15, + 'hr_level': 0.20, + 'hr_surge': 0.10, + 'bathroom_freq': 0.20 + } + } + + # 预警配置 + self.alert_config = { + 'high_risk_threshold': 0.65, + 'medium_risk_threshold': 0.40, + 'alert_cooldown': 300 + } + +class RingBuffer: + """高效环形缓冲区""" + def __init__(self, size): + self.size = size + self.buffer = np.zeros(size, dtype=np.float32) + self.index = 0 + self.is_filled = False + + def append(self, value): + self.buffer[self.index] = value + self.index = (self.index + 1) % self.size + if self.index == 0: + self.is_filled = True + + def extend(self, values): + for v in values: + self.append(v) + + def get_data(self): + if not self.is_filled: + return self.buffer[:self.index] + else: + return np.concatenate([ + self.buffer[self.index:], + self.buffer[:self.index] + ]) + +class BCGProcessor: + """BCG信号处理器""" + def __init__(self, config): + self.config = config + self.fs = config.bcg_config['sampling_rate'] + self.hr_range = config.bcg_config['hr_range'] + self.rr_range = config.bcg_config['rr_range'] + self.hr_filter = self._design_hr_filter() + self.rr_filter = self._design_rr_filter() + + def _design_hr_filter(self): + nyq = self.fs / 2 + low = 0.8 / nyq + high = 3.0 / nyq + b, a = signal.butter(4, [low, high], btype='band') + return b, a + + def _design_rr_filter(self): + nyq = self.fs / 2 + low = 0.1 / nyq + high = 0.6 / nyq + b, a = signal.butter(4, [low, high], btype='band') + return b, a + + def assess_signal_quality(self, bcg_signal): + try: + signal_power = np.mean(bcg_signal ** 2) + noise_estimate = np.var(np.diff(bcg_signal)) + snr = signal_power / (noise_estimate + 1e-6) + snr_score = np.clip(snr / 10.0, 0, 1) + + freqs, psd = signal.welch(bcg_signal, fs=self.fs, nperseg=min(512, len(bcg_signal))) + hr_band_mask = (freqs >= 0.8) & (freqs <= 3.0) + hr_band_power = np.sum(psd[hr_band_mask]) + total_power = np.sum(psd) + spectrum_score = hr_band_power / (total_power + 1e-6) + + autocorr = np.correlate(bcg_signal - np.mean(bcg_signal), + bcg_signal - np.mean(bcg_signal), + mode='same') + autocorr = autocorr[len(autocorr)//2:] + peak_pos = np.argmax(autocorr[int(0.3*self.fs):int(1.5*self.fs)]) + int(0.3*self.fs) + periodicity_score = autocorr[peak_pos] / (autocorr[0] + 1e-6) + periodicity_score = np.clip(periodicity_score, 0, 1) + + quality = (0.40 * snr_score + 0.30 * spectrum_score + 0.30 * periodicity_score) + return float(np.clip(quality, 0, 1)) + except Exception as e: + console_logger.error(f"信号质量评估失败: {e}") + return 0.0 + + def extract_heart_rate(self, bcg_signal): + try: + filtered = signal.filtfilt(self.hr_filter[0], self.hr_filter[1], bcg_signal) + peaks, properties = signal.find_peaks( + filtered, + distance=int(0.4 * self.fs), + prominence=0.3 * np.std(filtered) + ) + + if len(peaks) < 5: + return None, 0.0, None + + rr_intervals = np.diff(peaks) / self.fs * 1000 + median_rr = np.median(rr_intervals) + mad = np.median(np.abs(rr_intervals - median_rr)) + valid_mask = np.abs(rr_intervals - median_rr) < 3 * mad + + if np.sum(valid_mask) < 5: + return None, 0.0, None + + clean_rr = rr_intervals[valid_mask] + mean_rr_ms = np.mean(clean_rr) + heart_rate = 60000.0 / mean_rr_ms + + if not (self.hr_range[0] <= heart_rate <= self.hr_range[1]): + return None, 0.0, None + + cv = np.std(clean_rr) / np.mean(clean_rr) + confidence = np.clip(1.0 - cv, 0, 1) + + return float(heart_rate), float(confidence), clean_rr + except Exception as e: + console_logger.error(f"心率提取失败: {e}") + return None, 0.0, None + + def calculate_hrv_features(self, rr_intervals): + try: + if len(rr_intervals) < 10: + return None + + sdnn = np.std(rr_intervals) + diff_rr = np.diff(rr_intervals) + rmssd = np.sqrt(np.mean(diff_rr ** 2)) + nn50 = np.sum(np.abs(diff_rr) > 50) + pnn50 = nn50 / len(diff_rr) if len(diff_rr) > 0 else 0 + + return { + 'sdnn': float(sdnn), + 'rmssd': float(rmssd), + 'pnn50': float(pnn50) + } + except Exception as e: + console_logger.error(f"HRV特征计算失败: {e}") + return None + +class RadarProcessor: + """雷达信号处理器""" + def __init__(self, config): + self.config = config + self.bathroom_zone = config.radar_config['bathroom_zone'] + self.presence_history = deque(maxlen=3) + self.current_visit_start = None + self.bathroom_visits = [] + + def detect_presence_in_bathroom(self, point_cloud, timestamp): + try: + if len(point_cloud) == 0: + self.presence_history.append(False) + return False, 0.0, None + + dynamic_points = [p for p in point_cloud if abs(p[3]) > 0.05] + bathroom_points = [] + + for p in dynamic_points: + x, y, z = p[0], p[1], p[2] + if (self.bathroom_zone['x_range'][0] <= x <= self.bathroom_zone['x_range'][1] and + self.bathroom_zone['y_range'][0] <= y <= self.bathroom_zone['y_range'][1] and + self.bathroom_zone['z_range'][0] <= z <= self.bathroom_zone['z_range'][1]): + bathroom_points.append(p) + + point_density = len(bathroom_points) / (len(point_cloud) + 1e-6) + is_present = point_density > self.config.radar_config['presence_threshold'] + self.presence_history.append(is_present) + stable_presence = sum(self.presence_history) >= 2 + confidence = point_density if stable_presence else 0.0 + + # 检测起夜事件 + visit_event = self._detect_bathroom_visit(timestamp, stable_presence) + return stable_presence, float(confidence), visit_event + except Exception as e: + console_logger.error(f"存在检测失败: {e}") + return False, 0.0, None + + def _detect_bathroom_visit(self, timestamp, is_present): + min_duration = self.config.radar_config['min_visit_duration'] + + if is_present and self.current_visit_start is None: + self.current_visit_start = timestamp + return None + + elif not is_present and self.current_visit_start is not None: + duration = timestamp - self.current_visit_start + if duration >= min_duration: + visit_info = { + 'start': self.current_visit_start, + 'end': timestamp, + 'duration': duration, + 'datetime': datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') + } + self.bathroom_visits.append(visit_info) + self.current_visit_start = None + return visit_info + else: + self.current_visit_start = None + return None + + return None + +class RiskAssessmentEngine: + """风险评估引擎""" + def __init__(self, config): + self.config = config + self.risk_cfg = config.risk_config + + def calculate_risk_score(self, hrv_features, hr, bathroom_visits): + if not hrv_features: + return 0.0, "UNKNOWN", {} + + weights = self.risk_cfg['weights'] + rmssd = hrv_features.get('rmssd', 0) + sdnn = hrv_features.get('sdnn', 0) + bathroom_freq = len(bathroom_visits) + + # RMSSD风险评分 + if rmssd < self.risk_cfg['rmssd_high_risk']: + rmssd_risk = 1.0 + elif rmssd < self.risk_cfg['rmssd_medium_risk']: + rmssd_risk = 0.6 + elif rmssd < self.risk_cfg['rmssd_low_risk']: + rmssd_risk = 0.4 + else: + rmssd_risk = 0.2 + + # SDNN风险评分 + if sdnn < 30: + sdnn_risk = 1.0 + elif sdnn < 45: + sdnn_risk = 0.6 + else: + sdnn_risk = 0.2 + + # 心率风险评分 + if hr > self.risk_cfg['hr_high_risk']: + hr_risk = 1.0 + elif hr > self.risk_cfg['hr_medium_risk']: + hr_risk = 0.6 + else: + hr_risk = 0.2 + + # 起夜频率风险评分 + if bathroom_freq >= self.risk_cfg['bathroom_high_risk']: + bathroom_risk = 1.0 + elif bathroom_freq >= self.risk_cfg['bathroom_medium_risk']: + bathroom_risk = 0.6 + else: + bathroom_risk = 0.2 + + # 总风险评分 + total_risk = ( + weights['rmssd'] * rmssd_risk + + weights['sdnn'] * sdnn_risk + + weights['hr_level'] * hr_risk + + weights['bathroom_freq'] * bathroom_risk + ) + + # 风险等级 + if total_risk >= self.config.alert_config['high_risk_threshold']: + risk_level = 'HIGH' + elif total_risk >= self.config.alert_config['medium_risk_threshold']: + risk_level = 'MEDIUM' + else: + risk_level = 'LOW' + + breakdown = { + 'rmssd_risk': rmssd_risk, + 'sdnn_risk': sdnn_risk, + 'hr_risk': hr_risk, + 'bathroom_risk': bathroom_risk + } + + return float(total_risk), risk_level, breakdown + +# ==================== 整合后的MQTT数据处理器 ==================== +class MQTTHypertensionHandler: + def __init__(self, base_path, robot_id): + self.base_path = base_path + self.robot_id = robot_id + self.tf_card_manager = save_data_manager(base_path, robot_id) + + # 初始化数据缓冲区 + self.data_buffer = {} # 存储去重后的数据 + self.buffer_lock = threading.Lock() # 线程锁 + self.last_integration_time = time.time() # 上次整合时间 + + # 统计信息 + self.total_messages = 0 + self.processed_messages = 0 + self.deduplicated_messages = 0 + self.integration_count = 0 + + # 初始化高血压评估组件 + self.edge_config = EdgeConfig() + self.bcg_processor = BCGProcessor(self.edge_config) + self.radar_processor = RadarProcessor(self.edge_config) + self.risk_engine = RiskAssessmentEngine(self.edge_config) + self.bcg_buffer = RingBuffer(self.edge_config.bcg_config['window_size'] * + self.edge_config.bcg_config['sampling_rate']) + + # 设置MQTT客户端 + self.mqtt_client = mqtt.Client() + self.mqtt_client.on_message = self.on_mqtt_message + self.mqtt_client.on_connect = self.on_mqtt_connect + + # MQTT发布客户端(用于发送处理结果) + self.mqtt_publish_client = mqtt.Client() + + # 连接MQTT服务器 + self.mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60) + self.mqtt_publish_client.connect(MQTT_BROKER, MQTT_PORT, 60) + + def on_mqtt_connect(self, client, userdata, flags, rc): + print(f"Connected to MQTT broker with result code {rc}") + client.subscribe(MQTT_SUBSCRIBE_TOPIC) + print(f"Subscribed to topics: {MQTT_SUBSCRIBE_TOPIC}") + print(f"数据整合模式: 每 {INTEGRATION_INTERVAL} 秒整合输出一次") + print(f"结果发布主题: {MQTT_PUBLISH_TOPIC}") + + def extract_data_key(self, data): + """从数据中提取唯一标识""" + try: + if isinstance(data, dict): + if "params" in data and isinstance(data["params"], dict): + params = data["params"] + param_keys = sorted(list(params.keys())) + if param_keys: + return tuple(param_keys) + + version = data.get("version", "") + method = data.get("method", "") + if version and method: + return f"{version}|{method}" + + return str(data) + except Exception as e: + print(f"提取数据key时出错: {e}") + return "unknown" + + def process_sensor_data(self, data): + """处理传感器数据并进行风险评估""" + # 记录处理开始时间,用于计算处理延迟 + process_start_time = time.time() + + risk_result = { + 'type': 'UNKNOWN', + 'timestamp': datetime.now().isoformat(), + # 新增:生理指标独立字段 + 'heart_rate': None, + 'rmssd': None, + 'sdnn': None, + # 新增:风险评分与等级独立字段 + 'risk_score': 0.0, + 'risk_level': 'UNKNOWN', + # 新增:系统性能字段 + 'processing_delay_ms': 0.0, + 'features': {}, + 'error': None + } + + try: + # 识别数据类型(BCG/雷达) + params = data.get('params', {}) + + # 处理BCG数据 + if 'bcg_signal' in params: + risk_result['type'] = 'BCG' + bcg_signal = np.array(params['bcg_signal'], dtype=np.float32) + + # 信号质量评估 + quality = self.bcg_processor.assess_signal_quality(bcg_signal) + if quality < self.edge_config.bcg_config['bed_exit_threshold']: + risk_result['error'] = "低质量BCG信号(可能离床)" + # 计算处理延迟 + risk_result['processing_delay_ms'] = round((time.time() - process_start_time) * 1000, 2) + return risk_result + + # 提取心率和HRV特征 + hr, hr_conf, rr_intervals = self.bcg_processor.extract_heart_rate(bcg_signal) + if hr is None: + risk_result['error'] = "无法提取有效心率" + # 计算处理延迟 + risk_result['processing_delay_ms'] = round((time.time() - process_start_time) * 1000, 2) + return risk_result + + hrv_features = self.bcg_processor.calculate_hrv_features(rr_intervals) + if not hrv_features: + risk_result['error'] = "无法计算HRV特征" + # 计算处理延迟 + risk_result['processing_delay_ms'] = round((time.time() - process_start_time) * 1000, 2) + return risk_result + + # 计算风险评分 + bathroom_visits = self.radar_processor.bathroom_visits + risk_score, risk_level, breakdown = self.risk_engine.calculate_risk_score( + hrv_features, hr, bathroom_visits + ) + + # 赋值独立字段:生理指标 + risk_result['heart_rate'] = round(hr, 1) + risk_result['rmssd'] = round(hrv_features['rmssd'], 2) + risk_result['sdnn'] = round(hrv_features['sdnn'], 2) + # 赋值独立字段:风险评分与等级 + risk_result['risk_score'] = round(risk_score, 3) + risk_result['risk_level'] = risk_level + + risk_result['features'] = { + 'hr': hr, + 'hr_confidence': hr_conf, + 'hrv': hrv_features, + 'signal_quality': quality, + 'bathroom_visits': len(bathroom_visits), + 'risk_breakdown': breakdown + } + + # 处理雷达数据 + elif 'radar_point_cloud' in params: + risk_result['type'] = 'RADAR' + point_cloud = params['radar_point_cloud'] + timestamp = params.get('timestamp', time.time()) + + # 检测浴室存在和起夜事件 + is_present, confidence, visit_event = self.radar_processor.detect_presence_in_bathroom( + point_cloud, timestamp + ) + + risk_result['features'] = { + 'in_bathroom': is_present, + 'confidence': confidence, + 'bathroom_visits': len(self.radar_processor.bathroom_visits), + 'latest_visit': visit_event + } + + # 如果有BCG数据,更新风险评分 + if self.bcg_buffer.is_filled: + bcg_signal = self.bcg_buffer.get_data() + hr, _, rr_intervals = self.bcg_processor.extract_heart_rate(bcg_signal) + if hr and rr_intervals is not None: + hrv_features = self.bcg_processor.calculate_hrv_features(rr_intervals) + if hrv_features: + risk_score, risk_level, _ = self.risk_engine.calculate_risk_score( + hrv_features, hr, self.radar_processor.bathroom_visits + ) + # 赋值独立字段 + risk_result['heart_rate'] = round(hr, 1) + risk_result['rmssd'] = round(hrv_features['rmssd'], 2) + risk_result['sdnn'] = round(hrv_features['sdnn'], 2) + risk_result['risk_score'] = round(risk_score, 3) + risk_result['risk_level'] = risk_level + + # 计算处理延迟(毫秒) + risk_result['processing_delay_ms'] = round((time.time() - process_start_time) * 1000, 2) + + except Exception as e: + risk_result['error'] = f"处理失败: {str(e)}" + # 确保处理延迟字段有值 + risk_result['processing_delay_ms'] = round((time.time() - process_start_time) * 1000, 2) + console_logger.error(f"传感器数据处理失败: {e}") + + return risk_result + + def on_mqtt_message(self, client, userdata, msg): + try: + # 解析MQTT消息 + data = json.loads(msg.payload.decode()) + self.total_messages += 1 + + # 处理数据并进行风险评估 + risk_result = self.process_sensor_data(data) + + # 提取数据的唯一标识 + data_key = self.extract_data_key(data) + + # 将原始数据+风险评估结果存入缓冲区 + with self.buffer_lock: + if data_key in self.data_buffer: + self.data_buffer[data_key] = { + "data": data, + "risk_result": risk_result, + "timestamp": datetime.now().isoformat(), + "count": self.data_buffer[data_key].get("count", 1) + 1, + "first_seen": self.data_buffer[data_key].get("first_seen", datetime.now().isoformat()) + } + self.deduplicated_messages += 1 + else: + self.data_buffer[data_key] = { + "data": data, + "risk_result": risk_result, + "timestamp": datetime.now().isoformat(), + "count": 1, + "first_seen": datetime.now().isoformat() + } + self.processed_messages += 1 + + # 每10条消息打印一次统计 + if self.total_messages % 10 == 0: + print(f"已接收 {self.total_messages} 条消息, 去重后 {len(self.data_buffer)} 种数据") + + except Exception as e: + print(f"Error processing MQTT message: {e}") + + def integrate_and_save_data(self): + """整合数据并发布到MQTT""" + with self.buffer_lock: + if not self.data_buffer: + print("缓冲区为空,跳过本次整合") + return + + self.integration_count += 1 + + # 准备整合后的数据 + unique_data_items = {} + for data_key, buffer_entry in self.data_buffer.items(): + key_str = "|".join(data_key) if isinstance(data_key, tuple) else str(data_key) + unique_data_items[key_str] = { + "data": buffer_entry["data"], + "risk_result": buffer_entry["risk_result"], + "first_seen": buffer_entry["first_seen"], + "last_seen": buffer_entry["timestamp"], + "message_count": buffer_entry["count"] + } + + # 整合数据 + integration_data = { + "integration_id": self.integration_count, + "timestamp": datetime.now().isoformat(), + "total_messages": self.total_messages, + "unique_data_count": len(self.data_buffer), + "sensor_id": SENSER_ID, + "integration_interval": INTEGRATION_INTERVAL, + "unique_data_items": unique_data_items, + "statistics": { + "total_messages_received": self.total_messages, + "duplicate_messages_removed": self.deduplicated_messages, + "unique_messages_kept": self.processed_messages + } + } + + # 打印整合信息 + self.print_integration_stats(integration_data) + + # 发布到MQTT服务器 + self.publish_to_mqtt(integration_data) + + # 保存到本地/数据库 + self.save_integrated_data(integration_data) + + # 清空缓冲区 + self.data_buffer.clear() + + def publish_to_mqtt(self, integration_data): + """发布处理结果到MQTT服务器""" + try: + # 将数据转换为JSON + payload = json.dumps(integration_data, ensure_ascii=False, default=str) + + # 发布消息 + result = self.mqtt_publish_client.publish( + MQTT_PUBLISH_TOPIC, + payload, + qos=1 + ) + + # 等待发布完成 + result.wait_for_publish() + + if result.is_published(): + print(f"整合数据 {self.integration_count} 已发布到MQTT主题: {MQTT_PUBLISH_TOPIC}") + else: + print(f"整合数据 {self.integration_count} MQTT发布失败") + + except Exception as e: + print(f"发布MQTT消息时出错: {e}") + + def save_integrated_data(self, integration_data): + """保存整合后的数据""" + try: + # 生成保存格式 + save_data = self.tf_card_manager.generate_fake_sensor_002_data(integration_data) + + # 保存到TF卡 + save_path = self.tf_card_manager.save_data_to_tfcard_json( + save_data, dir_name=SENSER_ID, json_type=SENSER_ID) + + # 上传到SFTP + directory = os.path.dirname(save_path) + file_name = os.path.basename(save_path) + cleaned_path = directory[len(self.base_path + "/"):] + down_url = sftp_upload_one_file(save_path, cleaned_path, file_name) + + print(f"整合数据 {self.integration_count} 已保存,SFTP路径: {down_url}") + + # 保存到MySQL + save_radar_mysql_data(SENSER_ID, save_data, down_url) + + except Exception as e: + print(f"保存整合数据时出错: {e}") + + def print_integration_stats(self, integration_data): + """打印整合统计信息""" + unique_count = integration_data["unique_data_count"] + + print("=" * 80) + print(f"第 {self.integration_count} 次数据整合完成 (10秒周期)") + print("=" * 80) + print(f"统计信息:") + print(f" - 10秒内接收消息总数: {self.total_messages} 条") + print(f" - 去重后保留数据: {unique_count} 种不同数据") + print(f" - 去重掉的消息: {self.deduplicated_messages} 条") + + # 打印风险评估结果 + high_risk_count = 0 + medium_risk_count = 0 + + for item in integration_data["unique_data_items"].values(): + risk_level = item["risk_result"]["risk_level"] + if risk_level == "HIGH": + high_risk_count += 1 + elif risk_level == "MEDIUM": + medium_risk_count += 1 + + print(f" - 高风险数据: {high_risk_count} 条") + print(f" - 中风险数据: {medium_risk_count} 条") + print(f" - 低风险/未知: {unique_count - high_risk_count - medium_risk_count} 条") + + print(f"\n下次整合时间: {datetime.fromtimestamp(time.time() + INTEGRATION_INTERVAL).strftime('%H:%M:%S')}") + print("=" * 80) + + def check_and_process_integration(self): + """检查并执行数据整合""" + current_time = time.time() + if current_time - self.last_integration_time >= INTEGRATION_INTERVAL: + if self.data_buffer: + print(f"\n正在进行第 {self.integration_count + 1} 次数据整合...") + self.integrate_and_save_data() + else: + print(f"\n第 {self.integration_count + 1} 次整合周期结束,缓冲区无数据") + + self.last_integration_time = current_time + return True + return False + + def run(self): + """主循环""" + self.mqtt_client.loop_start() + self.mqtt_publish_client.loop_start() + + print(f"\nMQTT高血压风险评估系统启动") + print(f"数据整合间隔: {INTEGRATION_INTERVAL} 秒") + print(f"接收主题: {MQTT_SUBSCRIBE_TOPIC}") + print(f"发布主题: {MQTT_PUBLISH_TOPIC}") + print("按 Ctrl+C 停止程序") + print("-" * 80) + + try: + while True: + self.check_and_process_integration() + time.sleep(0.1) + + except KeyboardInterrupt: + print("\n收到停止信号,正在进行最后一次数据整合...") + + # 保存剩余数据 + with self.buffer_lock: + if self.data_buffer: + print(f"正在保存缓冲区中剩余的 {len(self.data_buffer)} 条数据...") + self.integrate_and_save_data() + else: + print("缓冲区为空,无需保存") + + # 停止MQTT客户端 + self.mqtt_client.loop_stop() + self.mqtt_publish_client.loop_stop() + + print("程序已停止") + print(f"\n最终统计:") + print(f" - 总共接收消息: {self.total_messages} 条") + print(f" - 执行整合次数: {self.integration_count} 次") + print("=" * 80) + +# ==================== 主程序 ==================== +if __name__ == "__main__": + # 安装依赖(如果需要) + # pip install numpy scipy requests paho-mqtt + + # 创建并启动整合后的系统 + handler = MQTTHypertensionHandler(BASE_PATH, ROBOT_ID) + handler.run() \ No newline at end of file