转载

canal初探

我计划以后将单位的搜索项目,慢慢过渡到ES集群,然后实现准实时的搜索.
canal可以感知数据库的变化,作为一个mysql的伪slave,我可以通过canal获取数据库变化,然后批量刷新到ES集群.

canal是阿里开源的一款产品..
虽然感觉国产开源不靠谱.但是没办法..洋人没做,自己又不会做.

主页:
https://github.com/alibaba/canal

下载地址:
https://github.com/alibaba/canal/releases

开发API:
https://github.com/alibaba/canal/wiki/ClientAPI

下载canal,解压然后修改配置文件.
canal.properties
canal.zkServers=192.168.16.105:2181,192.168.16.106:2181,192.168.16.108:2181,192.168.16.109:2181,192.168.16.110:2181
canal.destinations= songod
canal.instance.global.spring.xml = classpath:spring/default-instance.xml


songod/instance.properties
canal.instance.master.address = 192.168.16.98:3306
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp =

# username/password
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = songod
canal.instance.connectionCharset = UTF-8

# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =


然后执行  startup.sh 

运行如下程序,接收数据变更.
  1. import java.util.HashMap;  
  2. import java.util.List;  
  3. import java.util.Map;  
  4. import java.util.concurrent.TimeUnit;  
  5.   
  6. import com.alibaba.otter.canal.client.CanalConnector;  
  7. import com.alibaba.otter.canal.client.CanalConnectors;  
  8. import com.alibaba.otter.canal.protocol.CanalEntry.Column;  
  9. import com.alibaba.otter.canal.protocol.CanalEntry.Entry;  
  10. import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;  
  11. import com.alibaba.otter.canal.protocol.CanalEntry.EventType;  
  12. import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;  
  13. import com.alibaba.otter.canal.protocol.CanalEntry.RowData;  
  14. import com.alibaba.otter.canal.protocol.Message;  
  15.   
  16. public class T {  
  17.     public static void main(String args[]) {  
  18.         // 创建链接  
  19.         CanalConnector connector = CanalConnectors.newClusterConnector("192.168.16.105:2181,192.168.16.108:2181",  
  20.                 "songod""""");  
  21.         int batchSize = 100;  
  22.   
  23.         connector.connect();  
  24.         connector.subscribe(".*//..*");  
  25.         while (true) {  
  26.             long batchId = -1;  
  27.             try {  
  28.                 Message message = connector.getWithoutAck(batchSize, new Long(5), TimeUnit.SECONDS); // 获取指定数量的数据  
  29.                 batchId = message.getId();  
  30.                 int size = message.getEntries().size();  
  31.                 System.out.println("batchSize:" + size);  
  32.                 printEntry(message.getEntries());  
  33.   
  34.                 connector.ack(batchId); // 提交确认  
  35.             } catch (Exception e) {  
  36.                 connector.rollback(batchId); // 处理失败, 回滚数据  
  37.                 connector.disconnect();  
  38.             }  
  39.         }  
  40.   
  41.     }  
  42.   
  43.     private static void printEntry(List<Entry> entrys) {  
  44.         for (Entry entry : entrys) {  
  45.             if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN  
  46.                     || entry.getEntryType() == EntryType.TRANSACTIONEND) {  
  47.                 continue;  
  48.             }  
  49.   
  50.             RowChange rowChage = null;  
  51.             try {  
  52.                 rowChage = RowChange.parseFrom(entry.getStoreValue());  
  53.             } catch (Exception e) {  
  54.                 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),  
  55.                         e);  
  56.             }  
  57.             EventType eventType = rowChage.getEventType();  
  58.             System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",  
  59.                     entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
  60.                     entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));  
  61.   
  62.             for (RowData rowData : rowChage.getRowDatasList()) {  
  63.                 Map<String, Object> map = null;  
  64.                 if (eventType == EventType.DELETE) {  
  65.                     map = printColumn(rowData);  
  66.                 } else if (eventType == EventType.INSERT) {  
  67.                     map = printColumn(rowData);  
  68.                 } else if (eventType == EventType.UPDATE) {  
  69.                     map = printColumn(rowData);  
  70.                 }  
  71.                 System.out.print(eventType + ":");  
  72.                 System.out.println(map);  
  73.             }  
  74.         }  
  75.     }  
  76.   
  77.     private static Map<String, Object> printColumn(RowData rowData) {  
  78.         Map<String, Object> map = new HashMap<String, Object>();  
  79.         for (Column column : rowData.getBeforeColumnsList()) {  
  80.             map.put(column.getName(), column.getValue());  
  81.         }  
  82.         return map;  
  83.     }  
  84. }  

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.21</version>
</dependency>

参考:
http://blog.csdn.net/bbirdsky/article/details/41479479

正文到此结束
Loading...