tests: python3 changes for ip6 fib tests
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5
6 from parameterized import parameterized
7 import scapy.compat
8 import scapy.layers.inet6 as inet6
9 from scapy.contrib.mpls import MPLS
10 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
11     ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
12     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
13     ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, IPv6ExtHdrHopByHop
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.packet import Raw
16 from scapy.utils import inet_pton, inet_ntop
17 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
18     in6_mactoifaceid
19 from six import moves
20
21 from framework import VppTestCase, VppTestRunner
22 from util import ppp, ip6_normalize, mk_ll_addr
23 from vpp_ip import DpoProto, VppIpAddress
24 from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
25     VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
26     VppMplsRoute, VppMplsTable, VppIpTable, FibPathType, \
27     VppIpInterfaceAddress
28 from vpp_neighbor import find_nbr, VppNeighbor
29 from vpp_pg_interface import is_ipv6_misc
30 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
31 from ipaddress import IPv6Network, IPv4Network, IPv6Address
32
33 AF_INET6 = socket.AF_INET6
34
35 try:
36     text_type = unicode
37 except NameError:
38     text_type = str
39
40 NUM_PKTS = 67
41
42
43 class TestIPv6ND(VppTestCase):
44     def validate_ra(self, intf, rx, dst_ip=None):
45         if not dst_ip:
46             dst_ip = intf.remote_ip6
47
48         # unicasted packets must come to the unicast mac
49         self.assertEqual(rx[Ether].dst, intf.remote_mac)
50
51         # and from the router's MAC
52         self.assertEqual(rx[Ether].src, intf.local_mac)
53
54         # the rx'd RA should be addressed to the sender's source
55         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
56         self.assertEqual(in6_ptop(rx[IPv6].dst),
57                          in6_ptop(dst_ip))
58
59         # and come from the router's link local
60         self.assertTrue(in6_islladdr(rx[IPv6].src))
61         self.assertEqual(in6_ptop(rx[IPv6].src),
62                          in6_ptop(mk_ll_addr(intf.local_mac)))
63
64     def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
65         if not dst_ip:
66             dst_ip = intf.remote_ip6
67         if not tgt_ip:
68             dst_ip = intf.local_ip6
69
70         # unicasted packets must come to the unicast mac
71         self.assertEqual(rx[Ether].dst, intf.remote_mac)
72
73         # and from the router's MAC
74         self.assertEqual(rx[Ether].src, intf.local_mac)
75
76         # the rx'd NA should be addressed to the sender's source
77         self.assertTrue(rx.haslayer(ICMPv6ND_NA))
78         self.assertEqual(in6_ptop(rx[IPv6].dst),
79                          in6_ptop(dst_ip))
80
81         # and come from the target address
82         self.assertEqual(
83             in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))
84
85         # Dest link-layer options should have the router's MAC
86         dll = rx[ICMPv6NDOptDstLLAddr]
87         self.assertEqual(dll.lladdr, intf.local_mac)
88
89     def validate_ns(self, intf, rx, tgt_ip):
90         nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
91         dst_ip = inet_ntop(AF_INET6, nsma)
92
93         # NS is broadcast
94         self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))
95
96         # and from the router's MAC
97         self.assertEqual(rx[Ether].src, intf.local_mac)
98
99         # the rx'd NS should be addressed to an mcast address
100         # derived from the target address
101         self.assertEqual(
102             in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
103
104         # expect the tgt IP in the NS header
105         ns = rx[ICMPv6ND_NS]
106         self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))
107
108         # packet is from the router's local address
109         self.assertEqual(
110             in6_ptop(rx[IPv6].src), intf.local_ip6)
111
112         # Src link-layer options should have the router's MAC
113         sll = rx[ICMPv6NDOptSrcLLAddr]
114         self.assertEqual(sll.lladdr, intf.local_mac)
115
116     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
117                            filter_out_fn=is_ipv6_misc):
118         intf.add_stream(pkts)
119         self.pg_enable_capture(self.pg_interfaces)
120         self.pg_start()
121         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
122
123         self.assertEqual(len(rx), 1)
124         rx = rx[0]
125         self.validate_ra(intf, rx, dst_ip)
126
127     def send_and_expect_na(self, intf, pkts, remark, dst_ip=None,
128                            tgt_ip=None,
129                            filter_out_fn=is_ipv6_misc):
130         intf.add_stream(pkts)
131         self.pg_enable_capture(self.pg_interfaces)
132         self.pg_start()
133         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
134
135         self.assertEqual(len(rx), 1)
136         rx = rx[0]
137         self.validate_na(intf, rx, dst_ip, tgt_ip)
138
139     def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
140                            filter_out_fn=is_ipv6_misc):
141         tx_intf.add_stream(pkts)
142         self.pg_enable_capture(self.pg_interfaces)
143         self.pg_start()
144         rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)
145
146         self.assertEqual(len(rx), 1)
147         rx = rx[0]
148         self.validate_ns(rx_intf, rx, tgt_ip)
149
150     def verify_ip(self, rx, smac, dmac, sip, dip):
151         ether = rx[Ether]
152         self.assertEqual(ether.dst, dmac)
153         self.assertEqual(ether.src, smac)
154
155         ip = rx[IPv6]
156         self.assertEqual(ip.src, sip)
157         self.assertEqual(ip.dst, dip)
158
159
160 class TestIPv6(TestIPv6ND):
161     """ IPv6 Test Case """
162
163     @classmethod
164     def setUpClass(cls):
165         super(TestIPv6, cls).setUpClass()
166
167     @classmethod
168     def tearDownClass(cls):
169         super(TestIPv6, cls).tearDownClass()
170
171     def setUp(self):
172         """
173         Perform test setup before test case.
174
175         **Config:**
176             - create 3 pg interfaces
177                 - untagged pg0 interface
178                 - Dot1Q subinterface on pg1
179                 - Dot1AD subinterface on pg2
180             - setup interfaces:
181                 - put it into UP state
182                 - set IPv6 addresses
183                 - resolve neighbor address using NDP
184             - configure 200 fib entries
185
186         :ivar list interfaces: pg interfaces and subinterfaces.
187         :ivar dict flows: IPv4 packet flows in test.
188
189         *TODO:* Create AD sub interface
190         """
191         super(TestIPv6, self).setUp()
192
193         # create 3 pg interfaces
194         self.create_pg_interfaces(range(3))
195
196         # create 2 subinterfaces for p1 and pg2
197         self.sub_interfaces = [
198             VppDot1QSubint(self, self.pg1, 100),
199             VppDot1QSubint(self, self.pg2, 200)
200             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
201         ]
202
203         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
204         self.flows = dict()
205         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
206         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
207         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
208
209         # packet sizes
210         self.pg_if_packet_sizes = [64, 1500, 9020]
211
212         self.interfaces = list(self.pg_interfaces)
213         self.interfaces.extend(self.sub_interfaces)
214
215         # setup all interfaces
216         for i in self.interfaces:
217             i.admin_up()
218             i.config_ip6()
219             i.resolve_ndp()
220
221     def tearDown(self):
222         """Run standard test teardown and log ``show ip6 neighbors``."""
223         for i in self.interfaces:
224             i.unconfig_ip6()
225             i.ip6_disable()
226             i.admin_down()
227         for i in self.sub_interfaces:
228             i.remove_vpp_config()
229
230         super(TestIPv6, self).tearDown()
231         if not self.vpp_dead:
232             self.logger.info(self.vapi.cli("show ip6 neighbors"))
233             # info(self.vapi.cli("show ip6 fib"))  # many entries
234
235     def modify_packet(self, src_if, packet_size, pkt):
236         """Add load, set destination IP and extend packet to required packet
237         size for defined interface.
238
239         :param VppInterface src_if: Interface to create packet for.
240         :param int packet_size: Required packet size.
241         :param Scapy pkt: Packet to be modified.
242         """
243         dst_if_idx = int(packet_size / 10 % 2)
244         dst_if = self.flows[src_if][dst_if_idx]
245         info = self.create_packet_info(src_if, dst_if)
246         payload = self.info_to_payload(info)
247         p = pkt / Raw(payload)
248         p[IPv6].dst = dst_if.remote_ip6
249         info.data = p.copy()
250         if isinstance(src_if, VppSubInterface):
251             p = src_if.add_dot1_layer(p)
252         self.extend_packet(p, packet_size)
253
254         return p
255
256     def create_stream(self, src_if):
257         """Create input packet stream for defined interface.
258
259         :param VppInterface src_if: Interface to create packet stream for.
260         """
261         hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
262         pkt_tmpl = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
263                     IPv6(src=src_if.remote_ip6) /
264                     inet6.UDP(sport=1234, dport=1234))
265
266         pkts = [self.modify_packet(src_if, i, pkt_tmpl)
267                 for i in moves.range(self.pg_if_packet_sizes[0],
268                                      self.pg_if_packet_sizes[1], 10)]
269         pkts_b = [self.modify_packet(src_if, i, pkt_tmpl)
270                   for i in moves.range(self.pg_if_packet_sizes[1] + hdr_ext,
271                                        self.pg_if_packet_sizes[2] + hdr_ext,
272                                        50)]
273         pkts.extend(pkts_b)
274
275         return pkts
276
277     def verify_capture(self, dst_if, capture):
278         """Verify captured input packet stream for defined interface.
279
280         :param VppInterface dst_if: Interface to verify captured packet stream
281                                     for.
282         :param list capture: Captured packet stream.
283         """
284         self.logger.info("Verifying capture on interface %s" % dst_if.name)
285         last_info = dict()
286         for i in self.interfaces:
287             last_info[i.sw_if_index] = None
288         is_sub_if = False
289         dst_sw_if_index = dst_if.sw_if_index
290         if hasattr(dst_if, 'parent'):
291             is_sub_if = True
292         for packet in capture:
293             if is_sub_if:
294                 # Check VLAN tags and Ethernet header
295                 packet = dst_if.remove_dot1_layer(packet)
296             self.assertTrue(Dot1Q not in packet)
297             try:
298                 ip = packet[IPv6]
299                 udp = packet[inet6.UDP]
300                 payload_info = self.payload_to_info(packet[Raw])
301                 packet_index = payload_info.index
302                 self.assertEqual(payload_info.dst, dst_sw_if_index)
303                 self.logger.debug(
304                     "Got packet on port %s: src=%u (id=%u)" %
305                     (dst_if.name, payload_info.src, packet_index))
306                 next_info = self.get_next_packet_info_for_interface2(
307                     payload_info.src, dst_sw_if_index,
308                     last_info[payload_info.src])
309                 last_info[payload_info.src] = next_info
310                 self.assertTrue(next_info is not None)
311                 self.assertEqual(packet_index, next_info.index)
312                 saved_packet = next_info.data
313                 # Check standard fields
314                 self.assertEqual(
315                     ip.src, saved_packet[IPv6].src)
316                 self.assertEqual(
317                     ip.dst, saved_packet[IPv6].dst)
318                 self.assertEqual(
319                     udp.sport, saved_packet[inet6.UDP].sport)
320                 self.assertEqual(
321                     udp.dport, saved_packet[inet6.UDP].dport)
322             except:
323                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
324                 raise
325         for i in self.interfaces:
326             remaining_packet = self.get_next_packet_info_for_interface2(
327                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
328             self.assertTrue(remaining_packet is None,
329                             "Interface %s: Packet expected from interface %s "
330                             "didn't arrive" % (dst_if.name, i.name))
331
332     def test_next_header_anomaly(self):
333         """ IPv6 next header anomaly test
334
335         Test scenario:
336             - ipv6 next header field = Fragment Header (44)
337             - next header is ICMPv6 Echo Request
338             - wait for reassembly
339         """
340         pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
341                IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44) /
342                ICMPv6EchoRequest())
343
344         self.pg0.add_stream(pkt)
345         self.pg_start()
346
347         # wait for reassembly
348         self.sleep(10)
349
350     def test_fib(self):
351         """ IPv6 FIB test
352
353         Test scenario:
354             - Create IPv6 stream for pg0 interface
355             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
356             - Send and verify received packets on each interface.
357         """
358
359         pkts = self.create_stream(self.pg0)
360         self.pg0.add_stream(pkts)
361
362         for i in self.sub_interfaces:
363             pkts = self.create_stream(i)
364             i.parent.add_stream(pkts)
365
366         self.pg_enable_capture(self.pg_interfaces)
367         self.pg_start()
368
369         pkts = self.pg0.get_capture()
370         self.verify_capture(self.pg0, pkts)
371
372         for i in self.sub_interfaces:
373             pkts = i.parent.get_capture()
374             self.verify_capture(i, pkts)
375
376     def test_ns(self):
377         """ IPv6 Neighbour Solicitation Exceptions
378
379         Test scenario:
380            - Send an NS Sourced from an address not covered by the link sub-net
381            - Send an NS to an mcast address the router has not joined
382            - Send NS for a target address the router does not onn.
383         """
384
385         #
386         # An NS from a non link source address
387         #
388         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
389         d = inet_ntop(AF_INET6, nsma)
390
391         p = (Ether(dst=in6_getnsmac(nsma)) /
392              IPv6(dst=d, src="2002::2") /
393              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
394              ICMPv6NDOptSrcLLAddr(
395                  lladdr=self.pg0.remote_mac))
396         pkts = [p]
397
398         self.send_and_assert_no_replies(
399             self.pg0, pkts,
400             "No response to NS source by address not on sub-net")
401
402         #
403         # An NS for sent to a solicited mcast group the router is
404         # not a member of FAILS
405         #
406         if 0:
407             nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
408             d = inet_ntop(AF_INET6, nsma)
409
410             p = (Ether(dst=in6_getnsmac(nsma)) /
411                  IPv6(dst=d, src=self.pg0.remote_ip6) /
412                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
413                  ICMPv6NDOptSrcLLAddr(
414                      lladdr=self.pg0.remote_mac))
415             pkts = [p]
416
417             self.send_and_assert_no_replies(
418                 self.pg0, pkts,
419                 "No response to NS sent to unjoined mcast address")
420
421         #
422         # An NS whose target address is one the router does not own
423         #
424         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
425         d = inet_ntop(AF_INET6, nsma)
426
427         p = (Ether(dst=in6_getnsmac(nsma)) /
428              IPv6(dst=d, src=self.pg0.remote_ip6) /
429              ICMPv6ND_NS(tgt="fd::ffff") /
430              ICMPv6NDOptSrcLLAddr(
431                  lladdr=self.pg0.remote_mac))
432         pkts = [p]
433
434         self.send_and_assert_no_replies(self.pg0, pkts,
435                                         "No response to NS for unknown target")
436
437         #
438         # A neighbor entry that has no associated FIB-entry
439         #
440         self.pg0.generate_remote_hosts(4)
441         nd_entry = VppNeighbor(self,
442                                self.pg0.sw_if_index,
443                                self.pg0.remote_hosts[2].mac,
444                                self.pg0.remote_hosts[2].ip6,
445                                is_no_fib_entry=1)
446         nd_entry.add_vpp_config()
447
448         #
449         # check we have the neighbor, but no route
450         #
451         self.assertTrue(find_nbr(self,
452                                  self.pg0.sw_if_index,
453                                  self.pg0._remote_hosts[2].ip6))
454         self.assertFalse(find_route(self,
455                                     self.pg0._remote_hosts[2].ip6,
456                                     128))
457
458         #
459         # send an NS from a link local address to the interface's global
460         # address
461         #
462         p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
463              IPv6(
464                  dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
465              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
466              ICMPv6NDOptSrcLLAddr(
467                  lladdr=self.pg0.remote_mac))
468
469         self.send_and_expect_na(self.pg0, p,
470                                 "NS from link-local",
471                                 dst_ip=self.pg0._remote_hosts[2].ip6_ll,
472                                 tgt_ip=self.pg0.local_ip6)
473
474         #
475         # we should have learned an ND entry for the peer's link-local
476         # but not inserted a route to it in the FIB
477         #
478         self.assertTrue(find_nbr(self,
479                                  self.pg0.sw_if_index,
480                                  self.pg0._remote_hosts[2].ip6_ll))
481         self.assertFalse(find_route(self,
482                                     self.pg0._remote_hosts[2].ip6_ll,
483                                     128))
484
485         #
486         # An NS to the router's own Link-local
487         #
488         p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
489              IPv6(
490                  dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
491              ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll) /
492              ICMPv6NDOptSrcLLAddr(
493                  lladdr=self.pg0.remote_mac))
494
495         self.send_and_expect_na(self.pg0, p,
496                                 "NS to/from link-local",
497                                 dst_ip=self.pg0._remote_hosts[3].ip6_ll,
498                                 tgt_ip=self.pg0.local_ip6_ll)
499
500         #
501         # we should have learned an ND entry for the peer's link-local
502         # but not inserted a route to it in the FIB
503         #
504         self.assertTrue(find_nbr(self,
505                                  self.pg0.sw_if_index,
506                                  self.pg0._remote_hosts[3].ip6_ll))
507         self.assertFalse(find_route(self,
508                                     self.pg0._remote_hosts[3].ip6_ll,
509                                     128))
510
511     def test_ns_duplicates(self):
512         """ ND Duplicates"""
513
514         #
515         # Generate some hosts on the LAN
516         #
517         self.pg1.generate_remote_hosts(3)
518
519         #
520         # Add host 1 on pg1 and pg2
521         #
522         ns_pg1 = VppNeighbor(self,
523                              self.pg1.sw_if_index,
524                              self.pg1.remote_hosts[1].mac,
525                              self.pg1.remote_hosts[1].ip6)
526         ns_pg1.add_vpp_config()
527         ns_pg2 = VppNeighbor(self,
528                              self.pg2.sw_if_index,
529                              self.pg2.remote_mac,
530                              self.pg1.remote_hosts[1].ip6)
531         ns_pg2.add_vpp_config()
532
533         #
534         # IP packet destined for pg1 remote host arrives on pg1 again.
535         #
536         p = (Ether(dst=self.pg0.local_mac,
537                    src=self.pg0.remote_mac) /
538              IPv6(src=self.pg0.remote_ip6,
539                   dst=self.pg1.remote_hosts[1].ip6) /
540              inet6.UDP(sport=1234, dport=1234) /
541              Raw())
542
543         self.pg0.add_stream(p)
544         self.pg_enable_capture(self.pg_interfaces)
545         self.pg_start()
546
547         rx1 = self.pg1.get_capture(1)
548
549         self.verify_ip(rx1[0],
550                        self.pg1.local_mac,
551                        self.pg1.remote_hosts[1].mac,
552                        self.pg0.remote_ip6,
553                        self.pg1.remote_hosts[1].ip6)
554
555         #
556         # remove the duplicate on pg1
557         # packet stream should generate NSs out of pg1
558         #
559         ns_pg1.remove_vpp_config()
560
561         self.send_and_expect_ns(self.pg0, self.pg1,
562                                 p, self.pg1.remote_hosts[1].ip6)
563
564         #
565         # Add it back
566         #
567         ns_pg1.add_vpp_config()
568
569         self.pg0.add_stream(p)
570         self.pg_enable_capture(self.pg_interfaces)
571         self.pg_start()
572
573         rx1 = self.pg1.get_capture(1)
574
575         self.verify_ip(rx1[0],
576                        self.pg1.local_mac,
577                        self.pg1.remote_hosts[1].mac,
578                        self.pg0.remote_ip6,
579                        self.pg1.remote_hosts[1].ip6)
580
581     def validate_ra(self, intf, rx, dst_ip=None, mtu=9000, pi_opt=None):
582         if not dst_ip:
583             dst_ip = intf.remote_ip6
584
585         # unicasted packets must come to the unicast mac
586         self.assertEqual(rx[Ether].dst, intf.remote_mac)
587
588         # and from the router's MAC
589         self.assertEqual(rx[Ether].src, intf.local_mac)
590
591         # the rx'd RA should be addressed to the sender's source
592         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
593         self.assertEqual(in6_ptop(rx[IPv6].dst),
594                          in6_ptop(dst_ip))
595
596         # and come from the router's link local
597         self.assertTrue(in6_islladdr(rx[IPv6].src))
598         self.assertEqual(in6_ptop(rx[IPv6].src),
599                          in6_ptop(mk_ll_addr(intf.local_mac)))
600
601         # it should contain the links MTU
602         ra = rx[ICMPv6ND_RA]
603         self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
604
605         # it should contain the source's link layer address option
606         sll = ra[ICMPv6NDOptSrcLLAddr]
607         self.assertEqual(sll.lladdr, intf.local_mac)
608
609         if not pi_opt:
610             # the RA should not contain prefix information
611             self.assertFalse(ra.haslayer(
612                 ICMPv6NDOptPrefixInfo))
613         else:
614             raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
615
616             # the options are nested in the scapy packet in way that i cannot
617             # decipher how to decode. this 1st layer of option always returns
618             # nested classes, so a direct obj1=obj2 comparison always fails.
619             # however, the getlayer(.., 2) does give one instance.
620             # so we cheat here and construct a new opt instance for comparison
621             rd = ICMPv6NDOptPrefixInfo(
622                 prefixlen=raos.prefixlen,
623                 prefix=raos.prefix,
624                 L=raos.L,
625                 A=raos.A)
626             if type(pi_opt) is list:
627                 for ii in range(len(pi_opt)):
628                     self.assertEqual(pi_opt[ii], rd)
629                     rd = rx.getlayer(
630                         ICMPv6NDOptPrefixInfo, ii + 2)
631             else:
632                 self.assertEqual(pi_opt, raos, 'Expected: %s, received: %s'
633                                  % (pi_opt.show(dump=True),
634                                     raos.show(dump=True)))
635
636     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
637                            filter_out_fn=is_ipv6_misc,
638                            opt=None):
639         intf.add_stream(pkts)
640         self.pg_enable_capture(self.pg_interfaces)
641         self.pg_start()
642         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
643
644         self.assertEqual(len(rx), 1)
645         rx = rx[0]
646         self.validate_ra(intf, rx, dst_ip, pi_opt=opt)
647
648     def test_rs(self):
649         """ IPv6 Router Solicitation Exceptions
650
651         Test scenario:
652         """
653
654         #
655         # Before we begin change the IPv6 RA responses to use the unicast
656         # address - that way we will not confuse them with the periodic
657         # RAs which go to the mcast address
658         # Sit and wait for the first periodic RA.
659         #
660         # TODO
661         #
662         self.pg0.ip6_ra_config(send_unicast=1)
663
664         #
665         # An RS from a link source address
666         #  - expect an RA in return
667         #
668         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
669              IPv6(
670                  dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
671              ICMPv6ND_RS())
672         pkts = [p]
673         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
674
675         #
676         # For the next RS sent the RA should be rate limited
677         #
678         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
679
680         #
681         # When we reconfigure the IPv6 RA config,
682         # we reset the RA rate limiting,
683         # so we need to do this before each test below so as not to drop
684         # packets for rate limiting reasons. Test this works here.
685         #
686         self.pg0.ip6_ra_config(send_unicast=1)
687         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
688
689         #
690         # An RS sent from a non-link local source
691         #
692         self.pg0.ip6_ra_config(send_unicast=1)
693         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
694              IPv6(dst=self.pg0.local_ip6,
695                   src="2002::ffff") /
696              ICMPv6ND_RS())
697         pkts = [p]
698         self.send_and_assert_no_replies(self.pg0, pkts,
699                                         "RS from non-link source")
700
701         #
702         # Source an RS from a link local address
703         #
704         self.pg0.ip6_ra_config(send_unicast=1)
705         ll = mk_ll_addr(self.pg0.remote_mac)
706         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
707              IPv6(dst=self.pg0.local_ip6, src=ll) /
708              ICMPv6ND_RS())
709         pkts = [p]
710         self.send_and_expect_ra(self.pg0, pkts,
711                                 "RS sourced from link-local",
712                                 dst_ip=ll)
713
714         #
715         # Send the RS multicast
716         #
717         self.pg0.ip6_ra_config(send_unicast=1)
718         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
719         ll = mk_ll_addr(self.pg0.remote_mac)
720         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
721              IPv6(dst="ff02::2", src=ll) /
722              ICMPv6ND_RS())
723         pkts = [p]
724         self.send_and_expect_ra(self.pg0, pkts,
725                                 "RS sourced from link-local",
726                                 dst_ip=ll)
727
728         #
729         # Source from the unspecified address ::. This happens when the RS
730         # is sent before the host has a configured address/sub-net,
731         # i.e. auto-config. Since the sender has no IP address, the reply
732         # comes back mcast - so the capture needs to not filter this.
733         # If we happen to pick up the periodic RA at this point then so be it,
734         # it's not an error.
735         #
736         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
737         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
738              IPv6(dst="ff02::2", src="::") /
739              ICMPv6ND_RS())
740         pkts = [p]
741         self.send_and_expect_ra(self.pg0, pkts,
742                                 "RS sourced from unspecified",
743                                 dst_ip="ff02::1",
744                                 filter_out_fn=None)
745
746         #
747         # Configure The RA to announce the links prefix
748         #
749         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
750                                self.pg0.local_ip6_prefix_len))
751
752         #
753         # RAs should now contain the prefix information option
754         #
755         opt = ICMPv6NDOptPrefixInfo(
756             prefixlen=self.pg0.local_ip6_prefix_len,
757             prefix=self.pg0.local_ip6,
758             L=1,
759             A=1)
760
761         self.pg0.ip6_ra_config(send_unicast=1)
762         ll = mk_ll_addr(self.pg0.remote_mac)
763         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
764              IPv6(dst=self.pg0.local_ip6, src=ll) /
765              ICMPv6ND_RS())
766         self.send_and_expect_ra(self.pg0, p,
767                                 "RA with prefix-info",
768                                 dst_ip=ll,
769                                 opt=opt)
770
771         #
772         # Change the prefix info to not off-link
773         #  L-flag is clear
774         #
775         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
776                                self.pg0.local_ip6_prefix_len),
777                                off_link=1)
778
779         opt = ICMPv6NDOptPrefixInfo(
780             prefixlen=self.pg0.local_ip6_prefix_len,
781             prefix=self.pg0.local_ip6,
782             L=0,
783             A=1)
784
785         self.pg0.ip6_ra_config(send_unicast=1)
786         self.send_and_expect_ra(self.pg0, p,
787                                 "RA with Prefix info with L-flag=0",
788                                 dst_ip=ll,
789                                 opt=opt)
790
791         #
792         # Change the prefix info to not off-link, no-autoconfig
793         #  L and A flag are clear in the advert
794         #
795         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
796                                self.pg0.local_ip6_prefix_len),
797                                off_link=1,
798                                no_autoconfig=1)
799
800         opt = ICMPv6NDOptPrefixInfo(
801             prefixlen=self.pg0.local_ip6_prefix_len,
802             prefix=self.pg0.local_ip6,
803             L=0,
804             A=0)
805
806         self.pg0.ip6_ra_config(send_unicast=1)
807         self.send_and_expect_ra(self.pg0, p,
808                                 "RA with Prefix info with A & L-flag=0",
809                                 dst_ip=ll,
810                                 opt=opt)
811
812         #
813         # Change the flag settings back to the defaults
814         #  L and A flag are set in the advert
815         #
816         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
817                                self.pg0.local_ip6_prefix_len))
818
819         opt = ICMPv6NDOptPrefixInfo(
820             prefixlen=self.pg0.local_ip6_prefix_len,
821             prefix=self.pg0.local_ip6,
822             L=1,
823             A=1)
824
825         self.pg0.ip6_ra_config(send_unicast=1)
826         self.send_and_expect_ra(self.pg0, p,
827                                 "RA with Prefix info",
828                                 dst_ip=ll,
829                                 opt=opt)
830
831         #
832         # Change the prefix info to not off-link, no-autoconfig
833         #  L and A flag are clear in the advert
834         #
835         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
836                                self.pg0.local_ip6_prefix_len),
837                                off_link=1,
838                                no_autoconfig=1)
839
840         opt = ICMPv6NDOptPrefixInfo(
841             prefixlen=self.pg0.local_ip6_prefix_len,
842             prefix=self.pg0.local_ip6,
843             L=0,
844             A=0)
845
846         self.pg0.ip6_ra_config(send_unicast=1)
847         self.send_and_expect_ra(self.pg0, p,
848                                 "RA with Prefix info with A & L-flag=0",
849                                 dst_ip=ll,
850                                 opt=opt)
851
852         #
853         # Use the reset to defaults option to revert to defaults
854         #  L and A flag are clear in the advert
855         #
856         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
857                                self.pg0.local_ip6_prefix_len),
858                                use_default=1)
859
860         opt = ICMPv6NDOptPrefixInfo(
861             prefixlen=self.pg0.local_ip6_prefix_len,
862             prefix=self.pg0.local_ip6,
863             L=1,
864             A=1)
865
866         self.pg0.ip6_ra_config(send_unicast=1)
867         self.send_and_expect_ra(self.pg0, p,
868                                 "RA with Prefix reverted to defaults",
869                                 dst_ip=ll,
870                                 opt=opt)
871
872         #
873         # Advertise Another prefix. With no L-flag/A-flag
874         #
875         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
876                                self.pg1.local_ip6_prefix_len),
877                                off_link=1,
878                                no_autoconfig=1)
879
880         opt = [ICMPv6NDOptPrefixInfo(
881             prefixlen=self.pg0.local_ip6_prefix_len,
882             prefix=self.pg0.local_ip6,
883             L=1,
884             A=1),
885             ICMPv6NDOptPrefixInfo(
886                 prefixlen=self.pg1.local_ip6_prefix_len,
887                 prefix=self.pg1.local_ip6,
888                 L=0,
889                 A=0)]
890
891         self.pg0.ip6_ra_config(send_unicast=1)
892         ll = mk_ll_addr(self.pg0.remote_mac)
893         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
894              IPv6(dst=self.pg0.local_ip6, src=ll) /
895              ICMPv6ND_RS())
896         self.send_and_expect_ra(self.pg0, p,
897                                 "RA with multiple Prefix infos",
898                                 dst_ip=ll,
899                                 opt=opt)
900
901         #
902         # Remove the first prefix-info - expect the second is still in the
903         # advert
904         #
905         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
906                                self.pg0.local_ip6_prefix_len),
907                                is_no=1)
908
909         opt = ICMPv6NDOptPrefixInfo(
910             prefixlen=self.pg1.local_ip6_prefix_len,
911             prefix=self.pg1.local_ip6,
912             L=0,
913             A=0)
914
915         self.pg0.ip6_ra_config(send_unicast=1)
916         self.send_and_expect_ra(self.pg0, p,
917                                 "RA with Prefix reverted to defaults",
918                                 dst_ip=ll,
919                                 opt=opt)
920
921         #
922         # Remove the second prefix-info - expect no prefix-info in the adverts
923         #
924         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
925                                self.pg1.local_ip6_prefix_len),
926                                is_no=1)
927
928         self.pg0.ip6_ra_config(send_unicast=1)
929         self.send_and_expect_ra(self.pg0, p,
930                                 "RA with Prefix reverted to defaults",
931                                 dst_ip=ll)
932
933         #
934         # Reset the periodic advertisements back to default values
935         #
936         self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
937
938
939 class TestIPv6IfAddrRoute(VppTestCase):
940     """ IPv6 Interface Addr Route Test Case """
941
942     @classmethod
943     def setUpClass(cls):
944         super(TestIPv6IfAddrRoute, cls).setUpClass()
945
946     @classmethod
947     def tearDownClass(cls):
948         super(TestIPv6IfAddrRoute, cls).tearDownClass()
949
950     def setUp(self):
951         super(TestIPv6IfAddrRoute, self).setUp()
952
953         # create 1 pg interface
954         self.create_pg_interfaces(range(1))
955
956         for i in self.pg_interfaces:
957             i.admin_up()
958             i.config_ip6()
959             i.resolve_ndp()
960
961     def tearDown(self):
962         super(TestIPv6IfAddrRoute, self).tearDown()
963         for i in self.pg_interfaces:
964             i.unconfig_ip6()
965             i.admin_down()
966
967     def test_ipv6_ifaddrs_same_prefix(self):
968         """ IPv6 Interface Addresses Same Prefix test
969
970         Test scenario:
971
972             - Verify no route in FIB for prefix 2001:10::/64
973             - Configure IPv4 address 2001:10::10/64  on an interface
974             - Verify route in FIB for prefix 2001:10::/64
975             - Configure IPv4 address 2001:10::20/64 on an interface
976             - Delete 2001:10::10/64 from interface
977             - Verify route in FIB for prefix 2001:10::/64
978             - Delete 2001:10::20/64 from interface
979             - Verify no route in FIB for prefix 2001:10::/64
980         """
981
982         addr1 = "2001:10::10"
983         addr2 = "2001:10::20"
984
985         if_addr1 = VppIpInterfaceAddress(self, self.pg0,
986                                          VppIpAddress(addr1), 64)
987         if_addr2 = VppIpInterfaceAddress(self, self.pg0,
988                                          VppIpAddress(addr2), 64)
989         self.assertFalse(if_addr1.query_vpp_config())  # 2001:10::/64
990         self.assertFalse(find_route(self, addr1, 128))
991         self.assertFalse(find_route(self, addr2, 128))
992
993         # configure first address, verify route present
994         if_addr1.add_vpp_config()
995         self.assertTrue(if_addr1.query_vpp_config())  # 2001:10::/64
996         self.assertTrue(find_route(self, addr1, 128))
997         self.assertFalse(find_route(self, addr2, 128))
998
999         # configure second address, delete first, verify route not removed
1000         if_addr2.add_vpp_config()
1001         if_addr1.remove_vpp_config()
1002         self.assertTrue(if_addr1.query_vpp_config())  # 2001:10::/64
1003         self.assertFalse(find_route(self, addr1, 128))
1004         self.assertTrue(find_route(self, addr2, 128))
1005
1006         # delete second address, verify route removed
1007         if_addr2.remove_vpp_config()
1008         self.assertFalse(if_addr1.query_vpp_config())  # 2001:10::/64
1009         self.assertFalse(find_route(self, addr1, 128))
1010         self.assertFalse(find_route(self, addr2, 128))
1011
1012
1013 class TestICMPv6Echo(VppTestCase):
1014     """ ICMPv6 Echo Test Case """
1015
1016     @classmethod
1017     def setUpClass(cls):
1018         super(TestICMPv6Echo, cls).setUpClass()
1019
1020     @classmethod
1021     def tearDownClass(cls):
1022         super(TestICMPv6Echo, cls).tearDownClass()
1023
1024     def setUp(self):
1025         super(TestICMPv6Echo, self).setUp()
1026
1027         # create 1 pg interface
1028         self.create_pg_interfaces(range(1))
1029
1030         for i in self.pg_interfaces:
1031             i.admin_up()
1032             i.config_ip6()
1033             i.resolve_ndp()
1034
1035     def tearDown(self):
1036         super(TestICMPv6Echo, self).tearDown()
1037         for i in self.pg_interfaces:
1038             i.unconfig_ip6()
1039             i.ip6_disable()
1040             i.admin_down()
1041
1042     def test_icmpv6_echo(self):
1043         """ VPP replies to ICMPv6 Echo Request
1044
1045         Test scenario:
1046
1047             - Receive ICMPv6 Echo Request message on pg0 interface.
1048             - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
1049         """
1050
1051         icmpv6_id = 0xb
1052         icmpv6_seq = 5
1053         icmpv6_data = b'\x0a' * 18
1054         p_echo_request = (Ether(src=self.pg0.remote_mac,
1055                                 dst=self.pg0.local_mac) /
1056                           IPv6(src=self.pg0.remote_ip6,
1057                                dst=self.pg0.local_ip6) /
1058                           ICMPv6EchoRequest(
1059                               id=icmpv6_id,
1060                               seq=icmpv6_seq,
1061                               data=icmpv6_data))
1062
1063         self.pg0.add_stream(p_echo_request)
1064         self.pg_enable_capture(self.pg_interfaces)
1065         self.pg_start()
1066
1067         rx = self.pg0.get_capture(1)
1068         rx = rx[0]
1069         ether = rx[Ether]
1070         ipv6 = rx[IPv6]
1071         icmpv6 = rx[ICMPv6EchoReply]
1072
1073         self.assertEqual(ether.src, self.pg0.local_mac)
1074         self.assertEqual(ether.dst, self.pg0.remote_mac)
1075
1076         self.assertEqual(ipv6.src, self.pg0.local_ip6)
1077         self.assertEqual(ipv6.dst, self.pg0.remote_ip6)
1078
1079         self.assertEqual(
1080             icmp6types[icmpv6.type], "Echo Reply")
1081         self.assertEqual(icmpv6.id, icmpv6_id)
1082         self.assertEqual(icmpv6.seq, icmpv6_seq)
1083         self.assertEqual(icmpv6.data, icmpv6_data)
1084
1085
1086 class TestIPv6RD(TestIPv6ND):
1087     """ IPv6 Router Discovery Test Case """
1088
1089     @classmethod
1090     def setUpClass(cls):
1091         super(TestIPv6RD, cls).setUpClass()
1092
1093     @classmethod
1094     def tearDownClass(cls):
1095         super(TestIPv6RD, cls).tearDownClass()
1096
1097     def setUp(self):
1098         super(TestIPv6RD, self).setUp()
1099
1100         # create 2 pg interfaces
1101         self.create_pg_interfaces(range(2))
1102
1103         self.interfaces = list(self.pg_interfaces)
1104
1105         # setup all interfaces
1106         for i in self.interfaces:
1107             i.admin_up()
1108             i.config_ip6()
1109
1110     def tearDown(self):
1111         for i in self.interfaces:
1112             i.unconfig_ip6()
1113             i.admin_down()
1114         super(TestIPv6RD, self).tearDown()
1115
1116     def test_rd_send_router_solicitation(self):
1117         """ Verify router solicitation packets """
1118
1119         count = 2
1120         self.pg_enable_capture(self.pg_interfaces)
1121         self.pg_start()
1122         self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index,
1123                                                  mrc=count)
1124         rx_list = self.pg1.get_capture(count, timeout=3)
1125         self.assertEqual(len(rx_list), count)
1126         for packet in rx_list:
1127             self.assertEqual(packet.haslayer(IPv6), 1)
1128             self.assertEqual(packet[IPv6].haslayer(
1129                 ICMPv6ND_RS), 1)
1130             dst = ip6_normalize(packet[IPv6].dst)
1131             dst2 = ip6_normalize("ff02::2")
1132             self.assert_equal(dst, dst2)
1133             src = ip6_normalize(packet[IPv6].src)
1134             src2 = ip6_normalize(self.pg1.local_ip6_ll)
1135             self.assert_equal(src, src2)
1136             self.assertTrue(
1137                 bool(packet[ICMPv6ND_RS].haslayer(
1138                     ICMPv6NDOptSrcLLAddr)))
1139             self.assert_equal(
1140                 packet[ICMPv6NDOptSrcLLAddr].lladdr,
1141                 self.pg1.local_mac)
1142
1143     def verify_prefix_info(self, reported_prefix, prefix_option):
1144         prefix = IPv6Network(
1145             text_type(prefix_option.getfieldval("prefix") +
1146                       "/" +
1147                       text_type(prefix_option.getfieldval("prefixlen"))),
1148             strict=False)
1149         self.assert_equal(reported_prefix.prefix.network_address,
1150                           prefix.network_address)
1151         L = prefix_option.getfieldval("L")
1152         A = prefix_option.getfieldval("A")
1153         option_flags = (L << 7) | (A << 6)
1154         self.assert_equal(reported_prefix.flags, option_flags)
1155         self.assert_equal(reported_prefix.valid_time,
1156                           prefix_option.getfieldval("validlifetime"))
1157         self.assert_equal(reported_prefix.preferred_time,
1158                           prefix_option.getfieldval("preferredlifetime"))
1159
1160     def test_rd_receive_router_advertisement(self):
1161         """ Verify events triggered by received RA packets """
1162
1163         self.vapi.want_ip6_ra_events()
1164
1165         prefix_info_1 = ICMPv6NDOptPrefixInfo(
1166             prefix="1::2",
1167             prefixlen=50,
1168             validlifetime=200,
1169             preferredlifetime=500,
1170             L=1,
1171             A=1,
1172         )
1173
1174         prefix_info_2 = ICMPv6NDOptPrefixInfo(
1175             prefix="7::4",
1176             prefixlen=20,
1177             validlifetime=70,
1178             preferredlifetime=1000,
1179             L=1,
1180             A=0,
1181         )
1182
1183         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1184              IPv6(dst=self.pg1.local_ip6_ll,
1185                   src=mk_ll_addr(self.pg1.remote_mac)) /
1186              ICMPv6ND_RA() /
1187              prefix_info_1 /
1188              prefix_info_2)
1189         self.pg1.add_stream([p])
1190         self.pg_start()
1191
1192         ev = self.vapi.wait_for_event(10, "ip6_ra_event")
1193
1194         self.assert_equal(ev.current_hop_limit, 0)
1195         self.assert_equal(ev.flags, 8)
1196         self.assert_equal(ev.router_lifetime_in_sec, 1800)
1197         self.assert_equal(ev.neighbor_reachable_time_in_msec, 0)
1198         self.assert_equal(
1199             ev.time_in_msec_between_retransmitted_neighbor_solicitations, 0)
1200
1201         self.assert_equal(ev.n_prefixes, 2)
1202
1203         self.verify_prefix_info(ev.prefixes[0], prefix_info_1)
1204         self.verify_prefix_info(ev.prefixes[1], prefix_info_2)
1205
1206
1207 class TestIPv6RDControlPlane(TestIPv6ND):
1208     """ IPv6 Router Discovery Control Plane Test Case """
1209
1210     @classmethod
1211     def setUpClass(cls):
1212         super(TestIPv6RDControlPlane, cls).setUpClass()
1213
1214     @classmethod
1215     def tearDownClass(cls):
1216         super(TestIPv6RDControlPlane, cls).tearDownClass()
1217
1218     def setUp(self):
1219         super(TestIPv6RDControlPlane, self).setUp()
1220
1221         # create 1 pg interface
1222         self.create_pg_interfaces(range(1))
1223
1224         self.interfaces = list(self.pg_interfaces)
1225
1226         # setup all interfaces
1227         for i in self.interfaces:
1228             i.admin_up()
1229             i.config_ip6()
1230
1231     def tearDown(self):
1232         super(TestIPv6RDControlPlane, self).tearDown()
1233
1234     @staticmethod
1235     def create_ra_packet(pg, routerlifetime=None):
1236         src_ip = pg.remote_ip6_ll
1237         dst_ip = pg.local_ip6
1238         if routerlifetime is not None:
1239             ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
1240         else:
1241             ra = ICMPv6ND_RA()
1242         p = (Ether(dst=pg.local_mac, src=pg.remote_mac) /
1243              IPv6(dst=dst_ip, src=src_ip) / ra)
1244         return p
1245
1246     @staticmethod
1247     def get_default_routes(fib):
1248         list = []
1249         for entry in fib:
1250             if entry.route.prefix.prefixlen == 0:
1251                 for path in entry.route.paths:
1252                     if path.sw_if_index != 0xFFFFFFFF:
1253                         defaut_route = {}
1254                         defaut_route['sw_if_index'] = path.sw_if_index
1255                         defaut_route['next_hop'] = path.nh.address.ip6
1256                         list.append(defaut_route)
1257         return list
1258
1259     @staticmethod
1260     def get_interface_addresses(fib, pg):
1261         list = []
1262         for entry in fib:
1263             if entry.route.prefix.prefixlen == 128:
1264                 path = entry.route.paths[0]
1265                 if path.sw_if_index == pg.sw_if_index:
1266                     list.append(str(entry.route.prefix.network_address))
1267         return list
1268
1269     def test_all(self):
1270         """ Test handling of SLAAC addresses and default routes """
1271
1272         fib = self.vapi.ip_route_dump(0, True)
1273         default_routes = self.get_default_routes(fib)
1274         initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
1275         self.assertEqual(default_routes, [])
1276         router_address = IPv6Address(text_type(self.pg0.remote_ip6_ll))
1277
1278         self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)
1279
1280         self.sleep(0.1)
1281
1282         # send RA
1283         packet = (self.create_ra_packet(
1284             self.pg0) / ICMPv6NDOptPrefixInfo(
1285             prefix="1::",
1286             prefixlen=64,
1287             validlifetime=2,
1288             preferredlifetime=2,
1289             L=1,
1290             A=1,
1291         ) / ICMPv6NDOptPrefixInfo(
1292             prefix="7::",
1293             prefixlen=20,
1294             validlifetime=1500,
1295             preferredlifetime=1000,
1296             L=1,
1297             A=0,
1298         ))
1299         self.pg0.add_stream([packet])
1300         self.pg_start()
1301
1302         self.sleep(0.1)
1303
1304         fib = self.vapi.ip_route_dump(0, True)
1305
1306         # check FIB for new address
1307         addresses = set(self.get_interface_addresses(fib, self.pg0))
1308         new_addresses = addresses.difference(initial_addresses)
1309         self.assertEqual(len(new_addresses), 1)
1310         prefix = IPv6Network(text_type("%s/%d" % (list(new_addresses)[0], 20)),
1311                              strict=False)
1312         self.assertEqual(prefix, IPv6Network(text_type('1::/20')))
1313
1314         # check FIB for new default route
1315         default_routes = self.get_default_routes(fib)
1316         self.assertEqual(len(default_routes), 1)
1317         dr = default_routes[0]
1318         self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
1319         self.assertEqual(dr['next_hop'], router_address)
1320
1321         # send RA to delete default route
1322         packet = self.create_ra_packet(self.pg0, routerlifetime=0)
1323         self.pg0.add_stream([packet])
1324         self.pg_start()
1325
1326         self.sleep(0.1)
1327
1328         # check that default route is deleted
1329         fib = self.vapi.ip_route_dump(0, True)
1330         default_routes = self.get_default_routes(fib)
1331         self.assertEqual(len(default_routes), 0)
1332
1333         self.sleep(0.1)
1334
1335         # send RA
1336         packet = self.create_ra_packet(self.pg0)
1337         self.pg0.add_stream([packet])
1338         self.pg_start()
1339
1340         self.sleep(0.1)
1341
1342         # check FIB for new default route
1343         fib = self.vapi.ip_route_dump(0, True)
1344         default_routes = self.get_default_routes(fib)
1345         self.assertEqual(len(default_routes), 1)
1346         dr = default_routes[0]
1347         self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
1348         self.assertEqual(dr['next_hop'], router_address)
1349
1350         # send RA, updating router lifetime to 1s
1351         packet = self.create_ra_packet(self.pg0, 1)
1352         self.pg0.add_stream([packet])
1353         self.pg_start()
1354
1355         self.sleep(0.1)
1356
1357         # check that default route still exists
1358         fib = self.vapi.ip_route_dump(0, True)
1359         default_routes = self.get_default_routes(fib)
1360         self.assertEqual(len(default_routes), 1)
1361         dr = default_routes[0]
1362         self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
1363         self.assertEqual(dr['next_hop'], router_address)
1364
1365         self.sleep(1)
1366
1367         # check that default route is deleted
1368         fib = self.vapi.ip_route_dump(0, True)
1369         default_routes = self.get_default_routes(fib)
1370         self.assertEqual(len(default_routes), 0)
1371
1372         # check FIB still contains the SLAAC address
1373         addresses = set(self.get_interface_addresses(fib, self.pg0))
1374         new_addresses = addresses.difference(initial_addresses)
1375
1376         self.assertEqual(len(new_addresses), 1)
1377         prefix = IPv6Network(text_type("%s/%d" % (list(new_addresses)[0], 20)),
1378                              strict=False)
1379         self.assertEqual(prefix, IPv6Network(text_type('1::/20')))
1380
1381         self.sleep(1)
1382
1383         # check that SLAAC address is deleted
1384         fib = self.vapi.ip_route_dump(0, True)
1385         addresses = set(self.get_interface_addresses(fib, self.pg0))
1386         new_addresses = addresses.difference(initial_addresses)
1387         self.assertEqual(len(new_addresses), 0)
1388
1389
1390 class IPv6NDProxyTest(TestIPv6ND):
1391     """ IPv6 ND ProxyTest Case """
1392
1393     @classmethod
1394     def setUpClass(cls):
1395         super(IPv6NDProxyTest, cls).setUpClass()
1396
1397     @classmethod
1398     def tearDownClass(cls):
1399         super(IPv6NDProxyTest, cls).tearDownClass()
1400
1401     def setUp(self):
1402         super(IPv6NDProxyTest, self).setUp()
1403
1404         # create 3 pg interfaces
1405         self.create_pg_interfaces(range(3))
1406
1407         # pg0 is the master interface, with the configured subnet
1408         self.pg0.admin_up()
1409         self.pg0.config_ip6()
1410         self.pg0.resolve_ndp()
1411
1412         self.pg1.ip6_enable()
1413         self.pg2.ip6_enable()
1414
1415     def tearDown(self):
1416         super(IPv6NDProxyTest, self).tearDown()
1417
1418     def test_nd_proxy(self):
1419         """ IPv6 Proxy ND """
1420
1421         #
1422         # Generate some hosts in the subnet that we are proxying
1423         #
1424         self.pg0.generate_remote_hosts(8)
1425
1426         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
1427         d = inet_ntop(AF_INET6, nsma)
1428
1429         #
1430         # Send an NS for one of those remote hosts on one of the proxy links
1431         # expect no response since it's from an address that is not
1432         # on the link that has the prefix configured
1433         #
1434         ns_pg1 = (Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac) /
1435                   IPv6(dst=d,
1436                        src=self.pg0._remote_hosts[2].ip6) /
1437                   ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
1438                   ICMPv6NDOptSrcLLAddr(
1439                       lladdr=self.pg0._remote_hosts[2].mac))
1440
1441         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")
1442
1443         #
1444         # Add proxy support for the host
1445         #
1446         self.vapi.ip6nd_proxy_add_del(
1447             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1448             sw_if_index=self.pg1.sw_if_index)
1449
1450         #
1451         # try that NS again. this time we expect an NA back
1452         #
1453         self.send_and_expect_na(self.pg1, ns_pg1,
1454                                 "NS to proxy entry",
1455                                 dst_ip=self.pg0._remote_hosts[2].ip6,
1456                                 tgt_ip=self.pg0.local_ip6)
1457
1458         #
1459         # ... and that we have an entry in the ND cache
1460         #
1461         self.assertTrue(find_nbr(self,
1462                                  self.pg1.sw_if_index,
1463                                  self.pg0._remote_hosts[2].ip6))
1464
1465         #
1466         # ... and we can route traffic to it
1467         #
1468         t = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1469              IPv6(dst=self.pg0._remote_hosts[2].ip6,
1470                   src=self.pg0.remote_ip6) /
1471              inet6.UDP(sport=10000, dport=20000) /
1472              Raw('\xa5' * 100))
1473
1474         self.pg0.add_stream(t)
1475         self.pg_enable_capture(self.pg_interfaces)
1476         self.pg_start()
1477         rx = self.pg1.get_capture(1)
1478         rx = rx[0]
1479
1480         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1481         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1482
1483         self.assertEqual(rx[IPv6].src,
1484                          t[IPv6].src)
1485         self.assertEqual(rx[IPv6].dst,
1486                          t[IPv6].dst)
1487
1488         #
1489         # Test we proxy for the host on the main interface
1490         #
1491         ns_pg0 = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
1492                   IPv6(dst=d, src=self.pg0.remote_ip6) /
1493                   ICMPv6ND_NS(
1494                       tgt=self.pg0._remote_hosts[2].ip6) /
1495                   ICMPv6NDOptSrcLLAddr(
1496                       lladdr=self.pg0.remote_mac))
1497
1498         self.send_and_expect_na(self.pg0, ns_pg0,
1499                                 "NS to proxy entry on main",
1500                                 tgt_ip=self.pg0._remote_hosts[2].ip6,
1501                                 dst_ip=self.pg0.remote_ip6)
1502
1503         #
1504         # Setup and resolve proxy for another host on another interface
1505         #
1506         ns_pg2 = (Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac) /
1507                   IPv6(dst=d,
1508                        src=self.pg0._remote_hosts[3].ip6) /
1509                   ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
1510                   ICMPv6NDOptSrcLLAddr(
1511                       lladdr=self.pg0._remote_hosts[2].mac))
1512
1513         self.vapi.ip6nd_proxy_add_del(
1514             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1515             sw_if_index=self.pg2.sw_if_index)
1516
1517         self.send_and_expect_na(self.pg2, ns_pg2,
1518                                 "NS to proxy entry other interface",
1519                                 dst_ip=self.pg0._remote_hosts[3].ip6,
1520                                 tgt_ip=self.pg0.local_ip6)
1521
1522         self.assertTrue(find_nbr(self,
1523                                  self.pg2.sw_if_index,
1524                                  self.pg0._remote_hosts[3].ip6))
1525
1526         #
1527         # hosts can communicate. pg2->pg1
1528         #
1529         t2 = (Ether(dst=self.pg2.local_mac,
1530                     src=self.pg0.remote_hosts[3].mac) /
1531               IPv6(dst=self.pg0._remote_hosts[2].ip6,
1532                    src=self.pg0._remote_hosts[3].ip6) /
1533               inet6.UDP(sport=10000, dport=20000) /
1534               Raw('\xa5' * 100))
1535
1536         self.pg2.add_stream(t2)
1537         self.pg_enable_capture(self.pg_interfaces)
1538         self.pg_start()
1539         rx = self.pg1.get_capture(1)
1540         rx = rx[0]
1541
1542         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1543         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1544
1545         self.assertEqual(rx[IPv6].src,
1546                          t2[IPv6].src)
1547         self.assertEqual(rx[IPv6].dst,
1548                          t2[IPv6].dst)
1549
1550         #
1551         # remove the proxy configs
1552         #
1553         self.vapi.ip6nd_proxy_add_del(
1554             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1555             sw_if_index=self.pg1.sw_if_index, is_del=1)
1556         self.vapi.ip6nd_proxy_add_del(
1557             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1558             sw_if_index=self.pg2.sw_if_index, is_del=1)
1559
1560         self.assertFalse(find_nbr(self,
1561                                   self.pg2.sw_if_index,
1562                                   self.pg0._remote_hosts[3].ip6))
1563         self.assertFalse(find_nbr(self,
1564                                   self.pg1.sw_if_index,
1565                                   self.pg0._remote_hosts[2].ip6))
1566
1567         #
1568         # no longer proxy-ing...
1569         #
1570         self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
1571         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
1572         self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")
1573
1574         #
1575         # no longer forwarding. traffic generates NS out of the glean/main
1576         # interface
1577         #
1578         self.pg2.add_stream(t2)
1579         self.pg_enable_capture(self.pg_interfaces)
1580         self.pg_start()
1581
1582         rx = self.pg0.get_capture(1)
1583
1584         self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1585
1586
1587 class TestIPNull(VppTestCase):
1588     """ IPv6 routes via NULL """
1589
1590     @classmethod
1591     def setUpClass(cls):
1592         super(TestIPNull, cls).setUpClass()
1593
1594     @classmethod
1595     def tearDownClass(cls):
1596         super(TestIPNull, cls).tearDownClass()
1597
1598     def setUp(self):
1599         super(TestIPNull, self).setUp()
1600
1601         # create 2 pg interfaces
1602         self.create_pg_interfaces(range(1))
1603
1604         for i in self.pg_interfaces:
1605             i.admin_up()
1606             i.config_ip6()
1607             i.resolve_ndp()
1608
1609     def tearDown(self):
1610         super(TestIPNull, self).tearDown()
1611         for i in self.pg_interfaces:
1612             i.unconfig_ip6()
1613             i.admin_down()
1614
1615     def test_ip_null(self):
1616         """ IP NULL route """
1617
1618         p = (Ether(src=self.pg0.remote_mac,
1619                    dst=self.pg0.local_mac) /
1620              IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
1621              inet6.UDP(sport=1234, dport=1234) /
1622              Raw('\xa5' * 100))
1623
1624         #
1625         # A route via IP NULL that will reply with ICMP unreachables
1626         #
1627         ip_unreach = VppIpRoute(
1628             self, "2001::", 64,
1629             [VppRoutePath("::", 0xffffffff,
1630                           type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH)])
1631         ip_unreach.add_vpp_config()
1632
1633         self.pg0.add_stream(p)
1634         self.pg_enable_capture(self.pg_interfaces)
1635         self.pg_start()
1636
1637         rx = self.pg0.get_capture(1)
1638         rx = rx[0]
1639         icmp = rx[ICMPv6DestUnreach]
1640
1641         # 0 = "No route to destination"
1642         self.assertEqual(icmp.code, 0)
1643
1644         # ICMP is rate limited. pause a bit
1645         self.sleep(1)
1646
1647         #
1648         # A route via IP NULL that will reply with ICMP prohibited
1649         #
1650         ip_prohibit = VppIpRoute(
1651             self, "2001::1", 128,
1652             [VppRoutePath("::", 0xffffffff,
1653                           type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT)])
1654         ip_prohibit.add_vpp_config()
1655
1656         self.pg0.add_stream(p)
1657         self.pg_enable_capture(self.pg_interfaces)
1658         self.pg_start()
1659
1660         rx = self.pg0.get_capture(1)
1661         rx = rx[0]
1662         icmp = rx[ICMPv6DestUnreach]
1663
1664         # 1 = "Communication with destination administratively prohibited"
1665         self.assertEqual(icmp.code, 1)
1666
1667
1668 class TestIPDisabled(VppTestCase):
1669     """ IPv6 disabled """
1670
1671     @classmethod
1672     def setUpClass(cls):
1673         super(TestIPDisabled, cls).setUpClass()
1674
1675     @classmethod
1676     def tearDownClass(cls):
1677         super(TestIPDisabled, cls).tearDownClass()
1678
1679     def setUp(self):
1680         super(TestIPDisabled, self).setUp()
1681
1682         # create 2 pg interfaces
1683         self.create_pg_interfaces(range(2))
1684
1685         # PG0 is IP enabled
1686         self.pg0.admin_up()
1687         self.pg0.config_ip6()
1688         self.pg0.resolve_ndp()
1689
1690         # PG 1 is not IP enabled
1691         self.pg1.admin_up()
1692
1693     def tearDown(self):
1694         super(TestIPDisabled, self).tearDown()
1695         for i in self.pg_interfaces:
1696             i.unconfig_ip4()
1697             i.admin_down()
1698
1699     def test_ip_disabled(self):
1700         """ IP Disabled """
1701
1702         #
1703         # An (S,G).
1704         # one accepting interface, pg0, 2 forwarding interfaces
1705         #
1706         route_ff_01 = VppIpMRoute(
1707             self,
1708             "::",
1709             "ffef::1", 128,
1710             MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
1711             [VppMRoutePath(self.pg1.sw_if_index,
1712                            MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT),
1713              VppMRoutePath(self.pg0.sw_if_index,
1714                            MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)])
1715         route_ff_01.add_vpp_config()
1716
1717         pu = (Ether(src=self.pg1.remote_mac,
1718                     dst=self.pg1.local_mac) /
1719               IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
1720               inet6.UDP(sport=1234, dport=1234) /
1721               Raw('\xa5' * 100))
1722         pm = (Ether(src=self.pg1.remote_mac,
1723                     dst=self.pg1.local_mac) /
1724               IPv6(src="2001::1", dst="ffef::1") /
1725               inet6.UDP(sport=1234, dport=1234) /
1726               Raw('\xa5' * 100))
1727
1728         #
1729         # PG1 does not forward IP traffic
1730         #
1731         self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
1732         self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1733
1734         #
1735         # IP enable PG1
1736         #
1737         self.pg1.config_ip6()
1738
1739         #
1740         # Now we get packets through
1741         #
1742         self.pg1.add_stream(pu)
1743         self.pg_enable_capture(self.pg_interfaces)
1744         self.pg_start()
1745         rx = self.pg0.get_capture(1)
1746
1747         self.pg1.add_stream(pm)
1748         self.pg_enable_capture(self.pg_interfaces)
1749         self.pg_start()
1750         rx = self.pg0.get_capture(1)
1751
1752         #
1753         # Disable PG1
1754         #
1755         self.pg1.unconfig_ip6()
1756
1757         #
1758         # PG1 does not forward IP traffic
1759         #
1760         self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
1761         self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1762
1763
1764 class TestIP6LoadBalance(VppTestCase):
1765     """ IPv6 Load-Balancing """
1766
1767     @classmethod
1768     def setUpClass(cls):
1769         super(TestIP6LoadBalance, cls).setUpClass()
1770
1771     @classmethod
1772     def tearDownClass(cls):
1773         super(TestIP6LoadBalance, cls).tearDownClass()
1774
1775     def setUp(self):
1776         super(TestIP6LoadBalance, self).setUp()
1777
1778         self.create_pg_interfaces(range(5))
1779
1780         mpls_tbl = VppMplsTable(self, 0)
1781         mpls_tbl.add_vpp_config()
1782
1783         for i in self.pg_interfaces:
1784             i.admin_up()
1785             i.config_ip6()
1786             i.resolve_ndp()
1787             i.enable_mpls()
1788
1789     def tearDown(self):
1790         for i in self.pg_interfaces:
1791             i.unconfig_ip6()
1792             i.admin_down()
1793             i.disable_mpls()
1794         super(TestIP6LoadBalance, self).tearDown()
1795
1796     def pg_send(self, input, pkts):
1797         self.vapi.cli("clear trace")
1798         input.add_stream(pkts)
1799         self.pg_enable_capture(self.pg_interfaces)
1800         self.pg_start()
1801
1802     def send_and_expect_load_balancing(self, input, pkts, outputs):
1803         self.pg_send(input, pkts)
1804         for oo in outputs:
1805             rx = oo._get_capture(1)
1806             self.assertNotEqual(0, len(rx))
1807
1808     def send_and_expect_one_itf(self, input, pkts, itf):
1809         self.pg_send(input, pkts)
1810         rx = itf.get_capture(len(pkts))
1811
1812     def test_ip6_load_balance(self):
1813         """ IPv6 Load-Balancing """
1814
1815         #
1816         # An array of packets that differ only in the destination port
1817         #  - IP only
1818         #  - MPLS EOS
1819         #  - MPLS non-EOS
1820         #  - MPLS non-EOS with an entropy label
1821         #
1822         port_ip_pkts = []
1823         port_mpls_pkts = []
1824         port_mpls_neos_pkts = []
1825         port_ent_pkts = []
1826
1827         #
1828         # An array of packets that differ only in the source address
1829         #
1830         src_ip_pkts = []
1831         src_mpls_pkts = []
1832
1833         for ii in range(NUM_PKTS):
1834             port_ip_hdr = (
1835                 IPv6(dst="3000::1", src="3000:1::1") /
1836                 inet6.UDP(sport=1234, dport=1234 + ii) /
1837                 Raw('\xa5' * 100))
1838             port_ip_pkts.append((Ether(src=self.pg0.remote_mac,
1839                                        dst=self.pg0.local_mac) /
1840                                  port_ip_hdr))
1841             port_mpls_pkts.append((Ether(src=self.pg0.remote_mac,
1842                                          dst=self.pg0.local_mac) /
1843                                    MPLS(label=66, ttl=2) /
1844                                    port_ip_hdr))
1845             port_mpls_neos_pkts.append((Ether(src=self.pg0.remote_mac,
1846                                               dst=self.pg0.local_mac) /
1847                                         MPLS(label=67, ttl=2) /
1848                                         MPLS(label=77, ttl=2) /
1849                                         port_ip_hdr))
1850             port_ent_pkts.append((Ether(src=self.pg0.remote_mac,
1851                                         dst=self.pg0.local_mac) /
1852                                   MPLS(label=67, ttl=2) /
1853                                   MPLS(label=14, ttl=2) /
1854                                   MPLS(label=999, ttl=2) /
1855                                   port_ip_hdr))
1856             src_ip_hdr = (
1857                 IPv6(dst="3000::1", src="3000:1::%d" % ii) /
1858                 inet6.UDP(sport=1234, dport=1234) /
1859                 Raw('\xa5' * 100))
1860             src_ip_pkts.append((Ether(src=self.pg0.remote_mac,
1861                                       dst=self.pg0.local_mac) /
1862                                 src_ip_hdr))
1863             src_mpls_pkts.append((Ether(src=self.pg0.remote_mac,
1864                                         dst=self.pg0.local_mac) /
1865                                   MPLS(label=66, ttl=2) /
1866                                   src_ip_hdr))
1867
1868         #
1869         # A route for the IP packets
1870         #
1871         route_3000_1 = VppIpRoute(self, "3000::1", 128,
1872                                   [VppRoutePath(self.pg1.remote_ip6,
1873                                                 self.pg1.sw_if_index),
1874                                    VppRoutePath(self.pg2.remote_ip6,
1875                                                 self.pg2.sw_if_index)])
1876         route_3000_1.add_vpp_config()
1877
1878         #
1879         # a local-label for the EOS packets
1880         #
1881         binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
1882         binding.add_vpp_config()
1883
1884         #
1885         # An MPLS route for the non-EOS packets
1886         #
1887         route_67 = VppMplsRoute(self, 67, 0,
1888                                 [VppRoutePath(self.pg1.remote_ip6,
1889                                               self.pg1.sw_if_index,
1890                                               labels=[67]),
1891                                  VppRoutePath(self.pg2.remote_ip6,
1892                                               self.pg2.sw_if_index,
1893                                               labels=[67])])
1894         route_67.add_vpp_config()
1895
1896         #
1897         # inject the packet on pg0 - expect load-balancing across the 2 paths
1898         #  - since the default hash config is to use IP src,dst and port
1899         #    src,dst
1900         # We are not going to ensure equal amounts of packets across each link,
1901         # since the hash algorithm is statistical and therefore this can never
1902         # be guaranteed. But with 64 different packets we do expect some
1903         # balancing. So instead just ensure there is traffic on each link.
1904         #
1905         self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
1906                                             [self.pg1, self.pg2])
1907         self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
1908                                             [self.pg1, self.pg2])
1909         self.send_and_expect_load_balancing(self.pg0, port_mpls_pkts,
1910                                             [self.pg1, self.pg2])
1911         self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
1912                                             [self.pg1, self.pg2])
1913         self.send_and_expect_load_balancing(self.pg0, port_mpls_neos_pkts,
1914                                             [self.pg1, self.pg2])
1915
1916         #
1917         # The packets with Entropy label in should not load-balance,
1918         # since the Entropy value is fixed.
1919         #
1920         self.send_and_expect_one_itf(self.pg0, port_ent_pkts, self.pg1)
1921
1922         #
1923         # change the flow hash config so it's only IP src,dst
1924         #  - now only the stream with differing source address will
1925         #    load-balance
1926         #
1927         self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=0, dport=0,
1928                                    is_ipv6=1)
1929
1930         self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
1931                                             [self.pg1, self.pg2])
1932         self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
1933                                             [self.pg1, self.pg2])
1934         self.send_and_expect_one_itf(self.pg0, port_ip_pkts, self.pg2)
1935
1936         #
1937         # change the flow hash config back to defaults
1938         #
1939         self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=1, dport=1,
1940                                    is_ipv6=1)
1941
1942         #
1943         # Recursive prefixes
1944         #  - testing that 2 stages of load-balancing occurs and there is no
1945         #    polarisation (i.e. only 2 of 4 paths are used)
1946         #
1947         port_pkts = []
1948         src_pkts = []
1949
1950         for ii in range(257):
1951             port_pkts.append((Ether(src=self.pg0.remote_mac,
1952                                     dst=self.pg0.local_mac) /
1953                               IPv6(dst="4000::1",
1954                                    src="4000:1::1") /
1955                               inet6.UDP(sport=1234,
1956                                         dport=1234 + ii) /
1957                               Raw('\xa5' * 100)))
1958             src_pkts.append((Ether(src=self.pg0.remote_mac,
1959                                    dst=self.pg0.local_mac) /
1960                              IPv6(dst="4000::1",
1961                                   src="4000:1::%d" % ii) /
1962                              inet6.UDP(sport=1234, dport=1234) /
1963                              Raw('\xa5' * 100)))
1964
1965         route_3000_2 = VppIpRoute(self, "3000::2", 128,
1966                                   [VppRoutePath(self.pg3.remote_ip6,
1967                                                 self.pg3.sw_if_index),
1968                                    VppRoutePath(self.pg4.remote_ip6,
1969                                                 self.pg4.sw_if_index)])
1970         route_3000_2.add_vpp_config()
1971
1972         route_4000_1 = VppIpRoute(self, "4000::1", 128,
1973                                   [VppRoutePath("3000::1",
1974                                                 0xffffffff),
1975                                    VppRoutePath("3000::2",
1976                                                 0xffffffff)])
1977         route_4000_1.add_vpp_config()
1978
1979         #
1980         # inject the packet on pg0 - expect load-balancing across all 4 paths
1981         #
1982         self.vapi.cli("clear trace")
1983         self.send_and_expect_load_balancing(self.pg0, port_pkts,
1984                                             [self.pg1, self.pg2,
1985                                              self.pg3, self.pg4])
1986         self.send_and_expect_load_balancing(self.pg0, src_pkts,
1987                                             [self.pg1, self.pg2,
1988                                              self.pg3, self.pg4])
1989
1990         #
1991         # Recursive prefixes
1992         #  - testing that 2 stages of load-balancing no choices
1993         #
1994         port_pkts = []
1995
1996         for ii in range(257):
1997             port_pkts.append((Ether(src=self.pg0.remote_mac,
1998                                     dst=self.pg0.local_mac) /
1999                               IPv6(dst="6000::1",
2000                                    src="6000:1::1") /
2001                               inet6.UDP(sport=1234,
2002                                         dport=1234 + ii) /
2003                               Raw('\xa5' * 100)))
2004
2005         route_5000_2 = VppIpRoute(self, "5000::2", 128,
2006                                   [VppRoutePath(self.pg3.remote_ip6,
2007                                                 self.pg3.sw_if_index)])
2008         route_5000_2.add_vpp_config()
2009
2010         route_6000_1 = VppIpRoute(self, "6000::1", 128,
2011                                   [VppRoutePath("5000::2",
2012                                                 0xffffffff)])
2013         route_6000_1.add_vpp_config()
2014
2015         #
2016         # inject the packet on pg0 - expect load-balancing across all 4 paths
2017         #
2018         self.vapi.cli("clear trace")
2019         self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg3)
2020
2021
2022 class TestIP6Punt(VppTestCase):
2023     """ IPv6 Punt Police/Redirect """
2024
2025     @classmethod
2026     def setUpClass(cls):
2027         super(TestIP6Punt, cls).setUpClass()
2028
2029     @classmethod
2030     def tearDownClass(cls):
2031         super(TestIP6Punt, cls).tearDownClass()
2032
2033     def setUp(self):
2034         super(TestIP6Punt, self).setUp()
2035
2036         self.create_pg_interfaces(range(4))
2037
2038         for i in self.pg_interfaces:
2039             i.admin_up()
2040             i.config_ip6()
2041             i.resolve_ndp()
2042
2043     def tearDown(self):
2044         super(TestIP6Punt, self).tearDown()
2045         for i in self.pg_interfaces:
2046             i.unconfig_ip6()
2047             i.admin_down()
2048
2049     def test_ip_punt(self):
2050         """ IP6 punt police and redirect """
2051
2052         p = (Ether(src=self.pg0.remote_mac,
2053                    dst=self.pg0.local_mac) /
2054              IPv6(src=self.pg0.remote_ip6,
2055                   dst=self.pg0.local_ip6) /
2056              inet6.TCP(sport=1234, dport=1234) /
2057              Raw('\xa5' * 100))
2058
2059         pkts = p * 1025
2060
2061         #
2062         # Configure a punt redirect via pg1.
2063         #
2064         nh_addr = self.pg1.remote_ip6
2065         self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
2066                                    self.pg1.sw_if_index,
2067                                    nh_addr)
2068
2069         self.send_and_expect(self.pg0, pkts, self.pg1)
2070
2071         #
2072         # add a policer
2073         #
2074         policer = self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
2075                                             rate_type=1)
2076         self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)
2077
2078         self.vapi.cli("clear trace")
2079         self.pg0.add_stream(pkts)
2080         self.pg_enable_capture(self.pg_interfaces)
2081         self.pg_start()
2082
2083         #
2084         # the number of packet received should be greater than 0,
2085         # but not equal to the number sent, since some were policed
2086         #
2087         rx = self.pg1._get_capture(1)
2088         self.assertGreater(len(rx), 0)
2089         self.assertLess(len(rx), len(pkts))
2090
2091         #
2092         # remove the policer. back to full rx
2093         #
2094         self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
2095         self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
2096                                   rate_type=1, is_add=0)
2097         self.send_and_expect(self.pg0, pkts, self.pg1)
2098
2099         #
2100         # remove the redirect. expect full drop.
2101         #
2102         self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
2103                                    self.pg1.sw_if_index,
2104                                    nh_addr,
2105                                    is_add=0)
2106         self.send_and_assert_no_replies(self.pg0, pkts,
2107                                         "IP no punt config")
2108
2109         #
2110         # Add a redirect that is not input port selective
2111         #
2112         self.vapi.ip_punt_redirect(0xffffffff,
2113                                    self.pg1.sw_if_index,
2114                                    nh_addr)
2115         self.send_and_expect(self.pg0, pkts, self.pg1)
2116
2117         self.vapi.ip_punt_redirect(0xffffffff,
2118                                    self.pg1.sw_if_index,
2119                                    nh_addr,
2120                                    is_add=0)
2121
2122     def test_ip_punt_dump(self):
2123         """ IP6 punt redirect dump"""
2124
2125         #
2126         # Configure a punt redirects
2127         #
2128         nh_addr = self.pg3.remote_ip6
2129         self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
2130                                    self.pg3.sw_if_index,
2131                                    nh_addr)
2132         self.vapi.ip_punt_redirect(self.pg1.sw_if_index,
2133                                    self.pg3.sw_if_index,
2134                                    nh_addr)
2135         self.vapi.ip_punt_redirect(self.pg2.sw_if_index,
2136                                    self.pg3.sw_if_index,
2137                                    '0::0')
2138
2139         #
2140         # Dump pg0 punt redirects
2141         #
2142         punts = self.vapi.ip_punt_redirect_dump(self.pg0.sw_if_index,
2143                                                 is_ipv6=1)
2144         for p in punts:
2145             self.assertEqual(p.punt.rx_sw_if_index, self.pg0.sw_if_index)
2146
2147         #
2148         # Dump punt redirects for all interfaces
2149         #
2150         punts = self.vapi.ip_punt_redirect_dump(0xffffffff, is_ipv6=1)
2151         self.assertEqual(len(punts), 3)
2152         for p in punts:
2153             self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
2154         self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
2155         self.assertEqual(str(punts[2].punt.nh), '::')
2156
2157
2158 class TestIPDeag(VppTestCase):
2159     """ IPv6 Deaggregate Routes """
2160
2161     @classmethod
2162     def setUpClass(cls):
2163         super(TestIPDeag, cls).setUpClass()
2164
2165     @classmethod
2166     def tearDownClass(cls):
2167         super(TestIPDeag, cls).tearDownClass()
2168
2169     def setUp(self):
2170         super(TestIPDeag, self).setUp()
2171
2172         self.create_pg_interfaces(range(3))
2173
2174         for i in self.pg_interfaces:
2175             i.admin_up()
2176             i.config_ip6()
2177             i.resolve_ndp()
2178
2179     def tearDown(self):
2180         super(TestIPDeag, self).tearDown()
2181         for i in self.pg_interfaces:
2182             i.unconfig_ip6()
2183             i.admin_down()
2184
2185     def test_ip_deag(self):
2186         """ IP Deag Routes """
2187
2188         #
2189         # Create a table to be used for:
2190         #  1 - another destination address lookup
2191         #  2 - a source address lookup
2192         #
2193         table_dst = VppIpTable(self, 1, is_ip6=1)
2194         table_src = VppIpTable(self, 2, is_ip6=1)
2195         table_dst.add_vpp_config()
2196         table_src.add_vpp_config()
2197
2198         #
2199         # Add a route in the default table to point to a deag/
2200         # second lookup in each of these tables
2201         #
2202         route_to_dst = VppIpRoute(self, "1::1", 128,
2203                                   [VppRoutePath("::",
2204                                                 0xffffffff,
2205                                                 nh_table_id=1)])
2206         route_to_src = VppIpRoute(
2207             self, "1::2", 128,
2208             [VppRoutePath("::",
2209                           0xffffffff,
2210                           nh_table_id=2,
2211                           type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP)])
2212
2213         route_to_dst.add_vpp_config()
2214         route_to_src.add_vpp_config()
2215
2216         #
2217         # packets to these destination are dropped, since they'll
2218         # hit the respective default routes in the second table
2219         #
2220         p_dst = (Ether(src=self.pg0.remote_mac,
2221                        dst=self.pg0.local_mac) /
2222                  IPv6(src="5::5", dst="1::1") /
2223                  inet6.TCP(sport=1234, dport=1234) /
2224                  Raw('\xa5' * 100))
2225         p_src = (Ether(src=self.pg0.remote_mac,
2226                        dst=self.pg0.local_mac) /
2227                  IPv6(src="2::2", dst="1::2") /
2228                  inet6.TCP(sport=1234, dport=1234) /
2229                  Raw('\xa5' * 100))
2230         pkts_dst = p_dst * 257
2231         pkts_src = p_src * 257
2232
2233         self.send_and_assert_no_replies(self.pg0, pkts_dst,
2234                                         "IP in dst table")
2235         self.send_and_assert_no_replies(self.pg0, pkts_src,
2236                                         "IP in src table")
2237
2238         #
2239         # add a route in the dst table to forward via pg1
2240         #
2241         route_in_dst = VppIpRoute(self, "1::1", 128,
2242                                   [VppRoutePath(self.pg1.remote_ip6,
2243                                                 self.pg1.sw_if_index)],
2244                                   table_id=1)
2245         route_in_dst.add_vpp_config()
2246
2247         self.send_and_expect(self.pg0, pkts_dst, self.pg1)
2248
2249         #
2250         # add a route in the src table to forward via pg2
2251         #
2252         route_in_src = VppIpRoute(self, "2::2", 128,
2253                                   [VppRoutePath(self.pg2.remote_ip6,
2254                                                 self.pg2.sw_if_index)],
2255                                   table_id=2)
2256         route_in_src.add_vpp_config()
2257         self.send_and_expect(self.pg0, pkts_src, self.pg2)
2258
2259         #
2260         # loop in the lookup DP
2261         #
2262         route_loop = VppIpRoute(self, "3::3", 128,
2263                                 [VppRoutePath("::",
2264                                               0xffffffff)])
2265         route_loop.add_vpp_config()
2266
2267         p_l = (Ether(src=self.pg0.remote_mac,
2268                      dst=self.pg0.local_mac) /
2269                IPv6(src="3::4", dst="3::3") /
2270                inet6.TCP(sport=1234, dport=1234) /
2271                Raw('\xa5' * 100))
2272
2273         self.send_and_assert_no_replies(self.pg0, p_l * 257,
2274                                         "IP lookup loop")
2275
2276
2277 class TestIP6Input(VppTestCase):
2278     """ IPv6 Input Exception Test Cases """
2279
2280     @classmethod
2281     def setUpClass(cls):
2282         super(TestIP6Input, cls).setUpClass()
2283
2284     @classmethod
2285     def tearDownClass(cls):
2286         super(TestIP6Input, cls).tearDownClass()
2287
2288     def setUp(self):
2289         super(TestIP6Input, self).setUp()
2290
2291         self.create_pg_interfaces(range(2))
2292
2293         for i in self.pg_interfaces:
2294             i.admin_up()
2295             i.config_ip6()
2296             i.resolve_ndp()
2297
2298     def tearDown(self):
2299         super(TestIP6Input, self).tearDown()
2300         for i in self.pg_interfaces:
2301             i.unconfig_ip6()
2302             i.admin_down()
2303
2304     def test_ip_input_icmp_reply(self):
2305         """ IP6 Input Exception - Return ICMP (3,0) """
2306         #
2307         # hop limit - ICMP replies
2308         #
2309         p_version = (Ether(src=self.pg0.remote_mac,
2310                            dst=self.pg0.local_mac) /
2311                      IPv6(src=self.pg0.remote_ip6,
2312                           dst=self.pg1.remote_ip6,
2313                           hlim=1) /
2314                      inet6.UDP(sport=1234, dport=1234) /
2315                      Raw('\xa5' * 100))
2316
2317         rx = self.send_and_expect(self.pg0, p_version * NUM_PKTS, self.pg0)
2318         rx = rx[0]
2319         icmp = rx[ICMPv6TimeExceeded]
2320
2321         # 0: "hop limit exceeded in transit",
2322         self.assertEqual((icmp.type, icmp.code), (3, 0))
2323
2324     icmpv6_data = '\x0a' * 18
2325     all_0s = "::"
2326     all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"
2327
2328     @parameterized.expand([
2329         # Name, src, dst, l4proto, msg, timeout
2330         ("src='iface',   dst='iface'", None, None,
2331          inet6.UDP(sport=1234, dport=1234), "funky version", None),
2332         ("src='All 0's', dst='iface'", all_0s, None,
2333          ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
2334         ("src='iface',   dst='All 0's'", None, all_0s,
2335          ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
2336         ("src='All 1's', dst='iface'", all_1s, None,
2337          ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
2338         ("src='iface',   dst='All 1's'", None, all_1s,
2339          ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
2340         ("src='All 1's', dst='All 1's'", all_1s, all_1s,
2341          ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
2342
2343     ])
2344     def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):
2345
2346         self._testMethodDoc = 'IPv6 Input Exception - %s' % name
2347
2348         p_version = (Ether(src=self.pg0.remote_mac,
2349                            dst=self.pg0.local_mac) /
2350                      IPv6(src=src or self.pg0.remote_ip6,
2351                           dst=dst or self.pg1.remote_ip6,
2352                           version=3) /
2353                      l4 /
2354                      Raw('\xa5' * 100))
2355
2356         self.send_and_assert_no_replies(self.pg0, p_version * NUM_PKTS,
2357                                         remark=msg or "",
2358                                         timeout=timeout)
2359
2360     def test_hop_by_hop(self):
2361         """ Hop-by-hop header test """
2362
2363         p = (Ether(src=self.pg0.remote_mac,
2364                    dst=self.pg0.local_mac) /
2365              IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
2366              IPv6ExtHdrHopByHop() /
2367              inet6.UDP(sport=1234, dport=1234) /
2368              Raw('\xa5' * 100))
2369
2370         self.pg0.add_stream(p)
2371         self.pg_enable_capture(self.pg_interfaces)
2372         self.pg_start()
2373
2374 if __name__ == '__main__':
2375     unittest.main(testRunner=VppTestRunner)