-#!/usr/bin/env python
+#!/usr/bin/env python3
-import socket
+from socket import inet_pton, inet_ntop
import unittest
from parameterized import parameterized
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
- ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, IPv6ExtHdrHopByHop
+ ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, \
+ IPv6ExtHdrHopByHop, ICMPv6MLReport2, ICMPv6MLDMultAddrRec
from scapy.layers.l2 import Ether, Dot1Q
from scapy.packet import Raw
-from scapy.utils import inet_pton, inet_ntop
from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
in6_mactoifaceid
from six import moves
from framework import VppTestCase, VppTestRunner
from util import ppp, ip6_normalize, mk_ll_addr
-from vpp_ip import DpoProto, VppIpAddress
+from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
- VppMplsRoute, VppMplsTable, VppIpTable, FibPathType, \
- VppIpInterfaceAddress
+ VppMplsRoute, VppMplsTable, VppIpTable, FibPathType, FibPathProto, \
+ VppIpInterfaceAddress, find_route_in_dump, find_mroute_in_dump
from vpp_neighbor import find_nbr, VppNeighbor
from vpp_pg_interface import is_ipv6_misc
from vpp_sub_interface import VppSubInterface, VppDot1QSubint
-from ipaddress import IPv6Network, IPv4Network, IPv6Address
+from vpp_policer import VppPolicer
+from ipaddress import IPv6Network, IPv6Address
AF_INET6 = socket.AF_INET6
def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
filter_out_fn=is_ipv6_misc):
+ self.vapi.cli("clear trace")
tx_intf.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
"""Run standard test teardown and log ``show ip6 neighbors``."""
for i in self.interfaces:
i.unconfig_ip6()
- i.ip6_disable()
i.admin_down()
for i in self.sub_interfaces:
i.remove_vpp_config()
:param int packet_size: Required packet size.
:param Scapy pkt: Packet to be modified.
"""
- dst_if_idx = packet_size / 10 % 2
+ dst_if_idx = int(packet_size / 10 % 2)
dst_if = self.flows[src_if][dst_if_idx]
info = self.create_packet_info(src_if, dst_if)
payload = self.info_to_payload(info)
self.pg0.remote_ip6,
self.pg1.remote_hosts[1].ip6)
- def validate_ra(self, intf, rx, dst_ip=None, mtu=9000, pi_opt=None):
+ def validate_ra(self, intf, rx, dst_ip=None, src_ip=None,
+ mtu=9000, pi_opt=None):
if not dst_ip:
dst_ip = intf.remote_ip6
+ if not src_ip:
+ src_ip = mk_ll_addr(intf.local_mac)
# unicasted packets must come to the unicast mac
self.assertEqual(rx[Ether].dst, intf.remote_mac)
# and come from the router's link local
self.assertTrue(in6_islladdr(rx[IPv6].src))
- self.assertEqual(in6_ptop(rx[IPv6].src),
- in6_ptop(mk_ll_addr(intf.local_mac)))
+ self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(src_ip))
# it should contain the links MTU
ra = rx[ICMPv6ND_RA]
def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
filter_out_fn=is_ipv6_misc,
- opt=None):
+ opt=None,
+ src_ip=None):
+ self.vapi.cli("clear trace")
intf.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.assertEqual(len(rx), 1)
rx = rx[0]
- self.validate_ra(intf, rx, dst_ip, pi_opt=opt)
+ self.validate_ra(intf, rx, dst_ip, src_ip=src_ip, pi_opt=opt)
def test_rs(self):
""" IPv6 Router Solicitation Exceptions
# - expect an RA in return
#
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IPv6(
- dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
+ IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
ICMPv6ND_RS())
pkts = [p]
self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
self.pg1.local_ip6_prefix_len),
is_no=1)
+ #
+ # change the link's link local, so we know that works too.
+ #
+ self.vapi.sw_interface_ip6_set_link_local_address(
+ sw_if_index=self.pg0.sw_if_index,
+ ip="fe80::88")
+
self.pg0.ip6_ra_config(send_unicast=1)
self.send_and_expect_ra(self.pg0, p,
"RA with Prefix reverted to defaults",
- dst_ip=ll)
+ dst_ip=ll,
+ src_ip="fe80::88")
#
# Reset the periodic advertisements back to default values
#
self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
+ def test_mld(self):
+ """ MLD Report """
+ #
+ # test one MLD is sent after applying an IPv6 Address on an interface
+ #
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ subitf = VppDot1QSubint(self, self.pg1, 99)
+
+ subitf.admin_up()
+ subitf.config_ip6()
+
+ rxs = self.pg1._get_capture(timeout=4, filter_out_fn=None)
+
+ #
+ # hunt for the MLD on vlan 99
+ #
+ for rx in rxs:
+ # make sure ipv6 packets with hop by hop options have
+ # correct checksums
+ self.assert_packet_checksums_valid(rx)
+ if rx.haslayer(IPv6ExtHdrHopByHop) and \
+ rx.haslayer(Dot1Q) and \
+ rx[Dot1Q].vlan == 99:
+ mld = rx[ICMPv6MLReport2]
+
+ self.assertEqual(mld.records_number, 4)
+
class TestIPv6IfAddrRoute(VppTestCase):
""" IPv6 Interface Addr Route Test Case """
addr1 = "2001:10::10"
addr2 = "2001:10::20"
- if_addr1 = VppIpInterfaceAddress(self, self.pg0,
- VppIpAddress(addr1), 64)
- if_addr2 = VppIpInterfaceAddress(self, self.pg0,
- VppIpAddress(addr2), 64)
- self.assertFalse(if_addr1.query_vpp_config()) # 2001:10::/64
+ if_addr1 = VppIpInterfaceAddress(self, self.pg0, addr1, 64)
+ if_addr2 = VppIpInterfaceAddress(self, self.pg0, addr2, 64)
+ self.assertFalse(if_addr1.query_vpp_config())
self.assertFalse(find_route(self, addr1, 128))
self.assertFalse(find_route(self, addr2, 128))
# configure first address, verify route present
if_addr1.add_vpp_config()
- self.assertTrue(if_addr1.query_vpp_config()) # 2001:10::/64
+ self.assertTrue(if_addr1.query_vpp_config())
self.assertTrue(find_route(self, addr1, 128))
self.assertFalse(find_route(self, addr2, 128))
# configure second address, delete first, verify route not removed
if_addr2.add_vpp_config()
if_addr1.remove_vpp_config()
- self.assertTrue(if_addr1.query_vpp_config()) # 2001:10::/64
+ self.assertFalse(if_addr1.query_vpp_config())
+ self.assertTrue(if_addr2.query_vpp_config())
self.assertFalse(find_route(self, addr1, 128))
self.assertTrue(find_route(self, addr2, 128))
# delete second address, verify route removed
if_addr2.remove_vpp_config()
- self.assertFalse(if_addr1.query_vpp_config()) # 2001:10::/64
+ self.assertFalse(if_addr1.query_vpp_config())
self.assertFalse(find_route(self, addr1, 128))
self.assertFalse(find_route(self, addr2, 128))
super(TestICMPv6Echo, self).tearDown()
for i in self.pg_interfaces:
i.unconfig_ip6()
- i.ip6_disable()
i.admin_down()
def test_icmpv6_echo(self):
def test_rd_receive_router_advertisement(self):
""" Verify events triggered by received RA packets """
- self.vapi.want_ip6_ra_events()
+ self.vapi.want_ip6_ra_events(enable=1)
prefix_info_1 = ICMPv6NDOptPrefixInfo(
prefix="1::2",
list.append(str(entry.route.prefix.network_address))
return list
+ def wait_for_no_default_route(self, n_tries=50, s_time=1):
+ while (n_tries):
+ fib = self.vapi.ip_route_dump(0, True)
+ default_routes = self.get_default_routes(fib)
+ if 0 == len(default_routes):
+ return True
+ n_tries = n_tries - 1
+ self.sleep(s_time)
+
+ return False
+
def test_all(self):
""" Test handling of SLAAC addresses and default routes """
self.pg0.add_stream([packet])
self.pg_start()
- self.sleep(0.1)
+ self.sleep_on_vpp_time(0.1)
fib = self.vapi.ip_route_dump(0, True)
self.pg0.add_stream([packet])
self.pg_start()
- self.sleep(0.1)
+ self.sleep_on_vpp_time(0.1)
# check that default route is deleted
fib = self.vapi.ip_route_dump(0, True)
default_routes = self.get_default_routes(fib)
self.assertEqual(len(default_routes), 0)
- self.sleep(0.1)
+ self.sleep_on_vpp_time(0.1)
# send RA
packet = self.create_ra_packet(self.pg0)
self.pg0.add_stream([packet])
self.pg_start()
- self.sleep(0.1)
+ self.sleep_on_vpp_time(0.1)
# check FIB for new default route
fib = self.vapi.ip_route_dump(0, True)
self.pg0.add_stream([packet])
self.pg_start()
- self.sleep(0.1)
+ self.sleep_on_vpp_time(0.1)
# check that default route still exists
fib = self.vapi.ip_route_dump(0, True)
self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
self.assertEqual(dr['next_hop'], router_address)
- self.sleep(1)
+ self.sleep_on_vpp_time(1)
# check that default route is deleted
- fib = self.vapi.ip_route_dump(0, True)
- default_routes = self.get_default_routes(fib)
- self.assertEqual(len(default_routes), 0)
+ self.assertTrue(self.wait_for_no_default_route())
# check FIB still contains the SLAAC address
addresses = set(self.get_interface_addresses(fib, self.pg0))
strict=False)
self.assertEqual(prefix, IPv6Network(text_type('1::/20')))
- self.sleep(1)
+ self.sleep_on_vpp_time(1)
# check that SLAAC address is deleted
fib = self.vapi.ip_route_dump(0, True)
# Add proxy support for the host
#
self.vapi.ip6nd_proxy_add_del(
- ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
+ is_add=1, ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
sw_if_index=self.pg1.sw_if_index)
#
IPv6(dst=self.pg0._remote_hosts[2].ip6,
src=self.pg0.remote_ip6) /
inet6.UDP(sport=10000, dport=20000) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.pg0.add_stream(t)
self.pg_enable_capture(self.pg_interfaces)
lladdr=self.pg0._remote_hosts[2].mac))
self.vapi.ip6nd_proxy_add_del(
- ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
+ is_add=1, ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
sw_if_index=self.pg2.sw_if_index)
self.send_and_expect_na(self.pg2, ns_pg2,
IPv6(dst=self.pg0._remote_hosts[2].ip6,
src=self.pg0._remote_hosts[3].ip6) /
inet6.UDP(sport=10000, dport=20000) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.pg2.add_stream(t2)
self.pg_enable_capture(self.pg_interfaces)
#
self.vapi.ip6nd_proxy_add_del(
ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
- sw_if_index=self.pg1.sw_if_index, is_del=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=0)
self.vapi.ip6nd_proxy_add_del(
ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
- sw_if_index=self.pg2.sw_if_index, is_del=1)
+ sw_if_index=self.pg2.sw_if_index, is_add=0)
self.assertFalse(find_nbr(self,
self.pg2.sw_if_index,
dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
#
# A route via IP NULL that will reply with ICMP unreachables
dst=self.pg1.local_mac) /
IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
pm = (Ether(src=self.pg1.remote_mac,
dst=self.pg1.local_mac) /
IPv6(src="2001::1", dst="ffef::1") /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
#
# PG1 does not forward IP traffic
port_ip_hdr = (
IPv6(dst="3000::1", src="3000:1::1") /
inet6.UDP(sport=1234, dport=1234 + ii) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
port_ip_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
port_ip_hdr))
src_ip_hdr = (
IPv6(dst="3000::1", src="3000:1::%d" % ii) /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
src_ip_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
src_ip_hdr))
src="4000:1::1") /
inet6.UDP(sport=1234,
dport=1234 + ii) /
- Raw('\xa5' * 100)))
+ Raw(b'\xa5' * 100)))
src_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(dst="4000::1",
src="4000:1::%d" % ii) /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100)))
+ Raw(b'\xa5' * 100)))
route_3000_2 = VppIpRoute(self, "3000::2", 128,
[VppRoutePath(self.pg3.remote_ip6,
src="6000:1::1") /
inet6.UDP(sport=1234,
dport=1234 + ii) /
- Raw('\xa5' * 100)))
+ Raw(b'\xa5' * 100)))
route_5000_2 = VppIpRoute(self, "5000::2", 128,
[VppRoutePath(self.pg3.remote_ip6,
IPv6(src=self.pg0.remote_ip6,
dst=self.pg0.local_ip6) /
inet6.TCP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
pkts = p * 1025
#
# add a policer
#
- policer = self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
- rate_type=1)
+ policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
+ policer.add_vpp_config()
self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)
self.vapi.cli("clear trace")
# remove the policer. back to full rx
#
self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
- self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
- rate_type=1, is_add=0)
+ policer.remove_vpp_config()
self.send_and_expect(self.pg0, pkts, self.pg1)
#
dst=self.pg0.local_mac) /
IPv6(src="5::5", dst="1::1") /
inet6.TCP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
p_src = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(src="2::2", dst="1::2") /
inet6.TCP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
pkts_dst = p_dst * 257
pkts_src = p_src * 257
dst=self.pg0.local_mac) /
IPv6(src="3::4", dst="3::3") /
inet6.TCP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.send_and_assert_no_replies(self.pg0, p_l * 257,
"IP lookup loop")
dst=self.pg1.remote_ip6,
hlim=1) /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
rx = self.send_and_expect(self.pg0, p_version * NUM_PKTS, self.pg0)
rx = rx[0]
dst=dst or self.pg1.remote_ip6,
version=3) /
l4 /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.send_and_assert_no_replies(self.pg0, p_version * NUM_PKTS,
remark=msg or "",
IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
IPv6ExtHdrHopByHop() /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
+
+class TestIPReplace(VppTestCase):
+ """ IPv6 Table Replace """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPReplace, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPReplace, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestIPReplace, self).setUp()
+
+ self.create_pg_interfaces(range(4))
+
+ table_id = 1
+ self.tables = []
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.resolve_arp()
+ i.generate_remote_hosts(2)
+ self.tables.append(VppIpTable(self, table_id,
+ True).add_vpp_config())
+ table_id += 1
+
+ def tearDown(self):
+ super(TestIPReplace, self).tearDown()
+ for i in self.pg_interfaces:
+ i.admin_down()
+ i.unconfig_ip4()
+
+ def test_replace(self):
+ """ IP Table Replace """
+
+ N_ROUTES = 20
+ links = [self.pg0, self.pg1, self.pg2, self.pg3]
+ routes = [[], [], [], []]
+
+ # the sizes of 'empty' tables
+ for t in self.tables:
+ self.assertEqual(len(t.dump()), 2)
+ self.assertEqual(len(t.mdump()), 5)
+
+ # load up the tables with some routes
+ for ii, t in enumerate(self.tables):
+ for jj in range(1, N_ROUTES):
+ uni = VppIpRoute(
+ self, "2001::%d" % jj if jj != 0 else "2001::", 128,
+ [VppRoutePath(links[ii].remote_hosts[0].ip6,
+ links[ii].sw_if_index),
+ VppRoutePath(links[ii].remote_hosts[1].ip6,
+ links[ii].sw_if_index)],
+ table_id=t.table_id).add_vpp_config()
+ multi = VppIpMRoute(
+ self, "::",
+ "ff:2001::%d" % jj, 128,
+ MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
+ [VppMRoutePath(self.pg0.sw_if_index,
+ MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
+ VppMRoutePath(self.pg1.sw_if_index,
+ MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
+ VppMRoutePath(self.pg2.sw_if_index,
+ MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
+ VppMRoutePath(self.pg3.sw_if_index,
+ MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)],
+ table_id=t.table_id).add_vpp_config()
+ routes[ii].append({'uni': uni,
+ 'multi': multi})
+
+ #
+ # replace the tables a few times
+ #
+ for kk in range(3):
+ # replace each table
+ for t in self.tables:
+ t.replace_begin()
+
+ # all the routes are still there
+ for ii, t in enumerate(self.tables):
+ dump = t.dump()
+ mdump = t.mdump()
+ for r in routes[ii]:
+ self.assertTrue(find_route_in_dump(dump, r['uni'], t))
+ self.assertTrue(find_mroute_in_dump(mdump, r['multi'], t))
+
+ # redownload the even numbered routes
+ for ii, t in enumerate(self.tables):
+ for jj in range(0, N_ROUTES, 2):
+ routes[ii][jj]['uni'].add_vpp_config()
+ routes[ii][jj]['multi'].add_vpp_config()
+
+ # signal each table converged
+ for t in self.tables:
+ t.replace_end()
+
+ # we should find the even routes, but not the odd
+ for ii, t in enumerate(self.tables):
+ dump = t.dump()
+ mdump = t.mdump()
+ for jj in range(0, N_ROUTES, 2):
+ self.assertTrue(find_route_in_dump(
+ dump, routes[ii][jj]['uni'], t))
+ self.assertTrue(find_mroute_in_dump(
+ mdump, routes[ii][jj]['multi'], t))
+ for jj in range(1, N_ROUTES - 1, 2):
+ self.assertFalse(find_route_in_dump(
+ dump, routes[ii][jj]['uni'], t))
+ self.assertFalse(find_mroute_in_dump(
+ mdump, routes[ii][jj]['multi'], t))
+
+ # reload all the routes
+ for ii, t in enumerate(self.tables):
+ for r in routes[ii]:
+ r['uni'].add_vpp_config()
+ r['multi'].add_vpp_config()
+
+ # all the routes are still there
+ for ii, t in enumerate(self.tables):
+ dump = t.dump()
+ mdump = t.mdump()
+ for r in routes[ii]:
+ self.assertTrue(find_route_in_dump(dump, r['uni'], t))
+ self.assertTrue(find_mroute_in_dump(mdump, r['multi'], t))
+
+ #
+ # finally flush the tables for good measure
+ #
+ for t in self.tables:
+ t.flush()
+ self.assertEqual(len(t.dump()), 2)
+ self.assertEqual(len(t.mdump()), 5)
+
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)