文章介绍的中心就是围绕着这么两点来说的, 为了更使文章更简明,这边以之前在公司做的一个需求为例:
需要一个循环ID生成器,循环生成从 Min 到 Max 的数字ID,在ID递增到 Max 后,返回到 Min 重新开始递增;必须能保证多个进程并发请求时生成的ID不同。
此需求要解决的问题恰好为我们要解决的进程间通信需要解决的两个问题:
select for update
我们常用的 mysql 也可以被当作中间介质来实现进程间的通信,我们规定好某一个数据表内的某一行数据作为消息交换的中转站,使用 mysql 自带的锁来协调多个进程的存取冲突。
事务的设计目的就是为了解决多进程并发查询时数据冲突的问题,可是我们常用的事务只能保证数据冲突时会被回滚,数据不会出现错误,并不能实现请求的并行化。对一些数据冲突回滚的请求,需要我们在外层添加逻辑重试。
这里介绍 mysql 的一种语法: select for update,会给固定数据加上互斥锁,且另一个请求在获取锁失败时,会阻塞至获取锁成功,mysql 帮我们实现了自旋;
用法如下:
1.关闭 mysql 的自动提交,自动提交默认打开,除非使用 transition 语句显示开启事务,默认会将每一条 sql 作为一个事务直接提交执行,这里关闭。 set autocommit=0;
2.使用select for update 语句给数据添加互斥锁。注意:需求 mysql 的 innodb 引擎支持;
3.进行数据更新和处理操作;
4.主动提交事务,并将 自动提交恢复;commit; set autocommit=1; 代码实现
然后是代码实现:
// 数据库连接实现各有不同,demo 可以自己修改一下。
function getCycleIdFromMysql($max, $min = 0){
Db::db()->execute('set autocommit = 0');
$res = Db::db()->qsqlone('SELECT cycle_id FROM cycle_id_generator WHERE id = 1 FOR UPDATE');
$cycle_id = $res['cycle_id'] + 1;
if($cycle_id > $max){
$cycle_id = $min;
}
Db::db()->execute("UPDATE cycle_id_generator SET cycle_id = {$cycle_id} WHERE id = 1");
Db::db()->execute('commit');
Db::db()->execute('set autocommit = 1');
return $cycle_id;
}
redis
incr
redis 是我们常用的缓存服务器,由于其使用内存存储数据,性能很高。我们使用一个固定的普通键来作为消息中转站,然后利用其incr命令的原子性和其执行结果(递增后的值),实现 cycle_id 的递增。
incr(key) 若 key 不存在,redis 会先将值设置为0,然后执行递增操作;
递增没有问题,可是我们还有个需求是在要其值达到 max 时,再将其置为 min,这时就可能会出现进程A在更新值为 min 时,另一个进程B也检测到值大于了 max,然后将值置为 min,可是这时的值已经不是 max,即发生了值重复更新,那么返回的值必然会有重复;
这时,我们就需要自己来实现锁了。