宇宙安全声明:本文纯探讨技术,没有任何使用不法手段绕过校园网计费或监管的意图.
比较好奇校园网下的认证机制,空闲的时候做了一些探索,在此记录下来。本文会阐述校园网如何对非认证主机断网,以及如何利用校园网对 UDP 53 端口的豁免实现免认证上网.
对于后者,@IcyFeather 大佬已经提供了一个很完备的方案,而本文将尝试给出一个基于此的 VPN 代码实现。
本文所有内容亦保存于 GitHub:GitHub - wait1210day/scu-netkazari: Netkazari - Research on the campus network of Sichuan University
网络行为探查
测试环境:江安西园某舍,宿舍中配备了无线 AP 和有线网络
接入阶段: 新设备接入,首先发送 DHCP Discover 包募集网络中的 DHCP 服务器,此时会收到来自 10.134.135.254 [58:69:6c:4c:47:53] 的回复. DHCP 向设备分发了 IP,子网掩码 255.255.248.0,以及默认网关 10.134.135.254. 据此,可以得知该子网 IP 地址范围为 10.134.128.0 到 10.134.135.254.
如下是抓包得到的 DHCP ACK 回复:
此时设备将获得 IP 地址,且网关也已经由 DHCP 请求知晓有新设备接入,并拿到了新设备的 MAC.
L2/L3 交换隔离:
Rule 1. 交换侧会丢弃 任何目的 MAC 不是网关的 Ethernet 帧,纵使目的 MAC 地址指向了网络内一个有效的设备. 特别地,对于广播 MAC 地址,除网关外的所有设备都不会收到广播. 也就是说:
from MAC1 to MAC0
Source ---------------------> Gateway
MAC1 Link OK MAC0
from MAC1 to MAC2
Source ----------X----------> Destination
MAC1 Dropped MAC2
Rule 2. 如果一个 IP 包的目的 MAC 是网关,网关将接受该包,并根据该包的目的 IP 地址将其转发到网络内对应的设备上(如果目的 IP 指向网络内的设备的话). 该过程数据包的转发路径:
from IP1 [MAC1] to IP2 [MAC0]
Source ---------------------------------> Gateway
IP1 [MAC1] IP0 [MAC0]
|
Destination <-------------------------------'
IP2 [MAC2] from IP1 [MAC0] to IP2 [MAC2]
可以看到网关将修改 Ethernet 帧的 MAC 地址,而不改变 IP 帧的地址.
Rule 3. 根据 Rule 1,ARP 广播查询请求不会被局域网中的任何设备受理,而网关这时会给出回复. 无论发送对何种 IP 地址的查询,只要 IP 地址在某个特定范围内(目前观察到的范围是对当前子网下的所有 IP 都成立),无论这个 IP 是否被分配,网关都会给出回应. 且回应全部指向自己的 MAC 地址.
Rule 4. 根据 Rule 2,对于没有认证的主机,网关将拒绝转发来自它的流量(少数 UDP 端口得到了豁免,见下文),但会 继续转发流向它的流量.
针对以上四条规则,可总结出如下流控特征:
- 交换侧通过丢弃任何目标 MAC 地址不是网关的帧来阻止内网设备间(不经过网关)直接通信;
- 网关通过拦截 ARP 广播,并制造假的回复,引导所有主机将内网流量发送给自己(外网流量会由每台主机的默认路由规则自然地发向网关);
- 网关通过拒绝转发来自未认证主机的流量来阻止未认证主机联网.
DNS Tunnel
网关在进行转发时,对 UDP 53 端口有豁免机制,这是用于 DNS 查询的端口. 本人对 UDP 50-100 内的所有端口都进行了测试,发现有且仅有 53 端口可以得到豁免.
具体的豁免机制为:无论主机是否认证,网关默认放行所有目标端口为 53 的 UDP 流量,直达公网,而不检测其源端口,也不检测 UDP 的载荷是否为有效的 DNS 数据报.
经过实验探查,防火墙对 UDP 连接的保活时间为 5s:自最后一个出站/入站包开始计时,若在5s 内没有匹配端口的包入站/出站,UDP 连接会被防火墙关闭,表现为禁止新的包在该端口上继续入站.
基于此,可以用 UDP 53 端口承载流量,然后发送到存在于公网的代理服务器,由代理服务器解析并代理流量. 虽然 UDP 是不可靠的传输,但是其中包含的协议(如 TCP)会保证传输是可靠的(如果协议需要的话).
下面的代码可以对 UDP 53 的往返连通性进行简单测试:
## Client:
import socket
SERVER_IP = 'xxx.xxx.xxx.xxx'
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.sendto('hello'.encode(), (SERVER_IP, 53))
packet, addr = sock.recvfrom(512)
print(f'recv: [{packet.decode()}] from {addr}')
## Server (requires root privilege):
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.bind(('0.0.0.0', 53))
while True:
packet, addr = sock.recvfrom(512)
# Sometimes we receive packets from 127.0.0.1, just ignore them
if addr[0] == '127.0.0.1':
continue
print(f'recv: [{packet.decode()}] from {addr}')
sock.sendto('response'.encode(), addr)
包冗余现象:观察到,从主机发出一个包,而对方能收到两个完全一致的包,原因暂且不明. 于是在我们的协议中加入了 sequence number 机制可以避免收到重复包.
一个 DNS tunnel 的参考实现可见:
(该实现是本人一天时间手搓出来的原型,没有任何复杂的传输控制机制,也没有任何加密,请勿用于实际使用中)
#!/usr/bin/env python3
import socket
import sys
import struct
import random
import asyncio
import errno
from pytun import TunTapDevice
SERVER_IP = 'xxx.xxx.xxx.xxx' # For server, IP
CLIENT_IFACE = 'wlan0' # For client, sending package from which iface, use `None` for default
PROTO_HDR = b'\x66\xcc\xff\xff'
PROTO_HDR_LENGTH = len(PROTO_HDR)
TUN_LINK_MTU = 900
TUN_TRANSPORT_SIZE = TUN_LINK_MTU + 16
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
class KazariIO:
sock: socket.socket
def proto_recv_next(self, bufsize: int) -> tuple[bytes, tuple[str, int]]:
pass
def proto_send_next(self, payload: bytes) -> None:
pass
def proto_recv_next_checked(self, bufsize: int) -> tuple[bytes, tuple[str, int]]:
while True:
packet = self.proto_recv_next(bufsize)
if packet is not None:
return packet
class KazariTunnelClientIO(KazariIO):
def __init__(self, ip: str, udp_port: int, iface: str = None):
self.server_ip = ip
self.server_port = udp_port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.proto_tx_seq = 0
self.proto_rx_seq = 0
if iface is not None:
self.sock.setsockopt(socket.SOL_SOCKET, 25, str(iface + '\0').encode())
def udp_recv(self, bufsize: int) -> bytes:
packet, addr = self.sock.recvfrom(bufsize)
if addr[0] == self.server_ip and addr[1] == self.server_port:
return packet
return None
def proto_recv_next(self, bufsize: int) -> tuple[bytes, tuple[str, int]]:
payload = self.udp_recv(bufsize)
if payload is None:
return None
# 8 bytes == 4 bytes of header + 4 bytes of seq number
if len(payload) <= 8:
return None
if payload[:PROTO_HDR_LENGTH] != PROTO_HDR:
return None
seq = int.from_bytes(payload[PROTO_HDR_LENGTH:PROTO_HDR_LENGTH + 4], byteorder='big')
if seq <= self.proto_rx_seq:
return None
self.proto_rx_seq = seq
return (payload[PROTO_HDR_LENGTH + 4:], (self.server_ip, self.server_port))
def proto_send_next(self, payload: bytes) -> None:
self.proto_tx_seq += 1
udp_payload = PROTO_HDR + self.proto_tx_seq.to_bytes(4, byteorder='big') + payload
self.sock.sendto(udp_payload, (self.server_ip, self.server_port))
class KazariTunnelServerIO(KazariIO):
def __init__(self, listen_addr: str, udp_port: int):
self.listen_addr = listen_addr
self.port = udp_port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind((listen_addr, udp_port))
self.proto_rx_seq = 0
self.proto_tx_seq = 0
self.client_ip: None | str = None
self.client_port: None | int = None
def set_client_address(self, ip: str, port: int) -> None:
self.client_ip = ip
self.client_port = port
def proto_recv_next(self, bufsize: int) -> tuple[bytes, tuple[str, int]]:
payload, addr = self.sock.recvfrom(bufsize)
if len(payload) <= 8:
return None
if payload[:PROTO_HDR_LENGTH] != PROTO_HDR:
return None
seq = int.from_bytes(payload[PROTO_HDR_LENGTH:PROTO_HDR_LENGTH + 4], byteorder='big')
if seq <= self.proto_rx_seq:
return None
self.proto_rx_seq = seq
return (payload[PROTO_HDR_LENGTH + 4:], addr)
def proto_send_next(self, payload: bytes) -> None:
self.proto_tx_seq += 1
udp_payload = PROTO_HDR + self.proto_tx_seq.to_bytes(4, byteorder='big') + payload
self.sock.sendto(udp_payload, (self.client_ip, self.client_port))
async def handle_peer_io(io: KazariIO, tun_ip: str) -> None:
# Create TUN virtual device
tun = TunTapDevice(name='kztun0')
tun.addr = tun_ip
tun.netmask = '255.255.255.0'
tun.mtu = TUN_LINK_MTU
tun.up()
def on_recv():
packet = None
try:
result = io.proto_recv_next(1024)
if result is None:
return
packet = result[0]
except socket.error as e:
err = e.args[0]
if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
return
else:
print(f'IO error: {err}')
if packet == b'HEARTBEAT':
print('recv: peer heartbeat')
elif packet[:7] == b'PAYLOAD':
payload = packet[7:]
print(f'got payload {len(payload)} bytes')
tun.write(payload)
#print(payload)
else:
print(f'unrecognized packet: {packet}')
# handle_packet_arrival(packet, tun)
def on_tun_recv():
packet = tun.read(TUN_TRANSPORT_SIZE)
io.proto_send_next(b'PAYLOAD' + packet)
print(f'sent payload {len(packet)} bytes')
async def on_send_heartbeat():
while True:
io.proto_send_next(b'HEARTBEAT')
await asyncio.sleep(1)
io.sock.setblocking(False)
loop.add_reader(io.sock.fileno(), on_recv)
loop.add_reader(tun.fileno(), on_tun_recv)
await on_send_heartbeat()
def server_main():
io = KazariTunnelServerIO('0.0.0.0', 53)
# Waiting for handshake
print('handshake: waiting for handshake SYN')
handshake_payload, client_addr = io.proto_recv_next_checked(256)
print(f'handshake: request from {client_addr[0]}:{client_addr[1]}')
handshake_syn, handshake_id = struct.unpack('3sI', handshake_payload)
if handshake_syn.decode() != 'SYN':
print('handshake: invalid request')
print(f'handshake: got SYN with id={handshake_id}, sending reply ACK with id={handshake_id + 1}')
io.set_client_address(client_addr[0], client_addr[1])
io.proto_send_next(struct.pack('3sI', 'ACK'.encode(), handshake_id + 1))
print(f'handshake: done')
loop.run_until_complete(handle_peer_io(io, '10.1.1.1'))
def client_main():
io = KazariTunnelClientIO(SERVER_IP, 53, CLIENT_IFACE)
# Handshake stage
handshake_id = random.randint(0x0000, 0xffff)
io.proto_send_next(struct.pack('3sI', 'SYN'.encode(), handshake_id))
print(f'handshake: sent SYN with id={handshake_id}, waiting for ACK response')
handshake_ack, handshake_ack_id = struct.unpack('3sI', io.proto_recv_next_checked(256)[0])
if handshake_ack.decode() != 'ACK':
print('handshake: invalid response')
return
print(f'handshake: got reply ACK with id={handshake_ack_id}')
if handshake_ack_id != handshake_id + 1:
print('handshake: invalid id in response')
return
print('handshake: done')
loop.run_until_complete(handle_peer_io(io, '10.1.1.2'))
if __name__ == '__main__':
mode = sys.argv[1]
if mode == 'server':
server_main()
elif mode == 'client':
client_main()
else:
print('invalid mode')
该实现利用 TUN 设备来接入内核 IP 协议栈,然后把数据包装为 UDP 包,发送到 53 端口.
用法: 在服务端,确保 53 端口没有被占用(它极有可能被 systemd-resolved 占用,如果是这样的话,暂时停止这个服务即可),然后运行脚本:
$ sudo ./dnstunnel.py server
在客户端,运行:
$ sudo ./dnstunnel.py client
此时,服务端和客户端都会被创建一个名为 kztun0 的网卡设备,服务端被设置 IP 10.1.1.1,客户端被设置 IP 10.1.1.2,然后二者就可以使用这个网卡进行基于 IPv4 协议的通讯了,可以在客户端使用 10.1.1.1 访问服务端,反之亦然,而无论客户端是否有校园网认证.
例如:可以跑 ssh 开一个 socks5 代理来上网
$ ssh -ND 1080 your_user_name@10.1.1.1
HTTP 速度测试(服务器端是 100Mpbs 带宽):
这也太烂了

