API: Change ip4_address and ip6_address to use type alias.
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5
6 import scapy.layers.inet6 as inet6
7 from scapy.contrib.mpls import MPLS
8 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
9     ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
10     ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
11     ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply
12 from scapy.layers.l2 import Ether, Dot1Q
13 from scapy.packet import Raw
14 from scapy.utils import inet_pton, inet_ntop
15 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
16     in6_mactoifaceid
17 from six import moves
18
19 from framework import VppTestCase, VppTestRunner
20 from util import ppp, ip6_normalize
21 from vpp_ip import DpoProto
22 from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
23     VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
24     VppMplsRoute, VppMplsTable, VppIpTable, VppIpAddress
25 from vpp_neighbor import find_nbr, VppNeighbor
26 from vpp_pg_interface import is_ipv6_misc
27 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
28
29 AF_INET6 = socket.AF_INET6
30
31
32 def mk_ll_addr(mac):
33     euid = in6_mactoifaceid(mac)
34     addr = "fe80::" + euid
35     return addr
36
37
38 class TestIPv6ND(VppTestCase):
39     def validate_ra(self, intf, rx, dst_ip=None):
40         if not dst_ip:
41             dst_ip = intf.remote_ip6
42
43         # unicasted packets must come to the unicast mac
44         self.assertEqual(rx[Ether].dst, intf.remote_mac)
45
46         # and from the router's MAC
47         self.assertEqual(rx[Ether].src, intf.local_mac)
48
49         # the rx'd RA should be addressed to the sender's source
50         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
51         self.assertEqual(in6_ptop(rx[IPv6].dst),
52                          in6_ptop(dst_ip))
53
54         # and come from the router's link local
55         self.assertTrue(in6_islladdr(rx[IPv6].src))
56         self.assertEqual(in6_ptop(rx[IPv6].src),
57                          in6_ptop(mk_ll_addr(intf.local_mac)))
58
59     def validate_na(self, intf, rx, dst_ip=None, tgt_ip=None):
60         if not dst_ip:
61             dst_ip = intf.remote_ip6
62         if not tgt_ip:
63             dst_ip = intf.local_ip6
64
65         # unicasted packets must come to the unicast mac
66         self.assertEqual(rx[Ether].dst, intf.remote_mac)
67
68         # and from the router's MAC
69         self.assertEqual(rx[Ether].src, intf.local_mac)
70
71         # the rx'd NA should be addressed to the sender's source
72         self.assertTrue(rx.haslayer(ICMPv6ND_NA))
73         self.assertEqual(in6_ptop(rx[IPv6].dst),
74                          in6_ptop(dst_ip))
75
76         # and come from the target address
77         self.assertEqual(
78             in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))
79
80         # Dest link-layer options should have the router's MAC
81         dll = rx[ICMPv6NDOptDstLLAddr]
82         self.assertEqual(dll.lladdr, intf.local_mac)
83
84     def validate_ns(self, intf, rx, tgt_ip):
85         nsma = in6_getnsma(inet_pton(AF_INET6, tgt_ip))
86         dst_ip = inet_ntop(AF_INET6, nsma)
87
88         # NS is broadcast
89         self.assertEqual(rx[Ether].dst, in6_getnsmac(nsma))
90
91         # and from the router's MAC
92         self.assertEqual(rx[Ether].src, intf.local_mac)
93
94         # the rx'd NS should be addressed to an mcast address
95         # derived from the target address
96         self.assertEqual(
97             in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
98
99         # expect the tgt IP in the NS header
100         ns = rx[ICMPv6ND_NS]
101         self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))
102
103         # packet is from the router's local address
104         self.assertEqual(
105             in6_ptop(rx[IPv6].src), intf.local_ip6)
106
107         # Src link-layer options should have the router's MAC
108         sll = rx[ICMPv6NDOptSrcLLAddr]
109         self.assertEqual(sll.lladdr, intf.local_mac)
110
111     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
112                            filter_out_fn=is_ipv6_misc):
113         intf.add_stream(pkts)
114         self.pg_enable_capture(self.pg_interfaces)
115         self.pg_start()
116         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
117
118         self.assertEqual(len(rx), 1)
119         rx = rx[0]
120         self.validate_ra(intf, rx, dst_ip)
121
122     def send_and_expect_na(self, intf, pkts, remark, dst_ip=None,
123                            tgt_ip=None,
124                            filter_out_fn=is_ipv6_misc):
125         intf.add_stream(pkts)
126         self.pg_enable_capture(self.pg_interfaces)
127         self.pg_start()
128         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
129
130         self.assertEqual(len(rx), 1)
131         rx = rx[0]
132         self.validate_na(intf, rx, dst_ip, tgt_ip)
133
134     def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
135                            filter_out_fn=is_ipv6_misc):
136         tx_intf.add_stream(pkts)
137         self.pg_enable_capture(self.pg_interfaces)
138         self.pg_start()
139         rx = rx_intf.get_capture(1, filter_out_fn=filter_out_fn)
140
141         self.assertEqual(len(rx), 1)
142         rx = rx[0]
143         self.validate_ns(rx_intf, rx, tgt_ip)
144
145     def verify_ip(self, rx, smac, dmac, sip, dip):
146         ether = rx[Ether]
147         self.assertEqual(ether.dst, dmac)
148         self.assertEqual(ether.src, smac)
149
150         ip = rx[IPv6]
151         self.assertEqual(ip.src, sip)
152         self.assertEqual(ip.dst, dip)
153
154
155 class TestIPv6(TestIPv6ND):
156     """ IPv6 Test Case """
157
158     @classmethod
159     def setUpClass(cls):
160         super(TestIPv6, cls).setUpClass()
161
162     def setUp(self):
163         """
164         Perform test setup before test case.
165
166         **Config:**
167             - create 3 pg interfaces
168                 - untagged pg0 interface
169                 - Dot1Q subinterface on pg1
170                 - Dot1AD subinterface on pg2
171             - setup interfaces:
172                 - put it into UP state
173                 - set IPv6 addresses
174                 - resolve neighbor address using NDP
175             - configure 200 fib entries
176
177         :ivar list interfaces: pg interfaces and subinterfaces.
178         :ivar dict flows: IPv4 packet flows in test.
179
180         *TODO:* Create AD sub interface
181         """
182         super(TestIPv6, self).setUp()
183
184         # create 3 pg interfaces
185         self.create_pg_interfaces(range(3))
186
187         # create 2 subinterfaces for p1 and pg2
188         self.sub_interfaces = [
189             VppDot1QSubint(self, self.pg1, 100),
190             VppDot1QSubint(self, self.pg2, 200)
191             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
192         ]
193
194         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
195         self.flows = dict()
196         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
197         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
198         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
199
200         # packet sizes
201         self.pg_if_packet_sizes = [64, 1500, 9020]
202
203         self.interfaces = list(self.pg_interfaces)
204         self.interfaces.extend(self.sub_interfaces)
205
206         # setup all interfaces
207         for i in self.interfaces:
208             i.admin_up()
209             i.config_ip6()
210             i.resolve_ndp()
211
212         # config 2M FIB entries
213         self.config_fib_entries(200)
214
215     def tearDown(self):
216         """Run standard test teardown and log ``show ip6 neighbors``."""
217         for i in self.interfaces:
218             i.unconfig_ip6()
219             i.ip6_disable()
220             i.admin_down()
221         for i in self.sub_interfaces:
222             i.remove_vpp_config()
223
224         super(TestIPv6, self).tearDown()
225         if not self.vpp_dead:
226             self.logger.info(self.vapi.cli("show ip6 neighbors"))
227             # info(self.vapi.cli("show ip6 fib"))  # many entries
228
229     def config_fib_entries(self, count):
230         """For each interface add to the FIB table *count* routes to
231         "fd02::1/128" destination with interface's local address as next-hop
232         address.
233
234         :param int count: Number of FIB entries.
235
236         - *TODO:* check if the next-hop address shouldn't be remote address
237           instead of local address.
238         """
239         n_int = len(self.interfaces)
240         percent = 0
241         counter = 0.0
242         dest_addr = inet_pton(AF_INET6, "fd02::1")
243         dest_addr_len = 128
244         for i in self.interfaces:
245             next_hop_address = i.local_ip6n
246             for j in range(count / n_int):
247                 self.vapi.ip_add_del_route(
248                     dest_addr, dest_addr_len, next_hop_address, is_ipv6=1)
249                 counter += 1
250                 if counter / count * 100 > percent:
251                     self.logger.info("Configure %d FIB entries .. %d%% done" %
252                                      (count, percent))
253                     percent += 1
254
255     def modify_packet(self, src_if, packet_size, pkt):
256         """Add load, set destination IP and extend packet to required packet
257         size for defined interface.
258
259         :param VppInterface src_if: Interface to create packet for.
260         :param int packet_size: Required packet size.
261         :param Scapy pkt: Packet to be modified.
262         """
263         dst_if_idx = packet_size / 10 % 2
264         dst_if = self.flows[src_if][dst_if_idx]
265         info = self.create_packet_info(src_if, dst_if)
266         payload = self.info_to_payload(info)
267         p = pkt / Raw(payload)
268         p[IPv6].dst = dst_if.remote_ip6
269         info.data = p.copy()
270         if isinstance(src_if, VppSubInterface):
271             p = src_if.add_dot1_layer(p)
272         self.extend_packet(p, packet_size)
273
274         return p
275
276     def create_stream(self, src_if):
277         """Create input packet stream for defined interface.
278
279         :param VppInterface src_if: Interface to create packet stream for.
280         """
281         hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
282         pkt_tmpl = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
283                     IPv6(src=src_if.remote_ip6) /
284                     inet6.UDP(sport=1234, dport=1234))
285
286         pkts = [self.modify_packet(src_if, i, pkt_tmpl)
287                 for i in moves.range(self.pg_if_packet_sizes[0],
288                                      self.pg_if_packet_sizes[1], 10)]
289         pkts_b = [self.modify_packet(src_if, i, pkt_tmpl)
290                   for i in moves.range(self.pg_if_packet_sizes[1] + hdr_ext,
291                                        self.pg_if_packet_sizes[2] + hdr_ext,
292                                        50)]
293         pkts.extend(pkts_b)
294
295         return pkts
296
297     def verify_capture(self, dst_if, capture):
298         """Verify captured input packet stream for defined interface.
299
300         :param VppInterface dst_if: Interface to verify captured packet stream
301                                     for.
302         :param list capture: Captured packet stream.
303         """
304         self.logger.info("Verifying capture on interface %s" % dst_if.name)
305         last_info = dict()
306         for i in self.interfaces:
307             last_info[i.sw_if_index] = None
308         is_sub_if = False
309         dst_sw_if_index = dst_if.sw_if_index
310         if hasattr(dst_if, 'parent'):
311             is_sub_if = True
312         for packet in capture:
313             if is_sub_if:
314                 # Check VLAN tags and Ethernet header
315                 packet = dst_if.remove_dot1_layer(packet)
316             self.assertTrue(Dot1Q not in packet)
317             try:
318                 ip = packet[IPv6]
319                 udp = packet[inet6.UDP]
320                 payload_info = self.payload_to_info(str(packet[Raw]))
321                 packet_index = payload_info.index
322                 self.assertEqual(payload_info.dst, dst_sw_if_index)
323                 self.logger.debug(
324                     "Got packet on port %s: src=%u (id=%u)" %
325                     (dst_if.name, payload_info.src, packet_index))
326                 next_info = self.get_next_packet_info_for_interface2(
327                     payload_info.src, dst_sw_if_index,
328                     last_info[payload_info.src])
329                 last_info[payload_info.src] = next_info
330                 self.assertTrue(next_info is not None)
331                 self.assertEqual(packet_index, next_info.index)
332                 saved_packet = next_info.data
333                 # Check standard fields
334                 self.assertEqual(
335                     ip.src, saved_packet[IPv6].src)
336                 self.assertEqual(
337                     ip.dst, saved_packet[IPv6].dst)
338                 self.assertEqual(
339                     udp.sport, saved_packet[inet6.UDP].sport)
340                 self.assertEqual(
341                     udp.dport, saved_packet[inet6.UDP].dport)
342             except:
343                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
344                 raise
345         for i in self.interfaces:
346             remaining_packet = self.get_next_packet_info_for_interface2(
347                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
348             self.assertTrue(remaining_packet is None,
349                             "Interface %s: Packet expected from interface %s "
350                             "didn't arrive" % (dst_if.name, i.name))
351
352     def test_fib(self):
353         """ IPv6 FIB test
354
355         Test scenario:
356             - Create IPv6 stream for pg0 interface
357             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
358             - Send and verify received packets on each interface.
359         """
360
361         pkts = self.create_stream(self.pg0)
362         self.pg0.add_stream(pkts)
363
364         for i in self.sub_interfaces:
365             pkts = self.create_stream(i)
366             i.parent.add_stream(pkts)
367
368         self.pg_enable_capture(self.pg_interfaces)
369         self.pg_start()
370
371         pkts = self.pg0.get_capture()
372         self.verify_capture(self.pg0, pkts)
373
374         for i in self.sub_interfaces:
375             pkts = i.parent.get_capture()
376             self.verify_capture(i, pkts)
377
378     def test_ns(self):
379         """ IPv6 Neighbour Solicitation Exceptions
380
381         Test scenario:
382            - Send an NS Sourced from an address not covered by the link sub-net
383            - Send an NS to an mcast address the router has not joined
384            - Send NS for a target address the router does not onn.
385         """
386
387         #
388         # An NS from a non link source address
389         #
390         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
391         d = inet_ntop(AF_INET6, nsma)
392
393         p = (Ether(dst=in6_getnsmac(nsma)) /
394              IPv6(dst=d, src="2002::2") /
395              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
396              ICMPv6NDOptSrcLLAddr(
397                  lladdr=self.pg0.remote_mac))
398         pkts = [p]
399
400         self.send_and_assert_no_replies(
401             self.pg0, pkts,
402             "No response to NS source by address not on sub-net")
403
404         #
405         # An NS for sent to a solicited mcast group the router is
406         # not a member of FAILS
407         #
408         if 0:
409             nsma = in6_getnsma(inet_pton(AF_INET6, "fd::ffff"))
410             d = inet_ntop(AF_INET6, nsma)
411
412             p = (Ether(dst=in6_getnsmac(nsma)) /
413                  IPv6(dst=d, src=self.pg0.remote_ip6) /
414                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
415                  ICMPv6NDOptSrcLLAddr(
416                      lladdr=self.pg0.remote_mac))
417             pkts = [p]
418
419             self.send_and_assert_no_replies(
420                 self.pg0, pkts,
421                 "No response to NS sent to unjoined mcast address")
422
423         #
424         # An NS whose target address is one the router does not own
425         #
426         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
427         d = inet_ntop(AF_INET6, nsma)
428
429         p = (Ether(dst=in6_getnsmac(nsma)) /
430              IPv6(dst=d, src=self.pg0.remote_ip6) /
431              ICMPv6ND_NS(tgt="fd::ffff") /
432              ICMPv6NDOptSrcLLAddr(
433                  lladdr=self.pg0.remote_mac))
434         pkts = [p]
435
436         self.send_and_assert_no_replies(self.pg0, pkts,
437                                         "No response to NS for unknown target")
438
439         #
440         # A neighbor entry that has no associated FIB-entry
441         #
442         self.pg0.generate_remote_hosts(4)
443         nd_entry = VppNeighbor(self,
444                                self.pg0.sw_if_index,
445                                self.pg0.remote_hosts[2].mac,
446                                self.pg0.remote_hosts[2].ip6,
447                                af=AF_INET6,
448                                is_no_fib_entry=1)
449         nd_entry.add_vpp_config()
450
451         #
452         # check we have the neighbor, but no route
453         #
454         self.assertTrue(find_nbr(self,
455                                  self.pg0.sw_if_index,
456                                  self.pg0._remote_hosts[2].ip6,
457                                  inet=AF_INET6))
458         self.assertFalse(find_route(self,
459                                     self.pg0._remote_hosts[2].ip6,
460                                     128,
461                                     inet=AF_INET6))
462
463         #
464         # send an NS from a link local address to the interface's global
465         # address
466         #
467         p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
468              IPv6(
469                  dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
470              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
471              ICMPv6NDOptSrcLLAddr(
472                  lladdr=self.pg0.remote_mac))
473
474         self.send_and_expect_na(self.pg0, p,
475                                 "NS from link-local",
476                                 dst_ip=self.pg0._remote_hosts[2].ip6_ll,
477                                 tgt_ip=self.pg0.local_ip6)
478
479         #
480         # we should have learned an ND entry for the peer's link-local
481         # but not inserted a route to it in the FIB
482         #
483         self.assertTrue(find_nbr(self,
484                                  self.pg0.sw_if_index,
485                                  self.pg0._remote_hosts[2].ip6_ll,
486                                  inet=AF_INET6))
487         self.assertFalse(find_route(self,
488                                     self.pg0._remote_hosts[2].ip6_ll,
489                                     128,
490                                     inet=AF_INET6))
491
492         #
493         # An NS to the router's own Link-local
494         #
495         p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
496              IPv6(
497                  dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
498              ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll) /
499              ICMPv6NDOptSrcLLAddr(
500                  lladdr=self.pg0.remote_mac))
501
502         self.send_and_expect_na(self.pg0, p,
503                                 "NS to/from link-local",
504                                 dst_ip=self.pg0._remote_hosts[3].ip6_ll,
505                                 tgt_ip=self.pg0.local_ip6_ll)
506
507         #
508         # we should have learned an ND entry for the peer's link-local
509         # but not inserted a route to it in the FIB
510         #
511         self.assertTrue(find_nbr(self,
512                                  self.pg0.sw_if_index,
513                                  self.pg0._remote_hosts[3].ip6_ll,
514                                  inet=AF_INET6))
515         self.assertFalse(find_route(self,
516                                     self.pg0._remote_hosts[3].ip6_ll,
517                                     128,
518                                     inet=AF_INET6))
519
520     def test_ns_duplicates(self):
521         """ ND Duplicates"""
522
523         #
524         # Generate some hosts on the LAN
525         #
526         self.pg1.generate_remote_hosts(3)
527
528         #
529         # Add host 1 on pg1 and pg2
530         #
531         ns_pg1 = VppNeighbor(self,
532                              self.pg1.sw_if_index,
533                              self.pg1.remote_hosts[1].mac,
534                              self.pg1.remote_hosts[1].ip6,
535                              af=AF_INET6)
536         ns_pg1.add_vpp_config()
537         ns_pg2 = VppNeighbor(self,
538                              self.pg2.sw_if_index,
539                              self.pg2.remote_mac,
540                              self.pg1.remote_hosts[1].ip6,
541                              af=AF_INET6)
542         ns_pg2.add_vpp_config()
543
544         #
545         # IP packet destined for pg1 remote host arrives on pg1 again.
546         #
547         p = (Ether(dst=self.pg0.local_mac,
548                    src=self.pg0.remote_mac) /
549              IPv6(src=self.pg0.remote_ip6,
550                   dst=self.pg1.remote_hosts[1].ip6) /
551              inet6.UDP(sport=1234, dport=1234) /
552              Raw())
553
554         self.pg0.add_stream(p)
555         self.pg_enable_capture(self.pg_interfaces)
556         self.pg_start()
557
558         rx1 = self.pg1.get_capture(1)
559
560         self.verify_ip(rx1[0],
561                        self.pg1.local_mac,
562                        self.pg1.remote_hosts[1].mac,
563                        self.pg0.remote_ip6,
564                        self.pg1.remote_hosts[1].ip6)
565
566         #
567         # remove the duplicate on pg1
568         # packet stream shoud generate NSs out of pg1
569         #
570         ns_pg1.remove_vpp_config()
571
572         self.send_and_expect_ns(self.pg0, self.pg1,
573                                 p, self.pg1.remote_hosts[1].ip6)
574
575         #
576         # Add it back
577         #
578         ns_pg1.add_vpp_config()
579
580         self.pg0.add_stream(p)
581         self.pg_enable_capture(self.pg_interfaces)
582         self.pg_start()
583
584         rx1 = self.pg1.get_capture(1)
585
586         self.verify_ip(rx1[0],
587                        self.pg1.local_mac,
588                        self.pg1.remote_hosts[1].mac,
589                        self.pg0.remote_ip6,
590                        self.pg1.remote_hosts[1].ip6)
591
592     def validate_ra(self, intf, rx, dst_ip=None, mtu=9000, pi_opt=None):
593         if not dst_ip:
594             dst_ip = intf.remote_ip6
595
596         # unicasted packets must come to the unicast mac
597         self.assertEqual(rx[Ether].dst, intf.remote_mac)
598
599         # and from the router's MAC
600         self.assertEqual(rx[Ether].src, intf.local_mac)
601
602         # the rx'd RA should be addressed to the sender's source
603         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
604         self.assertEqual(in6_ptop(rx[IPv6].dst),
605                          in6_ptop(dst_ip))
606
607         # and come from the router's link local
608         self.assertTrue(in6_islladdr(rx[IPv6].src))
609         self.assertEqual(in6_ptop(rx[IPv6].src),
610                          in6_ptop(mk_ll_addr(intf.local_mac)))
611
612         # it should contain the links MTU
613         ra = rx[ICMPv6ND_RA]
614         self.assertEqual(ra[ICMPv6NDOptMTU].mtu, mtu)
615
616         # it should contain the source's link layer address option
617         sll = ra[ICMPv6NDOptSrcLLAddr]
618         self.assertEqual(sll.lladdr, intf.local_mac)
619
620         if not pi_opt:
621             # the RA should not contain prefix information
622             self.assertFalse(ra.haslayer(
623                 ICMPv6NDOptPrefixInfo))
624         else:
625             raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
626
627             # the options are nested in the scapy packet in way that i cannot
628             # decipher how to decode. this 1st layer of option always returns
629             # nested classes, so a direct obj1=obj2 comparison always fails.
630             # however, the getlayer(.., 2) does give one instnace.
631             # so we cheat here and construct a new opt instnace for comparison
632             rd = ICMPv6NDOptPrefixInfo(
633                 prefixlen=raos.prefixlen,
634                 prefix=raos.prefix,
635                 L=raos.L,
636                 A=raos.A)
637             if type(pi_opt) is list:
638                 for ii in range(len(pi_opt)):
639                     self.assertEqual(pi_opt[ii], rd)
640                     rd = rx.getlayer(
641                         ICMPv6NDOptPrefixInfo, ii + 2)
642             else:
643                 self.assertEqual(pi_opt, raos)
644
645     def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
646                            filter_out_fn=is_ipv6_misc,
647                            opt=None):
648         intf.add_stream(pkts)
649         self.pg_enable_capture(self.pg_interfaces)
650         self.pg_start()
651         rx = intf.get_capture(1, filter_out_fn=filter_out_fn)
652
653         self.assertEqual(len(rx), 1)
654         rx = rx[0]
655         self.validate_ra(intf, rx, dst_ip, pi_opt=opt)
656
657     def test_rs(self):
658         """ IPv6 Router Solicitation Exceptions
659
660         Test scenario:
661         """
662
663         #
664         # Before we begin change the IPv6 RA responses to use the unicast
665         # address - that way we will not confuse them with the periodic
666         # RAs which go to the mcast address
667         # Sit and wait for the first periodic RA.
668         #
669         # TODO
670         #
671         self.pg0.ip6_ra_config(send_unicast=1)
672
673         #
674         # An RS from a link source address
675         #  - expect an RA in return
676         #
677         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
678              IPv6(
679                  dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
680              ICMPv6ND_RS())
681         pkts = [p]
682         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
683
684         #
685         # For the next RS sent the RA should be rate limited
686         #
687         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
688
689         #
690         # When we reconfiure the IPv6 RA config, we reset the RA rate limiting,
691         # so we need to do this before each test below so as not to drop
692         # packets for rate limiting reasons. Test this works here.
693         #
694         self.pg0.ip6_ra_config(send_unicast=1)
695         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
696
697         #
698         # An RS sent from a non-link local source
699         #
700         self.pg0.ip6_ra_config(send_unicast=1)
701         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
702              IPv6(dst=self.pg0.local_ip6,
703                   src="2002::ffff") /
704              ICMPv6ND_RS())
705         pkts = [p]
706         self.send_and_assert_no_replies(self.pg0, pkts,
707                                         "RS from non-link source")
708
709         #
710         # Source an RS from a link local address
711         #
712         self.pg0.ip6_ra_config(send_unicast=1)
713         ll = mk_ll_addr(self.pg0.remote_mac)
714         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
715              IPv6(dst=self.pg0.local_ip6, src=ll) /
716              ICMPv6ND_RS())
717         pkts = [p]
718         self.send_and_expect_ra(self.pg0, pkts,
719                                 "RS sourced from link-local",
720                                 dst_ip=ll)
721
722         #
723         # Send the RS multicast
724         #
725         self.pg0.ip6_ra_config(send_unicast=1)
726         dmac = in6_getnsmac(inet_pton(AF_INET6, "ff02::2"))
727         ll = mk_ll_addr(self.pg0.remote_mac)
728         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
729              IPv6(dst="ff02::2", src=ll) /
730              ICMPv6ND_RS())
731         pkts = [p]
732         self.send_and_expect_ra(self.pg0, pkts,
733                                 "RS sourced from link-local",
734                                 dst_ip=ll)
735
736         #
737         # Source from the unspecified address ::. This happens when the RS
738         # is sent before the host has a configured address/sub-net,
739         # i.e. auto-config. Since the sender has no IP address, the reply
740         # comes back mcast - so the capture needs to not filter this.
741         # If we happen to pick up the periodic RA at this point then so be it,
742         # it's not an error.
743         #
744         self.pg0.ip6_ra_config(send_unicast=1, suppress=1)
745         p = (Ether(dst=dmac, src=self.pg0.remote_mac) /
746              IPv6(dst="ff02::2", src="::") /
747              ICMPv6ND_RS())
748         pkts = [p]
749         self.send_and_expect_ra(self.pg0, pkts,
750                                 "RS sourced from unspecified",
751                                 dst_ip="ff02::1",
752                                 filter_out_fn=None)
753
754         #
755         # Configure The RA to announce the links prefix
756         #
757         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
758                                self.pg0.local_ip6_prefix_len)
759
760         #
761         # RAs should now contain the prefix information option
762         #
763         opt = ICMPv6NDOptPrefixInfo(
764             prefixlen=self.pg0.local_ip6_prefix_len,
765             prefix=self.pg0.local_ip6,
766             L=1,
767             A=1)
768
769         self.pg0.ip6_ra_config(send_unicast=1)
770         ll = mk_ll_addr(self.pg0.remote_mac)
771         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
772              IPv6(dst=self.pg0.local_ip6, src=ll) /
773              ICMPv6ND_RS())
774         self.send_and_expect_ra(self.pg0, p,
775                                 "RA with prefix-info",
776                                 dst_ip=ll,
777                                 opt=opt)
778
779         #
780         # Change the prefix info to not off-link
781         #  L-flag is clear
782         #
783         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
784                                self.pg0.local_ip6_prefix_len,
785                                off_link=1)
786
787         opt = ICMPv6NDOptPrefixInfo(
788             prefixlen=self.pg0.local_ip6_prefix_len,
789             prefix=self.pg0.local_ip6,
790             L=0,
791             A=1)
792
793         self.pg0.ip6_ra_config(send_unicast=1)
794         self.send_and_expect_ra(self.pg0, p,
795                                 "RA with Prefix info with L-flag=0",
796                                 dst_ip=ll,
797                                 opt=opt)
798
799         #
800         # Change the prefix info to not off-link, no-autoconfig
801         #  L and A flag are clear in the advert
802         #
803         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
804                                self.pg0.local_ip6_prefix_len,
805                                off_link=1,
806                                no_autoconfig=1)
807
808         opt = ICMPv6NDOptPrefixInfo(
809             prefixlen=self.pg0.local_ip6_prefix_len,
810             prefix=self.pg0.local_ip6,
811             L=0,
812             A=0)
813
814         self.pg0.ip6_ra_config(send_unicast=1)
815         self.send_and_expect_ra(self.pg0, p,
816                                 "RA with Prefix info with A & L-flag=0",
817                                 dst_ip=ll,
818                                 opt=opt)
819
820         #
821         # Change the flag settings back to the defaults
822         #  L and A flag are set in the advert
823         #
824         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
825                                self.pg0.local_ip6_prefix_len)
826
827         opt = ICMPv6NDOptPrefixInfo(
828             prefixlen=self.pg0.local_ip6_prefix_len,
829             prefix=self.pg0.local_ip6,
830             L=1,
831             A=1)
832
833         self.pg0.ip6_ra_config(send_unicast=1)
834         self.send_and_expect_ra(self.pg0, p,
835                                 "RA with Prefix info",
836                                 dst_ip=ll,
837                                 opt=opt)
838
839         #
840         # Change the prefix info to not off-link, no-autoconfig
841         #  L and A flag are clear in the advert
842         #
843         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
844                                self.pg0.local_ip6_prefix_len,
845                                off_link=1,
846                                no_autoconfig=1)
847
848         opt = ICMPv6NDOptPrefixInfo(
849             prefixlen=self.pg0.local_ip6_prefix_len,
850             prefix=self.pg0.local_ip6,
851             L=0,
852             A=0)
853
854         self.pg0.ip6_ra_config(send_unicast=1)
855         self.send_and_expect_ra(self.pg0, p,
856                                 "RA with Prefix info with A & L-flag=0",
857                                 dst_ip=ll,
858                                 opt=opt)
859
860         #
861         # Use the reset to defults option to revert to defaults
862         #  L and A flag are clear in the advert
863         #
864         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
865                                self.pg0.local_ip6_prefix_len,
866                                use_default=1)
867
868         opt = ICMPv6NDOptPrefixInfo(
869             prefixlen=self.pg0.local_ip6_prefix_len,
870             prefix=self.pg0.local_ip6,
871             L=1,
872             A=1)
873
874         self.pg0.ip6_ra_config(send_unicast=1)
875         self.send_and_expect_ra(self.pg0, p,
876                                 "RA with Prefix reverted to defaults",
877                                 dst_ip=ll,
878                                 opt=opt)
879
880         #
881         # Advertise Another prefix. With no L-flag/A-flag
882         #
883         self.pg0.ip6_ra_prefix(self.pg1.local_ip6n,
884                                self.pg1.local_ip6_prefix_len,
885                                off_link=1,
886                                no_autoconfig=1)
887
888         opt = [ICMPv6NDOptPrefixInfo(
889             prefixlen=self.pg0.local_ip6_prefix_len,
890             prefix=self.pg0.local_ip6,
891             L=1,
892             A=1),
893             ICMPv6NDOptPrefixInfo(
894                 prefixlen=self.pg1.local_ip6_prefix_len,
895                 prefix=self.pg1.local_ip6,
896                 L=0,
897                 A=0)]
898
899         self.pg0.ip6_ra_config(send_unicast=1)
900         ll = mk_ll_addr(self.pg0.remote_mac)
901         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
902              IPv6(dst=self.pg0.local_ip6, src=ll) /
903              ICMPv6ND_RS())
904         self.send_and_expect_ra(self.pg0, p,
905                                 "RA with multiple Prefix infos",
906                                 dst_ip=ll,
907                                 opt=opt)
908
909         #
910         # Remove the first refix-info - expect the second is still in the
911         # advert
912         #
913         self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
914                                self.pg0.local_ip6_prefix_len,
915                                is_no=1)
916
917         opt = ICMPv6NDOptPrefixInfo(
918             prefixlen=self.pg1.local_ip6_prefix_len,
919             prefix=self.pg1.local_ip6,
920             L=0,
921             A=0)
922
923         self.pg0.ip6_ra_config(send_unicast=1)
924         self.send_and_expect_ra(self.pg0, p,
925                                 "RA with Prefix reverted to defaults",
926                                 dst_ip=ll,
927                                 opt=opt)
928
929         #
930         # Remove the second prefix-info - expect no prefix-info i nthe adverts
931         #
932         self.pg0.ip6_ra_prefix(self.pg1.local_ip6n,
933                                self.pg1.local_ip6_prefix_len,
934                                is_no=1)
935
936         self.pg0.ip6_ra_config(send_unicast=1)
937         self.send_and_expect_ra(self.pg0, p,
938                                 "RA with Prefix reverted to defaults",
939                                 dst_ip=ll)
940
941         #
942         # Reset the periodic advertisements back to default values
943         #
944         self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
945
946
947 class TestICMPv6Echo(VppTestCase):
948     """ ICMPv6 Echo Test Case """
949
950     def setUp(self):
951         super(TestICMPv6Echo, self).setUp()
952
953         # create 1 pg interface
954         self.create_pg_interfaces(range(1))
955
956         for i in self.pg_interfaces:
957             i.admin_up()
958             i.config_ip6()
959             i.resolve_ndp()
960
961     def tearDown(self):
962         super(TestICMPv6Echo, self).tearDown()
963         for i in self.pg_interfaces:
964             i.unconfig_ip6()
965             i.ip6_disable()
966             i.admin_down()
967
968     def test_icmpv6_echo(self):
969         """ VPP replies to ICMPv6 Echo Request
970
971         Test scenario:
972
973             - Receive ICMPv6 Echo Request message on pg0 interface.
974             - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
975         """
976
977         icmpv6_id = 0xb
978         icmpv6_seq = 5
979         icmpv6_data = '\x0a' * 18
980         p_echo_request = (Ether(src=self.pg0.remote_mac,
981                                 dst=self.pg0.local_mac) /
982                           IPv6(src=self.pg0.remote_ip6,
983                                dst=self.pg0.local_ip6) /
984                           ICMPv6EchoRequest(
985                               id=icmpv6_id,
986                               seq=icmpv6_seq,
987                               data=icmpv6_data))
988
989         self.pg0.add_stream(p_echo_request)
990         self.pg_enable_capture(self.pg_interfaces)
991         self.pg_start()
992
993         rx = self.pg0.get_capture(1)
994         rx = rx[0]
995         ether = rx[Ether]
996         ipv6 = rx[IPv6]
997         icmpv6 = rx[ICMPv6EchoReply]
998
999         self.assertEqual(ether.src, self.pg0.local_mac)
1000         self.assertEqual(ether.dst, self.pg0.remote_mac)
1001
1002         self.assertEqual(ipv6.src, self.pg0.local_ip6)
1003         self.assertEqual(ipv6.dst, self.pg0.remote_ip6)
1004
1005         self.assertEqual(
1006             icmp6types[icmpv6.type], "Echo Reply")
1007         self.assertEqual(icmpv6.id, icmpv6_id)
1008         self.assertEqual(icmpv6.seq, icmpv6_seq)
1009         self.assertEqual(icmpv6.data, icmpv6_data)
1010
1011
1012 class TestIPv6RD(TestIPv6ND):
1013     """ IPv6 Router Discovery Test Case """
1014
1015     @classmethod
1016     def setUpClass(cls):
1017         super(TestIPv6RD, cls).setUpClass()
1018
1019     def setUp(self):
1020         super(TestIPv6RD, self).setUp()
1021
1022         # create 2 pg interfaces
1023         self.create_pg_interfaces(range(2))
1024
1025         self.interfaces = list(self.pg_interfaces)
1026
1027         # setup all interfaces
1028         for i in self.interfaces:
1029             i.admin_up()
1030             i.config_ip6()
1031
1032     def tearDown(self):
1033         for i in self.interfaces:
1034             i.unconfig_ip6()
1035             i.admin_down()
1036         super(TestIPv6RD, self).tearDown()
1037
1038     def test_rd_send_router_solicitation(self):
1039         """ Verify router solicitation packets """
1040
1041         count = 2
1042         self.pg_enable_capture(self.pg_interfaces)
1043         self.pg_start()
1044         self.vapi.ip6nd_send_router_solicitation(self.pg1.sw_if_index,
1045                                                  mrc=count)
1046         rx_list = self.pg1.get_capture(count, timeout=3)
1047         self.assertEqual(len(rx_list), count)
1048         for packet in rx_list:
1049             self.assertEqual(packet.haslayer(IPv6), 1)
1050             self.assertEqual(packet[IPv6].haslayer(
1051                 ICMPv6ND_RS), 1)
1052             dst = ip6_normalize(packet[IPv6].dst)
1053             dst2 = ip6_normalize("ff02::2")
1054             self.assert_equal(dst, dst2)
1055             src = ip6_normalize(packet[IPv6].src)
1056             src2 = ip6_normalize(self.pg1.local_ip6_ll)
1057             self.assert_equal(src, src2)
1058             self.assertTrue(
1059                 bool(packet[ICMPv6ND_RS].haslayer(
1060                     ICMPv6NDOptSrcLLAddr)))
1061             self.assert_equal(
1062                 packet[ICMPv6NDOptSrcLLAddr].lladdr,
1063                 self.pg1.local_mac)
1064
1065     def verify_prefix_info(self, reported_prefix, prefix_option):
1066         prefix = socket.inet_pton(socket.AF_INET6,
1067                                   prefix_option.getfieldval("prefix"))
1068         self.assert_equal(reported_prefix.dst_address, prefix)
1069         self.assert_equal(reported_prefix.dst_address_length,
1070                           prefix_option.getfieldval("prefixlen"))
1071         L = prefix_option.getfieldval("L")
1072         A = prefix_option.getfieldval("A")
1073         option_flags = (L << 7) | (A << 6)
1074         self.assert_equal(reported_prefix.flags, option_flags)
1075         self.assert_equal(reported_prefix.valid_time,
1076                           prefix_option.getfieldval("validlifetime"))
1077         self.assert_equal(reported_prefix.preferred_time,
1078                           prefix_option.getfieldval("preferredlifetime"))
1079
1080     def test_rd_receive_router_advertisement(self):
1081         """ Verify events triggered by received RA packets """
1082
1083         self.vapi.want_ip6_ra_events()
1084
1085         prefix_info_1 = ICMPv6NDOptPrefixInfo(
1086             prefix="1::2",
1087             prefixlen=50,
1088             validlifetime=200,
1089             preferredlifetime=500,
1090             L=1,
1091             A=1,
1092         )
1093
1094         prefix_info_2 = ICMPv6NDOptPrefixInfo(
1095             prefix="7::4",
1096             prefixlen=20,
1097             validlifetime=70,
1098             preferredlifetime=1000,
1099             L=1,
1100             A=0,
1101         )
1102
1103         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1104              IPv6(dst=self.pg1.local_ip6_ll,
1105                   src=mk_ll_addr(self.pg1.remote_mac)) /
1106              ICMPv6ND_RA() /
1107              prefix_info_1 /
1108              prefix_info_2)
1109         self.pg1.add_stream([p])
1110         self.pg_start()
1111
1112         ev = self.vapi.wait_for_event(10, "ip6_ra_event")
1113
1114         self.assert_equal(ev.current_hop_limit, 0)
1115         self.assert_equal(ev.flags, 8)
1116         self.assert_equal(ev.router_lifetime_in_sec, 1800)
1117         self.assert_equal(ev.neighbor_reachable_time_in_msec, 0)
1118         self.assert_equal(
1119             ev.time_in_msec_between_retransmitted_neighbor_solicitations, 0)
1120
1121         self.assert_equal(ev.n_prefixes, 2)
1122
1123         self.verify_prefix_info(ev.prefixes[0], prefix_info_1)
1124         self.verify_prefix_info(ev.prefixes[1], prefix_info_2)
1125
1126
1127 class TestIPv6RDControlPlane(TestIPv6ND):
1128     """ IPv6 Router Discovery Control Plane Test Case """
1129
1130     @classmethod
1131     def setUpClass(cls):
1132         super(TestIPv6RDControlPlane, cls).setUpClass()
1133
1134     def setUp(self):
1135         super(TestIPv6RDControlPlane, self).setUp()
1136
1137         # create 1 pg interface
1138         self.create_pg_interfaces(range(1))
1139
1140         self.interfaces = list(self.pg_interfaces)
1141
1142         # setup all interfaces
1143         for i in self.interfaces:
1144             i.admin_up()
1145             i.config_ip6()
1146
1147     def tearDown(self):
1148         super(TestIPv6RDControlPlane, self).tearDown()
1149
1150     @staticmethod
1151     def create_ra_packet(pg, routerlifetime=None):
1152         src_ip = pg.remote_ip6_ll
1153         dst_ip = pg.local_ip6
1154         if routerlifetime is not None:
1155             ra = ICMPv6ND_RA(routerlifetime=routerlifetime)
1156         else:
1157             ra = ICMPv6ND_RA()
1158         p = (Ether(dst=pg.local_mac, src=pg.remote_mac) /
1159              IPv6(dst=dst_ip, src=src_ip) / ra)
1160         return p
1161
1162     @staticmethod
1163     def get_default_routes(fib):
1164         list = []
1165         for entry in fib:
1166             if entry.address_length == 0:
1167                 for path in entry.path:
1168                     if path.sw_if_index != 0xFFFFFFFF:
1169                         defaut_route = {}
1170                         defaut_route['sw_if_index'] = path.sw_if_index
1171                         defaut_route['next_hop'] = path.next_hop
1172                         list.append(defaut_route)
1173         return list
1174
1175     @staticmethod
1176     def get_interface_addresses(fib, pg):
1177         list = []
1178         for entry in fib:
1179             if entry.address_length == 128:
1180                 path = entry.path[0]
1181                 if path.sw_if_index == pg.sw_if_index:
1182                     list.append(entry.address)
1183         return list
1184
1185     def test_all(self):
1186         """ Test handling of SLAAC addresses and default routes """
1187
1188         fib = self.vapi.ip6_fib_dump()
1189         default_routes = self.get_default_routes(fib)
1190         initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
1191         self.assertEqual(default_routes, [])
1192         router_address = self.pg0.remote_ip6n_ll
1193
1194         self.vapi.ip6_nd_address_autoconfig(self.pg0.sw_if_index, 1, 1)
1195
1196         self.sleep(0.1)
1197
1198         # send RA
1199         packet = (self.create_ra_packet(
1200             self.pg0) / ICMPv6NDOptPrefixInfo(
1201             prefix="1::",
1202             prefixlen=64,
1203             validlifetime=2,
1204             preferredlifetime=2,
1205             L=1,
1206             A=1,
1207         ) / ICMPv6NDOptPrefixInfo(
1208             prefix="7::",
1209             prefixlen=20,
1210             validlifetime=1500,
1211             preferredlifetime=1000,
1212             L=1,
1213             A=0,
1214         ))
1215         self.pg0.add_stream([packet])
1216         self.pg_start()
1217
1218         self.sleep(0.1)
1219
1220         fib = self.vapi.ip6_fib_dump()
1221
1222         # check FIB for new address
1223         addresses = set(self.get_interface_addresses(fib, self.pg0))
1224         new_addresses = addresses.difference(initial_addresses)
1225         self.assertEqual(len(new_addresses), 1)
1226         prefix = list(new_addresses)[0][:8] + '\0\0\0\0\0\0\0\0'
1227         self.assertEqual(inet_ntop(AF_INET6, prefix), '1::')
1228
1229         # check FIB for new default route
1230         default_routes = self.get_default_routes(fib)
1231         self.assertEqual(len(default_routes), 1)
1232         dr = default_routes[0]
1233         self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
1234         self.assertEqual(dr['next_hop'], router_address)
1235
1236         # send RA to delete default route
1237         packet = self.create_ra_packet(self.pg0, routerlifetime=0)
1238         self.pg0.add_stream([packet])
1239         self.pg_start()
1240
1241         self.sleep(0.1)
1242
1243         # check that default route is deleted
1244         fib = self.vapi.ip6_fib_dump()
1245         default_routes = self.get_default_routes(fib)
1246         self.assertEqual(len(default_routes), 0)
1247
1248         self.sleep(0.1)
1249
1250         # send RA
1251         packet = self.create_ra_packet(self.pg0)
1252         self.pg0.add_stream([packet])
1253         self.pg_start()
1254
1255         self.sleep(0.1)
1256
1257         # check FIB for new default route
1258         fib = self.vapi.ip6_fib_dump()
1259         default_routes = self.get_default_routes(fib)
1260         self.assertEqual(len(default_routes), 1)
1261         dr = default_routes[0]
1262         self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
1263         self.assertEqual(dr['next_hop'], router_address)
1264
1265         # send RA, updating router lifetime to 1s
1266         packet = self.create_ra_packet(self.pg0, 1)
1267         self.pg0.add_stream([packet])
1268         self.pg_start()
1269
1270         self.sleep(0.1)
1271
1272         # check that default route still exists
1273         fib = self.vapi.ip6_fib_dump()
1274         default_routes = self.get_default_routes(fib)
1275         self.assertEqual(len(default_routes), 1)
1276         dr = default_routes[0]
1277         self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
1278         self.assertEqual(dr['next_hop'], router_address)
1279
1280         self.sleep(1)
1281
1282         # check that default route is deleted
1283         fib = self.vapi.ip6_fib_dump()
1284         default_routes = self.get_default_routes(fib)
1285         self.assertEqual(len(default_routes), 0)
1286
1287         # check FIB still contains the SLAAC address
1288         addresses = set(self.get_interface_addresses(fib, self.pg0))
1289         new_addresses = addresses.difference(initial_addresses)
1290         self.assertEqual(len(new_addresses), 1)
1291         prefix = list(new_addresses)[0][:8] + '\0\0\0\0\0\0\0\0'
1292         self.assertEqual(inet_ntop(AF_INET6, prefix), '1::')
1293
1294         self.sleep(1)
1295
1296         # check that SLAAC address is deleted
1297         fib = self.vapi.ip6_fib_dump()
1298         addresses = set(self.get_interface_addresses(fib, self.pg0))
1299         new_addresses = addresses.difference(initial_addresses)
1300         self.assertEqual(len(new_addresses), 0)
1301
1302
1303 class IPv6NDProxyTest(TestIPv6ND):
1304     """ IPv6 ND ProxyTest Case """
1305
1306     def setUp(self):
1307         super(IPv6NDProxyTest, self).setUp()
1308
1309         # create 3 pg interfaces
1310         self.create_pg_interfaces(range(3))
1311
1312         # pg0 is the master interface, with the configured subnet
1313         self.pg0.admin_up()
1314         self.pg0.config_ip6()
1315         self.pg0.resolve_ndp()
1316
1317         self.pg1.ip6_enable()
1318         self.pg2.ip6_enable()
1319
1320     def tearDown(self):
1321         super(IPv6NDProxyTest, self).tearDown()
1322
1323     def test_nd_proxy(self):
1324         """ IPv6 Proxy ND """
1325
1326         #
1327         # Generate some hosts in the subnet that we are proxying
1328         #
1329         self.pg0.generate_remote_hosts(8)
1330
1331         nsma = in6_getnsma(inet_pton(AF_INET6, self.pg0.local_ip6))
1332         d = inet_ntop(AF_INET6, nsma)
1333
1334         #
1335         # Send an NS for one of those remote hosts on one of the proxy links
1336         # expect no response since it's from an address that is not
1337         # on the link that has the prefix configured
1338         #
1339         ns_pg1 = (Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac) /
1340                   IPv6(dst=d,
1341                        src=self.pg0._remote_hosts[2].ip6) /
1342                   ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
1343                   ICMPv6NDOptSrcLLAddr(
1344                       lladdr=self.pg0._remote_hosts[2].mac))
1345
1346         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")
1347
1348         #
1349         # Add proxy support for the host
1350         #
1351         self.vapi.ip6_nd_proxy(
1352             inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1353             self.pg1.sw_if_index)
1354
1355         #
1356         # try that NS again. this time we expect an NA back
1357         #
1358         self.send_and_expect_na(self.pg1, ns_pg1,
1359                                 "NS to proxy entry",
1360                                 dst_ip=self.pg0._remote_hosts[2].ip6,
1361                                 tgt_ip=self.pg0.local_ip6)
1362
1363         #
1364         # ... and that we have an entry in the ND cache
1365         #
1366         self.assertTrue(find_nbr(self,
1367                                  self.pg1.sw_if_index,
1368                                  self.pg0._remote_hosts[2].ip6,
1369                                  inet=AF_INET6))
1370
1371         #
1372         # ... and we can route traffic to it
1373         #
1374         t = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1375              IPv6(dst=self.pg0._remote_hosts[2].ip6,
1376                   src=self.pg0.remote_ip6) /
1377              inet6.UDP(sport=10000, dport=20000) /
1378              Raw('\xa5' * 100))
1379
1380         self.pg0.add_stream(t)
1381         self.pg_enable_capture(self.pg_interfaces)
1382         self.pg_start()
1383         rx = self.pg1.get_capture(1)
1384         rx = rx[0]
1385
1386         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1387         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1388
1389         self.assertEqual(rx[IPv6].src,
1390                          t[IPv6].src)
1391         self.assertEqual(rx[IPv6].dst,
1392                          t[IPv6].dst)
1393
1394         #
1395         # Test we proxy for the host on the main interface
1396         #
1397         ns_pg0 = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
1398                   IPv6(dst=d, src=self.pg0.remote_ip6) /
1399                   ICMPv6ND_NS(
1400                       tgt=self.pg0._remote_hosts[2].ip6) /
1401                   ICMPv6NDOptSrcLLAddr(
1402                       lladdr=self.pg0.remote_mac))
1403
1404         self.send_and_expect_na(self.pg0, ns_pg0,
1405                                 "NS to proxy entry on main",
1406                                 tgt_ip=self.pg0._remote_hosts[2].ip6,
1407                                 dst_ip=self.pg0.remote_ip6)
1408
1409         #
1410         # Setup and resolve proxy for another host on another interface
1411         #
1412         ns_pg2 = (Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac) /
1413                   IPv6(dst=d,
1414                        src=self.pg0._remote_hosts[3].ip6) /
1415                   ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
1416                   ICMPv6NDOptSrcLLAddr(
1417                       lladdr=self.pg0._remote_hosts[2].mac))
1418
1419         self.vapi.ip6_nd_proxy(
1420             inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1421             self.pg2.sw_if_index)
1422
1423         self.send_and_expect_na(self.pg2, ns_pg2,
1424                                 "NS to proxy entry other interface",
1425                                 dst_ip=self.pg0._remote_hosts[3].ip6,
1426                                 tgt_ip=self.pg0.local_ip6)
1427
1428         self.assertTrue(find_nbr(self,
1429                                  self.pg2.sw_if_index,
1430                                  self.pg0._remote_hosts[3].ip6,
1431                                  inet=AF_INET6))
1432
1433         #
1434         # hosts can communicate. pg2->pg1
1435         #
1436         t2 = (Ether(dst=self.pg2.local_mac,
1437                     src=self.pg0.remote_hosts[3].mac) /
1438               IPv6(dst=self.pg0._remote_hosts[2].ip6,
1439                    src=self.pg0._remote_hosts[3].ip6) /
1440               inet6.UDP(sport=10000, dport=20000) /
1441               Raw('\xa5' * 100))
1442
1443         self.pg2.add_stream(t2)
1444         self.pg_enable_capture(self.pg_interfaces)
1445         self.pg_start()
1446         rx = self.pg1.get_capture(1)
1447         rx = rx[0]
1448
1449         self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
1450         self.assertEqual(rx[Ether].src, self.pg1.local_mac)
1451
1452         self.assertEqual(rx[IPv6].src,
1453                          t2[IPv6].src)
1454         self.assertEqual(rx[IPv6].dst,
1455                          t2[IPv6].dst)
1456
1457         #
1458         # remove the proxy configs
1459         #
1460         self.vapi.ip6_nd_proxy(
1461             inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
1462             self.pg1.sw_if_index,
1463             is_del=1)
1464         self.vapi.ip6_nd_proxy(
1465             inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
1466             self.pg2.sw_if_index,
1467             is_del=1)
1468
1469         self.assertFalse(find_nbr(self,
1470                                   self.pg2.sw_if_index,
1471                                   self.pg0._remote_hosts[3].ip6,
1472                                   inet=AF_INET6))
1473         self.assertFalse(find_nbr(self,
1474                                   self.pg1.sw_if_index,
1475                                   self.pg0._remote_hosts[2].ip6,
1476                                   inet=AF_INET6))
1477
1478         #
1479         # no longer proxy-ing...
1480         #
1481         self.send_and_assert_no_replies(self.pg0, ns_pg0, "Proxy unconfigured")
1482         self.send_and_assert_no_replies(self.pg1, ns_pg1, "Proxy unconfigured")
1483         self.send_and_assert_no_replies(self.pg2, ns_pg2, "Proxy unconfigured")
1484
1485         #
1486         # no longer forwarding. traffic generates NS out of the glean/main
1487         # interface
1488         #
1489         self.pg2.add_stream(t2)
1490         self.pg_enable_capture(self.pg_interfaces)
1491         self.pg_start()
1492
1493         rx = self.pg0.get_capture(1)
1494
1495         self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
1496
1497
1498 class TestIPNull(VppTestCase):
1499     """ IPv6 routes via NULL """
1500
1501     def setUp(self):
1502         super(TestIPNull, self).setUp()
1503
1504         # create 2 pg interfaces
1505         self.create_pg_interfaces(range(1))
1506
1507         for i in self.pg_interfaces:
1508             i.admin_up()
1509             i.config_ip6()
1510             i.resolve_ndp()
1511
1512     def tearDown(self):
1513         super(TestIPNull, self).tearDown()
1514         for i in self.pg_interfaces:
1515             i.unconfig_ip6()
1516             i.admin_down()
1517
1518     def test_ip_null(self):
1519         """ IP NULL route """
1520
1521         p = (Ether(src=self.pg0.remote_mac,
1522                    dst=self.pg0.local_mac) /
1523              IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
1524              inet6.UDP(sport=1234, dport=1234) /
1525              Raw('\xa5' * 100))
1526
1527         #
1528         # A route via IP NULL that will reply with ICMP unreachables
1529         #
1530         ip_unreach = VppIpRoute(self, "2001::", 64, [], is_unreach=1, is_ip6=1)
1531         ip_unreach.add_vpp_config()
1532
1533         self.pg0.add_stream(p)
1534         self.pg_enable_capture(self.pg_interfaces)
1535         self.pg_start()
1536
1537         rx = self.pg0.get_capture(1)
1538         rx = rx[0]
1539         icmp = rx[ICMPv6DestUnreach]
1540
1541         # 0 = "No route to destination"
1542         self.assertEqual(icmp.code, 0)
1543
1544         # ICMP is rate limited. pause a bit
1545         self.sleep(1)
1546
1547         #
1548         # A route via IP NULL that will reply with ICMP prohibited
1549         #
1550         ip_prohibit = VppIpRoute(self, "2001::1", 128, [],
1551                                  is_prohibit=1, is_ip6=1)
1552         ip_prohibit.add_vpp_config()
1553
1554         self.pg0.add_stream(p)
1555         self.pg_enable_capture(self.pg_interfaces)
1556         self.pg_start()
1557
1558         rx = self.pg0.get_capture(1)
1559         rx = rx[0]
1560         icmp = rx[ICMPv6DestUnreach]
1561
1562         # 1 = "Communication with destination administratively prohibited"
1563         self.assertEqual(icmp.code, 1)
1564
1565
1566 class TestIPDisabled(VppTestCase):
1567     """ IPv6 disabled """
1568
1569     def setUp(self):
1570         super(TestIPDisabled, self).setUp()
1571
1572         # create 2 pg interfaces
1573         self.create_pg_interfaces(range(2))
1574
1575         # PG0 is IP enalbed
1576         self.pg0.admin_up()
1577         self.pg0.config_ip6()
1578         self.pg0.resolve_ndp()
1579
1580         # PG 1 is not IP enabled
1581         self.pg1.admin_up()
1582
1583     def tearDown(self):
1584         super(TestIPDisabled, self).tearDown()
1585         for i in self.pg_interfaces:
1586             i.unconfig_ip4()
1587             i.admin_down()
1588
1589     def test_ip_disabled(self):
1590         """ IP Disabled """
1591
1592         #
1593         # An (S,G).
1594         # one accepting interface, pg0, 2 forwarding interfaces
1595         #
1596         route_ff_01 = VppIpMRoute(
1597             self,
1598             "::",
1599             "ffef::1", 128,
1600             MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
1601             [VppMRoutePath(self.pg1.sw_if_index,
1602                            MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT),
1603              VppMRoutePath(self.pg0.sw_if_index,
1604                            MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)],
1605             is_ip6=1)
1606         route_ff_01.add_vpp_config()
1607
1608         pu = (Ether(src=self.pg1.remote_mac,
1609                     dst=self.pg1.local_mac) /
1610               IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
1611               inet6.UDP(sport=1234, dport=1234) /
1612               Raw('\xa5' * 100))
1613         pm = (Ether(src=self.pg1.remote_mac,
1614                     dst=self.pg1.local_mac) /
1615               IPv6(src="2001::1", dst="ffef::1") /
1616               inet6.UDP(sport=1234, dport=1234) /
1617               Raw('\xa5' * 100))
1618
1619         #
1620         # PG1 does not forward IP traffic
1621         #
1622         self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
1623         self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1624
1625         #
1626         # IP enable PG1
1627         #
1628         self.pg1.config_ip6()
1629
1630         #
1631         # Now we get packets through
1632         #
1633         self.pg1.add_stream(pu)
1634         self.pg_enable_capture(self.pg_interfaces)
1635         self.pg_start()
1636         rx = self.pg0.get_capture(1)
1637
1638         self.pg1.add_stream(pm)
1639         self.pg_enable_capture(self.pg_interfaces)
1640         self.pg_start()
1641         rx = self.pg0.get_capture(1)
1642
1643         #
1644         # Disable PG1
1645         #
1646         self.pg1.unconfig_ip6()
1647
1648         #
1649         # PG1 does not forward IP traffic
1650         #
1651         self.send_and_assert_no_replies(self.pg1, pu, "IPv6 disabled")
1652         self.send_and_assert_no_replies(self.pg1, pm, "IPv6 disabled")
1653
1654
1655 class TestIP6LoadBalance(VppTestCase):
1656     """ IPv6 Load-Balancing """
1657
1658     def setUp(self):
1659         super(TestIP6LoadBalance, self).setUp()
1660
1661         self.create_pg_interfaces(range(5))
1662
1663         mpls_tbl = VppMplsTable(self, 0)
1664         mpls_tbl.add_vpp_config()
1665
1666         for i in self.pg_interfaces:
1667             i.admin_up()
1668             i.config_ip6()
1669             i.resolve_ndp()
1670             i.enable_mpls()
1671
1672     def tearDown(self):
1673         for i in self.pg_interfaces:
1674             i.unconfig_ip6()
1675             i.admin_down()
1676             i.disable_mpls()
1677         super(TestIP6LoadBalance, self).tearDown()
1678
1679     def send_and_expect_load_balancing(self, input, pkts, outputs):
1680         self.vapi.cli("clear trace")
1681         input.add_stream(pkts)
1682         self.pg_enable_capture(self.pg_interfaces)
1683         self.pg_start()
1684         for oo in outputs:
1685             rx = oo._get_capture(1)
1686             self.assertNotEqual(0, len(rx))
1687
1688     def send_and_expect_one_itf(self, input, pkts, itf):
1689         self.vapi.cli("clear trace")
1690         input.add_stream(pkts)
1691         self.pg_enable_capture(self.pg_interfaces)
1692         self.pg_start()
1693         rx = itf.get_capture(len(pkts))
1694
1695     def test_ip6_load_balance(self):
1696         """ IPv6 Load-Balancing """
1697
1698         #
1699         # An array of packets that differ only in the destination port
1700         #  - IP only
1701         #  - MPLS EOS
1702         #  - MPLS non-EOS
1703         #  - MPLS non-EOS with an entropy label
1704         #
1705         port_ip_pkts = []
1706         port_mpls_pkts = []
1707         port_mpls_neos_pkts = []
1708         port_ent_pkts = []
1709
1710         #
1711         # An array of packets that differ only in the source address
1712         #
1713         src_ip_pkts = []
1714         src_mpls_pkts = []
1715
1716         for ii in range(65):
1717             port_ip_hdr = (
1718                 IPv6(dst="3000::1", src="3000:1::1") /
1719                 inet6.UDP(sport=1234, dport=1234 + ii) /
1720                 Raw('\xa5' * 100))
1721             port_ip_pkts.append((Ether(src=self.pg0.remote_mac,
1722                                        dst=self.pg0.local_mac) /
1723                                  port_ip_hdr))
1724             port_mpls_pkts.append((Ether(src=self.pg0.remote_mac,
1725                                          dst=self.pg0.local_mac) /
1726                                    MPLS(label=66, ttl=2) /
1727                                    port_ip_hdr))
1728             port_mpls_neos_pkts.append((Ether(src=self.pg0.remote_mac,
1729                                               dst=self.pg0.local_mac) /
1730                                         MPLS(label=67, ttl=2) /
1731                                         MPLS(label=77, ttl=2) /
1732                                         port_ip_hdr))
1733             port_ent_pkts.append((Ether(src=self.pg0.remote_mac,
1734                                         dst=self.pg0.local_mac) /
1735                                   MPLS(label=67, ttl=2) /
1736                                   MPLS(label=14, ttl=2) /
1737                                   MPLS(label=999, ttl=2) /
1738                                   port_ip_hdr))
1739             src_ip_hdr = (
1740                 IPv6(dst="3000::1", src="3000:1::%d" % ii) /
1741                 inet6.UDP(sport=1234, dport=1234) /
1742                 Raw('\xa5' * 100))
1743             src_ip_pkts.append((Ether(src=self.pg0.remote_mac,
1744                                       dst=self.pg0.local_mac) /
1745                                 src_ip_hdr))
1746             src_mpls_pkts.append((Ether(src=self.pg0.remote_mac,
1747                                         dst=self.pg0.local_mac) /
1748                                   MPLS(label=66, ttl=2) /
1749                                   src_ip_hdr))
1750
1751         #
1752         # A route for the IP pacekts
1753         #
1754         route_3000_1 = VppIpRoute(self, "3000::1", 128,
1755                                   [VppRoutePath(self.pg1.remote_ip6,
1756                                                 self.pg1.sw_if_index,
1757                                                 proto=DpoProto.DPO_PROTO_IP6),
1758                                    VppRoutePath(self.pg2.remote_ip6,
1759                                                 self.pg2.sw_if_index,
1760                                                 proto=DpoProto.DPO_PROTO_IP6)],
1761                                   is_ip6=1)
1762         route_3000_1.add_vpp_config()
1763
1764         #
1765         # a local-label for the EOS packets
1766         #
1767         binding = VppMplsIpBind(self, 66, "3000::1", 128, is_ip6=1)
1768         binding.add_vpp_config()
1769
1770         #
1771         # An MPLS route for the non-EOS packets
1772         #
1773         route_67 = VppMplsRoute(self, 67, 0,
1774                                 [VppRoutePath(self.pg1.remote_ip6,
1775                                               self.pg1.sw_if_index,
1776                                               labels=[67],
1777                                               proto=DpoProto.DPO_PROTO_IP6),
1778                                  VppRoutePath(self.pg2.remote_ip6,
1779                                               self.pg2.sw_if_index,
1780                                               labels=[67],
1781                                               proto=DpoProto.DPO_PROTO_IP6)])
1782         route_67.add_vpp_config()
1783
1784         #
1785         # inject the packet on pg0 - expect load-balancing across the 2 paths
1786         #  - since the default hash config is to use IP src,dst and port
1787         #    src,dst
1788         # We are not going to ensure equal amounts of packets across each link,
1789         # since the hash algorithm is statistical and therefore this can never
1790         # be guaranteed. But wuth 64 different packets we do expect some
1791         # balancing. So instead just ensure there is traffic on each link.
1792         #
1793         self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
1794                                             [self.pg1, self.pg2])
1795         self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
1796                                             [self.pg1, self.pg2])
1797         self.send_and_expect_load_balancing(self.pg0, port_mpls_pkts,
1798                                             [self.pg1, self.pg2])
1799         self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
1800                                             [self.pg1, self.pg2])
1801         self.send_and_expect_load_balancing(self.pg0, port_mpls_neos_pkts,
1802                                             [self.pg1, self.pg2])
1803
1804         #
1805         # The packets with Entropy label in should not load-balance,
1806         # since the Entorpy value is fixed.
1807         #
1808         self.send_and_expect_one_itf(self.pg0, port_ent_pkts, self.pg1)
1809
1810         #
1811         # change the flow hash config so it's only IP src,dst
1812         #  - now only the stream with differing source address will
1813         #    load-balance
1814         #
1815         self.vapi.set_ip_flow_hash(0, is_ip6=1, src=1, dst=1, sport=0, dport=0)
1816
1817         self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
1818                                             [self.pg1, self.pg2])
1819         self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts,
1820                                             [self.pg1, self.pg2])
1821         self.send_and_expect_one_itf(self.pg0, port_ip_pkts, self.pg2)
1822
1823         #
1824         # change the flow hash config back to defaults
1825         #
1826         self.vapi.set_ip_flow_hash(0, is_ip6=1, src=1, dst=1, sport=1, dport=1)
1827
1828         #
1829         # Recursive prefixes
1830         #  - testing that 2 stages of load-balancing occurs and there is no
1831         #    polarisation (i.e. only 2 of 4 paths are used)
1832         #
1833         port_pkts = []
1834         src_pkts = []
1835
1836         for ii in range(257):
1837             port_pkts.append((Ether(src=self.pg0.remote_mac,
1838                                     dst=self.pg0.local_mac) /
1839                               IPv6(dst="4000::1",
1840                                    src="4000:1::1") /
1841                               inet6.UDP(sport=1234,
1842                                         dport=1234 + ii) /
1843                               Raw('\xa5' * 100)))
1844             src_pkts.append((Ether(src=self.pg0.remote_mac,
1845                                    dst=self.pg0.local_mac) /
1846                              IPv6(dst="4000::1",
1847                                   src="4000:1::%d" % ii) /
1848                              inet6.UDP(sport=1234, dport=1234) /
1849                              Raw('\xa5' * 100)))
1850
1851         route_3000_2 = VppIpRoute(self, "3000::2", 128,
1852                                   [VppRoutePath(self.pg3.remote_ip6,
1853                                                 self.pg3.sw_if_index,
1854                                                 proto=DpoProto.DPO_PROTO_IP6),
1855                                    VppRoutePath(self.pg4.remote_ip6,
1856                                                 self.pg4.sw_if_index,
1857                                                 proto=DpoProto.DPO_PROTO_IP6)],
1858                                   is_ip6=1)
1859         route_3000_2.add_vpp_config()
1860
1861         route_4000_1 = VppIpRoute(self, "4000::1", 128,
1862                                   [VppRoutePath("3000::1",
1863                                                 0xffffffff,
1864                                                 proto=DpoProto.DPO_PROTO_IP6),
1865                                    VppRoutePath("3000::2",
1866                                                 0xffffffff,
1867                                                 proto=DpoProto.DPO_PROTO_IP6)],
1868                                   is_ip6=1)
1869         route_4000_1.add_vpp_config()
1870
1871         #
1872         # inject the packet on pg0 - expect load-balancing across all 4 paths
1873         #
1874         self.vapi.cli("clear trace")
1875         self.send_and_expect_load_balancing(self.pg0, port_pkts,
1876                                             [self.pg1, self.pg2,
1877                                              self.pg3, self.pg4])
1878         self.send_and_expect_load_balancing(self.pg0, src_pkts,
1879                                             [self.pg1, self.pg2,
1880                                              self.pg3, self.pg4])
1881
1882         #
1883         # Recursive prefixes
1884         #  - testing that 2 stages of load-balancing no choices
1885         #
1886         port_pkts = []
1887
1888         for ii in range(257):
1889             port_pkts.append((Ether(src=self.pg0.remote_mac,
1890                                     dst=self.pg0.local_mac) /
1891                               IPv6(dst="6000::1",
1892                                    src="6000:1::1") /
1893                               inet6.UDP(sport=1234,
1894                                         dport=1234 + ii) /
1895                               Raw('\xa5' * 100)))
1896
1897         route_5000_2 = VppIpRoute(self, "5000::2", 128,
1898                                   [VppRoutePath(self.pg3.remote_ip6,
1899                                                 self.pg3.sw_if_index,
1900                                                 proto=DpoProto.DPO_PROTO_IP6)],
1901                                   is_ip6=1)
1902         route_5000_2.add_vpp_config()
1903
1904         route_6000_1 = VppIpRoute(self, "6000::1", 128,
1905                                   [VppRoutePath("5000::2",
1906                                                 0xffffffff,
1907                                                 proto=DpoProto.DPO_PROTO_IP6)],
1908                                   is_ip6=1)
1909         route_6000_1.add_vpp_config()
1910
1911         #
1912         # inject the packet on pg0 - expect load-balancing across all 4 paths
1913         #
1914         self.vapi.cli("clear trace")
1915         self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg3)
1916
1917
1918 class TestIP6Punt(VppTestCase):
1919     """ IPv6 Punt Police/Redirect """
1920
1921     def setUp(self):
1922         super(TestIP6Punt, self).setUp()
1923
1924         self.create_pg_interfaces(range(4))
1925
1926         for i in self.pg_interfaces:
1927             i.admin_up()
1928             i.config_ip6()
1929             i.resolve_ndp()
1930
1931     def tearDown(self):
1932         super(TestIP6Punt, self).tearDown()
1933         for i in self.pg_interfaces:
1934             i.unconfig_ip6()
1935             i.admin_down()
1936
1937     def test_ip_punt(self):
1938         """ IP6 punt police and redirect """
1939
1940         p = (Ether(src=self.pg0.remote_mac,
1941                    dst=self.pg0.local_mac) /
1942              IPv6(src=self.pg0.remote_ip6,
1943                   dst=self.pg0.local_ip6) /
1944              inet6.TCP(sport=1234, dport=1234) /
1945              Raw('\xa5' * 100))
1946
1947         pkts = p * 1025
1948
1949         #
1950         # Configure a punt redirect via pg1.
1951         #
1952         nh_addr = VppIpAddress(self.pg1.remote_ip6).encode()
1953         self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
1954                                    self.pg1.sw_if_index,
1955                                    nh_addr)
1956
1957         self.send_and_expect(self.pg0, pkts, self.pg1)
1958
1959         #
1960         # add a policer
1961         #
1962         policer = self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0,
1963                                             rate_type=1)
1964         self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)
1965
1966         self.vapi.cli("clear trace")
1967         self.pg0.add_stream(pkts)
1968         self.pg_enable_capture(self.pg_interfaces)
1969         self.pg_start()
1970
1971         #
1972         # the number of packet recieved should be greater than 0,
1973         # but not equal to the number sent, since some were policed
1974         #
1975         rx = self.pg1._get_capture(1)
1976         self.assertGreater(len(rx), 0)
1977         self.assertLess(len(rx), len(pkts))
1978
1979         #
1980         # remove the poilcer. back to full rx
1981         #
1982         self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
1983         self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0,
1984                                   rate_type=1, is_add=0)
1985         self.send_and_expect(self.pg0, pkts, self.pg1)
1986
1987         #
1988         # remove the redirect. expect full drop.
1989         #
1990         self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
1991                                    self.pg1.sw_if_index,
1992                                    nh_addr,
1993                                    is_add=0)
1994         self.send_and_assert_no_replies(self.pg0, pkts,
1995                                         "IP no punt config")
1996
1997         #
1998         # Add a redirect that is not input port selective
1999         #
2000         self.vapi.ip_punt_redirect(0xffffffff,
2001                                    self.pg1.sw_if_index,
2002                                    nh_addr)
2003         self.send_and_expect(self.pg0, pkts, self.pg1)
2004
2005         self.vapi.ip_punt_redirect(0xffffffff,
2006                                    self.pg1.sw_if_index,
2007                                    nh_addr,
2008                                    is_add=0)
2009
2010     def test_ip_punt_dump(self):
2011         """ IP6 punt redirect dump"""
2012
2013         #
2014         # Configure a punt redirects
2015         #
2016         nh_addr = VppIpAddress(self.pg3.remote_ip6).encode()
2017         self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
2018                                    self.pg3.sw_if_index,
2019                                    nh_addr)
2020         self.vapi.ip_punt_redirect(self.pg1.sw_if_index,
2021                                    self.pg3.sw_if_index,
2022                                    nh_addr)
2023         self.vapi.ip_punt_redirect(self.pg2.sw_if_index,
2024                                    self.pg3.sw_if_index,
2025                                    VppIpAddress('0::0').encode())
2026
2027         #
2028         # Dump pg0 punt redirects
2029         #
2030         punts = self.vapi.ip_punt_redirect_dump(self.pg0.sw_if_index,
2031                                                 is_ipv6=1)
2032         for p in punts:
2033             self.assertEqual(p.punt.rx_sw_if_index, self.pg0.sw_if_index)
2034
2035         #
2036         # Dump punt redirects for all interfaces
2037         #
2038         punts = self.vapi.ip_punt_redirect_dump(0xffffffff, is_ipv6=1)
2039         self.assertEqual(len(punts), 3)
2040         for p in punts:
2041             self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
2042         self.assertNotEqual(punts[1].punt.nh.un.ip6, self.pg3.remote_ip6)
2043         self.assertEqual(punts[2].punt.nh.un.ip6, '\x00'*16)
2044
2045
2046 class TestIPDeag(VppTestCase):
2047     """ IPv6 Deaggregate Routes """
2048
2049     def setUp(self):
2050         super(TestIPDeag, self).setUp()
2051
2052         self.create_pg_interfaces(range(3))
2053
2054         for i in self.pg_interfaces:
2055             i.admin_up()
2056             i.config_ip6()
2057             i.resolve_ndp()
2058
2059     def tearDown(self):
2060         super(TestIPDeag, self).tearDown()
2061         for i in self.pg_interfaces:
2062             i.unconfig_ip6()
2063             i.admin_down()
2064
2065     def test_ip_deag(self):
2066         """ IP Deag Routes """
2067
2068         #
2069         # Create a table to be used for:
2070         #  1 - another destination address lookup
2071         #  2 - a source address lookup
2072         #
2073         table_dst = VppIpTable(self, 1, is_ip6=1)
2074         table_src = VppIpTable(self, 2, is_ip6=1)
2075         table_dst.add_vpp_config()
2076         table_src.add_vpp_config()
2077
2078         #
2079         # Add a route in the default table to point to a deag/
2080         # second lookup in each of these tables
2081         #
2082         route_to_dst = VppIpRoute(self, "1::1", 128,
2083                                   [VppRoutePath("::",
2084                                                 0xffffffff,
2085                                                 nh_table_id=1,
2086                                                 proto=DpoProto.DPO_PROTO_IP6)],
2087                                   is_ip6=1)
2088         route_to_src = VppIpRoute(self, "1::2", 128,
2089                                   [VppRoutePath("::",
2090                                                 0xffffffff,
2091                                                 nh_table_id=2,
2092                                                 is_source_lookup=1,
2093                                                 proto=DpoProto.DPO_PROTO_IP6)],
2094                                   is_ip6=1)
2095         route_to_dst.add_vpp_config()
2096         route_to_src.add_vpp_config()
2097
2098         #
2099         # packets to these destination are dropped, since they'll
2100         # hit the respective default routes in the second table
2101         #
2102         p_dst = (Ether(src=self.pg0.remote_mac,
2103                        dst=self.pg0.local_mac) /
2104                  IPv6(src="5::5", dst="1::1") /
2105                  inet6.TCP(sport=1234, dport=1234) /
2106                  Raw('\xa5' * 100))
2107         p_src = (Ether(src=self.pg0.remote_mac,
2108                        dst=self.pg0.local_mac) /
2109                  IPv6(src="2::2", dst="1::2") /
2110                  inet6.TCP(sport=1234, dport=1234) /
2111                  Raw('\xa5' * 100))
2112         pkts_dst = p_dst * 257
2113         pkts_src = p_src * 257
2114
2115         self.send_and_assert_no_replies(self.pg0, pkts_dst,
2116                                         "IP in dst table")
2117         self.send_and_assert_no_replies(self.pg0, pkts_src,
2118                                         "IP in src table")
2119
2120         #
2121         # add a route in the dst table to forward via pg1
2122         #
2123         route_in_dst = VppIpRoute(self, "1::1", 128,
2124                                   [VppRoutePath(self.pg1.remote_ip6,
2125                                                 self.pg1.sw_if_index,
2126                                                 proto=DpoProto.DPO_PROTO_IP6)],
2127                                   is_ip6=1,
2128                                   table_id=1)
2129         route_in_dst.add_vpp_config()
2130
2131         self.send_and_expect(self.pg0, pkts_dst, self.pg1)
2132
2133         #
2134         # add a route in the src table to forward via pg2
2135         #
2136         route_in_src = VppIpRoute(self, "2::2", 128,
2137                                   [VppRoutePath(self.pg2.remote_ip6,
2138                                                 self.pg2.sw_if_index,
2139                                                 proto=DpoProto.DPO_PROTO_IP6)],
2140                                   is_ip6=1,
2141                                   table_id=2)
2142         route_in_src.add_vpp_config()
2143         self.send_and_expect(self.pg0, pkts_src, self.pg2)
2144
2145         #
2146         # loop in the lookup DP
2147         #
2148         route_loop = VppIpRoute(self, "3::3", 128,
2149                                 [VppRoutePath("::",
2150                                               0xffffffff,
2151                                               proto=DpoProto.DPO_PROTO_IP6)],
2152                                 is_ip6=1)
2153         route_loop.add_vpp_config()
2154
2155         p_l = (Ether(src=self.pg0.remote_mac,
2156                      dst=self.pg0.local_mac) /
2157                IPv6(src="3::4", dst="3::3") /
2158                inet6.TCP(sport=1234, dport=1234) /
2159                Raw('\xa5' * 100))
2160
2161         self.send_and_assert_no_replies(self.pg0, p_l * 257,
2162                                         "IP lookup loop")
2163
2164
2165 class TestIP6Input(VppTestCase):
2166     """ IPv6 Input Exceptions """
2167
2168     def setUp(self):
2169         super(TestIP6Input, self).setUp()
2170
2171         self.create_pg_interfaces(range(2))
2172
2173         for i in self.pg_interfaces:
2174             i.admin_up()
2175             i.config_ip6()
2176             i.resolve_ndp()
2177
2178     def tearDown(self):
2179         super(TestIP6Input, self).tearDown()
2180         for i in self.pg_interfaces:
2181             i.unconfig_ip6()
2182             i.admin_down()
2183
2184     def test_ip_input(self):
2185         """ IP6 Input Exceptions """
2186
2187         #
2188         # bad version - this is dropped
2189         #
2190         p_version = (Ether(src=self.pg0.remote_mac,
2191                            dst=self.pg0.local_mac) /
2192                      IPv6(src=self.pg0.remote_ip6,
2193                           dst=self.pg1.remote_ip6,
2194                           version=3) /
2195                      inet6.UDP(sport=1234, dport=1234) /
2196                      Raw('\xa5' * 100))
2197
2198         self.send_and_assert_no_replies(self.pg0, p_version * 65,
2199                                         "funky version")
2200
2201         #
2202         # hop limit - IMCP replies
2203         #
2204         p_version = (Ether(src=self.pg0.remote_mac,
2205                            dst=self.pg0.local_mac) /
2206                      IPv6(src=self.pg0.remote_ip6,
2207                           dst=self.pg1.remote_ip6,
2208                           hlim=1) /
2209                      inet6.UDP(sport=1234, dport=1234) /
2210                      Raw('\xa5' * 100))
2211
2212         rx = self.send_and_expect(self.pg0, p_version * 65, self.pg0)
2213         rx = rx[0]
2214         icmp = rx[ICMPv6TimeExceeded]
2215         self.assertEqual(icmp.type, 3)
2216         # 0: "hop limit exceeded in transit",
2217         self.assertEqual(icmp.code, 0)
2218
2219
2220 if __name__ == '__main__':
2221     unittest.main(testRunner=VppTestRunner)