tests: Remove the unrequired VPP IP address/prefix class wrappers
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python3
2
3 import socket
4 import unittest
5
6 from parameterized import parameterized
7 import scapy.compat
8 import scapy.layers.inet6 as inet6
9 from scapy.contrib.mpls import MPLS
10 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
11     ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
12     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
13     ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, IPv6ExtHdrHopByHop
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.packet import Raw
16 from scapy.utils import inet_pton, inet_ntop
17 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
18     in6_mactoifaceid
19 from six import moves
20
21 from framework import VppTestCase, VppTestRunner
22 from util import ppp, ip6_normalize, mk_ll_addr
23 from vpp_ip import DpoProto
24 from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
25     VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
26     VppMplsRoute, VppMplsTable, VppIpTable, FibPathType, \
27     VppIpInterfaceAddress
28 from vpp_neighbor import find_nbr, VppNeighbor
29 from vpp_pg_interface import is_ipv6_misc
30 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
31 from ipaddress import IPv6Network, IPv6Address
32
# Address family handed to inet_pton()/inet_ntop() throughout this module.
AF_INET6 = socket.AF_INET6

# Native text type: ``unicode`` on Python 2, ``str`` on Python 3.
text_type = type(u"")

# Packet-count constant; no use of it is visible in this part of the file.
NUM_PKTS = 67
41
42
class TestIPv6ND(VppTestCase):
    # Shared helpers for IPv6 neighbour-discovery tests: validators for
    # received RA / NA / NS packets and send-and-expect wrappers around them.
    # (Kept as comments rather than a class docstring so that no docstring is
    # inherited by subclasses.)

    def validate_ra(self, intf, rx, dst_ip=None):
        """Assert that ``rx`` is a unicast Router Advertisement from ``intf``.

        :param intf: interface the packet was captured on; supplies the
            expected MAC addresses.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination, defaults to
            ``intf.remote_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd RA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_RA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the router's link local
        self.assertTrue(in6_islladdr(rx[IPv6].src))
        self.assertEqual(in6_ptop(rx[IPv6].src),
                         in6_ptop(mk_ll_addr(intf.local_mac)))

    def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
        """Assert that ``rx`` is a unicast Neighbor Advertisement from ``intf``.

        :param intf: interface the packet was captured on; supplies the
            expected MAC addresses.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination, defaults to
            ``intf.remote_ip6``.
        :param tgt_ip: address the NA is expected to be sourced from,
            defaults to ``intf.local_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6
        if not tgt_ip:
            # The default belongs to the target, not the destination:
            # assigning it to dst_ip clobbered the expected destination and
            # left tgt_ip as None for the comparison below.
            tgt_ip = intf.local_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_NA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the target address
        self.assertEqual(
            in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))

        # Dest link-layer options should have the router's MAC
        dll = rx[ICMPv6NDOptDstLLAddr]
        self.assertEqual(dll.lladdr, intf.local_mac)

    def validate_ns(self, intf, rx, tgt_ip):
        """Assert that ``rx`` is a Neighbor Solicitation for ``tgt_ip``.

        :param intf: interface the packet was captured on; supplies the
            expected source MAC and source IPv6 address.
        :param rx: the captured packet.
        :param tgt_ip: IPv6 address being solicited.
        """
        nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
        dst_ip = inet_ntop(AF_INET6, nsma)

        # the NS goes to the MAC derived from the solicited-node address
        self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NS should be addressed to an mcast address
        # derived from the target address
        self.assertEqual(
            in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # expect the tgt IP in the NS header
        ns = rx[ICMPv6ND_NS]
        self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))

        # packet is from the router's local address; both sides are
        # normalised, as in every other address comparison in this class
        self.assertEqual(
            in6_ptop(rx[IPv6].src), in6_ptop(intf.local_ip6))

        # Src link-layer options should have the router's MAC
        sll = rx[ICMPv6NDOptSrcLLAddr]
        self.assertEqual(sll.lladdr, intf.local_mac)

    def _send_and_capture_single(self, tx_intf, rx_intf, pkts, filter_out_fn):
        """Send ``pkts`` on ``tx_intf`` and return the single packet captured
        on ``rx_intf``."""
        tx_intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        return rx[0]

    def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``intf`` and validate the single RA received back.

        :param remark: description of the scenario; not used by this method.
        :param dst_ip: forwarded to :meth:`validate_ra`.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        rx = self._send_and_capture_single(intf, intf, pkts, filter_out_fn)
        self.validate_ra(intf, rx, dst_ip)

    def send_and_expect_na(self, intf, pkts, remark, dst_ip=None,
                           tgt_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``intf`` and validate the single NA received back.

        :param remark: description of the scenario; not used by this method.
        :param dst_ip: forwarded to :meth:`validate_na`.
        :param tgt_ip: forwarded to :meth:`validate_na`.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        rx = self._send_and_capture_single(intf, intf, pkts, filter_out_fn)
        self.validate_na(intf, rx, dst_ip, tgt_ip)

    def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``tx_intf`` and validate the single NS captured on
        ``rx_intf``.

        :param tgt_ip: address the NS is expected to solicit.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        rx = self._send_and_capture_single(tx_intf, rx_intf, pkts,
                                           filter_out_fn)
        self.validate_ns(rx_intf, rx, tgt_ip)

    def verify_ip(self, rx, smac, dmac, sip, dip):
        """Assert the Ethernet and IPv6 source/destination addresses of ``rx``.

        :param rx: the captured packet.
        :param smac: expected Ethernet source.
        :param dmac: expected Ethernet destination.
        :param sip: expected IPv6 source.
        :param dip: expected IPv6 destination.
        """
        ether = rx[Ether]
        self.assertEqual(ether.dst, dmac)
        self.assertEqual(ether.src, smac)

        ip = rx[IPv6]
        self.assertEqual(ip.src, sip)
        self.assertEqual(ip.dst, dip)
158
159
160 class TestIPv6(TestIPv6ND):
161     """ IPv6 Test Case """
162
    @classmethod
    def setUpClass(cls):
        """Run the parent class-level setup; nothing is added here."""
        super(TestIPv6, cls).setUpClass()
166
    @classmethod
    def tearDownClass(cls):
        """Run the parent class-level teardown; nothing is added here."""
        super(TestIPv6, cls).tearDownClass()
170
171     def setUp(self):
172         """
173         Perform test setup before test case.
174
175         **Config:**
176             - create 3 pg interfaces
177                 - untagged pg0 interface
178                 - Dot1Q subinterface on pg1
179                 - Dot1AD subinterface on pg2
180             - setup interfaces:
181                 - put it into UP state
182                 - set IPv6 addresses
183                 - resolve neighbor address using NDP
184             - configure 200 fib entries
185
186         :ivar list interfaces: pg interfaces and subinterfaces.
187         :ivar dict flows: IPv4 packet flows in test.
188
189         *TODO:* Create AD sub interface
190         """
191         super(TestIPv6, self).setUp()
192
193         # create 3 pg interfaces
194         self.create_pg_interfaces(range(3))
195
196         # create 2 subinterfaces for p1 and pg2
197         self.sub_interfaces = [
198             VppDot1QSubint(self, self.pg1, 100),
199             VppDot1QSubint(self, self.pg2, 200)
200             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
201         ]
202
203         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
204         self.flows = dict()
205         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
206         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
207         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
208
209         # packet sizes
210         self.pg_if_packet_sizes = [64, 1500, 9020]
211
212         self.interfaces = list(self.pg_interfaces)
213         self.interfaces.extend(self.sub_interfaces)
214
215         # setup all interfaces
216         for i in self.interfaces:
217             i.admin_up()
218             i.config_ip6()
219             i.resolve_ndp()
220
221     def tearDown(self):
222         """Run standard test teardown and log ``show ip6 neighbors``."""
223         for i in self.interfaces:
224             i.unconfig_ip6()
225             i.ip6_disable()
226             i.admin_down()
227         for i in self.sub_interfaces:
228             i.remove_vpp_config()
229
230         super(TestIPv6, self).tearDown()
231         if not self.vpp_dead:
232             self.logger.info(self.vapi.cli("show ip6 neighbors"))
233             # info(self.vapi.cli("show ip6 fib"))  # many entries
234
235     def modify_packet(self, src_if, packet_size, pkt):
236         """Add load, set destination IP and extend packet to required packet
237         size for defined interface.
238
239         :param VppInterface src_if: Interface to create packet for.
240         :param int packet_size: Required packet size.
241         :param Scapy pkt: Packet to be modified.
242         """
243         dst_if_idx = int(packet_size / 10 % 2)
244         dst_if = self.flows[src_if][dst_if_idx]
245         info = self.create_packet_info(src_if, dst_if)
246         payload = self.info_to_payload(info)
247         p = pkt / Raw(payload)
248         p[IPv6].dst = dst_if.remote_ip6
249         info.data = p.copy()
250         if isinstance(src_if, VppSubInterface):
251             p = src_if.add_dot1_layer(p)
252         self.extend_packet(p, packet_size)
253
254         return p
255
256     def create_stream(self, src_if):
257         """Create input packet stream for defined interface.
258
259         :param VppInterface src_if: Interface to create packet stream for.
260         """
261         hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
262         pkt_tmpl = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
263                     IPv6(src=src_if.remote_ip6) /
264                     inet6.UDP(sport=1234, dport=1234))
265
266         pkts = [self.modify_packet(src_if, i, pkt_tmpl)
267                 for i in moves.range(self.pg_if_packet_sizes[0],
268                                      self.pg_if_packet_sizes[1], 10)]
269         pkts_b = [self.modify_packet(src_if, i, pkt_tmpl)
270                   for i in moves.range(self.pg_if_packet_sizes[1] + hdr_ext,
271                                        self.pg_if_packet_sizes[2] + hdr_ext,
272                                        50)]
273         pkts.extend(pkts_b)
274
275         return pkts
276
277     def verify_capture(self, dst_if, capture):
278         """Verify captured input packet stream for defined interface.
279
280         :param VppInterface dst_if: Interface to verify captured packet stream
281                                     for.
282         :param list capture: Captured packet stream.
283         """
284         self.logger.info("Verifying capture on interface %s" % dst_if.name)
285         last_info = dict()
286         for i in self.interfaces:
287             last_info[i.sw_if_index] = None
288         is_sub_if = False
289         dst_sw_if_index = dst_if.sw_if_index
290         if hasattr(dst_if, 'parent'):
291             is_sub_if = True
292         for packet in capture:
293             if is_sub_if:
294                 # Check VLAN tags and Ethernet header
295                 packet = dst_if.remove_dot1_layer(packet)
296             self.assertTrue(Dot1Q not in packet)
297             try:
298                 ip = packet[IPv6]
299                 udp = packet[inet6.UDP]
300                 payload_info = self.payload_to_info(packet[Raw])
301                 packet_index = payload_info.index
302                 self.assertEqual(payload_info.dst, dst_sw_if_index)
303                 self.logger.debug(
304                     "Got packet on port %s: src=%u (id=%u)" %
305                     (dst_if.name, payload_info.src, packet_index))
306                 next_info = self.get_next_packet_info_for_interface2(
307                     payload_info.src, dst_sw_if_index,
308                     last_info[payload_info.src])
309                 last_info[payload_info.src] = next_info
310                 self.assertTrue(next_info is not None)
311                 self.assertEqual(packet_index, next_info.index)
312                 saved_packet = next_info.data
313                 # Check standard fields
314                 self.assertEqual(
315                     ip.src, saved_packet[IPv6].src)
316                 self.assertEqual(
317                     ip.dst, saved_packet[IPv6].dst)
318                 self.assertEqual(
319                     udp.sport, saved_packet[inet6.UDP].sport)
320                 self.assertEqual(
321                     udp.dport, saved_packet[inet6.UDP].dport)
322             except:
323                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
324                 raise
325         for i in self.interfaces:
326             remaining_packet = self.get_next_packet_info_for_interface2(
327                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
328             self.assertTrue(remaining_packet is None,
329                             "Interface %s: Packet expected from interface %s "
330                             "didn't arrive" % (dst_if.name, i.name))
331
332     def test_next_header_anomaly(self):
333         """ IPv6 next header anomaly test
334
335         Test scenario:
336             - ipv6 next header field = Fragment Header (44)
337             - next header is ICMPv6 Echo Request
338             - wait for reassembly
339         """
340         pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
341                IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44) /
342                ICMPv6EchoRequest())
343
344         self.pg0.add_stream(pkt)
345         self.pg_start()
346
347         # wait for reassembly
348         self.sleep(10)
349
350     def test_fib(self):
351         """ IPv6 FIB test
352
353         Test scenario:
354             - Create IPv6 stream for pg0 interface
355             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
356             - Send and verify received packets on each interface.
357         """
358
359         pkts = self.create_stream(self.pg0)
360         self.pg0.add_stream(pkts)
361
362         for i in self.sub_interfaces:
363             pkts = self.create_stream(i)
364             i.parent.add_stream(pkts)
365
366         self.pg_enable_capture(self.pg_interfaces)
367         self.pg_start()
368
369         pkts = self.pg0.get_capture()
370         self.verify_capture(self.pg0, pkts)
371
372         for i in self.sub_interfaces:
373             pkts = i.parent.get_capture()
374             self.verify_capture(i, pkts)
375
376     def test_ns(self):
377         """ IPv6 Neighbour Solicitation Exceptions
378
379         Test scenario:
380            - Send an NS Sourced from an address not covered by the link sub-net
381            - Send an NS to an mcast address the router has not joined
382            - Send NS for a target address the router does not onn.
383         """
384
385         #
386         # An NS from a non link source address
387         #
388         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
389         d = inet_ntop(AF_INET6, nsma)
390
391         p = (Ether(dst=in6_getnsmac(nsma)) /
392              IPv6(dst=d, src="2002::2") /
393              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
394              ICMPv6NDOptSrcLLAddr(
395                  lladdr=self.pg0.remote_mac))
396         pkts = [p]
397
398         self.send_and_assert_no_replies(
399             self.pg0, pkts,
400             "No response to NS source by address not on sub-net")
401
402         #
403         # An NS for sent to a solicited mcast group the router is
404         # not a member of FAILS
405         #
406         if 0:
407             nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
408             d = inet_ntop(AF_INET6, nsma)
409
410             p = (Ether(dst=in6_getnsmac(nsma)) /
411                  IPv6(dst=d, src=self.pg0.remote_ip6) /
412                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
413                  ICMPv6NDOptSrcLLAddr(
414                      lladdr=self.pg0.remote_mac))
415             pkts = [p]
416
417             self.send_and_assert_no_replies(
418                 self.pg0, pkts,
419                 "No response to NS sent to unjoined mcast address")
420
421         #
422         # An NS whose target address is one the router does not own
423         #
424         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
425         d = inet_ntop(AF_INET6, nsma)
426
427         p = (Ether(dst=in6_getnsmac(nsma)) /
428              IPv6(dst=d, src=self.pg0.remote_ip6) /
429              ICMPv6ND_NS(tgt="fd::ffff") /
430              ICMPv6NDOptSrcLLAddr(
431                  lladdr=self.pg0.remote_mac))
432         pkts = [p]
433
434         self.send_and_assert_no_replies(self.pg0, pkts,
435                                         "No response to NS for unknown target")
436
437         #
438         # A neighbor entry that has no associated FIB-entry
439         #
440         self.pg0.generate_remote_hosts(4)
441         nd_entry = VppNeighbor(self,
442                                self.pg0.sw_if_index,
443                                self.pg0.remote_hosts[2].mac,
444                                self.pg0.remote_hosts[2].ip6,
445                                is_no_fib_entry=1)
446         nd_entry.add_vpp_config()
447
448         #
449         # check we have the neighbor, but no route
450         #
451         self.assertTrue(find_nbr(self,
452                                  self.pg0.sw_if_index,
453                                  self.pg0._remote_hosts[2].ip6))
454         self.assertFalse(find_route(self,
455                                     self.pg0._remote_hosts[2].ip6,
456                                     128))
457
458         #
459         # send an NS from a link local address to the interface's global
460         # address
461         #
462         p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
463              IPv6(
464                  dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
465              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
466              ICMPv6NDOptSrcLLAddr(
467                  lladdr=self.pg0.remote_mac))
468
469         self.send_and_expect_na(self.pg0, p,
470                                 "NS from link-local",
471                                 dst_ip=self.pg0._remote_hosts[2].ip6_ll,
472                                 tgt_ip=self.pg0.local_ip6)
473
474         #
475         # we should have learned an ND entry for the peer's link-local
476         # but not inserted a route to it in the FIB
477         #
478         self.assertTrue(find_nbr(self,
479                                  self.pg0.sw_if_index,
480                                  self.pg0._remote_hosts[2].ip6_ll))
481         self.assertFalse(find_route(self,
482                                     self.pg0._remote_hosts[2].ip6_ll,
483                                     128))
484
485         #
486         # An NS to the router's own Link-local
487         #
488         p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
489              IPv6(
490                  dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
491              ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll) /
492              ICMPv6NDOptSrcLLAddr(
493                  lladdr=self.pg0.remote_mac))
494
495         self.send_and_expect_na(self.pg0, p,
496                                 "NS to/from link-local",
497                                 dst_ip=self.pg0._remote_hosts[3].ip6_ll,
498                                 tgt_ip=self.pg0.local_ip6_ll)
499
500         #
501         # we should have learned an ND entry for the peer's link-local
502         # but not inserted a route to it in the FIB
503         #
504         self.assertTrue(find_nbr(self,
505                                  self.pg0.sw_if_index,
506                                  self.pg0._remote_hosts[3].ip6_ll))
507         self.assertFalse(find_route(self,
508                                     self.pg0._remote_hosts[3].ip6_ll,
509                                     128))
510
511     def test_ns_duplicates(self):
512         """ ND Duplicates"""
513
514         #
515         # Generate some hosts on the LAN
516         #
517         self.pg1.generate_remote_hosts(3)
518
519         #
520         # Add host 1 on pg1 and pg2
521         #
522         ns_pg1 = VppNeighbor(self,
523                              self.pg1.sw_if_index,
524                              self.pg1.remote_hosts[1].mac,
525                              self.pg1.remote_hosts[1].ip6)
526         ns_pg1.add_vpp_config()
527         ns_pg2 = VppNeighbor(self,
528                              self.pg2.sw_if_index,
529                              self.pg2.remote_mac,
530                              self.pg1.remote_hosts[1].ip6)
531         ns_pg2.add_vpp_config()
532
533         #
534         # IP packet destined for pg1 remote host arrives on pg1 again.
535         #
536         p = (Ether(dst=self.pg0.local_mac,
537                    src=self.pg0.remote_mac) /
538              IPv6(src=self.pg0.remote_ip6,
539                   dst=self.pg1.remote_hosts[1].ip6) /
540              inet6.UDP(sport=1234, dport=1234) /
541              Raw())
542
543         self.pg0.add_stream(p)
544         self.pg_enable_capture(self.pg_interfaces)
545         self.pg_start()
546
547         rx1 = self.pg1.get_capture(1)
548
549         self.verify_ip(rx1[0],
550                        self.pg1.local_mac,
551                        self.pg1.remote_hosts[1].mac,
552                        self.pg0.remote_ip6,
553                        self.pg1.remote_hosts[1].ip6)
554
555         #
556         # remove the duplicate on pg1
557         # packet stream should generate NSs out of pg1
558         #
559         ns_pg1.remove_vpp_config()
560
561         self.send_and_expect_ns(self.pg0, self.pg1,
562                                 p, self.pg1.remote_hosts[1].ip6)
563
564         #
565         # Add it back
566         #
567         ns_pg1.add_vpp_config()
568
569         self.pg0.add_stream(p)
570         self.pg_enable_capture(self.pg_interfaces)
571         self.pg_start()
572
573         rx1 = self.pg1.get_capture(1)
574
575         self.verify_ip(rx1[0],
576                        self.pg1.local_mac,
577                        self.pg1.remote_hosts[1].mac,
578                        self.pg0.remote_ip6,
579                        self.pg1.remote_hosts[1].ip6)
580
581     def validate_ra(self, intf, rx, dst_ip=None, mtu=9000, pi_opt=None):
582         if not dst_ip:
583             dst_ip = intf.remote_ip6
584
585         # unicasted packets must come to the unicast mac
586         self.assertEqual(rx[Ether].dst, intf.remote_mac)
587
588         # and from the router's MAC
589         self.assertEqual(rx[Ether].src, intf.local_mac)
590
591         # the rx'd RA should be addressed to the sender's source
592         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
593         self.assertEqual(in6_ptop(rx[IPv6].dst),
594                          in6_ptop(dst_ip))
595
596         # and come from the router's link local
597         self.assertTrue(in6_islladdr(rx[IPv6].src))
598         self.assertEqual(in6_ptop(rx[IPv6].src),
599                          in6_ptop(mk_ll_addr(intf.local_mac)))
600
601         # it should contain the links MTU
602         ra = rx[ICMPv6ND_RA]
603         self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
604
605         # it should contain the source's link layer address option
606         sll = ra[ICMPv6NDOptSrcLLAddr]
607         self.assertEqual(sll.lladdr, intf.local_mac)
608
609         if not pi_opt:
610             # the RA should not contain prefix information
611             self.assertFalse(ra.haslayer(
612                 ICMPv6NDOptPrefixInfo))
613         else:
614             raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
615
616             # the options are nested in the scapy packet in way that i cannot
617             # decipher how to decode. this 1st layer of option always returns
618             # nested classes, so a direct obj1=obj2 comparison always fails.
619             # however, the getlayer(.., 2) does give one instance.
620             # so we cheat here and construct a new opt instance for comparison
621             rd = ICMPv6NDOptPrefixInfo(
622                 prefixlen=raos.prefixlen,
623                 prefix=raos.prefix,
624                 L=raos.L,
625                 A=raos.A)
626             if type(pi_opt) is list:
627                 for ii in range(len(pi_opt)):
628                     self.assertEqual(pi_opt[ii], rd)
629                     rd = rx.getlayer(
630                         ICMPv6NDOptPrefixInfo, ii + 2)
631             else:
632                 self.assertEqual(pi_opt, raos, 'Expected: %s, received: %s'
633                                  % (pi_opt.show(dump=True),
634                                     raos.show(dump=True)))
635
636     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
637                            filter_out_fn=is_ipv6_misc,
638                            opt=None):
639         intf.add_stream(pkts)
640         self.pg_enable_capture(self.pg_interfaces)
641         self.pg_start()
642         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
643
644         self.assertEqual(len(rx), 1)
645         rx = rx[0]
646         self.validate_ra(intf, rx, dst_ip, pi_opt=opt)
647
648     def test_rs(self):
649         """ IPv6 Router Solicitation Exceptions
650
651         Test scenario:
652         """
653
654         #
655         # Before we begin change the IPv6 RA responses to use the unicast
656         # address - that way we will not confuse them with the periodic
657         # RAs which go to the mcast address
658         # Sit and wait for the first periodic RA.
659         #
660         # TODO
661         #
662         self.pg0.ip6_ra_config(send_unicast=1)
663
664         #
665         # An RS from a link source address
666         #  - expect an RA in return
667         #
668         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
669              IPv6(
670                  dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
671              ICMPv6ND_RS())
672         pkts = [p]
673         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
674
675         #
676         # For the next RS sent the RA should be rate limited
677         #
678         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
679
680         #
681         # When we reconfigure the IPv6 RA config,
682         # we reset the RA rate limiting,
683         # so we need to do this before each test below so as not to drop
684         # packets for rate limiting reasons. Test this works here.
685         #
686         self.pg0.ip6_ra_config(send_unicast=1)
687         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
688
689         #
690         # An RS sent from a non-link local source
691         #
692         self.pg0.ip6_ra_config(send_unicast=1)
693         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
694              IPv6(dst=self.pg0.local_ip6,
695                   src="2002::ffff") /
696              ICMPv6ND_RS())
697         pkts = [p]
698         self.send_and_assert_no_replies(self.pg0, pkts,
699                                         "RS from non-link source")
700
701         #
702         # Source an RS from a link local address
703         #
704         self.pg0.ip6_ra_config(send_unicast=1)
705         ll = mk_ll_addr(self.pg0.remote_mac)
706         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
707              IPv6(dst=self.pg0.local_ip6, src=ll) /
708              ICMPv6ND_RS())
709         pkts = [p]
710         self.send_and_expect_ra(self.pg0, pkts,
711                                 "RS sourced from link-local",
712                                 dst_ip=ll)
713
714         #
715         # Send the RS multicast
716         #
717         self.pg0.ip6_ra_config(send_unicast=1)
718         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
719         ll = mk_ll_addr(self.pg0.remote_mac)
720         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
721              IPv6(dst="ff02::2", src=ll) /
722              ICMPv6ND_RS())
723         pkts = [p]
724         self.send_and_expect_ra(self.pg0, pkts,
725                                 "RS sourced from link-local",
726                                 dst_ip=ll)
727
728         #
729         # Source from the unspecified address ::. This happens when the RS
730         # is sent before the host has a configured address/sub-net,
731         # i.e. auto-config. Since the sender has no IP address, the reply
732         # comes back mcast - so the capture needs to not filter this.
733         # If we happen to pick up the periodic RA at this point then so be it,
734         # it's not an error.
735         #
736         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
737         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
738              IPv6(dst="ff02::2", src="::") /
739              ICMPv6ND_RS())
740         pkts = [p]
741         self.send_and_expect_ra(self.pg0, pkts,
742                                 "RS sourced from unspecified",
743                                 dst_ip="ff02::1",
744                                 filter_out_fn=None)
745
746         #
747         # Configure The RA to announce the links prefix
748         #
749         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
750                                self.pg0.local_ip6_prefix_len))
751
752         #
753         # RAs should now contain the prefix information option
754         #
755         opt = ICMPv6NDOptPrefixInfo(
756             prefixlen=self.pg0.local_ip6_prefix_len,
757             prefix=self.pg0.local_ip6,
758             L=1,
759             A=1)
760
761         self.pg0.ip6_ra_config(send_unicast=1)
762         ll = mk_ll_addr(self.pg0.remote_mac)
763         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
764              IPv6(dst=self.pg0.local_ip6, src=ll) /
765              ICMPv6ND_RS())
766         self.send_and_expect_ra(self.pg0, p,
767                                 "RA with prefix-info",
768                                 dst_ip=ll,
769                                 opt=opt)
770
771         #
772         # Change the prefix info to not off-link
773         #  L-flag is clear
774         #
775         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
776                                self.pg0.local_ip6_prefix_len),
777                                off_link=1)
778
779         opt = ICMPv6NDOptPrefixInfo(
780             prefixlen=self.pg0.local_ip6_prefix_len,
781             prefix=self.pg0.local_ip6,
782             L=0,
783             A=1)
784
785         self.pg0.ip6_ra_config(send_unicast=1)
786         self.send_and_expect_ra(self.pg0, p,
787                                 "RA with Prefix info with L-flag=0",
788                                 dst_ip=ll,
789                                 opt=opt)
790
791         #
792         # Change the prefix info to not off-link, no-autoconfig
793         #  L and A flag are clear in the advert
794         #
795         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
796                                self.pg0.local_ip6_prefix_len),
797                                off_link=1,
798                                no_autoconfig=1)
799
800         opt = ICMPv6NDOptPrefixInfo(
801             prefixlen=self.pg0.local_ip6_prefix_len,
802             prefix=self.pg0.local_ip6,
803             L=0,
804             A=0)
805
806         self.pg0.ip6_ra_config(send_unicast=1)
807         self.send_and_expect_ra(self.pg0, p,
808                                 "RA with Prefix info with A & L-flag=0",
809                                 dst_ip=ll,
810                                 opt=opt)
811
812         #
813         # Change the flag settings back to the defaults
814         #  L and A flag are set in the advert
815         #
816         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
817                                self.pg0.local_ip6_prefix_len))
818
819         opt = ICMPv6NDOptPrefixInfo(
820             prefixlen=self.pg0.local_ip6_prefix_len,
821             prefix=self.pg0.local_ip6,
822             L=1,
823             A=1)
824
825         self.pg0.ip6_ra_config(send_unicast=1)
826         self.send_and_expect_ra(self.pg0, p,
827                                 "RA with Prefix info",
828                                 dst_ip=ll,
829                                 opt=opt)
830
831         #
832         # Change the prefix info to not off-link, no-autoconfig
833         #  L and A flag are clear in the advert
834         #
835         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
836                                self.pg0.local_ip6_prefix_len),
837                                off_link=1,
838                                no_autoconfig=1)
839
840         opt = ICMPv6NDOptPrefixInfo(
841             prefixlen=self.pg0.local_ip6_prefix_len,
842             prefix=self.pg0.local_ip6,
843             L=0,
844             A=0)
845
846         self.pg0.ip6_ra_config(send_unicast=1)
847         self.send_and_expect_ra(self.pg0, p,
848                                 "RA with Prefix info with A & L-flag=0",
849                                 dst_ip=ll,
850                                 opt=opt)
851
852         #
853         # Use the reset to defaults option to revert to defaults
854         #  L and A flag are clear in the advert
855         #
856         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
857                                self.pg0.local_ip6_prefix_len),
858                                use_default=1)
859
860         opt = ICMPv6NDOptPrefixInfo(
861             prefixlen=self.pg0.local_ip6_prefix_len,
862             prefix=self.pg0.local_ip6,
863             L=1,
864             A=1)
865
866         self.pg0.ip6_ra_config(send_unicast=1)
867         self.send_and_expect_ra(self.pg0, p,
868                                 "RA with Prefix reverted to defaults",
869                                 dst_ip=ll,
870                                 opt=opt)
871
872         #
873         # Advertise Another prefix. With no L-flag/A-flag
874         #
875         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
876                                self.pg1.local_ip6_prefix_len),
877                                off_link=1,
878                                no_autoconfig=1)
879
880         opt = [ICMPv6NDOptPrefixInfo(
881             prefixlen=self.pg0.local_ip6_prefix_len,
882             prefix=self.pg0.local_ip6,
883             L=1,
884             A=1),
885             ICMPv6NDOptPrefixInfo(
886                 prefixlen=self.pg1.local_ip6_prefix_len,
887                 prefix=self.pg1.local_ip6,
888                 L=0,
889                 A=0)]
890
891         self.pg0.ip6_ra_config(send_unicast=1)
892         ll = mk_ll_addr(self.pg0.remote_mac)
893         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
894              IPv6(dst=self.pg0.local_ip6, src=ll) /
895              ICMPv6ND_RS())
896         self.send_and_expect_ra(self.pg0, p,
897                                 "RA with multiple Prefix infos",
898                                 dst_ip=ll,
899                                 opt=opt)
900
901         #
902         # Remove the first prefix-info - expect the second is still in the
903         # advert
904         #
905         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
906                                self.pg0.local_ip6_prefix_len),
907                                is_no=1)
908
909         opt = ICMPv6NDOptPrefixInfo(
910             prefixlen=self.pg1.local_ip6_prefix_len,
911             prefix=self.pg1.local_ip6,
912             L=0,
913             A=0)
914
915         self.pg0.ip6_ra_config(send_unicast=1)
916         self.send_and_expect_ra(self.pg0, p,
917                                 "RA with Prefix reverted to defaults",
918                                 dst_ip=ll,
919                                 opt=opt)
920
921         #
922         # Remove the second prefix-info - expect no prefix-info in the adverts
923         #
924         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
925                                self.pg1.local_ip6_prefix_len),
926                                is_no=1)
927
928         self.pg0.ip6_ra_config(send_unicast=1)
929         self.send_and_expect_ra(self.pg0, p,
930                                 "RA with Prefix reverted to defaults",
931                                 dst_ip=ll)
932
933         #
934         # Reset the periodic advertisements back to default values
935         #
936         self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
937
938
class TestIPv6IfAddrRoute(VppTestCase):
    """ IPv6 Interface Addr Route Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6IfAddrRoute, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6IfAddrRoute, cls).tearDownClass()

    def setUp(self):
        """ Create one pg interface and bring it up with IPv6 configured """
        super(TestIPv6IfAddrRoute, self).setUp()

        # create 1 pg interface
        self.create_pg_interfaces(range(1))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """ Remove the IPv6 configuration and shut the interfaces down """
        super(TestIPv6IfAddrRoute, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()

    def test_ipv6_ifaddrs_same_prefix(self):
        """ IPv6 Interface Addresses Same Prefix test

        Test scenario:

            - Verify no /128 route in FIB for 2001:10::10 or 2001:10::20
            - Configure IPv6 address 2001:10::10/64 on an interface
            - Verify /128 route in FIB for 2001:10::10 only
            - Configure IPv6 address 2001:10::20/64 on an interface
            - Delete 2001:10::10/64 from interface
            - Verify /128 route in FIB for 2001:10::20 only
            - Delete 2001:10::20/64 from interface
            - Verify no /128 route in FIB for either address
        """

        addr1 = "2001:10::10"
        addr2 = "2001:10::20"

        # both addresses share the same /64 and live on the same interface
        if_addr1 = VppIpInterfaceAddress(self, self.pg0, addr1, 64)
        if_addr2 = VppIpInterfaceAddress(self, self.pg0, addr2, 64)
        self.assertFalse(if_addr1.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

        # configure first address, verify route present
        if_addr1.add_vpp_config()
        self.assertTrue(if_addr1.query_vpp_config())
        self.assertTrue(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

        # configure second address, delete first, verify route not removed
        if_addr2.add_vpp_config()
        if_addr1.remove_vpp_config()
        self.assertFalse(if_addr1.query_vpp_config())
        self.assertTrue(if_addr2.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertTrue(find_route(self, addr2, 128))

        # delete second address, verify route removed
        if_addr2.remove_vpp_config()
        # neither address may remain configured; the second one is the
        # address that was just removed, so it must be checked explicitly
        self.assertFalse(if_addr1.query_vpp_config())
        self.assertFalse(if_addr2.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))
1010
1011
class TestICMPv6Echo(VppTestCase):
    """ ICMPv6 Echo Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestICMPv6Echo, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestICMPv6Echo, cls).tearDownClass()

    def setUp(self):
        super(TestICMPv6Echo, self).setUp()

        # one pg interface: the request is sent on, and the reply captured
        # from, pg0
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        super(TestICMPv6Echo, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.ip6_disable()
            intf.admin_down()

    def test_icmpv6_echo(self):
        """ VPP replies to ICMPv6 Echo Request

        Test scenario:

            - Receive ICMPv6 Echo Request message on pg0 interface.
            - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
        """

        pg0 = self.pg0

        # values the reply is expected to mirror back
        echo_id = 0xb
        echo_seq = 5
        echo_payload = b'\x0a' * 18

        l2 = Ether(src=pg0.remote_mac, dst=pg0.local_mac)
        l3 = IPv6(src=pg0.remote_ip6, dst=pg0.local_ip6)
        echo = ICMPv6EchoRequest(id=echo_id,
                                 seq=echo_seq,
                                 data=echo_payload)
        request = (l2 / l3 / echo)

        pg0.add_stream(request)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # exactly one packet is expected back on pg0
        reply = pg0.get_capture(1)[0]
        reply_l2 = reply[Ether]
        reply_l3 = reply[IPv6]
        reply_echo = reply[ICMPv6EchoReply]

        # the reply travels in the opposite direction at L2 ...
        self.assertEqual(reply_l2.src, pg0.local_mac)
        self.assertEqual(reply_l2.dst, pg0.remote_mac)

        # ... and at L3
        self.assertEqual(reply_l3.src, pg0.local_ip6)
        self.assertEqual(reply_l3.dst, pg0.remote_ip6)

        # and it echoes the identifier, sequence number and payload
        self.assertEqual(icmp6types[reply_echo.type], "Echo Reply")
        self.assertEqual(reply_echo.id, echo_id)
        self.assertEqual(reply_echo.seq, echo_seq)
        self.assertEqual(reply_echo.data, echo_payload)
1084
class TestIPv6RD(TestIPv6ND):
    """ IPv6 Router Discovery Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RD, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RD, cls).tearDownClass()

    def setUp(self):
        super(TestIPv6RD, self).setUp()

        # two pg interfaces; remember them so tearDown can undo the config
        self.create_pg_interfaces(range(2))
        self.interfaces = list(self.pg_interfaces)

        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

    def tearDown(self):
        # undo the per-interface config before the base class tears down
        for intf in self.interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
        super(TestIPv6RD, self).tearDown()

    def test_rd_send_router_solicitation(self):
        """ Verify router solicitation packets """

        count = 2
        pg1 = self.pg1

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.vapi.ip6nd_send_router_solicitation(pg1.sw_if_index,
                                                 mrc=count)
        solicitations = pg1.get_capture(count, timeout=3)
        self.assertEqual(len(solicitations), count)

        # every RS must go to ff02::2 from the interface's link-local
        # address
        expected_dst = ip6_normalize("ff02::2")
        expected_src = ip6_normalize(pg1.local_ip6_ll)

        for rs in solicitations:
            self.assertEqual(rs.haslayer(IPv6), 1)
            self.assertEqual(rs[IPv6].haslayer(ICMPv6ND_RS), 1)

            self.assert_equal(ip6_normalize(rs[IPv6].dst), expected_dst)
            self.assert_equal(ip6_normalize(rs[IPv6].src), expected_src)

            # the source link-layer address option must carry pg1's MAC
            has_sllao = rs[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr)
            self.assertTrue(bool(has_sllao))
            self.assert_equal(rs[ICMPv6NDOptSrcLLAddr].lladdr,
                              pg1.local_mac)

    def verify_prefix_info(self, reported_prefix, prefix_option):
        """ Compare a prefix reported in an RA event with the sent option

        :param reported_prefix: prefix entry taken from the RA event
        :param prefix_option: ICMPv6NDOptPrefixInfo that was put in the RA
        """
        address = prefix_option.getfieldval("prefix")
        length = text_type(prefix_option.getfieldval("prefixlen"))
        # strict=False: the option may carry host bits beyond the mask
        expected_net = IPv6Network(text_type(address + "/" + length),
                                   strict=False)
        self.assert_equal(reported_prefix.prefix.network_address,
                          expected_net.network_address)

        # L is bit 7 and A is bit 6 of the reported flags
        on_link = prefix_option.getfieldval("L")
        autonomous = prefix_option.getfieldval("A")
        expected_flags = (on_link << 7) | (autonomous << 6)
        self.assert_equal(reported_prefix.flags, expected_flags)

        self.assert_equal(reported_prefix.valid_time,
                          prefix_option.getfieldval("validlifetime"))
        self.assert_equal(reported_prefix.preferred_time,
                          prefix_option.getfieldval("preferredlifetime"))

    def test_rd_receive_router_advertisement(self):
        """ Verify events triggered by received RA packets """

        self.vapi.want_ip6_ra_events()

        prefix_info_1 = ICMPv6NDOptPrefixInfo(
            prefix="1::2",
            prefixlen=50,
            validlifetime=200,
            preferredlifetime=500,
            L=1,
            A=1,
        )

        prefix_info_2 = ICMPv6NDOptPrefixInfo(
            prefix="7::4",
            prefixlen=20,
            validlifetime=70,
            preferredlifetime=1000,
            L=1,
            A=0,
        )

        # RA from the peer's link-local address carrying both options
        pg1 = self.pg1
        l2 = Ether(dst=pg1.local_mac, src=pg1.remote_mac)
        l3 = IPv6(dst=pg1.local_ip6_ll, src=mk_ll_addr(pg1.remote_mac))
        ra = (l2 / l3 / ICMPv6ND_RA() / prefix_info_1 / prefix_info_2)
        pg1.add_stream([ra])
        self.pg_start()

        event = self.vapi.wait_for_event(10, "ip6_ra_event")

        # header fields of the RA built above with default values
        self.assert_equal(event.current_hop_limit, 0)
        self.assert_equal(event.flags, 8)
        self.assert_equal(event.router_lifetime_in_sec, 1800)
        self.assert_equal(event.neighbor_reachable_time_in_msec, 0)
        self.assert_equal(
            event.time_in_msec_between_retransmitted_neighbor_solicitations,
            0)

        # both prefix options must be reported, in the order they were sent
        self.assert_equal(event.n_prefixes, 2)
        self.verify_prefix_info(event.prefixes[0], prefix_info_1)
        self.verify_prefix_info(event.prefixes[1], prefix_info_2)
1205
class TestIPv6RDControlPlane(TestIPv6ND):
    """ IPv6 Router Discovery Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RDControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RDControlPlane, cls).tearDownClass()

    def setUp(self):
        super(TestIPv6RDControlPlane, self).setUp()

        # create 1 pg interface
        self.create_pg_interfaces(range(1))

        self.interfaces = list(self.pg_interfaces)

        # setup all interfaces
        for i in self.interfaces:
            i.admin_up()
            i.config_ip6()

    def tearDown(self):
        super(TestIPv6RDControlPlane, self).tearDown()

    @staticmethod
    def create_ra_packet(pg, routerlifetime=None):
        """ Build a Router Advertisement sent by the peer of ``pg``

        :param pg: interface whose remote link-local address is the source
                   and whose local IPv6 address is the destination
        :param routerlifetime: router lifetime to put in the RA; when None
                               the scapy default is used
        :returns: the Ether / IPv6 / ICMPv6ND_RA packet
        """
        if routerlifetime is not None:
            ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
        else:
            ra = ICMPv6ND_RA()
        return (Ether(dst=pg.local_mac, src=pg.remote_mac) /
                IPv6(dst=pg.local_ip6, src=pg.remote_ip6_ll) / ra)

    @staticmethod
    def get_default_routes(fib):
        """ Extract the default-route paths from a FIB dump

        :param fib: iterable of route dump entries
        :returns: list of dicts with keys 'sw_if_index' and 'next_hop', one
                  per path of a zero-length prefix whose sw_if_index is not
                  0xFFFFFFFF
        """
        return [
            {'sw_if_index': path.sw_if_index,
             'next_hop': path.nh.address.ip6}
            for entry in fib
            if entry.route.prefix.prefixlen == 0
            for path in entry.route.paths
            if path.sw_if_index != 0xFFFFFFFF
        ]

    @staticmethod
    def get_interface_addresses(fib, pg):
        """ Extract the /128 addresses whose first path is via ``pg``

        :param fib: iterable of route dump entries
        :param pg: interface to match against the first path's sw_if_index
        :returns: list of address strings
        """
        return [
            str(entry.route.prefix.network_address)
            for entry in fib
            if entry.route.prefix.prefixlen == 128 and
            entry.route.paths[0].sw_if_index == pg.sw_if_index
        ]

    def _send_ra(self, packet):
        """ Inject ``packet`` on pg0 and give VPP 0.1s to process it """
        self.pg0.add_stream([packet])
        self.pg_start()
        self.sleep_on_vpp_time(0.1)

    def _assert_default_route(self, fib, router_address):
        """ Assert ``fib`` has exactly one default route, via the router """
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 1)
        dr = default_routes[0]
        self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
        self.assertEqual(dr['next_hop'], router_address)

    def _assert_slaac_address(self, fib, initial_addresses):
        """ Assert exactly one address was added on pg0, inside 1::/20 """
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        new_addresses = addresses.difference(initial_addresses)
        self.assertEqual(len(new_addresses), 1)
        # NOTE(review): the RA advertises 1::/64 but the address is only
        # checked against a /20 mask - confirm whether /64 was intended
        prefix = IPv6Network(text_type("%s/%d" % (list(new_addresses)[0], 20)),
                             strict=False)
        self.assertEqual(prefix, IPv6Network(text_type('1::/20')))

    def test_all(self):
        """ Test handling of SLAAC addresses and default routes """

        fib = self.vapi.ip_route_dump(0, True)
        default_routes = self.get_default_routes(fib)
        initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
        self.assertEqual(default_routes, [])
        router_address = IPv6Address(text_type(self.pg0.remote_ip6_ll))

        self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)

        self.sleep(0.1)

        # send RA; only the first prefix has the A flag set and both of its
        # lifetimes are 2s
        packet = (self.create_ra_packet(
            self.pg0) / ICMPv6NDOptPrefixInfo(
            prefix="1::",
            prefixlen=64,
            validlifetime=2,
            preferredlifetime=2,
            L=1,
            A=1,
        ) / ICMPv6NDOptPrefixInfo(
            prefix="7::",
            prefixlen=20,
            validlifetime=1500,
            preferredlifetime=1000,
            L=1,
            A=0,
        ))
        self._send_ra(packet)

        fib = self.vapi.ip_route_dump(0, True)

        # check FIB for new address
        self._assert_slaac_address(fib, initial_addresses)

        # check FIB for new default route
        self._assert_default_route(fib, router_address)

        # send RA to delete default route
        self._send_ra(self.create_ra_packet(self.pg0, routerlifetime=0))

        # check that default route is deleted
        fib = self.vapi.ip_route_dump(0, True)
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 0)

        self.sleep_on_vpp_time(0.1)

        # send RA
        self._send_ra(self.create_ra_packet(self.pg0))

        # check FIB for new default route
        fib = self.vapi.ip_route_dump(0, True)
        self._assert_default_route(fib, router_address)

        # send RA, updating router lifetime to 1s
        self._send_ra(self.create_ra_packet(self.pg0, 1))

        # check that default route still exists
        fib = self.vapi.ip_route_dump(0, True)
        self._assert_default_route(fib, router_address)

        self.sleep_on_vpp_time(1)

        # check that default route is deleted
        fib = self.vapi.ip_route_dump(0, True)
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 0)

        # check FIB still contains the SLAAC address
        self._assert_slaac_address(fib, initial_addresses)

        self.sleep_on_vpp_time(1)

        # check that SLAAC address is deleted
        fib = self.vapi.ip_route_dump(0, True)
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        new_addresses = addresses.difference(initial_addresses)
        self.assertEqual(len(new_addresses), 0)
1388
class IPv6NDProxyTest(TestIPv6ND):
    """ IPv6 ND ProxyTest Case """

    @classmethod
    def setUpClass(cls):
        super(IPv6NDProxyTest, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(IPv6NDProxyTest, cls).tearDownClass()

    def setUp(self):
        super(IPv6NDProxyTest, self).setUp()

        # create 3 pg interfaces
        self.create_pg_interfaces(range(3))

        # pg0 is the master interface, with the configured subnet
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # the proxy links only have IPv6 enabled; no address is configured
        self.pg1.ip6_enable()
        self.pg2.ip6_enable()

    def tearDown(self):
        super(IPv6NDProxyTest, self).tearDown()

    def test_nd_proxy(self):
        """ IPv6 Proxy ND """

        #
        # Generate some hosts in the subnet that we are proxying
        #
        self.pg0.generate_remote_hosts(8)

        # the two hosts used below, fetched once through the public
        # accessor instead of mixing it with the private list
        host2 = self.pg0.remote_hosts[2]
        host3 = self.pg0.remote_hosts[3]

        # solicited-node multicast address of pg0's local address
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        #
        # Send an NS for one of those remote hosts on one of the proxy links
        # expect no response since it's from an address that is not
        # on the link that has the prefix configured
        #
        ns_pg1 = (Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac) /
                  IPv6(dst=d, src=host2.ip6) /
                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=host2.mac))

        self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")

        #
        # Add proxy support for the host
        #
        self.vapi.ip6nd_proxy_add_del(
            ip=inet_pton(AF_INET6, host2.ip6),
            sw_if_index=self.pg1.sw_if_index)

        #
        # try that NS again. this time we expect an NA back
        #
        self.send_and_expect_na(self.pg1, ns_pg1,
                                "NS to proxy entry",
                                dst_ip=host2.ip6,
                                tgt_ip=self.pg0.local_ip6)

        #
        # ... and that we have an entry in the ND cache
        #
        self.assertTrue(find_nbr(self,
                                 self.pg1.sw_if_index,
                                 host2.ip6))

        #
        # ... and we can route traffic to it
        #
        t = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(dst=host2.ip6, src=self.pg0.remote_ip6) /
             inet6.UDP(sport=10000, dport=20000) /
             Raw(b'\xa5' * 100))

        self.pg0.add_stream(t)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(1)
        rx = rx[0]

        self.assertEqual(rx[Ether].dst, host2.mac)
        self.assertEqual(rx[Ether].src, self.pg1.local_mac)

        self.assertEqual(rx[IPv6].src, t[IPv6].src)
        self.assertEqual(rx[IPv6].dst, t[IPv6].dst)

        #
        # Test we proxy for the host on the main interface
        #
        ns_pg0 = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
                  IPv6(dst=d, src=self.pg0.remote_ip6) /
                  ICMPv6ND_NS(tgt=host2.ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))

        self.send_and_expect_na(self.pg0, ns_pg0,
                                "NS to proxy entry on main",
                                tgt_ip=host2.ip6,
                                dst_ip=self.pg0.remote_ip6)

        #
        # Setup and resolve proxy for another host on another interface
        #
        # NOTE(review): the NS is sourced from host 3 but advertises host
        # 2's MAC as its link-layer address - confirm this is intended
        ns_pg2 = (Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac) /
                  IPv6(dst=d, src=host3.ip6) /
                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=host2.mac))

        self.vapi.ip6nd_proxy_add_del(
            ip=inet_pton(AF_INET6, host3.ip6),
            sw_if_index=self.pg2.sw_if_index)

        self.send_and_expect_na(self.pg2, ns_pg2,
                                "NS to proxy entry other interface",
                                dst_ip=host3.ip6,
                                tgt_ip=self.pg0.local_ip6)

        self.assertTrue(find_nbr(self,
                                 self.pg2.sw_if_index,
                                 host3.ip6))

        #
        # hosts can communicate. pg2->pg1
        #
        t2 = (Ether(dst=self.pg2.local_mac, src=host3.mac) /
              IPv6(dst=host2.ip6, src=host3.ip6) /
              inet6.UDP(sport=10000, dport=20000) /
              Raw(b'\xa5' * 100))

        self.pg2.add_stream(t2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(1)
        rx = rx[0]

        self.assertEqual(rx[Ether].dst, host2.mac)
        self.assertEqual(rx[Ether].src, self.pg1.local_mac)

        self.assertEqual(rx[IPv6].src, t2[IPv6].src)
        self.assertEqual(rx[IPv6].dst, t2[IPv6].dst)

        #
        # remove the proxy configs
        #
        self.vapi.ip6nd_proxy_add_del(
            ip=inet_pton(AF_INET6, host2.ip6),
            sw_if_index=self.pg1.sw_if_index, is_del=1)
        self.vapi.ip6nd_proxy_add_del(
            ip=inet_pton(AF_INET6, host3.ip6),
            sw_if_index=self.pg2.sw_if_index, is_del=1)

        # the neighbour entries learned through the proxies must be gone
        self.assertFalse(find_nbr(self,
                                  self.pg2.sw_if_index,
                                  host3.ip6))
        self.assertFalse(find_nbr(self,
                                  self.pg1.sw_if_index,
                                  host2.ip6))

        #
        # no longer proxy-ing...
        #
        self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
        self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
        self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")

        #
        # no longer forwarding. traffic generates NS out of the glean/main
        # interface
        #
        self.pg2.add_stream(t2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)

        self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1585
class TestIPNull(VppTestCase):
    """ IPv6 routes via NULL """

    @classmethod
    def setUpClass(cls):
        super(TestIPNull, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPNull, cls).tearDownClass()

    def setUp(self):
        super(TestIPNull, self).setUp()

        # a single pg interface: the ICMP errors come back on the ingress
        # interface
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        super(TestIPNull, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def _send_and_get_dest_unreach(self, pkt):
        """ Send ``pkt`` on pg0 and return the ICMPv6DestUnreach layer of
        the single packet captured back on pg0 """
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg0.get_capture(1)
        return captured[0][ICMPv6DestUnreach]

    def test_ip_null(self):
        """ IP NULL route """

        # UDP packet towards 2001::1, which both routes below cover
        l2 = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        l3 = IPv6(src=self.pg0.remote_ip6, dst="2001::1")
        p = (l2 / l3 /
             inet6.UDP(sport=1234, dport=1234) /
             Raw(b'\xa5' * 100))

        #
        # A route via IP NULL that will reply with ICMP unreachables
        #
        unreach_path = VppRoutePath(
            "::", 0xffffffff,
            type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH)
        ip_unreach = VppIpRoute(self, "2001::", 64, [unreach_path])
        ip_unreach.add_vpp_config()

        icmp = self._send_and_get_dest_unreach(p)

        # 0 = "No route to destination"
        self.assertEqual(icmp.code, 0)

        # ICMP is rate limited. pause a bit
        self.sleep(1)

        #
        # A route via IP NULL that will reply with ICMP prohibited
        #
        prohibit_path = VppRoutePath(
            "::", 0xffffffff,
            type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT)
        ip_prohibit = VppIpRoute(self, "2001::1", 128, [prohibit_path])
        ip_prohibit.add_vpp_config()

        icmp = self._send_and_get_dest_unreach(p)

        # 1 = "Communication with destination administratively prohibited"
        self.assertEqual(icmp.code, 1)
1666
class TestIPDisabled(VppTestCase):
    """ IPv6 disabled """

    @classmethod
    def setUpClass(cls):
        super(TestIPDisabled, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPDisabled, cls).tearDownClass()

    def setUp(self):
        super(TestIPDisabled, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        # PG0 is IP enabled
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # PG 1 is not IP enabled
        self.pg1.admin_up()

    def tearDown(self):
        super(TestIPDisabled, self).tearDown()
        for i in self.pg_interfaces:
            # this class only ever configures IPv6 addresses, so it is
            # the IPv6 config that has to be removed here
            i.unconfig_ip6()
            i.admin_down()

    def test_ip_disabled(self):
        """ IP Disabled """

        #
        # A multicast route for group ffef::1 with source "::".
        # one accepting interface, pg1, one forwarding interface, pg0
        #
        route_ff_01 = VppIpMRoute(
            self,
            "::",
            "ffef::1", 128,
            MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
            [VppMRoutePath(self.pg1.sw_if_index,
                           MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT),
             VppMRoutePath(self.pg0.sw_if_index,
                           MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)])
        route_ff_01.add_vpp_config()

        # pu: unicast towards pg0's peer; pm: multicast to the group above.
        # Both are injected on pg1.
        pu = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
              inet6.UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))
        pm = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst="ffef::1") /
              inet6.UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")

        #
        # IP enable PG1
        #
        self.pg1.config_ip6()

        #
        # Now we get packets through: each packet sent on pg1 must show
        # up as exactly one packet on pg0
        #
        self.pg1.add_stream(pu)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        self.pg1.add_stream(pm)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        #
        # Disable PG1
        #
        self.pg1.unconfig_ip6()

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1761
1762
class TestIP6LoadBalance(VppTestCase):
    """ IPv6 Load-Balancing """

    @classmethod
    def setUpClass(cls):
        super(TestIP6LoadBalance, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6LoadBalance, cls).tearDownClass()

    def setUp(self):
        super(TestIP6LoadBalance, self).setUp()

        # pg0 is the input; pg1..pg4 are the candidate output paths
        self.create_pg_interfaces(range(5))

        # MPLS table 0 is created before MPLS is enabled on the interfaces
        mpls_tbl = VppMplsTable(self, 0)
        mpls_tbl.add_vpp_config()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()
            i.enable_mpls()

    def tearDown(self):
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()
            i.disable_mpls()
        super(TestIP6LoadBalance, self).tearDown()

    def pg_send(self, input, pkts):
        # Clear the trace, queue pkts on the input interface and start
        # the packet generator with capture enabled on every interface.
        # (the parameter name 'input' is kept for existing callers)
        self.vapi.cli("clear trace")
        input.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_and_expect_load_balancing(self, input, pkts, outputs):
        # Send pkts on input and require at least one packet to be
        # captured on each of the interfaces in outputs.
        self.pg_send(input, pkts)
        for oo in outputs:
            rx = oo._get_capture(1)
            # fail with a readable message, rather than a TypeError from
            # len(), should no capture be returned at all
            self.assertIsNotNone(rx, "no capture on %s" % oo.name)
            self.assertNotEqual(0, len(rx))

    def send_and_expect_one_itf(self, input, pkts, itf):
        # Send pkts on input and require that all of them are captured
        # on the single interface itf.
        self.pg_send(input, pkts)
        itf.get_capture(len(pkts))

    def test_ip6_load_balance(self):
        """ IPv6 Load-Balancing """

        def eth():
            # a fresh Ethernet header for a packet injected on pg0
            return Ether(src=self.pg0.remote_mac,
                         dst=self.pg0.local_mac)

        #
        # An array of packets that differ only in the destination port
        #  - IP only
        #  - MPLS EOS
        #  - MPLS non-EOS
        #  - MPLS non-EOS with an entropy label
        #
        port_ip_pkts = []
        port_mpls_pkts = []
        port_mpls_neos_pkts = []
        port_ent_pkts = []

        #
        # An array of packets that differ only in the source address
        #
        src_ip_pkts = []
        src_mpls_pkts = []

        for ii in range(NUM_PKTS):
            port_ip_hdr = (
                IPv6(dst="3000::1", src="3000:1::1") /
                inet6.UDP(sport=1234, dport=1234 + ii) /
                Raw(b'\xa5' * 100))
            port_ip_pkts.append(eth() / port_ip_hdr)
            port_mpls_pkts.append(eth() /
                                  MPLS(label=66, ttl=2) /
                                  port_ip_hdr)
            port_mpls_neos_pkts.append(eth() /
                                       MPLS(label=67, ttl=2) /
                                       MPLS(label=77, ttl=2) /
                                       port_ip_hdr)
            port_ent_pkts.append(eth() /
                                 MPLS(label=67, ttl=2) /
                                 MPLS(label=14, ttl=2) /
                                 MPLS(label=999, ttl=2) /
                                 port_ip_hdr)
            src_ip_hdr = (
                IPv6(dst="3000::1", src="3000:1::%d" % ii) /
                inet6.UDP(sport=1234, dport=1234) /
                Raw(b'\xa5' * 100))
            src_ip_pkts.append(eth() / src_ip_hdr)
            src_mpls_pkts.append(eth() /
                                 MPLS(label=66, ttl=2) /
                                 src_ip_hdr)

        #
        # A route for the IP packets
        #
        route_3000_1 = VppIpRoute(self, "3000::1", 128,
                                  [VppRoutePath(self.pg1.remote_ip6,
                                                self.pg1.sw_if_index),
                                   VppRoutePath(self.pg2.remote_ip6,
                                                self.pg2.sw_if_index)])
        route_3000_1.add_vpp_config()

        #
        # a local-label for the EOS packets
        #
        binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
        binding.add_vpp_config()

        #
        # An MPLS route for the non-EOS packets
        #
        route_67 = VppMplsRoute(self, 67, 0,
                                [VppRoutePath(self.pg1.remote_ip6,
                                              self.pg1.sw_if_index,
                                              labels=[67]),
                                 VppRoutePath(self.pg2.remote_ip6,
                                              self.pg2.sw_if_index,
                                              labels=[67])])
        route_67.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across the 2 paths
        #  - since the default hash config is to use IP src,dst and port
        #    src,dst
        # We are not going to ensure equal amounts of packets across each link,
        # since the hash algorithm is statistical and therefore this can never
        # be guaranteed. But with NUM_PKTS different packets we do expect some
        # balancing. So instead just ensure there is traffic on each link.
        #
        self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, port_mpls_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, port_mpls_neos_pkts,
                                            [self.pg1, self.pg2])

        #
        # The packets with Entropy label in should not load-balance,
        # since the Entropy value is fixed.
        #
        self.send_and_expect_one_itf(self.pg0, port_ent_pkts, self.pg1)

        #
        # change the flow hash config so it's only IP src,dst
        #  - now only the stream with differing source address will
        #    load-balance
        #
        self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=0, dport=0,
                                   is_ipv6=1)

        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_one_itf(self.pg0, port_ip_pkts, self.pg2)

        #
        # change the flow hash config back to defaults
        #
        self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=1, dport=1,
                                   is_ipv6=1)

        #
        # Recursive prefixes
        #  - testing that 2 stages of load-balancing occurs and there is no
        #    polarisation (i.e. only 2 of 4 paths are used)
        #
        port_pkts = [(eth() /
                      IPv6(dst="4000::1", src="4000:1::1") /
                      inet6.UDP(sport=1234, dport=1234 + ii) /
                      Raw(b'\xa5' * 100))
                     for ii in range(257)]
        src_pkts = [(eth() /
                     IPv6(dst="4000::1", src="4000:1::%d" % ii) /
                     inet6.UDP(sport=1234, dport=1234) /
                     Raw(b'\xa5' * 100))
                    for ii in range(257)]

        route_3000_2 = VppIpRoute(self, "3000::2", 128,
                                  [VppRoutePath(self.pg3.remote_ip6,
                                                self.pg3.sw_if_index),
                                   VppRoutePath(self.pg4.remote_ip6,
                                                self.pg4.sw_if_index)])
        route_3000_2.add_vpp_config()

        route_4000_1 = VppIpRoute(self, "4000::1", 128,
                                  [VppRoutePath("3000::1",
                                                0xffffffff),
                                   VppRoutePath("3000::2",
                                                0xffffffff)])
        route_4000_1.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across all 4 paths
        #
        self.vapi.cli("clear trace")
        self.send_and_expect_load_balancing(self.pg0, port_pkts,
                                            [self.pg1, self.pg2,
                                             self.pg3, self.pg4])
        self.send_and_expect_load_balancing(self.pg0, src_pkts,
                                            [self.pg1, self.pg2,
                                             self.pg3, self.pg4])

        #
        # Recursive prefixes
        #  - testing 2 stages of resolution where neither stage offers a
        #    choice: 6000::1 recurses via 5000::2, which has one path (pg3)
        #
        port_pkts = [(eth() /
                      IPv6(dst="6000::1", src="6000:1::1") /
                      inet6.UDP(sport=1234, dport=1234 + ii) /
                      Raw(b'\xa5' * 100))
                     for ii in range(257)]

        route_5000_2 = VppIpRoute(self, "5000::2", 128,
                                  [VppRoutePath(self.pg3.remote_ip6,
                                                self.pg3.sw_if_index)])
        route_5000_2.add_vpp_config()

        route_6000_1 = VppIpRoute(self, "6000::1", 128,
                                  [VppRoutePath("5000::2",
                                                0xffffffff)])
        route_6000_1.add_vpp_config()

        #
        # inject the packet on pg0 - expect every packet on pg3, the
        # only path available
        #
        self.vapi.cli("clear trace")
        self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg3)
2019
2020
class TestIP6Punt(VppTestCase):
    """ IPv6 Punt Police/Redirect """

    @classmethod
    def setUpClass(cls):
        super(TestIP6Punt, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Punt, cls).tearDownClass()

    def setUp(self):
        super(TestIP6Punt, self).setUp()

        # pg0..pg3, each brought up with IPv6 configured and ND resolved
        self.create_pg_interfaces(range(4))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        super(TestIP6Punt, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip_punt(self):
        """ IP6 punt police and redirect """

        # TCP addressed to pg0's local IPv6 address
        punted = (Ether(src=self.pg0.remote_mac,
                        dst=self.pg0.local_mac) /
                  IPv6(src=self.pg0.remote_ip6,
                       dst=self.pg0.local_ip6) /
                  inet6.TCP(sport=1234, dport=1234) /
                  Raw(b'\xa5' * 100))

        pkts = punted * 1025

        # values shared by the redirect/policer add and delete calls
        any_itf = 0xffffffff
        rx_index = self.pg0.sw_if_index
        tx_index = self.pg1.sw_if_index
        nh_addr = self.pg1.remote_ip6
        policer_args = (b"ip6-punt", 400, 0, 10, 0)

        #
        # Configure a punt redirect via pg1.
        #
        self.vapi.ip_punt_redirect(rx_index, tx_index, nh_addr)

        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # add a policer
        #
        policer = self.vapi.policer_add_del(*policer_args, rate_type=1)
        self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)

        self.vapi.cli("clear trace")
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        #
        # the number of packet received should be greater than 0,
        # but not equal to the number sent, since some were policed
        #
        captured = self.pg1._get_capture(1)
        self.assertGreater(len(captured), 0)
        self.assertLess(len(captured), len(pkts))

        #
        # remove the policer. back to full rx
        #
        self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
        self.vapi.policer_add_del(*policer_args, rate_type=1, is_add=0)
        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # remove the redirect. expect full drop.
        #
        self.vapi.ip_punt_redirect(rx_index, tx_index, nh_addr, is_add=0)
        self.send_and_assert_no_replies(self.pg0, pkts,
                                        "IP no punt config")

        #
        # Add a redirect that is not input port selective
        #
        self.vapi.ip_punt_redirect(any_itf, tx_index, nh_addr)
        self.send_and_expect(self.pg0, pkts, self.pg1)

        self.vapi.ip_punt_redirect(any_itf, tx_index, nh_addr, is_add=0)

    def test_ip_punt_dump(self):
        """ IP6 punt redirect dump"""

        #
        # Configure a punt redirects: rx on pg0, pg1 and pg2, all of
        # them sending out of pg3
        #
        nh_addr = self.pg3.remote_ip6
        redirects = ((self.pg0, nh_addr),
                     (self.pg1, nh_addr),
                     (self.pg2, '0::0'))
        for rx_itf, nh in redirects:
            self.vapi.ip_punt_redirect(rx_itf.sw_if_index,
                                       self.pg3.sw_if_index,
                                       nh)

        #
        # Dump pg0 punt redirects
        #
        pg0_punts = self.vapi.ip_punt_redirect_dump(self.pg0.sw_if_index,
                                                    is_ipv6=1)
        for entry in pg0_punts:
            self.assertEqual(entry.punt.rx_sw_if_index,
                             self.pg0.sw_if_index)

        #
        # Dump punt redirects for all interfaces
        #
        all_punts = self.vapi.ip_punt_redirect_dump(0xffffffff, is_ipv6=1)
        self.assertEqual(len(all_punts), 3)
        for entry in all_punts:
            self.assertEqual(entry.punt.tx_sw_if_index,
                             self.pg3.sw_if_index)
        # NOTE(review): entry [1] is compared unconverted against a
        # string while entry [2] goes through str() - confirm that this
        # assertNotEqual is really the intended check
        self.assertNotEqual(all_punts[1].punt.nh, self.pg3.remote_ip6)
        self.assertEqual(str(all_punts[2].punt.nh), '::')
2155
2156
class TestIPDeag(VppTestCase):
    """ IPv6 Deaggregate Routes """

    @classmethod
    def setUpClass(cls):
        super(TestIPDeag, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPDeag, cls).tearDownClass()

    def setUp(self):
        super(TestIPDeag, self).setUp()

        # pg0 is the input; pg1 and pg2 are the expected outputs
        self.create_pg_interfaces(range(3))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        super(TestIPDeag, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip_deag(self):
        """ IP Deag Routes """

        def tcp_pkt(src, dst):
            # TCP packet from src to dst, framed for injection on pg0
            return (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IPv6(src=src, dst=dst) /
                    inet6.TCP(sport=1234, dport=1234) /
                    Raw(b'\xa5' * 100))

        #
        # Two extra IPv6 tables:
        #  table 1 - for a second lookup on the destination address
        #  table 2 - for a lookup on the source address
        #
        tables = (VppIpTable(self, 1, is_ip6=1),
                  VppIpTable(self, 2, is_ip6=1))
        for table in tables:
            table.add_vpp_config()

        #
        # Routes in the default table that hand the packet over to a
        # further lookup in table 1 (1::1) or table 2 (1::2)
        #
        deag_dst_path = VppRoutePath("::",
                                     0xffffffff,
                                     nh_table_id=1)
        deag_src_path = VppRoutePath(
            "::",
            0xffffffff,
            nh_table_id=2,
            type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP)
        route_to_dst = VppIpRoute(self, "1::1", 128, [deag_dst_path])
        route_to_src = VppIpRoute(self, "1::2", 128, [deag_src_path])

        route_to_dst.add_vpp_config()
        route_to_src.add_vpp_config()

        #
        # Nothing more specific exists in tables 1 and 2 yet, so
        # packets for either destination are expected to be dropped
        #
        pkts_dst = tcp_pkt("5::5", "1::1") * 257
        pkts_src = tcp_pkt("2::2", "1::2") * 257

        self.send_and_assert_no_replies(self.pg0, pkts_dst,
                                        "IP in dst table")
        self.send_and_assert_no_replies(self.pg0, pkts_src,
                                        "IP in src table")

        #
        # table 1 learns 1::1 via pg1: the dst-lookup stream now
        # comes out of pg1
        #
        route_in_dst = VppIpRoute(self, "1::1", 128,
                                  [VppRoutePath(self.pg1.remote_ip6,
                                                self.pg1.sw_if_index)],
                                  table_id=1)
        route_in_dst.add_vpp_config()

        self.send_and_expect(self.pg0, pkts_dst, self.pg1)

        #
        # table 2 learns 2::2 (the packets' source address) via pg2:
        # the src-lookup stream now comes out of pg2
        #
        route_in_src = VppIpRoute(self, "2::2", 128,
                                  [VppRoutePath(self.pg2.remote_ip6,
                                                self.pg2.sw_if_index)],
                                  table_id=2)
        route_in_src.add_vpp_config()
        self.send_and_expect(self.pg0, pkts_src, self.pg2)

        #
        # loop in the lookup DP
        #
        route_loop = VppIpRoute(self, "3::3", 128,
                                [VppRoutePath("::",
                                              0xffffffff)])
        route_loop.add_vpp_config()

        looped = tcp_pkt("3::4", "3::3")

        self.send_and_assert_no_replies(self.pg0, looped * 257,
                                        "IP lookup loop")
2274
2275
class TestIP6Input(VppTestCase):
    """ IPv6 Input Exception Test Cases """

    @classmethod
    def setUpClass(cls):
        super(TestIP6Input, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Input, cls).tearDownClass()

    def setUp(self):
        super(TestIP6Input, self).setUp()

        # pg0 is where packets are injected; pg1 provides the default
        # destination address
        self.create_pg_interfaces(range(2))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        super(TestIP6Input, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip_input_icmp_reply(self):
        """ IP6 Input Exception - Return ICMP (3,0) """
        #
        # hop limit - ICMP replies
        #
        hlim_pkt = (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IPv6(src=self.pg0.remote_ip6,
                         dst=self.pg1.remote_ip6,
                         hlim=1) /
                    inet6.UDP(sport=1234, dport=1234) /
                    Raw(b'\xa5' * 100))

        replies = self.send_and_expect(self.pg0, hlim_pkt * NUM_PKTS,
                                       self.pg0)
        icmp = replies[0][ICMPv6TimeExceeded]

        # 0: "hop limit exceeded in transit",
        self.assertEqual((icmp.type, icmp.code), (3, 0))

    # building blocks for the parameter table below
    icmpv6_data = '\x0a' * 18
    all_0s = "::"
    all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"

    @parameterized.expand([
        # Name, src, dst, l4proto, msg, timeout
        # (a src/dst of None selects the interface-derived address)
        ("src='iface',   dst='iface'", None, None,
         inet6.UDP(sport=1234, dport=1234), "funky version", None),
        ("src='All 0's', dst='iface'", all_0s, None,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
        ("src='iface',   dst='All 0's'", None, all_0s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
        ("src='All 1's', dst='iface'", all_1s, None,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
        ("src='iface',   dst='All 1's'", None, all_1s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
        ("src='All 1's', dst='All 1's'", all_1s, all_1s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),

    ])
    def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):

        self._testMethodDoc = 'IPv6 Input Exception - %s' % name

        # fall back to the interface-derived addresses when not given
        src_addr = src or self.pg0.remote_ip6
        dst_addr = dst or self.pg1.remote_ip6

        # every packet carries version=3 in the IPv6 header
        bad_version = (Ether(src=self.pg0.remote_mac,
                             dst=self.pg0.local_mac) /
                       IPv6(src=src_addr,
                            dst=dst_addr,
                            version=3) /
                       l4 /
                       Raw(b'\xa5' * 100))

        self.send_and_assert_no_replies(self.pg0, bad_version * NUM_PKTS,
                                        remark=msg or "",
                                        timeout=timeout)

    def test_hop_by_hop(self):
        """ Hop-by-hop header test """

        hbh_pkt = (Ether(src=self.pg0.remote_mac,
                         dst=self.pg0.local_mac) /
                   IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
                   IPv6ExtHdrHopByHop() /
                   inet6.UDP(sport=1234, dport=1234) /
                   Raw(b'\xa5' * 100))

        # NOTE(review): the packet is only sent; nothing is captured or
        # asserted here - presumably the framework's own checks are what
        # this test relies on, confirm
        self.pg0.add_stream(hbh_pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
2372
# run this module's tests with the VPP test runner when executed directly
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)