-from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
- DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
- DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
- DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime
+from socket import AF_INET6, inet_ntop, inet_pton
+
+from scapy.layers.dhcp6 import (
+ DHCP6_Advertise,
+ DHCP6OptClientId,
+ DHCP6OptStatusCode,
+ DHCP6OptPref,
+ DHCP6OptIA_PD,
+ DHCP6OptIAPrefix,
+ DHCP6OptServerId,
+ DHCP6_Solicit,
+ DHCP6_Reply,
+ DHCP6_Request,
+ DHCP6_Renew,
+ DHCP6_Rebind,
+ DUID_LL,
+ DHCP6_Release,
+ DHCP6OptElapsedTime,
+ DHCP6OptIA_NA,
+ DHCP6OptIAAddress,
+)
from scapy.layers.inet6 import IPv6, Ether, UDP
-from scapy.utils6 import in6_mactoifaceid
-from scapy.utils import inet_ntop, inet_pton
-from socket import AF_INET6
+
from framework import VppTestCase
-from time import time
+from asfframework import tag_fixme_vpp_workers, tag_run_solo
+from vpp_papi import VppEnum
+import util
+import os
def ip6_normalize(ip6):
return inet_ntop(AF_INET6, inet_pton(AF_INET6, ip6))
-def mk_ll_addr(mac):
- euid = in6_mactoifaceid(mac)
- addr = "fe80::" + euid
- return addr
-
-
-class TestDHCPv6PD(VppTestCase):
- """ DHCPv6 PD Data Plane Test Case """
+class TestDHCPv6DataPlane(VppTestCase):
+ """DHCPv6 Data Plane Test Case"""
@classmethod
def setUpClass(cls):
- super(TestDHCPv6PD, cls).setUpClass()
+ super(TestDHCPv6DataPlane, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestDHCPv6DataPlane, cls).tearDownClass()
def setUp(self):
- super(TestDHCPv6PD, self).setUp()
+ super(TestDHCPv6DataPlane, self).setUp()
self.create_pg_interfaces(range(1))
self.interfaces = list(self.pg_interfaces)
for i in self.interfaces:
i.unconfig_ip6()
i.admin_down()
- super(TestDHCPv6PD, self).tearDown()
+ super(TestDHCPv6DataPlane, self).tearDown()
- def test_dhcp_send_solicit_receive_advertise(self):
- """ Verify DHCPv6 PD Solicit packet and received Advertise envent """
+ def test_dhcp_ia_na_send_solicit_receive_advertise(self):
+ """Verify DHCPv6 IA NA Solicit packet and Advertise event"""
- self.vapi.dhcp6_clients_enable_disable()
+ self.vapi.dhcp6_clients_enable_disable(enable=1)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- prefix_bin = '\00\01\00\02\00\03' + '\00' * 10
- prefix = {'prefix': prefix_bin,
- 'prefix_length': 50,
- 'preferred_time': 60,
- 'valid_time': 120}
- self.vapi.dhcp6_pd_send_client_message(1, self.pg0.sw_if_index,
- T1=20, T2=40, prefixes=[prefix])
+ address = {"address": "1:2:3::5", "preferred_time": 60, "valid_time": 120}
+ self.vapi.dhcp6_send_client_message(
+ server_index=0xFFFFFFFF,
+ mrc=1,
+ msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT,
+ sw_if_index=self.pg0.sw_if_index,
+ T1=20,
+ T2=40,
+ addresses=[address],
+ n_addresses=len([address]),
+ )
rx_list = self.pg0.get_capture(1)
self.assertEqual(len(rx_list), 1)
packet = rx_list[0]
- self.assertTrue(packet.haslayer(IPv6))
- self.assertTrue(packet[IPv6].haslayer(DHCP6_Solicit))
+ self.assertEqual(packet.haslayer(IPv6), 1)
+ self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)
+
+ client_duid = packet[DHCP6OptClientId].duid
+ trid = packet[DHCP6_Solicit].trid
+
+ dst = ip6_normalize(packet[IPv6].dst)
+ dst2 = ip6_normalize("ff02::1:2")
+ self.assert_equal(dst, dst2)
+ src = ip6_normalize(packet[IPv6].src)
+ src2 = ip6_normalize(self.pg0.local_ip6_ll)
+ self.assert_equal(src, src2)
+ ia_na = packet[DHCP6OptIA_NA]
+ self.assert_equal(ia_na.T1, 20)
+ self.assert_equal(ia_na.T2, 40)
+ self.assert_equal(len(ia_na.ianaopts), 1)
+ address = ia_na.ianaopts[0]
+ self.assert_equal(address.addr, "1:2:3::5")
+ self.assert_equal(address.preflft, 60)
+ self.assert_equal(address.validlft, 120)
+
+ self.vapi.want_dhcp6_reply_events(enable_disable=1, pid=os.getpid())
+
+ try:
+ ia_na_opts = DHCP6OptIAAddress(addr="7:8::2", preflft=60, validlft=120)
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(
+ src=util.mk_ll_addr(self.pg0.remote_mac), dst=self.pg0.local_ip6_ll
+ )
+ / UDP(sport=547, dport=546)
+ / DHCP6_Advertise(trid=trid)
+ / DHCP6OptServerId(duid=self.server_duid)
+ / DHCP6OptClientId(duid=client_duid)
+ / DHCP6OptPref(prefval=7)
+ / DHCP6OptStatusCode(statuscode=1)
+ / DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=ia_na_opts)
+ )
+ self.pg0.add_stream([p])
+ self.pg_start()
+
+ ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")
+
+ self.assert_equal(ev.preference, 7)
+ self.assert_equal(ev.status_code, 1)
+ self.assert_equal(ev.T1, 20)
+ self.assert_equal(ev.T2, 40)
+
+ reported_address = ev.addresses[0]
+ address = ia_na_opts.getfieldval("addr")
+ self.assert_equal(str(reported_address.address), address)
+ self.assert_equal(
+ reported_address.preferred_time, ia_na_opts.getfieldval("preflft")
+ )
+ self.assert_equal(
+ reported_address.valid_time, ia_na_opts.getfieldval("validlft")
+ )
+
+ finally:
+ self.vapi.want_dhcp6_reply_events(enable_disable=0)
+ self.vapi.dhcp6_clients_enable_disable(enable=0)
+
+ def test_dhcp_pd_send_solicit_receive_advertise(self):
+ """Verify DHCPv6 PD Solicit packet and Advertise event"""
+
+ self.vapi.dhcp6_clients_enable_disable(enable=1)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ prefix = {
+ "prefix": {"address": "1:2:3::", "len": 50},
+ "preferred_time": 60,
+ "valid_time": 120,
+ }
+ prefixes = [prefix]
+ self.vapi.dhcp6_pd_send_client_message(
+ server_index=0xFFFFFFFF,
+ mrc=1,
+ msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT,
+ sw_if_index=self.pg0.sw_if_index,
+ T1=20,
+ T2=40,
+ prefixes=prefixes,
+ n_prefixes=len(prefixes),
+ )
+ rx_list = self.pg0.get_capture(1)
+ self.assertEqual(len(rx_list), 1)
+ packet = rx_list[0]
+
+ self.assertEqual(packet.haslayer(IPv6), 1)
+ self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)
client_duid = packet[DHCP6OptClientId].duid
trid = packet[DHCP6_Solicit].trid
self.assert_equal(ia_pd.T2, 40)
self.assert_equal(len(ia_pd.iapdopt), 1)
prefix = ia_pd.iapdopt[0]
- self.assert_equal(prefix.prefix, '1:2:3::')
+ self.assert_equal(prefix.prefix, "1:2:3::")
self.assert_equal(prefix.plen, 50)
self.assert_equal(prefix.preflft, 60)
self.assert_equal(prefix.validlft, 120)
- self.vapi.want_dhcp6_pd_reply_events()
-
- ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
- validlft=120)
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IPv6(src=mk_ll_addr(self.pg0.remote_mac),
- dst=self.pg0.local_ip6_ll) /
- UDP(sport=547, dport=546) /
- DHCP6_Advertise(trid=trid) /
- DHCP6OptServerId(duid=self.server_duid) /
- DHCP6OptClientId(duid=client_duid) /
- DHCP6OptPref(prefval=7) /
- DHCP6OptStatusCode(statuscode=1) /
- DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts)
- )
+ self.vapi.want_dhcp6_pd_reply_events(enable_disable=1, pid=os.getpid())
+
+ try:
+ ia_pd_opts = DHCP6OptIAPrefix(
+ prefix="7:8::", plen=56, preflft=60, validlft=120
+ )
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(
+ src=util.mk_ll_addr(self.pg0.remote_mac), dst=self.pg0.local_ip6_ll
+ )
+ / UDP(sport=547, dport=546)
+ / DHCP6_Advertise(trid=trid)
+ / DHCP6OptServerId(duid=self.server_duid)
+ / DHCP6OptClientId(duid=client_duid)
+ / DHCP6OptPref(prefval=7)
+ / DHCP6OptStatusCode(statuscode=1)
+ / DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts)
+ )
+ self.pg0.add_stream([p])
+ self.pg_start()
+
+ ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")
+
+ self.assert_equal(ev.preference, 7)
+ self.assert_equal(ev.status_code, 1)
+ self.assert_equal(ev.T1, 20)
+ self.assert_equal(ev.T2, 40)
+
+ reported_prefix = ev.prefixes[0]
+ prefix = ia_pd_opts.getfieldval("prefix")
+ self.assert_equal(str(reported_prefix.prefix).split("/")[0], prefix)
+ self.assert_equal(
+ int(str(reported_prefix.prefix).split("/")[1]),
+ ia_pd_opts.getfieldval("plen"),
+ )
+ self.assert_equal(
+ reported_prefix.preferred_time, ia_pd_opts.getfieldval("preflft")
+ )
+ self.assert_equal(
+ reported_prefix.valid_time, ia_pd_opts.getfieldval("validlft")
+ )
+
+ finally:
+ self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
+ self.vapi.dhcp6_clients_enable_disable(enable=0)
+
+
+@tag_run_solo
+class TestDHCPv6IANAControlPlane(VppTestCase):
+ """DHCPv6 IA NA Control Plane Test Case"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestDHCPv6IANAControlPlane, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestDHCPv6IANAControlPlane, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestDHCPv6IANAControlPlane, self).setUp()
+
+ self.create_pg_interfaces(range(1))
+ self.interfaces = list(self.pg_interfaces)
+ for i in self.interfaces:
+ i.admin_up()
+
+ self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
+ self.client_duid = None
+ self.T1 = 1
+ self.T2 = 2
+
+ fib = self.vapi.ip_route_dump(0, True)
+ self.initial_addresses = set(self.get_interface_addresses(fib, self.pg0))
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ self.vapi.dhcp6_client_enable_disable(
+ sw_if_index=self.pg0.sw_if_index, enable=1
+ )
+
+ def tearDown(self):
+ self.vapi.dhcp6_client_enable_disable(
+ sw_if_index=self.pg0.sw_if_index, enable=0
+ )
+
+ for i in self.interfaces:
+ i.admin_down()
+
+ super(TestDHCPv6IANAControlPlane, self).tearDown()
+
+ @staticmethod
+ def get_interface_addresses(fib, pg):
+ lst = []
+ for entry in fib:
+ if entry.route.prefix.prefixlen == 128:
+ path = entry.route.paths[0]
+ if path.sw_if_index == pg.sw_if_index:
+ lst.append(str(entry.route.prefix.network_address))
+ return lst
+
+ def get_addresses(self):
+ fib = self.vapi.ip_route_dump(0, True)
+ addresses = set(self.get_interface_addresses(fib, self.pg0))
+ return addresses.difference(self.initial_addresses)
+
+ def validate_duid_ll(self, duid):
+ DUID_LL(duid)
+
+ def validate_packet(self, packet, msg_type, is_resend=False):
+ try:
+ self.assertEqual(packet.haslayer(msg_type), 1)
+ client_duid = packet[DHCP6OptClientId].duid
+ if self.client_duid is None:
+ self.client_duid = client_duid
+ self.validate_duid_ll(client_duid)
+ else:
+ self.assertEqual(self.client_duid, client_duid)
+ if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
+ server_duid = packet[DHCP6OptServerId].duid
+ self.assertEqual(server_duid, self.server_duid)
+ if is_resend:
+ self.assertEqual(self.trid, packet[msg_type].trid)
+ else:
+ self.trid = packet[msg_type].trid
+ ip = packet[IPv6]
+ udp = packet[UDP]
+ self.assertEqual(ip.dst, "ff02::1:2")
+ self.assertEqual(udp.sport, 546)
+ self.assertEqual(udp.dport, 547)
+ dhcpv6 = packet[msg_type]
+ elapsed_time = dhcpv6[DHCP6OptElapsedTime]
+ if is_resend:
+ self.assertNotEqual(elapsed_time.elapsedtime, 0)
+ else:
+ self.assertEqual(elapsed_time.elapsedtime, 0)
+ except BaseException:
+ packet.show()
+ raise
+
+ def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
+ if timeout is None:
+ timeout = 3
+ rx_list = self.pg0.get_capture(1, timeout=timeout)
+ packet = rx_list[0]
+ self.validate_packet(packet, msg_type, is_resend=is_resend)
+
+ def wait_for_solicit(self, timeout=None, is_resend=False):
+ self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)
+
+ def wait_for_request(self, timeout=None, is_resend=False):
+ self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)
+
+ def wait_for_renew(self, timeout=None, is_resend=False):
+ self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)
+
+ def wait_for_rebind(self, timeout=None, is_resend=False):
+ self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)
+
+ def wait_for_release(self, timeout=None, is_resend=False):
+ self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)
+
+ def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
+ if t1 is None:
+ t1 = self.T1
+ if t2 is None:
+ t2 = self.T2
+ if ianaopts is None:
+ opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2)
+ else:
+ opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts)
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(src=util.mk_ll_addr(self.pg0.remote_mac), dst=self.pg0.local_ip6_ll)
+ / UDP(sport=547, dport=546)
+ / msg_type(trid=self.trid)
+ / DHCP6OptServerId(duid=self.server_duid)
+ / DHCP6OptClientId(duid=self.client_duid)
+ / opt_ia_na
+ )
self.pg0.add_stream([p])
+ self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")
+ def send_advertise(self, t1=None, t2=None, ianaopts=None):
+ self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)
- self.assert_equal(ev.preference, 7)
- self.assert_equal(ev.status_code, 1)
- self.assert_equal(ev.T1, 20)
- self.assert_equal(ev.T2, 40)
+ def send_reply(self, t1=None, t2=None, ianaopts=None):
+ self.send_packet(DHCP6_Reply, t1, t2, ianaopts)
- reported_prefix = ev.prefixes[0]
- prefix = inet_pton(AF_INET6, ia_pd_opts.getfieldval("prefix"))
- self.assert_equal(reported_prefix.prefix, prefix)
- self.assert_equal(reported_prefix.prefix_length,
- ia_pd_opts.getfieldval("plen"))
- self.assert_equal(reported_prefix.preferred_time,
- ia_pd_opts.getfieldval("preflft"))
- self.assert_equal(reported_prefix.valid_time,
- ia_pd_opts.getfieldval("validlft"))
+ def test_T1_and_T2_timeouts(self):
+ """Test T1 and T2 timeouts"""
+ self.wait_for_solicit()
+ self.send_advertise()
+ self.wait_for_request()
+ self.send_reply()
+ self.sleep(1)
+
+ self.wait_for_renew()
+
+ self.pg_enable_capture(self.pg_interfaces)
+
+ self.sleep(1)
+
+ self.wait_for_rebind()
+
+ def test_addresses(self):
+ """Test handling of addresses"""
+
+ ia_na_opts = DHCP6OptIAAddress(addr="7:8::2", preflft=1, validlft=2)
+
+ self.wait_for_solicit()
+ self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
+ self.wait_for_request()
+ self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
+ self.sleep(0.1)
+
+ # check FIB for new address
+ new_addresses = self.get_addresses()
+ self.assertEqual(len(new_addresses), 1)
+ addr = list(new_addresses)[0]
+ self.assertEqual(addr, "7:8::2")
+
+ self.sleep(2)
+
+ # check that the address is deleted
+ fib = self.vapi.ip_route_dump(0, True)
+ addresses = set(self.get_interface_addresses(fib, self.pg0))
+ new_addresses = addresses.difference(self.initial_addresses)
+ self.assertEqual(len(new_addresses), 0)
+
+ def test_sending_client_messages_solicit(self):
+ """VPP receives messages from DHCPv6 client"""
+
+ self.wait_for_solicit()
+ self.send_packet(DHCP6_Solicit)
+ self.send_packet(DHCP6_Request)
+ self.send_packet(DHCP6_Renew)
+ self.send_packet(DHCP6_Rebind)
+ self.sleep(1)
+ self.wait_for_solicit(is_resend=True)
+
+ def test_sending_inappropriate_packets(self):
+ """Server sends messages with inappropriate message types"""
+
+ self.wait_for_solicit()
+ self.send_reply()
+ self.wait_for_solicit(is_resend=True)
+ self.send_advertise()
+ self.wait_for_request()
+ self.send_advertise()
+ self.wait_for_request(is_resend=True)
+ self.send_reply()
+ self.wait_for_renew()
+
+ def test_no_address_available_in_advertise(self):
+ """Advertise message contains NoAddrsAvail status code"""
+
+ self.wait_for_solicit()
+ noavail = DHCP6OptStatusCode(statuscode=2) # NoAddrsAvail
+ self.send_advertise(ianaopts=noavail)
+ self.wait_for_solicit(is_resend=True)
+
+ def test_preferred_greater_than_valid_lifetime(self):
+ """Preferred lifetime is greater than valid lifetime"""
+
+ self.wait_for_solicit()
+ self.send_advertise()
+ self.wait_for_request()
+ ia_na_opts = DHCP6OptIAAddress(addr="7:8::2", preflft=4, validlft=3)
+ self.send_reply(ianaopts=ia_na_opts)
+
+ self.sleep(0.5)
+
+ # check FIB contains no addresses
+ fib = self.vapi.ip_route_dump(0, True)
+ addresses = set(self.get_interface_addresses(fib, self.pg0))
+ new_addresses = addresses.difference(self.initial_addresses)
+ self.assertEqual(len(new_addresses), 0)
+
+ def test_T1_greater_than_T2(self):
+ """T1 is greater than T2"""
+
+ self.wait_for_solicit()
+ self.send_advertise()
+ self.wait_for_request()
+ ia_na_opts = DHCP6OptIAAddress(addr="7:8::2", preflft=4, validlft=8)
+ self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)
+
+ self.sleep(0.5)
+
+ # check FIB contains no addresses
+ fib = self.vapi.ip_route_dump(0, True)
+ addresses = set(self.get_interface_addresses(fib, self.pg0))
+ new_addresses = addresses.difference(self.initial_addresses)
+ self.assertEqual(len(new_addresses), 0)
+
+
+@tag_fixme_vpp_workers
class TestDHCPv6PDControlPlane(VppTestCase):
- """ DHCPv6 PD Control Plane Test Case """
+ """DHCPv6 PD Control Plane Test Case"""
@classmethod
def setUpClass(cls):
super(TestDHCPv6PDControlPlane, cls).setUpClass()
+ @classmethod
+ def tearDownClass(cls):
+ super(TestDHCPv6PDControlPlane, cls).tearDownClass()
+
def setUp(self):
super(TestDHCPv6PDControlPlane, self).setUp()
self.T1 = 1
self.T2 = 2
- fib = self.vapi.ip6_fib_dump()
- self.initial_addresses = set(self.get_interface_addresses(fib,
- self.pg1))
+ fib = self.vapi.ip_route_dump(0, True)
+ self.initial_addresses = set(self.get_interface_addresses(fib, self.pg1))
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- self.prefix_group = 'my-pd-prefix-group'
+ self.prefix_group = "my-pd-prefix-group"
self.vapi.dhcp6_pd_client_enable_disable(
- self.pg0.sw_if_index,
- prefix_group=self.prefix_group)
+ enable=1, sw_if_index=self.pg0.sw_if_index, prefix_group=self.prefix_group
+ )
def tearDown(self):
- self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index,
- enable=0)
+ self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index, enable=0)
for i in self.interfaces:
i.admin_down()
def get_interface_addresses(fib, pg):
lst = []
for entry in fib:
- if entry.address_length == 128:
- path = entry.path[0]
+ if entry.route.prefix.prefixlen == 128:
+ path = entry.route.paths[0]
if path.sw_if_index == pg.sw_if_index:
- lst.append(entry.address)
+ lst.append(str(entry.route.prefix.network_address))
return lst
def get_addresses(self):
- fib = self.vapi.ip6_fib_dump()
+ fib = self.vapi.ip_route_dump(0, True)
addresses = set(self.get_interface_addresses(fib, self.pg1))
return addresses.difference(self.initial_addresses)
def validate_packet(self, packet, msg_type, is_resend=False):
try:
- self.assertTrue(packet.haslayer(msg_type))
+ self.assertEqual(packet.haslayer(msg_type), 1)
client_duid = packet[DHCP6OptClientId].duid
if self.client_duid is None:
self.client_duid = client_duid
self.trid = packet[msg_type].trid
ip = packet[IPv6]
udp = packet[UDP]
- self.assertEqual(ip.dst, 'ff02::1:2')
+ self.assertEqual(ip.dst, "ff02::1:2")
self.assertEqual(udp.sport, 546)
self.assertEqual(udp.dport, 547)
dhcpv6 = packet[msg_type]
elapsed_time = dhcpv6[DHCP6OptElapsedTime]
- if (is_resend):
+ if is_resend:
self.assertNotEqual(elapsed_time.elapsedtime, 0)
else:
self.assertEqual(elapsed_time.elapsedtime, 0)
- except:
+ except BaseException:
packet.show()
raise
opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2)
else:
opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt)
- p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IPv6(src=mk_ll_addr(self.pg0.remote_mac),
- dst=self.pg0.local_ip6_ll) /
- UDP(sport=547, dport=546) /
- msg_type(trid=self.trid) /
- DHCP6OptServerId(duid=self.server_duid) /
- DHCP6OptClientId(duid=self.client_duid) /
- opt_ia_pd
- )
+ p = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(src=util.mk_ll_addr(self.pg0.remote_mac), dst=self.pg0.local_ip6_ll)
+ / UDP(sport=547, dport=546)
+ / msg_type(trid=self.trid)
+ / DHCP6OptServerId(duid=self.server_duid)
+ / DHCP6OptClientId(duid=self.client_duid)
+ / opt_ia_pd
+ )
self.pg0.add_stream([p])
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.send_packet(DHCP6_Reply, t1, t2, iapdopt)
def test_T1_and_T2_timeouts(self):
- """ Test T1 and T2 timeouts """
+ """Test T1 and T2 timeouts"""
self.wait_for_solicit()
self.send_advertise()
self.wait_for_rebind()
def test_prefixes(self):
- """ Test handling of prefixes """
- address_bin_1 = None
- address_bin_2 = None
+ """Test handling of prefixes"""
+
+ address1 = "::2:0:0:0:405/60"
+ address2 = "::76:0:0:0:406/62"
try:
- address_bin_1 = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
- address_prefix_length_1 = 60
- self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
- address_bin_1,
- address_prefix_length_1,
- self.prefix_group)
+ self.vapi.ip6_add_del_address_using_prefix(
+ sw_if_index=self.pg1.sw_if_index,
+ address_with_prefix=address1,
+ prefix_group=self.prefix_group,
+ )
- ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
- validlft=3)
+ ia_pd_opts = DHCP6OptIAPrefix(
+ prefix="7:8::", plen=56, preflft=2, validlft=3
+ )
self.wait_for_solicit()
self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
new_addresses = self.get_addresses()
self.assertEqual(len(new_addresses), 1)
addr = list(new_addresses)[0]
- self.assertEqual(inet_ntop(AF_INET6, addr), '7:8:0:2::405')
+ self.assertEqual(addr, "7:8:0:2::405")
self.sleep(1)
- address_bin_2 = '\x00' * 6 + '\x00\x76' + '\x00' * 6 + '\x04\x06'
- address_prefix_length_2 = 62
- self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
- address_bin_2,
- address_prefix_length_2,
- self.prefix_group)
+ self.vapi.ip6_add_del_address_using_prefix(
+ sw_if_index=self.pg1.sw_if_index,
+ address_with_prefix=address2,
+ prefix_group=self.prefix_group,
+ )
self.sleep(1)
# check FIB contains 2 addresses
- fib = self.vapi.ip6_fib_dump()
+ fib = self.vapi.ip_route_dump(0, True)
addresses = set(self.get_interface_addresses(fib, self.pg1))
new_addresses = addresses.difference(self.initial_addresses)
self.assertEqual(len(new_addresses), 2)
addr1 = list(new_addresses)[0]
addr2 = list(new_addresses)[1]
- if inet_ntop(AF_INET6, addr1) == '7:8:0:76::406':
+ if addr1 == "7:8:0:76::406":
addr1, addr2 = addr2, addr1
- self.assertEqual(inet_ntop(AF_INET6, addr1), '7:8:0:2::405')
- self.assertEqual(inet_ntop(AF_INET6, addr2), '7:8:0:76::406')
+ self.assertEqual(addr1, "7:8:0:2::405")
+ self.assertEqual(addr2, "7:8:0:76::406")
self.sleep(1)
# check that the addresses are deleted
- fib = self.vapi.ip6_fib_dump()
+ fib = self.vapi.ip_route_dump(0, True)
addresses = set(self.get_interface_addresses(fib, self.pg1))
new_addresses = addresses.difference(self.initial_addresses)
self.assertEqual(len(new_addresses), 0)
finally:
- if address_bin_1 is not None:
+ if address1 is not None:
self.vapi.ip6_add_del_address_using_prefix(
- self.pg1.sw_if_index, address_bin_1,
- address_prefix_length_1, self.prefix_group, is_add=0)
- if address_bin_2 is not None:
+ sw_if_index=self.pg1.sw_if_index,
+ address_with_prefix=address1,
+ prefix_group=self.prefix_group,
+ is_add=0,
+ )
+ if address2 is not None:
self.vapi.ip6_add_del_address_using_prefix(
- self.pg1.sw_if_index, address_bin_2,
- address_prefix_length_2, self.prefix_group, is_add=0)
+ sw_if_index=self.pg1.sw_if_index,
+ address_with_prefix=address2,
+ prefix_group=self.prefix_group,
+ is_add=0,
+ )
def test_sending_client_messages_solicit(self):
- """ VPP receives messages from DHCPv6 client """
+ """VPP receives messages from DHCPv6 client"""
self.wait_for_solicit()
self.send_packet(DHCP6_Solicit)
self.sleep(1)
self.wait_for_solicit(is_resend=True)
- def test_sending_inapropriate_packets(self):
- """ Server sends messages with inapropriate message types """
+ def test_sending_inappropriate_packets(self):
+ """Server sends messages with inappropriate message types"""
self.wait_for_solicit()
self.send_reply()
self.wait_for_renew()
def test_no_prefix_available_in_advertise(self):
- """ Advertise message contains NoPrefixAvail status code """
+ """Advertise message contains NoPrefixAvail status code"""
self.wait_for_solicit()
noavail = DHCP6OptStatusCode(statuscode=6) # NoPrefixAvail
self.send_advertise(iapdopt=noavail)
self.wait_for_solicit(is_resend=True)
- def test_preferred_greater_than_valit_lifetime(self):
- """ Preferred lifetime is greater than valid lifetime """
+ def test_preferred_greater_than_valid_lifetime(self):
+ """Preferred lifetime is greater than valid lifetime"""
+ address1 = "::2:0:0:0:405/60"
try:
- address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
- address_prefix_length = 60
- self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
- address_bin,
- address_prefix_length,
- self.prefix_group)
+ self.vapi.ip6_add_del_address_using_prefix(
+ sw_if_index=self.pg1.sw_if_index,
+ address_with_prefix=address1,
+ prefix_group=self.prefix_group,
+ )
self.wait_for_solicit()
self.send_advertise()
self.wait_for_request()
- ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
- validlft=3)
+ ia_pd_opts = DHCP6OptIAPrefix(
+ prefix="7:8::", plen=56, preflft=4, validlft=3
+ )
self.send_reply(iapdopt=ia_pd_opts)
self.sleep(0.5)
# check FIB contains no addresses
- fib = self.vapi.ip6_fib_dump()
+ fib = self.vapi.ip_route_dump(0, True)
addresses = set(self.get_interface_addresses(fib, self.pg1))
new_addresses = addresses.difference(self.initial_addresses)
self.assertEqual(len(new_addresses), 0)
finally:
- self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
- address_bin,
- address_prefix_length,
- self.prefix_group,
- is_add=0)
+ self.vapi.ip6_add_del_address_using_prefix(
+ sw_if_index=self.pg1.sw_if_index,
+ address_with_prefix=address1,
+ prefix_group=self.prefix_group,
+ is_add=0,
+ )
def test_T1_greater_than_T2(self):
- """ T1 is greater than T2 """
+ """T1 is greater than T2"""
+ address1 = "::2:0:0:0:405/60"
try:
- address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
- address_prefix_length = 60
- self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
- address_bin,
- address_prefix_length,
- self.prefix_group)
+ self.vapi.ip6_add_del_address_using_prefix(
+ sw_if_index=self.pg1.sw_if_index,
+ address_with_prefix=address1,
+ prefix_group=self.prefix_group,
+ )
self.wait_for_solicit()
self.send_advertise()
self.wait_for_request()
- ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
- validlft=8)
+ ia_pd_opts = DHCP6OptIAPrefix(
+ prefix="7:8::", plen=56, preflft=4, validlft=8
+ )
self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)
self.sleep(0.5)
# check FIB contains no addresses
- fib = self.vapi.ip6_fib_dump()
+ fib = self.vapi.ip_route_dump(0, True)
addresses = set(self.get_interface_addresses(fib, self.pg1))
new_addresses = addresses.difference(self.initial_addresses)
self.assertEqual(len(new_addresses), 0)
finally:
- self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
- address_bin,
- address_prefix_length,
- self.prefix_group,
- is_add=0)
+ self.vapi.ip6_add_del_address_using_prefix(
+ sw_if_index=self.pg1.sw_if_index,
+ prefix_group=self.prefix_group,
+ address_with_prefix=address1,
+ is_add=False,
+ )