Code Review
/
vpp.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
API: Cleanup APIs interface.api
[vpp.git]
/
test
/
test_bfd.py
diff --git
a/test/test_bfd.py
b/test/test_bfd.py
index
635e7e1
..
4c3f535
100644
(file)
--- a/
test/test_bfd.py
+++ b/
test/test_bfd.py
@@
-12,6
+12,7
@@
from socket import AF_INET, AF_INET6, inet_ntop
from struct import pack, unpack
from six import moves
from struct import pack, unpack
from six import moves
+import scapy.compat
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
@@
-42,12
+43,13
@@
class AuthKeyFactory(object):
while conf_key_id in self._conf_key_ids:
conf_key_id = randint(0, 0xFFFFFFFF)
self._conf_key_ids[conf_key_id] = 1
while conf_key_id in self._conf_key_ids:
conf_key_id = randint(0, 0xFFFFFFFF)
self._conf_key_ids[conf_key_id] = 1
- key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
+ key = scapy.compat.raw(
+ bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
return VppBFDAuthKey(test=test, auth_type=auth_type,
conf_key_id=conf_key_id, key=key)
return VppBFDAuthKey(test=test, auth_type=auth_type,
conf_key_id=conf_key_id, key=key)
-@unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDAPITestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) - API"""
class BFDAPITestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) - API"""
@@
-69,6
+71,10
@@
class BFDAPITestCase(VppTestCase):
super(BFDAPITestCase, cls).tearDownClass()
raise
super(BFDAPITestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDAPITestCase, cls).tearDownClass()
+
def setUp(self):
super(BFDAPITestCase, self).setUp()
self.factory = AuthKeyFactory()
def setUp(self):
super(BFDAPITestCase, self).setUp()
self.factory = AuthKeyFactory()
@@
-302,7
+308,6
@@
class BFDAPITestCase(VppTestCase):
self.assertFalse(echo_source.have_usable_ip6)
self.assertFalse(echo_source.have_usable_ip6)
-@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class BFDTestSession(object):
""" BFD session as seen from test framework side """
class BFDTestSession(object):
""" BFD session as seen from test framework side """
@@
-432,7
+437,8
@@
class BFDTestSession(object):
self.test.logger.debug("BFD: Creating packet")
self.fill_packet_fields(packet)
if self.sha1_key:
self.test.logger.debug("BFD: Creating packet")
self.fill_packet_fields(packet)
if self.sha1_key:
- hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
+ hash_material = scapy.compat.raw(
+ packet[BFD])[:32] + self.sha1_key.key + \
"\0" * (20 - len(self.sha1_key.key))
self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
hashlib.sha1(hash_material).hexdigest())
"\0" * (20 - len(self.sha1_key.key))
self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
hashlib.sha1(hash_material).hexdigest())
@@
-489,7
+495,7
@@
class BFDTestSession(object):
# last 20 bytes represent the hash - so replace them with the key,
# pad the result with zeros and hash the result
hash_material = bfd.original[:-20] + self.sha1_key.key + \
# last 20 bytes represent the hash - so replace them with the key,
# pad the result with zeros and hash the result
hash_material = bfd.original[:-20] + self.sha1_key.key + \
- "\0" * (20 - len(self.sha1_key.key))
+
b
"\0" * (20 - len(self.sha1_key.key))
expected_hash = hashlib.sha1(hash_material).hexdigest()
self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
expected_hash, "Auth key hash")
expected_hash = hashlib.sha1(hash_material).hexdigest()
self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
expected_hash, "Auth key hash")
@@
-664,7
+670,7
@@
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
return p
return p
-@unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFD4TestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD)"""
class BFD4TestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD)"""
@@
-692,6
+698,10
@@
class BFD4TestCase(VppTestCase):
super(BFD4TestCase, cls).tearDownClass()
raise
super(BFD4TestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFD4TestCase, cls).tearDownClass()
+
def setUp(self):
super(BFD4TestCase, self).setUp()
self.factory = AuthKeyFactory()
def setUp(self):
super(BFD4TestCase, self).setUp()
self.factory = AuthKeyFactory()
@@
-748,7
+758,7
@@
class BFD4TestCase(VppTestCase):
bfd_session_up(self)
bfd_session_down(self)
bfd_session_up(self)
bfd_session_down(self)
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_hold_up(self):
""" hold BFD session up """
bfd_session_up(self)
def test_hold_up(self):
""" hold BFD session up """
bfd_session_up(self)
@@
-758,7
+768,7
@@
class BFD4TestCase(VppTestCase):
self.assert_equal(len(self.vapi.collect_events()), 0,
"number of bfd events")
self.assert_equal(len(self.vapi.collect_events()), 0,
"number of bfd events")
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_slow_timer(self):
""" verify slow periodic control frames while session down """
packet_count = 3
def test_slow_timer(self):
""" verify slow periodic control frames while session down """
packet_count = 3
@@
-773,7
+783,7
@@
class BFD4TestCase(VppTestCase):
time_diff, 0.70, 1.05, "time between slow packets")
prev_packet = next_packet
time_diff, 0.70, 1.05, "time between slow packets")
prev_packet = next_packet
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_zero_remote_min_rx(self):
""" no packets when zero remote required min rx interval """
bfd_session_up(self)
def test_zero_remote_min_rx(self):
""" no packets when zero remote required min rx interval """
bfd_session_up(self)
@@
-798,7
+808,7
@@
class BFD4TestCase(VppTestCase):
self.assert_equal(
len(self.vapi.collect_events()), 0, "number of bfd events")
self.assert_equal(
len(self.vapi.collect_events()), 0, "number of bfd events")
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_conn_down(self):
""" verify session goes down after inactivity """
bfd_session_up(self)
def test_conn_down(self):
""" verify session goes down after inactivity """
bfd_session_up(self)
@@
-808,7
+818,7
@@
class BFD4TestCase(VppTestCase):
e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
verify_event(self, e, expected_state=BFDState.down)
e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
verify_event(self, e, expected_state=BFDState.down)
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_large_required_min_rx(self):
""" large remote required min rx interval """
bfd_session_up(self)
def test_large_required_min_rx(self):
""" large remote required min rx interval """
bfd_session_up(self)
@@
-838,7
+848,7
@@
class BFD4TestCase(VppTestCase):
break
self.assert_equal(count, 0, "number of packets received")
break
self.assert_equal(count, 0, "number of packets received")
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_immediate_remote_min_rx_reduction(self):
""" immediately honor remote required min rx reduction """
self.vpp_session.remove_vpp_config()
def test_immediate_remote_min_rx_reduction(self):
""" immediately honor remote required min rx reduction """
self.vpp_session.remove_vpp_config()
@@
-871,7
+881,7
@@
class BFD4TestCase(VppTestCase):
"time between BFD packets")
reference_packet = p
"time between BFD packets")
reference_packet = p
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_modify_req_min_rx_double(self):
""" modify session - double required min rx """
bfd_session_up(self)
def test_modify_req_min_rx_double(self):
""" modify session - double required min rx """
bfd_session_up(self)
@@
-901,7
+911,7
@@
class BFD4TestCase(VppTestCase):
self.assert_in_range(time_to_event, .9 * timeout,
1.1 * timeout, "session timeout")
self.assert_in_range(time_to_event, .9 * timeout,
1.1 * timeout, "session timeout")
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_modify_req_min_rx_halve(self):
""" modify session - halve required min rx """
self.vpp_session.modify_parameters(
def test_modify_req_min_rx_halve(self):
""" modify session - halve required min rx """
self.vpp_session.modify_parameters(
@@
-916,7
+926,7
@@
class BFD4TestCase(VppTestCase):
# halve required min rx
old_required_min_rx = self.vpp_session.required_min_rx
self.vpp_session.modify_parameters(
# halve required min rx
old_required_min_rx = self.vpp_session.required_min_rx
self.vpp_session.modify_parameters(
- required_min_rx=
0.5 * self.vpp_session.required_min_rx
)
+ required_min_rx=
self.vpp_session.required_min_rx // 2
)
# now we wait 0.8*3*old-req-min-rx and the session should still be up
self.sleep(0.8 * self.vpp_session.detect_mult *
old_required_min_rx / USEC_IN_SEC,
# now we wait 0.8*3*old-req-min-rx and the session should still be up
self.sleep(0.8 * self.vpp_session.detect_mult *
old_required_min_rx / USEC_IN_SEC,
@@
-944,7
+954,7
@@
class BFD4TestCase(VppTestCase):
"time before bfd session goes down")
verify_event(self, e, expected_state=BFDState.down)
"time before bfd session goes down")
verify_event(self, e, expected_state=BFDState.down)
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_modify_detect_mult(self):
""" modify detect multiplier """
bfd_session_up(self)
def test_modify_detect_mult(self):
""" modify detect multiplier """
bfd_session_up(self)
@@
-968,7
+978,7
@@
class BFD4TestCase(VppTestCase):
self.assertNotIn("P", p.sprintf("%BFD.flags%"),
"Poll bit not set in BFD packet")
self.assertNotIn("P", p.sprintf("%BFD.flags%"),
"Poll bit not set in BFD packet")
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_queued_poll(self):
""" test poll sequence queueing """
bfd_session_up(self)
def test_queued_poll(self):
""" test poll sequence queueing """
bfd_session_up(self)
@@
-1034,7
+1044,7
@@
class BFD4TestCase(VppTestCase):
self.assertNotIn("P", p.sprintf("%BFD.flags%"),
"Poll bit set in BFD packet")
self.assertNotIn("P", p.sprintf("%BFD.flags%"),
"Poll bit set in BFD packet")
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_poll_response(self):
""" test correct response to control frame with poll bit set """
bfd_session_up(self)
def test_poll_response(self):
""" test correct response to control frame with poll bit set """
bfd_session_up(self)
@@
-1045,7
+1055,7
@@
class BFD4TestCase(VppTestCase):
self, pcap_time_min=time.time() - self.vpp_clock_offset)
self.assertIn("F", final.sprintf("%BFD.flags%"))
self, pcap_time_min=time.time() - self.vpp_clock_offset)
self.assertIn("F", final.sprintf("%BFD.flags%"))
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_no_periodic_if_remote_demand(self):
""" no periodic frames outside poll sequence if remote demand set """
bfd_session_up(self)
def test_no_periodic_if_remote_demand(self):
""" no periodic frames outside poll sequence if remote demand set """
bfd_session_up(self)
@@
-1058,7
+1068,7
@@
class BFD4TestCase(VppTestCase):
/ USEC_IN_SEC
count = 0
for dummy in range(self.test_session.detect_mult * 2):
/ USEC_IN_SEC
count = 0
for dummy in range(self.test_session.detect_mult * 2):
-
time
.sleep(transmit_time)
+
self
.sleep(transmit_time)
self.test_session.send_packet(demand)
try:
p = wait_for_bfd_packet(self, timeout=0)
self.test_session.send_packet(demand)
try:
p = wait_for_bfd_packet(self, timeout=0)
@@
-1111,8
+1121,8
@@
class BFD4TestCase(VppTestCase):
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
- self.assertEqual(s
tr
(p[UDP].payload),
- s
tr
(echo_packet[UDP].payload),
+ self.assertEqual(s
capy.compat.raw
(p[UDP].payload),
+ s
capy.compat.raw
(echo_packet[UDP].payload),
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
@@
-1172,7
+1182,7
@@
class BFD4TestCase(VppTestCase):
self.test_session.send_packet()
self.assertTrue(echo_seen, "No echo packets received")
self.test_session.send_packet()
self.assertTrue(echo_seen, "No echo packets received")
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_echo_fail(self):
""" session goes down if echo function fails """
bfd_session_up(self)
def test_echo_fail(self):
""" session goes down if echo function fails """
bfd_session_up(self)
@@
-1212,7
+1222,7
@@
class BFD4TestCase(VppTestCase):
self.assert_equal(events[0].state, BFDState.down, BFDState)
self.assertTrue(verified_diag, "Incorrect diagnostics code received")
self.assert_equal(events[0].state, BFDState.down, BFDState)
self.assertTrue(verified_diag, "Incorrect diagnostics code received")
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_echo_stop(self):
""" echo function stops if peer sets required min echo rx zero """
bfd_session_up(self)
def test_echo_stop(self):
""" echo function stops if peer sets required min echo rx zero """
bfd_session_up(self)
@@
-1244,7
+1254,7
@@
class BFD4TestCase(VppTestCase):
events = self.vapi.collect_events()
self.assert_equal(len(events), 0, "number of bfd events")
events = self.vapi.collect_events()
self.assert_equal(len(events), 0, "number of bfd events")
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_echo_source_removed(self):
""" echo function stops if echo source is removed """
bfd_session_up(self)
def test_echo_source_removed(self):
""" echo function stops if echo source is removed """
bfd_session_up(self)
@@
-1276,7
+1286,7
@@
class BFD4TestCase(VppTestCase):
events = self.vapi.collect_events()
self.assert_equal(len(events), 0, "number of bfd events")
events = self.vapi.collect_events()
self.assert_equal(len(events), 0, "number of bfd events")
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_stale_echo(self):
""" stale echo packets don't keep a session up """
bfd_session_up(self)
def test_stale_echo(self):
""" stale echo packets don't keep a session up """
bfd_session_up(self)
@@
-1329,7
+1339,7
@@
class BFD4TestCase(VppTestCase):
self.test_session.send_packet()
self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
self.test_session.send_packet()
self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_invalid_echo_checksum(self):
""" echo packets with invalid checksum don't keep a session up """
bfd_session_up(self)
def test_invalid_echo_checksum(self):
""" echo packets with invalid checksum don't keep a session up """
bfd_session_up(self)
@@
-1379,7
+1389,7
@@
class BFD4TestCase(VppTestCase):
self.test_session.send_packet()
self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
self.test_session.send_packet()
self.assertTrue(timeout_ok, "Expected timeout event didn't occur")
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_admin_up_down(self):
""" put session admin-up and admin-down """
bfd_session_up(self)
def test_admin_up_down(self):
""" put session admin-up and admin-down """
bfd_session_up(self)
@@
-1417,7
+1427,7
@@
class BFD4TestCase(VppTestCase):
e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
verify_event(self, e, expected_state=BFDState.up)
e = self.vapi.wait_for_event(1, "bfd_udp_session_details")
verify_event(self, e, expected_state=BFDState.up)
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_config_change_remote_demand(self):
""" configuration change while peer in demand mode """
bfd_session_up(self)
def test_config_change_remote_demand(self):
""" configuration change while peer in demand mode """
bfd_session_up(self)
@@
-1441,7
+1451,7
@@
class BFD4TestCase(VppTestCase):
/ USEC_IN_SEC
count = 0
for dummy in range(self.test_session.detect_mult * 2):
/ USEC_IN_SEC
count = 0
for dummy in range(self.test_session.detect_mult * 2):
-
time
.sleep(transmit_time)
+
self
.sleep(transmit_time)
self.test_session.send_packet(demand)
try:
p = wait_for_bfd_packet(self, timeout=0)
self.test_session.send_packet(demand)
try:
p = wait_for_bfd_packet(self, timeout=0)
@@
-1470,7
+1480,7
@@
class BFD4TestCase(VppTestCase):
self.assertFalse(vpp_session.query_vpp_config())
self.assertFalse(vpp_session.query_vpp_config())
-@unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFD6TestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (IPv6) """
class BFD6TestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (IPv6) """
@@
-1498,6
+1508,10
@@
class BFD6TestCase(VppTestCase):
super(BFD6TestCase, cls).tearDownClass()
raise
super(BFD6TestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFD6TestCase, cls).tearDownClass()
+
def setUp(self):
super(BFD6TestCase, self).setUp()
self.factory = AuthKeyFactory()
def setUp(self):
super(BFD6TestCase, self).setUp()
self.factory = AuthKeyFactory()
@@
-1551,7
+1565,7
@@
class BFD6TestCase(VppTestCase):
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_hold_up(self):
""" hold BFD session up """
bfd_session_up(self)
def test_hold_up(self):
""" hold BFD session up """
bfd_session_up(self)
@@
-1601,8
+1615,8
@@
class BFD6TestCase(VppTestCase):
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
- self.assertEqual(s
tr
(p[UDP].payload),
- s
tr
(echo_packet[UDP].payload),
+ self.assertEqual(s
capy.compat.raw
(p[UDP].payload),
+ s
capy.compat.raw
(echo_packet[UDP].payload),
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
@@
-1680,13
+1694,21
@@
class BFD6TestCase(VppTestCase):
self.assertFalse(vpp_session.query_vpp_config())
self.assertFalse(vpp_session.query_vpp_config())
-@unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDFIBTestCase(VppTestCase):
""" BFD-FIB interactions (IPv6) """
vpp_session = None
test_session = None
class BFDFIBTestCase(VppTestCase):
""" BFD-FIB interactions (IPv6) """
vpp_session = None
test_session = None
+ @classmethod
+ def setUpClass(cls):
+ super(BFDFIBTestCase, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDFIBTestCase, cls).tearDownClass()
+
def setUp(self):
super(BFDFIBTestCase, self).setUp()
self.create_pg_interfaces(range(1))
def setUp(self):
super(BFDFIBTestCase, self).setUp()
self.create_pg_interfaces(range(1))
@@
-1782,7
+1804,7
@@
class BFDFIBTestCase(VppTestCase):
packet[IPv6].dst)
packet[IPv6].dst)
-@unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDSHA1TestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
class BFDSHA1TestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (SHA1 auth) """
@@
-1805,6
+1827,10
@@
class BFDSHA1TestCase(VppTestCase):
super(BFDSHA1TestCase, cls).tearDownClass()
raise
super(BFDSHA1TestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDSHA1TestCase, cls).tearDownClass()
+
def setUp(self):
super(BFDSHA1TestCase, self).setUp()
self.factory = AuthKeyFactory()
def setUp(self):
super(BFDSHA1TestCase, self).setUp()
self.factory = AuthKeyFactory()
@@
-1831,7
+1857,7
@@
class BFDSHA1TestCase(VppTestCase):
bfd_key_id=self.vpp_session.bfd_key_id)
bfd_session_up(self)
bfd_key_id=self.vpp_session.bfd_key_id)
bfd_session_up(self)
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_hold_up(self):
""" hold BFD session up """
key = self.factory.create_random_key(self)
def test_hold_up(self):
""" hold BFD session up """
key = self.factory.create_random_key(self)
@@
-1850,7
+1876,7
@@
class BFDSHA1TestCase(VppTestCase):
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_hold_up_meticulous(self):
""" hold BFD session up - meticulous auth """
key = self.factory.create_random_key(
def test_hold_up_meticulous(self):
""" hold BFD session up - meticulous auth """
key = self.factory.create_random_key(
@@
-1872,7
+1898,7
@@
class BFDSHA1TestCase(VppTestCase):
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
self.test_session.send_packet()
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_send_bad_seq_number(self):
""" session is not kept alive by msgs with bad sequence numbers"""
key = self.factory.create_random_key(
def test_send_bad_seq_number(self):
""" session is not kept alive by msgs with bad sequence numbers"""
key = self.factory.create_random_key(
@@
-1935,7
+1961,7
@@
class BFDSHA1TestCase(VppTestCase):
wait_for_bfd_packet(self)
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
wait_for_bfd_packet(self)
self.assert_equal(self.vpp_session.state, BFDState.up, BFDState)
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_mismatch_auth(self):
""" session is not brought down by unauthenticated msg """
key = self.factory.create_random_key(self)
def test_mismatch_auth(self):
""" session is not brought down by unauthenticated msg """
key = self.factory.create_random_key(self)
@@
-1950,7
+1976,7
@@
class BFDSHA1TestCase(VppTestCase):
legitimate_test_session,
rogue_test_session)
legitimate_test_session,
rogue_test_session)
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_mismatch_bfd_key_id(self):
""" session is not brought down by msg with non-existent key-id """
key = self.factory.create_random_key(self)
def test_mismatch_bfd_key_id(self):
""" session is not brought down by msg with non-existent key-id """
key = self.factory.create_random_key(self)
@@
-1970,7
+1996,7
@@
class BFDSHA1TestCase(VppTestCase):
legitimate_test_session,
rogue_test_session)
legitimate_test_session,
rogue_test_session)
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_mismatched_auth_type(self):
""" session is not brought down by msg with wrong auth type """
key = self.factory.create_random_key(self)
def test_mismatched_auth_type(self):
""" session is not brought down by msg with wrong auth type """
key = self.factory.create_random_key(self)
@@
-1987,7
+2013,7
@@
class BFDSHA1TestCase(VppTestCase):
vpp_session, legitimate_test_session, rogue_test_session,
{'auth_type': BFDAuthType.keyed_md5})
vpp_session, legitimate_test_session, rogue_test_session,
{'auth_type': BFDAuthType.keyed_md5})
- @unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+ @unittest.skipUnless(running_extended_tests, "part of extended tests")
def test_restart(self):
""" simulate remote peer restart and resynchronization """
key = self.factory.create_random_key(
def test_restart(self):
""" simulate remote peer restart and resynchronization """
key = self.factory.create_random_key(
@@
-2016,7
+2042,7
@@
class BFDSHA1TestCase(VppTestCase):
bfd_session_up(self)
bfd_session_up(self)
-@unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDAuthOnOffTestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (changing auth) """
class BFDAuthOnOffTestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (changing auth) """
@@
-2038,6
+2064,10
@@
class BFDAuthOnOffTestCase(VppTestCase):
super(BFDAuthOnOffTestCase, cls).tearDownClass()
raise
super(BFDAuthOnOffTestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDAuthOnOffTestCase, cls).tearDownClass()
+
def setUp(self):
super(BFDAuthOnOffTestCase, self).setUp()
self.factory = AuthKeyFactory()
def setUp(self):
super(BFDAuthOnOffTestCase, self).setUp()
self.factory = AuthKeyFactory()
@@
-2225,7
+2255,7
@@
class BFDAuthOnOffTestCase(VppTestCase):
"number of bfd events")
"number of bfd events")
-@unittest.skipUnless(running_extended_tests
()
, "part of extended tests")
+@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDCLITestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (CLI) """
pg0 = None
class BFDCLITestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (CLI) """
pg0 = None
@@
-2245,6
+2275,10
@@
class BFDCLITestCase(VppTestCase):
super(BFDCLITestCase, cls).tearDownClass()
raise
super(BFDCLITestCase, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(BFDCLITestCase, cls).tearDownClass()
+
def setUp(self):
super(BFDCLITestCase, self).setUp()
self.factory = AuthKeyFactory()
def setUp(self):
super(BFDCLITestCase, self).setUp()
self.factory = AuthKeyFactory()
@@
-2262,7
+2296,7
@@
class BFDCLITestCase(VppTestCase):
def cli_verify_no_response(self, cli):
""" execute a CLI, asserting that the response is empty """
self.assert_equal(self.vapi.cli(cli),
def cli_verify_no_response(self, cli):
""" execute a CLI, asserting that the response is empty """
self.assert_equal(self.vapi.cli(cli),
- "",
+
b
"",
"CLI command response")
def cli_verify_response(self, cli, expected):
"CLI command response")
def cli_verify_response(self, cli, expected):
@@
-2294,7
+2328,7
@@
class BFDCLITestCase(VppTestCase):
self.cli_verify_no_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
self.cli_verify_no_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(
ord
(c)) for c in k.key)))
+ "".join("{:02x}".format(
scapy.compat.orb
(c)) for c in k.key)))
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(
self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(
self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
@@
-2311,7
+2345,7
@@
class BFDCLITestCase(VppTestCase):
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(
ord
(c)) for c in k2.key)),
+ "".join("{:02x}".format(
scapy.compat.orb
(c)) for c in k2.key)),
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work
@@
-2330,7
+2364,7
@@
class BFDCLITestCase(VppTestCase):
self.cli_verify_no_response(
"bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
(k.conf_key_id,
self.cli_verify_no_response(
"bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(
ord
(c)) for c in k.key)))
+ "".join("{:02x}".format(
scapy.compat.orb
(c)) for c in k.key)))
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(self, self.pg0,
self.pg0.remote_ip6, af=AF_INET6,
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(self, self.pg0,
self.pg0.remote_ip6, af=AF_INET6,
@@
-2349,7
+2383,7
@@
class BFDCLITestCase(VppTestCase):
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(
ord
(c)) for c in k2.key)),
+ "".join("{:02x}".format(
scapy.compat.orb
(c)) for c in k2.key)),
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work