rpc 服务样例:rpc_service.go
package main
import (
"log"
"net"
"net/http"
"net/rpc"
_ "net/http/pprof"
)
// 常数名参数化,确定接口,有利于维护
const HelloServiceName = "HelloService"
type HelloServiceInterface interface {
Hello(request string, reply *string) error
}
func RegisterHelloService(svc HelloServiceInterface) error {
return rpc.RegisterName(HelloServiceName, svc)
}
type HelloService struct {}
// Hello 方法必须满足 Go 语言的 RPC 规则:方法只能有两个可序列化的参数,其中第二个参数是指针类型,并且返回一个 error 类型,同时必须是公开的方法。
func (p *HelloService) Hello(request string, reply *string) error {
*reply = "hello:" + request
return nil
}
func main() {
// 将 HelloService 类型的对象注册为一个 RPC 服务
// rpc.Register 函数调用会将对象类型中所有满足 RPC 规则的对象方法注册为 RPC 函数
// 所有注册的方法会放在 “HelloService” 服务空间之下。
RegisterHelloService(new(HelloService))
// 然后我们建立一个唯一的 TCP 链接
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}
// 通过 rpc.ServeConn 函数在该 TCP 链接上为对方提供 RPC 服务。
go rpc.ServeConn(conn)
}
}
一、导入 net/rpc 包,生成空的 rpc 服务实例 DefaultServer
// Server represents an RPC Server.
type Server struct {
serviceMap sync.Map // map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
}
func NewServer() *Server {
return &Server{}
}
// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()
二、注册 rpc 服务得到后的 DefaultServer
&rpc.Server{
serviceMap: sync.Map{
mu: sync.Mutex{state: 0, sema: 0x0},
read: atomic.Value{
v: sync.readOnly{m: map[interface{}]*sync.entry(nil), amended: true}
},
dirty: map[interface{}]*sync.entry{
"HelloService": (*sync.entry)(0xc000188038)
},
misses: 0
},
reqLock: sync.Mutex{state: 0, sema: 0x0},
freeReq: (*rpc.Request)(nil),
respLock: sync.Mutex{state: 0, sema: 0x0},
freeResp: (*rpc.Response)(nil)
}
可知 HelloService 服务已经注册在 DefaultServer.serviceMap.dirty Map中(具体过程)
三、提供 rpc 服务
// ServeCodec is like ServeConn but uses the specified codec to
// decode requests and encode responses.
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
// 在特定 TCP 连接上阻塞等待请求
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
...
if !keepReading {
break
}
...
}
wg.Add(1)
// 异步处理请求
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
}
// We've seen that there are no more requests.
// Wait for responses to be sent before closing codec.
wg.Wait()
codec.Close()
}
rpc 客户端样例:rpc_client.go
package main
import (
"fmt"
"log"
"net/rpc"
"time"
)
// 常数名参数化,封装helloServiceClient方法,有利于维护,方法一目了然
// 现在客户端用户不用再担心RPC方法名字或参数类型不匹配等低级错误的发生。
const HelloServiceName = "HelloService"
type HelloServiceClient struct {
*rpc.Client
}
func DialHelloService(network, address string) (*HelloServiceClient, error) {
c, err := rpc.Dial(network, address)
if err != nil {
return nil, err
}
return &HelloServiceClient{Client: c /*rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))*/}, nil
}
func (p *HelloServiceClient) Hello(request string, reply *string) error {
return p.Client.Call(HelloServiceName+".Hello", request, reply)
}
func main() {
client, err := DialHelloService("tcp", "localhost:1234")
if err != nil {
log.Fatal("dialing:", err)
}
for {
var reply string
err = client.Hello("hello", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
time.Sleep(10 * time.Second)
}
}
一、拨号连接到指定网络地址的RPC服务器后,调用指定的方法
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}
方法 Go 是异步的,call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done 这是同步的
版权声明:本文为weixin_44559544原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。