golang-im聊天
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

187 rivejä
4.1 KiB

  1. package k8s
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "sort"
  8. "strings"
  9. "google.golang.org/grpc/resolver"
  10. v1 "k8s.io/api/core/v1"
  11. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  12. "k8s.io/apimachinery/pkg/fields"
  13. "k8s.io/client-go/kubernetes"
  14. "k8s.io/client-go/rest"
  15. )
  16. // 实现k8s地址解析,根据k8s的service的endpoints解析 比如,k8s:///namespace.server:port
  17. func init() {
  18. resolver.Register(NewK8sBuilder())
  19. }
  20. func GetK8STarget(namespace, server, port string) string {
  21. return fmt.Sprintf("k8s:///%s.%s:%s", namespace, server, port)
  22. }
  23. type k8sBuilder struct{}
  24. func NewK8sBuilder() resolver.Builder {
  25. return &k8sBuilder{}
  26. }
  27. func (b *k8sBuilder) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  28. namespace, service, port, err := resolveEndpoint(target.Endpoint)
  29. if err != nil {
  30. return nil, err
  31. }
  32. k8sClient := getK8sClient()
  33. endpoints, err := k8sClient.CoreV1().Endpoints(namespace).Get(context.TODO(), service, metav1.GetOptions{})
  34. if err != nil {
  35. return nil, err
  36. }
  37. ips, err := getIpsFromEndpoint(endpoints)
  38. if err != nil {
  39. return nil, err
  40. }
  41. k8sResolver := &k8sResolver{
  42. ips: ips,
  43. port: port,
  44. clientConn: clientConn,
  45. }
  46. k8sResolver.updateState(nil)
  47. // 监听变化
  48. go func() {
  49. events, err := k8sClient.CoreV1().Endpoints(namespace).Watch(context.TODO(), metav1.ListOptions{
  50. LabelSelector: fields.OneTermEqualSelector("app", service).String(),
  51. })
  52. if err != nil {
  53. log.Println(err)
  54. panic(err)
  55. }
  56. for {
  57. event := <-events.ResultChan()
  58. endpoints, ok := event.Object.(*v1.Endpoints)
  59. if !ok {
  60. //log.Println("event not is endpoints")
  61. continue
  62. }
  63. ips, err := getIpsFromEndpoint(endpoints)
  64. if err != nil {
  65. log.Println(err)
  66. continue
  67. }
  68. k8sResolver.updateState(ips)
  69. }
  70. }()
  71. return k8sResolver, nil
  72. }
  73. func (b *k8sBuilder) Scheme() string {
  74. return "k8s"
  75. }
  76. // resolveEndpoint 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port
  77. func resolveEndpoint(endpoint string) (namespace string, service string, port string, err error) {
  78. namespaceAndServerPort := strings.Split(endpoint, ".")
  79. if len(namespaceAndServerPort) != 2 {
  80. err = errors.New("endpoint must is namespace.server:port")
  81. return
  82. }
  83. namespace = namespaceAndServerPort[0]
  84. serverAndPort := strings.Split(namespaceAndServerPort[1], ":")
  85. if len(serverAndPort) != 2 {
  86. err = errors.New("endpoint must is namespace.server:port")
  87. return
  88. }
  89. service = serverAndPort[0]
  90. port = serverAndPort[1]
  91. return
  92. }
  93. func getK8sClient() *kubernetes.Clientset {
  94. config, err := rest.InClusterConfig()
  95. if err != nil {
  96. log.Println(err)
  97. panic(err)
  98. }
  99. k8sClient, err := kubernetes.NewForConfig(config)
  100. if err != nil {
  101. log.Println(err)
  102. panic(err)
  103. }
  104. return k8sClient
  105. }
  106. // k8sResolver k8s地址解析器
  107. type k8sResolver struct {
  108. ips []string
  109. port string
  110. clientConn resolver.ClientConn
  111. }
  112. func (r *k8sResolver) ResolveNow(opt resolver.ResolveNowOptions) {
  113. r.updateState(nil)
  114. }
  115. func (r *k8sResolver) Close() {}
  116. // updateState 更新地址列表
  117. func (r *k8sResolver) updateState(newIPs []string) {
  118. if newIPs != nil {
  119. if isEqual(r.ips, newIPs) {
  120. return
  121. }
  122. r.ips = newIPs
  123. }
  124. addresses := make([]resolver.Address, 0, len(r.ips))
  125. for _, v := range r.ips {
  126. addresses = append(addresses, resolver.Address{
  127. Addr: v + ":" + r.port,
  128. })
  129. }
  130. state := resolver.State{
  131. Addresses: addresses,
  132. }
  133. log.Println("updateState", addresses)
  134. r.clientConn.UpdateState(state)
  135. return
  136. }
  137. // isEqual 判断两个地址列表是否相等
  138. func isEqual(s1, s2 []string) bool {
  139. if len(s1) != len(s2) {
  140. return false
  141. }
  142. sort.Strings(s1)
  143. sort.Strings(s2)
  144. for i := range s1 {
  145. if s1[i] != s2[i] {
  146. return false
  147. }
  148. }
  149. return true
  150. }
  151. // getIpsFromEndpoint 获取endpoints里面的IP列表
  152. func getIpsFromEndpoint(endpoints *v1.Endpoints) ([]string, error) {
  153. if len(endpoints.Subsets) < 1 {
  154. return nil, errors.New("subsets length less than 1")
  155. }
  156. endpointAddresses := endpoints.Subsets[0].Addresses
  157. ips := make([]string, 0, len(endpoints.Subsets[0].Addresses))
  158. for _, v := range endpointAddresses {
  159. ips = append(ips, v.IP)
  160. }
  161. return ips, nil
  162. }