VPP-1507: Added binary api to dump configured ip_punt_redirect
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5
6 from framework import VppTestCase, VppTestRunner
7 from util import ppp, ip6_normalize
8 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
9 from vpp_pg_interface import is_ipv6_misc
10 from vpp_ip import DpoProto
11 from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
12     VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
13     VppMplsRoute, VppMplsTable, VppIpTable, VppIpAddress
14 from vpp_neighbor import find_nbr, VppNeighbor
15
16 from scapy.packet import Raw
17 from scapy.layers.l2 import Ether, Dot1Q
18 from scapy.layers.inet6 import IPv6, UDP, TCP, ICMPv6ND_NS, ICMPv6ND_RS, \
19     ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
20     ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
21     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
22     ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply
23 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
24     in6_mactoifaceid, in6_ismaddr
25 from scapy.utils import inet_pton, inet_ntop
26 from scapy.contrib.mpls import MPLS
27
28
29 AF_INET6 = socket.AF_INET6
30
31
32 def mk_ll_addr(mac):
33     euid = in6_mactoifaceid(mac)
34     addr = "fe80::" + euid
35     return addr
36
37
38 class TestIPv6ND(VppTestCase):
39     def validate_ra(self, intf, rx, dst_ip=None):
40         if not dst_ip:
41             dst_ip = intf.remote_ip6
42
43         # unicasted packets must come to the unicast mac
44         self.assertEqual(rx[Ether].dst, intf.remote_mac)
45
46         # and from the router's MAC
47         self.assertEqual(rx[Ether].src, intf.local_mac)
48
49         # the rx'd RA should be addressed to the sender's source
50         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
51         self.assertEqual(in6_ptop(rx[IPv6].dst),
52                          in6_ptop(dst_ip))
53
54         # and come from the router's link local
55         self.assertTrue(in6_islladdr(rx[IPv6].src))
56         self.assertEqual(in6_ptop(rx[IPv6].src),
57                          in6_ptop(mk_ll_addr(intf.local_mac)))
58
59     def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
60         if not dst_ip:
61             dst_ip = intf.remote_ip6
62         if not tgt_ip:
63             dst_ip = intf.local_ip6
64
65         # unicasted packets must come to the unicast mac
66         self.assertEqual(rx[Ether].dst, intf.remote_mac)
67
68         # and from the router's MAC
69         self.assertEqual(rx[Ether].src, intf.local_mac)
70
71         # the rx'd NA should be addressed to the sender's source
72         self.assertTrue(rx.haslayer(ICMPv6ND_NA))
73         self.assertEqual(in6_ptop(rx[IPv6].dst),
74                          in6_ptop(dst_ip))
75
76         # and come from the target address
77         self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))
78
79         # Dest link-layer options should have the router's MAC
80         dll = rx[ICMPv6NDOptDstLLAddr]
81         self.assertEqual(dll.lladdr, intf.local_mac)
82
83     def validate_ns(self, intf, rx, tgt_ip):
84         nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
85         dst_ip = inet_ntop(AF_INET6, nsma)
86
87         # NS is broadcast
88         self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))
89
90         # and from the router's MAC
91         self.assertEqual(rx[Ether].src, intf.local_mac)
92
93         # the rx'd NS should be addressed to an mcast address
94         # derived from the target address
95         self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
96
97         # expect the tgt IP in the NS header
98         ns = rx[ICMPv6ND_NS]
99         self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))
100
101         # packet is from the router's local address
102         self.assertEqual(in6_ptop(rx[IPv6].src), intf.local_ip6)
103
104         # Src link-layer options should have the router's MAC
105         sll = rx[ICMPv6NDOptSrcLLAddr]
106         self.assertEqual(sll.lladdr, intf.local_mac)
107
108     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
109                            filter_out_fn=is_ipv6_misc):
110         intf.add_stream(pkts)
111         self.pg_enable_capture(self.pg_interfaces)
112         self.pg_start()
113         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
114
115         self.assertEqual(len(rx), 1)
116         rx = rx[0]
117         self.validate_ra(intf, rx, dst_ip)
118
119     def send_and_expect_na(self, intf, pkts, remark, dst_ip=None,
120                            tgt_ip=None,
121                            filter_out_fn=is_ipv6_misc):
122         intf.add_stream(pkts)
123         self.pg_enable_capture(self.pg_interfaces)
124         self.pg_start()
125         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
126
127         self.assertEqual(len(rx), 1)
128         rx = rx[0]
129         self.validate_na(intf, rx, dst_ip, tgt_ip)
130
131     def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
132                            filter_out_fn=is_ipv6_misc):
133         tx_intf.add_stream(pkts)
134         self.pg_enable_capture(self.pg_interfaces)
135         self.pg_start()
136         rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)
137
138         self.assertEqual(len(rx), 1)
139         rx = rx[0]
140         self.validate_ns(rx_intf, rx, tgt_ip)
141
142     def verify_ip(self, rx, smac, dmac, sip, dip):
143         ether = rx[Ether]
144         self.assertEqual(ether.dst, dmac)
145         self.assertEqual(ether.src, smac)
146
147         ip = rx[IPv6]
148         self.assertEqual(ip.src, sip)
149         self.assertEqual(ip.dst, dip)
150
151
152 class TestIPv6(TestIPv6ND):
153     """ IPv6 Test Case """
154
155     @classmethod
156     def setUpClass(cls):
157         super(TestIPv6, cls).setUpClass()
158
159     def setUp(self):
160         """
161         Perform test setup before test case.
162
163         **Config:**
164             - create 3 pg interfaces
165                 - untagged pg0 interface
166                 - Dot1Q subinterface on pg1
167                 - Dot1AD subinterface on pg2
168             - setup interfaces:
169                 - put it into UP state
170                 - set IPv6 addresses
171                 - resolve neighbor address using NDP
172             - configure 200 fib entries
173
174         :ivar list interfaces: pg interfaces and subinterfaces.
175         :ivar dict flows: IPv4 packet flows in test.
176
177         *TODO:* Create AD sub interface
178         """
179         super(TestIPv6, self).setUp()
180
181         # create 3 pg interfaces
182         self.create_pg_interfaces(range(3))
183
184         # create 2 subinterfaces for p1 and pg2
185         self.sub_interfaces = [
186             VppDot1QSubint(self, self.pg1, 100),
187             VppDot1QSubint(self, self.pg2, 200)
188             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
189         ]
190
191         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
192         self.flows = dict()
193         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
194         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
195         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
196
197         # packet sizes
198         self.pg_if_packet_sizes = [64, 1500, 9020]
199
200         self.interfaces = list(self.pg_interfaces)
201         self.interfaces.extend(self.sub_interfaces)
202
203         # setup all interfaces
204         for i in self.interfaces:
205             i.admin_up()
206             i.config_ip6()
207             i.resolve_ndp()
208
209         # config 2M FIB entries
210         self.config_fib_entries(200)
211
212     def tearDown(self):
213         """Run standard test teardown and log ``show ip6 neighbors``."""
214         for i in self.interfaces:
215             i.unconfig_ip6()
216             i.ip6_disable()
217             i.admin_down()
218         for i in self.sub_interfaces:
219             i.remove_vpp_config()
220
221         super(TestIPv6, self).tearDown()
222         if not self.vpp_dead:
223             self.logger.info(self.vapi.cli("show ip6 neighbors"))
224             # info(self.vapi.cli("show ip6 fib"))  # many entries
225
226     def config_fib_entries(self, count):
227         """For each interface add to the FIB table *count* routes to
228         "fd02::1/128" destination with interface's local address as next-hop
229         address.
230
231         :param int count: Number of FIB entries.
232
233         - *TODO:* check if the next-hop address shouldn't be remote address
234           instead of local address.
235         """
236         n_int = len(self.interfaces)
237         percent = 0
238         counter = 0.0
239         dest_addr = inet_pton(AF_INET6, "fd02::1")
240         dest_addr_len = 128
241         for i in self.interfaces:
242             next_hop_address = i.local_ip6n
243             for j in range(count / n_int):
244                 self.vapi.ip_add_del_route(
245                     dest_addr, dest_addr_len, next_hop_address, is_ipv6=1)
246                 counter += 1
247                 if counter / count * 100 > percent:
248                     self.logger.info("Configure %d FIB entries .. %d%% done" %
249                                      (count, percent))
250                     percent += 1
251
252     def modify_packet(self, src_if, packet_size, pkt):
253         """Add load, set destination IP and extend packet to required packet
254         size for defined interface.
255
256         :param VppInterface src_if: Interface to create packet for.
257         :param int packet_size: Required packet size.
258         :param Scapy pkt: Packet to be modified.
259         """
260         dst_if_idx = packet_size / 10 % 2
261         dst_if = self.flows[src_if][dst_if_idx]
262         info = self.create_packet_info(src_if, dst_if)
263         payload = self.info_to_payload(info)
264         p = pkt/Raw(payload)
265         p[IPv6].dst = dst_if.remote_ip6
266         info.data = p.copy()
267         if isinstance(src_if, VppSubInterface):
268             p = src_if.add_dot1_layer(p)
269         self.extend_packet(p, packet_size)
270
271         return p
272
273     def create_stream(self, src_if):
274         """Create input packet stream for defined interface.
275
276         :param VppInterface src_if: Interface to create packet stream for.
277         """
278         hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
279         pkt_tmpl = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
280                     IPv6(src=src_if.remote_ip6) /
281                     UDP(sport=1234, dport=1234))
282
283         pkts = [self.modify_packet(src_if, i, pkt_tmpl)
284                 for i in xrange(self.pg_if_packet_sizes[0],
285                                 self.pg_if_packet_sizes[1], 10)]
286         pkts_b = [self.modify_packet(src_if, i, pkt_tmpl)
287                   for i in xrange(self.pg_if_packet_sizes[1] + hdr_ext,
288                                   self.pg_if_packet_sizes[2] + hdr_ext, 50)]
289         pkts.extend(pkts_b)
290
291         return pkts
292
293     def verify_capture(self, dst_if, capture):
294         """Verify captured input packet stream for defined interface.
295
296         :param VppInterface dst_if: Interface to verify captured packet stream
297                                     for.
298         :param list capture: Captured packet stream.
299         """
300         self.logger.info("Verifying capture on interface %s" % dst_if.name)
301         last_info = dict()
302         for i in self.interfaces:
303             last_info[i.sw_if_index] = None
304         is_sub_if = False
305         dst_sw_if_index = dst_if.sw_if_index
306         if hasattr(dst_if, 'parent'):
307             is_sub_if = True
308         for packet in capture:
309             if is_sub_if:
310                 # Check VLAN tags and Ethernet header
311                 packet = dst_if.remove_dot1_layer(packet)
312             self.assertTrue(Dot1Q not in packet)
313             try:
314                 ip = packet[IPv6]
315                 udp = packet[UDP]
316                 payload_info = self.payload_to_info(str(packet[Raw]))
317                 packet_index = payload_info.index
318                 self.assertEqual(payload_info.dst, dst_sw_if_index)
319                 self.logger.debug(
320                     "Got packet on port %s: src=%u (id=%u)" %
321                     (dst_if.name, payload_info.src, packet_index))
322                 next_info = self.get_next_packet_info_for_interface2(
323                     payload_info.src, dst_sw_if_index,
324                     last_info[payload_info.src])
325                 last_info[payload_info.src] = next_info
326                 self.assertTrue(next_info is not None)
327                 self.assertEqual(packet_index, next_info.index)
328                 saved_packet = next_info.data
329                 # Check standard fields
330                 self.assertEqual(ip.src, saved_packet[IPv6].src)
331                 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
332                 self.assertEqual(udp.sport, saved_packet[UDP].sport)
333                 self.assertEqual(udp.dport, saved_packet[UDP].dport)
334             except:
335                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
336                 raise
337         for i in self.interfaces:
338             remaining_packet = self.get_next_packet_info_for_interface2(
339                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
340             self.assertTrue(remaining_packet is None,
341                             "Interface %s: Packet expected from interface %s "
342                             "didn't arrive" % (dst_if.name, i.name))
343
344     def test_fib(self):
345         """ IPv6 FIB test
346
347         Test scenario:
348             - Create IPv6 stream for pg0 interface
349             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
350             - Send and verify received packets on each interface.
351         """
352
353         pkts = self.create_stream(self.pg0)
354         self.pg0.add_stream(pkts)
355
356         for i in self.sub_interfaces:
357             pkts = self.create_stream(i)
358             i.parent.add_stream(pkts)
359
360         self.pg_enable_capture(self.pg_interfaces)
361         self.pg_start()
362
363         pkts = self.pg0.get_capture()
364         self.verify_capture(self.pg0, pkts)
365
366         for i in self.sub_interfaces:
367             pkts = i.parent.get_capture()
368             self.verify_capture(i, pkts)
369
370     def test_ns(self):
371         """ IPv6 Neighbour Solicitation Exceptions
372
373         Test scenario:
374            - Send an NS Sourced from an address not covered by the link sub-net
375            - Send an NS to an mcast address the router has not joined
376            - Send NS for a target address the router does not onn.
377         """
378
379         #
380         # An NS from a non link source address
381         #
382         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
383         d = inet_ntop(AF_INET6, nsma)
384
385         p = (Ether(dst=in6_getnsmac(nsma)) /
386              IPv6(dst=d, src="2002::2") /
387              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
388              ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
389         pkts = [p]
390
391         self.send_and_assert_no_replies(
392             self.pg0, pkts,
393             "No response to NS source by address not on sub-net")
394
395         #
396         # An NS for sent to a solicited mcast group the router is
397         # not a member of FAILS
398         #
399         if 0:
400             nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
401             d = inet_ntop(AF_INET6, nsma)
402
403             p = (Ether(dst=in6_getnsmac(nsma)) /
404                  IPv6(dst=d, src=self.pg0.remote_ip6) /
405                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
406                  ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
407             pkts = [p]
408
409             self.send_and_assert_no_replies(
410                 self.pg0, pkts,
411                 "No response to NS sent to unjoined mcast address")
412
413         #
414         # An NS whose target address is one the router does not own
415         #
416         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
417         d = inet_ntop(AF_INET6, nsma)
418
419         p = (Ether(dst=in6_getnsmac(nsma)) /
420              IPv6(dst=d, src=self.pg0.remote_ip6) /
421              ICMPv6ND_NS(tgt="fd::ffff") /
422              ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
423         pkts = [p]
424
425         self.send_and_assert_no_replies(self.pg0, pkts,
426                                         "No response to NS for unknown target")
427
428         #
429         # A neighbor entry that has no associated FIB-entry
430         #
431         self.pg0.generate_remote_hosts(4)
432         nd_entry = VppNeighbor(self,
433                                self.pg0.sw_if_index,
434                                self.pg0.remote_hosts[2].mac,
435                                self.pg0.remote_hosts[2].ip6,
436                                af=AF_INET6,
437                                is_no_fib_entry=1)
438         nd_entry.add_vpp_config()
439
440         #
441         # check we have the neighbor, but no route
442         #
443         self.assertTrue(find_nbr(self,
444                                  self.pg0.sw_if_index,
445                                  self.pg0._remote_hosts[2].ip6,
446                                  inet=AF_INET6))
447         self.assertFalse(find_route(self,
448                                     self.pg0._remote_hosts[2].ip6,
449                                     128,
450                                     inet=AF_INET6))
451
452         #
453         # send an NS from a link local address to the interface's global
454         # address
455         #
456         p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
457              IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
458              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
459              ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
460
461         self.send_and_expect_na(self.pg0, p,
462                                 "NS from link-local",
463                                 dst_ip=self.pg0._remote_hosts[2].ip6_ll,
464                                 tgt_ip=self.pg0.local_ip6)
465
466         #
467         # we should have learned an ND entry for the peer's link-local
468         # but not inserted a route to it in the FIB
469         #
470         self.assertTrue(find_nbr(self,
471                                  self.pg0.sw_if_index,
472                                  self.pg0._remote_hosts[2].ip6_ll,
473                                  inet=AF_INET6))
474         self.assertFalse(find_route(self,
475                                     self.pg0._remote_hosts[2].ip6_ll,
476                                     128,
477                                     inet=AF_INET6))
478
479         #
480         # An NS to the router's own Link-local
481         #
482         p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
483              IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
484              ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll) /
485              ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
486
487         self.send_and_expect_na(self.pg0, p,
488                                 "NS to/from link-local",
489                                 dst_ip=self.pg0._remote_hosts[3].ip6_ll,
490                                 tgt_ip=self.pg0.local_ip6_ll)
491
492         #
493         # we should have learned an ND entry for the peer's link-local
494         # but not inserted a route to it in the FIB
495         #
496         self.assertTrue(find_nbr(self,
497                                  self.pg0.sw_if_index,
498                                  self.pg0._remote_hosts[3].ip6_ll,
499                                  inet=AF_INET6))
500         self.assertFalse(find_route(self,
501                                     self.pg0._remote_hosts[3].ip6_ll,
502                                     128,
503                                     inet=AF_INET6))
504
505     def test_ns_duplicates(self):
506         """ ND Duplicates"""
507
508         #
509         # Generate some hosts on the LAN
510         #
511         self.pg1.generate_remote_hosts(3)
512
513         #
514         # Add host 1 on pg1 and pg2
515         #
516         ns_pg1 = VppNeighbor(self,
517                              self.pg1.sw_if_index,
518                              self.pg1.remote_hosts[1].mac,
519                              self.pg1.remote_hosts[1].ip6,
520                              af=AF_INET6)
521         ns_pg1.add_vpp_config()
522         ns_pg2 = VppNeighbor(self,
523                              self.pg2.sw_if_index,
524                              self.pg2.remote_mac,
525                              self.pg1.remote_hosts[1].ip6,
526                              af=AF_INET6)
527         ns_pg2.add_vpp_config()
528
529         #
530         # IP packet destined for pg1 remote host arrives on pg1 again.
531         #
532         p = (Ether(dst=self.pg0.local_mac,
533                    src=self.pg0.remote_mac) /
534              IPv6(src=self.pg0.remote_ip6,
535                   dst=self.pg1.remote_hosts[1].ip6) /
536              UDP(sport=1234, dport=1234) /
537              Raw())
538
539         self.pg0.add_stream(p)
540         self.pg_enable_capture(self.pg_interfaces)
541         self.pg_start()
542
543         rx1 = self.pg1.get_capture(1)
544
545         self.verify_ip(rx1[0],
546                        self.pg1.local_mac,
547                        self.pg1.remote_hosts[1].mac,
548                        self.pg0.remote_ip6,
549                        self.pg1.remote_hosts[1].ip6)
550
551         #
552         # remove the duplicate on pg1
553         # packet stream shoud generate NSs out of pg1
554         #
555         ns_pg1.remove_vpp_config()
556
557         self.send_and_expect_ns(self.pg0, self.pg1,
558                                 p, self.pg1.remote_hosts[1].ip6)
559
560         #
561         # Add it back
562         #
563         ns_pg1.add_vpp_config()
564
565         self.pg0.add_stream(p)
566         self.pg_enable_capture(self.pg_interfaces)
567         self.pg_start()
568
569         rx1 = self.pg1.get_capture(1)
570
571         self.verify_ip(rx1[0],
572                        self.pg1.local_mac,
573                        self.pg1.remote_hosts[1].mac,
574                        self.pg0.remote_ip6,
575                        self.pg1.remote_hosts[1].ip6)
576
577     def validate_ra(self, intf, rx, dst_ip=None, mtu=9000, pi_opt=None):
578         if not dst_ip:
579             dst_ip = intf.remote_ip6
580
581         # unicasted packets must come to the unicast mac
582         self.assertEqual(rx[Ether].dst, intf.remote_mac)
583
584         # and from the router's MAC
585         self.assertEqual(rx[Ether].src, intf.local_mac)
586
587         # the rx'd RA should be addressed to the sender's source
588         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
589         self.assertEqual(in6_ptop(rx[IPv6].dst),
590                          in6_ptop(dst_ip))
591
592         # and come from the router's link local
593         self.assertTrue(in6_islladdr(rx[IPv6].src))
594         self.assertEqual(in6_ptop(rx[IPv6].src),
595                          in6_ptop(mk_ll_addr(intf.local_mac)))
596
597         # it should contain the links MTU
598         ra = rx[ICMPv6ND_RA]
599         self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
600
601         # it should contain the source's link layer address option
602         sll = ra[ICMPv6NDOptSrcLLAddr]
603         self.assertEqual(sll.lladdr, intf.local_mac)
604
605         if not pi_opt:
606             # the RA should not contain prefix information
607             self.assertFalse(ra.haslayer(ICMPv6NDOptPrefixInfo))
608         else:
609             raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
610
611             # the options are nested in the scapy packet in way that i cannot
612             # decipher how to decode. this 1st layer of option always returns
613             # nested classes, so a direct obj1=obj2 comparison always fails.
614             # however, the getlayer(.., 2) does give one instnace.
615             # so we cheat here and construct a new opt instnace for comparison
616             rd = ICMPv6NDOptPrefixInfo(prefixlen=raos.prefixlen,
617                                        prefix=raos.prefix,
618                                        L=raos.L,
619                                        A=raos.A)
620             if type(pi_opt) is list:
621                 for ii in range(len(pi_opt)):
622                     self.assertEqual(pi_opt[ii], rd)
623                     rd = rx.getlayer(ICMPv6NDOptPrefixInfo, ii+2)
624             else:
625                 self.assertEqual(pi_opt, raos)
626
627     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
628                            filter_out_fn=is_ipv6_misc,
629                            opt=None):
630         intf.add_stream(pkts)
631         self.pg_enable_capture(self.pg_interfaces)
632         self.pg_start()
633         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
634
635         self.assertEqual(len(rx), 1)
636         rx = rx[0]
637         self.validate_ra(intf, rx, dst_ip, pi_opt=opt)
638
639     def test_rs(self):
640         """ IPv6 Router Solicitation Exceptions
641
642         Test scenario:
643         """
644
645         #
646         # Before we begin change the IPv6 RA responses to use the unicast
647         # address - that way we will not confuse them with the periodic
648         # RAs which go to the mcast address
649         # Sit and wait for the first periodic RA.
650         #
651         # TODO
652         #
653         self.pg0.ip6_ra_config(send_unicast=1)
654
655         #
656         # An RS from a link source address
657         #  - expect an RA in return
658         #
659         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
660              IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
661              ICMPv6ND_RS())
662         pkts = [p]
663         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
664
665         #
666         # For the next RS sent the RA should be rate limited
667         #
668         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
669
670         #
671         # When we reconfiure the IPv6 RA config, we reset the RA rate limiting,
672         # so we need to do this before each test below so as not to drop
673         # packets for rate limiting reasons. Test this works here.
674         #
675         self.pg0.ip6_ra_config(send_unicast=1)
676         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
677
678         #
679         # An RS sent from a non-link local source
680         #
681         self.pg0.ip6_ra_config(send_unicast=1)
682         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
683              IPv6(dst=self.pg0.local_ip6, src="2002::ffff") /
684              ICMPv6ND_RS())
685         pkts = [p]
686         self.send_and_assert_no_replies(self.pg0, pkts,
687                                         "RS from non-link source")
688
689         #
690         # Source an RS from a link local address
691         #
692         self.pg0.ip6_ra_config(send_unicast=1)
693         ll = mk_ll_addr(self.pg0.remote_mac)
694         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
695              IPv6(dst=self.pg0.local_ip6, src=ll) /
696              ICMPv6ND_RS())
697         pkts = [p]
698         self.send_and_expect_ra(self.pg0, pkts,
699                                 "RS sourced from link-local",
700                                 dst_ip=ll)
701
702         #
703         # Send the RS multicast
704         #
705         self.pg0.ip6_ra_config(send_unicast=1)
706         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
707         ll = mk_ll_addr(self.pg0.remote_mac)
708         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
709              IPv6(dst="ff02::2", src=ll) /
710              ICMPv6ND_RS())
711         pkts = [p]
712         self.send_and_expect_ra(self.pg0, pkts,
713                                 "RS sourced from link-local",
714                                 dst_ip=ll)
715
716         #
717         # Source from the unspecified address ::. This happens when the RS
718         # is sent before the host has a configured address/sub-net,
719         # i.e. auto-config. Since the sender has no IP address, the reply
720         # comes back mcast - so the capture needs to not filter this.
721         # If we happen to pick up the periodic RA at this point then so be it,
722         # it's not an error.
723         #
724         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
725         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
726              IPv6(dst="ff02::2", src="::") /
727              ICMPv6ND_RS())
728         pkts = [p]
729         self.send_and_expect_ra(self.pg0, pkts,
730                                 "RS sourced from unspecified",
731                                 dst_ip="ff02::1",
732                                 filter_out_fn=None)
733
734         #
735         # Configure The RA to announce the links prefix
736         #
737         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
738                                self.pg0.local_ip6_prefix_len)
739
740         #
741         # RAs should now contain the prefix information option
742         #
743         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
744                                     prefix=self.pg0.local_ip6,
745                                     L=1,
746                                     A=1)
747
748         self.pg0.ip6_ra_config(send_unicast=1)
749         ll = mk_ll_addr(self.pg0.remote_mac)
750         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
751              IPv6(dst=self.pg0.local_ip6, src=ll) /
752              ICMPv6ND_RS())
753         self.send_and_expect_ra(self.pg0, p,
754                                 "RA with prefix-info",
755                                 dst_ip=ll,
756                                 opt=opt)
757
758         #
759         # Change the prefix info to not off-link
760         #  L-flag is clear
761         #
762         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
763                                self.pg0.local_ip6_prefix_len,
764                                off_link=1)
765
766         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
767                                     prefix=self.pg0.local_ip6,
768                                     L=0,
769                                     A=1)
770
771         self.pg0.ip6_ra_config(send_unicast=1)
772         self.send_and_expect_ra(self.pg0, p,
773                                 "RA with Prefix info with L-flag=0",
774                                 dst_ip=ll,
775                                 opt=opt)
776
777         #
778         # Change the prefix info to not off-link, no-autoconfig
779         #  L and A flag are clear in the advert
780         #
781         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
782                                self.pg0.local_ip6_prefix_len,
783                                off_link=1,
784                                no_autoconfig=1)
785
786         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
787                                     prefix=self.pg0.local_ip6,
788                                     L=0,
789                                     A=0)
790
791         self.pg0.ip6_ra_config(send_unicast=1)
792         self.send_and_expect_ra(self.pg0, p,
793                                 "RA with Prefix info with A & L-flag=0",
794                                 dst_ip=ll,
795                                 opt=opt)
796
797         #
798         # Change the flag settings back to the defaults
799         #  L and A flag are set in the advert
800         #
801         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
802                                self.pg0.local_ip6_prefix_len)
803
804         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
805                                     prefix=self.pg0.local_ip6,
806                                     L=1,
807                                     A=1)
808
809         self.pg0.ip6_ra_config(send_unicast=1)
810         self.send_and_expect_ra(self.pg0, p,
811                                 "RA with Prefix info",
812                                 dst_ip=ll,
813                                 opt=opt)
814
815         #
816         # Change the prefix info to not off-link, no-autoconfig
817         #  L and A flag are clear in the advert
818         #
819         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
820                                self.pg0.local_ip6_prefix_len,
821                                off_link=1,
822                                no_autoconfig=1)
823
824         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
825                                     prefix=self.pg0.local_ip6,
826                                     L=0,
827                                     A=0)
828
829         self.pg0.ip6_ra_config(send_unicast=1)
830         self.send_and_expect_ra(self.pg0, p,
831                                 "RA with Prefix info with A & L-flag=0",
832                                 dst_ip=ll,
833                                 opt=opt)
834
835         #
836         # Use the reset to defults option to revert to defaults
837         #  L and A flag are clear in the advert
838         #
839         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
840                                self.pg0.local_ip6_prefix_len,
841                                use_default=1)
842
843         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
844                                     prefix=self.pg0.local_ip6,
845                                     L=1,
846                                     A=1)
847
848         self.pg0.ip6_ra_config(send_unicast=1)
849         self.send_and_expect_ra(self.pg0, p,
850                                 "RA with Prefix reverted to defaults",
851                                 dst_ip=ll,
852                                 opt=opt)
853
854         #
855         # Advertise Another prefix. With no L-flag/A-flag
856         #
857         self.pg0.ip6_ra_prefix(self.pg1.local_ip6n,
858                                self.pg1.local_ip6_prefix_len,
859                                off_link=1,
860                                no_autoconfig=1)
861
862         opt = [ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
863                                      prefix=self.pg0.local_ip6,
864                                      L=1,
865                                      A=1),
866                ICMPv6NDOptPrefixInfo(prefixlen=self.pg1.local_ip6_prefix_len,
867                                      prefix=self.pg1.local_ip6,
868                                      L=0,
869                                      A=0)]
870
871         self.pg0.ip6_ra_config(send_unicast=1)
872         ll = mk_ll_addr(self.pg0.remote_mac)
873         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
874              IPv6(dst=self.pg0.local_ip6, src=ll) /
875              ICMPv6ND_RS())
876         self.send_and_expect_ra(self.pg0, p,
877                                 "RA with multiple Prefix infos",
878                                 dst_ip=ll,
879                                 opt=opt)
880
881         #
882         # Remove the first refix-info - expect the second is still in the
883         # advert
884         #
885         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
886                                self.pg0.local_ip6_prefix_len,
887                                is_no=1)
888
889         opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg1.local_ip6_prefix_len,
890                                     prefix=self.pg1.local_ip6,
891                                     L=0,
892                                     A=0)
893
894         self.pg0.ip6_ra_config(send_unicast=1)
895         self.send_and_expect_ra(self.pg0, p,
896                                 "RA with Prefix reverted to defaults",
897                                 dst_ip=ll,
898                                 opt=opt)
899
900         #
901         # Remove the second prefix-info - expect no prefix-info i nthe adverts
902         #
903         self.pg0.ip6_ra_prefix(self.pg1.local_ip6n,
904                                self.pg1.local_ip6_prefix_len,
905                                is_no=1)
906
907         self.pg0.ip6_ra_config(send_unicast=1)
908         self.send_and_expect_ra(self.pg0, p,
909                                 "RA with Prefix reverted to defaults",
910                                 dst_ip=ll)
911
912         #
913         # Reset the periodic advertisements back to default values
914         #
915         self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
916
917
918 class TestICMPv6Echo(VppTestCase):
919     """ ICMPv6 Echo Test Case """
920
921     def setUp(self):
922         super(TestICMPv6Echo, self).setUp()
923
924         # create 1 pg interface
925         self.create_pg_interfaces(range(1))
926
927         for i in self.pg_interfaces:
928             i.admin_up()
929             i.config_ip6()
930             i.resolve_ndp()
931
932     def tearDown(self):
933         super(TestICMPv6Echo, self).tearDown()
934         for i in self.pg_interfaces:
935             i.unconfig_ip6()
936             i.ip6_disable()
937             i.admin_down()
938
939     def test_icmpv6_echo(self):
940         """ VPP replies to ICMPv6 Echo Request
941
942         Test scenario:
943
944             - Receive ICMPv6 Echo Request message on pg0 interface.
945             - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
946         """
947
948         icmpv6_id = 0xb
949         icmpv6_seq = 5
950         icmpv6_data = '\x0a' * 18
951         p_echo_request = (Ether(src=self.pg0.remote_mac,
952                                 dst=self.pg0.local_mac) /
953                           IPv6(src=self.pg0.remote_ip6,
954                                dst=self.pg0.local_ip6) /
955                           ICMPv6EchoRequest(id=icmpv6_id, seq=icmpv6_seq,
956                                             data=icmpv6_data))
957
958         self.pg0.add_stream(p_echo_request)
959         self.pg_enable_capture(self.pg_interfaces)
960         self.pg_start()
961
962         rx = self.pg0.get_capture(1)
963         rx = rx[0]
964         ether = rx[Ether]
965         ipv6 = rx[IPv6]
966         icmpv6 = rx[ICMPv6EchoReply]
967
968         self.assertEqual(ether.src, self.pg0.local_mac)
969         self.assertEqual(ether.dst, self.pg0.remote_mac)
970
971         self.assertEqual(ipv6.src, self.pg0.local_ip6)
972         self.assertEqual(ipv6.dst, self.pg0.remote_ip6)
973
974         self.assertEqual(icmp6types[icmpv6.type], "Echo Reply")
975         self.assertEqual(icmpv6.id, icmpv6_id)
976         self.assertEqual(icmpv6.seq, icmpv6_seq)
977         self.assertEqual(icmpv6.data, icmpv6_data)
978
979
980 class TestIPv6RD(TestIPv6ND):
981     """ IPv6 Router Discovery Test Case """
982
983     @classmethod
984     def setUpClass(cls):
985         super(TestIPv6RD, cls).setUpClass()
986
987     def setUp(self):
988         super(TestIPv6RD, self).setUp()
989
990         # create 2 pg interfaces
991         self.create_pg_interfaces(range(2))
992
993         self.interfaces = list(self.pg_interfaces)
994
995         # setup all interfaces
996         for i in self.interfaces:
997             i.admin_up()
998             i.config_ip6()
999
1000     def tearDown(self):
1001         for i in self.interfaces:
1002             i.unconfig_ip6()
1003             i.admin_down()
1004         super(TestIPv6RD, self).tearDown()
1005
1006     def test_rd_send_router_solicitation(self):
1007         """ Verify router solicitation packets """
1008
1009         count = 2
1010         self.pg_enable_capture(self.pg_interfaces)
1011         self.pg_start()
1012         self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index,
1013                                                  mrc=count)
1014         rx_list = self.pg1.get_capture(count, timeout=3)
1015         self.assertEqual(len(rx_list), count)
1016         for packet in rx_list:
1017             self.assertTrue(packet.haslayer(IPv6))
1018             self.assertTrue(packet[IPv6].haslayer(ICMPv6ND_RS))
1019             dst = ip6_normalize(packet[IPv6].dst)
1020             dst2 = ip6_normalize("ff02::2")
1021             self.assert_equal(dst, dst2)
1022             src = ip6_normalize(packet[IPv6].src)
1023             src2 = ip6_normalize(self.pg1.local_ip6_ll)
1024             self.assert_equal(src, src2)
1025             self.assertTrue(packet[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr))
1026             self.assert_equal(packet[ICMPv6NDOptSrcLLAddr].lladdr,
1027                               self.pg1.local_mac)
1028
1029     def verify_prefix_info(self, reported_prefix, prefix_option):
1030         prefix = socket.inet_pton(socket.AF_INET6,
1031                                   prefix_option.getfieldval("prefix"))
1032         self.assert_equal(reported_prefix.dst_address, prefix)
1033         self.assert_equal(reported_prefix.dst_address_length,
1034                           prefix_option.getfieldval("prefixlen"))
1035         L = prefix_option.getfieldval("L")
1036         A = prefix_option.getfieldval("A")
1037         option_flags = (L << 7) | (A << 6)
1038         self.assert_equal(reported_prefix.flags, option_flags)
1039         self.assert_equal(reported_prefix.valid_time,
1040                           prefix_option.getfieldval("validlifetime"))
1041         self.assert_equal(reported_prefix.preferred_time,
1042                           prefix_option.getfieldval("preferredlifetime"))
1043
1044     def test_rd_receive_router_advertisement(self):
1045         """ Verify events triggered by received RA packets """
1046
1047         self.vapi.want_ip6_ra_events()
1048
1049         prefix_info_1 = ICMPv6NDOptPrefixInfo(
1050             prefix="1::2",
1051             prefixlen=50,
1052             validlifetime=200,
1053             preferredlifetime=500,
1054             L=1,
1055             A=1,
1056         )
1057
1058         prefix_info_2 = ICMPv6NDOptPrefixInfo(
1059             prefix="7::4",
1060             prefixlen=20,
1061             validlifetime=70,
1062             preferredlifetime=1000,
1063             L=1,
1064             A=0,
1065         )
1066
1067         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1068              IPv6(dst=self.pg1.local_ip6_ll,
1069                   src=mk_ll_addr(self.pg1.remote_mac)) /
1070              ICMPv6ND_RA() /
1071              prefix_info_1 /
1072              prefix_info_2)
1073         self.pg1.add_stream([p])
1074         self.pg_start()
1075
1076         ev = self.vapi.wait_for_event(10, "ip6_ra_event")
1077
1078         self.assert_equal(ev.current_hop_limit, 0)
1079         self.assert_equal(ev.flags, 8)
1080         self.assert_equal(ev.router_lifetime_in_sec, 1800)
1081         self.assert_equal(ev.neighbor_reachable_time_in_msec, 0)
1082         self.assert_equal(
1083             ev.time_in_msec_between_retransmitted_neighbor_solicitations, 0)
1084
1085         self.assert_equal(ev.n_prefixes, 2)
1086
1087         self.verify_prefix_info(ev.prefixes[0], prefix_info_1)
1088         self.verify_prefix_info(ev.prefixes[1], prefix_info_2)
1089
1090
1091 class TestIPv6RDControlPlane(TestIPv6ND):
1092     """ IPv6 Router Discovery Control Plane Test Case """
1093
1094     @classmethod
1095     def setUpClass(cls):
1096         super(TestIPv6RDControlPlane, cls).setUpClass()
1097
1098     def setUp(self):
1099         super(TestIPv6RDControlPlane, self).setUp()
1100
1101         # create 1 pg interface
1102         self.create_pg_interfaces(range(1))
1103
1104         self.interfaces = list(self.pg_interfaces)
1105
1106         # setup all interfaces
1107         for i in self.interfaces:
1108             i.admin_up()
1109             i.config_ip6()
1110
1111     def tearDown(self):
1112         super(TestIPv6RDControlPlane, self).tearDown()
1113
1114     @staticmethod
1115     def create_ra_packet(pg, routerlifetime=None):
1116         src_ip = pg.remote_ip6_ll
1117         dst_ip = pg.local_ip6
1118         if routerlifetime is not None:
1119             ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
1120         else:
1121             ra = ICMPv6ND_RA()
1122         p = (Ether(dst=pg.local_mac, src=pg.remote_mac) /
1123              IPv6(dst=dst_ip, src=src_ip) / ra)
1124         return p
1125
1126     @staticmethod
1127     def get_default_routes(fib):
1128         list = []
1129         for entry in fib:
1130             if entry.address_length == 0:
1131                 for path in entry.path:
1132                     if path.sw_if_index != 0xFFFFFFFF:
1133                         defaut_route = {}
1134                         defaut_route['sw_if_index'] = path.sw_if_index
1135                         defaut_route['next_hop'] = path.next_hop
1136                         list.append(defaut_route)
1137         return list
1138
1139     @staticmethod
1140     def get_interface_addresses(fib, pg):
1141         list = []
1142         for entry in fib:
1143             if entry.address_length == 128:
1144                 path = entry.path[0]
1145                 if path.sw_if_index == pg.sw_if_index:
1146                     list.append(entry.address)
1147         return list
1148
1149     def test_all(self):
1150         """ Test handling of SLAAC addresses and default routes """
1151
1152         fib = self.vapi.ip6_fib_dump()
1153         default_routes = self.get_default_routes(fib)
1154         initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
1155         self.assertEqual(default_routes, [])
1156         router_address = self.pg0.remote_ip6n_ll
1157
1158         self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)
1159
1160         self.sleep(0.1)
1161
1162         # send RA
1163         packet = (self.create_ra_packet(self.pg0) / ICMPv6NDOptPrefixInfo(
1164             prefix="1::",
1165             prefixlen=64,
1166             validlifetime=2,
1167             preferredlifetime=2,
1168             L=1,
1169             A=1,
1170         ) / ICMPv6NDOptPrefixInfo(
1171             prefix="7::",
1172             prefixlen=20,
1173             validlifetime=1500,
1174             preferredlifetime=1000,
1175             L=1,
1176             A=0,
1177         ))
1178         self.pg0.add_stream([packet])
1179         self.pg_start()
1180
1181         self.sleep(0.1)
1182
1183         fib = self.vapi.ip6_fib_dump()
1184
1185         # check FIB for new address
1186         addresses = set(self.get_interface_addresses(fib, self.pg0))
1187         new_addresses = addresses.difference(initial_addresses)
1188         self.assertEqual(len(new_addresses), 1)
1189         prefix = list(new_addresses)[0][:8] + '\0\0\0\0\0\0\0\0'
1190         self.assertEqual(inet_ntop(AF_INET6, prefix), '1::')
1191
1192         # check FIB for new default route
1193         default_routes = self.get_default_routes(fib)
1194         self.assertEqual(len(default_routes), 1)
1195         dr = default_routes[0]
1196         self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
1197         self.assertEqual(dr['next_hop'], router_address)
1198
1199         # send RA to delete default route
1200         packet = self.create_ra_packet(self.pg0, routerlifetime=0)
1201         self.pg0.add_stream([packet])
1202         self.pg_start()
1203
1204         self.sleep(0.1)
1205
1206         # check that default route is deleted
1207         fib = self.vapi.ip6_fib_dump()
1208         default_routes = self.get_default_routes(fib)
1209         self.assertEqual(len(default_routes), 0)
1210
1211         self.sleep(0.1)
1212
1213         # send RA
1214         packet = self.create_ra_packet(self.pg0)
1215         self.pg0.add_stream([packet])
1216         self.pg_start()
1217
1218         self.sleep(0.1)
1219
1220         # check FIB for new default route
1221         fib = self.vapi.ip6_fib_dump()
1222         default_routes = self.get_default_routes(fib)
1223         self.assertEqual(len(default_routes), 1)
1224         dr = default_routes[0]
1225         self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
1226         self.assertEqual(dr['next_hop'], router_address)
1227
1228         # send RA, updating router lifetime to 1s
1229         packet = self.create_ra_packet(self.pg0, 1)
1230         self.pg0.add_stream([packet])
1231         self.pg_start()
1232
1233         self.sleep(0.1)
1234
1235         # check that default route still exists
1236         fib = self.vapi.ip6_fib_dump()
1237         default_routes = self.get_default_routes(fib)
1238         self.assertEqual(len(default_routes), 1)
1239         dr = default_routes[0]
1240         self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
1241         self.assertEqual(dr['next_hop'], router_address)
1242
1243         self.sleep(1)
1244
1245         # check that default route is deleted
1246         fib = self.vapi.ip6_fib_dump()
1247         default_routes = self.get_default_routes(fib)
1248         self.assertEqual(len(default_routes), 0)
1249
1250         # check FIB still contains the SLAAC address
1251         addresses = set(self.get_interface_addresses(fib, self.pg0))
1252         new_addresses = addresses.difference(initial_addresses)
1253         self.assertEqual(len(new_addresses), 1)
1254         prefix = list(new_addresses)[0][:8] + '\0\0\0\0\0\0\0\0'
1255         self.assertEqual(inet_ntop(AF_INET6, prefix), '1::')
1256
1257         self.sleep(1)
1258
1259         # check that SLAAC address is deleted
1260         fib = self.vapi.ip6_fib_dump()
1261         addresses = set(self.get_interface_addresses(fib, self.pg0))
1262         new_addresses = addresses.difference(initial_addresses)
1263         self.assertEqual(len(new_addresses), 0)
1264
1265
1266 class IPv6NDProxyTest(TestIPv6ND):
1267     """ IPv6 ND ProxyTest Case """
1268
1269     def setUp(self):
1270         super(IPv6NDProxyTest, self).setUp()
1271
1272         # create 3 pg interfaces
1273         self.create_pg_interfaces(range(3))
1274
1275         # pg0 is the master interface, with the configured subnet
1276         self.pg0.admin_up()
1277         self.pg0.config_ip6()
1278         self.pg0.resolve_ndp()
1279
1280         self.pg1.ip6_enable()
1281         self.pg2.ip6_enable()
1282
1283     def tearDown(self):
1284         super(IPv6NDProxyTest, self).tearDown()
1285
1286     def test_nd_proxy(self):
1287         """ IPv6 Proxy ND """
1288
1289         #
1290         # Generate some hosts in the subnet that we are proxying
1291         #
1292         self.pg0.generate_remote_hosts(8)
1293
1294         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
1295         d = inet_ntop(AF_INET6, nsma)
1296
1297         #
1298         # Send an NS for one of those remote hosts on one of the proxy links
1299         # expect no response since it's from an address that is not
1300         # on the link that has the prefix configured
1301         #
1302         ns_pg1 = (Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac) /
1303                   IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6) /
1304                   ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
1305                   ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac))
1306
1307         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")
1308
1309         #
1310         # Add proxy support for the host
1311         #
1312         self.vapi.ip6_nd_proxy(
1313             inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1314             self.pg1.sw_if_index)
1315
1316         #
1317         # try that NS again. this time we expect an NA back
1318         #
1319         self.send_and_expect_na(self.pg1, ns_pg1,
1320                                 "NS to proxy entry",
1321                                 dst_ip=self.pg0._remote_hosts[2].ip6,
1322                                 tgt_ip=self.pg0.local_ip6)
1323
1324         #
1325         # ... and that we have an entry in the ND cache
1326         #
1327         self.assertTrue(find_nbr(self,
1328                                  self.pg1.sw_if_index,
1329                                  self.pg0._remote_hosts[2].ip6,
1330                                  inet=AF_INET6))
1331
1332         #
1333         # ... and we can route traffic to it
1334         #
1335         t = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1336              IPv6(dst=self.pg0._remote_hosts[2].ip6,
1337                   src=self.pg0.remote_ip6) /
1338              UDP(sport=10000, dport=20000) /
1339              Raw('\xa5' * 100))
1340
1341         self.pg0.add_stream(t)
1342         self.pg_enable_capture(self.pg_interfaces)
1343         self.pg_start()
1344         rx = self.pg1.get_capture(1)
1345         rx = rx[0]
1346
1347         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1348         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1349
1350         self.assertEqual(rx[IPv6].src, t[IPv6].src)
1351         self.assertEqual(rx[IPv6].dst, t[IPv6].dst)
1352
1353         #
1354         # Test we proxy for the host on the main interface
1355         #
1356         ns_pg0 = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
1357                   IPv6(dst=d, src=self.pg0.remote_ip6) /
1358                   ICMPv6ND_NS(tgt=self.pg0._remote_hosts[2].ip6) /
1359                   ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
1360
1361         self.send_and_expect_na(self.pg0, ns_pg0,
1362                                 "NS to proxy entry on main",
1363                                 tgt_ip=self.pg0._remote_hosts[2].ip6,
1364                                 dst_ip=self.pg0.remote_ip6)
1365
1366         #
1367         # Setup and resolve proxy for another host on another interface
1368         #
1369         ns_pg2 = (Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac) /
1370                   IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6) /
1371                   ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
1372                   ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac))
1373
1374         self.vapi.ip6_nd_proxy(
1375             inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1376             self.pg2.sw_if_index)
1377
1378         self.send_and_expect_na(self.pg2, ns_pg2,
1379                                 "NS to proxy entry other interface",
1380                                 dst_ip=self.pg0._remote_hosts[3].ip6,
1381                                 tgt_ip=self.pg0.local_ip6)
1382
1383         self.assertTrue(find_nbr(self,
1384                                  self.pg2.sw_if_index,
1385                                  self.pg0._remote_hosts[3].ip6,
1386                                  inet=AF_INET6))
1387
1388         #
1389         # hosts can communicate. pg2->pg1
1390         #
1391         t2 = (Ether(dst=self.pg2.local_mac,
1392                     src=self.pg0.remote_hosts[3].mac) /
1393               IPv6(dst=self.pg0._remote_hosts[2].ip6,
1394                    src=self.pg0._remote_hosts[3].ip6) /
1395               UDP(sport=10000, dport=20000) /
1396               Raw('\xa5' * 100))
1397
1398         self.pg2.add_stream(t2)
1399         self.pg_enable_capture(self.pg_interfaces)
1400         self.pg_start()
1401         rx = self.pg1.get_capture(1)
1402         rx = rx[0]
1403
1404         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1405         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1406
1407         self.assertEqual(rx[IPv6].src, t2[IPv6].src)
1408         self.assertEqual(rx[IPv6].dst, t2[IPv6].dst)
1409
1410         #
1411         # remove the proxy configs
1412         #
1413         self.vapi.ip6_nd_proxy(
1414             inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1415             self.pg1.sw_if_index,
1416             is_del=1)
1417         self.vapi.ip6_nd_proxy(
1418             inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1419             self.pg2.sw_if_index,
1420             is_del=1)
1421
1422         self.assertFalse(find_nbr(self,
1423                                   self.pg2.sw_if_index,
1424                                   self.pg0._remote_hosts[3].ip6,
1425                                   inet=AF_INET6))
1426         self.assertFalse(find_nbr(self,
1427                                   self.pg1.sw_if_index,
1428                                   self.pg0._remote_hosts[2].ip6,
1429                                   inet=AF_INET6))
1430
1431         #
1432         # no longer proxy-ing...
1433         #
1434         self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
1435         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
1436         self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")
1437
1438         #
1439         # no longer forwarding. traffic generates NS out of the glean/main
1440         # interface
1441         #
1442         self.pg2.add_stream(t2)
1443         self.pg_enable_capture(self.pg_interfaces)
1444         self.pg_start()
1445
1446         rx = self.pg0.get_capture(1)
1447
1448         self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1449
1450
1451 class TestIPNull(VppTestCase):
1452     """ IPv6 routes via NULL """
1453
1454     def setUp(self):
1455         super(TestIPNull, self).setUp()
1456
1457         # create 2 pg interfaces
1458         self.create_pg_interfaces(range(1))
1459
1460         for i in self.pg_interfaces:
1461             i.admin_up()
1462             i.config_ip6()
1463             i.resolve_ndp()
1464
1465     def tearDown(self):
1466         super(TestIPNull, self).tearDown()
1467         for i in self.pg_interfaces:
1468             i.unconfig_ip6()
1469             i.admin_down()
1470
1471     def test_ip_null(self):
1472         """ IP NULL route """
1473
1474         p = (Ether(src=self.pg0.remote_mac,
1475                    dst=self.pg0.local_mac) /
1476              IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
1477              UDP(sport=1234, dport=1234) /
1478              Raw('\xa5' * 100))
1479
1480         #
1481         # A route via IP NULL that will reply with ICMP unreachables
1482         #
1483         ip_unreach = VppIpRoute(self, "2001::", 64, [], is_unreach=1, is_ip6=1)
1484         ip_unreach.add_vpp_config()
1485
1486         self.pg0.add_stream(p)
1487         self.pg_enable_capture(self.pg_interfaces)
1488         self.pg_start()
1489
1490         rx = self.pg0.get_capture(1)
1491         rx = rx[0]
1492         icmp = rx[ICMPv6DestUnreach]
1493
1494         # 0 = "No route to destination"
1495         self.assertEqual(icmp.code, 0)
1496
1497         # ICMP is rate limited. pause a bit
1498         self.sleep(1)
1499
1500         #
1501         # A route via IP NULL that will reply with ICMP prohibited
1502         #
1503         ip_prohibit = VppIpRoute(self, "2001::1", 128, [],
1504                                  is_prohibit=1, is_ip6=1)
1505         ip_prohibit.add_vpp_config()
1506
1507         self.pg0.add_stream(p)
1508         self.pg_enable_capture(self.pg_interfaces)
1509         self.pg_start()
1510
1511         rx = self.pg0.get_capture(1)
1512         rx = rx[0]
1513         icmp = rx[ICMPv6DestUnreach]
1514
1515         # 1 = "Communication with destination administratively prohibited"
1516         self.assertEqual(icmp.code, 1)
1517
1518
1519 class TestIPDisabled(VppTestCase):
1520     """ IPv6 disabled """
1521
1522     def setUp(self):
1523         super(TestIPDisabled, self).setUp()
1524
1525         # create 2 pg interfaces
1526         self.create_pg_interfaces(range(2))
1527
1528         # PG0 is IP enalbed
1529         self.pg0.admin_up()
1530         self.pg0.config_ip6()
1531         self.pg0.resolve_ndp()
1532
1533         # PG 1 is not IP enabled
1534         self.pg1.admin_up()
1535
1536     def tearDown(self):
1537         super(TestIPDisabled, self).tearDown()
1538         for i in self.pg_interfaces:
1539             i.unconfig_ip4()
1540             i.admin_down()
1541
1542     def test_ip_disabled(self):
1543         """ IP Disabled """
1544
1545         #
1546         # An (S,G).
1547         # one accepting interface, pg0, 2 forwarding interfaces
1548         #
1549         route_ff_01 = VppIpMRoute(
1550             self,
1551             "::",
1552             "ffef::1", 128,
1553             MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
1554             [VppMRoutePath(self.pg1.sw_if_index,
1555                            MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT),
1556              VppMRoutePath(self.pg0.sw_if_index,
1557                            MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)],
1558             is_ip6=1)
1559         route_ff_01.add_vpp_config()
1560
1561         pu = (Ether(src=self.pg1.remote_mac,
1562                     dst=self.pg1.local_mac) /
1563               IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
1564               UDP(sport=1234, dport=1234) /
1565               Raw('\xa5' * 100))
1566         pm = (Ether(src=self.pg1.remote_mac,
1567                     dst=self.pg1.local_mac) /
1568               IPv6(src="2001::1", dst="ffef::1") /
1569               UDP(sport=1234, dport=1234) /
1570               Raw('\xa5' * 100))
1571
1572         #
1573         # PG1 does not forward IP traffic
1574         #
1575         self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
1576         self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1577
1578         #
1579         # IP enable PG1
1580         #
1581         self.pg1.config_ip6()
1582
1583         #
1584         # Now we get packets through
1585         #
1586         self.pg1.add_stream(pu)
1587         self.pg_enable_capture(self.pg_interfaces)
1588         self.pg_start()
1589         rx = self.pg0.get_capture(1)
1590
1591         self.pg1.add_stream(pm)
1592         self.pg_enable_capture(self.pg_interfaces)
1593         self.pg_start()
1594         rx = self.pg0.get_capture(1)
1595
1596         #
1597         # Disable PG1
1598         #
1599         self.pg1.unconfig_ip6()
1600
1601         #
1602         # PG1 does not forward IP traffic
1603         #
1604         self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
1605         self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1606
1607
1608 class TestIP6LoadBalance(VppTestCase):
1609     """ IPv6 Load-Balancing """
1610
1611     def setUp(self):
1612         super(TestIP6LoadBalance, self).setUp()
1613
1614         self.create_pg_interfaces(range(5))
1615
1616         mpls_tbl = VppMplsTable(self, 0)
1617         mpls_tbl.add_vpp_config()
1618
1619         for i in self.pg_interfaces:
1620             i.admin_up()
1621             i.config_ip6()
1622             i.resolve_ndp()
1623             i.enable_mpls()
1624
1625     def tearDown(self):
1626         for i in self.pg_interfaces:
1627             i.unconfig_ip6()
1628             i.admin_down()
1629             i.disable_mpls()
1630         super(TestIP6LoadBalance, self).tearDown()
1631
1632     def send_and_expect_load_balancing(self, input, pkts, outputs):
1633         self.vapi.cli("clear trace")
1634         input.add_stream(pkts)
1635         self.pg_enable_capture(self.pg_interfaces)
1636         self.pg_start()
1637         for oo in outputs:
1638             rx = oo._get_capture(1)
1639             self.assertNotEqual(0, len(rx))
1640
1641     def send_and_expect_one_itf(self, input, pkts, itf):
1642         self.vapi.cli("clear trace")
1643         input.add_stream(pkts)
1644         self.pg_enable_capture(self.pg_interfaces)
1645         self.pg_start()
1646         rx = itf.get_capture(len(pkts))
1647
1648     def test_ip6_load_balance(self):
1649         """ IPv6 Load-Balancing """
1650
1651         #
1652         # An array of packets that differ only in the destination port
1653         #  - IP only
1654         #  - MPLS EOS
1655         #  - MPLS non-EOS
1656         #  - MPLS non-EOS with an entropy label
1657         #
1658         port_ip_pkts = []
1659         port_mpls_pkts = []
1660         port_mpls_neos_pkts = []
1661         port_ent_pkts = []
1662
1663         #
1664         # An array of packets that differ only in the source address
1665         #
1666         src_ip_pkts = []
1667         src_mpls_pkts = []
1668
1669         for ii in range(65):
1670             port_ip_hdr = (IPv6(dst="3000::1", src="3000:1::1") /
1671                            UDP(sport=1234, dport=1234 + ii) /
1672                            Raw('\xa5' * 100))
1673             port_ip_pkts.append((Ether(src=self.pg0.remote_mac,
1674                                        dst=self.pg0.local_mac) /
1675                                  port_ip_hdr))
1676             port_mpls_pkts.append((Ether(src=self.pg0.remote_mac,
1677                                          dst=self.pg0.local_mac) /
1678                                    MPLS(label=66, ttl=2) /
1679                                    port_ip_hdr))
1680             port_mpls_neos_pkts.append((Ether(src=self.pg0.remote_mac,
1681                                               dst=self.pg0.local_mac) /
1682                                         MPLS(label=67, ttl=2) /
1683                                         MPLS(label=77, ttl=2) /
1684                                         port_ip_hdr))
1685             port_ent_pkts.append((Ether(src=self.pg0.remote_mac,
1686                                         dst=self.pg0.local_mac) /
1687                                   MPLS(label=67, ttl=2) /
1688                                   MPLS(label=14, ttl=2) /
1689                                   MPLS(label=999, ttl=2) /
1690                                   port_ip_hdr))
1691             src_ip_hdr = (IPv6(dst="3000::1", src="3000:1::%d" % ii) /
1692                           UDP(sport=1234, dport=1234) /
1693                           Raw('\xa5' * 100))
1694             src_ip_pkts.append((Ether(src=self.pg0.remote_mac,
1695                                       dst=self.pg0.local_mac) /
1696                                 src_ip_hdr))
1697             src_mpls_pkts.append((Ether(src=self.pg0.remote_mac,
1698                                         dst=self.pg0.local_mac) /
1699                                   MPLS(label=66, ttl=2) /
1700                                   src_ip_hdr))
1701
1702         #
1703         # A route for the IP pacekts
1704         #
1705         route_3000_1 = VppIpRoute(self, "3000::1", 128,
1706                                   [VppRoutePath(self.pg1.remote_ip6,
1707                                                 self.pg1.sw_if_index,
1708                                                 proto=DpoProto.DPO_PROTO_IP6),
1709                                    VppRoutePath(self.pg2.remote_ip6,
1710                                                 self.pg2.sw_if_index,
1711                                                 proto=DpoProto.DPO_PROTO_IP6)],
1712                                   is_ip6=1)
1713         route_3000_1.add_vpp_config()
1714
1715         #
1716         # a local-label for the EOS packets
1717         #
1718         binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
1719         binding.add_vpp_config()
1720
1721         #
1722         # An MPLS route for the non-EOS packets
1723         #
1724         route_67 = VppMplsRoute(self, 67, 0,
1725                                 [VppRoutePath(self.pg1.remote_ip6,
1726                                               self.pg1.sw_if_index,
1727                                               labels=[67],
1728                                               proto=DpoProto.DPO_PROTO_IP6),
1729                                  VppRoutePath(self.pg2.remote_ip6,
1730                                               self.pg2.sw_if_index,
1731                                               labels=[67],
1732                                               proto=DpoProto.DPO_PROTO_IP6)])
1733         route_67.add_vpp_config()
1734
1735         #
1736         # inject the packet on pg0 - expect load-balancing across the 2 paths
1737         #  - since the default hash config is to use IP src,dst and port
1738         #    src,dst
1739         # We are not going to ensure equal amounts of packets across each link,
1740         # since the hash algorithm is statistical and therefore this can never
1741         # be guaranteed. But wuth 64 different packets we do expect some
1742         # balancing. So instead just ensure there is traffic on each link.
1743         #
1744         self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
1745                                             [self.pg1, self.pg2])
1746         self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
1747                                             [self.pg1, self.pg2])
1748         self.send_and_expect_load_balancing(self.pg0, port_mpls_pkts,
1749                                             [self.pg1, self.pg2])
1750         self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
1751                                             [self.pg1, self.pg2])
1752         self.send_and_expect_load_balancing(self.pg0, port_mpls_neos_pkts,
1753                                             [self.pg1, self.pg2])
1754
1755         #
1756         # The packets with Entropy label in should not load-balance,
1757         # since the Entorpy value is fixed.
1758         #
1759         self.send_and_expect_one_itf(self.pg0, port_ent_pkts, self.pg1)
1760
1761         #
1762         # change the flow hash config so it's only IP src,dst
1763         #  - now only the stream with differing source address will
1764         #    load-balance
1765         #
1766         self.vapi.set_ip_flow_hash(0, is_ip6=1, src=1, dst=1, sport=0, dport=0)
1767
1768         self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
1769                                             [self.pg1, self.pg2])
1770         self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
1771                                             [self.pg1, self.pg2])
1772         self.send_and_expect_one_itf(self.pg0, port_ip_pkts, self.pg2)
1773
1774         #
1775         # change the flow hash config back to defaults
1776         #
1777         self.vapi.set_ip_flow_hash(0, is_ip6=1, src=1, dst=1, sport=1, dport=1)
1778
1779         #
1780         # Recursive prefixes
1781         #  - testing that 2 stages of load-balancing occurs and there is no
1782         #    polarisation (i.e. only 2 of 4 paths are used)
1783         #
1784         port_pkts = []
1785         src_pkts = []
1786
1787         for ii in range(257):
1788             port_pkts.append((Ether(src=self.pg0.remote_mac,
1789                                     dst=self.pg0.local_mac) /
1790                               IPv6(dst="4000::1", src="4000:1::1") /
1791                               UDP(sport=1234, dport=1234 + ii) /
1792                               Raw('\xa5' * 100)))
1793             src_pkts.append((Ether(src=self.pg0.remote_mac,
1794                                    dst=self.pg0.local_mac) /
1795                              IPv6(dst="4000::1", src="4000:1::%d" % ii) /
1796                              UDP(sport=1234, dport=1234) /
1797                              Raw('\xa5' * 100)))
1798
1799         route_3000_2 = VppIpRoute(self, "3000::2", 128,
1800                                   [VppRoutePath(self.pg3.remote_ip6,
1801                                                 self.pg3.sw_if_index,
1802                                                 proto=DpoProto.DPO_PROTO_IP6),
1803                                    VppRoutePath(self.pg4.remote_ip6,
1804                                                 self.pg4.sw_if_index,
1805                                                 proto=DpoProto.DPO_PROTO_IP6)],
1806                                   is_ip6=1)
1807         route_3000_2.add_vpp_config()
1808
1809         route_4000_1 = VppIpRoute(self, "4000::1", 128,
1810                                   [VppRoutePath("3000::1",
1811                                                 0xffffffff,
1812                                                 proto=DpoProto.DPO_PROTO_IP6),
1813                                    VppRoutePath("3000::2",
1814                                                 0xffffffff,
1815                                                 proto=DpoProto.DPO_PROTO_IP6)],
1816                                   is_ip6=1)
1817         route_4000_1.add_vpp_config()
1818
1819         #
1820         # inject the packet on pg0 - expect load-balancing across all 4 paths
1821         #
1822         self.vapi.cli("clear trace")
1823         self.send_and_expect_load_balancing(self.pg0, port_pkts,
1824                                             [self.pg1, self.pg2,
1825                                              self.pg3, self.pg4])
1826         self.send_and_expect_load_balancing(self.pg0, src_pkts,
1827                                             [self.pg1, self.pg2,
1828                                              self.pg3, self.pg4])
1829
1830         #
1831         # Recursive prefixes
1832         #  - testing that 2 stages of load-balancing no choices
1833         #
1834         port_pkts = []
1835
1836         for ii in range(257):
1837             port_pkts.append((Ether(src=self.pg0.remote_mac,
1838                                     dst=self.pg0.local_mac) /
1839                               IPv6(dst="6000::1", src="6000:1::1") /
1840                               UDP(sport=1234, dport=1234 + ii) /
1841                               Raw('\xa5' * 100)))
1842
1843         route_5000_2 = VppIpRoute(self, "5000::2", 128,
1844                                   [VppRoutePath(self.pg3.remote_ip6,
1845                                                 self.pg3.sw_if_index,
1846                                                 proto=DpoProto.DPO_PROTO_IP6)],
1847                                   is_ip6=1)
1848         route_5000_2.add_vpp_config()
1849
1850         route_6000_1 = VppIpRoute(self, "6000::1", 128,
1851                                   [VppRoutePath("5000::2",
1852                                                 0xffffffff,
1853                                                 proto=DpoProto.DPO_PROTO_IP6)],
1854                                   is_ip6=1)
1855         route_6000_1.add_vpp_config()
1856
1857         #
1858         # inject the packet on pg0 - expect load-balancing across all 4 paths
1859         #
1860         self.vapi.cli("clear trace")
1861         self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg3)
1862
1863
1864 class TestIP6Punt(VppTestCase):
1865     """ IPv6 Punt Police/Redirect """
1866
1867     def setUp(self):
1868         super(TestIP6Punt, self).setUp()
1869
1870         self.create_pg_interfaces(range(4))
1871
1872         for i in self.pg_interfaces:
1873             i.admin_up()
1874             i.config_ip6()
1875             i.resolve_ndp()
1876
1877     def tearDown(self):
1878         super(TestIP6Punt, self).tearDown()
1879         for i in self.pg_interfaces:
1880             i.unconfig_ip6()
1881             i.admin_down()
1882
1883     def test_ip_punt(self):
1884         """ IP6 punt police and redirect """
1885
1886         p = (Ether(src=self.pg0.remote_mac,
1887                    dst=self.pg0.local_mac) /
1888              IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
1889              TCP(sport=1234, dport=1234) /
1890              Raw('\xa5' * 100))
1891
1892         pkts = p * 1025
1893
1894         #
1895         # Configure a punt redirect via pg1.
1896         #
1897         nh_addr = VppIpAddress(self.pg1.remote_ip6).encode()
1898         self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
1899                                    self.pg1.sw_if_index,
1900                                    nh_addr)
1901
1902         self.send_and_expect(self.pg0, pkts, self.pg1)
1903
1904         #
1905         # add a policer
1906         #
1907         policer = self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0,
1908                                             rate_type=1)
1909         self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)
1910
1911         self.vapi.cli("clear trace")
1912         self.pg0.add_stream(pkts)
1913         self.pg_enable_capture(self.pg_interfaces)
1914         self.pg_start()
1915
1916         #
1917         # the number of packet recieved should be greater than 0,
1918         # but not equal to the number sent, since some were policed
1919         #
1920         rx = self.pg1._get_capture(1)
1921         self.assertGreater(len(rx), 0)
1922         self.assertLess(len(rx), len(pkts))
1923
1924         #
1925         # remove the poilcer. back to full rx
1926         #
1927         self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
1928         self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0,
1929                                   rate_type=1, is_add=0)
1930         self.send_and_expect(self.pg0, pkts, self.pg1)
1931
1932         #
1933         # remove the redirect. expect full drop.
1934         #
1935         self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
1936                                    self.pg1.sw_if_index,
1937                                    nh_addr,
1938                                    is_add=0)
1939         self.send_and_assert_no_replies(self.pg0, pkts,
1940                                         "IP no punt config")
1941
1942         #
1943         # Add a redirect that is not input port selective
1944         #
1945         self.vapi.ip_punt_redirect(0xffffffff,
1946                                    self.pg1.sw_if_index,
1947                                    nh_addr)
1948         self.send_and_expect(self.pg0, pkts, self.pg1)
1949
1950         self.vapi.ip_punt_redirect(0xffffffff,
1951                                    self.pg1.sw_if_index,
1952                                    nh_addr,
1953                                    is_add=0)
1954
1955     def test_ip_punt_dump(self):
1956         """ IP6 punt redirect dump"""
1957
1958         #
1959         # Configure a punt redirects
1960         #
1961         nh_addr = VppIpAddress(self.pg3.remote_ip6).encode()
1962         self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
1963                                    self.pg3.sw_if_index,
1964                                    nh_addr)
1965         self.vapi.ip_punt_redirect(self.pg1.sw_if_index,
1966                                    self.pg3.sw_if_index,
1967                                    nh_addr)
1968         self.vapi.ip_punt_redirect(self.pg2.sw_if_index,
1969                                    self.pg3.sw_if_index,
1970                                    VppIpAddress('0::0').encode())
1971
1972         #
1973         # Dump pg0 punt redirects
1974         #
1975         punts = self.vapi.ip_punt_redirect_dump(self.pg0.sw_if_index,
1976                                                 is_ipv6=1)
1977         for p in punts:
1978             self.assertEqual(p.punt.rx_sw_if_index, self.pg0.sw_if_index)
1979
1980         #
1981         # Dump punt redirects for all interfaces
1982         #
1983         punts = self.vapi.ip_punt_redirect_dump(0xffffffff, is_ipv6=1)
1984         self.assertEqual(len(punts), 3)
1985         for p in punts:
1986             self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
1987         self.assertNotEqual(punts[1].punt.nh.un.ip6, self.pg3.remote_ip6)
1988         self.assertEqual(punts[2].punt.nh.un.ip6.address, '\x00'*16)
1989
1990
1991 class TestIPDeag(VppTestCase):
1992     """ IPv6 Deaggregate Routes """
1993
1994     def setUp(self):
1995         super(TestIPDeag, self).setUp()
1996
1997         self.create_pg_interfaces(range(3))
1998
1999         for i in self.pg_interfaces:
2000             i.admin_up()
2001             i.config_ip6()
2002             i.resolve_ndp()
2003
2004     def tearDown(self):
2005         super(TestIPDeag, self).tearDown()
2006         for i in self.pg_interfaces:
2007             i.unconfig_ip6()
2008             i.admin_down()
2009
2010     def test_ip_deag(self):
2011         """ IP Deag Routes """
2012
2013         #
2014         # Create a table to be used for:
2015         #  1 - another destination address lookup
2016         #  2 - a source address lookup
2017         #
2018         table_dst = VppIpTable(self, 1, is_ip6=1)
2019         table_src = VppIpTable(self, 2, is_ip6=1)
2020         table_dst.add_vpp_config()
2021         table_src.add_vpp_config()
2022
2023         #
2024         # Add a route in the default table to point to a deag/
2025         # second lookup in each of these tables
2026         #
2027         route_to_dst = VppIpRoute(self, "1::1", 128,
2028                                   [VppRoutePath("::",
2029                                                 0xffffffff,
2030                                                 nh_table_id=1,
2031                                                 proto=DpoProto.DPO_PROTO_IP6)],
2032                                   is_ip6=1)
2033         route_to_src = VppIpRoute(self, "1::2", 128,
2034                                   [VppRoutePath("::",
2035                                                 0xffffffff,
2036                                                 nh_table_id=2,
2037                                                 is_source_lookup=1,
2038                                                 proto=DpoProto.DPO_PROTO_IP6)],
2039                                   is_ip6=1)
2040         route_to_dst.add_vpp_config()
2041         route_to_src.add_vpp_config()
2042
2043         #
2044         # packets to these destination are dropped, since they'll
2045         # hit the respective default routes in the second table
2046         #
2047         p_dst = (Ether(src=self.pg0.remote_mac,
2048                        dst=self.pg0.local_mac) /
2049                  IPv6(src="5::5", dst="1::1") /
2050                  TCP(sport=1234, dport=1234) /
2051                  Raw('\xa5' * 100))
2052         p_src = (Ether(src=self.pg0.remote_mac,
2053                        dst=self.pg0.local_mac) /
2054                  IPv6(src="2::2", dst="1::2") /
2055                  TCP(sport=1234, dport=1234) /
2056                  Raw('\xa5' * 100))
2057         pkts_dst = p_dst * 257
2058         pkts_src = p_src * 257
2059
2060         self.send_and_assert_no_replies(self.pg0, pkts_dst,
2061                                         "IP in dst table")
2062         self.send_and_assert_no_replies(self.pg0, pkts_src,
2063                                         "IP in src table")
2064
2065         #
2066         # add a route in the dst table to forward via pg1
2067         #
2068         route_in_dst = VppIpRoute(self, "1::1", 128,
2069                                   [VppRoutePath(self.pg1.remote_ip6,
2070                                                 self.pg1.sw_if_index,
2071                                                 proto=DpoProto.DPO_PROTO_IP6)],
2072                                   is_ip6=1,
2073                                   table_id=1)
2074         route_in_dst.add_vpp_config()
2075
2076         self.send_and_expect(self.pg0, pkts_dst, self.pg1)
2077
2078         #
2079         # add a route in the src table to forward via pg2
2080         #
2081         route_in_src = VppIpRoute(self, "2::2", 128,
2082                                   [VppRoutePath(self.pg2.remote_ip6,
2083                                                 self.pg2.sw_if_index,
2084                                                 proto=DpoProto.DPO_PROTO_IP6)],
2085                                   is_ip6=1,
2086                                   table_id=2)
2087         route_in_src.add_vpp_config()
2088         self.send_and_expect(self.pg0, pkts_src, self.pg2)
2089
2090         #
2091         # loop in the lookup DP
2092         #
2093         route_loop = VppIpRoute(self, "3::3", 128,
2094                                 [VppRoutePath("::",
2095                                               0xffffffff,
2096                                               proto=DpoProto.DPO_PROTO_IP6)],
2097                                 is_ip6=1)
2098         route_loop.add_vpp_config()
2099
2100         p_l = (Ether(src=self.pg0.remote_mac,
2101                      dst=self.pg0.local_mac) /
2102                IPv6(src="3::4", dst="3::3") /
2103                TCP(sport=1234, dport=1234) /
2104                Raw('\xa5' * 100))
2105
2106         self.send_and_assert_no_replies(self.pg0, p_l * 257,
2107                                         "IP lookup loop")
2108
2109
2110 class TestIP6Input(VppTestCase):
2111     """ IPv6 Input Exceptions """
2112
2113     def setUp(self):
2114         super(TestIP6Input, self).setUp()
2115
2116         self.create_pg_interfaces(range(2))
2117
2118         for i in self.pg_interfaces:
2119             i.admin_up()
2120             i.config_ip6()
2121             i.resolve_ndp()
2122
2123     def tearDown(self):
2124         super(TestIP6Input, self).tearDown()
2125         for i in self.pg_interfaces:
2126             i.unconfig_ip6()
2127             i.admin_down()
2128
2129     def test_ip_input(self):
2130         """ IP6 Input Exceptions """
2131
2132         #
2133         # bad version - this is dropped
2134         #
2135         p_version = (Ether(src=self.pg0.remote_mac,
2136                            dst=self.pg0.local_mac) /
2137                      IPv6(src=self.pg0.remote_ip6,
2138                           dst=self.pg1.remote_ip6,
2139                           version=3) /
2140                      UDP(sport=1234, dport=1234) /
2141                      Raw('\xa5' * 100))
2142
2143         self.send_and_assert_no_replies(self.pg0, p_version * 65,
2144                                         "funky version")
2145
2146         #
2147         # hop limit - IMCP replies
2148         #
2149         p_version = (Ether(src=self.pg0.remote_mac,
2150                            dst=self.pg0.local_mac) /
2151                      IPv6(src=self.pg0.remote_ip6,
2152                           dst=self.pg1.remote_ip6,
2153                           hlim=1) /
2154                      UDP(sport=1234, dport=1234) /
2155                      Raw('\xa5' * 100))
2156
2157         rx = self.send_and_expect(self.pg0, p_version * 65, self.pg0)
2158         rx = rx[0]
2159         icmp = rx[ICMPv6TimeExceeded]
2160         self.assertEqual(icmp.type, 3)
2161         # 0: "hop limit exceeded in transit",
2162         self.assertEqual(icmp.code, 0)
2163
2164
2165 if __name__ == '__main__':
2166     unittest.main(testRunner=VppTestRunner)