import unittest
from parameterized import parameterized
-import scapy.compat
import scapy.layers.inet6 as inet6
from scapy.layers.inet import UDP, IP
from scapy.contrib.mpls import MPLS
ICMPv6EchoReply,
IPv6ExtHdrHopByHop,
ICMPv6MLReport2,
- ICMPv6MLDMultAddrRec,
)
from scapy.layers.l2 import Ether, Dot1Q, GRE
from scapy.packet import Raw
in6_getnsmac,
in6_ptop,
in6_islladdr,
- in6_mactoifaceid,
)
from six import moves
-from framework import VppTestCase, VppTestRunner, tag_run_solo
+from framework import VppTestCase
+from asfframework import VppTestRunner, tag_run_solo
from util import ppp, ip6_normalize, mk_ll_addr
from vpp_papi import VppEnum
-from vpp_ip import DpoProto, VppIpPuntPolicer, VppIpPuntRedirect, VppIpPathMtu
+from vpp_ip import VppIpPuntPolicer, VppIpPuntRedirect, VppIpPathMtu
from vpp_ip_route import (
VppIpRoute,
VppRoutePath,
self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))
# packet is from the router's local address
- self.assertEqual(in6_ptop(rx[IPv6].src), intf.local_ip6)
+ self.assertEqual(in6_ptop(rx[IPv6].src), intf.local_ip6_ll)
# Src link-layer options should have the router's MAC
sll = rx[ICMPv6NDOptSrcLLAddr]
self.assertEqual(ip.src, sip)
self.assertEqual(ip.dst, dip)
+ def get_ip6_nd_rx_requests(self, itf):
+ """Get IP6 ND RX request stats for and interface"""
+ return self.statistics["/net/ip6-nd/rx/requests"][:, itf.sw_if_index].sum()
+
+ def get_ip6_nd_tx_requests(self, itf):
+ """Get IP6 ND TX request stats for and interface"""
+ return self.statistics["/net/ip6-nd/tx/requests"][:, itf.sw_if_index].sum()
+
+ def get_ip6_nd_rx_replies(self, itf):
+ """Get IP6 ND RX replies stats for and interface"""
+ return self.statistics["/net/ip6-nd/rx/replies"][:, itf.sw_if_index].sum()
+
+ def get_ip6_nd_tx_replies(self, itf):
+ """Get IP6 ND TX replies stats for and interface"""
+ return self.statistics["/net/ip6-nd/tx/replies"][:, itf.sw_if_index].sum()
+
@tag_run_solo
class TestIPv6(TestIPv6ND):
def tearDown(self):
"""Run standard test teardown and log ``show ip6 neighbors``."""
- for i in self.interfaces:
+ for i in reversed(self.interfaces):
i.unconfig_ip6()
i.admin_down()
for i in self.sub_interfaces:
- Send NS for a target address the router does not onn.
"""
+ n_rx_req_pg0 = self.get_ip6_nd_rx_requests(self.pg0)
+ n_tx_rep_pg0 = self.get_ip6_nd_tx_replies(self.pg0)
+
#
# An NS from a non link source address
#
self.send_and_assert_no_replies(
self.pg0, pkts, "No response to NS source by address not on sub-net"
)
+ self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 1)
#
# An NS for sent to a solicited mcast group the router is
self.send_and_assert_no_replies(
self.pg0, pkts, "No response to NS for unknown target"
)
+ self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 2)
#
# A neighbor entry that has no associated FIB-entry
dst_ip=self.pg0._remote_hosts[2].ip6_ll,
tgt_ip=self.pg0.local_ip6,
)
+ self.assert_equal(self.get_ip6_nd_rx_requests(self.pg0), n_rx_req_pg0 + 3)
+ self.assert_equal(self.get_ip6_nd_tx_replies(self.pg0), n_tx_rep_pg0 + 1)
#
# we should have learned an ND entry for the peer's link-local
)
self.assertFalse(find_route(self, self.pg0._remote_hosts[3].ip6_ll, 128))
+ def test_nd_incomplete(self):
+ """IP6-ND Incomplete"""
+ self.pg1.generate_remote_hosts(3)
+
+ p0 = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_hosts[1].ip6)
+ / UDP(sport=1234, dport=1234)
+ / Raw()
+ )
+
+ #
+ # a packet to an unresolved destination generates an ND request
+ #
+ n_tx_req_pg1 = self.get_ip6_nd_tx_requests(self.pg1)
+ self.send_and_expect_ns(self.pg0, self.pg1, p0, self.pg1.remote_hosts[1].ip6)
+ self.assert_equal(self.get_ip6_nd_tx_requests(self.pg1), n_tx_req_pg1 + 1)
+
+ #
+ # a reply to the request
+ #
+ self.assert_equal(self.get_ip6_nd_rx_replies(self.pg1), 0)
+ na = (
+ Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
+ / IPv6(dst=self.pg1.local_ip6, src=self.pg1.remote_hosts[1].ip6)
+ / ICMPv6ND_NA(tgt=self.pg1.remote_hosts[1].ip6)
+ / ICMPv6NDOptSrcLLAddr(lladdr=self.pg1.remote_hosts[1].mac)
+ )
+ self.send_and_assert_no_replies(self.pg1, [na])
+ self.assert_equal(self.get_ip6_nd_rx_replies(self.pg1), 1)
+
def test_ns_duplicates(self):
"""ND Duplicates"""
rx = rx[0]
self.validate_ra(intf, rx, dst_ip, src_ip=src_ip, pi_opt=opt)
+ def test_ip6_ra_dump(self):
+ """IPv6 RA dump"""
+
+ # Dump IPv6 RA for all interfaces
+ ip6_ra_dump = self.vapi.sw_interface_ip6nd_ra_dump(sw_if_index=0xFFFFFFFF)
+ self.assertEqual(len(ip6_ra_dump), len(self.interfaces))
+
+ for ip6_ra in ip6_ra_dump:
+ self.assertFalse(ip6_ra.send_radv)
+ self.assertEqual(ip6_ra.n_prefixes, 0)
+ self.assertEqual(len(ip6_ra.prefixes), 0)
+ self.assertEqual(ip6_ra.last_radv_time, 0.0)
+ self.assertEqual(ip6_ra.last_multicast_time, 0.0)
+ self.assertEqual(ip6_ra.next_multicast_time, 0.0)
+ self.assertEqual(ip6_ra.n_advertisements_sent, 0)
+ self.assertEqual(ip6_ra.n_solicitations_rcvd, 0)
+ self.assertEqual(ip6_ra.n_solicitations_dropped, 0)
+
+ # Enable sending IPv6 RA for an interface
+ self.pg0.ip6_ra_config(no=1, suppress=1)
+
+ # Add IPv6 RA prefixes for the interface
+ pfx0 = IPv6Network(
+ "%s/%s" % (self.pg0.local_ip6, self.pg0.local_ip6_prefix_len), strict=False
+ )
+ pfx1 = IPv6Network("fafa::/96")
+ self.pg0.ip6_ra_prefix(pfx0, off_link=0, no_autoconfig=0)
+ self.pg0.ip6_ra_prefix(pfx1, off_link=1, no_autoconfig=1)
+
+ # Wait for multicast IPv6 RA
+ self.sleep(1)
+
+ # Dump IPv6 RA for the interface
+ ip6_ra_dump = self.vapi.sw_interface_ip6nd_ra_dump(
+ sw_if_index=self.pg0.sw_if_index
+ )
+ self.assertEqual(len(ip6_ra_dump), 1)
+ ip6_ra = ip6_ra_dump[0]
+
+ self.assertEqual(ip6_ra.sw_if_index, self.pg0.sw_if_index)
+ self.assertTrue(ip6_ra.send_radv)
+ self.assertEqual(ip6_ra.n_prefixes, 2)
+ self.assertEqual(len(ip6_ra.prefixes), 2)
+ self.assertEqual(ip6_ra.last_radv_time, 0.0)
+ self.assertGreater(ip6_ra.last_multicast_time, 0.0)
+ self.assertGreater(ip6_ra.next_multicast_time, 0.0)
+ self.assertGreater(ip6_ra.n_advertisements_sent, 0)
+ self.assertEqual(ip6_ra.n_solicitations_rcvd, 0)
+ self.assertEqual(ip6_ra.n_solicitations_dropped, 0)
+
+ self.assertEqual(ip6_ra.prefixes[0].prefix, pfx0)
+ self.assertTrue(ip6_ra.prefixes[0].onlink_flag)
+ self.assertTrue(ip6_ra.prefixes[0].autonomous_flag)
+ self.assertFalse(ip6_ra.prefixes[0].no_advertise)
+
+ self.assertEqual(ip6_ra.prefixes[1].prefix, pfx1)
+ self.assertFalse(ip6_ra.prefixes[1].onlink_flag)
+ self.assertFalse(ip6_ra.prefixes[1].autonomous_flag)
+ self.assertFalse(ip6_ra.prefixes[1].no_advertise)
+
+ # Reset sending IPv6 RA for the interface
+ self.pg0.ip6_ra_config(suppress=1)
+
+ # Remove IPv6 RA prefixes for the interface
+ self.pg0.ip6_ra_prefix(pfx0, is_no=1)
+ self.pg0.ip6_ra_prefix(pfx1, is_no=1)
+
+ # Dump IPv6 RA for the interface
+ ip6_ra_dump = self.vapi.sw_interface_ip6nd_ra_dump(
+ sw_if_index=self.pg0.sw_if_index
+ )
+ self.assertEqual(len(ip6_ra_dump), 1)
+ ip6_ra = ip6_ra_dump[0]
+
+ self.assertEqual(ip6_ra.sw_if_index, self.pg0.sw_if_index)
+ self.assertFalse(ip6_ra.send_radv)
+ self.assertEqual(ip6_ra.n_prefixes, 0)
+ self.assertEqual(len(ip6_ra.prefixes), 0)
+
def test_rs(self):
"""IPv6 Router Solicitation Exceptions
self.pg_start()
subitf = VppDot1QSubint(self, self.pg1, 99)
+ self.interfaces.append(subitf)
+ self.sub_interfaces.append(subitf)
subitf.admin_up()
subitf.config_ip6()
]
)
def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):
-
self._testMethodDoc = "IPv6 Input Exception - %s" % name
p_version = (
self.assertEqual(str(punts[2].punt.nh), "::")
+class TestIP6InterfaceRx(VppTestCase):
+ """IPv6 Interface Receive"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIP6InterfaceRx, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIP6InterfaceRx, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestIP6InterfaceRx, self).setUp()
+
+ self.create_pg_interfaces(range(3))
+
+ table_id = 0
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+
+ if table_id != 0:
+ table = VppIpTable(self, table_id, is_ip6=1)
+ table.add_vpp_config()
+
+ i.set_table_ip6(table_id)
+ i.config_ip6()
+ i.resolve_ndp()
+ table_id += 1
+
+ def tearDown(self):
+ for i in self.pg_interfaces:
+ i.unconfig_ip6()
+ i.admin_down()
+ i.set_table_ip6(0)
+
+ super(TestIP6InterfaceRx, self).tearDown()
+
+ def test_interface_rx(self):
+ """IPv6 Interface Receive"""
+
+ #
+ # add a route in the default table to receive ...
+ #
+ route_to_dst = VppIpRoute(
+ self,
+ "1::",
+ 122,
+ [
+ VppRoutePath(
+ "::",
+ self.pg1.sw_if_index,
+ type=FibPathType.FIB_PATH_TYPE_INTERFACE_RX,
+ )
+ ],
+ )
+ route_to_dst.add_vpp_config()
+
+ #
+ # packets to these destination are dropped, since they'll
+ # hit the respective default routes in table 1
+ #
+ p_dst = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(src="5::5", dst="1::1")
+ / inet6.TCP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ pkts_dst = p_dst * 10
+
+ self.send_and_assert_no_replies(self.pg0, pkts_dst, "IP in table 1")
+
+ #
+ # add a route in the dst table to forward via pg1
+ #
+ route_in_dst = VppIpRoute(
+ self,
+ "1::1",
+ 128,
+ [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
+ table_id=1,
+ )
+ route_in_dst.add_vpp_config()
+
+ self.send_and_expect(self.pg0, pkts_dst, self.pg1)
+
+ #
+ # add a route in the default table to receive ...
+ #
+ route_to_dst = VppIpRoute(
+ self,
+ "1::",
+ 122,
+ [
+ VppRoutePath(
+ "::",
+ self.pg2.sw_if_index,
+ type=FibPathType.FIB_PATH_TYPE_INTERFACE_RX,
+ )
+ ],
+ table_id=1,
+ )
+ route_to_dst.add_vpp_config()
+
+ #
+ # packets to these destination are dropped, since they'll
+ # hit the respective default routes in table 2
+ #
+ p_dst = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(src="6::6", dst="1::2")
+ / inet6.TCP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ pkts_dst = p_dst * 10
+
+ self.send_and_assert_no_replies(self.pg0, pkts_dst, "IP in table 2")
+
+ #
+ # add a route in the table 2 to forward via pg2
+ #
+ route_in_dst = VppIpRoute(
+ self,
+ "1::2",
+ 128,
+ [VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index)],
+ table_id=2,
+ )
+ route_in_dst.add_vpp_config()
+
+ self.send_and_expect(self.pg0, pkts_dst, self.pg2)
+
+
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)