retries = 0
while True:
try:
- dir = self.ls(name)
- return self.dump(dir).values()[0]
+ d = self.ls(name)
+ s = self.dump(d)
+ if len(s) > 1:
+ raise AttributeError('Matches multiple counters {}'
+ .format(name))
+ k, v = s.popitem()
+ return v
except VPPStatsIOError as e:
if retries > 10:
return None
split = read.splitlines(True)
if len(stderr_fragment) > 0:
split[0] = "%s%s" % (stderr_fragment, split[0])
- if len(split) > 0 and split[-1].endswith("\n"):
+ if len(split) > 0 and split[-1].endswith(b"\n"):
limit = None
else:
limit = -1
if hasattr(cls, 'pump_thread_stop_flag'):
cls.pump_thread_stop_flag.set()
if hasattr(cls, 'pump_thread_wakeup_pipe'):
- os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up')
+ os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
if hasattr(cls, 'pump_thread'):
cls.logger.debug("Waiting for pump thread to stop")
cls.pump_thread.join()
stderr_log(single_line_delim)
stderr_log('VPP output to stderr while running %s:', cls.__name__)
stderr_log(single_line_delim)
- vpp_output = "".join(cls.vpp_stderr_deque)
+ vpp_output = "".join(str(cls.vpp_stderr_deque))
with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
f.write(vpp_output)
stderr_log('\n%s', vpp_output)
self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
self.result_parent_end, self.result_child_end = Pipe(duplex=False)
self.testcase_suite = testcase_suite
- self.stdouterr_queue = manager.StreamQueue()
+ if sys.version[0] == '2':
+ self.stdouterr_queue = manager.StreamQueue()
+ else:
+ from multiprocessing import get_context
+ self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
self.logger = get_parallel_logger(self.stdouterr_queue)
self.child = Process(target=test_runner_wrapper,
args=(testcase_suite,
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
-from util import mactobinary
+from vpp_mac import mactobinary
from vpp_bond_interface import VppBondInterface
from vpp_neighbor import VppNeighbor
from vpp_ip_route import find_route, VppIpTable
from util import mk_ll_addr
-
+from vpp_mac import mactobinary, binarytomac
from scapy.layers.l2 import Ether, getmacbyip, ARP
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6, in6_getnsmac
from socket import AF_INET, AF_INET6
from scapy.utils import inet_pton, inet_ntop
from scapy.utils6 import in6_ptop
-from util import mactobinary
DHCP4_CLIENT_PORT = 68
DHCP4_SERVER_PORT = 67
from socket import AF_INET, AF_INET6
from scapy.utils import inet_pton, inet_ntop
-from util import mactobinary
from vpp_papi_provider import L2_VTR_OP
from scapy.layers.inet import IP, UDP
from framework import VppTestCase, VppTestRunner
-from util import mactobinary
+from vpp_mac import mactobinary
from vpp_papi_provider import L2_PORT_TYPE
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable
from socket import AF_INET, AF_INET6, inet_pton
-import StringIO
+from util import reassemble4
+
""" Testipip is a subclass of VPPTestCase classes.
"""
-# Replace by deframent from scapy.
-def reassemble(listoffragments):
- buffer = StringIO.StringIO()
- first = listoffragments[0]
- buffer.seek(20)
- for pkt in listoffragments:
- buffer.seek(pkt[IP].frag*8)
- buffer.write(pkt[IP].payload)
- first.len = len(buffer.getvalue()) + 20
- first.flags = 0
- del(first.chksum)
- header = str(first[IP])[:20]
- return first[IP].__class__(header + buffer.getvalue())
-
-
class TestIPIP(VppTestCase):
""" IPIP Test Case """
i.admin_down()
def validate(self, rx, expected):
- self.assertEqual(rx, expected.__class__(str(expected)))
+ self.assertEqual(rx, expected.__class__(expected))
def generate_ip4_frags(self, payload_length, fragment_size):
p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
self.pg1.add_stream(frags)
self.pg_start()
rx = self.pg0.get_capture(6)
- reass_pkt = reassemble(rx)
+ reass_pkt = reassemble4(rx)
p4_reply.ttl -= 1
p4_reply.id = 256
self.validate(reass_pkt, p4_reply)
self.pg1.add_stream(frags)
self.pg_start()
rx = self.pg0.get_capture(2)
- reass_pkt = reassemble(rx)
+ reass_pkt = reassemble4(rx)
p4_reply.ttl -= 1
p4_reply.id = 512
self.validate(reass_pkt, p4_reply)
rv = self.vapi.ipip_del_tunnel(sw_if_index=self.tunnel_if_index)
def validate(self, rx, expected):
- self.assertEqual(rx, expected.__class__(str(expected)))
+ self.assertEqual(rx, expected.__class__(expected))
def generate_ip6_frags(self, payload_length, fragment_size):
p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
from framework import VppTestCase, VppTestRunner
-from util import Host, ppp, mactobinary
-from vpp_mac import VppMacAddress
+from util import Host, ppp
+from vpp_mac import VppMacAddress, mactobinary
from vpp_ip import VppIpAddress
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath
from socket import AF_INET, AF_INET6, inet_pton
-import StringIO
+from util import reassemble4
+
""" Test_mtu is a subclass of VPPTestCase classes.
MTU tests.
"""
-def reassemble(listoffragments):
- buffer = StringIO.StringIO()
- first = listoffragments[0]
- buffer.seek(20)
- for pkt in listoffragments:
- # pkt.show2()
- buffer.seek(pkt[IP].frag*8)
- buffer.write(pkt[IP].payload)
- first.len = len(buffer.getvalue()) + 20
- first.flags = 0
- del(first.chksum)
- header = str(first[IP])[:20]
- return first[IP].__class__(header + buffer.getvalue())
-
-
class TestMTU(VppTestCase):
""" MTU Test Case """
+ maxDiff = None
@classmethod
def setUpClass(cls):
i.admin_down()
def validate(self, rx, expected):
- self.assertEqual(rx, expected.__class__(str(expected)))
+ self.assertEqual(rx, expected.__class__(expected))
def validate_bytes(self, rx, expected):
self.assertEqual(rx, expected)
ttl=254, len=576, id=0) /
p_icmp4 / p_ip4 / p_payload)
icmp4_reply[1].ttl -= 1
- n = icmp4_reply.__class__(str(icmp4_reply))
- s = str(icmp4_reply)
+ n = icmp4_reply.__class__(icmp4_reply)
+ s = bytes(icmp4_reply)
icmp4_reply = s[0:576]
rx = self.send_and_expect(self.pg0, p4*11, self.pg0)
for p in rx:
# p.show2()
# n.show2()
- self.validate_bytes(str(p[1]), icmp4_reply)
+ self.validate_bytes(bytes(p[1]), icmp4_reply)
# Now with DF off. Expect fragments.
# First go with 1500 byte packets.
self.pg0.add_stream(p4*1)
self.pg_start()
rx = self.pg1.get_capture(3)
- reass_pkt = reassemble(rx)
+ reass_pkt = reassemble4(rx)
self.validate(reass_pkt, p4_reply)
'''
self.pg0.add_stream(p4*1)
self.pg_start()
rx = self.pg1.get_capture(16)
- reass_pkt = reassemble(rx)
+ reass_pkt = reassemble4(rx)
reass_pkt.show2()
p4_reply.show2()
self.validate(reass_pkt, p4_reply)
hlim=255, plen=1240) /
p_icmp6 / p_ip6 / p_payload)
icmp6_reply[2].hlim -= 1
- n = icmp6_reply.__class__(str(icmp6_reply))
- s = str(icmp6_reply)
+ n = icmp6_reply.__class__(icmp6_reply)
+ s = bytes(icmp6_reply)
icmp6_reply_str = s[0:1280]
rx = self.send_and_expect(self.pg0, p6*9, self.pg0)
for p in rx:
- self.validate_bytes(str(p[1]), icmp6_reply_str)
+ self.validate_bytes(bytes(p[1]), icmp6_reply_str)
# Reset MTU
self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index,
import socket
import unittest
import struct
-import StringIO
import random
from framework import VppTestCase, VppTestRunner, running_extended_tests
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
from time import sleep
from util import ip4_range
-from util import mactobinary
+from vpp_mac import mactobinary
from syslog_rfc5424_parser import SyslogMessage, ParseError
from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
from vpp_papi_provider import SYSLOG_SEVERITY
+from io import BytesIO
class MethodHolder(VppTestCase):
:returns: Reassembled IPv4 packet
"""
- buffer = StringIO.StringIO()
+ buffer = BytesIO()
for p in frags:
self.assertEqual(p[IP].src, src)
self.assertEqual(p[IP].dst, dst)
self.assert_ip_checksum_valid(p)
buffer.seek(p[IP].frag * 8)
- buffer.write(p[IP].payload)
+ buffer.write(bytes(p[IP].payload))
ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
proto=frags[0][IP].proto)
if ip.proto == IP_PROTOS.tcp:
:returns: Reassembled IPv6 packet
"""
- buffer = StringIO.StringIO()
+ buffer = BytesIO()
for p in frags:
self.assertEqual(p[IPv6].src, src)
self.assertEqual(p[IPv6].dst, dst)
buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
- buffer.write(p[IPv6ExtHdrFragment].payload)
+ buffer.write(bytes(p[IPv6ExtHdrFragment].payload))
ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
nh=frags[0][IPv6ExtHdrFragment].nh)
if ip.nh == IP_PROTOS.tcp:
from vpp_sub_interface import VppP2PSubint
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath
-from util import mactobinary
+from vpp_mac import mactobinary
class P2PEthernetAPI(VppTestCase):
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from scapy.volatile import RandMAC, RandIP
-
-from util import ppp, ppc, mactobinary
+from vpp_mac import mactobinary
+from util import ppp, ppc
import socket
--- /dev/null
+#!/usr/bin/env python
+"""Test framework utilitty functions tests"""
+
+import unittest
+from framework import VppTestCase, VppTestRunner
+from vpp_mac import mactobinary, binarytomac
+
+
+class TestUtil (VppTestCase):
+ """ MAC to binary and back """
+ def test_mac_to_binary(self):
+ mac = 'aa:bb:cc:dd:ee:ff'
+ b = mactobinary(mac)
+ mac2 = binarytomac(b)
+ self.assertEqual(type(mac), type(mac2))
+ self.assertEqual(mac2, mac)
+
+if __name__ == '__main__':
+ unittest.main(testRunner=VppTestRunner)
#!/usr/bin/env python
import socket
-from util import ip4n_range, ip4_range
+from util import ip4n_range, ip4_range, reassemble4
import unittest
from framework import VppTestCase, VppTestRunner
from template_bd import BridgeDomain
from scapy.layers.vxlan import VXLAN
from scapy.utils import atol
-import StringIO
-
-
-def reassemble(listoffragments):
- buffer = StringIO.StringIO()
- first = listoffragments[0]
- buffer.seek(20)
- for pkt in listoffragments:
- buffer.seek(pkt[IP].frag*8)
- buffer.write(pkt[IP].payload)
- first.len = len(buffer.getvalue()) + 20
- first.flags = 0
- del(first.chksum)
- header = str(first[IP])[:20]
- return first[IP].__class__(header + buffer.getvalue())
-
class TestVxlan(BridgeDomain, VppTestCase):
""" VXLAN Test Case """
# Pick first received frame and check if it's correctly encapsulated.
out = self.pg0.get_capture(2)
ether = out[0]
- pkt = reassemble(out)
+ pkt = reassemble4(out)
pkt = ether / pkt
self.check_encapsulation(pkt, self.single_tunnel_bd)
#!/usr/bin/env python
import socket
-from util import ip4_range
+from util import ip4_range, reassemble4_ether
import unittest
from framework import VppTestCase, VppTestRunner
from template_bd import BridgeDomain
from scapy.layers.vxlan import VXLAN
from scapy.utils import atol
-import StringIO
-
-
-def reassemble(listoffragments):
- buffer = StringIO.StringIO()
- first = listoffragments[0]
- buffer.seek(20)
- for pkt in listoffragments:
- buffer.seek(pkt[IP].frag*8)
- buffer.write(pkt[IP].payload)
- first.len = len(buffer.getvalue()) + 20
- first.flags = 0
- del(first.chksum)
- header = str(first[Ether])[:34]
- return first[Ether].__class__(header + buffer.getvalue())
-
class TestVxlanGbp(VppTestCase):
""" VXLAN GBP Test Case """
# Pick first received frame and check if it's correctly encapsulated.
out = self.pg0.get_capture(2)
- pkt = reassemble(out)
+ pkt = reassemble4_ether(out)
self.check_encapsulation(pkt, self.single_tunnel_bd)
payload = self.decapsulate(pkt)
import sys
import os.path
from abc import abstractmethod, ABCMeta
-from cStringIO import StringIO
from scapy.utils6 import in6_mactoifaceid
from scapy.layers.l2 import Ether
IPv6ExtHdrHopByHop
from scapy.utils import hexdump
from socket import AF_INET6
+from io import BytesIO
+from vpp_mac import mactobinary
def ppp(headline, packet):
""" Return string containing the output of scapy packet.show() call. """
- o = StringIO()
+ o = BytesIO()
old_stdout = sys.stdout
sys.stdout = o
print(headline)
for ip in ip4_range(ip4, s, e))
-def mactobinary(mac):
- """ Convert the : separated format into binary packet data for the API """
- return mac.replace(':', '').decode('hex')
-
-
def mk_ll_addr(mac):
euid = in6_mactoifaceid(mac)
addr = "fe80::" + euid
pkts[-1][IPv6ExtHdrFragment].m = 0 # reset more-flags in last fragment
return pkts
+
+
+def reassemble4_core(listoffragments, return_ip):
+ buffer = BytesIO()
+ first = listoffragments[0]
+ buffer.seek(20)
+ for pkt in listoffragments:
+ buffer.seek(pkt[IP].frag*8)
+ buffer.write(bytes(pkt[IP].payload))
+ first.len = len(buffer.getvalue()) + 20
+ first.flags = 0
+ del(first.chksum)
+ if return_ip:
+ header = bytes(first[IP])[:20]
+ return first[IP].__class__(header + buffer.getvalue())
+ else:
+ header = bytes(first[Ether])[:34]
+ return first[Ether].__class__(header + buffer.getvalue())
+
+
+def reassemble4_ether(listoffragments):
+ return reassemble4_core(listoffragments, False)
+
+
+def reassemble4(listoffragments):
+ return reassemble4_core(listoffragments, True)
from six import moves
-from util import Host, mk_ll_addr, mactobinary
+from util import Host, mk_ll_addr
+from vpp_mac import mactobinary, binarytomac
class VppInterface(object):
r = self.test.vapi.sw_interface_dump()
for intf in r:
if intf.sw_if_index == self.sw_if_index:
- self._name = intf.interface_name.split(b'\0', 1)[0]
- self._local_mac = \
- ':'.join(binascii.hexlify(intf.l2_address)[i:i + 2]
- for i in range(0, 12, 2))
+ self._name = intf.interface_name.split(b'\0',
+ 1)[0].decode('utf8')
+ self._local_mac = binarytomac(intf.l2_address)
self._dump = intf
break
else:
:param vrf_id: The FIB table / VRF ID. (Default value = 0)
"""
for host in self._remote_hosts:
- macn = host.mac.replace(":", "").decode('hex')
+ macn = mactobinary(host.mac)
ipn = host.ip4n
self.test.vapi.ip_neighbor_add_del(
self.sw_if_index, macn, ipn)
:param vrf_id: The FIB table / VRF ID. (Default value = 0)
"""
for host in self._remote_hosts:
- macn = host.mac.replace(":", "").decode('hex')
+ macn = mactobinary(host.mac)
ipn = host.ip6n
self.test.vapi.ip_neighbor_add_del(
self.sw_if_index, macn, ipn, is_ipv6=1)
"""
from vpp_object import *
-from util import mactobinary
from vpp_ip import VppIpAddress
-from vpp_mac import VppMacAddress
+from vpp_mac import VppMacAddress, mactobinary
from vpp_lo_interface import VppLoInterface
MAC Types
"""
+import binascii
-from util import mactobinary
+
+def mactobinary(mac):
+ """ Convert the : separated format into binary packet data for the API """
+ return binascii.unhexlify(mac.replace(':', ''))
+
+
+def binarytomac(binary):
+ """ Convert binary packed data in a : separated string """
+ x = b':'.join(binascii.hexlify(binary)[i:i + 2]
+ for i in range(0, 12, 2))
+ return str(x.decode('ascii'))
class VppMacAddress():
from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
from vpp_object import *
-from util import mactobinary
+from vpp_mac import mactobinary
def find_nbr(test, sw_if_index, ip_addr, is_static=0, inet=AF_INET, mac=None):
from collections import deque
from six import moves
-
+from vpp_mac import mactobinary
from hook import Hook
from vpp_l2 import L2_PORT_TYPE
"""
self.hook.before_cli(cli)
cli += '\n'
- r = self.papi.cli_inband(length=len(cli), cmd=cli)
+ r = self.papi.cli_inband(length=len(cli), cmd=str(cli).encode('utf8'))
self.hook.after_cli(cli)
if hasattr(r, 'reply'):
return r.reply.decode().rstrip('\x00')
return cli + "\n" + str(self.cli(cli))
def _convert_mac(self, mac):
- return mac.replace(':', '').decode('hex')
+ return mactobinary(mac)
def show_version(self):
""" """
:param is_static: (Default value = 0)
:param is_no_adj_fib: (Default value = 0)
"""
-
return self.api(
self.papi.ip_neighbor_add_del,
{'sw_if_index': sw_if_index,
from vpp_interface import VppInterface
import socket
-from util import mactobinary
+from vpp_mac import mactobinary
class VppPppoeInterface(VppInterface):