上山打老虎 发表于 2021-8-3 11:17:39

MySQLmom程序增量同步MySQL数据到ES

说明: 演示mysqlmom增量的同步数据到ES环境中redis版本3.2.8,ES-5.0.0,mysql5.7.22

分析 binlog 的增量同步的要求:
1.确保要增量同步的MySql数据库开启binlog,且开启redis(为了存储最后一次读到的binlog文件名及读到的位置。未来可能支持本地文件存储该信息。)
注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步;启动增量同步前。首先对所要增量同步的表进行一次全量同步,然后才是开启增量同步
2.需要在本地安装redis服务

<p>/usr/local/redis/bin/redis-server /usr/local/redis/conf/redis.conf</p>
<p>#/usr/local/redis/bin/redis-server /usr/local/redis/conf/redis.conf</p>
<p># ss -lntup|grep redis</p>
<p>tcp    LISTEN   0      8192   127.0.0.1:10201               <em>:</em>                   users:(("redis-server",pid=10597,fd=4))</p>
# /usr/local/redis/bin/redis-cli -h 127.0.0.1 -p 10201 -a 'YHu222tuEq' info Memory<br><h1>Memory</h1><br>
<p>used_memory:6179328</p>
<p>used_memory_human:5.89M</p>
<p>used_memory_rss:5095424</p>
<p>used_memory_rss_human:4.86M</p>
<p>used_memory_peak:6179328</p>
<p>used_memory_peak_human:5.89M</p>
<p>total_system_memory:16656146432</p>
<p>total_system_memory_human:15.51G</p>
<p>used_memory_lua:37888</p>
<p>used_memory_lua_human:37.00K</p>
<p>maxmemory:1000000000</p>
<p>maxmemory_human:953.67M</p>
<p>maxmemory_policy:noeviction</p>
<p>mem_fragmentation_ratio:0.82</p>
<p>mem_allocator:jemalloc-4.0.3</p>
3.新建配置文件,只支持"insert", "update"的增量同步:
mom new test_mom/binlog_config.py -t binlog --force
<p># mom new test_mom/binlog_config.py -t binlog --force</p>
<p>new config at /data1/soft/mysqlsmom/test_mom/binlog_config.py</p>
4.编辑 test_mom/binlog_config.py,按注释提示修改配置:

# cat   /data1/soft/mysqlsmom/test_mom/binlog_config.py<br><h1>coding=utf-8</h1><br>
<p>STREAM = "BINLOG"# "BINLOG" or "INIT"</p>
<p>SERVER_ID = 172160197 ### 确保每个用于binlog同步的配置文件的SERVER_ID不同</p>
<p>SLAVE_UUID = __name__</p>
<br><h1>一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1<br><h3>BULK_SIZE = 10000此参数只能是在全量导数据时开启,在增量同步数据时禁止开启,否则导致增量同步失败</h3><br></h1><br>
<p>BINLOG_CONNECTION = {</p>
<p>'host': '172.16.0.197',</p>
<p>'port': 3306,</p>
<p>'user': 'click_rep',</p>
<p>'passwd': 'jwtest123456'</p>
}<br><h1>redis存储上次同步位置等信息</h1><br>
<p>REDIS = {</p>
<p>"host": "127.0.0.1",</p>
<p>"port": 10201,</p>
<p>"db": 0,</p>
<p>"password": "YHu222tuEq",# 不需要密码则注释或删掉该行</p>
}
<br><h1>配置es节点</h1><br><br><h1>NODES = [{"host": "127.0.0.1", "port": 9200}]</h1><br>
<p>NODES = [{"host": "172.16.0.247", "port": 9999}]</p>
<p>TASKS = [</p>
<p> {</p>
<space62423248866dbe108bef7355bbee3d9eCode 0>
]
<br><h1>CUSTOM_ROW_HANDLERS = "./my_handlers.py"</h1><br><br><h1>CUSTOM_ROW_FILTERS = "./my_filters.py"</h1><br>
5.运行
该进程会一直运行,实时同步新增和更改后的数据到elasticsearch;
注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步;同步旧数据请看全量同步MySql数据到es;
6.验证测试:

<p>验证MySQL测试表stdb01.test01已经存在的数据:</p>
<p># mysql -e "select * from stdb01.test01;"</p>
<p>+----+----------+------------+---------------------+</p>
<p>| id | username | password   | create_time         |</p>
<p>+----+----------+------------+---------------------+</p>
<p>|1 | tomcat   | xiaohuahua | 2021-07-11 10:57:57 |</p>
<p>|2 | java   | 123456   | 2021-07-11 10:57:57 |</p>
<p>|3 | lua      | ssd123456| 2021-07-11 10:57:57 |</p>
<p>|4 | php      | seurw456   | 2021-07-11 10:57:57 |</p>
<p>|5 | python   | seueurw456 | 2021-07-11 10:57:58 |</p>
<p>|6 | java   | 123456   | 2021-07-11 16:59:47 |</p>
<p>|7 | java   | 123456   | 2021-07-11 16:59:51 |</p>
<p>|8 | java   | 123456   | 2021-07-11 16:59:58 |</p>
<p>|9 | tomcat   | ceshi001   | 2021-07-28 00:24:41 |</p>
<p>| 10 | c++      | 558996   | 2021-07-28 00:33:01 |</p>
<p>| 11 | c++      | 558996   | 2021-07-11 16:59:58 |</p>
<p>| 12 | c++      | 558996   | 2021-07-11 16:59:58 |</p>
<p>| 13 | java   | 596      | 2021-07-28 00:41:14 |</p>
<p>| 14 | java   | 7890       | 2021-07-28 00:41:34 |</p>
<p>| 15 | php      | 7890       | 2021-07-28 00:41:51 |</p>
<p>| 16 | python   | 654321   | 2021-07-28 00:42:08 |</p>
+----+----------+------------+---------------------+启动mysqlmom服务:
# mom run -c ./test_mom/binlog_config.py
binglog文件和pos位置点:
# /usr/local/redis/bin/redis-cli -h 127.0.0.1 -p 10201 -a 'YHu222tuEq'
127.0.0.1:10201&gt; info keyspace<br><h1>Keyspace</h1><br>
<p>db0:keys=2,expires=0,avg_ttl=0</p>
<p>127.0.0.1:10201&gt; keys *</p>
<p>1) "binlog_config_log_pos"</p>
<p>2) "binlog_config_log_file"</p>
<p>127.0.0.1:10201&gt; get binlog_config_log_pos</p>
<p>"2268"</p>
<p>127.0.0.1:10201&gt; get binlog_config_log_file</p>
"mysql-bin.000013"MySQL新增和删除记录:
root@tidb04 23:59:&gt;INSERT INTO test01(username,password,create_time) values('go', '654',now());
<p>root@tidb04 00:16:&gt; delete from test01 where id=17;</p>
<p>Query OK, 1 row affected (0.00 sec)</p>
<p># mom run -c ./test_mom/binlog_config.py</p>
<p>2573</p>
<p>2021-07-29 00:13:46,528 root         INFO   {"timestamp": "2021-07-29 00:13:46", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17}, "action": "insert", "table": "test01", "schema": "stdb01"}</p>
<p>2021-07-29 00:13:46,529 root         INFO   {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17, "_id": 17}</p>
<p>2870</p>
<p>2021-07-29 00:16:43,364 root         INFO   {"timestamp": "2021-07-29 00:16:43", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17}, "action": "delete", "table": "test01", "schema": "stdb01"}</p>
<strong>虽然记录了日志但是es里面没有任何值新增和删除的变化</strong>
<p>root@tidb04 00:19:&gt; INSERT INTO test01(username,password,create_time) values('go', '654',now());</p>
<p>Query OK, 1 row affected (0.00 sec)</p>
<p>root@tidb04 00:19:&gt; INSERT INTO test01(username,password,create_time) values('C++', '654',now());</p>
<p>Query OK, 1 row affected (0.01 sec)</p>
<p>3330</p>
<p>2021-07-29 00:19:47,088 root         INFO   {"timestamp": "2021-07-29 00:19:47", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1}, "action": "insert", "table": "test01", "schema": "stdb01"}</p>
<p>2021-07-29 00:19:47,088 root         INFO   {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1, "_id": 1}</p>
<p>3636</p>
<p>2021-07-29 00:20:42,729 root         INFO   {"timestamp": "2021-07-29 00:20:42", "host": "172.16.0.197", "values": {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2}, "action": "insert", "table": "test01", "schema": "stdb01"}</p>
<p>2021-07-29 00:20:42,729 root         INFO   {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2, "_id": 2}</p>
<strong>重复尝试了好多次,虽然在启动binlog增量同步数据时,有增删数据日志的输出,但是登录ES,始终没看到ES中数据的变化</strong>

<strong>存放在redis中的pos位置点一直没变话:</strong>
<p>127.0.0.1:10201&gt; get binlog_config_log_file</p>
<p>"mysql-bin.000013"</p>
<p>127.0.0.1:10201&gt; get binlog_config_log_pos</p>
"2268"后面发现问题原因:
主要原因是错误的把配置文件binlog_config.py 中BULK_SIZE这个参数开启导致的。果断注销掉,重启binglog同步程序,终于可以看大有数据写入到ES了
7、全量同步stdb01.test01表数据到:

<p># time mom run -c ./test_mom/init_config.py</p>
<p>2021-07-29 00:23:47,371 root         INFO   {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1, "_id": 1}</p>
<p>2021-07-29 00:23:47,371 root         INFO   {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2, "_id": 2}</p>
<p>2021-07-29 00:23:48,501 elasticsearch INFO   POST http://172.16.0.247:9999/_bulk </p>
<p>real    0m1.768s</p>
<p>user    0m0.463s</p>
<p>sys 0m0.059s</p>
<p>$ curl 'http://172.16.0.247:9999/_cat/indices?v'</p>
<p>health status index      uuid                   pri rep docs.count docs.deleted store.size pri.store.size</p>
<p>yellow open   test01_index LMMynfXaThGktG_YEAO2QQ   5   1          2            0   10.1kb         10.1kb</p>
<p>$ curl -s -XGET 'http://172.16.0.247:9999/_cat/indices/test01_index?v'</p>
<p>health status index      uuid                   pri rep docs.count docs.deleted store.size pri.store.size</p>
<p>yellow open   test01_index LMMynfXaThGktG_YEAO2QQ   5   1          2            0   10.1kb         10.1kb</p>
8.再次启动binlog_config.py 进行增量同步:

<p>mom run -c ./test_mom/binlog_config.py</p>
<p>写入2条sql:</p>
<p>root@tidb04 00:44:&gt; INSERT INTO test01(username,password,create_time) values('php', '123',now());</p>
<p>Query OK, 1 row affected (0.01 sec)</p>
<p>root@tidb04 00:44:&gt; INSERT INTO test01(username,password,create_time) values('java', '321',now());</p>
<p>Query OK, 1 row affected (0.01 sec)</p>
<p>root@tidb04 00:44:&gt;select * from test01;</p>
<p>+----+----------+----------+---------------------+</p>
<p>| id | username | password | create_time         |</p>
<p>+----+----------+----------+---------------------+</p>
<p>|1 | go       | 654      | 2021-07-29 00:19:47 |</p>
<p>|2 | C++      | 654      | 2021-07-29 00:20:42 |</p>
<p>|3 | php      | 123      | 2021-07-29 00:44:39 |</p>
<p>|4 | java   | 321      | 2021-07-29 00:44:49 |</p>
<p>+----+----------+----------+---------------------+</p>
<p>4 rows in set (0.00 sec)</p>
<strong>存放在redis中的pos位置点开始变化了:</strong>
<p>127.0.0.1:10201&gt; get binlog_config_log_file</p>
<p>"mysql-bin.000013"</p>
<p>127.0.0.1:10201&gt; get binlog_config_log_pos</p>
<p>"3278"</p>
<strong>登录ES,看到insert数据也写入到ES了</strong>9、inert,update,delete的增量同步MySQL数据到ES的配置文件:

# cat mysqlsmom/test_mom/binlog_config.py<br><h1>coding=utf-8</h1><br>
<p>STREAM = "BINLOG"# "BINLOG" or "INIT"</p>
<p>SERVER_ID = 172160197 ### 确保每个用于binlog同步的配置文件的SERVER_ID不同</p>
<p>SLAVE_UUID = __name__</p>
<br><h1>一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1</h1><br><br><h1>BULK_SIZE = 10000   此参数只能是在全量导数据时开启,在增量同步数据时禁止开启,否则导致增量同步失败</h1><br>
<p>BINLOG_CONNECTION = {</p>
<p>'host': '172.16.0.197',</p>
<p>'port': 3306,</p>
<p>'user': 'click_rep',</p>
<p>'passwd': 'jwtest123456'</p>
}<br><h1>redis存储上次同步位置等信息</h1><br>
<p>REDIS = {</p>
<p>"host": "127.0.0.1",</p>
<p>"port": 10201,</p>
<p>"db": 0,</p>
<p>"password": "YHu222tuEq",# 不需要密码则注释或删掉该行</p>
}
<br><h1>配置es节点</h1><br><br><h1>NODES = [{"host": "127.0.0.1", "port": 9200}]</h1><br>
<p>NODES = [{"host": "172.16.0.247", "port": 9999}]</p>
<p>TASKS = [</p>
<p> {</p>
<space62423248866dbe108bef7355bbee3d9eCode 1>

<space62423248866dbe108bef7355bbee3d9eCode 2>
]
<br><h1>CUSTOM_ROW_HANDLERS = "./my_handlers.py"</h1><br><br><h1>CUSTOM_ROW_FILTERS = "./my_filters.py"</h1><br>
关于MySQLmom增量同步指定MySQL表数据到ES简单介绍完毕,请继续关注博主,后面还会有更精彩的文章继续分享。

文档来源:51CTO技术博客https://blog.51cto.com/wujianwei/3241985
页: [1]
查看完整版本: MySQLmom程序增量同步MySQL数据到ES