一、介绍 grpc提供了简单的负载均衡,需要自己实现服务发现resolve。我们既然要使用go-kit来治理微服务,那么我们就使用go-kit的注册发现、负载均衡机制。
go-kit官方【stringsvc3】例子中使用的负载均衡方案是通过服务端转发进行,翻找下源码go-kit的服务注册发现、负载均衡在【sd】包中。下面我们介绍怎么通过go-kit进行客户端负载均衡。
go-kit提供的注册中心
【Golang|go-kit实践之2(go-kit 实现注册发现与负载均衡)】1、 etcd
2、 consul
3、 eureka
4、 zookeeper
go-kit提供的负载均衡
1、 random[随机]
2、 roundRobin[轮询]
只需实现Balancer接口,我们可以很容易的增加其它负载均衡机制
type Balancer interface {
Endpoint() (endpoint.Endpoint, error)
}
etcd注册发现
etcd和zookeeper类似是一个高可用、强一致性的存储仓库,拥有服务发现功能。 我们就通过go-kit提供的etcd包来实现服务注册发现
二、示例 1、protobuf文件及生成对应的go文件
syntax = "proto3";
// 请求书详情的参数结构book_id 32位整形
message BookInfoParams {
int32 book_id = 1;
}// 书详情信息的结构book_name字符串类型
message BookInfo {
int32 book_id = 1;
stringbook_name = 2;
}// 请求书列表的参数结构page、limit32位整形
message BookListParams {
int32 page = 1;
int32 limit = 2;
}// 书列表的结构BookInfo结构数组
message BookList {
repeated BookInfo book_list = 1;
}
// 定义 获取书详情和 书列表服务入参出参分别为上面所定义的结构
service BookService {
rpc GetBookInfo (BookInfoParams) returns (BookInfo) {}
rpc GetBookList (BookListParams) returns (BookList) {}
}
生成对应的go语言代码文件:protoc --go_out=plugins=grpc:. book.proto(其中:protobuf文件名为:book.proto)
2、Server端代码
package mainimport (
"MyKit"
"context"
"fmt"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd/etcdv3"
grpc_transport "github.com/go-kit/kit/transport/grpc"
"google.golang.org/grpc"
"net"
"time"
)type BookServer struct {
bookListHandler grpc_transport.Handler
bookInfoHandler grpc_transport.Handler
}//一下两个方法实现了 protoc生成go文件对应的接口:
/*
// BookServiceServer is the server API for BookService service.
type BookServiceServer interface {
GetBookInfo(context.Context, *BookInfoParams) (*BookInfo, error)
GetBookList(context.Context, *BookListParams) (*BookList, error)
}
*/
//通过grpc调用GetBookInfo时,GetBookInfo只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理
func (s *BookServer) GetBookInfo(ctx context.Context, in *book.BookInfoParams) (*book.BookInfo, error) { _, rsp, err := s.bookInfoHandler.ServeGRPC(ctx, in)
if err != nil {
return nil, err }
/*
if info,ok:=rsp.(*book.BookInfo);
ok {
return info,nil
}
return nil,errors.New("rsp.(*book.BookInfo)断言出错")
*/
return rsp.(*book.BookInfo), err //直接返回断言的结果
}//通过grpc调用GetBookList时,GetBookList只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理
func (s *BookServer) GetBookList(ctx context.Context, in *book.BookListParams) (*book.BookList, error) {
_, rsp, err := s.bookListHandler.ServeGRPC(ctx, in)
if err != nil {
return nil, err
}
return rsp.(*book.BookList), err
}//创建bookList的EndPoint
func makeGetBookListEndpoint()endpoint.Endpoint{
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
b:=new(book.BookList)
b.BookList=append(b.BookList,&book.BookInfo{BookId:1,BookName:"Go语言入门到精通"})
b.BookList=append(b.BookList,&book.BookInfo{BookId:2,BookName:"微服务入门到精通"})
b.BookList=append(b.BookList,&book.BookInfo{BookId:2,BookName:"区块链入门到精通"})
return b,nil
}
}//创建bookInfo的EndPoint
func makeGetBookInfoEndpoint() endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
//请求详情时返回 书籍信息
req := request.(*book.BookInfoParams)
b := new(book.BookInfo)
b.BookId = req.BookId
b.BookName = "Go入门到精通"
return b, nil
}
}func decodeRequest(_ context.Context, req interface{}) (interface{}, error) {
return req, nil
}func encodeResponse(_ context.Context, rsp interface{}) (interface{}, error) {
return rsp, nil
}func main() {
var (
etcdServer= "127.0.0.1:2379"//etcd服务的IP地址
prefix= "/services/book/"//服务的目录
ServerInstance = "127.0.0.1:50052"//当前实例Server的地址
key= prefix + ServerInstance //服务实例注册的路径
value= https://www.it610.com/article/ServerInstance
ctx= context.Background()
//服务监听地址
serviceAddress =":50052"
)
//etcd连接参数
option := etcdv3.ClientOptions{DialTimeout: time.Second * 3, DialKeepAlive: time.Second * 3}
//创建连接
client, err := etcdv3.NewClient(ctx, []string{etcdServer}, option)
if err != nil {
panic(err)
}
//创建注册
registrar := etcdv3.NewRegistrar(client, etcdv3.Service{Key: key, Value: value}, log.NewNopLogger())
registrar.Register() //启动注册服务
bookServer := new(BookServer)
bookListHandler := grpc_transport.NewServer(
makeGetBookListEndpoint(),
decodeRequest,
encodeResponse,
)
bookServer.bookListHandler = bookListHandler bookInfoHandler := grpc_transport.NewServer(
makeGetBookInfoEndpoint(),
decodeRequest,
encodeResponse,
)
bookServer.bookInfoHandler = bookInfoHandler listener, err := net.Listen("tcp", serviceAddress) //网络监听,注意对应的包为:"net"
if err != nil {
fmt.Println(err)
return
}
gs := grpc.NewServer(grpc.UnaryInterceptor(grpc_transport.Interceptor))
book.RegisterBookServiceServer(gs, bookServer) //调用protoc生成的代码对应的注册方法
gs.Serve(listener)//启动Server}
3、Client端代码
package mainimport (
"MyKit"
"context"
"fmt"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd"
"github.com/go-kit/kit/sd/etcdv3"
"github.com/go-kit/kit/sd/lb"
"google.golang.org/grpc"
"io"
"time"
)func main() { var (
//注册中心地址
etcdServer = "127.0.0.1:2379"
//监听的服务前缀
prefix = "/services/book/"
ctx= context.Background()
)
options := etcdv3.ClientOptions{
DialTimeout:time.Second * 3,
DialKeepAlive: time.Second * 3,
}
//连接注册中心
client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options)
if err != nil {
panic(err)
}
logger := log.NewNopLogger()
//创建实例管理器, 此管理器会Watch监听etc中prefix的目录变化更新缓存的服务实例数据
instancer, err := etcdv3.NewInstancer(client, prefix, logger)
if err != nil {
panic(err)
}
//创建端点管理器, 此管理器根据Factory和监听的到实例创建endPoint并订阅instancer的变化动态更新Factory创建的endPoint
endpointer := sd.NewEndpointer(instancer, reqFactory, logger) //reqFactory自定义的函数,主要用于端点层(endpoint)接受并显示数据
//创建负载均衡器
balancer := lb.NewRoundRobin(endpointer) /**
我们可以通过负载均衡器直接获取请求的endPoint,发起请求
reqEndPoint,_ := balancer.Endpoint()
*/ /**
也可以通过retry定义尝试次数进行请求
*/
reqEndPoint := lb.Retry(3, 3*time.Second, balancer) //现在我们可以通过 endPoint 发起请求了
req := struct{}{}
if _, err = reqEndPoint(ctx, req);
err != nil {
panic(err)
}
}//通过传入的 实例地址创建对应的请求endPoint
func reqFactory(instanceAddr string) (endpoint.Endpoint, io.Closer, error) {
return func(ctx context.Context, request interface{}) (interface{}, error) {
fmt.Println("请求服务: ", instanceAddr)
conn, err := grpc.Dial(instanceAddr, grpc.WithInsecure())
if err != nil {
fmt.Println(err)
panic("connect error")
}
defer conn.Close()
bookClient := book.NewBookServiceClient(conn)
bi, _ := bookClient.GetBookInfo(context.Background(), &book.BookInfoParams{BookId: 1})
fmt.Println("获取书籍详情")
fmt.Println("bookId: 1", " => ", "bookName:", bi.BookName)bl, _ := bookClient.GetBookList(context.Background(), &book.BookListParams{Page: 1, Limit: 10})
fmt.Println("获取书籍列表")
for _, b := range bl.BookList {
fmt.Println("bookId:", b.BookId, " => ", "bookName:", b.BookName)
}
return nil, nil
}, nil, nil
}
4、运行
(1)安装etcd并启动
由于本实例服务发现采用了etcd,因此在运行之前需要先安装etcd并运行。
(2)etcd是一个分布式一致性键值存储,其主要用于分布式系统的共享配置和服务发现。etcd由Go语言编写.
下载地址: https://github.com/coreos/etcd/releases
将压缩文件解压到指定文件夹,解压后的目录如下:
文章图片
其中etcd.exe是服务端,etcdctl.exe是客户端。点击etcd.exe运行etcd服务。(注:设置环境变量自由决定,此实例也可以不用设置)
文章图片
(2)实例运行
先运行Server端,在运行Client端,效果如下:
文章图片
5、问题汇总
如果运行时,提示一下错误:
panic: /debug/requests is already registered. You may have two independent copies of golang.org/x/net/trace in your binary, trying to maintain separate state. This may involve a vendored copy of golang.org/
x/net/trace.goroutine 1 [running]:
go.etcd.io/etcd/vendor/golang.org/x/net/trace.init.0()
D:/GoSrc/src/go.etcd.io/etcd/vendor/golang.org/x/net/trace/trace.go:116 +0x1ab
exit status 2
说明golang.org/x/net/包下的 trace 与go.etcd.io/etcd/vendor/golang.org/x/net/ 包下trace有冲突,解决方法:找到go.etcd.io\etcd\vendor目录:
文章图片
由于已经在src目录下存在golang.org与google.golang.org两个包
文章图片
推荐阅读
- 微服务|微服务系列:服务发现与注册-----Eureka(面试突击!你想了解的Eureka都在这里.持续更新中......)
- 每日一书|每日一书丨学习微服务最好的方式(阅读《微服务架构设计模式》)
- 【golang】leetcode中级-字母异位词分组&无重复字符的最长子串
- 彻底理解Golang Map
- kratos线上开源年会它来啦~
- 深入浅出 Golang 资源嵌入方案(go-bindata篇)
- 深入浅出 Golang 资源嵌入方案(前篇)
- golang 经典案例总结
- Go实战 | 基于有向无环图的并发执行流的实现
- Golang 数组和切片