Go并发模式:context上下文
在用gin框架写一个go web app的时候,发现浏览器关闭时gin的handler没有立刻结束,为了实现让浏览器关闭时,gin的handler也及时关闭,遂google发现了context包,于是顺手翻译一下这篇介绍context包的博客。
翻译自:https://blog.golang.org/context (需科学上网)
介绍
在Go服务器中,每一个请求都有一个负责处理它的请求处理器(也是一个goroutine),请求处理器经常会启动额外的goroutine去访问后端,如数据库、RPC服务等,这些由请求处理器启动的goroutine常常需要获取对应请求携带的值,如用户ID、授权token、请求的超时时间等,当请求被取消或者超时,这些goroutine也应该要尽快结束,以便系统可以尽快回收它们占用的资源。
因此Google开发了context包,使跨API将请求作用域(request-scoped)变量、取消信号、超时时间传递给请求处理器启动的goroutine变得简单起来。这篇文章就是讲述如何使用context包以及给出了一个完整示例。
Context(上下文)
context包的核心是 Context 类型:
// Context携带超时时间,取消信号,请求作用域变量横跨API边界
// Context的方法是并发安全的
type Context interface {
// Done 返回一个通道,当Context取消或超时,该通道关闭
Done() <-chan struct{}
// Err 在Done通道关闭后给出该Context取消的原因
Err() error
// Deadline 如果Context有超时限制,该方法会返回Context将被取消的时间
Deadline() (deadline time.Time, ok bool)
// Value 返回该Context携带的变量
Value(key interface{}) interface{}
}
(代码以及描述经过精简,推荐阅读 godoc)
Done方法返回一个通道用于传递Context的取消信号:当Done通道关闭的时候,跟该Context相关的方法应该放弃工作并且返回。Err方法返回Context被取消的原因。另外这篇文章(Pipelines and Cancelation)详细讨论了Done通道的用法。
Context类型没有Cancel方法和Done通道只读的原因是相同的:接收取消信号的方法一般都不会是发送取消信号的那个,比方说当父任务启动goroutine去执行子任务时,子任务不应该能够取消父任务,相反,WithCancel方法(下面会讲)提供了取消Context实例的途径。
Context是并发安全的,因此可以在代码中将一个Context实例传递给任意个goroutine并且取消这个Context来通知所有goroutine。
Deadline方法允许程序去决定是否要开始工作,如果Context距离剩余的时间不足以完成工作,那开始工作的开销就不那么划算了,代码中也可以根据超时期限来设定IO操作的超时时间。
Value方法允许Context示例携带请求作用域的数据,这些数据必须得是并发安全的。
Derived contexts(派生上下文)
context包提供了从已有Context实例派生新实例的方法,这些实例会形成一棵树:当一个Context实例被取消,所有由它派生的Context都会被取消
后台(Background)是所有Context树的根,它是永远不会被取消的:
// Background 返回一个空的Context。它永远不会被取消,也没有超时,并且不携带任何值,
// Background 通常作为顶级的Context用在main方法、init方法和tests中
func Background() Context
WithCancel和WithTimeout方法从给定的Context派生出新的Context,新的Context可以在父Context结束前被取消。跟请求关联的Context通常会在请求返回的时候被取消。WithCancel在取消多余的请求时很有用,WithTimeout则在需要给后端请求设置超时时很有用:
// WithCancel 返回父Context的拷贝和cancel方法,当父Context的Done通道关闭或cancel方法被调用时,该拷贝的Done通道被关闭
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// CancelFunc取消Context
type CancelFunc func()
// WithTimeout 返回父Context的拷贝和cancel方法,当父Context的Done通道关闭或cancel方法被调用或达到设定的超时期限时,该拷贝的Done通道关闭
// 该拷贝的超时期限为now + timeout或父Context的超时期限,当该拷贝被取消时,会回收timer所占用的资源
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
WithValue提供了将请求作用域的变量关联到Context上的途径:
// WithValue 返回携带了传入的key-value的父Context的拷贝
func WithValue(parent Context, key interface{}, val interface{}) Context
了解Context的作用最好的方法还是看实际例子。
Example: Google Web Search(示例:google 搜索)
本示例是一个HTTP服务器,处理类似这样的URL:/search?q=golang&timeout=1s,将query中的"golang"发送给 Google Web Search API (译者注:该API已经凉了,因此译者最后给出的完整示例代码做了些许更改)并且将返回结果输出,timeout参数决定HTTP服务器在多久收取消这个请求。
代码分了3个包:
- server:main方法,处理/search请求
- userip:用来获取用户的IP并且关联到Context上用于发送给Google Search API
- google:发送请求到Google Search API的相关代码
The server program
server程序处理/search?q=golang请求,它将handleSearch方法注册到/search路径上,该请求处理器会初始化变量名为ctx的Context,当请求返回时会安排取消掉ctx,如果URL中带timeout参数,则超时后立刻取消ctx:
func handleSearch(w http.ResponseWriter, req *http.Request) {
// ctx is the Context for this handler. Calling cancel closes the
// ctx.Done channel, which is the cancellation signal for requests
// started by this handler.
var (
ctx context.Context
cancel context.CancelFunc
)
timeout, err := time.ParseDuration(req.FormValue("timeout"))
if err == nil {
// The request has a timeout, so create a context that is
// canceled automatically when the timeout expires.
ctx, cancel = context.WithTimeout(context.Background(), timeout)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
defer cancel() // Cancel ctx as soon as handleSearch returns.
请求处理器利用userip包的方法获得用户的IP,因为IP参数是后端服务需要的参数(google search会根据用户所在国家返回对应结果),因此绑定到ctx上:
// Check the search query.
query := req.FormValue("q")
if query == "" {
http.Error(w, "no query", http.StatusBadRequest)
return
}
// Store the user IP in ctx for use by code in other packages.
userIP, err := userip.FromRequest(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
ctx = userip.NewContext(ctx, userIP)
请求处理器以ctx和查询语句为参数调用google.Search:
// Run the Google search and print the results.
start := time.Now()
results, err := google.Search(ctx, query)
elapsed := time.Since(start)
如果搜索成功,请求处理器输出结果:
if err := resultsTemplate.Execute(w, struct {
Results google.Results
Timeout, Elapsed time.Duration
}{
Results: results,
Timeout: timeout,
Elapsed: elapsed,
}); err != nil {
log.Print(err)
return
}
Package userip
userip包提供获取用户IP以及将IP关联到Context的方法。Context提供了key-value的映射表,key和value都是interface{}类型,因为key的类型需要支持等式比较(support equality),value则必须是并发安全的,userip包隐藏了映射的细节并且使用自定义的类型获取Context的值。
为了避免key冲突,userip定义了一个非导出的私有类型 type key ,并且用该类型的值作为context值的key:
// The key type is unexported to prevent collisions with context keys defined in
// other packages.
type key int
// userIPkey is the context key for the user IP address. Its value of zero is
// arbitrary. If this package defined other context keys, they would have
// different integer values.
const userIPKey key = 0
FromRequest从http.Request中获取用户IP:
func FromRequest(req *http.Request) (net.IP, error) {
ip, _, err := net.SplitHostPort(req.RemoteAddr)
if err != nil {
return nil, fmt.Errorf("userip: %q is not IP:port", req.RemoteAddr)
}
NewContext返回携带了用户IP的新的Context:
func NewContext(ctx context.Context, userIP net.IP) context.Context {
return context.WithValue(ctx, userIPKey, userIP)
}
FromContext从Context中获取用户IP:
func FromContext(ctx context.Context) (net.IP, bool) {
// ctx.Value returns nil if ctx has no value for the key;
// the net.IP type assertion returns ok=false for nil.
userIP, ok := ctx.Value(userIPKey).(net.IP)
return userIP, ok
}
Package google
google.Search方法发送一个HTTP请求到Google Web Search API并且解析JSON结果,该方法接收一个Context类型参数ctx,当ctx被取消时,中止这次HTTP请求。
Google Web Search API 的请求参数包含搜索关键字和用户IP:
func Search(ctx context.Context, query string) (Results, error) {
// Prepare the Google Search API request.
req, err := http.NewRequest("GET", "https://ajax.googleapis.com/ajax/services/search/web?v=1.0", nil)
if err != nil {
return nil, err
}
q := req.URL.Query()
q.Set("q", query)
// If ctx is carrying the user IP address, forward it to the server.
// Google APIs use the user IP to distinguish server-initiated requests
// from end-user requests.
if userIP, ok := userip.FromContext(ctx); ok {
q.Set("userip", userIP.String())
}
req.URL.RawQuery = q.Encode()
Search方法使用httpDo方法来发送HTTP请求,当ctx.Done关闭时取消该请求,无论该请求目前是在发送中还是响应中。Search传递一个闭包函数给httpDo来处理HTTP响应:
var results Results
err = httpDo(ctx, req, func(resp *http.Response, err error) error {
if err != nil {
return err
}
defer resp.Body.Close()
// Parse the JSON search result.
// https://developers.google.com/web-search/docs/#fonje
var data struct {
ResponseData struct {
Results []struct {
TitleNoFormatting string
URL string
}
}
}
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
return err
}
for _, res := range data.ResponseData.Results {
results = append(results, Result{Title: res.TitleNoFormatting, URL: res.URL})
}
return nil
})
// httpDo waits for the closure we provided to return, so it's safe to
// read results here.
return results, err
httpDo方法启动新的goroutine来发送HTTP请求和处理响应,在goroutine结束前,它会在ctx.Done关闭时取消这个请求:
func httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error {
// Run the HTTP request in a goroutine and pass the response to f.
tr := &http.Transport{}
client := &http.Client{Transport: tr}
c := make(chan error, 1)
go func() { c <- f(client.Do(req)) }()
select {
case <-ctx.Done():
tr.CancelRequest(req)
<-c // Wait for f to return.
return ctx.Err()
case err := <-c:
return err
}
}
Adapting code for Contexts
许多框架如Gorilla和Tomb之类的都支持context云云… 懒得翻了这段。
Conclusion(结论)
在谷歌,我们要求Go程序员将Context作为第一参数传递给所有在请求路径中方法。这使得Go代码在多个团队间能够很好地交互。它提供了一个简单时间控制和取消方法,并且保证关键的数据如安全认证可以在Go程序中正确地流转。
服务器框架如果想基于Context来构建的话,应该要实现用Context来桥接框架的包以及其他需要Context参数的包,他们的client类库可以从调用代码接收Context变量。通过发布请求作用域参数以及取消方案的通用接口Context,使得开发者可以更简单地分享代码来构建可伸缩服务。
原作者: Sameer Ajmani
---------------------------------------------------------------------------------------------------------------------------------
完整可运行的代码示例
代码组织形式
-- context_test
----main.go
----userip
------userip.go
------google.go
main.go
package main
import (
"context_test/google"
"context_test/userip"
"context"
"time"
"net/http"
"log"
"encoding/json"
)
func handleSearch(w http.ResponseWriter, req *http.Request) {
// ctx is the Context for this handler. Calling cancel closes the
// ctx.Done channel, which is the cancellation signal for requests
// started by this handler.
var (
ctx context.Context
cancel context.CancelFunc
)
timeout, err := time.ParseDuration(req.FormValue("timeout"))
if err == nil {
// The request has a timeout, so create a context that is
// canceled automatically when the timeout expires.
ctx, cancel = context.WithTimeout(context.Background(), timeout)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
defer cancel() // Cancel ctx as soon as handleSearch returns.
// Check the search query.
query := req.FormValue("q")
if query == "" {
http.Error(w, "no query", http.StatusBadRequest)
return
}
// Store the user IP in ctx for use by code in other packages.
userIP, err := userip.FromRequest(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
ctx = userip.NewContext(ctx, userIP)
// Run the Google search and print the results.
start := time.Now()
results, err := google.Search(ctx, query)
elapsed := time.Since(start)
output,err := json.Marshal(struct {
Results google.Results
Timeout, Elapsed time.Duration
}{
Results: results,
Timeout: timeout,
Elapsed: elapsed,
})
if err != nil {
log.Print(err)
return
}
w.Write(output)
}
func main() {
http.HandleFunc("/search",handleSearch)
log.Println("start..")
http.ListenAndServe(":8090",nil)
}
userip.go
package userip
import (
"fmt"
"context"
"net"
"net/http"
)
// The key type is unexported to prevent collisions with context keys defined in
// other packages.
type key int
// userIPkey is the context key for the user IP address. Its value of zero is
// arbitrary. If this package defined other context keys, they would have
// different integer values.
const userIPKey key = 0
func FromRequest(req *http.Request) (net.IP, error) {
ip, _, err := net.SplitHostPort(req.RemoteAddr)
if err != nil {
return nil, fmt.Errorf("userip: %q is not IP:port", req.RemoteAddr)
}
return net.ParseIP(ip),nil
}
func NewContext(ctx context.Context, userIP net.IP) context.Context {
return context.WithValue(ctx, userIPKey, userIP)
}
func FromContext(ctx context.Context) (net.IP, bool) {
// ctx.Value returns nil if ctx has no value for the key;
// the net.IP type assertion returns ok=false for nil.
userIP, ok := ctx.Value(userIPKey).(net.IP)
return userIP, ok
}
google.go
package google
import (
"context_test/userip"
"net/http"
"context"
"encoding/json"
)
type Result struct {
Title string
URL string
}
type Results []Result
func Search(ctx context.Context, query string) (Results, error) {
// the Google Search API is nolonger available, so replace it with an example API
req, err := http.NewRequest("GET", "http://blog.cngal.org/context_test.php", nil)
if err != nil {
return nil, err
}
q := req.URL.Query()
q.Set("q", query)
// If ctx is carrying the user IP address, forward it to the server.
// Google APIs use the user IP to distinguish server-initiated requests
// from end-user requests.
if userIP, ok := userip.FromContext(ctx); ok {
q.Set("userip", userIP.String())
}
req.URL.RawQuery = q.Encode()
var results Results
err = httpDo(ctx, req, func(resp *http.Response, err error) error {
if err != nil {
return err
}
defer resp.Body.Close()
// Parse the JSON search result.
var data struct {
ResponseData struct {
Results []struct {
TitleNoFormatting string
URL string
}
}
}
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
return err
}
for _, res := range data.ResponseData.Results {
results = append(results, Result{Title: res.TitleNoFormatting, URL: res.URL})
}
return nil
})
// httpDo waits for the closure we provided to return, so it's safe to
// read results here.
return results, err
}
func httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error {
// Run the HTTP request in a goroutine and pass the response to f.
tr := &http.Transport{}
client := &http.Client{Transport: tr}
c := make(chan error, 1)
go func() { c <- f(client.Do(req)) }()
select {
case <-ctx.Done():
tr.CancelRequest(req)
<-c // Wait for f to return.
return ctx.Err()
case err := <-c:
return err
}
}