loop counter to prevent infinite number of lookups per-packet
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5
6 from framework import VppTestCase, VppTestRunner
7 from util import ppp, ip6_normalize
8 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
9 from vpp_pg_interface import is_ipv6_misc
10 from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
11     VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
12     VppMplsRoute, DpoProto, VppMplsTable, VppIpTable
13 from vpp_neighbor import find_nbr, VppNeighbor
14
15 from scapy.packet import Raw
16 from scapy.layers.l2 import Ether, Dot1Q
17 from scapy.layers.inet6 import IPv6, UDP, TCP, ICMPv6ND_NS, ICMPv6ND_RS, \
18     ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
19     ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
20     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
21     ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply
22 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
23     in6_mactoifaceid, in6_ismaddr
24 from scapy.utils import inet_pton, inet_ntop
25 from scapy.contrib.mpls import MPLS
26
27
28 AF_INET6 = socket.AF_INET6
29
30
def mk_ll_addr(mac):
    """Build a link-local IPv6 address string from a MAC address.

    The interface identifier returned by ``in6_mactoifaceid`` is appended
    to the ``fe80::`` prefix.

    :param str mac: MAC address to derive the address from.
    :returns: the resulting address as a string.
    """
    return "fe80::" + in6_mactoifaceid(mac)
35
36
class TestIPv6ND(VppTestCase):
    """Shared helpers for IPv6 Neighbor Discovery tests.

    Offers validators for received RA / NA / NS packets, send-and-expect
    wrappers around the packet-generator interfaces, and a basic
    Ethernet/IPv6 header check.
    """

    def _send_and_capture_one(self, tx_intf, rx_intf, pkts, filter_out_fn):
        """Send *pkts* on *tx_intf* and return the one packet captured on
        *rx_intf*.

        :param tx_intf: interface the stream is added to.
        :param rx_intf: interface the capture is read from.
        :param pkts: packet(s) to add as the stream.
        :param filter_out_fn: filter handed through to ``get_capture``.
        :returns: the single captured packet.
        """
        tx_intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        return rx[0]

    def validate_ra(self, intf, rx, dst_ip=None):
        """Check that *rx* is a Router Advertisement sent by *intf*.

        :param intf: interface the RA was captured on.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination; defaults to
                       ``intf.remote_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd RA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_RA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the router's link local
        self.assertTrue(in6_islladdr(rx[IPv6].src))
        self.assertEqual(in6_ptop(rx[IPv6].src),
                         in6_ptop(mk_ll_addr(intf.local_mac)))

    def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
        """Check that *rx* is a Neighbor Advertisement sent by *intf*.

        :param intf: interface the NA was captured on.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination; defaults to
                       ``intf.remote_ip6``.
        :param tgt_ip: expected IPv6 source (the advertised target);
                       defaults to ``intf.local_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6
        if not tgt_ip:
            # default the *target*; assigning dst_ip here would clobber the
            # expected destination and leave tgt_ip unset
            tgt_ip = intf.local_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_NA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the target address
        self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))

        # Dest link-layer options should have the router's MAC
        dll = rx[ICMPv6NDOptDstLLAddr]
        self.assertEqual(dll.lladdr, intf.local_mac)

    def validate_ns(self, intf, rx, tgt_ip):
        """Check that *rx* is a Neighbor Solicitation for *tgt_ip* sent by
        *intf*.

        :param intf: interface the NS was captured on.
        :param rx: the captured packet.
        :param tgt_ip: IPv6 address expected as the solicitation target.
        """
        nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
        dst_ip = inet_ntop(AF_INET6, nsma)

        # NS is sent to the MAC derived from the solicited-node
        # multicast address
        self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NS should be addressed to an mcast address
        # derived from the target address
        self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # expect the tgt IP in the NS header
        ns = rx[ICMPv6ND_NS]
        self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))

        # packet is from the router's local address
        self.assertEqual(in6_ptop(rx[IPv6].src), intf.local_ip6)

        # Src link-layer options should have the router's MAC
        sll = rx[ICMPv6NDOptSrcLLAddr]
        self.assertEqual(sll.lladdr, intf.local_mac)

    def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send *pkts* on *intf* and validate the single RA received back.

        :param intf: interface used for both sending and capturing.
        :param pkts: packet(s) to send.
        :param remark: description of the scenario (not used here).
        :param dst_ip: expected RA destination, see :meth:`validate_ra`.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        rx = self._send_and_capture_one(intf, intf, pkts, filter_out_fn)
        self.validate_ra(intf, rx, dst_ip)

    def send_and_expect_na(self, intf, pkts, remark, dst_ip=None,
                           tgt_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send *pkts* on *intf* and validate the single NA received back.

        :param intf: interface used for both sending and capturing.
        :param pkts: packet(s) to send.
        :param remark: description of the scenario (not used here).
        :param dst_ip: expected NA destination, see :meth:`validate_na`.
        :param tgt_ip: expected NA source, see :meth:`validate_na`.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        rx = self._send_and_capture_one(intf, intf, pkts, filter_out_fn)
        self.validate_na(intf, rx, dst_ip, tgt_ip)

    def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
                           filter_out_fn=is_ipv6_misc):
        """Send *pkts* on *tx_intf* and validate the single NS captured on
        *rx_intf*.

        :param tx_intf: interface the packets are sent on.
        :param rx_intf: interface the NS is expected on.
        :param pkts: packet(s) to send.
        :param tgt_ip: expected solicitation target.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        rx = self._send_and_capture_one(tx_intf, rx_intf, pkts,
                                        filter_out_fn)
        self.validate_ns(rx_intf, rx, tgt_ip)

    def verify_ip(self, rx, smac, dmac, sip, dip):
        """Assert the Ethernet and IPv6 source/destination fields of *rx*.

        :param rx: the captured packet.
        :param smac: expected Ethernet source.
        :param dmac: expected Ethernet destination.
        :param sip: expected IPv6 source.
        :param dip: expected IPv6 destination.
        """
        ether = rx[Ether]
        self.assertEqual(ether.dst, dmac)
        self.assertEqual(ether.src, smac)

        ip = rx[IPv6]
        self.assertEqual(ip.src, sip)
        self.assertEqual(ip.dst, dip)
149
150
151 class TestIPv6(TestIPv6ND):
152     """ IPv6 Test Case """
153
154     @classmethod
155     def setUpClass(cls):
156         super(TestIPv6, cls).setUpClass()
157
158     def setUp(self):
159         """
160         Perform test setup before test case.
161
162         **Config:**
163             - create 3 pg interfaces
164                 - untagged pg0 interface
165                 - Dot1Q subinterface on pg1
166                 - Dot1AD subinterface on pg2
167             - setup interfaces:
168                 - put it into UP state
169                 - set IPv6 addresses
170                 - resolve neighbor address using NDP
171             - configure 200 fib entries
172
173         :ivar list interfaces: pg interfaces and subinterfaces.
174         :ivar dict flows: IPv4 packet flows in test.
175
176         *TODO:* Create AD sub interface
177         """
178         super(TestIPv6, self).setUp()
179
180         # create 3 pg interfaces
181         self.create_pg_interfaces(range(3))
182
183         # create 2 subinterfaces for p1 and pg2
184         self.sub_interfaces = [
185             VppDot1QSubint(self, self.pg1, 100),
186             VppDot1QSubint(self, self.pg2, 200)
187             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
188         ]
189
190         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
191         self.flows = dict()
192         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
193         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
194         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
195
196         # packet sizes
197         self.pg_if_packet_sizes = [64, 1500, 9020]
198
199         self.interfaces = list(self.pg_interfaces)
200         self.interfaces.extend(self.sub_interfaces)
201
202         # setup all interfaces
203         for i in self.interfaces:
204             i.admin_up()
205             i.config_ip6()
206             i.resolve_ndp()
207
208         # config 2M FIB entries
209         self.config_fib_entries(200)
210
211     def tearDown(self):
212         """Run standard test teardown and log ``show ip6 neighbors``."""
213         for i in self.interfaces:
214             i.unconfig_ip6()
215             i.ip6_disable()
216             i.admin_down()
217         for i in self.sub_interfaces:
218             i.remove_vpp_config()
219
220         super(TestIPv6, self).tearDown()
221         if not self.vpp_dead:
222             self.logger.info(self.vapi.cli("show ip6 neighbors"))
223             # info(self.vapi.cli("show ip6 fib"))  # many entries
224
225     def config_fib_entries(self, count):
226         """For each interface add to the FIB table *count* routes to
227         "fd02::1/128" destination with interface's local address as next-hop
228         address.
229
230         :param int count: Number of FIB entries.
231
232         - *TODO:* check if the next-hop address shouldn't be remote address
233           instead of local address.
234         """
235         n_int = len(self.interfaces)
236         percent = 0
237         counter = 0.0
238         dest_addr = inet_pton(AF_INET6, "fd02::1")
239         dest_addr_len = 128
240         for i in self.interfaces:
241             next_hop_address = i.local_ip6n
242             for j in range(count / n_int):
243                 self.vapi.ip_add_del_route(
244                     dest_addr, dest_addr_len, next_hop_address, is_ipv6=1)
245                 counter += 1
246                 if counter / count * 100 > percent:
247                     self.logger.info("Configure %d FIB entries .. %d%% done" %
248                                      (count, percent))
249                     percent += 1
250
251     def modify_packet(self, src_if, packet_size, pkt):
252         """Add load, set destination IP and extend packet to required packet
253         size for defined interface.
254
255         :param VppInterface src_if: Interface to create packet for.
256         :param int packet_size: Required packet size.
257         :param Scapy pkt: Packet to be modified.
258         """
259         dst_if_idx = packet_size / 10 % 2
260         dst_if = self.flows[src_if][dst_if_idx]
261         info = self.create_packet_info(src_if, dst_if)
262         payload = self.info_to_payload(info)
263         p = pkt/Raw(payload)
264         p[IPv6].dst = dst_if.remote_ip6
265         info.data = p.copy()
266         if isinstance(src_if, VppSubInterface):
267             p = src_if.add_dot1_layer(p)
268         self.extend_packet(p, packet_size)
269
270         return p
271
272     def create_stream(self, src_if):
273         """Create input packet stream for defined interface.
274
275         :param VppInterface src_if: Interface to create packet stream for.
276         """
277         hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
278         pkt_tmpl = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
279                     IPv6(src=src_if.remote_ip6) /
280                     UDP(sport=1234, dport=1234))
281
282         pkts = [self.modify_packet(src_if, i, pkt_tmpl)
283                 for i in xrange(self.pg_if_packet_sizes[0],
284                                 self.pg_if_packet_sizes[1], 10)]
285         pkts_b = [self.modify_packet(src_if, i, pkt_tmpl)
286                   for i in xrange(self.pg_if_packet_sizes[1] + hdr_ext,
287                                   self.pg_if_packet_sizes[2] + hdr_ext, 50)]
288         pkts.extend(pkts_b)
289
290         return pkts
291
292     def verify_capture(self, dst_if, capture):
293         """Verify captured input packet stream for defined interface.
294
295         :param VppInterface dst_if: Interface to verify captured packet stream
296                                     for.
297         :param list capture: Captured packet stream.
298         """
299         self.logger.info("Verifying capture on interface %s" % dst_if.name)
300         last_info = dict()
301         for i in self.interfaces:
302             last_info[i.sw_if_index] = None
303         is_sub_if = False
304         dst_sw_if_index = dst_if.sw_if_index
305         if hasattr(dst_if, 'parent'):
306             is_sub_if = True
307         for packet in capture:
308             if is_sub_if:
309                 # Check VLAN tags and Ethernet header
310                 packet = dst_if.remove_dot1_layer(packet)
311             self.assertTrue(Dot1Q not in packet)
312             try:
313                 ip = packet[IPv6]
314                 udp = packet[UDP]
315                 payload_info = self.payload_to_info(str(packet[Raw]))
316                 packet_index = payload_info.index
317                 self.assertEqual(payload_info.dst, dst_sw_if_index)
318                 self.logger.debug(
319                     "Got packet on port %s: src=%u (id=%u)" %
320                     (dst_if.name, payload_info.src, packet_index))
321                 next_info = self.get_next_packet_info_for_interface2(
322                     payload_info.src, dst_sw_if_index,
323                     last_info[payload_info.src])
324                 last_info[payload_info.src] = next_info
325                 self.assertTrue(next_info is not None)
326                 self.assertEqual(packet_index, next_info.index)
327                 saved_packet = next_info.data
328                 # Check standard fields
329                 self.assertEqual(ip.src, saved_packet[IPv6].src)
330                 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
331                 self.assertEqual(udp.sport, saved_packet[UDP].sport)
332                 self.assertEqual(udp.dport, saved_packet[UDP].dport)
333             except:
334                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
335                 raise
336         for i in self.interfaces:
337             remaining_packet = self.get_next_packet_info_for_interface2(
338                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
339             self.assertTrue(remaining_packet is None,
340                             "Interface %s: Packet expected from interface %s "
341                             "didn't arrive" % (dst_if.name, i.name))
342
343     def test_fib(self):
344         """ IPv6 FIB test
345
346         Test scenario:
347             - Create IPv6 stream for pg0 interface
348             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
349             - Send and verify received packets on each interface.
350         """
351
352         pkts = self.create_stream(self.pg0)
353         self.pg0.add_stream(pkts)
354
355         for i in self.sub_interfaces:
356             pkts = self.create_stream(i)
357             i.parent.add_stream(pkts)
358
359         self.pg_enable_capture(self.pg_interfaces)
360         self.pg_start()
361
362         pkts = self.pg0.get_capture()
363         self.verify_capture(self.pg0, pkts)
364
365         for i in self.sub_interfaces:
366             pkts = i.parent.get_capture()
367             self.verify_capture(i, pkts)
368
369     def test_ns(self):
370         """ IPv6 Neighbour Solicitation Exceptions
371
372         Test scenario:
373            - Send an NS Sourced from an address not covered by the link sub-net
374            - Send an NS to an mcast address the router has not joined
375            - Send NS for a target address the router does not onn.
376         """
377
378         #
379         # An NS from a non link source address
380         #
381         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
382         d = inet_ntop(AF_INET6, nsma)
383
384         p = (Ether(dst=in6_getnsmac(nsma)) /
385              IPv6(dst=d, src="2002::2") /
386              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
387              ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
388         pkts = [p]
389
390         self.send_and_assert_no_replies(
391             self.pg0, pkts,
392             "No response to NS source by address not on sub-net")
393
394         #
395         # An NS for sent to a solicited mcast group the router is
396         # not a member of FAILS
397         #
398         if 0:
399             nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
400             d = inet_ntop(AF_INET6, nsma)
401
402             p = (Ether(dst=in6_getnsmac(nsma)) /
403                  IPv6(dst=d, src=self.pg0.remote_ip6) /
404                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
405                  ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
406             pkts = [p]
407
408             self.send_and_assert_no_replies(
409                 self.pg0, pkts,
410                 "No response to NS sent to unjoined mcast address")
411
412         #
413         # An NS whose target address is one the router does not own
414         #
415         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
416         d = inet_ntop(AF_INET6, nsma)
417
418         p = (Ether(dst=in6_getnsmac(nsma)) /
419              IPv6(dst=d, src=self.pg0.remote_ip6) /
420              ICMPv6ND_NS(tgt="fd::ffff") /
421              ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
422         pkts = [p]
423
424         self.send_and_assert_no_replies(self.pg0, pkts,
425                                         "No response to NS for unknown target")
426
427         #
428         # A neighbor entry that has no associated FIB-entry
429         #
430         self.pg0.generate_remote_hosts(4)
431         nd_entry = VppNeighbor(self,
432                                self.pg0.sw_if_index,
433                                self.pg0.remote_hosts[2].mac,
434                                self.pg0.remote_hosts[2].ip6,
435                                af=AF_INET6,
436                                is_no_fib_entry=1)
437         nd_entry.add_vpp_config()
438
439         #
440         # check we have the neighbor, but no route
441         #
442         self.assertTrue(find_nbr(self,
443                                  self.pg0.sw_if_index,
444                                  self.pg0._remote_hosts[2].ip6,
445                                  inet=AF_INET6))
446         self.assertFalse(find_route(self,
447                                     self.pg0._remote_hosts[2].ip6,
448                                     128,
449                                     inet=AF_INET6))
450
451         #
452         # send an NS from a link local address to the interface's global
453         # address
454         #
455         p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
456              IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
457              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
458              ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
459
460         self.send_and_expect_na(self.pg0, p,
461                                 "NS from link-local",
462                                 dst_ip=self.pg0._remote_hosts[2].ip6_ll,
463                                 tgt_ip=self.pg0.local_ip6)
464
465         #
466         # we should have learned an ND entry for the peer's link-local
467         # but not inserted a route to it in the FIB
468         #
469         self.assertTrue(find_nbr(self,
470                                  self.pg0.sw_if_index,
471                                  self.pg0._remote_hosts[2].ip6_ll,
472                                  inet=AF_INET6))
473         self.assertFalse(find_route(self,
474                                     self.pg0._remote_hosts[2].ip6_ll,
475                                     128,
476                                     inet=AF_INET6))
477
478         #
479         # An NS to the router's own Link-local
480         #
481         p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
482              IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
483              ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll) /
484              ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
485
486         self.send_and_expect_na(self.pg0, p,
487                                 "NS to/from link-local",
488                                 dst_ip=self.pg0._remote_hosts[3].ip6_ll,
489                                 tgt_ip=self.pg0.local_ip6_ll)
490
491         #
492         # we should have learned an ND entry for the peer's link-local
493         # but not inserted a route to it in the FIB
494         #
495         self.assertTrue(find_nbr(self,
496                                  self.pg0.sw_if_index,
497                                  self.pg0._remote_hosts[3].ip6_ll,
498                                  inet=AF_INET6))
499         self.assertFalse(find_route(self,
500                                     self.pg0._remote_hosts[3].ip6_ll,
501                                     128,
502                                     inet=AF_INET6))
503
504     def test_ns_duplicates(self):
505         """ ND Duplicates"""
506
507         #
508         # Generate some hosts on the LAN
509         #
510         self.pg1.generate_remote_hosts(3)
511
512         #
513         # Add host 1 on pg1 and pg2
514         #
515         ns_pg1 = VppNeighbor(self,
516                              self.pg1.sw_if_index,
517                              self.pg1.remote_hosts[1].mac,
518                              self.pg1.remote_hosts[1].ip6,
519                              af=AF_INET6)
520         ns_pg1.add_vpp_config()
521         ns_pg2 = VppNeighbor(self,
522                              self.pg2.sw_if_index,
523                              self.pg2.remote_mac,
524                              self.pg1.remote_hosts[1].ip6,
525                              af=AF_INET6)
526         ns_pg2.add_vpp_config()
527
528         #
529         # IP packet destined for pg1 remote host arrives on pg1 again.
530         #
531         p = (Ether(dst=self.pg0.local_mac,
532                    src=self.pg0.remote_mac) /
533              IPv6(src=self.pg0.remote_ip6,
534                   dst=self.pg1.remote_hosts[1].ip6) /
535              UDP(sport=1234, dport=1234) /
536              Raw())
537
538         self.pg0.add_stream(p)
539         self.pg_enable_capture(self.pg_interfaces)
540         self.pg_start()
541
542         rx1 = self.pg1.get_capture(1)
543
544         self.verify_ip(rx1[0],
545                        self.pg1.local_mac,
546                        self.pg1.remote_hosts[1].mac,
547                        self.pg0.remote_ip6,
548                        self.pg1.remote_hosts[1].ip6)
549
550         #
551         # remove the duplicate on pg1
552         # packet stream shoud generate NSs out of pg1
553         #
554         ns_pg1.remove_vpp_config()
555
556         self.send_and_expect_ns(self.pg0, self.pg1,
557                                 p, self.pg1.remote_hosts[1].ip6)
558
559         #
560         # Add it back
561         #
562         ns_pg1.add_vpp_config()
563
564         self.pg0.add_stream(p)
565         self.pg_enable_capture(self.pg_interfaces)
566         self.pg_start()
567
568         rx1 = self.pg1.get_capture(1)
569
570         self.verify_ip(rx1[0],
571                        self.pg1.local_mac,
572                        self.pg1.remote_hosts[1].mac,
573                        self.pg0.remote_ip6,
574                        self.pg1.remote_hosts[1].ip6)
575
576     def validate_ra(self, intf, rx, dst_ip=None, mtu=9000, pi_opt=None):
577         if not dst_ip:
578             dst_ip = intf.remote_ip6
579
580         # unicasted packets must come to the unicast mac
581         self.assertEqual(rx[Ether].dst, intf.remote_mac)
582
583         # and from the router's MAC
584         self.assertEqual(rx[Ether].src, intf.local_mac)
585
586         # the rx'd RA should be addressed to the sender's source
587         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
588         self.assertEqual(in6_ptop(rx[IPv6].dst),
589                          in6_ptop(dst_ip))
590
591         # and come from the router's link local
592         self.assertTrue(in6_islladdr(rx[IPv6].src))
593         self.assertEqual(in6_ptop(rx[IPv6].src),
594                          in6_ptop(mk_ll_addr(intf.local_mac)))
595
596         # it should contain the links MTU
597         ra = rx[ICMPv6ND_RA]
598         self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
599
600         # it should contain the source's link layer address option
601         sll = ra[ICMPv6NDOptSrcLLAddr]
602         self.assertEqual(sll.lladdr, intf.local_mac)
603
604         if not pi_opt:
605             # the RA should not contain prefix information
606             self.assertFalse(ra.haslayer(ICMPv6NDOptPrefixInfo))
607         else:
608             raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
609
610             # the options are nested in the scapy packet in way that i cannot
611             # decipher how to decode. this 1st layer of option always returns
612             # nested classes, so a direct obj1=obj2 comparison always fails.
613             # however, the getlayer(.., 2) does give one instnace.
614             # so we cheat here and construct a new opt instnace for comparison
615             rd = ICMPv6NDOptPrefixInfo(prefixlen=raos.prefixlen,
616                                        prefix=raos.prefix,
617                                        L=raos.L,
618                                        A=raos.A)
619             if type(pi_opt) is list:
620                 for ii in range(len(pi_opt)):
621                     self.assertEqual(pi_opt[ii], rd)
622                     rd = rx.getlayer(ICMPv6NDOptPrefixInfo, ii+2)
623             else:
624                 self.assertEqual(pi_opt, raos)
625
626     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
627                            filter_out_fn=is_ipv6_misc,
628                            opt=None):
629         intf.add_stream(pkts)
630         self.pg_enable_capture(self.pg_interfaces)
631         self.pg_start()
632         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
633
634         self.assertEqual(len(rx), 1)
635         rx = rx[0]
636         self.validate_ra(intf, rx, dst_ip, pi_opt=opt)
637
638     def test_rs(self):
639         """ IPv6 Router Solicitation Exceptions
640
641         Test scenario:
642         """
643
644         #
645         # Before we begin change the IPv6 RA responses to use the unicast
646         # address - that way we will not confuse them with the periodic
647         # RAs which go to the mcast address
648         # Sit and wait for the first periodic RA.
649         #
650         # TODO
651         #
652         self.pg0.ip6_ra_config(send_unicast=1)
653
654         #
655         # An RS from a link source address
656         #  - expect an RA in return
657         #
658         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
659              IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
660              ICMPv6ND_RS())
661         pkts = [p]
662         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
663
664         #
665         # For the next RS sent the RA should be rate limited
666         #
667         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
668
669         #
670         # When we reconfiure the IPv6 RA config, we reset the RA rate limiting,
671         # so we need to do this before each test below so as not to drop
672         # packets for rate limiting reasons. Test this works here.
673         #
674         self.pg0.ip6_ra_config(send_unicast=1)
675         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
676
677         #
678         # An RS sent from a non-link local source
679         #
680         self.pg0.ip6_ra_config(send_unicast=1)
681         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
682              IPv6(dst=self.pg0.local_ip6, src="2002::ffff") /
683              ICMPv6ND_RS())
684         pkts = [p]
685         self.send_and_assert_no_replies(self.pg0, pkts,
686                                         "RS from non-link source")
687
688         #
689         # Source an RS from a link local address
690         #
691         self.pg0.ip6_ra_config(send_unicast=1)
692         ll = mk_ll_addr(self.pg0.remote_mac)
693         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
694              IPv6(dst=self.pg0.local_ip6, src=ll) /
695              ICMPv6ND_RS())
696         pkts = [p]
697         self.send_and_expect_ra(self.pg0, pkts,
698                                 "RS sourced from link-local",
699                                 dst_ip=ll)
700
701         #
702         # Send the RS multicast
703         #
704         self.pg0.ip6_ra_config(send_unicast=1)
705         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
706         ll = mk_ll_addr(self.pg0.remote_mac)
707         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
708              IPv6(dst="ff02::2", src=ll) /
709              ICMPv6ND_RS())
710         pkts = [p]
711         self.send_and_expect_ra(self.pg0, pkts,
712                                 "RS sourced from link-local",
713                                 dst_ip=ll)
714
715         #
716         # Source from the unspecified address ::. This happens when the RS
717         # is sent before the host has a configured address/sub-net,
718         # i.e. auto-config. Since the sender has no IP address, the reply
719         # comes back mcast - so the capture needs to not filter this.
720         # If we happen to pick up the periodic RA at this point then so be it,
721         # it's not an error.
722         #
723         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
724         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
725              IPv6(dst="ff02::2", src="::") /
726              ICMPv6ND_RS())
727         pkts = [p]
728         self.send_and_expect_ra(self.pg0, pkts,
729                                 "RS sourced from unspecified",
730                                 dst_ip="ff02::1",
731                                 filter_out_fn=None)
732
733         #
734         # Configure The RA to announce the links prefix
735         #
736         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
737                                self.pg0.local_ip6_prefix_len)
738
739         #
740         # RAs should now contain the prefix information option
741         #
742         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
743                                     prefix=self.pg0.local_ip6,
744                                     L=1,
745                                     A=1)
746
747         self.pg0.ip6_ra_config(send_unicast=1)
748         ll = mk_ll_addr(self.pg0.remote_mac)
749         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
750              IPv6(dst=self.pg0.local_ip6, src=ll) /
751              ICMPv6ND_RS())
752         self.send_and_expect_ra(self.pg0, p,
753                                 "RA with prefix-info",
754                                 dst_ip=ll,
755                                 opt=opt)
756
757         #
758         # Change the prefix info to not off-link
759         #  L-flag is clear
760         #
761         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
762                                self.pg0.local_ip6_prefix_len,
763                                off_link=1)
764
765         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
766                                     prefix=self.pg0.local_ip6,
767                                     L=0,
768                                     A=1)
769
770         self.pg0.ip6_ra_config(send_unicast=1)
771         self.send_and_expect_ra(self.pg0, p,
772                                 "RA with Prefix info with L-flag=0",
773                                 dst_ip=ll,
774                                 opt=opt)
775
776         #
777         # Change the prefix info to not off-link, no-autoconfig
778         #  L and A flag are clear in the advert
779         #
780         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
781                                self.pg0.local_ip6_prefix_len,
782                                off_link=1,
783                                no_autoconfig=1)
784
785         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
786                                     prefix=self.pg0.local_ip6,
787                                     L=0,
788                                     A=0)
789
790         self.pg0.ip6_ra_config(send_unicast=1)
791         self.send_and_expect_ra(self.pg0, p,
792                                 "RA with Prefix info with A & L-flag=0",
793                                 dst_ip=ll,
794                                 opt=opt)
795
796         #
797         # Change the flag settings back to the defaults
798         #  L and A flag are set in the advert
799         #
800         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
801                                self.pg0.local_ip6_prefix_len)
802
803         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
804                                     prefix=self.pg0.local_ip6,
805                                     L=1,
806                                     A=1)
807
808         self.pg0.ip6_ra_config(send_unicast=1)
809         self.send_and_expect_ra(self.pg0, p,
810                                 "RA with Prefix info",
811                                 dst_ip=ll,
812                                 opt=opt)
813
814         #
815         # Change the prefix info to off-link, no-autoconfig
816         #  L and A flag are clear in the advert
817         #
818         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
819                                self.pg0.local_ip6_prefix_len,
820                                off_link=1,
821                                no_autoconfig=1)
822
823         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
824                                     prefix=self.pg0.local_ip6,
825                                     L=0,
826                                     A=0)
827
828         self.pg0.ip6_ra_config(send_unicast=1)
829         self.send_and_expect_ra(self.pg0, p,
830                                 "RA with Prefix info with A & L-flag=0",
831                                 dst_ip=ll,
832                                 opt=opt)
833
834         #
835         # Use the reset to defaults option to revert to defaults
836         #  L and A flag are set in the advert
837         #
838         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
839                                self.pg0.local_ip6_prefix_len,
840                                use_default=1)
841
842         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
843                                     prefix=self.pg0.local_ip6,
844                                     L=1,
845                                     A=1)
846
847         self.pg0.ip6_ra_config(send_unicast=1)
848         self.send_and_expect_ra(self.pg0, p,
849                                 "RA with Prefix reverted to defaults",
850                                 dst_ip=ll,
851                                 opt=opt)
852
853         #
854         # Advertise Another prefix. With no L-flag/A-flag
855         #
856         self.pg0.ip6_ra_prefix(self.pg1.local_ip6n,
857                                self.pg1.local_ip6_prefix_len,
858                                off_link=1,
859                                no_autoconfig=1)
860
861         opt = [ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
862                                      prefix=self.pg0.local_ip6,
863                                      L=1,
864                                      A=1),
865                ICMPv6NDOptPrefixInfo(prefixlen=self.pg1.local_ip6_prefix_len,
866                                      prefix=self.pg1.local_ip6,
867                                      L=0,
868                                      A=0)]
869
870         self.pg0.ip6_ra_config(send_unicast=1)
871         ll = mk_ll_addr(self.pg0.remote_mac)
872         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
873              IPv6(dst=self.pg0.local_ip6, src=ll) /
874              ICMPv6ND_RS())
875         self.send_and_expect_ra(self.pg0, p,
876                                 "RA with multiple Prefix infos",
877                                 dst_ip=ll,
878                                 opt=opt)
879
880         #
881         # Remove the first prefix-info - expect the second is still in the
882         # advert
883         #
884         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
885                                self.pg0.local_ip6_prefix_len,
886                                is_no=1)
887
888         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg1.local_ip6_prefix_len,
889                                     prefix=self.pg1.local_ip6,
890                                     L=0,
891                                     A=0)
892
893         self.pg0.ip6_ra_config(send_unicast=1)
894         self.send_and_expect_ra(self.pg0, p,
895                                 "RA with Prefix reverted to defaults",
896                                 dst_ip=ll,
897                                 opt=opt)
898
899         #
900         # Remove the second prefix-info - expect no prefix-info in the adverts
901         #
902         self.pg0.ip6_ra_prefix(self.pg1.local_ip6n,
903                                self.pg1.local_ip6_prefix_len,
904                                is_no=1)
905
906         self.pg0.ip6_ra_config(send_unicast=1)
907         self.send_and_expect_ra(self.pg0, p,
908                                 "RA with Prefix reverted to defaults",
909                                 dst_ip=ll)
910
911         #
912         # Reset the periodic advertisements back to default values
913         #
914         self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
915
916
class TestICMPv6Echo(VppTestCase):
    """ ICMPv6 Echo Test Case """

    def setUp(self):
        """Bring up a single pg interface with IPv6 configured."""
        super(TestICMPv6Echo, self).setUp()

        # a single packet-generator interface (pg0) is all this case uses
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Run the base-class tear-down, then strip IPv6 from every pg."""
        super(TestICMPv6Echo, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.ip6_disable()
            intf.admin_down()

    def test_icmpv6_echo(self):
        """ VPP replies to ICMPv6 Echo Request

        Test scenario:

            - Receive ICMPv6 Echo Request message on pg0 interface.
            - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
        """

        # identifier, sequence number and payload the reply has to mirror
        echo_id = 0xb
        echo_seq = 5
        echo_data = '\x0a' * 18

        l2_hdr = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        l3_hdr = IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
        echo = ICMPv6EchoRequest(id=echo_id, seq=echo_seq, data=echo_data)
        request = l2_hdr / l3_hdr / echo

        self.pg0.add_stream(request)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # exactly one packet is expected back on the interface we sent on
        reply = self.pg0.get_capture(1)[0]
        reply_l2 = reply[Ether]
        reply_l3 = reply[IPv6]
        reply_echo = reply[ICMPv6EchoReply]

        # L2 and L3 addresses are swapped relative to the request
        self.assertEqual(reply_l2.src, self.pg0.local_mac)
        self.assertEqual(reply_l2.dst, self.pg0.remote_mac)

        self.assertEqual(reply_l3.src, self.pg0.local_ip6)
        self.assertEqual(reply_l3.dst, self.pg0.remote_ip6)

        # the ICMPv6 part is an Echo Reply carrying the request's values
        self.assertEqual(icmp6types[reply_echo.type], "Echo Reply")
        self.assertEqual(reply_echo.id, echo_id)
        self.assertEqual(reply_echo.seq, echo_seq)
        self.assertEqual(reply_echo.data, echo_data)
977
978
class TestIPv6RD(TestIPv6ND):
    """ IPv6 Router Discovery Test Case """

    @classmethod
    def setUpClass(cls):
        """Defer entirely to the parent class set-up."""
        super(TestIPv6RD, cls).setUpClass()

    def setUp(self):
        """Create two pg interfaces and configure IPv6 on both."""
        super(TestIPv6RD, self).setUp()

        self.create_pg_interfaces(range(2))

        # private copy of the interface list; tearDown walks this one
        self.interfaces = list(self.pg_interfaces)

        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

    def tearDown(self):
        """Remove the IPv6 config first, then run the base tear-down."""
        for intf in self.interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
        super(TestIPv6RD, self).tearDown()

    def test_rd_send_router_solicitation(self):
        """ Verify router solicitation packets """

        # number of solicitations requested (passed as 'mrc') and then
        # expected in the capture
        n_solicits = 2

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index,
                                                 mrc=n_solicits)
        captured = self.pg1.get_capture(n_solicits, timeout=3)
        self.assertEqual(len(captured), n_solicits)

        for rs in captured:
            self.assertTrue(rs.haslayer(IPv6))
            self.assertTrue(rs[IPv6].haslayer(ICMPv6ND_RS))

            # sent to the all-routers multicast group ...
            actual_dst = ip6_normalize(rs[IPv6].dst)
            wanted_dst = ip6_normalize("ff02::2")
            self.assert_equal(actual_dst, wanted_dst)

            # ... from the interface's link-local address
            actual_src = ip6_normalize(rs[IPv6].src)
            wanted_src = ip6_normalize(self.pg1.local_ip6_ll)
            self.assert_equal(actual_src, wanted_src)

            # and carrying the interface MAC in a source link-layer option
            self.assertTrue(rs[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr))
            self.assert_equal(rs[ICMPv6NDOptSrcLLAddr].lladdr,
                              self.pg1.local_mac)

    def verify_prefix_info(self, reported_prefix, prefix_option):
        """Compare one prefix from an RA event with the option that was sent.

        :param reported_prefix: prefix entry taken from the RA event
        :param prefix_option: the scapy ICMPv6NDOptPrefixInfo put in the RA
        """
        field = prefix_option.getfieldval

        # the event holds the prefix in packed (binary) form
        wanted_addr = socket.inet_pton(socket.AF_INET6, field("prefix"))
        self.assert_equal(reported_prefix.dst_address, wanted_addr)
        self.assert_equal(reported_prefix.dst_address_length,
                          field("prefixlen"))

        # the L flag is expected in bit 7 and the A flag in bit 6
        on_link = field("L")
        autonomous = field("A")
        wanted_flags = (on_link << 7) | (autonomous << 6)
        self.assert_equal(reported_prefix.flags, wanted_flags)

        self.assert_equal(reported_prefix.valid_time,
                          field("validlifetime"))
        self.assert_equal(reported_prefix.preferred_time,
                          field("preferredlifetime"))

    def test_rd_receive_router_advertisement(self):
        """ Verify events triggered by received RA packets """

        self.vapi.want_ip6_ra_events()

        # two prefix-info options with distinct values in every field
        first_prefix = ICMPv6NDOptPrefixInfo(prefix="1::2",
                                             prefixlen=50,
                                             validlifetime=200,
                                             preferredlifetime=500,
                                             L=1,
                                             A=1)
        second_prefix = ICMPv6NDOptPrefixInfo(prefix="7::4",
                                              prefixlen=20,
                                              validlifetime=70,
                                              preferredlifetime=1000,
                                              L=1,
                                              A=0)

        # RA from the peer's link-local address to ours, on pg1
        l2_hdr = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        l3_hdr = IPv6(dst=self.pg1.local_ip6_ll,
                      src=mk_ll_addr(self.pg1.remote_mac))
        advert = (l2_hdr / l3_hdr / ICMPv6ND_RA() /
                  first_prefix / second_prefix)
        self.pg1.add_stream([advert])
        self.pg_start()

        event = self.vapi.wait_for_event(10, "ip6_ra_event")

        # header fields of the RA; ICMPv6ND_RA() was built without
        # arguments, so these are presumably scapy's defaults
        self.assert_equal(event.current_hop_limit, 0)
        self.assert_equal(event.flags, 8)
        self.assert_equal(event.router_lifetime_in_sec, 1800)
        self.assert_equal(event.neighbor_reachable_time_in_msec, 0)
        self.assert_equal(
            event.time_in_msec_between_retransmitted_neighbor_solicitations,
            0)

        self.assert_equal(event.n_prefixes, 2)

        # prefixes must be reported in the order they were sent
        for idx, sent_option in enumerate((first_prefix, second_prefix)):
            self.verify_prefix_info(event.prefixes[idx], sent_option)
1088
1089
class TestIPv6RDControlPlane(TestIPv6ND):
    """ IPv6 Router Discovery Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RDControlPlane, cls).setUpClass()

    def setUp(self):
        """Create one pg interface and configure IPv6 on it."""
        super(TestIPv6RDControlPlane, self).setUp()

        # create 1 pg interface
        self.create_pg_interfaces(range(1))

        self.interfaces = list(self.pg_interfaces)

        # setup all interfaces
        for i in self.interfaces:
            i.admin_up()
            i.config_ip6()

    def tearDown(self):
        super(TestIPv6RDControlPlane, self).tearDown()

    @staticmethod
    def create_ra_packet(pg, routerlifetime=None):
        """Build a Router Advertisement as sent by pg's peer.

        The RA goes from the peer's link-local address to the interface's
        configured IPv6 address.

        :param pg: interface the RA will be injected on
        :param routerlifetime: router lifetime to advertise; when None the
            ICMPv6ND_RA layer is built without it (scapy's default applies)
        :returns: the scapy packet
        """
        src_ip = pg.remote_ip6_ll
        dst_ip = pg.local_ip6
        if routerlifetime is not None:
            ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
        else:
            ra = ICMPv6ND_RA()
        p = (Ether(dst=pg.local_mac, src=pg.remote_mac) /
             IPv6(dst=dst_ip, src=src_ip) / ra)
        return p

    @staticmethod
    def get_default_routes(fib):
        """Extract the default routes from a FIB dump.

        :param fib: iterable of FIB entries (as returned by ip6_fib_dump)
        :returns: list of dicts with keys 'sw_if_index' and 'next_hop', one
            per path of every zero-length-prefix entry; paths whose
            sw_if_index is 0xFFFFFFFF are left out
        """
        routes = []
        for entry in fib:
            if entry.address_length != 0:
                continue
            for path in entry.path:
                # 0xFFFFFFFF is presumably the 'no interface' value, i.e.
                # not a route installed via an interface - TODO confirm
                if path.sw_if_index != 0xFFFFFFFF:
                    routes.append({'sw_if_index': path.sw_if_index,
                                   'next_hop': path.next_hop})
        return routes

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Extract the /128 entries of a FIB dump that point at pg.

        Only the first path of each entry is examined.

        :param fib: iterable of FIB entries (as returned by ip6_fib_dump)
        :param pg: interface whose sw_if_index is matched
        :returns: list of the matching entries' addresses
        """
        return [entry.address for entry in fib
                if entry.address_length == 128 and
                entry.path[0].sw_if_index == pg.sw_if_index]

    def _send_ra(self, packet):
        """Inject packet on pg0, then wait 0.1s before the caller checks."""
        self.pg0.add_stream([packet])
        self.pg_start()
        self.sleep(0.1)

    def _new_addresses(self, fib, initial_addresses):
        """Return pg0's addresses in fib that are not in initial_addresses."""
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        return addresses.difference(initial_addresses)

    def _check_slaac_address(self, fib, initial_addresses):
        """Assert exactly one new address exists and it lies in 1::/64."""
        new_addresses = self._new_addresses(fib, initial_addresses)
        self.assertEqual(len(new_addresses), 1)
        # keep the first 8 bytes (the /64 prefix), zero the remaining 8
        prefix = list(new_addresses)[0][:8] + '\0' * 8
        self.assertEqual(inet_ntop(AF_INET6, prefix), '1::')

    def _check_default_route(self, fib, router_address):
        """Assert fib has a single default route, via router on pg0."""
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 1)
        dr = default_routes[0]
        self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
        self.assertEqual(dr['next_hop'], router_address)

    def _check_no_default_route(self, fib):
        """Assert fib holds no default route."""
        self.assertEqual(len(self.get_default_routes(fib)), 0)

    def test_all(self):
        """ Test handling of SLAAC addresses and default routes """

        # baseline: no default route, remember the pre-existing addresses
        fib = self.vapi.ip6_fib_dump()
        default_routes = self.get_default_routes(fib)
        initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
        self.assertEqual(default_routes, [])
        router_address = self.pg0.remote_ip6n_ll

        self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)

        self.sleep(0.1)

        # send RA carrying two prefixes; only 1::/64 has the A flag set and
        # its lifetimes are short (2s) so that expiry is seen further down
        packet = (self.create_ra_packet(self.pg0) / ICMPv6NDOptPrefixInfo(
            prefix="1::",
            prefixlen=64,
            validlifetime=2,
            preferredlifetime=2,
            L=1,
            A=1,
        ) / ICMPv6NDOptPrefixInfo(
            prefix="7::",
            prefixlen=20,
            validlifetime=1500,
            preferredlifetime=1000,
            L=1,
            A=0,
        ))
        self._send_ra(packet)

        fib = self.vapi.ip6_fib_dump()

        # check FIB for new address
        self._check_slaac_address(fib, initial_addresses)

        # check FIB for new default route
        self._check_default_route(fib, router_address)

        # send RA to delete default route
        self._send_ra(self.create_ra_packet(self.pg0, routerlifetime=0))

        # check that default route is deleted
        fib = self.vapi.ip6_fib_dump()
        self._check_no_default_route(fib)

        self.sleep(0.1)

        # send RA
        self._send_ra(self.create_ra_packet(self.pg0))

        # check FIB for new default route
        fib = self.vapi.ip6_fib_dump()
        self._check_default_route(fib, router_address)

        # send RA, updating router lifetime to 1s
        self._send_ra(self.create_ra_packet(self.pg0, 1))

        # check that default route still exists
        fib = self.vapi.ip6_fib_dump()
        self._check_default_route(fib, router_address)

        self.sleep(1)

        # check that default route is deleted
        fib = self.vapi.ip6_fib_dump()
        self._check_no_default_route(fib)

        # check FIB still contains the SLAAC address
        self._check_slaac_address(fib, initial_addresses)

        self.sleep(1)

        # check that SLAAC address is deleted
        fib = self.vapi.ip6_fib_dump()
        new_addresses = self._new_addresses(fib, initial_addresses)
        self.assertEqual(len(new_addresses), 0)
1263
1264
class IPv6NDProxyTest(TestIPv6ND):
    """ IPv6 ND ProxyTest Case """

    def setUp(self):
        """Create three pg interfaces: pg0 fully configured, pg1 and pg2
        only IPv6 enabled."""
        super(IPv6NDProxyTest, self).setUp()

        # create 3 pg interfaces
        self.create_pg_interfaces(range(3))

        # pg0 is the master interface, with the configured subnet
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # the proxy links get no address of their own
        self.pg1.ip6_enable()
        self.pg2.ip6_enable()

    def tearDown(self):
        super(IPv6NDProxyTest, self).tearDown()

    def test_nd_proxy(self):
        """ IPv6 Proxy ND """

        #
        # Generate some hosts in the subnet that we are proxying
        #
        self.pg0.generate_remote_hosts(8)

        # the two hosts used throughout: host2 is proxied on pg1 and
        # host3 on pg2 (public accessor rather than pg0._remote_hosts)
        host2 = self.pg0.remote_hosts[2]
        host3 = self.pg0.remote_hosts[3]

        # solicited-node multicast address for pg0's local address
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        #
        # Send an NS for one of those remote hosts on one of the proxy links
        # expect no response since it's from an address that is not
        # on the link that has the prefix configured
        #
        ns_pg1 = (Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac) /
                  IPv6(dst=d, src=host2.ip6) /
                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=host2.mac))

        self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")

        #
        # Add proxy support for the host
        #
        self.vapi.ip6_nd_proxy(
            inet_pton(AF_INET6, host2.ip6),
            self.pg1.sw_if_index)

        #
        # try that NS again. this time we expect an NA back
        #
        self.send_and_expect_na(self.pg1, ns_pg1,
                                "NS to proxy entry",
                                dst_ip=host2.ip6,
                                tgt_ip=self.pg0.local_ip6)

        #
        # ... and that we have an entry in the ND cache
        #
        self.assertTrue(find_nbr(self,
                                 self.pg1.sw_if_index,
                                 host2.ip6,
                                 inet=AF_INET6))

        #
        # ... and we can route traffic to it
        #
        t = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(dst=host2.ip6,
                  src=self.pg0.remote_ip6) /
             UDP(sport=10000, dport=20000) /
             Raw('\xa5' * 100))

        self.pg0.add_stream(t)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(1)
        rx = rx[0]

        self.assertEqual(rx[Ether].dst, host2.mac)
        self.assertEqual(rx[Ether].src, self.pg1.local_mac)

        self.assertEqual(rx[IPv6].src, t[IPv6].src)
        self.assertEqual(rx[IPv6].dst, t[IPv6].dst)

        #
        # Test we proxy for the host on the main interface
        #
        ns_pg0 = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
                  IPv6(dst=d, src=self.pg0.remote_ip6) /
                  ICMPv6ND_NS(tgt=host2.ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))

        self.send_and_expect_na(self.pg0, ns_pg0,
                                "NS to proxy entry on main",
                                tgt_ip=host2.ip6,
                                dst_ip=self.pg0.remote_ip6)

        #
        # Setup and resolve proxy for another host on another interface
        #
        # NOTE(review): the source link-layer option carries host2's MAC
        # although the NS is sourced from host3 - looks like a copy/paste
        # from ns_pg1. Nothing below checks the MAC learned for host3, so
        # it is left unchanged here; confirm and switch to host3.mac.
        ns_pg2 = (Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac) /
                  IPv6(dst=d, src=host3.ip6) /
                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=host2.mac))

        self.vapi.ip6_nd_proxy(
            inet_pton(AF_INET6, host3.ip6),
            self.pg2.sw_if_index)

        self.send_and_expect_na(self.pg2, ns_pg2,
                                "NS to proxy entry other interface",
                                dst_ip=host3.ip6,
                                tgt_ip=self.pg0.local_ip6)

        self.assertTrue(find_nbr(self,
                                 self.pg2.sw_if_index,
                                 host3.ip6,
                                 inet=AF_INET6))

        #
        # hosts can communicate. pg2->pg1
        #
        t2 = (Ether(dst=self.pg2.local_mac,
                    src=host3.mac) /
              IPv6(dst=host2.ip6,
                   src=host3.ip6) /
              UDP(sport=10000, dport=20000) /
              Raw('\xa5' * 100))

        self.pg2.add_stream(t2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(1)
        rx = rx[0]

        self.assertEqual(rx[Ether].dst, host2.mac)
        self.assertEqual(rx[Ether].src, self.pg1.local_mac)

        self.assertEqual(rx[IPv6].src, t2[IPv6].src)
        self.assertEqual(rx[IPv6].dst, t2[IPv6].dst)

        #
        # remove the proxy configs
        #
        self.vapi.ip6_nd_proxy(
            inet_pton(AF_INET6, host2.ip6),
            self.pg1.sw_if_index,
            is_del=1)
        self.vapi.ip6_nd_proxy(
            inet_pton(AF_INET6, host3.ip6),
            self.pg2.sw_if_index,
            is_del=1)

        # the neighbour entries must be gone as well
        self.assertFalse(find_nbr(self,
                                  self.pg2.sw_if_index,
                                  host3.ip6,
                                  inet=AF_INET6))
        self.assertFalse(find_nbr(self,
                                  self.pg1.sw_if_index,
                                  host2.ip6,
                                  inet=AF_INET6))

        #
        # no longer proxy-ing...
        #
        self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
        self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
        self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")

        #
        # no longer forwarding. traffic generates NS out of the glean/main
        # interface
        #
        self.pg2.add_stream(t2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)

        self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1448
1449
class TestIPNull(VppTestCase):
    """ IPv6 routes via NULL """

    def setUp(self):
        """Create one pg interface and bring it up with IPv6 resolved."""
        super(TestIPNull, self).setUp()

        # a single interface (pg0): traffic is sent on it and the ICMP
        # errors are captured on it
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Run the base-class tear-down, then unconfigure the interfaces."""
        super(TestIPNull, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def _send_and_get_unreach(self, pkt):
        """Send pkt on pg0 and return the ICMPv6 Destination Unreachable
        layer of the single packet captured back on pg0."""
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg0.get_capture(1)
        return captured[0][ICMPv6DestUnreach]

    def test_ip_null(self):
        """ IP NULL route """

        # probe packet towards 2001::1, which both routes below cover
        l2_hdr = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        l3_hdr = IPv6(src=self.pg0.remote_ip6, dst="2001::1")
        probe = (l2_hdr / l3_hdr /
                 UDP(sport=1234, dport=1234) /
                 Raw('\xa5' * 100))

        #
        # A route via IP NULL that will reply with ICMP unreachables
        #
        unreach_route = VppIpRoute(self, "2001::", 64, [],
                                   is_unreach=1, is_ip6=1)
        unreach_route.add_vpp_config()

        unreach = self._send_and_get_unreach(probe)

        # 0 = "No route to destination"
        self.assertEqual(unreach.code, 0)

        # ICMP is rate limited. pause a bit
        self.sleep(1)

        #
        # A more specific (/128) route via IP NULL that will reply with
        # ICMP prohibited
        #
        prohibit_route = VppIpRoute(self, "2001::1", 128, [],
                                    is_prohibit=1, is_ip6=1)
        prohibit_route.add_vpp_config()

        prohibited = self._send_and_get_unreach(probe)

        # 1 = "Communication with destination administratively prohibited"
        self.assertEqual(prohibited.code, 1)
1516
1517
class TestIPDisabled(VppTestCase):
    """ IPv6 disabled """

    def setUp(self):
        """Create two pg interfaces; only pg0 gets an IPv6 configuration."""
        super(TestIPDisabled, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        # PG0 is IP enabled
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # PG 1 is not IP enabled
        self.pg1.admin_up()

    def tearDown(self):
        """Run the base-class tear-down, then remove the IPv6 config.

        This case only ever configures IPv6 addresses (config_ip6 in setUp
        and in the test), so IPv6 - not IPv4 - is what must be unconfigured.
        """
        super(TestIPDisabled, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()

    def test_ip_disabled(self):
        """ IP Disabled """

        #
        # A multicast route for group ffef::1 with an unspecified ('::')
        # source.
        # one accepting interface, pg1, and one forwarding interface, pg0
        #
        route_ff_01 = VppIpMRoute(
            self,
            "::",
            "ffef::1", 128,
            MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
            [VppMRoutePath(self.pg1.sw_if_index,
                           MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT),
             VppMRoutePath(self.pg0.sw_if_index,
                           MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)],
            is_ip6=1)
        route_ff_01.add_vpp_config()

        # unicast (pu) and multicast (pm) probes, both injected on pg1
        pu = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
              UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100))
        pm = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst="ffef::1") /
              UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100))

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")

        #
        # IP enable PG1
        #
        self.pg1.config_ip6()

        #
        # Now we get packets through
        #  (get_capture is called only to check one packet arrives on pg0)
        #
        self.pg1.add_stream(pu)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        self.pg1.add_stream(pm)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        #
        # Disable PG1
        #
        self.pg1.unconfig_ip6()

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1605
1606
class TestIP6LoadBalance(VppTestCase):
    """ IPv6 Load-Balancing """

    def setUp(self):
        """Create five pg interfaces with IPv6 and MPLS enabled.

        An MPLS table with ID 0 is added before MPLS is enabled on the
        interfaces.
        """
        super(TestIP6LoadBalance, self).setUp()

        self.create_pg_interfaces(range(5))

        mpls_tbl = VppMplsTable(self, 0)
        mpls_tbl.add_vpp_config()

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()
            i.enable_mpls()

    def tearDown(self):
        """Undo the per-interface setup, then run the base tearDown."""
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()
            i.disable_mpls()
        super(TestIP6LoadBalance, self).tearDown()

    def send_and_expect_load_balancing(self, input, pkts, outputs):
        """Send pkts on input and require traffic on every output.

        Only the presence of at least one captured packet per interface
        is checked, not how evenly the packets were spread. The packet
        trace is cleared before the stream is sent.

        :param input: pg interface on which the stream is injected
        :param pkts: list of packets to send
        :param outputs: pg interfaces that must each capture something
        """
        self.vapi.cli("clear trace")
        input.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        for oo in outputs:
            # NOTE(review): the argument to _get_capture() is presumably
            # a timeout rather than a packet count - confirm against
            # vpp_pg_interface.
            rx = oo._get_capture(1)
            self.assertNotEqual(0, len(rx))

    def send_and_expect_one_itf(self, input, pkts, itf):
        """Send pkts on input and require all of them to arrive on itf.

        The packet trace is cleared before the stream is sent.

        :param input: pg interface on which the stream is injected
        :param pkts: list of packets to send
        :param itf: the single pg interface expected to capture
            len(pkts) packets
        """
        self.vapi.cli("clear trace")
        input.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # The capture itself is the check; the packets are not inspected.
        itf.get_capture(len(pkts))

    def test_ip6_load_balance(self):
        """ IPv6 Load-Balancing """

        #
        # An array of packets that differ only in the destination port
        #  - IP only
        #  - MPLS EOS
        #  - MPLS non-EOS
        #  - MPLS non-EOS with an entropy label
        #
        port_ip_pkts = []
        port_mpls_pkts = []
        port_mpls_neos_pkts = []
        port_ent_pkts = []

        #
        # An array of packets that differ only in the source address
        #
        src_ip_pkts = []
        src_mpls_pkts = []

        #
        # Every packet is injected on pg0, so the L2 header is built once
        # and layered into each packet (scapy's '/' works on copies of
        # its operands).
        #
        eth = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)

        for ii in range(65):
            port_ip_hdr = (IPv6(dst="3000::1", src="3000:1::1") /
                           UDP(sport=1234, dport=1234 + ii) /
                           Raw('\xa5' * 100))
            port_ip_pkts.append(eth / port_ip_hdr)
            port_mpls_pkts.append(eth /
                                  MPLS(label=66, ttl=2) /
                                  port_ip_hdr)
            port_mpls_neos_pkts.append(eth /
                                       MPLS(label=67, ttl=2) /
                                       MPLS(label=77, ttl=2) /
                                       port_ip_hdr)
            port_ent_pkts.append(eth /
                                 MPLS(label=67, ttl=2) /
                                 MPLS(label=14, ttl=2) /
                                 MPLS(label=999, ttl=2) /
                                 port_ip_hdr)
            src_ip_hdr = (IPv6(dst="3000::1", src="3000:1::%d" % ii) /
                          UDP(sport=1234, dport=1234) /
                          Raw('\xa5' * 100))
            src_ip_pkts.append(eth / src_ip_hdr)
            src_mpls_pkts.append(eth /
                                 MPLS(label=66, ttl=2) /
                                 src_ip_hdr)

        #
        # A route for the IP packets
        #
        route_3000_1 = VppIpRoute(self, "3000::1", 128,
                                  [VppRoutePath(self.pg1.remote_ip6,
                                                self.pg1.sw_if_index,
                                                proto=DpoProto.DPO_PROTO_IP6),
                                   VppRoutePath(self.pg2.remote_ip6,
                                                self.pg2.sw_if_index,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        route_3000_1.add_vpp_config()

        #
        # a local-label for the EOS packets
        #
        binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
        binding.add_vpp_config()

        #
        # An MPLS route for the non-EOS packets
        #
        route_67 = VppMplsRoute(self, 67, 0,
                                [VppRoutePath(self.pg1.remote_ip6,
                                              self.pg1.sw_if_index,
                                              labels=[67],
                                              proto=DpoProto.DPO_PROTO_IP6),
                                 VppRoutePath(self.pg2.remote_ip6,
                                              self.pg2.sw_if_index,
                                              labels=[67],
                                              proto=DpoProto.DPO_PROTO_IP6)])
        route_67.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across the 2 paths
        #  - since the default hash config is to use IP src,dst and port
        #    src,dst
        # We are not going to ensure equal amounts of packets across each link,
        # since the hash algorithm is statistical and therefore this can never
        # be guaranteed. But with 65 different packets we do expect some
        # balancing. So instead just ensure there is traffic on each link.
        #
        self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, port_mpls_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, port_mpls_neos_pkts,
                                            [self.pg1, self.pg2])

        #
        # The packets carrying an Entropy label should not load-balance,
        # since the Entropy value is fixed.
        #
        self.send_and_expect_one_itf(self.pg0, port_ent_pkts, self.pg1)

        #
        # change the flow hash config so it's only IP src,dst
        #  - now only the stream with differing source address will
        #    load-balance
        #
        self.vapi.set_ip_flow_hash(0, is_ip6=1, src=1, dst=1, sport=0, dport=0)

        self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
                                            [self.pg1, self.pg2])
        self.send_and_expect_one_itf(self.pg0, port_ip_pkts, self.pg2)

        #
        # change the flow hash config back to defaults
        #
        self.vapi.set_ip_flow_hash(0, is_ip6=1, src=1, dst=1, sport=1, dport=1)

        #
        # Recursive prefixes
        #  - testing that 2 stages of load-balancing occurs and there is no
        #    polarisation (i.e. only 2 of 4 paths are used)
        #
        port_pkts = []
        src_pkts = []

        for ii in range(257):
            port_pkts.append(eth /
                             IPv6(dst="4000::1", src="4000:1::1") /
                             UDP(sport=1234, dport=1234 + ii) /
                             Raw('\xa5' * 100))
            src_pkts.append(eth /
                            IPv6(dst="4000::1", src="4000:1::%d" % ii) /
                            UDP(sport=1234, dport=1234) /
                            Raw('\xa5' * 100))

        route_3000_2 = VppIpRoute(self, "3000::2", 128,
                                  [VppRoutePath(self.pg3.remote_ip6,
                                                self.pg3.sw_if_index,
                                                proto=DpoProto.DPO_PROTO_IP6),
                                   VppRoutePath(self.pg4.remote_ip6,
                                                self.pg4.sw_if_index,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        route_3000_2.add_vpp_config()

        route_4000_1 = VppIpRoute(self, "4000::1", 128,
                                  [VppRoutePath("3000::1",
                                                0xffffffff,
                                                proto=DpoProto.DPO_PROTO_IP6),
                                   VppRoutePath("3000::2",
                                                0xffffffff,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        route_4000_1.add_vpp_config()

        #
        # inject the packet on pg0 - expect load-balancing across all 4 paths
        # (the send helpers clear the packet trace themselves)
        #
        self.send_and_expect_load_balancing(self.pg0, port_pkts,
                                            [self.pg1, self.pg2,
                                             self.pg3, self.pg4])
        self.send_and_expect_load_balancing(self.pg0, src_pkts,
                                            [self.pg1, self.pg2,
                                             self.pg3, self.pg4])

        #
        # Recursive prefixes
        #  - testing 2 stages of resolution where neither stage has a
        #    choice to make: each route has exactly one path
        #
        port_pkts = []

        for ii in range(257):
            port_pkts.append(eth /
                             IPv6(dst="6000::1", src="6000:1::1") /
                             UDP(sport=1234, dport=1234 + ii) /
                             Raw('\xa5' * 100))

        route_5000_2 = VppIpRoute(self, "5000::2", 128,
                                  [VppRoutePath(self.pg3.remote_ip6,
                                                self.pg3.sw_if_index,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        route_5000_2.add_vpp_config()

        route_6000_1 = VppIpRoute(self, "6000::1", 128,
                                  [VppRoutePath("5000::2",
                                                0xffffffff,
                                                proto=DpoProto.DPO_PROTO_IP6)],
                                  is_ip6=1)
        route_6000_1.add_vpp_config()

        #
        # inject the packet on pg0 - expect every packet on pg3, the only
        # path available
        #
        self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg3)
1861
1862
class TestIP6Punt(VppTestCase):
    """ IPv6 Punt Police/Redirect """

    def setUp(self):
        """Bring up two pg interfaces with IPv6 configured and ND resolved."""
        super(TestIP6Punt, self).setUp()

        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """Run the base tearDown, then unconfigure the pg interfaces."""
        super(TestIP6Punt, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()

    def test_ip_punt(self):
        """ IP6 punt police and redirect """

        # TCP packets addressed to pg0's own IPv6 address
        p = (Ether(src=self.pg0.remote_mac,
                   dst=self.pg0.local_mac) /
             IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
             TCP(sport=1234, dport=1234) /
             Raw('\xa5' * 100))

        pkts = p * 1025

        #
        # Configure a punt redirect via pg1.
        #
        nh_addr = inet_pton(AF_INET6,
                            self.pg1.remote_ip6)
        self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
                                   self.pg1.sw_if_index,
                                   nh_addr,
                                   is_ip6=1)

        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # add a policer
        #
        # NOTE(review): the positional arguments are presumably the
        # policer's rate and burst settings - confirm against the
        # policer_add_del wrapper.
        policer = self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0,
                                            rate_type=1)
        self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)

        self.vapi.cli("clear trace")
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        #
        # the number of packets received should be greater than 0,
        # but not equal to the number sent, since some were policed
        #
        rx = self.pg1._get_capture(1)
        # assertGreater/assertLess report both operands on failure
        self.assertGreater(len(rx), 0)
        self.assertLess(len(rx), len(pkts))

        #
        # remove the policer. back to full rx
        #
        self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
        self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0,
                                  rate_type=1, is_add=0)
        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # remove the redirect. expect full drop.
        #
        self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
                                   self.pg1.sw_if_index,
                                   nh_addr,
                                   is_add=0,
                                   is_ip6=1)
        self.send_and_assert_no_replies(self.pg0, pkts,
                                        "IP no punt config")

        #
        # Add a redirect that is not input port selective
        #
        self.vapi.ip_punt_redirect(0xffffffff,
                                   self.pg1.sw_if_index,
                                   nh_addr,
                                   is_ip6=1)
        self.send_and_expect(self.pg0, pkts, self.pg1)

        # clean up the non-selective redirect again
        self.vapi.ip_punt_redirect(0xffffffff,
                                   self.pg1.sw_if_index,
                                   nh_addr,
                                   is_add=0,
                                   is_ip6=1)
1956                                    is_add=0,
1957                                    is_ip6=1)
1958
1959
class TestIPDeag(VppTestCase):
    """ IPv6 Deaggregate Routes """

    def setUp(self):
        """Bring up three pg interfaces with IPv6 configured and ND resolved."""
        super(TestIPDeag, self).setUp()

        self.create_pg_interfaces(range(3))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Run the base tearDown, then unconfigure the pg interfaces."""
        super(TestIPDeag, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip_deag(self):
        """ IP Deag Routes """

        # interface index used for paths that name no interface
        no_itf = 0xffffffff
        burst = 257

        def tcp_pkt(src, dst):
            # One TCP packet injected on pg0 with the given IPv6 addresses
            return (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IPv6(src=src, dst=dst) /
                    TCP(sport=1234, dport=1234) /
                    Raw('\xa5' * 100))

        #
        # Two extra tables:
        #  1 - looked up a second time by destination address
        #  2 - looked up by source address
        #
        table_dst = VppIpTable(self, 1, is_ip6=1)
        table_src = VppIpTable(self, 2, is_ip6=1)
        table_dst.add_vpp_config()
        table_src.add_vpp_config()

        #
        # Default-table routes that deag into a second lookup in
        # each of those tables
        #
        deag_to_dst = VppRoutePath("::",
                                   no_itf,
                                   nh_table_id=1,
                                   proto=DpoProto.DPO_PROTO_IP6)
        deag_to_src = VppRoutePath("::",
                                   no_itf,
                                   nh_table_id=2,
                                   is_source_lookup=1,
                                   proto=DpoProto.DPO_PROTO_IP6)
        route_to_dst = VppIpRoute(self, "1::1", 128, [deag_to_dst],
                                  is_ip6=1)
        route_to_src = VppIpRoute(self, "1::2", 128, [deag_to_src],
                                  is_ip6=1)
        route_to_dst.add_vpp_config()
        route_to_src.add_vpp_config()

        #
        # With nothing more specific in the second tables, both
        # streams hit the default routes there and are dropped
        #
        pkts_dst = tcp_pkt("5::5", "1::1") * burst
        pkts_src = tcp_pkt("2::2", "1::2") * burst

        self.send_and_assert_no_replies(self.pg0, pkts_dst,
                                        "IP in dst table")
        self.send_and_assert_no_replies(self.pg0, pkts_src,
                                        "IP in src table")

        #
        # A route in the dst table forwards the first stream via pg1
        #
        via_pg1 = VppRoutePath(self.pg1.remote_ip6,
                               self.pg1.sw_if_index,
                               proto=DpoProto.DPO_PROTO_IP6)
        route_in_dst = VppIpRoute(self, "1::1", 128, [via_pg1],
                                  is_ip6=1,
                                  table_id=1)
        route_in_dst.add_vpp_config()

        self.send_and_expect(self.pg0, pkts_dst, self.pg1)

        #
        # A route in the src table, keyed on the packets' source
        # address, forwards the second stream via pg2
        #
        via_pg2 = VppRoutePath(self.pg2.remote_ip6,
                               self.pg2.sw_if_index,
                               proto=DpoProto.DPO_PROTO_IP6)
        route_in_src = VppIpRoute(self, "2::2", 128, [via_pg2],
                                  is_ip6=1,
                                  table_id=2)
        route_in_src.add_vpp_config()
        self.send_and_expect(self.pg0, pkts_src, self.pg2)

        #
        # A lookup loop in the data-plane: the path names neither an
        # interface nor another table. The packets must not be forwarded.
        #
        lookup_again = VppRoutePath("::",
                                    no_itf,
                                    proto=DpoProto.DPO_PROTO_IP6)
        route_loop = VppIpRoute(self, "3::3", 128, [lookup_again],
                                is_ip6=1)
        route_loop.add_vpp_config()

        pkts_loop = tcp_pkt("3::4", "3::3") * burst

        self.send_and_assert_no_replies(self.pg0, pkts_loop,
                                        "IP lookup loop")
2075         self.send_and_assert_no_replies(self.pg0, p_l * 257,
2076                                         "IP lookup loop")
2077
2078
class TestIP6Input(VppTestCase):
    """ IPv6 Input Exceptions """

    def setUp(self):
        """Bring up two pg interfaces with IPv6 configured and ND resolved."""
        super(TestIP6Input, self).setUp()

        self.create_pg_interfaces(range(2))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Run the base tearDown, then unconfigure the pg interfaces."""
        super(TestIP6Input, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def test_ip_input(self):
        """ IP6 Input Exceptions """

        burst = 65

        def udp_pkt(**ip6_fields):
            # UDP packet from pg0's peer towards pg1's peer; the keyword
            # arguments override individual IPv6 header fields
            return (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IPv6(src=self.pg0.remote_ip6,
                         dst=self.pg1.remote_ip6,
                         **ip6_fields) /
                    UDP(sport=1234, dport=1234) /
                    Raw('\xa5' * 100))

        #
        # A wrong IP version number - silently dropped
        #
        bad_version_pkt = udp_pkt(version=3)

        self.send_and_assert_no_replies(self.pg0, bad_version_pkt * burst,
                                        "funky version")

        #
        # Hop limit of 1 - an ICMP error comes back on the input interface
        #
        hop_limit_pkt = udp_pkt(hlim=1)

        replies = self.send_and_expect(self.pg0, hop_limit_pkt * burst,
                                       self.pg0)
        # only the first reply is inspected
        time_exceeded = replies[0][ICMPv6TimeExceeded]
        self.assertEqual(time_exceeded.type, 3)
        # code 0: "hop limit exceeded in transit"
        self.assertEqual(time_exceeded.code, 0)
2130         # 0: "hop limit exceeded in transit",
2131         self.assertEqual(icmp.code, 0)
2132
2133
# Allow this module to be executed directly, using the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)