Hatagawa II - BlackHat MEA 2025 CTF (Quals)
- Authors

- Name
- president-xd
- @@justmohsin_

Hello everyone! I'm a Web/Cryptography CTF player from Team HasHash. I'll be sharing my solutions for the crypto challenges I solved in this CTF. I hope you find them interesting!
Hatagawa II
Category: Cryptography
Description: It is said that once in a while, flags might float by when watching over the Hatagawa — but this time, the river disguises them in a slightly trickier way.
First, let's look at the code provided to us.
Challenge Script:
#!/usr/bin/env python3
#
# BlackHat MEA CTF 2025 Qualifiers :: Hatagawa II
#
# Documentation imports
from __future__ import annotations
from typing import Tuple, List, Dict, NewType, Union
# Native imports
from secrets import randbelow
import os
# External dependencies
# None
# Flag import
FLAG = os.environ.get('DYN_FLAG', 'BHFlagY{' + b'D3BUGG1NG_1S_FUN'.hex() + '}')
if isinstance(FLAG, str):
    FLAG = FLAG.encode()
FFMT = (FLAG[:FLAG.index(b'{') + 1], b'}')
# Helper functions
# None
# Challenge classes
class Kawa:
    """ 川 """
    def __init__(self, par: Tuple[int], seed: int) -> None:
        self.a, self.c, self.m = par
        self.x = seed
    def Get(self) -> bytes:
        """ Generates and outputs the next internal state as bytes. """
        self.x = (self.a * self.x + self.c) & self.m
        return self.x.to_bytes(-(-self.m.bit_length() // 8), 'big')
class Hata:
    """ 旗 """
    def __init__(self, entropy: object) -> None:
        self.entropy = entropy
    def Strip(self, msg: bytes, fmt: Tuple[bytes, bytes]) -> bytes:
        """ Strips a message from given bytestring formatting. """
        return bytes.fromhex(msg.replace(fmt[0], b'').replace(fmt[1], b'').decode())
    def Unstrip(self, cip: str, fmt: Tuple[bytes, bytes]) -> str:
        """ Add given string formatting to a ciphertext. """
        return fmt[0].decode() + cip + fmt[1].decode()
    def Encrypt(self, msg: bytes, fmt: Tuple[bytes, bytes]) -> bytes:
        """ Encrypts a message (except its formatting) using a one-time pad generated from entropy source. """
        msg = self.Strip(msg, fmt)
        otp = b''
        while len(otp) <= len(msg):
            otp += self.entropy.Get()
        cip = bytes([x ^ y for x,y in zip(msg, otp)])
        return self.Unstrip(cip.hex(), fmt)
# Main loop
if __name__ == "__main__":
    # Challenge parameters
    MOD = 2**64 - 1
    MUL = (randbelow(MOD >> 3) << 3) | 5
    ADD = randbelow(MOD) | 1
    # Challenge setup
    hatagawa = Hata(Kawa((MUL, ADD, MOD), randbelow(MOD)))
    RIVER = r"""|
| ////\\\,,\///,,,,,\,/,\\,//////,\,,\\\,,\\\/,,,\,,//,\\\,
| ~ ~~~~ ~~ ~~~~~~~ ~ ~~ ~~~~~~ ~ ~~~ ~~~ ~~~ ~~ ~~
| ~~~~~~~ ~~~~~~ ~~~ ~ ~~~~~ ~~ ~~~~~~~ ~ ~~ ~~~~~
| ~~~ {} ~
| ~~~ ~ ~~~~~ ~~~~ ~ ~~~~ ~~~~~ ~~~~ ~~~ ~ ~~~~~
| ~~~ ~ ~~~~~ ~ ~~ ~ ~~~~ ~~~ ~~ ~~ ~~~~~~~ ~ ~~
| \\\\\'''\\'////'//'\''\\\/'''\''//'\\\''\///''''\'/'\\'//"""
    print(RIVER.format(' '*17 + '旗 川' + ' '*16))
    # Main loop
    userOptions = ['Stay a while...']
    TUI = "|\n| Menu:\n| " + "\n| ".join('[' + i[0] + ']' + i[1:] for i in userOptions) + "\n| [W]alk away\n|"
    while True:
        try:
            print(TUI)
            choice = input('| > ').lower()
            # [W]alk away
            if choice == 'w':
                print("|\n| [~] You turn your back to the river...\n|")
                break
            # [S]tay a while...
            elif choice == 's':
                print("|\n| [~] Look! A flag..? For FREE?!")
                print(RIVER.format(hatagawa.Encrypt(FLAG, FFMT)))
            else:
                print("|\n| [!] Invalid choice.")
        except KeyboardInterrupt:
            print("\n|\n| [~] Goodbye ~ !\n|")
            break
        except Exception as e:
            print('|\n| [!] {}'.format(e))
The code may look long, but it is quite straightforward. Let's break it down into its crucial snippets.
Crucial Snippets
1. 64-bit LCG Core
self.x = (self.a * self.x + self.c) & self.m # m = 2**64 - 1
return self.x.to_bytes(-(-self.m.bit_length() // 8), 'big') # 8 bytes
This is a Linear Congruential Generator (LCG):
- & (2**64 - 1) effectively performs arithmetic modulo 2^64 (because m = 2^64 − 1 and masking keeps the low 64 bits)
- The keystream block is the full internal 64-bit state rendered as 8 bytes (big endian)
Each call to Get() emits the entire internal state of the LCG as keystream, which is extremely fragile if any plaintext is known or related across encryptions.
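To make the two points above concrete, here is a minimal sketch of one LCG step and its 8-byte rendering. The values of a, c and x are illustrative assumptions, not the challenge's secret parameters, and the helper names lcg_step and state_to_block are hypothetical.

```python
MASK64 = 2**64 - 1  # same value as MOD in the challenge script


def lcg_step(a: int, c: int, x: int) -> int:
    """One LCG step; masking with 2**64 - 1 keeps only the low 64 bits."""
    return (a * x + c) & MASK64


def state_to_block(x: int) -> bytes:
    """Render a 64-bit state as big-endian bytes, the way Get() does."""
    return x.to_bytes(-(-MASK64.bit_length() // 8), 'big')


# Illustrative values only (a is 5 mod 8 and c is odd, like the challenge shape).
a, c, x = 0x5DEECE66D, 0xB, 0x123456789ABCDEF0
nxt = lcg_step(a, c, x)
assert nxt == (a * x + c) % 2**64      # masking and reduction mod 2^64 agree
block = state_to_block(nxt)
print(len(block), block.hex())         # one keystream block = one full state
```

Because the block is the state itself, converting a keystream block back with int.from_bytes(block, 'big') yields the exact value the generator will use for its next step.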
2. Parameter Structure
MOD = 2**64 - 1
MUL = (randbelow(MOD >> 3) << 3) | 5 # ⇒ a ≡ 5 (mod 8)
ADD = randbelow(MOD) | 1 # ⇒ c is odd
- a is constrained to 5 mod 8 (and thus odd)
- c is forced odd
- These structural constraints fix the low three bits of a and the low bit of c, which shrinks the search space for (a, c) and makes the LCG "nice" to reason about modulo 2^64
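One reason this shape is "nice", which the challenge source does not state but which follows from the Hull-Dobell theorem: for a power-of-two modulus, an LCG has full period exactly when c is odd and a ≡ 1 (mod 4), and a ≡ 5 (mod 8) implies the latter. The sketch below checks this on a toy 8-bit modulus; the helper name period and the small parameters are assumptions chosen only for illustration.

```python
def period(a: int, c: int, bits: int, seed: int = 0) -> int:
    """Count steps until x -> (a*x + c) mod 2**bits returns to the seed.

    Assumes a is odd, so the map is a bijection and the orbit is a cycle.
    """
    mask = (1 << bits) - 1
    x, n = seed, 0
    while True:
        x = (a * x + c) & mask
        n += 1
        if x == seed:
            return n


# Challenge-like shape on a toy modulus: a = 5 (mod 8), c odd.
full_1 = period(a=5, c=1, bits=8)
full_2 = period(a=13, c=7, bits=8)
# Counter-example: a = 3 (mod 4) breaks the full-period condition.
short = period(a=3, c=1, bits=8)
print(full_1, full_2, short)
```

With the challenge's shape every state in the toy modulus is visited before repeating; the counter-example falls into a shorter cycle.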
3. Strip / Unstrip Logic (Format Handling)
def Strip(self, msg: bytes, fmt: Tuple[bytes, bytes]) -> bytes:
    return bytes.fromhex(msg.replace(fmt[0], b'').replace(fmt[1], b'').decode())
def Unstrip(self, cip: str, fmt: Tuple[bytes, bytes]) -> str:
    return fmt[0].decode() + cip + fmt[1].decode()
- FFMT is (b"BHFlagY{", b"}")
- Strip:
  - Removes the prefix and suffix from msg (the flag)
  - The inner part is hex: for the default flag, b"D3BUGG1NG_1S_FUN".hex()
  - bytes.fromhex(...) converts that hex string into the raw binary plaintext
- Encrypt works on that stripped binary, not on the ASCII "BHFlagY{...}" directly
- Unstrip:
  - Takes the ciphertext as a hex string (Encrypt hex-encodes the ciphertext bytes before calling it)
  - Rewraps as BHFlagY{<hex_ciphertext>}
Visually, each time we press S, we see:
BHFlagY{<hex of (plaintext XOR keystream)>}
The outer formatting is constant and known; only the inner hex changes between samples.
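As a quick illustration of the format handling, the sketch below runs the same Strip/Unstrip logic as standalone functions on the default debug flag from the challenge script. The lowercase function names are hypothetical stand-ins for the methods.

```python
# Default debug flag, built exactly as in the challenge script.
FLAG = ('BHFlagY{' + b'D3BUGG1NG_1S_FUN'.hex() + '}').encode()
FFMT = (FLAG[:FLAG.index(b'{') + 1], b'}')


def strip(msg: bytes, fmt) -> bytes:
    """Drop the prefix/suffix and decode the inner hex to raw bytes."""
    return bytes.fromhex(msg.replace(fmt[0], b'').replace(fmt[1], b'').decode())


def unstrip(cip: str, fmt) -> str:
    """Wrap a hex string back into the flag format."""
    return fmt[0].decode() + cip + fmt[1].decode()


raw = strip(FLAG, FFMT)
print(FFMT)        # (b'BHFlagY{', b'}')
print(raw)         # b'D3BUGG1NG_1S_FUN'
print(len(raw))    # 16
assert unstrip(raw.hex(), FFMT) == FLAG.decode()  # round trip
```

The 16-byte length matters later: it determines how many keystream blocks each encryption consumes.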
4. OTP Generation and Reuse
def Encrypt(self, msg: bytes, fmt: Tuple[bytes, bytes]) -> bytes:
    msg = self.Strip(msg, fmt)
    otp = b''
    while len(otp) <= len(msg):
        otp += self.entropy.Get()
    cip = bytes([x ^ y for x,y in zip(msg, otp)])
    return self.Unstrip(cip.hex(), fmt)
- The OTP is built by concatenating successive PRNG states (Get() calls)
- The PRNG is not reset between user choices:
  - First press: uses states S[0], S[1], S[2], ...
  - Second press: uses later states S[3], S[4], S[5], ... (depending on how many are consumed)
- Every press encrypts the same underlying plaintext bytes (the stripped flag), but with different, sequential chunks of the LCG stream
This resembles classic "many-time pad" behavior: the same plaintext is encrypted repeatedly under consecutive chunks of one deterministic keystream.
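How many states one press consumes follows directly from the loop condition `len(otp) <= len(msg)`. The sketch below replays that loop for a few message lengths; blocks_per_press is a hypothetical helper name. It suggests the "3 states per press" indexing holds when the stripped flag is 16 to 23 bytes long, which is the case for the 16-byte default flag.

```python
def blocks_per_press(msg_len: int, block_size: int = 8) -> int:
    """Replay `while len(otp) <= len(msg): otp += Get()` and count Get() calls."""
    calls, otp_len = 0, 0
    while otp_len <= msg_len:
        otp_len += block_size
        calls += 1
    return calls


for n in (15, 16, 23, 24):
    print(n, blocks_per_press(n))
# 15 2
# 16 3
# 23 3
# 24 4
```

Note the `<=`: a 16-byte message pulls a third block even though only two are needed, so one state per press never touches the ciphertext.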
Vulnerability / Bug
The core cryptographic issues:
1. LCG as a Keystream Generator
An LCG over 64 bits with known structure is not secure. Its outputs are linearly related, and with enough observations you can reconstruct the parameters.
2. Full State Leak per Block
Each 8-byte keystream block is literally the internal state x in big-endian form. Any relation between plaintext and ciphertext gives a direct relation to state.
3. No Reseeding / Global PRNG
The same global Kawa instance is used across all Stay presses. Every use consumes more states from the same chain, giving multiple ciphertexts of the same underlying message under different consecutive states.
4. Plaintext Invariance Across Samples
The plaintext being encrypted (after Strip) is the same bytes every time:
P (binary) = bytes.fromhex(inner_flag_hex)
For each block index b and sample k:
P[b] = S[3k + b] XOR C[k,b]
So for a fixed block index b, all S[3k + b] XOR C[k,b] must be equal across k. This gives a set of strong algebraic constraints linking the states and ciphertext blocks.
5. Strong Parameter Constraints
The multiplier and increment are restricted (a mod 8 = 5, c odd). That narrows the solution space for the LCG parameters and helps make solving via SMT (Z3) very feasible.
The combination of:
- Deterministic LCG keystream
- Reuse of OTP across multiple encryptions of the same plaintext
- Full-state leakage
- Strong algebraic constraints from plaintext invariance
makes the scheme completely breakable with a few samples.
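The plaintext-invariance relation can be checked offline on simulated data. The sketch below uses made-up parameters (a, c, seed) and the default debug plaintext, all of which are assumptions for illustration; it only demonstrates that S[3k + b] XOR C[k,b] is the same value for every sample k.

```python
MASK64 = 2**64 - 1


def keystream(a: int, c: int, x: int, n_states: int) -> list:
    """Return the next n_states LCG states, starting after x."""
    out = []
    for _ in range(n_states):
        x = (a * x + c) & MASK64
        out.append(x)
    return out


# Hypothetical parameters and plaintext (16 bytes: 2 blocks used, 3 states per press).
a, c, seed = 0x5DEECE66D, 0xB, 42
P = b'D3BUGG1NG_1S_FUN'
T = 4                                   # number of simulated presses
S = keystream(a, c, seed, 3 * T)


def encrypt(k: int) -> bytes:
    """Encrypt P with the three states consumed by press k."""
    otp = b''.join(s.to_bytes(8, 'big') for s in S[3 * k:3 * k + 3])
    return bytes(x ^ y for x, y in zip(P, otp))


C = [encrypt(k) for k in range(T)]

recovered = b''
for b in range(2):
    vals = {S[3 * k + b] ^ int.from_bytes(C[k][8 * b:8 * b + 8], 'big')
            for k in range(T)}
    assert len(vals) == 1               # identical across all samples k
    recovered += vals.pop().to_bytes(8, 'big')
print(recovered)                        # b'D3BUGG1NG_1S_FUN'
```

In the real attack the states are unknown, so this equality becomes a constraint for the solver rather than a check.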
My Approach
Step 1: Collect Multiple Ciphertext Samples
We exploit the network interface:
- Connect to the remote service with pwntools.remote
- Repeatedly send S to trigger new encryptions
- Each response prints an ASCII art river with a formatted flag inside: ... BHFlagY{<hex_ciphertext>} ...
- We use a regex to extract {[0-9a-f]+} and store each inner hex string as a ciphertext sample C_k
Code fragment:
def parse_flag_block(text: bytes) -> Tuple[str, bytes]:
    m = re.search(rb'([^\s{}]+)\{([0-9a-f]+)\}', text)
    if not m:
        m2 = re.search(rb'\{([0-9a-f]+)\}', text)
        if not m2:
            raise ValueError("Could not find {hex} in response.")
        prefix = "BHFlagY"
        hex_payload = m2.group(1).decode()
    else:
        prefix = m.group(1).decode()
        hex_payload = m.group(2).decode()
    return prefix, bytes.fromhex(hex_payload)
def collect_ciphertexts(host: str, port: int, num_samples: int):
    r = remote(host, port)
    r.recvuntil(b'> ')
    cblocks = []
    prefix_seen = None
    for _ in range(num_samples):
        r.sendline(b'S')
        chunk = r.recvuntil(b'| > ', drop=False)
        pref, cbytes = parse_flag_block(chunk)
        if prefix_seen is None:
            prefix_seen = pref
        cblocks.append(cbytes)
    r.sendline(b'W')
    r.close()
    return prefix_seen, cblocks
We grab ~8 samples to have enough constraints.
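The parsing step can be tried without a network connection. The sketch below applies the same regex to a made-up server response; the sample text and its hex payload are assumptions, not real challenge output.

```python
import re

# Hypothetical response chunk, shaped like one river printout plus the prompt.
sample = (
    b"|\n| [~] Look! A flag..? For FREE?!\n"
    b"|   ~~~   BHFlagY{00112233aabbccdd}   ~\n"
    b"| > "
)

# Group 1: the run of non-space, non-brace characters before '{' (the prefix).
# Group 2: the lowercase hex payload between the braces.
m = re.search(rb'([^\s{}]+)\{([0-9a-f]+)\}', sample)
prefix = m.group(1).decode()
payload = bytes.fromhex(m.group(2).decode())
print(prefix, payload.hex())   # BHFlagY 00112233aabbccdd
```

Capturing the prefix from the response rather than hardcoding it keeps the solver working if the remote flag uses a different wrapper than the debug default.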
Step 2: Split Ciphertexts into Aligned Blocks
The plaintext after Strip is arbitrary bytes; we only know that the same bytes are used for every encryption. We split each ciphertext byte string into blocks corresponding to 64-bit keystream outputs:
def split_blocks(cblocks: List[bytes]):
    N = len(cblocks[0])
    B = ceil(N / 8)  # number of blocks
    last_len = N - 8*(B-1)  # length of last partial block
    obs_blocks = []
    for cb in cblocks:
        blocks = []
        off = 0
        for b in range(B):
            L = 8 if b < B-1 else last_len
            blocks.append(cb[off:off+L])
            off += L
        obs_blocks.append(blocks)
    return N, B, last_len, obs_blocks
Each obs_blocks[k][b] is the b-th ciphertext block for sample k.
We then "top-align" each block into a 64-bit word:
def top_aligned_u64(block_bytes: bytes) -> int:
    L = len(block_bytes)
    v = int.from_bytes(block_bytes, 'big')
    shift = 8 * (8 - L)
    return (v << shift) & MASK64
So the bytes occupy the high bytes of a 64-bit integer, making it easy to compare "top bits" across samples even when the last block is shorter than 8 bytes.
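A short worked example of the top alignment, using arbitrary sample bytes (assumptions for illustration):

```python
MASK64 = (1 << 64) - 1


def top_aligned_u64(block_bytes: bytes) -> int:
    """Place the given bytes in the high bytes of a 64-bit word."""
    L = len(block_bytes)
    v = int.from_bytes(block_bytes, 'big')
    return (v << (8 * (8 - L))) & MASK64


short_block = top_aligned_u64(b'\xab\xcd')        # 2-byte partial block
full_block = top_aligned_u64(bytes(range(1, 9)))  # 8-byte block, no shift
print(hex(short_block))   # 0xabcd000000000000
print(hex(full_block))    # 0x102030405060708
```

The zero-filled low bytes of a partial block carry no information, which is why later comparisons must look only at the top 8*L bits.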
Step 3: Build Z3 Model for LCG + Plaintext Invariance
We construct a bit-vector SMT system:
Unknowns:
- a, c as 64-bit BitVecs
- S[i] for each emitted state (3 per sample)
Constraints:
Parameter shape
s.add((a & 7) == 5)  # a ≡ 5 (mod 8)
s.add((c & 1) == 1)  # c odd
LCG transitions
for i in range(total_states - 1):
    s.add(S[i+1] == a * S[i] + c)  # modulo 2^64 via bit-vector wrap
Plaintext invariance across samples / blocks
For each block index b and sample k, let:
- C[0,b] be the top-aligned ciphertext block from sample 0
- C[k,b] be the top-aligned ciphertext block from sample k
- Plaintext P_b is the same for all k, so:
P_b = S[b] XOR C[0,b] = S[3k + b] XOR C[k,b]
Implemented as:
for b in range(B):
    L = 8 if b < B-1 else last_len
    top_hi = 63
    top_lo = 64 - 8*L
    C0b = BitVecVal(top_aligned_u64(obs[0][b]), W)
    for k in range(T):
        Ckb = BitVecVal(top_aligned_u64(obs[k][b]), W)
        left = S[3*k + b] ^ Ckb
        right = S[b] ^ C0b
        if L == 8:
            s.add(left == right)
        else:
            s.add(Extract(top_hi, top_lo, left) == Extract(top_hi, top_lo, right))
For full blocks we equate all bits; for the shorter last block we equate only the top 8*L bits.
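For readers less familiar with Z3's Extract, the partial-block comparison can be mirrored in plain Python. This is only an analogy under stated assumptions: top_bits is a hypothetical helper, and the two 64-bit values are made up so that they agree in their top 4 bytes and differ below.

```python
def top_bits(x: int, L: int) -> int:
    """Plain-Python analogue of Extract(63, 64 - 8*L, x) on a 64-bit value."""
    return (x & (2**64 - 1)) >> (64 - 8 * L)


# Two made-up words: same top 4 bytes, different low bytes.
left = 0xDEADBEEF00001234
right = 0xDEADBEEF0000FFFF

same_top = top_bits(left, 4) == top_bits(right, 4)    # partial block, L = 4
same_full = top_bits(left, 8) == top_bits(right, 8)   # full block, L = 8
print(same_top, same_full)   # True False
```

The low bytes of the keystream word under a partial block never reach the ciphertext, so constraining them would add equations the observations cannot justify.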
Step 4: Ask Z3 to Solve and Recover (a, c) and Plaintext
Once constraints are set:
if s.check() != sat:
    return None
m = s.model()
a_val = m[a].as_long() & MASK64
c_val = m[c].as_long() & MASK64
Then we recover the plaintext bytes by reusing:
plain = bytearray()
for b in range(B):
    L = 8 if b < B-1 else last_len
    Sb = (m[S[b]].as_long() & MASK64)
    C0b = top_aligned_u64(obs[0][b])
    Pb64 = Sb ^ C0b
    pb = Pb64.to_bytes(8, 'big')[:L]  # take TOP L bytes
    plain += pb
This plain corresponds to the binary message produced by Strip (the pre-OTP bytes). The original flag format is:
prefix + "{" + plain.hex() + "}"
So we reconstruct the final flag string as:
true_flag = f"{prefix}{{{plain.hex()}}}"
Which matches the original FLAG.
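A cheap sanity check on any model the solver returns is to regenerate the keystream from (a, c, S[0]) and confirm that every collected sample decrypts to the same bytes. The sketch below is an optional addition, not part of the solver; verify is a hypothetical helper, it assumes 3 states per press, and the self-check data is simulated with made-up parameters.

```python
MASK64 = (1 << 64) - 1


def verify(a, c, s0, cblocks, states_per_press=3):
    """Rebuild states from the first emitted state s0 and decrypt all samples.

    Returns the common plaintext, or None if the samples disagree.
    """
    states = [s0]
    for _ in range(states_per_press * len(cblocks) - 1):
        states.append((a * states[-1] + c) & MASK64)
    plains = set()
    for k, cb in enumerate(cblocks):
        chunk = states[states_per_press * k:states_per_press * (k + 1)]
        otp = b''.join(s.to_bytes(8, 'big') for s in chunk)
        plains.add(bytes(x ^ y for x, y in zip(cb, otp)))
    return plains.pop() if len(plains) == 1 else None


# Self-check on simulated data (all values hypothetical).
a, c, s0 = 0x5DEECE66D, 0xB, 0x0123456789ABCDEF
P = b'D3BUGG1NG_1S_FUN'
st = [s0]
for _ in range(3 * 4 - 1):
    st.append((a * st[-1] + c) & MASK64)
cblocks = [
    bytes(x ^ y for x, y in zip(P, b''.join(s.to_bytes(8, 'big') for s in st[3 * k:3 * k + 3])))
    for k in range(4)
]
print(verify(a, c, s0, cblocks))   # b'D3BUGG1NG_1S_FUN'
```

With real data, s0 would be the model's value for S[0] and cblocks the collected ciphertexts; a None result would hint that the model or the states-per-press assumption is off.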
Final Solution Script
Here is the full solver:
#!/usr/bin/env python3
"""
BlackHat MEA CTF 2025 Quals :: Hatagawa II
Advanced LCG Attack with Z3
"""
import sys
import re
from math import ceil
from typing import List, Tuple
from pwn import remote
from z3 import Solver, BitVec, BitVecVal, Extract, sat
W = 64
MASK64 = (1 << 64) - 1
# ---------------- I/O helpers ----------------
def parse_flag_block(text: bytes) -> Tuple[str, bytes]:
    m = re.search(rb'([^\s{}]+)\{([0-9a-f]+)\}', text)
    if not m:
        m2 = re.search(rb'\{([0-9a-f]+)\}', text)
        if not m2:
            raise ValueError("Could not find {hex} in response.")
        prefix = "BHFlagY"
        hex_payload = m2.group(1).decode()
    else:
        prefix = m.group(1).decode()
        hex_payload = m.group(2).decode()
    return prefix, bytes.fromhex(hex_payload)
def collect_ciphertexts(host: str, port: int, num_samples: int) -> Tuple[str, List[bytes]]:
    r = remote(host, port)
    r.recvuntil(b'> ')
    cblocks = []
    prefix_seen = None
    for _ in range(num_samples):
        r.sendline(b'S')
        chunk = r.recvuntil(b'| > ', drop=False)
        pref, cbytes = parse_flag_block(chunk)
        if prefix_seen is None:
            prefix_seen = pref
        cblocks.append(cbytes)
    r.sendline(b'W')
    try:
        r.recv(timeout=0.2)
    except Exception:
        pass
    r.close()
    return prefix_seen, cblocks
# ---------------- Z3 construction ----------------
def split_blocks(cblocks: List[bytes]):
    N = len(cblocks[0])
    if any(len(cb) != N for cb in cblocks):
        raise ValueError("Inconsistent ciphertext lengths.")
    B = ceil(N / 8)
    last_len = N - 8*(B-1) if B > 0 else 0
    obs_blocks = []
    for cb in cblocks:
        blocks = []
        off = 0
        for b in range(B):
            L = 8 if b < B-1 else last_len
            blocks.append(cb[off:off+L])
            off += L
        obs_blocks.append(blocks)
    return N, B, last_len, obs_blocks
def top_aligned_u64(block_bytes: bytes) -> int:
    """Place these bytes at the TOP of a 64-bit word (big-endian block)."""
    L = len(block_bytes)
    v = int.from_bytes(block_bytes, 'big')
    shift = 8 * (8 - L)
    return (v << shift) & MASK64
def solve(cblocks: List[bytes]):
    T = len(cblocks)
    N, B, last_len, obs = split_blocks(cblocks)
    # Total emitted states across T presses: 3 per press.
    total_states = 3 * T
    s = Solver()
    a = BitVec('a', W)
    c = BitVec('c', W)
    S = [BitVec(f's_{i}', W) for i in range(total_states)]
    # Parameter shape: a ≡ 5 (mod 8); c odd
    s.add((a & 7) == 5)
    s.add((c & 1) == 1)
    # LCG transitions across all emitted states
    for i in range(total_states - 1):
        s.add(S[i+1] == a * S[i] + c)  # modulo 2^64 via bit-vector wrap
    # Plaintext invariance across samples:
    # For block b, top bytes of (S[3k+b] XOR C[k,b]) equal those of (S[b] XOR C[0,b]).
    for b in range(B):
        L = 8 if b < B-1 else last_len
        top_hi = 63
        top_lo = 64 - 8*L
        C0b = BitVecVal(top_aligned_u64(obs[0][b]), W)
        for k in range(T):
            Ckb = BitVecVal(top_aligned_u64(obs[k][b]), W)
            left = S[3*k + b] ^ Ckb
            right = S[b] ^ C0b
            if L == 8:
                s.add(left == right)
            else:
                s.add(Extract(top_hi, top_lo, left) == Extract(top_hi, top_lo, right))
    if s.check() != sat:
        return None
    m = s.model()
    a_val = m[a].as_long() & MASK64
    c_val = m[c].as_long() & MASK64
    # Recover plaintext bytes from the base sample k=0
    plain = bytearray()
    for b in range(B):
        L = 8 if b < B-1 else last_len
        Sb = (m[S[b]].as_long() & MASK64)
        C0b = top_aligned_u64(obs[0][b])
        Pb64 = Sb ^ C0b
        pb = Pb64.to_bytes(8, 'big')[:L]  # take TOP L bytes
        plain += pb
    return a_val, c_val, bytes(plain)
# ---------------- main ----------------
def main():
    host = sys.argv[1] if len(sys.argv) > 1 else "34.252.33.37"
    port = int(sys.argv[2]) if len(sys.argv) > 2 else 32303
    samples = int(sys.argv[3]) if len(sys.argv) > 3 else 8
    print(f"[*] Connecting to {host}:{port} and collecting {samples} samples...")
    prefix, cblocks = collect_ciphertexts(host, port, samples)
    print(f"[*] Prefix detected: {prefix}")
    print(f"[*] Ciphertext length: {len(cblocks[0])} bytes ({len(cblocks[0])*2} hex chars)")
    res = solve(cblocks)
    if res is None:
        print("[!] UNSAT. Try increasing samples (e.g., 10–12).")
        sys.exit(2)
    a_val, c_val, plain = res
    true_flag = f"{prefix}{{{plain.hex()}}}"
    print(f"[+] a = 0x{a_val:016x}, c = 0x{c_val:016x}")
    print(f"[+] True flag recovered:\n{true_flag}")
if __name__ == "__main__":
    main()
References
- Linear Congruential Generator (LCG) — Wikipedia: https://en.wikipedia.org/wiki/Linear_congruential_generator
- Known-plaintext attack (KPA) — Wikipedia: https://en.wikipedia.org/wiki/Known-plaintext_attack
- One-time pad & many-time pad issues — Crypto.SE discussions: https://crypto.stackexchange.com/questions/67866/one-time-pad-vulnerabilities
- Z3 SMT solver and bit-vectors (official docs / tutorials): https://theory.stanford.edu/~nikolaj/programmingz3.html
Happy hacking!