tests: refactor. Replace literal constant w/ named constant.
[vpp.git] / test / test_qos.py
index 939cca5..8025bdc 100644 (file)
@@ -4,48 +4,67 @@ import unittest
 
 from framework import VppTestCase, VppTestRunner
 from vpp_papi_provider import QOS_SOURCE
 
 from framework import VppTestCase, VppTestRunner
 from vpp_papi_provider import QOS_SOURCE
-from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_sub_interface import VppDot1QSubint
+from vpp_ip import DpoProto
+from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
+    VppMplsLabel, VppMplsTable
 
 
+import scapy.compat
 from scapy.packet import Raw
 from scapy.packet import Raw
-from scapy.layers.l2 import Ether
+from scapy.layers.l2 import Ether, Dot1Q
 from scapy.layers.inet import IP, UDP
 from scapy.layers.inet6 import IPv6
 from scapy.contrib.mpls import MPLS
 
 from scapy.layers.inet import IP, UDP
 from scapy.layers.inet6 import IPv6
 from scapy.contrib.mpls import MPLS
 
+NUM_PKTS = 67
+
 
 class TestQOS(VppTestCase):
     """ QOS Test Case """
 
 
 class TestQOS(VppTestCase):
     """ QOS Test Case """
 
+    @classmethod
+    def setUpClass(cls):
+        super(TestQOS, cls).setUpClass()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestQOS, cls).tearDownClass()
+
     def setUp(self):
         super(TestQOS, self).setUp()
 
         self.create_pg_interfaces(range(5))
 
     def setUp(self):
         super(TestQOS, self).setUp()
 
         self.create_pg_interfaces(range(5))
 
+        tbl = VppMplsTable(self, 0)
+        tbl.add_vpp_config()
+
         for i in self.pg_interfaces:
             i.admin_up()
             i.config_ip4()
             i.resolve_arp()
             i.config_ip6()
             i.resolve_ndp()
         for i in self.pg_interfaces:
             i.admin_up()
             i.config_ip4()
             i.resolve_arp()
             i.config_ip6()
             i.resolve_ndp()
+            i.enable_mpls()
 
     def tearDown(self):
         for i in self.pg_interfaces:
             i.unconfig_ip4()
             i.unconfig_ip6()
 
     def tearDown(self):
         for i in self.pg_interfaces:
             i.unconfig_ip4()
             i.unconfig_ip6()
+            i.disable_mpls()
 
         super(TestQOS, self).tearDown()
 
     def test_qos_ip(self):
 
         super(TestQOS, self).tearDown()
 
     def test_qos_ip(self):
-        """ QoS Mark IP """
+        """ QoS Mark/Record IP """
 
         #
         # for table 1 map the n=0xff possible values of input QoS mark,
         # n to 1-n
         #
 
         #
         # for table 1 map the n=0xff possible values of input QoS mark,
         # n to 1-n
         #
-        output = [chr(0)] * 256
+        output = [scapy.compat.chb(0)] * 256
         for i in range(0, 255):
         for i in range(0, 255):
-            output[i] = chr(255 - i)
-        os = ''.join(output)
+            output[i] = scapy.compat.chb(255 - i)
+        os = b''.join(output)
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
@@ -56,8 +75,8 @@ class TestQOS(VppTestCase):
         #
         # For table 2 (and up) use the value n for everything
         #
         #
         # For table 2 (and up) use the value n for everything
         #
-        output = [chr(2)] * 256
-        os = ''.join(output)
+        output = [scapy.compat.chb(2)] * 256
+        os = b''.join(output)
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
@@ -65,8 +84,8 @@ class TestQOS(VppTestCase):
 
         self.vapi.qos_egress_map_update(2, rows)
 
 
         self.vapi.qos_egress_map_update(2, rows)
 
-        output = [chr(3)] * 256
-        os = ''.join(output)
+        output = [scapy.compat.chb(3)] * 256
+        os = b''.join(output)
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
@@ -74,8 +93,8 @@ class TestQOS(VppTestCase):
 
         self.vapi.qos_egress_map_update(3, rows)
 
 
         self.vapi.qos_egress_map_update(3, rows)
 
-        output = [chr(4)] * 256
-        os = ''.join(output)
+        output = [scapy.compat.chb(4)] * 256
+        os = b''.join(output)
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
         rows = [{'outputs': os},
                 {'outputs': os},
                 {'outputs': os},
@@ -113,26 +132,26 @@ class TestQOS(VppTestCase):
         p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
                 UDP(sport=1234, dport=1234) /
         p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
                 UDP(sport=1234, dport=1234) /
-                Raw(chr(100) * 65))
+                Raw(scapy.compat.chb(100) * NUM_PKTS))
         p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
                      tc=1) /
                 UDP(sport=1234, dport=1234) /
         p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
                      tc=1) /
                 UDP(sport=1234, dport=1234) /
-                Raw(chr(100) * 65))
+                Raw(scapy.compat.chb(100) * NUM_PKTS))
 
         #
         # Since we have not yet enabled the recording of the input QoS
         # from the input iP header, the egress packet's ToS will be unchanged
         #
 
         #
         # Since we have not yet enabled the recording of the input QoS
         # from the input iP header, the egress packet's ToS will be unchanged
         #
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IP].tos, 1)
         for p in rx:
             self.assertEqual(p[IP].tos, 1)
-        rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IPv6].tc, 1)
 
         #
         for p in rx:
             self.assertEqual(p[IPv6].tc, 1)
 
         #
-        # Enable QoS recrding on IP input for pg0
+        # Enable QoS recording on IP input for pg0
         #
         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                             QOS_SOURCE.IP,
         #
         self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
                                             QOS_SOURCE.IP,
@@ -142,10 +161,10 @@ class TestQOS(VppTestCase):
         # send the same packets, this time expect the input TOS of 1
         # to be mapped to pg1's egress value of 254
         #
         # send the same packets, this time expect the input TOS of 1
         # to be mapped to pg1's egress value of 254
         #
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
-        rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IPv6].tc, 254)
 
         for p in rx:
             self.assertEqual(p[IPv6].tc, 254)
 
@@ -153,20 +172,20 @@ class TestQOS(VppTestCase):
         # different input ToS to test the mapping
         #
         p_v4[IP].tos = 127
         # different input ToS to test the mapping
         #
         p_v4[IP].tos = 127
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IP].tos, 128)
         p_v6[IPv6].tc = 127
         for p in rx:
             self.assertEqual(p[IP].tos, 128)
         p_v6[IPv6].tc = 127
-        rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IPv6].tc, 128)
 
         p_v4[IP].tos = 254
         for p in rx:
             self.assertEqual(p[IPv6].tc, 128)
 
         p_v4[IP].tos = 254
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IP].tos, 1)
         p_v6[IPv6].tc = 254
         for p in rx:
             self.assertEqual(p[IP].tos, 1)
         p_v6[IPv6].tc = 254
-        rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IPv6].tc, 1)
 
         for p in rx:
             self.assertEqual(p[IPv6].tc, 1)
 
@@ -175,17 +194,17 @@ class TestQOS(VppTestCase):
         # correctly applied
         #
         p_v4[IP].dst = self.pg2.remote_ip4
         # correctly applied
         #
         p_v4[IP].dst = self.pg2.remote_ip4
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
         for p in rx:
             self.assertEqual(p[IP].tos, 2)
 
         p_v4[IP].dst = self.pg3.remote_ip4
         for p in rx:
             self.assertEqual(p[IP].tos, 2)
 
         p_v4[IP].dst = self.pg3.remote_ip4
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
         for p in rx:
             self.assertEqual(p[IP].tos, 3)
 
         p_v6[IPv6].dst = self.pg3.remote_ip6
         for p in rx:
             self.assertEqual(p[IP].tos, 3)
 
         p_v6[IPv6].dst = self.pg3.remote_ip6
-        rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg3)
+        rx = self.send_and_expect(self.pg0, p_v6 * NUM_PKTS, self.pg3)
         for p in rx:
             self.assertEqual(p[IPv6].tc, 3)
 
         for p in rx:
             self.assertEqual(p[IPv6].tc, 3)
 
@@ -203,12 +222,12 @@ class TestQOS(VppTestCase):
         self.logger.info(self.vapi.cli("sh int feat pg2"))
 
         p_v4[IP].dst = self.pg2.remote_ip4
         self.logger.info(self.vapi.cli("sh int feat pg2"))
 
         p_v4[IP].dst = self.pg2.remote_ip4
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg2)
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
         p_v4[IP].dst = self.pg3.remote_ip4
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
         p_v4[IP].dst = self.pg3.remote_ip4
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg3)
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
@@ -216,7 +235,7 @@ class TestQOS(VppTestCase):
         # still mapping out of pg1
         #
         p_v4[IP].dst = self.pg1.remote_ip4
         # still mapping out of pg1
         #
         p_v4[IP].dst = self.pg1.remote_ip4
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IP].tos, 1)
 
         for p in rx:
             self.assertEqual(p[IP].tos, 1)
 
@@ -230,7 +249,7 @@ class TestQOS(VppTestCase):
         #
         # back to an unchanged TOS value
         #
         #
         # back to an unchanged TOS value
         #
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
@@ -249,12 +268,12 @@ class TestQOS(VppTestCase):
         #
         # unchanged Tos on pg1
         #
         #
         # unchanged Tos on pg1
         #
-        rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
         #
         for p in rx:
             self.assertEqual(p[IP].tos, 254)
 
         #
-        # clean-up the masp
+        # clean-up the map
         #
         self.vapi.qos_egress_map_delete(1)
         self.vapi.qos_egress_map_delete(4)
         #
         self.vapi.qos_egress_map_delete(1)
         self.vapi.qos_egress_map_delete(4)
@@ -265,17 +284,27 @@ class TestQOS(VppTestCase):
         self.vapi.qos_egress_map_delete(7)
 
     def test_qos_mpls(self):
         self.vapi.qos_egress_map_delete(7)
 
     def test_qos_mpls(self):
-        """ QoS Mark MPLS """
+        """ QoS Mark/Record MPLS """
 
         #
         # 255 QoS for all input values
         #
 
         #
         # 255 QoS for all input values
         #
-        output = [chr(255)] * 256
-        os = ''.join(output)
-        rows = [{'outputs': os},
-                {'outputs': os},
-                {'outputs': os},
-                {'outputs': os}]
+        from_ext = 7
+        from_ip = 6
+        from_mpls = 5
+        from_vlan = 4
+        output = [scapy.compat.chb(from_ext)] * 256
+        os1 = b''.join(output)
+        output = [scapy.compat.chb(from_vlan)] * 256
+        os2 = b''.join(output)
+        output = [scapy.compat.chb(from_mpls)] * 256
+        os3 = b''.join(output)
+        output = [scapy.compat.chb(from_ip)] * 256
+        os4 = b''.join(output)
+        rows = [{'outputs': os1},
+                {'outputs': os2},
+                {'outputs': os3},
+                {'outputs': os4}]
 
         self.vapi.qos_egress_map_update(1, rows)
 
 
         self.vapi.qos_egress_map_update(1, rows)
 
@@ -315,36 +344,94 @@ class TestQOS(VppTestCase):
         p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
                UDP(sport=1234, dport=1234) /
         p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
                UDP(sport=1234, dport=1234) /
-               Raw(chr(100) * 65))
+               Raw(scapy.compat.chb(100) * NUM_PKTS))
         p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
                UDP(sport=1234, dport=1234) /
         p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
                UDP(sport=1234, dport=1234) /
-               Raw(chr(100) * 65))
+               Raw(scapy.compat.chb(100) * NUM_PKTS))
 
 
-        rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
 
         #
         # only 3 bits of ToS value in MPLS make sure tos is correct
         # and the label and EOS bit have not been corrupted
         #
         for p in rx:
 
         #
         # only 3 bits of ToS value in MPLS make sure tos is correct
         # and the label and EOS bit have not been corrupted
         #
         for p in rx:
-            self.assertEqual(p[MPLS].cos, 7)
+            self.assertEqual(p[MPLS].cos, from_ip)
             self.assertEqual(p[MPLS].label, 32)
             self.assertEqual(p[MPLS].s, 1)
             self.assertEqual(p[MPLS].label, 32)
             self.assertEqual(p[MPLS].s, 1)
-        rx = self.send_and_expect(self.pg0, p_3 * 65, self.pg1)
+        rx = self.send_and_expect(self.pg0, p_3 * NUM_PKTS, self.pg1)
         for p in rx:
         for p in rx:
-            self.assertEqual(p[MPLS].cos, 7)
+            self.assertEqual(p[MPLS].cos, from_ip)
             self.assertEqual(p[MPLS].label, 63)
             self.assertEqual(p[MPLS].s, 0)
             h = p[MPLS].payload
             self.assertEqual(p[MPLS].label, 63)
             self.assertEqual(p[MPLS].s, 0)
             h = p[MPLS].payload
-            self.assertEqual(h[MPLS].cos, 7)
+            self.assertEqual(h[MPLS].cos, from_ip)
             self.assertEqual(h[MPLS].label, 33)
             self.assertEqual(h[MPLS].s, 0)
             h = h[MPLS].payload
             self.assertEqual(h[MPLS].label, 33)
             self.assertEqual(h[MPLS].s, 0)
             h = h[MPLS].payload
-            self.assertEqual(h[MPLS].cos, 7)
+            self.assertEqual(h[MPLS].cos, from_ip)
             self.assertEqual(h[MPLS].label, 34)
             self.assertEqual(h[MPLS].s, 1)
 
             self.assertEqual(h[MPLS].label, 34)
             self.assertEqual(h[MPLS].s, 1)
 
+        #
+        # enable MPLS QoS recording on the input Pg0 and IP egress marking
+        # on Pg1
+        #
+        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
+                                            QOS_SOURCE.MPLS,
+                                            1)
+        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
+                                          QOS_SOURCE.IP,
+                                          1,
+                                          1)
+
+        #
+        # MPLS x-connect - COS according to pg1 map
+        #
+        route_32_eos = VppMplsRoute(self, 32, 1,
+                                    [VppRoutePath(self.pg1.remote_ip4,
+                                                  self.pg1.sw_if_index,
+                                                  labels=[VppMplsLabel(33)])])
+        route_32_eos.add_vpp_config()
+
+        p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+                MPLS(label=32, cos=3, ttl=2) /
+                IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
+                UDP(sport=1234, dport=1234) /
+                Raw(scapy.compat.chb(100) * NUM_PKTS))
+
+        rx = self.send_and_expect(self.pg0, p_m1 * NUM_PKTS, self.pg1)
+        for p in rx:
+            self.assertEqual(p[MPLS].cos, from_mpls)
+            self.assertEqual(p[MPLS].label, 33)
+            self.assertEqual(p[MPLS].s, 1)
+
+        #
+        # MPLS deag - COS is copied from MPLS to IP
+        #
+        route_33_eos = VppMplsRoute(self, 33, 1,
+                                    [VppRoutePath("0.0.0.0",
+                                                  0xffffffff,
+                                                  nh_table_id=0)])
+        route_33_eos.add_vpp_config()
+
+        route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
+                                    [VppRoutePath(self.pg1.remote_ip4,
+                                                  self.pg1.sw_if_index)])
+        route_10_0_0_4.add_vpp_config()
+
+        p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+                MPLS(label=33, ttl=2, cos=3) /
+                IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
+                UDP(sport=1234, dport=1234) /
+                Raw(scapy.compat.chb(100) * NUM_PKTS))
+
+        rx = self.send_and_expect(self.pg0, p_m2 * NUM_PKTS, self.pg1)
+
+        for p in rx:
+            self.assertEqual(p[IP].tos, from_mpls)
+
         #
         # cleanup
         #
         #
         # cleanup
         #
@@ -355,8 +442,149 @@ class TestQOS(VppTestCase):
                                           QOS_SOURCE.MPLS,
                                           1,
                                           0)
                                           QOS_SOURCE.MPLS,
                                           1,
                                           0)
+        self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
+                                            QOS_SOURCE.MPLS,
+                                            0)
+        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
+                                          QOS_SOURCE.IP,
+                                          1,
+                                          0)
         self.vapi.qos_egress_map_delete(1)
 
         self.vapi.qos_egress_map_delete(1)
 
+    def test_qos_vlan(self):
+        """QoS mark/record VLAN """
+
+        #
+        # QoS for all input values
+        #
+        output = [scapy.compat.chb(0)] * 256
+        for i in range(0, 255):
+            output[i] = scapy.compat.chb(255 - i)
+        os = b''.join(output)
+        rows = [{'outputs': os},
+                {'outputs': os},
+                {'outputs': os},
+                {'outputs': os}]
+
+        self.vapi.qos_egress_map_update(1, rows)
+
+        sub_if = VppDot1QSubint(self, self.pg0, 11)
+
+        sub_if.admin_up()
+        sub_if.config_ip4()
+        sub_if.resolve_arp()
+        sub_if.config_ip6()
+        sub_if.resolve_ndp()
+
+        #
+        # enable VLAN QoS recording/marking on the input Pg0 subinterface and
+        #
+        self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
+                                            QOS_SOURCE.VLAN,
+                                            1)
+        self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
+                                          QOS_SOURCE.VLAN,
+                                          1,
+                                          1)
+
+        #
+        # IP marking/recording on pg1
+        #
+        self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
+                                            QOS_SOURCE.IP,
+                                            1)
+        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
+                                          QOS_SOURCE.IP,
+                                          1,
+                                          1)
+
+        #
+        # a routes to/from sub-interface
+        #
+        route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
+                                    [VppRoutePath(sub_if.remote_ip4,
+                                                  sub_if.sw_if_index)])
+        route_10_0_0_1.add_vpp_config()
+        route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
+                                    [VppRoutePath(self.pg1.remote_ip4,
+                                                  self.pg1.sw_if_index)])
+        route_10_0_0_2.add_vpp_config()
+        route_2001_1 = VppIpRoute(self, "2001::1", 128,
+                                  [VppRoutePath(sub_if.remote_ip6,
+                                                sub_if.sw_if_index,
+                                                proto=DpoProto.DPO_PROTO_IP6)],
+                                  is_ip6=1)
+        route_2001_1.add_vpp_config()
+        route_2001_2 = VppIpRoute(self, "2001::2", 128,
+                                  [VppRoutePath(self.pg1.remote_ip6,
+                                                self.pg1.sw_if_index,
+                                                proto=DpoProto.DPO_PROTO_IP6)],
+                                  is_ip6=1)
+        route_2001_2.add_vpp_config()
+
+        p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+                Dot1Q(vlan=11, prio=1) /
+                IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
+                UDP(sport=1234, dport=1234) /
+                Raw(scapy.compat.chb(100) * NUM_PKTS))
+
+        p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+                IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
+                UDP(sport=1234, dport=1234) /
+                Raw(scapy.compat.chb(100) * NUM_PKTS))
+
+        rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
+
+        for p in rx:
+            self.assertEqual(p[Dot1Q].prio, 6)
+
+        rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
+
+        for p in rx:
+            self.assertEqual(p[IP].tos, 254)
+
+        p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+                Dot1Q(vlan=11, prio=2) /
+                IPv6(src="2001::1", dst="2001::2", tc=1) /
+                UDP(sport=1234, dport=1234) /
+                Raw(scapy.compat.chb(100) * NUM_PKTS))
+
+        p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+                IPv6(src="3001::1", dst="2001::1", tc=1) /
+                UDP(sport=1234, dport=1234) /
+                Raw(scapy.compat.chb(100) * NUM_PKTS))
+
+        rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
+
+        for p in rx:
+            self.assertEqual(p[Dot1Q].prio, 6)
+
+        rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
+
+        for p in rx:
+            self.assertEqual(p[IPv6].tc, 253)
+
+        #
+        # cleanup
+        #
+        sub_if.unconfig_ip4()
+        sub_if.unconfig_ip6()
+
+        self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
+                                            QOS_SOURCE.VLAN,
+                                            0)
+        self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
+                                          QOS_SOURCE.VLAN,
+                                          1,
+                                          0)
+        self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
+                                            QOS_SOURCE.IP,
+                                            0)
+        self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
+                                          QOS_SOURCE.IP,
+                                          1,
+                                          0)
+
 
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)
 
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)