api: add new stream message convention
[vpp.git] / src / plugins / map / test / test_map.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import unittest
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from util import fragment_rfc791, fragment_rfc8200
10
11 import scapy.compat
12 from scapy.layers.l2 import Ether
13 from scapy.packet import Raw
14 from scapy.layers.inet import IP, UDP, ICMP, TCP
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, IPv6ExtHdrFragment, \
16     ICMPv6EchoRequest, ICMPv6DestUnreach
17
18
19 class TestMAP(VppTestCase):
20     """ MAP Test Case """
21
22     @classmethod
23     def setUpClass(cls):
24         super(TestMAP, cls).setUpClass()
25
26     @classmethod
27     def tearDownClass(cls):
28         super(TestMAP, cls).tearDownClass()
29
30     def setUp(self):
31         super(TestMAP, self).setUp()
32
33         # create 2 pg interfaces
34         self.create_pg_interfaces(range(4))
35
36         # pg0 is 'inside' IPv4
37         self.pg0.admin_up()
38         self.pg0.config_ip4()
39         self.pg0.resolve_arp()
40         self.pg0.generate_remote_hosts(2)
41         self.pg0.configure_ipv4_neighbors()
42
43         # pg1 is 'outside' IPv6
44         self.pg1.admin_up()
45         self.pg1.config_ip6()
46         self.pg1.generate_remote_hosts(4)
47         self.pg1.configure_ipv6_neighbors()
48
49     def tearDown(self):
50         super(TestMAP, self).tearDown()
51         for i in self.pg_interfaces:
52             i.unconfig_ip4()
53             i.unconfig_ip6()
54             i.admin_down()
55
56     def send_and_assert_encapped(self, packets, ip6_src, ip6_dst, dmac=None):
57         if not dmac:
58             dmac = self.pg1.remote_mac
59
60         self.pg0.add_stream(packets)
61
62         self.pg_enable_capture(self.pg_interfaces)
63         self.pg_start()
64
65         capture = self.pg1.get_capture(len(packets))
66         for rx, tx in zip(capture, packets):
67             self.assertEqual(rx[Ether].dst, dmac)
68             self.assertEqual(rx[IP].src, tx[IP].src)
69             self.assertEqual(rx[IPv6].src, ip6_src)
70             self.assertEqual(rx[IPv6].dst, ip6_dst)
71
72     def send_and_assert_encapped_one(self, packet, ip6_src, ip6_dst,
73                                      dmac=None):
74         return self.send_and_assert_encapped([packet], ip6_src, ip6_dst, dmac)
75
76     def test_api_map_domain_dump(self):
77         map_dst = '2001::/64'
78         map_src = '3000::1/128'
79         client_pfx = '192.168.0.0/16'
80         tag = 'MAP-E tag.'
81         index = self.vapi.map_add_domain(ip4_prefix=client_pfx,
82                                          ip6_prefix=map_dst,
83                                          ip6_src=map_src,
84                                          tag=tag).index
85         rv = self.vapi.map_domain_dump()
86
87         # restore the state early so as to not impact subsequent tests.
88         # If an assert fails, we will not get the chance to do it at the end.
89         self.vapi.map_del_domain(index=index)
90
91         self.assertGreater(len(rv), 0,
92                            "Expected output from 'map_domain_dump'")
93
94         # typedefs are returned as ipaddress objects.
95         # wrap results in str() ugh! to avoid the need to call unicode.
96         self.assertEqual(str(rv[0].ip4_prefix), client_pfx)
97         self.assertEqual(str(rv[0].ip6_prefix), map_dst)
98         self.assertEqual(str(rv[0].ip6_src), map_src)
99
100         self.assertEqual(rv[0].tag, tag,
101                          "output produced incorrect tag value.")
102
103     def create_domains(self, ip4_pfx_str, ip6_pfx_str, ip6_src_str):
104         ip4_pfx = ipaddress.ip_network(ip4_pfx_str)
105         ip6_dst = ipaddress.ip_network(ip6_pfx_str)
106         mod = ip4_pfx.num_addresses / 1024
107         indicies = []
108         for i in range(ip4_pfx.num_addresses):
109             rv = self.vapi.map_add_domain(ip6_prefix=ip6_pfx_str,
110                                           ip4_prefix=str(ip4_pfx[i]) + "/32",
111                                           ip6_src=ip6_src_str)
112             indicies.append(rv.index)
113         return indicies
114
115     def test_api_map_domains_get(self):
116         # Create a bunch of domains
117         domains = self.create_domains('130.67.0.0/24', '2001::/32',
118                                       '2001::1/128')
119         self.assertEqual(len(domains), 256)
120
121         d = []
122         cursor = 0
123
124         # Invalid cursor
125         rv, details = self.vapi.map_domains_get(cursor=1234)
126         self.assertEqual(rv.retval, -7)
127
128         # Delete a domain in the middle of walk
129         rv, details = self.vapi.map_domains_get(cursor=0)
130         self.assertEqual(rv.retval, -165)
131         self.vapi.map_del_domain(index=rv.cursor)
132         domains.remove(rv.cursor)
133
134         # Continue at point of deleted cursor
135         rv, details = self.vapi.map_domains_get(cursor=rv.cursor)
136         self.assertEqual(rv.retval, -165)
137
138         d = list(self.vapi.vpp.details_iter(self.vapi.map_domains_get))
139         self.assertEqual(len(d), 255)
140
141         # Clean up
142         for i in domains:
143             self.vapi.map_del_domain(index=i)
144
145     def test_map_e_udp(self):
146         """ MAP-E UDP"""
147
148         #
149         # Add a route to the MAP-BR
150         #
151         map_br_pfx = "2001::"
152         map_br_pfx_len = 32
153         map_route = VppIpRoute(self,
154                                map_br_pfx,
155                                map_br_pfx_len,
156                                [VppRoutePath(self.pg1.remote_ip6,
157                                              self.pg1.sw_if_index)])
158         map_route.add_vpp_config()
159
160         #
161         # Add a domain that maps from pg0 to pg1
162         #
163         map_dst = '2001::/32'
164         map_src = '3000::1/128'
165         client_pfx = '192.168.0.0/16'
166         map_translated_addr = '2001:0:101:7000:0:c0a8:101:7'
167         tag = 'MAP-E tag.'
168         self.vapi.map_add_domain(ip4_prefix=client_pfx,
169                                  ip6_prefix=map_dst,
170                                  ip6_src=map_src,
171                                  ea_bits_len=20,
172                                  psid_offset=4,
173                                  psid_length=4,
174                                  tag=tag)
175
176         self.vapi.map_param_set_security_check(enable=1, fragments=1)
177
178         # Enable MAP on interface.
179         self.vapi.map_if_enable_disable(is_enable=1,
180                                         sw_if_index=self.pg0.sw_if_index,
181                                         is_translation=0)
182
183         # Ensure MAP doesn't steal all packets!
184         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
185               IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
186               UDP(sport=20000, dport=10000) /
187               Raw(b'\xa5' * 100))
188         rx = self.send_and_expect(self.pg0, v4 * 4, self.pg0)
189         v4_reply = v4[1]
190         v4_reply.ttl -= 1
191         for p in rx:
192             self.validate(p[1], v4_reply)
193
194         #
195         # Fire in a v4 packet that will be encapped to the BR
196         #
197         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
198               IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
199               UDP(sport=20000, dport=10000) /
200               Raw(b'\xa5' * 100))
201
202         self.send_and_assert_encapped(v4 * 4, "3000::1", map_translated_addr)
203
204         #
205         # Verify reordered fragments are able to pass as well
206         #
207         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
208               IP(id=1, src=self.pg0.remote_ip4, dst='192.168.1.1') /
209               UDP(sport=20000, dport=10000) /
210               Raw(b'\xa5' * 1000))
211
212         frags = fragment_rfc791(v4, 400)
213         frags.reverse()
214
215         self.send_and_assert_encapped(frags, "3000::1", map_translated_addr)
216
217         # Enable MAP on interface.
218         self.vapi.map_if_enable_disable(is_enable=1,
219                                         sw_if_index=self.pg1.sw_if_index,
220                                         is_translation=0)
221
222         # Ensure MAP doesn't steal all packets
223         v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
224               IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
225               UDP(sport=20000, dport=10000) /
226               Raw(b'\xa5' * 100))
227         rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
228         v6_reply = v6[1]
229         v6_reply.hlim -= 1
230         for p in rx:
231             self.validate(p[1], v6_reply)
232
233         #
234         # Fire in a V6 encapped packet.
235         # expect a decapped packet on the inside ip4 link
236         #
237         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
238              IPv6(dst='3000::1', src=map_translated_addr) /
239              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
240              UDP(sport=10000, dport=20000) /
241              Raw(b'\xa5' * 100))
242
243         self.pg1.add_stream(p)
244
245         self.pg_enable_capture(self.pg_interfaces)
246         self.pg_start()
247
248         rx = self.pg0.get_capture(1)
249         rx = rx[0]
250
251         self.assertFalse(rx.haslayer(IPv6))
252         self.assertEqual(rx[IP].src, p[IP].src)
253         self.assertEqual(rx[IP].dst, p[IP].dst)
254
255         #
256         # Verify encapped reordered fragments pass as well
257         #
258         p = (IP(id=1, dst=self.pg0.remote_ip4, src='192.168.1.1') /
259              UDP(sport=10000, dport=20000) /
260              Raw(b'\xa5' * 1500))
261         frags = fragment_rfc791(p, 400)
262         frags.reverse()
263
264         stream = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
265                   IPv6(dst='3000::1', src=map_translated_addr) /
266                   x for x in frags)
267
268         self.pg1.add_stream(stream)
269
270         self.pg_enable_capture(self.pg_interfaces)
271         self.pg_start()
272
273         rx = self.pg0.get_capture(len(frags))
274
275         for r in rx:
276             self.assertFalse(r.haslayer(IPv6))
277             self.assertEqual(r[IP].src, p[IP].src)
278             self.assertEqual(r[IP].dst, p[IP].dst)
279
280         # Verify that fragments pass even if ipv6 layer is fragmented
281         stream = (IPv6(dst='3000::1', src=map_translated_addr) / x
282                   for x in frags)
283
284         v6_stream = [
285             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / x
286             for i in range(len(frags))
287             for x in fragment_rfc8200(
288                 IPv6(dst='3000::1', src=map_translated_addr) / frags[i],
289                 i, 200)]
290
291         self.pg1.add_stream(v6_stream)
292
293         self.pg_enable_capture(self.pg_interfaces)
294         self.pg_start()
295
296         rx = self.pg0.get_capture(len(frags))
297
298         for r in rx:
299             self.assertFalse(r.haslayer(IPv6))
300             self.assertEqual(r[IP].src, p[IP].src)
301             self.assertEqual(r[IP].dst, p[IP].dst)
302
303         #
304         # Pre-resolve. No API for this!!
305         #
306         self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")
307
308         self.send_and_assert_no_replies(self.pg0, v4,
309                                         "resolved via default route")
310
311         #
312         # Add a route to 4001::1. Expect the encapped traffic to be
313         # sent via that routes next-hop
314         #
315         pre_res_route = VppIpRoute(self, "4001::1", 128,
316                                    [VppRoutePath(self.pg1.remote_hosts[2].ip6,
317                                                  self.pg1.sw_if_index)])
318         pre_res_route.add_vpp_config()
319
320         self.send_and_assert_encapped_one(v4, "3000::1",
321                                           map_translated_addr,
322                                           dmac=self.pg1.remote_hosts[2].mac)
323
324         #
325         # change the route to the pre-solved next-hop
326         #
327         pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
328                                            self.pg1.sw_if_index)])
329         pre_res_route.add_vpp_config()
330
331         self.send_and_assert_encapped_one(v4, "3000::1",
332                                           map_translated_addr,
333                                           dmac=self.pg1.remote_hosts[3].mac)
334
335         #
336         # cleanup. The test infra's object registry will ensure
337         # the route is really gone and thus that the unresolve worked.
338         #
339         pre_res_route.remove_vpp_config()
340         self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
341
342     def test_map_e_inner_frag(self):
343         """ MAP-E Inner fragmentation """
344
345         #
346         # Add a route to the MAP-BR
347         #
348         map_br_pfx = "2001::"
349         map_br_pfx_len = 32
350         map_route = VppIpRoute(self,
351                                map_br_pfx,
352                                map_br_pfx_len,
353                                [VppRoutePath(self.pg1.remote_ip6,
354                                              self.pg1.sw_if_index)])
355         map_route.add_vpp_config()
356
357         #
358         # Add a domain that maps from pg0 to pg1
359         #
360         map_dst = '2001::/32'
361         map_src = '3000::1/128'
362         client_pfx = '192.168.0.0/16'
363         map_translated_addr = '2001:0:101:7000:0:c0a8:101:7'
364         tag = 'MAP-E tag.'
365         self.vapi.map_add_domain(ip4_prefix=client_pfx,
366                                  ip6_prefix=map_dst,
367                                  ip6_src=map_src,
368                                  ea_bits_len=20,
369                                  psid_offset=4,
370                                  psid_length=4,
371                                  mtu=1000,
372                                  tag=tag)
373
374         # Enable MAP on interface.
375         self.vapi.map_if_enable_disable(is_enable=1,
376                                         sw_if_index=self.pg0.sw_if_index,
377                                         is_translation=0)
378
379         # Enable inner fragmentation
380         self.vapi.map_param_set_fragmentation(inner=1)
381
382         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
383               IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
384               UDP(sport=20000, dport=10000) /
385               Raw(b'\xa5' * 1300))
386
387         self.pg_send(self.pg0, v4*1)
388         rx = self.pg1.get_capture(2)
389
390         frags = fragment_rfc791(v4[1], 1000)
391         frags[0].id = 0
392         frags[1].id = 0
393         frags[0].ttl -= 1
394         frags[1].ttl -= 1
395         frags[0].chksum = 0
396         frags[1].chksum = 0
397
398         v6_reply1 = (IPv6(src='3000::1', dst=map_translated_addr, hlim=63) /
399                      frags[0])
400         v6_reply2 = (IPv6(src='3000::1', dst=map_translated_addr, hlim=63) /
401                      frags[1])
402         rx[0][1].fl = 0
403         rx[1][1].fl = 0
404         rx[0][1][IP].id = 0
405         rx[1][1][IP].id = 0
406         rx[0][1][IP].chksum = 0
407         rx[1][1][IP].chksum = 0
408
409         self.validate(rx[0][1], v6_reply1)
410         self.validate(rx[1][1], v6_reply2)
411
412     def test_map_e_tcp_mss(self):
413         """ MAP-E TCP MSS"""
414
415         #
416         # Add a route to the MAP-BR
417         #
418         map_br_pfx = "2001::"
419         map_br_pfx_len = 32
420         map_route = VppIpRoute(self,
421                                map_br_pfx,
422                                map_br_pfx_len,
423                                [VppRoutePath(self.pg1.remote_ip6,
424                                              self.pg1.sw_if_index)])
425         map_route.add_vpp_config()
426
427         #
428         # Add a domain that maps from pg0 to pg1
429         #
430         map_dst = '2001::/32'
431         map_src = '3000::1/128'
432         client_pfx = '192.168.0.0/16'
433         map_translated_addr = '2001:0:101:5000:0:c0a8:101:5'
434         tag = 'MAP-E TCP tag.'
435         self.vapi.map_add_domain(ip4_prefix=client_pfx,
436                                  ip6_prefix=map_dst,
437                                  ip6_src=map_src,
438                                  ea_bits_len=20,
439                                  psid_offset=4,
440                                  psid_length=4,
441                                  tag=tag)
442
443         # Enable MAP on pg0 interface.
444         self.vapi.map_if_enable_disable(is_enable=1,
445                                         sw_if_index=self.pg0.sw_if_index,
446                                         is_translation=0)
447
448         # Enable MAP on pg1 interface.
449         self.vapi.map_if_enable_disable(is_enable=1,
450                                         sw_if_index=self.pg1.sw_if_index,
451                                         is_translation=0)
452
453         # TCP MSS clamping
454         mss_clamp = 1300
455         self.vapi.map_param_set_tcp(mss_clamp)
456
457         #
458         # Send a v4 packet that will be encapped.
459         #
460         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
461         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.1.1')
462         p_tcp = TCP(sport=20000, dport=30000, flags="S",
463                     options=[("MSS", 1455)])
464         p4 = p_ether / p_ip4 / p_tcp
465
466         self.pg1.add_stream(p4)
467         self.pg_enable_capture(self.pg_interfaces)
468         self.pg_start()
469
470         rx = self.pg1.get_capture(1)
471         rx = rx[0]
472
473         self.assertTrue(rx.haslayer(IPv6))
474         self.assertEqual(rx[IP].src, p4[IP].src)
475         self.assertEqual(rx[IP].dst, p4[IP].dst)
476         self.assertEqual(rx[IPv6].src, "3000::1")
477         self.assertEqual(rx[TCP].options,
478                          TCP(options=[('MSS', mss_clamp)]).options)
479
480     def validate(self, rx, expected):
481         self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))
482
483     def validate_frag6(self, p6_frag, p_ip6_expected):
484         self.assertFalse(p6_frag.haslayer(IP))
485         self.assertTrue(p6_frag.haslayer(IPv6))
486         self.assertTrue(p6_frag.haslayer(IPv6ExtHdrFragment))
487         self.assertEqual(p6_frag[IPv6].src, p_ip6_expected.src)
488         self.assertEqual(p6_frag[IPv6].dst, p_ip6_expected.dst)
489
490     def validate_frag_payload_len6(self, rx, proto, payload_len_expected):
491         payload_total = 0
492         for p in rx:
493             payload_total += p[IPv6].plen
494
495         # First fragment has proto
496         payload_total -= len(proto())
497
498         # Every fragment has IPv6 fragment header
499         payload_total -= len(IPv6ExtHdrFragment()) * len(rx)
500
501         self.assertEqual(payload_total, payload_len_expected)
502
503     def validate_frag4(self, p4_frag, p_ip4_expected):
504         self.assertFalse(p4_frag.haslayer(IPv6))
505         self.assertTrue(p4_frag.haslayer(IP))
506         self.assertTrue(p4_frag[IP].frag != 0 or p4_frag[IP].flags.MF)
507         self.assertEqual(p4_frag[IP].src, p_ip4_expected.src)
508         self.assertEqual(p4_frag[IP].dst, p_ip4_expected.dst)
509
510     def validate_frag_payload_len4(self, rx, proto, payload_len_expected):
511         payload_total = 0
512         for p in rx:
513             payload_total += len(p[IP].payload)
514
515         # First fragment has proto
516         payload_total -= len(proto())
517
518         self.assertEqual(payload_total, payload_len_expected)
519
520     def payload(self, len):
521         return 'x' * len
522
523     def test_map_t(self):
524         """ MAP-T """
525
526         #
527         # Add a domain that maps from pg0 to pg1
528         #
529         map_dst = '2001:db8::/32'
530         map_src = '1234:5678:90ab:cdef::/64'
531         ip4_pfx = '192.168.0.0/24'
532         tag = 'MAP-T Tag.'
533
534         self.vapi.map_add_domain(ip6_prefix=map_dst,
535                                  ip4_prefix=ip4_pfx,
536                                  ip6_src=map_src,
537                                  ea_bits_len=16,
538                                  psid_offset=6,
539                                  psid_length=4,
540                                  mtu=1500,
541                                  tag=tag)
542
543         # Enable MAP-T on interfaces.
544         self.vapi.map_if_enable_disable(is_enable=1,
545                                         sw_if_index=self.pg0.sw_if_index,
546                                         is_translation=1)
547         self.vapi.map_if_enable_disable(is_enable=1,
548                                         sw_if_index=self.pg1.sw_if_index,
549                                         is_translation=1)
550
551         # Ensure MAP doesn't steal all packets!
552         v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
553               IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
554               UDP(sport=20000, dport=10000) /
555               Raw(b'\xa5' * 100))
556         rx = self.send_and_expect(self.pg0, v4*1, self.pg0)
557         v4_reply = v4[1]
558         v4_reply.ttl -= 1
559         for p in rx:
560             self.validate(p[1], v4_reply)
561         # Ensure MAP doesn't steal all packets
562         v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
563               IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
564               UDP(sport=20000, dport=10000) /
565               Raw(b'\xa5' * 100))
566         rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
567         v6_reply = v6[1]
568         v6_reply.hlim -= 1
569         for p in rx:
570             self.validate(p[1], v6_reply)
571
572         map_route = VppIpRoute(self,
573                                "2001:db8::",
574                                32,
575                                [VppRoutePath(self.pg1.remote_ip6,
576                                              self.pg1.sw_if_index,
577                                              proto=DpoProto.DPO_PROTO_IP6)])
578         map_route.add_vpp_config()
579
580         #
581         # Send a v4 packet that will be translated
582         #
583         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
584         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
585         payload = TCP(sport=0xabcd, dport=0xabcd)
586
587         p4 = (p_ether / p_ip4 / payload)
588         p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
589                               dst="2001:db8:1f0::c0a8:1:f") / payload)
590         p6_translated.hlim -= 1
591         rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
592         for p in rx:
593             self.validate(p[1], p6_translated)
594
595         # Send back an IPv6 packet that will be "untranslated"
596         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
597         p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
598                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
599         p6 = (p_ether6 / p_ip6 / payload)
600         p4_translated = (IP(src='192.168.0.1',
601                             dst=self.pg0.remote_ip4) / payload)
602         p4_translated.id = 0
603         p4_translated.ttl -= 1
604         rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
605         for p in rx:
606             self.validate(p[1], p4_translated)
607
608         # IPv4 TTL=0
609         ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
610         p4 = (p_ether / ip4_ttl_expired / payload)
611
612         icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
613                           dst=self.pg0.remote_ip4) /
614                        ICMP(type='time-exceeded',
615                             code='ttl-zero-during-transit') /
616                        IP(src=self.pg0.remote_ip4,
617                           dst='192.168.0.1', ttl=0) / payload)
618         rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
619         for p in rx:
620             self.validate(p[1], icmp4_reply)
621
622         # IPv4 TTL=1
623         ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=1)
624         p4 = (p_ether / ip4_ttl_expired / payload)
625
626         icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
627                           dst=self.pg0.remote_ip4) /
628                        ICMP(type='time-exceeded',
629                             code='ttl-zero-during-transit') /
630                        IP(src=self.pg0.remote_ip4,
631                           dst='192.168.0.1', ttl=1) / payload)
632         rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
633         for p in rx:
634             self.validate(p[1], icmp4_reply)
635
636         # IPv6 Hop limit at BR
637         ip6_hlim_expired = IPv6(hlim=1, src='2001:db8:1ab::c0a8:1:ab',
638                                 dst='1234:5678:90ab:cdef:ac:1001:200:0')
639         p6 = (p_ether6 / ip6_hlim_expired / payload)
640
641         icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
642                             dst="2001:db8:1ab::c0a8:1:ab") /
643                        ICMPv6TimeExceeded(code=0) /
644                        IPv6(src="2001:db8:1ab::c0a8:1:ab",
645                             dst='1234:5678:90ab:cdef:ac:1001:200:0',
646                             hlim=1) / payload)
647         rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
648         for p in rx:
649             self.validate(p[1], icmp6_reply)
650
651         # IPv6 Hop limit beyond BR
652         ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
653                                 dst='1234:5678:90ab:cdef:ac:1001:200:0')
654         p6 = (p_ether6 / ip6_hlim_expired / payload)
655
656         icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
657                             dst="2001:db8:1ab::c0a8:1:ab") /
658                        ICMPv6TimeExceeded(code=0) /
659                        IPv6(src="2001:db8:1ab::c0a8:1:ab",
660                             dst='1234:5678:90ab:cdef:ac:1001:200:0',
661                             hlim=0) / payload)
662         rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
663         for p in rx:
664             self.validate(p[1], icmp6_reply)
665
666         # IPv4 Well-known port
667         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
668         payload = UDP(sport=200, dport=200)
669         p4 = (p_ether / p_ip4 / payload)
670         self.send_and_assert_no_replies(self.pg0, p4*1)
671
672         # IPv6 Well-known port
673         payload = UDP(sport=200, dport=200)
674         p6 = (p_ether6 / p_ip6 / payload)
675         self.send_and_assert_no_replies(self.pg1, p6*1)
676
677         # UDP packet fragmentation
678         payload_len = 1453
679         payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
680         p4 = (p_ether / p_ip4 / payload)
681         self.pg_enable_capture()
682         self.pg0.add_stream(p4)
683         self.pg_start()
684         rx = self.pg1.get_capture(2)
685
686         p_ip6_translated = IPv6(src='1234:5678:90ab:cdef:ac:1001:200:0',
687                                 dst='2001:db8:1e0::c0a8:1:e')
688         for p in rx:
689             self.validate_frag6(p, p_ip6_translated)
690
691         self.validate_frag_payload_len6(rx, UDP, payload_len)
692
693         # UDP packet fragmentation send fragments
694         payload_len = 1453
695         payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
696         p4 = (p_ether / p_ip4 / payload)
697         frags = fragment_rfc791(p4, fragsize=1000)
698         self.pg_enable_capture()
699         self.pg0.add_stream(frags)
700         self.pg_start()
701         rx = self.pg1.get_capture(2)
702
703         for p in rx:
704             self.validate_frag6(p, p_ip6_translated)
705
706         self.validate_frag_payload_len6(rx, UDP, payload_len)
707
708         # Send back an fragmented IPv6 UDP packet that will be "untranslated"
709         payload = UDP(sport=4000, dport=40000) / self.payload(payload_len)
710         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
711         p_ip6 = IPv6(src='2001:db8:1e0::c0a8:1:e',
712                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
713         p6 = (p_ether6 / p_ip6 / payload)
714         frags6 = fragment_rfc8200(p6, identification=0xdcba, fragsize=1000)
715
716         p_ip4_translated = IP(src='192.168.0.1', dst=self.pg0.remote_ip4)
717         p4_translated = (p_ip4_translated / payload)
718         p4_translated.id = 0
719         p4_translated.ttl -= 1
720
721         self.pg_enable_capture()
722         self.pg1.add_stream(frags6)
723         self.pg_start()
724         rx = self.pg0.get_capture(2)
725
726         for p in rx:
727             self.validate_frag4(p, p4_translated)
728
729         self.validate_frag_payload_len4(rx, UDP, payload_len)
730
731         # ICMP packet fragmentation
732         payload = ICMP(id=6529) / self.payload(payload_len)
733         p4 = (p_ether / p_ip4 / payload)
734         self.pg_enable_capture()
735         self.pg0.add_stream(p4)
736         self.pg_start()
737         rx = self.pg1.get_capture(2)
738
739         p_ip6_translated = IPv6(src='1234:5678:90ab:cdef:ac:1001:200:0',
740                                 dst='2001:db8:160::c0a8:1:6')
741         for p in rx:
742             self.validate_frag6(p, p_ip6_translated)
743
744         self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len)
745
746         # ICMP packet fragmentation send fragments
747         payload = ICMP(id=6529) / self.payload(payload_len)
748         p4 = (p_ether / p_ip4 / payload)
749         frags = fragment_rfc791(p4, fragsize=1000)
750         self.pg_enable_capture()
751         self.pg0.add_stream(frags)
752         self.pg_start()
753         rx = self.pg1.get_capture(2)
754
755         for p in rx:
756             self.validate_frag6(p, p_ip6_translated)
757
758         self.validate_frag_payload_len6(rx, ICMPv6EchoRequest, payload_len)
759
760         # TCP MSS clamping
761         self.vapi.map_param_set_tcp(1300)
762
763         #
764         # Send a v4 TCP SYN packet that will be translated and MSS clamped
765         #
766         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
767         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
768         payload = TCP(sport=0xabcd, dport=0xabcd, flags="S",
769                       options=[('MSS', 1460)])
770
771         p4 = (p_ether / p_ip4 / payload)
772         p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
773                               dst="2001:db8:1f0::c0a8:1:f") / payload)
774         p6_translated.hlim -= 1
775         p6_translated[TCP].options = [('MSS', 1300)]
776         rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
777         for p in rx:
778             self.validate(p[1], p6_translated)
779
780         # Send back an IPv6 packet that will be "untranslated"
781         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
782         p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
783                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
784         p6 = (p_ether6 / p_ip6 / payload)
785         p4_translated = (IP(src='192.168.0.1',
786                             dst=self.pg0.remote_ip4) / payload)
787         p4_translated.id = 0
788         p4_translated.ttl -= 1
789         p4_translated[TCP].options = [('MSS', 1300)]
790         rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
791         for p in rx:
792             self.validate(p[1], p4_translated)
793
794         # TCP MSS clamping cleanup
795         self.vapi.map_param_set_tcp(0)
796
797         # Enable icmp6 param to get back ICMPv6 unreachable messages in case
798         # of security check fails
799         self.vapi.map_param_set_icmp6(enable_unreachable=1)
800
        # Send back an IPv6 packet that will be dropped due to security
802         # check fail
803         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
804         p_ip6_sec_check_fail = IPv6(src='2001:db8:1fe::c0a8:1:f',
805                                     dst='1234:5678:90ab:cdef:ac:1001:200:0')
806         payload = TCP(sport=0xabcd, dport=0xabcd)
807         p6 = (p_ether6 / p_ip6_sec_check_fail / payload)
808
809         self.pg_send(self.pg1, p6*1)
810         self.pg0.get_capture(0, timeout=1)
811         rx = self.pg1.get_capture(1)
812
813         icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
814                             dst='2001:db8:1fe::c0a8:1:f') /
815                        ICMPv6DestUnreach(code=5) /
816                        p_ip6_sec_check_fail / payload)
817
818         for p in rx:
819             self.validate(p[1], icmp6_reply)
820
821         # ICMPv6 unreachable messages cleanup
822         self.vapi.map_param_set_icmp6(enable_unreachable=0)
823
824     def test_map_t_ip6_psid(self):
825         """ MAP-T v6->v4 PSID validation"""
826
827         #
828         # Add a domain that maps from pg0 to pg1
829         #
830         map_dst = '2001:db8::/32'
831         map_src = '1234:5678:90ab:cdef::/64'
832         ip4_pfx = '192.168.0.0/24'
833         tag = 'MAP-T Test Domain'
834
835         self.vapi.map_add_domain(ip6_prefix=map_dst,
836                                  ip4_prefix=ip4_pfx,
837                                  ip6_src=map_src,
838                                  ea_bits_len=16,
839                                  psid_offset=6,
840                                  psid_length=4,
841                                  mtu=1500,
842                                  tag=tag)
843
844         # Enable MAP-T on interfaces.
845         self.vapi.map_if_enable_disable(is_enable=1,
846                                         sw_if_index=self.pg0.sw_if_index,
847                                         is_translation=1)
848         self.vapi.map_if_enable_disable(is_enable=1,
849                                         sw_if_index=self.pg1.sw_if_index,
850                                         is_translation=1)
851
852         map_route = VppIpRoute(self,
853                                "2001:db8::",
854                                32,
855                                [VppRoutePath(self.pg1.remote_ip6,
856                                              self.pg1.sw_if_index,
857                                              proto=DpoProto.DPO_PROTO_IP6)])
858         map_route.add_vpp_config()
859
860         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
861         p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
862                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
863
864         # Send good IPv6 source port, ensure translated IPv4 received
865         payload = TCP(sport=0xabcd, dport=80)
866         p6 = (p_ether6 / p_ip6 / payload)
867         p4_translated = (IP(src='192.168.0.1',
868                             dst=self.pg0.remote_ip4) / payload)
869         p4_translated.id = 0
870         p4_translated.ttl -= 1
871         rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
872         for p in rx:
873             self.validate(p[1], p4_translated)
874
875         # Send bad IPv6 source port, ensure translated IPv4 not received
876         payload = TCP(sport=0xdcba, dport=80)
877         p6 = (p_ether6 / p_ip6 / payload)
878         self.send_and_assert_no_replies(self.pg1, p6*1)
879
880     def test_map_t_pre_resolve(self):
881         """ MAP-T pre-resolve"""
882
883         # Add a domain that maps from pg0 to pg1
884         map_dst = '2001:db8::/32'
885         map_src = '1234:5678:90ab:cdef::/64'
886         ip4_pfx = '192.168.0.0/24'
887         tag = 'MAP-T Test Domain.'
888
889         self.vapi.map_add_domain(ip6_prefix=map_dst,
890                                  ip4_prefix=ip4_pfx,
891                                  ip6_src=map_src,
892                                  ea_bits_len=16,
893                                  psid_offset=6,
894                                  psid_length=4,
895                                  mtu=1500,
896                                  tag=tag)
897
898         # Enable MAP-T on interfaces.
899         self.vapi.map_if_enable_disable(is_enable=1,
900                                         sw_if_index=self.pg0.sw_if_index,
901                                         is_translation=1)
902         self.vapi.map_if_enable_disable(is_enable=1,
903                                         sw_if_index=self.pg1.sw_if_index,
904                                         is_translation=1)
905
906         # Enable pre-resolve option
907         self.vapi.map_param_add_del_pre_resolve(ip4_nh_address="10.1.2.3",
908                                                 ip6_nh_address="4001::1",
909                                                 is_add=1)
910
911         # Add a route to 4001::1 and expect the translated traffic to be
912         # sent via that route next-hop.
913         pre_res_route6 = VppIpRoute(self, "4001::1", 128,
914                                     [VppRoutePath(self.pg1.remote_hosts[2].ip6,
915                                                   self.pg1.sw_if_index)])
916         pre_res_route6.add_vpp_config()
917
918         # Add a route to 10.1.2.3 and expect the "untranslated" traffic to be
919         # sent via that route next-hop.
920         pre_res_route4 = VppIpRoute(self, "10.1.2.3", 32,
921                                     [VppRoutePath(self.pg0.remote_hosts[1].ip4,
922                                                   self.pg0.sw_if_index)])
923         pre_res_route4.add_vpp_config()
924
925         # Send an IPv4 packet that will be translated
926         p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
927         p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
928         payload = TCP(sport=0xabcd, dport=0xabcd)
929         p4 = (p_ether / p_ip4 / payload)
930
931         p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
932                               dst="2001:db8:1f0::c0a8:1:f") / payload)
933         p6_translated.hlim -= 1
934
935         rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
936         for p in rx:
937             self.assertEqual(p[Ether].dst, self.pg1.remote_hosts[2].mac)
938             self.validate(p[1], p6_translated)
939
940         # Send back an IPv6 packet that will be "untranslated"
941         p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
942         p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
943                      dst='1234:5678:90ab:cdef:ac:1001:200:0')
944         p6 = (p_ether6 / p_ip6 / payload)
945
946         p4_translated = (IP(src='192.168.0.1',
947                             dst=self.pg0.remote_ip4) / payload)
948         p4_translated.id = 0
949         p4_translated.ttl -= 1
950
951         rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
952         for p in rx:
953             self.assertEqual(p[Ether].dst, self.pg0.remote_hosts[1].mac)
954             self.validate(p[1], p4_translated)
955
956         # Cleanup pre-resolve option
957         self.vapi.map_param_add_del_pre_resolve(ip4_nh_address="10.1.2.3",
958                                                 ip6_nh_address="4001::1",
959                                                 is_add=0)
960
961
# Script entry point: when this file is executed directly, run its tests
# through unittest, using the VppTestRunner imported from framework.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)