myTestFreqAI/test_es_connection.py
zhangkun9038@dingtalk.com 30c4c750af 使用es记录log而非redis
2026-01-17 01:58:35 +08:00

110 lines
3.1 KiB
Python

#!/usr/bin/env python3
"""测试 Elasticsearch 连接"""
import json
import os
import sys
import traceback
from datetime import datetime, timezone

import requests
# Elasticsearch connection settings. Each value can be overridden through an
# environment variable of the same name so real credentials never have to be
# committed; the defaults are the values this script has always used.
ES_URL = os.environ.get("ES_URL", "http://elastic.k8s.xunlang.home/")
ES_USERNAME = os.environ.get("ES_USERNAME", "elastic")
ES_PASSWORD = os.environ.get("ES_PASSWORD", "your_secure_password")

# Throw-away index that the write/search checks use and the cleanup step deletes.
TEST_INDEX = "freqai.livelog.test-2026.01"
# Timeout in seconds applied to every HTTP request.
REQUEST_TIMEOUT = 10


def _check_connection(session, base_url):
    """Query the cluster root endpoint and print cluster name and version.

    Returns True on HTTP 200, otherwise prints the response and returns False.
    """
    print("\n=== 1. 测试连接 ===")
    response = session.get(f"{base_url}/", timeout=REQUEST_TIMEOUT)
    if response.status_code != 200:
        print(f"❌ 连接失败: HTTP {response.status_code}")
        print(response.text)
        return False

    info = response.json()
    print("✅ Elasticsearch 连接成功!")
    print(f"集群名称: {info['cluster_name']}")
    print(f"ES 版本: {info['version']['number']}")
    return True


def _index_test_document(session, base_url):
    """Index one sample document into TEST_INDEX.

    Returns True on HTTP 200/201, otherwise prints the response and returns
    False.
    """
    print("\n=== 2. 测试写入文档 ===")
    test_doc = {
        # Timezone-aware UTC timestamp: a naive local time would be read by
        # Elasticsearch as UTC and end up shifted by the local offset.
        "@timestamp": datetime.now(timezone.utc).isoformat(),
        "container_name": "test_container",
        "pair": "BTC/USDT",
        "test": "connection_test",
        "value": 12345.67,
    }
    print(f"索引: {TEST_INDEX}")
    response = session.post(
        f"{base_url}/{TEST_INDEX}/_doc",
        # Force a refresh so the document is visible to the search that
        # follows; without it the search can legitimately report 0 hits.
        params={"refresh": "true"},
        json=test_doc,
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code not in (200, 201):
        print(f"❌ 写入失败: HTTP {response.status_code}")
        print(response.text)
        return False

    doc_id = response.json()["_id"]
    print(f"✅ 文档写入成功! ID: {doc_id}")
    return True


def _search_test_index(session, base_url):
    """Run a match_all search on TEST_INDEX and print the first hit, if any.

    Returns True on HTTP 200, otherwise prints the response and returns False.
    """
    print("\n=== 3. 测试查询文档 ===")
    response = session.post(
        f"{base_url}/{TEST_INDEX}/_search",
        json={"query": {"match_all": {}}, "size": 1},
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code != 200:
        print(f"❌ 查询失败: HTTP {response.status_code}")
        print(response.text)
        return False

    hits = response.json()["hits"]
    total = hits["total"]
    # hits.total is an object ({"value": n, ...}) in current responses but a
    # bare integer in older ones; accept both shapes.
    if isinstance(total, dict):
        total = total["value"]
    print(f"✅ 查询成功! 找到 {total} 条记录")
    if hits["hits"]:
        doc = hits["hits"][0]["_source"]
        print("\n示例文档:")
        print(json.dumps(doc, indent=2, ensure_ascii=False))
    return True


def _delete_test_index(session, base_url):
    """Delete TEST_INDEX; HTTP 200 and 404 are both reported as success."""
    print("\n=== 4. 清理测试索引 ===")
    response = session.delete(f"{base_url}/{TEST_INDEX}", timeout=REQUEST_TIMEOUT)
    if response.status_code in (200, 404):
        print("✅ 测试索引已删除")
    else:
        print(f"⚠️ 删除索引返回: HTTP {response.status_code}")


def main():
    """Run the connect, write, search and cleanup checks in order.

    Returns a process exit status: 0 when every check passed, 1 when a check
    failed or an exception was raised.
    """
    print(f"正在连接 Elasticsearch: {ES_URL}")
    # Normalise so URL building does not depend on ES_URL ending with "/".
    base_url = ES_URL.rstrip("/")
    search_ok = False
    try:
        with requests.Session() as session:
            session.auth = (ES_USERNAME, ES_PASSWORD)
            if not _check_connection(session, base_url):
                return 1
            try:
                if not _index_test_document(session, base_url):
                    return 1
                search_ok = _search_test_index(session, base_url)
            finally:
                # Always attempt cleanup once a write was attempted, even if
                # the write or search failed or raised.
                _delete_test_index(session, base_url)
    except requests.exceptions.RequestException as e:
        print(f"❌ HTTP 请求失败: {e}")
        return 1
    except Exception as e:
        # Top-level boundary of a diagnostic script: report and fail.
        print(f"❌ 测试失败: {e}")
        traceback.print_exc()
        return 1

    print("\n=== 测试完成 ===")
    return 0 if search_ok else 1


# Guarded entry point: importing this module (for example during pytest
# collection of test_*.py files) must not perform network I/O.
if __name__ == "__main__":
    sys.exit(main())