golang database/sql connection pool: how it is implemented
Go abstracts database access behind a generic connection pool: in Go terms, the standard library only defines a driver interface, and the concrete wire protocol of each database is implemented by a driver that the user plugs in for their database. This article explores the details, and the pitfalls to avoid, from the source code. The analysis is based on the Go 1.14 code; some bugs were fixed or improved in 1.15, and those are mentioned where relevant.
Go version: 1.14
Package layout
└── sql
├── convert.go # reading and converting result rows
├── convert_test.go
├── ctxutil.go # generic context-aware helper wrappers
├── doc.txt
├── driver # defines the interfaces a database driver must implement
│ ├── driver.go
│ ├── types.go # data type aliases and conversions
│ └── types_test.go
├── example_cli_test.go
├── example_service_test.go
├── example_test.go
├── fakedb_test.go
├── sql.go # the generic interfaces and types: transactions, connections, etc.
└── sql_test.go
Key data structures
1. sql.DB
type DB struct {
	// Atomic access only. At top of struct to prevent mis-alignment
	// on 32-bit platforms. Of type time.Duration.
	waitDuration int64 // total time spent waiting for new connections

	connector driver.Connector // implemented by the database driver itself
	// numClosed is an atomic counter which represents a total number of
	// closed connections. Stmt.openStmt checks it before cleaning closed
	// connections in Stmt.css.
	numClosed uint64 // number of closed connections

	mu           sync.Mutex // protects following fields
	freeConn     []*driverConn
	connRequests map[uint64]chan connRequest
	nextRequest  uint64 // Next key to use in connRequests.
	numOpen      int    // number of opened and pending open connections
	// Used to signal the need for new connections
	// a goroutine running connectionOpener() reads on this chan and
	// maybeOpenNewConnections sends on the chan (one send per needed connection)
	// It is closed during db.Close(). The close tells the connectionOpener
	// goroutine to exit.
	openerCh chan struct{} // used to signal that new connections need to be created
	// resetterCh chan *driverConn // removed in Go 1.14
	closed  bool
	dep     map[finalCloser]depSet // map[owner]map[dependency]bool, dependency tracking used to close objects automatically
	lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
	maxIdle int                    // zero means defaultMaxIdleConns (2); negative means 0
	maxOpen int                    // <= 0 means unlimited
	maxLifetime time.Duration      // maximum amount of time a connection may be reused
	cleanerCh   chan struct{}      // signals cleanup of expired connections; notified when maxLifetime changes or a connection is closed
	waitCount         int64 // Total number of connections waited for. // all of these counters are exposed via db.Stats()
	maxIdleClosed     int64 // Total number of connections closed due to idle.
	maxLifetimeClosed int64 // Total number of connections closed due to the max lifetime limit.

	stop func() // stop cancels the connection opener and the session resetter.
}
sql.DB is not a connection. It is the abstract handle to a database and to the whole connection pool, and it is safe for concurrent use by multiple goroutines. It opens and closes the underlying connections through the driver and manages the pool, and it behaves the same way regardless of which database sits underneath.
2. sql.driverConn
// driverConn wraps a driver.Conn with a mutex, to
// be held during all calls into the Conn. (including any calls onto
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
type driverConn struct {
db *DB
createdAt time.Time
sync.Mutex // guards following
ci driver.Conn // the concrete database connection, implemented by each driver
needReset bool // The connection session should be reset before use if true.
closed bool // whether this connection has already been closed
finalClosed bool // ci.Close has been called
openStmt map[*driverStmt]bool
// guarded by db.mu
inUse bool
onPut []func() // code (with db.mu held) run when conn is next returned to the pool
dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked
}
driverConn wraps a single connection: the actual driver connection plus the state associated with it.
3. driver.Conn
// Conn is a connection to a database. It is not used concurrently
// by multiple goroutines.
//
// Conn is assumed to be stateful.
type Conn interface {
// Prepare returns a prepared statement, bound to this connection.
Prepare(query string) (Stmt, error)
// Close invalidates and potentially stops any current
// prepared statements and transactions, marking this
// connection as no longer in use.
//
// Because the sql package maintains a free pool of
// connections and only calls Close when there's a surplus of
// idle connections, it shouldn't be necessary for drivers to
// do their own connection caching.
Close() error
// Begin starts and returns a new transaction.
//
// Deprecated: Drivers should implement ConnBeginTx instead (or additionally).
Begin() (Tx, error)
}
A concrete database connection; each driver implements this interface itself.
4. driver.Driver
type Driver interface {
Open(name string) (Conn, error)
}
Driver has a single method. Open() returns a usable connection, which may be newly established or one previously cached by the driver.
5. driver.DriverContext
type DriverContext interface {
// OpenConnector must parse the name in the same format that Driver.Open
// parses the name parameter.
OpenConnector(name string) (Connector, error)
}
DriverContext exists to hold driver-level context so the dsn does not have to be re-parsed every time a new connection is created. It is up to the Driver object itself to implement it.
6. driver.Connector
type Connector interface {
// Connect returns a connection to the database.
// Connect may return a cached connection (one previously
// closed), but doing so is unnecessary; the sql package
// maintains a pool of idle connections for efficient re-use.
//
// The provided context.Context is for dialing purposes only
// (see net.DialContext) and should not be stored or used for
// other purposes.
//
// The returned connection is only used by one goroutine at a
// time.
Connect(context.Context) (Conn, error)
// Driver returns the underlying Driver of the Connector,
// mainly to maintain compatibility with the Driver method
// on sql.DB.
Driver() Driver
}
driver.Connector is the plug-in point of a driver: an interface implemented by each kind of database. It has two methods:
- Connect establishes a connection.
- Driver returns the underlying Driver object; Driver is itself an interface that each database implements.
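To make the relationship between these three interfaces concrete, here is a minimal sketch of a hypothetical driver. All the names here (fakeDriver, fakeConnector, fakeConn, parseDSN, dsnConfig) are made up for illustration; real drivers such as github.com/go-sql-driver/mysql follow the same shape.
// Package fakedriver sketches the driver-side interfaces; nothing here talks to a real database.
package fakedriver

import (
	"context"
	"database/sql/driver"
	"errors"
)

type dsnConfig struct{ addr string } // hypothetical parsed DSN

// parseDSN stands in for a real DSN parser.
func parseDSN(name string) (dsnConfig, error) {
	if name == "" {
		return dsnConfig{}, errors.New("empty dsn")
	}
	return dsnConfig{addr: name}, nil
}

// fakeConn satisfies driver.Conn with stub behaviour.
type fakeConn struct{ cfg dsnConfig }

func (c *fakeConn) Prepare(query string) (driver.Stmt, error) { return nil, errors.New("not implemented") }
func (c *fakeConn) Close() error                              { return nil }
func (c *fakeConn) Begin() (driver.Tx, error)                 { return nil, errors.New("not implemented") }

// fakeDriver implements driver.Driver and driver.DriverContext.
type fakeDriver struct{}

// Open is the legacy path: it has to parse the DSN on every call.
func (d fakeDriver) Open(name string) (driver.Conn, error) {
	c, err := d.OpenConnector(name)
	if err != nil {
		return nil, err
	}
	return c.Connect(context.Background())
}

// OpenConnector parses the DSN once; database/sql then calls
// Connector.Connect for every new pool connection, skipping the parse.
func (d fakeDriver) OpenConnector(name string) (driver.Connector, error) {
	cfg, err := parseDSN(name)
	if err != nil {
		return nil, err
	}
	return &fakeConnector{cfg: cfg, d: d}, nil
}

// fakeConnector implements driver.Connector.
type fakeConnector struct {
	cfg dsnConfig
	d   fakeDriver
}

// Connect would dial the database here; the ctx is for dialing only and must not be stored.
func (c *fakeConnector) Connect(ctx context.Context) (driver.Conn, error) {
	return &fakeConn{cfg: c.cfg}, nil
}

func (c *fakeConnector) Driver() driver.Driver { return c.d }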
Main operations
1. Registering a driver
import (
_ "github.com/go-sql-driver/mysql"
)
var (
driversMu sync.RWMutex
drivers = make(map[string]driver.Driver)
)
func Register(name string, driver driver.Driver) {
driversMu.Lock()
defer driversMu.Unlock()
if driver == nil {
panic("sql: Register driver is nil")
}
if _, dup := drivers[name]; dup {
panic("sql: Register called twice for driver " + name)
}
drivers[name] = driver
}
database/sql provides a generic connection pool; to talk to a particular database, you only need to register the corresponding driver.
Registration simply adds the driver name and the corresponding driver object (the wrapper around database connections) to a map. Each imported driver package does this by calling the Register function from its init function.
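As a sketch of what such a driver package looks like (the package and type names here are illustrative; go-sql-driver/mysql does essentially the same thing by calling sql.Register("mysql", &MySQLDriver{}) in its init):
package mydriver

import (
	"database/sql"
	"database/sql/driver"
	"errors"
)

type myDriver struct{}

// Open must exist to satisfy driver.Driver; it is left as a stub in this sketch.
func (myDriver) Open(name string) (driver.Conn, error) {
	return nil, errors.New("not implemented in this sketch")
}

func init() {
	// Runs when the package is imported, e.g. via `import _ "example.com/mydriver"`.
	// Registering the same name twice panics.
	sql.Register("mydriver", myDriver{})
}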
2. Creating the pool handle: sql.Open()
func Open(driverName, dataSourceName string) (*DB, error) {
	driversMu.RLock()
	driveri, ok := drivers[driverName] // 1
	driversMu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
	}
	if driverCtx, ok := driveri.(driver.DriverContext); ok { // 2
		connector, err := driverCtx.OpenConnector(dataSourceName)
		if err != nil {
			return nil, err
		}
		return OpenDB(connector), nil // 3
	}
	return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil // 4
}
func OpenDB(c driver.Connector) *DB {
	ctx, cancel := context.WithCancel(context.Background())
	db := &DB{
		connector:    c,
		openerCh:     make(chan struct{}, connectionRequestQueueSize),
		lastPut:      make(map[*driverConn]string),
		connRequests: make(map[uint64]chan connRequest),
		stop:         cancel,
	}
	go db.connectionOpener(ctx) // creates connections when signalled over the channel
	// go db.connectionResetter(ctx) // session resetter, removed in Go 1.14
	return db
}
Open is usually described as "initializing the db", but all it does is look up the driver by name and run a series of initialization steps on it. Note that Open does not establish any connection to the database; it only sets up these data structures and starts background goroutines.
The dataSourceName (dsn for short) contains the parameters required to connect: user name, password, IP, port, and so on. Each driver parses it itself, and some drivers also allow database parameters such as autocommit to be set in the dsn. Because parsing the string has a cost, the package also supports caching the parsed result so that it is not re-parsed for every new connection; for that, the driver has to implement the driver.DriverContext interface.
At this point you have the structure described above, but the pool holds no connections yet; in other words, nothing has actually touched the database.
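A typical application-side sketch, assuming the go-sql-driver/mysql driver and its DSN format; note that it is Ping/PingContext (or the first query), not Open, that actually dials the database:
package main

import (
	"context"
	"database/sql"
	"log"
	"time"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// Open only validates the driver name and builds the pool handle;
	// no network connection is made yet.
	db, err := sql.Open("mysql", "user:pass@tcp(127.0.0.1:3306)/testdb?charset=utf8mb4")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Pool sizing knobs discussed in this article.
	db.SetMaxOpenConns(100)                 // maxOpen
	db.SetMaxIdleConns(10)                  // maxIdle (size of the free pool)
	db.SetConnMaxLifetime(30 * time.Minute) // maxLifetime
	// db.SetConnMaxIdleTime(5 * time.Minute) // per-connection idle limit, Go 1.15+

	// Connections are created lazily; Ping forces the first one now.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	if err := db.PingContext(ctx); err != nil {
		log.Fatal(err)
	}
}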
11. Holding onto a single connection without opening a transaction
From the preceding sections you can see that, without a transaction, a connection is put back into the free pool as soon as a request finishes. So even two consecutive selects may not run on the same underlying database connection. In some special scenarios, for example selecting the output results right after executing a stored procedure, that does not meet the requirement.
Simplified, the requirement is to occupy one connection for an extended period. Opening a transaction is one solution, but introducing a transaction just for this may delay lock release (take MySQL's two-phase locking as an example). Instead, this can be done with the Context-based API, for example:
{
	var a int
	ctx := context.Background()
	cn, err := db.Conn(ctx) // pin one connection
	if err != nil {
		return
	}
	// First query: ownership of the connection is handed to rows1.
	rows1, err := cn.QueryContext(ctx, "select * from t1")
	if err != nil {
		return
	}
	for rows1.Next() {
		_ = rows1.Scan(&a)
	}
	_ = rows1.Close() // rows1 closed, ownership of the connection goes back to cn
	// Second query: ownership of the connection is handed to rows2.
	rows2, err := cn.QueryContext(ctx, "select * from t1")
	if err != nil {
		return
	}
	for rows2.Next() {
		_ = rows2.Scan(&a)
	}
	_ = rows2.Close() // rows2 closed, ownership of the connection goes back to cn
	// cn closed, the connection is recycled back into the free pool
	_ = cn.Close()
}
Note the difference between the sql.Conn returned by db.Conn() and driver.Conn: sql.Conn is another wrapper around driverConn, provided to give callers a continuous single database connection, whereas driver.Conn is the interface that each driver has to implement.
// Conn represents a single database connection rather than a pool of database
// connections. Prefer running queries from DB unless there is a specific
// need for a continuous single database connection.
//
// A Conn must call Close to return the connection to the database pool
// and may do so concurrently with a running query.
//
// After a call to Close, all operations on the
// connection fail with ErrConnDone.
type Conn struct {
db *DB
// closemu prevents the connection from closing while there
// is an active query. It is held for read during queries
// and exclusively during close.
closemu sync.RWMutex
// dc is owned until close, at which point
// it's returned to the connection pool.
dc *driverConn
// done transitions from 0 to 1 exactly once, on close.
// Once done, all operations fail with ErrConnDone.
// Use atomic operations on value when checking value.
done int32
}
12. Monitoring pool state
Because the MySQL protocol is synchronous, when a client has many concurrent requests but fewer connections than the level of concurrency, some requests will block waiting for other requests to release a connection. In certain scenarios, or when the pool is used incorrectly, this can become a bottleneck. The package does not record the connection wait time of every individual request; it only exposes the accumulated total wait time plus a few other metrics, which can still serve as a reference when diagnosing problems.
The package provides the db.Stats() method, which collects all of the metrics from the db object and returns them as a DBStats value.
func (db *DB) Stats() DBStats {
wait := atomic.LoadInt64(&db.waitDuration)
db.mu.Lock()
defer db.mu.Unlock()
stats := DBStats{
MaxOpenConnections: db.maxOpen,
Idle: len(db.freeConn),
OpenConnections: db.numOpen,
InUse: db.numOpen - len(db.freeConn),
WaitCount: db.waitCount,
WaitDuration: time.Duration(wait),
MaxIdleClosed: db.maxIdleClosed,
MaxLifetimeClosed: db.maxLifetimeClosed,
}
return stats
}
A simple usage example:
func monitorConn(db *sql.DB) {
go func(db *sql.DB) {
mt := time.NewTicker(monitorDbInterval * time.Second)
for {
select {
case <-mt.C:
stat := db.Stats()
logutil.Errorf("monitor db conn(%p): maxopen(%d), open(%d), use(%d), idle(%d), "+
"wait(%d), idleClose(%d), lifeClose(%d), totalWait(%v)",
db,
stat.MaxOpenConnections, stat.OpenConnections,
stat.InUse, stat.Idle,
stat.WaitCount, stat.MaxIdleClosed,
stat.MaxLifetimeClosed, stat.WaitDuration)
}
}
}(db)
}
Note that before Go 1.15 the stat.MaxLifetimeClosed counter could be inaccurate; this was fixed in 1.15.
Attention
- Pay attention to how ownership of a connection is passed around, and release it promptly when done (rows.Close(), row.Scan(), and so on). Failing to do so leaks the connection, and new requests will block indefinitely.
- Try to avoid executing SQL via placeholders; either build the SQL string yourself or use a prepared stmt explicitly.
- Go 1.15 adds a limit on how long an individual connection may sit idle (SetConnMaxIdleTime).
- db.Conn() lets you keep occupying one connection, but on that connection you cannot use a stmt prepared earlier at the DB level. Inside a transaction you can: tx.Stmt() produces a stmt bound to that transaction.
- The pool's recycling policy only applies to freeConn. In other words, a connection that is held continuously will not be reclaimed even after it exceeds its maximum lifetime.
- Every operation on the pool first takes a single global mutex, so with a large number of connections (>1000) and heavy traffic there is noticeable lock contention, visible through the sys CPU share in top and through pprof. A simple mitigation is to split one big pool into several smaller pools and spread requests across them, usually with plain round-robin, which effectively reduces the lock granularity; see the sketch below.
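A minimal sketch of that last idea, splitting one logical pool into several *sql.DB handles and picking one per request with a round-robin counter (multiPool and its helpers are hypothetical, not part of database/sql):
package dbpool

import (
	"context"
	"database/sql"
	"sync/atomic"
)

// multiPool spreads load over several independent *sql.DB handles so that
// each pool's global mutex protects fewer connections.
type multiPool struct {
	pools []*sql.DB
	next  uint64
}

func newMultiPool(driverName, dsn string, n, maxOpenPerPool int) (*multiPool, error) {
	mp := &multiPool{}
	for i := 0; i < n; i++ {
		db, err := sql.Open(driverName, dsn)
		if err != nil {
			return nil, err
		}
		db.SetMaxOpenConns(maxOpenPerPool)
		mp.pools = append(mp.pools, db)
	}
	return mp, nil
}

// pick chooses a pool with a simple atomic round-robin counter.
func (mp *multiPool) pick() *sql.DB {
	n := atomic.AddUint64(&mp.next, 1)
	return mp.pools[n%uint64(len(mp.pools))]
}

// QueryContext forwards the query to one of the underlying pools.
func (mp *multiPool) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
	return mp.pick().QueryContext(ctx, query, args...)
}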
[End]