import socket
import unittest
+from parameterized import parameterized
import scapy.layers.inet6 as inet6
from scapy.contrib.mpls import MPLS
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
from six import moves
from framework import VppTestCase, VppTestRunner
-from util import ppp, ip6_normalize
+from util import ppp, ip6_normalize, mk_ll_addr
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
- VppMplsRoute, VppMplsTable, VppIpTable, VppIpAddress
+ VppMplsRoute, VppMplsTable, VppIpTable
from vpp_neighbor import find_nbr, VppNeighbor
from vpp_pg_interface import is_ipv6_misc
from vpp_sub_interface import VppSubInterface, VppDot1QSubint
+from ipaddress import IPv6Network, IPv4Network
AF_INET6 = socket.AF_INET6
-
-def mk_ll_addr(mac):
- euid = in6_mactoifaceid(mac)
- addr = "fe80::" + euid
- return addr
+try:
+ text_type = unicode
+except NameError:
+ text_type = str
class TestIPv6ND(VppTestCase):
self.pg0.sw_if_index,
self.pg0.remote_hosts[2].mac,
self.pg0.remote_hosts[2].ip6,
- af=AF_INET6,
is_no_fib_entry=1)
nd_entry.add_vpp_config()
#
self.assertTrue(find_nbr(self,
self.pg0.sw_if_index,
- self.pg0._remote_hosts[2].ip6,
- inet=AF_INET6))
+ self.pg0._remote_hosts[2].ip6))
self.assertFalse(find_route(self,
self.pg0._remote_hosts[2].ip6,
128,
#
self.assertTrue(find_nbr(self,
self.pg0.sw_if_index,
- self.pg0._remote_hosts[2].ip6_ll,
- inet=AF_INET6))
+ self.pg0._remote_hosts[2].ip6_ll))
self.assertFalse(find_route(self,
self.pg0._remote_hosts[2].ip6_ll,
128,
#
self.assertTrue(find_nbr(self,
self.pg0.sw_if_index,
- self.pg0._remote_hosts[3].ip6_ll,
- inet=AF_INET6))
+ self.pg0._remote_hosts[3].ip6_ll))
self.assertFalse(find_route(self,
self.pg0._remote_hosts[3].ip6_ll,
128,
ns_pg1 = VppNeighbor(self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[1].mac,
- self.pg1.remote_hosts[1].ip6,
- af=AF_INET6)
+ self.pg1.remote_hosts[1].ip6)
ns_pg1.add_vpp_config()
ns_pg2 = VppNeighbor(self,
self.pg2.sw_if_index,
self.pg2.remote_mac,
- self.pg1.remote_hosts[1].ip6,
- af=AF_INET6)
+ self.pg1.remote_hosts[1].ip6)
ns_pg2.add_vpp_config()
#
#
# Configure The RA to announce the links prefix
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
+ self.pg0.ip6_ra_prefix(self.pg0.local_ip6,
self.pg0.local_ip6_prefix_len)
#
# Change the prefix info to not off-link
# L-flag is clear
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
+ self.pg0.ip6_ra_prefix(self.pg0.local_ip6,
self.pg0.local_ip6_prefix_len,
off_link=1)
# Change the prefix info to not off-link, no-autoconfig
# L and A flag are clear in the advert
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
+ self.pg0.ip6_ra_prefix(self.pg0.local_ip6,
self.pg0.local_ip6_prefix_len,
off_link=1,
no_autoconfig=1)
# Change the flag settings back to the defaults
# L and A flag are set in the advert
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
+ self.pg0.ip6_ra_prefix(self.pg0.local_ip6,
self.pg0.local_ip6_prefix_len)
opt = ICMPv6NDOptPrefixInfo(
# Change the prefix info to not off-link, no-autoconfig
# L and A flag are clear in the advert
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
+ self.pg0.ip6_ra_prefix(self.pg0.local_ip6,
self.pg0.local_ip6_prefix_len,
off_link=1,
no_autoconfig=1)
# Use the reset to defults option to revert to defaults
# L and A flag are clear in the advert
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
+ self.pg0.ip6_ra_prefix(self.pg0.local_ip6,
self.pg0.local_ip6_prefix_len,
use_default=1)
#
# Advertise Another prefix. With no L-flag/A-flag
#
- self.pg0.ip6_ra_prefix(self.pg1.local_ip6n,
+ self.pg0.ip6_ra_prefix(self.pg1.local_ip6,
self.pg1.local_ip6_prefix_len,
off_link=1,
no_autoconfig=1)
# Remove the first refix-info - expect the second is still in the
# advert
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
+ self.pg0.ip6_ra_prefix(self.pg0.local_ip6,
self.pg0.local_ip6_prefix_len,
is_no=1)
#
# Remove the second prefix-info - expect no prefix-info i nthe adverts
#
- self.pg0.ip6_ra_prefix(self.pg1.local_ip6n,
+ self.pg0.ip6_ra_prefix(self.pg1.local_ip6,
self.pg1.local_ip6_prefix_len,
is_no=1)
self.pg1.local_mac)
def verify_prefix_info(self, reported_prefix, prefix_option):
- prefix = socket.inet_pton(socket.AF_INET6,
- prefix_option.getfieldval("prefix"))
- self.assert_equal(reported_prefix.dst_address, prefix)
- self.assert_equal(reported_prefix.dst_address_length,
- prefix_option.getfieldval("prefixlen"))
+ prefix = IPv6Network(
+ text_type(prefix_option.getfieldval("prefix") +
+ "/" +
+ text_type(prefix_option.getfieldval("prefixlen"))),
+ strict=False)
+ self.assert_equal(reported_prefix.prefix.network_address,
+ prefix.network_address)
L = prefix_option.getfieldval("L")
A = prefix_option.getfieldval("A")
option_flags = (L << 7) | (A << 6)
#
# Add proxy support for the host
#
- self.vapi.ip6_nd_proxy(
- inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
- self.pg1.sw_if_index)
+ self.vapi.ip6nd_proxy_add_del(
+ ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
+ sw_if_index=self.pg1.sw_if_index)
#
# try that NS again. this time we expect an NA back
#
self.assertTrue(find_nbr(self,
self.pg1.sw_if_index,
- self.pg0._remote_hosts[2].ip6,
- inet=AF_INET6))
+ self.pg0._remote_hosts[2].ip6))
#
# ... and we can route traffic to it
ICMPv6NDOptSrcLLAddr(
lladdr=self.pg0._remote_hosts[2].mac))
- self.vapi.ip6_nd_proxy(
- inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
- self.pg2.sw_if_index)
+ self.vapi.ip6nd_proxy_add_del(
+ ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
+ sw_if_index=self.pg2.sw_if_index)
self.send_and_expect_na(self.pg2, ns_pg2,
"NS to proxy entry other interface",
self.assertTrue(find_nbr(self,
self.pg2.sw_if_index,
- self.pg0._remote_hosts[3].ip6,
- inet=AF_INET6))
+ self.pg0._remote_hosts[3].ip6))
#
# hosts can communicate. pg2->pg1
#
# remove the proxy configs
#
- self.vapi.ip6_nd_proxy(
- inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
- self.pg1.sw_if_index,
- is_del=1)
- self.vapi.ip6_nd_proxy(
- inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
- self.pg2.sw_if_index,
- is_del=1)
+ self.vapi.ip6nd_proxy_add_del(
+ ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
+ sw_if_index=self.pg1.sw_if_index, is_del=1)
+ self.vapi.ip6nd_proxy_add_del(
+ ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
+ sw_if_index=self.pg2.sw_if_index, is_del=1)
self.assertFalse(find_nbr(self,
self.pg2.sw_if_index,
- self.pg0._remote_hosts[3].ip6,
- inet=AF_INET6))
+ self.pg0._remote_hosts[3].ip6))
self.assertFalse(find_nbr(self,
self.pg1.sw_if_index,
- self.pg0._remote_hosts[2].ip6,
- inet=AF_INET6))
+ self.pg0._remote_hosts[2].ip6))
#
# no longer proxy-ing...
i.disable_mpls()
super(TestIP6LoadBalance, self).tearDown()
- def send_and_expect_load_balancing(self, input, pkts, outputs):
+ def pg_send(self, input, pkts):
self.vapi.cli("clear trace")
input.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
+
+ def send_and_expect_load_balancing(self, input, pkts, outputs):
+ self.pg_send(input, pkts)
for oo in outputs:
rx = oo._get_capture(1)
self.assertNotEqual(0, len(rx))
def send_and_expect_one_itf(self, input, pkts, itf):
- self.vapi.cli("clear trace")
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
+ self.pg_send(input, pkts)
rx = itf.get_capture(len(pkts))
def test_ip6_load_balance(self):
#
# Configure a punt redirect via pg1.
#
- nh_addr = VppIpAddress(self.pg1.remote_ip6).encode()
+ nh_addr = self.pg1.remote_ip6
self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
self.pg1.sw_if_index,
nh_addr)
self.assertLess(len(rx), len(pkts))
#
- # remove the poilcer. back to full rx
+ # remove the policer. back to full rx
#
self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0,
#
# Configure a punt redirects
#
- nh_addr = VppIpAddress(self.pg3.remote_ip6).encode()
+ nh_addr = self.pg3.remote_ip6
self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
self.pg3.sw_if_index,
nh_addr)
nh_addr)
self.vapi.ip_punt_redirect(self.pg2.sw_if_index,
self.pg3.sw_if_index,
- VppIpAddress('0::0').encode())
+ '0::0')
#
# Dump pg0 punt redirects
self.assertEqual(len(punts), 3)
for p in punts:
self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
- self.assertNotEqual(punts[1].punt.nh.un.ip6, self.pg3.remote_ip6)
- self.assertEqual(punts[2].punt.nh.un.ip6, '\x00'*16)
+ self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
+ self.assertEqual(str(punts[2].punt.nh), '::')
class TestIPDeag(VppTestCase):
class TestIP6Input(VppTestCase):
- """ IPv6 Input Exceptions """
+ """ IPv6 Input Exception Test Cases """
def setUp(self):
super(TestIP6Input, self).setUp()
i.unconfig_ip6()
i.admin_down()
- def test_ip_input(self):
- """ IP6 Input Exceptions """
-
- #
- # bad version - this is dropped
- #
- p_version = (Ether(src=self.pg0.remote_mac,
- dst=self.pg0.local_mac) /
- IPv6(src=self.pg0.remote_ip6,
- dst=self.pg1.remote_ip6,
- version=3) /
- inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
-
- self.send_and_assert_no_replies(self.pg0, p_version * 65,
- "funky version")
-
+ def test_ip_input_icmp_reply(self):
+ """ IP6 Input Exception - Return ICMP (3,0) """
#
- # hop limit - IMCP replies
+ # hop limit - ICMP replies
#
p_version = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
rx = self.send_and_expect(self.pg0, p_version * 65, self.pg0)
rx = rx[0]
icmp = rx[ICMPv6TimeExceeded]
- self.assertEqual(icmp.type, 3)
+
# 0: "hop limit exceeded in transit",
- self.assertEqual(icmp.code, 0)
+ self.assertEqual((icmp.type, icmp.code), (3, 0))
+
+ icmpv6_data = '\x0a' * 18
+ all_0s = "::"
+ all_1s = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"
+
+ @parameterized.expand([
+ # Name, src, dst, l4proto, msg, timeout
+ ("src='iface', dst='iface'", None, None,
+ inet6.UDP(sport=1234, dport=1234), "funky version", None),
+ ("src='All 0's', dst='iface'", all_0s, None,
+ ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
+ ("src='iface', dst='All 0's'", None, all_0s,
+ ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
+ ("src='All 1's', dst='iface'", all_1s, None,
+ ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
+ ("src='iface', dst='All 1's'", None, all_1s,
+ ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
+ ("src='All 1's', dst='All 1's'", all_1s, all_1s,
+ ICMPv6EchoRequest(id=0xb, seq=5, data=icmpv6_data), None, 0.1),
+
+ ])
+ def test_ip_input_no_replies(self, name, src, dst, l4, msg, timeout):
+
+ self._testMethodDoc = 'IPv6 Input Exception - %s' % name
+
+ p_version = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IPv6(src=src or self.pg0.remote_ip6,
+ dst=dst or self.pg1.remote_ip6,
+ version=3) /
+ l4 /
+ Raw('\xa5' * 100))
+
+ self.send_and_assert_no_replies(self.pg0, p_version * 65,
+ remark=msg or "",
+ timeout=timeout)
if __name__ == '__main__':