ip-neighbor: fix MLD reports not being sent
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python3
2
3 from socket import inet_pton, inet_ntop
4 import unittest
5
6 from parameterized import parameterized
7 import scapy.compat
8 import scapy.layers.inet6 as inet6
9 from scapy.contrib.mpls import MPLS
10 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
11     ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
12     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
13     ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, \
14     IPv6ExtHdrHopByHop, ICMPv6MLReport2, ICMPv6MLDMultAddrRec
15 from scapy.layers.l2 import Ether, Dot1Q
16 from scapy.packet import Raw
17 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
18     in6_mactoifaceid
19 from six import moves
20
21 from framework import VppTestCase, VppTestRunner
22 from util import ppp, ip6_normalize, mk_ll_addr
23 from vpp_ip import DpoProto
24 from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
25     VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
26     VppMplsRoute, VppMplsTable, VppIpTable, FibPathType, FibPathProto, \
27     VppIpInterfaceAddress, find_route_in_dump, find_mroute_in_dump
28 from vpp_neighbor import find_nbr, VppNeighbor
29 from vpp_pg_interface import is_ipv6_misc
30 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
31 from ipaddress import IPv6Network, IPv6Address
32
# Only inet_pton/inet_ntop are imported from ``socket`` by name at the top of
# the file, so the module itself has to be imported for the attribute lookup
# below to resolve (it raised NameError before).
import socket

# Address family passed to inet_pton()/inet_ntop() throughout this module.
AF_INET6 = socket.AF_INET6

# Python 2/3 compatibility: the ``unicode`` builtin only exists on Python 2.
try:
    text_type = unicode
except NameError:
    text_type = str

# Packet-count constant (presumably consumed by tests further down the file;
# it is not referenced in the part reviewed here - TODO confirm).
NUM_PKTS = 67
41
42
class TestIPv6ND(VppTestCase):
    """Shared IPv6 Neighbor Discovery helpers.

    Holds validators for captured RA / NA / NS packets and the matching
    send-and-capture wrappers. No test methods are defined here.
    """

    def validate_ra(self, intf, rx, dst_ip=None):
        """Assert that ``rx`` is a unicast RA sent by the router on ``intf``.

        :param intf: interface the packet was captured on.
        :param rx: captured packet.
        :param dst_ip: expected IPv6 destination; ``intf.remote_ip6`` when
            not given.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd RA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_RA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the router's link local
        self.assertTrue(in6_islladdr(rx[IPv6].src))
        self.assertEqual(in6_ptop(rx[IPv6].src),
                         in6_ptop(mk_ll_addr(intf.local_mac)))

    def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
        """Assert that ``rx`` is a unicast NA sent by the router on ``intf``.

        :param intf: interface the packet was captured on.
        :param rx: captured packet.
        :param dst_ip: expected IPv6 destination; ``intf.remote_ip6`` when
            not given.
        :param tgt_ip: address the NA is expected to be sourced from;
            ``intf.local_ip6`` when not given.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6
        if not tgt_ip:
            # Default the *target*. This used to overwrite dst_ip instead,
            # which clobbered the expected destination and left tgt_ip as
            # None for the source-address check below.
            tgt_ip = intf.local_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_NA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the target address
        self.assertEqual(
            in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))

        # Dest link-layer options should have the router's MAC
        dll = rx[ICMPv6NDOptDstLLAddr]
        self.assertEqual(dll.lladdr, intf.local_mac)

    def validate_ns(self, intf, rx, tgt_ip):
        """Assert that ``rx`` is an NS for ``tgt_ip`` sent out of ``intf``.

        :param intf: interface the packet was captured on.
        :param rx: captured packet.
        :param tgt_ip: IPv6 address being solicited.
        """
        # solicited-node multicast address derived from the target
        nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
        dst_ip = inet_ntop(AF_INET6, nsma)

        # NS goes to the multicast MAC derived from that address
        self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NS should be addressed to an mcast address
        # derived from the target address
        self.assertEqual(
            in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # expect the tgt IP in the NS header
        ns = rx[ICMPv6ND_NS]
        self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))

        # packet is from the router's local address
        self.assertEqual(
            in6_ptop(rx[IPv6].src), intf.local_ip6)

        # Src link-layer options should have the router's MAC
        sll = rx[ICMPv6NDOptSrcLLAddr]
        self.assertEqual(sll.lladdr, intf.local_mac)

    def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``intf`` and validate the single RA captured there.

        :param intf: interface to transmit on and capture from.
        :param pkts: packet(s) to send.
        :param remark: description of the scenario; not used by this method.
        :param dst_ip: expected RA destination, forwarded to ``validate_ra``.
        :param filter_out_fn: capture filter handed to ``get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ra(intf, rx, dst_ip)

    def send_and_expect_na(self, intf, pkts, remark, dst_ip=None,
                           tgt_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``intf`` and validate the single NA captured there.

        :param intf: interface to transmit on and capture from.
        :param pkts: packet(s) to send.
        :param remark: description of the scenario; not used by this method.
        :param dst_ip: expected NA destination, forwarded to ``validate_na``.
        :param tgt_ip: expected NA source, forwarded to ``validate_na``.
        :param filter_out_fn: capture filter handed to ``get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_na(intf, rx, dst_ip, tgt_ip)

    def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``tx_intf`` and validate the NS seen on ``rx_intf``.

        The packet trace is cleared before sending.

        :param tx_intf: interface to transmit on.
        :param rx_intf: interface the NS is expected on.
        :param pkts: packet(s) to send.
        :param tgt_ip: IPv6 address the NS should solicit.
        :param filter_out_fn: capture filter handed to ``get_capture``.
        """
        self.vapi.cli("clear trace")
        tx_intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ns(rx_intf, rx, tgt_ip)

    def verify_ip(self, rx, smac, dmac, sip, dip):
        """Assert the Ethernet and IPv6 addressing of a captured packet.

        :param rx: captured packet.
        :param smac: expected Ethernet source.
        :param dmac: expected Ethernet destination.
        :param sip: expected IPv6 source.
        :param dip: expected IPv6 destination.
        """
        ether = rx[Ether]
        self.assertEqual(ether.dst, dmac)
        self.assertEqual(ether.src, smac)

        ip = rx[IPv6]
        self.assertEqual(ip.src, sip)
        self.assertEqual(ip.dst, dip)
159
160
161 class TestIPv6(TestIPv6ND):
162     """ IPv6 Test Case """
163
    @classmethod
    def setUpClass(cls):
        """Class-level setup; defers entirely to the parent class."""
        super(TestIPv6, cls).setUpClass()
167
    @classmethod
    def tearDownClass(cls):
        """Class-level teardown; defers entirely to the parent class."""
        super(TestIPv6, cls).tearDownClass()
171
172     def setUp(self):
173         """
174         Perform test setup before test case.
175
176         **Config:**
177             - create 3 pg interfaces
178                 - untagged pg0 interface
179                 - Dot1Q subinterface on pg1
180                 - Dot1AD subinterface on pg2
181             - setup interfaces:
182                 - put it into UP state
183                 - set IPv6 addresses
184                 - resolve neighbor address using NDP
185             - configure 200 fib entries
186
187         :ivar list interfaces: pg interfaces and subinterfaces.
188         :ivar dict flows: IPv4 packet flows in test.
189
190         *TODO:* Create AD sub interface
191         """
192         super(TestIPv6, self).setUp()
193
194         # create 3 pg interfaces
195         self.create_pg_interfaces(range(3))
196
197         # create 2 subinterfaces for p1 and pg2
198         self.sub_interfaces = [
199             VppDot1QSubint(self, self.pg1, 100),
200             VppDot1QSubint(self, self.pg2, 200)
201             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
202         ]
203
204         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
205         self.flows = dict()
206         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
207         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
208         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
209
210         # packet sizes
211         self.pg_if_packet_sizes = [64, 1500, 9020]
212
213         self.interfaces = list(self.pg_interfaces)
214         self.interfaces.extend(self.sub_interfaces)
215
216         # setup all interfaces
217         for i in self.interfaces:
218             i.admin_up()
219             i.config_ip6()
220             i.resolve_ndp()
221
222     def tearDown(self):
223         """Run standard test teardown and log ``show ip6 neighbors``."""
224         for i in self.interfaces:
225             i.unconfig_ip6()
226             i.admin_down()
227         for i in self.sub_interfaces:
228             i.remove_vpp_config()
229
230         super(TestIPv6, self).tearDown()
231         if not self.vpp_dead:
232             self.logger.info(self.vapi.cli("show ip6 neighbors"))
233             # info(self.vapi.cli("show ip6 fib"))  # many entries
234
235     def modify_packet(self, src_if, packet_size, pkt):
236         """Add load, set destination IP and extend packet to required packet
237         size for defined interface.
238
239         :param VppInterface src_if: Interface to create packet for.
240         :param int packet_size: Required packet size.
241         :param Scapy pkt: Packet to be modified.
242         """
243         dst_if_idx = int(packet_size / 10 % 2)
244         dst_if = self.flows[src_if][dst_if_idx]
245         info = self.create_packet_info(src_if, dst_if)
246         payload = self.info_to_payload(info)
247         p = pkt / Raw(payload)
248         p[IPv6].dst = dst_if.remote_ip6
249         info.data = p.copy()
250         if isinstance(src_if, VppSubInterface):
251             p = src_if.add_dot1_layer(p)
252         self.extend_packet(p, packet_size)
253
254         return p
255
256     def create_stream(self, src_if):
257         """Create input packet stream for defined interface.
258
259         :param VppInterface src_if: Interface to create packet stream for.
260         """
261         hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
262         pkt_tmpl = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
263                     IPv6(src=src_if.remote_ip6) /
264                     inet6.UDP(sport=1234, dport=1234))
265
266         pkts = [self.modify_packet(src_if, i, pkt_tmpl)
267                 for i in moves.range(self.pg_if_packet_sizes[0],
268                                      self.pg_if_packet_sizes[1], 10)]
269         pkts_b = [self.modify_packet(src_if, i, pkt_tmpl)
270                   for i in moves.range(self.pg_if_packet_sizes[1] + hdr_ext,
271                                        self.pg_if_packet_sizes[2] + hdr_ext,
272                                        50)]
273         pkts.extend(pkts_b)
274
275         return pkts
276
277     def verify_capture(self, dst_if, capture):
278         """Verify captured input packet stream for defined interface.
279
280         :param VppInterface dst_if: Interface to verify captured packet stream
281                                     for.
282         :param list capture: Captured packet stream.
283         """
284         self.logger.info("Verifying capture on interface %s" % dst_if.name)
285         last_info = dict()
286         for i in self.interfaces:
287             last_info[i.sw_if_index] = None
288         is_sub_if = False
289         dst_sw_if_index = dst_if.sw_if_index
290         if hasattr(dst_if, 'parent'):
291             is_sub_if = True
292         for packet in capture:
293             if is_sub_if:
294                 # Check VLAN tags and Ethernet header
295                 packet = dst_if.remove_dot1_layer(packet)
296             self.assertTrue(Dot1Q not in packet)
297             try:
298                 ip = packet[IPv6]
299                 udp = packet[inet6.UDP]
300                 payload_info = self.payload_to_info(packet[Raw])
301                 packet_index = payload_info.index
302                 self.assertEqual(payload_info.dst, dst_sw_if_index)
303                 self.logger.debug(
304                     "Got packet on port %s: src=%u (id=%u)" %
305                     (dst_if.name, payload_info.src, packet_index))
306                 next_info = self.get_next_packet_info_for_interface2(
307                     payload_info.src, dst_sw_if_index,
308                     last_info[payload_info.src])
309                 last_info[payload_info.src] = next_info
310                 self.assertTrue(next_info is not None)
311                 self.assertEqual(packet_index, next_info.index)
312                 saved_packet = next_info.data
313                 # Check standard fields
314                 self.assertEqual(
315                     ip.src, saved_packet[IPv6].src)
316                 self.assertEqual(
317                     ip.dst, saved_packet[IPv6].dst)
318                 self.assertEqual(
319                     udp.sport, saved_packet[inet6.UDP].sport)
320                 self.assertEqual(
321                     udp.dport, saved_packet[inet6.UDP].dport)
322             except:
323                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
324                 raise
325         for i in self.interfaces:
326             remaining_packet = self.get_next_packet_info_for_interface2(
327                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
328             self.assertTrue(remaining_packet is None,
329                             "Interface %s: Packet expected from interface %s "
330                             "didn't arrive" % (dst_if.name, i.name))
331
332     def test_next_header_anomaly(self):
333         """ IPv6 next header anomaly test
334
335         Test scenario:
336             - ipv6 next header field = Fragment Header (44)
337             - next header is ICMPv6 Echo Request
338             - wait for reassembly
339         """
340         pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
341                IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44) /
342                ICMPv6EchoRequest())
343
344         self.pg0.add_stream(pkt)
345         self.pg_start()
346
347         # wait for reassembly
348         self.sleep(10)
349
350     def test_fib(self):
351         """ IPv6 FIB test
352
353         Test scenario:
354             - Create IPv6 stream for pg0 interface
355             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
356             - Send and verify received packets on each interface.
357         """
358
359         pkts = self.create_stream(self.pg0)
360         self.pg0.add_stream(pkts)
361
362         for i in self.sub_interfaces:
363             pkts = self.create_stream(i)
364             i.parent.add_stream(pkts)
365
366         self.pg_enable_capture(self.pg_interfaces)
367         self.pg_start()
368
369         pkts = self.pg0.get_capture()
370         self.verify_capture(self.pg0, pkts)
371
372         for i in self.sub_interfaces:
373             pkts = i.parent.get_capture()
374             self.verify_capture(i, pkts)
375
    def test_ns(self):
        """ IPv6 Neighbour Solicitation Exceptions

        Test scenario:
           - Send an NS sourced from an address not covered by the link
             sub-net; expect no reply
           - Send an NS to an mcast address the router has not joined
             (currently disabled)
           - Send an NS for a target address the router does not own;
             expect no reply
           - Send NSs sourced from link-local addresses, for the router's
             global and link-local addresses; expect an NA and a learned
             neighbor without a FIB route
        """

        #
        # An NS from a non link source address
        #
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        p = (Ether(dst=in6_getnsmac(nsma)) /
             IPv6(dst=d, src="2002::2") /
             ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
             ICMPv6NDOptSrcLLAddr(
                 lladdr=self.pg0.remote_mac))
        pkts = [p]

        self.send_and_assert_no_replies(
            self.pg0, pkts,
            "No response to NS source by address not on sub-net")

        #
        # An NS sent to a solicited-node mcast group the router is
        # not a member of. This case is marked as failing and is
        # disabled by the ``if 0`` guard, so it never runs.
        #
        if 0:
            nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
            d = inet_ntop(AF_INET6, nsma)

            p = (Ether(dst=in6_getnsmac(nsma)) /
                 IPv6(dst=d, src=self.pg0.remote_ip6) /
                 ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
                 ICMPv6NDOptSrcLLAddr(
                     lladdr=self.pg0.remote_mac))
            pkts = [p]

            self.send_and_assert_no_replies(
                self.pg0, pkts,
                "No response to NS sent to unjoined mcast address")

        #
        # An NS whose target address is one the router does not own
        #
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        p = (Ether(dst=in6_getnsmac(nsma)) /
             IPv6(dst=d, src=self.pg0.remote_ip6) /
             ICMPv6ND_NS(tgt="fd::ffff") /
             ICMPv6NDOptSrcLLAddr(
                 lladdr=self.pg0.remote_mac))
        pkts = [p]

        self.send_and_assert_no_replies(self.pg0, pkts,
                                        "No response to NS for unknown target")

        #
        # A neighbor entry that has no associated FIB-entry
        #
        self.pg0.generate_remote_hosts(4)
        nd_entry = VppNeighbor(self,
                               self.pg0.sw_if_index,
                               self.pg0.remote_hosts[2].mac,
                               self.pg0.remote_hosts[2].ip6,
                               is_no_fib_entry=1)
        nd_entry.add_vpp_config()

        #
        # check we have the neighbor, but no route
        #
        self.assertTrue(find_nbr(self,
                                 self.pg0.sw_if_index,
                                 self.pg0._remote_hosts[2].ip6))
        self.assertFalse(find_route(self,
                                    self.pg0._remote_hosts[2].ip6,
                                    128))

        #
        # send an NS from a link local address to the interface's global
        # address (nsma and d still refer to pg0's local address here)
        #
        p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
             IPv6(
                 dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
             ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
             ICMPv6NDOptSrcLLAddr(
                 lladdr=self.pg0.remote_mac))

        self.send_and_expect_na(self.pg0, p,
                                "NS from link-local",
                                dst_ip=self.pg0._remote_hosts[2].ip6_ll,
                                tgt_ip=self.pg0.local_ip6)

        #
        # we should have learned an ND entry for the peer's link-local
        # but not inserted a route to it in the FIB
        #
        self.assertTrue(find_nbr(self,
                                 self.pg0.sw_if_index,
                                 self.pg0._remote_hosts[2].ip6_ll))
        self.assertFalse(find_route(self,
                                    self.pg0._remote_hosts[2].ip6_ll,
                                    128))

        #
        # An NS to the router's own Link-local
        #
        p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
             IPv6(
                 dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
             ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll) /
             ICMPv6NDOptSrcLLAddr(
                 lladdr=self.pg0.remote_mac))

        self.send_and_expect_na(self.pg0, p,
                                "NS to/from link-local",
                                dst_ip=self.pg0._remote_hosts[3].ip6_ll,
                                tgt_ip=self.pg0.local_ip6_ll)

        #
        # we should have learned an ND entry for the peer's link-local
        # but not inserted a route to it in the FIB
        #
        self.assertTrue(find_nbr(self,
                                 self.pg0.sw_if_index,
                                 self.pg0._remote_hosts[3].ip6_ll))
        self.assertFalse(find_route(self,
                                    self.pg0._remote_hosts[3].ip6_ll,
                                    128))
510
511     def test_ns_duplicates(self):
512         """ ND Duplicates"""
513
514         #
515         # Generate some hosts on the LAN
516         #
517         self.pg1.generate_remote_hosts(3)
518
519         #
520         # Add host 1 on pg1 and pg2
521         #
522         ns_pg1 = VppNeighbor(self,
523                              self.pg1.sw_if_index,
524                              self.pg1.remote_hosts[1].mac,
525                              self.pg1.remote_hosts[1].ip6)
526         ns_pg1.add_vpp_config()
527         ns_pg2 = VppNeighbor(self,
528                              self.pg2.sw_if_index,
529                              self.pg2.remote_mac,
530                              self.pg1.remote_hosts[1].ip6)
531         ns_pg2.add_vpp_config()
532
533         #
534         # IP packet destined for pg1 remote host arrives on pg1 again.
535         #
536         p = (Ether(dst=self.pg0.local_mac,
537                    src=self.pg0.remote_mac) /
538              IPv6(src=self.pg0.remote_ip6,
539                   dst=self.pg1.remote_hosts[1].ip6) /
540              inet6.UDP(sport=1234, dport=1234) /
541              Raw())
542
543         self.pg0.add_stream(p)
544         self.pg_enable_capture(self.pg_interfaces)
545         self.pg_start()
546
547         rx1 = self.pg1.get_capture(1)
548
549         self.verify_ip(rx1[0],
550                        self.pg1.local_mac,
551                        self.pg1.remote_hosts[1].mac,
552                        self.pg0.remote_ip6,
553                        self.pg1.remote_hosts[1].ip6)
554
555         #
556         # remove the duplicate on pg1
557         # packet stream should generate NSs out of pg1
558         #
559         ns_pg1.remove_vpp_config()
560
561         self.send_and_expect_ns(self.pg0, self.pg1,
562                                 p, self.pg1.remote_hosts[1].ip6)
563
564         #
565         # Add it back
566         #
567         ns_pg1.add_vpp_config()
568
569         self.pg0.add_stream(p)
570         self.pg_enable_capture(self.pg_interfaces)
571         self.pg_start()
572
573         rx1 = self.pg1.get_capture(1)
574
575         self.verify_ip(rx1[0],
576                        self.pg1.local_mac,
577                        self.pg1.remote_hosts[1].mac,
578                        self.pg0.remote_ip6,
579                        self.pg1.remote_hosts[1].ip6)
580
581     def validate_ra(self, intf, rx, dst_ip=None, src_ip=None,
582                     mtu=9000, pi_opt=None):
583         if not dst_ip:
584             dst_ip = intf.remote_ip6
585         if not src_ip:
586             src_ip = mk_ll_addr(intf.local_mac)
587
588         # unicasted packets must come to the unicast mac
589         self.assertEqual(rx[Ether].dst, intf.remote_mac)
590
591         # and from the router's MAC
592         self.assertEqual(rx[Ether].src, intf.local_mac)
593
594         # the rx'd RA should be addressed to the sender's source
595         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
596         self.assertEqual(in6_ptop(rx[IPv6].dst),
597                          in6_ptop(dst_ip))
598
599         # and come from the router's link local
600         self.assertTrue(in6_islladdr(rx[IPv6].src))
601         self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(src_ip))
602
603         # it should contain the links MTU
604         ra = rx[ICMPv6ND_RA]
605         self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
606
607         # it should contain the source's link layer address option
608         sll = ra[ICMPv6NDOptSrcLLAddr]
609         self.assertEqual(sll.lladdr, intf.local_mac)
610
611         if not pi_opt:
612             # the RA should not contain prefix information
613             self.assertFalse(ra.haslayer(
614                 ICMPv6NDOptPrefixInfo))
615         else:
616             raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
617
618             # the options are nested in the scapy packet in way that i cannot
619             # decipher how to decode. this 1st layer of option always returns
620             # nested classes, so a direct obj1=obj2 comparison always fails.
621             # however, the getlayer(.., 2) does give one instance.
622             # so we cheat here and construct a new opt instance for comparison
623             rd = ICMPv6NDOptPrefixInfo(
624                 prefixlen=raos.prefixlen,
625                 prefix=raos.prefix,
626                 L=raos.L,
627                 A=raos.A)
628             if type(pi_opt) is list:
629                 for ii in range(len(pi_opt)):
630                     self.assertEqual(pi_opt[ii], rd)
631                     rd = rx.getlayer(
632                         ICMPv6NDOptPrefixInfo, ii + 2)
633             else:
634                 self.assertEqual(pi_opt, raos, 'Expected: %s, received: %s'
635                                  % (pi_opt.show(dump=True),
636                                     raos.show(dump=True)))
637
638     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
639                            filter_out_fn=is_ipv6_misc,
640                            opt=None,
641                            src_ip=None):
642         self.vapi.cli("clear trace")
643         intf.add_stream(pkts)
644         self.pg_enable_capture(self.pg_interfaces)
645         self.pg_start()
646         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
647
648         self.assertEqual(len(rx), 1)
649         rx = rx[0]
650         self.validate_ra(intf, rx, dst_ip, src_ip=src_ip, pi_opt=opt)
651
652     def test_rs(self):
653         """ IPv6 Router Solicitation Exceptions
654
655         Test scenario:
656         """
657
658         #
659         # Before we begin change the IPv6 RA responses to use the unicast
660         # address - that way we will not confuse them with the periodic
661         # RAs which go to the mcast address
662         # Sit and wait for the first periodic RA.
663         #
664         # TODO
665         #
666         self.pg0.ip6_ra_config(send_unicast=1)
667
668         #
669         # An RS from a link source address
670         #  - expect an RA in return
671         #
672         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
673              IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
674              ICMPv6ND_RS())
675         pkts = [p]
676         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
677
678         #
679         # For the next RS sent the RA should be rate limited
680         #
681         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
682
683         #
684         # When we reconfigure the IPv6 RA config,
685         # we reset the RA rate limiting,
686         # so we need to do this before each test below so as not to drop
687         # packets for rate limiting reasons. Test this works here.
688         #
689         self.pg0.ip6_ra_config(send_unicast=1)
690         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
691
692         #
693         # An RS sent from a non-link local source
694         #
695         self.pg0.ip6_ra_config(send_unicast=1)
696         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
697              IPv6(dst=self.pg0.local_ip6,
698                   src="2002::ffff") /
699              ICMPv6ND_RS())
700         pkts = [p]
701         self.send_and_assert_no_replies(self.pg0, pkts,
702                                         "RS from non-link source")
703
704         #
705         # Source an RS from a link local address
706         #
707         self.pg0.ip6_ra_config(send_unicast=1)
708         ll = mk_ll_addr(self.pg0.remote_mac)
709         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
710              IPv6(dst=self.pg0.local_ip6, src=ll) /
711              ICMPv6ND_RS())
712         pkts = [p]
713         self.send_and_expect_ra(self.pg0, pkts,
714                                 "RS sourced from link-local",
715                                 dst_ip=ll)
716
717         #
718         # Send the RS multicast
719         #
720         self.pg0.ip6_ra_config(send_unicast=1)
721         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
722         ll = mk_ll_addr(self.pg0.remote_mac)
723         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
724              IPv6(dst="ff02::2", src=ll) /
725              ICMPv6ND_RS())
726         pkts = [p]
727         self.send_and_expect_ra(self.pg0, pkts,
728                                 "RS sourced from link-local",
729                                 dst_ip=ll)
730
731         #
732         # Source from the unspecified address ::. This happens when the RS
733         # is sent before the host has a configured address/sub-net,
734         # i.e. auto-config. Since the sender has no IP address, the reply
735         # comes back mcast - so the capture needs to not filter this.
736         # If we happen to pick up the periodic RA at this point then so be it,
737         # it's not an error.
738         #
739         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
740         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
741              IPv6(dst="ff02::2", src="::") /
742              ICMPv6ND_RS())
743         pkts = [p]
744         self.send_and_expect_ra(self.pg0, pkts,
745                                 "RS sourced from unspecified",
746                                 dst_ip="ff02::1",
747                                 filter_out_fn=None)
748
749         #
750         # Configure The RA to announce the links prefix
751         #
752         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
753                                self.pg0.local_ip6_prefix_len))
754
755         #
756         # RAs should now contain the prefix information option
757         #
758         opt = ICMPv6NDOptPrefixInfo(
759             prefixlen=self.pg0.local_ip6_prefix_len,
760             prefix=self.pg0.local_ip6,
761             L=1,
762             A=1)
763
764         self.pg0.ip6_ra_config(send_unicast=1)
765         ll = mk_ll_addr(self.pg0.remote_mac)
766         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
767              IPv6(dst=self.pg0.local_ip6, src=ll) /
768              ICMPv6ND_RS())
769         self.send_and_expect_ra(self.pg0, p,
770                                 "RA with prefix-info",
771                                 dst_ip=ll,
772                                 opt=opt)
773
774         #
775         # Change the prefix info to not off-link
776         #  L-flag is clear
777         #
778         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
779                                self.pg0.local_ip6_prefix_len),
780                                off_link=1)
781
782         opt = ICMPv6NDOptPrefixInfo(
783             prefixlen=self.pg0.local_ip6_prefix_len,
784             prefix=self.pg0.local_ip6,
785             L=0,
786             A=1)
787
788         self.pg0.ip6_ra_config(send_unicast=1)
789         self.send_and_expect_ra(self.pg0, p,
790                                 "RA with Prefix info with L-flag=0",
791                                 dst_ip=ll,
792                                 opt=opt)
793
794         #
795         # Change the prefix info to not off-link, no-autoconfig
796         #  L and A flag are clear in the advert
797         #
798         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
799                                self.pg0.local_ip6_prefix_len),
800                                off_link=1,
801                                no_autoconfig=1)
802
803         opt = ICMPv6NDOptPrefixInfo(
804             prefixlen=self.pg0.local_ip6_prefix_len,
805             prefix=self.pg0.local_ip6,
806             L=0,
807             A=0)
808
809         self.pg0.ip6_ra_config(send_unicast=1)
810         self.send_and_expect_ra(self.pg0, p,
811                                 "RA with Prefix info with A & L-flag=0",
812                                 dst_ip=ll,
813                                 opt=opt)
814
815         #
816         # Change the flag settings back to the defaults
817         #  L and A flag are set in the advert
818         #
819         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
820                                self.pg0.local_ip6_prefix_len))
821
822         opt = ICMPv6NDOptPrefixInfo(
823             prefixlen=self.pg0.local_ip6_prefix_len,
824             prefix=self.pg0.local_ip6,
825             L=1,
826             A=1)
827
828         self.pg0.ip6_ra_config(send_unicast=1)
829         self.send_and_expect_ra(self.pg0, p,
830                                 "RA with Prefix info",
831                                 dst_ip=ll,
832                                 opt=opt)
833
834         #
835         # Change the prefix info to not off-link, no-autoconfig
836         #  L and A flag are clear in the advert
837         #
838         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
839                                self.pg0.local_ip6_prefix_len),
840                                off_link=1,
841                                no_autoconfig=1)
842
843         opt = ICMPv6NDOptPrefixInfo(
844             prefixlen=self.pg0.local_ip6_prefix_len,
845             prefix=self.pg0.local_ip6,
846             L=0,
847             A=0)
848
849         self.pg0.ip6_ra_config(send_unicast=1)
850         self.send_and_expect_ra(self.pg0, p,
851                                 "RA with Prefix info with A & L-flag=0",
852                                 dst_ip=ll,
853                                 opt=opt)
854
855         #
856         # Use the reset to defaults option to revert to defaults
857         #  L and A flag are clear in the advert
858         #
859         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
860                                self.pg0.local_ip6_prefix_len),
861                                use_default=1)
862
863         opt = ICMPv6NDOptPrefixInfo(
864             prefixlen=self.pg0.local_ip6_prefix_len,
865             prefix=self.pg0.local_ip6,
866             L=1,
867             A=1)
868
869         self.pg0.ip6_ra_config(send_unicast=1)
870         self.send_and_expect_ra(self.pg0, p,
871                                 "RA with Prefix reverted to defaults",
872                                 dst_ip=ll,
873                                 opt=opt)
874
875         #
876         # Advertise Another prefix. With no L-flag/A-flag
877         #
878         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
879                                self.pg1.local_ip6_prefix_len),
880                                off_link=1,
881                                no_autoconfig=1)
882
883         opt = [ICMPv6NDOptPrefixInfo(
884             prefixlen=self.pg0.local_ip6_prefix_len,
885             prefix=self.pg0.local_ip6,
886             L=1,
887             A=1),
888             ICMPv6NDOptPrefixInfo(
889                 prefixlen=self.pg1.local_ip6_prefix_len,
890                 prefix=self.pg1.local_ip6,
891                 L=0,
892                 A=0)]
893
894         self.pg0.ip6_ra_config(send_unicast=1)
895         ll = mk_ll_addr(self.pg0.remote_mac)
896         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
897              IPv6(dst=self.pg0.local_ip6, src=ll) /
898              ICMPv6ND_RS())
899         self.send_and_expect_ra(self.pg0, p,
900                                 "RA with multiple Prefix infos",
901                                 dst_ip=ll,
902                                 opt=opt)
903
904         #
905         # Remove the first prefix-info - expect the second is still in the
906         # advert
907         #
908         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
909                                self.pg0.local_ip6_prefix_len),
910                                is_no=1)
911
912         opt = ICMPv6NDOptPrefixInfo(
913             prefixlen=self.pg1.local_ip6_prefix_len,
914             prefix=self.pg1.local_ip6,
915             L=0,
916             A=0)
917
918         self.pg0.ip6_ra_config(send_unicast=1)
919         self.send_and_expect_ra(self.pg0, p,
920                                 "RA with Prefix reverted to defaults",
921                                 dst_ip=ll,
922                                 opt=opt)
923
924         #
925         # Remove the second prefix-info - expect no prefix-info in the adverts
926         #
927         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
928                                self.pg1.local_ip6_prefix_len),
929                                is_no=1)
930
931         #
932         # change the link's link local, so we know that works too.
933         #
934         self.vapi.sw_interface_ip6_set_link_local_address(
935             sw_if_index=self.pg0.sw_if_index,
936             ip="fe80::88")
937
938         self.pg0.ip6_ra_config(send_unicast=1)
939         self.send_and_expect_ra(self.pg0, p,
940                                 "RA with Prefix reverted to defaults",
941                                 dst_ip=ll,
942                                 src_ip="fe80::88")
943
944         #
945         # Reset the periodic advertisements back to default values
946         #
947         self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
948
949     def test_mld(self):
950         """ MLD Report """
951         #
952         # test one MLD is sent after applying an IPv6 Address on an interface
953         #
954         self.pg_enable_capture(self.pg_interfaces)
955         self.pg_start()
956
957         subitf = VppDot1QSubint(self, self.pg1, 99)
958
959         subitf.admin_up()
960         subitf.config_ip6()
961
962         rxs = self.pg1._get_capture(timeout=2, filter_out_fn=None)
963
964         #
965         # hunt for the MLD on vlan 99
966         #
967         for rx in rxs:
968             # make sure ipv6 packets with hop by hop options have
969             # correct checksums
970             self.assert_packet_checksums_valid(rx)
971             if rx.haslayer(IPv6ExtHdrHopByHop) and \
972                rx.haslayer(Dot1Q) and \
973                rx[Dot1Q].vlan == 99:
974                 mld = rx[ICMPv6MLReport2]
975
976         self.assertEqual(mld.records_number, 4)
977
978
class TestIPv6IfAddrRoute(VppTestCase):
    """ IPv6 Interface Addr Route Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6IfAddrRoute, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6IfAddrRoute, cls).tearDownClass()

    def setUp(self):
        super(TestIPv6IfAddrRoute, self).setUp()

        # a single pg interface: admin up, IPv6 configured, NDP resolved
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        super(TestIPv6IfAddrRoute, self).tearDown()
        # undo what setUp configured on each interface
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ipv6_ifaddrs_same_prefix(self):
        """ IPv6 Interface Addresses Same Prefix test

        Test scenario:

            - Verify no /128 route in FIB for 2001:10::10 or 2001:10::20
            - Configure IPv6 address 2001:10::10/64 on the interface
            - Verify /128 route in FIB for 2001:10::10 only
            - Configure IPv6 address 2001:10::20/64 on the interface
            - Delete 2001:10::10/64 from the interface
            - Verify /128 route in FIB for 2001:10::20 only
            - Delete 2001:10::20/64 from the interface
            - Verify no /128 route in FIB for either address
        """

        ip_a = "2001:10::10"
        ip_b = "2001:10::20"

        addr_a = VppIpInterfaceAddress(self, self.pg0, ip_a, 64)
        addr_b = VppIpInterfaceAddress(self, self.pg0, ip_b, 64)

        def has_host_route(ip):
            # look up the /128 entry for one address
            return find_route(self, ip, 128)

        # nothing is configured yet
        self.assertFalse(addr_a.query_vpp_config())
        self.assertFalse(has_host_route(ip_a))
        self.assertFalse(has_host_route(ip_b))

        # first address only
        addr_a.add_vpp_config()
        self.assertTrue(addr_a.query_vpp_config())
        self.assertTrue(has_host_route(ip_a))
        self.assertFalse(has_host_route(ip_b))

        # add the second, drop the first: only the second must remain
        addr_b.add_vpp_config()
        addr_a.remove_vpp_config()
        self.assertFalse(addr_a.query_vpp_config())
        self.assertTrue(addr_b.query_vpp_config())
        self.assertFalse(has_host_route(ip_a))
        self.assertTrue(has_host_route(ip_b))

        # drop the second as well: nothing must remain
        addr_b.remove_vpp_config()
        self.assertFalse(addr_a.query_vpp_config())
        self.assertFalse(has_host_route(ip_a))
        self.assertFalse(has_host_route(ip_b))
1050
1051
class TestICMPv6Echo(VppTestCase):
    """ ICMPv6 Echo Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestICMPv6Echo, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestICMPv6Echo, cls).tearDownClass()

    def setUp(self):
        super(TestICMPv6Echo, self).setUp()

        # a single pg interface: admin up, IPv6 configured, NDP resolved
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        super(TestICMPv6Echo, self).tearDown()
        # undo what setUp configured on each interface
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_icmpv6_echo(self):
        """ VPP replies to ICMPv6 Echo Request

        Test scenario:

            - Send an ICMPv6 Echo Request to VPP's address on pg0.
            - Expect exactly one packet back on pg0: an Echo Reply with
              swapped MAC/IP addresses and the same id, seq and data.
        """

        echo_id = 0xb
        echo_seq = 5
        echo_data = b'\x0a' * 18

        l2 = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        l3 = IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
        request = l2 / l3 / ICMPv6EchoRequest(id=echo_id,
                                              seq=echo_seq,
                                              data=echo_data)

        self.pg0.add_stream(request)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        reply = self.pg0.get_capture(1)[0]
        # pull all three layers out first, so a missing layer fails here
        # rather than half way through the assertions
        reply_l2 = reply[Ether]
        reply_l3 = reply[IPv6]
        reply_icmp = reply[ICMPv6EchoReply]

        # addresses are mirrored with respect to the request
        self.assertEqual(reply_l2.src, self.pg0.local_mac)
        self.assertEqual(reply_l2.dst, self.pg0.remote_mac)

        self.assertEqual(reply_l3.src, self.pg0.local_ip6)
        self.assertEqual(reply_l3.dst, self.pg0.remote_ip6)

        # and the echo fields are returned untouched
        self.assertEqual(icmp6types[reply_icmp.type], "Echo Reply")
        self.assertEqual(reply_icmp.id, echo_id)
        self.assertEqual(reply_icmp.seq, echo_seq)
        self.assertEqual(reply_icmp.data, echo_data)
1122
1123
class TestIPv6RD(TestIPv6ND):
    """ IPv6 Router Discovery Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RD, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RD, cls).tearDownClass()

    def setUp(self):
        super(TestIPv6RD, self).setUp()

        # two pg interfaces, both admin up with IPv6 configured
        self.create_pg_interfaces(range(2))

        # keep our own copy of the list; tearDown walks it
        self.interfaces = list(self.pg_interfaces)

        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

    def tearDown(self):
        # interfaces are cleaned up before the base class tearDown runs
        for intf in self.interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
        super(TestIPv6RD, self).tearDown()

    def test_rd_send_router_solicitation(self):
        """ Verify router solicitation packets """

        n_solicits = 2
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index,
                                                 mrc=n_solicits)
        captured = self.pg1.get_capture(n_solicits, timeout=3)
        self.assertEqual(len(captured), n_solicits)

        # every solicitation is expected to go to ff02::2 from pg1's
        # link-local address
        want_dst = ip6_normalize("ff02::2")
        want_src = ip6_normalize(self.pg1.local_ip6_ll)

        for pkt in captured:
            self.assertEqual(pkt.haslayer(IPv6), 1)
            self.assertEqual(pkt[IPv6].haslayer(ICMPv6ND_RS), 1)

            self.assert_equal(ip6_normalize(pkt[IPv6].dst), want_dst)
            self.assert_equal(ip6_normalize(pkt[IPv6].src), want_src)

            # the RS must carry a source link-layer address option
            # holding pg1's MAC
            has_sllao = pkt[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr)
            self.assertTrue(bool(has_sllao))
            self.assert_equal(pkt[ICMPv6NDOptSrcLLAddr].lladdr,
                              self.pg1.local_mac)

    def verify_prefix_info(self, reported_prefix, prefix_option):
        """ Compare a prefix reported by VPP with the option that was sent

        :param reported_prefix: prefix entry taken from an RA event
        :param prefix_option: the ICMPv6NDOptPrefixInfo placed in the RA
        """
        addr = prefix_option.getfieldval("prefix")
        plen = prefix_option.getfieldval("prefixlen")
        # strict=False: the option's address may have host bits set
        expected = IPv6Network(text_type(addr + "/" + text_type(plen)),
                               strict=False)
        self.assert_equal(reported_prefix.prefix.network_address,
                          expected.network_address)

        # L goes in bit 7 and A in bit 6 of the reported flags
        on_link = prefix_option.getfieldval("L")
        autonomous = prefix_option.getfieldval("A")
        self.assert_equal(reported_prefix.flags,
                          (on_link << 7) | (autonomous << 6))

        self.assert_equal(reported_prefix.valid_time,
                          prefix_option.getfieldval("validlifetime"))
        self.assert_equal(reported_prefix.preferred_time,
                          prefix_option.getfieldval("preferredlifetime"))

    def test_rd_receive_router_advertisement(self):
        """ Verify events triggered by received RA packets """

        self.vapi.want_ip6_ra_events(enable=1)

        prefix_info_1 = ICMPv6NDOptPrefixInfo(
            prefix="1::2",
            prefixlen=50,
            validlifetime=200,
            preferredlifetime=500,
            L=1,
            A=1,
        )

        prefix_info_2 = ICMPv6NDOptPrefixInfo(
            prefix="7::4",
            prefixlen=20,
            validlifetime=70,
            preferredlifetime=1000,
            L=1,
            A=0,
        )

        # an RA from the peer's link-local address carrying both options
        l2 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        l3 = IPv6(dst=self.pg1.local_ip6_ll,
                  src=mk_ll_addr(self.pg1.remote_mac))
        ra_pkt = l2 / l3 / ICMPv6ND_RA() / prefix_info_1 / prefix_info_2

        self.pg1.add_stream([ra_pkt])
        self.pg_start()

        event = self.vapi.wait_for_event(10, "ip6_ra_event")

        # scalar fields of the event, checked in this order
        expected_fields = (
            ("current_hop_limit", 0),
            ("flags", 8),
            ("router_lifetime_in_sec", 1800),
            ("neighbor_reachable_time_in_msec", 0),
            ("time_in_msec_between_retransmitted_neighbor_solicitations", 0),
            ("n_prefixes", 2),
        )
        for field, value in expected_fields:
            self.assert_equal(getattr(event, field), value)

        # prefixes must be reported in the order they were sent
        for idx, sent in enumerate((prefix_info_1, prefix_info_2)):
            self.verify_prefix_info(event.prefixes[idx], sent)
1243
1244
class TestIPv6RDControlPlane(TestIPv6ND):
    """ IPv6 Router Discovery Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RDControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RDControlPlane, cls).tearDownClass()

    def setUp(self):
        super(TestIPv6RDControlPlane, self).setUp()

        # create 1 pg interface
        self.create_pg_interfaces(range(1))

        self.interfaces = list(self.pg_interfaces)

        # setup all interfaces
        for i in self.interfaces:
            i.admin_up()
            i.config_ip6()

    def tearDown(self):
        super(TestIPv6RDControlPlane, self).tearDown()

    @staticmethod
    def create_ra_packet(pg, routerlifetime=None):
        """ Build a router advertisement addressed to VPP on `pg`

        :param pg: interface supplying the MAC and IPv6 addresses
        :param routerlifetime: value for the RA's routerlifetime field;
                               None leaves scapy's default in place
        :returns: Ether / IPv6 / ICMPv6ND_RA packet, sourced from the
                  peer's link-local address
        """
        src_ip = pg.remote_ip6_ll
        dst_ip = pg.local_ip6
        if routerlifetime is None:
            ra = ICMPv6ND_RA()
        else:
            ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
        return (Ether(dst=pg.local_mac, src=pg.remote_mac) /
                IPv6(dst=dst_ip, src=src_ip) / ra)

    @staticmethod
    def get_default_routes(fib):
        """ Extract the default routes from a FIB dump

        :param fib: iterable of route-dump entries
        :returns: list of dicts with keys 'sw_if_index' and 'next_hop',
                  one per path of every zero-length prefix
        """
        default_routes = []
        for entry in fib:
            if entry.route.prefix.prefixlen != 0:
                continue
            for path in entry.route.paths:
                # 0xFFFFFFFF is skipped - presumably the "no interface"
                # marker; TODO confirm
                if path.sw_if_index != 0xFFFFFFFF:
                    default_routes.append({
                        'sw_if_index': path.sw_if_index,
                        'next_hop': path.nh.address.ip6,
                    })
        return default_routes

    @staticmethod
    def get_interface_addresses(fib, pg):
        """ Extract the /128 entries whose first path points at `pg`

        :param fib: iterable of route-dump entries
        :param pg: interface whose sw_if_index is matched
        :returns: list of addresses as strings
        """
        addresses = []
        for entry in fib:
            if entry.route.prefix.prefixlen == 128:
                # only the first path is considered
                path = entry.route.paths[0]
                if path.sw_if_index == pg.sw_if_index:
                    addresses.append(
                        str(entry.route.prefix.network_address))
        return addresses

    def wait_for_no_default_route(self, n_tries=50, s_time=1):
        """ Poll the FIB until it holds no default route

        :param n_tries: maximum number of FIB dumps to take
        :param s_time: seconds to sleep after each unsuccessful dump
        :returns: True once no default route is found, False if one is
                  still present after `n_tries` dumps
        """
        for _ in range(n_tries):
            fib = self.vapi.ip_route_dump(0, True)
            if not self.get_default_routes(fib):
                return True
            self.sleep(s_time)

        return False

    def _send_ra(self, packet):
        # inject one packet on pg0, then wait 0.1s of VPP time
        self.pg0.add_stream([packet])
        self.pg_start()
        self.sleep_on_vpp_time(0.1)

    def _assert_default_route_via(self, fib, next_hop):
        # exactly one default route, out of pg0, via `next_hop`
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 1)
        dr = default_routes[0]
        self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
        self.assertEqual(dr['next_hop'], next_hop)

    def _assert_slaac_address(self, fib, initial_addresses):
        # exactly one address beyond the initial set, inside 1::/20
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        new_addresses = addresses.difference(initial_addresses)
        self.assertEqual(len(new_addresses), 1)
        prefix = IPv6Network(
            text_type("%s/%d" % (next(iter(new_addresses)), 20)),
            strict=False)
        self.assertEqual(prefix, IPv6Network(text_type('1::/20')))

    def test_all(self):
        """ Test handling of SLAAC addresses and default routes """

        fib = self.vapi.ip_route_dump(0, True)
        default_routes = self.get_default_routes(fib)
        initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
        self.assertEqual(default_routes, [])
        router_address = IPv6Address(text_type(self.pg0.remote_ip6_ll))

        self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)

        self.sleep(0.1)

        # send RA
        packet = (self.create_ra_packet(
            self.pg0) / ICMPv6NDOptPrefixInfo(
            prefix="1::",
            prefixlen=64,
            validlifetime=2,
            preferredlifetime=2,
            L=1,
            A=1,
        ) / ICMPv6NDOptPrefixInfo(
            prefix="7::",
            prefixlen=20,
            validlifetime=1500,
            preferredlifetime=1000,
            L=1,
            A=0,
        ))
        self._send_ra(packet)

        fib = self.vapi.ip_route_dump(0, True)

        # check FIB for new address
        self._assert_slaac_address(fib, initial_addresses)

        # check FIB for new default route
        self._assert_default_route_via(fib, router_address)

        # send RA to delete default route
        self._send_ra(self.create_ra_packet(self.pg0, routerlifetime=0))

        # check that default route is deleted
        fib = self.vapi.ip_route_dump(0, True)
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 0)

        self.sleep_on_vpp_time(0.1)

        # send RA
        self._send_ra(self.create_ra_packet(self.pg0))

        # check FIB for new default route
        fib = self.vapi.ip_route_dump(0, True)
        self._assert_default_route_via(fib, router_address)

        # send RA, updating router lifetime to 1s
        self._send_ra(self.create_ra_packet(self.pg0, 1))

        # check that default route still exists
        fib = self.vapi.ip_route_dump(0, True)
        self._assert_default_route_via(fib, router_address)

        self.sleep_on_vpp_time(1)

        # check that default route is deleted
        self.assertTrue(self.wait_for_no_default_route())

        # check FIB still contains the SLAAC address
        # NOTE(review): this inspects the dump taken before the waits
        # above, not a fresh one - confirm that is intended
        self._assert_slaac_address(fib, initial_addresses)

        self.sleep_on_vpp_time(1)

        # check that SLAAC address is deleted
        fib = self.vapi.ip_route_dump(0, True)
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        new_addresses = addresses.difference(initial_addresses)
        self.assertEqual(len(new_addresses), 0)
1435
1436
class IPv6NDProxyTest(TestIPv6ND):
    """ IPv6 ND ProxyTest Case """

    @classmethod
    def setUpClass(cls):
        super(IPv6NDProxyTest, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(IPv6NDProxyTest, cls).tearDownClass()

    def setUp(self):
        super(IPv6NDProxyTest, self).setUp()

        # three pg interfaces
        self.create_pg_interfaces(range(3))

        # pg0 is the master interface and carries the configured subnet
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # pg1 and pg2 are only IPv6-enabled; they get no address
        self.pg1.ip6_enable()
        self.pg2.ip6_enable()

    def tearDown(self):
        super(IPv6NDProxyTest, self).tearDown()

    def test_nd_proxy(self):
        """ IPv6 Proxy ND """

        pg0, pg1, pg2 = self.pg0, self.pg1, self.pg2

        #
        # Hosts inside pg0's subnet; two of them are proxied below
        #
        pg0.generate_remote_hosts(8)
        host_a = pg0._remote_hosts[2]
        host_b = pg0._remote_hosts[3]

        # solicited-node multicast address for VPP's pg0 address, as
        # bytes, text, and the matching multicast MAC
        nsma = in6_getnsma(inet_pton(AF_INET6, pg0.local_ip6))
        nsma_ip = inet_ntop(AF_INET6, nsma)
        nsma_mac = in6_getnsmac(nsma)

        def expect_forwarded_to_host_a(tx_intf, pkt):
            # send pkt on tx_intf; it must leave pg1 towards host_a with
            # the IPv6 addresses unchanged
            tx_intf.add_stream(pkt)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            rx = pg1.get_capture(1)[0]

            self.assertEqual(rx[Ether].dst, host_a.mac)
            self.assertEqual(rx[Ether].src, pg1.local_mac)
            self.assertEqual(rx[IPv6].src, pkt[IPv6].src)
            self.assertEqual(rx[IPv6].dst, pkt[IPv6].dst)

        #
        # NS from host_a arriving on pg1: no reply is expected, since the
        # sender is not on the link that has the prefix configured
        #
        ns_pg1 = (Ether(dst=nsma_mac, src=pg1.remote_mac) /
                  IPv6(dst=nsma_ip, src=host_a.ip6) /
                  ICMPv6ND_NS(tgt=pg0.local_ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=host_a.mac))

        self.send_and_assert_no_replies(pg1, ns_pg1, "Off link NS")

        #
        # Proxy host_a on pg1
        #
        self.vapi.ip6nd_proxy_add_del(
            is_add=1,
            ip=inet_pton(AF_INET6, host_a.ip6),
            sw_if_index=pg1.sw_if_index)

        #
        # the same NS is now answered with an NA ...
        #
        self.send_and_expect_na(pg1, ns_pg1,
                                "NS to proxy entry",
                                dst_ip=host_a.ip6,
                                tgt_ip=pg0.local_ip6)

        #
        # ... a neighbour entry exists on pg1 ...
        #
        self.assertTrue(find_nbr(self, pg1.sw_if_index, host_a.ip6))

        #
        # ... and traffic arriving on pg0 is routed to the host via pg1
        #
        t = (Ether(dst=pg0.local_mac, src=pg0.remote_mac) /
             IPv6(dst=host_a.ip6, src=pg0.remote_ip6) /
             inet6.UDP(sport=10000, dport=20000) /
             Raw(b'\xa5' * 100))

        expect_forwarded_to_host_a(pg0, t)

        #
        # VPP also answers for host_a on the main interface
        #
        ns_pg0 = (Ether(dst=nsma_mac, src=pg0.remote_mac) /
                  IPv6(dst=nsma_ip, src=pg0.remote_ip6) /
                  ICMPv6ND_NS(tgt=host_a.ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=pg0.remote_mac))

        self.send_and_expect_na(pg0, ns_pg0,
                                "NS to proxy entry on main",
                                tgt_ip=host_a.ip6,
                                dst_ip=pg0.remote_ip6)

        #
        # Proxy and resolve host_b on pg2
        # (the option carries host_a's MAC, as in the original test)
        #
        ns_pg2 = (Ether(dst=nsma_mac, src=pg2.remote_mac) /
                  IPv6(dst=nsma_ip, src=host_b.ip6) /
                  ICMPv6ND_NS(tgt=pg0.local_ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=host_a.mac))

        self.vapi.ip6nd_proxy_add_del(
            is_add=1,
            ip=inet_pton(AF_INET6, host_b.ip6),
            sw_if_index=pg2.sw_if_index)

        self.send_and_expect_na(pg2, ns_pg2,
                                "NS to proxy entry other interface",
                                dst_ip=host_b.ip6,
                                tgt_ip=pg0.local_ip6)

        self.assertTrue(find_nbr(self, pg2.sw_if_index, host_b.ip6))

        #
        # the two proxied hosts can talk to each other: pg2 -> pg1
        #
        t2 = (Ether(dst=pg2.local_mac,
                    src=pg0.remote_hosts[3].mac) /
              IPv6(dst=host_a.ip6, src=host_b.ip6) /
              inet6.UDP(sport=10000, dport=20000) /
              Raw(b'\xa5' * 100))

        expect_forwarded_to_host_a(pg2, t2)

        #
        # Withdraw both proxy entries; the neighbour entries must go too
        #
        self.vapi.ip6nd_proxy_add_del(
            ip=inet_pton(AF_INET6, host_a.ip6),
            sw_if_index=pg1.sw_if_index,
            is_add=0)
        self.vapi.ip6nd_proxy_add_del(
            ip=inet_pton(AF_INET6, host_b.ip6),
            sw_if_index=pg2.sw_if_index,
            is_add=0)

        self.assertFalse(find_nbr(self, pg2.sw_if_index, host_b.ip6))
        self.assertFalse(find_nbr(self, pg1.sw_if_index, host_a.ip6))

        #
        # none of the earlier solicitations is answered any more
        #
        self.send_and_assert_no_replies(pg0, ns_pg0, "Proxy unconfigured")
        self.send_and_assert_no_replies(pg1, ns_pg1, "Proxy unconfigured")
        self.send_and_assert_no_replies(pg2, ns_pg2, "Proxy unconfigured")

        #
        # and host-to-host traffic is no longer forwarded: it produces an
        # NS out of the glean/main interface instead
        #
        pg2.add_stream(t2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        glean_rx = pg0.get_capture(1)

        self.assertTrue(glean_rx[0].haslayer(ICMPv6ND_NS))
1632
1633
class TestIPNull(VppTestCase):
    """ IPv6 routes via NULL """

    @classmethod
    def setUpClass(cls):
        super(TestIPNull, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPNull, cls).tearDownClass()

    def setUp(self):
        super(TestIPNull, self).setUp()

        # range(1): a single pg interface (pg0) is all this test needs
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        super(TestIPNull, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip_null(self):
        """ IP NULL route """

        # The probe is sent from pg0's remote host towards 2001::1, which
        # is covered by both routes installed below.
        probe = (Ether(src=self.pg0.remote_mac,
                       dst=self.pg0.local_mac) /
                 IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
                 inet6.UDP(sport=1234, dport=1234) /
                 Raw(b'\xa5' * 100))

        def send_probe_and_get_unreach():
            # Inject the probe on pg0 and return the ICMPv6 destination
            # unreachable layer of the single packet captured on pg0.
            self.pg0.add_stream(probe)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            return self.pg0.get_capture(1)[0][ICMPv6DestUnreach]

        #
        # Route 2001::/64 via a path of type ICMP-unreachable
        #
        unreach_path = VppRoutePath(
            "::", 0xffffffff,
            type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH)
        unreach_route = VppIpRoute(self, "2001::", 64, [unreach_path])
        unreach_route.add_vpp_config()

        # code 0 = "No route to destination"
        self.assertEqual(send_probe_and_get_unreach().code, 0)

        # ICMP is rate limited. pause a bit
        self.sleep(1)

        #
        # The more specific 2001::1/128 goes via an ICMP-prohibit path
        #
        prohibit_path = VppRoutePath(
            "::", 0xffffffff,
            type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT)
        prohibit_route = VppIpRoute(self, "2001::1", 128, [prohibit_path])
        prohibit_route.add_vpp_config()

        # code 1 = "Communication with destination administratively
        # prohibited"
        self.assertEqual(send_probe_and_get_unreach().code, 1)
1713
1714
class TestIPDisabled(VppTestCase):
    """ IPv6 disabled """

    @classmethod
    def setUpClass(cls):
        super(TestIPDisabled, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPDisabled, cls).tearDownClass()

    def setUp(self):
        super(TestIPDisabled, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        # PG0 is IP enabled
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # PG 1 is not IP enabled
        self.pg1.admin_up()

    def tearDown(self):
        super(TestIPDisabled, self).tearDown()
        for i in self.pg_interfaces:
            # Only IPv6 addresses are ever configured by this class, so
            # IPv6 is the address family that has to be removed here.
            # NOTE(review): pg1 has already been through unconfig_ip6() at
            # the end of the test; this assumes a repeated call is
            # harmless - confirm against VppInterface.unconfig_ip6().
            i.unconfig_ip6()
            i.admin_down()

    def test_ip_disabled(self):
        """ IP Disabled """

        #
        # A multicast route for group ffef::1 with source "::".
        # one accepting interface, pg1, and one forwarding interface, pg0
        #
        route_ff_01 = VppIpMRoute(
            self,
            "::",
            "ffef::1", 128,
            MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
            [VppMRoutePath(self.pg1.sw_if_index,
                           MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT),
             VppMRoutePath(self.pg0.sw_if_index,
                           MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)])
        route_ff_01.add_vpp_config()

        # pu: unicast towards pg0's remote host
        # pm: multicast towards the group configured above
        pu = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
              inet6.UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))
        pm = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst="ffef::1") /
              inet6.UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")

        #
        # IP enable PG1
        #
        self.pg1.config_ip6()

        #
        # Now we get packets through
        #
        self.pg1.add_stream(pu)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        self.pg1.add_stream(pm)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        #
        # Disable PG1
        #
        self.pg1.unconfig_ip6()

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1809
1810
class TestIP6LoadBalance(VppTestCase):
    """ IPv6 Load-Balancing """

    @classmethod
    def setUpClass(cls):
        super(TestIP6LoadBalance, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6LoadBalance, cls).tearDownClass()

    def setUp(self):
        super(TestIP6LoadBalance, self).setUp()

        self.create_pg_interfaces(range(5))

        # MPLS table 0 is created first, then every interface gets IPv6
        # and MPLS enabled.
        table_0 = VppMplsTable(self, 0)
        table_0.add_vpp_config()

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()
            intf.enable_mpls()

    def tearDown(self):
        # interface cleanup happens before the framework's own tearDown
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
            intf.disable_mpls()
        super(TestIP6LoadBalance, self).tearDown()

    def pg_send(self, input, pkts):
        # Clear the trace, queue pkts on the input interface and start
        # the packet generator with capture enabled on all interfaces.
        self.vapi.cli("clear trace")
        input.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_and_expect_load_balancing(self, input, pkts, outputs):
        # Send pkts and require that every interface in outputs captured
        # at least one packet; the split between them is not checked.
        self.pg_send(input, pkts)
        for out_itf in outputs:
            captured = out_itf._get_capture(1)
            self.assertNotEqual(0, len(captured))

    def send_and_expect_one_itf(self, input, pkts, itf):
        # Send pkts and require that itf captures all of them.
        self.pg_send(input, pkts)
        itf.get_capture(len(pkts))

    def test_ip6_load_balance(self):
        """ IPv6 Load-Balancing """

        def eth():
            # L2 header for a packet injected on pg0
            return Ether(src=self.pg0.remote_mac,
                         dst=self.pg0.local_mac)

        #
        # Streams whose packets differ only in the destination port
        #  - IP only
        #  - MPLS EOS
        #  - MPLS non-EOS
        #  - MPLS non-EOS with an entropy label
        #
        ip_by_port = []
        mpls_eos_by_port = []
        mpls_neos_by_port = []
        entropy_by_port = []

        #
        # Streams whose packets differ only in the source address
        #
        ip_by_src = []
        mpls_eos_by_src = []

        for idx in range(NUM_PKTS):
            varying_port = (
                IPv6(dst="3000::1", src="3000:1::1") /
                inet6.UDP(sport=1234, dport=1234 + idx) /
                Raw(b'\xa5' * 100))
            ip_by_port.append(eth() / varying_port)
            mpls_eos_by_port.append(eth() /
                                    MPLS(label=66, ttl=2) /
                                    varying_port)
            mpls_neos_by_port.append(eth() /
                                     MPLS(label=67, ttl=2) /
                                     MPLS(label=77, ttl=2) /
                                     varying_port)
            entropy_by_port.append(eth() /
                                   MPLS(label=67, ttl=2) /
                                   MPLS(label=14, ttl=2) /
                                   MPLS(label=999, ttl=2) /
                                   varying_port)

            varying_src = (
                IPv6(dst="3000::1", src="3000:1::%d" % idx) /
                inet6.UDP(sport=1234, dport=1234) /
                Raw(b'\xa5' * 100))
            ip_by_src.append(eth() / varying_src)
            mpls_eos_by_src.append(eth() /
                                   MPLS(label=66, ttl=2) /
                                   varying_src)

        #
        # A two-path route (pg1, pg2) for the IP packets
        #
        ip_route = VppIpRoute(self, "3000::1", 128,
                              [VppRoutePath(self.pg1.remote_ip6,
                                            self.pg1.sw_if_index),
                               VppRoutePath(self.pg2.remote_ip6,
                                            self.pg2.sw_if_index)])
        ip_route.add_vpp_config()

        #
        # local label 66 bound to that prefix, for the EOS packets
        #
        label_bind = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
        label_bind.add_vpp_config()

        #
        # A two-path MPLS route for label 67, for the non-EOS packets
        #
        mpls_route = VppMplsRoute(self, 67, 0,
                                  [VppRoutePath(self.pg1.remote_ip6,
                                                self.pg1.sw_if_index,
                                                labels=[67]),
                                   VppRoutePath(self.pg2.remote_ip6,
                                                self.pg2.sw_if_index,
                                                labels=[67])])
        mpls_route.add_vpp_config()

        #
        # inject each stream on pg0 - expect load-balancing across the 2
        # paths, since the default hash config is to use IP src,dst and
        # port src,dst.
        # Equal shares per link are not checked: the hash is statistical,
        # so that can never be guaranteed. With NUM_PKTS different packets
        # some balancing is expected, so only require traffic on each link.
        #
        for stream in (ip_by_port, ip_by_src,
                       mpls_eos_by_port, mpls_eos_by_src,
                       mpls_neos_by_port):
            self.send_and_expect_load_balancing(self.pg0, stream,
                                                [self.pg1, self.pg2])

        #
        # The packets with Entropy label in should not load-balance,
        # since the Entropy value is fixed.
        #
        self.send_and_expect_one_itf(self.pg0, entropy_by_port, self.pg1)

        #
        # change the flow hash config so it's only IP src,dst
        #  - now only the streams with differing source address will
        #    load-balance
        #
        self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=0, dport=0,
                                   is_ipv6=1)

        for stream in (ip_by_src, mpls_eos_by_src):
            self.send_and_expect_load_balancing(self.pg0, stream,
                                                [self.pg1, self.pg2])
        self.send_and_expect_one_itf(self.pg0, ip_by_port, self.pg2)

        #
        # change the flow hash config back to defaults
        #
        self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=1, dport=1,
                                   is_ipv6=1)

        #
        # Recursive prefixes
        #  - testing that 2 stages of load-balancing occurs and there is no
        #    polarisation (i.e. only 2 of 4 paths are used)
        #
        recursive_by_port = [
            (eth() /
             IPv6(dst="4000::1", src="4000:1::1") /
             inet6.UDP(sport=1234, dport=1234 + idx) /
             Raw(b'\xa5' * 100))
            for idx in range(257)]
        recursive_by_src = [
            (eth() /
             IPv6(dst="4000::1", src="4000:1::%d" % idx) /
             inet6.UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))
            for idx in range(257)]

        second_leg = VppIpRoute(self, "3000::2", 128,
                                [VppRoutePath(self.pg3.remote_ip6,
                                              self.pg3.sw_if_index),
                                 VppRoutePath(self.pg4.remote_ip6,
                                              self.pg4.sw_if_index)])
        second_leg.add_vpp_config()

        # 4000::1 recurses through both two-path prefixes
        recursive_route = VppIpRoute(self, "4000::1", 128,
                                     [VppRoutePath("3000::1",
                                                   0xffffffff),
                                      VppRoutePath("3000::2",
                                                   0xffffffff)])
        recursive_route.add_vpp_config()

        #
        # inject the packets on pg0 - expect traffic on all 4 paths
        #
        self.vapi.cli("clear trace")
        for stream in (recursive_by_port, recursive_by_src):
            self.send_and_expect_load_balancing(self.pg0, stream,
                                                [self.pg1, self.pg2,
                                                 self.pg3, self.pg4])

        #
        # Recursive prefixes
        #  - 2 stages of resolution, each with a single path, so there
        #    is no choice to make
        #
        single_path_pkts = [
            (eth() /
             IPv6(dst="6000::1", src="6000:1::1") /
             inet6.UDP(sport=1234, dport=1234 + idx) /
             Raw(b'\xa5' * 100))
            for idx in range(257)]

        via_pg3 = VppIpRoute(self, "5000::2", 128,
                             [VppRoutePath(self.pg3.remote_ip6,
                                           self.pg3.sw_if_index)])
        via_pg3.add_vpp_config()

        via_5000_2 = VppIpRoute(self, "6000::1", 128,
                                [VppRoutePath("5000::2",
                                              0xffffffff)])
        via_5000_2.add_vpp_config()

        #
        # inject the packets on pg0 - expect every one of them on pg3
        #
        self.vapi.cli("clear trace")
        self.send_and_expect_one_itf(self.pg0, single_path_pkts, self.pg3)
2067
2068
class TestIP6Punt(VppTestCase):
    """ IPv6 Punt Police/Redirect """

    @classmethod
    def setUpClass(cls):
        super(TestIP6Punt, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Punt, cls).tearDownClass()

    def setUp(self):
        super(TestIP6Punt, self).setUp()

        self.create_pg_interfaces(range(4))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        super(TestIP6Punt, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip_punt(self):
        """ IP6 punt police and redirect """

        # TCP addressed to VPP's own pg0 address
        punted = (Ether(src=self.pg0.remote_mac,
                        dst=self.pg0.local_mac) /
                  IPv6(src=self.pg0.remote_ip6,
                       dst=self.pg0.local_ip6) /
                  inet6.TCP(sport=1234, dport=1234) /
                  Raw(b'\xa5' * 100))

        pkts = punted * 1025

        nh_addr = self.pg1.remote_ip6

        def redirect_via_pg1(rx_sw_if_index, **kwargs):
            # add (or, with is_add=0, remove) a punt redirect out of pg1
            self.vapi.ip_punt_redirect(rx_sw_if_index,
                                       self.pg1.sw_if_index,
                                       nh_addr,
                                       **kwargs)

        #
        # Configure a punt redirect via pg1.
        #
        redirect_via_pg1(self.pg0.sw_if_index)

        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # add a policer
        #
        policer = self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
                                            rate_type=1)
        self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)

        self.vapi.cli("clear trace")
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        #
        # some, but not all, of the packets sent must arrive:
        # the remainder were policed
        #
        policed_rx = self.pg1._get_capture(1)
        self.assertGreater(len(policed_rx), 0)
        self.assertLess(len(policed_rx), len(pkts))

        #
        # remove the policer. back to full rx
        #
        self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
        self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
                                  rate_type=1, is_add=0)
        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # remove the redirect. expect full drop.
        #
        redirect_via_pg1(self.pg0.sw_if_index, is_add=0)
        self.send_and_assert_no_replies(self.pg0, pkts,
                                        "IP no punt config")

        #
        # Add a redirect that is not input port selective
        # (rx sw_if_index of 0xffffffff), check it, then remove it
        #
        redirect_via_pg1(0xffffffff)
        self.send_and_expect(self.pg0, pkts, self.pg1)

        redirect_via_pg1(0xffffffff, is_add=0)

    def test_ip_punt_dump(self):
        """ IP6 punt redirect dump"""

        #
        # Configure three punt redirects, all transmitting out of pg3;
        # the one for pg2 uses an all-zeros next-hop
        #
        nh_addr = self.pg3.remote_ip6
        for rx_itf, next_hop in ((self.pg0, nh_addr),
                                 (self.pg1, nh_addr),
                                 (self.pg2, '0::0')):
            self.vapi.ip_punt_redirect(rx_itf.sw_if_index,
                                       self.pg3.sw_if_index,
                                       next_hop)

        #
        # Dump pg0 punt redirects
        #
        pg0_punts = self.vapi.ip_punt_redirect_dump(self.pg0.sw_if_index,
                                                    is_ipv6=1)
        for entry in pg0_punts:
            self.assertEqual(entry.punt.rx_sw_if_index,
                             self.pg0.sw_if_index)

        #
        # Dump punt redirects for all interfaces
        #
        all_punts = self.vapi.ip_punt_redirect_dump(0xffffffff, is_ipv6=1)
        self.assertEqual(len(all_punts), 3)
        for entry in all_punts:
            self.assertEqual(entry.punt.tx_sw_if_index,
                             self.pg3.sw_if_index)
        # NOTE(review): the second redirect was configured with
        # pg3.remote_ip6 as next-hop, yet this asserts inequality; it
        # presumably only passes because the dumped nh is not a plain
        # string - confirm the intended check.
        self.assertNotEqual(all_punts[1].punt.nh, self.pg3.remote_ip6)
        self.assertEqual(str(all_punts[2].punt.nh), '::')
2203
2204
class TestIPDeag(VppTestCase):
    """ IPv6 Deaggregate Routes """

    @classmethod
    def setUpClass(cls):
        super(TestIPDeag, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPDeag, cls).tearDownClass()

    def setUp(self):
        super(TestIPDeag, self).setUp()

        self.create_pg_interfaces(range(3))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        super(TestIPDeag, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip_deag(self):
        """ IP Deag Routes """

        def tcp_from_pg0(src, dst):
            # TCP packet injected on pg0 with the given IPv6 addresses
            return (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IPv6(src=src, dst=dst) /
                    inet6.TCP(sport=1234, dport=1234) /
                    Raw(b'\xa5' * 100))

        #
        # Two extra IPv6 tables:
        #  table 1 - for a second destination address lookup
        #  table 2 - for a source address lookup
        #
        dst_table = VppIpTable(self, 1, is_ip6=1)
        src_table = VppIpTable(self, 2, is_ip6=1)
        dst_table.add_vpp_config()
        src_table.add_vpp_config()

        #
        # Routes in the default table that hand the packet over to a
        # second lookup in each of those tables
        #
        deag_on_dst = VppIpRoute(self, "1::1", 128,
                                 [VppRoutePath("::",
                                               0xffffffff,
                                               nh_table_id=1)])
        deag_on_src = VppIpRoute(
            self, "1::2", 128,
            [VppRoutePath("::",
                          0xffffffff,
                          nh_table_id=2,
                          type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP)])

        deag_on_dst.add_vpp_config()
        deag_on_src.add_vpp_config()

        #
        # nothing comes back for either stream yet: the second tables
        # hold no matching route
        #
        pkts_dst = tcp_from_pg0("5::5", "1::1") * 257
        pkts_src = tcp_from_pg0("2::2", "1::2") * 257

        self.send_and_assert_no_replies(self.pg0, pkts_dst,
                                        "IP in dst table")
        self.send_and_assert_no_replies(self.pg0, pkts_src,
                                        "IP in src table")

        #
        # a route for the destination in table 1, forwarding via pg1
        #
        fwd_in_dst = VppIpRoute(self, "1::1", 128,
                                [VppRoutePath(self.pg1.remote_ip6,
                                              self.pg1.sw_if_index)],
                                table_id=1)
        fwd_in_dst.add_vpp_config()

        self.send_and_expect(self.pg0, pkts_dst, self.pg1)

        #
        # a route for the source in table 2, forwarding via pg2
        #
        fwd_in_src = VppIpRoute(self, "2::2", 128,
                                [VppRoutePath(self.pg2.remote_ip6,
                                              self.pg2.sw_if_index)],
                                table_id=2)
        fwd_in_src.add_vpp_config()
        self.send_and_expect(self.pg0, pkts_src, self.pg2)

        #
        # loop in the lookup DP
        #
        looping = VppIpRoute(self, "3::3", 128,
                             [VppRoutePath("::",
                                           0xffffffff)])
        looping.add_vpp_config()

        loop_pkts = tcp_from_pg0("3::4", "3::3") * 257

        self.send_and_assert_no_replies(self.pg0, loop_pkts,
                                        "IP lookup loop")
2322
2323
class TestIP6Input(VppTestCase):
    """ IPv6 Input Exception Test Cases """

    @classmethod
    def setUpClass(cls):
        super(TestIP6Input, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Input, cls).tearDownClass()

    def setUp(self):
        super(TestIP6Input, self).setUp()

        self.create_pg_interfaces(range(2))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        super(TestIP6Input, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip_input_icmp_reply(self):
        """ IP6 Input Exception - Return ICMP (3,0) """
        #
        # packets arriving with a hop limit of 1 and a destination
        # behind pg1 - an ICMP reply is expected back on pg0
        #
        expiring = (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IPv6(src=self.pg0.remote_ip6,
                         dst=self.pg1.remote_ip6,
                         hlim=1) /
                    inet6.UDP(sport=1234, dport=1234) /
                    Raw(b'\xa5' * 100))

        replies = self.send_and_expect(self.pg0, expiring * NUM_PKTS,
                                       self.pg0)
        icmp = replies[0][ICMPv6TimeExceeded]

        # type 3, code 0: "hop limit exceeded in transit"
        self.assertEqual((icmp.type, icmp.code), (3, 0))

    # Values shared by the parameterized cases below
    icmpv6_data = '\x0a' * 18
    all_0s = "::"
    all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"

    @parameterized.expand([
        # Name, src, dst, l4proto, msg, timeout
        # (a src/dst of None selects the interface-derived default)
        ("src='iface',   dst='iface'",
         None, None,
         inet6.UDP(sport=1234, dport=1234),
         "funky version", None),
        ("src='All 0's', dst='iface'",
         all_0s, None,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='iface',   dst='All 0's'",
         None, all_0s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='All 1's', dst='iface'",
         all_1s, None,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='iface',   dst='All 1's'",
         None, all_1s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='All 1's', dst='All 1's'",
         all_1s, all_1s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
    ])
    def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):

        # the description is set per case, from the case name
        self._testMethodDoc = 'IPv6 Input Exception - %s' % name

        src_addr = src if src else self.pg0.remote_ip6
        dst_addr = dst if dst else self.pg1.remote_ip6

        # every case carries an IP version field of 3
        bad_version = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=src_addr,
                            dst=dst_addr,
                            version=3) /
                       l4 /
                       Raw(b'\xa5' * 100))

        self.send_and_assert_no_replies(self.pg0, bad_version * NUM_PKTS,
                                        remark=msg or "",
                                        timeout=timeout)

    def test_hop_by_hop(self):
        """ Hop-by-hop header test """

        # UDP to VPP's own pg0 address behind a hop-by-hop extension
        # header; the packet is only injected, nothing is asserted on
        # what comes back.
        with_hbh = (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
                    IPv6ExtHdrHopByHop() /
                    inet6.UDP(sport=1234, dport=1234) /
                    Raw(b'\xa5' * 100))

        self.pg0.add_stream(with_hbh)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
2420
2421
class TestIPReplace(VppTestCase):
    """ IPv6 Table Replace

    Drives the replace_begin() / replace_end() cycle on four tables that
    hold both unicast and multicast routes, checking after each step which
    routes are present in the table dumps.
    """

    @classmethod
    def setUpClass(cls):
        super(TestIPReplace, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPReplace, cls).tearDownClass()

    def setUp(self):
        """Bring up four pg interfaces with IPv6 and create four tables."""
        super(TestIPReplace, self).setUp()

        self.create_pg_interfaces(range(4))

        self.tables = []

        # one table per interface, table IDs counting up from 1
        for table_id, i in enumerate(self.pg_interfaces, 1):
            i.admin_up()
            i.config_ip6()
            # NOTE(review): only IPv6 is configured on the interface, yet
            # ARP is resolved here; an NDP-based resolve (or no resolve at
            # all) may have been intended - confirm before changing.
            i.resolve_arp()
            i.generate_remote_hosts(2)
            self.tables.append(VppIpTable(self, table_id,
                                          True).add_vpp_config())

    def tearDown(self):
        """Shut the interfaces down and remove their IPv6 configuration."""
        super(TestIPReplace, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()
            # setUp() applied config_ip6(), so IPv6 (not IPv4) is what has
            # to be undone here
            i.unconfig_ip6()

    def test_replace(self):
        """ IP Table Replace

        Fills every table with unicast and multicast routes, then three
        times over: begins a replace, re-adds only the routes at even list
        indices, ends the replace, verifies that exactly those routes
        remain, and finally restores the full set. The tables are flushed
        at the end and must return to their initial sizes.
        """

        N_ROUTES = 20
        links = [self.pg0, self.pg1, self.pg2, self.pg3]
        # routes[ii] collects the {'uni', 'multi'} pairs added to table ii
        routes = [[] for _ in links]

        # the sizes of 'empty' tables
        for t in self.tables:
            self.assertEqual(len(t.dump()), 2)
            self.assertEqual(len(t.mdump()), 5)

        # load up the tables with some routes; jj starts at 1, so each
        # table receives N_ROUTES - 1 pairs, kept at list indices
        # 0 .. N_ROUTES - 2
        for ii, t in enumerate(self.tables):
            for jj in range(1, N_ROUTES):
                uni = VppIpRoute(
                    self, "2001::%d" % jj, 128,
                    [VppRoutePath(links[ii].remote_hosts[0].ip6,
                                  links[ii].sw_if_index),
                     VppRoutePath(links[ii].remote_hosts[1].ip6,
                                  links[ii].sw_if_index)],
                    table_id=t.table_id).add_vpp_config()
                multi = VppIpMRoute(
                    self, "::",
                    "ff:2001::%d" % jj, 128,
                    MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
                    [VppMRoutePath(self.pg0.sw_if_index,
                                   MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT,
                                   proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
                     VppMRoutePath(self.pg1.sw_if_index,
                                   MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
                                   proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
                     VppMRoutePath(self.pg2.sw_if_index,
                                   MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
                                   proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
                     VppMRoutePath(self.pg3.sw_if_index,
                                   MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
                                   proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)],
                    table_id=t.table_id).add_vpp_config()
                routes[ii].append({'uni': uni,
                                   'multi': multi})

        #
        # replace the tables a few times
        #
        for _ in range(3):
            # replace each table
            for t in self.tables:
                t.replace_begin()

            # all the routes are still there
            for ii, t in enumerate(self.tables):
                dump = t.dump()
                mdump = t.mdump()
                for r in routes[ii]:
                    self.assertTrue(find_route_in_dump(dump, r['uni'], t))
                    self.assertTrue(find_mroute_in_dump(mdump, r['multi'], t))

            # redownload the routes held at even list indices
            for ii, t in enumerate(self.tables):
                for jj in range(0, N_ROUTES, 2):
                    routes[ii][jj]['uni'].add_vpp_config()
                    routes[ii][jj]['multi'].add_vpp_config()

            # signal each table converged
            for t in self.tables:
                t.replace_end()

            # we should find the even-index routes, but not the odd ones
            for ii, t in enumerate(self.tables):
                dump = t.dump()
                mdump = t.mdump()
                for jj in range(0, N_ROUTES, 2):
                    self.assertTrue(find_route_in_dump(
                        dump, routes[ii][jj]['uni'], t))
                    self.assertTrue(find_mroute_in_dump(
                        mdump, routes[ii][jj]['multi'], t))
                for jj in range(1, N_ROUTES - 1, 2):
                    self.assertFalse(find_route_in_dump(
                        dump, routes[ii][jj]['uni'], t))
                    self.assertFalse(find_mroute_in_dump(
                        mdump, routes[ii][jj]['multi'], t))

            # reload all the routes
            for ii, t in enumerate(self.tables):
                for r in routes[ii]:
                    r['uni'].add_vpp_config()
                    r['multi'].add_vpp_config()

            # all the routes are still there
            for ii, t in enumerate(self.tables):
                dump = t.dump()
                mdump = t.mdump()
                for r in routes[ii]:
                    self.assertTrue(find_route_in_dump(dump, r['uni'], t))
                    self.assertTrue(find_mroute_in_dump(mdump, r['multi'], t))

        #
        # finally flush the tables for good measure
        #
        for t in self.tables:
            t.flush()
            self.assertEqual(len(t.dump()), 2)
            self.assertEqual(len(t.mdump()), 5)
2560
2561
# When this file is executed directly, hand the module's tests to unittest
# with VppTestRunner as the runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)