import time
import unittest
from random import randint, shuffle, getrandbits
from socket import AF_INET, AF_INET6, inet_ntop
from struct import pack, unpack
import time
import unittest
from random import randint, shuffle, getrandbits
from socket import AF_INET, AF_INET6, inet_ntop
from struct import pack, unpack
import scapy.compat
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
import scapy.compat
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
BFDDiagCode, BFDState, BFD_vpp_echo
from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
BFDDiagCode, BFDState, BFD_vpp_echo
from util import ppp
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath
from util import ppp
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath
- unpacked = unpack("!L", self.loopback0.local_ip4n)
- echo_ip4 = pack("!L", unpacked[0] ^ 1)
+ echo_ip4 = ipaddress.IPv4Address(int(ipaddress.IPv4Address(
+ self.loopback0.local_ip4)) ^ 1).packed
echo_source = self.vapi.bfd_udp_get_echo_source()
self.assertTrue(echo_source.is_set)
self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
echo_source = self.vapi.bfd_udp_get_echo_source()
self.assertTrue(echo_source.is_set)
self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
- unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
- echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
- unpacked[3] ^ 1)
+ echo_ip6 = ipaddress.IPv6Address(int(ipaddress.IPv6Address(
+ self.loopback0.local_ip6)) ^ 1).packed
+
echo_source = self.vapi.bfd_udp_get_echo_source()
self.assertTrue(echo_source.is_set)
self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
echo_source = self.vapi.bfd_udp_get_echo_source()
self.assertTrue(echo_source.is_set)
self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
test.test_session.inc_seq_num()
test.test_session.send_packet()
test.logger.info("BFD: Waiting for event")
test.test_session.inc_seq_num()
test.test_session.send_packet()
test.logger.info("BFD: Waiting for event")
verify_event(test, e, expected_state=BFDState.up)
test.logger.info("BFD: Session is Up")
test.test_session.update(state=BFDState.up)
verify_event(test, e, expected_state=BFDState.up)
test.logger.info("BFD: Session is Up")
test.test_session.update(state=BFDState.up)
test.test_session.inc_seq_num()
test.test_session.send_packet()
test.logger.info("BFD: Waiting for event")
test.test_session.inc_seq_num()
test.test_session.send_packet()
test.logger.info("BFD: Waiting for event")
verify_event(test, e, expected_state=BFDState.down)
test.logger.info("BFD: Session is Down")
test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
verify_event(test, e, expected_state=BFDState.down)
test.logger.info("BFD: Session is Down")
test.assert_equal(test.vpp_session.state, BFDState.down, BFDState)
def verify_event(test, event, expected_state):
""" Verify correctness of event values. """
e = event
def verify_event(test, event, expected_state):
""" Verify correctness of event values. """
e = event
test.assert_equal(e.sw_if_index,
test.vpp_session.interface.sw_if_index,
"BFD interface index")
test.assert_equal(e.sw_if_index,
test.vpp_session.interface.sw_if_index,
"BFD interface index")
self.test_session.update(your_discriminator=p[BFD].my_discriminator,
state=BFDState.up)
self.logger.info("BFD: Waiting for event")
self.test_session.update(your_discriminator=p[BFD].my_discriminator,
state=BFDState.up)
self.logger.info("BFD: Waiting for event")
verify_event(self, e, expected_state=BFDState.init)
self.logger.info("BFD: Sending Up")
self.test_session.send_packet()
self.logger.info("BFD: Waiting for event")
verify_event(self, e, expected_state=BFDState.init)
self.logger.info("BFD: Sending Up")
self.test_session.send_packet()
self.logger.info("BFD: Waiting for event")
verify_event(self, e, expected_state=BFDState.up)
self.logger.info("BFD: Session is Up")
self.test_session.update(state=BFDState.up)
verify_event(self, e, expected_state=BFDState.up)
self.logger.info("BFD: Session is Up")
self.test_session.update(state=BFDState.up)
detection_time = self.test_session.detect_mult *\
self.vpp_session.required_min_rx / USEC_IN_SEC
self.sleep(detection_time, "waiting for BFD session time-out")
detection_time = self.test_session.detect_mult *\
self.vpp_session.required_min_rx / USEC_IN_SEC
self.sleep(detection_time, "waiting for BFD session time-out")
verify_event(self, e, expected_state=BFDState.down)
def test_peer_discr_reset_sess_down(self):
verify_event(self, e, expected_state=BFDState.down)
def test_peer_discr_reset_sess_down(self):
verify_event(self, e, expected_state=BFDState.down)
time_to_event = time.time() - time_mark
self.assert_in_range(time_to_event, .9 * timeout,
verify_event(self, e, expected_state=BFDState.down)
time_to_event = time.time() - time_mark
self.assert_in_range(time_to_event, .9 * timeout,
verify_event(self, e, expected_state=BFDState.admin_down)
for dummy in range(2):
p = wait_for_bfd_packet(self)
verify_event(self, e, expected_state=BFDState.admin_down)
for dummy in range(2):
p = wait_for_bfd_packet(self)
self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
self.vpp_session.admin_up()
self.test_session.update(state=BFDState.down)
self.assert_equal(p[BFD].state, BFDState.admin_down, BFDState)
self.vpp_session.admin_up()
self.test_session.update(state=BFDState.down)
verify_event(self, e, expected_state=BFDState.down)
p = wait_for_bfd_packet(
self, pcap_time_min=time.time() - self.vpp_clock_offset)
verify_event(self, e, expected_state=BFDState.down)
p = wait_for_bfd_packet(
self, pcap_time_min=time.time() - self.vpp_clock_offset)
p = wait_for_bfd_packet(
self, pcap_time_min=time.time() - self.vpp_clock_offset)
self.assert_equal(p[BFD].state, BFDState.init, BFDState)
p = wait_for_bfd_packet(
self, pcap_time_min=time.time() - self.vpp_clock_offset)
self.assert_equal(p[BFD].state, BFDState.init, BFDState)
verify_event(self, e, expected_state=BFDState.init)
self.test_session.update(state=BFDState.up)
self.test_session.send_packet()
p = wait_for_bfd_packet(
self, pcap_time_min=time.time() - self.vpp_clock_offset)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
verify_event(self, e, expected_state=BFDState.init)
self.test_session.update(state=BFDState.up)
self.test_session.send_packet()
p = wait_for_bfd_packet(
self, pcap_time_min=time.time() - self.vpp_clock_offset)
self.assert_equal(p[BFD].state, BFDState.up, BFDState)
verify_event(self, e, expected_state=BFDState.up)
def test_config_change_remote_demand(self):
verify_event(self, e, expected_state=BFDState.up)
def test_config_change_remote_demand(self):
self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
self.assertFalse(vpp_session.query_vpp_config())
self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
self.assertFalse(vpp_session.query_vpp_config())
self.test_session.update(your_discriminator=p[BFD].my_discriminator,
state=BFDState.up)
self.logger.info("BFD: Waiting for event")
self.test_session.update(your_discriminator=p[BFD].my_discriminator,
state=BFDState.up)
self.logger.info("BFD: Waiting for event")
verify_event(self, e, expected_state=BFDState.init)
self.logger.info("BFD: Sending Up")
self.test_session.send_packet()
self.logger.info("BFD: Waiting for event")
verify_event(self, e, expected_state=BFDState.init)
self.logger.info("BFD: Sending Up")
self.test_session.send_packet()
self.logger.info("BFD: Waiting for event")
verify_event(self, e, expected_state=BFDState.up)
self.logger.info("BFD: Session is Up")
self.test_session.update(state=BFDState.up)
verify_event(self, e, expected_state=BFDState.up)
self.logger.info("BFD: Session is Up")
self.test_session.update(state=BFDState.up)
self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
self.assertFalse(vpp_session.query_vpp_config())
self.assert_equal(e.sw_if_index, sw_if_index, "sw_if_index")
self.assertFalse(vpp_session.query_vpp_config())
class BFDAuthOnOffTestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (changing auth) """
class BFDAuthOnOffTestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (changing auth) """
- unpacked = unpack("!L", self.loopback0.local_ip4n)
- echo_ip4 = inet_ntop(AF_INET, pack("!L", unpacked[0] ^ 1))
+ echo_ip4 = str(ipaddress.IPv4Address(int(ipaddress.IPv4Address(
+ self.loopback0.local_ip4)) ^ 1))
self.cli_verify_response("show bfd echo-source",
"UDP echo source is: %s\n"
"IPv4 address usable as echo source: %s\n"
"IPv6 address usable as echo source: none" %
(self.loopback0.name, echo_ip4))
self.cli_verify_response("show bfd echo-source",
"UDP echo source is: %s\n"
"IPv4 address usable as echo source: %s\n"
"IPv6 address usable as echo source: none" %
(self.loopback0.name, echo_ip4))
- unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
- echo_ip6 = inet_ntop(AF_INET6, pack("!LLLL", unpacked[0], unpacked[1],
- unpacked[2], unpacked[3] ^ 1))
+ echo_ip6 = str(ipaddress.IPv6Address(int(ipaddress.IPv6Address(
+ self.loopback0.local_ip6)) ^ 1))
self.loopback0.config_ip6()
self.cli_verify_response("show bfd echo-source",
"UDP echo source is: %s\n"
self.loopback0.config_ip6()
self.cli_verify_response("show bfd echo-source",
"UDP echo source is: %s\n"