unity容器监听
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

189 lines
4.0 KiB

package biz
import (
"context"
v1 "git.gz.internal.jumaiyx.cn/jm/jmproto/room/v1"
"git.gz.internal.jumaiyx.cn/job/room-server-clear/internal/util"
"git.gz.internal.jumaiyx.cn/job/room-server-clear/pkg/room"
k8s_client "git.gz.internal.jumaiyx.cn/pkg/k8s-client"
"git.gz.internal.jumaiyx.cn/pkg/k8s-client/deployment"
"git.gz.internal.jumaiyx.cn/pkg/k8s-client/service"
"git.gz.internal.jumaiyx.cn/pkg/log"
"github.com/google/wire"
"strings"
"sync"
"time"
)
var wg sync.WaitGroup
// ProviderBizSet is biz providers.
var ProviderBizSet = wire.NewSet(NewBiz)
type PodData struct {
RoomId int64
BranchId int64
Name string
}
type Biz struct {
log log.Logger
room *room.RoomService
depChan chan PodData
serChan chan PodData
deploy deployment.Deployment
ser service.Service
}
func NewBiz(logger log.Logger, roomServer *room.RoomService) *Biz {
return &Biz{
log: logger,
depChan: make(chan PodData, 1000),
serChan: make(chan PodData, 1000),
room: roomServer,
}
}
func (biz *Biz) Check(ctx context.Context) {
deploy, err := k8s_client.NewDeployment(k8s_client.UnityNamespace, biz.log)
if err != nil {
biz.log.Errorf("Service connect failed:%v", err)
return
}
ser, err := k8s_client.NewService(k8s_client.UnityNamespace, biz.log)
if err != nil {
biz.log.Errorf("Service connect failed:%v", err)
return
}
biz.deploy = deploy
biz.ser = ser
wg.Add(3)
go func() {
defer wg.Done()
biz.Deployment(ctx)
}()
go func() {
defer wg.Done()
biz.Service(ctx)
}()
go func() {
defer wg.Done()
biz.Room(ctx)
}()
wg.Wait()
biz.log.Info("Task done")
}
func (biz *Biz) Deployment(ctx context.Context) {
deployments, err := biz.deploy.List(ctx)
if err != nil {
biz.log.Errorf("Get pod list failed:%v", err)
return
}
for _, v := range deployments {
names := strings.Split(v.Name, "-")
pn := PodData{
Name: v.Name,
}
if len(names) == 5 {
pn.RoomId = util.StringTarnsInt64(names[3])
pn.BranchId = util.StringTarnsInt64(names[4])
}
select {
case biz.depChan <- pn:
case <-time.After(30 * time.Second): //最多等到30秒,避免死锁
biz.log.Info("deployment quit")
return
}
}
}
func (biz *Biz) Service(ctx context.Context) {
services, err := biz.ser.List(ctx)
if err != nil {
biz.log.Errorf("Get pod list failed:%v", err)
return
}
for _, v := range services {
names := strings.Split(v.Name, "-")
pn := PodData{
Name: v.Name,
}
if len(names) == 5 {
pn.RoomId = util.StringTarnsInt64(names[3])
pn.BranchId = util.StringTarnsInt64(names[4])
}
select {
case biz.serChan <- pn:
case <-time.After(30 * time.Second): //最多等到30秒,避免死锁
biz.log.Info("service quit")
return
}
}
}
func (biz *Biz) Room(ctx context.Context) {
for {
select {
case dep := <-biz.depChan:
err := biz.del(ctx, dep, func(name string) error {
biz.log.Infof("deployment name:%s", name)
return biz.deploy.Delete(ctx, name)
})
if err != nil {
biz.log.Errorf("Delete deployment failed:%v", err)
}
continue
case ser := <-biz.serChan:
err := biz.del(ctx, ser, func(name string) error {
biz.log.Infof("service name:%s", name)
return biz.ser.Delete(ctx, name)
})
if err != nil {
biz.log.Errorf("Delete deployment failed:%v", err)
}
continue
case <-time.After(30 * time.Second): //最多等到30秒,避免死锁
biz.log.Info("room quit")
return
}
}
}
func (biz *Biz) del(ctx context.Context, p PodData, del func(string) error) error {
// 无房间直接删除
if p.RoomId == 0 {
return del(p.Name)
}
r, err := biz.room.GetRoom(ctx, &v1.GetRoomRequest{
RoomId: p.RoomId,
})
if err != nil {
biz.log.Errorf("Get room failed:%v", err)
return err
}
// 删除不存在的房间
if r.RoomId == 0 || r.Status != 1 {
return del(p.Name)
}
// 删除不存在的支线
var flag bool
for _, branch := range r.Branches {
if branch.BranchId == p.BranchId {
flag = true
}
}
if !flag {
return del(p.Name)
}
return nil
}