介绍
go-kit提供了限流模块,该模块采用令牌桶算法实现,其实是封装了一下golang自带的golang.org/x/time/rate包来实现的。
令牌桶
令牌桶这种控制机制基于令牌桶中是否存在令牌来指示什么时候可以发送流量。令牌桶中的每一个令牌都代表一个字节。如果令牌桶中存在令牌,则允许发送流量;而如果令牌桶中不存在令牌,则不允许发送流量。因此,如果突发门限被合理地配置并且令牌桶中有足够的令牌,那么流量就可以以峰值速率发送。
令牌桶算法的基本过程如下:
假如用户配置的平均发送速率为r,则每隔1/r秒一个令牌被加入到桶中;
假设桶最多可以存发b个令牌。如果令牌到达时令牌桶已经满了,那么这个令牌会被丢弃;
当一个n个字节的[数据包]到达时,就从令牌桶中删除n个令牌,并且数据包被发送到网络;
如果令牌桶中少于n个令牌,那么不会删除令牌,并且认为这个数据包在流量限制之外;
两种限流
1、DelayingLimiter【限流延迟访问】
2、ErroringLimiter【限流错误返回】
Middleware
因为endpoint的封装,我们在使用go-kit提供的其它中间件时十分简单。下面就是一个完整的限流延迟中间件
把已有的endPoint外再包一层endPoint,再从最外层向内一层层调用
func NewDelayingLimiter(limit Waiter) endpoint.Middleware {
return func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
if err := limit.Wait(ctx);
err != nil {
return nil, err
}
return next(ctx, request)
}
}
}
使用延迟限流
【微服务|go-kit实践之3(go-kit 微服务的限流实现)】在go-kit 实现注册发现与负载均衡(https://blog.csdn.net/weixin_42117918/article/details/89208850)Server处代码做一下更改:
原来代码:
bookServer := new(BookServer)
bookInfoHandler := grpc_transport.NewServer(
makeGetBookInfoEndpoint(),
decodeRequest,
encodeResponse,
)
bookServer.bookInfoHandler = bookInfoHandler
更改之后的代码:
bookServer := new(BookServer)
bookInfoEndPoint:=makeGetBookInfoEndpoint()
//rate路径:golang.org/x/time/rate
limiter := rate.NewLimiter(rate.Every(time.Second * 3), 1)//限流3秒,临牌数:1
//通过DelayingLimiter中间件,在bookInfoEndPoint 的外层再包裹一层限流的endPoint
bookInfoEndPoint = ratelimit.NewDelayingLimiter(limiter)(bookInfoEndPoint)
bookInfoHandler := grpc_transport.NewServer(
bookInfoEndPoint, //限流的endpoint
decodeRequest,
encodeResponse,
)
bookServer.bookInfoHandler = bookInfoHandler
完整代码 1、Server端:
package mainimport (
"MyKit"
"context"
"fmt"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/ratelimit"
"github.com/go-kit/kit/sd/etcdv3"
grpc_transport "github.com/go-kit/kit/transport/grpc"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"net"
"time"
)type BookServer struct {
bookListHandler grpc_transport.Handler
bookInfoHandler grpc_transport.Handler
}//一下两个方法实现了 protoc生成go文件对应的接口:
/*
// BookServiceServer is the server API for BookService service.
type BookServiceServer interface {
GetBookInfo(context.Context, *BookInfoParams) (*BookInfo, error)
GetBookList(context.Context, *BookListParams) (*BookList, error)
}
*/
//通过grpc调用GetBookInfo时,GetBookInfo只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理
func (s *BookServer) GetBookInfo(ctx context.Context, in *book.BookInfoParams) (*book.BookInfo, error) { _, rsp, err := s.bookInfoHandler.ServeGRPC(ctx, in)
if err != nil {
return nil, err }
/*
if info,ok:=rsp.(*book.BookInfo);
ok {
return info,nil
}
return nil,errors.New("rsp.(*book.BookInfo)断言出错")
*/
return rsp.(*book.BookInfo), err //直接返回断言的结果
}//通过grpc调用GetBookList时,GetBookList只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理
func (s *BookServer) GetBookList(ctx context.Context, in *book.BookListParams) (*book.BookList, error) {
_, rsp, err := s.bookListHandler.ServeGRPC(ctx, in)
if err != nil {
return nil, err
}
return rsp.(*book.BookList), err
}//创建bookList的EndPoint
func makeGetBookListEndpoint()endpoint.Endpoint{
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
b:=new(book.BookList)
b.BookList=append(b.BookList,&book.BookInfo{BookId:1,BookName:"Go语言入门到精通"})
b.BookList=append(b.BookList,&book.BookInfo{BookId:2,BookName:"微服务入门到精通"})
b.BookList=append(b.BookList,&book.BookInfo{BookId:2,BookName:"区块链入门到精通"})
return b,nil
}
}//创建bookInfo的EndPoint
func makeGetBookInfoEndpoint() endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
//请求详情时返回 书籍信息
req := request.(*book.BookInfoParams)
b := new(book.BookInfo)
b.BookId = req.BookId
b.BookName = "Go入门到精通"
return b, nil
}
}func decodeRequest(_ context.Context, req interface{}) (interface{}, error) {
return req, nil
}func encodeResponse(_ context.Context, rsp interface{}) (interface{}, error) {
return rsp, nil
}func main() {
var (
etcdServer= "127.0.0.1:2379"//etcd服务的IP地址
prefix= "/services/book/"//服务的目录
ServerInstance = "127.0.0.1:50052"//当前实例Server的地址
key= prefix + ServerInstance //服务实例注册的路径
value= https://www.it610.com/article/ServerInstance
ctx= context.Background()
//服务监听地址
serviceAddress =":50052"
)
//etcd连接参数
option := etcdv3.ClientOptions{DialTimeout: time.Second * 3, DialKeepAlive: time.Second * 3}
//创建连接
client, err := etcdv3.NewClient(ctx, []string{etcdServer}, option)
if err != nil {
panic(err)
}
//创建注册
registrar := etcdv3.NewRegistrar(client, etcdv3.Service{Key: key, Value: value}, log.NewNopLogger())
registrar.Register() //启动注册服务
bookServer := new(BookServer)
bookInfoEndPoint:=makeGetBookInfoEndpoint()
//rate路径:golang.org/x/time/rate
limiter := rate.NewLimiter(rate.Every(time.Second * 3), 1) //限流3秒,临牌数:1
//通过DelayingLimiter中间件,在bookInfoEndPoint 的外层再包裹一层限流的endPoint
bookInfoEndPoint = ratelimit.NewDelayingLimiter(limiter)(bookInfoEndPoint)
bookListHandler := grpc_transport.NewServer(
makeGetBookListEndpoint(),
decodeRequest,
encodeResponse,
)
bookServer.bookListHandler = bookListHandler
bookInfoHandler := grpc_transport.NewServer(
bookInfoEndPoint,
decodeRequest,
encodeResponse,
)
bookServer.bookInfoHandler = bookInfoHandler
listener, err := net.Listen("tcp", serviceAddress) //网络监听,注意对应的包为:"net"
if err != nil {
fmt.Println(err)
return
}
gs := grpc.NewServer(grpc.UnaryInterceptor(grpc_transport.Interceptor))
book.RegisterBookServiceServer(gs, bookServer) //调用protoc生成的代码对应的注册方法
gs.Serve(listener)//启动Server}
2、Client端
package mainimport (
"MyKit"
"context"
"fmt"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd"
"github.com/go-kit/kit/sd/etcdv3"
"github.com/go-kit/kit/sd/lb"
"google.golang.org/grpc"
"io"
"time"
)func main() { var (
//注册中心地址
etcdServer = "127.0.0.1:2379"
//监听的服务前缀
prefix = "/services/book/"
ctx= context.Background()
)
options := etcdv3.ClientOptions{
DialTimeout:time.Second * 3,
DialKeepAlive: time.Second * 3,
}
//连接注册中心
client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options)
if err != nil {
panic(err)
}
logger := log.NewNopLogger()
//创建实例管理器, 此管理器会Watch监听etc中prefix的目录变化更新缓存的服务实例数据
instancer, err := etcdv3.NewInstancer(client, prefix, logger)
if err != nil {
panic(err)
}
//创建端点管理器, 此管理器根据Factory和监听的到实例创建endPoint并订阅instancer的变化动态更新Factory创建的endPoint
endpointer := sd.NewEndpointer(instancer, reqFactory, logger) //reqFactory自定义的函数,主要用于端点层(endpoint)接受并显示数据
//创建负载均衡器
balancer := lb.NewRoundRobin(endpointer) /**
我们可以通过负载均衡器直接获取请求的endPoint,发起请求
reqEndPoint,_ := balancer.Endpoint()
*/ /**
也可以通过retry定义尝试次数进行请求
*/
reqEndPoint := lb.Retry(30, 30*time.Second, balancer) //请求次数为30,时间为30S(时间需要多于服务器限流时间3s) //现在我们可以通过 endPoint 发起请求了
for i:=0;
i<10;
i++ {//发送10次请求
req := struct{}{}
ctx=context.Background()
if _, err = reqEndPoint(ctx, req);
err != nil {
panic(err)
}
}
}//通过传入的 实例地址创建对应的请求endPoint
func reqFactory(instanceAddr string) (endpoint.Endpoint, io.Closer, error) {
return func(ctx context.Context, request interface{}) (interface{}, error) {
conn, err := grpc.Dial(instanceAddr, grpc.WithInsecure())
if err != nil {
fmt.Println(err)
panic("connect error")
}
defer conn.Close()
bookClient := book.NewBookServiceClient(conn)
bi, _ := bookClient.GetBookInfo(context.Background(), &book.BookInfoParams{BookId: 1})
fmt.Println("获取书籍详情")
fmt.Println("bookId: 1", " => ", "bookName:", bi.BookName)
fmt.Println("请求服务成功: ", instanceAddr,"当前时间为:",time.Now().Format("2006-01-02 15:04:05.99"))
/*bl, _ := bookClient.GetBookList(context.Background(), &book.BookListParams{Page: 1, Limit: 10})
fmt.Println("获取书籍列表")
for _, b := range bl.BookList {
fmt.Println("bookId:", b.BookId, " => ", "bookName:", b.BookName)
}*/
return nil, nil
}, nil, nil
}
运行效果及分析
1、启动etcd
2、启动Server
3、启动Client
client端运行效果如下:
文章图片
可以看到Server对Client端进行了限流,限流时间间隔为:3s。
推荐阅读
- 微服务|微服务系列:服务发现与注册-----Eureka(面试突击!你想了解的Eureka都在这里.持续更新中......)
- 每日一书|每日一书丨学习微服务最好的方式(阅读《微服务架构设计模式》)
- 【golang】leetcode中级-字母异位词分组&无重复字符的最长子串
- 彻底理解Golang Map
- kratos线上开源年会它来啦~
- 深入浅出 Golang 资源嵌入方案(go-bindata篇)
- 深入浅出 Golang 资源嵌入方案(前篇)
- golang 经典案例总结
- Go实战 | 基于有向无环图的并发执行流的实现
- Golang 数组和切片