ip: fix buffer leaks in reassembly
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python3
2
3 import socket
4 from socket import inet_pton, inet_ntop
5 import unittest
6
7 from parameterized import parameterized
8 import scapy.compat
9 import scapy.layers.inet6 as inet6
10 from scapy.layers.inet import UDP, IP
11 from scapy.contrib.mpls import MPLS
12 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
13     ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
14     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
15     ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, \
16     IPv6ExtHdrHopByHop, ICMPv6MLReport2, ICMPv6MLDMultAddrRec
17 from scapy.layers.l2 import Ether, Dot1Q, GRE
18 from scapy.packet import Raw
19 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
20     in6_mactoifaceid
21 from six import moves
22
23 from framework import VppTestCase, VppTestRunner, tag_run_solo
24 from util import ppp, ip6_normalize, mk_ll_addr
25 from vpp_papi import VppEnum
26 from vpp_ip import DpoProto, VppIpPuntPolicer, VppIpPuntRedirect, VppIpPathMtu
27 from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
28     VppMRoutePath, VppMplsIpBind, \
29     VppMplsRoute, VppMplsTable, VppIpTable, FibPathType, FibPathProto, \
30     VppIpInterfaceAddress, find_route_in_dump, find_mroute_in_dump, \
31     VppIp6LinkLocalAddress, VppIpRouteV2
32 from vpp_neighbor import find_nbr, VppNeighbor
33 from vpp_ipip_tun_interface import VppIpIpTunInterface
34 from vpp_pg_interface import is_ipv6_misc
35 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
36 from vpp_policer import VppPolicer, PolicerAction
37 from ipaddress import IPv6Network, IPv6Address
38 from vpp_gre_interface import VppGreInterface
39 from vpp_teib import VppTeib
40
# Address family handed to every inet_pton / inet_ntop call in this module.
AF_INET6 = socket.AF_INET6

# Text type that works on both interpreters: ``unicode`` where that builtin
# exists, plain ``str`` otherwise.
try:
    unicode
except NameError:
    text_type = str
else:
    text_type = unicode

NUM_PKTS = 67
49
50
class TestIPv6ND(VppTestCase):
    # Shared helpers for the IPv6 neighbour-discovery tests: validators for
    # received RA / NA / NS packets and send-and-expect wrappers around the
    # packet-generator interfaces. Documented with a comment rather than a
    # class docstring so that the class's __doc__ is left untouched.

    def validate_ra(self, intf, rx, dst_ip=None):
        """Assert that ``rx`` is a unicast Router Advertisement from ``intf``.

        :param intf: Interface the RA was captured on; supplies the expected
                     MAC addresses and default IPv6 destination.
        :param rx: Captured packet to check.
        :param dst_ip: Expected IPv6 destination. Defaults to
                       ``intf.remote_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd RA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_RA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the router's link local
        self.assertTrue(in6_islladdr(rx[IPv6].src))
        self.assertEqual(in6_ptop(rx[IPv6].src),
                         in6_ptop(mk_ll_addr(intf.local_mac)))

    def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
        """Assert that ``rx`` is a Neighbour Advertisement from ``intf``.

        :param intf: Interface the NA was captured on; supplies the expected
                     MAC addresses and the default addresses.
        :param rx: Captured packet to check.
        :param dst_ip: Expected IPv6 destination. Defaults to
                       ``intf.remote_ip6``.
        :param tgt_ip: Address the NA is expected to be sourced from.
                       Defaults to ``intf.local_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6
        if not tgt_ip:
            # The default belongs to tgt_ip. Assigning it to dst_ip (as this
            # code used to) clobbered the expected destination and left
            # tgt_ip as None for the source-address check below.
            tgt_ip = intf.local_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_NA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the target address
        self.assertEqual(
            in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))

        # Dest link-layer options should have the router's MAC
        dll = rx[ICMPv6NDOptDstLLAddr]
        self.assertEqual(dll.lladdr, intf.local_mac)

    def validate_ns(self, intf, rx, tgt_ip):
        """Assert that ``rx`` is a Neighbour Solicitation for ``tgt_ip``.

        :param intf: Interface the NS was captured on; supplies the expected
                     source MAC and source IPv6 address.
        :param rx: Captured packet to check.
        :param tgt_ip: Address being solicited. The expected destination IP
                       and MAC are both derived from it.
        """
        nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
        dst_ip = inet_ntop(AF_INET6, nsma)

        # NS is sent to the MAC derived from the solicited-node mcast address
        self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NS should be addressed to an mcast address
        # derived from the target address
        self.assertEqual(
            in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # expect the tgt IP in the NS header
        ns = rx[ICMPv6ND_NS]
        self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))

        # packet is from the router's local address; normalise both sides,
        # like every other address comparison in these validators
        self.assertEqual(
            in6_ptop(rx[IPv6].src), in6_ptop(intf.local_ip6))

        # Src link-layer options should have the router's MAC
        sll = rx[ICMPv6NDOptSrcLLAddr]
        self.assertEqual(sll.lladdr, intf.local_mac)

    def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``intf`` and validate the single RA captured.

        :param intf: Interface used for both transmit and capture.
        :param pkts: Packet(s) to send.
        :param remark: Not used by this method; describes the scenario at
                       the call site.
        :param dst_ip: Expected RA destination, see :meth:`validate_ra`.
        :param filter_out_fn: Capture filter passed to ``get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ra(intf, rx, dst_ip)

    def send_and_expect_na(self, intf, pkts, remark, dst_ip=None,
                           tgt_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``intf`` and validate the single NA captured.

        :param intf: Interface used for both transmit and capture.
        :param pkts: Packet(s) to send.
        :param remark: Not used by this method; describes the scenario at
                       the call site.
        :param dst_ip: Expected NA destination, see :meth:`validate_na`.
        :param tgt_ip: Expected NA source, see :meth:`validate_na`.
        :param filter_out_fn: Capture filter passed to ``get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_na(intf, rx, dst_ip, tgt_ip)

    def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``tx_intf`` and validate the NS seen on ``rx_intf``.

        :param tx_intf: Interface the packets are injected on.
        :param rx_intf: Interface on which exactly one NS is expected.
        :param pkts: Packet(s) to send.
        :param tgt_ip: Address the NS is expected to solicit.
        :param filter_out_fn: Capture filter passed to ``get_capture``.
        """
        self.vapi.cli("clear trace")
        tx_intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ns(rx_intf, rx, tgt_ip)

    def verify_ip(self, rx, smac, dmac, sip, dip):
        """Assert the Ethernet and IPv6 addresses of ``rx``.

        :param rx: Captured packet to check.
        :param smac: Expected Ethernet source.
        :param dmac: Expected Ethernet destination.
        :param sip: Expected IPv6 source (compared as given, not normalised).
        :param dip: Expected IPv6 destination (compared as given).
        """
        ether = rx[Ether]
        self.assertEqual(ether.dst, dmac)
        self.assertEqual(ether.src, smac)

        ip = rx[IPv6]
        self.assertEqual(ip.src, sip)
        self.assertEqual(ip.dst, dip)
167
168
169 @tag_run_solo
170 class TestIPv6(TestIPv6ND):
171     """ IPv6 Test Case """
172
    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the parent test case."""
        super(TestIPv6, cls).setUpClass()
176
    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent test case."""
        super(TestIPv6, cls).tearDownClass()
180
181     def setUp(self):
182         """
183         Perform test setup before test case.
184
185         **Config:**
186             - create 3 pg interfaces
187                 - untagged pg0 interface
188                 - Dot1Q subinterface on pg1
189                 - Dot1AD subinterface on pg2
190             - setup interfaces:
191                 - put it into UP state
192                 - set IPv6 addresses
193                 - resolve neighbor address using NDP
194             - configure 200 fib entries
195
196         :ivar list interfaces: pg interfaces and subinterfaces.
197         :ivar dict flows: IPv4 packet flows in test.
198
199         *TODO:* Create AD sub interface
200         """
201         super(TestIPv6, self).setUp()
202
203         # create 3 pg interfaces
204         self.create_pg_interfaces(range(3))
205
206         # create 2 subinterfaces for p1 and pg2
207         self.sub_interfaces = [
208             VppDot1QSubint(self, self.pg1, 100),
209             VppDot1QSubint(self, self.pg2, 200)
210             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
211         ]
212
213         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
214         self.flows = dict()
215         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
216         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
217         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
218
219         # packet sizes
220         self.pg_if_packet_sizes = [64, 1500, 9020]
221
222         self.interfaces = list(self.pg_interfaces)
223         self.interfaces.extend(self.sub_interfaces)
224
225         # setup all interfaces
226         for i in self.interfaces:
227             i.admin_up()
228             i.config_ip6()
229             i.resolve_ndp()
230
231     def tearDown(self):
232         """Run standard test teardown and log ``show ip6 neighbors``."""
233         for i in self.interfaces:
234             i.unconfig_ip6()
235             i.admin_down()
236         for i in self.sub_interfaces:
237             i.remove_vpp_config()
238
239         super(TestIPv6, self).tearDown()
240         if not self.vpp_dead:
241             self.logger.info(self.vapi.cli("show ip6 neighbors"))
242             # info(self.vapi.cli("show ip6 fib"))  # many entries
243
244     def modify_packet(self, src_if, packet_size, pkt):
245         """Add load, set destination IP and extend packet to required packet
246         size for defined interface.
247
248         :param VppInterface src_if: Interface to create packet for.
249         :param int packet_size: Required packet size.
250         :param Scapy pkt: Packet to be modified.
251         """
252         dst_if_idx = int(packet_size / 10 % 2)
253         dst_if = self.flows[src_if][dst_if_idx]
254         info = self.create_packet_info(src_if, dst_if)
255         payload = self.info_to_payload(info)
256         p = pkt / Raw(payload)
257         p[IPv6].dst = dst_if.remote_ip6
258         info.data = p.copy()
259         if isinstance(src_if, VppSubInterface):
260             p = src_if.add_dot1_layer(p)
261         self.extend_packet(p, packet_size)
262
263         return p
264
265     def create_stream(self, src_if):
266         """Create input packet stream for defined interface.
267
268         :param VppInterface src_if: Interface to create packet stream for.
269         """
270         hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
271         pkt_tmpl = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
272                     IPv6(src=src_if.remote_ip6) /
273                     inet6.UDP(sport=1234, dport=1234))
274
275         pkts = [self.modify_packet(src_if, i, pkt_tmpl)
276                 for i in moves.range(self.pg_if_packet_sizes[0],
277                                      self.pg_if_packet_sizes[1], 10)]
278         pkts_b = [self.modify_packet(src_if, i, pkt_tmpl)
279                   for i in moves.range(self.pg_if_packet_sizes[1] + hdr_ext,
280                                        self.pg_if_packet_sizes[2] + hdr_ext,
281                                        50)]
282         pkts.extend(pkts_b)
283
284         return pkts
285
286     def verify_capture(self, dst_if, capture):
287         """Verify captured input packet stream for defined interface.
288
289         :param VppInterface dst_if: Interface to verify captured packet stream
290                                     for.
291         :param list capture: Captured packet stream.
292         """
293         self.logger.info("Verifying capture on interface %s" % dst_if.name)
294         last_info = dict()
295         for i in self.interfaces:
296             last_info[i.sw_if_index] = None
297         is_sub_if = False
298         dst_sw_if_index = dst_if.sw_if_index
299         if hasattr(dst_if, 'parent'):
300             is_sub_if = True
301         for packet in capture:
302             if is_sub_if:
303                 # Check VLAN tags and Ethernet header
304                 packet = dst_if.remove_dot1_layer(packet)
305             self.assertTrue(Dot1Q not in packet)
306             try:
307                 ip = packet[IPv6]
308                 udp = packet[inet6.UDP]
309                 payload_info = self.payload_to_info(packet[Raw])
310                 packet_index = payload_info.index
311                 self.assertEqual(payload_info.dst, dst_sw_if_index)
312                 self.logger.debug(
313                     "Got packet on port %s: src=%u (id=%u)" %
314                     (dst_if.name, payload_info.src, packet_index))
315                 next_info = self.get_next_packet_info_for_interface2(
316                     payload_info.src, dst_sw_if_index,
317                     last_info[payload_info.src])
318                 last_info[payload_info.src] = next_info
319                 self.assertTrue(next_info is not None)
320                 self.assertEqual(packet_index, next_info.index)
321                 saved_packet = next_info.data
322                 # Check standard fields
323                 self.assertEqual(
324                     ip.src, saved_packet[IPv6].src)
325                 self.assertEqual(
326                     ip.dst, saved_packet[IPv6].dst)
327                 self.assertEqual(
328                     udp.sport, saved_packet[inet6.UDP].sport)
329                 self.assertEqual(
330                     udp.dport, saved_packet[inet6.UDP].dport)
331             except:
332                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
333                 raise
334         for i in self.interfaces:
335             remaining_packet = self.get_next_packet_info_for_interface2(
336                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
337             self.assertTrue(remaining_packet is None,
338                             "Interface %s: Packet expected from interface %s "
339                             "didn't arrive" % (dst_if.name, i.name))
340
341     def test_next_header_anomaly(self):
342         """ IPv6 next header anomaly test
343
344         Test scenario:
345             - ipv6 next header field = Fragment Header (44)
346             - next header is ICMPv6 Echo Request
347             - wait for reassembly
348         """
349         pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
350                IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44) /
351                ICMPv6EchoRequest())
352
353         self.pg0.add_stream(pkt)
354         self.pg_start()
355
356         # wait for reassembly
357         self.sleep(10)
358
359     def test_fib(self):
360         """ IPv6 FIB test
361
362         Test scenario:
363             - Create IPv6 stream for pg0 interface
364             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
365             - Send and verify received packets on each interface.
366         """
367
368         pkts = self.create_stream(self.pg0)
369         self.pg0.add_stream(pkts)
370
371         for i in self.sub_interfaces:
372             pkts = self.create_stream(i)
373             i.parent.add_stream(pkts)
374
375         self.pg_enable_capture(self.pg_interfaces)
376         self.pg_start()
377
378         pkts = self.pg0.get_capture()
379         self.verify_capture(self.pg0, pkts)
380
381         for i in self.sub_interfaces:
382             pkts = i.parent.get_capture()
383             self.verify_capture(i, pkts)
384
    def test_ns(self):
        """ IPv6 Neighbour Solicitation Exceptions

        Test scenario:
           - Send an NS sourced from an address not covered by the link sub-net
           - Send an NS to an mcast address the router has not joined
             (currently disabled, see below)
           - Send an NS for a target address the router does not own.
           - Send NSs sourced from link-local addresses and check the NA
             replies, the learned neighbor entries and the absence of
             routes for them.
        """

        #
        # An NS from a non link source address
        #
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        p = (Ether(dst=in6_getnsmac(nsma)) /
             IPv6(dst=d, src="2002::2") /
             ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
             ICMPv6NDOptSrcLLAddr(
                 lladdr=self.pg0.remote_mac))
        pkts = [p]

        self.send_and_assert_no_replies(
            self.pg0, pkts,
            "No response to NS source by address not on sub-net")

        #
        # An NS sent to a solicited mcast group the router is
        # not a member of FAILS
        # (the block is disabled with "if 0", so it never runs)
        #
        if 0:
            nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
            d = inet_ntop(AF_INET6, nsma)

            p = (Ether(dst=in6_getnsmac(nsma)) /
                 IPv6(dst=d, src=self.pg0.remote_ip6) /
                 ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
                 ICMPv6NDOptSrcLLAddr(
                     lladdr=self.pg0.remote_mac))
            pkts = [p]

            self.send_and_assert_no_replies(
                self.pg0, pkts,
                "No response to NS sent to unjoined mcast address")

        #
        # An NS whose target address is one the router does not own
        #
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        p = (Ether(dst=in6_getnsmac(nsma)) /
             IPv6(dst=d, src=self.pg0.remote_ip6) /
             ICMPv6ND_NS(tgt="fd::ffff") /
             ICMPv6NDOptSrcLLAddr(
                 lladdr=self.pg0.remote_mac))
        pkts = [p]

        self.send_and_assert_no_replies(self.pg0, pkts,
                                        "No response to NS for unknown target")

        #
        # A neighbor entry that has no associated FIB-entry
        #
        self.pg0.generate_remote_hosts(4)
        nd_entry = VppNeighbor(self,
                               self.pg0.sw_if_index,
                               self.pg0.remote_hosts[2].mac,
                               self.pg0.remote_hosts[2].ip6,
                               is_no_fib_entry=1)
        nd_entry.add_vpp_config()

        #
        # check we have the neighbor, but no route
        #
        self.assertTrue(find_nbr(self,
                                 self.pg0.sw_if_index,
                                 self.pg0._remote_hosts[2].ip6))
        self.assertFalse(find_route(self,
                                    self.pg0._remote_hosts[2].ip6,
                                    128))

        #
        # send an NS from a link local address to the interface's global
        # address (nsma and d still refer to pg0's local address here)
        #
        p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
             IPv6(
                 dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
             ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
             ICMPv6NDOptSrcLLAddr(
                 lladdr=self.pg0.remote_mac))

        self.send_and_expect_na(self.pg0, p,
                                "NS from link-local",
                                dst_ip=self.pg0._remote_hosts[2].ip6_ll,
                                tgt_ip=self.pg0.local_ip6)

        #
        # we should have learned an ND entry for the peer's link-local
        # but not inserted a route to it in the FIB
        #
        self.assertTrue(find_nbr(self,
                                 self.pg0.sw_if_index,
                                 self.pg0._remote_hosts[2].ip6_ll))
        self.assertFalse(find_route(self,
                                    self.pg0._remote_hosts[2].ip6_ll,
                                    128))

        #
        # An NS to the router's own Link-local
        #
        p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
             IPv6(
                 dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
             ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll) /
             ICMPv6NDOptSrcLLAddr(
                 lladdr=self.pg0.remote_mac))

        self.send_and_expect_na(self.pg0, p,
                                "NS to/from link-local",
                                dst_ip=self.pg0._remote_hosts[3].ip6_ll,
                                tgt_ip=self.pg0.local_ip6_ll)

        #
        # do not respond to a NS for the peer's address
        #
        p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
             IPv6(dst=d,
                  src=self.pg0._remote_hosts[3].ip6_ll) /
             ICMPv6ND_NS(tgt=self.pg0._remote_hosts[3].ip6_ll) /
             ICMPv6NDOptSrcLLAddr(
                 lladdr=self.pg0.remote_mac))

        self.send_and_assert_no_replies(self.pg0, p)

        #
        # we should have learned an ND entry for the peer's link-local
        # but not inserted a route to it in the FIB
        #
        self.assertTrue(find_nbr(self,
                                 self.pg0.sw_if_index,
                                 self.pg0._remote_hosts[3].ip6_ll))
        self.assertFalse(find_route(self,
                                    self.pg0._remote_hosts[3].ip6_ll,
                                    128))
531
532     def test_ns_duplicates(self):
533         """ ND Duplicates"""
534
535         #
536         # Generate some hosts on the LAN
537         #
538         self.pg1.generate_remote_hosts(3)
539
540         #
541         # Add host 1 on pg1 and pg2
542         #
543         ns_pg1 = VppNeighbor(self,
544                              self.pg1.sw_if_index,
545                              self.pg1.remote_hosts[1].mac,
546                              self.pg1.remote_hosts[1].ip6)
547         ns_pg1.add_vpp_config()
548         ns_pg2 = VppNeighbor(self,
549                              self.pg2.sw_if_index,
550                              self.pg2.remote_mac,
551                              self.pg1.remote_hosts[1].ip6)
552         ns_pg2.add_vpp_config()
553
554         #
555         # IP packet destined for pg1 remote host arrives on pg1 again.
556         #
557         p = (Ether(dst=self.pg0.local_mac,
558                    src=self.pg0.remote_mac) /
559              IPv6(src=self.pg0.remote_ip6,
560                   dst=self.pg1.remote_hosts[1].ip6) /
561              inet6.UDP(sport=1234, dport=1234) /
562              Raw())
563
564         self.pg0.add_stream(p)
565         self.pg_enable_capture(self.pg_interfaces)
566         self.pg_start()
567
568         rx1 = self.pg1.get_capture(1)
569
570         self.verify_ip(rx1[0],
571                        self.pg1.local_mac,
572                        self.pg1.remote_hosts[1].mac,
573                        self.pg0.remote_ip6,
574                        self.pg1.remote_hosts[1].ip6)
575
576         #
577         # remove the duplicate on pg1
578         # packet stream should generate NSs out of pg1
579         #
580         ns_pg1.remove_vpp_config()
581
582         self.send_and_expect_ns(self.pg0, self.pg1,
583                                 p, self.pg1.remote_hosts[1].ip6)
584
585         #
586         # Add it back
587         #
588         ns_pg1.add_vpp_config()
589
590         self.pg0.add_stream(p)
591         self.pg_enable_capture(self.pg_interfaces)
592         self.pg_start()
593
594         rx1 = self.pg1.get_capture(1)
595
596         self.verify_ip(rx1[0],
597                        self.pg1.local_mac,
598                        self.pg1.remote_hosts[1].mac,
599                        self.pg0.remote_ip6,
600                        self.pg1.remote_hosts[1].ip6)
601
602     def validate_ra(self, intf, rx, dst_ip=None, src_ip=None,
603                     mtu=9000, pi_opt=None):
604         if not dst_ip:
605             dst_ip = intf.remote_ip6
606         if not src_ip:
607             src_ip = mk_ll_addr(intf.local_mac)
608
609         # unicasted packets must come to the unicast mac
610         self.assertEqual(rx[Ether].dst, intf.remote_mac)
611
612         # and from the router's MAC
613         self.assertEqual(rx[Ether].src, intf.local_mac)
614
615         # the rx'd RA should be addressed to the sender's source
616         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
617         self.assertEqual(in6_ptop(rx[IPv6].dst),
618                          in6_ptop(dst_ip))
619
620         # and come from the router's link local
621         self.assertTrue(in6_islladdr(rx[IPv6].src))
622         self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(src_ip))
623
624         # it should contain the links MTU
625         ra = rx[ICMPv6ND_RA]
626         self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
627
628         # it should contain the source's link layer address option
629         sll = ra[ICMPv6NDOptSrcLLAddr]
630         self.assertEqual(sll.lladdr, intf.local_mac)
631
632         if not pi_opt:
633             # the RA should not contain prefix information
634             self.assertFalse(ra.haslayer(
635                 ICMPv6NDOptPrefixInfo))
636         else:
637             raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
638
639             # the options are nested in the scapy packet in way that i cannot
640             # decipher how to decode. this 1st layer of option always returns
641             # nested classes, so a direct obj1=obj2 comparison always fails.
642             # however, the getlayer(.., 2) does give one instance.
643             # so we cheat here and construct a new opt instance for comparison
644             rd = ICMPv6NDOptPrefixInfo(
645                 prefixlen=raos.prefixlen,
646                 prefix=raos.prefix,
647                 L=raos.L,
648                 A=raos.A)
649             if type(pi_opt) is list:
650                 for ii in range(len(pi_opt)):
651                     self.assertEqual(pi_opt[ii], rd)
652                     rd = rx.getlayer(
653                         ICMPv6NDOptPrefixInfo, ii + 2)
654             else:
655                 self.assertEqual(pi_opt, raos, 'Expected: %s, received: %s'
656                                  % (pi_opt.show(dump=True),
657                                     raos.show(dump=True)))
658
659     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
660                            filter_out_fn=is_ipv6_misc,
661                            opt=None,
662                            src_ip=None):
663         self.vapi.cli("clear trace")
664         intf.add_stream(pkts)
665         self.pg_enable_capture(self.pg_interfaces)
666         self.pg_start()
667         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
668
669         self.assertEqual(len(rx), 1)
670         rx = rx[0]
671         self.validate_ra(intf, rx, dst_ip, src_ip=src_ip, pi_opt=opt)
672
673     def test_rs(self):
674         """ IPv6 Router Solicitation Exceptions
675
676         Test scenario:
677         """
678
679         self.pg0.ip6_ra_config(no=1, suppress=1)
680
681         #
682         # Before we begin change the IPv6 RA responses to use the unicast
683         # address - that way we will not confuse them with the periodic
684         # RAs which go to the mcast address
685         # Sit and wait for the first periodic RA.
686         #
687         # TODO
688         #
689         self.pg0.ip6_ra_config(send_unicast=1)
690
691         #
692         # An RS from a link source address
693         #  - expect an RA in return
694         #
695         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
696              IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
697              ICMPv6ND_RS())
698         pkts = [p]
699         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
700
701         #
702         # For the next RS sent the RA should be rate limited
703         #
704         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
705
706         #
707         # When we reconfigure the IPv6 RA config,
708         # we reset the RA rate limiting,
709         # so we need to do this before each test below so as not to drop
710         # packets for rate limiting reasons. Test this works here.
711         #
712         self.pg0.ip6_ra_config(send_unicast=1)
713         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
714
715         #
716         # An RS sent from a non-link local source
717         #
718         self.pg0.ip6_ra_config(send_unicast=1)
719         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
720              IPv6(dst=self.pg0.local_ip6,
721                   src="2002::ffff") /
722              ICMPv6ND_RS())
723         pkts = [p]
724         self.send_and_assert_no_replies(self.pg0, pkts,
725                                         "RS from non-link source")
726
727         #
728         # Source an RS from a link local address
729         #
730         self.pg0.ip6_ra_config(send_unicast=1)
731         ll = mk_ll_addr(self.pg0.remote_mac)
732         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
733              IPv6(dst=self.pg0.local_ip6, src=ll) /
734              ICMPv6ND_RS())
735         pkts = [p]
736         self.send_and_expect_ra(self.pg0, pkts,
737                                 "RS sourced from link-local",
738                                 dst_ip=ll)
739
740         #
741         # Source an RS from a link local address
742         # Ensure suppress also applies to solicited RS
743         #
744         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
745         ll = mk_ll_addr(self.pg0.remote_mac)
746         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
747              IPv6(dst=self.pg0.local_ip6, src=ll) /
748              ICMPv6ND_RS())
749         pkts = [p]
750         self.send_and_assert_no_replies(self.pg0, pkts,
751                                         "Suppressed RS from link-local")
752
753         #
754         # Send the RS multicast
755         #
756         self.pg0.ip6_ra_config(no=1, suppress=1)  # Reset suppress flag to zero
757         self.pg0.ip6_ra_config(send_unicast=1)
758         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
759         ll = mk_ll_addr(self.pg0.remote_mac)
760         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
761              IPv6(dst="ff02::2", src=ll) /
762              ICMPv6ND_RS())
763         pkts = [p]
764         self.send_and_expect_ra(self.pg0, pkts,
765                                 "RS sourced from link-local",
766                                 dst_ip=ll)
767
768         #
769         # Source from the unspecified address ::. This happens when the RS
770         # is sent before the host has a configured address/sub-net,
771         # i.e. auto-config. Since the sender has no IP address, the reply
772         # comes back mcast - so the capture needs to not filter this.
773         # If we happen to pick up the periodic RA at this point then so be it,
774         # it's not an error.
775         #
776         self.pg0.ip6_ra_config(send_unicast=1)
777         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
778              IPv6(dst="ff02::2", src="::") /
779              ICMPv6ND_RS())
780         pkts = [p]
781         self.send_and_expect_ra(self.pg0, pkts,
782                                 "RS sourced from unspecified",
783                                 dst_ip="ff02::1",
784                                 filter_out_fn=None)
785
786         #
787         # Configure The RA to announce the links prefix
788         #
789         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
790                                self.pg0.local_ip6_prefix_len))
791
792         #
793         # RAs should now contain the prefix information option
794         #
795         opt = ICMPv6NDOptPrefixInfo(
796             prefixlen=self.pg0.local_ip6_prefix_len,
797             prefix=self.pg0.local_ip6,
798             L=1,
799             A=1)
800
801         self.pg0.ip6_ra_config(send_unicast=1)
802         ll = mk_ll_addr(self.pg0.remote_mac)
803         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
804              IPv6(dst=self.pg0.local_ip6, src=ll) /
805              ICMPv6ND_RS())
806         self.send_and_expect_ra(self.pg0, p,
807                                 "RA with prefix-info",
808                                 dst_ip=ll,
809                                 opt=opt)
810
811         #
812         # Change the prefix info to not off-link
813         #  L-flag is clear
814         #
815         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
816                                self.pg0.local_ip6_prefix_len),
817                                off_link=1)
818
819         opt = ICMPv6NDOptPrefixInfo(
820             prefixlen=self.pg0.local_ip6_prefix_len,
821             prefix=self.pg0.local_ip6,
822             L=0,
823             A=1)
824
825         self.pg0.ip6_ra_config(send_unicast=1)
826         self.send_and_expect_ra(self.pg0, p,
827                                 "RA with Prefix info with L-flag=0",
828                                 dst_ip=ll,
829                                 opt=opt)
830
831         #
832         # Change the prefix info to not off-link, no-autoconfig
833         #  L and A flag are clear in the advert
834         #
835         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
836                                self.pg0.local_ip6_prefix_len),
837                                off_link=1,
838                                no_autoconfig=1)
839
840         opt = ICMPv6NDOptPrefixInfo(
841             prefixlen=self.pg0.local_ip6_prefix_len,
842             prefix=self.pg0.local_ip6,
843             L=0,
844             A=0)
845
846         self.pg0.ip6_ra_config(send_unicast=1)
847         self.send_and_expect_ra(self.pg0, p,
848                                 "RA with Prefix info with A & L-flag=0",
849                                 dst_ip=ll,
850                                 opt=opt)
851
852         #
853         # Change the flag settings back to the defaults
854         #  L and A flag are set in the advert
855         #
856         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
857                                self.pg0.local_ip6_prefix_len))
858
859         opt = ICMPv6NDOptPrefixInfo(
860             prefixlen=self.pg0.local_ip6_prefix_len,
861             prefix=self.pg0.local_ip6,
862             L=1,
863             A=1)
864
865         self.pg0.ip6_ra_config(send_unicast=1)
866         self.send_and_expect_ra(self.pg0, p,
867                                 "RA with Prefix info",
868                                 dst_ip=ll,
869                                 opt=opt)
870
871         #
872         # Change the prefix info to not off-link, no-autoconfig
873         #  L and A flag are clear in the advert
874         #
875         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
876                                self.pg0.local_ip6_prefix_len),
877                                off_link=1,
878                                no_autoconfig=1)
879
880         opt = ICMPv6NDOptPrefixInfo(
881             prefixlen=self.pg0.local_ip6_prefix_len,
882             prefix=self.pg0.local_ip6,
883             L=0,
884             A=0)
885
886         self.pg0.ip6_ra_config(send_unicast=1)
887         self.send_and_expect_ra(self.pg0, p,
888                                 "RA with Prefix info with A & L-flag=0",
889                                 dst_ip=ll,
890                                 opt=opt)
891
892         #
893         # Use the reset to defaults option to revert to defaults
894         #  L and A flags are set in the advert
895         #
896         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
897                                self.pg0.local_ip6_prefix_len),
898                                use_default=1)
899
900         opt = ICMPv6NDOptPrefixInfo(
901             prefixlen=self.pg0.local_ip6_prefix_len,
902             prefix=self.pg0.local_ip6,
903             L=1,
904             A=1)
905
906         self.pg0.ip6_ra_config(send_unicast=1)
907         self.send_and_expect_ra(self.pg0, p,
908                                 "RA with Prefix reverted to defaults",
909                                 dst_ip=ll,
910                                 opt=opt)
911
912         #
913         # Advertise Another prefix. With no L-flag/A-flag
914         #
915         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
916                                self.pg1.local_ip6_prefix_len),
917                                off_link=1,
918                                no_autoconfig=1)
919
920         opt = [ICMPv6NDOptPrefixInfo(
921             prefixlen=self.pg0.local_ip6_prefix_len,
922             prefix=self.pg0.local_ip6,
923             L=1,
924             A=1),
925             ICMPv6NDOptPrefixInfo(
926                 prefixlen=self.pg1.local_ip6_prefix_len,
927                 prefix=self.pg1.local_ip6,
928                 L=0,
929                 A=0)]
930
931         self.pg0.ip6_ra_config(send_unicast=1)
932         ll = mk_ll_addr(self.pg0.remote_mac)
933         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
934              IPv6(dst=self.pg0.local_ip6, src=ll) /
935              ICMPv6ND_RS())
936         self.send_and_expect_ra(self.pg0, p,
937                                 "RA with multiple Prefix infos",
938                                 dst_ip=ll,
939                                 opt=opt)
940
941         #
942         # Remove the first prefix-info - expect the second is still in the
943         # advert
944         #
945         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
946                                self.pg0.local_ip6_prefix_len),
947                                is_no=1)
948
949         opt = ICMPv6NDOptPrefixInfo(
950             prefixlen=self.pg1.local_ip6_prefix_len,
951             prefix=self.pg1.local_ip6,
952             L=0,
953             A=0)
954
955         self.pg0.ip6_ra_config(send_unicast=1)
956         self.send_and_expect_ra(self.pg0, p,
957                                 "RA with Prefix reverted to defaults",
958                                 dst_ip=ll,
959                                 opt=opt)
960
961         #
962         # Remove the second prefix-info - expect no prefix-info in the adverts
963         #
964         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
965                                self.pg1.local_ip6_prefix_len),
966                                is_no=1)
967
968         #
969         # change the link's link local, so we know that works too.
970         #
971         self.vapi.sw_interface_ip6_set_link_local_address(
972             sw_if_index=self.pg0.sw_if_index,
973             ip="fe80::88")
974
975         self.pg0.ip6_ra_config(send_unicast=1)
976         self.send_and_expect_ra(self.pg0, p,
977                                 "RA with Prefix reverted to defaults",
978                                 dst_ip=ll,
979                                 src_ip="fe80::88")
980
981         #
982         # Reset the periodic advertisements back to default values
983         #
984         self.pg0.ip6_ra_config(suppress=1)
985         self.pg0.ip6_ra_config(no=1, send_unicast=1)
986
987     def test_mld(self):
988         """ MLD Report """
989         #
990         # test one MLD is sent after applying an IPv6 Address on an interface
991         #
992         self.pg_enable_capture(self.pg_interfaces)
993         self.pg_start()
994
995         subitf = VppDot1QSubint(self, self.pg1, 99)
996
997         subitf.admin_up()
998         subitf.config_ip6()
999
1000         rxs = self.pg1._get_capture(timeout=4, filter_out_fn=None)
1001
1002         #
1003         # hunt for the MLD on vlan 99
1004         #
1005         for rx in rxs:
1006             # make sure ipv6 packets with hop by hop options have
1007             # correct checksums
1008             self.assert_packet_checksums_valid(rx)
1009             if rx.haslayer(IPv6ExtHdrHopByHop) and \
1010                rx.haslayer(Dot1Q) and \
1011                rx[Dot1Q].vlan == 99:
1012                 mld = rx[ICMPv6MLReport2]
1013
1014         self.assertEqual(mld.records_number, 4)
1015
1016
class TestIPv6RouteLookup(VppTestCase):
    """ IPv6 Route Lookup Test Case """
    # Class-level default kept so the attribute always exists; setUp()
    # rebinds a fresh per-test list so routes never accumulate across tests.
    routes = []

    def route_lookup(self, prefix, exact):
        """ Look up 'prefix' in table 0 via the ip_route_lookup API

        :param prefix: prefix to look up, e.g. "2001:1111::/32"
        :param exact: True for an exact match, False for longest-prefix
        :returns: the API reply
        """
        return self.vapi.api(self.vapi.papi.ip_route_lookup,
                             {
                                 'table_id': 0,
                                 'exact': exact,
                                 'prefix': prefix,
                             })

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RouteLookup, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RouteLookup, cls).tearDownClass()

    def setUp(self):
        """ Install the three nested drop routes used by the tests """
        super(TestIPv6RouteLookup, self).setUp()

        # Per-test list: shadows the shared class attribute so that
        # tearDown() only removes what this test added.
        self.routes = []

        drop_nh = VppRoutePath("::1", 0xffffffff,
                               type=FibPathType.FIB_PATH_TYPE_DROP)

        # Add 3 routes
        for address, length in (("2001:1111::", 32),
                                ("2001:1111:2222::", 48),
                                ("2001:1111:2222::1", 128)):
            r = VppIpRoute(self, address, length, [drop_nh])
            r.add_vpp_config()
            self.routes.append(r)

    def tearDown(self):
        # Remove the routes we added
        for r in self.routes:
            r.remove_vpp_config()
        self.routes = []

        super(TestIPv6RouteLookup, self).tearDown()

    def test_exact_match(self):
        """ IPv6 Route Lookup - exact match """
        # Verify we find the host route
        prefix = "2001:1111:2222::1/128"
        result = self.route_lookup(prefix, True)
        self.assertEqual(prefix, str(result.route.prefix))

        # Verify we find a middle prefix route
        prefix = "2001:1111:2222::/48"
        result = self.route_lookup(prefix, True)
        self.assertEqual(prefix, str(result.route.prefix))

        # Verify we do not find an available LPM.
        with self.vapi.assert_negative_api_retval():
            self.route_lookup("2001::2/128", True)

    def test_longest_prefix_match(self):
        """ IPv6 Route Lookup - longest prefix match """
        # verify we find lpm
        lpm_prefix = "2001:1111:2222::/48"
        result = self.route_lookup("2001:1111:2222::2/128", False)
        self.assertEqual(lpm_prefix, str(result.route.prefix))

        # Verify we find the exact when not requested
        result = self.route_lookup(lpm_prefix, False)
        self.assertEqual(lpm_prefix, str(result.route.prefix))

        # Can't seem to delete the default route so no negative LPM test.
1089
1090
class TestIPv6IfAddrRoute(VppTestCase):
    """ IPv6 Interface Addr Route Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6IfAddrRoute, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6IfAddrRoute, cls).tearDownClass()

    def setUp(self):
        """ Bring up a single pg interface with IPv6 configured """
        super(TestIPv6IfAddrRoute, self).setUp()

        # a single pg interface is all these tests need
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """ Undo the interface configuration done in setUp """
        super(TestIPv6IfAddrRoute, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ipv6_ifaddrs_same_prefix(self):
        """ IPv6 Interface Addresses Same Prefix test

        Test scenario:

            - Verify no host routes in FIB for 2001:10::10 and 2001:10::20
            - Configure IPv6 address 2001:10::10/64 on an interface
            - Verify the host route for 2001:10::10 is in the FIB
            - Configure IPv6 address 2001:10::20/64 on an interface
            - Delete 2001:10::10/64 from interface
            - Verify only the host route for 2001:10::20 is in the FIB
            - Delete 2001:10::20/64 from interface
            - Verify neither host route is in the FIB
        """

        first = "2001:10::10"
        second = "2001:10::20"

        first_if_addr = VppIpInterfaceAddress(self, self.pg0, first, 64)
        second_if_addr = VppIpInterfaceAddress(self, self.pg0, second, 64)

        # nothing configured yet
        self.assertFalse(first_if_addr.query_vpp_config())
        self.assertFalse(find_route(self, first, 128))
        self.assertFalse(find_route(self, second, 128))

        # first address on: only its host route shows up
        first_if_addr.add_vpp_config()
        self.assertTrue(first_if_addr.query_vpp_config())
        self.assertTrue(find_route(self, first, 128))
        self.assertFalse(find_route(self, second, 128))

        # second address on, first off: only the second host route remains
        second_if_addr.add_vpp_config()
        first_if_addr.remove_vpp_config()
        self.assertFalse(first_if_addr.query_vpp_config())
        self.assertTrue(second_if_addr.query_vpp_config())
        self.assertFalse(find_route(self, first, 128))
        self.assertTrue(find_route(self, second, 128))

        # second address off: both host routes are gone
        second_if_addr.remove_vpp_config()
        self.assertFalse(first_if_addr.query_vpp_config())
        self.assertFalse(find_route(self, first, 128))
        self.assertFalse(find_route(self, second, 128))

    def test_ipv6_ifaddr_del(self):
        """ Delete an interface address that does not exist """

        self.create_loopback_interfaces(1)
        loop = self.lo_interfaces[0]

        loop.config_ip6()
        loop.admin_up()

        #
        # removing pg0's subnet from the loopback must be rejected
        #
        with self.vapi.assert_negative_api_retval():
            self.vapi.sw_interface_add_del_address(
                sw_if_index=loop.sw_if_index,
                prefix=self.pg0.local_ip6_prefix,
                is_add=0)
1180
1181
class TestICMPv6Echo(VppTestCase):
    """ ICMPv6 Echo Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestICMPv6Echo, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestICMPv6Echo, cls).tearDownClass()

    def setUp(self):
        """ Bring up one pg interface and resolve both of its addresses """
        super(TestICMPv6Echo, self).setUp()

        # a single pg interface is enough for the echo exchange
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp(link_layer=True)
            intf.resolve_ndp()

    def tearDown(self):
        """ Undo the interface configuration done in setUp """
        super(TestICMPv6Echo, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_icmpv6_echo(self):
        """ VPP replies to ICMPv6 Echo Request

        Test scenario:

            - Receive ICMPv6 Echo Request message on pg0 interface.
            - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
        """

        # one request per local address: the global one and the link-local
        targets = (self.pg0.local_ip6, self.pg0.local_ip6_ll)
        echo_id = 0xb
        echo_seq = 5
        payload = b'\x0a' * 18

        requests = [
            (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=self.pg0.remote_ip6, dst=target) /
             ICMPv6EchoRequest(id=echo_id, seq=echo_seq, data=payload))
            for target in targets
        ]

        self.pg0.add_stream(requests)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        replies = self.pg0.get_capture(len(targets))

        # replies are matched to requests by capture order
        for reply, target in zip(replies, targets):
            eth_hdr = reply[Ether]
            ip_hdr = reply[IPv6]
            echo = reply[ICMPv6EchoReply]
            self.assertEqual(eth_hdr.src, self.pg0.local_mac)
            self.assertEqual(eth_hdr.dst, self.pg0.remote_mac)
            self.assertEqual(ip_hdr.src, target)
            self.assertEqual(ip_hdr.dst, self.pg0.remote_ip6)
            self.assertEqual(icmp6types[echo.type], "Echo Reply")
            self.assertEqual(echo.id, echo_id)
            self.assertEqual(echo.seq, echo_seq)
            self.assertEqual(echo.data, payload)
1248
1249
class TestIPv6RD(TestIPv6ND):
    """ IPv6 Router Discovery Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RD, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RD, cls).tearDownClass()

    def setUp(self):
        """ Create two pg interfaces and enable IPv6 on both """
        super(TestIPv6RD, self).setUp()

        self.create_pg_interfaces(range(2))

        # private copy of the interface list, walked again in tearDown
        self.interfaces = list(self.pg_interfaces)

        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

    def tearDown(self):
        """ Unconfigure the interfaces before the base-class teardown """
        for intf in self.interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
        super(TestIPv6RD, self).tearDown()

    def test_rd_send_router_solicitation(self):
        """ Verify router solicitation packets """

        count = 2
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index,
                                                 mrc=count)
        captured = self.pg1.get_capture(count, timeout=3)
        self.assertEqual(len(captured), count)

        # every RS goes to ff02::2 from pg1's link-local address
        expected_dst = ip6_normalize("ff02::2")
        expected_src = ip6_normalize(self.pg1.local_ip6_ll)

        for rs in captured:
            self.assertEqual(rs.haslayer(IPv6), 1)
            self.assertEqual(rs[IPv6].haslayer(ICMPv6ND_RS), 1)
            self.assert_equal(ip6_normalize(rs[IPv6].dst), expected_dst)
            self.assert_equal(ip6_normalize(rs[IPv6].src), expected_src)
            has_sllao = rs[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr)
            self.assertTrue(bool(has_sllao))
            self.assert_equal(rs[ICMPv6NDOptSrcLLAddr].lladdr,
                              self.pg1.local_mac)

    def verify_prefix_info(self, reported_prefix, prefix_option):
        """ Compare a prefix reported in an RA event with the sent option

        :param reported_prefix: prefix entry taken from the RA event
        :param prefix_option: the ICMPv6NDOptPrefixInfo that was sent
        """
        address = prefix_option.getfieldval("prefix")
        length = prefix_option.getfieldval("prefixlen")
        # strict=False: the option carries a host address, not a network
        expected = IPv6Network(
            text_type(address + "/" + text_type(length)),
            strict=False)
        self.assert_equal(reported_prefix.prefix.network_address,
                          expected.network_address)

        # L is compared at bit 7 and A at bit 6 of the reported flags
        on_link = prefix_option.getfieldval("L")
        autonomous = prefix_option.getfieldval("A")
        self.assert_equal(reported_prefix.flags,
                          (on_link << 7) | (autonomous << 6))

        self.assert_equal(reported_prefix.valid_time,
                          prefix_option.getfieldval("validlifetime"))
        self.assert_equal(reported_prefix.preferred_time,
                          prefix_option.getfieldval("preferredlifetime"))

    def test_rd_receive_router_advertisement(self):
        """ Verify events triggered by received RA packets """

        self.vapi.want_ip6_ra_events(enable=1)

        first_option = ICMPv6NDOptPrefixInfo(
            prefix="1::2",
            prefixlen=50,
            validlifetime=200,
            preferredlifetime=500,
            L=1,
            A=1,
        )

        second_option = ICMPv6NDOptPrefixInfo(
            prefix="7::4",
            prefixlen=20,
            validlifetime=70,
            preferredlifetime=1000,
            L=1,
            A=0,
        )

        router_ll = mk_ll_addr(self.pg1.remote_mac)
        ra = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(dst=self.pg1.local_ip6_ll, src=router_ll) /
              ICMPv6ND_RA() /
              first_option /
              second_option)
        self.pg1.add_stream([ra])
        self.pg_start()

        event = self.vapi.wait_for_event(10, "ip6_ra_event")

        self.assert_equal(event.current_hop_limit, 0)
        self.assert_equal(event.flags, 8)
        self.assert_equal(event.router_lifetime_in_sec, 1800)
        self.assert_equal(event.neighbor_reachable_time_in_msec, 0)
        self.assert_equal(
            event.time_in_msec_between_retransmitted_neighbor_solicitations,
            0)

        self.assert_equal(event.n_prefixes, 2)

        # prefixes are compared in the order the options were sent
        self.verify_prefix_info(event.prefixes[0], first_option)
        self.verify_prefix_info(event.prefixes[1], second_option)
1369
1370
class TestIPv6RDControlPlane(TestIPv6ND):
    """ IPv6 Router Discovery Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RDControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RDControlPlane, cls).tearDownClass()

    def setUp(self):
        """ Create one pg interface and enable IPv6 on it """
        super(TestIPv6RDControlPlane, self).setUp()

        self.create_pg_interfaces(range(1))

        self.interfaces = list(self.pg_interfaces)

        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

    def tearDown(self):
        super(TestIPv6RDControlPlane, self).tearDown()

    @staticmethod
    def create_ra_packet(pg, routerlifetime=None):
        """ Build an RA from pg's remote link-local to pg's local address

        :param pg: interface supplying the MAC and IPv6 addresses
        :param routerlifetime: router lifetime to advertise; when None the
                               ICMPv6ND_RA default is used
        :returns: the Ether / IPv6 / ICMPv6ND_RA packet
        """
        src_ip = pg.remote_ip6_ll
        dst_ip = pg.local_ip6
        if routerlifetime is None:
            ra = ICMPv6ND_RA()
        else:
            ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
        ether = Ether(dst=pg.local_mac, src=pg.remote_mac)
        return ether / IPv6(dst=dst_ip, src=src_ip) / ra

    @staticmethod
    def get_default_routes(fib):
        """ Collect the paths of all default (/0) routes in a FIB dump

        Paths whose sw_if_index is 0xFFFFFFFF are ignored.

        :param fib: FIB dump to scan
        :returns: list of dicts with 'sw_if_index' and 'next_hop' keys
        """
        found = []
        for entry in fib:
            if entry.route.prefix.prefixlen != 0:
                continue
            for path in entry.route.paths:
                if path.sw_if_index == 0xFFFFFFFF:
                    continue
                found.append({
                    'sw_if_index': path.sw_if_index,
                    'next_hop': path.nh.address.ip6,
                })
        return found

    @staticmethod
    def get_interface_addresses(fib, pg):
        """ Collect the /128 prefixes whose first path points at pg

        :param fib: FIB dump to scan
        :param pg: interface whose sw_if_index is matched
        :returns: list of the matching network addresses, as strings
        """
        found = []
        for entry in fib:
            if entry.route.prefix.prefixlen != 128:
                continue
            # only the first path of the entry is considered
            if entry.route.paths[0].sw_if_index == pg.sw_if_index:
                found.append(str(entry.route.prefix.network_address))
        return found

    def wait_for_no_default_route(self, n_tries=50, s_time=1):
        """ Poll the FIB until no default route is left

        :param n_tries: number of FIB dumps to attempt
        :param s_time: seconds to sleep after each unsuccessful attempt
        :returns: True once no default route is found, False if the
                  attempts run out first
        """
        remaining = n_tries
        while remaining:
            fib = self.vapi.ip_route_dump(0, True)
            if not self.get_default_routes(fib):
                return True
            remaining -= 1
            self.sleep(s_time)

        return False

    def _send_ra(self, packet):
        """ Inject one RA on pg0 and give VPP 0.1s to process it """
        self.pg0.add_stream([packet])
        self.pg_start()
        self.sleep_on_vpp_time(0.1)

    def _new_addresses(self, fib, initial_addresses):
        """ Return pg0's addresses in 'fib' not in 'initial_addresses' """
        current = set(self.get_interface_addresses(fib, self.pg0))
        return current.difference(initial_addresses)

    def _verify_slaac_address(self, new_addresses):
        """ Expect exactly one new address, inside 1::/20 """
        self.assertEqual(len(new_addresses), 1)
        prefix = IPv6Network(text_type("%s/%d" % (list(new_addresses)[0], 20)),
                             strict=False)
        self.assertEqual(prefix, IPv6Network(text_type('1::/20')))

    def _verify_default_route(self, fib, router_address):
        """ Expect exactly one default route, via pg0 and the router """
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 1)
        route = default_routes[0]
        self.assertEqual(route['sw_if_index'], self.pg0.sw_if_index)
        self.assertEqual(route['next_hop'], router_address)

    def test_all(self):
        """ Test handling of SLAAC addresses and default routes """

        # baseline: no default route, remember the addresses already there
        fib = self.vapi.ip_route_dump(0, True)
        default_routes = self.get_default_routes(fib)
        initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
        self.assertEqual(default_routes, [])
        router_address = IPv6Address(text_type(self.pg0.remote_ip6_ll))

        self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)

        self.sleep(0.1)

        # RA with an autonomous 1::/64 prefix and a non-autonomous 7::/20
        slaac_option = ICMPv6NDOptPrefixInfo(
            prefix="1::",
            prefixlen=64,
            validlifetime=2,
            preferredlifetime=2,
            L=1,
            A=1,
        )
        other_option = ICMPv6NDOptPrefixInfo(
            prefix="7::",
            prefixlen=20,
            validlifetime=1500,
            preferredlifetime=1000,
            L=1,
            A=0,
        )
        self._send_ra(
            self.create_ra_packet(self.pg0) / slaac_option / other_option)

        fib = self.vapi.ip_route_dump(0, True)

        # the FIB has gained one address and a default route via the router
        self._verify_slaac_address(
            self._new_addresses(fib, initial_addresses))
        self._verify_default_route(fib, router_address)

        # an RA with router lifetime 0 withdraws the default route
        self._send_ra(self.create_ra_packet(self.pg0, routerlifetime=0))

        fib = self.vapi.ip_route_dump(0, True)
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 0)

        self.sleep_on_vpp_time(0.1)

        # a plain RA brings the default route back
        self._send_ra(self.create_ra_packet(self.pg0))

        fib = self.vapi.ip_route_dump(0, True)
        self._verify_default_route(fib, router_address)

        # update the router lifetime to 1s; the route is still there for now
        self._send_ra(self.create_ra_packet(self.pg0, 1))

        fib = self.vapi.ip_route_dump(0, True)
        self._verify_default_route(fib, router_address)

        self.sleep_on_vpp_time(1)

        # the default route is gone once the lifetime has run out
        self.assertTrue(self.wait_for_no_default_route())

        # check FIB still contains the SLAAC address
        # NOTE(review): 'fib' here is the dump taken before the 1s sleep
        # above, not a fresh one - confirm whether that is intended.
        self._verify_slaac_address(
            self._new_addresses(fib, initial_addresses))

        self.sleep_on_vpp_time(1)

        # by now the SLAAC address has been removed as well
        fib = self.vapi.ip_route_dump(0, True)
        new_addresses = self._new_addresses(fib, initial_addresses)
        self.assertEqual(len(new_addresses), 0)
1561
1562
1563 class IPv6NDProxyTest(TestIPv6ND):
1564     """ IPv6 ND ProxyTest Case """
1565
1566     @classmethod
1567     def setUpClass(cls):
1568         super(IPv6NDProxyTest, cls).setUpClass()
1569
1570     @classmethod
1571     def tearDownClass(cls):
1572         super(IPv6NDProxyTest, cls).tearDownClass()
1573
1574     def setUp(self):
1575         super(IPv6NDProxyTest, self).setUp()
1576
1577         # create 3 pg interfaces
1578         self.create_pg_interfaces(range(3))
1579
1580         # pg0 is the master interface, with the configured subnet
1581         self.pg0.admin_up()
1582         self.pg0.config_ip6()
1583         self.pg0.resolve_ndp()
1584
1585         self.pg1.ip6_enable()
1586         self.pg2.ip6_enable()
1587
1588     def tearDown(self):
1589         super(IPv6NDProxyTest, self).tearDown()
1590
1591     def test_nd_proxy(self):
1592         """ IPv6 Proxy ND """
1593
1594         #
1595         # Generate some hosts in the subnet that we are proxying
1596         #
1597         self.pg0.generate_remote_hosts(8)
1598
1599         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
1600         d = inet_ntop(AF_INET6, nsma)
1601
1602         #
1603         # Send an NS for one of those remote hosts on one of the proxy links
1604         # expect no response since it's from an address that is not
1605         # on the link that has the prefix configured
1606         #
1607         ns_pg1 = (Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac) /
1608                   IPv6(dst=d,
1609                        src=self.pg0._remote_hosts[2].ip6) /
1610                   ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
1611                   ICMPv6NDOptSrcLLAddr(
1612                       lladdr=self.pg0._remote_hosts[2].mac))
1613
1614         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")
1615
1616         #
1617         # Add proxy support for the host
1618         #
1619         self.vapi.ip6nd_proxy_add_del(
1620             is_add=1, ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1621             sw_if_index=self.pg1.sw_if_index)
1622
1623         #
1624         # try that NS again. this time we expect an NA back
1625         #
1626         self.send_and_expect_na(self.pg1, ns_pg1,
1627                                 "NS to proxy entry",
1628                                 dst_ip=self.pg0._remote_hosts[2].ip6,
1629                                 tgt_ip=self.pg0.local_ip6)
1630
1631         #
1632         # ... and that we have an entry in the ND cache
1633         #
1634         self.assertTrue(find_nbr(self,
1635                                  self.pg1.sw_if_index,
1636                                  self.pg0._remote_hosts[2].ip6))
1637
1638         #
1639         # ... and we can route traffic to it
1640         #
1641         t = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1642              IPv6(dst=self.pg0._remote_hosts[2].ip6,
1643                   src=self.pg0.remote_ip6) /
1644              inet6.UDP(sport=10000, dport=20000) /
1645              Raw(b'\xa5' * 100))
1646
1647         self.pg0.add_stream(t)
1648         self.pg_enable_capture(self.pg_interfaces)
1649         self.pg_start()
1650         rx = self.pg1.get_capture(1)
1651         rx = rx[0]
1652
1653         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1654         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1655
1656         self.assertEqual(rx[IPv6].src,
1657                          t[IPv6].src)
1658         self.assertEqual(rx[IPv6].dst,
1659                          t[IPv6].dst)
1660
1661         #
1662         # Test we proxy for the host on the main interface
1663         #
1664         ns_pg0 = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
1665                   IPv6(dst=d, src=self.pg0.remote_ip6) /
1666                   ICMPv6ND_NS(
1667                       tgt=self.pg0._remote_hosts[2].ip6) /
1668                   ICMPv6NDOptSrcLLAddr(
1669                       lladdr=self.pg0.remote_mac))
1670
1671         self.send_and_expect_na(self.pg0, ns_pg0,
1672                                 "NS to proxy entry on main",
1673                                 tgt_ip=self.pg0._remote_hosts[2].ip6,
1674                                 dst_ip=self.pg0.remote_ip6)
1675
1676         #
1677         # Setup and resolve proxy for another host on another interface
1678         #
1679         ns_pg2 = (Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac) /
1680                   IPv6(dst=d,
1681                        src=self.pg0._remote_hosts[3].ip6) /
1682                   ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
1683                   ICMPv6NDOptSrcLLAddr(
1684                       lladdr=self.pg0._remote_hosts[2].mac))
1685
1686         self.vapi.ip6nd_proxy_add_del(
1687             is_add=1, ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1688             sw_if_index=self.pg2.sw_if_index)
1689
1690         self.send_and_expect_na(self.pg2, ns_pg2,
1691                                 "NS to proxy entry other interface",
1692                                 dst_ip=self.pg0._remote_hosts[3].ip6,
1693                                 tgt_ip=self.pg0.local_ip6)
1694
1695         self.assertTrue(find_nbr(self,
1696                                  self.pg2.sw_if_index,
1697                                  self.pg0._remote_hosts[3].ip6))
1698
1699         #
1700         # hosts can communicate. pg2->pg1
1701         #
1702         t2 = (Ether(dst=self.pg2.local_mac,
1703                     src=self.pg0.remote_hosts[3].mac) /
1704               IPv6(dst=self.pg0._remote_hosts[2].ip6,
1705                    src=self.pg0._remote_hosts[3].ip6) /
1706               inet6.UDP(sport=10000, dport=20000) /
1707               Raw(b'\xa5' * 100))
1708
1709         self.pg2.add_stream(t2)
1710         self.pg_enable_capture(self.pg_interfaces)
1711         self.pg_start()
1712         rx = self.pg1.get_capture(1)
1713         rx = rx[0]
1714
1715         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1716         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1717
1718         self.assertEqual(rx[IPv6].src,
1719                          t2[IPv6].src)
1720         self.assertEqual(rx[IPv6].dst,
1721                          t2[IPv6].dst)
1722
1723         #
1724         # remove the proxy configs
1725         #
1726         self.vapi.ip6nd_proxy_add_del(
1727             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1728             sw_if_index=self.pg1.sw_if_index, is_add=0)
1729         self.vapi.ip6nd_proxy_add_del(
1730             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1731             sw_if_index=self.pg2.sw_if_index, is_add=0)
1732
1733         self.assertFalse(find_nbr(self,
1734                                   self.pg2.sw_if_index,
1735                                   self.pg0._remote_hosts[3].ip6))
1736         self.assertFalse(find_nbr(self,
1737                                   self.pg1.sw_if_index,
1738                                   self.pg0._remote_hosts[2].ip6))
1739
1740         #
1741         # no longer proxy-ing...
1742         #
1743         self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
1744         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
1745         self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")
1746
1747         #
1748         # no longer forwarding. traffic generates NS out of the glean/main
1749         # interface
1750         #
1751         self.pg2.add_stream(t2)
1752         self.pg_enable_capture(self.pg_interfaces)
1753         self.pg_start()
1754
1755         rx = self.pg0.get_capture(1)
1756
1757         self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1758
1759
class TestIP6Null(VppTestCase):
    """ IPv6 routes via NULL """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        """Create one pg interface and bring it up with IPv6 and NDP."""
        super().setUp()

        # a single pg interface: the test sends and captures on pg0 only
        self.create_pg_interfaces(range(1))

        for itf in self.pg_interfaces:
            itf.admin_up()
            itf.config_ip6()
            itf.resolve_ndp()

    def tearDown(self):
        """Run the base teardown, then unconfigure and down each interface."""
        super().tearDown()
        for itf in self.pg_interfaces:
            itf.unconfig_ip6()
            itf.admin_down()

    def _send_and_get_unreach(self, pkt):
        """Send pkt on pg0 and return the ICMPv6DestUnreach layer of the
        single packet captured back on pg0."""
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        capture = self.pg0.get_capture(1)
        return capture[0][ICMPv6DestUnreach]

    def test_ip_null(self):
        """ IP NULL route """

        eth = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        ip6 = IPv6(src=self.pg0.remote_ip6, dst="2001::1")
        p = (eth / ip6 /
             inet6.UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))

        #
        # A route via IP NULL that will reply with ICMP unreachables
        #
        unreach_path = VppRoutePath(
            "::", 0xffffffff,
            type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH)
        ip_unreach = VppIpRoute(self, "2001::", 64, [unreach_path])
        ip_unreach.add_vpp_config()

        icmp = self._send_and_get_unreach(p)

        # 0 = "No route to destination"
        self.assertEqual(icmp.code, 0)

        # ICMP is rate limited. pause a bit
        self.sleep(1)

        #
        # A more specific route via IP NULL that will reply with ICMP
        # prohibited
        #
        prohibit_path = VppRoutePath(
            "::", 0xffffffff,
            type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT)
        ip_prohibit = VppIpRoute(self, "2001::1", 128, [prohibit_path])
        ip_prohibit.add_vpp_config()

        icmp = self._send_and_get_unreach(p)

        # 1 = "Communication with destination administratively prohibited"
        self.assertEqual(icmp.code, 1)
1839
1840
class TestIP6Disabled(VppTestCase):
    """ IPv6 disabled """

    @classmethod
    def setUpClass(cls):
        super(TestIP6Disabled, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Disabled, cls).tearDownClass()

    def setUp(self):
        """Create two pg interfaces; only pg0 gets an IPv6 configuration."""
        super(TestIP6Disabled, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        # PG0 is IP enabled
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # PG 1 is not IP enabled
        self.pg1.admin_up()

    def tearDown(self):
        """Run the base teardown, then remove the IPv6 config and down
        each interface."""
        super(TestIP6Disabled, self).tearDown()
        for i in self.pg_interfaces:
            # This test only ever configures IPv6 addresses, so it is the
            # IPv6 (not IPv4) configuration that has to be removed.
            i.unconfig_ip6()
            i.admin_down()

    def test_ip_disabled(self):
        """ IP Disabled """

        MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
        MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
        #
        # A multicast route for group ffef::1:
        # one accepting interface, pg1, and one forwarding interface, pg0
        #
        route_ff_01 = VppIpMRoute(
            self,
            "::",
            "ffef::1", 128,
            MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
            [VppMRoutePath(self.pg1.sw_if_index,
                           MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT),
             VppMRoutePath(self.pg0.sw_if_index,
                           MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD)])
        route_ff_01.add_vpp_config()

        # unicast (pu) and multicast (pm) packets, both injected on pg1
        pu = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
              inet6.UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))
        pm = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst="ffef::1") /
              inet6.UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")

        #
        # IP enable PG1
        #
        self.pg1.config_ip6()

        #
        # Now we get packets through: each packet sent on pg1 must show up
        # as exactly one packet on pg0
        #
        for pkt in (pu, pm):
            self.pg1.add_stream(pkt)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg0.get_capture(1)

        #
        # Disable PG1
        #
        self.pg1.unconfig_ip6()

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1937
1938
class TestIP6LoadBalance(VppTestCase):
    """ IPv6 Load-Balancing """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        """Create five pg interfaces with IPv6 and MPLS enabled, plus MPLS
        table 0."""
        super().setUp()

        self.create_pg_interfaces(range(5))

        mpls_tbl = VppMplsTable(self, 0)
        mpls_tbl.add_vpp_config()

        for itf in self.pg_interfaces:
            itf.admin_up()
            itf.config_ip6()
            itf.resolve_ndp()
            itf.enable_mpls()

    def tearDown(self):
        """Undo the per-interface setup, then run the base teardown."""
        for itf in self.pg_interfaces:
            itf.unconfig_ip6()
            itf.admin_down()
            itf.disable_mpls()
        super().tearDown()

    def test_ip6_load_balance(self):
        """ IPv6 Load-Balancing """

        def eth():
            # a fresh L2 header for a packet injected on pg0
            return Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)

        #
        # An array of packets that differ only in the destination port
        #  - IP only
        #  - MPLS EOS
        #  - MPLS non-EOS
        #  - MPLS non-EOS with an entropy label
        #
        port_ip_pkts = []
        port_mpls_pkts = []
        port_mpls_neos_pkts = []
        port_ent_pkts = []

        #
        # An array of packets that differ only in the source address
        #
        src_ip_pkts = []
        src_mpls_pkts = []

        for ii in range(NUM_PKTS):
            port_ip_hdr = (
                IPv6(dst="3000::1", src="3000:1::1") /
                inet6.UDP(sport=1234, dport=1234 + ii) /
                Raw(b'\xa5' * 100))
            port_ip_pkts.append(eth() / port_ip_hdr)
            port_mpls_pkts.append(eth() /
                                  MPLS(label=66, ttl=2) /
                                  port_ip_hdr)
            port_mpls_neos_pkts.append(eth() /
                                       MPLS(label=67, ttl=2) /
                                       MPLS(label=77, ttl=2) /
                                       port_ip_hdr)
            port_ent_pkts.append(eth() /
                                 MPLS(label=67, ttl=2) /
                                 MPLS(label=14, ttl=2) /
                                 MPLS(label=999, ttl=2) /
                                 port_ip_hdr)

            src_ip_hdr = (
                IPv6(dst="3000::1", src="3000:1::%d" % ii) /
                inet6.UDP(sport=1234, dport=1234) /
                Raw(b'\xa5' * 100))
            src_ip_pkts.append(eth() / src_ip_hdr)
            src_mpls_pkts.append(eth() /
                                 MPLS(label=66, ttl=2) /
                                 src_ip_hdr)

        # the two-way and four-way sets of egress interfaces checked below
        two_paths = [self.pg1, self.pg2]
        four_paths = [self.pg1, self.pg2, self.pg3, self.pg4]

        #
        # A route for the IP packets
        #
        route_3000_1 = VppIpRoute(
            self, "3000::1", 128,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
             VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index)])
        route_3000_1.add_vpp_config()

        #
        # a local-label for the EOS packets
        #
        binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
        binding.add_vpp_config()

        #
        # An MPLS route for the non-EOS packets
        #
        route_67 = VppMplsRoute(
            self, 67, 0,
            [VppRoutePath(self.pg1.remote_ip6,
                          self.pg1.sw_if_index,
                          labels=[67]),
             VppRoutePath(self.pg2.remote_ip6,
                          self.pg2.sw_if_index,
                          labels=[67])])
        route_67.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across the 2 paths
        #  - since the default hash config is to use IP src,dst and port
        #    src,dst
        # We are not going to ensure equal amounts of packets across each link,
        # since the hash algorithm is statistical and therefore this can never
        # be guaranteed. But with NUM_PKTS different packets we do expect some
        # balancing. So instead just ensure there is traffic on each link.
        #
        rx = self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
                                                 two_paths)
        n_ip_pg0 = len(rx[0])
        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, two_paths)
        self.send_and_expect_load_balancing(self.pg0, port_mpls_pkts,
                                            two_paths)
        self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
                                            two_paths)
        rx = self.send_and_expect_load_balancing(self.pg0, port_mpls_neos_pkts,
                                                 two_paths)
        n_mpls_pg0 = len(rx[0])

        #
        # change the router ID and expect the distribution changes
        #
        self.vapi.set_ip_flow_hash_router_id(router_id=0x11111111)

        rx = self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
                                                 two_paths)
        self.assertNotEqual(n_ip_pg0, len(rx[0]))

        # NOTE(review): n_mpls_pg0 was measured with port_mpls_neos_pkts but
        # is compared here against the src_mpls_pkts distribution - confirm
        # which stream was intended.
        rx = self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
                                                 two_paths)
        self.assertNotEqual(n_mpls_pg0, len(rx[0]))

        #
        # The packets with Entropy label in should not load-balance,
        # since the Entropy value is fixed.
        #
        self.send_and_expect_only(self.pg0, port_ent_pkts, self.pg1)

        #
        # change the flow hash config so it's only IP src,dst
        #  - now only the stream with differing source address will
        #    load-balance
        #
        self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, proto=1,
                                   sport=0, dport=0, is_ipv6=1)

        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, two_paths)
        self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
                                            two_paths)
        self.send_and_expect_only(self.pg0, port_ip_pkts, self.pg2)

        #
        # change the flow hash config back to defaults
        #
        self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=1, dport=1,
                                   proto=1, is_ipv6=1)

        #
        # Recursive prefixes
        #  - testing that 2 stages of load-balancing occurs and there is no
        #    polarisation (i.e. only 2 of 4 paths are used)
        #
        port_pkts = [
            (eth() /
             IPv6(dst="4000::1", src="4000:1::1") /
             inet6.UDP(sport=1234, dport=1234 + ii) /
             Raw(b'\xa5' * 100))
            for ii in range(257)]
        src_pkts = [
            (eth() /
             IPv6(dst="4000::1", src="4000:1::%d" % ii) /
             inet6.UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))
            for ii in range(257)]

        route_3000_2 = VppIpRoute(
            self, "3000::2", 128,
            [VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index),
             VppRoutePath(self.pg4.remote_ip6, self.pg4.sw_if_index)])
        route_3000_2.add_vpp_config()

        route_4000_1 = VppIpRoute(
            self, "4000::1", 128,
            [VppRoutePath("3000::1", 0xffffffff),
             VppRoutePath("3000::2", 0xffffffff)])
        route_4000_1.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across all 4 paths
        #
        self.vapi.cli("clear trace")
        self.send_and_expect_load_balancing(self.pg0, port_pkts, four_paths)
        self.send_and_expect_load_balancing(self.pg0, src_pkts, four_paths)

        #
        # Recursive prefixes
        #  - testing that 2 stages of load-balancing no choices
        #
        port_pkts = [
            (eth() /
             IPv6(dst="6000::1", src="6000:1::1") /
             inet6.UDP(sport=1234, dport=1234 + ii) /
             Raw(b'\xa5' * 100))
            for ii in range(257)]

        route_5000_2 = VppIpRoute(
            self, "5000::2", 128,
            [VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index)])
        route_5000_2.add_vpp_config()

        route_6000_1 = VppIpRoute(
            self, "6000::1", 128,
            [VppRoutePath("5000::2", 0xffffffff)])
        route_6000_1.add_vpp_config()

        #
        # inject the packet on pg0 - there is a single path at each stage,
        # so expect all of the traffic on pg3 only
        #
        self.vapi.cli("clear trace")
        self.send_and_expect_only(self.pg0, port_pkts, self.pg3)
2194
2195
class IP6PuntSetup(object):
    """ Setup for IPv6 Punt Police/Redirect """

    def punt_setup(self):
        """Create four pg interfaces, bring them up with IPv6 and NDP, and
        build the TCP packet (pg0 remote to pg0 local address) stored in
        self.pkt."""
        self.create_pg_interfaces(range(4))

        for itf in self.pg_interfaces:
            itf.admin_up()
            itf.config_ip6()
            itf.resolve_ndp()

        l2 = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        l3 = IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
        l4 = inet6.TCP(sport=1234, dport=1234)
        self.pkt = l2 / l3 / l4 / Raw(b'\xa5' * 100)

    def punt_teardown(self):
        """Remove the IPv6 config from every pg interface and set it down."""
        for itf in self.pg_interfaces:
            itf.unconfig_ip6()
            itf.admin_down()
2218
2219
class TestIP6Punt(IP6PuntSetup, VppTestCase):
    """ IPv6 Punt Police/Redirect """

    def setUp(self):
        """Run the base setup, then the shared punt setup."""
        super().setUp()
        super().punt_setup()

    def tearDown(self):
        """Run the shared punt teardown, then the base teardown."""
        super().punt_teardown()
        super().tearDown()

    def test_ip_punt(self):
        """ IP6 punt police and redirect """

        pkts = self.pkt * 1025

        #
        # Configure a punt redirect via pg1.
        #
        nh_addr = self.pg1.remote_ip6
        ip_punt_redirect = VppIpPuntRedirect(
            self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr)
        ip_punt_redirect.add_vpp_config()

        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # add a policer
        #
        policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
        policer.add_vpp_config()
        ip_punt_policer = VppIpPuntPolicer(
            self, policer.policer_index, is_ip6=True)
        ip_punt_policer.add_vpp_config()

        self.vapi.cli("clear trace")
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        #
        # the number of packet received should be greater than 0,
        # but not equal to the number sent, since some were policed
        #
        rx = self.pg1._get_capture(1)
        stats = policer.get_stats()

        # Single rate policer - expect conform, violate but no exceed
        self.assertGreater(stats['conform_packets'], 0)
        self.assertEqual(stats['exceed_packets'], 0)
        self.assertGreater(stats['violate_packets'], 0)

        n_rx = len(rx)
        self.assertGreater(n_rx, 0)
        self.assertLess(n_rx, len(pkts))

        #
        # remove the policer. back to full rx
        #
        ip_punt_policer.remove_vpp_config()
        policer.remove_vpp_config()
        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # remove the redirect. expect full drop.
        #
        ip_punt_redirect.remove_vpp_config()
        self.send_and_assert_no_replies(self.pg0, pkts,
                                        "IP no punt config")

        #
        # Add a redirect that is not input port selective
        #
        ip_punt_redirect = VppIpPuntRedirect(
            self, 0xffffffff, self.pg1.sw_if_index, nh_addr)
        ip_punt_redirect.add_vpp_config()
        self.send_and_expect(self.pg0, pkts, self.pg1)
        ip_punt_redirect.remove_vpp_config()

    def test_ip_punt_dump(self):
        """ IP6 punt redirect dump"""

        #
        # Configure punt redirects from pg0, pg1 and pg2, all out of pg3
        #
        nh_address = self.pg3.remote_ip6
        tx_index = self.pg3.sw_if_index
        ipr_03 = VppIpPuntRedirect(self, self.pg0.sw_if_index,
                                   tx_index, nh_address)
        ipr_13 = VppIpPuntRedirect(self, self.pg1.sw_if_index,
                                   tx_index, nh_address)
        ipr_23 = VppIpPuntRedirect(self, self.pg2.sw_if_index,
                                   tx_index, '0::0')
        redirects = (ipr_03, ipr_13, ipr_23)
        for redirect in redirects:
            redirect.add_vpp_config()

        #
        # Query each of the redirects back
        #
        for redirect in redirects:
            self.assertTrue(redirect.query_vpp_config())

        #
        # Dump punt redirects for all interfaces
        #
        punts = self.vapi.ip_punt_redirect_dump(0xffffffff, is_ipv6=1)
        self.assertEqual(len(punts), 3)
        for p in punts:
            self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
        # NOTE(review): if punt.nh is decoded as an address object rather
        # than a string, this inequality holds for any value - confirm the
        # intended check.
        self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
        self.assertEqual(str(punts[2].punt.nh), '::')
2331
2332
class TestIP6PuntHandoff(IP6PuntSetup, VppTestCase):
    """ IPv6 Punt Police/Redirect """
    vpp_worker_count = 2

    def setUp(self):
        """Run the base setup, then the shared punt setup."""
        super().setUp()
        super().punt_setup()

    def tearDown(self):
        """Run the shared punt teardown, then the base teardown."""
        super().punt_teardown()
        super().tearDown()

    def _send_via_workers(self, pkts, trace_workers):
        """Send pkts pg0 -> pg1 once through worker 0 and once through
        worker 1, logging the trace after each worker in trace_workers."""
        for worker in [0, 1]:
            self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
            if worker in trace_workers:
                self.logger.debug(self.vapi.cli("show trace max 100"))

    def _assert_policed(self, stats):
        """Single rate policer - expect conform, violate but no exceed."""
        self.assertGreater(stats['conform_packets'], 0)
        self.assertEqual(stats['exceed_packets'], 0)
        self.assertGreater(stats['violate_packets'], 0)

    def test_ip_punt(self):
        """ IP6 punt policer thread handoff """
        pkts = self.pkt * NUM_PKTS

        #
        # Configure a punt redirect via pg1.
        #
        nh_addr = self.pg1.remote_ip6
        ip_punt_redirect = VppIpPuntRedirect(
            self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr)
        ip_punt_redirect.add_vpp_config()

        action_tx = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
            0)
        #
        # This policer drops no packets, we are just
        # testing that they get to the right thread.
        #
        policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, 1,
                             0, 0, False, action_tx, action_tx, action_tx)
        policer.add_vpp_config()
        ip_punt_policer = VppIpPuntPolicer(
            self, policer.policer_index, is_ip6=True)
        ip_punt_policer.add_vpp_config()

        # in this first round the trace is only logged after worker 0
        self._send_via_workers(pkts, trace_workers=(0,))

        # Combined stats, all threads
        stats = policer.get_stats()
        self._assert_policed(stats)

        # Worker 0, should have done all the policing
        stats0 = policer.get_stats(worker=0)
        self.assertEqual(stats, stats0)

        # Worker 1, should have handed everything off
        stats1 = policer.get_stats(worker=1)
        self.assertEqual(stats1['conform_packets'], 0)
        self.assertEqual(stats1['exceed_packets'], 0)
        self.assertEqual(stats1['violate_packets'], 0)

        # Bind the policer to worker 1 and repeat
        policer.bind_vpp_config(1, True)
        self._send_via_workers(pkts, trace_workers=(0, 1))

        # The 2 workers should now have policed the same amount
        stats = policer.get_stats()
        stats0 = policer.get_stats(worker=0)
        stats1 = policer.get_stats(worker=1)

        self._assert_policed(stats0)
        self._assert_policed(stats1)

        for counter in ('conform_packets', 'violate_packets'):
            self.assertEqual(stats0[counter] + stats1[counter],
                             stats[counter])

        # Unbind the policer and repeat
        policer.bind_vpp_config(1, False)
        self._send_via_workers(pkts, trace_workers=(0, 1))

        # The policer should auto-bind to worker 0 when packets arrive
        policer.get_stats()
        stats0new = policer.get_stats(worker=0)
        stats1new = policer.get_stats(worker=1)

        self.assertGreater(stats0new['conform_packets'],
                           stats0['conform_packets'])
        self.assertEqual(stats0new['exceed_packets'], 0)
        self.assertGreater(stats0new['violate_packets'],
                           stats0['violate_packets'])

        self.assertEqual(stats1, stats1new)

        #
        # Clean up
        #
        ip_punt_policer.remove_vpp_config()
        policer.remove_vpp_config()
        ip_punt_redirect.remove_vpp_config()
2444
2445
class TestIP6Deag(VppTestCase):
    """ IPv6 Deaggregate Routes """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        # three pg interfaces, each brought up and given IPv6 config
        # before resolve_ndp() is run on it
        self.create_pg_interfaces(range(3))
        for pg in self.pg_interfaces:
            pg.admin_up()
            pg.config_ip6()
            pg.resolve_ndp()

    def tearDown(self):
        super().tearDown()

        # reverse of setUp: addresses off first, then interface down
        for pg in self.pg_interfaces:
            pg.unconfig_ip6()
            pg.admin_down()

    def test_ip_deag(self):
        """ IP Deag Routes """

        def tcp_burst(src, dst):
            # one TCP packet using pg0's MAC addressing, multiplied by 257
            frame = (Ether(src=self.pg0.remote_mac,
                           dst=self.pg0.local_mac) /
                     IPv6(src=src, dst=dst) /
                     inet6.TCP(sport=1234, dport=1234) /
                     Raw(b'\xa5' * 100))
            return frame * 257

        #
        # Two extra tables, used for:
        #  1 - another destination address lookup
        #  2 - a source address lookup
        #
        lookup_tables = [VppIpTable(self, 1, is_ip6=1),
                         VppIpTable(self, 2, is_ip6=1)]
        for table in lookup_tables:
            table.add_vpp_config()

        #
        # Routes in the default table that point to a deag/second lookup
        # in each of those tables
        #
        deag_routes = [
            VppIpRoute(self, "1::1", 128,
                       [VppRoutePath("::",
                                     0xffffffff,
                                     nh_table_id=1)]),
            VppIpRoute(self, "1::2", 128,
                       [VppRoutePath(
                           "::",
                           0xffffffff,
                           nh_table_id=2,
                           type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP)]),
        ]
        for route in deag_routes:
            route.add_vpp_config()

        #
        # Nothing in tables 1 and 2 matches yet, so both streams are
        # expected to produce no replies (they hit the respective default
        # routes of the second table)
        #
        stream_dst = tcp_burst("5::5", "1::1")
        stream_src = tcp_burst("2::2", "1::2")

        self.send_and_assert_no_replies(self.pg0, stream_dst,
                                        "IP in dst table")
        self.send_and_assert_no_replies(self.pg0, stream_src,
                                        "IP in src table")

        #
        # a matching route in the dst table forwards via pg1
        #
        via_pg1 = VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)
        VppIpRoute(self, "1::1", 128, [via_pg1],
                   table_id=1).add_vpp_config()

        self.send_and_expect(self.pg0, stream_dst, self.pg1)

        #
        # a route for the packet's source in the src table forwards via pg2
        #
        via_pg2 = VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index)
        VppIpRoute(self, "2::2", 128, [via_pg2],
                   table_id=2).add_vpp_config()

        self.send_and_expect(self.pg0, stream_src, self.pg2)

        #
        # loop in the lookup DP: the path names no table id, and packets
        # to this destination are expected to be dropped
        #
        VppIpRoute(self, "3::3", 128,
                   [VppRoutePath("::", 0xffffffff)]).add_vpp_config()

        self.send_and_assert_no_replies(self.pg0,
                                        tcp_burst("3::4", "3::3"),
                                        "IP lookup loop")
2563
2564
class TestIP6Input(VppTestCase):
    """ IPv6 Input Exception Test Cases """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        # two pg interfaces, each brought up and given IPv6 config
        # before resolve_ndp() is run on it
        self.create_pg_interfaces(range(2))
        for pg in self.pg_interfaces:
            pg.admin_up()
            pg.config_ip6()
            pg.resolve_ndp()

    def tearDown(self):
        super().tearDown()

        for pg in self.pg_interfaces:
            pg.unconfig_ip6()
            pg.admin_down()

    def test_ip_input_icmp_reply(self):
        """ IP6 Input Exception - Return ICMP (3,0) """
        #
        # packets arriving on pg0 with hop limit 1 and a destination
        # behind pg1 - ICMP replies are expected back on pg0
        #
        ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        ip6 = IPv6(src=self.pg0.remote_ip6,
                   dst=self.pg1.remote_ip6,
                   hlim=1)
        expiring = (ether / ip6 /
                    inet6.UDP(sport=1234, dport=1234) /
                    Raw(b'\xa5' * 100))

        replies = self.send_and_expect_some(self.pg0,
                                            expiring * NUM_PKTS,
                                            self.pg0)

        for reply in replies:
            time_exceeded = reply[ICMPv6TimeExceeded]
            # type 3, code 0: "hop limit exceeded in transit"
            self.assertEqual((time_exceeded.type, time_exceeded.code),
                             (3, 0))

    # values shared by the parameter table below; they are evaluated in
    # the class body, so they must be defined ahead of the decorator
    icmpv6_data = '\x0a' * 18
    all_0s = "::"
    all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"

    @parameterized.expand([
        # Name, src, dst, l4proto, msg, timeout
        # (a src/dst of None selects the interface address in the test)
        ("src='iface',   dst='iface'",
         None, None,
         inet6.UDP(sport=1234, dport=1234),
         "funky version", None),
        ("src='All 0's', dst='iface'",
         all_0s, None,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='iface',   dst='All 0's'",
         None, all_0s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='All 1's', dst='iface'",
         all_1s, None,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='iface',   dst='All 1's'",
         None, all_1s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='All 1's', dst='All 1's'",
         all_1s, all_1s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
    ])
    def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):

        # no static docstring: the description is derived from the
        # parameter set's name
        self._testMethodDoc = 'IPv6 Input Exception - %s' % name

        # every variant carries version=3 in the IPv6 header; src and
        # dst fall back to the pg0/pg1 remote addresses when not given
        ip6 = IPv6(src=src or self.pg0.remote_ip6,
                   dst=dst or self.pg1.remote_ip6,
                   version=3)
        bad_version = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       ip6 /
                       l4 /
                       Raw(b'\xa5' * 100))

        self.send_and_assert_no_replies(self.pg0,
                                        bad_version * NUM_PKTS,
                                        remark=msg or "",
                                        timeout=timeout)

    def test_hop_by_hop(self):
        """ Hop-by-hop header test """

        # UDP behind a hop-by-hop extension header, addressed to pg0's
        # own local address
        with_hbh = (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IPv6(src=self.pg0.remote_ip6,
                         dst=self.pg0.local_ip6) /
                    IPv6ExtHdrHopByHop() /
                    inet6.UDP(sport=1234, dport=1234) /
                    Raw(b'\xa5' * 100))

        # the packet is only injected; nothing is asserted about any
        # capture afterwards
        self.pg0.add_stream(with_hbh)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
2663
2664
class TestIP6Replace(VppTestCase):
    """ IPv6 Table Replace """

    @classmethod
    def setUpClass(cls):
        super(TestIP6Replace, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Replace, cls).tearDownClass()

    def setUp(self):
        super(TestIP6Replace, self).setUp()

        self.create_pg_interfaces(range(4))

        # one IPv6 table per interface, with table ids counting from 1;
        # add_vpp_config() hands back the table object that is stored
        self.tables = []
        for table_id, intf in enumerate(self.pg_interfaces, 1):
            intf.admin_up()
            intf.config_ip6()
            intf.generate_remote_hosts(2)
            self.tables.append(VppIpTable(self, table_id,
                                          True).add_vpp_config())

    def tearDown(self):
        super(TestIP6Replace, self).tearDown()
        for intf in self.pg_interfaces:
            intf.admin_down()
            intf.unconfig_ip6()

    def _assert_routes(self, table, present=(), absent=()):
        """ Check which route pairs a table currently holds

        The unicast and multicast dumps of table are each taken once.
        Every {'uni': ..., 'multi': ...} entry in present must be found
        in them and every entry in absent must not be.
        """
        dump = table.dump()
        mdump = table.mdump()
        for entry in present:
            self.assertTrue(find_route_in_dump(dump, entry['uni'], table))
            self.assertTrue(find_mroute_in_dump(mdump, entry['multi'],
                                                table))
        for entry in absent:
            self.assertFalse(find_route_in_dump(dump, entry['uni'], table))
            self.assertFalse(find_mroute_in_dump(mdump, entry['multi'],
                                                 table))

    def test_replace(self):
        """ IP Table Replace """

        MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
        MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
        N_ROUTES = 20
        links = [self.pg0, self.pg1, self.pg2, self.pg3]
        # routes[n] collects the entries loaded into self.tables[n]
        routes = [[] for _ in links]

        # the sizes of 'empty' tables
        for t in self.tables:
            self.assertEqual(len(t.dump()), 2)
            self.assertEqual(len(t.mdump()), 5)

        # load up the tables with some routes. jj starts at 1, so the
        # address suffix is never 0 and N_ROUTES - 1 pairs are created
        # per table
        for link, t, loaded in zip(links, self.tables, routes):
            for jj in range(1, N_ROUTES):
                uni = VppIpRoute(
                    self, "2001::%d" % jj, 128,
                    [VppRoutePath(link.remote_hosts[0].ip6,
                                  link.sw_if_index),
                     VppRoutePath(link.remote_hosts[1].ip6,
                                  link.sw_if_index)],
                    table_id=t.table_id).add_vpp_config()

                # accept on pg0, forward on the three other interfaces
                mpaths = [
                    VppMRoutePath(self.pg0.sw_if_index,
                                  MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT,
                                  proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)]
                for out_intf in (self.pg1, self.pg2, self.pg3):
                    mpaths.append(VppMRoutePath(
                        out_intf.sw_if_index,
                        MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
                        proto=FibPathProto.FIB_PATH_NH_PROTO_IP6))
                multi = VppIpMRoute(
                    self, "::",
                    "ff:2001::%d" % jj, 128,
                    MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
                    mpaths,
                    table_id=t.table_id).add_vpp_config()

                loaded.append({'uni': uni,
                               'multi': multi})

        #
        # replace the tables a few times
        #
        for _ in range(3):
            # replace each table
            for t in self.tables:
                t.replace_begin()

            # all the routes are still there
            for t, loaded in zip(self.tables, routes):
                self._assert_routes(t, present=loaded)

            # redownload every other route, starting with the first one
            # loaded (the even list positions). Slicing the list that
            # was actually built keeps this independent of N_ROUTES
            for loaded in routes:
                for entry in loaded[::2]:
                    entry['uni'].add_vpp_config()
                    entry['multi'].add_vpp_config()

            # signal each table converged
            for t in self.tables:
                t.replace_end()

            # we should find the redownloaded (even position) routes,
            # but not the ones in between
            for t, loaded in zip(self.tables, routes):
                self._assert_routes(t,
                                    present=loaded[::2],
                                    absent=loaded[1::2])

            # reload all the routes
            for loaded in routes:
                for entry in loaded:
                    entry['uni'].add_vpp_config()
                    entry['multi'].add_vpp_config()

            # all the routes are still there
            for t, loaded in zip(self.tables, routes):
                self._assert_routes(t, present=loaded)

        #
        # finally flush the tables for good measure; they must shrink
        # back to the 'empty' sizes checked at the start
        #
        for t in self.tables:
            t.flush()
            self.assertEqual(len(t.dump()), 2)
            self.assertEqual(len(t.mdump()), 5)
2804
2805
class TestIP6AddrReplace(VppTestCase):
    """ IPv6 Interface Address Replace """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        # the interfaces are only brought up here; all addresses are
        # added by the test itself
        self.create_pg_interfaces(range(4))
        for pg in self.pg_interfaces:
            pg.admin_up()

    def tearDown(self):
        super().tearDown()
        for pg in self.pg_interfaces:
            pg.admin_down()

    def get_n_pfxs(self, intf):
        """ Number of entries in the address dump taken for intf """
        dumped = self.vapi.ip_address_dump(intf.sw_if_index, True)
        return len(dumped)

    def test_replace(self):
        """ IP interface address replace """

        def expect_n_pfxs(intfs, count):
            # each of the given interfaces must dump exactly count entries
            for intf in intfs:
                self.assertEqual(self.get_n_pfxs(intf), count)

        def expect_configured(addrs):
            for addr in addrs:
                self.assertTrue(addr.query_vpp_config())

        def add_all(groups):
            # (re-)add every address of every per-interface group
            for group in groups:
                for addr in group:
                    addr.add_vpp_config()

        def add_addr(intf, template, index):
            # a /64 built from template and index, added on intf
            return VppIpInterfaceAddress(self, intf, template % index,
                                         64).add_vpp_config()

        # per interface, in this order:
        #   2001:16:x::1/64
        #   2001:16:x::2/64 - a different address in the same subnet
        #   2001:15:x::2/64 - a different address and subnet
        templates = ("2001:16:%d::1", "2001:16:%d::2", "2001:15:%d::2")

        # add prefixes to each of the interfaces
        intf_pfxs = []
        for intf in self.pg_interfaces:
            added = []
            for template in templates:
                added.append(add_addr(intf, template, intf.sw_if_index))
            intf_pfxs.append(added)

        # a dump should have all three addresses in it
        expect_n_pfxs(self.pg_interfaces, 3)

        #
        # remove all the addresses through a replace
        #
        self.vapi.sw_interface_address_replace_begin()
        self.vapi.sw_interface_address_replace_end()
        expect_n_pfxs(self.pg_interfaces, 0)

        #
        # add all the interface addresses back
        #
        add_all(intf_pfxs)
        expect_n_pfxs(self.pg_interfaces, 3)

        #
        # replace again, but this time update/re-add the addresses on the
        # first two interfaces
        #
        self.vapi.sw_interface_address_replace_begin()
        add_all(intf_pfxs[:2])
        self.vapi.sw_interface_address_replace_end()

        # on the first two the addresses still exist,
        # on the other two they do not
        expect_n_pfxs(self.pg_interfaces[:2], 3)
        for group in intf_pfxs[:2]:
            expect_configured(group)
        expect_n_pfxs(self.pg_interfaces[2:], 0)

        #
        # add all the interface addresses back on the last two
        #
        add_all(intf_pfxs[2:])
        expect_n_pfxs(self.pg_interfaces, 3)

        #
        # replace again, this time add different prefixes on all the
        # interfaces: 2001:18:x::1/64
        #
        self.vapi.sw_interface_address_replace_begin()

        pfxs = []
        for intf in self.pg_interfaces:
            pfxs.append(add_addr(intf, "2001:18:%d::1", intf.sw_if_index))

        self.vapi.sw_interface_address_replace_end()

        # only the 2001:18 address should exist on each interface
        expect_n_pfxs(self.pg_interfaces, 1)
        expect_configured(pfxs)

        #
        # remove everything
        #
        self.vapi.sw_interface_address_replace_begin()
        self.vapi.sw_interface_address_replace_end()
        expect_n_pfxs(self.pg_interfaces, 0)

        #
        # add prefixes to each interface. post-begin add the prefix from
        # interface X onto interface Y. this would normally be an error
        # since it would generate a 'duplicate address' warning. but in
        # this case, since what is newly downloaded is sane, it's ok
        #
        for intf in self.pg_interfaces:
            # 2001:18:x::1/64
            add_addr(intf, "2001:18:%d::1", intf.sw_if_index)

        self.vapi.sw_interface_address_replace_begin()

        pfxs = []
        for intf in self.pg_interfaces:
            # 2001:18:(x+1)::1/64 - the address built from the next index
            pfxs.append(add_addr(intf, "2001:18:%d::1",
                                 intf.sw_if_index + 1))

        self.vapi.sw_interface_address_replace_end()

        self.logger.info(self.vapi.cli("sh int addr"))

        expect_n_pfxs(self.pg_interfaces, 1)
        expect_configured(pfxs)
2965
2966
class TestIP6LinkLocal(VppTestCase):
    """ IPv6 Link Local """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        # interfaces are only brought up; each test configures the
        # addresses it needs
        self.create_pg_interfaces(range(2))
        for pg in self.pg_interfaces:
            pg.admin_up()

    def tearDown(self):
        super().tearDown()
        for pg in self.pg_interfaces:
            pg.admin_down()

    def _gre_echo_request(self, src, dst):
        """ ICMPv6 echo request from src to dst, GRE encapsulated in IPv4

        The outer IPv4 header runs from pg0's remote to pg0's local
        address, so pg0 needs its IPv4 config before this is called.
        """
        outer = (Ether(src=self.pg0.remote_mac,
                       dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4,
                    dst=self.pg0.local_ip4) /
                 GRE())
        return outer / IPv6(src=src, dst=dst) / ICMPv6EchoRequest()

    def test_ip6_ll(self):
        """ IPv6 Link Local """

        #
        # two APIs to add a link local address.
        #   1 - just like any other prefix
        #   2 - with the special set LL API
        #
        first_ll = "fe80:1::1"
        peer_ll = "fe80:2::2"
        second_ll = "fe80:3::3"

        def echo_request(dst):
            # echo request from the peer's link-local, using pg0's MACs
            return (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IPv6(src=peer_ll, dst=dst) /
                    ICMPv6EchoRequest())

        #
        # First with the API to set a 'normal' prefix
        #
        VppNeighbor(self,
                    self.pg0.sw_if_index,
                    self.pg0.remote_mac,
                    peer_ll).add_vpp_config()

        VppIpInterfaceAddress(self, self.pg0, first_ll,
                              128).add_vpp_config()

        #
        # should be able to ping the ll
        #
        ping_first = echo_request(first_ll)
        self.send_and_expect(self.pg0, [ping_first], self.pg0)

        #
        # change the link-local on pg0
        #
        second_addr = VppIpInterfaceAddress(self, self.pg0,
                                            second_ll,
                                            128).add_vpp_config()

        ping_second = echo_request(second_ll)
        self.send_and_expect(self.pg0, [ping_second], self.pg0)

        #
        # set a normal v6 prefix on the link; the link-local must keep
        # answering
        #
        self.pg0.config_ip6()

        self.send_and_expect(self.pg0, [ping_second], self.pg0)

        # the link-local cannot be removed
        with self.vapi.assert_negative_api_retval():
            second_addr.remove_vpp_config()

        #
        # Use the specific link-local API on pg1; the same two requests
        # are replayed there
        #
        VppIp6LinkLocalAddress(self, self.pg1, first_ll).add_vpp_config()
        self.send_and_expect(self.pg1, [ping_first], self.pg1)

        VppIp6LinkLocalAddress(self, self.pg1, second_ll).add_vpp_config()
        self.send_and_expect(self.pg1, [ping_second], self.pg1)

    def test_ip6_ll_p2p(self):
        """ IPv6 Link Local P2P (GRE)"""

        # the GRE tunnel runs over pg0's IPv4 addresses
        self.pg0.config_ip4()
        self.pg0.resolve_arp()

        tunnel = VppGreInterface(self,
                                 self.pg0.local_ip4,
                                 self.pg0.remote_ip4).add_vpp_config()
        tunnel.admin_up()

        local_ll = "fe80:1::1"
        peer_ll = "fe80:2::2"

        VppIpInterfaceAddress(self, tunnel, local_ll, 128).add_vpp_config()

        self.logger.info(self.vapi.cli("sh ip6-ll gre0 fe80:2::2"))

        # ping the tunnel's link-local through the tunnel
        ping = self._gre_echo_request(peer_ll, local_ll)
        self.send_and_expect(self.pg0, [ping], self.pg0)

        # teardown
        self.pg0.unconfig_ip4()
        tunnel.remove_vpp_config()

    def test_ip6_ll_p2mp(self):
        """ IPv6 Link Local P2MP (GRE)"""

        self.pg0.config_ip4()
        self.pg0.resolve_arp()

        # multipoint mode: the tunnel destination is given as 0.0.0.0
        mp_mode = VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP
        tunnel = VppGreInterface(self,
                                 self.pg0.local_ip4,
                                 "0.0.0.0",
                                 mode=mp_mode).add_vpp_config()
        tunnel.admin_up()

        local_ll = "fe80:1::1"
        peer_ll = "fe80:2::2"

        VppIpInterfaceAddress(self, tunnel, local_ll, 128).add_vpp_config()

        ping = self._gre_echo_request(peer_ll, local_ll)

        # no route back at this point
        self.send_and_assert_no_replies(self.pg0, [ping])

        # add teib entry for the peer
        peer_entry = VppTeib(self, tunnel, peer_ll, self.pg0.remote_ip4)
        peer_entry.add_vpp_config()

        self.logger.info(self.vapi.cli("sh ip6-ll gre0 %s" % peer_ll))
        self.send_and_expect(self.pg0, [ping], self.pg0)

        # teardown
        self.pg0.unconfig_ip4()
3127
3128
3129 class TestIPv6PathMTU(VppTestCase):
3130     """ IPv6 Path MTU """
3131
3132     def setUp(self):
3133         super(TestIPv6PathMTU, self).setUp()
3134
3135         self.create_pg_interfaces(range(2))
3136
3137         # setup all interfaces
3138         for i in self.pg_interfaces:
3139             i.admin_up()
3140             i.config_ip6()
3141             i.resolve_ndp()
3142
3143     def tearDown(self):
3144         super(TestIPv6PathMTU, self).tearDown()
3145         for i in self.pg_interfaces:
3146             i.unconfig_ip6()
3147             i.admin_down()
3148
3149     def test_path_mtu_local(self):
3150         """ Path MTU for attached neighbour """
3151
3152         self.vapi.cli("set log class ip level debug")
3153         #
3154         # The goal here is not test that fragmentation works correctly,
3155         # that's done elsewhere, the intent is to ensure that the Path MTU
3156         # settings are honoured.
3157         #
3158
3159         #
3160         # IPv6 will only frag locally generated packets, so use tunnelled
3161         # packets post encap
3162         #
3163         tun = VppIpIpTunInterface(
3164             self,
3165             self.pg1,
3166             self.pg1.local_ip6,
3167             self.pg1.remote_ip6)
3168         tun.add_vpp_config()
3169         tun.admin_up()
3170         tun.config_ip6()
3171
3172         # set the interface MTU to a reasonable value
3173         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
3174                                        [2800, 0, 0, 0])
3175
3176         p_6k = (Ether(dst=self.pg0.local_mac,
3177                       src=self.pg0.remote_mac) /
3178                 IPv6(src=self.pg0.remote_ip6,
3179                      dst=tun.remote_ip6) /
3180                 UDP(sport=1234, dport=5678) /
3181                 Raw(b'0xa' * 2000))
3182         p_2k = (Ether(dst=self.pg0.local_mac,
3183                       src=self.pg0.remote_mac) /
3184                 IPv6(src=self.pg0.remote_ip6,
3185                      dst=tun.remote_ip6) /
3186                 UDP(sport=1234, dport=5678) /
3187                 Raw(b'0xa' * 1000))
3188         p_1k = (Ether(dst=self.pg0.local_mac,
3189                       src=self.pg0.remote_mac) /
3190                 IPv6(src=self.pg0.remote_ip6,
3191                      dst=tun.remote_ip6) /
3192                 UDP(sport=1234, dport=5678) /
3193                 Raw(b'0xa' * 600))
3194
3195         nbr = VppNeighbor(self,
3196                           self.pg1.sw_if_index,
3197                           self.pg1.remote_mac,
3198                           self.pg1.remote_ip6).add_vpp_config()
3199
3200         # this is now the interface MTU frags
3201         self.send_and_expect(self.pg0, [p_6k], self.pg1, n_rx=4)
3202         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3203         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3204
3205         # drop the path MTU for this neighbour to below the interface MTU
3206         # expect more frags
3207         pmtu = VppIpPathMtu(self, self.pg1.remote_ip6, 1300).add_vpp_config()
3208
3209         # print/format the adj delegate and trackers
3210         self.logger.info(self.vapi.cli("sh ip pmtu"))
3211         self.logger.info(self.vapi.cli("sh adj 7"))
3212
3213         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3214         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3215
3216         # increase the path MTU to more than the interface
3217         # expect to use the interface MTU
3218         pmtu.modify(8192)
3219
3220         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3221         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3222
3223         # go back to an MTU from the path
3224         pmtu.modify(1300)
3225
3226         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3227         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3228
3229         # raise the interface's MTU
3230         # should still use that of the path
3231         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
3232                                        [2000, 0, 0, 0])
3233         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3234         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3235
3236         # set path high and interface low
3237         pmtu.modify(2000)
3238         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
3239                                        [1300, 0, 0, 0])
3240         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3241         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3242
3243         # remove the path MTU
3244         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
3245                                        [2800, 0, 0, 0])
3246         pmtu.modify(0)
3247
3248         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3249         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3250
3251     def test_path_mtu_remote(self):
3252         """ Path MTU for remote neighbour """
3253
3254         self.vapi.cli("set log class ip level debug")
3255         #
3256         # The goal here is not test that fragmentation works correctly,
3257         # that's done elsewhere, the intent is to ensure that the Path MTU
3258         # settings are honoured.
3259         #
3260         tun_dst = "2001::1"
3261
3262         route = VppIpRoute(
3263             self, tun_dst, 64,
3264             [VppRoutePath(self.pg1.remote_ip6,
3265                           self.pg1.sw_if_index)]).add_vpp_config()
3266
3267         #
3268         # IPv6 will only frag locally generated packets, so use tunnelled
3269         # packets post encap
3270         #
3271         tun = VppIpIpTunInterface(
3272             self,
3273             self.pg1,
3274             self.pg1.local_ip6,
3275             tun_dst)
3276         tun.add_vpp_config()
3277         tun.admin_up()
3278         tun.config_ip6()
3279
3280         # set the interface MTU to a reasonable value
3281         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
3282                                        [2800, 0, 0, 0])
3283
3284         p_2k = (Ether(dst=self.pg0.local_mac,
3285                       src=self.pg0.remote_mac) /
3286                 IPv6(src=self.pg0.remote_ip6,
3287                      dst=tun.remote_ip6) /
3288                 UDP(sport=1234, dport=5678) /
3289                 Raw(b'0xa' * 1000))
3290         p_1k = (Ether(dst=self.pg0.local_mac,
3291                       src=self.pg0.remote_mac) /
3292                 IPv6(src=self.pg0.remote_ip6,
3293                      dst=tun.remote_ip6) /
3294                 UDP(sport=1234, dport=5678) /
3295                 Raw(b'0xa' * 600))
3296
3297         nbr = VppNeighbor(self,
3298                           self.pg1.sw_if_index,
3299                           self.pg1.remote_mac,
3300                           self.pg1.remote_ip6).add_vpp_config()
3301
3302         # this is now the interface MTU frags
3303         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3304         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3305
3306         # drop the path MTU for this neighbour to below the interface MTU
3307         # expect more frags
3308         pmtu = VppIpPathMtu(self, tun_dst, 1300).add_vpp_config()
3309
3310         # print/format the fib entry/dpo
3311         self.logger.info(self.vapi.cli("sh ip6 fib 2001::1"))
3312
3313         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3314         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3315
3316         # increase the path MTU to more than the interface
3317         # expect to use the interface MTU
3318         pmtu.modify(8192)
3319
3320         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3321         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3322
3323         # go back to an MTU from the path
3324         pmtu.modify(1300)
3325
3326         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3327         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3328
3329         # raise the interface's MTU
3330         # should still use that of the path
3331         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
3332                                        [2000, 0, 0, 0])
3333         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3334         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3335
3336         # turn the tun_dst into an attached neighbour
3337         route.modify([VppRoutePath("::",
3338                                    self.pg1.sw_if_index)])
3339         nbr2 = VppNeighbor(self,
3340                            self.pg1.sw_if_index,
3341                            self.pg1.remote_mac,
3342                            tun_dst).add_vpp_config()
3343
3344         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3345         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3346
3347         # add back to not attached
3348         nbr2.remove_vpp_config()
3349         route.modify([VppRoutePath(self.pg1.remote_ip6,
3350                                    self.pg1.sw_if_index)])
3351
3352         # set path high and interface low
3353         pmtu.modify(2000)
3354         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
3355                                        [1300, 0, 0, 0])
3356         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
3357         self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)
3358
3359         # remove the path MTU
3360         self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
3361                                        [2800, 0, 0, 0])
3362         pmtu.remove_vpp_config()
3363         self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
3364         self.send_and_expect(self.pg0, [p_1k], self.pg1)
3365
3366
class TestIPFibSource(VppTestCase):
    """ IPv6 Table FibSource """

    @classmethod
    def setUpClass(cls):
        super(TestIPFibSource, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPFibSource, cls).tearDownClass()

    def setUp(self):
        super(TestIPFibSource, self).setUp()

        # two pg interfaces, IPv6 addressed, each with two remote hosts
        # installed as IPv6 neighbours
        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            # NOTE(review): no IPv4 address is configured on these
            # interfaces, so this ARP resolution looks unnecessary; it is
            # kept so the test sequence is unchanged - confirm before
            # removing.
            i.resolve_arp()
            i.generate_remote_hosts(2)
            i.configure_ipv6_neighbors()

    def tearDown(self):
        super(TestIPFibSource, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()
            # setUp() configured IPv6 addresses (never IPv4), so IPv6 is
            # what has to be removed here
            i.unconfig_ip6()

    def test_fib_source(self):
        """ IP Table FibSource """

        routes = self.vapi.ip_route_v2_dump(0, True)

        # 2 interfaces (4 routes) + 2 specials + 4 neighbours = 10 routes
        self.assertEqual(len(routes), 10)

        # dump all the sources in the FIB and index them by name, so that a
        # source that is unexpectedly absent is reported by name (KeyError)
        # rather than as an unbound local variable
        sources = {s.src.name: s.src for s in self.vapi.fib_source_dump()}
        api_source = sources["API"]
        intf_source = sources["interface"]
        adj_source = sources["adjacency"]
        special_source = sources["special"]
        dr_source = sources["default-route"]

        # dump the individual route types
        routes = self.vapi.ip_route_v2_dump(0, True, src=adj_source.id)
        self.assertEqual(len(routes), 4)
        routes = self.vapi.ip_route_v2_dump(0, True, src=intf_source.id)
        self.assertEqual(len(routes), 4)
        routes = self.vapi.ip_route_v2_dump(0, True, src=special_source.id)
        self.assertEqual(len(routes), 1)
        routes = self.vapi.ip_route_v2_dump(0, True, src=dr_source.id)
        self.assertEqual(len(routes), 1)

        # add a new source that's better than the API
        # (its priority value is one lower than the API source's)
        self.vapi.fib_source_add(src={'name': "bgp",
                                      "priority": api_source.priority - 1})

        # dump all the sources again to check our new one is there
        sources = {s.src.name: s.src for s in self.vapi.fib_source_dump()}
        bgp_source = sources.get("bgp")

        self.assertIsNotNone(bgp_source)
        self.assertEqual(bgp_source.priority,
                         api_source.priority - 1)

        # add a route with the default API source, out of pg0 ...
        r1 = VppIpRouteV2(
            self, "2001::1", 128,
            [VppRoutePath(self.pg0.remote_ip6,
                          self.pg0.sw_if_index)]).add_vpp_config()

        # ... and the same prefix from the new source, out of pg1
        r2 = VppIpRouteV2(self, "2001::1", 128,
                          [VppRoutePath(self.pg1.remote_ip6,
                                        self.pg1.sw_if_index)],
                          src=bgp_source.id).add_vpp_config()

        # ensure the BGP source takes priority: traffic must leave on pg1
        p = (Ether(src=self.pg0.remote_mac,
                   dst=self.pg0.local_mac) /
             IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
             inet6.UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))

        self.send_and_expect(self.pg0, [p], self.pg1)

        r2.remove_vpp_config()
        r1.remove_vpp_config()

        # with both sources withdrawn the prefix must be gone
        self.assertFalse(find_route(self, "2001::1", 128))
3467
3468
class TestIPxAF(VppTestCase):
    """ IP cross AF """

    @classmethod
    def setUpClass(cls):
        super(TestIPxAF, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPxAF, cls).tearDownClass()

    def setUp(self):
        super(TestIPxAF, self).setUp()

        # two pg interfaces carrying both address families
        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.config_ip4()
            i.resolve_arp()
            i.resolve_ndp()

    def tearDown(self):
        super(TestIPxAF, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()
            i.unconfig_ip4()
            i.unconfig_ip6()

    def _send_and_check_dst(self, l3, src, dst, n_pkts):
        # Send n_pkts UDP packets with L3 header class `l3` (IP or IPv6)
        # from `src` to `dst` into pg0, expect them all on pg1 and check
        # that each received packet still carries `dst` as destination.
        p = (Ether(src=self.pg0.remote_mac,
                   dst=self.pg0.local_mac) /
             l3(src=src, dst=dst) /
             UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))
        rxs = self.send_and_expect(self.pg0, p * n_pkts, self.pg1)

        for rx in rxs:
            self.assertEqual(rx[l3].dst, dst)

    def test_x_af(self):
        """ Cross AF routing """

        N_PKTS = 63
        # a v4 route via a v6 attached next-hop
        VppIpRoute(
            self, "1.1.1.1", 32,
            [VppRoutePath(self.pg1.remote_ip6,
                          self.pg1.sw_if_index)]).add_vpp_config()
        self._send_and_check_dst(IP, self.pg0.remote_ip4, "1.1.1.1", N_PKTS)

        # a v6 route via a v4 attached next-hop
        VppIpRoute(
            self, "2001::1", 128,
            [VppRoutePath(self.pg1.remote_ip4,
                          self.pg1.sw_if_index)]).add_vpp_config()
        self._send_and_check_dst(IPv6, self.pg0.remote_ip6, "2001::1", N_PKTS)

        # a recursive v4 route via a v6 next-hop (from above)
        # (0xffffffff: no interface is given for the recursive paths)
        VppIpRoute(
            self, "2.2.2.2", 32,
            [VppRoutePath("2001::1",
                          0xffffffff)]).add_vpp_config()
        self._send_and_check_dst(IP, self.pg0.remote_ip4, "2.2.2.2", N_PKTS)

        # a recursive v4 route via a v6 next-hop
        VppIpRoute(
            self, "2.2.2.3", 32,
            [VppRoutePath(self.pg1.remote_ip6,
                          0xffffffff)]).add_vpp_config()
        self._send_and_check_dst(IP, self.pg0.remote_ip4, "2.2.2.3", N_PKTS)

        # a recursive v6 route via a v4 next-hop
        VppIpRoute(
            self, "3001::1", 128,
            [VppRoutePath(self.pg1.remote_ip4,
                          0xffffffff)]).add_vpp_config()
        self._send_and_check_dst(IPv6, self.pg0.remote_ip6, "3001::1", N_PKTS)

        # a recursive v6 route via a v4 next-hop (the 1.1.1.1 route above)
        VppIpRoute(
            self, "3001::2", 128,
            [VppRoutePath("1.1.1.1",
                          0xffffffff)]).add_vpp_config()
        self._send_and_check_dst(IPv6, self.pg0.remote_ip6, "3001::2", N_PKTS)
3591
3592
class TestIPv6Punt(VppTestCase):
    """ IPv6 Punt Police/Redirect """

    def setUp(self):
        super(TestIPv6Punt, self).setUp()
        self.create_pg_interfaces(range(4))

        # bring every interface up with a resolved IPv6 address
        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        super(TestIPv6Punt, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip6_punt(self):
        """ IPv6 punt police and redirect """

        # UDP is only punted for ports that were explicitly registered,
        # so register the destination port the test traffic uses.
        pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
        af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
        udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
        l4_match = {'af': af_ip6, 'protocol': udp_proto, 'port': 7654}
        registration = {'type': pt_l4, 'punt': {'l4': l4_match}}

        self.vapi.set_punt(is_add=1, punt=registration)

        # 1025 identical packets addressed to VPP's own pg0 address
        template = (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
                    UDP(sport=1234, dport=7654) /
                    Raw(b'\xa5' * 100))
        burst = template * 1025

        # redirect what is punted from pg0 out of pg1: everything arrives
        next_hop = self.pg1.remote_ip6
        redirect = VppIpPuntRedirect(self, self.pg0.sw_if_index,
                                     self.pg1.sw_if_index, next_hop)
        redirect.add_vpp_config()

        self.send_and_expect(self.pg0, burst, self.pg1)

        # attach a policer to the IPv6 punt path
        policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
        policer.add_vpp_config()
        punt_policer = VppIpPuntPolicer(self, policer.policer_index,
                                        is_ip6=True)
        punt_policer.add_vpp_config()

        self.vapi.cli("clear trace")
        self.pg0.add_stream(burst)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # some, but not all, of the burst must make it through since part
        # of it is policed
        captured = self.pg1._get_capture(1)

        counters = policer.get_stats()

        # single rate policer: conform and violate are hit, exceed is not
        self.assertGreater(counters['conform_packets'], 0)
        self.assertEqual(counters['exceed_packets'], 0)
        self.assertGreater(counters['violate_packets'], 0)

        n_captured = len(captured)
        self.assertGreater(n_captured, 0)
        self.assertLess(n_captured, len(burst))

        # without the policer the whole burst is received again
        punt_policer.remove_vpp_config()
        policer.remove_vpp_config()
        self.send_and_expect(self.pg0, burst, self.pg1)

        # without the redirect nothing is received at all
        redirect.remove_vpp_config()
        self.send_and_assert_no_replies(self.pg0, burst,
                                        "IP no punt config")

        # a redirect that does not select on the input port
        redirect = VppIpPuntRedirect(self, 0xffffffff,
                                     self.pg1.sw_if_index, next_hop)
        redirect.add_vpp_config()
        self.send_and_expect(self.pg0, burst, self.pg1)
        redirect.remove_vpp_config()

    def test_ip6_punt_dump(self):
        """ IPv6 punt redirect dump"""

        # three redirects, all transmitting on pg3; the pg2 one uses the
        # unspecified address as its next-hop
        via_pg3 = self.pg3.remote_ip6
        ipr_03 = VppIpPuntRedirect(self, self.pg0.sw_if_index,
                                   self.pg3.sw_if_index, via_pg3)
        ipr_13 = VppIpPuntRedirect(self, self.pg1.sw_if_index,
                                   self.pg3.sw_if_index, via_pg3)
        ipr_23 = VppIpPuntRedirect(self, self.pg2.sw_if_index,
                                   self.pg3.sw_if_index, "::")
        redirects = (ipr_03, ipr_13, ipr_23)
        for redirect in redirects:
            redirect.add_vpp_config()

        # each redirect must be retrievable on its own
        for redirect in redirects:
            self.assertTrue(redirect.query_vpp_config())

        # and a dump across all interfaces must return all three
        dumped = self.vapi.ip_punt_redirect_dump(sw_if_index=0xffffffff,
                                                 is_ipv6=True)
        self.assertEqual(len(dumped), 3)
        for entry in dumped:
            self.assertEqual(entry.punt.tx_sw_if_index, self.pg3.sw_if_index)
        self.assertNotEqual(dumped[1].punt.nh, self.pg3.remote_ip6)
        self.assertEqual(str(dumped[2].punt.nh), '::')
3735
3736
# Running this file directly hands the module's tests to the VPP runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)