6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP
15 from scapy.layers.inet6 import (
24 class TestMAP(VppTestCase):
29 super(TestMAP, cls).setUpClass()
32 def tearDownClass(cls):
33 super(TestMAP, cls).tearDownClass()
36 super(TestMAP, self).setUp()
38 # create 2 pg interfaces
39 self.create_pg_interfaces(range(4))
41 # pg0 is 'inside' IPv4
44 self.pg0.resolve_arp()
45 self.pg0.generate_remote_hosts(2)
46 self.pg0.configure_ipv4_neighbors()
48 # pg1 is 'outside' IPv6
51 self.pg1.generate_remote_hosts(4)
52 self.pg1.configure_ipv6_neighbors()
55 super(TestMAP, self).tearDown()
56 for i in self.pg_interfaces:
61 def send_and_assert_encapped(self, packets, ip6_src, ip6_dst, dmac=None):
63 dmac = self.pg1.remote_mac
65 self.pg0.add_stream(packets)
67 self.pg_enable_capture(self.pg_interfaces)
70 capture = self.pg1.get_capture(len(packets))
71 for rx, tx in zip(capture, packets):
72 self.assertEqual(rx[Ether].dst, dmac)
73 self.assertEqual(rx[IP].src, tx[IP].src)
74 self.assertEqual(rx[IPv6].src, ip6_src)
75 self.assertEqual(rx[IPv6].dst, ip6_dst)
77 def send_and_assert_encapped_one(self, packet, ip6_src, ip6_dst, dmac=None):
78 return self.send_and_assert_encapped([packet], ip6_src, ip6_dst, dmac)
80 def test_api_map_domain_dump(self):
82 map_src = "3000::1/128"
83 client_pfx = "192.168.0.0/16"
85 index = self.vapi.map_add_domain(
86 ip4_prefix=client_pfx, ip6_prefix=map_dst, ip6_src=map_src, tag=tag
88 rv = self.vapi.map_domain_dump()
90 # restore the state early so as to not impact subsequent tests.
91 # If an assert fails, we will not get the chance to do it at the end.
92 self.vapi.map_del_domain(index=index)
94 self.assertGreater(len(rv), 0, "Expected output from 'map_domain_dump'")
96 # typedefs are returned as ipaddress objects.
97 # wrap results in str() ugh! to avoid the need to call unicode.
98 self.assertEqual(str(rv[0].ip4_prefix), client_pfx)
99 self.assertEqual(str(rv[0].ip6_prefix), map_dst)
100 self.assertEqual(str(rv[0].ip6_src), map_src)
102 self.assertEqual(rv[0].tag, tag, "output produced incorrect tag value.")
104 def create_domains(self, ip4_pfx_str, ip6_pfx_str, ip6_src_str):
105 ip4_pfx = ipaddress.ip_network(ip4_pfx_str)
106 ip6_dst = ipaddress.ip_network(ip6_pfx_str)
107 mod = ip4_pfx.num_addresses / 1024
109 for i in range(ip4_pfx.num_addresses):
110 rv = self.vapi.map_add_domain(
111 ip6_prefix=ip6_pfx_str,
112 ip4_prefix=str(ip4_pfx[i]) + "/32",
115 indicies.append(rv.index)
118 def test_api_map_domains_get(self):
119 # Create a bunch of domains
120 no_domains = 4096 # This must be large enough to ensure VPP suspends
121 domains = self.create_domains("130.67.0.0/20", "2001::/32", "2001::1/128")
122 self.assertEqual(len(domains), no_domains)
128 rv, details = self.vapi.map_domains_get(cursor=no_domains + 10)
129 self.assertEqual(rv.retval, -7)
131 # Delete a domain in the middle of walk
132 rv, details = self.vapi.map_domains_get(cursor=0)
133 self.assertEqual(rv.retval, -165)
134 self.vapi.map_del_domain(index=rv.cursor)
135 domains.remove(rv.cursor)
137 # Continue at point of deleted cursor
138 rv, details = self.vapi.map_domains_get(cursor=rv.cursor)
139 self.assertIn(rv.retval, [0, -165])
141 d = list(self.vapi.vpp.details_iter(self.vapi.map_domains_get))
142 self.assertEqual(len(d), no_domains - 1)
146 self.vapi.map_del_domain(index=i)
148 def test_map_e_udp(self):
152 # Add a route to the MAP-BR
154 map_br_pfx = "2001::"
156 map_route = VppIpRoute(
160 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
162 map_route.add_vpp_config()
165 # Add a domain that maps from pg0 to pg1
167 map_dst = "2001::/32"
168 map_src = "3000::1/128"
169 client_pfx = "192.168.0.0/16"
170 map_translated_addr = "2001:0:101:7000:0:c0a8:101:7"
172 self.vapi.map_add_domain(
173 ip4_prefix=client_pfx,
182 self.vapi.map_param_set_security_check(enable=1, fragments=1)
184 # Enable MAP on interface.
185 self.vapi.map_if_enable_disable(
186 is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=0
189 # Ensure MAP doesn't steal all packets!
191 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
192 / IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4)
193 / UDP(sport=20000, dport=10000)
196 rx = self.send_and_expect(self.pg0, v4 * 4, self.pg0)
200 self.validate(p[1], v4_reply)
203 # Fire in a v4 packet that will be encapped to the BR
206 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
207 / IP(src=self.pg0.remote_ip4, dst="192.168.1.1")
208 / UDP(sport=20000, dport=10000)
212 self.send_and_assert_encapped(v4 * 4, "3000::1", map_translated_addr)
215 # Verify reordered fragments are able to pass as well
218 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
219 / IP(id=1, src=self.pg0.remote_ip4, dst="192.168.1.1")
220 / UDP(sport=20000, dport=10000)
221 / Raw(b"\xa5" * 1000)
224 frags = fragment_rfc791(v4, 400)
227 self.send_and_assert_encapped(frags, "3000::1", map_translated_addr)
229 # Enable MAP on interface.
230 self.vapi.map_if_enable_disable(
231 is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=0
234 # Ensure MAP doesn't steal all packets
236 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
237 / IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6)
238 / UDP(sport=20000, dport=10000)
241 rx = self.send_and_expect(self.pg1, v6 * 1, self.pg1)
245 self.validate(p[1], v6_reply)
248 # Fire in a V6 encapped packet.
249 # expect a decapped packet on the inside ip4 link
252 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
253 / IPv6(dst="3000::1", src=map_translated_addr)
254 / IP(dst=self.pg0.remote_ip4, src="192.168.1.1")
255 / UDP(sport=10000, dport=20000)
259 self.pg1.add_stream(p)
261 self.pg_enable_capture(self.pg_interfaces)
264 rx = self.pg0.get_capture(1)
267 self.assertFalse(rx.haslayer(IPv6))
268 self.assertEqual(rx[IP].src, p[IP].src)
269 self.assertEqual(rx[IP].dst, p[IP].dst)
272 # Verify encapped reordered fragments pass as well
275 IP(id=1, dst=self.pg0.remote_ip4, src="192.168.1.1")
276 / UDP(sport=10000, dport=20000)
277 / Raw(b"\xa5" * 1500)
279 frags = fragment_rfc791(p, 400)
283 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
284 / IPv6(dst="3000::1", src=map_translated_addr)
289 self.pg1.add_stream(stream)
291 self.pg_enable_capture(self.pg_interfaces)
294 rx = self.pg0.get_capture(len(frags))
297 self.assertFalse(r.haslayer(IPv6))
298 self.assertEqual(r[IP].src, p[IP].src)
299 self.assertEqual(r[IP].dst, p[IP].dst)
301 # Verify that fragments pass even if ipv6 layer is fragmented
302 stream = (IPv6(dst="3000::1", src=map_translated_addr) / x for x in frags)
305 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / x
306 for i in range(len(frags))
307 for x in fragment_rfc8200(
308 IPv6(dst="3000::1", src=map_translated_addr) / frags[i], i, 200
312 self.pg1.add_stream(v6_stream)
314 self.pg_enable_capture(self.pg_interfaces)
317 rx = self.pg0.get_capture(len(frags))
320 self.assertFalse(r.haslayer(IPv6))
321 self.assertEqual(r[IP].src, p[IP].src)
322 self.assertEqual(r[IP].dst, p[IP].dst)
325 # Pre-resolve. No API for this!!
327 self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")
329 self.send_and_assert_no_replies(self.pg0, v4, "resolved via default route")
332 # Add a route to 4001::1. Expect the encapped traffic to be
333 # sent via that routes next-hop
335 pre_res_route = VppIpRoute(
339 [VppRoutePath(self.pg1.remote_hosts[2].ip6, self.pg1.sw_if_index)],
341 pre_res_route.add_vpp_config()
343 self.send_and_assert_encapped_one(
344 v4, "3000::1", map_translated_addr, dmac=self.pg1.remote_hosts[2].mac
348 # change the route to the pre-solved next-hop
350 pre_res_route.modify(
351 [VppRoutePath(self.pg1.remote_hosts[3].ip6, self.pg1.sw_if_index)]
353 pre_res_route.add_vpp_config()
355 self.send_and_assert_encapped_one(
356 v4, "3000::1", map_translated_addr, dmac=self.pg1.remote_hosts[3].mac
360 # cleanup. The test infra's object registry will ensure
361 # the route is really gone and thus that the unresolve worked.
363 pre_res_route.remove_vpp_config()
364 self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
366 def test_map_e_inner_frag(self):
367 """MAP-E Inner fragmentation"""
370 # Add a route to the MAP-BR
372 map_br_pfx = "2001::"
374 map_route = VppIpRoute(
378 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
380 map_route.add_vpp_config()
383 # Add a domain that maps from pg0 to pg1
385 map_dst = "2001::/32"
386 map_src = "3000::1/128"
387 client_pfx = "192.168.0.0/16"
388 map_translated_addr = "2001:0:101:7000:0:c0a8:101:7"
390 self.vapi.map_add_domain(
391 ip4_prefix=client_pfx,
401 # Enable MAP on interface.
402 self.vapi.map_if_enable_disable(
403 is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=0
406 # Enable inner fragmentation
407 self.vapi.map_param_set_fragmentation(inner=1)
410 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
411 / IP(src=self.pg0.remote_ip4, dst="192.168.1.1")
412 / UDP(sport=20000, dport=10000)
413 / Raw(b"\xa5" * 1300)
416 self.pg_send(self.pg0, v4 * 1)
417 rx = self.pg1.get_capture(2)
419 # 1000-sizeof(ip6_header_t) = 960.
420 frags = fragment_rfc791(v4[1], 960)
428 v6_reply1 = IPv6(src="3000::1", dst=map_translated_addr, hlim=63) / frags[0]
429 v6_reply2 = IPv6(src="3000::1", dst=map_translated_addr, hlim=63) / frags[1]
434 rx[0][1][IP].chksum = 0
435 rx[1][1][IP].chksum = 0
437 self.validate(rx[0][1], v6_reply1)
438 self.validate(rx[1][1], v6_reply2)
440 def test_map_e_tcp_mss(self):
444 # Add a route to the MAP-BR
446 map_br_pfx = "2001::"
448 map_route = VppIpRoute(
452 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
454 map_route.add_vpp_config()
457 # Add a domain that maps from pg0 to pg1
459 map_dst = "2001::/32"
460 map_src = "3000::1/128"
461 client_pfx = "192.168.0.0/16"
462 map_translated_addr = "2001:0:101:5000:0:c0a8:101:5"
463 tag = "MAP-E TCP tag."
464 self.vapi.map_add_domain(
465 ip4_prefix=client_pfx,
474 # Enable MAP on pg0 interface.
475 self.vapi.map_if_enable_disable(
476 is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=0
479 # Enable MAP on pg1 interface.
480 self.vapi.map_if_enable_disable(
481 is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=0
486 self.vapi.map_param_set_tcp(mss_clamp)
489 # Send a v4 packet that will be encapped.
491 p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
492 p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.1.1")
493 p_tcp = TCP(sport=20000, dport=30000, flags="S", options=[("MSS", 1455)])
494 p4 = p_ether / p_ip4 / p_tcp
496 self.pg1.add_stream(p4)
497 self.pg_enable_capture(self.pg_interfaces)
500 rx = self.pg1.get_capture(1)
503 self.assertTrue(rx.haslayer(IPv6))
504 self.assertEqual(rx[IP].src, p4[IP].src)
505 self.assertEqual(rx[IP].dst, p4[IP].dst)
506 self.assertEqual(rx[IPv6].src, "3000::1")
507 self.assertEqual(rx[TCP].options, TCP(options=[("MSS", mss_clamp)]).options)
509 def validate(self, rx, expected):
510 self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))
512 def validate_frag6(self, p6_frag, p_ip6_expected):
513 self.assertFalse(p6_frag.haslayer(IP))
514 self.assertTrue(p6_frag.haslayer(IPv6))
515 self.assertTrue(p6_frag.haslayer(IPv6ExtHdrFragment))
516 self.assertEqual(p6_frag[IPv6].src, p_ip6_expected.src)
517 self.assertEqual(p6_frag[IPv6].dst, p_ip6_expected.dst)
519 def validate_frag_payload_len6(self, rx, proto, payload_len_expected):
522 payload_total += p[IPv6].plen
524 # First fragment has proto
525 payload_total -= len(proto())
527 # Every fragment has IPv6 fragment header
528 payload_total -= len(IPv6ExtHdrFragment()) * len(rx)
530 self.assertEqual(payload_total, payload_len_expected)
532 def validate_frag4(self, p4_frag, p_ip4_expected):
533 self.assertFalse(p4_frag.haslayer(IPv6))
534 self.assertTrue(p4_frag.haslayer(IP))
535 self.assertTrue(p4_frag[IP].frag != 0 or p4_frag[IP].flags.MF)
536 self.assertEqual(p4_frag[IP].src, p_ip4_expected.src)
537 self.assertEqual(p4_frag[IP].dst, p_ip4_expected.dst)
539 def validate_frag_payload_len4(self, rx, proto, payload_len_expected):
542 payload_total += len(p[IP].payload)
544 # First fragment has proto
545 payload_total -= len(proto())
547 self.assertEqual(payload_total, payload_len_expected)
549 def payload(self, len):
552 def test_map_t(self):
556 # Add a domain that maps from pg0 to pg1
558 map_dst = "2001:db8::/32"
559 map_src = "1234:5678:90ab:cdef::/64"
560 ip4_pfx = "192.168.0.0/24"
563 self.vapi.map_add_domain(
574 # Enable MAP-T on interfaces.
575 self.vapi.map_if_enable_disable(
576 is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=1
578 self.vapi.map_if_enable_disable(
579 is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
582 # Ensure MAP doesn't steal all packets!
584 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
585 / IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4)
586 / UDP(sport=20000, dport=10000)
589 rx = self.send_and_expect(self.pg0, v4 * 1, self.pg0)
593 self.validate(p[1], v4_reply)
594 # Ensure MAP doesn't steal all packets
596 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
597 / IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6)
598 / UDP(sport=20000, dport=10000)
601 rx = self.send_and_expect(self.pg1, v6 * 1, self.pg1)
605 self.validate(p[1], v6_reply)
607 map_route = VppIpRoute(
614 self.pg1.sw_if_index,
615 proto=DpoProto.DPO_PROTO_IP6,
619 map_route.add_vpp_config()
622 # Send a v4 packet that will be translated
624 p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
625 p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.0.1")
626 payload = TCP(sport=0xABCD, dport=0xABCD)
628 p4 = p_ether / p_ip4 / payload
630 IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1f0::c0a8:1:f")
633 p6_translated.hlim -= 1
634 rx = self.send_and_expect(self.pg0, p4 * 1, self.pg1)
636 self.validate(p[1], p6_translated)
638 # Send back an IPv6 packet that will be "untranslated"
639 p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
641 src="2001:db8:1f0::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
643 p6 = p_ether6 / p_ip6 / payload
644 p4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4) / payload
646 p4_translated.ttl -= 1
647 rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
649 self.validate(p[1], p4_translated)
652 ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst="192.168.0.1", ttl=0)
653 p4 = p_ether / ip4_ttl_expired / payload
656 IP(id=0, ttl=254, src=self.pg0.local_ip4, dst=self.pg0.remote_ip4)
657 / ICMP(type="time-exceeded", code="ttl-zero-during-transit")
658 / IP(src=self.pg0.remote_ip4, dst="192.168.0.1", ttl=0)
661 rx = self.send_and_expect(self.pg0, p4 * 1, self.pg0)
663 self.validate(p[1], icmp4_reply)
666 ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst="192.168.0.1", ttl=1)
667 p4 = p_ether / ip4_ttl_expired / payload
670 IP(id=0, ttl=254, src=self.pg0.local_ip4, dst=self.pg0.remote_ip4)
671 / ICMP(type="time-exceeded", code="ttl-zero-during-transit")
672 / IP(src=self.pg0.remote_ip4, dst="192.168.0.1", ttl=1)
675 rx = self.send_and_expect(self.pg0, p4 * 1, self.pg0)
677 self.validate(p[1], icmp4_reply)
679 # IPv6 Hop limit at BR
680 ip6_hlim_expired = IPv6(
682 src="2001:db8:1ab::c0a8:1:ab",
683 dst="1234:5678:90ab:cdef:ac:1001:200:0",
685 p6 = p_ether6 / ip6_hlim_expired / payload
688 IPv6(hlim=255, src=self.pg1.local_ip6, dst="2001:db8:1ab::c0a8:1:ab")
689 / ICMPv6TimeExceeded(code=0)
691 src="2001:db8:1ab::c0a8:1:ab",
692 dst="1234:5678:90ab:cdef:ac:1001:200:0",
697 rx = self.send_and_expect(self.pg1, p6 * 1, self.pg1)
699 self.validate(p[1], icmp6_reply)
701 # IPv6 Hop limit beyond BR
702 ip6_hlim_expired = IPv6(
704 src="2001:db8:1ab::c0a8:1:ab",
705 dst="1234:5678:90ab:cdef:ac:1001:200:0",
707 p6 = p_ether6 / ip6_hlim_expired / payload
710 IPv6(hlim=255, src=self.pg1.local_ip6, dst="2001:db8:1ab::c0a8:1:ab")
711 / ICMPv6TimeExceeded(code=0)
713 src="2001:db8:1ab::c0a8:1:ab",
714 dst="1234:5678:90ab:cdef:ac:1001:200:0",
719 rx = self.send_and_expect(self.pg1, p6 * 1, self.pg1)
721 self.validate(p[1], icmp6_reply)
723 # IPv4 Well-known port
724 p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.0.1")
725 payload = UDP(sport=200, dport=200)
726 p4 = p_ether / p_ip4 / payload
727 self.send_and_assert_no_replies(self.pg0, p4 * 1)
729 # IPv6 Well-known port
730 payload = UDP(sport=200, dport=200)
731 p6 = p_ether6 / p_ip6 / payload
732 self.send_and_assert_no_replies(self.pg1, p6 * 1)
734 # UDP packet fragmentation
736 payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
737 p4 = p_ether / p_ip4 / payload
738 self.pg_enable_capture()
739 self.pg0.add_stream(p4)
741 rx = self.pg1.get_capture(2)
743 p_ip6_translated = IPv6(
744 src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1e0::c0a8:1:e"
747 self.validate_frag6(p, p_ip6_translated)
749 self.validate_frag_payload_len6(rx, UDP, payload_len)
751 # UDP packet fragmentation send fragments
753 payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
754 p4 = p_ether / p_ip4 / payload
755 frags = fragment_rfc791(p4, fragsize=1000)
756 self.pg_enable_capture()
757 self.pg0.add_stream(frags)
759 rx = self.pg1.get_capture(2)
762 self.validate_frag6(p, p_ip6_translated)
764 self.validate_frag_payload_len6(rx, UDP, payload_len)
766 # Send back an fragmented IPv6 UDP packet that will be "untranslated"
767 payload = UDP(sport=4000, dport=40000) / self.payload(payload_len)
768 p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
770 src="2001:db8:1e0::c0a8:1:e", dst="1234:5678:90ab:cdef:ac:1001:200:0"
772 p6 = p_ether6 / p_ip6 / payload
773 frags6 = fragment_rfc8200(p6, identification=0xDCBA, fragsize=1000)
775 p_ip4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4)
776 p4_translated = p_ip4_translated / payload
778 p4_translated.ttl -= 1
780 self.pg_enable_capture()
781 self.pg1.add_stream(frags6)
783 rx = self.pg0.get_capture(2)
786 self.validate_frag4(p, p4_translated)
788 self.validate_frag_payload_len4(rx, UDP, payload_len)
790 # ICMP packet fragmentation
791 payload = ICMP(id=6529) / self.payload(payload_len)
792 p4 = p_ether / p_ip4 / payload
793 self.pg_enable_capture()
794 self.pg0.add_stream(p4)
796 rx = self.pg1.get_capture(2)
798 p_ip6_translated = IPv6(
799 src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:160::c0a8:1:6"
802 self.validate_frag6(p, p_ip6_translated)
804 self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len)
806 # ICMP packet fragmentation send fragments
807 payload = ICMP(id=6529) / self.payload(payload_len)
808 p4 = p_ether / p_ip4 / payload
809 frags = fragment_rfc791(p4, fragsize=1000)
810 self.pg_enable_capture()
811 self.pg0.add_stream(frags)
813 rx = self.pg1.get_capture(2)
816 self.validate_frag6(p, p_ip6_translated)
818 self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len)
821 self.vapi.map_param_set_tcp(1300)
824 # Send a v4 TCP SYN packet that will be translated and MSS clamped
826 p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
827 p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.0.1")
828 payload = TCP(sport=0xABCD, dport=0xABCD, flags="S", options=[("MSS", 1460)])
830 p4 = p_ether / p_ip4 / payload
832 IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1f0::c0a8:1:f")
835 p6_translated.hlim -= 1
836 p6_translated[TCP].options = [("MSS", 1300)]
837 rx = self.send_and_expect(self.pg0, p4 * 1, self.pg1)
839 self.validate(p[1], p6_translated)
841 # Send back an IPv6 packet that will be "untranslated"
842 p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
844 src="2001:db8:1f0::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
846 p6 = p_ether6 / p_ip6 / payload
847 p4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4) / payload
849 p4_translated.ttl -= 1
850 p4_translated[TCP].options = [("MSS", 1300)]
851 rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
853 self.validate(p[1], p4_translated)
855 # TCP MSS clamping cleanup
856 self.vapi.map_param_set_tcp(0)
858 # Enable icmp6 param to get back ICMPv6 unreachable messages in case
859 # of security check fails
860 self.vapi.map_param_set_icmp6(enable_unreachable=1)
862 # Send back an IPv6 packet that will be droppped due to security
864 p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
865 p_ip6_sec_check_fail = IPv6(
866 src="2001:db8:1fe::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
868 payload = TCP(sport=0xABCD, dport=0xABCD)
869 p6 = p_ether6 / p_ip6_sec_check_fail / payload
871 self.pg_send(self.pg1, p6 * 1)
872 self.pg0.get_capture(0, timeout=1)
873 rx = self.pg1.get_capture(1)
876 IPv6(hlim=255, src=self.pg1.local_ip6, dst="2001:db8:1fe::c0a8:1:f")
877 / ICMPv6DestUnreach(code=5)
878 / p_ip6_sec_check_fail
883 self.validate(p[1], icmp6_reply)
885 # ICMPv6 unreachable messages cleanup
886 self.vapi.map_param_set_icmp6(enable_unreachable=0)
888 def test_map_t_ip6_psid(self):
889 """MAP-T v6->v4 PSID validation"""
892 # Add a domain that maps from pg0 to pg1
894 map_dst = "2001:db8::/32"
895 map_src = "1234:5678:90ab:cdef::/64"
896 ip4_pfx = "192.168.0.0/24"
897 tag = "MAP-T Test Domain"
899 self.vapi.map_add_domain(
910 # Enable MAP-T on interfaces.
911 self.vapi.map_if_enable_disable(
912 is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=1
914 self.vapi.map_if_enable_disable(
915 is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
918 map_route = VppIpRoute(
925 self.pg1.sw_if_index,
926 proto=DpoProto.DPO_PROTO_IP6,
930 map_route.add_vpp_config()
932 p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
934 src="2001:db8:1f0::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
937 # Send good IPv6 source port, ensure translated IPv4 received
938 payload = TCP(sport=0xABCD, dport=80)
939 p6 = p_ether6 / p_ip6 / payload
940 p4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4) / payload
942 p4_translated.ttl -= 1
943 rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
945 self.validate(p[1], p4_translated)
947 # Send bad IPv6 source port, ensure translated IPv4 not received
948 payload = TCP(sport=0xDCBA, dport=80)
949 p6 = p_ether6 / p_ip6 / payload
950 self.send_and_assert_no_replies(self.pg1, p6 * 1)
952 def test_map_t_pre_resolve(self):
953 """MAP-T pre-resolve"""
955 # Add a domain that maps from pg0 to pg1
956 map_dst = "2001:db8::/32"
957 map_src = "1234:5678:90ab:cdef::/64"
958 ip4_pfx = "192.168.0.0/24"
959 tag = "MAP-T Test Domain."
961 self.vapi.map_add_domain(
972 # Enable MAP-T on interfaces.
973 self.vapi.map_if_enable_disable(
974 is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=1
976 self.vapi.map_if_enable_disable(
977 is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
980 # Enable pre-resolve option
981 self.vapi.map_param_add_del_pre_resolve(
982 ip4_nh_address="10.1.2.3", ip6_nh_address="4001::1", is_add=1
985 # Add a route to 4001::1 and expect the translated traffic to be
986 # sent via that route next-hop.
987 pre_res_route6 = VppIpRoute(
991 [VppRoutePath(self.pg1.remote_hosts[2].ip6, self.pg1.sw_if_index)],
993 pre_res_route6.add_vpp_config()
995 # Add a route to 10.1.2.3 and expect the "untranslated" traffic to be
996 # sent via that route next-hop.
997 pre_res_route4 = VppIpRoute(
1001 [VppRoutePath(self.pg0.remote_hosts[1].ip4, self.pg0.sw_if_index)],
1003 pre_res_route4.add_vpp_config()
1005 # Send an IPv4 packet that will be translated
1006 p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1007 p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.0.1")
1008 payload = TCP(sport=0xABCD, dport=0xABCD)
1009 p4 = p_ether / p_ip4 / payload
1012 IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1f0::c0a8:1:f")
1015 p6_translated.hlim -= 1
1017 rx = self.send_and_expect(self.pg0, p4 * 1, self.pg1)
1019 self.assertEqual(p[Ether].dst, self.pg1.remote_hosts[2].mac)
1020 self.validate(p[1], p6_translated)
1022 # Send back an IPv6 packet that will be "untranslated"
1023 p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1025 src="2001:db8:1f0::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
1027 p6 = p_ether6 / p_ip6 / payload
1029 p4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4) / payload
1030 p4_translated.id = 0
1031 p4_translated.ttl -= 1
1033 rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
1035 self.assertEqual(p[Ether].dst, self.pg0.remote_hosts[1].mac)
1036 self.validate(p[1], p4_translated)
1038 # Cleanup pre-resolve option
1039 self.vapi.map_param_add_del_pre_resolve(
1040 ip4_nh_address="10.1.2.3", ip6_nh_address="4001::1", is_add=0
1044 if __name__ == "__main__":
1045 unittest.main(testRunner=VppTestRunner)