"""Upload test script: exercises the normal (happy-path) data upload.

Posts a small, fixed batch of sample events to the cloud upload endpoint,
prints the server's answer, and stores the JSON response on disk.
The process exit code is 0 on success and 1 on failure.
"""

import json
import sys
import time
from datetime import datetime
from pathlib import Path

import requests

# Configuration
CLOUD_URL = "http://127.0.0.1:5000/api/upload"
API_KEY = "edge_device_key_001"
DEVICE_ID = "test_edge_device_001"
# Where the server's JSON response is stored (relative to the working dir).
RESPONSE_FILE = "artifacts/cloud/upload_response.json"


def _build_test_data(current_time):
    """Return the sample upload payload, with event times based on *current_time*.

    Args:
        current_time: Unix timestamp (seconds) used for the first event;
            the remaining timestamps are fixed offsets from it.
    """
    return {
        "device_id": DEVICE_ID,
        "upload_time": datetime.now().isoformat(),
        "events": [
            {
                "timestamp": current_time,
                "heart_rate": 72.5,
                "rmssd": 32.8,
                "sdnn": 45.2,
                "pnn50": 0.125,
                "signal_quality": 0.87,
                "risk_score": 0.285,
                "risk_level": "LOW",
            },
            {
                "timestamp": current_time + 30,
                "heart_rate": 74.2,
                "rmssd": 30.5,
                "sdnn": 42.8,
                "pnn50": 0.115,
                "signal_quality": 0.82,
                "risk_score": 0.312,
                "risk_level": "LOW",
            },
            {
                "timestamp": current_time + 60,
                "heart_rate": 71.8,
                "rmssd": 34.2,
                "sdnn": 46.5,
                "pnn50": 0.135,
                "signal_quality": 0.89,
                "risk_score": 0.265,
                "risk_level": "LOW",
            },
        ],
        "bathroom_events": [
            {
                "start_time": current_time + 300,
                "end_time": current_time + 375,
                "duration": 75,
            },
        ],
        "summary": {
            "avg_hr": 72.8,
            "avg_rmssd": 32.5,
            "avg_risk_score": 0.287,
            "total_alerts": 0,
        },
    }


def _save_response(body):
    """Write the decoded JSON *body* to RESPONSE_FILE, creating parent dirs.

    Raises:
        OSError: if the directory or file cannot be created or written.
    """
    target = Path(RESPONSE_FILE)
    # The artifacts directory may not exist on a fresh checkout.
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open('w', encoding='utf-8') as f:
        json.dump(body, f, indent=2, ensure_ascii=False)


def test_upload():
    """Send the sample payload to CLOUD_URL and report the outcome.

    Returns:
        bool: True when the server answers HTTP 200 with a JSON body;
        False on any connection/request error, a non-200 status, or a
        response body that is not valid JSON. A failure to store the
        response on disk is reported but does not change the result,
        because the upload itself succeeded.
    """
    print("="*60)
    print("📤 云端数据上传测试")
    print("="*60)

    # Current Unix timestamp; all event times are offsets from it.
    test_data = _build_test_data(time.time())

    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }

    print(f"URL: {CLOUD_URL}")
    print(f"设备ID: {DEVICE_ID}")
    print(f"事件数: {len(test_data['events'])}")
    print(f"起夜事件: {len(test_data['bathroom_events'])}")
    print("-" * 60)

    # Only the network call lives in the try block, so unrelated errors
    # (e.g. file I/O below) are never misreported as request failures.
    try:
        response = requests.post(CLOUD_URL, json=test_data, headers=headers, timeout=10)
    except requests.exceptions.ConnectionError:
        print("\n❌ 连接失败: 无法连接到云端服务器")
        print("请先运行: python scripts/check_health.py")
        return False
    except requests.exceptions.RequestException as e:
        print(f"\n❌ 请求失败: {e}")
        return False

    print(f"\n状态码: {response.status_code}")
    print("响应内容:")

    # Decode the body once. Every JSON decode error raised by requests is a
    # ValueError subclass, so this also covers HTML/plain-text error pages.
    try:
        body = response.json()
    except ValueError:
        body = None
        print(response.text)
    else:
        print(json.dumps(body, indent=2, ensure_ascii=False))

    if response.status_code != 200 or body is None:
        if body is None:
            error = "响应不是有效的JSON"
        elif isinstance(body, dict):
            error = body.get('error', '未知错误')
        else:
            error = '未知错误'
        print("\n" + "="*60)
        print(f"❌ 上传失败: {error}")
        print("="*60)
        return False

    print("\n" + "="*60)
    print("✅ 上传成功!")
    print("="*60)
    print("\n💡 提示:可以查看云端控制台,应该能看到接收日志")

    # Save the response to a file; a local disk problem must not turn a
    # successful upload into a reported failure.
    try:
        _save_response(body)
    except OSError as e:
        print(f"⚠ 响应保存失败: {e}")
    else:
        print(f"✓ 响应已保存: {RESPONSE_FILE}")

    return True


if __name__ == '__main__':
    # sys.exit is always available; the bare exit() helper comes from the
    # site module and is missing under `python -S` or in frozen builds.
    sys.exit(0 if test_upload() else 1)