chuck-lua内置了基于coroutine的RPC支持,所有的远程方法调用都跟调用本地方法一样简单.下面先来看一个简单的示例。
rpcserver.lua
local Task = require("distri.uthread.task") local Distri = require("distri.distri") local Redis = require("distri.redis") local Socket = require("distri.socket") local chuck = require("chuck") local RPC = require("distri.rpc") local Packet = chuck.packet local config = RPC.Config( function (data) --encoder local wpk = Packet.wpacket(512) wpk:WriteTab(data) return wpk end, function (packet) --decoder return packet:ReadTab() end) local rpcServer = RPC.Server(config) rpcServer:RegService("hello",function () return "world" end) local server = Socket.stream.Listen("127.0.0.1",8010,function (s,errno) if s then s:Ok(4096,Socket.stream.decoder.rpacket(4096),function (_,msg,errno) if msg then rpcServer:ProcessRPC(s,msg) else s:Close() s = nil end end) end end) if server then Distri.Signal(chuck.signal.SIGINT,Distri.Stop) Distri.Run() end
rpcclient.lua
local Task = require("distri.uthread.task") local Distri = require("distri.distri") local Redis = require("distri.redis") local Socket = require("distri.socket") local chuck = require("chuck") local RPC = require("distri.rpc") local Packet = chuck.packet local Sche = require("distri.uthread.sche") local config = RPC.Config( function (data) --encoder local wpk = Packet.wpacket(512) wpk:WriteTab(data) return wpk end, function (packet) --decoder return packet:ReadTab() end) local rpcClient = RPC.Client(config) local c = 0 if Socket.stream.Connect("127.0.0.1",8010,function (s,errno) if s then if not s:Ok(4096,Socket.stream.decoder.rpacket(4096), function (_,msg,errno) if msg then c = c + 1 RPC.OnRPCResponse(config,s,msg) else print("close") s:Close() s = nil end end) then return end rpcClient:Connect(s) for i = 1,100 do Task.New(function () while true do local err,ret = rpcClient:Call("hello") if ret ~= "world" then print("err") break end end end) end end end) then Distri.Signal(chuck.signal.SIGINT,Distri.Stop) Distri.Run() end
首先需要了解的是 RPC.Config
,它是RPC的配置对象:
local rpc_config = {} function rpc_config:new(encoder,decoder,serializer,unserializer) if not encoder or not decoder then return nil end local o = {} o.__index = rpc_config setmetatable(o, o) o.decoder = decoder --将数据从packet中取出 o.encoder = encoder --使用特定封包格式将数据封装到packet中 o.serializer = serializer --将lua table序列化成传输格式,可以为空 o.unserializer = unserializer --将传输格式的数据反序列化成lua table,可以为空 return o end
其中提供了4个重要的方法,这个对象将会被传递给 RPC.Client
和 RPC.Server
供他们将调用的参数。
在上面的示例中,我只提供了 decoder
和 encoder
.下面简单介绍下这4个方法的作用.
serializer: rpc调用的参数和返回值都通过luatable包裹.这个函数用于将这个被包裹的table序列化成一种公共传输格式,例如json.其输出对象将会被提供给encoder.
unserializer: 与serializer的作用正好相反,将公共传输格式反序列化成luatable.其输出对象将会被提供给decoder.如果不提供这两个函数,则直接将luatable提供给encoder/decoder.
encoder: 将传输数据封装成网络包,供Send发送.
decoder: 从网络包中提取出传输的数据.
在上面的示例中,我使用了内置的一种packet.直接将luatable写入到packet中.
示例中需要注意的另外一点是, rpcClient:Call("hello")
必须在coroutine上下文中使用,否则将会返回错误.而 rpcServer:ProcessRPC(s,msg)
则不是必须的,但如果在RPC请求的处理函数中需要执行一些阻塞调用,例如请求别的远程方法,或请求redis数据.则必须将 rpcServer:ProcessRPC(s,msg)
放在coroutine下执行.
如果在一条链路上只涉及到RPC请求,那么完全可以对rpcclient做一个简单的封装,省得每次都手动执行所有的初始化步骤,例如:
local WrapRPCClient = {} function WrapRPCClient:new(config) local o = {} o.__index = actor setmetatable(o,o) o.rpc = RPC.Client(config) return o end function WrapRPCClient:Connect(host,port,on_success) local config = self.rpc.config if Socket.stream.Connect("127.0.0.1",8010,function (s,errno) if s then if not s:Ok(4096,Socket.stream.decoder.rpacket(4096), function (_,msg,errno) if msg then RPC.OnRPCResponse(config,s,msg) else print("close") s:Close() s = nil end end) then return end rpcClient:Connect(s) on_success() end end) then return true end end function WrapRPCClient:Call(func,...) return self.rpc:Call(func,...) end
有了这个封装之后,只需要调用 Connect
,当on_success回调之后就可以直接使用Call调用远程方法了.
https://github.com/sniperHW/chuck