From d9529a0a74d9bcd525b2c1a6ed85e68fde6f5bee Mon Sep 17 00:00:00 2001 From: DengBiao <2319963317@qq.com> Date: Fri, 9 Dec 2022 14:59:36 +0800 Subject: [PATCH] update k8s_resolver.go --- chart/templates/new_role/cluster_role.yaml | 44 ++++ chart/templates/role/cluster_role.yaml | 9 - pkg/grpclib/resolver/k8s/k8s_resolver.go | 126 +++++------ pkg/grpclib/resolver/k8s_new/k8s_resolver.go | 203 ++++++++++++++++++ .../resolver/k8s_new/k8s_resolver_test.go | 56 +++++ 5 files changed, 359 insertions(+), 79 deletions(-) create mode 100644 chart/templates/new_role/cluster_role.yaml create mode 100644 pkg/grpclib/resolver/k8s_new/k8s_resolver.go create mode 100644 pkg/grpclib/resolver/k8s_new/k8s_resolver_test.go diff --git a/chart/templates/new_role/cluster_role.yaml b/chart/templates/new_role/cluster_role.yaml new file mode 100644 index 0000000..1c2a2df --- /dev/null +++ b/chart/templates/new_role/cluster_role.yaml @@ -0,0 +1,44 @@ +# 为pod中的服务赋予发现服务和读取配置的权限 +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: pod-role + namespace: gim +rules: + - apiGroups: + - "" + resources: + - pods + - pods/status + - services + - services/status + - endpoints + - endpoints/status + - configmaps + - configmaps/status + verbs: + - get + - list + - watch + - apiGroups: + - "discovery.k8s.io" + resources: + - endpointslices + - endpointslices/status + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: argo-namespaces-binding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: pod-role +subjects: + - kind: ServiceAccount + name: default + namespace: gim diff --git a/chart/templates/role/cluster_role.yaml b/chart/templates/role/cluster_role.yaml index 1c2a2df..844f704 100644 --- a/chart/templates/role/cluster_role.yaml +++ b/chart/templates/role/cluster_role.yaml @@ -20,15 +20,6 @@ rules: - get - list - watch - - apiGroups: - - "discovery.k8s.io" - resources: - - endpointslices - - endpointslices/status - verbs: - - get - - list - - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/pkg/grpclib/resolver/k8s/k8s_resolver.go b/pkg/grpclib/resolver/k8s/k8s_resolver.go index a9b01df..e1aa767 100644 --- a/pkg/grpclib/resolver/k8s/k8s_resolver.go +++ b/pkg/grpclib/resolver/k8s/k8s_resolver.go @@ -6,16 +6,15 @@ import ( "fmt" "gim/pkg/logger" "go.uber.org/zap" - v1 "k8s.io/api/discovery/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "sort" "strings" - "time" "google.golang.org/grpc/resolver" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - discoveryv1 "k8s.io/client-go/kubernetes/typed/discovery/v1" + "k8s.io/apimachinery/pkg/fields" ) // 实现k8s地址解析,根据k8s的service的endpoints解析 比如,k8s:///namespace.server:port @@ -43,18 +42,14 @@ func (b *k8sBuilder) Scheme() string { // k8sResolver k8s地址解析器 type k8sResolver struct { - log *zap.Logger - clientConn resolver.ClientConn - discoveryClient discoveryv1.EndpointSliceInterface - service string - - cancel context.CancelFunc - - ips []string - port string + clientConn resolver.ClientConn + endpoint string + ips []string + port string + cancel context.CancelFunc } -func GetK8sClient() (*kubernetes.Clientset, error) { +func getK8sClient() (*kubernetes.Clientset, error) { config, err := rest.InClusterConfig() if err != nil { return nil, err @@ -67,71 +62,67 @@ func GetK8sClient() (*kubernetes.Clientset, error) { } func newK8sResolver(target resolver.Target, clientConn resolver.ClientConn) (*k8sResolver, error) { - log := logger.Logger.With(zap.String("target", target.Endpoint)) - log.Info("k8s resolver build") - namespace, service, port, err := parseTarget(target) + namespace, service, port, err := resolveEndpoint(target.Endpoint) if err != nil { - log.Error("k8s resolver error", zap.Error(err)) return nil, err } - k8sClient, err := GetK8sClient() + k8sClient, err := getK8sClient() if err != nil { - log.Error("k8s resolver error", zap.Error(err)) return nil, err } - ctx, cancel := context.WithCancel(context.Background()) - client := k8sClient.DiscoveryV1().EndpointSlices(namespace) k8sResolver := &k8sResolver{ - log: log, - clientConn: clientConn, - discoveryClient: client, - service: service, - cancel: cancel, - port: port, - } - err = k8sResolver.updateState(true) - if err != nil { - log.Error("k8s resolver error", zap.Error(err)) - return nil, err + clientConn: clientConn, + endpoint: target.Endpoint, + port: port, + cancel: cancel, } - ticker := time.NewTicker(time.Second) // 监听变化 go func() { + events, err := k8sClient.CoreV1().Endpoints(namespace).Watch(context.TODO(), metav1.ListOptions{ + LabelSelector: fields.OneTermEqualSelector("app", service).String(), + }) + if err != nil { + logger.Logger.Error("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Error(err)) + panic(err) + } + for { select { case <-ctx.Done(): return - case <-ticker.C: - _ = k8sResolver.updateState(false) + case event := <-events.ResultChan(): + endpoints, ok := event.Object.(*v1.Endpoints) + if !ok { + logger.Logger.Warn("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Any("event", event)) + continue + } + ips, err := getIpsFromEndpoint(endpoints) + if err != nil { + logger.Logger.Warn("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Error(err)) + continue + } + k8sResolver.updateState(ips) } } }() return k8sResolver, nil } -// ResolveNow grpc感知到连接异常,会做通知,观察日志得知 func (r *k8sResolver) ResolveNow(opt resolver.ResolveNowOptions) { - r.log.Info("k8s resolver resolveNow") + logger.Logger.Info("k8s resolver resolveNow", zap.String("endpoint", r.endpoint)) } func (r *k8sResolver) Close() { - r.log.Info("k8s resolver close") r.cancel() } // updateState 更新地址列表 -func (r *k8sResolver) updateState(isFromNew bool) error { - list, err := r.discoveryClient.List(context.TODO(), metav1.ListOptions{LabelSelector: "app=" + r.service}) - if err != nil { - r.log.Error("k8s resolver error", zap.Error(err)) - return err - } - newIPs := getIPs(list) - if !isFromNew && isEqualIPs(r.ips, newIPs) { - return nil +func (r *k8sResolver) updateState(newIPs []string) { + if isEqualIps(r.ips, newIPs) { + return } r.ips = newIPs @@ -144,18 +135,14 @@ func (r *k8sResolver) updateState(isFromNew bool) error { state := resolver.State{ Addresses: addresses, } - r.log.Info("k8s resolver updateState", zap.Bool("is_from_new", isFromNew), zap.Any("service", r.service), zap.Any("addresses", addresses)) - err = r.clientConn.UpdateState(state) - if err != nil { - r.log.Error("k8s resolver error", zap.Error(err)) - return err - } - return nil + logger.Logger.Info("k8s resolver updateState", zap.String("endpoint", r.endpoint), zap.Any("addresses", addresses)) + r.clientConn.UpdateState(state) + return } -// parseTarget 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port -func parseTarget(target resolver.Target) (namespace string, service string, port string, err error) { - namespaceAndServerPort := strings.Split(target.Endpoint, ".") +// resolveEndpoint 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port +func resolveEndpoint(endpoint string) (namespace string, service string, port string, err error) { + namespaceAndServerPort := strings.Split(endpoint, ".") if len(namespaceAndServerPort) != 2 { err = errors.New("endpoint must is namespace.server:port") return @@ -171,8 +158,8 @@ func parseTarget(target resolver.Target) (namespace string, service string, port return } -// isEqualIPs 判断两个地址列表是否相等 -func isEqualIPs(s1, s2 []string) bool { +// isEqualIps 判断两个地址列表是否相等 +func isEqualIps(s1, s2 []string) bool { if len(s1) != len(s2) { return false } @@ -187,17 +174,16 @@ func isEqualIPs(s1, s2 []string) bool { return true } -// getIPs 获取EndpointSlice里面的IP列表 -func getIPs(list *v1.EndpointSliceList) []string { - ips := make([]string, 0, 10) - - for _, slice := range list.Items { - for _, endpoints := range slice.Endpoints { - for _, address := range endpoints.Addresses { - ips = append(ips, address) - } - } +// getIpsFromEndpoint 获取endpoints里面的IP列表 +func getIpsFromEndpoint(endpoints *v1.Endpoints) ([]string, error) { + if len(endpoints.Subsets) < 1 { + return nil, errors.New("subsets length less than 1") } + endpointAddresses := endpoints.Subsets[0].Addresses - return ips + ips := make([]string, 0, len(endpoints.Subsets[0].Addresses)) + for _, v := range endpointAddresses { + ips = append(ips, v.IP) + } + return ips, nil } diff --git a/pkg/grpclib/resolver/k8s_new/k8s_resolver.go b/pkg/grpclib/resolver/k8s_new/k8s_resolver.go new file mode 100644 index 0000000..a9b01df --- /dev/null +++ b/pkg/grpclib/resolver/k8s_new/k8s_resolver.go @@ -0,0 +1,203 @@ +package k8s + +import ( + "context" + "errors" + "fmt" + "gim/pkg/logger" + "go.uber.org/zap" + v1 "k8s.io/api/discovery/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "sort" + "strings" + "time" + + "google.golang.org/grpc/resolver" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + discoveryv1 "k8s.io/client-go/kubernetes/typed/discovery/v1" +) + +// 实现k8s地址解析,根据k8s的service的endpoints解析 比如,k8s:///namespace.server:port +func init() { + resolver.Register(NewK8sBuilder()) +} + +func GetK8STarget(namespace, server, port string) string { + return fmt.Sprintf("k8s:///%s.%s:%s", namespace, server, port) +} + +type k8sBuilder struct{} + +func NewK8sBuilder() resolver.Builder { + return &k8sBuilder{} +} + +func (b *k8sBuilder) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + return newK8sResolver(target, clientConn) +} + +func (b *k8sBuilder) Scheme() string { + return "k8s" +} + +// k8sResolver k8s地址解析器 +type k8sResolver struct { + log *zap.Logger + clientConn resolver.ClientConn + discoveryClient discoveryv1.EndpointSliceInterface + service string + + cancel context.CancelFunc + + ips []string + port string +} + +func GetK8sClient() (*kubernetes.Clientset, error) { + config, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + k8sClient, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + return k8sClient, nil +} + +func newK8sResolver(target resolver.Target, clientConn resolver.ClientConn) (*k8sResolver, error) { + log := logger.Logger.With(zap.String("target", target.Endpoint)) + log.Info("k8s resolver build") + namespace, service, port, err := parseTarget(target) + if err != nil { + log.Error("k8s resolver error", zap.Error(err)) + return nil, err + } + + k8sClient, err := GetK8sClient() + if err != nil { + log.Error("k8s resolver error", zap.Error(err)) + return nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + client := k8sClient.DiscoveryV1().EndpointSlices(namespace) + k8sResolver := &k8sResolver{ + log: log, + clientConn: clientConn, + discoveryClient: client, + service: service, + cancel: cancel, + port: port, + } + err = k8sResolver.updateState(true) + if err != nil { + log.Error("k8s resolver error", zap.Error(err)) + return nil, err + } + + ticker := time.NewTicker(time.Second) + // 监听变化 + go func() { + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + _ = k8sResolver.updateState(false) + } + } + }() + return k8sResolver, nil +} + +// ResolveNow grpc感知到连接异常,会做通知,观察日志得知 +func (r *k8sResolver) ResolveNow(opt resolver.ResolveNowOptions) { + r.log.Info("k8s resolver resolveNow") +} + +func (r *k8sResolver) Close() { + r.log.Info("k8s resolver close") + r.cancel() +} + +// updateState 更新地址列表 +func (r *k8sResolver) updateState(isFromNew bool) error { + list, err := r.discoveryClient.List(context.TODO(), metav1.ListOptions{LabelSelector: "app=" + r.service}) + if err != nil { + r.log.Error("k8s resolver error", zap.Error(err)) + return err + } + newIPs := getIPs(list) + if !isFromNew && isEqualIPs(r.ips, newIPs) { + return nil + } + r.ips = newIPs + + addresses := make([]resolver.Address, 0, len(r.ips)) + for _, v := range r.ips { + addresses = append(addresses, resolver.Address{ + Addr: v + ":" + r.port, + }) + } + state := resolver.State{ + Addresses: addresses, + } + r.log.Info("k8s resolver updateState", zap.Bool("is_from_new", isFromNew), zap.Any("service", r.service), zap.Any("addresses", addresses)) + err = r.clientConn.UpdateState(state) + if err != nil { + r.log.Error("k8s resolver error", zap.Error(err)) + return err + } + return nil +} + +// parseTarget 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port +func parseTarget(target resolver.Target) (namespace string, service string, port string, err error) { + namespaceAndServerPort := strings.Split(target.Endpoint, ".") + if len(namespaceAndServerPort) != 2 { + err = errors.New("endpoint must is namespace.server:port") + return + } + namespace = namespaceAndServerPort[0] + serverAndPort := strings.Split(namespaceAndServerPort[1], ":") + if len(serverAndPort) != 2 { + err = errors.New("endpoint must is namespace.server:port") + return + } + service = serverAndPort[0] + port = serverAndPort[1] + return +} + +// isEqualIPs 判断两个地址列表是否相等 +func isEqualIPs(s1, s2 []string) bool { + if len(s1) != len(s2) { + return false + } + + sort.Strings(s1) + sort.Strings(s2) + for i := range s1 { + if s1[i] != s2[i] { + return false + } + } + return true +} + +// getIPs 获取EndpointSlice里面的IP列表 +func getIPs(list *v1.EndpointSliceList) []string { + ips := make([]string, 0, 10) + + for _, slice := range list.Items { + for _, endpoints := range slice.Endpoints { + for _, address := range endpoints.Addresses { + ips = append(ips, address) + } + } + } + + return ips +} diff --git a/pkg/grpclib/resolver/k8s_new/k8s_resolver_test.go b/pkg/grpclib/resolver/k8s_new/k8s_resolver_test.go new file mode 100644 index 0000000..ea2ae28 --- /dev/null +++ b/pkg/grpclib/resolver/k8s_new/k8s_resolver_test.go @@ -0,0 +1,56 @@ +package k8s + +import ( + "context" + "fmt" + "testing" + + "google.golang.org/grpc" +) + +func Test_resolveEndpoint(t *testing.T) { + fmt.Println(resolveEndpoint("namespace.server_name:port")) +} + +func TestClient(t *testing.T) { + _, err := grpc.DialContext(context.TODO(), "172.18.0.2:8000", grpc.WithInsecure()) + if err != nil { + panic(err) + } +} + +func Test_isEqual(t *testing.T) { + type args struct { + s1 []string + s2 []string + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "", + args: args{s1: []string{"1", "2"}, s2: []string{"2", "1"}}, + want: true, + }, + { + name: "", + args: args{s1: []string{"1", "2"}, s2: []string{"1", "2"}}, + want: true, + }, + { + name: "", + args: args{s1: []string{"1", "2"}, s2: []string{"1"}}, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isEqualIps(tt.args.s1, tt.args.s2); got != tt.want { + t.Errorf("isEqual() = %v, want %v", got, tt.want) + } + }) + } +}