api: ip: add IP_ROUTE_LOOKUP API
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python3
2
3 import socket
4 from socket import inet_pton, inet_ntop
5 import unittest
6
7 from parameterized import parameterized
8 import scapy.compat
9 import scapy.layers.inet6 as inet6
10 from scapy.contrib.mpls import MPLS
11 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
12     ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
13     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
14     ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, \
15     IPv6ExtHdrHopByHop, ICMPv6MLReport2, ICMPv6MLDMultAddrRec
16 from scapy.layers.l2 import Ether, Dot1Q
17 from scapy.packet import Raw
18 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
19     in6_mactoifaceid
20 from six import moves
21
22 from framework import VppTestCase, VppTestRunner
23 from util import ppp, ip6_normalize, mk_ll_addr
24 from vpp_ip import DpoProto
25 from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
26     VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
27     VppMplsRoute, VppMplsTable, VppIpTable, FibPathType, FibPathProto, \
28     VppIpInterfaceAddress, find_route_in_dump, find_mroute_in_dump, \
29     VppIp6LinkLocalAddress
30 from vpp_neighbor import find_nbr, VppNeighbor
31 from vpp_pg_interface import is_ipv6_misc
32 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
33 from vpp_policer import VppPolicer
34 from ipaddress import IPv6Network, IPv6Address
35
# Shorthand for the IPv6 address family passed to inet_pton()/inet_ntop().
AF_INET6 = socket.AF_INET6

# Python 2/3 text type: `unicode` only exists on Python 2, so fall back to
# `str` when the name is not defined.
try:
    text_type = unicode
except NameError:
    text_type = str

# NOTE(review): presumably the default packet count for tests further down
# the file; no use is visible in this part - confirm.
NUM_PKTS = 67
44
45
class TestIPv6ND(VppTestCase):
    """Shared helpers for sending and validating IPv6 ND packets."""

    def validate_ra(self, intf, rx, dst_ip=None):
        """Assert that ``rx`` is a Router Advertisement sent from ``intf``.

        :param intf: interface the RA was captured on.
        :param rx: captured packet to check.
        :param dst_ip: expected IPv6 destination; defaults to
                       ``intf.remote_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd RA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_RA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the router's link local
        self.assertTrue(in6_islladdr(rx[IPv6].src))
        self.assertEqual(in6_ptop(rx[IPv6].src),
                         in6_ptop(mk_ll_addr(intf.local_mac)))

    def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
        """Assert that ``rx`` is a Neighbor Advertisement sent from ``intf``.

        :param intf: interface the NA was captured on.
        :param rx: captured packet to check.
        :param dst_ip: expected IPv6 destination; defaults to
                       ``intf.remote_ip6``.
        :param tgt_ip: expected IPv6 source of the NA; defaults to
                       ``intf.local_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6
        if not tgt_ip:
            # default the *target*; the expected destination must not be
            # overwritten here
            tgt_ip = intf.local_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_NA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the target address
        self.assertEqual(
            in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))

        # Dest link-layer options should have the router's MAC
        dll = rx[ICMPv6NDOptDstLLAddr]
        self.assertEqual(dll.lladdr, intf.local_mac)

    def validate_ns(self, intf, rx, tgt_ip):
        """Assert that ``rx`` is a Neighbor Solicitation for ``tgt_ip``.

        :param intf: interface the NS was captured on.
        :param rx: captured packet to check.
        :param tgt_ip: IPv6 address expected as the NS target; the expected
                       destination IP and MAC are the solicited-node
                       multicast values derived from it.
        """
        nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
        dst_ip = inet_ntop(AF_INET6, nsma)

        # NS is sent to the solicited-node multicast MAC
        self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NS should be addressed to an mcast address
        # derived from the target address
        self.assertEqual(
            in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # expect the tgt IP in the NS header
        ns = rx[ICMPv6ND_NS]
        self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))

        # packet is from the router's local address
        self.assertEqual(
            in6_ptop(rx[IPv6].src), intf.local_ip6)

        # Src link-layer options should have the router's MAC
        sll = rx[ICMPv6NDOptSrcLLAddr]
        self.assertEqual(sll.lladdr, intf.local_mac)

    def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``intf`` and validate the single RA captured.

        :param intf: interface to send on and capture from.
        :param pkts: packet(s) to add to the interface's stream.
        :param remark: not used by this method.
        :param dst_ip: forwarded to :meth:`validate_ra`.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ra(intf, rx, dst_ip)

    def send_and_expect_na(self, intf, pkts, remark, dst_ip=None,
                           tgt_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``intf`` and validate the single NA captured.

        :param intf: interface to send on and capture from.
        :param pkts: packet(s) to add to the interface's stream.
        :param remark: not used by this method.
        :param dst_ip: forwarded to :meth:`validate_na`.
        :param tgt_ip: forwarded to :meth:`validate_na`.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_na(intf, rx, dst_ip, tgt_ip)

    def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
                           filter_out_fn=is_ipv6_misc):
        """Send ``pkts`` on ``tx_intf`` and validate one NS on ``rx_intf``.

        :param tx_intf: interface the packets are sent on.
        :param rx_intf: interface the NS is captured on.
        :param pkts: packet(s) to add to the tx interface's stream.
        :param tgt_ip: forwarded to :meth:`validate_ns`.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        self.vapi.cli("clear trace")
        tx_intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ns(rx_intf, rx, tgt_ip)

    def verify_ip(self, rx, smac, dmac, sip, dip):
        """Assert the Ethernet and IPv6 addresses of ``rx``.

        :param rx: captured packet to check.
        :param smac: expected Ethernet source.
        :param dmac: expected Ethernet destination.
        :param sip: expected IPv6 source.
        :param dip: expected IPv6 destination.
        """
        ether = rx[Ether]
        self.assertEqual(ether.dst, dmac)
        self.assertEqual(ether.src, smac)

        ip = rx[IPv6]
        self.assertEqual(ip.src, sip)
        self.assertEqual(ip.dst, dip)
162
163
164 class TestIPv6(TestIPv6ND):
165     """ IPv6 Test Case """
166
167     @classmethod
168     def setUpClass(cls):
169         super(TestIPv6, cls).setUpClass()
170
171     @classmethod
172     def tearDownClass(cls):
173         super(TestIPv6, cls).tearDownClass()
174
175     def setUp(self):
176         """
177         Perform test setup before test case.
178
179         **Config:**
180             - create 3 pg interfaces
181                 - untagged pg0 interface
182                 - Dot1Q subinterface on pg1
183                 - Dot1AD subinterface on pg2
184             - setup interfaces:
185                 - put it into UP state
186                 - set IPv6 addresses
187                 - resolve neighbor address using NDP
188             - configure 200 fib entries
189
190         :ivar list interfaces: pg interfaces and subinterfaces.
191         :ivar dict flows: IPv4 packet flows in test.
192
193         *TODO:* Create AD sub interface
194         """
195         super(TestIPv6, self).setUp()
196
197         # create 3 pg interfaces
198         self.create_pg_interfaces(range(3))
199
200         # create 2 subinterfaces for p1 and pg2
201         self.sub_interfaces = [
202             VppDot1QSubint(self, self.pg1, 100),
203             VppDot1QSubint(self, self.pg2, 200)
204             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
205         ]
206
207         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
208         self.flows = dict()
209         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
210         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
211         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
212
213         # packet sizes
214         self.pg_if_packet_sizes = [64, 1500, 9020]
215
216         self.interfaces = list(self.pg_interfaces)
217         self.interfaces.extend(self.sub_interfaces)
218
219         # setup all interfaces
220         for i in self.interfaces:
221             i.admin_up()
222             i.config_ip6()
223             i.resolve_ndp()
224
225     def tearDown(self):
226         """Run standard test teardown and log ``show ip6 neighbors``."""
227         for i in self.interfaces:
228             i.unconfig_ip6()
229             i.admin_down()
230         for i in self.sub_interfaces:
231             i.remove_vpp_config()
232
233         super(TestIPv6, self).tearDown()
234         if not self.vpp_dead:
235             self.logger.info(self.vapi.cli("show ip6 neighbors"))
236             # info(self.vapi.cli("show ip6 fib"))  # many entries
237
238     def modify_packet(self, src_if, packet_size, pkt):
239         """Add load, set destination IP and extend packet to required packet
240         size for defined interface.
241
242         :param VppInterface src_if: Interface to create packet for.
243         :param int packet_size: Required packet size.
244         :param Scapy pkt: Packet to be modified.
245         """
246         dst_if_idx = int(packet_size / 10 % 2)
247         dst_if = self.flows[src_if][dst_if_idx]
248         info = self.create_packet_info(src_if, dst_if)
249         payload = self.info_to_payload(info)
250         p = pkt / Raw(payload)
251         p[IPv6].dst = dst_if.remote_ip6
252         info.data = p.copy()
253         if isinstance(src_if, VppSubInterface):
254             p = src_if.add_dot1_layer(p)
255         self.extend_packet(p, packet_size)
256
257         return p
258
259     def create_stream(self, src_if):
260         """Create input packet stream for defined interface.
261
262         :param VppInterface src_if: Interface to create packet stream for.
263         """
264         hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
265         pkt_tmpl = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
266                     IPv6(src=src_if.remote_ip6) /
267                     inet6.UDP(sport=1234, dport=1234))
268
269         pkts = [self.modify_packet(src_if, i, pkt_tmpl)
270                 for i in moves.range(self.pg_if_packet_sizes[0],
271                                      self.pg_if_packet_sizes[1], 10)]
272         pkts_b = [self.modify_packet(src_if, i, pkt_tmpl)
273                   for i in moves.range(self.pg_if_packet_sizes[1] + hdr_ext,
274                                        self.pg_if_packet_sizes[2] + hdr_ext,
275                                        50)]
276         pkts.extend(pkts_b)
277
278         return pkts
279
280     def verify_capture(self, dst_if, capture):
281         """Verify captured input packet stream for defined interface.
282
283         :param VppInterface dst_if: Interface to verify captured packet stream
284                                     for.
285         :param list capture: Captured packet stream.
286         """
287         self.logger.info("Verifying capture on interface %s" % dst_if.name)
288         last_info = dict()
289         for i in self.interfaces:
290             last_info[i.sw_if_index] = None
291         is_sub_if = False
292         dst_sw_if_index = dst_if.sw_if_index
293         if hasattr(dst_if, 'parent'):
294             is_sub_if = True
295         for packet in capture:
296             if is_sub_if:
297                 # Check VLAN tags and Ethernet header
298                 packet = dst_if.remove_dot1_layer(packet)
299             self.assertTrue(Dot1Q not in packet)
300             try:
301                 ip = packet[IPv6]
302                 udp = packet[inet6.UDP]
303                 payload_info = self.payload_to_info(packet[Raw])
304                 packet_index = payload_info.index
305                 self.assertEqual(payload_info.dst, dst_sw_if_index)
306                 self.logger.debug(
307                     "Got packet on port %s: src=%u (id=%u)" %
308                     (dst_if.name, payload_info.src, packet_index))
309                 next_info = self.get_next_packet_info_for_interface2(
310                     payload_info.src, dst_sw_if_index,
311                     last_info[payload_info.src])
312                 last_info[payload_info.src] = next_info
313                 self.assertTrue(next_info is not None)
314                 self.assertEqual(packet_index, next_info.index)
315                 saved_packet = next_info.data
316                 # Check standard fields
317                 self.assertEqual(
318                     ip.src, saved_packet[IPv6].src)
319                 self.assertEqual(
320                     ip.dst, saved_packet[IPv6].dst)
321                 self.assertEqual(
322                     udp.sport, saved_packet[inet6.UDP].sport)
323                 self.assertEqual(
324                     udp.dport, saved_packet[inet6.UDP].dport)
325             except:
326                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
327                 raise
328         for i in self.interfaces:
329             remaining_packet = self.get_next_packet_info_for_interface2(
330                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
331             self.assertTrue(remaining_packet is None,
332                             "Interface %s: Packet expected from interface %s "
333                             "didn't arrive" % (dst_if.name, i.name))
334
335     def test_next_header_anomaly(self):
336         """ IPv6 next header anomaly test
337
338         Test scenario:
339             - ipv6 next header field = Fragment Header (44)
340             - next header is ICMPv6 Echo Request
341             - wait for reassembly
342         """
343         pkt = (Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) /
344                IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=44) /
345                ICMPv6EchoRequest())
346
347         self.pg0.add_stream(pkt)
348         self.pg_start()
349
350         # wait for reassembly
351         self.sleep(10)
352
353     def test_fib(self):
354         """ IPv6 FIB test
355
356         Test scenario:
357             - Create IPv6 stream for pg0 interface
358             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
359             - Send and verify received packets on each interface.
360         """
361
362         pkts = self.create_stream(self.pg0)
363         self.pg0.add_stream(pkts)
364
365         for i in self.sub_interfaces:
366             pkts = self.create_stream(i)
367             i.parent.add_stream(pkts)
368
369         self.pg_enable_capture(self.pg_interfaces)
370         self.pg_start()
371
372         pkts = self.pg0.get_capture()
373         self.verify_capture(self.pg0, pkts)
374
375         for i in self.sub_interfaces:
376             pkts = i.parent.get_capture()
377             self.verify_capture(i, pkts)
378
379     def test_ns(self):
380         """ IPv6 Neighbour Solicitation Exceptions
381
382         Test scenario:
383            - Send an NS Sourced from an address not covered by the link sub-net
384            - Send an NS to an mcast address the router has not joined
385            - Send NS for a target address the router does not onn.
386         """
387
388         #
389         # An NS from a non link source address
390         #
391         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
392         d = inet_ntop(AF_INET6, nsma)
393
394         p = (Ether(dst=in6_getnsmac(nsma)) /
395              IPv6(dst=d, src="2002::2") /
396              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
397              ICMPv6NDOptSrcLLAddr(
398                  lladdr=self.pg0.remote_mac))
399         pkts = [p]
400
401         self.send_and_assert_no_replies(
402             self.pg0, pkts,
403             "No response to NS source by address not on sub-net")
404
405         #
406         # An NS for sent to a solicited mcast group the router is
407         # not a member of FAILS
408         #
409         if 0:
410             nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
411             d = inet_ntop(AF_INET6, nsma)
412
413             p = (Ether(dst=in6_getnsmac(nsma)) /
414                  IPv6(dst=d, src=self.pg0.remote_ip6) /
415                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
416                  ICMPv6NDOptSrcLLAddr(
417                      lladdr=self.pg0.remote_mac))
418             pkts = [p]
419
420             self.send_and_assert_no_replies(
421                 self.pg0, pkts,
422                 "No response to NS sent to unjoined mcast address")
423
424         #
425         # An NS whose target address is one the router does not own
426         #
427         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
428         d = inet_ntop(AF_INET6, nsma)
429
430         p = (Ether(dst=in6_getnsmac(nsma)) /
431              IPv6(dst=d, src=self.pg0.remote_ip6) /
432              ICMPv6ND_NS(tgt="fd::ffff") /
433              ICMPv6NDOptSrcLLAddr(
434                  lladdr=self.pg0.remote_mac))
435         pkts = [p]
436
437         self.send_and_assert_no_replies(self.pg0, pkts,
438                                         "No response to NS for unknown target")
439
440         #
441         # A neighbor entry that has no associated FIB-entry
442         #
443         self.pg0.generate_remote_hosts(4)
444         nd_entry = VppNeighbor(self,
445                                self.pg0.sw_if_index,
446                                self.pg0.remote_hosts[2].mac,
447                                self.pg0.remote_hosts[2].ip6,
448                                is_no_fib_entry=1)
449         nd_entry.add_vpp_config()
450
451         #
452         # check we have the neighbor, but no route
453         #
454         self.assertTrue(find_nbr(self,
455                                  self.pg0.sw_if_index,
456                                  self.pg0._remote_hosts[2].ip6))
457         self.assertFalse(find_route(self,
458                                     self.pg0._remote_hosts[2].ip6,
459                                     128))
460
461         #
462         # send an NS from a link local address to the interface's global
463         # address
464         #
465         p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
466              IPv6(
467                  dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
468              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
469              ICMPv6NDOptSrcLLAddr(
470                  lladdr=self.pg0.remote_mac))
471
472         self.send_and_expect_na(self.pg0, p,
473                                 "NS from link-local",
474                                 dst_ip=self.pg0._remote_hosts[2].ip6_ll,
475                                 tgt_ip=self.pg0.local_ip6)
476
477         #
478         # we should have learned an ND entry for the peer's link-local
479         # but not inserted a route to it in the FIB
480         #
481         self.assertTrue(find_nbr(self,
482                                  self.pg0.sw_if_index,
483                                  self.pg0._remote_hosts[2].ip6_ll))
484         self.assertFalse(find_route(self,
485                                     self.pg0._remote_hosts[2].ip6_ll,
486                                     128))
487
488         #
489         # An NS to the router's own Link-local
490         #
491         p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
492              IPv6(
493                  dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
494              ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll) /
495              ICMPv6NDOptSrcLLAddr(
496                  lladdr=self.pg0.remote_mac))
497
498         self.send_and_expect_na(self.pg0, p,
499                                 "NS to/from link-local",
500                                 dst_ip=self.pg0._remote_hosts[3].ip6_ll,
501                                 tgt_ip=self.pg0.local_ip6_ll)
502
503         #
504         # we should have learned an ND entry for the peer's link-local
505         # but not inserted a route to it in the FIB
506         #
507         self.assertTrue(find_nbr(self,
508                                  self.pg0.sw_if_index,
509                                  self.pg0._remote_hosts[3].ip6_ll))
510         self.assertFalse(find_route(self,
511                                     self.pg0._remote_hosts[3].ip6_ll,
512                                     128))
513
514     def test_ns_duplicates(self):
515         """ ND Duplicates"""
516
517         #
518         # Generate some hosts on the LAN
519         #
520         self.pg1.generate_remote_hosts(3)
521
522         #
523         # Add host 1 on pg1 and pg2
524         #
525         ns_pg1 = VppNeighbor(self,
526                              self.pg1.sw_if_index,
527                              self.pg1.remote_hosts[1].mac,
528                              self.pg1.remote_hosts[1].ip6)
529         ns_pg1.add_vpp_config()
530         ns_pg2 = VppNeighbor(self,
531                              self.pg2.sw_if_index,
532                              self.pg2.remote_mac,
533                              self.pg1.remote_hosts[1].ip6)
534         ns_pg2.add_vpp_config()
535
536         #
537         # IP packet destined for pg1 remote host arrives on pg1 again.
538         #
539         p = (Ether(dst=self.pg0.local_mac,
540                    src=self.pg0.remote_mac) /
541              IPv6(src=self.pg0.remote_ip6,
542                   dst=self.pg1.remote_hosts[1].ip6) /
543              inet6.UDP(sport=1234, dport=1234) /
544              Raw())
545
546         self.pg0.add_stream(p)
547         self.pg_enable_capture(self.pg_interfaces)
548         self.pg_start()
549
550         rx1 = self.pg1.get_capture(1)
551
552         self.verify_ip(rx1[0],
553                        self.pg1.local_mac,
554                        self.pg1.remote_hosts[1].mac,
555                        self.pg0.remote_ip6,
556                        self.pg1.remote_hosts[1].ip6)
557
558         #
559         # remove the duplicate on pg1
560         # packet stream should generate NSs out of pg1
561         #
562         ns_pg1.remove_vpp_config()
563
564         self.send_and_expect_ns(self.pg0, self.pg1,
565                                 p, self.pg1.remote_hosts[1].ip6)
566
567         #
568         # Add it back
569         #
570         ns_pg1.add_vpp_config()
571
572         self.pg0.add_stream(p)
573         self.pg_enable_capture(self.pg_interfaces)
574         self.pg_start()
575
576         rx1 = self.pg1.get_capture(1)
577
578         self.verify_ip(rx1[0],
579                        self.pg1.local_mac,
580                        self.pg1.remote_hosts[1].mac,
581                        self.pg0.remote_ip6,
582                        self.pg1.remote_hosts[1].ip6)
583
584     def validate_ra(self, intf, rx, dst_ip=None, src_ip=None,
585                     mtu=9000, pi_opt=None):
586         if not dst_ip:
587             dst_ip = intf.remote_ip6
588         if not src_ip:
589             src_ip = mk_ll_addr(intf.local_mac)
590
591         # unicasted packets must come to the unicast mac
592         self.assertEqual(rx[Ether].dst, intf.remote_mac)
593
594         # and from the router's MAC
595         self.assertEqual(rx[Ether].src, intf.local_mac)
596
597         # the rx'd RA should be addressed to the sender's source
598         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
599         self.assertEqual(in6_ptop(rx[IPv6].dst),
600                          in6_ptop(dst_ip))
601
602         # and come from the router's link local
603         self.assertTrue(in6_islladdr(rx[IPv6].src))
604         self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(src_ip))
605
606         # it should contain the links MTU
607         ra = rx[ICMPv6ND_RA]
608         self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
609
610         # it should contain the source's link layer address option
611         sll = ra[ICMPv6NDOptSrcLLAddr]
612         self.assertEqual(sll.lladdr, intf.local_mac)
613
614         if not pi_opt:
615             # the RA should not contain prefix information
616             self.assertFalse(ra.haslayer(
617                 ICMPv6NDOptPrefixInfo))
618         else:
619             raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
620
621             # the options are nested in the scapy packet in way that i cannot
622             # decipher how to decode. this 1st layer of option always returns
623             # nested classes, so a direct obj1=obj2 comparison always fails.
624             # however, the getlayer(.., 2) does give one instance.
625             # so we cheat here and construct a new opt instance for comparison
626             rd = ICMPv6NDOptPrefixInfo(
627                 prefixlen=raos.prefixlen,
628                 prefix=raos.prefix,
629                 L=raos.L,
630                 A=raos.A)
631             if type(pi_opt) is list:
632                 for ii in range(len(pi_opt)):
633                     self.assertEqual(pi_opt[ii], rd)
634                     rd = rx.getlayer(
635                         ICMPv6NDOptPrefixInfo, ii + 2)
636             else:
637                 self.assertEqual(pi_opt, raos, 'Expected: %s, received: %s'
638                                  % (pi_opt.show(dump=True),
639                                     raos.show(dump=True)))
640
641     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
642                            filter_out_fn=is_ipv6_misc,
643                            opt=None,
644                            src_ip=None):
645         self.vapi.cli("clear trace")
646         intf.add_stream(pkts)
647         self.pg_enable_capture(self.pg_interfaces)
648         self.pg_start()
649         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
650
651         self.assertEqual(len(rx), 1)
652         rx = rx[0]
653         self.validate_ra(intf, rx, dst_ip, src_ip=src_ip, pi_opt=opt)
654
655     def test_rs(self):
656         """ IPv6 Router Solicitation Exceptions
657
658         Test scenario:
659         """
660
661         #
662         # Before we begin change the IPv6 RA responses to use the unicast
663         # address - that way we will not confuse them with the periodic
664         # RAs which go to the mcast address
665         # Sit and wait for the first periodic RA.
666         #
667         # TODO
668         #
669         self.pg0.ip6_ra_config(send_unicast=1)
670
671         #
672         # An RS from a link source address
673         #  - expect an RA in return
674         #
675         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
676              IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
677              ICMPv6ND_RS())
678         pkts = [p]
679         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
680
681         #
682         # For the next RS sent the RA should be rate limited
683         #
684         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
685
686         #
687         # When we reconfigure the IPv6 RA config,
688         # we reset the RA rate limiting,
689         # so we need to do this before each test below so as not to drop
690         # packets for rate limiting reasons. Test this works here.
691         #
692         self.pg0.ip6_ra_config(send_unicast=1)
693         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
694
695         #
696         # An RS sent from a non-link local source
697         #
698         self.pg0.ip6_ra_config(send_unicast=1)
699         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
700              IPv6(dst=self.pg0.local_ip6,
701                   src="2002::ffff") /
702              ICMPv6ND_RS())
703         pkts = [p]
704         self.send_and_assert_no_replies(self.pg0, pkts,
705                                         "RS from non-link source")
706
707         #
708         # Source an RS from a link local address
709         #
710         self.pg0.ip6_ra_config(send_unicast=1)
711         ll = mk_ll_addr(self.pg0.remote_mac)
712         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
713              IPv6(dst=self.pg0.local_ip6, src=ll) /
714              ICMPv6ND_RS())
715         pkts = [p]
716         self.send_and_expect_ra(self.pg0, pkts,
717                                 "RS sourced from link-local",
718                                 dst_ip=ll)
719
720         #
721         # Send the RS multicast
722         #
723         self.pg0.ip6_ra_config(send_unicast=1)
724         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
725         ll = mk_ll_addr(self.pg0.remote_mac)
726         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
727              IPv6(dst="ff02::2", src=ll) /
728              ICMPv6ND_RS())
729         pkts = [p]
730         self.send_and_expect_ra(self.pg0, pkts,
731                                 "RS sourced from link-local",
732                                 dst_ip=ll)
733
734         #
735         # Source from the unspecified address ::. This happens when the RS
736         # is sent before the host has a configured address/sub-net,
737         # i.e. auto-config. Since the sender has no IP address, the reply
738         # comes back mcast - so the capture needs to not filter this.
739         # If we happen to pick up the periodic RA at this point then so be it,
740         # it's not an error.
741         #
742         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
743         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
744              IPv6(dst="ff02::2", src="::") /
745              ICMPv6ND_RS())
746         pkts = [p]
747         self.send_and_expect_ra(self.pg0, pkts,
748                                 "RS sourced from unspecified",
749                                 dst_ip="ff02::1",
750                                 filter_out_fn=None)
751
752         #
753         # Configure The RA to announce the links prefix
754         #
755         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
756                                self.pg0.local_ip6_prefix_len))
757
758         #
759         # RAs should now contain the prefix information option
760         #
761         opt = ICMPv6NDOptPrefixInfo(
762             prefixlen=self.pg0.local_ip6_prefix_len,
763             prefix=self.pg0.local_ip6,
764             L=1,
765             A=1)
766
767         self.pg0.ip6_ra_config(send_unicast=1)
768         ll = mk_ll_addr(self.pg0.remote_mac)
769         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
770              IPv6(dst=self.pg0.local_ip6, src=ll) /
771              ICMPv6ND_RS())
772         self.send_and_expect_ra(self.pg0, p,
773                                 "RA with prefix-info",
774                                 dst_ip=ll,
775                                 opt=opt)
776
777         #
778         # Change the prefix info to not off-link
779         #  L-flag is clear
780         #
781         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
782                                self.pg0.local_ip6_prefix_len),
783                                off_link=1)
784
785         opt = ICMPv6NDOptPrefixInfo(
786             prefixlen=self.pg0.local_ip6_prefix_len,
787             prefix=self.pg0.local_ip6,
788             L=0,
789             A=1)
790
791         self.pg0.ip6_ra_config(send_unicast=1)
792         self.send_and_expect_ra(self.pg0, p,
793                                 "RA with Prefix info with L-flag=0",
794                                 dst_ip=ll,
795                                 opt=opt)
796
797         #
798         # Change the prefix info to not off-link, no-autoconfig
799         #  L and A flag are clear in the advert
800         #
801         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
802                                self.pg0.local_ip6_prefix_len),
803                                off_link=1,
804                                no_autoconfig=1)
805
806         opt = ICMPv6NDOptPrefixInfo(
807             prefixlen=self.pg0.local_ip6_prefix_len,
808             prefix=self.pg0.local_ip6,
809             L=0,
810             A=0)
811
812         self.pg0.ip6_ra_config(send_unicast=1)
813         self.send_and_expect_ra(self.pg0, p,
814                                 "RA with Prefix info with A & L-flag=0",
815                                 dst_ip=ll,
816                                 opt=opt)
817
818         #
819         # Change the flag settings back to the defaults
820         #  L and A flag are set in the advert
821         #
822         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
823                                self.pg0.local_ip6_prefix_len))
824
825         opt = ICMPv6NDOptPrefixInfo(
826             prefixlen=self.pg0.local_ip6_prefix_len,
827             prefix=self.pg0.local_ip6,
828             L=1,
829             A=1)
830
831         self.pg0.ip6_ra_config(send_unicast=1)
832         self.send_and_expect_ra(self.pg0, p,
833                                 "RA with Prefix info",
834                                 dst_ip=ll,
835                                 opt=opt)
836
837         #
        # Change the prefix info to off-link, no-autoconfig
839         #  L and A flag are clear in the advert
840         #
841         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
842                                self.pg0.local_ip6_prefix_len),
843                                off_link=1,
844                                no_autoconfig=1)
845
846         opt = ICMPv6NDOptPrefixInfo(
847             prefixlen=self.pg0.local_ip6_prefix_len,
848             prefix=self.pg0.local_ip6,
849             L=0,
850             A=0)
851
852         self.pg0.ip6_ra_config(send_unicast=1)
853         self.send_and_expect_ra(self.pg0, p,
854                                 "RA with Prefix info with A & L-flag=0",
855                                 dst_ip=ll,
856                                 opt=opt)
857
858         #
859         # Use the reset to defaults option to revert to defaults
        #  L and A flag are set in the advert
861         #
862         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
863                                self.pg0.local_ip6_prefix_len),
864                                use_default=1)
865
866         opt = ICMPv6NDOptPrefixInfo(
867             prefixlen=self.pg0.local_ip6_prefix_len,
868             prefix=self.pg0.local_ip6,
869             L=1,
870             A=1)
871
872         self.pg0.ip6_ra_config(send_unicast=1)
873         self.send_and_expect_ra(self.pg0, p,
874                                 "RA with Prefix reverted to defaults",
875                                 dst_ip=ll,
876                                 opt=opt)
877
878         #
879         # Advertise Another prefix. With no L-flag/A-flag
880         #
881         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
882                                self.pg1.local_ip6_prefix_len),
883                                off_link=1,
884                                no_autoconfig=1)
885
886         opt = [ICMPv6NDOptPrefixInfo(
887             prefixlen=self.pg0.local_ip6_prefix_len,
888             prefix=self.pg0.local_ip6,
889             L=1,
890             A=1),
891             ICMPv6NDOptPrefixInfo(
892                 prefixlen=self.pg1.local_ip6_prefix_len,
893                 prefix=self.pg1.local_ip6,
894                 L=0,
895                 A=0)]
896
897         self.pg0.ip6_ra_config(send_unicast=1)
898         ll = mk_ll_addr(self.pg0.remote_mac)
899         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
900              IPv6(dst=self.pg0.local_ip6, src=ll) /
901              ICMPv6ND_RS())
902         self.send_and_expect_ra(self.pg0, p,
903                                 "RA with multiple Prefix infos",
904                                 dst_ip=ll,
905                                 opt=opt)
906
907         #
908         # Remove the first prefix-info - expect the second is still in the
909         # advert
910         #
911         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
912                                self.pg0.local_ip6_prefix_len),
913                                is_no=1)
914
915         opt = ICMPv6NDOptPrefixInfo(
916             prefixlen=self.pg1.local_ip6_prefix_len,
917             prefix=self.pg1.local_ip6,
918             L=0,
919             A=0)
920
921         self.pg0.ip6_ra_config(send_unicast=1)
922         self.send_and_expect_ra(self.pg0, p,
923                                 "RA with Prefix reverted to defaults",
924                                 dst_ip=ll,
925                                 opt=opt)
926
927         #
928         # Remove the second prefix-info - expect no prefix-info in the adverts
929         #
930         self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
931                                self.pg1.local_ip6_prefix_len),
932                                is_no=1)
933
934         #
935         # change the link's link local, so we know that works too.
936         #
937         self.vapi.sw_interface_ip6_set_link_local_address(
938             sw_if_index=self.pg0.sw_if_index,
939             ip="fe80::88")
940
941         self.pg0.ip6_ra_config(send_unicast=1)
942         self.send_and_expect_ra(self.pg0, p,
943                                 "RA with Prefix reverted to defaults",
944                                 dst_ip=ll,
945                                 src_ip="fe80::88")
946
947         #
948         # Reset the periodic advertisements back to default values
949         #
950         self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
951
952     def test_mld(self):
953         """ MLD Report """
954         #
955         # test one MLD is sent after applying an IPv6 Address on an interface
956         #
957         self.pg_enable_capture(self.pg_interfaces)
958         self.pg_start()
959
960         subitf = VppDot1QSubint(self, self.pg1, 99)
961
962         subitf.admin_up()
963         subitf.config_ip6()
964
965         rxs = self.pg1._get_capture(timeout=4, filter_out_fn=None)
966
967         #
968         # hunt for the MLD on vlan 99
969         #
970         for rx in rxs:
971             # make sure ipv6 packets with hop by hop options have
972             # correct checksums
973             self.assert_packet_checksums_valid(rx)
974             if rx.haslayer(IPv6ExtHdrHopByHop) and \
975                rx.haslayer(Dot1Q) and \
976                rx[Dot1Q].vlan == 99:
977                 mld = rx[ICMPv6MLReport2]
978
979         self.assertEqual(mld.records_number, 4)
980
981
class TestIPv6RouteLookup(VppTestCase):
    """ IPv6 Route Lookup Test Case """
    # Class-level default kept so the attribute still exists on the class.
    # setUp() rebinds a fresh list on the instance, so routes created by one
    # test are never accumulated into (and re-removed by) a later test.
    routes = []

    def route_lookup(self, prefix, exact):
        """Call the ip_route_lookup API for ``prefix`` in table 0.

        :param prefix: prefix string to look up, e.g. "2001:1111:2222::/48"
        :param exact: value of the API's 'exact' field; the tests below use
            True for exact-match lookups and False for longest-prefix match
        :returns: the API reply (the tests read ``reply.route.prefix``)
        """
        return self.vapi.api(self.vapi.papi.ip_route_lookup,
                             {
                                 'table_id': 0,
                                 'exact': exact,
                                 'prefix': prefix,
                             })

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RouteLookup, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RouteLookup, cls).tearDownClass()

    def setUp(self):
        """Install three nested drop routes (/32, /48 and /128)."""
        super(TestIPv6RouteLookup, self).setUp()

        # per-test list; do not append to the shared class attribute
        self.routes = []

        drop_nh = VppRoutePath("::1", 0xffffffff,
                               type=FibPathType.FIB_PATH_TYPE_DROP)

        # Add 3 routes
        for address, length in (("2001:1111::", 32),
                                ("2001:1111:2222::", 48),
                                ("2001:1111:2222::1", 128)):
            route = VppIpRoute(self, address, length, [drop_nh])
            route.add_vpp_config()
            self.routes.append(route)

    def tearDown(self):
        """Remove the routes installed by setUp()."""
        # Remove the routes we added
        for route in self.routes:
            route.remove_vpp_config()
        self.routes = []

        super(TestIPv6RouteLookup, self).tearDown()

    def test_exact_match(self):
        """ IPv6 route lookup, exact match """
        # Verify we find the host route
        prefix = "2001:1111:2222::1/128"
        result = self.route_lookup(prefix, True)
        self.assertEqual(prefix, str(result.route.prefix))

        # Verify we find a middle prefix route
        prefix = "2001:1111:2222::/48"
        result = self.route_lookup(prefix, True)
        self.assertEqual(prefix, str(result.route.prefix))

        # Verify we do not find an available LPM.
        with self.vapi.assert_negative_api_retval():
            self.route_lookup("2001::2/128", True)

    def test_longest_prefix_match(self):
        """ IPv6 route lookup, longest prefix match """
        # verify we find lpm
        lpm_prefix = "2001:1111:2222::/48"
        result = self.route_lookup("2001:1111:2222::2/128", False)
        self.assertEqual(lpm_prefix, str(result.route.prefix))

        # Verify we find the exact when not requested
        result = self.route_lookup(lpm_prefix, False)
        self.assertEqual(lpm_prefix, str(result.route.prefix))

        # Can't seem to delete the default route so no negative LPM test.
1054
1055
class TestIPv6IfAddrRoute(VppTestCase):
    """ IPv6 Interface Addr Route Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6IfAddrRoute, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6IfAddrRoute, cls).tearDownClass()

    def setUp(self):
        super(TestIPv6IfAddrRoute, self).setUp()

        # this test case only needs a single pg interface
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        # base-class teardown runs first, then the interfaces are unwound
        super(TestIPv6IfAddrRoute, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ipv6_ifaddrs_same_prefix(self):
        """ IPv6 Interface Addresses Same Prefix test

        Test scenario:

            - Verify neither 2001:10::10 nor 2001:10::20 has a /128 route
            - Configure IPv6 address 2001:10::10/64 on the interface
            - Verify only 2001:10::10 has a /128 route
            - Configure IPv6 address 2001:10::20/64 on the interface
            - Delete 2001:10::10/64 from the interface
            - Verify only 2001:10::20 has a /128 route
            - Delete 2001:10::20/64 from the interface
            - Verify neither address has a /128 route
        """

        first_ip, second_ip = "2001:10::10", "2001:10::20"

        def host_route(address):
            # look up the /128 entry for one interface address
            return find_route(self, address, 128)

        first = VppIpInterfaceAddress(self, self.pg0, first_ip, 64)
        second = VppIpInterfaceAddress(self, self.pg0, second_ip, 64)
        self.assertFalse(first.query_vpp_config())
        self.assertFalse(host_route(first_ip))
        self.assertFalse(host_route(second_ip))

        # first address goes in: only its host route may show up
        first.add_vpp_config()
        self.assertTrue(first.query_vpp_config())
        self.assertTrue(host_route(first_ip))
        self.assertFalse(host_route(second_ip))

        # second address goes in and the first is withdrawn: the second
        # address and its host route must survive
        second.add_vpp_config()
        first.remove_vpp_config()
        self.assertFalse(first.query_vpp_config())
        self.assertTrue(second.query_vpp_config())
        self.assertFalse(host_route(first_ip))
        self.assertTrue(host_route(second_ip))

        # second address withdrawn too: no host route is left
        second.remove_vpp_config()
        self.assertFalse(first.query_vpp_config())
        self.assertFalse(host_route(first_ip))
        self.assertFalse(host_route(second_ip))
1127
1128
class TestICMPv6Echo(VppTestCase):
    """ ICMPv6 Echo Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestICMPv6Echo, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestICMPv6Echo, cls).tearDownClass()

    def setUp(self):
        super(TestICMPv6Echo, self).setUp()

        # this test case only needs a single pg interface
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        # base-class teardown runs first, then the interfaces are unwound
        super(TestICMPv6Echo, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_icmpv6_echo(self):
        """ VPP replies to ICMPv6 Echo Request

        Test scenario:

            - Send an ICMPv6 Echo Request to VPP's address on pg0.
            - Check the ICMPv6 Echo Reply captured on pg0: swapped MAC and
              IPv6 addresses, same id, sequence number and payload.
        """

        echo_id = 0xb
        echo_seq = 5
        echo_payload = b'\x0a' * 18

        # build the request layer by layer
        l2 = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        l3 = IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
        echo = ICMPv6EchoRequest(id=echo_id, seq=echo_seq,
                                 data=echo_payload)
        request = l2 / l3 / echo

        self.pg0.add_stream(request)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # exactly one packet is expected back
        reply = self.pg0.get_capture(1)[0]
        reply_l2 = reply[Ether]
        reply_l3 = reply[IPv6]
        reply_echo = reply[ICMPv6EchoReply]

        self.assertEqual(reply_l2.src, self.pg0.local_mac)
        self.assertEqual(reply_l2.dst, self.pg0.remote_mac)

        self.assertEqual(reply_l3.src, self.pg0.local_ip6)
        self.assertEqual(reply_l3.dst, self.pg0.remote_ip6)

        self.assertEqual(icmp6types[reply_echo.type], "Echo Reply")
        self.assertEqual(reply_echo.id, echo_id)
        self.assertEqual(reply_echo.seq, echo_seq)
        self.assertEqual(reply_echo.data, echo_payload)
1199
1200
class TestIPv6RD(TestIPv6ND):
    """ IPv6 Router Discovery Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RD, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RD, cls).tearDownClass()

    def setUp(self):
        super(TestIPv6RD, self).setUp()

        # two pg interfaces are used by this test case
        self.create_pg_interfaces(range(2))

        self.interfaces = list(self.pg_interfaces)

        # bring every interface up with an IPv6 address
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

    def tearDown(self):
        # interfaces are unwound before the base-class teardown runs
        for intf in self.interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
        super(TestIPv6RD, self).tearDown()

    def test_rd_send_router_solicitation(self):
        """ Verify router solicitation packets """

        n_solicits = 2
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index,
                                                 mrc=n_solicits)
        captured = self.pg1.get_capture(n_solicits, timeout=3)
        self.assertEqual(len(captured), n_solicits)
        for rs in captured:
            self.assertEqual(rs.haslayer(IPv6), 1)
            self.assertEqual(rs[IPv6].haslayer(ICMPv6ND_RS), 1)

            # sent to ff02::2 ...
            self.assert_equal(ip6_normalize(rs[IPv6].dst),
                              ip6_normalize("ff02::2"))
            # ... from pg1's link-local address
            self.assert_equal(ip6_normalize(rs[IPv6].src),
                              ip6_normalize(self.pg1.local_ip6_ll))

            # and carrying pg1's MAC in a source link-layer address option
            has_sllao = rs[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr)
            self.assertTrue(bool(has_sllao))
            self.assert_equal(rs[ICMPv6NDOptSrcLLAddr].lladdr,
                              self.pg1.local_mac)

    def verify_prefix_info(self, reported_prefix, prefix_option):
        """Compare a prefix reported in an RA event with the option sent.

        :param reported_prefix: one entry of the event's ``prefixes``
        :param prefix_option: the ICMPv6NDOptPrefixInfo that was sent
        """
        address_text = prefix_option.getfieldval("prefix")
        length_text = text_type(prefix_option.getfieldval("prefixlen"))
        expected = IPv6Network(text_type(address_text + "/" + length_text),
                               strict=False)
        self.assert_equal(reported_prefix.prefix.network_address,
                          expected.network_address)

        # the event's flags value carries L in bit 7 and A in bit 6
        on_link = prefix_option.getfieldval("L")
        autonomous = prefix_option.getfieldval("A")
        self.assert_equal(reported_prefix.flags,
                          (on_link << 7) | (autonomous << 6))

        self.assert_equal(reported_prefix.valid_time,
                          prefix_option.getfieldval("validlifetime"))
        self.assert_equal(reported_prefix.preferred_time,
                          prefix_option.getfieldval("preferredlifetime"))

    def test_rd_receive_router_advertisement(self):
        """ Verify events triggered by received RA packets """

        self.vapi.want_ip6_ra_events(enable=1)

        first_option = ICMPv6NDOptPrefixInfo(
            prefix="1::2",
            prefixlen=50,
            validlifetime=200,
            preferredlifetime=500,
            L=1,
            A=1,
        )

        second_option = ICMPv6NDOptPrefixInfo(
            prefix="7::4",
            prefixlen=20,
            validlifetime=70,
            preferredlifetime=1000,
            L=1,
            A=0,
        )

        # RA from the peer's link-local address to pg1's link-local address
        l2 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        l3 = IPv6(dst=self.pg1.local_ip6_ll,
                  src=mk_ll_addr(self.pg1.remote_mac))
        advert = l2 / l3 / ICMPv6ND_RA() / first_option / second_option
        self.pg1.add_stream([advert])
        self.pg_start()

        event = self.vapi.wait_for_event(10, "ip6_ra_event")

        self.assert_equal(event.current_hop_limit, 0)
        self.assert_equal(event.flags, 8)
        self.assert_equal(event.router_lifetime_in_sec, 1800)
        self.assert_equal(event.neighbor_reachable_time_in_msec, 0)
        self.assert_equal(
            event.time_in_msec_between_retransmitted_neighbor_solicitations,
            0)

        self.assert_equal(event.n_prefixes, 2)

        # prefixes are reported in the order the options were sent
        self.verify_prefix_info(event.prefixes[0], first_option)
        self.verify_prefix_info(event.prefixes[1], second_option)
1320
1321
class TestIPv6RDControlPlane(TestIPv6ND):
    """ IPv6 Router Discovery Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RDControlPlane, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPv6RDControlPlane, cls).tearDownClass()

    def setUp(self):
        super(TestIPv6RDControlPlane, self).setUp()

        # create 1 pg interface
        self.create_pg_interfaces(range(1))

        self.interfaces = list(self.pg_interfaces)

        # setup all interfaces
        for i in self.interfaces:
            i.admin_up()
            i.config_ip6()

    def tearDown(self):
        super(TestIPv6RDControlPlane, self).tearDown()

    @staticmethod
    def create_ra_packet(pg, routerlifetime=None):
        """Build a Router Advertisement addressed to ``pg``.

        :param pg: interface whose local/remote addresses fill the headers
        :param routerlifetime: value for the RA's router lifetime field;
            when None the scapy default for ICMPv6ND_RA is used
        :returns: Ether / IPv6 / ICMPv6ND_RA packet
        """
        src_ip = pg.remote_ip6_ll
        dst_ip = pg.local_ip6
        if routerlifetime is not None:
            ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
        else:
            ra = ICMPv6ND_RA()
        p = (Ether(dst=pg.local_mac, src=pg.remote_mac) /
             IPv6(dst=dst_ip, src=src_ip) / ra)
        return p

    @staticmethod
    def get_default_routes(fib):
        """Collect the interface-bound paths of all /0 entries in ``fib``.

        :param fib: iterable of route-dump entries
        :returns: list of dicts with keys 'sw_if_index' and 'next_hop', one
            per path whose sw_if_index is not 0xFFFFFFFF
        """
        default_routes = []
        for entry in fib:
            if entry.route.prefix.prefixlen != 0:
                continue
            for path in entry.route.paths:
                # paths with sw_if_index 0xFFFFFFFF are skipped
                if path.sw_if_index != 0xFFFFFFFF:
                    default_routes.append({
                        'sw_if_index': path.sw_if_index,
                        'next_hop': path.nh.address.ip6,
                    })
        return default_routes

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Collect the /128 addresses in ``fib`` whose first path is ``pg``.

        :param fib: iterable of route-dump entries
        :param pg: interface whose sw_if_index is matched against the first
            path of each /128 entry
        :returns: list of address strings
        """
        # the prefix-length test comes first so paths[0] is only read for
        # /128 entries, as before
        return [str(entry.route.prefix.network_address)
                for entry in fib
                if entry.route.prefix.prefixlen == 128 and
                entry.route.paths[0].sw_if_index == pg.sw_if_index]

    def wait_for_no_default_route(self, n_tries=50, s_time=1):
        """Poll the FIB until it holds no default route.

        :param n_tries: maximum number of FIB dumps to attempt
        :param s_time: time to sleep after each unsuccessful attempt
        :returns: True once no default route is found, False if one is
            still present after ``n_tries`` attempts
        """
        # a bounded for-loop also terminates for a negative n_tries, which
        # the former countdown loop did not
        for _ in range(n_tries):
            # NOTE(review): arguments are presumably (table_id, is_ip6)
            fib = self.vapi.ip_route_dump(0, True)
            if not self.get_default_routes(fib):
                return True
            self.sleep(s_time)

        return False

    def _new_addresses(self, fib, initial_addresses):
        """Return pg0 addresses in ``fib`` absent from ``initial_addresses``."""
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        return addresses.difference(initial_addresses)

    def _assert_single_slaac_address(self, new_addresses):
        """Assert exactly one new address exists and it lies in 1::/20."""
        self.assertEqual(len(new_addresses), 1)
        prefix = IPv6Network(text_type("%s/%d" % (list(new_addresses)[0], 20)),
                             strict=False)
        self.assertEqual(prefix, IPv6Network(text_type('1::/20')))

    def _assert_default_route_via(self, fib, router_address):
        """Assert ``fib`` has exactly one default route, via the router."""
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 1)
        dr = default_routes[0]
        self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
        self.assertEqual(dr['next_hop'], router_address)

    def test_all(self):
        """ Test handling of SLAAC addresses and default routes """

        fib = self.vapi.ip_route_dump(0, True)
        default_routes = self.get_default_routes(fib)
        initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
        self.assertEqual(default_routes, [])
        router_address = IPv6Address(text_type(self.pg0.remote_ip6_ll))

        self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)

        self.sleep(0.1)

        # send RA
        packet = (self.create_ra_packet(
            self.pg0) / ICMPv6NDOptPrefixInfo(
            prefix="1::",
            prefixlen=64,
            validlifetime=2,
            preferredlifetime=2,
            L=1,
            A=1,
        ) / ICMPv6NDOptPrefixInfo(
            prefix="7::",
            prefixlen=20,
            validlifetime=1500,
            preferredlifetime=1000,
            L=1,
            A=0,
        ))
        self.pg0.add_stream([packet])
        self.pg_start()

        self.sleep_on_vpp_time(0.1)

        fib = self.vapi.ip_route_dump(0, True)

        # check FIB for new address
        self._assert_single_slaac_address(
            self._new_addresses(fib, initial_addresses))

        # check FIB for new default route
        self._assert_default_route_via(fib, router_address)

        # send RA to delete default route
        packet = self.create_ra_packet(self.pg0, routerlifetime=0)
        self.pg0.add_stream([packet])
        self.pg_start()

        self.sleep_on_vpp_time(0.1)

        # check that default route is deleted
        fib = self.vapi.ip_route_dump(0, True)
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 0)

        self.sleep_on_vpp_time(0.1)

        # send RA
        packet = self.create_ra_packet(self.pg0)
        self.pg0.add_stream([packet])
        self.pg_start()

        self.sleep_on_vpp_time(0.1)

        # check FIB for new default route
        fib = self.vapi.ip_route_dump(0, True)
        self._assert_default_route_via(fib, router_address)

        # send RA, updating router lifetime to 1s
        packet = self.create_ra_packet(self.pg0, 1)
        self.pg0.add_stream([packet])
        self.pg_start()

        self.sleep_on_vpp_time(0.1)

        # check that default route still exists
        fib = self.vapi.ip_route_dump(0, True)
        self._assert_default_route_via(fib, router_address)

        self.sleep_on_vpp_time(1)

        # check that default route is deleted
        self.assertTrue(self.wait_for_no_default_route())

        # check FIB still contains the SLAAC address
        # NOTE(review): this deliberately keeps using the dump taken before
        # the default route expired, as the test always has; a fresh dump
        # here could race with the 2s prefix lifetime - confirm intent.
        self._assert_single_slaac_address(
            self._new_addresses(fib, initial_addresses))

        self.sleep_on_vpp_time(1)

        # check that SLAAC address is deleted
        fib = self.vapi.ip_route_dump(0, True)
        new_addresses = self._new_addresses(fib, initial_addresses)
        self.assertEqual(len(new_addresses), 0)
1512
1513
1514 class IPv6NDProxyTest(TestIPv6ND):
1515     """ IPv6 ND ProxyTest Case """
1516
    @classmethod
    def setUpClass(cls):
        """Run the inherited class-level set-up; nothing is added here."""
        super(IPv6NDProxyTest, cls).setUpClass()
1520
    @classmethod
    def tearDownClass(cls):
        """Run the inherited class-level tear-down; nothing is added here."""
        super(IPv6NDProxyTest, cls).tearDownClass()
1524
1525     def setUp(self):
1526         super(IPv6NDProxyTest, self).setUp()
1527
1528         # create 3 pg interfaces
1529         self.create_pg_interfaces(range(3))
1530
1531         # pg0 is the master interface, with the configured subnet
1532         self.pg0.admin_up()
1533         self.pg0.config_ip6()
1534         self.pg0.resolve_ndp()
1535
1536         self.pg1.ip6_enable()
1537         self.pg2.ip6_enable()
1538
    def tearDown(self):
        """Delegate to the inherited tearDown; no extra clean-up is done."""
        super(IPv6NDProxyTest, self).tearDown()
1541
1542     def test_nd_proxy(self):
1543         """ IPv6 Proxy ND """
1544
1545         #
1546         # Generate some hosts in the subnet that we are proxying
1547         #
1548         self.pg0.generate_remote_hosts(8)
1549
1550         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
1551         d = inet_ntop(AF_INET6, nsma)
1552
1553         #
1554         # Send an NS for one of those remote hosts on one of the proxy links
1555         # expect no response since it's from an address that is not
1556         # on the link that has the prefix configured
1557         #
1558         ns_pg1 = (Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac) /
1559                   IPv6(dst=d,
1560                        src=self.pg0._remote_hosts[2].ip6) /
1561                   ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
1562                   ICMPv6NDOptSrcLLAddr(
1563                       lladdr=self.pg0._remote_hosts[2].mac))
1564
1565         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")
1566
1567         #
1568         # Add proxy support for the host
1569         #
1570         self.vapi.ip6nd_proxy_add_del(
1571             is_add=1, ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1572             sw_if_index=self.pg1.sw_if_index)
1573
1574         #
1575         # try that NS again. this time we expect an NA back
1576         #
1577         self.send_and_expect_na(self.pg1, ns_pg1,
1578                                 "NS to proxy entry",
1579                                 dst_ip=self.pg0._remote_hosts[2].ip6,
1580                                 tgt_ip=self.pg0.local_ip6)
1581
1582         #
1583         # ... and that we have an entry in the ND cache
1584         #
1585         self.assertTrue(find_nbr(self,
1586                                  self.pg1.sw_if_index,
1587                                  self.pg0._remote_hosts[2].ip6))
1588
1589         #
1590         # ... and we can route traffic to it
1591         #
1592         t = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1593              IPv6(dst=self.pg0._remote_hosts[2].ip6,
1594                   src=self.pg0.remote_ip6) /
1595              inet6.UDP(sport=10000, dport=20000) /
1596              Raw(b'\xa5' * 100))
1597
1598         self.pg0.add_stream(t)
1599         self.pg_enable_capture(self.pg_interfaces)
1600         self.pg_start()
1601         rx = self.pg1.get_capture(1)
1602         rx = rx[0]
1603
1604         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1605         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1606
1607         self.assertEqual(rx[IPv6].src,
1608                          t[IPv6].src)
1609         self.assertEqual(rx[IPv6].dst,
1610                          t[IPv6].dst)
1611
1612         #
1613         # Test we proxy for the host on the main interface
1614         #
1615         ns_pg0 = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
1616                   IPv6(dst=d, src=self.pg0.remote_ip6) /
1617                   ICMPv6ND_NS(
1618                       tgt=self.pg0._remote_hosts[2].ip6) /
1619                   ICMPv6NDOptSrcLLAddr(
1620                       lladdr=self.pg0.remote_mac))
1621
1622         self.send_and_expect_na(self.pg0, ns_pg0,
1623                                 "NS to proxy entry on main",
1624                                 tgt_ip=self.pg0._remote_hosts[2].ip6,
1625                                 dst_ip=self.pg0.remote_ip6)
1626
1627         #
1628         # Setup and resolve proxy for another host on another interface
1629         #
1630         ns_pg2 = (Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac) /
1631                   IPv6(dst=d,
1632                        src=self.pg0._remote_hosts[3].ip6) /
1633                   ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
1634                   ICMPv6NDOptSrcLLAddr(
1635                       lladdr=self.pg0._remote_hosts[2].mac))
1636
1637         self.vapi.ip6nd_proxy_add_del(
1638             is_add=1, ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1639             sw_if_index=self.pg2.sw_if_index)
1640
1641         self.send_and_expect_na(self.pg2, ns_pg2,
1642                                 "NS to proxy entry other interface",
1643                                 dst_ip=self.pg0._remote_hosts[3].ip6,
1644                                 tgt_ip=self.pg0.local_ip6)
1645
1646         self.assertTrue(find_nbr(self,
1647                                  self.pg2.sw_if_index,
1648                                  self.pg0._remote_hosts[3].ip6))
1649
1650         #
1651         # hosts can communicate. pg2->pg1
1652         #
1653         t2 = (Ether(dst=self.pg2.local_mac,
1654                     src=self.pg0.remote_hosts[3].mac) /
1655               IPv6(dst=self.pg0._remote_hosts[2].ip6,
1656                    src=self.pg0._remote_hosts[3].ip6) /
1657               inet6.UDP(sport=10000, dport=20000) /
1658               Raw(b'\xa5' * 100))
1659
1660         self.pg2.add_stream(t2)
1661         self.pg_enable_capture(self.pg_interfaces)
1662         self.pg_start()
1663         rx = self.pg1.get_capture(1)
1664         rx = rx[0]
1665
1666         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1667         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1668
1669         self.assertEqual(rx[IPv6].src,
1670                          t2[IPv6].src)
1671         self.assertEqual(rx[IPv6].dst,
1672                          t2[IPv6].dst)
1673
1674         #
1675         # remove the proxy configs
1676         #
1677         self.vapi.ip6nd_proxy_add_del(
1678             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1679             sw_if_index=self.pg1.sw_if_index, is_add=0)
1680         self.vapi.ip6nd_proxy_add_del(
1681             ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1682             sw_if_index=self.pg2.sw_if_index, is_add=0)
1683
1684         self.assertFalse(find_nbr(self,
1685                                   self.pg2.sw_if_index,
1686                                   self.pg0._remote_hosts[3].ip6))
1687         self.assertFalse(find_nbr(self,
1688                                   self.pg1.sw_if_index,
1689                                   self.pg0._remote_hosts[2].ip6))
1690
1691         #
1692         # no longer proxy-ing...
1693         #
1694         self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
1695         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
1696         self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")
1697
1698         #
1699         # no longer forwarding. traffic generates NS out of the glean/main
1700         # interface
1701         #
1702         self.pg2.add_stream(t2)
1703         self.pg_enable_capture(self.pg_interfaces)
1704         self.pg_start()
1705
1706         rx = self.pg0.get_capture(1)
1707
1708         self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1709
1710
class TestIPNull(VppTestCase):
    """ IPv6 routes via NULL """

    @classmethod
    def setUpClass(cls):
        super(TestIPNull, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPNull, cls).tearDownClass()

    def setUp(self):
        """Bring up one pg interface with IPv6 configured and resolved."""
        super(TestIPNull, self).setUp()

        # a single interface is enough: the test injects on pg0 and
        # captures the ICMP error on pg0 as well
        self.create_pg_interfaces(range(1))

        for itf in self.pg_interfaces:
            itf.admin_up()
            itf.config_ip6()
            itf.resolve_ndp()

    def tearDown(self):
        """Undo the interface configuration applied in setUp."""
        super(TestIPNull, self).tearDown()
        for itf in self.pg_interfaces:
            itf.unconfig_ip6()
            itf.admin_down()

    def _send_and_get_dest_unreach(self, pkt):
        """Inject pkt on pg0 and return the ICMPv6DestUnreach layer of the
        one packet captured on pg0."""
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg0.get_capture(1)
        return captured[0][ICMPv6DestUnreach]

    def test_ip_null(self):
        """ IP NULL route """

        pkt = (Ether(src=self.pg0.remote_mac,
                     dst=self.pg0.local_mac) /
               IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
               inet6.UDP(sport=1234, dport=1234) /
               Raw(b'\xa5' * 100))

        #
        # A /64 route via IP NULL that will reply with ICMP unreachables
        #
        unreach_path = VppRoutePath(
            "::", 0xffffffff,
            type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH)
        unreach_route = VppIpRoute(self, "2001::", 64, [unreach_path])
        unreach_route.add_vpp_config()

        # 0 = "No route to destination"
        icmp = self._send_and_get_dest_unreach(pkt)
        self.assertEqual(icmp.code, 0)

        # ICMP is rate limited. pause a bit
        self.sleep(1)

        #
        # A /128 route for the packet's destination, via IP NULL, that
        # will reply with ICMP prohibited
        #
        prohibit_path = VppRoutePath(
            "::", 0xffffffff,
            type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT)
        prohibit_route = VppIpRoute(self, "2001::1", 128, [prohibit_path])
        prohibit_route.add_vpp_config()

        # 1 = "Communication with destination administratively prohibited"
        icmp = self._send_and_get_dest_unreach(pkt)
        self.assertEqual(icmp.code, 1)
1790
1791
class TestIPDisabled(VppTestCase):
    """ IPv6 disabled """

    @classmethod
    def setUpClass(cls):
        super(TestIPDisabled, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPDisabled, cls).tearDownClass()

    def setUp(self):
        """Create two pg interfaces; only pg0 is given an IPv6 config."""
        super(TestIPDisabled, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        # PG0 is IP enabled
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # PG 1 is not IP enabled
        self.pg1.admin_up()

    def tearDown(self):
        """Remove the IPv6 configuration and bring the interfaces down."""
        super(TestIPDisabled, self).tearDown()
        for i in self.pg_interfaces:
            # only IPv6 is ever configured in this test case (pg0 in setUp,
            # pg1 during the test), so IPv6 is what has to be removed
            i.unconfig_ip6()
            i.admin_down()

    def test_ip_disabled(self):
        """ IP Disabled """

        #
        # A multicast route for group ffef::1 with an unspecified ("::")
        # source: pg1 is the accepting interface, pg0 the forwarding one
        #
        route_ff_01 = VppIpMRoute(
            self,
            "::",
            "ffef::1", 128,
            MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
            [VppMRoutePath(self.pg1.sw_if_index,
                           MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT),
             VppMRoutePath(self.pg0.sw_if_index,
                           MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)])
        route_ff_01.add_vpp_config()

        # unicast (pu) and multicast (pm) packets, both injected on pg1
        pu = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
              inet6.UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))
        pm = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst="ffef::1") /
              inet6.UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")

        #
        # IP enable PG1
        #
        self.pg1.config_ip6()

        #
        # Now we get packets through; get_capture(1) itself checks that
        # exactly one packet arrived on pg0
        #
        self.pg1.add_stream(pu)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        self.pg1.add_stream(pm)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        #
        # Disable PG1
        #
        self.pg1.unconfig_ip6()

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1886
1887
class TestIP6LoadBalance(VppTestCase):
    """ IPv6 Load-Balancing """

    @classmethod
    def setUpClass(cls):
        super(TestIP6LoadBalance, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6LoadBalance, cls).tearDownClass()

    def setUp(self):
        """Create five pg interfaces enabled for both IPv6 and MPLS."""
        super(TestIP6LoadBalance, self).setUp()

        self.create_pg_interfaces(range(5))

        # MPLS table 0 is added before MPLS is enabled on the interfaces
        mpls_table = VppMplsTable(self, 0)
        mpls_table.add_vpp_config()

        for itf in self.pg_interfaces:
            itf.admin_up()
            itf.config_ip6()
            itf.resolve_ndp()
            itf.enable_mpls()

    def tearDown(self):
        """Unconfigure the interfaces, then run the base class tearDown."""
        for itf in self.pg_interfaces:
            itf.unconfig_ip6()
            itf.admin_down()
            itf.disable_mpls()
        super(TestIP6LoadBalance, self).tearDown()

    def pg_send(self, input, pkts):
        """Clear the trace, then inject pkts on input with capture enabled
        on every pg interface."""
        self.vapi.cli("clear trace")
        input.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_and_expect_load_balancing(self, input, pkts, outputs):
        """Send pkts on input and check that each interface in outputs
        captured at least one packet."""
        self.pg_send(input, pkts)
        for out_itf in outputs:
            captured = out_itf._get_capture(1)
            self.assertNotEqual(0, len(captured))

    def send_and_expect_one_itf(self, input, pkts, itf):
        """Send pkts on input and expect all of them to be captured on
        itf."""
        self.pg_send(input, pkts)
        itf.get_capture(len(pkts))

    def test_ip6_load_balance(self):
        """ IPv6 Load-Balancing """

        def from_pg0(l3, *labels):
            # frame l3 as sent by pg0's peer, inserting one MPLS header
            # per label (outermost first) between the L2 and L3 headers
            frame = Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac)
            for label in labels:
                frame = frame / MPLS(label=label, ttl=2)
            return frame / l3

        two_paths = [self.pg1, self.pg2]
        four_paths = [self.pg1, self.pg2, self.pg3, self.pg4]

        #
        # Streams whose packets differ only in the destination port
        #  - IP only
        #  - MPLS EOS
        #  - MPLS non-EOS
        #  - MPLS non-EOS with an entropy label
        #
        port_ip_pkts = []
        port_mpls_pkts = []
        port_mpls_neos_pkts = []
        port_ent_pkts = []

        #
        # Streams whose packets differ only in the source address
        #
        src_ip_pkts = []
        src_mpls_pkts = []

        for ii in range(NUM_PKTS):
            by_port = (IPv6(dst="3000::1", src="3000:1::1") /
                       inet6.UDP(sport=1234, dport=1234 + ii) /
                       Raw(b'\xa5' * 100))
            port_ip_pkts.append(from_pg0(by_port))
            port_mpls_pkts.append(from_pg0(by_port, 66))
            port_mpls_neos_pkts.append(from_pg0(by_port, 67, 77))
            port_ent_pkts.append(from_pg0(by_port, 67, 14, 999))

            by_src = (IPv6(dst="3000::1", src="3000:1::%d" % ii) /
                      inet6.UDP(sport=1234, dport=1234) /
                      Raw(b'\xa5' * 100))
            src_ip_pkts.append(from_pg0(by_src))
            src_mpls_pkts.append(from_pg0(by_src, 66))

        #
        # A route for the IP packets
        #
        route_3000_1 = VppIpRoute(
            self, "3000::1", 128,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
             VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index)])
        route_3000_1.add_vpp_config()

        #
        # a local-label for the EOS packets
        #
        binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
        binding.add_vpp_config()

        #
        # An MPLS route for the non-EOS packets
        #
        route_67 = VppMplsRoute(
            self, 67, 0,
            [VppRoutePath(self.pg1.remote_ip6,
                          self.pg1.sw_if_index,
                          labels=[67]),
             VppRoutePath(self.pg2.remote_ip6,
                          self.pg2.sw_if_index,
                          labels=[67])])
        route_67.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across the 2 paths
        #  - since the default hash config is to use IP src,dst and port
        #    src,dst
        # We are not going to ensure equal amounts of packets across each link,
        # since the hash algorithm is statistical and therefore this can never
        # be guaranteed. But with NUM_PKTS different packets we do expect some
        # balancing. So instead just ensure there is traffic on each link.
        #
        for stream in (port_ip_pkts, src_ip_pkts,
                       port_mpls_pkts, src_mpls_pkts,
                       port_mpls_neos_pkts):
            self.send_and_expect_load_balancing(self.pg0, stream, two_paths)

        #
        # The packets with Entropy label in should not load-balance,
        # since the Entropy value is fixed.
        #
        self.send_and_expect_one_itf(self.pg0, port_ent_pkts, self.pg1)

        #
        # change the flow hash config so it's only IP src,dst
        #  - now only the stream with differing source address will
        #    load-balance
        #
        self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=0, dport=0,
                                   is_ipv6=1)

        for stream in (src_ip_pkts, src_mpls_pkts):
            self.send_and_expect_load_balancing(self.pg0, stream, two_paths)
        self.send_and_expect_one_itf(self.pg0, port_ip_pkts, self.pg2)

        #
        # change the flow hash config back to defaults
        #
        self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=1, dport=1,
                                   is_ipv6=1)

        #
        # Recursive prefixes
        #  - testing that 2 stages of load-balancing occurs and there is no
        #    polarisation (i.e. only 2 of 4 paths are used)
        #
        port_pkts = [
            from_pg0(IPv6(dst="4000::1", src="4000:1::1") /
                     inet6.UDP(sport=1234, dport=1234 + ii) /
                     Raw(b'\xa5' * 100))
            for ii in range(257)]
        src_pkts = [
            from_pg0(IPv6(dst="4000::1", src="4000:1::%d" % ii) /
                     inet6.UDP(sport=1234, dport=1234) /
                     Raw(b'\xa5' * 100))
            for ii in range(257)]

        route_3000_2 = VppIpRoute(
            self, "3000::2", 128,
            [VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index),
             VppRoutePath(self.pg4.remote_ip6, self.pg4.sw_if_index)])
        route_3000_2.add_vpp_config()

        route_4000_1 = VppIpRoute(
            self, "4000::1", 128,
            [VppRoutePath("3000::1", 0xffffffff),
             VppRoutePath("3000::2", 0xffffffff)])
        route_4000_1.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across all 4 paths
        #
        self.vapi.cli("clear trace")
        for stream in (port_pkts, src_pkts):
            self.send_and_expect_load_balancing(self.pg0, stream, four_paths)

        #
        # Recursive prefixes
        #  - two stages of resolution, but only one path at each stage, so
        #    there is no load-balancing choice to make
        #
        port_pkts = [
            from_pg0(IPv6(dst="6000::1", src="6000:1::1") /
                     inet6.UDP(sport=1234, dport=1234 + ii) /
                     Raw(b'\xa5' * 100))
            for ii in range(257)]

        route_5000_2 = VppIpRoute(
            self, "5000::2", 128,
            [VppRoutePath(self.pg3.remote_ip6, self.pg3.sw_if_index)])
        route_5000_2.add_vpp_config()

        route_6000_1 = VppIpRoute(
            self, "6000::1", 128,
            [VppRoutePath("5000::2", 0xffffffff)])
        route_6000_1.add_vpp_config()

        #
        # inject the packet on pg0 - expect every packet on the only
        # path, via pg3
        #
        self.vapi.cli("clear trace")
        self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg3)
2144
2145
class TestIP6Punt(VppTestCase):
    """ IPv6 Punt Police/Redirect """

    @classmethod
    def setUpClass(cls):
        super(TestIP6Punt, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Punt, cls).tearDownClass()

    def setUp(self):
        """Create four pg interfaces, each up with IPv6 configured."""
        super(TestIP6Punt, self).setUp()

        self.create_pg_interfaces(range(4))

        for itf in self.pg_interfaces:
            itf.admin_up()
            itf.config_ip6()
            itf.resolve_ndp()

    def tearDown(self):
        """Undo the interface configuration applied in setUp."""
        super(TestIP6Punt, self).tearDown()
        for itf in self.pg_interfaces:
            itf.unconfig_ip6()
            itf.admin_down()

    def test_ip_punt(self):
        """ IP6 punt police and redirect """

        # TCP addressed to pg0's own address
        pkt = (Ether(src=self.pg0.remote_mac,
                     dst=self.pg0.local_mac) /
               IPv6(src=self.pg0.remote_ip6,
                    dst=self.pg0.local_ip6) /
               inet6.TCP(sport=1234, dport=1234) /
               Raw(b'\xa5' * 100))
        pkts = pkt * 1025

        # tx interface and next-hop shared by every redirect below
        nh_addr = self.pg1.remote_ip6
        via_pg1 = (self.pg1.sw_if_index, nh_addr)

        #
        # Configure a punt redirect via pg1.
        #
        self.vapi.ip_punt_redirect(self.pg0.sw_if_index, *via_pg1)

        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # add a policer
        #
        policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
        policer.add_vpp_config()
        self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)

        self.vapi.cli("clear trace")
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        #
        # the number of packet received should be greater than 0,
        # but not equal to the number sent, since some were policed
        #
        policed = self.pg1._get_capture(1)
        self.assertGreater(len(policed), 0)
        self.assertLess(len(policed), len(pkts))

        #
        # remove the policer. back to full rx
        #
        self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
        policer.remove_vpp_config()
        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # remove the redirect. expect full drop.
        #
        self.vapi.ip_punt_redirect(self.pg0.sw_if_index, *via_pg1,
                                   is_add=0)
        self.send_and_assert_no_replies(self.pg0, pkts,
                                        "IP no punt config")

        #
        # Add a redirect that is not input port selective
        #
        self.vapi.ip_punt_redirect(0xffffffff, *via_pg1)
        self.send_and_expect(self.pg0, pkts, self.pg1)

        self.vapi.ip_punt_redirect(0xffffffff, *via_pg1, is_add=0)

    def test_ip_punt_dump(self):
        """ IP6 punt redirect dump"""

        #
        # Configure punt redirects, all transmitting via pg3
        #
        nh_addr = self.pg3.remote_ip6
        for rx_itf, nh in ((self.pg0, nh_addr),
                           (self.pg1, nh_addr),
                           (self.pg2, '0::0')):
            self.vapi.ip_punt_redirect(rx_itf.sw_if_index,
                                       self.pg3.sw_if_index,
                                       nh)

        #
        # Dump pg0 punt redirects
        #
        punts = self.vapi.ip_punt_redirect_dump(self.pg0.sw_if_index,
                                                is_ipv6=1)
        for entry in punts:
            self.assertEqual(entry.punt.rx_sw_if_index,
                             self.pg0.sw_if_index)

        #
        # Dump punt redirects for all interfaces
        #
        punts = self.vapi.ip_punt_redirect_dump(0xffffffff, is_ipv6=1)
        self.assertEqual(len(punts), 3)
        for entry in punts:
            self.assertEqual(entry.punt.tx_sw_if_index,
                             self.pg3.sw_if_index)
        # NOTE(review): nh is str()-ed in the next assertion, so it is
        # presumably not a plain string; comparing it unconverted with the
        # string remote_ip6 may then never be equal - confirm the intent
        self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
        self.assertEqual(str(punts[2].punt.nh), '::')
2279
2280
class TestIPDeag(VppTestCase):
    """ IPv6 Deaggregate Routes """

    @classmethod
    def setUpClass(cls):
        super(TestIPDeag, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPDeag, cls).tearDownClass()

    def setUp(self):
        """Create three pg interfaces, each up with IPv6 configured."""
        super(TestIPDeag, self).setUp()

        self.create_pg_interfaces(range(3))

        for itf in self.pg_interfaces:
            itf.admin_up()
            itf.config_ip6()
            itf.resolve_ndp()

    def tearDown(self):
        """Undo the interface configuration applied in setUp."""
        super(TestIPDeag, self).tearDown()
        for itf in self.pg_interfaces:
            itf.unconfig_ip6()
            itf.admin_down()

    def test_ip_deag(self):
        """ IP Deag Routes """

        def tcp_from_pg0(src, dst):
            # a TCP packet from src to dst as sent by pg0's peer
            return (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IPv6(src=src, dst=dst) /
                    inet6.TCP(sport=1234, dport=1234) /
                    Raw(b'\xa5' * 100))

        #
        # Create a table to be used for:
        #  1 - another destination address lookup
        #  2 - a source address lookup
        #
        table_dst = VppIpTable(self, 1, is_ip6=1)
        table_src = VppIpTable(self, 2, is_ip6=1)
        table_dst.add_vpp_config()
        table_src.add_vpp_config()

        #
        # Add a route in the default table to point to a deag/
        # second lookup in each of these tables
        #
        deag_dst_path = VppRoutePath("::", 0xffffffff, nh_table_id=1)
        deag_src_path = VppRoutePath(
            "::", 0xffffffff,
            nh_table_id=2,
            type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP)
        route_to_dst = VppIpRoute(self, "1::1", 128, [deag_dst_path])
        route_to_src = VppIpRoute(self, "1::2", 128, [deag_src_path])

        route_to_dst.add_vpp_config()
        route_to_src.add_vpp_config()

        #
        # packets to these destination are dropped, since they'll
        # hit the respective default routes in the second table
        #
        pkts_dst = tcp_from_pg0("5::5", "1::1") * 257
        pkts_src = tcp_from_pg0("2::2", "1::2") * 257

        self.send_and_assert_no_replies(self.pg0, pkts_dst,
                                        "IP in dst table")
        self.send_and_assert_no_replies(self.pg0, pkts_src,
                                        "IP in src table")

        #
        # add a route in the dst table to forward via pg1
        #
        route_in_dst = VppIpRoute(
            self, "1::1", 128,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
            table_id=1)
        route_in_dst.add_vpp_config()

        self.send_and_expect(self.pg0, pkts_dst, self.pg1)

        #
        # add a route in the src table to forward via pg2
        #
        route_in_src = VppIpRoute(
            self, "2::2", 128,
            [VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index)],
            table_id=2)
        route_in_src.add_vpp_config()
        self.send_and_expect(self.pg0, pkts_src, self.pg2)

        #
        # loop in the lookup DP
        #
        route_loop = VppIpRoute(self, "3::3", 128,
                                [VppRoutePath("::", 0xffffffff)])
        route_loop.add_vpp_config()

        pkts_loop = tcp_from_pg0("3::4", "3::3") * 257

        self.send_and_assert_no_replies(self.pg0, pkts_loop,
                                        "IP lookup loop")
2398
2399
class TestIP6Input(VppTestCase):
    """IPv6 Input Exception Test Cases

    Covers a hop-limit-1 packet that is answered with an ICMPv6 error,
    packets carrying IP version 3 that must not draw any reply, and a
    packet with a hop-by-hop extension header.
    """

    @classmethod
    def setUpClass(cls):
        super(TestIP6Input, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Input, cls).tearDownClass()

    def setUp(self):
        """Create two pg interfaces, bring them up and resolve NDP."""
        super(TestIP6Input, self).setUp()

        self.create_pg_interfaces(range(2))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Remove the IPv6 configuration and shut the interfaces."""
        super(TestIP6Input, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip_input_icmp_reply(self):
        """ IP6 Input Exception - Return ICMP (3,0) """
        # The packet is addressed through VPP (pg0 -> pg1) but arrives
        # with hop limit 1, so an ICMPv6 error is expected back on pg0.
        ether = Ether(src=self.pg0.remote_mac,
                      dst=self.pg0.local_mac)
        ip6 = IPv6(src=self.pg0.remote_ip6,
                   dst=self.pg1.remote_ip6,
                   hlim=1)
        pkt = (ether / ip6 /
               inet6.UDP(sport=1234, dport=1234) /
               Raw(b'\xa5' * 100))

        replies = self.send_and_expect(self.pg0, pkt * NUM_PKTS, self.pg0)

        # only the first reply is inspected
        icmp = replies[0][ICMPv6TimeExceeded]

        # type 3 / code 0: "hop limit exceeded in transit"
        self.assertEqual((icmp.type, icmp.code), (3, 0))

    # Building blocks shared by the parameterized cases below; they have
    # to be class attributes so the decorator can see them.
    icmpv6_data = '\x0a' * 18
    all_0s = "::"
    all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"

    @parameterized.expand([
        # Each case is: name, src, dst, l4proto, msg, timeout.
        # A src/dst of None means "use the interface address".
        ("src='iface',   dst='iface'",
         None, None,
         inet6.UDP(sport=1234, dport=1234),
         "funky version", None),
        ("src='All 0's', dst='iface'",
         all_0s, None,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='iface',   dst='All 0's'",
         None, all_0s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='All 1's', dst='iface'",
         all_1s, None,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='iface',   dst='All 1's'",
         None, all_1s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
        ("src='All 1's', dst='All 1's'",
         all_1s, all_1s,
         ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data),
         None, 0.1),
    ])
    def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):
        # No docstring on purpose: the description is set per case here.
        self._testMethodDoc = 'IPv6 Input Exception - %s' % name

        # fall back to the interface addresses when a case passes None
        src_ip = src or self.pg0.remote_ip6
        dst_ip = dst or self.pg1.remote_ip6

        # every case is sent with IP version 3 in the IPv6 header
        ether = Ether(src=self.pg0.remote_mac,
                      dst=self.pg0.local_mac)
        ip6 = IPv6(src=src_ip, dst=dst_ip, version=3)
        pkt = ether / ip6 / l4 / Raw(b'\xa5' * 100)

        self.send_and_assert_no_replies(self.pg0, pkt * NUM_PKTS,
                                        remark=msg or "",
                                        timeout=timeout)

    def test_hop_by_hop(self):
        """ Hop-by-hop header test """
        # UDP packet to pg0's own address behind a hop-by-hop header
        ether = Ether(src=self.pg0.remote_mac,
                      dst=self.pg0.local_mac)
        ip6 = IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
        pkt = (ether / ip6 /
               IPv6ExtHdrHopByHop() /
               inet6.UDP(sport=1234, dport=1234) /
               Raw(b'\xa5' * 100))

        # The packet is only injected; no captured output is checked.
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
2496
2497
class TestIPReplace(VppTestCase):
    """IPv6 Table Replace

    Fills several IP tables with unicast and multicast routes and then
    drives the table replace_begin()/replace_end() cycle, checking which
    routes are still found in the table dumps afterwards.
    """

    @classmethod
    def setUpClass(cls):
        super(TestIPReplace, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIPReplace, cls).tearDownClass()

    def setUp(self):
        """Create four pg interfaces and one extra table per interface."""
        super(TestIPReplace, self).setUp()

        self.create_pg_interfaces(range(4))

        # table IDs are handed out sequentially, starting at 1
        self.tables = []
        for table_id, intf in enumerate(self.pg_interfaces, start=1):
            intf.admin_up()
            intf.config_ip6()
            intf.generate_remote_hosts(2)
            self.tables.append(
                VppIpTable(self, table_id, True).add_vpp_config())

    def tearDown(self):
        """Shut the interfaces and remove their IPv6 configuration."""
        super(TestIPReplace, self).tearDown()
        for intf in self.pg_interfaces:
            intf.admin_down()
            intf.unconfig_ip6()

    def _assert_entries(self, table, dump, mdump, entries, present):
        """Check a batch of routes against already fetched table dumps.

        :param table: the table the dumps were taken from
        :param dump: unicast dump of that table
        :param mdump: multicast dump of that table
        :param entries: dicts holding a 'uni' and a 'multi' route object
        :param present: True to require every route to be found,
                        False to require every route to be absent
        """
        check = self.assertTrue if present else self.assertFalse
        for entry in entries:
            check(find_route_in_dump(dump, entry['uni'], table))
            check(find_mroute_in_dump(mdump, entry['multi'], table))

    def test_replace(self):
        """ IP Table Replace """

        N_ROUTES = 20
        links = [self.pg0, self.pg1, self.pg2, self.pg3]
        # routes[n] collects the entries installed in self.tables[n]
        routes = [[] for _ in self.tables]

        # the sizes of 'empty' tables
        for t in self.tables:
            self.assertEqual(len(t.dump()), 2)
            self.assertEqual(len(t.mdump()), 5)

        # load up the tables with some routes; each table gets
        # N_ROUTES - 1 unicast/multicast pairs (jj runs from 1)
        for t, link, entries in zip(self.tables, links, routes):
            for jj in range(1, N_ROUTES):
                # unicast /128 via both remote hosts of this table's link
                uni = VppIpRoute(
                    self, "2001::%d" % jj, 128,
                    [VppRoutePath(link.remote_hosts[0].ip6,
                                  link.sw_if_index),
                     VppRoutePath(link.remote_hosts[1].ip6,
                                  link.sw_if_index)],
                    table_id=t.table_id).add_vpp_config()
                # multicast: accept on pg0, forward on pg1..pg3
                multi = VppIpMRoute(
                    self, "::",
                    "ff:2001::%d" % jj, 128,
                    MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
                    [VppMRoutePath(self.pg0.sw_if_index,
                                   MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT,
                                   proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
                     VppMRoutePath(self.pg1.sw_if_index,
                                   MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
                                   proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
                     VppMRoutePath(self.pg2.sw_if_index,
                                   MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
                                   proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
                     VppMRoutePath(self.pg3.sw_if_index,
                                   MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
                                   proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)],
                    table_id=t.table_id).add_vpp_config()
                entries.append({'uni': uni,
                                'multi': multi})

        #
        # replace the tables a few times
        #
        for _ in range(3):
            # replace each table
            for t in self.tables:
                t.replace_begin()

            # all the routes are still there
            for t, entries in zip(self.tables, routes):
                self._assert_entries(t, t.dump(), t.mdump(), entries, True)

            # redownload the entries at even list positions
            for entries in routes:
                for entry in entries[::2]:
                    entry['uni'].add_vpp_config()
                    entry['multi'].add_vpp_config()

            # signal each table converged
            for t in self.tables:
                t.replace_end()

            # we should find the entries at even list positions, but not
            # the ones at odd positions. Slicing keeps this in step with
            # the actual list length instead of with N_ROUTES.
            for t, entries in zip(self.tables, routes):
                dump = t.dump()
                mdump = t.mdump()
                self._assert_entries(t, dump, mdump, entries[::2], True)
                self._assert_entries(t, dump, mdump, entries[1::2], False)

            # reload all the routes
            for entries in routes:
                for entry in entries:
                    entry['uni'].add_vpp_config()
                    entry['multi'].add_vpp_config()

            # all the routes are still there
            for t, entries in zip(self.tables, routes):
                self._assert_entries(t, t.dump(), t.mdump(), entries, True)

        #
        # finally flush the tables for good measure; they must be back
        # to their 'empty' sizes
        #
        for t in self.tables:
            t.flush()
            self.assertEqual(len(t.dump()), 2)
            self.assertEqual(len(t.mdump()), 5)
2635
2636
class TestIP6Replace(VppTestCase):
    """IPv6 Interface Address Replace

    Drives the sw_interface_address_replace_begin()/_end() API pair and
    checks how many addresses each interface reports afterwards.
    """

    @classmethod
    def setUpClass(cls):
        super(TestIP6Replace, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6Replace, cls).tearDownClass()

    def setUp(self):
        """Create four pg interfaces and bring them up."""
        super(TestIP6Replace, self).setUp()

        self.create_pg_interfaces(range(4))

        for intf in self.pg_interfaces:
            intf.admin_up()

    def tearDown(self):
        """Shut the interfaces again."""
        super(TestIP6Replace, self).tearDown()
        for intf in self.pg_interfaces:
            intf.admin_down()

    def get_n_pfxs(self, intf):
        """Return the number of entries in the address dump of *intf*."""
        # NOTE(review): the second argument presumably selects IPv6 -
        # confirm against the ip_address_dump API wrapper.
        return len(self.vapi.ip_address_dump(intf.sw_if_index, True))

    def _assert_n_pfxs(self, intfs, expected):
        """Assert each interface in *intfs* reports *expected* addresses."""
        for intf in intfs:
            self.assertEqual(self.get_n_pfxs(intf), expected)

    def test_replace(self):
        """ IP interface address replace """

        # intf_pfxs[n] holds the addresses configured on pg_interfaces[n]
        intf_pfxs = [[] for _ in self.pg_interfaces]

        # add prefixes to each of the interfaces, in this order:
        #   2001:16:x::1/64
        #   2001:16:x::2/64 - a different address in the same subnet
        #   2001:15:x::2/64 - a different address and subnet
        addr_formats = ("2001:16:%d::1", "2001:16:%d::2", "2001:15:%d::2")
        for intf, configured in zip(self.pg_interfaces, intf_pfxs):
            for addr_format in addr_formats:
                addr = addr_format % intf.sw_if_index
                configured.append(
                    VppIpInterfaceAddress(self, intf, addr,
                                          64).add_vpp_config())

        # a dump should have all three addresses in it
        self._assert_n_pfxs(self.pg_interfaces, 3)

        #
        # remove all the address thru a replace
        #
        self.vapi.sw_interface_address_replace_begin()
        self.vapi.sw_interface_address_replace_end()
        self._assert_n_pfxs(self.pg_interfaces, 0)

        #
        # add all the interface addresses back
        #
        for configured in intf_pfxs:
            for pfx in configured:
                pfx.add_vpp_config()
        self._assert_n_pfxs(self.pg_interfaces, 3)

        #
        # replace again, but this time update/re-add the address on the first
        # two interfaces
        #
        self.vapi.sw_interface_address_replace_begin()

        for configured in intf_pfxs[:2]:
            for pfx in configured:
                pfx.add_vpp_config()

        self.vapi.sw_interface_address_replace_end()

        # on the first two the address still exist,
        # on the other two they do not
        self._assert_n_pfxs(self.pg_interfaces[:2], 3)
        for configured in intf_pfxs[:2]:
            for pfx in configured:
                self.assertTrue(pfx.query_vpp_config())
        self._assert_n_pfxs(self.pg_interfaces[2:], 0)

        #
        # add all the interface addresses back on the last two
        #
        for configured in intf_pfxs[2:]:
            for pfx in configured:
                pfx.add_vpp_config()
        self._assert_n_pfxs(self.pg_interfaces, 3)

        #
        # replace again, this time add different prefixes on all the interfaces
        #
        self.vapi.sw_interface_address_replace_begin()

        pfxs = []
        for intf in self.pg_interfaces:
            # 2001:18:x::1/64
            addr = "2001:18:%d::1" % intf.sw_if_index
            pfxs.append(VppIpInterfaceAddress(self, intf, addr,
                                              64).add_vpp_config())

        self.vapi.sw_interface_address_replace_end()

        # only the 2001:18: address should exist on each interface
        self._assert_n_pfxs(self.pg_interfaces, 1)
        for pfx in pfxs:
            self.assertTrue(pfx.query_vpp_config())

        #
        # remove everything
        #
        self.vapi.sw_interface_address_replace_begin()
        self.vapi.sw_interface_address_replace_end()
        self._assert_n_pfxs(self.pg_interfaces, 0)

        #
        # add prefixes to each interface. post-begin add the prefix from
        # interface X onto interface Y. this would normally be an error
        # since it would generate a 'duplicate address' warning. but in
        # this case, since what is newly downloaded is sane, it's ok
        #
        for intf in self.pg_interfaces:
            # 2001:18:x::1/64
            addr = "2001:18:%d::1" % intf.sw_if_index
            VppIpInterfaceAddress(self, intf, addr, 64).add_vpp_config()

        self.vapi.sw_interface_address_replace_begin()

        pfxs = []
        for intf in self.pg_interfaces:
            # 2001:18:(x+1)::1/64 - the address built above for the
            # interface with the next sw_if_index
            addr = "2001:18:%d::1" % (intf.sw_if_index + 1)
            pfxs.append(VppIpInterfaceAddress(self, intf,
                                              addr, 64).add_vpp_config())

        self.vapi.sw_interface_address_replace_end()

        self.logger.info(self.vapi.cli("sh int addr"))

        self._assert_n_pfxs(self.pg_interfaces, 1)
        for pfx in pfxs:
            self.assertTrue(pfx.query_vpp_config())
2796
2797
class TestIP6LinkLocal(VppTestCase):
    """IPv6 Link Local

    Configures link-local addresses through both available APIs and
    pings them from a pg interface.
    """

    @classmethod
    def setUpClass(cls):
        super(TestIP6LinkLocal, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6LinkLocal, cls).tearDownClass()

    def setUp(self):
        """Create two pg interfaces and bring them up."""
        super(TestIP6LinkLocal, self).setUp()

        self.create_pg_interfaces(range(2))

        for intf in self.pg_interfaces:
            intf.admin_up()

    def tearDown(self):
        """Shut the interfaces again."""
        super(TestIP6LinkLocal, self).tearDown()
        for intf in self.pg_interfaces:
            intf.admin_down()

    def test_ip6_ll(self):
        """ IPv6 Link Local """

        #
        # There are two APIs to add a link local address:
        #   1 - the one used for any other prefix
        #   2 - the dedicated 'set link-local' API
        #
        ll1 = "fe80:1::1"
        ll2 = "fe80:2::2"
        ll3 = "fe80:3::3"

        def echo_request(target):
            # ICMPv6 echo request from ll2 to 'target', framed with
            # pg0's MAC addresses
            return (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IPv6(src=ll2, dst=target) /
                    ICMPv6EchoRequest())

        #
        # API 1 on pg0: configure ll1 as a 'normal' /128 prefix,
        # after which it should answer a ping
        #
        VppIpInterfaceAddress(self, self.pg0, ll1, 128).add_vpp_config()

        ping_ll1 = echo_request(ll1)
        self.send_and_expect(self.pg0, [ping_ll1], self.pg0)

        #
        # change the link-local on pg0 to ll3 and ping that instead
        #
        ll3_on_pg0 = VppIpInterfaceAddress(self, self.pg0,
                                           ll3, 128).add_vpp_config()

        ping_ll3 = echo_request(ll3)
        self.send_and_expect(self.pg0, [ping_ll3], self.pg0)

        #
        # with a normal v6 prefix also set on the link,
        # ll3 must still answer
        #
        self.pg0.config_ip6()
        self.send_and_expect(self.pg0, [ping_ll3], self.pg0)

        # the link-local cannot be removed: the API call is expected
        # to return a negative value
        with self.vapi.assert_negative_api_retval():
            ll3_on_pg0.remove_vpp_config()

        #
        # API 2 on pg1: use the specific link-local API. The same
        # packets built above are reused unchanged on pg1.
        #
        VppIp6LinkLocalAddress(self, self.pg1, ll1).add_vpp_config()
        self.send_and_expect(self.pg1, [ping_ll1], self.pg1)

        VppIp6LinkLocalAddress(self, self.pg1, ll3).add_vpp_config()
        self.send_and_expect(self.pg1, [ping_ll3], self.pg1)
2884
2885
# When executed as a script, run this module's test cases through the
# VPP test runner rather than the default unittest runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)