16e9607cedcd6cfb65669ceae055b596ccac6a04
[vpp.git] / test / test_map.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
10
11 import scapy.compat
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP
15 from scapy.layers.inet6 import (
16     IPv6,
17     ICMPv6TimeExceeded,
18     IPv6ExtHdrFragment,
19     ICMPv6EchoRequest,
20     ICMPv6DestUnreach,
21 )
22
23
24 class TestMAP(VppTestCase):
25     """MAP Test Case"""
26
27     @classmethod
28     def setUpClass(cls):
29         super(TestMAP, cls).setUpClass()
30
31     @classmethod
32     def tearDownClass(cls):
33         super(TestMAP, cls).tearDownClass()
34
    def setUp(self):
        """Create the pg interfaces and bring up pg0 (IPv4) and pg1 (IPv6).

        Four packet-generator interfaces are created, but only pg0 and pg1
        are brought up and addressed here.
        """
        super(TestMAP, self).setUp()

        # create 4 pg interfaces (only pg0 and pg1 are configured below)
        self.create_pg_interfaces(range(4))

        # pg0 is 'inside' IPv4
        self.pg0.admin_up()
        self.pg0.config_ip4()
        self.pg0.resolve_arp()
        self.pg0.generate_remote_hosts(2)
        self.pg0.configure_ipv4_neighbors()

        # pg1 is 'outside' IPv6
        self.pg1.admin_up()
        self.pg1.config_ip6()
        # 4 remote hosts: test_map_e_udp uses remote_hosts[2] and
        # remote_hosts[3] as alternate next-hops.
        self.pg1.generate_remote_hosts(4)
        self.pg1.configure_ipv6_neighbors()
53
    def tearDown(self):
        """Run the base-class teardown, then unconfigure every pg interface."""
        super(TestMAP, self).tearDown()
        # All created interfaces are walked, not just the pg0/pg1 pair that
        # setUp() configured.
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
60
61     def send_and_assert_encapped(self, packets, ip6_src, ip6_dst, dmac=None):
62         if not dmac:
63             dmac = self.pg1.remote_mac
64
65         self.pg0.add_stream(packets)
66
67         self.pg_enable_capture(self.pg_interfaces)
68         self.pg_start()
69
70         capture = self.pg1.get_capture(len(packets))
71         for rx, tx in zip(capture, packets):
72             self.assertEqual(rx[Ether].dst, dmac)
73             self.assertEqual(rx[IP].src, tx[IP].src)
74             self.assertEqual(rx[IPv6].src, ip6_src)
75             self.assertEqual(rx[IPv6].dst, ip6_dst)
76
77     def send_and_assert_encapped_one(self, packet, ip6_src, ip6_dst, dmac=None):
78         return self.send_and_assert_encapped([packet], ip6_src, ip6_dst, dmac)
79
80     def test_api_map_domain_dump(self):
81         map_dst = "2001::/64"
82         map_src = "3000::1/128"
83         client_pfx = "192.168.0.0/16"
84         tag = "MAP-E tag."
85         index = self.vapi.map_add_domain(
86             ip4_prefix=client_pfx, ip6_prefix=map_dst, ip6_src=map_src, tag=tag
87         ).index
88         rv = self.vapi.map_domain_dump()
89
90         # restore the state early so as to not impact subsequent tests.
91         # If an assert fails, we will not get the chance to do it at the end.
92         self.vapi.map_del_domain(index=index)
93
94         self.assertGreater(len(rv), 0, "Expected output from 'map_domain_dump'")
95
96         # typedefs are returned as ipaddress objects.
97         # wrap results in str() ugh! to avoid the need to call unicode.
98         self.assertEqual(str(rv[0].ip4_prefix), client_pfx)
99         self.assertEqual(str(rv[0].ip6_prefix), map_dst)
100         self.assertEqual(str(rv[0].ip6_src), map_src)
101
102         self.assertEqual(rv[0].tag, tag, "output produced incorrect tag value.")
103
104     def create_domains(self, ip4_pfx_str, ip6_pfx_str, ip6_src_str):
105         ip4_pfx = ipaddress.ip_network(ip4_pfx_str)
106         ip6_dst = ipaddress.ip_network(ip6_pfx_str)
107         mod = ip4_pfx.num_addresses / 1024
108         indicies = []
109         for i in range(ip4_pfx.num_addresses):
110             rv = self.vapi.map_add_domain(
111                 ip6_prefix=ip6_pfx_str,
112                 ip4_prefix=str(ip4_pfx[i]) + "/32",
113                 ip6_src=ip6_src_str,
114             )
115             indicies.append(rv.index)
116         return indicies
117
118     def test_api_map_domains_get(self):
119         # Create a bunch of domains
120         no_domains = 4096  # This must be large enough to ensure VPP suspends
121         domains = self.create_domains("130.67.0.0/20", "2001::/32", "2001::1/128")
122         self.assertEqual(len(domains), no_domains)
123
124         d = []
125         cursor = 0
126
127         # Invalid cursor
128         rv, details = self.vapi.map_domains_get(cursor=no_domains + 10)
129         self.assertEqual(rv.retval, -7)
130
131         # Delete a domain in the middle of walk
132         rv, details = self.vapi.map_domains_get(cursor=0)
133         self.assertEqual(rv.retval, -165)
134         self.vapi.map_del_domain(index=rv.cursor)
135         domains.remove(rv.cursor)
136
137         # Continue at point of deleted cursor
138         rv, details = self.vapi.map_domains_get(cursor=rv.cursor)
139         self.assertIn(rv.retval, [0, -165])
140
141         d = list(self.vapi.vpp.details_iter(self.vapi.map_domains_get))
142         self.assertEqual(len(d), no_domains - 1)
143
144         # Clean up
145         for i in domains:
146             self.vapi.map_del_domain(index=i)
147
148     def test_map_e_udp(self):
149         """MAP-E UDP"""
150
151         #
152         # Add a route to the MAP-BR
153         #
154         map_br_pfx = "2001::"
155         map_br_pfx_len = 32
156         map_route = VppIpRoute(
157             self,
158             map_br_pfx,
159             map_br_pfx_len,
160             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
161         )
162         map_route.add_vpp_config()
163
164         #
165         # Add a domain that maps from pg0 to pg1
166         #
167         map_dst = "2001::/32"
168         map_src = "3000::1/128"
169         client_pfx = "192.168.0.0/16"
170         map_translated_addr = "2001:0:101:7000:0:c0a8:101:7"
171         tag = "MAP-E tag."
172         self.vapi.map_add_domain(
173             ip4_prefix=client_pfx,
174             ip6_prefix=map_dst,
175             ip6_src=map_src,
176             ea_bits_len=20,
177             psid_offset=4,
178             psid_length=4,
179             tag=tag,
180         )
181
182         self.vapi.map_param_set_security_check(enable=1, fragments=1)
183
184         # Enable MAP on interface.
185         self.vapi.map_if_enable_disable(
186             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=0
187         )
188
189         # Ensure MAP doesn't steal all packets!
190         v4 = (
191             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
192             / IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4)
193             / UDP(sport=20000, dport=10000)
194             / Raw(b"\xa5" * 100)
195         )
196         rx = self.send_and_expect(self.pg0, v4 * 4, self.pg0)
197         v4_reply = v4[1]
198         v4_reply.ttl -= 1
199         for p in rx:
200             self.validate(p[1], v4_reply)
201
202         #
203         # Fire in a v4 packet that will be encapped to the BR
204         #
205         v4 = (
206             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
207             / IP(src=self.pg0.remote_ip4, dst="192.168.1.1")
208             / UDP(sport=20000, dport=10000)
209             / Raw(b"\xa5" * 100)
210         )
211
212         self.send_and_assert_encapped(v4 * 4, "3000::1", map_translated_addr)
213
214         #
215         # Verify reordered fragments are able to pass as well
216         #
217         v4 = (
218             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
219             / IP(id=1, src=self.pg0.remote_ip4, dst="192.168.1.1")
220             / UDP(sport=20000, dport=10000)
221             / Raw(b"\xa5" * 1000)
222         )
223
224         frags = fragment_rfc791(v4, 400)
225         frags.reverse()
226
227         self.send_and_assert_encapped(frags, "3000::1", map_translated_addr)
228
229         # Enable MAP on interface.
230         self.vapi.map_if_enable_disable(
231             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=0
232         )
233
234         # Ensure MAP doesn't steal all packets
235         v6 = (
236             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
237             / IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6)
238             / UDP(sport=20000, dport=10000)
239             / Raw(b"\xa5" * 100)
240         )
241         rx = self.send_and_expect(self.pg1, v6 * 1, self.pg1)
242         v6_reply = v6[1]
243         v6_reply.hlim -= 1
244         for p in rx:
245             self.validate(p[1], v6_reply)
246
247         #
248         # Fire in a V6 encapped packet.
249         # expect a decapped packet on the inside ip4 link
250         #
251         p = (
252             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
253             / IPv6(dst="3000::1", src=map_translated_addr)
254             / IP(dst=self.pg0.remote_ip4, src="192.168.1.1")
255             / UDP(sport=10000, dport=20000)
256             / Raw(b"\xa5" * 100)
257         )
258
259         self.pg1.add_stream(p)
260
261         self.pg_enable_capture(self.pg_interfaces)
262         self.pg_start()
263
264         rx = self.pg0.get_capture(1)
265         rx = rx[0]
266
267         self.assertFalse(rx.haslayer(IPv6))
268         self.assertEqual(rx[IP].src, p[IP].src)
269         self.assertEqual(rx[IP].dst, p[IP].dst)
270
271         #
272         # Verify encapped reordered fragments pass as well
273         #
274         p = (
275             IP(id=1, dst=self.pg0.remote_ip4, src="192.168.1.1")
276             / UDP(sport=10000, dport=20000)
277             / Raw(b"\xa5" * 1500)
278         )
279         frags = fragment_rfc791(p, 400)
280         frags.reverse()
281
282         stream = (
283             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
284             / IPv6(dst="3000::1", src=map_translated_addr)
285             / x
286             for x in frags
287         )
288
289         self.pg1.add_stream(stream)
290
291         self.pg_enable_capture(self.pg_interfaces)
292         self.pg_start()
293
294         rx = self.pg0.get_capture(len(frags))
295
296         for r in rx:
297             self.assertFalse(r.haslayer(IPv6))
298             self.assertEqual(r[IP].src, p[IP].src)
299             self.assertEqual(r[IP].dst, p[IP].dst)
300
301         # Verify that fragments pass even if ipv6 layer is fragmented
302         stream = (IPv6(dst="3000::1", src=map_translated_addr) / x for x in frags)
303
304         v6_stream = [
305             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / x
306             for i in range(len(frags))
307             for x in fragment_rfc8200(
308                 IPv6(dst="3000::1", src=map_translated_addr) / frags[i], i, 200
309             )
310         ]
311
312         self.pg1.add_stream(v6_stream)
313
314         self.pg_enable_capture(self.pg_interfaces)
315         self.pg_start()
316
317         rx = self.pg0.get_capture(len(frags))
318
319         for r in rx:
320             self.assertFalse(r.haslayer(IPv6))
321             self.assertEqual(r[IP].src, p[IP].src)
322             self.assertEqual(r[IP].dst, p[IP].dst)
323
324         #
325         # Pre-resolve. No API for this!!
326         #
327         self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")
328
329         self.send_and_assert_no_replies(self.pg0, v4, "resolved via default route")
330
331         #
332         # Add a route to 4001::1. Expect the encapped traffic to be
333         # sent via that routes next-hop
334         #
335         pre_res_route = VppIpRoute(
336             self,
337             "4001::1",
338             128,
339             [VppRoutePath(self.pg1.remote_hosts[2].ip6, self.pg1.sw_if_index)],
340         )
341         pre_res_route.add_vpp_config()
342
343         self.send_and_assert_encapped_one(
344             v4, "3000::1", map_translated_addr, dmac=self.pg1.remote_hosts[2].mac
345         )
346
347         #
348         # change the route to the pre-solved next-hop
349         #
350         pre_res_route.modify(
351             [VppRoutePath(self.pg1.remote_hosts[3].ip6, self.pg1.sw_if_index)]
352         )
353         pre_res_route.add_vpp_config()
354
355         self.send_and_assert_encapped_one(
356             v4, "3000::1", map_translated_addr, dmac=self.pg1.remote_hosts[3].mac
357         )
358
359         #
360         # cleanup. The test infra's object registry will ensure
361         # the route is really gone and thus that the unresolve worked.
362         #
363         pre_res_route.remove_vpp_config()
364         self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
365
    def test_map_e_inner_frag(self):
        """MAP-E Inner fragmentation

        Configures a MAP-E domain with an MTU of 1000 and inner fragmentation
        enabled, sends one IPv4 packet with a 1300-byte payload on pg0 and
        checks that two encapsulated packets leave pg1, each carrying one
        fragment of the original IPv4 packet.
        """

        #
        # Add a route to the MAP-BR
        #
        map_br_pfx = "2001::"
        map_br_pfx_len = 32
        map_route = VppIpRoute(
            self,
            map_br_pfx,
            map_br_pfx_len,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
        )
        map_route.add_vpp_config()

        #
        # Add a domain that maps from pg0 to pg1
        #
        map_dst = "2001::/32"
        map_src = "3000::1/128"
        client_pfx = "192.168.0.0/16"
        map_translated_addr = "2001:0:101:7000:0:c0a8:101:7"
        tag = "MAP-E tag."
        self.vapi.map_add_domain(
            ip4_prefix=client_pfx,
            ip6_prefix=map_dst,
            ip6_src=map_src,
            ea_bits_len=20,
            psid_offset=4,
            psid_length=4,
            mtu=1000,
            tag=tag,
        )

        # Enable MAP on interface.
        self.vapi.map_if_enable_disable(
            is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=0
        )

        # Enable inner fragmentation
        self.vapi.map_param_set_fragmentation(inner=1)

        v4 = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst="192.168.1.1")
            / UDP(sport=20000, dport=10000)
            / Raw(b"\xa5" * 1300)
        )

        # One packet in on pg0, two packets expected out on pg1.
        self.pg_send(self.pg0, v4 * 1)
        rx = self.pg1.get_capture(2)

        # Build the two expected inner fragments from the sent IPv4 packet.
        # 1000-sizeof(ip6_header_t) = 960.
        frags = fragment_rfc791(v4[1], 960)
        # The IPv4 id and checksum are zeroed on both the expected and the
        # received packets (see below), which takes them out of the comparison.
        frags[0].id = 0
        frags[1].id = 0
        frags[0].ttl -= 1
        frags[1].ttl -= 1
        frags[0].chksum = 0
        frags[1].chksum = 0

        v6_reply1 = IPv6(src="3000::1", dst=map_translated_addr, hlim=63) / frags[0]
        v6_reply2 = IPv6(src="3000::1", dst=map_translated_addr, hlim=63) / frags[1]
        # Normalise the received packets the same way; the IPv6 flow label is
        # cleared as well so it does not take part in the comparison.
        rx[0][1].fl = 0
        rx[1][1].fl = 0
        rx[0][1][IP].id = 0
        rx[1][1][IP].id = 0
        rx[0][1][IP].chksum = 0
        rx[1][1][IP].chksum = 0

        self.validate(rx[0][1], v6_reply1)
        self.validate(rx[1][1], v6_reply2)
439
440     def test_map_e_tcp_mss(self):
441         """MAP-E TCP MSS"""
442
443         #
444         # Add a route to the MAP-BR
445         #
446         map_br_pfx = "2001::"
447         map_br_pfx_len = 32
448         map_route = VppIpRoute(
449             self,
450             map_br_pfx,
451             map_br_pfx_len,
452             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
453         )
454         map_route.add_vpp_config()
455
456         #
457         # Add a domain that maps from pg0 to pg1
458         #
459         map_dst = "2001::/32"
460         map_src = "3000::1/128"
461         client_pfx = "192.168.0.0/16"
462         map_translated_addr = "2001:0:101:5000:0:c0a8:101:5"
463         tag = "MAP-E TCP tag."
464         self.vapi.map_add_domain(
465             ip4_prefix=client_pfx,
466             ip6_prefix=map_dst,
467             ip6_src=map_src,
468             ea_bits_len=20,
469             psid_offset=4,
470             psid_length=4,
471             tag=tag,
472         )
473
474         # Enable MAP on pg0 interface.
475         self.vapi.map_if_enable_disable(
476             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=0
477         )
478
479         # Enable MAP on pg1 interface.
480         self.vapi.map_if_enable_disable(
481             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=0
482         )
483
484         # TCP MSS clamping
485         mss_clamp = 1300
486         self.vapi.map_param_set_tcp(mss_clamp)
487
488         #
489         # Send a v4 packet that will be encapped.
490         #
491         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
492         p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.1.1")
493         p_tcp = TCP(sport=20000, dport=30000, flags="S", options=[("MSS", 1455)])
494         p4 = p_ether / p_ip4 / p_tcp
495
496         self.pg1.add_stream(p4)
497         self.pg_enable_capture(self.pg_interfaces)
498         self.pg_start()
499
500         rx = self.pg1.get_capture(1)
501         rx = rx[0]
502
503         self.assertTrue(rx.haslayer(IPv6))
504         self.assertEqual(rx[IP].src, p4[IP].src)
505         self.assertEqual(rx[IP].dst, p4[IP].dst)
506         self.assertEqual(rx[IPv6].src, "3000::1")
507         self.assertEqual(rx[TCP].options, TCP(options=[("MSS", mss_clamp)]).options)
508
509     def validate(self, rx, expected):
510         self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))
511
512     def validate_frag6(self, p6_frag, p_ip6_expected):
513         self.assertFalse(p6_frag.haslayer(IP))
514         self.assertTrue(p6_frag.haslayer(IPv6))
515         self.assertTrue(p6_frag.haslayer(IPv6ExtHdrFragment))
516         self.assertEqual(p6_frag[IPv6].src, p_ip6_expected.src)
517         self.assertEqual(p6_frag[IPv6].dst, p_ip6_expected.dst)
518
519     def validate_frag_payload_len6(self, rx, proto, payload_len_expected):
520         payload_total = 0
521         for p in rx:
522             payload_total += p[IPv6].plen
523
524         # First fragment has proto
525         payload_total -= len(proto())
526
527         # Every fragment has IPv6 fragment header
528         payload_total -= len(IPv6ExtHdrFragment()) * len(rx)
529
530         self.assertEqual(payload_total, payload_len_expected)
531
532     def validate_frag4(self, p4_frag, p_ip4_expected):
533         self.assertFalse(p4_frag.haslayer(IPv6))
534         self.assertTrue(p4_frag.haslayer(IP))
535         self.assertTrue(p4_frag[IP].frag != 0 or p4_frag[IP].flags.MF)
536         self.assertEqual(p4_frag[IP].src, p_ip4_expected.src)
537         self.assertEqual(p4_frag[IP].dst, p_ip4_expected.dst)
538
539     def validate_frag_payload_len4(self, rx, proto, payload_len_expected):
540         payload_total = 0
541         for p in rx:
542             payload_total += len(p[IP].payload)
543
544         # First fragment has proto
545         payload_total -= len(proto())
546
547         self.assertEqual(payload_total, payload_len_expected)
548
549     def payload(self, len):
550         return "x" * len
551
    def test_map_t(self):
        """MAP-T

        Configures a MAP-T domain with translation enabled on pg0 and pg1 and
        checks, in order: that non-MAP traffic is still forwarded, v4 -> v6 and
        v6 -> v4 translation of TCP, TTL / hop-limit expiry replies, dropping
        of well-known ports, fragmentation of UDP and ICMP in both directions,
        TCP MSS clamping, and the ICMPv6 unreachable sent when the security
        check fails.
        """

        #
        # Add a domain that maps from pg0 to pg1
        #
        map_dst = "2001:db8::/32"
        map_src = "1234:5678:90ab:cdef::/64"
        ip4_pfx = "192.168.0.0/24"
        tag = "MAP-T Tag."

        self.vapi.map_add_domain(
            ip6_prefix=map_dst,
            ip4_prefix=ip4_pfx,
            ip6_src=map_src,
            ea_bits_len=16,
            psid_offset=6,
            psid_length=4,
            mtu=1500,
            tag=tag,
        )

        # Enable MAP-T on interfaces.
        self.vapi.map_if_enable_disable(
            is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=1
        )
        self.vapi.map_if_enable_disable(
            is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
        )

        # Ensure MAP doesn't steal all packets!
        v4 = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4)
            / UDP(sport=20000, dport=10000)
            / Raw(b"\xa5" * 100)
        )
        rx = self.send_and_expect(self.pg0, v4 * 1, self.pg0)
        v4_reply = v4[1]
        v4_reply.ttl -= 1
        for p in rx:
            self.validate(p[1], v4_reply)
        # Ensure MAP doesn't steal all packets
        v6 = (
            Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
            / IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6)
            / UDP(sport=20000, dport=10000)
            / Raw(b"\xa5" * 100)
        )
        rx = self.send_and_expect(self.pg1, v6 * 1, self.pg1)
        v6_reply = v6[1]
        v6_reply.hlim -= 1
        for p in rx:
            self.validate(p[1], v6_reply)

        # Route the MAP IPv6 prefix out of pg1.
        map_route = VppIpRoute(
            self,
            "2001:db8::",
            32,
            [
                VppRoutePath(
                    self.pg1.remote_ip6,
                    self.pg1.sw_if_index,
                    proto=DpoProto.DPO_PROTO_IP6,
                )
            ],
        )
        map_route.add_vpp_config()

        #
        # Send a v4 packet that will be translated
        #
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.0.1")
        payload = TCP(sport=0xABCD, dport=0xABCD)

        p4 = p_ether / p_ip4 / payload
        p6_translated = (
            IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1f0::c0a8:1:f")
            / payload
        )
        p6_translated.hlim -= 1
        rx = self.send_and_expect(self.pg0, p4 * 1, self.pg1)
        for p in rx:
            self.validate(p[1], p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(
            src="2001:db8:1f0::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
        )
        p6 = p_ether6 / p_ip6 / payload
        p4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4) / payload
        p4_translated.id = 0
        p4_translated.ttl -= 1
        rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
        for p in rx:
            self.validate(p[1], p4_translated)

        # IPv4 TTL=0
        ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst="192.168.0.1", ttl=0)
        p4 = p_ether / ip4_ttl_expired / payload

        # Expect an ICMP time-exceeded from pg0's local address that quotes
        # the offending packet.
        icmp4_reply = (
            IP(id=0, ttl=254, src=self.pg0.local_ip4, dst=self.pg0.remote_ip4)
            / ICMP(type="time-exceeded", code="ttl-zero-during-transit")
            / IP(src=self.pg0.remote_ip4, dst="192.168.0.1", ttl=0)
            / payload
        )
        rx = self.send_and_expect(self.pg0, p4 * 1, self.pg0)
        for p in rx:
            self.validate(p[1], icmp4_reply)

        # IPv4 TTL=1
        ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst="192.168.0.1", ttl=1)
        p4 = p_ether / ip4_ttl_expired / payload

        icmp4_reply = (
            IP(id=0, ttl=254, src=self.pg0.local_ip4, dst=self.pg0.remote_ip4)
            / ICMP(type="time-exceeded", code="ttl-zero-during-transit")
            / IP(src=self.pg0.remote_ip4, dst="192.168.0.1", ttl=1)
            / payload
        )
        rx = self.send_and_expect(self.pg0, p4 * 1, self.pg0)
        for p in rx:
            self.validate(p[1], icmp4_reply)

        # IPv6 Hop limit at BR
        ip6_hlim_expired = IPv6(
            hlim=1,
            src="2001:db8:1ab::c0a8:1:ab",
            dst="1234:5678:90ab:cdef:ac:1001:200:0",
        )
        p6 = p_ether6 / ip6_hlim_expired / payload

        # Expect an ICMPv6 time-exceeded from pg1's local address that quotes
        # the offending packet.
        icmp6_reply = (
            IPv6(hlim=255, src=self.pg1.local_ip6, dst="2001:db8:1ab::c0a8:1:ab")
            / ICMPv6TimeExceeded(code=0)
            / IPv6(
                src="2001:db8:1ab::c0a8:1:ab",
                dst="1234:5678:90ab:cdef:ac:1001:200:0",
                hlim=1,
            )
            / payload
        )
        rx = self.send_and_expect(self.pg1, p6 * 1, self.pg1)
        for p in rx:
            self.validate(p[1], icmp6_reply)

        # IPv6 Hop limit beyond BR
        ip6_hlim_expired = IPv6(
            hlim=0,
            src="2001:db8:1ab::c0a8:1:ab",
            dst="1234:5678:90ab:cdef:ac:1001:200:0",
        )
        p6 = p_ether6 / ip6_hlim_expired / payload

        icmp6_reply = (
            IPv6(hlim=255, src=self.pg1.local_ip6, dst="2001:db8:1ab::c0a8:1:ab")
            / ICMPv6TimeExceeded(code=0)
            / IPv6(
                src="2001:db8:1ab::c0a8:1:ab",
                dst="1234:5678:90ab:cdef:ac:1001:200:0",
                hlim=0,
            )
            / payload
        )
        rx = self.send_and_expect(self.pg1, p6 * 1, self.pg1)
        for p in rx:
            self.validate(p[1], icmp6_reply)

        # IPv4 Well-known port
        p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.0.1")
        payload = UDP(sport=200, dport=200)
        p4 = p_ether / p_ip4 / payload
        self.send_and_assert_no_replies(self.pg0, p4 * 1)

        # IPv6 Well-known port
        payload = UDP(sport=200, dport=200)
        p6 = p_ether6 / p_ip6 / payload
        self.send_and_assert_no_replies(self.pg1, p6 * 1)

        # UDP packet fragmentation
        # A single IPv4 packet goes in on pg0; two IPv6 fragments are expected
        # out on pg1.
        payload_len = 1453
        payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
        p4 = p_ether / p_ip4 / payload
        self.pg_enable_capture()
        self.pg0.add_stream(p4)
        self.pg_start()
        rx = self.pg1.get_capture(2)

        p_ip6_translated = IPv6(
            src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1e0::c0a8:1:e"
        )
        for p in rx:
            self.validate_frag6(p, p_ip6_translated)

        self.validate_frag_payload_len6(rx, UDP, payload_len)

        # UDP packet fragmentation send fragments
        # Same packet, but already fragmented (IPv4) before it is sent.
        payload_len = 1453
        payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
        p4 = p_ether / p_ip4 / payload
        frags = fragment_rfc791(p4, fragsize=1000)
        self.pg_enable_capture()
        self.pg0.add_stream(frags)
        self.pg_start()
        rx = self.pg1.get_capture(2)

        for p in rx:
            self.validate_frag6(p, p_ip6_translated)

        self.validate_frag_payload_len6(rx, UDP, payload_len)

        # Send back a fragmented IPv6 UDP packet that will be "untranslated"
        payload = UDP(sport=4000, dport=40000) / self.payload(payload_len)
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(
            src="2001:db8:1e0::c0a8:1:e", dst="1234:5678:90ab:cdef:ac:1001:200:0"
        )
        p6 = p_ether6 / p_ip6 / payload
        frags6 = fragment_rfc8200(p6, identification=0xDCBA, fragsize=1000)

        p_ip4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4)
        p4_translated = p_ip4_translated / payload
        p4_translated.id = 0
        p4_translated.ttl -= 1

        self.pg_enable_capture()
        self.pg1.add_stream(frags6)
        self.pg_start()
        rx = self.pg0.get_capture(2)

        # validate_frag4 only reads .src and .dst from the expected packet.
        for p in rx:
            self.validate_frag4(p, p4_translated)

        self.validate_frag_payload_len4(rx, UDP, payload_len)

        # ICMP packet fragmentation
        payload = ICMP(id=6529) / self.payload(payload_len)
        p4 = p_ether / p_ip4 / payload
        self.pg_enable_capture()
        self.pg0.add_stream(p4)
        self.pg_start()
        rx = self.pg1.get_capture(2)

        p_ip6_translated = IPv6(
            src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:160::c0a8:1:6"
        )
        for p in rx:
            self.validate_frag6(p, p_ip6_translated)

        # The header length subtracted from the payload total is that of an
        # ICMPv6 echo request.
        self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len)

        # ICMP packet fragmentation send fragments
        payload = ICMP(id=6529) / self.payload(payload_len)
        p4 = p_ether / p_ip4 / payload
        frags = fragment_rfc791(p4, fragsize=1000)
        self.pg_enable_capture()
        self.pg0.add_stream(frags)
        self.pg_start()
        rx = self.pg1.get_capture(2)

        for p in rx:
            self.validate_frag6(p, p_ip6_translated)

        self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len)

        # TCP MSS clamping
        self.vapi.map_param_set_tcp(1300)

        #
        # Send a v4 TCP SYN packet that will be translated and MSS clamped
        #
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.0.1")
        payload = TCP(sport=0xABCD, dport=0xABCD, flags="S", options=[("MSS", 1460)])

        p4 = p_ether / p_ip4 / payload
        p6_translated = (
            IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1f0::c0a8:1:f")
            / payload
        )
        p6_translated.hlim -= 1
        # The sent MSS of 1460 is expected to come out clamped to 1300.
        p6_translated[TCP].options = [("MSS", 1300)]
        rx = self.send_and_expect(self.pg0, p4 * 1, self.pg1)
        for p in rx:
            self.validate(p[1], p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(
            src="2001:db8:1f0::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
        )
        p6 = p_ether6 / p_ip6 / payload
        p4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4) / payload
        p4_translated.id = 0
        p4_translated.ttl -= 1
        p4_translated[TCP].options = [("MSS", 1300)]
        rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
        for p in rx:
            self.validate(p[1], p4_translated)

        # TCP MSS clamping cleanup
        self.vapi.map_param_set_tcp(0)

        # Enable icmp6 param to get back ICMPv6 unreachable messages in case
        # of security check fails
        self.vapi.map_param_set_icmp6(enable_unreachable=1)

        # Send back an IPv6 packet that will be dropped due to security
        # check fail
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6_sec_check_fail = IPv6(
            src="2001:db8:1fe::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
        )
        payload = TCP(sport=0xABCD, dport=0xABCD)
        p6 = p_ether6 / p_ip6_sec_check_fail / payload

        # Nothing may be forwarded to pg0; exactly one packet (the ICMPv6
        # error) is expected back on pg1.
        self.pg_send(self.pg1, p6 * 1)
        self.pg0.get_capture(0, timeout=1)
        rx = self.pg1.get_capture(1)

        icmp6_reply = (
            IPv6(hlim=255, src=self.pg1.local_ip6, dst="2001:db8:1fe::c0a8:1:f")
            / ICMPv6DestUnreach(code=5)
            / p_ip6_sec_check_fail
            / payload
        )

        for p in rx:
            self.validate(p[1], icmp6_reply)

        # ICMPv6 unreachable messages cleanup
        self.vapi.map_param_set_icmp6(enable_unreachable=0)
887
888     def test_map_t_ip6_psid(self):
889         """MAP-T v6->v4 PSID validation"""
890
891         #
892         # Add a domain that maps from pg0 to pg1
893         #
894         map_dst = "2001:db8::/32"
895         map_src = "1234:5678:90ab:cdef::/64"
896         ip4_pfx = "192.168.0.0/24"
897         tag = "MAP-T Test Domain"
898
899         self.vapi.map_add_domain(
900             ip6_prefix=map_dst,
901             ip4_prefix=ip4_pfx,
902             ip6_src=map_src,
903             ea_bits_len=16,
904             psid_offset=6,
905             psid_length=4,
906             mtu=1500,
907             tag=tag,
908         )
909
910         # Enable MAP-T on interfaces.
911         self.vapi.map_if_enable_disable(
912             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=1
913         )
914         self.vapi.map_if_enable_disable(
915             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
916         )
917
918         map_route = VppIpRoute(
919             self,
920             "2001:db8::",
921             32,
922             [
923                 VppRoutePath(
924                     self.pg1.remote_ip6,
925                     self.pg1.sw_if_index,
926                     proto=DpoProto.DPO_PROTO_IP6,
927                 )
928             ],
929         )
930         map_route.add_vpp_config()
931
932         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
933         p_ip6 = IPv6(
934             src="2001:db8:1f0::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
935         )
936
937         # Send good IPv6 source port, ensure translated IPv4 received
938         payload = TCP(sport=0xABCD, dport=80)
939         p6 = p_ether6 / p_ip6 / payload
940         p4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4) / payload
941         p4_translated.id = 0
942         p4_translated.ttl -= 1
943         rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
944         for p in rx:
945             self.validate(p[1], p4_translated)
946
947         # Send bad IPv6 source port, ensure translated IPv4 not received
948         payload = TCP(sport=0xDCBA, dport=80)
949         p6 = p_ether6 / p_ip6 / payload
950         self.send_and_assert_no_replies(self.pg1, p6 * 1)
951
952     def test_map_t_pre_resolve(self):
953         """MAP-T pre-resolve"""
954
955         # Add a domain that maps from pg0 to pg1
956         map_dst = "2001:db8::/32"
957         map_src = "1234:5678:90ab:cdef::/64"
958         ip4_pfx = "192.168.0.0/24"
959         tag = "MAP-T Test Domain."
960
961         self.vapi.map_add_domain(
962             ip6_prefix=map_dst,
963             ip4_prefix=ip4_pfx,
964             ip6_src=map_src,
965             ea_bits_len=16,
966             psid_offset=6,
967             psid_length=4,
968             mtu=1500,
969             tag=tag,
970         )
971
972         # Enable MAP-T on interfaces.
973         self.vapi.map_if_enable_disable(
974             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=1
975         )
976         self.vapi.map_if_enable_disable(
977             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
978         )
979
980         # Enable pre-resolve option
981         self.vapi.map_param_add_del_pre_resolve(
982             ip4_nh_address="10.1.2.3", ip6_nh_address="4001::1", is_add=1
983         )
984
985         # Add a route to 4001::1 and expect the translated traffic to be
986         # sent via that route next-hop.
987         pre_res_route6 = VppIpRoute(
988             self,
989             "4001::1",
990             128,
991             [VppRoutePath(self.pg1.remote_hosts[2].ip6, self.pg1.sw_if_index)],
992         )
993         pre_res_route6.add_vpp_config()
994
995         # Add a route to 10.1.2.3 and expect the "untranslated" traffic to be
996         # sent via that route next-hop.
997         pre_res_route4 = VppIpRoute(
998             self,
999             "10.1.2.3",
1000             32,
1001             [VppRoutePath(self.pg0.remote_hosts[1].ip4, self.pg0.sw_if_index)],
1002         )
1003         pre_res_route4.add_vpp_config()
1004
1005         # Send an IPv4 packet that will be translated
1006         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1007         p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.0.1")
1008         payload = TCP(sport=0xABCD, dport=0xABCD)
1009         p4 = p_ether / p_ip4 / payload
1010
1011         p6_translated = (
1012             IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1f0::c0a8:1:f")
1013             / payload
1014         )
1015         p6_translated.hlim -= 1
1016
1017         rx = self.send_and_expect(self.pg0, p4 * 1, self.pg1)
1018         for p in rx:
1019             self.assertEqual(p[Ether].dst, self.pg1.remote_hosts[2].mac)
1020             self.validate(p[1], p6_translated)
1021
1022         # Send back an IPv6 packet that will be "untranslated"
1023         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1024         p_ip6 = IPv6(
1025             src="2001:db8:1f0::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
1026         )
1027         p6 = p_ether6 / p_ip6 / payload
1028
1029         p4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4) / payload
1030         p4_translated.id = 0
1031         p4_translated.ttl -= 1
1032
1033         rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
1034         for p in rx:
1035             self.assertEqual(p[Ether].dst, self.pg0.remote_hosts[1].mac)
1036             self.validate(p[1], p4_translated)
1037
1038         # Cleanup pre-resolve option
1039         self.vapi.map_param_add_del_pre_resolve(
1040             ip4_nh_address="10.1.2.3", ip6_nh_address="4001::1", is_add=0
1041         )
1042
1043
# Allow running this test module directly: unittest is handed the
# VppTestRunner imported from the framework module as its test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)