canal初探
我计划以后将单位的搜索项目,慢慢过渡到ES集群,然后实现准实时的搜索.
canal可以感知数据库的变化,作为一个mysql的伪slave,我可以通过canal获取数据库变化,然后批量刷新到ES集群.
canal是阿里开源的一款产品..
虽然感觉国产开源不靠谱.但是没办法..洋人没做,自己又不会做.
主页:
https://github.com/alibaba/canal
下载地址:
https://github.com/alibaba/canal/releases
开发API:
https://github.com/alibaba/canal/wiki/ClientAPI
下载canal,解压然后修改配置文件.
canal.properties
canal.zkServers=192.168.16.105:2181,192.168.16.106:2181,192.168.16.108:2181,192.168.16.109:2181,192.168.16.110:2181
canal.destinations= songod
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
songod/instance.properties
canal.instance.master.address = 192.168.16.98:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
# username/password
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = songod
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*//..*
# table black regex
canal.instance.filter.black.regex =
然后执行 startup.sh
运行如下程序,接收数据变更.
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.TimeUnit;
-
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.client.CanalConnectors;
- import com.alibaba.otter.canal.protocol.CanalEntry.Column;
- import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
- import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
- import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
- import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
- import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
- import com.alibaba.otter.canal.protocol.Message;
-
- public class T {
- public static void main(String args[]) {
- // 创建链接
- CanalConnector connector = CanalConnectors.newClusterConnector("192.168.16.105:2181,192.168.16.108:2181",
- "songod", "", "");
- int batchSize = 100;
-
- connector.connect();
- connector.subscribe(".*//..*");
- while (true) {
- long batchId = -1;
- try {
- Message message = connector.getWithoutAck(batchSize, new Long(5), TimeUnit.SECONDS); // 获取指定数量的数据
- batchId = message.getId();
- int size = message.getEntries().size();
- System.out.println("batchSize:" + size);
- printEntry(message.getEntries());
-
- connector.ack(batchId); // 提交确认
- } catch (Exception e) {
- connector.rollback(batchId); // 处理失败, 回滚数据
- connector.disconnect();
- }
- }
-
- }
-
- private static void printEntry(List<Entry> entrys) {
- for (Entry entry : entrys) {
- if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
- || entry.getEntryType() == EntryType.TRANSACTIONEND) {
- continue;
- }
-
- RowChange rowChage = null;
- try {
- rowChage = RowChange.parseFrom(entry.getStoreValue());
- } catch (Exception e) {
- throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
- e);
- }
- EventType eventType = rowChage.getEventType();
- System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
-
- for (RowData rowData : rowChage.getRowDatasList()) {
- Map<String, Object> map = null;
- if (eventType == EventType.DELETE) {
- map = printColumn(rowData);
- } else if (eventType == EventType.INSERT) {
- map = printColumn(rowData);
- } else if (eventType == EventType.UPDATE) {
- map = printColumn(rowData);
- }
- System.out.print(eventType + ":");
- System.out.println(map);
- }
- }
- }
-
- private static Map<String, Object> printColumn(RowData rowData) {
- Map<String, Object> map = new HashMap<String, Object>();
- for (Column column : rowData.getBeforeColumnsList()) {
- map.put(column.getName(), column.getValue());
- }
- return map;
- }
- }
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.21</version>
</dependency>
参考:
http://blog.csdn.net/bbirdsky/article/details/41479479
正文到此结束