Hypertension_Risk_Assessment/0112-2026 - Test_Package/edge_hypertension_system- 边缘端代码.py

1256 lines
42 KiB
Python
Raw Permalink Normal View History

2026-01-13 11:27:21 +08:00
# -*- coding: utf-8 -*-
"""
高血压风险评估系统 - 边缘端实时处理模块RK3588优化版
Hypertension Risk Assessment - Edge Computing Module (Optimized for RK3588)
Created: 2026-01-09
Author: KKRobot Healthcare AI Team
Version: v1.0_edge_realtime
📌 部署架构混合部署
- 边缘端本代码RK3588设备实时预警<1秒响应
- 云端配套深度分析长期趋势预测
📊 传感器配置
- 床上压电陶瓷BCG传感器250Hz采样
- 厕所毫米波雷达20Hz采样
🎯 核心功能
1. 实时BCG心率/HRV提取30秒滑动窗口
2. 实时雷达起夜检测
3. 硬规则快速风险评估
4. 即时预警触发高风险立即推送
5. 匿名化数据定期上传云端
RK3588优化
- 轻量级依赖numpy/scipy only
- 流式处理内存占用<500MB
- ARM NEON vectorization
- 无深度学习模型
📦 运行要求
pip install numpy scipy
python edge_hypertension_system.py
💾 存储需求
- 运行内存<500MB
- 磁盘缓存<100MB7天基线数据
"""
import numpy as np
from scipy import signal
from scipy.interpolate import interp1d
import time
import json
import os
from collections import deque
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings("ignore")
# ==================== 0. 边缘端核心配置 ====================
class EdgeConfig:
"""边缘端配置RK3588优化"""
def __init__(self):
# 版本标识
self.version = "v1.0_edge_realtime"
self.device_id = "edge_rk3588_001" # 设备唯一标识
# ========== BCG传感器配置 ==========
self.bcg_config = {
'sampling_rate': 250, # 采样率Hz
'window_size': 30, # 处理窗口(秒)
'slide_step': 5, # 滑动步长(秒)
'hr_range': (40, 120), # 心率范围bpm
'rr_range': (8, 25), # 呼吸率范围(/min
'quality_threshold': 0.5, # 质量阈值
'离bed_threshold': 0.3 # 离床检测阈值
}
# ========== 雷达传感器配置 ==========
self.radar_config = {
'sampling_rate': 20, # 采样率Hz
'presence_threshold': 0.6, # 存在检测阈值
'bathroom_zone': { # 厕所区域3D边界框
'x_range': (0.5, 2.5), # 米
'y_range': (0.5, 2.0), # 米
'z_range': (0.3, 1.8) # 米
},
'min_visit_duration': 30, # 最短起夜时长(秒)
'event_merge_gap': 60, # 事件合并间隔(秒)
'motion_threshold': 0.3 # 运动检测阈值
}
# ========== 实时处理配置 ==========
self.realtime_config = {
'buffer_size': 7500, # BCG缓冲区大小30秒@250Hz
'processing_interval': 5, # 处理间隔(秒)
'max_latency': 0.5, # 最大延迟(秒)
'enable_logging': True, # 是否打印日志
'log_interval': 60 # 日志打印间隔(秒)
}
# ========== 风险评估配置(硬规则) ==========
self.risk_config = {
# RMSSD阈值HRV核心指标
'rmssd_high_risk': 20, # <20ms高风险
'rmssd_medium_risk': 30, # 20-30ms中风险
'rmssd_low_risk': 40, # >40ms低风险
# 心率阈值
'hr_high_risk': 85, # >85bpm高风险
'hr_medium_risk': 75, # 75-85bpm中风险
# 起夜频率阈值(每晚)
'bathroom_high_risk': 3, # ≥3次高风险
'bathroom_medium_risk': 2, # 2次中风险
# 起夜心率上升阈值
'hr_surge_high_risk': 20, # >20bpm高风险
'hr_surge_medium_risk': 12, # 12-20bpm中风险
# 权重配置
'weights': {
'rmssd': 0.35,
'sdnn': 0.15,
'hr_level': 0.20,
'hr_surge': 0.10,
'bathroom_freq': 0.20
}
}
# ========== 预警配置 ==========
self.alert_config = {
'high_risk_threshold': 0.65, # 高风险阈值
'medium_risk_threshold': 0.40, # 中风险阈值
'alert_cooldown': 300, # 预警冷却时间(秒)
'enable_sound': True, # 是否声音提醒
'enable_push': True # 是否推送通知
}
# ========== 云端通信配置 ==========
self.cloud_config = {
'enable_upload': True, # 是否上传云端
'upload_interval': 300, # 上传间隔5分钟
'upload_url': 'https://api.example.com/upload',
'api_key': 'your_api_key_here',
'anonymize': True, # 是否匿名化
'compress': True # 是否压缩
}
# ========== 本地缓存配置 ==========
self.cache_config = {
'baseline_file': 'baseline_cache.json',
'event_log_file': 'event_log.jsonl',
'max_baseline_nights': 7, # 基线窗口(天)
'max_event_log_size': 1000, # 最大事件数
'cache_dir': './edge_cache'
}
# 创建缓存目录
os.makedirs(self.cache_config['cache_dir'], exist_ok=True)
# 全局配置实例
config = EdgeConfig()
# ==================== 1. 数据结构层 ====================
class RingBuffer:
"""高效环形缓冲区(节省内存)"""
def __init__(self, size):
self.size = size
self.buffer = np.zeros(size, dtype=np.float32)
self.index = 0
self.is_filled = False
def append(self, value):
"""追加单个值"""
self.buffer[self.index] = value
self.index = (self.index + 1) % self.size
if self.index == 0:
self.is_filled = True
def extend(self, values):
"""批量追加"""
for v in values:
self.append(v)
def get_data(self):
"""获取完整数据(按时间顺序)"""
if not self.is_filled:
return self.buffer[:self.index]
else:
return np.concatenate([
self.buffer[self.index:],
self.buffer[:self.index]
])
def is_full(self):
"""是否已填满"""
return self.is_filled or self.index >= self.size
def clear(self):
"""清空缓冲区"""
self.buffer.fill(0)
self.index = 0
self.is_filled = False
class SensorEvent:
"""传感器事件(轻量级数据结构)"""
def __init__(self, timestamp, event_type, data, quality=1.0):
self.timestamp = timestamp # Unix时间戳
self.event_type = event_type # 'BCG_HR', 'BCG_HRV', 'RADAR_VISIT', 'ALERT'
self.data = data # 事件数据dict
self.quality = quality # 质量分数0-1
def to_dict(self):
"""转换为字典(用于序列化)"""
return {
'timestamp': self.timestamp,
'event_type': self.event_type,
'data': self.data,
'quality': self.quality,
'datetime': datetime.fromtimestamp(self.timestamp).strftime('%Y-%m-%d %H:%M:%S')
}
def anonymize(self):
"""匿名化处理(去除敏感信息)"""
anon_data = self.data.copy()
# 移除可能的设备标识
anon_data.pop('device_id', None)
anon_data.pop('location', None)
return SensorEvent(self.timestamp, self.event_type, anon_data, self.quality)
class BaselineCache:
"""个体基线缓存7天滚动窗口"""
def __init__(self, cache_file):
self.cache_file = cache_file
self.baseline = self.load()
def load(self):
"""加载缓存"""
if os.path.exists(self.cache_file):
try:
with open(self.cache_file, 'r') as f:
return json.load(f)
except:
return self._default_baseline()
return self._default_baseline()
def save(self):
"""保存缓存"""
with open(self.cache_file, 'w') as f:
json.dump(self.baseline, f, indent=2)
def update(self, night_features):
"""更新基线(滚动窗口)"""
if 'history' not in self.baseline:
self.baseline['history'] = []
# 添加新数据
self.baseline['history'].append({
'date': datetime.now().strftime('%Y-%m-%d'),
'features': night_features
})
# 保留最近7天
if len(self.baseline['history']) > config.cache_config['max_baseline_nights']:
self.baseline['history'] = self.baseline['history'][-7:]
# 重新计算基线(中位数)
if len(self.baseline['history']) >= 3:
self._recalculate_baseline()
self.save()
def _recalculate_baseline(self):
"""重新计算基线(使用中位数)"""
history = self.baseline['history']
rmssd_values = [h['features'].get('rmssd', 0) for h in history if h['features'].get('rmssd', 0) > 0]
hr_values = [h['features'].get('mean_hr', 0) for h in history if h['features'].get('mean_hr', 0) > 0]
bathroom_values = [h['features'].get('bathroom_freq', 0) for h in history]
if rmssd_values:
self.baseline['rmssd'] = float(np.median(rmssd_values))
if hr_values:
self.baseline['mean_hr'] = float(np.median(hr_values))
if bathroom_values:
self.baseline['bathroom_freq'] = float(np.median(bathroom_values))
def get(self, key, default=None):
"""获取基线值"""
return self.baseline.get(key, default)
def _default_baseline(self):
"""默认基线(群体均值)"""
return {
'rmssd': 35.0, # 健康老年人典型值
'sdnn': 50.0,
'mean_hr': 70.0,
'bathroom_freq': 1.5,
'history': []
}
# ==================== 2. BCG信号处理层 ====================
class BCGProcessor:
"""BCG信号实时处理器ARM优化"""
def __init__(self, config):
self.config = config
self.fs = config.bcg_config['sampling_rate']
self.hr_range = config.bcg_config['hr_range']
self.rr_range = config.bcg_config['rr_range']
# 预先设计滤波器(避免重复计算)
self.hr_filter = self._design_hr_filter()
self.rr_filter = self._design_rr_filter()
def _design_hr_filter(self):
"""设计心率带通滤波器0.8-3.0 Hz"""
nyq = self.fs / 2
low = 0.8 / nyq
high = 3.0 / nyq
b, a = signal.butter(4, [low, high], btype='band')
return b, a
def _design_rr_filter(self):
"""设计呼吸率带通滤波器0.1-0.6 Hz"""
nyq = self.fs / 2
low = 0.1 / nyq
high = 0.6 / nyq
b, a = signal.butter(4, [low, high], btype='band')
return b, a
def assess_signal_quality(self, bcg_signal):
"""
快速信号质量评估<50ms
返回质量分数0-1
"""
try:
# 1. SNR评估40%权重)
signal_power = np.mean(bcg_signal ** 2)
noise_estimate = np.var(np.diff(bcg_signal))
snr = signal_power / (noise_estimate + 1e-6)
snr_score = np.clip(snr / 10.0, 0, 1)
# 2. 频谱集中度30%权重)
freqs, psd = signal.welch(bcg_signal, fs=self.fs, nperseg=min(512, len(bcg_signal)))
hr_band_mask = (freqs >= 0.8) & (freqs <= 3.0)
hr_band_power = np.sum(psd[hr_band_mask])
total_power = np.sum(psd)
spectrum_score = hr_band_power / (total_power + 1e-6)
# 3. 周期性评估30%权重)
autocorr = np.correlate(bcg_signal - np.mean(bcg_signal),
bcg_signal - np.mean(bcg_signal),
mode='same')
autocorr = autocorr[len(autocorr)//2:]
peak_pos = np.argmax(autocorr[int(0.3*self.fs):int(1.5*self.fs)]) + int(0.3*self.fs)
periodicity_score = autocorr[peak_pos] / (autocorr[0] + 1e-6)
periodicity_score = np.clip(periodicity_score, 0, 1)
# 综合评分
quality = (0.40 * snr_score +
0.30 * spectrum_score +
0.30 * periodicity_score)
return float(np.clip(quality, 0, 1))
except Exception as e:
return 0.0
def extract_heart_rate(self, bcg_signal):
"""
实时心率提取<100ms
返回(心率bpm, 置信度, RR间期数组ms)
"""
try:
# 1. 带通滤波
filtered = signal.filtfilt(self.hr_filter[0], self.hr_filter[1], bcg_signal)
# 2. 峰值检测(优化参数)
peaks, properties = signal.find_peaks(
filtered,
distance=int(0.4 * self.fs), # 最小峰间距150bpm对应0.4秒)
prominence=0.3 * np.std(filtered)
)
if len(peaks) < 5:
return None, 0.0, None
# 3. 计算RR间期
rr_intervals = np.diff(peaks) / self.fs * 1000 # 转换为ms
# 4. 异常值过滤MAD方法快速
median_rr = np.median(rr_intervals)
mad = np.median(np.abs(rr_intervals - median_rr))
valid_mask = np.abs(rr_intervals - median_rr) < 3 * mad
if np.sum(valid_mask) < 5:
return None, 0.0, None
clean_rr = rr_intervals[valid_mask]
# 5. 计算心率
mean_rr_ms = np.mean(clean_rr)
heart_rate = 60000.0 / mean_rr_ms
# 6. 合理性检查
if not (self.hr_range[0] <= heart_rate <= self.hr_range[1]):
return None, 0.0, None
# 7. 置信度评估(基于变异系数)
cv = np.std(clean_rr) / np.mean(clean_rr)
confidence = np.clip(1.0 - cv, 0, 1)
return float(heart_rate), float(confidence), clean_rr
except Exception as e:
return None, 0.0, None
def calculate_hrv_features(self, rr_intervals):
"""
快速HRV特征计算<20ms
返回{'rmssd': float, 'sdnn': float, 'pnn50': float}
"""
try:
if len(rr_intervals) < 10:
return None
# 时域指标
sdnn = np.std(rr_intervals)
diff_rr = np.diff(rr_intervals)
rmssd = np.sqrt(np.mean(diff_rr ** 2))
nn50 = np.sum(np.abs(diff_rr) > 50)
pnn50 = nn50 / len(diff_rr) if len(diff_rr) > 0 else 0
return {
'sdnn': float(sdnn),
'rmssd': float(rmssd),
'pnn50': float(pnn50)
}
except Exception as e:
return None
def extract_respiratory_rate(self, bcg_signal):
"""
实时呼吸率提取<80ms
返回(呼吸率/min, 置信度)
"""
try:
# 1. 带通滤波
filtered = signal.filtfilt(self.rr_filter[0], self.rr_filter[1], bcg_signal)
# 2. Welch功率谱估计
freqs, psd = signal.welch(filtered, fs=self.fs,
nperseg=min(512, len(filtered)),
noverlap=256)
# 3. 限制在呼吸频率范围
rr_band_mask = (freqs >= 0.1) & (freqs <= 0.6)
rr_freqs = freqs[rr_band_mask]
rr_psd = psd[rr_band_mask]
if len(rr_psd) == 0:
return None, 0.0
# 4. 找峰值
peak_idx = np.argmax(rr_psd)
peak_freq = rr_freqs[peak_idx]
respiratory_rate = peak_freq * 60 # 转换为/min
# 5. 合理性检查
if not (self.rr_range[0] <= respiratory_rate <= self.rr_range[1]):
return None, 0.0
# 6. 置信度评估(峰值突出度)
peak_power = rr_psd[peak_idx]
mean_power = np.mean(rr_psd)
confidence = np.clip(peak_power / (mean_power * 3), 0, 1)
return float(respiratory_rate), float(confidence)
except Exception as e:
return None, 0.0
# ==================== 3. 雷达信号处理层 ====================
class RadarProcessor:
"""雷达信号实时处理器"""
def __init__(self, config):
self.config = config
self.bathroom_zone = config.radar_config['bathroom_zone']
self.presence_history = deque(maxlen=3) # 时序平滑3帧
self.current_visit_start = None
def detect_presence_in_bathroom(self, point_cloud):
"""
实时存在检测<30ms
输入point_cloud = [(x, y, z, velocity), ...]
返回(是否存在, 置信度)
"""
try:
if len(point_cloud) == 0:
self.presence_history.append(False)
return False, 0.0
# 1. 静态杂波去除
dynamic_points = [p for p in point_cloud if abs(p[3]) > 0.05]
# 2. 空间过滤(在厕所区域内)
bathroom_points = []
for p in dynamic_points:
x, y, z = p[0], p[1], p[2]
if (self.bathroom_zone['x_range'][0] <= x <= self.bathroom_zone['x_range'][1] and
self.bathroom_zone['y_range'][0] <= y <= self.bathroom_zone['y_range'][1] and
self.bathroom_zone['z_range'][0] <= z <= self.bathroom_zone['z_range'][1]):
bathroom_points.append(p)
# 3. 密度判断
point_density = len(bathroom_points) / (len(point_cloud) + 1e-6)
is_present = point_density > self.config.radar_config['presence_threshold']
# 4. 时序平滑连续3帧确认
self.presence_history.append(is_present)
stable_presence = sum(self.presence_history) >= 2 # 至少2/3帧检测到
confidence = point_density if stable_presence else 0.0
return stable_presence, float(confidence)
except Exception as e:
return False, 0.0
def detect_bathroom_visit(self, timestamp, is_present):
"""
起夜事件检测状态机
返回SensorEvent对象如果访问结束 None
"""
min_duration = self.config.radar_config['min_visit_duration']
if is_present and self.current_visit_start is None:
# 开始起夜
self.current_visit_start = timestamp
return None
elif not is_present and self.current_visit_start is not None:
# 结束起夜
duration = timestamp - self.current_visit_start
if duration >= min_duration:
# 有效起夜
event = SensorEvent(
timestamp=self.current_visit_start,
event_type='BATHROOM_VISIT',
data={
'start': self.current_visit_start,
'end': timestamp,
'duration': duration
},
quality=1.0
)
self.current_visit_start = None
return event
else:
# 太短,忽略
self.current_visit_start = None
return None
return None
def extract_vital_signs_from_radar(self, phase_signal, fs=20):
"""
从雷达相位信号提取生理信号起夜期间
返回{'hr': float, 'rr': float}
"""
try:
if len(phase_signal) < fs * 10: # 至少10秒数据
return None
# 心率提取FFT
freqs, psd = signal.welch(phase_signal, fs=fs, nperseg=min(256, len(phase_signal)))
hr_mask = (freqs >= 0.8) & (freqs <= 2.5)
hr_freqs = freqs[hr_mask]
hr_psd = psd[hr_mask]
hr_peak_idx = np.argmax(hr_psd)
hr = hr_freqs[hr_peak_idx] * 60
# 呼吸率提取
rr_mask = (freqs >= 0.15) & (freqs <= 0.5)
rr_freqs = freqs[rr_mask]
rr_psd = psd[rr_mask]
rr_peak_idx = np.argmax(rr_psd)
rr = rr_freqs[rr_peak_idx] * 60
# 合理性检查
if 40 <= hr <= 130 and 6 <= rr <= 30:
return {'hr': float(hr), 'rr': float(rr)}
else:
return None
except Exception as e:
return None
# ==================== 4. 实时风险评估引擎 ====================
class RealtimeRiskEngine:
"""实时风险评估引擎(硬规则,<10ms"""
def __init__(self, config, baseline_cache):
self.config = config
self.baseline = baseline_cache
self.risk_cfg = config.risk_config
def calculate_risk_score(self, features):
"""
快速风险评分硬规则
输入features = {
'rmssd': float,
'sdnn': float,
'mean_hr': float,
'hr_surge': float, # 起夜时心率上升
'bathroom_freq_today': int
}
返回(总风险分数0-1, 风险等级, 分项风险)
"""
weights = self.risk_cfg['weights']
# 1. RMSSD风险35%权重)
rmssd = features.get('rmssd', 0)
rmssd_baseline = self.baseline.get('rmssd', 35.0)
if rmssd < self.risk_cfg['rmssd_high_risk']:
rmssd_risk = 1.0
elif rmssd < self.risk_cfg['rmssd_medium_risk']:
rmssd_risk = 0.6
elif rmssd < rmssd_baseline:
rmssd_risk = 0.4
else:
rmssd_risk = 0.2
# 2. SDNN风险15%权重)
sdnn = features.get('sdnn', 0)
if sdnn < 30:
sdnn_risk = 1.0
elif sdnn < 45:
sdnn_risk = 0.6
else:
sdnn_risk = 0.2
# 3. 心率水平风险20%权重)
hr = features.get('mean_hr', 70)
hr_baseline = self.baseline.get('mean_hr', 70.0)
if hr > self.risk_cfg['hr_high_risk']:
hr_risk = 1.0
elif hr > self.risk_cfg['hr_medium_risk']:
hr_risk = 0.6
elif hr > hr_baseline + 5:
hr_risk = 0.4
else:
hr_risk = 0.2
# 4. 起夜心率上升风险10%权重)
hr_surge = features.get('hr_surge', 0)
if hr_surge > self.risk_cfg['hr_surge_high_risk']:
hr_surge_risk = 1.0
elif hr_surge > self.risk_cfg['hr_surge_medium_risk']:
hr_surge_risk = 0.6
else:
hr_surge_risk = 0.3
# 5. 起夜频率风险20%权重)
bathroom_freq = features.get('bathroom_freq_today', 0)
bathroom_baseline = self.baseline.get('bathroom_freq', 1.5)
if bathroom_freq >= self.risk_cfg['bathroom_high_risk']:
bathroom_risk = 1.0
elif bathroom_freq >= self.risk_cfg['bathroom_medium_risk']:
bathroom_risk = 0.6
elif bathroom_freq > bathroom_baseline:
bathroom_risk = 0.4
else:
bathroom_risk = 0.2
# 6. 加权综合
total_risk = (
weights['rmssd'] * rmssd_risk +
weights['sdnn'] * sdnn_risk +
weights['hr_level'] * hr_risk +
weights['hr_surge'] * hr_surge_risk +
weights['bathroom_freq'] * bathroom_risk
)
# 7. 风险等级分类
if total_risk >= self.config.alert_config['high_risk_threshold']:
risk_level = 'HIGH'
elif total_risk >= self.config.alert_config['medium_risk_threshold']:
risk_level = 'MEDIUM'
else:
risk_level = 'LOW'
# 8. 分项风险
breakdown = {
'rmssd_risk': rmssd_risk,
'sdnn_risk': sdnn_risk,
'hr_risk': hr_risk,
'hr_surge_risk': hr_surge_risk,
'bathroom_risk': bathroom_risk
}
return float(total_risk), risk_level, breakdown
def should_trigger_alert(self, risk_score, risk_level, last_alert_time):
"""
判断是否触发预警
返回(是否预警, 预警消息)
"""
# 冷却时间检查
if last_alert_time is not None:
cooldown = self.config.alert_config['alert_cooldown']
if time.time() - last_alert_time < cooldown:
return False, None
# 只对高风险预警
if risk_level == 'HIGH':
message = f"⚠️ 高血压风险预警!风险评分: {risk_score:.2f}"
return True, message
return False, None
# ==================== 5. 云端通信层 ====================
class CloudUploader:
"""云端数据上传器(匿名化)"""
def __init__(self, config):
self.config = config
self.upload_buffer = []
self.last_upload_time = 0
def add_to_buffer(self, event):
"""添加事件到上传缓冲区"""
if not self.config.cloud_config['enable_upload']:
return
# 匿名化
if self.config.cloud_config['anonymize']:
event = event.anonymize()
self.upload_buffer.append(event.to_dict())
def should_upload(self):
"""判断是否应该上传"""
if not self.config.cloud_config['enable_upload']:
return False
interval = self.config.cloud_config['upload_interval']
return (time.time() - self.last_upload_time) >= interval
def upload(self):
"""
上传数据到云端
返回是否成功
"""
if len(self.upload_buffer) == 0:
return True
try:
# 构建上传数据包
payload = {
'device_id': config.device_id if not self.config.cloud_config['anonymize'] else 'anonymous',
'timestamp': time.time(),
'events': self.upload_buffer,
'version': config.version
}
# 这里应该调用实际的HTTP上传接口
# import requests
# response = requests.post(
# self.config.cloud_config['upload_url'],
# json=payload,
# headers={'Authorization': f"Bearer {self.config.cloud_config['api_key']}"}
# )
# success = response.status_code == 200
# 模拟上传成功
print(f" ☁️ [Cloud Upload] Uploaded {len(self.upload_buffer)} events to cloud")
# 清空缓冲区
self.upload_buffer = []
self.last_upload_time = time.time()
return True
except Exception as e:
print(f" ❌ [Cloud Upload] Failed: {e}")
return False
# ==================== 6. 实时处理主控制器 ====================
class RealtimeController:
"""实时处理主控制器"""
def __init__(self, config):
self.config = config
# 初始化组件
self.bcg_processor = BCGProcessor(config)
self.radar_processor = RadarProcessor(config)
self.baseline_cache = BaselineCache(
os.path.join(config.cache_config['cache_dir'], config.cache_config['baseline_file'])
)
self.risk_engine = RealtimeRiskEngine(config, self.baseline_cache)
self.cloud_uploader = CloudUploader(config)
# 数据缓冲区
buffer_size = config.bcg_config['window_size'] * config.bcg_config['sampling_rate']
self.bcg_buffer = RingBuffer(buffer_size)
# 状态变量
self.last_process_time = 0
self.last_alert_time = None
self.last_log_time = 0
self.bathroom_visits_today = []
self.night_start_time = None
# 统计变量
self.stats = {
'total_processed': 0,
'total_alerts': 0,
'avg_latency': 0.0
}
print(f"{'='*70}")
print(f"🚀 边缘端实时处理系统初始化成功")
print(f"{'='*70}")
print(f"版本: {config.version}")
print(f"设备: {config.device_id}")
print(f"BCG采样率: {config.bcg_config['sampling_rate']} Hz")
print(f"处理窗口: {config.bcg_config['window_size']}")
print(f"云端上传: {'启用' if config.cloud_config['enable_upload'] else '禁用'}")
print(f"{'='*70}")
def process_bcg_sample(self, sample, timestamp):
"""
处理单个BCG样本点流式输入
输入
- sample: floatBCG信号值
- timestamp: floatUnix时间戳
"""
# 添加到缓冲区
self.bcg_buffer.append(sample)
# 检查是否应该处理
interval = self.config.realtime_config['processing_interval']
if time.time() - self.last_process_time < interval:
return
# 检查缓冲区是否已满
if not self.bcg_buffer.is_full():
return
# 执行处理
self._process_bcg_window(timestamp)
self.last_process_time = time.time()
def process_radar_frame(self, point_cloud, timestamp):
"""
处理雷达点云帧
输入
- point_cloud: [(x, y, z, velocity), ...]
- timestamp: float
"""
# 存在检测
is_present, confidence = self.radar_processor.detect_presence_in_bathroom(point_cloud)
# 起夜事件检测
visit_event = self.radar_processor.detect_bathroom_visit(timestamp, is_present)
if visit_event is not None:
# 记录起夜事件
self.bathroom_visits_today.append(visit_event)
# 上传到云端
self.cloud_uploader.add_to_buffer(visit_event)
# 打印日志
duration = visit_event.data['duration']
print(f" 🚽 [Bathroom Visit] Duration: {duration:.0f}s | Total today: {len(self.bathroom_visits_today)}")
# 定期上传
if self.cloud_uploader.should_upload():
self.cloud_uploader.upload()
def _process_bcg_window(self, timestamp):
"""处理BCG窗口数据"""
start_time = time.time()
# 获取窗口数据
bcg_signal = self.bcg_buffer.get_data()
# 1. 质量评估
quality = self.bcg_processor.assess_signal_quality(bcg_signal)
# 检查是否离床
if quality < self.config.bcg_config['离bed_threshold']:
if self.config.realtime_config['enable_logging']:
if time.time() - self.last_log_time > 60:
print(f" ⚠️ [BCG Quality] Low quality (离床): {quality:.2f}")
self.last_log_time = time.time()
return
# 2. 心率提取
hr, hr_conf, rr_intervals = self.bcg_processor.extract_heart_rate(bcg_signal)
if hr is None:
return
# 3. HRV特征
hrv_features = None
if rr_intervals is not None and len(rr_intervals) >= 10:
hrv_features = self.bcg_processor.calculate_hrv_features(rr_intervals)
# 4. 呼吸率提取
rr, rr_conf = self.bcg_processor.extract_respiratory_rate(bcg_signal)
# 5. 风险评估
if hrv_features is not None:
features = {
'rmssd': hrv_features['rmssd'],
'sdnn': hrv_features['sdnn'],
'mean_hr': hr,
'hr_surge': 0, # 暂时没有起夜心率数据
'bathroom_freq_today': len(self.bathroom_visits_today)
}
risk_score, risk_level, breakdown = self.risk_engine.calculate_risk_score(features)
# 6. 预警判断
should_alert, alert_msg = self.risk_engine.should_trigger_alert(
risk_score, risk_level, self.last_alert_time
)
if should_alert:
self._trigger_alert(alert_msg, risk_score, features)
self.last_alert_time = time.time()
# 7. 创建事件并上传
event = SensorEvent(
timestamp=timestamp,
event_type='BCG_REALTIME',
data={
'hr': hr,
'hr_confidence': hr_conf,
'rr': rr if rr else 0,
'rmssd': hrv_features['rmssd'],
'sdnn': hrv_features['sdnn'],
'pnn50': hrv_features['pnn50'],
'risk_score': risk_score,
'risk_level': risk_level,
'quality': quality
},
quality=quality
)
self.cloud_uploader.add_to_buffer(event)
# 8. 打印日志
if self.config.realtime_config['enable_logging']:
if time.time() - self.last_log_time > self.config.realtime_config['log_interval']:
latency = (time.time() - start_time) * 1000
self._print_status(hr, hrv_features, risk_score, risk_level, quality, latency)
self.last_log_time = time.time()
# 更新统计
self.stats['total_processed'] += 1
latency = (time.time() - start_time) * 1000
self.stats['avg_latency'] = (self.stats['avg_latency'] * 0.9 + latency * 0.1)
def _trigger_alert(self, message, risk_score, features):
"""触发预警"""
self.stats['total_alerts'] += 1
print(f"{'='*70}")
print(f"⚠️ 警报触发 #{self.stats['total_alerts']}")
print(f"{'='*70}")
print(message)
print(f"详细信息:")
print(f" 心率: {features['mean_hr']:.1f} bpm")
print(f" RMSSD: {features['rmssd']:.1f} ms")
print(f" SDNN: {features['sdnn']:.1f} ms")
print(f" 起夜次数(今晚): {features['bathroom_freq_today']}")
print(f" 风险评分: {risk_score:.3f}")
print(f"{'='*70}")
# 这里可以添加实际的预警机制(声音、推送等)
if self.config.alert_config['enable_sound']:
# 播放警报声音
pass
if self.config.alert_config['enable_push']:
# 发送推送通知
pass
def _print_status(self, hr, hrv_features, risk_score, risk_level, quality, latency):
"""打印状态信息"""
print(f"{'='*70}")
print(f"📊 实时状态更新 - {datetime.now().strftime('%H:%M:%S')}")
print(f"{'='*70}")
print(f"生理指标:")
print(f" 心率: {hr:.1f} bpm")
print(f" RMSSD: {hrv_features['rmssd']:.1f} ms")
print(f" SDNN: {hrv_features['sdnn']:.1f} ms")
print(f" pNN50: {hrv_features['pnn50']:.3f}")
print(f"行为指标:")
print(f" 起夜次数(今晚): {len(self.bathroom_visits_today)}")
print(f"风险评估:")
print(f" 风险评分: {risk_score:.3f}")
print(f" 风险等级: {risk_level}")
print(f" 信号质量: {quality:.2f}")
print(f"系统性能:")
print(f" 处理延迟: {latency:.1f} ms")
print(f" 累计处理: {self.stats['total_processed']}")
print(f" 累计预警: {self.stats['total_alerts']}")
print(f"{'='*70}")
def end_of_night_summary(self):
"""夜间结束总结"""
print(f"{'='*70}")
print(f"🌙 夜间监测总结")
print(f"{'='*70}")
print(f"监测时长: {(time.time() - self.night_start_time)/3600:.1f} 小时")
print(f"起夜次数: {len(self.bathroom_visits_today)}")
print(f"触发预警: {self.stats['total_alerts']}")
print(f"处理次数: {self.stats['total_processed']}")
print(f"平均延迟: {self.stats['avg_latency']:.1f} ms")
print(f"{'='*70}")
# 更新基线(如果有足够数据)
# TODO: 计算整晚特征并更新基线
# 重置当日计数
self.bathroom_visits_today = []
self.stats['total_alerts'] = 0
# ==================== 7. 模拟数据生成器(测试用) ====================
class EdgeDataSimulator:
"""边缘端数据模拟器(用于测试)"""
def __init__(self, config):
self.config = config
def generate_synthetic_bcg_stream(self, duration_sec, hr=70, scenario='normal'):
"""
生成合成BCG数据流
参数
- duration_sec: 时长
- hr: 心率bpm
- scenario: 'normal', 'high_risk', 'bathroom_visit'
"""
fs = self.config.bcg_config['sampling_rate']
total_samples = int(duration_sec * fs)
t = np.linspace(0, duration_sec, total_samples)
# 基础心跳信号
hr_freq = hr / 60.0
signal_bcg = np.sin(2 * np.pi * hr_freq * t)
# 呼吸信号
rr_freq = 15 / 60.0
signal_bcg += 0.5 * np.sin(2 * np.pi * rr_freq * t)
# 根据场景调整
if scenario == 'high_risk':
# 高风险降低HRV
signal_bcg += 0.05 * np.random.randn(total_samples)
elif scenario == 'bathroom_visit':
# 起夜:质量下降
signal_bcg = signal_bcg * 0.2 + 0.8 * np.random.randn(total_samples)
else:
# 正常
signal_bcg += 0.1 * np.random.randn(total_samples)
return signal_bcg, t
def generate_synthetic_radar_stream(self, duration_sec, bathroom_visits=[]):
"""
生成合成雷达数据流
参数
- duration_sec: 时长
- bathroom_visits: [(start_time, end_time), ...]
"""
fs = self.config.radar_config['sampling_rate']
total_frames = int(duration_sec * fs)
radar_frames = []
for i in range(total_frames):
current_time = i / fs
# 检查是否在起夜时间段
in_bathroom = False
for start, end in bathroom_visits:
if start <= current_time <= end:
in_bathroom = True
break
if in_bathroom:
# 生成厕所区域内的点云
zone = self.config.radar_config['bathroom_zone']
num_points = np.random.randint(10, 30)
point_cloud = []
for _ in range(num_points):
x = np.random.uniform(*zone['x_range'])
y = np.random.uniform(*zone['y_range'])
z = np.random.uniform(*zone['z_range'])
v = np.random.uniform(0.1, 0.5) # 有动作
point_cloud.append((x, y, z, v))
else:
# 生成空点云或随机噪声
point_cloud = []
radar_frames.append((current_time, point_cloud))
return radar_frames
# ==================== 8. 主程序 ====================
def run_realtime_simulation(duration_minutes=10):
"""
运行实时模拟测试用
参数
- duration_minutes: 模拟时长分钟
"""
print(f"{'='*70}")
print(f"🎬 启动边缘端实时模拟")
print(f"{'='*70}")
print(f"模拟时长: {duration_minutes} 分钟")
print(f"场景: 正常睡眠 + 2次起夜")
print(f"{'='*70}")
# 初始化控制器
controller = RealtimeController(config)
controller.night_start_time = time.time()
# 初始化模拟器
simulator = EdgeDataSimulator(config)
# 定义起夜时间(相对时间,秒)
bathroom_visits = [
(120, 180), # 2分钟开始持续1分钟
(420, 480) # 7分钟开始持续1分钟
]
# 生成BCG数据流
duration_sec = duration_minutes * 60
bcg_signal, bcg_time = simulator.generate_synthetic_bcg_stream(
duration_sec, hr=72, scenario='normal'
)
# 生成雷达数据流
radar_frames = simulator.generate_synthetic_radar_stream(
duration_sec, bathroom_visits=bathroom_visits
)
print(f"✓ 数据生成完成")
print(f" BCG样本: {len(bcg_signal)}")
print(f" 雷达帧: {len(radar_frames)}")
print(f"开始实时处理...")
# 模拟实时处理
bcg_fs = config.bcg_config['sampling_rate']
radar_fs = config.radar_config['sampling_rate']
bcg_idx = 0
radar_idx = 0
start_time = time.time()
while bcg_idx < len(bcg_signal):
current_time = time.time() - start_time
# 处理BCG样本更高频率
if bcg_idx < len(bcg_signal):
sample_time = start_time + bcg_time[bcg_idx]
controller.process_bcg_sample(bcg_signal[bcg_idx], sample_time)
bcg_idx += 1
# 处理雷达帧(较低频率)
expected_radar_idx = int(current_time * radar_fs)
if radar_idx < expected_radar_idx and radar_idx < len(radar_frames):
frame_time, point_cloud = radar_frames[radar_idx]
controller.process_radar_frame(point_cloud, start_time + frame_time)
radar_idx += 1
# 模拟实时间隔(加速模拟)
time.sleep(0.001) # 1ms实际应该是1/250秒
# 定期上传
if controller.cloud_uploader.should_upload():
controller.cloud_uploader.upload()
# 夜间总结
controller.end_of_night_summary()
print(f"{'='*70}")
print(f"✅ 模拟完成")
print(f"{'='*70}")
def main():
"""主函数"""
print(f"{'='*70}")
print(f"🏥 高血压风险评估系统 - 边缘端模块")
print(f"{'='*70}")
print(f"版本: {config.version}")
print(f"设备: {config.device_id}")
print(f"部署模式: 混合部署(边缘实时 + 云端深度分析)")
print(f"{'='*70}")
print("📋 使用说明:")
print("1. 本代码运行在RK3588边缘设备")
print("2. 实时处理BCG和雷达传感器数据")
print("3. 提供<1秒延迟的实时预警")
print("4. 定期上传匿名化数据到云端")
print()
print("🎮 运行模式:")
print(" - 模拟模式:使用合成数据测试系统")
print(" - 实际模式:连接真实传感器(需要硬件接口)")
print()
# 运行模拟
run_realtime_simulation(duration_minutes=10)
if __name__ == "__main__":
main()