You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

204 lines
4.8 KiB

  1. package k8s
import (
	"context"
	"errors"
	"fmt"
	"net"
	"sort"
	"strings"
	"time"

	"egg-im/pkg/logger"

	"go.uber.org/zap"
	"google.golang.org/grpc/resolver"
	v1 "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	discoveryv1 "k8s.io/client-go/kubernetes/typed/discovery/v1"
	"k8s.io/client-go/rest"
)
  18. // 实现k8s地址解析,根据k8s的service的endpoints解析 比如,k8s:///namespace.server:port
  19. func init() {
  20. resolver.Register(NewK8sBuilder())
  21. }
  22. func GetK8STarget(namespace, server, port string) string {
  23. return fmt.Sprintf("k8s:///%s.%s:%s", namespace, server, port)
  24. }
// k8sBuilder is the stateless resolver.Builder for the "k8s" scheme.
type k8sBuilder struct{}
  26. func NewK8sBuilder() resolver.Builder {
  27. return &k8sBuilder{}
  28. }
  29. func (b *k8sBuilder) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  30. return newK8sResolver(target, clientConn)
  31. }
  32. func (b *k8sBuilder) Scheme() string {
  33. return "k8s"
  34. }
// k8sResolver resolves a k8s target into backend addresses by listing the
// EndpointSlices of a service.
type k8sResolver struct {
	// log is the logger tagged with the target endpoint.
	log *zap.Logger
	// clientConn receives the resolved address list.
	clientConn resolver.ClientConn
	// discoveryClient lists EndpointSlices in the target namespace.
	discoveryClient discoveryv1.EndpointSliceInterface
	// service is the service name, used as the value of the "app" label selector.
	service string
	// cancel stops the background polling goroutine.
	cancel context.CancelFunc
	// ips is the most recently resolved IP list.
	ips []string
	// port is appended to every IP to form a dialable address.
	port string
}
  45. func GetK8sClient() (*kubernetes.Clientset, error) {
  46. config, err := rest.InClusterConfig()
  47. if err != nil {
  48. return nil, err
  49. }
  50. k8sClient, err := kubernetes.NewForConfig(config)
  51. if err != nil {
  52. return nil, err
  53. }
  54. return k8sClient, nil
  55. }
  56. func newK8sResolver(target resolver.Target, clientConn resolver.ClientConn) (*k8sResolver, error) {
  57. log := logger.Logger.With(zap.String("target", target.Endpoint))
  58. log.Info("k8s resolver build")
  59. namespace, service, port, err := parseTarget(target)
  60. if err != nil {
  61. log.Error("k8s resolver error", zap.Error(err))
  62. return nil, err
  63. }
  64. k8sClient, err := GetK8sClient()
  65. if err != nil {
  66. log.Error("k8s resolver error", zap.Error(err))
  67. return nil, err
  68. }
  69. ctx, cancel := context.WithCancel(context.Background())
  70. client := k8sClient.DiscoveryV1().EndpointSlices(namespace)
  71. k8sResolver := &k8sResolver{
  72. log: log,
  73. clientConn: clientConn,
  74. discoveryClient: client,
  75. service: service,
  76. cancel: cancel,
  77. port: port,
  78. }
  79. err = k8sResolver.updateState(true)
  80. if err != nil {
  81. log.Error("k8s resolver error", zap.Error(err))
  82. return nil, err
  83. }
  84. ticker := time.NewTicker(time.Second)
  85. // 监听变化
  86. go func() {
  87. for {
  88. select {
  89. case <-ctx.Done():
  90. return
  91. case <-ticker.C:
  92. _ = k8sResolver.updateState(false)
  93. }
  94. }
  95. }()
  96. return k8sResolver, nil
  97. }
  98. // ResolveNow grpc感知到连接异常,会做通知,观察日志得知
  99. func (r *k8sResolver) ResolveNow(opt resolver.ResolveNowOptions) {
  100. r.log.Info("k8s resolver resolveNow")
  101. }
  102. func (r *k8sResolver) Close() {
  103. r.log.Info("k8s resolver close")
  104. r.cancel()
  105. }
  106. // updateState 更新地址列表
  107. func (r *k8sResolver) updateState(isFromNew bool) error {
  108. list, err := r.discoveryClient.List(context.TODO(), metav1.ListOptions{LabelSelector: "app=" + r.service})
  109. if err != nil {
  110. r.log.Error("k8s resolver error", zap.Error(err))
  111. return err
  112. }
  113. newIPs := getIPs(list)
  114. if !isFromNew && isEqualIPs(r.ips, newIPs) {
  115. return nil
  116. }
  117. r.ips = newIPs
  118. addresses := make([]resolver.Address, 0, len(r.ips))
  119. for _, v := range r.ips {
  120. addresses = append(addresses, resolver.Address{
  121. Addr: v + ":" + r.port,
  122. })
  123. }
  124. state := resolver.State{
  125. Addresses: addresses,
  126. }
  127. r.log.Info("k8s resolver updateState", zap.Bool("is_from_new", isFromNew), zap.Any("service", r.service), zap.Any("addresses", addresses))
  128. err = r.clientConn.UpdateState(state)
  129. if err != nil {
  130. r.log.Error("k8s resolver error", zap.Error(err))
  131. return err
  132. }
  133. return nil
  134. }
  135. // parseTarget 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port
  136. func parseTarget(target resolver.Target) (namespace string, service string, port string, err error) {
  137. namespaceAndServerPort := strings.Split(target.Endpoint, ".")
  138. if len(namespaceAndServerPort) != 2 {
  139. err = errors.New("endpoint must is namespace.server:port")
  140. return
  141. }
  142. namespace = namespaceAndServerPort[0]
  143. serverAndPort := strings.Split(namespaceAndServerPort[1], ":")
  144. if len(serverAndPort) != 2 {
  145. err = errors.New("endpoint must is namespace.server:port")
  146. return
  147. }
  148. service = serverAndPort[0]
  149. port = serverAndPort[1]
  150. return
  151. }
  152. // isEqualIPs 判断两个地址列表是否相等
  153. func isEqualIPs(s1, s2 []string) bool {
  154. if len(s1) != len(s2) {
  155. return false
  156. }
  157. sort.Strings(s1)
  158. sort.Strings(s2)
  159. for i := range s1 {
  160. if s1[i] != s2[i] {
  161. return false
  162. }
  163. }
  164. return true
  165. }
  166. // getIPs 获取EndpointSlice里面的IP列表
  167. func getIPs(list *v1.EndpointSliceList) []string {
  168. ips := make([]string, 0, 10)
  169. for _, slice := range list.Items {
  170. for _, endpoints := range slice.Endpoints {
  171. for _, address := range endpoints.Addresses {
  172. ips = append(ips, address)
  173. }
  174. }
  175. }
  176. return ips
  177. }