hs-test: fixed timed out tests passing in the CI
[vpp.git] / test / test_map.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import unittest
5
6 from framework import VppTestCase
7 from asfframework import VppTestRunner
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath
10 from util import fragment_rfc791, fragment_rfc8200
11
12 import scapy.compat
13 from scapy.layers.l2 import Ether
14 from scapy.packet import Raw
15 from scapy.layers.inet import IP, UDP, ICMP, TCP
16 from scapy.layers.inet6 import (
17     IPv6,
18     ICMPv6TimeExceeded,
19     IPv6ExtHdrFragment,
20     ICMPv6EchoRequest,
21     ICMPv6DestUnreach,
22 )
23
24
25 class TestMAP(VppTestCase):
26     """MAP Test Case"""
27
28     @classmethod
29     def setUpClass(cls):
30         super(TestMAP, cls).setUpClass()
31
32     @classmethod
33     def tearDownClass(cls):
34         super(TestMAP, cls).tearDownClass()
35
36     def setUp(self):
37         super(TestMAP, self).setUp()
38
39         # create 2 pg interfaces
40         self.create_pg_interfaces(range(4))
41
42         # pg0 is 'inside' IPv4
43         self.pg0.admin_up()
44         self.pg0.config_ip4()
45         self.pg0.resolve_arp()
46         self.pg0.generate_remote_hosts(2)
47         self.pg0.configure_ipv4_neighbors()
48
49         # pg1 is 'outside' IPv6
50         self.pg1.admin_up()
51         self.pg1.config_ip6()
52         self.pg1.generate_remote_hosts(4)
53         self.pg1.configure_ipv6_neighbors()
54
55     def tearDown(self):
56         super(TestMAP, self).tearDown()
57
58         for i in self.pg_interfaces:
59             for t in (0, 1):
60                 self.vapi.map_if_enable_disable(
61                     is_enable=0, sw_if_index=i.sw_if_index, is_translation=t
62                 )
63             i.unconfig_ip4()
64             i.unconfig_ip6()
65             i.admin_down()
66
67     def send_and_assert_encapped(self, packets, ip6_src, ip6_dst, dmac=None):
68         if not dmac:
69             dmac = self.pg1.remote_mac
70
71         self.pg0.add_stream(packets)
72
73         self.pg_enable_capture(self.pg_interfaces)
74         self.pg_start()
75
76         capture = self.pg1.get_capture(len(packets))
77         for rx, tx in zip(capture, packets):
78             self.assertEqual(rx[Ether].dst, dmac)
79             self.assertEqual(rx[IP].src, tx[IP].src)
80             self.assertEqual(rx[IPv6].src, ip6_src)
81             self.assertEqual(rx[IPv6].dst, ip6_dst)
82
83     def send_and_assert_encapped_one(self, packet, ip6_src, ip6_dst, dmac=None):
84         return self.send_and_assert_encapped([packet], ip6_src, ip6_dst, dmac)
85
86     def test_api_map_domain_dump(self):
87         map_dst = "2001::/64"
88         map_src = "3000::1/128"
89         client_pfx = "192.168.0.0/16"
90         tag = "MAP-E tag."
91         index = self.vapi.map_add_domain(
92             ip4_prefix=client_pfx, ip6_prefix=map_dst, ip6_src=map_src, tag=tag
93         ).index
94         rv = self.vapi.map_domain_dump()
95
96         # restore the state early so as to not impact subsequent tests.
97         # If an assert fails, we will not get the chance to do it at the end.
98         self.vapi.map_del_domain(index=index)
99
100         self.assertGreater(len(rv), 0, "Expected output from 'map_domain_dump'")
101
102         # typedefs are returned as ipaddress objects.
103         # wrap results in str() ugh! to avoid the need to call unicode.
104         self.assertEqual(str(rv[0].ip4_prefix), client_pfx)
105         self.assertEqual(str(rv[0].ip6_prefix), map_dst)
106         self.assertEqual(str(rv[0].ip6_src), map_src)
107
108         self.assertEqual(rv[0].tag, tag, "output produced incorrect tag value.")
109
110     def create_domains(self, ip4_pfx_str, ip6_pfx_str, ip6_src_str):
111         ip4_pfx = ipaddress.ip_network(ip4_pfx_str)
112         ip6_dst = ipaddress.ip_network(ip6_pfx_str)
113         mod = ip4_pfx.num_addresses / 1024
114         indicies = []
115         for i in range(ip4_pfx.num_addresses):
116             rv = self.vapi.map_add_domain(
117                 ip6_prefix=ip6_pfx_str,
118                 ip4_prefix=str(ip4_pfx[i]) + "/32",
119                 ip6_src=ip6_src_str,
120             )
121             indicies.append(rv.index)
122         return indicies
123
124     def test_api_map_domains_get(self):
125         # Create a bunch of domains
126         no_domains = 4096  # This must be large enough to ensure VPP suspends
127         domains = self.create_domains("130.67.0.0/20", "2001::/32", "2001::1/128")
128         self.assertEqual(len(domains), no_domains)
129
130         d = []
131         cursor = 0
132
133         # Invalid cursor
134         rv, details = self.vapi.map_domains_get(cursor=no_domains + 10)
135         self.assertEqual(rv.retval, -7)
136
137         # Delete a domain in the middle of walk
138         rv, details = self.vapi.map_domains_get(cursor=0)
139         self.assertEqual(rv.retval, -165)
140         self.vapi.map_del_domain(index=rv.cursor)
141         domains.remove(rv.cursor)
142
143         # Continue at point of deleted cursor
144         rv, details = self.vapi.map_domains_get(cursor=rv.cursor)
145         self.assertIn(rv.retval, [0, -165])
146
147         d = list(self.vapi.vpp.details_iter(self.vapi.map_domains_get))
148         self.assertEqual(len(d), no_domains - 1)
149
150         # Clean up
151         for i in domains:
152             self.vapi.map_del_domain(index=i)
153
154     def test_map_e_udp(self):
155         """MAP-E UDP"""
156
157         #
158         # Add a route to the MAP-BR
159         #
160         map_br_pfx = "2001::"
161         map_br_pfx_len = 32
162         map_route = VppIpRoute(
163             self,
164             map_br_pfx,
165             map_br_pfx_len,
166             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
167         )
168         map_route.add_vpp_config()
169
170         #
171         # Add a domain that maps from pg0 to pg1
172         #
173         map_dst = "2001::/32"
174         map_src = "3000::1/128"
175         client_pfx = "192.168.0.0/16"
176         map_translated_addr = "2001:0:101:7000:0:c0a8:101:7"
177         tag = "MAP-E tag."
178         self.vapi.map_add_domain(
179             ip4_prefix=client_pfx,
180             ip6_prefix=map_dst,
181             ip6_src=map_src,
182             ea_bits_len=20,
183             psid_offset=4,
184             psid_length=4,
185             tag=tag,
186         )
187
188         self.vapi.map_param_set_security_check(enable=1, fragments=1)
189
190         # Enable MAP on interface.
191         self.vapi.map_if_enable_disable(
192             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=0
193         )
194
195         # Ensure MAP doesn't steal all packets!
196         v4 = (
197             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
198             / IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4)
199             / UDP(sport=20000, dport=10000)
200             / Raw(b"\xa5" * 100)
201         )
202         rx = self.send_and_expect(self.pg0, v4 * 4, self.pg0)
203         v4_reply = v4[1]
204         v4_reply.ttl -= 1
205         for p in rx:
206             self.validate(p[1], v4_reply)
207
208         #
209         # Fire in a v4 packet that will be encapped to the BR
210         #
211         v4 = (
212             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
213             / IP(src=self.pg0.remote_ip4, dst="192.168.1.1")
214             / UDP(sport=20000, dport=10000)
215             / Raw(b"\xa5" * 100)
216         )
217
218         self.send_and_assert_encapped(v4 * 4, "3000::1", map_translated_addr)
219
220         #
221         # Verify reordered fragments are able to pass as well
222         #
223         v4 = (
224             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
225             / IP(id=1, src=self.pg0.remote_ip4, dst="192.168.1.1")
226             / UDP(sport=20000, dport=10000)
227             / Raw(b"\xa5" * 1000)
228         )
229
230         frags = fragment_rfc791(v4, 400)
231         frags.reverse()
232
233         self.send_and_assert_encapped(frags, "3000::1", map_translated_addr)
234
235         # Enable MAP on interface.
236         self.vapi.map_if_enable_disable(
237             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=0
238         )
239
240         # Ensure MAP doesn't steal all packets
241         v6 = (
242             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
243             / IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6)
244             / UDP(sport=20000, dport=10000)
245             / Raw(b"\xa5" * 100)
246         )
247         rx = self.send_and_expect(self.pg1, v6 * 1, self.pg1)
248         v6_reply = v6[1]
249         v6_reply.hlim -= 1
250         for p in rx:
251             self.validate(p[1], v6_reply)
252
253         #
254         # Fire in a V6 encapped packet.
255         # expect a decapped packet on the inside ip4 link
256         #
257         p = (
258             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
259             / IPv6(dst="3000::1", src=map_translated_addr)
260             / IP(dst=self.pg0.remote_ip4, src="192.168.1.1")
261             / UDP(sport=10000, dport=20000)
262             / Raw(b"\xa5" * 100)
263         )
264
265         self.pg1.add_stream(p)
266
267         self.pg_enable_capture(self.pg_interfaces)
268         self.pg_start()
269
270         rx = self.pg0.get_capture(1)
271         rx = rx[0]
272
273         self.assertFalse(rx.haslayer(IPv6))
274         self.assertEqual(rx[IP].src, p[IP].src)
275         self.assertEqual(rx[IP].dst, p[IP].dst)
276
277         #
278         # Verify encapped reordered fragments pass as well
279         #
280         p = (
281             IP(id=1, dst=self.pg0.remote_ip4, src="192.168.1.1")
282             / UDP(sport=10000, dport=20000)
283             / Raw(b"\xa5" * 1500)
284         )
285         frags = fragment_rfc791(p, 400)
286         frags.reverse()
287
288         stream = (
289             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
290             / IPv6(dst="3000::1", src=map_translated_addr)
291             / x
292             for x in frags
293         )
294
295         self.pg1.add_stream(stream)
296
297         self.pg_enable_capture(self.pg_interfaces)
298         self.pg_start()
299
300         rx = self.pg0.get_capture(len(frags))
301
302         for r in rx:
303             self.assertFalse(r.haslayer(IPv6))
304             self.assertEqual(r[IP].src, p[IP].src)
305             self.assertEqual(r[IP].dst, p[IP].dst)
306
307         # Verify that fragments pass even if ipv6 layer is fragmented
308         stream = (IPv6(dst="3000::1", src=map_translated_addr) / x for x in frags)
309
310         v6_stream = [
311             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / x
312             for i in range(len(frags))
313             for x in fragment_rfc8200(
314                 IPv6(dst="3000::1", src=map_translated_addr) / frags[i], i, 200
315             )
316         ]
317
318         self.pg1.add_stream(v6_stream)
319
320         self.pg_enable_capture(self.pg_interfaces)
321         self.pg_start()
322
323         rx = self.pg0.get_capture(len(frags))
324
325         for r in rx:
326             self.assertFalse(r.haslayer(IPv6))
327             self.assertEqual(r[IP].src, p[IP].src)
328             self.assertEqual(r[IP].dst, p[IP].dst)
329
330         #
331         # Pre-resolve. No API for this!!
332         #
333         self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")
334
335         self.send_and_assert_no_replies(self.pg0, v4, "resolved via default route")
336
337         #
338         # Add a route to 4001::1. Expect the encapped traffic to be
339         # sent via that routes next-hop
340         #
341         pre_res_route = VppIpRoute(
342             self,
343             "4001::1",
344             128,
345             [VppRoutePath(self.pg1.remote_hosts[2].ip6, self.pg1.sw_if_index)],
346         )
347         pre_res_route.add_vpp_config()
348
349         self.send_and_assert_encapped_one(
350             v4, "3000::1", map_translated_addr, dmac=self.pg1.remote_hosts[2].mac
351         )
352
353         #
354         # change the route to the pre-solved next-hop
355         #
356         pre_res_route.modify(
357             [VppRoutePath(self.pg1.remote_hosts[3].ip6, self.pg1.sw_if_index)]
358         )
359         pre_res_route.add_vpp_config()
360
361         self.send_and_assert_encapped_one(
362             v4, "3000::1", map_translated_addr, dmac=self.pg1.remote_hosts[3].mac
363         )
364
365         #
366         # cleanup. The test infra's object registry will ensure
367         # the route is really gone and thus that the unresolve worked.
368         #
369         pre_res_route.remove_vpp_config()
370         self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
371
372     def test_map_e_inner_frag(self):
373         """MAP-E Inner fragmentation"""
374
375         #
376         # Add a route to the MAP-BR
377         #
378         map_br_pfx = "2001::"
379         map_br_pfx_len = 32
380         map_route = VppIpRoute(
381             self,
382             map_br_pfx,
383             map_br_pfx_len,
384             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
385         )
386         map_route.add_vpp_config()
387
388         #
389         # Add a domain that maps from pg0 to pg1
390         #
391         map_dst = "2001::/32"
392         map_src = "3000::1/128"
393         client_pfx = "192.168.0.0/16"
394         map_translated_addr = "2001:0:101:7000:0:c0a8:101:7"
395         tag = "MAP-E tag."
396         self.vapi.map_add_domain(
397             ip4_prefix=client_pfx,
398             ip6_prefix=map_dst,
399             ip6_src=map_src,
400             ea_bits_len=20,
401             psid_offset=4,
402             psid_length=4,
403             mtu=1000,
404             tag=tag,
405         )
406
407         # Enable MAP on interface.
408         self.vapi.map_if_enable_disable(
409             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=0
410         )
411
412         # Enable inner fragmentation
413         self.vapi.map_param_set_fragmentation(inner=1)
414
415         v4 = (
416             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
417             / IP(src=self.pg0.remote_ip4, dst="192.168.1.1")
418             / UDP(sport=20000, dport=10000)
419             / Raw(b"\xa5" * 1300)
420         )
421
422         self.pg_send(self.pg0, v4 * 1)
423         rx = self.pg1.get_capture(2)
424
425         # 1000-sizeof(ip6_header_t) = 960.
426         frags = fragment_rfc791(v4[1], 960)
427         frags[0].id = 0
428         frags[1].id = 0
429         frags[0].ttl -= 1
430         frags[1].ttl -= 1
431         frags[0].chksum = 0
432         frags[1].chksum = 0
433
434         v6_reply1 = IPv6(src="3000::1", dst=map_translated_addr, hlim=63) / frags[0]
435         v6_reply2 = IPv6(src="3000::1", dst=map_translated_addr, hlim=63) / frags[1]
436         rx[0][1].fl = 0
437         rx[1][1].fl = 0
438         rx[0][1][IP].id = 0
439         rx[1][1][IP].id = 0
440         rx[0][1][IP].chksum = 0
441         rx[1][1][IP].chksum = 0
442
443         self.validate(rx[0][1], v6_reply1)
444         self.validate(rx[1][1], v6_reply2)
445
446     def test_map_e_tcp_mss(self):
447         """MAP-E TCP MSS"""
448
449         #
450         # Add a route to the MAP-BR
451         #
452         map_br_pfx = "2001::"
453         map_br_pfx_len = 32
454         map_route = VppIpRoute(
455             self,
456             map_br_pfx,
457             map_br_pfx_len,
458             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
459         )
460         map_route.add_vpp_config()
461
462         #
463         # Add a domain that maps from pg0 to pg1
464         #
465         map_dst = "2001::/32"
466         map_src = "3000::1/128"
467         client_pfx = "192.168.0.0/16"
468         map_translated_addr = "2001:0:101:5000:0:c0a8:101:5"
469         tag = "MAP-E TCP tag."
470         self.vapi.map_add_domain(
471             ip4_prefix=client_pfx,
472             ip6_prefix=map_dst,
473             ip6_src=map_src,
474             ea_bits_len=20,
475             psid_offset=4,
476             psid_length=4,
477             tag=tag,
478         )
479
480         # Enable MAP on pg0 interface.
481         self.vapi.map_if_enable_disable(
482             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=0
483         )
484
485         # Enable MAP on pg1 interface.
486         self.vapi.map_if_enable_disable(
487             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=0
488         )
489
490         # TCP MSS clamping
491         mss_clamp = 1300
492         self.vapi.map_param_set_tcp(mss_clamp)
493
494         #
495         # Send a v4 packet that will be encapped.
496         #
497         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
498         p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.1.1")
499         p_tcp = TCP(sport=20000, dport=30000, flags="S", options=[("MSS", 1455)])
500         p4 = p_ether / p_ip4 / p_tcp
501
502         self.pg1.add_stream(p4)
503         self.pg_enable_capture(self.pg_interfaces)
504         self.pg_start()
505
506         rx = self.pg1.get_capture(1)
507         rx = rx[0]
508
509         self.assertTrue(rx.haslayer(IPv6))
510         self.assertEqual(rx[IP].src, p4[IP].src)
511         self.assertEqual(rx[IP].dst, p4[IP].dst)
512         self.assertEqual(rx[IPv6].src, "3000::1")
513         self.assertEqual(rx[TCP].options, TCP(options=[("MSS", mss_clamp)]).options)
514
515     def validate(self, rx, expected):
516         self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))
517
518     def validate_frag6(self, p6_frag, p_ip6_expected):
519         self.assertFalse(p6_frag.haslayer(IP))
520         self.assertTrue(p6_frag.haslayer(IPv6))
521         self.assertTrue(p6_frag.haslayer(IPv6ExtHdrFragment))
522         self.assertEqual(p6_frag[IPv6].src, p_ip6_expected.src)
523         self.assertEqual(p6_frag[IPv6].dst, p_ip6_expected.dst)
524
525     def validate_frag_payload_len6(self, rx, proto, payload_len_expected):
526         payload_total = 0
527         for p in rx:
528             payload_total += p[IPv6].plen
529
530         # First fragment has proto
531         payload_total -= len(proto())
532
533         # Every fragment has IPv6 fragment header
534         payload_total -= len(IPv6ExtHdrFragment()) * len(rx)
535
536         self.assertEqual(payload_total, payload_len_expected)
537
538     def validate_frag4(self, p4_frag, p_ip4_expected):
539         self.assertFalse(p4_frag.haslayer(IPv6))
540         self.assertTrue(p4_frag.haslayer(IP))
541         self.assertTrue(p4_frag[IP].frag != 0 or p4_frag[IP].flags.MF)
542         self.assertEqual(p4_frag[IP].src, p_ip4_expected.src)
543         self.assertEqual(p4_frag[IP].dst, p_ip4_expected.dst)
544
545     def validate_frag_payload_len4(self, rx, proto, payload_len_expected):
546         payload_total = 0
547         for p in rx:
548             payload_total += len(p[IP].payload)
549
550         # First fragment has proto
551         payload_total -= len(proto())
552
553         self.assertEqual(payload_total, payload_len_expected)
554
555     def payload(self, len):
556         return "x" * len
557
    def test_map_t(self):
        """MAP-T"""

        # Walks through translation in both directions with MAP enabled in
        # translation mode on pg0 and pg1: plain TCP, TTL / hop-limit
        # expiry, well-known ports, UDP and ICMP fragmentation, TCP MSS
        # clamping and the ICMPv6 unreachable reply on a failed security
        # check. The steps share state and must run in this order.

        #
        # Add a domain that maps from pg0 to pg1
        #
        map_dst = "2001:db8::/32"
        map_src = "1234:5678:90ab:cdef::/64"
        ip4_pfx = "192.168.0.0/24"
        tag = "MAP-T Tag."

        self.vapi.map_add_domain(
            ip6_prefix=map_dst,
            ip4_prefix=ip4_pfx,
            ip6_src=map_src,
            ea_bits_len=16,
            psid_offset=6,
            psid_length=4,
            mtu=1500,
            tag=tag,
        )

        # Enable MAP-T on interfaces.
        self.vapi.map_if_enable_disable(
            is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=1
        )
        self.vapi.map_if_enable_disable(
            is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
        )

        # Ensure MAP doesn't steal all packets!
        v4 = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4)
            / UDP(sport=20000, dport=10000)
            / Raw(b"\xa5" * 100)
        )
        rx = self.send_and_expect(self.pg0, v4 * 1, self.pg0)
        # Layer 1 is the IP header; the expected reply is the sent IP
        # packet with its TTL decremented.
        v4_reply = v4[1]
        v4_reply.ttl -= 1
        for p in rx:
            self.validate(p[1], v4_reply)
        # Ensure MAP doesn't steal all packets
        v6 = (
            Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
            / IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6)
            / UDP(sport=20000, dport=10000)
            / Raw(b"\xa5" * 100)
        )
        rx = self.send_and_expect(self.pg1, v6 * 1, self.pg1)
        v6_reply = v6[1]
        v6_reply.hlim -= 1
        for p in rx:
            self.validate(p[1], v6_reply)

        # Route the domain's IPv6 prefix out of pg1.
        map_route = VppIpRoute(
            self,
            "2001:db8::",
            32,
            [
                VppRoutePath(
                    self.pg1.remote_ip6,
                    self.pg1.sw_if_index,
                    proto=DpoProto.DPO_PROTO_IP6,
                )
            ],
        )
        map_route.add_vpp_config()

        #
        # Send a v4 packet that will be translated
        #
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.0.1")
        payload = TCP(sport=0xABCD, dport=0xABCD)

        p4 = p_ether / p_ip4 / payload
        p6_translated = (
            IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1f0::c0a8:1:f")
            / payload
        )
        p6_translated.hlim -= 1
        rx = self.send_and_expect(self.pg0, p4 * 1, self.pg1)
        for p in rx:
            self.validate(p[1], p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(
            src="2001:db8:1f0::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
        )
        p6 = p_ether6 / p_ip6 / payload
        p4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4) / payload
        p4_translated.id = 0
        p4_translated.ttl -= 1
        rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
        for p in rx:
            self.validate(p[1], p4_translated)

        # IPv4 TTL=0
        # An ICMP time-exceeded quoting the original packet is expected
        # back on pg0.
        ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst="192.168.0.1", ttl=0)
        p4 = p_ether / ip4_ttl_expired / payload

        icmp4_reply = (
            IP(id=0, ttl=255, src=self.pg0.local_ip4, dst=self.pg0.remote_ip4)
            / ICMP(type="time-exceeded", code="ttl-zero-during-transit")
            / IP(src=self.pg0.remote_ip4, dst="192.168.0.1", ttl=0)
            / payload
        )
        rx = self.send_and_expect(self.pg0, p4 * 1, self.pg0)
        for p in rx:
            self.validate(p[1], icmp4_reply)

        # IPv4 TTL=1
        ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst="192.168.0.1", ttl=1)
        p4 = p_ether / ip4_ttl_expired / payload

        icmp4_reply = (
            IP(id=0, ttl=255, src=self.pg0.local_ip4, dst=self.pg0.remote_ip4)
            / ICMP(type="time-exceeded", code="ttl-zero-during-transit")
            / IP(src=self.pg0.remote_ip4, dst="192.168.0.1", ttl=1)
            / payload
        )
        rx = self.send_and_expect(self.pg0, p4 * 1, self.pg0)
        for p in rx:
            self.validate(p[1], icmp4_reply)

        # IPv6 Hop limit at BR
        # An ICMPv6 time-exceeded quoting the original packet is expected
        # back on pg1.
        ip6_hlim_expired = IPv6(
            hlim=1,
            src="2001:db8:1ab::c0a8:1:ab",
            dst="1234:5678:90ab:cdef:ac:1001:200:0",
        )
        p6 = p_ether6 / ip6_hlim_expired / payload

        icmp6_reply = (
            IPv6(hlim=255, src=self.pg1.local_ip6, dst="2001:db8:1ab::c0a8:1:ab")
            / ICMPv6TimeExceeded(code=0)
            / IPv6(
                src="2001:db8:1ab::c0a8:1:ab",
                dst="1234:5678:90ab:cdef:ac:1001:200:0",
                hlim=1,
            )
            / payload
        )
        rx = self.send_and_expect(self.pg1, p6 * 1, self.pg1)
        for p in rx:
            self.validate(p[1], icmp6_reply)

        # IPv6 Hop limit beyond BR
        ip6_hlim_expired = IPv6(
            hlim=0,
            src="2001:db8:1ab::c0a8:1:ab",
            dst="1234:5678:90ab:cdef:ac:1001:200:0",
        )
        p6 = p_ether6 / ip6_hlim_expired / payload

        icmp6_reply = (
            IPv6(hlim=255, src=self.pg1.local_ip6, dst="2001:db8:1ab::c0a8:1:ab")
            / ICMPv6TimeExceeded(code=0)
            / IPv6(
                src="2001:db8:1ab::c0a8:1:ab",
                dst="1234:5678:90ab:cdef:ac:1001:200:0",
                hlim=0,
            )
            / payload
        )
        rx = self.send_and_expect(self.pg1, p6 * 1, self.pg1)
        for p in rx:
            self.validate(p[1], icmp6_reply)

        # IPv4 Well-known port
        # Nothing is expected to be forwarded for port 200.
        p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.0.1")
        payload = UDP(sport=200, dport=200)
        p4 = p_ether / p_ip4 / payload
        self.send_and_assert_no_replies(self.pg0, p4 * 1)

        # IPv6 Well-known port
        payload = UDP(sport=200, dport=200)
        p6 = p_ether6 / p_ip6 / payload
        self.send_and_assert_no_replies(self.pg1, p6 * 1)

        # UDP packet fragmentation
        # A single IPv4 packet goes in; two IPv6 fragments are expected out.
        payload_len = 1453
        payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
        p4 = p_ether / p_ip4 / payload
        self.pg_enable_capture()
        self.pg0.add_stream(p4)
        self.pg_start()
        rx = self.pg1.get_capture(2)

        p_ip6_translated = IPv6(
            src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1e0::c0a8:1:e"
        )
        for p in rx:
            self.validate_frag6(p, p_ip6_translated)

        self.validate_frag_payload_len6(rx, UDP, payload_len)

        # UDP packet fragmentation send fragments
        # Same payload, but already fragmented on the IPv4 side before it
        # is sent.
        payload_len = 1453
        payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
        p4 = p_ether / p_ip4 / payload
        frags = fragment_rfc791(p4, fragsize=1000)
        self.pg_enable_capture()
        self.pg0.add_stream(frags)
        self.pg_start()
        rx = self.pg1.get_capture(2)

        for p in rx:
            self.validate_frag6(p, p_ip6_translated)

        self.validate_frag_payload_len6(rx, UDP, payload_len)

        # Send back a fragmented IPv6 UDP packet that will be "untranslated"
        payload = UDP(sport=4000, dport=40000) / self.payload(payload_len)
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(
            src="2001:db8:1e0::c0a8:1:e", dst="1234:5678:90ab:cdef:ac:1001:200:0"
        )
        p6 = p_ether6 / p_ip6 / payload
        frags6 = fragment_rfc8200(p6, identification=0xDCBA, fragsize=1000)

        p_ip4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4)
        p4_translated = p_ip4_translated / payload
        p4_translated.id = 0
        p4_translated.ttl -= 1

        self.pg_enable_capture()
        self.pg1.add_stream(frags6)
        self.pg_start()
        rx = self.pg0.get_capture(2)

        for p in rx:
            self.validate_frag4(p, p4_translated)

        self.validate_frag_payload_len4(rx, UDP, payload_len)

        # ICMP packet fragmentation
        payload = ICMP(id=6529) / self.payload(payload_len)
        p4 = p_ether / p_ip4 / payload
        self.pg_enable_capture()
        self.pg0.add_stream(p4)
        self.pg_start()
        rx = self.pg1.get_capture(2)

        p_ip6_translated = IPv6(
            src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:160::c0a8:1:6"
        )
        for p in rx:
            self.validate_frag6(p, p_ip6_translated)

        self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len)

        # ICMP packet fragmentation send fragments
        payload = ICMP(id=6529) / self.payload(payload_len)
        p4 = p_ether / p_ip4 / payload
        frags = fragment_rfc791(p4, fragsize=1000)
        self.pg_enable_capture()
        self.pg0.add_stream(frags)
        self.pg_start()
        rx = self.pg1.get_capture(2)

        for p in rx:
            self.validate_frag6(p, p_ip6_translated)

        self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len)

        # TCP MSS clamping
        self.vapi.map_param_set_tcp(1300)

        #
        # Send a v4 TCP SYN packet that will be translated and MSS clamped
        #
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.0.1")
        payload = TCP(sport=0xABCD, dport=0xABCD, flags="S", options=[("MSS", 1460)])

        p4 = p_ether / p_ip4 / payload
        p6_translated = (
            IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1f0::c0a8:1:f")
            / payload
        )
        p6_translated.hlim -= 1
        # The advertised MSS of 1460 is expected to be rewritten to 1300.
        p6_translated[TCP].options = [("MSS", 1300)]
        rx = self.send_and_expect(self.pg0, p4 * 1, self.pg1)
        for p in rx:
            self.validate(p[1], p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(
            src="2001:db8:1f0::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
        )
        p6 = p_ether6 / p_ip6 / payload
        p4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4) / payload
        p4_translated.id = 0
        p4_translated.ttl -= 1
        p4_translated[TCP].options = [("MSS", 1300)]
        rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
        for p in rx:
            self.validate(p[1], p4_translated)

        # TCP MSS clamping cleanup
        self.vapi.map_param_set_tcp(0)

        # Enable icmp6 param to get back ICMPv6 unreachable messages in case
        # of security check fails
        self.vapi.map_param_set_icmp6(enable_unreachable=1)

        # Send back an IPv6 packet that will be dropped due to security
        # check fail
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6_sec_check_fail = IPv6(
            src="2001:db8:1fe::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
        )
        payload = TCP(sport=0xABCD, dport=0xABCD)
        p6 = p_ether6 / p_ip6_sec_check_fail / payload

        self.pg_send(self.pg1, p6 * 1)
        # Nothing may reach the IPv4 side; one reply is expected on pg1.
        self.pg0.get_capture(0, timeout=1)
        rx = self.pg1.get_capture(1)

        icmp6_reply = (
            IPv6(hlim=255, src=self.pg1.local_ip6, dst="2001:db8:1fe::c0a8:1:f")
            / ICMPv6DestUnreach(code=5)
            / p_ip6_sec_check_fail
            / payload
        )

        for p in rx:
            self.validate(p[1], icmp6_reply)

        # ICMPv6 unreachable messages cleanup
        self.vapi.map_param_set_icmp6(enable_unreachable=0)
893
894     def test_map_t_ip6_psid(self):
895         """MAP-T v6->v4 PSID validation"""
896
897         #
898         # Add a domain that maps from pg0 to pg1
899         #
900         map_dst = "2001:db8::/32"
901         map_src = "1234:5678:90ab:cdef::/64"
902         ip4_pfx = "192.168.0.0/24"
903         tag = "MAP-T Test Domain"
904
905         self.vapi.map_add_domain(
906             ip6_prefix=map_dst,
907             ip4_prefix=ip4_pfx,
908             ip6_src=map_src,
909             ea_bits_len=16,
910             psid_offset=6,
911             psid_length=4,
912             mtu=1500,
913             tag=tag,
914         )
915
916         # Enable MAP-T on interfaces.
917         self.vapi.map_if_enable_disable(
918             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=1
919         )
920         self.vapi.map_if_enable_disable(
921             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
922         )
923
924         map_route = VppIpRoute(
925             self,
926             "2001:db8::",
927             32,
928             [
929                 VppRoutePath(
930                     self.pg1.remote_ip6,
931                     self.pg1.sw_if_index,
932                     proto=DpoProto.DPO_PROTO_IP6,
933                 )
934             ],
935         )
936         map_route.add_vpp_config()
937
938         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
939         p_ip6 = IPv6(
940             src="2001:db8:1f0::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
941         )
942
943         # Send good IPv6 source port, ensure translated IPv4 received
944         payload = TCP(sport=0xABCD, dport=80)
945         p6 = p_ether6 / p_ip6 / payload
946         p4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4) / payload
947         p4_translated.id = 0
948         p4_translated.ttl -= 1
949         rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
950         for p in rx:
951             self.validate(p[1], p4_translated)
952
953         # Send bad IPv6 source port, ensure translated IPv4 not received
954         payload = TCP(sport=0xDCBA, dport=80)
955         p6 = p_ether6 / p_ip6 / payload
956         self.send_and_assert_no_replies(self.pg1, p6 * 1)
957
958     def test_map_t_pre_resolve(self):
959         """MAP-T pre-resolve"""
960
961         # Add a domain that maps from pg0 to pg1
962         map_dst = "2001:db8::/32"
963         map_src = "1234:5678:90ab:cdef::/64"
964         ip4_pfx = "192.168.0.0/24"
965         tag = "MAP-T Test Domain."
966
967         self.vapi.map_add_domain(
968             ip6_prefix=map_dst,
969             ip4_prefix=ip4_pfx,
970             ip6_src=map_src,
971             ea_bits_len=16,
972             psid_offset=6,
973             psid_length=4,
974             mtu=1500,
975             tag=tag,
976         )
977
978         # Enable MAP-T on interfaces.
979         self.vapi.map_if_enable_disable(
980             is_enable=1, sw_if_index=self.pg0.sw_if_index, is_translation=1
981         )
982         self.vapi.map_if_enable_disable(
983             is_enable=1, sw_if_index=self.pg1.sw_if_index, is_translation=1
984         )
985
986         # Enable pre-resolve option
987         self.vapi.map_param_add_del_pre_resolve(
988             ip4_nh_address="10.1.2.3", ip6_nh_address="4001::1", is_add=1
989         )
990
991         # Add a route to 4001::1 and expect the translated traffic to be
992         # sent via that route next-hop.
993         pre_res_route6 = VppIpRoute(
994             self,
995             "4001::1",
996             128,
997             [VppRoutePath(self.pg1.remote_hosts[2].ip6, self.pg1.sw_if_index)],
998         )
999         pre_res_route6.add_vpp_config()
1000
1001         # Add a route to 10.1.2.3 and expect the "untranslated" traffic to be
1002         # sent via that route next-hop.
1003         pre_res_route4 = VppIpRoute(
1004             self,
1005             "10.1.2.3",
1006             32,
1007             [VppRoutePath(self.pg0.remote_hosts[1].ip4, self.pg0.sw_if_index)],
1008         )
1009         pre_res_route4.add_vpp_config()
1010
1011         # Send an IPv4 packet that will be translated
1012         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1013         p_ip4 = IP(src=self.pg0.remote_ip4, dst="192.168.0.1")
1014         payload = TCP(sport=0xABCD, dport=0xABCD)
1015         p4 = p_ether / p_ip4 / payload
1016
1017         p6_translated = (
1018             IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0", dst="2001:db8:1f0::c0a8:1:f")
1019             / payload
1020         )
1021         p6_translated.hlim -= 1
1022
1023         rx = self.send_and_expect(self.pg0, p4 * 1, self.pg1)
1024         for p in rx:
1025             self.assertEqual(p[Ether].dst, self.pg1.remote_hosts[2].mac)
1026             self.validate(p[1], p6_translated)
1027
1028         # Send back an IPv6 packet that will be "untranslated"
1029         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1030         p_ip6 = IPv6(
1031             src="2001:db8:1f0::c0a8:1:f", dst="1234:5678:90ab:cdef:ac:1001:200:0"
1032         )
1033         p6 = p_ether6 / p_ip6 / payload
1034
1035         p4_translated = IP(src="192.168.0.1", dst=self.pg0.remote_ip4) / payload
1036         p4_translated.id = 0
1037         p4_translated.ttl -= 1
1038
1039         rx = self.send_and_expect(self.pg1, p6 * 1, self.pg0)
1040         for p in rx:
1041             self.assertEqual(p[Ether].dst, self.pg0.remote_hosts[1].mac)
1042             self.validate(p[1], p4_translated)
1043
1044         # Cleanup pre-resolve option
1045         self.vapi.map_param_add_del_pre_resolve(
1046             ip4_nh_address="10.1.2.3", ip6_nh_address="4001::1", is_add=0
1047         )
1048
1049
# When this file is executed directly (rather than imported), run its
# tests through unittest with VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)