上山打老虎 发表于 2021-6-26 10:54:35

golang 数据库连接池database/sql 实现原理分析

  golang对数据库的请求,抽象出来一套通用的连接池,用go的机制来说,golang只需要提供一个驱动(driver)的interface,底层不同数据库协议,由用户根据自己的数据库实现对应的驱动即可。
  本文从源码实现的角度,探索这里的细节以及需要避免的坑,基于1.14代码分析,部分bug在1.15中有修复或优化,这里也会提及。
  golang版本:1.14

目录结构说明

└── sql
    ├── convert.go         # 结果行的读取与转换
    ├── convert_test.go
    ├── ctxutil.go         # 绑定上下文的一些通用方法
    ├── doc.txt
    ├── driver               # driver 定义来实现数据库驱动所需要的接口
    │   ├── driver.go
    │   ├── types.go         # 数据类型别名和转换
    │   └── types_test.go
    ├── example_cli_test.go
    ├── example_service_test.go
    ├── example_test.go
    ├── fakedb_test.go
    ├── sql.go               # 通用的接口和类型,包括事物,连接等
    └── sql_test.go
主要数据结构

1. sql.DB

type DB struct {
    // Atomic access only. At top of struct to prevent mis-alignment
    // on 32-bit platforms. Of type time.Duration.
    waitDuration int64          // 等待新的连接所需要的总时间
    connector driver.Connector// 数据库驱动自己实现
    // numClosed is an atomic counter which represents a total number of
    // closed connections. Stmt.openStmt checks it before cleaning closed
    // connections in Stmt.css.
    numClosed uint64         // 关闭的连接数

    mu         sync.Mutex // protects following fields
    freeConn   []*driverConn
    connRequests mapchan connRequest
    nextRequestuint64 // Next key to use in connRequests.
    numOpen      int    // number of opened and pending open connections
    // Used to signal the need for new connections
    // a goroutine running connectionOpener() reads on this chan and
    // maybeOpenNewConnections sends on the chan (one send per needed connection)
    // It is closed during db.Close(). The close tells the connectionOpener
    // goroutine to exit.
    openerCh          chan struct{}      // 用于通知需要创建新的连接
    // resetterCh      chan *driverConn// 已废弃
    closed            bool
    dep               mapdepSet // map[一级对象]map[二级对象]bool,一个外部以来,用于自动关闭
    lastPut         map[*driverConn]string // stacktrace of last conn's put; debug only
    maxIdle         int                  // zero means defaultMaxIdleConns(2); negative means 0
    maxOpen         int                  // <= 0 means unlimited
    maxLifetime       time.Duration          // maximum amount of time a connection may be reused
    cleanerCh         chan struct{}          // 用于通知清理过期的连接,maxlife时间改变或者连接被关闭时会通过该channel通知
    waitCount         int64 // Total number of connections waited for.   // 这些状态数据,可以通过db.Stat() 获取
    maxIdleClosed   int64 // Total number of connections closed due to idle.
    maxLifetimeClosed int64 // Total number of connections closed due to max free limit.

    stop func() // stop cancels the connection opener and the session resetter.
}  sql.DB不是一个连接,它是数据库的抽象接口,也是整个连接池的句柄,对多个goroutine是并发安全的。它可以根据driver打开关闭数据库连接,管理连接池。这对不同的数据库来说都是一样的。

2. sql.driverConn

// driverConn wraps a driver.Conn with a mutex, to
// be held during all calls into the Conn. (including any calls onto
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
type driverConn struct {
   db      *DB
   createdAt time.Time

   sync.Mutex// guards following
   ci          driver.Conn// 由不同的驱动自己实现,对应一条具体的数据库连接
   needReset   bool         // The connection session should be reset before use if true.
   closed      bool         // 当前连接的状态,是否已经关闭
   finalClosed bool         // ci.Close has been called
   openStmt    map[*driverStmt]bool

   // guarded by db.mu
   inUse      bool
   onPut      []func() // code (with db.mu held) run when conn is next returned// 归还连接的时候调用
   dbmuClosed bool   // same as closed, but guarded by db.mu, for removeClosedStmtLocked
}  对单个连接的封装,包含了实际的数据库连接以及相关的状态信息等

3. driver.Conn

// Conn is a connection to a database. It is not used concurrently
// by multiple goroutines.
//
// Conn is assumed to be stateful.
type Conn interface {
   // Prepare returns a prepared statement, bound to this connection.
   Prepare(query string) (Stmt, error)

   // Close invalidates and potentially stops any current
   // prepared statements and transactions, marking this
   // connection as no longer in use.
   //
   // Because the sql package maintains a free pool of
   // connections and only calls Close when there's a surplus of
   // idle connections, it shouldn't be necessary for drivers to
   // do their own connection caching.
   Close() error

   // Begin starts and returns a new transaction.
   //
   // Deprecated: Drivers should implement ConnBeginTx instead (or additionally).
   Begin() (Tx, error)
}  一条具体的数据库连接,需要由不同驱动自己去实现接口

4. driver.Driver

type Driver interface {
    Open(name string) (Conn, error)
}  Driver 只包含一个函数,Open()用来返回一个可用连接,可能是新建立的,也可能是之前缓存的关闭的连接。

5. driver.DriverContext

type DriverContext interface {
// OpenConnector must parse the name in the same format that Driver.Open
// parses the name parameter.
    OpenConnector(name string) (Connector, error)
}  DriverContext 的目的是维护drievr上下文信息,避免了每次新建连接的时候都需要解析一遍 dsn。需要有Driver对象自己去实现。

6. driver.Connector

type Connector interface {
// Connect returns a connection to the database.
// Connect may return a cached connection (one previously
// closed), but doing so is unnecessary; the sql package
// maintains a pool of idle connections for efficient re-use.
//
// The provided context.Context is for dialing purposes only
// (see net.DialContext) and should not be stored or used for
// other purposes.
//
// The returned connection is only used by one goroutine at a
// time.
    Connect(context.Context) (Conn, error)
// Driver returns the underlying Driver of the Connector,
// mainly to maintain compatibility with the Driver method
// on sql.DB.
    Driver() Driver
}  driver.Connector 是driver的插口,是一个接口类型的对象,由不同类型的数据库来实现。
  driver.Connector 包含两个函数。

[*]Connect 用来建立连接
[*]Driver 用来返回一个 Driver 对象,Driver也是个接口类型对象,需要不同的数据库自己去实现。

主要操作流程

1.注册驱动

import (
    _ "github.com/go-sql-driver/mysql"
)

var (
    driversMu sync.RWMutex
    drivers   = make(mapdriver.Driver)
)
func Register(name string, driver driver.Driver) {
    driversMu.Lock()
    defer driversMu.Unlock()
    if driver == nil {
      panic("sql: Register driver is nil")
    }
    if _, dup := drivers; dup {
      panic("sql: Register called twice for driver " + name)
    }
    drivers = driver
}  /database/sql 提供的是一个通用的数据库连接池,当我们连接不同的数据库时,只需要将对应的数据库驱动注册进去就可以使用。
  这里的注册,实际上就是将数据库名称和对应的数据库驱动(数据库连接包装器)添加的一个map中,每个import进来的库,需要在init函数中调用注册函数来实现。

2. 创建连接池句柄 sql.Open()

func Open(driverName, dataSourceName string) (*DB, error) {
    driversMu.RLock()
    driveri, ok := drivers// 1
    driversMu.RUnlock()
    if !ok {
      return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
    }
    if driverCtx, ok := driveri.(driver.DriverContext); ok {// 2
      connector, err := driverCtx.OpenConnector(dataSourceName)
      if err != nil {
            return nil, err
      }
      return OpenDB(connector), nil// 3
    }
    return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil// 4
}

func OpenDB(c driver.Connector) *DB {
   ctx, cancel := context.WithCancel(context.Background())
   db := &DB{
      connector:    c,
      openerCh:   make(chan struct{}, connectionRequestQueueSize),
      lastPut:      make(map[*driverConn]string),
      connRequests: make(mapchan connRequest),
      stop:         cancel,
   }

   go db.connectionOpener(ctx)// 通过channel通知来创建连接
   // go db.connectionResetter(ctx) // 用于重置连接,1.14废弃
   return db
}  Open函数通常解释为初始化db,这里只是通过驱动名称,获取到对应的驱动,并对驱动进行一系列的初始化操作,需要注意的是,Open并不会和db建立连接,只是在操作这些数据结构,启动后台协程之类的动作。
  这里的dataSourceName简称dsn,包含了连接数据库所必须的参数,用户名密码ip端口等信息,由不同的驱动自己实现解析,当然,有些驱动也支持在dsn中配置一些数据库参数,如autocommit等。由于解析字符串得到这些信息会有一定的资源消耗,因此,还提供了对解析后的结果缓存的功能,避免了每次建立新的连接都需要解析一次,要做到这一点,需要驱动实现 driver.DriverContext 接口。
  这个时候你就有了这样一个结构,不过此时的连接池中并没有连接,也就是说没有真正访问db


  
11. 不开启事务,如何固定占用一条连接


  通过前面这些内容,能够发现,在不开启事务的情况下,连接完成一笔请求,回被放回到free池里去,所以哪怕连续执行两条select,也有可能用的不是同一个实际的数据库连接,某些特殊场景,比如我们执行完存储过程,想要select输出型结果时,这里就不满足要求。
  简化下需求,其实是我们想要长时间占用一个连接,开启事务是一种解决方案,不过额外引入事务,可能会造成锁的延迟释放(以mysql两阶段锁为例), 这里可以用Context方法来实现,用法举例
{
   var a int
   ctx := context.Background()
   cn, err := db.Conn(ctx)// 绑定一个连接
   if err != nil {
      return
   }

   // 执行第一次查询,将连接所有权转交给rows1
   rows1, err := cn.QueryContext(ctx, "select * from t1")
   if err != nil {
      return
   }
   _ = rows1.Scan(&a)
   _ = rows1.Close() // rows1 close,将连接所有权交给cn

   // 执行第二次查询,将连接所有权转交给rows2
   rows2, err = cn.QueryContext(ctx, "select * from t1")
   if err != nil {
      return
   }
   _ = rows2.Scan(&a)
   _ = rows2.Close() // rows1 close,将连接所有权交给cn

   // cn close,连接回收,放回free队列
   _ = cn.Close()
}  关于db.Conn( ) 返回的sql.Conn对象,需要和driver.Conn 做区分,sql.Conn 是对driverConn的再一次封装,是为里提供连续的单个数据库连接,driver.Conn 是不同驱动要实现的接口
// Conn represents a single database connection rather than a pool of database
// connections. Prefer running queries from DB unless there is a specific
// need for a continuous single database connection.
//
// A Conn must call Close to return the connection to the database pool
// and may do so concurrently with a running query.
//
// After a call to Close, all operations on the
// connection fail with ErrConnDone.

type Conn struct {
   db *DB

   // closemu prevents the connection from closing while there
   // is an active query. It is held for read during queries
   // and exclusively during close.
   closemu sync.RWMutex

   // dc is owned until close, at which point
   // it's returned to the connection pool.
   dc *driverConn

   // done transitions from 0 to 1 exactly once, on close.
   // Once done, all operations fail with ErrConnDone.
   // Use atomic operations on value when checking value.
   done int32
}  
12. 监控连接池状态


  由于mysql协议是同步的,因此,当客户端游大量的并发请求,但是连接数要小于并发数的情况下,是会有一部分请求被阻塞,等待其它请求释放连接,在某些场景或使用不当的情况下,这里也可能会成为瓶颈。不过库中并没有详细记录每一笔请求的连接等待时间,只提供了累计的等待时间之和,以及其它的监控指标,在定位问题时可以用做参考。
  库提供了 db.Stats( ) 方法,会从db对象中获取所有的监控指标,并生成对象 DBStats 对象
func (db *DB) Stats() DBStats {
   wait := atomic.LoadInt64(&db.waitDuration)

   db.mu.Lock()
   defer db.mu.Unlock()

   stats := DBStats{
      MaxOpenConnections: db.maxOpen,

      Idle:            len(db.freeConn),
      OpenConnections: db.numOpen,
      InUse:         db.numOpen - len(db.freeConn),

      WaitCount:         db.waitCount,
      WaitDuration:      time.Duration(wait),
      MaxIdleClosed:   db.maxIdleClosed,
      MaxLifetimeClosed: db.maxLifetimeClosed,
   }
   return stats
}  一个简单的使用例子
func monitorConn(db *sql.DB) {
   go func(db *sql.DB) {
      mt := time.NewTicker(monitorDbInterval * time.Second)
      for {
         select {
         case <-mt.C:
            stat := db.Stats()
            logutil.Errorf("monitor db conn(%p): maxopen(%d), open(%d), use(%d), idle(%d), "+
               "wait(%d), idleClose(%d), lifeClose(%d), totalWait(%v)",
               db,
               stat.MaxOpenConnections, stat.OpenConnections,
               stat.InUse, stat.Idle,
               stat.WaitCount, stat.MaxIdleClosed,
               stat.MaxLifetimeClosed, stat.WaitDuration)
         }
      }
   }(db)
}  需要注意的是,1.15 之前,对 stat.MaxLifetimeClosed 对象统计会有异常,1.15 之后做了修复。

Attention


[*]注意连接所有者的传递关系,使用完成后要及时回收,如rows.Close(),row.Scan()等,不回收会造成连接泄漏,新的请求会被一直阻塞
[*]尽量避免使用占位符的方式执行sql,推荐自己完成sql的拼接或正常使用stmt
[*]1.15 后支持了对单个连接空闲时间的限制
[*]db.Conn( ) 能够持续占用一条连接,但是在该连接中,就没有办法调用之前prepare生成的stmt,但是在事务中可以,tx.Stmt( )可以生成特定于该事务的stmt
[*]go提供了数据库连接池回收策略,是针对freeConn的,换句话说,连接如果被一直占用,哪怕已经超过了生存时间,也不会被回收
[*]我们注意到,每次对连接池操作时,都要先加一把全局大锁,因此,当连接数较多(>1000),且请求量较大时,会存在较为严重的锁竞争,这一点通过top(sys)指标,以及pprof也能发现,因为,一个简单的方式,是将一个大的连接池拆分为多个小的连接池,一般情况下,通过简单的轮询将请求打散在多个连接池上,能有效降低锁的粒度
  
【完】
页: [1]
查看完整版本: golang 数据库连接池database/sql 实现原理分析