Fix struct.error: expected bytes object got <class 'str'>
Change-Id: I837ae6e97e44c789a9372677151b157956525334
Signed-off-by: Paul Vinciguerra <pvinci@vinciconsulting.com>
15 files changed:
def get_lisp_locator_sets_dump_entry(self):
result = self.test.vapi.lisp_locator_set_dump()
for ls in result:
def get_lisp_locator_sets_dump_entry(self):
result = self.test.vapi.lisp_locator_set_dump()
for ls in result:
- if ls.ls_name.strip('\x00') == self._ls_name:
+ if ls.ls_name.strip(b'\x00') == self._ls_name:
else:
raise Exception('Unsupported EID format {!s}!'.format(eid))
else:
raise Exception('Unsupported EID format {!s}!'.format(eid))
+ @property
+ def packed(self):
if self.eid_type == LispEIDType.IP4:
return socket.inet_pton(socket.AF_INET, self.eid_address)
elif self.eid_type == LispEIDType.IP6:
if self.eid_type == LispEIDType.IP4:
return socket.inet_pton(socket.AF_INET, self.eid_address)
elif self.eid_type == LispEIDType.IP6:
def get_lisp_mapping_dump_entry(self):
return self.test.vapi.lisp_eid_table_dump(
def get_lisp_mapping_dump_entry(self):
return self.test.vapi.lisp_eid_table_dump(
- eid_set=1, prefix_length=self._eid.prefix_length,
- vni=self._vni, eid_type=self._eid.eid_type, eid=str(self._eid))
+ eid_set=1, prefix_length=self._eid.prefix_length,
+ vni=self._vni, eid_type=self._eid.eid_type,
+ eid=self._eid.packed)
def query_vpp_config(self):
mapping = self.get_lisp_mapping_dump_entry()
def query_vpp_config(self):
mapping = self.get_lisp_mapping_dump_entry()
def add_vpp_config(self):
self.test.vapi.lisp_add_del_local_eid(
ls_name=self._ls_name, eid_type=self._eid.eid_type,
def add_vpp_config(self):
self.test.vapi.lisp_add_del_local_eid(
ls_name=self._ls_name, eid_type=self._eid.eid_type,
- eid=str(self._eid), prefix_len=self._eid.prefix_length,
+ eid=self._eid.packed, prefix_len=self._eid.prefix_length,
vni=self._vni, key_id=self._key_id, key=self._key)
self._test.registry.register(self, self.test.logger)
def remove_vpp_config(self):
self.test.vapi.lisp_add_del_local_eid(
ls_name=self._ls_name, eid_type=self._eid.eid_type,
vni=self._vni, key_id=self._key_id, key=self._key)
self._test.registry.register(self, self.test.logger)
def remove_vpp_config(self):
self.test.vapi.lisp_add_del_local_eid(
ls_name=self._ls_name, eid_type=self._eid.eid_type,
- eid=str(self._eid), prefix_len=self._eid.prefix_length,
+ eid=self._eid.packed, prefix_len=self._eid.prefix_length,
vni=self._vni, is_add=0)
def object_id(self):
vni=self._vni, is_add=0)
def object_id(self):
def add_vpp_config(self):
self.test.vapi.lisp_add_del_remote_mapping(
rlocs=self._rlocs, eid_type=self._eid.eid_type,
def add_vpp_config(self):
self.test.vapi.lisp_add_del_remote_mapping(
rlocs=self._rlocs, eid_type=self._eid.eid_type,
- eid=str(self._eid), eid_prefix_len=self._eid.prefix_length,
+ eid=self._eid.packed, eid_prefix_len=self._eid.prefix_length,
vni=self._vni, rlocs_num=len(self._rlocs))
self._test.registry.register(self, self.test.logger)
def remove_vpp_config(self):
self.test.vapi.lisp_add_del_remote_mapping(
vni=self._vni, rlocs_num=len(self._rlocs))
self._test.registry.register(self, self.test.logger)
def remove_vpp_config(self):
self.test.vapi.lisp_add_del_remote_mapping(
- eid_type=self._eid.eid_type, eid=str(self._eid),
+ eid_type=self._eid.eid_type, eid=self._eid.packed,
eid_prefix_len=self._eid.prefix_length, vni=self._vni,
is_add=0, rlocs_num=0)
eid_prefix_len=self._eid.prefix_length, vni=self._vni,
is_add=0, rlocs_num=0)
def add_vpp_config(self):
self.test.vapi.lisp_add_del_adjacency(
def add_vpp_config(self):
self.test.vapi.lisp_add_del_adjacency(
- leid=str(self._leid),
- reid=str(self._reid), eid_type=self._leid.eid_type,
+ leid=self._leid.packed,
+ reid=self._reid.packed, eid_type=self._leid.eid_type,
leid_len=self._leid.prefix_length,
reid_len=self._reid.prefix_length, vni=self._vni)
self._test.registry.register(self, self.test.logger)
leid_len=self._leid.prefix_length,
reid_len=self._reid.prefix_length, vni=self._vni)
self._test.registry.register(self, self.test.logger)
if eid.prefix_length != prefix_len:
return False
if eid.prefix_length != prefix_len:
return False
- if str(eid) != eid_data[0:eid.data_length]:
+ if eid.packed != eid_data[0:eid.data_length]:
def remove_vpp_config(self):
self.test.vapi.lisp_add_del_adjacency(
def remove_vpp_config(self):
self.test.vapi.lisp_add_del_adjacency(
- leid=str(self._leid),
- reid=str(self._reid), eid_type=self._leid.eid_type,
+ leid=self._leid.packed,
+ reid=self._reid.packed, eid_type=self._leid.eid_type,
leid_len=self._leid.prefix_length,
reid_len=self._reid.prefix_length, vni=self._vni, is_add=0)
leid_len=self._leid.prefix_length,
reid_len=self._reid.prefix_length, vni=self._vni, is_add=0)
self.acl_index = 4294967295
def create_rule(self, is_ipv6=0, permit_deny=0, proto=-1,
self.acl_index = 4294967295
def create_rule(self, is_ipv6=0, permit_deny=0, proto=-1,
- s_prefix=0, s_ip='\x00\x00\x00\x00', sport_from=0,
- sport_to=65535, d_prefix=0, d_ip='\x00\x00\x00\x00',
+ s_prefix=0, s_ip=b'\x00\x00\x00\x00', sport_from=0,
+ sport_to=65535, d_prefix=0, d_ip=b'\x00\x00\x00\x00',
dport_from=0, dport_to=65535):
if proto == -1 or proto == 0:
sport_to = 0
dport_from=0, dport_to=65535):
if proto == -1 or proto == 0:
sport_to = 0
reply = self._test.vapi.acl_add_replace(self.acl_index,
r=rules,
reply = self._test.vapi.acl_add_replace(self.acl_index,
r=rules,
self.acl_index = reply.acl_index
return self.acl_index
self.acl_index = reply.acl_index
return self.acl_index
- policer = self.vapi.policer_add_del("ip4-punt", 400, 0, 10, 0,
+ policer = self.vapi.policer_add_del(b"ip4-punt", 400, 0, 10, 0,
rate_type=1)
self.vapi.ip_punt_police(policer.policer_index)
rate_type=1)
self.vapi.ip_punt_police(policer.policer_index)
# remove the poilcer. back to full rx
#
self.vapi.ip_punt_police(policer.policer_index, is_add=0)
# remove the poilcer. back to full rx
#
self.vapi.ip_punt_police(policer.policer_index, is_add=0)
- self.vapi.policer_add_del("ip4-punt", 400, 0, 10, 0,
+ self.vapi.policer_add_del(b"ip4-punt", 400, 0, 10, 0,
rate_type=1, is_add=0)
self.send_and_expect(self.pg0, pkts, self.pg1)
rate_type=1, is_add=0)
self.send_and_expect(self.pg0, pkts, self.pg1)
icmpv6_id = 0xb
icmpv6_seq = 5
icmpv6_id = 0xb
icmpv6_seq = 5
- icmpv6_data = '\x0a' * 18
+ icmpv6_data = b'\x0a' * 18
p_echo_request = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6,
p_echo_request = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6,
- policer = self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0,
+ policer = self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
rate_type=1)
self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)
rate_type=1)
self.vapi.ip_punt_police(policer.policer_index, is_ip6=1)
# remove the policer. back to full rx
#
self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
# remove the policer. back to full rx
#
self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
- self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0,
+ self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
rate_type=1, is_add=0)
self.send_and_expect(self.pg0, pkts, self.pg1)
rate_type=1, is_add=0)
self.send_and_expect(self.pg0, pkts, self.pg1)
test_cases = [
{
'name': 'basic ip4 over ip4',
test_cases = [
{
'name': 'basic ip4 over ip4',
- 'locator-sets': [VppLispLocatorSet(self, 'ls-4o4')],
+ 'locator-sets': [VppLispLocatorSet(self, b'ls-4o4')],
- VppLispLocator(self, self.pg1.sw_if_index, 'ls-4o4')
+ VppLispLocator(self, self.pg1.sw_if_index, b'ls-4o4')
- VppLocalMapping(self, self.seid_ip4, 'ls-4o4')
+ VppLocalMapping(self, self.seid_ip4, b'ls-4o4')
],
'remote-mappings': [
VppRemoteMapping(self, self.deid_ip4_net,
],
'remote-mappings': [
VppRemoteMapping(self, self.deid_ip4_net,
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, ICMP
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, ICMP
from framework import VppTestCase, VppTestRunner, running_extended_tests
from remote_test import RemoteClass, RemoteVppTestCase
from framework import VppTestCase, VppTestRunner, running_extended_tests
from remote_test import RemoteClass, RemoteVppTestCase
def _check_socket_filename(self, dump, socket_id, filename):
for d in dump:
if (d.socket_id == socket_id) and (
def _check_socket_filename(self, dump, socket_id, filename):
for d in dump:
if (d.socket_id == socket_id) and (
- d.socket_filename.rstrip("\0") == filename):
+ d.socket_filename.rstrip(b"\0") == filename):
dump = self.vapi.memif_socket_filename_dump()
self.assertTrue(
self._check_socket_filename(
dump = self.vapi.memif_socket_filename_dump()
self.assertTrue(
self._check_socket_filename(
- dump, 0, self.tempdir + "/memif.sock"))
+ dump, 0, b"%s/memif.sock" % six.ensure_binary(
+ self.tempdir, encoding='utf-8')))
memif_sockets = []
# existing path
memif_sockets.append(
VppSocketFilename(
memif_sockets = []
# existing path
memif_sockets.append(
VppSocketFilename(
- self, 1, self.tempdir + "/memif1.sock"))
+ self, 1, b"%s/memif1.sock" % six.ensure_binary(
+ self.tempdir, encoding='utf-8')))
# default path (test tempdir)
memif_sockets.append(
VppSocketFilename(
self,
2,
# default path (test tempdir)
memif_sockets.append(
VppSocketFilename(
self,
2,
add_default_folder=True))
# create new folder in default folder
memif_sockets.append(
VppSocketFilename(
self,
3,
add_default_folder=True))
# create new folder in default folder
memif_sockets.append(
VppSocketFilename(
self,
3,
add_default_folder=True))
for sock in memif_sockets:
add_default_folder=True))
for sock in memif_sockets:
dump = self.vapi.memif_socket_filename_dump()
self.assertTrue(
self._check_socket_filename(
dump = self.vapi.memif_socket_filename_dump()
self.assertTrue(
self._check_socket_filename(
- dump, 0, self.tempdir + "/memif.sock"))
+ dump, 0, b"%s/memif.sock" % six.ensure_binary(
+ self.tempdir, encoding='utf-8')))
def _create_delete_test_one_interface(self, memif):
memif.add_vpp_config()
def _create_delete_test_one_interface(self, memif):
memif.add_vpp_config()
self._create_delete_test_one_interface(memif)
def test_memif_create_custom_socket(self):
self._create_delete_test_one_interface(memif)
def test_memif_create_custom_socket(self):
- """ Memif create with non-default socket filname """
+ """ Memif create with non-default socket filename """
memif_sockets = []
# existing path
memif_sockets.append(
VppSocketFilename(
memif_sockets = []
# existing path
memif_sockets.append(
VppSocketFilename(
- self, 1, self.tempdir + "/memif1.sock"))
+ self, 1, b"%s/memif1.sock" % six.ensure_binary(
+ self.tempdir, encoding='utf-8')))
# default path (test tempdir)
memif_sockets.append(
VppSocketFilename(
self,
2,
# default path (test tempdir)
memif_sockets.append(
VppSocketFilename(
self,
2,
add_default_folder=True))
# create new folder in default folder
memif_sockets.append(
VppSocketFilename(
self,
3,
add_default_folder=True))
# create new folder in default folder
memif_sockets.append(
VppSocketFilename(
self,
3,
add_default_folder=True))
memif = VppMemif(self, MEMIF_ROLE.SLAVE, MEMIF_MODE.ETHERNET)
add_default_folder=True))
memif = VppMemif(self, MEMIF_ROLE.SLAVE, MEMIF_MODE.ETHERNET)
ring_size=1024, buffer_size=2048)
remote_socket = VppSocketFilename(self.remote_test, 1,
ring_size=1024, buffer_size=2048)
remote_socket = VppSocketFilename(self.remote_test, 1,
- self.tempdir + "/memif.sock")
+ b"%s/memif.sock" % six.ensure_binary(
+ self.tempdir, encoding='utf-8'))
remote_socket.add_vpp_config()
remote_memif = VppMemif(self.remote_test, MEMIF_ROLE.MASTER,
remote_socket.add_vpp_config()
remote_memif = VppMemif(self.remote_test, MEMIF_ROLE.MASTER,
memif = VppMemif(self, MEMIF_ROLE.SLAVE, MEMIF_MODE.ETHERNET)
remote_socket = VppSocketFilename(self.remote_test, 1,
memif = VppMemif(self, MEMIF_ROLE.SLAVE, MEMIF_MODE.ETHERNET)
remote_socket = VppSocketFilename(self.remote_test, 1,
- self.tempdir + "/memif.sock")
+ b"%s/memif.sock" % six.ensure_binary(
+ self.tempdir, encoding='utf-8'))
remote_socket.add_vpp_config()
remote_memif = VppMemif(self.remote_test, MEMIF_ROLE.MASTER,
remote_socket.add_vpp_config()
remote_memif = VppMemif(self.remote_test, MEMIF_ROLE.MASTER,
is_inside=0)
sm = self.vapi.nat44_static_mapping_dump()
self.assertEqual(len(sm), 1)
is_inside=0)
sm = self.vapi.nat44_static_mapping_dump()
self.assertEqual(len(sm), 1)
- self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
+ self.assertEqual((sm[0].tag).split(b'\0', 1)[0], b'')
self.assertEqual(sm[0].protocol, 0)
self.assertEqual(sm[0].local_port, 0)
self.assertEqual(sm[0].external_port, 0)
self.assertEqual(sm[0].protocol, 0)
self.assertEqual(sm[0].local_port, 0)
self.assertEqual(sm[0].external_port, 0)
self.tcp_port_out = 6303
self.udp_port_out = 6304
self.icmp_id_out = 6305
self.tcp_port_out = 6303
self.udp_port_out = 6304
self.icmp_id_out = 6305
self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
is_inside=0)
sm = self.vapi.nat44_static_mapping_dump()
self.assertEqual(len(sm), 1)
is_inside=0)
sm = self.vapi.nat44_static_mapping_dump()
self.assertEqual(len(sm), 1)
- self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
+ self.assertEqual((sm[0].tag).split(b'\0', 1)[0], tag)
# out2in
pkts = self.create_stream_out(self.pg1, nat_ip)
# out2in
pkts = self.create_stream_out(self.pg1, nat_ip)
def test_interface_addr_static_mapping(self):
""" Static mapping with addresses from interface """
def test_interface_addr_static_mapping(self):
""" Static mapping with addresses from interface """
self.vapi.nat44_add_del_interface_addr(self.pg7.sw_if_index)
self.nat44_add_static_mapping(
self.vapi.nat44_add_del_interface_addr(self.pg7.sw_if_index)
self.nat44_add_static_mapping(
self.assertEqual(1, len(static_mappings))
self.assertEqual(self.pg7.sw_if_index,
static_mappings[0].external_sw_if_index)
self.assertEqual(1, len(static_mappings))
self.assertEqual(self.pg7.sw_if_index,
static_mappings[0].external_sw_if_index)
- self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
+ self.assertEqual((static_mappings[0].tag).split(b'\0', 1)[0], tag)
# configure interface address and check static mappings
self.pg7.config_ip4()
# configure interface address and check static mappings
self.pg7.config_ip4()
if sm.external_sw_if_index == 0xFFFFFFFF:
self.assertEqual(sm.external_ip_address[0:4],
self.pg7.local_ip4n)
if sm.external_sw_if_index == 0xFFFFFFFF:
self.assertEqual(sm.external_ip_address[0:4],
self.pg7.local_ip4n)
- self.assertEqual((sm.tag).split('\0', 1)[0], tag)
+ self.assertEqual((sm.tag).split(b'\0', 1)[0], tag)
resolved = True
self.assertTrue(resolved)
resolved = True
self.assertTrue(resolved)
self.assertEqual(1, len(static_mappings))
self.assertEqual(self.pg7.sw_if_index,
static_mappings[0].external_sw_if_index)
self.assertEqual(1, len(static_mappings))
self.assertEqual(self.pg7.sw_if_index,
static_mappings[0].external_sw_if_index)
- self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
+ self.assertEqual((static_mappings[0].tag).split(b'\0', 1)[0], tag)
# configure interface address again and check static mappings
self.pg7.config_ip4()
# configure interface address again and check static mappings
self.pg7.config_ip4()
if sm.external_sw_if_index == 0xFFFFFFFF:
self.assertEqual(sm.external_ip_address[0:4],
self.pg7.local_ip4n)
if sm.external_sw_if_index == 0xFFFFFFFF:
self.assertEqual(sm.external_ip_address[0:4],
self.pg7.local_ip4n)
- self.assertEqual((sm.tag).split('\0', 1)[0], tag)
+ self.assertEqual((sm.tag).split(b'\0', 1)[0], tag)
resolved = True
self.assertTrue(resolved)
resolved = True
self.assertTrue(resolved)
cls.pg1.config_ip6()
cls.pg1.resolve_ndp()
cls.pg1.config_ip6()
cls.pg1.resolve_ndp()
- cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00' * 16,
+ cls.vapi.ip_add_del_route(is_ipv6=True, dst_address=b'\x00' * 16,
dst_address_length=0,
next_hop_address=cls.pg1.remote_ip6n,
next_hop_sw_if_index=cls.pg1.sw_if_index)
dst_address_length=0,
next_hop_address=cls.pg1.remote_ip6n,
next_hop_sw_if_index=cls.pg1.sw_if_index)
import binascii
import random
import socket
import binascii
import random
import socket
-import scapy.layers.inet6 as inet6
import threading
import struct
import threading
import struct
from struct import unpack, unpack_from
from struct import unpack, unpack_from
+
+try:
+ import unittest2 as unittest
+except ImportError:
+ import unittest
+
from util import ppp, ppc
from re import compile
import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP
from util import ppp, ppc
from re import compile
import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP
+import scapy.layers.inet6 as inet6
from scapy.layers.inet6 import IPv6, ICMPv6DestUnreach
from scapy.layers.inet6 import IPv6, ICMPv6DestUnreach
from framework import VppTestCase, VppTestRunner
from framework import VppTestCase, VppTestRunner
#
# configure a punt socket
#
#
# configure a punt socket
#
- self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111")
- self.vapi.punt_socket_register(2222, self.tempdir+"/socket_punt_2222")
+ self.vapi.punt_socket_register(1111, b"%s/socket_punt_1111" %
+ six.ensure_binary(self.tempdir))
+ self.vapi.punt_socket_register(2222, b"%s/socket_punt_2222" %
+ six.ensure_binary(self.tempdir))
punts = self.vapi.punt_socket_dump(is_ip6=0)
self.assertEqual(len(punts), 2)
self.assertEqual(punts[0].punt.l4_port, 1111)
punts = self.vapi.punt_socket_dump(is_ip6=0)
self.assertEqual(len(punts), 2)
self.assertEqual(punts[0].punt.l4_port, 1111)
#
# configure a punt socket again
#
#
# configure a punt socket again
#
- self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111")
- self.vapi.punt_socket_register(3333, self.tempdir+"/socket_punt_3333")
+ self.vapi.punt_socket_register(1111, b"%s/socket_punt_1111" %
+ six.ensure_binary(self.tempdir))
+ self.vapi.punt_socket_register(3333, b"%s/socket_punt_3333" %
+ six.ensure_binary(self.tempdir))
punts = self.vapi.punt_socket_dump(is_ip6=0)
self.assertEqual(len(punts), 3)
punts = self.vapi.punt_socket_dump(is_ip6=0)
self.assertEqual(len(punts), 3)
#
# configure a punt socket
#
#
# configure a punt socket
#
- self.socket_client_create(self.tempdir+"/socket_" + str(port))
- self.vapi.punt_socket_register(port, self.tempdir+"/socket_" +
- str(port))
+ self.socket_client_create(b"%s/socket_%d" % (
+ six.ensure_binary(self.tempdir), port))
+ self.vapi.punt_socket_register(port, b"%s/socket_%d" % (
+ six.ensure_binary(self.tempdir), port))
punts = self.vapi.punt_socket_dump(is_ip6=0)
self.assertEqual(len(punts), 1)
punts = self.vapi.punt_socket_dump(is_ip6=0)
self.assertEqual(len(punts), 1)
# configure a punt socket
#
for p in self.ports:
# configure a punt socket
#
for p in self.ports:
- self.socket_client_create(self.tempdir+"/socket_" + str(p))
- self.vapi.punt_socket_register(p, self.tempdir+"/socket_" + str(p))
+ self.socket_client_create(b"%s/socket_%d" % (
+ six.ensure_binary(self.tempdir), p))
+ self.vapi.punt_socket_register(p, b"%s/socket_%d" % (
+ six.ensure_binary(self.tempdir), p))
punts = self.vapi.punt_socket_dump(is_ip6=0)
self.assertEqual(len(punts), len(self.ports))
punts = self.vapi.punt_socket_dump(is_ip6=0)
self.assertEqual(len(punts), len(self.ports))
# configure a punt socket
#
# configure a punt socket
#
- self.socket_client_create(self.tempdir+"/socket_multi")
+ self.socket_client_create(b"%s/socket_multi" %
+ six.ensure_binary(self.tempdir))
- self.vapi.punt_socket_register(p, self.tempdir+"/socket_multi")
+ self.vapi.punt_socket_register(p,
+ b"%s/socket_multi" %
+ six.ensure_binary(self.tempdir))
punts = self.vapi.punt_socket_dump(is_ip6=0)
self.assertEqual(len(punts), len(self.ports))
punts = self.vapi.punt_socket_dump(is_ip6=0)
self.assertEqual(len(punts), len(self.ports))
#
# configure a punt socket
#
#
# configure a punt socket
#
- self.vapi.punt_socket_register(1111, self.tempdir+"/socket_1111",
+ self.vapi.punt_socket_register(1111, b"%s/socket_1111" %
+ six.ensure_binary(self.tempdir),
- self.vapi.punt_socket_register(2222, self.tempdir+"/socket_2222",
+ self.vapi.punt_socket_register(2222, b"%s/socket_2222" %
+ six.ensure_binary(self.tempdir),
is_ip4=0)
punts = self.vapi.punt_socket_dump(is_ip6=1)
self.assertEqual(len(punts), 2)
is_ip4=0)
punts = self.vapi.punt_socket_dump(is_ip6=1)
self.assertEqual(len(punts), 2)
#
# configure a punt socket again
#
#
# configure a punt socket again
#
- self.vapi.punt_socket_register(1111, self.tempdir+"/socket_1111",
+ self.vapi.punt_socket_register(1111, b"%s/socket_1111" %
+ six.ensure_binary(self.tempdir),
is_ip4=0)
punts = self.vapi.punt_socket_dump(is_ip6=1)
self.assertEqual(len(punts), 2)
is_ip4=0)
punts = self.vapi.punt_socket_dump(is_ip6=1)
self.assertEqual(len(punts), 2)
#
# configure a punt socket
#
#
# configure a punt socket
#
- self.socket_client_create(self.tempdir+"/socket_" + str(port))
- self.vapi.punt_socket_register(port, self.tempdir+"/socket_" +
- str(port), is_ip4=0)
+ self.socket_client_create(b"%s/socket_%d" % (
+ six.ensure_binary(self.tempdir), port))
+ self.vapi.punt_socket_register(port, b"%s/socket_%d" % (
+ six.ensure_binary(self.tempdir), port), is_ip4=0)
punts = self.vapi.punt_socket_dump(is_ip6=1)
self.assertEqual(len(punts), 1)
punts = self.vapi.punt_socket_dump(is_ip6=1)
self.assertEqual(len(punts), 1)
# configure a punt socket
#
for p in self.ports:
# configure a punt socket
#
for p in self.ports:
- self.socket_client_create(self.tempdir+"/socket_" + str(p))
- self.vapi.punt_socket_register(p, self.tempdir+"/socket_" + str(p),
- is_ip4=0)
+ self.socket_client_create(b"%s/socket_%d" % (
+ six.ensure_binary(self.tempdir), p))
+ self.vapi.punt_socket_register(p, b"%s/socket_%d" % (
+ six.ensure_binary(self.tempdir), p), is_ip4=0)
punts = self.vapi.punt_socket_dump(is_ip6=1)
self.assertEqual(len(punts), len(self.ports))
punts = self.vapi.punt_socket_dump(is_ip6=1)
self.assertEqual(len(punts), len(self.ports))
#
# configure a punt socket
#
#
# configure a punt socket
#
- self.socket_client_create(self.tempdir+"/socket_multi")
+ self.socket_client_create(b"%s/socket_multi" %
+ six.ensure_binary(self.tempdir))
- self.vapi.punt_socket_register(p, self.tempdir+"/socket_multi",
+ self.vapi.punt_socket_register(p,
+ b"%s/socket_multi" %
+ six.ensure_binary(self.tempdir),
is_ip4=0)
punts = self.vapi.punt_socket_dump(is_ip6=1)
self.assertEqual(len(punts), len(self.ports))
is_ip4=0)
punts = self.vapi.punt_socket_dump(is_ip6=1)
self.assertEqual(len(punts), len(self.ports))
table_id += 1
# Configure namespaces
table_id += 1
# Configure namespaces
- self.vapi.app_namespace_add_del(namespace_id="0",
+ self.vapi.app_namespace_add_del(namespace_id=b"0",
sw_if_index=self.loop0.sw_if_index)
sw_if_index=self.loop0.sw_if_index)
- self.vapi.app_namespace_add_del(namespace_id="1",
+ self.vapi.app_namespace_add_del(namespace_id=b"1",
sw_if_index=self.loop1.sw_if_index)
def tearDown(self):
sw_if_index=self.loop1.sw_if_index)
def tearDown(self):
table_id += 1
# Configure namespaces
table_id += 1
# Configure namespaces
- self.vapi.app_namespace_add_del(namespace_id="0",
+ self.vapi.app_namespace_add_del(namespace_id=b"0",
sw_if_index=self.loop0.sw_if_index)
sw_if_index=self.loop0.sw_if_index)
- self.vapi.app_namespace_add_del(namespace_id="1",
+ self.vapi.app_namespace_add_del(namespace_id=b"1",
sw_if_index=self.loop1.sw_if_index)
def tearDown(self):
sw_if_index=self.loop1.sw_if_index)
def tearDown(self):
table_id += 1
# Configure namespaces
table_id += 1
# Configure namespaces
- self.vapi.app_namespace_add_del(namespace_id="0",
+ self.vapi.app_namespace_add_del(namespace_id=b"0",
sw_if_index=self.loop0.sw_if_index)
sw_if_index=self.loop0.sw_if_index)
- self.vapi.app_namespace_add_del(namespace_id="1",
+ self.vapi.app_namespace_add_del(namespace_id=b"1",
sw_if_index=self.loop1.sw_if_index)
def tearDown(self):
sw_if_index=self.loop1.sw_if_index)
def tearDown(self):
table_id += 1
# Configure namespaces
table_id += 1
# Configure namespaces
- self.vapi.app_namespace_add_del(namespace_id="1", secret=1234,
+ self.vapi.app_namespace_add_del(namespace_id=b"1", secret=1234,
sw_if_index=self.loop0.sw_if_index)
sw_if_index=self.loop0.sw_if_index)
- self.vapi.app_namespace_add_del(namespace_id="2", secret=5678,
+ self.vapi.app_namespace_add_del(namespace_id=b"2", secret=5678,
sw_if_index=self.loop1.sw_if_index)
# Add inter-table routes
sw_if_index=self.loop1.sw_if_index)
# Add inter-table routes
table_id += 1
# Configure namespaces
table_id += 1
# Configure namespaces
- self.vapi.app_namespace_add_del(namespace_id="1", secret=1234,
+ self.vapi.app_namespace_add_del(namespace_id=b"1", secret=1234,
sw_if_index=self.loop0.sw_if_index)
sw_if_index=self.loop0.sw_if_index)
- self.vapi.app_namespace_add_del(namespace_id="2", secret=5678,
+ self.vapi.app_namespace_add_del(namespace_id=b"2", secret=5678,
sw_if_index=self.loop1.sw_if_index)
# Add inter-table routes
sw_if_index=self.loop1.sw_if_index)
# Add inter-table routes
self.logger.info("Vhost User add interfaces")
# create interface 1 (VirtualEthernet0/0/0)
self.logger.info("Vhost User add interfaces")
# create interface 1 (VirtualEthernet0/0/0)
- vhost_if1 = VppVhostInterface(self, sock_filename='/tmp/sock1')
+ vhost_if1 = VppVhostInterface(self, sock_filename=b'/tmp/sock1')
vhost_if1.add_vpp_config()
vhost_if1.admin_up()
# create interface 2 (VirtualEthernet0/0/1)
vhost_if1.add_vpp_config()
vhost_if1.admin_up()
# create interface 2 (VirtualEthernet0/0/1)
- vhost_if2 = VppVhostInterface(self, sock_filename='/tmp/sock2')
+ vhost_if2 = VppVhostInterface(self, sock_filename=b'/tmp/sock2')
vhost_if2.add_vpp_config()
vhost_if2.admin_up()
vhost_if2.add_vpp_config()
vhost_if2.admin_up()
# (like delete interface events from other tests)
self.vapi.collect_events()
# (like delete interface events from other tests)
self.vapi.collect_events()
- vhost_if = VppVhostInterface(self, sock_filename='/tmp/sock1')
+ vhost_if = VppVhostInterface(self, sock_filename=b'/tmp/sock1')
# create vhost interface
vhost_if.add_vpp_config()
# create vhost interface
vhost_if.add_vpp_config()
from vpp_object import VppObject
from vpp_object import VppObject
rv = self._test.vapi.memif_socket_filename_add_del(
1, self.socket_id, self.socket_filename)
if self.add_default_folder:
rv = self._test.vapi.memif_socket_filename_add_del(
1, self.socket_id, self.socket_filename)
if self.add_default_folder:
- self.socket_filename = self._test.tempdir + "/" \
- + self.socket_filename
+ self.socket_filename = b"%s/%s" % (
+ six.ensure_binary(self._test.tempdir, encoding='utf-8'),
+ self.socket_filename)
return rv
def remove_vpp_config(self):
return rv
def remove_vpp_config(self):
'is_add': 1, },
'mpls_tunnel_dump': {'sw_if_index': 4294967295, },
'nat44_add_del_address_range': {'is_add': 1, 'vrf_id': 4294967295, },
'is_add': 1, },
'mpls_tunnel_dump': {'sw_if_index': 4294967295, },
'nat44_add_del_address_range': {'is_add': 1, 'vrf_id': 4294967295, },
- 'nat44_add_del_identity_mapping': {'ip': '0', 'sw_if_index': 4294967295,
+ 'nat44_add_del_identity_mapping': {'ip': b'0', 'sw_if_index': 4294967295,
'addr_only': 1, 'is_add': 1, },
'nat44_add_del_interface_addr': {'is_add': 1, },
'nat44_add_del_lb_static_mapping': {'is_add': 1, },
'addr_only': 1, 'is_add': 1, },
'nat44_add_del_interface_addr': {'is_add': 1, },
'nat44_add_del_lb_static_mapping': {'is_add': 1, },
def nat44_add_del_identity_mapping(
self,
def nat44_add_del_identity_mapping(
self,
sw_if_index=0xFFFFFFFF,
port=0,
addr_only=1,
sw_if_index=0xFFFFFFFF,
port=0,
addr_only=1,