websocket|go websocket 群组聊天+基于数据大屏推送技术

参考:http://www.oschina.net/translate/build-a-realtime-chat-server-with-go-and-websockets
在以上文章的基础上,增加了自动推送数据的部分。
程序主入口增加定时任务:

/** *程序入口 */ func main() { //开启一个web scoket server fs := http.FileServer(http.Dir("../public")) http.Handle("/", fs) http.HandleFunc("/ws", handleConnections) go handleMessages() go cronTask() log.Println("http server started on: 30080") err := http.ListenAndServe(":30080", nil) if err != nil { log.Fatal("ListenAndServe: ", err) } }

定时任务每五秒从 HDFS NameNode 监控页面抓取一次指标数据:
/** *定时任务 推送消息 */ func cronTask() { c := cron.New() defer c.Stop() spec := "*/5 * * * * ?" c.AddFunc(spec, func() { logDebug("cron running...") syncHDFSMetricInfo() }) c.Start()select {} }

抓取数据操作:
/** *同步HDFS指标信息 */ func syncHDFSMetricInfo() { res := getHDFSInfoFromNameNode() var data map[string]interface{} if err := json.Unmarshal([]byte(res), &data); err == nil { beans := data["beans"].([]interface{}) bean := beans[0] beanMap := bean.(map[string]interface{}) capacityTotal := beanMap["CapacityTotal"]d := dataFilter(capacityTotal.(float64)) //value, _ := fmt.Printf("%.1f", d["value"]) value := d["value"].(float64) unit := d["unit"].(string) syncMetricInfo("hdfs-go", "capacityTotal-go", float64(value), unit) //消息写入channel fmt.Println("start...") msg := Message{"12070162xx@qq.com", "system", "系统自动推送消息..."} fmt.Println(msg) broadcast <- msg fmt.Println("end...") }}

处理错误信息方法:
// check panics with e when e is non-nil and does nothing otherwise.
func check(e error) {
	if e == nil {
		return
	}
	panic(e)
}

检查日志文件是否存在方法:
// checkFileIsExist reports whether filename exists. It returns false only
// when os.Stat fails with a "does not exist" error and true in every other
// case.
func checkFileIsExist(filename string) bool {
	_, statErr := os.Stat(filename)
	return !os.IsNotExist(statErr)
}

记录日志通用方法:
/** *写日志方法 */ func logWrite(content string, fileName string) { var f *os.File var err1 error if checkFileIsExist(fileName) { //如果文件存在 f, err1 = os.OpenFile(fileName, os.O_APPEND|os.O_WRONLY, 0666) //打开文件 } else { f, err1 = os.Create(fileName) //创建文件 } check(err1) _, err2 := io.WriteString(f, content+"\n") //写入文件(字符串) check(err2) f.Close() }

调试日志记录方法:
/** *调试日志 */ func logDebug(content string) { var date = time.Now().Format("2006-01-02") logName := "debug-" + date + ".log" logWrite(content, logName) }

错误日志记录方法:
/** *错误日志 */ func logError(content string) { var date = time.Now().Format("2006-01-02") logName := "error-" + date + ".log" logWrite(content, logName) }

数据抓取方法:
/** *从hdfs namenode监控页面获取存储信息 */ func getHDFSInfoFromNameNode() string { url := "http://tempt13.ops.lycc.qihoo.net:50070/jmx?qry=Hadoop:service=NameNode,name=FSNamesystemState" logDebug("get hdfs info url: " + url)resp, err := http.Get(url) //改送HTTP Get请求 if err != nil { logError("get hdfs info namenode error: " + err.Error()) return "" }if resp.Body != nil { defer resp.Body.Close() }if resp.StatusCode != http.StatusOK { logError(errors.New(resp.Status).Error()) return "" }data, err := ioutil.ReadAll(resp.Body) logDebug("get hdfs info namenode data: " + string(data)) return string(data) }

数据格式化方法(其实就是找到合适的单位)
// dataFilter scales a byte count down by factors of 1024 until it is below
// 1024 or the largest unit is reached, and returns the result as a map with
// two keys: "value" (float64, the scaled number) and "unit" (string, one of
// "B", "K", "M", "G", "T", "P").
//
// A value of exactly 1024 is reported as 1 "K". Values of 1024 P or more
// stay in "P", so the unit index can never run past the unit table.
func dataFilter(value float64) map[string]interface{} {
	units := [...]string{"B", "K", "M", "G", "T", "P"}

	index := 0
	for value >= 1024 && index < len(units)-1 {
		value /= 1024
		index++
	}

	return map[string]interface{}{
		"unit":  units[index],
		"value": value,
	}
}


判断数据库中指标数据是否存在的方法:
/** *获取指标数据 */ func isMetricInfoExist(appId string, metricName string) bool { db, err := sql.Open("mysql", "root:792a1d086a2b0a5d@tcp(127.0.0.1:3306)/db_scgd?charset=utf8") sql := "select * from scgd_metric_record where app_id= '" + appId + "' and metric_name= '" + metricName + "'" rows, err := db.Query(sql) if err != nil { logError("get metric info from mysql error: " + err.Error()) } db.Close() columns, err := rows.Columns() if err != nil { logError("get rows column error: " + err.Error()) } count := len(columns) tableData := make([]map[string]interface{}, 0) values := make([]interface{}, count) valuePtrs := make([]interface{}, count) for rows.Next() { for i := 0; i < count; i++ { valuePtrs[i] = &values[i] } rows.Scan(valuePtrs...) entry := make(map[string]interface{}) for i, col := range columns { var v interface{} val := values[i] b, ok := val.([]byte) if ok { v = string(b) } else { v = val } entry[col] = v } tableData = https://www.it610.com/article/append(tableData, entry) return true } return false }


新增指标数据方法:
/** *新增指标 */ func addMetricInfo(appId string, metricName string, metricValue float64, unit string) int64 { db, err := sql.Open("mysql", "root:792a1d086a2b0a5d@tcp(127.0.0.1:3306)/db_scgd?charset=utf8") stmt, err := db.Prepare(`insert into scgd_metric_record (metric_name, app_id, units, metric_value) values (?, ?, ?, ?)`) if err != nil { logError("add metric info prepare error: " + err.Error()) } res, err := stmt.Exec(metricName, appId, unit, metricValue) if err != nil { logError("add metric info to mysql error: " + err.Error()) } id, err := res.LastInsertId() if err != nil { logError("get last insert id error: " + err.Error()) } stmt.Close() db.Close() return id }

更新指标数据方法:
/** *更新指标 */ func updateMetricInfo(appId string, metricName string, metricValue float64, unit string) int64 { fmt.Println("here") fmt.Println(metricValue) db, err := sql.Open("mysql", "root:792a1d086a2b0a5d@tcp(127.0.0.1:3306)/db_scgd?charset=utf8") stmt, err := db.Prepare(`update scgd_metric_record set units=?, metric_value=https://www.it610.com/article/? where app_id=? and metric_name=?`) if err != nil { logError("update metric info prepare error: " + err.Error()) } res, err := stmt.Exec(unit, metricValue, appId, metricName) if err != nil { logError("update metric info to mysql error: " + err.Error()) } num, err := res.RowsAffected() if err != nil { logError("get rows affected error: " + err.Error()) } stmt.Close() db.Close() return num }


同步指标信息方法:
/** *同步指标信息 */ func syncMetricInfo(appId string, metricName string, metricValue float64, unit string) { fmt.Println("get") fmt.Println(metricValue) res := isMetricInfoExist(appId, metricName) if res == false { //添加 addMetricInfo(appId, metricName, metricValue, unit) } else { //更新 updateMetricInfo(appId, metricName, metricValue, unit) } }

需要引入的包汇总(没有的包请自行百度用go get 命令安装):
import ( "database/sql" "encoding/json" "errors" "fmt" _ "github.com/go-sql-driver/mysql" "github.com/gorilla/websocket" "github.com/robfig/cron" "io" "io/ioutil" "log" "net/http" "os" "time" )

运行 go run websocket_server.go
最终效果:

websocket|go websocket 群组聊天+基于数据大屏推送技术
文章图片










【websocket|go websocket 群组聊天+基于数据大屏推送技术】





    推荐阅读