房间麦克风热点计数
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

167 lines
4.0 KiB

package biz
import (
"context"
"fmt"
gv1 "git.gz.internal.jumaiyx.cn/jm/jmproto/gift/v1"
mv1 "git.gz.internal.jumaiyx.cn/jm/jmproto/message/v1"
"git.gz.internal.jumaiyx.cn/job/room-mike-hot-timer/internal/data/model"
idencode "git.gz.internal.jumaiyx.cn/pkg/id-encode"
"git.gz.internal.jumaiyx.cn/pkg/log"
"github.com/pkg/errors"
"time"
)
// waitErr is the sentinel Handle returns when there is nothing to process
// right now; Run reacts to it by pausing before the next attempt.
var waitErr = errors.New("wait 5s")
// MikeHotTimer is the worker that periodically recomputes the hot counter of
// occupied room mikes and pushes changed values to the room.
type MikeHotTimer struct {
	// log receives error reports from every step of the worker.
	log log.Logger
	// rmRepo gives access to the mike-hot queue/records and to mike rows.
	rmRepo RoomMikeRepo
	// gift is the client used to query the gift total received in a room.
	gift gv1.GiftClient
	// message is the client used to push the hot counter to the room.
	message mv1.MessageClient
	// ide encodes the user id placed in the pushed payload.
	ide idencode.IDEncoding
}
// NewMikeHotTimer wires the given logger, repository, gift client, message
// client and id encoder into a ready-to-run MikeHotTimer.
func NewMikeHotTimer(log log.Logger,
	rmRepo RoomMikeRepo,
	gift gv1.GiftClient,
	message mv1.MessageClient,
	ide idencode.IDEncoding,
) *MikeHotTimer {
	timer := new(MikeHotTimer)
	timer.log = log
	timer.rmRepo = rmRepo
	timer.gift = gift
	timer.message = message
	timer.ide = ide
	return timer
}
// Run drives the worker forever: it calls Handle in a loop and, whenever
// Handle reports waitErr, prints a notice and pauses for five seconds before
// trying again. Any other outcome leads straight to the next iteration.
// Run never returns.
func (mk *MikeHotTimer) Run() {
	const idleDelay = 5 * time.Second

	for {
		if outcome := mk.Handle(); !errors.Is(outcome, waitErr) {
			continue
		}
		fmt.Println("waiting")
		time.Sleep(idleDelay)
	}
}
// Handle processes one entry popped from the mike-hot queue.
//
// Steps, in order:
//  1. Pop the next entry; on failure sleep 5s, log and return the error.
//  2. Return waitErr when nothing usable was popped, or when the entry was
//     handled less than 5 seconds ago (the entry is requeued first).
//  3. Load the stored record for the entry; drop the entry when the record no
//     longer exists or belongs to a newer session (different StartTime).
//  4. Query the gift total received since StartTime. On failure the entry is
//     requeued and the query error is returned.
//  5. If the total is unchanged, requeue with a fresh Timer and stop.
//  6. For sessions older than two days, verify the mike row still exists and
//     delete the hot record when it does not.
//  7. Push the new total to the room and store the updated record.
//
// It returns waitErr when the caller should back off, a non-nil error when a
// step failed, and nil otherwise.
func (mk *MikeHotTimer) Handle() error {
	ctx := context.TODO()

	// Fetch the next mike whose counter should be pushed.
	hot, err := mk.rmRepo.GetMikeHotPop(ctx)
	if err != nil {
		time.Sleep(time.Second * 5)
		mk.log.Errorf("Get mike hot pop failed:%v", err)
		return err
	}

	// Nothing usable was popped: tell the caller to wait.
	if hot == nil || hot.MikeId == 0 {
		return waitErr
	}

	// Throttle: the entry was handled less than 5 seconds ago, so put it back
	// and wait instead of processing it again immediately.
	if (hot.Timer + 5) > time.Now().Unix() {
		err = mk.rmRepo.PushMikeHot(ctx, hot)
		if err != nil {
			mk.log.Errorf("Create mike hot failed:%v", err)
		}
		return waitErr
	}

	// Check whether the mike-hot record has been deleted in the meantime.
	mikeHot, ext, err := mk.rmRepo.GetMikeHot(ctx, hot.RoomId, hot.MikeId)
	if err != nil {
		mk.log.Errorf("Ext delete mike hot id failed:%v", err)
		// Requeue the entry so it is retried; the requeue result is returned.
		err = mk.rmRepo.PushMikeHot(ctx, hot)
		return err
	}
	if !ext {
		// The record is gone: drop the queue entry by not requeueing it.
		fmt.Println("已经被删除")
		return nil
	}
	if mikeHot.StartTime != hot.StartTime {
		// The queue entry belongs to an older session than the stored record:
		// drop the stale entry.
		fmt.Println("清楚旧的队列")
		return nil
	}

	// Query the total amount received since the session started.
	// NOTE(review): the *1000 presumably converts seconds to milliseconds for
	// the gift service — confirm against QueryGiveRoomGiftReq.
	t := mikeHot.StartTime * 1000
	giftRsp, err := mk.gift.QueryGiveRoomGift(ctx, &gv1.QueryGiveRoomGiftReq{
		RoomIdList:        []int64{mikeHot.RoomId},
		AcceptGiftUidList: []int64{mikeHot.Uid},
		CreateAtStart:     &t,
	})
	if err != nil {
		mk.log.Errorf("Query give room gift failed:%v", err)
		if pushErr := mk.rmRepo.PushMikeHot(ctx, hot); pushErr != nil {
			mk.log.Errorf("Create mike hot failed:%v", pushErr)
		}
		// Stop here: giftRsp must not be read after a failed call, and the
		// entry has already been requeued for a later retry.
		return err
	}

	mikeHot.Timer = time.Now().Unix()

	// Total unchanged: nothing to push, just requeue with the fresh Timer.
	if mikeHot.Total == giftRsp.TotalAmountDiamond {
		err = mk.rmRepo.PushMikeHot(ctx, mikeHot)
		if err != nil {
			mk.log.Errorf("Create mike hot failed:%v", err)
		}
		return nil
	}
	mikeHot.Total = giftRsp.TotalAmountDiamond

	// After more than two days (172800 seconds) re-check that the mike still
	// exists before pushing anything for it.
	if mikeHot.StartTime+172800 <= time.Now().Unix() {
		mike, err := mk.rmRepo.First(ctx, &RoomMikeCond{
			Id: mikeHot.MikeId,
		})
		if err != nil {
			mk.log.Errorf("Get mike first failed:%v", err)
			err = mk.rmRepo.PushMikeHot(ctx, mikeHot)
			return err
		}
		if mike == nil || mike.ID == 0 {
			mk.log.Error("Get mike first null")
			mk.rmRepo.DeleteMikeHot(ctx, mikeHot.RoomId, mikeHot.MikeId)
			return errors.New("Get mike first null")
		}
	}

	// Push the new counter to the room. A failed push is only logged; the
	// record is still stored below.
	_, err = mk.message.RoomPush(ctx, &mv1.RoomPushRequest{
		Code:   mv1.RoomPushCode_HotCounter,
		RoomId: mikeHot.RoomId,
		Uid:    mikeHot.Uid,
		HotCounter: &mv1.RoomPushRequest_HotCounter{
			MikeId: mikeHot.MikeId,
			RoomId: mikeHot.RoomId,
			Uid:    mk.ide.Encode(mikeHot.Uid),
			MikeNo: mikeHot.MikeNo,
			Number: mikeHot.Total,
		},
	})
	if err != nil {
		mk.log.Errorf("Push failed:%v", err)
	}

	mikeHot.Timer = time.Now().Unix()
	err = mk.createMikeHot(ctx, mikeHot)
	if err != nil {
		mk.log.Errorf("Create mike hot failed:%v", err)
	}
	return nil
}
// createMikeHot stores hot through the repository, but only when
// SetMikeHotLock reports success for the record's room and user. When the
// lock is not obtained it does nothing and returns nil. A failure of
// CreateMikeHot is logged and returned.
func (mk *MikeHotTimer) createMikeHot(ctx context.Context, hot *model.MikeUpHot) error {
	if locked := mk.rmRepo.SetMikeHotLock(ctx, hot.RoomId, hot.Uid); !locked {
		return nil
	}

	createErr := mk.rmRepo.CreateMikeHot(ctx, hot, true)
	if createErr == nil {
		return nil
	}
	mk.log.Errorf("Create mike hot failed:%v", createErr)
	return createErr
}