#!/usr/bin/env python3
import socket
+from socket import inet_pton, inet_ntop
import unittest
from parameterized import parameterized
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
- ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, IPv6ExtHdrHopByHop
+ ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, \
+ IPv6ExtHdrHopByHop, ICMPv6MLReport2, ICMPv6MLDMultAddrRec
from scapy.layers.l2 import Ether, Dot1Q
from scapy.packet import Raw
-from scapy.utils import inet_pton, inet_ntop
from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
in6_mactoifaceid
from six import moves
from vpp_neighbor import find_nbr, VppNeighbor
from vpp_pg_interface import is_ipv6_misc
from vpp_sub_interface import VppSubInterface, VppDot1QSubint
+from vpp_policer import VppPolicer
from ipaddress import IPv6Network, IPv6Address
AF_INET6 = socket.AF_INET6
def send_and_expect_ns(self, tx_intf, rx_intf, pkts, tgt_ip,
filter_out_fn=is_ipv6_misc):
+ self.vapi.cli("clear trace")
tx_intf.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
"""Run standard test teardown and log ``show ip6 neighbors``."""
for i in self.interfaces:
i.unconfig_ip6()
- i.ip6_disable()
i.admin_down()
for i in self.sub_interfaces:
i.remove_vpp_config()
self.pg0.remote_ip6,
self.pg1.remote_hosts[1].ip6)
- def validate_ra(self, intf, rx, dst_ip=None, mtu=9000, pi_opt=None):
+ def validate_ra(self, intf, rx, dst_ip=None, src_ip=None,
+ mtu=9000, pi_opt=None):
if not dst_ip:
dst_ip = intf.remote_ip6
+ if not src_ip:
+ src_ip = mk_ll_addr(intf.local_mac)
# unicasted packets must come to the unicast mac
self.assertEqual(rx[Ether].dst, intf.remote_mac)
# and come from the router's link local
self.assertTrue(in6_islladdr(rx[IPv6].src))
- self.assertEqual(in6_ptop(rx[IPv6].src),
- in6_ptop(mk_ll_addr(intf.local_mac)))
+ self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(src_ip))
# it should contain the links MTU
ra = rx[ICMPv6ND_RA]
def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
filter_out_fn=is_ipv6_misc,
- opt=None):
+ opt=None,
+ src_ip=None):
+ self.vapi.cli("clear trace")
intf.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.assertEqual(len(rx), 1)
rx = rx[0]
- self.validate_ra(intf, rx, dst_ip, pi_opt=opt)
+ self.validate_ra(intf, rx, dst_ip, src_ip=src_ip, pi_opt=opt)
def test_rs(self):
""" IPv6 Router Solicitation Exceptions
# - expect an RA in return
#
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IPv6(
- dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
+ IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
ICMPv6ND_RS())
pkts = [p]
self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
self.pg1.local_ip6_prefix_len),
is_no=1)
+ #
+ # change the link's link local, so we know that works too.
+ #
+ self.vapi.sw_interface_ip6_set_link_local_address(
+ sw_if_index=self.pg0.sw_if_index,
+ ip="fe80::88")
+
self.pg0.ip6_ra_config(send_unicast=1)
self.send_and_expect_ra(self.pg0, p,
"RA with Prefix reverted to defaults",
- dst_ip=ll)
+ dst_ip=ll,
+ src_ip="fe80::88")
#
# Reset the periodic advertisements back to default values
#
self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
+ def test_mld(self):
+ """ MLD Report """
+ #
+ # test one MLD is sent after applying an IPv6 Address on an interface
+ #
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ subitf = VppDot1QSubint(self, self.pg1, 99)
+
+ subitf.admin_up()
+ subitf.config_ip6()
+
+ rxs = self.pg1._get_capture(timeout=4, filter_out_fn=None)
+
+ #
+ # hunt for the MLD on vlan 99
+ #
+ for rx in rxs:
+ # make sure ipv6 packets with hop by hop options have
+ # correct checksums
+ self.assert_packet_checksums_valid(rx)
+ if rx.haslayer(IPv6ExtHdrHopByHop) and \
+ rx.haslayer(Dot1Q) and \
+ rx[Dot1Q].vlan == 99:
+ mld = rx[ICMPv6MLReport2]
+
+ self.assertEqual(mld.records_number, 4)
+
class TestIPv6IfAddrRoute(VppTestCase):
""" IPv6 Interface Addr Route Test Case """
super(TestICMPv6Echo, self).tearDown()
for i in self.pg_interfaces:
i.unconfig_ip6()
- i.ip6_disable()
i.admin_down()
def test_icmpv6_echo(self):
def test_rd_receive_router_advertisement(self):
""" Verify events triggered by received RA packets """
- self.vapi.want_ip6_ra_events()
+ self.vapi.want_ip6_ra_events(enable=1)
prefix_info_1 = ICMPv6NDOptPrefixInfo(
prefix="1::2",
list.append(str(entry.route.prefix.network_address))
return list
+ def wait_for_no_default_route(self, n_tries=50, s_time=1):
+ while (n_tries):
+ fib = self.vapi.ip_route_dump(0, True)
+ default_routes = self.get_default_routes(fib)
+ if 0 == len(default_routes):
+ return True
+ n_tries = n_tries - 1
+ self.sleep(s_time)
+
+ return False
+
def test_all(self):
""" Test handling of SLAAC addresses and default routes """
self.sleep_on_vpp_time(1)
# check that default route is deleted
- fib = self.vapi.ip_route_dump(0, True)
- default_routes = self.get_default_routes(fib)
- self.assertEqual(len(default_routes), 0)
+ self.assertTrue(self.wait_for_no_default_route())
# check FIB still contains the SLAAC address
addresses = set(self.get_interface_addresses(fib, self.pg0))
# Add proxy support for the host
#
self.vapi.ip6nd_proxy_add_del(
- ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
+ is_add=1, ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
sw_if_index=self.pg1.sw_if_index)
#
lladdr=self.pg0._remote_hosts[2].mac))
self.vapi.ip6nd_proxy_add_del(
- ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
+ is_add=1, ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
sw_if_index=self.pg2.sw_if_index)
self.send_and_expect_na(self.pg2, ns_pg2,
#
self.vapi.ip6nd_proxy_add_del(
ip=inet_pton(AF_INET6, self.pg0._remote_hosts[2].ip6),
- sw_if_index=self.pg1.sw_if_index, is_del=1)
+ sw_if_index=self.pg1.sw_if_index, is_add=0)
self.vapi.ip6nd_proxy_add_del(
ip=inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
- sw_if_index=self.pg2.sw_if_index, is_del=1)
+ sw_if_index=self.pg2.sw_if_index, is_add=0)
self.assertFalse(find_nbr(self,
self.pg2.sw_if_index,
#
# add a policer
#
- policer = self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
- rate_type=1)
+ policer = VppPolicer(self, "ip6-punt", 400, 0, 10, 0, rate_type=1)
+ policer.add_vpp_config()
self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)
self.vapi.cli("clear trace")
# remove the policer. back to full rx
#
self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
- self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
- rate_type=1, is_add=0)
+ policer.remove_vpp_config()
self.send_and_expect(self.pg0, pkts, self.pg1)
#