You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
183 lines
3.8 KiB
183 lines
3.8 KiB
2 years ago
|
package biz
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
gv1 "git.gz.internal.jumaiyx.cn/jm/jmproto/gift/v1"
|
||
|
v1 "git.gz.internal.jumaiyx.cn/jm/jmproto/room/v1"
|
||
|
"git.gz.internal.jumaiyx.cn/job/room-hot-compute/internal/data"
|
||
|
"git.gz.internal.jumaiyx.cn/job/room-hot-compute/internal/util"
|
||
|
jredis "git.gz.internal.jumaiyx.cn/job/room-hot-compute/pkg/redis"
|
||
|
"git.gz.internal.jumaiyx.cn/pkg/log"
|
||
|
"github.com/go-redis/redis/v8"
|
||
|
"github.com/google/wire"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// Redis sorted-set keys used by the room hot-ranking job.
const (
	// NumberKey is the sorted set getScores pages through. Each member is
	// parsed as a room ID and each score is converted to that room's number.
	NumberKey = "room:number:ranking"
	// HotKey is the sorted set statistics writes the computed hot scores to.
	HotKey = "room:hot:ranking"
)
|
||
|
|
||
|
// wg is a package-level WaitGroup, so it is shared by every Biz value in the
// process rather than owned by a single one.
// NOTE(review): any callers that Add/Wait on it concurrently will wait on each
// other's goroutines — confirm it is only driven from one place at a time.
var wg sync.WaitGroup
|
||
|
|
||
|
// ProviderBizSet is biz providers.
|
||
|
var ProviderBizSet = wire.NewSet(NewBiz)
|
||
|
|
||
|
// hotData carries the per-room inputs from which a hot score is computed.
type hotData struct {
	roomId  int64 // room identifier
	number  int64 // value converted from the room's score in the NumberKey set
	diamond int64 // total gift diamonds reported for the room
}
|
||
|
type hot struct {
|
||
|
roomIds []int64
|
||
|
list []*hotData
|
||
|
}
|
||
|
|
||
|
type Biz struct {
|
||
|
log log.Logger
|
||
|
room v1.RoomClient
|
||
|
gift gv1.GiftClient
|
||
|
redis *jredis.Cache
|
||
|
|
||
|
hotChan chan *hot
|
||
|
computeChan chan *hotData
|
||
|
}
|
||
|
|
||
|
func NewBiz(logger log.Logger, roomServer v1.RoomClient, gift gv1.GiftClient, data *data.Data) *Biz {
|
||
|
return &Biz{
|
||
|
log: logger,
|
||
|
room: roomServer,
|
||
|
gift: gift,
|
||
|
redis: data.Cache,
|
||
|
hotChan: make(chan *hot, 100),
|
||
|
computeChan: make(chan *hotData, 100),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (biz *Biz) Check(ctx context.Context) {
|
||
|
wg.Add(3)
|
||
|
// 获取去房间人数值
|
||
|
go func() {
|
||
|
defer wg.Done()
|
||
|
biz.getScores(ctx)
|
||
|
}()
|
||
|
|
||
|
// 获取房间开播时间
|
||
|
// 获取房间消费数值
|
||
|
go func() {
|
||
|
defer wg.Done()
|
||
|
biz.getRoomDiamond(ctx)
|
||
|
}()
|
||
|
|
||
|
// 获取固定热度参数值
|
||
|
|
||
|
// 统计热点值
|
||
|
//写入排序
|
||
|
go func() {
|
||
|
defer wg.Done()
|
||
|
biz.statistics(ctx)
|
||
|
}()
|
||
|
|
||
|
wg.Wait()
|
||
|
}
|
||
|
|
||
|
func (biz *Biz) getScores(ctx context.Context) {
|
||
|
// 获取去房间人数值
|
||
|
var (
|
||
|
o int64
|
||
|
)
|
||
|
for {
|
||
|
l := o + 2
|
||
|
scores, err := biz.redis.ZRevRangeWithScores(ctx, NumberKey, o, l)
|
||
|
if err != nil {
|
||
|
biz.log.Errorf("error:%v", err)
|
||
|
return
|
||
|
}
|
||
|
if len(scores) == 0 {
|
||
|
biz.log.Info("Get scores out")
|
||
|
return
|
||
|
}
|
||
|
var (
|
||
|
ids []int64
|
||
|
list []*hotData
|
||
|
)
|
||
|
for _, score := range scores {
|
||
|
roomId := util.StringTarnsInt64(score.Member.(string))
|
||
|
ids = append(ids, roomId)
|
||
|
list = append(list, &hotData{
|
||
|
roomId: roomId,
|
||
|
number: util.Float64IncrTarnsInt64(score.Score),
|
||
|
})
|
||
|
}
|
||
|
biz.hotChan <- &hot{
|
||
|
roomIds: ids,
|
||
|
list: list,
|
||
|
}
|
||
|
o = l + 1
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (biz *Biz) getRoomDiamond(ctx context.Context) {
|
||
|
for {
|
||
|
select {
|
||
|
case h := <-biz.hotChan:
|
||
|
// 获取房间开播时间
|
||
|
room, err := biz.room.ListRoom(ctx, &v1.ListRoomRequest{
|
||
|
Status: 1,
|
||
|
Ids: h.roomIds,
|
||
|
})
|
||
|
if err != nil {
|
||
|
biz.log.Errorf("error:%v", err)
|
||
|
return
|
||
|
}
|
||
|
roomNumber := make(map[int64]int64, len(h.list))
|
||
|
for _, v := range h.list {
|
||
|
roomNumber[v.roomId] = v.number
|
||
|
}
|
||
|
for _, r := range room.Data {
|
||
|
// 获取房间消费数值
|
||
|
startTime := util.TraStrToUnix(r.StartTime)
|
||
|
gift, err := biz.gift.QueryGiveRoomGift(ctx, &gv1.QueryGiveRoomGiftReq{
|
||
|
Uid: 0,
|
||
|
RoomIdList: []int64{r.Id},
|
||
|
CreateAtStart: &startTime,
|
||
|
})
|
||
|
if err != nil {
|
||
|
biz.log.Errorf("error:%v", err)
|
||
|
continue
|
||
|
}
|
||
|
biz.computeChan <- &hotData{
|
||
|
roomId: r.Id,
|
||
|
number: roomNumber[r.Id],
|
||
|
diamond: gift.TotalAmountDiamond,
|
||
|
}
|
||
|
}
|
||
|
case <-time.After(time.Second * 10):
|
||
|
biz.log.Info("Get room diamond out")
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// 实时人数 * 10 + 流水 * 10 - 自定义值 = 热播
|
||
|
// 初始热度分 + 用户交互产生的热度分 – 随时间衰减的热度分
|
||
|
func (biz *Biz) statistics(ctx context.Context) {
|
||
|
for {
|
||
|
select {
|
||
|
case h := <-biz.computeChan:
|
||
|
_, err := biz.redis.ZAdd(ctx, HotKey, &redis.Z{
|
||
|
Member: h.roomId,
|
||
|
Score: float64(h.number*10 + h.diamond*10),
|
||
|
})
|
||
|
if err != nil {
|
||
|
biz.log.Errorf("Set room hot ranking err:%v", err)
|
||
|
continue
|
||
|
}
|
||
|
case <-time.After(time.Second * 10):
|
||
|
biz.log.Info("Statistics out")
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|