qos: QoS dump APIs
[vpp.git] / test / test_qos.py
index 76fa8a1..9efa798 100644 (file)
@@ -3,21 +3,41 @@
 import unittest
 
 from framework import VppTestCase, VppTestRunner
 import unittest
 
 from framework import VppTestCase, VppTestRunner
-from vpp_papi_provider import QOS_SOURCE
 from vpp_sub_interface import VppDot1QSubint
 from vpp_sub_interface import VppDot1QSubint
+from vpp_ip import DpoProto
 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
-    VppMplsLabel, VppMplsTable, DpoProto
+    VppMplsLabel, VppMplsTable, FibPathProto
 
 
+import scapy.compat
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether, Dot1Q
 from scapy.layers.inet import IP, UDP
 from scapy.layers.inet6 import IPv6
 from scapy.contrib.mpls import MPLS
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether, Dot1Q
 from scapy.layers.inet import IP, UDP
 from scapy.layers.inet6 import IPv6
 from scapy.contrib.mpls import MPLS
+from vpp_papi import VppEnum
+from vpp_qos import VppQosRecord, VppQosEgressMap, VppQosMark
+
+NUM_PKTS = 67
 
 
 class TestQOS(VppTestCase):
     """ QOS Test Case """
 
 
 
 class TestQOS(VppTestCase):
     """ QOS Test Case """
 
+    # Note: Since the enums aren't created dynamically until after
+    #       the papi client attaches to VPP, we put it in a property to
+    #       ensure it is the value at runtime, not at module load time.
+    @property
+    def QOS_SOURCE(self):
+        return VppEnum.vl_api_qos_source_t
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestQOS, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestQOS, cls).tearDownClass()
+
     def setUp(self):
         super(TestQOS, self).setUp()
 
     def setUp(self):
         super(TestQOS, self).setUp()
 
@@ -49,70 +69,67 @@ class TestQOS(VppTestCase):
         # for table 1 map the n=0xff possible values of input QoS mark,
         # n to 1-n
         #
         # for table 1 map the n=0xff possible values of input QoS mark,
         # n to 1-n
         #
-        output = [chr(0)] * 256
+        output = [scapy.compat.chb(0)] * 256
         for i in range(0, 255):
         for i in range(0, 255):
-            output[i] = chr(255 - i)
-        os = ''.join(output)
+            output[i] = scapy.compat.chb(255 - i)
+        os = b''.join(output)
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
                 {'outputs': os}]
 
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
                 {'outputs': os}]
 
-        self.vapi.qos_egress_map_update(1, rows)
+        qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
 
         #
         # For table 2 (and up) use the value n for everything
         #
 
         #
         # For table 2 (and up) use the value n for everything
         #
-        output = [chr(2)] * 256
-        os = ''.join(output)
+        output = [scapy.compat.chb(2)] * 256
+        os = b''.join(output)
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
                 {'outputs': os}]
 
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
                 {'outputs': os}]
 
-        self.vapi.qos_egress_map_update(2, rows)
+        qem2 = VppQosEgressMap(self, 2, rows).add_vpp_config()
 
 
-        output = [chr(3)] * 256
-        os = ''.join(output)
+        output = [scapy.compat.chb(3)] * 256
+        os = b''.join(output)
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
                 {'outputs': os}]
 
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
                 {'outputs': os}]
 
-        self.vapi.qos_egress_map_update(3, rows)
+        qem3 = VppQosEgressMap(self, 3, rows).add_vpp_config()
 
 
-        output = [chr(4)] * 256
-        os = ''.join(output)
+        output = [scapy.compat.chb(4)] * 256
+        os = b''.join(output)
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
                 {'outputs': os}]
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
                 {'outputs': os}]
-        self.vapi.qos_egress_map_update(4, rows)
-        self.vapi.qos_egress_map_update(5, rows)
-        self.vapi.qos_egress_map_update(6, rows)
-        self.vapi.qos_egress_map_update(7, rows)
 
 
+        qem4 = VppQosEgressMap(self, 4, rows).add_vpp_config()
+        qem5 = VppQosEgressMap(self, 5, rows).add_vpp_config()
+        qem6 = VppQosEgressMap(self, 6, rows).add_vpp_config()
+        qem7 = VppQosEgressMap(self, 7, rows).add_vpp_config()
+
+        self.assertTrue(qem7.query_vpp_config())
         self.logger.info(self.vapi.cli("sh qos eg map"))
 
         #
         # Bind interface pgN to table n
         #
         self.logger.info(self.vapi.cli("sh qos eg map"))
 
         #
         # Bind interface pgN to table n
         #
-        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
-                                          QOS_SOURCE.IP,
-                                          1,
-                                          1)
-        self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
-                                          QOS_SOURCE.IP,
-                                          2,
-                                          1)
-        self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
-                                          QOS_SOURCE.IP,
-                                          3,
-                                          1)
-        self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
-                                          QOS_SOURCE.IP,
-                                          4,
-                                          1)
+        qm1 = VppQosMark(self, self.pg1, qem1,
+                         self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
+        qm2 = VppQosMark(self, self.pg2, qem2,
+                         self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
+        qm3 = VppQosMark(self, self.pg3, qem3,
+                         self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
+        qm4 = VppQosMark(self, self.pg4, qem4,
+                         self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
+        self.assertTrue(qm3.query_vpp_config())
+
+        self.logger.info(self.vapi.cli("sh qos mark"))
 
         #
         # packets ingress on Pg0
 
         #
         # packets ingress on Pg0
@@ -120,39 +137,40 @@ class TestQOS(VppTestCase):
         p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
                 UDP(sport=1234, dport=1234) /
         p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
                 UDP(sport=1234, dport=1234) /
-                Raw(chr(100) * 65))
+                Raw(scapy.compat.chb(100) * NUM_PKTS))
         p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
                      tc=1) /
                 UDP(sport=1234, dport=1234) /
         p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
                      tc=1) /
                 UDP(sport=1234, dport=1234) /
-                Raw(chr(100) * 65))
+                Raw(scapy.compat.chb(100) * NUM_PKTS))
 
         #
         # Since we have not yet enabled the recording of the input QoS
         # from the input iP header, the egress packet's ToS will be unchanged
         #
 
         #
         # Since we have not yet enabled the recording of the input QoS
         # from the input iP header, the egress packet's ToS will be unchanged
         #
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IP].tos, 1)
         for p in rx:
             self.assertEqual(p[IP].tos, 1)
-        rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IPv6].tc, 1)
 
         #
         for p in rx:
             self.assertEqual(p[IPv6].tc, 1)
 
         #
-        # Enable QoS recrding on IP input for pg0
+        # Enable QoS recording on IP input for pg0
         #
         #
-        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
-                                            QOS_SOURCE.IP,
-                                            1)
+        qr1 = VppQosRecord(self, self.pg0,
+                           self.QOS_SOURCE.QOS_API_SOURCE_IP)
+        qr1.add_vpp_config()
+        self.logger.info(self.vapi.cli("sh qos record"))
 
         #
         # send the same packets, this time expect the input TOS of 1
         # to be mapped to pg1's egress value of 254
         #
 
         #
         # send the same packets, this time expect the input TOS of 1
         # to be mapped to pg1's egress value of 254
         #
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
-        rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IPv6].tc, 254)
 
         for p in rx:
             self.assertEqual(p[IPv6].tc, 254)
 
@@ -160,20 +178,20 @@ class TestQOS(VppTestCase):
         # different input ToS to test the mapping
         #
         p_v4[IP].tos = 127
         # different input ToS to test the mapping
         #
         p_v4[IP].tos = 127
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IP].tos, 128)
         p_v6[IPv6].tc = 127
         for p in rx:
             self.assertEqual(p[IP].tos, 128)
         p_v6[IPv6].tc = 127
-        rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IPv6].tc, 128)
 
         p_v4[IP].tos = 254
         for p in rx:
             self.assertEqual(p[IPv6].tc, 128)
 
         p_v4[IP].tos = 254
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IP].tos, 1)
         p_v6[IPv6].tc = 254
         for p in rx:
             self.assertEqual(p[IP].tos, 1)
         p_v6[IPv6].tc = 254
-        rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IPv6].tc, 1)
 
         for p in rx:
             self.assertEqual(p[IPv6].tc, 1)
 
@@ -182,40 +200,37 @@ class TestQOS(VppTestCase):
         # correctly applied
         #
         p_v4[IP].dst = self.pg2.remote_ip4
         # correctly applied
         #
         p_v4[IP].dst = self.pg2.remote_ip4
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
         for p in rx:
             self.assertEqual(p[IP].tos, 2)
 
         p_v4[IP].dst = self.pg3.remote_ip4
         for p in rx:
             self.assertEqual(p[IP].tos, 2)
 
         p_v4[IP].dst = self.pg3.remote_ip4
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
         for p in rx:
             self.assertEqual(p[IP].tos, 3)
 
         p_v6[IPv6].dst = self.pg3.remote_ip6
         for p in rx:
             self.assertEqual(p[IP].tos, 3)
 
         p_v6[IPv6].dst = self.pg3.remote_ip6
-        rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg3)
+        rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg3)
         for p in rx:
             self.assertEqual(p[IPv6].tc, 3)
 
         #
         # remove the map on pg2 and pg3, now expect an unchanged IP tos
         #
         for p in rx:
             self.assertEqual(p[IPv6].tc, 3)
 
         #
         # remove the map on pg2 and pg3, now expect an unchanged IP tos
         #
-        self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
-                                          QOS_SOURCE.IP,
-                                          2,
-                                          0)
-        self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
-                                          QOS_SOURCE.IP,
-                                          3,
-                                          0)
+        qm2.remove_vpp_config()
+        qm3.remove_vpp_config()
+        self.logger.info(self.vapi.cli("sh qos mark"))
+
+        self.assertFalse(qm3.query_vpp_config())
         self.logger.info(self.vapi.cli("sh int feat pg2"))
 
         p_v4[IP].dst = self.pg2.remote_ip4
         self.logger.info(self.vapi.cli("sh int feat pg2"))
 
         p_v4[IP].dst = self.pg2.remote_ip4
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
         p_v4[IP].dst = self.pg3.remote_ip4
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
         p_v4[IP].dst = self.pg3.remote_ip4
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
@@ -223,68 +238,60 @@ class TestQOS(VppTestCase):
         # still mapping out of pg1
         #
         p_v4[IP].dst = self.pg1.remote_ip4
         # still mapping out of pg1
         #
         p_v4[IP].dst = self.pg1.remote_ip4
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IP].tos, 1)
 
         #
         # disable the input recording on pg0
         #
         for p in rx:
             self.assertEqual(p[IP].tos, 1)
 
         #
         # disable the input recording on pg0
         #
-        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
-                                            QOS_SOURCE.IP,
-                                            0)
+        self.assertTrue(qr1.query_vpp_config())
+        qr1.remove_vpp_config()
 
         #
         # back to an unchanged TOS value
         #
 
         #
         # back to an unchanged TOS value
         #
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
         #
         # disable the egress map on pg1 and pg4
         #
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
         #
         # disable the egress map on pg1 and pg4
         #
-        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
-                                          QOS_SOURCE.IP,
-                                          1,
-                                          0)
-        self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
-                                          QOS_SOURCE.IP,
-                                          4,
-                                          0)
+        qm1.remove_vpp_config()
+        qm4.remove_vpp_config()
 
         #
         # unchanged Tos on pg1
         #
 
         #
         # unchanged Tos on pg1
         #
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
-        #
-        # clean-up the masp
-        #
-        self.vapi.qos_egress_map_delete(1)
-        self.vapi.qos_egress_map_delete(4)
-        self.vapi.qos_egress_map_delete(2)
-        self.vapi.qos_egress_map_delete(3)
-        self.vapi.qos_egress_map_delete(5)
-        self.vapi.qos_egress_map_delete(6)
-        self.vapi.qos_egress_map_delete(7)
-
     def test_qos_mpls(self):
         """ QoS Mark/Record MPLS """
 
         #
         # 255 QoS for all input values
         #
     def test_qos_mpls(self):
         """ QoS Mark/Record MPLS """
 
         #
         # 255 QoS for all input values
         #
-        output = [chr(255)] * 256
-        os = ''.join(output)
-        rows = [{'outputs': os},
-                {'outputs': os},
-                {'outputs': os},
-                {'outputs': os}]
-
-        self.vapi.qos_egress_map_update(1, rows)
+        from_ext = 7
+        from_ip = 6
+        from_mpls = 5
+        from_vlan = 4
+        output = [scapy.compat.chb(from_ext)] * 256
+        os1 = b''.join(output)
+        output = [scapy.compat.chb(from_vlan)] * 256
+        os2 = b''.join(output)
+        output = [scapy.compat.chb(from_mpls)] * 256
+        os3 = b''.join(output)
+        output = [scapy.compat.chb(from_ip)] * 256
+        os4 = b''.join(output)
+        rows = [{'outputs': os1},
+                {'outputs': os2},
+                {'outputs': os3},
+                {'outputs': os4}]
+
+        qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
 
         #
         # a route with 1 MPLS label
 
         #
         # a route with 1 MPLS label
@@ -308,13 +315,10 @@ class TestQOS(VppTestCase):
         # enable IP QoS recording on the input Pg0 and MPLS egress marking
         # on Pg1
         #
         # enable IP QoS recording on the input Pg0 and MPLS egress marking
         # on Pg1
         #
-        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
-                                            QOS_SOURCE.IP,
-                                            1)
-        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
-                                          QOS_SOURCE.MPLS,
-                                          1,
-                                          1)
+        qr1 = VppQosRecord(self, self.pg0,
+                           self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
+        qm1 = VppQosMark(self, self.pg1, qem1,
+                         self.QOS_SOURCE.QOS_API_SOURCE_MPLS).add_vpp_config()
 
         #
         # packet that will get one label added and 3 labels added resp.
 
         #
         # packet that will get one label added and 3 labels added resp.
@@ -322,33 +326,33 @@ class TestQOS(VppTestCase):
         p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
                UDP(sport=1234, dport=1234) /
         p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
                UDP(sport=1234, dport=1234) /
-               Raw(chr(100) * 65))
+               Raw(scapy.compat.chb(100) * NUM_PKTS))
         p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
                UDP(sport=1234, dport=1234) /
         p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
                UDP(sport=1234, dport=1234) /
-               Raw(chr(100) * 65))
+               Raw(scapy.compat.chb(100) * NUM_PKTS))
 
 
-        rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
 
         #
         # only 3 bits of ToS value in MPLS make sure tos is correct
         # and the label and EOS bit have not been corrupted
         #
         for p in rx:
 
         #
         # only 3 bits of ToS value in MPLS make sure tos is correct
         # and the label and EOS bit have not been corrupted
         #
         for p in rx:
-            self.assertEqual(p[MPLS].cos, 7)
+            self.assertEqual(p[MPLS].cos, from_ip)
             self.assertEqual(p[MPLS].label, 32)
             self.assertEqual(p[MPLS].s, 1)
             self.assertEqual(p[MPLS].label, 32)
             self.assertEqual(p[MPLS].s, 1)
-        rx = self.send_and_expect(self.pg0, p_3 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_3 * NUM_PKTS, self.pg1)
         for p in rx:
         for p in rx:
-            self.assertEqual(p[MPLS].cos, 7)
+            self.assertEqual(p[MPLS].cos, from_ip)
             self.assertEqual(p[MPLS].label, 63)
             self.assertEqual(p[MPLS].s, 0)
             h = p[MPLS].payload
             self.assertEqual(p[MPLS].label, 63)
             self.assertEqual(p[MPLS].s, 0)
             h = p[MPLS].payload
-            self.assertEqual(h[MPLS].cos, 7)
+            self.assertEqual(h[MPLS].cos, from_ip)
             self.assertEqual(h[MPLS].label, 33)
             self.assertEqual(h[MPLS].s, 0)
             h = h[MPLS].payload
             self.assertEqual(h[MPLS].label, 33)
             self.assertEqual(h[MPLS].s, 0)
             h = h[MPLS].payload
-            self.assertEqual(h[MPLS].cos, 7)
+            self.assertEqual(h[MPLS].cos, from_ip)
             self.assertEqual(h[MPLS].label, 34)
             self.assertEqual(h[MPLS].s, 1)
 
             self.assertEqual(h[MPLS].label, 34)
             self.assertEqual(h[MPLS].s, 1)
 
@@ -356,16 +360,15 @@ class TestQOS(VppTestCase):
         # enable MPLS QoS recording on the input Pg0 and IP egress marking
         # on Pg1
         #
         # enable MPLS QoS recording on the input Pg0 and IP egress marking
         # on Pg1
         #
-        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
-                                            QOS_SOURCE.MPLS,
-                                            1)
-        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
-                                          QOS_SOURCE.IP,
-                                          1,
-                                          1)
+        qr2 = VppQosRecord(
+            self, self.pg0,
+            self.QOS_SOURCE.QOS_API_SOURCE_MPLS).add_vpp_config()
+        qm2 = VppQosMark(
+            self, self.pg1, qem1,
+            self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
 
         #
 
         #
-        # MPLS x-connect - COS is preserved
+        # MPLS x-connect - COS according to pg1 map
         #
         route_32_eos = VppMplsRoute(self, 32, 1,
                                     [VppRoutePath(self.pg1.remote_ip4,
         #
         route_32_eos = VppMplsRoute(self, 32, 1,
                                     [VppRoutePath(self.pg1.remote_ip4,
@@ -377,11 +380,11 @@ class TestQOS(VppTestCase):
                 MPLS(label=32, cos=3, ttl=2) /
                 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
                 UDP(sport=1234, dport=1234) /
                 MPLS(label=32, cos=3, ttl=2) /
                 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
                 UDP(sport=1234, dport=1234) /
-                Raw(chr(100) * 65))
+                Raw(scapy.compat.chb(100) * NUM_PKTS))
 
 
-        rx = self.send_and_expect(self.pg0, p_m1 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_m1 * NUM_PKTS, self.pg1)
         for p in rx:
         for p in rx:
-            self.assertEqual(p[MPLS].cos, 7)
+            self.assertEqual(p[MPLS].cos, from_mpls)
             self.assertEqual(p[MPLS].label, 33)
             self.assertEqual(p[MPLS].s, 1)
 
             self.assertEqual(p[MPLS].label, 33)
             self.assertEqual(p[MPLS].s, 1)
 
@@ -403,31 +406,12 @@ class TestQOS(VppTestCase):
                 MPLS(label=33, ttl=2, cos=3) /
                 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
                 UDP(sport=1234, dport=1234) /
                 MPLS(label=33, ttl=2, cos=3) /
                 IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
                 UDP(sport=1234, dport=1234) /
-                Raw(chr(100) * 65))
+                Raw(scapy.compat.chb(100) * NUM_PKTS))
 
 
-        rx = self.send_and_expect(self.pg0, p_m2 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_m2 * NUM_PKTS, self.pg1)
 
         for p in rx:
 
         for p in rx:
-            self.assertEqual(p[IP].tos, 255)
-
-        #
-        # cleanup
-        #
-        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
-                                            QOS_SOURCE.IP,
-                                            0)
-        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
-                                          QOS_SOURCE.MPLS,
-                                          1,
-                                          0)
-        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
-                                            QOS_SOURCE.MPLS,
-                                            0)
-        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
-                                          QOS_SOURCE.IP,
-                                          1,
-                                          0)
-        self.vapi.qos_egress_map_delete(1)
+            self.assertEqual(p[IP].tos, from_mpls)
 
     def test_qos_vlan(self):
         """QoS mark/record VLAN """
 
     def test_qos_vlan(self):
         """QoS mark/record VLAN """
@@ -435,16 +419,16 @@ class TestQOS(VppTestCase):
         #
         # QoS for all input values
         #
         #
         # QoS for all input values
         #
-        output = [chr(0)] * 256
+        output = [scapy.compat.chb(0)] * 256
         for i in range(0, 255):
         for i in range(0, 255):
-            output[i] = chr(255 - i)
-        os = ''.join(output)
+            output[i] = scapy.compat.chb(255 - i)
+        os = b''.join(output)
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
                 {'outputs': os}]
 
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
                 {'outputs': os}]
 
-        self.vapi.qos_egress_map_update(1, rows)
+        qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
 
         sub_if = VppDot1QSubint(self, self.pg0, 11)
 
 
         sub_if = VppDot1QSubint(self, self.pg0, 11)
 
@@ -457,24 +441,22 @@ class TestQOS(VppTestCase):
         #
         # enable VLAN QoS recording/marking on the input Pg0 subinterface and
         #
         #
         # enable VLAN QoS recording/marking on the input Pg0 subinterface and
         #
-        self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
-                                            QOS_SOURCE.VLAN,
-                                            1)
-        self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
-                                          QOS_SOURCE.VLAN,
-                                          1,
-                                          1)
+        qr_v = VppQosRecord(
+            self, sub_if,
+            self.QOS_SOURCE.QOS_API_SOURCE_VLAN).add_vpp_config()
+        qm_v = VppQosMark(
+            self, sub_if, qem1,
+            self.QOS_SOURCE.QOS_API_SOURCE_VLAN).add_vpp_config()
 
         #
         # IP marking/recording on pg1
         #
 
         #
         # IP marking/recording on pg1
         #
-        self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
-                                            QOS_SOURCE.IP,
-                                            1)
-        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
-                                          QOS_SOURCE.IP,
-                                          1,
-                                          1)
+        qr_ip = VppQosRecord(
+            self, self.pg1,
+            self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
+        qm_ip = VppQosMark(
+            self, self.pg1, qem1,
+            self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
 
         #
         # a routes to/from sub-interface
 
         #
         # a routes to/from sub-interface
@@ -489,34 +471,30 @@ class TestQOS(VppTestCase):
         route_10_0_0_2.add_vpp_config()
         route_2001_1 = VppIpRoute(self, "2001::1", 128,
                                   [VppRoutePath(sub_if.remote_ip6,
         route_10_0_0_2.add_vpp_config()
         route_2001_1 = VppIpRoute(self, "2001::1", 128,
                                   [VppRoutePath(sub_if.remote_ip6,
-                                                sub_if.sw_if_index,
-                                                proto=DpoProto.DPO_PROTO_IP6)],
-                                  is_ip6=1)
+                                                sub_if.sw_if_index)])
         route_2001_1.add_vpp_config()
         route_2001_2 = VppIpRoute(self, "2001::2", 128,
                                   [VppRoutePath(self.pg1.remote_ip6,
         route_2001_1.add_vpp_config()
         route_2001_2 = VppIpRoute(self, "2001::2", 128,
                                   [VppRoutePath(self.pg1.remote_ip6,
-                                                self.pg1.sw_if_index,
-                                                proto=DpoProto.DPO_PROTO_IP6)],
-                                  is_ip6=1)
+                                                self.pg1.sw_if_index)])
         route_2001_2.add_vpp_config()
 
         p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 Dot1Q(vlan=11, prio=1) /
                 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
                 UDP(sport=1234, dport=1234) /
         route_2001_2.add_vpp_config()
 
         p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 Dot1Q(vlan=11, prio=1) /
                 IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
                 UDP(sport=1234, dport=1234) /
-                Raw(chr(100) * 65))
+                Raw(scapy.compat.chb(100) * NUM_PKTS))
 
         p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
                 UDP(sport=1234, dport=1234) /
 
         p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
                 UDP(sport=1234, dport=1234) /
-                Raw(chr(100) * 65))
+                Raw(scapy.compat.chb(100) * NUM_PKTS))
 
 
-        rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
+        rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
 
         for p in rx:
             self.assertEqual(p[Dot1Q].prio, 6)
 
 
         for p in rx:
             self.assertEqual(p[Dot1Q].prio, 6)
 
-        rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
 
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
@@ -525,19 +503,19 @@ class TestQOS(VppTestCase):
                 Dot1Q(vlan=11, prio=2) /
                 IPv6(src="2001::1", dst="2001::2", tc=1) /
                 UDP(sport=1234, dport=1234) /
                 Dot1Q(vlan=11, prio=2) /
                 IPv6(src="2001::1", dst="2001::2", tc=1) /
                 UDP(sport=1234, dport=1234) /
-                Raw(chr(100) * 65))
+                Raw(scapy.compat.chb(100) * NUM_PKTS))
 
         p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IPv6(src="3001::1", dst="2001::1", tc=1) /
                 UDP(sport=1234, dport=1234) /
 
         p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IPv6(src="3001::1", dst="2001::1", tc=1) /
                 UDP(sport=1234, dport=1234) /
-                Raw(chr(100) * 65))
+                Raw(scapy.compat.chb(100) * NUM_PKTS))
 
 
-        rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
+        rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
 
         for p in rx:
             self.assertEqual(p[Dot1Q].prio, 6)
 
 
         for p in rx:
             self.assertEqual(p[Dot1Q].prio, 6)
 
-        rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
 
         for p in rx:
             self.assertEqual(p[IPv6].tc, 253)
 
         for p in rx:
             self.assertEqual(p[IPv6].tc, 253)
@@ -548,21 +526,6 @@ class TestQOS(VppTestCase):
         sub_if.unconfig_ip4()
         sub_if.unconfig_ip6()
 
         sub_if.unconfig_ip4()
         sub_if.unconfig_ip6()
 
-        self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
-                                            QOS_SOURCE.VLAN,
-                                            0)
-        self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
-                                          QOS_SOURCE.VLAN,
-                                          1,
-                                          0)
-        self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
-                                            QOS_SOURCE.IP,
-                                            0)
-        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
-                                          QOS_SOURCE.IP,
-                                          1,
-                                          0)
-
 
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)
 
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)