Special note:
This demo was tested against a company-internal customized build of elasticsearch-5.0.0, installed as a single node. The walkthrough starts with a full sync of MySQL table data into elasticsearch-5.0.0.
1. Create the full-sync configuration file
[root@tidb05 mysqlsmom]# mom new test_mom/init_config.py -t init --force
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
  RequestsDependencyWarning)
new config at /data1/soft/mysqlsmom/test_mom/init_config.py
[root@tidb05 mysqlsmom]# echo $?
0
Note: test_mom is a directory name of your choosing.
The command prints a warning. It has no effect on this demo, but I prefer clean output, so here is the warning and how to resolve it:
[root@tidb05 mysqlsmom]# mom new 197_testdb_mom/init_config.py -t init --force
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
  RequestsDependencyWarning)
new config at /data1/soft/mysqlsmom/197_testdb_mom/init_config.py
[root@tidb05 mysqlsmom]# ll /data1/soft/mysqlsmom/197_testdb_mom/init_config.py
-rw-r--r-- 1 root root 1298 Jul 11 10:46 /data1/soft/mysqlsmom/197_testdb_mom/init_config.py
The warning in question is:
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
<strong>Fix:</strong>
pip uninstall urllib3 -y
pip uninstall chardet -y
pip install requests
2. Case configuration demos
Case 1: Full sync of all columns of one MySQL table to ES
Create the test database and test table, then insert test data into test01:
create database stdb01;
CREATE TABLE `test01` (
  id int(8) NOT NULL AUTO_INCREMENT,
  username varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  password varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  create_time varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test01(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test01(username,password,create_time) values('java', '123456',now());
INSERT INTO test01(username,password,create_time) values('lua', 'ssd123456',now());
INSERT INTO test01(username,password,create_time) values('php', 'seurw456',now());
INSERT INTO test01(username,password,create_time) values('python', 'seueurw456',now());
root@tidb04 10:57: [stdb01]> select * from test01;
+----+----------+------------+---------------------+
| id | username | password   | create_time         |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-11 10:57:57 |
|  2 | java     | 123456     | 2021-07-11 10:57:57 |
|  3 | lua      | ssd123456  | 2021-07-11 10:57:57 |
|  4 | php      | seurw456   | 2021-07-11 10:57:57 |
|  5 | python   | seueurw456 | 2021-07-11 10:57:58 |
+----+----------+------------+---------------------+
5 rows in set (0.00 sec)
Create the account used to connect to the database. (Its privileges could be narrower, e.g. grant replication slave on *.*, although the full sync reads rows with SELECT, so SELECT on the synced tables is needed as well.)
GRANT ALL PRIVILEGES ON *.* TO 'click_rep'@'172.16.0.246' identified by 'jwtest123456';
flush privileges;
The configuration file for the full sync of stdb01.test01 is as follows:
[root@tidb05 mysqlsmom]# cat /data1/soft/mysqlsmom/test_mom/init_config.py
# coding=utf-8
STREAM = "INIT"
# Modify the database connection
CONNECTION = {
    'host': '172.16.0.197',
    'port': 3306,
    'user': 'click_rep',
    'passwd': 'jwtest123456'
}
# Number of rows synced to elasticsearch per batch (BULK_SIZE); defaults to 1 if unset
BULK_SIZE = 1
# Modify the elasticsearch nodes
# NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
    {
<spaceaf36b46d733641ec38b154de40cee487Code 0>
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
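The BULK_SIZE comment in the config says that many rows are sent to elasticsearch at a time. As a rough mental model only (this is not mysqlsmom's actual code, and the function name `chunked` is my own), the batching can be pictured like this:

```python
def chunked(rows, bulk_size=1):
    """Yield lists of at most bulk_size rows, preserving order."""
    if bulk_size < 1:
        raise ValueError("bulk_size must be >= 1")
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == bulk_size:
            yield batch
            batch = []
    if batch:  # flush the final, possibly smaller, batch
        yield batch


# Five rows, like the test01 table in this case.
rows = [{"id": i} for i in range(1, 6)]
print(len(list(chunked(rows, 1))))      # number of batches with BULK_SIZE = 1
print(len(list(chunked(rows, 50000))))  # number of batches with BULK_SIZE = 50000
```

Assuming one `_bulk` request per batch, BULK_SIZE = 1 means one HTTP round trip per row, which is why larger values matter for big tables.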
Run the full sync:
[root@tidb05 mysqlsmom]# pwd
/data1/soft/mysqlsmom
[root@tidb05 mysqlsmom]# ls
197_testdb_mom docs mysqlsmom README.md README_OLD.md setup.py test_mom
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
  RequestsDependencyWarning)
2021-07-11 11:09:44,500 root INFO {"username": "tomcat", "password": "xiaohuahua", "create_time": "2021-07-11 10:57:57", "id": 1, "_id": 1}
2021-07-11 11:09:44,623 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.123s]
2021-07-11 11:09:44,624 root INFO {"username": "java", "password": "123456", "create_time": "2021-07-11 10:57:57", "id": 2, "_id": 2}
2021-07-11 11:09:44,630 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.006s]
2021-07-11 11:09:44,630 root INFO {"username": "lua", "password": "ssd123456", "create_time": "2021-07-11 10:57:57", "id": 3, "_id": 3}
2021-07-11 11:09:44,639 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.009s]
2021-07-11 11:09:44,640 root INFO {"username": "php", "password": "seurw456", "create_time": "2021-07-11 10:57:57", "id": 4, "_id": 4}
2021-07-11 11:09:44,644 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.004s]
2021-07-11 11:09:44,645 root INFO {"username": "python", "password": "seueurw456", "create_time": "2021-07-11 10:57:58", "id": 5, "_id": 5}
2021-07-11 11:09:44,650 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
real 0m0.640s
user 0m0.444s
sys 0m0.051s
The resulting data in ES is not shown here.
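Since the ES side is not shown, one way to check the result yourself is to ask elasticsearch how many documents the target index holds and compare that with the MySQL row count. The sketch below uses the standard `_count` endpoint. The index name `test01_index` is a placeholder of mine (the real index name lives in the TASKS section of the config), and a customized ES build may behave differently.

```python
import json

try:  # Python 3
    from urllib.request import urlopen
except ImportError:  # Python 2.7, as used on the demo host
    from urllib2 import urlopen


def count_url(host, port, index):
    """Build the URL of the _count endpoint for one index."""
    return "http://{0}:{1}/{2}/_count".format(host, port, index)


def parse_count(body):
    """Extract the document count from a _count response body."""
    return json.loads(body)["count"]


def fetch_count(host, port, index):
    """Query a live node; only call this when the node is reachable."""
    response = urlopen(count_url(host, port, index))
    try:
        return parse_count(response.read().decode("utf-8"))
    finally:
        response.close()


# Offline example with a response shaped like what _count returns.
sample = '{"count": 5, "_shards": {"total": 5, "successful": 5, "failed": 0}}'
print(count_url("172.16.0.247", 9999, "test01_index"))
print(parse_count(sample))
```

Against the demo node you would call `fetch_count("172.16.0.247", 9999, "<your index>")` and expect it to match `select count(*) from test01`.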
Case 2: Full sync of selected columns of one MySQL table to ES
Create the test table test02 and insert test data:
CREATE TABLE `test02` (
  id int(8) NOT NULL AUTO_INCREMENT,
  username varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  password varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  create_time varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test02(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test02(username,password,create_time) values('java', '123456',now());
INSERT INTO test02(username,password,create_time) values('lua', 'ssd123456',now());
INSERT INTO test02(username,password,create_time) values('php', 'seurw456',now());
INSERT INTO test02(username,password,create_time) values('python', 'seueurw456',now());
Run the sync:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
  RequestsDependencyWarning)
2021-07-11 11:25:53,126 root INFO {"username": "tomcat", "_id": 1, "id": 1}
2021-07-11 11:25:53,217 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.091s]
2021-07-11 11:25:53,218 root INFO {"username": "java", "_id": 2, "id": 2}
2021-07-11 11:25:53,223 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
2021-07-11 11:25:53,223 root INFO {"username": "lua", "_id": 3, "id": 3}
2021-07-11 11:25:53,228 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
2021-07-11 11:25:53,229 root INFO {"username": "php", "_id": 4, "id": 4}
2021-07-11 11:25:53,235 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.006s]
2021-07-11 11:25:53,235 root INFO {"username": "python", "_id": 5, "id": 5}
2021-07-11 11:25:53,241 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.006s]
real 0m0.597s
user 0m0.440s
sys 0m0.047s
The resulting data in ES is not shown here.
The configuration file used for this case is as follows:
[root@tidb05 mysqlsmom]# cat /data1/soft/mysqlsmom/test_mom/init_config.py
# coding=utf-8
STREAM = "INIT"
# Modify the database connection
CONNECTION = {
    'host': '172.16.0.197',
    'port': 3306,
    'user': 'click_rep',
    'passwd': 'jwtest123456'
}
# Number of rows synced to elasticsearch per batch (BULK_SIZE); defaults to 1 if unset
BULK_SIZE = 1
# Modify the elasticsearch nodes
# NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
    {
<spaceaf36b46d733641ec38b154de40cee487Code 1>
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
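Comparing the log lines of Case 1 and Case 2, the only difference is which columns end up in each document: Case 2 keeps just `id` and `username`, and the primary key is copied into `_id`. The following is a plain-Python illustration of that transformation, not mysqlsmom's implementation. The helper name `project_row` is hypothetical and the `create_time` value is borrowed from the Case 1 output.

```python
def project_row(row, fields, id_field="id"):
    """Keep only the listed fields and copy the primary key into _id."""
    doc = {name: row[name] for name in fields}
    doc["_id"] = row[id_field]
    return doc


# A full row as it comes out of MySQL (values taken from the test data above).
row = {
    "id": 1,
    "username": "tomcat",
    "password": "xiaohuahua",
    "create_time": "2021-07-11 10:57:57",
}

doc = project_row(row, ["id", "username"])
print(sorted(doc.items()))
```

The resulting dictionary has the same keys and values as the first Case 2 log line: `username`, `_id` and `id`, with `password` and `create_time` left out.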
Case 3: Full sync of several MySQL tables to ES
Create the test tables test03 and test04 and insert test data:
CREATE TABLE `test03` (
  id int(8) NOT NULL AUTO_INCREMENT,
  username varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  password varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  create_time varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test03(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test03(username,password,create_time) values('java', '123456',now());
CREATE TABLE `test04` (
  id int(8) NOT NULL AUTO_INCREMENT,
  username varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  password varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  create_time varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test04(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test04(username,password,create_time) values('java', '123456',now());
root@tidb04 12:59: [stdb01]> select * from test04;
+----+----------+------------+---------------------+
| id | username | password   | create_time         |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-11 12:59:01 |
|  2 | java     | 123456     | 2021-07-11 12:59:01 |
+----+----------+------------+---------------------+
2 rows in set (0.00 sec)
root@tidb04 12:59: [stdb01]> select * from test03;
+----+----------+------------+---------------------+
| id | username | password   | create_time         |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-11 12:58:53 |
|  2 | java     | 123456     | 2021-07-11 12:58:54 |
+----+----------+------------+---------------------+
2 rows in set (0.00 sec)
The configuration file for this case is as follows:
[root@tidb05 test_mom]# cat init_config.py
# coding=utf-8
STREAM = "INIT"
# Modify the database connection
CONNECTION = {
    'host': '172.16.0.197',
    'port': 3306,
    'user': 'click_rep',
    'passwd': 'jwtest123456'
}
# Number of rows synced to elasticsearch per batch (BULK_SIZE); defaults to 1 if unset
BULK_SIZE = 1
# Modify the elasticsearch nodes
# NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
# Sync stdb01.test03 to es:
    {
<spaceaf36b46d733641ec38b154de40cee487Code 2>
# Sync stdb01.test04 to es:
    {
<spaceaf36b46d733641ec38b154de40cee487Code 3>
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
Start the sync:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
  RequestsDependencyWarning)
2021-07-11 13:01:09,473 root INFO {"username": "tomcat", "password": "xiaohuahua", "create_time": "2021-07-11 12:58:53", "id": 1, "_id": 1}
2021-07-11 13:01:09,555 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.082s]
2021-07-11 13:01:09,556 root INFO {"username": "java", "password": "123456", "create_time": "2021-07-11 12:58:54", "id": 2, "_id": 2}
2021-07-11 13:01:09,561 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
2021-07-11 13:01:09,564 root INFO {"username": "tomcat", "_id": 1, "id": 1}
2021-07-11 13:01:09,636 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.072s]
2021-07-11 13:01:09,636 root INFO {"username": "java", "_id": 2, "id": 2}
2021-07-11 13:01:09,642 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
real 0m0.629s
user 0m0.411s
sys 0m0.055s
The resulting data in ES is not shown here.
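With several tables in one run, the log gets harder to read by eye. A small helper like the one below can tally how many rows were logged and how many `_bulk` requests were sent. It assumes the log format seen in this post (row lines from the `root` logger, request lines from the `elasticsearch` logger). The name `summarize` is my own, and the sample lines are copied from the run above.

```python
import re

# Matches e.g. "POST http://172.16.0.247:9999/_bulk [status:200 request:0.082s]"
BULK_RE = re.compile(r"POST \S+/_bulk \[status:(\d+) request:([0-9.]+)s\]")


def summarize(log_lines):
    """Return (rows_logged, bulk_requests, total_bulk_seconds)."""
    rows = 0
    bulks = 0
    bulk_seconds = 0.0
    for line in log_lines:
        match = BULK_RE.search(line)
        if match:
            bulks += 1
            bulk_seconds += float(match.group(2))
        elif " root INFO {" in line:
            rows += 1
    return rows, bulks, bulk_seconds


sample_log = [
    '2021-07-11 13:01:09,473 root INFO {"username": "tomcat", "password": "xiaohuahua", "create_time": "2021-07-11 12:58:53", "id": 1, "_id": 1}',
    '2021-07-11 13:01:09,555 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.082s]',
    '2021-07-11 13:01:09,556 root INFO {"username": "java", "password": "123456", "create_time": "2021-07-11 12:58:54", "id": 2, "_id": 2}',
    '2021-07-11 13:01:09,561 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]',
]

rows, bulks, seconds = summarize(sample_log)
print(rows, bulks, round(seconds, 3))
```

In practice you would redirect the output of `mom run` to a file and pass its lines to `summarize`.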
Case 4: Sync tables from different databases on the same MySQL instance to ES
The MySQL table data is as follows:
root@tidb04 10:58: [test_db]> select * from stdb01.test01;
+----+----------+----------+---------------------+
| id | username | password | create_time         |
+----+----------+----------+---------------------+
| 30 | fox      | 556      | 2021-07-30 08:19:37 |
| 31 | fox      | 556      | 2021-07-30 08:19:38 |
+----+----------+----------+---------------------+
2 rows in set (0.00 sec)
root@tidb04 10:58: [test_db]> select * from test_db.test01;
+----+----------+------------+---------------------+
| id | username | password   | create_time         |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-03 23:51:17 |
|  2 | php      | xiao       | 2021-07-03 23:53:36 |
|  3 | fix      | xiao       | 2021-07-03 23:53:49 |
|  4 | java     | bai        | 2021-07-03 23:54:01 |
+----+----------+------------+---------------------+
4 rows in set (0.00 sec)
Run the full sync from MySQL to ES:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
2021-08-01 10:58:49,966 root INFO {"username": "fox", "password": "556", "create_time": "2021-07-30 08:19:37", "id": 30, "_id": 30}
2021-08-01 10:58:49,967 root INFO {"username": "fox", "password": "556", "create_time": "2021-07-30 08:19:38", "id": 31, "_id": 31}
2021-08-01 10:58:50,115 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.148s]
2021-08-01 10:58:50,119 root INFO {"username": "tomcat", "_id": 1, "id": 1}
2021-08-01 10:58:50,119 root INFO {"username": "php", "_id": 2, "id": 2}
2021-08-01 10:58:50,119 root INFO {"username": "fix", "_id": 3, "id": 3}
2021-08-01 10:58:50,119 root INFO {"username": "java", "_id": 4, "id": 4}
2021-08-01 10:58:50,259 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.139s]
real 0m0.873s
user 0m0.505s
sys 0m0.080s
ES data verification: (the screenshot is not reproduced here)
The configuration file used for this full sync is as follows:
[root@tidb05 mysqlsmom]# cat ./test_mom/init_config.py
# coding=utf-8
STREAM = "INIT"
# Modify the database connection
CONNECTION = {
    'host': '172.16.0.197',
    'port': 3306,
    'user': 'click_rep',
    'passwd': 'jwtest123456'
}
# Number of rows synced to elasticsearch per batch (BULK_SIZE); defaults to 1 if unset
BULK_SIZE = 50000
# Modify the elasticsearch nodes
# NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
# Sync stdb01.test03 to es:
    {
<spaceaf36b46d733641ec38b154de40cee487Code 4>
## Sync test_db.test01 to es:
    {
<spaceaf36b46d733641ec38b154de40cee487Code 5>
<spaceaf36b46d733641ec38b154de40cee487Code 6>
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
Case 5: Full-sync timing with BULK_SIZE = 1 versus BULK_SIZE = 10000
With BULK_SIZE = 10000, run a full sync of the table t_work_order_follow to ES:
[root@tidb04 ~]# mysql -e "select count(*) from stdb01.t_work_order_follow;"
+----------+
| count(*) |
+----------+
|  3975925 |
+----------+
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
real 30m7.618s
user 23m51.398s
sys 0m58.087s
With BULK_SIZE = 1, the full sync of t_work_order_follow to ES takes a very long time:
root@tidb04 08:07: [test_db]> select count(*) from t_work_order_follow;
+----------+
| count(*) |
+----------+
|  3975925 |
+----------+
1 row in set (0.62 sec)
[root@tidb06 mysqlsmom]# time mom run -c ./test_mom/init_config.py
real 237m59.067s
user 53m49.099s
sys 3m25.431s
A later test with BULK_SIZE = 50000 took roughly the same time as BULK_SIZE = 10000.
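To put the two timings side by side, the arithmetic can be worked through in a few lines. The row count and `real` times are the ones measured above. The number of `_bulk` requests is an estimate that assumes one request per batch of BULK_SIZE rows, and `parse_real` is just a helper name of mine.

```python
import re


def parse_real(text):
    """Convert a `time` value such as '30m7.618s' into seconds."""
    match = re.match(r"^(\d+)m([0-9.]+)s$", text)
    if match is None:
        raise ValueError("unexpected time format: %r" % text)
    return int(match.group(1)) * 60 + float(match.group(2))


ROWS = 3975925
runs = {10000: "30m7.618s", 1: "237m59.067s"}  # BULK_SIZE -> measured real time

stats = {}
for bulk_size, real in sorted(runs.items()):
    seconds = parse_real(real)
    requests_needed = -(-ROWS // bulk_size)  # ceiling division
    stats[bulk_size] = (seconds, requests_needed, ROWS / seconds)
    print("BULK_SIZE=%d: %.3f s, ~%d bulk requests, %.1f rows/s"
          % (bulk_size, seconds, requests_needed, ROWS / seconds))

speedup = stats[1][0] / stats[10000][0]
print("speedup: %.2fx" % speedup)
```

This works out to roughly 2,200 rows per second with BULK_SIZE = 10000 versus roughly 280 rows per second with BULK_SIZE = 1, a speedup of about 7.9x. Note that the two runs were taken on different hosts (tidb05 and tidb06), so the comparison is indicative rather than exact.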
That concludes the full-sync demo. A follow-up post will cover how to configure incremental sync, so stay tuned.