本文主要研究一下CRDT
CRDT是Conflict-free Replicated Data Type的简称,也称为a passive synchronisation,即免冲突的可复制的数据类型,这种数据类型可以用于数据跨网络复制并且可以自动解决冲突达到一致,非常适合使用AP架构的系统在各个partition之间复制数据时使用;具体实现上可以分为State-based的CvRDT、Operation-based的CmRDT、Delta-based、Pure operation-based等
Consistency without Consensus,guarantee convergence to the same value in spite of network delays, partitions and message reordering
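State-based的CvRDT(Convergent Replicated Data Type):在replica之间传递state,并通过merge使各replica收敛到一致
Operation-based的CmRDT(Commutative Replicated Data Type):在replica之间传递operation,要求并发的operation满足交换律
Delta-based可以理解为是结合State-based及Operation-based的一种改进,它通过delta-mutators来进行replicate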
通常Operation-based的方式需要prepare方法生成operations,这里可能存在延时,Pure operation-based是指prepare的实现不是通过对比state生成operations,而是仅仅返回现成的operations,这就需要记录每一步对object state操作的operations
对于CRDT来说,为了实现Conflict-free Replicated对数据结构的一些操作需要满足如下条件:
Associative(结合律):a+(b+c)=(a+b)+c,即grouping没有影响
Commutative(交换律):a+b=b+a,即order没有影响
Idempotent(幂等):a+a=a,即duplication没有影响
CRDT的基本数据类型有Counters、Registers、Sets
G-Counter(Grow-only Counter):使用max函数来进行merge
PN-Counter(Positive-Negative Counter):使用两个G-Counter来实现,一个用于递增,一个用于递减,取值时用递增计数之和减去递减计数之和
register有assign()及value()两种操作
LWW-Register(Last-Write-Wins Register):给每个assign操作添加unique ids,比如timestamps或者vector clock,使用max函数进行merge
MV-Register(Multi-Value Register):类似G-Counter,每次assign都会新增一个版本,使用max函数进行merge
G-Set(Grow-only Set):使用union操作进行merge
2P-Set(Two-Phase Set):使用两个G-Set来实现,一个addSet用于添加,一个removeSet用于移除
LWW-element Set(Last-Write-Wins-element Set):类似2P-Set,有一个addSet,一个removeSet,不过对于元素增加了timestamp信息,且timestamp较高的add及remove优先
OR-Set(Observed-Remove Set):类似2P-Set,有一个addSet,一个removeSet,不过对于元素增加了tag信息,对于同一个tag的操作add优先于remove
关于Array有Replicated Growable Array(RGA),支持addRight(v, a)操作
Graph可以基于Sets结构实现,不过需要处理并发的addEdge(u, v)、removeVertex(u)操作
Map需要处理并发的put、rmv操作
这里使用 wurmloch-crdt 的实现
wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/GCounter.java
public class GCounter extends AbstractCrdt<GCounter, GCounter.UpdateCommand> { // fields private Map<String, Long> entries = HashMap.empty(); // constructor public GCounter(String nodeId, String crdtId) { super(nodeId, crdtId, BehaviorProcessor.create()); } // crdt @Override protected Option<UpdateCommand> processCommand(UpdateCommand command) { final Map<String, Long> oldEntries = entries; entries = entries.merge(command.entries, Math::max); return entries.equals(oldEntries)? Option.none() : Option.of(new UpdateCommand(crdtId, entries)); } // core functionality public long get() { return entries.values().sum().longValue(); } public void increment() { increment(1L); } public void increment(long value) { if (value < 1L) { throw new IllegalArgumentException("Value needs to be a positive number."); } entries = entries.put(nodeId, entries.get(nodeId).getOrElse(0L) + value); commands.onNext(new UpdateCommand( crdtId, entries )); } //...... }
wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/PNCounter.java
public class PNCounter extends AbstractCrdt<PNCounter, PNCounter.UpdateCommand> { // fields private Map<String, Long> pEntries = HashMap.empty(); private Map<String, Long> nEntries = HashMap.empty(); // constructor public PNCounter(String nodeId, String crtdId) { super(nodeId, crtdId, BehaviorProcessor.create()); } // crdt protected Option<UpdateCommand> processCommand(PNCounter.UpdateCommand command) { final Map<String, Long> oldPEntries = pEntries; final Map<String, Long> oldNEntries = nEntries; pEntries = pEntries.merge(command.pEntries, Math::max); nEntries = nEntries.merge(command.nEntries, Math::max); return pEntries.equals(oldPEntries) && nEntries.equals(oldNEntries)? Option.none() : Option.of(new UpdateCommand(crdtId, pEntries, nEntries)); } // core functionality public long get() { return pEntries.values().sum().longValue() - nEntries.values().sum().longValue(); } public void increment() { increment(1L); } public void increment(long value) { if (value < 1L) { throw new IllegalArgumentException("Value needs to be a positive number."); } pEntries = pEntries.put(nodeId, pEntries.get(nodeId).getOrElse(0L) + value); commands.onNext(new UpdateCommand( crdtId, pEntries, nEntries )); } public void decrement() { decrement(1L); } public void decrement(long value) { if (value < 1L) { throw new IllegalArgumentException("Value needs to be a positive number."); } nEntries = nEntries.put(nodeId, nEntries.get(nodeId).getOrElse(0L) + value); commands.onNext(new UpdateCommand( crdtId, pEntries, nEntries )); } //...... }
wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/LWWRegister.java
public class LWWRegister<T> extends AbstractCrdt<LWWRegister<T>, LWWRegister.SetCommand<T>> { // fields private T value; private StrictVectorClock clock; // constructor public LWWRegister(String nodeId, String crdtId) { super(nodeId, crdtId, BehaviorProcessor.create()); this.clock = new StrictVectorClock(nodeId); } // crdt protected Option<SetCommand<T>> processCommand(SetCommand<T> command) { if (clock.compareTo(command.getClock()) < 0) { clock = clock.merge(command.getClock()); doSet(command.getValue()); return Option.of(command); } return Option.none(); } // core functionality public T get() { return value; } public void set(T newValue) { if (! Objects.equals(value, newValue)) { doSet(newValue); commands.onNext(new SetCommand<>( crdtId, value, clock )); } } // implementation private void doSet(T value) { this.value = value; clock = clock.increment(); } //...... }
wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/MVRegister.java
public class MVRegister<T> extends AbstractCrdt<MVRegister<T>, MVRegister.SetCommand<T>> { // fields private Array<Entry<T>> entries = Array.empty(); // constructor public MVRegister(String nodeId, String crdtId) { super(nodeId, crdtId, ReplayProcessor.create()); } // crdt protected Option<SetCommand<T>> processCommand(SetCommand<T> command) { final Entry<T> newEntry = command.getEntry(); if (!entries.exists(entry -> entry.getClock().compareTo(newEntry.getClock()) > 0 || entry.getClock().equals(newEntry.getClock()))) { final Array<Entry<T>> newEntries = entries .filter(entry -> entry.getClock().compareTo(newEntry.getClock()) == 0) .append(newEntry); doSet(newEntries); return Option.of(command); } return Option.none(); } // core functionality public Array<T> get() { return entries.map(Entry::getValue); } public void set(T newValue) { if (entries.size() != 1 || !Objects.equals(entries.head().getValue(), newValue)) { final Entry<T> newEntry = new Entry<>(newValue, incVV()); doSet(Array.of(newEntry)); commands.onNext(new SetCommand<>( crdtId, newEntry )); } } // implementation private void doSet(Array<Entry<T>> newEntries) { entries = newEntries; } private VectorClock incVV() { final Array<VectorClock> clocks = entries.map(Entry::getClock); final VectorClock mergedClock = clocks.reduceOption(VectorClock::merge).getOrElse(new VectorClock()); return mergedClock.increment(nodeId); } //...... }
wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/GSet.java
public class GSet<E> extends AbstractSet<E> implements Crdt<GSet<E>, GSet.AddCommand<E>> { // fields private final String crdtId; private final Set<E> elements = new HashSet<>(); private final Processor<AddCommand<E>, AddCommand<E>> commands = ReplayProcessor.create(); // constructor public GSet(String crdtId) { this.crdtId = Objects.requireNonNull(crdtId, "Id must not be null"); } // crdt @Override public String getCrdtId() { return crdtId; } @Override public void subscribe(Subscriber<? super AddCommand<E>> subscriber) { commands.subscribe(subscriber); } @Override public void subscribeTo(Publisher<? extends AddCommand<E>> publisher) { Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> { final Option<AddCommand<E>> newCommand = processCommand(command); newCommand.peek(commands::onNext); }); } private Option<AddCommand<E>> processCommand(AddCommand<E> command) { return doAdd(command.getElement())? Option.of(command) : Option.none(); } // core functionality @Override public int size() { return elements.size(); } @Override public Iterator<E> iterator() { return new GSetIterator(); } @Override public boolean add(E element) { commands.onNext(new AddCommand<>(crdtId, element)); return doAdd(element); } // implementation private synchronized boolean doAdd(E element) { return elements.add(element); } //...... }
wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/TwoPSet.java
public class TwoPSet<E> extends AbstractSet<E> implements Crdt<TwoPSet<E>, TwoPSet.TwoPSetCommand<E>> { // fields private final String crdtId; private final Set<E> elements = new HashSet<>(); private final Set<E> tombstone = new HashSet<>(); private final Processor<TwoPSetCommand<E>, TwoPSetCommand<E>> commands = ReplayProcessor.create(); // constructor public TwoPSet(String crdtId) { this.crdtId = Objects.requireNonNull(crdtId, "CrdtId must not be null"); } // crdt @Override public String getCrdtId() { return crdtId; } @Override public void subscribe(Subscriber<? super TwoPSetCommand<E>> subscriber) { commands.subscribe(subscriber); } @Override public void subscribeTo(Publisher<? extends TwoPSetCommand<E>> publisher) { Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> { final Option<TwoPSetCommand<E>> newCommand = processCommand(command); newCommand.peek(commands::onNext); }); } private Option<TwoPSetCommand<E>> processCommand(TwoPSetCommand<E> command) { if (command instanceof TwoPSet.AddCommand) { return doAdd(((TwoPSet.AddCommand<E>) command).getElement())? Option.of(command) : Option.none(); } else if (command instanceof TwoPSet.RemoveCommand) { return doRemove(((TwoPSet.RemoveCommand<E>) command).getElement())? Option.of(command) : Option.none(); } return Option.none(); } // core functionality @Override public int size() { return elements.size(); } @Override public Iterator<E> iterator() { return new TwoPSetIterator(); } @Override public boolean add(E value) { final boolean changed = doAdd(value); if (changed) { commands.onNext(new TwoPSet.AddCommand<>(crdtId, value)); } return changed; } // implementation private boolean doAdd(E value) { return !tombstone.contains(value) && elements.add(value); } private boolean doRemove(E value) { return tombstone.add(value) | elements.remove(value); } //...... }
wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/ORSet.java
public class ORSet<E> extends AbstractSet<E> implements Crdt<ORSet<E>, ORSet.ORSetCommand<E>> /*, ObservableSet<E> */ { // fields private final String crdtId; private final Set<Element<E>> elements = new HashSet<>(); private final Set<Element<E>> tombstone = new HashSet<>(); private final Processor<ORSetCommand<E>, ORSetCommand<E>> commands = ReplayProcessor.create(); // constructor public ORSet(String crdtId) { this.crdtId = Objects.requireNonNull(crdtId, "Id must not be null"); } // crdt @Override public String getCrdtId() { return crdtId; } @Override public void subscribe(Subscriber<? super ORSetCommand<E>> subscriber) { commands.subscribe(subscriber); } @Override public void subscribeTo(Publisher<? extends ORSetCommand<E>> publisher) { Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> { final Option<ORSetCommand<E>> newCommand = processCommand(command); newCommand.peek(commands::onNext); }); } private Option<ORSetCommand<E>> processCommand(ORSetCommand<E> command) { if (command instanceof AddCommand) { return doAdd(((AddCommand<E>) command).getElement())? Option.of(command) : Option.none(); } else if (command instanceof RemoveCommand) { return doRemove(((RemoveCommand<E>) command).getElements())? 
Option.of(command) : Option.none(); } return Option.none(); } // core functionality @Override public int size() { return doElements().size(); } @Override public Iterator<E> iterator() { return new ORSetIterator(); } @Override public boolean add(E value) { final boolean contained = doContains(value); prepareAdd(value); return !contained; } // implementation private static <U> Predicate<Element<U>> matches(U value) { return element -> Objects.equals(value, element.getValue()); } private synchronized boolean doContains(E value) { return elements.parallelStream().anyMatch(matches(value)); } private synchronized Set<E> doElements() { return elements.parallelStream().map(Element::getValue).collect(Collectors.toSet()); } private synchronized void prepareAdd(E value) { final Element<E> element = new Element<>(value, UUID.randomUUID()); commands.onNext(new AddCommand<>(getCrdtId(), element)); doAdd(element); } private synchronized boolean doAdd(Element<E> element) { return (elements.add(element) | elements.removeAll(tombstone)) && (!tombstone.contains(element)); } private synchronized void prepareRemove(E value) { final Set<Element<E>> removes = elements.parallelStream().filter(matches(value)).collect(Collectors.toSet()); commands.onNext(new RemoveCommand<>(getCrdtId(), removes)); doRemove(removes); } private synchronized boolean doRemove(Collection<Element<E>> removes) { return elements.removeAll(removes) | tombstone.addAll(removes); } //...... }
Counters(G-Counter、PN-Counter)、Registers(LWW-Register、MV-Register)、Sets(G-Set、2P-Set、LWW-element Set、OR-Set)