遇到的问题

作为一个WEB开发者,MySQL和Redis是日常应用开发中经常用到的。MySQL作为开源的关系型数据库,以其免费、快速、体积下等特性受到众多开发者的青睐,尤其是以PHP为主要编程语言的开发者PHP+MySQL已经成为标配。但是在应对高并发、大数据量查询等场景就非常吃力,当然这也不是MySQL的强项。为了解决这些问题我我们通常需要将要计算的结果缓存起来或者将高频访问的数据提前缓存,等有请求到来时直接从缓存中读取。Redis的出现完美的解决了上面的问题,其单线程安全、高速非阻塞式I/O以及精准的时间控制就是为高并发场景下缓存完美方案。在具体的实现细节上其中一点要考虑,就是如何将请求的目的数据提前缓存到Redis中呢?

传统的解决方案

最暴力最直接的方法是,哪里用到我就在哪里写,然后定义一堆”切点“在需要的地方调用缓存函数。那么我们来说说这种方式存在的几个问题:
1、”切点“定义过多,代码耦合度很高。
”切点“这里定义为需要处理缓存的地方,借用了OOP中”切点“的概念,我们在日志系统的经常提到的遇到这样的问题。略微复杂的系统会定义几个甚至十几个这样的切点,每一个都要调用对应的函数,业务代码和缓存同步代码紧耦合,不利于后续的维护工作。
2、不同类型的数据需要定义多个不同的缓存函数。
系统多数情况下会有多种需要缓存的数据类型,每种类型都需要定义相应的处理函数。
3、项目整体架构复杂。
关键业务中掺杂同步业务,项目看起来比较混乱,后续的迭代开发和维护都是令人头疼的问题。

新的解决方案

大型数据库系统都提供了一种称为CDC的技术。CDC(Change Data Capture),中文直译“数据变更捕获”,简单来说CDC能够帮助我们识别从上次提取之后发生变化的数据。利用CDC,在对源表进行INSERT、UPDATE或 DELETE等操作的同时就可以提取数据。在获取到相关事件h之后可以通过该特性直接将数据变更同步到Redis缓存中,可以构建低耦合的模块来实现Mysql到Redis的缓存方式。SqlServer和Oracle都提供了原生支持,单时Mysql并没有提供原生支持,只能寻求第三方方案。
常见的两种方式:通过Mysql主从复制,将一份数据复制到伪造的Mysql slave中,然后在“伪造slave”中完成同步等业务。
另外一种是读取分析mysql二进制日志方式来实现,比如:Maxwell Debezium python-mysql-replication 等
CDC按是否侵入大体分为两种:侵入式和非侵入式。侵入式指CDC操作会给源系统带来性能影响,只要CDC操作以任何一种方式对源数据库执行了SQL操作,就认为是侵入式的。
常用的4种CDC方法是:基于时间戳的CDC基于触发器的CDC基于快照的CDC基于日志的CDC,其中前三种是侵入式的。
这里我们重点介绍python-mysql-replication, 它是纯Python实现构建在PyMYSQL之上MySQL复制协议。 允许监听诸如插入,更新,删除其数据和原始SQL查询之类的事件。
常见使用场景:

  • MySQL到NoSQL数据库复制
  • mysql到搜索引擎复制(全文索引)
  • 在数据库中发生更改时无效缓存
  • 审计
  • 实时分析

本文主要用到第一条和第三条。

新方案的具体实施

  1. 安装必要的依赖库

pip install -r requirements.txt
安装python-mysql-replication
pip install mysql-replication

  1. MySQL配置
[mysqld]server-id         = 1
log-bin             = /var/log/mysql/mysql-bin.log
expire_logs_days = 10
max_binlog_size  = 100M
binlog-format    = row #这个非常重要events,一定要设置,只有设置了才能监听到insert\delete\updates等事件

新方案DEMO

  1. 创建Mysq实例
CREATE DATABASE test;
use test;
CREATE TABLE test4 (id int NOT NULL AUTO_INCREMENT, data VARCHAR(255), data2 VARCHAR(255), PRIMARY KEY(id));
INSERT INTO test4 (data,data2) VALUES ("Hello", "World");
UPDATE test4 SET data = "World", data2="Hello" WHERE id = 1;
DELETE FROM test4 WHERE id = 1;
  1. Python-mysql-replcation示例
#!/usr/bin/env python 
# -*- coding: utf-8 -*-
# 打印所有事件和内容
from pymysqlreplication import BinLogStreamReader

mysql_settings = {'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': ''}

stream = BinLogStreamReader(connection_settings = mysql_settings, server_id=100)

for binlogevent in stream:
    binlogevent.dump()

stream.close()

对数据库进行增删改查,会打印相应的信息

=== RotateEvent ===
Date: 1970-01-01T01:00:00
Event size: 24
Read bytes: 0

=== FormatDescriptionEvent ===
Date: 2012-10-07T15:03:06
Event size: 84
Read bytes: 0

=== QueryEvent ===
Date: 2012-10-07T15:03:16
Event size: 64
Read bytes: 64
Schema: test
Execution time: 0
Query: CREATE DATABASE test

=== QueryEvent ===
Date: 2012-10-07T15:03:16
Event size: 151
Read bytes: 151
Schema: test
Execution time: 0
Query: CREATE TABLE test4 (id int NOT NULL AUTO_INCREMENT, data VARCHAR(255), data2 VARCHAR(255), PRIMARY KEY(id))

=== QueryEvent ===
Date: 2012-10-07T15:03:16
Event size: 49
Read bytes: 49
Schema: test
Execution time: 0
Query: BEGIN

=== TableMapEvent ===
Date: 2012-10-07T15:03:16
Event size: 31
Read bytes: 30
Table id: 781
Schema: test
Table: test4
Columns: 3

=== WriteRowsEvent ===
Date: 2012-10-07T15:03:16
Event size: 27
Read bytes: 10
Table: test.test4
Affected columns: 3
Changed rows: 1
Values:
  1. 一个同步Redis的简单示例
#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
# Update a redis server cache when an evenement is trigger
# in MySQL replication log
#

import redis

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

MYSQL_SETTINGS = {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "passwd": ""
}


def main():
    r = redis.Redis()

    stream = BinLogStreamReader(
        connection_settings=MYSQL_SETTINGS,
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])

    for binlogevent in stream:
        prefix = "%s:%s:" % (binlogevent.schema, binlogevent.table)

        for row in binlogevent.rows:
            if isinstance(binlogevent, DeleteRowsEvent):
                vals = row["values"]
                r.delete(prefix + str(vals["id"]))
            elif isinstance(binlogevent, UpdateRowsEvent):
                vals = row["after_values"]
                r.hmset(prefix + str(vals["id"]), vals)
            elif isinstance(binlogevent, WriteRowsEvent):
                vals = row["values"]
                r.hmset(prefix + str(vals["id"]), vals)

    stream.close()


if __name__ == "__main__":
    main()

总结

基于CDC方案的Mysql到Redis同步,较好的解决了一开始我们遇到的逻辑高度耦合、代码和逻辑混乱问题,将各个功能模块发降低了维护和开发成本。我们在实际的项目中已经有大规模的应用,也积累的一些实战经验。
尽管新方案有点很多单时也存在一些限制。

  • 不支持GEOMETRY字段类型
    GEOMETRY字段未解码将获得原始数据。
  • binlog_row_image限制
    仅支持[binlog_row_image = full](这是默认值)。
  • BOOLEAN 和 BOOL
    布尔值作为TINYINT(1)返回,实际上mysql boolean和bool以tinyint方式实现。

参考文档:
mysql的数据变更通知方法调研 https://www.jianshu.com/p/2cf66f39cd58
一个简单的数据订阅程序(for DBA) http://seanlook.com/2017/09/05/mysql-binlog-subscribe-simple-for-dba/

Tags: redis同步mysql, mysql cdc, mysql 变更数据捕获, python-mysql-replaction

Related Posts:
  • [尚无相关文章]

Leave a Comment