python|Python+Celery实现基于Fastnetmon异常流量清洗

背景 FastNetMon+Influxdb+Grafana+GoBGP可搭建一套基于 NetFLOW / sFLOW 的流量统计报告系统,其中:

  • FastNetMon 是一个基于多种抓包引擎(NetFlow, IPFIX, sFLOW, netmap, PF_RING, PCAP)的DoS/DDoS攻击高效分析工具,可以探测和分析网络中的异常流量情况,同时可以通过外部脚本通知或阻断攻击;
  • InfluxDB 是一款开源开源时序型数据库,和FastNetMon集成,用于将数据统计进行存储;
  • Grafana 是一款非常强大且易用的数据可视化工具;
  • GoBGP 是一个开源 BGP 实现,可以提供 BGP 协议的控制平面功能;
清洗需求 查询Influxdb,当某个IP段超过一定的阈值时,我们需要进行以下操作:
  1. 将开始时间、IP段、流量、是否过期、开始时间记录至统计数据库并开始计时;
  2. 执行Gobgp打标命令;
  3. 对于已过期的IP段,不要重复录入Mysql;
  4. 对于未过期的IP段,需保留历史记录,因此按步骤1再次记录至Mysql;
  5. 当计时延迟3600s结束之后,更新IP段的过期字段、结束时间并执行Gobgp去标命令;
需求分析 1.统计数据库选型
统计数据库我们可以复用Influxdb或使用Mysql,但Influxdb作为时序数据库,后续的字段更新必须基于时间戳和tag查询,字段的更新不友好,因此最终我们选择Mysql。
2.异步解耦
需求中bgp打标、去标、延迟执行等操作,在程序运行过程中计时、命令等待、命令返回错误等意外情况,都会导致运行中断,因此我们考虑使用Python + Celery(消息队列工具,可用于处理实时数据以及任务调度),来与以上情况进行异步解耦。
另,在实际应用中Celery可通过队列来调度任务,不用担心并发量高时系统负载过大。
环境准备 1.InfluxDB环境
# 若influxdb环境已存在,可跳过此步骤 docker pull influxdb:1.8docker run -p 8086:8086 \ --name influxdb \ --restart unless-stopped \ -e DOCKER_INFLUXDB_INIT_USERNAME=admin \ -e DOCKER_INFLUXDB_INIT_PASSWORD=admin@123 \ -v /data/influxdb/data:/var/lib/influxdb \ -v /data/influxdb/config/influxdb.conf:/etc/influxdb/influxdb.conf \ -v /etc/localtime:/etc/localtime \ -d influxdb:1.8

数据库记录(若有测试需求,可私聊获取):
python|Python+Celery实现基于Fastnetmon异常流量清洗
文章图片

2.Mysql数据库
# 1.安装mysql docker pull mysql:5.7docker run -p 3306:3306 --name mysql \ -v /usr/local/docker/mysql/conf:/etc/mysql \ -v /usr/local/docker/mysql/logs:/var/log/mysql \ -v /usr/local/docker/mysql/data:/var/lib/mysql \ -e MYSQL_ROOT_PASSWORD=123456 \ -d mysql:5.7# 2.运行容器 docker run -p 3306:3306 --name mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7# 3.登录 docker exec -it mysql bash mysql -uroot -p123456# 4.建库 create database fastnetmon # 5.建表 CREATE TABLE `statistic` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键', `network` varchar(40) DEFAULT NULL COMMENT 'IP段', `bits_incoming` int(11) DEFAULT NULL COMMENT '进口流量', `starttime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `endtime` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '结束时间', isexpire BOOLEAN DEFAULT NULL COMMENT '是否过期', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='统计表';

3.redis环境
yum install redis -y systemctl start redis

4.python环境
# 1.ananconda虚拟环境安装 # 2.安装python 3.9 conda create -n influx python=3.9 source activate influx pip install influxdb celery redis pymysql

目录结构
influxdb ├── celery_app │├── celeryconfig.py │├── gobgp.py │├── __init__.py │└── record.py ├── celery.log ├── dbconn.py ├── influx.log ├── influx.py ├── __init__.py ├── log.py └── Readme

代码详解 1.主程序
# 1.日志模块 vim log.py # -*- coding: utf-8 -*- import os import logginglogging.basicConfig( level = logging.INFO, format = '%(asctime)s, %(filename)s, %(levelname)s, %(message)s', datefmt = '%Y-%m-%d %H:%M:%S', filename = "influx.log", filemode = 'a' )# 2.数据库操作封装 vim dbconn.py # -*- coding: utf-8 -*- import requests import json from log import logging import pymysqldef QueryInflux(sql): try: url = "http://192.168.3.243:8086/query?pretty=true&db=fastnetmon&q=" + sql # 超时10s res = requests.get(url, timeout=10) return json.loads(res.text) except Exception as e: logging.error(e)def ConnMySql(): mysql_conn = pymysql.connect(host= '192.168.3.243', port= 3306, user= 'root', password= '123456', db= 'fastnetmon') return mysql_conndef QueryMySql(sql): try: mysql_conn = ConnMySql() with mysql_conn.cursor() as cursor: cursor.execute(sql) select_result = cursor.fetchone() return select_result except Exception as e: logging.error(e)def InsertMySql(sql): try: mysql_conn = ConnMySql() with mysql_conn.cursor() as cursor: cursor.execute(sql) mysql_conn.commit() except Exception as e: mysql_conn.rollback() logging.error(e)# 3.主程序 vim influx.py # -*- coding: utf-8 -*- from celery_app import gobgp,record from dbconn import QueryInflux,QueryMySql,InsertMySql from log import logging import datetimedef WorkHard(*lst): try: #下发bgp打包任务至celery,立即执行 #gobgp.cmd.delay("java -version") gobgp.cmd.delay("/opt/gobgp/gobgp global rib add -a ipv4 " + lst[0][2] +" community 65100:888") # 入库 starttime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') # isexpire字段 1:过期,0:未过期 sql = "INSERT INTO statistic (network, bits_incoming, isexpire, starttime ) VALUES ('{0}','{1}', '{2}', '{3}')".format(lst[0][2], lst[0][1], 0, starttime) logging.info("插入IP段 %s 到 statistic表" % lst[0][2]) InsertMySql(sql)#3600秒后记录更新 record.update.apply_async(args = [lst[0][2]], countdown = 3600)except Exception as e: logging.error(e)if __name__ == '__main__': # 查询原始表符合要求的数据 #top_sql = "select top(bits_incoming, network, 30),network from networks_traffic where time > now() - 10d tz('Asia/Shanghai')" top_sql = "select top(bits_incoming, network, 30),network from networks_traffic where time > now() - 6s" top_res = QueryInflux(top_sql) try: for item in top_res["results"]: # 若结果不为空,则判断是否过期 if "series" in item: for series in item["series"]: for value in series["values"]: # 指定阈值 if value[1] > 184549376: # statistic表是否为空 exist_measure = QueryMySql("select * from statistic limit 1; ") # 为空,则插入 ifexist_measure is None: WorkHard(value) # 不为空 else: # 是否存在过期数据 exist_isexp_sql = "select count(*)from statistic where network = \'" + value[2] + "\' and isexpire = '1'" exist_isexp_ip = QueryMySql(exist_isexp_sql) # 不存在过期数据 if exist_isexp_ip[0] == 0: #是否存在未过期数据 exist_noexp_sql = "select count(*)from statistic where network = \'" + value[2] + "\' and isexpire = '0'" exist_noexp_ip = QueryMySql(exist_noexp_sql) # 未过期数据不存在 if exist_noexp_ip[0] == 0: WorkHard(value) # 未过期数据存在,跳过 else: logging.info("IP段 %s 已存在且未过期,跳过" % value[2]) # 存在过期数据 else: WorkHard(value) except Exception as e: logging.error(e)

注意:当主程序第一次运行时,需要首先考虑到数据库为空的情况,然后再判断是否存在过期数据。
  • 日志模块,实现对日志格式的定义
  • 数据库操作封装,实现对Influxdb、Mysql的连接、查询、插入等操作的封装;
    其中Influxdb我们选择使用http api,而是不是influx-client,因为大数据量情况下client端操作会卡死库。
  • 主程序,主流程逻辑判断与调用;
2.Celery异步操作
# 1.Celery实例初始化 vim __init__.py # -*- coding: utf-8 -*- from celery import Celery# 创建 Celery 实例 app = Celery('tasks')# 通过 Celery 实例加载配置模块 app.config_from_object('celery_app.celeryconfig')# 2.Celery配置 vim celeryconfig.py # -*- coding: utf-8 -*- # 指定Broker BROKER_URL = 'redis://127.0.0.1:6379' # 指定Backend CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定时区,默认是 UTC CELERY_TIMEZONE='Asia/Shanghai' # 指定导入的任务模块 CELERY_IMPORTS = ( 'celery_app.gobgp', 'celery_app.record' )# 3.命令执行模块 vim gobgp.py # -*- coding: utf-8 -*- # bgp打包import sys import subprocess from celery_app import app sys.path.append("..")@app.task def cmd(command): try: subp = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) #等待执行时间 subp.wait(3) if subp.poll() == 0: #执行命令成功,返回命令结果 res = subp.communicate() else: #命令执行不成功,返回报错 res = subp.stderr.read()print(res) except Exception as e: print(e)# 4.数据库更新 vim record.py # -*- coding: utf-8 -*- # 更新过期记录from celery_app import app,gobgp import sys sys.path.append("..") from dbconn import QueryMySql,InsertMySql,QueryInflux import datetime@app.task def update(network): try: # 去标 #gobgp.cmd("pwd") gobgp.cmd("/opt/gobgp/gobgp globa rib del " + network + " -a ipv4")# 过期且更新结束时间 endtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') InsertMySql("update statistic set endtime = '{0}', isexpire = '{1}' where network = '{2}' and isexpire = '{3}'".format(endtime, 1, network, 0))print('设置IP段 %s 过期' % network) except Exception as e: print(e)

Celery任务主要分为:
  • 命令执行模块,执行具体的命令操作,捕获命令返回结果、命令异常捕获;
  • 数据库记录模块,命令执行及数据库记录更新等操作;
程序执行
# 1.运行celery # 首先在influxdb目录下运行celery cd influxdb nohup celery -A celery_app worker --loglevel=info -c 4>> celery.log2>&1 & # 建议使用supervisor托管celery,实现celery的自启动管理 # 2.运行主程序 python influx.py# 3.日志查看 cd influxdb celery任务在此目录下查看celery.log 主程序日志在此目录下查看influx.log

Mysql数据库新记录、未过期记录、过期记录效果如下:
python|Python+Celery实现基于Fastnetmon异常流量清洗
文章图片

python|Python+Celery实现基于Fastnetmon异常流量清洗
文章图片

总结 【python|Python+Celery实现基于Fastnetmon异常流量清洗】通过这篇文章,如果你想快搭建一套基于 NetFLOW / sFLOW 的流量统计报告系统,你可以体验下FastNetMon+Influxdb+Grafana+GoBGP的解决方案;如果你想学习Python + Celery 的具体使用,也可参考清洗需求来进行实践。

    推荐阅读