From 6e4c6ad92e59045f0babf5af5093cb8402ec37fb Mon Sep 17 00:00:00 2001 From: Paul Vinciguerra Date: Sun, 25 Nov 2018 10:35:29 -0800 Subject: [PATCH] VPP-1508 python3 tests: .encode('hex') Change to binascii.hexlify() for consistent bahavior. Change-Id: Ie430cdd1ffeb6510db4aa037546e42d85992093b Signed-off-by: Paul Vinciguerra --- test/test_acl_plugin_macip.py | 9 +++++---- test/test_classifier.py | 9 ++++----- test/test_classifier_ip6.py | 6 ++++-- test/test_flowprobe.py | 43 ++++++++++++++++++++++++------------------- test/test_ip4.py | 11 +++++------ test/vpp_interface.py | 3 ++- 6 files changed, 44 insertions(+), 37 deletions(-) diff --git a/test/test_acl_plugin_macip.py b/test/test_acl_plugin_macip.py index 224bc31c0b2..951e86af474 100644 --- a/test/test_acl_plugin_macip.py +++ b/test/test_acl_plugin_macip.py @@ -1,6 +1,7 @@ #!/usr/bin/env python """ACL plugin - MACIP tests """ +import binascii import random import re import unittest @@ -185,8 +186,8 @@ class MethodHolder(VppTestCase): rule = "DENY " print " IP6" if r.is_ipv6 else " IP4", \ rule, \ - r.src_mac.encode('hex'), \ - r.src_mac_mask.encode('hex'),\ + binascii.hexlify(r.src_mac), \ + binascii.hexlify(r.src_mac_mask),\ unpack('<16B', r.src_ip_addr), \ r.src_ip_prefix_len return acls @@ -575,8 +576,8 @@ class MethodHolder(VppTestCase): # TODO : verify # for acl in acls: # for r in acl.r: - # print r.src_mac.encode('hex'), \ - # r.src_mac_mask.encode('hex'),\ + # print binascii.hexlify(r.src_mac), \ + # binascii.hexlify(r.src_mac_mask),\ # unpack('<16B', r.src_ip_addr), \ # r.src_ip_prefix_len # diff --git a/test/test_classifier.py b/test/test_classifier.py index ade96737e42..f865cb9f6d5 100644 --- a/test/test_classifier.py +++ b/test/test_classifier.py @@ -1,9 +1,8 @@ #!/usr/bin/env python -import unittest -import socket import binascii -import sys +import socket +import unittest from framework import VppTestCase, VppTestRunner @@ -218,9 +217,9 @@ class TestClassifier(VppTestCase): :param int dst_port: destination port number "x" """ if src_ip: - src_ip = socket.inet_aton(src_ip).encode('hex') + src_ip = binascii.hexlify(socket.inet_aton(src_ip)) if dst_ip: - dst_ip = socket.inet_aton(dst_ip).encode('hex') + dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)) return ('{:0>20}{:0>12}{:0>8}{:0>4}{:0>4}'.format( hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:], diff --git a/test/test_classifier_ip6.py b/test/test_classifier_ip6.py index 9f4c20aae3e..cbcf5c47257 100644 --- a/test/test_classifier_ip6.py +++ b/test/test_classifier_ip6.py @@ -184,9 +184,11 @@ class TestClassifier(VppTestCase): :param int dst_port: destination port number "x" """ if src_ip: - src_ip = socket.inet_pton(socket.AF_INET6, src_ip).encode('hex') + src_ip = binascii.hexlify(socket.inet_pton( + socket.AF_INET6, src_ip)) if dst_ip: - dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).encode('hex') + dst_ip = binascii.hexlify(socket.inet_pton( + socket.AF_INET6, dst_ip)) return ('{:0>14}{:0>34}{:0>32}{:0>4}{:0>4}'.format( hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:], diff --git a/test/test_flowprobe.py b/test/test_flowprobe.py index df6b4230699..d2fd031777c 100644 --- a/test/test_flowprobe.py +++ b/test/test_flowprobe.py @@ -1,4 +1,6 @@ #!/usr/bin/env python + +import binascii import random import socket import unittest @@ -196,8 +198,8 @@ class MethodHolder(VppTestCase): if cflow.haslayer(Data): data = decoder.decode_data_set(cflow.getlayer(Set)) for record in data: - self.assertEqual(int(record[1].encode('hex'), 16), octets) - self.assertEqual(int(record[2].encode('hex'), 16), packets) + self.assertEqual(int(binascii.hexlify(record[1]), 16), octets) + self.assertEqual(int(binascii.hexlify(record[2]), 16), packets) def send_packets(self, src_if=None, dst_if=None): if src_if is None: @@ -225,9 +227,9 @@ class MethodHolder(VppTestCase): if data_set is not None: for record in data: # skip flow if in/out gress interface is 0 - if int(record[10].encode('hex'), 16) == 0: + if int(binascii.hexlify(record[10]), 16) == 0: continue - if int(record[14].encode('hex'), 16) == 0: + if int(binascii.hexlify(record[14]), 16) == 0: continue for field in data_set: @@ -247,7 +249,7 @@ class MethodHolder(VppTestCase): else: ip = socket.inet_pton(socket.AF_INET6, ip_layer.src) - value = int(ip.encode('hex'), 16) + value = int(binascii.hexlify(ip), 16) elif value == 'dst_ip': if ip_ver == 'v4': ip = socket.inet_pton(socket.AF_INET, @@ -255,12 +257,13 @@ class MethodHolder(VppTestCase): else: ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst) - value = int(ip.encode('hex'), 16) + value = int(binascii.hexlify(ip), 16) elif value == 'sport': value = int(capture[0][UDP].sport) elif value == 'dport': value = int(capture[0][UDP].dport) - self.assertEqual(int(record[field].encode('hex'), 16), + self.assertEqual(int(binascii.hexlify( + record[field]), 16), value) def verify_cflow_data_notimer(self, decoder, capture, cflows): @@ -274,8 +277,10 @@ class MethodHolder(VppTestCase): for rec in data: p = capture[idx] idx += 1 - self.assertEqual(p[IP].len, int(rec[1].encode('hex'), 16)) - self.assertEqual(1, int(rec[2].encode('hex'), 16)) + self.assertEqual(p[IP].len, int( + binascii.hexlify(rec[1]), 16)) + self.assertEqual(1, int( + binascii.hexlify(rec[2]), 16)) self.assertEqual(len(capture), idx) def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1, @@ -411,25 +416,25 @@ class Flowprobe(MethodHolder): if cflow.haslayer(Data): record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0] # ingress interface - self.assertEqual(int(record[10].encode('hex'), 16), 8) + self.assertEqual(int(binascii.hexlify(record[10]), 16), 8) # egress interface - self.assertEqual(int(record[14].encode('hex'), 16), 9) + self.assertEqual(int(binascii.hexlify(record[14]), 16), 9) # packets - self.assertEqual(int(record[2].encode('hex'), 16), 1) + self.assertEqual(int(binascii.hexlify(record[2]), 16), 1) # src mac self.assertEqual(':'.join(re.findall('..', record[56].encode( 'hex'))), self.pg8.local_mac) # dst mac self.assertEqual(':'.join(re.findall('..', record[80].encode( 'hex'))), self.pg8.remote_mac) - flowTimestamp = int(record[156].encode('hex'), 16) >> 32 + flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32 # flow start timestamp self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1) - flowTimestamp = int(record[157].encode('hex'), 16) >> 32 + flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32 # flow end timestamp self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1) # ethernet type - self.assertEqual(int(record[256].encode('hex'), 16), 8) + self.assertEqual(int(binascii.hexlify(record[256]), 16), 8) # src ip self.assertEqual('.'.join(re.findall('..', record[8].encode( 'hex'))), @@ -441,13 +446,13 @@ class Flowprobe(MethodHolder): '.'.join('{:02x}'.format(int(n)) for n in "9.0.0.100".split('.'))) # protocol (TCP) - self.assertEqual(int(record[4].encode('hex'), 16), 6) + self.assertEqual(int(binascii.hexlify(record[4]), 16), 6) # src port - self.assertEqual(int(record[7].encode('hex'), 16), 1234) + self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234) # dst port - self.assertEqual(int(record[11].encode('hex'), 16), 4321) + self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321) # tcp flags - self.assertEqual(int(record[6].encode('hex'), 16), 80) + self.assertEqual(int(binascii.hexlify(record[6]), 16), 80) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0000") diff --git a/test/test_ip4.py b/test/test_ip4.py index bd44a0a7fc9..9f2c925ec4d 100644 --- a/test/test_ip4.py +++ b/test/test_ip4.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +import binascii import random import socket import unittest @@ -302,9 +303,8 @@ class TestIPv4FibCrud(VppTestCase): :return list: added ips with 32 prefix """ added_ips = [] - dest_addr = int(socket.inet_pton(socket.AF_INET, - start_dest_addr).encode('hex'), - 16) + dest_addr = int(binascii.hexlify(socket.inet_pton(socket.AF_INET, + start_dest_addr)), 16) dest_addr_len = 32 n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr) for _ in range(count): @@ -318,9 +318,8 @@ class TestIPv4FibCrud(VppTestCase): def unconfig_fib_many_to_one(self, start_dest_addr, next_hop_addr, count): removed_ips = [] - dest_addr = int(socket.inet_pton(socket.AF_INET, - start_dest_addr).encode('hex'), - 16) + dest_addr = int(binascii.hexlify(socket.inet_pton(socket.AF_INET, + start_dest_addr)), 16) dest_addr_len = 32 n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr) for _ in range(count): diff --git a/test/vpp_interface.py b/test/vpp_interface.py index 8639f8745c5..a464bf38984 100644 --- a/test/vpp_interface.py +++ b/test/vpp_interface.py @@ -1,3 +1,4 @@ +import binascii import socket from abc import abstractmethod, ABCMeta @@ -232,7 +233,7 @@ class VppInterface(object): if intf.sw_if_index == self.sw_if_index: self._name = intf.interface_name.split(b'\0', 1)[0] self._local_mac = \ - ':'.join(intf.l2_address.encode('hex')[i:i + 2] + ':'.join(binascii.hexlify(intf.l2_address)[i:i + 2] for i in range(0, 12, 2)) self._dump = intf break -- 2.16.6