You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
110 lines
2.4 KiB
110 lines
2.4 KiB
package apollo |
|
|
|
import ( |
|
"context" |
|
"strings" |
|
|
|
"github.com/go-kratos/kratos/v2/encoding" |
|
|
|
"github.com/go-kratos/kratos/v2/config" |
|
"github.com/go-kratos/kratos/v2/log" |
|
|
|
"github.com/apolloconfig/agollo/v4/storage" |
|
) |
|
|
|
type watcher struct { |
|
out <-chan []*config.KeyValue |
|
|
|
ctx context.Context |
|
cancelFn func() |
|
} |
|
|
|
type customChangeListener struct { |
|
in chan<- []*config.KeyValue |
|
apollo *apollo |
|
} |
|
|
|
func (c *customChangeListener) onChange(namespace string, changes map[string]*storage.ConfigChange) []*config.KeyValue { |
|
kv := make([]*config.KeyValue, 0, 2) |
|
if strings.Contains(namespace, ".") && !strings.Contains(namespace, properties) && |
|
(format(namespace) == yaml || format(namespace) == yml || format(namespace) == json) { |
|
value, err := c.apollo.client.GetConfigCache(namespace).Get("content") |
|
if err != nil { |
|
log.Warnw("apollo get config failed", "err", err) |
|
return nil |
|
} |
|
kv = append(kv, &config.KeyValue{ |
|
Key: namespace, |
|
Value: []byte(value.(string)), |
|
Format: format(namespace), |
|
}) |
|
|
|
return kv |
|
} |
|
|
|
next := make(map[string]interface{}) |
|
|
|
for key, change := range changes { |
|
resolve(genKey(namespace, key), change.NewValue, next) |
|
} |
|
|
|
f := format(namespace) |
|
codec := encoding.GetCodec(f) |
|
val, err := codec.Marshal(next) |
|
if err != nil { |
|
log.Warnf("apollo could not handle namespace %s: %v", namespace, err) |
|
return nil |
|
} |
|
kv = append(kv, &config.KeyValue{ |
|
Key: namespace, |
|
Value: val, |
|
Format: f, |
|
}) |
|
|
|
return kv |
|
} |
|
|
|
func (c *customChangeListener) OnChange(changeEvent *storage.ChangeEvent) { |
|
change := c.onChange(changeEvent.Namespace, changeEvent.Changes) |
|
if len(change) == 0 { |
|
return |
|
} |
|
|
|
c.in <- change |
|
} |
|
|
|
func (c *customChangeListener) OnNewestChange(changeEvent *storage.FullChangeEvent) {} |
|
|
|
func newWatcher(a *apollo) (config.Watcher, error) { |
|
changeCh := make(chan []*config.KeyValue) |
|
listener := &customChangeListener{in: changeCh, apollo: a} |
|
a.client.AddChangeListener(listener) |
|
|
|
ctx, cancel := context.WithCancel(context.Background()) |
|
return &watcher{ |
|
out: changeCh, |
|
|
|
ctx: ctx, |
|
cancelFn: func() { |
|
a.client.RemoveChangeListener(listener) |
|
cancel() |
|
}, |
|
}, nil |
|
} |
|
|
|
// Next will be blocked until the Stop method is called |
|
func (w *watcher) Next() ([]*config.KeyValue, error) { |
|
select { |
|
case kv := <-w.out: |
|
return kv, nil |
|
case <-w.ctx.Done(): |
|
return nil, w.ctx.Err() |
|
} |
|
} |
|
|
|
func (w *watcher) Stop() error { |
|
if w.cancelFn != nil { |
|
w.cancelFn() |
|
} |
|
return nil |
|
}
|
|
|