Code Review
/
vpp.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
VPP-1508: Use scapy.compat to manage packet level library differences.
[vpp.git]
/
test
/
test_bfd.py
diff --git
a/test/test_bfd.py
b/test/test_bfd.py
index
7b47422
..
4c3f535
100644
(file)
--- a/
test/test_bfd.py
+++ b/
test/test_bfd.py
@@
-12,6
+12,7
@@
from socket import AF_INET, AF_INET6, inet_ntop
from struct import pack, unpack
from six import moves
from struct import pack, unpack
from six import moves
+import scapy.compat
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
@@
-42,7
+43,8
@@
class AuthKeyFactory(object):
while conf_key_id in self._conf_key_ids:
conf_key_id = randint(0, 0xFFFFFFFF)
self._conf_key_ids[conf_key_id] = 1
while conf_key_id in self._conf_key_ids:
conf_key_id = randint(0, 0xFFFFFFFF)
self._conf_key_ids[conf_key_id] = 1
- key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
+ key = scapy.compat.raw(
+ bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
return VppBFDAuthKey(test=test, auth_type=auth_type,
conf_key_id=conf_key_id, key=key)
return VppBFDAuthKey(test=test, auth_type=auth_type,
conf_key_id=conf_key_id, key=key)
@@
-435,7
+437,8
@@
class BFDTestSession(object):
self.test.logger.debug("BFD: Creating packet")
self.fill_packet_fields(packet)
if self.sha1_key:
self.test.logger.debug("BFD: Creating packet")
self.fill_packet_fields(packet)
if self.sha1_key:
- hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
+ hash_material = scapy.compat.raw(
+ packet[BFD])[:32] + self.sha1_key.key + \
"\0" * (20 - len(self.sha1_key.key))
self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
hashlib.sha1(hash_material).hexdigest())
"\0" * (20 - len(self.sha1_key.key))
self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
hashlib.sha1(hash_material).hexdigest())
@@
-492,7
+495,7
@@
class BFDTestSession(object):
# last 20 bytes represent the hash - so replace them with the key,
# pad the result with zeros and hash the result
hash_material = bfd.original[:-20] + self.sha1_key.key + \
# last 20 bytes represent the hash - so replace them with the key,
# pad the result with zeros and hash the result
hash_material = bfd.original[:-20] + self.sha1_key.key + \
- "\0" * (20 - len(self.sha1_key.key))
+
b
"\0" * (20 - len(self.sha1_key.key))
expected_hash = hashlib.sha1(hash_material).hexdigest()
self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
expected_hash, "Auth key hash")
expected_hash = hashlib.sha1(hash_material).hexdigest()
self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
expected_hash, "Auth key hash")
@@
-923,7
+926,7
@@
class BFD4TestCase(VppTestCase):
# halve required min rx
old_required_min_rx = self.vpp_session.required_min_rx
self.vpp_session.modify_parameters(
# halve required min rx
old_required_min_rx = self.vpp_session.required_min_rx
self.vpp_session.modify_parameters(
- required_min_rx=
0.5 * self.vpp_session.required_min_rx
)
+ required_min_rx=
self.vpp_session.required_min_rx // 2
)
# now we wait 0.8*3*old-req-min-rx and the session should still be up
self.sleep(0.8 * self.vpp_session.detect_mult *
old_required_min_rx / USEC_IN_SEC,
# now we wait 0.8*3*old-req-min-rx and the session should still be up
self.sleep(0.8 * self.vpp_session.detect_mult *
old_required_min_rx / USEC_IN_SEC,
@@
-1118,8
+1121,8
@@
class BFD4TestCase(VppTestCase):
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
- self.assertEqual(s
tr
(p[UDP].payload),
- s
tr
(echo_packet[UDP].payload),
+ self.assertEqual(s
capy.compat.raw
(p[UDP].payload),
+ s
capy.compat.raw
(echo_packet[UDP].payload),
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
@@
-1612,8
+1615,8
@@
class BFD6TestCase(VppTestCase):
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
- self.assertEqual(s
tr
(p[UDP].payload),
- s
tr
(echo_packet[UDP].payload),
+ self.assertEqual(s
capy.compat.raw
(p[UDP].payload),
+ s
capy.compat.raw
(echo_packet[UDP].payload),
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
@@
-2293,7
+2296,7
@@
class BFDCLITestCase(VppTestCase):
def cli_verify_no_response(self, cli):
""" execute a CLI, asserting that the response is empty """
self.assert_equal(self.vapi.cli(cli),
def cli_verify_no_response(self, cli):
""" execute a CLI, asserting that the response is empty """
self.assert_equal(self.vapi.cli(cli),
- "",
+
b
"",
"CLI command response")
def cli_verify_response(self, cli, expected):
"CLI command response")
def cli_verify_response(self, cli, expected):
@@
-2325,7
+2328,7
@@
class BFDCLITestCase(VppTestCase):
self.cli_verify_no_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
self.cli_verify_no_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(
ord
(c)) for c in k.key)))
+ "".join("{:02x}".format(
scapy.compat.orb
(c)) for c in k.key)))
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(
self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(
self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
@@
-2342,7
+2345,7
@@
class BFDCLITestCase(VppTestCase):
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(
ord
(c)) for c in k2.key)),
+ "".join("{:02x}".format(
scapy.compat.orb
(c)) for c in k2.key)),
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work
@@
-2361,7
+2364,7
@@
class BFDCLITestCase(VppTestCase):
self.cli_verify_no_response(
"bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
(k.conf_key_id,
self.cli_verify_no_response(
"bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(
ord
(c)) for c in k.key)))
+ "".join("{:02x}".format(
scapy.compat.orb
(c)) for c in k.key)))
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(self, self.pg0,
self.pg0.remote_ip6, af=AF_INET6,
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(self, self.pg0,
self.pg0.remote_ip6, af=AF_INET6,
@@
-2380,7
+2383,7
@@
class BFDCLITestCase(VppTestCase):
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(
ord
(c)) for c in k2.key)),
+ "".join("{:02x}".format(
scapy.compat.orb
(c)) for c in k2.key)),
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work