-#!/usr/bin/env python
+#!/usr/bin/env python3
import unittest
from scapy.layers.inet6 import IPv6
from scapy.contrib.mpls import MPLS
from vpp_papi import VppEnum
-from vpp_qos import VppQosRecord, VppQosEgressMap, VppQosMark
+from vpp_qos import VppQosRecord, VppQosEgressMap, VppQosMark, VppQosStore
NUM_PKTS = 67
super(TestQOS, self).tearDown()
def test_qos_ip(self):
- """ QoS Mark/Record IP """
+ """ QoS Mark/Record/Store IP """
#
# for table 1 map the n=0xff possible values of input QoS mark,
self.assertTrue(qr1.query_vpp_config())
qr1.remove_vpp_config()
+ #
+ # back to an unchanged TOS value
+ #
+ rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
+ for p in rx:
+ self.assertEqual(p[IP].tos, 254)
+
+ #
+ # enable QoS stroe instead of record
+ #
+ qst1 = VppQosStore(self, self.pg0,
+ self.QOS_SOURCE.QOS_API_SOURCE_IP,
+ 5).add_vpp_config()
+ self.logger.info(self.vapi.cli("sh qos store"))
+
+ p_v4[IP].dst = self.pg1.remote_ip4
+ rx = self.send_and_expect(self.pg0, p_v4 * NUM_PKTS, self.pg1)
+ for p in rx:
+ self.assertEqual(p[IP].tos, 250)
+
+ #
+ # disable the input storing on pg0
+ #
+ self.assertTrue(qst1.query_vpp_config())
+ qst1.remove_vpp_config()
+
#
# back to an unchanged TOS value
#
UDP(sport=1234, dport=1234) /
Raw(scapy.compat.chb(100) * NUM_PKTS))
+ p_v3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ Dot1Q(vlan=11, prio=1, id=1) /
+ IP(src="1.1.1.1", dst="10.0.0.2", tos=2) /
+ UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * NUM_PKTS))
+
rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
for p in rx:
- self.assertEqual(p[Dot1Q].prio, 6)
+ self.assertEqual(p[Dot1Q].prio, 7)
+ self.assertEqual(p[Dot1Q].id, 0)
+
+ rx = self.send_and_expect(self.pg0, p_v3 * NUM_PKTS, self.pg1)
+
+ for p in rx:
+ self.assertEqual(p[IP].tos, 252)
rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
for p in rx:
- self.assertEqual(p[IP].tos, 254)
+ self.assertEqual(p[IP].tos, 253)
p_v1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
Dot1Q(vlan=11, prio=2) /
rx = self.send_and_expect(self.pg1, p_v2 * NUM_PKTS, self.pg0)
for p in rx:
- self.assertEqual(p[Dot1Q].prio, 6)
+ self.assertEqual(p[Dot1Q].prio, 7)
+ self.assertEqual(p[Dot1Q].id, 0)
rx = self.send_and_expect(self.pg0, p_v1 * NUM_PKTS, self.pg1)
for p in rx:
- self.assertEqual(p[IPv6].tc, 253)
+ self.assertEqual(p[IPv6].tc, 251)
#
# cleanup