package biz import ( "context" gv1 "git.gz.internal.jumaiyx.cn/jm/jmproto/gift/v1" v1 "git.gz.internal.jumaiyx.cn/jm/jmproto/room/v1" "git.gz.internal.jumaiyx.cn/job/room-hot-compute/internal/data" "git.gz.internal.jumaiyx.cn/job/room-hot-compute/internal/util" jredis "git.gz.internal.jumaiyx.cn/job/room-hot-compute/pkg/redis" "git.gz.internal.jumaiyx.cn/pkg/log" "github.com/go-redis/redis/v8" "github.com/google/wire" "sync" "time" ) const ( NumberKey = "room:number:ranking" HotKey = "room:hot:ranking" ) var wg sync.WaitGroup // ProviderBizSet is biz providers. var ProviderBizSet = wire.NewSet(NewBiz) type hotData struct { roomId int64 number int64 diamond int64 } type hot struct { roomIds []int64 list []*hotData } type Biz struct { log log.Logger room v1.RoomClient gift gv1.GiftClient redis *jredis.Cache hotChan chan *hot computeChan chan *hotData } func NewBiz(logger log.Logger, roomServer v1.RoomClient, gift gv1.GiftClient, data *data.Data) *Biz { return &Biz{ log: logger, room: roomServer, gift: gift, redis: data.Cache, hotChan: make(chan *hot, 100), computeChan: make(chan *hotData, 100), } } func (biz *Biz) Check(ctx context.Context) { wg.Add(3) // 获取去房间人数值 go func() { defer wg.Done() biz.getScores(ctx) }() // 获取房间开播时间 // 获取房间消费数值 go func() { defer wg.Done() biz.getRoomDiamond(ctx) }() // 获取固定热度参数值 // 统计热点值 //写入排序 go func() { defer wg.Done() biz.statistics(ctx) }() wg.Wait() } func (biz *Biz) getScores(ctx context.Context) { // 获取去房间人数值 var ( o int64 ) for { l := o + 2 scores, err := biz.redis.ZRevRangeWithScores(ctx, NumberKey, o, l) if err != nil { biz.log.Errorf("error:%v", err) return } if len(scores) == 0 { biz.log.Info("Get scores out") return } var ( ids []int64 list []*hotData ) for _, score := range scores { roomId := util.StringTarnsInt64(score.Member.(string)) ids = append(ids, roomId) list = append(list, &hotData{ roomId: roomId, number: util.Float64IncrTarnsInt64(score.Score), }) } biz.hotChan <- &hot{ roomIds: ids, list: list, } o = l + 1 } } func (biz *Biz) getRoomDiamond(ctx context.Context) { for { select { case h := <-biz.hotChan: // 获取房间开播时间 room, err := biz.room.ListRoom(ctx, &v1.ListRoomRequest{ Status: 1, Ids: h.roomIds, }) if err != nil { biz.log.Errorf("error:%v", err) return } roomNumber := make(map[int64]int64, len(h.list)) for _, v := range h.list { roomNumber[v.roomId] = v.number } for _, r := range room.Data { // 获取房间消费数值 startTime := util.TraStrToUnix(r.StartTime) gift, err := biz.gift.QueryGiveRoomGift(ctx, &gv1.QueryGiveRoomGiftReq{ Uid: 0, RoomIdList: []int64{r.Id}, CreateAtStart: &startTime, }) if err != nil { biz.log.Errorf("error:%v", err) continue } biz.computeChan <- &hotData{ roomId: r.Id, number: roomNumber[r.Id], diamond: gift.TotalAmountDiamond, } } case <-time.After(time.Second * 10): biz.log.Info("Get room diamond out") return } } } // 实时人数 * 10 + 流水 * 10 - 自定义值 = 热播 // 初始热度分 + 用户交互产生的热度分 – 随时间衰减的热度分 func (biz *Biz) statistics(ctx context.Context) { for { select { case h := <-biz.computeChan: _, err := biz.redis.ZAdd(ctx, HotKey, &redis.Z{ Member: h.roomId, Score: float64(h.number*10 + h.diamond*10), }) if err != nil { biz.log.Errorf("Set room hot ranking err:%v", err) continue } case <-time.After(time.Second * 10): biz.log.Info("Statistics out") return } } }