From 06e31432833229f4f27bc7191b820cd407feffbd Mon Sep 17 00:00:00 2001 From: lyz <18813864538@163.com> Date: Tue, 13 Jan 2026 11:27:21 +0800 Subject: [PATCH] =?UTF-8?q?=E9=AB=98=E8=A1=80=E5=8E=8B=E8=AF=84=E4=BC=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../1- scripts/check_health.py | 49 + .../1- scripts/test_negative.py | 113 ++ .../1- scripts/test_report.py | 68 + .../1- scripts/test_upload.py | 119 ++ .../2-test_data/sample_missing_field.json | 8 + .../2-test_data/sample_normal.json | 39 + .../2-test_data/sample_wrong_type.json | 14 + .../3-docs/2-day-summary-template.md | 141 ++ .../3-docs/Bug-Report-Template.md | 202 +++ .../3-docs/Checklist-Cloud.md | 170 +++ .../3-docs/Checklist-Edge.md | 181 +++ .../3-docs/Test-Log-Template.md | 110 ++ .../cloud_hypertension_system-云端代码.py | 1243 ++++++++++++++++ .../edge_hypertension_system- 边缘端代码.py | 1255 +++++++++++++++++ .../requirements-依赖清单.txt | 8 + .../主指南(必读)-README_TEST.md | 231 +++ .../详细计划-TestPlan-2Days.md | 479 +++++++ 17 files changed, 4430 insertions(+) create mode 100644 0112-2026 - Test_Package/1- scripts/check_health.py create mode 100644 0112-2026 - Test_Package/1- scripts/test_negative.py create mode 100644 0112-2026 - Test_Package/1- scripts/test_report.py create mode 100644 0112-2026 - Test_Package/1- scripts/test_upload.py create mode 100644 0112-2026 - Test_Package/2-test_data/sample_missing_field.json create mode 100644 0112-2026 - Test_Package/2-test_data/sample_normal.json create mode 100644 0112-2026 - Test_Package/2-test_data/sample_wrong_type.json create mode 100644 0112-2026 - Test_Package/3-docs/2-day-summary-template.md create mode 100644 0112-2026 - Test_Package/3-docs/Bug-Report-Template.md create mode 100644 0112-2026 - Test_Package/3-docs/Checklist-Cloud.md create mode 100644 0112-2026 - Test_Package/3-docs/Checklist-Edge.md create mode 100644 0112-2026 - Test_Package/3-docs/Test-Log-Template.md create mode 100644 0112-2026 - Test_Package/cloud_hypertension_system-云端代码.py create mode 100644 0112-2026 - Test_Package/edge_hypertension_system- 边缘端代码.py create mode 100644 0112-2026 - Test_Package/requirements-依赖清单.txt create mode 100644 0112-2026 - Test_Package/主指南(必读)-README_TEST.md create mode 100644 0112-2026 - Test_Package/详细计划-TestPlan-2Days.md diff --git a/0112-2026 - Test_Package/1- scripts/check_health.py b/0112-2026 - Test_Package/1- scripts/check_health.py new file mode 100644 index 0000000..64d6eb7 --- /dev/null +++ b/0112-2026 - Test_Package/1- scripts/check_health.py @@ -0,0 +1,49 @@ +"""快速健康检查脚本 - 验证云端服务是否在运行""" + +import requests +import sys + +def check_server(url="http://127.0.0.1:5000"): + """检查云端服务器状态""" + print("="*60) + print("🔍 云端服务器健康检查") + print("="*60) + print(f"地址: {url}") + print("-" * 60) + + try: + response = requests.get(f"{url}/api/health", timeout=3) + + if response.status_code == 200: + data = response.json() + print("✅ 服务器正在运行!\n") + print(f"状态: {data.get('status')}") + print(f"版本: {data.get('version')}") + print(f"时间: {data.get('timestamp')}") + print("\n" + "="*60) + print("✅ 可以继续执行测试脚本") + print("="*60) + return True + else: + print(f"⚠️ 服务器响应异常: {response.status_code}") + return False + + except requests.exceptions.ConnectionError: + print("❌ 无法连接到服务器!\n") + print("可能的原因:") + print(" 1. 云端服务器没有启动") + print(" 2. 端口5000被占用") + print(" 3. 防火墙阻止了连接\n") + print("解决方法:") + print(" 请在独立的命令行窗口中运行:") + print(" → python cloud_hypertension_system.py\n") + print("="*60) + return False + + except Exception as e: + print(f"❌ 检查失败: {e}") + return False + +if __name__ == '__main__': + is_running = check_server() + sys.exit(0 if is_running else 1) diff --git a/0112-2026 - Test_Package/1- scripts/test_negative.py b/0112-2026 - Test_Package/1- scripts/test_negative.py new file mode 100644 index 0000000..e762ec1 --- /dev/null +++ b/0112-2026 - Test_Package/1- scripts/test_negative.py @@ -0,0 +1,113 @@ +"""异常场景测试脚本""" + +import requests +import json +import time + +CLOUD_URL = "http://127.0.0.1:5000/api/upload" + +def test_scenario(name, url, data, headers, expected_status): + """测试单个异常场景""" + print(f"\n{'='*60}") + print(f"测试场景: {name}") + print('='*60) + + try: + response = requests.post(url, json=data, headers=headers, timeout=5) + + print(f"状态码: {response.status_code}") + print(f"响应: {json.dumps(response.json(), indent=2, ensure_ascii=False)}") + + if response.status_code == expected_status: + print(f"✅ 通过(符合预期 {expected_status})") + return True + else: + print(f"❌ 失败(预期 {expected_status},实际 {response.status_code})") + return False + except Exception as e: + print(f"❌ 异常: {e}") + return False + +def main(): + """运行所有异常场景测试""" + print("="*60) + print("🧪 云端异常场景测试") + print("="*60) + + results = [] + current_time = time.time() + + # 场景1: 错误的API Key (401) + results.append(test_scenario( + "错误的API Key", + CLOUD_URL, + { + "device_id": "test_device", + "upload_time": "2026-01-12T10:00:00", + "events": [{"timestamp": current_time, "heart_rate": 72}], + "summary": {} + }, + { + "Authorization": "Bearer wrong_api_key_123456", + "Content-Type": "application/json" + }, + 401 + )) + + # 场景2: 缺少必填字段 (400) + results.append(test_scenario( + "缺少必填字段 (events)", + CLOUD_URL, + { + "device_id": "test_device", + "upload_time": "2026-01-12T10:00:00", + # 故意缺少 events + "summary": {} + }, + { + "Authorization": "Bearer edge_device_key_001", + "Content-Type": "application/json" + }, + 400 + )) + + # 场景3: 字段类型错误 (400) + results.append(test_scenario( + "字段类型错误 (heart_rate为字符串)", + CLOUD_URL, + { + "device_id": "test_device", + "upload_time": "2026-01-12T10:00:00", + "events": [{ + "timestamp": current_time, + "heart_rate": "not_a_number", # 故意用字符串 + "rmssd": 30, + "sdnn": 45 + }], + "summary": {} + }, + { + "Authorization": "Bearer edge_device_key_001", + "Content-Type": "application/json" + }, + 400 + )) + + # 总结 + print("\n" + "="*60) + print("📊 测试总结") + print("="*60) + print(f"总共测试: {len(results)} 个场景") + print(f"通过: {sum(results)} 个") + print(f"失败: {len(results) - sum(results)} 个") + + if all(results): + print("\n✅ 所有异常场景测试通过!") + return True + else: + print("\n⚠️ 部分测试未通过,请查看详细日志") + return False + +if __name__ == '__main__': + success = main() + exit(0 if success else 1) diff --git a/0112-2026 - Test_Package/1- scripts/test_report.py b/0112-2026 - Test_Package/1- scripts/test_report.py new file mode 100644 index 0000000..056ae5e --- /dev/null +++ b/0112-2026 - Test_Package/1- scripts/test_report.py @@ -0,0 +1,68 @@ +"""报告获取测试脚本""" + +import requests +import json + +# 配置 +CLOUD_URL = "http://127.0.0.1:5000/api/report" +API_KEY = "edge_device_key_001" +DEVICE_ID = "test_edge_device_001" + +def test_report(): + """测试报告获取""" + print("="*60) + print("📊 云端报告获取测试") + print("="*60) + + url = f"{CLOUD_URL}/{DEVICE_ID}" + headers = {"Authorization": f"Bearer {API_KEY}"} + + print(f"URL: {url}") + print(f"设备ID: {DEVICE_ID}") + print("-" * 60) + + try: + response = requests.get(url, headers=headers, timeout=30) + + print(f"\n状态码: {response.status_code}") + + if response.status_code == 200: + report = response.json() + + print("\n📄 报告摘要:") + print(f" 设备ID: {report.get('device_id')}") + + period = report.get('period', {}) + print(f" 分析周期: {period.get('start')} ~ {period.get('end')}") + print(f" 数据天数: {period.get('days')}") + + print(f" 最新风险评分: {report.get('latest_risk_score', 0):.3f}") + print(f" 风险等级: {report.get('risk_level')}") + + if 'medical_advice' in report and report['medical_advice']: + print(f"\n 医学建议:") + for advice in report['medical_advice']: + print(f" - {advice}") + + print("\n✅ 报告获取成功!") + print(f"\n完整报告路径: {report.get('report_path')}") + + # 保存到本地 + local_path = f"artifacts/cloud/report_{DEVICE_ID}.json" + with open(local_path, "w", encoding='utf-8') as f: + json.dump(report, f, indent=2, ensure_ascii=False) + print(f"本地副本: {local_path}") + + print("="*60) + return True + else: + print(f"\n❌ 获取失败: {response.json().get('error')}") + return False + + except Exception as e: + print(f"\n❌ 请求失败: {e}") + return False + +if __name__ == '__main__': + success = test_report() + exit(0 if success else 1) diff --git a/0112-2026 - Test_Package/1- scripts/test_upload.py b/0112-2026 - Test_Package/1- scripts/test_upload.py new file mode 100644 index 0000000..c94d590 --- /dev/null +++ b/0112-2026 - Test_Package/1- scripts/test_upload.py @@ -0,0 +1,119 @@ +"""上传测试脚本 - 测试正常数据上传""" + +import requests +import json +import time +from datetime import datetime + +# 配置 +CLOUD_URL = "http://127.0.0.1:5000/api/upload" +API_KEY = "edge_device_key_001" +DEVICE_ID = "test_edge_device_001" + +def test_upload(): + """测试正常上传""" + print("="*60) + print("📤 云端数据上传测试") + print("="*60) + + # 当前Unix时间戳 + current_time = time.time() + + # 构造测试数据 + test_data = { + "device_id": DEVICE_ID, + "upload_time": datetime.now().isoformat(), + "events": [ + { + "timestamp": current_time, + "heart_rate": 72.5, + "rmssd": 32.8, + "sdnn": 45.2, + "pnn50": 0.125, + "signal_quality": 0.87, + "risk_score": 0.285, + "risk_level": "LOW" + }, + { + "timestamp": current_time + 30, + "heart_rate": 74.2, + "rmssd": 30.5, + "sdnn": 42.8, + "pnn50": 0.115, + "signal_quality": 0.82, + "risk_score": 0.312, + "risk_level": "LOW" + }, + { + "timestamp": current_time + 60, + "heart_rate": 71.8, + "rmssd": 34.2, + "sdnn": 46.5, + "pnn50": 0.135, + "signal_quality": 0.89, + "risk_score": 0.265, + "risk_level": "LOW" + } + ], + "bathroom_events": [ + { + "start_time": current_time + 300, + "end_time": current_time + 375, + "duration": 75 + } + ], + "summary": { + "avg_hr": 72.8, + "avg_rmssd": 32.5, + "avg_risk_score": 0.287, + "total_alerts": 0 + } + } + + headers = { + "Authorization": f"Bearer {API_KEY}", + "Content-Type": "application/json" + } + + print(f"URL: {CLOUD_URL}") + print(f"设备ID: {DEVICE_ID}") + print(f"事件数: {len(test_data['events'])}") + print(f"起夜事件: {len(test_data['bathroom_events'])}") + print("-" * 60) + + try: + response = requests.post(CLOUD_URL, json=test_data, headers=headers, timeout=10) + + print(f"\n状态码: {response.status_code}") + print(f"响应内容:") + print(json.dumps(response.json(), indent=2, ensure_ascii=False)) + + if response.status_code == 200: + print("\n" + "="*60) + print("✅ 上传成功!") + print("="*60) + print("\n💡 提示:可以查看云端控制台,应该能看到接收日志") + + # 保存响应到文件 + with open('artifacts/cloud/upload_response.json', 'w', encoding='utf-8') as f: + json.dump(response.json(), f, indent=2, ensure_ascii=False) + print(f"✓ 响应已保存: artifacts/cloud/upload_response.json") + + return True + else: + print("\n" + "="*60) + print(f"❌ 上传失败: {response.json().get('error', '未知错误')}") + print("="*60) + return False + + except requests.exceptions.ConnectionError: + print("\n❌ 连接失败: 无法连接到云端服务器") + print("请先运行: python scripts/check_health.py") + return False + except Exception as e: + print(f"\n❌ 请求失败: {e}") + return False + +if __name__ == '__main__': + success = test_upload() + exit(0 if success else 1) diff --git a/0112-2026 - Test_Package/2-test_data/sample_missing_field.json b/0112-2026 - Test_Package/2-test_data/sample_missing_field.json new file mode 100644 index 0000000..d51cc46 --- /dev/null +++ b/0112-2026 - Test_Package/2-test_data/sample_missing_field.json @@ -0,0 +1,8 @@ +{ + "device_id": "test_edge_device_001", + "upload_time": "2026-01-12T10:00:00", + "_comment": "故意缺少 events 字段来测试错误处理", + "summary": { + "avg_hr": 72.0 + } +} diff --git a/0112-2026 - Test_Package/2-test_data/sample_normal.json b/0112-2026 - Test_Package/2-test_data/sample_normal.json new file mode 100644 index 0000000..f68e868 --- /dev/null +++ b/0112-2026 - Test_Package/2-test_data/sample_normal.json @@ -0,0 +1,39 @@ +{ + "device_id": "test_edge_device_001", + "upload_time": "2026-01-12T10:00:00", + "events": [ + { + "timestamp": 1704841234.0, + "heart_rate": 72.5, + "rmssd": 32.8, + "sdnn": 45.2, + "pnn50": 0.125, + "signal_quality": 0.87, + "risk_score": 0.285, + "risk_level": "LOW" + }, + { + "timestamp": 1704841264.0, + "heart_rate": 74.2, + "rmssd": 30.5, + "sdnn": 42.8, + "pnn50": 0.115, + "signal_quality": 0.82, + "risk_score": 0.312, + "risk_level": "LOW" + } + ], + "bathroom_events": [ + { + "start_time": 1704841534.0, + "end_time": 1704841609.0, + "duration": 75 + } + ], + "summary": { + "avg_hr": 73.4, + "avg_rmssd": 31.7, + "avg_risk_score": 0.299, + "total_alerts": 0 + } +} diff --git a/0112-2026 - Test_Package/2-test_data/sample_wrong_type.json b/0112-2026 - Test_Package/2-test_data/sample_wrong_type.json new file mode 100644 index 0000000..3435546 --- /dev/null +++ b/0112-2026 - Test_Package/2-test_data/sample_wrong_type.json @@ -0,0 +1,14 @@ +{ + "device_id": "test_edge_device_001", + "upload_time": "2026-01-12T10:00:00", + "events": [ + { + "timestamp": 1704841234.0, + "heart_rate": "seventy_two", + "_comment": "heart_rate应该是数字,这里故意用字符串", + "rmssd": 32.8, + "sdnn": 45.2 + } + ], + "summary": {} +} diff --git a/0112-2026 - Test_Package/3-docs/2-day-summary-template.md b/0112-2026 - Test_Package/3-docs/2-day-summary-template.md new file mode 100644 index 0000000..7925470 --- /dev/null +++ b/0112-2026 - Test_Package/3-docs/2-day-summary-template.md @@ -0,0 +1,141 @@ +# 测试总结报告 + +**项目名称:** 高血压风险评估系统 +**测试周期:** 2026-01-13 ~ 2026-01-15 +**测试人员:** 成员A(云端)、成员B(边缘端) +**报告日期:** 2026-01-15 + +--- + +## 执行概况 + +| 维度 | 计划 | 实际 | 完成率 | +|------|------|------|--------| +| 测试用例总数 | XX | XX | XX% | +| 通过用例 | - | XX | - | +| 失败用例 | - | XX | - | +| 阻塞用例 | - | XX | - | + +--- + +## 核心问题:能否演示闭环? + +### 回答(选一个) + +- [ ] ✅ **能** - 可以演示完整的边缘端→云端→报告闭环 +- [ ] ⚠️ **部分能** - 主要功能可用,但有XX个问题 +- [ ] ❌ **不能** - 存在阻塞性问题,无法演示 + +### 演示路径描述 + +[如果能演示,简要描述演示步骤;如果不能,说明缺失哪一环] + +--- + +## 最大阻塞问题 Top 3 + +### 问题 #1(P0) + +**问题描述:** [一句话] + +**影响范围:** 云端 / 边缘端 / 联调 + +**阻塞原因:** [为什么这个问题会阻塞测试] + +**建议方案:** [如何修复] + +**预计修复时间:** X小时/天 + +--- + +### 问题 #2(P0/P1) + +[同上格式] + +--- + +### 问题 #3(P0/P1) + +[同上格式] + +--- + +## 下一步行动(传感器接入前最先修的3个点) + +### 优先级 P0(必须修) + +1. **[问题标题]** - Bug #XXX + - 为什么必须修:[说明] + - 责任人:[谁来修] + - 完成时间:[预计] + +2. ... + +### 优先级 P1(建议修) + +1. **[问题标题]** - Bug #XXX + - 为什么建议修:[说明] + +--- + +## 测试覆盖情况 + +### 云端模块 + +| 功能模块 | 测试情况 | 结果 | +|---------|---------|------| +| 服务启动 | ✅ 已测试 | 通过 | +| 健康检查API | ✅ 已测试 | 通过 | +| 数据上传API | ✅ 已测试 | 通过/失败 | +| 报告生成API | ✅ 已测试 | 通过/失败 | +| 异常场景 | ✅ 已测试 | 部分通过 | +| 数据库落库 | ✅ 已测试 | 通过 | + +### 边缘端模块 + +| 功能模块 | 测试情况 | 结果 | +|---------|---------|------| +| 本地运行稳定性 | ✅ 已测试 | 通过 | +| 生理指标计算 | ✅ 已测试 | 通过 | +| 风险评估 | ✅ 已测试 | 通过 | +| 起夜检测 | ✅ 已测试 | 通过 | +| 云端上传 | ✅ 已测试 | 通过/失败 | +| 容错处理 | ✅ 已测试 | 部分通过 | + +### 端到端测试 + +| 测试场景 | 测试情况 | 结果 | +|---------|---------|------| +| 完整闭环 | ✅ 已测试 | 通过/失败 | +| 失败注入 | ✅ 已测试 | 部分通过 | + +--- + +## 团队协作评价 + +**沟通效率:** ⭐⭐⭐⭐⭐ / 5 +**任务分工:** 清晰 / 一般 / 不清晰 +**进度把控:** 按时 / 延期 + +**改进建议:** +- [如果有的话] + +--- + +## 附件清单 + +- [ ] Test-Log.md(完整测试记录) +- [ ] Bug-List.xlsx(缺陷清单) +- [ ] Checklist-Cloud.md(云端检查表) +- [ ] Checklist-Edge.md(边缘端检查表) +- [ ] artifacts/目录(所有日志、截图、数据库文件) + +--- + + 测试人员签字 + +**成员叶(云端):** _________ 日期:_________ + +**成员吕(边缘端):** _________ 日期:_________ + + diff --git a/0112-2026 - Test_Package/3-docs/Bug-Report-Template.md b/0112-2026 - Test_Package/3-docs/Bug-Report-Template.md new file mode 100644 index 0000000..0b6f7c7 --- /dev/null +++ b/0112-2026 - Test_Package/3-docs/Bug-Report-Template.md @@ -0,0 +1,202 @@ +# Bug报告模板 + +**使用说明:** 每发现一个Bug,复制下面的模板填写 + +--- + +## Bug #[编号] + +**标题:** [一句话描述问题] + +**严重程度:** +- [ ] P0 - 阻塞性(系统崩溃/无法启动/核心功能不可用) +- [ ] P1 - 严重(主要功能异常/数据错误) +- [ ] P2 - 一般(界面问题/提示不清晰/性能问题) +- [ ] P3 - 建议(优化建议/文档问题) + +**发现时间:** 2026-01-XX XX:XX + +**发现人:** [你的名字] + +**影响范围:** +- [ ] 云端 +- [ ] 边缘端 +- [ ] 端到端联调 +- [ ] 其他:__________ + +--- + +### 问题描述 + +[清晰描述问题是什么,用户会看到什么] + +--- + +### 复现步骤 + +**前置条件:** +- [例如:云端服务正在运行] +- [例如:已安装所有依赖] + +**复现步骤:** +1. [第一步] +2. [第二步] +3. [第三步] + +--- + +### 预期结果 + +[应该发生什么] + +--- + +### 实际结果 + +[实际发生了什么] + +--- + +### 环境信息 + +**操作系统:** Windows 10 / macOS XX / Linux +**Python版本:** 3.x.x +**关键依赖版本:** +- Flask: x.x.x +- numpy: x.x.x + +**配置信息:** +- device_id: xxxx +- API Key: xxxx(前4位) +- 云端地址: xxxx + +--- + +### 日志与截图 + +**错误日志:** +``` +[粘贴关键错误日志,至少前后20行] +``` + +**完整日志文件:** +- `artifacts/xxx/error.log` + +**截图:** +- `artifacts/xxx/screenshot.png` + +--- + +### 建议解决方案 + +[如果你有想法,写下来;没有也可以留空] + +--- + +### 状态跟踪 + +- [ ] 已报告 +- [ ] 开发中 +- [ ] 已修复 +- [ ] 已验证 +- [ ] 已关闭 + +**修复责任人:** __________ +**预计修复时间:** __________ + +--- +--- + +## Bug示例(参考) + +### Bug #001 + +**标题:** 云端启动时数据库连接失败 + +**严重程度:** P0 - 阻塞性 + +**发现时间:** 2026-01-12 10:15 + +**发现人:** 测试成员A + +**影响范围:** ✅ 云端 + +--- + +### 问题描述 + +运行 `python cloud_hypertension_system.py` 后,系统报错退出,提示数据库连接失败。 + +--- + +### 复现步骤 + +**前置条件:** +- Python 3.8已安装 +- 已执行 `pip install -r requirements.txt` + +**复现步骤:** +1. 打开PowerShell +2. 导航到项目目录 +3. 执行 `python cloud_hypertension_system.py` + +--- + +### 预期结果 + +云端服务正常启动,显示: +``` +✓ 数据库初始化成功 +Running on http://127.0.0.1:5000 +``` + +--- + +### 实际结果 + +程序报错退出: +``` +sqlite3.OperationalError: unable to open database file +``` + +--- + +### 环境信息 + +**操作系统:** Windows 10 +**Python版本:** 3.9.7 +**SQLite版本:** 3.35.5 + +--- + +### 日志与截图 + +**错误日志:** +``` +Traceback (most recent call last): + File "cloud_hypertension_system.py", line 850, in init_database + self.db_manager = DatabaseManager(self.config.database_config) + File "cloud_hypertension_system.py", line 125, in __init__ + self.engine = create_engine(...) +sqlite3.OperationalError: unable to open database file +``` + +**完整日志:** `artifacts/cloud/startup_error.log` + +--- + +### 建议解决方案 + +检查数据库文件路径是否存在写权限。可能需要: +1. 手动创建数据库目录 +2. 或者修改数据库路径到用户目录 + +--- + +### 状态跟踪 + +- [x] 已报告(2026-01-12) +- [ ] 开发中 +- [ ] 已修复 +- [ ] 已验证 +- [ ] 已关闭 diff --git a/0112-2026 - Test_Package/3-docs/Checklist-Cloud.md b/0112-2026 - Test_Package/3-docs/Checklist-Cloud.md new file mode 100644 index 0000000..77e9e49 --- /dev/null +++ b/0112-2026 - Test_Package/3-docs/Checklist-Cloud.md @@ -0,0 +1,170 @@ +# 云端测试检查表 - Checklist (Cloud) + +**测试人员:** ___________ +**测试日期:** 2026-01-XX +**系统版本:** v1.0 + +**使用说明:** 完成一项勾选一项,全部完成后签字 + +--- + +## 环境准备 + +- [ ] Python 3.8+已安装 +- [ ] 依赖已安装(`pip install -r requirements.txt`) +- [ ] 项目目录结构完整 +- [ ] 有独立的测试目录(避免污染生产数据) + +--- + +## 基础功能测试 + +### 服务启动 + +- [ ] 能成功启动云端服务 +- [ ] 启动日志无错误 +- [ ] 数据库初始化成功 +- [ ] API端点注册成功 +- [ ] 监听在正确端口(5000) + +**证据文件:** `artifacts/cloud/startup.png` + +--- + +### API测试 + +#### `/api/health` - 健康检查 + +- [ ] 返回状态码200 +- [ ] 响应包含 `"status": "healthy"` +- [ ] 响应包含版本号 +- [ ] 响应包含时间戳 + +**证据文件:** `artifacts/cloud/health.json` + +--- + +#### `POST /api/upload` - 数据上传 + +##### 正常场景 + +- [ ] 使用正确API Key能上传成功 +- [ ] 返回状态码200 +- [ ] 响应包含 `"status": "success"` +- [ ] 响应包含接收的记录数 +- [ ] 云端控制台显示接收日志 + +**证据文件:** `artifacts/cloud/upload_response.json` + +##### 异常场景 + +- [ ] 错误API Key返回401 +- [ ] 缺少必填字段返回400 +- [ ] 字段类型错误返回400 +- [ ] 错误信息清晰可操作 +- [ ] 所有异常都有适当的HTTP状态码(不是500) + +**证据文件:** `artifacts/cloud/negative_tests.md` + +--- + +#### `GET /api/report/` - 获取报告 + +- [ ] 有数据时能成功生成报告 +- [ ] 返回状态码200 +- [ ] 报告包含基础统计 +- [ ] 报告包含时间范围 +- [ ] 报告包含设备ID +- [ ] 报告文件已保存到 `./reports/` 目录 + +**证据文件:** `artifacts/cloud/report_sample.json` + +##### 边界情况 + +- [ ] 无数据时返回友好提示 +- [ ] 不存在的设备返回404或提示信息 + +--- + +### 数据库功能 + +- [ ] 能成功创建数据库文件 +- [ ] 数据表结构正确 +- [ ] 上传数据能正确落库 +- [ ] 记录数与上传数一致 +- [ ] 时间戳格式正确 +- [ ] 能查询已存数据 + +**证据文件:** `artifacts/cloud/db_check.txt` + +##### 数据完整性 + +- [ ] 重复上传行为符合预期(幂等/累加/拒绝) +- [ ] 数据类型正确(int/float/str) +- [ ] 外键关系正确 + +--- + +## 稳定性测试 + +- [ ] 连续运行2小时无崩溃 +- [ ] 内存使用稳定(无泄漏) +- [ ] CPU占用合理(<50%) +- [ ] 多次上传后性能无明显下降 + +**证据文件:** `artifacts/cloud/stability.log` + +--- + +## 日志与监控 + +- [ ] 关键操作有日志记录 +- [ ] 日志时间戳正确 +- [ ] 错误日志包含堆栈信息 +- [ ] 日志格式统一 +- [ ] 敏感信息已脱敏(API Key等) + +--- + +## 配置与部署 + +- [ ] 配置文件结构清晰 +- [ ] 可以修改端口 +- [ ] 可以修改数据库路径 +- [ ] 可以添加/删除API Key +- [ ] 配置变更后能生效 + +--- + +## 文档完整性 + +- [ ] README说明清晰 +- [ ] API文档存在且准确 +- [ ] 错误码说明完整 +- [ ] 有使用示例 + +--- + +## 总结 + +### 统计 + +- 总测试项:XX +- 通过:XX +- 失败:XX +- 阻塞:XX + +### P0问题(必须修复) + +1. [问题描述 + Bug编号] +2. ... + +### 验收结论 + +- [ ] ✅ 云端基本功能可用,可以进入联调 +- [ ] ⚠️ 有问题但不阻塞联调,可以带问题联调 +- [ ] ❌ 存在阻塞问题,必须先修复 + +**签字:** +测试人员:_________ 日期:_________ +复核人员:_________ 日期:_________ diff --git a/0112-2026 - Test_Package/3-docs/Checklist-Edge.md b/0112-2026 - Test_Package/3-docs/Checklist-Edge.md new file mode 100644 index 0000000..aa58f8f --- /dev/null +++ b/0112-2026 - Test_Package/3-docs/Checklist-Edge.md @@ -0,0 +1,181 @@ +# 边缘端测试检查表 - Checklist (Edge) + +**测试人员:** ___________ +**测试日期:** 2026-01-XX +**系统版本:** v1.0 + +**使用说明:** 完成一项勾选一项,全部完成后签字 + +--- + +## 环境准备 + +- [ ] Python 3.8+已安装 +- [ ] 依赖已安装(`pip install numpy scipy`) +- [ ] 项目目录结构完整 +- [ ] 配置文件可访问 + +--- + +## 基础功能测试 + +### 本地运行(模拟模式) + +- [ ] 能成功启动边缘端 +- [ ] 启动日志无错误 +- [ ] 显示初始化成功信息 +- [ ] 进入实时处理循环 + +**证据文件:** `artifacts/edge/startup.png` + +--- + +### 数据生成与处理 + +#### 生理指标输出 + +- [ ] 每30秒输出一次实时状态 +- [ ] 心率在合理范围(50-100 bpm) +- [ ] RMSSD有数值(>0) +- [ ] SDNN有数值(>0) +- [ ] pNN50有数值(0-1之间) +- [ ] 信号质量有数值(0-1之间) + +**证据文件:** `artifacts/edge/screenshots/normal_output.png` + +--- + +#### 风险评估 + +- [ ] 风险评分在0-1范围内 +- [ ] 风险等级显示(LOW/MEDIUM/HIGH) +- [ ] 风险评分随指标变化 +- [ ] 高风险时能触发告警(模拟) + +**证据文件:** `artifacts/edge/screenshots/risk_alert.png` + +--- + +#### 起夜检测 + +- [ ] 能检测到起夜事件(🚽) +- [ ] 记录起夜时长 +- [ ] 记录起夜次数(今晚累计) +- [ ] 起夜时BCG质量下降 +- [ ] 起夜结束后BCG质量恢复 + +**证据文件:** `artifacts/edge/screenshots/bathroom_event.png` + +--- + +### 系统性能 + +- [ ] 处理延迟 <500ms(大部分时间) +- [ ] CPU占用 <30% +- [ ] 内存占用 <500MB +- [ ] 无内存泄漏(长时间运行) + +**证据文件:** `artifacts/edge/performance.log` + +--- + +## 稳定性测试 + +- [ ] 连续运行1小时无崩溃 +- [ ] 至少看到1次起夜事件 +- [ ] 至少看到120次实时状态输出(1小时) +- [ ] 无Python异常 +- [ ] 无数据异常(NaN, Inf等) + +**证据文件:** `artifacts/edge/edge_1hour.log` + +--- + +## 上传功能测试 + +### 云端连接(与成员A协同) + +- [ ] 能连接到云端(与成员A确认云端在运行) +- [ ] 配置正确(URL, API Key, device_id) +- [ ] 能成功上传数据 +- [ ] 云端控制台显示接收日志 +- [ ] 上传频率正确(每5分钟) + +**证据文件:** `artifacts/edge/upload_success.log` + +--- + +### 上传容错 + +#### 云端不可达 + +- [ ] 云端关闭时,边缘端显示连接失败 +- [ ] 边缘端继续本地处理(不崩溃) +- [ ] 错误信息清晰 +- [ ] 云端恢复后能继续上传 + +**证据文件:** `artifacts/edge/upload_fail.log` + +#### API认证失败 + +- [ ] 错误API Key时显示401错误 +- [ ] 错误信息包含"Unauthorized" +- [ ] 边缘端继续本地处理 + +**证据文件:** `artifacts/edge/auth_fail.log` + +--- + +## 数据质量 + +- [ ] 心率数据合理(无异常跳变) +- [ ] HRV数据合理(符合生理范围) +- [ ] 时间戳单调递增 +- [ ] 无数据重复 +- [ ] 信号质量评估合理 + +--- + +## 配置与可调性 + +- [ ] 可以修改device_id +- [ ] 可以修改云端URL +- [ ] 可以修改上传间隔 +- [ ] 可以开关云端上传 +- [ ] 配置变更后能生效 + +--- + +## 日志可读性 + +- [ ] 日志输出清晰 +- [ ] 关键事件有日志(启动、起夜、上传、告警) +- [ ] 日志时间戳正确 +- [ ] 错误日志包含详细信息 +- [ ] 日志格式统一 + +--- + +## 总结 + +### 统计 + +- 总测试项:XX +- 通过:XX +- 失败:XX +- 阻塞:XX + +### P0问题(必须修复) + +1. [问题描述 + Bug编号] +2. ... + +### 验收结论 + +- [ ] ✅ 边缘端基本功能可用,可以进入联调 +- [ ] ⚠️ 有问题但不阻塞联调,可以带问题联调 +- [ ] ❌ 存在阻塞问题,必须先修复 + +**签字:** +测试人员:_________ 日期:_________ +复核人员:_________ 日期:_________ diff --git a/0112-2026 - Test_Package/3-docs/Test-Log-Template.md b/0112-2026 - Test_Package/3-docs/Test-Log-Template.md new file mode 100644 index 0000000..2c771e9 --- /dev/null +++ b/0112-2026 - Test_Package/3-docs/Test-Log-Template.md @@ -0,0 +1,110 @@ +# 测试记录 - Test Log + +**测试人员:** [你的名字] +**测试日期:** 2026-01-XX +**系统版本:** v1.0 +**测试环境:** Windows 10 / macOS / Linux + Python 3.x + +--- + +## Day 1 - [日期] + +### 09:30 - 10:00 | 环境准备 + +**操作:** +- [ ] 安装依赖:`pip install -r requirements.txt` +- [ ] 验证Python版本:`python --version` + +**结果:** +``` +[粘贴命令输出] +``` + +**问题:** +- 无 / [描述遇到的问题] + +--- + +### 10:00 - 12:00 | 上午测试 + +#### Test 1.1:[测试名称] + +**时间:** 10:05 - 10:20 +**目标:** [本次测试要验证什么] + +**操作步骤:** +1. [步骤1] +2. [步骤2] +3. [步骤3] + +**预期结果:** +- [预期1] +- [预期2] + +**实际结果:** +- [实际1] +- [实际2] + +**状态:** ✅ 通过 / ❌ 失败 / ⚠️ 部分通过 + +**证据文件:** +- `artifacts/xxx/xxx.log` +- `artifacts/xxx/screenshot.png` + +**备注:** +[任何额外的观察或注释] + +--- + +#### Test 1.2:[下一个测试] + +**时间:** 10:25 - 10:40 +...(重复上面的格式) + +--- + +### 13:30 - 15:30 | 下午测试 + +[继续按同样格式记录] + +--- + +## Day 2 - [日期] + +[继续记录Day 2的测试] + +--- + +## 测试总结 + +### 统计 + +| 类别 | 总数 | 通过 | 失败 | 部分通过 | +|------|------|------|------|---------| +| 云端测试 | X | X | X | X | +| 边缘端测试 | X | X | X | X | +| 联调测试 | X | X | X | X | +| **合计** | **X** | **X** | **X** | **X** | + +### 主要发现 + +**关键问题(P0):** +1. [问题描述 + Bug编号] +2. ... + +**一般问题(P1):** +1. ... + +**建议改进(P2):** +1. ... + +### 验收结论 + +- [ ] 云端基本功能可用 +- [ ] 边缘端基本功能可用 +- [ ] 端到端闭环通过 +- [ ] 可以进入下一阶段(传感器接入) + +**签字:** +测试人员:_________ 日期:_________ +复核人员:_________ 日期:_________ diff --git a/0112-2026 - Test_Package/cloud_hypertension_system-云端代码.py b/0112-2026 - Test_Package/cloud_hypertension_system-云端代码.py new file mode 100644 index 0000000..b0b55d6 --- /dev/null +++ b/0112-2026 - Test_Package/cloud_hypertension_system-云端代码.py @@ -0,0 +1,1243 @@ +# -*- coding: utf-8 -*- +""" +高血压风险评估系统 - 云端深度分析模块 +Hypertension Risk Assessment - Cloud Deep Analysis Module + +Created: 2026-01-10 +Author: KKRobot Healthcare AI Team +Version: v1.0_cloud_analytics + +📌 部署架构:混合部署(云端组件) +- 边缘端:RK3588设备,实时预警 +- 云端(本代码):服务器,深度分析与长期趋势 + +🎯 核心功能: +1. 接收边缘端上传的匿名化数据(REST API) +2. HRV深度分析(时域+频域指标) +3. 机器学习风险预测模型 +4. 长期趋势分析与可视化 +5. 个性化基线优化 +6. 自动生成诊断报告 +7. 参数优化与下发 + +💻 技术栈: +- Web框架:Flask(轻量级REST API) +- 数据库:SQLite(可切换PostgreSQL) +- 机器学习:PyTorch(回归模型) +- 数据分析:pandas, scipy +- 可视化:matplotlib + +📦 运行要求: +pip install flask numpy scipy pandas torch matplotlib sqlalchemy +python cloud_hypertension_system.py + +🔐 安全特性: +- API Key认证 +- 数据匿名化验证 +- SQL注入防护 +- HTTPS支持(生产环境) +""" + +import numpy as np +import pandas as pd +from scipy import signal +from scipy.stats import entropy +import torch +import torch.nn as nn +import torch.optim as optim +from flask import Flask, request, jsonify +from sqlalchemy import create_engine, Column, Integer, Float, String, DateTime, Text +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker +from datetime import datetime, timedelta +import json +import os +import hashlib +import warnings +import matplotlib +matplotlib.use('Agg') # 无GUI后端 +import matplotlib.pyplot as plt +from collections import defaultdict +import pickle + +warnings.filterwarnings("ignore") + +# ==================== 0. 云端核心配置 ==================== + +class CloudConfig: + """云端服务器配置""" + + def __init__(self): + # 版本标识 + self.version = "v1.0_cloud_analytics" + + # ========== API服务配置 ========== + self.api_config = { + 'host': '0.0.0.0', + 'port': 5000, + 'debug': False, + 'api_keys': [ + 'your_api_key_here', + 'edge_device_key_001' + ], + 'max_request_size': 10 * 1024 * 1024, # 10MB + 'enable_cors': True + } + + # ========== 数据库配置 ========== + self.database_config = { + 'type': 'sqlite', # 'sqlite' or 'postgresql' + 'sqlite_path': './cloud_database.db', + 'postgresql_url': 'postgresql://user:password@localhost/hypertension_db', + 'echo': False # SQL日志 + } + + # ========== HRV深度分析配置 ========== + self.hrv_analysis_config = { + 'enable_frequency_domain': True, + 'frequency_bands': { + 'vlf': (0.003, 0.04), # 超低频 + 'lf': (0.04, 0.15), # 低频 + 'hf': (0.15, 0.4) # 高频 + }, + 'psd_method': 'welch', + 'detrend': True + } + + # ========== 机器学习模型配置 ========== + self.ml_config = { + 'model_type': 'neural_network', # 'neural_network' or 'random_forest' + 'input_features': [ + 'rmssd', 'sdnn', 'pnn50', 'mean_hr', + 'lf_power', 'hf_power', 'lf_hf_ratio', + 'bathroom_freq', 'bathroom_duration' + ], + 'hidden_dims': [64, 32, 16], + 'learning_rate': 0.001, + 'batch_size': 32, + 'epochs': 50, + 'validation_split': 0.2, + 'model_save_path': './models/cloud_model.pth' + } + + # ========== 趋势分析配置 ========== + self.trend_config = { + 'min_days_for_trend': 7, # 最少7天才计算趋势 + 'baseline_window': 14, # 14天滚动基线 + 'alert_threshold_increase': 0.15, # 风险评分上升15%触发警报 + 'smoothing_window': 3 # 3天移动平均 + } + + # ========== 报告生成配置 ========== + self.report_config = { + 'report_dir': './reports', + 'chart_dir': './charts', + 'template_path': './templates/report_template.html', + 'export_formats': ['pdf', 'html', 'json'] + } + + # ========== 参数优化配置 ========== + self.optimization_config = { + 'enable_auto_tuning': True, + 'tune_interval_days': 7, # 每7天优化一次 + 'min_samples_for_tuning': 50, + 'performance_metrics': ['sensitivity', 'specificity', 'auc'] + } + + # 创建必要目录 + for dir_path in [self.report_config['report_dir'], + self.report_config['chart_dir'], + os.path.dirname(self.ml_config['model_save_path'])]: + os.makedirs(dir_path, exist_ok=True) + +# 全局配置实例 +config = CloudConfig() + +# ==================== 1. 数据库模型层 ==================== + +Base = declarative_base() + +class Device(Base): + """设备表(匿名化)""" + __tablename__ = 'devices' + + id = Column(Integer, primary_key=True) + device_id = Column(String(100), unique=True, nullable=False) # 匿名ID + device_type = Column(String(50)) # 'RK3588', 'RK3568', etc. + registration_date = Column(DateTime, default=datetime.now) + last_upload = Column(DateTime) + total_uploads = Column(Integer, default=0) + +class SensorData(Base): + """传感器数据表""" + __tablename__ = 'sensor_data' + + id = Column(Integer, primary_key=True) + device_id = Column(String(100), nullable=False) + timestamp = Column(DateTime, nullable=False) + event_type = Column(String(50)) # 'BCG_REALTIME', 'BATHROOM_VISIT' + + # BCG生理指标 + heart_rate = Column(Float) + rmssd = Column(Float) + sdnn = Column(Float) + pnn50 = Column(Float) + respiratory_rate = Column(Float) + + # 雷达行为指标 + bathroom_freq = Column(Integer) + bathroom_duration = Column(Float) + + # 风险评估 + risk_score = Column(Float) + risk_level = Column(String(20)) + signal_quality = Column(Float) + + # 原始数据(JSON) + raw_data = Column(Text) + +class NightlySummary(Base): + """夜间汇总表""" + __tablename__ = 'nightly_summary' + + id = Column(Integer, primary_key=True) + device_id = Column(String(100), nullable=False) + date = Column(String(20), nullable=False) # 'YYYY-MM-DD' + + # BCG汇总 + mean_hr = Column(Float) + min_hr = Column(Float) + max_hr = Column(Float) + mean_rmssd = Column(Float) + mean_sdnn = Column(Float) + mean_pnn50 = Column(Float) + + # HRV频域指标 + vlf_power = Column(Float) + lf_power = Column(Float) + hf_power = Column(Float) + lf_hf_ratio = Column(Float) + + # 行为汇总 + total_bathroom_visits = Column(Integer) + total_bathroom_duration = Column(Float) + avg_bathroom_duration = Column(Float) + first_bathroom_hour = Column(Float) + + # 风险评估 + mean_risk_score = Column(Float) + max_risk_score = Column(Float) + dominant_risk_level = Column(String(20)) + + # 数据质量 + valid_data_hours = Column(Float) + total_data_points = Column(Integer) + +class RiskAlert(Base): + """风险警报表""" + __tablename__ = 'risk_alerts' + + id = Column(Integer, primary_key=True) + device_id = Column(String(100), nullable=False) + timestamp = Column(DateTime, nullable=False) + alert_type = Column(String(50)) # 'HIGH_RISK', 'TREND_INCREASE', 'ANOMALY' + risk_score = Column(Float) + message = Column(Text) + is_read = Column(Integer, default=0) + +class DatabaseManager: + """数据库管理器""" + + def __init__(self, config): + self.config = config + + # 创建引擎 + if config.database_config['type'] == 'sqlite': + db_url = f"sqlite:///{config.database_config['sqlite_path']}" + else: + db_url = config.database_config['postgresql_url'] + + self.engine = create_engine( + db_url, + echo=config.database_config['echo'] + ) + + # 创建表 + Base.metadata.create_all(self.engine) + + # 创建会话工厂 + Session = sessionmaker(bind=self.engine) + self.session = Session() + + print(f"✓ 数据库初始化成功: {config.database_config['type']}") + + def add_sensor_data(self, device_id, event): + """添加传感器数据""" + data = SensorData( + device_id=device_id, + timestamp=datetime.fromtimestamp(event['timestamp']), + event_type=event['event_type'], + heart_rate=event['data'].get('hr'), + rmssd=event['data'].get('rmssd'), + sdnn=event['data'].get('sdnn'), + pnn50=event['data'].get('pnn50'), + respiratory_rate=event['data'].get('rr'), + bathroom_freq=event['data'].get('bathroom_freq'), + bathroom_duration=event['data'].get('bathroom_duration'), + risk_score=event['data'].get('risk_score'), + risk_level=event['data'].get('risk_level'), + signal_quality=event.get('quality', 1.0), + raw_data=json.dumps(event['data']) + ) + self.session.add(data) + self.session.commit() + + def get_device_data(self, device_id, start_date, end_date): + """获取设备数据""" + query = self.session.query(SensorData).filter( + SensorData.device_id == device_id, + SensorData.timestamp >= start_date, + SensorData.timestamp <= end_date + ) + return query.all() + + def get_nightly_summaries(self, device_id, days=30): + """获取夜间汇总""" + query = self.session.query(NightlySummary).filter( + NightlySummary.device_id == device_id + ).order_by(NightlySummary.date.desc()).limit(days) + return query.all() + + def add_nightly_summary(self, summary_data): + """添加夜间汇总""" + summary = NightlySummary(**summary_data) + self.session.add(summary) + self.session.commit() + + def add_risk_alert(self, device_id, alert_type, risk_score, message): + """添加风险警报""" + alert = RiskAlert( + device_id=device_id, + timestamp=datetime.now(), + alert_type=alert_type, + risk_score=risk_score, + message=message + ) + self.session.add(alert) + self.session.commit() + +# ==================== 2. HRV深度分析层 ==================== + +class HRVDeepAnalyzer: + """HRV深度分析器(频域+非线性)""" + + def __init__(self, config): + self.config = config + self.freq_bands = config.hrv_analysis_config['frequency_bands'] + + def calculate_frequency_domain(self, rr_intervals, fs=4.0): + """ + 计算频域HRV指标 + + 参数: + - rr_intervals: RR间期数组(ms) + - fs: 重采样率(Hz) + + 返回:{'vlf_power', 'lf_power', 'hf_power', 'lf_hf_ratio', ...} + """ + try: + if len(rr_intervals) < 20: + return None + + # 1. 转换为时间序列 + rr_sec = rr_intervals / 1000.0 + cum_time = np.cumsum(rr_sec) + cum_time = cum_time - cum_time[0] + + # 2. 插值到均匀采样 + time_uniform = np.arange(0, cum_time[-1], 1.0/fs) + interp_func = interp1d(cum_time, rr_sec, kind='cubic', fill_value='extrapolate') + rr_uniform = interp_func(time_uniform) + + # 3. 去趋势 + if self.config.hrv_analysis_config['detrend']: + rr_uniform = signal.detrend(rr_uniform) + + # 4. Welch功率谱估计 + freqs, psd = signal.welch( + rr_uniform, + fs=fs, + nperseg=min(256, len(rr_uniform)), + scaling='density' + ) + + # 5. 计算各频段能量 + vlf_mask = (freqs >= self.freq_bands['vlf'][0]) & (freqs < self.freq_bands['vlf'][1]) + lf_mask = (freqs >= self.freq_bands['lf'][0]) & (freqs < self.freq_bands['lf'][1]) + hf_mask = (freqs >= self.freq_bands['hf'][0]) & (freqs < self.freq_bands['hf'][1]) + + vlf_power = np.trapz(psd[vlf_mask], freqs[vlf_mask]) + lf_power = np.trapz(psd[lf_mask], freqs[lf_mask]) + hf_power = np.trapz(psd[hf_mask], freqs[hf_mask]) + + total_power = vlf_power + lf_power + hf_power + + # 6. 比率指标 + lf_hf_ratio = lf_power / (hf_power + 1e-6) + lf_norm = lf_power / (lf_power + hf_power + 1e-6) + hf_norm = hf_power / (lf_power + hf_power + 1e-6) + + return { + 'vlf_power': float(vlf_power), + 'lf_power': float(lf_power), + 'hf_power': float(hf_power), + 'total_power': float(total_power), + 'lf_hf_ratio': float(lf_hf_ratio), + 'lf_norm': float(lf_norm), + 'hf_norm': float(hf_norm), + 'vlf_percent': float(vlf_power / total_power * 100), + 'lf_percent': float(lf_power / total_power * 100), + 'hf_percent': float(hf_power / total_power * 100) + } + + except Exception as e: + print(f" ⚠️ 频域分析失败: {e}") + return None + + def calculate_nonlinear_features(self, rr_intervals): + """ + 计算非线性HRV指标 + + 返回:{'sd1', 'sd2', 'sample_entropy', ...} + """ + try: + if len(rr_intervals) < 20: + return None + + # 1. Poincaré图指标(SD1, SD2) + diff_rr = np.diff(rr_intervals) + sd1 = np.sqrt(np.var(diff_rr) / 2) + sd2 = np.sqrt(2 * np.var(rr_intervals) - np.var(diff_rr) / 2) + + # 2. 样本熵(Sample Entropy) + # 简化版本(完整版较复杂) + sample_entropy_value = self._calculate_sample_entropy(rr_intervals, m=2, r=0.2) + + return { + 'sd1': float(sd1), + 'sd2': float(sd2), + 'sd1_sd2_ratio': float(sd1 / (sd2 + 1e-6)), + 'sample_entropy': float(sample_entropy_value) + } + + except Exception as e: + print(f" ⚠️ 非线性分析失败: {e}") + return None + + def _calculate_sample_entropy(self, data, m=2, r=0.2): + """计算样本熵(简化版)""" + try: + N = len(data) + if N < 10: + return 0.0 + + std_data = np.std(data) + r = r * std_data + + def _maxdist(xi, xj): + return np.max(np.abs(xi - xj)) + + def _phi(m): + patterns = np.array([data[i:i+m] for i in range(N-m)]) + C = np.zeros(N-m) + for i in range(N-m): + for j in range(N-m): + if i != j and _maxdist(patterns[i], patterns[j]) < r: + C[i] += 1 + return np.sum(np.log(C / (N-m-1) + 1e-10)) / (N-m) + + return float(_phi(m) - _phi(m+1)) + + except: + return 0.0 + + def comprehensive_analysis(self, rr_intervals): + """ + 综合HRV分析 + + 返回:包含时域+频域+非线性指标的完整字典 + """ + results = {} + + # 时域指标 + if len(rr_intervals) >= 10: + diff_rr = np.diff(rr_intervals) + results['time_domain'] = { + 'mean_rr': float(np.mean(rr_intervals)), + 'sdnn': float(np.std(rr_intervals)), + 'rmssd': float(np.sqrt(np.mean(diff_rr ** 2))), + 'pnn50': float(np.sum(np.abs(diff_rr) > 50) / len(diff_rr)), + 'cv': float(np.std(rr_intervals) / np.mean(rr_intervals)) + } + + # 频域指标 + freq_results = self.calculate_frequency_domain(rr_intervals) + if freq_results: + results['frequency_domain'] = freq_results + + # 非线性指标 + nonlinear_results = self.calculate_nonlinear_features(rr_intervals) + if nonlinear_results: + results['nonlinear'] = nonlinear_results + + return results + +# ==================== 3. 机器学习模型层 ==================== + +class RiskPredictionModel(nn.Module): + """深度学习风险预测模型""" + + def __init__(self, input_dim, hidden_dims=[64, 32, 16]): + super(RiskPredictionModel, self).__init__() + + layers = [] + prev_dim = input_dim + + for hidden_dim in hidden_dims: + layers.append(nn.Linear(prev_dim, hidden_dim)) + layers.append(nn.BatchNorm1d(hidden_dim)) + layers.append(nn.ReLU()) + layers.append(nn.Dropout(0.3)) + prev_dim = hidden_dim + + layers.append(nn.Linear(prev_dim, 1)) + layers.append(nn.Sigmoid()) + + self.network = nn.Sequential(*layers) + + def forward(self, x): + return self.network(x).squeeze(-1) + +class MLModelTrainer: + """机器学习模型训练器""" + + def __init__(self, config): + self.config = config + self.model = None + self.scaler = None + self.feature_names = config.ml_config['input_features'] + + def prepare_training_data(self, db_manager, device_ids): + """ + 从数据库准备训练数据 + + 返回:X (features), y (labels) + """ + X_list = [] + y_list = [] + + for device_id in device_ids: + summaries = db_manager.get_nightly_summaries(device_id, days=90) + + for summary in summaries: + # 提取特征 + features = [] + for feat_name in self.feature_names: + value = getattr(summary, feat_name, 0) + features.append(value if value is not None else 0) + + # 提取标签(风险评分) + label = summary.mean_risk_score + + if label is not None and len(features) == len(self.feature_names): + X_list.append(features) + y_list.append(label) + + if len(X_list) == 0: + return None, None + + X = np.array(X_list) + y = np.array(y_list) + + # 标准化 + from sklearn.preprocessing import StandardScaler + self.scaler = StandardScaler() + X = self.scaler.fit_transform(X) + + return X, y + + def train_model(self, X, y): + """训练模型""" + if X is None or len(X) < 50: + print(" ⚠️ 训练数据不足(需要至少50个样本)") + return False + + print(f"{'='*70}") + print(f"🤖 开始训练机器学习模型") + print(f"{'='*70}") + print(f"训练样本: {len(X)}") + print(f"特征维度: {X.shape[1]}") + + # 划分训练/验证集 + split_idx = int(len(X) * (1 - self.config.ml_config['validation_split'])) + X_train, X_val = X[:split_idx], X[split_idx:] + y_train, y_val = y[:split_idx], y[split_idx:] + + # 转换为Tensor + X_train_tensor = torch.tensor(X_train, dtype=torch.float32) + y_train_tensor = torch.tensor(y_train, dtype=torch.float32) + X_val_tensor = torch.tensor(X_val, dtype=torch.float32) + y_val_tensor = torch.tensor(y_val, dtype=torch.float32) + + # 初始化模型 + input_dim = X.shape[1] + self.model = RiskPredictionModel( + input_dim=input_dim, + hidden_dims=self.config.ml_config['hidden_dims'] + ) + + # 损失函数和优化器 + criterion = nn.MSELoss() + optimizer = optim.Adam( + self.model.parameters(), + lr=self.config.ml_config['learning_rate'] + ) + + # 训练循环 + best_val_loss = float('inf') + patience = 10 + patience_counter = 0 + + for epoch in range(self.config.ml_config['epochs']): + # 训练 + self.model.train() + optimizer.zero_grad() + outputs = self.model(X_train_tensor) + loss = criterion(outputs, y_train_tensor) + loss.backward() + optimizer.step() + + # 验证 + self.model.eval() + with torch.no_grad(): + val_outputs = self.model(X_val_tensor) + val_loss = criterion(val_outputs, y_val_tensor) + + # 早停 + if val_loss < best_val_loss: + best_val_loss = val_loss + patience_counter = 0 + # 保存最佳模型 + torch.save({ + 'model_state_dict': self.model.state_dict(), + 'scaler': self.scaler, + 'feature_names': self.feature_names + }, self.config.ml_config['model_save_path']) + else: + patience_counter += 1 + + if patience_counter >= patience: + print(f" 提前停止于 epoch {epoch+1}") + break + + if (epoch + 1) % 10 == 0: + print(f" Epoch {epoch+1}/{self.config.ml_config['epochs']} | " + f"Train Loss: {loss.item():.4f} | Val Loss: {val_loss.item():.4f}") + + print(f"✓ 模型训练完成!最佳验证损失: {best_val_loss:.4f}") + print(f"✓ 模型已保存: {self.config.ml_config['model_save_path']}") + print(f"{'='*70}") + + return True + + def load_model(self): + """加载已训练模型""" + model_path = self.config.ml_config['model_save_path'] + if not os.path.exists(model_path): + return False + + try: + checkpoint = torch.load(model_path) + + input_dim = len(checkpoint['feature_names']) + self.model = RiskPredictionModel( + input_dim=input_dim, + hidden_dims=self.config.ml_config['hidden_dims'] + ) + self.model.load_state_dict(checkpoint['model_state_dict']) + self.model.eval() + + self.scaler = checkpoint['scaler'] + self.feature_names = checkpoint['feature_names'] + + print(f"✓ 模型加载成功: {model_path}") + return True + + except Exception as e: + print(f"❌ 模型加载失败: {e}") + return False + + def predict(self, features): + """ + 预测风险评分 + + 参数:features = {'rmssd': ..., 'sdnn': ..., ...} + 返回:风险评分(0-1) + """ + if self.model is None: + return None + + # 提取特征值 + feature_vector = [] + for feat_name in self.feature_names: + feature_vector.append(features.get(feat_name, 0)) + + # 标准化 + feature_vector = np.array(feature_vector).reshape(1, -1) + feature_vector = self.scaler.transform(feature_vector) + + # 预测 + with torch.no_grad(): + feature_tensor = torch.tensor(feature_vector, dtype=torch.float32) + prediction = self.model(feature_tensor) + + return float(prediction.item()) + +# ==================== 4. 趋势分析层 ==================== + +class TrendAnalyzer: + """长期趋势分析器""" + + def __init__(self, config): + self.config = config + + def analyze_trends(self, nightly_summaries): + """ + 分析长期趋势 + + 参数:nightly_summaries = [NightlySummary对象列表] + 返回:趋势分析结果 + """ + if len(nightly_summaries) < self.config.trend_config['min_days_for_trend']: + return None + + # 提取时间序列 + dates = [datetime.strptime(s.date, '%Y-%m-%d') for s in nightly_summaries] + risk_scores = [s.mean_risk_score for s in nightly_summaries] + rmssd_values = [s.mean_rmssd for s in nightly_summaries] + hr_values = [s.mean_hr for s in nightly_summaries] + bathroom_freqs = [s.total_bathroom_visits for s in nightly_summaries] + + # 移动平滑 + window = self.config.trend_config['smoothing_window'] + risk_smoothed = self._moving_average(risk_scores, window) + rmssd_smoothed = self._moving_average(rmssd_values, window) + + # 计算趋势方向 + risk_trend = self._calculate_trend(risk_smoothed) + rmssd_trend = self._calculate_trend(rmssd_smoothed) + + # 检测异常值 + risk_anomalies = self._detect_anomalies(risk_scores) + + # 计算变化率 + recent_risk = np.mean(risk_scores[-7:]) # 最近7天 + baseline_risk = np.mean(risk_scores[:14]) # 前14天基线 + risk_change_percent = (recent_risk - baseline_risk) / (baseline_risk + 1e-6) * 100 + + return { + 'risk_trend': risk_trend, # 'increasing', 'stable', 'decreasing' + 'rmssd_trend': rmssd_trend, + 'risk_change_percent': float(risk_change_percent), + 'recent_avg_risk': float(recent_risk), + 'baseline_avg_risk': float(baseline_risk), + 'anomaly_count': len(risk_anomalies), + 'anomaly_dates': [dates[i].strftime('%Y-%m-%d') for i in risk_anomalies], + 'avg_bathroom_freq': float(np.mean(bathroom_freqs)), + 'max_bathroom_freq': int(np.max(bathroom_freqs)) + } + + def _moving_average(self, data, window): + """移动平均""" + if len(data) < window: + return data + return np.convolve(data, np.ones(window)/window, mode='valid') + + def _calculate_trend(self, data): + """计算趋势方向(线性回归斜率)""" + if len(data) < 3: + return 'stable' + + x = np.arange(len(data)) + slope = np.polyfit(x, data, 1)[0] + + if slope > 0.01: + return 'increasing' + elif slope < -0.01: + return 'decreasing' + else: + return 'stable' + + def _detect_anomalies(self, data, threshold=2.5): + """检测异常值(基于标准差)""" + mean = np.mean(data) + std = np.std(data) + anomalies = [] + + for i, value in enumerate(data): + if abs(value - mean) > threshold * std: + anomalies.append(i) + + return anomalies + + def should_alert(self, trend_result): + """判断是否应该触发趋势警报""" + if trend_result is None: + return False, None + + alert_threshold = self.config.trend_config['alert_threshold_increase'] + + if trend_result['risk_change_percent'] > alert_threshold * 100: + message = (f"⚠️ 风险评分上升趋势警报!" + f"近期风险: {trend_result['recent_avg_risk']:.3f}" + f"基线风险: {trend_result['baseline_avg_risk']:.3f}" + f"上升幅度: {trend_result['risk_change_percent']:.1f}%") + return True, message + + return False, None + +# ==================== 5. 报告生成层 ==================== + +class ReportGenerator: + """诊断报告生成器""" + + def __init__(self, config): + self.config = config + + def generate_comprehensive_report(self, device_id, db_manager, ml_trainer, trend_analyzer): + """ + 生成综合诊断报告 + + 返回:报告字典 + """ + print(f"{'='*70}") + print(f"📄 生成综合诊断报告") + print(f"{'='*70}") + print(f"设备ID: {device_id}") + + # 1. 获取最近30天数据 + summaries = db_manager.get_nightly_summaries(device_id, days=30) + + if len(summaries) == 0: + print(" ⚠️ 无可用数据") + return None + + print(f"✓ 加载 {len(summaries)} 天数据") + + # 2. 趋势分析 + trend_result = trend_analyzer.analyze_trends(summaries) + + # 3. 最新状态 + latest = summaries[0] + latest_features = { + 'rmssd': latest.mean_rmssd, + 'sdnn': latest.mean_sdnn, + 'mean_hr': latest.mean_hr, + 'lf_power': latest.lf_power, + 'hf_power': latest.hf_power, + 'lf_hf_ratio': latest.lf_hf_ratio, + 'bathroom_freq': latest.total_bathroom_visits, + 'bathroom_duration': latest.total_bathroom_duration + } + + # 4. ML预测 + ml_prediction = None + if ml_trainer.model is not None: + ml_prediction = ml_trainer.predict(latest_features) + + # 5. 计算统计量 + stats = self._calculate_statistics(summaries) + + # 6. 生成图表 + chart_paths = self._generate_charts(device_id, summaries) + + # 7. 医学建议 + recommendations = self._generate_recommendations(latest_features, trend_result) + + # 8. 构建报告 + report = { + 'device_id': device_id, + 'generation_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'analysis_period': f"{summaries[-1].date} ~ {summaries[0].date}", + 'total_days': len(summaries), + + 'latest_status': { + 'date': latest.date, + 'mean_hr': latest.mean_hr, + 'mean_rmssd': latest.mean_rmssd, + 'mean_sdnn': latest.mean_sdnn, + 'lf_hf_ratio': latest.lf_hf_ratio, + 'bathroom_visits': latest.total_bathroom_visits, + 'risk_score': latest.mean_risk_score, + 'risk_level': latest.dominant_risk_level + }, + + 'ml_prediction': { + 'predicted_risk': ml_prediction, + 'model_version': config.version + } if ml_prediction else None, + + 'trend_analysis': trend_result, + + 'statistics': stats, + + 'charts': chart_paths, + + 'recommendations': recommendations + } + + # 9. 保存报告 + report_path = self._save_report(device_id, report) + report['report_path'] = report_path + + print(f"✓ 报告生成完成: {report_path}") + print(f"{'='*70}") + + return report + + def _calculate_statistics(self, summaries): + """计算统计指标""" + hr_values = [s.mean_hr for s in summaries if s.mean_hr] + rmssd_values = [s.mean_rmssd for s in summaries if s.mean_rmssd] + risk_values = [s.mean_risk_score for s in summaries if s.mean_risk_score] + + return { + 'hr': { + 'mean': float(np.mean(hr_values)), + 'std': float(np.std(hr_values)), + 'min': float(np.min(hr_values)), + 'max': float(np.max(hr_values)) + }, + 'rmssd': { + 'mean': float(np.mean(rmssd_values)), + 'std': float(np.std(rmssd_values)), + 'min': float(np.min(rmssd_values)), + 'max': float(np.max(rmssd_values)) + }, + 'risk_score': { + 'mean': float(np.mean(risk_values)), + 'std': float(np.std(risk_values)), + 'trend': '上升' if np.mean(risk_values[-7:]) > np.mean(risk_values[:7]) else '下降' + } + } + + def _generate_charts(self, device_id, summaries): + """生成可视化图表""" + chart_dir = self.config.report_config['chart_dir'] + charts = {} + + dates = [datetime.strptime(s.date, '%Y-%m-%d') for s in reversed(summaries)] + + # 1. 风险评分趋势图 + risk_scores = [s.mean_risk_score for s in reversed(summaries)] + + plt.figure(figsize=(10, 4)) + plt.plot(dates, risk_scores, 'o-', linewidth=2, markersize=4) + plt.axhline(y=0.65, color='r', linestyle='--', label='高风险阈值') + plt.axhline(y=0.40, color='orange', linestyle='--', label='中风险阈值') + plt.xlabel('日期') + plt.ylabel('风险评分') + plt.title('30天风险评分趋势') + plt.legend() + plt.grid(True, alpha=0.3) + plt.tight_layout() + + risk_chart_path = os.path.join(chart_dir, f'{device_id}_risk_trend.png') + plt.savefig(risk_chart_path, dpi=100) + plt.close() + charts['risk_trend'] = risk_chart_path + + # 2. HRV指标图 + rmssd_values = [s.mean_rmssd for s in reversed(summaries)] + + plt.figure(figsize=(10, 4)) + plt.plot(dates, rmssd_values, 's-', linewidth=2, markersize=4, color='green') + plt.xlabel('日期') + plt.ylabel('RMSSD (ms)') + plt.title('30天HRV变化(RMSSD)') + plt.grid(True, alpha=0.3) + plt.tight_layout() + + hrv_chart_path = os.path.join(chart_dir, f'{device_id}_hrv_trend.png') + plt.savefig(hrv_chart_path, dpi=100) + plt.close() + charts['hrv_trend'] = hrv_chart_path + + return charts + + def _generate_recommendations(self, features, trend_result): + """生成医学建议""" + recommendations = [] + + # 基于RMSSD + if features.get('rmssd', 0) < 20: + recommendations.append("⚠️ HRV显著降低,建议加强心血管监测") + elif features.get('rmssd', 0) < 30: + recommendations.append("💡 HRV偏低,建议适度有氧运动改善") + + # 基于心率 + if features.get('mean_hr', 0) > 85: + recommendations.append("⚠️ 静息心率偏高,建议咨询心血管专科医生") + + # 基于LF/HF比 + if features.get('lf_hf_ratio', 0) > 2.5: + recommendations.append("💡 交感神经活性较高,建议放松训练(冥想、深呼吸)") + + # 基于起夜频率 + if features.get('bathroom_freq', 0) >= 3: + recommendations.append("⚠️ 夜尿频繁,建议泌尿科检查") + + # 基于趋势 + if trend_result and trend_result['risk_trend'] == 'increasing': + recommendations.append("📈 风险评分呈上升趋势,建议增加监测频率") + + if len(recommendations) == 0: + recommendations.append("✅ 各项指标正常,保持健康生活方式") + + return recommendations + + def _save_report(self, device_id, report): + """保存报告为JSON""" + report_dir = self.config.report_config['report_dir'] + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + filename = f"report_{device_id}_{timestamp}.json" + filepath = os.path.join(report_dir, filename) + + with open(filepath, 'w', encoding='utf-8') as f: + json.dump(report, f, indent=2, ensure_ascii=False) + + return filepath + +# ==================== 6. Flask API层 ==================== + +app = Flask(__name__) + +# 全局组件 +db_manager = None +ml_trainer = None +trend_analyzer = None +report_generator = None + +def authenticate_request(): + """API认证""" + api_key = request.headers.get('Authorization') + if api_key: + api_key = api_key.replace('Bearer ', '') + + if api_key not in config.api_config['api_keys']: + return False + return True + +@app.route('/api/upload', methods=['POST']) +def upload_data(): + """接收边缘端上传的数据""" + if not authenticate_request(): + return jsonify({'error': 'Unauthorized'}), 401 + + try: + data = request.get_json() + + device_id = data.get('device_id', 'unknown') + events = data.get('events', []) + + # 存储到数据库 + for event in events: + db_manager.add_sensor_data(device_id, event) + + print(f" ☁️ [API] 接收上传: 设备={device_id}, 事件数={len(events)}") + + return jsonify({ + 'status': 'success', + 'received_events': len(events), + 'device_id': device_id + }), 200 + + except Exception as e: + return jsonify({'error': str(e)}), 500 + +@app.route('/api/report/', methods=['GET']) +def get_report(device_id): + """获取设备诊断报告""" + if not authenticate_request(): + return jsonify({'error': 'Unauthorized'}), 401 + + try: + report = report_generator.generate_comprehensive_report( + device_id, db_manager, ml_trainer, trend_analyzer + ) + + if report is None: + return jsonify({'error': 'No data available'}), 404 + + return jsonify(report), 200 + + except Exception as e: + return jsonify({'error': str(e)}), 500 + +@app.route('/api/optimize/', methods=['POST']) +def optimize_parameters(device_id): + """优化边缘端参数""" + if not authenticate_request(): + return jsonify({'error': 'Unauthorized'}), 401 + + try: + # 获取设备历史数据 + summaries = db_manager.get_nightly_summaries(device_id, days=30) + + # 基于数据优化阈值 + rmssd_values = [s.mean_rmssd for s in summaries if s.mean_rmssd] + hr_values = [s.mean_hr for s in summaries if s.mean_hr] + + optimized_params = { + 'rmssd_baseline': float(np.median(rmssd_values)), + 'hr_baseline': float(np.median(hr_values)), + 'quality_threshold': 0.5, # 可以根据数据质量调整 + 'upload_interval': 300 + } + + print(f" 🔧 [API] 参数优化: 设备={device_id}") + + return jsonify({ + 'status': 'success', + 'optimized_parameters': optimized_params + }), 200 + + except Exception as e: + return jsonify({'error': str(e)}), 500 + +@app.route('/api/health', methods=['GET']) +def health_check(): + """健康检查""" + return jsonify({ + 'status': 'healthy', + 'version': config.version, + 'timestamp': datetime.now().isoformat() + }), 200 + +# ==================== 7. 主程序 ==================== + +def initialize_cloud_system(): + """初始化云端系统""" + global db_manager, ml_trainer, trend_analyzer, report_generator + + print(f"{'='*70}") + print(f"☁️ 高血压风险评估系统 - 云端模块") + print(f"{'='*70}") + print(f"版本: {config.version}") + print(f"API端口: {config.api_config['port']}") + print(f"{'='*70}") + + # 初始化组件 + print("📦 初始化组件...") + + db_manager = DatabaseManager(config) + ml_trainer = MLModelTrainer(config) + trend_analyzer = TrendAnalyzer(config) + report_generator = ReportGenerator(config) + + # 尝试加载已训练模型 + ml_trainer.load_model() + + print(f"✅ 云端系统初始化完成!") + +def run_demo(): + """运行演示(无需边缘端)""" + print(f"{'='*70}") + print(f"🎬 云端系统演示模式") + print(f"{'='*70}") + + initialize_cloud_system() + + # 模拟边缘端上传数据 + print("📤 模拟边缘端数据上传...") + + device_id = "demo_device_001" + + for day in range(14): + date = (datetime.now() - timedelta(days=14-day)).strftime('%Y-%m-%d') + + # 模拟夜间汇总数据 + summary_data = { + 'device_id': device_id, + 'date': date, + 'mean_hr': 70 + np.random.randn() * 3, + 'min_hr': 60 + np.random.randn() * 2, + 'max_hr': 85 + np.random.randn() * 3, + 'mean_rmssd': 35 + np.random.randn() * 5, + 'mean_sdnn': 50 + np.random.randn() * 8, + 'mean_pnn50': 0.12 + np.random.randn() * 0.03, + 'vlf_power': 300 + np.random.randn() * 50, + 'lf_power': 500 + np.random.randn() * 80, + 'hf_power': 400 + np.random.randn() * 60, + 'lf_hf_ratio': 1.2 + np.random.randn() * 0.3, + 'total_bathroom_visits': int(np.random.choice([1, 2, 3], p=[0.5, 0.3, 0.2])), + 'total_bathroom_duration': 180 + np.random.randn() * 40, + 'avg_bathroom_duration': 90 + np.random.randn() * 20, + 'first_bathroom_hour': 2.5 + np.random.randn() * 1.0, + 'mean_risk_score': 0.35 + day * 0.015 + np.random.randn() * 0.05, # 逐渐上升 + 'max_risk_score': 0.50 + day * 0.02, + 'dominant_risk_level': 'MEDIUM' if day > 7 else 'LOW', + 'valid_data_hours': 7.5 + np.random.randn() * 0.5, + 'total_data_points': 900 + int(np.random.randn() * 50) + } + + db_manager.add_nightly_summary(summary_data) + + print(f"✓ 已添加14天模拟数据") + + # 生成报告 + report = report_generator.generate_comprehensive_report( + device_id, db_manager, ml_trainer, trend_analyzer + ) + + if report: + print("📊 报告摘要:") + print(f" 设备ID: {report['device_id']}") + print(f" 分析周期: {report['analysis_period']}") + print(f" 最新风险评分: {report['latest_status']['risk_score']:.3f}") + print(f" 风险等级: {report['latest_status']['risk_level']}") + print(f" 趋势: {report['trend_analysis']['risk_trend']}") + print(f" 风险变化: {report['trend_analysis']['risk_change_percent']:.1f}%") + print(f" 医学建议:") + for rec in report['recommendations']: + print(f" - {rec}") + print(f" 报告已保存: {report['report_path']}") + +def main(): + """主函数""" + import sys + + if len(sys.argv) > 1 and sys.argv[1] == 'demo': + # 演示模式 + run_demo() + else: + # API服务模式 + initialize_cloud_system() + + print(f"🌐 启动API服务...") + print(f"监听地址: http://{config.api_config['host']}:{config.api_config['port']}") + print(f"可用端点:") + print(f" POST /api/upload - 接收边缘端数据") + print(f" GET /api/report/ - 获取诊断报告") + print(f" POST /api/optimize/ - 优化参数") + print(f" GET /api/health - 健康检查") + print(f"按Ctrl+C停止服务") + + app.run( + host=config.api_config['host'], + port=config.api_config['port'], + debug=config.api_config['debug'] + ) + +if __name__ == "__main__": + main() diff --git a/0112-2026 - Test_Package/edge_hypertension_system- 边缘端代码.py b/0112-2026 - Test_Package/edge_hypertension_system- 边缘端代码.py new file mode 100644 index 0000000..daaa4fa --- /dev/null +++ b/0112-2026 - Test_Package/edge_hypertension_system- 边缘端代码.py @@ -0,0 +1,1255 @@ +# -*- coding: utf-8 -*- +""" +高血压风险评估系统 - 边缘端实时处理模块(RK3588优化版) +Hypertension Risk Assessment - Edge Computing Module (Optimized for RK3588) + +Created: 2026-01-09 +Author: KKRobot Healthcare AI Team +Version: v1.0_edge_realtime + +📌 部署架构:混合部署 +- 边缘端(本代码):RK3588设备,实时预警(<1秒响应) +- 云端(配套):深度分析,长期趋势预测 + +📊 传感器配置: +- 床上:压电陶瓷BCG传感器(250Hz采样) +- 厕所:毫米波雷达(20Hz采样) + +🎯 核心功能: +1. 实时BCG心率/HRV提取(30秒滑动窗口) +2. 实时雷达起夜检测 +3. 硬规则快速风险评估 +4. 即时预警触发(高风险立即推送) +5. 匿名化数据定期上传云端 + +⚡ RK3588优化: +- 轻量级依赖(numpy/scipy only) +- 流式处理(内存占用<500MB) +- ARM NEON vectorization +- 无深度学习模型 + +📦 运行要求: +pip install numpy scipy +python edge_hypertension_system.py + +💾 存储需求: +- 运行内存:<500MB +- 磁盘缓存:<100MB(7天基线数据) +""" + +import numpy as np +from scipy import signal +from scipy.interpolate import interp1d +import time +import json +import os +from collections import deque +from datetime import datetime, timedelta +import warnings +warnings.filterwarnings("ignore") + +# ==================== 0. 边缘端核心配置 ==================== + +class EdgeConfig: + """边缘端配置(RK3588优化)""" + + def __init__(self): + # 版本标识 + self.version = "v1.0_edge_realtime" + self.device_id = "edge_rk3588_001" # 设备唯一标识 + + # ========== BCG传感器配置 ========== + self.bcg_config = { + 'sampling_rate': 250, # 采样率(Hz) + 'window_size': 30, # 处理窗口(秒) + 'slide_step': 5, # 滑动步长(秒) + 'hr_range': (40, 120), # 心率范围(bpm) + 'rr_range': (8, 25), # 呼吸率范围(/min) + 'quality_threshold': 0.5, # 质量阈值 + '离bed_threshold': 0.3 # 离床检测阈值 + } + + # ========== 雷达传感器配置 ========== + self.radar_config = { + 'sampling_rate': 20, # 采样率(Hz) + 'presence_threshold': 0.6, # 存在检测阈值 + 'bathroom_zone': { # 厕所区域(3D边界框) + 'x_range': (0.5, 2.5), # 米 + 'y_range': (0.5, 2.0), # 米 + 'z_range': (0.3, 1.8) # 米 + }, + 'min_visit_duration': 30, # 最短起夜时长(秒) + 'event_merge_gap': 60, # 事件合并间隔(秒) + 'motion_threshold': 0.3 # 运动检测阈值 + } + + # ========== 实时处理配置 ========== + self.realtime_config = { + 'buffer_size': 7500, # BCG缓冲区大小(30秒@250Hz) + 'processing_interval': 5, # 处理间隔(秒) + 'max_latency': 0.5, # 最大延迟(秒) + 'enable_logging': True, # 是否打印日志 + 'log_interval': 60 # 日志打印间隔(秒) + } + + # ========== 风险评估配置(硬规则) ========== + self.risk_config = { + # RMSSD阈值(HRV核心指标) + 'rmssd_high_risk': 20, # <20ms高风险 + 'rmssd_medium_risk': 30, # 20-30ms中风险 + 'rmssd_low_risk': 40, # >40ms低风险 + + # 心率阈值 + 'hr_high_risk': 85, # >85bpm高风险 + 'hr_medium_risk': 75, # 75-85bpm中风险 + + # 起夜频率阈值(每晚) + 'bathroom_high_risk': 3, # ≥3次高风险 + 'bathroom_medium_risk': 2, # 2次中风险 + + # 起夜心率上升阈值 + 'hr_surge_high_risk': 20, # >20bpm高风险 + 'hr_surge_medium_risk': 12, # 12-20bpm中风险 + + # 权重配置 + 'weights': { + 'rmssd': 0.35, + 'sdnn': 0.15, + 'hr_level': 0.20, + 'hr_surge': 0.10, + 'bathroom_freq': 0.20 + } + } + + # ========== 预警配置 ========== + self.alert_config = { + 'high_risk_threshold': 0.65, # 高风险阈值 + 'medium_risk_threshold': 0.40, # 中风险阈值 + 'alert_cooldown': 300, # 预警冷却时间(秒) + 'enable_sound': True, # 是否声音提醒 + 'enable_push': True # 是否推送通知 + } + + # ========== 云端通信配置 ========== + self.cloud_config = { + 'enable_upload': True, # 是否上传云端 + 'upload_interval': 300, # 上传间隔(秒,5分钟) + 'upload_url': 'https://api.example.com/upload', + 'api_key': 'your_api_key_here', + 'anonymize': True, # 是否匿名化 + 'compress': True # 是否压缩 + } + + # ========== 本地缓存配置 ========== + self.cache_config = { + 'baseline_file': 'baseline_cache.json', + 'event_log_file': 'event_log.jsonl', + 'max_baseline_nights': 7, # 基线窗口(天) + 'max_event_log_size': 1000, # 最大事件数 + 'cache_dir': './edge_cache' + } + + # 创建缓存目录 + os.makedirs(self.cache_config['cache_dir'], exist_ok=True) + +# 全局配置实例 +config = EdgeConfig() + +# ==================== 1. 数据结构层 ==================== + +class RingBuffer: + """高效环形缓冲区(节省内存)""" + + def __init__(self, size): + self.size = size + self.buffer = np.zeros(size, dtype=np.float32) + self.index = 0 + self.is_filled = False + + def append(self, value): + """追加单个值""" + self.buffer[self.index] = value + self.index = (self.index + 1) % self.size + if self.index == 0: + self.is_filled = True + + def extend(self, values): + """批量追加""" + for v in values: + self.append(v) + + def get_data(self): + """获取完整数据(按时间顺序)""" + if not self.is_filled: + return self.buffer[:self.index] + else: + return np.concatenate([ + self.buffer[self.index:], + self.buffer[:self.index] + ]) + + def is_full(self): + """是否已填满""" + return self.is_filled or self.index >= self.size + + def clear(self): + """清空缓冲区""" + self.buffer.fill(0) + self.index = 0 + self.is_filled = False + +class SensorEvent: + """传感器事件(轻量级数据结构)""" + + def __init__(self, timestamp, event_type, data, quality=1.0): + self.timestamp = timestamp # Unix时间戳(秒) + self.event_type = event_type # 'BCG_HR', 'BCG_HRV', 'RADAR_VISIT', 'ALERT' + self.data = data # 事件数据(dict) + self.quality = quality # 质量分数(0-1) + + def to_dict(self): + """转换为字典(用于序列化)""" + return { + 'timestamp': self.timestamp, + 'event_type': self.event_type, + 'data': self.data, + 'quality': self.quality, + 'datetime': datetime.fromtimestamp(self.timestamp).strftime('%Y-%m-%d %H:%M:%S') + } + + def anonymize(self): + """匿名化处理(去除敏感信息)""" + anon_data = self.data.copy() + # 移除可能的设备标识 + anon_data.pop('device_id', None) + anon_data.pop('location', None) + return SensorEvent(self.timestamp, self.event_type, anon_data, self.quality) + +class BaselineCache: + """个体基线缓存(7天滚动窗口)""" + + def __init__(self, cache_file): + self.cache_file = cache_file + self.baseline = self.load() + + def load(self): + """加载缓存""" + if os.path.exists(self.cache_file): + try: + with open(self.cache_file, 'r') as f: + return json.load(f) + except: + return self._default_baseline() + return self._default_baseline() + + def save(self): + """保存缓存""" + with open(self.cache_file, 'w') as f: + json.dump(self.baseline, f, indent=2) + + def update(self, night_features): + """更新基线(滚动窗口)""" + if 'history' not in self.baseline: + self.baseline['history'] = [] + + # 添加新数据 + self.baseline['history'].append({ + 'date': datetime.now().strftime('%Y-%m-%d'), + 'features': night_features + }) + + # 保留最近7天 + if len(self.baseline['history']) > config.cache_config['max_baseline_nights']: + self.baseline['history'] = self.baseline['history'][-7:] + + # 重新计算基线(中位数) + if len(self.baseline['history']) >= 3: + self._recalculate_baseline() + + self.save() + + def _recalculate_baseline(self): + """重新计算基线(使用中位数)""" + history = self.baseline['history'] + + rmssd_values = [h['features'].get('rmssd', 0) for h in history if h['features'].get('rmssd', 0) > 0] + hr_values = [h['features'].get('mean_hr', 0) for h in history if h['features'].get('mean_hr', 0) > 0] + bathroom_values = [h['features'].get('bathroom_freq', 0) for h in history] + + if rmssd_values: + self.baseline['rmssd'] = float(np.median(rmssd_values)) + if hr_values: + self.baseline['mean_hr'] = float(np.median(hr_values)) + if bathroom_values: + self.baseline['bathroom_freq'] = float(np.median(bathroom_values)) + + def get(self, key, default=None): + """获取基线值""" + return self.baseline.get(key, default) + + def _default_baseline(self): + """默认基线(群体均值)""" + return { + 'rmssd': 35.0, # 健康老年人典型值 + 'sdnn': 50.0, + 'mean_hr': 70.0, + 'bathroom_freq': 1.5, + 'history': [] + } + +# ==================== 2. BCG信号处理层 ==================== + +class BCGProcessor: + """BCG信号实时处理器(ARM优化)""" + + def __init__(self, config): + self.config = config + self.fs = config.bcg_config['sampling_rate'] + self.hr_range = config.bcg_config['hr_range'] + self.rr_range = config.bcg_config['rr_range'] + + # 预先设计滤波器(避免重复计算) + self.hr_filter = self._design_hr_filter() + self.rr_filter = self._design_rr_filter() + + def _design_hr_filter(self): + """设计心率带通滤波器(0.8-3.0 Hz)""" + nyq = self.fs / 2 + low = 0.8 / nyq + high = 3.0 / nyq + b, a = signal.butter(4, [low, high], btype='band') + return b, a + + def _design_rr_filter(self): + """设计呼吸率带通滤波器(0.1-0.6 Hz)""" + nyq = self.fs / 2 + low = 0.1 / nyq + high = 0.6 / nyq + b, a = signal.butter(4, [low, high], btype='band') + return b, a + + def assess_signal_quality(self, bcg_signal): + """ + 快速信号质量评估(<50ms) + + 返回:质量分数(0-1) + """ + try: + # 1. SNR评估(40%权重) + signal_power = np.mean(bcg_signal ** 2) + noise_estimate = np.var(np.diff(bcg_signal)) + snr = signal_power / (noise_estimate + 1e-6) + snr_score = np.clip(snr / 10.0, 0, 1) + + # 2. 频谱集中度(30%权重) + freqs, psd = signal.welch(bcg_signal, fs=self.fs, nperseg=min(512, len(bcg_signal))) + hr_band_mask = (freqs >= 0.8) & (freqs <= 3.0) + hr_band_power = np.sum(psd[hr_band_mask]) + total_power = np.sum(psd) + spectrum_score = hr_band_power / (total_power + 1e-6) + + # 3. 周期性评估(30%权重) + autocorr = np.correlate(bcg_signal - np.mean(bcg_signal), + bcg_signal - np.mean(bcg_signal), + mode='same') + autocorr = autocorr[len(autocorr)//2:] + peak_pos = np.argmax(autocorr[int(0.3*self.fs):int(1.5*self.fs)]) + int(0.3*self.fs) + periodicity_score = autocorr[peak_pos] / (autocorr[0] + 1e-6) + periodicity_score = np.clip(periodicity_score, 0, 1) + + # 综合评分 + quality = (0.40 * snr_score + + 0.30 * spectrum_score + + 0.30 * periodicity_score) + + return float(np.clip(quality, 0, 1)) + + except Exception as e: + return 0.0 + + def extract_heart_rate(self, bcg_signal): + """ + 实时心率提取(<100ms) + + 返回:(心率bpm, 置信度, RR间期数组ms) + """ + try: + # 1. 带通滤波 + filtered = signal.filtfilt(self.hr_filter[0], self.hr_filter[1], bcg_signal) + + # 2. 峰值检测(优化参数) + peaks, properties = signal.find_peaks( + filtered, + distance=int(0.4 * self.fs), # 最小峰间距(150bpm对应0.4秒) + prominence=0.3 * np.std(filtered) + ) + + if len(peaks) < 5: + return None, 0.0, None + + # 3. 计算RR间期 + rr_intervals = np.diff(peaks) / self.fs * 1000 # 转换为ms + + # 4. 异常值过滤(MAD方法,快速) + median_rr = np.median(rr_intervals) + mad = np.median(np.abs(rr_intervals - median_rr)) + valid_mask = np.abs(rr_intervals - median_rr) < 3 * mad + + if np.sum(valid_mask) < 5: + return None, 0.0, None + + clean_rr = rr_intervals[valid_mask] + + # 5. 计算心率 + mean_rr_ms = np.mean(clean_rr) + heart_rate = 60000.0 / mean_rr_ms + + # 6. 合理性检查 + if not (self.hr_range[0] <= heart_rate <= self.hr_range[1]): + return None, 0.0, None + + # 7. 置信度评估(基于变异系数) + cv = np.std(clean_rr) / np.mean(clean_rr) + confidence = np.clip(1.0 - cv, 0, 1) + + return float(heart_rate), float(confidence), clean_rr + + except Exception as e: + return None, 0.0, None + + def calculate_hrv_features(self, rr_intervals): + """ + 快速HRV特征计算(<20ms) + + 返回:{'rmssd': float, 'sdnn': float, 'pnn50': float} + """ + try: + if len(rr_intervals) < 10: + return None + + # 时域指标 + sdnn = np.std(rr_intervals) + + diff_rr = np.diff(rr_intervals) + rmssd = np.sqrt(np.mean(diff_rr ** 2)) + + nn50 = np.sum(np.abs(diff_rr) > 50) + pnn50 = nn50 / len(diff_rr) if len(diff_rr) > 0 else 0 + + return { + 'sdnn': float(sdnn), + 'rmssd': float(rmssd), + 'pnn50': float(pnn50) + } + + except Exception as e: + return None + + def extract_respiratory_rate(self, bcg_signal): + """ + 实时呼吸率提取(<80ms) + + 返回:(呼吸率/min, 置信度) + """ + try: + # 1. 带通滤波 + filtered = signal.filtfilt(self.rr_filter[0], self.rr_filter[1], bcg_signal) + + # 2. Welch功率谱估计 + freqs, psd = signal.welch(filtered, fs=self.fs, + nperseg=min(512, len(filtered)), + noverlap=256) + + # 3. 限制在呼吸频率范围 + rr_band_mask = (freqs >= 0.1) & (freqs <= 0.6) + rr_freqs = freqs[rr_band_mask] + rr_psd = psd[rr_band_mask] + + if len(rr_psd) == 0: + return None, 0.0 + + # 4. 找峰值 + peak_idx = np.argmax(rr_psd) + peak_freq = rr_freqs[peak_idx] + respiratory_rate = peak_freq * 60 # 转换为/min + + # 5. 合理性检查 + if not (self.rr_range[0] <= respiratory_rate <= self.rr_range[1]): + return None, 0.0 + + # 6. 置信度评估(峰值突出度) + peak_power = rr_psd[peak_idx] + mean_power = np.mean(rr_psd) + confidence = np.clip(peak_power / (mean_power * 3), 0, 1) + + return float(respiratory_rate), float(confidence) + + except Exception as e: + return None, 0.0 + +# ==================== 3. 雷达信号处理层 ==================== + +class RadarProcessor: + """雷达信号实时处理器""" + + def __init__(self, config): + self.config = config + self.bathroom_zone = config.radar_config['bathroom_zone'] + self.presence_history = deque(maxlen=3) # 时序平滑(3帧) + self.current_visit_start = None + + def detect_presence_in_bathroom(self, point_cloud): + """ + 实时存在检测(<30ms) + + 输入:point_cloud = [(x, y, z, velocity), ...] + 返回:(是否存在, 置信度) + """ + try: + if len(point_cloud) == 0: + self.presence_history.append(False) + return False, 0.0 + + # 1. 静态杂波去除 + dynamic_points = [p for p in point_cloud if abs(p[3]) > 0.05] + + # 2. 空间过滤(在厕所区域内) + bathroom_points = [] + for p in dynamic_points: + x, y, z = p[0], p[1], p[2] + if (self.bathroom_zone['x_range'][0] <= x <= self.bathroom_zone['x_range'][1] and + self.bathroom_zone['y_range'][0] <= y <= self.bathroom_zone['y_range'][1] and + self.bathroom_zone['z_range'][0] <= z <= self.bathroom_zone['z_range'][1]): + bathroom_points.append(p) + + # 3. 密度判断 + point_density = len(bathroom_points) / (len(point_cloud) + 1e-6) + is_present = point_density > self.config.radar_config['presence_threshold'] + + # 4. 时序平滑(连续3帧确认) + self.presence_history.append(is_present) + stable_presence = sum(self.presence_history) >= 2 # 至少2/3帧检测到 + + confidence = point_density if stable_presence else 0.0 + + return stable_presence, float(confidence) + + except Exception as e: + return False, 0.0 + + def detect_bathroom_visit(self, timestamp, is_present): + """ + 起夜事件检测(状态机) + + 返回:SensorEvent对象(如果访问结束)或 None + """ + min_duration = self.config.radar_config['min_visit_duration'] + + if is_present and self.current_visit_start is None: + # 开始起夜 + self.current_visit_start = timestamp + return None + + elif not is_present and self.current_visit_start is not None: + # 结束起夜 + duration = timestamp - self.current_visit_start + + if duration >= min_duration: + # 有效起夜 + event = SensorEvent( + timestamp=self.current_visit_start, + event_type='BATHROOM_VISIT', + data={ + 'start': self.current_visit_start, + 'end': timestamp, + 'duration': duration + }, + quality=1.0 + ) + self.current_visit_start = None + return event + else: + # 太短,忽略 + self.current_visit_start = None + return None + + return None + + def extract_vital_signs_from_radar(self, phase_signal, fs=20): + """ + 从雷达相位信号提取生理信号(起夜期间) + + 返回:{'hr': float, 'rr': float} + """ + try: + if len(phase_signal) < fs * 10: # 至少10秒数据 + return None + + # 心率提取(FFT) + freqs, psd = signal.welch(phase_signal, fs=fs, nperseg=min(256, len(phase_signal))) + hr_mask = (freqs >= 0.8) & (freqs <= 2.5) + hr_freqs = freqs[hr_mask] + hr_psd = psd[hr_mask] + + hr_peak_idx = np.argmax(hr_psd) + hr = hr_freqs[hr_peak_idx] * 60 + + # 呼吸率提取 + rr_mask = (freqs >= 0.15) & (freqs <= 0.5) + rr_freqs = freqs[rr_mask] + rr_psd = psd[rr_mask] + + rr_peak_idx = np.argmax(rr_psd) + rr = rr_freqs[rr_peak_idx] * 60 + + # 合理性检查 + if 40 <= hr <= 130 and 6 <= rr <= 30: + return {'hr': float(hr), 'rr': float(rr)} + else: + return None + + except Exception as e: + return None + +# ==================== 4. 实时风险评估引擎 ==================== + +class RealtimeRiskEngine: + """实时风险评估引擎(硬规则,<10ms)""" + + def __init__(self, config, baseline_cache): + self.config = config + self.baseline = baseline_cache + self.risk_cfg = config.risk_config + + def calculate_risk_score(self, features): + """ + 快速风险评分(硬规则) + + 输入:features = { + 'rmssd': float, + 'sdnn': float, + 'mean_hr': float, + 'hr_surge': float, # 起夜时心率上升 + 'bathroom_freq_today': int + } + + 返回:(总风险分数0-1, 风险等级, 分项风险) + """ + weights = self.risk_cfg['weights'] + + # 1. RMSSD风险(35%权重) + rmssd = features.get('rmssd', 0) + rmssd_baseline = self.baseline.get('rmssd', 35.0) + + if rmssd < self.risk_cfg['rmssd_high_risk']: + rmssd_risk = 1.0 + elif rmssd < self.risk_cfg['rmssd_medium_risk']: + rmssd_risk = 0.6 + elif rmssd < rmssd_baseline: + rmssd_risk = 0.4 + else: + rmssd_risk = 0.2 + + # 2. SDNN风险(15%权重) + sdnn = features.get('sdnn', 0) + if sdnn < 30: + sdnn_risk = 1.0 + elif sdnn < 45: + sdnn_risk = 0.6 + else: + sdnn_risk = 0.2 + + # 3. 心率水平风险(20%权重) + hr = features.get('mean_hr', 70) + hr_baseline = self.baseline.get('mean_hr', 70.0) + + if hr > self.risk_cfg['hr_high_risk']: + hr_risk = 1.0 + elif hr > self.risk_cfg['hr_medium_risk']: + hr_risk = 0.6 + elif hr > hr_baseline + 5: + hr_risk = 0.4 + else: + hr_risk = 0.2 + + # 4. 起夜心率上升风险(10%权重) + hr_surge = features.get('hr_surge', 0) + if hr_surge > self.risk_cfg['hr_surge_high_risk']: + hr_surge_risk = 1.0 + elif hr_surge > self.risk_cfg['hr_surge_medium_risk']: + hr_surge_risk = 0.6 + else: + hr_surge_risk = 0.3 + + # 5. 起夜频率风险(20%权重) + bathroom_freq = features.get('bathroom_freq_today', 0) + bathroom_baseline = self.baseline.get('bathroom_freq', 1.5) + + if bathroom_freq >= self.risk_cfg['bathroom_high_risk']: + bathroom_risk = 1.0 + elif bathroom_freq >= self.risk_cfg['bathroom_medium_risk']: + bathroom_risk = 0.6 + elif bathroom_freq > bathroom_baseline: + bathroom_risk = 0.4 + else: + bathroom_risk = 0.2 + + # 6. 加权综合 + total_risk = ( + weights['rmssd'] * rmssd_risk + + weights['sdnn'] * sdnn_risk + + weights['hr_level'] * hr_risk + + weights['hr_surge'] * hr_surge_risk + + weights['bathroom_freq'] * bathroom_risk + ) + + # 7. 风险等级分类 + if total_risk >= self.config.alert_config['high_risk_threshold']: + risk_level = 'HIGH' + elif total_risk >= self.config.alert_config['medium_risk_threshold']: + risk_level = 'MEDIUM' + else: + risk_level = 'LOW' + + # 8. 分项风险 + breakdown = { + 'rmssd_risk': rmssd_risk, + 'sdnn_risk': sdnn_risk, + 'hr_risk': hr_risk, + 'hr_surge_risk': hr_surge_risk, + 'bathroom_risk': bathroom_risk + } + + return float(total_risk), risk_level, breakdown + + def should_trigger_alert(self, risk_score, risk_level, last_alert_time): + """ + 判断是否触发预警 + + 返回:(是否预警, 预警消息) + """ + # 冷却时间检查 + if last_alert_time is not None: + cooldown = self.config.alert_config['alert_cooldown'] + if time.time() - last_alert_time < cooldown: + return False, None + + # 只对高风险预警 + if risk_level == 'HIGH': + message = f"⚠️ 高血压风险预警!风险评分: {risk_score:.2f}" + return True, message + + return False, None + +# ==================== 5. 云端通信层 ==================== + +class CloudUploader: + """云端数据上传器(匿名化)""" + + def __init__(self, config): + self.config = config + self.upload_buffer = [] + self.last_upload_time = 0 + + def add_to_buffer(self, event): + """添加事件到上传缓冲区""" + if not self.config.cloud_config['enable_upload']: + return + + # 匿名化 + if self.config.cloud_config['anonymize']: + event = event.anonymize() + + self.upload_buffer.append(event.to_dict()) + + def should_upload(self): + """判断是否应该上传""" + if not self.config.cloud_config['enable_upload']: + return False + + interval = self.config.cloud_config['upload_interval'] + return (time.time() - self.last_upload_time) >= interval + + def upload(self): + """ + 上传数据到云端 + + 返回:是否成功 + """ + if len(self.upload_buffer) == 0: + return True + + try: + # 构建上传数据包 + payload = { + 'device_id': config.device_id if not self.config.cloud_config['anonymize'] else 'anonymous', + 'timestamp': time.time(), + 'events': self.upload_buffer, + 'version': config.version + } + + # 这里应该调用实际的HTTP上传接口 + # import requests + # response = requests.post( + # self.config.cloud_config['upload_url'], + # json=payload, + # headers={'Authorization': f"Bearer {self.config.cloud_config['api_key']}"} + # ) + # success = response.status_code == 200 + + # 模拟上传成功 + print(f" ☁️ [Cloud Upload] Uploaded {len(self.upload_buffer)} events to cloud") + + # 清空缓冲区 + self.upload_buffer = [] + self.last_upload_time = time.time() + + return True + + except Exception as e: + print(f" ❌ [Cloud Upload] Failed: {e}") + return False + +# ==================== 6. 实时处理主控制器 ==================== + +class RealtimeController: + """实时处理主控制器""" + + def __init__(self, config): + self.config = config + + # 初始化组件 + self.bcg_processor = BCGProcessor(config) + self.radar_processor = RadarProcessor(config) + self.baseline_cache = BaselineCache( + os.path.join(config.cache_config['cache_dir'], config.cache_config['baseline_file']) + ) + self.risk_engine = RealtimeRiskEngine(config, self.baseline_cache) + self.cloud_uploader = CloudUploader(config) + + # 数据缓冲区 + buffer_size = config.bcg_config['window_size'] * config.bcg_config['sampling_rate'] + self.bcg_buffer = RingBuffer(buffer_size) + + # 状态变量 + self.last_process_time = 0 + self.last_alert_time = None + self.last_log_time = 0 + self.bathroom_visits_today = [] + self.night_start_time = None + + # 统计变量 + self.stats = { + 'total_processed': 0, + 'total_alerts': 0, + 'avg_latency': 0.0 + } + + print(f"{'='*70}") + print(f"🚀 边缘端实时处理系统初始化成功") + print(f"{'='*70}") + print(f"版本: {config.version}") + print(f"设备: {config.device_id}") + print(f"BCG采样率: {config.bcg_config['sampling_rate']} Hz") + print(f"处理窗口: {config.bcg_config['window_size']} 秒") + print(f"云端上传: {'启用' if config.cloud_config['enable_upload'] else '禁用'}") + print(f"{'='*70}") + + def process_bcg_sample(self, sample, timestamp): + """ + 处理单个BCG样本点(流式输入) + + 输入: + - sample: float(BCG信号值) + - timestamp: float(Unix时间戳) + """ + # 添加到缓冲区 + self.bcg_buffer.append(sample) + + # 检查是否应该处理 + interval = self.config.realtime_config['processing_interval'] + if time.time() - self.last_process_time < interval: + return + + # 检查缓冲区是否已满 + if not self.bcg_buffer.is_full(): + return + + # 执行处理 + self._process_bcg_window(timestamp) + self.last_process_time = time.time() + + def process_radar_frame(self, point_cloud, timestamp): + """ + 处理雷达点云帧 + + 输入: + - point_cloud: [(x, y, z, velocity), ...] + - timestamp: float + """ + # 存在检测 + is_present, confidence = self.radar_processor.detect_presence_in_bathroom(point_cloud) + + # 起夜事件检测 + visit_event = self.radar_processor.detect_bathroom_visit(timestamp, is_present) + + if visit_event is not None: + # 记录起夜事件 + self.bathroom_visits_today.append(visit_event) + + # 上传到云端 + self.cloud_uploader.add_to_buffer(visit_event) + + # 打印日志 + duration = visit_event.data['duration'] + print(f" 🚽 [Bathroom Visit] Duration: {duration:.0f}s | Total today: {len(self.bathroom_visits_today)}") + + # 定期上传 + if self.cloud_uploader.should_upload(): + self.cloud_uploader.upload() + + def _process_bcg_window(self, timestamp): + """处理BCG窗口数据""" + start_time = time.time() + + # 获取窗口数据 + bcg_signal = self.bcg_buffer.get_data() + + # 1. 质量评估 + quality = self.bcg_processor.assess_signal_quality(bcg_signal) + + # 检查是否离床 + if quality < self.config.bcg_config['离bed_threshold']: + if self.config.realtime_config['enable_logging']: + if time.time() - self.last_log_time > 60: + print(f" ⚠️ [BCG Quality] Low quality (离床): {quality:.2f}") + self.last_log_time = time.time() + return + + # 2. 心率提取 + hr, hr_conf, rr_intervals = self.bcg_processor.extract_heart_rate(bcg_signal) + + if hr is None: + return + + # 3. HRV特征 + hrv_features = None + if rr_intervals is not None and len(rr_intervals) >= 10: + hrv_features = self.bcg_processor.calculate_hrv_features(rr_intervals) + + # 4. 呼吸率提取 + rr, rr_conf = self.bcg_processor.extract_respiratory_rate(bcg_signal) + + # 5. 风险评估 + if hrv_features is not None: + features = { + 'rmssd': hrv_features['rmssd'], + 'sdnn': hrv_features['sdnn'], + 'mean_hr': hr, + 'hr_surge': 0, # 暂时没有起夜心率数据 + 'bathroom_freq_today': len(self.bathroom_visits_today) + } + + risk_score, risk_level, breakdown = self.risk_engine.calculate_risk_score(features) + + # 6. 预警判断 + should_alert, alert_msg = self.risk_engine.should_trigger_alert( + risk_score, risk_level, self.last_alert_time + ) + + if should_alert: + self._trigger_alert(alert_msg, risk_score, features) + self.last_alert_time = time.time() + + # 7. 创建事件并上传 + event = SensorEvent( + timestamp=timestamp, + event_type='BCG_REALTIME', + data={ + 'hr': hr, + 'hr_confidence': hr_conf, + 'rr': rr if rr else 0, + 'rmssd': hrv_features['rmssd'], + 'sdnn': hrv_features['sdnn'], + 'pnn50': hrv_features['pnn50'], + 'risk_score': risk_score, + 'risk_level': risk_level, + 'quality': quality + }, + quality=quality + ) + + self.cloud_uploader.add_to_buffer(event) + + # 8. 打印日志 + if self.config.realtime_config['enable_logging']: + if time.time() - self.last_log_time > self.config.realtime_config['log_interval']: + latency = (time.time() - start_time) * 1000 + self._print_status(hr, hrv_features, risk_score, risk_level, quality, latency) + self.last_log_time = time.time() + + # 更新统计 + self.stats['total_processed'] += 1 + latency = (time.time() - start_time) * 1000 + self.stats['avg_latency'] = (self.stats['avg_latency'] * 0.9 + latency * 0.1) + + def _trigger_alert(self, message, risk_score, features): + """触发预警""" + self.stats['total_alerts'] += 1 + + print(f"{'='*70}") + print(f"⚠️ 警报触发 #{self.stats['total_alerts']}") + print(f"{'='*70}") + print(message) + print(f"详细信息:") + print(f" 心率: {features['mean_hr']:.1f} bpm") + print(f" RMSSD: {features['rmssd']:.1f} ms") + print(f" SDNN: {features['sdnn']:.1f} ms") + print(f" 起夜次数(今晚): {features['bathroom_freq_today']}") + print(f" 风险评分: {risk_score:.3f}") + print(f"{'='*70}") + + # 这里可以添加实际的预警机制(声音、推送等) + if self.config.alert_config['enable_sound']: + # 播放警报声音 + pass + + if self.config.alert_config['enable_push']: + # 发送推送通知 + pass + + def _print_status(self, hr, hrv_features, risk_score, risk_level, quality, latency): + """打印状态信息""" + print(f"{'='*70}") + print(f"📊 实时状态更新 - {datetime.now().strftime('%H:%M:%S')}") + print(f"{'='*70}") + print(f"生理指标:") + print(f" 心率: {hr:.1f} bpm") + print(f" RMSSD: {hrv_features['rmssd']:.1f} ms") + print(f" SDNN: {hrv_features['sdnn']:.1f} ms") + print(f" pNN50: {hrv_features['pnn50']:.3f}") + print(f"行为指标:") + print(f" 起夜次数(今晚): {len(self.bathroom_visits_today)}") + print(f"风险评估:") + print(f" 风险评分: {risk_score:.3f}") + print(f" 风险等级: {risk_level}") + print(f" 信号质量: {quality:.2f}") + print(f"系统性能:") + print(f" 处理延迟: {latency:.1f} ms") + print(f" 累计处理: {self.stats['total_processed']} 次") + print(f" 累计预警: {self.stats['total_alerts']} 次") + print(f"{'='*70}") + + def end_of_night_summary(self): + """夜间结束总结""" + print(f"{'='*70}") + print(f"🌙 夜间监测总结") + print(f"{'='*70}") + print(f"监测时长: {(time.time() - self.night_start_time)/3600:.1f} 小时") + print(f"起夜次数: {len(self.bathroom_visits_today)}") + print(f"触发预警: {self.stats['total_alerts']} 次") + print(f"处理次数: {self.stats['total_processed']} 次") + print(f"平均延迟: {self.stats['avg_latency']:.1f} ms") + print(f"{'='*70}") + + # 更新基线(如果有足够数据) + # TODO: 计算整晚特征并更新基线 + + # 重置当日计数 + self.bathroom_visits_today = [] + self.stats['total_alerts'] = 0 + +# ==================== 7. 模拟数据生成器(测试用) ==================== + +class EdgeDataSimulator: + """边缘端数据模拟器(用于测试)""" + + def __init__(self, config): + self.config = config + + def generate_synthetic_bcg_stream(self, duration_sec, hr=70, scenario='normal'): + """ + 生成合成BCG数据流 + + 参数: + - duration_sec: 时长(秒) + - hr: 心率(bpm) + - scenario: 'normal', 'high_risk', 'bathroom_visit' + """ + fs = self.config.bcg_config['sampling_rate'] + total_samples = int(duration_sec * fs) + + t = np.linspace(0, duration_sec, total_samples) + + # 基础心跳信号 + hr_freq = hr / 60.0 + signal_bcg = np.sin(2 * np.pi * hr_freq * t) + + # 呼吸信号 + rr_freq = 15 / 60.0 + signal_bcg += 0.5 * np.sin(2 * np.pi * rr_freq * t) + + # 根据场景调整 + if scenario == 'high_risk': + # 高风险:降低HRV + signal_bcg += 0.05 * np.random.randn(total_samples) + elif scenario == 'bathroom_visit': + # 起夜:质量下降 + signal_bcg = signal_bcg * 0.2 + 0.8 * np.random.randn(total_samples) + else: + # 正常 + signal_bcg += 0.1 * np.random.randn(total_samples) + + return signal_bcg, t + + def generate_synthetic_radar_stream(self, duration_sec, bathroom_visits=[]): + """ + 生成合成雷达数据流 + + 参数: + - duration_sec: 时长(秒) + - bathroom_visits: [(start_time, end_time), ...] + """ + fs = self.config.radar_config['sampling_rate'] + total_frames = int(duration_sec * fs) + + radar_frames = [] + + for i in range(total_frames): + current_time = i / fs + + # 检查是否在起夜时间段 + in_bathroom = False + for start, end in bathroom_visits: + if start <= current_time <= end: + in_bathroom = True + break + + if in_bathroom: + # 生成厕所区域内的点云 + zone = self.config.radar_config['bathroom_zone'] + num_points = np.random.randint(10, 30) + point_cloud = [] + for _ in range(num_points): + x = np.random.uniform(*zone['x_range']) + y = np.random.uniform(*zone['y_range']) + z = np.random.uniform(*zone['z_range']) + v = np.random.uniform(0.1, 0.5) # 有动作 + point_cloud.append((x, y, z, v)) + else: + # 生成空点云或随机噪声 + point_cloud = [] + + radar_frames.append((current_time, point_cloud)) + + return radar_frames + +# ==================== 8. 主程序 ==================== + +def run_realtime_simulation(duration_minutes=10): + """ + 运行实时模拟(测试用) + + 参数: + - duration_minutes: 模拟时长(分钟) + """ + print(f"{'='*70}") + print(f"🎬 启动边缘端实时模拟") + print(f"{'='*70}") + print(f"模拟时长: {duration_minutes} 分钟") + print(f"场景: 正常睡眠 + 2次起夜") + print(f"{'='*70}") + + # 初始化控制器 + controller = RealtimeController(config) + controller.night_start_time = time.time() + + # 初始化模拟器 + simulator = EdgeDataSimulator(config) + + # 定义起夜时间(相对时间,秒) + bathroom_visits = [ + (120, 180), # 2分钟开始,持续1分钟 + (420, 480) # 7分钟开始,持续1分钟 + ] + + # 生成BCG数据流 + duration_sec = duration_minutes * 60 + bcg_signal, bcg_time = simulator.generate_synthetic_bcg_stream( + duration_sec, hr=72, scenario='normal' + ) + + # 生成雷达数据流 + radar_frames = simulator.generate_synthetic_radar_stream( + duration_sec, bathroom_visits=bathroom_visits + ) + + print(f"✓ 数据生成完成") + print(f" BCG样本: {len(bcg_signal)}") + print(f" 雷达帧: {len(radar_frames)}") + print(f"开始实时处理...") + + # 模拟实时处理 + bcg_fs = config.bcg_config['sampling_rate'] + radar_fs = config.radar_config['sampling_rate'] + + bcg_idx = 0 + radar_idx = 0 + start_time = time.time() + + while bcg_idx < len(bcg_signal): + current_time = time.time() - start_time + + # 处理BCG样本(更高频率) + if bcg_idx < len(bcg_signal): + sample_time = start_time + bcg_time[bcg_idx] + controller.process_bcg_sample(bcg_signal[bcg_idx], sample_time) + bcg_idx += 1 + + # 处理雷达帧(较低频率) + expected_radar_idx = int(current_time * radar_fs) + if radar_idx < expected_radar_idx and radar_idx < len(radar_frames): + frame_time, point_cloud = radar_frames[radar_idx] + controller.process_radar_frame(point_cloud, start_time + frame_time) + radar_idx += 1 + + # 模拟实时间隔(加速模拟) + time.sleep(0.001) # 1ms(实际应该是1/250秒) + + # 定期上传 + if controller.cloud_uploader.should_upload(): + controller.cloud_uploader.upload() + + # 夜间总结 + controller.end_of_night_summary() + + print(f"{'='*70}") + print(f"✅ 模拟完成") + print(f"{'='*70}") + +def main(): + """主函数""" + print(f"{'='*70}") + print(f"🏥 高血压风险评估系统 - 边缘端模块") + print(f"{'='*70}") + print(f"版本: {config.version}") + print(f"设备: {config.device_id}") + print(f"部署模式: 混合部署(边缘实时 + 云端深度分析)") + print(f"{'='*70}") + + print("📋 使用说明:") + print("1. 本代码运行在RK3588边缘设备") + print("2. 实时处理BCG和雷达传感器数据") + print("3. 提供<1秒延迟的实时预警") + print("4. 定期上传匿名化数据到云端") + print() + print("🎮 运行模式:") + print(" - 模拟模式:使用合成数据测试系统") + print(" - 实际模式:连接真实传感器(需要硬件接口)") + print() + + # 运行模拟 + run_realtime_simulation(duration_minutes=10) + +if __name__ == "__main__": + main() diff --git a/0112-2026 - Test_Package/requirements-依赖清单.txt b/0112-2026 - Test_Package/requirements-依赖清单.txt new file mode 100644 index 0000000..00bd16f --- /dev/null +++ b/0112-2026 - Test_Package/requirements-依赖清单.txt @@ -0,0 +1,8 @@ +flask>=2.3.0 +numpy>=1.24.0 +scipy>=1.10.0 +pandas>=2.0.0 +torch>=2.0.0 +matplotlib>=3.7.0 +sqlalchemy>=2.0.0 +requests>=2.31.0 diff --git a/0112-2026 - Test_Package/主指南(必读)-README_TEST.md b/0112-2026 - Test_Package/主指南(必读)-README_TEST.md new file mode 100644 index 0000000..2a9b8b1 --- /dev/null +++ b/0112-2026 - Test_Package/主指南(必读)-README_TEST.md @@ -0,0 +1,231 @@ +# 高血压风险评估系统 - 2天测试指南 + +**测试周期:2天(Day 1组件测试 + Day 2联调测试)** +**测试人员:成员A(云端负责人)+ 成员B(边缘端负责人)** +**测试目标:验证云端/边缘端基本功能可用,完成最小端到端闭环** + +--- + +## 📦 测试包内容 + +``` +test_package/ +├── README_TEST.md ← 你正在看的文件(测试总指南) +├── TestPlan-2Days.md ← 详细2天测试计划(按小时排) +├── requirements.txt ← Python依赖清单 +│ +├── docs/ ← 测试文档模板 +│ ├── Test-Log-Template.md ← 测试记录模板(复制后填写) +│ ├── Bug-Report-Template.md ← 缺陷报告模板 +│ └── Checklist-Cloud.md ← 云端测试检查表 +│ └── Checklist-Edge.md ← 边缘端测试检查表 +│ +├── scripts/ ← 测试脚本 +│ ├── check_health.py ← 快速健康检查 +│ ├── test_upload.py ← 上传测试(正常场景) +│ ├── test_report.py ← 报告获取测试 +│ ├── test_negative.py ← 异常场景测试 +│ └── start_cloud_bg.py ← 后台启动云端(可选) +│ +├── test_data/ ← 测试样例数据 +│ ├── sample_normal.json ← 正常上传样例 +│ ├── sample_missing_field.json ← 缺字段样例 +│ └── sample_wrong_type.json ← 错误类型样例 +│ +├── artifacts/ ← 测试结果输出目录(自动生成) +│ ├── cloud/ ← 云端测试产物 +│ └── edge/ ← 边缘端测试产物 +│ +└── [代码文件] + ├── cloud_hypertension_system.py ← 云端主程序 + └── edge_hypertension_system.py ← 边缘端主程序 +``` + +--- + +## ⚡ 5分钟快速启动(必读) + +### 环境准备(两人都做) + +```bash +# 1. 安装依赖 +pip install -r requirements.txt + +# 2. 验证Python版本(需要3.8+) +python --version + +# 3. 快速测试:检查云端健康 +python scripts/check_health.py +# 如果看到"❌ 无法连接"是正常的(此时云端还没启动) +``` + +### 成员A:云端快速测试(5分钟) + +```bash +# 第1步:在PowerShell/终端1中启动云端 +python cloud_hypertension_system.py + +# 看到这个就对了: +# ✓ 数据库初始化成功 +# Running on http://127.0.0.1:5000 + +# 保持这个窗口运行! + +# 第2步:打开新终端2,测试健康检查 +python scripts/check_health.py +# 应该看到:✅ 服务器正在运行 + +# 第3步:测试上传 +python scripts/test_upload.py +# 应该看到:✅ 上传成功! + +# ✅ 如果以上3步都成功 → 云端基本可用 +``` + +### 成员B:边缘端快速测试(5分钟) + +```bash +# 第1步:直接运行边缘端(模拟模式) +python edge_hypertension_system.py + +# 应该看到每30秒输出: +# 📊 实时状态更新 +# 心率: 72.3 bpm +# 风险评分: 0.285 (LOW) + +# 第2步:运行10分钟,观察: +# - 有起夜事件(🚽) +# - 有风险预警(⚠️,可能出现) +# - 无崩溃、无异常 + +# ✅ 如果能稳定运行10分钟 → 边缘端基本可用 +``` + +--- + +## 🎯 关键配置说明 + +### 云端配置(cloud_hypertension_system.py) + +```python +# 第23-28行:API配置 +'port': 5000, # 监听端口 +'api_keys': [ # 鉴权密钥(测试用) + 'edge_device_key_001', + 'edge_device_key_002' +] + +# 第44行:数据库配置 +'type': 'sqlite', # 使用SQLite(测试用) +'sqlite_path': './cloud_database.db' +``` + +### 边缘端配置(edge_hypertension_system.py) + +```python +# 第75-78行:云端上传配置 +'enable_upload': True, # 是否上传到云端 +'upload_url': 'http://127.0.0.1:5000/api/upload', # 云端地址 +'upload_interval': 300, # 上传间隔(秒) +'api_key': 'edge_device_key_001' + +# 第68-70行:设备ID +'device_id': 'edge_rk3588_001' # 测试设备ID +``` + +**⚠️ 重要:两人测试时使用不同的device_id!** +- 成员A测试时用:`test_edge_device_001` +- 成员B测试时用:`test_edge_device_002` +- 联调时用:`edge_rk3588_001` + +--- + +## 📋 测试执行流程(详细版见TestPlan-2Days.md) + +### Day 1:组件测试(并行) + +**上午(09:30-12:00)** +- ✅ 成员A:云端冒烟(health、upload、report) +- ✅ 成员B:边缘端本地运行稳定性 + +**下午(13:30-16:30)** +- ✅ 成员A:云端异常场景(401、400、重复上传) +- ✅ 成员B:边缘端上传容错(断网重连) +- ✅ 交叉复核(各自在对方机器上复现) + +### Day 2:联调测试(协同) + +**上午(09:30-12:00)** +- ✅ 端到端闭环:边缘端 → 上传 → 云端落库 → 报告生成 + +**下午(13:30-16:00)** +- ✅ 失败注入测试 +- ✅ 整理交付物 + +--- + +## ✅ 验收标准(必须通过) + +| 项目 | 成员A | 成员B | 证据文件 | +|------|------|------|---------| +| 云端健康检查 | ✅ | - | artifacts/cloud/health.json | +| upload→report闭环 | ✅ | - | artifacts/cloud/upload_response.json | +| 错误码正确(401/400) | ✅ | - | artifacts/cloud/negative_tests.log | +| 边缘端10分钟无崩溃 | - | ✅ | artifacts/edge/edge_10min.log | +| 联调成功 | ✅ | ✅ | artifacts/end2end/combined.log | + +--- + +## 🐛 常见问题速查 + +### Q1: 云端启动后,测试脚本报"连接拒绝"? +**A:** 检查云端是否真的在运行: +```bash +python scripts/check_health.py +# 如果显示"❌ 无法连接" → 云端没启动或崩溃了 +``` + +### Q2: 测试脚本报"500错误"? +**A:** 查看云端窗口的错误日志,通常是: +- 时间戳格式错误(应该是数字,不是字符串) +- 数据库锁定(关闭其他访问数据库的程序) + +### Q3: 边缘端上传失败? +**A:** 检查: +1. 云端是否启动?(check_health.py) +2. URL配置是否正确?(第77行) +3. API Key是否匹配?(第78行) + +### Q4: 端口5000被占用? +**A:** 查找并杀死占用进程: +```bash +# Windows +netstat -ano | findstr :5000 +taskkill /PID /F + +# 或者修改云端端口为5001 +``` + +--- + +## 📞 支持与反馈 + +- **技术问题**:立即在测试记录中标注,并截图+日志 +- **阻塞问题**:第一时间同步给项目负责人 +- **测试进度**:每天下班前提交当天Test-Log.md + +--- + +## 🎉 测试完成后交付 + +**必须提交(2个人一起):** +1. `Test-Log.md`(按模板填写,带时间线) +2. `Bug-List.xlsx`(所有缺陷,P0/P1/P2分级) +3. `artifacts/`目录(所有日志、截图、数据库文件) +4. `2-day-summary.md`(一页纸结论) + +**提交方式:** +- 打包成 `test-results-YYYYMMDD.zip` +- 发送给项目负责人 + +--- diff --git a/0112-2026 - Test_Package/详细计划-TestPlan-2Days.md b/0112-2026 - Test_Package/详细计划-TestPlan-2Days.md new file mode 100644 index 0000000..c9ec5ef --- /dev/null +++ b/0112-2026 - Test_Package/详细计划-TestPlan-2Days.md @@ -0,0 +1,479 @@ +# 高血压风险评估系统 - 2天详细测试计划 + +**版本:** v1.0 +**日期:** 2026-01-12 +**测试人员:** 成员A(云端)、成员B(边缘端) + +--- + +## 📅 Day 1:组件测试日(并行执行) + +### 09:30 - 10:00 | 启动会(两人一起) + +**地点:** 会议室或线上会议 +**参与:** 成员A、成员B、项目负责人 + +**议程:** +1. 明确测试范围(功能冒烟,不做性能与安全) +2. 对齐device_id规则: + - 成员A用:`test_edge_device_001` + - 成员B用:`test_edge_device_002` + - 联调用:`edge_rk3588_001` +3. 确认日志输出目录:`artifacts/cloud/` 和 `artifacts/edge/` +4. 同步沟通方式(钉钉/微信群即时响应) + +**产出:** 两人都清楚自己的任务清单 + +--- + +### 10:00 - 12:00 | 上午:组件冒烟(并行) + +#### 成员A:云端基础功能验证 + +**任务清单:** + +✅ **Task 1.1:服务启动验证(15分钟)** +```bash +# 1. 启动云端 +python cloud_hypertension_system.py + +# 2. 检查输出 +# 预期: +# ✓ 数据库初始化成功 +# Running on http://127.0.0.1:5000 +``` +**记录:** 截图启动日志,保存为 `artifacts/cloud/startup.png` + +--- + +✅ **Task 1.2:健康检查API(10分钟)** +```bash +# 在新终端运行 +python scripts/check_health.py +``` +**验收标准:** +- 返回状态码200 +- 响应包含:`"status": "healthy"` + +**记录:** 保存响应到 `artifacts/cloud/health.json` + +--- + +✅ **Task 1.3:正常上传测试(30分钟)** +```bash +python scripts/test_upload.py +``` +**验收标准:** +- 返回状态码200 +- 响应包含:`"status": "success"` +- 云端控制台显示:`☁️ [API] 接收上传: 设备=test_edge_device_001` + +**检查数据库:** +```bash +# 查看数据库是否有数据 +sqlite3 cloud_database.db "SELECT COUNT(*) FROM sensor_data;" +# 应该返回:3(上传了3条事件) +``` + +**记录:** +- 保存响应到 `artifacts/cloud/upload_response.json` +- 保存云端日志到 `artifacts/cloud/upload.log` + +--- + +✅ **Task 1.4:报告生成测试(30分钟)** +```bash +python scripts/test_report.py +``` +**验收标准:** +- 返回状态码200 +- 生成报告文件:`./reports/report_test_edge_device_001_*.json` +- 报告包含基本统计(虽然数据很少) + +**记录:** +- 复制生成的报告到 `artifacts/cloud/report_sample.json` + +--- + +✅ **Task 1.5:数据库检查(15分钟)** +```bash +# 检查各表记录数 +sqlite3 cloud_database.db < artifacts/edge/edge_45min.log 2>&1 + +# 让它运行45分钟,观察输出 +``` + +**验收标准(每30秒检查一次):** +- ✅ 心率输出在合理范围(50-100 bpm) +- ✅ 风险评分有数值(0-1之间) +- ✅ 无Python异常/崩溃 +- ✅ 至少看到1次起夜事件(🚽) + +**记录:** +- 截图正常输出(3-5张) +- 截图起夜事件 +- 保存完整日志:`artifacts/edge/edge_45min.log` + +--- + +✅ **Task 1.2:输出格式验证(15分钟)** + +**检查输出是否包含:** +- 时间戳 +- 生理指标(心率、RMSSD、SDNN) +- 风险评分与等级 +- 系统性能(处理延迟) + +**记录:** 在 `Test-Log.md` 中记录样例输出 + +--- + +### 12:00 - 13:30 | 午休 + +--- + +### 13:30 - 15:30 | 下午:深入测试(并行) + +#### 成员A:云端异常场景与健壮性 + +✅ **Task 2.1:异常场景测试(60分钟)** +```bash +python scripts/test_negative.py +``` + +**测试场景:** + +1. **401 Unauthorized(错误API Key)** + - 修改脚本中的API Key为错误值 + - 预期:返回401,错误信息清晰 + +2. **400 Bad Request(缺少字段)** + - 使用 `test_data/sample_missing_field.json` + - 预期:返回400,指出缺少哪个字段 + +3. **400 Bad Request(类型错误)** + - 使用 `test_data/sample_wrong_type.json` + - 预期:返回400,指出类型不匹配 + +**记录:** 每个场景保存: +- 请求内容 +- 响应状态码 +- 错误信息 +- 云端日志 + +保存到:`artifacts/cloud/negative_tests.md` + +--- + +✅ **Task 2.2:重复上传测试(30分钟)** + +**目标:** 验证重复上传的行为 + +```python +# 连续上传同一份数据3次 +for i in range(3): + response = requests.post(...) + print(f"第{i+1}次上传:{response.status_code}") +``` + +**检查数据库:** +```bash +sqlite3 cloud_database.db "SELECT COUNT(*) FROM sensor_data WHERE device_id='test_edge_device_001';" +# 记录数量是3、9还是其他? +``` + +**记录:** +- 重复上传行为(是幂等/累加/拒绝?) +- 如果不符合预期,记录为Bug + +--- + +#### 成员B:边缘端容错与上传 + +✅ **Task 2.1:上传功能测试(60分钟)** + +**前提:** 确保云端正在运行(与成员A协调) + +**步骤:** + +1. **修改边缘端配置,启用上传:** + ```python + # edge_hypertension_system.py 第75行 + 'enable_upload': True, + 'upload_url': 'http://127.0.0.1:5000/api/upload', + ``` + +2. **运行边缘端,观察上传:** + ```bash + python edge_hypertension_system.py + ``` + +3. **每5分钟应该看到:** + ``` + ☁️ [Cloud Upload] Uploaded 24 events to cloud + ✅ 上传成功! + ``` + +**验收标准:** +- 至少成功上传2次(10分钟内) +- 云端控制台显示接收日志 + +**记录:** +- 边缘端上传日志 +- 云端接收日志(找成员A要) + +--- + +✅ **Task 2.2:上传失败与重试(30分钟)** + +**模拟场景1:云端不可达** + +1. **停止云端服务**(找成员A协调) +2. 观察边缘端输出: + ``` + ☁️ [Cloud Upload] Failed: Connection refused + ``` +3. 边缘端应该继续本地处理(不崩溃) + +**模拟场景2:云端恢复** + +1. **重新启动云端** +2. 观察边缘端下次上传是否成功 + +**记录:** +- 失败日志:`artifacts/edge/upload_fail.log` +- 恢复日志:`artifacts/edge/upload_recover.log` + +--- + +### 15:30 - 16:30 | 交叉复核 + +**目标:** 避免"只在我电脑上能跑"的问题 + +**步骤:** + +1. **成员A 在自己电脑上复现 成员B的操作:** + - 运行边缘端10分钟 + - 检查输出是否正常 + +2. **成员B 在自己电脑上复现 成员A的操作:** + - 启动云端 + - 运行 `test_upload.py` + - 检查是否成功 + +**记录:** +- 如果复现失败,记录差异(系统版本、Python版本、依赖版本) + +--- + +### 16:30 - 17:00 | Day 1总结 + +**两人一起填写:** +- Day 1完成情况(哪些通过,哪些失败) +- 发现的问题清单 +- Day 2重点关注点 + +--- + +## 📅 Day 2:联调测试日(协同执行) + +### 09:30 - 11:00 | 端到端闭环测试 + +**目标:** 验证完整链路:边缘端 → 云端 → 报告 + +**前提:** +- 两人在同一网络 +- 或者都连接到云端服务器 + +**步骤:** + +✅ **Step 1:启动云端(成员A)** +```bash +python cloud_hypertension_system.py +``` + +✅ **Step 2:配置边缘端(成员B)** +```python +# 修改 device_id 为联调专用 +'device_id': 'edge_rk3588_001', +'upload_url': 'http://127.0.0.1:5000/api/upload', # 或成员A的IP +``` + +✅ **Step 3:启动边缘端(成员B)** +```bash +python edge_hypertension_system.py +``` + +✅ **Step 4:等待15分钟** +- 边缘端应该上传3次(每5分钟一次) +- 云端应该接收到数据 + +✅ **Step 5:生成报告(成员A)** +```bash +python scripts/test_report.py +# 或直接访问API: +# GET http://127.0.0.1:5000/api/report/edge_rk3588_001 +``` + +✅ **Step 6:验证结果** +- 报告包含15分钟的数据 +- 数据量约:72条记录(15分钟 × 2次/分钟 × 240秒) + +**记录:** +- 边缘端日志:`artifacts/end2end/edge.log` +- 云端日志:`artifacts/end2end/cloud.log` +- 生成的报告:`artifacts/end2end/report.json` +- 数据库文件:`artifacts/end2end/cloud_database.db` + +--- + +### 11:00 - 12:00 | 失败注入测试 + +**目标:** 验证系统容错与错误提示 + +✅ **Scenario 1:错误API Key(10分钟)** +- 修改边缘端API Key为错误值 +- 预期:边缘端显示"401 Unauthorized" +- 预期:边缘端继续本地处理 + +✅ **Scenario 2:云端不可达(15分钟)** +- 停止云端 +- 预期:边缘端显示连接失败,但不崩溃 +- 重启云端 +- 预期:边缘端恢复上传 + +✅ **Scenario 3:数据格式错误(15分钟)** +- 手动修改上传数据,制造格式错误 +- 预期:云端返回400错误,指出问题 +- 预期:边缘端记录错误但不崩溃 + +**记录:** 每个场景的日志和截图 + +--- + +### 12:00 - 13:30 | 午休 + +--- + +### 13:30 - 15:00 | 整理交付物 + +**任务分工:** + +**成员A:整理云端产物** +- [ ] 收集所有日志文件到 `artifacts/cloud/` +- [ ] 导出数据库文件 +- [ ] 整理API测试结果 +- [ ] 填写 `Checklist-Cloud.md` + +**成员B:整理边缘端产物** +- [ ] 收集所有日志文件到 `artifacts/edge/` +- [ ] 整理运行截图 +- [ ] 填写 `Checklist-Edge.md` + +**两人一起:** +- [ ] 填写 `Test-Log.md`(完整时间线) +- [ ] 填写 `Bug-List.xlsx`(所有缺陷) +- [ ] 撰写 `2-day-summary.md` + +--- + +### 15:00 - 16:00 | 测试总结会 + +**参与:** 成员A、成员B、项目负责人 + +**议程(只回答3个问题):** + +1. **现在能否演示闭环?** + - 能 / 不能 + - 如果不能,差哪一步? + +2. **最大阻塞是什么?** + - 列出Top 3问题 + - 每个问题的建议解决方案 + +3. **下一步(传感器接入)最先要修哪3个点?** + - P0级问题优先 + - 给出修复优先级 + +**产出:** +- 明确下一步行动计划 +- 分配Bug修复责任人 + +--- + +## 📦 最终交付清单 + +**必须提交(打包成test-results-YYYYMMDD.zip):** + +``` +test-results-20260114/ +├── Test-Log.md ← 完整测试记录 +├── Bug-List.xlsx ← 缺陷清单(P0/P1/P2分级) +├── 2-day-summary.md ← 一页纸总结 +├── Checklist-Cloud.md ← 云端检查表(已勾选) +├── Checklist-Edge.md ← 边缘端检查表(已勾选) +│ +└── artifacts/ ← 所有测试产物 + ├── cloud/ + │ ├── startup.png + │ ├── health.json + │ ├── upload_response.json + │ ├── report_sample.json + │ ├── negative_tests.md + │ ├── cloud_database.db ← 数据库快照 + │ └── *.log ← 所有日志 + │ + ├── edge/ + │ ├── edge_45min.log + │ ├── upload_fail.log + │ ├── screenshots/ ← 截图目录 + │ └── *.log + │ + └── end2end/ + ├── edge.log + ├── cloud.log + ├── report.json + └── cloud_database.db +``` + +--- + +## 🎯 验收标准(项目负责人用) + +| 编号 | 测试项 | 责任人 | 必须通过 | 证据文件 | +|-----|--------|--------|---------|---------| +| C1 | 云端服务启动 | A | ✅ | artifacts/cloud/startup.png | +| C2 | 健康检查API | A | ✅ | artifacts/cloud/health.json | +| C3 | upload→report闭环 | A | ✅ | artifacts/cloud/upload_response.json + report_sample.json | +| C4 | 错误码正确(401/400) | A | ✅ | artifacts/cloud/negative_tests.md | +| C5 | 数据库落库正确 | A | ✅ | artifacts/cloud/db_check.txt | +| E1 | 边缘端45分钟无崩溃 | B | ✅ | artifacts/edge/edge_45min.log | +| E2 | 心率/风险评分正常 | B | ✅ | artifacts/edge/screenshots/ | +| E3 | 上传成功 | B | ✅ | artifacts/edge/*.log | +| E4 | 上传失败容错 | B | Nice | artifacts/edge/upload_fail.log | +| I1 | 端到端闭环成功 | A+B | ✅ | artifacts/end2end/* | +| I2 | 失败注入测试 | A+B | Nice | artifacts/end2end/failure_*.log | + +**Must(必须通过):** C1-C5, E1-E3, I1 +**Nice(加分项):** E4, I2 + +--- + +**测试愉快!有问题随时沟通。** 🚀