120 lines
3.6 KiB
Python
Raw Permalink Normal View History

2026-01-13 11:27:21 +08:00
"""上传测试脚本 - 测试正常数据上传"""
import json
import time
from datetime import datetime
from pathlib import Path

import requests
# Configuration
# Upload endpoint that test_upload() POSTs to (loopback address, port 5000).
CLOUD_URL = "http://127.0.0.1:5000/api/upload"
# Sent as the Bearer token in the Authorization header of the upload request.
API_KEY = "edge_device_key_001"
# Reported as "device_id" in the upload payload.
DEVICE_ID = "test_edge_device_001"
def _build_test_payload(base_time):
    """Build the sample upload payload used by ``test_upload``.

    Args:
        base_time: Unix timestamp in seconds; every event time in the
            payload is an offset from this value.

    Returns:
        A JSON-serializable dict holding three sample events spaced 30
        seconds apart, one bathroom event, and a summary section.
    """
    return {
        "device_id": DEVICE_ID,
        "upload_time": datetime.now().isoformat(),
        "events": [
            {
                "timestamp": base_time,
                "heart_rate": 72.5,
                "rmssd": 32.8,
                "sdnn": 45.2,
                "pnn50": 0.125,
                "signal_quality": 0.87,
                "risk_score": 0.285,
                "risk_level": "LOW",
            },
            {
                "timestamp": base_time + 30,
                "heart_rate": 74.2,
                "rmssd": 30.5,
                "sdnn": 42.8,
                "pnn50": 0.115,
                "signal_quality": 0.82,
                "risk_score": 0.312,
                "risk_level": "LOW",
            },
            {
                "timestamp": base_time + 60,
                "heart_rate": 71.8,
                "rmssd": 34.2,
                "sdnn": 46.5,
                "pnn50": 0.135,
                "signal_quality": 0.89,
                "risk_score": 0.265,
                "risk_level": "LOW",
            },
        ],
        "bathroom_events": [
            {
                "start_time": base_time + 300,
                "end_time": base_time + 375,
                "duration": 75,
            },
        ],
        "summary": {
            "avg_hr": 72.8,
            "avg_rmssd": 32.5,
            "avg_risk_score": 0.287,
            "total_alerts": 0,
        },
    }


def test_upload():
    """Post a sample payload to the cloud endpoint and report the outcome.

    Sends the payload to ``CLOUD_URL`` with ``API_KEY`` as a Bearer token,
    prints the status code and the response body, and on HTTP 200 saves the
    JSON response to ``artifacts/cloud/upload_response.json`` (relative to
    the current working directory; the directory is created if missing).

    Returns:
        True when the server answered 200 with a JSON body and the response
        file was written. False on a connection or request error, a non-200
        status, a body that is not valid JSON, or a failure to write the
        response file.
    """
    banner = "=" * 60
    response_path = Path("artifacts/cloud/upload_response.json")

    print(banner)
    print("📤 云端数据上传测试")
    print(banner)

    # Event timestamps are offsets from the current Unix time.
    test_data = _build_test_payload(time.time())
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    }

    print(f"URL: {CLOUD_URL}")
    print(f"设备ID: {DEVICE_ID}")
    print(f"事件数: {len(test_data['events'])}")
    print(f"起夜事件: {len(test_data['bathroom_events'])}")
    print("-" * 60)

    # Only the network call lives in this try, so that later failures
    # (bad body, file write) are not misreported as request errors.
    try:
        response = requests.post(
            CLOUD_URL, json=test_data, headers=headers, timeout=10
        )
    except requests.exceptions.ConnectionError:
        print("\n❌ 连接失败: 无法连接到云端服务器")
        print("请先运行: python scripts/check_health.py")
        return False
    except requests.exceptions.RequestException as e:
        print(f"\n❌ 请求失败: {e}")
        return False

    print(f"\n状态码: {response.status_code}")

    # Decode the body once. A non-JSON body (e.g. an HTML error page) must
    # not hide the status code, so fall back to showing the raw text.
    try:
        body = response.json()
    except ValueError:
        body = None

    print("响应内容:")
    if body is None:
        print(response.text)
    else:
        print(json.dumps(body, indent=2, ensure_ascii=False))

    if response.status_code != 200:
        # The error field is only available when the body is a JSON object.
        error = body.get('error', '未知错误') if isinstance(body, dict) else '未知错误'
        print("\n" + banner)
        print(f"❌ 上传失败: {error}")
        print(banner)
        return False

    if body is None:
        # A 200 without a JSON body is still treated as a failed test.
        print("\n❌ 请求失败: 响应不是有效的JSON")
        return False

    print("\n" + banner)
    print("✅ 上传成功!")
    print(banner)
    print("\n💡 提示:可以查看云端控制台,应该能看到接收日志")

    # Save the response to a file, creating the output directory first so
    # a fresh checkout does not turn a successful upload into a failure.
    try:
        response_path.parent.mkdir(parents=True, exist_ok=True)
        with response_path.open('w', encoding='utf-8') as f:
            json.dump(body, f, indent=2, ensure_ascii=False)
    except OSError as e:
        print(f"\n❌ 响应保存失败: {e}")
        return False

    print(f"✓ 响应已保存: {response_path.as_posix()}")
    return True
if __name__ == '__main__':
    # The process exit code mirrors the test result: 0 on success, 1 on failure.
    # SystemExit is raised directly because the bare ``exit`` builtin is
    # installed by the ``site`` module for interactive use and is not
    # guaranteed to exist (e.g. under ``python -S``).
    success = test_upload()
    raise SystemExit(0 if success else 1)