BFD_vpp_echo,
BFD_UDP_SH_PORT,
BFD_UDP_MH_PORT,
+ BFD_UDP_DEFAULT_TOS,
)
from framework import VppTestCase
from asfframework import (
self.assertFalse(echo_source.have_usable_ip4)
self.assertFalse(echo_source.have_usable_ip6)
+ def test_set_udp_tos(self):
+ """test udp tos configuration"""
+ if self.multihop:
+ return
+ self.vapi.bfd_udp_set_tos(BFD_UDP_DEFAULT_TOS)
+ my_tos = self.vapi.bfd_udp_get_tos()
+ self.logger.debug("MY_TOS is %s", my_tos)
+ self.assertTrue(my_tos.tos == BFD_UDP_DEFAULT_TOS)
+ new_tos_value = 112
+ self.vapi.bfd_udp_set_tos(new_tos_value)
+ my_tos = self.vapi.bfd_udp_get_tos()
+ self.assertTrue(my_tos.tos == new_tos_value)
+ self.vapi.bfd_udp_set_tos(BFD_UDP_DEFAULT_TOS)
+
class BFDTestSession(object):
"""BFD session as seen from test framework side"""
)
-def verify_ip(test, packet):
+def verify_ip(test, packet, expected_tos=None):
"""Verify correctness of IP layer."""
if test.vpp_session.af == AF_INET6:
ip = packet[IPv6]
local_ip = test.vpp_session.local_addr
remote_ip = test.vpp_session.peer_addr
test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
+ if expected_tos is not None:
+ test.assert_equal(ip.tc, expected_tos, "IPv6 TOS")
else:
ip = packet[IP]
local_ip = test.vpp_session.local_addr
remote_ip = test.vpp_session.peer_addr
test.assert_equal(ip.ttl, 255, "IPv4 TTL")
+ if expected_tos is not None:
+ test.assert_equal(ip.tos, expected_tos, "IPv4 TOS")
test.assert_equal(ip.src, local_ip, "IP source address")
test.assert_equal(ip.dst, remote_ip, "IP destination address")
test.assert_equal(e.state, expected_state, BFDState)
-def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
+def wait_for_bfd_packet(
+ test, timeout=1, pcap_time_min=None, is_tunnel=False, expected_tos=None
+):
"""wait for BFD packet and verify its correctness
:param timeout: how long to wait
:param pcap_time_min: ignore packets with pcap timestamp lower than this
+ :param expected_tos: expected TOS/TC value for IP verification
:returns: tuple (packet, time spent waiting for packet)
"""
raise Exception(ppp("Unexpected or invalid BFD packet:", p))
if bfd.payload:
raise Exception(ppp("Unexpected payload in BFD packet:", bfd))
- verify_ip(test, p)
+ verify_ip(test, p, expected_tos)
verify_udp(test, p)
test.test_session.verify_bfd(p)
return p
self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
self.assertFalse(vpp_session.query_vpp_config())
+ def test_bfd_traffic_with_tos_configured(self):
+ """test BFD traffic with TOS values configured"""
+ # Test different TOS values
+ test_tos_values = [0, 112, 192, 255]
+
+ for tos_value in test_tos_values:
+ with self.subTest(tos=tos_value):
+ self.logger.info(f"Testing BFD traffic with TOS value: {tos_value}")
+ # Set the TOS value
+ self.vapi.bfd_udp_set_tos(tos_value)
+ # Verify the TOS value was set
+ current_tos = self.vapi.bfd_udp_get_tos()
+ self.assertEqual(
+ current_tos.tos, tos_value, f"TOS should be set to {tos_value}"
+ )
+ # Start the BFD session and wait for the first packet
+ self.logger.info("BFD: Waiting for packet with custom TOS")
+ p = wait_for_bfd_packet(
+ self,
+ timeout=2,
+ is_tunnel=self.vpp_session.is_tunnel,
+ expected_tos=tos_value,
+ )
+ # Verify the packet has the expected TOS value
+ if self.vpp_session.af == AF_INET6:
+ actual_tos = p[IPv6].tc
+ field_name = "IPv6 TC"
+ else:
+ actual_tos = p[IP].tos
+ field_name = "IPv4 TOS"
+ self.assertEqual(
+ actual_tos,
+ tos_value,
+ f"{field_name} should be {tos_value}, got {actual_tos}",
+ )
+ self.logger.info(
+ f"Successfully verified BFD packet with TOS {tos_value}"
+ )
+ # Send a response packet to keep the session alive
+ self.test_session.send_packet()
+ # Brief pause between tests
+ self.sleep(0.2, "brief pause between TOS tests")
+
@parameterized_class(
[