+ def test_qos_vlan(self):
+ """QoS mark/record VLAN """
+
+ #
+ # QoS for all input values
+ #
+ output = [chr(0)] * 256
+ for i in range(0, 255):
+ output[i] = chr(255 - i)
+ os = ''.join(output)
+ rows = [{'outputs': os},
+ {'outputs': os},
+ {'outputs': os},
+ {'outputs': os}]
+
+ self.vapi.qos_egress_map_update(1, rows)
+
+ sub_if = VppDot1QSubint(self, self.pg0, 11)
+
+ sub_if.admin_up()
+ sub_if.config_ip4()
+ sub_if.resolve_arp()
+ sub_if.config_ip6()
+ sub_if.resolve_ndp()
+
+ #
+ # enable VLAN QoS recording/marking on the input Pg0 subinterface and
+ #
+ self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
+ QOS_SOURCE.VLAN,
+ 1)
+ self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
+ QOS_SOURCE.VLAN,
+ 1,
+ 1)
+
+ #
+ # IP marking/recording on pg1
+ #
+ self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
+ QOS_SOURCE.IP,
+ 1)
+ self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
+ QOS_SOURCE.IP,
+ 1,
+ 1)
+
+ #
+ # a routes to/from sub-interface
+ #
+ route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
+ [VppRoutePath(sub_if.remote_ip4,
+ sub_if.sw_if_index)])
+ route_10_0_0_1.add_vpp_config()
+ route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
+ [VppRoutePath(self.pg1.remote_ip4,
+ self.pg1.sw_if_index)])
+ route_10_0_0_2.add_vpp_config()
+ route_2001_1 = VppIpRoute(self, "2001::1", 128,
+ [VppRoutePath(sub_if.remote_ip6,
+ sub_if.sw_if_index,
+ proto=DpoProto.DPO_PROTO_IP6)],
+ is_ip6=1)
+ route_2001_1.add_vpp_config()
+ route_2001_2 = VppIpRoute(self, "2001::2", 128,
+ [VppRoutePath(self.pg1.remote_ip6,
+ self.pg1.sw_if_index,
+ proto=DpoProto.DPO_PROTO_IP6)],
+ is_ip6=1)
+ route_2001_2.add_vpp_config()
+
+ p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ Dot1Q(vlan=11, prio=1) /
+ IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
+ UDP(sport=1234, dport=1234) /
+ Raw(chr(100) * 65))
+
+ p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
+ UDP(sport=1234, dport=1234) /
+ Raw(chr(100) * 65))
+
+ rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
+
+ for p in rx:
+ self.assertEqual(p[Dot1Q].prio, 6)
+
+ rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
+
+ for p in rx:
+ self.assertEqual(p[IP].tos, 254)
+
+ p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ Dot1Q(vlan=11, prio=2) /
+ IPv6(src="2001::1", dst="2001::2", tc=1) /
+ UDP(sport=1234, dport=1234) /
+ Raw(chr(100) * 65))
+
+ p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IPv6(src="3001::1", dst="2001::1", tc=1) /
+ UDP(sport=1234, dport=1234) /
+ Raw(chr(100) * 65))
+
+ rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
+
+ for p in rx:
+ self.assertEqual(p[Dot1Q].prio, 6)
+
+ rx = self.send_and_expect(self.pg0, p_v1 * 65, self.pg1)
+
+ for p in rx:
+ self.assertEqual(p[IPv6].tc, 253)
+
+ #
+ # cleanup
+ #
+ sub_if.unconfig_ip4()
+ sub_if.unconfig_ip6()
+
+ self.vapi.qos_record_enable_disable(sub_if.sw_if_index,
+ QOS_SOURCE.VLAN,
+ 0)
+ self.vapi.qos_mark_enable_disable(sub_if.sw_if_index,
+ QOS_SOURCE.VLAN,
+ 1,
+ 0)
+ self.vapi.qos_record_enable_disable(self.pg1.sw_if_index,
+ QOS_SOURCE.IP,
+ 0)
+ self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
+ QOS_SOURCE.IP,
+ 1,
+ 0)
+