评论

收藏

[MySQL] MySQLmom程序全量同步MySQL表数据到ES

数据库 数据库 发布于:2021-08-03 11:57 | 阅读数:528 | 评论:0

特殊提示:
本次演示的ES的版本为公司内部定制的elasticsearch-5.0.0的版本测试的,而且ES是单节点安装。本次演示从一个全量同步MySQL表数据到elasticsearch-5.0.0开始

一、创建全量同步配置文件
[root@tidb05 mysqlsmom]# mom new test_mom/init_config.py -t init --force
<p>/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!</p>
<p>RequestsDependencyWarning)</p>
<p>new config at /data1/soft/mysqlsmom/test_mom/init_config.py</p>
<p>[root@tidb05 mysqlsmom]# echo $?</p>
0
说明:test_mom是可以指定名称的。
解决其中一个warning警告,其实对本次演示时没任何影响的,但是对于洁癖的我,看着总觉得不爽。具体warning警告内容和解决办法如下:
[root@tidb05 mysqlsmom]# mom new 197_testdb_mom/init_config.py -t init --force
<p>/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!</p>
<p>RequestsDependencyWarning)</p>
<p>new config at /data1/soft/mysqlsmom/197_testdb_mom/init_config.py</p>
<p>[root@tidb05 mysqlsmom]# ll /data1/soft/mysqlsmom/197_testdb_mom/init_config.py</p>
-rw-r--r-- 1 root root 1298 Jul 11 10:46 /data1/soft/mysqlsmom/197_testdb_mom/init_config.py
上面的/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
<strong>解决办法:</strong>
<p>pip uninstall urllib3 -y</p>
<p>pip uninstall chardet -y</p>
pip install requests
二、案例配置演示
案例一:全量同步MySQL某张表全部字段数据到es
创建测试库和测试表,并写入测试数据到test01测试表:
create database stdb01;
<p>CREATE TABLE <code>test01 (
 id int(8) NOT NULL AUTO_INCREMENT,
 username varchar(20) COLLATE utf8_unicode_ci NOT NULL,
 password varchar(20) COLLATE utf8_unicode_ci NOT NULL,
 create_time varchar(20) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test01(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test01(username,password,create_time) values('java', '123456',now());
INSERT INTO test01(username,password,create_time) values('lua', 'ssd123456',now());
INSERT INTO test01(username,password,create_time) values('php', 'seurw456',now());
INSERT INTO test01(username,password,create_time) values('python', 'seueurw456',now());
root@tidb04 10:57:  [stdb01]> select  from test01;
+----+----------+------------+---------------------+
| id | username | password   | create_time     |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-11 10:57:57 |
|  2 | java   | 123456   | 2021-07-11 10:57:57 |
|  3 | lua    | ssd123456  | 2021-07-11 10:57:57 |
|  4 | php    | seurw456   | 2021-07-11 10:57:57 |
|  5 | python   | seueurw456 | 2021-07-11 10:57:58 |
+----+----------+------------+---------------------+
5 rows in set (0.00 sec)
创建连接库的账户:(这个账户的权限其实可以再小点:grant replication slave on .
GRANT ALL PRIVILEGES ON </em>.* TO 'click_rep'@'172.16.0.246' identified by 'jwtest123456';flush privileges;
全量同步stdb01.test01表数据的配置文件内容如下:
[root@tidb05 mysqlsmom]# cat /data1/soft/mysqlsmom/test_mom/init_config.py<br><h1>coding=utf-8</h1><br>
STREAM = "INIT"<br><h1>修改数据库连接</h1><br>
<p>CONNECTION = {</p>
<p>'host': '172.16.0.197',</p>
<p>'port': 3306,</p>
<p>'user': 'click_rep',</p>
<p>'passwd': 'jwtest123456'</p>
}<br><h1>一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1</h1><br>
BULK_SIZE = 1<br><h1>修改elasticsearch节点</h1><br><br><h1>NODES = [{"host": "127.0.0.1", "port": 9200}]</h1><br>
<p>NODES = [{"host": "172.16.0.247", "port": 9999}]</p>
<p>TASKS = [</p>
<p> {</p>
<spaceaf36b46d733641ec38b154de40cee487Code 0>
]<br><h1>CUSTOM_ROW_HANDLERS = "./my_handlers.py"</h1><br><br><h1>CUSTOM_ROW_FILTERS = "./my_filters.py"


执行全量同步命令:
[root@tidb05 mysqlsmom]# pwd
<p>/data1/soft/mysqlsmom</p>
<p>[root@tidb05 mysqlsmom]# ls</p>
<p>197_testdb_mom  docs  mysqlsmom  README.md  README_OLD.md  setup.py  test_mom</p>
<p>[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py</p>
<p>/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!</p>
<p>RequestsDependencyWarning)</p>
<p>2021-07-11 11:09:44,500 root     INFO   {"username": "tomcat", "password": "xiaohuahua", "create_time": "2021-07-11 10:57:57", "id": 1, "_id": 1}</p>
<p>2021-07-11 11:09:44,623 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:0.123s]</p>
<p>2021-07-11 11:09:44,624 root     INFO   {"username": "java", "password": "123456", "create_time": "2021-07-11 10:57:57", "id": 2, "_id": 2}</p>
<p>2021-07-11 11:09:44,630 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:0.006s]</p>
<p>2021-07-11 11:09:44,630 root     INFO   {"username": "lua", "password": "ssd123456", "create_time": "2021-07-11 10:57:57", "id": 3, "_id": 3}</p>
<p>2021-07-11 11:09:44,639 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:0.009s]</p>
<p>2021-07-11 11:09:44,640 root     INFO   {"username": "php", "password": "seurw456", "create_time": "2021-07-11 10:57:57", "id": 4, "_id": 4}</p>
<p>2021-07-11 11:09:44,644 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:0.004s]</p>
<p>2021-07-11 11:09:44,645 root     INFO   {"username": "python", "password": "seueurw456", "create_time": "2021-07-11 10:57:58", "id": 5, "_id": 5}</p>
<p>2021-07-11 11:09:44,650 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]</p>
<p>real  0m0.640s</p>
<p>user  0m0.444s</p>
sys 0m0.051s
具体ES里面的数据此处就不再截图演示了
案例二:全量同步MySQL某张表部分字段数据到es
创建测试表test02和写入测试数据:
CREATE TABLE <code>test02 (
 id int(8) NOT NULL AUTO_INCREMENT,
 username varchar(20) COLLATE utf8_unicode_ci NOT NULL,
 password varchar(20) COLLATE utf8_unicode_ci NOT NULL,
 create_time varchar(20) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test02(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test02(username,password,create_time) values('java', '123456',now());
INSERT INTO test02(username,password,create_time) values('lua', 'ssd123456',now());
INSERT INTO test02(username,password,create_time) values('php', 'seurw456',now());
INSERT INTO test02(username,password,create_time) values('python', 'seueurw456',now());
运行同步命令:
<p>[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py</p>
<p>/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!</p>
<p>RequestsDependencyWarning)</p>
<p>2021-07-11 11:25:53,126 root     INFO   {"username": "tomcat", "_id": 1, "id": 1}</p>
<p>2021-07-11 11:25:53,217 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:0.091s]</p>
<p>2021-07-11 11:25:53,218 root     INFO   {"username": "java", "_id": 2, "id": 2}</p>
<p>2021-07-11 11:25:53,223 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]</p>
<p>2021-07-11 11:25:53,223 root     INFO   {"username": "lua", "_id": 3, "id": 3}</p>
<p>2021-07-11 11:25:53,228 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]</p>
<p>2021-07-11 11:25:53,229 root     INFO   {"username": "php", "_id": 4, "id": 4}</p>
<p>2021-07-11 11:25:53,235 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:0.006s]</p>
<p>2021-07-11 11:25:53,235 root     INFO   {"username": "python", "_id": 5, "id": 5}</p>
<p>2021-07-11 11:25:53,241 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:0.006s]</p>
<p>real  0m0.597s</p>
<p>user  0m0.440s</p>
sys 0m0.047s
具体ES里面的数据此处就不再截图演示了
此案例演示的具体配置文件如下:
[root@tidb05 mysqlsmom]# cat /data1/soft/mysqlsmom/test_mom/init_config.py<br><h1>coding=utf-8</h1><br>
STREAM = "INIT"<br><h1>修改数据库连接</h1><br>
<p>CONNECTION = {</p>
<p>'host': '172.16.0.197',</p>
<p>'port': 3306,</p>
<p>'user': 'click_rep',</p>
<p>'passwd': 'jwtest123456'</p>
}<br><h1>一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1</h1><br>
BULK_SIZE = 1<br><h1>修改elasticsearch节点</h1><br><br><h1>NODES = [{"host": "127.0.0.1", "port": 9200}]</h1><br>
<p>NODES = [{"host": "172.16.0.247", "port": 9999}]</p>
<p>TASKS = [</p>
<p> {</p>
<spaceaf36b46d733641ec38b154de40cee487Code 1>
]<br><h1>CUSTOM_ROW_HANDLERS = "./my_handlers.py"</h1><br><br><h1>CUSTOM_ROW_FILTERS = "./my_filters.py"


案例三:全量同步MySQL多张表数据到es
创建测试表test03,test04,写入测试数据:
CREATE TABLE <code>test03 (
 id int(8) NOT NULL AUTO_INCREMENT,
 username varchar(20) COLLATE utf8_unicode_ci NOT NULL,
 password varchar(20) COLLATE utf8_unicode_ci NOT NULL,
 create_time varchar(20) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test03(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test03(username,password,create_time) values('java', '123456',now());
CREATE TABLE test04 (
 id int(8) NOT NULL AUTO_INCREMENT,
 username varchar(20) COLLATE utf8_unicode_ci NOT NULL,
 password varchar(20) COLLATE utf8_unicode_ci NOT NULL,
 create_time varchar(20) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test04(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test04(username,password,create_time) values('java', '123456',now());
root@tidb04 12:59:  [stdb01]> select  from test04;
+----+----------+------------+---------------------+
| id | username | password   | create_time     |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-11 12:59:01 |
|  2 | java   | 123456   | 2021-07-11 12:59:01 |
+----+----------+------------+---------------------+
2 rows in set (0.00 sec)
root@tidb04 12:59:  [stdb01]> select  from test03;
+----+----------+------------+---------------------+
| id | username | password   | create_time     |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-11 12:58:53 |
|  2 | java   | 123456   | 2021-07-11 12:58:54 |
+----+----------+------------+---------------------+
2 rows in set (0.00 sec)
此案例配置文件内容如下:
[root@tidb05 test_mom]# cat init_config.py<br><h1>coding=utf-8</h1><br>
STREAM = "INIT"<br><h1>修改数据库连接</h1><br>
<p>CONNECTION = {</p>
<p>'host': '172.16.0.197',</p>
<p>'port': 3306,</p>
<p>'user': 'click_rep',</p>
<p>'passwd': 'jwtest123456'</p>
}<br><h1>一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1</h1><br>
BULK_SIZE = 1<br><h1>修改elasticsearch节点</h1><br><br><h1>NODES = [{"host": "127.0.0.1", "port": 9200}]</h1><br>
<p>NODES = [{"host": "172.16.0.247", "port": 9999}]</p>
TASKS = [<br><h1>同步stdb01.test03到es:</h1><br>
<p> {</p>
<spaceaf36b46d733641ec38b154de40cee487Code 2><br><h1>同步stdb01.test04到es:</h1><br>
<p> {</p>
<spaceaf36b46d733641ec38b154de40cee487Code 3>
]<br><h1>CUSTOM_ROW_HANDLERS = "./my_handlers.py"</h1><br><br><h1>CUSTOM_ROW_FILTERS = "./my_filters.py"


启动运行命令:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py 
<p>/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!</p>
<p>RequestsDependencyWarning)</p>
<p>2021-07-11 13:01:09,473 root     INFO   {"username": "tomcat", "password": "xiaohuahua", "create_time": "2021-07-11 12:58:53", "id": 1, "_id": 1}</p>
<p>2021-07-11 13:01:09,555 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:0.082s]</p>
<p>2021-07-11 13:01:09,556 root     INFO   {"username": "java", "password": "123456", "create_time": "2021-07-11 12:58:54", "id": 2, "_id": 2}</p>
<p>2021-07-11 13:01:09,561 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]</p>
<p>2021-07-11 13:01:09,564 root     INFO   {"username": "tomcat", "_id": 1, "id": 1}</p>
<p>2021-07-11 13:01:09,636 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:0.072s]</p>
<p>2021-07-11 13:01:09,636 root     INFO   {"username": "java", "_id": 2, "id": 2}</p>
<p>2021-07-11 13:01:09,642 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]</p>
<p>real  0m0.629s</p>
<p>user  0m0.411s</p>
sys 0m0.055s
具体ES里面的数据此处就不再截图演示了
案例四、同步同MySQL实例下不同的库不同的表数据到ES
MySQL表数据如下:
<p>root@tidb04 10:58:  [test_db]&gt; select * from stdb01.test01;</p>
<p>+----+----------+----------+---------------------+</p>
<p>| id | username | password | create_time     |</p>
<p>+----+----------+----------+---------------------+</p>
<p>| 30 | fox    | 556    | 2021-07-30 08:19:37 |</p>
<p>| 31 | fox    | 556    | 2021-07-30 08:19:38 |</p>
<p>+----+----------+----------+---------------------+</p>
<p>2 rows in set (0.00 sec)</p>
<p>root@tidb04 10:58:  [test_db]&gt; select <em> from test_db.test01;</p>
<p>+----+----------+------------+---------------------+</p>
<p>| id | username | password   | create_time     |</p>
<p>+----+----------+------------+---------------------+</p>
<p>|  1 | tomcat   | xiaohuahua | 2021-07-03 23:51:17 |</p>
<p>|  2 | php    | xiao     | 2021-07-03 23:53:36 |</p>
<p>|  3 | fix    | xiao     | 2021-07-03 23:53:49 |</p>
<p>|  4 | java   | bai    | 2021-07-03 23:54:01 |</p>
<p>+----+----------+------------+---------------------+</p>
4 rows in set (0.00 sec)
全量同步MySQL数据到ES:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py 
<p>2021-08-01 10:58:49,966 root     INFO   {"username": "fox", "password": "556", "create_time": "2021-07-30 08:19:37", "id": 30, "_id": 30}</p>
<p>2021-08-01 10:58:49,967 root     INFO   {"username": "fox", "password": "556", "create_time": "2021-07-30 08:19:38", "id": 31, "_id": 31}</p>
<p>2021-08-01 10:58:50,115 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:0.148s]</p>
<p>2021-08-01 10:58:50,119 root     INFO   {"username": "tomcat", "_id": 1, "id": 1}</p>
<p>2021-08-01 10:58:50,119 root     INFO   {"username": "php", "_id": 2, "id": 2}</p>
<p>2021-08-01 10:58:50,119 root     INFO   {"username": "fix", "_id": 3, "id": 3}</p>
<p>2021-08-01 10:58:50,119 root     INFO   {"username": "java", "_id": 4, "id": 4}</p>
<p>2021-08-01 10:58:50,259 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:0.139s]</p>
<p>real  0m0.873s</p>
<p>user  0m0.505s</p>
sys 0m0.080s
ES数据验证:
DSC0000.png
DSC0001.png

DSC0002.png

全量同步的配置文件内容如下:
[root@tidb05 mysqlsmom]# cat ./test_mom/init_config.py<br><h1>coding=utf-8</h1><br>
STREAM = "INIT"<br><h1>修改数据库连接</h1><br>
<p>CONNECTION = {</p>
<p>'host': '172.16.0.197',</p>
<p>'port': 3306,</p>
<p>'user': 'click_rep',</p>
<p>'passwd': 'jwtest123456'</p>
}<br><h1>一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1</h1><br>
BULK_SIZE = 50000<br><h1>修改elasticsearch节点</h1><br><br><h1>NODES = [{"host": "127.0.0.1", "port": 9200}]</h1><br>
<p>NODES = [{"host": "172.16.0.247", "port": 9999}]</p>
TASKS = [<br><h1>同步stdb01.test03到es:</h1><br>
<p> {</p>
<spaceaf36b46d733641ec38b154de40cee487Code 4><br><h2>同步test_db.test01到es:</h2><br>
<p> {</p>
<spaceaf36b46d733641ec38b154de40cee487Code 5>
<p>
<spaceaf36b46d733641ec38b154de40cee487Code 6>
</p>
]<br><h1>CUSTOM_ROW_HANDLERS = "./my_handlers.py"</h1><br><br><h1>CUSTOM_ROW_FILTERS = "./my_filters.py"


案例五、设置参数BULK_SIZE =1和=10000进行测试全量同步时间测试
设置BULK_SIZE =10000进行全量同步表t_work_order_follow数据到es测
[root@tidb04 ~]# mysql -e "select count(</em>) from stdb01.t_work_order_follow;"
<p>+----------+</p>
<p>| count(*) |</p>
<p>+----------+</p>
<p>|  3975925 |</p>
<p>+----------+</p>
<p>[root@tidb05 mysqlsmom]#  time mom run -c ./test_mom/init_config.py</p>
<p>real  30m7.618s</p>
<p>user  23m51.398s</p>
sys   0m58.087s
设置参数BULK_SIZE =1 进行全量同步表t_work_order_follow数据到es测试,说实话花费的时间非常的长了:
<p>root@tidb04 08:07:  [test_db]&gt; select count(<em>)  from t_work_order_follow;</p>
<p>+----------+</p>
<p>| count(</em>) |</p>
<p>+----------+</p>
<p>|  3975925 |</p>
<p>+----------+</p>
<p>1 row in set (0.62 sec)</p>
<p>[root@tidb06 mysqlsmom]# time mom run -c ./test_mom/init_config.py</p>
<p>real  237m59.067s</p>
<p>user  53m49.099s</p>
<p>sys   3m25.431s</p>
后面测试把设置BULK_SIZE =50000 测试结果和设置BULK_SIZE =10000消耗的时间基本差不多。
全量同步演示到此介绍完毕。后面会分享下增量同步的配置方法,尽情期待。

关注下面的标签,发现更多相似文章