No overlapping sub-nets on any interface in the same table/VRF (VPP-943)
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5
6 from framework import VppTestCase, VppTestRunner
7 from util import ppp, ip6_normalize
8 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
9 from vpp_pg_interface import is_ipv6_misc
10 from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
11     VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
12     VppMplsRoute, DpoProto, VppMplsTable
13 from vpp_neighbor import find_nbr, VppNeighbor
14
15 from scapy.packet import Raw
16 from scapy.layers.l2 import Ether, Dot1Q
17 from scapy.layers.inet6 import IPv6, UDP, TCP, ICMPv6ND_NS, ICMPv6ND_RS, \
18     ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
19     ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
20     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
21     ICMPv6TimeExceeded
22 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
23     in6_mactoifaceid, in6_ismaddr
24 from scapy.utils import inet_pton, inet_ntop
25 from scapy.contrib.mpls import MPLS
26
27
28 AF_INET6 = socket.AF_INET6
29
30
def mk_ll_addr(mac):
    """Return the link-local IPv6 address string derived from *mac*.

    The interface identifier that scapy's ``in6_mactoifaceid`` computes for
    the MAC address is appended to the ``fe80::`` prefix.

    :param str mac: MAC address to derive the address from.
    :returns: link-local IPv6 address as a string.
    """
    return "fe80::" + in6_mactoifaceid(mac)
35
36
class TestIPv6ND(VppTestCase):
    """Shared helpers for sending and validating IPv6 ND packets.

    The ``validate_*`` methods assert on a single captured packet; the
    ``send_and_expect_*`` methods inject packets, capture exactly one reply
    and hand it to the matching validator.
    """

    def validate_ra(self, intf, rx, dst_ip=None):
        """Assert that *rx* is a Router Advertisement sent out of *intf*.

        :param intf: interface the packet was captured on.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination; defaults to
                       ``intf.remote_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd RA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_RA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the router's link local
        self.assertTrue(in6_islladdr(rx[IPv6].src))
        self.assertEqual(in6_ptop(rx[IPv6].src),
                         in6_ptop(mk_ll_addr(intf.local_mac)))

    def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
        """Assert that *rx* is a Neighbour Advertisement sent out of *intf*.

        :param intf: interface the packet was captured on.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination; defaults to
                       ``intf.remote_ip6``.
        :param tgt_ip: expected IPv6 source (the advertised target);
                       defaults to ``intf.local_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6
        if not tgt_ip:
            # default the *target*; the destination default is set above
            tgt_ip = intf.local_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_NA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the target address
        self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))

        # Dest link-layer options should have the router's MAC
        dll = rx[ICMPv6NDOptDstLLAddr]
        self.assertEqual(dll.lladdr, intf.local_mac)

    def validate_ns(self, intf, rx, tgt_ip):
        """Assert that *rx* is a Neighbour Solicitation for *tgt_ip*.

        :param intf: interface the packet was captured on.
        :param rx: the captured packet.
        :param tgt_ip: IPv6 address the solicitation is expected to target.
        """
        # solicited-node multicast address derived from the target
        nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
        dst_ip = inet_ntop(AF_INET6, nsma)

        # NS is sent to the MAC matching the solicited-node mcast address
        self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NS should be addressed to an mcast address
        # derived from the target address
        self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # expect the tgt IP in the NS header
        ns = rx[ICMPv6ND_NS]
        self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))

        # packet is from the router's local address; both sides are
        # normalised so the textual form of the address does not matter
        self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(intf.local_ip6))

        # Src link-layer options should have the router's MAC
        sll = rx[ICMPv6NDOptSrcLLAddr]
        self.assertEqual(sll.lladdr, intf.local_mac)

    def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send *pkts* on *intf* and validate the single RA captured there.

        :param intf: interface used for both transmit and capture.
        :param pkts: packet(s) to add to the interface's stream.
        :param remark: description of the scenario (not used here).
        :param dst_ip: expected RA destination, see :meth:`validate_ra`.
        :param filter_out_fn: filter passed through to ``get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ra(intf, rx, dst_ip)

    def send_and_expect_na(self, intf, pkts, remark, dst_ip=None,
                           tgt_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send *pkts* on *intf* and validate the single NA captured there.

        :param intf: interface used for both transmit and capture.
        :param pkts: packet(s) to add to the interface's stream.
        :param remark: description of the scenario (not used here).
        :param dst_ip: expected NA destination, see :meth:`validate_na`.
        :param tgt_ip: expected NA source, see :meth:`validate_na`.
        :param filter_out_fn: filter passed through to ``get_capture``.
        """
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_na(intf, rx, dst_ip, tgt_ip)

    def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
                           filter_out_fn=is_ipv6_misc):
        """Send *pkts* on *tx_intf* and validate the single NS on *rx_intf*.

        :param tx_intf: interface the packets are injected on.
        :param rx_intf: interface the solicitation is captured on.
        :param pkts: packet(s) to add to the transmit stream.
        :param tgt_ip: address the NS is expected to solicit.
        :param filter_out_fn: filter passed through to ``get_capture``.
        """
        tx_intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        rx = rx[0]
        self.validate_ns(rx_intf, rx, tgt_ip)

    def verify_ip(self, rx, smac, dmac, sip, dip):
        """Assert the Ethernet and IPv6 addresses carried by *rx*.

        :param rx: the captured packet.
        :param smac: expected Ethernet source.
        :param dmac: expected Ethernet destination.
        :param sip: expected IPv6 source.
        :param dip: expected IPv6 destination.
        """
        ether = rx[Ether]
        self.assertEqual(ether.dst, dmac)
        self.assertEqual(ether.src, smac)

        ip = rx[IPv6]
        self.assertEqual(ip.src, sip)
        self.assertEqual(ip.dst, dip)
149
150
151 class TestIPv6(TestIPv6ND):
152     """ IPv6 Test Case """
153
154     @classmethod
155     def setUpClass(cls):
156         super(TestIPv6, cls).setUpClass()
157
158     def setUp(self):
159         """
160         Perform test setup before test case.
161
162         **Config:**
163             - create 3 pg interfaces
164                 - untagged pg0 interface
165                 - Dot1Q subinterface on pg1
166                 - Dot1AD subinterface on pg2
167             - setup interfaces:
168                 - put it into UP state
169                 - set IPv6 addresses
170                 - resolve neighbor address using NDP
171             - configure 200 fib entries
172
173         :ivar list interfaces: pg interfaces and subinterfaces.
174         :ivar dict flows: IPv4 packet flows in test.
175         :ivar list pg_if_packet_sizes: packet sizes in test.
176
177         *TODO:* Create AD sub interface
178         """
179         super(TestIPv6, self).setUp()
180
181         # create 3 pg interfaces
182         self.create_pg_interfaces(range(3))
183
184         # create 2 subinterfaces for p1 and pg2
185         self.sub_interfaces = [
186             VppDot1QSubint(self, self.pg1, 100),
187             VppDot1QSubint(self, self.pg2, 200)
188             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
189         ]
190
191         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
192         self.flows = dict()
193         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
194         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
195         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
196
197         # packet sizes
198         self.pg_if_packet_sizes = [64, 512, 1518, 9018]
199         self.sub_if_packet_sizes = [64, 512, 1518 + 4, 9018 + 4]
200
201         self.interfaces = list(self.pg_interfaces)
202         self.interfaces.extend(self.sub_interfaces)
203
204         # setup all interfaces
205         for i in self.interfaces:
206             i.admin_up()
207             i.config_ip6()
208             i.resolve_ndp()
209
210         # config 2M FIB entries
211         self.config_fib_entries(200)
212
213     def tearDown(self):
214         """Run standard test teardown and log ``show ip6 neighbors``."""
215         for i in self.interfaces:
216             i.unconfig_ip6()
217             i.ip6_disable()
218             i.admin_down()
219         for i in self.sub_interfaces:
220             i.remove_vpp_config()
221
222         super(TestIPv6, self).tearDown()
223         if not self.vpp_dead:
224             self.logger.info(self.vapi.cli("show ip6 neighbors"))
225             # info(self.vapi.cli("show ip6 fib"))  # many entries
226
227     def config_fib_entries(self, count):
228         """For each interface add to the FIB table *count* routes to
229         "fd02::1/128" destination with interface's local address as next-hop
230         address.
231
232         :param int count: Number of FIB entries.
233
234         - *TODO:* check if the next-hop address shouldn't be remote address
235           instead of local address.
236         """
237         n_int = len(self.interfaces)
238         percent = 0
239         counter = 0.0
240         dest_addr = inet_pton(AF_INET6, "fd02::1")
241         dest_addr_len = 128
242         for i in self.interfaces:
243             next_hop_address = i.local_ip6n
244             for j in range(count / n_int):
245                 self.vapi.ip_add_del_route(
246                     dest_addr, dest_addr_len, next_hop_address, is_ipv6=1)
247                 counter += 1
248                 if counter / count * 100 > percent:
249                     self.logger.info("Configure %d FIB entries .. %d%% done" %
250                                      (count, percent))
251                     percent += 1
252
253     def create_stream(self, src_if, packet_sizes):
254         """Create input packet stream for defined interface.
255
256         :param VppInterface src_if: Interface to create packet stream for.
257         :param list packet_sizes: Required packet sizes.
258         """
259         pkts = []
260         for i in range(0, 257):
261             dst_if = self.flows[src_if][i % 2]
262             info = self.create_packet_info(src_if, dst_if)
263             payload = self.info_to_payload(info)
264             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
265                  IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
266                  UDP(sport=1234, dport=1234) /
267                  Raw(payload))
268             info.data = p.copy()
269             if isinstance(src_if, VppSubInterface):
270                 p = src_if.add_dot1_layer(p)
271             size = packet_sizes[(i // 2) % len(packet_sizes)]
272             self.extend_packet(p, size)
273             pkts.append(p)
274         return pkts
275
276     def verify_capture(self, dst_if, capture):
277         """Verify captured input packet stream for defined interface.
278
279         :param VppInterface dst_if: Interface to verify captured packet stream
280                                     for.
281         :param list capture: Captured packet stream.
282         """
283         self.logger.info("Verifying capture on interface %s" % dst_if.name)
284         last_info = dict()
285         for i in self.interfaces:
286             last_info[i.sw_if_index] = None
287         is_sub_if = False
288         dst_sw_if_index = dst_if.sw_if_index
289         if hasattr(dst_if, 'parent'):
290             is_sub_if = True
291         for packet in capture:
292             if is_sub_if:
293                 # Check VLAN tags and Ethernet header
294                 packet = dst_if.remove_dot1_layer(packet)
295             self.assertTrue(Dot1Q not in packet)
296             try:
297                 ip = packet[IPv6]
298                 udp = packet[UDP]
299                 payload_info = self.payload_to_info(str(packet[Raw]))
300                 packet_index = payload_info.index
301                 self.assertEqual(payload_info.dst, dst_sw_if_index)
302                 self.logger.debug(
303                     "Got packet on port %s: src=%u (id=%u)" %
304                     (dst_if.name, payload_info.src, packet_index))
305                 next_info = self.get_next_packet_info_for_interface2(
306                     payload_info.src, dst_sw_if_index,
307                     last_info[payload_info.src])
308                 last_info[payload_info.src] = next_info
309                 self.assertTrue(next_info is not None)
310                 self.assertEqual(packet_index, next_info.index)
311                 saved_packet = next_info.data
312                 # Check standard fields
313                 self.assertEqual(ip.src, saved_packet[IPv6].src)
314                 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
315                 self.assertEqual(udp.sport, saved_packet[UDP].sport)
316                 self.assertEqual(udp.dport, saved_packet[UDP].dport)
317             except:
318                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
319                 raise
320         for i in self.interfaces:
321             remaining_packet = self.get_next_packet_info_for_interface2(
322                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
323             self.assertTrue(remaining_packet is None,
324                             "Interface %s: Packet expected from interface %s "
325                             "didn't arrive" % (dst_if.name, i.name))
326
327     def test_fib(self):
328         """ IPv6 FIB test
329
330         Test scenario:
331             - Create IPv6 stream for pg0 interface
332             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
333             - Send and verify received packets on each interface.
334         """
335
336         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes)
337         self.pg0.add_stream(pkts)
338
339         for i in self.sub_interfaces:
340             pkts = self.create_stream(i, self.sub_if_packet_sizes)
341             i.parent.add_stream(pkts)
342
343         self.pg_enable_capture(self.pg_interfaces)
344         self.pg_start()
345
346         pkts = self.pg0.get_capture()
347         self.verify_capture(self.pg0, pkts)
348
349         for i in self.sub_interfaces:
350             pkts = i.parent.get_capture()
351             self.verify_capture(i, pkts)
352
    def test_ns(self):
        """ IPv6 Neighbour Solicitation Exceptions

        Test scenario:
           - Send an NS sourced from an address not covered by the link sub-net
           - Send an NS to an mcast address the router has not joined
             (currently disabled, see the ``if 0:`` section below)
           - Send an NS for a target address the router does not own.
           - Add a neighbor entry flagged as having no FIB entry and check
             that the neighbor exists but no /128 route does.
           - Send NSs sourced from link-local addresses, targeting pg0's
             global and link-local addresses; expect an NA each time and a
             learned neighbor without a /128 route.
        """

        #
        # An NS from a non link source address
        #
        # solicited-node multicast address for pg0's local address
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        p = (Ether(dst=in6_getnsmac(nsma)) /
             IPv6(dst=d, src="2002::2") /
             ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
             ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
        pkts = [p]

        self.send_and_assert_no_replies(
            self.pg0, pkts,
            "No response to NS source by address not on sub-net")

        #
        # An NS sent to a solicited mcast group the router is
        # not a member of FAILS, so this section is disabled
        #
        if 0:
            nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
            d = inet_ntop(AF_INET6, nsma)

            p = (Ether(dst=in6_getnsmac(nsma)) /
                 IPv6(dst=d, src=self.pg0.remote_ip6) /
                 ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
                 ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
            pkts = [p]

            self.send_and_assert_no_replies(
                self.pg0, pkts,
                "No response to NS sent to unjoined mcast address")

        #
        # An NS whose target address is one the router does not own
        #
        # nsma and d set here are reused by the link-local NSs further down
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        p = (Ether(dst=in6_getnsmac(nsma)) /
             IPv6(dst=d, src=self.pg0.remote_ip6) /
             ICMPv6ND_NS(tgt="fd::ffff") /
             ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
        pkts = [p]

        self.send_and_assert_no_replies(self.pg0, pkts,
                                        "No response to NS for unknown target")

        #
        # A neighbor entry that has no associated FIB-entry
        #
        self.pg0.generate_remote_hosts(4)
        nd_entry = VppNeighbor(self,
                               self.pg0.sw_if_index,
                               self.pg0.remote_hosts[2].mac,
                               self.pg0.remote_hosts[2].ip6,
                               af=AF_INET6,
                               is_no_fib_entry=1)
        nd_entry.add_vpp_config()

        #
        # check we have the neighbor, but no route
        #
        self.assertTrue(find_nbr(self,
                                 self.pg0.sw_if_index,
                                 self.pg0._remote_hosts[2].ip6,
                                 inet=AF_INET6))
        self.assertFalse(find_route(self,
                                    self.pg0._remote_hosts[2].ip6,
                                    128,
                                    inet=AF_INET6))

        #
        # send an NS from a link local address to the interface's global
        # address
        #
        p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
             IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
             ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
             ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))

        self.send_and_expect_na(self.pg0, p,
                                "NS from link-local",
                                dst_ip=self.pg0._remote_hosts[2].ip6_ll,
                                tgt_ip=self.pg0.local_ip6)

        #
        # we should have learned an ND entry for the peer's link-local
        # but not inserted a route to it in the FIB
        #
        self.assertTrue(find_nbr(self,
                                 self.pg0.sw_if_index,
                                 self.pg0._remote_hosts[2].ip6_ll,
                                 inet=AF_INET6))
        self.assertFalse(find_route(self,
                                    self.pg0._remote_hosts[2].ip6_ll,
                                    128,
                                    inet=AF_INET6))

        #
        # An NS to the router's own Link-local
        #
        p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
             IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
             ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll) /
             ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))

        self.send_and_expect_na(self.pg0, p,
                                "NS to/from link-local",
                                dst_ip=self.pg0._remote_hosts[3].ip6_ll,
                                tgt_ip=self.pg0.local_ip6_ll)

        #
        # we should have learned an ND entry for the peer's link-local
        # but not inserted a route to it in the FIB
        #
        self.assertTrue(find_nbr(self,
                                 self.pg0.sw_if_index,
                                 self.pg0._remote_hosts[3].ip6_ll,
                                 inet=AF_INET6))
        self.assertFalse(find_route(self,
                                    self.pg0._remote_hosts[3].ip6_ll,
                                    128,
                                    inet=AF_INET6))
487
488     def test_ns_duplicates(self):
489         """ ND Duplicates"""
490
491         #
492         # Generate some hosts on the LAN
493         #
494         self.pg1.generate_remote_hosts(3)
495
496         #
497         # Add host 1 on pg1 and pg2
498         #
499         ns_pg1 = VppNeighbor(self,
500                              self.pg1.sw_if_index,
501                              self.pg1.remote_hosts[1].mac,
502                              self.pg1.remote_hosts[1].ip6,
503                              af=AF_INET6)
504         ns_pg1.add_vpp_config()
505         ns_pg2 = VppNeighbor(self,
506                              self.pg2.sw_if_index,
507                              self.pg2.remote_mac,
508                              self.pg1.remote_hosts[1].ip6,
509                              af=AF_INET6)
510         ns_pg2.add_vpp_config()
511
512         #
513         # IP packet destined for pg1 remote host arrives on pg1 again.
514         #
515         p = (Ether(dst=self.pg0.local_mac,
516                    src=self.pg0.remote_mac) /
517              IPv6(src=self.pg0.remote_ip6,
518                   dst=self.pg1.remote_hosts[1].ip6) /
519              UDP(sport=1234, dport=1234) /
520              Raw())
521
522         self.pg0.add_stream(p)
523         self.pg_enable_capture(self.pg_interfaces)
524         self.pg_start()
525
526         rx1 = self.pg1.get_capture(1)
527
528         self.verify_ip(rx1[0],
529                        self.pg1.local_mac,
530                        self.pg1.remote_hosts[1].mac,
531                        self.pg0.remote_ip6,
532                        self.pg1.remote_hosts[1].ip6)
533
534         #
535         # remove the duplicate on pg1
536         # packet stream shoud generate NSs out of pg1
537         #
538         ns_pg1.remove_vpp_config()
539
540         self.send_and_expect_ns(self.pg0, self.pg1,
541                                 p, self.pg1.remote_hosts[1].ip6)
542
543         #
544         # Add it back
545         #
546         ns_pg1.add_vpp_config()
547
548         self.pg0.add_stream(p)
549         self.pg_enable_capture(self.pg_interfaces)
550         self.pg_start()
551
552         rx1 = self.pg1.get_capture(1)
553
554         self.verify_ip(rx1[0],
555                        self.pg1.local_mac,
556                        self.pg1.remote_hosts[1].mac,
557                        self.pg0.remote_ip6,
558                        self.pg1.remote_hosts[1].ip6)
559
560     def validate_ra(self, intf, rx, dst_ip=None, mtu=9000, pi_opt=None):
561         if not dst_ip:
562             dst_ip = intf.remote_ip6
563
564         # unicasted packets must come to the unicast mac
565         self.assertEqual(rx[Ether].dst, intf.remote_mac)
566
567         # and from the router's MAC
568         self.assertEqual(rx[Ether].src, intf.local_mac)
569
570         # the rx'd RA should be addressed to the sender's source
571         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
572         self.assertEqual(in6_ptop(rx[IPv6].dst),
573                          in6_ptop(dst_ip))
574
575         # and come from the router's link local
576         self.assertTrue(in6_islladdr(rx[IPv6].src))
577         self.assertEqual(in6_ptop(rx[IPv6].src),
578                          in6_ptop(mk_ll_addr(intf.local_mac)))
579
580         # it should contain the links MTU
581         ra = rx[ICMPv6ND_RA]
582         self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
583
584         # it should contain the source's link layer address option
585         sll = ra[ICMPv6NDOptSrcLLAddr]
586         self.assertEqual(sll.lladdr, intf.local_mac)
587
588         if not pi_opt:
589             # the RA should not contain prefix information
590             self.assertFalse(ra.haslayer(ICMPv6NDOptPrefixInfo))
591         else:
592             raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
593
594             # the options are nested in the scapy packet in way that i cannot
595             # decipher how to decode. this 1st layer of option always returns
596             # nested classes, so a direct obj1=obj2 comparison always fails.
597             # however, the getlayer(.., 2) does give one instnace.
598             # so we cheat here and construct a new opt instnace for comparison
599             rd = ICMPv6NDOptPrefixInfo(prefixlen=raos.prefixlen,
600                                        prefix=raos.prefix,
601                                        L=raos.L,
602                                        A=raos.A)
603             if type(pi_opt) is list:
604                 for ii in range(len(pi_opt)):
605                     self.assertEqual(pi_opt[ii], rd)
606                     rd = rx.getlayer(ICMPv6NDOptPrefixInfo, ii+2)
607             else:
608                 self.assertEqual(pi_opt, raos)
609
610     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
611                            filter_out_fn=is_ipv6_misc,
612                            opt=None):
613         intf.add_stream(pkts)
614         self.pg_enable_capture(self.pg_interfaces)
615         self.pg_start()
616         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
617
618         self.assertEqual(len(rx), 1)
619         rx = rx[0]
620         self.validate_ra(intf, rx, dst_ip, pi_opt=opt)
621
622     def test_rs(self):
623         """ IPv6 Router Solicitation Exceptions
624
625         Test scenario:
626         """
627
628         #
629         # Before we begin change the IPv6 RA responses to use the unicast
630         # address - that way we will not confuse them with the periodic
631         # RAs which go to the mcast address
632         # Sit and wait for the first periodic RA.
633         #
634         # TODO
635         #
636         self.pg0.ip6_ra_config(send_unicast=1)
637
638         #
639         # An RS from a link source address
640         #  - expect an RA in return
641         #
642         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
643              IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
644              ICMPv6ND_RS())
645         pkts = [p]
646         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
647
648         #
649         # For the next RS sent the RA should be rate limited
650         #
651         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
652
653         #
654         # When we reconfiure the IPv6 RA config, we reset the RA rate limiting,
655         # so we need to do this before each test below so as not to drop
656         # packets for rate limiting reasons. Test this works here.
657         #
658         self.pg0.ip6_ra_config(send_unicast=1)
659         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
660
661         #
662         # An RS sent from a non-link local source
663         #
664         self.pg0.ip6_ra_config(send_unicast=1)
665         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
666              IPv6(dst=self.pg0.local_ip6, src="2002::ffff") /
667              ICMPv6ND_RS())
668         pkts = [p]
669         self.send_and_assert_no_replies(self.pg0, pkts,
670                                         "RS from non-link source")
671
672         #
673         # Source an RS from a link local address
674         #
675         self.pg0.ip6_ra_config(send_unicast=1)
676         ll = mk_ll_addr(self.pg0.remote_mac)
677         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
678              IPv6(dst=self.pg0.local_ip6, src=ll) /
679              ICMPv6ND_RS())
680         pkts = [p]
681         self.send_and_expect_ra(self.pg0, pkts,
682                                 "RS sourced from link-local",
683                                 dst_ip=ll)
684
685         #
686         # Send the RS multicast
687         #
688         self.pg0.ip6_ra_config(send_unicast=1)
689         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
690         ll = mk_ll_addr(self.pg0.remote_mac)
691         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
692              IPv6(dst="ff02::2", src=ll) /
693              ICMPv6ND_RS())
694         pkts = [p]
695         self.send_and_expect_ra(self.pg0, pkts,
696                                 "RS sourced from link-local",
697                                 dst_ip=ll)
698
699         #
700         # Source from the unspecified address ::. This happens when the RS
701         # is sent before the host has a configured address/sub-net,
702         # i.e. auto-config. Since the sender has no IP address, the reply
703         # comes back mcast - so the capture needs to not filter this.
704         # If we happen to pick up the periodic RA at this point then so be it,
705         # it's not an error.
706         #
707         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
708         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
709              IPv6(dst="ff02::2", src="::") /
710              ICMPv6ND_RS())
711         pkts = [p]
712         self.send_and_expect_ra(self.pg0, pkts,
713                                 "RS sourced from unspecified",
714                                 dst_ip="ff02::1",
715                                 filter_out_fn=None)
716
717         #
718         # Configure The RA to announce the links prefix
719         #
720         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
721                                self.pg0.local_ip6_prefix_len)
722
723         #
724         # RAs should now contain the prefix information option
725         #
726         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
727                                     prefix=self.pg0.local_ip6,
728                                     L=1,
729                                     A=1)
730
731         self.pg0.ip6_ra_config(send_unicast=1)
732         ll = mk_ll_addr(self.pg0.remote_mac)
733         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
734              IPv6(dst=self.pg0.local_ip6, src=ll) /
735              ICMPv6ND_RS())
736         self.send_and_expect_ra(self.pg0, p,
737                                 "RA with prefix-info",
738                                 dst_ip=ll,
739                                 opt=opt)
740
741         #
742         # Change the prefix info to not off-link
743         #  L-flag is clear
744         #
745         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
746                                self.pg0.local_ip6_prefix_len,
747                                off_link=1)
748
749         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
750                                     prefix=self.pg0.local_ip6,
751                                     L=0,
752                                     A=1)
753
754         self.pg0.ip6_ra_config(send_unicast=1)
755         self.send_and_expect_ra(self.pg0, p,
756                                 "RA with Prefix info with L-flag=0",
757                                 dst_ip=ll,
758                                 opt=opt)
759
760         #
761         # Change the prefix info to not off-link, no-autoconfig
762         #  L and A flag are clear in the advert
763         #
764         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
765                                self.pg0.local_ip6_prefix_len,
766                                off_link=1,
767                                no_autoconfig=1)
768
769         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
770                                     prefix=self.pg0.local_ip6,
771                                     L=0,
772                                     A=0)
773
774         self.pg0.ip6_ra_config(send_unicast=1)
775         self.send_and_expect_ra(self.pg0, p,
776                                 "RA with Prefix info with A & L-flag=0",
777                                 dst_ip=ll,
778                                 opt=opt)
779
780         #
781         # Change the flag settings back to the defaults
782         #  L and A flag are set in the advert
783         #
784         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
785                                self.pg0.local_ip6_prefix_len)
786
787         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
788                                     prefix=self.pg0.local_ip6,
789                                     L=1,
790                                     A=1)
791
792         self.pg0.ip6_ra_config(send_unicast=1)
793         self.send_and_expect_ra(self.pg0, p,
794                                 "RA with Prefix info",
795                                 dst_ip=ll,
796                                 opt=opt)
797
798         #
799         # Change the prefix info to not off-link, no-autoconfig
800         #  L and A flag are clear in the advert
801         #
802         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
803                                self.pg0.local_ip6_prefix_len,
804                                off_link=1,
805                                no_autoconfig=1)
806
807         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
808                                     prefix=self.pg0.local_ip6,
809                                     L=0,
810                                     A=0)
811
812         self.pg0.ip6_ra_config(send_unicast=1)
813         self.send_and_expect_ra(self.pg0, p,
814                                 "RA with Prefix info with A & L-flag=0",
815                                 dst_ip=ll,
816                                 opt=opt)
817
818         #
        # Use the reset-to-defaults option to revert to defaults
        #  L and A flag are set in the advert
821         #
822         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
823                                self.pg0.local_ip6_prefix_len,
824                                use_default=1)
825
826         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
827                                     prefix=self.pg0.local_ip6,
828                                     L=1,
829                                     A=1)
830
831         self.pg0.ip6_ra_config(send_unicast=1)
832         self.send_and_expect_ra(self.pg0, p,
833                                 "RA with Prefix reverted to defaults",
834                                 dst_ip=ll,
835                                 opt=opt)
836
837         #
838         # Advertise Another prefix. With no L-flag/A-flag
839         #
840         self.pg0.ip6_ra_prefix(self.pg1.local_ip6n,
841                                self.pg1.local_ip6_prefix_len,
842                                off_link=1,
843                                no_autoconfig=1)
844
845         opt = [ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
846                                      prefix=self.pg0.local_ip6,
847                                      L=1,
848                                      A=1),
849                ICMPv6NDOptPrefixInfo(prefixlen=self.pg1.local_ip6_prefix_len,
850                                      prefix=self.pg1.local_ip6,
851                                      L=0,
852                                      A=0)]
853
854         self.pg0.ip6_ra_config(send_unicast=1)
855         ll = mk_ll_addr(self.pg0.remote_mac)
856         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
857              IPv6(dst=self.pg0.local_ip6, src=ll) /
858              ICMPv6ND_RS())
859         self.send_and_expect_ra(self.pg0, p,
860                                 "RA with multiple Prefix infos",
861                                 dst_ip=ll,
862                                 opt=opt)
863
864         #
        # Remove the first prefix-info - expect the second is still in the
        # advert
867         #
868         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
869                                self.pg0.local_ip6_prefix_len,
870                                is_no=1)
871
872         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg1.local_ip6_prefix_len,
873                                     prefix=self.pg1.local_ip6,
874                                     L=0,
875                                     A=0)
876
877         self.pg0.ip6_ra_config(send_unicast=1)
878         self.send_and_expect_ra(self.pg0, p,
879                                 "RA with Prefix reverted to defaults",
880                                 dst_ip=ll,
881                                 opt=opt)
882
883         #
        # Remove the second prefix-info - expect no prefix-info in the adverts
885         #
886         self.pg0.ip6_ra_prefix(self.pg1.local_ip6n,
887                                self.pg1.local_ip6_prefix_len,
888                                is_no=1)
889
890         self.pg0.ip6_ra_config(send_unicast=1)
891         self.send_and_expect_ra(self.pg0, p,
892                                 "RA with Prefix reverted to defaults",
893                                 dst_ip=ll)
894
895         #
896         # Reset the periodic advertisements back to default values
897         #
898         self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
899
900
class TestIPv6RD(TestIPv6ND):
    """ IPv6 Router Discovery Test Case """

    @classmethod
    def setUpClass(cls):
        """Class-level setup; adds nothing to what the base class does."""
        super(TestIPv6RD, cls).setUpClass()

    def setUp(self):
        """Create two pg interfaces, set them admin-up and configure IPv6."""
        super(TestIPv6RD, self).setUp()

        self.create_pg_interfaces(range(2))

        # keep our own copy of the list; tearDown walks it to undo the setup
        self.interfaces = list(self.pg_interfaces)

        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

    def tearDown(self):
        """Remove the IPv6 config and set the interfaces admin-down."""
        for intf in self.interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
        super(TestIPv6RD, self).tearDown()

    def test_rd_send_router_solicitation(self):
        """ Verify router solicitation packets """

        n_solicits = 2

        # the capture is armed before the API call that triggers the RSs
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index,
                                                 mrc=n_solicits)
        captured = self.pg1.get_capture(n_solicits, timeout=3)
        self.assertEqual(len(captured), n_solicits)

        for rs in captured:
            self.assertTrue(rs.haslayer(IPv6))
            ip6 = rs[IPv6]
            self.assertTrue(ip6.haslayer(ICMPv6ND_RS))

            # addressed to ff02::2 (all-routers multicast) ...
            self.assert_equal(ip6_normalize(ip6.dst),
                              ip6_normalize("ff02::2"))
            # ... and sourced from pg1's link-local address
            self.assert_equal(ip6_normalize(ip6.src),
                              ip6_normalize(self.pg1.local_ip6_ll))

            # the source link-layer address option must carry pg1's MAC
            self.assertTrue(rs[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr))
            self.assert_equal(rs[ICMPv6NDOptSrcLLAddr].lladdr,
                              self.pg1.local_mac)

    def verify_prefix_info(self, reported_prefix, prefix_option):
        """Compare a prefix reported in an RA event with the option sent.

        :param reported_prefix: one entry of the event's prefix list
        :param prefix_option: the ICMPv6NDOptPrefixInfo put into the RA
        """
        def sent(field):
            return prefix_option.getfieldval(field)

        # the event is compared against the packed (binary) address
        expected_address = socket.inet_pton(socket.AF_INET6, sent("prefix"))
        self.assert_equal(reported_prefix.dst_address, expected_address)
        self.assert_equal(reported_prefix.dst_address_length,
                          sent("prefixlen"))

        # L goes to bit 7 and A to bit 6 of the expected flags value
        expected_flags = (sent("L") << 7) | (sent("A") << 6)
        self.assert_equal(reported_prefix.flags, expected_flags)

        self.assert_equal(reported_prefix.valid_time, sent("validlifetime"))
        self.assert_equal(reported_prefix.preferred_time,
                          sent("preferredlifetime"))

    def test_rd_receive_router_advertisement(self):
        """ Verify events triggered by received RA packets """

        self.vapi.want_ip6_ra_events()

        # two prefix options that differ in every field
        sent_prefixes = [
            ICMPv6NDOptPrefixInfo(
                prefix="1::2",
                prefixlen=50,
                validlifetime=200,
                preferredlifetime=500,
                L=1,
                A=1,
            ),
            ICMPv6NDOptPrefixInfo(
                prefix="7::4",
                prefixlen=20,
                validlifetime=70,
                preferredlifetime=1000,
                L=1,
                A=0,
            ),
        ]

        # RA from the peer's link-local address to pg1's link-local address
        ra = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(dst=self.pg1.local_ip6_ll,
                   src=mk_ll_addr(self.pg1.remote_mac)) /
              ICMPv6ND_RA())
        for option in sent_prefixes:
            ra = ra / option

        self.pg1.add_stream([ra])
        self.pg_start()

        ev = self.vapi.wait_for_event(10, "ip6_ra_event")

        # the RA header was built without arguments, so these are
        # presumably scapy's ICMPv6ND_RA defaults - TODO confirm
        self.assert_equal(ev.current_hop_limit, 0)
        self.assert_equal(ev.flags, 8)
        self.assert_equal(ev.router_lifetime_in_sec, 1800)
        self.assert_equal(ev.neighbor_reachable_time_in_msec, 0)
        self.assert_equal(
            ev.time_in_msec_between_retransmitted_neighbor_solicitations, 0)

        self.assert_equal(ev.n_prefixes, 2)

        # prefixes are expected in the order the options were sent
        for idx, option in enumerate(sent_prefixes):
            self.verify_prefix_info(ev.prefixes[idx], option)
1010
1011
class TestIPv6RDControlPlane(TestIPv6ND):
    """ IPv6 Router Discovery Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RDControlPlane, cls).setUpClass()

    def setUp(self):
        """Create one pg interface, set it admin-up and configure IPv6."""
        super(TestIPv6RDControlPlane, self).setUp()

        # create 1 pg interface
        self.create_pg_interfaces(range(1))

        self.interfaces = list(self.pg_interfaces)

        # setup all interfaces
        for i in self.interfaces:
            i.admin_up()
            i.config_ip6()

    def tearDown(self):
        super(TestIPv6RDControlPlane, self).tearDown()

    @staticmethod
    def create_ra_packet(pg, routerlifetime=None):
        """Build a Router Advertisement as sent by the peer on pg.

        :param pg: interface the RA is going to be injected on
        :param routerlifetime: router lifetime to put in the RA; when None
            the field is not passed to scapy at all
        :returns: Ether / IPv6 / ICMPv6ND_RA packet without any options,
            from the peer's link-local address to the interface's address
        """
        src_ip = pg.remote_ip6_ll
        dst_ip = pg.local_ip6
        if routerlifetime is not None:
            ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
        else:
            ra = ICMPv6ND_RA()
        p = (Ether(dst=pg.local_mac, src=pg.remote_mac) /
             IPv6(dst=dst_ip, src=src_ip) / ra)
        return p

    @staticmethod
    def get_default_routes(fib):
        """Collect the paths of all zero-length-prefix entries in a FIB dump.

        :param fib: iterable of FIB entries as returned by ip6_fib_dump
        :returns: list of dicts with keys 'sw_if_index' and 'next_hop', one
            per path whose sw_if_index is not 0xFFFFFFFF
        """
        routes = []
        for entry in fib:
            if entry.address_length != 0:
                continue
            for path in entry.path:
                # paths without an interface (index ~0) are not reported
                if path.sw_if_index != 0xFFFFFFFF:
                    routes.append({'sw_if_index': path.sw_if_index,
                                   'next_hop': path.next_hop})
        return routes

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Collect the /128 FIB entries whose first path is via pg.

        :param fib: iterable of FIB entries as returned by ip6_fib_dump
        :param pg: interface to match the first path's sw_if_index against
        :returns: list of the matching entries' addresses
        """
        return [entry.address for entry in fib
                if entry.address_length == 128 and
                entry.path[0].sw_if_index == pg.sw_if_index]

    def _send_ra(self, packet):
        """Inject packet on pg0, then pause briefly before the FIB is read."""
        self.pg0.add_stream([packet])
        self.pg_start()
        self.sleep(0.1)

    def _check_default_route(self, fib, next_hop):
        """Assert fib has exactly one default route: via next_hop on pg0."""
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 1)
        dr = default_routes[0]
        self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
        self.assertEqual(dr['next_hop'], next_hop)

    def _check_no_default_route(self, fib):
        """Assert fib has no default route via an interface."""
        default_routes = self.get_default_routes(fib)
        self.assertEqual(len(default_routes), 0)

    def _check_slaac_address(self, fib, initial_addresses):
        """Assert pg0 gained exactly one address and it lies in 1::/64."""
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        new_addresses = addresses.difference(initial_addresses)
        self.assertEqual(len(new_addresses), 1)
        # keep the first 8 bytes (the /64 prefix) and zero the interface id
        prefix = list(new_addresses)[0][:8] + '\0\0\0\0\0\0\0\0'
        self.assertEqual(inet_ntop(AF_INET6, prefix), '1::')

    def test_all(self):
        """ Test handling of SLAAC addresses and default routes """

        # baseline: no default route, remember the pre-existing addresses
        fib = self.vapi.ip6_fib_dump()
        default_routes = self.get_default_routes(fib)
        initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
        self.assertEqual(default_routes, [])
        router_address = self.pg0.remote_ip6n_ll

        # NOTE(review): the two trailing 1s presumably enable autoconfig
        # and default-route installation - confirm against the API
        self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)

        self.sleep(0.1)

        # send RA with two prefixes; only the first has the A flag set
        packet = (self.create_ra_packet(self.pg0) / ICMPv6NDOptPrefixInfo(
            prefix="1::",
            prefixlen=64,
            validlifetime=2,
            preferredlifetime=2,
            L=1,
            A=1,
        ) / ICMPv6NDOptPrefixInfo(
            prefix="7::",
            prefixlen=20,
            validlifetime=1500,
            preferredlifetime=1000,
            L=1,
            A=0,
        ))
        self._send_ra(packet)

        fib = self.vapi.ip6_fib_dump()

        # check FIB for new address
        self._check_slaac_address(fib, initial_addresses)

        # check FIB for new default route
        self._check_default_route(fib, router_address)

        # send RA to delete default route
        self._send_ra(self.create_ra_packet(self.pg0, routerlifetime=0))

        # check that default route is deleted
        fib = self.vapi.ip6_fib_dump()
        self._check_no_default_route(fib)

        self.sleep(0.1)

        # send RA
        self._send_ra(self.create_ra_packet(self.pg0))

        # check FIB for new default route
        fib = self.vapi.ip6_fib_dump()
        self._check_default_route(fib, router_address)

        # send RA, updating router lifetime to 1s
        self._send_ra(self.create_ra_packet(self.pg0, 1))

        # check that default route still exists
        fib = self.vapi.ip6_fib_dump()
        self._check_default_route(fib, router_address)

        # wait out the 1s router lifetime
        self.sleep(1)

        # check that default route is deleted
        fib = self.vapi.ip6_fib_dump()
        self._check_no_default_route(fib)

        # check FIB still contains the SLAAC address
        self._check_slaac_address(fib, initial_addresses)

        # the prefix was advertised with a valid lifetime of 2s
        self.sleep(1)

        # check that SLAAC address is deleted
        fib = self.vapi.ip6_fib_dump()
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        new_addresses = addresses.difference(initial_addresses)
        self.assertEqual(len(new_addresses), 0)
1185
1186
class IPv6NDProxyTest(TestIPv6ND):
    """ IPv6 ND ProxyTest Case """

    def setUp(self):
        """Create three pg interfaces: pg0 addressed, pg1/pg2 IPv6-enabled."""
        super(IPv6NDProxyTest, self).setUp()

        # create 3 pg interfaces
        self.create_pg_interfaces(range(3))

        # pg0 is the master interface, with the configured subnet
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # pg1 and pg2 are the proxy links: IPv6 is enabled on them but no
        # address is configured here
        self.pg1.ip6_enable()
        self.pg2.ip6_enable()

    def tearDown(self):
        super(IPv6NDProxyTest, self).tearDown()

    def test_nd_proxy(self):
        """ IPv6 Proxy ND """

        #
        # Generate some hosts in the subnet that we are proxying
        #
        self.pg0.generate_remote_hosts(8)

        # solicited-node multicast address of pg0's address, in binary
        # (nsma) and in printable (d) form; every NS below is sent to it
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        #
        # Send an NS for one of those remote hosts on one of the proxy links
        # expect no response since it's from an address that is not
        # on the link that has the prefix configured
        #
        ns_pg1 = (Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac) /
                  IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6) /
                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac))

        self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")

        #
        # Add proxy support for the host
        #
        self.vapi.ip6_nd_proxy(
            inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
            self.pg1.sw_if_index)

        #
        # try that NS again. this time we expect an NA back
        #
        self.send_and_expect_na(self.pg1, ns_pg1,
                                "NS to proxy entry",
                                dst_ip=self.pg0._remote_hosts[2].ip6,
                                tgt_ip=self.pg0.local_ip6)

        #
        # ... and that we have an entry in the ND cache
        #
        self.assertTrue(find_nbr(self,
                                 self.pg1.sw_if_index,
                                 self.pg0._remote_hosts[2].ip6,
                                 inet=AF_INET6))

        #
        # ... and we can route traffic to it
        #
        t = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(dst=self.pg0._remote_hosts[2].ip6,
                  src=self.pg0.remote_ip6) /
             UDP(sport=10000, dport=20000) /
             Raw('\xa5' * 100))

        self.pg0.add_stream(t)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(1)
        rx = rx[0]

        # forwarded out of pg1, rewritten with the MAC learned from the NS
        self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
        self.assertEqual(rx[Ether].src, self.pg1.local_mac)

        self.assertEqual(rx[IPv6].src, t[IPv6].src)
        self.assertEqual(rx[IPv6].dst, t[IPv6].dst)

        #
        # Test we proxy for the host on the main interface
        #
        ns_pg0 = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
                  IPv6(dst=d, src=self.pg0.remote_ip6) /
                  ICMPv6ND_NS(tgt=self.pg0._remote_hosts[2].ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))

        self.send_and_expect_na(self.pg0, ns_pg0,
                                "NS to proxy entry on main",
                                tgt_ip=self.pg0._remote_hosts[2].ip6,
                                dst_ip=self.pg0.remote_ip6)

        #
        # Setup and resolve proxy for another host on another interface
        #
        # NOTE(review): the NS is sourced from host[3] but advertises
        # host[2]'s MAC as its link-layer address - looks like a
        # copy/paste from ns_pg1; confirm whether host[3].mac was intended
        ns_pg2 = (Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac) /
                  IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6) /
                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac))

        self.vapi.ip6_nd_proxy(
            inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
            self.pg2.sw_if_index)

        self.send_and_expect_na(self.pg2, ns_pg2,
                                "NS to proxy entry other interface",
                                dst_ip=self.pg0._remote_hosts[3].ip6,
                                tgt_ip=self.pg0.local_ip6)

        self.assertTrue(find_nbr(self,
                                 self.pg2.sw_if_index,
                                 self.pg0._remote_hosts[3].ip6,
                                 inet=AF_INET6))

        #
        # hosts can communicate. pg2->pg1
        #
        # NOTE(review): this is the only access through remote_hosts;
        # everywhere else the test reads _remote_hosts - presumably the
        # same list, verify
        t2 = (Ether(dst=self.pg2.local_mac,
                    src=self.pg0.remote_hosts[3].mac) /
              IPv6(dst=self.pg0._remote_hosts[2].ip6,
                   src=self.pg0._remote_hosts[3].ip6) /
              UDP(sport=10000, dport=20000) /
              Raw('\xa5' * 100))

        self.pg2.add_stream(t2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(1)
        rx = rx[0]

        self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
        self.assertEqual(rx[Ether].src, self.pg1.local_mac)

        self.assertEqual(rx[IPv6].src, t2[IPv6].src)
        self.assertEqual(rx[IPv6].dst, t2[IPv6].dst)

        #
        # remove the proxy configs
        #
        self.vapi.ip6_nd_proxy(
            inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
            self.pg1.sw_if_index,
            is_del=1)
        self.vapi.ip6_nd_proxy(
            inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
            self.pg2.sw_if_index,
            is_del=1)

        # the neighbour entries for both hosts must be gone as well
        self.assertFalse(find_nbr(self,
                                  self.pg2.sw_if_index,
                                  self.pg0._remote_hosts[3].ip6,
                                  inet=AF_INET6))
        self.assertFalse(find_nbr(self,
                                  self.pg1.sw_if_index,
                                  self.pg0._remote_hosts[2].ip6,
                                  inet=AF_INET6))

        #
        # no longer proxy-ing...
        #
        self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
        self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
        self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")

        #
        # no longer forwarding. traffic generates NS out of the glean/main
        # interface
        #
        self.pg2.add_stream(t2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)

        self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1370
1371
class TestIPNull(VppTestCase):
    """ IPv6 routes via NULL """

    def setUp(self):
        """Create a single pg interface with IPv6 configured and resolved."""
        super(TestIPNull, self).setUp()

        # one interface is enough: the test sends and captures on pg0 only
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Run the framework teardown, then unconfigure the interfaces."""
        super(TestIPNull, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def _send_and_get_dest_unreach(self, pkt):
        """Send pkt on pg0; return the ICMPv6DestUnreach layer of the reply."""
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        reply = self.pg0.get_capture(1)[0]
        return reply[ICMPv6DestUnreach]

    def test_ip_null(self):
        """ IP NULL route """

        # the same packet is used for both routes; 2001::1 matches both
        # the /64 added first and the /128 added second
        eth = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        ip6 = IPv6(src=self.pg0.remote_ip6, dst="2001::1")
        p = eth / ip6 / UDP(sport=1234, dport=1234) / Raw('\xa5' * 100)

        #
        # A route via IP NULL that will reply with ICMP unreachables
        #
        ip_unreach = VppIpRoute(self, "2001::", 64, [],
                                is_unreach=1, is_ip6=1)
        ip_unreach.add_vpp_config()

        icmp = self._send_and_get_dest_unreach(p)

        # 0 = "No route to destination"
        self.assertEqual(icmp.code, 0)

        # ICMP is rate limited. pause a bit
        self.sleep(1)

        #
        # A route via IP NULL that will reply with ICMP prohibited
        #
        ip_prohibit = VppIpRoute(self, "2001::1", 128, [],
                                 is_prohibit=1, is_ip6=1)
        ip_prohibit.add_vpp_config()

        icmp = self._send_and_get_dest_unreach(p)

        # 1 = "Communication with destination administratively prohibited"
        self.assertEqual(icmp.code, 1)
1438
1439
class TestIPDisabled(VppTestCase):
    """ IPv6 disabled """

    def setUp(self):
        """Create two pg interfaces; only pg0 gets an IPv6 address."""
        super(TestIPDisabled, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        # PG0 is IP enabled
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # PG 1 is not IP enabled
        self.pg1.admin_up()

    def tearDown(self):
        """Run the framework teardown, then unconfigure the interfaces."""
        super(TestIPDisabled, self).tearDown()
        for i in self.pg_interfaces:
            # only IPv6 addresses are ever configured by this test case, so
            # it is the IPv6 config that has to be removed
            i.unconfig_ip6()
            i.admin_down()

    def test_ip_disabled(self):
        """ IP Disabled """

        #
        # A multicast route for ffef::1:
        # one accepting interface, pg1, one forwarding interface, pg0
        #
        route_ff_01 = VppIpMRoute(
            self,
            "::",
            "ffef::1", 128,
            MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
            [VppMRoutePath(self.pg1.sw_if_index,
                           MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT),
             VppMRoutePath(self.pg0.sw_if_index,
                           MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)],
            is_ip6=1)
        route_ff_01.add_vpp_config()

        # one unicast (pu) and one multicast (pm) packet, both sent on pg1
        pu = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
              UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100))
        pm = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst="ffef::1") /
              UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100))

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")

        #
        # IP enable PG1
        #
        self.pg1.config_ip6()

        #
        # Now we get packets through
        #
        self.pg1.add_stream(pu)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # only the arrival of one packet on pg0 is checked
        self.pg0.get_capture(1)

        self.pg1.add_stream(pm)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        #
        # Disable PG1
        #
        self.pg1.unconfig_ip6()

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1527
1528
1529 class TestIP6LoadBalance(VppTestCase):
1530     """ IPv6 Load-Balancing """
1531
1532     def setUp(self):
1533         super(TestIP6LoadBalance, self).setUp()
1534
1535         self.create_pg_interfaces(range(5))
1536
1537         mpls_tbl = VppMplsTable(self, 0)
1538         mpls_tbl.add_vpp_config()
1539
1540         for i in self.pg_interfaces:
1541             i.admin_up()
1542             i.config_ip6()
1543             i.resolve_ndp()
1544             i.enable_mpls()
1545
1546     def tearDown(self):
1547         for i in self.pg_interfaces:
1548             i.unconfig_ip6()
1549             i.admin_down()
1550             i.disable_mpls()
1551         super(TestIP6LoadBalance, self).tearDown()
1552
1553     def send_and_expect_load_balancing(self, input, pkts, outputs):
1554         self.vapi.cli("clear trace")
1555         input.add_stream(pkts)
1556         self.pg_enable_capture(self.pg_interfaces)
1557         self.pg_start()
1558         for oo in outputs:
1559             rx = oo._get_capture(1)
1560             self.assertNotEqual(0, len(rx))
1561
1562     def send_and_expect_one_itf(self, input, pkts, itf):
1563         self.vapi.cli("clear trace")
1564         input.add_stream(pkts)
1565         self.pg_enable_capture(self.pg_interfaces)
1566         self.pg_start()
1567         rx = itf.get_capture(len(pkts))
1568
1569     def test_ip6_load_balance(self):
1570         """ IPv6 Load-Balancing """
1571
1572         #
1573         # An array of packets that differ only in the destination port
1574         #  - IP only
1575         #  - MPLS EOS
1576         #  - MPLS non-EOS
1577         #  - MPLS non-EOS with an entropy label
1578         #
1579         port_ip_pkts = []
1580         port_mpls_pkts = []
1581         port_mpls_neos_pkts = []
1582         port_ent_pkts = []
1583
1584         #
1585         # An array of packets that differ only in the source address
1586         #
1587         src_ip_pkts = []
1588         src_mpls_pkts = []
1589
1590         for ii in range(65):
1591             port_ip_hdr = (IPv6(dst="3000::1", src="3000:1::1") /
1592                            UDP(sport=1234, dport=1234 + ii) /
1593                            Raw('\xa5' * 100))
1594             port_ip_pkts.append((Ether(src=self.pg0.remote_mac,
1595                                        dst=self.pg0.local_mac) /
1596                                  port_ip_hdr))
1597             port_mpls_pkts.append((Ether(src=self.pg0.remote_mac,
1598                                          dst=self.pg0.local_mac) /
1599                                    MPLS(label=66, ttl=2) /
1600                                    port_ip_hdr))
1601             port_mpls_neos_pkts.append((Ether(src=self.pg0.remote_mac,
1602                                               dst=self.pg0.local_mac) /
1603                                         MPLS(label=67, ttl=2) /
1604                                         MPLS(label=77, ttl=2) /
1605                                         port_ip_hdr))
1606             port_ent_pkts.append((Ether(src=self.pg0.remote_mac,
1607                                         dst=self.pg0.local_mac) /
1608                                   MPLS(label=67, ttl=2) /
1609                                   MPLS(label=14, ttl=2) /
1610                                   MPLS(label=999, ttl=2) /
1611                                   port_ip_hdr))
1612             src_ip_hdr = (IPv6(dst="3000::1", src="3000:1::%d" % ii) /
1613                           UDP(sport=1234, dport=1234) /
1614                           Raw('\xa5' * 100))
1615             src_ip_pkts.append((Ether(src=self.pg0.remote_mac,
1616                                       dst=self.pg0.local_mac) /
1617                                 src_ip_hdr))
1618             src_mpls_pkts.append((Ether(src=self.pg0.remote_mac,
1619                                         dst=self.pg0.local_mac) /
1620                                   MPLS(label=66, ttl=2) /
1621                                   src_ip_hdr))
1622
1623         #
1624         # A route for the IP pacekts
1625         #
1626         route_3000_1 = VppIpRoute(self, "3000::1", 128,
1627                                   [VppRoutePath(self.pg1.remote_ip6,
1628                                                 self.pg1.sw_if_index,
1629                                                 proto=DpoProto.DPO_PROTO_IP6),
1630                                    VppRoutePath(self.pg2.remote_ip6,
1631                                                 self.pg2.sw_if_index,
1632                                                 proto=DpoProto.DPO_PROTO_IP6)],
1633                                   is_ip6=1)
1634         route_3000_1.add_vpp_config()
1635
1636         #
1637         # a local-label for the EOS packets
1638         #
1639         binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
1640         binding.add_vpp_config()
1641
1642         #
1643         # An MPLS route for the non-EOS packets
1644         #
1645         route_67 = VppMplsRoute(self, 67, 0,
1646                                 [VppRoutePath(self.pg1.remote_ip6,
1647                                               self.pg1.sw_if_index,
1648                                               labels=[67],
1649                                               proto=DpoProto.DPO_PROTO_IP6),
1650                                  VppRoutePath(self.pg2.remote_ip6,
1651                                               self.pg2.sw_if_index,
1652                                               labels=[67],
1653                                               proto=DpoProto.DPO_PROTO_IP6)])
1654         route_67.add_vpp_config()
1655
1656         #
1657         # inject the packet on pg0 - expect load-balancing across the 2 paths
1658         #  - since the default hash config is to use IP src,dst and port
1659         #    src,dst
1660         # We are not going to ensure equal amounts of packets across each link,
1661         # since the hash algorithm is statistical and therefore this can never
1662         # be guaranteed. But wuth 64 different packets we do expect some
1663         # balancing. So instead just ensure there is traffic on each link.
1664         #
1665         self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
1666                                             [self.pg1, self.pg2])
1667         self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
1668                                             [self.pg1, self.pg2])
1669         self.send_and_expect_load_balancing(self.pg0, port_mpls_pkts,
1670                                             [self.pg1, self.pg2])
1671         self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
1672                                             [self.pg1, self.pg2])
1673         self.send_and_expect_load_balancing(self.pg0, port_mpls_neos_pkts,
1674                                             [self.pg1, self.pg2])
1675
1676         #
1677         # The packets with Entropy label in should not load-balance,
1678         # since the Entorpy value is fixed.
1679         #
1680         self.send_and_expect_one_itf(self.pg0, port_ent_pkts, self.pg1)
1681
1682         #
1683         # change the flow hash config so it's only IP src,dst
1684         #  - now only the stream with differing source address will
1685         #    load-balance
1686         #
1687         self.vapi.set_ip_flow_hash(0, is_ip6=1, src=1, dst=1, sport=0, dport=0)
1688
1689         self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
1690                                             [self.pg1, self.pg2])
1691         self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
1692                                             [self.pg1, self.pg2])
1693         self.send_and_expect_one_itf(self.pg0, port_ip_pkts, self.pg2)
1694
1695         #
1696         # change the flow hash config back to defaults
1697         #
1698         self.vapi.set_ip_flow_hash(0, is_ip6=1, src=1, dst=1, sport=1, dport=1)
1699
1700         #
1701         # Recursive prefixes
1702         #  - testing that 2 stages of load-balancing occurs and there is no
1703         #    polarisation (i.e. only 2 of 4 paths are used)
1704         #
1705         port_pkts = []
1706         src_pkts = []
1707
1708         for ii in range(257):
1709             port_pkts.append((Ether(src=self.pg0.remote_mac,
1710                                     dst=self.pg0.local_mac) /
1711                               IPv6(dst="4000::1", src="4000:1::1") /
1712                               UDP(sport=1234, dport=1234 + ii) /
1713                               Raw('\xa5' * 100)))
1714             src_pkts.append((Ether(src=self.pg0.remote_mac,
1715                                    dst=self.pg0.local_mac) /
1716                              IPv6(dst="4000::1", src="4000:1::%d" % ii) /
1717                              UDP(sport=1234, dport=1234) /
1718                              Raw('\xa5' * 100)))
1719
1720         route_3000_2 = VppIpRoute(self, "3000::2", 128,
1721                                   [VppRoutePath(self.pg3.remote_ip6,
1722                                                 self.pg3.sw_if_index,
1723                                                 proto=DpoProto.DPO_PROTO_IP6),
1724                                    VppRoutePath(self.pg4.remote_ip6,
1725                                                 self.pg4.sw_if_index,
1726                                                 proto=DpoProto.DPO_PROTO_IP6)],
1727                                   is_ip6=1)
1728         route_3000_2.add_vpp_config()
1729
1730         route_4000_1 = VppIpRoute(self, "4000::1", 128,
1731                                   [VppRoutePath("3000::1",
1732                                                 0xffffffff,
1733                                                 proto=DpoProto.DPO_PROTO_IP6),
1734                                    VppRoutePath("3000::2",
1735                                                 0xffffffff,
1736                                                 proto=DpoProto.DPO_PROTO_IP6)],
1737                                   is_ip6=1)
1738         route_4000_1.add_vpp_config()
1739
1740         #
1741         # inject the packet on pg0 - expect load-balancing across all 4 paths
1742         #
1743         self.vapi.cli("clear trace")
1744         self.send_and_expect_load_balancing(self.pg0, port_pkts,
1745                                             [self.pg1, self.pg2,
1746                                              self.pg3, self.pg4])
1747         self.send_and_expect_load_balancing(self.pg0, src_pkts,
1748                                             [self.pg1, self.pg2,
1749                                              self.pg3, self.pg4])
1750
1751         #
1752         # Recursive prefixes
1753         #  - testing that 2 stages of load-balancing no choices
1754         #
1755         port_pkts = []
1756
1757         for ii in range(257):
1758             port_pkts.append((Ether(src=self.pg0.remote_mac,
1759                                     dst=self.pg0.local_mac) /
1760                               IPv6(dst="6000::1", src="6000:1::1") /
1761                               UDP(sport=1234, dport=1234 + ii) /
1762                               Raw('\xa5' * 100)))
1763
1764         route_5000_2 = VppIpRoute(self, "5000::2", 128,
1765                                   [VppRoutePath(self.pg3.remote_ip6,
1766                                                 self.pg3.sw_if_index,
1767                                                 proto=DpoProto.DPO_PROTO_IP6)],
1768                                   is_ip6=1)
1769         route_5000_2.add_vpp_config()
1770
1771         route_6000_1 = VppIpRoute(self, "6000::1", 128,
1772                                   [VppRoutePath("5000::2",
1773                                                 0xffffffff,
1774                                                 proto=DpoProto.DPO_PROTO_IP6)],
1775                                   is_ip6=1)
1776         route_6000_1.add_vpp_config()
1777
1778         #
1779         # inject the packet on pg0 - expect load-balancing across all 4 paths
1780         #
1781         self.vapi.cli("clear trace")
1782         self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg3)
1783
1784
class TestIP6Punt(VppTestCase):
    """ IPv6 Punt Police/Redirect """

    # The class and test docstrings are kept verbatim: unittest uses the
    # first docstring line as the short description in its reports.

    def setUp(self):
        # Create two pg interfaces and, for each one in turn, bring it
        # admin-up, configure IPv6 and resolve NDP.
        super(TestIP6Punt, self).setUp()

        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        # Undo setUp's per-interface configuration.
        super(TestIP6Punt, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()

    def test_ip_punt(self):
        """ IP6 punt police and redirect """

        # TCP sent to VPP's own address on pg0; the test relies on these
        # packets taking the ip6 punt path.
        p = (Ether(src=self.pg0.remote_mac,
                   dst=self.pg0.local_mac) /
             IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
             TCP(sport=1234, dport=1234) /
             Raw('\xa5' * 100))

        pkts = p * 1025

        #
        # Configure a punt redirect via pg1.
        #
        nh_addr = inet_pton(AF_INET6,
                            self.pg1.remote_ip6)
        self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
                                   self.pg1.sw_if_index,
                                   nh_addr,
                                   is_ip6=1)

        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # add a policer
        #
        # The positional arguments are shared between the add and the
        # delete below so the two calls cannot drift apart. Their meaning
        # is defined by vapi.policer_add_del, which is not visible here.
        #
        policer_args = ("ip6-punt", 400, 0, 10, 0)
        policer = self.vapi.policer_add_del(*policer_args, rate_type=1)
        self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)

        self.vapi.cli("clear trace")
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        #
        # the number of packets received should be greater than 0,
        # but not equal to the number sent, since some were policed
        #
        # assertGreater/assertLess are used instead of assertTrue so a
        # failure reports the actual packet counts.
        #
        rx = self.pg1._get_capture(1)
        self.assertGreater(len(rx), 0)
        self.assertLess(len(rx), len(pkts))

        #
        # remove the policer. back to full rx
        #
        self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
        self.vapi.policer_add_del(*policer_args, rate_type=1, is_add=0)
        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # remove the redirect. expect full drop.
        #
        self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
                                   self.pg1.sw_if_index,
                                   nh_addr,
                                   is_add=0,
                                   is_ip6=1)
        self.send_and_assert_no_replies(self.pg0, pkts,
                                        "IP no punt config")

        #
        # Add a redirect that is not input port selective
        # (rx interface given as 0xffffffff)
        #
        self.vapi.ip_punt_redirect(0xffffffff,
                                   self.pg1.sw_if_index,
                                   nh_addr,
                                   is_ip6=1)
        self.send_and_expect(self.pg0, pkts, self.pg1)

        # and remove it again so no punt config is left behind
        self.vapi.ip_punt_redirect(0xffffffff,
                                   self.pg1.sw_if_index,
                                   nh_addr,
                                   is_add=0,
                                   is_ip6=1)
1880
1881
class TestIP6Input(VppTestCase):
    """ IPv6 Input Exceptions """

    # The class and test docstrings are kept verbatim: unittest uses the
    # first docstring line as the short description in its reports.

    def setUp(self):
        # Two pg interfaces; each is brought up, given IPv6 config and
        # has NDP resolved before the next one is touched.
        super(TestIP6Input, self).setUp()

        self.create_pg_interfaces(range(2))

        for itf in self.pg_interfaces:
            itf.admin_up()
            itf.config_ip6()
            itf.resolve_ndp()

    def tearDown(self):
        # Reverse of setUp, after the base class has torn down.
        super(TestIP6Input, self).tearDown()
        for itf in self.pg_interfaces:
            itf.unconfig_ip6()
            itf.admin_down()

    def test_ip_input(self):
        """ IP6 Input Exceptions """

        def udp_pg0_to_pg1(**ip6_fields):
            # UDP packet entering on pg0 and addressed to pg1's remote
            # host; ip6_fields sets extra IPv6 header fields.
            return (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IPv6(src=self.pg0.remote_ip6,
                         dst=self.pg1.remote_ip6,
                         **ip6_fields) /
                    UDP(sport=1234, dport=1234) /
                    Raw('\xa5' * 100))

        #
        # bad version - this is dropped
        #
        p_version = udp_pg0_to_pg1(version=3)

        self.send_and_assert_no_replies(self.pg0, p_version * 65,
                                        "funky version")

        #
        # hop limit of 1 - ICMP replies are expected back on pg0
        #
        p_hop_limit = udp_pg0_to_pg1(hlim=1)

        replies = self.send_and_expect(self.pg0, p_hop_limit * 65, self.pg0)

        # only the first reply is inspected
        time_exceeded = replies[0][ICMPv6TimeExceeded]
        self.assertEqual(time_exceeded.type, 3)
        # 0: "hop limit exceeded in transit",
        self.assertEqual(time_exceeded.code, 0)
1935
1936
# Script entry point: when this file is executed directly, run its tests
# through unittest using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)