X-Git-Url: https://gerrit.fd.io/r/gitweb?p=vpp.git;a=blobdiff_plain;f=test%2Ftest_dhcp.py;h=16b0f470b0a93403e4dadfb69a32abbbb9ff7995;hp=fce41a9b42344f594d45e3b0b841972cdf5ea563;hb=038e1dfbd;hpb=08ac303e43492c8b25911340fb62811289dd3935 diff --git a/test/test_dhcp.py b/test/test_dhcp.py index fce41a9b423..16b0f470b0a 100644 --- a/test/test_dhcp.py +++ b/test/test_dhcp.py @@ -9,7 +9,7 @@ from vpp_neighbor import VppNeighbor from vpp_ip_route import find_route, VppIpTable from util import mk_ll_addr import scapy.compat -from scapy.layers.l2 import Ether, getmacbyip, ARP +from scapy.layers.l2 import Ether, getmacbyip, ARP, Dot1Q from scapy.layers.inet import IP, UDP, ICMP from scapy.layers.inet6 import IPv6, in6_getnsmac from scapy.utils6 import in6_mactoifaceid @@ -20,7 +20,10 @@ from scapy.layers.dhcp6 import DHCP6, DHCP6_Solicit, DHCP6_RelayForward, \ from socket import AF_INET, AF_INET6 from scapy.utils import inet_pton, inet_ntop from scapy.utils6 import in6_ptop -from vpp_papi import mac_pton +from vpp_papi import mac_pton, VppEnum +from vpp_sub_interface import VppDot1QSubint +from vpp_qos import VppQosEgressMap, VppQosMark + DHCP4_CLIENT_PORT = 68 DHCP4_SERVER_PORT = 67 @@ -210,7 +213,7 @@ class TestDHCP(VppTestCase): data = self.validate_relay_options(pkt, intf, intf.local_ip4, vpn_id, fib_id, oui) - def verify_orig_dhcp_pkt(self, pkt, intf, l2_bc=True): + def verify_orig_dhcp_pkt(self, pkt, intf, dscp, l2_bc=True): ether = pkt[Ether] if l2_bc: self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff") @@ -226,14 +229,15 @@ class TestDHCP(VppTestCase): else: self.assertEqual(ip.dst, intf.remote_ip4) self.assertEqual(ip.src, intf.local_ip4) + self.assertEqual(ip.tos, dscp) udp = pkt[UDP] self.assertEqual(udp.dport, DHCP4_SERVER_PORT) self.assertEqual(udp.sport, DHCP4_CLIENT_PORT) def verify_orig_dhcp_discover(self, pkt, intf, hostname, client_id=None, - broadcast=True): - self.verify_orig_dhcp_pkt(pkt, intf) + broadcast=True, dscp=0): + self.verify_orig_dhcp_pkt(pkt, intf, dscp) self.verify_dhcp_msg_type(pkt, "discover") self.verify_dhcp_has_option(pkt, "hostname", hostname) @@ -249,8 +253,9 @@ class TestDHCP(VppTestCase): def verify_orig_dhcp_request(self, pkt, intf, hostname, ip, broadcast=True, - l2_bc=True): - self.verify_orig_dhcp_pkt(pkt, intf, l2_bc=l2_bc) + l2_bc=True, + dscp=0): + self.verify_orig_dhcp_pkt(pkt, intf, dscp, l2_bc=l2_bc) self.verify_dhcp_msg_type(pkt, "request") self.verify_dhcp_has_option(pkt, "hostname", hostname) @@ -1229,6 +1234,7 @@ class TestDHCP(VppTestCase): def test_dhcp_client(self): """ DHCP Client""" + vdscp = VppEnum.vl_api_ip_dscp_t hostname = 'universal-dp' self.pg_enable_capture(self.pg_interfaces) @@ -1316,17 +1322,20 @@ class TestDHCP(VppTestCase): # # Start the procedure again. this time have VPP send the client-ID + # and set the DSCP value # self.pg3.admin_down() self.sleep(1) self.pg3.admin_up() self.vapi.dhcp_client_config(self.pg3.sw_if_index, hostname, - client_id=self.pg3.local_mac) + client_id=self.pg3.local_mac, + dscp=vdscp.IP_API_DSCP_EF) rx = self.pg3.get_capture(1) self.verify_orig_dhcp_discover(rx[0], self.pg3, hostname, - self.pg3.local_mac) + self.pg3.local_mac, + dscp=vdscp.IP_API_DSCP_EF) # TODO: VPP DHCP client should not accept DHCP OFFER message with # the XID (Transaction ID) not matching the XID of the most recent @@ -1339,7 +1348,8 @@ class TestDHCP(VppTestCase): rx = self.pg3.get_capture(1) self.verify_orig_dhcp_request(rx[0], self.pg3, hostname, - self.pg3.local_ip4) + self.pg3.local_ip4, + dscp=vdscp.IP_API_DSCP_EF) # # unicast the ack to the offered address @@ -1610,6 +1620,48 @@ class TestDHCP(VppTestCase): # self.vapi.dhcp_client_config(self.pg3.sw_if_index, hostname, is_add=0) + def test_dhcp_client_vlan(self): + """ DHCP Client w/ VLAN""" + + vdscp = VppEnum.vl_api_ip_dscp_t + vqos = VppEnum.vl_api_qos_source_t + hostname = 'universal-dp' + + self.pg_enable_capture(self.pg_interfaces) + + vlan_100 = VppDot1QSubint(self, self.pg3, 100) + vlan_100.admin_up() + + output = [scapy.compat.chb(4)] * 256 + os = b''.join(output) + rows = [{'outputs': os}, + {'outputs': os}, + {'outputs': os}, + {'outputs': os}] + + qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config() + qm1 = VppQosMark(self, vlan_100, qem1, + vqos.QOS_API_SOURCE_VLAN).add_vpp_config() + + # + # Configure DHCP client on PG3 and capture the discover sent + # + self.vapi.dhcp_client_config(vlan_100.sw_if_index, + hostname, + dscp=vdscp.IP_API_DSCP_EF) + + rx = self.pg3.get_capture(1) + + self.assertEqual(rx[0][Dot1Q].vlan, 100) + self.assertEqual(rx[0][Dot1Q].prio, 4) + + self.verify_orig_dhcp_discover(rx[0], self.pg3, hostname, + dscp=vdscp.IP_API_DSCP_EF) + + self.vapi.dhcp_client_config(vlan_100.sw_if_index, + hostname, + is_add=0) + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)