|
|
|
package task
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
v2 "git.gz.internal.jumaiyx.cn/jm/jmproto/room/v2"
|
|
|
|
"git.gz.internal.jumaiyx.cn/pkg/client"
|
|
|
|
configv2 "git.gz.internal.jumaiyx.cn/pkg/config/v2"
|
|
|
|
"git.gz.internal.jumaiyx.cn/pkg/config/v2/cproto"
|
|
|
|
k8sclient "git.gz.internal.jumaiyx.cn/pkg/k8s-client/v2"
|
|
|
|
"git.gz.internal.jumaiyx.cn/pkg/webhook/wechat"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
|
|
"strconv"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
hotKey = "room:hot:ranking"
|
|
|
|
)
|
|
|
|
|
|
|
|
var (
|
|
|
|
redisInstance *redis.Client
|
|
|
|
once sync.Once
|
|
|
|
)
|
|
|
|
|
|
|
|
func NewRedis() *redis.Client {
|
|
|
|
|
|
|
|
once.Do(func() {
|
|
|
|
redisConfig := &cproto.Redis{}
|
|
|
|
_ = configv2.Get(configv2.Redis, redisConfig)
|
|
|
|
redisInstance = redis.NewClient(&redis.Options{
|
|
|
|
Addr: redisConfig.Addr,
|
|
|
|
Password: redisConfig.Password,
|
|
|
|
PoolSize: 10,
|
|
|
|
ReadTimeout: 100 * time.Second,
|
|
|
|
WriteTimeout: 100 * time.Second,
|
|
|
|
})
|
|
|
|
})
|
|
|
|
return redisInstance
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewTask() {
|
|
|
|
start := time.Now().Unix()
|
|
|
|
err := task(0, 10000)
|
|
|
|
times := time.Now().Unix() - start
|
|
|
|
if times == 0 {
|
|
|
|
times = 1
|
|
|
|
}
|
|
|
|
wechatHook(times, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
func task(start, end int64) error {
|
|
|
|
redisClient := NewRedis()
|
|
|
|
result, err := redisClient.ZRange(context.Background(), hotKey, start, end).Result()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if len(result) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
var (
|
|
|
|
members []redis.Z
|
|
|
|
ids []int64
|
|
|
|
)
|
|
|
|
for _, id := range result {
|
|
|
|
i, _ := strconv.Atoi(id)
|
|
|
|
if i != 0 {
|
|
|
|
ids = append(ids, int64(i))
|
|
|
|
}
|
|
|
|
members = append(members, redis.Z{
|
|
|
|
Member: id,
|
|
|
|
Score: 0,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
err = redisClient.ZAdd(context.Background(), hotKey, members...).Err()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
roomClient, roomClientClose, err := client.GetRoomClientV2(context.Background())
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
defer roomClientClose()
|
|
|
|
_, err = roomClient.IncrRoomHotspot(context.Background(), &v2.IncrRoomHotspotReq{
|
|
|
|
RoomIdList: ids,
|
|
|
|
HotValue: 0,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// 递增查询
|
|
|
|
start = end + 1
|
|
|
|
end = start + 10000
|
|
|
|
|
|
|
|
return task(start, end)
|
|
|
|
}
|
|
|
|
|
|
|
|
func wechatHook(times int64, err error) {
|
|
|
|
envi := "测试"
|
|
|
|
if k8sclient.Environment() == k8sclient.MasterNamespace {
|
|
|
|
envi = "正式"
|
|
|
|
}
|
|
|
|
hook := wechat.NewMarkdown("https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=00dd9216-1f50-42be-8fe1-e66640d7bb27").
|
|
|
|
Title(1, "直播间热度清理").Br().
|
|
|
|
Text("环境:" + envi).Br().
|
|
|
|
Text("耗时:" + strconv.Itoa(int(times)) + "s").Br().Text("状态:")
|
|
|
|
if err != nil {
|
|
|
|
hook = hook.FontColor("失败", wechat.Warning).Br().Text("异常:").FontColor(err.Error(), wechat.Warning)
|
|
|
|
} else {
|
|
|
|
hook = hook.FontColor("完成", wechat.Info)
|
|
|
|
}
|
|
|
|
_ = hook.Send()
|
|
|
|
}
|