You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
228 lines
5.2 KiB
228 lines
5.2 KiB
2 years ago
|
package consul
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
|
||
|
"github.com/go-kratos/kratos/v2/registry"
|
||
|
|
||
|
"github.com/hashicorp/consul/api"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
_ registry.Registrar = &Registry{}
|
||
|
_ registry.Discovery = &Registry{}
|
||
|
)
|
||
|
|
||
|
// Option is consul registry option.
|
||
|
type Option func(*Registry)
|
||
|
|
||
|
// WithHealthCheck with registry health check option.
|
||
|
func WithHealthCheck(enable bool) Option {
|
||
|
return func(o *Registry) {
|
||
|
o.enableHealthCheck = enable
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithHeartbeat enable or disable heartbeat
|
||
|
func WithHeartbeat(enable bool) Option {
|
||
|
return func(o *Registry) {
|
||
|
if o.cli != nil {
|
||
|
o.cli.heartbeat = enable
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithServiceResolver with endpoint function option.
|
||
|
func WithServiceResolver(fn ServiceResolver) Option {
|
||
|
return func(o *Registry) {
|
||
|
if o.cli != nil {
|
||
|
o.cli.resolver = fn
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithHealthCheckInterval with healthcheck interval in seconds.
|
||
|
func WithHealthCheckInterval(interval int) Option {
|
||
|
return func(o *Registry) {
|
||
|
if o.cli != nil {
|
||
|
o.cli.healthcheckInterval = interval
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithDeregisterCriticalServiceAfter with deregister-critical-service-after in seconds.
|
||
|
func WithDeregisterCriticalServiceAfter(interval int) Option {
|
||
|
return func(o *Registry) {
|
||
|
if o.cli != nil {
|
||
|
o.cli.deregisterCriticalServiceAfter = interval
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// WithServiceCheck with service checks
|
||
|
func WithServiceCheck(checks ...*api.AgentServiceCheck) Option {
|
||
|
return func(o *Registry) {
|
||
|
if o.cli != nil {
|
||
|
o.cli.serviceChecks = checks
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Config is consul registry config
|
||
|
type Config struct {
|
||
|
*api.Config
|
||
|
}
|
||
|
|
||
|
// Registry is consul registry
|
||
|
type Registry struct {
|
||
|
cli *Client
|
||
|
enableHealthCheck bool
|
||
|
registry map[string]*serviceSet
|
||
|
lock sync.RWMutex
|
||
|
}
|
||
|
|
||
|
// New creates consul registry
|
||
|
func New(apiClient *api.Client, opts ...Option) *Registry {
|
||
|
r := &Registry{
|
||
|
cli: NewClient(apiClient),
|
||
|
registry: make(map[string]*serviceSet),
|
||
|
enableHealthCheck: true,
|
||
|
}
|
||
|
for _, o := range opts {
|
||
|
o(r)
|
||
|
}
|
||
|
return r
|
||
|
}
|
||
|
|
||
|
// Register register service
|
||
|
func (r *Registry) Register(ctx context.Context, svc *registry.ServiceInstance) error {
|
||
|
return r.cli.Register(ctx, svc, r.enableHealthCheck)
|
||
|
}
|
||
|
|
||
|
// Deregister deregister service
|
||
|
func (r *Registry) Deregister(ctx context.Context, svc *registry.ServiceInstance) error {
|
||
|
return r.cli.Deregister(ctx, svc.ID)
|
||
|
}
|
||
|
|
||
|
// GetService return service by name
|
||
|
func (r *Registry) GetService(ctx context.Context, name string) ([]*registry.ServiceInstance, error) {
|
||
|
r.lock.RLock()
|
||
|
defer r.lock.RUnlock()
|
||
|
set := r.registry[name]
|
||
|
|
||
|
getRemote := func() []*registry.ServiceInstance {
|
||
|
services, _, err := r.cli.Service(ctx, name, 0, true)
|
||
|
if err == nil && len(services) > 0 {
|
||
|
return services
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
if set == nil {
|
||
|
if s := getRemote(); len(s) > 0 {
|
||
|
return s, nil
|
||
|
}
|
||
|
return nil, fmt.Errorf("service %s not resolved in registry", name)
|
||
|
}
|
||
|
ss, _ := set.services.Load().([]*registry.ServiceInstance)
|
||
|
if ss == nil {
|
||
|
if s := getRemote(); len(s) > 0 {
|
||
|
return s, nil
|
||
|
}
|
||
|
return nil, fmt.Errorf("service %s not found in registry", name)
|
||
|
}
|
||
|
return ss, nil
|
||
|
}
|
||
|
|
||
|
// ListServices return service list.
|
||
|
func (r *Registry) ListServices() (allServices map[string][]*registry.ServiceInstance, err error) {
|
||
|
r.lock.RLock()
|
||
|
defer r.lock.RUnlock()
|
||
|
allServices = make(map[string][]*registry.ServiceInstance)
|
||
|
for name, set := range r.registry {
|
||
|
var services []*registry.ServiceInstance
|
||
|
ss, _ := set.services.Load().([]*registry.ServiceInstance)
|
||
|
if ss == nil {
|
||
|
continue
|
||
|
}
|
||
|
services = append(services, ss...)
|
||
|
allServices[name] = services
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Watch resolve service by name
|
||
|
func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, error) {
|
||
|
r.lock.Lock()
|
||
|
defer r.lock.Unlock()
|
||
|
set, ok := r.registry[name]
|
||
|
if !ok {
|
||
|
set = &serviceSet{
|
||
|
watcher: make(map[*watcher]struct{}),
|
||
|
services: &atomic.Value{},
|
||
|
serviceName: name,
|
||
|
}
|
||
|
r.registry[name] = set
|
||
|
}
|
||
|
|
||
|
// 初始化watcher
|
||
|
w := &watcher{
|
||
|
event: make(chan struct{}, 1),
|
||
|
}
|
||
|
w.ctx, w.cancel = context.WithCancel(context.Background())
|
||
|
w.set = set
|
||
|
set.lock.Lock()
|
||
|
set.watcher[w] = struct{}{}
|
||
|
set.lock.Unlock()
|
||
|
ss, _ := set.services.Load().([]*registry.ServiceInstance)
|
||
|
if len(ss) > 0 {
|
||
|
// If the service has a value, it needs to be pushed to the watcher,
|
||
|
// otherwise the initial data may be blocked forever during the watch.
|
||
|
w.event <- struct{}{}
|
||
|
}
|
||
|
|
||
|
if !ok {
|
||
|
err := r.resolve(set)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
return w, nil
|
||
|
}
|
||
|
|
||
|
func (r *Registry) resolve(ss *serviceSet) error {
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||
|
services, idx, err := r.cli.Service(ctx, ss.serviceName, 0, true)
|
||
|
cancel()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
} else if len(services) > 0 {
|
||
|
ss.broadcast(services)
|
||
|
}
|
||
|
go func() {
|
||
|
ticker := time.NewTicker(time.Second)
|
||
|
defer ticker.Stop()
|
||
|
for {
|
||
|
<-ticker.C
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*120)
|
||
|
tmpService, tmpIdx, err := r.cli.Service(ctx, ss.serviceName, idx, true)
|
||
|
cancel()
|
||
|
if err != nil {
|
||
|
time.Sleep(time.Second)
|
||
|
continue
|
||
|
}
|
||
|
if len(tmpService) != 0 && tmpIdx != idx {
|
||
|
services = tmpService
|
||
|
ss.broadcast(services)
|
||
|
}
|
||
|
idx = tmpIdx
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
return nil
|
||
|
}
|