// Package biz contains the clean-up job that removes Unity deployments and
// services whose room is no longer reported as "up" by the room service.
package biz

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	v1 "git.gz.internal.jumaiyx.cn/jm/jmproto/room/v1"
	"git.gz.internal.jumaiyx.cn/job/room-server-clear/internal/util"
	k8s_client "git.gz.internal.jumaiyx.cn/pkg/k8s-client"
	"git.gz.internal.jumaiyx.cn/pkg/k8s-client/deployment"
	"git.gz.internal.jumaiyx.cn/pkg/k8s-client/service"
	"git.gz.internal.jumaiyx.cn/pkg/log"
	"github.com/go-kratos/kratos/v2/transport/grpc"
	"github.com/google/wire"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
	// limit is the page size requested from the Kubernetes list calls.
	limit = 100
	// outTime was the consumers' idle timeout in seconds. This file no longer
	// uses it (consumers stop when their channel is closed); it stays declared
	// in case other files in the package reference it.
	outTime = 10
)

// wg is no longer used by Check, which waits on a WaitGroup of its own. It
// stays declared in case other files in the package reference it.
var wg sync.WaitGroup

// ProviderBizSet is biz providers.
var ProviderBizSet = wire.NewSet(NewBiz)

// PodData is what this job extracts from the name of a Unity deployment or
// service: the full resource name plus the segments at index 3, 4 and 5 of the
// "-"-separated name.
type PodData struct {
	RoomId    int64
	BranchId  int64
	Name      string
	StartTime string
}

// Biz runs the clean-up. The producers (deployment, service) push batches of
// parsed resources onto depChan / serChan and the consumers (deploymentDel,
// serviceDel) delete the resources whose room is not returned by roomList.
type Biz struct {
	log     log.Logger
	depChan chan []PodData
	serChan chan []PodData
	deploy  deployment.Deployment
	ser     service.Service

	// roomMu guards roomClient, which is created on first use and then shared
	// by both consumers so that only one gRPC connection is ever dialed.
	roomMu     sync.Mutex
	roomClient v1.RoomClient
}

// NewBiz returns a Biz that logs through logger. The Kubernetes clients are
// created later, by Check.
func NewBiz(logger log.Logger) *Biz {
	return &Biz{
		log:     logger,
		depChan: make(chan []PodData, 1),
		serChan: make(chan []PodData, 1),
	}
}

// roomServerClient returns the room-service gRPC client, dialing it on the
// first call and returning the cached client afterwards. Dialing on every call
// would open a new connection per batch that nothing ever closes.
//
// NOTE(review): the endpoint is hard-coded to 127.0.0.1:9000; the discovery
// based lookup below was already commented out — confirm which is intended.
func (biz *Biz) roomServerClient(ctx context.Context) (v1.RoomClient, error) {
	biz.roomMu.Lock()
	defer biz.roomMu.Unlock()

	if biz.roomClient != nil {
		return biz.roomClient, nil
	}

	//discovery, err := kubediscovery.Discovery(ctx, biz.log, base.RoomServiceName, kubediscovery.Namespace(k8s_client.DevNamespace))
	conn, err := grpc.DialInsecure(ctx, grpc.WithEndpoint("127.0.0.1:9000"), grpc.WithTimeout(10*time.Second))
	if err != nil {
		biz.log.Errorf("room client failed:%v", err)
		return nil, err
	}
	biz.roomClient = v1.NewRoomClient(conn)
	return biz.roomClient, nil
}

// Check performs one clean-up pass over the Unity namespace: it lists all
// deployments and services and deletes those whose room is not returned by
// roomList. It blocks until both listings have been fully processed or ctx is
// cancelled. Errors are logged, not returned.
func (biz *Biz) Check(ctx context.Context) {
	deploy, err := k8s_client.NewDeployment(k8s_client.UnityNamespace, biz.log)
	if err != nil {
		biz.log.Errorf("Deployment connect failed:%v", err)
		return
	}
	biz.deploy = deploy

	svc, err := k8s_client.NewService(k8s_client.UnityNamespace, biz.log)
	if err != nil {
		biz.log.Errorf("Service connect failed:%v", err)
		return
	}
	biz.ser = svc

	// Fresh channels per pass: each producer's channel is closed when it is
	// done, which is how the consumers learn there is nothing left to process.
	// Reusing the channels made in NewBiz would panic on a second pass.
	depChan := make(chan []PodData, 1)
	serChan := make(chan []PodData, 1)
	biz.depChan, biz.serChan = depChan, serChan

	var pass sync.WaitGroup
	pass.Add(4)
	go func() {
		defer pass.Done()
		defer close(depChan)
		biz.deployment(ctx)
	}()
	go func() {
		defer pass.Done()
		biz.deploymentDel(ctx)
	}()
	go func() {
		defer pass.Done()
		defer close(serChan)
		biz.service(ctx)
	}()
	go func() {
		defer pass.Done()
		biz.serviceDel(ctx)
	}()
	pass.Wait()
}

// parsePodData splits a resource name on "-" and reads the room id, branch id
// and start time from segments 3, 4 and 5. It reports false when the name has
// fewer than six segments, so callers can skip it instead of panicking on an
// out-of-range index.
func parsePodData(name string) (PodData, bool) {
	const minSegments = 6

	parts := strings.Split(name, "-")
	if len(parts) < minSegments {
		return PodData{}, false
	}
	return PodData{
		Name:      name,
		RoomId:    util.StringTarnsInt64(parts[3]),
		BranchId:  util.StringTarnsInt64(parts[4]),
		StartTime: parts[5],
	}, true
}

// sendPods hands a non-empty batch to ch. It reports false when ctx was
// cancelled before the batch could be delivered, in which case the producer
// should stop; empty batches are dropped and reported as delivered.
func sendPods(ctx context.Context, ch chan<- []PodData, pods []PodData) bool {
	if len(pods) == 0 {
		return true
	}
	select {
	case ch <- pods:
		return true
	case <-ctx.Done():
		return false
	}
}

// deployment pages through the Unity deployments, limit items at a time, and
// sends each page's parsed entries to depChan. It stops on the first list
// error, when the last page has been sent, or when ctx is cancelled.
func (biz *Biz) deployment(ctx context.Context) {
	var cont string
	for {
		rsp, err := biz.deploy.List(ctx, metav1.ListOptions{
			Limit:    limit,
			Continue: cont,
		})
		if err != nil {
			biz.log.Errorf("get deployment fail:%v", err)
			return
		}

		pods := make([]PodData, 0, len(rsp.Items))
		for i := range rsp.Items {
			name := rsp.Items[i].Name
			pod, ok := parsePodData(name)
			if !ok {
				biz.log.Errorf("skip deployment with unexpected name:%s", name)
				continue
			}
			pods = append(pods, pod)
		}
		if !sendPods(ctx, biz.depChan, pods) {
			return
		}

		cont = rsp.Continue
		if cont == "" {
			return
		}
	}
}

// stalePods returns the entries of pods whose RoomId is not present in the
// map returned by roomList. On a roomList error it returns that error and no
// entries, so callers never treat "lookup failed" as "no room exists".
func (biz *Biz) stalePods(ctx context.Context, pods []PodData) ([]PodData, error) {
	ids := make([]int64, 0, len(pods))
	for _, pod := range pods {
		ids = append(ids, pod.RoomId)
	}

	rooms, err := biz.roomList(ctx, ids)
	if err != nil {
		return nil, err
	}

	var stale []PodData
	for _, pod := range pods {
		if _, ok := rooms[pod.RoomId]; !ok {
			stale = append(stale, pod)
		}
	}
	return stale, nil
}

// deploymentDel consumes batches from depChan and deletes every deployment
// whose room is not returned by roomList. It returns when depChan is closed or
// ctx is cancelled. A batch whose room lookup fails is skipped entirely.
func (biz *Biz) deploymentDel(ctx context.Context) {
	for {
		select {
		case pods, ok := <-biz.depChan:
			if !ok {
				return
			}
			stale, err := biz.stalePods(ctx, pods)
			if err != nil {
				biz.log.Errorf("get room list fail:%v", err)
				continue
			}
			for _, pod := range stale {
				fmt.Println(pod.Name)
				// NOTE(review): any result of Delete is not inspected here.
				biz.deploy.Delete(ctx, pod.Name)
			}
		case <-ctx.Done():
			return
		}
	}
}

// service pages through the Unity services, limit items at a time, and sends
// each page's parsed entries to serChan. It stops on the first list error,
// when the last page has been sent, or when ctx is cancelled.
func (biz *Biz) service(ctx context.Context) {
	var cont string
	for {
		rsp, err := biz.ser.List(ctx, metav1.ListOptions{
			Limit:    limit,
			Continue: cont,
		})
		if err != nil {
			biz.log.Errorf("get service fail:%v", err)
			return
		}

		pods := make([]PodData, 0, len(rsp.Items))
		for i := range rsp.Items {
			name := rsp.Items[i].Name
			pod, ok := parsePodData(name)
			if !ok {
				biz.log.Errorf("skip service with unexpected name:%s", name)
				continue
			}
			pods = append(pods, pod)
		}
		if !sendPods(ctx, biz.serChan, pods) {
			return
		}

		cont = rsp.Continue
		if cont == "" {
			return
		}
	}
}

// serviceDel consumes batches from serChan and deletes every service whose
// room is not returned by roomList. It returns when serChan is closed or ctx
// is cancelled. A batch whose room lookup fails is skipped entirely.
func (biz *Biz) serviceDel(ctx context.Context) {
	for {
		select {
		case pods, ok := <-biz.serChan:
			if !ok {
				return
			}
			stale, err := biz.stalePods(ctx, pods)
			if err != nil {
				biz.log.Errorf("get room list fail:%v", err)
				continue
			}
			for _, pod := range stale {
				// NOTE(review): any result of Delete is not inspected here.
				biz.ser.Delete(ctx, pod.Name)
			}
		case <-ctx.Done():
			return
		}
	}
}

// roomList asks the room service for the rooms among ids that have status
// RoomStatusUp and returns them as a set keyed by room id (every value is 1).
// Room ids missing from the result were not returned by the service.
func (biz *Biz) roomList(ctx context.Context, ids []int64) (map[int64]int32, error) {
	client, err := biz.roomServerClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("room server client: %w", err)
	}

	rsp, err := client.GetRoomList(ctx, &v1.GetRoomListRequest{
		Ids:    ids,
		Status: int32(v1.RoomStatus_RoomStatusUp),
	})
	if err != nil {
		return nil, err
	}

	rooms := make(map[int64]int32, len(rsp.Data))
	for _, item := range rsp.Data {
		rooms[item.RoomId] = 1
	}
	return rooms, nil
}