diff --git a/pkg/grpclib/resolver/k8s/k8s_resolver.go b/pkg/grpclib/resolver/k8s/k8s_resolver.go index e1aa767..597d085 100644 --- a/pkg/grpclib/resolver/k8s/k8s_resolver.go +++ b/pkg/grpclib/resolver/k8s/k8s_resolver.go @@ -10,16 +10,33 @@ import ( "k8s.io/client-go/rest" "sort" "strings" + "time" "google.golang.org/grpc/resolver" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" ) +var k8sClientSet *kubernetes.Clientset + +func GetK8sClient() (*kubernetes.Clientset, error) { + if k8sClientSet == nil { + config, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + k8sClientSet, err = kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + } + return k8sClientSet, nil +} + // 实现k8s地址解析,根据k8s的service的endpoints解析 比如,k8s:///namespace.server:port func init() { - resolver.Register(NewK8sBuilder()) + resolver.Register(&k8sBuilder{}) } func GetK8STarget(namespace, server, port string) string { @@ -28,10 +45,6 @@ func GetK8STarget(namespace, server, port string) string { type k8sBuilder struct{} -func NewK8sBuilder() resolver.Builder { - return &k8sBuilder{} -} - func (b *k8sBuilder) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { return newK8sResolver(target, clientConn) } @@ -42,107 +55,111 @@ func (b *k8sBuilder) Scheme() string { // k8sResolver k8s地址解析器 type k8sResolver struct { - clientConn resolver.ClientConn - endpoint string - ips []string - port string - cancel context.CancelFunc -} + log *zap.Logger + clientConn resolver.ClientConn + endpointsClient corev1.EndpointsInterface + service string -func getK8sClient() (*kubernetes.Clientset, error) { - config, err := rest.InClusterConfig() - if err != nil { - return nil, err - } - k8sClient, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - return k8sClient, nil + cancel context.CancelFunc + + ips []string + port string } func newK8sResolver(target resolver.Target, clientConn resolver.ClientConn) (*k8sResolver, error) { - namespace, service, port, err := resolveEndpoint(target.Endpoint) + log := logger.Logger.With(zap.String("target", target.Endpoint)) + log.Info("k8s resolver build") + namespace, service, port, err := parseTarget(target) if err != nil { + log.Error("k8s resolver error", zap.Error(err)) return nil, err } - k8sClient, err := getK8sClient() + k8sClient, err := GetK8sClient() if err != nil { + log.Error("k8s resolver error", zap.Error(err)) return nil, err } + ctx, cancel := context.WithCancel(context.Background()) + client := k8sClient.CoreV1().Endpoints(namespace) k8sResolver := &k8sResolver{ - clientConn: clientConn, - endpoint: target.Endpoint, - port: port, - cancel: cancel, + log: log, + clientConn: clientConn, + endpointsClient: client, + service: service, + cancel: cancel, + port: port, + } + err = k8sResolver.updateState(true) + if err != nil { + log.Error("k8s resolver error", zap.Error(err)) + return nil, err } + ticker := time.NewTicker(time.Second) // 监听变化 go func() { - events, err := k8sClient.CoreV1().Endpoints(namespace).Watch(context.TODO(), metav1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector("app", service).String(), - }) - if err != nil { - logger.Logger.Error("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Error(err)) - panic(err) - } - for { select { case <-ctx.Done(): return - case event := <-events.ResultChan(): - endpoints, ok := event.Object.(*v1.Endpoints) - if !ok { - logger.Logger.Warn("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Any("event", event)) - continue - } - ips, err := getIpsFromEndpoint(endpoints) - if err != nil { - logger.Logger.Warn("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Error(err)) - continue - } - k8sResolver.updateState(ips) + case <-ticker.C: + _ = k8sResolver.updateState(false) } } }() return k8sResolver, nil } +// ResolveNow grpc感知到连接异常,会做通知,观察日志得知 func (r *k8sResolver) ResolveNow(opt resolver.ResolveNowOptions) { - logger.Logger.Info("k8s resolver resolveNow", zap.String("endpoint", r.endpoint)) + r.log.Info("k8s resolver resolveNow") } func (r *k8sResolver) Close() { + r.log.Info("k8s resolver close") r.cancel() } // updateState 更新地址列表 -func (r *k8sResolver) updateState(newIPs []string) { - if isEqualIps(r.ips, newIPs) { - return +func (r *k8sResolver) updateState(isFromNew bool) error { + endpoints, err := r.endpointsClient.Get(context.TODO(), r.service, metav1.GetOptions{}) + if err != nil { + r.log.Error("k8s resolver error", zap.Error(err)) + return err + } + newIPs := getIPs(endpoints) + if len(newIPs) == 0 { + return nil + } + if !isFromNew && isEqualIPs(r.ips, newIPs) { + return nil } r.ips = newIPs addresses := make([]resolver.Address, 0, len(r.ips)) - for _, v := range r.ips { + for _, ip := range r.ips { addresses = append(addresses, resolver.Address{ - Addr: v + ":" + r.port, + Addr: ip + ":" + r.port, }) } state := resolver.State{ Addresses: addresses, } - logger.Logger.Info("k8s resolver updateState", zap.String("endpoint", r.endpoint), zap.Any("addresses", addresses)) - r.clientConn.UpdateState(state) - return + r.log.Info("k8s resolver updateState", zap.Bool("is_from_new", isFromNew), zap.Any("service", r.service), zap.Any("addresses", addresses)) + // 这里地址数量不能为0,为0会返回错误 + err = r.clientConn.UpdateState(state) + if err != nil { + r.log.Error("k8s resolver error", zap.Error(err)) + return err + } + return nil } -// resolveEndpoint 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port -func resolveEndpoint(endpoint string) (namespace string, service string, port string, err error) { - namespaceAndServerPort := strings.Split(endpoint, ".") +// parseTarget 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port +func parseTarget(target resolver.Target) (namespace string, service string, port string, err error) { + namespaceAndServerPort := strings.Split(target.Endpoint, ".") if len(namespaceAndServerPort) != 2 { err = errors.New("endpoint must is namespace.server:port") return @@ -158,8 +175,8 @@ func resolveEndpoint(endpoint string) (namespace string, service string, port st return } -// isEqualIps 判断两个地址列表是否相等 -func isEqualIps(s1, s2 []string) bool { +// isEqualIPs 判断两个地址列表是否相等 +func isEqualIPs(s1, s2 []string) bool { if len(s1) != len(s2) { return false } @@ -174,16 +191,15 @@ func isEqualIps(s1, s2 []string) bool { return true } -// getIpsFromEndpoint 获取endpoints里面的IP列表 -func getIpsFromEndpoint(endpoints *v1.Endpoints) ([]string, error) { - if len(endpoints.Subsets) < 1 { - return nil, errors.New("subsets length less than 1") +// getIPs 获取EndpointSlice里面的IP列表 +func getIPs(endpoints *v1.Endpoints) []string { + ips := make([]string, 0, 10) + if len(endpoints.Subsets) <= 0 { + return ips } - endpointAddresses := endpoints.Subsets[0].Addresses - ips := make([]string, 0, len(endpoints.Subsets[0].Addresses)) - for _, v := range endpointAddresses { - ips = append(ips, v.IP) + for _, address := range endpoints.Subsets[0].Addresses { + ips = append(ips, address.IP) } - return ips, nil + return ips }