discovery源码
builder.go
package discovery
import (
"context"
"errors"
"strings"
"time"
"mxshop/pkg/log"
"google.golang.org/grpc/resolver"
"mxshop/gmicro/registry"
)
const name = "discovery"
// Option is builder option.
type Option func(o *builder)
// WithTimeout with timeout option.
func WithTimeout(timeout time.Duration) Option {
return func(b *builder) {
b.timeout = timeout
}
}
// WithInsecure with isSecure option.
func WithInsecure(insecure bool) Option {
return func(b *builder) {
b.insecure = insecure
}
}
type builder struct {
discoverer registry.Discovery
timeout time.Duration
insecure bool
}
// NewBuilder creates a builder which is used to factory registry resolvers.
func NewBuilder(d registry.Discovery, opts ...Option) resolver.Builder {
b := &builder{
discoverer: d,
timeout: time.Second * 10,
insecure: false,
}
for _, o := range opts {
o(b)
}
return b
}
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
var (
err error
w registry.Watcher
)
done := make(chan struct{}, 1)
ctx, cancel := context.WithCancel(context.Background())
go func() {
w, err = b.discoverer.Watch(ctx, strings.TrimPrefix(target.URL.Path, "/"))
close(done)
}()
select {
case <-done:
case <-time.After(b.timeout):
err = errors.New("discovery create watcher overtime")
}
if err != nil {
cancel()
return nil, err
}
r := &discoveryResolver{
w: w,
cc: cc,
ctx: ctx,
cancel: cancel,
insecure: b.insecure,
}
go r.watch()
return r, nil
}
// Scheme return scheme of discovery
func (*builder) Scheme() string {
return name
}
resolver.go
package discovery
import (
"context"
"encoding/json"
"errors"
"net/url"
"strconv"
"time"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
"mxshop/gmicro/registry"
"mxshop/pkg/log"
)
type discoveryResolver struct {
w registry.Watcher
cc resolver.ClientConn
ctx context.Context
cancel context.CancelFunc
insecure bool
}
func (r *discoveryResolver) watch() {
for {
select {
case <-r.ctx.Done():
return
default:
}
ins, err := r.w.Next()
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
log.Errorf("[resolver] Failed to watch discovery endpoint: %v", err)
time.Sleep(time.Second)
continue
}
r.update(ins)
}
}
func (r *discoveryResolver) update(ins []*registry.ServiceInstance) {
addrs := make([]resolver.Address, 0)
endpoints := make(map[string]struct{})
for _, in := range ins {
endpoint, err := ParseEndpoint(in.Endpoints, "grpc", !r.insecure)
if err != nil {
log.Errorf("[resolver] Failed to parse discovery endpoint: %v", err)
continue
}
if endpoint == "" {
continue
}
// filter redundant endpoints
if _, ok := endpoints[endpoint]; ok {
continue
}
endpoints[endpoint] = struct{}{}
addr := resolver.Address{
ServerName: in.Name,
Attributes: parseAttributes(in.Metadata),
Addr: endpoint,
}
addr.Attributes = addr.Attributes.WithValue("rawServiceInstance", in)
addrs = append(addrs, addr)
}
if len(addrs) == 0 {
log.Warnf("[resolver] Zero endpoint found,refused to write, instances: %v", ins)
return
}
err := r.cc.UpdateState(resolver.State{Addresses: addrs})
if err != nil {
log.Errorf("[resolver] failed to update state: %s", err)
}
b, _ := json.Marshal(ins)
log.Infof("[resolver] update instances: %s", b)
}
func (r *discoveryResolver) Close() {
r.cancel()
err := r.w.Stop()
if err != nil {
log.Errorf("[resolver] failed to watch top: %s", err)
}
}
func (r *discoveryResolver) ResolveNow(options resolver.ResolveNowOptions) {}
func parseAttributes(md map[string]string) *attributes.Attributes {
var a *attributes.Attributes
for k, v := range md {
if a == nil {
a = attributes.New(k, v)
} else {
a = a.WithValue(k, v)
}
}
return a
}
// NewEndpoint new an Endpoint URL.
func NewEndpoint(scheme, host string, isSecure bool) *url.URL {
var query string
if isSecure {
query = "isSecure=true"
}
return &url.URL{Scheme: scheme, Host: host, RawQuery: query}
}
// ParseEndpoint parses an Endpoint URL.
func ParseEndpoint(endpoints []string, scheme string, isSecure bool) (string, error) {
for _, e := range endpoints {
u, err := url.Parse(e)
if err != nil {
return "", err
}
if u.Scheme == scheme {
if IsSecure(u) == isSecure {
return u.Host, nil
}
}
}
return "", nil
}
// IsSecure parses isSecure for Endpoint URL.
func IsSecure(u *url.URL) bool {
ok, err := strconv.ParseBool(u.Query().Get("isSecure"))
if err != nil {
return false
}
return ok
}