fib: Allow the creation of new source on the API
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python3
2
3 import socket
4 from socket import inet_pton, inet_ntop
5 import unittest
6
7 from parameterized import parameterized
8 import scapy.compat
9 import scapy.layers.inet6 as inet6
10 from scapy.layers.inet import UDP
11 from scapy.contrib.mpls import MPLS
12 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
13     ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
14     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
15     ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, \
16     IPv6ExtHdrHopByHop, ICMPv6MLReport2, ICMPv6MLDMultAddrRec
17 from scapy.layers.l2 import Ether, Dot1Q
18 from scapy.packet import Raw
19 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
20     in6_mactoifaceid
21 from six import moves
22
23 from framework import VppTestCase, VppTestRunner, tag_run_solo
24 from util import ppp, ip6_normalize, mk_ll_addr
25 from vpp_papi import VppEnum
26 from vpp_ip import DpoProto, VppIpPuntPolicer, VppIpPuntRedirect, VppIpPathMtu
27 from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
28     VppMRoutePath, VppMplsIpBind, \
29     VppMplsRoute, VppMplsTable, VppIpTable, FibPathType, FibPathProto, \
30     VppIpInterfaceAddress, find_route_in_dump, find_mroute_in_dump, \
31     VppIp6LinkLocalAddress, VppIpRouteV2
32 from vpp_neighbor import find_nbr, VppNeighbor
33 from vpp_ipip_tun_interface import VppIpIpTunInterface
34 from vpp_pg_interface import is_ipv6_misc
35 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
36 from vpp_policer import VppPolicer, PolicerAction
37 from ipaddress import IPv6Network, IPv6Address
38
# Local alias for the IPv6 address family constant; it is the family
# argument passed to inet_pton()/inet_ntop() in this file.
AF_INET6 = socket.AF_INET6

# Python 2/3 compatibility shim: the ``unicode`` builtin only exists on
# Python 2, so fall back to ``str`` when the lookup raises NameError.
try:
    text_type = unicode
except NameError:
    text_type = str

# NOTE(review): presumably the number of packets sent per stream by tests
# elsewhere in this file - confirm against its users.
NUM_PKTS = 67
47
48
class TestIPv6ND(VppTestCase):
    """Common IPv6 neighbour-discovery validation helpers.

    Holds the RA/NA/NS packet validators and the send-and-expect wrappers
    built on them; it defines no tests itself.
    """

    def validate_ra(self, intf, rx, dst_ip=None):
        """Assert that ``rx`` is a unicast RA sent by the router on ``intf``.

        :param intf: interface the packet was captured on.
        :param rx: captured packet to check.
        :param dst_ip: expected IPv6 destination address; defaults to
                       ``intf.remote_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd RA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_RA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the router's link local
        self.assertTrue(in6_islladdr(rx[IPv6].src))
        self.assertEqual(in6_ptop(rx[IPv6].src),
                         in6_ptop(mk_ll_addr(intf.local_mac)))

    def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
        """Assert that ``rx`` is a unicast NA sent by the router on ``intf``.

        :param intf: interface the packet was captured on.
        :param rx: captured packet to check.
        :param dst_ip: expected IPv6 destination address; defaults to
                       ``intf.remote_ip6``.
        :param tgt_ip: address the NA is expected to be sourced from;
                       defaults to ``intf.local_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6
        if not tgt_ip:
            # default the *target*; this must not clobber dst_ip, otherwise
            # the destination check below compares against the wrong
            # address and the source check receives None
            tgt_ip = intf.local_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_NA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the target address
        self.assertEqual(
            in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))

        # Dest link-layer options should have the router's MAC
        dll = rx[ICMPv6NDOptDstLLAddr]
        self.assertEqual(dll.lladdr, intf.local_mac)

    def validate_ns(self, intf, rx, tgt_ip):
        """Assert that ``rx`` is an NS for ``tgt_ip`` sent by the router.

        :param intf: interface the packet was captured on.
        :param rx: captured packet to check.
        :param tgt_ip: IPv6 address the NS is expected to solicit.
        """
        # multicast address (and, below, MAC) derived from the target
        nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
        dst_ip = inet_ntop(AF_INET6, nsma)

        # NS is broadcast
        self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NS should be addressed to an mcast address
        # derived from the target address
        self.assertEqual(
            in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # expect the tgt IP in the NS header
        ns = rx[ICMPv6ND_NS]
        self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))

        # packet is from the router's local address; normalise both sides
        # so the comparison does not depend on textual formatting
        self.assertEqual(
            in6_ptop(rx[IPv6].src), in6_ptop(intf.local_ip6))

        # Src link-layer options should have the router's MAC
        sll = rx[ICMPv6NDOptSrcLLAddr]
        self.assertEqual(sll.lladdr, intf.local_mac)

    def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``intf`` and validate the one RA captured back.

        :param intf: interface to send on and capture from.
        :param pkts: packet(s) to send.
        :param remark: description of the step; not used by this helper.
        :param dst_ip: expected RA destination, see :meth:`validate_ra`.
        :param filter_out_fn: filter handed to ``intf.get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ra(intf, rx, dst_ip)

    def send_and_expect_na(self, intf, pkts, remark, dst_ip=None,
                           tgt_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``intf`` and validate the one NA captured back.

        :param intf: interface to send on and capture from.
        :param pkts: packet(s) to send.
        :param remark: description of the step; not used by this helper.
        :param dst_ip: expected NA destination, see :meth:`validate_na`.
        :param tgt_ip: expected NA source, see :meth:`validate_na`.
        :param filter_out_fn: filter handed to ``intf.get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_na(intf, rx, dst_ip, tgt_ip)

    def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``tx_intf`` and validate the NS seen on ``rx_intf``.

        :param tx_intf: interface the packets are sent on.
        :param rx_intf: interface the NS is captured from.
        :param pkts: packet(s) to send.
        :param tgt_ip: address the NS is expected to solicit.
        :param filter_out_fn: filter handed to ``rx_intf.get_capture``.
        """
        self.vapi.cli("clear trace")
        tx_intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ns(rx_intf, rx, tgt_ip)

    def verify_ip(self, rx, smac, dmac, sip, dip):
        """Assert the Ethernet and IPv6 addresses of ``rx``.

        :param rx: captured packet to check.
        :param smac: expected Ethernet source.
        :param dmac: expected Ethernet destination.
        :param sip: expected IPv6 source.
        :param dip: expected IPv6 destination.
        """
        ether = rx[Ether]
        self.assertEqual(ether.dst, dmac)
        self.assertEqual(ether.src, smac)

        ip = rx[IPv6]
        self.assertEqual(ip.src, sip)
        self.assertEqual(ip.dst, dip)
165
166
167 @tag_run_solo
168 class TestIPv6(TestIPv6ND):
169     """ IPv6 Test Case """
170
    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the parent class unchanged."""
        super(TestIPv6, cls).setUpClass()
174
    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent class unchanged."""
        super(TestIPv6, cls).tearDownClass()
178
179     def setUp(self):
180         """
181         Perform test setup before test case.
182
183         **Config:**
184             - create 3 pg interfaces
185                 - untagged pg0 interface
186                 - Dot1Q subinterface on pg1
187                 - Dot1AD subinterface on pg2
188             - setup interfaces:
189                 - put it into UP state
190                 - set IPv6 addresses
191                 - resolve neighbor address using NDP
192             - configure 200 fib entries
193
194         :ivar list interfaces: pg interfaces and subinterfaces.
195         :ivar dict flows: IPv4 packet flows in test.
196
197         *TODO:* Create AD sub interface
198         """
199         super(TestIPv6, self).setUp()
200
201         # create 3 pg interfaces
202         self.create_pg_interfaces(range(3))
203
204         # create 2 subinterfaces for p1 and pg2
205         self.sub_interfaces = [
206             VppDot1QSubint(self, self.pg1, 100),
207             VppDot1QSubint(self, self.pg2, 200)
208             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
209         ]
210
211         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
212         self.flows = dict()
213         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
214         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
215         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
216
217         # packet sizes
218         self.pg_if_packet_sizes = [64, 1500, 9020]
219
220         self.interfaces = list(self.pg_interfaces)
221         self.interfaces.extend(self.sub_interfaces)
222
223         # setup all interfaces
224         for i in self.interfaces:
225             i.admin_up()
226             i.config_ip6()
227             i.resolve_ndp()
228
229     def tearDown(self):
230         """Run standard test teardown and log ``show ip6 neighbors``."""
231         for i in self.interfaces:
232             i.unconfig_ip6()
233             i.admin_down()
234         for i in self.sub_interfaces:
235             i.remove_vpp_config()
236
237         super(TestIPv6, self).tearDown()
238         if not self.vpp_dead:
239             self.logger.info(self.vapi.cli("show ip6 neighbors"))
240             # info(self.vapi.cli("show ip6 fib"))  # many entries
241
242     def modify_packet(self, src_if, packet_size, pkt):
243         """Add load, set destination IP and extend packet to required packet
244         size for defined interface.
245
246         :param VppInterface src_if: Interface to create packet for.
247         :param int packet_size: Required packet size.
248         :param Scapy pkt: Packet to be modified.
249         """
250         dst_if_idx = int(packet_size / 10 % 2)
251         dst_if = self.flows[src_if][dst_if_idx]
252         info = self.create_packet_info(src_if, dst_if)
253         payload = self.info_to_payload(info)
254         p = pkt / Raw(payload)
255         p[IPv6].dst = dst_if.remote_ip6
256         info.data = p.copy()
257         if isinstance(src_if, VppSubInterface):
258             p = src_if.add_dot1_layer(p)
259         self.extend_packet(p, packet_size)
260
261         return p
262
263     def create_stream(self, src_if):
264         """Create input packet stream for defined interface.
265
266         :param VppInterface src_if: Interface to create packet stream for.
267         """
268         hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
269         pkt_tmpl = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
270                     IPv6(src=src_if.remote_ip6) /
271                     inet6.UDP(sport=1234, dport=1234))
272
273         pkts = [self.modify_packet(src_if, i, pkt_tmpl)
274                 for i in moves.range(self.pg_if_packet_sizes[0],
275                                      self.pg_if_packet_sizes[1], 10)]
276         pkts_b = [self.modify_packet(src_if, i, pkt_tmpl)
277                   for i in moves.range(self.pg_if_packet_sizes[1] + hdr_ext,
278                                        self.pg_if_packet_sizes[2] + hdr_ext,
279                                        50)]
280         pkts.extend(pkts_b)
281
282         return pkts
283
284     def verify_capture(self, dst_if, capture):
285         """Verify captured input packet stream for defined interface.
286
287         :param VppInterface dst_if: Interface to verify captured packet stream
288                                     for.
289         :param list capture: Captured packet stream.
290         """
291         self.logger.info("Verifying capture on interface %s" % dst_if.name)
292         last_info = dict()
293         for i in self.interfaces:
294             last_info[i.sw_if_index] = None
295         is_sub_if = False
296         dst_sw_if_index = dst_if.sw_if_index
297         if hasattr(dst_if, 'parent'):
298             is_sub_if = True
299         for packet in capture:
300             if is_sub_if:
301                 # Check VLAN tags and Ethernet header
302                 packet = dst_if.remove_dot1_layer(packet)
303             self.assertTrue(Dot1Q not in packet)
304             try:
305                 ip = packet[IPv6]
306                 udp = packet[inet6.UDP]
307                 payload_info = self.payload_to_info(packet[Raw])
308                 packet_index = payload_info.index
309                 self.assertEqual(payload_info.dst, dst_sw_if_index)
310                 self.logger.debug(
311                     "Got packet on port %s: src=%u (id=%u)" %
312                     (dst_if.name, payload_info.src, packet_index))
313                 next_info = self.get_next_packet_info_for_interface2(
314                     payload_info.src, dst_sw_if_index,
315                     last_info[payload_info.src])
316                 last_info[payload_info.src] = next_info
317                 self.assertTrue(next_info is not None)
318                 self.assertEqual(packet_index, next_info.index)
319                 saved_packet = next_info.data
320                 # Check standard fields
321                 self.assertEqual(
322                     ip.src, saved_packet[IPv6].src)
323                 self.assertEqual(
324                     ip.dst, saved_packet[IPv6].dst)
325                 self.assertEqual(
326                     udp.sport, saved_packet[inet6.UDP].sport)
327                 self.assertEqual(
328                     udp.dport, saved_packet[inet6.UDP].dport)
329             except:
330                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
331                 raise
332         for i in self.interfaces:
333             remaining_packet = self.get_next_packet_info_for_interface2(
334                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
335             self.assertTrue(remaining_packet is None,
336                             "Interface %s: Packet expected from interface %s "
337                             "didn't arrive" % (dst_if.name, i.name))
338
339     def test_next_header_anomaly(self):
340         """ IPv6 next header anomaly test
341
342         Test scenario:
343             - ipv6 next header field = Fragment Header (44)
344             - next header is ICMPv6 Echo Request
345             - wait for reassembly
346         """
347         pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
348                IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44) /
349                ICMPv6EchoRequest())
350
351         self.pg0.add_stream(pkt)
352         self.pg_start()
353
354         # wait for reassembly
355         self.sleep(10)
356
357     def test_fib(self):
358         """ IPv6 FIB test
359
360         Test scenario:
361             - Create IPv6 stream for pg0 interface
362             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
363             - Send and verify received packets on each interface.
364         """
365
366         pkts = self.create_stream(self.pg0)
367         self.pg0.add_stream(pkts)
368
369         for i in self.sub_interfaces:
370             pkts = self.create_stream(i)
371             i.parent.add_stream(pkts)
372
373         self.pg_enable_capture(self.pg_interfaces)
374         self.pg_start()
375
376         pkts = self.pg0.get_capture()
377         self.verify_capture(self.pg0, pkts)
378
379         for i in self.sub_interfaces:
380             pkts = i.parent.get_capture()
381             self.verify_capture(i, pkts)
382
    def test_ns(self):
        """ IPv6 Neighbour Solicitation Exceptions

        Test scenario:
           - Send an NS sourced from an address not covered by the link sub-net
           - Send an NS to an mcast address the router has not joined
             (currently disabled)
           - Send an NS for a target address the router does not own
           - Add a neighbor without a FIB entry and check no route exists
           - Send NSs from link-local sources and check an NA is returned,
             a neighbor is learned and no route is added
        """

        #
        # An NS from a non link source address
        #
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        p = (Ether(dst=in6_getnsmac(nsma)) /
             IPv6(dst=d, src="2002::2") /
             ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
             ICMPv6NDOptSrcLLAddr(
                 lladdr=self.pg0.remote_mac))
        pkts = [p]

        self.send_and_assert_no_replies(
            self.pg0, pkts,
            "No response to NS source by address not on sub-net")

        #
        # An NS sent to a solicited mcast group the router is not a
        # member of.
        # Disabled with ``if 0``: per the original note this case FAILS,
        # so the block below never runs.
        #
        if 0:
            nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
            d = inet_ntop(AF_INET6, nsma)

            p = (Ether(dst=in6_getnsmac(nsma)) /
                 IPv6(dst=d, src=self.pg0.remote_ip6) /
                 ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
                 ICMPv6NDOptSrcLLAddr(
                     lladdr=self.pg0.remote_mac))
            pkts = [p]

            self.send_and_assert_no_replies(
                self.pg0, pkts,
                "No response to NS sent to unjoined mcast address")

        #
        # An NS whose target address is one the router does not own
        #
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        p = (Ether(dst=in6_getnsmac(nsma)) /
             IPv6(dst=d, src=self.pg0.remote_ip6) /
             ICMPv6ND_NS(tgt="fd::ffff") /
             ICMPv6NDOptSrcLLAddr(
                 lladdr=self.pg0.remote_mac))
        pkts = [p]

        self.send_and_assert_no_replies(self.pg0, pkts,
                                        "No response to NS for unknown target")

        #
        # A neighbor entry that has no associated FIB-entry
        #
        self.pg0.generate_remote_hosts(4)
        nd_entry = VppNeighbor(self,
                               self.pg0.sw_if_index,
                               self.pg0.remote_hosts[2].mac,
                               self.pg0.remote_hosts[2].ip6,
                               is_no_fib_entry=1)
        nd_entry.add_vpp_config()

        #
        # check we have the neighbor, but no route
        #
        self.assertTrue(find_nbr(self,
                                 self.pg0.sw_if_index,
                                 self.pg0._remote_hosts[2].ip6))
        self.assertFalse(find_route(self,
                                    self.pg0._remote_hosts[2].ip6,
                                    128))

        #
        # send an NS from a link local address to the interface's global
        # address
        # (nsma and d still hold the values derived from pg0.local_ip6 above)
        #
        p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
             IPv6(
                 dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
             ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
             ICMPv6NDOptSrcLLAddr(
                 lladdr=self.pg0.remote_mac))

        self.send_and_expect_na(self.pg0, p,
                                "NS from link-local",
                                dst_ip=self.pg0._remote_hosts[2].ip6_ll,
                                tgt_ip=self.pg0.local_ip6)

        #
        # we should have learned an ND entry for the peer's link-local
        # but not inserted a route to it in the FIB
        #
        self.assertTrue(find_nbr(self,
                                 self.pg0.sw_if_index,
                                 self.pg0._remote_hosts[2].ip6_ll))
        self.assertFalse(find_route(self,
                                    self.pg0._remote_hosts[2].ip6_ll,
                                    128))

        #
        # An NS to the router's own Link-local
        # (the NS target is pg0's link-local; the packet is still addressed
        # to d, the mcast address derived from pg0.local_ip6)
        #
        p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
             IPv6(
                 dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
             ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll) /
             ICMPv6NDOptSrcLLAddr(
                 lladdr=self.pg0.remote_mac))

        self.send_and_expect_na(self.pg0, p,
                                "NS to/from link-local",
                                dst_ip=self.pg0._remote_hosts[3].ip6_ll,
                                tgt_ip=self.pg0.local_ip6_ll)

        #
        # we should have learned an ND entry for the peer's link-local
        # but not inserted a route to it in the FIB
        #
        self.assertTrue(find_nbr(self,
                                 self.pg0.sw_if_index,
                                 self.pg0._remote_hosts[3].ip6_ll))
        self.assertFalse(find_route(self,
                                    self.pg0._remote_hosts[3].ip6_ll,
                                    128))
517
518     def test_ns_duplicates(self):
519         """ ND Duplicates"""
520
521         #
522         # Generate some hosts on the LAN
523         #
524         self.pg1.generate_remote_hosts(3)
525
526         #
527         # Add host 1 on pg1 and pg2
528         #
529         ns_pg1 = VppNeighbor(self,
530                              self.pg1.sw_if_index,
531                              self.pg1.remote_hosts[1].mac,
532                              self.pg1.remote_hosts[1].ip6)
533         ns_pg1.add_vpp_config()
534         ns_pg2 = VppNeighbor(self,
535                              self.pg2.sw_if_index,
536                              self.pg2.remote_mac,
537                              self.pg1.remote_hosts[1].ip6)
538         ns_pg2.add_vpp_config()
539
540         #
541         # IP packet destined for pg1 remote host arrives on pg1 again.
542         #
543         p = (Ether(dst=self.pg0.local_mac,
544                    src=self.pg0.remote_mac) /
545              IPv6(src=self.pg0.remote_ip6,
546                   dst=self.pg1.remote_hosts[1].ip6) /
547              inet6.UDP(sport=1234, dport=1234) /
548              Raw())
549
550         self.pg0.add_stream(p)
551         self.pg_enable_capture(self.pg_interfaces)
552         self.pg_start()
553
554         rx1 = self.pg1.get_capture(1)
555
556         self.verify_ip(rx1[0],
557                        self.pg1.local_mac,
558                        self.pg1.remote_hosts[1].mac,
559                        self.pg0.remote_ip6,
560                        self.pg1.remote_hosts[1].ip6)
561
562         #
563         # remove the duplicate on pg1
564         # packet stream should generate NSs out of pg1
565         #
566         ns_pg1.remove_vpp_config()
567
568         self.send_and_expect_ns(self.pg0, self.pg1,
569                                 p, self.pg1.remote_hosts[1].ip6)
570
571         #
572         # Add it back
573         #
574         ns_pg1.add_vpp_config()
575
576         self.pg0.add_stream(p)
577         self.pg_enable_capture(self.pg_interfaces)
578         self.pg_start()
579
580         rx1 = self.pg1.get_capture(1)
581
582         self.verify_ip(rx1[0],
583                        self.pg1.local_mac,
584                        self.pg1.remote_hosts[1].mac,
585                        self.pg0.remote_ip6,
586                        self.pg1.remote_hosts[1].ip6)
587
588     def validate_ra(self, intf, rx, dst_ip=None, src_ip=None,
589                     mtu=9000, pi_opt=None):
590         if not dst_ip:
591             dst_ip = intf.remote_ip6
592         if not src_ip:
593             src_ip = mk_ll_addr(intf.local_mac)
594
595         # unicasted packets must come to the unicast mac
596         self.assertEqual(rx[Ether].dst, intf.remote_mac)
597
598         # and from the router's MAC
599         self.assertEqual(rx[Ether].src, intf.local_mac)
600
601         # the rx'd RA should be addressed to the sender's source
602         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
603         self.assertEqual(in6_ptop(rx[IPv6].dst),
604                          in6_ptop(dst_ip))
605
606         # and come from the router's link local
607         self.assertTrue(in6_islladdr(rx[IPv6].src))
608         self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(src_ip))
609
610         # it should contain the links MTU
611         ra = rx[ICMPv6ND_RA]
612         self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
613
614         # it should contain the source's link layer address option
615         sll = ra[ICMPv6NDOptSrcLLAddr]
616         self.assertEqual(sll.lladdr, intf.local_mac)
617
618         if not pi_opt:
619             # the RA should not contain prefix information
620             self.assertFalse(ra.haslayer(
621                 ICMPv6NDOptPrefixInfo))
622         else:
623             raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
624
625             # the options are nested in the scapy packet in way that i cannot
626             # decipher how to decode. this 1st layer of option always returns
627             # nested classes, so a direct obj1=obj2 comparison always fails.
628             # however, the getlayer(.., 2) does give one instance.
629             # so we cheat here and construct a new opt instance for comparison
630             rd = ICMPv6NDOptPrefixInfo(
631                 prefixlen=raos.prefixlen,
632                 prefix=raos.prefix,
633                 L=raos.L,
634                 A=raos.A)
635             if type(pi_opt) is list:
636                 for ii in range(len(pi_opt)):
637                     self.assertEqual(pi_opt[ii], rd)
638                     rd = rx.getlayer(
639                         ICMPv6NDOptPrefixInfo, ii + 2)
640             else:
641                 self.assertEqual(pi_opt, raos, 'Expected: %s, received: %s'
642                                  % (pi_opt.show(dump=True),
643                                     raos.show(dump=True)))
644
645     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
646                            filter_out_fn=is_ipv6_misc,
647                            opt=None,
648                            src_ip=None):
649         self.vapi.cli("clear trace")
650         intf.add_stream(pkts)
651         self.pg_enable_capture(self.pg_interfaces)
652         self.pg_start()
653         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
654
655         self.assertEqual(len(rx), 1)
656         rx = rx[0]
657         self.validate_ra(intf, rx, dst_ip, src_ip=src_ip, pi_opt=opt)
658
659     def test_rs(self):
660         """ IPv6 Router Solicitation Exceptions
661
662         Test scenario:
663         """
664
665         #
666         # Before we begin change the IPv6 RA responses to use the unicast
667         # address - that way we will not confuse them with the periodic
668         # RAs which go to the mcast address
669         # Sit and wait for the first periodic RA.
670         #
671         # TODO
672         #
673         self.pg0.ip6_ra_config(send_unicast=1)
674
675         #
676         # An RS from a link source address
677         #  - expect an RA in return
678         #
679         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
680              IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
681              ICMPv6ND_RS())
682         pkts = [p]
683         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
684
685         #
686         # For the next RS sent the RA should be rate limited
687         #
688         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
689
690         #
691         # When we reconfigure the IPv6 RA config,
692         # we reset the RA rate limiting,
693         # so we need to do this before each test below so as not to drop
694         # packets for rate limiting reasons. Test this works here.
695         #
696         self.pg0.ip6_ra_config(send_unicast=1)
697         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
698
699         #
700         # An RS sent from a non-link local source
701         #
702         self.pg0.ip6_ra_config(send_unicast=1)
703         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
704              IPv6(dst=self.pg0.local_ip6,
705                   src="2002::ffff") /
706              ICMPv6ND_RS())
707         pkts = [p]
708         self.send_and_assert_no_replies(self.pg0, pkts,
709                                         "RS from non-link source")
710
711         #
712         # Source an RS from a link local address
713         #
714         self.pg0.ip6_ra_config(send_unicast=1)
715         ll = mk_ll_addr(self.pg0.remote_mac)
716         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
717              IPv6(dst=self.pg0.local_ip6, src=ll) /
718              ICMPv6ND_RS())
719         pkts = [p]
720         self.send_and_expect_ra(self.pg0, pkts,
721                                 "RS sourced from link-local",
722                                 dst_ip=ll)
723
724         #
725         # Send the RS multicast
726         #
727         self.pg0.ip6_ra_config(send_unicast=1)
728         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
729         ll = mk_ll_addr(self.pg0.remote_mac)
730         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
731              IPv6(dst="ff02::2", src=ll) /
732              ICMPv6ND_RS())
733         pkts = [p]
734         self.send_and_expect_ra(self.pg0, pkts,
735                                 "RS sourced from link-local",
736                                 dst_ip=ll)
737
738         #
739         # Source from the unspecified address ::. This happens when the RS
740         # is sent before the host has a configured address/sub-net,
741         # i.e. auto-config. Since the sender has no IP address, the reply
742         # comes back mcast - so the capture needs to not filter this.
743         # If we happen to pick up the periodic RA at this point then so be it,
744         # it's not an error.
745         #
746         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
747         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
748              IPv6(dst="ff02::2", src="::") /
749              ICMPv6ND_RS())
750         pkts = [p]
751         self.send_and_expect_ra(self.pg0, pkts,
752                                 "RS sourced from unspecified",
753                                 dst_ip="ff02::1",
754                                 filter_out_fn=None)
755
756         #
757         # Configure The RA to announce the links prefix
758         #
759         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
760                                self.pg0.local_ip6_prefix_len))
761
762         #
763         # RAs should now contain the prefix information option
764         #
765         opt = ICMPv6NDOptPrefixInfo(
766             prefixlen=self.pg0.local_ip6_prefix_len,
767             prefix=self.pg0.local_ip6,
768             L=1,
769             A=1)
770
771         self.pg0.ip6_ra_config(send_unicast=1)
772         ll = mk_ll_addr(self.pg0.remote_mac)
773         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
774              IPv6(dst=self.pg0.local_ip6, src=ll) /
775              ICMPv6ND_RS())
776         self.send_and_expect_ra(self.pg0, p,
777                                 "RA with prefix-info",
778                                 dst_ip=ll,
779                                 opt=opt)
780
781         #
782         # Change the prefix info to not off-link
783         #  L-flag is clear
784         #
785         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
786                                self.pg0.local_ip6_prefix_len),
787                                off_link=1)
788
789         opt = ICMPv6NDOptPrefixInfo(
790             prefixlen=self.pg0.local_ip6_prefix_len,
791             prefix=self.pg0.local_ip6,
792             L=0,
793             A=1)
794
795         self.pg0.ip6_ra_config(send_unicast=1)
796         self.send_and_expect_ra(self.pg0, p,
797                                 "RA with Prefix info with L-flag=0",
798                                 dst_ip=ll,
799                                 opt=opt)
800
801         #
802         # Change the prefix info to not off-link, no-autoconfig
803         #  L and A flag are clear in the advert
804         #
805         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
806                                self.pg0.local_ip6_prefix_len),
807                                off_link=1,
808                                no_autoconfig=1)
809
810         opt = ICMPv6NDOptPrefixInfo(
811             prefixlen=self.pg0.local_ip6_prefix_len,
812             prefix=self.pg0.local_ip6,
813             L=0,
814             A=0)
815
816         self.pg0.ip6_ra_config(send_unicast=1)
817         self.send_and_expect_ra(self.pg0, p,
818                                 "RA with Prefix info with A & L-flag=0",
819                                 dst_ip=ll,
820                                 opt=opt)
821
822         #
823         # Change the flag settings back to the defaults
824         #  L and A flag are set in the advert
825         #
826         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
827                                self.pg0.local_ip6_prefix_len))
828
829         opt = ICMPv6NDOptPrefixInfo(
830             prefixlen=self.pg0.local_ip6_prefix_len,
831             prefix=self.pg0.local_ip6,
832             L=1,
833             A=1)
834
835         self.pg0.ip6_ra_config(send_unicast=1)
836         self.send_and_expect_ra(self.pg0, p,
837                                 "RA with Prefix info",
838                                 dst_ip=ll,
839                                 opt=opt)
840
841         #
842         # Change the prefix info to not off-link, no-autoconfig
843         #  L and A flag are clear in the advert
844         #
845         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
846                                self.pg0.local_ip6_prefix_len),
847                                off_link=1,
848                                no_autoconfig=1)
849
850         opt = ICMPv6NDOptPrefixInfo(
851             prefixlen=self.pg0.local_ip6_prefix_len,
852             prefix=self.pg0.local_ip6,
853             L=0,
854             A=0)
855
856         self.pg0.ip6_ra_config(send_unicast=1)
857         self.send_and_expect_ra(self.pg0, p,
858                                 "RA with Prefix info with A & L-flag=0",
859                                 dst_ip=ll,
860                                 opt=opt)
861
862         #
863         # Use the reset to defaults option to revert to defaults
864         #  L and A flag are clear in the advert
865         #
866         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
867                                self.pg0.local_ip6_prefix_len),
868                                use_default=1)
869
870         opt = ICMPv6NDOptPrefixInfo(
871             prefixlen=self.pg0.local_ip6_prefix_len,
872             prefix=self.pg0.local_ip6,
873             L=1,
874             A=1)
875
876         self.pg0.ip6_ra_config(send_unicast=1)
877         self.send_and_expect_ra(self.pg0, p,
878                                 "RA with Prefix reverted to defaults",
879                                 dst_ip=ll,
880                                 opt=opt)
881
882         #
883         # Advertise Another prefix. With no L-flag/A-flag
884         #
885         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
886                                self.pg1.local_ip6_prefix_len),
887                                off_link=1,
888                                no_autoconfig=1)
889
890         opt = [ICMPv6NDOptPrefixInfo(
891             prefixlen=self.pg0.local_ip6_prefix_len,
892             prefix=self.pg0.local_ip6,
893             L=1,
894             A=1),
895             ICMPv6NDOptPrefixInfo(
896                 prefixlen=self.pg1.local_ip6_prefix_len,
897                 prefix=self.pg1.local_ip6,
898                 L=0,
899                 A=0)]
900
901         self.pg0.ip6_ra_config(send_unicast=1)
902         ll = mk_ll_addr(self.pg0.remote_mac)
903         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
904              IPv6(dst=self.pg0.local_ip6, src=ll) /
905              ICMPv6ND_RS())
906         self.send_and_expect_ra(self.pg0, p,
907                                 "RA with multiple Prefix infos",
908                                 dst_ip=ll,
909                                 opt=opt)
910
911         #
912         # Remove the first prefix-info - expect the second is still in the
913         # advert
914         #
915         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
916                                self.pg0.local_ip6_prefix_len),
917                                is_no=1)
918
919         opt = ICMPv6NDOptPrefixInfo(
920             prefixlen=self.pg1.local_ip6_prefix_len,
921             prefix=self.pg1.local_ip6,
922             L=0,
923             A=0)
924
925         self.pg0.ip6_ra_config(send_unicast=1)
926         self.send_and_expect_ra(self.pg0, p,
927                                 "RA with Prefix reverted to defaults",
928                                 dst_ip=ll,
929                                 opt=opt)
930
931         #
932         # Remove the second prefix-info - expect no prefix-info in the adverts
933         #
934         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
935                                self.pg1.local_ip6_prefix_len),
936                                is_no=1)
937
938         #
939         # change the link's link local, so we know that works too.
940         #
941         self.vapi.sw_interface_ip6_set_link_local_address(
942             sw_if_index=self.pg0.sw_if_index,
943             ip="fe80::88")
944
945         self.pg0.ip6_ra_config(send_unicast=1)
946         self.send_and_expect_ra(self.pg0, p,
947                                 "RA with Prefix reverted to defaults",
948                                 dst_ip=ll,
949                                 src_ip="fe80::88")
950
951         #
952         # Reset the periodic advertisements back to default values
953         #
954         self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
955
956     def test_mld(self):
957         """ MLD Report """
958         #
959         # test one MLD is sent after applying an IPv6 Address on an interface
960         #
961         self.pg_enable_capture(self.pg_interfaces)
962         self.pg_start()
963
964         subitf = VppDot1QSubint(self, self.pg1, 99)
965
966         subitf.admin_up()
967         subitf.config_ip6()
968
969         rxs = self.pg1._get_capture(timeout=4, filter_out_fn=None)
970
971         #
972         # hunt for the MLD on vlan 99
973         #
974         for rx in rxs:
975             # make sure ipv6 packets with hop by hop options have
976             # correct checksums
977             self.assert_packet_checksums_valid(rx)
978             if rx.haslayer(IPv6ExtHdrHopByHop) and \
979                rx.haslayer(Dot1Q) and \
980                rx[Dot1Q].vlan == 99:
981                 mld = rx[ICMPv6MLReport2]
982
983         self.assertEqual(mld.records_number, 4)
984
985
class TestIPv6RouteLookup(VppTestCase):
    """ IPv6 Route Lookup Test Case

    Installs three drop routes of increasing prefix length in setUp and
    exercises the ip_route_lookup API against them, both with and
    without the 'exact' flag.
    """

    def route_lookup(self, prefix, exact):
        """Call the ip_route_lookup API for *prefix* in table 0.

        :param prefix: prefix string, passed through as 'prefix'
        :param exact: passed through as the API's 'exact' field
        :returns: the result of self.vapi.api() for the request
        """
        return self.vapi.api(self.vapi.papi.ip_route_lookup,
                             {
                                 'table_id': 0,
                                 'exact': exact,
                                 'prefix': prefix,
                             })

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RouteLookup, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RouteLookup, cls).tearDownClass()

    def setUp(self):
        """Add the three routes every test in this class looks up."""
        # Keep the route list per test instance.  A list shared on the
        # class would keep growing across tests, and tearDown would then
        # try to remove routes that an earlier test already removed.
        self.routes = []

        super(TestIPv6RouteLookup, self).setUp()

        drop_nh = VppRoutePath("::1", 0xffffffff,
                               type=FibPathType.FIB_PATH_TYPE_DROP)

        # Add 3 routes
        for address, length in (("2001:1111::", 32),
                                ("2001:1111:2222::", 48),
                                ("2001:1111:2222::1", 128)):
            r = VppIpRoute(self, address, length, [drop_nh])
            r.add_vpp_config()
            self.routes.append(r)

    def tearDown(self):
        """Remove the routes added by setUp, then run the base tearDown."""
        # Remove the routes we added
        for r in self.routes:
            r.remove_vpp_config()

        super(TestIPv6RouteLookup, self).tearDown()

    def test_exact_match(self):
        """ IPv6 route lookup with exact match requested """
        # Verify we find the host route
        prefix = "2001:1111:2222::1/128"
        result = self.route_lookup(prefix, True)
        self.assertEqual(prefix, str(result.route.prefix))

        # Verify we find a middle prefix route
        prefix = "2001:1111:2222::/48"
        result = self.route_lookup(prefix, True)
        self.assertEqual(prefix, str(result.route.prefix))

        # Verify we do not find an available LPM.
        with self.vapi.assert_negative_api_retval():
            self.route_lookup("2001::2/128", True)

    def test_longest_prefix_match(self):
        """ IPv6 route lookup with longest prefix match """
        # verify we find lpm
        lpm_prefix = "2001:1111:2222::/48"
        result = self.route_lookup("2001:1111:2222::2/128", False)
        self.assertEqual(lpm_prefix, str(result.route.prefix))

        # Verify we find the exact when not requested
        result = self.route_lookup(lpm_prefix, False)
        self.assertEqual(lpm_prefix, str(result.route.prefix))

        # Can't seem to delete the default route so no negative LPM test.
1058
1059
class TestIPv6IfAddrRoute(VppTestCase):
    """ IPv6 Interface Addr Route Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6IfAddrRoute, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6IfAddrRoute, cls).tearDownClass()

    def setUp(self):
        """Create one pg interface, bring it up and configure IPv6."""
        super(TestIPv6IfAddrRoute, self).setUp()

        # create 1 pg interface
        self.create_pg_interfaces(range(1))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """Run the base tearDown, then unconfigure the pg interfaces."""
        super(TestIPv6IfAddrRoute, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()

    def test_ipv6_ifaddrs_same_prefix(self):
        """ IPv6 Interface Addresses Same Prefix test

        Test scenario:

            - Verify no /128 route in FIB for 2001:10::10 or 2001:10::20
            - Configure IPv6 address 2001:10::10/64 on an interface
            - Verify /128 route in FIB for 2001:10::10 only
            - Configure IPv6 address 2001:10::20/64 on the interface
            - Delete 2001:10::10/64 from interface
            - Verify /128 route in FIB for 2001:10::20 only
            - Delete 2001:10::20/64 from interface
            - Verify no /128 route in FIB for either address
        """

        addr1 = "2001:10::10"
        addr2 = "2001:10::20"

        if_addr1 = VppIpInterfaceAddress(self, self.pg0, addr1, 64)
        if_addr2 = VppIpInterfaceAddress(self, self.pg0, addr2, 64)
        self.assertFalse(if_addr1.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

        # configure first address, verify its host route is present
        if_addr1.add_vpp_config()
        self.assertTrue(if_addr1.query_vpp_config())
        self.assertTrue(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

        # configure second address, delete first; only the second
        # address and its host route must remain
        if_addr2.add_vpp_config()
        if_addr1.remove_vpp_config()
        self.assertFalse(if_addr1.query_vpp_config())
        self.assertTrue(if_addr2.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertTrue(find_route(self, addr2, 128))

        # delete second address, verify both host routes are gone
        if_addr2.remove_vpp_config()
        self.assertFalse(if_addr1.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

    def test_ipv6_ifaddr_del(self):
        """ Delete an interface address that does not exist """

        # the return value is not needed; the loopback is read back
        # from self.lo_interfaces
        self.create_loopback_interfaces(1)
        lo = self.lo_interfaces[0]

        lo.config_ip6()
        lo.admin_up()

        #
        # try and remove pg0's subnet from lo
        #
        with self.vapi.assert_negative_api_retval():
            self.vapi.sw_interface_add_del_address(
                sw_if_index=lo.sw_if_index,
                prefix=self.pg0.local_ip6_prefix,
                is_add=0)
1149
1150
class TestICMPv6Echo(VppTestCase):
    """ ICMPv6 Echo Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestICMPv6Echo, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestICMPv6Echo, cls).tearDownClass()

    def setUp(self):
        """Bring up a single pg interface with IPv6 and resolve ND."""
        super(TestICMPv6Echo, self).setUp()

        # a single pg interface is all this test case needs
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            # resolve first with link_layer=True, then with the defaults
            intf.resolve_ndp(link_layer=True)
            intf.resolve_ndp()

    def tearDown(self):
        """Run the base tearDown, then unconfigure the pg interfaces."""
        super(TestICMPv6Echo, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_icmpv6_echo(self):
        """ VPP replies to ICMPv6 Echo Request

        Test scenario:

            - Receive ICMPv6 Echo Request message on pg0 interface.
            - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
        """

        pg = self.pg0

        # one request per destination: the global and the link-local
        # address of pg0
        targets = (pg.local_ip6, pg.local_ip6_ll)
        echo_id = 0xb
        echo_seq = 5
        payload = b'\x0a' * 18

        requests = [
            (Ether(src=pg.remote_mac, dst=pg.local_mac) /
             IPv6(src=pg.remote_ip6, dst=target) /
             ICMPv6EchoRequest(id=echo_id, seq=echo_seq, data=payload))
            for target in targets
        ]

        pg.add_stream(requests)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        replies = pg.get_capture(len(targets))

        # replies are matched to the destinations in sending order
        for reply, target in zip(replies, targets):
            eth_hdr = reply[Ether]
            ip6_hdr = reply[IPv6]
            echo_reply = reply[ICMPv6EchoReply]
            self.assertEqual(eth_hdr.src, pg.local_mac)
            self.assertEqual(eth_hdr.dst, pg.remote_mac)
            self.assertEqual(ip6_hdr.src, target)
            self.assertEqual(ip6_hdr.dst, pg.remote_ip6)
            self.assertEqual(icmp6types[echo_reply.type], "Echo Reply")
            self.assertEqual(echo_reply.id, echo_id)
            self.assertEqual(echo_reply.seq, echo_seq)
            self.assertEqual(echo_reply.data, payload)
1217
1218
class TestIPv6RD(TestIPv6ND):
    """ IPv6 Router Discovery Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RD, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RD, cls).tearDownClass()

    def setUp(self):
        """Create two pg interfaces, bring them up and configure IPv6."""
        super(TestIPv6RD, self).setUp()

        # two pg interfaces are used by this test case
        self.create_pg_interfaces(range(2))

        self.interfaces = list(self.pg_interfaces)

        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

    def tearDown(self):
        """Unconfigure the interfaces, then run the base tearDown."""
        for intf in self.interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
        super(TestIPv6RD, self).tearDown()

    def test_rd_send_router_solicitation(self):
        """ Verify router solicitation packets """

        n_solicits = 2
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index,
                                                 mrc=n_solicits)
        captured = self.pg1.get_capture(n_solicits, timeout=3)
        self.assertEqual(len(captured), n_solicits)

        for pkt in captured:
            # must be an IPv6 packet carrying a router solicitation
            self.assertEqual(pkt.haslayer(IPv6), 1)
            ip6_hdr = pkt[IPv6]
            self.assertEqual(ip6_hdr.haslayer(ICMPv6ND_RS), 1)

            # destination ff02::2, sourced from pg1's link-local address
            self.assert_equal(ip6_normalize(ip6_hdr.dst),
                              ip6_normalize("ff02::2"))
            self.assert_equal(ip6_normalize(ip6_hdr.src),
                              ip6_normalize(self.pg1.local_ip6_ll))

            # the source link-layer option must carry pg1's MAC
            solicit = pkt[ICMPv6ND_RS]
            self.assertTrue(bool(solicit.haslayer(ICMPv6NDOptSrcLLAddr)))
            self.assert_equal(pkt[ICMPv6NDOptSrcLLAddr].lladdr,
                              self.pg1.local_mac)

    def verify_prefix_info(self, reported_prefix, prefix_option):
        """Compare a prefix reported in an RA event with the sent option.

        :param reported_prefix: prefix entry taken from the event
        :param prefix_option: the ICMPv6NDOptPrefixInfo that was sent
        """
        prefix_text = "%s/%s" % (prefix_option.getfieldval("prefix"),
                                 prefix_option.getfieldval("prefixlen"))
        # strict=False: the option's prefix may have host bits set
        expected_net = IPv6Network(text_type(prefix_text), strict=False)
        self.assert_equal(reported_prefix.prefix.network_address,
                          expected_net.network_address)

        # L goes to bit 7 and A to bit 6 of the expected flags
        l_flag = prefix_option.getfieldval("L")
        a_flag = prefix_option.getfieldval("A")
        self.assert_equal(reported_prefix.flags,
                          (l_flag << 7) | (a_flag << 6))

        self.assert_equal(reported_prefix.valid_time,
                          prefix_option.getfieldval("validlifetime"))
        self.assert_equal(reported_prefix.preferred_time,
                          prefix_option.getfieldval("preferredlifetime"))

    def test_rd_receive_router_advertisement(self):
        """ Verify events triggered by received RA packets """

        self.vapi.want_ip6_ra_events(enable=1)

        # the two prefix-info options carried by the RA, in order
        sent_options = (
            ICMPv6NDOptPrefixInfo(
                prefix="1::2",
                prefixlen=50,
                validlifetime=200,
                preferredlifetime=500,
                L=1,
                A=1,
            ),
            ICMPv6NDOptPrefixInfo(
                prefix="7::4",
                prefixlen=20,
                validlifetime=70,
                preferredlifetime=1000,
                L=1,
                A=0,
            ),
        )

        eth_hdr = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        ip6_hdr = IPv6(dst=self.pg1.local_ip6_ll,
                       src=mk_ll_addr(self.pg1.remote_mac))
        advert = (eth_hdr / ip6_hdr / ICMPv6ND_RA() /
                  sent_options[0] / sent_options[1])
        self.pg1.add_stream([advert])
        self.pg_start()

        ev = self.vapi.wait_for_event(10, "ip6_ra_event")

        self.assert_equal(ev.current_hop_limit, 0)
        self.assert_equal(ev.flags, 8)
        self.assert_equal(ev.router_lifetime_in_sec, 1800)
        self.assert_equal(ev.neighbor_reachable_time_in_msec, 0)
        self.assert_equal(
            ev.time_in_msec_between_retransmitted_neighbor_solicitations, 0)

        self.assert_equal(ev.n_prefixes, 2)

        # reported prefixes are compared in the order they were sent
        for idx, sent in enumerate(sent_options):
            self.verify_prefix_info(ev.prefixes[idx], sent)
1338
1339
class TestIPv6RDControlPlane(TestIPv6ND):
    """ IPv6 Router Discovery Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RDControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RDControlPlane, cls).tearDownClass()

    def setUp(self):
        """Create one pg interface, bring it up and configure IPv6."""
        super(TestIPv6RDControlPlane, self).setUp()

        # a single pg interface is used by this test case
        self.create_pg_interfaces(range(1))

        self.interfaces = list(self.pg_interfaces)

        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

    def tearDown(self):
        super(TestIPv6RDControlPlane, self).tearDown()

    @staticmethod
    def create_ra_packet(pg, routerlifetime=None):
        """Build an RA from pg's remote link-local to pg's local address.

        :param pg: interface supplying the MAC and IPv6 addresses
        :param routerlifetime: set on the RA only when not None
        :returns: the Ether / IPv6 / ICMPv6ND_RA packet
        """
        if routerlifetime is None:
            advert = ICMPv6ND_RA()
        else:
            advert = ICMPv6ND_RA(routerlifetime=routerlifetime)
        eth_hdr = Ether(dst=pg.local_mac, src=pg.remote_mac)
        ip6_hdr = IPv6(dst=pg.local_ip6, src=pg.remote_ip6_ll)
        return eth_hdr / ip6_hdr / advert

    @staticmethod
    def get_default_routes(fib):
        """Collect the paths of all zero-length-prefix entries in *fib*.

        Paths whose sw_if_index is 0xFFFFFFFF are left out.

        :returns: list of dicts with 'sw_if_index' and 'next_hop' keys
        """
        found = []
        for entry in fib:
            if entry.route.prefix.prefixlen != 0:
                continue
            for path in entry.route.paths:
                if path.sw_if_index == 0xFFFFFFFF:
                    continue
                found.append({
                    'sw_if_index': path.sw_if_index,
                    'next_hop': path.nh.address.ip6,
                })
        return found

    @staticmethod
    def get_interface_addresses(fib, pg):
        """List the /128 entries in *fib* whose first path is via *pg*.

        :returns: list of network addresses as strings
        """
        return [
            str(entry.route.prefix.network_address)
            for entry in fib
            if entry.route.prefix.prefixlen == 128 and
            entry.route.paths[0].sw_if_index == pg.sw_if_index
        ]

    def wait_for_no_default_route(self, n_tries=50, s_time=1):
        """Poll the FIB until it holds no default route.

        :param n_tries: number of polls before giving up
        :param s_time: time passed to self.sleep() between polls
        :returns: True once no default route is found, else False
        """
        remaining = n_tries
        while remaining:
            if not self.get_default_routes(self._dump_fib()):
                return True
            remaining -= 1
            self.sleep(s_time)

        return False

    def _dump_fib(self):
        """Return the route dump used throughout this test case."""
        return self.vapi.ip_route_dump(0, True)

    def _inject_ra(self, packet):
        """Send *packet* on pg0 and give VPP 0.1s to process it."""
        self.pg0.add_stream([packet])
        self.pg_start()

        self.sleep_on_vpp_time(0.1)

    def _assert_one_slaac_address(self, fib, initial_addresses):
        """Check *fib* holds exactly one new pg0 address, inside 1::/20."""
        current = set(self.get_interface_addresses(fib, self.pg0))
        added = current.difference(initial_addresses)
        self.assertEqual(len(added), 1)
        slaac_net = IPv6Network(
            text_type("%s/%d" % (next(iter(added)), 20)),
            strict=False)
        self.assertEqual(slaac_net, IPv6Network(text_type('1::/20')))

    def _assert_one_default_route(self, fib, router_address):
        """Check *fib* holds one default route via pg0 and the router."""
        routes = self.get_default_routes(fib)
        self.assertEqual(len(routes), 1)
        route = routes[0]
        self.assertEqual(route['sw_if_index'], self.pg0.sw_if_index)
        self.assertEqual(route['next_hop'], router_address)

    def test_all(self):
        """ Test handling of SLAAC addresses and default routes """

        fib = self._dump_fib()
        default_routes = self.get_default_routes(fib)
        initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
        self.assertEqual(default_routes, [])
        router_address = IPv6Address(text_type(self.pg0.remote_ip6_ll))

        self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)

        self.sleep(0.1)

        # send an RA with two prefix options; only the first has A=1
        autoconf_opt = ICMPv6NDOptPrefixInfo(
            prefix="1::",
            prefixlen=64,
            validlifetime=2,
            preferredlifetime=2,
            L=1,
            A=1,
        )
        no_autoconf_opt = ICMPv6NDOptPrefixInfo(
            prefix="7::",
            prefixlen=20,
            validlifetime=1500,
            preferredlifetime=1000,
            L=1,
            A=0,
        )
        self._inject_ra(self.create_ra_packet(self.pg0) /
                        autoconf_opt / no_autoconf_opt)

        fib = self._dump_fib()

        # the FIB must now hold the new address ...
        self._assert_one_slaac_address(fib, initial_addresses)

        # ... and a new default route
        self._assert_one_default_route(fib, router_address)

        # an RA with router lifetime 0 deletes the default route
        self._inject_ra(self.create_ra_packet(self.pg0, routerlifetime=0))

        fib = self._dump_fib()
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 0)

        self.sleep_on_vpp_time(0.1)

        # a plain RA brings the default route back
        self._inject_ra(self.create_ra_packet(self.pg0))

        fib = self._dump_fib()
        self._assert_one_default_route(fib, router_address)

        # an RA updating the router lifetime to 1s keeps it for now
        self._inject_ra(self.create_ra_packet(self.pg0, 1))

        fib = self._dump_fib()
        self._assert_one_default_route(fib, router_address)

        self.sleep_on_vpp_time(1)

        # check that default route is deleted
        self.assertTrue(self.wait_for_no_default_route())

        # check FIB still contains the SLAAC address
        # NOTE(review): 'fib' is the dump taken before the sleep above,
        # not a fresh one, so this does not observe the FIB after the
        # default route expired. A fresh dump here would race with the
        # 2s valid lifetime of the prefix - confirm before changing.
        self._assert_one_slaac_address(fib, initial_addresses)

        self.sleep_on_vpp_time(1)

        # check that SLAAC address is deleted
        fib = self._dump_fib()
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        new_addresses = addresses.difference(initial_addresses)
        self.assertEqual(len(new_addresses), 0)
1530
1531
1532 class IPv6NDProxyTest(TestIPv6ND):
1533     """ IPv6 ND ProxyTest Case """
1534
    @classmethod
    def setUpClass(cls):
        # Pass-through: only the parent class setup runs here.
        super(IPv6NDProxyTest, cls).setUpClass()
1538
    @classmethod
    def tearDownClass(cls):
        # Pass-through: only the parent class teardown runs here.
        super(IPv6NDProxyTest, cls).tearDownClass()
1542
1543     def setUp(self):
1544         super(IPv6NDProxyTest, self).setUp()
1545
1546         # create 3 pg interfaces
1547         self.create_pg_interfaces(range(3))
1548
1549         # pg0 is the master interface, with the configured subnet
1550         self.pg0.admin_up()
1551         self.pg0.config_ip6()
1552         self.pg0.resolve_ndp()
1553
1554         self.pg1.ip6_enable()
1555         self.pg2.ip6_enable()
1556
    def tearDown(self):
        # Pass-through: only the parent class tearDown runs here.
        super(IPv6NDProxyTest, self).tearDown()
1559
1560     def test_nd_proxy(self):
1561         """ IPv6 Proxy ND """
1562
1563         #
1564         # Generate some hosts in the subnet that we are proxying
1565         #
1566         self.pg0.generate_remote_hosts(8)
1567
1568         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
1569         d = inet_ntop(AF_INET6, nsma)
1570
1571         #
1572         # Send an NS for one of those remote hosts on one of the proxy links
1573         # expect no response since it's from an address that is not
1574         # on the link that has the prefix configured
1575         #
1576         ns_pg1 = (Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac) /
1577                   IPv6(dst=d,
1578                        src=self.pg0._remote_hosts[2].ip6) /
1579                   ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
1580                   ICMPv6NDOptSrcLLAddr(
1581                       lladdr=self.pg0._remote_hosts[2].mac))
1582
1583         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")
1584
1585         #
1586         # Add proxy support for the host
1587         #
1588         self.vapi.ip6nd_proxy_add_del(
1589             is_add=1, ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1590             sw_if_index=self.pg1.sw_if_index)
1591
1592         #
1593         # try that NS again. this time we expect an NA back
1594         #
1595         self.send_and_expect_na(self.pg1, ns_pg1,
1596                                 "NS to proxy entry",
1597                                 dst_ip=self.pg0._remote_hosts[2].ip6,
1598                                 tgt_ip=self.pg0.local_ip6)
1599
1600         #
1601         # ... and that we have an entry in the ND cache
1602         #
1603         self.assertTrue(find_nbr(self,
1604                                  self.pg1.sw_if_index,
1605                                  self.pg0._remote_hosts[2].ip6))
1606
1607         #
1608         # ... and we can route traffic to it
1609         #
1610         t = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1611              IPv6(dst=self.pg0._remote_hosts[2].ip6,
1612                   src=self.pg0.remote_ip6) /
1613              inet6.UDP(sport=10000, dport=20000) /
1614              Raw(b'\xa5' * 100))
1615
1616         self.pg0.add_stream(t)
1617         self.pg_enable_capture(self.pg_interfaces)
1618         self.pg_start()
1619         rx = self.pg1.get_capture(1)
1620         rx = rx[0]
1621
1622         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1623         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1624
1625         self.assertEqual(rx[IPv6].src,
1626                          t[IPv6].src)
1627         self.assertEqual(rx[IPv6].dst,
1628                          t[IPv6].dst)
1629
1630         #
1631         # Test we proxy for the host on the main interface
1632         #
1633         ns_pg0 = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
1634                   IPv6(dst=d, src=self.pg0.remote_ip6) /
1635                   ICMPv6ND_NS(
1636                       tgt=self.pg0._remote_hosts[2].ip6) /
1637                   ICMPv6NDOptSrcLLAddr(
1638                       lladdr=self.pg0.remote_mac))
1639
1640         self.send_and_expect_na(self.pg0, ns_pg0,
1641                                 "NS to proxy entry on main",
1642                                 tgt_ip=self.pg0._remote_hosts[2].ip6,
1643                                 dst_ip=self.pg0.remote_ip6)
1644
1645         #
1646         # Setup and resolve proxy for another host on another interface
1647         #
1648         ns_pg2 = (Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac) /
1649                   IPv6(dst=d,
1650                        src=self.pg0._remote_hosts[3].ip6) /
1651                   ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
1652                   ICMPv6NDOptSrcLLAddr(
1653                       lladdr=self.pg0._remote_hosts[2].mac))
1654
1655         self.vapi.ip6nd_proxy_add_del(
1656             is_add=1, ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1657             sw_if_index=self.pg2.sw_if_index)
1658
1659         self.send_and_expect_na(self.pg2, ns_pg2,
1660                                 "NS to proxy entry other interface",
1661                                 dst_ip=self.pg0._remote_hosts[3].ip6,
1662                                 tgt_ip=self.pg0.local_ip6)
1663
1664         self.assertTrue(find_nbr(self,
1665                                  self.pg2.sw_if_index,
1666                                  self.pg0._remote_hosts[3].ip6))
1667
1668         #
1669         # hosts can communicate. pg2->pg1
1670         #
1671         t2 = (Ether(dst=self.pg2.local_mac,
1672                     src=self.pg0.remote_hosts[3].mac) /
1673               IPv6(dst=self.pg0._remote_hosts[2].ip6,
1674                    src=self.pg0._remote_hosts[3].ip6) /
1675               inet6.UDP(sport=10000, dport=20000) /
1676               Raw(b'\xa5' * 100))
1677
1678         self.pg2.add_stream(t2)
1679         self.pg_enable_capture(self.pg_interfaces)
1680         self.pg_start()
1681         rx = self.pg1.get_capture(1)
1682         rx = rx[0]
1683
1684         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1685         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1686
1687         self.assertEqual(rx[IPv6].src,
1688                          t2[IPv6].src)
1689         self.assertEqual(rx[IPv6].dst,
1690                          t2[IPv6].dst)
1691
1692         #
1693         # remove the proxy configs
1694         #
1695         self.vapi.ip6nd_proxy_add_del(
1696             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1697             sw_if_index=self.pg1.sw_if_index, is_add=0)
1698         self.vapi.ip6nd_proxy_add_del(
1699             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1700             sw_if_index=self.pg2.sw_if_index, is_add=0)
1701
1702         self.assertFalse(find_nbr(self,
1703                                   self.pg2.sw_if_index,
1704                                   self.pg0._remote_hosts[3].ip6))
1705         self.assertFalse(find_nbr(self,
1706                                   self.pg1.sw_if_index,
1707                                   self.pg0._remote_hosts[2].ip6))
1708
1709         #
1710         # no longer proxy-ing...
1711         #
1712         self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
1713         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
1714         self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")
1715
1716         #
1717         # no longer forwarding. traffic generates NS out of the glean/main
1718         # interface
1719         #
1720         self.pg2.add_stream(t2)
1721         self.pg_enable_capture(self.pg_interfaces)
1722         self.pg_start()
1723
1724         rx = self.pg0.get_capture(1)
1725
1726         self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1727
1728
class TestIPNull(VppTestCase):
    """ IPv6 routes via NULL """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        # a single pg interface: the test sends on pg0 and captures the
        # ICMP error on pg0 as well
        self.create_pg_interfaces(range(1))

        for itf in self.pg_interfaces:
            itf.admin_up()
            itf.config_ip6()
            itf.resolve_ndp()

    def tearDown(self):
        super().tearDown()
        for itf in self.pg_interfaces:
            itf.unconfig_ip6()
            itf.admin_down()

    def _send_and_get_unreach(self, pkt):
        """ Send pkt on pg0 and return the ICMPv6 Destination Unreachable
        layer of the single packet captured back on pg0 """
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg0.get_capture(1)
        return captured[0][ICMPv6DestUnreach]

    def test_ip_null(self):
        """ IP NULL route """

        # the probe: UDP from the pg0 peer towards 2001::1, which is only
        # reachable through the special routes installed below
        probe = (Ether(src=self.pg0.remote_mac,
                       dst=self.pg0.local_mac) /
                 IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
                 inet6.UDP(sport=1234, dport=1234) /
                 Raw(b'\xa5' * 100))

        #
        # Step 1: a /64 whose only path answers with ICMP unreachable
        #
        unreach_path = VppRoutePath(
            "::", 0xffffffff,
            type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH)
        unreach_route = VppIpRoute(self, "2001::", 64, [unreach_path])
        unreach_route.add_vpp_config()

        reply = self._send_and_get_unreach(probe)

        # code 0 means "No route to destination"
        self.assertEqual(reply.code, 0)

        # ICMP errors are rate limited, so wait before provoking the next
        self.sleep(1)

        #
        # Step 2: a more specific /128 whose only path answers with
        # ICMP prohibited
        #
        prohibit_path = VppRoutePath(
            "::", 0xffffffff,
            type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT)
        prohibit_route = VppIpRoute(self, "2001::1", 128, [prohibit_path])
        prohibit_route.add_vpp_config()

        reply = self._send_and_get_unreach(probe)

        # code 1 means "Communication with destination administratively
        # prohibited"
        self.assertEqual(reply.code, 1)
1808
1809
class TestIPDisabled(VppTestCase):
    """ IPv6 disabled """

    @classmethod
    def setUpClass(cls):
        super(TestIPDisabled, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPDisabled, cls).tearDownClass()

    def setUp(self):
        super(TestIPDisabled, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        # PG0 is IPv6 enabled
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # PG1 is admin-up but gets no IPv6 config here; the test body
        # enables and disables IPv6 on it
        self.pg1.admin_up()

    def tearDown(self):
        super(TestIPDisabled, self).tearDown()
        for i in self.pg_interfaces:
            # This test only ever applies IPv6 config (pg0 in setUp, pg1 in
            # the test body), so it is the IPv6 config that must be removed.
            # The previous unconfig_ip4() left the IPv6 addresses in place.
            i.unconfig_ip6()
            i.admin_down()

    def _send_and_expect_one_on_pg0(self, pkt):
        """ Send pkt on pg1 and require exactly one packet captured on pg0 """
        self.pg1.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

    def test_ip_disabled(self):
        """ IP Disabled """

        MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
        MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
        #
        # A (*,G) style multicast route for ffef::1:
        # one accepting interface, pg1, and one forwarding interface, pg0
        #
        route_ff_01 = VppIpMRoute(
            self,
            "::",
            "ffef::1", 128,
            MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
            [VppMRoutePath(self.pg1.sw_if_index,
                           MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT),
             VppMRoutePath(self.pg0.sw_if_index,
                           MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD)])
        route_ff_01.add_vpp_config()

        # pu: unicast towards the pg0 peer; pm: multicast to ffef::1.
        # Both are injected on pg1.
        pu = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
              inet6.UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))
        pm = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst="ffef::1") /
              inet6.UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")

        #
        # IP enable PG1
        #
        self.pg1.config_ip6()

        #
        # Now we get packets through
        #
        self._send_and_expect_one_on_pg0(pu)
        self._send_and_expect_one_on_pg0(pm)

        #
        # Disable PG1
        #
        self.pg1.unconfig_ip6()

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1906
1907
class TestIP6LoadBalance(VppTestCase):
    """ IPv6 Load-Balancing """

    @classmethod
    def setUpClass(cls):
        super(TestIP6LoadBalance, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6LoadBalance, cls).tearDownClass()

    def setUp(self):
        super(TestIP6LoadBalance, self).setUp()

        self.create_pg_interfaces(range(5))

        # MPLS table 0 is added before MPLS is enabled on the interfaces
        mpls_tbl = VppMplsTable(self, 0)
        mpls_tbl.add_vpp_config()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()
            i.enable_mpls()

    def tearDown(self):
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()
            i.disable_mpls()
        super(TestIP6LoadBalance, self).tearDown()

    def pg_send(self, input, pkts):
        """ Clear the trace, then inject pkts on the interface `input` with
        capture enabled on every pg interface """
        self.vapi.cli("clear trace")
        input.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_and_expect_load_balancing(self, input, pkts, outputs):
        """ Send pkts on `input` and require that every interface in
        `outputs` captured at least one packet.

        Returns the list of captures, one per entry of `outputs` and in
        the same order.
        """
        self.pg_send(input, pkts)
        rxs = []
        for oo in outputs:
            # the split between the outputs is not known in advance, so
            # only "something arrived" is asserted
            rx = oo._get_capture(1)
            self.assertNotEqual(0, len(rx))
            rxs.append(rx)
        return rxs

    def send_and_expect_one_itf(self, input, pkts, itf):
        """ Send pkts on `input` and require that all of them are captured
        on the single interface `itf`. Returns that capture. """
        self.pg_send(input, pkts)
        return itf.get_capture(len(pkts))

    def test_ip6_load_balance(self):
        """ IPv6 Load-Balancing """

        def pg0_ether():
            # L2 header shared by every packet injected on pg0
            return Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)

        #
        # An array of packets that differ only in the destination port
        #  - IP only
        #  - MPLS EOS
        #  - MPLS non-EOS
        #  - MPLS non-EOS with an entropy label
        #
        port_ip_pkts = []
        port_mpls_pkts = []
        port_mpls_neos_pkts = []
        port_ent_pkts = []

        #
        # An array of packets that differ only in the source address
        #
        src_ip_pkts = []
        src_mpls_pkts = []

        for ii in range(NUM_PKTS):
            port_ip_hdr = (
                IPv6(dst="3000::1", src="3000:1::1") /
                inet6.UDP(sport=1234, dport=1234 + ii) /
                Raw(b'\xa5' * 100))
            port_ip_pkts.append(pg0_ether() / port_ip_hdr)
            port_mpls_pkts.append(pg0_ether() /
                                  MPLS(label=66, ttl=2) /
                                  port_ip_hdr)
            port_mpls_neos_pkts.append(pg0_ether() /
                                       MPLS(label=67, ttl=2) /
                                       MPLS(label=77, ttl=2) /
                                       port_ip_hdr)
            port_ent_pkts.append(pg0_ether() /
                                 MPLS(label=67, ttl=2) /
                                 MPLS(label=14, ttl=2) /
                                 MPLS(label=999, ttl=2) /
                                 port_ip_hdr)
            src_ip_hdr = (
                IPv6(dst="3000::1", src="3000:1::%d" % ii) /
                inet6.UDP(sport=1234, dport=1234) /
                Raw(b'\xa5' * 100))
            src_ip_pkts.append(pg0_ether() / src_ip_hdr)
            src_mpls_pkts.append(pg0_ether() /
                                 MPLS(label=66, ttl=2) /
                                 src_ip_hdr)

        #
        # A route for the IP packets
        #
        route_3000_1 = VppIpRoute(self, "3000::1", 128,
                                  [VppRoutePath(self.pg1.remote_ip6,
                                                self.pg1.sw_if_index),
                                   VppRoutePath(self.pg2.remote_ip6,
                                                self.pg2.sw_if_index)])
        route_3000_1.add_vpp_config()

        #
        # a local-label for the EOS packets
        #
        binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
        binding.add_vpp_config()

        #
        # An MPLS route for the non-EOS packets
        #
        route_67 = VppMplsRoute(self, 67, 0,
                                [VppRoutePath(self.pg1.remote_ip6,
                                              self.pg1.sw_if_index,
                                              labels=[67]),
                                 VppRoutePath(self.pg2.remote_ip6,
                                              self.pg2.sw_if_index,
                                              labels=[67])])
        route_67.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across the 2 paths
        #  - since the default hash config is to use IP src,dst and port
        #    src,dst
        # We are not going to ensure equal amounts of packets across each link,
        # since the hash algorithm is statistical and therefore this can never
        # be guaranteed. But with NUM_PKTS different packets we do expect some
        # balancing. So instead just ensure there is traffic on each link.
        #
        rx = self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
                                                 [self.pg1, self.pg2])
        n_ip_pg0 = len(rx[0])
        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, port_mpls_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
                                            [self.pg1, self.pg2])
        rx = self.send_and_expect_load_balancing(self.pg0, port_mpls_neos_pkts,
                                                 [self.pg1, self.pg2])
        n_mpls_pg0 = len(rx[0])

        #
        # change the router ID and expect the distribution changes
        #
        self.vapi.set_ip_flow_hash_router_id(router_id=0x11111111)

        rx = self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
                                                 [self.pg1, self.pg2])
        self.assertNotEqual(n_ip_pg0, len(rx[0]))

        # NOTE(review): n_mpls_pg0 was measured with port_mpls_neos_pkts but
        # is compared against the src_mpls_pkts distribution here - confirm
        # whether the same stream was intended for both measurements.
        rx = self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
                                                 [self.pg1, self.pg2])
        self.assertNotEqual(n_mpls_pg0, len(rx[0]))

        #
        # The packets with Entropy label in should not load-balance,
        # since the Entropy value is fixed.
        #
        self.send_and_expect_one_itf(self.pg0, port_ent_pkts, self.pg1)

        #
        # change the flow hash config so it's only IP src,dst
        #  - now only the stream with differing source address will
        #    load-balance
        #
        self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, proto=1,
                                   sport=0, dport=0, is_ipv6=1)

        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_one_itf(self.pg0, port_ip_pkts, self.pg2)

        #
        # change the flow hash config back to defaults
        #
        self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=1, dport=1,
                                   proto=1, is_ipv6=1)

        #
        # Recursive prefixes
        #  - testing that 2 stages of load-balancing occurs and there is no
        #    polarisation (i.e. only 2 of 4 paths are used)
        #
        port_pkts = []
        src_pkts = []

        for ii in range(257):
            port_pkts.append(pg0_ether() /
                             IPv6(dst="4000::1",
                                  src="4000:1::1") /
                             inet6.UDP(sport=1234,
                                       dport=1234 + ii) /
                             Raw(b'\xa5' * 100))
            src_pkts.append(pg0_ether() /
                            IPv6(dst="4000::1",
                                 src="4000:1::%d" % ii) /
                            inet6.UDP(sport=1234, dport=1234) /
                            Raw(b'\xa5' * 100))

        route_3000_2 = VppIpRoute(self, "3000::2", 128,
                                  [VppRoutePath(self.pg3.remote_ip6,
                                                self.pg3.sw_if_index),
                                   VppRoutePath(self.pg4.remote_ip6,
                                                self.pg4.sw_if_index)])
        route_3000_2.add_vpp_config()

        route_4000_1 = VppIpRoute(self, "4000::1", 128,
                                  [VppRoutePath("3000::1",
                                                0xffffffff),
                                   VppRoutePath("3000::2",
                                                0xffffffff)])
        route_4000_1.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across all 4 paths
        #
        self.vapi.cli("clear trace")
        self.send_and_expect_load_balancing(self.pg0, port_pkts,
                                            [self.pg1, self.pg2,
                                             self.pg3, self.pg4])
        self.send_and_expect_load_balancing(self.pg0, src_pkts,
                                            [self.pg1, self.pg2,
                                             self.pg3, self.pg4])

        #
        # Recursive prefixes
        #  - testing 2 stages of load-balancing where neither stage has a
        #    choice: 6000::1 resolves via 5000::2 only, which resolves via
        #    pg3 only
        #
        port_pkts = []

        for ii in range(257):
            port_pkts.append(pg0_ether() /
                             IPv6(dst="6000::1",
                                  src="6000:1::1") /
                             inet6.UDP(sport=1234,
                                       dport=1234 + ii) /
                             Raw(b'\xa5' * 100))

        route_5000_2 = VppIpRoute(self, "5000::2", 128,
                                  [VppRoutePath(self.pg3.remote_ip6,
                                                self.pg3.sw_if_index)])
        route_5000_2.add_vpp_config()

        route_6000_1 = VppIpRoute(self, "6000::1", 128,
                                  [VppRoutePath("5000::2",
                                                0xffffffff)])
        route_6000_1.add_vpp_config()

        #
        # inject the packets on pg0 - with a single path at each level,
        # every packet must egress pg3
        #
        self.vapi.cli("clear trace")
        self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg3)
2182
2183
class IP6PuntSetup(object):
    """ Setup for IPv6 Punt Police/Redirect """

    def punt_setup(self):
        """ Create four IPv6-enabled pg interfaces and build the TCP packet
        (self.pkt) that the punt tests send from the pg0 peer to pg0's own
        address """
        self.create_pg_interfaces(range(4))

        for itf in self.pg_interfaces:
            itf.admin_up()
            itf.config_ip6()
            itf.resolve_ndp()

        l2 = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        l3 = IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
        l4 = inet6.TCP(sport=1234, dport=1234)
        self.pkt = l2 / l3 / l4 / Raw(b'\xa5' * 100)

    def punt_teardown(self):
        """ Remove the IPv6 config from every pg interface and bring it
        admin-down """
        for itf in self.pg_interfaces:
            itf.unconfig_ip6()
            itf.admin_down()
2206
2207
class TestIP6Punt(IP6PuntSetup, VppTestCase):
    """ IPv6 Punt Police/Redirect """

    def setUp(self):
        super().setUp()
        super().punt_setup()

    def tearDown(self):
        super().punt_teardown()
        super().tearDown()

    def test_ip_punt(self):
        """ IP6 punt police and redirect """

        burst = self.pkt * 1025
        next_hop = self.pg1.remote_ip6

        #
        # Phase 1: redirect packets punted from pg0 out of pg1;
        # everything sent must arrive
        #
        redirect = VppIpPuntRedirect(self, self.pg0.sw_if_index,
                                     self.pg1.sw_if_index, next_hop)
        redirect.add_vpp_config()

        self.send_and_expect(self.pg0, burst, self.pg1)

        #
        # Phase 2: put a policer in front of the punt path
        #
        policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
        policer.add_vpp_config()
        punt_policer = VppIpPuntPolicer(self, policer.policer_index,
                                        is_ip6=True)
        punt_policer.add_vpp_config()

        self.vapi.cli("clear trace")
        self.pg0.add_stream(burst)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        #
        # some, but not all, of the burst should get through since the
        # policer drops part of it
        #
        received = self.pg1._get_capture(1)
        counters = policer.get_stats()

        # Single rate policer - expect conform, violate but no exceed
        self.assertGreater(counters['conform_packets'], 0)
        self.assertEqual(counters['exceed_packets'], 0)
        self.assertGreater(counters['violate_packets'], 0)

        n_received = len(received)
        self.assertGreater(n_received, 0)
        self.assertLess(n_received, len(burst))

        #
        # Phase 3: without the policer the whole burst arrives again
        #
        punt_policer.remove_vpp_config()
        policer.remove_vpp_config()
        self.send_and_expect(self.pg0, burst, self.pg1)

        #
        # Phase 4: without the redirect nothing arrives
        #
        redirect.remove_vpp_config()
        self.send_and_assert_no_replies(self.pg0, burst,
                                        "IP no punt config")

        #
        # Phase 5: a redirect that matches any input interface
        # (rx sw_if_index 0xffffffff)
        #
        redirect = VppIpPuntRedirect(self, 0xffffffff,
                                     self.pg1.sw_if_index, next_hop)
        redirect.add_vpp_config()
        self.send_and_expect(self.pg0, burst, self.pg1)
        redirect.remove_vpp_config()

    def test_ip_punt_dump(self):
        """ IP6 punt redirect dump"""

        #
        # Three redirects, from pg0, pg1 and pg2, all transmitting on pg3.
        # The first two use pg3's peer as next-hop, the last the zero
        # address.
        #
        tx_index = self.pg3.sw_if_index
        peer = self.pg3.remote_ip6
        redirects = [
            VppIpPuntRedirect(self, rx_itf.sw_if_index, tx_index, nh)
            for rx_itf, nh in ((self.pg0, peer),
                               (self.pg1, peer),
                               (self.pg2, '0::0'))]
        for redirect in redirects:
            redirect.add_vpp_config()

        #
        # every redirect must be found individually
        #
        for redirect in redirects:
            self.assertTrue(redirect.query_vpp_config())

        #
        # Dump punt redirects for all interfaces
        #
        dumped = self.vapi.ip_punt_redirect_dump(0xffffffff, is_ipv6=1)
        self.assertEqual(len(dumped), 3)
        for entry in dumped:
            self.assertEqual(entry.punt.tx_sw_if_index, tx_index)
        # NOTE(review): nh is presumably an address object (it is wrapped in
        # str() just below) while remote_ip6 is a string, so this inequality
        # may hold regardless of the address - confirm what was intended.
        self.assertNotEqual(dumped[1].punt.nh, self.pg3.remote_ip6)
        self.assertEqual(str(dumped[2].punt.nh), '::')
2319
2320
class TestIP6PuntHandoff(IP6PuntSetup, VppTestCase):
    """ IPv6 Punt Police/Redirect """
    # run VPP with two worker threads so policer handoff can be observed
    worker_config = "workers 2"

    def setUp(self):
        super().setUp()
        super().punt_setup()

    def tearDown(self):
        super().punt_teardown()
        super().tearDown()

    def _send_via_each_worker(self, pkts, log_trace_for=(0, 1)):
        """ Send pkts pg0 -> pg1 through worker 0 and then worker 1, logging
        the packet trace after each worker listed in log_trace_for """
        for worker in (0, 1):
            self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
            if worker in log_trace_for:
                self.logger.debug(self.vapi.cli("show trace max 100"))

    def _assert_policed(self, counters):
        """ Single rate policer that saw traffic: conform and violate
        counts are non-zero, exceed stays zero """
        self.assertGreater(counters['conform_packets'], 0)
        self.assertEqual(counters['exceed_packets'], 0)
        self.assertGreater(counters['violate_packets'], 0)

    def _assert_untouched(self, counters):
        """ Policer counters of a thread that policed nothing """
        self.assertEqual(counters['conform_packets'], 0)
        self.assertEqual(counters['exceed_packets'], 0)
        self.assertEqual(counters['violate_packets'], 0)

    def test_ip_punt(self):
        """ IP6 punt policer thread handoff """
        burst = self.pkt * NUM_PKTS

        #
        # Redirect packets punted from pg0 out of pg1.
        #
        next_hop = self.pg1.remote_ip6
        redirect = VppIpPuntRedirect(self, self.pg0.sw_if_index,
                                     self.pg1.sw_if_index, next_hop)
        redirect.add_vpp_config()

        #
        # Every policer action is "transmit", so no packet is dropped;
        # the test only checks which thread does the policing.
        #
        transmit = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
            0)
        policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, 1,
                             0, 0, False, transmit, transmit, transmit)
        policer.add_vpp_config()
        punt_policer = VppIpPuntPolicer(self, policer.policer_index,
                                        is_ip6=True)
        punt_policer.add_vpp_config()

        #
        # Round 1: policer not explicitly bound
        #
        self._send_via_each_worker(burst, log_trace_for=(0,))

        # Combined stats, all threads
        combined = policer.get_stats()
        self._assert_policed(combined)

        # Worker 0, should have done all the policing
        on_w0 = policer.get_stats(worker=0)
        self.assertEqual(combined, on_w0)

        # Worker 1, should have handed everything off
        on_w1 = policer.get_stats(worker=1)
        self._assert_untouched(on_w1)

        #
        # Round 2: bind the policer to worker 1 and repeat
        #
        policer.bind_vpp_config(1, True)
        self._send_via_each_worker(burst)

        # Both workers have now policed traffic, and their counters add up
        # to the combined ones
        combined = policer.get_stats()
        on_w0 = policer.get_stats(worker=0)
        on_w1 = policer.get_stats(worker=1)

        self._assert_policed(on_w0)
        self._assert_policed(on_w1)

        for key in ('conform_packets', 'violate_packets'):
            self.assertEqual(on_w0[key] + on_w1[key], combined[key])

        #
        # Round 3: unbind the policer and repeat
        #
        policer.bind_vpp_config(1, False)
        self._send_via_each_worker(burst)

        # The policer should auto-bind to worker 0 when packets arrive.
        # The combined counters are fetched here but not asserted on.
        combined = policer.get_stats()
        on_w0_after = policer.get_stats(worker=0)
        on_w1_after = policer.get_stats(worker=1)

        self.assertGreater(on_w0_after['conform_packets'],
                           on_w0['conform_packets'])
        self.assertEqual(on_w0_after['exceed_packets'], 0)
        self.assertGreater(on_w0_after['violate_packets'],
                           on_w0['violate_packets'])

        # worker 1's counters must not have moved
        self.assertEqual(on_w1, on_w1_after)

        #
        # Clean up
        #
        punt_policer.remove_vpp_config()
        policer.remove_vpp_config()
        redirect.remove_vpp_config()
2432
2433
class TestIPDeag(VppTestCase):
    """ IPv6 Deaggregate Routes """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        # three packet-generator interfaces, each up with IPv6 configured
        # and its neighbour resolved
        self.create_pg_interfaces(range(3))

        for pg in self.pg_interfaces:
            pg.admin_up()
            pg.config_ip6()
            pg.resolve_ndp()

    def tearDown(self):
        super().tearDown()
        for pg in self.pg_interfaces:
            pg.unconfig_ip6()
            pg.admin_down()

    def test_ip_deag(self):
        """ IP Deag Routes """

        # interface index used on paths that name no interface
        no_intf = 0xffffffff
        # number of copies of each packet to send
        burst = 257

        def tcp_on_pg0(src, dst):
            # a TCP packet, as received on pg0, between the given addresses
            return (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IPv6(src=src, dst=dst) /
                    inet6.TCP(sport=1234, dport=1234) /
                    Raw(b'\xa5' * 100))

        #
        # Two extra tables:
        #  table 1 - used for a second destination address lookup
        #  table 2 - used for a source address lookup
        #
        tbl_for_dst = VppIpTable(self, 1, is_ip6=1)
        tbl_for_src = VppIpTable(self, 2, is_ip6=1)
        for tbl in (tbl_for_dst, tbl_for_src):
            tbl.add_vpp_config()

        #
        # Routes in the default table that deag, i.e. trigger a second
        # lookup, in each of those tables
        #
        deag_by_dst = VppIpRoute(
            self, "1::1", 128,
            [VppRoutePath("::", no_intf, nh_table_id=1)])
        deag_by_src = VppIpRoute(
            self, "1::2", 128,
            [VppRoutePath("::", no_intf,
                          nh_table_id=2,
                          type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP)])

        deag_by_dst.add_vpp_config()
        deag_by_src.add_vpp_config()

        #
        # packets to these destinations are dropped, since they'll
        # hit the respective default routes in the second table
        #
        to_dst_tbl = tcp_on_pg0("5::5", "1::1") * burst
        to_src_tbl = tcp_on_pg0("2::2", "1::2") * burst

        self.send_and_assert_no_replies(self.pg0, to_dst_tbl,
                                        "IP in dst table")
        self.send_and_assert_no_replies(self.pg0, to_src_tbl,
                                        "IP in src table")

        #
        # with a route for the destination in table 1 the packets are
        # forwarded via pg1
        #
        fwd_in_dst_tbl = VppIpRoute(
            self, "1::1", 128,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
            table_id=1)
        fwd_in_dst_tbl.add_vpp_config()

        self.send_and_expect(self.pg0, to_dst_tbl, self.pg1)

        #
        # with a route for the source in table 2 the packets are
        # forwarded via pg2
        #
        fwd_in_src_tbl = VppIpRoute(
            self, "2::2", 128,
            [VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index)],
            table_id=2)
        fwd_in_src_tbl.add_vpp_config()
        self.send_and_expect(self.pg0, to_src_tbl, self.pg2)

        #
        # loop in the lookup DP
        #
        looping = VppIpRoute(self, "3::3", 128,
                             [VppRoutePath("::", no_intf)])
        looping.add_vpp_config()

        self.send_and_assert_no_replies(
            self.pg0, tcp_on_pg0("3::4", "3::3") * burst,
            "IP lookup loop")
2551
2552
class TestIP6Input(VppTestCase):
    """ IPv6 Input Exception Test Cases """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        # two packet-generator interfaces, each up with IPv6 configured
        # and its neighbour resolved
        self.create_pg_interfaces(range(2))

        for pg in self.pg_interfaces:
            pg.admin_up()
            pg.config_ip6()
            pg.resolve_ndp()

    def tearDown(self):
        super().tearDown()
        for pg in self.pg_interfaces:
            pg.unconfig_ip6()
            pg.admin_down()

    def test_ip_input_icmp_reply(self):
        """ IP6 Input Exception - Return ICMP (3,0) """
        #
        # A packet from pg0 towards pg1 carrying a hop limit of 1;
        # an ICMP reply is expected back on pg0
        #
        ip_hdr = IPv6(src=self.pg0.remote_ip6,
                      dst=self.pg1.remote_ip6,
                      hlim=1)
        expiring = (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    ip_hdr /
                    inet6.UDP(sport=1234, dport=1234) /
                    Raw(b'\xa5' * 100))

        replies = self.send_and_expect(self.pg0, expiring * NUM_PKTS,
                                       self.pg0)
        time_exceeded = replies[0][ICMPv6TimeExceeded]

        # 0: "hop limit exceeded in transit",
        self.assertEqual((time_exceeded.type, time_exceeded.code), (3, 0))

    # values shared by the parameter table below
    icmpv6_data = '\x0a' * 18
    all_0s = "::"
    all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"

    @parameterized.expand([
        # Name, src, dst, l4proto, msg, timeout
        # (a src/dst of None is replaced by an interface address in the test)
        ("src='iface',   dst='iface'",
         None, None,
         inet6.UDP(sport=1234, dport=1234),
         "funky version", None),
        ("src='All 0's', dst='iface'",
         all_0s, None,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='iface',   dst='All 0's'",
         None, all_0s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='All 1's', dst='iface'",
         all_1s, None,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='iface',   dst='All 1's'",
         None, all_1s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='All 1's', dst='All 1's'",
         all_1s, all_1s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
    ])
    def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):

        # the description is set per parameter set
        self._testMethodDoc = 'IPv6 Input Exception - %s' % name

        # every packet is sent with IP version 3; no replies are expected
        bad_ip_hdr = IPv6(src=src or self.pg0.remote_ip6,
                          dst=dst or self.pg1.remote_ip6,
                          version=3)
        bad_pkt = (Ether(src=self.pg0.remote_mac,
                         dst=self.pg0.local_mac) /
                   bad_ip_hdr /
                   l4 /
                   Raw(b'\xa5' * 100))

        self.send_and_assert_no_replies(self.pg0, bad_pkt * NUM_PKTS,
                                        remark=msg or "",
                                        timeout=timeout)

    def test_hop_by_hop(self):
        """ Hop-by-hop header test """

        # addressed to pg0's own address, with an empty hop-by-hop
        # extension header ahead of the UDP header
        hbh_pkt = (Ether(src=self.pg0.remote_mac,
                         dst=self.pg0.local_mac) /
                   IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
                   IPv6ExtHdrHopByHop() /
                   inet6.UDP(sport=1234, dport=1234) /
                   Raw(b'\xa5' * 100))

        # the packet is only injected; nothing is asserted on the result
        self.pg0.add_stream(hbh_pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
2649
2650
class TestIPReplace(VppTestCase):
    """ IPv6 Table Replace """

    @classmethod
    def setUpClass(cls):
        super(TestIPReplace, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPReplace, cls).tearDownClass()

    def setUp(self):
        super(TestIPReplace, self).setUp()

        self.create_pg_interfaces(range(4))

        # one IPv6 table per interface, with table IDs starting at 1
        self.tables = []

        for table_id, i in enumerate(self.pg_interfaces, 1):
            i.admin_up()
            i.config_ip6()
            i.generate_remote_hosts(2)
            self.tables.append(
                VppIpTable(self, table_id, True).add_vpp_config())

    def tearDown(self):
        super(TestIPReplace, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()
            i.unconfig_ip6()

    def test_replace(self):
        """ IP Table Replace """

        MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
        MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
        N_ROUTES = 20
        # the sizes of the unicast and multicast dumps of an 'empty' table
        N_EMPTY_UNI = 2
        N_EMPTY_MULTI = 5
        links = [self.pg0, self.pg1, self.pg2, self.pg3]
        # routes[n] holds the routes added to self.tables[n]
        routes = [[] for _ in links]

        def assert_all_routes_present():
            # every route ever added must show up in its table's dumps
            for t, t_routes in zip(self.tables, routes):
                dump = t.dump()
                mdump = t.mdump()
                for r in t_routes:
                    self.assertTrue(find_route_in_dump(dump, r['uni'], t))
                    self.assertTrue(find_mroute_in_dump(mdump, r['multi'], t))

        # the sizes of 'empty' tables
        for t in self.tables:
            self.assertEqual(len(t.dump()), N_EMPTY_UNI)
            self.assertEqual(len(t.mdump()), N_EMPTY_MULTI)

        #
        # load up the tables with some routes.
        # jj runs from 1, so N_ROUTES - 1 unicast/multicast pairs are
        # added to each table
        #
        for t, link, t_routes in zip(self.tables, links, routes):
            for jj in range(1, N_ROUTES):
                uni = VppIpRoute(
                    self, "2001::%d" % jj, 128,
                    [VppRoutePath(link.remote_hosts[0].ip6,
                                  link.sw_if_index),
                     VppRoutePath(link.remote_hosts[1].ip6,
                                  link.sw_if_index)],
                    table_id=t.table_id).add_vpp_config()
                multi = VppIpMRoute(
                    self, "::",
                    "ff:2001::%d" % jj, 128,
                    MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
                    [VppMRoutePath(self.pg0.sw_if_index,
                                   MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT,
                                   proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
                     VppMRoutePath(self.pg1.sw_if_index,
                                   MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
                                   proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
                     VppMRoutePath(self.pg2.sw_if_index,
                                   MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
                                   proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
                     VppMRoutePath(self.pg3.sw_if_index,
                                   MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
                                   proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)],
                    table_id=t.table_id).add_vpp_config()
                t_routes.append({'uni': uni,
                                 'multi': multi})

        #
        # replace the tables a few times
        #
        for _ in range(3):
            # replace each table
            for t in self.tables:
                t.replace_begin()

            # all the routes are still there
            assert_all_routes_present()

            # redownload the routes at the even list positions
            for t_routes in routes:
                for r in t_routes[::2]:
                    r['uni'].add_vpp_config()
                    r['multi'].add_vpp_config()

            # signal each table converged
            for t in self.tables:
                t.replace_end()

            # we should find the routes at the even list positions,
            # but not those at the odd positions
            for t, t_routes in zip(self.tables, routes):
                dump = t.dump()
                mdump = t.mdump()
                for r in t_routes[::2]:
                    self.assertTrue(find_route_in_dump(dump, r['uni'], t))
                    self.assertTrue(find_mroute_in_dump(mdump, r['multi'], t))
                for r in t_routes[1::2]:
                    self.assertFalse(find_route_in_dump(dump, r['uni'], t))
                    self.assertFalse(
                        find_mroute_in_dump(mdump, r['multi'], t))

            # reload all the routes
            for t_routes in routes:
                for r in t_routes:
                    r['uni'].add_vpp_config()
                    r['multi'].add_vpp_config()

            # all the routes are still there
            assert_all_routes_present()

        #
        # finally flush the tables for good measure
        #
        for t in self.tables:
            t.flush()
            self.assertEqual(len(t.dump()), N_EMPTY_UNI)
            self.assertEqual(len(t.mdump()), N_EMPTY_MULTI)
2790
2791
class TestIP6Replace(VppTestCase):
    """ IPv6 Interface Address Replace """

    @classmethod
    def setUpClass(cls):
        super(TestIP6Replace, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Replace, cls).tearDownClass()

    def setUp(self):
        super(TestIP6Replace, self).setUp()

        self.create_pg_interfaces(range(4))

        for i in self.pg_interfaces:
            i.admin_up()

    def tearDown(self):
        super(TestIP6Replace, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()

    def get_n_pfxs(self, intf):
        """ Return the number of entries in intf's IPv6 address dump """
        return len(self.vapi.ip_address_dump(intf.sw_if_index, True))

    def _assert_n_pfxs(self, intfs, n):
        """ Assert each interface in intfs has exactly n IPv6 addresses """
        for intf in intfs:
            self.assertEqual(self.get_n_pfxs(intf), n)

    def test_replace(self):
        """ IP interface address replace """

        # per-interface address formats, x is the interface's sw_if_index:
        #   2001:16:x::1/64
        #   2001:16:x::2/64 - a different address in the same subnet as above
        #   2001:15:x::2/64 - a different address and subnet
        addr_fmts = ("2001:16:%d::1", "2001:16:%d::2", "2001:15:%d::2")

        # add prefixes to each of the interfaces;
        # intf_pfxs[n] holds the addresses of self.pg_interfaces[n]
        intf_pfxs = []
        for intf in self.pg_interfaces:
            added = []
            for fmt in addr_fmts:
                addr = fmt % intf.sw_if_index
                added.append(VppIpInterfaceAddress(
                    self, intf, addr, 64).add_vpp_config())
            intf_pfxs.append(added)

        # a dump should have all three addresses in it
        self._assert_n_pfxs(self.pg_interfaces, 3)

        #
        # remove all the address thru a replace
        #
        self.vapi.sw_interface_address_replace_begin()
        self.vapi.sw_interface_address_replace_end()
        self._assert_n_pfxs(self.pg_interfaces, 0)

        #
        # add all the interface addresses back
        #
        for p in intf_pfxs:
            for v in p:
                v.add_vpp_config()
        self._assert_n_pfxs(self.pg_interfaces, 3)

        #
        # replace again, but this time update/re-add the address on the first
        # two interfaces
        #
        self.vapi.sw_interface_address_replace_begin()

        for p in intf_pfxs[:2]:
            for v in p:
                v.add_vpp_config()

        self.vapi.sw_interface_address_replace_end()

        # on the first two the address still exist,
        # on the other two they do not
        self._assert_n_pfxs(self.pg_interfaces[:2], 3)
        for p in intf_pfxs[:2]:
            for v in p:
                self.assertTrue(v.query_vpp_config())
        self._assert_n_pfxs(self.pg_interfaces[2:], 0)

        #
        # add all the interface addresses back on the last two
        #
        for p in intf_pfxs[2:]:
            for v in p:
                v.add_vpp_config()
        self._assert_n_pfxs(self.pg_interfaces, 3)

        #
        # replace again, this time add different prefixes on all the interfaces
        #
        self.vapi.sw_interface_address_replace_begin()

        pfxs = []
        for intf in self.pg_interfaces:
            # 2001:18:x::1/64
            addr = "2001:18:%d::1" % intf.sw_if_index
            pfxs.append(VppIpInterfaceAddress(self, intf, addr,
                                              64).add_vpp_config())

        self.vapi.sw_interface_address_replace_end()

        # only .18 should exist on each interface
        self._assert_n_pfxs(self.pg_interfaces, 1)
        for pfx in pfxs:
            self.assertTrue(pfx.query_vpp_config())

        #
        # remove everything
        #
        self.vapi.sw_interface_address_replace_begin()
        self.vapi.sw_interface_address_replace_end()
        self._assert_n_pfxs(self.pg_interfaces, 0)

        #
        # add prefixes to each interface. post-begin add the prefix from
        # interface X onto interface Y. this would normally be an error
        # since it would generate a 'duplicate address' warning. but in
        # this case, since what is newly downloaded is sane, it's ok
        #
        for intf in self.pg_interfaces:
            # 2001:18:x::1/64
            addr = "2001:18:%d::1" % intf.sw_if_index
            VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()

        self.vapi.sw_interface_address_replace_begin()

        pfxs = []
        for intf in self.pg_interfaces:
            # 2001:18:(x+1)::1/64 - the prefix formed from the next
            # sw_if_index up
            addr = "2001:18:%d::1" % (intf.sw_if_index + 1)
            pfxs.append(VppIpInterfaceAddress(self, intf,
                                              addr, 64).add_vpp_config())

        self.vapi.sw_interface_address_replace_end()

        self.logger.info(self.vapi.cli("sh int addr"))

        self._assert_n_pfxs(self.pg_interfaces, 1)
        for pfx in pfxs:
            self.assertTrue(pfx.query_vpp_config())
2951
2952
class TestIP6LinkLocal(VppTestCase):
    """ IPv6 Link Local """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

        # two packet-generator interfaces, admin up only; addresses are
        # added by the test itself
        self.create_pg_interfaces(range(2))

        for pg in self.pg_interfaces:
            pg.admin_up()

    def tearDown(self):
        super().tearDown()
        for pg in self.pg_interfaces:
            pg.admin_down()

    def test_ip6_ll(self):
        """ IPv6 Link Local """

        #
        # two APIs to add a link local address.
        #   1 - just like any other prefix
        #   2 - with the special set LL API
        #

        first_ll = "fe80:1::1"
        peer_ll = "fe80:2::2"
        second_ll = "fe80:3::3"

        def echo_request_to(target):
            # echo request from the peer's link-local address to target,
            # framed with pg0's MAC addresses
            return (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IPv6(src=peer_ll, dst=target) /
                    ICMPv6EchoRequest())

        #
        # First with the API to set a 'normal' prefix
        #
        VppIpInterfaceAddress(self, self.pg0, first_ll, 128).add_vpp_config()

        #
        # should be able to ping the ll
        #
        ping_first = echo_request_to(first_ll)

        self.send_and_expect(self.pg0, [ping_first], self.pg0)

        #
        # change the link-local on pg0
        #
        second_ll_addr = VppIpInterfaceAddress(
            self, self.pg0, second_ll, 128).add_vpp_config()

        ping_second = echo_request_to(second_ll)

        self.send_and_expect(self.pg0, [ping_second], self.pg0)

        #
        # set a normal v6 prefix on the link
        #
        self.pg0.config_ip6()

        self.send_and_expect(self.pg0, [ping_second], self.pg0)

        # the link-local cannot be removed
        with self.vapi.assert_negative_api_retval():
            second_ll_addr.remove_vpp_config()

        #
        # Use the specific link-local API on pg1
        #
        VppIp6LinkLocalAddress(self, self.pg1, first_ll).add_vpp_config()
        self.send_and_expect(self.pg1, [ping_first], self.pg1)

        VppIp6LinkLocalAddress(self, self.pg1, second_ll).add_vpp_config()
        self.send_and_expect(self.pg1, [ping_second], self.pg1)
3039
3040
class TestIPv6PathMTU(VppTestCase):
    """ IPv6 Path MTU """

    def setUp(self):
        super(TestIPv6PathMTU, self).setUp()

        self.create_pg_interfaces(range(2))

        # setup all interfaces
        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        super(TestIPv6PathMTU, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()

    def _udp_pkt(self, dst_ip6, payload):
        """ Build a UDP packet, as received on pg0, addressed to dst_ip6

        :param dst_ip6: IPv6 destination address of the packet
        :param payload: bytes carried after the UDP header
        :returns: the scapy packet
        """
        return (Ether(dst=self.pg0.local_mac,
                      src=self.pg0.remote_mac) /
                IPv6(src=self.pg0.remote_ip6,
                     dst=dst_ip6) /
                UDP(sport=1234, dport=5678) /
                Raw(payload))

    def test_path_mtu_local(self):
        """ Path MTU for attached neighbour """

        self.vapi.cli("set log class ip level debug")
        #
        # The goal here is not test that fragmentation works correctly,
        # that's done elsewhere, the intent is to ensure that the Path MTU
        # settings are honoured.
        #

        #
        # IPv6 will only frag locally generated packets, so use tunnelled
        # packets post encap
        #
        tun = VppIpIpTunInterface(
            self,
            self.pg1,
            self.pg1.local_ip6,
            self.pg1.remote_ip6)
        tun.add_vpp_config()
        tun.admin_up()
        tun.config_ip6()

        # set the interface MTU to a reasonable value
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
                                       [2800, 0, 0, 0])

        # NB: b'0xa' is the three byte string "0xa", so the payloads are
        # 3000 and 1800 bytes respectively; the fragment counts expected
        # below are for those sizes
        p_2k = self._udp_pkt(tun.remote_ip6, b'0xa' * 1000)
        p_1k = self._udp_pkt(tun.remote_ip6, b'0xa' * 600)

        # a neighbour entry for the tunnel's destination
        VppNeighbor(self,
                    self.pg1.sw_if_index,
                    self.pg1.remote_mac,
                    self.pg1.remote_ip6).add_vpp_config()

        # this is now the interface MTU frags
        self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
        self.send_and_expect(self.pg0, [p_1k], self.pg1)

        # drop the path MTU for this neighbour to below the interface MTU
        # expect more frags
        pmtu = VppIpPathMtu(self, self.pg1.remote_ip6, 1300).add_vpp_config()

        # print/format the adj delegate and trackers
        self.logger.info(self.vapi.cli("sh ip pmtu"))
        self.logger.info(self.vapi.cli("sh adj 7"))

        self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
        self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)

        # increase the path MTU to more than the interface
        # expect to use the interface MTU
        pmtu.modify(8192)

        self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
        self.send_and_expect(self.pg0, [p_1k], self.pg1)

        # go back to an MTU from the path
        pmtu.modify(1300)

        self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
        self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)

        # change the interface's MTU to a value still above the path's
        # should still use that of the path
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
                                       [2000, 0, 0, 0])
        self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
        self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)

        # set path high and interface low
        pmtu.modify(2000)
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
                                       [1300, 0, 0, 0])
        self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
        self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)

        # remove the path MTU
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
                                       [2800, 0, 0, 0])
        pmtu.modify(0)

        self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
        self.send_and_expect(self.pg0, [p_1k], self.pg1)

    def test_path_mtu_remote(self):
        """ Path MTU for remote neighbour """

        self.vapi.cli("set log class ip level debug")
        #
        # The goal here is not test that fragmentation works correctly,
        # that's done elsewhere, the intent is to ensure that the Path MTU
        # settings are honoured.
        #
        tun_dst = "2001::1"

        # reach the tunnel destination via pg1's neighbour
        route = VppIpRoute(
            self, tun_dst, 64,
            [VppRoutePath(self.pg1.remote_ip6,
                          self.pg1.sw_if_index)]).add_vpp_config()

        #
        # IPv6 will only frag locally generated packets, so use tunnelled
        # packets post encap
        #
        tun = VppIpIpTunInterface(
            self,
            self.pg1,
            self.pg1.local_ip6,
            tun_dst)
        tun.add_vpp_config()
        tun.admin_up()
        tun.config_ip6()

        # set the interface MTU to a reasonable value
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
                                       [2800, 0, 0, 0])

        # NB: b'0xa' is the three byte string "0xa", so the payloads are
        # 3000 and 1800 bytes respectively; the fragment counts expected
        # below are for those sizes
        p_2k = self._udp_pkt(tun.remote_ip6, b'0xa' * 1000)
        p_1k = self._udp_pkt(tun.remote_ip6, b'0xa' * 600)

        # a neighbour entry for the route's next-hop
        VppNeighbor(self,
                    self.pg1.sw_if_index,
                    self.pg1.remote_mac,
                    self.pg1.remote_ip6).add_vpp_config()

        # this is now the interface MTU frags
        self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
        self.send_and_expect(self.pg0, [p_1k], self.pg1)

        # drop the path MTU for the tunnel destination to below the
        # interface MTU, expect more frags
        pmtu = VppIpPathMtu(self, tun_dst, 1300).add_vpp_config()

        # print/format the fib entry/dpo
        self.logger.info(self.vapi.cli("sh ip6 fib %s" % tun_dst))

        self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
        self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)

        # increase the path MTU to more than the interface
        # expect to use the interface MTU
        pmtu.modify(8192)

        self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
        self.send_and_expect(self.pg0, [p_1k], self.pg1)

        # go back to an MTU from the path
        pmtu.modify(1300)

        self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
        self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)

        # change the interface's MTU to a value still above the path's
        # should still use that of the path
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
                                       [2000, 0, 0, 0])
        self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
        self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)

        # turn the tun_dst into an attached neighbour
        route.modify([VppRoutePath("::",
                                   self.pg1.sw_if_index)])
        nbr2 = VppNeighbor(self,
                           self.pg1.sw_if_index,
                           self.pg1.remote_mac,
                           tun_dst).add_vpp_config()

        self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
        self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)

        # revert to not attached, i.e. reached via pg1's neighbour again
        nbr2.remove_vpp_config()
        route.modify([VppRoutePath(self.pg1.remote_ip6,
                                   self.pg1.sw_if_index)])

        # set path high and interface low
        pmtu.modify(2000)
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
                                       [1300, 0, 0, 0])
        self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3)
        self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2)

        # remove the path MTU
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
                                       [2800, 0, 0, 0])
        pmtu.remove_vpp_config()
        self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2)
        self.send_and_expect(self.pg0, [p_1k], self.pg1)
3270
3271
class TestIPFibSource(VppTestCase):
    """ IPv6 Table FibSource """

    # Exercises the FIB source API over IPv6: dumping the known sources,
    # filtering route dumps by source, and registering a new source whose
    # routes are preferred over those added with the default API source.

    @classmethod
    def setUpClass(cls):
        super(TestIPFibSource, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPFibSource, cls).tearDownClass()

    def setUp(self):
        # Two pg interfaces, each with an IPv6 address and two remote
        # hosts installed as IPv6 neighbours.
        super(TestIPFibSource, self).setUp()

        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            # NOTE(review): resolve_arp() looks like an IPv4 step in an
            # otherwise IPv6-only setup - confirm whether it is needed.
            i.resolve_arp()
            i.generate_remote_hosts(2)
            i.configure_ipv6_neighbors()

    def tearDown(self):
        # Undo what setUp configured on each interface.
        super(TestIPFibSource, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()
            # setUp configured IPv6 (config_ip6), so IPv6 is the address
            # family that has to be unconfigured here.
            i.unconfig_ip6()

    def _fib_sources_by_name(self):
        # Dump all the sources in the FIB and index them by name so a
        # missing source shows up as an assertion failure in the caller
        # rather than as an unbound local variable.
        return {s.src.name: s.src for s in self.vapi.fib_source_dump()}

    def test_fib_source(self):
        """ IP Table FibSource """

        routes = self.vapi.ip_route_v2_dump(0, True)

        # 2 interfaces (4 routes) + 2 specials + 4 neighbours = 10 routes
        self.assertEqual(len(routes), 10)

        # dump all the sources in the FIB
        sources = self._fib_sources_by_name()

        # (source name, number of routes expected from that source)
        expected_counts = (("adjacency", 4),
                           ("interface", 4),
                           ("special", 1),
                           ("default-route", 1))

        self.assertIn("API", sources)
        for name, _ in expected_counts:
            self.assertIn(name, sources)
        api_source = sources["API"]

        # dump the individual route types
        for name, count in expected_counts:
            routes = self.vapi.ip_route_v2_dump(0, True,
                                                src=sources[name].id)
            self.assertEqual(len(routes), count)

        # add a new source that's better than the API
        self.vapi.fib_source_add(src={'name': "bgp",
                                      "priority": api_source.priority - 1})

        # dump all the sources to check our new one is there
        sources = self._fib_sources_by_name()
        self.assertIn("bgp", sources)
        bgp_source = sources["bgp"]

        self.assertEqual(bgp_source.priority,
                         api_source.priority - 1)

        # add a route with the default API source, via pg0
        r1 = VppIpRouteV2(
            self, "2001::1", 128,
            [VppRoutePath(self.pg0.remote_ip6,
                          self.pg0.sw_if_index)]).add_vpp_config()

        # add the same prefix with the new source, via pg1
        r2 = VppIpRouteV2(self, "2001::1", 128,
                          [VppRoutePath(self.pg1.remote_ip6,
                                        self.pg1.sw_if_index)],
                          src=bgp_source.id).add_vpp_config()

        # ensure the BGP source takes priority: the packet must leave
        # on pg1 (the BGP path), not pg0 (the API path)
        p = (Ether(src=self.pg0.remote_mac,
                   dst=self.pg0.local_mac) /
             IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
             inet6.UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))

        self.send_and_expect(self.pg0, [p], self.pg1)

        r2.remove_vpp_config()
        r1.remove_vpp_config()

        # with both sources withdrawn the prefix must be gone
        self.assertFalse(find_route(self, "2001::1", 128))
3372
3373
# Script entry point: when this module is executed directly, run its
# tests through unittest using VppTestRunner as the runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)