This commit is contained in:
zhangkun9038@dingtalk.com 2025-10-28 01:41:26 +08:00
parent 31281d58c5
commit 9a2579b87f

View File

@ -3,26 +3,30 @@ package main
import (
"context"
"flag"
"log"
"fmt"
"os"
"os/signal"
"regexp"
"strconv"
"sync"
"syscall"
"time"
"github.com/go-redis/redis/v8"
"gitea.zjmud.xyz/phyer/myTestFreqAI/goflow/config"
)
// 全局变量
var (
ctx = context.Background()
hostname string
redisAddr string
redisPassword string
redisDB int
client *redis.Client
ctx = context.Background()
hostname string
redisAddr string
redisPassword string
redisDB int
client *redis.Client
wg sync.WaitGroup
)
// 获取正则表达式匹配的第一个结果
// 辅助函数 - 获取正则匹配的第一个结果
func getFirstMatch(re *regexp.Regexp, line string) string {
matches := re.FindStringSubmatch(line)
if len(matches) > 1 {
@ -71,44 +75,107 @@ func extractStepID(stepInfo string) int {
return id
}
// 主函数入口
func main() {
log.Println("开始初始化HyperOpt客户端...")
// 解析命令行参数
redisAddr = *flag.String("redis-addr", config.DefaultRedisConfig().Addr, "Redis服务器地址")
redisPassword = *flag.String("redis-password", config.DefaultRedisConfig().Password, "Redis密码")
redisDB = *flag.Int("redis-db", config.DefaultRedisConfig().DB, "Redis数据库索引")
flag.Parse()
log.Println("命令行参数解析完成")
// 订阅任务频道函数
func subscribeToTasks() {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("[订阅协程] 启动任务订阅协程")
// 创建订阅
pubsub := client.Subscribe(ctx, config.HyperoptTasksChannel, config.HyperoptChannel)
defer func() {
fmt.Println("[订阅协程] 关闭任务订阅连接")
pubsub.Close()
}()
fmt.Println("[订阅协程] 等待接收任务消息...")
ch := pubsub.Channel()
// 接收消息循环
for msg := range ch {
fmt.Printf("[订阅协程] 收到来自%s的任务消息\n", msg.Channel)
fmt.Printf("[订阅协程] 消息内容长度: %d 字节\n", len(msg.Payload))
}
}()
}
// 主函数
func main() {
// 确保输出立即刷新
err := os.Stdout.Sync()
if err != nil {
fmt.Printf("刷新输出失败: %v\n", err)
}
fmt.Println("开始初始化HyperOpt客户端...")
// 解析命令行参数
redisAddrFlag := flag.String("redis-addr", config.DefaultRedisConfig().Addr, "Redis服务器地址")
redisPasswordFlag := flag.String("redis-password", config.DefaultRedisConfig().Password, "Redis密码")
redisDBFlag := flag.Int("redis-db", config.DefaultRedisConfig().DB, "Redis数据库索引")
flag.Parse()
// 设置全局变量
redisAddr = *redisAddrFlag
redisPassword = *redisPasswordFlag
redisDB = *redisDBFlag
fmt.Println("命令行参数解析完成")
// 获取主机名
var err error
hostname, err = os.Hostname()
if err != nil {
log.Fatalf("获取主机名失败: %v", err)
fmt.Printf("错误: 获取主机名失败: %v\n", err)
os.Exit(1)
}
log.Printf("主机名: %s", hostname)
fmt.Printf("主机名: %s\n", hostname)
// 初始化Redis客户端
redisConfig := config.RedisConfig{
fmt.Printf("Redis客户端初始化连接地址: %s, 数据库: %d\n", redisAddr, redisDB)
client = redis.NewClient(&redis.Options{
Addr: redisAddr,
Password: redisPassword,
DB: redisDB,
}
client = redis.NewClient(&redis.Options{
Addr: redisConfig.Addr,
Password: redisConfig.Password,
DB: redisConfig.DB,
})
defer client.Close()
log.Printf("Redis客户端初始化连接地址: %s, 数据库: %d", redisAddr, redisDB)
// 测试Redis连接
_, err = client.Ping(ctx).Result()
if err != nil {
log.Fatalf("无法连接到Redis: %v", err)
fmt.Printf("错误: 无法连接到Redis: %v\n", err)
os.Exit(1)
}
fmt.Println("Redis连接测试成功")
// 订阅频道
hyperoptChannelName := config.HyperoptTasksChannel
fmt.Printf("开始订阅%s频道\n", hyperoptChannelName)
subscribeToTasks()
fmt.Println("客户端初始化完成")
fmt.Println("客户端初始化完成,等待接收任务...")
fmt.Println("按Ctrl+C退出")
// 保持主协程运行,定期输出等待信息
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
// 等待中断信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
// 主循环
for {
select {
case <-quit:
fmt.Println("接收到中断信号,开始关闭客户端")
fmt.Println("正在关闭客户端...")
wg.Wait()
fmt.Println("客户端已关闭")
return
case <-ticker.C:
fmt.Println("等待接收任务消息...")
}
}
log.Println("Redis连接测试成功")
log.Println("客户端初始化完成")
}