package biz import ( "context" "fmt" gv1 "git.gz.internal.jumaiyx.cn/jm/jmproto/gift/v1" mv1 "git.gz.internal.jumaiyx.cn/jm/jmproto/message/v1" "git.gz.internal.jumaiyx.cn/job/room-mike-hot-timer/internal/data/model" idencode "git.gz.internal.jumaiyx.cn/pkg/id-encode" "git.gz.internal.jumaiyx.cn/pkg/log" "github.com/pkg/errors" "time" ) var ( waitErr = errors.New("wait 5s") ) type MikeHotTimer struct { log log.Logger rmRepo RoomMikeRepo gift gv1.GiftClient message mv1.MessageClient ide idencode.IDEncoding } func NewMikeHotTimer(log log.Logger, rmRepo RoomMikeRepo, gift gv1.GiftClient, message mv1.MessageClient, ide idencode.IDEncoding, ) *MikeHotTimer { return &MikeHotTimer{ log: log, rmRepo: rmRepo, gift: gift, message: message, ide: ide, } } func (mk *MikeHotTimer) Run() { for { err := mk.Handle() if errors.Is(err, waitErr) { fmt.Println("waiting") time.Sleep(time.Second * 5) } } } func (mk *MikeHotTimer) Handle() error { ctx := context.TODO() // 获取需要推送的麦号 hot, err := mk.rmRepo.GetMikeHotPop(ctx) if err != nil { time.Sleep(time.Second * 5) mk.log.Errorf("Get mike hot pop failed:%v", err) return err } // 操作过快熔断 if hot == nil || hot.MikeId == 0 { return waitErr } if (hot.Timer + 5) > time.Now().Unix() { err = mk.rmRepo.PushMikeHot(ctx, hot) if err != nil { mk.log.Errorf("Create mike hot failed:%v", err) } return waitErr } // 查看是否已经被删除的麦号 mikeHot, ext, err := mk.rmRepo.GetMikeHot(ctx, hot.RoomId, hot.MikeId) if err != nil { mk.log.Errorf("Ext delete mike hot id failed:%v", err) err = mk.rmRepo.PushMikeHot(ctx, hot) return err } if !ext { fmt.Println("已经被删除") if err != nil { mk.log.Errorf("Create mike hot failed:%v", err) } return nil } if mikeHot.StartTime != hot.StartTime { fmt.Println("清楚旧的队列") if err != nil { mk.log.Errorf("Create mike hot failed:%v", err) } return nil } // 获取消费总额 t := mikeHot.StartTime * 1000 giftRsp, err := mk.gift.QueryGiveRoomGift(ctx, &gv1.QueryGiveRoomGiftReq{ RoomIdList: []int64{mikeHot.RoomId}, AcceptGiftUidList: []int64{mikeHot.Uid}, CreateAtStart: &t, }) if err != nil { mk.log.Errorf("Delete mike hot failed:%v", err) err = mk.rmRepo.PushMikeHot(ctx, hot) if err != nil { mk.log.Errorf("Create mike hot failed:%v", err) } } mikeHot.Timer = time.Now().Unix() if mikeHot.Total == giftRsp.TotalAmountDiamond { err = mk.rmRepo.PushMikeHot(ctx, mikeHot) if err != nil { mk.log.Errorf("Create mike hot failed:%v", err) } return nil } mikeHot.Total = giftRsp.TotalAmountDiamond // 超过两天重新检查一下mike是否存在 if mikeHot.StartTime+172800 <= time.Now().Unix() { mike, err := mk.rmRepo.First(ctx, &RoomMikeCond{ Id: mikeHot.MikeId, }) if err != nil { mk.log.Errorf("Get mike first failed:%v", err) err = mk.rmRepo.PushMikeHot(ctx, mikeHot) return err } if mike == nil || mike.ID == 0 { mk.log.Error("Get mike first null") mk.rmRepo.DeleteMikeHot(ctx, mikeHot.RoomId, mikeHot.MikeId) return errors.New("Get mike first null") } } _, err = mk.message.RoomPush(ctx, &mv1.RoomPushRequest{ Code: mv1.RoomPushCode_HotCounter, RoomId: mikeHot.RoomId, Uid: mikeHot.Uid, HotCounter: &mv1.RoomPushRequest_HotCounter{ MikeId: mikeHot.MikeId, RoomId: mikeHot.RoomId, Uid: mk.ide.Encode(mikeHot.Uid), MikeNo: mikeHot.MikeNo, Number: mikeHot.Total, }, }) if err != nil { mk.log.Errorf("Push failed:%v", err) } mikeHot.Timer = time.Now().Unix() err = mk.createMikeHot(ctx, mikeHot) if err != nil { mk.log.Errorf("Create mike hot failed:%v", err) } return nil } func (mk *MikeHotTimer) createMikeHot(ctx context.Context, hot *model.MikeUpHot) error { if mk.rmRepo.SetMikeHotLock(ctx, hot.RoomId, hot.Uid) { err := mk.rmRepo.CreateMikeHot(ctx, hot, true) if err != nil { mk.log.Errorf("Create mike hot failed:%v", err) return err } } return nil }