""" BFD tests """
from __future__ import division
-import unittest
-import hashlib
+
import binascii
+import hashlib
import time
-from struct import pack, unpack
+import unittest
from random import randint, shuffle, getrandbits
from socket import AF_INET, AF_INET6, inet_ntop
-from scapy.packet import Raw
-from scapy.layers.l2 import Ether
+from struct import pack, unpack
+
+import six
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+
from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
BFDDiagCode, BFDState, BFD_vpp_echo
from framework import VppTestCase, VppTestRunner, running_extended_tests
-from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
-from vpp_lo_interface import VppLoInterface
from util import ppp
+from vpp_ip import DpoProto
+from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_lo_interface import VppLoInterface
from vpp_papi_provider import UnexpectedApiReturnValueError
-from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
+from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
USEC_IN_SEC = 1000000
session = VppBFDUDPSession(self, self.pg0, self.pg0.remote_ip4)
session.add_vpp_config()
- with self.vapi.expect_negative_api_retval():
+ with self.vapi.assert_negative_api_retval():
session.add_vpp_config()
session.remove_vpp_config()
session.add_vpp_config()
session.activate_auth(key2)
+ def test_set_del_udp_echo_source(self):
+ """ set/del udp echo source """
+ self.create_loopback_interfaces(1)
+ self.loopback0 = self.lo_interfaces[0]
+ self.loopback0.admin_up()
+ echo_source = self.vapi.bfd_udp_get_echo_source()
+ self.assertFalse(echo_source.is_set)
+ self.assertFalse(echo_source.have_usable_ip4)
+ self.assertFalse(echo_source.have_usable_ip6)
+
+ self.vapi.bfd_udp_set_echo_source(self.loopback0.sw_if_index)
+ echo_source = self.vapi.bfd_udp_get_echo_source()
+ self.assertTrue(echo_source.is_set)
+ self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
+ self.assertFalse(echo_source.have_usable_ip4)
+ self.assertFalse(echo_source.have_usable_ip6)
+
+ self.loopback0.config_ip4()
+ unpacked = unpack("!L", self.loopback0.local_ip4n)
+ echo_ip4 = pack("!L", unpacked[0] ^ 1)
+ echo_source = self.vapi.bfd_udp_get_echo_source()
+ self.assertTrue(echo_source.is_set)
+ self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
+ self.assertTrue(echo_source.have_usable_ip4)
+ self.assertEqual(echo_source.ip4_addr, echo_ip4)
+ self.assertFalse(echo_source.have_usable_ip6)
+
+ self.loopback0.config_ip6()
+ unpacked = unpack("!LLLL", self.loopback0.local_ip6n)
+ echo_ip6 = pack("!LLLL", unpacked[0], unpacked[1], unpacked[2],
+ unpacked[3] ^ 1)
+ echo_source = self.vapi.bfd_udp_get_echo_source()
+ self.assertTrue(echo_source.is_set)
+ self.assertEqual(echo_source.sw_if_index, self.loopback0.sw_if_index)
+ self.assertTrue(echo_source.have_usable_ip4)
+ self.assertEqual(echo_source.ip4_addr, echo_ip4)
+ self.assertTrue(echo_source.have_usable_ip6)
+ self.assertEqual(echo_source.ip6_addr, echo_ip6)
+
+ self.vapi.bfd_udp_del_echo_source()
+ echo_source = self.vapi.bfd_udp_get_echo_source()
+ self.assertFalse(echo_source.is_set)
+ self.assertFalse(echo_source.have_usable_ip4)
+ self.assertFalse(echo_source.have_usable_ip6)
+
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDTestSession(object):
def verify_event(test, event, expected_state):
""" Verify correctness of event values. """
e = event
- test.logger.debug("BFD: Event: %s" % repr(e))
+ test.logger.debug("BFD: Event: %s" % six.reprlib(e))
test.assert_equal(e.sw_if_index,
test.vpp_session.interface.sw_if_index,
"BFD interface index")
def test_intf_deleted(self):
""" interface with bfd session deleted """
- intf = VppLoInterface(self, 0)
+ intf = VppLoInterface(self)
intf.config_ip4()
intf.admin_up()
sw_if_index = intf.sw_if_index
def test_intf_deleted(self):
""" interface with bfd session deleted """
- intf = VppLoInterface(self, 0)
+ intf = VppLoInterface(self)
intf.config_ip6()
intf.admin_up()
sw_if_index = intf.sw_if_index