公司的Riak版本是2.0.4,目前已根據CMDB三級業(yè)務部署了十幾套集群,大部分是跨機房部署。監(jiān)控采集分為兩個大的維度,第一個維度是單機,也就是 「IP:端口」;第二個維度是集群,也就是所有節(jié)點指標的統(tǒng)計結果。本文主要介紹采集的指標和采集程序。

一、采集的指標

1、吞吐量指標

1.1 單機

采集方法:

/usr/sbin/riak-admin status
指標 功能
node_gets 某節(jié)點前一分鐘處理的 GET 請求數量,包括該節(jié)點上非本地虛擬節(jié)點處理的 GET 請求
node_puts 某節(jié)點前一分鐘處理的 PUT 請求數量,包括該節(jié)點上非本地虛擬節(jié)點處理的 PUT 請求

 

 

 

1.2 集群

指標 功能 統(tǒng)計方法
node_gets_total 集群前一分鐘處理的 GET 請求數量 SUM(node_gets)
node_puts_total 集群前一分鐘處理的 PUT 請求數量 SUM(node_puts)

 

 

 

2、延遲指標

2.1 單機

采集方法:

/usr/sbin/riak-admin status
指標 功能
node_get_fsm_time_mean 客戶端發(fā)起 GET 請求到收到響應時間間隔的均值
node_get_fsm_time_median 客戶端發(fā)起 GET 請求到收到響應時間間隔的中值
node_get_fsm_time_95 客戶端發(fā)起 GET 請求到收到響應時間間隔的 95 百分位值
node_get_fsm_time_100 客戶端發(fā)起 GET 請求到收到響應時間間隔的 100 百分位值
node_put_fsm_time_mean 客戶端發(fā)起 PUT 請求到收到響應時間間隔的均值
node_put_fsm_time_median 客戶端發(fā)起 PUT 請求到收到響應時間間隔的中值
node_put_fsm_time_95 客戶端發(fā)起 PUT 請求到收到響應時間間隔的 95 百分位值
node_put_fsm_time_100 客戶端發(fā)起 PUT 請求到收到響應時間間隔的 100 百分位值

 

 

 

 

 

 

 


2.2 集群

指標 功能 統(tǒng)計方法
node_get_fsm_time_mean_avg 客戶端發(fā)起 GET 請求到收到響應時間間隔的均值 AVG(node_get_fsm_time_mean)
node_put_fsm_time_mean_avg 客戶端發(fā)起 PUT 請求到收到響應時間間隔的均值 AVG(node_put_fsm_time_mean)

 

 

 

3、Erlang 資源使用情況指標(單機)

采集方法:

/usr/sbin/riak-admin status
指標 功能
sys_process_count Erlang 進程的數量
memory_processes 分配給 Erlang 進程的內存總量(單位 bytes)
memory_processes_used Erlang 進程使用的內存總量(單位 bytes)

 

 

 


4、Riak 負荷/健康指標

4.1 單機

采集方法:

/usr/sbin/riak-admin status
指標 功能
read_repairs 某節(jié)點前一分鐘處理的讀取修復操作數量
node_get_fsm_siblings_mean 某節(jié)點前一分鐘所有 GET 操作處理的兄弟數據數量均值
node_get_fsm_siblings_median 某節(jié)點前一分鐘所有 GET 操作處理的兄弟數據數量中值
node_get_fsm_siblings_95 某節(jié)點前一分鐘所有 GET 操作處理的兄弟數據數量 95 百分位值
node_get_fsm_siblings_100 某節(jié)點前一分鐘所有 GET 操作處理的兄弟數據數量 100 百分位值
node_get_fsm_objsize_mean 某節(jié)點前一分鐘流經 GET_FSM 的對象大小均值
node_get_fsm_objsize_median 某節(jié)點前一分鐘流經 GET_FSM 的對象大小中值
node_get_fsm_objsize_95 某節(jié)點前一分鐘流經 GET_FSM 的對象大小 95 百分位值
node_get_fsm_objsize_100 某節(jié)點前一分鐘流經 GET_FSM 的對象大小 100 百分位值

 

 

 

 

 

 

 

 


4.2 集群

指標 功能 統(tǒng)計方法
read_repairs_total 集群前一分鐘處理的讀取修復操作數量 SUM(read_repairs)
node_get_fsm_siblings_mean_avg 集群前一分鐘所有 GET 操作處理的兄弟數據數量均值 AVG(node_get_fsm_siblings_mean)
node_get_fsm_objsize_mean_avg 集群前一分鐘流經 GET_FSM 的對象大小均值 AVG(node_get_fsm_objsize_mean)

 

 

 


5、其他

5.1 LevelDB合并錯誤(單機)

采集方法:

find /data1/riak/data/leveldb -name "LOG" -exec grep -l 'Compaction error' {} \; | wc -l

5.2 LevelDB讀取塊操作錯誤(單機)

采集方法:

/usr/sbin/riak-admin status
指標 功能
leveldb_read_block_error LevelDB 讀取塊操作錯誤數量

 

 

5.3 節(jié)點存活狀態(tài)(單機)

采集方法:

/usr/sbin/riak-admin member-status | grep `ifconfig | grep "inet addr:10" | awk -F':' '{print $2}' | awk '{print $1}'`

輸出如下,valid表示節(jié)點正常

valid 9.0%      -- 'riak@10.1.80.114'

5.4 Riak Error Log(單機)

Riak 日志路徑:/data1/riak/logs 
采集文件:/data1/riak/logs/* 采集時間段:最近一分鐘 
采集內容:最近一分鐘發(fā)生的錯誤數 
采集示例:grep error -rn /data1/riak/logs | wc -l 
說明:這個采集需要程序處理下邏輯,在此不給出完整的采集方法

   二、采集程序

1、Riak監(jiān)控系統(tǒng)設計

 DBA通過前臺頁面根據CMDB三級業(yè)務添加/卸載Riak集群監(jiān)控,根據CMDB的ip添加Riak單機監(jiān)控(單機屬于集群,不能單獨存在,可增量添加單機監(jiān)控),填寫ip和端口,配置閾值、負責人等信息

1)數據庫設計

復制代碼
mysql> use riakMonitor
show tabReading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables; +---------------------------+
| Tables_in_riakMonitor     |
+---------------------------+
| riakClusterConf           | 
| riakClusterDisplay        | 
| riakClusterStatus         | 
| riakClusterStatusTemplate | 
| riakSingleConf            | 
| riakSingleDisplay         | 
| riakSingleStatus          | 
| riakSingleStatusTemplate  | 
+---------------------------+ 8 rows in set (0.00 sec)
復制代碼

Template表作為歷史庫表模板,歷史庫按月分庫,按ip分表

2) 單機Agent設計

  • Agent會通過自動調度平臺下發(fā)到目標機器,Crond周期是1分鐘,直接上報到mysql數據庫。運行時間超過45s 會被調度平臺kill
  • 如果檢測不到riak或者命令出錯則會發(fā)送rtx告警給admins + dba, 系統(tǒng)錯誤會發(fā)送給admins

3) 集群匯聚設計

  • 集群數據根據節(jié)點agent上報數據在50s的時候select出當前一分鐘的數據計算匯聚入庫
  • 程序每分鐘都會清除clusterStatus的數據,如果agent在本分鐘上報心跳異?;蛘呱蠄髸r間不在集群程序運行前(50s),cluster則不會統(tǒng)計該ip數據,但平均值計算時的除數會算上該ip(+1)
  • 集群計算同時會寫進歷史庫,并創(chuàng)建歷史表

4) CGI接口設計(NodeJs)

  • 異步接收agent上報的數據,根據redis的ip列表轉換成ip1
  • 如果redis獲取的ip1不存在singleConf表中則會拒絕上報,返回3003錯誤
  • 上報成功會入singleStatus和歷史庫,并創(chuàng)建歷史表

5) 代碼列表

復制代碼
CGI :  /data/riakMonitor   # daemon
agent: /home/opd/script/riakMonitor  #  crond
analyzer: /opdData/opdOnline/script/kmc/riakMonitor/analyzer  # crond 1、從CMDB更新single/cluster conf數據 2、同步conf和display 3、解析status數據到display 4、異常數據寫入 5、告警
riakTool: /opdData/opdOnline/script/kmc/riakMonitor/riakTool  # daemon
每分鐘第50s運行一次 1、獲取監(jiān)控集群和集群的ip,計算結果并匯聚 2、操作redis,將集群數據入歷史庫
復制代碼

2、采集程序部分代碼 (單機,python2.4)

1) 采集指標函數

復制代碼
def getRiakMeta():
    thisFuncName = str(sys._getframe().f_code.co_name)
    cmdStr = "/usr/sbin/riak-admin status" cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr) if 0 != cmdCode:
        msgTxt = "[%s] %s failed" % (thisFuncName, cmdStr)
        logger.error(msgTxt)
        sendRtx(MYCONF.riakAdmins, thisFuncName+" %s Fail:" % cmdStr) return 1 data["node_gets"] = data["node_puts"] = data["node_get_fsm_time_mean"] = data["node_get_fsm_time_median"] = 0
    data["node_get_fsm_time_95"] = data["node_get_fsm_time_100"] = data["node_put_fsm_time_mean"] = 0
    data["node_put_fsm_time_median"] = data["node_put_fsm_time_95"] = data["node_put_fsm_time_100"] = 0
    data["sys_process_count"] = data["memory_processes"] = data["memory_processes_used"] = 0
    data["read_repairs"] = data["node_get_fsm_siblings_mean"] = data["node_get_fsm_siblings_median"] = 0
    data["node_get_fsm_siblings_95"] = data["node_get_fsm_siblings_100"] = data["node_get_fsm_objsize_mean"] = 0
    data["node_get_fsm_objsize_median"] = data["node_get_fsm_objsize_95"] = data["node_get_fsm_objsize_100"] = 0
    data["leveldb_read_block_error"] = 0

    riakItemInfo = cmdStdout.split('\n') for each in riakItemInfo:
        eachInfo = each.split(" : ") if 2 == len(eachInfo):
            itemKey = eachInfo[0]
            itemValue = eachInfo[1].replace('<<"', '').replace('">>', '') if itemKey in data: 
                logger.debug("%s:%s" % (itemKey, itemValue)) try:
                    data[itemKey] = str(round(float(itemValue), 2)) except ValueError:
                    data[itemKey] = itemValue except: raise cmdStr = """ find /data1/riak/data/leveldb -name "LOG" -exec grep -l 'Compaction error' {} \; | wc -l """ cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr) if 0 != cmdCode:
        msgTxt = "[%s] %s failed" % (thisFuncName, cmdStr)
        logger.error(msgTxt)
        sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt) return 1 data["leveldb_compaction_error"] = cmdStdout #不用轉int  cmdStr = "/usr/sbin/riak-admin member-status | grep %s" % data["mainIp"]
    cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr)
    logger.debug(cmdStdout) if 0 != cmdCode:
        msgTxt = "[%s] %s failed" % (thisFuncName, cmdStr)
        logger.error(msgTxt)
        sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt) return 1 if cmdStdout.strip().startswith('valid'):
        data["is_active"] = 1 else:
        data["is_active"] = 0

    data["riak_error_log"] = 0
    riakLogPath = "/data1/riak/logs/" if not os.path.isdir(riakLogPath):
        msgTxt = "[%s] %s not exists" % (thisFuncName, riakLogPath)
        logger.error(msgTxt)
        sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt) return 1 riakLogInfo = os.listdir(riakLogPath)
    reportTimeSec = time.mktime(time.strptime(data["report_time"], "%Y-%m-%d %H:%M:%S")) for each in riakLogInfo:
        logger.debug("fileName: "+each)
        eachFile = os.path.join(riakLogPath, each) if os.path.isfile(eachFile): try:
                eachFd = open(eachFile, 'r') except IOError, e:
                msgTxt = "I/O error({}): {}".format(e.errno, e.strerror)
                logger.error(msgTxt)
                sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt) return 1 else: for eachLine in eachFd: #從頭讀,怕文件太大撐爆內存 if "error" in eachLine: #2016-03-20 04:57:09.704 [info] <0.19012.49>@riak_kv_index_h eachInfo = eachLine.split(' ') try:
                            eachTimeStr = "%s %s" % (eachInfo[0], eachInfo[1][:-4])
                            eachTimeSec = time.mktime(time.strptime(eachTimeStr, "%Y-%m-%d %H:%M:%S")) if reportTimeSec - 60 <= eachTimeSec < reportTimeSec:
                                logger.debug(eachLine)
                                data["riak_error_log"] += 1 elif eachTimeSec >= reportTimeSec: break except:
                            msgTxt = "file(%s) format wrong " % eachFile
                            logger.error(msgTxt) break #sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt) #eachFile.close() #return 1  eachFd.close() return 0
復制代碼

2)  上報和失敗重傳函數

復制代碼
def report2server(content, retry): '''上報到入庫程序,根據ip求余獲取優(yōu)先的server,如果上報失敗會遍歷server列表''' thisFuncName = "" try:
        thisFuncName = str(sys._getframe().f_code.co_name)
        pos = data["ip"] % len(MYCONF.reportServer)
        serverKeys = MYCONF.reportServer.keys()
        serverKeys.sort()
        serverKeys = serverKeys[pos:] + serverKeys[:pos] for serverId in serverKeys:
            cmdStr = "/usr/bin/curl -s --connect-timeout %d -m %d -d '%s&reTry=%d' %s" %(
                    MYCONF.curlConnectTimeout, MYCONF.curlMaxTimeout, content, retry, MYCONF.reportServer[serverId])
            cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr)
            logger.info(cmdStr + "\ncmdCode:" + str(cmdCode) + "\n" + cmdStdout + cmdStderr) if 0 == cmdCode: return 0 return 1 except:
        exceptmsg = StringIO.StringIO()
        traceback.print_exc(file=exceptmsg)
        msgTxt = exceptmsg.getvalue()
        sendRtx(MYCONF.admins, thisFuncName + " Fail:" + msgTxt) return 1 def reportScheduler(reportRecord=0): '''reportRecord = 0 表示上報data中采集的新數據, 
    reportRecord = 1 表示從reportFailFile里面獲取最新的一條數據上報到server,然后需要處理reportFailFile''' thisFuncName = "" try:
        thisFuncName = str(sys._getframe().f_code.co_name) if 1 == reportRecord: # 從上報失敗文件中獲取最后一條數據,上報之 if not reportFail.has_section("index"): #這里不要去add_section("index") 該誰add誰add去 return 0 if not reportFail.has_option("index", "index") or "" == reportFail.get("index", "index").strip(): return 0
            indexVec = MYCONF.splitRe.split(reportFail.get("index", "index").strip())
            index = indexVec[-1] if "" == index:
                msgTxt = reportFail.get("index", "index").strip()
                sendRtx(MYCONF.admins, thisFuncName + "[系統(tǒng)錯誤] index.index 末尾有多余的逗號 " + msgTxt) return 1 if not reportFail.has_option("content", index + "_c") or not reportFail.has_option("content", index + "_t"): # _c 是內容  _t 是重試次數 msgTxt = "content sector 缺少 %s_c 或 %s_t" %(index, index)
                sendRtx(MYCONF.admins, thisFuncName + "[系統(tǒng)錯誤] " + msgTxt) return 1 content = reportFail.get("content", index + "_c")
            retry = reportFail.getint("content", index + "_t")
            retry += 1 code = report2server(content, retry) if 0 == code: # 發(fā)送成功  indexVec.remove(index) if indexVec:
                    reportFail.set("index", "index", ",".join(indexVec)) else:
                    reportFail.set("index", "index", "")
                reportFail.remove_option("content", index + "_c")
                reportFail.remove_option("content", index + "_t") elif retry > MYCONF.maxRetry: # 重發(fā)失敗,且超過最大重試次數  indexVec.remove(index) if indexVec:
                    reportFail.set("index", "index", ",".join(indexVec)) else:
                    reportFail.set("index", "index", "")
                reportFail.remove_option("content", index + "_c")
                reportFail.remove_option("content", index + "_t") else: # 重發(fā)失敗, 更新 _t (retry) 字段 reportFail.set("content", index + "_t", retry) else: # 發(fā)送新數據 index = data["report_time"].replace(" ", "").replace("-", "").replace(":", "")
            content = urllib.urlencode(data)
            retry = 0
            code = report2server(content, retry) if 0 == code: return 0 if not reportFail.has_section("index"):
                reportFail.add_section("index")
                reportFail.set("index", "index", index)
                reportFail.add_section("content")
                reportFail.set("content", index + "_c", content)
                reportFail.set("content", index + "_t", retry) else:
                indexVec = MYCONF.splitRe.split(reportFail.get("index", "index").strip())
                indexVec.append(index) if len(indexVec) > MYCONF.maxFailRecord: # 超過最大 fail record 數 reportFail.set("index", "index", ",".join(indexVec[len(indexVec) - MYCONF.maxFailRecord:]))
                    reportFail.set("content", index + "_c", content)
                    reportFail.set("content", index + "_t", retry) for i in range(0, len(indexVec) - MYCONF.maxFailRecord):
                        delIndex = indexVec[i]
                        reportFail.remove_option("content", delIndex + "_c")
                        reportFail.remove_option("content", delIndex + "_t") else:
                    reportFail.set("index", "index", ",".join(indexVec))
                    reportFail.set("content", index + "_c", content)
                    reportFail.set("content", index + "_t", retry) return 0 except:
        exceptmsg = StringIO.StringIO()
        traceback.print_exc(file=exceptmsg)
        msgTxt = exceptmsg.getvalue()
        sendRtx(MYCONF.admins, thisFuncName + " Fail:" + msgTxt) return 1
復制代碼

3) 獲取shell命令輸出函數

復制代碼
def getCmdResult(cmdStr): '''獲取shell命令的返回碼,標準輸出,標準錯誤''' #child = subprocess.Popen(cmdStr, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) #cmdStdout, cmdStderr = child.communicate() #cmdCode = child.wait()  #return (cmdCode, cmdStdout, cmdStderr) thisFuncName = str(sys._getframe().f_code.co_name)
    nowTime = int(time.time())
    tmpstdout = os.path.join(MYCONF.basePath, "cmd.stdout.%d.tmp" % nowTime)
    tmpstderr = os.path.join(MYCONF.basePath, "cmd.stderr.%d.tmp" % nowTime) if "debug" == MYCONF.role:
        msgTxt = "[%d]Run Cmd: %s" % (nowTime, cmdStr)
        logger.debug(msgTxt)

    cmdStr = "(%s) 1>%s 2>%s" %(cmdStr, tmpstdout, tmpstderr)
    cmdCode = os.system(cmdStr) >> 8 cdmStdout = cmdStderr = "" try:
        outfd = open(tmpstdout)
        cmdStdout = outfd.read()
        errfd = open(tmpstderr)
        cmdStderr = errfd.read() except:
        exceptmsg = StringIO.StringIO()
        traceback.print_exc(file=exceptmsg)
        msgTxt = exceptmsg.getvalue()
        sendRtx(MYCONF.admins, thisFuncName + " Fail:" + msgTxt)
        cmdCode = 110 else:
        outfd.close()
        errfd.close()
        os.remove(tmpstderr)
        os.remove(tmpstdout) return (cmdCode, cmdStdout, cmdStderr)
復制代碼

4) 讀/寫Cache函數

復制代碼
def readLastCache(): global lastCache
    lastCache = ConfigParser.ConfigParser() if not os.path.isfile(MYCONF.lastCacheFile): try:
            fd = open(MYCONF.lastCacheFile, "w") except IOError, e:
            logger.error("I/O error({}): {}".format(e.errno, e.strerror)) return 1 else:
            fd.close()
    lastCache.readfp(open(MYCONF.lastCacheFile), "rb") return 0 def writeCache():
    thisFuncName = "" try:
        thisFuncName = str(sys._getframe().f_code.co_name)
        lastCache.write(open(MYCONF.lastCacheFile, 'w')) return 0 except:
        exceptmsg = StringIO.StringIO()
        traceback.print_exc(file=exceptmsg)
        msgTxt = exceptmsg.getvalue()
        logger.error(msgTxt) return 1
復制代碼

5)  讀/寫失敗記錄

復制代碼
def readFailRecord(): global reportFail
    reportFail = ConfigParser.ConfigParser() if not os.path.isfile(MYCONF.lastReportFailFile): try:
            fd = open(MYCONF.lastReportFailFile, "w") except IOError, e:
            logger.error("I/O error({}): {}".format(e.errno, e.strerror)) return 1 else:
            fd.close()
    reportFail.readfp(open(MYCONF.lastReportFailFile), "rb") return 0 def writeFailRecord():
    thisFuncName = "" try:
        thisFuncName = str(sys._getframe().f_code.co_name)
        reportFail.write(open(MYCONF.lastReportFailFile, 'w')) return 0 except:
        exceptmsg = StringIO.StringIO()
        traceback.print_exc(file=exceptmsg)
        msgTxt = exceptmsg.getvalue()
        logger.error(msgTxt) return 1
復制代碼

6) main函數

復制代碼
def main():
    data["osType"] = 0 # 0表示 linux data["version"] = MYCONF.version # 當前程序的自定義版本號 data["report_time"] = time.strftime("%Y-%m-%d %H:%M:00") #上報時間,由于目前基礎監(jiān)控是分鐘級監(jiān)控粒度,因此秒取 00  initLog()
    logger.info('='*80) if 0 == checkLastPid() and 0 == readLastCache() and 0 == getLoginIp():
        readFailRecord() # 讀取早遷采集周期上報失敗,需要重傳的數據 reportScheduler(reportRecord=1) #從 fail record 中選取最近的一條信息上報給服務器 if 0 == getRiakMeta():
            reportScheduler(reportRecord=0)
        writeFailRecord()
        writeCache()
    logger.info('='*80)
    logging.shutdown() return 
復制代碼

3、添加/卸載監(jiān)控

1) 添加監(jiān)控

添加監(jiān)控需要先添加集群(不支持先添加IP),添加集群會默認把所有IP都添加監(jiān)控(前臺將在clusterConf新增記錄,并在singleConf增加對應的ip記錄,然后調用調度平臺,檢測ip是否已經安裝)如果該集群在CMDB里面新增Ip,則需要手動添加監(jiān)控(前臺提供新增監(jiān)控節(jié)點,插入singleConf)

2) 卸載監(jiān)控

復制代碼
(1) 卸載監(jiān)控可以卸載整個集群的監(jiān)控(將clusterConf needMonitor置0,同步將singleConf的needMonitor都置0,然后調用
調度平臺
卸載集群下的所有機器,如果該ip存在其他集群并且需要監(jiān)控,則不用調用
調度平臺
卸載)也可以卸載單個節(jié)點的監(jiān)控(前臺將singleConf的needMonitor置0,調用
調度平臺
,同樣判斷ip是否存在其他集群) (2) 添加卸載監(jiān)控部由前臺調用
調度平臺
接口,并修改數據庫(插入數據或者更新need_monitor) (3) Single/cluster dislplay表會同步conf表的數據,只保留need_monitor=1的數據
復制代碼

4、CMDB數據同步

后臺一直同步CMDB的數據和conf表的數據,如果不在CMDB的則需要刪掉conf里面的數據,不管needMonitor的值為多少。刪除三級業(yè)務的話只需要刪除clusterConf表對應的記錄,single會自動同步外鍵(嘗試調用
調度平臺
卸載接口,卸載掉被刪除的三級業(yè)務ID下面的所有已安裝監(jiān)控的IP)

5、前臺展示

1) 集群狀態(tài)展示

 

2) 單機節(jié)點狀態(tài)展示