IPv6 ND Router discovery control plane (VPP-1095)
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5
6 from framework import VppTestCase, VppTestRunner
7 from util import ppp, ip6_normalize
8 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
9 from vpp_pg_interface import is_ipv6_misc
10 from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
11     VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
12     VppMplsRoute, DpoProto, VppMplsTable
13 from vpp_neighbor import find_nbr, VppNeighbor
14
15 from scapy.packet import Raw
16 from scapy.layers.l2 import Ether, Dot1Q
17 from scapy.layers.inet6 import IPv6, UDP, TCP, ICMPv6ND_NS, ICMPv6ND_RS, \
18     ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
19     ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
20     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
21     ICMPv6TimeExceeded
22 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
23     in6_mactoifaceid, in6_ismaddr
24 from scapy.utils import inet_pton, inet_ntop
25 from scapy.contrib.mpls import MPLS
26
27
28 AF_INET6 = socket.AF_INET6
29
30
31 def mk_ll_addr(mac):
32     euid = in6_mactoifaceid(mac)
33     addr = "fe80::" + euid
34     return addr
35
36
37 class TestIPv6ND(VppTestCase):
38     def validate_ra(self, intf, rx, dst_ip=None):
39         if not dst_ip:
40             dst_ip = intf.remote_ip6
41
42         # unicasted packets must come to the unicast mac
43         self.assertEqual(rx[Ether].dst, intf.remote_mac)
44
45         # and from the router's MAC
46         self.assertEqual(rx[Ether].src, intf.local_mac)
47
48         # the rx'd RA should be addressed to the sender's source
49         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
50         self.assertEqual(in6_ptop(rx[IPv6].dst),
51                          in6_ptop(dst_ip))
52
53         # and come from the router's link local
54         self.assertTrue(in6_islladdr(rx[IPv6].src))
55         self.assertEqual(in6_ptop(rx[IPv6].src),
56                          in6_ptop(mk_ll_addr(intf.local_mac)))
57
58     def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
59         if not dst_ip:
60             dst_ip = intf.remote_ip6
61         if not tgt_ip:
62             dst_ip = intf.local_ip6
63
64         # unicasted packets must come to the unicast mac
65         self.assertEqual(rx[Ether].dst, intf.remote_mac)
66
67         # and from the router's MAC
68         self.assertEqual(rx[Ether].src, intf.local_mac)
69
70         # the rx'd NA should be addressed to the sender's source
71         self.assertTrue(rx.haslayer(ICMPv6ND_NA))
72         self.assertEqual(in6_ptop(rx[IPv6].dst),
73                          in6_ptop(dst_ip))
74
75         # and come from the target address
76         self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))
77
78         # Dest link-layer options should have the router's MAC
79         dll = rx[ICMPv6NDOptDstLLAddr]
80         self.assertEqual(dll.lladdr, intf.local_mac)
81
82     def validate_ns(self, intf, rx, tgt_ip):
83         nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
84         dst_ip = inet_ntop(AF_INET6, nsma)
85
86         # NS is broadcast
87         self.assertEqual(rx[Ether].dst, "ff:ff:ff:ff:ff:ff")
88
89         # and from the router's MAC
90         self.assertEqual(rx[Ether].src, intf.local_mac)
91
92         # the rx'd NS should be addressed to an mcast address
93         # derived from the target address
94         self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
95
96         # expect the tgt IP in the NS header
97         ns = rx[ICMPv6ND_NS]
98         self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))
99
100         # packet is from the router's local address
101         self.assertEqual(in6_ptop(rx[IPv6].src), intf.local_ip6)
102
103         # Src link-layer options should have the router's MAC
104         sll = rx[ICMPv6NDOptSrcLLAddr]
105         self.assertEqual(sll.lladdr, intf.local_mac)
106
107     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
108                            filter_out_fn=is_ipv6_misc):
109         intf.add_stream(pkts)
110         self.pg_enable_capture(self.pg_interfaces)
111         self.pg_start()
112         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
113
114         self.assertEqual(len(rx), 1)
115         rx = rx[0]
116         self.validate_ra(intf, rx, dst_ip)
117
118     def send_and_expect_na(self, intf, pkts, remark, dst_ip=None,
119                            tgt_ip=None,
120                            filter_out_fn=is_ipv6_misc):
121         intf.add_stream(pkts)
122         self.pg_enable_capture(self.pg_interfaces)
123         self.pg_start()
124         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
125
126         self.assertEqual(len(rx), 1)
127         rx = rx[0]
128         self.validate_na(intf, rx, dst_ip, tgt_ip)
129
130     def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
131                            filter_out_fn=is_ipv6_misc):
132         tx_intf.add_stream(pkts)
133         self.pg_enable_capture(self.pg_interfaces)
134         self.pg_start()
135         rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)
136
137         self.assertEqual(len(rx), 1)
138         rx = rx[0]
139         self.validate_ns(rx_intf, rx, tgt_ip)
140
141     def verify_ip(self, rx, smac, dmac, sip, dip):
142         ether = rx[Ether]
143         self.assertEqual(ether.dst, dmac)
144         self.assertEqual(ether.src, smac)
145
146         ip = rx[IPv6]
147         self.assertEqual(ip.src, sip)
148         self.assertEqual(ip.dst, dip)
149
150
151 class TestIPv6(TestIPv6ND):
152     """ IPv6 Test Case """
153
154     @classmethod
155     def setUpClass(cls):
156         super(TestIPv6, cls).setUpClass()
157
158     def setUp(self):
159         """
160         Perform test setup before test case.
161
162         **Config:**
163             - create 3 pg interfaces
164                 - untagged pg0 interface
165                 - Dot1Q subinterface on pg1
166                 - Dot1AD subinterface on pg2
167             - setup interfaces:
168                 - put it into UP state
169                 - set IPv6 addresses
170                 - resolve neighbor address using NDP
171             - configure 200 fib entries
172
173         :ivar list interfaces: pg interfaces and subinterfaces.
174         :ivar dict flows: IPv4 packet flows in test.
175         :ivar list pg_if_packet_sizes: packet sizes in test.
176
177         *TODO:* Create AD sub interface
178         """
179         super(TestIPv6, self).setUp()
180
181         # create 3 pg interfaces
182         self.create_pg_interfaces(range(3))
183
184         # create 2 subinterfaces for p1 and pg2
185         self.sub_interfaces = [
186             VppDot1QSubint(self, self.pg1, 100),
187             VppDot1QSubint(self, self.pg2, 200)
188             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
189         ]
190
191         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
192         self.flows = dict()
193         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
194         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
195         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
196
197         # packet sizes
198         self.pg_if_packet_sizes = [64, 512, 1518, 9018]
199         self.sub_if_packet_sizes = [64, 512, 1518 + 4, 9018 + 4]
200
201         self.interfaces = list(self.pg_interfaces)
202         self.interfaces.extend(self.sub_interfaces)
203
204         # setup all interfaces
205         for i in self.interfaces:
206             i.admin_up()
207             i.config_ip6()
208             i.resolve_ndp()
209
210         # config 2M FIB entries
211         self.config_fib_entries(200)
212
213     def tearDown(self):
214         """Run standard test teardown and log ``show ip6 neighbors``."""
215         for i in self.sub_interfaces:
216             i.unconfig_ip6()
217             i.ip6_disable()
218             i.admin_down()
219             i.remove_vpp_config()
220
221         super(TestIPv6, self).tearDown()
222         if not self.vpp_dead:
223             self.logger.info(self.vapi.cli("show ip6 neighbors"))
224             # info(self.vapi.cli("show ip6 fib"))  # many entries
225
226     def config_fib_entries(self, count):
227         """For each interface add to the FIB table *count* routes to
228         "fd02::1/128" destination with interface's local address as next-hop
229         address.
230
231         :param int count: Number of FIB entries.
232
233         - *TODO:* check if the next-hop address shouldn't be remote address
234           instead of local address.
235         """
236         n_int = len(self.interfaces)
237         percent = 0
238         counter = 0.0
239         dest_addr = inet_pton(AF_INET6, "fd02::1")
240         dest_addr_len = 128
241         for i in self.interfaces:
242             next_hop_address = i.local_ip6n
243             for j in range(count / n_int):
244                 self.vapi.ip_add_del_route(
245                     dest_addr, dest_addr_len, next_hop_address, is_ipv6=1)
246                 counter += 1
247                 if counter / count * 100 > percent:
248                     self.logger.info("Configure %d FIB entries .. %d%% done" %
249                                      (count, percent))
250                     percent += 1
251
252     def create_stream(self, src_if, packet_sizes):
253         """Create input packet stream for defined interface.
254
255         :param VppInterface src_if: Interface to create packet stream for.
256         :param list packet_sizes: Required packet sizes.
257         """
258         pkts = []
259         for i in range(0, 257):
260             dst_if = self.flows[src_if][i % 2]
261             info = self.create_packet_info(src_if, dst_if)
262             payload = self.info_to_payload(info)
263             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
264                  IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
265                  UDP(sport=1234, dport=1234) /
266                  Raw(payload))
267             info.data = p.copy()
268             if isinstance(src_if, VppSubInterface):
269                 p = src_if.add_dot1_layer(p)
270             size = packet_sizes[(i // 2) % len(packet_sizes)]
271             self.extend_packet(p, size)
272             pkts.append(p)
273         return pkts
274
275     def verify_capture(self, dst_if, capture):
276         """Verify captured input packet stream for defined interface.
277
278         :param VppInterface dst_if: Interface to verify captured packet stream
279                                     for.
280         :param list capture: Captured packet stream.
281         """
282         self.logger.info("Verifying capture on interface %s" % dst_if.name)
283         last_info = dict()
284         for i in self.interfaces:
285             last_info[i.sw_if_index] = None
286         is_sub_if = False
287         dst_sw_if_index = dst_if.sw_if_index
288         if hasattr(dst_if, 'parent'):
289             is_sub_if = True
290         for packet in capture:
291             if is_sub_if:
292                 # Check VLAN tags and Ethernet header
293                 packet = dst_if.remove_dot1_layer(packet)
294             self.assertTrue(Dot1Q not in packet)
295             try:
296                 ip = packet[IPv6]
297                 udp = packet[UDP]
298                 payload_info = self.payload_to_info(str(packet[Raw]))
299                 packet_index = payload_info.index
300                 self.assertEqual(payload_info.dst, dst_sw_if_index)
301                 self.logger.debug(
302                     "Got packet on port %s: src=%u (id=%u)" %
303                     (dst_if.name, payload_info.src, packet_index))
304                 next_info = self.get_next_packet_info_for_interface2(
305                     payload_info.src, dst_sw_if_index,
306                     last_info[payload_info.src])
307                 last_info[payload_info.src] = next_info
308                 self.assertTrue(next_info is not None)
309                 self.assertEqual(packet_index, next_info.index)
310                 saved_packet = next_info.data
311                 # Check standard fields
312                 self.assertEqual(ip.src, saved_packet[IPv6].src)
313                 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
314                 self.assertEqual(udp.sport, saved_packet[UDP].sport)
315                 self.assertEqual(udp.dport, saved_packet[UDP].dport)
316             except:
317                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
318                 raise
319         for i in self.interfaces:
320             remaining_packet = self.get_next_packet_info_for_interface2(
321                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
322             self.assertTrue(remaining_packet is None,
323                             "Interface %s: Packet expected from interface %s "
324                             "didn't arrive" % (dst_if.name, i.name))
325
326     def test_fib(self):
327         """ IPv6 FIB test
328
329         Test scenario:
330             - Create IPv6 stream for pg0 interface
331             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
332             - Send and verify received packets on each interface.
333         """
334
335         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes)
336         self.pg0.add_stream(pkts)
337
338         for i in self.sub_interfaces:
339             pkts = self.create_stream(i, self.sub_if_packet_sizes)
340             i.parent.add_stream(pkts)
341
342         self.pg_enable_capture(self.pg_interfaces)
343         self.pg_start()
344
345         pkts = self.pg0.get_capture()
346         self.verify_capture(self.pg0, pkts)
347
348         for i in self.sub_interfaces:
349             pkts = i.parent.get_capture()
350             self.verify_capture(i, pkts)
351
352     def test_ns(self):
353         """ IPv6 Neighbour Solicitation Exceptions
354
355         Test scenario:
356            - Send an NS Sourced from an address not covered by the link sub-net
357            - Send an NS to an mcast address the router has not joined
358            - Send NS for a target address the router does not onn.
359         """
360
361         #
362         # An NS from a non link source address
363         #
364         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
365         d = inet_ntop(AF_INET6, nsma)
366
367         p = (Ether(dst=in6_getnsmac(nsma)) /
368              IPv6(dst=d, src="2002::2") /
369              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
370              ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
371         pkts = [p]
372
373         self.send_and_assert_no_replies(
374             self.pg0, pkts,
375             "No response to NS source by address not on sub-net")
376
377         #
378         # An NS for sent to a solicited mcast group the router is
379         # not a member of FAILS
380         #
381         if 0:
382             nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
383             d = inet_ntop(AF_INET6, nsma)
384
385             p = (Ether(dst=in6_getnsmac(nsma)) /
386                  IPv6(dst=d, src=self.pg0.remote_ip6) /
387                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
388                  ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
389             pkts = [p]
390
391             self.send_and_assert_no_replies(
392                 self.pg0, pkts,
393                 "No response to NS sent to unjoined mcast address")
394
395         #
396         # An NS whose target address is one the router does not own
397         #
398         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
399         d = inet_ntop(AF_INET6, nsma)
400
401         p = (Ether(dst=in6_getnsmac(nsma)) /
402              IPv6(dst=d, src=self.pg0.remote_ip6) /
403              ICMPv6ND_NS(tgt="fd::ffff") /
404              ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
405         pkts = [p]
406
407         self.send_and_assert_no_replies(self.pg0, pkts,
408                                         "No response to NS for unknown target")
409
410         #
411         # A neighbor entry that has no associated FIB-entry
412         #
413         self.pg0.generate_remote_hosts(4)
414         nd_entry = VppNeighbor(self,
415                                self.pg0.sw_if_index,
416                                self.pg0.remote_hosts[2].mac,
417                                self.pg0.remote_hosts[2].ip6,
418                                af=AF_INET6,
419                                is_no_fib_entry=1)
420         nd_entry.add_vpp_config()
421
422         #
423         # check we have the neighbor, but no route
424         #
425         self.assertTrue(find_nbr(self,
426                                  self.pg0.sw_if_index,
427                                  self.pg0._remote_hosts[2].ip6,
428                                  inet=AF_INET6))
429         self.assertFalse(find_route(self,
430                                     self.pg0._remote_hosts[2].ip6,
431                                     128,
432                                     inet=AF_INET6))
433
434         #
435         # send an NS from a link local address to the interface's global
436         # address
437         #
438         p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
439              IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
440              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
441              ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
442
443         self.send_and_expect_na(self.pg0, p,
444                                 "NS from link-local",
445                                 dst_ip=self.pg0._remote_hosts[2].ip6_ll,
446                                 tgt_ip=self.pg0.local_ip6)
447
448         #
449         # we should have learned an ND entry for the peer's link-local
450         # but not inserted a route to it in the FIB
451         #
452         self.assertTrue(find_nbr(self,
453                                  self.pg0.sw_if_index,
454                                  self.pg0._remote_hosts[2].ip6_ll,
455                                  inet=AF_INET6))
456         self.assertFalse(find_route(self,
457                                     self.pg0._remote_hosts[2].ip6_ll,
458                                     128,
459                                     inet=AF_INET6))
460
461         #
462         # An NS to the router's own Link-local
463         #
464         p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
465              IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
466              ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll) /
467              ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
468
469         self.send_and_expect_na(self.pg0, p,
470                                 "NS to/from link-local",
471                                 dst_ip=self.pg0._remote_hosts[3].ip6_ll,
472                                 tgt_ip=self.pg0.local_ip6_ll)
473
474         #
475         # we should have learned an ND entry for the peer's link-local
476         # but not inserted a route to it in the FIB
477         #
478         self.assertTrue(find_nbr(self,
479                                  self.pg0.sw_if_index,
480                                  self.pg0._remote_hosts[3].ip6_ll,
481                                  inet=AF_INET6))
482         self.assertFalse(find_route(self,
483                                     self.pg0._remote_hosts[3].ip6_ll,
484                                     128,
485                                     inet=AF_INET6))
486
487     def test_ns_duplicates(self):
488         """ ND Duplicates"""
489
490         #
491         # Generate some hosts on the LAN
492         #
493         self.pg1.generate_remote_hosts(3)
494
495         #
496         # Add host 1 on pg1 and pg2
497         #
498         ns_pg1 = VppNeighbor(self,
499                              self.pg1.sw_if_index,
500                              self.pg1.remote_hosts[1].mac,
501                              self.pg1.remote_hosts[1].ip6,
502                              af=AF_INET6)
503         ns_pg1.add_vpp_config()
504         ns_pg2 = VppNeighbor(self,
505                              self.pg2.sw_if_index,
506                              self.pg2.remote_mac,
507                              self.pg1.remote_hosts[1].ip6,
508                              af=AF_INET6)
509         ns_pg2.add_vpp_config()
510
511         #
512         # IP packet destined for pg1 remote host arrives on pg1 again.
513         #
514         p = (Ether(dst=self.pg0.local_mac,
515                    src=self.pg0.remote_mac) /
516              IPv6(src=self.pg0.remote_ip6,
517                   dst=self.pg1.remote_hosts[1].ip6) /
518              UDP(sport=1234, dport=1234) /
519              Raw())
520
521         self.pg0.add_stream(p)
522         self.pg_enable_capture(self.pg_interfaces)
523         self.pg_start()
524
525         rx1 = self.pg1.get_capture(1)
526
527         self.verify_ip(rx1[0],
528                        self.pg1.local_mac,
529                        self.pg1.remote_hosts[1].mac,
530                        self.pg0.remote_ip6,
531                        self.pg1.remote_hosts[1].ip6)
532
533         #
534         # remove the duplicate on pg1
535         # packet stream shoud generate NSs out of pg1
536         #
537         ns_pg1.remove_vpp_config()
538
539         self.send_and_expect_ns(self.pg0, self.pg1,
540                                 p, self.pg1.remote_hosts[1].ip6)
541
542         #
543         # Add it back
544         #
545         ns_pg1.add_vpp_config()
546
547         self.pg0.add_stream(p)
548         self.pg_enable_capture(self.pg_interfaces)
549         self.pg_start()
550
551         rx1 = self.pg1.get_capture(1)
552
553         self.verify_ip(rx1[0],
554                        self.pg1.local_mac,
555                        self.pg1.remote_hosts[1].mac,
556                        self.pg0.remote_ip6,
557                        self.pg1.remote_hosts[1].ip6)
558
559     def validate_ra(self, intf, rx, dst_ip=None, mtu=9000, pi_opt=None):
560         if not dst_ip:
561             dst_ip = intf.remote_ip6
562
563         # unicasted packets must come to the unicast mac
564         self.assertEqual(rx[Ether].dst, intf.remote_mac)
565
566         # and from the router's MAC
567         self.assertEqual(rx[Ether].src, intf.local_mac)
568
569         # the rx'd RA should be addressed to the sender's source
570         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
571         self.assertEqual(in6_ptop(rx[IPv6].dst),
572                          in6_ptop(dst_ip))
573
574         # and come from the router's link local
575         self.assertTrue(in6_islladdr(rx[IPv6].src))
576         self.assertEqual(in6_ptop(rx[IPv6].src),
577                          in6_ptop(mk_ll_addr(intf.local_mac)))
578
579         # it should contain the links MTU
580         ra = rx[ICMPv6ND_RA]
581         self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
582
583         # it should contain the source's link layer address option
584         sll = ra[ICMPv6NDOptSrcLLAddr]
585         self.assertEqual(sll.lladdr, intf.local_mac)
586
587         if not pi_opt:
588             # the RA should not contain prefix information
589             self.assertFalse(ra.haslayer(ICMPv6NDOptPrefixInfo))
590         else:
591             raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
592
593             # the options are nested in the scapy packet in way that i cannot
594             # decipher how to decode. this 1st layer of option always returns
595             # nested classes, so a direct obj1=obj2 comparison always fails.
596             # however, the getlayer(.., 2) does give one instnace.
597             # so we cheat here and construct a new opt instnace for comparison
598             rd = ICMPv6NDOptPrefixInfo(prefixlen=raos.prefixlen,
599                                        prefix=raos.prefix,
600                                        L=raos.L,
601                                        A=raos.A)
602             if type(pi_opt) is list:
603                 for ii in range(len(pi_opt)):
604                     self.assertEqual(pi_opt[ii], rd)
605                     rd = rx.getlayer(ICMPv6NDOptPrefixInfo, ii+2)
606             else:
607                 self.assertEqual(pi_opt, raos)
608
609     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
610                            filter_out_fn=is_ipv6_misc,
611                            opt=None):
612         intf.add_stream(pkts)
613         self.pg_enable_capture(self.pg_interfaces)
614         self.pg_start()
615         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
616
617         self.assertEqual(len(rx), 1)
618         rx = rx[0]
619         self.validate_ra(intf, rx, dst_ip, pi_opt=opt)
620
621     def test_rs(self):
622         """ IPv6 Router Solicitation Exceptions
623
624         Test scenario:
625         """
626
627         #
628         # Before we begin change the IPv6 RA responses to use the unicast
629         # address - that way we will not confuse them with the periodic
630         # RAs which go to the mcast address
631         # Sit and wait for the first periodic RA.
632         #
633         # TODO
634         #
635         self.pg0.ip6_ra_config(send_unicast=1)
636
637         #
638         # An RS from a link source address
639         #  - expect an RA in return
640         #
641         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
642              IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
643              ICMPv6ND_RS())
644         pkts = [p]
645         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
646
647         #
648         # For the next RS sent the RA should be rate limited
649         #
650         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
651
652         #
653         # When we reconfiure the IPv6 RA config, we reset the RA rate limiting,
654         # so we need to do this before each test below so as not to drop
655         # packets for rate limiting reasons. Test this works here.
656         #
657         self.pg0.ip6_ra_config(send_unicast=1)
658         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
659
660         #
661         # An RS sent from a non-link local source
662         #
663         self.pg0.ip6_ra_config(send_unicast=1)
664         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
665              IPv6(dst=self.pg0.local_ip6, src="2002::ffff") /
666              ICMPv6ND_RS())
667         pkts = [p]
668         self.send_and_assert_no_replies(self.pg0, pkts,
669                                         "RS from non-link source")
670
671         #
672         # Source an RS from a link local address
673         #
674         self.pg0.ip6_ra_config(send_unicast=1)
675         ll = mk_ll_addr(self.pg0.remote_mac)
676         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
677              IPv6(dst=self.pg0.local_ip6, src=ll) /
678              ICMPv6ND_RS())
679         pkts = [p]
680         self.send_and_expect_ra(self.pg0, pkts,
681                                 "RS sourced from link-local",
682                                 dst_ip=ll)
683
684         #
685         # Send the RS multicast
686         #
687         self.pg0.ip6_ra_config(send_unicast=1)
688         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
689         ll = mk_ll_addr(self.pg0.remote_mac)
690         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
691              IPv6(dst="ff02::2", src=ll) /
692              ICMPv6ND_RS())
693         pkts = [p]
694         self.send_and_expect_ra(self.pg0, pkts,
695                                 "RS sourced from link-local",
696                                 dst_ip=ll)
697
698         #
699         # Source from the unspecified address ::. This happens when the RS
700         # is sent before the host has a configured address/sub-net,
701         # i.e. auto-config. Since the sender has no IP address, the reply
702         # comes back mcast - so the capture needs to not filter this.
703         # If we happen to pick up the periodic RA at this point then so be it,
704         # it's not an error.
705         #
706         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
707         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
708              IPv6(dst="ff02::2", src="::") /
709              ICMPv6ND_RS())
710         pkts = [p]
711         self.send_and_expect_ra(self.pg0, pkts,
712                                 "RS sourced from unspecified",
713                                 dst_ip="ff02::1",
714                                 filter_out_fn=None)
715
716         #
717         # Configure The RA to announce the links prefix
718         #
719         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
720                                self.pg0.local_ip6_prefix_len)
721
722         #
723         # RAs should now contain the prefix information option
724         #
725         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
726                                     prefix=self.pg0.local_ip6,
727                                     L=1,
728                                     A=1)
729
730         self.pg0.ip6_ra_config(send_unicast=1)
731         ll = mk_ll_addr(self.pg0.remote_mac)
732         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
733              IPv6(dst=self.pg0.local_ip6, src=ll) /
734              ICMPv6ND_RS())
735         self.send_and_expect_ra(self.pg0, p,
736                                 "RA with prefix-info",
737                                 dst_ip=ll,
738                                 opt=opt)
739
740         #
741         # Change the prefix info to not off-link
742         #  L-flag is clear
743         #
744         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
745                                self.pg0.local_ip6_prefix_len,
746                                off_link=1)
747
748         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
749                                     prefix=self.pg0.local_ip6,
750                                     L=0,
751                                     A=1)
752
753         self.pg0.ip6_ra_config(send_unicast=1)
754         self.send_and_expect_ra(self.pg0, p,
755                                 "RA with Prefix info with L-flag=0",
756                                 dst_ip=ll,
757                                 opt=opt)
758
759         #
760         # Change the prefix info to not off-link, no-autoconfig
761         #  L and A flag are clear in the advert
762         #
763         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
764                                self.pg0.local_ip6_prefix_len,
765                                off_link=1,
766                                no_autoconfig=1)
767
768         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
769                                     prefix=self.pg0.local_ip6,
770                                     L=0,
771                                     A=0)
772
773         self.pg0.ip6_ra_config(send_unicast=1)
774         self.send_and_expect_ra(self.pg0, p,
775                                 "RA with Prefix info with A & L-flag=0",
776                                 dst_ip=ll,
777                                 opt=opt)
778
779         #
780         # Change the flag settings back to the defaults
781         #  L and A flag are set in the advert
782         #
783         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
784                                self.pg0.local_ip6_prefix_len)
785
786         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
787                                     prefix=self.pg0.local_ip6,
788                                     L=1,
789                                     A=1)
790
791         self.pg0.ip6_ra_config(send_unicast=1)
792         self.send_and_expect_ra(self.pg0, p,
793                                 "RA with Prefix info",
794                                 dst_ip=ll,
795                                 opt=opt)
796
797         #
798         # Change the prefix info to not off-link, no-autoconfig
799         #  L and A flag are clear in the advert
800         #
801         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
802                                self.pg0.local_ip6_prefix_len,
803                                off_link=1,
804                                no_autoconfig=1)
805
806         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
807                                     prefix=self.pg0.local_ip6,
808                                     L=0,
809                                     A=0)
810
811         self.pg0.ip6_ra_config(send_unicast=1)
812         self.send_and_expect_ra(self.pg0, p,
813                                 "RA with Prefix info with A & L-flag=0",
814                                 dst_ip=ll,
815                                 opt=opt)
816
817         #
818         # Use the reset to defults option to revert to defaults
819         #  L and A flag are clear in the advert
820         #
821         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
822                                self.pg0.local_ip6_prefix_len,
823                                use_default=1)
824
825         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
826                                     prefix=self.pg0.local_ip6,
827                                     L=1,
828                                     A=1)
829
830         self.pg0.ip6_ra_config(send_unicast=1)
831         self.send_and_expect_ra(self.pg0, p,
832                                 "RA with Prefix reverted to defaults",
833                                 dst_ip=ll,
834                                 opt=opt)
835
836         #
837         # Advertise Another prefix. With no L-flag/A-flag
838         #
839         self.pg0.ip6_ra_prefix(self.pg1.local_ip6n,
840                                self.pg1.local_ip6_prefix_len,
841                                off_link=1,
842                                no_autoconfig=1)
843
844         opt = [ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
845                                      prefix=self.pg0.local_ip6,
846                                      L=1,
847                                      A=1),
848                ICMPv6NDOptPrefixInfo(prefixlen=self.pg1.local_ip6_prefix_len,
849                                      prefix=self.pg1.local_ip6,
850                                      L=0,
851                                      A=0)]
852
853         self.pg0.ip6_ra_config(send_unicast=1)
854         ll = mk_ll_addr(self.pg0.remote_mac)
855         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
856              IPv6(dst=self.pg0.local_ip6, src=ll) /
857              ICMPv6ND_RS())
858         self.send_and_expect_ra(self.pg0, p,
859                                 "RA with multiple Prefix infos",
860                                 dst_ip=ll,
861                                 opt=opt)
862
863         #
864         # Remove the first refix-info - expect the second is still in the
865         # advert
866         #
867         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
868                                self.pg0.local_ip6_prefix_len,
869                                is_no=1)
870
871         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg1.local_ip6_prefix_len,
872                                     prefix=self.pg1.local_ip6,
873                                     L=0,
874                                     A=0)
875
876         self.pg0.ip6_ra_config(send_unicast=1)
877         self.send_and_expect_ra(self.pg0, p,
878                                 "RA with Prefix reverted to defaults",
879                                 dst_ip=ll,
880                                 opt=opt)
881
882         #
883         # Remove the second prefix-info - expect no prefix-info i nthe adverts
884         #
885         self.pg0.ip6_ra_prefix(self.pg1.local_ip6n,
886                                self.pg1.local_ip6_prefix_len,
887                                is_no=1)
888
889         self.pg0.ip6_ra_config(send_unicast=1)
890         self.send_and_expect_ra(self.pg0, p,
891                                 "RA with Prefix reverted to defaults",
892                                 dst_ip=ll)
893
894         #
895         # Reset the periodic advertisements back to default values
896         #
897         self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
898
899
900 class TestIPv6RD(TestIPv6ND):
901     """ IPv6 Router Discovery Test Case """
902
903     @classmethod
904     def setUpClass(cls):
905         super(TestIPv6RD, cls).setUpClass()
906
907     def setUp(self):
908         super(TestIPv6RD, self).setUp()
909
910         # create 2 pg interfaces
911         self.create_pg_interfaces(range(2))
912
913         self.interfaces = list(self.pg_interfaces)
914
915         # setup all interfaces
916         for i in self.interfaces:
917             i.admin_up()
918             i.config_ip6()
919
920     def tearDown(self):
921         super(TestIPv6RD, self).tearDown()
922
923     def test_rd_send_router_solicitation(self):
924         """ Verify router solicitation packets """
925
926         count = 2
927         self.pg_enable_capture(self.pg_interfaces)
928         self.pg_start()
929         self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index,
930                                                  mrc=count)
931         rx_list = self.pg1.get_capture(count, timeout=3)
932         self.assertEqual(len(rx_list), count)
933         for packet in rx_list:
934             self.assertTrue(packet.haslayer(IPv6))
935             self.assertTrue(packet[IPv6].haslayer(ICMPv6ND_RS))
936             dst = ip6_normalize(packet[IPv6].dst)
937             dst2 = ip6_normalize("ff02::2")
938             self.assert_equal(dst, dst2)
939             src = ip6_normalize(packet[IPv6].src)
940             src2 = ip6_normalize(self.pg1.local_ip6_ll)
941             self.assert_equal(src, src2)
942             self.assertTrue(packet[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr))
943             self.assert_equal(packet[ICMPv6NDOptSrcLLAddr].lladdr,
944                               self.pg1.local_mac)
945
946     def verify_prefix_info(self, reported_prefix, prefix_option):
947         prefix = socket.inet_pton(socket.AF_INET6,
948                                   prefix_option.getfieldval("prefix"))
949         self.assert_equal(reported_prefix.dst_address, prefix)
950         self.assert_equal(reported_prefix.dst_address_length,
951                           prefix_option.getfieldval("prefixlen"))
952         L = prefix_option.getfieldval("L")
953         A = prefix_option.getfieldval("A")
954         option_flags = (L << 7) | (A << 6)
955         self.assert_equal(reported_prefix.flags, option_flags)
956         self.assert_equal(reported_prefix.valid_time,
957                           prefix_option.getfieldval("validlifetime"))
958         self.assert_equal(reported_prefix.preferred_time,
959                           prefix_option.getfieldval("preferredlifetime"))
960
961     def test_rd_receive_router_advertisement(self):
962         """ Verify events triggered by received RA packets """
963
964         self.vapi.want_ip6_ra_events()
965
966         prefix_info_1 = ICMPv6NDOptPrefixInfo(
967             prefix="1::2",
968             prefixlen=50,
969             validlifetime=200,
970             preferredlifetime=500,
971             L=1,
972             A=1,
973         )
974
975         prefix_info_2 = ICMPv6NDOptPrefixInfo(
976             prefix="7::4",
977             prefixlen=20,
978             validlifetime=70,
979             preferredlifetime=1000,
980             L=1,
981             A=0,
982         )
983
984         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
985              IPv6(dst=self.pg1.local_ip6_ll,
986                   src=mk_ll_addr(self.pg1.remote_mac)) /
987              ICMPv6ND_RA() /
988              prefix_info_1 /
989              prefix_info_2)
990         self.pg1.add_stream([p])
991         self.pg_start()
992
993         ev = self.vapi.wait_for_event(10, "ip6_ra_event")
994
995         self.assert_equal(ev.current_hop_limit, 0)
996         self.assert_equal(ev.flags, 8)
997         self.assert_equal(ev.router_lifetime_in_sec, 1800)
998         self.assert_equal(ev.neighbor_reachable_time_in_msec, 0)
999         self.assert_equal(
1000             ev.time_in_msec_between_retransmitted_neighbor_solicitations, 0)
1001
1002         self.assert_equal(ev.n_prefixes, 2)
1003
1004         self.verify_prefix_info(ev.prefixes[0], prefix_info_1)
1005         self.verify_prefix_info(ev.prefixes[1], prefix_info_2)
1006
1007
1008 class TestIPv6RDControlPlane(TestIPv6ND):
1009     """ IPv6 Router Discovery Control Plane Test Case """
1010
1011     @classmethod
1012     def setUpClass(cls):
1013         super(TestIPv6RDControlPlane, cls).setUpClass()
1014
1015     def setUp(self):
1016         super(TestIPv6RDControlPlane, self).setUp()
1017
1018         # create 1 pg interface
1019         self.create_pg_interfaces(range(1))
1020
1021         self.interfaces = list(self.pg_interfaces)
1022
1023         # setup all interfaces
1024         for i in self.interfaces:
1025             i.admin_up()
1026             i.config_ip6()
1027
1028     def tearDown(self):
1029         super(TestIPv6RDControlPlane, self).tearDown()
1030
1031     @staticmethod
1032     def create_ra_packet(pg, routerlifetime=None):
1033         src_ip = pg.remote_ip6_ll
1034         dst_ip = pg.local_ip6
1035         if routerlifetime is not None:
1036             ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
1037         else:
1038             ra = ICMPv6ND_RA()
1039         p = (Ether(dst=pg.local_mac, src=pg.remote_mac) /
1040              IPv6(dst=dst_ip, src=src_ip) / ra)
1041         return p
1042
1043     @staticmethod
1044     def get_default_routes(fib):
1045         list = []
1046         for entry in fib:
1047             if entry.address_length == 0:
1048                 for path in entry.path:
1049                     if path.sw_if_index != 0xFFFFFFFF:
1050                         defaut_route = {}
1051                         defaut_route['sw_if_index'] = path.sw_if_index
1052                         defaut_route['next_hop'] = path.next_hop
1053                         list.append(defaut_route)
1054         return list
1055
1056     @staticmethod
1057     def get_interface_addresses(fib, pg):
1058         list = []
1059         for entry in fib:
1060             if entry.address_length == 128:
1061                 path = entry.path[0]
1062                 if path.sw_if_index == pg.sw_if_index:
1063                     list.append(entry.address)
1064         return list
1065
1066     def test_all(self):
1067         """ Test handling of SLAAC addresses and default routes """
1068
1069         fib = self.vapi.ip6_fib_dump()
1070         default_routes = self.get_default_routes(fib)
1071         initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
1072         self.assertEqual(default_routes, [])
1073         router_address = self.pg0.remote_ip6n_ll
1074
1075         self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)
1076
1077         self.sleep(0.1)
1078
1079         # send RA
1080         packet = (self.create_ra_packet(self.pg0) / ICMPv6NDOptPrefixInfo(
1081             prefix="1::",
1082             prefixlen=64,
1083             validlifetime=2,
1084             preferredlifetime=2,
1085             L=1,
1086             A=1,
1087         ) / ICMPv6NDOptPrefixInfo(
1088             prefix="7::",
1089             prefixlen=20,
1090             validlifetime=1500,
1091             preferredlifetime=1000,
1092             L=1,
1093             A=0,
1094         ))
1095         self.pg0.add_stream([packet])
1096         self.pg_start()
1097
1098         self.sleep(0.1)
1099
1100         fib = self.vapi.ip6_fib_dump()
1101
1102         # check FIB for new address
1103         addresses = set(self.get_interface_addresses(fib, self.pg0))
1104         new_addresses = addresses.difference(initial_addresses)
1105         self.assertEqual(len(new_addresses), 1)
1106         prefix = list(new_addresses)[0][:8] + '\0\0\0\0\0\0\0\0'
1107         self.assertEqual(inet_ntop(AF_INET6, prefix), '1::')
1108
1109         # check FIB for new default route
1110         default_routes = self.get_default_routes(fib)
1111         self.assertEqual(len(default_routes), 1)
1112         dr = default_routes[0]
1113         self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
1114         self.assertEqual(dr['next_hop'], router_address)
1115
1116         # send RA to delete default route
1117         packet = self.create_ra_packet(self.pg0, routerlifetime=0)
1118         self.pg0.add_stream([packet])
1119         self.pg_start()
1120
1121         self.sleep(0.1)
1122
1123         # check that default route is deleted
1124         fib = self.vapi.ip6_fib_dump()
1125         default_routes = self.get_default_routes(fib)
1126         self.assertEqual(len(default_routes), 0)
1127
1128         self.sleep(0.1)
1129
1130         # send RA
1131         packet = self.create_ra_packet(self.pg0)
1132         self.pg0.add_stream([packet])
1133         self.pg_start()
1134
1135         self.sleep(0.1)
1136
1137         # check FIB for new default route
1138         fib = self.vapi.ip6_fib_dump()
1139         default_routes = self.get_default_routes(fib)
1140         self.assertEqual(len(default_routes), 1)
1141         dr = default_routes[0]
1142         self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
1143         self.assertEqual(dr['next_hop'], router_address)
1144
1145         # send RA, updating router lifetime to 1s
1146         packet = self.create_ra_packet(self.pg0, 1)
1147         self.pg0.add_stream([packet])
1148         self.pg_start()
1149
1150         self.sleep(0.1)
1151
1152         # check that default route still exists
1153         fib = self.vapi.ip6_fib_dump()
1154         default_routes = self.get_default_routes(fib)
1155         self.assertEqual(len(default_routes), 1)
1156         dr = default_routes[0]
1157         self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
1158         self.assertEqual(dr['next_hop'], router_address)
1159
1160         self.sleep(1)
1161
1162         # check that default route is deleted
1163         fib = self.vapi.ip6_fib_dump()
1164         default_routes = self.get_default_routes(fib)
1165         self.assertEqual(len(default_routes), 0)
1166
1167         # check FIB still contains the SLAAC address
1168         addresses = set(self.get_interface_addresses(fib, self.pg0))
1169         new_addresses = addresses.difference(initial_addresses)
1170         self.assertEqual(len(new_addresses), 1)
1171         prefix = list(new_addresses)[0][:8] + '\0\0\0\0\0\0\0\0'
1172         self.assertEqual(inet_ntop(AF_INET6, prefix), '1::')
1173
1174         self.sleep(1)
1175
1176         # check that SLAAC address is deleted
1177         fib = self.vapi.ip6_fib_dump()
1178         addresses = set(self.get_interface_addresses(fib, self.pg0))
1179         new_addresses = addresses.difference(initial_addresses)
1180         self.assertEqual(len(new_addresses), 0)
1181
1182
1183 class IPv6NDProxyTest(TestIPv6ND):
1184     """ IPv6 ND ProxyTest Case """
1185
1186     def setUp(self):
1187         super(IPv6NDProxyTest, self).setUp()
1188
1189         # create 3 pg interfaces
1190         self.create_pg_interfaces(range(3))
1191
1192         # pg0 is the master interface, with the configured subnet
1193         self.pg0.admin_up()
1194         self.pg0.config_ip6()
1195         self.pg0.resolve_ndp()
1196
1197         self.pg1.ip6_enable()
1198         self.pg2.ip6_enable()
1199
1200     def tearDown(self):
1201         super(IPv6NDProxyTest, self).tearDown()
1202
1203     def test_nd_proxy(self):
1204         """ IPv6 Proxy ND """
1205
1206         #
1207         # Generate some hosts in the subnet that we are proxying
1208         #
1209         self.pg0.generate_remote_hosts(8)
1210
1211         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
1212         d = inet_ntop(AF_INET6, nsma)
1213
1214         #
1215         # Send an NS for one of those remote hosts on one of the proxy links
1216         # expect no response since it's from an address that is not
1217         # on the link that has the prefix configured
1218         #
1219         ns_pg1 = (Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac) /
1220                   IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6) /
1221                   ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
1222                   ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac))
1223
1224         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")
1225
1226         #
1227         # Add proxy support for the host
1228         #
1229         self.vapi.ip6_nd_proxy(
1230             inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1231             self.pg1.sw_if_index)
1232
1233         #
1234         # try that NS again. this time we expect an NA back
1235         #
1236         self.send_and_expect_na(self.pg1, ns_pg1,
1237                                 "NS to proxy entry",
1238                                 dst_ip=self.pg0._remote_hosts[2].ip6,
1239                                 tgt_ip=self.pg0.local_ip6)
1240
1241         #
1242         # ... and that we have an entry in the ND cache
1243         #
1244         self.assertTrue(find_nbr(self,
1245                                  self.pg1.sw_if_index,
1246                                  self.pg0._remote_hosts[2].ip6,
1247                                  inet=AF_INET6))
1248
1249         #
1250         # ... and we can route traffic to it
1251         #
1252         t = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1253              IPv6(dst=self.pg0._remote_hosts[2].ip6,
1254                   src=self.pg0.remote_ip6) /
1255              UDP(sport=10000, dport=20000) /
1256              Raw('\xa5' * 100))
1257
1258         self.pg0.add_stream(t)
1259         self.pg_enable_capture(self.pg_interfaces)
1260         self.pg_start()
1261         rx = self.pg1.get_capture(1)
1262         rx = rx[0]
1263
1264         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1265         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1266
1267         self.assertEqual(rx[IPv6].src, t[IPv6].src)
1268         self.assertEqual(rx[IPv6].dst, t[IPv6].dst)
1269
1270         #
1271         # Test we proxy for the host on the main interface
1272         #
1273         ns_pg0 = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
1274                   IPv6(dst=d, src=self.pg0.remote_ip6) /
1275                   ICMPv6ND_NS(tgt=self.pg0._remote_hosts[2].ip6) /
1276                   ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
1277
1278         self.send_and_expect_na(self.pg0, ns_pg0,
1279                                 "NS to proxy entry on main",
1280                                 tgt_ip=self.pg0._remote_hosts[2].ip6,
1281                                 dst_ip=self.pg0.remote_ip6)
1282
1283         #
1284         # Setup and resolve proxy for another host on another interface
1285         #
1286         ns_pg2 = (Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac) /
1287                   IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6) /
1288                   ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
1289                   ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac))
1290
1291         self.vapi.ip6_nd_proxy(
1292             inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1293             self.pg2.sw_if_index)
1294
1295         self.send_and_expect_na(self.pg2, ns_pg2,
1296                                 "NS to proxy entry other interface",
1297                                 dst_ip=self.pg0._remote_hosts[3].ip6,
1298                                 tgt_ip=self.pg0.local_ip6)
1299
1300         self.assertTrue(find_nbr(self,
1301                                  self.pg2.sw_if_index,
1302                                  self.pg0._remote_hosts[3].ip6,
1303                                  inet=AF_INET6))
1304
1305         #
1306         # hosts can communicate. pg2->pg1
1307         #
1308         t2 = (Ether(dst=self.pg2.local_mac,
1309                     src=self.pg0.remote_hosts[3].mac) /
1310               IPv6(dst=self.pg0._remote_hosts[2].ip6,
1311                    src=self.pg0._remote_hosts[3].ip6) /
1312               UDP(sport=10000, dport=20000) /
1313               Raw('\xa5' * 100))
1314
1315         self.pg2.add_stream(t2)
1316         self.pg_enable_capture(self.pg_interfaces)
1317         self.pg_start()
1318         rx = self.pg1.get_capture(1)
1319         rx = rx[0]
1320
1321         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1322         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1323
1324         self.assertEqual(rx[IPv6].src, t2[IPv6].src)
1325         self.assertEqual(rx[IPv6].dst, t2[IPv6].dst)
1326
1327         #
1328         # remove the proxy configs
1329         #
1330         self.vapi.ip6_nd_proxy(
1331             inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1332             self.pg1.sw_if_index,
1333             is_del=1)
1334         self.vapi.ip6_nd_proxy(
1335             inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1336             self.pg2.sw_if_index,
1337             is_del=1)
1338
1339         self.assertFalse(find_nbr(self,
1340                                   self.pg2.sw_if_index,
1341                                   self.pg0._remote_hosts[3].ip6,
1342                                   inet=AF_INET6))
1343         self.assertFalse(find_nbr(self,
1344                                   self.pg1.sw_if_index,
1345                                   self.pg0._remote_hosts[2].ip6,
1346                                   inet=AF_INET6))
1347
1348         #
1349         # no longer proxy-ing...
1350         #
1351         self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
1352         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
1353         self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")
1354
1355         #
1356         # no longer forwarding. traffic generates NS out of the glean/main
1357         # interface
1358         #
1359         self.pg2.add_stream(t2)
1360         self.pg_enable_capture(self.pg_interfaces)
1361         self.pg_start()
1362
1363         rx = self.pg0.get_capture(1)
1364
1365         self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1366
1367
1368 class TestIPNull(VppTestCase):
1369     """ IPv6 routes via NULL """
1370
1371     def setUp(self):
1372         super(TestIPNull, self).setUp()
1373
1374         # create 2 pg interfaces
1375         self.create_pg_interfaces(range(1))
1376
1377         for i in self.pg_interfaces:
1378             i.admin_up()
1379             i.config_ip6()
1380             i.resolve_ndp()
1381
1382     def tearDown(self):
1383         super(TestIPNull, self).tearDown()
1384         for i in self.pg_interfaces:
1385             i.unconfig_ip6()
1386             i.admin_down()
1387
1388     def test_ip_null(self):
1389         """ IP NULL route """
1390
1391         p = (Ether(src=self.pg0.remote_mac,
1392                    dst=self.pg0.local_mac) /
1393              IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
1394              UDP(sport=1234, dport=1234) /
1395              Raw('\xa5' * 100))
1396
1397         #
1398         # A route via IP NULL that will reply with ICMP unreachables
1399         #
1400         ip_unreach = VppIpRoute(self, "2001::", 64, [], is_unreach=1, is_ip6=1)
1401         ip_unreach.add_vpp_config()
1402
1403         self.pg0.add_stream(p)
1404         self.pg_enable_capture(self.pg_interfaces)
1405         self.pg_start()
1406
1407         rx = self.pg0.get_capture(1)
1408         rx = rx[0]
1409         icmp = rx[ICMPv6DestUnreach]
1410
1411         # 0 = "No route to destination"
1412         self.assertEqual(icmp.code, 0)
1413
1414         # ICMP is rate limited. pause a bit
1415         self.sleep(1)
1416
1417         #
1418         # A route via IP NULL that will reply with ICMP prohibited
1419         #
1420         ip_prohibit = VppIpRoute(self, "2001::1", 128, [],
1421                                  is_prohibit=1, is_ip6=1)
1422         ip_prohibit.add_vpp_config()
1423
1424         self.pg0.add_stream(p)
1425         self.pg_enable_capture(self.pg_interfaces)
1426         self.pg_start()
1427
1428         rx = self.pg0.get_capture(1)
1429         rx = rx[0]
1430         icmp = rx[ICMPv6DestUnreach]
1431
1432         # 1 = "Communication with destination administratively prohibited"
1433         self.assertEqual(icmp.code, 1)
1434
1435
1436 class TestIPDisabled(VppTestCase):
1437     """ IPv6 disabled """
1438
1439     def setUp(self):
1440         super(TestIPDisabled, self).setUp()
1441
1442         # create 2 pg interfaces
1443         self.create_pg_interfaces(range(2))
1444
1445         # PG0 is IP enalbed
1446         self.pg0.admin_up()
1447         self.pg0.config_ip6()
1448         self.pg0.resolve_ndp()
1449
1450         # PG 1 is not IP enabled
1451         self.pg1.admin_up()
1452
1453     def tearDown(self):
1454         super(TestIPDisabled, self).tearDown()
1455         for i in self.pg_interfaces:
1456             i.unconfig_ip4()
1457             i.admin_down()
1458
1459     def test_ip_disabled(self):
1460         """ IP Disabled """
1461
1462         #
1463         # An (S,G).
1464         # one accepting interface, pg0, 2 forwarding interfaces
1465         #
1466         route_ff_01 = VppIpMRoute(
1467             self,
1468             "::",
1469             "ffef::1", 128,
1470             MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
1471             [VppMRoutePath(self.pg1.sw_if_index,
1472                            MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT),
1473              VppMRoutePath(self.pg0.sw_if_index,
1474                            MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)],
1475             is_ip6=1)
1476         route_ff_01.add_vpp_config()
1477
1478         pu = (Ether(src=self.pg1.remote_mac,
1479                     dst=self.pg1.local_mac) /
1480               IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
1481               UDP(sport=1234, dport=1234) /
1482               Raw('\xa5' * 100))
1483         pm = (Ether(src=self.pg1.remote_mac,
1484                     dst=self.pg1.local_mac) /
1485               IPv6(src="2001::1", dst="ffef::1") /
1486               UDP(sport=1234, dport=1234) /
1487               Raw('\xa5' * 100))
1488
1489         #
1490         # PG1 does not forward IP traffic
1491         #
1492         self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
1493         self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1494
1495         #
1496         # IP enable PG1
1497         #
1498         self.pg1.config_ip6()
1499
1500         #
1501         # Now we get packets through
1502         #
1503         self.pg1.add_stream(pu)
1504         self.pg_enable_capture(self.pg_interfaces)
1505         self.pg_start()
1506         rx = self.pg0.get_capture(1)
1507
1508         self.pg1.add_stream(pm)
1509         self.pg_enable_capture(self.pg_interfaces)
1510         self.pg_start()
1511         rx = self.pg0.get_capture(1)
1512
1513         #
1514         # Disable PG1
1515         #
1516         self.pg1.unconfig_ip6()
1517
1518         #
1519         # PG1 does not forward IP traffic
1520         #
1521         self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
1522         self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1523
1524
1525 class TestIP6LoadBalance(VppTestCase):
1526     """ IPv6 Load-Balancing """
1527
1528     def setUp(self):
1529         super(TestIP6LoadBalance, self).setUp()
1530
1531         self.create_pg_interfaces(range(5))
1532
1533         mpls_tbl = VppMplsTable(self, 0)
1534         mpls_tbl.add_vpp_config()
1535
1536         for i in self.pg_interfaces:
1537             i.admin_up()
1538             i.config_ip6()
1539             i.resolve_ndp()
1540             i.enable_mpls()
1541
1542     def tearDown(self):
1543         for i in self.pg_interfaces:
1544             i.unconfig_ip6()
1545             i.admin_down()
1546             i.disable_mpls()
1547         super(TestIP6LoadBalance, self).tearDown()
1548
1549     def send_and_expect_load_balancing(self, input, pkts, outputs):
1550         self.vapi.cli("clear trace")
1551         input.add_stream(pkts)
1552         self.pg_enable_capture(self.pg_interfaces)
1553         self.pg_start()
1554         for oo in outputs:
1555             rx = oo._get_capture(1)
1556             self.assertNotEqual(0, len(rx))
1557
1558     def send_and_expect_one_itf(self, input, pkts, itf):
1559         self.vapi.cli("clear trace")
1560         input.add_stream(pkts)
1561         self.pg_enable_capture(self.pg_interfaces)
1562         self.pg_start()
1563         rx = itf.get_capture(len(pkts))
1564
1565     def test_ip6_load_balance(self):
1566         """ IPv6 Load-Balancing """
1567
1568         #
1569         # An array of packets that differ only in the destination port
1570         #  - IP only
1571         #  - MPLS EOS
1572         #  - MPLS non-EOS
1573         #  - MPLS non-EOS with an entropy label
1574         #
1575         port_ip_pkts = []
1576         port_mpls_pkts = []
1577         port_mpls_neos_pkts = []
1578         port_ent_pkts = []
1579
1580         #
1581         # An array of packets that differ only in the source address
1582         #
1583         src_ip_pkts = []
1584         src_mpls_pkts = []
1585
1586         for ii in range(65):
1587             port_ip_hdr = (IPv6(dst="3000::1", src="3000:1::1") /
1588                            UDP(sport=1234, dport=1234 + ii) /
1589                            Raw('\xa5' * 100))
1590             port_ip_pkts.append((Ether(src=self.pg0.remote_mac,
1591                                        dst=self.pg0.local_mac) /
1592                                  port_ip_hdr))
1593             port_mpls_pkts.append((Ether(src=self.pg0.remote_mac,
1594                                          dst=self.pg0.local_mac) /
1595                                    MPLS(label=66, ttl=2) /
1596                                    port_ip_hdr))
1597             port_mpls_neos_pkts.append((Ether(src=self.pg0.remote_mac,
1598                                               dst=self.pg0.local_mac) /
1599                                         MPLS(label=67, ttl=2) /
1600                                         MPLS(label=77, ttl=2) /
1601                                         port_ip_hdr))
1602             port_ent_pkts.append((Ether(src=self.pg0.remote_mac,
1603                                         dst=self.pg0.local_mac) /
1604                                   MPLS(label=67, ttl=2) /
1605                                   MPLS(label=14, ttl=2) /
1606                                   MPLS(label=999, ttl=2) /
1607                                   port_ip_hdr))
1608             src_ip_hdr = (IPv6(dst="3000::1", src="3000:1::%d" % ii) /
1609                           UDP(sport=1234, dport=1234) /
1610                           Raw('\xa5' * 100))
1611             src_ip_pkts.append((Ether(src=self.pg0.remote_mac,
1612                                       dst=self.pg0.local_mac) /
1613                                 src_ip_hdr))
1614             src_mpls_pkts.append((Ether(src=self.pg0.remote_mac,
1615                                         dst=self.pg0.local_mac) /
1616                                   MPLS(label=66, ttl=2) /
1617                                   src_ip_hdr))
1618
1619         #
1620         # A route for the IP pacekts
1621         #
1622         route_3000_1 = VppIpRoute(self, "3000::1", 128,
1623                                   [VppRoutePath(self.pg1.remote_ip6,
1624                                                 self.pg1.sw_if_index,
1625                                                 proto=DpoProto.DPO_PROTO_IP6),
1626                                    VppRoutePath(self.pg2.remote_ip6,
1627                                                 self.pg2.sw_if_index,
1628                                                 proto=DpoProto.DPO_PROTO_IP6)],
1629                                   is_ip6=1)
1630         route_3000_1.add_vpp_config()
1631
1632         #
1633         # a local-label for the EOS packets
1634         #
1635         binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
1636         binding.add_vpp_config()
1637
1638         #
1639         # An MPLS route for the non-EOS packets
1640         #
1641         route_67 = VppMplsRoute(self, 67, 0,
1642                                 [VppRoutePath(self.pg1.remote_ip6,
1643                                               self.pg1.sw_if_index,
1644                                               labels=[67],
1645                                               proto=DpoProto.DPO_PROTO_IP6),
1646                                  VppRoutePath(self.pg2.remote_ip6,
1647                                               self.pg2.sw_if_index,
1648                                               labels=[67],
1649                                               proto=DpoProto.DPO_PROTO_IP6)])
1650         route_67.add_vpp_config()
1651
1652         #
1653         # inject the packet on pg0 - expect load-balancing across the 2 paths
1654         #  - since the default hash config is to use IP src,dst and port
1655         #    src,dst
1656         # We are not going to ensure equal amounts of packets across each link,
1657         # since the hash algorithm is statistical and therefore this can never
1658         # be guaranteed. But wuth 64 different packets we do expect some
1659         # balancing. So instead just ensure there is traffic on each link.
1660         #
1661         self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
1662                                             [self.pg1, self.pg2])
1663         self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
1664                                             [self.pg1, self.pg2])
1665         self.send_and_expect_load_balancing(self.pg0, port_mpls_pkts,
1666                                             [self.pg1, self.pg2])
1667         self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
1668                                             [self.pg1, self.pg2])
1669         self.send_and_expect_load_balancing(self.pg0, port_mpls_neos_pkts,
1670                                             [self.pg1, self.pg2])
1671
1672         #
1673         # The packets with Entropy label in should not load-balance,
1674         # since the Entorpy value is fixed.
1675         #
1676         self.send_and_expect_one_itf(self.pg0, port_ent_pkts, self.pg1)
1677
1678         #
1679         # change the flow hash config so it's only IP src,dst
1680         #  - now only the stream with differing source address will
1681         #    load-balance
1682         #
1683         self.vapi.set_ip_flow_hash(0, is_ip6=1, src=1, dst=1, sport=0, dport=0)
1684
1685         self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
1686                                             [self.pg1, self.pg2])
1687         self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
1688                                             [self.pg1, self.pg2])
1689         self.send_and_expect_one_itf(self.pg0, port_ip_pkts, self.pg2)
1690
1691         #
1692         # change the flow hash config back to defaults
1693         #
1694         self.vapi.set_ip_flow_hash(0, is_ip6=1, src=1, dst=1, sport=1, dport=1)
1695
1696         #
1697         # Recursive prefixes
1698         #  - testing that 2 stages of load-balancing occurs and there is no
1699         #    polarisation (i.e. only 2 of 4 paths are used)
1700         #
1701         port_pkts = []
1702         src_pkts = []
1703
1704         for ii in range(257):
1705             port_pkts.append((Ether(src=self.pg0.remote_mac,
1706                                     dst=self.pg0.local_mac) /
1707                               IPv6(dst="4000::1", src="4000:1::1") /
1708                               UDP(sport=1234, dport=1234 + ii) /
1709                               Raw('\xa5' * 100)))
1710             src_pkts.append((Ether(src=self.pg0.remote_mac,
1711                                    dst=self.pg0.local_mac) /
1712                              IPv6(dst="4000::1", src="4000:1::%d" % ii) /
1713                              UDP(sport=1234, dport=1234) /
1714                              Raw('\xa5' * 100)))
1715
1716         route_3000_2 = VppIpRoute(self, "3000::2", 128,
1717                                   [VppRoutePath(self.pg3.remote_ip6,
1718                                                 self.pg3.sw_if_index,
1719                                                 proto=DpoProto.DPO_PROTO_IP6),
1720                                    VppRoutePath(self.pg4.remote_ip6,
1721                                                 self.pg4.sw_if_index,
1722                                                 proto=DpoProto.DPO_PROTO_IP6)],
1723                                   is_ip6=1)
1724         route_3000_2.add_vpp_config()
1725
1726         route_4000_1 = VppIpRoute(self, "4000::1", 128,
1727                                   [VppRoutePath("3000::1",
1728                                                 0xffffffff,
1729                                                 proto=DpoProto.DPO_PROTO_IP6),
1730                                    VppRoutePath("3000::2",
1731                                                 0xffffffff,
1732                                                 proto=DpoProto.DPO_PROTO_IP6)],
1733                                   is_ip6=1)
1734         route_4000_1.add_vpp_config()
1735
1736         #
1737         # inject the packet on pg0 - expect load-balancing across all 4 paths
1738         #
1739         self.vapi.cli("clear trace")
1740         self.send_and_expect_load_balancing(self.pg0, port_pkts,
1741                                             [self.pg1, self.pg2,
1742                                              self.pg3, self.pg4])
1743         self.send_and_expect_load_balancing(self.pg0, src_pkts,
1744                                             [self.pg1, self.pg2,
1745                                              self.pg3, self.pg4])
1746
1747         #
1748         # Recursive prefixes
1749         #  - testing that 2 stages of load-balancing no choices
1750         #
1751         port_pkts = []
1752
1753         for ii in range(257):
1754             port_pkts.append((Ether(src=self.pg0.remote_mac,
1755                                     dst=self.pg0.local_mac) /
1756                               IPv6(dst="6000::1", src="6000:1::1") /
1757                               UDP(sport=1234, dport=1234 + ii) /
1758                               Raw('\xa5' * 100)))
1759
1760         route_5000_2 = VppIpRoute(self, "5000::2", 128,
1761                                   [VppRoutePath(self.pg3.remote_ip6,
1762                                                 self.pg3.sw_if_index,
1763                                                 proto=DpoProto.DPO_PROTO_IP6)],
1764                                   is_ip6=1)
1765         route_5000_2.add_vpp_config()
1766
1767         route_6000_1 = VppIpRoute(self, "6000::1", 128,
1768                                   [VppRoutePath("5000::2",
1769                                                 0xffffffff,
1770                                                 proto=DpoProto.DPO_PROTO_IP6)],
1771                                   is_ip6=1)
1772         route_6000_1.add_vpp_config()
1773
1774         #
1775         # inject the packet on pg0 - expect load-balancing across all 4 paths
1776         #
1777         self.vapi.cli("clear trace")
1778         self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg3)
1779
1780
1781 class TestIP6Punt(VppTestCase):
1782     """ IPv6 Punt Police/Redirect """
1783
1784     def setUp(self):
1785         super(TestIP6Punt, self).setUp()
1786
1787         self.create_pg_interfaces(range(2))
1788
1789         for i in self.pg_interfaces:
1790             i.admin_up()
1791             i.config_ip6()
1792             i.resolve_ndp()
1793
1794     def tearDown(self):
1795         super(TestIP6Punt, self).tearDown()
1796         for i in self.pg_interfaces:
1797             i.unconfig_ip6()
1798             i.admin_down()
1799
1800     def test_ip_punt(self):
1801         """ IP6 punt police and redirect """
1802
1803         p = (Ether(src=self.pg0.remote_mac,
1804                    dst=self.pg0.local_mac) /
1805              IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
1806              TCP(sport=1234, dport=1234) /
1807              Raw('\xa5' * 100))
1808
1809         pkts = p * 1025
1810
1811         #
1812         # Configure a punt redirect via pg1.
1813         #
1814         nh_addr = inet_pton(AF_INET6,
1815                             self.pg1.remote_ip6)
1816         self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
1817                                    self.pg1.sw_if_index,
1818                                    nh_addr,
1819                                    is_ip6=1)
1820
1821         self.send_and_expect(self.pg0, pkts, self.pg1)
1822
1823         #
1824         # add a policer
1825         #
1826         policer = self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0,
1827                                             rate_type=1)
1828         self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)
1829
1830         self.vapi.cli("clear trace")
1831         self.pg0.add_stream(pkts)
1832         self.pg_enable_capture(self.pg_interfaces)
1833         self.pg_start()
1834
1835         #
1836         # the number of packet recieved should be greater than 0,
1837         # but not equal to the number sent, since some were policed
1838         #
1839         rx = self.pg1._get_capture(1)
1840         self.assertTrue(len(rx) > 0)
1841         self.assertTrue(len(rx) < len(pkts))
1842
1843         #
1844         # remove the poilcer. back to full rx
1845         #
1846         self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
1847         self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0,
1848                                   rate_type=1, is_add=0)
1849         self.send_and_expect(self.pg0, pkts, self.pg1)
1850
1851         #
1852         # remove the redirect. expect full drop.
1853         #
1854         self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
1855                                    self.pg1.sw_if_index,
1856                                    nh_addr,
1857                                    is_add=0,
1858                                    is_ip6=1)
1859         self.send_and_assert_no_replies(self.pg0, pkts,
1860                                         "IP no punt config")
1861
1862         #
1863         # Add a redirect that is not input port selective
1864         #
1865         self.vapi.ip_punt_redirect(0xffffffff,
1866                                    self.pg1.sw_if_index,
1867                                    nh_addr,
1868                                    is_ip6=1)
1869         self.send_and_expect(self.pg0, pkts, self.pg1)
1870
1871         self.vapi.ip_punt_redirect(0xffffffff,
1872                                    self.pg1.sw_if_index,
1873                                    nh_addr,
1874                                    is_add=0,
1875                                    is_ip6=1)
1876
1877
1878 class TestIP6Input(VppTestCase):
1879     """ IPv6 Input Exceptions """
1880
1881     def setUp(self):
1882         super(TestIP6Input, self).setUp()
1883
1884         self.create_pg_interfaces(range(2))
1885
1886         for i in self.pg_interfaces:
1887             i.admin_up()
1888             i.config_ip6()
1889             i.resolve_ndp()
1890
1891     def tearDown(self):
1892         super(TestIP6Input, self).tearDown()
1893         for i in self.pg_interfaces:
1894             i.unconfig_ip6()
1895             i.admin_down()
1896
1897     def test_ip_input(self):
1898         """ IP6 Input Exceptions """
1899
1900         #
1901         # bad version - this is dropped
1902         #
1903         p_version = (Ether(src=self.pg0.remote_mac,
1904                            dst=self.pg0.local_mac) /
1905                      IPv6(src=self.pg0.remote_ip6,
1906                           dst=self.pg1.remote_ip6,
1907                           version=3) /
1908                      UDP(sport=1234, dport=1234) /
1909                      Raw('\xa5' * 100))
1910
1911         self.send_and_assert_no_replies(self.pg0, p_version * 65,
1912                                         "funky version")
1913
1914         #
1915         # hop limit - IMCP replies
1916         #
1917         p_version = (Ether(src=self.pg0.remote_mac,
1918                            dst=self.pg0.local_mac) /
1919                      IPv6(src=self.pg0.remote_ip6,
1920                           dst=self.pg1.remote_ip6,
1921                           hlim=1) /
1922                      UDP(sport=1234, dport=1234) /
1923                      Raw('\xa5' * 100))
1924
1925         rx = self.send_and_expect(self.pg0, p_version * 65, self.pg0)
1926         rx = rx[0]
1927         icmp = rx[ICMPv6TimeExceeded]
1928         self.assertEqual(icmp.type, 3)
1929         # 0: "hop limit exceeded in transit",
1930         self.assertEqual(icmp.code, 0)
1931
1932
1933 if __name__ == '__main__':
1934     unittest.main(testRunner=VppTestRunner)