评论

收藏

[MySQL] MySQLmom程序增量同步MySQL数据到ES

数据库 数据库 发布于:2021-08-03 11:17 | 阅读数:314 | 评论:0

说明: 演示mysqlmom增量的同步数据到ES环境中redis版本3.2.8,ES-5.0.0,mysql5.7.22

分析 binlog 的增量同步的要求:
1.确保要增量同步的MySql数据库开启binlog,且开启redis(为了存储最后一次读到的binlog文件名及读到的位置。未来可能支持本地文件存储该信息。)
注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步;启动增量同步前。首先对所要增量同步的表进行一次全量同步,然后才是开启增量同步

2.需要在本地安装redis服务
<p>/usr/local/redis/bin/redis-server /usr/local/redis/conf/redis.conf</p>
<p>[root@tidb05 conf]#  /usr/local/redis/bin/redis-server /usr/local/redis/conf/redis.conf</p>
<p>[root@tidb05 conf]# ss -lntup|grep redis</p>
<p>tcp  LISTEN   0    8192   127.0.0.1:10201         <em>:</em>           users:(("redis-server",pid=10597,fd=4))</p>
[root@tidb05 conf]# /usr/local/redis/bin/redis-cli -h 127.0.0.1 -p 10201 -a 'YHu222tuEq' info Memory<br><h1>Memory</h1><br>
<p>used_memory:6179328</p>
<p>used_memory_human:5.89M</p>
<p>used_memory_rss:5095424</p>
<p>used_memory_rss_human:4.86M</p>
<p>used_memory_peak:6179328</p>
<p>used_memory_peak_human:5.89M</p>
<p>total_system_memory:16656146432</p>
<p>total_system_memory_human:15.51G</p>
<p>used_memory_lua:37888</p>
<p>used_memory_lua_human:37.00K</p>
<p>maxmemory:1000000000</p>
<p>maxmemory_human:953.67M</p>
<p>maxmemory_policy:noeviction</p>
<p>mem_fragmentation_ratio:0.82</p>
<p>mem_allocator:jemalloc-4.0.3</p>
3.新建配置文件,只支持"insert", "update"的增量同步:
mom new test_mom/binlog_config.py -t binlog --force
<p>[root@tidb05 mysqlsmom]# mom new test_mom/binlog_config.py -t binlog --force</p>
<p>new config at /data1/soft/mysqlsmom/test_mom/binlog_config.py</p>
4.编辑 test_mom/binlog_config.py,按注释提示修改配置:
[root@tidb05 conf]# cat   /data1/soft/mysqlsmom/test_mom/binlog_config.py<br><h1>coding=utf-8</h1><br>
<p>STREAM = "BINLOG"  # "BINLOG" or "INIT"</p>
<p>SERVER_ID = 172160197 ### 确保每个用于binlog同步的配置文件的SERVER_ID不同</p>
<p>SLAVE_UUID = __name__</p>
<br><h1>一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1<br><h3>BULK_SIZE = 10000  此参数只能是在全量导数据时开启,在增量同步数据时禁止开启,否则导致增量同步失败</h3><br></h1><br>
<p>BINLOG_CONNECTION = {</p>
<p>'host': '172.16.0.197',</p>
<p>'port': 3306,</p>
<p>'user': 'click_rep',</p>
<p>'passwd': 'jwtest123456'</p>
}<br><h1>redis存储上次同步位置等信息</h1><br>
<p>REDIS = {</p>
<p>"host": "127.0.0.1",</p>
<p>"port": 10201,</p>
<p>"db": 0,</p>
<p>"password": "YHu222tuEq",  # 不需要密码则注释或删掉该行</p>
}
<br><h1>配置es节点</h1><br><br><h1>NODES = [{"host": "127.0.0.1", "port": 9200}]</h1><br>
<p>NODES = [{"host": "172.16.0.247", "port": 9999}]</p>
<p>TASKS = [</p>
<p> {</p>
<space62423248866dbe108bef7355bbee3d9eCode 0>
]
<br><h1>CUSTOM_ROW_HANDLERS = "./my_handlers.py"</h1><br><br><h1>CUSTOM_ROW_FILTERS = "./my_filters.py"</h1><br>
5.运行
该进程会一直运行,实时同步新增和更改后的数据到elasticsearch;
注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步;同步旧数据请看全量同步MySql数据到es;
6.验证测试:
<p>验证MySQL测试表stdb01.test01已经存在的数据:</p>
<p>[root@tidb04 ~]# mysql -e "select * from stdb01.test01;"</p>
<p>+----+----------+------------+---------------------+</p>
<p>| id | username | password   | create_time     |</p>
<p>+----+----------+------------+---------------------+</p>
<p>|  1 | tomcat   | xiaohuahua | 2021-07-11 10:57:57 |</p>
<p>|  2 | java   | 123456   | 2021-07-11 10:57:57 |</p>
<p>|  3 | lua    | ssd123456  | 2021-07-11 10:57:57 |</p>
<p>|  4 | php    | seurw456   | 2021-07-11 10:57:57 |</p>
<p>|  5 | python   | seueurw456 | 2021-07-11 10:57:58 |</p>
<p>|  6 | java   | 123456   | 2021-07-11 16:59:47 |</p>
<p>|  7 | java   | 123456   | 2021-07-11 16:59:51 |</p>
<p>|  8 | java   | 123456   | 2021-07-11 16:59:58 |</p>
<p>|  9 | tomcat   | ceshi001   | 2021-07-28 00:24:41 |</p>
<p>| 10 | c++    | 558996   | 2021-07-28 00:33:01 |</p>
<p>| 11 | c++    | 558996   | 2021-07-11 16:59:58 |</p>
<p>| 12 | c++    | 558996   | 2021-07-11 16:59:58 |</p>
<p>| 13 | java   | 596    | 2021-07-28 00:41:14 |</p>
<p>| 14 | java   | 7890     | 2021-07-28 00:41:34 |</p>
<p>| 15 | php    | 7890     | 2021-07-28 00:41:51 |</p>
<p>| 16 | python   | 654321   | 2021-07-28 00:42:08 |</p>
+----+----------+------------+---------------------+
启动mysqlmom服务:
[root@tidb05 mysqlsmom]# mom run -c ./test_mom/binlog_config.py
binglog文件和pos位置点:
[root@tidb05 ~]# /usr/local/redis/bin/redis-cli -h 127.0.0.1 -p 10201 -a 'YHu222tuEq' 
127.0.0.1:10201&gt; info keyspace<br><h1>Keyspace</h1><br>
<p>db0:keys=2,expires=0,avg_ttl=0</p>
<p>127.0.0.1:10201&gt; keys *</p>
<p>1) "binlog_config_log_pos"</p>
<p>2) "binlog_config_log_file"</p>
<p>127.0.0.1:10201&gt; get binlog_config_log_pos</p>
<p>"2268"</p>
<p>127.0.0.1:10201&gt; get binlog_config_log_file</p>
"mysql-bin.000013"
MySQL新增和删除记录:
root@tidb04 23:59:  [stdb01]&gt;  INSERT INTO test01(username,password,create_time) values('go', '654',now());
<p>root@tidb04 00:16:  [stdb01]&gt; delete from test01 where id=17;</p>
<p>Query OK, 1 row affected (0.00 sec)</p>
<p>[root@tidb05 mysqlsmom]# mom run -c ./test_mom/binlog_config.py</p>
<p>2573</p>
<p>2021-07-29 00:13:46,528 root     INFO   {"timestamp": "2021-07-29 00:13:46", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17}, "action": "insert", "table": "test01", "schema": "stdb01"}</p>
<p>2021-07-29 00:13:46,529 root     INFO   {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17, "_id": 17}</p>
<p>2870</p>
<p>2021-07-29 00:16:43,364 root     INFO   {"timestamp": "2021-07-29 00:16:43", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17}, "action": "delete", "table": "test01", "schema": "stdb01"}</p>
<strong>虽然记录了日志但是es里面没有任何值新增和删除的变化</strong>
<p>root@tidb04 00:19:  [stdb01]&gt; INSERT INTO test01(username,password,create_time) values('go', '654',now());</p>
<p>Query OK, 1 row affected (0.00 sec)</p>
<p>root@tidb04 00:19:  [stdb01]&gt; INSERT INTO test01(username,password,create_time) values('C++', '654',now());</p>
<p>Query OK, 1 row affected (0.01 sec)</p>
<p>3330</p>
<p>2021-07-29 00:19:47,088 root     INFO   {"timestamp": "2021-07-29 00:19:47", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1}, "action": "insert", "table": "test01", "schema": "stdb01"}</p>
<p>2021-07-29 00:19:47,088 root     INFO   {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1, "_id": 1}</p>
<p>3636</p>
<p>2021-07-29 00:20:42,729 root     INFO   {"timestamp": "2021-07-29 00:20:42", "host": "172.16.0.197", "values": {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2}, "action": "insert", "table": "test01", "schema": "stdb01"}</p>
<p>2021-07-29 00:20:42,729 root     INFO   {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2, "_id": 2}</p>
<strong>重复尝试了好多次,虽然在启动binlog增量同步数据时,有增删数据日志的输出,但是登录ES,始终没看到ES中数据的变化</strong>
<strong>存放在redis中的pos位置点一直没变话:</strong>
<p>127.0.0.1:10201&gt; get binlog_config_log_file</p>
<p>"mysql-bin.000013"</p>
<p>127.0.0.1:10201&gt; get binlog_config_log_pos</p>
"2268"
后面发现问题原因:
主要原因是错误的把配置文件binlog_config.py 中BULK_SIZE  这个参数开启导致的。果断注销掉,重启binglog同步程序,终于可以看大有数据写入到ES了
7、全量同步stdb01.test01表数据到:
<p>[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py</p>
<p>2021-07-29 00:23:47,371 root     INFO   {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1, "_id": 1}</p>
<p>2021-07-29 00:23:47,371 root     INFO   {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2, "_id": 2}</p>
<p>2021-07-29 00:23:48,501 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk [status:200 request:1.130s]</p>
<p>real  0m1.768s</p>
<p>user  0m0.463s</p>
<p>sys 0m0.059s</p>
<p>[es@tidb06 logs]$ curl 'http://172.16.0.247:9999/_cat/indices?v'</p>
<p>health status index    uuid           pri rep docs.count docs.deleted store.size pri.store.size</p>
<p>yellow open   test01_index LMMynfXaThGktG_YEAO2QQ   5   1      2      0   10.1kb     10.1kb</p>
<p>[es@tidb06 logs]$ curl -s -XGET 'http://172.16.0.247:9999/_cat/indices/test01_index?v'</p>
<p>health status index    uuid           pri rep docs.count docs.deleted store.size pri.store.size</p>
<p>yellow open   test01_index LMMynfXaThGktG_YEAO2QQ   5   1      2      0   10.1kb     10.1kb</p>
8.再次启动binlog_config.py 进行增量同步:
<p>mom run -c ./test_mom/binlog_config.py</p>
<p>写入2条sql:</p>
<p>root@tidb04 00:44:  [stdb01]&gt; INSERT INTO test01(username,password,create_time) values('php', '123',now());</p>
<p>Query OK, 1 row affected (0.01 sec)</p>
<p>root@tidb04 00:44:  [stdb01]&gt; INSERT INTO test01(username,password,create_time) values('java', '321',now());</p>
<p>Query OK, 1 row affected (0.01 sec)</p>
<p>root@tidb04 00:44:  [stdb01]&gt;  select * from test01;</p>
<p>+----+----------+----------+---------------------+</p>
<p>| id | username | password | create_time     |</p>
<p>+----+----------+----------+---------------------+</p>
<p>|  1 | go     | 654    | 2021-07-29 00:19:47 |</p>
<p>|  2 | C++    | 654    | 2021-07-29 00:20:42 |</p>
<p>|  3 | php    | 123    | 2021-07-29 00:44:39 |</p>
<p>|  4 | java   | 321    | 2021-07-29 00:44:49 |</p>
<p>+----+----------+----------+---------------------+</p>
<p>4 rows in set (0.00 sec)</p>
<strong>存放在redis中的pos位置点开始变化了:</strong>
<p>127.0.0.1:10201&gt; get binlog_config_log_file</p>
<p>"mysql-bin.000013"</p>
<p>127.0.0.1:10201&gt; get binlog_config_log_pos</p>
<p>"3278"</p>
<strong>登录ES,看到insert数据也写入到ES了</strong>
9、inert,update,delete的增量同步MySQL数据到ES的配置文件:
[root@tidb05 soft]# cat mysqlsmom/test_mom/binlog_config.py<br><h1>coding=utf-8</h1><br>
<p>STREAM = "BINLOG"  # "BINLOG" or "INIT"</p>
<p>SERVER_ID = 172160197 ### 确保每个用于binlog同步的配置文件的SERVER_ID不同</p>
<p>SLAVE_UUID = __name__</p>
<br><h1>一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1</h1><br><br><h1>BULK_SIZE = 10000   此参数只能是在全量导数据时开启,在增量同步数据时禁止开启,否则导致增量同步失败</h1><br>
<p>BINLOG_CONNECTION = {</p>
<p>'host': '172.16.0.197',</p>
<p>'port': 3306,</p>
<p>'user': 'click_rep',</p>
<p>'passwd': 'jwtest123456'</p>
}<br><h1>redis存储上次同步位置等信息</h1><br>
<p>REDIS = {</p>
<p>"host": "127.0.0.1",</p>
<p>"port": 10201,</p>
<p>"db": 0,</p>
<p>"password": "YHu222tuEq",  # 不需要密码则注释或删掉该行</p>
}
<br><h1>配置es节点</h1><br><br><h1>NODES = [{"host": "127.0.0.1", "port": 9200}]</h1><br>
<p>NODES = [{"host": "172.16.0.247", "port": 9999}]</p>
<p>TASKS = [</p>
<p> {</p>
<space62423248866dbe108bef7355bbee3d9eCode 1>
<space62423248866dbe108bef7355bbee3d9eCode 2>
]
<br><h1>CUSTOM_ROW_HANDLERS = "./my_handlers.py"</h1><br><br><h1>CUSTOM_ROW_FILTERS = "./my_filters.py"</h1><br>
关于MySQLmom增量同步指定MySQL表数据到ES简单介绍完毕,请继续关注博主,后面还会有更精彩的文章继续分享。

关注下面的标签,发现更多相似文章