from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable
from socket import AF_INET, AF_INET6, inet_pton
-import StringIO
+from util import reassemble4
""" Testipip is a subclass of VPPTestCase classes.
"""
-# Replace by deframent from scapy.
-def reassemble(listoffragments):
- buffer = StringIO.StringIO()
- first = listoffragments[0]
- buffer.seek(20)
- for pkt in listoffragments:
- buffer.seek(pkt[IP].frag*8)
- buffer.write(pkt[IP].payload)
- first.len = len(buffer.getvalue()) + 20
- first.flags = 0
- del(first.chksum)
- header = str(first[IP])[:20]
- return first[IP].__class__(header + buffer.getvalue())
-
-
class TestIPIP(VppTestCase):
""" IPIP Test Case """
cls.create_pg_interfaces(range(2))
cls.interfaces = list(cls.pg_interfaces)
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPIP, cls).tearDownClass()
+
def setUp(self):
super(TestIPIP, self).setUp()
for i in self.interfaces:
i.admin_down()
def validate(self, rx, expected):
- self.assertEqual(rx, expected.__class__(str(expected)))
+ self.assertEqual(rx, expected.__class__(expected))
def generate_ip4_frags(self, payload_length, fragment_size):
p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
# Set interface up and enable IP on it
self.vapi.sw_interface_set_flags(sw_if_index, 1)
self.vapi.sw_interface_set_unnumbered(
- ip_sw_if_index=self.pg0.sw_if_index,
- sw_if_index=sw_if_index)
+ sw_if_index=self.pg0.sw_if_index,
+ unnumbered_sw_if_index=sw_if_index)
# Add IPv4 and IPv6 routes via tunnel interface
ip4_via_tunnel = VppIpRoute(
p6_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4,
proto='ipv6', id=0, tos=42) / p_inner_ip6 / p_payload)
p6_reply.ttl -= 1
- rx = self.send_and_expect(self.pg0, p6*10, self.pg1)
+ rx = self.send_and_expect(self.pg0, p6 * 10, self.pg1)
for p in rx:
self.validate(p[1], p6_reply)
p_ip4_inner / p_payload)
p4_reply.ttl -= 1
p4_reply.id = 0
- rx = self.send_and_expect(self.pg0, p4*10, self.pg1)
+ rx = self.send_and_expect(self.pg0, p4 * 10, self.pg1)
for p in rx:
self.validate(p[1], p4_reply)
dst=self.pg0.local_ip4) / p_ip4 / p_payload)
p4_reply = (p_ip4 / p_payload)
p4_reply.ttl -= 1
- rx = self.send_and_expect(self.pg1, p4*10, self.pg0)
+ rx = self.send_and_expect(self.pg1, p4 * 10, self.pg0)
for p in rx:
self.validate(p[1], p4_reply)
dst=self.pg0.local_ip4) / p_ip6 / p_payload)
p6_reply = (p_ip6 / p_payload)
p6_reply.hlim = 63
- rx = self.send_and_expect(self.pg1, p6*10, self.pg0)
+ rx = self.send_and_expect(self.pg1, p6 * 10, self.pg0)
for p in rx:
self.validate(p[1], p6_reply)
self.pg1.add_stream(frags)
self.pg_start()
rx = self.pg0.get_capture(6)
- reass_pkt = reassemble(rx)
+ reass_pkt = reassemble4(rx)
p4_reply.ttl -= 1
p4_reply.id = 256
self.validate(reass_pkt, p4_reply)
self.pg1.add_stream(frags)
self.pg_start()
rx = self.pg0.get_capture(2)
- reass_pkt = reassemble(rx)
+ reass_pkt = reassemble4(rx)
p4_reply.ttl -= 1
p4_reply.id = 512
self.validate(reass_pkt, p4_reply)
cls.create_pg_interfaces(range(2))
cls.interfaces = list(cls.pg_interfaces)
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPIP6, cls).tearDownClass()
+
def setUp(self):
+ super(TestIPIP6, self).setUp()
for i in self.interfaces:
i.admin_up()
i.config_ip4()
self.tunnel_if_index = sw_if_index
self.vapi.sw_interface_set_flags(sw_if_index, 1)
self.vapi.sw_interface_set_unnumbered(
- ip_sw_if_index=self.pg0.sw_if_index, sw_if_index=sw_if_index)
+ sw_if_index=self.pg0.sw_if_index,
+ unnumbered_sw_if_index=sw_if_index)
# Add IPv4 and IPv6 routes via tunnel interface
ip4_via_tunnel = VppIpRoute(
rv = self.vapi.ipip_del_tunnel(sw_if_index=self.tunnel_if_index)
def validate(self, rx, expected):
- self.assertEqual(rx, expected.__class__(str(expected)))
+ self.assertEqual(rx, expected.__class__(expected))
def generate_ip6_frags(self, payload_length, fragment_size):
p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
hlim=64, tc=42) /
p_ip6 / p_payload)
p6_reply[1].hlim -= 1
- rx = self.send_and_expect(self.pg0, p6*11, self.pg1)
+ rx = self.send_and_expect(self.pg0, p6 * 11, self.pg1)
for p in rx:
self.validate(p[1], p6_reply)
dst=self.pg1.remote_ip6, hlim=64, tc=42) /
p_ip4 / p_payload)
p4_reply[1].ttl -= 1
- rx = self.send_and_expect(self.pg0, p4*11, self.pg1)
+ rx = self.send_and_expect(self.pg0, p4 * 11, self.pg1)
for p in rx:
self.validate(p[1], p4_reply)
dst=self.pg0.local_ip6) / p_ip4 / p_payload)
p4_reply = (p_ip4 / p_payload)
p4_reply.ttl -= 1
- rx = self.send_and_expect(self.pg1, p4*11, self.pg0)
+ rx = self.send_and_expect(self.pg1, p4 * 11, self.pg0)
for p in rx:
self.validate(p[1], p4_reply)
dst=self.pg0.local_ip6) / p_ip6 / p_payload)
p6_reply = (p_ip6 / p_payload)
p6_reply.hlim = 63
- rx = self.send_and_expect(self.pg1, p6*11, self.pg0)
+ rx = self.send_and_expect(self.pg1, p6 * 11, self.pg0)
for p in rx:
self.validate(p[1], p6_reply)
rx = self.pg1.get_capture(2)
# Scapy defragment doesn't deal well with multiple layers
- # of samy type / Ethernet header first
+ # of same type / Ethernet header first
f = [p[1] for p in rx]
reass_pkt = defragment6(f)
self.validate(reass_pkt, p6_reply)