# 整合MQTT数据接收与高血压风险评估系统
|
||
from kk_tools.kk_mysql_save_api import save_oxygen_mysql_data,save_ceramic_mysql_data,save_radar_mysql_data,save_oxygen_stroke_mysql_data
|
||
from kk_tools.kk_tfcard_save_api import save_data_manager
|
||
from kk_tools.kk_sftp_save_api import sftp_upload_one_file
|
||
import os
|
||
import paho.mqtt.client as mqtt
|
||
import json
|
||
import time
|
||
from datetime import datetime
|
||
import threading
|
||
import numpy as np
|
||
from scipy import signal
|
||
import warnings
|
||
import logging
|
||
import sys
|
||
import traceback
|
||
import requests
|
||
from collections import deque
|
||
|
||
# ==================== Basic configuration ====================
# MQTT configuration
MQTT_BROKER = "60.204.151.68"  # MQTT broker host
MQTT_PORT = 1883
MQTT_SUBSCRIBE_TOPIC = "/Radar60SP/E43819258520/sys/property/post"  # topic sensor frames arrive on
MQTT_PUBLISH_TOPIC = "HypertensionRisk/mm"  # topic integrated results are published to

# Device configuration
SENSER_ID = "H11223344S"  # sensor identifier (spelling kept for compatibility with stored data)
BASE_PATH = "./temp"  # TF-card mount point
ROBOT_ID = "d100000001"

# Data-integration configuration
INTEGRATION_INTERVAL = 10  # integrate buffered messages once every 10 seconds

# Silence library warnings (scipy filtering emits benign ones)
warnings.filterwarnings("ignore")

# ==================== Logging configuration (from part 2) ====================
# Base path candidates
_container_path = "/config/workspace/Hypertension_Risk_Assessment"  # path used inside the deployment container
script_dir = os.path.abspath(os.path.dirname(__file__))  # directory containing this script
|
||
|
||
def _find_cloud_dir(start_dir):
    """Walk upward from *start_dir* and return the first directory that
    contains ``cloud_hypertension_system.py``, or ``None`` if the
    filesystem root is reached without finding it."""
    current = start_dir
    while True:
        marker = os.path.join(current, 'cloud_hypertension_system.py')
        if os.path.exists(marker):
            return current
        parent = os.path.dirname(current)
        if parent == current:
            # dirname() is a fixed point only at the filesystem root.
            return None
        current = parent
|
||
# Resolve the working base directory, preferring (1) a directory in the
# ancestry containing the cloud-side script, then (2) the container path
# when explicitly opted in via USE_CONTAINER_PATH on non-Windows hosts,
# and finally (3) this script's own directory.
cloud_dir = _find_cloud_dir(script_dir)
if cloud_dir:
    BASE_PATH_CLOUD = cloud_dir
elif os.getenv('USE_CONTAINER_PATH', '0') == '1' and os.path.exists(_container_path) and os.name != 'nt':
    BASE_PATH_CLOUD = _container_path
else:
    BASE_PATH_CLOUD = script_dir

EDGE_PATH = os.path.join(BASE_PATH_CLOUD, "artifacts", "edge")  # edge-side artifact directory
LOG_PATH = os.path.join(BASE_PATH_CLOUD, "logs")                # log directory

# Create the directories (idempotent)
os.makedirs(EDGE_PATH, exist_ok=True)
os.makedirs(LOG_PATH, exist_ok=True)

# Log file path (written to by the edge status logger, if configured elsewhere)
SYSTEM_STATUS_LOG = os.path.join(EDGE_PATH, "edge_system_status.log")

# Console logger: stdout only, no propagation to the root logger so the
# output format stays under this module's control.
console_logger = logging.getLogger("HypertensionSystem")
console_logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler(sys.stdout)
console_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(console_formatter)
console_logger.addHandler(console_handler)
console_logger.propagate = False
|
||
|
||
# ==================== 高血压风险评估核心组件(来自第二段) ====================
|
||
class EdgeConfig:
    """Static configuration bundle for the edge-side assessment pipeline.

    Groups the settings consumed by BCGProcessor, RadarProcessor and
    RiskAssessmentEngine into plain dict attributes.
    """

    def __init__(self):
        # Build / device identity.
        self.version = "v1.0_edge_realtime"
        self.device_id = "edge_rk3588_001"

        # BCG sensor: acquisition and validity parameters.
        bcg = {}
        bcg['sampling_rate'] = 250        # Hz
        bcg['window_size'] = 30           # seconds per analysis window
        bcg['slide_step'] = 5             # seconds between successive windows
        bcg['hr_range'] = (40, 120)       # accepted heart rate, BPM
        bcg['rr_range'] = (8, 25)         # accepted respiration, breaths/min
        bcg['quality_threshold'] = 0.5    # minimum usable signal-quality score
        bcg['bed_exit_threshold'] = 0.3   # below this quality, treat as off-bed
        self.bcg_config = bcg

        # Radar sensor: presence detection geometry and event parameters.
        radar = {}
        radar['sampling_rate'] = 20          # Hz
        radar['presence_threshold'] = 0.6    # point-density fraction for "present"
        radar['bathroom_zone'] = {
            'x_range': (0.5, 2.5),
            'y_range': (0.5, 2.0),
            'z_range': (0.3, 1.8),
        }
        radar['min_visit_duration'] = 30     # seconds; shorter stays are ignored
        radar['event_merge_gap'] = 60        # seconds between mergeable events
        radar['motion_threshold'] = 0.3
        self.radar_config = radar

        # Risk scoring thresholds and factor weights.
        risk = {}
        risk['rmssd_high_risk'] = 20
        risk['rmssd_medium_risk'] = 30
        risk['rmssd_low_risk'] = 40
        risk['hr_high_risk'] = 85
        risk['hr_medium_risk'] = 75
        risk['bathroom_high_risk'] = 3
        risk['bathroom_medium_risk'] = 2
        risk['hr_surge_high_risk'] = 20
        risk['hr_surge_medium_risk'] = 12
        risk['weights'] = {
            'rmssd': 0.35,
            'sdnn': 0.15,
            'hr_level': 0.20,
            'hr_surge': 0.10,
            'bathroom_freq': 0.20,
        }
        self.risk_config = risk

        # Alerting: score thresholds and cooldown.
        self.alert_config = {
            'high_risk_threshold': 0.65,
            'medium_risk_threshold': 0.40,
            'alert_cooldown': 300,           # seconds between repeated alerts
        }
|
||
|
||
class RingBuffer:
    """Fixed-capacity circular buffer over a float32 numpy array.

    ``get_data`` yields the samples in arrival order; before the first
    wrap-around it returns only the portion written so far.
    """

    def __init__(self, size):
        self.size = size
        self.buffer = np.zeros(size, dtype=np.float32)
        self.index = 0          # next write position
        self.is_filled = False  # True once the buffer has wrapped at least once

    def append(self, value):
        """Store one sample, overwriting the oldest once full."""
        self.buffer[self.index] = value
        self.index += 1
        if self.index == self.size:
            self.index = 0
            self.is_filled = True

    def extend(self, values):
        """Append every sample of an iterable in order."""
        for sample in values:
            self.append(sample)

    def get_data(self):
        """Return the buffered samples, oldest first."""
        if self.is_filled:
            # Oldest data starts at the write cursor once wrapped.
            return np.concatenate((self.buffer[self.index:],
                                   self.buffer[:self.index]))
        return self.buffer[:self.index]
|
||
|
||
class BCGProcessor:
    """BCG (ballistocardiogram) signal processor.

    Derives heart rate and time-domain HRV metrics from a windowed BCG
    trace and scores how usable the raw signal is.  Band-pass filters
    are designed once at construction from the configured sampling rate.
    """

    def __init__(self, config):
        # config: EdgeConfig-like object exposing a ``bcg_config`` dict.
        self.config = config
        self.fs = config.bcg_config['sampling_rate']   # sampling rate, Hz
        self.hr_range = config.bcg_config['hr_range']  # accepted heart-rate bounds, BPM
        self.rr_range = config.bcg_config['rr_range']  # respiration bounds; not used by methods below
        self.hr_filter = self._design_hr_filter()
        self.rr_filter = self._design_rr_filter()  # NOTE(review): not referenced elsewhere in this class

    def _design_hr_filter(self):
        """Design a 4th-order Butterworth band-pass over the cardiac band (0.8-3.0 Hz)."""
        nyq = self.fs / 2
        low = 0.8 / nyq
        high = 3.0 / nyq
        b, a = signal.butter(4, [low, high], btype='band')
        return b, a

    def _design_rr_filter(self):
        """Design a 4th-order Butterworth band-pass over the respiratory band (0.1-0.6 Hz)."""
        nyq = self.fs / 2
        low = 0.1 / nyq
        high = 0.6 / nyq
        b, a = signal.butter(4, [low, high], btype='band')
        return b, a

    def assess_signal_quality(self, bcg_signal):
        """Score signal usability in [0, 1]; returns 0.0 on any failure.

        Blends three sub-scores: a crude SNR estimate, the fraction of
        spectral power inside the cardiac band, and autocorrelation
        periodicity at heart-beat lags.
        """
        try:
            # SNR proxy: mean signal power vs variance of the first difference.
            signal_power = np.mean(bcg_signal ** 2)
            noise_estimate = np.var(np.diff(bcg_signal))
            snr = signal_power / (noise_estimate + 1e-6)
            snr_score = np.clip(snr / 10.0, 0, 1)

            # Fraction of Welch-PSD power inside the 0.8-3.0 Hz cardiac band.
            freqs, psd = signal.welch(bcg_signal, fs=self.fs, nperseg=min(512, len(bcg_signal)))
            hr_band_mask = (freqs >= 0.8) & (freqs <= 3.0)
            hr_band_power = np.sum(psd[hr_band_mask])
            total_power = np.sum(psd)
            spectrum_score = hr_band_power / (total_power + 1e-6)

            # Periodicity: strongest autocorrelation peak at lags of
            # 0.3-1.5 s (beat intervals for 40-200 BPM), normalized by
            # the lag-0 energy.
            autocorr = np.correlate(bcg_signal - np.mean(bcg_signal),
                                    bcg_signal - np.mean(bcg_signal),
                                    mode='same')
            autocorr = autocorr[len(autocorr)//2:]  # keep non-negative lags only
            peak_pos = np.argmax(autocorr[int(0.3*self.fs):int(1.5*self.fs)]) + int(0.3*self.fs)
            periodicity_score = autocorr[peak_pos] / (autocorr[0] + 1e-6)
            periodicity_score = np.clip(periodicity_score, 0, 1)

            # Weighted blend of the three sub-scores.
            quality = (0.40 * snr_score + 0.30 * spectrum_score + 0.30 * periodicity_score)
            return float(np.clip(quality, 0, 1))
        except Exception as e:
            # Signals shorter than 1.5 s make the lag slice empty and land here.
            console_logger.error(f"信号质量评估失败: {e}")
            return 0.0

    def extract_heart_rate(self, bcg_signal):
        """Return (heart_rate_bpm, confidence, clean_rr_ms) or (None, 0.0, None).

        Band-pass filters the signal, detects beat peaks, rejects
        RR-interval outliers with a 3xMAD rule, and validates the mean
        rate against ``hr_range``.  Confidence is 1 minus the RR
        coefficient of variation, clipped to [0, 1].
        """
        try:
            filtered = signal.filtfilt(self.hr_filter[0], self.hr_filter[1], bcg_signal)
            # Peaks at least 0.4 s apart (caps detection at 150 BPM) with
            # prominence scaled to the filtered signal's spread.
            peaks, properties = signal.find_peaks(
                filtered,
                distance=int(0.4 * self.fs),
                prominence=0.3 * np.std(filtered)
            )

            if len(peaks) < 5:
                return None, 0.0, None

            # Inter-beat intervals in milliseconds.
            rr_intervals = np.diff(peaks) / self.fs * 1000
            median_rr = np.median(rr_intervals)
            mad = np.median(np.abs(rr_intervals - median_rr))
            # Robust outlier rejection: keep intervals within 3 MAD of the median.
            valid_mask = np.abs(rr_intervals - median_rr) < 3 * mad

            if np.sum(valid_mask) < 5:
                return None, 0.0, None

            clean_rr = rr_intervals[valid_mask]
            mean_rr_ms = np.mean(clean_rr)
            heart_rate = 60000.0 / mean_rr_ms

            # Discard physiologically implausible rates.
            if not (self.hr_range[0] <= heart_rate <= self.hr_range[1]):
                return None, 0.0, None

            cv = np.std(clean_rr) / np.mean(clean_rr)
            confidence = np.clip(1.0 - cv, 0, 1)

            return float(heart_rate), float(confidence), clean_rr
        except Exception as e:
            console_logger.error(f"心率提取失败: {e}")
            return None, 0.0, None

    def calculate_hrv_features(self, rr_intervals):
        """Compute time-domain HRV metrics from RR intervals (ms).

        Returns {'sdnn', 'rmssd', 'pnn50'} or None when fewer than 10
        intervals are available or computation fails.
        """
        try:
            if len(rr_intervals) < 10:
                return None

            sdnn = np.std(rr_intervals)
            diff_rr = np.diff(rr_intervals)
            rmssd = np.sqrt(np.mean(diff_rr ** 2))
            nn50 = np.sum(np.abs(diff_rr) > 50)  # successive differences > 50 ms
            pnn50 = nn50 / len(diff_rr) if len(diff_rr) > 0 else 0

            return {
                'sdnn': float(sdnn),
                'rmssd': float(rmssd),
                'pnn50': float(pnn50)
            }
        except Exception as e:
            console_logger.error(f"HRV特征计算失败: {e}")
            return None
|
||
|
||
class RadarProcessor:
    """Radar point-cloud processor: bathroom presence + night-visit events.

    Tracks whether a person is inside the configured bathroom zone using a
    majority vote over the last 3 frames, and turns stable presence spans
    longer than ``min_visit_duration`` seconds into visit records.
    """

    def __init__(self, config):
        # config: EdgeConfig-like object exposing a ``radar_config`` dict.
        self.config = config
        self.bathroom_zone = config.radar_config['bathroom_zone']
        # Majority vote over the last 3 frames smooths flickering detections.
        self.presence_history = deque(maxlen=3)
        self.current_visit_start = None  # timestamp of the visit in progress, or None
        self.bathroom_visits = []        # completed visit dicts, oldest first

    def detect_presence_in_bathroom(self, point_cloud, timestamp):
        """Classify one radar frame.

        point_cloud: sequence of points where p[0..2] are x/y/z and p[3]
        is a velocity-like component (assumed — TODO confirm with sensor
        spec).  Returns (stable_presence, confidence, visit_event_or_None).
        """
        try:
            if len(point_cloud) == 0:
                # No returns at all: clearly absent.  Still run visit
                # detection so an in-progress visit is finalized.
                # BUG FIX: previously this early return skipped
                # _detect_bathroom_visit, so an open visit was never
                # closed while the radar saw empty frames.
                self.presence_history.append(False)
                visit_event = self._detect_bathroom_visit(timestamp, False)
                return False, 0.0, visit_event

            # Keep only moving points, then those inside the bathroom box.
            dynamic_points = [p for p in point_cloud if abs(p[3]) > 0.05]
            bathroom_points = []
            for p in dynamic_points:
                x, y, z = p[0], p[1], p[2]
                if (self.bathroom_zone['x_range'][0] <= x <= self.bathroom_zone['x_range'][1] and
                        self.bathroom_zone['y_range'][0] <= y <= self.bathroom_zone['y_range'][1] and
                        self.bathroom_zone['z_range'][0] <= z <= self.bathroom_zone['z_range'][1]):
                    bathroom_points.append(p)

            # Presence = enough of the frame's points fall inside the zone.
            point_density = len(bathroom_points) / (len(point_cloud) + 1e-6)
            is_present = point_density > self.config.radar_config['presence_threshold']
            self.presence_history.append(is_present)
            # Stable only when at least 2 of the last 3 frames agree.
            stable_presence = sum(self.presence_history) >= 2
            confidence = point_density if stable_presence else 0.0

            # Detect visit start/end transitions.
            visit_event = self._detect_bathroom_visit(timestamp, stable_presence)
            return stable_presence, float(confidence), visit_event
        except Exception as e:
            console_logger.error(f"存在检测失败: {e}")
            return False, 0.0, None

    def _detect_bathroom_visit(self, timestamp, is_present):
        """Track presence transitions; return a visit record when one ends.

        A visit is recorded only if its duration reaches
        ``min_visit_duration``; shorter spans are silently discarded.
        """
        min_duration = self.config.radar_config['min_visit_duration']

        if is_present and self.current_visit_start is None:
            # Rising edge: remember when presence began.
            self.current_visit_start = timestamp
            return None

        elif not is_present and self.current_visit_start is not None:
            # Falling edge: close out the visit.
            duration = timestamp - self.current_visit_start
            if duration >= min_duration:
                visit_info = {
                    'start': self.current_visit_start,
                    'end': timestamp,
                    'duration': duration,
                    'datetime': datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
                }
                self.bathroom_visits.append(visit_info)
                self.current_visit_start = None
                return visit_info
            else:
                # Too short to count as a visit.
                self.current_visit_start = None
                return None

        return None
|
||
|
||
class RiskAssessmentEngine:
    """Combines HRV, heart-rate and bathroom-visit features into one
    weighted hypertension risk score plus a LOW/MEDIUM/HIGH level."""

    def __init__(self, config):
        # config: EdgeConfig-like object exposing risk_config / alert_config.
        self.config = config
        self.risk_cfg = config.risk_config

    def calculate_risk_score(self, hrv_features, hr, bathroom_visits):
        """Return (total_risk, risk_level, per-factor breakdown).

        ``hrv_features`` is the dict produced by
        BCGProcessor.calculate_hrv_features; when falsy the result is
        (0.0, "UNKNOWN", {}).  ``bathroom_visits`` contributes only
        through its length.
        """
        if not hrv_features:
            return 0.0, "UNKNOWN", {}

        cfg = self.risk_cfg
        weights = cfg['weights']
        rmssd = hrv_features.get('rmssd', 0)
        sdnn = hrv_features.get('sdnn', 0)
        visit_count = len(bathroom_visits)

        def first_match(cases, fallback):
            # Score of the first true predicate; fallback if none fire.
            for hit, score in cases:
                if hit:
                    return score
            return fallback

        # Lower RMSSD / SDNN (reduced variability) maps to higher risk.
        rmssd_risk = first_match(
            [(rmssd < cfg['rmssd_high_risk'], 1.0),
             (rmssd < cfg['rmssd_medium_risk'], 0.6),
             (rmssd < cfg['rmssd_low_risk'], 0.4)],
            0.2,
        )
        sdnn_risk = first_match([(sdnn < 30, 1.0), (sdnn < 45, 0.6)], 0.2)
        # Elevated resting heart rate.
        hr_risk = first_match(
            [(hr > cfg['hr_high_risk'], 1.0),
             (hr > cfg['hr_medium_risk'], 0.6)],
            0.2,
        )
        # Frequent night-time bathroom visits.
        bathroom_risk = first_match(
            [(visit_count >= cfg['bathroom_high_risk'], 1.0),
             (visit_count >= cfg['bathroom_medium_risk'], 0.6)],
            0.2,
        )

        # Weighted combination.  NOTE(review): the configured 'hr_surge'
        # weight is not used here, so the maximum attainable score is the
        # sum of the remaining four weights — confirm this is intended.
        total_risk = (weights['rmssd'] * rmssd_risk +
                      weights['sdnn'] * sdnn_risk +
                      weights['hr_level'] * hr_risk +
                      weights['bathroom_freq'] * bathroom_risk)

        # Map score to a discrete level via the alert thresholds.
        alert_cfg = self.config.alert_config
        if total_risk >= alert_cfg['high_risk_threshold']:
            risk_level = 'HIGH'
        elif total_risk >= alert_cfg['medium_risk_threshold']:
            risk_level = 'MEDIUM'
        else:
            risk_level = 'LOW'

        breakdown = {
            'rmssd_risk': rmssd_risk,
            'sdnn_risk': sdnn_risk,
            'hr_risk': hr_risk,
            'bathroom_risk': bathroom_risk,
        }

        return float(total_risk), risk_level, breakdown
|
||
|
||
# ==================== 整合后的MQTT数据处理器 ====================
|
||
class MQTTHypertensionHandler:
    """Receives sensor frames over MQTT, runs hypertension risk assessment,
    deduplicates them by a content-derived key, and every
    INTEGRATION_INTERVAL seconds publishes and persists an integrated
    snapshot (MQTT publish + TF card + SFTP + MySQL).

    Fixes over the previous revision:
      * BCG samples are now fed into ``self.bcg_buffer`` so the radar
        branch's buffered-BCG risk update can actually run (the buffer
        was allocated but never written, leaving that path dead).
      * The Ctrl+C shutdown path no longer acquires ``buffer_lock``
        before calling ``integrate_and_save_data()``; the lock is not
        reentrant, so the old code deadlocked whenever the buffer was
        non-empty at shutdown.
    """

    def __init__(self, base_path, robot_id):
        self.base_path = base_path
        self.robot_id = robot_id
        self.tf_card_manager = save_data_manager(base_path, robot_id)

        # Deduplication buffer: data_key -> latest entry for that key.
        self.data_buffer = {}
        # Guards data_buffer (MQTT callback thread vs main loop).
        self.buffer_lock = threading.Lock()
        self.last_integration_time = time.time()

        # Counters (cumulative over the whole process lifetime).
        self.total_messages = 0
        self.processed_messages = 0
        self.deduplicated_messages = 0
        self.integration_count = 0

        # Risk-assessment pipeline components.
        self.edge_config = EdgeConfig()
        self.bcg_processor = BCGProcessor(self.edge_config)
        self.radar_processor = RadarProcessor(self.edge_config)
        self.risk_engine = RiskAssessmentEngine(self.edge_config)
        # Rolling window of BCG samples (window_size seconds at sampling_rate Hz).
        self.bcg_buffer = RingBuffer(self.edge_config.bcg_config['window_size'] *
                                     self.edge_config.bcg_config['sampling_rate'])

        # Subscriber client (receives sensor data).
        self.mqtt_client = mqtt.Client()
        self.mqtt_client.on_message = self.on_mqtt_message
        self.mqtt_client.on_connect = self.on_mqtt_connect

        # Separate client used only for publishing integrated results.
        self.mqtt_publish_client = mqtt.Client()

        # NOTE: connecting in the constructor means instantiation raises
        # if the broker is unreachable.
        self.mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
        self.mqtt_publish_client.connect(MQTT_BROKER, MQTT_PORT, 60)

    def on_mqtt_connect(self, client, userdata, flags, rc):
        """paho-mqtt v1 on_connect callback: subscribe once connected."""
        print(f"Connected to MQTT broker with result code {rc}")
        client.subscribe(MQTT_SUBSCRIBE_TOPIC)
        print(f"Subscribed to topics: {MQTT_SUBSCRIBE_TOPIC}")
        print(f"数据整合模式: 每 {INTEGRATION_INTERVAL} 秒整合输出一次")
        print(f"结果发布主题: {MQTT_PUBLISH_TOPIC}")

    def extract_data_key(self, data):
        """Derive a deduplication key from a decoded message.

        Preference order: the sorted tuple of ``params`` keys, then
        ``"version|method"``, then the message's string form.  Returns
        "unknown" on any failure.
        """
        try:
            if isinstance(data, dict):
                if "params" in data and isinstance(data["params"], dict):
                    params = data["params"]
                    param_keys = sorted(list(params.keys()))
                    if param_keys:
                        return tuple(param_keys)

                version = data.get("version", "")
                method = data.get("method", "")
                if version and method:
                    return f"{version}|{method}"

            return str(data)
        except Exception as e:
            print(f"提取数据key时出错: {e}")
            return "unknown"

    def process_sensor_data(self, data):
        """Run risk assessment on one decoded message.

        Dispatches on the payload: ``params['bcg_signal']`` triggers the
        BCG pipeline, ``params['radar_point_cloud']`` the radar pipeline.
        Always returns a result dict; failures are reported through its
        ``error`` field rather than raised.
        """
        # Start time for the processing-latency metric.
        process_start_time = time.time()

        # Result skeleton; fields stay at these defaults on early exit.
        risk_result = {
            'type': 'UNKNOWN',
            'timestamp': datetime.now().isoformat(),
            # Physiological metrics as standalone fields
            'heart_rate': None,
            'rmssd': None,
            'sdnn': None,
            # Risk score and level as standalone fields
            'risk_score': 0.0,
            'risk_level': 'UNKNOWN',
            # System-performance field
            'processing_delay_ms': 0.0,
            'features': {},
            'error': None
        }

        try:
            # Identify the payload type (BCG / radar).
            params = data.get('params', {})

            # --- BCG branch ---
            if 'bcg_signal' in params:
                risk_result['type'] = 'BCG'
                bcg_signal = np.array(params['bcg_signal'], dtype=np.float32)

                # BUG FIX: feed the rolling buffer so the radar branch
                # below can re-assess risk from recent BCG history
                # (previously the buffer was never written).
                self.bcg_buffer.extend(bcg_signal)

                # Reject low-quality windows (likely bed exit).
                quality = self.bcg_processor.assess_signal_quality(bcg_signal)
                if quality < self.edge_config.bcg_config['bed_exit_threshold']:
                    risk_result['error'] = "低质量BCG信号(可能离床)"
                    risk_result['processing_delay_ms'] = round((time.time() - process_start_time) * 1000, 2)
                    return risk_result

                # Extract heart rate and HRV features.
                hr, hr_conf, rr_intervals = self.bcg_processor.extract_heart_rate(bcg_signal)
                if hr is None:
                    risk_result['error'] = "无法提取有效心率"
                    risk_result['processing_delay_ms'] = round((time.time() - process_start_time) * 1000, 2)
                    return risk_result

                hrv_features = self.bcg_processor.calculate_hrv_features(rr_intervals)
                if not hrv_features:
                    risk_result['error'] = "无法计算HRV特征"
                    risk_result['processing_delay_ms'] = round((time.time() - process_start_time) * 1000, 2)
                    return risk_result

                # Score risk using accumulated bathroom-visit history.
                bathroom_visits = self.radar_processor.bathroom_visits
                risk_score, risk_level, breakdown = self.risk_engine.calculate_risk_score(
                    hrv_features, hr, bathroom_visits
                )

                # Standalone physiological fields.
                risk_result['heart_rate'] = round(hr, 1)
                risk_result['rmssd'] = round(hrv_features['rmssd'], 2)
                risk_result['sdnn'] = round(hrv_features['sdnn'], 2)
                # Standalone risk fields.
                risk_result['risk_score'] = round(risk_score, 3)
                risk_result['risk_level'] = risk_level

                risk_result['features'] = {
                    'hr': hr,
                    'hr_confidence': hr_conf,
                    'hrv': hrv_features,
                    'signal_quality': quality,
                    'bathroom_visits': len(bathroom_visits),
                    'risk_breakdown': breakdown
                }

            # --- Radar branch ---
            elif 'radar_point_cloud' in params:
                risk_result['type'] = 'RADAR'
                point_cloud = params['radar_point_cloud']
                timestamp = params.get('timestamp', time.time())

                # Detect bathroom presence and completed night visits.
                is_present, confidence, visit_event = self.radar_processor.detect_presence_in_bathroom(
                    point_cloud, timestamp
                )

                risk_result['features'] = {
                    'in_bathroom': is_present,
                    'confidence': confidence,
                    'bathroom_visits': len(self.radar_processor.bathroom_visits),
                    'latest_visit': visit_event
                }

                # If a full BCG window has accumulated, refresh the risk
                # score so radar frames also carry up-to-date vitals.
                if self.bcg_buffer.is_filled:
                    bcg_signal = self.bcg_buffer.get_data()
                    hr, _, rr_intervals = self.bcg_processor.extract_heart_rate(bcg_signal)
                    if hr and rr_intervals is not None:
                        hrv_features = self.bcg_processor.calculate_hrv_features(rr_intervals)
                        if hrv_features:
                            risk_score, risk_level, _ = self.risk_engine.calculate_risk_score(
                                hrv_features, hr, self.radar_processor.bathroom_visits
                            )
                            risk_result['heart_rate'] = round(hr, 1)
                            risk_result['rmssd'] = round(hrv_features['rmssd'], 2)
                            risk_result['sdnn'] = round(hrv_features['sdnn'], 2)
                            risk_result['risk_score'] = round(risk_score, 3)
                            risk_result['risk_level'] = risk_level

            # Processing latency in milliseconds.
            risk_result['processing_delay_ms'] = round((time.time() - process_start_time) * 1000, 2)

        except Exception as e:
            risk_result['error'] = f"处理失败: {str(e)}"
            # Latency field is always populated, even on failure.
            risk_result['processing_delay_ms'] = round((time.time() - process_start_time) * 1000, 2)
            console_logger.error(f"传感器数据处理失败: {e}")

        return risk_result

    def on_mqtt_message(self, client, userdata, msg):
        """MQTT on_message callback: assess, then dedupe into the buffer."""
        try:
            data = json.loads(msg.payload.decode())
            self.total_messages += 1

            # Assess outside the lock; only buffer mutation is guarded.
            risk_result = self.process_sensor_data(data)

            data_key = self.extract_data_key(data)

            # Store raw data + assessment, keyed for deduplication.
            with self.buffer_lock:
                if data_key in self.data_buffer:
                    # Repeat of a known key: keep latest payload, bump count.
                    self.data_buffer[data_key] = {
                        "data": data,
                        "risk_result": risk_result,
                        "timestamp": datetime.now().isoformat(),
                        "count": self.data_buffer[data_key].get("count", 1) + 1,
                        "first_seen": self.data_buffer[data_key].get("first_seen", datetime.now().isoformat())
                    }
                    self.deduplicated_messages += 1
                else:
                    self.data_buffer[data_key] = {
                        "data": data,
                        "risk_result": risk_result,
                        "timestamp": datetime.now().isoformat(),
                        "count": 1,
                        "first_seen": datetime.now().isoformat()
                    }
                    self.processed_messages += 1

            # Progress line every 10 messages.
            if self.total_messages % 10 == 0:
                print(f"已接收 {self.total_messages} 条消息, 去重后 {len(self.data_buffer)} 种数据")

        except Exception as e:
            print(f"Error processing MQTT message: {e}")

    def integrate_and_save_data(self):
        """Snapshot the buffer, publish/persist it, then clear the buffer.

        NOTE(review): publish + TF/SFTP/MySQL saving all happen while
        ``buffer_lock`` is held, so slow I/O here blocks the MQTT
        callback; consider snapshotting under the lock and doing I/O
        outside it.
        """
        with self.buffer_lock:
            if not self.data_buffer:
                print("缓冲区为空,跳过本次整合")
                return

            self.integration_count += 1

            # Flatten buffer entries into JSON-friendly records.
            unique_data_items = {}
            for data_key, buffer_entry in self.data_buffer.items():
                key_str = "|".join(data_key) if isinstance(data_key, tuple) else str(data_key)
                unique_data_items[key_str] = {
                    "data": buffer_entry["data"],
                    "risk_result": buffer_entry["risk_result"],
                    "first_seen": buffer_entry["first_seen"],
                    "last_seen": buffer_entry["timestamp"],
                    "message_count": buffer_entry["count"]
                }

            # Assemble the integrated snapshot.
            integration_data = {
                "integration_id": self.integration_count,
                "timestamp": datetime.now().isoformat(),
                "total_messages": self.total_messages,
                "unique_data_count": len(self.data_buffer),
                "sensor_id": SENSER_ID,
                "integration_interval": INTEGRATION_INTERVAL,
                "unique_data_items": unique_data_items,
                "statistics": {
                    "total_messages_received": self.total_messages,
                    "duplicate_messages_removed": self.deduplicated_messages,
                    "unique_messages_kept": self.processed_messages
                }
            }

            # Report, publish, persist.
            self.print_integration_stats(integration_data)
            self.publish_to_mqtt(integration_data)
            self.save_integrated_data(integration_data)

            # Start the next interval from an empty buffer.
            self.data_buffer.clear()

    def publish_to_mqtt(self, integration_data):
        """Publish the integrated snapshot as JSON (QoS 1) and wait for the ack."""
        try:
            # default=str covers non-JSON types (e.g. tuples rendered via str).
            payload = json.dumps(integration_data, ensure_ascii=False, default=str)

            result = self.mqtt_publish_client.publish(
                MQTT_PUBLISH_TOPIC,
                payload,
                qos=1
            )

            # Block until the broker confirms (or the client gives up).
            result.wait_for_publish()

            if result.is_published():
                print(f"整合数据 {self.integration_count} 已发布到MQTT主题: {MQTT_PUBLISH_TOPIC}")
            else:
                print(f"整合数据 {self.integration_count} MQTT发布失败")

        except Exception as e:
            print(f"发布MQTT消息时出错: {e}")

    def save_integrated_data(self, integration_data):
        """Persist the snapshot: TF card JSON -> SFTP upload -> MySQL row.

        Best-effort: failures are printed, not raised.
        """
        try:
            # Convert to the TF-card storage format.
            save_data = self.tf_card_manager.generate_fake_sensor_002_data(integration_data)

            # Write to the TF card.
            save_path = self.tf_card_manager.save_data_to_tfcard_json(
                save_data, dir_name=SENSER_ID, json_type=SENSER_ID)

            # Upload to SFTP.  NOTE(review): the prefix strip assumes
            # save_path lives under base_path -- confirm with
            # save_data_manager's contract.
            directory = os.path.dirname(save_path)
            file_name = os.path.basename(save_path)
            cleaned_path = directory[len(self.base_path + "/"):]
            down_url = sftp_upload_one_file(save_path, cleaned_path, file_name)

            print(f"整合数据 {self.integration_count} 已保存,SFTP路径: {down_url}")

            # Record the download URL in MySQL.
            save_radar_mysql_data(SENSER_ID, save_data, down_url)

        except Exception as e:
            print(f"保存整合数据时出错: {e}")

    def print_integration_stats(self, integration_data):
        """Print a human-readable summary of one integration cycle."""
        unique_count = integration_data["unique_data_count"]

        print("=" * 80)
        print(f"第 {self.integration_count} 次数据整合完成 (10秒周期)")
        print("=" * 80)
        print(f"统计信息:")
        # NOTE(review): these counters are cumulative since startup, not
        # per-interval, despite the "10秒内" wording.
        print(f" - 10秒内接收消息总数: {self.total_messages} 条")
        print(f" - 去重后保留数据: {unique_count} 种不同数据")
        print(f" - 去重掉的消息: {self.deduplicated_messages} 条")

        # Tally risk levels across the snapshot.
        high_risk_count = 0
        medium_risk_count = 0

        for item in integration_data["unique_data_items"].values():
            risk_level = item["risk_result"]["risk_level"]
            if risk_level == "HIGH":
                high_risk_count += 1
            elif risk_level == "MEDIUM":
                medium_risk_count += 1

        print(f" - 高风险数据: {high_risk_count} 条")
        print(f" - 中风险数据: {medium_risk_count} 条")
        print(f" - 低风险/未知: {unique_count - high_risk_count - medium_risk_count} 条")

        print(f"\n下次整合时间: {datetime.fromtimestamp(time.time() + INTEGRATION_INTERVAL).strftime('%H:%M:%S')}")
        print("=" * 80)

    def check_and_process_integration(self):
        """Run an integration cycle when the interval elapsed.

        Returns True when a cycle boundary was hit (with or without data).
        """
        current_time = time.time()
        if current_time - self.last_integration_time >= INTEGRATION_INTERVAL:
            # Unlocked read of data_buffer: worst case we log the wrong
            # branch; integrate_and_save_data re-checks under the lock.
            if self.data_buffer:
                print(f"\n正在进行第 {self.integration_count + 1} 次数据整合...")
                self.integrate_and_save_data()
            else:
                print(f"\n第 {self.integration_count + 1} 次整合周期结束,缓冲区无数据")

            self.last_integration_time = current_time
            return True
        return False

    def run(self):
        """Blocking main loop: poll the integration timer until Ctrl+C."""
        self.mqtt_client.loop_start()
        self.mqtt_publish_client.loop_start()

        print(f"\nMQTT高血压风险评估系统启动")
        print(f"数据整合间隔: {INTEGRATION_INTERVAL} 秒")
        print(f"接收主题: {MQTT_SUBSCRIBE_TOPIC}")
        print(f"发布主题: {MQTT_PUBLISH_TOPIC}")
        print("按 Ctrl+C 停止程序")
        print("-" * 80)

        try:
            while True:
                self.check_and_process_integration()
                time.sleep(0.1)

        except KeyboardInterrupt:
            print("\n收到停止信号,正在进行最后一次数据整合...")

            # BUG FIX: do NOT take buffer_lock here.
            # integrate_and_save_data() acquires it itself, and
            # threading.Lock is not reentrant, so the previous version
            # deadlocked on shutdown whenever the buffer was non-empty.
            if self.data_buffer:
                print(f"正在保存缓冲区中剩余的 {len(self.data_buffer)} 条数据...")
                self.integrate_and_save_data()
            else:
                print("缓冲区为空,无需保存")

            # Stop both MQTT network loops.
            self.mqtt_client.loop_stop()
            self.mqtt_publish_client.loop_stop()

            print("程序已停止")
            print(f"\n最终统计:")
            print(f" - 总共接收消息: {self.total_messages} 条")
            print(f" - 执行整合次数: {self.integration_count} 次")
            print("=" * 80)
|
||
|
||
# ==================== Main program ====================
if __name__ == "__main__":
    # Runtime dependencies (install if missing):
    # pip install numpy scipy requests paho-mqtt

    # Build the integrated handler (connects to the MQTT broker during
    # construction) and run the blocking integration loop until Ctrl+C.
    handler = MQTTHypertensionHandler(BASE_PATH, ROBOT_ID)
    handler.run()