转载

ElasticSearch56.3[SOURCE]+logstash6.4.2[RPM]的logstash-input-jdbc实现mysql数据同步

ElasticSearch56.3[SOURCE]+logstash6.4.2[RPM]的logstash-input-jdbc实现mysql数据同步

ElasticSearch安装忽略

安装logstash

官方:https://www.elastic.co/guide/en/logstash/current/installing-logstash.html

1.下载公共密钥

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

2.添加yum源

vim  /etc/yum.repos.d/logstash.repo

文件中写入

[logstash-5.x]

name=Elastic repository for 5.x packages

baseurl=https://artifacts.elastic.co/packages/5.x/yum

gpgcheck=1

gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch

enabled=1

autorefresh=1

type=rpm-md

#yum makecache

3.使用yum安装

yum install logstash

4.验证是否安装成功

进入 logstash 安装目录

cd /usr/share/logstash

运行

bin/logstash -e 'input { stdin { } } output { stdout {} }'

等待几秒钟 出现  

The stdin plugin is now waiting for input:

然后输入 

hello world

得到类似的结果

2016-11-24T08:01:55.949Z bogon hello world

安装logstash-input-jdbc插件

1.修改ruby仓库镜像

如果没有安装 gem 的话 安装gem 

安装ruby 

yum install ruby

装好以后测试一下 

ruby -v 

ruby 1.8.7 (2012-06-29 patchlevel 370) [i386-linux] 

ok 已经变成1.8.7 了

安装rubygems 

yum install rubygems

替换国内的镜像

gem sources --add https://gems.ruby-china.com/ --remove https://rubygems.org/

验证是否成功

gem sources -l

修改Gemfile的数据源地址

whereis logstash # 查看logstash安装的位置, 我的在 /usr/share/logstash目录

cd /usr/share/logstash

vim Gemfile

修改 source 的值 为: "https://gems.ruby-china.com/"

vim  Gemfile

# 找到 remote 修改它的值为:

https://gems.ruby-china.com/

或者直接替换源这样你不用改你的 Gemfile 的 source。

#gem install bundler

$ bundle config mirror.https://rubygems.org https://gems.ruby-china.com/

然后开始安装

/usr/share/logstash/bin/logstash-plugin  install logstash-input-jdbc

如果镜像地址没有改错的话应该可以直接安装

或者  进入源码地址的release页面logstash-input-jdbc

2.开始同步 mysql 数据进行全量更新, 一次同步表中所有的记录

需要建立 两个文件  一个  .conf后缀的文件    一个 .sql 后缀的文件

一个 mysql 的Java 驱动包  : mysql-connector-java-5.1.47-bin.jar    到这里下载:https://dev.mysql.com/downloads/connector/j/

需要同步数据库的表结构:

root@mysql6683 16:29:  [test]> desc test.cc;

+-------+-------------+------+-----+---------+-------+

| Field | Type        | Null | Key | Default | Extra |

+-------+-------------+------+-----+---------+-------+

| id    | int(11)     | YES  |     | NULL    |       |

| name  | varchar(20) | YES  |     | NULL    |       |

+-------+-------------+------+-----+---------+-------+

2 rows in set (0.00 sec)

root@mysql6683 16:29:  [test]> select * from test.cc;

+------+------+

| id   | name |

+------+------+

|    1 | rsc  |

|    2 | gmy  |

|    3 | ryx  |

+------+------+

3 rows in set (0.00 sec)

logstash-mysql.conf  内容:

里面的参数可以参考 logstash-input-jdbc官方参考文档

vim /usr/share/logstash/config/logstash-mysql.conf

input {

stdin {

}

jdbc {

# 数据库地址  端口  数据库名

jdbc_connection_string => "jdbc:mysql://localhost:3306/test"    #test表示使用test数据库

# 数据库用户名      

jdbc_user => "root"

# 数据库密码

jdbc_password => "123456"

# mysql java驱动地址 

jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.47-bin.jar"

jdbc_driver_class => "com.mysql.jdbc.Driver"

jdbc_paging_enabled => "true"

jdbc_page_size => "50000"

# sql 语句文件

statement_filepath => "/usr/share/logstash/config/test.sql"#通过引用外部文件执行sq语句

#statement => "SELECT * FROM hexin_erp_order"   #也可以通过直接输入查询语句

schedule => "* * * * *"

type => "jdbc"

}

}

output {

stdout {

codec => json_lines

}

elasticsearch {

hosts  => "localhost:9200"

index => "contacts"

document_type => "contact"

document_id => "%{id}"

}

}

cat /usr/share/logstash/config/test.sql

select * from cc

++++++++++++++++成功配置文件+++++++++++++++++++

input {

stdin {

}

jdbc {

jdbc_connection_string => "jdbc:mysql://192.168.66.83:3306/test"

jdbc_user => "root"

jdbc_password => "123456"

jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.47-bin.jar"

jdbc_driver_class => "com.mysql.jdbc.Driver"

jdbc_paging_enabled => "true"

jdbc_page_size => "50000"

statement_filepath => "/usr/share/logstash/config/test.sql"

schedule => "* * * * *"

type => "jdbc"

}

}

output {

stdout {

codec => "json_lines"

}

elasticsearch {

hosts  => "192.168.66.83:9200"

index => "contacts"

document_type => "contact"

document_id => "%{id}"

}

}

+++++++++++++++++++++++++++++++++++

注意: 在你的数据库里 要有一个数据库名字和logstash-mysql.conf 里的对应就可以了, 表名和 test.sql 里的对应就可以了   在表中  有一个id字段是为了和test.sql 中document_id => "%{id}" 这个参数对应 可以执行修改

然后开始执行

/usr/share/logstash/bin/logstash -f /usr/share/logstash/config/logstash-mysql.conf

如果出现错误 或者没有结果 可以进入 logs 目录中查看日志

出现类似这样的内容  就可以了

本例是对一个数据库表进行同步,如果需要同步多个表的数据,可以创建多个配置文件,也可以在一个配置文件中指定多个 jdbc input。配置中的所有项目都必须重新复制一遍。

增量更新

这个例子中的SQL执行的全量更新,如果需要进行增量更新,就需要对SQL进行一些修改。

input { 

stdin { 

jdbc { 

# 数据库地址端口数据库名 

jdbc_connection_string => "jdbc:mysql://localhost:3306/shen" 

# 数据库用户名 

jdbc_user => "root" 

# 数据库密码 

jdbc_password => "rootroot" 

# mysql java驱动地址 

jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.43-bin.jar" 

# 驱动类的名称 

jdbc_driver_class => "com.mysql.jdbc.Driver" 

jdbc_paging_enabled => "true" 

jdbc_page_size => "50000" 

#是否记录上次运行的结果 

record_last_run => true 

#记录上次运行结果的文件位置 

last_run_metadata_path => "" 

#是否使用数据库某一列的值, 

use_column_value => true 

tracking_column => "id" 

#numeric或者timestamp 

tracking_column_type => "numeric" 

#如果为true则会清除 last_run_metadata_path 的记录,即重新开始同步数据 

clean_run => false 

#sql_last_value根据tracking类型,默认为0或者1970-1-1 

statement => "SELECT * FROM TABLE WHERE id > :last_sql_value" 

# sql 语句文件,对于复杂的查询,可以放在文件中。 

# statement_filepath => "filename.sql" 

# 设置监听间隔,语法与Linux系统Cron相同 

schedule => "* * * * *" 

output { 

stdout { 

codec => json_lines 

elasticsearch { 

hosts=> "localhost:9200" 

index => "contacts" 

document_type => "contact" 

document_id => "%{id}" 

增量更新会忽略对历史数据的更新,如果业务中历史数据经常发生变化,则可以通过全量更新的方法。

查看是否同步成功:

[root@node6683 ~]# curl http://192.168.66.83:9200/contacts/contact/1?pretty=true

{

"_index" : "contacts",

"_type" : "contact",

"_id" : "1",

"_version" : 39,

"found" : true,

"_source" : {

"@version" : "1",

"@timestamp" : "2018-10-23T08:54:00.154Z",

"id" : 1,

"type" : "jdbc",

"name" : "rsc"

}

}

url 参数说明  contacts 是对应logstash-mysql.conf中 index => "contacts" contact 对应document_type => "contact"   1表示id为1 的数据

ElasticSearch56.3[SOURCE]+logstash6.4.2[RPM]的logstash-input-jdbc实现mysql数据同步

原文  http://www.linuxmysql.com/14/2018/956.htm
正文到此结束
Loading...