用go-kit写出优雅的gRPC微服务
微服务在这几年一直是大家谈论的话题。微服务是一种软件架构,它将一个大且聚合的业务项目拆解为多个小且独立的业务模块,模块即服务,各服务间使用高效的协议(protobuf、JSON 等)相互调用即是 RPC。这种拆分代码库的方式有以下特点:
每个服务应作为小规模的、独立的业务模块运行,独立部署
每个服务应在进行自动化测试和(分布式)部署时,不影响其他服务
每个服务内部进行细致的错误检查和处理
本章介绍如何通过GRPC的方式来开发微服务项目,同时也需要支持http的RESTful的方式。
简介
Github: https://github.com/icowan/grpc-world
设计一个简单的数据存储的服务,通过get获取数据,通过put设置数据,类似于Redis的key,value存储。
服务有两个API:
get
: 根据key获取内容put
: 根据key设置内容
那么初步定义一个Service实现这两个功能:
type Service interface { Get(ctx context.Context, key string) (val string, err error) Put(ctx context.Context, key, val string) (err error)}
以下内容全部通过go-kit的一些组件来实现。
开始前
在创建grpc服务之前需要在您的开发机器上安装proto命令,主要用它来通过.proto文件生成pb文件。
在MacOS安装
$ brew install autoconf automake libtool
因为是基于go语言除了proto之外还需要protoc-gen-go命令:
$ go get -u google.golang.org/grpc$ cd $GOPATH/bin/$ lsprotoc-gen-go
需要保证protoc-gen-go
在GOPATH/bin
目录下
开始撸Server端代码
基于go-kit组件来实现server端,go-kit非常适合用来作微服务的组件,它有非常优秀的代码规范能大减少开发人员犯错概率,刚接触时可能会觉得非常复杂,但用久之后你会发现它真的很方便。用go-kit的好处还有,假如以后换成其他框架,在 Go-kit 良好的架构下,我们只需要把 Transport 以及 Endpoint 层剥离,留下 Service 就可以方便集成到新的框架下面了
go-kit主使用了三层结构:
Transport: 通信层,可以用各种不同的通信方式,如 HTTP RESTful 接口或者 gRPC 接口(这是个很大的优点,方便切换成任何通信协议)
http
: http的传输处理grpc
: grpc传输处理
Endpoint: 终端层,主要实现各种接口的 handler,负责 req/resp 格式的转换
Service: 服务层,实现业务逻辑的地方
优雅的目录结构
.├── Dockerfile├── Makefile├── README.md├── client│ ├── grpc│ │ └── client.go│ └── http│ └── client.go├── cmd│ ├── main.go│ └── service│ └── service.go├── docker-compose.yaml├── go.mod├── go.sum└── pkg ├── encode │ └── response.go ├── endpoint │ └── endpoint.go ├── grpc │ ├── handler.go │ └── pb │ ├── service.pb.go │ └── service.proto ├── http │ └── handler.go ├── repository │ └── repository.go └── service ├── logging.go ├── middleware.go └── service.go
/client/
: 我用来演示的demo,可以不用
/client/http/
: http 的例子/client/grpc/
: grpc 的例子
/cmd
: 存放通过命令行启动的入口/cmd/service/
: 真正入口在这里,在这里初始化服务/pkg/
: 所有的工具,及service都在这里/pkg/encode/
: encode 工具/pkg/endpoint/
: go-kit 的端点层/pkg/gprc or http
: 传输层,入参、出参都在这里进行处理/pkg/repository/
: 仓库,这里我只把数据存内存了没做持久化处理/pkg/service/
: 业务逻辑在这里实现
Service
Service是具体的业务实现,这里只需要关注业务逻辑不需要关注框架本身,需要什么就传入什么,也方便以后迁移。
Service定义一个 interface ,并提供所需要实现的方法,若将来有升级或兼容,可以再实现一个Service2,就不再需要修改上层逻辑了,也能向前兼容。
import ( "context" "github.com/go-kit/kit/log" "github.com/icowan/grpc-world/pkg/repository")type Service interface { Put(ctx context.Context, key, val string) (err error)}type service struct { logger log.Logger repository repository.Repository}func (s *service) Put(ctx context.Context, key, val string) (err error) { return s.repository.Put(key, val)}func New(logger log.Logger, repository repository.Repository) Service { return &service{logger: logger, repository: repository}}
Endpoint
端点的主要功能是将Transport传过来的Request内容进行类型转换并将数据传到Service及处理返回的内容或转换。简单来说它就是Transport跟Service的桥梁。
type GetRequest struct { Key string `json:"key"` Val string `json:"val"`}type Endpoints struct { GetEndpoint endpoint.Endpoint}func NewEndpoint(s service.Service, mdw map[string][]endpoint.Middleware) Endpoints { eps := Endpoints{ GetEndpoint: func(ctx context.Context, request interface{}) (response interface{}, err error) { req := request.(GetRequest) val, err := s.Get(ctx, req.Key) return encode.Response{Error: err,Data: val}, err}, } for _, m := range mdw["Get"] { eps.GetEndpoint = m(eps.GetEndpoint) } return eps}
Middleware
中间件的的功能主要用来记录日志、限流、分布式追踪、权限验证等等,每个api可以根据需求定制所需要的中间件。
Logging
logging除了这种写法外也可以类型endpoint.Middleware
的那种写法,我这里展示两种模式。
type loggingService struct { logger log.Logger next Service}func NewLoggingService(logger log.Logger, s Service) Service { return &loggingService{level.Info(logger), s}}func (l *loggingService) Put(ctx context.Context, key, val string) (err error) { defer func(begin time.Time) { _ = l.logger.Log("method", "Put","key", key,"took", time.Since(begin),"err", err) }(time.Now()) return l.next.Put(ctx, key, val)}
Limiter
下面展示的是限流的中间件例子:
func TokenBucketLimitter(bkt *rate.Limiter) endpoint.Middleware { return func(next endpoint.Endpoint) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (response interface{}, err error) { if !bkt.Allow() { return nil, errors.New("Rate limit exceed!") } return next(ctx, request) } }}
Repository
仓库只实现了一个简单的get及put功能,数据没有落地,进程停了数据就没了。
type Store struct { Key string Val string CreatedAt time.Time}type StoreKey stringvar ErrUnknown = errors.New("unknown store")type Repository interface { Put(key, val string) error Get(key string) (res *Store, err error)}type store struct { mtx sync.RWMutex stores map[StoreKey]*Store}func (s *store) Put(key, val string) error { s.mtx.Lock() defer s.mtx.Unlock() s.stores[StoreKey(key)] = &Store{ Key: key, Val: val, CreatedAt: time.Now(), } return nil}// 省略一部分func New() Repository { return &store{ stores: make(map[StoreKey]*Store), }}
Transport
传输层剥离出来之后应用的传输方式就可以随意定义了,可以是HTTP也可以是gRPC或其他方式,它们所对接的都是Endpoint,所以Endpoint及Service都不需要做任何调整就能直接使用多种传输方式。
HTTP
下面是实现HTTP Transport的实现方式:
import ( "context" "encoding/json" kithttp "github.com/go-kit/kit/transport/http" "github.com/gorilla/mux" "github.com/icowan/grpc-world/pkg/encode" ep "github.com/icowan/grpc-world/pkg/endpoint" "github.com/pkg/errors" "net/http")func MakeHTTPHandler(eps ep.Endpoints, opts ...kithttp.ServerOption) http.Handler { r := mux.NewRouter() r.Handle("/get/{key}", kithttp.NewServer( eps.GetEndpoint, decodeGetRequest, encode.JsonResponse, opts..., )).Methods(http.MethodGet) return r}func decodeGetRequest(_ context.Context, r *http.Request) (request interface{}, err error) { vars := mux.Vars(r) key, ok := vars["key"] if !ok { return nil, errors.New("route bad") } return ep.GetRequest{Key: key}, nil}
gRPC
gRPC的Transport会比http稍微麻烦一丢丢,主要是gRPC还需要实现一个grpcServer
的 interface
,除此之外与http的实现几乎差不多。
定义proto文件及生成pb文件
在实现grpcServer之前先得定义接口:
syntax = "proto3";package pb;service Service { rpc Get (GetRequest) returns (ServiceResponse) {} rpc Put (GetRequest) returns (ServiceResponse) {}}message GetRequest { string key = 1; string val = 2;}message ServiceResponse { bool success = 1; int64 code = 2; string data = 3; string err = 4;}
生成pb文件:
进入pkg/grpc/pb/
目录执行:
$ protoc service.proto --go_out==plugins=grpc:.$ cd .. && tree .├── handler.go└── pb ├── service.pb.go └── service.proto1 directory, 3 files
以下gRPC的实现代码参考:
import ( "context" "github.com/go-kit/kit/transport/grpc" "github.com/icowan/grpc-world/pkg/encode" ep "github.com/icowan/grpc-world/pkg/endpoint" "github.com/icowan/grpc-world/pkg/grpc/pb")type grpcServer struct { get grpc.Handler}func (g *grpcServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.ServiceResponse, error) { _, rep, err := g.get.ServeGRPC(ctx, req) if err != nil { return nil, err } return rep.(*pb.ServiceResponse), nil}func MakeGRPCHandler(eps ep.Endpoints, opts ...grpc.ServerOption) pb.ServiceServer { return &grpcServer{ get: grpc.NewServer( eps.GetEndpoint, decodeGetRequest, encodeResponse, opts..., ), }}func decodeGetRequest(_ context.Context, r interface{}) (interface{}, error) { return ep.GetRequest{ Key: r.(*pb.GetRequest).Key, Val: r.(*pb.GetRequest).Val, }, nil}func encodeResponse(_ context.Context, r interface{}) (interface{}, error) { resp := r.(encode.Response) // ......省略 return &pb.ServiceResponse{ Success: resp.Success, Code: int64(resp.Code), }, err}
入口Run()
同时启动HTTP服务及gRPC服务需要启动两个端口,默认启动的是:8080
和:8081
,通过传参可以自定义。
在Run里面需要初始化数据仓库Repository
,初发化Service
,初始化Endpoint
和初始化Transport
,初始化完成之后再启动相应两个传输方式。
最后有一个监听退出信号(signal
)的功能,可根据需要自行处理。
代码路径: cmd/service/server.go
const rateBucketNum = 20var ( logger log.Logger fs = flag.NewFlagSet("world", flag.ExitOnError) httpAddr = fs.String("http-addr", ":8080", "HTTP listen address") grpcAddr = fs.String("grpc-addr", ":8081", "gRPC listen address"))func Run() { if err := fs.Parse(os.Args[1:]); err != nil { panic(err) } logger = log.NewLogfmtLogger(os.Stderr) store := repository.New() // 初始化仓库 svc := service.New(logger, store) // svc = service.NewLoggingService(logger, svc) ems := []endpoint.Middleware{ service.TokenBucketLimitter(rate.NewLimiter(rate.Every(time.Second*1), rateBucketNum)), // 限流 } eps := ep.NewEndpoint(svc, map[string][]endpoint.Middleware{ "Put": ems, }) g := &group.Group{} initHttpHandler(eps, g) initGRPCHandler(eps, g) initCancelInterrupt(g) _ = level.Error(logger).Log("exit", g.Run())}func initCancelInterrupt(g *group.Group) { cancelInterrupt := make(chan struct{}) g.Add(func() error { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) select { case sig := return fmt.Errorf("received signal %s", sig) case return nil } }, func(error) { close(cancelInterrupt) })}func initHttpHandler(endpoints ep.Endpoints, g *group.Group) { opts := []kithttp.ServerOption{ kithttp.ServerErrorHandler(transport.NewLogErrorHandler(level.Error(logger))), kithttp.ServerErrorEncoder(encode.JsonError), } httpHandler := http.MakeHTTPHandler(endpoints, opts...) httpListener, err := net.Listen("tcp", *httpAddr) g.Add(func() error { return netHttp.Serve(httpListener, httpHandler) }, func(error) {// 略...})}func initGRPCHandler(endpoints ep.Endpoints, g *group.Group) { grpcOpts := []kitgrpc.ServerOption{ kitgrpc.ServerErrorHandler(transport.NewLogErrorHandler(logger)), } grpcListener, err := net.Listen("tcp", *grpcAddr) g.Add(func() error { baseServer := googleGrpc.NewServer() pb.RegisterServiceServer(baseServer, grpc.MakeGRPCHandler(endpoints, grpcOpts...)) return baseServer.Serve(grpcListener) }, func(error) {// 略...)}
GRPC Client
http 的测试就不贴上来了,通过浏览器直接访问或 curl都可以,重定如何通过grpc调用server端:
import ( "context" "github.com/icowan/grpc-world/pkg/grpc/pb" "google.golang.org/grpc" "log" "time")func main() { conn, err := grpc.Dial("127.0.0.1:8081", grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Fatalf("did not connect: %v", err) } defer func() { _ = conn.Close() }() svc := pb.NewServiceClient(conn) ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() r, err := svc.Put(ctx, &pb.GetRequest{ Key: "hello", Val: "world", }) if err != nil { log.Fatalf("could not put: %v", err) } log.Println(r.GetSuccess())}
测试
$ make runGO111MODULE=on /usr/local/go/bin/go run ./cmd/main.go -http-addr :8080 -grpc-addr :8081
执行客户端测试命令:
$ go run ./client/grpc/client.go$ go run ./client/http/client.go
尾巴
本章所用的测试代码已经更新到了Github上,如果您觉得有参考价值的,可以将代码clone 下来,最好能给个star。
Github: https://github.com/icowan/grpc-world
谢谢了
如果我写的内容对您有用,谢谢大家了