2 """IP{4,6} over IP{4,6} tunnel functional tests"""
5 from scapy.layers.inet6 import IPv6, Ether, IP, UDP, IPv6ExtHdrFragment
6 from scapy.all import fragment, fragment6, RandShort, defragment6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable
10 from socket import AF_INET, AF_INET6, inet_pton
13 """ Testipip is a subclass of VPPTestCase classes.
20 # Replace by defragment from scapy.
21 def reassemble(listoffragments):
22 buffer = StringIO.StringIO()
23 first = listoffragments[0]
25 for pkt in listoffragments:
26 buffer.seek(pkt[IP].frag*8)
27 buffer.write(pkt[IP].payload)
28 first.len = len(buffer.getvalue()) + 20
31 header = str(first[IP])[:20]
32 return first[IP].__class__(header + buffer.getvalue())
35 class TestIPIP(VppTestCase):
36 """ IPIP Test Case """
40 super(TestIPIP, cls).setUpClass()
41 cls.create_pg_interfaces(range(2))
42 cls.interfaces = list(cls.pg_interfaces)
45 super(TestIPIP, cls).setUp()
46 for i in cls.interfaces:
55 super(TestIPIP, self).tearDown()
57 for i in self.pg_interfaces:
def validate(self, rx, expected):
    """Assert that a received packet equals the expected packet.

    ``expected`` is serialized and parsed again through its own class
    before the comparison, so ``rx`` is compared against a copy rebuilt
    from the serialized form rather than against the original object.

    :param rx: packet taken from the capture.
    :param expected: packet the test expects to see.
    """
    # bytes() is the same builtin as str() on Python 2; on Python 3 it
    # calls __bytes__, whereas str() there returns text, not the
    # serialized packet.
    self.assertEqual(rx, expected.__class__(bytes(expected)))
65 def generate_ip4_frags(self, payload_length, fragment_size):
66 p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
67 p_payload = UDP(sport=1234, dport=1234) / self.payload(payload_length)
68 p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
69 outer_ip4 = (p_ether / IP(src=self.pg1.remote_ip4,
71 dst=self.pg0.local_ip4) / p_ip4 / p_payload)
72 frags = fragment(outer_ip4, fragment_size)
73 p4_reply = (p_ip4 / p_payload)
75 return frags, p4_reply
78 """ ip{v4,v6} over ip4 test """
79 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
80 p_ip6 = IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=42)
81 p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1", tos=42)
82 p_payload = UDP(sport=1234, dport=1234)
85 rv = self.vapi.ipip_add_tunnel(
86 src_address=self.pg0.local_ip4n,
87 dst_address=self.pg1.remote_ip4n,
88 is_ipv6=0, tc_tos=0xFF)
89 sw_if_index = rv.sw_if_index
91 # Set interface up and enable IP on it
92 self.vapi.sw_interface_set_flags(sw_if_index, 1)
93 self.vapi.sw_interface_set_unnumbered(
94 ip_sw_if_index=self.pg0.sw_if_index,
95 sw_if_index=sw_if_index)
97 # Add IPv4 and IPv6 routes via tunnel interface
98 ip4_via_tunnel = VppIpRoute(
99 self, "130.67.0.0", 16,
100 [VppRoutePath("0.0.0.0",
102 proto=DpoProto.DPO_PROTO_IP4)], is_ip6=0)
103 ip4_via_tunnel.add_vpp_config()
105 ip6_via_tunnel = VppIpRoute(
109 proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1)
110 ip6_via_tunnel.add_vpp_config()
112 # IPv6 in to IPv4 tunnel
113 p6 = (p_ether / p_ip6 / p_payload)
115 p_inner_ip6.hlim -= 1
116 p6_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4,
117 proto='ipv6', id=0, tos=42) / p_inner_ip6 / p_payload)
119 rx = self.send_and_expect(self.pg0, p6*10, self.pg1)
121 self.validate(p[1], p6_reply)
123 # IPv4 in to IPv4 tunnel
124 p4 = (p_ether / p_ip4 / p_payload)
127 p4_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4,
129 p_ip4_inner / p_payload)
132 rx = self.send_and_expect(self.pg0, p4*10, self.pg1)
134 self.validate(p[1], p4_reply)
137 p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
139 # IPv4 tunnel to IPv4
140 p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
141 p4 = (p_ether / IP(src=self.pg1.remote_ip4,
142 dst=self.pg0.local_ip4) / p_ip4 / p_payload)
143 p4_reply = (p_ip4 / p_payload)
145 rx = self.send_and_expect(self.pg1, p4*10, self.pg0)
147 self.validate(p[1], p4_reply)
149 err = self.statistics.get_counter(
150 '/err/ipip4-input/packets decapsulated')
151 self.assertEqual(err, 10)
153 # IPv4 tunnel to IPv6
154 p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
155 p6 = (p_ether / IP(src=self.pg1.remote_ip4,
156 dst=self.pg0.local_ip4) / p_ip6 / p_payload)
157 p6_reply = (p_ip6 / p_payload)
159 rx = self.send_and_expect(self.pg1, p6*10, self.pg0)
161 self.validate(p[1], p6_reply)
163 err = self.statistics.get_counter(
164 '/err/ipip4-input/packets decapsulated')
165 self.assertEqual(err, 20)
168 # Fragmentation / Reassembly and Re-fragmentation
170 rv = self.vapi.ip_reassembly_enable_disable(
171 sw_if_index=self.pg1.sw_if_index,
174 # Send lots of fragments, verify reassembled packet
175 frags, p4_reply = self.generate_ip4_frags(3131, 1400)
177 for i in range(0, 1000):
179 self.pg1.add_stream(f)
180 self.pg_enable_capture()
182 rx = self.pg0.get_capture(1000)
185 self.validate(p[1], p4_reply)
187 err = self.statistics.get_counter(
188 '/err/ipip4-input/packets decapsulated')
189 self.assertEqual(err, 1020)
193 for i in range(1, 90):
194 frags, p4_reply = self.generate_ip4_frags(i * 100, 1000)
197 self.pg_enable_capture()
198 self.pg1.add_stream(f)
200 rx = self.pg0.get_capture(89)
203 self.validate(p[1], r[i])
206 # Now try with re-fragmentation
208 # Send fragments to tunnel head-end, for the tunnel head end
209 # to reassemble and then refragment
211 self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [576, 0, 0, 0])
212 frags, p4_reply = self.generate_ip4_frags(3123, 1200)
213 self.pg_enable_capture()
214 self.pg1.add_stream(frags)
216 rx = self.pg0.get_capture(6)
217 reass_pkt = reassemble(rx)
220 self.validate(reass_pkt, p4_reply)
222 self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [1600, 0, 0, 0])
223 frags, p4_reply = self.generate_ip4_frags(3123, 1200)
224 self.pg_enable_capture()
225 self.pg1.add_stream(frags)
227 rx = self.pg0.get_capture(2)
228 reass_pkt = reassemble(rx)
231 self.validate(reass_pkt, p4_reply)
def test_ipip_create(self):
    """ ipip create / delete interface test """
    # The endpoints are packed IPv4 addresses; the tunnel carries no
    # traffic here, it is only created and then removed again.
    tunnel_src = inet_pton(AF_INET, '1.2.3.4')
    tunnel_dst = inet_pton(AF_INET, '2.3.4.5')
    reply = self.vapi.ipip_add_tunnel(src_address=tunnel_src,
                                      dst_address=tunnel_dst,
                                      is_ipv6=0)
    # Delete the interface index that the add call reported.
    self.vapi.ipip_del_tunnel(reply.sw_if_index)
241 def test_ipip_vrf_create(self):
242 """ ipip create / delete interface VRF test """
244 t = VppIpTable(self, 20)
246 rv = self.vapi.ipip_add_tunnel(
247 src_address=inet_pton(AF_INET, '1.2.3.4'),
248 dst_address=inet_pton(AF_INET, '2.3.4.5'), is_ipv6=0,
250 sw_if_index = rv.sw_if_index
251 self.vapi.ipip_del_tunnel(sw_if_index)
253 def payload(self, len):
257 class TestIPIP6(VppTestCase):
258 """ IPIP6 Test Case """
262 super(TestIPIP6, cls).setUpClass()
263 cls.create_pg_interfaces(range(2))
264 cls.interfaces = list(cls.pg_interfaces)
267 for i in self.interfaces:
277 if not self.vpp_dead:
278 self.destroy_tunnel()
279 for i in self.pg_interfaces:
283 super(TestIPIP6, self).tearDown()
285 def setup_tunnel(self):
287 rv = self.vapi.ipip_add_tunnel(
288 src_address=self.pg0.local_ip6n,
289 dst_address=self.pg1.remote_ip6n, tc_tos=255)
291 sw_if_index = rv.sw_if_index
292 self.tunnel_if_index = sw_if_index
293 self.vapi.sw_interface_set_flags(sw_if_index, 1)
294 self.vapi.sw_interface_set_unnumbered(
295 ip_sw_if_index=self.pg0.sw_if_index, sw_if_index=sw_if_index)
297 # Add IPv4 and IPv6 routes via tunnel interface
298 ip4_via_tunnel = VppIpRoute(
299 self, "130.67.0.0", 16,
300 [VppRoutePath("0.0.0.0",
302 proto=DpoProto.DPO_PROTO_IP4)], is_ip6=0)
303 ip4_via_tunnel.add_vpp_config()
305 ip6_via_tunnel = VppIpRoute(
309 proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1)
310 ip6_via_tunnel.add_vpp_config()
312 self.tunnel_ip6_via_tunnel = ip6_via_tunnel
313 self.tunnel_ip4_via_tunnel = ip4_via_tunnel
315 def destroy_tunnel(self):
317 self.tunnel_ip4_via_tunnel.remove_vpp_config()
318 self.tunnel_ip6_via_tunnel.remove_vpp_config()
320 rv = self.vapi.ipip_del_tunnel(sw_if_index=self.tunnel_if_index)
def validate(self, rx, expected):
    """Assert that a received packet equals the expected packet.

    ``expected`` is serialized and parsed again through its own class
    before the comparison, so ``rx`` is compared against a copy rebuilt
    from the serialized form rather than against the original object.

    :param rx: packet taken from the capture.
    :param expected: packet the test expects to see.
    """
    # bytes() is the same builtin as str() on Python 2; on Python 3 it
    # calls __bytes__, whereas str() there returns text, not the
    # serialized packet.
    self.assertEqual(rx, expected.__class__(bytes(expected)))
325 def generate_ip6_frags(self, payload_length, fragment_size):
326 p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
327 p_payload = UDP(sport=1234, dport=1234) / self.payload(payload_length)
328 p_ip6 = IPv6(src="1::1", dst=self.pg0.remote_ip6)
329 outer_ip6 = (p_ether / IPv6(src=self.pg1.remote_ip6,
330 dst=self.pg0.local_ip6) /
331 IPv6ExtHdrFragment() / p_ip6 / p_payload)
332 frags = fragment6(outer_ip6, fragment_size)
333 p6_reply = (p_ip6 / p_payload)
335 return frags, p6_reply
337 def generate_ip6_hairpin_frags(self, payload_length, fragment_size):
338 p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
339 p_payload = UDP(sport=1234, dport=1234) / self.payload(payload_length)
340 p_ip6 = IPv6(src="1::1", dst="dead::1")
341 outer_ip6 = (p_ether / IPv6(src=self.pg1.remote_ip6,
342 dst=self.pg0.local_ip6) /
343 IPv6ExtHdrFragment() / p_ip6 / p_payload)
344 frags = fragment6(outer_ip6, fragment_size)
346 p6_reply = (IPv6(src=self.pg0.local_ip6, dst=self.pg1.remote_ip6,
347 hlim=63) / p_ip6 / p_payload)
349 return frags, p6_reply
351 def test_encap(self):
352 """ ip{v4,v6} over ip6 test encap """
353 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
354 p_ip6 = IPv6(src="1::1", dst="DEAD::1", tc=42, nh='UDP')
355 p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1", tos=42)
356 p_payload = UDP(sport=1234, dport=1234)
359 # IPv6 in to IPv6 tunnel
360 p6 = (p_ether / p_ip6 / p_payload)
361 p6_reply = (IPv6(src=self.pg0.local_ip6, dst=self.pg1.remote_ip6,
364 p6_reply[1].hlim -= 1
365 rx = self.send_and_expect(self.pg0, p6*11, self.pg1)
367 self.validate(p[1], p6_reply)
369 # IPv4 in to IPv6 tunnel
370 p4 = (p_ether / p_ip4 / p_payload)
371 p4_reply = (IPv6(src=self.pg0.local_ip6,
372 dst=self.pg1.remote_ip6, hlim=64, tc=42) /
375 rx = self.send_and_expect(self.pg0, p4*11, self.pg1)
377 self.validate(p[1], p4_reply)
379 def test_decap(self):
380 """ ip{v4,v6} over ip6 test decap """
382 p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
383 p_ip6 = IPv6(src="1::1", dst="DEAD::1", tc=42, nh='UDP')
384 p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
385 p_payload = UDP(sport=1234, dport=1234)
388 # IPv6 tunnel to IPv4
390 p4 = (p_ether / IPv6(src=self.pg1.remote_ip6,
391 dst=self.pg0.local_ip6) / p_ip4 / p_payload)
392 p4_reply = (p_ip4 / p_payload)
394 rx = self.send_and_expect(self.pg1, p4*11, self.pg0)
396 self.validate(p[1], p4_reply)
398 # IPv6 tunnel to IPv6
399 p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
400 p6 = (p_ether / IPv6(src=self.pg1.remote_ip6,
401 dst=self.pg0.local_ip6) / p_ip6 / p_payload)
402 p6_reply = (p_ip6 / p_payload)
404 rx = self.send_and_expect(self.pg1, p6*11, self.pg0)
406 self.validate(p[1], p6_reply)
409 """ ip{v4,v6} over ip6 test frag """
411 p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
412 p_ip6 = IPv6(src="1::1", dst="DEAD::1", tc=42, nh='UDP')
413 p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
414 p_payload = UDP(sport=1234, dport=1234)
417 # Fragmentation / Reassembly and Re-fragmentation
419 rv = self.vapi.ip_reassembly_enable_disable(
420 sw_if_index=self.pg1.sw_if_index,
423 # Send lots of fragments, verify reassembled packet
424 before_cnt = self.statistics.get_counter(
425 '/err/ipip6-input/packets decapsulated')
426 frags, p6_reply = self.generate_ip6_frags(3131, 1400)
428 for i in range(0, 1000):
430 self.pg1.add_stream(f)
431 self.pg_enable_capture()
433 rx = self.pg0.get_capture(1000)
436 self.validate(p[1], p6_reply)
438 cnt = self.statistics.get_counter(
439 '/err/ipip6-input/packets decapsulated')
440 self.assertEqual(cnt, before_cnt + 1000)
444 # TODO: Check out why reassembly of atomic fragments doesn't work
445 for i in range(10, 90):
446 frags, p6_reply = self.generate_ip6_frags(i * 100, 1000)
449 self.pg_enable_capture()
450 self.pg1.add_stream(f)
452 rx = self.pg0.get_capture(80)
455 self.validate(p[1], r[i])
458 # Simple fragmentation
459 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
460 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1280, 0, 0, 0])
462 # IPv6 in to IPv6 tunnel
463 p_payload = UDP(sport=1234, dport=1234) / self.payload(1300)
465 p6 = (p_ether / p_ip6 / p_payload)
466 p6_reply = (IPv6(src=self.pg0.local_ip6, dst=self.pg1.remote_ip6,
469 p6_reply[1].hlim -= 1
470 self.pg_enable_capture()
471 self.pg0.add_stream(p6)
473 rx = self.pg1.get_capture(2)
475 # Scapy defragment doesn't deal well with multiple layers
476 # of same type / Ethernet header first
477 f = [p[1] for p in rx]
478 reass_pkt = defragment6(f)
479 self.validate(reass_pkt, p6_reply)
481 # Now try with re-fragmentation
483 # Send large fragments to tunnel head-end, for the tunnel head end
484 # to reassemble and then refragment out the tunnel again.
487 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1280, 0, 0, 0])
488 frags, p6_reply = self.generate_ip6_hairpin_frags(8000, 1200)
489 self.pg_enable_capture()
490 self.pg1.add_stream(frags)
492 rx = self.pg1.get_capture(7)
493 f = [p[1] for p in rx]
494 reass_pkt = defragment6(f)
496 self.validate(reass_pkt, p6_reply)
def test_ipip_create(self):
    """ ipip create / delete interface test """
    # This test uses IPv4 endpoints (AF_INET, is_ipv6=0); the tunnel is
    # created and immediately removed without passing traffic.
    endpoint_a = inet_pton(AF_INET, '1.2.3.4')
    endpoint_b = inet_pton(AF_INET, '2.3.4.5')
    add_reply = self.vapi.ipip_add_tunnel(src_address=endpoint_a,
                                          dst_address=endpoint_b,
                                          is_ipv6=0)
    # Remove the interface using the index returned by the add call.
    self.vapi.ipip_del_tunnel(add_reply.sw_if_index)
506 def test_ipip_vrf_create(self):
507 """ ipip create / delete interface VRF test """
509 t = VppIpTable(self, 20)
511 rv = self.vapi.ipip_add_tunnel(
512 src_address=inet_pton(AF_INET, '1.2.3.4'),
513 dst_address=inet_pton(AF_INET, '2.3.4.5'), is_ipv6=0,
515 sw_if_index = rv.sw_if_index
516 self.vapi.ipip_del_tunnel(sw_if_index)
518 def payload(self, len):
522 if __name__ == '__main__':
523 unittest.main(testRunner=VppTestRunner)