import binascii
import hashlib
import ipaddress
+import reprlib
import time
import unittest
from random import randint, shuffle, getrandbits
from socket import AF_INET, AF_INET6, inet_ntop
from struct import pack, unpack
-from six import moves
import scapy.compat
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
BFDDiagCode, BFDState, BFD_vpp_echo
+from framework import tag_fixme_vpp_workers
from framework import VppTestCase, VppTestRunner, running_extended_tests
+from framework import tag_run_solo
from util import ppp
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath
session.add_vpp_config()
def test_shared_sha1_key(self):
- """ share single SHA1 key between multiple BFD sessions """
+ """ single SHA1 key shared by multiple BFD sessions """
key = self.factory.create_random_key(self)
key.add_vpp_config()
sessions = [
test.test_session.inc_seq_num()
test.test_session.send_packet()
test.logger.info("BFD: Waiting for event")
- e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = test.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(test, e, expected_state=BFDState.up)
test.logger.info("BFD: Session is Up")
test.test_session.update(state=BFDState.up)
test.test_session.inc_seq_num()
test.test_session.send_packet()
test.logger.info("BFD: Waiting for event")
- e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = test.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(test, e, expected_state=BFDState.down)
test.logger.info("BFD: Session is Down")
test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
def verify_event(test, event, expected_state):
""" Verify correctness of event values. """
e = event
- test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
+ test.logger.debug("BFD: Event: %s" % reprlib.repr(e))
test.assert_equal(e.sw_if_index,
test.vpp_session.interface.sw_if_index,
"BFD interface index")
return p
+@tag_run_solo
class BFD4TestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD)"""
vpp_session = None
test_session = None
- @classmethod
- def force_solo(cls):
- return True
-
@classmethod
def setUpClass(cls):
super(BFD4TestCase, cls).setUpClass()
self.test_session.update(your_discriminator=p[BFD].my_discriminator,
state=BFDState.up)
self.logger.info("BFD: Waiting for event")
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.init)
self.logger.info("BFD: Sending Up")
self.test_session.send_packet()
self.logger.info("BFD: Waiting for event")
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.up)
self.logger.info("BFD: Session is Up")
self.test_session.update(state=BFDState.up)
detection_time = self.test_session.detect_mult *\
self.vpp_session.required_min_rx / USEC_IN_SEC
self.sleep(detection_time, "waiting for BFD session time-out")
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.down)
def test_peer_discr_reset_sess_down(self):
self.vpp_session.required_min_rx) / USEC_IN_SEC
self.test_session.send_packet(final)
time_mark = time.time()
- e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.down)
time_to_event = time.time() - time_mark
self.assert_in_range(time_to_event, .9 * timeout,
self.vpp_session.required_min_rx / USEC_IN_SEC
before = time.time()
e = self.vapi.wait_for_event(
- 2 * detection_time, "bfd_udp_session_details")
+ 2 * detection_time, "bfd_udp_session_event")
after = time.time()
self.assert_in_range(after - before,
0.9 * detection_time,
bfd_session_up(self)
self.vpp_session.admin_down()
self.pg0.enable_capture()
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.admin_down)
for dummy in range(2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
self.vpp_session.admin_up()
self.test_session.update(state=BFDState.down)
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.down)
p = wait_for_bfd_packet(
self, pcap_time_min=time.time() - self.vpp_clock_offset)
p = wait_for_bfd_packet(
self, pcap_time_min=time.time() - self.vpp_clock_offset)
self.assert_equal(p[BFD].state, BFDState.init, BFDState)
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.init)
self.test_session.update(state=BFDState.up)
self.test_session.send_packet()
p = wait_for_bfd_packet(
self, pcap_time_min=time.time() - self.vpp_clock_offset)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.up)
def test_config_change_remote_demand(self):
vpp_session.add_vpp_config()
vpp_session.admin_up()
intf.remove_vpp_config()
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
self.assertFalse(vpp_session.query_vpp_config())
+@tag_run_solo
+@tag_fixme_vpp_workers
class BFD6TestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (IPv6) """
vpp_session = None
test_session = None
- @classmethod
- def force_solo(cls):
- return True
-
@classmethod
def setUpClass(cls):
super(BFD6TestCase, cls).setUpClass()
self.test_session.update(your_discriminator=p[BFD].my_discriminator,
state=BFDState.up)
self.logger.info("BFD: Waiting for event")
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.init)
self.logger.info("BFD: Sending Up")
self.test_session.send_packet()
self.logger.info("BFD: Waiting for event")
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.up)
self.logger.info("BFD: Session is Up")
self.test_session.update(state=BFDState.up)
vpp_session.add_vpp_config()
vpp_session.admin_up()
intf.remove_vpp_config()
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
self.assertFalse(vpp_session.query_vpp_config())
+@tag_run_solo
class BFDFIBTestCase(VppTestCase):
""" BFD-FIB interactions (IPv6) """
vpp_session = None
test_session = None
- @classmethod
- def force_solo(cls):
- return True
-
@classmethod
def setUpClass(cls):
super(BFDFIBTestCase, cls).setUpClass()
bfd_session_down(self)
+@tag_run_solo
class BFDSHA1TestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
vpp_session = None
test_session = None
- @classmethod
- def force_solo(cls):
- return True
-
@classmethod
def setUpClass(cls):
super(BFDSHA1TestCase, cls).setUpClass()
bfd_session_up(self)
+@tag_run_solo
class BFDAuthOnOffTestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (changing auth) """
vpp_session = None
test_session = None
- @classmethod
- def force_solo(cls):
- return True
-
@classmethod
def setUpClass(cls):
super(BFDAuthOnOffTestCase, cls).setUpClass()
"number of bfd events")
+@tag_run_solo
class BFDCLITestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (CLI) """
pg0 = None
- @classmethod
- def force_solo(cls):
- return True
-
@classmethod
def setUpClass(cls):
super(BFDCLITestCase, cls).setUpClass()