time_diff, 0.70, 1.05, "time between slow packets")
prev_packet = next_packet
time_diff, 0.70, 1.05, "time between slow packets")
prev_packet = next_packet
e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
verify_event(self, e, expected_state=BFDState.down)
e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
verify_event(self, e, expected_state=BFDState.down)
def test_immediate_remote_min_rx_reduction(self):
""" immediately honor remote required min rx reduction """
self.vpp_session.remove_vpp_config()
def test_immediate_remote_min_rx_reduction(self):
""" immediately honor remote required min rx reduction """
self.vpp_session.remove_vpp_config()
self.assert_in_range(time_to_event, .9 * timeout,
1.1 * timeout, "session timeout")
self.assert_in_range(time_to_event, .9 * timeout,
1.1 * timeout, "session timeout")
def test_modify_req_min_rx_halve(self):
""" modify session - halve required min rx """
self.vpp_session.modify_parameters(
def test_modify_req_min_rx_halve(self):
""" modify session - halve required min rx """
self.vpp_session.modify_parameters(
"time before bfd session goes down")
verify_event(self, e, expected_state=BFDState.down)
"time before bfd session goes down")
verify_event(self, e, expected_state=BFDState.down)
self.assertNotIn("P", p.sprintf("%BFD.flags%"),
"Poll bit not set in BFD packet")
self.assertNotIn("P", p.sprintf("%BFD.flags%"),
"Poll bit not set in BFD packet")
self.assertNotIn("P", p.sprintf("%BFD.flags%"),
"Poll bit set in BFD packet")
self.assertNotIn("P", p.sprintf("%BFD.flags%"),
"Poll bit set in BFD packet")
self, pcap_time_min=time.time() - self.vpp_clock_offset)
self.assertIn("F", final.sprintf("%BFD.flags%"))
self, pcap_time_min=time.time() - self.vpp_clock_offset)
self.assertIn("F", final.sprintf("%BFD.flags%"))
def test_no_periodic_if_remote_demand(self):
""" no periodic frames outside poll sequence if remote demand set """
bfd_session_up(self)
def test_no_periodic_if_remote_demand(self):
""" no periodic frames outside poll sequence if remote demand set """
bfd_session_up(self)
self.test_session.send_packet()
self.assertTrue(echo_seen, "No echo packets received")
self.test_session.send_packet()
self.assertTrue(echo_seen, "No echo packets received")
self.assert_equal(events[0].state, BFDState.down, BFDState)
self.assertTrue(verified_diag, "Incorrect diagnostics code received")
self.assert_equal(events[0].state, BFDState.down, BFDState)
self.assertTrue(verified_diag, "Incorrect diagnostics code received")
events = self.vapi.collect_events()
self.assert_equal(len(events), 0, "number of bfd events")
events = self.vapi.collect_events()
self.assert_equal(len(events), 0, "number of bfd events")
events = self.vapi.collect_events()
self.assert_equal(len(events), 0, "number of bfd events")
events = self.vapi.collect_events()
self.assert_equal(len(events), 0, "number of bfd events")
self.test_session.send_packet()
self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
self.test_session.send_packet()
self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
def test_invalid_echo_checksum(self):
""" echo packets with invalid checksum don't keep a session up """
bfd_session_up(self)
def test_invalid_echo_checksum(self):
""" echo packets with invalid checksum don't keep a session up """
bfd_session_up(self)
self.test_session.send_packet()
self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
self.test_session.send_packet()
self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
verify_event(self, e, expected_state=BFDState.up)
e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
verify_event(self, e, expected_state=BFDState.up)
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_hold_up_meticulous(self):
""" hold BFD session up - meticulous auth """
key = self.factory.create_random_key(
def test_hold_up_meticulous(self):
""" hold BFD session up - meticulous auth """
key = self.factory.create_random_key(
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_send_bad_seq_number(self):
""" session is not kept alive by msgs with bad sequence numbers"""
key = self.factory.create_random_key(
def test_send_bad_seq_number(self):
""" session is not kept alive by msgs with bad sequence numbers"""
key = self.factory.create_random_key(
wait_for_bfd_packet(self)
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
wait_for_bfd_packet(self)
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_mismatch_auth(self):
""" session is not brought down by unauthenticated msg """
key = self.factory.create_random_key(self)
def test_mismatch_auth(self):
""" session is not brought down by unauthenticated msg """
key = self.factory.create_random_key(self)
def test_mismatch_bfd_key_id(self):
""" session is not brought down by msg with non-existent key-id """
key = self.factory.create_random_key(self)
def test_mismatch_bfd_key_id(self):
""" session is not brought down by msg with non-existent key-id """
key = self.factory.create_random_key(self)
def test_mismatched_auth_type(self):
""" session is not brought down by msg with wrong auth type """
key = self.factory.create_random_key(self)
def test_mismatched_auth_type(self):
""" session is not brought down by msg with wrong auth type """
key = self.factory.create_random_key(self)
vpp_session, legitimate_test_session, rogue_test_session,
{'auth_type': BFDAuthType.keyed_md5})
vpp_session, legitimate_test_session, rogue_test_session,
{'auth_type': BFDAuthType.keyed_md5})
class BFDAuthOnOffTestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (changing auth) """
class BFDAuthOnOffTestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (changing auth) """