参考:http://www.oschina.net/translate/build-a-realtime-chat-server-with-go-and-websockets
在以上文章的基础上,增加了自动推送数据的部分
程序主入口增加定时任务:
// main is the program entry point. It serves static files from ../public at
// "/", registers handleConnections for the "/ws" endpoint, launches
// handleMessages and cronTask as goroutines, and then blocks in the HTTP
// server listening on port 30080.
func main() {
	// Static front end plus the WebSocket endpoint.
	http.Handle("/", http.FileServer(http.Dir("../public")))
	http.HandleFunc("/ws", handleConnections)

	// Background workers run for the lifetime of the process.
	go handleMessages()
	go cronTask()

	log.Println("http server started on: 30080")
	if err := http.ListenAndServe(":30080", nil); err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}
定时任务每五秒从hdfs namenode 监控页面抓取指标数据
/**
*定时任务 推送消息
*/
func cronTask() {
c := cron.New()
defer c.Stop()
spec := "*/5 * * * * ?"
c.AddFunc(spec, func() {
logDebug("cron running...")
syncHDFSMetricInfo()
})
c.Start()select {}
}
抓取数据操作:
/**
*同步HDFS指标信息
*/
func syncHDFSMetricInfo() {
res := getHDFSInfoFromNameNode()
var data map[string]interface{}
if err := json.Unmarshal([]byte(res), &data);
err == nil {
beans := data["beans"].([]interface{})
bean := beans[0]
beanMap := bean.(map[string]interface{})
capacityTotal := beanMap["CapacityTotal"]d := dataFilter(capacityTotal.(float64))
//value, _ := fmt.Printf("%.1f", d["value"])
value := d["value"].(float64)
unit := d["unit"].(string)
syncMetricInfo("hdfs-go", "capacityTotal-go", float64(value), unit)
//消息写入channel
fmt.Println("start...")
msg := Message{"12070162xx@qq.com", "system", "系统自动推送消息..."}
fmt.Println(msg)
broadcast <- msg
fmt.Println("end...")
}}
处理错误信息方法:
// check panics with e when e is non-nil and does nothing otherwise.
func check(e error) {
	if e == nil {
		return
	}
	panic(e)
}
检查日志文件是否存在方法:
// checkFileIsExist reports whether filename exists. It returns false only
// when os.Stat fails with a "does not exist" error; any other Stat outcome
// (including other errors) yields true.
func checkFileIsExist(filename string) bool {
	_, statErr := os.Stat(filename)
	return !os.IsNotExist(statErr)
}
记录日志通用方法:
/**
*写日志方法
*/
func logWrite(content string, fileName string) {
var f *os.File
var err1 error
if checkFileIsExist(fileName) { //如果文件存在
f, err1 = os.OpenFile(fileName, os.O_APPEND|os.O_WRONLY, 0666) //打开文件
} else {
f, err1 = os.Create(fileName) //创建文件
}
check(err1)
_, err2 := io.WriteString(f, content+"\n") //写入文件(字符串)
check(err2)
f.Close()
}
调试日志记录方法:
// logDebug writes content to the debug log of the current day, a file named
// "debug-YYYY-MM-DD.log".
func logDebug(content string) {
	today := time.Now().Format("2006-01-02")
	logWrite(content, "debug-"+today+".log")
}
错误日志记录方法:
// logError writes content to the error log of the current day, a file named
// "error-YYYY-MM-DD.log".
func logError(content string) {
	today := time.Now().Format("2006-01-02")
	logWrite(content, "error-"+today+".log")
}
数据抓取方法:
/**
*从hdfs namenode监控页面获取存储信息
*/
func getHDFSInfoFromNameNode() string {
url := "http://tempt13.ops.lycc.qihoo.net:50070/jmx?qry=Hadoop:service=NameNode,name=FSNamesystemState"
logDebug("get hdfs info url: " + url)resp, err := http.Get(url) //改送HTTP Get请求
if err != nil {
logError("get hdfs info namenode error: " + err.Error())
return ""
}if resp.Body != nil {
defer resp.Body.Close()
}if resp.StatusCode != http.StatusOK {
logError(errors.New(resp.Status).Error())
return ""
}data, err := ioutil.ReadAll(resp.Body)
logDebug("get hdfs info namenode data: " + string(data))
return string(data)
}
数据格式化方法(其实就是找到合适的单位)
// dataFilter scales a byte count down by factors of 1024 until it is below
// 1024 or the largest known unit is reached.
//
// The result is a map with two entries:
//   - "value": the scaled number (float64)
//   - "unit":  one of "B", "K", "M", "G", "T", "P" (string)
//
// Values of 1024^6 and above stay expressed in "P" rather than running past
// the unit table.
func dataFilter(value float64) map[string]interface{} {
	units := [...]string{"B", "K", "M", "G", "T", "P"}

	index := 0
	// ">=" so that exactly 1024 becomes 1 of the next unit; the index bound
	// keeps very large inputs inside the unit table.
	for value >= 1024 && index < len(units)-1 {
		value /= 1024
		index++
	}

	return map[string]interface{}{
		"unit":  units[index],
		"value": value,
	}
}
数据库指标数据是否存在方法:
/**
*获取指标数据
*/
func isMetricInfoExist(appId string, metricName string) bool {
db, err := sql.Open("mysql", "root:792a1d086a2b0a5d@tcp(127.0.0.1:3306)/db_scgd?charset=utf8")
sql := "select * from scgd_metric_record where app_id= '" + appId + "' and metric_name= '" + metricName + "'"
rows, err := db.Query(sql)
if err != nil {
logError("get metric info from mysql error: " + err.Error())
}
db.Close()
columns, err := rows.Columns()
if err != nil {
logError("get rows column error: " + err.Error())
}
count := len(columns)
tableData := make([]map[string]interface{}, 0)
values := make([]interface{}, count)
valuePtrs := make([]interface{}, count)
for rows.Next() {
for i := 0;
i < count;
i++ {
valuePtrs[i] = &values[i]
}
rows.Scan(valuePtrs...)
entry := make(map[string]interface{})
for i, col := range columns {
var v interface{}
val := values[i]
b, ok := val.([]byte)
if ok {
v = string(b)
} else {
v = val
}
entry[col] = v
}
tableData = https://www.it610.com/article/append(tableData, entry)
return true
}
return false
}
新增指标数据方法:
// addMetricInfo inserts a new row into scgd_metric_record with the given
// metric name, app id, unit, and value, and returns the id reported by the
// driver for the inserted row.
//
// On any failure the error is reported through logError and 0 is returned.
//
// NOTE(review): the DSN, including the database password, is hard-coded here;
// consider moving it to configuration.
func addMetricInfo(appId string, metricName string, metricValue float64, unit string) int64 {
	db, err := sql.Open("mysql", "root:792a1d086a2b0a5d@tcp(127.0.0.1:3306)/db_scgd?charset=utf8")
	if err != nil {
		logError("open mysql connection error: " + err.Error())
		return 0
	}
	defer db.Close()

	stmt, err := db.Prepare(`insert into scgd_metric_record (metric_name, app_id, units, metric_value) values (?, ?, ?, ?)`)
	if err != nil {
		logError("add metric info prepare error: " + err.Error())
		return 0
	}
	defer stmt.Close()

	res, err := stmt.Exec(metricName, appId, unit, metricValue)
	if err != nil {
		logError("add metric info to mysql error: " + err.Error())
		return 0
	}

	id, err := res.LastInsertId()
	if err != nil {
		logError("get last insert id error: " + err.Error())
		return 0
	}
	return id
}
更新指标数据方法:
// updateMetricInfo sets units and metric_value on the scgd_metric_record rows
// matching app_id and metric_name, and returns the number of rows the driver
// reports as affected.
//
// On any failure the error is reported through logError and 0 is returned.
//
// NOTE(review): the DSN, including the database password, is hard-coded here;
// consider moving it to configuration.
func updateMetricInfo(appId string, metricName string, metricValue float64, unit string) int64 {
	fmt.Println("here")
	fmt.Println(metricValue)

	db, err := sql.Open("mysql", "root:792a1d086a2b0a5d@tcp(127.0.0.1:3306)/db_scgd?charset=utf8")
	if err != nil {
		logError("open mysql connection error: " + err.Error())
		return 0
	}
	defer db.Close()

	stmt, err := db.Prepare(`update scgd_metric_record set units=?, metric_value=? where app_id=? and metric_name=?`)
	if err != nil {
		logError("update metric info prepare error: " + err.Error())
		return 0
	}
	defer stmt.Close()

	res, err := stmt.Exec(unit, metricValue, appId, metricName)
	if err != nil {
		logError("update metric info to mysql error: " + err.Error())
		return 0
	}

	num, err := res.RowsAffected()
	if err != nil {
		logError("get rows affected error: " + err.Error())
		return 0
	}
	return num
}
同步指标信息方法:
// syncMetricInfo stores a metric value: when isMetricInfoExist reports an
// existing record for appId/metricName it calls updateMetricInfo, otherwise
// it calls addMetricInfo. It also prints "get" and the value to stdout.
func syncMetricInfo(appId string, metricName string, metricValue float64, unit string) {
	fmt.Println("get")
	fmt.Println(metricValue)

	if isMetricInfoExist(appId, metricName) {
		// Record already present: update it.
		updateMetricInfo(appId, metricName, metricValue, unit)
		return
	}
	// No record yet: insert one.
	addMetricInfo(appId, metricName, metricValue, unit)
}
需要引入的包汇总(没有的包请自行百度用go get 命令安装):
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/gorilla/websocket"
"github.com/robfig/cron"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"time"
)
运行 go run websocket_server.go
最终效果:
文章图片
【websocket|go websocket 群组聊天+基于数据大屏推送技术】
推荐阅读
- Go|Docker后端部署详解(Go+Nginx)
- GO|GO,GO,GO!
- Go成长之路|go中判断空字符串、nil和len(t)的用法
- websocket|android中如何使用websocket,以及用nodejs做后台的实例
- go编译tools
- WebSocket安卓客户端实现详解(一)--连接建立与重连
- go grpc安装与使用
- goroutine 调度原理
- Go|Go进阶之路——复杂类型