dhcp ip: DSCP settings for transmitted DHCP packets
[vpp.git] / test / test_dhcp.py
index fce41a9..16b0f47 100644 (file)
@@ -9,7 +9,7 @@ from vpp_neighbor import VppNeighbor
 from vpp_ip_route import find_route, VppIpTable
 from util import mk_ll_addr
 import scapy.compat
-from scapy.layers.l2 import Ether, getmacbyip, ARP
+from scapy.layers.l2 import Ether, getmacbyip, ARP, Dot1Q
 from scapy.layers.inet import IP, UDP, ICMP
 from scapy.layers.inet6 import IPv6, in6_getnsmac
 from scapy.utils6 import in6_mactoifaceid
@@ -20,7 +20,10 @@ from scapy.layers.dhcp6 import DHCP6, DHCP6_Solicit, DHCP6_RelayForward, \
 from socket import AF_INET, AF_INET6
 from scapy.utils import inet_pton, inet_ntop
 from scapy.utils6 import in6_ptop
-from vpp_papi import mac_pton
+from vpp_papi import mac_pton, VppEnum
+from vpp_sub_interface import VppDot1QSubint
+from vpp_qos import VppQosEgressMap, VppQosMark
+
 
 DHCP4_CLIENT_PORT = 68
 DHCP4_SERVER_PORT = 67
@@ -210,7 +213,7 @@ class TestDHCP(VppTestCase):
         data = self.validate_relay_options(pkt, intf, intf.local_ip4,
                                            vpn_id, fib_id, oui)
 
-    def verify_orig_dhcp_pkt(self, pkt, intf, l2_bc=True):
+    def verify_orig_dhcp_pkt(self, pkt, intf, dscp, l2_bc=True):
         ether = pkt[Ether]
         if l2_bc:
             self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
@@ -226,14 +229,15 @@ class TestDHCP(VppTestCase):
         else:
             self.assertEqual(ip.dst, intf.remote_ip4)
             self.assertEqual(ip.src, intf.local_ip4)
+        self.assertEqual(ip.tos, dscp)
 
         udp = pkt[UDP]
         self.assertEqual(udp.dport, DHCP4_SERVER_PORT)
         self.assertEqual(udp.sport, DHCP4_CLIENT_PORT)
 
     def verify_orig_dhcp_discover(self, pkt, intf, hostname, client_id=None,
-                                  broadcast=True):
-        self.verify_orig_dhcp_pkt(pkt, intf)
+                                  broadcast=True, dscp=0):
+        self.verify_orig_dhcp_pkt(pkt, intf, dscp)
 
         self.verify_dhcp_msg_type(pkt, "discover")
         self.verify_dhcp_has_option(pkt, "hostname", hostname)
@@ -249,8 +253,9 @@ class TestDHCP(VppTestCase):
 
     def verify_orig_dhcp_request(self, pkt, intf, hostname, ip,
                                  broadcast=True,
-                                 l2_bc=True):
-        self.verify_orig_dhcp_pkt(pkt, intf, l2_bc=l2_bc)
+                                 l2_bc=True,
+                                 dscp=0):
+        self.verify_orig_dhcp_pkt(pkt, intf, dscp, l2_bc=l2_bc)
 
         self.verify_dhcp_msg_type(pkt, "request")
         self.verify_dhcp_has_option(pkt, "hostname", hostname)
@@ -1229,6 +1234,7 @@ class TestDHCP(VppTestCase):
     def test_dhcp_client(self):
         """ DHCP Client"""
 
+        vdscp = VppEnum.vl_api_ip_dscp_t
         hostname = 'universal-dp'
 
         self.pg_enable_capture(self.pg_interfaces)
@@ -1316,17 +1322,20 @@ class TestDHCP(VppTestCase):
 
         #
         # Start the procedure again. this time have VPP send the client-ID
+        # and set the DSCP value
         #
         self.pg3.admin_down()
         self.sleep(1)
         self.pg3.admin_up()
         self.vapi.dhcp_client_config(self.pg3.sw_if_index, hostname,
-                                     client_id=self.pg3.local_mac)
+                                     client_id=self.pg3.local_mac,
+                                     dscp=vdscp.IP_API_DSCP_EF)
 
         rx = self.pg3.get_capture(1)
 
         self.verify_orig_dhcp_discover(rx[0], self.pg3, hostname,
-                                       self.pg3.local_mac)
+                                       self.pg3.local_mac,
+                                       dscp=vdscp.IP_API_DSCP_EF)
 
         # TODO: VPP DHCP client should not accept DHCP OFFER message with
         # the XID (Transaction ID) not matching the XID of the most recent
@@ -1339,7 +1348,8 @@ class TestDHCP(VppTestCase):
 
         rx = self.pg3.get_capture(1)
         self.verify_orig_dhcp_request(rx[0], self.pg3, hostname,
-                                      self.pg3.local_ip4)
+                                      self.pg3.local_ip4,
+                                      dscp=vdscp.IP_API_DSCP_EF)
 
         #
         # unicast the ack to the offered address
@@ -1610,6 +1620,48 @@ class TestDHCP(VppTestCase):
         #
         self.vapi.dhcp_client_config(self.pg3.sw_if_index, hostname, is_add=0)
 
+    def test_dhcp_client_vlan(self):
+        """ DHCP Client w/ VLAN"""
+
+        vdscp = VppEnum.vl_api_ip_dscp_t
+        vqos = VppEnum.vl_api_qos_source_t
+        hostname = 'universal-dp'
+
+        self.pg_enable_capture(self.pg_interfaces)
+
+        vlan_100 = VppDot1QSubint(self, self.pg3, 100)
+        vlan_100.admin_up()
+
+        output = [scapy.compat.chb(4)] * 256
+        os = b''.join(output)
+        rows = [{'outputs': os},
+                {'outputs': os},
+                {'outputs': os},
+                {'outputs': os}]
+
+        qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
+        qm1 = VppQosMark(self, vlan_100, qem1,
+                         vqos.QOS_API_SOURCE_VLAN).add_vpp_config()
+
+        #
+        # Configure DHCP client on PG3 and capture the discover sent
+        #
+        self.vapi.dhcp_client_config(vlan_100.sw_if_index,
+                                     hostname,
+                                     dscp=vdscp.IP_API_DSCP_EF)
+
+        rx = self.pg3.get_capture(1)
+
+        self.assertEqual(rx[0][Dot1Q].vlan, 100)
+        self.assertEqual(rx[0][Dot1Q].prio, 4)
+
+        self.verify_orig_dhcp_discover(rx[0], self.pg3, hostname,
+                                       dscp=vdscp.IP_API_DSCP_EF)
+
+        self.vapi.dhcp_client_config(vlan_100.sw_if_index,
+                                     hostname,
+                                     is_add=0)
+
 
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)