wip: 完成 client api 分析
This commit is contained in:
89
internal/invoke/client.go
Normal file
89
internal/invoke/client.go
Normal file
@ -0,0 +1,89 @@
|
||||
package invoke
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"esway/internal/tool"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/resolver"
|
||||
)
|
||||
|
||||
const (
|
||||
SCHEME = "sonar"
|
||||
)
|
||||
|
||||
type Client[T any] struct {
|
||||
domain string
|
||||
endpoints []string
|
||||
fn func(grpc.ClientConnInterface) T
|
||||
opts []grpc.DialOption
|
||||
|
||||
cc *grpc.ClientConn
|
||||
}
|
||||
|
||||
func (c *Client[T]) Session() T {
|
||||
return c.fn(c.cc)
|
||||
}
|
||||
|
||||
var clients = &sync.Map{}
|
||||
|
||||
// NewClient
|
||||
/*
|
||||
* domain => Example: sonar_search
|
||||
* endpoints => Example: []string{"sonar_search:8080", "sonar_search:80801"} or []string{"10.10.10.10:32000", "10.10.10.10:32001"}
|
||||
* fn => Example: system.NewSystemSrvClient
|
||||
* opts => Example: grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
*/
|
||||
func NewClient[T any](
|
||||
domain string,
|
||||
endpoints []string,
|
||||
fn func(grpc.ClientConnInterface) T,
|
||||
opts ...grpc.DialOption,
|
||||
) (*Client[T], error) {
|
||||
cached, ok := clients.Load(domain)
|
||||
if ok {
|
||||
if client, ok := cached.(*Client[T]); ok {
|
||||
return client, nil
|
||||
}
|
||||
}
|
||||
|
||||
resolved := resolver.State{Addresses: make([]resolver.Address, 0)}
|
||||
|
||||
locker.Lock()
|
||||
for _, item := range endpoints {
|
||||
resolved.Addresses = append(resolved.Addresses, resolver.Address{Addr: item})
|
||||
}
|
||||
ips[domain] = resolved
|
||||
locker.Unlock()
|
||||
|
||||
fullAddress := fmt.Sprintf("%s://%s", SCHEME, domain)
|
||||
|
||||
opts = append(opts,
|
||||
grpc.WithResolvers(myBuilder),
|
||||
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
|
||||
grpc.WithChainUnaryInterceptor(retryInterceptor(3, 3*time.Second)),
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
)
|
||||
|
||||
conn, err := grpc.DialContext(
|
||||
tool.Timeout(3),
|
||||
fullAddress,
|
||||
opts...,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := &Client[T]{
|
||||
cc: conn,
|
||||
fn: fn,
|
||||
}
|
||||
|
||||
clients.Store(domain, c)
|
||||
|
||||
return c, nil
|
||||
}
|
82
internal/invoke/resolve.go
Normal file
82
internal/invoke/resolve.go
Normal file
@ -0,0 +1,82 @@
|
||||
package invoke
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
scheme = "bifrost"
|
||||
)
|
||||
|
||||
type CustomBuilder struct{}
|
||||
|
||||
func (cb *CustomBuilder) Scheme() string {
|
||||
return scheme
|
||||
}
|
||||
|
||||
func (cb *CustomBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
|
||||
cr := &customResolver{
|
||||
cc: cc,
|
||||
target: target,
|
||||
}
|
||||
|
||||
cr.ResolveNow(resolver.ResolveNowOptions{})
|
||||
|
||||
return cr, nil
|
||||
}
|
||||
|
||||
type customResolver struct {
|
||||
sync.Mutex
|
||||
target resolver.Target
|
||||
cc resolver.ClientConn
|
||||
ips map[string]string
|
||||
}
|
||||
|
||||
func (cr *customResolver) ResolveNow(o resolver.ResolveNowOptions) {
|
||||
var (
|
||||
addrs = make([]resolver.Address, 0)
|
||||
hp []string
|
||||
)
|
||||
|
||||
cr.Lock()
|
||||
defer cr.Unlock()
|
||||
|
||||
if hp = strings.Split(cr.target.URL.Host, ":"); len(hp) >= 2 {
|
||||
if ip, ok := pool[hp[0]]; ok {
|
||||
addr := fmt.Sprintf("%s:%s", ip, hp[1])
|
||||
addrs = append(addrs, resolver.Address{Addr: addr})
|
||||
}
|
||||
}
|
||||
|
||||
_ = cr.cc.UpdateState(resolver.State{Addresses: addrs})
|
||||
}
|
||||
|
||||
func (cr *customResolver) Close() {}
|
||||
|
||||
var (
|
||||
cb = &CustomBuilder{}
|
||||
pool = make(map[string]string)
|
||||
)
|
||||
|
||||
func init() {
|
||||
resolver.Register(cb)
|
||||
}
|
||||
|
||||
type CustomDomain struct {
|
||||
Domain string
|
||||
IP string
|
||||
}
|
||||
|
||||
func NewCustomBuilder(cds ...CustomDomain) resolver.Builder {
|
||||
locker.Lock()
|
||||
defer locker.Unlock()
|
||||
|
||||
for _, cd := range cds {
|
||||
pool[cd.Domain] = cd.IP
|
||||
}
|
||||
|
||||
return cb
|
||||
}
|
43
internal/invoke/resolve_v2.go
Normal file
43
internal/invoke/resolve_v2.go
Normal file
@ -0,0 +1,43 @@
|
||||
package invoke
|
||||
|
||||
import (
|
||||
"github.com/sirupsen/logrus"
|
||||
"sync"
|
||||
|
||||
"google.golang.org/grpc/resolver"
|
||||
)
|
||||
|
||||
type Builder struct{}
|
||||
|
||||
func (b *Builder) Scheme() string {
|
||||
return SCHEME
|
||||
}
|
||||
|
||||
func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
|
||||
cr := &Resolver{
|
||||
cc: cc,
|
||||
target: target,
|
||||
}
|
||||
|
||||
cr.ResolveNow(resolver.ResolveNowOptions{})
|
||||
|
||||
return cr, nil
|
||||
}
|
||||
|
||||
type Resolver struct {
|
||||
target resolver.Target
|
||||
cc resolver.ClientConn
|
||||
}
|
||||
|
||||
func (r *Resolver) ResolveNow(o resolver.ResolveNowOptions) {
|
||||
logrus.Tracef("resolve_v2 ResolveNow => target: %s, %v", r.target.URL.Host, ips)
|
||||
_ = r.cc.UpdateState(ips[r.target.URL.Host])
|
||||
}
|
||||
|
||||
func (cr *Resolver) Close() {}
|
||||
|
||||
var (
|
||||
locker = &sync.Mutex{}
|
||||
myBuilder = &Builder{}
|
||||
ips = map[string]resolver.State{}
|
||||
)
|
43
internal/invoke/retry.go
Normal file
43
internal/invoke/retry.go
Normal file
@ -0,0 +1,43 @@
|
||||
package invoke
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func retryInterceptor(maxAttempt int, interval time.Duration) grpc.UnaryClientInterceptor {
|
||||
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
|
||||
if maxAttempt == 0 {
|
||||
return invoker(ctx, method, req, reply, cc, opts...)
|
||||
}
|
||||
|
||||
duration := interval
|
||||
|
||||
for attempt := 1; attempt <= maxAttempt; attempt++ {
|
||||
|
||||
if err := invoker(ctx, method, req, reply, cc, opts...); err != nil {
|
||||
if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
|
||||
logrus.Debugf("Connection failed err: %v, retry %d after %fs", err, attempt, duration.Seconds())
|
||||
|
||||
time.Sleep(duration)
|
||||
duration *= 2
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return nil // 请求成功,不需要重试
|
||||
}
|
||||
|
||||
return fmt.Errorf("max retry attempts reached")
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user