IPv6 NS RS tests and fixes
[vpp.git] / test / test_ip6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import socket
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_sub_interface import VppSubInterface, VppDot1QSubint
8
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether, Dot1Q
11 from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, ICMPv6ND_RA, \
12     ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation
13 from util import ppp
14 from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
15     in6_mactoifaceid
16 from scapy.utils import inet_pton, inet_ntop
17
18
19 def mk_ll_addr(mac):
20     euid = in6_mactoifaceid(mac)
21     addr = "fe80::" + euid
22     return addr
23
24
25 class TestIPv6(VppTestCase):
26     """ IPv6 Test Case """
27
28     @classmethod
29     def setUpClass(cls):
30         super(TestIPv6, cls).setUpClass()
31
32     def setUp(self):
33         """
34         Perform test setup before test case.
35
36         **Config:**
37             - create 3 pg interfaces
38                 - untagged pg0 interface
39                 - Dot1Q subinterface on pg1
40                 - Dot1AD subinterface on pg2
41             - setup interfaces:
42                 - put it into UP state
43                 - set IPv6 addresses
44                 - resolve neighbor address using NDP
45             - configure 200 fib entries
46
47         :ivar list interfaces: pg interfaces and subinterfaces.
48         :ivar dict flows: IPv4 packet flows in test.
49         :ivar list pg_if_packet_sizes: packet sizes in test.
50
51         *TODO:* Create AD sub interface
52         """
53         super(TestIPv6, self).setUp()
54
55         # create 3 pg interfaces
56         self.create_pg_interfaces(range(3))
57
58         # create 2 subinterfaces for p1 and pg2
59         self.sub_interfaces = [
60             VppDot1QSubint(self, self.pg1, 100),
61             VppDot1QSubint(self, self.pg2, 200)
62             # TODO: VppDot1ADSubint(self, self.pg2, 200, 300, 400)
63         ]
64
65         # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc.
66         self.flows = dict()
67         self.flows[self.pg0] = [self.pg1.sub_if, self.pg2.sub_if]
68         self.flows[self.pg1.sub_if] = [self.pg0, self.pg2.sub_if]
69         self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
70
71         # packet sizes
72         self.pg_if_packet_sizes = [64, 512, 1518, 9018]
73         self.sub_if_packet_sizes = [64, 512, 1518 + 4, 9018 + 4]
74
75         self.interfaces = list(self.pg_interfaces)
76         self.interfaces.extend(self.sub_interfaces)
77
78         # setup all interfaces
79         for i in self.interfaces:
80             i.admin_up()
81             i.config_ip6()
82             i.resolve_ndp()
83
84         # config 2M FIB entries
85         self.config_fib_entries(200)
86
87     def tearDown(self):
88         """Run standard test teardown and log ``show ip6 neighbors``."""
89         for i in self.sub_interfaces:
90             i.unconfig_ip6()
91             i.ip6_disable()
92             i.admin_down()
93             i.remove_vpp_config()
94
95         super(TestIPv6, self).tearDown()
96         if not self.vpp_dead:
97             self.logger.info(self.vapi.cli("show ip6 neighbors"))
98             # info(self.vapi.cli("show ip6 fib"))  # many entries
99
100     def config_fib_entries(self, count):
101         """For each interface add to the FIB table *count* routes to
102         "fd02::1/128" destination with interface's local address as next-hop
103         address.
104
105         :param int count: Number of FIB entries.
106
107         - *TODO:* check if the next-hop address shouldn't be remote address
108           instead of local address.
109         """
110         n_int = len(self.interfaces)
111         percent = 0
112         counter = 0.0
113         dest_addr = socket.inet_pton(socket.AF_INET6, "fd02::1")
114         dest_addr_len = 128
115         for i in self.interfaces:
116             next_hop_address = i.local_ip6n
117             for j in range(count / n_int):
118                 self.vapi.ip_add_del_route(
119                     dest_addr, dest_addr_len, next_hop_address, is_ipv6=1)
120                 counter += 1
121                 if counter / count * 100 > percent:
122                     self.logger.info("Configure %d FIB entries .. %d%% done" %
123                                      (count, percent))
124                     percent += 1
125
126     def create_stream(self, src_if, packet_sizes):
127         """Create input packet stream for defined interface.
128
129         :param VppInterface src_if: Interface to create packet stream for.
130         :param list packet_sizes: Required packet sizes.
131         """
132         pkts = []
133         for i in range(0, 257):
134             dst_if = self.flows[src_if][i % 2]
135             info = self.create_packet_info(src_if, dst_if)
136             payload = self.info_to_payload(info)
137             p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
138                  IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
139                  UDP(sport=1234, dport=1234) /
140                  Raw(payload))
141             info.data = p.copy()
142             if isinstance(src_if, VppSubInterface):
143                 p = src_if.add_dot1_layer(p)
144             size = packet_sizes[(i // 2) % len(packet_sizes)]
145             self.extend_packet(p, size)
146             pkts.append(p)
147         return pkts
148
149     def verify_capture(self, dst_if, capture):
150         """Verify captured input packet stream for defined interface.
151
152         :param VppInterface dst_if: Interface to verify captured packet stream
153                                     for.
154         :param list capture: Captured packet stream.
155         """
156         self.logger.info("Verifying capture on interface %s" % dst_if.name)
157         last_info = dict()
158         for i in self.interfaces:
159             last_info[i.sw_if_index] = None
160         is_sub_if = False
161         dst_sw_if_index = dst_if.sw_if_index
162         if hasattr(dst_if, 'parent'):
163             is_sub_if = True
164         for packet in capture:
165             if is_sub_if:
166                 # Check VLAN tags and Ethernet header
167                 packet = dst_if.remove_dot1_layer(packet)
168             self.assertTrue(Dot1Q not in packet)
169             try:
170                 ip = packet[IPv6]
171                 udp = packet[UDP]
172                 payload_info = self.payload_to_info(str(packet[Raw]))
173                 packet_index = payload_info.index
174                 self.assertEqual(payload_info.dst, dst_sw_if_index)
175                 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
176                                   (dst_if.name, payload_info.src, packet_index))
177                 next_info = self.get_next_packet_info_for_interface2(
178                     payload_info.src, dst_sw_if_index,
179                     last_info[payload_info.src])
180                 last_info[payload_info.src] = next_info
181                 self.assertTrue(next_info is not None)
182                 self.assertEqual(packet_index, next_info.index)
183                 saved_packet = next_info.data
184                 # Check standard fields
185                 self.assertEqual(ip.src, saved_packet[IPv6].src)
186                 self.assertEqual(ip.dst, saved_packet[IPv6].dst)
187                 self.assertEqual(udp.sport, saved_packet[UDP].sport)
188                 self.assertEqual(udp.dport, saved_packet[UDP].dport)
189             except:
190                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
191                 raise
192         for i in self.interfaces:
193             remaining_packet = self.get_next_packet_info_for_interface2(
194                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
195             self.assertTrue(remaining_packet is None,
196                             "Interface %s: Packet expected from interface %s "
197                             "didn't arrive" % (dst_if.name, i.name))
198
199     def test_fib(self):
200         """ IPv6 FIB test
201
202         Test scenario:
203             - Create IPv6 stream for pg0 interface
204             - Create IPv6 tagged streams for pg1's and pg2's subinterface.
205             - Send and verify received packets on each interface.
206         """
207
208         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes)
209         self.pg0.add_stream(pkts)
210
211         for i in self.sub_interfaces:
212             pkts = self.create_stream(i, self.sub_if_packet_sizes)
213             i.parent.add_stream(pkts)
214
215         self.pg_enable_capture(self.pg_interfaces)
216         self.pg_start()
217
218         pkts = self.pg0.get_capture()
219         self.verify_capture(self.pg0, pkts)
220
221         for i in self.sub_interfaces:
222             pkts = i.parent.get_capture()
223             self.verify_capture(i, pkts)
224
225     def send_and_assert_no_replies(self, intf, pkts, remark):
226         intf.add_stream(pkts)
227         self.pg_enable_capture(self.pg_interfaces)
228         self.pg_start()
229         intf.assert_nothing_captured(remark=remark)
230
231     def test_ns(self):
232         """ IPv6 Neighbour Soliciatation Exceptions
233
234         Test sceanrio:
235            - Send an NS Sourced from an address not covered by the link sub-net
236            - Send an NS to an mcast address the router has not joined
237            - Send NS for a target address the router does not onn.
238         """
239
240         #
241         # An NS from a non link source address
242         #
243         nsma = in6_getnsma(inet_pton(socket.AF_INET6, self.pg0.local_ip6))
244         d = inet_ntop(socket.AF_INET6, nsma)
245
246         p = (Ether(dst=in6_getnsmac(nsma)) /
247              IPv6(dst=d, src="2002::2") /
248              ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
249              ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
250         pkts = [p]
251
252         self.send_and_assert_no_replies(self.pg0, pkts,
253                                         "No response to NS source by address not on sub-net")
254
255         #
256         # An NS for sent to a solicited mcast group the router is not a member of
257         #  FAILS
258         #
259         if 0:
260             nsma = in6_getnsma(inet_pton(socket.AF_INET6, "fd::ffff"))
261             d = inet_ntop(socket.AF_INET6, nsma)
262
263             p = (Ether(dst=in6_getnsmac(nsma)) /
264                  IPv6(dst=d, src=self.pg0.remote_ip6) /
265                  ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
266                  ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
267             pkts = [p]
268
269             self.send_and_assert_no_replies(self.pg0, pkts,
270                                             "No response to NS sent to unjoined mcast address")
271
272         #
273         # An NS whose target address is one the router does not own
274         #
275         nsma = in6_getnsma(inet_pton(socket.AF_INET6, self.pg0.local_ip6))
276         d = inet_ntop(socket.AF_INET6, nsma)
277
278         p = (Ether(dst=in6_getnsmac(nsma)) /
279              IPv6(dst=d, src=self.pg0.remote_ip6) /
280              ICMPv6ND_NS(tgt="fd::ffff") /
281              ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
282         pkts = [p]
283
284         self.send_and_assert_no_replies(self.pg0, pkts,
285                                         "No response to NS for unknown target")
286
287     def send_and_expect_ra(self, intf, pkts, remark, src_ip=None):
288         if not src_ip:
289             src_ip = intf.remote_ip6
290         intf.add_stream(pkts)
291         self.pg0.add_stream(pkts)
292         self.pg_enable_capture(self.pg_interfaces)
293         self.pg_start()
294         rx = intf.get_capture(1)
295
296         self.assertEqual(len(rx), 1)
297         rx = rx[0]
298
299         # the rx'd RA should be addressed to the sender's source
300         self.assertTrue(rx.haslayer(ICMPv6ND_RA))
301         self.assertEqual(in6_ptop(rx[IPv6].dst),
302                          in6_ptop(src_ip))
303
304         # and come from the router's link local
305         self.assertTrue(in6_islladdr(rx[IPv6].src))
306         self.assertEqual(in6_ptop(rx[IPv6].src),
307                          in6_ptop(mk_ll_addr(intf.local_mac)))
308
309     def test_rs(self):
310         """ IPv6 Router Soliciatation Exceptions
311
312         Test sceanrio:
313         """
314
315         #
316         # Before we begin change the IPv6 RA responses to use the unicast address
317         # that way we will not confuse them with the periodic Ras which go to the Mcast
318         # address
319         #
320         self.pg0.ip6_ra_config(send_unicast=1)
321
322         #
323         # An RS from a link source address
324         #  - expect an RA in return
325         #
326         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
327              IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
328              ICMPv6ND_RS())
329         pkts = [p]
330         self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
331
332         #
333         # For the next RS sent the RA should be rate limited
334         #
335         self.send_and_assert_no_replies(self.pg0, pkts, "RA rate limited")
336
337         #
338         # When we reconfiure the IPv6 RA config, we reset the RA rate limiting,
339         # so we need to do this before each test below so as not to drop packets for
340         # rate limiting reasons. Test this works here.
341         #
342         self.pg0.ip6_ra_config(send_unicast=1)
343         self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
344
345         #
346         # An RS sent from a non-link local source
347         #
348         self.pg0.ip6_ra_config(send_unicast=1)
349         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
350              IPv6(dst=self.pg0.local_ip6, src="2002::ffff") /
351              ICMPv6ND_RS())
352         pkts = [p]
353         self.send_and_assert_no_replies(self.pg0, pkts,
354                                         "RS from non-link source")
355
356         #
357         # Source an RS from a link local address
358         #
359         self.pg0.ip6_ra_config(send_unicast=1)
360         ll = mk_ll_addr(self.pg0.remote_mac)
361         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
362              IPv6(dst=self.pg0.local_ip6, src=ll) /
363              ICMPv6ND_RS())
364         pkts = [p]
365         self.send_and_expect_ra(
366             self.pg0, pkts, "RS sourced from link-local", src_ip=ll)
367
368         #
369         # Source from the unspecified address ::. This happens when the RS is sent before
370         # the host has a configured address/sub-net, i.e. auto-config.
371         # Since the sender has no IP address, the reply comes back mcast - so the
372         # capture needs to not filter this.
373         # If we happen to pick up the periodic RA at this point then so be it, it's not
374         # an error.
375         #
376         self.pg0.ip6_ra_config(send_unicast=1)
377         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
378              IPv6(dst=self.pg0.local_ip6, src="::") /
379              ICMPv6ND_RS())
380         pkts = [p]
381
382         self.pg0.add_stream(pkts)
383         self.pg0.add_stream(pkts)
384         self.pg_enable_capture(self.pg_interfaces)
385         self.pg_start()
386         capture = self.pg0.get_capture(1, filter_out_fn=None)
387         found = 0
388         for rx in capture:
389             if (rx.haslayer(ICMPv6ND_RA)):
390                 # and come from the router's link local
391                 self.assertTrue(in6_islladdr(rx[IPv6].src))
392                 self.assertEqual(in6_ptop(rx[IPv6].src),
393                                  in6_ptop(mk_ll_addr(self.pg0.local_mac)))
394                 # sent to the all hosts mcast
395                 self.assertEqual(in6_ptop(rx[IPv6].dst), "ff02::1")
396
397                 found = 1
398         self.assertTrue(found)
399
400     @unittest.skip("Unsupported")
401     def test_mrs(self):
402         """ IPv6 Multicast Router Soliciatation Exceptions
403
404         Test sceanrio:
405         """
406
407         #
408         # An RS from a link source address
409         #  - expect an RA in return
410         #
411         nsma = in6_getnsma(inet_pton(socket.AF_INET6, self.pg0.local_ip6))
412         d = inet_ntop(socket.AF_INET6, nsma)
413
414         p = (Ether(dst=getmacbyip6("ff02::2")) /
415              IPv6(dst=d, src=self.pg0.remote_ip6) /
416              ICMPv6MRD_Solicitation())
417         pkts = [p]
418
419         self.pg0.add_stream(pkts)
420         self.pg_enable_capture(self.pg_interfaces)
421         self.pg_start()
422         self.pg0.assert_nothing_captured(
423             remark="No response to NS source by address not on sub-net")
424
425         #
426         # An RS from a non link source address
427         #
428         nsma = in6_getnsma(inet_pton(socket.AF_INET6, self.pg0.local_ip6))
429         d = inet_ntop(socket.AF_INET6, nsma)
430
431         p = (Ether(dst=getmacbyip6("ff02::2")) /
432              IPv6(dst=d, src="2002::2") /
433              ICMPv6MRD_Solicitation())
434         pkts = [p]
435
436         self.send_and_assert_no_replies(self.pg0, pkts,
437                                         "RA rate limited")
438         self.pg0.add_stream(pkts)
439         self.pg_enable_capture(self.pg_interfaces)
440         self.pg_start()
441         self.pg0.assert_nothing_captured(
442             remark="No response to NS source by address not on sub-net")
443
444
445 if __name__ == '__main__':
446     unittest.main(testRunner=VppTestRunner)