Hypertension_Risk_Assessment/0112-2026 - Test_Package/cloud_hypertension_system-云端代码.py

1244 lines
41 KiB
Python
Raw Permalink Normal View History

2026-01-13 11:27:21 +08:00
# -*- coding: utf-8 -*-
"""
高血压风险评估系统 - 云端深度分析模块
Hypertension Risk Assessment - Cloud Deep Analysis Module
Created: 2026-01-10
Author: KKRobot Healthcare AI Team
Version: v1.0_cloud_analytics
📌 部署架构混合部署云端组件
- 边缘端RK3588设备实时预警
- 云端本代码服务器深度分析与长期趋势
🎯 核心功能
1. 接收边缘端上传的匿名化数据REST API
2. HRV深度分析时域+频域指标
3. 机器学习风险预测模型
4. 长期趋势分析与可视化
5. 个性化基线优化
6. 自动生成诊断报告
7. 参数优化与下发
💻 技术栈
- Web框架Flask轻量级REST API
- 数据库SQLite可切换PostgreSQL
- 机器学习PyTorch回归模型
- 数据分析pandas, scipy
- 可视化matplotlib
📦 运行要求
pip install flask numpy scipy pandas torch matplotlib sqlalchemy
python cloud_hypertension_system.py
🔐 安全特性
- API Key认证
- 数据匿名化验证
- SQL注入防护
- HTTPS支持生产环境
"""
import numpy as np
import pandas as pd
from scipy import signal
from scipy.stats import entropy
import torch
import torch.nn as nn
import torch.optim as optim
from flask import Flask, request, jsonify
from sqlalchemy import create_engine, Column, Integer, Float, String, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime, timedelta
import json
import os
import hashlib
import warnings
import matplotlib
matplotlib.use('Agg') # 无GUI后端
import matplotlib.pyplot as plt
from collections import defaultdict
import pickle
warnings.filterwarnings("ignore")
# ==================== 0. 云端核心配置 ====================
class CloudConfig:
"""云端服务器配置"""
def __init__(self):
# 版本标识
self.version = "v1.0_cloud_analytics"
# ========== API服务配置 ==========
self.api_config = {
'host': '0.0.0.0',
'port': 5000,
'debug': False,
'api_keys': [
'your_api_key_here',
'edge_device_key_001'
],
'max_request_size': 10 * 1024 * 1024, # 10MB
'enable_cors': True
}
# ========== 数据库配置 ==========
self.database_config = {
'type': 'sqlite', # 'sqlite' or 'postgresql'
'sqlite_path': './cloud_database.db',
'postgresql_url': 'postgresql://user:password@localhost/hypertension_db',
'echo': False # SQL日志
}
# ========== HRV深度分析配置 ==========
self.hrv_analysis_config = {
'enable_frequency_domain': True,
'frequency_bands': {
'vlf': (0.003, 0.04), # 超低频
'lf': (0.04, 0.15), # 低频
'hf': (0.15, 0.4) # 高频
},
'psd_method': 'welch',
'detrend': True
}
# ========== 机器学习模型配置 ==========
self.ml_config = {
'model_type': 'neural_network', # 'neural_network' or 'random_forest'
'input_features': [
'rmssd', 'sdnn', 'pnn50', 'mean_hr',
'lf_power', 'hf_power', 'lf_hf_ratio',
'bathroom_freq', 'bathroom_duration'
],
'hidden_dims': [64, 32, 16],
'learning_rate': 0.001,
'batch_size': 32,
'epochs': 50,
'validation_split': 0.2,
'model_save_path': './models/cloud_model.pth'
}
# ========== 趋势分析配置 ==========
self.trend_config = {
'min_days_for_trend': 7, # 最少7天才计算趋势
'baseline_window': 14, # 14天滚动基线
'alert_threshold_increase': 0.15, # 风险评分上升15%触发警报
'smoothing_window': 3 # 3天移动平均
}
# ========== 报告生成配置 ==========
self.report_config = {
'report_dir': './reports',
'chart_dir': './charts',
'template_path': './templates/report_template.html',
'export_formats': ['pdf', 'html', 'json']
}
# ========== 参数优化配置 ==========
self.optimization_config = {
'enable_auto_tuning': True,
'tune_interval_days': 7, # 每7天优化一次
'min_samples_for_tuning': 50,
'performance_metrics': ['sensitivity', 'specificity', 'auc']
}
# 创建必要目录
for dir_path in [self.report_config['report_dir'],
self.report_config['chart_dir'],
os.path.dirname(self.ml_config['model_save_path'])]:
os.makedirs(dir_path, exist_ok=True)
# 全局配置实例
config = CloudConfig()
# ==================== 1. 数据库模型层 ====================
Base = declarative_base()
class Device(Base):
"""设备表(匿名化)"""
__tablename__ = 'devices'
id = Column(Integer, primary_key=True)
device_id = Column(String(100), unique=True, nullable=False) # 匿名ID
device_type = Column(String(50)) # 'RK3588', 'RK3568', etc.
registration_date = Column(DateTime, default=datetime.now)
last_upload = Column(DateTime)
total_uploads = Column(Integer, default=0)
class SensorData(Base):
"""传感器数据表"""
__tablename__ = 'sensor_data'
id = Column(Integer, primary_key=True)
device_id = Column(String(100), nullable=False)
timestamp = Column(DateTime, nullable=False)
event_type = Column(String(50)) # 'BCG_REALTIME', 'BATHROOM_VISIT'
# BCG生理指标
heart_rate = Column(Float)
rmssd = Column(Float)
sdnn = Column(Float)
pnn50 = Column(Float)
respiratory_rate = Column(Float)
# 雷达行为指标
bathroom_freq = Column(Integer)
bathroom_duration = Column(Float)
# 风险评估
risk_score = Column(Float)
risk_level = Column(String(20))
signal_quality = Column(Float)
# 原始数据JSON
raw_data = Column(Text)
class NightlySummary(Base):
"""夜间汇总表"""
__tablename__ = 'nightly_summary'
id = Column(Integer, primary_key=True)
device_id = Column(String(100), nullable=False)
date = Column(String(20), nullable=False) # 'YYYY-MM-DD'
# BCG汇总
mean_hr = Column(Float)
min_hr = Column(Float)
max_hr = Column(Float)
mean_rmssd = Column(Float)
mean_sdnn = Column(Float)
mean_pnn50 = Column(Float)
# HRV频域指标
vlf_power = Column(Float)
lf_power = Column(Float)
hf_power = Column(Float)
lf_hf_ratio = Column(Float)
# 行为汇总
total_bathroom_visits = Column(Integer)
total_bathroom_duration = Column(Float)
avg_bathroom_duration = Column(Float)
first_bathroom_hour = Column(Float)
# 风险评估
mean_risk_score = Column(Float)
max_risk_score = Column(Float)
dominant_risk_level = Column(String(20))
# 数据质量
valid_data_hours = Column(Float)
total_data_points = Column(Integer)
class RiskAlert(Base):
"""风险警报表"""
__tablename__ = 'risk_alerts'
id = Column(Integer, primary_key=True)
device_id = Column(String(100), nullable=False)
timestamp = Column(DateTime, nullable=False)
alert_type = Column(String(50)) # 'HIGH_RISK', 'TREND_INCREASE', 'ANOMALY'
risk_score = Column(Float)
message = Column(Text)
is_read = Column(Integer, default=0)
class DatabaseManager:
"""数据库管理器"""
def __init__(self, config):
self.config = config
# 创建引擎
if config.database_config['type'] == 'sqlite':
db_url = f"sqlite:///{config.database_config['sqlite_path']}"
else:
db_url = config.database_config['postgresql_url']
self.engine = create_engine(
db_url,
echo=config.database_config['echo']
)
# 创建表
Base.metadata.create_all(self.engine)
# 创建会话工厂
Session = sessionmaker(bind=self.engine)
self.session = Session()
print(f"✓ 数据库初始化成功: {config.database_config['type']}")
def add_sensor_data(self, device_id, event):
"""添加传感器数据"""
data = SensorData(
device_id=device_id,
timestamp=datetime.fromtimestamp(event['timestamp']),
event_type=event['event_type'],
heart_rate=event['data'].get('hr'),
rmssd=event['data'].get('rmssd'),
sdnn=event['data'].get('sdnn'),
pnn50=event['data'].get('pnn50'),
respiratory_rate=event['data'].get('rr'),
bathroom_freq=event['data'].get('bathroom_freq'),
bathroom_duration=event['data'].get('bathroom_duration'),
risk_score=event['data'].get('risk_score'),
risk_level=event['data'].get('risk_level'),
signal_quality=event.get('quality', 1.0),
raw_data=json.dumps(event['data'])
)
self.session.add(data)
self.session.commit()
def get_device_data(self, device_id, start_date, end_date):
"""获取设备数据"""
query = self.session.query(SensorData).filter(
SensorData.device_id == device_id,
SensorData.timestamp >= start_date,
SensorData.timestamp <= end_date
)
return query.all()
def get_nightly_summaries(self, device_id, days=30):
"""获取夜间汇总"""
query = self.session.query(NightlySummary).filter(
NightlySummary.device_id == device_id
).order_by(NightlySummary.date.desc()).limit(days)
return query.all()
def add_nightly_summary(self, summary_data):
"""添加夜间汇总"""
summary = NightlySummary(**summary_data)
self.session.add(summary)
self.session.commit()
def add_risk_alert(self, device_id, alert_type, risk_score, message):
"""添加风险警报"""
alert = RiskAlert(
device_id=device_id,
timestamp=datetime.now(),
alert_type=alert_type,
risk_score=risk_score,
message=message
)
self.session.add(alert)
self.session.commit()
# ==================== 2. HRV深度分析层 ====================
class HRVDeepAnalyzer:
"""HRV深度分析器频域+非线性)"""
def __init__(self, config):
self.config = config
self.freq_bands = config.hrv_analysis_config['frequency_bands']
def calculate_frequency_domain(self, rr_intervals, fs=4.0):
"""
计算频域HRV指标
参数
- rr_intervals: RR间期数组ms
- fs: 重采样率Hz
返回{'vlf_power', 'lf_power', 'hf_power', 'lf_hf_ratio', ...}
"""
try:
if len(rr_intervals) < 20:
return None
# 1. 转换为时间序列
rr_sec = rr_intervals / 1000.0
cum_time = np.cumsum(rr_sec)
cum_time = cum_time - cum_time[0]
# 2. 插值到均匀采样
time_uniform = np.arange(0, cum_time[-1], 1.0/fs)
interp_func = interp1d(cum_time, rr_sec, kind='cubic', fill_value='extrapolate')
rr_uniform = interp_func(time_uniform)
# 3. 去趋势
if self.config.hrv_analysis_config['detrend']:
rr_uniform = signal.detrend(rr_uniform)
# 4. Welch功率谱估计
freqs, psd = signal.welch(
rr_uniform,
fs=fs,
nperseg=min(256, len(rr_uniform)),
scaling='density'
)
# 5. 计算各频段能量
vlf_mask = (freqs >= self.freq_bands['vlf'][0]) & (freqs < self.freq_bands['vlf'][1])
lf_mask = (freqs >= self.freq_bands['lf'][0]) & (freqs < self.freq_bands['lf'][1])
hf_mask = (freqs >= self.freq_bands['hf'][0]) & (freqs < self.freq_bands['hf'][1])
vlf_power = np.trapz(psd[vlf_mask], freqs[vlf_mask])
lf_power = np.trapz(psd[lf_mask], freqs[lf_mask])
hf_power = np.trapz(psd[hf_mask], freqs[hf_mask])
total_power = vlf_power + lf_power + hf_power
# 6. 比率指标
lf_hf_ratio = lf_power / (hf_power + 1e-6)
lf_norm = lf_power / (lf_power + hf_power + 1e-6)
hf_norm = hf_power / (lf_power + hf_power + 1e-6)
return {
'vlf_power': float(vlf_power),
'lf_power': float(lf_power),
'hf_power': float(hf_power),
'total_power': float(total_power),
'lf_hf_ratio': float(lf_hf_ratio),
'lf_norm': float(lf_norm),
'hf_norm': float(hf_norm),
'vlf_percent': float(vlf_power / total_power * 100),
'lf_percent': float(lf_power / total_power * 100),
'hf_percent': float(hf_power / total_power * 100)
}
except Exception as e:
print(f" ⚠️ 频域分析失败: {e}")
return None
def calculate_nonlinear_features(self, rr_intervals):
"""
计算非线性HRV指标
返回{'sd1', 'sd2', 'sample_entropy', ...}
"""
try:
if len(rr_intervals) < 20:
return None
# 1. Poincaré图指标SD1, SD2
diff_rr = np.diff(rr_intervals)
sd1 = np.sqrt(np.var(diff_rr) / 2)
sd2 = np.sqrt(2 * np.var(rr_intervals) - np.var(diff_rr) / 2)
# 2. 样本熵Sample Entropy
# 简化版本(完整版较复杂)
sample_entropy_value = self._calculate_sample_entropy(rr_intervals, m=2, r=0.2)
return {
'sd1': float(sd1),
'sd2': float(sd2),
'sd1_sd2_ratio': float(sd1 / (sd2 + 1e-6)),
'sample_entropy': float(sample_entropy_value)
}
except Exception as e:
print(f" ⚠️ 非线性分析失败: {e}")
return None
def _calculate_sample_entropy(self, data, m=2, r=0.2):
"""计算样本熵(简化版)"""
try:
N = len(data)
if N < 10:
return 0.0
std_data = np.std(data)
r = r * std_data
def _maxdist(xi, xj):
return np.max(np.abs(xi - xj))
def _phi(m):
patterns = np.array([data[i:i+m] for i in range(N-m)])
C = np.zeros(N-m)
for i in range(N-m):
for j in range(N-m):
if i != j and _maxdist(patterns[i], patterns[j]) < r:
C[i] += 1
return np.sum(np.log(C / (N-m-1) + 1e-10)) / (N-m)
return float(_phi(m) - _phi(m+1))
except:
return 0.0
def comprehensive_analysis(self, rr_intervals):
"""
综合HRV分析
返回包含时域+频域+非线性指标的完整字典
"""
results = {}
# 时域指标
if len(rr_intervals) >= 10:
diff_rr = np.diff(rr_intervals)
results['time_domain'] = {
'mean_rr': float(np.mean(rr_intervals)),
'sdnn': float(np.std(rr_intervals)),
'rmssd': float(np.sqrt(np.mean(diff_rr ** 2))),
'pnn50': float(np.sum(np.abs(diff_rr) > 50) / len(diff_rr)),
'cv': float(np.std(rr_intervals) / np.mean(rr_intervals))
}
# 频域指标
freq_results = self.calculate_frequency_domain(rr_intervals)
if freq_results:
results['frequency_domain'] = freq_results
# 非线性指标
nonlinear_results = self.calculate_nonlinear_features(rr_intervals)
if nonlinear_results:
results['nonlinear'] = nonlinear_results
return results
# ==================== 3. 机器学习模型层 ====================
class RiskPredictionModel(nn.Module):
"""深度学习风险预测模型"""
def __init__(self, input_dim, hidden_dims=[64, 32, 16]):
super(RiskPredictionModel, self).__init__()
layers = []
prev_dim = input_dim
for hidden_dim in hidden_dims:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(nn.BatchNorm1d(hidden_dim))
layers.append(nn.ReLU())
layers.append(nn.Dropout(0.3))
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, 1))
layers.append(nn.Sigmoid())
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x).squeeze(-1)
class MLModelTrainer:
"""机器学习模型训练器"""
def __init__(self, config):
self.config = config
self.model = None
self.scaler = None
self.feature_names = config.ml_config['input_features']
def prepare_training_data(self, db_manager, device_ids):
"""
从数据库准备训练数据
返回X (features), y (labels)
"""
X_list = []
y_list = []
for device_id in device_ids:
summaries = db_manager.get_nightly_summaries(device_id, days=90)
for summary in summaries:
# 提取特征
features = []
for feat_name in self.feature_names:
value = getattr(summary, feat_name, 0)
features.append(value if value is not None else 0)
# 提取标签(风险评分)
label = summary.mean_risk_score
if label is not None and len(features) == len(self.feature_names):
X_list.append(features)
y_list.append(label)
if len(X_list) == 0:
return None, None
X = np.array(X_list)
y = np.array(y_list)
# 标准化
from sklearn.preprocessing import StandardScaler
self.scaler = StandardScaler()
X = self.scaler.fit_transform(X)
return X, y
def train_model(self, X, y):
"""训练模型"""
if X is None or len(X) < 50:
print(" ⚠️ 训练数据不足需要至少50个样本")
return False
print(f"{'='*70}")
print(f"🤖 开始训练机器学习模型")
print(f"{'='*70}")
print(f"训练样本: {len(X)}")
print(f"特征维度: {X.shape[1]}")
# 划分训练/验证集
split_idx = int(len(X) * (1 - self.config.ml_config['validation_split']))
X_train, X_val = X[:split_idx], X[split_idx:]
y_train, y_val = y[:split_idx], y[split_idx:]
# 转换为Tensor
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val, dtype=torch.float32)
# 初始化模型
input_dim = X.shape[1]
self.model = RiskPredictionModel(
input_dim=input_dim,
hidden_dims=self.config.ml_config['hidden_dims']
)
# 损失函数和优化器
criterion = nn.MSELoss()
optimizer = optim.Adam(
self.model.parameters(),
lr=self.config.ml_config['learning_rate']
)
# 训练循环
best_val_loss = float('inf')
patience = 10
patience_counter = 0
for epoch in range(self.config.ml_config['epochs']):
# 训练
self.model.train()
optimizer.zero_grad()
outputs = self.model(X_train_tensor)
loss = criterion(outputs, y_train_tensor)
loss.backward()
optimizer.step()
# 验证
self.model.eval()
with torch.no_grad():
val_outputs = self.model(X_val_tensor)
val_loss = criterion(val_outputs, y_val_tensor)
# 早停
if val_loss < best_val_loss:
best_val_loss = val_loss
patience_counter = 0
# 保存最佳模型
torch.save({
'model_state_dict': self.model.state_dict(),
'scaler': self.scaler,
'feature_names': self.feature_names
}, self.config.ml_config['model_save_path'])
else:
patience_counter += 1
if patience_counter >= patience:
print(f" 提前停止于 epoch {epoch+1}")
break
if (epoch + 1) % 10 == 0:
print(f" Epoch {epoch+1}/{self.config.ml_config['epochs']} | "
f"Train Loss: {loss.item():.4f} | Val Loss: {val_loss.item():.4f}")
print(f"✓ 模型训练完成!最佳验证损失: {best_val_loss:.4f}")
print(f"✓ 模型已保存: {self.config.ml_config['model_save_path']}")
print(f"{'='*70}")
return True
def load_model(self):
"""加载已训练模型"""
model_path = self.config.ml_config['model_save_path']
if not os.path.exists(model_path):
return False
try:
checkpoint = torch.load(model_path)
input_dim = len(checkpoint['feature_names'])
self.model = RiskPredictionModel(
input_dim=input_dim,
hidden_dims=self.config.ml_config['hidden_dims']
)
self.model.load_state_dict(checkpoint['model_state_dict'])
self.model.eval()
self.scaler = checkpoint['scaler']
self.feature_names = checkpoint['feature_names']
print(f"✓ 模型加载成功: {model_path}")
return True
except Exception as e:
print(f"❌ 模型加载失败: {e}")
return False
def predict(self, features):
"""
预测风险评分
参数features = {'rmssd': ..., 'sdnn': ..., ...}
返回风险评分0-1
"""
if self.model is None:
return None
# 提取特征值
feature_vector = []
for feat_name in self.feature_names:
feature_vector.append(features.get(feat_name, 0))
# 标准化
feature_vector = np.array(feature_vector).reshape(1, -1)
feature_vector = self.scaler.transform(feature_vector)
# 预测
with torch.no_grad():
feature_tensor = torch.tensor(feature_vector, dtype=torch.float32)
prediction = self.model(feature_tensor)
return float(prediction.item())
# ==================== 4. 趋势分析层 ====================
class TrendAnalyzer:
"""长期趋势分析器"""
def __init__(self, config):
self.config = config
def analyze_trends(self, nightly_summaries):
"""
分析长期趋势
参数nightly_summaries = [NightlySummary对象列表]
返回趋势分析结果
"""
if len(nightly_summaries) < self.config.trend_config['min_days_for_trend']:
return None
# 提取时间序列
dates = [datetime.strptime(s.date, '%Y-%m-%d') for s in nightly_summaries]
risk_scores = [s.mean_risk_score for s in nightly_summaries]
rmssd_values = [s.mean_rmssd for s in nightly_summaries]
hr_values = [s.mean_hr for s in nightly_summaries]
bathroom_freqs = [s.total_bathroom_visits for s in nightly_summaries]
# 移动平滑
window = self.config.trend_config['smoothing_window']
risk_smoothed = self._moving_average(risk_scores, window)
rmssd_smoothed = self._moving_average(rmssd_values, window)
# 计算趋势方向
risk_trend = self._calculate_trend(risk_smoothed)
rmssd_trend = self._calculate_trend(rmssd_smoothed)
# 检测异常值
risk_anomalies = self._detect_anomalies(risk_scores)
# 计算变化率
recent_risk = np.mean(risk_scores[-7:]) # 最近7天
baseline_risk = np.mean(risk_scores[:14]) # 前14天基线
risk_change_percent = (recent_risk - baseline_risk) / (baseline_risk + 1e-6) * 100
return {
'risk_trend': risk_trend, # 'increasing', 'stable', 'decreasing'
'rmssd_trend': rmssd_trend,
'risk_change_percent': float(risk_change_percent),
'recent_avg_risk': float(recent_risk),
'baseline_avg_risk': float(baseline_risk),
'anomaly_count': len(risk_anomalies),
'anomaly_dates': [dates[i].strftime('%Y-%m-%d') for i in risk_anomalies],
'avg_bathroom_freq': float(np.mean(bathroom_freqs)),
'max_bathroom_freq': int(np.max(bathroom_freqs))
}
def _moving_average(self, data, window):
"""移动平均"""
if len(data) < window:
return data
return np.convolve(data, np.ones(window)/window, mode='valid')
def _calculate_trend(self, data):
"""计算趋势方向(线性回归斜率)"""
if len(data) < 3:
return 'stable'
x = np.arange(len(data))
slope = np.polyfit(x, data, 1)[0]
if slope > 0.01:
return 'increasing'
elif slope < -0.01:
return 'decreasing'
else:
return 'stable'
def _detect_anomalies(self, data, threshold=2.5):
"""检测异常值(基于标准差)"""
mean = np.mean(data)
std = np.std(data)
anomalies = []
for i, value in enumerate(data):
if abs(value - mean) > threshold * std:
anomalies.append(i)
return anomalies
def should_alert(self, trend_result):
"""判断是否应该触发趋势警报"""
if trend_result is None:
return False, None
alert_threshold = self.config.trend_config['alert_threshold_increase']
if trend_result['risk_change_percent'] > alert_threshold * 100:
message = (f"⚠️ 风险评分上升趋势警报!"
f"近期风险: {trend_result['recent_avg_risk']:.3f}"
f"基线风险: {trend_result['baseline_avg_risk']:.3f}"
f"上升幅度: {trend_result['risk_change_percent']:.1f}%")
return True, message
return False, None
# ==================== 5. 报告生成层 ====================
class ReportGenerator:
"""诊断报告生成器"""
def __init__(self, config):
self.config = config
def generate_comprehensive_report(self, device_id, db_manager, ml_trainer, trend_analyzer):
"""
生成综合诊断报告
返回报告字典
"""
print(f"{'='*70}")
print(f"📄 生成综合诊断报告")
print(f"{'='*70}")
print(f"设备ID: {device_id}")
# 1. 获取最近30天数据
summaries = db_manager.get_nightly_summaries(device_id, days=30)
if len(summaries) == 0:
print(" ⚠️ 无可用数据")
return None
print(f"✓ 加载 {len(summaries)} 天数据")
# 2. 趋势分析
trend_result = trend_analyzer.analyze_trends(summaries)
# 3. 最新状态
latest = summaries[0]
latest_features = {
'rmssd': latest.mean_rmssd,
'sdnn': latest.mean_sdnn,
'mean_hr': latest.mean_hr,
'lf_power': latest.lf_power,
'hf_power': latest.hf_power,
'lf_hf_ratio': latest.lf_hf_ratio,
'bathroom_freq': latest.total_bathroom_visits,
'bathroom_duration': latest.total_bathroom_duration
}
# 4. ML预测
ml_prediction = None
if ml_trainer.model is not None:
ml_prediction = ml_trainer.predict(latest_features)
# 5. 计算统计量
stats = self._calculate_statistics(summaries)
# 6. 生成图表
chart_paths = self._generate_charts(device_id, summaries)
# 7. 医学建议
recommendations = self._generate_recommendations(latest_features, trend_result)
# 8. 构建报告
report = {
'device_id': device_id,
'generation_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'analysis_period': f"{summaries[-1].date} ~ {summaries[0].date}",
'total_days': len(summaries),
'latest_status': {
'date': latest.date,
'mean_hr': latest.mean_hr,
'mean_rmssd': latest.mean_rmssd,
'mean_sdnn': latest.mean_sdnn,
'lf_hf_ratio': latest.lf_hf_ratio,
'bathroom_visits': latest.total_bathroom_visits,
'risk_score': latest.mean_risk_score,
'risk_level': latest.dominant_risk_level
},
'ml_prediction': {
'predicted_risk': ml_prediction,
'model_version': config.version
} if ml_prediction else None,
'trend_analysis': trend_result,
'statistics': stats,
'charts': chart_paths,
'recommendations': recommendations
}
# 9. 保存报告
report_path = self._save_report(device_id, report)
report['report_path'] = report_path
print(f"✓ 报告生成完成: {report_path}")
print(f"{'='*70}")
return report
def _calculate_statistics(self, summaries):
"""计算统计指标"""
hr_values = [s.mean_hr for s in summaries if s.mean_hr]
rmssd_values = [s.mean_rmssd for s in summaries if s.mean_rmssd]
risk_values = [s.mean_risk_score for s in summaries if s.mean_risk_score]
return {
'hr': {
'mean': float(np.mean(hr_values)),
'std': float(np.std(hr_values)),
'min': float(np.min(hr_values)),
'max': float(np.max(hr_values))
},
'rmssd': {
'mean': float(np.mean(rmssd_values)),
'std': float(np.std(rmssd_values)),
'min': float(np.min(rmssd_values)),
'max': float(np.max(rmssd_values))
},
'risk_score': {
'mean': float(np.mean(risk_values)),
'std': float(np.std(risk_values)),
'trend': '上升' if np.mean(risk_values[-7:]) > np.mean(risk_values[:7]) else '下降'
}
}
def _generate_charts(self, device_id, summaries):
"""生成可视化图表"""
chart_dir = self.config.report_config['chart_dir']
charts = {}
dates = [datetime.strptime(s.date, '%Y-%m-%d') for s in reversed(summaries)]
# 1. 风险评分趋势图
risk_scores = [s.mean_risk_score for s in reversed(summaries)]
plt.figure(figsize=(10, 4))
plt.plot(dates, risk_scores, 'o-', linewidth=2, markersize=4)
plt.axhline(y=0.65, color='r', linestyle='--', label='高风险阈值')
plt.axhline(y=0.40, color='orange', linestyle='--', label='中风险阈值')
plt.xlabel('日期')
plt.ylabel('风险评分')
plt.title('30天风险评分趋势')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
risk_chart_path = os.path.join(chart_dir, f'{device_id}_risk_trend.png')
plt.savefig(risk_chart_path, dpi=100)
plt.close()
charts['risk_trend'] = risk_chart_path
# 2. HRV指标图
rmssd_values = [s.mean_rmssd for s in reversed(summaries)]
plt.figure(figsize=(10, 4))
plt.plot(dates, rmssd_values, 's-', linewidth=2, markersize=4, color='green')
plt.xlabel('日期')
plt.ylabel('RMSSD (ms)')
plt.title('30天HRV变化RMSSD')
plt.grid(True, alpha=0.3)
plt.tight_layout()
hrv_chart_path = os.path.join(chart_dir, f'{device_id}_hrv_trend.png')
plt.savefig(hrv_chart_path, dpi=100)
plt.close()
charts['hrv_trend'] = hrv_chart_path
return charts
def _generate_recommendations(self, features, trend_result):
"""生成医学建议"""
recommendations = []
# 基于RMSSD
if features.get('rmssd', 0) < 20:
recommendations.append("⚠️ HRV显著降低建议加强心血管监测")
elif features.get('rmssd', 0) < 30:
recommendations.append("💡 HRV偏低建议适度有氧运动改善")
# 基于心率
if features.get('mean_hr', 0) > 85:
recommendations.append("⚠️ 静息心率偏高,建议咨询心血管专科医生")
# 基于LF/HF比
if features.get('lf_hf_ratio', 0) > 2.5:
recommendations.append("💡 交感神经活性较高,建议放松训练(冥想、深呼吸)")
# 基于起夜频率
if features.get('bathroom_freq', 0) >= 3:
recommendations.append("⚠️ 夜尿频繁,建议泌尿科检查")
# 基于趋势
if trend_result and trend_result['risk_trend'] == 'increasing':
recommendations.append("📈 风险评分呈上升趋势,建议增加监测频率")
if len(recommendations) == 0:
recommendations.append("✅ 各项指标正常,保持健康生活方式")
return recommendations
def _save_report(self, device_id, report):
"""保存报告为JSON"""
report_dir = self.config.report_config['report_dir']
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"report_{device_id}_{timestamp}.json"
filepath = os.path.join(report_dir, filename)
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
return filepath
# ==================== 6. Flask API层 ====================
app = Flask(__name__)
# 全局组件
db_manager = None
ml_trainer = None
trend_analyzer = None
report_generator = None
def authenticate_request():
"""API认证"""
api_key = request.headers.get('Authorization')
if api_key:
api_key = api_key.replace('Bearer ', '')
if api_key not in config.api_config['api_keys']:
return False
return True
@app.route('/api/upload', methods=['POST'])
def upload_data():
"""接收边缘端上传的数据"""
if not authenticate_request():
return jsonify({'error': 'Unauthorized'}), 401
try:
data = request.get_json()
device_id = data.get('device_id', 'unknown')
events = data.get('events', [])
# 存储到数据库
for event in events:
db_manager.add_sensor_data(device_id, event)
print(f" ☁️ [API] 接收上传: 设备={device_id}, 事件数={len(events)}")
return jsonify({
'status': 'success',
'received_events': len(events),
'device_id': device_id
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/report/<device_id>', methods=['GET'])
def get_report(device_id):
"""获取设备诊断报告"""
if not authenticate_request():
return jsonify({'error': 'Unauthorized'}), 401
try:
report = report_generator.generate_comprehensive_report(
device_id, db_manager, ml_trainer, trend_analyzer
)
if report is None:
return jsonify({'error': 'No data available'}), 404
return jsonify(report), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/optimize/<device_id>', methods=['POST'])
def optimize_parameters(device_id):
"""优化边缘端参数"""
if not authenticate_request():
return jsonify({'error': 'Unauthorized'}), 401
try:
# 获取设备历史数据
summaries = db_manager.get_nightly_summaries(device_id, days=30)
# 基于数据优化阈值
rmssd_values = [s.mean_rmssd for s in summaries if s.mean_rmssd]
hr_values = [s.mean_hr for s in summaries if s.mean_hr]
optimized_params = {
'rmssd_baseline': float(np.median(rmssd_values)),
'hr_baseline': float(np.median(hr_values)),
'quality_threshold': 0.5, # 可以根据数据质量调整
'upload_interval': 300
}
print(f" 🔧 [API] 参数优化: 设备={device_id}")
return jsonify({
'status': 'success',
'optimized_parameters': optimized_params
}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/health', methods=['GET'])
def health_check():
"""健康检查"""
return jsonify({
'status': 'healthy',
'version': config.version,
'timestamp': datetime.now().isoformat()
}), 200
# ==================== 7. 主程序 ====================
def initialize_cloud_system():
"""初始化云端系统"""
global db_manager, ml_trainer, trend_analyzer, report_generator
print(f"{'='*70}")
print(f"☁️ 高血压风险评估系统 - 云端模块")
print(f"{'='*70}")
print(f"版本: {config.version}")
print(f"API端口: {config.api_config['port']}")
print(f"{'='*70}")
# 初始化组件
print("📦 初始化组件...")
db_manager = DatabaseManager(config)
ml_trainer = MLModelTrainer(config)
trend_analyzer = TrendAnalyzer(config)
report_generator = ReportGenerator(config)
# 尝试加载已训练模型
ml_trainer.load_model()
print(f"✅ 云端系统初始化完成!")
def run_demo():
"""运行演示(无需边缘端)"""
print(f"{'='*70}")
print(f"🎬 云端系统演示模式")
print(f"{'='*70}")
initialize_cloud_system()
# 模拟边缘端上传数据
print("📤 模拟边缘端数据上传...")
device_id = "demo_device_001"
for day in range(14):
date = (datetime.now() - timedelta(days=14-day)).strftime('%Y-%m-%d')
# 模拟夜间汇总数据
summary_data = {
'device_id': device_id,
'date': date,
'mean_hr': 70 + np.random.randn() * 3,
'min_hr': 60 + np.random.randn() * 2,
'max_hr': 85 + np.random.randn() * 3,
'mean_rmssd': 35 + np.random.randn() * 5,
'mean_sdnn': 50 + np.random.randn() * 8,
'mean_pnn50': 0.12 + np.random.randn() * 0.03,
'vlf_power': 300 + np.random.randn() * 50,
'lf_power': 500 + np.random.randn() * 80,
'hf_power': 400 + np.random.randn() * 60,
'lf_hf_ratio': 1.2 + np.random.randn() * 0.3,
'total_bathroom_visits': int(np.random.choice([1, 2, 3], p=[0.5, 0.3, 0.2])),
'total_bathroom_duration': 180 + np.random.randn() * 40,
'avg_bathroom_duration': 90 + np.random.randn() * 20,
'first_bathroom_hour': 2.5 + np.random.randn() * 1.0,
'mean_risk_score': 0.35 + day * 0.015 + np.random.randn() * 0.05, # 逐渐上升
'max_risk_score': 0.50 + day * 0.02,
'dominant_risk_level': 'MEDIUM' if day > 7 else 'LOW',
'valid_data_hours': 7.5 + np.random.randn() * 0.5,
'total_data_points': 900 + int(np.random.randn() * 50)
}
db_manager.add_nightly_summary(summary_data)
print(f"✓ 已添加14天模拟数据")
# 生成报告
report = report_generator.generate_comprehensive_report(
device_id, db_manager, ml_trainer, trend_analyzer
)
if report:
print("📊 报告摘要:")
print(f" 设备ID: {report['device_id']}")
print(f" 分析周期: {report['analysis_period']}")
print(f" 最新风险评分: {report['latest_status']['risk_score']:.3f}")
print(f" 风险等级: {report['latest_status']['risk_level']}")
print(f" 趋势: {report['trend_analysis']['risk_trend']}")
print(f" 风险变化: {report['trend_analysis']['risk_change_percent']:.1f}%")
print(f" 医学建议:")
for rec in report['recommendations']:
print(f" - {rec}")
print(f" 报告已保存: {report['report_path']}")
def main():
"""主函数"""
import sys
if len(sys.argv) > 1 and sys.argv[1] == 'demo':
# 演示模式
run_demo()
else:
# API服务模式
initialize_cloud_system()
print(f"🌐 启动API服务...")
print(f"监听地址: http://{config.api_config['host']}:{config.api_config['port']}")
print(f"可用端点:")
print(f" POST /api/upload - 接收边缘端数据")
print(f" GET /api/report/<device> - 获取诊断报告")
print(f" POST /api/optimize/<device> - 优化参数")
print(f" GET /api/health - 健康检查")
print(f"按Ctrl+C停止服务")
app.run(
host=config.api_config['host'],
port=config.api_config['port'],
debug=config.api_config['debug']
)
if __name__ == "__main__":
main()