Sangfor AF防火墙端口映射自动化实现原理(针对目的端口映射)

项目背景

提高运维效率,用户自助申请端口映射。
经过层层审批后–>自动开通!

准备工作

*需要在AF上打上natrule.cgi的接口补丁。
*需获取AF的TOKEN。

原理及实现步骤

*通过AF接口,将端口映射的5元组插入到设备并生效。
1、判断端口映射是否重名。
2、判断单个端口是否和用户填写的端口重复。
3、判断多个不连续端口是否和用户填写的端口重复。
4、判断多个连续端口是否和用户填写的端口重复。
5、前端判断用户输入的端口及IP的合法性。
6、获取现在有端口映射列表。

代码实现

import urllib
import urllib.request
import json
import ssl

#获取现有所有端口映射详细数据
def nat_list():
    url = 'https://X.X.X.X/cgi-bin/natrule.cgi'
    values = {"opr":"list","token": "xxxxxxxxxxxxxxx"}
    jdata = json.dumps(values, ensure_ascii=False).encode(encoding='utf-8')     # 对数据进行JSON格式化编码
    contex = ssl._create_unverified_context()
    req = urllib.request.Request(url, jdata)        # 生成页面请求的完整数据
    response = urllib.request.urlopen(req, context=contex)      # 证书不可信的情况
    message = json.loads(response.read().decode('utf-8'))       # 获取服务器返回的页面信息
    list = message['data']
    return list

#将现在有端口映射所有natname都提取出来
def getnatname():
        resp = nat_list()  # 获取端口映射列表所有数据
        #  循环取现有端口映射的name
        namelist = []
        try:
            for i in range(len(resp)):
                if resp[i]['type'] == 1:
                    cn = resp[i]['name']
                    dstname = []
                    dstname =dict({'name':cn})
                    namelist.append(dstname)
            return namelist
        except Exception as e:
            print(e)

#将现在有端口映射所有公网IP地址有提取出来
def getip():
        resp = nat_list()  # 获取端口映射列表所有数据
        #  循环取现有端口映射的dst_ip
        dstip = []  # 将现有端口映射列表的公网IP以字典形式传入列表dstlist
        try:
            for i in range(len(resp)):
                if resp[i]['type'] == 1:
                    an = resp[i]['dst_ip']  # 将现有端口映射dst_ip赋值给an
                    ip = []
                    ip = dict({'dst_ip': an})
                    dstip.append(ip)
            return dstip
        except Exception as e:
                print(e)

#将现在有设备上的所有公网端口(包括单独的端口、范围的端口,不连续的端口)都提取出来
def getport():
    resp = nat_list()  # 获取端口映射列表所有数据
    #  循环取现有端口映射的dst_port
    dstport = []  # 将现有端口映射列表的公网端口以字典形式传入列表dstlist
    try:
        for i in range(len(resp)):
            if resp[i]['type'] == 1:
                bn = resp[i]['dst_port']  # 将现有端口映射dst_port赋值给bn
                port = []
                port = dict({'dst_port': bn})
                dstport.append(port)
        return dstport
    except Exception as e:
        print(e)

#将现在有设备上的所有目的端口映射的公网端口和公网IP整理出来
def ipport():
    allports = getport()
    allips = getip()
    a = []  #将所有IP和端口合并成一个list
    z = []  #将带有-的端口组进行合并
    for i in range(len(allports)):
        strip = allips[i]['dst_ip']
        strport = allports[i]['dst_port']
        if ',' in strport:
            strports = strport.split(',')
            for j in range(len(strports)):
                an = strip
                bn = strports[j]
                cn = dict({'dst_ip': an, 'dst_port': bn})
                a.append(cn)
        elif '-' in strport:
            strports = strport.split('-')
            for x in range(len(strports)):
                begin = strports[x]
                z.append(begin)
            start = int(z[0])
            end = int(z[1]) + 1
            for y in range(start, end):
                an = strip
                bn = str(y)
                cn = dict({'dst_ip': an, 'dst_port': bn})
                a.append(cn)
        else:
            cn = dict({'dst_ip': strip, 'dst_port': strport})
            a.append(cn)
    return a        

#前端传值到这里进行与现在有端口映射公网端口和IP比对
def dst_portip(name,dst_ip,dst_port):
    a = {'dst_ip':dst_ip,'dst_port':dst_port}
    aa ={'dst_port':dst_port,'dst_ip':dst_ip}
    b = {'name':name}
    dstlist = ipport()
    namelist = getnatname()
    if dstlist or namelist:
        if b in namelist:
           return 0  #若映射名重复,返回0
        elif a in dstlist or aa in dstlist:
           return 1  #若端口和IP重复,返回1
    return 10

总结:

端口映射自动化的难处关键在于细节的处理,主要有:重名判断,公网IP、端口判断,IP合法性判断,连续及不连续端口重复判断。

以上这些判断好之后,就不会有太大问题了。