from struct import pack, unpack
from six import moves
+import scapy.compat
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
while conf_key_id in self._conf_key_ids:
conf_key_id = randint(0, 0xFFFFFFFF)
self._conf_key_ids[conf_key_id] = 1
- key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
+ key = scapy.compat.raw(
+ bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
return VppBFDAuthKey(test=test, auth_type=auth_type,
conf_key_id=conf_key_id, key=key)
super(BFDAPITestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDAPITestCase, cls).tearDownClass()
+
def setUp(self):
super(BFDAPITestCase, self).setUp()
self.factory = AuthKeyFactory()
self.assertFalse(echo_source.have_usable_ip6)
-@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDTestSession(object):
""" BFD session as seen from test framework side """
self.test.logger.debug("BFD: Creating packet")
self.fill_packet_fields(packet)
if self.sha1_key:
- hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
+ hash_material = scapy.compat.raw(
+ packet[BFD])[:32] + self.sha1_key.key + \
"\0" * (20 - len(self.sha1_key.key))
self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
hashlib.sha1(hash_material).hexdigest())
# last 20 bytes represent the hash - so replace them with the key,
# pad the result with zeros and hash the result
hash_material = bfd.original[:-20] + self.sha1_key.key + \
- "\0" * (20 - len(self.sha1_key.key))
+ b"\0" * (20 - len(self.sha1_key.key))
expected_hash = hashlib.sha1(hash_material).hexdigest()
self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
expected_hash, "Auth key hash")
super(BFD4TestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFD4TestCase, cls).tearDownClass()
+
def setUp(self):
super(BFD4TestCase, self).setUp()
self.factory = AuthKeyFactory()
# halve required min rx
old_required_min_rx = self.vpp_session.required_min_rx
self.vpp_session.modify_parameters(
- required_min_rx=0.5 * self.vpp_session.required_min_rx)
+ required_min_rx=self.vpp_session.required_min_rx // 2)
# now we wait 0.8*3*old-req-min-rx and the session should still be up
self.sleep(0.8 * self.vpp_session.detect_mult *
old_required_min_rx / USEC_IN_SEC,
/ USEC_IN_SEC
count = 0
for dummy in range(self.test_session.detect_mult * 2):
- time.sleep(transmit_time)
+ self.sleep(transmit_time)
self.test_session.send_packet(demand)
try:
p = wait_for_bfd_packet(self, timeout=0)
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
- self.assertEqual(str(p[UDP].payload),
- str(echo_packet[UDP].payload),
+ self.assertEqual(scapy.compat.raw(p[UDP].payload),
+ scapy.compat.raw(echo_packet[UDP].payload),
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
/ USEC_IN_SEC
count = 0
for dummy in range(self.test_session.detect_mult * 2):
- time.sleep(transmit_time)
+ self.sleep(transmit_time)
self.test_session.send_packet(demand)
try:
p = wait_for_bfd_packet(self, timeout=0)
super(BFD6TestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFD6TestCase, cls).tearDownClass()
+
def setUp(self):
super(BFD6TestCase, self).setUp()
self.factory = AuthKeyFactory()
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
- self.assertEqual(str(p[UDP].payload),
- str(echo_packet[UDP].payload),
+ self.assertEqual(scapy.compat.raw(p[UDP].payload),
+ scapy.compat.raw(echo_packet[UDP].payload),
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
vpp_session = None
test_session = None
+ @classmethod
+ def setUpClass(cls):
+ super(BFDFIBTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDFIBTestCase, cls).tearDownClass()
+
def setUp(self):
super(BFDFIBTestCase, self).setUp()
self.create_pg_interfaces(range(1))
super(BFDSHA1TestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDSHA1TestCase, cls).tearDownClass()
+
def setUp(self):
super(BFDSHA1TestCase, self).setUp()
self.factory = AuthKeyFactory()
super(BFDAuthOnOffTestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDAuthOnOffTestCase, cls).tearDownClass()
+
def setUp(self):
super(BFDAuthOnOffTestCase, self).setUp()
self.factory = AuthKeyFactory()
super(BFDCLITestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDCLITestCase, cls).tearDownClass()
+
def setUp(self):
super(BFDCLITestCase, self).setUp()
self.factory = AuthKeyFactory()
def cli_verify_no_response(self, cli):
""" execute a CLI, asserting that the response is empty """
self.assert_equal(self.vapi.cli(cli),
- "",
+ b"",
"CLI command response")
def cli_verify_response(self, cli, expected):
self.cli_verify_no_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(ord(c)) for c in k.key)))
+ "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(
self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(ord(c)) for c in k2.key)),
+ "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work
self.cli_verify_no_response(
"bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(ord(c)) for c in k.key)))
+ "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(self, self.pg0,
self.pg0.remote_ip6, af=AF_INET6,
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(ord(c)) for c in k2.key)),
+ "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work