解读 net/rpc

rpc 服务样例:rpc_service.go

package main

import (
	"log"
	"net"
	"net/http"
	"net/rpc"

	_ "net/http/pprof"
)

// 常数名参数化,确定接口,有利于维护
const HelloServiceName = "HelloService"

type HelloServiceInterface interface {
	Hello(request string, reply *string) error
}
func RegisterHelloService(svc HelloServiceInterface) error {
	return rpc.RegisterName(HelloServiceName, svc)
}

type HelloService struct {}

// Hello 方法必须满足 Go 语言的 RPC 规则:方法只能有两个可序列化的参数,其中第二个参数是指针类型,并且返回一个 error 类型,同时必须是公开的方法。
func (p *HelloService) Hello(request string, reply *string) error {
	*reply = "hello:" + request
	return nil
}

func main() {
	// 将 HelloService 类型的对象注册为一个 RPC 服务
	// rpc.Register 函数调用会将对象类型中所有满足 RPC 规则的对象方法注册为 RPC 函数
	// 所有注册的方法会放在 “HelloService” 服务空间之下。
	RegisterHelloService(new(HelloService))

	// 然后我们建立一个唯一的 TCP 链接
	listener, err := net.Listen("tcp", ":1234")
	if err != nil {
		log.Fatal("ListenTCP error:", err)
	}

	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Fatal("Accept error:", err)
		}
		
		// 通过 rpc.ServeConn 函数在该 TCP 链接上为对方提供 RPC 服务。
		go rpc.ServeConn(conn)
	}
}

一、导入 net/rpc 包,生成空的 rpc 服务实例 DefaultServer

// Server represents an RPC Server.
type Server struct {
	serviceMap sync.Map   // map[string]*service
	reqLock    sync.Mutex // protects freeReq
	freeReq    *Request
	respLock   sync.Mutex // protects freeResp
	freeResp   *Response
}

func NewServer() *Server {
	return &Server{}
}

// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()

二、注册 rpc 服务得到后的 DefaultServer

&rpc.Server{
	serviceMap: sync.Map{
		mu: sync.Mutex{state: 0, sema: 0x0},
		read: atomic.Value{
			v: sync.readOnly{m: map[interface{}]*sync.entry(nil), amended: true}
		},
		
		dirty: map[interface{}]*sync.entry{
			"HelloService": (*sync.entry)(0xc000188038)
		}, 
		
		misses: 0
	},
	
	reqLock: sync.Mutex{state: 0, sema: 0x0},
	freeReq: (*rpc.Request)(nil),
	respLock: sync.Mutex{state: 0, sema: 0x0},
	freeResp: (*rpc.Response)(nil)
}

可知 HelloService 服务已经注册在 DefaultServer.serviceMap.dirty Map中(具体过程

三、提供 rpc 服务

// ServeCodec is like ServeConn but uses the specified codec to
// decode requests and encode responses.
func (server *Server) ServeCodec(codec ServerCodec) {
	sending := new(sync.Mutex)
	wg := new(sync.WaitGroup)
	for {
		// 在特定 TCP 连接上阻塞等待请求
		service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
		if err != nil {
			...
			if !keepReading {
				break
			}
			...
		}
		wg.Add(1)
		// 异步处理请求
		go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
	}
	// We've seen that there are no more requests.
	// Wait for responses to be sent before closing codec.
	wg.Wait()
	codec.Close()
}

rpc 客户端样例:rpc_client.go

package main

import (
	"fmt"
	"log"
	"net/rpc"
	"time"
)

// 常数名参数化,封装helloServiceClient方法,有利于维护,方法一目了然
// 现在客户端用户不用再担心RPC方法名字或参数类型不匹配等低级错误的发生。
const HelloServiceName = "HelloService"

type HelloServiceClient struct {
	*rpc.Client
}

func DialHelloService(network, address string) (*HelloServiceClient, error) {
	c, err := rpc.Dial(network, address)
	if err != nil {
		return nil, err
	}

	return &HelloServiceClient{Client: c /*rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))*/}, nil
}

func (p *HelloServiceClient) Hello(request string, reply *string) error {
	return p.Client.Call(HelloServiceName+".Hello", request, reply)
}

func main() {
	client, err := DialHelloService("tcp", "localhost:1234")
	if err != nil {
		log.Fatal("dialing:", err)
	}

	for {
		var reply string
		err = client.Hello("hello", &reply)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(reply)
		time.Sleep(10 * time.Second)
	}
}

一、拨号连接到指定网络地址的RPC服务器后,调用指定的方法

// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
	return call.Error
}

// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
	call := new(Call)
	call.ServiceMethod = serviceMethod
	call.Args = args
	call.Reply = reply
	if done == nil {
		done = make(chan *Call, 10) // buffered.
	} else {
		if cap(done) == 0 {
			log.Panic("rpc: done channel is unbuffered")
		}
	}
	call.Done = done
	client.send(call)
	return call
}

方法 Go 是异步的,call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done 这是同步的


版权声明:本文为weixin_44559544原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。