寫在前面

最近一年來,我都在做公司的RTB廣告系統(tǒng),包括SSP曝光服務(wù),ADX服務(wù)和DSP系統(tǒng)。因?yàn)槭堑谝淮卧诠居肎o語言實(shí)現(xiàn)這么一個大的系統(tǒng),中間因?yàn)楦鞣N原因造了很多輪子。現(xiàn)在稍微有點(diǎn)時間,覺著有必要總結(jié)這一年來用Go造輪子的經(jīng)驗(yàn)和不足。

集群中遇到的配置文件管理問題

RTB廣告系統(tǒng)中涉及到的服務(wù)程序并不算很多,但是因?yàn)镽TB系統(tǒng)會面臨很多的流量,而且為了確保可用性,最基本的就是多實(shí)例組成集群,同時考慮到后續(xù)業(yè)務(wù)增長,集群的擴(kuò)縮容也是要做的。我們在設(shè)計的時候,基于ZoooKeeper做了服務(wù)發(fā)現(xiàn),而我們的服務(wù)接入依靠Nginx集群,然后通過反向代理把請求負(fù)載均衡到不同的服務(wù)實(shí)例中。這里就存在以下問題:

  • 當(dāng)我們升級某個服務(wù)時,如何通知Nginx集群自動的摘除或者添加該服務(wù)實(shí)例,保證我們的升級不會影響到業(yè)務(wù)和用戶體驗(yàn)

  • 進(jìn)一步,任意一個服務(wù)集群內(nèi)的配置數(shù)據(jù)該如何自動更新和應(yīng)用?

業(yè)界方案

業(yè)界其實(shí)有很多成熟的方案解決這類問題:

  • 比如開源項(xiàng)目consul-template,但是這個工具只支持后端consul,而我們用的是ZooKeeper

  • 再比如confd,可以支持多種后端,比如etcd或者zookeeper,但是它用的ZooKeeper客戶端不支持在故障時對業(yè)務(wù)請求進(jìn)行重試,比如發(fā)起了一個GetW請求,而Session變成超時狀態(tài),這個時候GetW返回的Channel就不可用了,只能重新發(fā)起請求,但是重試多少次請求其實(shí)是不知道的,針對這個情況,我還在項(xiàng)目一開始的時候?qū)崿F(xiàn)了新的包,加入了對業(yè)務(wù)層透明的重試機(jī)制。

整體工作流程

  1. 解析模版,獲取要動態(tài)查詢的節(jié)點(diǎn)

  2. 向指定的服務(wù)器,比如ZooKeeper發(fā)起查詢請求,并觀察指定節(jié)點(diǎn)的變化

  3. 當(dāng)?shù)谝淮位蛘吖?jié)點(diǎn)發(fā)生變更后,查詢最新數(shù)據(jù)

  4. 把最新數(shù)據(jù)應(yīng)用到模版中生成新的配置文件數(shù)據(jù)

  5. 保存最新的配置文件數(shù)據(jù)到目標(biāo)路徑,并調(diào)用指定的命令應(yīng)用最新的配置文件

實(shí)現(xiàn)

模版機(jī)制

Go官方標(biāo)準(zhǔn)庫提供了Template包,支持if, range等控制語句,也支持用戶自定義方法。模版機(jī)制的方便之處在于,它本身是一種DSL,也算是一種支持計算的超級printf。舉例如下:

package mainimport (    "os"
    "text/template")var tplContent = `    {{range service "apiGateWay" }}        server {{.Name}} {{.ID}} {{.Address}}    {{end}}`type ApiGateWayService struct {
    Name    string
    ID      string
    Address string}func main() {
    tmpl, err := template.New("test.template").Funcs(template.FuncMap{        "service": func(serviceName string) []ApiGateWayService {            return []ApiGateWayService{
                {
                    Name:    "test",
                    ID:      "1",
                    Address: "192.168.1.100:9200",
                },
            }
        },
    }).Parse(tplContent)    if err != nil {        panic(err)
    }

    tmpl.Execute(os.Stdout, nil)、
}

上面的代碼實(shí)際上是做了類似這樣的過程,為了簡單描述,我還是直接寫一段GO代碼

package mainimport (    "fmt"
    "os")type ApiGateWayService struct {
    Name    string
    ID      string
    Address string}func service(serviceName string) []ApiGateWayService {    return []ApiGateWayService{
        {
            Name:    "test",
            ID:      "1",
            Address: "192.168.1.100:9200",
        },
    }
}func main() {    for _, v := range service("apiGateWay") {
        fmt.Fprintf(os.Stdout, "server %s %s %s", v.Name, v.ID, v.Address)
    }
}

解析和渲染模版

上面的代碼中,模版內(nèi)容如下:

var tplContent = `    {{range service "apiGateWay" }}        server {{.Name}} {{.ID}} {{.Address}}    {{end}}`
  • 當(dāng)我們拿到這么一個模板時,我們是不知道它是不是合法的,也許有語法錯誤,所以得先需要校驗(yàn)。這個我們可以調(diào)用template.Parse方法進(jìn)行解析、校驗(yàn)語法。

  • 當(dāng)語法沒有問題時,我們就開始進(jìn)行渲染。在我們這個示例中,模版引擎會在渲染時調(diào)用service方法并對其返回結(jié)果進(jìn)行循環(huán)處理,然后輸出相應(yīng)的數(shù)據(jù)到一個io.Writer中。service方法主要功能是根據(jù)傳入的服務(wù)名,去ZK中查詢相應(yīng)的節(jié)點(diǎn)的所有子節(jié)點(diǎn)的數(shù)據(jù),然后返回相應(yīng)的數(shù)據(jù)。

    1. 當(dāng)我們程序第一次運(yùn)行時,實(shí)際上我們還沒準(zhǔn)備好指定服務(wù)的數(shù)據(jù),但是我們至少在這一次知道了它要查詢哪個服務(wù)的數(shù)據(jù),并且我們這時可以啟動goroutine去后臺輪詢查詢數(shù)據(jù),并把這些數(shù)據(jù)放入到緩存中

    2. 那么當(dāng)我們之后再渲染時,就可以直接緩存中查詢指定服務(wù)的所有實(shí)例(也就是子節(jié)點(diǎn))的數(shù)據(jù),然后就可以渲染出最終想要的配置文件數(shù)據(jù)。之后就可以保存了。

整個代碼實(shí)現(xiàn)中略微復(fù)雜,其中的核心既不是如何緩存,也不是后臺如何查詢,而是要記錄下未知的服務(wù)名,以及假如已經(jīng)知道了服務(wù)名并且緩存了數(shù)據(jù),如何從緩存中查詢數(shù)據(jù),這個過程還是拿service方法舉例,代碼如下:

func serviceFunc(tracker *DataTracker, used, missing map[string]dependency.Dependency) func(...string) ([]dependency.Service, error) {    return func(s ...string) ([]dependency.Service, error) {        var r []dependency.Service        if len(s) == 0 || s[0] == "" {            return r, nil
        }

        d, err := dependency.ParseService(s...)        if err != nil {            return nil, err
        }

        addDependency(used, d)

        data, ok := tracker.Get(d)        if ok {            return data.([]dependency.Service), nil
        }

        addDependency(missing, d)        return r, nil
    }
}

其中tracker表示的是緩存對象,used和missing是一個map,其中value類型是dependency.Dependency, key為一個Dependency的HashCode,也就是一個唯一身份標(biāo)識。
Dependency是一個接口,主要用于查詢數(shù)據(jù),是一個阻塞過程。serviceFunc返回一個lambda,內(nèi)部主要主要的事情是記錄哪個Dependency對象用到了,并且嘗試從tracker中查詢緩存數(shù)據(jù),如果有就返回,沒有更新missing,記錄Dependency對象沒有相應(yīng)的緩存數(shù)據(jù),然后返回空數(shù)據(jù)。

數(shù)據(jù)查詢

當(dāng)我們完成了解析和渲染過程后,我們要檢查渲染過程中記錄的missing是否不為空,如果不空,就需要發(fā)起后臺查詢進(jìn)行處理,大體過程如下:

if len(missing) > 0 {    for _, v := range missing {        if !r.watcher.Watching(v) {
            r.watcher.Add(v)
            log.Debug("try to watch dependency", v.HashCode())
        }
    }    continue}

其中每個Dependency對象都有一個Fetch方法,當(dāng)通過Watcher.Add方法時,就會啟動一個獨(dú)立的goroutine負(fù)責(zé)調(diào)用Dependency.Fetch方法,然后通過channel把Dependency.Fetch的結(jié)果轉(zhuǎn)給DataTracker進(jìn)行緩存。

渲染結(jié)果保存

  • 當(dāng)渲染模板成功后,就得到了一個新的配置文件數(shù)據(jù)。首先我們要檢查生成的配置文件是否和原有的配置文件有差異,沒有差異的就不需要保存了。

  • 其次在保存時,有可能出現(xiàn)任何以外,為了確保出現(xiàn)以外時,能夠恢復(fù)回來,我們需要對原有的配置文件進(jìn)行備份。同時,我們?yōu)榱舜_保新的配置文件能夠落入到磁盤上,每次寫入文件后都調(diào)用Sync方法強(qiáng)制刷新到磁盤上。主要實(shí)現(xiàn)如下:

    func atomicWrite(path string, contents []byte, perms os.FileMode, backup bool) error {
    parent := filepath.Dir(path)if _, err := os.Stat(parent); os.IsNotExist(err) {    if err := os.MkdirAll(parent, 0755); err != nil {        return err
        }
    }
    
    f, err := ioutil.TempFile(parent, "")if err != nil {    return err
    }defer os.Remove(f.Name())if _, err := f.Write(contents); err != nil {    return err
    }if err := f.Sync(); err != nil {    return err
    }if err := f.Close(); err != nil {    return err
    }if err := os.Chmod(f.Name(), perms); err != nil {    return err
    }if backup {    if _, err := os.Stat(path); !os.IsNotExist(err) {        if err := copyFile(path, path+".bak"); err != nil {            return err
            }
        }
    }if err := os.Rename(f.Name(), path); err != nil {    return err
    }return nil}

命令執(zhí)行

當(dāng)保存好文件后,我們需要調(diào)用指定的命令,通知相應(yīng)的程序加載最新的配置,主要代碼如下:

func execute(command string, timeout time.Duration) error {    var shell, flag string
    if runtime.GOOS == "windows" {
        shell, flag = "cmd", "/C"
    } else {
        shell, flag = "/bin/sh", "-c"
    }

    cmd := exec.Command(shell, flag, command)
    cmd.Stdout = os.Stdout
    cmd.Stderr = os.Stderr    if err := cmd.Start(); err != nil {        return err
    }

    done := make(chan error, 1)    go func() {
        done <- cmd.Wait()
    }()    select {    case <-time.After(timeout):        if cmd.Process != nil {            if err := cmd.Process.Kill(); err != nil {                return fmt.Errorf("failed to kill %q in %s: %s", command, timeout, err)
            }
        }
        <-done // Allow the goroutine to finish
        return fmt.Errorf(            "command %q\n"+                "did not return for %s - if your command does not return, please\n"+                "make sure to background it",
            command, timeout)    case err := <-done:        return err
    }
}

總結(jié)

這個程序目前已經(jīng)有一年之久,現(xiàn)在回顧頭來看,幸好還記得當(dāng)初的決策原因。通過這個程序,對模版的使用倒是掌握了很多。目前這個輪子功能還較為簡單,僅僅實(shí)現(xiàn)了一個service方法,但是dependency包是獨(dú)立抽象的,可以支持任意的存儲類型,比如Consul。目前線上運(yùn)行穩(wěn)定。

這個工具在Nginx配置管理中,較為方便,當(dāng)然在一些流量較大的場景中,如果后端服務(wù)實(shí)例較多,擴(kuò)縮容時會帶來較大的波動,這點(diǎn)可以參考微博團(tuán)隊(duì)的Upsync:微博開源基于Nginx容器動態(tài)流量管理方案
但是我們沒有采用這個方案,考慮到的是基于模版的更新機(jī)制更為簡單,支持更多的服務(wù),普適性更強(qiáng),整體也更容易維護(hù)。

http://www.cnblogs.com/concurrency/p/6880276.html