热播队列处理
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

196 lines
5.6 KiB

1 year ago
package task
import (
"context"
"fmt"
giftv1 "git.gz.internal.jumaiyx.cn/jm/jmproto/gift/v1"
livev1 "git.gz.internal.jumaiyx.cn/jm/jmproto/live/v1"
v2 "git.gz.internal.jumaiyx.cn/jm/jmproto/room/v2"
"git.gz.internal.jumaiyx.cn/pkg/client"
configv2 "git.gz.internal.jumaiyx.cn/pkg/config/v2"
"git.gz.internal.jumaiyx.cn/pkg/config/v2/cproto"
"git.gz.internal.jumaiyx.cn/pkg/jtime"
"github.com/redis/go-redis/v9"
"strconv"
"sync"
"time"
)
// Redis keys used by the hotspot task in this file.
const (
// hotspotQueueKey is the Redis list that NewTask pops pending room IDs from.
hotspotQueueKey = "room:hotspot:sort"
// hotKey is the Redis sorted set that NewTask writes each room's computed hot value into.
hotKey = "room:hot:ranking"
)
// Package-level state backing the shared Redis client.
var (
// redisInstance is the client returned by NewRedis; it is assigned inside once.Do.
redisInstance *redis.Client
// once guards the one-time creation of redisInstance in NewRedis.
once sync.Once
)
// NewRedis returns the shared Redis client, creating it on the first call.
//
// The Redis configuration is loaded once, inside the sync.Once guard, instead
// of on every call: only the first call's configuration was ever used to build
// the client, so every later load was wasted work. A configuration load
// failure is printed instead of being silently dropped; the client is still
// built from whatever fields were populated, which matches the previous
// fallback behavior.
func NewRedis() *redis.Client {
	once.Do(func() {
		redisConfig := &cproto.Redis{}
		if err := configv2.Get(context.Background(), configv2.Redis, redisConfig); err != nil {
			fmt.Println("获取redis配置失败:", err.Error())
		}
		redisInstance = redis.NewClient(&redis.Options{
			Addr:         redisConfig.Addr,
			Password:     redisConfig.Password,
			PoolSize:     10,
			ReadTimeout:  100 * time.Second,
			WriteTimeout: 100 * time.Second,
		})
	})
	return redisInstance
}
func NewTask() {
for {
redisClient := NewRedis()
ctx := context.Background()
lLen := redisClient.LLen(ctx, hotspotQueueKey).Val()
if lLen == 0 {
1 year ago
//fmt.Println("等待")
1 year ago
time.Sleep(time.Second * 10)
continue
1 year ago
}
result, err := redisClient.RPop(ctx, hotspotQueueKey).Result()
if err != nil {
continue
}
id, _ := strconv.Atoi(result)
if id == 0 {
continue
}
roomId := int64(id)
var (
value int64 // 房间初始热度值
currentOnlineTotal int64 // 实时在线人数总值
todayOnlineTotal int64 // 一天的人数总值
amountTotal int64 // 金额总数值
//lastAmountTotal int64 // 上次金额总值
)
now := time.Now()
// 获取房间信息
1 year ago
roomClientV2, roomClientV2Close, err := client.GetRoomClientV2(ctx)
1 year ago
//roomClientV2, err := client.GetRoomClientLocalV2("127.0.0.1:9001")
if err != nil {
1 year ago
fmt.Println("获取房间服务失败:", err.Error())
continue
1 year ago
}
1 year ago
defer roomClientV2Close()
1 year ago
roomRsp, err := roomClientV2.GetRoom(ctx, &v2.GetRoomReq{
RoomId: roomId,
})
if err != nil {
1 year ago
fmt.Println("获取房间失败:", err.Error())
1 year ago
continue
}
// 忽略下播的房间和广场房间
if roomRsp.Status != v2.RoomStatus_RoomStatusUp || roomRsp.RoomType == v2.RoomType_Hall {
1 year ago
continue
1 year ago
}
// 获取房间
// 获取房间初始值
value = 100
1 year ago
liveClient, liveClientClose, err := client.GetLiveClient(ctx)
1 year ago
if err != nil {
1 year ago
fmt.Println("获取直播服务失败:", err.Error())
1 year ago
continue
}
1 year ago
defer liveClientClose()
defaultHotValueResp, err := liveClient.GetDefaultHotValueList(ctx, &livev1.GetDefaultHotValueListReq{
1 year ago
RoomList: []int64{roomId},
})
if err != nil {
1 year ago
fmt.Println("获取热度默认值失败:", err.Error())
1 year ago
continue
}
if len(defaultHotValueResp.Items) != 0 {
value = int64(defaultHotValueResp.Items[0].HotValue)
}
// 获取房间在线人数
onlineCountRsp, err := roomClientV2.GetOnlineListCount(ctx, &v2.GetOnlineListReq{
RoomIdList: []int64{roomId},
StatusList: []int32{1},
})
if err != nil {
1 year ago
fmt.Println("获取在线总数失败:", err.Error())
1 year ago
continue
}
currentOnlineTotal = onlineCountRsp.Total
// 一天之前的时间
//todayMinutesSecond := (now.Unix() - 10*60) * 1000
todayMinutes := jtime.TimeTimestamp(jtime.SecondsAgoTime(now, -(86400)))
// 一个星期前的时间
prevWeekTime := jtime.SecondsAgoTime(time.Now(), -86400*7)
startTime := prevWeekTime.UnixMilli()
// 获取房间总流水
1 year ago
giftClient, giftClientClose, err := client.GetGiftClient(ctx)
1 year ago
if err != nil {
1 year ago
fmt.Println("获取礼物服务失败:", err.Error())
continue
1 year ago
}
1 year ago
defer giftClientClose()
1 year ago
giftRsp, err := giftClient.QueryGiveRoomGift(ctx, &giftv1.QueryGiveRoomGiftReq{
RoomIdList: []int64{roomId},
CreateAtStart: &startTime,
})
if err != nil {
1 year ago
fmt.Println("获取礼物数据失败:", err.Error())
continue
1 year ago
}
amountTotal = giftRsp.TotalAmountDiamond + giftRsp.TotalAmountGoldCoins
// 获取一天的总人数
lastOnlineCountRsp, err := roomClientV2.GetOnlineListCount(ctx, &v2.GetOnlineListReq{
RoomIdList: []int64{roomId},
CreatedStart: todayMinutes,
Deleted: true,
})
if err != nil {
1 year ago
fmt.Println("获取一天总人数失败:", err.Error())
continue
1 year ago
}
todayOnlineTotal = lastOnlineCountRsp.Total
// 初始热度值 + (一个小时内总人数 * 10 + 一个星期内总流水) - 时间段内((十分钟之前总数 - 现在总人数) + (十分钟之前总流水 - 总流水【一个星期内】)) = 热度值
// 总人数【一天内】+(实时在线人数/总人数)* 10 = 人数热度值
// 总流水【一个星期内】 * 10 = 流水热度值
//currentOnlineTotal + (todayOnlineTotal * todayOnlineTotal) * 10
1 year ago
//fmt.Println("amountTotal:", (todayOnlineTotal*todayOnlineTotal)*10)
//fmt.Println("amountTotal:", amountTotal*10)
1 year ago
v := value + currentOnlineTotal + (todayOnlineTotal*todayOnlineTotal)*10 + amountTotal*10
1 year ago
//fmt.Println(v)
1 year ago
//usecase.logger.Info("实际人数:", onlineTotal)
//usecase.logger.Info("总流水:", amountTotal)
//usecase.logger.Info("十分钟之前总数:", lastOnlineTotal)
//usecase.logger.Info("十分钟之前总流水:", lastAmountTotal)
//usecase.logger.Info("热度值:", v)
1 year ago
err = redisClient.ZAdd(ctx, hotKey, redis.Z{
1 year ago
Member: roomId,
Score: float64(v),
}).Err()
if err != nil {
1 year ago
fmt.Println("写入热度值失败:", err.Error())
1 year ago
continue
}
// 写入数据库
hotValue := int32(v)
_, err = roomClientV2.UpdateRoom(ctx, &v2.UpdateRoomReq{
Id: roomId,
Data: &v2.UpdateRoomReq_Data{
HotValue: &hotValue,
},
})
if err != nil {
continue
}
}
}