// Package biz implements the clean-up task that lists Kubernetes deployments
// and services and deletes those whose room or branch no longer exists.
package biz

import (
	"context"
	"strings"
	"sync"
	"time"

	v1 "git.gz.internal.jumaiyx.cn/jm/jmproto/room/v1"
	"git.gz.internal.jumaiyx.cn/job/room-server-clear/internal/util"
	"git.gz.internal.jumaiyx.cn/job/room-server-clear/pkg/room"
	k8s_client "git.gz.internal.jumaiyx.cn/pkg/k8s-client"
	"git.gz.internal.jumaiyx.cn/pkg/k8s-client/deployment"
	"git.gz.internal.jumaiyx.cn/pkg/k8s-client/service"
	"git.gz.internal.jumaiyx.cn/pkg/log"
	"github.com/google/wire"
)

// podQueueTimeout bounds every blocking channel operation in this file:
// producers give up sending after this long, and the consumer exits once it
// has been idle for this long.
const podQueueTimeout = 30 * time.Second

// wg is no longer used by Check, which now synchronises on a local WaitGroup
// so that overlapping calls do not share a counter.
// NOTE(review): kept only because other files of this package may still
// reference it; remove once that is confirmed not to be the case.
var wg sync.WaitGroup

// ProviderBizSet is biz providers.
var ProviderBizSet = wire.NewSet(NewBiz)

// PodData identifies one Kubernetes resource queued for inspection.
type PodData struct {
	// RoomId is parsed from the 4th "-"-separated segment of Name; it stays 0
	// when Name does not consist of exactly five segments.
	RoomId int64
	// BranchId is parsed from the 5th "-"-separated segment of Name; it stays
	// 0 when Name does not consist of exactly five segments.
	BranchId int64
	// Name is the full resource name as returned by the list call.
	Name string
}

// Biz wires the room service and the Kubernetes clients together for the
// clean-up task.
type Biz struct {
	log  log.Logger
	room *room.RoomService
	// depChan and serChan carry candidates from the Deployment and Service
	// producers to the Room consumer.
	depChan chan PodData
	serChan chan PodData
	// deploy and ser are populated by Check before the workers start.
	deploy deployment.Deployment
	ser    service.Service
}

// NewBiz returns a Biz with both candidate queues buffered to 1000 entries.
// The Kubernetes clients are created later, by Check.
func NewBiz(logger log.Logger, roomServer *room.RoomService) *Biz {
	return &Biz{
		log:     logger,
		depChan: make(chan PodData, 1000),
		serChan: make(chan PodData, 1000),
		room:    roomServer,
	}
}

// Check creates the deployment and service clients for
// k8s_client.UnityNamespace, then runs the Deployment and Service producers
// and the Room consumer concurrently and blocks until all three have
// returned. If either client cannot be created the error is logged and
// nothing is started.
func (biz *Biz) Check(ctx context.Context) {
	deploy, err := k8s_client.NewDeployment(k8s_client.UnityNamespace, biz.log)
	if err != nil {
		biz.log.Errorf("Deployment connect failed:%v", err)
		return
	}
	ser, err := k8s_client.NewService(k8s_client.UnityNamespace, biz.log)
	if err != nil {
		biz.log.Errorf("Service connect failed:%v", err)
		return
	}
	biz.deploy = deploy
	biz.ser = ser

	// A WaitGroup local to this call keeps concurrent Check invocations
	// independent of each other.
	var tasks sync.WaitGroup
	for _, task := range []func(context.Context){biz.Deployment, biz.Service, biz.Room} {
		tasks.Add(1)
		// The worker is passed as an argument so each goroutine gets its own
		// copy regardless of the Go version's loop-variable semantics.
		go func(run func(context.Context)) {
			defer tasks.Done()
			run(ctx)
		}(task)
	}
	tasks.Wait()
	biz.log.Info("Task done")
}

// Deployment lists the deployments and queues each one on depChan for Room.
// It stops early when a send cannot complete within podQueueTimeout or when
// ctx is cancelled.
func (biz *Biz) Deployment(ctx context.Context) {
	deployments, err := biz.deploy.List(ctx)
	if err != nil {
		biz.log.Errorf("Get deployment list failed:%v", err)
		return
	}
	for _, v := range deployments {
		if !enqueuePod(ctx, biz.depChan, v.Name) {
			biz.log.Info("deployment quit")
			return
		}
	}
}

// Service lists the services and queues each one on serChan for Room.
// It stops early when a send cannot complete within podQueueTimeout or when
// ctx is cancelled.
func (biz *Biz) Service(ctx context.Context) {
	services, err := biz.ser.List(ctx)
	if err != nil {
		biz.log.Errorf("Get service list failed:%v", err)
		return
	}
	for _, v := range services {
		if !enqueuePod(ctx, biz.serChan, v.Name) {
			biz.log.Info("service quit")
			return
		}
	}
}

// Room consumes queued deployments and services and deletes the ones that
// del considers stale. It returns when ctx is cancelled or when neither
// queue has produced an item for podQueueTimeout.
func (biz *Biz) Room(ctx context.Context) {
	for {
		select {
		case dep := <-biz.depChan:
			err := biz.del(ctx, dep, func(name string) error {
				biz.log.Infof("deployment name:%s", name)
				return biz.deploy.Delete(ctx, name)
			})
			if err != nil {
				biz.log.Errorf("Delete deployment failed:%v", err)
			}
		case ser := <-biz.serChan:
			err := biz.del(ctx, ser, func(name string) error {
				biz.log.Infof("service name:%s", name)
				return biz.ser.Delete(ctx, name)
			})
			if err != nil {
				biz.log.Errorf("Delete service failed:%v", err)
			}
		case <-ctx.Done():
			biz.log.Info("room quit")
			return
		case <-time.After(podQueueTimeout):
			// Wait at most podQueueTimeout for new work so the consumer
			// cannot block forever once the producers have finished.
			biz.log.Info("room quit")
			return
		}
	}
}

// del invokes the del callback with p.Name when the resource is considered
// stale, i.e. when any of the following holds:
//   - p carries no room id;
//   - the room returned by the room service has RoomId 0 or a Status other
//     than 1;
//   - the room has no branch whose BranchId equals p.BranchId.
//
// It returns the callback's error, the room lookup error, or nil when the
// resource is kept.
func (biz *Biz) del(ctx context.Context, p PodData, del func(string) error) error {
	// No room id: delete directly.
	if p.RoomId == 0 {
		return del(p.Name)
	}

	// NOTE(review): assumes GetRoom yields a usable result whenever err is
	// nil — confirm against room.RoomService.
	r, err := biz.room.GetRoom(ctx, &v1.GetRoomRequest{
		RoomId: p.RoomId,
	})
	if err != nil {
		biz.log.Errorf("Get room failed:%v", err)
		return err
	}

	// Delete resources of rooms that do not exist.
	// NOTE(review): Status 1 presumably marks a live room — confirm.
	if r.RoomId == 0 || r.Status != 1 {
		return del(p.Name)
	}

	// Keep the resource as soon as its branch is found on the room.
	for _, branch := range r.Branches {
		if branch.BranchId == p.BranchId {
			return nil
		}
	}

	// Delete resources of branches that do not exist.
	return del(p.Name)
}

// parsePodData builds a PodData from a resource name. Room and branch ids are
// filled in only when the name splits into exactly five "-"-separated
// segments, taken from the 4th and 5th segment respectively.
func parsePodData(name string) PodData {
	pd := PodData{Name: name}
	if parts := strings.Split(name, "-"); len(parts) == 5 {
		pd.RoomId = util.StringTarnsInt64(parts[3])
		pd.BranchId = util.StringTarnsInt64(parts[4])
	}
	return pd
}

// enqueuePod parses name and sends the result on ch. It reports false when
// the send did not happen because ctx was cancelled or podQueueTimeout
// elapsed first.
func enqueuePod(ctx context.Context, ch chan<- PodData, name string) bool {
	// An explicit timer is stopped on return, so a successful send does not
	// leave a pending timer behind for every queued item.
	timer := time.NewTimer(podQueueTimeout)
	defer timer.Stop()

	select {
	case ch <- parsePodData(name):
		return true
	case <-ctx.Done():
		return false
	case <-timer.C:
		return false
	}
}