关于校园网无认证下行为的探测,以及 DNS 隧道

宇宙安全声明:本文纯探讨技术,没有任何使用不法手段绕过校园网计费或监管的意图.

比较好奇校园网下的认证机制,空闲的时候做了一些探索,在此记录下来。本文会阐述校园网如何对非认证主机断网,以及如何利用校园网对 UDP 53 端口的豁免实现免认证上网.

对于后者,@IcyFeather 大佬已经提供了一个很完备的方案,而本文将尝试给出一个基于此的 VPN 代码实现。

本文所有内容亦保存于 GitHub:GitHub - wait1210day/scu-netkazari: Netkazari - Research on the campus network of Sichuan University

网络行为探查

测试环境:江安西园某舍,宿舍中配备了无线 AP 和有线网络

接入阶段: 新设备接入,首先发送 DHCP Discover 包募集网络中的 DHCP 服务器,此时会收到来自 10.134.135.254 [58:69:6c:4c:47:53] 的回复. DHCP 向设备分发了 IP,子网掩码 255.255.248.0,以及默认网关 10.134.135.254. 据此,可以得知该子网 IP 地址范围为 10.134.128.010.134.135.254.
如下是抓包得到的 DHCP ACK 回复:

此时设备将获得 IP 地址,且网关也已经由 DHCP 请求知晓有新设备接入,并拿到了新设备的 MAC.

L2/L3 交换隔离:

Rule 1. 交换侧会丢弃 任何目的 MAC 不是网关的 Ethernet 帧,纵使目的 MAC 地址指向了网络内一个有效的设备. 特别地,对于广播 MAC 地址,除网关外的所有设备都不会收到广播. 也就是说:

        from MAC1 to MAC0
Source ---------------------> Gateway
MAC1           Link OK        MAC0


        from MAC1 to MAC2
Source ----------X----------> Destination
MAC1          Dropped         MAC2

Rule 2. 如果一个 IP 包的目的 MAC 是网关,网关将接受该包,并根据该包的目的 IP 地址将其转发到网络内对应的设备上(如果目的 IP 指向网络内的设备的话). 该过程数据包的转发路径:

        from IP1 [MAC1] to IP2 [MAC0]
Source ---------------------------------> Gateway
IP1 [MAC1]                                IP0 [MAC0]
                                            |
Destination <-------------------------------'
IP2 [MAC2]    from IP1 [MAC0] to IP2 [MAC2]

可以看到网关将修改 Ethernet 帧的 MAC 地址,而不改变 IP 帧的地址.

Rule 3. 根据 Rule 1,ARP 广播查询请求不会被局域网中的任何设备受理,而网关这时会给出回复. 无论发送对何种 IP 地址的查询,只要 IP 地址在某个特定范围内(目前观察到的范围是对当前子网下的所有 IP 都成立),无论这个 IP 是否被分配,网关都会给出回应. 且回应全部指向自己的 MAC 地址.

Rule 4. 根据 Rule 2,对于没有认证的主机,网关将拒绝转发来自它的流量(少数 UDP 端口得到了豁免,见下文),但会 继续转发流向它的流量.

针对以上四条规则,可总结出如下流控特征:

  • 交换侧通过丢弃任何目标 MAC 地址不是网关的帧来阻止内网设备间(不经过网关)直接通信;
  • 网关通过拦截 ARP 广播,并制造假的回复,引导所有主机将内网流量发送给自己(外网流量会由每台主机的默认路由规则自然地发向网关);
  • 网关通过拒绝转发来自未认证主机的流量来阻止未认证主机联网.

DNS Tunnel

网关在进行转发时,对 UDP 53 端口有豁免机制,这是用于 DNS 查询的端口. 本人对 UDP 50-100 内的所有端口都进行了测试,发现有且仅有 53 端口可以得到豁免.

具体的豁免机制为:无论主机是否认证,网关默认放行所有目标端口为 53 的 UDP 流量,直达公网,而不检测其源端口,也不检测 UDP 的载荷是否为有效的 DNS 数据报.

经过实验探查,防火墙对 UDP 连接的保活时间为 5s:自最后一个出站/入站包开始计时,若在5s 内没有匹配端口的包入站/出站,UDP 连接会被防火墙关闭,表现为禁止新的包在该端口上继续入站.

基于此,可以用 UDP 53 端口承载流量,然后发送到存在于公网的代理服务器,由代理服务器解析并代理流量. 虽然 UDP 是不可靠的传输,但是其中包含的协议(如 TCP)会保证传输是可靠的(如果协议需要的话).

下面的代码可以对 UDP 53 的往返连通性进行简单测试:

## Client:
import socket

SERVER_IP = 'xxx.xxx.xxx.xxx'

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.sendto('hello'.encode(), (SERVER_IP, 53))

packet, addr = sock.recvfrom(512)
print(f'recv: [{packet.decode()}] from {addr}')

## Server (requires root privilege):
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.bind(('0.0.0.0', 53))

while True:
    packet, addr = sock.recvfrom(512)
    # Sometimes we receive packets from 127.0.0.1, just ignore them
    if addr[0] == '127.0.0.1':
        continue
    print(f'recv: [{packet.decode()}] from {addr}')
    sock.sendto('response'.encode(), addr)

包冗余现象:观察到,从主机发出一个包,而对方能收到两个完全一致的包,原因暂且不明. 于是在我们的协议中加入了 sequence number 机制可以避免收到重复包.

一个 DNS tunnel 的参考实现可见:
(该实现是本人一天时间手搓出来的原型,没有任何复杂的传输控制机制,也没有任何加密,请勿用于实际使用中)

#!/usr/bin/env python3

import socket
import sys
import struct
import random
import asyncio
import errno
from pytun import TunTapDevice


SERVER_IP = 'xxx.xxx.xxx.xxx'       # For server, IP
CLIENT_IFACE = 'wlan0'              # For client, sending package from which iface, use `None` for default

PROTO_HDR = b'\x66\xcc\xff\xff'
PROTO_HDR_LENGTH = len(PROTO_HDR)

TUN_LINK_MTU = 900
TUN_TRANSPORT_SIZE = TUN_LINK_MTU + 16

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

class KazariIO:
    sock: socket.socket

    def proto_recv_next(self, bufsize: int) -> tuple[bytes, tuple[str, int]]:
        pass

    def proto_send_next(self, payload: bytes) -> None:
        pass

    def proto_recv_next_checked(self, bufsize: int) -> tuple[bytes, tuple[str, int]]:
        while True:
            packet = self.proto_recv_next(bufsize)
            if packet is not None:
                return packet


class KazariTunnelClientIO(KazariIO):
    def __init__(self, ip: str, udp_port: int, iface: str = None):
        self.server_ip = ip
        self.server_port = udp_port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.proto_tx_seq = 0
        self.proto_rx_seq = 0
        if iface is not None:
            self.sock.setsockopt(socket.SOL_SOCKET, 25, str(iface + '\0').encode())

    def udp_recv(self, bufsize: int) -> bytes:
        packet, addr = self.sock.recvfrom(bufsize)
        if addr[0] == self.server_ip and addr[1] == self.server_port:
            return packet
        return None
            
    def proto_recv_next(self, bufsize: int) -> tuple[bytes, tuple[str, int]]:
        payload = self.udp_recv(bufsize)
        if payload is None:
            return None
        # 8 bytes == 4 bytes of header + 4 bytes of seq number
        if len(payload) <= 8:
            return None
        if payload[:PROTO_HDR_LENGTH] != PROTO_HDR:
            return None

        seq = int.from_bytes(payload[PROTO_HDR_LENGTH:PROTO_HDR_LENGTH + 4], byteorder='big')
        if seq <= self.proto_rx_seq:
            return None

        self.proto_rx_seq = seq
        return (payload[PROTO_HDR_LENGTH + 4:], (self.server_ip, self.server_port))
        
    def proto_send_next(self, payload: bytes) -> None:
        self.proto_tx_seq += 1
        udp_payload = PROTO_HDR + self.proto_tx_seq.to_bytes(4, byteorder='big') + payload
        self.sock.sendto(udp_payload, (self.server_ip, self.server_port))


class KazariTunnelServerIO(KazariIO):
    def __init__(self, listen_addr: str, udp_port: int):
        self.listen_addr = listen_addr
        self.port = udp_port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind((listen_addr, udp_port))
        self.proto_rx_seq = 0
        self.proto_tx_seq = 0
        self.client_ip: None | str = None
        self.client_port: None | int = None

    def set_client_address(self, ip: str, port: int) -> None:
        self.client_ip = ip
        self.client_port = port
    
    def proto_recv_next(self, bufsize: int) -> tuple[bytes, tuple[str, int]]:
        payload, addr = self.sock.recvfrom(bufsize)
        if len(payload) <= 8:
            return None
        if payload[:PROTO_HDR_LENGTH] != PROTO_HDR:
            return None

        seq = int.from_bytes(payload[PROTO_HDR_LENGTH:PROTO_HDR_LENGTH + 4], byteorder='big')
        if seq <= self.proto_rx_seq:
            return None

        self.proto_rx_seq = seq
        return (payload[PROTO_HDR_LENGTH + 4:], addr)

    def proto_send_next(self, payload: bytes) -> None:
        self.proto_tx_seq += 1
        udp_payload = PROTO_HDR + self.proto_tx_seq.to_bytes(4, byteorder='big') + payload
        self.sock.sendto(udp_payload, (self.client_ip, self.client_port))


async def handle_peer_io(io: KazariIO, tun_ip: str) -> None:
    # Create TUN virtual device
    tun = TunTapDevice(name='kztun0')
    tun.addr = tun_ip
    tun.netmask = '255.255.255.0'
    tun.mtu = TUN_LINK_MTU
    tun.up()

    def on_recv():
        packet = None
        try:
            result = io.proto_recv_next(1024)
            if result is None:
                return
            packet = result[0]
        except socket.error as e:
            err = e.args[0]
            if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
                return
            else:
                print(f'IO error: {err}')

        if packet == b'HEARTBEAT':
            print('recv: peer heartbeat')
        elif packet[:7] == b'PAYLOAD':
            payload = packet[7:]
            print(f'got payload {len(payload)} bytes')
            tun.write(payload)
            #print(payload)
        else:
            print(f'unrecognized packet: {packet}')
        # handle_packet_arrival(packet, tun)

    def on_tun_recv():
        packet = tun.read(TUN_TRANSPORT_SIZE)
        io.proto_send_next(b'PAYLOAD' + packet)
        print(f'sent payload {len(packet)} bytes')

    async def on_send_heartbeat():
        while True:
            io.proto_send_next(b'HEARTBEAT')
            await asyncio.sleep(1)

    io.sock.setblocking(False)
    loop.add_reader(io.sock.fileno(), on_recv)
    loop.add_reader(tun.fileno(), on_tun_recv)

    await on_send_heartbeat()
    

def server_main():
    io = KazariTunnelServerIO('0.0.0.0', 53)
    
    # Waiting for handshake
    print('handshake: waiting for handshake SYN')
    handshake_payload, client_addr = io.proto_recv_next_checked(256)
    print(f'handshake: request from {client_addr[0]}:{client_addr[1]}')
    handshake_syn, handshake_id = struct.unpack('3sI', handshake_payload)
    if handshake_syn.decode() != 'SYN':
        print('handshake: invalid request')
    print(f'handshake: got SYN with id={handshake_id}, sending reply ACK with id={handshake_id + 1}')
    io.set_client_address(client_addr[0], client_addr[1])
    io.proto_send_next(struct.pack('3sI', 'ACK'.encode(), handshake_id + 1))
    print(f'handshake: done')

    loop.run_until_complete(handle_peer_io(io, '10.1.1.1'))


def client_main():
    io = KazariTunnelClientIO(SERVER_IP, 53, CLIENT_IFACE)
    
    # Handshake stage
    handshake_id = random.randint(0x0000, 0xffff)
    io.proto_send_next(struct.pack('3sI', 'SYN'.encode(), handshake_id))
    print(f'handshake: sent SYN with id={handshake_id}, waiting for ACK response')
    handshake_ack, handshake_ack_id = struct.unpack('3sI', io.proto_recv_next_checked(256)[0])
    if handshake_ack.decode() != 'ACK':
        print('handshake: invalid response')
        return
    print(f'handshake: got reply ACK with id={handshake_ack_id}')
    if handshake_ack_id != handshake_id + 1:
        print('handshake: invalid id in response')
        return
    print('handshake: done')

    loop.run_until_complete(handle_peer_io(io, '10.1.1.2'))


if __name__ == '__main__':
    mode = sys.argv[1]
    if mode == 'server':
        server_main()
    elif mode == 'client':
        client_main()
    else:
        print('invalid mode')

该实现利用 TUN 设备来接入内核 IP 协议栈,然后把数据包装为 UDP 包,发送到 53 端口.

用法: 在服务端,确保 53 端口没有被占用(它极有可能被 systemd-resolved 占用,如果是这样的话,暂时停止这个服务即可),然后运行脚本:

$ sudo ./dnstunnel.py server

在客户端,运行:

$ sudo ./dnstunnel.py client

此时,服务端和客户端都会被创建一个名为 kztun0 的网卡设备,服务端被设置 IP 10.1.1.1,客户端被设置 IP 10.1.1.2,然后二者就可以使用这个网卡进行基于 IPv4 协议的通讯了,可以在客户端使用 10.1.1.1 访问服务端,反之亦然,而无论客户端是否有校园网认证.

例如:可以跑 ssh 开一个 socks5 代理来上网

$ ssh -ND 1080 your_user_name@10.1.1.1

HTTP 速度测试(服务器端是 100Mpbs 带宽):

这也太烂了

1 个赞

可以了解下MOHOO项目谢谢喵

这是什么,搜了下没搜到

应该可以在协会gitlab上找到,如果没找到就抓yxh或者谁问问

This one? TexasOct/mohoo-client: A backend for mohoo service

@Texas_Oct cc

等208恢复供电吧,在208的gitlab上