map: honor icmp6-unreachables param in map-t
[vpp.git] / src / plugins / map / test / test_map.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
10
11 import scapy.compat
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, IPv6ExtHdrFragment, \
16     ICMPv6EchoRequest, ICMPv6DestUnreach
17
18
19 class TestMAP(VppTestCase):
20     """ MAP Test Case """
21
22     @classmethod
23     def setUpClass(cls):
24         super(TestMAP, cls).setUpClass()
25
26     @classmethod
27     def tearDownClass(cls):
28         super(TestMAP, cls).tearDownClass()
29
30     def setUp(self):
31         super(TestMAP, self).setUp()
32
33         # create 2 pg interfaces
34         self.create_pg_interfaces(range(4))
35
36         # pg0 is 'inside' IPv4
37         self.pg0.admin_up()
38         self.pg0.config_ip4()
39         self.pg0.resolve_arp()
40
41         # pg1 is 'outside' IPv6
42         self.pg1.admin_up()
43         self.pg1.config_ip6()
44         self.pg1.generate_remote_hosts(4)
45         self.pg1.configure_ipv6_neighbors()
46
47     def tearDown(self):
48         super(TestMAP, self).tearDown()
49         for i in self.pg_interfaces:
50             i.unconfig_ip4()
51             i.unconfig_ip6()
52             i.admin_down()
53
54     def send_and_assert_encapped(self, packets, ip6_src, ip6_dst, dmac=None):
55         if not dmac:
56             dmac = self.pg1.remote_mac
57
58         self.pg0.add_stream(packets)
59
60         self.pg_enable_capture(self.pg_interfaces)
61         self.pg_start()
62
63         capture = self.pg1.get_capture(len(packets))
64         for rx, tx in zip(capture, packets):
65             self.assertEqual(rx[Ether].dst, dmac)
66             self.assertEqual(rx[IP].src, tx[IP].src)
67             self.assertEqual(rx[IPv6].src, ip6_src)
68             self.assertEqual(rx[IPv6].dst, ip6_dst)
69
70     def send_and_assert_encapped_one(self, packet, ip6_src, ip6_dst,
71                                      dmac=None):
72         return self.send_and_assert_encapped([packet], ip6_src, ip6_dst, dmac)
73
74     def test_api_map_domain_dump(self):
75         map_dst = '2001::/64'
76         map_src = '3000::1/128'
77         client_pfx = '192.168.0.0/16'
78         tag = 'MAP-E tag.'
79         index = self.vapi.map_add_domain(ip4_prefix=client_pfx,
80                                          ip6_prefix=map_dst,
81                                          ip6_src=map_src,
82                                          tag=tag).index
83         rv = self.vapi.map_domain_dump()
84
85         # restore the state early so as to not impact subsequent tests.
86         # If an assert fails, we will not get the chance to do it at the end.
87         self.vapi.map_del_domain(index=index)
88
89         self.assertGreater(len(rv), 0,
90                            "Expected output from 'map_domain_dump'")
91
92         # typedefs are returned as ipaddress objects.
93         # wrap results in str() ugh! to avoid the need to call unicode.
94         self.assertEqual(str(rv[0].ip4_prefix), client_pfx)
95         self.assertEqual(str(rv[0].ip6_prefix), map_dst)
96         self.assertEqual(str(rv[0].ip6_src), map_src)
97
98         self.assertEqual(rv[0].tag, tag,
99                          "output produced incorrect tag value.")
100
101     def test_map_e_udp(self):
102         """ MAP-E UDP"""
103
104         #
105         # Add a route to the MAP-BR
106         #
107         map_br_pfx = "2001::"
108         map_br_pfx_len = 32
109         map_route = VppIpRoute(self,
110                                map_br_pfx,
111                                map_br_pfx_len,
112                                [VppRoutePath(self.pg1.remote_ip6,
113                                              self.pg1.sw_if_index)])
114         map_route.add_vpp_config()
115
116         #
117         # Add a domain that maps from pg0 to pg1
118         #
119         map_dst = '2001::/32'
120         map_src = '3000::1/128'
121         client_pfx = '192.168.0.0/16'
122         map_translated_addr = '2001:0:101:7000:0:c0a8:101:7'
123         tag = 'MAP-E tag.'
124         self.vapi.map_add_domain(ip4_prefix=client_pfx,
125                                  ip6_prefix=map_dst,
126                                  ip6_src=map_src,
127                                  ea_bits_len=20,
128                                  psid_offset=4,
129                                  psid_length=4,
130                                  tag=tag)
131
132         self.vapi.map_param_set_security_check(enable=1, fragments=1)
133
134         # Enable MAP on interface.
135         self.vapi.map_if_enable_disable(is_enable=1,
136                                         sw_if_index=self.pg0.sw_if_index,
137                                         is_translation=0)
138
139         # Ensure MAP doesn't steal all packets!
140         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
141               IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
142               UDP(sport=20000, dport=10000) /
143               Raw(b'\xa5' * 100))
144         rx = self.send_and_expect(self.pg0, v4 * 4, self.pg0)
145         v4_reply = v4[1]
146         v4_reply.ttl -= 1
147         for p in rx:
148             self.validate(p[1], v4_reply)
149
150         #
151         # Fire in a v4 packet that will be encapped to the BR
152         #
153         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
154               IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
155               UDP(sport=20000, dport=10000) /
156               Raw(b'\xa5' * 100))
157
158         self.send_and_assert_encapped(v4 * 4, "3000::1", map_translated_addr)
159
160         #
161         # Verify reordered fragments are able to pass as well
162         #
163         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
164               IP(id=1, src=self.pg0.remote_ip4, dst='192.168.1.1') /
165               UDP(sport=20000, dport=10000) /
166               Raw(b'\xa5' * 1000))
167
168         frags = fragment_rfc791(v4, 400)
169         frags.reverse()
170
171         self.send_and_assert_encapped(frags, "3000::1", map_translated_addr)
172
173         # Enable MAP on interface.
174         self.vapi.map_if_enable_disable(is_enable=1,
175                                         sw_if_index=self.pg1.sw_if_index,
176                                         is_translation=0)
177
178         # Ensure MAP doesn't steal all packets
179         v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
180               IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
181               UDP(sport=20000, dport=10000) /
182               Raw(b'\xa5' * 100))
183         rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
184         v6_reply = v6[1]
185         v6_reply.hlim -= 1
186         for p in rx:
187             self.validate(p[1], v6_reply)
188
189         #
190         # Fire in a V6 encapped packet.
191         # expect a decapped packet on the inside ip4 link
192         #
193         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
194              IPv6(dst='3000::1', src=map_translated_addr) /
195              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
196              UDP(sport=10000, dport=20000) /
197              Raw(b'\xa5' * 100))
198
199         self.pg1.add_stream(p)
200
201         self.pg_enable_capture(self.pg_interfaces)
202         self.pg_start()
203
204         rx = self.pg0.get_capture(1)
205         rx = rx[0]
206
207         self.assertFalse(rx.haslayer(IPv6))
208         self.assertEqual(rx[IP].src, p[IP].src)
209         self.assertEqual(rx[IP].dst, p[IP].dst)
210
211         #
212         # Verify encapped reordered fragments pass as well
213         #
214         p = (IP(id=1, dst=self.pg0.remote_ip4, src='192.168.1.1') /
215              UDP(sport=10000, dport=20000) /
216              Raw(b'\xa5' * 1500))
217         frags = fragment_rfc791(p, 400)
218         frags.reverse()
219
220         stream = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
221                   IPv6(dst='3000::1', src=map_translated_addr) /
222                   x for x in frags)
223
224         self.pg1.add_stream(stream)
225
226         self.pg_enable_capture(self.pg_interfaces)
227         self.pg_start()
228
229         rx = self.pg0.get_capture(len(frags))
230
231         for r in rx:
232             self.assertFalse(r.haslayer(IPv6))
233             self.assertEqual(r[IP].src, p[IP].src)
234             self.assertEqual(r[IP].dst, p[IP].dst)
235
236         # Verify that fragments pass even if ipv6 layer is fragmented
237         stream = (IPv6(dst='3000::1', src=map_translated_addr) / x
238                   for x in frags)
239
240         v6_stream = [
241             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / x
242             for i in range(len(frags))
243             for x in fragment_rfc8200(
244                 IPv6(dst='3000::1', src=map_translated_addr) / frags[i],
245                 i, 200)]
246
247         self.pg1.add_stream(v6_stream)
248
249         self.pg_enable_capture(self.pg_interfaces)
250         self.pg_start()
251
252         rx = self.pg0.get_capture(len(frags))
253
254         for r in rx:
255             self.assertFalse(r.haslayer(IPv6))
256             self.assertEqual(r[IP].src, p[IP].src)
257             self.assertEqual(r[IP].dst, p[IP].dst)
258
259         #
260         # Pre-resolve. No API for this!!
261         #
262         self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")
263
264         self.send_and_assert_no_replies(self.pg0, v4,
265                                         "resolved via default route")
266
267         #
268         # Add a route to 4001::1. Expect the encapped traffic to be
269         # sent via that routes next-hop
270         #
271         pre_res_route = VppIpRoute(self, "4001::1", 128,
272                                    [VppRoutePath(self.pg1.remote_hosts[2].ip6,
273                                                  self.pg1.sw_if_index)])
274         pre_res_route.add_vpp_config()
275
276         self.send_and_assert_encapped_one(v4, "3000::1",
277                                           map_translated_addr,
278                                           dmac=self.pg1.remote_hosts[2].mac)
279
280         #
281         # change the route to the pre-solved next-hop
282         #
283         pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
284                                            self.pg1.sw_if_index)])
285         pre_res_route.add_vpp_config()
286
287         self.send_and_assert_encapped_one(v4, "3000::1",
288                                           map_translated_addr,
289                                           dmac=self.pg1.remote_hosts[3].mac)
290
291         #
292         # cleanup. The test infra's object registry will ensure
293         # the route is really gone and thus that the unresolve worked.
294         #
295         pre_res_route.remove_vpp_config()
296         self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
297
298     def test_map_e_inner_frag(self):
299         """ MAP-E Inner fragmentation """
300
301         #
302         # Add a route to the MAP-BR
303         #
304         map_br_pfx = "2001::"
305         map_br_pfx_len = 32
306         map_route = VppIpRoute(self,
307                                map_br_pfx,
308                                map_br_pfx_len,
309                                [VppRoutePath(self.pg1.remote_ip6,
310                                              self.pg1.sw_if_index)])
311         map_route.add_vpp_config()
312
313         #
314         # Add a domain that maps from pg0 to pg1
315         #
316         map_dst = '2001::/32'
317         map_src = '3000::1/128'
318         client_pfx = '192.168.0.0/16'
319         map_translated_addr = '2001:0:101:7000:0:c0a8:101:7'
320         tag = 'MAP-E tag.'
321         self.vapi.map_add_domain(ip4_prefix=client_pfx,
322                                  ip6_prefix=map_dst,
323                                  ip6_src=map_src,
324                                  ea_bits_len=20,
325                                  psid_offset=4,
326                                  psid_length=4,
327                                  mtu=1000,
328                                  tag=tag)
329
330         # Enable MAP on interface.
331         self.vapi.map_if_enable_disable(is_enable=1,
332                                         sw_if_index=self.pg0.sw_if_index,
333                                         is_translation=0)
334
335         # Enable inner fragmentation
336         self.vapi.map_param_set_fragmentation(inner=1)
337
338         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
339               IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
340               UDP(sport=20000, dport=10000) /
341               Raw(b'\xa5' * 1300))
342
343         self.pg_send(self.pg0, v4*1)
344         rx = self.pg1.get_capture(2)
345
346         frags = fragment_rfc791(v4[1], 1000)
347         frags[0].id = 0
348         frags[1].id = 0
349         frags[0].ttl -= 1
350         frags[1].ttl -= 1
351         frags[0].chksum = 0
352         frags[1].chksum = 0
353
354         v6_reply1 = (IPv6(src='3000::1', dst=map_translated_addr, hlim=63) /
355                      frags[0])
356         v6_reply2 = (IPv6(src='3000::1', dst=map_translated_addr, hlim=63) /
357                      frags[1])
358         rx[0][1].fl = 0
359         rx[1][1].fl = 0
360         rx[0][1][IP].id = 0
361         rx[1][1][IP].id = 0
362         rx[0][1][IP].chksum = 0
363         rx[1][1][IP].chksum = 0
364
365         self.validate(rx[0][1], v6_reply1)
366         self.validate(rx[1][1], v6_reply2)
367
368     def test_map_e_tcp_mss(self):
369         """ MAP-E TCP MSS"""
370
371         #
372         # Add a route to the MAP-BR
373         #
374         map_br_pfx = "2001::"
375         map_br_pfx_len = 32
376         map_route = VppIpRoute(self,
377                                map_br_pfx,
378                                map_br_pfx_len,
379                                [VppRoutePath(self.pg1.remote_ip6,
380                                              self.pg1.sw_if_index)])
381         map_route.add_vpp_config()
382
383         #
384         # Add a domain that maps from pg0 to pg1
385         #
386         map_dst = '2001::/32'
387         map_src = '3000::1/128'
388         client_pfx = '192.168.0.0/16'
389         map_translated_addr = '2001:0:101:5000:0:c0a8:101:5'
390         tag = 'MAP-E TCP tag.'
391         self.vapi.map_add_domain(ip4_prefix=client_pfx,
392                                  ip6_prefix=map_dst,
393                                  ip6_src=map_src,
394                                  ea_bits_len=20,
395                                  psid_offset=4,
396                                  psid_length=4,
397                                  tag=tag)
398
399         # Enable MAP on pg0 interface.
400         self.vapi.map_if_enable_disable(is_enable=1,
401                                         sw_if_index=self.pg0.sw_if_index,
402                                         is_translation=0)
403
404         # Enable MAP on pg1 interface.
405         self.vapi.map_if_enable_disable(is_enable=1,
406                                         sw_if_index=self.pg1.sw_if_index,
407                                         is_translation=0)
408
409         # TCP MSS clamping
410         mss_clamp = 1300
411         self.vapi.map_param_set_tcp(mss_clamp)
412
413         #
414         # Send a v4 packet that will be encapped.
415         #
416         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
417         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.1.1')
418         p_tcp = TCP(sport=20000, dport=30000, flags="S",
419                     options=[("MSS", 1455)])
420         p4 = p_ether / p_ip4 / p_tcp
421
422         self.pg1.add_stream(p4)
423         self.pg_enable_capture(self.pg_interfaces)
424         self.pg_start()
425
426         rx = self.pg1.get_capture(1)
427         rx = rx[0]
428
429         self.assertTrue(rx.haslayer(IPv6))
430         self.assertEqual(rx[IP].src, p4[IP].src)
431         self.assertEqual(rx[IP].dst, p4[IP].dst)
432         self.assertEqual(rx[IPv6].src, "3000::1")
433         self.assertEqual(rx[TCP].options,
434                          TCP(options=[('MSS', mss_clamp)]).options)
435
436     def validate(self, rx, expected):
437         self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))
438
439     def validate_frag(self, p6_frag, p_ip6_expected):
440         self.assertFalse(p6_frag.haslayer(IP))
441         self.assertTrue(p6_frag.haslayer(IPv6))
442         self.assertTrue(p6_frag.haslayer(IPv6ExtHdrFragment))
443         self.assertEqual(p6_frag[IPv6].src, p_ip6_expected.src)
444         self.assertEqual(p6_frag[IPv6].dst, p_ip6_expected.dst)
445
446     def validate_frag_payload_len(self, rx, proto, payload_len_expected):
447         payload_total = 0
448         for p in rx:
449             payload_total += p[IPv6].plen
450
451         # First fragment has proto
452         payload_total -= len(proto())
453
454         # Every fragment has IPv6 fragment header
455         payload_total -= len(IPv6ExtHdrFragment()) * len(rx)
456
457         self.assertEqual(payload_total, payload_len_expected)
458
459     def payload(self, len):
460         return 'x' * len
461
    def test_map_t(self):
        """ MAP-T """

        # Sections, in order: pass-through sanity checks, v4<->v6
        # translation, TTL / hop-limit expiry, well-known ports,
        # UDP and ICMP fragmentation, TCP MSS clamping and the ICMPv6
        # unreachable reply on a failed security check.
        # NOTE: p_ether, p_ether6, p_ip4, p_ip6 and payload are reused and
        # reassigned between sections, so statement order matters.

        #
        # Add a domain that maps from pg0 to pg1
        #
        map_dst = '2001:db8::/32'
        map_src = '1234:5678:90ab:cdef::/64'
        ip4_pfx = '192.168.0.0/24'
        tag = 'MAP-T Tag.'

        self.vapi.map_add_domain(ip6_prefix=map_dst,
                                 ip4_prefix=ip4_pfx,
                                 ip6_src=map_src,
                                 ea_bits_len=16,
                                 psid_offset=6,
                                 psid_length=4,
                                 mtu=1500,
                                 tag=tag)

        # Enable MAP-T on interfaces.
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg0.sw_if_index,
                                        is_translation=1)
        self.vapi.map_if_enable_disable(is_enable=1,
                                        sw_if_index=self.pg1.sw_if_index,
                                        is_translation=1)

        # Ensure MAP doesn't steal all packets!
        # The packet is expected back on pg0 with only the TTL decremented.
        v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 100))
        rx = self.send_and_expect(self.pg0, v4*1, self.pg0)
        v4_reply = v4[1]
        v4_reply.ttl -= 1
        for p in rx:
            self.validate(p[1], v4_reply)
        # Ensure MAP doesn't steal all packets
        # Same check for IPv6 on pg1, with the hop limit decremented.
        v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
              UDP(sport=20000, dport=10000) /
              Raw(b'\xa5' * 100))
        rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
        v6_reply = v6[1]
        v6_reply.hlim -= 1
        for p in rx:
            self.validate(p[1], v6_reply)

        # Route the MAP domain's IPv6 prefix out of pg1.
        map_route = VppIpRoute(self,
                               "2001:db8::",
                               32,
                               [VppRoutePath(self.pg1.remote_ip6,
                                             self.pg1.sw_if_index,
                                             proto=DpoProto.DPO_PROTO_IP6)])
        map_route.add_vpp_config()

        #
        # Send a v4 packet that will be translated
        #
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = TCP(sport=0xabcd, dport=0xabcd)

        p4 = (p_ether / p_ip4 / payload)
        p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
                              dst="2001:db8:1f0::c0a8:1:f") / payload)
        p6_translated.hlim -= 1
        rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
        for p in rx:
            self.validate(p[1], p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
                     dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / p_ip6 / payload)
        p4_translated = (IP(src='192.168.0.1',
                            dst=self.pg0.remote_ip4) / payload)
        # the translated packet is expected with IPv4 id 0 and TTL - 1
        p4_translated.id = 0
        p4_translated.ttl -= 1
        rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
        for p in rx:
            self.validate(p[1], p4_translated)

        # IPv4 TTL=0
        # An ICMP time-exceeded quoting the original packet is expected
        # back on pg0.
        ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
        p4 = (p_ether / ip4_ttl_expired / payload)

        icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
                          dst=self.pg0.remote_ip4) /
                       ICMP(type='time-exceeded',
                            code='ttl-zero-during-transit') /
                       IP(src=self.pg0.remote_ip4,
                          dst='192.168.0.1', ttl=0) / payload)
        rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
        for p in rx:
            self.validate(p[1], icmp4_reply)

        # IPv4 TTL=1
        # Same expectation as TTL=0; the quoted packet keeps ttl=1.
        ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=1)
        p4 = (p_ether / ip4_ttl_expired / payload)

        icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
                          dst=self.pg0.remote_ip4) /
                       ICMP(type='time-exceeded',
                            code='ttl-zero-during-transit') /
                       IP(src=self.pg0.remote_ip4,
                          dst='192.168.0.1', ttl=1) / payload)
        rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
        for p in rx:
            self.validate(p[1], icmp4_reply)

        # IPv6 Hop limit
        # An ICMPv6 time-exceeded quoting the original packet is expected
        # back on pg1.
        ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
                                dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / ip6_hlim_expired / payload)

        icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
                            dst="2001:db8:1ab::c0a8:1:ab") /
                       ICMPv6TimeExceeded(code=0) /
                       IPv6(src="2001:db8:1ab::c0a8:1:ab",
                            dst='1234:5678:90ab:cdef:ac:1001:200:0',
                            hlim=0) / payload)
        rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
        for p in rx:
            self.validate(p[1], icmp6_reply)

        # IPv4 Well-known port
        # No packet is expected out for UDP port 200.
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = UDP(sport=200, dport=200)
        p4 = (p_ether / p_ip4 / payload)
        self.send_and_assert_no_replies(self.pg0, p4*1)

        # IPv6 Well-known port
        payload = UDP(sport=200, dport=200)
        p6 = (p_ether6 / p_ip6 / payload)
        self.send_and_assert_no_replies(self.pg1, p6*1)

        # UDP packet fragmentation
        # A single oversized v4 packet is expected out of pg1 as two
        # IPv6 fragments.
        payload_len = 1453
        payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
        p4 = (p_ether / p_ip4 / payload)
        self.pg_enable_capture()
        self.pg0.add_stream(p4)
        self.pg_start()
        rx = self.pg1.get_capture(2)

        p_ip6_translated = IPv6(src='1234:5678:90ab:cdef:ac:1001:200:0',
                                dst='2001:db8:1e0::c0a8:1:e')
        for p in rx:
            self.validate_frag(p, p_ip6_translated)

        self.validate_frag_payload_len(rx, UDP, payload_len)

        # UDP packet fragmentation send fragments
        # Same payload, but already fragmented on the IPv4 side.
        payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
        p4 = (p_ether / p_ip4 / payload)
        frags = fragment_rfc791(p4, fragsize=1000)
        self.pg_enable_capture()
        self.pg0.add_stream(frags)
        self.pg_start()
        rx = self.pg1.get_capture(2)

        for p in rx:
            self.validate_frag(p, p_ip6_translated)

        self.validate_frag_payload_len(rx, UDP, payload_len)

        # ICMP packet fragmentation
        payload = ICMP(id=6529) / self.payload(payload_len)
        p4 = (p_ether / p_ip4 / payload)
        self.pg_enable_capture()
        self.pg0.add_stream(p4)
        self.pg_start()
        rx = self.pg1.get_capture(2)

        p_ip6_translated = IPv6(src='1234:5678:90ab:cdef:ac:1001:200:0',
                                dst='2001:db8:160::c0a8:1:6')
        for p in rx:
            self.validate_frag(p, p_ip6_translated)

        # the payload length is checked against an ICMPv6 echo request
        # header on the IPv6 side
        self.validate_frag_payload_len(rx, ICMPv6EchoRequest, payload_len)

        # ICMP packet fragmentation send fragments
        payload = ICMP(id=6529) / self.payload(payload_len)
        p4 = (p_ether / p_ip4 / payload)
        frags = fragment_rfc791(p4, fragsize=1000)
        self.pg_enable_capture()
        self.pg0.add_stream(frags)
        self.pg_start()
        rx = self.pg1.get_capture(2)

        for p in rx:
            self.validate_frag(p, p_ip6_translated)

        self.validate_frag_payload_len(rx, ICMPv6EchoRequest, payload_len)

        # TCP MSS clamping
        self.vapi.map_param_set_tcp(1300)

        #
        # Send a v4 TCP SYN packet that will be translated and MSS clamped
        #
        p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
        payload = TCP(sport=0xabcd, dport=0xabcd, flags="S",
                      options=[('MSS', 1460)])

        p4 = (p_ether / p_ip4 / payload)
        p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
                              dst="2001:db8:1f0::c0a8:1:f") / payload)
        p6_translated.hlim -= 1
        # the advertised MSS of 1460 is expected to be rewritten to 1300
        p6_translated[TCP].options = [('MSS', 1300)]
        rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
        for p in rx:
            self.validate(p[1], p6_translated)

        # Send back an IPv6 packet that will be "untranslated"
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
                     dst='1234:5678:90ab:cdef:ac:1001:200:0')
        p6 = (p_ether6 / p_ip6 / payload)
        p4_translated = (IP(src='192.168.0.1',
                            dst=self.pg0.remote_ip4) / payload)
        p4_translated.id = 0
        p4_translated.ttl -= 1
        p4_translated[TCP].options = [('MSS', 1300)]
        rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
        for p in rx:
            self.validate(p[1], p4_translated)

        # TCP MSS clamping cleanup
        self.vapi.map_param_set_tcp(0)

        # Enable icmp6 param to get back ICMPv6 unreachable messages in case
        # of security check fails
        self.vapi.map_param_set_icmp6(enable_unreachable=1)

        # Send back an IPv6 packet that will be dropped due to security
        # check fail
        p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        p_ip6_sec_check_fail = IPv6(src='2001:db8:1fe::c0a8:1:f',
                                    dst='1234:5678:90ab:cdef:ac:1001:200:0')
        payload = TCP(sport=0xabcd, dport=0xabcd)
        p6 = (p_ether6 / p_ip6_sec_check_fail / payload)

        self.pg_send(self.pg1, p6*1)
        # nothing may reach the IPv4 side; exactly one packet is expected
        # back on pg1
        self.pg0.get_capture(0, timeout=1)
        rx = self.pg1.get_capture(1)

        # Expected reply: ICMPv6 destination unreachable, code 5, sent from
        # pg1's local address and quoting the offending packet.
        icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
                            dst='2001:db8:1fe::c0a8:1:f') /
                       ICMPv6DestUnreach(code=5) /
                       p_ip6_sec_check_fail / payload)

        for p in rx:
            self.validate(p[1], icmp6_reply)

        # ICMPv6 unreachable messages cleanup
        self.vapi.map_param_set_icmp6(enable_unreachable=0)
723
724     def test_map_t_ip6_psid(self):
725         """ MAP-T v6->v4 PSID validation"""
726
727         #
728         # Add a domain that maps from pg0 to pg1
729         #
730         map_dst = '2001:db8::/32'
731         map_src = '1234:5678:90ab:cdef::/64'
732         ip4_pfx = '192.168.0.0/24'
733         tag = 'MAP-T Test Domain'
734
735         self.vapi.map_add_domain(ip6_prefix=map_dst,
736                                  ip4_prefix=ip4_pfx,
737                                  ip6_src=map_src,
738                                  ea_bits_len=16,
739                                  psid_offset=6,
740                                  psid_length=4,
741                                  mtu=1500,
742                                  tag=tag)
743
744         # Enable MAP-T on interfaces.
745         self.vapi.map_if_enable_disable(is_enable=1,
746                                         sw_if_index=self.pg0.sw_if_index,
747                                         is_translation=1)
748         self.vapi.map_if_enable_disable(is_enable=1,
749                                         sw_if_index=self.pg1.sw_if_index,
750                                         is_translation=1)
751
752         map_route = VppIpRoute(self,
753                                "2001:db8::",
754                                32,
755                                [VppRoutePath(self.pg1.remote_ip6,
756                                              self.pg1.sw_if_index,
757                                              proto=DpoProto.DPO_PROTO_IP6)])
758         map_route.add_vpp_config()
759
760         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
761         p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
762                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
763
764         # Send good IPv6 source port, ensure translated IPv4 received
765         payload = TCP(sport=0xabcd, dport=80)
766         p6 = (p_ether6 / p_ip6 / payload)
767         p4_translated = (IP(src='192.168.0.1',
768                             dst=self.pg0.remote_ip4) / payload)
769         p4_translated.id = 0
770         p4_translated.ttl -= 1
771         rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
772         for p in rx:
773             self.validate(p[1], p4_translated)
774
775         # Send bad IPv6 source port, ensure translated IPv4 not received
776         payload = TCP(sport=0xdcba, dport=80)
777         p6 = (p_ether6 / p_ip6 / payload)
778         self.send_and_assert_no_replies(self.pg1, p6*1)
779
# When executed as a script, run the tests in this module with the
# VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)