+ #
+ # enable MPLS QoS recording on the input Pg0 and IP egress marking
+ # on Pg1
+ #
+ qr2 = VppQosRecord(
+ self, self.pg0,
+ self.QOS_SOURCE.QOS_API_SOURCE_MPLS).add_vpp_config()
+ qm2 = VppQosMark(
+ self, self.pg1, qem1,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
+
+ #
+ # MPLS x-connect - COS according to pg1 map
+ #
+ route_32_eos = VppMplsRoute(self, 32, 1,
+ [VppRoutePath(self.pg1.remote_ip4,
+ self.pg1.sw_if_index,
+ labels=[VppMplsLabel(33)])])
+ route_32_eos.add_vpp_config()
+
+ p_m1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ MPLS(label=32, cos=3, ttl=2) /
+ IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
+ UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * NUM_PKTS))
+
+ rx = self.send_and_expect(self.pg0, p_m1 * NUM_PKTS, self.pg1)
+ for p in rx:
+ self.assertEqual(p[MPLS].cos, from_mpls)
+ self.assertEqual(p[MPLS].label, 33)
+ self.assertEqual(p[MPLS].s, 1)
+
+ #
+ # MPLS deag - COS is copied from MPLS to IP
+ #
+ route_33_eos = VppMplsRoute(self, 33, 1,
+ [VppRoutePath("0.0.0.0",
+ 0xffffffff,
+ nh_table_id=0)])
+ route_33_eos.add_vpp_config()
+
+ route_10_0_0_4 = VppIpRoute(self, "10.0.0.4", 32,
+ [VppRoutePath(self.pg1.remote_ip4,
+ self.pg1.sw_if_index)])
+ route_10_0_0_4.add_vpp_config()
+
+ p_m2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ MPLS(label=33, ttl=2, cos=3) /
+ IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
+ UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * NUM_PKTS))
+
+ rx = self.send_and_expect(self.pg0, p_m2 * NUM_PKTS, self.pg1)
+
+ for p in rx:
+ self.assertEqual(p[IP].tos, from_mpls)
+
+ def test_qos_vlan(self):
+ """QoS mark/record VLAN """
+
+ #
+ # QoS for all input values
+ #
+ output = [scapy.compat.chb(0)] * 256
+ for i in range(0, 255):
+ output[i] = scapy.compat.chb(255 - i)
+ os = b''.join(output)
+ rows = [{'outputs': os},
+ {'outputs': os},
+ {'outputs': os},
+ {'outputs': os}]
+
+ qem1 = VppQosEgressMap(self, 1, rows).add_vpp_config()
+
+ sub_if = VppDot1QSubint(self, self.pg0, 11)
+
+ sub_if.admin_up()
+ sub_if.config_ip4()
+ sub_if.resolve_arp()
+ sub_if.config_ip6()
+ sub_if.resolve_ndp()
+
+ #
+ # enable VLAN QoS recording/marking on the input Pg0 subinterface and
+ #
+ qr_v = VppQosRecord(
+ self, sub_if,
+ self.QOS_SOURCE.QOS_API_SOURCE_VLAN).add_vpp_config()
+ qm_v = VppQosMark(
+ self, sub_if, qem1,
+ self.QOS_SOURCE.QOS_API_SOURCE_VLAN).add_vpp_config()
+
+ #
+ # IP marking/recording on pg1
+ #
+ qr_ip = VppQosRecord(
+ self, self.pg1,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
+ qm_ip = VppQosMark(
+ self, self.pg1, qem1,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP).add_vpp_config()
+
+ #
+ # a routes to/from sub-interface
+ #
+ route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
+ [VppRoutePath(sub_if.remote_ip4,
+ sub_if.sw_if_index)])
+ route_10_0_0_1.add_vpp_config()
+ route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
+ [VppRoutePath(self.pg1.remote_ip4,
+ self.pg1.sw_if_index)])
+ route_10_0_0_2.add_vpp_config()
+ route_2001_1 = VppIpRoute(self, "2001::1", 128,
+ [VppRoutePath(sub_if.remote_ip6,
+ sub_if.sw_if_index)])
+ route_2001_1.add_vpp_config()
+ route_2001_2 = VppIpRoute(self, "2001::2", 128,
+ [VppRoutePath(self.pg1.remote_ip6,
+ self.pg1.sw_if_index)])
+ route_2001_2.add_vpp_config()
+
+ p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ Dot1Q(vlan=11, prio=1) /
+ IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
+ UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * NUM_PKTS))
+
+ p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
+ UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * NUM_PKTS))
+
+ p_v3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ Dot1Q(vlan=11, prio=1, id=1) /
+ IP(src="1.1.1.1", dst="10.0.0.2", tos=2) /
+ UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * NUM_PKTS))
+
+ rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
+
+ for p in rx:
+ self.assertEqual(p[Dot1Q].prio, 7)
+ self.assertEqual(p[Dot1Q].id, 0)
+
+ rx = self.send_and_expect(self.pg0, p_v3 * NUM_PKTS, self.pg1)
+
+ for p in rx:
+ self.assertEqual(p[IP].tos, 252)
+
+ rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
+
+ for p in rx:
+ self.assertEqual(p[IP].tos, 253)
+
+ p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ Dot1Q(vlan=11, prio=2) /
+ IPv6(src="2001::1", dst="2001::2", tc=1) /
+ UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * NUM_PKTS))
+
+ p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IPv6(src="3001::1", dst="2001::1", tc=1) /
+ UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * NUM_PKTS))
+
+ rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
+
+ for p in rx:
+ self.assertEqual(p[Dot1Q].prio, 7)
+ self.assertEqual(p[Dot1Q].id, 0)
+
+ rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
+
+ for p in rx:
+ self.assertEqual(p[IPv6].tc, 251)
+