X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=test%2Ftest_ipip.py;h=c01972a24138abb63915a614da312f7dab0f9b79;hb=8feeaff56;hp=b7a0179957b058a576a6de8d0f6f4edd4967ce49;hpb=282093f1fe783b5d36a014d4495995cd64e2e3fb;p=vpp.git diff --git a/test/test_ipip.py b/test/test_ipip.py index b7a0179957b..c01972a2413 100644 --- a/test/test_ipip.py +++ b/test/test_ipip.py @@ -8,7 +8,7 @@ from framework import VppTestCase, VppTestRunner from vpp_ip import DpoProto from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable from socket import AF_INET, AF_INET6, inet_pton -import StringIO +from util import reassemble4 """ Testipip is a subclass of VPPTestCase classes. @@ -17,21 +17,6 @@ IPIP tests. """ -# Replace by deframent from scapy. -def reassemble(listoffragments): - buffer = StringIO.StringIO() - first = listoffragments[0] - buffer.seek(20) - for pkt in listoffragments: - buffer.seek(pkt[IP].frag*8) - buffer.write(pkt[IP].payload) - first.len = len(buffer.getvalue()) + 20 - first.flags = 0 - del(first.chksum) - header = str(first[IP])[:20] - return first[IP].__class__(header + buffer.getvalue()) - - class TestIPIP(VppTestCase): """ IPIP Test Case """ @@ -41,9 +26,9 @@ class TestIPIP(VppTestCase): cls.create_pg_interfaces(range(2)) cls.interfaces = list(cls.pg_interfaces) - def setUp(cls): - super(TestIPIP, cls).setUp() - for i in cls.interfaces: + def setUp(self): + super(TestIPIP, self).setUp() + for i in self.interfaces: i.admin_up() i.config_ip4() i.config_ip6() @@ -60,7 +45,7 @@ class TestIPIP(VppTestCase): i.admin_down() def validate(self, rx, expected): - self.assertEqual(rx, expected.__class__(str(expected))) + self.assertEqual(rx, expected.__class__(expected)) def generate_ip4_frags(self, payload_length, fragment_size): p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) @@ -91,8 +76,8 @@ class TestIPIP(VppTestCase): # Set interface up and enable IP on it self.vapi.sw_interface_set_flags(sw_if_index, 1) self.vapi.sw_interface_set_unnumbered( - ip_sw_if_index=self.pg0.sw_if_index, - sw_if_index=sw_if_index) + sw_if_index=self.pg0.sw_if_index, + unnumbered_sw_if_index=sw_if_index) # Add IPv4 and IPv6 routes via tunnel interface ip4_via_tunnel = VppIpRoute( @@ -116,7 +101,7 @@ class TestIPIP(VppTestCase): p6_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4, proto='ipv6', id=0, tos=42) / p_inner_ip6 / p_payload) p6_reply.ttl -= 1 - rx = self.send_and_expect(self.pg0, p6*10, self.pg1) + rx = self.send_and_expect(self.pg0, p6 * 10, self.pg1) for p in rx: self.validate(p[1], p6_reply) @@ -129,7 +114,7 @@ class TestIPIP(VppTestCase): p_ip4_inner / p_payload) p4_reply.ttl -= 1 p4_reply.id = 0 - rx = self.send_and_expect(self.pg0, p4*10, self.pg1) + rx = self.send_and_expect(self.pg0, p4 * 10, self.pg1) for p in rx: self.validate(p[1], p4_reply) @@ -142,7 +127,7 @@ class TestIPIP(VppTestCase): dst=self.pg0.local_ip4) / p_ip4 / p_payload) p4_reply = (p_ip4 / p_payload) p4_reply.ttl -= 1 - rx = self.send_and_expect(self.pg1, p4*10, self.pg0) + rx = self.send_and_expect(self.pg1, p4 * 10, self.pg0) for p in rx: self.validate(p[1], p4_reply) @@ -156,7 +141,7 @@ class TestIPIP(VppTestCase): dst=self.pg0.local_ip4) / p_ip6 / p_payload) p6_reply = (p_ip6 / p_payload) p6_reply.hlim = 63 - rx = self.send_and_expect(self.pg1, p6*10, self.pg0) + rx = self.send_and_expect(self.pg1, p6 * 10, self.pg0) for p in rx: self.validate(p[1], p6_reply) @@ -214,7 +199,7 @@ class TestIPIP(VppTestCase): self.pg1.add_stream(frags) self.pg_start() rx = self.pg0.get_capture(6) - reass_pkt = reassemble(rx) + reass_pkt = reassemble4(rx) p4_reply.ttl -= 1 p4_reply.id = 256 self.validate(reass_pkt, p4_reply) @@ -225,7 +210,7 @@ class TestIPIP(VppTestCase): self.pg1.add_stream(frags) self.pg_start() rx = self.pg0.get_capture(2) - reass_pkt = reassemble(rx) + reass_pkt = reassemble4(rx) p4_reply.ttl -= 1 p4_reply.id = 512 self.validate(reass_pkt, p4_reply) @@ -264,6 +249,7 @@ class TestIPIP6(VppTestCase): cls.interfaces = list(cls.pg_interfaces) def setUp(self): + super(TestIPIP6, self).setUp() for i in self.interfaces: i.admin_up() i.config_ip4() @@ -292,7 +278,8 @@ class TestIPIP6(VppTestCase): self.tunnel_if_index = sw_if_index self.vapi.sw_interface_set_flags(sw_if_index, 1) self.vapi.sw_interface_set_unnumbered( - ip_sw_if_index=self.pg0.sw_if_index, sw_if_index=sw_if_index) + sw_if_index=self.pg0.sw_if_index, + unnumbered_sw_if_index=sw_if_index) # Add IPv4 and IPv6 routes via tunnel interface ip4_via_tunnel = VppIpRoute( @@ -320,7 +307,7 @@ class TestIPIP6(VppTestCase): rv = self.vapi.ipip_del_tunnel(sw_if_index=self.tunnel_if_index) def validate(self, rx, expected): - self.assertEqual(rx, expected.__class__(str(expected))) + self.assertEqual(rx, expected.__class__(expected)) def generate_ip6_frags(self, payload_length, fragment_size): p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) @@ -362,7 +349,7 @@ class TestIPIP6(VppTestCase): hlim=64, tc=42) / p_ip6 / p_payload) p6_reply[1].hlim -= 1 - rx = self.send_and_expect(self.pg0, p6*11, self.pg1) + rx = self.send_and_expect(self.pg0, p6 * 11, self.pg1) for p in rx: self.validate(p[1], p6_reply) @@ -372,7 +359,7 @@ class TestIPIP6(VppTestCase): dst=self.pg1.remote_ip6, hlim=64, tc=42) / p_ip4 / p_payload) p4_reply[1].ttl -= 1 - rx = self.send_and_expect(self.pg0, p4*11, self.pg1) + rx = self.send_and_expect(self.pg0, p4 * 11, self.pg1) for p in rx: self.validate(p[1], p4_reply) @@ -391,7 +378,7 @@ class TestIPIP6(VppTestCase): dst=self.pg0.local_ip6) / p_ip4 / p_payload) p4_reply = (p_ip4 / p_payload) p4_reply.ttl -= 1 - rx = self.send_and_expect(self.pg1, p4*11, self.pg0) + rx = self.send_and_expect(self.pg1, p4 * 11, self.pg0) for p in rx: self.validate(p[1], p4_reply) @@ -401,7 +388,7 @@ class TestIPIP6(VppTestCase): dst=self.pg0.local_ip6) / p_ip6 / p_payload) p6_reply = (p_ip6 / p_payload) p6_reply.hlim = 63 - rx = self.send_and_expect(self.pg1, p6*11, self.pg0) + rx = self.send_and_expect(self.pg1, p6 * 11, self.pg0) for p in rx: self.validate(p[1], p6_reply) @@ -473,7 +460,7 @@ class TestIPIP6(VppTestCase): rx = self.pg1.get_capture(2) # Scapy defragment doesn't deal well with multiple layers - # of samy type / Ethernet header first + # of same type / Ethernet header first f = [p[1] for p in rx] reass_pkt = defragment6(f) self.validate(reass_pkt, p6_reply)