golang-im聊天
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.
 
 
 
 

190 строки
4.6 KiB

  1. package k8s
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "gim/pkg/logger"
  7. "go.uber.org/zap"
  8. "k8s.io/client-go/kubernetes"
  9. "k8s.io/client-go/rest"
  10. "sort"
  11. "strings"
  12. "google.golang.org/grpc/resolver"
  13. v1 "k8s.io/api/core/v1"
  14. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  15. "k8s.io/apimachinery/pkg/fields"
  16. )
  17. // 实现k8s地址解析,根据k8s的service的endpoints解析 比如,k8s:///namespace.server:port
  18. func init() {
  19. resolver.Register(NewK8sBuilder())
  20. }
  21. func GetK8STarget(namespace, server, port string) string {
  22. return fmt.Sprintf("k8s:///%s.%s:%s", namespace, server, port)
  23. }
  24. type k8sBuilder struct{}
  25. func NewK8sBuilder() resolver.Builder {
  26. return &k8sBuilder{}
  27. }
  28. func (b *k8sBuilder) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  29. return newK8sResolver(target, clientConn)
  30. }
  31. func (b *k8sBuilder) Scheme() string {
  32. return "k8s"
  33. }
  34. // k8sResolver k8s地址解析器
  35. type k8sResolver struct {
  36. clientConn resolver.ClientConn
  37. endpoint string
  38. ips []string
  39. port string
  40. cancel context.CancelFunc
  41. }
  42. func getK8sClient() (*kubernetes.Clientset, error) {
  43. config, err := rest.InClusterConfig()
  44. if err != nil {
  45. return nil, err
  46. }
  47. k8sClient, err := kubernetes.NewForConfig(config)
  48. if err != nil {
  49. return nil, err
  50. }
  51. return k8sClient, nil
  52. }
  53. func newK8sResolver(target resolver.Target, clientConn resolver.ClientConn) (*k8sResolver, error) {
  54. namespace, service, port, err := resolveEndpoint(target.Endpoint)
  55. if err != nil {
  56. return nil, err
  57. }
  58. k8sClient, err := getK8sClient()
  59. if err != nil {
  60. return nil, err
  61. }
  62. ctx, cancel := context.WithCancel(context.Background())
  63. k8sResolver := &k8sResolver{
  64. clientConn: clientConn,
  65. endpoint: target.Endpoint,
  66. port: port,
  67. cancel: cancel,
  68. }
  69. // 监听变化
  70. go func() {
  71. events, err := k8sClient.CoreV1().Endpoints(namespace).Watch(context.TODO(), metav1.ListOptions{
  72. LabelSelector: fields.OneTermEqualSelector("app", service).String(),
  73. })
  74. if err != nil {
  75. logger.Logger.Error("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Error(err))
  76. panic(err)
  77. }
  78. for {
  79. select {
  80. case <-ctx.Done():
  81. return
  82. case event := <-events.ResultChan():
  83. endpoints, ok := event.Object.(*v1.Endpoints)
  84. if !ok {
  85. logger.Logger.Warn("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Any("event", event))
  86. continue
  87. }
  88. ips, err := getIpsFromEndpoint(endpoints)
  89. if err != nil {
  90. logger.Logger.Warn("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Error(err))
  91. continue
  92. }
  93. k8sResolver.updateState(ips)
  94. }
  95. }
  96. }()
  97. return k8sResolver, nil
  98. }
  99. func (r *k8sResolver) ResolveNow(opt resolver.ResolveNowOptions) {
  100. logger.Logger.Info("k8s resolver resolveNow", zap.String("endpoint", r.endpoint))
  101. }
  102. func (r *k8sResolver) Close() {
  103. r.cancel()
  104. }
  105. // updateState 更新地址列表
  106. func (r *k8sResolver) updateState(newIPs []string) {
  107. if isEqualIps(r.ips, newIPs) {
  108. return
  109. }
  110. r.ips = newIPs
  111. addresses := make([]resolver.Address, 0, len(r.ips))
  112. for _, v := range r.ips {
  113. addresses = append(addresses, resolver.Address{
  114. Addr: v + ":" + r.port,
  115. })
  116. }
  117. state := resolver.State{
  118. Addresses: addresses,
  119. }
  120. logger.Logger.Info("k8s resolver updateState", zap.String("endpoint", r.endpoint), zap.Any("addresses", addresses))
  121. r.clientConn.UpdateState(state)
  122. return
  123. }
  124. // resolveEndpoint 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port
  125. func resolveEndpoint(endpoint string) (namespace string, service string, port string, err error) {
  126. namespaceAndServerPort := strings.Split(endpoint, ".")
  127. if len(namespaceAndServerPort) != 2 {
  128. err = errors.New("endpoint must is namespace.server:port")
  129. return
  130. }
  131. namespace = namespaceAndServerPort[0]
  132. serverAndPort := strings.Split(namespaceAndServerPort[1], ":")
  133. if len(serverAndPort) != 2 {
  134. err = errors.New("endpoint must is namespace.server:port")
  135. return
  136. }
  137. service = serverAndPort[0]
  138. port = serverAndPort[1]
  139. return
  140. }
  141. // isEqualIps 判断两个地址列表是否相等
  142. func isEqualIps(s1, s2 []string) bool {
  143. if len(s1) != len(s2) {
  144. return false
  145. }
  146. sort.Strings(s1)
  147. sort.Strings(s2)
  148. for i := range s1 {
  149. if s1[i] != s2[i] {
  150. return false
  151. }
  152. }
  153. return true
  154. }
  155. // getIpsFromEndpoint 获取endpoints里面的IP列表
  156. func getIpsFromEndpoint(endpoints *v1.Endpoints) ([]string, error) {
  157. if len(endpoints.Subsets) < 1 {
  158. return nil, errors.New("subsets length less than 1")
  159. }
  160. endpointAddresses := endpoints.Subsets[0].Addresses
  161. ips := make([]string, 0, len(endpoints.Subsets[0].Addresses))
  162. for _, v := range endpointAddresses {
  163. ips = append(ips, v.IP)
  164. }
  165. return ips, nil
  166. }