import binascii
import hashlib
+import ipaddress
+import reprlib
import time
import unittest
from random import randint, shuffle, getrandbits
from socket import AF_INET, AF_INET6, inet_ntop
from struct import pack, unpack
-from six import moves
import scapy.compat
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
BFDDiagCode, BFDState, BFD_vpp_echo
+from framework import tag_fixme_vpp_workers
from framework import VppTestCase, VppTestRunner, running_extended_tests
+from framework import tag_run_solo
from util import ppp
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath
session.add_vpp_config()
def test_shared_sha1_key(self):
- """ share single SHA1 key between multiple BFD sessions """
+ """ single SHA1 key shared by multiple BFD sessions """
key = self.factory.create_random_key(self)
key.add_vpp_config()
sessions = [
self.assertFalse(echo_source.have_usable_ip6)
self.loopback0.config_ip4()
- unpacked = unpack("!L", self.loopback0.local_ip4n)
- echo_ip4 = pack("!L", unpacked[0] ^ 1)
+ echo_ip4 = ipaddress.IPv4Address(int(ipaddress.IPv4Address(
+ self.loopback0.local_ip4)) ^ 1).packed
echo_source = self.vapi.bfd_udp_get_echo_source()
self.assertTrue(echo_source.is_set)
self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
self.assertFalse(echo_source.have_usable_ip6)
self.loopback0.config_ip6()
- unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
- echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
- unpacked[3] ^ 1)
+ echo_ip6 = ipaddress.IPv6Address(int(ipaddress.IPv6Address(
+ self.loopback0.local_ip6)) ^ 1).packed
+
echo_source = self.vapi.bfd_udp_get_echo_source()
self.assertTrue(echo_source.is_set)
self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
test.test_session.inc_seq_num()
test.test_session.send_packet()
test.logger.info("BFD: Waiting for event")
- e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = test.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(test, e, expected_state=BFDState.up)
test.logger.info("BFD: Session is Up")
test.test_session.update(state=BFDState.up)
test.test_session.inc_seq_num()
test.test_session.send_packet()
test.logger.info("BFD: Waiting for event")
- e = test.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = test.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(test, e, expected_state=BFDState.down)
test.logger.info("BFD: Session is Down")
test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
def verify_event(test, event, expected_state):
""" Verify correctness of event values. """
e = event
- test.logger.debug("BFD: Event: %s" % moves.reprlib.repr(e))
+ test.logger.debug("BFD: Event: %s" % reprlib.repr(e))
test.assert_equal(e.sw_if_index,
test.vpp_session.interface.sw_if_index,
"BFD interface index")
return p
+@tag_run_solo
class BFD4TestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD)"""
self.test_session.update(your_discriminator=p[BFD].my_discriminator,
state=BFDState.up)
self.logger.info("BFD: Waiting for event")
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.init)
self.logger.info("BFD: Sending Up")
self.test_session.send_packet()
self.logger.info("BFD: Waiting for event")
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.up)
self.logger.info("BFD: Session is Up")
self.test_session.update(state=BFDState.up)
detection_time = self.test_session.detect_mult *\
self.vpp_session.required_min_rx / USEC_IN_SEC
self.sleep(detection_time, "waiting for BFD session time-out")
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.down)
def test_peer_discr_reset_sess_down(self):
self.vpp_session.required_min_rx) / USEC_IN_SEC
self.test_session.send_packet(final)
time_mark = time.time()
- e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(2 * timeout, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.down)
time_to_event = time.time() - time_mark
self.assert_in_range(time_to_event, .9 * timeout,
self.vpp_session.required_min_rx / USEC_IN_SEC
before = time.time()
e = self.vapi.wait_for_event(
- 2 * detection_time, "bfd_udp_session_details")
+ 2 * detection_time, "bfd_udp_session_event")
after = time.time()
self.assert_in_range(after - before,
0.9 * detection_time,
bfd_session_up(self)
self.vpp_session.admin_down()
self.pg0.enable_capture()
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.admin_down)
for dummy in range(2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
self.vpp_session.admin_up()
self.test_session.update(state=BFDState.down)
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.down)
p = wait_for_bfd_packet(
self, pcap_time_min=time.time() - self.vpp_clock_offset)
p = wait_for_bfd_packet(
self, pcap_time_min=time.time() - self.vpp_clock_offset)
self.assert_equal(p[BFD].state, BFDState.init, BFDState)
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.init)
self.test_session.update(state=BFDState.up)
self.test_session.send_packet()
p = wait_for_bfd_packet(
self, pcap_time_min=time.time() - self.vpp_clock_offset)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.up)
def test_config_change_remote_demand(self):
vpp_session.add_vpp_config()
vpp_session.admin_up()
intf.remove_vpp_config()
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
self.assertFalse(vpp_session.query_vpp_config())
+@tag_run_solo
+@tag_fixme_vpp_workers
class BFD6TestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (IPv6) """
self.test_session.update(your_discriminator=p[BFD].my_discriminator,
state=BFDState.up)
self.logger.info("BFD: Waiting for event")
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.init)
self.logger.info("BFD: Sending Up")
self.test_session.send_packet()
self.logger.info("BFD: Waiting for event")
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
verify_event(self, e, expected_state=BFDState.up)
self.logger.info("BFD: Session is Up")
self.test_session.update(state=BFDState.up)
vpp_session.add_vpp_config()
vpp_session.admin_up()
intf.remove_vpp_config()
- e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
+ e = self.vapi.wait_for_event(1, "bfd_udp_session_event")
self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
self.assertFalse(vpp_session.query_vpp_config())
+@tag_run_solo
class BFDFIBTestCase(VppTestCase):
""" BFD-FIB interactions (IPv6) """
bfd_session_down(self)
+@tag_run_solo
class BFDSHA1TestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
bfd_session_up(self)
+@tag_run_solo
class BFDAuthOnOffTestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (changing auth) """
"number of bfd events")
+@tag_run_solo
class BFDCLITestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (CLI) """
pg0 = None
"IPv6 address usable as echo source: none" %
self.loopback0.name)
self.loopback0.config_ip4()
- unpacked = unpack("!L", self.loopback0.local_ip4n)
- echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
+ echo_ip4 = str(ipaddress.IPv4Address(int(ipaddress.IPv4Address(
+ self.loopback0.local_ip4)) ^ 1))
self.cli_verify_response("show bfd echo-source",
"UDP echo source is: %s\n"
"IPv4 address usable as echo source: %s\n"
"IPv6 address usable as echo source: none" %
(self.loopback0.name, echo_ip4))
- unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
- echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
- unpacked[2], unpacked[3] ^ 1))
+ echo_ip6 = str(ipaddress.IPv6Address(int(ipaddress.IPv6Address(
+ self.loopback0.local_ip6)) ^ 1))
self.loopback0.config_ip6()
self.cli_verify_response("show bfd echo-source",
"UDP echo source is: %s\n"