使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警

春衣少年当酒歌,起舞四顾以笑和。这篇文章主要讲述使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警相关的知识,希望能为你提供帮助。
前言如果大家有在使用混合部署完全本地化邮件服务器(也包含 Linux 等其他平台的邮件服务器),或许很多人会经常遇到和我一样的问题:邮件服务器的公网 IP 被列入Spamhaus和类似的反垃圾邮件联盟的黑名单 导致业务部门发出去的邮件被对方邮件网关拦截或直接拒收。
背景假设你有运维这样的一个场景,这里以Exchange平台为例(相信很多企业都是使用该场景):
使用的是混合部署环境,并且:
发信邮件流是本地 Exchange ⇒ O365 Exchange Online;
收信邮件流是本地邮件网关 ⇒ 本地 Exchange 服务器⇒ O365用户邮箱
因此,如果管理员不能及时发现 IP 被列入黑名单,你的业务、销售部门此时还在不停的外发邮件。一般在 12-24小时左右,你的邮件公网 IP 会被微软列入黑名单,这个时候即便你在Exchange Online上将本地的公网 IP 添加到连接器的白名单,你的用户收发邮件仍然会收到影响。
解决方法 一、监控 Exchange 传输日志
【使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警】可以通过ELKSplunk这类的工具监控 Exchange 的传输日志,当出现关键词*550 5.7.1 Message rejected as spam by Content Filtering*550 5.7.1 Service unavailable; Client host[IP] blocked using Spamhaus 就发出告警通知邮件管理员
上述方法有个缺点,即当你通过该日志发现了问题,说明你的邮件服务器公网 IP 已经被列入黑名单有一段时间了,并且已经影响了你的生产环境。
二、通过python的第三方Pydnsbl模块进行实时监控
优点 当你的 IP 被列入 Spamhaus 黑名单后,会立刻被查询到,及时通知管理员并进行相应处理,就不会出现长时间未处理,导致微软同步 Spamhaus策略后整个公司的邮件都受到收发影响
官方文档
使用方法:
通过官方介绍,其使用方法非常简单,只要Python版本大于等于3.5版本即可。接下来我会基于这个方法做个详细的使用介绍,便于后期参考,也方便和我一样有类似需求的人查阅。

使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警

文章图片

优化有了这款工具和使用方法,接下来就要结合使用场景,来实现自动检查、自动触发告警。
由于我本人对 Python 不是很熟悉,只能根据自己的需求编写符合需求的脚本,如有不妥或可以优化、改进的地方,可留言赐教,谢谢!
#!/usr/bin/python3.7 # encoding: utf-8import urllib.request import json import pydnsbl import ssl ssl._create_default_https_context = ssl._create_unverified_contextTOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken" # 企业的id,在管理端-> "我的企业" 可以看到 # CORP_ID = "CORP_ID" CORP_ID = "CORP_ID****" # 某个自建应用的id及secret, 在管理端 -> 企业应用 -> 自建应用, 点进相应应用可以看到 APP_ID = "APP_ID*****" CORP_SECRET = "**********"class Wechat(object): "send monitor message by wechat"def __init__(self): self.CORP_ID = CORP_ID self.CORP_SECRET = CORP_SECRET self.APP_ID = APP_ID self.BASEURL = https://qyapi.weixin.qq.com/cgi-bin/ self.TOKEN_URL = gettoken?corpid=0& corpsecret=1.format( self.CORP_ID, self.CORP_SECRET)# 获取认证 token def Get_Token(self): try: response = urllib.request.urlopen( 01.format(self.BASEURL, self.TOKEN_URL)) access_token = json.loads( response.read().decode(utf-8))[access_token] with open(token, w) as f: f.write(access_token) except KeyError: raise KeyError return access_tokendef checker(self): Result = [] ip_checker = pydnsbl.DNSBLIpChecker() Result1 = ip_checker.check(68.128.212.240)# Exchange Server 01 Result2 = ip_checker.check(68.128.212.241)# Exchange Server 02 Result3 = ip_checker.check(68.128.212.242)# Exchange Server 03 Result.append(Result1) Result.append(Result2) Result.append(Result3) # domain_checker = pydnsbl.DNSBLDomainChecker() # Result4 = domain_checker.check(luxiu2.com)# Domain Name 01 # Result5 = domain_checker.check(videour.com)# Domain Name 02 # Result6 = domain_checker.check(jesdoit.com)# Domain Name 03 # Result.append(Result4) # Result.append(Result5) # Result.append(Result6) return Result# 本地 token def Local_Token(self): try: with open(token, r) as f: token = f.readline().strip() if token == : token = self.Get_Token() return token else: return token except IOError: token = self.Get_Token() return token# 获取报警人员名单 def Get_User(self, dep_id=1, fchild=1): #token = self.Get_Token() token = self.Local_Token() send_url = 0user/list?access_token=1& department_id=2& fetch_child3.format( self.BASEURL, token, dep_id, fchild,) respone = urllib.request.urlopen(url=send_url).read() stat = json.loads(respone)[userlist] user = for k in stat: user += 0 .format(k[mobile]) mobile = ,.join(user.split()) with open(user.txt, w) as f: f.write(mobile)# 发送报警信息 def Send_Message(self, content): self.content = # "touser":"User01|User02|User03",# 成员, @all及所有人"UserID1|UserID2|UserID3",//企业微信的唯一userid,非必输 "touser": jasonhuang, # "toparty": 1,# 部门,@all 及所有部门"PartyID1|PartyID2",//部门id,非必输,如果输入了就只给指定部门发送消息 "msgtype": text,# 消息类型,文本,图片 "agentid": self.APP_ID,# 企业应用 id "safe": "0",# "text": "content": content# 报警内容token = self.Local_Token() # 构建告警信息,必须是 json 格式 msg = messages_content = json.dumps(self.content) send_url = 0message/send?access_token=1.format( self.BASEURL, token) respone = urllib.request.urlopen( url=send_url, data=https://www.songbingjia.com/android/msg.encode("utf-8")).read() stat = json.loads(respone.decode())[errcode] if stat == 0: print(Succesfully Send To Wechat) else: token = self.Get_Token() send_url = 0message/send?access_token=1.format( self.BASEURL, token) respone = urllib.request.urlopen(url=send_url, data=https://www.songbingjia.com/android/msg).read() return responeif __name__ == __main__: msgs = [] msg = Wechat().checker() for i in msg: j = str(i) if j.find(BLACKLISTED) != -1: msgs.append(j) if msgs: msgsend = str(msgs) wechat = Wechat() wechat.Send_Message(邮件服务器公网IP状态告警:+msgsend) print(Error+msgsend) else: print(All Exchange Server Internet IP Status are Normal)

我在脚本第46-48行中监控了 3 台服务器的公网 IP 在Spamhaus的状态([font color=" #B22222" ]为测试效果,这些 IP 为网上找的垃圾邮件服务器公网 IP[/font]
使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警

文章图片

验证这里我们可以看到这 3 个 IP 均被提示异常,接下来我们去Spamhaus官网手动查询下结果看是否一致。
访问Lookup - Reputation Checker - Spamhaus并输入68.128.212.240进行查询,结果如下:
使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警

文章图片

其他两个 IP 也是一样的结果,所以可以判断其运行正常。
最后,通过上述脚本,制作个计划任务,每半小时或每小时执行一次,这样能很好的监控你的邮件服务器公网 IP 的运行状态了。 也可以启用脚本的 52-56行,对一个或多个域名进行监控,按需操作即可。
Enjoy~~

    推荐阅读