My multi-paxos service implement :-)
这是对multi-paxos协议服务的实现,并提供了给了使用者可以拓展的简单接口,以及简单的客户端。本实现有如下特性:
Paxos算法解决的问题是在一个可能发生上述异常的分布式系统中如何就某个值达成一致,保证不论发生以上任何异常,都不会破坏决议的一致性。一个典型的场景是,在一个分布式数据库系统中,如果各节点的初始状态一致,每个节点都执行相同的操作序列,那么他们最后能得到一个一致的状态。为保证每个节点执行相同的命令序列,需要在每一条指令上执行一个“一致性算法”以保证每个节点看到的指令一致。
paxos协议中有三种角色:
paxos协议保证在每一轮的提案中,只要某一个提案被大于半数的accepter接受,本轮的提案也就生效了,不会再被修改和破坏。具体的算法说明可以看 维基百科 。
使用MyPaxos协议服务,需要下面几步:
PaxosCallback
我在这里使用MyPaxos来实现一个分布式的简单kv存储。
{ "nodes" : [{ // 节点1,服务器端口为33333,accepter的端口为33334,proposer的端口为33335 // learner的端口为33336 "id" : 1, "host" : "localhost", "port" : 33333 }, { "id" : 2, "host" : "localhost", "port" : 33343 }, { "id" : 3, "host" : "localhost", "port" : 33353 }], "myid" : 1, //本节点的id "timeout" : 1000, //通信超时 "learningInterval" : 1000, // learner的学习时间间隔 "dataDir" : "./dataDir/", // 数据持久化的位置,用于崩溃恢复 "enableDataPersistence" : false // 是否开启数据持久化功能 }
public class KvCallback implements PaxosCallback { /** * 使用map来保存key与value映射 */ private Map<String, String> kv = new HashMap<>(); private Gson gson = new Gson(); @Override public void callback(String msg) { /** * 一共提供了三种动作: get : 获取 put : 添加 delete : 删除 */ MsgBean bean = gson.fromJson(msg, MsgBean.class); switch (bean.getType()) { case "get": System.out.println(kv.get(bean.getKey())); break; case "put": kv.put(bean.getKey(), bean.getValue()); System.out.println("ok"); break; case "delete": kv.remove(bean.getKey()); System.out.println("ok"); break; default: break; } } }
public class MsgBean { private String type; private String key; private String value; public MsgBean(String type, String key, String value) { super(); this.type = type; this.key = key; this.value = value; } public String getType() { return type; } public void setType(String type) { this.type = type; } public String getKey() { return key; } public void setKey(String key) { this.key = key; } public String getValue() { return value; } public void setValue(String value) { this.value = value; } }
public class ServerTest { public static void main(String[] args) { MyPaxos server = new MyPaxos(new KvCallback(), "./conf/conf.json"); try { server.start(); } catch (IOException | InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
public class ClientTest { public static void main(String[] args) { MyPaxosClient client = new MyPaxosClient("localhost", 33333); try { client.submit(new Gson().toJson(new MsgBean("put", "name", "Mike"))); client.submit(new Gson().toJson(new MsgBean("put", "age", "22"))); client.submit(new Gson().toJson(new MsgBean("get", "name", ""))); client.submit(new Gson().toJson(new MsgBean("delete", "name", ""))); client.submit(new Gson().toJson(new MsgBean("get", "name", ""))); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
节点1
learner-1 start... proposer-1 start... accepter-1 start... paxos server-1 start... ok ok Mike ok null
节点2
learner-2 start... paxos server-2 start... proposer-2 start... accepter-2 start... ok ok Mike ok null
节点3
learner-3 start... proposer-3 start... accepter-3 start... paxos server-3 start... ok ok Mike ok null