IPv6 ND Router discovery data plane (VPP-1095)
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5
6 from framework import VppTestCase, VppTestRunner
7 from util import ppp, ip6_normalize
8 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
9 from vpp_pg_interface import is_ipv6_misc
10 from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
11     VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
12     VppMplsRoute, DpoProto, VppMplsTable
13 from vpp_neighbor import find_nbr, VppNeighbor
14
15 from scapy.packet import Raw
16 from scapy.layers.l2 import Ether, Dot1Q
17 from scapy.layers.inet6 import IPv6, UDP, TCP, ICMPv6ND_NS, ICMPv6ND_RS, \
18     ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
19     ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
20     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
21     ICMPv6TimeExceeded
22 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
23     in6_mactoifaceid, in6_ismaddr
24 from scapy.utils import inet_pton, inet_ntop
25 from scapy.contrib.mpls import MPLS
26
27
28 AF_INET6 = socket.AF_INET6
29
30
def mk_ll_addr(mac):
    """Return the link-local IPv6 address string built from *mac*.

    The interface identifier returned by ``in6_mactoifaceid(mac)`` is
    appended to the ``fe80::`` prefix.
    """
    return "fe80::" + in6_mactoifaceid(mac)
35
36
class TestIPv6ND(VppTestCase):
    """Shared helpers for sending and validating IPv6 ND packets."""

    def validate_ra(self, intf, rx, dst_ip=None):
        """Assert that *rx* is a Router Advertisement sent out of *intf*.

        :param intf: interface the packet was captured on.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination address; defaults to
                       ``intf.remote_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd RA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_RA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the router's link local
        self.assertTrue(in6_islladdr(rx[IPv6].src))
        self.assertEqual(in6_ptop(rx[IPv6].src),
                         in6_ptop(mk_ll_addr(intf.local_mac)))

    def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
        """Assert that *rx* is a Neighbor Advertisement sent out of *intf*.

        :param intf: interface the packet was captured on.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination address; defaults to
                       ``intf.remote_ip6``.
        :param tgt_ip: expected IPv6 source address of the NA; defaults to
                       ``intf.local_ip6``.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6
        if not tgt_ip:
            # the default belongs to tgt_ip; assigning it to dst_ip would
            # clobber the expected destination and leave tgt_ip unset
            tgt_ip = intf.local_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_NA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the target address
        self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))

        # Dest link-layer options should have the router's MAC
        dll = rx[ICMPv6NDOptDstLLAddr]
        self.assertEqual(dll.lladdr, intf.local_mac)

    def validate_ns(self, intf, rx, tgt_ip):
        """Assert that *rx* is a Neighbor Solicitation for *tgt_ip*.

        :param intf: interface the packet was captured on.
        :param rx: the captured packet.
        :param tgt_ip: IPv6 address expected as the NS target; the expected
                       IPv6 destination is the solicited-node multicast
                       address computed from it with ``in6_getnsma``.
        """
        nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
        dst_ip = inet_ntop(AF_INET6, nsma)

        # NS is broadcast
        self.assertEqual(rx[Ether].dst, "ff:ff:ff:ff:ff:ff")

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd NS should be addressed to an mcast address
        # derived from the target address
        self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))

        # expect the tgt IP in the NS header
        ns = rx[ICMPv6ND_NS]
        self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))

        # packet is from the router's local address
        self.assertEqual(in6_ptop(rx[IPv6].src), intf.local_ip6)

        # Src link-layer options should have the router's MAC
        sll = rx[ICMPv6NDOptSrcLLAddr]
        self.assertEqual(sll.lladdr, intf.local_mac)

    def _send_and_capture_one(self, tx_intf, rx_intf, pkts, filter_out_fn):
        """Send *pkts* on *tx_intf*; return the single packet on *rx_intf*."""
        tx_intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)

        self.assertEqual(len(rx), 1)
        return rx[0]

    def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send *pkts* on *intf* and validate the single RA received back.

        :param intf: interface to transmit on and capture from.
        :param pkts: packet(s) to send.
        :param remark: description of the scenario; not used by this method.
        :param dst_ip: forwarded to :meth:`validate_ra`.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        rx = self._send_and_capture_one(intf, intf, pkts, filter_out_fn)
        self.validate_ra(intf, rx, dst_ip)

    def send_and_expect_na(self, intf, pkts, remark, dst_ip=None,
                           tgt_ip=None,
                           filter_out_fn=is_ipv6_misc):
        """Send *pkts* on *intf* and validate the single NA received back.

        :param intf: interface to transmit on and capture from.
        :param pkts: packet(s) to send.
        :param remark: description of the scenario; not used by this method.
        :param dst_ip: forwarded to :meth:`validate_na`.
        :param tgt_ip: forwarded to :meth:`validate_na`.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        rx = self._send_and_capture_one(intf, intf, pkts, filter_out_fn)
        self.validate_na(intf, rx, dst_ip, tgt_ip)

    def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
                           filter_out_fn=is_ipv6_misc):
        """Send *pkts* on *tx_intf* and validate the single NS on *rx_intf*.

        :param tx_intf: interface to transmit on.
        :param rx_intf: interface to capture from.
        :param pkts: packet(s) to send.
        :param tgt_ip: forwarded to :meth:`validate_ns`.
        :param filter_out_fn: capture filter passed to ``get_capture``.
        """
        rx = self._send_and_capture_one(tx_intf, rx_intf, pkts, filter_out_fn)
        self.validate_ns(rx_intf, rx, tgt_ip)

    def verify_ip(self, rx, smac, dmac, sip, dip):
        """Assert the Ethernet and IPv6 source/destination fields of *rx*.

        :param rx: the captured packet.
        :param smac: expected Ethernet source.
        :param dmac: expected Ethernet destination.
        :param sip: expected IPv6 source.
        :param dip: expected IPv6 destination.
        """
        ether = rx[Ether]
        self.assertEqual(ether.dst, dmac)
        self.assertEqual(ether.src, smac)

        ip = rx[IPv6]
        self.assertEqual(ip.src, sip)
        self.assertEqual(ip.dst, dip)
149
150
151 class TestIPv6(TestIPv6ND):
152     """ IPv6 Test Case """
153
    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the parent class."""
        super(TestIPv6, cls).setUpClass()
157
158     def setUp(self):
159         """
160         Perform test setup before test case.
161
162         **Config:**
163             - create 3 pg interfaces
164                 - untagged pg0 interface
165                 - Dot1Q subinterface on pg1
166                 - Dot1AD subinterface on pg2
167             - setup interfaces:
168                 - put it into UP state
169                 - set IPv6 addresses
170                 - resolve neighbor address using NDP
171             - configure 200 fib entries
172
173         :ivar list interfaces: pg interfaces and subinterfaces.
174         :ivar dict flows: IPv4 packet flows in test.
175         :ivar list pg_if_packet_sizes: packet sizes in test.
176
177         *TODO:* Create AD sub interface
178         """
179         super(TestIPv6, self).setUp()
180
181         # create 3 pg interfaces
182         self.create_pg_interfaces(range(3))
183
184         # create 2 subinterfaces for p1 and pg2
185         self.sub_interfaces = [
186             VppDot1QSubint(self, self.pg1, 100),
187             VppDot1QSubint(self, self.pg2, 200)
188             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
189         ]
190
191         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
192         self.flows = dict()
193         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
194         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
195         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
196
197         # packet sizes
198         self.pg_if_packet_sizes = [64, 512, 1518, 9018]
199         self.sub_if_packet_sizes = [64, 512, 1518 + 4, 9018 + 4]
200
201         self.interfaces = list(self.pg_interfaces)
202         self.interfaces.extend(self.sub_interfaces)
203
204         # setup all interfaces
205         for i in self.interfaces:
206             i.admin_up()
207             i.config_ip6()
208             i.resolve_ndp()
209
210         # config 2M FIB entries
211         self.config_fib_entries(200)
212
213     def tearDown(self):
214         """Run standard test teardown and log ``show ip6 neighbors``."""
215         for i in self.sub_interfaces:
216             i.unconfig_ip6()
217             i.ip6_disable()
218             i.admin_down()
219             i.remove_vpp_config()
220
221         super(TestIPv6, self).tearDown()
222         if not self.vpp_dead:
223             self.logger.info(self.vapi.cli("show ip6 neighbors"))
224             # info(self.vapi.cli("show ip6 fib"))  # many entries
225
226     def config_fib_entries(self, count):
227         """For each interface add to the FIB table *count* routes to
228         "fd02::1/128" destination with interface's local address as next-hop
229         address.
230
231         :param int count: Number of FIB entries.
232
233         - *TODO:* check if the next-hop address shouldn't be remote address
234           instead of local address.
235         """
236         n_int = len(self.interfaces)
237         percent = 0
238         counter = 0.0
239         dest_addr = inet_pton(AF_INET6, "fd02::1")
240         dest_addr_len = 128
241         for i in self.interfaces:
242             next_hop_address = i.local_ip6n
243             for j in range(count / n_int):
244                 self.vapi.ip_add_del_route(
245                     dest_addr, dest_addr_len, next_hop_address, is_ipv6=1)
246                 counter += 1
247                 if counter / count * 100 > percent:
248                     self.logger.info("Configure %d FIB entries .. %d%% done" %
249                                      (count, percent))
250                     percent += 1
251
252     def create_stream(self, src_if, packet_sizes):
253         """Create input packet stream for defined interface.
254
255         :param VppInterface src_if: Interface to create packet stream for.
256         :param list packet_sizes: Required packet sizes.
257         """
258         pkts = []
259         for i in range(0, 257):
260             dst_if = self.flows[src_if][i % 2]
261             info = self.create_packet_info(src_if, dst_if)
262             payload = self.info_to_payload(info)
263             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
264                  IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
265                  UDP(sport=1234, dport=1234) /
266                  Raw(payload))
267             info.data = p.copy()
268             if isinstance(src_if, VppSubInterface):
269                 p = src_if.add_dot1_layer(p)
270             size = packet_sizes[(i // 2) % len(packet_sizes)]
271             self.extend_packet(p, size)
272             pkts.append(p)
273         return pkts
274
275     def verify_capture(self, dst_if, capture):
276         """Verify captured input packet stream for defined interface.
277
278         :param VppInterface dst_if: Interface to verify captured packet stream
279                                     for.
280         :param list capture: Captured packet stream.
281         """
282         self.logger.info("Verifying capture on interface %s" % dst_if.name)
283         last_info = dict()
284         for i in self.interfaces:
285             last_info[i.sw_if_index] = None
286         is_sub_if = False
287         dst_sw_if_index = dst_if.sw_if_index
288         if hasattr(dst_if, 'parent'):
289             is_sub_if = True
290         for packet in capture:
291             if is_sub_if:
292                 # Check VLAN tags and Ethernet header
293                 packet = dst_if.remove_dot1_layer(packet)
294             self.assertTrue(Dot1Q not in packet)
295             try:
296                 ip = packet[IPv6]
297                 udp = packet[UDP]
298                 payload_info = self.payload_to_info(str(packet[Raw]))
299                 packet_index = payload_info.index
300                 self.assertEqual(payload_info.dst, dst_sw_if_index)
301                 self.logger.debug(
302                     "Got packet on port %s: src=%u (id=%u)" %
303                     (dst_if.name, payload_info.src, packet_index))
304                 next_info = self.get_next_packet_info_for_interface2(
305                     payload_info.src, dst_sw_if_index,
306                     last_info[payload_info.src])
307                 last_info[payload_info.src] = next_info
308                 self.assertTrue(next_info is not None)
309                 self.assertEqual(packet_index, next_info.index)
310                 saved_packet = next_info.data
311                 # Check standard fields
312                 self.assertEqual(ip.src, saved_packet[IPv6].src)
313                 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
314                 self.assertEqual(udp.sport, saved_packet[UDP].sport)
315                 self.assertEqual(udp.dport, saved_packet[UDP].dport)
316             except:
317                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
318                 raise
319         for i in self.interfaces:
320             remaining_packet = self.get_next_packet_info_for_interface2(
321                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
322             self.assertTrue(remaining_packet is None,
323                             "Interface %s: Packet expected from interface %s "
324                             "didn't arrive" % (dst_if.name, i.name))
325
326     def test_fib(self):
327         """ IPv6 FIB test
328
329         Test scenario:
330             - Create IPv6 stream for pg0 interface
331             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
332             - Send and verify received packets on each interface.
333         """
334
335         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes)
336         self.pg0.add_stream(pkts)
337
338         for i in self.sub_interfaces:
339             pkts = self.create_stream(i, self.sub_if_packet_sizes)
340             i.parent.add_stream(pkts)
341
342         self.pg_enable_capture(self.pg_interfaces)
343         self.pg_start()
344
345         pkts = self.pg0.get_capture()
346         self.verify_capture(self.pg0, pkts)
347
348         for i in self.sub_interfaces:
349             pkts = i.parent.get_capture()
350             self.verify_capture(i, pkts)
351
    def test_ns(self):
        """ IPv6 Neighbour Solicitation Exceptions

        Test scenario:
           - Send an NS Sourced from an address not covered by the link sub-net
           - Send an NS to an mcast address the router has not joined
             (currently disabled in the code)
           - Send NS for a target address the router does not own.
           - Add a neighbor with no FIB entry and check no route exists.
           - Send NSs sourced from link-local addresses and check that an
             NA is returned and a neighbor, but no route, is learned.
        """

        #
        # An NS from a non link source address
        #
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        p = (Ether(dst=in6_getnsmac(nsma)) /
             IPv6(dst=d, src="2002::2") /
             ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
             ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
        pkts = [p]

        self.send_and_assert_no_replies(
            self.pg0, pkts,
            "No response to NS source by address not on sub-net")

        #
        # An NS sent to a solicited mcast group the router is
        # not a member of FAILS
        # NOTE(review): the scenario is switched off with "if 0" -
        # presumably because it fails; confirm before re-enabling.
        #
        if 0:
            nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
            d = inet_ntop(AF_INET6, nsma)

            p = (Ether(dst=in6_getnsmac(nsma)) /
                 IPv6(dst=d, src=self.pg0.remote_ip6) /
                 ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
                 ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
            pkts = [p]

            self.send_and_assert_no_replies(
                self.pg0, pkts,
                "No response to NS sent to unjoined mcast address")

        #
        # An NS whose target address is one the router does not own
        #
        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        p = (Ether(dst=in6_getnsmac(nsma)) /
             IPv6(dst=d, src=self.pg0.remote_ip6) /
             ICMPv6ND_NS(tgt="fd::ffff") /
             ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
        pkts = [p]

        self.send_and_assert_no_replies(self.pg0, pkts,
                                        "No response to NS for unknown target")

        #
        # A neighbor entry that has no associated FIB-entry
        #
        self.pg0.generate_remote_hosts(4)
        nd_entry = VppNeighbor(self,
                               self.pg0.sw_if_index,
                               self.pg0.remote_hosts[2].mac,
                               self.pg0.remote_hosts[2].ip6,
                               af=AF_INET6,
                               is_no_fib_entry=1)
        nd_entry.add_vpp_config()

        #
        # check we have the neighbor, but no route
        #
        self.assertTrue(find_nbr(self,
                                 self.pg0.sw_if_index,
                                 self.pg0._remote_hosts[2].ip6,
                                 inet=AF_INET6))
        self.assertFalse(find_route(self,
                                    self.pg0._remote_hosts[2].ip6,
                                    128,
                                    inet=AF_INET6))

        #
        # send an NS from a link local address to the interface's global
        # address (nsma and d still refer to pg0's local address here)
        #
        p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
             IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
             ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
             ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))

        self.send_and_expect_na(self.pg0, p,
                                "NS from link-local",
                                dst_ip=self.pg0._remote_hosts[2].ip6_ll,
                                tgt_ip=self.pg0.local_ip6)

        #
        # we should have learned an ND entry for the peer's link-local
        # but not inserted a route to it in the FIB
        #
        self.assertTrue(find_nbr(self,
                                 self.pg0.sw_if_index,
                                 self.pg0._remote_hosts[2].ip6_ll,
                                 inet=AF_INET6))
        self.assertFalse(find_route(self,
                                    self.pg0._remote_hosts[2].ip6_ll,
                                    128,
                                    inet=AF_INET6))

        #
        # An NS to the router's own Link-local
        #
        p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
             IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
             ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll) /
             ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))

        self.send_and_expect_na(self.pg0, p,
                                "NS to/from link-local",
                                dst_ip=self.pg0._remote_hosts[3].ip6_ll,
                                tgt_ip=self.pg0.local_ip6_ll)

        #
        # we should have learned an ND entry for the peer's link-local
        # but not inserted a route to it in the FIB
        #
        self.assertTrue(find_nbr(self,
                                 self.pg0.sw_if_index,
                                 self.pg0._remote_hosts[3].ip6_ll,
                                 inet=AF_INET6))
        self.assertFalse(find_route(self,
                                    self.pg0._remote_hosts[3].ip6_ll,
                                    128,
                                    inet=AF_INET6))
486
487     def test_ns_duplicates(self):
488         """ ND Duplicates"""
489
490         #
491         # Generate some hosts on the LAN
492         #
493         self.pg1.generate_remote_hosts(3)
494
495         #
496         # Add host 1 on pg1 and pg2
497         #
498         ns_pg1 = VppNeighbor(self,
499                              self.pg1.sw_if_index,
500                              self.pg1.remote_hosts[1].mac,
501                              self.pg1.remote_hosts[1].ip6,
502                              af=AF_INET6)
503         ns_pg1.add_vpp_config()
504         ns_pg2 = VppNeighbor(self,
505                              self.pg2.sw_if_index,
506                              self.pg2.remote_mac,
507                              self.pg1.remote_hosts[1].ip6,
508                              af=AF_INET6)
509         ns_pg2.add_vpp_config()
510
511         #
512         # IP packet destined for pg1 remote host arrives on pg1 again.
513         #
514         p = (Ether(dst=self.pg0.local_mac,
515                    src=self.pg0.remote_mac) /
516              IPv6(src=self.pg0.remote_ip6,
517                   dst=self.pg1.remote_hosts[1].ip6) /
518              UDP(sport=1234, dport=1234) /
519              Raw())
520
521         self.pg0.add_stream(p)
522         self.pg_enable_capture(self.pg_interfaces)
523         self.pg_start()
524
525         rx1 = self.pg1.get_capture(1)
526
527         self.verify_ip(rx1[0],
528                        self.pg1.local_mac,
529                        self.pg1.remote_hosts[1].mac,
530                        self.pg0.remote_ip6,
531                        self.pg1.remote_hosts[1].ip6)
532
533         #
534         # remove the duplicate on pg1
535         # packet stream shoud generate NSs out of pg1
536         #
537         ns_pg1.remove_vpp_config()
538
539         self.send_and_expect_ns(self.pg0, self.pg1,
540                                 p, self.pg1.remote_hosts[1].ip6)
541
542         #
543         # Add it back
544         #
545         ns_pg1.add_vpp_config()
546
547         self.pg0.add_stream(p)
548         self.pg_enable_capture(self.pg_interfaces)
549         self.pg_start()
550
551         rx1 = self.pg1.get_capture(1)
552
553         self.verify_ip(rx1[0],
554                        self.pg1.local_mac,
555                        self.pg1.remote_hosts[1].mac,
556                        self.pg0.remote_ip6,
557                        self.pg1.remote_hosts[1].ip6)
558
    def validate_ra(self, intf, rx, dst_ip=None, mtu=9000, pi_opt=None):
        """Assert that *rx* is a Router Advertisement with expected options.

        Overrides the parent helper of the same name and additionally
        checks the MTU option, the source link-layer address option and the
        prefix information option(s).

        :param intf: interface the packet was captured on.
        :param rx: the captured packet.
        :param dst_ip: expected IPv6 destination address; defaults to
                       ``intf.remote_ip6``.
        :param mtu: expected value of the MTU option.
        :param pi_opt: expected prefix information, either a single
                       ``ICMPv6NDOptPrefixInfo`` or a list of them; when
                       falsy the RA must carry no prefix information.
        """
        if not dst_ip:
            dst_ip = intf.remote_ip6

        # unicasted packets must come to the unicast mac
        self.assertEqual(rx[Ether].dst, intf.remote_mac)

        # and from the router's MAC
        self.assertEqual(rx[Ether].src, intf.local_mac)

        # the rx'd RA should be addressed to the sender's source
        self.assertTrue(rx.haslayer(ICMPv6ND_RA))
        self.assertEqual(in6_ptop(rx[IPv6].dst),
                         in6_ptop(dst_ip))

        # and come from the router's link local
        self.assertTrue(in6_islladdr(rx[IPv6].src))
        self.assertEqual(in6_ptop(rx[IPv6].src),
                         in6_ptop(mk_ll_addr(intf.local_mac)))

        # it should contain the link's MTU
        ra = rx[ICMPv6ND_RA]
        self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)

        # it should contain the source's link layer address option
        sll = ra[ICMPv6NDOptSrcLLAddr]
        self.assertEqual(sll.lladdr, intf.local_mac)

        if not pi_opt:
            # the RA should not contain prefix information
            self.assertFalse(ra.haslayer(ICMPv6NDOptPrefixInfo))
        else:
            raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)

            # The options are nested in the scapy packet in a way that is
            # hard to decode: this 1st layer of option always returns
            # nested classes, so a direct obj1 == obj2 comparison always
            # fails. However, getlayer(.., 2) does give one instance.
            # So we cheat here and construct a new opt instance holding
            # only the fields of interest for the comparison.
            rd = ICMPv6NDOptPrefixInfo(prefixlen=raos.prefixlen,
                                       prefix=raos.prefix,
                                       L=raos.L,
                                       A=raos.A)
            if type(pi_opt) is list:
                # NOTE(review): after the first iteration rd is the layer
                # returned by getlayer() itself, not a rebuilt option -
                # confirm this compares as intended for lists longer
                # than two.
                for ii in range(len(pi_opt)):
                    self.assertEqual(pi_opt[ii], rd)
                    rd = rx.getlayer(ICMPv6NDOptPrefixInfo, ii+2)
            else:
                # NOTE(review): compares against raos rather than the
                # rebuilt rd - presumably fine when the prefix option is
                # the last layer of the packet; confirm.
                self.assertEqual(pi_opt, raos)
608
609     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
610                            filter_out_fn=is_ipv6_misc,
611                            opt=None):
612         intf.add_stream(pkts)
613         self.pg_enable_capture(self.pg_interfaces)
614         self.pg_start()
615         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
616
617         self.assertEqual(len(rx), 1)
618         rx = rx[0]
619         self.validate_ra(intf, rx, dst_ip, pi_opt=opt)
620
621     def test_rs(self):
622         """ IPv6 Router Solicitation Exceptions
623
624         Test scenario:
625         """
626
627         #
628         # Before we begin change the IPv6 RA responses to use the unicast
629         # address - that way we will not confuse them with the periodic
630         # RAs which go to the mcast address
631         # Sit and wait for the first periodic RA.
632         #
633         # TODO
634         #
635         self.pg0.ip6_ra_config(send_unicast=1)
636
637         #
638         # An RS from a link source address
639         #  - expect an RA in return
640         #
641         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
642              IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
643              ICMPv6ND_RS())
644         pkts = [p]
645         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
646
647         #
648         # For the next RS sent the RA should be rate limited
649         #
650         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
651
652         #
653         # When we reconfiure the IPv6 RA config, we reset the RA rate limiting,
654         # so we need to do this before each test below so as not to drop
655         # packets for rate limiting reasons. Test this works here.
656         #
657         self.pg0.ip6_ra_config(send_unicast=1)
658         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
659
660         #
661         # An RS sent from a non-link local source
662         #
663         self.pg0.ip6_ra_config(send_unicast=1)
664         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
665              IPv6(dst=self.pg0.local_ip6, src="2002::ffff") /
666              ICMPv6ND_RS())
667         pkts = [p]
668         self.send_and_assert_no_replies(self.pg0, pkts,
669                                         "RS from non-link source")
670
671         #
672         # Source an RS from a link local address
673         #
674         self.pg0.ip6_ra_config(send_unicast=1)
675         ll = mk_ll_addr(self.pg0.remote_mac)
676         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
677              IPv6(dst=self.pg0.local_ip6, src=ll) /
678              ICMPv6ND_RS())
679         pkts = [p]
680         self.send_and_expect_ra(self.pg0, pkts,
681                                 "RS sourced from link-local",
682                                 dst_ip=ll)
683
684         #
685         # Send the RS multicast
686         #
687         self.pg0.ip6_ra_config(send_unicast=1)
688         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
689         ll = mk_ll_addr(self.pg0.remote_mac)
690         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
691              IPv6(dst="ff02::2", src=ll) /
692              ICMPv6ND_RS())
693         pkts = [p]
694         self.send_and_expect_ra(self.pg0, pkts,
695                                 "RS sourced from link-local",
696                                 dst_ip=ll)
697
698         #
699         # Source from the unspecified address ::. This happens when the RS
700         # is sent before the host has a configured address/sub-net,
701         # i.e. auto-config. Since the sender has no IP address, the reply
702         # comes back mcast - so the capture needs to not filter this.
703         # If we happen to pick up the periodic RA at this point then so be it,
704         # it's not an error.
705         #
706         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
707         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
708              IPv6(dst="ff02::2", src="::") /
709              ICMPv6ND_RS())
710         pkts = [p]
711         self.send_and_expect_ra(self.pg0, pkts,
712                                 "RS sourced from unspecified",
713                                 dst_ip="ff02::1",
714                                 filter_out_fn=None)
715
716         #
717         # Configure The RA to announce the links prefix
718         #
719         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
720                                self.pg0.local_ip6_prefix_len)
721
722         #
723         # RAs should now contain the prefix information option
724         #
725         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
726                                     prefix=self.pg0.local_ip6,
727                                     L=1,
728                                     A=1)
729
730         self.pg0.ip6_ra_config(send_unicast=1)
731         ll = mk_ll_addr(self.pg0.remote_mac)
732         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
733              IPv6(dst=self.pg0.local_ip6, src=ll) /
734              ICMPv6ND_RS())
735         self.send_and_expect_ra(self.pg0, p,
736                                 "RA with prefix-info",
737                                 dst_ip=ll,
738                                 opt=opt)
739
740         #
741         # Change the prefix info to not off-link
742         #  L-flag is clear
743         #
744         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
745                                self.pg0.local_ip6_prefix_len,
746                                off_link=1)
747
748         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
749                                     prefix=self.pg0.local_ip6,
750                                     L=0,
751                                     A=1)
752
753         self.pg0.ip6_ra_config(send_unicast=1)
754         self.send_and_expect_ra(self.pg0, p,
755                                 "RA with Prefix info with L-flag=0",
756                                 dst_ip=ll,
757                                 opt=opt)
758
759         #
760         # Change the prefix info to not off-link, no-autoconfig
761         #  L and A flag are clear in the advert
762         #
763         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
764                                self.pg0.local_ip6_prefix_len,
765                                off_link=1,
766                                no_autoconfig=1)
767
768         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
769                                     prefix=self.pg0.local_ip6,
770                                     L=0,
771                                     A=0)
772
773         self.pg0.ip6_ra_config(send_unicast=1)
774         self.send_and_expect_ra(self.pg0, p,
775                                 "RA with Prefix info with A & L-flag=0",
776                                 dst_ip=ll,
777                                 opt=opt)
778
779         #
780         # Change the flag settings back to the defaults
781         #  L and A flag are set in the advert
782         #
783         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
784                                self.pg0.local_ip6_prefix_len)
785
786         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
787                                     prefix=self.pg0.local_ip6,
788                                     L=1,
789                                     A=1)
790
791         self.pg0.ip6_ra_config(send_unicast=1)
792         self.send_and_expect_ra(self.pg0, p,
793                                 "RA with Prefix info",
794                                 dst_ip=ll,
795                                 opt=opt)
796
797         #
798         # Change the prefix info to not off-link, no-autoconfig
799         #  L and A flag are clear in the advert
800         #
801         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
802                                self.pg0.local_ip6_prefix_len,
803                                off_link=1,
804                                no_autoconfig=1)
805
806         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
807                                     prefix=self.pg0.local_ip6,
808                                     L=0,
809                                     A=0)
810
811         self.pg0.ip6_ra_config(send_unicast=1)
812         self.send_and_expect_ra(self.pg0, p,
813                                 "RA with Prefix info with A & L-flag=0",
814                                 dst_ip=ll,
815                                 opt=opt)
816
817         #
818         # Use the reset to defults option to revert to defaults
819         #  L and A flag are clear in the advert
820         #
821         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
822                                self.pg0.local_ip6_prefix_len,
823                                use_default=1)
824
825         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
826                                     prefix=self.pg0.local_ip6,
827                                     L=1,
828                                     A=1)
829
830         self.pg0.ip6_ra_config(send_unicast=1)
831         self.send_and_expect_ra(self.pg0, p,
832                                 "RA with Prefix reverted to defaults",
833                                 dst_ip=ll,
834                                 opt=opt)
835
836         #
837         # Advertise Another prefix. With no L-flag/A-flag
838         #
839         self.pg0.ip6_ra_prefix(self.pg1.local_ip6n,
840                                self.pg1.local_ip6_prefix_len,
841                                off_link=1,
842                                no_autoconfig=1)
843
844         opt = [ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
845                                      prefix=self.pg0.local_ip6,
846                                      L=1,
847                                      A=1),
848                ICMPv6NDOptPrefixInfo(prefixlen=self.pg1.local_ip6_prefix_len,
849                                      prefix=self.pg1.local_ip6,
850                                      L=0,
851                                      A=0)]
852
853         self.pg0.ip6_ra_config(send_unicast=1)
854         ll = mk_ll_addr(self.pg0.remote_mac)
855         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
856              IPv6(dst=self.pg0.local_ip6, src=ll) /
857              ICMPv6ND_RS())
858         self.send_and_expect_ra(self.pg0, p,
859                                 "RA with multiple Prefix infos",
860                                 dst_ip=ll,
861                                 opt=opt)
862
863         #
864         # Remove the first refix-info - expect the second is still in the
865         # advert
866         #
867         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
868                                self.pg0.local_ip6_prefix_len,
869                                is_no=1)
870
871         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg1.local_ip6_prefix_len,
872                                     prefix=self.pg1.local_ip6,
873                                     L=0,
874                                     A=0)
875
876         self.pg0.ip6_ra_config(send_unicast=1)
877         self.send_and_expect_ra(self.pg0, p,
878                                 "RA with Prefix reverted to defaults",
879                                 dst_ip=ll,
880                                 opt=opt)
881
882         #
883         # Remove the second prefix-info - expect no prefix-info i nthe adverts
884         #
885         self.pg0.ip6_ra_prefix(self.pg1.local_ip6n,
886                                self.pg1.local_ip6_prefix_len,
887                                is_no=1)
888
889         self.pg0.ip6_ra_config(send_unicast=1)
890         self.send_and_expect_ra(self.pg0, p,
891                                 "RA with Prefix reverted to defaults",
892                                 dst_ip=ll)
893
894         #
895         # Reset the periodic advertisements back to default values
896         #
897         self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
898
899
class TestIPv6RD(TestIPv6ND):
    """ IPv6 Router Discovery Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestIPv6RD, cls).setUpClass()

    def setUp(self):
        """Create two pg interfaces, bring them up and configure IPv6."""
        super(TestIPv6RD, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        self.interfaces = list(self.pg_interfaces)

        # setup all interfaces
        for i in self.interfaces:
            i.admin_up()
            i.config_ip6()

    def tearDown(self):
        """Undo the interface configuration applied by setUp()."""
        super(TestIPv6RD, self).tearDown()

        # setUp() configures the interfaces afresh for every test, so remove
        # the addresses and bring the links down again once a test is done
        for i in self.interfaces:
            i.unconfig_ip6()
            i.admin_down()

    def test_rd_send_router_solicitation(self):
        """ Verify router solicitation packets """

        count = 2
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # ask for 'count' solicitations to be sent out of pg1
        self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index,
                                                 mrc=count)
        rx_list = self.pg1.get_capture(count, timeout=3)
        self.assertEqual(len(rx_list), count)

        # every RS is expected to be addressed to ff02::2 (all-routers) and
        # sourced from pg1's link-local address; normalise once, not per
        # packet
        expected_dst = ip6_normalize("ff02::2")
        expected_src = ip6_normalize(self.pg1.local_ip6_ll)

        for packet in rx_list:
            self.assertTrue(packet.haslayer(IPv6))
            self.assertTrue(packet[IPv6].haslayer(ICMPv6ND_RS))
            self.assert_equal(ip6_normalize(packet[IPv6].dst), expected_dst)
            self.assert_equal(ip6_normalize(packet[IPv6].src), expected_src)
            # the RS must carry pg1's MAC in a source link-layer option
            self.assertTrue(packet[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr))
            self.assert_equal(packet[ICMPv6NDOptSrcLLAddr].lladdr,
                              self.pg1.local_mac)

    def verify_prefix_info(self, reported_prefix, prefix_option):
        """Check one prefix reported in an RA event against the option sent.

        :param reported_prefix: prefix entry taken from the received event.
        :param prefix_option: the ICMPv6NDOptPrefixInfo that was put in the
            RA packet.
        """
        prefix = socket.inet_pton(socket.AF_INET6,
                                  prefix_option.getfieldval("prefix"))
        self.assert_equal(reported_prefix.dst_address, prefix)
        self.assert_equal(reported_prefix.dst_address_length,
                          prefix_option.getfieldval("prefixlen"))
        # the event reports the flags as one byte: L in bit 7, A in bit 6
        L = prefix_option.getfieldval("L")
        A = prefix_option.getfieldval("A")
        option_flags = (L << 7) | (A << 6)
        self.assert_equal(reported_prefix.flags, option_flags)
        self.assert_equal(reported_prefix.valid_time,
                          prefix_option.getfieldval("validlifetime"))
        self.assert_equal(reported_prefix.preferred_time,
                          prefix_option.getfieldval("preferredlifetime"))

    def test_rd_receive_router_advertisement(self):
        """ Verify events triggered by received RA packets """

        self.vapi.want_ip6_ra_events()

        prefix_info_1 = ICMPv6NDOptPrefixInfo(
            prefix="1::2",
            prefixlen=50,
            validlifetime=200,
            preferredlifetime=500,
            L=1,
            A=1,
        )

        prefix_info_2 = ICMPv6NDOptPrefixInfo(
            prefix="7::4",
            prefixlen=20,
            validlifetime=70,
            preferredlifetime=1000,
            L=1,
            A=0,
        )

        # an RA with both prefix options, sent to pg1's link-local address
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=self.pg1.local_ip6_ll,
                  src=mk_ll_addr(self.pg1.remote_mac)) /
             ICMPv6ND_RA() /
             prefix_info_1 /
             prefix_info_2)
        self.pg1.add_stream([p])
        self.pg_start()

        ev = self.vapi.wait_for_event(10, "ip6_ra_event")

        # NOTE(review): these values presumably mirror the field defaults of
        # the ICMPv6ND_RA() header built above - confirm against scapy
        self.assert_equal(ev.current_hop_limit, 0)
        self.assert_equal(ev.flags, 8)
        self.assert_equal(ev.router_lifetime_in_sec, 1800)
        self.assert_equal(ev.neighbor_reachable_time_in_msec, 0)
        self.assert_equal(
            ev.time_in_msec_between_retransmitted_neighbor_solicitations, 0)

        self.assert_equal(ev.n_prefixes, 2)

        # prefixes must be reported in the order they appear in the packet
        self.verify_prefix_info(ev.prefixes[0], prefix_info_1)
        self.verify_prefix_info(ev.prefixes[1], prefix_info_2)
1006
1007
class IPv6NDProxyTest(TestIPv6ND):
    """ IPv6 ND ProxyTest Case """

    def setUp(self):
        """Configure pg0 with the proxied subnet; IPv6-enable pg1 and pg2."""
        super(IPv6NDProxyTest, self).setUp()

        # create 3 pg interfaces
        self.create_pg_interfaces(range(3))

        # pg0 is the master interface, with the configured subnet
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # pg1 and pg2 are only IPv6 enabled; no address is configured on them
        self.pg1.ip6_enable()
        self.pg2.ip6_enable()

    def tearDown(self):
        super(IPv6NDProxyTest, self).tearDown()

    def test_nd_proxy(self):
        """ IPv6 Proxy ND """

        #
        # Generate some hosts in the subnet that we are proxying
        #
        self.pg0.generate_remote_hosts(8)

        # the two hosts used throughout: host_a sits behind pg1 and
        # host_b behind pg2
        host_a = self.pg0._remote_hosts[2]
        host_b = self.pg0._remote_hosts[3]

        nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
        d = inet_ntop(AF_INET6, nsma)

        #
        # Send an NS for one of those remote hosts on one of the proxy links
        # expect no response since it's from an address that is not
        # on the link that has the prefix configured
        #
        ns_pg1 = (Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac) /
                  IPv6(dst=d, src=host_a.ip6) /
                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=host_a.mac))

        self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")

        #
        # Add proxy support for the host
        #
        self.vapi.ip6_nd_proxy(
            inet_pton(AF_INET6, host_a.ip6),
            self.pg1.sw_if_index)

        #
        # try that NS again. this time we expect an NA back
        #
        self.send_and_expect_na(self.pg1, ns_pg1,
                                "NS to proxy entry",
                                dst_ip=host_a.ip6,
                                tgt_ip=self.pg0.local_ip6)

        #
        # ... and that we have an entry in the ND cache
        #
        self.assertTrue(find_nbr(self,
                                 self.pg1.sw_if_index,
                                 host_a.ip6,
                                 inet=AF_INET6))

        #
        # ... and we can route traffic to it
        #
        t = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(dst=host_a.ip6,
                  src=self.pg0.remote_ip6) /
             UDP(sport=10000, dport=20000) /
             Raw('\xa5' * 100))

        self.pg0.add_stream(t)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(1)
        rx = rx[0]

        # forwarded out of pg1 using the MAC learned from the NS option
        self.assertEqual(rx[Ether].dst, host_a.mac)
        self.assertEqual(rx[Ether].src, self.pg1.local_mac)

        self.assertEqual(rx[IPv6].src, t[IPv6].src)
        self.assertEqual(rx[IPv6].dst, t[IPv6].dst)

        #
        # Test we proxy for the host on the main interface
        #
        ns_pg0 = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
                  IPv6(dst=d, src=self.pg0.remote_ip6) /
                  ICMPv6ND_NS(tgt=host_a.ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))

        self.send_and_expect_na(self.pg0, ns_pg0,
                                "NS to proxy entry on main",
                                tgt_ip=host_a.ip6,
                                dst_ip=self.pg0.remote_ip6)

        #
        # Setup and resolve proxy for another host on another interface
        #
        # the NS comes from host_b, so it must advertise host_b's own MAC
        # in the source link-layer address option
        ns_pg2 = (Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac) /
                  IPv6(dst=d, src=host_b.ip6) /
                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
                  ICMPv6NDOptSrcLLAddr(lladdr=host_b.mac))

        self.vapi.ip6_nd_proxy(
            inet_pton(AF_INET6, host_b.ip6),
            self.pg2.sw_if_index)

        self.send_and_expect_na(self.pg2, ns_pg2,
                                "NS to proxy entry other interface",
                                dst_ip=host_b.ip6,
                                tgt_ip=self.pg0.local_ip6)

        self.assertTrue(find_nbr(self,
                                 self.pg2.sw_if_index,
                                 host_b.ip6,
                                 inet=AF_INET6))

        #
        # hosts can communicate. pg2->pg1
        #
        t2 = (Ether(dst=self.pg2.local_mac,
                    src=host_b.mac) /
              IPv6(dst=host_a.ip6,
                   src=host_b.ip6) /
              UDP(sport=10000, dport=20000) /
              Raw('\xa5' * 100))

        self.pg2.add_stream(t2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(1)
        rx = rx[0]

        self.assertEqual(rx[Ether].dst, host_a.mac)
        self.assertEqual(rx[Ether].src, self.pg1.local_mac)

        self.assertEqual(rx[IPv6].src, t2[IPv6].src)
        self.assertEqual(rx[IPv6].dst, t2[IPv6].dst)

        #
        # remove the proxy configs
        #
        self.vapi.ip6_nd_proxy(
            inet_pton(AF_INET6, host_a.ip6),
            self.pg1.sw_if_index,
            is_del=1)
        self.vapi.ip6_nd_proxy(
            inet_pton(AF_INET6, host_b.ip6),
            self.pg2.sw_if_index,
            is_del=1)

        # the neighbour entries must be gone along with the proxy config
        self.assertFalse(find_nbr(self,
                                  self.pg2.sw_if_index,
                                  host_b.ip6,
                                  inet=AF_INET6))
        self.assertFalse(find_nbr(self,
                                  self.pg1.sw_if_index,
                                  host_a.ip6,
                                  inet=AF_INET6))

        #
        # no longer proxy-ing...
        #
        self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
        self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
        self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")

        #
        # no longer forwarding. traffic generates NS out of the glean/main
        # interface
        #
        self.pg2.add_stream(t2)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)

        self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1191
1192
class TestIPNull(VppTestCase):
    """ IPv6 routes via NULL """

    def setUp(self):
        """Bring up a single pg interface with a resolved IPv6 address."""
        super(TestIPNull, self).setUp()

        # one pg interface is all this test uses
        self.create_pg_interfaces(range(1))

        for intf in self.pg_interfaces:
            intf.admin_up()
            intf.config_ip6()
            intf.resolve_ndp()

    def tearDown(self):
        """Remove the IPv6 configuration and take the interfaces down."""
        super(TestIPNull, self).tearDown()
        for intf in self.pg_interfaces:
            intf.unconfig_ip6()
            intf.admin_down()

    def _send_and_get_unreach(self, pkt):
        """Send pkt on pg0 and return the ICMPv6 destination-unreachable
        layer of the single packet captured back on pg0."""
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg0.get_capture(1)
        return captured[0][ICMPv6DestUnreach]

    def test_ip_null(self):
        """ IP NULL route """

        # the same probe packet is used against both routes
        probe = (Ether(src=self.pg0.remote_mac,
                       dst=self.pg0.local_mac) /
                 IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
                 UDP(sport=1234, dport=1234) /
                 Raw('\xa5' * 100))

        #
        # First a path-less /64 flagged 'unreach': the probe must be
        # answered with an ICMP unreachable
        #
        unreach_route = VppIpRoute(self, "2001::", 64, [],
                                   is_unreach=1, is_ip6=1)
        unreach_route.add_vpp_config()

        reply = self._send_and_get_unreach(probe)

        # 0 = "No route to destination"
        self.assertEqual(reply.code, 0)

        # ICMP is rate limited. pause a bit
        self.sleep(1)

        #
        # Then a more specific /128 flagged 'prohibit': the probe must now
        # be answered with an ICMP prohibited
        #
        prohibit_route = VppIpRoute(self, "2001::1", 128, [],
                                    is_prohibit=1, is_ip6=1)
        prohibit_route.add_vpp_config()

        reply = self._send_and_get_unreach(probe)

        # 1 = "Communication with destination administratively prohibited"
        self.assertEqual(reply.code, 1)
1259
1260
class TestIPDisabled(VppTestCase):
    """ IPv6 disabled """

    def setUp(self):
        """Create two pg interfaces; only pg0 gets an IPv6 address."""
        super(TestIPDisabled, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        # PG0 is IP enabled
        self.pg0.admin_up()
        self.pg0.config_ip6()
        self.pg0.resolve_ndp()

        # PG 1 is not IP enabled
        self.pg1.admin_up()

    def tearDown(self):
        """Remove the IPv6 configuration and take the interfaces down."""
        super(TestIPDisabled, self).tearDown()
        for i in self.pg_interfaces:
            # only IPv6 addresses are ever configured by this test case, so
            # it is the IPv6 configuration that has to be removed
            i.unconfig_ip6()
            i.admin_down()

    def test_ip_disabled(self):
        """ IP Disabled """

        #
        # A multicast route for group ffef::1.
        # one accepting interface, pg1, one forwarding interface, pg0
        #
        route_ff_01 = VppIpMRoute(
            self,
            "::",
            "ffef::1", 128,
            MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
            [VppMRoutePath(self.pg1.sw_if_index,
                           MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT),
             VppMRoutePath(self.pg0.sw_if_index,
                           MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)],
            is_ip6=1)
        route_ff_01.add_vpp_config()

        # a unicast (pu) and a multicast (pm) packet, both arriving on pg1
        pu = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
              UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100))
        pm = (Ether(src=self.pg1.remote_mac,
                    dst=self.pg1.local_mac) /
              IPv6(src="2001::1", dst="ffef::1") /
              UDP(sport=1234, dport=1234) /
              Raw('\xa5' * 100))

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")

        #
        # IP enable PG1
        #
        self.pg1.config_ip6()

        #
        # Now we get packets through
        #
        self.pg1.add_stream(pu)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # get_capture() itself fails the test unless exactly one packet
        # shows up on pg0; the packet contents are not inspected
        self.pg0.get_capture(1)

        self.pg1.add_stream(pm)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(1)

        #
        # Disable PG1
        #
        self.pg1.unconfig_ip6()

        #
        # PG1 does not forward IP traffic
        #
        self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
        self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1348
1349
1350 class TestIP6LoadBalance(VppTestCase):
1351     """ IPv6 Load-Balancing """
1352
1353     def setUp(self):
1354         super(TestIP6LoadBalance, self).setUp()
1355
1356         self.create_pg_interfaces(range(5))
1357
1358         mpls_tbl = VppMplsTable(self, 0)
1359         mpls_tbl.add_vpp_config()
1360
1361         for i in self.pg_interfaces:
1362             i.admin_up()
1363             i.config_ip6()
1364             i.resolve_ndp()
1365             i.enable_mpls()
1366
1367     def tearDown(self):
1368         for i in self.pg_interfaces:
1369             i.unconfig_ip6()
1370             i.admin_down()
1371             i.disable_mpls()
1372         super(TestIP6LoadBalance, self).tearDown()
1373
1374     def send_and_expect_load_balancing(self, input, pkts, outputs):
1375         self.vapi.cli("clear trace")
1376         input.add_stream(pkts)
1377         self.pg_enable_capture(self.pg_interfaces)
1378         self.pg_start()
1379         for oo in outputs:
1380             rx = oo._get_capture(1)
1381             self.assertNotEqual(0, len(rx))
1382
1383     def send_and_expect_one_itf(self, input, pkts, itf):
1384         self.vapi.cli("clear trace")
1385         input.add_stream(pkts)
1386         self.pg_enable_capture(self.pg_interfaces)
1387         self.pg_start()
1388         rx = itf.get_capture(len(pkts))
1389
1390     def test_ip6_load_balance(self):
1391         """ IPv6 Load-Balancing """
1392
1393         #
1394         # An array of packets that differ only in the destination port
1395         #  - IP only
1396         #  - MPLS EOS
1397         #  - MPLS non-EOS
1398         #  - MPLS non-EOS with an entropy label
1399         #
1400         port_ip_pkts = []
1401         port_mpls_pkts = []
1402         port_mpls_neos_pkts = []
1403         port_ent_pkts = []
1404
1405         #
1406         # An array of packets that differ only in the source address
1407         #
1408         src_ip_pkts = []
1409         src_mpls_pkts = []
1410
1411         for ii in range(65):
1412             port_ip_hdr = (IPv6(dst="3000::1", src="3000:1::1") /
1413                            UDP(sport=1234, dport=1234 + ii) /
1414                            Raw('\xa5' * 100))
1415             port_ip_pkts.append((Ether(src=self.pg0.remote_mac,
1416                                        dst=self.pg0.local_mac) /
1417                                  port_ip_hdr))
1418             port_mpls_pkts.append((Ether(src=self.pg0.remote_mac,
1419                                          dst=self.pg0.local_mac) /
1420                                    MPLS(label=66, ttl=2) /
1421                                    port_ip_hdr))
1422             port_mpls_neos_pkts.append((Ether(src=self.pg0.remote_mac,
1423                                               dst=self.pg0.local_mac) /
1424                                         MPLS(label=67, ttl=2) /
1425                                         MPLS(label=77, ttl=2) /
1426                                         port_ip_hdr))
1427             port_ent_pkts.append((Ether(src=self.pg0.remote_mac,
1428                                         dst=self.pg0.local_mac) /
1429                                   MPLS(label=67, ttl=2) /
1430                                   MPLS(label=14, ttl=2) /
1431                                   MPLS(label=999, ttl=2) /
1432                                   port_ip_hdr))
1433             src_ip_hdr = (IPv6(dst="3000::1", src="3000:1::%d" % ii) /
1434                           UDP(sport=1234, dport=1234) /
1435                           Raw('\xa5' * 100))
1436             src_ip_pkts.append((Ether(src=self.pg0.remote_mac,
1437                                       dst=self.pg0.local_mac) /
1438                                 src_ip_hdr))
1439             src_mpls_pkts.append((Ether(src=self.pg0.remote_mac,
1440                                         dst=self.pg0.local_mac) /
1441                                   MPLS(label=66, ttl=2) /
1442                                   src_ip_hdr))
1443
1444         #
1445         # A route for the IP pacekts
1446         #
1447         route_3000_1 = VppIpRoute(self, "3000::1", 128,
1448                                   [VppRoutePath(self.pg1.remote_ip6,
1449                                                 self.pg1.sw_if_index,
1450                                                 proto=DpoProto.DPO_PROTO_IP6),
1451                                    VppRoutePath(self.pg2.remote_ip6,
1452                                                 self.pg2.sw_if_index,
1453                                                 proto=DpoProto.DPO_PROTO_IP6)],
1454                                   is_ip6=1)
1455         route_3000_1.add_vpp_config()
1456
1457         #
1458         # a local-label for the EOS packets
1459         #
1460         binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
1461         binding.add_vpp_config()
1462
1463         #
1464         # An MPLS route for the non-EOS packets
1465         #
1466         route_67 = VppMplsRoute(self, 67, 0,
1467                                 [VppRoutePath(self.pg1.remote_ip6,
1468                                               self.pg1.sw_if_index,
1469                                               labels=[67],
1470                                               proto=DpoProto.DPO_PROTO_IP6),
1471                                  VppRoutePath(self.pg2.remote_ip6,
1472                                               self.pg2.sw_if_index,
1473                                               labels=[67],
1474                                               proto=DpoProto.DPO_PROTO_IP6)])
1475         route_67.add_vpp_config()
1476
1477         #
1478         # inject the packet on pg0 - expect load-balancing across the 2 paths
1479         #  - since the default hash config is to use IP src,dst and port
1480         #    src,dst
1481         # We are not going to ensure equal amounts of packets across each link,
1482         # since the hash algorithm is statistical and therefore this can never
1483         # be guaranteed. But wuth 64 different packets we do expect some
1484         # balancing. So instead just ensure there is traffic on each link.
1485         #
1486         self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
1487                                             [self.pg1, self.pg2])
1488         self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
1489                                             [self.pg1, self.pg2])
1490         self.send_and_expect_load_balancing(self.pg0, port_mpls_pkts,
1491                                             [self.pg1, self.pg2])
1492         self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
1493                                             [self.pg1, self.pg2])
1494         self.send_and_expect_load_balancing(self.pg0, port_mpls_neos_pkts,
1495                                             [self.pg1, self.pg2])
1496
1497         #
1498         # The packets with Entropy label in should not load-balance,
1499         # since the Entorpy value is fixed.
1500         #
1501         self.send_and_expect_one_itf(self.pg0, port_ent_pkts, self.pg1)
1502
1503         #
1504         # change the flow hash config so it's only IP src,dst
1505         #  - now only the stream with differing source address will
1506         #    load-balance
1507         #
1508         self.vapi.set_ip_flow_hash(0, is_ip6=1, src=1, dst=1, sport=0, dport=0)
1509
1510         self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
1511                                             [self.pg1, self.pg2])
1512         self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
1513                                             [self.pg1, self.pg2])
1514         self.send_and_expect_one_itf(self.pg0, port_ip_pkts, self.pg2)
1515
1516         #
1517         # change the flow hash config back to defaults
1518         #
1519         self.vapi.set_ip_flow_hash(0, is_ip6=1, src=1, dst=1, sport=1, dport=1)
1520
1521         #
1522         # Recursive prefixes
1523         #  - testing that 2 stages of load-balancing occurs and there is no
1524         #    polarisation (i.e. only 2 of 4 paths are used)
1525         #
1526         port_pkts = []
1527         src_pkts = []
1528
1529         for ii in range(257):
1530             port_pkts.append((Ether(src=self.pg0.remote_mac,
1531                                     dst=self.pg0.local_mac) /
1532                               IPv6(dst="4000::1", src="4000:1::1") /
1533                               UDP(sport=1234, dport=1234 + ii) /
1534                               Raw('\xa5' * 100)))
1535             src_pkts.append((Ether(src=self.pg0.remote_mac,
1536                                    dst=self.pg0.local_mac) /
1537                              IPv6(dst="4000::1", src="4000:1::%d" % ii) /
1538                              UDP(sport=1234, dport=1234) /
1539                              Raw('\xa5' * 100)))
1540
1541         route_3000_2 = VppIpRoute(self, "3000::2", 128,
1542                                   [VppRoutePath(self.pg3.remote_ip6,
1543                                                 self.pg3.sw_if_index,
1544                                                 proto=DpoProto.DPO_PROTO_IP6),
1545                                    VppRoutePath(self.pg4.remote_ip6,
1546                                                 self.pg4.sw_if_index,
1547                                                 proto=DpoProto.DPO_PROTO_IP6)],
1548                                   is_ip6=1)
1549         route_3000_2.add_vpp_config()
1550
1551         route_4000_1 = VppIpRoute(self, "4000::1", 128,
1552                                   [VppRoutePath("3000::1",
1553                                                 0xffffffff,
1554                                                 proto=DpoProto.DPO_PROTO_IP6),
1555                                    VppRoutePath("3000::2",
1556                                                 0xffffffff,
1557                                                 proto=DpoProto.DPO_PROTO_IP6)],
1558                                   is_ip6=1)
1559         route_4000_1.add_vpp_config()
1560
1561         #
1562         # inject the packet on pg0 - expect load-balancing across all 4 paths
1563         #
1564         self.vapi.cli("clear trace")
1565         self.send_and_expect_load_balancing(self.pg0, port_pkts,
1566                                             [self.pg1, self.pg2,
1567                                              self.pg3, self.pg4])
1568         self.send_and_expect_load_balancing(self.pg0, src_pkts,
1569                                             [self.pg1, self.pg2,
1570                                              self.pg3, self.pg4])
1571
1572         #
1573         # Recursive prefixes
1574         #  - testing 2 stages of load-balancing with no choice of path
1575         #
1576         port_pkts = []
1577
1578         for ii in range(257):
1579             port_pkts.append((Ether(src=self.pg0.remote_mac,
1580                                     dst=self.pg0.local_mac) /
1581                               IPv6(dst="6000::1", src="6000:1::1") /
1582                               UDP(sport=1234, dport=1234 + ii) /
1583                               Raw('\xa5' * 100)))
1584
1585         route_5000_2 = VppIpRoute(self, "5000::2", 128,
1586                                   [VppRoutePath(self.pg3.remote_ip6,
1587                                                 self.pg3.sw_if_index,
1588                                                 proto=DpoProto.DPO_PROTO_IP6)],
1589                                   is_ip6=1)
1590         route_5000_2.add_vpp_config()
1591
1592         route_6000_1 = VppIpRoute(self, "6000::1", 128,
1593                                   [VppRoutePath("5000::2",
1594                                                 0xffffffff,
1595                                                 proto=DpoProto.DPO_PROTO_IP6)],
1596                                   is_ip6=1)
1597         route_6000_1.add_vpp_config()
1598
1599         #
1600         # inject the packet on pg0 - expect all of them to egress on pg3 only
1601         #
1602         self.vapi.cli("clear trace")
1603         self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg3)
1604
1605
class TestIP6Punt(VppTestCase):
    """ IPv6 Punt Police/Redirect """

    def setUp(self):
        super(TestIP6Punt, self).setUp()

        # pg0 is used as the ingress interface and pg1 as the target of
        # the punt redirect in test_ip_punt
        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        super(TestIP6Punt, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()

    def test_ip_punt(self):
        """ IP6 punt police and redirect """

        # TCP packet from pg0's peer addressed to pg0's own IPv6 address
        p = (Ether(src=self.pg0.remote_mac,
                   dst=self.pg0.local_mac) /
             IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
             TCP(sport=1234, dport=1234) /
             Raw('\xa5' * 100))

        pkts = p * 1025

        #
        # Configure a punt redirect via pg1.
        #
        nh_addr = inet_pton(AF_INET6,
                            self.pg1.remote_ip6)
        self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
                                   self.pg1.sw_if_index,
                                   nh_addr,
                                   is_ip6=1)

        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # add a policer
        #
        # The identical arguments are needed again to delete the policer,
        # so they are defined once here.
        policer_args = ("ip6-punt", 400, 0, 10, 0)
        policer = self.vapi.policer_add_del(*policer_args, rate_type=1)
        self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)

        self.vapi.cli("clear trace")
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        #
        # the number of packets received should be greater than 0,
        # but not equal to the number sent, since some were policed
        #
        # NOTE(review): _get_capture is used instead of a fixed-count
        # capture because the number of packets that survive policing is
        # not known in advance; the argument is presumably a timeout in
        # seconds - confirm against VppPGInterface.
        rx = self.pg1._get_capture(1)
        # assertGreater/assertLess report the actual counts on failure
        self.assertGreater(len(rx), 0)
        self.assertLess(len(rx), len(pkts))

        #
        # remove the policer. back to full rx
        #
        self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
        self.vapi.policer_add_del(*policer_args, rate_type=1, is_add=0)
        self.send_and_expect(self.pg0, pkts, self.pg1)

        #
        # remove the redirect. expect full drop.
        #
        self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
                                   self.pg1.sw_if_index,
                                   nh_addr,
                                   is_add=0,
                                   is_ip6=1)
        self.send_and_assert_no_replies(self.pg0, pkts,
                                        "IP no punt config")

        #
        # Add a redirect that is not input port selective
        #
        self.vapi.ip_punt_redirect(0xffffffff,
                                   self.pg1.sw_if_index,
                                   nh_addr,
                                   is_ip6=1)
        self.send_and_expect(self.pg0, pkts, self.pg1)

        # clean up the non-selective redirect again
        self.vapi.ip_punt_redirect(0xffffffff,
                                   self.pg1.sw_if_index,
                                   nh_addr,
                                   is_add=0,
                                   is_ip6=1)
1701
1702
class TestIP6Input(VppTestCase):
    """ IPv6 Input Exceptions """

    def setUp(self):
        super(TestIP6Input, self).setUp()

        # two packet-generator interfaces, both up with IPv6 configured
        self.create_pg_interfaces(range(2))

        for pg_if in self.pg_interfaces:
            pg_if.admin_up()
            pg_if.config_ip6()
            pg_if.resolve_ndp()

    def tearDown(self):
        super(TestIP6Input, self).tearDown()
        for pg_if in self.pg_interfaces:
            pg_if.unconfig_ip6()
            pg_if.admin_down()

    def test_ip_input(self):
        """ IP6 Input Exceptions """

        def make_pkt(**ip6_overrides):
            # UDP packet entering on pg0 and destined to pg1's peer; the
            # keyword arguments set additional IPv6 header fields.
            eth = Ether(src=self.pg0.remote_mac,
                        dst=self.pg0.local_mac)
            ip6 = IPv6(src=self.pg0.remote_ip6,
                       dst=self.pg1.remote_ip6,
                       **ip6_overrides)
            return (eth / ip6 /
                    UDP(sport=1234, dport=1234) /
                    Raw('\xa5' * 100))

        #
        # bad version - this is dropped
        #
        bad_version_pkt = make_pkt(version=3)

        self.send_and_assert_no_replies(self.pg0, bad_version_pkt * 65,
                                        "funky version")

        #
        # hop limit - ICMP replies
        #
        hop_limit_pkt = make_pkt(hlim=1)

        replies = self.send_and_expect(self.pg0, hop_limit_pkt * 65,
                                       self.pg0)

        # only the first reply is inspected
        time_exceeded = replies[0][ICMPv6TimeExceeded]
        self.assertEqual(time_exceeded.type, 3)
        # 0: "hop limit exceeded in transit",
        self.assertEqual(time_exceeded.code, 0)
1756
1757
# When executed directly, run this module's tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)