Change to binascii.hexlify() for consistent bahavior.
Change-Id: Ie430cdd1ffeb6510db4aa037546e42d85992093b
Signed-off-by: Paul Vinciguerra <pvinci@vinciconsulting.com>
#!/usr/bin/env python
"""ACL plugin - MACIP tests
"""
#!/usr/bin/env python
"""ACL plugin - MACIP tests
"""
import random
import re
import unittest
import random
import re
import unittest
rule = "DENY "
print " IP6" if r.is_ipv6 else " IP4", \
rule, \
rule = "DENY "
print " IP6" if r.is_ipv6 else " IP4", \
rule, \
- r.src_mac.encode('hex'), \
- r.src_mac_mask.encode('hex'),\
+ binascii.hexlify(r.src_mac), \
+ binascii.hexlify(r.src_mac_mask),\
unpack('<16B', r.src_ip_addr), \
r.src_ip_prefix_len
return acls
unpack('<16B', r.src_ip_addr), \
r.src_ip_prefix_len
return acls
# TODO : verify
# for acl in acls:
# for r in acl.r:
# TODO : verify
# for acl in acls:
# for r in acl.r:
- # print r.src_mac.encode('hex'), \
- # r.src_mac_mask.encode('hex'),\
+ # print binascii.hexlify(r.src_mac), \
+ # binascii.hexlify(r.src_mac_mask),\
# unpack('<16B', r.src_ip_addr), \
# r.src_ip_prefix_len
#
# unpack('<16B', r.src_ip_addr), \
# r.src_ip_prefix_len
#
-import unittest
-import socket
+import socket
+import unittest
from framework import VppTestCase, VppTestRunner
from framework import VppTestCase, VppTestRunner
:param int dst_port: destination port number "x"
"""
if src_ip:
:param int dst_port: destination port number "x"
"""
if src_ip:
- src_ip = socket.inet_aton(src_ip).encode('hex')
+ src_ip = binascii.hexlify(socket.inet_aton(src_ip))
- dst_ip = socket.inet_aton(dst_ip).encode('hex')
+ dst_ip = binascii.hexlify(socket.inet_aton(dst_ip))
return ('{:0>20}{:0>12}{:0>8}{:0>4}{:0>4}'.format(
hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
return ('{:0>20}{:0>12}{:0>8}{:0>4}{:0>4}'.format(
hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
:param int dst_port: destination port number "x"
"""
if src_ip:
:param int dst_port: destination port number "x"
"""
if src_ip:
- src_ip = socket.inet_pton(socket.AF_INET6, src_ip).encode('hex')
+ src_ip = binascii.hexlify(socket.inet_pton(
+ socket.AF_INET6, src_ip))
- dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).encode('hex')
+ dst_ip = binascii.hexlify(socket.inet_pton(
+ socket.AF_INET6, dst_ip))
return ('{:0>14}{:0>34}{:0>32}{:0>4}{:0>4}'.format(
hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
return ('{:0>14}{:0>34}{:0>32}{:0>4}{:0>4}'.format(
hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
import random
import socket
import unittest
import random
import socket
import unittest
if cflow.haslayer(Data):
data = decoder.decode_data_set(cflow.getlayer(Set))
for record in data:
if cflow.haslayer(Data):
data = decoder.decode_data_set(cflow.getlayer(Set))
for record in data:
- self.assertEqual(int(record[1].encode('hex'), 16), octets)
- self.assertEqual(int(record[2].encode('hex'), 16), packets)
+ self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
+ self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
def send_packets(self, src_if=None, dst_if=None):
if src_if is None:
def send_packets(self, src_if=None, dst_if=None):
if src_if is None:
if data_set is not None:
for record in data:
# skip flow if in/out gress interface is 0
if data_set is not None:
for record in data:
# skip flow if in/out gress interface is 0
- if int(record[10].encode('hex'), 16) == 0:
+ if int(binascii.hexlify(record[10]), 16) == 0:
- if int(record[14].encode('hex'), 16) == 0:
+ if int(binascii.hexlify(record[14]), 16) == 0:
continue
for field in data_set:
continue
for field in data_set:
else:
ip = socket.inet_pton(socket.AF_INET6,
ip_layer.src)
else:
ip = socket.inet_pton(socket.AF_INET6,
ip_layer.src)
- value = int(ip.encode('hex'), 16)
+ value = int(binascii.hexlify(ip), 16)
elif value == 'dst_ip':
if ip_ver == 'v4':
ip = socket.inet_pton(socket.AF_INET,
elif value == 'dst_ip':
if ip_ver == 'v4':
ip = socket.inet_pton(socket.AF_INET,
else:
ip = socket.inet_pton(socket.AF_INET6,
ip_layer.dst)
else:
ip = socket.inet_pton(socket.AF_INET6,
ip_layer.dst)
- value = int(ip.encode('hex'), 16)
+ value = int(binascii.hexlify(ip), 16)
elif value == 'sport':
value = int(capture[0][UDP].sport)
elif value == 'dport':
value = int(capture[0][UDP].dport)
elif value == 'sport':
value = int(capture[0][UDP].sport)
elif value == 'dport':
value = int(capture[0][UDP].dport)
- self.assertEqual(int(record[field].encode('hex'), 16),
+ self.assertEqual(int(binascii.hexlify(
+ record[field]), 16),
value)
def verify_cflow_data_notimer(self, decoder, capture, cflows):
value)
def verify_cflow_data_notimer(self, decoder, capture, cflows):
for rec in data:
p = capture[idx]
idx += 1
for rec in data:
p = capture[idx]
idx += 1
- self.assertEqual(p[IP].len, int(rec[1].encode('hex'), 16))
- self.assertEqual(1, int(rec[2].encode('hex'), 16))
+ self.assertEqual(p[IP].len, int(
+ binascii.hexlify(rec[1]), 16))
+ self.assertEqual(1, int(
+ binascii.hexlify(rec[2]), 16))
self.assertEqual(len(capture), idx)
def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1,
self.assertEqual(len(capture), idx)
def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1,
if cflow.haslayer(Data):
record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
# ingress interface
if cflow.haslayer(Data):
record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
# ingress interface
- self.assertEqual(int(record[10].encode('hex'), 16), 8)
+ self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
- self.assertEqual(int(record[14].encode('hex'), 16), 9)
+ self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
- self.assertEqual(int(record[2].encode('hex'), 16), 1)
+ self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
# src mac
self.assertEqual(':'.join(re.findall('..', record[56].encode(
'hex'))), self.pg8.local_mac)
# dst mac
self.assertEqual(':'.join(re.findall('..', record[80].encode(
'hex'))), self.pg8.remote_mac)
# src mac
self.assertEqual(':'.join(re.findall('..', record[56].encode(
'hex'))), self.pg8.local_mac)
# dst mac
self.assertEqual(':'.join(re.findall('..', record[80].encode(
'hex'))), self.pg8.remote_mac)
- flowTimestamp = int(record[156].encode('hex'), 16) >> 32
+ flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
# flow start timestamp
self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
# flow start timestamp
self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
- flowTimestamp = int(record[157].encode('hex'), 16) >> 32
+ flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
# flow end timestamp
self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
# ethernet type
# flow end timestamp
self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
# ethernet type
- self.assertEqual(int(record[256].encode('hex'), 16), 8)
+ self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
# src ip
self.assertEqual('.'.join(re.findall('..', record[8].encode(
'hex'))),
# src ip
self.assertEqual('.'.join(re.findall('..', record[8].encode(
'hex'))),
'.'.join('{:02x}'.format(int(n)) for n in
"9.0.0.100".split('.')))
# protocol (TCP)
'.'.join('{:02x}'.format(int(n)) for n in
"9.0.0.100".split('.')))
# protocol (TCP)
- self.assertEqual(int(record[4].encode('hex'), 16), 6)
+ self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
- self.assertEqual(int(record[7].encode('hex'), 16), 1234)
+ self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
- self.assertEqual(int(record[11].encode('hex'), 16), 4321)
+ self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
- self.assertEqual(int(record[6].encode('hex'), 16), 80)
+ self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0000")
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0000")
import random
import socket
import unittest
import random
import socket
import unittest
:return list: added ips with 32 prefix
"""
added_ips = []
:return list: added ips with 32 prefix
"""
added_ips = []
- dest_addr = int(socket.inet_pton(socket.AF_INET,
- start_dest_addr).encode('hex'),
- 16)
+ dest_addr = int(binascii.hexlify(socket.inet_pton(socket.AF_INET,
+ start_dest_addr)), 16)
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
def unconfig_fib_many_to_one(self, start_dest_addr, next_hop_addr, count):
removed_ips = []
def unconfig_fib_many_to_one(self, start_dest_addr, next_hop_addr, count):
removed_ips = []
- dest_addr = int(socket.inet_pton(socket.AF_INET,
- start_dest_addr).encode('hex'),
- 16)
+ dest_addr = int(binascii.hexlify(socket.inet_pton(socket.AF_INET,
+ start_dest_addr)), 16)
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
import socket
from abc import abstractmethod, ABCMeta
import socket
from abc import abstractmethod, ABCMeta
if intf.sw_if_index == self.sw_if_index:
self._name = intf.interface_name.split(b'\0', 1)[0]
self._local_mac = \
if intf.sw_if_index == self.sw_if_index:
self._name = intf.interface_name.split(b'\0', 1)[0]
self._local_mac = \
- ':'.join(intf.l2_address.encode('hex')[i:i + 2]
+ ':'.join(binascii.hexlify(intf.l2_address)[i:i + 2]
for i in range(0, 12, 2))
self._dump = intf
break
for i in range(0, 12, 2))
self._dump = intf
break