8ddc6bd5cd3233f3eaba25c36920486cdff0bfe7
[vpp.git] / test / test_map.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
10
11 import scapy.compat
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP
15 from scapy.layers.inet6 import (
16     IPv6,
17     ICMPv6TimeExceeded,
18     IPv6ExtHdrFragment,
19     ICMPv6EchoRequest,
20     ICMPv6DestUnreach,
21 )
22
23
24 class TestMAP(VppTestCase):
25     """MAP Test Case"""
26
27     @classmethod
28     def setUpClass(cls):
29         super(TestMAP, cls).setUpClass()
30
31     @classmethod
32     def tearDownClass(cls):
33         super(TestMAP, cls).tearDownClass()
34
35     def setUp(self):
36         super(TestMAP, self).setUp()
37
38         # create 2 pg interfaces
39         self.create_pg_interfaces(range(4))
40
41         # pg0 is 'inside' IPv4
42         self.pg0.admin_up()
43         self.pg0.config_ip4()
44         self.pg0.resolve_arp()
45         self.pg0.generate_remote_hosts(2)
46         self.pg0.configure_ipv4_neighbors()
47
48         # pg1 is 'outside' IPv6
49         self.pg1.admin_up()
50         self.pg1.config_ip6()
51         self.pg1.generate_remote_hosts(4)
52         self.pg1.configure_ipv6_neighbors()
53
54     def tearDown(self):
55         super(TestMAP, self).tearDown()
56
57         for i in self.pg_interfaces:
58             for t in (0, 1):
59                 self.vapi.map_if_enable_disable(
60                     is_enable=0, sw_if_index=i.sw_if_index, is_translation=t
61                 )
62             i.unconfig_ip4()
63             i.unconfig_ip6()
64             i.admin_down()
65
66     def send_and_assert_encapped(self, packets, ip6_src, ip6_dst, dmac=None):
67         if not dmac:
68             dmac = self.pg1.remote_mac
69
70         self.pg0.add_stream(packets)
71
72         self.pg_enable_capture(self.pg_interfaces)
73         self.pg_start()
74
75         capture = self.pg1.get_capture(len(packets))
76         for rx, tx in zip(capture, packets):
77             self.assertEqual(rx[Ether].dst, dmac)
78             self.assertEqual(rx[IP].src, tx[IP].src)
79             self.assertEqual(rx[IPv6].src, ip6_src)
80             self.assertEqual(rx[IPv6].dst, ip6_dst)
81
82     def send_and_assert_encapped_one(self, packet, ip6_src, ip6_dst, dmac=None):
83         return self.send_and_assert_encapped([packet], ip6_src, ip6_dst, dmac)
84
85     def test_api_map_domain_dump(self):
86         map_dst = "2001::/64"
87         map_src = "3000::1/128"
88         client_pfx = "192.168.0.0/16"
89         tag = "MAP-E tag."
90         index = self.vapi.map_add_domain(
91             ip4_prefix=client_pfx, ip6_prefix=map_dst, ip6_src=map_src, tag=tag
92         ).index
93         rv = self.vapi.map_domain_dump()
94
95         # restore the state early so as to not impact subsequent tests.
96         # If an assert fails, we will not get the chance to do it at the end.
97         self.vapi.map_del_domain(index=index)
98
99         self.assertGreater(len(rv), 0, "Expected output from 'map_domain_dump'")
100
101         # typedefs are returned as ipaddress objects.
102         # wrap results in str() ugh! to avoid the need to call unicode.
103         self.assertEqual(str(rv[0].ip4_prefix), client_pfx)
104         self.assertEqual(str(rv[0].ip6_prefix), map_dst)
105         self.assertEqual(str(rv[0].ip6_src), map_src)
106
107         self.assertEqual(rv[0].tag, tag, "output produced incorrect tag value.")
108
109     def create_domains(self, ip4_pfx_str, ip6_pfx_str, ip6_src_str):
110         ip4_pfx = ipaddress.ip_network(ip4_pfx_str)
111         ip6_dst = ipaddress.ip_network(ip6_pfx_str)
112         mod = ip4_pfx.num_addresses / 1024
113         indicies = []
114         for i in range(ip4_pfx.num_addresses):
115             rv = self.vapi.map_add_domain(
116                 ip6_prefix=ip6_pfx_str,
117                 ip4_prefix=str(ip4_pfx[i]) + "/32",
118                 ip6_src=ip6_src_str,
119             )
120             indicies.append(rv.index)
121         return indicies
122
123     def test_api_map_domains_get(self):
124         # Create a bunch of domains
125         no_domains = 4096  # This must be large enough to ensure VPP suspends
126         domains = self.create_domains("130.67.0.0/20", "2001::/32", "2001::1/128")
127         self.assertEqual(len(domains), no_domains)
128
129         d = []
130         cursor = 0
131
132         # Invalid cursor
133         rv, details = self.vapi.map_domains_get(cursor=no_domains + 10)
134         self.assertEqual(rv.retval, -7)
135
136         # Delete a domain in the middle of walk
137         rv, details = self.vapi.map_domains_get(cursor=0)
138         self.assertEqual(rv.retval, -165)
139         self.vapi.map_del_domain(index=rv.cursor)
140         domains.remove(rv.cursor)
141
142         # Continue at point of deleted cursor
143         rv, details = self.vapi.map_domains_get(cursor=rv.cursor)
144         self.assertIn(rv.retval, [0, -165])
145
146         d = list(self.vapi.vpp.details_iter(self.vapi.map_domains_get))
147         self.assertEqual(len(d), no_domains - 1)
148
149         # Clean up
150         for i in domains:
151             self.vapi.map_del_domain(index=i)
152
153     def test_map_e_udp(self):
154         """MAP-E UDP"""
155
156         #
157         # Add a route to the MAP-BR
158         #
159         map_br_pfx = "2001::"
160         map_br_pfx_len = 32
161         map_route = VppIpRoute(
162             self,
163             map_br_pfx,
164             map_br_pfx_len,
165             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
166         )
167         map_route.add_vpp_config()
168
169         #
170         # Add a domain that maps from pg0 to pg1
171         #
172         map_dst = "2001::/32"
173         map_src = "3000::1/128"
174         client_pfx = "192.168.0.0/16"
175         map_translated_addr = "2001:0:101:7000:0:c0a8:101:7"
176         tag = "MAP-E tag."
177         self.vapi.map_add_domain(
178             ip4_prefix=client_pfx,
179             ip6_prefix=map_dst,
180             ip6_src=map_src,
181             ea_bits_len=20,
182             psid_offset=4,
183             psid_length=4,
184             tag=tag,
185         )
186
187         self.vapi.map_param_set_security_check(enable=1, fragments=1)
188
189         # Enable MAP on interface.
190         self.vapi.map_if_enable_disable(
191             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=0
192         )
193
194         # Ensure MAP doesn't steal all packets!
195         v4 = (
196             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
197             / IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4)
198             / UDP(sport=20000, dport=10000)
199             / Raw(b"\xa5" * 100)
200         )
201         rx = self.send_and_expect(self.pg0, v4 * 4, self.pg0)
202         v4_reply = v4[1]
203         v4_reply.ttl -= 1
204         for p in rx:
205             self.validate(p[1], v4_reply)
206
207         #
208         # Fire in a v4 packet that will be encapped to the BR
209         #
210         v4 = (
211             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
212             / IP(src=self.pg0.remote_ip4, dst="192.168.1.1")
213             / UDP(sport=20000, dport=10000)
214             / Raw(b"\xa5" * 100)
215         )
216
217         self.send_and_assert_encapped(v4 * 4, "3000::1", map_translated_addr)
218
219         #
220         # Verify reordered fragments are able to pass as well
221         #
222         v4 = (
223             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
224             / IP(id=1, src=self.pg0.remote_ip4, dst="192.168.1.1")
225             / UDP(sport=20000, dport=10000)
226             / Raw(b"\xa5" * 1000)
227         )
228
229         frags = fragment_rfc791(v4, 400)
230         frags.reverse()
231
232         self.send_and_assert_encapped(frags, "3000::1", map_translated_addr)
233
234         # Enable MAP on interface.
235         self.vapi.map_if_enable_disable(
236             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=0
237         )
238
239         # Ensure MAP doesn't steal all packets
240         v6 = (
241             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
242             / IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6)
243             / UDP(sport=20000, dport=10000)
244             / Raw(b"\xa5" * 100)
245         )
246         rx = self.send_and_expect(self.pg1, v6 * 1, self.pg1)
247         v6_reply = v6[1]
248         v6_reply.hlim -= 1
249         for p in rx:
250             self.validate(p[1], v6_reply)
251
252         #
253         # Fire in a V6 encapped packet.
254         # expect a decapped packet on the inside ip4 link
255         #
256         p = (
257             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
258             / IPv6(dst="3000::1", src=map_translated_addr)
259             / IP(dst=self.pg0.remote_ip4, src="192.168.1.1")
260             / UDP(sport=10000, dport=20000)
261             / Raw(b"\xa5" * 100)
262         )
263
264         self.pg1.add_stream(p)
265
266         self.pg_enable_capture(self.pg_interfaces)
267         self.pg_start()
268
269         rx = self.pg0.get_capture(1)
270         rx = rx[0]
271
272         self.assertFalse(rx.haslayer(IPv6))
273         self.assertEqual(rx[IP].src, p[IP].src)
274         self.assertEqual(rx[IP].dst, p[IP].dst)
275
276         #
277         # Verify encapped reordered fragments pass as well
278         #
279         p = (
280             IP(id=1, dst=self.pg0.remote_ip4, src="192.168.1.1")
281             / UDP(sport=10000, dport=20000)
282             / Raw(b"\xa5" * 1500)
283         )
284         frags = fragment_rfc791(p, 400)
285         frags.reverse()
286
287         stream = (
288             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
289             / IPv6(dst="3000::1", src=map_translated_addr)
290             / x
291             for x in frags
292         )
293
294         self.pg1.add_stream(stream)
295
296         self.pg_enable_capture(self.pg_interfaces)
297         self.pg_start()
298
299         rx = self.pg0.get_capture(len(frags))
300
301         for r in rx:
302             self.assertFalse(r.haslayer(IPv6))
303             self.assertEqual(r[IP].src, p[IP].src)
304             self.assertEqual(r[IP].dst, p[IP].dst)
305
306         # Verify that fragments pass even if ipv6 layer is fragmented
307         stream = (IPv6(dst="3000::1", src=map_translated_addr) / x for x in frags)
308
309         v6_stream = [
310             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / x
311             for i in range(len(frags))
312             for x in fragment_rfc8200(
313                 IPv6(dst="3000::1", src=map_translated_addr) / frags[i], i, 200
314             )
315         ]
316
317         self.pg1.add_stream(v6_stream)
318
319         self.pg_enable_capture(self.pg_interfaces)
320         self.pg_start()
321
322         rx = self.pg0.get_capture(len(frags))
323
324         for r in rx:
325             self.assertFalse(r.haslayer(IPv6))
326             self.assertEqual(r[IP].src, p[IP].src)
327             self.assertEqual(r[IP].dst, p[IP].dst)
328
329         #
330         # Pre-resolve. No API for this!!
331         #
332         self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")
333
334         self.send_and_assert_no_replies(self.pg0, v4, "resolved via default route")
335
336         #
337         # Add a route to 4001::1. Expect the encapped traffic to be
338         # sent via that routes next-hop
339         #
340         pre_res_route = VppIpRoute(
341             self,
342             "4001::1",
343             128,
344             [VppRoutePath(self.pg1.remote_hosts[2].ip6, self.pg1.sw_if_index)],
345         )
346         pre_res_route.add_vpp_config()
347
348         self.send_and_assert_encapped_one(
349             v4, "3000::1", map_translated_addr, dmac=self.pg1.remote_hosts[2].mac
350         )
351
352         #
353         # change the route to the pre-solved next-hop
354         #
355         pre_res_route.modify(
356             [VppRoutePath(self.pg1.remote_hosts[3].ip6, self.pg1.sw_if_index)]
357         )
358         pre_res_route.add_vpp_config()
359
360         self.send_and_assert_encapped_one(
361             v4, "3000::1", map_translated_addr, dmac=self.pg1.remote_hosts[3].mac
362         )
363
364         #
365         # cleanup. The test infra's object registry will ensure
366         # the route is really gone and thus that the unresolve worked.
367         #
368         pre_res_route.remove_vpp_config()
369         self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
370
371     def test_map_e_inner_frag(self):
372         """MAP-E Inner fragmentation"""
373
374         #
375         # Add a route to the MAP-BR
376         #
377         map_br_pfx = "2001::"
378         map_br_pfx_len = 32
379         map_route = VppIpRoute(
380             self,
381             map_br_pfx,
382             map_br_pfx_len,
383             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
384         )
385         map_route.add_vpp_config()
386
387         #
388         # Add a domain that maps from pg0 to pg1
389         #
390         map_dst = "2001::/32"
391         map_src = "3000::1/128"
392         client_pfx = "192.168.0.0/16"
393         map_translated_addr = "2001:0:101:7000:0:c0a8:101:7"
394         tag = "MAP-E tag."
395         self.vapi.map_add_domain(
396             ip4_prefix=client_pfx,
397             ip6_prefix=map_dst,
398             ip6_src=map_src,
399             ea_bits_len=20,
400             psid_offset=4,
401             psid_length=4,
402             mtu=1000,
403             tag=tag,
404         )
405
406         # Enable MAP on interface.
407         self.vapi.map_if_enable_disable(
408             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=0
409         )
410
411         # Enable inner fragmentation
412         self.vapi.map_param_set_fragmentation(inner=1)
413
414         v4 = (
415             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
416             / IP(src=self.pg0.remote_ip4, dst="192.168.1.1")
417             / UDP(sport=20000, dport=10000)
418             / Raw(b"\xa5" * 1300)
419         )
420
421         self.pg_send(self.pg0, v4 * 1)
422         rx = self.pg1.get_capture(2)
423
424         # 1000-sizeof(ip6_header_t) = 960.
425         frags = fragment_rfc791(v4[1], 960)
426         frags[0].id = 0
427         frags[1].id = 0
428         frags[0].ttl -= 1
429         frags[1].ttl -= 1
430         frags[0].chksum = 0
431         frags[1].chksum = 0
432
433         v6_reply1 = IPv6(src="3000::1", dst=map_translated_addr, hlim=63) / frags[0]
434         v6_reply2 = IPv6(src="3000::1", dst=map_translated_addr, hlim=63) / frags[1]
435         rx[0][1].fl = 0
436         rx[1][1].fl = 0
437         rx[0][1][IP].id = 0
438         rx[1][1][IP].id = 0
439         rx[0][1][IP].chksum = 0
440         rx[1][1][IP].chksum = 0
441
442         self.validate(rx[0][1], v6_reply1)
443         self.validate(rx[1][1], v6_reply2)
444
445     def test_map_e_tcp_mss(self):
446         """MAP-E TCP MSS"""
447
448         #
449         # Add a route to the MAP-BR
450         #
451         map_br_pfx = "2001::"
452         map_br_pfx_len = 32
453         map_route = VppIpRoute(
454             self,
455             map_br_pfx,
456             map_br_pfx_len,
457             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
458         )
459         map_route.add_vpp_config()
460
461         #
462         # Add a domain that maps from pg0 to pg1
463         #
464         map_dst = "2001::/32"
465         map_src = "3000::1/128"
466         client_pfx = "192.168.0.0/16"
467         map_translated_addr = "2001:0:101:5000:0:c0a8:101:5"
468         tag = "MAP-E TCP tag."
469         self.vapi.map_add_domain(
470             ip4_prefix=client_pfx,
471             ip6_prefix=map_dst,
472             ip6_src=map_src,
473             ea_bits_len=20,
474             psid_offset=4,
475             psid_length=4,
476             tag=tag,
477         )
478
479         # Enable MAP on pg0 interface.
480         self.vapi.map_if_enable_disable(
481             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=0
482         )
483
484         # Enable MAP on pg1 interface.
485         self.vapi.map_if_enable_disable(
486             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=0
487         )
488
489         # TCP MSS clamping
490         mss_clamp = 1300
491         self.vapi.map_param_set_tcp(mss_clamp)
492
493         #
494         # Send a v4 packet that will be encapped.
495         #
496         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
497         p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.1.1")
498         p_tcp = TCP(sport=20000, dport=30000, flags="S", options=[("MSS", 1455)])
499         p4 = p_ether / p_ip4 / p_tcp
500
501         self.pg1.add_stream(p4)
502         self.pg_enable_capture(self.pg_interfaces)
503         self.pg_start()
504
505         rx = self.pg1.get_capture(1)
506         rx = rx[0]
507
508         self.assertTrue(rx.haslayer(IPv6))
509         self.assertEqual(rx[IP].src, p4[IP].src)
510         self.assertEqual(rx[IP].dst, p4[IP].dst)
511         self.assertEqual(rx[IPv6].src, "3000::1")
512         self.assertEqual(rx[TCP].options, TCP(options=[("MSS", mss_clamp)]).options)
513
514     def validate(self, rx, expected):
515         self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))
516
517     def validate_frag6(self, p6_frag, p_ip6_expected):
518         self.assertFalse(p6_frag.haslayer(IP))
519         self.assertTrue(p6_frag.haslayer(IPv6))
520         self.assertTrue(p6_frag.haslayer(IPv6ExtHdrFragment))
521         self.assertEqual(p6_frag[IPv6].src, p_ip6_expected.src)
522         self.assertEqual(p6_frag[IPv6].dst, p_ip6_expected.dst)
523
524     def validate_frag_payload_len6(self, rx, proto, payload_len_expected):
525         payload_total = 0
526         for p in rx:
527             payload_total += p[IPv6].plen
528
529         # First fragment has proto
530         payload_total -= len(proto())
531
532         # Every fragment has IPv6 fragment header
533         payload_total -= len(IPv6ExtHdrFragment()) * len(rx)
534
535         self.assertEqual(payload_total, payload_len_expected)
536
537     def validate_frag4(self, p4_frag, p_ip4_expected):
538         self.assertFalse(p4_frag.haslayer(IPv6))
539         self.assertTrue(p4_frag.haslayer(IP))
540         self.assertTrue(p4_frag[IP].frag != 0 or p4_frag[IP].flags.MF)
541         self.assertEqual(p4_frag[IP].src, p_ip4_expected.src)
542         self.assertEqual(p4_frag[IP].dst, p_ip4_expected.dst)
543
544     def validate_frag_payload_len4(self, rx, proto, payload_len_expected):
545         payload_total = 0
546         for p in rx:
547             payload_total += len(p[IP].payload)
548
549         # First fragment has proto
550         payload_total -= len(proto())
551
552         self.assertEqual(payload_total, payload_len_expected)
553
554     def payload(self, len):
555         return "x" * len
556
557     def test_map_t(self):
558         """MAP-T"""
559
560         #
561         # Add a domain that maps from pg0 to pg1
562         #
563         map_dst = "2001:db8::/32"
564         map_src = "1234:5678:90ab:cdef::/64"
565         ip4_pfx = "192.168.0.0/24"
566         tag = "MAP-T Tag."
567
568         self.vapi.map_add_domain(
569             ip6_prefix=map_dst,
570             ip4_prefix=ip4_pfx,
571             ip6_src=map_src,
572             ea_bits_len=16,
573             psid_offset=6,
574             psid_length=4,
575             mtu=1500,
576             tag=tag,
577         )
578
579         # Enable MAP-T on interfaces.
580         self.vapi.map_if_enable_disable(
581             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=1
582         )
583         self.vapi.map_if_enable_disable(
584             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
585         )
586
587         # Ensure MAP doesn't steal all packets!
588         v4 = (
589             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
590             / IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4)
591             / UDP(sport=20000, dport=10000)
592             / Raw(b"\xa5" * 100)
593         )
594         rx = self.send_and_expect(self.pg0, v4 * 1, self.pg0)
595         v4_reply = v4[1]
596         v4_reply.ttl -= 1
597         for p in rx:
598             self.validate(p[1], v4_reply)
599         # Ensure MAP doesn't steal all packets
600         v6 = (
601             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
602             / IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6)
603             / UDP(sport=20000, dport=10000)
604             / Raw(b"\xa5" * 100)
605         )
606         rx = self.send_and_expect(self.pg1, v6 * 1, self.pg1)
607         v6_reply = v6[1]
608         v6_reply.hlim -= 1
609         for p in rx:
610             self.validate(p[1], v6_reply)
611
612         map_route = VppIpRoute(
613             self,
614             "2001:db8::",
615             32,
616             [
617                 VppRoutePath(
618                     self.pg1.remote_ip6,
619                     self.pg1.sw_if_index,
620                     proto=DpoProto.DPO_PROTO_IP6,
621                 )
622             ],
623         )
624         map_route.add_vpp_config()
625
626         #
627         # Send a v4 packet that will be translated
628         #
629         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
630         p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.0.1")
631         payload = TCP(sport=0xABCD, dport=0xABCD)
632
633         p4 = p_ether / p_ip4 / payload
634         p6_translated = (
635             IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1f0::c0a8:1:f")
636             / payload
637         )
638         p6_translated.hlim -= 1
639         rx = self.send_and_expect(self.pg0, p4 * 1, self.pg1)
640         for p in rx:
641             self.validate(p[1], p6_translated)
642
643         # Send back an IPv6 packet that will be "untranslated"
644         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
645         p_ip6 = IPv6(
646             src="2001:db8:1f0::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
647         )
648         p6 = p_ether6 / p_ip6 / payload
649         p4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4) / payload
650         p4_translated.id = 0
651         p4_translated.ttl -= 1
652         rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
653         for p in rx:
654             self.validate(p[1], p4_translated)
655
656         # IPv4 TTL=0
657         ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst="192.168.0.1", ttl=0)
658         p4 = p_ether / ip4_ttl_expired / payload
659
660         icmp4_reply = (
661             IP(id=0, ttl=254, src=self.pg0.local_ip4, dst=self.pg0.remote_ip4)
662             / ICMP(type="time-exceeded", code="ttl-zero-during-transit")
663             / IP(src=self.pg0.remote_ip4, dst="192.168.0.1", ttl=0)
664             / payload
665         )
666         rx = self.send_and_expect(self.pg0, p4 * 1, self.pg0)
667         for p in rx:
668             self.validate(p[1], icmp4_reply)
669
670         # IPv4 TTL=1
671         ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst="192.168.0.1", ttl=1)
672         p4 = p_ether / ip4_ttl_expired / payload
673
674         icmp4_reply = (
675             IP(id=0, ttl=254, src=self.pg0.local_ip4, dst=self.pg0.remote_ip4)
676             / ICMP(type="time-exceeded", code="ttl-zero-during-transit")
677             / IP(src=self.pg0.remote_ip4, dst="192.168.0.1", ttl=1)
678             / payload
679         )
680         rx = self.send_and_expect(self.pg0, p4 * 1, self.pg0)
681         for p in rx:
682             self.validate(p[1], icmp4_reply)
683
684         # IPv6 Hop limit at BR
685         ip6_hlim_expired = IPv6(
686             hlim=1,
687             src="2001:db8:1ab::c0a8:1:ab",
688             dst="1234:5678:90ab:cdef:ac:1001:200:0",
689         )
690         p6 = p_ether6 / ip6_hlim_expired / payload
691
692         icmp6_reply = (
693             IPv6(hlim=255, src=self.pg1.local_ip6, dst="2001:db8:1ab::c0a8:1:ab")
694             / ICMPv6TimeExceeded(code=0)
695             / IPv6(
696                 src="2001:db8:1ab::c0a8:1:ab",
697                 dst="1234:5678:90ab:cdef:ac:1001:200:0",
698                 hlim=1,
699             )
700             / payload
701         )
702         rx = self.send_and_expect(self.pg1, p6 * 1, self.pg1)
703         for p in rx:
704             self.validate(p[1], icmp6_reply)
705
706         # IPv6 Hop limit beyond BR
707         ip6_hlim_expired = IPv6(
708             hlim=0,
709             src="2001:db8:1ab::c0a8:1:ab",
710             dst="1234:5678:90ab:cdef:ac:1001:200:0",
711         )
712         p6 = p_ether6 / ip6_hlim_expired / payload
713
714         icmp6_reply = (
715             IPv6(hlim=255, src=self.pg1.local_ip6, dst="2001:db8:1ab::c0a8:1:ab")
716             / ICMPv6TimeExceeded(code=0)
717             / IPv6(
718                 src="2001:db8:1ab::c0a8:1:ab",
719                 dst="1234:5678:90ab:cdef:ac:1001:200:0",
720                 hlim=0,
721             )
722             / payload
723         )
724         rx = self.send_and_expect(self.pg1, p6 * 1, self.pg1)
725         for p in rx:
726             self.validate(p[1], icmp6_reply)
727
728         # IPv4 Well-known port
729         p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.0.1")
730         payload = UDP(sport=200, dport=200)
731         p4 = p_ether / p_ip4 / payload
732         self.send_and_assert_no_replies(self.pg0, p4 * 1)
733
734         # IPv6 Well-known port
735         payload = UDP(sport=200, dport=200)
736         p6 = p_ether6 / p_ip6 / payload
737         self.send_and_assert_no_replies(self.pg1, p6 * 1)
738
739         # UDP packet fragmentation
740         payload_len = 1453
741         payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
742         p4 = p_ether / p_ip4 / payload
743         self.pg_enable_capture()
744         self.pg0.add_stream(p4)
745         self.pg_start()
746         rx = self.pg1.get_capture(2)
747
748         p_ip6_translated = IPv6(
749             src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1e0::c0a8:1:e"
750         )
751         for p in rx:
752             self.validate_frag6(p, p_ip6_translated)
753
754         self.validate_frag_payload_len6(rx, UDP, payload_len)
755
756         # UDP packet fragmentation send fragments
757         payload_len = 1453
758         payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
759         p4 = p_ether / p_ip4 / payload
760         frags = fragment_rfc791(p4, fragsize=1000)
761         self.pg_enable_capture()
762         self.pg0.add_stream(frags)
763         self.pg_start()
764         rx = self.pg1.get_capture(2)
765
766         for p in rx:
767             self.validate_frag6(p, p_ip6_translated)
768
769         self.validate_frag_payload_len6(rx, UDP, payload_len)
770
771         # Send back an fragmented IPv6 UDP packet that will be "untranslated"
772         payload = UDP(sport=4000, dport=40000) / self.payload(payload_len)
773         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
774         p_ip6 = IPv6(
775             src="2001:db8:1e0::c0a8:1:e", dst="1234:5678:90ab:cdef:ac:1001:200:0"
776         )
777         p6 = p_ether6 / p_ip6 / payload
778         frags6 = fragment_rfc8200(p6, identification=0xDCBA, fragsize=1000)
779
780         p_ip4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4)
781         p4_translated = p_ip4_translated / payload
782         p4_translated.id = 0
783         p4_translated.ttl -= 1
784
785         self.pg_enable_capture()
786         self.pg1.add_stream(frags6)
787         self.pg_start()
788         rx = self.pg0.get_capture(2)
789
790         for p in rx:
791             self.validate_frag4(p, p4_translated)
792
793         self.validate_frag_payload_len4(rx, UDP, payload_len)
794
795         # ICMP packet fragmentation
796         payload = ICMP(id=6529) / self.payload(payload_len)
797         p4 = p_ether / p_ip4 / payload
798         self.pg_enable_capture()
799         self.pg0.add_stream(p4)
800         self.pg_start()
801         rx = self.pg1.get_capture(2)
802
803         p_ip6_translated = IPv6(
804             src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:160::c0a8:1:6"
805         )
806         for p in rx:
807             self.validate_frag6(p, p_ip6_translated)
808
809         self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len)
810
811         # ICMP packet fragmentation send fragments
812         payload = ICMP(id=6529) / self.payload(payload_len)
813         p4 = p_ether / p_ip4 / payload
814         frags = fragment_rfc791(p4, fragsize=1000)
815         self.pg_enable_capture()
816         self.pg0.add_stream(frags)
817         self.pg_start()
818         rx = self.pg1.get_capture(2)
819
820         for p in rx:
821             self.validate_frag6(p, p_ip6_translated)
822
823         self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len)
824
825         # TCP MSS clamping
826         self.vapi.map_param_set_tcp(1300)
827
828         #
829         # Send a v4 TCP SYN packet that will be translated and MSS clamped
830         #
831         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
832         p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.0.1")
833         payload = TCP(sport=0xABCD, dport=0xABCD, flags="S", options=[("MSS", 1460)])
834
835         p4 = p_ether / p_ip4 / payload
836         p6_translated = (
837             IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1f0::c0a8:1:f")
838             / payload
839         )
840         p6_translated.hlim -= 1
841         p6_translated[TCP].options = [("MSS", 1300)]
842         rx = self.send_and_expect(self.pg0, p4 * 1, self.pg1)
843         for p in rx:
844             self.validate(p[1], p6_translated)
845
846         # Send back an IPv6 packet that will be "untranslated"
847         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
848         p_ip6 = IPv6(
849             src="2001:db8:1f0::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
850         )
851         p6 = p_ether6 / p_ip6 / payload
852         p4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4) / payload
853         p4_translated.id = 0
854         p4_translated.ttl -= 1
855         p4_translated[TCP].options = [("MSS", 1300)]
856         rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
857         for p in rx:
858             self.validate(p[1], p4_translated)
859
860         # TCP MSS clamping cleanup
861         self.vapi.map_param_set_tcp(0)
862
863         # Enable icmp6 param to get back ICMPv6 unreachable messages in case
864         # of security check fails
865         self.vapi.map_param_set_icmp6(enable_unreachable=1)
866
867         # Send back an IPv6 packet that will be droppped due to security
868         # check fail
869         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
870         p_ip6_sec_check_fail = IPv6(
871             src="2001:db8:1fe::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
872         )
873         payload = TCP(sport=0xABCD, dport=0xABCD)
874         p6 = p_ether6 / p_ip6_sec_check_fail / payload
875
876         self.pg_send(self.pg1, p6 * 1)
877         self.pg0.get_capture(0, timeout=1)
878         rx = self.pg1.get_capture(1)
879
880         icmp6_reply = (
881             IPv6(hlim=255, src=self.pg1.local_ip6, dst="2001:db8:1fe::c0a8:1:f")
882             / ICMPv6DestUnreach(code=5)
883             / p_ip6_sec_check_fail
884             / payload
885         )
886
887         for p in rx:
888             self.validate(p[1], icmp6_reply)
889
890         # ICMPv6 unreachable messages cleanup
891         self.vapi.map_param_set_icmp6(enable_unreachable=0)
892
893     def test_map_t_ip6_psid(self):
894         """MAP-T v6->v4 PSID validation"""
895
896         #
897         # Add a domain that maps from pg0 to pg1
898         #
899         map_dst = "2001:db8::/32"
900         map_src = "1234:5678:90ab:cdef::/64"
901         ip4_pfx = "192.168.0.0/24"
902         tag = "MAP-T Test Domain"
903
904         self.vapi.map_add_domain(
905             ip6_prefix=map_dst,
906             ip4_prefix=ip4_pfx,
907             ip6_src=map_src,
908             ea_bits_len=16,
909             psid_offset=6,
910             psid_length=4,
911             mtu=1500,
912             tag=tag,
913         )
914
915         # Enable MAP-T on interfaces.
916         self.vapi.map_if_enable_disable(
917             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=1
918         )
919         self.vapi.map_if_enable_disable(
920             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
921         )
922
923         map_route = VppIpRoute(
924             self,
925             "2001:db8::",
926             32,
927             [
928                 VppRoutePath(
929                     self.pg1.remote_ip6,
930                     self.pg1.sw_if_index,
931                     proto=DpoProto.DPO_PROTO_IP6,
932                 )
933             ],
934         )
935         map_route.add_vpp_config()
936
937         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
938         p_ip6 = IPv6(
939             src="2001:db8:1f0::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
940         )
941
942         # Send good IPv6 source port, ensure translated IPv4 received
943         payload = TCP(sport=0xABCD, dport=80)
944         p6 = p_ether6 / p_ip6 / payload
945         p4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4) / payload
946         p4_translated.id = 0
947         p4_translated.ttl -= 1
948         rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
949         for p in rx:
950             self.validate(p[1], p4_translated)
951
952         # Send bad IPv6 source port, ensure translated IPv4 not received
953         payload = TCP(sport=0xDCBA, dport=80)
954         p6 = p_ether6 / p_ip6 / payload
955         self.send_and_assert_no_replies(self.pg1, p6 * 1)
956
957     def test_map_t_pre_resolve(self):
958         """MAP-T pre-resolve"""
959
960         # Add a domain that maps from pg0 to pg1
961         map_dst = "2001:db8::/32"
962         map_src = "1234:5678:90ab:cdef::/64"
963         ip4_pfx = "192.168.0.0/24"
964         tag = "MAP-T Test Domain."
965
966         self.vapi.map_add_domain(
967             ip6_prefix=map_dst,
968             ip4_prefix=ip4_pfx,
969             ip6_src=map_src,
970             ea_bits_len=16,
971             psid_offset=6,
972             psid_length=4,
973             mtu=1500,
974             tag=tag,
975         )
976
977         # Enable MAP-T on interfaces.
978         self.vapi.map_if_enable_disable(
979             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=1
980         )
981         self.vapi.map_if_enable_disable(
982             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
983         )
984
985         # Enable pre-resolve option
986         self.vapi.map_param_add_del_pre_resolve(
987             ip4_nh_address="10.1.2.3", ip6_nh_address="4001::1", is_add=1
988         )
989
990         # Add a route to 4001::1 and expect the translated traffic to be
991         # sent via that route next-hop.
992         pre_res_route6 = VppIpRoute(
993             self,
994             "4001::1",
995             128,
996             [VppRoutePath(self.pg1.remote_hosts[2].ip6, self.pg1.sw_if_index)],
997         )
998         pre_res_route6.add_vpp_config()
999
1000         # Add a route to 10.1.2.3 and expect the "untranslated" traffic to be
1001         # sent via that route next-hop.
1002         pre_res_route4 = VppIpRoute(
1003             self,
1004             "10.1.2.3",
1005             32,
1006             [VppRoutePath(self.pg0.remote_hosts[1].ip4, self.pg0.sw_if_index)],
1007         )
1008         pre_res_route4.add_vpp_config()
1009
1010         # Send an IPv4 packet that will be translated
1011         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1012         p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.0.1")
1013         payload = TCP(sport=0xABCD, dport=0xABCD)
1014         p4 = p_ether / p_ip4 / payload
1015
1016         p6_translated = (
1017             IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1f0::c0a8:1:f")
1018             / payload
1019         )
1020         p6_translated.hlim -= 1
1021
1022         rx = self.send_and_expect(self.pg0, p4 * 1, self.pg1)
1023         for p in rx:
1024             self.assertEqual(p[Ether].dst, self.pg1.remote_hosts[2].mac)
1025             self.validate(p[1], p6_translated)
1026
1027         # Send back an IPv6 packet that will be "untranslated"
1028         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1029         p_ip6 = IPv6(
1030             src="2001:db8:1f0::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
1031         )
1032         p6 = p_ether6 / p_ip6 / payload
1033
1034         p4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4) / payload
1035         p4_translated.id = 0
1036         p4_translated.ttl -= 1
1037
1038         rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
1039         for p in rx:
1040             self.assertEqual(p[Ether].dst, self.pg0.remote_hosts[1].mac)
1041             self.validate(p[1], p4_translated)
1042
1043         # Cleanup pre-resolve option
1044         self.vapi.map_param_add_del_pre_resolve(
1045             ip4_nh_address="10.1.2.3", ip6_nh_address="4001::1", is_add=0
1046         )
1047
1048
1049 if __name__ == "__main__":
1050     unittest.main(testRunner=VppTestRunner)