上山打老虎 posted on 2021-8-3 11:57:30

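Full sync of MySQL table data to ES with the MySQLmom program

Special note:
This demo was tested against a company-internal customized build of elasticsearch-5.0.0, with ES installed as a single node. It starts with a full sync of MySQL table data into elasticsearch-5.0.0.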

1. Create the full-sync configuration file
# mom new test_mom/init_config.py -t init --force
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
  RequestsDependencyWarning)
new config at /data1/soft/mysqlsmom/test_mom/init_config.py
# echo $?
0
Note: test_mom is an arbitrary directory name; you can choose your own.
The warning in this output has no effect on the demo, but being a neat freak I dislike seeing it. The warning and its fix are shown below:
# mom new 197_testdb_mom/init_config.py -t init --force
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
  RequestsDependencyWarning)
new config at /data1/soft/mysqlsmom/197_testdb_mom/init_config.py
# ll /data1/soft/mysqlsmom/197_testdb_mom/init_config.py
-rw-r--r-- 1 root root 1298 Jul 11 10:46 /data1/soft/mysqlsmom/197_testdb_mom/init_config.py
The warning in question is the line /usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!

Fix:
pip uninstall urllib3 -y
pip uninstall chardet -y
pip install requests
2. Case configuration demos
Case 1: Full sync of all columns of one MySQL table to ES
Create the test database and test table, and insert test data into test01:
create database stdb01;
use stdb01;
CREATE TABLE test01 (
id int(8) NOT NULL AUTO_INCREMENT,
username varchar(20) COLLATE utf8_unicode_ci NOT NULL,
password varchar(20) COLLATE utf8_unicode_ci NOT NULL,
create_time varchar(20) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test01(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test01(username,password,create_time) values('java', '123456',now());
INSERT INTO test01(username,password,create_time) values('lua', 'ssd123456',now());
INSERT INTO test01(username,password,create_time) values('php', 'seurw456',now());
INSERT INTO test01(username,password,create_time) values('python', 'seueurw456',now());
root@tidb04 10:57:> select * from test01;
+----+----------+------------+---------------------+
| id | username | password   | create_time         |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-11 10:57:57 |
|  2 | java     | 123456     | 2021-07-11 10:57:57 |
|  3 | lua      | ssd123456  | 2021-07-11 10:57:57 |
|  4 | php      | seurw456   | 2021-07-11 10:57:57 |
|  5 | python   | seueurw456 | 2021-07-11 10:57:58 |
+----+----------+------------+---------------------+
5 rows in set (0.00 sec)
Create the account used to connect to the database (its privileges could actually be narrower, e.g. grant replication slave on *.*):
GRANT ALL PRIVILEGES ON *.* TO 'click_rep'@'172.16.0.246' identified by 'jwtest123456';
flush privileges;
The configuration file for the full sync of stdb01.test01 is as follows:

# cat /data1/soft/mysqlsmom/test_mom/init_config.py
# coding=utf-8
STREAM = "INIT"
# Change the database connection
CONNECTION = {
    'host': '172.16.0.197',
    'port': 3306,
    'user': 'click_rep',
    'passwd': 'jwtest123456'
}
# Sync BULK_SIZE rows to elasticsearch per request; defaults to 1 if this option is not set
BULK_SIZE = 1
# Change the elasticsearch nodes
# NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
    {
<spaceaf36b46d733641ec38b154de40cee487Code 0>
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"

Run the full sync command:
# pwd
/data1/soft/mysqlsmom
# ls
197_testdb_mom  docs  mysqlsmom  README.md  README_OLD.md  setup.py  test_mom
# time mom run -c ./test_mom/init_config.py
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
  RequestsDependencyWarning)
2021-07-11 11:09:44,500 root         INFO   {"username": "tomcat", "password": "xiaohuahua", "create_time": "2021-07-11 10:57:57", "id": 1, "_id": 1}
2021-07-11 11:09:44,623 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk
2021-07-11 11:09:44,624 root         INFO   {"username": "java", "password": "123456", "create_time": "2021-07-11 10:57:57", "id": 2, "_id": 2}
2021-07-11 11:09:44,630 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk
2021-07-11 11:09:44,630 root         INFO   {"username": "lua", "password": "ssd123456", "create_time": "2021-07-11 10:57:57", "id": 3, "_id": 3}
2021-07-11 11:09:44,639 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk
2021-07-11 11:09:44,640 root         INFO   {"username": "php", "password": "seurw456", "create_time": "2021-07-11 10:57:57", "id": 4, "_id": 4}
2021-07-11 11:09:44,644 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk
2021-07-11 11:09:44,645 root         INFO   {"username": "python", "password": "seueurw456", "create_time": "2021-07-11 10:57:58", "id": 5, "_id": 5}
2021-07-11 11:09:44,650 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk
real    0m0.640s
user    0m0.444s
sys 0m0.051s
Screenshots of the resulting data in ES are omitted here.
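In the log above every row is followed by its own POST to /_bulk, which is what BULK_SIZE = 1 implies. The sketch below illustrates the general idea of batching rows into Elasticsearch bulk requests; it is not MySQLmom's actual code. The helper names chunked and bulk_body, the index name stdb01_test01 and the type name doc are all made up for the example.

```python
import json


def chunked(rows, bulk_size):
    """Yield consecutive lists holding at most bulk_size rows each."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == bulk_size:
            yield batch
            batch = []
    if batch:
        yield batch


def bulk_body(batch, index, doc_type):
    """Build the newline-delimited JSON body of one _bulk request."""
    lines = []
    for row in batch:
        doc = dict(row)
        doc_id = doc.pop("_id")  # _id belongs in the action line, not in the document
        action = {"index": {"_index": index, "_type": doc_type, "_id": doc_id}}
        lines.append(json.dumps(action))
        lines.append(json.dumps(doc))
    return "\n".join(lines) + "\n"  # the bulk API expects a trailing newline


# Rows shaped like the log lines above (password and create_time left out for brevity).
rows = [
    {"username": "tomcat", "id": 1, "_id": 1},
    {"username": "java", "id": 2, "_id": 2},
    {"username": "lua", "id": 3, "_id": 3},
    {"username": "php", "id": 4, "_id": 4},
    {"username": "python", "id": 5, "_id": 5},
]

# Hypothetical index and type names, chosen only for this illustration.
for size in (1, 50000):
    bodies = [bulk_body(b, "stdb01_test01", "doc") for b in chunked(rows, size)]
    print("BULK_SIZE=%d -> %d request(s)" % (size, len(bodies)))
```

With a batch size of 1 the five rows become five requests; with a batch size larger than the table they fit into a single request, which is why larger values cut down on round trips.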
Case 2: Full sync of selected columns of one MySQL table to ES
Create test table test02 and insert test data:
CREATE TABLE test02 (
id int(8) NOT NULL AUTO_INCREMENT,
username varchar(20) COLLATE utf8_unicode_ci NOT NULL,
password varchar(20) COLLATE utf8_unicode_ci NOT NULL,
create_time varchar(20) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test02(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test02(username,password,create_time) values('java', '123456',now());
INSERT INTO test02(username,password,create_time) values('lua', 'ssd123456',now());
INSERT INTO test02(username,password,create_time) values('php', 'seurw456',now());
INSERT INTO test02(username,password,create_time) values('python', 'seueurw456',now());
Run the sync command:

# time mom run -c ./test_mom/init_config.py
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
  RequestsDependencyWarning)
2021-07-11 11:25:53,126 root         INFO   {"username": "tomcat", "_id": 1, "id": 1}
2021-07-11 11:25:53,217 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk
2021-07-11 11:25:53,218 root         INFO   {"username": "java", "_id": 2, "id": 2}
2021-07-11 11:25:53,223 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk
2021-07-11 11:25:53,223 root         INFO   {"username": "lua", "_id": 3, "id": 3}
2021-07-11 11:25:53,228 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk
2021-07-11 11:25:53,229 root         INFO   {"username": "php", "_id": 4, "id": 4}
2021-07-11 11:25:53,235 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk
2021-07-11 11:25:53,235 root         INFO   {"username": "python", "_id": 5, "id": 5}
2021-07-11 11:25:53,241 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk
real    0m0.597s
user    0m0.440s
sys 0m0.047s
Screenshots of the resulting data in ES are omitted here.
The configuration file used for this case is as follows:

# cat /data1/soft/mysqlsmom/test_mom/init_config.py
# coding=utf-8
STREAM = "INIT"
# Change the database connection
CONNECTION = {
    'host': '172.16.0.197',
    'port': 3306,
    'user': 'click_rep',
    'passwd': 'jwtest123456'
}
# Sync BULK_SIZE rows to elasticsearch per request; defaults to 1 if this option is not set
BULK_SIZE = 1
# Change the elasticsearch nodes
# NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
    {
<spaceaf36b46d733641ec38b154de40cee487Code 1>
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
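The documents logged in this case carry only id, username and _id. The sketch below shows how a full row maps to such a document. It only illustrates the resulting shape and makes no claim about how the task definition achieves it (the selection may simply happen in the task's SQL). The function name project_row is hypothetical, and the sample row reuses the values shown for test01 in Case 1.

```python
def project_row(row, fields, id_field="id"):
    """Keep only the chosen columns and copy the primary key into _id."""
    doc = {name: row[name] for name in fields}
    doc["_id"] = row[id_field]
    return doc


# Sample row with the same columns as the test tables.
full_row = {
    "id": 1,
    "username": "tomcat",
    "password": "xiaohuahua",
    "create_time": "2021-07-11 10:57:57",
}

partial_doc = project_row(full_row, ["id", "username"])
print(partial_doc)
```

The password and create_time columns never reach the document, so they are not indexed in ES.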

Case 3: Full sync of multiple MySQL tables to ES
Create test tables test03 and test04 and insert test data:
CREATE TABLE test03 (
id int(8) NOT NULL AUTO_INCREMENT,
username varchar(20) COLLATE utf8_unicode_ci NOT NULL,
password varchar(20) COLLATE utf8_unicode_ci NOT NULL,
create_time varchar(20) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test03(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test03(username,password,create_time) values('java', '123456',now());
CREATE TABLE test04 (
id int(8) NOT NULL AUTO_INCREMENT,
username varchar(20) COLLATE utf8_unicode_ci NOT NULL,
password varchar(20) COLLATE utf8_unicode_ci NOT NULL,
create_time varchar(20) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test04(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test04(username,password,create_time) values('java', '123456',now());
root@tidb04 12:59:> select * from test04;
+----+----------+------------+---------------------+
| id | username | password   | create_time         |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-11 12:59:01 |
|  2 | java     | 123456     | 2021-07-11 12:59:01 |
+----+----------+------------+---------------------+
2 rows in set (0.00 sec)
root@tidb04 12:59:> select * from test03;
+----+----------+------------+---------------------+
| id | username | password   | create_time         |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-11 12:58:53 |
|  2 | java     | 123456     | 2021-07-11 12:58:54 |
+----+----------+------------+---------------------+
2 rows in set (0.00 sec)
The configuration file for this case is as follows:

# cat init_config.py
# coding=utf-8
STREAM = "INIT"
# Change the database connection
CONNECTION = {
    'host': '172.16.0.197',
    'port': 3306,
    'user': 'click_rep',
    'passwd': 'jwtest123456'
}
# Sync BULK_SIZE rows to elasticsearch per request; defaults to 1 if this option is not set
BULK_SIZE = 1
# Change the elasticsearch nodes
# NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
# Sync stdb01.test03 to es:
    {
<spaceaf36b46d733641ec38b154de40cee487Code 2>
# Sync stdb01.test04 to es:
    {
<spaceaf36b46d733641ec38b154de40cee487Code 3>
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"

Run the command:
# time mom run -c ./test_mom/init_config.py
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
  RequestsDependencyWarning)
2021-07-11 13:01:09,473 root         INFO   {"username": "tomcat", "password": "xiaohuahua", "create_time": "2021-07-11 12:58:53", "id": 1, "_id": 1}
2021-07-11 13:01:09,555 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk
2021-07-11 13:01:09,556 root         INFO   {"username": "java", "password": "123456", "create_time": "2021-07-11 12:58:54", "id": 2, "_id": 2}
2021-07-11 13:01:09,561 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk
2021-07-11 13:01:09,564 root         INFO   {"username": "tomcat", "_id": 1, "id": 1}
2021-07-11 13:01:09,636 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk
2021-07-11 13:01:09,636 root         INFO   {"username": "java", "_id": 2, "id": 2}
2021-07-11 13:01:09,642 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk
real    0m0.629s
user    0m0.411s
sys 0m0.055s
Screenshots of the resulting data in ES are omitted here.
Case 4: Sync tables from different databases on the same MySQL instance to ES
The MySQL table data is as follows:

root@tidb04 10:58:> select * from stdb01.test01;
+----+----------+----------+---------------------+
| id | username | password | create_time         |
+----+----------+----------+---------------------+
| 30 | fox      | 556      | 2021-07-30 08:19:37 |
| 31 | fox      | 556      | 2021-07-30 08:19:38 |
+----+----------+----------+---------------------+
2 rows in set (0.00 sec)
root@tidb04 10:58:> select * from test_db.test01;
+----+----------+------------+---------------------+
| id | username | password   | create_time         |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-03 23:51:17 |
|  2 | php      | xiao       | 2021-07-03 23:53:36 |
|  3 | fix      | xiao       | 2021-07-03 23:53:49 |
|  4 | java     | bai        | 2021-07-03 23:54:01 |
+----+----------+------------+---------------------+
4 rows in set (0.00 sec)
Full sync of the MySQL data to ES:
# time mom run -c ./test_mom/init_config.py
2021-08-01 10:58:49,966 root         INFO   {"username": "fox", "password": "556", "create_time": "2021-07-30 08:19:37", "id": 30, "_id": 30}
2021-08-01 10:58:49,967 root         INFO   {"username": "fox", "password": "556", "create_time": "2021-07-30 08:19:38", "id": 31, "_id": 31}
2021-08-01 10:58:50,115 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk
2021-08-01 10:58:50,119 root         INFO   {"username": "tomcat", "_id": 1, "id": 1}
2021-08-01 10:58:50,119 root         INFO   {"username": "php", "_id": 2, "id": 2}
2021-08-01 10:58:50,119 root         INFO   {"username": "fix", "_id": 3, "id": 3}
2021-08-01 10:58:50,119 root         INFO   {"username": "java", "_id": 4, "id": 4}
2021-08-01 10:58:50,259 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk
real    0m0.873s
user    0m0.505s
sys 0m0.080s
ES data verification:
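One way to check the result from the command line is to ask ES for a document count per index and compare it with the MySQL row count. The sketch below is a minimal illustration using only the Python 3 standard library and the Elasticsearch _count endpoint. The function names and the index name my_index are hypothetical, since the index names used in this demo are not shown.

```python
import json
from urllib.request import urlopen


def count_url(host, port, index):
    """Return the URL of the _count endpoint for one index."""
    return "http://{}:{}/{}/_count".format(host, port, index)


def parse_count(body):
    """Extract the document count from a _count response body."""
    return json.loads(body)["count"]


def fetch_count(host, port, index, timeout=5):
    """Query a live ES node and return the number of documents in the index."""
    with urlopen(count_url(host, port, index), timeout=timeout) as resp:
        return parse_count(resp.read().decode("utf-8"))


# Against the node from this demo the call would look like this
# ("my_index" is a placeholder for the real index name):
# fetch_count("172.16.0.247", 9999, "my_index")
```

For the tables above, the expected counts would be 2 for the index fed from stdb01.test01 and 4 for the one fed from test_db.test01, assuming each table goes to its own index.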



The configuration file for this full sync is as follows:

# cat ./test_mom/init_config.py
# coding=utf-8
STREAM = "INIT"
# Change the database connection
CONNECTION = {
    'host': '172.16.0.197',
    'port': 3306,
    'user': 'click_rep',
    'passwd': 'jwtest123456'
}
# Sync BULK_SIZE rows to elasticsearch per request; defaults to 1 if this option is not set
BULK_SIZE = 50000
# Change the elasticsearch nodes
# NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
# Sync stdb01.test03 to es:
    {
<spaceaf36b46d733641ec38b154de40cee487Code 4>
## Sync test_db.test01 to es:
    {
<spaceaf36b46d733641ec38b154de40cee487Code 5>
<spaceaf36b46d733641ec38b154de40cee487Code 6>
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"

Case 5: Timing a full sync with BULK_SIZE = 1 versus BULK_SIZE = 10000
Full sync of table t_work_order_follow to ES with BULK_SIZE = 10000:
# mysql -e "select count(*) from stdb01.t_work_order_follow;"
+----------+
| count(*) |
+----------+
|  3975925 |
+----------+
# time mom run -c ./test_mom/init_config.py
real    30m7.618s
user    23m51.398s
sys   0m58.087s
Full sync of table t_work_order_follow to ES with BULK_SIZE = 1. Frankly, this takes a very long time:

root@tidb04 08:07:> select count(*) from t_work_order_follow;
+----------+
| count(*) |
+----------+
|  3975925 |
+----------+
1 row in set (0.62 sec)
# time mom run -c ./test_mom/init_config.py
real    237m59.067s
user    53m49.099s
sys   3m25.431s
A later test with BULK_SIZE = 50000 took roughly the same time as BULK_SIZE = 10000.
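To put the two measurements side by side, the short calculation below converts the real times reported by the time command into seconds and derives rows per second and the overall speedup. The helper name to_seconds is made up for this example; the row count and timings are the ones shown above.

```python
import re


def to_seconds(elapsed):
    """Convert a value printed by `time`, such as '30m7.618s', to seconds."""
    match = re.fullmatch(r"(\d+)m(\d+(?:\.\d+)?)s", elapsed)
    if match is None:
        raise ValueError("unexpected format: %r" % elapsed)
    return int(match.group(1)) * 60 + float(match.group(2))


ROWS = 3975925  # count(*) of t_work_order_follow
runs = {10000: "30m7.618s", 1: "237m59.067s"}  # BULK_SIZE -> real time

for bulk_size, real in runs.items():
    seconds = to_seconds(real)
    print("BULK_SIZE=%d: %.0f s, about %.0f rows/s" % (bulk_size, seconds, ROWS / seconds))

speedup = to_seconds(runs[1]) / to_seconds(runs[10000])
print("speedup: %.1fx" % speedup)
```

This works out to roughly 2,200 rows per second with BULK_SIZE = 10000 against about 278 rows per second with BULK_SIZE = 1, a speedup of about 7.9x.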
That concludes the full sync demo. I will share how to configure incremental sync in a later post, so stay tuned.

Source: 51CTO tech blog https://blog.51cto.com/wujianwei/3241929