X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_dhcp6.py;h=8a00cb81f906a2821975f786d3709e1d9a71312f;hb=5010bbd3c44d9fb061818be7a291a474608f510a;hp=6acc3447c07f4f3162e500850e522d2245d910c7;hpb=81119e86bdf47f41f06218f91e52024bc4d00e7c;p=vpp.git diff --git a/test/test_dhcp6.py b/test/test_dhcp6.py index 6acc3447c07..8a00cb81f90 100644 --- a/test/test_dhcp6.py +++ b/test/test_dhcp6.py @@ -1,34 +1,52 @@ -from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \ - DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \ - DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \ - DHCP6_Rebind, DUID_LLT, DHCP6_Release, DHCP6OptElapsedTime +from socket import AF_INET6, inet_ntop, inet_pton + +from scapy.layers.dhcp6 import ( + DHCP6_Advertise, + DHCP6OptClientId, + DHCP6OptStatusCode, + DHCP6OptPref, + DHCP6OptIA_PD, + DHCP6OptIAPrefix, + DHCP6OptServerId, + DHCP6_Solicit, + DHCP6_Reply, + DHCP6_Request, + DHCP6_Renew, + DHCP6_Rebind, + DUID_LL, + DHCP6_Release, + DHCP6OptElapsedTime, + DHCP6OptIA_NA, + DHCP6OptIAAddress, +) from scapy.layers.inet6 import IPv6, Ether, UDP from scapy.utils6 import in6_mactoifaceid -from scapy.utils import inet_ntop, inet_pton -from socket import AF_INET6 + +from framework import tag_fixme_vpp_workers from framework import VppTestCase -from time import time +from framework import tag_run_solo +from vpp_papi import VppEnum +import util +import os def ip6_normalize(ip6): return inet_ntop(AF_INET6, inet_pton(AF_INET6, ip6)) -def mk_ll_addr(mac): - euid = in6_mactoifaceid(mac) - addr = "fe80::" + euid - return addr - - -class TestDHCPv6PD(VppTestCase): - """ DHCPv6 PD Data Plane Test Case """ +class TestDHCPv6DataPlane(VppTestCase): + """DHCPv6 Data Plane Test Case""" @classmethod def setUpClass(cls): - super(TestDHCPv6PD, cls).setUpClass() + super(TestDHCPv6DataPlane, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestDHCPv6DataPlane, cls).tearDownClass() def setUp(self): - super(TestDHCPv6PD, self).setUp() + super(TestDHCPv6DataPlane, self).setUp() self.create_pg_interfaces(range(1)) self.interfaces = list(self.pg_interfaces) @@ -36,34 +54,128 @@ class TestDHCPv6PD(VppTestCase): i.admin_up() i.config_ip6() - time_since_2000 = int(time()) - 946684800 - self.server_duid = DUID_LLT(timeval=time_since_2000, - lladdr=self.pg0.remote_mac) + self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac) def tearDown(self): for i in self.interfaces: i.unconfig_ip6() i.admin_down() - super(TestDHCPv6PD, self).tearDown() + super(TestDHCPv6DataPlane, self).tearDown() + + def test_dhcp_ia_na_send_solicit_receive_advertise(self): + """Verify DHCPv6 IA NA Solicit packet and Advertise event""" + + self.vapi.dhcp6_clients_enable_disable(enable=1) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + address = {"address": "1:2:3::5", "preferred_time": 60, "valid_time": 120} + self.vapi.dhcp6_send_client_message( + server_index=0xFFFFFFFF, + mrc=1, + msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT, + sw_if_index=self.pg0.sw_if_index, + T1=20, + T2=40, + addresses=[address], + n_addresses=len([address]), + ) + rx_list = self.pg0.get_capture(1) + self.assertEqual(len(rx_list), 1) + packet = rx_list[0] + + self.assertEqual(packet.haslayer(IPv6), 1) + self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1) + + client_duid = packet[DHCP6OptClientId].duid + trid = packet[DHCP6_Solicit].trid + + dst = ip6_normalize(packet[IPv6].dst) + dst2 = ip6_normalize("ff02::1:2") + self.assert_equal(dst, dst2) + src = ip6_normalize(packet[IPv6].src) + src2 = ip6_normalize(self.pg0.local_ip6_ll) + self.assert_equal(src, src2) + ia_na = packet[DHCP6OptIA_NA] + self.assert_equal(ia_na.T1, 20) + self.assert_equal(ia_na.T2, 40) + self.assert_equal(len(ia_na.ianaopts), 1) + address = ia_na.ianaopts[0] + self.assert_equal(address.addr, "1:2:3::5") + self.assert_equal(address.preflft, 60) + self.assert_equal(address.validlft, 120) + + self.vapi.want_dhcp6_reply_events(enable_disable=1, pid=os.getpid()) - def test_dhcp_send_solicit_receive_advertise(self): - """ Verify DHCPv6 PD Solicit packet and received Advertise envent """ + try: + ia_na_opts = DHCP6OptIAAddress(addr="7:8::2", preflft=60, validlft=120) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IPv6( + src=util.mk_ll_addr(self.pg0.remote_mac), dst=self.pg0.local_ip6_ll + ) + / UDP(sport=547, dport=546) + / DHCP6_Advertise(trid=trid) + / DHCP6OptServerId(duid=self.server_duid) + / DHCP6OptClientId(duid=client_duid) + / DHCP6OptPref(prefval=7) + / DHCP6OptStatusCode(statuscode=1) + / DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=ia_na_opts) + ) + self.pg0.add_stream([p]) + self.pg_start() + + ev = self.vapi.wait_for_event(1, "dhcp6_reply_event") + + self.assert_equal(ev.preference, 7) + self.assert_equal(ev.status_code, 1) + self.assert_equal(ev.T1, 20) + self.assert_equal(ev.T2, 40) + + reported_address = ev.addresses[0] + address = ia_na_opts.getfieldval("addr") + self.assert_equal(str(reported_address.address), address) + self.assert_equal( + reported_address.preferred_time, ia_na_opts.getfieldval("preflft") + ) + self.assert_equal( + reported_address.valid_time, ia_na_opts.getfieldval("validlft") + ) + + finally: + self.vapi.want_dhcp6_reply_events(enable_disable=0) + self.vapi.dhcp6_clients_enable_disable(enable=0) + + def test_dhcp_pd_send_solicit_receive_advertise(self): + """Verify DHCPv6 PD Solicit packet and Advertise event""" + + self.vapi.dhcp6_clients_enable_disable(enable=1) self.pg_enable_capture(self.pg_interfaces) self.pg_start() - prefix_bin = '\00\01\00\02\00\03' + '\00' * 10 - prefix = {'prefix': prefix_bin, - 'prefix_length': 50, - 'preferred_time': 60, - 'valid_time': 120} - self.vapi.dhcp6_pd_send_client_message(1, self.pg0.sw_if_index, - T1=20, T2=40, prefixes=[prefix]) + + prefix = { + "prefix": {"address": "1:2:3::", "len": 50}, + "preferred_time": 60, + "valid_time": 120, + } + prefixes = [prefix] + self.vapi.dhcp6_pd_send_client_message( + server_index=0xFFFFFFFF, + mrc=1, + msg_type=VppEnum.vl_api_dhcpv6_msg_type_t.DHCPV6_MSG_API_SOLICIT, + sw_if_index=self.pg0.sw_if_index, + T1=20, + T2=40, + prefixes=prefixes, + n_prefixes=len(prefixes), + ) rx_list = self.pg0.get_capture(1) self.assertEqual(len(rx_list), 1) packet = rx_list[0] - self.assertTrue(packet.haslayer(IPv6)) - self.assertTrue(packet[IPv6].haslayer(DHCP6_Solicit)) + self.assertEqual(packet.haslayer(IPv6), 1) + self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1) client_duid = packet[DHCP6OptClientId].duid trid = packet[DHCP6_Solicit].trid @@ -79,55 +191,325 @@ class TestDHCPv6PD(VppTestCase): self.assert_equal(ia_pd.T2, 40) self.assert_equal(len(ia_pd.iapdopt), 1) prefix = ia_pd.iapdopt[0] - self.assert_equal(prefix.prefix, '1:2:3::') + self.assert_equal(prefix.prefix, "1:2:3::") self.assert_equal(prefix.plen, 50) self.assert_equal(prefix.preflft, 60) self.assert_equal(prefix.validlft, 120) - self.vapi.want_dhcp6_pd_reply_events() - self.vapi.dhcp6_clients_enable_disable() - - ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60, - validlft=120) - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IPv6(src=mk_ll_addr(self.pg0.remote_mac), - dst=self.pg0.local_ip6_ll) / - UDP(sport=547, dport=546) / - DHCP6_Advertise(trid=trid) / - DHCP6OptServerId(duid=self.server_duid) / - DHCP6OptClientId(duid=client_duid) / - DHCP6OptPref(prefval=7) / - DHCP6OptStatusCode(statuscode=1) / - DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts) - ) + self.vapi.want_dhcp6_pd_reply_events(enable_disable=1, pid=os.getpid()) + + try: + ia_pd_opts = DHCP6OptIAPrefix( + prefix="7:8::", plen=56, preflft=60, validlft=120 + ) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IPv6( + src=util.mk_ll_addr(self.pg0.remote_mac), dst=self.pg0.local_ip6_ll + ) + / UDP(sport=547, dport=546) + / DHCP6_Advertise(trid=trid) + / DHCP6OptServerId(duid=self.server_duid) + / DHCP6OptClientId(duid=client_duid) + / DHCP6OptPref(prefval=7) + / DHCP6OptStatusCode(statuscode=1) + / DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts) + ) + self.pg0.add_stream([p]) + self.pg_start() + + ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event") + + self.assert_equal(ev.preference, 7) + self.assert_equal(ev.status_code, 1) + self.assert_equal(ev.T1, 20) + self.assert_equal(ev.T2, 40) + + reported_prefix = ev.prefixes[0] + prefix = ia_pd_opts.getfieldval("prefix") + self.assert_equal(str(reported_prefix.prefix).split("/")[0], prefix) + self.assert_equal( + int(str(reported_prefix.prefix).split("/")[1]), + ia_pd_opts.getfieldval("plen"), + ) + self.assert_equal( + reported_prefix.preferred_time, ia_pd_opts.getfieldval("preflft") + ) + self.assert_equal( + reported_prefix.valid_time, ia_pd_opts.getfieldval("validlft") + ) + + finally: + self.vapi.want_dhcp6_pd_reply_events(enable_disable=0) + self.vapi.dhcp6_clients_enable_disable(enable=0) + + +@tag_run_solo +class TestDHCPv6IANAControlPlane(VppTestCase): + """DHCPv6 IA NA Control Plane Test Case""" + + @classmethod + def setUpClass(cls): + super(TestDHCPv6IANAControlPlane, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestDHCPv6IANAControlPlane, cls).tearDownClass() + + def setUp(self): + super(TestDHCPv6IANAControlPlane, self).setUp() + + self.create_pg_interfaces(range(1)) + self.interfaces = list(self.pg_interfaces) + for i in self.interfaces: + i.admin_up() + + self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac) + self.client_duid = None + self.T1 = 1 + self.T2 = 2 + + fib = self.vapi.ip_route_dump(0, True) + self.initial_addresses = set(self.get_interface_addresses(fib, self.pg0)) + + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + self.vapi.dhcp6_client_enable_disable( + sw_if_index=self.pg0.sw_if_index, enable=1 + ) + + def tearDown(self): + self.vapi.dhcp6_client_enable_disable( + sw_if_index=self.pg0.sw_if_index, enable=0 + ) + + for i in self.interfaces: + i.admin_down() + + super(TestDHCPv6IANAControlPlane, self).tearDown() + + @staticmethod + def get_interface_addresses(fib, pg): + lst = [] + for entry in fib: + if entry.route.prefix.prefixlen == 128: + path = entry.route.paths[0] + if path.sw_if_index == pg.sw_if_index: + lst.append(str(entry.route.prefix.network_address)) + return lst + + def get_addresses(self): + fib = self.vapi.ip_route_dump(0, True) + addresses = set(self.get_interface_addresses(fib, self.pg0)) + return addresses.difference(self.initial_addresses) + + def validate_duid_ll(self, duid): + DUID_LL(duid) + + def validate_packet(self, packet, msg_type, is_resend=False): + try: + self.assertEqual(packet.haslayer(msg_type), 1) + client_duid = packet[DHCP6OptClientId].duid + if self.client_duid is None: + self.client_duid = client_duid + self.validate_duid_ll(client_duid) + else: + self.assertEqual(self.client_duid, client_duid) + if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind: + server_duid = packet[DHCP6OptServerId].duid + self.assertEqual(server_duid, self.server_duid) + if is_resend: + self.assertEqual(self.trid, packet[msg_type].trid) + else: + self.trid = packet[msg_type].trid + ip = packet[IPv6] + udp = packet[UDP] + self.assertEqual(ip.dst, "ff02::1:2") + self.assertEqual(udp.sport, 546) + self.assertEqual(udp.dport, 547) + dhcpv6 = packet[msg_type] + elapsed_time = dhcpv6[DHCP6OptElapsedTime] + if is_resend: + self.assertNotEqual(elapsed_time.elapsedtime, 0) + else: + self.assertEqual(elapsed_time.elapsedtime, 0) + except BaseException: + packet.show() + raise + + def wait_for_packet(self, msg_type, timeout=None, is_resend=False): + if timeout is None: + timeout = 3 + rx_list = self.pg0.get_capture(1, timeout=timeout) + packet = rx_list[0] + self.validate_packet(packet, msg_type, is_resend=is_resend) + + def wait_for_solicit(self, timeout=None, is_resend=False): + self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend) + + def wait_for_request(self, timeout=None, is_resend=False): + self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend) + + def wait_for_renew(self, timeout=None, is_resend=False): + self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend) + + def wait_for_rebind(self, timeout=None, is_resend=False): + self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend) + + def wait_for_release(self, timeout=None, is_resend=False): + self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend) + + def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None): + if t1 is None: + t1 = self.T1 + if t2 is None: + t2 = self.T2 + if ianaopts is None: + opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2) + else: + opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IPv6(src=util.mk_ll_addr(self.pg0.remote_mac), dst=self.pg0.local_ip6_ll) + / UDP(sport=547, dport=546) + / msg_type(trid=self.trid) + / DHCP6OptServerId(duid=self.server_duid) + / DHCP6OptClientId(duid=self.client_duid) + / opt_ia_na + ) self.pg0.add_stream([p]) + self.pg_enable_capture(self.pg_interfaces) self.pg_start() - ev = self.vapi.wait_for_event(10, "dhcp6_pd_reply_event") + def send_advertise(self, t1=None, t2=None, ianaopts=None): + self.send_packet(DHCP6_Advertise, t1, t2, ianaopts) - self.assert_equal(ev.preference, 7) - self.assert_equal(ev.status_code, 1) - self.assert_equal(ev.T1, 20) - self.assert_equal(ev.T2, 40) + def send_reply(self, t1=None, t2=None, ianaopts=None): + self.send_packet(DHCP6_Reply, t1, t2, ianaopts) - reported_prefix = ev.prefixes[0] - prefix = inet_pton(AF_INET6, ia_pd_opts.getfieldval("prefix")) - self.assert_equal(reported_prefix.prefix, prefix) - self.assert_equal(reported_prefix.prefix_length, - ia_pd_opts.getfieldval("plen")) - self.assert_equal(reported_prefix.preferred_time, - ia_pd_opts.getfieldval("preflft")) - self.assert_equal(reported_prefix.valid_time, - ia_pd_opts.getfieldval("validlft")) + def test_T1_and_T2_timeouts(self): + """Test T1 and T2 timeouts""" + self.wait_for_solicit() + self.send_advertise() + self.wait_for_request() + self.send_reply() + + self.sleep(1) + self.wait_for_renew() + + self.pg_enable_capture(self.pg_interfaces) + + self.sleep(1) + + self.wait_for_rebind() + + def test_addresses(self): + """Test handling of addresses""" + + ia_na_opts = DHCP6OptIAAddress(addr="7:8::2", preflft=1, validlft=2) + + self.wait_for_solicit() + self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts) + self.wait_for_request() + self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts) + self.sleep(0.1) + + # check FIB for new address + new_addresses = self.get_addresses() + self.assertEqual(len(new_addresses), 1) + addr = list(new_addresses)[0] + self.assertEqual(addr, "7:8::2") + + self.sleep(2) + + # check that the address is deleted + fib = self.vapi.ip_route_dump(0, True) + addresses = set(self.get_interface_addresses(fib, self.pg0)) + new_addresses = addresses.difference(self.initial_addresses) + self.assertEqual(len(new_addresses), 0) + + def test_sending_client_messages_solicit(self): + """VPP receives messages from DHCPv6 client""" + + self.wait_for_solicit() + self.send_packet(DHCP6_Solicit) + self.send_packet(DHCP6_Request) + self.send_packet(DHCP6_Renew) + self.send_packet(DHCP6_Rebind) + self.sleep(1) + self.wait_for_solicit(is_resend=True) + + def test_sending_inappropriate_packets(self): + """Server sends messages with inappropriate message types""" + + self.wait_for_solicit() + self.send_reply() + self.wait_for_solicit(is_resend=True) + self.send_advertise() + self.wait_for_request() + self.send_advertise() + self.wait_for_request(is_resend=True) + self.send_reply() + self.wait_for_renew() + + def test_no_address_available_in_advertise(self): + """Advertise message contains NoAddrsAvail status code""" + + self.wait_for_solicit() + noavail = DHCP6OptStatusCode(statuscode=2) # NoAddrsAvail + self.send_advertise(ianaopts=noavail) + self.wait_for_solicit(is_resend=True) + + def test_preferred_greater_than_valid_lifetime(self): + """Preferred lifetime is greater than valid lifetime""" + + self.wait_for_solicit() + self.send_advertise() + self.wait_for_request() + ia_na_opts = DHCP6OptIAAddress(addr="7:8::2", preflft=4, validlft=3) + self.send_reply(ianaopts=ia_na_opts) + + self.sleep(0.5) + + # check FIB contains no addresses + fib = self.vapi.ip_route_dump(0, True) + addresses = set(self.get_interface_addresses(fib, self.pg0)) + new_addresses = addresses.difference(self.initial_addresses) + self.assertEqual(len(new_addresses), 0) + + def test_T1_greater_than_T2(self): + """T1 is greater than T2""" + + self.wait_for_solicit() + self.send_advertise() + self.wait_for_request() + ia_na_opts = DHCP6OptIAAddress(addr="7:8::2", preflft=4, validlft=8) + self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts) + + self.sleep(0.5) + + # check FIB contains no addresses + fib = self.vapi.ip_route_dump(0, True) + addresses = set(self.get_interface_addresses(fib, self.pg0)) + new_addresses = addresses.difference(self.initial_addresses) + self.assertEqual(len(new_addresses), 0) + + +@tag_fixme_vpp_workers class TestDHCPv6PDControlPlane(VppTestCase): - """ DHCPv6 PD Control Plane Test Case """ + """DHCPv6 PD Control Plane Test Case""" @classmethod def setUpClass(cls): super(TestDHCPv6PDControlPlane, cls).setUpClass() + @classmethod + def tearDownClass(cls): + super(TestDHCPv6PDControlPlane, cls).tearDownClass() + def setUp(self): super(TestDHCPv6PDControlPlane, self).setUp() @@ -136,29 +518,25 @@ class TestDHCPv6PDControlPlane(VppTestCase): for i in self.interfaces: i.admin_up() - time_since_2000 = int(time()) - 946684800 - self.server_duid = DUID_LLT(timeval=time_since_2000, - lladdr=self.pg0.remote_mac) + self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac) self.client_duid = None self.T1 = 1 self.T2 = 2 - fib = self.vapi.ip6_fib_dump() - self.initial_addresses = set(self.get_interface_addresses(fib, - self.pg1)) + fib = self.vapi.ip_route_dump(0, True) + self.initial_addresses = set(self.get_interface_addresses(fib, self.pg1)) self.pg_enable_capture(self.pg_interfaces) self.pg_start() - self.prefix_group = 'my-pd-prefix-group' + self.prefix_group = "my-pd-prefix-group" self.vapi.dhcp6_pd_client_enable_disable( - self.pg0.sw_if_index, - prefix_group=self.prefix_group) + enable=1, sw_if_index=self.pg0.sw_if_index, prefix_group=self.prefix_group + ) def tearDown(self): - self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index, - enable=0) + self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index, enable=0) for i in self.interfaces: i.admin_down() @@ -169,27 +547,27 @@ class TestDHCPv6PDControlPlane(VppTestCase): def get_interface_addresses(fib, pg): lst = [] for entry in fib: - if entry.address_length == 128: - path = entry.path[0] + if entry.route.prefix.prefixlen == 128: + path = entry.route.paths[0] if path.sw_if_index == pg.sw_if_index: - lst.append(entry.address) + lst.append(str(entry.route.prefix.network_address)) return lst def get_addresses(self): - fib = self.vapi.ip6_fib_dump() + fib = self.vapi.ip_route_dump(0, True) addresses = set(self.get_interface_addresses(fib, self.pg1)) return addresses.difference(self.initial_addresses) - def validate_duid_llt(self, duid): - DUID_LLT(duid) + def validate_duid_ll(self, duid): + DUID_LL(duid) def validate_packet(self, packet, msg_type, is_resend=False): try: - self.assertTrue(packet.haslayer(msg_type)) + self.assertEqual(packet.haslayer(msg_type), 1) client_duid = packet[DHCP6OptClientId].duid if self.client_duid is None: self.client_duid = client_duid - self.validate_duid_llt(client_duid) + self.validate_duid_ll(client_duid) else: self.assertEqual(self.client_duid, client_duid) if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind: @@ -201,16 +579,16 @@ class TestDHCPv6PDControlPlane(VppTestCase): self.trid = packet[msg_type].trid ip = packet[IPv6] udp = packet[UDP] - self.assertEqual(ip.dst, 'ff02::1:2') + self.assertEqual(ip.dst, "ff02::1:2") self.assertEqual(udp.sport, 546) self.assertEqual(udp.dport, 547) dhcpv6 = packet[msg_type] elapsed_time = dhcpv6[DHCP6OptElapsedTime] - if (is_resend): + if is_resend: self.assertNotEqual(elapsed_time.elapsedtime, 0) else: self.assertEqual(elapsed_time.elapsedtime, 0) - except: + except BaseException: packet.show() raise @@ -245,15 +623,15 @@ class TestDHCPv6PDControlPlane(VppTestCase): opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2) else: opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt) - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IPv6(src=mk_ll_addr(self.pg0.remote_mac), - dst=self.pg0.local_ip6_ll) / - UDP(sport=547, dport=546) / - msg_type(trid=self.trid) / - DHCP6OptServerId(duid=self.server_duid) / - DHCP6OptClientId(duid=self.client_duid) / - opt_ia_pd - ) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IPv6(src=util.mk_ll_addr(self.pg0.remote_mac), dst=self.pg0.local_ip6_ll) + / UDP(sport=547, dport=546) + / msg_type(trid=self.trid) + / DHCP6OptServerId(duid=self.server_duid) + / DHCP6OptClientId(duid=self.client_duid) + / opt_ia_pd + ) self.pg0.add_stream([p]) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -265,7 +643,7 @@ class TestDHCPv6PDControlPlane(VppTestCase): self.send_packet(DHCP6_Reply, t1, t2, iapdopt) def test_T1_and_T2_timeouts(self): - """ Test T1 and T2 timeouts """ + """Test T1 and T2 timeouts""" self.wait_for_solicit() self.send_advertise() @@ -283,19 +661,20 @@ class TestDHCPv6PDControlPlane(VppTestCase): self.wait_for_rebind() def test_prefixes(self): - """ Test handling of prefixes """ - address_bin_1 = None - address_bin_2 = None + """Test handling of prefixes""" + + address1 = "::2:0:0:0:405/60" + address2 = "::76:0:0:0:406/62" try: - address_bin_1 = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05' - address_prefix_length_1 = 60 - self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index, - address_bin_1, - address_prefix_length_1, - self.prefix_group) + self.vapi.ip6_add_del_address_using_prefix( + sw_if_index=self.pg1.sw_if_index, + address_with_prefix=address1, + prefix_group=self.prefix_group, + ) - ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2, - validlft=3) + ia_pd_opts = DHCP6OptIAPrefix( + prefix="7:8::", plen=56, preflft=2, validlft=3 + ) self.wait_for_solicit() self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts) @@ -307,51 +686,56 @@ class TestDHCPv6PDControlPlane(VppTestCase): new_addresses = self.get_addresses() self.assertEqual(len(new_addresses), 1) addr = list(new_addresses)[0] - self.assertEqual(inet_ntop(AF_INET6, addr), '7:8:0:2::405') + self.assertEqual(addr, "7:8:0:2::405") self.sleep(1) - address_bin_2 = '\x00' * 6 + '\x00\x76' + '\x00' * 6 + '\x04\x06' - address_prefix_length_2 = 62 - self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index, - address_bin_2, - address_prefix_length_2, - self.prefix_group) + self.vapi.ip6_add_del_address_using_prefix( + sw_if_index=self.pg1.sw_if_index, + address_with_prefix=address2, + prefix_group=self.prefix_group, + ) self.sleep(1) # check FIB contains 2 addresses - fib = self.vapi.ip6_fib_dump() + fib = self.vapi.ip_route_dump(0, True) addresses = set(self.get_interface_addresses(fib, self.pg1)) new_addresses = addresses.difference(self.initial_addresses) self.assertEqual(len(new_addresses), 2) addr1 = list(new_addresses)[0] addr2 = list(new_addresses)[1] - if inet_ntop(AF_INET6, addr1) == '7:8:0:76::406': + if addr1 == "7:8:0:76::406": addr1, addr2 = addr2, addr1 - self.assertEqual(inet_ntop(AF_INET6, addr1), '7:8:0:2::405') - self.assertEqual(inet_ntop(AF_INET6, addr2), '7:8:0:76::406') + self.assertEqual(addr1, "7:8:0:2::405") + self.assertEqual(addr2, "7:8:0:76::406") self.sleep(1) # check that the addresses are deleted - fib = self.vapi.ip6_fib_dump() + fib = self.vapi.ip_route_dump(0, True) addresses = set(self.get_interface_addresses(fib, self.pg1)) new_addresses = addresses.difference(self.initial_addresses) self.assertEqual(len(new_addresses), 0) finally: - if address_bin_1 is not None: + if address1 is not None: self.vapi.ip6_add_del_address_using_prefix( - self.pg1.sw_if_index, address_bin_1, - address_prefix_length_1, self.prefix_group, is_add=0) - if address_bin_2 is not None: + sw_if_index=self.pg1.sw_if_index, + address_with_prefix=address1, + prefix_group=self.prefix_group, + is_add=0, + ) + if address2 is not None: self.vapi.ip6_add_del_address_using_prefix( - self.pg1.sw_if_index, address_bin_2, - address_prefix_length_2, self.prefix_group, is_add=0) + sw_if_index=self.pg1.sw_if_index, + address_with_prefix=address2, + prefix_group=self.prefix_group, + is_add=0, + ) def test_sending_client_messages_solicit(self): - """ VPP receives messages from DHCPv6 client """ + """VPP receives messages from DHCPv6 client""" self.wait_for_solicit() self.send_packet(DHCP6_Solicit) @@ -361,8 +745,8 @@ class TestDHCPv6PDControlPlane(VppTestCase): self.sleep(1) self.wait_for_solicit(is_resend=True) - def test_sending_inapropriate_packets(self): - """ Server sends messages with inapropriate message types """ + def test_sending_inappropriate_packets(self): + """Server sends messages with inappropriate message types""" self.wait_for_solicit() self.send_reply() @@ -375,75 +759,79 @@ class TestDHCPv6PDControlPlane(VppTestCase): self.wait_for_renew() def test_no_prefix_available_in_advertise(self): - """ Advertise message contains NoPrefixAvail status code """ + """Advertise message contains NoPrefixAvail status code""" self.wait_for_solicit() noavail = DHCP6OptStatusCode(statuscode=6) # NoPrefixAvail self.send_advertise(iapdopt=noavail) self.wait_for_solicit(is_resend=True) - def test_preferred_greater_than_valit_lifetime(self): - """ Preferred lifetime is greater than valid lifetime """ + def test_preferred_greater_than_valid_lifetime(self): + """Preferred lifetime is greater than valid lifetime""" + address1 = "::2:0:0:0:405/60" try: - address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05' - address_prefix_length = 60 - self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index, - address_bin, - address_prefix_length, - self.prefix_group) + self.vapi.ip6_add_del_address_using_prefix( + sw_if_index=self.pg1.sw_if_index, + address_with_prefix=address1, + prefix_group=self.prefix_group, + ) self.wait_for_solicit() self.send_advertise() self.wait_for_request() - ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4, - validlft=3) + ia_pd_opts = DHCP6OptIAPrefix( + prefix="7:8::", plen=56, preflft=4, validlft=3 + ) self.send_reply(iapdopt=ia_pd_opts) self.sleep(0.5) # check FIB contains no addresses - fib = self.vapi.ip6_fib_dump() + fib = self.vapi.ip_route_dump(0, True) addresses = set(self.get_interface_addresses(fib, self.pg1)) new_addresses = addresses.difference(self.initial_addresses) self.assertEqual(len(new_addresses), 0) finally: - self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index, - address_bin, - address_prefix_length, - self.prefix_group, - is_add=0) + self.vapi.ip6_add_del_address_using_prefix( + sw_if_index=self.pg1.sw_if_index, + address_with_prefix=address1, + prefix_group=self.prefix_group, + is_add=0, + ) def test_T1_greater_than_T2(self): - """ T1 is greater than T2 """ + """T1 is greater than T2""" + address1 = "::2:0:0:0:405/60" try: - address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05' - address_prefix_length = 60 - self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index, - address_bin, - address_prefix_length, - self.prefix_group) + self.vapi.ip6_add_del_address_using_prefix( + sw_if_index=self.pg1.sw_if_index, + address_with_prefix=address1, + prefix_group=self.prefix_group, + ) self.wait_for_solicit() self.send_advertise() self.wait_for_request() - ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4, - validlft=8) + ia_pd_opts = DHCP6OptIAPrefix( + prefix="7:8::", plen=56, preflft=4, validlft=8 + ) self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts) self.sleep(0.5) # check FIB contains no addresses - fib = self.vapi.ip6_fib_dump() + fib = self.vapi.ip_route_dump(0, True) addresses = set(self.get_interface_addresses(fib, self.pg1)) new_addresses = addresses.difference(self.initial_addresses) self.assertEqual(len(new_addresses), 0) finally: - self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index, - address_bin, - address_prefix_length, - self.prefix_group, - is_add=0) + self.vapi.ip6_add_del_address_using_prefix( + sw_if_index=self.pg1.sw_if_index, + prefix_group=self.prefix_group, + address_with_prefix=address1, + is_add=False, + )