#!/usr/bin/env python3
import unittest
-from random import shuffle, choice, randrange
+from random import shuffle, randrange
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase
+from asfframework import VppTestRunner
-import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, GRE
from scapy.layers.inet import IP, UDP, ICMP, icmptypes
ICMPv6EchoRequest,
ICMPv6EchoReply,
)
-from framework import VppTestCase, VppTestRunner
-from util import ppp, ppc, fragment_rfc791, fragment_rfc8200
+from util import ppp, fragment_rfc791, fragment_rfc8200
from vpp_gre_interface import VppGreInterface
-from vpp_ip import DpoProto
-from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto
+from vpp_ip_route import VppIpRoute, VppRoutePath
from vpp_papi import VppEnum
# 35 is enough to have >257 400-byte fragments
first_packets = [[] for n in range(self.vpp_worker_count)]
second_packets = [[] for n in range(self.vpp_worker_count)]
rest_of_packets = [[] for n in range(self.vpp_worker_count)]
- for (_, p) in self.pkt_infos:
+ for _, p in self.pkt_infos:
wi = randrange(self.vpp_worker_count)
second_packets[wi].append(p[0])
if len(p) <= 1:
self.assertIn(ICMPv6ParamProblem, icmp)
self.assert_equal(icmp[ICMPv6ParamProblem].code, 3, "ICMP code")
- def test_truncated_fragment(self):
- """truncated fragment"""
- pkt = (
- Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
- / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44, plen=2)
- / IPv6ExtHdrFragment(nh=6)
- )
-
- self.send_and_assert_no_replies(self.pg0, [pkt], self.pg0)
-
def test_invalid_frag_size(self):
"""fragment size not a multiple of 8"""
p = (
def test_atomic_fragment(self):
"""IPv6 atomic fragment"""
pkt = (
- Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
/ IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44, plen=65535)
/ IPv6ExtHdrFragment(
offset=8191, m=1, res1=0xFF, res2=0xFF, nh=255, id=0xFFFF
self.send_and_assert_no_replies(self.pg0, [pkt])
pkt = (
- Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
/ IPv6(src=self.pg0.remote_ip6, dst=self.pg0.remote_ip6)
/ ICMPv6EchoRequest()
)
def test_one_fragment(self):
"""whole packet in one fragment processed independently"""
pkt = (
- Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
/ IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
/ ICMPv6EchoRequest()
/ Raw("X" * 1600)
# send an atomic fragment with same id - should be reassembled
pkt = (
- Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
/ IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
/ IPv6ExtHdrFragment(id=1)
/ ICMPv6EchoRequest()
def test_bunch_of_fragments(self):
"""valid fragments followed by rogue fragments and atomic fragment"""
pkt = (
- Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
/ IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
/ ICMPv6EchoRequest()
/ Raw("X" * 1600)
self.send_and_assert_no_replies(self.pg0, inc_frag * 604)
pkt = (
- Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
/ IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
/ IPv6ExtHdrFragment(id=1)
/ ICMPv6EchoRequest()
)
self.vapi.ip_local_reass_enable_disable(enable_ip6=True)
pkt = (
- Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac)
+ Ether(src=self.src_if.remote_mac, dst=self.src_if.local_mac)
/ IPv6(src=self.src_if.remote_ip6, dst=self.src_if.local_ip6)
/ ICMPv6EchoRequest(id=1234)
/ Raw("X" * 1600)
first_packets = [[] for n in range(self.vpp_worker_count)]
second_packets = [[] for n in range(self.vpp_worker_count)]
rest_of_packets = [[] for n in range(self.vpp_worker_count)]
- for (_, p) in self.pkt_infos:
+ for _, p in self.pkt_infos:
wi = randrange(self.vpp_worker_count)
second_packets[wi].append(p[0])
if len(p) <= 1:
def test_one_fragment(self):
"""whole packet in one fragment processed independently"""
pkt = (
- Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac)
+ Ether(src=self.src_if.remote_mac, dst=self.src_if.local_mac)
/ IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6)
/ ICMPv6EchoRequest()
/ Raw("X" * 1600)
# send an atomic fragment with same id - should be reassembled
pkt = (
- Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac)
+ Ether(src=self.src_if.remote_mac, dst=self.src_if.local_mac)
/ IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6)
/ IPv6ExtHdrFragment(id=1)
/ ICMPv6EchoRequest()
def test_bunch_of_fragments(self):
"""valid fragments followed by rogue fragments and atomic fragment"""
pkt = (
- Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac)
+ Ether(src=self.src_if.remote_mac, dst=self.src_if.local_mac)
/ IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6)
/ ICMPv6EchoRequest()
/ Raw("X" * 1600)
rx = self.send_and_expect(self.src_if, frags, self.dst_if)
rogue = (
- Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac)
+ Ether(src=self.src_if.remote_mac, dst=self.src_if.local_mac)
/ IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6)
/ IPv6ExtHdrFragment(id=1, nh=58, offset=608)
/ Raw("X" * 308)
self.send_and_expect(self.src_if, rogue * 604, self.dst_if)
pkt = (
- Ether(src=self.src_if.local_mac, dst=self.src_if.remote_mac)
+ Ether(src=self.src_if.remote_mac, dst=self.src_if.local_mac)
/ IPv6(src=self.src_if.remote_ip6, dst=self.dst_if.remote_ip6)
/ IPv6ExtHdrFragment(id=1)
/ ICMPv6EchoRequest()