评论

收藏

[MySQL] 基于gevent和pymysql实现mysql读写的异步非堵塞方案

数据库 数据库 发布于:2021-07-04 10:40 | 阅读数:451 | 评论:0

  咱们经常使用的mysql库,MySQL-Python库是用C写的,很遗憾它是阻塞的,要实现异步的MySQL驱动必须用Python版本的MySQL驱动!

  现在社区里面有两个纯python实现的mysql驱动。一个是 myconnpy  另一个是PyMysql ~ 这两个mysql驱动文档相当的少呀,好在他们的用法和MySQldb相当的像,不然就要头疼的看代码了。。。 实现的方式是用socket来交互,不像mysqldb封装了libmysqlclient那样 !
  
  myconnpy中国的tornado大牛推荐过,但是也评价过,貌似有些bug的样子。

  我这边就用Mozilla公司 也在用的pymysql ~
  
  咱们先创建数据库和数据表
   DSC0000.jpg
  安装python的mysql模块~
   DSC0001.jpg
  
  PySQL针对mysql操作demo还算简单的~
    
import pymysql
db = pymysql.connect(host = 'localhost', passwd = '123123', user = 'root', db = 'rui')
cursor = db.cursor()
sql='select count(*) from kkk'
data = cursor.execute(sql)
print cursor
cursor.close()
db.close()
  
  其实咱们也可以模拟apache那样prefork模式,来派生任务执行对象。
  prefork采用预派生子进程方式,用单独的子进程来处理 不同的请求,进程之间彼此独立。我这边测试是mysql堵塞方式,大家也可以利用这种方案模拟多个任务执行。有点类似多进程的样子,消耗比较大的~
  
#!/usr/bin/python
# -*- coding: utf-8 -*-
#xiaorui.cc
import MySQLdb, pymysql
import signal, os, sys
                                                                                                                                                                                                                                                                                                  
workers = {}
                                                                                                                                                                                                                                                                                                  
def run():
  con = MySQLdb.connect(host='localhost', db='rui', user='root', passwd='123123')
  signal.signal(signal.SIGTERM, lambda sig, status: sys.exit(0))
  cur = con.cursor()
  cur.execute("SELECT SLEEP(30)")
                                                                                                                                                                                                                                                                                                  
def killall(sig, status):
  for pid in workers.keys():
  os.kill(pid, signal.SIGTERM)
                                                                                                                                                                                                                                                                                                  
def waitall():
  for pid in workers.keys():
  try:
    os.waitpid(pid, 0)
  except:
    print "waitpid: interrupted exception"
                                                                                                                                                                                                                                                                                                  
def main():
  print os.getpid()
  signal.signal(signal.SIGTERM, killall)
  for i in range(3):
  pid = os.fork()
  if pid == 0:
    try:
    run()
    except:
    print "run: interrupted exception"
    sys.exit(0)
  else:
    workers[pid] = 1
  waitall()
                                                                                                                                                                                                                                                                                                  
if __name__ == '__main__':
  main()
  看到这三个sleep30都在跑吗? 看起来生效了,但三个进程还是消耗了130秒。
   DSC0002.jpg
  
   DSC0003.jpg
  
  好了说正题,今天怎么主要说的就是gevent和python下的mysql驱动测试,分享个gevent和pymysql的在一起使用的实例demo ~
   
def goodquery(sql):
  db = pymysql.connect(host = 'localhost', passwd = '123123', user = 'root', db= 'rui')
  cursor = db.cursor()
  data = cursor.execute(sql)
  cursor.close()
  db.close()
  return cursor
sqla='select count(*) from kkk'
sqlb="select * from kkk where name like '%888888%'"
jobs = [gevent.spawn(goodquery, (sqla)),gevent.spawn(goodquery, (sqlb))]
#jobs = [gevent.spawn(goodquery, (sqla)),gevent.spawn(goodquery,(sqlb)),gevent.spawn(goodquery,(sqlb))]
gevent.joinall(jobs, timeout=2)
what_you_want = [job.value for job in jobs]
print what_you_want
for i in what_you_want:
  for a in i:
    print a
  
   DSC0004.jpg
  
  哎,还是有点堵塞。。。 跑了6个耗时3s的sql,共用了18秒的时间。。。。
   DSC0005.jpg
  
  经过一上午的折腾,得知gevent的版本没有用对,只有gevent 1.0 才完美支持socket,然后需要在引入模块的后面,打上别的补丁!
   DSC0006.png
  
gevent.monkey.patch_socket()<span style="background-color: rgb(255, 255, 255);"> </span>
real  0m8.993s
user  0m0.071s
sys   0m0.016s
  在这里我再测试下多线程的版本:
  
import pymysql
import threading
def goodquery(sql):
  db = pymysql.connect(host = 'localhost', passwd = '123123', user = 'root', db= 'rui')
  cursor = db.cursor()
  data = cursor.execute(sql)
  cursor.close()
  db.close()
  print cursor
  return cursor
sqla='select count(*) from kkk'
sqlb="select * from kkk where name like '%888888%'"
#jobs = [gevent.spawn(goodquery,(sqla)),gevent.spawn(goodquery,(sqlb)),gevent.spawn(goodquery,(sqlb)),gevent.spawn(goodquery,(sqlb)),gevent.spawn(goodquery,(sqlb))]
#jobs = [gevent.spawn(goodquery, (sqla)),gevent.spawn(goodquery,(sqlb)),gevent.spawn(goodquery,(sqlb))]
#gevent.joinall(jobs, timeout=30)
#what_you_want = [job.value for job in jobs]
threads=[]
for i in range(5):                                                        
  threads.append( threading.Thread( target=goodquery,args=(sqlb,) ) )
for t in threads:
  t.start()
for t in threads:
  t.join()
  
   DSC0007.jpg
  他的测试结果要比gevent慢点 ~但也是并发的执行,可以在mysql进程里面看到执行的记录 ~
  
[root@101 ~]# time python t.py
real  0m11.122s
user  0m0.095s
sys   0m0.026s<span style="background-color: rgb(255, 255, 255);">  </span>

  总结下:

          gevent pymysql 或者是 threading pymysql 是靠谱的~  是可以解决大数据下的mysql读写堵塞的问题的~   
          但是和sohu、腾讯的朋友讨论了下我的这个方案,mysql堵塞是在与事务处理时发生的。 看来mysql的堵塞不是这么搞解决的,以后有环境后,会继续的追踪这事 ~

关注下面的标签,发现更多相似文章