评论

收藏

[MySQL] golang 数据库连接池database/sql 实现原理分析

数据库 数据库 发布于:2021-06-26 10:54 | 阅读数:1010 | 评论:0

  golang对数据库的请求,抽象出来一套通用的连接池,用go的机制来说,golang只需要提供一个驱动(driver)的interface,底层不同数据库协议,由用户根据自己的数据库实现对应的驱动即可。
  本文从源码实现的角度,探索这里的细节以及需要避免的坑,基于1.14代码分析,部分bug在1.15中有修复或优化,这里也会提及。
  golang版本:1.14

目录结构说明
└── sql
  ├── convert.go       # 结果行的读取与转换
  ├── convert_test.go
  ├── ctxutil.go       # 绑定上下文的一些通用方法
  ├── doc.txt
  ├── driver         # driver 定义来实现数据库驱动所需要的接口
  │   ├── driver.go
  │   ├── types.go     # 数据类型别名和转换
  │   └── types_test.go
  ├── example_cli_test.go
  ├── example_service_test.go
  ├── example_test.go
  ├── fakedb_test.go
  ├── sql.go         # 通用的接口和类型,包括事物,连接等
  └── sql_test.go
主要数据结构

1. sql.DB
type DB struct {
  // Atomic access only. At top of struct to prevent mis-alignment
  // on 32-bit platforms. Of type time.Duration.
  waitDuration int64      // 等待新的连接所需要的总时间
  connector driver.Connector  // 数据库驱动自己实现
  // numClosed is an atomic counter which represents a total number of
  // closed connections. Stmt.openStmt checks it before cleaning closed
  // connections in Stmt.css.
  numClosed uint64       // 关闭的连接数
  mu       sync.Mutex // protects following fields
  freeConn   []*driverConn
  connRequests map[uint64]chan connRequest
  nextRequest  uint64 // Next key to use in connRequests.
  numOpen    int  // number of opened and pending open connections
  // Used to signal the need for new connections
  // a goroutine running connectionOpener() reads on this chan and
  // maybeOpenNewConnections sends on the chan (one send per needed connection)
  // It is closed during db.Close(). The close tells the connectionOpener
  // goroutine to exit.
  openerCh      chan struct{}    // 用于通知需要创建新的连接
  // resetterCh    chan *driverConn  // 已废弃
  closed      bool
  dep         map[finalCloser]depSet // map[一级对象]map[二级对象]bool,一个外部以来,用于自动关闭
  lastPut       map[*driverConn]string // stacktrace of last conn's put; debug only
  maxIdle       int          // zero means defaultMaxIdleConns(2); negative means 0
  maxOpen       int          // <= 0 means unlimited
  maxLifetime     time.Duration      // maximum amount of time a connection may be reused
  cleanerCh     chan struct{}      // 用于通知清理过期的连接,maxlife时间改变或者连接被关闭时会通过该channel通知
  waitCount     int64 // Total number of connections waited for.   // 这些状态数据,可以通过db.Stat() 获取
  maxIdleClosed   int64 // Total number of connections closed due to idle.
  maxLifetimeClosed int64 // Total number of connections closed due to max free limit.
  stop func() // stop cancels the connection opener and the session resetter.
}
  sql.DB不是一个连接,它是数据库的抽象接口,也是整个连接池的句柄,对多个goroutine是并发安全的。它可以根据driver打开关闭数据库连接,管理连接池。这对不同的数据库来说都是一样的。

2. sql.driverConn
// driverConn wraps a driver.Conn with a mutex, to
// be held during all calls into the Conn. (including any calls onto
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
type driverConn struct {
   db    *DB
   createdAt time.Time
   sync.Mutex  // guards following
   ci      driver.Conn  // 由不同的驱动自己实现,对应一条具体的数据库连接
   needReset   bool     // The connection session should be reset before use if true.
   closed    bool     // 当前连接的状态,是否已经关闭
   finalClosed bool     // ci.Close has been called
   openStmt  map[*driverStmt]bool
   // guarded by db.mu
   inUse    bool
   onPut    []func() // code (with db.mu held) run when conn is next returned  // 归还连接的时候调用
   dbmuClosed bool   // same as closed, but guarded by db.mu, for removeClosedStmtLocked
}
  对单个连接的封装,包含了实际的数据库连接以及相关的状态信息等

3. driver.Conn
// Conn is a connection to a database. It is not used concurrently
// by multiple goroutines.
//
// Conn is assumed to be stateful.
type Conn interface {
   // Prepare returns a prepared statement, bound to this connection.
   Prepare(query string) (Stmt, error)
   // Close invalidates and potentially stops any current
   // prepared statements and transactions, marking this
   // connection as no longer in use.
   //
   // Because the sql package maintains a free pool of
   // connections and only calls Close when there's a surplus of
   // idle connections, it shouldn't be necessary for drivers to
   // do their own connection caching.
   Close() error
   // Begin starts and returns a new transaction.
   //
   // Deprecated: Drivers should implement ConnBeginTx instead (or additionally).
   Begin() (Tx, error)
}
  一条具体的数据库连接,需要由不同驱动自己去实现接口

4. driver.Driver
type Driver interface {
  Open(name string) (Conn, error)
}
  Driver 只包含一个函数,Open()用来返回一个可用连接,可能是新建立的,也可能是之前缓存的关闭的连接。

5. driver.DriverContext
type DriverContext interface {
// OpenConnector must parse the name in the same format that Driver.Open
// parses the name parameter.
  OpenConnector(name string) (Connector, error)
}
  DriverContext 的目的是维护drievr上下文信息,避免了每次新建连接的时候都需要解析一遍 dsn。需要有Driver对象自己去实现。

6. driver.Connector
type Connector interface {
// Connect returns a connection to the database.
// Connect may return a cached connection (one previously
// closed), but doing so is unnecessary; the sql package
// maintains a pool of idle connections for efficient re-use.
//
// The provided context.Context is for dialing purposes only
// (see net.DialContext) and should not be stored or used for
// other purposes.
//
// The returned connection is only used by one goroutine at a
// time.
  Connect(context.Context) (Conn, error)
// Driver returns the underlying Driver of the Connector,
// mainly to maintain compatibility with the Driver method
// on sql.DB.
  Driver() Driver
}
  driver.Connector 是driver的插口,是一个接口类型的对象,由不同类型的数据库来实现。
  driver.Connector 包含两个函数。

  • Connect 用来建立连接
  • Driver 用来返回一个 Driver 对象,Driver也是个接口类型对象,需要不同的数据库自己去实现。

主要操作流程

1.  注册驱动
import (
  _ "github.com/go-sql-driver/mysql"
)
var (
  driversMu sync.RWMutex
  drivers   = make(map[string]driver.Driver)
)
func Register(name string, driver driver.Driver) {
  driversMu.Lock()
  defer driversMu.Unlock()
  if driver == nil {
    panic("sql: Register driver is nil")
  }
  if _, dup := drivers[name]; dup {
    panic("sql: Register called twice for driver " + name)
  }
  drivers[name] = driver
}
  /database/sql 提供的是一个通用的数据库连接池,当我们连接不同的数据库时,只需要将对应的数据库驱动注册进去就可以使用。
  这里的注册,实际上就是将数据库名称和对应的数据库驱动(数据库连接包装器)添加的一个map中,每个import进来的库,需要在init函数中调用注册函数来实现。

2. 创建连接池句柄 sql.Open()
func Open(driverName, dataSourceName string) (*DB, error) {
  driversMu.RLock()
  driveri, ok := drivers[driverName]  // 1
  driversMu.RUnlock()
  if !ok {
    return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
  }
  if driverCtx, ok := driveri.(driver.DriverContext); ok {  // 2
    connector, err := driverCtx.OpenConnector(dataSourceName)
    if err != nil {
      return nil, err
    }
    return OpenDB(connector), nil  // 3
  }
  return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil  // 4
}
func OpenDB(c driver.Connector) *DB {
   ctx, cancel := context.WithCancel(context.Background())
   db := &DB{
    connector:  c,
    openerCh:   make(chan struct{}, connectionRequestQueueSize),
    lastPut:    make(map[*driverConn]string),
    connRequests: make(map[uint64]chan connRequest),
    stop:     cancel,
   }
   go db.connectionOpener(ctx)  // 通过channel通知来创建连接
   // go db.connectionResetter(ctx) // 用于重置连接,1.14废弃
   return db
}
  Open函数通常解释为初始化db,这里只是通过驱动名称,获取到对应的驱动,并对驱动进行一系列的初始化操作,需要注意的是,Open并不会和db建立连接,只是在操作这些数据结构,启动后台协程之类的动作。
  这里的dataSourceName简称dsn,包含了连接数据库所必须的参数,用户名密码ip端口等信息,由不同的驱动自己实现解析,当然,有些驱动也支持在dsn中配置一些数据库参数,如autocommit等。由于解析字符串得到这些信息会有一定的资源消耗,因此,还提供了对解析后的结果缓存的功能,避免了每次建立新的连接都需要解析一次,要做到这一点,需要驱动实现 driver.DriverContext 接口。
  这个时候你就有了这样一个结构,不过此时的连接池中并没有连接,也就是说没有真正访问db

DSC0000.png
  
11. 不开启事务,如何固定占用一条连接


  通过前面这些内容,能够发现,在不开启事务的情况下,连接完成一笔请求,回被放回到free池里去,所以哪怕连续执行两条select,也有可能用的不是同一个实际的数据库连接,某些特殊场景,比如我们执行完存储过程,想要select输出型结果时,这里就不满足要求。
  简化下需求,其实是我们想要长时间占用一个连接,开启事务是一种解决方案,不过额外引入事务,可能会造成锁的延迟释放(以mysql两阶段锁为例), 这里可以用Context方法来实现,用法举例
{
   var a int
   ctx := context.Background()
   cn, err := db.Conn(ctx)  // 绑定一个连接
   if err != nil {
    return
   }
   // 执行第一次查询,将连接所有权转交给rows1
   rows1, err := cn.QueryContext(ctx, "select * from t1")
   if err != nil {
    return
   }
   _ = rows1.Scan(&a)
   _ = rows1.Close() // rows1 close,将连接所有权交给cn 
   // 执行第二次查询,将连接所有权转交给rows2
   rows2, err = cn.QueryContext(ctx, "select * from t1")
   if err != nil {
    return
   }
   _ = rows2.Scan(&a)
   _ = rows2.Close() // rows1 close,将连接所有权交给cn
   // cn close,连接回收,放回free队列
   _ = cn.Close()
}
  关于db.Conn( ) 返回的sql.Conn对象,需要和driver.Conn 做区分,sql.Conn 是对driverConn的再一次封装,是为里提供连续的单个数据库连接,driver.Conn 是不同驱动要实现的接口
// Conn represents a single database connection rather than a pool of database
// connections. Prefer running queries from DB unless there is a specific
// need for a continuous single database connection.
//
// A Conn must call Close to return the connection to the database pool
// and may do so concurrently with a running query.
//
// After a call to Close, all operations on the
// connection fail with ErrConnDone.
type Conn struct {
   db *DB
   // closemu prevents the connection from closing while there
   // is an active query. It is held for read during queries
   // and exclusively during close.
   closemu sync.RWMutex
   // dc is owned until close, at which point
   // it's returned to the connection pool.
   dc *driverConn
   // done transitions from 0 to 1 exactly once, on close.
   // Once done, all operations fail with ErrConnDone.
   // Use atomic operations on value when checking value.
   done int32
}
  
12. 监控连接池状态


  由于mysql协议是同步的,因此,当客户端游大量的并发请求,但是连接数要小于并发数的情况下,是会有一部分请求被阻塞,等待其它请求释放连接,在某些场景或使用不当的情况下,这里也可能会成为瓶颈。不过库中并没有详细记录每一笔请求的连接等待时间,只提供了累计的等待时间之和,以及其它的监控指标,在定位问题时可以用做参考。
  库提供了 db.Stats( ) 方法,会从db对象中获取所有的监控指标,并生成对象 DBStats 对象
func (db *DB) Stats() DBStats {
   wait := atomic.LoadInt64(&db.waitDuration)
   db.mu.Lock()
   defer db.mu.Unlock()
   stats := DBStats{
    MaxOpenConnections: db.maxOpen,
    Idle:      len(db.freeConn),
    OpenConnections: db.numOpen,
    InUse:       db.numOpen - len(db.freeConn),
    WaitCount:     db.waitCount,
    WaitDuration:    time.Duration(wait),
    MaxIdleClosed:   db.maxIdleClosed,
    MaxLifetimeClosed: db.maxLifetimeClosed,
   }
   return stats
}
  一个简单的使用例子
func monitorConn(db *sql.DB) {
   go func(db *sql.DB) {
    mt := time.NewTicker(monitorDbInterval * time.Second)
    for {
     select {
     case <-mt.C:
      stat := db.Stats()
      logutil.Errorf("monitor db conn(%p): maxopen(%d), open(%d), use(%d), idle(%d), "+
         "wait(%d), idleClose(%d), lifeClose(%d), totalWait(%v)",
         db,
         stat.MaxOpenConnections, stat.OpenConnections,
         stat.InUse, stat.Idle,
         stat.WaitCount, stat.MaxIdleClosed,
         stat.MaxLifetimeClosed, stat.WaitDuration)
     }
    }
   }(db)
}
  需要注意的是,1.15 之前,对 stat.MaxLifetimeClosed 对象统计会有异常,1.15 之后做了修复。

Attention


  • 注意连接所有者的传递关系,使用完成后要及时回收,如rows.Close(),row.Scan()等,不回收会造成连接泄漏,新的请求会被一直阻塞
  • 尽量避免使用占位符的方式执行sql,推荐自己完成sql的拼接或正常使用stmt
  • 1.15 后支持了对单个连接空闲时间的限制
  • db.Conn( ) 能够持续占用一条连接,但是在该连接中,就没有办法调用之前prepare生成的stmt,但是在事务中可以,tx.Stmt( )可以生成特定于该事务的stmt
  • go提供了数据库连接池回收策略,是针对freeConn的,换句话说,连接如果被一直占用,哪怕已经超过了生存时间,也不会被回收
  • 我们注意到,每次对连接池操作时,都要先加一把全局大锁,因此,当连接数较多(>1000),且请求量较大时,会存在较为严重的锁竞争,这一点通过top(sys)指标,以及pprof也能发现,因为,一个简单的方式,是将一个大的连接池拆分为多个小的连接池,一般情况下,通过简单的轮询将请求打散在多个连接池上,能有效降低锁的粒度
  
【完】
关注下面的标签,发现更多相似文章