presto-go-client 是一个golang的Presto客户端,基于golang database/sql通用接口实现,方便开发者快速方便使用。这也是定义接口协议的意义体现。
使用
db, err := sql.Open("presto", "http://localhost:9")
if err != nil {
t.Fatal(err)
}
defer db.Close()
rows, err := db.Query("select 1 ", sql.Named("X-Presto-User", "root"))
if err != nil {
t.Fatal(err)
}
var testId string
for rows.Next() {
err := rows.Scan(&testId)
if err != nil {
t.Fatal(err)
}
}
由于实现了database/sql通用数据库操作接口,使用方式和其他数据库一样。
presto实现
注册Driver
sql.Register("presto", &sqldriver{})
规范操作
sql.Open
初始化DB Struct: OpenDB不验证参数,但是不真正创建和数据库的链接。数据库链接的建立时机是在第一次真正需要建立的时候。
- 保存Connector:链接器,保存链接的信息,以及driver(这里是指presto,即实现了driver.Driver接口的结构体)指针
- 初始化openerCh:开启器channel,是一个默认1000000长度的struct{} chan
- 链接请求记录map初始化
- 可取消context初始化
sql.DB不是数据库链接,而是数据库的一种虚拟,它可以做很重要的任务:打开关闭链接;链接池管理。实现并发访问数据存储的功能。
sql.DB被设计为持久存在的,不要频繁的开启和关闭。通用的方案是为每一个数据库创建一个全局、或者方便传递的对象,保持对象存在,直到程序结束。
创建链接
链接建立是在query函数db.query
调用的时候, db.conn
完成这些操作:
- 判断数据库是否关闭(db.closed), DB的Close函数,会赋值该开关为true
- 检测context是否Done,如果Done会返回
ctx.Err()
,这里也是QueryContext中context的实现 - 取
db.freeConn
中的最后一个链接,或者调用db.connector.Connect
重新创建一个或多个(给openerCh传递信号)
链接池是一个管理空间链接,给请求复用链接的机制。database/sql的实现: 当某个goroutine需要链接的时候,先查看空间链接数组是否有可用,如果有,直接返回;如果没有,则需要判断当前开启的链接数是否达到最大值,如果是阻塞goroutine。否则创建一个链接。
链接池相关操作对应函数:
- DB.conn:获取链接
- driverConn.releaseConn:调用DB.PutConn释放连接
- DB.PutConn:进行一些处理,调用DB.putConnDBLocked。
- DB.putConnDBLocked:丢弃连接、返回连接给等待的协程或放到空闲队列。
- DB.maybeOpenNewConnections:根据目前的连接请求数量和目前还能打开的连接数量判断是否发送创建连接信号给协程connectionOpener。
- DB.connectionOpener:负责异步创建连接。
presto真正意义上的链接建立就在这一步,由于presto是一个http client,所以建立链接的过程很简单,只需要初始化以及验证即可:
- 解析dsn
- kerberos检测
- 增加了custom_client的支持
- 初始化http headers
c := &Conn{
baseURL: prestoURL.Scheme + "://" + prestoURL.Host,
httpClient: *httpClient,
httpHeaders: make(http.Header),
kerberosClient: kerberosClient,
kerberosEnabled: kerberosEnabled,
}
查询执行
查询执行对应函数:
- QueryContext 接受sql、参数,获取数据库链接,调用Driver的对应方法查询数据
- Query是QueryContext不支持context的版本
- Exec 修改数据,注意如果不关心返回结果,推荐使用Exec,因为Query会保留数据库链接,直到sql.Rows关闭。可能存在维度数据,导致链接不被释放。
var testId string
for rows.Next() {
err := rows.Scan(&testId)
if err != nil {
t.Fatal(err)
}
}
Presto在这里的实现,又可以被借鉴的地方。它利用NamedArgs来设置http headers。
- 检查args是否有特定的参数,设置header
- 发送post请求给/v1/statement
- 解析为driverRows
- 如果nextUri非空,迭代rows.fetch,直到获取到最后的数据
POST http://127.0.0.1:8081/v1/statement
Host: 127.0.0.1:8081
User-Agent: Go-http-client/1.1
X-Presto-Catalog: hive
X-Presto-User: test
Accept-Encoding: gzip
select 1
next && scan
next
next函数结果指示下一行是否可用:true 可用,false 数据集耗尽(io.EOF),或者中间出现错误。 注意 rows的关闭这个时候是调用房的责任,当rows开启状态的时候, 数据库链接是忙碌状态。如果忘记close,或者rows存在时间较长,是可能出现链接泄漏的情况的。
func (qr *driverRows) Next(dest []driver.Value) error {
// ...
if qr.columns == nil || qr.rowindex >= len(qr.data) {
if qr.nextURI == "" {
qr.err = io.EOF
return qr.err
}
if err := qr.fetch(true); err != nil {
return err
}
}
// ...
for i, v := range qr.coltype {
vv, err := v.ConvertValue(qr.data[qr.rowindex][i])
// ...
dest[i] = vv
}
qr.rowindex++
return nil
}
以上是presto的next实现:
- columns如果为初始化为nil,代表没有数据或执行错误
- rowindex大于等于时机的数据:nextUri不为空代表有后续数据,调用fetch继续请求
- 正常情况挨个转化数据到dest,提供给scan使用
scan
for i, sv := range rs.lastcols {
err := convertAssignRows(dest[i], sv, rs)
if err != nil {
return fmt.Errorf(`sql: Scan error on column index %d, name %q: %w`, i, rs.rowsi.Columns()[i], err)
}
}
注意lastcols,就是next的参数dest。scan辅助你对结果的操作,帮你自己转换变量,迭代数据集的每一行。
// 仅需一行代码
err = db.QueryRow("select name from users where id = ?", 1).Scan(&name)
Driver的意义
Driver是真正执行任务的代码,像presto-go-client,所有和presto链接建立、数据解析等操作都由该库完成。而database/sql,定义了整个数据库操作流程,以及所有相关的结构定义。presto-go-client只需要按照定义实现出来,即可以被使用。
这样的设计,使你使用多种数据库(对应多个driver),却是相同的代码,相同的流程。也方便初次使用某个数据库的时候,能够快速接入。也方便driver开发者,无需关心管理上的细节,只需要实现就好。
但是也限制了自己任务发挥的能力,有得也有失。像presto提供的是一个http协议的接口,参数、验证信息都是依赖http协议,一些附加信息也无法突破next/scan操作的限制。