golang|go-kit 上手之example stringsvc3 通过代理实现分布式处理


  • 代理中间件
  • 使用的包
    • 创建到特定地址代理服务器的client
    • 定义使用了代理机制的新服务
    • 根据用户输入的代理服务器地址生成对应的代理服务器中间件
  • main
  • 运行结果

代理中间件 stringsvc3没有完全按照官网中stringsvc3的写法,而是在stringsvc2的基础上增加了proxy.go
主要就是给uppercase增加了代理中间件,主要步骤分三步:
1)向特定地址代理服务器发送请求的client的编码和解码函数。
2)生成向特定地址代理服务器发送请求的client。
3)用client配合load balancer构建代理服务器中间件。
使用的包
package mainimport ( "bytes" "context" "encoding/json" "errors" "fmt" "net/http" "net/url" "strings" "time""github.com/go-kit/kit/sd/lb""golang.org/x/time/rate""github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/ratelimit" "github.com/sony/gobreaker""io/ioutil""github.com/go-kit/kit/circuitbreaker""github.com/go-kit/kit/log" "github.com/go-kit/kit/sd" httptransport "github.com/go-kit/kit/transport/http" )

定义所需类型
type ServiceMiddleware func(StringService) StringService

获取用户指定的代理服务器地址列表,本样例中,用户输入多个代理服务器用”,”分割
func split(s string) []string { a := strings.Split(s, ",") for i := range a { a[i] = strings.TrimSpace(a[i]) }return a }

根据httptransport.NewClient的参数需要,需要一个将client的request编码的函数(如下encodeRequest),以及将代理服务器返回的数据转为response的函数(如下:decodeUppercaseResponse)
【golang|go-kit 上手之example stringsvc3 通过代理实现分布式处理】往代理服务发送请求时,将request转为io.ReaderCloser
func encodeRequest(_ context.Context, r *http.Request, request interface{}) error { var buf bytes.Buffer if err := json.NewEncoder(&buf).Encode(request); err != nil { return err } r.Body = ioutil.NopCloser(&buf) return nil}func decodeUppercaseResponse(_ context.Context, r *http.Response) (interface{}, error) { var response uppercaseResponse if err := json.NewDecoder(r.Body).Decode(&response); err != nil { return nil, err } return response, nil }

创建到特定地址代理服务器的client
func makeUppercaseProxy(ctc context.Context, instance string) endpoint.Endpoint { if !strings.HasPrefix(instance, "http") { instance = "http://" + instance }u, err := url.Parse(instance) if err != nil { panic(err) } if u.Path == "" { u.Path = "/uppercase" }return httptransport.NewClient( "GET", u, encodeRequest, decodeUppercaseResponse, ).Endpoint() }

定义使用了代理机制的新服务
type proxymw struct { nextStringService //用于处理Count请求 uppercase endpoint.Endpoint //load balance处理uppercase }//直接用当前服务处理Count请求 func (mw proxymw) Count(ctx context.Context, s string) int { return mw.next.Count(ctx, s) }//将uppercase请求发往各个代理服务器中(后面会讲到通过Load balancer实现) func (mw proxymw) Uppercase(ctx context.Context, s string) (string, error) { response, err := mw.uppercase(ctx, uppercaseRequest{S: s}) if err != nil { return "", err }resp := response.(uppercaseResponse) if resp.Err != "" { return resp.V, errors.New(resp.Err) }return resp.V, nil }

根据用户输入的代理服务器地址生成对应的代理服务器中间件
//根据用户输入的多个地址,创建到多个服务器的代理 func proxyMiddleware(ctx context.Context, instances string, logger log.Logger) ServiceMiddleware { if instances == "" { logger.Log("proxy_to", "none") return func(next StringService) StringService { return next } }var ( qps= 100//请求频率超过多少会返回错误 maxAttempts = 3//请求在放弃前重试多少次,用于 load balancer maxTime= 250 * time.Millisecond // 请求在放弃前的超时时间,用于 load balancer )var ( instanceList = split(instances) endpointersd.FixedEndpointer ) logger.Log("proxy_to", fmt.Sprint(instanceList)) for _, instance := range instanceList { var e endpoint.Endpoint e = makeUppercaseProxy(ctx, instance) //创建client e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e) //添加breader e = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), qps))(e) //添加limiter endpointer = append(endpointer, e) } balancer := lb.NewRoundRobin(endpointer) //添加load balancer//Retry封装一个service load balancer,返回面向特定service method的load balancer。到这个endpoint的请求会自动通过 //load balancer进行分配到各个代理服务器中。返回失败的请求会自动retry直到成功或者到达最大失败次数或者超时。 retry := lb.Retry(maxAttempts, maxTime, balancer)//添加retry机制 return func(next StringService) StringService { return proxymw{next, retry} } }

main
func main() { var ( listen = flag.String("listen", ":8080", "http lisetern address") proxy= flag.String("proxy", "", "optional ") ) flag.Parse() logger := log.NewLogfmtLogger(os.Stderr) logger = log.With(logger, "listern", *listen, "caller", log.DefaultCaller) //注意这里的 filedkeys要和 methodField 中的一致,不然会报错 //fieldKeys := []string{"metod", "error"} //2017/10/19 18:09:28 http: panic serving [::1]:55246: label name "metod" missing in label map fieldKeys := []string{"method", "error"} requestCount := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{ Namespace: "my_gropu", Subsystem: "string_service", Name:"request_count", Help:"Number of requests received.", }, fieldKeys)requestLatency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ Namespace: "my_gropu", Subsystem: "string_service", Name:"request_latence_microseconds", Help:"Number of requests in misroseconds.", }, fieldKeys)countResult := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ Namespace: "my_gropu", Subsystem: "string_service", Name:"count_result", Help:"The result of each count method.", }, []string{})//svc := stringService{} //cannot use logMiddleware literal (type logMiddleware) as type stringService in assignmentvar svc StringService svc = stringService{} svc = proxyMiddleware(context.Background(), *proxy, logger)(svc) svc = logMiddleware{logger, svc} svc = instrumentingMiddleware{requestCount, requestLatency, countResult, svc}uppercaseHandler := httptransport.NewServer( makeUppercaseEndpoint(svc), decodeUpperCaseRequest, encodeResponse, )countHandler := httptransport.NewServer( makeCountEndpoint(svc), decodeCountRequest, encodeResponse, ) http.Handle("/uppercase", uppercaseHandler) http.Handle("/count", countHandler) http.Handle("/metrics", promhttp.Handler()) logger.Log("msg", "HTTP", "addr", *listen) logger.Log("err", http.ListenAndServe(*listen, nil)) }

相对于stringsvc2.md,就增加了
svc = proxyMiddleware(context.Background(), *proxy, logger)(svc)

可以看到,利用gokit的middlerware方式书写代码,增加功能非常简单。
运行结果
Sean-MacBook-Air:stringsrv3 kes$ ./main -listen=:8001 listern=:8001 caller=proxy.go:109 proxy_to=none listern=:8001 caller=main.go:329 msg=HTTP addr=:8001 listern=:8001 caller=main.go:186 logmethod=uppercase input=foo output=FOO err=null took=1.883μsSean-MacBook-Air:stringsrv3 kes$ ./main -listen=:8082 listern=:8082 caller=proxy.go:109 proxy_to=none listern=:8082 caller=main.go:329 msg=HTTP addr=:8082 listern=:8082 caller=main.go:186 logmethod=uppercase input=bar output=BAR err=null took=1.993μsSean-MacBook-Air:stringsrv3 kes$ ./main -listen=:8080 -proxy=localhost:8001,localhost:8082 listern=:8080 caller=proxy.go:123 proxy_to="[localhost:8001 localhost:8082]" listern=:8080 caller=main.go:329 msg=HTTP addr=:8080 listern=:8080 caller=main.go:186 logmethod=uppercase input=foo output=FOO err=null took=4.496073ms listern=:8080 caller=main.go:186 logmethod=uppercase input=bar output=BAR err=null took=1.983719msSean-MacBook-Air:goproject kes$ for s in foo bar ; do curl -d"{\"s\":\"$s\"}" localhost:8080/uppercase; done {"v":"FOO"} {"v":"BAR"}

推荐干货满满的技术专栏
golang|go-kit 上手之example stringsvc3 通过代理实现分布式处理
文章图片


    推荐阅读