Hatagawa - BlackhatMEA 2025 CTF (Quals)
- Authors: president-xd (@justmohsin_)

Hello everyone! I'm a Web/Cryptography CTF player from Team HasHash. I’ll be sharing my solutions for the crypto challenges I solved in this CTF. I hope you find them interesting!
Hatagawa
Category: Cryptography
Description: It is said that once in a while, flags might float by when watching over the Hatagawa.
First, let's look at the code provided to us.
Challenge Script:
#!/usr/bin/env python3
#
# BlackHat MEA CTF 2025 Qualifiers :: Hatagawa I
#
#
# Documentation imports
from __future__ import annotations
from typing import Tuple, List, Dict, NewType, Union
# Native imports
from secrets import randbelow
import os
# External dependencies
# None
# Flag import
FLAG = os.environ.get('DYN_FLAG', 'BHFlagY{this_is_a_fake_flag}')
if isinstance(FLAG, str):
    FLAG = FLAG.encode()
# Helper functions
# None
# Challenge classes
class Kawa:
    """ 川 """
    def __init__(self, par: Tuple[int], seed: int) -> None:
        self.a, self.c, self.m = par
        self.x = seed
    def Get(self) -> bytes:
        """ Generates and outputs the next internal state as bytes. """
        self.x = (self.a * self.x + self.c) & self.m
        return self.x.to_bytes(-(-self.m.bit_length() // 8), 'big')
class Hata:
    """ 旗 """
    def __init__(self, entropy: object) -> None:
        self.entropy = entropy
    def Encrypt(self, msg: bytes) -> bytes:
        """ Encrypts a message using a one-time pad generated from entropy source. """
        otp = b''
        while len(otp) < len(msg):
            otp += self.entropy.Get()
        return bytes([x ^ y for x,y in zip(msg, otp)])
# Main loop
if __name__ == "__main__":
    # Challenge parameters
    MOD = 2**64 - 1
    MUL = (randbelow(MOD >> 3) << 3) | 5
    ADD = randbelow(MOD) | 1
    # Challenge setup
    hatagawa = Hata(Kawa((MUL, ADD, MOD), randbelow(MOD)))
    RIVER = r"""|
| ////\\\,,\///,,,,,\,/,\\,//////,\,,\\\,,\\\/,,,\,,//,\\\,
| ~ ~~~~ ~~ ~~~~~~~ ~ ~~ ~~~~~~ ~ ~~~ ~~~ ~~~ ~~ ~~
| ~~~~~~~ ~~~~~~ ~~~ ~ ~~~~~ ~~ ~~~~~~~ ~ ~~ ~~~~~
| ~~~ {} ~
| ~~~ ~ ~~~~~ ~~~~ ~ ~~~~ ~~~~~ ~~~~ ~~~ ~ ~~~~~
| ~~~ ~ ~~~~~ ~ ~~ ~ ~~~~ ~~~ ~~ ~~ ~~~~~~~ ~ ~~
| \\\\\'''\\'////'//'\''\\\/'''\''//'\\\''\///''''\'/'\\'//"""
    print(RIVER.format(' '*21 + '旗 川' + ' '*21))
    # Main loop
    userOptions = ['Stay a while...']
    TUI = "|\n| Menu:\n| " + "\n| ".join('[' + i[0] + ']' + i[1:] for i in userOptions) + "\n| [W]alk away\n|"
    while True:
        try:
            print(TUI)
            choice = input('| > ').lower()
            # [W]alk away
            if choice == 'w':
                print("|\n| [~] You turn your back to the river...\n|")
                break
            # [S]tay a while...
            elif choice == 's':
                print("|\n| [~] Look! Is that a flag floating by?")
                print(RIVER.format(hatagawa.Encrypt(FLAG).hex()))
            else:
                print("|\n| [!] Invalid choice.")
        except KeyboardInterrupt:
            print("\n|\n| [~] Goodbye ~ !\n|")
            break
        except Exception as e:
            print('|\n| [!] {}'.format(e))
The code may look long, but it’s quite straightforward. Let’s break it down into its crucial snippets.
Crucial Snippets
- 64-bit LCG update (wraps at 2^64)
self.x = (self.a * self.x + self.c) & self.m # self.m = 2**64 - 1
- This is a linear congruential generator (LCG). The mask & (2**64 - 1) keeps only the lowest 64 bits ⇒ arithmetic is effectively mod 2^64.
- Mod 2^64 is special: you can strip common powers of two, then invert the odd part easily.
- The keystream is the full state
return self.x.to_bytes(-(-self.m.bit_length() // 8), 'big') # 8 bytes
- Every block leaks the entire internal 64-bit state as 8 bytes.
- If you know 8 bytes of plaintext at that position, XOR gives you the exact state used there.
- Parameter shapes help the attack
MUL = (randbelow(MOD >> 3) << 3) | 5 # ⇒ a ≡ 5 (mod 8) (also a ≡ 1 (mod 4))
ADD = randbelow(MOD) | 1 # ⇒ c is odd
- These fixed properties (a mod 8, c odd) shrink the search and make lifting consistent.
- OTP = concatenated states; PRNG never resets
otp = b''
while len(otp) < len(msg):
otp += self.entropy.Get() # 8 bytes per call; advance LCG each time
- Press S again and again: the generator continues; it does not reseed.
- Each press encrypts the same flag but with the next states.
- Known prefix reveals start state of each slice
# first 8 ciphertext bytes of slice i:
# ct_i[:8] = b"BHFlagY{" ^ p64(z_i)
z_i = int.from_bytes(bytes(ci ^ pi for ci, pi in zip(ct_i[:8], b"BHFlagY{")), "big")
- Flags start with BHFlagY{ (8 bytes). XORing the first 8 ciphertext bytes with this prefix gives z_i, the state at the start of slice i.
- Slice starts follow a step-n LCG
Let one slice use n = ceil(len(flag)/8) states. Then the starts z_i obey:
z_{i+1} = A*z_i + c*S (mod 2^64)
A = a^n (mod 2^64)
S = 1 + a + a^2 + ... + a^{n-1} (mod 2^64)
With ≥ 3 slices (z_0, z_1, z_2), we can recover A from differences. With A we lift all valid a (respecting a ≡ 5 (mod 8)), then solve for c (odd), and decrypt.
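The step-n relation above can be checked numerically. The following is a minimal sketch, not part of the challenge or the solver: the helper names `lcg_next` and `step_n_params` and the block count `n = 5` are assumptions chosen for illustration, and the parameters are drawn with the same shape as in the challenge script.

```python
import secrets

MASK64 = (1 << 64) - 1

def lcg_next(x: int, a: int, c: int) -> int:
    """One LCG step; the mask keeps the low 64 bits, i.e. reduces mod 2^64."""
    return (a * x + c) & MASK64

def step_n_params(a: int, n: int):
    """Return (A, S) with A = a^n and S = 1 + a + ... + a^(n-1), both mod 2^64."""
    A, S = 1, 0
    for _ in range(n):
        S = (S + A) & MASK64
        A = (A * a) & MASK64
    return A, S

# Hypothetical parameters with the challenge's shape: a ≡ 5 (mod 8), c odd.
a = (secrets.randbelow(MASK64 >> 3) << 3) | 5
c = secrets.randbelow(MASK64) | 1
n = 5                          # assumed number of 8-byte blocks per encryption
z = secrets.randbelow(MASK64)  # some slice start state

# Walk n single steps, then compare against one step-n jump.
x = z
for _ in range(n):
    x = lcg_next(x, a, c)
A, S = step_n_params(a, n)
jump = (A * z + c * S) & MASK64
print(x == jump)
```

Both paths land on the same state, which is why consecutive slice starts behave like outputs of a second LCG with multiplier A and increment c*S.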
Vulnerability / Bug
- Wrong modulus assumption: Using & (2**64-1) is effectively mod 2^64, not mod 2^64-1.
- State leak: The keystream block is the state (8 bytes) → maximum leakage with any known plaintext.
- No reseed/reset: Each S uses the same PRNG instance, advancing forward. Slices form a step-n LCG you can solve.
- Predictable prefix: Flag starts with BHFlagY{ (8 bytes) → gives you each slice’s start state instantly.
- Parameter constraints: a ≡ 5 (mod 8), c odd → recovery becomes deterministic and fast.
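To make the modulus point concrete, here is a small sketch comparing the bitmask with the two possible reductions. The value of `x` is arbitrary and chosen only so that it overflows 64 bits.

```python
MOD = 2**64 - 1        # the constant named MOD in the challenge script

x = 2**64 + 5          # arbitrary value that overflows 64 bits

masked = x & MOD       # what Kawa.Get() actually computes
wrapped = x % 2**64    # reduction modulo 2^64
true_mod = x % MOD     # reduction modulo 2^64 - 1

print(masked, wrapped, true_mod)  # → 5 5 6
```

The mask agrees with reduction mod 2^64 and disagrees with reduction mod 2^64 - 1, so the generator should be analysed as a power-of-two LCG.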
My Approach
Goal: press S many times, recover the start states z_i, solve the step-n LCG, decrypt.
Step 1 — Collect slices
- Open a connection to the netcat instance and press S around 10–14 times.
- Each time, parse the big hex line to get the ciphertext slice.
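A minimal sketch of the parsing step, assuming the ciphertext is the longest run of hex digits in the server's reply. The `sample` bytes are made up to resemble the river output and are not a real capture; `extract_ciphertext` is a hypothetical helper name.

```python
import re

# At least 32 hex digits in a row, bounded by non-word characters.
HEX_RE = re.compile(rb"\b([0-9a-f]{32,})\b", re.I)

def extract_ciphertext(output: bytes) -> bytes:
    """Return the longest hex run in the server output, decoded to raw bytes."""
    matches = HEX_RE.findall(output)
    if not matches:
        raise ValueError("no ciphertext hex found")
    return bytes.fromhex(max(matches, key=len).decode())

# Made-up reply: one river line holding a 20-byte "ciphertext".
sample = (
    b"|\n|  [~] Look! Is that a flag floating by?\n"
    b"|  ~~~ " + b"ab" * 20 + b" ~\n"
    b"|\n|  Menu:\n"
)
ct = extract_ciphertext(sample)
print(len(ct), ct.hex())
```

Taking the longest match is a simple way to ignore any short hex-looking fragments elsewhere in the banner text.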
Step 2 — Recover start states z_i
- Use the known prefix P = b"BHFlagY{": z_i = u64( ct_i[:8] ^ P )
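The prefix XOR can be sketched as a round trip. The state, the flag, and the repeated toy keystream below are made up for the demonstration; only the 8-byte prefix comes from the challenge, and `recover_start_state` is a hypothetical helper name.

```python
PREFIX = b"BHFlagY{"  # 8-byte known flag prefix from the challenge

def recover_start_state(ct: bytes, prefix: bytes = PREFIX) -> int:
    """XOR the first 8 ciphertext bytes with the known prefix to get z_i."""
    block = bytes(c ^ p for c, p in zip(ct[:8], prefix))
    return int.from_bytes(block, "big")

# Made-up state and flag, used only to demonstrate the round trip.
state = 0x0123456789ABCDEF
flag = b"BHFlagY{demo}"
otp = state.to_bytes(8, "big") * 2   # toy keystream, not a real LCG stream
ct = bytes(m ^ k for m, k in zip(flag, otp))

print(recover_start_state(ct) == state)
```

Only the first keystream block matters here, because the known prefix covers exactly one 8-byte state.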
Step 3 — Recover the step multiplier A = a^n (mod 2^64)
For triples (z0, z1, z2):
- d1 = z1 - z0 and d2 = z2 - z1 (mod 2^64)
- Let v = min(trailing_zeros(d1), trailing_zeros(d2))
- Divide both by 2^v so they become odd, then compute A ≡ d2' * inv(d1') (mod 2^(64 - v))
- This yields a small set of candidates for A (the top v bits are free).
Do this for several triples and intersect the candidate sets. Because d2 = A*d1 with A odd, d1 and d2 have the same number of trailing zeros, so v = 0 gives a single A; otherwise the 2^v remaining candidates are filtered by the verification in the later steps.
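The difference trick can be sketched on locally generated data. The values of `a`, `c`, and `z0` are illustrative picks with the challenge's shape (a ≡ 5 (mod 8), c odd), and the inverse uses Python's built-in `pow(x, -1, m)` (available from Python 3.8) instead of the Newton iteration in the solver.

```python
MASK64 = (1 << 64) - 1

def trailing_zeros(x: int) -> int:
    """Number of trailing zero bits of x (64 for x == 0)."""
    return (x & -x).bit_length() - 1 if x else 64

def recover_A_candidates(z0: int, z1: int, z2: int) -> set:
    """All 64-bit A with (z2 - z1) == A * (z1 - z0) mod 2^64."""
    d1 = (z1 - z0) & MASK64
    d2 = (z2 - z1) & MASK64
    if d1 == 0 or d2 == 0:
        return set()
    v = min(trailing_zeros(d1), trailing_zeros(d2))
    if trailing_zeros(d1) != v:
        return set()                      # A * d1 could never reach d2
    k = 64 - v
    inv = pow(d1 >> v, -1, 1 << k)        # inverse of the odd part mod 2^k
    base = ((d2 >> v) * inv) % (1 << k)
    return {base + (t << k) for t in range(1 << v)}

def step_n(z: int, a: int, c: int, n: int) -> int:
    """Advance the LCG n single steps."""
    for _ in range(n):
        z = (a * z + c) & MASK64
    return z

a, c, z0 = 0x5DEECE66D, 11, 0x1234        # illustrative values only
results = {}
for n in (3, 4):
    z1 = step_n(z0, a, c, n)
    z2 = step_n(z1, a, c, n)
    cands = recover_A_candidates(z0, z1, z2)
    results[n] = (len(cands), pow(a, n, 1 << 64) in cands)
print(results)
```

With an odd block count the multiplier comes out uniquely; with n = 4 the differences share two trailing zero bits, so four candidates remain and the true A is among them.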
Step 4 — Lift all valid a from A
We need a^n ≡ A (mod 2^64) and a ≡ 5 (mod 8).
Start with candidates {5} modulo 8.
For each bit position t = 3..63:
- Work modulo 2^(t+1).
- Try adding 0 or 1<<t to each candidate; keep only those where (a_try^n) ≡ A (mod 2^(t+1)).
The survivors are the full 64-bit a values allowed by the constraints.
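The bit-by-bit lifting can be sketched as follows. This is an illustrative variant written as a set comprehension; the multiplier `a_true` and the block count `n = 4` are assumptions picked for the demo, and `lift_a` is a hypothetical name.

```python
def lift_a(A: int, n: int) -> set:
    """All 64-bit a with a ≡ 5 (mod 8) and a^n ≡ A (mod 2^64)."""
    cands = {5}                              # residues modulo 8
    for t in range(3, 64):
        M = 1 << (t + 1)
        target = A % M
        # Extend each candidate with bit t cleared or set, keep the ones that fit.
        cands = {
            a_try
            for a0 in cands
            for a_try in (a0, a0 + (1 << t))
            if pow(a_try, n, M) == target
        }
        if not cands:
            break
    return cands

a_true = 0x5DEECE66D                         # illustrative, a_true ≡ 5 (mod 8)
n = 4                                        # assumed blocks per encryption
A = pow(a_true, n, 1 << 64)
lifted = lift_a(A, n)
print(a_true in lifted, len(lifted))
```

The candidate set stays small at every bit position, so the whole lift costs only a few hundred modular exponentiations.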
Step 5 — Solve for c (must be odd)
Compute S = 1 + a + a^2 + ... + a^{n-1} (mod 2^64).
Let vS = trailing_zeros(S). If B = z_{i+1} − A*z_i is not divisible by 2^{vS}, discard this a.
Otherwise:
- Reduce: B' = B / 2^{vS}, S' = S / 2^{vS} (now S' is odd).
- Invert: c ≡ B' * inv(S') (mod 2^(64 - vS))
- Lift to 64 bits: there are 2^{vS} possible c; keep only odd ones.
Verify the candidate (a, c) across all pairs (z_i, z_{i+1}).
Step 6 — Decrypt
- For any slice, regenerate its OTP from start state and (a, c) and XOR.
- You should see BHFlagY{...}.
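The decryption step can be sketched locally. The flag is the placeholder from the challenge script, while `a`, `c`, and `z0` are made-up values standing in for recovered parameters; `keystream` and `xor_with_keystream` are hypothetical helper names.

```python
MASK64 = (1 << 64) - 1

def keystream(z: int, a: int, c: int, nbytes: int) -> bytes:
    """Concatenate LCG states as 8-byte big-endian blocks, starting at state z."""
    out = bytearray()
    s = z
    while len(out) < nbytes:
        out += s.to_bytes(8, "big")
        s = (a * s + c) & MASK64
    return bytes(out[:nbytes])

def xor_with_keystream(data: bytes, z: int, a: int, c: int) -> bytes:
    """XOR data with the keystream; the same call encrypts and decrypts."""
    return bytes(d ^ k for d, k in zip(data, keystream(z, a, c, len(data))))

# Made-up parameters standing in for the recovered (a, c) and start state.
a, c, z0 = 0x5DEECE66D, 11, 0x0123456789ABCDEF
flag = b"BHFlagY{this_is_a_fake_flag}"   # placeholder flag from the challenge script

ct = xor_with_keystream(flag, z0, a, c)
pt = xor_with_keystream(ct, z0, a, c)
print(pt)
```

The first keystream block is the start state itself, which matches how z_i is defined in Step 2.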
Why this works reliably: Everything happens mod 2^64 with tiny candidate sets. The constraints (prefix, a ≡ 5 (mod 8), c odd) cut the space down hard. Cross-pair checks remove false positives.
Final Solution
Let's construct the final solution script:
#!/usr/bin/env python3
import re
import socket
import sys
from typing import List, Set
HOST_DEFAULT = "34.252.33.37"
PORT_DEFAULT = 30706
TAKE_SLICES = 14 # grab plenty of consecutive slices in one session
DEFAULT_PREFIX = b"BHFlagY{" # 8-byte known prefix from challenge
MASK64 = (1 << 64) - 1
HEX_RE = re.compile(rb"\b([0-9a-f]{32,})\b", re.I)
# -------------------- small helpers --------------------
def tz(x: int) -> int:
    if x == 0:
        return 64
    return (x & -x).bit_length() - 1
def inv_odd_mod_2k(a: int, k: int) -> int:
    assert a & 1
    x = 1
    for i in range(1, k):
        mod = 1 << (i + 1)
        x = (x * (2 - (a * x) % mod)) % mod
    return x % (1 << k)
def u64(b: bytes) -> int:
    return int.from_bytes(b, "big")
def p64(x: int) -> bytes:
    return (x & MASK64).to_bytes(8, "big")
def geom_sum_mod(a: int, n: int, M: int) -> int:
    s, term = 0, 1 % M
    for _ in range(n):
        s = (s + term) % M
        term = (term * a) % M
    return s
# -------------------- I/O with server --------------------
def recv_until(sock: socket.socket, needle: bytes, max_bytes: int = 1_000_000) -> bytes:
    buf = bytearray()
    while True:
        chunk = sock.recv(4096)
        if not chunk:
            break
        buf += chunk
        if needle in buf or len(buf) >= max_bytes:
            break
    return bytes(buf)
def get_ciphertexts(host: str, port: int, count: int) -> List[bytes]:
    s = socket.create_connection((host, port))
    recv_until(s, b"Menu:")
    ctexts = []
    for _ in range(count):
        s.sendall(b"S\n")
        data = recv_until(s, b"Menu:")
        m_all = HEX_RE.findall(data)
        if not m_all:
            s.sendall(b"W\n")
            s.close()
            raise RuntimeError("No ciphertext hex found in server output.")
        hex_blob = max(m_all, key=len)
        ctexts.append(bytes.fromhex(hex_blob.decode()))
    s.sendall(b"W\n")
    s.close()
    return ctexts
# -------------------- core recovery --------------------
def recover_stepn_A_candidates(z0: int, z1: int, z2: int) -> Set[int]:
    """Return all 64-bit A candidates from z0->z1->z2 under modulus 2^64."""
    d1 = (z1 - z0) & MASK64
    d2 = (z2 - z1) & MASK64
    if d1 == 0 or d2 == 0:
        return set()
    v = min(tz(d1), tz(d2))
    k = 64 - v
    d1p = d1 >> v
    d2p = d2 >> v
    inv = inv_odd_mod_2k(d1p, k)
    A_mod = (d2p * inv) % (1 << k)
    return { (A_mod + (t << k)) & MASK64 for t in range(1 << v) }
def lift_all_a_from_A(A: int, n: int) -> Set[int]:
    """
    All a (64-bit) such that a^n == A (mod 2^64), with a ≡ 5 (mod 8).
    Lift bit-by-bit: keep the extensions that match pow(a,n,M) at each M=2^(t+1).
    """
    cands = {5} # per challenge MUL = (..<<3)|5 => a ≡ 5 (mod 8)
    for t in range(3, 64):
        M = 1 << (t + 1)
        tgt = A & (M - 1)
        new = set()
        for a in cands:
            a0 = a % M
            # next bit can be 0 or 1
            for bit in (0, 1):
                a_try = (a0 + (bit << t)) % M
                if pow(a_try, n, M) == tgt:
                    new.add(a_try)
        if not new:
            return set()
        cands = new
    return cands # full 64-bit values
def recover_c_lifts(a: int, z0: int, z1: int, n: int) -> List[int]:
    """All odd 64-bit c consistent with z1 = a^n*z0 + c*S (mod 2^64)."""
    M = 1 << 64
    A = pow(a, n, M)
    S = geom_sum_mod(a, n, M)
    vS = tz(S)
    B = (z1 - (A * z0) % M) % M
    if vS > 0 and (B & ((1 << vS) - 1)) != 0:
        return []
    k = 64 - vS
    if k == 0:
        return []
    S_red = (S >> vS) % (1 << k)
    if (S_red & 1) == 0:
        return [] # should be odd after removing all 2s
    invS = inv_odd_mod_2k(S_red, k)
    c_base = ((B >> vS) * invS) % (1 << k)
    cands = []
    for t in range(1 << vS):
        c = (c_base + (t << k)) & MASK64
        if c & 1: # ADD was chosen odd
            cands.append(c)
    return cands
def stepn_from_z(z: int, a: int, c: int, n: int) -> int:
    """Compute z' = A*z + c*S (mod 2^64) using fast pow/sum."""
    M = 1 << 64
    A = pow(a, n, M)
    S = geom_sum_mod(a, n, M)
    return (A * z + (c * S) % M) & MASK64
def decrypt_first_slice(ct: bytes, a: int, c: int, z0: int) -> bytes:
    L = len(ct)
    n = (L + 7) // 8
    s = z0
    otp = bytearray()
    for _ in range(n):
        otp += p64(s)
        s = (a * s + c) & MASK64
    return bytes(ci ^ oi for ci, oi in zip(ct, otp[:L]))
def solve_instance(host: str, port: int, count: int, prefix: bytes):
    if len(prefix) != 8:
        raise RuntimeError("Flag prefix must be exactly 8 bytes (e.g., BHFlagY{).")
    ctexts = get_ciphertexts(host, port, count=count)
    L = len(ctexts[0])
    n = (L + 7) // 8
    # Recover start-of-slice states z_i from first 8 bytes using known prefix.
    z = [ u64(bytes(ci ^ pi for ci, pi in zip(ct[:8], prefix))) for ct in ctexts ]
    if len(z) < 3:
        raise RuntimeError("Need at least 3 slices to solve.")
    # Intersect A candidates from multiple triples to prune hard.
    A_set = None
    triples = min(len(z) - 2, 6) # use up to 6 triples
    for i in range(triples):
        Ai = recover_stepn_A_candidates(z[i], z[i+1], z[i+2])
        if not Ai:
            continue
        A_set = Ai if A_set is None else (A_set & Ai)
        if A_set and len(A_set) == 1:
            break
    if not A_set:
        raise RuntimeError("Failed to recover step-n multiplier A.")
    # Try each A, lift all 'a', solve 'c', verify across all pairs, decrypt.
    for A in A_set:
        a_candidates = lift_all_a_from_A(A, n)
        for a in a_candidates:
            # c must be consistent across ALL pairs modulo 2^(64 - v2(S))
            M = 1 << 64
            S = geom_sum_mod(a, n, M)
            vS = tz(S)
            k = 64 - vS
            if k == 0:
                continue
            S_red = (S >> vS) % (1 << k)
            if (S_red & 1) == 0:
                continue
            invS = inv_odd_mod_2k(S_red, k)
            Achk = pow(a, n, M)
            # derive c_hat from each pair and require equality
            c_hat = None
            consistent = True
            for i in range(len(z) - 1):
                B = ((z[i+1] - (Achk * z[i]) % M) % M)
                if vS > 0 and (B & ((1 << vS) - 1)) != 0:
                    consistent = False
                    break
                cur = ((B >> vS) * invS) % (1 << k)
                if c_hat is None:
                    c_hat = cur
                elif cur != c_hat:
                    consistent = False
                    break
            if not consistent:
                continue
            if k >= 1 and (c_hat & 1) == 0:
                continue # c must be odd
            # enumerate the vS lifts of c and fully verify transitions
            c_cands = [ ((c_hat + (t << k)) & MASK64) for t in range(1 << vS) if ((c_hat + (t << k)) & 1) ]
            for c in c_cands:
                ok = True
                for i in range(len(z) - 1):
                    if stepn_from_z(z[i], a, c, n) != z[i+1]:
                        ok = False
                        break
                if not ok:
                    continue
                pt = decrypt_first_slice(ctexts[0], a, c, z[0])
                if pt.startswith(prefix) and pt.endswith(b"}"):
                    print(pt.decode(errors="replace"))
                    return
                # relaxed: brace could be inside later
                if pt.startswith(prefix) and b"}" in pt:
                    print(pt.decode(errors="replace"))
                    return
    raise RuntimeError("Parameters found, but no candidate produced a valid-looking flag. "
                       "Try increasing TAKE_SLICES or confirm the 8-byte prefix.")
# -------------------- entry --------------------
if __name__ == "__main__":
    host = HOST_DEFAULT
    port = PORT_DEFAULT
    if len(sys.argv) >= 3:
        host = sys.argv[1]
        port = int(sys.argv[2])
    prefix = DEFAULT_PREFIX
    if len(sys.argv) >= 4:
        prefix = sys.argv[3].encode()
    try:
        solve_instance(host, port, TAKE_SLICES, prefix)
    except Exception as e:
        print(f"[!] {e}")
        sys.exit(1)
Happy Hacking!
References
- Linear Congruential Generator (LCG) — Wikipedia: https://en.wikipedia.org/wiki/Linear_congruential_generator
- Hull–Dobell Theorem (LCG full-period conditions) — Lecture notes (MATH5835M): https://mpaldridge.github.io/math5835/lectures/L11-lcg.html
- Hensel’s Lemma (lifting roots modulo prime powers) — Wikipedia: https://en.wikipedia.org/wiki/Hensel%27s_lemma
- Modular inverse modulo / via Newton–Raphson — J.-G. Dumas, On Newton-Raphson iteration for multiplicative inverses modulo prime powers (arXiv): https://arxiv.org/pdf/1209.6626
- Known-plaintext attack (KPA) — Wikipedia: https://en.wikipedia.org/wiki/Known-plaintext_attack
- One-time pad key reuse (“many-time pad”) — Crypto.SE discussion: https://crypto.stackexchange.com/questions/67866/one-time-pad-vulnerabilities
- Many-Time Pad demo/code — GitHub (educational): https://github.com/andreacanepa/Many-Time-Pad
- Lehmer / Park–Miller RNG (context on LCG variants) — Wikipedia: https://en.wikipedia.org/wiki/Lehmer_random_number_generator