Sfoglia il codice sorgente

update k8s_resolver.go

master
DengBiao 1 anno fa
parent
commit
bd57af2ec9
1 ha cambiato i file con 80 aggiunte e 77 eliminazioni
  1. +80
    -77
      pkg/grpclib/resolver/k8s/k8s_resolver.go

+ 80
- 77
pkg/grpclib/resolver/k8s/k8s_resolver.go Vedi File

@@ -4,7 +4,10 @@ import (
"context"
"errors"
"fmt"
"log"
"gim/pkg/logger"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sort"
"strings"

@@ -12,8 +15,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

// 实现k8s地址解析,根据k8s的service的endpoints解析 比如,k8s:///namespace.server:port
@@ -32,26 +33,51 @@ func NewK8sBuilder() resolver.Builder {
}

func (b *k8sBuilder) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
namespace, service, port, err := resolveEndpoint(target.Endpoint)
return newK8sResolver(target, clientConn)
}

func (b *k8sBuilder) Scheme() string {
return "k8s"
}

// k8sResolver k8s地址解析器
type k8sResolver struct {
clientConn resolver.ClientConn
endpoint string
ips []string
port string
cancel context.CancelFunc
}

func getK8sClient() (*kubernetes.Clientset, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
k8sClient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return k8sClient, nil
}

k8sClient := getK8sClient()
endpoints, err := k8sClient.CoreV1().Endpoints(namespace).Get(context.TODO(), service, metav1.GetOptions{})
func newK8sResolver(target resolver.Target, clientConn resolver.ClientConn) (*k8sResolver, error) {
namespace, service, port, err := resolveEndpoint(target.Endpoint)
if err != nil {
return nil, err
}
ips, err := getIpsFromEndpoint(endpoints)

k8sClient, err := getK8sClient()
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
k8sResolver := &k8sResolver{
ips: ips,
port: port,
clientConn: clientConn,
endpoint: target.Endpoint,
port: port,
cancel: cancel,
}
k8sResolver.updateState(nil)

// 监听变化
go func() {
@@ -59,87 +85,46 @@ func (b *k8sBuilder) Build(target resolver.Target, clientConn resolver.ClientCon
LabelSelector: fields.OneTermEqualSelector("app", service).String(),
})
if err != nil {
log.Println(err)
logger.Logger.Error("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Error(err))
panic(err)
}

for {
event := <-events.ResultChan()
endpoints, ok := event.Object.(*v1.Endpoints)
if !ok {
//log.Println("event not is endpoints")
continue
}
ips, err := getIpsFromEndpoint(endpoints)
if err != nil {
log.Println(err)
continue
select {
case <-ctx.Done():
return
case event := <-events.ResultChan():
endpoints, ok := event.Object.(*v1.Endpoints)
if !ok {
logger.Logger.Warn("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Any("event", event))
continue
}
ips, err := getIpsFromEndpoint(endpoints)
if err != nil {
logger.Logger.Warn("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Error(err))
continue
}
k8sResolver.updateState(ips)
}
k8sResolver.updateState(ips)
}
}()

return k8sResolver, nil
}

func (b *k8sBuilder) Scheme() string {
return "k8s"
}

// resolveEndpoint 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port
func resolveEndpoint(endpoint string) (namespace string, service string, port string, err error) {
namespaceAndServerPort := strings.Split(endpoint, ".")
if len(namespaceAndServerPort) != 2 {
err = errors.New("endpoint must is namespace.server:port")
return
}
namespace = namespaceAndServerPort[0]
serverAndPort := strings.Split(namespaceAndServerPort[1], ":")
if len(serverAndPort) != 2 {
err = errors.New("endpoint must is namespace.server:port")
return
}
service = serverAndPort[0]
port = serverAndPort[1]
return
}

func getK8sClient() *kubernetes.Clientset {
config, err := rest.InClusterConfig()
if err != nil {
log.Println(err)
panic(err)
}
k8sClient, err := kubernetes.NewForConfig(config)
if err != nil {
log.Println(err)
panic(err)
}
return k8sClient
}

// k8sResolver k8s地址解析器
type k8sResolver struct {
ips []string
port string
clientConn resolver.ClientConn
}

func (r *k8sResolver) ResolveNow(opt resolver.ResolveNowOptions) {
r.updateState(nil)
logger.Logger.Info("k8s resolver resolveNow", zap.String("endpoint", r.endpoint))
}

func (r *k8sResolver) Close() {}
func (r *k8sResolver) Close() {
r.cancel()
}

// updateState 更新地址列表
func (r *k8sResolver) updateState(newIPs []string) {
if newIPs != nil {
if isEqual(r.ips, newIPs) {
return
}

r.ips = newIPs
if isEqualIps(r.ips, newIPs) {
return
}
r.ips = newIPs

addresses := make([]resolver.Address, 0, len(r.ips))
for _, v := range r.ips {
@@ -150,13 +135,31 @@ func (r *k8sResolver) updateState(newIPs []string) {
state := resolver.State{
Addresses: addresses,
}
log.Println("updateState", addresses)
logger.Logger.Info("k8s resolver updateState", zap.String("endpoint", r.endpoint), zap.Any("addresses", addresses))
r.clientConn.UpdateState(state)
return
}

// isEqual 判断两个地址列表是否相等
func isEqual(s1, s2 []string) bool {
// resolveEndpoint 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port
func resolveEndpoint(endpoint string) (namespace string, service string, port string, err error) {
namespaceAndServerPort := strings.Split(endpoint, ".")
if len(namespaceAndServerPort) != 2 {
err = errors.New("endpoint must is namespace.server:port")
return
}
namespace = namespaceAndServerPort[0]
serverAndPort := strings.Split(namespaceAndServerPort[1], ":")
if len(serverAndPort) != 2 {
err = errors.New("endpoint must is namespace.server:port")
return
}
service = serverAndPort[0]
port = serverAndPort[1]
return
}

// isEqualIps 判断两个地址列表是否相等
func isEqualIps(s1, s2 []string) bool {
if len(s1) != len(s2) {
return false
}


Caricamento…
Annulla
Salva