diff --git a/pkg/grpclib/resolver/k8s/k8s_resolver.go b/pkg/grpclib/resolver/k8s/k8s_resolver.go index 5ece6e3..e1aa767 100644 --- a/pkg/grpclib/resolver/k8s/k8s_resolver.go +++ b/pkg/grpclib/resolver/k8s/k8s_resolver.go @@ -4,7 +4,10 @@ import ( "context" "errors" "fmt" - "log" + "gim/pkg/logger" + "go.uber.org/zap" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "sort" "strings" @@ -12,8 +15,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" ) // 实现k8s地址解析,根据k8s的service的endpoints解析 比如,k8s:///namespace.server:port @@ -32,26 +33,51 @@ func NewK8sBuilder() resolver.Builder { } func (b *k8sBuilder) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - namespace, service, port, err := resolveEndpoint(target.Endpoint) + return newK8sResolver(target, clientConn) +} + +func (b *k8sBuilder) Scheme() string { + return "k8s" +} + +// k8sResolver k8s地址解析器 +type k8sResolver struct { + clientConn resolver.ClientConn + endpoint string + ips []string + port string + cancel context.CancelFunc +} + +func getK8sClient() (*kubernetes.Clientset, error) { + config, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + k8sClient, err := kubernetes.NewForConfig(config) if err != nil { return nil, err } + return k8sClient, nil +} - k8sClient := getK8sClient() - endpoints, err := k8sClient.CoreV1().Endpoints(namespace).Get(context.TODO(), service, metav1.GetOptions{}) +func newK8sResolver(target resolver.Target, clientConn resolver.ClientConn) (*k8sResolver, error) { + namespace, service, port, err := resolveEndpoint(target.Endpoint) if err != nil { return nil, err } - ips, err := getIpsFromEndpoint(endpoints) + + k8sClient, err := getK8sClient() if err != nil { return nil, err } + ctx, cancel := context.WithCancel(context.Background()) k8sResolver := &k8sResolver{ - ips: ips, - port: port, clientConn: clientConn, + endpoint: target.Endpoint, + port: port, + cancel: cancel, } - k8sResolver.updateState(nil) // 监听变化 go func() { @@ -59,87 +85,46 @@ func (b *k8sBuilder) Build(target resolver.Target, clientConn resolver.ClientCon LabelSelector: fields.OneTermEqualSelector("app", service).String(), }) if err != nil { - log.Println(err) + logger.Logger.Error("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Error(err)) panic(err) } for { - event := <-events.ResultChan() - endpoints, ok := event.Object.(*v1.Endpoints) - if !ok { - //log.Println("event not is endpoints") - continue - } - ips, err := getIpsFromEndpoint(endpoints) - if err != nil { - log.Println(err) - continue + select { + case <-ctx.Done(): + return + case event := <-events.ResultChan(): + endpoints, ok := event.Object.(*v1.Endpoints) + if !ok { + logger.Logger.Warn("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Any("event", event)) + continue + } + ips, err := getIpsFromEndpoint(endpoints) + if err != nil { + logger.Logger.Warn("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Error(err)) + continue + } + k8sResolver.updateState(ips) } - k8sResolver.updateState(ips) } }() - return k8sResolver, nil } -func (b *k8sBuilder) Scheme() string { - return "k8s" -} - -// resolveEndpoint 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port -func resolveEndpoint(endpoint string) (namespace string, service string, port string, err error) { - namespaceAndServerPort := strings.Split(endpoint, ".") - if len(namespaceAndServerPort) != 2 { - err = errors.New("endpoint must is namespace.server:port") - return - } - namespace = namespaceAndServerPort[0] - serverAndPort := strings.Split(namespaceAndServerPort[1], ":") - if len(serverAndPort) != 2 { - err = errors.New("endpoint must is namespace.server:port") - return - } - service = serverAndPort[0] - port = serverAndPort[1] - return -} - -func getK8sClient() *kubernetes.Clientset { - config, err := rest.InClusterConfig() - if err != nil { - log.Println(err) - panic(err) - } - k8sClient, err := kubernetes.NewForConfig(config) - if err != nil { - log.Println(err) - panic(err) - } - return k8sClient -} - -// k8sResolver k8s地址解析器 -type k8sResolver struct { - ips []string - port string - clientConn resolver.ClientConn -} - func (r *k8sResolver) ResolveNow(opt resolver.ResolveNowOptions) { - r.updateState(nil) + logger.Logger.Info("k8s resolver resolveNow", zap.String("endpoint", r.endpoint)) } -func (r *k8sResolver) Close() {} +func (r *k8sResolver) Close() { + r.cancel() +} // updateState 更新地址列表 func (r *k8sResolver) updateState(newIPs []string) { - if newIPs != nil { - if isEqual(r.ips, newIPs) { - return - } - - r.ips = newIPs + if isEqualIps(r.ips, newIPs) { + return } + r.ips = newIPs addresses := make([]resolver.Address, 0, len(r.ips)) for _, v := range r.ips { @@ -150,13 +135,31 @@ func (r *k8sResolver) updateState(newIPs []string) { state := resolver.State{ Addresses: addresses, } - log.Println("updateState", addresses) + logger.Logger.Info("k8s resolver updateState", zap.String("endpoint", r.endpoint), zap.Any("addresses", addresses)) r.clientConn.UpdateState(state) return } -// isEqual 判断两个地址列表是否相等 -func isEqual(s1, s2 []string) bool { +// resolveEndpoint 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port +func resolveEndpoint(endpoint string) (namespace string, service string, port string, err error) { + namespaceAndServerPort := strings.Split(endpoint, ".") + if len(namespaceAndServerPort) != 2 { + err = errors.New("endpoint must is namespace.server:port") + return + } + namespace = namespaceAndServerPort[0] + serverAndPort := strings.Split(namespaceAndServerPort[1], ":") + if len(serverAndPort) != 2 { + err = errors.New("endpoint must is namespace.server:port") + return + } + service = serverAndPort[0] + port = serverAndPort[1] + return +} + +// isEqualIps 判断两个地址列表是否相等 +func isEqualIps(s1, s2 []string) bool { if len(s1) != len(s2) { return false }