22fe1e1d20a18add8de3ecb89ae0838ff127769c
[vpp.git] / test / test_map.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
10
11 import scapy.compat
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, IPv6ExtHdrFragment, \
16     ICMPv6EchoRequest, ICMPv6DestUnreach
17
18
19 class TestMAP(VppTestCase):
20     """ MAP Test Case """
21
22     @classmethod
23     def setUpClass(cls):
24         super(TestMAP, cls).setUpClass()
25
26     @classmethod
27     def tearDownClass(cls):
28         super(TestMAP, cls).tearDownClass()
29
30     def setUp(self):
31         super(TestMAP, self).setUp()
32
33         # create 2 pg interfaces
34         self.create_pg_interfaces(range(4))
35
36         # pg0 is 'inside' IPv4
37         self.pg0.admin_up()
38         self.pg0.config_ip4()
39         self.pg0.resolve_arp()
40         self.pg0.generate_remote_hosts(2)
41         self.pg0.configure_ipv4_neighbors()
42
43         # pg1 is 'outside' IPv6
44         self.pg1.admin_up()
45         self.pg1.config_ip6()
46         self.pg1.generate_remote_hosts(4)
47         self.pg1.configure_ipv6_neighbors()
48
49     def tearDown(self):
50         super(TestMAP, self).tearDown()
51         for i in self.pg_interfaces:
52             i.unconfig_ip4()
53             i.unconfig_ip6()
54             i.admin_down()
55
56     def send_and_assert_encapped(self, packets, ip6_src, ip6_dst, dmac=None):
57         if not dmac:
58             dmac = self.pg1.remote_mac
59
60         self.pg0.add_stream(packets)
61
62         self.pg_enable_capture(self.pg_interfaces)
63         self.pg_start()
64
65         capture = self.pg1.get_capture(len(packets))
66         for rx, tx in zip(capture, packets):
67             self.assertEqual(rx[Ether].dst, dmac)
68             self.assertEqual(rx[IP].src, tx[IP].src)
69             self.assertEqual(rx[IPv6].src, ip6_src)
70             self.assertEqual(rx[IPv6].dst, ip6_dst)
71
72     def send_and_assert_encapped_one(self, packet, ip6_src, ip6_dst,
73                                      dmac=None):
74         return self.send_and_assert_encapped([packet], ip6_src, ip6_dst, dmac)
75
76     def test_api_map_domain_dump(self):
77         map_dst = '2001::/64'
78         map_src = '3000::1/128'
79         client_pfx = '192.168.0.0/16'
80         tag = 'MAP-E tag.'
81         index = self.vapi.map_add_domain(ip4_prefix=client_pfx,
82                                          ip6_prefix=map_dst,
83                                          ip6_src=map_src,
84                                          tag=tag).index
85         rv = self.vapi.map_domain_dump()
86
87         # restore the state early so as to not impact subsequent tests.
88         # If an assert fails, we will not get the chance to do it at the end.
89         self.vapi.map_del_domain(index=index)
90
91         self.assertGreater(len(rv), 0,
92                            "Expected output from 'map_domain_dump'")
93
94         # typedefs are returned as ipaddress objects.
95         # wrap results in str() ugh! to avoid the need to call unicode.
96         self.assertEqual(str(rv[0].ip4_prefix), client_pfx)
97         self.assertEqual(str(rv[0].ip6_prefix), map_dst)
98         self.assertEqual(str(rv[0].ip6_src), map_src)
99
100         self.assertEqual(rv[0].tag, tag,
101                          "output produced incorrect tag value.")
102
103     def create_domains(self, ip4_pfx_str, ip6_pfx_str, ip6_src_str):
104         ip4_pfx = ipaddress.ip_network(ip4_pfx_str)
105         ip6_dst = ipaddress.ip_network(ip6_pfx_str)
106         mod = ip4_pfx.num_addresses / 1024
107         indicies = []
108         for i in range(ip4_pfx.num_addresses):
109             rv = self.vapi.map_add_domain(ip6_prefix=ip6_pfx_str,
110                                           ip4_prefix=str(ip4_pfx[i]) + "/32",
111                                           ip6_src=ip6_src_str)
112             indicies.append(rv.index)
113         return indicies
114
115     def test_api_map_domains_get(self):
116         # Create a bunch of domains
117         no_domains = 4096  # This must be large enough to ensure VPP suspends
118         domains = self.create_domains('130.67.0.0/20', '2001::/32',
119                                       '2001::1/128')
120         self.assertEqual(len(domains), no_domains)
121
122         d = []
123         cursor = 0
124
125         # Invalid cursor
126         rv, details = self.vapi.map_domains_get(cursor=no_domains+10)
127         self.assertEqual(rv.retval, -7)
128
129         # Delete a domain in the middle of walk
130         rv, details = self.vapi.map_domains_get(cursor=0)
131         self.assertEqual(rv.retval, -165)
132         self.vapi.map_del_domain(index=rv.cursor)
133         domains.remove(rv.cursor)
134
135         # Continue at point of deleted cursor
136         rv, details = self.vapi.map_domains_get(cursor=rv.cursor)
137         self.assertIn(rv.retval, [0, -165])
138
139         d = list(self.vapi.vpp.details_iter(self.vapi.map_domains_get))
140         self.assertEqual(len(d), no_domains - 1)
141
142         # Clean up
143         for i in domains:
144             self.vapi.map_del_domain(index=i)
145
146     def test_map_e_udp(self):
147         """ MAP-E UDP"""
148
149         #
150         # Add a route to the MAP-BR
151         #
152         map_br_pfx = "2001::"
153         map_br_pfx_len = 32
154         map_route = VppIpRoute(self,
155                                map_br_pfx,
156                                map_br_pfx_len,
157                                [VppRoutePath(self.pg1.remote_ip6,
158                                              self.pg1.sw_if_index)])
159         map_route.add_vpp_config()
160
161         #
162         # Add a domain that maps from pg0 to pg1
163         #
164         map_dst = '2001::/32'
165         map_src = '3000::1/128'
166         client_pfx = '192.168.0.0/16'
167         map_translated_addr = '2001:0:101:7000:0:c0a8:101:7'
168         tag = 'MAP-E tag.'
169         self.vapi.map_add_domain(ip4_prefix=client_pfx,
170                                  ip6_prefix=map_dst,
171                                  ip6_src=map_src,
172                                  ea_bits_len=20,
173                                  psid_offset=4,
174                                  psid_length=4,
175                                  tag=tag)
176
177         self.vapi.map_param_set_security_check(enable=1, fragments=1)
178
179         # Enable MAP on interface.
180         self.vapi.map_if_enable_disable(is_enable=1,
181                                         sw_if_index=self.pg0.sw_if_index,
182                                         is_translation=0)
183
184         # Ensure MAP doesn't steal all packets!
185         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
186               IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
187               UDP(sport=20000, dport=10000) /
188               Raw(b'\xa5' * 100))
189         rx = self.send_and_expect(self.pg0, v4 * 4, self.pg0)
190         v4_reply = v4[1]
191         v4_reply.ttl -= 1
192         for p in rx:
193             self.validate(p[1], v4_reply)
194
195         #
196         # Fire in a v4 packet that will be encapped to the BR
197         #
198         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
199               IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
200               UDP(sport=20000, dport=10000) /
201               Raw(b'\xa5' * 100))
202
203         self.send_and_assert_encapped(v4 * 4, "3000::1", map_translated_addr)
204
205         #
206         # Verify reordered fragments are able to pass as well
207         #
208         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
209               IP(id=1, src=self.pg0.remote_ip4, dst='192.168.1.1') /
210               UDP(sport=20000, dport=10000) /
211               Raw(b'\xa5' * 1000))
212
213         frags = fragment_rfc791(v4, 400)
214         frags.reverse()
215
216         self.send_and_assert_encapped(frags, "3000::1", map_translated_addr)
217
218         # Enable MAP on interface.
219         self.vapi.map_if_enable_disable(is_enable=1,
220                                         sw_if_index=self.pg1.sw_if_index,
221                                         is_translation=0)
222
223         # Ensure MAP doesn't steal all packets
224         v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
225               IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
226               UDP(sport=20000, dport=10000) /
227               Raw(b'\xa5' * 100))
228         rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
229         v6_reply = v6[1]
230         v6_reply.hlim -= 1
231         for p in rx:
232             self.validate(p[1], v6_reply)
233
234         #
235         # Fire in a V6 encapped packet.
236         # expect a decapped packet on the inside ip4 link
237         #
238         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
239              IPv6(dst='3000::1', src=map_translated_addr) /
240              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
241              UDP(sport=10000, dport=20000) /
242              Raw(b'\xa5' * 100))
243
244         self.pg1.add_stream(p)
245
246         self.pg_enable_capture(self.pg_interfaces)
247         self.pg_start()
248
249         rx = self.pg0.get_capture(1)
250         rx = rx[0]
251
252         self.assertFalse(rx.haslayer(IPv6))
253         self.assertEqual(rx[IP].src, p[IP].src)
254         self.assertEqual(rx[IP].dst, p[IP].dst)
255
256         #
257         # Verify encapped reordered fragments pass as well
258         #
259         p = (IP(id=1, dst=self.pg0.remote_ip4, src='192.168.1.1') /
260              UDP(sport=10000, dport=20000) /
261              Raw(b'\xa5' * 1500))
262         frags = fragment_rfc791(p, 400)
263         frags.reverse()
264
265         stream = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
266                   IPv6(dst='3000::1', src=map_translated_addr) /
267                   x for x in frags)
268
269         self.pg1.add_stream(stream)
270
271         self.pg_enable_capture(self.pg_interfaces)
272         self.pg_start()
273
274         rx = self.pg0.get_capture(len(frags))
275
276         for r in rx:
277             self.assertFalse(r.haslayer(IPv6))
278             self.assertEqual(r[IP].src, p[IP].src)
279             self.assertEqual(r[IP].dst, p[IP].dst)
280
281         # Verify that fragments pass even if ipv6 layer is fragmented
282         stream = (IPv6(dst='3000::1', src=map_translated_addr) / x
283                   for x in frags)
284
285         v6_stream = [
286             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / x
287             for i in range(len(frags))
288             for x in fragment_rfc8200(
289                 IPv6(dst='3000::1', src=map_translated_addr) / frags[i],
290                 i, 200)]
291
292         self.pg1.add_stream(v6_stream)
293
294         self.pg_enable_capture(self.pg_interfaces)
295         self.pg_start()
296
297         rx = self.pg0.get_capture(len(frags))
298
299         for r in rx:
300             self.assertFalse(r.haslayer(IPv6))
301             self.assertEqual(r[IP].src, p[IP].src)
302             self.assertEqual(r[IP].dst, p[IP].dst)
303
304         #
305         # Pre-resolve. No API for this!!
306         #
307         self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")
308
309         self.send_and_assert_no_replies(self.pg0, v4,
310                                         "resolved via default route")
311
312         #
313         # Add a route to 4001::1. Expect the encapped traffic to be
314         # sent via that routes next-hop
315         #
316         pre_res_route = VppIpRoute(self, "4001::1", 128,
317                                    [VppRoutePath(self.pg1.remote_hosts[2].ip6,
318                                                  self.pg1.sw_if_index)])
319         pre_res_route.add_vpp_config()
320
321         self.send_and_assert_encapped_one(v4, "3000::1",
322                                           map_translated_addr,
323                                           dmac=self.pg1.remote_hosts[2].mac)
324
325         #
326         # change the route to the pre-solved next-hop
327         #
328         pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
329                                            self.pg1.sw_if_index)])
330         pre_res_route.add_vpp_config()
331
332         self.send_and_assert_encapped_one(v4, "3000::1",
333                                           map_translated_addr,
334                                           dmac=self.pg1.remote_hosts[3].mac)
335
336         #
337         # cleanup. The test infra's object registry will ensure
338         # the route is really gone and thus that the unresolve worked.
339         #
340         pre_res_route.remove_vpp_config()
341         self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
342
343     def test_map_e_inner_frag(self):
344         """ MAP-E Inner fragmentation """
345
346         #
347         # Add a route to the MAP-BR
348         #
349         map_br_pfx = "2001::"
350         map_br_pfx_len = 32
351         map_route = VppIpRoute(self,
352                                map_br_pfx,
353                                map_br_pfx_len,
354                                [VppRoutePath(self.pg1.remote_ip6,
355                                              self.pg1.sw_if_index)])
356         map_route.add_vpp_config()
357
358         #
359         # Add a domain that maps from pg0 to pg1
360         #
361         map_dst = '2001::/32'
362         map_src = '3000::1/128'
363         client_pfx = '192.168.0.0/16'
364         map_translated_addr = '2001:0:101:7000:0:c0a8:101:7'
365         tag = 'MAP-E tag.'
366         self.vapi.map_add_domain(ip4_prefix=client_pfx,
367                                  ip6_prefix=map_dst,
368                                  ip6_src=map_src,
369                                  ea_bits_len=20,
370                                  psid_offset=4,
371                                  psid_length=4,
372                                  mtu=1000,
373                                  tag=tag)
374
375         # Enable MAP on interface.
376         self.vapi.map_if_enable_disable(is_enable=1,
377                                         sw_if_index=self.pg0.sw_if_index,
378                                         is_translation=0)
379
380         # Enable inner fragmentation
381         self.vapi.map_param_set_fragmentation(inner=1)
382
383         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
384               IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
385               UDP(sport=20000, dport=10000) /
386               Raw(b'\xa5' * 1300))
387
388         self.pg_send(self.pg0, v4*1)
389         rx = self.pg1.get_capture(2)
390
391         # 1000-sizeof(ip6_header_t) = 960.
392         frags = fragment_rfc791(v4[1], 960)
393         frags[0].id = 0
394         frags[1].id = 0
395         frags[0].ttl -= 1
396         frags[1].ttl -= 1
397         frags[0].chksum = 0
398         frags[1].chksum = 0
399
400         v6_reply1 = (IPv6(src='3000::1', dst=map_translated_addr, hlim=63) /
401                      frags[0])
402         v6_reply2 = (IPv6(src='3000::1', dst=map_translated_addr, hlim=63) /
403                      frags[1])
404         rx[0][1].fl = 0
405         rx[1][1].fl = 0
406         rx[0][1][IP].id = 0
407         rx[1][1][IP].id = 0
408         rx[0][1][IP].chksum = 0
409         rx[1][1][IP].chksum = 0
410
411         self.validate(rx[0][1], v6_reply1)
412         self.validate(rx[1][1], v6_reply2)
413
    def test_map_e_tcp_mss(self):
        """ MAP-E TCP MSS

        Configures a MAP-E domain with TCP MSS clamping set to 1300 and
        checks that a TCP SYN advertising MSS 1455 is captured on pg1
        IPv6-encapsulated with its MSS option rewritten to the clamp value.
        """

        #
        # Add a route to the MAP-BR
        #
        map_br_pfx = "2001::"
        map_br_pfx_len = 32
        map_route = VppIpRoute(self,
                               map_br_pfx,
                               map_br_pfx_len,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index)])
        map_route.add_vpp_config()

        #
        # Add a domain that maps from pg0 to pg1
        #
        map_dst = '2001::/32'
        map_src = '3000::1/128'
        client_pfx = '192.168.0.0/16'
        # NOTE(review): map_translated_addr is assigned but never used in
        # this test; the outer IPv6 destination is not asserted below.
        map_translated_addr = '2001:0:101:5000:0:c0a8:101:5'
        tag = 'MAP-E TCP tag.'
        self.vapi.map_add_domain(ip4_prefix=client_pfx,
                                 ip6_prefix=map_dst,
                                 ip6_src=map_src,
                                 ea_bits_len=20,
                                 psid_offset=4,
                                 psid_length=4,
                                 tag=tag)

        # Enable MAP on pg0 interface.
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg0.sw_if_index,
                                        is_translation=0)

        # Enable MAP on pg1 interface.
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg1.sw_if_index,
                                        is_translation=0)

        # TCP MSS clamping
        mss_clamp = 1300
        self.vapi.map_param_set_tcp(mss_clamp)

        #
        # Send a v4 packet that will be encapped.
        #
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.1.1')
        # SYN advertising an MSS above the clamp so a rewrite is observable
        p_tcp = TCP(sport=20000, dport=30000, flags="S",
                    options=[("MSS", 1455)])
        p4 = p_ether / p_ip4 / p_tcp

        # NOTE(review): the frame carries pg0's MAC addresses but is
        # injected on pg1 (and captured on pg1 as well). This may be
        # intentional or a copy-paste slip for pg0 -- confirm before
        # changing.
        self.pg1.add_stream(p4)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg1.get_capture(1)
        rx = rx[0]

        # The packet must be encapsulated with the inner addresses intact
        # and the MSS option lowered to the configured clamp.
        self.assertTrue(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, p4[IP].src)
        self.assertEqual(rx[IP].dst, p4[IP].dst)
        self.assertEqual(rx[IPv6].src, "3000::1")
        self.assertEqual(rx[TCP].options,
                         TCP(options=[('MSS', mss_clamp)]).options)
481
482     def validate(self, rx, expected):
483         self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))
484
485     def validate_frag6(self, p6_frag, p_ip6_expected):
486         self.assertFalse(p6_frag.haslayer(IP))
487         self.assertTrue(p6_frag.haslayer(IPv6))
488         self.assertTrue(p6_frag.haslayer(IPv6ExtHdrFragment))
489         self.assertEqual(p6_frag[IPv6].src, p_ip6_expected.src)
490         self.assertEqual(p6_frag[IPv6].dst, p_ip6_expected.dst)
491
492     def validate_frag_payload_len6(self, rx, proto, payload_len_expected):
493         payload_total = 0
494         for p in rx:
495             payload_total += p[IPv6].plen
496
497         # First fragment has proto
498         payload_total -= len(proto())
499
500         # Every fragment has IPv6 fragment header
501         payload_total -= len(IPv6ExtHdrFragment()) * len(rx)
502
503         self.assertEqual(payload_total, payload_len_expected)
504
505     def validate_frag4(self, p4_frag, p_ip4_expected):
506         self.assertFalse(p4_frag.haslayer(IPv6))
507         self.assertTrue(p4_frag.haslayer(IP))
508         self.assertTrue(p4_frag[IP].frag != 0 or p4_frag[IP].flags.MF)
509         self.assertEqual(p4_frag[IP].src, p_ip4_expected.src)
510         self.assertEqual(p4_frag[IP].dst, p_ip4_expected.dst)
511
512     def validate_frag_payload_len4(self, rx, proto, payload_len_expected):
513         payload_total = 0
514         for p in rx:
515             payload_total += len(p[IP].payload)
516
517         # First fragment has proto
518         payload_total -= len(proto())
519
520         self.assertEqual(payload_total, payload_len_expected)
521
522     def payload(self, len):
523         return 'x' * len
524
525     def test_map_t(self):
526         """ MAP-T """
527
528         #
529         # Add a domain that maps from pg0 to pg1
530         #
531         map_dst = '2001:db8::/32'
532         map_src = '1234:5678:90ab:cdef::/64'
533         ip4_pfx = '192.168.0.0/24'
534         tag = 'MAP-T Tag.'
535
536         self.vapi.map_add_domain(ip6_prefix=map_dst,
537                                  ip4_prefix=ip4_pfx,
538                                  ip6_src=map_src,
539                                  ea_bits_len=16,
540                                  psid_offset=6,
541                                  psid_length=4,
542                                  mtu=1500,
543                                  tag=tag)
544
545         # Enable MAP-T on interfaces.
546         self.vapi.map_if_enable_disable(is_enable=1,
547                                         sw_if_index=self.pg0.sw_if_index,
548                                         is_translation=1)
549         self.vapi.map_if_enable_disable(is_enable=1,
550                                         sw_if_index=self.pg1.sw_if_index,
551                                         is_translation=1)
552
553         # Ensure MAP doesn't steal all packets!
554         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
555               IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
556               UDP(sport=20000, dport=10000) /
557               Raw(b'\xa5' * 100))
558         rx = self.send_and_expect(self.pg0, v4*1, self.pg0)
559         v4_reply = v4[1]
560         v4_reply.ttl -= 1
561         for p in rx:
562             self.validate(p[1], v4_reply)
563         # Ensure MAP doesn't steal all packets
564         v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
565               IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
566               UDP(sport=20000, dport=10000) /
567               Raw(b'\xa5' * 100))
568         rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
569         v6_reply = v6[1]
570         v6_reply.hlim -= 1
571         for p in rx:
572             self.validate(p[1], v6_reply)
573
574         map_route = VppIpRoute(self,
575                                "2001:db8::",
576                                32,
577                                [VppRoutePath(self.pg1.remote_ip6,
578                                              self.pg1.sw_if_index,
579                                              proto=DpoProto.DPO_PROTO_IP6)])
580         map_route.add_vpp_config()
581
582         #
583         # Send a v4 packet that will be translated
584         #
585         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
586         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
587         payload = TCP(sport=0xabcd, dport=0xabcd)
588
589         p4 = (p_ether / p_ip4 / payload)
590         p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
591                               dst="2001:db8:1f0::c0a8:1:f") / payload)
592         p6_translated.hlim -= 1
593         rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
594         for p in rx:
595             self.validate(p[1], p6_translated)
596
597         # Send back an IPv6 packet that will be "untranslated"
598         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
599         p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
600                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
601         p6 = (p_ether6 / p_ip6 / payload)
602         p4_translated = (IP(src='192.168.0.1',
603                             dst=self.pg0.remote_ip4) / payload)
604         p4_translated.id = 0
605         p4_translated.ttl -= 1
606         rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
607         for p in rx:
608             self.validate(p[1], p4_translated)
609
610         # IPv4 TTL=0
611         ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
612         p4 = (p_ether / ip4_ttl_expired / payload)
613
614         icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
615                           dst=self.pg0.remote_ip4) /
616                        ICMP(type='time-exceeded',
617                             code='ttl-zero-during-transit') /
618                        IP(src=self.pg0.remote_ip4,
619                           dst='192.168.0.1', ttl=0) / payload)
620         rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
621         for p in rx:
622             self.validate(p[1], icmp4_reply)
623
624         # IPv4 TTL=1
625         ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=1)
626         p4 = (p_ether / ip4_ttl_expired / payload)
627
628         icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
629                           dst=self.pg0.remote_ip4) /
630                        ICMP(type='time-exceeded',
631                             code='ttl-zero-during-transit') /
632                        IP(src=self.pg0.remote_ip4,
633                           dst='192.168.0.1', ttl=1) / payload)
634         rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
635         for p in rx:
636             self.validate(p[1], icmp4_reply)
637
638         # IPv6 Hop limit at BR
639         ip6_hlim_expired = IPv6(hlim=1, src='2001:db8:1ab::c0a8:1:ab',
640                                 dst='1234:5678:90ab:cdef:ac:1001:200:0')
641         p6 = (p_ether6 / ip6_hlim_expired / payload)
642
643         icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
644                             dst="2001:db8:1ab::c0a8:1:ab") /
645                        ICMPv6TimeExceeded(code=0) /
646                        IPv6(src="2001:db8:1ab::c0a8:1:ab",
647                             dst='1234:5678:90ab:cdef:ac:1001:200:0',
648                             hlim=1) / payload)
649         rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
650         for p in rx:
651             self.validate(p[1], icmp6_reply)
652
653         # IPv6 Hop limit beyond BR
654         ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
655                                 dst='1234:5678:90ab:cdef:ac:1001:200:0')
656         p6 = (p_ether6 / ip6_hlim_expired / payload)
657
658         icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
659                             dst="2001:db8:1ab::c0a8:1:ab") /
660                        ICMPv6TimeExceeded(code=0) /
661                        IPv6(src="2001:db8:1ab::c0a8:1:ab",
662                             dst='1234:5678:90ab:cdef:ac:1001:200:0',
663                             hlim=0) / payload)
664         rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
665         for p in rx:
666             self.validate(p[1], icmp6_reply)
667
668         # IPv4 Well-known port
669         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
670         payload = UDP(sport=200, dport=200)
671         p4 = (p_ether / p_ip4 / payload)
672         self.send_and_assert_no_replies(self.pg0, p4*1)
673
674         # IPv6 Well-known port
675         payload = UDP(sport=200, dport=200)
676         p6 = (p_ether6 / p_ip6 / payload)
677         self.send_and_assert_no_replies(self.pg1, p6*1)
678
679         # UDP packet fragmentation
680         payload_len = 1453
681         payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
682         p4 = (p_ether / p_ip4 / payload)
683         self.pg_enable_capture()
684         self.pg0.add_stream(p4)
685         self.pg_start()
686         rx = self.pg1.get_capture(2)
687
688         p_ip6_translated = IPv6(src='1234:5678:90ab:cdef:ac:1001:200:0',
689                                 dst='2001:db8:1e0::c0a8:1:e')
690         for p in rx:
691             self.validate_frag6(p, p_ip6_translated)
692
693         self.validate_frag_payload_len6(rx, UDP, payload_len)
694
695         # UDP packet fragmentation send fragments
696         payload_len = 1453
697         payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
698         p4 = (p_ether / p_ip4 / payload)
699         frags = fragment_rfc791(p4, fragsize=1000)
700         self.pg_enable_capture()
701         self.pg0.add_stream(frags)
702         self.pg_start()
703         rx = self.pg1.get_capture(2)
704
705         for p in rx:
706             self.validate_frag6(p, p_ip6_translated)
707
708         self.validate_frag_payload_len6(rx, UDP, payload_len)
709
710         # Send back an fragmented IPv6 UDP packet that will be "untranslated"
711         payload = UDP(sport=4000, dport=40000) / self.payload(payload_len)
712         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
713         p_ip6 = IPv6(src='2001:db8:1e0::c0a8:1:e',
714                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
715         p6 = (p_ether6 / p_ip6 / payload)
716         frags6 = fragment_rfc8200(p6, identification=0xdcba, fragsize=1000)
717
718         p_ip4_translated = IP(src='192.168.0.1', dst=self.pg0.remote_ip4)
719         p4_translated = (p_ip4_translated / payload)
720         p4_translated.id = 0
721         p4_translated.ttl -= 1
722
723         self.pg_enable_capture()
724         self.pg1.add_stream(frags6)
725         self.pg_start()
726         rx = self.pg0.get_capture(2)
727
728         for p in rx:
729             self.validate_frag4(p, p4_translated)
730
731         self.validate_frag_payload_len4(rx, UDP, payload_len)
732
733         # ICMP packet fragmentation
734         payload = ICMP(id=6529) / self.payload(payload_len)
735         p4 = (p_ether / p_ip4 / payload)
736         self.pg_enable_capture()
737         self.pg0.add_stream(p4)
738         self.pg_start()
739         rx = self.pg1.get_capture(2)
740
741         p_ip6_translated = IPv6(src='1234:5678:90ab:cdef:ac:1001:200:0',
742                                 dst='2001:db8:160::c0a8:1:6')
743         for p in rx:
744             self.validate_frag6(p, p_ip6_translated)
745
746         self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len)
747
748         # ICMP packet fragmentation send fragments
749         payload = ICMP(id=6529) / self.payload(payload_len)
750         p4 = (p_ether / p_ip4 / payload)
751         frags = fragment_rfc791(p4, fragsize=1000)
752         self.pg_enable_capture()
753         self.pg0.add_stream(frags)
754         self.pg_start()
755         rx = self.pg1.get_capture(2)
756
757         for p in rx:
758             self.validate_frag6(p, p_ip6_translated)
759
760         self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len)
761
762         # TCP MSS clamping
763         self.vapi.map_param_set_tcp(1300)
764
765         #
766         # Send a v4 TCP SYN packet that will be translated and MSS clamped
767         #
768         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
769         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
770         payload = TCP(sport=0xabcd, dport=0xabcd, flags="S",
771                       options=[('MSS', 1460)])
772
773         p4 = (p_ether / p_ip4 / payload)
774         p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
775                               dst="2001:db8:1f0::c0a8:1:f") / payload)
776         p6_translated.hlim -= 1
777         p6_translated[TCP].options = [('MSS', 1300)]
778         rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
779         for p in rx:
780             self.validate(p[1], p6_translated)
781
782         # Send back an IPv6 packet that will be "untranslated"
783         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
784         p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
785                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
786         p6 = (p_ether6 / p_ip6 / payload)
787         p4_translated = (IP(src='192.168.0.1',
788                             dst=self.pg0.remote_ip4) / payload)
789         p4_translated.id = 0
790         p4_translated.ttl -= 1
791         p4_translated[TCP].options = [('MSS', 1300)]
792         rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
793         for p in rx:
794             self.validate(p[1], p4_translated)
795
796         # TCP MSS clamping cleanup
797         self.vapi.map_param_set_tcp(0)
798
799         # Enable icmp6 param to get back ICMPv6 unreachable messages in case
800         # of security check fails
801         self.vapi.map_param_set_icmp6(enable_unreachable=1)
802
803         # Send back an IPv6 packet that will be dropped because the
804         # security check fails
805         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
806         p_ip6_sec_check_fail = IPv6(src='2001:db8:1fe::c0a8:1:f',
807                                     dst='1234:5678:90ab:cdef:ac:1001:200:0')
808         payload = TCP(sport=0xabcd, dport=0xabcd)
809         p6 = (p_ether6 / p_ip6_sec_check_fail / payload)
810
811         self.pg_send(self.pg1, p6*1)
812         self.pg0.get_capture(0, timeout=1)
813         rx = self.pg1.get_capture(1)
814
815         icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
816                             dst='2001:db8:1fe::c0a8:1:f') /
817                        ICMPv6DestUnreach(code=5) /
818                        p_ip6_sec_check_fail / payload)
819
820         for p in rx:
821             self.validate(p[1], icmp6_reply)
822
823         # ICMPv6 unreachable messages cleanup
824         self.vapi.map_param_set_icmp6(enable_unreachable=0)
825
826     def test_map_t_ip6_psid(self):
827         """ MAP-T v6->v4 PSID validation"""
828
829         #
830         # Add a domain that maps from pg0 to pg1
831         #
832         map_dst = '2001:db8::/32'
833         map_src = '1234:5678:90ab:cdef::/64'
834         ip4_pfx = '192.168.0.0/24'
835         tag = 'MAP-T Test Domain'
836
837         self.vapi.map_add_domain(ip6_prefix=map_dst,
838                                  ip4_prefix=ip4_pfx,
839                                  ip6_src=map_src,
840                                  ea_bits_len=16,
841                                  psid_offset=6,
842                                  psid_length=4,
843                                  mtu=1500,
844                                  tag=tag)
845
846         # Enable MAP-T on interfaces.
847         self.vapi.map_if_enable_disable(is_enable=1,
848                                         sw_if_index=self.pg0.sw_if_index,
849                                         is_translation=1)
850         self.vapi.map_if_enable_disable(is_enable=1,
851                                         sw_if_index=self.pg1.sw_if_index,
852                                         is_translation=1)
853
854         map_route = VppIpRoute(self,
855                                "2001:db8::",
856                                32,
857                                [VppRoutePath(self.pg1.remote_ip6,
858                                              self.pg1.sw_if_index,
859                                              proto=DpoProto.DPO_PROTO_IP6)])
860         map_route.add_vpp_config()
861
862         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
863         p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
864                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
865
866         # Send good IPv6 source port, ensure translated IPv4 received
867         payload = TCP(sport=0xabcd, dport=80)
868         p6 = (p_ether6 / p_ip6 / payload)
869         p4_translated = (IP(src='192.168.0.1',
870                             dst=self.pg0.remote_ip4) / payload)
871         p4_translated.id = 0
872         p4_translated.ttl -= 1
873         rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
874         for p in rx:
875             self.validate(p[1], p4_translated)
876
877         # Send bad IPv6 source port, ensure translated IPv4 not received
878         payload = TCP(sport=0xdcba, dport=80)
879         p6 = (p_ether6 / p_ip6 / payload)
880         self.send_and_assert_no_replies(self.pg1, p6*1)
881
882     def test_map_t_pre_resolve(self):
883         """ MAP-T pre-resolve"""
884
885         # Add a domain that maps from pg0 to pg1
886         map_dst = '2001:db8::/32'
887         map_src = '1234:5678:90ab:cdef::/64'
888         ip4_pfx = '192.168.0.0/24'
889         tag = 'MAP-T Test Domain.'
890
891         self.vapi.map_add_domain(ip6_prefix=map_dst,
892                                  ip4_prefix=ip4_pfx,
893                                  ip6_src=map_src,
894                                  ea_bits_len=16,
895                                  psid_offset=6,
896                                  psid_length=4,
897                                  mtu=1500,
898                                  tag=tag)
899
900         # Enable MAP-T on interfaces.
901         self.vapi.map_if_enable_disable(is_enable=1,
902                                         sw_if_index=self.pg0.sw_if_index,
903                                         is_translation=1)
904         self.vapi.map_if_enable_disable(is_enable=1,
905                                         sw_if_index=self.pg1.sw_if_index,
906                                         is_translation=1)
907
908         # Enable pre-resolve option
909         self.vapi.map_param_add_del_pre_resolve(ip4_nh_address="10.1.2.3",
910                                                 ip6_nh_address="4001::1",
911                                                 is_add=1)
912
913         # Add a route to 4001::1 and expect the translated traffic to be
914         # sent via that route next-hop.
915         pre_res_route6 = VppIpRoute(self, "4001::1", 128,
916                                     [VppRoutePath(self.pg1.remote_hosts[2].ip6,
917                                                   self.pg1.sw_if_index)])
918         pre_res_route6.add_vpp_config()
919
920         # Add a route to 10.1.2.3 and expect the "untranslated" traffic to be
921         # sent via that route next-hop.
922         pre_res_route4 = VppIpRoute(self, "10.1.2.3", 32,
923                                     [VppRoutePath(self.pg0.remote_hosts[1].ip4,
924                                                   self.pg0.sw_if_index)])
925         pre_res_route4.add_vpp_config()
926
927         # Send an IPv4 packet that will be translated
928         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
929         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
930         payload = TCP(sport=0xabcd, dport=0xabcd)
931         p4 = (p_ether / p_ip4 / payload)
932
933         p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
934                               dst="2001:db8:1f0::c0a8:1:f") / payload)
935         p6_translated.hlim -= 1
936
937         rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
938         for p in rx:
939             self.assertEqual(p[Ether].dst, self.pg1.remote_hosts[2].mac)
940             self.validate(p[1], p6_translated)
941
942         # Send back an IPv6 packet that will be "untranslated"
943         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
944         p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
945                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
946         p6 = (p_ether6 / p_ip6 / payload)
947
948         p4_translated = (IP(src='192.168.0.1',
949                             dst=self.pg0.remote_ip4) / payload)
950         p4_translated.id = 0
951         p4_translated.ttl -= 1
952
953         rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
954         for p in rx:
955             self.assertEqual(p[Ether].dst, self.pg0.remote_hosts[1].mac)
956             self.validate(p[1], p4_translated)
957
958         # Cleanup pre-resolve option
959         self.vapi.map_param_add_del_pre_resolve(ip4_nh_address="10.1.2.3",
960                                                 ip6_nh_address="4001::1",
961                                                 is_add=0)
962
963
if __name__ == "__main__":
    # Executed as a script: run this module's tests with the VPP runner.
    unittest.main(testRunner=VppTestRunner)