// Package task contains a job that walks the room hot-ranking sorted set in
// Redis, resets every positive score to zero and then reports a hot value
// derived from each room's current online total to the room service.
package task

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	onlinev1 "git.gz.internal.jumaiyx.cn/jm/jmproto/online/v1"
	v2 "git.gz.internal.jumaiyx.cn/jm/jmproto/room/v2"
	"git.gz.internal.jumaiyx.cn/pkg/client"
	configv2 "git.gz.internal.jumaiyx.cn/pkg/config/v2"
	"git.gz.internal.jumaiyx.cn/pkg/config/v2/cproto"
	k8sclient "git.gz.internal.jumaiyx.cn/pkg/k8s-client/v2"
	"git.gz.internal.jumaiyx.cn/pkg/webhook/wechat"
	"github.com/redis/go-redis/v9"
)

const (
	// hotKey is the Redis sorted set that holds the room hot ranking.
	hotKey = "room:hot:ranking"

	// pageSpan is the rank distance between the first and last member of a
	// page. ZRANGE bounds are inclusive, so a page holds up to pageSpan+1
	// members.
	pageSpan = 10000

	// hotValuePerOnline is the multiplier applied to a room's online total to
	// obtain the hot value sent to the room service.
	hotValuePerOnline = 10

	// wechatWebhookURL is the WeCom robot endpoint used for failure alerts.
	// NOTE(review): the key in this URL is a credential committed to source;
	// consider loading it from configuration instead.
	wechatWebhookURL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=00dd9216-1f50-42be-8fe1-e66640d7bb27"
)

var (
	redisInstance *redis.Client
	once          sync.Once
)

// NewRedis returns the process-wide Redis client, creating it on first use.
//
// The client is built from the configv2.Redis configuration entry. The error
// returned by configv2.Get is discarded because this function has no error
// return; presumably a failed load leaves redisConfig empty, in which case the
// problem only surfaces on the first Redis command — TODO confirm.
func NewRedis() *redis.Client {
	once.Do(func() {
		redisConfig := &cproto.Redis{}
		_ = configv2.Get(configv2.Redis, redisConfig)
		redisInstance = redis.NewClient(&redis.Options{
			Addr:         redisConfig.Addr,
			Password:     redisConfig.Password,
			PoolSize:     10,
			ReadTimeout:  100 * time.Second,
			WriteTimeout: 100 * time.Second,
		})
	})
	return redisInstance
}

// NewTask runs one full pass over the hot ranking and reports the outcome
// through wechatHook. The reported duration is in whole seconds and is never
// less than one.
func NewTask() {
	begin := time.Now()
	err := task(0, pageSpan)

	// time.Since uses the monotonic clock, so the duration is not affected by
	// wall-clock adjustments made while the job runs.
	elapsed := int64(time.Since(begin) / time.Second)
	if elapsed < 1 {
		elapsed = 1
	}
	wechatHook(elapsed, err)
}

// task processes the ranking page by page, beginning with the inclusive rank
// range [start, end]. Each following page starts right after the previous one
// and spans pageSpan ranks. It stops at the first empty page and returns the
// first error encountered.
func task(start, end int64) error {
	ctx := context.Background()
	// The client is a shared singleton and is deliberately left open.
	rdb := NewRedis()

	// Iterate rather than recurse: each page runs in its own call, so the
	// service clients it opens are released before the next page starts and
	// the stack does not grow with the size of the sorted set.
	for {
		read, err := resetPage(ctx, rdb, start, end)
		if err != nil {
			return err
		}
		if read == 0 {
			return nil
		}
		start = end + 1
		end = start + pageSpan
	}
}

// resetPage handles the members ranked in [start, end] of hotKey. Members with
// a positive score and a non-zero integer room ID have their score set to zero
// and are then reported to the room service with a hot value of
// onlineTotal*hotValuePerOnline. It returns how many members the range query
// produced; zero means the ranking has been exhausted.
func resetPage(ctx context.Context, rdb *redis.Client, start, end int64) (int, error) {
	entries, err := rdb.ZRangeWithScores(ctx, hotKey, start, end).Result()
	if err != nil {
		return 0, fmt.Errorf("reading %s ranks %d-%d: %w", hotKey, start, end, err)
	}
	if len(entries) == 0 {
		return 0, nil
	}

	ids := make([]int64, 0, len(entries))
	zeroed := make([]redis.Z, 0, len(entries))
	for _, entry := range entries {
		if entry.Score <= 0 {
			continue
		}
		// Skip members that are not strings instead of panicking on a bare
		// type assertion.
		raw, ok := entry.Member.(string)
		if !ok {
			continue
		}
		// Members that are not valid integers, or that parse to zero, are
		// not room IDs and are ignored.
		id, parseErr := strconv.ParseInt(raw, 10, 64)
		if parseErr != nil || id == 0 {
			continue
		}
		ids = append(ids, id)
		zeroed = append(zeroed, redis.Z{Member: id, Score: 0})
	}
	if len(ids) == 0 {
		return len(entries), nil
	}

	if err := rdb.ZAdd(ctx, hotKey, zeroed...).Err(); err != nil {
		return 0, fmt.Errorf("resetting %s scores: %w", hotKey, err)
	}

	// Online totals come from the online service; an earlier revision used
	// the live service's GetLiveOnlineTotal for the same purpose.
	onlineClient, closeOnline, err := client.GetOnlineClient(ctx)
	if err != nil {
		return 0, fmt.Errorf("getting online client: %w", err)
	}
	// Closing is best effort: a failure here cannot be acted upon.
	defer func() { _ = closeOnline() }()

	totals, err := onlineClient.GetLiveOnlineTotal(ctx, &onlinev1.GetLiveOnlineTotalRequest{
		RoomIdList: ids,
	})
	if err != nil {
		return 0, fmt.Errorf("getting live online totals: %w", err)
	}

	roomClient, closeRoom, err := client.GetRoomClientV2(ctx)
	if err != nil {
		return 0, fmt.Errorf("getting room client: %w", err)
	}
	defer func() { _ = closeRoom() }()

	onlineByRoom := totals.Items
	items := make([]*v2.IncrRoomHotspotReq_Item, 0, len(ids))
	for _, id := range ids {
		// Rooms missing from the response yield the map's zero value.
		items = append(items, &v2.IncrRoomHotspotReq_Item{
			RoomId:   id,
			HotValue: int32(onlineByRoom[id]) * hotValuePerOnline,
		})
	}

	if _, err := roomClient.IncrRoomHotspot(ctx, &v2.IncrRoomHotspotReq{Items: items}); err != nil {
		return 0, fmt.Errorf("incrementing room hotspot: %w", err)
	}
	return len(entries), nil
}

// wechatHook posts a failure alert to the WeCom robot. It does nothing when
// err is nil: successful runs are not reported. times is the job duration in
// seconds.
func wechatHook(times int64, err error) {
	if err == nil {
		return
	}

	envi := "测试"
	if k8sclient.Environment() == k8sclient.MasterNamespace {
		envi = "正式"
	}

	hook := wechat.NewMarkdown(wechatWebhookURL).
		Title(1, "直播间热度清理").Br().
		Text("环境:" + envi).Br().
		Text("耗时:" + strconv.FormatInt(times, 10) + "s").Br().
		Text("状态:").
		FontColor("失败", wechat.Warning).Br().
		Text("异常:").
		FontColor(err.Error(), wechat.Warning)

	// Alert delivery is best effort; there is nowhere to report a failure.
	_ = hook.Send()
}