discovery源码

builder.go

  1. package discovery
  2. import (
  3. "context"
  4. "errors"
  5. "strings"
  6. "time"
  7. "mxshop/pkg/log"
  8. "google.golang.org/grpc/resolver"
  9. "mxshop/gmicro/registry"
  10. )
  11. const name = "discovery"
  12. // Option is builder option.
  13. type Option func(o *builder)
  14. // WithTimeout with timeout option.
  15. func WithTimeout(timeout time.Duration) Option {
  16. return func(b *builder) {
  17. b.timeout = timeout
  18. }
  19. }
  20. // WithInsecure with isSecure option.
  21. func WithInsecure(insecure bool) Option {
  22. return func(b *builder) {
  23. b.insecure = insecure
  24. }
  25. }
  26. type builder struct {
  27. discoverer registry.Discovery
  28. timeout time.Duration
  29. insecure bool
  30. }
  31. // NewBuilder creates a builder which is used to factory registry resolvers.
  32. func NewBuilder(d registry.Discovery, opts ...Option) resolver.Builder {
  33. b := &builder{
  34. discoverer: d,
  35. timeout: time.Second * 10,
  36. insecure: false,
  37. }
  38. for _, o := range opts {
  39. o(b)
  40. }
  41. return b
  42. }
  43. func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  44. var (
  45. err error
  46. w registry.Watcher
  47. )
  48. done := make(chan struct{}, 1)
  49. ctx, cancel := context.WithCancel(context.Background())
  50. go func() {
  51. w, err = b.discoverer.Watch(ctx, strings.TrimPrefix(target.URL.Path, "/"))
  52. close(done)
  53. }()
  54. select {
  55. case <-done:
  56. case <-time.After(b.timeout):
  57. err = errors.New("discovery create watcher overtime")
  58. }
  59. if err != nil {
  60. cancel()
  61. return nil, err
  62. }
  63. r := &discoveryResolver{
  64. w: w,
  65. cc: cc,
  66. ctx: ctx,
  67. cancel: cancel,
  68. insecure: b.insecure,
  69. }
  70. go r.watch()
  71. return r, nil
  72. }
  73. // Scheme return scheme of discovery
  74. func (*builder) Scheme() string {
  75. return name
  76. }

resolver.go

  1. package discovery
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "net/url"
  7. "strconv"
  8. "time"
  9. "google.golang.org/grpc/attributes"
  10. "google.golang.org/grpc/resolver"
  11. "mxshop/gmicro/registry"
  12. "mxshop/pkg/log"
  13. )
  14. type discoveryResolver struct {
  15. w registry.Watcher
  16. cc resolver.ClientConn
  17. ctx context.Context
  18. cancel context.CancelFunc
  19. insecure bool
  20. }
  21. func (r *discoveryResolver) watch() {
  22. for {
  23. select {
  24. case <-r.ctx.Done():
  25. return
  26. default:
  27. }
  28. ins, err := r.w.Next()
  29. if err != nil {
  30. if errors.Is(err, context.Canceled) {
  31. return
  32. }
  33. log.Errorf("[resolver] Failed to watch discovery endpoint: %v", err)
  34. time.Sleep(time.Second)
  35. continue
  36. }
  37. r.update(ins)
  38. }
  39. }
  40. func (r *discoveryResolver) update(ins []*registry.ServiceInstance) {
  41. addrs := make([]resolver.Address, 0)
  42. endpoints := make(map[string]struct{})
  43. for _, in := range ins {
  44. endpoint, err := ParseEndpoint(in.Endpoints, "grpc", !r.insecure)
  45. if err != nil {
  46. log.Errorf("[resolver] Failed to parse discovery endpoint: %v", err)
  47. continue
  48. }
  49. if endpoint == "" {
  50. continue
  51. }
  52. // filter redundant endpoints
  53. if _, ok := endpoints[endpoint]; ok {
  54. continue
  55. }
  56. endpoints[endpoint] = struct{}{}
  57. addr := resolver.Address{
  58. ServerName: in.Name,
  59. Attributes: parseAttributes(in.Metadata),
  60. Addr: endpoint,
  61. }
  62. addr.Attributes = addr.Attributes.WithValue("rawServiceInstance", in)
  63. addrs = append(addrs, addr)
  64. }
  65. if len(addrs) == 0 {
  66. log.Warnf("[resolver] Zero endpoint found,refused to write, instances: %v", ins)
  67. return
  68. }
  69. err := r.cc.UpdateState(resolver.State{Addresses: addrs})
  70. if err != nil {
  71. log.Errorf("[resolver] failed to update state: %s", err)
  72. }
  73. b, _ := json.Marshal(ins)
  74. log.Infof("[resolver] update instances: %s", b)
  75. }
  76. func (r *discoveryResolver) Close() {
  77. r.cancel()
  78. err := r.w.Stop()
  79. if err != nil {
  80. log.Errorf("[resolver] failed to watch top: %s", err)
  81. }
  82. }
  83. func (r *discoveryResolver) ResolveNow(options resolver.ResolveNowOptions) {}
  84. func parseAttributes(md map[string]string) *attributes.Attributes {
  85. var a *attributes.Attributes
  86. for k, v := range md {
  87. if a == nil {
  88. a = attributes.New(k, v)
  89. } else {
  90. a = a.WithValue(k, v)
  91. }
  92. }
  93. return a
  94. }
  95. // NewEndpoint new an Endpoint URL.
  96. func NewEndpoint(scheme, host string, isSecure bool) *url.URL {
  97. var query string
  98. if isSecure {
  99. query = "isSecure=true"
  100. }
  101. return &url.URL{Scheme: scheme, Host: host, RawQuery: query}
  102. }
  103. // ParseEndpoint parses an Endpoint URL.
  104. func ParseEndpoint(endpoints []string, scheme string, isSecure bool) (string, error) {
  105. for _, e := range endpoints {
  106. u, err := url.Parse(e)
  107. if err != nil {
  108. return "", err
  109. }
  110. if u.Scheme == scheme {
  111. if IsSecure(u) == isSecure {
  112. return u.Host, nil
  113. }
  114. }
  115. }
  116. return "", nil
  117. }
  118. // IsSecure parses isSecure for Endpoint URL.
  119. func IsSecure(u *url.URL) bool {
  120. ok, err := strconv.ParseBool(u.Query().Get("isSecure"))
  121. if err != nil {
  122. return false
  123. }
  124. return ok
  125. }