90fee30126715e53db6de03e18ba97eba41752df
[vpp.git] / test / test_map.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
10
11 import scapy.compat
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, IPv6ExtHdrFragment, \
16     ICMPv6EchoRequest, ICMPv6DestUnreach
17
18
19 class TestMAP(VppTestCase):
20     """ MAP Test Case """
21
22     @classmethod
23     def setUpClass(cls):
24         super(TestMAP, cls).setUpClass()
25
26     @classmethod
27     def tearDownClass(cls):
28         super(TestMAP, cls).tearDownClass()
29
30     def setUp(self):
31         super(TestMAP, self).setUp()
32
33         # create 2 pg interfaces
34         self.create_pg_interfaces(range(4))
35
36         # pg0 is 'inside' IPv4
37         self.pg0.admin_up()
38         self.pg0.config_ip4()
39         self.pg0.resolve_arp()
40         self.pg0.generate_remote_hosts(2)
41         self.pg0.configure_ipv4_neighbors()
42
43         # pg1 is 'outside' IPv6
44         self.pg1.admin_up()
45         self.pg1.config_ip6()
46         self.pg1.generate_remote_hosts(4)
47         self.pg1.configure_ipv6_neighbors()
48
49     def tearDown(self):
50         super(TestMAP, self).tearDown()
51         for i in self.pg_interfaces:
52             i.unconfig_ip4()
53             i.unconfig_ip6()
54             i.admin_down()
55
56     def send_and_assert_encapped(self, packets, ip6_src, ip6_dst, dmac=None):
57         if not dmac:
58             dmac = self.pg1.remote_mac
59
60         self.pg0.add_stream(packets)
61
62         self.pg_enable_capture(self.pg_interfaces)
63         self.pg_start()
64
65         capture = self.pg1.get_capture(len(packets))
66         for rx, tx in zip(capture, packets):
67             self.assertEqual(rx[Ether].dst, dmac)
68             self.assertEqual(rx[IP].src, tx[IP].src)
69             self.assertEqual(rx[IPv6].src, ip6_src)
70             self.assertEqual(rx[IPv6].dst, ip6_dst)
71
72     def send_and_assert_encapped_one(self, packet, ip6_src, ip6_dst,
73                                      dmac=None):
74         return self.send_and_assert_encapped([packet], ip6_src, ip6_dst, dmac)
75
76     def test_api_map_domain_dump(self):
77         map_dst = '2001::/64'
78         map_src = '3000::1/128'
79         client_pfx = '192.168.0.0/16'
80         tag = 'MAP-E tag.'
81         index = self.vapi.map_add_domain(ip4_prefix=client_pfx,
82                                          ip6_prefix=map_dst,
83                                          ip6_src=map_src,
84                                          tag=tag).index
85         rv = self.vapi.map_domain_dump()
86
87         # restore the state early so as to not impact subsequent tests.
88         # If an assert fails, we will not get the chance to do it at the end.
89         self.vapi.map_del_domain(index=index)
90
91         self.assertGreater(len(rv), 0,
92                            "Expected output from 'map_domain_dump'")
93
94         # typedefs are returned as ipaddress objects.
95         # wrap results in str() ugh! to avoid the need to call unicode.
96         self.assertEqual(str(rv[0].ip4_prefix), client_pfx)
97         self.assertEqual(str(rv[0].ip6_prefix), map_dst)
98         self.assertEqual(str(rv[0].ip6_src), map_src)
99
100         self.assertEqual(rv[0].tag, tag,
101                          "output produced incorrect tag value.")
102
103     def create_domains(self, ip4_pfx_str, ip6_pfx_str, ip6_src_str):
104         ip4_pfx = ipaddress.ip_network(ip4_pfx_str)
105         ip6_dst = ipaddress.ip_network(ip6_pfx_str)
106         mod = ip4_pfx.num_addresses / 1024
107         indicies = []
108         for i in range(ip4_pfx.num_addresses):
109             rv = self.vapi.map_add_domain(ip6_prefix=ip6_pfx_str,
110                                           ip4_prefix=str(ip4_pfx[i]) + "/32",
111                                           ip6_src=ip6_src_str)
112             indicies.append(rv.index)
113         return indicies
114
115     def test_api_map_domains_get(self):
116         # Create a bunch of domains
117         no_domains = 4096  # This must be large enough to ensure VPP suspends
118         domains = self.create_domains('130.67.0.0/20', '2001::/32',
119                                       '2001::1/128')
120         self.assertEqual(len(domains), no_domains)
121
122         d = []
123         cursor = 0
124
125         # Invalid cursor
126         rv, details = self.vapi.map_domains_get(cursor=no_domains+10)
127         self.assertEqual(rv.retval, -7)
128
129         # Delete a domain in the middle of walk
130         rv, details = self.vapi.map_domains_get(cursor=0)
131         self.assertEqual(rv.retval, -165)
132         self.vapi.map_del_domain(index=rv.cursor)
133         domains.remove(rv.cursor)
134
135         # Continue at point of deleted cursor
136         rv, details = self.vapi.map_domains_get(cursor=rv.cursor)
137         self.assertIn(rv.retval, [0, -165])
138
139         d = list(self.vapi.vpp.details_iter(self.vapi.map_domains_get))
140         self.assertEqual(len(d), no_domains - 1)
141
142         # Clean up
143         for i in domains:
144             self.vapi.map_del_domain(index=i)
145
146     def test_map_e_udp(self):
147         """ MAP-E UDP"""
148
149         #
150         # Add a route to the MAP-BR
151         #
152         map_br_pfx = "2001::"
153         map_br_pfx_len = 32
154         map_route = VppIpRoute(self,
155                                map_br_pfx,
156                                map_br_pfx_len,
157                                [VppRoutePath(self.pg1.remote_ip6,
158                                              self.pg1.sw_if_index)])
159         map_route.add_vpp_config()
160
161         #
162         # Add a domain that maps from pg0 to pg1
163         #
164         map_dst = '2001::/32'
165         map_src = '3000::1/128'
166         client_pfx = '192.168.0.0/16'
167         map_translated_addr = '2001:0:101:7000:0:c0a8:101:7'
168         tag = 'MAP-E tag.'
169         self.vapi.map_add_domain(ip4_prefix=client_pfx,
170                                  ip6_prefix=map_dst,
171                                  ip6_src=map_src,
172                                  ea_bits_len=20,
173                                  psid_offset=4,
174                                  psid_length=4,
175                                  tag=tag)
176
177         self.vapi.map_param_set_security_check(enable=1, fragments=1)
178
179         # Enable MAP on interface.
180         self.vapi.map_if_enable_disable(is_enable=1,
181                                         sw_if_index=self.pg0.sw_if_index,
182                                         is_translation=0)
183
184         # Ensure MAP doesn't steal all packets!
185         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
186               IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
187               UDP(sport=20000, dport=10000) /
188               Raw(b'\xa5' * 100))
189         rx = self.send_and_expect(self.pg0, v4 * 4, self.pg0)
190         v4_reply = v4[1]
191         v4_reply.ttl -= 1
192         for p in rx:
193             self.validate(p[1], v4_reply)
194
195         #
196         # Fire in a v4 packet that will be encapped to the BR
197         #
198         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
199               IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
200               UDP(sport=20000, dport=10000) /
201               Raw(b'\xa5' * 100))
202
203         self.send_and_assert_encapped(v4 * 4, "3000::1", map_translated_addr)
204
205         #
206         # Verify reordered fragments are able to pass as well
207         #
208         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
209               IP(id=1, src=self.pg0.remote_ip4, dst='192.168.1.1') /
210               UDP(sport=20000, dport=10000) /
211               Raw(b'\xa5' * 1000))
212
213         frags = fragment_rfc791(v4, 400)
214         frags.reverse()
215
216         self.send_and_assert_encapped(frags, "3000::1", map_translated_addr)
217
218         # Enable MAP on interface.
219         self.vapi.map_if_enable_disable(is_enable=1,
220                                         sw_if_index=self.pg1.sw_if_index,
221                                         is_translation=0)
222
223         # Ensure MAP doesn't steal all packets
224         v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
225               IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
226               UDP(sport=20000, dport=10000) /
227               Raw(b'\xa5' * 100))
228         rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
229         v6_reply = v6[1]
230         v6_reply.hlim -= 1
231         for p in rx:
232             self.validate(p[1], v6_reply)
233
234         #
235         # Fire in a V6 encapped packet.
236         # expect a decapped packet on the inside ip4 link
237         #
238         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
239              IPv6(dst='3000::1', src=map_translated_addr) /
240              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
241              UDP(sport=10000, dport=20000) /
242              Raw(b'\xa5' * 100))
243
244         self.pg1.add_stream(p)
245
246         self.pg_enable_capture(self.pg_interfaces)
247         self.pg_start()
248
249         rx = self.pg0.get_capture(1)
250         rx = rx[0]
251
252         self.assertFalse(rx.haslayer(IPv6))
253         self.assertEqual(rx[IP].src, p[IP].src)
254         self.assertEqual(rx[IP].dst, p[IP].dst)
255
256         #
257         # Verify encapped reordered fragments pass as well
258         #
259         p = (IP(id=1, dst=self.pg0.remote_ip4, src='192.168.1.1') /
260              UDP(sport=10000, dport=20000) /
261              Raw(b'\xa5' * 1500))
262         frags = fragment_rfc791(p, 400)
263         frags.reverse()
264
265         stream = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
266                   IPv6(dst='3000::1', src=map_translated_addr) /
267                   x for x in frags)
268
269         self.pg1.add_stream(stream)
270
271         self.pg_enable_capture(self.pg_interfaces)
272         self.pg_start()
273
274         rx = self.pg0.get_capture(len(frags))
275
276         for r in rx:
277             self.assertFalse(r.haslayer(IPv6))
278             self.assertEqual(r[IP].src, p[IP].src)
279             self.assertEqual(r[IP].dst, p[IP].dst)
280
281         # Verify that fragments pass even if ipv6 layer is fragmented
282         stream = (IPv6(dst='3000::1', src=map_translated_addr) / x
283                   for x in frags)
284
285         v6_stream = [
286             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / x
287             for i in range(len(frags))
288             for x in fragment_rfc8200(
289                 IPv6(dst='3000::1', src=map_translated_addr) / frags[i],
290                 i, 200)]
291
292         self.pg1.add_stream(v6_stream)
293
294         self.pg_enable_capture(self.pg_interfaces)
295         self.pg_start()
296
297         rx = self.pg0.get_capture(len(frags))
298
299         for r in rx:
300             self.assertFalse(r.haslayer(IPv6))
301             self.assertEqual(r[IP].src, p[IP].src)
302             self.assertEqual(r[IP].dst, p[IP].dst)
303
304         #
305         # Pre-resolve. No API for this!!
306         #
307         self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")
308
309         self.send_and_assert_no_replies(self.pg0, v4,
310                                         "resolved via default route")
311
312         #
313         # Add a route to 4001::1. Expect the encapped traffic to be
314         # sent via that routes next-hop
315         #
316         pre_res_route = VppIpRoute(self, "4001::1", 128,
317                                    [VppRoutePath(self.pg1.remote_hosts[2].ip6,
318                                                  self.pg1.sw_if_index)])
319         pre_res_route.add_vpp_config()
320
321         self.send_and_assert_encapped_one(v4, "3000::1",
322                                           map_translated_addr,
323                                           dmac=self.pg1.remote_hosts[2].mac)
324
325         #
326         # change the route to the pre-solved next-hop
327         #
328         pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
329                                            self.pg1.sw_if_index)])
330         pre_res_route.add_vpp_config()
331
332         self.send_and_assert_encapped_one(v4, "3000::1",
333                                           map_translated_addr,
334                                           dmac=self.pg1.remote_hosts[3].mac)
335
336         #
337         # cleanup. The test infra's object registry will ensure
338         # the route is really gone and thus that the unresolve worked.
339         #
340         pre_res_route.remove_vpp_config()
341         self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
342
343     def test_map_e_inner_frag(self):
344         """ MAP-E Inner fragmentation """
345
346         #
347         # Add a route to the MAP-BR
348         #
349         map_br_pfx = "2001::"
350         map_br_pfx_len = 32
351         map_route = VppIpRoute(self,
352                                map_br_pfx,
353                                map_br_pfx_len,
354                                [VppRoutePath(self.pg1.remote_ip6,
355                                              self.pg1.sw_if_index)])
356         map_route.add_vpp_config()
357
358         #
359         # Add a domain that maps from pg0 to pg1
360         #
361         map_dst = '2001::/32'
362         map_src = '3000::1/128'
363         client_pfx = '192.168.0.0/16'
364         map_translated_addr = '2001:0:101:7000:0:c0a8:101:7'
365         tag = 'MAP-E tag.'
366         self.vapi.map_add_domain(ip4_prefix=client_pfx,
367                                  ip6_prefix=map_dst,
368                                  ip6_src=map_src,
369                                  ea_bits_len=20,
370                                  psid_offset=4,
371                                  psid_length=4,
372                                  mtu=1000,
373                                  tag=tag)
374
375         # Enable MAP on interface.
376         self.vapi.map_if_enable_disable(is_enable=1,
377                                         sw_if_index=self.pg0.sw_if_index,
378                                         is_translation=0)
379
380         # Enable inner fragmentation
381         self.vapi.map_param_set_fragmentation(inner=1)
382
383         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
384               IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
385               UDP(sport=20000, dport=10000) /
386               Raw(b'\xa5' * 1300))
387
388         self.pg_send(self.pg0, v4*1)
389         rx = self.pg1.get_capture(2)
390
391         frags = fragment_rfc791(v4[1], 1000)
392         frags[0].id = 0
393         frags[1].id = 0
394         frags[0].ttl -= 1
395         frags[1].ttl -= 1
396         frags[0].chksum = 0
397         frags[1].chksum = 0
398
399         v6_reply1 = (IPv6(src='3000::1', dst=map_translated_addr, hlim=63) /
400                      frags[0])
401         v6_reply2 = (IPv6(src='3000::1', dst=map_translated_addr, hlim=63) /
402                      frags[1])
403         rx[0][1].fl = 0
404         rx[1][1].fl = 0
405         rx[0][1][IP].id = 0
406         rx[1][1][IP].id = 0
407         rx[0][1][IP].chksum = 0
408         rx[1][1][IP].chksum = 0
409
410         self.validate(rx[0][1], v6_reply1)
411         self.validate(rx[1][1], v6_reply2)
412
413     def test_map_e_tcp_mss(self):
414         """ MAP-E TCP MSS"""
415
416         #
417         # Add a route to the MAP-BR
418         #
419         map_br_pfx = "2001::"
420         map_br_pfx_len = 32
421         map_route = VppIpRoute(self,
422                                map_br_pfx,
423                                map_br_pfx_len,
424                                [VppRoutePath(self.pg1.remote_ip6,
425                                              self.pg1.sw_if_index)])
426         map_route.add_vpp_config()
427
428         #
429         # Add a domain that maps from pg0 to pg1
430         #
431         map_dst = '2001::/32'
432         map_src = '3000::1/128'
433         client_pfx = '192.168.0.0/16'
434         map_translated_addr = '2001:0:101:5000:0:c0a8:101:5'
435         tag = 'MAP-E TCP tag.'
436         self.vapi.map_add_domain(ip4_prefix=client_pfx,
437                                  ip6_prefix=map_dst,
438                                  ip6_src=map_src,
439                                  ea_bits_len=20,
440                                  psid_offset=4,
441                                  psid_length=4,
442                                  tag=tag)
443
444         # Enable MAP on pg0 interface.
445         self.vapi.map_if_enable_disable(is_enable=1,
446                                         sw_if_index=self.pg0.sw_if_index,
447                                         is_translation=0)
448
449         # Enable MAP on pg1 interface.
450         self.vapi.map_if_enable_disable(is_enable=1,
451                                         sw_if_index=self.pg1.sw_if_index,
452                                         is_translation=0)
453
454         # TCP MSS clamping
455         mss_clamp = 1300
456         self.vapi.map_param_set_tcp(mss_clamp)
457
458         #
459         # Send a v4 packet that will be encapped.
460         #
461         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
462         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.1.1')
463         p_tcp = TCP(sport=20000, dport=30000, flags="S",
464                     options=[("MSS", 1455)])
465         p4 = p_ether / p_ip4 / p_tcp
466
467         self.pg1.add_stream(p4)
468         self.pg_enable_capture(self.pg_interfaces)
469         self.pg_start()
470
471         rx = self.pg1.get_capture(1)
472         rx = rx[0]
473
474         self.assertTrue(rx.haslayer(IPv6))
475         self.assertEqual(rx[IP].src, p4[IP].src)
476         self.assertEqual(rx[IP].dst, p4[IP].dst)
477         self.assertEqual(rx[IPv6].src, "3000::1")
478         self.assertEqual(rx[TCP].options,
479                          TCP(options=[('MSS', mss_clamp)]).options)
480
481     def validate(self, rx, expected):
482         self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))
483
484     def validate_frag6(self, p6_frag, p_ip6_expected):
485         self.assertFalse(p6_frag.haslayer(IP))
486         self.assertTrue(p6_frag.haslayer(IPv6))
487         self.assertTrue(p6_frag.haslayer(IPv6ExtHdrFragment))
488         self.assertEqual(p6_frag[IPv6].src, p_ip6_expected.src)
489         self.assertEqual(p6_frag[IPv6].dst, p_ip6_expected.dst)
490
491     def validate_frag_payload_len6(self, rx, proto, payload_len_expected):
492         payload_total = 0
493         for p in rx:
494             payload_total += p[IPv6].plen
495
496         # First fragment has proto
497         payload_total -= len(proto())
498
499         # Every fragment has IPv6 fragment header
500         payload_total -= len(IPv6ExtHdrFragment()) * len(rx)
501
502         self.assertEqual(payload_total, payload_len_expected)
503
504     def validate_frag4(self, p4_frag, p_ip4_expected):
505         self.assertFalse(p4_frag.haslayer(IPv6))
506         self.assertTrue(p4_frag.haslayer(IP))
507         self.assertTrue(p4_frag[IP].frag != 0 or p4_frag[IP].flags.MF)
508         self.assertEqual(p4_frag[IP].src, p_ip4_expected.src)
509         self.assertEqual(p4_frag[IP].dst, p_ip4_expected.dst)
510
511     def validate_frag_payload_len4(self, rx, proto, payload_len_expected):
512         payload_total = 0
513         for p in rx:
514             payload_total += len(p[IP].payload)
515
516         # First fragment has proto
517         payload_total -= len(proto())
518
519         self.assertEqual(payload_total, payload_len_expected)
520
521     def payload(self, len):
522         return 'x' * len
523
524     def test_map_t(self):
525         """ MAP-T """
526
527         #
528         # Add a domain that maps from pg0 to pg1
529         #
530         map_dst = '2001:db8::/32'
531         map_src = '1234:5678:90ab:cdef::/64'
532         ip4_pfx = '192.168.0.0/24'
533         tag = 'MAP-T Tag.'
534
535         self.vapi.map_add_domain(ip6_prefix=map_dst,
536                                  ip4_prefix=ip4_pfx,
537                                  ip6_src=map_src,
538                                  ea_bits_len=16,
539                                  psid_offset=6,
540                                  psid_length=4,
541                                  mtu=1500,
542                                  tag=tag)
543
544         # Enable MAP-T on interfaces.
545         self.vapi.map_if_enable_disable(is_enable=1,
546                                         sw_if_index=self.pg0.sw_if_index,
547                                         is_translation=1)
548         self.vapi.map_if_enable_disable(is_enable=1,
549                                         sw_if_index=self.pg1.sw_if_index,
550                                         is_translation=1)
551
552         # Ensure MAP doesn't steal all packets!
553         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
554               IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
555               UDP(sport=20000, dport=10000) /
556               Raw(b'\xa5' * 100))
557         rx = self.send_and_expect(self.pg0, v4*1, self.pg0)
558         v4_reply = v4[1]
559         v4_reply.ttl -= 1
560         for p in rx:
561             self.validate(p[1], v4_reply)
562         # Ensure MAP doesn't steal all packets
563         v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
564               IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
565               UDP(sport=20000, dport=10000) /
566               Raw(b'\xa5' * 100))
567         rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
568         v6_reply = v6[1]
569         v6_reply.hlim -= 1
570         for p in rx:
571             self.validate(p[1], v6_reply)
572
573         map_route = VppIpRoute(self,
574                                "2001:db8::",
575                                32,
576                                [VppRoutePath(self.pg1.remote_ip6,
577                                              self.pg1.sw_if_index,
578                                              proto=DpoProto.DPO_PROTO_IP6)])
579         map_route.add_vpp_config()
580
581         #
582         # Send a v4 packet that will be translated
583         #
584         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
585         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
586         payload = TCP(sport=0xabcd, dport=0xabcd)
587
588         p4 = (p_ether / p_ip4 / payload)
589         p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
590                               dst="2001:db8:1f0::c0a8:1:f") / payload)
591         p6_translated.hlim -= 1
592         rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
593         for p in rx:
594             self.validate(p[1], p6_translated)
595
596         # Send back an IPv6 packet that will be "untranslated"
597         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
598         p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
599                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
600         p6 = (p_ether6 / p_ip6 / payload)
601         p4_translated = (IP(src='192.168.0.1',
602                             dst=self.pg0.remote_ip4) / payload)
603         p4_translated.id = 0
604         p4_translated.ttl -= 1
605         rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
606         for p in rx:
607             self.validate(p[1], p4_translated)
608
609         # IPv4 TTL=0
610         ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
611         p4 = (p_ether / ip4_ttl_expired / payload)
612
613         icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
614                           dst=self.pg0.remote_ip4) /
615                        ICMP(type='time-exceeded',
616                             code='ttl-zero-during-transit') /
617                        IP(src=self.pg0.remote_ip4,
618                           dst='192.168.0.1', ttl=0) / payload)
619         rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
620         for p in rx:
621             self.validate(p[1], icmp4_reply)
622
623         # IPv4 TTL=1
624         ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=1)
625         p4 = (p_ether / ip4_ttl_expired / payload)
626
627         icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
628                           dst=self.pg0.remote_ip4) /
629                        ICMP(type='time-exceeded',
630                             code='ttl-zero-during-transit') /
631                        IP(src=self.pg0.remote_ip4,
632                           dst='192.168.0.1', ttl=1) / payload)
633         rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
634         for p in rx:
635             self.validate(p[1], icmp4_reply)
636
637         # IPv6 Hop limit at BR
638         ip6_hlim_expired = IPv6(hlim=1, src='2001:db8:1ab::c0a8:1:ab',
639                                 dst='1234:5678:90ab:cdef:ac:1001:200:0')
640         p6 = (p_ether6 / ip6_hlim_expired / payload)
641
642         icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
643                             dst="2001:db8:1ab::c0a8:1:ab") /
644                        ICMPv6TimeExceeded(code=0) /
645                        IPv6(src="2001:db8:1ab::c0a8:1:ab",
646                             dst='1234:5678:90ab:cdef:ac:1001:200:0',
647                             hlim=1) / payload)
648         rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
649         for p in rx:
650             self.validate(p[1], icmp6_reply)
651
652         # IPv6 Hop limit beyond BR
653         ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
654                                 dst='1234:5678:90ab:cdef:ac:1001:200:0')
655         p6 = (p_ether6 / ip6_hlim_expired / payload)
656
657         icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
658                             dst="2001:db8:1ab::c0a8:1:ab") /
659                        ICMPv6TimeExceeded(code=0) /
660                        IPv6(src="2001:db8:1ab::c0a8:1:ab",
661                             dst='1234:5678:90ab:cdef:ac:1001:200:0',
662                             hlim=0) / payload)
663         rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
664         for p in rx:
665             self.validate(p[1], icmp6_reply)
666
667         # IPv4 Well-known port
668         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
669         payload = UDP(sport=200, dport=200)
670         p4 = (p_ether / p_ip4 / payload)
671         self.send_and_assert_no_replies(self.pg0, p4*1)
672
673         # IPv6 Well-known port
674         payload = UDP(sport=200, dport=200)
675         p6 = (p_ether6 / p_ip6 / payload)
676         self.send_and_assert_no_replies(self.pg1, p6*1)
677
678         # UDP packet fragmentation
679         payload_len = 1453
680         payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
681         p4 = (p_ether / p_ip4 / payload)
682         self.pg_enable_capture()
683         self.pg0.add_stream(p4)
684         self.pg_start()
685         rx = self.pg1.get_capture(2)
686
687         p_ip6_translated = IPv6(src='1234:5678:90ab:cdef:ac:1001:200:0',
688                                 dst='2001:db8:1e0::c0a8:1:e')
689         for p in rx:
690             self.validate_frag6(p, p_ip6_translated)
691
692         self.validate_frag_payload_len6(rx, UDP, payload_len)
693
694         # UDP packet fragmentation send fragments
695         payload_len = 1453
696         payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
697         p4 = (p_ether / p_ip4 / payload)
698         frags = fragment_rfc791(p4, fragsize=1000)
699         self.pg_enable_capture()
700         self.pg0.add_stream(frags)
701         self.pg_start()
702         rx = self.pg1.get_capture(2)
703
704         for p in rx:
705             self.validate_frag6(p, p_ip6_translated)
706
707         self.validate_frag_payload_len6(rx, UDP, payload_len)
708
709         # Send back an fragmented IPv6 UDP packet that will be "untranslated"
710         payload = UDP(sport=4000, dport=40000) / self.payload(payload_len)
711         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
712         p_ip6 = IPv6(src='2001:db8:1e0::c0a8:1:e',
713                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
714         p6 = (p_ether6 / p_ip6 / payload)
715         frags6 = fragment_rfc8200(p6, identification=0xdcba, fragsize=1000)
716
717         p_ip4_translated = IP(src='192.168.0.1', dst=self.pg0.remote_ip4)
718         p4_translated = (p_ip4_translated / payload)
719         p4_translated.id = 0
720         p4_translated.ttl -= 1
721
722         self.pg_enable_capture()
723         self.pg1.add_stream(frags6)
724         self.pg_start()
725         rx = self.pg0.get_capture(2)
726
727         for p in rx:
728             self.validate_frag4(p, p4_translated)
729
730         self.validate_frag_payload_len4(rx, UDP, payload_len)
731
732         # ICMP packet fragmentation
733         payload = ICMP(id=6529) / self.payload(payload_len)
734         p4 = (p_ether / p_ip4 / payload)
735         self.pg_enable_capture()
736         self.pg0.add_stream(p4)
737         self.pg_start()
738         rx = self.pg1.get_capture(2)
739
740         p_ip6_translated = IPv6(src='1234:5678:90ab:cdef:ac:1001:200:0',
741                                 dst='2001:db8:160::c0a8:1:6')
742         for p in rx:
743             self.validate_frag6(p, p_ip6_translated)
744
745         self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len)
746
747         # ICMP packet fragmentation send fragments
748         payload = ICMP(id=6529) / self.payload(payload_len)
749         p4 = (p_ether / p_ip4 / payload)
750         frags = fragment_rfc791(p4, fragsize=1000)
751         self.pg_enable_capture()
752         self.pg0.add_stream(frags)
753         self.pg_start()
754         rx = self.pg1.get_capture(2)
755
756         for p in rx:
757             self.validate_frag6(p, p_ip6_translated)
758
759         self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len)
760
761         # TCP MSS clamping
762         self.vapi.map_param_set_tcp(1300)
763
764         #
765         # Send a v4 TCP SYN packet that will be translated and MSS clamped
766         #
767         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
768         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
769         payload = TCP(sport=0xabcd, dport=0xabcd, flags="S",
770                       options=[('MSS', 1460)])
771
772         p4 = (p_ether / p_ip4 / payload)
773         p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
774                               dst="2001:db8:1f0::c0a8:1:f") / payload)
775         p6_translated.hlim -= 1
776         p6_translated[TCP].options = [('MSS', 1300)]
777         rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
778         for p in rx:
779             self.validate(p[1], p6_translated)
780
781         # Send back an IPv6 packet that will be "untranslated"
782         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
783         p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
784                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
785         p6 = (p_ether6 / p_ip6 / payload)
786         p4_translated = (IP(src='192.168.0.1',
787                             dst=self.pg0.remote_ip4) / payload)
788         p4_translated.id = 0
789         p4_translated.ttl -= 1
790         p4_translated[TCP].options = [('MSS', 1300)]
791         rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
792         for p in rx:
793             self.validate(p[1], p4_translated)
794
795         # TCP MSS clamping cleanup
796         self.vapi.map_param_set_tcp(0)
797
798         # Enable icmp6 param to get back ICMPv6 unreachable messages in case
799         # the security check fails
800         self.vapi.map_param_set_icmp6(enable_unreachable=1)
801
802         # Send back an IPv6 packet that will be dropped due to security
803         # check failure
804         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
805         p_ip6_sec_check_fail = IPv6(src='2001:db8:1fe::c0a8:1:f',
806                                     dst='1234:5678:90ab:cdef:ac:1001:200:0')
807         payload = TCP(sport=0xabcd, dport=0xabcd)
808         p6 = (p_ether6 / p_ip6_sec_check_fail / payload)
809
810         self.pg_send(self.pg1, p6*1)
811         self.pg0.get_capture(0, timeout=1)
812         rx = self.pg1.get_capture(1)
813
814         icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
815                             dst='2001:db8:1fe::c0a8:1:f') /
816                        ICMPv6DestUnreach(code=5) /
817                        p_ip6_sec_check_fail / payload)
818
819         for p in rx:
820             self.validate(p[1], icmp6_reply)
821
822         # ICMPv6 unreachable messages cleanup
823         self.vapi.map_param_set_icmp6(enable_unreachable=0)
824
def test_map_t_ip6_psid(self):
    """ MAP-T v6->v4 PSID validation"""

    # Create the MAP-T domain this test sends traffic through.
    self.vapi.map_add_domain(ip6_prefix='2001:db8::/32',
                             ip4_prefix='192.168.0.0/24',
                             ip6_src='1234:5678:90ab:cdef::/64',
                             ea_bits_len=16,
                             psid_offset=6,
                             psid_length=4,
                             mtu=1500,
                             tag='MAP-T Test Domain')

    # Turn on MAP translation on both interfaces, pg0 first, then pg1.
    for intf in (self.pg0, self.pg1):
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=intf.sw_if_index,
                                        is_translation=1)

    # Route the domain's IPv6 prefix towards the pg1 peer.
    v6_path = VppRoutePath(self.pg1.remote_ip6,
                           self.pg1.sw_if_index,
                           proto=DpoProto.DPO_PROTO_IP6)
    VppIpRoute(self, "2001:db8::", 32, [v6_path]).add_vpp_config()

    # Ethernet + IPv6 headers shared by both probes; only the TCP layer
    # (and therefore the source port) differs between them.
    v6_headers = (Ether(dst=self.pg1.local_mac,
                        src=self.pg1.remote_mac) /
                  IPv6(src='2001:db8:1f0::c0a8:1:f',
                       dst='1234:5678:90ab:cdef:ac:1001:200:0'))

    # Good source port: the packet must come out of pg0 translated to
    # IPv4, with the IP id zeroed and the TTL decremented by one hop.
    good_tcp = TCP(sport=0xabcd, dport=80)
    expected_v4 = IP(src='192.168.0.1',
                     dst=self.pg0.remote_ip4) / good_tcp
    expected_v4.id = 0
    expected_v4.ttl -= 1
    captured = self.send_and_expect(self.pg1,
                                    (v6_headers / good_tcp) * 1,
                                    self.pg0)
    for pkt in captured:
        self.validate(pkt[1], expected_v4)

    # Bad source port: no translated IPv4 packet may be received.
    bad_tcp = TCP(sport=0xdcba, dport=80)
    self.send_and_assert_no_replies(self.pg1,
                                    (v6_headers / bad_tcp) * 1)
880
def test_map_t_pre_resolve(self):
    """ MAP-T pre-resolve"""

    # Create the MAP-T domain this test sends traffic through.
    self.vapi.map_add_domain(ip6_prefix='2001:db8::/32',
                             ip4_prefix='192.168.0.0/24',
                             ip6_src='1234:5678:90ab:cdef::/64',
                             ea_bits_len=16,
                             psid_offset=6,
                             psid_length=4,
                             mtu=1500,
                             tag='MAP-T Test Domain.')

    # Turn on MAP translation on both interfaces, pg0 first, then pg1.
    for intf in (self.pg0, self.pg1):
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=intf.sw_if_index,
                                        is_translation=1)

    # Configure the pre-resolve next hops; the very same pair is removed
    # again at the end of the test.
    pre_resolve_nh = dict(ip4_nh_address="10.1.2.3",
                          ip6_nh_address="4001::1")
    self.vapi.map_param_add_del_pre_resolve(is_add=1, **pre_resolve_nh)

    # Hosts that act as the gateways behind the pre-resolve next hops.
    v6_gateway = self.pg1.remote_hosts[2]
    v4_gateway = self.pg0.remote_hosts[1]

    # Translated (IPv6) traffic is expected to leave via the route to
    # 4001::1, i.e. towards v6_gateway on pg1.
    v6_path = VppRoutePath(v6_gateway.ip6, self.pg1.sw_if_index)
    VppIpRoute(self, "4001::1", 128, [v6_path]).add_vpp_config()

    # "Untranslated" (IPv4) traffic is expected to leave via the route
    # to 10.1.2.3, i.e. towards v4_gateway on pg0.
    v4_path = VppRoutePath(v4_gateway.ip4, self.pg0.sw_if_index)
    VppIpRoute(self, "10.1.2.3", 32, [v4_path]).add_vpp_config()

    # The same TCP layer is carried in both directions.
    tcp = TCP(sport=0xabcd, dport=0xabcd)

    # IPv4 -> IPv6: send on pg0, expect the translated packet on pg1
    # addressed to the IPv6 gateway's MAC, hop limit decremented.
    v4_pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(src=self.pg0.remote_ip4, dst='192.168.0.1') /
              tcp)
    expected_v6 = IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
                       dst="2001:db8:1f0::c0a8:1:f") / tcp
    expected_v6.hlim -= 1

    for pkt in self.send_and_expect(self.pg0, v4_pkt * 1, self.pg1):
        self.assertEqual(pkt[Ether].dst, v6_gateway.mac)
        self.validate(pkt[1], expected_v6)

    # IPv6 -> IPv4: send on pg1, expect the "untranslated" packet on pg0
    # addressed to the IPv4 gateway's MAC, id zeroed, TTL decremented.
    v6_pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(src='2001:db8:1f0::c0a8:1:f',
                   dst='1234:5678:90ab:cdef:ac:1001:200:0') /
              tcp)
    expected_v4 = IP(src='192.168.0.1', dst=self.pg0.remote_ip4) / tcp
    expected_v4.id = 0
    expected_v4.ttl -= 1

    for pkt in self.send_and_expect(self.pg1, v6_pkt * 1, self.pg0):
        self.assertEqual(pkt[Ether].dst, v4_gateway.mac)
        self.validate(pkt[1], expected_v4)

    # Cleanup pre-resolve option
    self.vapi.map_param_add_del_pre_resolve(is_add=0, **pre_resolve_nh)
961
962
if __name__ == '__main__':
    # Script entry point: run this module's tests with VppTestRunner.
    unittest.main(testRunner=VppTestRunner)