unity容器监听
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

269 lines
6.2 KiB

package biz
import (
"context"
"fmt"
"git.gz.internal.jumaiyx.cn/jm/jmproto/conf"
containerv1 "git.gz.internal.jumaiyx.cn/jm/jmproto/container/v1"
"git.gz.internal.jumaiyx.cn/pkg/config"
k8sclient "git.gz.internal.jumaiyx.cn/pkg/k8s-client"
"git.gz.internal.jumaiyx.cn/pkg/k8s-client/pod"
"git.gz.internal.jumaiyx.cn/pkg/log"
"git.gz.internal.jumaiyx.cn/pkg/webhook"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/google/wire"
"github.com/redis/go-redis/v9"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
"time"
)
// Tunables for the container running-time monitor.
const (
// redisKey is the format of the per-container Redis key; %s is replaced
// with the container name.
redisKey = "container:monitor:%s"
// limit is the page size passed as ListOptions.Limit when listing pods.
limit = 2
//outTime = int64(86400) * 1
// outTime is the running-time threshold in seconds (3 days). Pods that have
// been running longer than this are reported.
outTime = int64(86400) * 3
// wOutTime is 7 days expressed in seconds.
// NOTE(review): in the visible source it is referenced only from
// commented-out code — confirm whether it is still needed.
wOutTime = int64(86400) * 7
//wOutTime = int64(86400) * 7
)
// Exclude lists container names that Run skips and never reports, regardless
// of how long they have been running.
var Exclude = []string{"room-server-3-0-20230407163627"}
// ProviderBizSet is the wire provider set for the biz package; it contains
// the NewBiz constructor.
var ProviderBizSet = wire.NewSet(NewBiz)
// NewRedis builds a Redis client from the Redis section of the data
// configuration (address, credentials and database index).
func NewRedis(c *conf.Data) *redis.Client {
	cfg := c.Redis
	opts := &redis.Options{
		Addr:     cfg.Addr,
		Username: cfg.Username,
		Password: cfg.Password,
		DB:       int(cfg.Db),
	}
	return redis.NewClient(opts)
}
// NewPod returns a pod repository bound to the Unity namespace that logs
// through the supplied logger.
func NewPod(logger log.Logger) (pod.PodRepo, error) {
	overview := k8sclient.Overview{
		Namespace: k8sclient.UnityNamespace,
		Log:       logger,
	}
	return k8sclient.Pod(overview)
}
// Data describes one container whose running time exceeded the threshold.
type Data struct {
// Name is the container name; Run derives it from the pod name.
Name string
// Second is how long the pod has been running, in seconds.
Second int64
}
// Biz holds the dependencies of the container running-time monitor.
type Biz struct {
// logger receives the errors encountered while scanning and notifying.
logger log.Logger
// redisClient is used to look up the per-container monitor key.
redisClient *redis.Client
// pod lists the pods to inspect.
pod pod.PodRepo
// webhookConf supplies the URL the notification cards are pushed to.
webhookConf *conf.Webhook
// nameCh carries batches of long-running containers from Run to Room.
nameCh chan []Data
// flagCh is signalled by Run once it has finished listing pods.
flagCh chan bool
}
// NewBiz assembles a Biz from its dependencies. It creates the pod repository
// and the Redis client itself and returns an error if the pod repository
// cannot be created. Both internal channels are buffered with capacity 1.
func NewBiz(logger log.Logger, data *conf.Data, webhookConf *conf.Webhook) (*Biz, error) {
	podRepo, err := NewPod(logger)
	if err != nil {
		return nil, err
	}

	b := &Biz{
		logger:      logger,
		redisClient: NewRedis(data),
		pod:         podRepo,
		webhookConf: webhookConf,
		nameCh:      make(chan []Data, 1),
		flagCh:      make(chan bool, 1),
	}
	return b, nil
}
// Run performs one monitoring pass.
//
// It starts Room in a background goroutine, then lists pods page by page
// (page size limit). For every pod it derives the container name from the pod
// name, skips the pod when the container's Redis monitor key holds a value or
// when the name is listed in Exclude, and collects the pods that have been
// running for more than outTime seconds. Each non-empty page result is sent to
// Room through nameCh. When listing ends (last page reached or List failed),
// flagCh is signalled so that Room can terminate.
func (biz *Biz) Run(ctx context.Context) {
	go biz.Room(ctx)

	// Number of leading bytes of the pod name that form the container name.
	const nameLen = 31

	continueToken := ""
	for {
		pods, err := biz.pod.List(ctx, v1.ListOptions{
			Limit:    limit,
			Continue: continueToken,
		})
		if err != nil {
			biz.logger.Error(err)
			break
		}

		var nameList []Data
		for _, p := range pods.Items {
			// Guard the truncation: slicing a shorter name would panic.
			name := p.Name
			if len(name) > nameLen {
				name = name[:nameLen]
			}

			result, err := biz.redisClient.Get(ctx, fmt.Sprintf(redisKey, name)).Result()
			// redis.Nil only means the key does not exist, which is the
			// normal case here; anything else is a real error. The check is
			// best effort, so the pod is still evaluated after a failure.
			if err != nil && err != redis.Nil {
				biz.logger.Error(err)
			}
			if result != "" {
				continue
			}

			excluded := false
			for _, e := range Exclude {
				if e == name {
					excluded = true
					break
				}
			}
			if excluded {
				continue
			}

			// A pod that has not been started yet has no start time; it
			// cannot have exceeded the threshold, so skip it instead of
			// dereferencing a nil pointer.
			if p.Status.StartTime == nil {
				continue
			}

			s := time.Now().Unix() - p.Status.StartTime.Time.Unix()
			if s > outTime {
				nameList = append(nameList, Data{
					Name:   name,
					Second: s,
				})
				//_, err = biz.redisClient.Set(ctx, fmt.Sprintf(redisKey, p.Name), "1", time.Duration(wOutTime-s)*time.Second).Result()
				//if err != nil {
				//	biz.logger.Error(err)
				//}
			}
		}

		if len(nameList) != 0 {
			biz.nameCh <- nameList
		}

		continueToken = pods.Continue
		if continueToken == "" {
			break
		}
	}

	biz.flagCh <- true
}
// Room consumes the batches of long-running containers that Run publishes on
// nameCh and sends a notification for each container that the container
// service knows about. It returns once Run has signalled flagCh.
//
// Run sends every batch before it signals flagCh, but select chooses randomly
// when both channels are ready, so a batch may still be buffered in nameCh
// when the stop signal is received. Those batches are drained before
// returning so that none is lost.
func (biz *Biz) Room(ctx context.Context) {
	for {
		select {
		case dataList := <-biz.nameCh:
			biz.notifyBatch(ctx, dataList)
		case <-biz.flagCh:
			for {
				select {
				case dataList := <-biz.nameCh:
					biz.notifyBatch(ctx, dataList)
				default:
					return
				}
			}
		}
	}
}

// notifyBatch looks up the containers named in dataList and pushes one
// webhook notification per container that the container service returned.
// Errors are logged and abort the batch.
func (biz *Biz) notifyBatch(ctx context.Context, dataList []Data) {
	if len(dataList) == 0 {
		return
	}

	nameList := make([]string, 0, len(dataList))
	for _, data := range dataList {
		nameList = append(nameList, data.Name)
	}

	client, err := biz.containerClient(ctx)
	if err != nil {
		biz.logger.Error(err)
		return
	}
	containerListRsp, err := client.GetContainerList(ctx, &containerv1.GetContainerListReq{
		NameList: nameList,
	})
	if err != nil {
		biz.logger.Error(err)
		return
	}

	containerMap := make(map[string]*containerv1.ContainerData, len(containerListRsp.List))
	for _, data := range containerListRsp.List {
		containerMap[data.Name] = data
	}

	for _, v := range dataList {
		data, ok := containerMap[v.Name]
		if !ok {
			// Containers unknown to the container service are currently
			// not reported:
			//biz.webhook(v.Name, v.Second, 0, true)
			continue
		}
		roomId := int64(0)
		if data.Params != nil {
			roomId = data.Params.RoomId
		}
		biz.webhook(v.Name, v.Second, roomId, false)
	}
}
// containerClient dials the container service and returns a client for it.
// Service discovery is currently disabled; the endpoint is fixed to
// 127.0.0.1:9000 and the dial uses a 30 second timeout.
//
// NOTE(review): the underlying connection is not returned, so callers have no
// way to close it — confirm whether a new connection per call is intended.
func (biz *Biz) containerClient(ctx context.Context) (containerv1.ContainerClient, error) {
	//discovery, err := register.Discovery(ctx, biz.logger, base.ContainerServiceName, register.WithTimeout(time.Second*10))
	conn, dialErr := grpc.DialInsecure(
		ctx,
		grpc.WithEndpoint("127.0.0.1:9000"),
		grpc.WithTimeout(30*time.Second),
	)
	if dialErr == nil {
		return containerv1.NewContainerClient(conn), nil
	}

	biz.logger.Errorf("configure client failed:%v", dialErr)
	return nil, dialErr
}
// webhook asynchronously pushes a "container running time" card to the
// configured container webhook URL.
//
// name is the container name, s its running time in seconds and roomId the
// id of the associated room (0 when there is none). When f is true the card
// states that no container data exists, regardless of roomId.
//
// The push runs in its own goroutine; the caller is not informed of the
// outcome.
func (biz *Biz) webhook(name string, s int64, roomId int64, f bool) {
	go func() {
		env := os.Getenv(config.JmEnvironment)
		if env == "" {
			env = "dev"
		}

		// textDiv wraps one line of plain text in a card div element.
		textDiv := func(content string) webhook.InteractiveElementsBody {
			return webhook.InteractiveElementsBody{
				Tag: webhook.InteractiveTagDiv,
				Text: &webhook.InteractiveText{
					Tag:     webhook.InteractiveTagPlainText,
					Content: content,
				},
			}
		}

		// Show the room id only when there is one; the previous condition
		// was inverted and printed the id exactly when it was 0.
		room := "关联房间:无\n"
		if roomId != 0 {
			room = fmt.Sprintf("关联房间:%d\n", roomId)
		}
		if f {
			room = "关联房间:容器数据不存在\n"
		}

		d, h, m := biz.shi(s)
		elements := []webhook.InteractiveElementsBody{
			textDiv("容器名称:" + name + "\n"),
			textDiv("运行环境:" + env + "\n"),
			textDiv(fmt.Sprintf("运行时长:%d天%d时%d分\n", d, h, m)),
			textDiv(room),
		}

		// NOTE(review): Run only reports containers with s > outTime, so on
		// that path the warning colour is always chosen — confirm whether a
		// different threshold was intended.
		color := webhook.GeneralColor
		if s >= outTime {
			color = webhook.WarnColor
		}

		webhook.InteractivePush(biz.webhookConf.ContainerWebhook.Url, webhook.CardContent("容器时长提醒", color, elements...))
	}()
}
// shi splits a duration of s seconds into whole days, hours and minutes.
// Leftover seconds are discarded.
//
// The components are computed unconditionally so that exact boundaries are
// handled correctly: 3600 seconds is 1 hour 0 minutes (not 0 hours 60
// minutes) and 60 seconds is 1 minute.
func (biz *Biz) shi(s int64) (d, h, m int64) {
	const (
		secondsPerMinute = int64(60)
		secondsPerHour   = 60 * secondsPerMinute
		secondsPerDay    = 24 * secondsPerHour
	)

	d = s / secondsPerDay
	s %= secondsPerDay
	h = s / secondsPerHour
	s %= secondsPerHour
	m = s / secondsPerMinute
	return d, h, m
}
// yu divides s by d and returns the quotient m together with the remainder c.
func (biz *Biz) yu(s int64, d int64) (m int64, c int64) {
	return s / d, s % d
}