You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
117 lines
2.7 KiB
117 lines
2.7 KiB
package task |
|
|
|
import ( |
|
"context" |
|
v2 "git.gz.internal.jumaiyx.cn/jm/jmproto/room/v2" |
|
"git.gz.internal.jumaiyx.cn/pkg/client" |
|
configv2 "git.gz.internal.jumaiyx.cn/pkg/config/v2" |
|
"git.gz.internal.jumaiyx.cn/pkg/config/v2/cproto" |
|
k8sclient "git.gz.internal.jumaiyx.cn/pkg/k8s-client/v2" |
|
"git.gz.internal.jumaiyx.cn/pkg/webhook/wechat" |
|
"github.com/redis/go-redis/v9" |
|
"strconv" |
|
"sync" |
|
"time" |
|
) |
|
|
|
const ( |
|
hotKey = "room:hot:ranking" |
|
) |
|
|
|
var ( |
|
redisInstance *redis.Client |
|
once sync.Once |
|
) |
|
|
|
func NewRedis() *redis.Client { |
|
|
|
once.Do(func() { |
|
redisConfig := &cproto.Redis{} |
|
_ = configv2.Get(configv2.Redis, redisConfig) |
|
redisInstance = redis.NewClient(&redis.Options{ |
|
Addr: redisConfig.Addr, |
|
Password: redisConfig.Password, |
|
PoolSize: 10, |
|
ReadTimeout: 100 * time.Second, |
|
WriteTimeout: 100 * time.Second, |
|
}) |
|
}) |
|
return redisInstance |
|
} |
|
|
|
func NewTask() { |
|
start := time.Now().Unix() |
|
err := task(0, 10000) |
|
times := time.Now().Unix() - start |
|
if times == 0 { |
|
times = 1 |
|
} |
|
wechatHook(times, err) |
|
} |
|
|
|
func task(start, end int64) error { |
|
redisClient := NewRedis() |
|
result, err := redisClient.ZRangeWithScores(context.Background(), hotKey, start, end).Result() |
|
if err != nil { |
|
return err |
|
} |
|
if len(result) == 0 { |
|
return nil |
|
} |
|
var ( |
|
members []redis.Z |
|
ids []int64 |
|
) |
|
for _, val := range result { |
|
if val.Score > 0 { |
|
id, _ := strconv.Atoi(val.Member.(string)) |
|
if id != 0 { |
|
ids = append(ids, int64(id)) |
|
members = append(members, redis.Z{ |
|
Member: id, |
|
Score: 0, |
|
}) |
|
} |
|
} |
|
} |
|
if len(ids) != 0 { |
|
err = redisClient.ZAdd(context.Background(), hotKey, members...).Err() |
|
if err != nil { |
|
return err |
|
} |
|
roomClient, roomClientClose, err := client.GetRoomClientV2(context.Background()) |
|
if err != nil { |
|
return err |
|
} |
|
defer func() { _ = roomClientClose() }() |
|
_, err = roomClient.IncrRoomHotspot(context.Background(), &v2.IncrRoomHotspotReq{ |
|
RoomIdList: ids, |
|
HotValue: 0, |
|
}) |
|
if err != nil { |
|
return err |
|
} |
|
} |
|
// 递增查询 |
|
start = end + 1 |
|
end = start + 10000 |
|
|
|
return task(start, end) |
|
} |
|
|
|
func wechatHook(times int64, err error) { |
|
envi := "测试" |
|
if k8sclient.Environment() == k8sclient.MasterNamespace { |
|
envi = "正式" |
|
} |
|
hook := wechat.NewMarkdown("https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=00dd9216-1f50-42be-8fe1-e66640d7bb27"). |
|
Title(1, "直播间热度清理").Br(). |
|
Text("环境:" + envi).Br(). |
|
Text("耗时:" + strconv.Itoa(int(times)) + "s").Br().Text("状态:") |
|
if err != nil { |
|
hook = hook.FontColor("失败", wechat.Warning).Br().Text("异常:").FontColor(err.Error(), wechat.Warning) |
|
} else { |
|
hook = hook.FontColor("完成", wechat.Info) |
|
} |
|
_ = hook.Send() |
|
}
|
|
|