Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 981|回复: 0

python调用阿里云产品接口实现自动发现异常访问ip并禁用2小时

[复制链接]
  • TA的每日心情
    奋斗
    2024-4-6 11:05
  • 签到天数: 748 天

    [LV.9]以坛为家II

    2034

    主题

    2092

    帖子

    70万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    705612
    发表于 2021-8-28 10:06:49 | 显示全部楼层 |阅读模式


    因为有些人使用爬虫访问web,浪费服务器资源不说还会影响正常用户,所以需要限制爬虫ip,本可以通过nginx或者防火墙限制,但需要重新编译nginx,开启防火墙怕又不知道会影响到哪些程序,所以

    想用/etc/hosts.deny文件限制某些ip访问web的,于是配置了http:124.90.*.* 发现根本没用,但我之前使用该文件是可以限制ip ssh的,为什么现在不能限制http了呢,

    于是上网搜了下,发现如下内容:

    hosts.allow和hosts.deny规则的执行者为TCP wrappers,对应守护进程为tcpd;而tcpd执行依赖于程序使用了libwrap库。

    也就是说:hosts.allow和hosts.deny支持且只支持使用了libwrap库的服务。

    那如何查看程序是否使用libwarp?有俩种方法:

    方法一、查看hosts_access字段串

    查看应用程序是否支持 wrapper,可以使用 strings 程序然后 grep 字符串 hosts_access:

    方法二、使用ldd,

    ldd /usr/sbin/sshd | grep libwrap

    查测发现使用xinetd的都可以、sshd可以、vsftpd可以,nginx httpd则不可以。



    继续寻找方法时发现可以调用负载均衡相关api结果,于是乎采用了此方法

    '''
    先安装模块,主要有两个,一个是核心模块,必须安装 pip install aliyun-python-sdk-core
    另一个是你要修改哪个产品,本次是更改负载均衡,所以安装  pip install aliyun-python-sdk-slb  ,其他产品参考 https://developer.aliyun.com/tools/sdk#/python
    功能:每隔10分钟分析各个服务器日志,查看是否有异常IP调用短信接口(单个IP一分钟内调用短信接口10次视为异常IP),如有则把IP加入黑名单,2个小时后解封
    '''
    
    #!/usr/bin/python
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkcore.acs_exception.exceptions import ClientException
    from aliyunsdkcore.acs_exception.exceptions import ServerException
    from aliyunsdkslb.request.v20140515.AddAccessControlListEntryRequest import AddAccessControlListEntryRequest
    from aliyunsdkslb.request.v20140515.RemoveAccessControlListEntryRequest import RemoveAccessControlListEntryRequest
    
    import smtplib
    from email.mime.text import MIMEText
    from email.header import Header
    import paramiko,re,logging,os
    import time,datetime,json
    
    #过滤日志,最近一分钟调用短信接口超过10次的ip
    def check_log(host,file):
        # 创建SSH对象
        ssh = paramiko.SSHClient()
        # 允许连接不在know_hosts文件中的主机
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # 连接服务器
    
        try:
            ssh.connect(host, port=22, username='root', password='******$%^')
            # 执行命令, 过滤日志,最近一分钟调用短信接口超过10次的ip
            one_min_time = (datetime.datetime.now()-datetime.timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M")  #一分钟前时间
            shell = "sed -n '/%s/,$p' %s | grep regip | awk '{print $5}' | cut -d : -f2 | sort | uniq -c | sort -r | head -5 | awk '{if ($1 >9)print $2}'" % (one_min_time,file)
            #print(shell)
            stdin, stdout, stderr = ssh.exec_command(shell)
            # 获取命令结果
            result = stdout.read().decode(encoding = "utf-8")
    
            deny_host_re = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')  #待拒绝ip的正则表达式
            deny_host_list = deny_host_re.findall(result)
            if deny_host_list:
                for deny_host in deny_host_list:
                    now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
                    shell = 'cat %s  | grep "%s" | wc -l' % (deny_file,deny_host)
                    if int(os.popen(shell).read()) == 0:      #判断当前IP是否已存在黑名单,避免重复操作
                        f = open(deny_file, "a", encoding='utf-8')
                        str_info = now_time + "  " + deny_host + "  " + "服务器地址:" + host + "\n"  #文本格式:封禁时间  待封IP  待封IP所在服务器地址
                        logging.info("str_info: %s" % str_info)
                        f.write(str_info)
                        f.close()
                        slb_add_host(deny_host,accessKeyId,accessSecret,acl_id)   #执行添加黑名单动作
                    else:
                        logging.info("IP:%s 已存在黑名单中" % deny_host)
    
            else:
                logging.info("主机:%s 本次检测无异常~" % host)
    
        except Exception as e:
            logging.error("主机:%s 连接失败,原因:%s" % ( host,e ) )
        # 关闭连接
        ssh.close()
    
    #ip加入黑名单时发送邮件提醒
    #这种发邮件是使用服务器上邮件服务器,需要在服务器/etc/mail.rc里进行相应配置
    # def send_mail(host,str):
    #     try:
    #         shell = '''
    #                 echo -e "短信接口异常调用IP:%s\n状态:%s"|mail -s "短信接口异常调用"   zhangpan@tan66.com
    #                 ''' % (host,str)
    #         os.system(shell)
    #         logging.info("封禁通知邮件发送成功")
    #     except smtplib.SMTPException as e:
    #         logging.error("发送邮件失败,原因:%s" % e)
    
    #这种更灵活方便,无需在服务器上配置
    def send_mail(host,str):
        # 第三方 SMTP 服务
        mail_host = "smtp.exmail.qq.com"  # 设置服务器
        mail_user = "********@tan66.com"  # 用户名
        mail_pass = "********"  # 密码
    
        sender = '******@tan66.com'
        receivers = ['*******@tan66.com']  # 接收邮件,可设置为你的QQ邮箱或者其他邮箱
    
        message = MIMEText('短信接口异常调用IP:%s\n状态:%s!'% (host,str), 'plain', 'utf-8')
        message['From'] = Header("短信接口", 'utf-8')
        message['To'] = Header("研发组", 'utf-8')
    
        subject = '短信接口异常调用'
        message['Subject'] = Header(subject, 'utf-8')
    
        try:
            smtpObj = smtplib.SMTP_SSL(mail_host,465) # 465 为腾讯企业邮箱SMTP SSL端口号
            smtpObj.login(mail_user, mail_pass)
            smtpObj.sendmail(sender, receivers, message.as_string())
            logging.info("邮件发送成功")
        except smtplib.SMTPException as e:
            logging.error("发送邮件失败,原因:%s" % e)
    
    
    
    #阿里云slb访问控制里添加ip
    def slb_add_host(host,accessKeyId,accessSecret,acl_id):
        client = AcsClient(accessKeyId, accessSecret, 'cn-hangzhou')  # 阿里云账号的key
        request = AddAccessControlListEntryRequest()  # 本次api调用的接口,接口有很多,可参考https://help.aliyun.com/document_detail/27570.html?spm=a2c4g.11186623.6.622.3b735682J2ldXT
        request.set_accept_format('json')
    
        try:
            host = host + "/32"
            request.set_AclEntrys([{"entry": host, "comment": "deny"}])  # 需要拒绝的ip
            request.set_AclId(acl_id)  # 负载均衡安全组id
            response = client.do_action_with_exception(request)  # 执行操作
            logging.info(str(response, encoding='utf-8'))  # 查看调用接口结果
            logging.info("slb添加异常IP:%s成功!" % host)
            send_mail(host, str="添加至黑名单")
        except BaseException as e:
            logging.error("添加黑名单失败,原因:%s" % e)
    
    
    
    #slb删除ip
    def slb_del_host(host,accessKeyId,accessSecret,acl_id):
        host = str(host) + "/32"
        logging.info("正在解封IP:%s" % host)
        try:
            client = AcsClient(accessKeyId, accessSecret, 'cn-hangzhou')
            request = RemoveAccessControlListEntryRequest()
            request.set_accept_format('json')
            request.set_AclEntrys([{"entry": host, "comment": "deny"}])
            request.set_AclId(acl_id)
    
            client.do_action_with_exception(request)
            logging.info("slb删除IP:%s成功" % host)  # 查看调用接口结果
            send_mail(host, str="移出黑名单")
        except BaseException as e:
            logging.error("移出黑名单失败,原因:%s" % e)
    
    # 删除slb里黑名单ip
    def del_text(deny_file):
        with open(deny_file, "r", encoding='utf-8') as f:
            data = f.read()
        logging.info("黑名单文件:%s" % deny_file)
        if data:
            d_list = []  # 解封ip清单
            with open(deny_file, "r", encoding='utf-8') as f:
                lines = f.readlines()
                # logging.info("lines:%s" % lines)
                for line in lines:
                    if line.strip().lstrip():
                        time1 = line.split("  ")[0]  # 获取文本中被封ip的时间
                        time2 = time.mktime(time.strptime(time1, '%Y-%m-%d %H:%M'))  # 将时间转换给时间戳
                        time3 = int(now_time - time2)  # 解封倒计时
                        host = line.split("  ")[1].strip().lstrip()
                        if time3 > 7200:
                            slb_del_host(host, accessKeyId, accessSecret, acl_id)
                            d_list.append(host)
                        else:
                            logging.info("%-15s 预计%s 秒解封" % (host, 7200 - time3))
            for deny in d_list:
                shell = "sed -i '/%s/d' %s" % (deny, deny_file)
                os.system(shell)
                logging.info("文本中删除黑名单ip:%s" % deny)
    
    if __name__ == "__main__":
    
        accessKeyId = '*********'               #key  需要登录阿里云账号获取
        accessSecret = '********'
        acl_id = 'acl-**********'           #负载均衡安全组id
        log_file = '/var/log/msg-api.log'
        BASE_DIR = os.path.dirname(os.path.abspath(__file__))
        deny_file = BASE_DIR + "/deny.hosts"
        if not os.path.exists(log_file):
            os.mknod(log_file)
        if not os.path.exists(deny_file):
            os.mknod(deny_file)
        now_time = time.time()
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s %(levelname)s %(message)s',
                            filename=log_file,
                            filemode='a+')
    
        logging.info("starting")
        host_dict = {"10.253.168.*":"api",
                     "10.253.208.*":"api",
                     "10.253.210.*":"cs",
                     "10.253.210.*":"cs",
                     }
        api_log_file = '/data1/application/api/tomcat/logs/catalina.out'
        cs_log_file = '/data1/application/cs/tomcat-1/logs/catalina.out'
        oss_log_file = '/data1/application/server/tomcat-2/logs/catalina.out'
        for host in host_dict:
            if host_dict[host] == "api":
                check_log(host,api_log_file)
            else:
                check_log(host,cs_log_file)
                check_log(host,oss_log_file)
    
        del_text(deny_file)

     



    如果日志使用了es收集,则更方便操作
    '''
    先安装模块,主要有两个,一个是核心模块,必须安装 pip install aliyun-python-sdk-core
    另一个是你要修改哪个产品,本次是更改负载均衡,所以安装  pip install aliyun-python-sdk-slb  ,其他产品参考 https://developer.aliyun.com/tools/sdk#/python
    功能:每隔10分钟分析各个服务器日志,查看是否有异常IP调用短信接口(单个IP一分钟内调用短信接口10次视为异常IP),如有则把IP加入黑名单,2个小时后解封
    '''
    
    #!/usr/bin/python
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkcore.acs_exception.exceptions import ClientException
    from aliyunsdkcore.acs_exception.exceptions import ServerException
    from aliyunsdkslb.request.v20140515.AddAccessControlListEntryRequest import AddAccessControlListEntryRequest
    from aliyunsdkslb.request.v20140515.RemoveAccessControlListEntryRequest import RemoveAccessControlListEntryRequest
    from elasticsearch import Elasticsearch
    import smtplib
    from email.mime.text import MIMEText
    from email.header import Header
    import paramiko,re,logging,os
    import time,datetime,json
    
    
    
    class slb:
        accessKeyId  =  '********'                  # key  需要登录阿里云账号获取
        accessSecret =  '********'
        acl_id       =  'acl-****'         # 负载均衡安全组id
        log_file     =  '/var/log/msg-api.log'
        BASE_DIR     =  os.path.dirname(os.path.abspath(__file__))
        deny_file    =  BASE_DIR + "/deny.hosts"
    
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s %(levelname)s %(message)s',
                            filename=log_file,
                            filemode='a+')
    
        logging.info("starting")
        def __init__(self,index):
            self.index = index
    
        def start(self):
            if not os.path.exists(slb.log_file):
                os.mknod(slb.log_file)
            if not os.path.exists(slb.deny_file):
                os.mknod(slb.deny_file)
    
    
        def chack_log(self):
            self.start()
            es=Elasticsearch("10.253.**.**",port=9200)
            body = {
                      "query": {
                        "bool": {
                          "must": [
                            {
                              "match": {
                                "message": "regip"
                              }
                            },
                            {
                              "range": {
                                "@timestamp": {
                                  "gte": "now-1m",
                                  "lte": "now",
                                  "format": "epoch_millis"
                                }
                              }
                            }
                          ],
                          "must_not": []
                        }
                      }
                    }
            logging.info(f"开始检查{index}日志")
            result = es.search(index=self.index, body=body)  #返回字典形式
            result1 = json.dumps(result, indent=2, ensure_ascii=False) #返回格式化形式,方便查看,但不是字典
            if result["hits"]["total"] > 9:   #es查询结果大于9  表示可能有用户异常调用短信接口
                ip_list = []
                ip_dict = {}
                try:
                    #获取一分钟内调用短信接口的所有IP
                    for info in result["hits"]["hits"]:
                        log_info = info["_source"]["log_info"]
                        deny_host_re = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')  # ip的正则表达式
                        deny_host = re.search(deny_host_re,log_info).group()
                        ip_list.append(deny_host)
                    # 统计各个IP调用结果次数
                    for deny_ip in ip_list:
                        if deny_ip in ip_dict:
                            ip_dict[deny_ip] += 1
                        else:
                            ip_dict[deny_ip] = 1
                    #判断异常IP是否已存在,不存在则执行添加黑名单操作
                    for deny_host,count in ip_dict.items():  #遍历字典,deny_host为调用短信接口的ip  count为调用次数
                        if count > 2:
                            now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
                            shell = f'cat {slb.deny_file}  | grep "{deny_host}" | wc -l'
                            if int(os.popen(shell).read()) == 0:  # 判断当前IP是否已存在黑名单,避免重复操作
                                f = open(slb.deny_file, "a", encoding='utf-8')
                                str_info = now_time + "  " + deny_host + "  " + f"web:{index}" + "\n"  # 文本格式:封禁时间  待封IP  web服务器
                                logging.info(f"{str_info}str_info: %s")
                                f.write(str_info)
                                f.close()
                                self.slb_add_host(deny_host)  # 执行添加黑名单动作
                                logging.info("执行添加黑名单操作")
                            else:
                                logging.info(f"IP:{deny_host} 已存在黑名单中")
                except BaseException as e:
                    logging.error(f"检查{index}日志时发现错误,错误信息:{e}")
            else:
                logging.info(f"{index} 本次检测无异常")
    
    
    
         #添加ip至黑名单
        def slb_add_host(self,host):
            client = AcsClient(self.accessKeyId, self.accessSecret, 'cn-hangzhou')  # 阿里云账号的key
            request = AddAccessControlListEntryRequest()  # 本次api调用的接口,接口有很多,可参考https://help.aliyun.com/document_detail/27570.html?spm=a2c4g.11186623.6.622.3b735682J2ldXT
            request.set_accept_format('json')
            try:
                host = host + "/32"
                request.set_AclEntrys([{"entry": host, "comment": "deny"}])  # 需要拒绝的ip
                request.set_AclId(self.acl_id)  # 负载均衡安全组id
                response = client.do_action_with_exception(request)  # 执行操作
                logging.info(str(response, encoding='utf-8'))  # 查看调用接口结果
                self.send_mail(host, str="添加至黑名单")
                logging.info(f"slb添加异常IP:{host}成功!")
            except BaseException as e:
                logging.error(f"添加黑名单失败,原因:{e}" )
    
    
        #slb黑名单里移出ip
        def slb_del_host(self,host):
            host = str(host) + "/32"
            logging.info(f"正在解封IP:{host}" )
            try:
                client = AcsClient(self.accessKeyId, self.accessSecret, 'cn-hangzhou')
                request = RemoveAccessControlListEntryRequest()
                request.set_accept_format('json')
                request.set_AclEntrys([{"entry": host, "comment": "deny"}])
                request.set_AclId(self.acl_id)
    
                client.do_action_with_exception(request)
                self.send_mail(host, str="移出黑名单")
                logging.info(f"slb删除IP:{host}成功" )  # 查看调用接口结果
            except BaseException as e:
                logging.error("移出黑名单失败,原因:%s" % e)
    
        #发送邮件
        def send_mail(self,host, str):
            # 第三方 SMTP 服务
            mail_host = "smtp.exmail.qq.com"  # 设置服务器
            mail_user = "****@tan66.com"  # 用户名
            mail_pass = "*****"  # 密码
    
            sender = '******@tan66.com'
            receivers = ['*****@tan66.com']  # 接收邮件,可设置为你的QQ邮箱或者其他邮箱
    
            message = MIMEText(f'短信接口异常调用IP:{host}\n状态:{str}!', 'plain', 'utf-8')
            message['From'] = Header("短信接口", 'utf-8')
            message['To'] = Header("研发组", 'utf-8')
    
            subject = '短信接口异常调用'
            message['Subject'] = Header(subject, 'utf-8')
    
            try:
                smtpObj = smtplib.SMTP_SSL(mail_host, 465)  # 465 为腾讯企业邮箱SMTP SSL端口号
                smtpObj.login(mail_user, mail_pass)
                smtpObj.sendmail(sender, receivers, message.as_string())
                logging.info("邮件发送成功")
            except smtplib.SMTPException as e:
                logging.error("发送邮件失败,原因:%s" % e)
    
    
        # 删除slb文本里黑名单ip
        def del_text(self):
            with open(self.deny_file, "r", encoding='utf-8') as f:
                data = f.read()
            #logging.info("黑名单文件:%s" % self.deny_file)
            if data:
                d_list = []  # 解封ip清单
                with open(self.deny_file, "r", encoding='utf-8') as f:
                    lines = f.readlines()
                    # logging.info("lines:%s" % lines)
                    for line in lines:
                        if line.strip().lstrip():
                            now_time = time.time()
                            time1 = line.split("  ")[0]  # 获取文本中被封ip的时间
                            time2 = time.mktime(time.strptime(time1, '%Y-%m-%d %H:%M'))  # 将时间转换给时间戳
                            time3 = int(now_time - time2)  # 解封倒计时
                            host = line.split("  ")[1].strip().lstrip()
                            if time3 > 7200:
                                self.slb_del_host(host)
                                d_list.append(host)
                            else:
                                logging.info("%-15s 预计%s 秒解封" % (host, 7200 - time3))
                for deny in d_list:
                    shell = "sed -i '/%s/d' %s" % (deny, self.deny_file)
                    os.system(shell)
                    logging.info("文本中删除黑名单ip:%s" % deny)
    
    
    if __name__ == "__main__":
        index_list = ["**-tomcat","**-tomcat","*****"]
        for index in index_list:
            s = slb(index)
            s.chack_log()
        s.del_text()

     



    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-4-29 14:21 , Processed in 0.071329 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表