go grpc测试_用gokit写出优雅的gRPC微服务

用go-kit写出优雅的gRPC微服务

微服务在这几年一直是大家谈论的话题。微服务是一种软件架构,它将一个大且聚合的业务项目拆解为多个小且独立的业务模块,模块即服务,各服务间使用高效的协议(protobuf、JSON 等)相互调用即是 RPC。这种拆分代码库的方式有以下特点:

  • 每个服务应作为小规模的、独立的业务模块运行,独立部署

  • 每个服务应在进行自动化测试和(分布式)部署时,不影响其他服务

  • 每个服务内部进行细致的错误检查和处理

本章介绍如何通过GRPC的方式来开发微服务项目,同时也需要支持http的RESTful的方式。

简介

Github: https://github.com/icowan/grpc-world

设计一个简单的数据存储的服务,通过get获取数据,通过put设置数据,类似于Redis的key,value存储。

服务有两个API:

  • get: 根据key获取内容

  • put: 根据key设置内容

那么初步定义一个Service实现这两个功能:

type Service interface {  Get(ctx context.Context, key string) (val string, err error)  Put(ctx context.Context, key, val string) (err error)}

以下内容全部通过go-kit的一些组件来实现。

4a495564dfd2040c81afa532e6425531.png

开始前

在创建grpc服务之前需要在您的开发机器上安装proto命令,主要用它来通过.proto文件生成pb文件。

在MacOS安装

$ brew install autoconf automake libtool

因为是基于go语言除了proto之外还需要protoc-gen-go命令:

$ go get -u google.golang.org/grpc$ cd $GOPATH/bin/$ lsprotoc-gen-go

需要保证protoc-gen-goGOPATH/bin目录下

开始撸Server端代码

基于go-kit组件来实现server端,go-kit非常适合用来作微服务的组件,它有非常优秀的代码规范能大减少开发人员犯错概率,刚接触时可能会觉得非常复杂,但用久之后你会发现它真的很方便。用go-kit的好处还有,假如以后换成其他框架,在 Go-kit 良好的架构下,我们只需要把 Transport 以及 Endpoint 层剥离,留下 Service 就可以方便集成到新的框架下面了  

go-kit主使用了三层结构:

  • Transport: 通信层,可以用各种不同的通信方式,如 HTTP RESTful 接口或者 gRPC 接口(这是个很大的优点,方便切换成任何通信协议)

    • http: http的传输处理

    • grpc: grpc传输处理

  • Endpoint: 终端层,主要实现各种接口的 handler,负责 req/resp 格式的转换

  • Service: 服务层,实现业务逻辑的地方

优雅的目录结构

.├── Dockerfile├── Makefile├── README.md├── client│   ├── grpc│   │   └── client.go│   └── http│       └── client.go├── cmd│   ├── main.go│   └── service│       └── service.go├── docker-compose.yaml├── go.mod├── go.sum└── pkg    ├── encode    │   └── response.go    ├── endpoint    │   └── endpoint.go    ├── grpc    │   ├── handler.go    │   └── pb    │       ├── service.pb.go    │       └── service.proto    ├── http    │   └── handler.go    ├── repository    │   └── repository.go    └── service        ├── logging.go        ├── middleware.go        └── service.go

/client/: 我用来演示的demo,可以不用

  • /client/http/: http 的例子

  • /client/grpc/: grpc 的例子

  • /cmd: 存放通过命令行启动的入口

  • /cmd/service/: 真正入口在这里,在这里初始化服务

  • /pkg/: 所有的工具,及service都在这里

    • /pkg/encode/: encode 工具

    • /pkg/endpoint/: go-kit 的端点层

    • /pkg/gprc or http: 传输层,入参、出参都在这里进行处理

    • /pkg/repository/: 仓库,这里我只把数据存内存了没做持久化处理

    • /pkg/service/: 业务逻辑在这里实现

Service

Service是具体的业务实现,这里只需要关注业务逻辑不需要关注框架本身,需要什么就传入什么,也方便以后迁移。

Service定义一个 interface ,并提供所需要实现的方法,若将来有升级或兼容,可以再实现一个Service2,就不再需要修改上层逻辑了,也能向前兼容。

import (  "context"  "github.com/go-kit/kit/log"  "github.com/icowan/grpc-world/pkg/repository")type Service interface {  Put(ctx context.Context, key, val string) (err error)}type service struct {  logger     log.Logger  repository repository.Repository}func (s *service) Put(ctx context.Context, key, val string) (err error) {  return s.repository.Put(key, val)}func New(logger log.Logger, repository repository.Repository) Service {  return &service{logger: logger, repository: repository}}

Endpoint

端点的主要功能是将Transport传过来的Request内容进行类型转换并将数据传到Service及处理返回的内容或转换。简单来说它就是Transport跟Service的桥梁。

type GetRequest struct {  Key string `json:"key"`  Val string `json:"val"`}type Endpoints struct {  GetEndpoint endpoint.Endpoint}func NewEndpoint(s service.Service, mdw map[string][]endpoint.Middleware) Endpoints {  eps := Endpoints{    GetEndpoint: func(ctx context.Context, request interface{}) (response interface{}, err error) {      req := request.(GetRequest)      val, err := s.Get(ctx, req.Key)      return encode.Response{Error: err,Data:  val}, err},  }  for _, m := range mdw["Get"] {    eps.GetEndpoint = m(eps.GetEndpoint)  }  return eps}

Middleware

中间件的的功能主要用来记录日志、限流、分布式追踪、权限验证等等,每个api可以根据需求定制所需要的中间件。

Logging

logging除了这种写法外也可以类型endpoint.Middleware的那种写法,我这里展示两种模式。

type loggingService struct {  logger log.Logger  next   Service}func NewLoggingService(logger log.Logger, s Service) Service {  return &loggingService{level.Info(logger), s}}func (l *loggingService) Put(ctx context.Context, key, val string) (err error) {  defer func(begin time.Time) {    _ = l.logger.Log("method", "Put","key", key,"took", time.Since(begin),"err", err)  }(time.Now())  return l.next.Put(ctx, key, val)}

Limiter

下面展示的是限流的中间件例子:

func TokenBucketLimitter(bkt *rate.Limiter) endpoint.Middleware {  return func(next endpoint.Endpoint) endpoint.Endpoint {    return func(ctx context.Context, request interface{}) (response interface{}, err error) {      if !bkt.Allow() {        return nil, errors.New("Rate limit exceed!")      }      return next(ctx, request)    }  }}

Repository

仓库只实现了一个简单的get及put功能,数据没有落地,进程停了数据就没了。

type Store struct {  Key       string  Val       string  CreatedAt time.Time}type StoreKey stringvar ErrUnknown = errors.New("unknown store")type Repository interface {  Put(key, val string) error  Get(key string) (res *Store, err error)}type store struct {  mtx    sync.RWMutex  stores map[StoreKey]*Store}func (s *store) Put(key, val string) error {  s.mtx.Lock()  defer s.mtx.Unlock()  s.stores[StoreKey(key)] = &Store{    Key:       key,    Val:       val,    CreatedAt: time.Now(),  }  return nil}// 省略一部分func New() Repository {  return &store{    stores: make(map[StoreKey]*Store),  }}

Transport

传输层剥离出来之后应用的传输方式就可以随意定义了,可以是HTTP也可以是gRPC或其他方式,它们所对接的都是Endpoint,所以Endpoint及Service都不需要做任何调整就能直接使用多种传输方式。

HTTP

下面是实现HTTP Transport的实现方式:

import (  "context"  "encoding/json"  kithttp "github.com/go-kit/kit/transport/http"  "github.com/gorilla/mux"  "github.com/icowan/grpc-world/pkg/encode"  ep "github.com/icowan/grpc-world/pkg/endpoint"  "github.com/pkg/errors"  "net/http")func MakeHTTPHandler(eps ep.Endpoints, opts ...kithttp.ServerOption) http.Handler {  r := mux.NewRouter()  r.Handle("/get/{key}", kithttp.NewServer(    eps.GetEndpoint,    decodeGetRequest,    encode.JsonResponse,    opts...,  )).Methods(http.MethodGet)  return r}func decodeGetRequest(_ context.Context, r *http.Request) (request interface{}, err error) {  vars := mux.Vars(r)  key, ok := vars["key"]  if !ok {    return nil, errors.New("route bad")  }  return ep.GetRequest{Key: key}, nil}

gRPC

gRPC的Transport会比http稍微麻烦一丢丢,主要是gRPC还需要实现一个grpcServerinterface,除此之外与http的实现几乎差不多。

定义proto文件及生成pb文件

在实现grpcServer之前先得定义接口:

syntax = "proto3";package pb;service Service {    rpc Get (GetRequest) returns (ServiceResponse) {}    rpc Put (GetRequest) returns (ServiceResponse) {}}message GetRequest {    string key = 1;    string val = 2;}message ServiceResponse {    bool success = 1;    int64 code = 2;    string data = 3;    string err = 4;}

生成pb文件:

进入pkg/grpc/pb/目录执行:

$ protoc service.proto --go_out==plugins=grpc:.$ cd .. && tree .├── handler.go└── pb    ├── service.pb.go    └── service.proto1 directory, 3 files

c0a19f20e75e87864079c416830301e9.png

以下gRPC的实现代码参考:

import (  "context"  "github.com/go-kit/kit/transport/grpc"  "github.com/icowan/grpc-world/pkg/encode"  ep "github.com/icowan/grpc-world/pkg/endpoint"  "github.com/icowan/grpc-world/pkg/grpc/pb")type grpcServer struct {  get grpc.Handler}func (g *grpcServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.ServiceResponse, error) {  _, rep, err := g.get.ServeGRPC(ctx, req)  if err != nil {    return nil, err  }  return rep.(*pb.ServiceResponse), nil}func MakeGRPCHandler(eps ep.Endpoints, opts ...grpc.ServerOption) pb.ServiceServer {  return &grpcServer{    get: grpc.NewServer(      eps.GetEndpoint,      decodeGetRequest,      encodeResponse,      opts...,    ),  }}func decodeGetRequest(_ context.Context, r interface{}) (interface{}, error) {  return ep.GetRequest{    Key: r.(*pb.GetRequest).Key,    Val: r.(*pb.GetRequest).Val,  }, nil}func encodeResponse(_ context.Context, r interface{}) (interface{}, error) {  resp := r.(encode.Response)  // ......省略  return &pb.ServiceResponse{    Success: resp.Success,    Code:    int64(resp.Code),  }, err}

入口Run()

同时启动HTTP服务及gRPC服务需要启动两个端口,默认启动的是:8080:8081,通过传参可以自定义。

在Run里面需要初始化数据仓库Repository,初发化Service,初始化Endpoint和初始化Transport,初始化完成之后再启动相应两个传输方式。

最后有一个监听退出信号(signal)的功能,可根据需要自行处理。

代码路径: cmd/service/server.go

const rateBucketNum = 20var (  logger log.Logger  fs       = flag.NewFlagSet("world", flag.ExitOnError)  httpAddr = fs.String("http-addr", ":8080", "HTTP listen address")  grpcAddr = fs.String("grpc-addr", ":8081", "gRPC listen address"))func Run() {  if err := fs.Parse(os.Args[1:]); err != nil {    panic(err)  }  logger = log.NewLogfmtLogger(os.Stderr)  store := repository.New() // 初始化仓库  svc := service.New(logger, store) //   svc = service.NewLoggingService(logger, svc)  ems := []endpoint.Middleware{    service.TokenBucketLimitter(rate.NewLimiter(rate.Every(time.Second*1), rateBucketNum)), // 限流  }  eps := ep.NewEndpoint(svc, map[string][]endpoint.Middleware{    "Put": ems,  })  g := &group.Group{}  initHttpHandler(eps, g)  initGRPCHandler(eps, g)  initCancelInterrupt(g)  _ = level.Error(logger).Log("exit", g.Run())}func initCancelInterrupt(g *group.Group) {  cancelInterrupt := make(chan struct{})  g.Add(func() error {    c := make(chan os.Signal, 1)    signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)    select {    case sig :=       return fmt.Errorf("received signal %s", sig)    case       return nil    }  }, func(error) {    close(cancelInterrupt)  })}func initHttpHandler(endpoints ep.Endpoints, g *group.Group) {  opts := []kithttp.ServerOption{    kithttp.ServerErrorHandler(transport.NewLogErrorHandler(level.Error(logger))),    kithttp.ServerErrorEncoder(encode.JsonError),  }  httpHandler := http.MakeHTTPHandler(endpoints, opts...)  httpListener, err := net.Listen("tcp", *httpAddr)  g.Add(func() error {    return netHttp.Serve(httpListener, httpHandler)  }, func(error) {// 略...})}func initGRPCHandler(endpoints ep.Endpoints, g *group.Group) {  grpcOpts := []kitgrpc.ServerOption{    kitgrpc.ServerErrorHandler(transport.NewLogErrorHandler(logger)),  }  grpcListener, err := net.Listen("tcp", *grpcAddr)  g.Add(func() error {    baseServer := googleGrpc.NewServer()    pb.RegisterServiceServer(baseServer, grpc.MakeGRPCHandler(endpoints, grpcOpts...))    return baseServer.Serve(grpcListener)  }, func(error) {// 略...)}

GRPC Client

http 的测试就不贴上来了,通过浏览器直接访问或 curl都可以,重定如何通过grpc调用server端:

import (  "context"  "github.com/icowan/grpc-world/pkg/grpc/pb"  "google.golang.org/grpc"  "log"  "time")func main() {  conn, err := grpc.Dial("127.0.0.1:8081", grpc.WithInsecure(), grpc.WithBlock())  if err != nil {    log.Fatalf("did not connect: %v", err)  }  defer func() {    _ = conn.Close()  }()  svc := pb.NewServiceClient(conn)  ctx, cancel := context.WithTimeout(context.Background(), time.Second)  defer cancel()  r, err := svc.Put(ctx, &pb.GetRequest{    Key: "hello",    Val: "world",  })  if err != nil {    log.Fatalf("could not put: %v", err)  }  log.Println(r.GetSuccess())}

测试

$ make runGO111MODULE=on /usr/local/go/bin/go run ./cmd/main.go -http-addr :8080 -grpc-addr :8081

执行客户端测试命令:

$ go run ./client/grpc/client.go$ go run ./client/http/client.go

尾巴

本章所用的测试代码已经更新到了Github上,如果您觉得有参考价值的,可以将代码clone 下来,最好能给个star。

Github: https://github.com/icowan/grpc-world

谢谢了

如果我写的内容对您有用,谢谢大家了

21d8dc72a88214702d3617c55d2bafff.png


版权声明:本文为weixin_36388341原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。