春衣少年当酒歌,起舞四顾以笑和。这篇文章主要讲述使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警相关的知识,希望能为你提供帮助。
前言如果大家有在使用混合部署
或完全本地化
的 邮件服务器
(也包含 Linux 等其他平台的邮件服务器),或许很多人会经常遇到和我一样的问题:邮件服务器的公网 IP 被列入Spamhaus和类似的反垃圾邮件联盟的黑名单 导致业务部门发出去的邮件被对方邮件网关
拦截或直接拒收。
背景假设你有运维这样的一个场景,这里以Exchange
平台为例(相信很多企业都是使用该场景):
使用的是混合部署环境,并且:发信邮件流
是本地 Exchange ⇒
O365 Exchange Online;
收信邮件流
是本地邮件网关 ⇒
本地 Exchange 服务器⇒
O365用户邮箱
因此,如果管理员不能及时发现 IP 被列入黑名单,你的业务、销售部门此时还在不停的外发邮件。一般在 12-24小时左右,你的邮件公网 IP 会被微软列入黑名单,这个时候即便你在Exchange Online
上将本地的公网 IP 添加到连接器的白名单,你的用户收发邮件仍然会收到影响。
解决方法
一、监控 Exchange 传输日志
【使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警】可以通过ELK
,Splunk
这类的工具监控 Exchange 的传输日志,当出现关键词*550 5.7.1 Message rejected as spam by Content Filtering*
或550 5.7.1 Service unavailable;
Client host[IP] blocked using Spamhaus
就发出告警通知邮件管理员
上述方法有个缺点,即当你通过该日志发现了问题,说明你的邮件服务器公网 IP 已经被列入黑名单有一段时间了,并且已经影响了你的生产环境。
二、通过python的第三方Pydnsbl模块进行实时监控
优点 当你的 IP 被列入 Spamhaus 黑名单后,会立刻被查询到,及时通知管理员并进行相应处理,就不会出现长时间未处理,导致微软同步 Spamhaus策略后整个公司的邮件都受到收发影响
官方文档
使用方法:
通过官方介绍,其使用方法非常简单,只要Python
版本大于等于3.5版本即可。接下来我会基于这个方法做个详细的使用介绍,便于后期参考,也方便和我一样有类似需求的人查阅。
文章图片
优化有了这款工具和使用方法,接下来就要结合使用场景,来实现自动检查、自动触发告警。
由于我本人对 Python 不是很熟悉,只能根据自己的需求编写符合需求的脚本,如有不妥或可以优化、改进的地方,可留言赐教,谢谢!
#!/usr/bin/python3.7
# encoding: utf-8import urllib.request
import json
import pydnsbl
import ssl
ssl._create_default_https_context = ssl._create_unverified_contextTOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
# 企业的id,在管理端->
"我的企业" 可以看到
# CORP_ID = "CORP_ID"
CORP_ID = "CORP_ID****"
# 某个自建应用的id及secret, 在管理端 ->
企业应用 ->
自建应用, 点进相应应用可以看到
APP_ID = "APP_ID*****"
CORP_SECRET = "**********"class Wechat(object):
"send monitor message by wechat"def __init__(self):
self.CORP_ID = CORP_ID
self.CORP_SECRET = CORP_SECRET
self.APP_ID = APP_ID
self.BASEURL = https://qyapi.weixin.qq.com/cgi-bin/
self.TOKEN_URL = gettoken?corpid=0&
corpsecret=1.format(
self.CORP_ID, self.CORP_SECRET)# 获取认证 token
def Get_Token(self):
try:
response = urllib.request.urlopen(
01.format(self.BASEURL, self.TOKEN_URL))
access_token = json.loads(
response.read().decode(utf-8))[access_token]
with open(token, w) as f:
f.write(access_token)
except KeyError:
raise KeyError
return access_tokendef checker(self):
Result = []
ip_checker = pydnsbl.DNSBLIpChecker()
Result1 = ip_checker.check(68.128.212.240)# Exchange Server 01
Result2 = ip_checker.check(68.128.212.241)# Exchange Server 02
Result3 = ip_checker.check(68.128.212.242)# Exchange Server 03
Result.append(Result1)
Result.append(Result2)
Result.append(Result3)
# domain_checker = pydnsbl.DNSBLDomainChecker()
# Result4 = domain_checker.check(luxiu2.com)# Domain Name 01
# Result5 = domain_checker.check(videour.com)# Domain Name 02
# Result6 = domain_checker.check(jesdoit.com)# Domain Name 03
# Result.append(Result4)
# Result.append(Result5)
# Result.append(Result6)
return Result# 本地 token
def Local_Token(self):
try:
with open(token, r) as f:
token = f.readline().strip()
if token == :
token = self.Get_Token()
return token
else:
return token
except IOError:
token = self.Get_Token()
return token# 获取报警人员名单
def Get_User(self, dep_id=1, fchild=1):
#token = self.Get_Token()
token = self.Local_Token()
send_url = 0user/list?access_token=1&
department_id=2&
fetch_child3.format(
self.BASEURL, token, dep_id, fchild,)
respone = urllib.request.urlopen(url=send_url).read()
stat = json.loads(respone)[userlist]
user =
for k in stat:
user += 0 .format(k[mobile])
mobile = ,.join(user.split())
with open(user.txt, w) as f:
f.write(mobile)# 发送报警信息
def Send_Message(self, content):
self.content =
# "touser":"User01|User02|User03",# 成员, @all及所有人"UserID1|UserID2|UserID3",//企业微信的唯一userid,非必输
"touser": jasonhuang,
# "toparty": 1,# 部门,@all 及所有部门"PartyID1|PartyID2",//部门id,非必输,如果输入了就只给指定部门发送消息
"msgtype": text,# 消息类型,文本,图片
"agentid": self.APP_ID,# 企业应用 id
"safe": "0",#
"text":
"content": content# 报警内容token = self.Local_Token()
# 构建告警信息,必须是 json 格式
msg = messages_content = json.dumps(self.content)
send_url = 0message/send?access_token=1.format(
self.BASEURL, token)
respone = urllib.request.urlopen(
url=send_url, data=https://www.songbingjia.com/android/msg.encode("utf-8")).read()
stat = json.loads(respone.decode())[errcode]
if stat == 0:
print(Succesfully Send To Wechat)
else:
token = self.Get_Token()
send_url = 0message/send?access_token=1.format(
self.BASEURL, token)
respone = urllib.request.urlopen(url=send_url, data=https://www.songbingjia.com/android/msg).read()
return responeif __name__ == __main__:
msgs = []
msg = Wechat().checker()
for i in msg:
j = str(i)
if j.find(BLACKLISTED) != -1:
msgs.append(j)
if msgs:
msgsend = str(msgs)
wechat = Wechat()
wechat.Send_Message(邮件服务器公网IP状态告警:+msgsend)
print(Error+msgsend)
else:
print(All Exchange Server Internet IP Status are Normal)
我在脚本第
46-48
行中监控了 3 台服务器的公网 IP 在Spamhaus
的状态([font color="
#B22222"
]为测试效果,这些 IP 为网上找的垃圾邮件服务器公网 IP[/font])文章图片
验证这里我们可以看到这 3 个 IP 均被提示异常,接下来我们去
Spamhaus
官网手动查询下结果看是否一致。访问Lookup - Reputation Checker - Spamhaus并输入
68.128.212.240
进行查询,结果如下:文章图片
其他两个 IP 也是一样的结果,所以可以判断其运行正常。
最后,通过上述脚本,制作个计划任务,每半小时或每小时执行一次,这样能很好的监控你的邮件服务器公网 IP 的运行状态了。 也可以启用脚本的
52-56
行,对一个或多个域名进行监控,按需操作即可。Enjoy~~
推荐阅读
- Java基础之Optional类(JDK1.8新特性)
- Powershell实现多硬盘的电脑使用SCCM安装系统选择目的盘清单
- 记一次nginx405的错误
- JS 变量作用域与内存
- Java基础之方法引用(JDK1.8新特性)
- [ C++ ] 类与对象(下)日期类Date补充及流提取和流插入运算符重载
- Java基础之Stream流(JDK1.8新特性)
- HarmonyOS - 实现多设备协同
- 看漫画漫画柜mhgui,Python爬虫之神奇的eval,附赠一个压缩模块