import unittest
from parameterized import parameterized
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, GRE
from scapy.layers.inet import IP, UDP, ICMP
from framework import VppTestCase, VppTestRunner
from util import ppp, fragment_rfc791, fragment_rfc8200
-from vpp_gre_interface import VppGreInterface, VppGre6Interface
+from vpp_gre_interface import VppGreInterface
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath
self.logger.debug(ppp("Got packet:", packet))
ip = packet[scapy_ip_family]
udp = packet[UDP]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertTrue(
packet_index not in dropped_packet_indexes,
cls.create_stream(cls.packet_sizes)
cls.create_fragments()
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv4Reassembly, cls).tearDownClass()
+
def setUp(self):
""" Test setup - force timeout on existing reassemblies """
super(TestIPv4Reassembly, self).setUp()
def tearDown(self):
super(TestIPv4Reassembly, self).tearDown()
+
+ def show_commands_at_teardown(self):
self.logger.debug(self.vapi.ppcli("show ip4-reassembly details"))
self.logger.debug(self.vapi.ppcli("show buffers"))
cls.pkt_infos = []
for index, info in six.iteritems(infos):
p = info.data
- # cls.logger.debug(ppp("Packet:", p.__class__(str(p))))
+ # cls.logger.debug(ppp("Packet:",
+ # p.__class__(scapy.compat.raw(p))))
fragments_400 = fragment_rfc791(p, 400)
fragments_300 = fragment_rfc791(p, 300)
fragments_200 = [
# new reassemblies will be started and packet generator will
# freak out when it detects unfreed buffers
zipped = zip(frags_300, frags_200)
- for i, j in zipped[:-1]:
+ for i, j in zipped:
fragments.extend(i)
fragments.extend(j)
- fragments.append(zipped[-1][0])
+ fragments.pop()
self.pg_enable_capture()
self.src_if.add_stream(fragments)
cls.create_stream(cls.packet_sizes)
cls.create_fragments()
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv6Reassembly, cls).tearDownClass()
+
def setUp(self):
""" Test setup - force timeout on existing reassemblies """
super(TestIPv6Reassembly, self).setUp()
def tearDown(self):
super(TestIPv6Reassembly, self).tearDown()
+
+ def show_commands_at_teardown(self):
self.logger.debug(self.vapi.ppcli("show ip6-reassembly details"))
self.logger.debug(self.vapi.ppcli("show buffers"))
cls.pkt_infos = []
for index, info in six.iteritems(infos):
p = info.data
- # cls.logger.debug(ppp("Packet:", p.__class__(str(p))))
+ # cls.logger.debug(ppp("Packet:",
+ # p.__class__(scapy.compat.raw(p))))
fragments_400 = fragment_rfc8200(p, info.index, 400)
fragments_300 = fragment_rfc8200(p, info.index, 300)
cls.pkt_infos.append((index, fragments_400, fragments_300))
# new reassemblies will be started and packet generator will
# freak out when it detects unfreed buffers
zipped = zip(frags_400, frags_300)
- for i, j in zipped[:-1]:
+ for i, j in zipped:
fragments.extend(i)
fragments.extend(j)
- fragments.append(zipped[-1][0])
+ fragments.pop()
dropped_packet_indexes = set(
index for (index, _, frags) in self.pkt_infos if len(frags) > 1
Raw())
self.extend_packet(p, 1000, self.padding)
fragments = fragment_rfc8200(p, 1, 500)
- bad_fragment = p.__class__(str(fragments[1]))
+ bad_fragment = p.__class__(scapy.compat.raw(fragments[1]))
bad_fragment[IPv6ExtHdrFragment].nh = 59
bad_fragment[IPv6ExtHdrFragment].offset = 0
self.pg_enable_capture()
cls.create_stream()
cls.create_fragments()
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv4ReassemblyLocalNode, cls).tearDownClass()
+
def setUp(self):
""" Test setup - force timeout on existing reassemblies """
super(TestIPv4ReassemblyLocalNode, self).setUp()
def tearDown(self):
super(TestIPv4ReassemblyLocalNode, self).tearDown()
+
+ def show_commands_at_teardown(self):
self.logger.debug(self.vapi.ppcli("show ip4-reassembly details"))
self.logger.debug(self.vapi.ppcli("show buffers"))
cls.pkt_infos = []
for index, info in six.iteritems(infos):
p = info.data
- # cls.logger.debug(ppp("Packet:", p.__class__(str(p))))
+ # cls.logger.debug(ppp("Packet:",
+ # p.__class__(scapy.compat.raw(p))))
fragments_300 = fragment_rfc791(p, 300)
cls.pkt_infos.append((index, fragments_300))
cls.fragments_300 = [x for (_, frags) in cls.pkt_infos for x in frags]
self.logger.debug(ppp("Got packet:", packet))
ip = packet[IP]
icmp = packet[ICMP]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
if packet_index in seen:
raise Exception(ppp("Duplicate packet received", packet))
cls.packet_sizes = [64, 512, 1518, 9018]
cls.padding = " abcdefghijklmn"
+ @classmethod
+ def tearDownClass(cls):
+ super(TestFIFReassembly, cls).tearDownClass()
+
def setUp(self):
""" Test setup - force timeout on existing reassemblies """
super(TestFIFReassembly, self).setUp()
expire_walk_interval_ms=10000, is_ip6=1)
def tearDown(self):
+ super(TestFIFReassembly, self).tearDown()
+
+ def show_commands_at_teardown(self):
self.logger.debug(self.vapi.ppcli("show ip4-reassembly details"))
self.logger.debug(self.vapi.ppcli("show ip6-reassembly details"))
self.logger.debug(self.vapi.ppcli("show buffers"))
- super(TestFIFReassembly, self).tearDown()
def verify_capture(self, capture, ip_class, dropped_packet_indexes=[]):
"""Verify captured packet stream.
self.logger.debug(ppp("Got packet:", packet))
ip = packet[ip_class]
udp = packet[UDP]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertTrue(
packet_index not in dropped_packet_indexes,
# it shared for multiple test cases
self.tun_ip6 = "1002::1"
- self.gre6 = VppGre6Interface(self, self.src_if.local_ip6, self.tun_ip6)
+ self.gre6 = VppGreInterface(self, self.src_if.local_ip6, self.tun_ip6)
self.gre6.add_vpp_config()
self.gre6.admin_up()
self.gre6.config_ip6()