tests: "force solo" testcase support
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python3
2
3 import socket
4 from socket import inet_pton, inet_ntop
5 import unittest
6
7 from parameterized import parameterized
8 import scapy.compat
9 import scapy.layers.inet6 as inet6
10 from scapy.contrib.mpls import MPLS
11 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
12     ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
13     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
14     ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, \
15     IPv6ExtHdrHopByHop, ICMPv6MLReport2, ICMPv6MLDMultAddrRec
16 from scapy.layers.l2 import Ether, Dot1Q
17 from scapy.packet import Raw
18 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
19     in6_mactoifaceid
20 from six import moves
21
22 from framework import VppTestCase, VppTestRunner
23 from util import ppp, ip6_normalize, mk_ll_addr
24 from vpp_ip import DpoProto
25 from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
26     VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
27     VppMplsRoute, VppMplsTable, VppIpTable, FibPathType, FibPathProto, \
28     VppIpInterfaceAddress, find_route_in_dump, find_mroute_in_dump, \
29     VppIp6LinkLocalAddress
30 from vpp_neighbor import find_nbr, VppNeighbor
31 from vpp_pg_interface import is_ipv6_misc
32 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
33 from vpp_policer import VppPolicer
34 from ipaddress import IPv6Network, IPv6Address
35
# Shorthand for the IPv6 address family passed to inet_pton()/inet_ntop().
AF_INET6 = socket.AF_INET6

# Python 2/3 compatibility shim: use ``unicode`` where that builtin exists,
# otherwise fall back to ``str``.
try:
    text_type = unicode
except NameError:
    text_type = str

# NOTE(review): presumably a default packet count for send/expect style
# tests; it is not referenced in the part of the file visible here - confirm.
NUM_PKTS = 67
44
45
class TestIPv6ND(VppTestCase):
    """Helpers shared by IPv6 Neighbor Discovery tests.

    Provides validators for received RA/NA/NS packets and send-and-expect
    wrappers that push a stream into a pg interface and check that exactly
    one packet is captured and that it validates.
    """

    def validate_ra(self, intf, rx, dst_ip=None):
        """Assert that ``rx`` is a Router Advertisement sent out of ``intf``.

        :param intf: interface the RA was captured on; supplies the
            expected MAC addresses and the default destination address.
        :param rx: captured packet to check.
        :param dst_ip: expected IPv6 destination; defaults to
            ``intf.remote_ip6`` when not given.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd RA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_RA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the router's link local
        self.assertTrue(in6_islladdr(rx[IPv6].src))
        self.assertEqual(in6_ptop(rx[IPv6].src),
                         in6_ptop(mk_ll_addr(intf.local_mac)))

    def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
        """Assert that ``rx`` is a Neighbor Advertisement sent out of ``intf``.

        :param intf: interface the NA was captured on.
        :param rx: captured packet to check.
        :param dst_ip: expected IPv6 destination; defaults to
            ``intf.remote_ip6``.
        :param tgt_ip: address the NA is expected to be sourced from;
            defaults to ``intf.local_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6
        if not tgt_ip:
            # Default the *target* (the original code overwrote dst_ip here,
            # leaving tgt_ip as None and clobbering the expected destination).
            tgt_ip = intf.local_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_NA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the target address
        self.assertEqual(
            in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))

        # Dest link-layer options should have the router's MAC
        dll = rx[ICMPv6NDOptDstLLAddr]
        self.assertEqual(dll.lladdr, intf.local_mac)

    def validate_ns(self, intf, rx, tgt_ip):
        """Assert that ``rx`` is a Neighbor Solicitation for ``tgt_ip``.

        :param intf: interface the NS was captured on.
        :param rx: captured packet to check.
        :param tgt_ip: IPv6 address being solicited (textual form).
        """
        # solicited-node multicast address derived from the target
        nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
        dst_ip = inet_ntop(AF_INET6, nsma)

        # NS is sent to the MAC derived from the solicited-node address
        self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NS should be addressed to an mcast address
        # derived from the target address
        self.assertEqual(
            in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # expect the tgt IP in the NS header
        ns = rx[ICMPv6ND_NS]
        self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))

        # packet is from the router's local address; normalise both sides
        # so the textual form of the configured address does not matter
        self.assertEqual(
            in6_ptop(rx[IPv6].src), in6_ptop(intf.local_ip6))

        # Src link-layer options should have the router's MAC
        sll = rx[ICMPv6NDOptSrcLLAddr]
        self.assertEqual(sll.lladdr, intf.local_mac)

    def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``intf`` and expect exactly one valid RA back.

        :param intf: interface to transmit on and capture from.
        :param pkts: packet(s) to add to the interface's stream.
        :param remark: free-form description of the scenario; not used by
            this helper.
        :param dst_ip: expected RA destination, see :meth:`validate_ra`.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ra(intf, rx, dst_ip)

    def send_and_expect_na(self, intf, pkts, remark, dst_ip=None,
                           tgt_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``intf`` and expect exactly one valid NA back.

        :param intf: interface to transmit on and capture from.
        :param pkts: packet(s) to add to the interface's stream.
        :param remark: free-form description of the scenario; not used by
            this helper.
        :param dst_ip: expected NA destination, see :meth:`validate_na`.
        :param tgt_ip: expected NA source, see :meth:`validate_na`.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_na(intf, rx, dst_ip, tgt_ip)

    def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``tx_intf`` and expect one NS on ``rx_intf``.

        :param tx_intf: interface to transmit on.
        :param rx_intf: interface the NS is expected to be captured on.
        :param pkts: packet(s) to add to ``tx_intf``'s stream.
        :param tgt_ip: address the NS is expected to solicit.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        self.vapi.cli("clear trace")
        tx_intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ns(rx_intf, rx, tgt_ip)

    def verify_ip(self, rx, smac, dmac, sip, dip):
        """Assert the Ethernet and IPv6 addresses of ``rx``.

        :param rx: captured packet to check.
        :param smac: expected Ethernet source.
        :param dmac: expected Ethernet destination.
        :param sip: expected IPv6 source.
        :param dip: expected IPv6 destination.
        """
        ether = rx[Ether]
        self.assertEqual(ether.dst, dmac)
        self.assertEqual(ether.src, smac)

        ip = rx[IPv6]
        self.assertEqual(ip.src, sip)
        self.assertEqual(ip.dst, dip)
162
163
164 class TestIPv6(TestIPv6ND):
165     """ IPv6 Test Case """
166
167     @classmethod
168     def force_solo(cls):
169         return True
170
171     @classmethod
172     def setUpClass(cls):
173         super(TestIPv6, cls).setUpClass()
174
175     @classmethod
176     def tearDownClass(cls):
177         super(TestIPv6, cls).tearDownClass()
178
179     def setUp(self):
180         """
181         Perform test setup before test case.
182
183         **Config:**
184             - create 3 pg interfaces
185                 - untagged pg0 interface
186                 - Dot1Q subinterface on pg1
187                 - Dot1AD subinterface on pg2
188             - setup interfaces:
189                 - put it into UP state
190                 - set IPv6 addresses
191                 - resolve neighbor address using NDP
192             - configure 200 fib entries
193
194         :ivar list interfaces: pg interfaces and subinterfaces.
195         :ivar dict flows: IPv4 packet flows in test.
196
197         *TODO:* Create AD sub interface
198         """
199         super(TestIPv6, self).setUp()
200
201         # create 3 pg interfaces
202         self.create_pg_interfaces(range(3))
203
204         # create 2 subinterfaces for p1 and pg2
205         self.sub_interfaces = [
206             VppDot1QSubint(self, self.pg1, 100),
207             VppDot1QSubint(self, self.pg2, 200)
208             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
209         ]
210
211         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
212         self.flows = dict()
213         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
214         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
215         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
216
217         # packet sizes
218         self.pg_if_packet_sizes = [64, 1500, 9020]
219
220         self.interfaces = list(self.pg_interfaces)
221         self.interfaces.extend(self.sub_interfaces)
222
223         # setup all interfaces
224         for i in self.interfaces:
225             i.admin_up()
226             i.config_ip6()
227             i.resolve_ndp()
228
229     def tearDown(self):
230         """Run standard test teardown and log ``show ip6 neighbors``."""
231         for i in self.interfaces:
232             i.unconfig_ip6()
233             i.admin_down()
234         for i in self.sub_interfaces:
235             i.remove_vpp_config()
236
237         super(TestIPv6, self).tearDown()
238         if not self.vpp_dead:
239             self.logger.info(self.vapi.cli("show ip6 neighbors"))
240             # info(self.vapi.cli("show ip6 fib"))  # many entries
241
242     def modify_packet(self, src_if, packet_size, pkt):
243         """Add load, set destination IP and extend packet to required packet
244         size for defined interface.
245
246         :param VppInterface src_if: Interface to create packet for.
247         :param int packet_size: Required packet size.
248         :param Scapy pkt: Packet to be modified.
249         """
250         dst_if_idx = int(packet_size / 10 % 2)
251         dst_if = self.flows[src_if][dst_if_idx]
252         info = self.create_packet_info(src_if, dst_if)
253         payload = self.info_to_payload(info)
254         p = pkt / Raw(payload)
255         p[IPv6].dst = dst_if.remote_ip6
256         info.data = p.copy()
257         if isinstance(src_if, VppSubInterface):
258             p = src_if.add_dot1_layer(p)
259         self.extend_packet(p, packet_size)
260
261         return p
262
263     def create_stream(self, src_if):
264         """Create input packet stream for defined interface.
265
266         :param VppInterface src_if: Interface to create packet stream for.
267         """
268         hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
269         pkt_tmpl = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
270                     IPv6(src=src_if.remote_ip6) /
271                     inet6.UDP(sport=1234, dport=1234))
272
273         pkts = [self.modify_packet(src_if, i, pkt_tmpl)
274                 for i in moves.range(self.pg_if_packet_sizes[0],
275                                      self.pg_if_packet_sizes[1], 10)]
276         pkts_b = [self.modify_packet(src_if, i, pkt_tmpl)
277                   for i in moves.range(self.pg_if_packet_sizes[1] + hdr_ext,
278                                        self.pg_if_packet_sizes[2] + hdr_ext,
279                                        50)]
280         pkts.extend(pkts_b)
281
282         return pkts
283
284     def verify_capture(self, dst_if, capture):
285         """Verify captured input packet stream for defined interface.
286
287         :param VppInterface dst_if: Interface to verify captured packet stream
288                                     for.
289         :param list capture: Captured packet stream.
290         """
291         self.logger.info("Verifying capture on interface %s" % dst_if.name)
292         last_info = dict()
293         for i in self.interfaces:
294             last_info[i.sw_if_index] = None
295         is_sub_if = False
296         dst_sw_if_index = dst_if.sw_if_index
297         if hasattr(dst_if, 'parent'):
298             is_sub_if = True
299         for packet in capture:
300             if is_sub_if:
301                 # Check VLAN tags and Ethernet header
302                 packet = dst_if.remove_dot1_layer(packet)
303             self.assertTrue(Dot1Q not in packet)
304             try:
305                 ip = packet[IPv6]
306                 udp = packet[inet6.UDP]
307                 payload_info = self.payload_to_info(packet[Raw])
308                 packet_index = payload_info.index
309                 self.assertEqual(payload_info.dst, dst_sw_if_index)
310                 self.logger.debug(
311                     "Got packet on port %s: src=%u (id=%u)" %
312                     (dst_if.name, payload_info.src, packet_index))
313                 next_info = self.get_next_packet_info_for_interface2(
314                     payload_info.src, dst_sw_if_index,
315                     last_info[payload_info.src])
316                 last_info[payload_info.src] = next_info
317                 self.assertTrue(next_info is not None)
318                 self.assertEqual(packet_index, next_info.index)
319                 saved_packet = next_info.data
320                 # Check standard fields
321                 self.assertEqual(
322                     ip.src, saved_packet[IPv6].src)
323                 self.assertEqual(
324                     ip.dst, saved_packet[IPv6].dst)
325                 self.assertEqual(
326                     udp.sport, saved_packet[inet6.UDP].sport)
327                 self.assertEqual(
328                     udp.dport, saved_packet[inet6.UDP].dport)
329             except:
330                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
331                 raise
332         for i in self.interfaces:
333             remaining_packet = self.get_next_packet_info_for_interface2(
334                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
335             self.assertTrue(remaining_packet is None,
336                             "Interface %s: Packet expected from interface %s "
337                             "didn't arrive" % (dst_if.name, i.name))
338
339     def test_next_header_anomaly(self):
340         """ IPv6 next header anomaly test
341
342         Test scenario:
343             - ipv6 next header field = Fragment Header (44)
344             - next header is ICMPv6 Echo Request
345             - wait for reassembly
346         """
347         pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
348                IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44) /
349                ICMPv6EchoRequest())
350
351         self.pg0.add_stream(pkt)
352         self.pg_start()
353
354         # wait for reassembly
355         self.sleep(10)
356
357     def test_fib(self):
358         """ IPv6 FIB test
359
360         Test scenario:
361             - Create IPv6 stream for pg0 interface
362             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
363             - Send and verify received packets on each interface.
364         """
365
366         pkts = self.create_stream(self.pg0)
367         self.pg0.add_stream(pkts)
368
369         for i in self.sub_interfaces:
370             pkts = self.create_stream(i)
371             i.parent.add_stream(pkts)
372
373         self.pg_enable_capture(self.pg_interfaces)
374         self.pg_start()
375
376         pkts = self.pg0.get_capture()
377         self.verify_capture(self.pg0, pkts)
378
379         for i in self.sub_interfaces:
380             pkts = i.parent.get_capture()
381             self.verify_capture(i, pkts)
382
383     def test_ns(self):
384         """ IPv6 Neighbour Solicitation Exceptions
385
386         Test scenario:
387            - Send an NS Sourced from an address not covered by the link sub-net
388            - Send an NS to an mcast address the router has not joined
389            - Send NS for a target address the router does not onn.
390         """
391
392         #
393         # An NS from a non link source address
394         #
395         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
396         d = inet_ntop(AF_INET6, nsma)
397
398         p = (Ether(dst=in6_getnsmac(nsma)) /
399              IPv6(dst=d, src="2002::2") /
400              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
401              ICMPv6NDOptSrcLLAddr(
402                  lladdr=self.pg0.remote_mac))
403         pkts = [p]
404
405         self.send_and_assert_no_replies(
406             self.pg0, pkts,
407             "No response to NS source by address not on sub-net")
408
409         #
410         # An NS for sent to a solicited mcast group the router is
411         # not a member of FAILS
412         #
413         if 0:
414             nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
415             d = inet_ntop(AF_INET6, nsma)
416
417             p = (Ether(dst=in6_getnsmac(nsma)) /
418                  IPv6(dst=d, src=self.pg0.remote_ip6) /
419                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
420                  ICMPv6NDOptSrcLLAddr(
421                      lladdr=self.pg0.remote_mac))
422             pkts = [p]
423
424             self.send_and_assert_no_replies(
425                 self.pg0, pkts,
426                 "No response to NS sent to unjoined mcast address")
427
428         #
429         # An NS whose target address is one the router does not own
430         #
431         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
432         d = inet_ntop(AF_INET6, nsma)
433
434         p = (Ether(dst=in6_getnsmac(nsma)) /
435              IPv6(dst=d, src=self.pg0.remote_ip6) /
436              ICMPv6ND_NS(tgt="fd::ffff") /
437              ICMPv6NDOptSrcLLAddr(
438                  lladdr=self.pg0.remote_mac))
439         pkts = [p]
440
441         self.send_and_assert_no_replies(self.pg0, pkts,
442                                         "No response to NS for unknown target")
443
444         #
445         # A neighbor entry that has no associated FIB-entry
446         #
447         self.pg0.generate_remote_hosts(4)
448         nd_entry = VppNeighbor(self,
449                                self.pg0.sw_if_index,
450                                self.pg0.remote_hosts[2].mac,
451                                self.pg0.remote_hosts[2].ip6,
452                                is_no_fib_entry=1)
453         nd_entry.add_vpp_config()
454
455         #
456         # check we have the neighbor, but no route
457         #
458         self.assertTrue(find_nbr(self,
459                                  self.pg0.sw_if_index,
460                                  self.pg0._remote_hosts[2].ip6))
461         self.assertFalse(find_route(self,
462                                     self.pg0._remote_hosts[2].ip6,
463                                     128))
464
465         #
466         # send an NS from a link local address to the interface's global
467         # address
468         #
469         p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
470              IPv6(
471                  dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
472              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
473              ICMPv6NDOptSrcLLAddr(
474                  lladdr=self.pg0.remote_mac))
475
476         self.send_and_expect_na(self.pg0, p,
477                                 "NS from link-local",
478                                 dst_ip=self.pg0._remote_hosts[2].ip6_ll,
479                                 tgt_ip=self.pg0.local_ip6)
480
481         #
482         # we should have learned an ND entry for the peer's link-local
483         # but not inserted a route to it in the FIB
484         #
485         self.assertTrue(find_nbr(self,
486                                  self.pg0.sw_if_index,
487                                  self.pg0._remote_hosts[2].ip6_ll))
488         self.assertFalse(find_route(self,
489                                     self.pg0._remote_hosts[2].ip6_ll,
490                                     128))
491
492         #
493         # An NS to the router's own Link-local
494         #
495         p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
496              IPv6(
497                  dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
498              ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll) /
499              ICMPv6NDOptSrcLLAddr(
500                  lladdr=self.pg0.remote_mac))
501
502         self.send_and_expect_na(self.pg0, p,
503                                 "NS to/from link-local",
504                                 dst_ip=self.pg0._remote_hosts[3].ip6_ll,
505                                 tgt_ip=self.pg0.local_ip6_ll)
506
507         #
508         # we should have learned an ND entry for the peer's link-local
509         # but not inserted a route to it in the FIB
510         #
511         self.assertTrue(find_nbr(self,
512                                  self.pg0.sw_if_index,
513                                  self.pg0._remote_hosts[3].ip6_ll))
514         self.assertFalse(find_route(self,
515                                     self.pg0._remote_hosts[3].ip6_ll,
516                                     128))
517
518     def test_ns_duplicates(self):
519         """ ND Duplicates"""
520
521         #
522         # Generate some hosts on the LAN
523         #
524         self.pg1.generate_remote_hosts(3)
525
526         #
527         # Add host 1 on pg1 and pg2
528         #
529         ns_pg1 = VppNeighbor(self,
530                              self.pg1.sw_if_index,
531                              self.pg1.remote_hosts[1].mac,
532                              self.pg1.remote_hosts[1].ip6)
533         ns_pg1.add_vpp_config()
534         ns_pg2 = VppNeighbor(self,
535                              self.pg2.sw_if_index,
536                              self.pg2.remote_mac,
537                              self.pg1.remote_hosts[1].ip6)
538         ns_pg2.add_vpp_config()
539
540         #
541         # IP packet destined for pg1 remote host arrives on pg1 again.
542         #
543         p = (Ether(dst=self.pg0.local_mac,
544                    src=self.pg0.remote_mac) /
545              IPv6(src=self.pg0.remote_ip6,
546                   dst=self.pg1.remote_hosts[1].ip6) /
547              inet6.UDP(sport=1234, dport=1234) /
548              Raw())
549
550         self.pg0.add_stream(p)
551         self.pg_enable_capture(self.pg_interfaces)
552         self.pg_start()
553
554         rx1 = self.pg1.get_capture(1)
555
556         self.verify_ip(rx1[0],
557                        self.pg1.local_mac,
558                        self.pg1.remote_hosts[1].mac,
559                        self.pg0.remote_ip6,
560                        self.pg1.remote_hosts[1].ip6)
561
562         #
563         # remove the duplicate on pg1
564         # packet stream should generate NSs out of pg1
565         #
566         ns_pg1.remove_vpp_config()
567
568         self.send_and_expect_ns(self.pg0, self.pg1,
569                                 p, self.pg1.remote_hosts[1].ip6)
570
571         #
572         # Add it back
573         #
574         ns_pg1.add_vpp_config()
575
576         self.pg0.add_stream(p)
577         self.pg_enable_capture(self.pg_interfaces)
578         self.pg_start()
579
580         rx1 = self.pg1.get_capture(1)
581
582         self.verify_ip(rx1[0],
583                        self.pg1.local_mac,
584                        self.pg1.remote_hosts[1].mac,
585                        self.pg0.remote_ip6,
586                        self.pg1.remote_hosts[1].ip6)
587
588     def validate_ra(self, intf, rx, dst_ip=None, src_ip=None,
589                     mtu=9000, pi_opt=None):
590         if not dst_ip:
591             dst_ip = intf.remote_ip6
592         if not src_ip:
593             src_ip = mk_ll_addr(intf.local_mac)
594
595         # unicasted packets must come to the unicast mac
596         self.assertEqual(rx[Ether].dst, intf.remote_mac)
597
598         # and from the router's MAC
599         self.assertEqual(rx[Ether].src, intf.local_mac)
600
601         # the rx'd RA should be addressed to the sender's source
602         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
603         self.assertEqual(in6_ptop(rx[IPv6].dst),
604                          in6_ptop(dst_ip))
605
606         # and come from the router's link local
607         self.assertTrue(in6_islladdr(rx[IPv6].src))
608         self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(src_ip))
609
610         # it should contain the links MTU
611         ra = rx[ICMPv6ND_RA]
612         self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
613
614         # it should contain the source's link layer address option
615         sll = ra[ICMPv6NDOptSrcLLAddr]
616         self.assertEqual(sll.lladdr, intf.local_mac)
617
618         if not pi_opt:
619             # the RA should not contain prefix information
620             self.assertFalse(ra.haslayer(
621                 ICMPv6NDOptPrefixInfo))
622         else:
623             raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
624
625             # the options are nested in the scapy packet in way that i cannot
626             # decipher how to decode. this 1st layer of option always returns
627             # nested classes, so a direct obj1=obj2 comparison always fails.
628             # however, the getlayer(.., 2) does give one instance.
629             # so we cheat here and construct a new opt instance for comparison
630             rd = ICMPv6NDOptPrefixInfo(
631                 prefixlen=raos.prefixlen,
632                 prefix=raos.prefix,
633                 L=raos.L,
634                 A=raos.A)
635             if type(pi_opt) is list:
636                 for ii in range(len(pi_opt)):
637                     self.assertEqual(pi_opt[ii], rd)
638                     rd = rx.getlayer(
639                         ICMPv6NDOptPrefixInfo, ii + 2)
640             else:
641                 self.assertEqual(pi_opt, raos, 'Expected: %s, received: %s'
642                                  % (pi_opt.show(dump=True),
643                                     raos.show(dump=True)))
644
645     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
646                            filter_out_fn=is_ipv6_misc,
647                            opt=None,
648                            src_ip=None):
649         self.vapi.cli("clear trace")
650         intf.add_stream(pkts)
651         self.pg_enable_capture(self.pg_interfaces)
652         self.pg_start()
653         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
654
655         self.assertEqual(len(rx), 1)
656         rx = rx[0]
657         self.validate_ra(intf, rx, dst_ip, src_ip=src_ip, pi_opt=opt)
658
659     def test_rs(self):
660         """ IPv6 Router Solicitation Exceptions
661
662         Test scenario:
663         """
664
665         #
666         # Before we begin change the IPv6 RA responses to use the unicast
667         # address - that way we will not confuse them with the periodic
668         # RAs which go to the mcast address
669         # Sit and wait for the first periodic RA.
670         #
671         # TODO
672         #
673         self.pg0.ip6_ra_config(send_unicast=1)
674
675         #
676         # An RS from a link source address
677         #  - expect an RA in return
678         #
679         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
680              IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
681              ICMPv6ND_RS())
682         pkts = [p]
683         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
684
685         #
686         # For the next RS sent the RA should be rate limited
687         #
688         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
689
690         #
691         # When we reconfigure the IPv6 RA config,
692         # we reset the RA rate limiting,
693         # so we need to do this before each test below so as not to drop
694         # packets for rate limiting reasons. Test this works here.
695         #
696         self.pg0.ip6_ra_config(send_unicast=1)
697         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
698
699         #
700         # An RS sent from a non-link local source
701         #
702         self.pg0.ip6_ra_config(send_unicast=1)
703         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
704              IPv6(dst=self.pg0.local_ip6,
705                   src="2002::ffff") /
706              ICMPv6ND_RS())
707         pkts = [p]
708         self.send_and_assert_no_replies(self.pg0, pkts,
709                                         "RS from non-link source")
710
711         #
712         # Source an RS from a link local address
713         #
714         self.pg0.ip6_ra_config(send_unicast=1)
715         ll = mk_ll_addr(self.pg0.remote_mac)
716         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
717              IPv6(dst=self.pg0.local_ip6, src=ll) /
718              ICMPv6ND_RS())
719         pkts = [p]
720         self.send_and_expect_ra(self.pg0, pkts,
721                                 "RS sourced from link-local",
722                                 dst_ip=ll)
723
724         #
725         # Send the RS multicast
726         #
727         self.pg0.ip6_ra_config(send_unicast=1)
728         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
729         ll = mk_ll_addr(self.pg0.remote_mac)
730         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
731              IPv6(dst="ff02::2", src=ll) /
732              ICMPv6ND_RS())
733         pkts = [p]
734         self.send_and_expect_ra(self.pg0, pkts,
735                                 "RS sourced from link-local",
736                                 dst_ip=ll)
737
738         #
739         # Source from the unspecified address ::. This happens when the RS
740         # is sent before the host has a configured address/sub-net,
741         # i.e. auto-config. Since the sender has no IP address, the reply
742         # comes back mcast - so the capture needs to not filter this.
743         # If we happen to pick up the periodic RA at this point then so be it,
744         # it's not an error.
745         #
746         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
747         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
748              IPv6(dst="ff02::2", src="::") /
749              ICMPv6ND_RS())
750         pkts = [p]
751         self.send_and_expect_ra(self.pg0, pkts,
752                                 "RS sourced from unspecified",
753                                 dst_ip="ff02::1",
754                                 filter_out_fn=None)
755
756         #
757         # Configure The RA to announce the links prefix
758         #
759         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
760                                self.pg0.local_ip6_prefix_len))
761
762         #
763         # RAs should now contain the prefix information option
764         #
765         opt = ICMPv6NDOptPrefixInfo(
766             prefixlen=self.pg0.local_ip6_prefix_len,
767             prefix=self.pg0.local_ip6,
768             L=1,
769             A=1)
770
771         self.pg0.ip6_ra_config(send_unicast=1)
772         ll = mk_ll_addr(self.pg0.remote_mac)
773         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
774              IPv6(dst=self.pg0.local_ip6, src=ll) /
775              ICMPv6ND_RS())
776         self.send_and_expect_ra(self.pg0, p,
777                                 "RA with prefix-info",
778                                 dst_ip=ll,
779                                 opt=opt)
780
781         #
782         # Change the prefix info to not off-link
783         #  L-flag is clear
784         #
785         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
786                                self.pg0.local_ip6_prefix_len),
787                                off_link=1)
788
789         opt = ICMPv6NDOptPrefixInfo(
790             prefixlen=self.pg0.local_ip6_prefix_len,
791             prefix=self.pg0.local_ip6,
792             L=0,
793             A=1)
794
795         self.pg0.ip6_ra_config(send_unicast=1)
796         self.send_and_expect_ra(self.pg0, p,
797                                 "RA with Prefix info with L-flag=0",
798                                 dst_ip=ll,
799                                 opt=opt)
800
801         #
802         # Change the prefix info to not off-link, no-autoconfig
803         #  L and A flag are clear in the advert
804         #
805         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
806                                self.pg0.local_ip6_prefix_len),
807                                off_link=1,
808                                no_autoconfig=1)
809
810         opt = ICMPv6NDOptPrefixInfo(
811             prefixlen=self.pg0.local_ip6_prefix_len,
812             prefix=self.pg0.local_ip6,
813             L=0,
814             A=0)
815
816         self.pg0.ip6_ra_config(send_unicast=1)
817         self.send_and_expect_ra(self.pg0, p,
818                                 "RA with Prefix info with A & L-flag=0",
819                                 dst_ip=ll,
820                                 opt=opt)
821
822         #
823         # Change the flag settings back to the defaults
824         #  L and A flag are set in the advert
825         #
826         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
827                                self.pg0.local_ip6_prefix_len))
828
829         opt = ICMPv6NDOptPrefixInfo(
830             prefixlen=self.pg0.local_ip6_prefix_len,
831             prefix=self.pg0.local_ip6,
832             L=1,
833             A=1)
834
835         self.pg0.ip6_ra_config(send_unicast=1)
836         self.send_and_expect_ra(self.pg0, p,
837                                 "RA with Prefix info",
838                                 dst_ip=ll,
839                                 opt=opt)
840
841         #
842         # Change the prefix info to not off-link, no-autoconfig
843         #  L and A flag are clear in the advert
844         #
845         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
846                                self.pg0.local_ip6_prefix_len),
847                                off_link=1,
848                                no_autoconfig=1)
849
850         opt = ICMPv6NDOptPrefixInfo(
851             prefixlen=self.pg0.local_ip6_prefix_len,
852             prefix=self.pg0.local_ip6,
853             L=0,
854             A=0)
855
856         self.pg0.ip6_ra_config(send_unicast=1)
857         self.send_and_expect_ra(self.pg0, p,
858                                 "RA with Prefix info with A & L-flag=0",
859                                 dst_ip=ll,
860                                 opt=opt)
861
862         #
863         # Use the reset to defaults option to revert to defaults
        #  L and A flag are set in the advert
865         #
866         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
867                                self.pg0.local_ip6_prefix_len),
868                                use_default=1)
869
870         opt = ICMPv6NDOptPrefixInfo(
871             prefixlen=self.pg0.local_ip6_prefix_len,
872             prefix=self.pg0.local_ip6,
873             L=1,
874             A=1)
875
876         self.pg0.ip6_ra_config(send_unicast=1)
877         self.send_and_expect_ra(self.pg0, p,
878                                 "RA with Prefix reverted to defaults",
879                                 dst_ip=ll,
880                                 opt=opt)
881
882         #
883         # Advertise Another prefix. With no L-flag/A-flag
884         #
885         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
886                                self.pg1.local_ip6_prefix_len),
887                                off_link=1,
888                                no_autoconfig=1)
889
890         opt = [ICMPv6NDOptPrefixInfo(
891             prefixlen=self.pg0.local_ip6_prefix_len,
892             prefix=self.pg0.local_ip6,
893             L=1,
894             A=1),
895             ICMPv6NDOptPrefixInfo(
896                 prefixlen=self.pg1.local_ip6_prefix_len,
897                 prefix=self.pg1.local_ip6,
898                 L=0,
899                 A=0)]
900
901         self.pg0.ip6_ra_config(send_unicast=1)
902         ll = mk_ll_addr(self.pg0.remote_mac)
903         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
904              IPv6(dst=self.pg0.local_ip6, src=ll) /
905              ICMPv6ND_RS())
906         self.send_and_expect_ra(self.pg0, p,
907                                 "RA with multiple Prefix infos",
908                                 dst_ip=ll,
909                                 opt=opt)
910
911         #
912         # Remove the first prefix-info - expect the second is still in the
913         # advert
914         #
915         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
916                                self.pg0.local_ip6_prefix_len),
917                                is_no=1)
918
919         opt = ICMPv6NDOptPrefixInfo(
920             prefixlen=self.pg1.local_ip6_prefix_len,
921             prefix=self.pg1.local_ip6,
922             L=0,
923             A=0)
924
925         self.pg0.ip6_ra_config(send_unicast=1)
926         self.send_and_expect_ra(self.pg0, p,
927                                 "RA with Prefix reverted to defaults",
928                                 dst_ip=ll,
929                                 opt=opt)
930
931         #
932         # Remove the second prefix-info - expect no prefix-info in the adverts
933         #
934         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
935                                self.pg1.local_ip6_prefix_len),
936                                is_no=1)
937
938         #
939         # change the link's link local, so we know that works too.
940         #
941         self.vapi.sw_interface_ip6_set_link_local_address(
942             sw_if_index=self.pg0.sw_if_index,
943             ip="fe80::88")
944
945         self.pg0.ip6_ra_config(send_unicast=1)
946         self.send_and_expect_ra(self.pg0, p,
947                                 "RA with Prefix reverted to defaults",
948                                 dst_ip=ll,
949                                 src_ip="fe80::88")
950
951         #
952         # Reset the periodic advertisements back to default values
953         #
954         self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
955
956     def test_mld(self):
957         """ MLD Report """
958         #
959         # test one MLD is sent after applying an IPv6 Address on an interface
960         #
961         self.pg_enable_capture(self.pg_interfaces)
962         self.pg_start()
963
964         subitf = VppDot1QSubint(self, self.pg1, 99)
965
966         subitf.admin_up()
967         subitf.config_ip6()
968
969         rxs = self.pg1._get_capture(timeout=4, filter_out_fn=None)
970
971         #
972         # hunt for the MLD on vlan 99
973         #
974         for rx in rxs:
975             # make sure ipv6 packets with hop by hop options have
976             # correct checksums
977             self.assert_packet_checksums_valid(rx)
978             if rx.haslayer(IPv6ExtHdrHopByHop) and \
979                rx.haslayer(Dot1Q) and \
980                rx[Dot1Q].vlan == 99:
981                 mld = rx[ICMPv6MLReport2]
982
983         self.assertEqual(mld.records_number, 4)
984
985
class TestIPv6RouteLookup(VppTestCase):
    """ IPv6 Route Lookup Test Case """
    # Class-level default kept so the attribute always exists; setUp()
    # rebinds it to a fresh per-test list so that routes added for one
    # test are never carried over (and removed again) in the next one.
    routes = []

    def route_lookup(self, prefix, exact):
        """ Look up a prefix in table 0 using the ip_route_lookup API

        :param prefix: prefix to look up, e.g. "2001:1111:2222::/48"
        :param exact: passed through as the API's 'exact' field; the tests
                      below use True for exact-match lookups and False for
                      longest-prefix-match lookups
        :returns: the API reply (the tests inspect its route.prefix)
        """
        return self.vapi.api(self.vapi.papi.ip_route_lookup,
                             {
                                 'table_id': 0,
                                 'exact': exact,
                                 'prefix': prefix,
                             })

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RouteLookup, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RouteLookup, cls).tearDownClass()

    def setUp(self):
        """ Install the three nested drop routes every test looks up """
        super(TestIPv6RouteLookup, self).setUp()

        # per-test list: shadows the shared class attribute so tearDown
        # only ever sees the routes added by this setUp
        self.routes = []

        drop_nh = VppRoutePath("::1", 0xffffffff,
                               type=FibPathType.FIB_PATH_TYPE_DROP)

        # Add 3 routes: a /32, a /48 inside it and a /128 inside that
        for address, length in (("2001:1111::", 32),
                                ("2001:1111:2222::", 48),
                                ("2001:1111:2222::1", 128)):
            r = VppIpRoute(self, address, length, [drop_nh])
            r.add_vpp_config()
            self.routes.append(r)

    def tearDown(self):
        """ Remove the routes added by setUp """
        # Remove the routes we added
        for r in self.routes:
            r.remove_vpp_config()

        super(TestIPv6RouteLookup, self).tearDown()

    def test_exact_match(self):
        """ IPv6 route lookup - exact match """
        # Verify we find the host route
        prefix = "2001:1111:2222::1/128"
        result = self.route_lookup(prefix, True)
        self.assertEqual(prefix, str(result.route.prefix))

        # Verify we find a middle prefix route
        prefix = "2001:1111:2222::/48"
        result = self.route_lookup(prefix, True)
        self.assertEqual(prefix, str(result.route.prefix))

        # Verify we do not find an available LPM.
        with self.vapi.assert_negative_api_retval():
            self.route_lookup("2001::2/128", True)

    def test_longest_prefix_match(self):
        """ IPv6 route lookup - longest prefix match """
        # verify we find lpm
        lpm_prefix = "2001:1111:2222::/48"
        result = self.route_lookup("2001:1111:2222::2/128", False)
        self.assertEqual(lpm_prefix, str(result.route.prefix))

        # Verify we find the exact when not requested
        result = self.route_lookup(lpm_prefix, False)
        self.assertEqual(lpm_prefix, str(result.route.prefix))

        # Can't seem to delete the default route so no negative LPM test.
1058
1059
class TestIPv6IfAddrRoute(VppTestCase):
    """ IPv6 Interface Addr Route Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6IfAddrRoute, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6IfAddrRoute, cls).tearDownClass()

    def setUp(self):
        """ Create pg0, bring it up and give it an IPv6 address """
        super(TestIPv6IfAddrRoute, self).setUp()

        # create 1 pg interface
        self.create_pg_interfaces(range(1))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """ Run the base tearDown, then unconfigure the pg interfaces """
        super(TestIPv6IfAddrRoute, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()

    def test_ipv6_ifaddrs_same_prefix(self):
        """ IPv6 Interface Addresses Same Prefix test

        Test scenario:

            - Verify no /128 route in FIB for 2001:10::10 or 2001:10::20
            - Configure IPv6 address 2001:10::10/64 on an interface
            - Verify /128 route in FIB for 2001:10::10 only
            - Configure IPv6 address 2001:10::20/64 on the interface
            - Delete 2001:10::10/64 from the interface
            - Verify /128 route in FIB for 2001:10::20 only
            - Delete 2001:10::20/64 from the interface
            - Verify no /128 route in FIB for either address
        """

        addr1 = "2001:10::10"
        addr2 = "2001:10::20"

        if_addr1 = VppIpInterfaceAddress(self, self.pg0, addr1, 64)
        if_addr2 = VppIpInterfaceAddress(self, self.pg0, addr2, 64)
        self.assertFalse(if_addr1.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

        # configure first address, verify route present
        if_addr1.add_vpp_config()
        self.assertTrue(if_addr1.query_vpp_config())
        self.assertTrue(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

        # configure second address, delete first, verify route not removed
        if_addr2.add_vpp_config()
        if_addr1.remove_vpp_config()
        self.assertFalse(if_addr1.query_vpp_config())
        self.assertTrue(if_addr2.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertTrue(find_route(self, addr2, 128))

        # delete second address, verify route removed
        if_addr2.remove_vpp_config()
        self.assertFalse(if_addr1.query_vpp_config())
        # the address just deleted must be gone as well; previously only
        # the first address was re-checked at this point
        self.assertFalse(if_addr2.query_vpp_config())
        self.assertFalse(find_route(self, addr1, 128))
        self.assertFalse(find_route(self, addr2, 128))

    def test_ipv6_ifaddr_del(self):
        """ Delete an interface address that does not exist """

        # the new loopback is picked up through self.lo_interfaces below
        self.create_loopback_interfaces(1)
        lo = self.lo_interfaces[0]

        lo.config_ip6()
        lo.admin_up()

        #
        # try and remove pg0's subnet from lo
        #
        with self.vapi.assert_negative_api_retval():
            self.vapi.sw_interface_add_del_address(
                sw_if_index=lo.sw_if_index,
                prefix=self.pg0.local_ip6_prefix,
                is_add=0)
1149
1150
class TestICMPv6Echo(VppTestCase):
    """ ICMPv6 Echo Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestICMPv6Echo, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestICMPv6Echo, cls).tearDownClass()

    def setUp(self):
        """ Create pg0, bring it up, address it and resolve its neighbour """
        super(TestICMPv6Echo, self).setUp()

        # one interface is enough: request and reply both use pg0
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """ Run the base tearDown, then unconfigure the pg interfaces """
        super(TestICMPv6Echo, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_icmpv6_echo(self):
        """ VPP replies to ICMPv6 Echo Request

        Test scenario:

            - Receive ICMPv6 Echo Request message on pg0 interface.
            - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
        """

        # values the reply is expected to echo back unchanged
        echo_id = 0xb
        echo_seq = 5
        echo_payload = b'\x0a' * 18

        # request from the remote host on pg0 to VPP's own address
        l2_hdr = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        l3_hdr = IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
        echo_hdr = ICMPv6EchoRequest(id=echo_id,
                                     seq=echo_seq,
                                     data=echo_payload)
        request = l2_hdr / l3_hdr / echo_hdr

        self.pg0.add_stream(request)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # exactly one packet is expected back on pg0
        reply = self.pg0.get_capture(1)[0]
        reply_l2 = reply[Ether]
        reply_l3 = reply[IPv6]
        reply_echo = reply[ICMPv6EchoReply]

        # addresses are swapped with respect to the request
        self.assertEqual(reply_l2.src, self.pg0.local_mac)
        self.assertEqual(reply_l2.dst, self.pg0.remote_mac)

        self.assertEqual(reply_l3.src, self.pg0.local_ip6)
        self.assertEqual(reply_l3.dst, self.pg0.remote_ip6)

        self.assertEqual(icmp6types[reply_echo.type], "Echo Reply")
        self.assertEqual(reply_echo.id, echo_id)
        self.assertEqual(reply_echo.seq, echo_seq)
        self.assertEqual(reply_echo.data, echo_payload)
1221
1222
class TestIPv6RD(TestIPv6ND):
    """ IPv6 Router Discovery Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RD, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RD, cls).tearDownClass()

    def setUp(self):
        """ Create two pg interfaces, bring them up and address them """
        super(TestIPv6RD, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        # private copy of the interface list, walked again in tearDown
        self.interfaces = list(self.pg_interfaces)

        # setup all interfaces
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

    def tearDown(self):
        """ Unconfigure the interfaces, then run the base tearDown """
        for intf in self.interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
        super(TestIPv6RD, self).tearDown()

    def test_rd_send_router_solicitation(self):
        """ Verify router solicitation packets """

        n_solicits = 2
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index,
                                                 mrc=n_solicits)
        captured = self.pg1.get_capture(n_solicits, timeout=3)
        self.assertEqual(len(captured), n_solicits)

        for pkt in captured:
            # every captured packet must be an IPv6 router solicitation
            self.assertEqual(pkt.haslayer(IPv6), 1)
            self.assertEqual(pkt[IPv6].haslayer(ICMPv6ND_RS), 1)

            # destination ff02::2, sourced from pg1's link-local address
            self.assert_equal(ip6_normalize(pkt[IPv6].dst),
                              ip6_normalize("ff02::2"))
            self.assert_equal(ip6_normalize(pkt[IPv6].src),
                              ip6_normalize(self.pg1.local_ip6_ll))

            # and it must carry pg1's MAC in a source link-layer option
            has_sllao = pkt[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr)
            self.assertTrue(bool(has_sllao))
            self.assert_equal(pkt[ICMPv6NDOptSrcLLAddr].lladdr,
                              self.pg1.local_mac)

    def verify_prefix_info(self, reported_prefix, prefix_option):
        """ Compare a prefix reported in an RA event with the option sent

        :param reported_prefix: one entry of the event's prefixes list
        :param prefix_option: the ICMPv6NDOptPrefixInfo that was sent
        """
        sent_addr = prefix_option.getfieldval("prefix")
        sent_len = prefix_option.getfieldval("prefixlen")
        # strict=False: the option's address may have host bits set
        sent_net = IPv6Network(
            text_type(sent_addr + "/" + text_type(sent_len)),
            strict=False)
        self.assert_equal(reported_prefix.prefix.network_address,
                          sent_net.network_address)

        # the event reports L in bit 7 and A in bit 6 of its flags field
        l_flag = prefix_option.getfieldval("L")
        a_flag = prefix_option.getfieldval("A")
        self.assert_equal(reported_prefix.flags,
                          (l_flag << 7) | (a_flag << 6))

        self.assert_equal(reported_prefix.valid_time,
                          prefix_option.getfieldval("validlifetime"))
        self.assert_equal(reported_prefix.preferred_time,
                          prefix_option.getfieldval("preferredlifetime"))

    def test_rd_receive_router_advertisement(self):
        """ Verify events triggered by received RA packets """

        self.vapi.want_ip6_ra_events(enable=1)

        # two prefix options with different lengths, lifetimes and A flag
        first_prefix = ICMPv6NDOptPrefixInfo(
            prefix="1::2",
            prefixlen=50,
            validlifetime=200,
            preferredlifetime=500,
            L=1,
            A=1,
        )
        second_prefix = ICMPv6NDOptPrefixInfo(
            prefix="7::4",
            prefixlen=20,
            validlifetime=70,
            preferredlifetime=1000,
            L=1,
            A=0,
        )
        sent_prefixes = (first_prefix, second_prefix)

        # RA from the remote host's link-local to pg1's link-local address
        router_ll = mk_ll_addr(self.pg1.remote_mac)
        ra = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(dst=self.pg1.local_ip6_ll, src=router_ll) /
              ICMPv6ND_RA())
        self.pg1.add_stream([ra / first_prefix / second_prefix])
        self.pg_start()

        ev = self.vapi.wait_for_event(10, "ip6_ra_event")

        # expected header fields of the event, checked in this order
        expected_fields = (
            ("current_hop_limit", 0),
            ("flags", 8),
            ("router_lifetime_in_sec", 1800),
            ("neighbor_reachable_time_in_msec", 0),
            ("time_in_msec_between_retransmitted_neighbor_solicitations", 0),
            ("n_prefixes", 2),
        )
        for field_name, expected in expected_fields:
            self.assert_equal(getattr(ev, field_name), expected)

        # prefixes are expected in the order they were sent
        for idx, sent in enumerate(sent_prefixes):
            self.verify_prefix_info(ev.prefixes[idx], sent)
1342
1343
class TestIPv6RDControlPlane(TestIPv6ND):
    """ IPv6 Router Discovery Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RDControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RDControlPlane, cls).tearDownClass()

    def setUp(self):
        """ Create pg0, bring it up and give it an IPv6 address """
        super(TestIPv6RDControlPlane, self).setUp()

        # create 1 pg interface
        self.create_pg_interfaces(range(1))

        self.interfaces = list(self.pg_interfaces)

        # setup all interfaces
        for i in self.interfaces:
            i.admin_up()
            i.config_ip6()

    def tearDown(self):
        super(TestIPv6RDControlPlane, self).tearDown()

    @staticmethod
    def create_ra_packet(pg, routerlifetime=None):
        """ Build an RA from pg's remote link-local to pg's local address

        :param pg: interface whose MAC and IPv6 addresses are used
        :param routerlifetime: router lifetime to put in the RA; when None
                               scapy's default for the field is used
        :returns: the Ether / IPv6 / ICMPv6ND_RA packet
        """
        src_ip = pg.remote_ip6_ll
        dst_ip = pg.local_ip6
        if routerlifetime is not None:
            ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
        else:
            ra = ICMPv6ND_RA()
        p = (Ether(dst=pg.local_mac, src=pg.remote_mac) /
             IPv6(dst=dst_ip, src=src_ip) / ra)
        return p

    @staticmethod
    def get_default_routes(fib):
        """ Collect the paths of all zero-length-prefix routes in a dump

        :param fib: iterable of route dump entries
        :returns: list of dicts with 'sw_if_index' and 'next_hop' keys, one
                  per path whose sw_if_index is not 0xFFFFFFFF
        """
        default_routes = []
        for entry in fib:
            if entry.route.prefix.prefixlen != 0:
                continue
            for path in entry.route.paths:
                # NOTE(review): 0xFFFFFFFF presumably marks a path with no
                # interface (e.g. a drop path) - confirm
                if path.sw_if_index != 0xFFFFFFFF:
                    default_routes.append({
                        'sw_if_index': path.sw_if_index,
                        'next_hop': path.nh.address.ip6,
                    })
        return default_routes

    @staticmethod
    def get_interface_addresses(fib, pg):
        """ Collect the /128 routes in a dump whose first path uses pg

        :param fib: iterable of route dump entries
        :param pg: interface whose sw_if_index is matched
        :returns: list of the matching prefixes' network addresses, as
                  strings
        """
        addresses = []
        for entry in fib:
            if entry.route.prefix.prefixlen == 128:
                # only the first path of the route is considered
                path = entry.route.paths[0]
                if path.sw_if_index == pg.sw_if_index:
                    addresses.append(str(entry.route.prefix.network_address))
        return addresses

    def wait_for_no_default_route(self, n_tries=50, s_time=1):
        """ Poll the FIB until it holds no default route

        :param n_tries: number of dumps to attempt
        :param s_time: time to sleep after each unsuccessful attempt
        :returns: True as soon as a dump shows no default route, False if
                  all attempts are used up
        """
        while n_tries:
            fib = self.vapi.ip_route_dump(0, True)
            if not self.get_default_routes(fib):
                return True
            n_tries = n_tries - 1
            self.sleep(s_time)

        return False

    def test_all(self):
        """ Test handling of SLAAC addresses and default routes """

        # baseline: no default route, and remember the addresses that are
        # already present so that only new ones are examined later
        fib = self.vapi.ip_route_dump(0, True)
        default_routes = self.get_default_routes(fib)
        initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
        self.assertEqual(default_routes, [])
        router_address = IPv6Address(text_type(self.pg0.remote_ip6_ll))

        self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)

        self.sleep(0.1)

        # send RA
        # first prefix: A=1 with 2 second lifetimes; second prefix: A=0
        packet = (self.create_ra_packet(
            self.pg0) / ICMPv6NDOptPrefixInfo(
            prefix="1::",
            prefixlen=64,
            validlifetime=2,
            preferredlifetime=2,
            L=1,
            A=1,
        ) / ICMPv6NDOptPrefixInfo(
            prefix="7::",
            prefixlen=20,
            validlifetime=1500,
            preferredlifetime=1000,
            L=1,
            A=0,
        ))
        self.pg0.add_stream([packet])
        self.pg_start()

        self.sleep_on_vpp_time(0.1)

        fib = self.vapi.ip_route_dump(0, True)

        # check FIB for new address
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        new_addresses = addresses.difference(initial_addresses)
        self.assertEqual(len(new_addresses), 1)
        # only the leading 20 bits of the new address are compared
        prefix = IPv6Network(text_type("%s/%d" % (list(new_addresses)[0], 20)),
                             strict=False)
        self.assertEqual(prefix, IPv6Network(text_type('1::/20')))

        # check FIB for new default route
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 1)
        dr = default_routes[0]
        self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
        self.assertEqual(dr['next_hop'], router_address)

        # send RA to delete default route
        packet = self.create_ra_packet(self.pg0, routerlifetime=0)
        self.pg0.add_stream([packet])
        self.pg_start()

        self.sleep_on_vpp_time(0.1)

        # check that default route is deleted
        fib = self.vapi.ip_route_dump(0, True)
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 0)

        self.sleep_on_vpp_time(0.1)

        # send RA
        packet = self.create_ra_packet(self.pg0)
        self.pg0.add_stream([packet])
        self.pg_start()

        self.sleep_on_vpp_time(0.1)

        # check FIB for new default route
        fib = self.vapi.ip_route_dump(0, True)
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 1)
        dr = default_routes[0]
        self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
        self.assertEqual(dr['next_hop'], router_address)

        # send RA, updating router lifetime to 1s
        packet = self.create_ra_packet(self.pg0, 1)
        self.pg0.add_stream([packet])
        self.pg_start()

        self.sleep_on_vpp_time(0.1)

        # check that default route still exists
        fib = self.vapi.ip_route_dump(0, True)
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 1)
        dr = default_routes[0]
        self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
        self.assertEqual(dr['next_hop'], router_address)

        self.sleep_on_vpp_time(1)

        # check that default route is deleted
        self.assertTrue(self.wait_for_no_default_route())

        # check FIB still contains the SLAAC address
        # NOTE(review): this inspects the dump taken before the wait above,
        # not a fresh one; a fresh dump might race with the 2 second
        # address lifetime - confirm this is intentional
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        new_addresses = addresses.difference(initial_addresses)

        self.assertEqual(len(new_addresses), 1)
        prefix = IPv6Network(text_type("%s/%d" % (list(new_addresses)[0], 20)),
                             strict=False)
        self.assertEqual(prefix, IPv6Network(text_type('1::/20')))

        self.sleep_on_vpp_time(1)

        # check that SLAAC address is deleted
        fib = self.vapi.ip_route_dump(0, True)
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        new_addresses = addresses.difference(initial_addresses)
        self.assertEqual(len(new_addresses), 0)
1534
1535
1536 class IPv6NDProxyTest(TestIPv6ND):
1537     """ IPv6 ND ProxyTest Case """
1538
    @classmethod
    def setUpClass(cls):
        # no class-level fixtures of its own; defers to the base class
        super(IPv6NDProxyTest, cls).setUpClass()
1542
    @classmethod
    def tearDownClass(cls):
        # nothing class-level to undo here; defers to the base class
        super(IPv6NDProxyTest, cls).tearDownClass()
1546
1547     def setUp(self):
1548         super(IPv6NDProxyTest, self).setUp()
1549
1550         # create 3 pg interfaces
1551         self.create_pg_interfaces(range(3))
1552
1553         # pg0 is the master interface, with the configured subnet
1554         self.pg0.admin_up()
1555         self.pg0.config_ip6()
1556         self.pg0.resolve_ndp()
1557
1558         self.pg1.ip6_enable()
1559         self.pg2.ip6_enable()
1560
    def tearDown(self):
        # no per-test cleanup of its own; defers to the base class
        super(IPv6NDProxyTest, self).tearDown()
1563
1564     def test_nd_proxy(self):
1565         """ IPv6 Proxy ND """
1566
1567         #
1568         # Generate some hosts in the subnet that we are proxying
1569         #
1570         self.pg0.generate_remote_hosts(8)
1571
1572         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
1573         d = inet_ntop(AF_INET6, nsma)
1574
1575         #
1576         # Send an NS for one of those remote hosts on one of the proxy links
1577         # expect no response since it's from an address that is not
1578         # on the link that has the prefix configured
1579         #
1580         ns_pg1 = (Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac) /
1581                   IPv6(dst=d,
1582                        src=self.pg0._remote_hosts[2].ip6) /
1583                   ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
1584                   ICMPv6NDOptSrcLLAddr(
1585                       lladdr=self.pg0._remote_hosts[2].mac))
1586
1587         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")
1588
1589         #
1590         # Add proxy support for the host
1591         #
1592         self.vapi.ip6nd_proxy_add_del(
1593             is_add=1, ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1594             sw_if_index=self.pg1.sw_if_index)
1595
1596         #
1597         # try that NS again. this time we expect an NA back
1598         #
1599         self.send_and_expect_na(self.pg1, ns_pg1,
1600                                 "NS to proxy entry",
1601                                 dst_ip=self.pg0._remote_hosts[2].ip6,
1602                                 tgt_ip=self.pg0.local_ip6)
1603
1604         #
1605         # ... and that we have an entry in the ND cache
1606         #
1607         self.assertTrue(find_nbr(self,
1608                                  self.pg1.sw_if_index,
1609                                  self.pg0._remote_hosts[2].ip6))
1610
1611         #
1612         # ... and we can route traffic to it
1613         #
1614         t = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1615              IPv6(dst=self.pg0._remote_hosts[2].ip6,
1616                   src=self.pg0.remote_ip6) /
1617              inet6.UDP(sport=10000, dport=20000) /
1618              Raw(b'\xa5' * 100))
1619
1620         self.pg0.add_stream(t)
1621         self.pg_enable_capture(self.pg_interfaces)
1622         self.pg_start()
1623         rx = self.pg1.get_capture(1)
1624         rx = rx[0]
1625
1626         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1627         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1628
1629         self.assertEqual(rx[IPv6].src,
1630                          t[IPv6].src)
1631         self.assertEqual(rx[IPv6].dst,
1632                          t[IPv6].dst)
1633
1634         #
1635         # Test we proxy for the host on the main interface
1636         #
1637         ns_pg0 = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
1638                   IPv6(dst=d, src=self.pg0.remote_ip6) /
1639                   ICMPv6ND_NS(
1640                       tgt=self.pg0._remote_hosts[2].ip6) /
1641                   ICMPv6NDOptSrcLLAddr(
1642                       lladdr=self.pg0.remote_mac))
1643
1644         self.send_and_expect_na(self.pg0, ns_pg0,
1645                                 "NS to proxy entry on main",
1646                                 tgt_ip=self.pg0._remote_hosts[2].ip6,
1647                                 dst_ip=self.pg0.remote_ip6)
1648
1649         #
1650         # Setup and resolve proxy for another host on another interface
1651         #
1652         ns_pg2 = (Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac) /
1653                   IPv6(dst=d,
1654                        src=self.pg0._remote_hosts[3].ip6) /
1655                   ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
1656                   ICMPv6NDOptSrcLLAddr(
1657                       lladdr=self.pg0._remote_hosts[2].mac))
1658
1659         self.vapi.ip6nd_proxy_add_del(
1660             is_add=1, ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1661             sw_if_index=self.pg2.sw_if_index)
1662
1663         self.send_and_expect_na(self.pg2, ns_pg2,
1664                                 "NS to proxy entry other interface",
1665                                 dst_ip=self.pg0._remote_hosts[3].ip6,
1666                                 tgt_ip=self.pg0.local_ip6)
1667
1668         self.assertTrue(find_nbr(self,
1669                                  self.pg2.sw_if_index,
1670                                  self.pg0._remote_hosts[3].ip6))
1671
1672         #
1673         # hosts can communicate. pg2->pg1
1674         #
1675         t2 = (Ether(dst=self.pg2.local_mac,
1676                     src=self.pg0.remote_hosts[3].mac) /
1677               IPv6(dst=self.pg0._remote_hosts[2].ip6,
1678                    src=self.pg0._remote_hosts[3].ip6) /
1679               inet6.UDP(sport=10000, dport=20000) /
1680               Raw(b'\xa5' * 100))
1681
1682         self.pg2.add_stream(t2)
1683         self.pg_enable_capture(self.pg_interfaces)
1684         self.pg_start()
1685         rx = self.pg1.get_capture(1)
1686         rx = rx[0]
1687
1688         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1689         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1690
1691         self.assertEqual(rx[IPv6].src,
1692                          t2[IPv6].src)
1693         self.assertEqual(rx[IPv6].dst,
1694                          t2[IPv6].dst)
1695
1696         #
1697         # remove the proxy configs
1698         #
1699         self.vapi.ip6nd_proxy_add_del(
1700             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1701             sw_if_index=self.pg1.sw_if_index, is_add=0)
1702         self.vapi.ip6nd_proxy_add_del(
1703             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1704             sw_if_index=self.pg2.sw_if_index, is_add=0)
1705
1706         self.assertFalse(find_nbr(self,
1707                                   self.pg2.sw_if_index,
1708                                   self.pg0._remote_hosts[3].ip6))
1709         self.assertFalse(find_nbr(self,
1710                                   self.pg1.sw_if_index,
1711                                   self.pg0._remote_hosts[2].ip6))
1712
1713         #
1714         # no longer proxy-ing...
1715         #
1716         self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
1717         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
1718         self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")
1719
1720         #
1721         # no longer forwarding. traffic generates NS out of the glean/main
1722         # interface
1723         #
1724         self.pg2.add_stream(t2)
1725         self.pg_enable_capture(self.pg_interfaces)
1726         self.pg_start()
1727
1728         rx = self.pg0.get_capture(1)
1729
1730         self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1731
1732
class TestIPNull(VppTestCase):
    """ IPv6 routes via NULL """

    @classmethod
    def setUpClass(cls):
        super(TestIPNull, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPNull, cls).tearDownClass()

    def setUp(self):
        """Bring up the single pg interface used by this test case."""
        super(TestIPNull, self).setUp()

        # one pg interface is enough: the test sends on pg0 and captures
        # the ICMP error on pg0 as well
        self.create_pg_interfaces(range(1))

        for itf in self.pg_interfaces:
            itf.admin_up()
            itf.config_ip6()
            itf.resolve_ndp()

    def tearDown(self):
        """Undo the per-interface configuration applied in setUp."""
        super(TestIPNull, self).tearDown()
        for itf in self.pg_interfaces:
            itf.unconfig_ip6()
            itf.admin_down()

    def _send_and_get_dest_unreach(self, pkt):
        """Send pkt on pg0 and return the ICMPv6DestUnreach layer of the
        single packet captured back on pg0."""
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg0.get_capture(1)
        return captured[0][ICMPv6DestUnreach]

    def test_ip_null(self):
        """ IP NULL route """

        probe = (Ether(src=self.pg0.remote_mac,
                       dst=self.pg0.local_mac) /
                 IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
                 inet6.UDP(sport=1234, dport=1234) /
                 Raw(b'\xa5' * 100))

        #
        # A /64 whose path type makes VPP answer with ICMP unreachable
        #
        ip_unreach = VppIpRoute(
            self, "2001::", 64,
            [VppRoutePath("::", 0xffffffff,
                          type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH)])
        ip_unreach.add_vpp_config()

        icmp = self._send_and_get_dest_unreach(probe)

        # code 0 = "No route to destination"
        self.assertEqual(icmp.code, 0)

        # ICMP is rate limited. pause a bit before provoking the next error
        self.sleep(1)

        #
        # A more specific /128 whose path type makes VPP answer with
        # ICMP prohibited
        #
        ip_prohibit = VppIpRoute(
            self, "2001::1", 128,
            [VppRoutePath("::", 0xffffffff,
                          type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT)])
        ip_prohibit.add_vpp_config()

        icmp = self._send_and_get_dest_unreach(probe)

        # code 1 = "Communication with destination administratively
        # prohibited"
        self.assertEqual(icmp.code, 1)
1812
1813
class TestIPDisabled(VppTestCase):
    """ IPv6 disabled """

    @classmethod
    def setUpClass(cls):
        super(TestIPDisabled, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPDisabled, cls).tearDownClass()

    def setUp(self):
        """Create two pg interfaces; only pg0 gets an IPv6 configuration."""
        super(TestIPDisabled, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        # PG0 is IP enabled
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # PG 1 is not IP enabled
        self.pg1.admin_up()

    def tearDown(self):
        """Remove the IPv6 configuration and bring the interfaces down."""
        super(TestIPDisabled, self).tearDown()
        for i in self.pg_interfaces:
            # this test case only ever applies IPv6 addresses (config_ip6),
            # so that is the address family that must be cleaned up
            i.unconfig_ip6()
            i.admin_down()

    def test_ip_disabled(self):
        """ IP Disabled """

        #
        # A multicast route for ffef::1 (source "::"):
        # one accepting interface, pg1, one forwarding interface, pg0
        #
        route_ff_01 = VppIpMRoute(
            self,
            "::",
            "ffef::1", 128,
            MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
            [VppMRoutePath(self.pg1.sw_if_index,
                           MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT),
             VppMRoutePath(self.pg0.sw_if_index,
                           MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)])
        route_ff_01.add_vpp_config()

        # unicast (pu) and multicast (pm) packets, both arriving on pg1
        pu = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
              inet6.UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))
        pm = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst="ffef::1") /
              inet6.UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")

        #
        # IP enable PG1
        #
        self.pg1.config_ip6()

        #
        # Now we get packets through; get_capture(1) itself checks that
        # exactly one packet arrived, so the result is not needed
        #
        self.pg1.add_stream(pu)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        self.pg1.add_stream(pm)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        #
        # Disable PG1
        #
        self.pg1.unconfig_ip6()

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1908
1909
class TestIP6LoadBalance(VppTestCase):
    """ IPv6 Load-Balancing """

    @classmethod
    def setUpClass(cls):
        super(TestIP6LoadBalance, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6LoadBalance, cls).tearDownClass()

    def setUp(self):
        """Create five pg interfaces with IPv6 and MPLS enabled."""
        super(TestIP6LoadBalance, self).setUp()

        self.create_pg_interfaces(range(5))

        # MPLS table 0 is created before MPLS is enabled on the interfaces
        mpls_tbl = VppMplsTable(self, 0)
        mpls_tbl.add_vpp_config()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()
            i.enable_mpls()

    def tearDown(self):
        """Undo the interface configuration, then run the base tearDown."""
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()
            i.disable_mpls()
        super(TestIP6LoadBalance, self).tearDown()

    def pg_send(self, input, pkts):
        """Clear the trace, then send pkts on interface input with capture
        enabled on every pg interface."""
        self.vapi.cli("clear trace")
        input.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_and_expect_load_balancing(self, input, pkts, outputs):
        """Send pkts on input and assert that every interface in outputs
        captured at least one packet."""
        self.pg_send(input, pkts)
        for oo in outputs:
            rx = oo._get_capture(1)
            self.assertNotEqual(0, len(rx))

    def send_and_expect_one_itf(self, input, pkts, itf):
        """Send pkts on input and expect all of them to be captured on
        itf; get_capture() performs the count check."""
        self.pg_send(input, pkts)
        itf.get_capture(len(pkts))

    def test_ip6_load_balance(self):
        """ IPv6 Load-Balancing """

        #
        # An array of packets that differ only in the destination port
        #  - IP only
        #  - MPLS EOS
        #  - MPLS non-EOS
        #  - MPLS non-EOS with an entropy label
        #
        port_ip_pkts = []
        port_mpls_pkts = []
        port_mpls_neos_pkts = []
        port_ent_pkts = []

        #
        # An array of packets that differ only in the source address
        #
        src_ip_pkts = []
        src_mpls_pkts = []

        for ii in range(NUM_PKTS):
            port_ip_hdr = (
                IPv6(dst="3000::1", src="3000:1::1") /
                inet6.UDP(sport=1234, dport=1234 + ii) /
                Raw(b'\xa5' * 100))
            port_ip_pkts.append((Ether(src=self.pg0.remote_mac,
                                       dst=self.pg0.local_mac) /
                                 port_ip_hdr))
            port_mpls_pkts.append((Ether(src=self.pg0.remote_mac,
                                         dst=self.pg0.local_mac) /
                                   MPLS(label=66, ttl=2) /
                                   port_ip_hdr))
            port_mpls_neos_pkts.append((Ether(src=self.pg0.remote_mac,
                                              dst=self.pg0.local_mac) /
                                        MPLS(label=67, ttl=2) /
                                        MPLS(label=77, ttl=2) /
                                        port_ip_hdr))
            port_ent_pkts.append((Ether(src=self.pg0.remote_mac,
                                        dst=self.pg0.local_mac) /
                                  MPLS(label=67, ttl=2) /
                                  MPLS(label=14, ttl=2) /
                                  MPLS(label=999, ttl=2) /
                                  port_ip_hdr))
            src_ip_hdr = (
                IPv6(dst="3000::1", src="3000:1::%d" % ii) /
                inet6.UDP(sport=1234, dport=1234) /
                Raw(b'\xa5' * 100))
            src_ip_pkts.append((Ether(src=self.pg0.remote_mac,
                                      dst=self.pg0.local_mac) /
                                src_ip_hdr))
            src_mpls_pkts.append((Ether(src=self.pg0.remote_mac,
                                        dst=self.pg0.local_mac) /
                                  MPLS(label=66, ttl=2) /
                                  src_ip_hdr))

        #
        # A route for the IP packets
        #
        route_3000_1 = VppIpRoute(self, "3000::1", 128,
                                  [VppRoutePath(self.pg1.remote_ip6,
                                                self.pg1.sw_if_index),
                                   VppRoutePath(self.pg2.remote_ip6,
                                                self.pg2.sw_if_index)])
        route_3000_1.add_vpp_config()

        #
        # a local-label for the EOS packets
        #
        binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
        binding.add_vpp_config()

        #
        # An MPLS route for the non-EOS packets
        #
        route_67 = VppMplsRoute(self, 67, 0,
                                [VppRoutePath(self.pg1.remote_ip6,
                                              self.pg1.sw_if_index,
                                              labels=[67]),
                                 VppRoutePath(self.pg2.remote_ip6,
                                              self.pg2.sw_if_index,
                                              labels=[67])])
        route_67.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across the 2 paths
        #  - since the default hash config is to use IP src,dst and port
        #    src,dst
        # We are not going to ensure equal amounts of packets across each link,
        # since the hash algorithm is statistical and therefore this can never
        # be guaranteed. But with NUM_PKTS different packets we do expect some
        # balancing. So instead just ensure there is traffic on each link.
        #
        self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, port_mpls_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, port_mpls_neos_pkts,
                                            [self.pg1, self.pg2])

        #
        # The packets with Entropy label in should not load-balance,
        # since the Entropy value is fixed.
        #
        self.send_and_expect_one_itf(self.pg0, port_ent_pkts, self.pg1)

        #
        # change the flow hash config so it's only IP src,dst
        #  - now only the stream with differing source address will
        #    load-balance
        #
        self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=0, dport=0,
                                   is_ipv6=1)

        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_one_itf(self.pg0, port_ip_pkts, self.pg2)

        #
        # change the flow hash config back to defaults
        #
        self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=1, dport=1,
                                   is_ipv6=1)

        #
        # Recursive prefixes
        #  - testing that 2 stages of load-balancing occurs and there is no
        #    polarisation (i.e. only 2 of 4 paths are used)
        #
        port_pkts = []
        src_pkts = []

        for ii in range(257):
            port_pkts.append((Ether(src=self.pg0.remote_mac,
                                    dst=self.pg0.local_mac) /
                              IPv6(dst="4000::1",
                                   src="4000:1::1") /
                              inet6.UDP(sport=1234,
                                        dport=1234 + ii) /
                              Raw(b'\xa5' * 100)))
            src_pkts.append((Ether(src=self.pg0.remote_mac,
                                   dst=self.pg0.local_mac) /
                             IPv6(dst="4000::1",
                                  src="4000:1::%d" % ii) /
                             inet6.UDP(sport=1234, dport=1234) /
                             Raw(b'\xa5' * 100)))

        route_3000_2 = VppIpRoute(self, "3000::2", 128,
                                  [VppRoutePath(self.pg3.remote_ip6,
                                                self.pg3.sw_if_index),
                                   VppRoutePath(self.pg4.remote_ip6,
                                                self.pg4.sw_if_index)])
        route_3000_2.add_vpp_config()

        route_4000_1 = VppIpRoute(self, "4000::1", 128,
                                  [VppRoutePath("3000::1",
                                                0xffffffff),
                                   VppRoutePath("3000::2",
                                                0xffffffff)])
        route_4000_1.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across all 4 paths
        # (pg_send, used by the helper, already clears the trace)
        #
        self.send_and_expect_load_balancing(self.pg0, port_pkts,
                                            [self.pg1, self.pg2,
                                             self.pg3, self.pg4])
        self.send_and_expect_load_balancing(self.pg0, src_pkts,
                                            [self.pg1, self.pg2,
                                             self.pg3, self.pg4])

        #
        # Recursive prefixes
        #  - testing that 2 stages of load-balancing no choices
        #
        port_pkts = []

        for ii in range(257):
            port_pkts.append((Ether(src=self.pg0.remote_mac,
                                    dst=self.pg0.local_mac) /
                              IPv6(dst="6000::1",
                                   src="6000:1::1") /
                              inet6.UDP(sport=1234,
                                        dport=1234 + ii) /
                              Raw(b'\xa5' * 100)))

        route_5000_2 = VppIpRoute(self, "5000::2", 128,
                                  [VppRoutePath(self.pg3.remote_ip6,
                                                self.pg3.sw_if_index)])
        route_5000_2.add_vpp_config()

        route_6000_1 = VppIpRoute(self, "6000::1", 128,
                                  [VppRoutePath("5000::2",
                                                0xffffffff)])
        route_6000_1.add_vpp_config()

        #
        # inject the packet on pg0 - each stage has a single path, so every
        # packet is expected on pg3
        #
        self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg3)
2166
2167
class TestIP6Punt(VppTestCase):
    """ IPv6 Punt Police/Redirect """

    @classmethod
    def setUpClass(cls):
        super(TestIP6Punt, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Punt, cls).tearDownClass()

    def setUp(self):
        """Create four pg interfaces with IPv6 configured and resolved."""
        super(TestIP6Punt, self).setUp()

        self.create_pg_interfaces(range(4))

        for itf in self.pg_interfaces:
            itf.admin_up()
            itf.config_ip6()
            itf.resolve_ndp()

    def tearDown(self):
        """Run the base tearDown, then unconfigure the interfaces."""
        super(TestIP6Punt, self).tearDown()
        for itf in self.pg_interfaces:
            itf.unconfig_ip6()
            itf.admin_down()

    def test_ip_punt(self):
        """ IP6 punt police and redirect """

        # TCP addressed to pg0's own IPv6 address
        punt_pkt = (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IPv6(src=self.pg0.remote_ip6,
                         dst=self.pg0.local_ip6) /
                    inet6.TCP(sport=1234, dport=1234) /
                    Raw(b'\xa5' * 100))

        burst = punt_pkt * 1025

        #
        # Redirect whatever is punted from pg0 out of pg1
        #
        nh_addr = self.pg1.remote_ip6
        self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
                                   self.pg1.sw_if_index,
                                   nh_addr)

        self.send_and_expect(self.pg0, burst, self.pg1)

        #
        # Put a policer in front of the punt path
        #
        policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
        policer.add_vpp_config()
        self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)

        self.vapi.cli("clear trace")
        self.pg0.add_stream(burst)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        #
        # some, but not all, of the burst must make it through:
        # the remainder was dropped by the policer
        #
        policed = self.pg1._get_capture(1)
        self.assertGreater(len(policed), 0)
        self.assertLess(len(policed), len(burst))

        #
        # Without the policer the whole burst is redirected again
        #
        self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
        policer.remove_vpp_config()
        self.send_and_expect(self.pg0, burst, self.pg1)

        #
        # Without the redirect nothing is received at all
        #
        self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
                                   self.pg1.sw_if_index,
                                   nh_addr,
                                   is_add=0)
        self.send_and_assert_no_replies(self.pg0, burst,
                                        "IP no punt config")

        #
        # A redirect with rx interface 0xffffffff is not input port
        # selective
        #
        self.vapi.ip_punt_redirect(0xffffffff,
                                   self.pg1.sw_if_index,
                                   nh_addr)
        self.send_and_expect(self.pg0, burst, self.pg1)

        self.vapi.ip_punt_redirect(0xffffffff,
                                   self.pg1.sw_if_index,
                                   nh_addr,
                                   is_add=0)

    def test_ip_punt_dump(self):
        """ IP6 punt redirect dump"""

        #
        # Configure three punt redirects, all transmitting on pg3:
        # pg0 and pg1 use pg3's peer as next-hop, pg2 uses '0::0'
        #
        nh_addr = self.pg3.remote_ip6
        for rx_itf in (self.pg0, self.pg1):
            self.vapi.ip_punt_redirect(rx_itf.sw_if_index,
                                       self.pg3.sw_if_index,
                                       nh_addr)
        self.vapi.ip_punt_redirect(self.pg2.sw_if_index,
                                   self.pg3.sw_if_index,
                                   '0::0')

        #
        # A dump filtered on pg0 must only return pg0 entries
        #
        pg0_punts = self.vapi.ip_punt_redirect_dump(self.pg0.sw_if_index,
                                                    is_ipv6=1)
        for entry in pg0_punts:
            self.assertEqual(entry.punt.rx_sw_if_index, self.pg0.sw_if_index)

        #
        # An unfiltered dump returns all three, each transmitting on pg3
        #
        all_punts = self.vapi.ip_punt_redirect_dump(0xffffffff, is_ipv6=1)
        self.assertEqual(len(all_punts), 3)
        for entry in all_punts:
            self.assertEqual(entry.punt.tx_sw_if_index, self.pg3.sw_if_index)
        # NOTE(review): the assertion after this one needs str() on nh, so
        # this comparison against an address string may hold regardless of
        # the value - confirm the intended check
        self.assertNotEqual(all_punts[1].punt.nh, self.pg3.remote_ip6)
        self.assertEqual(str(all_punts[2].punt.nh), '::')
2301
2302
class TestIPDeag(VppTestCase):
    """ IPv6 Deaggregate Routes """

    @classmethod
    def setUpClass(cls):
        super(TestIPDeag, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPDeag, cls).tearDownClass()

    def setUp(self):
        """Create three pg interfaces with IPv6 configured and resolved."""
        super(TestIPDeag, self).setUp()

        self.create_pg_interfaces(range(3))

        for itf in self.pg_interfaces:
            itf.admin_up()
            itf.config_ip6()
            itf.resolve_ndp()

    def tearDown(self):
        """Run the base tearDown, then unconfigure the interfaces."""
        super(TestIPDeag, self).tearDown()
        for itf in self.pg_interfaces:
            itf.unconfig_ip6()
            itf.admin_down()

    def _tcp_burst(self, src, dst):
        """Return 257 copies of a TCP packet from src to dst, framed as
        arriving on pg0."""
        pkt = (Ether(src=self.pg0.remote_mac,
                     dst=self.pg0.local_mac) /
               IPv6(src=src, dst=dst) /
               inet6.TCP(sport=1234, dport=1234) /
               Raw(b'\xa5' * 100))
        return pkt * 257

    def test_ip_deag(self):
        """ IP Deag Routes """

        #
        # Two extra IPv6 tables:
        #  1 - for a second destination address lookup
        #  2 - for a source address lookup
        #
        dst_table = VppIpTable(self, 1, is_ip6=1)
        src_table = VppIpTable(self, 2, is_ip6=1)
        dst_table.add_vpp_config()
        src_table.add_vpp_config()

        #
        # Default-table routes that hand the packet over to a second
        # lookup in each of those tables
        #
        deag_to_dst = VppIpRoute(self, "1::1", 128,
                                 [VppRoutePath("::",
                                               0xffffffff,
                                               nh_table_id=1)])
        deag_to_src = VppIpRoute(
            self, "1::2", 128,
            [VppRoutePath("::",
                          0xffffffff,
                          nh_table_id=2,
                          type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP)])

        deag_to_dst.add_vpp_config()
        deag_to_src.add_vpp_config()

        #
        # Nothing comes back yet: the second lookup only finds the
        # default route of the respective table
        #
        pkts_dst = self._tcp_burst("5::5", "1::1")
        pkts_src = self._tcp_burst("2::2", "1::2")

        self.send_and_assert_no_replies(self.pg0, pkts_dst,
                                        "IP in dst table")
        self.send_and_assert_no_replies(self.pg0, pkts_src,
                                        "IP in src table")

        #
        # A matching destination route in table 1 forwards via pg1
        #
        dst_table_route = VppIpRoute(self, "1::1", 128,
                                     [VppRoutePath(self.pg1.remote_ip6,
                                                   self.pg1.sw_if_index)],
                                     table_id=1)
        dst_table_route.add_vpp_config()

        self.send_and_expect(self.pg0, pkts_dst, self.pg1)

        #
        # A route in table 2 matching the packets' source address
        # forwards via pg2
        #
        src_table_route = VppIpRoute(self, "2::2", 128,
                                     [VppRoutePath(self.pg2.remote_ip6,
                                                   self.pg2.sw_if_index)],
                                     table_id=2)
        src_table_route.add_vpp_config()
        self.send_and_expect(self.pg0, pkts_src, self.pg2)

        #
        # loop in the lookup DP: the packets must not come back out
        #
        looping_route = VppIpRoute(self, "3::3", 128,
                                   [VppRoutePath("::",
                                                 0xffffffff)])
        looping_route.add_vpp_config()

        self.send_and_assert_no_replies(self.pg0,
                                        self._tcp_burst("3::4", "3::3"),
                                        "IP lookup loop")
2420
2421
2422 class TestIP6Input(VppTestCase):
2423     """ IPv6 Input Exception Test Cases """
2424
    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the parent test case class."""
        super(TestIP6Input, cls).setUpClass()
2428
    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent test case class."""
        super(TestIP6Input, cls).tearDownClass()
2432
2433     def setUp(self):
2434         super(TestIP6Input, self).setUp()
2435
2436         self.create_pg_interfaces(range(2))
2437
2438         for i in self.pg_interfaces:
2439             i.admin_up()
2440             i.config_ip6()
2441             i.resolve_ndp()
2442
2443     def tearDown(self):
2444         super(TestIP6Input, self).tearDown()
2445         for i in self.pg_interfaces:
2446             i.unconfig_ip6()
2447             i.admin_down()
2448
2449     def test_ip_input_icmp_reply(self):
2450         """ IP6 Input Exception - Return ICMP (3,0) """
2451         #
2452         # hop limit - ICMP replies
2453         #
2454         p_version = (Ether(src=self.pg0.remote_mac,
2455                            dst=self.pg0.local_mac) /
2456                      IPv6(src=self.pg0.remote_ip6,
2457                           dst=self.pg1.remote_ip6,
2458                           hlim=1) /
2459                      inet6.UDP(sport=1234, dport=1234) /
2460                      Raw(b'\xa5' * 100))
2461
2462         rx = self.send_and_expect(self.pg0, p_version * NUM_PKTS, self.pg0)
2463         rx = rx[0]
2464         icmp = rx[ICMPv6TimeExceeded]
2465
2466         # 0: "hop limit exceeded in transit",
2467         self.assertEqual((icmp.type, icmp.code), (3, 0))
2468
2469     icmpv6_data = '\x0a' * 18
2470     all_0s = "::"
2471     all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"
2472
2473     @parameterized.expand([
2474         # Name, src, dst, l4proto, msg, timeout
2475         ("src='iface',   dst='iface'", None, None,
2476          inet6.UDP(sport=1234, dport=1234), "funky version", None),
2477         ("src='All 0's', dst='iface'", all_0s, None,
2478          ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
2479         ("src='iface',   dst='All 0's'", None, all_0s,
2480          ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
2481         ("src='All 1's', dst='iface'", all_1s, None,
2482          ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
2483         ("src='iface',   dst='All 1's'", None, all_1s,
2484          ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
2485         ("src='All 1's', dst='All 1's'", all_1s, all_1s,
2486          ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
2487
2488     ])
2489     def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):
2490
2491         self._testMethodDoc = 'IPv6 Input Exception - %s' % name
2492
2493         p_version = (Ether(src=self.pg0.remote_mac,
2494                            dst=self.pg0.local_mac) /
2495                      IPv6(src=src or self.pg0.remote_ip6,
2496                           dst=dst or self.pg1.remote_ip6,
2497                           version=3) /
2498                      l4 /
2499                      Raw(b'\xa5' * 100))
2500
2501         self.send_and_assert_no_replies(self.pg0, p_version * NUM_PKTS,
2502                                         remark=msg or "",
2503                                         timeout=timeout)
2504
2505     def test_hop_by_hop(self):
2506         """ Hop-by-hop header test """
2507
2508         p = (Ether(src=self.pg0.remote_mac,
2509                    dst=self.pg0.local_mac) /
2510              IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
2511              IPv6ExtHdrHopByHop() /
2512              inet6.UDP(sport=1234, dport=1234) /
2513              Raw(b'\xa5' * 100))
2514
2515         self.pg0.add_stream(p)
2516         self.pg_enable_capture(self.pg_interfaces)
2517         self.pg_start()
2518
2519
class TestIPReplace(VppTestCase):
    """ IPv6 Table Replace """

    @classmethod
    def setUpClass(cls):
        super(TestIPReplace, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPReplace, cls).tearDownClass()

    def setUp(self):
        super(TestIPReplace, self).setUp()

        self.create_pg_interfaces(range(4))
        self.tables = []

        # one table is created per interface; table IDs count up from 1
        for table_id, intf in enumerate(self.pg_interfaces, 1):
            intf.admin_up()
            intf.config_ip6()
            intf.generate_remote_hosts(2)
            table = VppIpTable(self, table_id, True)
            self.tables.append(table.add_vpp_config())

    def tearDown(self):
        super(TestIPReplace, self).tearDown()
        for intf in self.pg_interfaces:
            intf.admin_down()
            intf.unconfig_ip6()

    def _check_routes(self, table, dump, mdump, entries, present):
        # assert that each entry's unicast and multicast route is (or,
        # with present=False, is not) found in the supplied table dumps
        check = self.assertTrue if present else self.assertFalse
        for entry in entries:
            check(find_route_in_dump(dump, entry['uni'], table))
            check(find_mroute_in_dump(mdump, entry['multi'], table))

    def test_replace(self):
        """ IP Table Replace """

        N_ROUTES = 20
        links = [self.pg0, self.pg1, self.pg2, self.pg3]
        routes = [[] for _ in links]
        ip6 = FibPathProto.FIB_PATH_NH_PROTO_IP6

        # the sizes of 'empty' tables
        for table in self.tables:
            self.assertEqual(len(table.dump()), 2)
            self.assertEqual(len(table.mdump()), 5)

        # load up the tables with some routes: N_ROUTES - 1 unicast and
        # multicast entries per table (suffixes 1 .. N_ROUTES - 1)
        for table, link, entries in zip(self.tables, links, routes):
            for suffix in range(1, N_ROUTES):
                # unicast: two paths, one via each remote host of the link
                uni = VppIpRoute(
                    self, "2001::%d" % suffix, 128,
                    [VppRoutePath(host.ip6, link.sw_if_index)
                     for host in (link.remote_hosts[0],
                                  link.remote_hosts[1])],
                    table_id=table.table_id)
                uni = uni.add_vpp_config()

                # multicast: accept on pg0, forward out of pg1..pg3
                mpaths = [VppMRoutePath(self.pg0.sw_if_index,
                                        MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT,
                                        proto=ip6)]
                mpaths += [VppMRoutePath(out.sw_if_index,
                                         MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
                                         proto=ip6)
                           for out in (self.pg1, self.pg2, self.pg3)]
                multi = VppIpMRoute(
                    self, "::",
                    "ff:2001::%d" % suffix, 128,
                    MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
                    mpaths,
                    table_id=table.table_id)
                multi = multi.add_vpp_config()

                entries.append({'uni': uni, 'multi': multi})

        #
        # replace the tables a few times
        #
        for _ in range(3):
            # replace each table
            for table in self.tables:
                table.replace_begin()

            # all the routes are still there
            for table, entries in zip(self.tables, routes):
                self._check_routes(table, table.dump(), table.mdump(),
                                   entries, True)

            # redownload the routes stored at even list positions
            for entries in routes:
                for entry in entries[0:N_ROUTES:2]:
                    entry['uni'].add_vpp_config()
                    entry['multi'].add_vpp_config()

            # signal each table converged
            for table in self.tables:
                table.replace_end()

            # we should find the even routes, but not the odd
            for table, entries in zip(self.tables, routes):
                dump = table.dump()
                mdump = table.mdump()
                self._check_routes(table, dump, mdump,
                                   entries[0:N_ROUTES:2], True)
                self._check_routes(table, dump, mdump,
                                   entries[1:N_ROUTES - 1:2], False)

            # reload all the routes
            for entries in routes:
                for entry in entries:
                    entry['uni'].add_vpp_config()
                    entry['multi'].add_vpp_config()

            # all the routes are still there
            for table, entries in zip(self.tables, routes):
                self._check_routes(table, table.dump(), table.mdump(),
                                   entries, True)

        #
        # finally flush the tables for good measure; they must be back
        # to their 'empty' sizes
        #
        for table in self.tables:
            table.flush()
            self.assertEqual(len(table.dump()), 2)
            self.assertEqual(len(table.mdump()), 5)
2658
class TestIP6Replace(VppTestCase):
    """ IPv6 Interface Address Replace """

    @classmethod
    def setUpClass(cls):
        super(TestIP6Replace, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Replace, cls).tearDownClass()

    def setUp(self):
        super(TestIP6Replace, self).setUp()

        # four pg interfaces, brought up without any address configured;
        # the test adds every prefix itself
        self.create_pg_interfaces(range(4))

        for i in self.pg_interfaces:
            i.admin_up()

    def tearDown(self):
        super(TestIP6Replace, self).tearDown()
        for i in self.pg_interfaces:
            i.admin_down()

    def get_n_pfxs(self, intf):
        """ Return how many entries ip_address_dump reports for intf """
        # the second argument is presumably the "is IPv6" selector of the
        # dump API - confirm against the vapi wrapper
        return len(self.vapi.ip_address_dump(intf.sw_if_index, True))

    def test_replace(self):
        """ IP interface address replace """

        # one list of configured prefixes per interface, same order as
        # self.pg_interfaces
        intf_pfxs = [[] for _ in self.pg_interfaces]

        # per-interface address templates, %d is the sw_if_index:
        #   2001:16:x::1/64
        #   2001:16:x::2/64 - a different address in the same subnet
        #   2001:15:x::2/64 - a different address and subnet
        templates = ("2001:16:%d::1", "2001:16:%d::2", "2001:15:%d::2")
        n_pfxs = len(templates)

        # add prefixes to each of the interfaces
        for intf, own_pfxs in zip(self.pg_interfaces, intf_pfxs):
            for template in templates:
                addr = template % intf.sw_if_index
                a = VppIpInterfaceAddress(self, intf, addr,
                                          64).add_vpp_config()
                own_pfxs.append(a)

        # a dump should have every configured address in it
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), n_pfxs)

        #
        # remove all the address thru a replace: nothing is re-added
        # between begin and end, so nothing survives
        #
        self.vapi.sw_interface_address_replace_begin()
        self.vapi.sw_interface_address_replace_end()
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 0)

        #
        # add all the interface addresses back
        #
        for p in intf_pfxs:
            for v in p:
                v.add_vpp_config()
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), n_pfxs)

        #
        # replace again, but this time update/re-add the address on the first
        # two interfaces
        #
        self.vapi.sw_interface_address_replace_begin()

        for p in intf_pfxs[:2]:
            for v in p:
                v.add_vpp_config()

        self.vapi.sw_interface_address_replace_end()

        # on the first two the address still exist,
        # on the other two they do not
        for intf in self.pg_interfaces[:2]:
            self.assertEqual(self.get_n_pfxs(intf), n_pfxs)
        for p in intf_pfxs[:2]:
            for v in p:
                self.assertTrue(v.query_vpp_config())
        for intf in self.pg_interfaces[2:]:
            self.assertEqual(self.get_n_pfxs(intf), 0)

        #
        # add all the interface addresses back on the last two
        #
        for p in intf_pfxs[2:]:
            for v in p:
                v.add_vpp_config()
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), n_pfxs)

        #
        # replace again, this time add different prefixes on all the interfaces
        #
        self.vapi.sw_interface_address_replace_begin()

        pfxs = []
        for intf in self.pg_interfaces:
            # 2001:18:x::1/64
            addr = "2001:18:%d::1" % intf.sw_if_index
            pfxs.append(VppIpInterfaceAddress(self, intf, addr,
                                              64).add_vpp_config())

        self.vapi.sw_interface_address_replace_end()

        # only the 2001:18: prefix should exist on each interface
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 1)
        for pfx in pfxs:
            self.assertTrue(pfx.query_vpp_config())

        #
        # remove everything
        #
        self.vapi.sw_interface_address_replace_begin()
        self.vapi.sw_interface_address_replace_end()
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 0)

        #
        # add prefixes to each interface. post-begin add the prefix from
        # interface X onto interface Y. this would normally be an error
        # since it would generate a 'duplicate address' warning. but in
        # this case, since what is newly downloaded is sane, it's ok
        #
        for intf in self.pg_interfaces:
            # 2001:18:x::1/64
            addr = "2001:18:%d::1" % intf.sw_if_index
            VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()

        self.vapi.sw_interface_address_replace_begin()

        pfxs = []
        for intf in self.pg_interfaces:
            # 2001:18:(x+1)::1/64 - the address the next sw_if_index held
            # before the replace began
            addr = "2001:18:%d::1" % (intf.sw_if_index + 1)
            pfxs.append(VppIpInterfaceAddress(self, intf,
                                              addr, 64).add_vpp_config())

        self.vapi.sw_interface_address_replace_end()

        self.logger.info(self.vapi.cli("sh int addr"))

        # exactly the newly downloaded prefix remains on each interface
        for intf in self.pg_interfaces:
            self.assertEqual(self.get_n_pfxs(intf), 1)
        for pfx in pfxs:
            self.assertTrue(pfx.query_vpp_config())
2819
class TestIP6LinkLocal(VppTestCase):
    """ IPv6 Link Local """

    @classmethod
    def setUpClass(cls):
        super(TestIP6LinkLocal, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6LinkLocal, cls).tearDownClass()

    def setUp(self):
        super(TestIP6LinkLocal, self).setUp()

        # two pg interfaces, up but with no addresses configured
        self.create_pg_interfaces(range(2))
        for intf in self.pg_interfaces:
            intf.admin_up()

    def tearDown(self):
        super(TestIP6LinkLocal, self).tearDown()
        for intf in self.pg_interfaces:
            intf.admin_down()

    def _echo_request(self, src, dst):
        # ICMPv6 echo request from src to dst, framed with pg0's MACs
        l2 = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        return l2 / IPv6(src=src, dst=dst) / ICMPv6EchoRequest()

    def test_ip6_ll(self):
        """ IPv6 Link Local """

        #
        # two APIs to add a link local address.
        #   1 - just like any other prefix
        #   2 - with the special set LL API
        #

        ll1 = "fe80:1::1"
        ll2 = "fe80:2::2"
        ll3 = "fe80:3::3"

        #
        # First with the API to set a 'normal' prefix
        #
        VppIpInterfaceAddress(self, self.pg0, ll1, 128).add_vpp_config()

        #
        # should be able to ping the ll
        #
        ping_ll1 = self._echo_request(ll2, ll1)
        self.send_and_expect(self.pg0, [ping_ll1], self.pg0)

        #
        # change the link-local on pg0
        #
        pg0_ll3 = VppIpInterfaceAddress(self, self.pg0, ll3, 128)
        pg0_ll3 = pg0_ll3.add_vpp_config()

        ping_ll3 = self._echo_request(ll2, ll3)
        self.send_and_expect(self.pg0, [ping_ll3], self.pg0)

        #
        # set a normal v6 prefix on the link; the link-local must still
        # answer
        #
        self.pg0.config_ip6()
        self.send_and_expect(self.pg0, [ping_ll3], self.pg0)

        # the link-local cannot be removed
        with self.vapi.assert_negative_api_retval():
            pg0_ll3.remove_vpp_config()

        #
        # Use the specific link-local API on pg1; the same frames built
        # above (with pg0's MAC addresses) are reused here
        #
        VppIp6LinkLocalAddress(self, self.pg1, ll1).add_vpp_config()
        self.send_and_expect(self.pg1, [ping_ll1], self.pg1)

        VppIp6LinkLocalAddress(self, self.pg1, ll3).add_vpp_config()
        self.send_and_expect(self.pg1, [ping_ll3], self.pg1)
2906
2907
if __name__ == "__main__":
    # executed as a script: hand this module's test cases to unittest,
    # driven by the VPP test runner
    unittest.main(testRunner=VppTestRunner)