Browse Source

update k8s_resolver.go

master
DengBiao 2 years ago
parent
commit
659d3798aa
1 changed files with 87 additions and 71 deletions
  1. +87
    -71
      pkg/grpclib/resolver/k8s/k8s_resolver.go

+ 87
- 71
pkg/grpclib/resolver/k8s/k8s_resolver.go View File

@@ -10,16 +10,33 @@ import (
"k8s.io/client-go/rest"
"sort"
"strings"
"time"

"google.golang.org/grpc/resolver"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

var k8sClientSet *kubernetes.Clientset

func GetK8sClient() (*kubernetes.Clientset, error) {
if k8sClientSet == nil {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
k8sClientSet, err = kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
}
return k8sClientSet, nil
}

// 实现k8s地址解析,根据k8s的service的endpoints解析 比如,k8s:///namespace.server:port
func init() {
resolver.Register(NewK8sBuilder())
resolver.Register(&k8sBuilder{})
}

func GetK8STarget(namespace, server, port string) string {
@@ -28,10 +45,6 @@ func GetK8STarget(namespace, server, port string) string {

type k8sBuilder struct{}

func NewK8sBuilder() resolver.Builder {
return &k8sBuilder{}
}

func (b *k8sBuilder) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
return newK8sResolver(target, clientConn)
}
@@ -42,107 +55,111 @@ func (b *k8sBuilder) Scheme() string {

// k8sResolver k8s地址解析器
type k8sResolver struct {
clientConn resolver.ClientConn
endpoint string
ips []string
port string
cancel context.CancelFunc
}
log *zap.Logger
clientConn resolver.ClientConn
endpointsClient corev1.EndpointsInterface
service string

func getK8sClient() (*kubernetes.Clientset, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
k8sClient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return k8sClient, nil
cancel context.CancelFunc

ips []string
port string
}

func newK8sResolver(target resolver.Target, clientConn resolver.ClientConn) (*k8sResolver, error) {
namespace, service, port, err := resolveEndpoint(target.Endpoint)
log := logger.Logger.With(zap.String("target", target.Endpoint))
log.Info("k8s resolver build")
namespace, service, port, err := parseTarget(target)
if err != nil {
log.Error("k8s resolver error", zap.Error(err))
return nil, err
}

k8sClient, err := getK8sClient()
k8sClient, err := GetK8sClient()
if err != nil {
log.Error("k8s resolver error", zap.Error(err))
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
client := k8sClient.CoreV1().Endpoints(namespace)
k8sResolver := &k8sResolver{
clientConn: clientConn,
endpoint: target.Endpoint,
port: port,
cancel: cancel,
log: log,
clientConn: clientConn,
endpointsClient: client,
service: service,
cancel: cancel,
port: port,
}
err = k8sResolver.updateState(true)
if err != nil {
log.Error("k8s resolver error", zap.Error(err))
return nil, err
}

ticker := time.NewTicker(time.Second)
// 监听变化
go func() {
events, err := k8sClient.CoreV1().Endpoints(namespace).Watch(context.TODO(), metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector("app", service).String(),
})
if err != nil {
logger.Logger.Error("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Error(err))
panic(err)
}

for {
select {
case <-ctx.Done():
return
case event := <-events.ResultChan():
endpoints, ok := event.Object.(*v1.Endpoints)
if !ok {
logger.Logger.Warn("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Any("event", event))
continue
}
ips, err := getIpsFromEndpoint(endpoints)
if err != nil {
logger.Logger.Warn("k8s resolver error", zap.String("endpoint", target.Endpoint), zap.Error(err))
continue
}
k8sResolver.updateState(ips)
case <-ticker.C:
_ = k8sResolver.updateState(false)
}
}
}()
return k8sResolver, nil
}

// ResolveNow grpc感知到连接异常,会做通知,观察日志得知
func (r *k8sResolver) ResolveNow(opt resolver.ResolveNowOptions) {
logger.Logger.Info("k8s resolver resolveNow", zap.String("endpoint", r.endpoint))
r.log.Info("k8s resolver resolveNow")
}

func (r *k8sResolver) Close() {
r.log.Info("k8s resolver close")
r.cancel()
}

// updateState 更新地址列表
func (r *k8sResolver) updateState(newIPs []string) {
if isEqualIps(r.ips, newIPs) {
return
func (r *k8sResolver) updateState(isFromNew bool) error {
endpoints, err := r.endpointsClient.Get(context.TODO(), r.service, metav1.GetOptions{})
if err != nil {
r.log.Error("k8s resolver error", zap.Error(err))
return err
}
newIPs := getIPs(endpoints)
if len(newIPs) == 0 {
return nil
}
if !isFromNew && isEqualIPs(r.ips, newIPs) {
return nil
}
r.ips = newIPs

addresses := make([]resolver.Address, 0, len(r.ips))
for _, v := range r.ips {
for _, ip := range r.ips {
addresses = append(addresses, resolver.Address{
Addr: v + ":" + r.port,
Addr: ip + ":" + r.port,
})
}
state := resolver.State{
Addresses: addresses,
}
logger.Logger.Info("k8s resolver updateState", zap.String("endpoint", r.endpoint), zap.Any("addresses", addresses))
r.clientConn.UpdateState(state)
return
r.log.Info("k8s resolver updateState", zap.Bool("is_from_new", isFromNew), zap.Any("service", r.service), zap.Any("addresses", addresses))
// 这里地址数量不能为0,为0会返回错误
err = r.clientConn.UpdateState(state)
if err != nil {
r.log.Error("k8s resolver error", zap.Error(err))
return err
}
return nil
}

// resolveEndpoint 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port
func resolveEndpoint(endpoint string) (namespace string, service string, port string, err error) {
namespaceAndServerPort := strings.Split(endpoint, ".")
// parseTarget 对grpc的Endpoint进行解析,格式必须是:k8s:///namespace.server:port
func parseTarget(target resolver.Target) (namespace string, service string, port string, err error) {
namespaceAndServerPort := strings.Split(target.Endpoint, ".")
if len(namespaceAndServerPort) != 2 {
err = errors.New("endpoint must is namespace.server:port")
return
@@ -158,8 +175,8 @@ func resolveEndpoint(endpoint string) (namespace string, service string, port st
return
}

// isEqualIps 判断两个地址列表是否相等
func isEqualIps(s1, s2 []string) bool {
// isEqualIPs 判断两个地址列表是否相等
func isEqualIPs(s1, s2 []string) bool {
if len(s1) != len(s2) {
return false
}
@@ -174,16 +191,15 @@ func isEqualIps(s1, s2 []string) bool {
return true
}

// getIpsFromEndpoint 获取endpoints里面的IP列表
func getIpsFromEndpoint(endpoints *v1.Endpoints) ([]string, error) {
if len(endpoints.Subsets) < 1 {
return nil, errors.New("subsets length less than 1")
// getIPs 获取EndpointSlice里面的IP列表
func getIPs(endpoints *v1.Endpoints) []string {
ips := make([]string, 0, 10)
if len(endpoints.Subsets) <= 0 {
return ips
}
endpointAddresses := endpoints.Subsets[0].Addresses

ips := make([]string, 0, len(endpoints.Subsets[0].Addresses))
for _, v := range endpointAddresses {
ips = append(ips, v.IP)
for _, address := range endpoints.Subsets[0].Addresses {
ips = append(ips, address.IP)
}
return ips, nil
return ips
}

Loading…
Cancel
Save