// Package biz implements the container runtime monitor: it pages through the
// pods of the Unity namespace, collects those that have been running longer
// than outTime and pushes a webhook reminder for each of them.
package biz

import (
	"context"
	"errors"
	"fmt"
	"os"
	"time"

	"git.gz.internal.jumaiyx.cn/jm/jmproto/conf"
	containerv1 "git.gz.internal.jumaiyx.cn/jm/jmproto/container/v1"
	"git.gz.internal.jumaiyx.cn/pkg/config"
	k8sclient "git.gz.internal.jumaiyx.cn/pkg/k8s-client"
	"git.gz.internal.jumaiyx.cn/pkg/k8s-client/pod"
	"git.gz.internal.jumaiyx.cn/pkg/log"
	"git.gz.internal.jumaiyx.cn/pkg/webhook"
	"github.com/go-kratos/kratos/v2/transport/grpc"
	"github.com/google/wire"
	"github.com/redis/go-redis/v9"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
	// redisKey is the format of the per-container marker key; a non-empty
	// value suppresses the reminder for that container.
	redisKey = "container:monitor:%s"
	// limit is the page size used when listing pods.
	limit = 2
	//outTime = int64(86400) * 1
	// outTime is the runtime, in seconds, above which a container is reported.
	outTime = int64(86400) * 3
	// wOutTime is currently only referenced by disabled code.
	// NOTE(review): presumably the intended TTL / warning threshold — confirm.
	wOutTime = int64(86400) * 7
	//wOutTime = int64(86400) * 7

	// containerNameLen is the number of leading bytes of a pod name that are
	// used as the container name.
	containerNameLen = 31
)

// Exclude lists container names that are never reported.
//
// NOTE(review): the entry below is 30 bytes long while pod names are cut to
// containerNameLen (31) bytes, so it can only match a pod whose full name is
// exactly this string — confirm whether a prefix match was intended.
var Exclude = []string{"room-server-3-0-20230407163627"}

// ProviderBizSet is biz providers.
var ProviderBizSet = wire.NewSet(NewBiz)

// NewRedis builds a Redis client from the Redis section of the data config.
func NewRedis(c *conf.Data) *redis.Client {
	dataRedis := c.Redis
	return redis.NewClient(&redis.Options{
		DB:       int(dataRedis.Db),
		Addr:     dataRedis.Addr,
		Username: dataRedis.Username,
		Password: dataRedis.Password,
	})
}

// NewPod returns a pod repository bound to the Unity namespace.
func NewPod(logger log.Logger) (pod.PodRepo, error) {
	return k8sclient.Pod(k8sclient.Overview{
		Namespace: k8sclient.UnityNamespace,
		Log:       logger,
	})
}

// Data describes one overdue container.
type Data struct {
	// Name is the container name derived from the pod name.
	Name string
	// Second is the number of seconds elapsed since the pod's start time.
	Second int64
}

// Biz wires together the dependencies of the monitor.
type Biz struct {
	logger      log.Logger
	redisClient *redis.Client
	pod         pod.PodRepo
	webhookConf *conf.Webhook
	// nameCh carries batches of overdue containers from Run to Room.
	nameCh chan []Data
	// flagCh signals Room that Run has finished scanning.
	flagCh chan bool
}

// NewBiz creates a Biz; it fails when the pod repository cannot be created.
func NewBiz(logger log.Logger, data *conf.Data, webhookConf *conf.Webhook) (*Biz, error) {
	newPod, err := NewPod(logger)
	if err != nil {
		return nil, err
	}
	return &Biz{
		logger:      logger,
		redisClient: NewRedis(data),
		pod:         newPod,
		webhookConf: webhookConf,
		nameCh:      make(chan []Data, 1),
		flagCh:      make(chan bool, 1),
	}, nil
}

// Run performs the check: it starts Room in the background, pages through all
// pods, hands every page's overdue containers to Room via nameCh and finally
// signals completion on flagCh. A listing error is logged and ends the scan.
func (biz *Biz) Run(ctx context.Context) {
	go func() {
		biz.Room(ctx)
	}()

	continueToken := ""
	for {
		pods, err := biz.pod.List(ctx, v1.ListOptions{
			Limit:    limit,
			Continue: continueToken,
		})
		if err != nil {
			biz.logger.Error(err)
			break
		}

		var nameList []Data
		now := time.Now().Unix()
		for i := range pods.Items {
			p := &pods.Items[i]

			// A pod without a start time has not been running, so it cannot
			// be overdue; dereferencing the nil pointer would panic.
			if p.Status.StartTime == nil {
				continue
			}

			name := containerName(p.Name)
			if isExcluded(name) {
				continue
			}

			s := now - p.Status.StartTime.Time.Unix()
			if s <= outTime {
				continue
			}

			// Only overdue pods reach the Redis lookup. A missing key
			// (redis.Nil) is the normal case and is not an error; any other
			// failure is logged and the pod is still reported (best effort).
			result, err := biz.redisClient.Get(ctx, fmt.Sprintf(redisKey, name)).Result()
			if err != nil && !errors.Is(err, redis.Nil) {
				biz.logger.Error(err)
			}
			if result != "" {
				continue
			}

			nameList = append(nameList, Data{
				Name:   name,
				Second: s,
			})
			//_, err = biz.redisClient.Set(ctx, fmt.Sprintf(redisKey, p.Name), "1", time.Duration(wOutTime-s)*time.Second).Result()
			//if err != nil {
			//	biz.logger.Error(err)
			//}
		}

		if len(nameList) != 0 {
			biz.nameCh <- nameList
		}

		continueToken = pods.Continue
		if continueToken == "" {
			break
		}
	}
	biz.flagCh <- true
}

// containerName returns the first containerNameLen bytes of podName, or the
// whole name when it is shorter (slicing unconditionally would panic).
func containerName(podName string) string {
	if len(podName) > containerNameLen {
		return podName[:containerNameLen]
	}
	return podName
}

// isExcluded reports whether name appears in Exclude.
func isExcluded(name string) bool {
	for _, e := range Exclude {
		if e == name {
			return true
		}
	}
	return false
}

// Room consumes batches from nameCh until Run signals completion on flagCh.
//
// Run sends every batch before it sends the completion flag, but select picks
// randomly among ready cases, so the flag can be observed while a batch is
// still buffered. After the flag arrives nameCh is therefore drained before
// returning, so the last batch is never lost.
func (biz *Biz) Room(ctx context.Context) {
	for {
		select {
		case dataList := <-biz.nameCh:
			biz.handleBatch(ctx, dataList)
		case <-biz.flagCh:
			for {
				select {
				case dataList := <-biz.nameCh:
					biz.handleBatch(ctx, dataList)
				default:
					return
				}
			}
		}
	}
}

// handleBatch looks up the given containers in the container service and
// pushes a webhook reminder for every one the service knows about. Errors are
// logged and abort the batch.
func (biz *Biz) handleBatch(ctx context.Context, dataList []Data) {
	if len(dataList) == 0 {
		return
	}

	nameList := make([]string, 0, len(dataList))
	for _, data := range dataList {
		nameList = append(nameList, data.Name)
	}

	client, closeConn, err := biz.dialContainer(ctx)
	if err != nil {
		biz.logger.Error(err)
		return
	}
	// One connection is dialled per batch; release it when the batch is done.
	defer func() {
		if cerr := closeConn(); cerr != nil {
			biz.logger.Error(cerr)
		}
	}()

	containerListRsp, err := client.GetContainerList(ctx, &containerv1.GetContainerListReq{
		NameList: nameList,
	})
	if err != nil {
		biz.logger.Error(err)
		return
	}

	containerMap := make(map[string]*containerv1.ContainerData, len(containerListRsp.List))
	for _, data := range containerListRsp.List {
		containerMap[data.Name] = data
	}

	for _, v := range dataList {
		data, ok := containerMap[v.Name]
		if !ok {
			// Containers unknown to the container service are not reported.
			//biz.webhook(v.Name, v.Second, 0, true)
			continue
		}
		roomId := int64(0)
		if data.Params != nil {
			roomId = data.Params.RoomId
		}
		biz.webhook(v.Name, v.Second, roomId, false)
	}
}

// dialContainer opens a gRPC connection to the container service and returns
// a client together with the function that closes the connection. The caller
// must invoke the returned function once it is done with the client.
func (biz *Biz) dialContainer(ctx context.Context) (containerv1.ContainerClient, func() error, error) {
	//discovery, err := register.Discovery(ctx, biz.logger, base.ContainerServiceName, register.WithTimeout(time.Second*10))
	conn, err := grpc.DialInsecure(
		ctx,
		grpc.WithEndpoint("127.0.0.1:9000"),
		grpc.WithTimeout(30*time.Second),
	)
	if err != nil {
		return nil, nil, fmt.Errorf("dial container service: %w", err)
	}
	return containerv1.NewContainerClient(conn), conn.Close, nil
}

// containerClient returns a client for the container service.
//
// The connection behind the returned client cannot be closed by the caller;
// code in this file uses dialContainer instead. The method is kept with its
// original signature for any other callers in the package.
func (biz *Biz) containerClient(ctx context.Context) (containerv1.ContainerClient, error) {
	client, _, err := biz.dialContainer(ctx)
	if err != nil {
		biz.logger.Errorf("configure client failed:%v", err)
		return nil, err
	}
	return client, nil
}

// webhook asynchronously pushes a reminder card for one container.
//
// name is the container name, s its runtime in seconds, roomId the associated
// room (0 means none) and f marks a container whose data does not exist in
// the container service.
func (biz *Biz) webhook(name string, s int64, roomId int64, f bool) {
	go func() {
		env := os.Getenv(config.JmEnvironment)
		if env == "" {
			env = "dev"
		}

		d, h, m := biz.shi(s)

		// "none" unless a room is actually associated with the container.
		room := "关联房间:无\n"
		if roomId != 0 {
			room = fmt.Sprintf("关联房间:%d\n", roomId)
		}
		if f {
			room = "关联房间:容器数据不存在\n"
		}

		elements := []webhook.InteractiveElementsBody{
			plainTextElement("容器名称:" + name + "\n"),
			plainTextElement("运行环境:" + env + "\n"),
			plainTextElement(fmt.Sprintf("运行时长:%d天%d时%d分\n", d, h, m)),
			plainTextElement(room),
		}

		// NOTE(review): Run only reports containers with s > outTime, so for
		// the calls made in this file the warning colour is always used —
		// confirm whether wOutTime was meant here.
		color := webhook.GeneralColor
		if s >= outTime {
			color = webhook.WarnColor
		}
		webhook.InteractivePush(biz.webhookConf.ContainerWebhook.Url, webhook.CardContent("容器时长提醒", color, elements...))
	}()
}

// plainTextElement wraps content in a plain-text div element of a card.
func plainTextElement(content string) webhook.InteractiveElementsBody {
	return webhook.InteractiveElementsBody{
		Tag: webhook.InteractiveTagDiv,
		Text: &webhook.InteractiveText{
			Tag:     webhook.InteractiveTagPlainText,
			Content: content,
		},
	}
}

// shi splits a duration of s seconds into whole days, hours and minutes;
// leftover seconds are discarded. Exact multiples are handled correctly
// (3600 yields 1 hour, 60 yields 1 minute). Callers in this file only pass
// positive values.
func (biz *Biz) shi(s int64) (d, h, m int64) {
	d, s = biz.yu(s, 86400)
	h, s = biz.yu(s, 3600)
	m, _ = biz.yu(s, 60)
	return d, h, m
}

// yu returns the quotient m and the remainder c of s divided by d.
func (biz *Biz) yu(s int64, d int64) (m int64, c int64) {
	m = s / d
	c = s % d
	return m, c
}