房间热播计算任务
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

182 lines
3.8 KiB

package biz
import (
"context"
gv1 "git.gz.internal.jumaiyx.cn/jm/jmproto/gift/v1"
v1 "git.gz.internal.jumaiyx.cn/jm/jmproto/room/v1"
"git.gz.internal.jumaiyx.cn/job/room-hot-compute/internal/data"
"git.gz.internal.jumaiyx.cn/job/room-hot-compute/internal/util"
jredis "git.gz.internal.jumaiyx.cn/job/room-hot-compute/pkg/redis"
"git.gz.internal.jumaiyx.cn/pkg/log"
"github.com/go-redis/redis/v8"
"github.com/google/wire"
"sync"
"time"
)
// Redis sorted-set keys used by the hot-score pipeline.
const (
// NumberKey is the sorted set that getScores pages through; each member is
// parsed as a room ID and its score is taken as that room's number value.
NumberKey = "room:number:ranking"
// HotKey is the sorted set that statistics writes each room's computed hot
// score to.
HotKey = "room:hot:ranking"
)
// wg is a package-level WaitGroup, so its counter is shared by every user in
// this package rather than being scoped to a single call.
// NOTE(review): sync.WaitGroup must not see a new Add while a previous Wait is
// still pending; a WaitGroup scoped to the function that waits on it avoids
// that hazard — confirm whether this package-level instance is still needed.
var wg sync.WaitGroup
// ProviderBizSet is the wire provider set for this package; it exposes NewBiz
// as the constructor for *Biz.
var ProviderBizSet = wire.NewSet(NewBiz)
// hotData carries the per-room inputs of the hot-score calculation as they
// move through the pipeline.
type hotData struct {
// roomId identifies the room.
roomId int64
// number is the room's score from the NumberKey sorted set, converted with
// util.Float64IncrTarnsInt64 in getScores.
number int64
// diamond is the TotalAmountDiamond returned by QueryGiveRoomGift for the
// room; it is filled in by getRoomDiamond and left zero by getScores.
diamond int64
}
// hot is one page of rooms handed from getScores to getRoomDiamond over
// Biz.hotChan.
type hot struct {
// roomIds lists the room IDs in this page; getRoomDiamond sends it as the
// Ids field of the ListRoom request.
roomIds []int64
// list holds one hotData per room in this page with roomId and number set.
list []*hotData
}
// Biz holds the dependencies and internal channels of the room hot-score
// pipeline driven by Check.
type Biz struct {
// log receives the error and info messages emitted by the pipeline stages.
log log.Logger
// room is the room service client; getRoomDiamond calls ListRoom on it.
room v1.RoomClient
// gift is the gift service client; getRoomDiamond calls QueryGiveRoomGift on it.
gift gv1.GiftClient
// redis is the cache wrapper used to read NumberKey and write HotKey.
redis *jredis.Cache
// hotChan carries pages of rooms from getScores to getRoomDiamond.
hotChan chan *hot
// computeChan carries per-room data from getRoomDiamond to statistics.
computeChan chan *hotData
}
// NewBiz builds a Biz from the injected logger, room client, gift client and
// data layer. The Redis cache is taken from data.Cache, and both internal
// pipeline channels are created here with the same buffer capacity.
func NewBiz(logger log.Logger, roomServer v1.RoomClient, gift gv1.GiftClient, data *data.Data) *Biz {
	// Buffer capacity shared by hotChan and computeChan.
	const queueSize = 100

	b := new(Biz)
	b.log = logger
	b.room = roomServer
	b.gift = gift
	b.redis = data.Cache
	b.hotChan = make(chan *hot, queueSize)
	b.computeChan = make(chan *hotData, queueSize)
	return b
}
// Check runs one pass of the hot-score pipeline and blocks until all three
// stages have returned:
//
//  1. getScores reads the per-room number values and emits them in pages;
//  2. getRoomDiamond looks up each room's start time and gift spend;
//  3. statistics computes the hot score and writes it to the ranking.
//
// ctx is passed unchanged to every stage.
func (biz *Biz) Check(ctx context.Context) {
	// The WaitGroup is local to this call. A shared package-level WaitGroup
	// would let overlapping Check calls Add to a counter another call is
	// already waiting on, which sync.WaitGroup does not allow.
	var stages sync.WaitGroup

	run := func(stage func(context.Context)) {
		stages.Add(1)
		go func() {
			defer stages.Done()
			stage(ctx)
		}()
	}

	run(biz.getScores)      // fetch room head counts
	run(biz.getRoomDiamond) // fetch room start time and room spend
	run(biz.statistics)     // compute the hot value and write the ranking

	stages.Wait()
}
// getScores pages through the NumberKey sorted set in descending score order
// and sends each page to hotChan as a *hot batch.
//
// It returns when Redis reports an error, when a page comes back empty (end of
// the set), or when ctx is cancelled while waiting for room in hotChan.
func (biz *Biz) getScores(ctx context.Context) {
	// Offset added to the start rank to get the stop rank. The next page
	// starts at stop+1, so the range is presumably inclusive (as in Redis
	// ZREVRANGE), i.e. three members per page — TODO confirm against the
	// jredis wrapper.
	const pageSpan = 2

	var start int64
	for {
		stop := start + pageSpan
		scores, err := biz.redis.ZRevRangeWithScores(ctx, NumberKey, start, stop)
		if err != nil {
			biz.log.Errorf("error:%v", err)
			return
		}
		if len(scores) == 0 {
			biz.log.Info("Get scores out")
			return
		}

		ids := make([]int64, 0, len(scores))
		list := make([]*hotData, 0, len(scores))
		for _, score := range scores {
			// A checked assertion: a non-string member must not panic the
			// whole goroutine (and with it the process).
			member, ok := score.Member.(string)
			if !ok {
				biz.log.Errorf("error:unexpected member type %T in %s", score.Member, NumberKey)
				continue
			}
			roomId := util.StringTarnsInt64(member)
			ids = append(ids, roomId)
			list = append(list, &hotData{
				roomId: roomId,
				number: util.Float64IncrTarnsInt64(score.Score),
			})
		}

		// Never forward an empty batch: it would reach ListRoom with an empty
		// Ids filter.
		if len(ids) > 0 {
			// Without the ctx case this send blocks forever once the buffer
			// is full and the consumer has already exited.
			select {
			case biz.hotChan <- &hot{roomIds: ids, list: list}:
			case <-ctx.Done():
				biz.log.Errorf("error:%v", ctx.Err())
				return
			}
		}

		start = stop + 1
	}
}
// getRoomDiamond consumes room batches from hotChan. For each batch it lists
// the rooms with Status 1 among the batch IDs, queries each returned room's
// gift total since its start time, and sends one hotData per room to
// computeChan.
//
// A failed ListRoom call skips that batch and a failed gift query skips that
// room; neither stops the stage, so the producer is never left blocked on a
// consumer that has gone away. The stage returns when ctx is cancelled or when
// no batch arrives within the idle timeout.
func (biz *Biz) getRoomDiamond(ctx context.Context) {
	// How long to wait for the next batch before treating the run as finished.
	const idleTimeout = 10 * time.Second

	for {
		select {
		case <-ctx.Done():
			biz.log.Errorf("error:%v", ctx.Err())
			return

		case h := <-biz.hotChan:
			// Fetch the rooms of this batch (provides each room's start time).
			room, err := biz.room.ListRoom(ctx, &v1.ListRoomRequest{
				Status: 1,
				Ids:    h.roomIds,
			})
			if err != nil {
				biz.log.Errorf("error:%v", err)
				// Skip this batch but keep draining hotChan.
				continue
			}

			// Index the batch's number values by room ID for the lookup below.
			roomNumber := make(map[int64]int64, len(h.list))
			for _, v := range h.list {
				roomNumber[v.roomId] = v.number
			}

			for _, r := range room.Data {
				// Fetch the room's gift spend since it started.
				startTime := util.TraStrToUnix(r.StartTime)
				gift, err := biz.gift.QueryGiveRoomGift(ctx, &gv1.QueryGiveRoomGiftReq{
					Uid:           0,
					RoomIdList:    []int64{r.Id},
					CreateAtStart: &startTime,
				})
				if err != nil {
					biz.log.Errorf("error:%v", err)
					continue
				}

				item := &hotData{
					roomId:  r.Id,
					number:  roomNumber[r.Id],
					diamond: gift.TotalAmountDiamond,
				}
				// Without the ctx case this send blocks forever once the
				// buffer is full and statistics has already exited.
				select {
				case biz.computeChan <- item:
				case <-ctx.Done():
					biz.log.Errorf("error:%v", ctx.Err())
					return
				}
			}

		case <-time.After(idleTimeout):
			biz.log.Info("Get room diamond out")
			return
		}
	}
}
// statistics consumes per-room data from computeChan and writes each room's
// hot score to the HotKey sorted set, with the room ID as the member.
//
// Intended formula (from the original notes):
//
//	live head count * 10 + spend * 10 - custom value = hot score
//	initial score + score from user interaction - score decayed over time
//
// Only the first two terms are applied here; the "custom value" term is not
// subtracted by this code.
//
// A failed write is logged and the stage moves on to the next room. The stage
// returns when ctx is cancelled or when nothing arrives within the idle
// timeout.
func (biz *Biz) statistics(ctx context.Context) {
	// How long to wait for the next room before treating the run as finished.
	const idleTimeout = 10 * time.Second

	for {
		select {
		case <-ctx.Done():
			// Stop promptly on cancellation instead of waiting out the idle
			// timeout.
			biz.log.Errorf("error:%v", ctx.Err())
			return

		case h := <-biz.computeChan:
			_, err := biz.redis.ZAdd(ctx, HotKey, &redis.Z{
				Member: h.roomId,
				Score:  float64(h.number*10 + h.diamond*10),
			})
			if err != nil {
				biz.log.Errorf("Set room hot ranking err:%v", err)
			}

		case <-time.After(idleTimeout):
			biz.log.Info("Statistics out")
			return
		}
	}
}