from socket import AF_INET, AF_INET6, inet_ntop
from struct import pack, unpack
from socket import AF_INET, AF_INET6, inet_ntop
from struct import pack, unpack
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
def verify_event(test, event, expected_state):
""" Verify correctness of event values. """
e = event
def verify_event(test, event, expected_state):
""" Verify correctness of event values. """
e = event
test.assert_equal(e.sw_if_index,
test.vpp_session.interface.sw_if_index,
"BFD interface index")
test.assert_equal(e.sw_if_index,
test.vpp_session.interface.sw_if_index,
"BFD interface index")
time_diff, 0.70, 1.05, "time between slow packets")
prev_packet = next_packet
time_diff, 0.70, 1.05, "time between slow packets")
prev_packet = next_packet
e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
verify_event(self, e, expected_state=BFDState.down)
e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
verify_event(self, e, expected_state=BFDState.down)
def test_immediate_remote_min_rx_reduction(self):
""" immediately honor remote required min rx reduction """
self.vpp_session.remove_vpp_config()
def test_immediate_remote_min_rx_reduction(self):
""" immediately honor remote required min rx reduction """
self.vpp_session.remove_vpp_config()
self.assert_in_range(time_to_event, .9 * timeout,
1.1 * timeout, "session timeout")
self.assert_in_range(time_to_event, .9 * timeout,
1.1 * timeout, "session timeout")
def test_modify_req_min_rx_halve(self):
""" modify session - halve required min rx """
self.vpp_session.modify_parameters(
def test_modify_req_min_rx_halve(self):
""" modify session - halve required min rx """
self.vpp_session.modify_parameters(
"time before bfd session goes down")
verify_event(self, e, expected_state=BFDState.down)
"time before bfd session goes down")
verify_event(self, e, expected_state=BFDState.down)
self.assertNotIn("P", p.sprintf("%BFD.flags%"),
"Poll bit not set in BFD packet")
self.assertNotIn("P", p.sprintf("%BFD.flags%"),
"Poll bit not set in BFD packet")
self.assertNotIn("P", p.sprintf("%BFD.flags%"),
"Poll bit set in BFD packet")
self.assertNotIn("P", p.sprintf("%BFD.flags%"),
"Poll bit set in BFD packet")
self, pcap_time_min=time.time() - self.vpp_clock_offset)
self.assertIn("F", final.sprintf("%BFD.flags%"))
self, pcap_time_min=time.time() - self.vpp_clock_offset)
self.assertIn("F", final.sprintf("%BFD.flags%"))
def test_no_periodic_if_remote_demand(self):
""" no periodic frames outside poll sequence if remote demand set """
bfd_session_up(self)
def test_no_periodic_if_remote_demand(self):
""" no periodic frames outside poll sequence if remote demand set """
bfd_session_up(self)
self.test_session.send_packet()
self.assertTrue(echo_seen, "No echo packets received")
self.test_session.send_packet()
self.assertTrue(echo_seen, "No echo packets received")
self.assert_equal(events[0].state, BFDState.down, BFDState)
self.assertTrue(verified_diag, "Incorrect diagnostics code received")
self.assert_equal(events[0].state, BFDState.down, BFDState)
self.assertTrue(verified_diag, "Incorrect diagnostics code received")
events = self.vapi.collect_events()
self.assert_equal(len(events), 0, "number of bfd events")
events = self.vapi.collect_events()
self.assert_equal(len(events), 0, "number of bfd events")
events = self.vapi.collect_events()
self.assert_equal(len(events), 0, "number of bfd events")
events = self.vapi.collect_events()
self.assert_equal(len(events), 0, "number of bfd events")
self.test_session.send_packet()
self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
self.test_session.send_packet()
self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
def test_invalid_echo_checksum(self):
""" echo packets with invalid checksum don't keep a session up """
bfd_session_up(self)
def test_invalid_echo_checksum(self):
""" echo packets with invalid checksum don't keep a session up """
bfd_session_up(self)
self.test_session.send_packet()
self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
self.test_session.send_packet()
self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
verify_event(self, e, expected_state=BFDState.up)
e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
verify_event(self, e, expected_state=BFDState.up)
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_hold_up_meticulous(self):
""" hold BFD session up - meticulous auth """
key = self.factory.create_random_key(
def test_hold_up_meticulous(self):
""" hold BFD session up - meticulous auth """
key = self.factory.create_random_key(
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_send_bad_seq_number(self):
""" session is not kept alive by msgs with bad sequence numbers"""
key = self.factory.create_random_key(
def test_send_bad_seq_number(self):
""" session is not kept alive by msgs with bad sequence numbers"""
key = self.factory.create_random_key(
wait_for_bfd_packet(self)
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
wait_for_bfd_packet(self)
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
def test_mismatch_auth(self):
""" session is not brought down by unauthenticated msg """
key = self.factory.create_random_key(self)
def test_mismatch_auth(self):
""" session is not brought down by unauthenticated msg """
key = self.factory.create_random_key(self)
def test_mismatch_bfd_key_id(self):
""" session is not brought down by msg with non-existent key-id """
key = self.factory.create_random_key(self)
def test_mismatch_bfd_key_id(self):
""" session is not brought down by msg with non-existent key-id """
key = self.factory.create_random_key(self)
def test_mismatched_auth_type(self):
""" session is not brought down by msg with wrong auth type """
key = self.factory.create_random_key(self)
def test_mismatched_auth_type(self):
""" session is not brought down by msg with wrong auth type """
key = self.factory.create_random_key(self)
vpp_session, legitimate_test_session, rogue_test_session,
{'auth_type': BFDAuthType.keyed_md5})
vpp_session, legitimate_test_session, rogue_test_session,
{'auth_type': BFDAuthType.keyed_md5})
class BFDAuthOnOffTestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (changing auth) """
class BFDAuthOnOffTestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (changing auth) """