寫在前面
最近一年來,我都在做公司的RTB廣告系統(tǒng),包括SSP曝光服務(wù),ADX服務(wù)和DSP系統(tǒng)。因?yàn)槭堑谝淮卧诠居肎o語言實(shí)現(xiàn)這么一個大的系統(tǒng),中間因?yàn)楦鞣N原因造了很多輪子。現(xiàn)在稍微有點(diǎn)時間,覺著有必要總結(jié)這一年來用Go造輪子的經(jīng)驗(yàn)和不足。
集群中遇到的配置文件管理問題
RTB廣告系統(tǒng)中涉及到的服務(wù)程序并不算很多,但是因?yàn)镽TB系統(tǒng)會面臨很多的流量,而且為了確保可用性,最基本的就是多實(shí)例組成集群,同時考慮到后續(xù)業(yè)務(wù)增長,集群的擴(kuò)縮容也是要做的。我們在設(shè)計的時候,基于ZoooKeeper做了服務(wù)發(fā)現(xiàn),而我們的服務(wù)接入依靠Nginx集群,然后通過反向代理把請求負(fù)載均衡到不同的服務(wù)實(shí)例中。這里就存在以下問題:
當(dāng)我們升級某個服務(wù)時,如何通知Nginx集群自動的摘除或者添加該服務(wù)實(shí)例,保證我們的升級不會影響到業(yè)務(wù)和用戶體驗(yàn)
進(jìn)一步,任意一個服務(wù)集群內(nèi)的配置數(shù)據(jù)該如何自動更新和應(yīng)用?
業(yè)界方案
業(yè)界其實(shí)有很多成熟的方案解決這類問題:
比如開源項(xiàng)目consul-template,但是這個工具只支持后端consul,而我們用的是ZooKeeper
再比如confd,可以支持多種后端,比如etcd或者zookeeper,但是它用的ZooKeeper客戶端不支持在故障時對業(yè)務(wù)請求進(jìn)行重試,比如發(fā)起了一個GetW請求,而Session變成超時狀態(tài),這個時候GetW返回的Channel就不可用了,只能重新發(fā)起請求,但是重試多少次請求其實(shí)是不知道的,針對這個情況,我還在項(xiàng)目一開始的時候?qū)崿F(xiàn)了新的包,加入了對業(yè)務(wù)層透明的重試機(jī)制。
整體工作流程
解析模版,獲取要動態(tài)查詢的節(jié)點(diǎn)
向指定的服務(wù)器,比如ZooKeeper發(fā)起查詢請求,并觀察指定節(jié)點(diǎn)的變化
當(dāng)?shù)谝淮位蛘吖?jié)點(diǎn)發(fā)生變更后,查詢最新數(shù)據(jù)
把最新數(shù)據(jù)應(yīng)用到模版中生成新的配置文件數(shù)據(jù)
保存最新的配置文件數(shù)據(jù)到目標(biāo)路徑,并調(diào)用指定的命令應(yīng)用最新的配置文件
實(shí)現(xiàn)
模版機(jī)制
Go官方標(biāo)準(zhǔn)庫提供了Template包,支持if, range等控制語句,也支持用戶自定義方法。模版機(jī)制的方便之處在于,它本身是一種DSL,也算是一種支持計算的超級printf。舉例如下:
package mainimport ( "os" "text/template")var tplContent = ` {{range service "apiGateWay" }} server {{.Name}} {{.ID}} {{.Address}} {{end}}`type ApiGateWayService struct { Name string ID string Address string}func main() { tmpl, err := template.New("test.template").Funcs(template.FuncMap{ "service": func(serviceName string) []ApiGateWayService { return []ApiGateWayService{ { Name: "test", ID: "1", Address: "192.168.1.100:9200", }, } }, }).Parse(tplContent) if err != nil { panic(err) } tmpl.Execute(os.Stdout, nil)、 }
上面的代碼實(shí)際上是做了類似這樣的過程,為了簡單描述,我還是直接寫一段GO代碼
package mainimport ( "fmt" "os")type ApiGateWayService struct { Name string ID string Address string}func service(serviceName string) []ApiGateWayService { return []ApiGateWayService{ { Name: "test", ID: "1", Address: "192.168.1.100:9200", }, } }func main() { for _, v := range service("apiGateWay") { fmt.Fprintf(os.Stdout, "server %s %s %s", v.Name, v.ID, v.Address) } }
解析和渲染模版
上面的代碼中,模版內(nèi)容如下:
var tplContent = ` {{range service "apiGateWay" }} server {{.Name}} {{.ID}} {{.Address}} {{end}}`
當(dāng)我們拿到這么一個模板時,我們是不知道它是不是合法的,也許有語法錯誤,所以得先需要校驗(yàn)。這個我們可以調(diào)用template.Parse方法進(jìn)行解析、校驗(yàn)語法。
當(dāng)語法沒有問題時,我們就開始進(jìn)行渲染。在我們這個示例中,模版引擎會在渲染時調(diào)用service方法并對其返回結(jié)果進(jìn)行循環(huán)處理,然后輸出相應(yīng)的數(shù)據(jù)到一個io.Writer中。service方法主要功能是根據(jù)傳入的服務(wù)名,去ZK中查詢相應(yīng)的節(jié)點(diǎn)的所有子節(jié)點(diǎn)的數(shù)據(jù),然后返回相應(yīng)的數(shù)據(jù)。
當(dāng)我們程序第一次運(yùn)行時,實(shí)際上我們還沒準(zhǔn)備好指定服務(wù)的數(shù)據(jù),但是我們至少在這一次知道了它要查詢哪個服務(wù)的數(shù)據(jù),并且我們這時可以啟動goroutine去后臺輪詢查詢數(shù)據(jù),并把這些數(shù)據(jù)放入到緩存中
那么當(dāng)我們之后再渲染時,就可以直接緩存中查詢指定服務(wù)的所有實(shí)例(也就是子節(jié)點(diǎn))的數(shù)據(jù),然后就可以渲染出最終想要的配置文件數(shù)據(jù)。之后就可以保存了。
整個代碼實(shí)現(xiàn)中略微復(fù)雜,其中的核心既不是如何緩存,也不是后臺如何查詢,而是要記錄下未知的服務(wù)名,以及假如已經(jīng)知道了服務(wù)名并且緩存了數(shù)據(jù),如何從緩存中查詢數(shù)據(jù),這個過程還是拿service方法舉例,代碼如下:
func serviceFunc(tracker *DataTracker, used, missing map[string]dependency.Dependency) func(...string) ([]dependency.Service, error) { return func(s ...string) ([]dependency.Service, error) { var r []dependency.Service if len(s) == 0 || s[0] == "" { return r, nil } d, err := dependency.ParseService(s...) if err != nil { return nil, err } addDependency(used, d) data, ok := tracker.Get(d) if ok { return data.([]dependency.Service), nil } addDependency(missing, d) return r, nil } }
其中tracker表示的是緩存對象,used和missing是一個map,其中value類型是dependency.Dependency, key為一個Dependency的HashCode,也就是一個唯一身份標(biāo)識。
Dependency是一個接口,主要用于查詢數(shù)據(jù),是一個阻塞過程。serviceFunc返回一個lambda,內(nèi)部主要主要的事情是記錄哪個Dependency對象用到了,并且嘗試從tracker中查詢緩存數(shù)據(jù),如果有就返回,沒有更新missing,記錄Dependency對象沒有相應(yīng)的緩存數(shù)據(jù),然后返回空數(shù)據(jù)。
數(shù)據(jù)查詢
當(dāng)我們完成了解析和渲染過程后,我們要檢查渲染過程中記錄的missing是否不為空,如果不空,就需要發(fā)起后臺查詢進(jìn)行處理,大體過程如下:
if len(missing) > 0 { for _, v := range missing { if !r.watcher.Watching(v) { r.watcher.Add(v) log.Debug("try to watch dependency", v.HashCode()) } } continue}
其中每個Dependency對象都有一個Fetch方法,當(dāng)通過Watcher.Add方法時,就會啟動一個獨(dú)立的goroutine負(fù)責(zé)調(diào)用Dependency.Fetch方法,然后通過channel把Dependency.Fetch的結(jié)果轉(zhuǎn)給DataTracker進(jìn)行緩存。
渲染結(jié)果保存
當(dāng)渲染模板成功后,就得到了一個新的配置文件數(shù)據(jù)。首先我們要檢查生成的配置文件是否和原有的配置文件有差異,沒有差異的就不需要保存了。
其次在保存時,有可能出現(xiàn)任何以外,為了確保出現(xiàn)以外時,能夠恢復(fù)回來,我們需要對原有的配置文件進(jìn)行備份。同時,我們?yōu)榱舜_保新的配置文件能夠落入到磁盤上,每次寫入文件后都調(diào)用Sync方法強(qiáng)制刷新到磁盤上。主要實(shí)現(xiàn)如下:
func atomicWrite(path string, contents []byte, perms os.FileMode, backup bool) error { parent := filepath.Dir(path)if _, err := os.Stat(parent); os.IsNotExist(err) { if err := os.MkdirAll(parent, 0755); err != nil { return err } } f, err := ioutil.TempFile(parent, "")if err != nil { return err }defer os.Remove(f.Name())if _, err := f.Write(contents); err != nil { return err }if err := f.Sync(); err != nil { return err }if err := f.Close(); err != nil { return err }if err := os.Chmod(f.Name(), perms); err != nil { return err }if backup { if _, err := os.Stat(path); !os.IsNotExist(err) { if err := copyFile(path, path+".bak"); err != nil { return err } } }if err := os.Rename(f.Name(), path); err != nil { return err }return nil}
命令執(zhí)行
當(dāng)保存好文件后,我們需要調(diào)用指定的命令,通知相應(yīng)的程序加載最新的配置,主要代碼如下:
func execute(command string, timeout time.Duration) error { var shell, flag string if runtime.GOOS == "windows" { shell, flag = "cmd", "/C" } else { shell, flag = "/bin/sh", "-c" } cmd := exec.Command(shell, flag, command) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if err := cmd.Start(); err != nil { return err } done := make(chan error, 1) go func() { done <- cmd.Wait() }() select { case <-time.After(timeout): if cmd.Process != nil { if err := cmd.Process.Kill(); err != nil { return fmt.Errorf("failed to kill %q in %s: %s", command, timeout, err) } } <-done // Allow the goroutine to finish return fmt.Errorf( "command %q\n"+ "did not return for %s - if your command does not return, please\n"+ "make sure to background it", command, timeout) case err := <-done: return err } }
總結(jié)
這個程序目前已經(jīng)有一年之久,現(xiàn)在回顧頭來看,幸好還記得當(dāng)初的決策原因。通過這個程序,對模版的使用倒是掌握了很多。目前這個輪子功能還較為簡單,僅僅實(shí)現(xiàn)了一個service方法,但是dependency包是獨(dú)立抽象的,可以支持任意的存儲類型,比如Consul。目前線上運(yùn)行穩(wěn)定。
這個工具在Nginx配置管理中,較為方便,當(dāng)然在一些流量較大的場景中,如果后端服務(wù)實(shí)例較多,擴(kuò)縮容時會帶來較大的波動,這點(diǎn)可以參考微博團(tuán)隊(duì)的Upsync:微博開源基于Nginx容器動態(tài)流量管理方案
但是我們沒有采用這個方案,考慮到的是基于模版的更新機(jī)制更為簡單,支持更多的服務(wù),普適性更強(qiáng),整體也更容易維護(hù)。
http://www.cnblogs.com/concurrency/p/6880276.html