diff options
Diffstat (limited to 'tools/testing/selftests/bpf/generate_udp_fragments.py')
| -rwxr-xr-x | tools/testing/selftests/bpf/generate_udp_fragments.py | 90 | 
1 files changed, 90 insertions, 0 deletions
diff --git a/tools/testing/selftests/bpf/generate_udp_fragments.py b/tools/testing/selftests/bpf/generate_udp_fragments.py new file mode 100755 index 000000000000..2b8a1187991c --- /dev/null +++ b/tools/testing/selftests/bpf/generate_udp_fragments.py @@ -0,0 +1,90 @@ +#!/bin/env python3 +# SPDX-License-Identifier: GPL-2.0 + +""" +This script helps generate fragmented UDP packets. + +While it is technically possible to dynamically generate +fragmented packets in C, it is much harder to read and write +said code. `scapy` is relatively industry standard and really +easy to read / write. + +So we choose to write this script that generates a valid C +header. Rerun script and commit generated file after any +modifications. +""" + +import argparse +import os + +from scapy.all import * + + +# These constants must stay in sync with `ip_check_defrag.c` +VETH1_ADDR = "172.16.1.200" +VETH0_ADDR6 = "fc00::100" +VETH1_ADDR6 = "fc00::200" +CLIENT_PORT = 48878 +SERVER_PORT = 48879 +MAGIC_MESSAGE = "THIS IS THE ORIGINAL MESSAGE, PLEASE REASSEMBLE ME" + + +def print_header(f): +    f.write("// SPDX-License-Identifier: GPL-2.0\n") +    f.write("/* DO NOT EDIT -- this file is generated */\n") +    f.write("\n") +    f.write("#ifndef _IP_CHECK_DEFRAG_FRAGS_H\n") +    f.write("#define _IP_CHECK_DEFRAG_FRAGS_H\n") +    f.write("\n") +    f.write("#include <stdint.h>\n") +    f.write("\n") + + +def print_frags(f, frags, v6): +    for idx, frag in enumerate(frags): +        # 10 bytes per line to keep width in check +        chunks = [frag[i : i + 10] for i in range(0, len(frag), 10)] +        chunks_fmted = [", ".join([str(hex(b)) for b in chunk]) for chunk in chunks] +        suffix = "6" if v6 else "" + +        f.write(f"static uint8_t frag{suffix}_{idx}[] = {{\n") +        for chunk in chunks_fmted: +            f.write(f"\t{chunk},\n") +        f.write(f"}};\n") + + +def print_trailer(f): +    f.write("\n") +    f.write("#endif /* _IP_CHECK_DEFRAG_FRAGS_H */\n") + + +def main(f): +    # srcip of 0 is filled in by IP_HDRINCL +    sip = "0.0.0.0" +    sip6 = VETH0_ADDR6 +    dip = VETH1_ADDR +    dip6 = VETH1_ADDR6 +    sport = CLIENT_PORT +    dport = SERVER_PORT +    payload = MAGIC_MESSAGE.encode() + +    # Disable UDPv4 checksums to keep code simpler +    pkt = IP(src=sip,dst=dip) / UDP(sport=sport,dport=dport,chksum=0) / Raw(load=payload) +    # UDPv6 requires a checksum +    # Also pin the ipv6 fragment header ID, otherwise it's a random value +    pkt6 = IPv6(src=sip6,dst=dip6) / IPv6ExtHdrFragment(id=0xBEEF) / UDP(sport=sport,dport=dport) / Raw(load=payload) + +    frags = [f.build() for f in pkt.fragment(24)] +    frags6 = [f.build() for f in fragment6(pkt6, 72)] + +    print_header(f) +    print_frags(f, frags, False) +    print_frags(f, frags6, True) +    print_trailer(f) + + +if __name__ == "__main__": +    dir = os.path.dirname(os.path.realpath(__file__)) +    header = f"{dir}/ip_check_defrag_frags.h" +    with open(header, "w") as f: +        main(f)  |