Browse Source

update k8s_resolver.go

master
DengBiao 2 years ago
parent
commit
d9529a0a74
5 changed files with 359 additions and 79 deletions
  1. +44
    -0
      chart/templates/new_role/cluster_role.yaml
  2. +0
    -9
      chart/templates/role/cluster_role.yaml
  3. +56
    -70
      pkg/grpclib/resolver/k8s/k8s_resolver.go
  4. +203
    -0
      pkg/grpclib/resolver/k8s_new/k8s_resolver.go
  5. +56
    -0
      pkg/grpclib/resolver/k8s_new/k8s_resolver_test.go

+ 44
- 0
chart/templates/new_role/cluster_role.yaml View File

@@ -0,0 +1,44 @@
# 为pod中的服务赋予发现服务和读取配置的权限
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: pod-role
namespace: gim
rules:
- apiGroups:
- ""
resources:
- pods
- pods/status
- services
- services/status
- endpoints
- endpoints/status
- configmaps
- configmaps/status
verbs:
- get
- list
- watch
- apiGroups:
- "discovery.k8s.io"
resources:
- endpointslices
- endpointslices/status
verbs:
- get
- list
- watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: argo-namespaces-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: pod-role
subjects:
- kind: ServiceAccount
name: default
namespace: gim

+ 0
- 9
chart/templates/role/cluster_role.yaml View File

@@ -20,15 +20,6 @@ rules:
- get
- list
- watch
- apiGroups:
- "discovery.k8s.io"
resources:
- endpointslices
- endpointslices/status
verbs:
- get
- list
- watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding


+ 56
- 70
pkg/grpclib/resolver/k8s/k8s_resolver.go View File

@@ -6,16 +6,15 @@ import (
"fmt"
"gim/pkg/logger"
"go.uber.org/zap"
v1 "k8s.io/api/discovery/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sort"
"strings"
"time"

"google.golang.org/grpc/resolver"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
discoveryv1 "k8s.io/client-go/kubernetes/typed/discovery/v1"
"k8s.io/apimachinery/pkg/fields"
)

// 实现k8s地址解析,根据k8s的service的endpoints解析 比如,k8s:///namespace.server:port
@@ -43,18 +42,14 @@ func (b *k8sBuilder) Scheme() string {

// k8sResolver k8s地址解析器
type k8sResolver struct {
log *zap.Logger
clientConn resolver.ClientConn
discoveryClient discoveryv1.EndpointSliceInterface
service string

cancel context.CancelFunc

ips []string
port string
clientConn resolver.ClientConn
endpoint string
ips []string
port string
cancel context.CancelFunc
}

func GetK8sClient() (*kubernetes.Clientset, error) {
func getK8sClient() (*kubernetes.Clientset, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
@@ -67,71 +62,67 @@ func GetK8sClient() (*kubernetes.Clientset, error) {
}

func newK8sResolver(target resolver.Target, clientConn resolver.ClientConn) (*k8sResolver, error) {
log := logger.Logger.With(zap.String("target", target.Endpoint))
log.Info("k8s resolver build")
namespace, service, port, err := parseTarget(target)
namespace, service, port, err := resolveEndpoint(target.Endpoint)
if err != nil {
log.Error("k8s resolver error", zap.Error(err))
return nil, err
}

k8sClient, err := GetK8sClient()
k8sClient, err := getK8sClient()
if err != nil {
log.Error("k8s resolver error", zap.Error(err))
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
client := k8sClient.DiscoveryV1().EndpointSlices(namespace)
k8sResolver := &k8sResolver{
log: log,
clientConn: clientConn,
discoveryClient: client,
service: service,
cancel: cancel,
port: port,
}
err = k8sResolver.updateState(true)
if err != nil {
log.Error("k8s resolver error", zap.Error(err))
return nil, err
clientConn: clientConn,
endpoint: target.Endpoint,
port: port,
cancel: cancel,
}

ticker := time.NewTicker(time.Second)
// 监听变化
go func() {
events, err := k8sClient.CoreV1().Endpoints(namespace).Watch(context.TODO(), metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector("app", service).String(),
})
if err != nil {
logger.Logger.Error("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Error(err))
panic(err)
}

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
_ = k8sResolver.updateState(false)
case event := <-events.ResultChan():
endpoints, ok := event.Object.(*v1.Endpoints)
if !ok {
logger.Logger.Warn("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Any("event", event))
continue
}
ips, err := getIpsFromEndpoint(endpoints)
if err != nil {
logger.Logger.Warn("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Error(err))
continue
}
k8sResolver.updateState(ips)
}
}
}()
return k8sResolver, nil
}

// ResolveNow grpc感知到连接异常,会做通知,观察日志得知
func (r *k8sResolver) ResolveNow(opt resolver.ResolveNowOptions) {
r.log.Info("k8s resolver resolveNow")
logger.Logger.Info("k8s resolver resolveNow", zap.String("endpoint", r.endpoint))
}

func (r *k8sResolver) Close() {
r.log.Info("k8s resolver close")
r.cancel()
}

// updateState 更新地址列表
func (r *k8sResolver) updateState(isFromNew bool) error {
list, err := r.discoveryClient.List(context.TODO(), metav1.ListOptions{LabelSelector: "app=" + r.service})
if err != nil {
r.log.Error("k8s resolver error", zap.Error(err))
return err
}
newIPs := getIPs(list)
if !isFromNew && isEqualIPs(r.ips, newIPs) {
return nil
func (r *k8sResolver) updateState(newIPs []string) {
if isEqualIps(r.ips, newIPs) {
return
}
r.ips = newIPs

@@ -144,18 +135,14 @@ func (r *k8sResolver) updateState(isFromNew bool) error {
state := resolver.State{
Addresses: addresses,
}
r.log.Info("k8s resolver updateState", zap.Bool("is_from_new", isFromNew), zap.Any("service", r.service), zap.Any("addresses", addresses))
err = r.clientConn.UpdateState(state)
if err != nil {
r.log.Error("k8s resolver error", zap.Error(err))
return err
}
return nil
logger.Logger.Info("k8s resolver updateState", zap.String("endpoint", r.endpoint), zap.Any("addresses", addresses))
r.clientConn.UpdateState(state)
return
}

// parseTarget 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port
func parseTarget(target resolver.Target) (namespace string, service string, port string, err error) {
namespaceAndServerPort := strings.Split(target.Endpoint, ".")
// resolveEndpoint 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port
func resolveEndpoint(endpoint string) (namespace string, service string, port string, err error) {
namespaceAndServerPort := strings.Split(endpoint, ".")
if len(namespaceAndServerPort) != 2 {
err = errors.New("endpoint must is namespace.server:port")
return
@@ -171,8 +158,8 @@ func parseTarget(target resolver.Target) (namespace string, service string, port
return
}

// isEqualIPs 判断两个地址列表是否相等
func isEqualIPs(s1, s2 []string) bool {
// isEqualIps 判断两个地址列表是否相等
func isEqualIps(s1, s2 []string) bool {
if len(s1) != len(s2) {
return false
}
@@ -187,17 +174,16 @@ func isEqualIPs(s1, s2 []string) bool {
return true
}

// getIPs 获取EndpointSlice里面的IP列表
func getIPs(list *v1.EndpointSliceList) []string {
ips := make([]string, 0, 10)

for _, slice := range list.Items {
for _, endpoints := range slice.Endpoints {
for _, address := range endpoints.Addresses {
ips = append(ips, address)
}
}
// getIpsFromEndpoint 获取endpoints里面的IP列表
func getIpsFromEndpoint(endpoints *v1.Endpoints) ([]string, error) {
if len(endpoints.Subsets) < 1 {
return nil, errors.New("subsets length less than 1")
}
endpointAddresses := endpoints.Subsets[0].Addresses

return ips
ips := make([]string, 0, len(endpoints.Subsets[0].Addresses))
for _, v := range endpointAddresses {
ips = append(ips, v.IP)
}
return ips, nil
}

+ 203
- 0
pkg/grpclib/resolver/k8s_new/k8s_resolver.go View File

@@ -0,0 +1,203 @@
package k8s

import (
"context"
"errors"
"fmt"
"gim/pkg/logger"
"go.uber.org/zap"
v1 "k8s.io/api/discovery/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sort"
"strings"
"time"

"google.golang.org/grpc/resolver"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
discoveryv1 "k8s.io/client-go/kubernetes/typed/discovery/v1"
)

// 实现k8s地址解析,根据k8s的service的endpoints解析 比如,k8s:///namespace.server:port
func init() {
resolver.Register(NewK8sBuilder())
}

func GetK8STarget(namespace, server, port string) string {
return fmt.Sprintf("k8s:///%s.%s:%s", namespace, server, port)
}

type k8sBuilder struct{}

func NewK8sBuilder() resolver.Builder {
return &k8sBuilder{}
}

func (b *k8sBuilder) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
return newK8sResolver(target, clientConn)
}

func (b *k8sBuilder) Scheme() string {
return "k8s"
}

// k8sResolver k8s地址解析器
type k8sResolver struct {
log *zap.Logger
clientConn resolver.ClientConn
discoveryClient discoveryv1.EndpointSliceInterface
service string

cancel context.CancelFunc

ips []string
port string
}

func GetK8sClient() (*kubernetes.Clientset, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
k8sClient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return k8sClient, nil
}

func newK8sResolver(target resolver.Target, clientConn resolver.ClientConn) (*k8sResolver, error) {
log := logger.Logger.With(zap.String("target", target.Endpoint))
log.Info("k8s resolver build")
namespace, service, port, err := parseTarget(target)
if err != nil {
log.Error("k8s resolver error", zap.Error(err))
return nil, err
}

k8sClient, err := GetK8sClient()
if err != nil {
log.Error("k8s resolver error", zap.Error(err))
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
client := k8sClient.DiscoveryV1().EndpointSlices(namespace)
k8sResolver := &k8sResolver{
log: log,
clientConn: clientConn,
discoveryClient: client,
service: service,
cancel: cancel,
port: port,
}
err = k8sResolver.updateState(true)
if err != nil {
log.Error("k8s resolver error", zap.Error(err))
return nil, err
}

ticker := time.NewTicker(time.Second)
// 监听变化
go func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
_ = k8sResolver.updateState(false)
}
}
}()
return k8sResolver, nil
}

// ResolveNow grpc感知到连接异常,会做通知,观察日志得知
func (r *k8sResolver) ResolveNow(opt resolver.ResolveNowOptions) {
r.log.Info("k8s resolver resolveNow")
}

func (r *k8sResolver) Close() {
r.log.Info("k8s resolver close")
r.cancel()
}

// updateState 更新地址列表
func (r *k8sResolver) updateState(isFromNew bool) error {
list, err := r.discoveryClient.List(context.TODO(), metav1.ListOptions{LabelSelector: "app=" + r.service})
if err != nil {
r.log.Error("k8s resolver error", zap.Error(err))
return err
}
newIPs := getIPs(list)
if !isFromNew && isEqualIPs(r.ips, newIPs) {
return nil
}
r.ips = newIPs

addresses := make([]resolver.Address, 0, len(r.ips))
for _, v := range r.ips {
addresses = append(addresses, resolver.Address{
Addr: v + ":" + r.port,
})
}
state := resolver.State{
Addresses: addresses,
}
r.log.Info("k8s resolver updateState", zap.Bool("is_from_new", isFromNew), zap.Any("service", r.service), zap.Any("addresses", addresses))
err = r.clientConn.UpdateState(state)
if err != nil {
r.log.Error("k8s resolver error", zap.Error(err))
return err
}
return nil
}

// parseTarget 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port
func parseTarget(target resolver.Target) (namespace string, service string, port string, err error) {
namespaceAndServerPort := strings.Split(target.Endpoint, ".")
if len(namespaceAndServerPort) != 2 {
err = errors.New("endpoint must is namespace.server:port")
return
}
namespace = namespaceAndServerPort[0]
serverAndPort := strings.Split(namespaceAndServerPort[1], ":")
if len(serverAndPort) != 2 {
err = errors.New("endpoint must is namespace.server:port")
return
}
service = serverAndPort[0]
port = serverAndPort[1]
return
}

// isEqualIPs 判断两个地址列表是否相等
func isEqualIPs(s1, s2 []string) bool {
if len(s1) != len(s2) {
return false
}

sort.Strings(s1)
sort.Strings(s2)
for i := range s1 {
if s1[i] != s2[i] {
return false
}
}
return true
}

// getIPs 获取EndpointSlice里面的IP列表
func getIPs(list *v1.EndpointSliceList) []string {
ips := make([]string, 0, 10)

for _, slice := range list.Items {
for _, endpoints := range slice.Endpoints {
for _, address := range endpoints.Addresses {
ips = append(ips, address)
}
}
}

return ips
}

+ 56
- 0
pkg/grpclib/resolver/k8s_new/k8s_resolver_test.go View File

@@ -0,0 +1,56 @@
package k8s

import (
"context"
"fmt"
"testing"

"google.golang.org/grpc"
)

func Test_resolveEndpoint(t *testing.T) {
fmt.Println(resolveEndpoint("namespace.server_name:port"))
}

func TestClient(t *testing.T) {
_, err := grpc.DialContext(context.TODO(), "172.18.0.2:8000", grpc.WithInsecure())
if err != nil {
panic(err)
}
}

func Test_isEqual(t *testing.T) {
type args struct {
s1 []string
s2 []string
}
tests := []struct {
name string
args args
want bool
}{
{
name: "",
args: args{s1: []string{"1", "2"}, s2: []string{"2", "1"}},
want: true,
},
{
name: "",
args: args{s1: []string{"1", "2"}, s2: []string{"1", "2"}},
want: true,
},
{
name: "",
args: args{s1: []string{"1", "2"}, s2: []string{"1"}},
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := isEqualIps(tt.args.s1, tt.args.s2); got != tt.want {
t.Errorf("isEqual() = %v, want %v", got, tt.want)
}
})
}
}

Loading…
Cancel
Save