说明: 演示mysqlmom增量的同步数据到ES环境中redis版本3.2.8,ES-5.0.0,mysql5.7.22
分析 binlog 的增量同步的要求:
1.确保要增量同步的MySql数据库开启binlog,且开启redis(为了存储最后一次读到的binlog文件名及读到的位置。未来可能支持本地文件存储该信息。)
注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步;启动增量同步前。首先对所要增量同步的表进行一次全量同步,然后才是开启增量同步
2.需要在本地安装redis服务<p>/usr/local/redis/bin/redis-server /usr/local/redis/conf/redis.conf</p>
<p>[root@tidb05 conf]# /usr/local/redis/bin/redis-server /usr/local/redis/conf/redis.conf</p>
<p>[root@tidb05 conf]# ss -lntup|grep redis</p>
<p>tcp LISTEN 0 8192 127.0.0.1:10201 <em>:</em> users:(("redis-server",pid=10597,fd=4))</p>
[root@tidb05 conf]# /usr/local/redis/bin/redis-cli -h 127.0.0.1 -p 10201 -a 'YHu222tuEq' info Memory<br><h1>Memory</h1><br>
<p>used_memory:6179328</p>
<p>used_memory_human:5.89M</p>
<p>used_memory_rss:5095424</p>
<p>used_memory_rss_human:4.86M</p>
<p>used_memory_peak:6179328</p>
<p>used_memory_peak_human:5.89M</p>
<p>total_system_memory:16656146432</p>
<p>total_system_memory_human:15.51G</p>
<p>used_memory_lua:37888</p>
<p>used_memory_lua_human:37.00K</p>
<p>maxmemory:1000000000</p>
<p>maxmemory_human:953.67M</p>
<p>maxmemory_policy:noeviction</p>
<p>mem_fragmentation_ratio:0.82</p>
<p>mem_allocator:jemalloc-4.0.3</p> 3.新建配置文件,只支持"insert", "update"的增量同步:mom new test_mom/binlog_config.py -t binlog --force
<p>[root@tidb05 mysqlsmom]# mom new test_mom/binlog_config.py -t binlog --force</p>
<p>new config at /data1/soft/mysqlsmom/test_mom/binlog_config.py</p> 4.编辑 test_mom/binlog_config.py,按注释提示修改配置:[root@tidb05 conf]# cat /data1/soft/mysqlsmom/test_mom/binlog_config.py<br><h1>coding=utf-8</h1><br>
<p>STREAM = "BINLOG" # "BINLOG" or "INIT"</p>
<p>SERVER_ID = 172160197 ### 确保每个用于binlog同步的配置文件的SERVER_ID不同</p>
<p>SLAVE_UUID = __name__</p>
<br><h1>一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1<br><h3>BULK_SIZE = 10000 此参数只能是在全量导数据时开启,在增量同步数据时禁止开启,否则导致增量同步失败</h3><br></h1><br>
<p>BINLOG_CONNECTION = {</p>
<p>'host': '172.16.0.197',</p>
<p>'port': 3306,</p>
<p>'user': 'click_rep',</p>
<p>'passwd': 'jwtest123456'</p>
}<br><h1>redis存储上次同步位置等信息</h1><br>
<p>REDIS = {</p>
<p>"host": "127.0.0.1",</p>
<p>"port": 10201,</p>
<p>"db": 0,</p>
<p>"password": "YHu222tuEq", # 不需要密码则注释或删掉该行</p>
}
<br><h1>配置es节点</h1><br><br><h1>NODES = [{"host": "127.0.0.1", "port": 9200}]</h1><br>
<p>NODES = [{"host": "172.16.0.247", "port": 9999}]</p>
<p>TASKS = [</p>
<p> {</p>
<space62423248866dbe108bef7355bbee3d9eCode 0>
]
<br><h1>CUSTOM_ROW_HANDLERS = "./my_handlers.py"</h1><br><br><h1>CUSTOM_ROW_FILTERS = "./my_filters.py"</h1><br> 5.运行
该进程会一直运行,实时同步新增和更改后的数据到elasticsearch;
注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步;同步旧数据请看全量同步MySql数据到es;
6.验证测试:<p>验证MySQL测试表stdb01.test01已经存在的数据:</p>
<p>[root@tidb04 ~]# mysql -e "select * from stdb01.test01;"</p>
<p>+----+----------+------------+---------------------+</p>
<p>| id | username | password | create_time |</p>
<p>+----+----------+------------+---------------------+</p>
<p>| 1 | tomcat | xiaohuahua | 2021-07-11 10:57:57 |</p>
<p>| 2 | java | 123456 | 2021-07-11 10:57:57 |</p>
<p>| 3 | lua | ssd123456 | 2021-07-11 10:57:57 |</p>
<p>| 4 | php | seurw456 | 2021-07-11 10:57:57 |</p>
<p>| 5 | python | seueurw456 | 2021-07-11 10:57:58 |</p>
<p>| 6 | java | 123456 | 2021-07-11 16:59:47 |</p>
<p>| 7 | java | 123456 | 2021-07-11 16:59:51 |</p>
<p>| 8 | java | 123456 | 2021-07-11 16:59:58 |</p>
<p>| 9 | tomcat | ceshi001 | 2021-07-28 00:24:41 |</p>
<p>| 10 | c++ | 558996 | 2021-07-28 00:33:01 |</p>
<p>| 11 | c++ | 558996 | 2021-07-11 16:59:58 |</p>
<p>| 12 | c++ | 558996 | 2021-07-11 16:59:58 |</p>
<p>| 13 | java | 596 | 2021-07-28 00:41:14 |</p>
<p>| 14 | java | 7890 | 2021-07-28 00:41:34 |</p>
<p>| 15 | php | 7890 | 2021-07-28 00:41:51 |</p>
<p>| 16 | python | 654321 | 2021-07-28 00:42:08 |</p>
+----+----------+------------+---------------------+ 启动mysqlmom服务:[root@tidb05 mysqlsmom]# mom run -c ./test_mom/binlog_config.py binglog文件和pos位置点:[root@tidb05 ~]# /usr/local/redis/bin/redis-cli -h 127.0.0.1 -p 10201 -a 'YHu222tuEq'
127.0.0.1:10201> info keyspace<br><h1>Keyspace</h1><br>
<p>db0:keys=2,expires=0,avg_ttl=0</p>
<p>127.0.0.1:10201> keys *</p>
<p>1) "binlog_config_log_pos"</p>
<p>2) "binlog_config_log_file"</p>
<p>127.0.0.1:10201> get binlog_config_log_pos</p>
<p>"2268"</p>
<p>127.0.0.1:10201> get binlog_config_log_file</p>
"mysql-bin.000013" MySQL新增和删除记录:root@tidb04 23:59: [stdb01]> INSERT INTO test01(username,password,create_time) values('go', '654',now());
<p>root@tidb04 00:16: [stdb01]> delete from test01 where id=17;</p>
<p>Query OK, 1 row affected (0.00 sec)</p>
<p>[root@tidb05 mysqlsmom]# mom run -c ./test_mom/binlog_config.py</p>
<p>2573</p>
<p>2021-07-29 00:13:46,528 root INFO {"timestamp": "2021-07-29 00:13:46", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17}, "action": "insert", "table": "test01", "schema": "stdb01"}</p>
<p>2021-07-29 00:13:46,529 root INFO {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17, "_id": 17}</p>
<p>2870</p>
<p>2021-07-29 00:16:43,364 root INFO {"timestamp": "2021-07-29 00:16:43", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17}, "action": "delete", "table": "test01", "schema": "stdb01"}</p>
<strong>虽然记录了日志但是es里面没有任何值新增和删除的变化</strong>
<p>root@tidb04 00:19: [stdb01]> INSERT INTO test01(username,password,create_time) values('go', '654',now());</p>
<p>Query OK, 1 row affected (0.00 sec)</p>
<p>root@tidb04 00:19: [stdb01]> INSERT INTO test01(username,password,create_time) values('C++', '654',now());</p>
<p>Query OK, 1 row affected (0.01 sec)</p>
<p>3330</p>
<p>2021-07-29 00:19:47,088 root INFO {"timestamp": "2021-07-29 00:19:47", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1}, "action": "insert", "table": "test01", "schema": "stdb01"}</p>
<p>2021-07-29 00:19:47,088 root INFO {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1, "_id": 1}</p>
<p>3636</p>
<p>2021-07-29 00:20:42,729 root INFO {"timestamp": "2021-07-29 00:20:42", "host": "172.16.0.197", "values": {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2}, "action": "insert", "table": "test01", "schema": "stdb01"}</p>
<p>2021-07-29 00:20:42,729 root INFO {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2, "_id": 2}</p>
<strong>重复尝试了好多次,虽然在启动binlog增量同步数据时,有增删数据日志的输出,但是登录ES,始终没看到ES中数据的变化</strong>
<strong>存放在redis中的pos位置点一直没变话:</strong>
<p>127.0.0.1:10201> get binlog_config_log_file</p>
<p>"mysql-bin.000013"</p>
<p>127.0.0.1:10201> get binlog_config_log_pos</p>
"2268" 后面发现问题原因:
主要原因是错误的把配置文件binlog_config.py 中BULK_SIZE 这个参数开启导致的。果断注销掉,重启binglog同步程序,终于可以看大有数据写入到ES了
7、全量同步stdb01.test01表数据到:<p>[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py</p>
<p>2021-07-29 00:23:47,371 root INFO {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1, "_id": 1}</p>
<p>2021-07-29 00:23:47,371 root INFO {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2, "_id": 2}</p>
<p>2021-07-29 00:23:48,501 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:1.130s]</p>
<p>real 0m1.768s</p>
<p>user 0m0.463s</p>
<p>sys 0m0.059s</p>
<p>[es@tidb06 logs]$ curl 'http://172.16.0.247:9999/_cat/indices?v'</p>
<p>health status index uuid pri rep docs.count docs.deleted store.size pri.store.size</p>
<p>yellow open test01_index LMMynfXaThGktG_YEAO2QQ 5 1 2 0 10.1kb 10.1kb</p>
<p>[es@tidb06 logs]$ curl -s -XGET 'http://172.16.0.247:9999/_cat/indices/test01_index?v'</p>
<p>health status index uuid pri rep docs.count docs.deleted store.size pri.store.size</p>
<p>yellow open test01_index LMMynfXaThGktG_YEAO2QQ 5 1 2 0 10.1kb 10.1kb</p> 8.再次启动binlog_config.py 进行增量同步:<p>mom run -c ./test_mom/binlog_config.py</p>
<p>写入2条sql:</p>
<p>root@tidb04 00:44: [stdb01]> INSERT INTO test01(username,password,create_time) values('php', '123',now());</p>
<p>Query OK, 1 row affected (0.01 sec)</p>
<p>root@tidb04 00:44: [stdb01]> INSERT INTO test01(username,password,create_time) values('java', '321',now());</p>
<p>Query OK, 1 row affected (0.01 sec)</p>
<p>root@tidb04 00:44: [stdb01]> select * from test01;</p>
<p>+----+----------+----------+---------------------+</p>
<p>| id | username | password | create_time |</p>
<p>+----+----------+----------+---------------------+</p>
<p>| 1 | go | 654 | 2021-07-29 00:19:47 |</p>
<p>| 2 | C++ | 654 | 2021-07-29 00:20:42 |</p>
<p>| 3 | php | 123 | 2021-07-29 00:44:39 |</p>
<p>| 4 | java | 321 | 2021-07-29 00:44:49 |</p>
<p>+----+----------+----------+---------------------+</p>
<p>4 rows in set (0.00 sec)</p>
<strong>存放在redis中的pos位置点开始变化了:</strong>
<p>127.0.0.1:10201> get binlog_config_log_file</p>
<p>"mysql-bin.000013"</p>
<p>127.0.0.1:10201> get binlog_config_log_pos</p>
<p>"3278"</p>
<strong>登录ES,看到insert数据也写入到ES了</strong> 9、inert,update,delete的增量同步MySQL数据到ES的配置文件:[root@tidb05 soft]# cat mysqlsmom/test_mom/binlog_config.py<br><h1>coding=utf-8</h1><br>
<p>STREAM = "BINLOG" # "BINLOG" or "INIT"</p>
<p>SERVER_ID = 172160197 ### 确保每个用于binlog同步的配置文件的SERVER_ID不同</p>
<p>SLAVE_UUID = __name__</p>
<br><h1>一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1</h1><br><br><h1>BULK_SIZE = 10000 此参数只能是在全量导数据时开启,在增量同步数据时禁止开启,否则导致增量同步失败</h1><br>
<p>BINLOG_CONNECTION = {</p>
<p>'host': '172.16.0.197',</p>
<p>'port': 3306,</p>
<p>'user': 'click_rep',</p>
<p>'passwd': 'jwtest123456'</p>
}<br><h1>redis存储上次同步位置等信息</h1><br>
<p>REDIS = {</p>
<p>"host": "127.0.0.1",</p>
<p>"port": 10201,</p>
<p>"db": 0,</p>
<p>"password": "YHu222tuEq", # 不需要密码则注释或删掉该行</p>
}
<br><h1>配置es节点</h1><br><br><h1>NODES = [{"host": "127.0.0.1", "port": 9200}]</h1><br>
<p>NODES = [{"host": "172.16.0.247", "port": 9999}]</p>
<p>TASKS = [</p>
<p> {</p>
<space62423248866dbe108bef7355bbee3d9eCode 1>
<space62423248866dbe108bef7355bbee3d9eCode 2>
]
<br><h1>CUSTOM_ROW_HANDLERS = "./my_handlers.py"</h1><br><br><h1>CUSTOM_ROW_FILTERS = "./my_filters.py"</h1><br> 关于MySQLmom增量同步指定MySQL表数据到ES简单介绍完毕,请继续关注博主,后面还会有更精彩的文章继续分享。
|