+ self.vpp_session = vpp_bfd_udp_session
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = legitimate_test_session
+ # bring vpp session up
+ bfd_session_up(self)
+ # send packet from rogue session
+ rogue_test_session.update(
+ my_discriminator=self.test_session.my_discriminator,
+ your_discriminator=self.test_session.your_discriminator,
+ desired_min_tx=self.test_session.desired_min_tx,
+ required_min_rx=self.test_session.required_min_rx,
+ detect_mult=self.test_session.detect_mult,
+ diag=self.test_session.diag,
+ state=self.test_session.state,
+ auth_type=self.test_session.auth_type)
+ if rogue_bfd_values:
+ rogue_test_session.update(**rogue_bfd_values)
+ rogue_test_session.update(state=BFDState.down)
+ rogue_test_session.send_packet()
+ wait_for_bfd_packet(self)
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+
+ def test_mismatch_auth(self):
+ """ session is not brought down by unauthenticated msg """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ vpp_session = VppBFDUDPSession(
+ self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
+ legitimate_test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=vpp_session.bfd_key_id)
+ rogue_test_session = BFDTestSession(self, self.pg0, AF_INET)
+ self.execute_rogue_session_scenario(vpp_session,
+ legitimate_test_session,
+ rogue_test_session)
+
+ def test_mismatch_bfd_key_id(self):
+ """ session is not brought down by msg with non-existent key-id """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ vpp_session = VppBFDUDPSession(
+ self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
+ # pick a different random bfd key id
+ x = randint(0, 255)
+ while x == vpp_session.bfd_key_id:
+ x = randint(0, 255)
+ legitimate_test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=vpp_session.bfd_key_id)
+ rogue_test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key, bfd_key_id=x)
+ self.execute_rogue_session_scenario(vpp_session,
+ legitimate_test_session,
+ rogue_test_session)
+
+ def test_mismatched_auth_type(self):
+ """ session is not brought down by msg with wrong auth type """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ vpp_session = VppBFDUDPSession(
+ self, self.pg0, self.pg0.remote_ip4, sha1_key=key)
+ legitimate_test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=vpp_session.bfd_key_id)
+ rogue_test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=vpp_session.bfd_key_id)
+ self.execute_rogue_session_scenario(
+ vpp_session, legitimate_test_session, rogue_test_session,
+ {'auth_type': BFDAuthType.keyed_md5})
+
+ def test_restart(self):
+ """ simulate remote peer restart and resynchronization """
+ key = self.factory.create_random_key(
+ self, BFDAuthType.meticulous_keyed_sha1)
+ key.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4, sha1_key=key)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=self.vpp_session.bfd_key_id, our_seq_number=0)
+ bfd_session_up(self)
+ # don't send any packets for 2*detection_time
+ detection_time = self.vpp_session.detect_mult *\
+ self.vpp_session.required_min_rx / USEC_IN_SEC
+ self.sleep(detection_time, "simulating peer restart")
+ events = self.vapi.collect_events()
+ self.assert_equal(len(events), 1, "number of bfd events")
+ verify_event(self, events[0], expected_state=BFDState.down)
+ self.test_session.update(state=BFDState.down)
+ # reset sequence number
+ self.test_session.our_seq_number = 0
+ self.test_session.vpp_seq_number = None
+ # now throw away any pending packets
+ self.pg0.enable_capture()
+ bfd_session_up(self)
+
+
+class BFDAuthOnOffTestCase(VppTestCase):
+ """Bidirectional Forwarding Detection (BFD) (changing auth) """
+
+ pg0 = None
+ vpp_session = None
+ test_session = None
+
+ @classmethod
+ def setUpClass(cls):
+ super(BFDAuthOnOffTestCase, cls).setUpClass()
+ try:
+ cls.create_pg_interfaces([0])
+ cls.pg0.config_ip4()
+ cls.pg0.admin_up()
+ cls.pg0.resolve_arp()
+
+ except Exception:
+ super(BFDAuthOnOffTestCase, cls).tearDownClass()
+ raise
+
+ def setUp(self):
+ super(BFDAuthOnOffTestCase, self).setUp()
+ self.factory = AuthKeyFactory()
+ self.vapi.want_bfd_events()
+ self.pg0.enable_capture()
+
+ def tearDown(self):
+ if not self.vpp_dead:
+ self.vapi.want_bfd_events(enable_disable=0)
+ self.vapi.collect_events() # clear the event queue
+ super(BFDAuthOnOffTestCase, self).tearDown()
+
+ def test_auth_on_immediate(self):
+ """ turn auth on without disturbing session state (immediate) """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(self, self.pg0, AF_INET)
+ bfd_session_up(self)
+ for dummy in range(self.test_session.detect_mult * 2):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(p[BFD].state, BFDState.up, BFDState)
+ self.test_session.send_packet()
+ self.vpp_session.activate_auth(key)
+ self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
+ self.test_session.sha1_key = key
+ for dummy in range(self.test_session.detect_mult * 2):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(p[BFD].state, BFDState.up, BFDState)
+ self.test_session.send_packet()
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+
+ def test_auth_off_immediate(self):
+ """ turn auth off without disturbing session state (immediate) """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4, sha1_key=key)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=self.vpp_session.bfd_key_id)
+ bfd_session_up(self)
+ # self.vapi.want_bfd_events(enable_disable=0)
+ for dummy in range(self.test_session.detect_mult * 2):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(p[BFD].state, BFDState.up, BFDState)
+ self.test_session.inc_seq_num()
+ self.test_session.send_packet()
+ self.vpp_session.deactivate_auth()
+ self.test_session.bfd_key_id = None
+ self.test_session.sha1_key = None
+ for dummy in range(self.test_session.detect_mult * 2):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(p[BFD].state, BFDState.up, BFDState)
+ self.test_session.inc_seq_num()
+ self.test_session.send_packet()
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+
+ def test_auth_change_key_immediate(self):
+ """ change auth key without disturbing session state (immediate) """
+ key1 = self.factory.create_random_key(self)
+ key1.add_vpp_config()
+ key2 = self.factory.create_random_key(self)
+ key2.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4, sha1_key=key1)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key1,
+ bfd_key_id=self.vpp_session.bfd_key_id)
+ bfd_session_up(self)
+ for dummy in range(self.test_session.detect_mult * 2):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(p[BFD].state, BFDState.up, BFDState)
+ self.test_session.send_packet()
+ self.vpp_session.activate_auth(key2)
+ self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
+ self.test_session.sha1_key = key2
+ for dummy in range(self.test_session.detect_mult * 2):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(p[BFD].state, BFDState.up, BFDState)
+ self.test_session.send_packet()
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+
+ def test_auth_on_delayed(self):
+ """ turn auth on without disturbing session state (delayed) """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(self, self.pg0, AF_INET)
+ bfd_session_up(self)
+ for dummy in range(self.test_session.detect_mult * 2):
+ wait_for_bfd_packet(self)
+ self.test_session.send_packet()
+ self.vpp_session.activate_auth(key, delayed=True)
+ for dummy in range(self.test_session.detect_mult * 2):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(p[BFD].state, BFDState.up, BFDState)
+ self.test_session.send_packet()
+ self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
+ self.test_session.sha1_key = key
+ self.test_session.send_packet()
+ for dummy in range(self.test_session.detect_mult * 2):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(p[BFD].state, BFDState.up, BFDState)
+ self.test_session.send_packet()
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+
+ def test_auth_off_delayed(self):
+ """ turn auth off without disturbing session state (delayed) """
+ key = self.factory.create_random_key(self)
+ key.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4, sha1_key=key)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key,
+ bfd_key_id=self.vpp_session.bfd_key_id)
+ bfd_session_up(self)
+ for dummy in range(self.test_session.detect_mult * 2):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(p[BFD].state, BFDState.up, BFDState)
+ self.test_session.send_packet()
+ self.vpp_session.deactivate_auth(delayed=True)
+ for dummy in range(self.test_session.detect_mult * 2):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(p[BFD].state, BFDState.up, BFDState)
+ self.test_session.send_packet()
+ self.test_session.bfd_key_id = None
+ self.test_session.sha1_key = None
+ self.test_session.send_packet()
+ for dummy in range(self.test_session.detect_mult * 2):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(p[BFD].state, BFDState.up, BFDState)
+ self.test_session.send_packet()
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")
+
+ def test_auth_change_key_delayed(self):
+ """ change auth key without disturbing session state (delayed) """
+ key1 = self.factory.create_random_key(self)
+ key1.add_vpp_config()
+ key2 = self.factory.create_random_key(self)
+ key2.add_vpp_config()
+ self.vpp_session = VppBFDUDPSession(self, self.pg0,
+ self.pg0.remote_ip4, sha1_key=key1)
+ self.vpp_session.add_vpp_config()
+ self.vpp_session.admin_up()
+ self.test_session = BFDTestSession(
+ self, self.pg0, AF_INET, sha1_key=key1,
+ bfd_key_id=self.vpp_session.bfd_key_id)
+ bfd_session_up(self)
+ for dummy in range(self.test_session.detect_mult * 2):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(p[BFD].state, BFDState.up, BFDState)
+ self.test_session.send_packet()
+ self.vpp_session.activate_auth(key2, delayed=True)
+ for dummy in range(self.test_session.detect_mult * 2):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(p[BFD].state, BFDState.up, BFDState)
+ self.test_session.send_packet()
+ self.test_session.bfd_key_id = self.vpp_session.bfd_key_id
+ self.test_session.sha1_key = key2
+ self.test_session.send_packet()
+ for dummy in range(self.test_session.detect_mult * 2):
+ p = wait_for_bfd_packet(self)
+ self.assert_equal(p[BFD].state, BFDState.up, BFDState)
+ self.test_session.send_packet()
+ self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
+ self.assert_equal(len(self.vapi.collect_events()), 0,
+ "number of bfd events")