Change-Id: Icdf6abc9e53d33b26fd1d531c7dda6be0bb9cb55
Signed-off-by: Paul Vinciguerra <pvinci@vinciconsulting.com>
27 files changed:
from inspect import getdoc, isclass
from traceback import format_exception
from logging import FileHandler, DEBUG, Formatter
from inspect import getdoc, isclass
from traceback import format_exception
from logging import FileHandler, DEBUG, Formatter
from scapy.packet import Raw
from hook import StepHook, PollHook, VppDiedError
from vpp_pg_interface import VppPGInterface
from scapy.packet import Raw
from hook import StepHook, PollHook, VppDiedError
from vpp_pg_interface import VppPGInterface
if hasattr(cls, 'vpp'):
if hasattr(cls, 'vapi'):
if hasattr(cls, 'vpp'):
if hasattr(cls, 'vapi'):
+ cls.logger.debug("Disconnecting class vapi client on %s",
+ cls.__name__)
+ cls.logger.debug("Deleting class vapi attribute on %s",
+ cls.__name__)
del cls.vapi
cls.vpp.poll()
if cls.vpp.returncode is None:
del cls.vapi
cls.vpp.poll()
if cls.vpp.returncode is None:
cls.vpp.kill()
cls.logger.debug("Waiting for vpp to die")
cls.vpp.communicate()
cls.vpp.kill()
cls.logger.debug("Waiting for vpp to die")
cls.vpp.communicate()
+ cls.logger.debug("Deleting class vpp attribute on %s",
+ cls.__name__)
del cls.vpp
if cls.vpp_startup_failed:
del cls.vpp
if cls.vpp_startup_failed:
def tearDown(self):
""" Show various debug prints after each test """
def tearDown(self):
""" Show various debug prints after each test """
- super(VppTestCase, self).tearDown()
self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
(self.__class__.__name__, self._testMethodName,
self._testMethodDoc))
self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
(self.__class__.__name__, self._testMethodName,
self._testMethodDoc))
def assert_packet_checksums_valid(self, packet,
ignore_zero_udp_checksums=True):
def assert_packet_checksums_valid(self, packet,
ignore_zero_udp_checksums=True):
- received = packet.__class__(str(packet))
+ received = packet.__class__(scapy.compat.raw(packet))
self.logger.debug(
ppp("Verifying packet checksums for packet:", received))
udp_layers = ['UDP', 'UDPerror']
checksum_fields = ['cksum', 'chksum']
checksums = []
counter = 0
self.logger.debug(
ppp("Verifying packet checksums for packet:", received))
udp_layers = ['UDP', 'UDPerror']
checksum_fields = ['cksum', 'chksum']
checksums = []
counter = 0
- temp = received.__class__(str(received))
+ temp = received.__class__(scapy.compat.raw(received))
while True:
layer = temp.getlayer(counter)
if layer:
while True:
layer = temp.getlayer(counter)
if layer:
counter = counter + 1
if 0 == len(checksums):
return
counter = counter + 1
if 0 == len(checksums):
return
- temp = temp.__class__(str(temp))
+ temp = temp.__class__(scapy.compat.raw(temp))
for layer, cf in checksums:
calc_sum = getattr(temp[layer], cf)
self.assert_equal(
for layer, cf in checksums:
calc_sum = getattr(temp[layer], cf)
self.assert_equal(
received_packet_checksum = getattr(received_packet[layer], field_name)
if ignore_zero_checksum and 0 == received_packet_checksum:
return
received_packet_checksum = getattr(received_packet[layer], field_name)
if ignore_zero_checksum and 0 == received_packet_checksum:
return
- recalculated = received_packet.__class__(str(received_packet))
+ recalculated = received_packet.__class__(
+ scapy.compat.raw(received_packet))
delattr(recalculated[layer], field_name)
delattr(recalculated[layer], field_name)
- recalculated = recalculated.__class__(str(recalculated))
+ recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
self.assert_equal(received_packet_checksum,
getattr(recalculated[layer], field_name),
"packet checksum on layer: %s" % layer)
self.assert_equal(received_packet_checksum,
getattr(recalculated[layer], field_name),
"packet checksum on layer: %s" % layer)
from log import RED, single_line_delim, double_line_delim
import ipaddress
from subprocess import check_output, CalledProcessError
from log import RED, single_line_delim, double_line_delim
import ipaddress
from subprocess import check_output, CalledProcessError
from util import check_core_path, get_core_path
from util import check_core_path, get_core_path
return val
if len(val) == 6:
return '{!s} ({!s})'.format(val, ':'.join(['{:02x}'.format(
return val
if len(val) == 6:
return '{!s} ({!s})'.format(val, ':'.join(['{:02x}'.format(
- ord(x)) for x in val]))
+ scapy.compat.orb(x)) for x in val]))
try:
# we don't call test_type(val) because it is a packed value.
return '{!s} ({!s})'.format(val, str(
try:
# we don't call test_type(val) because it is a packed value.
return '{!s} ({!s})'.format(val, str(
from random import choice, shuffle
from pprint import pprint
from random import choice, shuffle
from pprint import pprint
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP, TCP
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP, TCP
# Scapy IPv6 stuff is too smart for its own good.
# So we do this and coerce the ICMP into unknown type
if packet.haslayer(UDP):
# Scapy IPv6 stuff is too smart for its own good.
# So we do this and coerce the ICMP into unknown type
if packet.haslayer(UDP):
- data = str(packet[UDP][Raw])
+ data = scapy.compat.raw(packet[UDP][Raw])
- data = str(ICMP(str(packet[l3].payload))[Raw])
+ data = scapy.compat.raw(ICMP(
+ scapy.compat.raw(packet[l3].payload))[Raw])
- data = str(ICMPv6Unknown(str(packet[l3].payload)).msgbody)
+ data = scapy.compat.raw(ICMPv6Unknown(
+ scapy.compat.raw(packet[l3].payload)).msgbody)
udp_or_icmp = packet[l3].payload
payload_info = self.payload_to_info(data)
packet_index = payload_info.index
udp_or_icmp = packet[l3].payload
payload_info = self.payload_to_info(data)
packet_index = payload_info.index
"""ACL plugin - MACIP tests
"""
import binascii
"""ACL plugin - MACIP tests
"""
import binascii
import random
from socket import inet_ntop, inet_pton, AF_INET, AF_INET6
from struct import pack, unpack
import re
import unittest
import random
from socket import inet_ntop, inet_pton, AF_INET, AF_INET6
from struct import pack, unpack
import re
import unittest
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
return acls
def create_rules(self, mac_type=EXACT_MAC, ip_type=EXACT_IP,
return acls
def create_rules(self, mac_type=EXACT_MAC, ip_type=EXACT_IP,
- acl_count=1, rules_count=[1]):
+ acl_count=1, rules_count=None):
+ if rules_count is None:
+ rules_count = [1]
src_mac = int("220000dead00", 16)
for acl in range(2, (acl_count+1) * 2):
rules = []
src_mac = int("220000dead00", 16)
for acl in range(2, (acl_count+1) * 2):
rules = []
ip4[3] = 0
ip6[8] = random.randint(100, 200)
ip6[15] = 0
ip4[3] = 0
ip6[8] = random.randint(100, 200)
ip6[15] = 0
for j in range(0, len(ip)):
ip_pack += pack('<B', int(ip[j]))
for j in range(0, len(ip)):
ip_pack += pack('<B', int(ip[j]))
'is_ipv6': is_ip6,
'src_ip_addr': ip_pack,
'src_ip_prefix_len': ip_len,
'is_ipv6': is_ip6,
'src_ip_addr': ip_pack,
'src_ip_prefix_len': ip_len,
- 'src_mac': mac.replace(':', '').decode('hex'),
- 'src_mac_mask': mask.replace(':', '').decode('hex')})
+ 'src_mac': binascii.unhexlify(mac.replace(':', '')),
+ 'src_mac_mask': binascii.unhexlify(
+ mask.replace(':', ''))})
rules.append(rule)
if ip_type == self.WILD_IP:
break
rules.append(rule)
if ip_type == self.WILD_IP:
break
def verify_macip_acls(self, acl_count, rules_count, expected_count=2):
reply = self.macip_acl_dump_debug()
for acl in range(2, (acl_count+1) * 2):
def verify_macip_acls(self, acl_count, rules_count, expected_count=2):
reply = self.macip_acl_dump_debug()
for acl in range(2, (acl_count+1) * 2):
- self.assertEqual(reply[acl - 2].count, rules_count[acl/2-1])
+ self.assertEqual(reply[acl - 2].count, rules_count[acl//2-1])
self.vapi.macip_acl_interface_get()
self.vapi.macip_acl_interface_get()
sub_ip[15] = random.randint(200, 255)
elif ip_type == self.SUBNET_IP:
if denyIP:
sub_ip[15] = random.randint(200, 255)
elif ip_type == self.SUBNET_IP:
if denyIP:
- sub_ip[2] = str(int(sub_ip[2]) + 1)
+ sub_ip[2] = int(sub_ip[2]) + 1
sub_ip[14] = random.randint(100, 199)
sub_ip[15] = random.randint(200, 255)
sub_ip[14] = random.randint(100, 199)
sub_ip[15] = random.randint(200, 255)
- src_ip6 = inet_ntop(AF_INET6, str(bytearray(sub_ip)))
+ packed_src_ip6 = b''.join(
+ [scapy.compat.chb(x) for x in sub_ip])
+ src_ip6 = inet_ntop(AF_INET6, packed_src_ip6)
packet /= IPv6(src=src_ip6, dst=dst_ip6)
else:
if ip_type != self.EXACT_IP:
sub_ip = ip_rule.split('.')
if ip_type == self.WILD_IP:
packet /= IPv6(src=src_ip6, dst=dst_ip6)
else:
if ip_type != self.EXACT_IP:
sub_ip = ip_rule.split('.')
if ip_type == self.WILD_IP:
- sub_ip[0] = str(random.randint(1, 49))
- sub_ip[1] = str(random.randint(50, 99))
- sub_ip[2] = str(random.randint(100, 199))
- sub_ip[3] = str(random.randint(200, 255))
+ sub_ip[0] = random.randint(1, 49)
+ sub_ip[1] = random.randint(50, 99)
+ sub_ip[2] = random.randint(100, 199)
+ sub_ip[3] = random.randint(200, 255)
elif ip_type == self.SUBNET_IP:
if denyIP:
elif ip_type == self.SUBNET_IP:
if denyIP:
- sub_ip[1] = str(int(sub_ip[1])+1)
- sub_ip[2] = str(random.randint(100, 199))
- sub_ip[3] = str(random.randint(200, 255))
- src_ip4 = ".".join(sub_ip)
+ sub_ip[1] = int(sub_ip[1])+1
+ sub_ip[2] = random.randint(100, 199)
+ sub_ip[3] = random.randint(200, 255)
+ src_ip4 = '.'.join(['{!s}'.format(x) for x in sub_ip])
packet /= IP(src=src_ip4, dst=dst_ip4, frag=0, flags=0)
packet /= UDP(sport=src_port, dport=dst_port)/Raw(payload)
packet /= IP(src=src_ip4, dst=dst_ip4, frag=0, flags=0)
packet /= UDP(sport=src_port, dport=dst_port)/Raw(payload)
- packet[Raw].load += " mac:"+src_mac
+ packet[Raw].load += b" mac:%s" % scapy.compat.raw(src_mac)
size = self.pg_if_packet_sizes[p % len(self.pg_if_packet_sizes)]
if isinstance(src_if, VppSubInterface):
size = self.pg_if_packet_sizes[p % len(self.pg_if_packet_sizes)]
if isinstance(src_if, VppSubInterface):
sub_ip = list(unpack('<16B', inet_pton(AF_INET6, ip)))
for i in range(8, 16):
sub_ip[i] = 0
sub_ip = list(unpack('<16B', inet_pton(AF_INET6, ip)))
for i in range(8, 16):
sub_ip[i] = 0
- ip = inet_ntop(AF_INET6, str(bytearray(sub_ip)))
+ packed_ip = b''.join(
+ [scapy.compat.chb(x) for x in sub_ip])
+ ip = inet_ntop(AF_INET6, packed_ip)
else:
if ip_type == self.WILD_IP:
ip = "0.0.0.0"
else:
if ip_type == self.WILD_IP:
ip = "0.0.0.0"
'is_ipv6': is_ip6,
'src_ip_addr': ip_rule,
'src_ip_prefix_len': prefix_len,
'is_ipv6': is_ip6,
'src_ip_addr': ip_rule,
'src_ip_prefix_len': prefix_len,
- 'src_mac': mac_rule.replace(':', '').decode('hex'),
- 'src_mac_mask': mac_mask.replace(':', '').decode('hex')})
+ 'src_mac': binascii.unhexlify(mac_rule.replace(':', '')),
+ 'src_mac_mask': binascii.unhexlify(
+ mac_mask.replace(':', ''))})
macip_rules.append(macip_rule)
# deny all other packets
macip_rules.append(macip_rule)
# deny all other packets
from struct import pack, unpack
from six import moves
from struct import pack, unpack
from six import moves
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
while conf_key_id in self._conf_key_ids:
conf_key_id = randint(0, 0xFFFFFFFF)
self._conf_key_ids[conf_key_id] = 1
while conf_key_id in self._conf_key_ids:
conf_key_id = randint(0, 0xFFFFFFFF)
self._conf_key_ids[conf_key_id] = 1
- key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
+ key = scapy.compat.raw(
+ bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
return VppBFDAuthKey(test=test, auth_type=auth_type,
conf_key_id=conf_key_id, key=key)
return VppBFDAuthKey(test=test, auth_type=auth_type,
conf_key_id=conf_key_id, key=key)
self.test.logger.debug("BFD: Creating packet")
self.fill_packet_fields(packet)
if self.sha1_key:
self.test.logger.debug("BFD: Creating packet")
self.fill_packet_fields(packet)
if self.sha1_key:
- hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
+ hash_material = scapy.compat.raw(
+ packet[BFD])[:32] + self.sha1_key.key + \
"\0" * (20 - len(self.sha1_key.key))
self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
hashlib.sha1(hash_material).hexdigest())
"\0" * (20 - len(self.sha1_key.key))
self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
hashlib.sha1(hash_material).hexdigest())
# last 20 bytes represent the hash - so replace them with the key,
# pad the result with zeros and hash the result
hash_material = bfd.original[:-20] + self.sha1_key.key + \
# last 20 bytes represent the hash - so replace them with the key,
# pad the result with zeros and hash the result
hash_material = bfd.original[:-20] + self.sha1_key.key + \
- "\0" * (20 - len(self.sha1_key.key))
+ b"\0" * (20 - len(self.sha1_key.key))
expected_hash = hashlib.sha1(hash_material).hexdigest()
self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
expected_hash, "Auth key hash")
expected_hash = hashlib.sha1(hash_material).hexdigest()
self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
expected_hash, "Auth key hash")
# halve required min rx
old_required_min_rx = self.vpp_session.required_min_rx
self.vpp_session.modify_parameters(
# halve required min rx
old_required_min_rx = self.vpp_session.required_min_rx
self.vpp_session.modify_parameters(
- required_min_rx=0.5 * self.vpp_session.required_min_rx)
+ required_min_rx=self.vpp_session.required_min_rx // 2)
# now we wait 0.8*3*old-req-min-rx and the session should still be up
self.sleep(0.8 * self.vpp_session.detect_mult *
old_required_min_rx / USEC_IN_SEC,
# now we wait 0.8*3*old-req-min-rx and the session should still be up
self.sleep(0.8 * self.vpp_session.detect_mult *
old_required_min_rx / USEC_IN_SEC,
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
- self.assertEqual(str(p[UDP].payload),
- str(echo_packet[UDP].payload),
+ self.assertEqual(scapy.compat.raw(p[UDP].payload),
+ scapy.compat.raw(echo_packet[UDP].payload),
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
- self.assertEqual(str(p[UDP].payload),
- str(echo_packet[UDP].payload),
+ self.assertEqual(scapy.compat.raw(p[UDP].payload),
+ scapy.compat.raw(echo_packet[UDP].payload),
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
def cli_verify_no_response(self, cli):
""" execute a CLI, asserting that the response is empty """
self.assert_equal(self.vapi.cli(cli),
def cli_verify_no_response(self, cli):
""" execute a CLI, asserting that the response is empty """
self.assert_equal(self.vapi.cli(cli),
"CLI command response")
def cli_verify_response(self, cli, expected):
"CLI command response")
def cli_verify_response(self, cli, expected):
self.cli_verify_no_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
self.cli_verify_no_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(ord(c)) for c in k.key)))
+ "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(
self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(
self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(ord(c)) for c in k2.key)),
+ "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work
self.cli_verify_no_response(
"bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
(k.conf_key_id,
self.cli_verify_no_response(
"bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(ord(c)) for c in k.key)))
+ "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(self, self.pg0,
self.pg0.remote_ip6, af=AF_INET6,
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(self, self.pg0,
self.pg0.remote_ip6, af=AF_INET6,
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(ord(c)) for c in k2.key)),
+ "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work
VppBierDispTable, VppBierTable, VppBierTableID, VppBierRoute
from vpp_udp_encap import VppUdpEncap
VppBierDispTable, VppBierTable, VppBierTableID, VppBierRoute
from vpp_udp_encap import VppUdpEncap
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
for pkt_size in pkt_sizes:
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
MPLS(label=77, ttl=255) /
for pkt_size in pkt_sizes:
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
MPLS(label=77, ttl=255) /
- BIER(length=hdr_len_id, BitString=chr(255)*n_bytes) /
+ BIER(length=hdr_len_id,
+ BitString=scapy.compat.chb(255)*n_bytes) /
IPv6(src=self.pg0.remote_ip6, dst=self.pg0.remote_ip6) /
UDP(sport=1234, dport=1234) /
IPv6(src=self.pg0.remote_ip6, dst=self.pg0.remote_ip6) /
UDP(sport=1234, dport=1234) /
- Raw(chr(5) * pkt_size))
+ Raw(scapy.compat.chb(5) * pkt_size))
pkts = p
self.pg0.add_stream(pkts)
pkts = p
self.pg0.add_stream(pkts)
self.assertEqual(bier_hdr.Proto, 5)
# The bit-string should consist only of the BP given by i.
self.assertEqual(bier_hdr.Proto, 5)
# The bit-string should consist only of the BP given by i.
- byte_array = ['\0'] * (n_bytes)
- byte_val = chr(1 << (bp - 1) % 8)
- byte_pos = n_bytes - (((bp - 1) / 8) + 1)
+ byte_array = [b'\0'] * (n_bytes)
+ byte_val = scapy.compat.chb(1 << (bp - 1) % 8)
+ byte_pos = n_bytes - (((bp - 1) // 8) + 1)
byte_array[byte_pos] = byte_val
byte_array[byte_pos] = byte_val
- bitstring = ''.join(byte_array)
+ bitstring = ''.join([scapy.compat.chb(x) for x in byte_array])
self.assertEqual(len(bitstring), len(bier_hdr.BitString))
self.assertEqual(bitstring, bier_hdr.BitString)
#
# cleanup. not strictly necessary, but it's much quicker this way
self.assertEqual(len(bitstring), len(bier_hdr.BitString))
self.assertEqual(bitstring, bier_hdr.BitString)
#
# cleanup. not strictly necessary, but it's much quicker this way
- # becuase the bier_fib_dump and ip_fib_dump will be empty when the
+ # because the bier_fib_dump and ip_fib_dump will be empty when the
# auto-cleanup kicks in
#
for br in bier_routes:
# auto-cleanup kicks in
#
for br in bier_routes:
MPLS(label=77, ttl=255) /
BIER(length=BIERLength.BIER_LEN_64,
entropy=ii,
MPLS(label=77, ttl=255) /
BIER(length=BIERLength.BIER_LEN_64,
entropy=ii,
- BitString=chr(255)*16) /
+ BitString=scapy.compat.chb(255)*16) /
IPv6(src=self.pg0.remote_ip6,
dst=self.pg0.remote_ip6) /
UDP(sport=1234, dport=1234) /
IPv6(src=self.pg0.remote_ip6,
dst=self.pg0.remote_ip6) /
UDP(sport=1234, dport=1234) /
#
# An imposition object with both bit-positions set
#
#
# An imposition object with both bit-positions set
#
- bi = VppBierImp(self, bti, 333, chr(0x3) * 32)
+ bi = VppBierImp(self, bti, 333, scapy.compat.chb(0x3) * 32)
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
MPLS(label=77, ttl=255) /
BIER(length=BIERLength.BIER_LEN_256,
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
MPLS(label=77, ttl=255) /
BIER(length=BIERLength.BIER_LEN_256,
+ BitString=scapy.compat.chb(255)*32,
BFRID=99) /
IP(src="1.1.1.1", dst="232.1.1.1") /
UDP(sport=1234, dport=1234) /
BFRID=99) /
IP(src="1.1.1.1", dst="232.1.1.1") /
UDP(sport=1234, dport=1234) /
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
MPLS(label=77, ttl=255) /
BIER(length=BIERLength.BIER_LEN_256,
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
MPLS(label=77, ttl=255) /
BIER(length=BIERLength.BIER_LEN_256,
+ BitString=scapy.compat.chb(255)*32,
BFRID=77) /
IP(src="1.1.1.1", dst="232.1.1.1") /
UDP(sport=1234, dport=1234) /
BFRID=77) /
IP(src="1.1.1.1", dst="232.1.1.1") /
UDP(sport=1234, dport=1234) /
# A multicast route to forward post BIER disposition that needs
# a check against sending back into the BIER core
#
# A multicast route to forward post BIER disposition that needs
# a check against sending back into the BIER core
#
- bi = VppBierImp(self, bti, 333, chr(0x3) * 32)
+ bi = VppBierImp(self, bti, 333, scapy.compat.chb(0x3) * 32)
bi.add_vpp_config()
route_eg_232_1_1_2 = VppIpMRoute(
bi.add_vpp_config()
route_eg_232_1_1_2 = VppIpMRoute(
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
MPLS(label=77, ttl=255) /
BIER(length=BIERLength.BIER_LEN_256,
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
MPLS(label=77, ttl=255) /
BIER(length=BIERLength.BIER_LEN_256,
+ BitString=scapy.compat.chb(255)*32,
BFRID=77) /
IP(src="1.1.1.1", dst="232.1.1.2") /
UDP(sport=1234, dport=1234) /
BFRID=77) /
IP(src="1.1.1.1", dst="232.1.1.2") /
UDP(sport=1234, dport=1234) /
bt = VppBierTable(self, bti, 77)
bt.add_vpp_config()
bt = VppBierTable(self, bti, 77)
bt.add_vpp_config()
- lowest = ['\0'] * (n_bytes)
- lowest[-1] = chr(1)
- highest = ['\0'] * (n_bytes)
- highest[0] = chr(128)
+ lowest = [b'\0'] * (n_bytes)
+ lowest[-1] = scapy.compat.chb(1)
+ highest = [b'\0'] * (n_bytes)
+ highest[0] = scapy.compat.chb(128)
#
# Impostion Sets bit strings
#
# Impostion Sets bit strings
src=self.pg0.remote_mac) /
IP(src="1.1.1.1", dst="232.1.1.1") /
UDP(sport=1234, dport=1234) /
src=self.pg0.remote_mac) /
IP(src="1.1.1.1", dst="232.1.1.1") /
UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(5) * 32))
rx = self.send_and_expect(self.pg0, p*65, self.pg1)
rx = self.send_and_expect(self.pg0, p*65, self.pg1)
src=self.pg0.remote_mac) /
IP(src="1.1.1.1", dst="232.1.1.2") /
UDP(sport=1234, dport=1234) /
src=self.pg0.remote_mac) /
IP(src="1.1.1.1", dst="232.1.1.2") /
UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(5) * 512))
rx = self.send_and_expect(self.pg0, p*65, self.pg1)
self.assertEqual(rx[0][IP].src, "1.1.1.1")
rx = self.send_and_expect(self.pg0, p*65, self.pg1)
self.assertEqual(rx[0][IP].src, "1.1.1.1")
# only use the second, but creating 2 tests with a non-zero
# value index in the route add
#
# only use the second, but creating 2 tests with a non-zero
# value index in the route add
#
- bi = VppBierImp(self, bti, 333, chr(0xff) * 32)
+ bi = VppBierImp(self, bti, 333, scapy.compat.chb(0xff) * 32)
- bi2 = VppBierImp(self, bti, 334, chr(0xff) * 32)
+ bi2 = VppBierImp(self, bti, 334, scapy.compat.chb(0xff) * 32)
UDP(sport=333, dport=8138) /
BIFT(sd=1, set=0, bsl=2, ttl=255) /
BIER(length=BIERLength.BIER_LEN_256,
UDP(sport=333, dport=8138) /
BIFT(sd=1, set=0, bsl=2, ttl=255) /
BIER(length=BIERLength.BIER_LEN_256,
+ BitString=scapy.compat.chb(255)*32,
BFRID=99) /
IP(src="1.1.1.1", dst="232.1.1.1") /
UDP(sport=1234, dport=1234) /
BFRID=99) /
IP(src="1.1.1.1", dst="232.1.1.1") /
UDP(sport=1234, dport=1234) /
super(TestCDP, cls).tearDownClass()
raise
super(TestCDP, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestCDP, cls).tearDownClass()
+
def test_enable_cdp(self):
self.logger.info(self.vapi.cli("cdp enable"))
ret = self.vapi.cli("show cdp")
def test_enable_cdp(self):
self.logger.info(self.vapi.cli("cdp enable"))
ret = self.vapi.cli("show cdp")
from vpp_neighbor import VppNeighbor
from vpp_ip_route import find_route, VppIpTable
from util import mk_ll_addr
from vpp_neighbor import VppNeighbor
from vpp_ip_route import find_route, VppIpTable
from util import mk_ll_addr
from scapy.layers.l2 import Ether, getmacbyip, ARP
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6, in6_getnsmac
from scapy.layers.l2 import Ether, getmacbyip, ARP
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6, in6_getnsmac
#
# 1. not our IP address = not checked by VPP? so offer is replayed
# to client
#
# 1. not our IP address = not checked by VPP? so offer is replayed
# to client
- bad_ip = option_82[0:8] + chr(33) + option_82[9:]
+ bad_ip = option_82[0:8] + scapy.compat.chb(33) + option_82[9:]
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
"DHCP offer option 82 bad address")
# 2. Not a sw_if_index VPP knows
"DHCP offer option 82 bad address")
# 2. Not a sw_if_index VPP knows
- bad_if_index = option_82[0:2] + chr(33) + option_82[3:]
+ bad_if_index = option_82[0:2] + scapy.compat.chb(33) + option_82[3:]
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
vx_tun_l3.add_vpp_config()
#
vx_tun_l3.add_vpp_config()
#
- # packets destined to unkown addresses in the BVI's subnet
+ # packets destined to unknown addresses in the BVI's subnet
# are ARP'd for
#
p4 = (Ether(src=self.pg0.remote_mac, dst=str(self.router_mac)) /
# are ARP'd for
#
p4 = (Ether(src=self.pg0.remote_mac, dst=str(self.router_mac)) /
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q, GRE
from scapy.layers.inet import IP, UDP
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q, GRE
from scapy.layers.inet import IP, UDP
GRE() /
Ether(dst=RandMAC('*:*:*:*:*:*'),
src=RandMAC('*:*:*:*:*:*')) /
GRE() /
Ether(dst=RandMAC('*:*:*:*:*:*'),
src=RandMAC('*:*:*:*:*:*')) /
- IP(src=str(RandIP()), dst=str(RandIP())) /
+ IP(src=scapy.compat.raw(RandIP()),
+ dst=scapy.compat.raw(RandIP())) /
UDP(sport=1234, dport=1234) /
Raw(payload))
info.data = p.copy()
UDP(sport=1234, dport=1234) /
Raw(payload))
info.data = p.copy()
Ether(dst=RandMAC('*:*:*:*:*:*'),
src=RandMAC('*:*:*:*:*:*')) /
Dot1Q(vlan=vlan) /
Ether(dst=RandMAC('*:*:*:*:*:*'),
src=RandMAC('*:*:*:*:*:*')) /
Dot1Q(vlan=vlan) /
- IP(src=str(RandIP()), dst=str(RandIP())) /
+ IP(src=scapy.compat.raw(RandIP()),
+ dst=scapy.compat.raw(RandIP())) /
UDP(sport=1234, dport=1234) /
Raw(payload))
info.data = p.copy()
UDP(sport=1234, dport=1234) /
Raw(payload))
info.data = p.copy()
self.assertEqual(rx_ip.src, tunnel_src)
self.assertEqual(rx_ip.dst, tunnel_dst)
self.assertEqual(rx_ip.src, tunnel_src)
self.assertEqual(rx_ip.dst, tunnel_dst)
- rx_gre = GRE(str(rx_ip[IPv6].payload))
+ rx_gre = GRE(scapy.compat.raw(rx_ip[IPv6].payload))
rx_ip = rx_gre[IPv6]
self.assertEqual(rx_ip.src, tx_ip.src)
rx_ip = rx_gre[IPv6]
self.assertEqual(rx_ip.src, tx_ip.src)
self.assertEqual(rx_ip.src, tunnel_src)
self.assertEqual(rx_ip.dst, tunnel_dst)
self.assertEqual(rx_ip.src, tunnel_src)
self.assertEqual(rx_ip.dst, tunnel_dst)
- rx_gre = GRE(str(rx_ip[IPv6].payload))
+ rx_gre = GRE(scapy.compat.raw(rx_ip[IPv6].payload))
tx_ip = tx[IP]
rx_ip = rx_gre[IP]
tx_ip = tx[IP]
rx_ip = rx_gre[IP]
self.assertEqual(rx_ip.src, tunnel_src)
self.assertEqual(rx_ip.dst, tunnel_dst)
self.assertEqual(rx_ip.src, tunnel_src)
self.assertEqual(rx_ip.dst, tunnel_dst)
- rx_gre = GRE(str(rx_ip[IP].payload))
+ rx_gre = GRE(scapy.compat.raw(rx_ip[IP].payload))
rx_ip = rx_gre[IPv6]
tx_ip = tx[IPv6]
rx_ip = rx_gre[IPv6]
tx_ip = tx[IPv6]
import socket
import unittest
import socket
import unittest
from scapy.contrib.mpls import MPLS
from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
from scapy.layers.l2 import Ether, Dot1Q, ARP
from scapy.contrib.mpls import MPLS
from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
from scapy.layers.l2 import Ether, Dot1Q, ARP
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
- n_dest_addr = '{:08x}'.format(dest_addr).decode('hex')
+ n_dest_addr = binascii.unhexlify('{:08x}'.format(dest_addr))
self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len,
n_next_hop_addr)
added_ips.append(socket.inet_ntoa(n_dest_addr))
self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len,
n_next_hop_addr)
added_ips.append(socket.inet_ntoa(n_dest_addr))
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
- n_dest_addr = '{:08x}'.format(dest_addr).decode('hex')
+ n_dest_addr = binascii.unhexlify('{:08x}'.format(dest_addr))
self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len,
n_next_hop_addr, is_add=0)
removed_ips.append(socket.inet_ntoa(n_dest_addr))
self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len,
n_next_hop_addr, is_add=0)
removed_ips.append(socket.inet_ntoa(n_dest_addr))
import random
import socket
import random
import socket
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ARP
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ARP
if found:
break
for host in pg_if.remote_hosts:
if found:
break
for host in pg_if.remote_hosts:
- if str(addr) == str(host.ip4):
+ if scapy.compat.raw(addr) == \
+ scapy.compat.raw(host.ip4):
vrf_count += 1
found = True
break
vrf_count += 1
found = True
break
import unittest
from parameterized import parameterized
import unittest
from parameterized import parameterized
import scapy.layers.inet6 as inet6
from scapy.contrib.mpls import MPLS
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
import scapy.layers.inet6 as inet6
from scapy.contrib.mpls import MPLS
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
from scapy.layers.l2 import Ether
from scapy.layers.inet import ICMP, IP, TCP, UDP
from scapy.layers.ipsec import SecurityAssociation, ESP
from scapy.layers.l2 import Ether
from scapy.layers.inet import ICMP, IP, TCP, UDP
from scapy.layers.ipsec import SecurityAssociation, ESP
from util import ppp, ppc
from template_ipsec import TemplateIpsec
from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
from util import ppp, ppc
from template_ipsec import TemplateIpsec
from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
def verify_capture_encrypted(self, capture, sa):
for packet in capture:
try:
def verify_capture_encrypted(self, capture, sa):
for packet in capture:
try:
- copy = packet.__class__(str(packet))
+ copy = packet.__class__(scapy.compat.raw(packet))
- copy = packet.__class__(str(copy))
+ copy = packet.__class__(scapy.compat.raw(copy))
self.assert_equal(packet[UDP].len, copy[UDP].len,
"UDP header length")
self.assert_packet_checksums_valid(packet)
self.assert_equal(packet[UDP].len, copy[UDP].len,
"UDP header length")
self.assert_packet_checksums_valid(packet)
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether, GRE
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether, GRE
self.assertEqual(gre.proto, 0x0800 if isv4 else 0x86DD)
self.assertEqual(gre.flags, 0)
self.assertEqual(gre.version, 0)
self.assertEqual(gre.proto, 0x0800 if isv4 else 0x86DD)
self.assertEqual(gre.flags, 0)
self.assertEqual(gre.version, 0)
- inner = IPver(str(gre.payload))
+ inner = IPver(scapy.compat.raw(gre.payload))
payload_info = self.payload_to_info(inner[Raw])
self.info = self.packet_infos[payload_info.index]
self.assertEqual(payload_info.src, self.pg0.sw_if_index)
payload_info = self.payload_to_info(inner[Raw])
self.info = self.packet_infos[payload_info.index]
self.assertEqual(payload_info.src, self.pg0.sw_if_index)
- self.assertEqual(str(inner), str(self.info.data[IPver]))
+ self.assertEqual(scapy.compat.raw(inner),
+ scapy.compat.raw(self.info.data[IPver]))
def checkCapture(self, encap, isv4):
self.pg0.assert_nothing_captured()
def checkCapture(self, encap, isv4):
self.pg0.assert_nothing_captured()
)
self.assertEqual(ip.nh, 47)
# self.assertEqual(len(ip.options), 0)
)
self.assertEqual(ip.nh, 47)
# self.assertEqual(len(ip.options), 0)
- gre = GRE(str(p[IPv6].payload))
+ gre = GRE(scapy.compat.raw(p[IPv6].payload))
self.checkInner(gre, isv4)
elif (encap == 'l3dsr'):
ip = p[IP]
self.checkInner(gre, isv4)
elif (encap == 'l3dsr'):
ip = p[IP]
)
self.assertEqual(ip.nh, 17)
self.assertGreaterEqual(ip.hlim, 63)
)
self.assertEqual(ip.nh, 17)
self.assertGreaterEqual(ip.hlim, 63)
- udp = UDP(str(p[IPv6].payload))
+ udp = UDP(scapy.compat.raw(p[IPv6].payload))
self.assertEqual(udp.dport, 3307)
load[asid] += 1
except:
self.assertEqual(udp.dport, 3307)
load[asid] += 1
except:
from framework import VppTestCase, VppTestRunner
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath
from framework import VppTestCase, VppTestRunner
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath
from scapy.layers.l2 import Ether, Raw
from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
from scapy.layers.l2 import Ether, Raw
from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
def validate(self, rx, expected):
self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
def validate(self, rx, expected):
- self.assertEqual(rx, expected.__class__(str(expected)))
+ self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))
def payload(self, len):
return 'x' * len
def payload(self, len):
return 'x' * len
p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
dst="2001:db8:1f0::c0a8:1:f") / payload)
p6_translated.hlim -= 1
p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
dst="2001:db8:1f0::c0a8:1:f") / payload)
p6_translated.hlim -= 1
- p6_translated['TCP'].options = [('MSS', 1300)]
+ p6_translated[TCP].options = [('MSS', 1300)]
rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
for p in rx:
self.validate(p[1], p6_translated)
rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
for p in rx:
self.validate(p[1], p6_translated)
dst=self.pg0.remote_ip4) / payload)
p4_translated.id = 0
p4_translated.ttl -= 1
dst=self.pg0.remote_ip4) / payload)
p4_translated.id = 0
p4_translated.ttl -= 1
- p4_translated['TCP'].options = [('MSS', 1300)]
+ p4_translated[TCP].options = [('MSS', 1300)]
rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
for p in rx:
self.validate(p[1], p4_translated)
rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
for p in rx:
self.validate(p[1], p4_translated)
VppMplsLabel, MplsLspMode, find_mpls_route
from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
VppMplsLabel, MplsLspMode, find_mpls_route
from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP
verify_mpls_stack(self, rx, mpls_labels)
tx_eth = tx[Ether]
verify_mpls_stack(self, rx, mpls_labels)
tx_eth = tx[Ether]
- rx_eth = Ether(str(rx[MPLS].payload))
+ rx_eth = Ether(scapy.compat.raw(rx[MPLS].payload))
self.assertEqual(rx_eth.src, tx_eth.src)
self.assertEqual(rx_eth.dst, tx_eth.dst)
self.assertEqual(rx_eth.src, tx_eth.src)
self.assertEqual(rx_eth.dst, tx_eth.dst)
import random
from framework import VppTestCase, VppTestRunner, running_extended_tests
import random
from framework import VppTestCase, VppTestRunner, running_extended_tests
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
pref_n[13] = ip4_n[1]
pref_n[14] = ip4_n[2]
pref_n[15] = ip4_n[3]
pref_n[13] = ip4_n[1]
pref_n[14] = ip4_n[2]
pref_n[15] = ip4_n[3]
- return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
+ packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
+ return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
def extract_ip4(self, ip6, plen):
"""
def extract_ip4(self, ip6, plen):
"""
p = (IP(src=src_if.remote_ip4, dst=dst) /
TCP(sport=sport, dport=dport) /
Raw(data))
p = (IP(src=src_if.remote_ip4, dst=dst) /
TCP(sport=sport, dport=dport) /
Raw(data))
- p = p.__class__(str(p))
- chksum = p['TCP'].chksum
+ p = p.__class__(scapy.compat.raw(p))
+ chksum = p[TCP].chksum
proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
elif proto == IP_PROTOS.udp:
proto_header = UDP(sport=sport, dport=dport)
proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
elif proto == IP_PROTOS.udp:
proto_header = UDP(sport=sport, dport=dport)
self.assertEqual(6, len(data))
for record in data:
# natEvent
self.assertEqual(6, len(data))
for record in data:
# natEvent
- self.assertIn(ord(record[230]), [4, 5])
- if ord(record[230]) == 4:
+ self.assertIn(scapy.compat.orb(record[230]), [4, 5])
+ if scapy.compat.orb(record[230]) == 4:
nat44_ses_create_num += 1
else:
nat44_ses_delete_num += 1
nat44_ses_create_num += 1
else:
nat44_ses_delete_num += 1
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
- if IP_PROTOS.icmp == ord(record[4]):
+ if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
self.assertEqual(struct.pack("!H", self.icmp_id_out),
record[227])
self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
self.assertEqual(struct.pack("!H", self.icmp_id_out),
record[227])
- elif IP_PROTOS.tcp == ord(record[4]):
+ elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.tcp_port_in),
record[7])
self.assertEqual(struct.pack("!H", self.tcp_port_out),
record[227])
self.assertEqual(struct.pack("!H", self.tcp_port_in),
record[7])
self.assertEqual(struct.pack("!H", self.tcp_port_out),
record[227])
- elif IP_PROTOS.udp == ord(record[4]):
+ elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.udp_port_in),
record[7])
self.assertEqual(struct.pack("!H", self.udp_port_out),
self.assertEqual(struct.pack("!H", self.udp_port_in),
record[7])
self.assertEqual(struct.pack("!H", self.udp_port_out),
self.assertEqual(1, len(data))
record = data[0]
# natEvent
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 3)
+ self.assertEqual(scapy.compat.orb(record[230]), 3)
# natPoolID
self.assertEqual(struct.pack("!I", 0), record[283])
# natPoolID
self.assertEqual(struct.pack("!I", 0), record[283])
self.assertEqual(1, len(data))
record = data[0]
# natEvent
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 1), record[466])
# maxSessionEntries
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 1), record[466])
# maxSessionEntries
self.assertEqual(1, len(data))
record = data[0]
# natEvent
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 2), record[466])
# maxBIBEntries
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 2), record[466])
# maxBIBEntries
self.assertEqual(1, len(data))
record = data[0]
# natEvent
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 5), record[466])
# maxFragmentsPendingReassembly
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 5), record[466])
# maxFragmentsPendingReassembly
self.assertEqual(1, len(data))
record = data[0]
# natEvent
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 5), record[466])
# maxFragmentsPendingReassembly
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 5), record[466])
# maxFragmentsPendingReassembly
record = data[0]
# natEvent
if is_create:
record = data[0]
# natEvent
if is_create:
- self.assertEqual(ord(record[230]), 10)
+ self.assertEqual(scapy.compat.orb(record[230]), 10)
- self.assertEqual(ord(record[230]), 11)
+ self.assertEqual(scapy.compat.orb(record[230]), 11)
# sourceIPv6Address
self.assertEqual(src_addr, record[27])
# postNATSourceIPv4Address
self.assertEqual(self.nat_addr_n, record[225])
# protocolIdentifier
# sourceIPv6Address
self.assertEqual(src_addr, record[27])
# postNATSourceIPv4Address
self.assertEqual(self.nat_addr_n, record[225])
# protocolIdentifier
- self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
+ self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# sourceTransportPort
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# sourceTransportPort
record = data[0]
# natEvent
if is_create:
record = data[0]
# natEvent
if is_create:
- self.assertEqual(ord(record[230]), 6)
+ self.assertEqual(scapy.compat.orb(record[230]), 6)
- self.assertEqual(ord(record[230]), 7)
+ self.assertEqual(scapy.compat.orb(record[230]), 7)
# sourceIPv6Address
self.assertEqual(src_addr, record[27])
# destinationIPv6Address
# sourceIPv6Address
self.assertEqual(src_addr, record[27])
# destinationIPv6Address
self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
record[226])
# protocolIdentifier
self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
record[226])
# protocolIdentifier
- self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
+ self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# sourceTransportPort
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# sourceTransportPort
self.assertEqual(1, len(data))
record = data[0]
# natEvent
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 3), record[466])
# maxEntriesPerUser
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 3), record[466])
# maxEntriesPerUser
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
reass = self.vapi.nat_reass_dump()
self.port_in = random.randint(1025, 65535)
reass = self.vapi.nat_reass_dump()
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
for i in range(2):
self.port_in = random.randint(1025, 65535)
for i in range(2):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
# send packet from host to server
pkts = self.create_stream_frag(self.pg0,
# send packet from host to server
pkts = self.create_stream_frag(self.pg0,
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
for i in range(2):
self.port_in = random.randint(1025, 65535)
for i in range(2):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
for i in range(2):
self.port_in = random.randint(1025, 65535)
for i in range(2):
is_inside=0)
self.vapi.nat44_forwarding_enable_disable(1)
is_inside=0)
self.vapi.nat44_forwarding_enable_disable(1)
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
pkts = self.create_stream_frag(self.pg1,
self.pg0.remote_ip4,
4789,
pkts = self.create_stream_frag(self.pg1,
self.pg0.remote_ip4,
4789,
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
src_port=self.ipfix_src_port)
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
src_port=self.ipfix_src_port)
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
self.tcp_port_in = random.randint(1025, 65535)
pkts = self.create_stream_frag(self.pg0,
self.pg1.remote_ip4,
self.tcp_port_in = random.randint(1025, 65535)
pkts = self.create_stream_frag(self.pg0,
self.pg1.remote_ip4,
reass_n_start = len(reass)
# in2out
reass_n_start = len(reass)
# in2out
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
self.pg0.add_stream(pkts)
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
self.pg0.add_stream(pkts)
self.assertEqual(data, p[Raw].load)
# out2in
self.assertEqual(data, p[Raw].load)
# out2in
- data = "A" * 4 + "b" * 16 + "C" * 3
+ data = b"A" * 4 + b"b" * 16 + b"C" * 3
pkts = self.create_stream_frag(self.pg1,
self.nat_addr,
20,
pkts = self.create_stream_frag(self.pg1,
self.nat_addr,
20,
self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# in2out
self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# in2out
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
pkts.reverse()
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
pkts.reverse()
self.assertEqual(data, p[Raw].load)
# out2in
self.assertEqual(data, p[Raw].load)
# out2in
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
pkts = self.create_stream_frag(self.pg1,
self.nat_addr,
20,
pkts = self.create_stream_frag(self.pg1,
self.nat_addr,
20,
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
src_port=self.ipfix_src_port)
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
src_port=self.ipfix_src_port)
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
pkts.reverse()
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
pkts.reverse()
for p in capture:
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
for p in capture:
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
- if ord(data[0][230]) == 10:
+ if scapy.compat.orb(data[0][230]) == 10:
self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
- elif ord(data[0][230]) == 6:
+ elif scapy.compat.orb(data[0][230]) == 6:
self.verify_ipfix_nat64_ses(data,
1,
self.pg0.remote_ip6n,
self.verify_ipfix_nat64_ses(data,
1,
self.pg0.remote_ip6n,
self.ipfix_domain_id)
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
self.ipfix_domain_id)
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
- if ord(data[0][230]) == 11:
+ if scapy.compat.orb(data[0][230]) == 11:
self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
- elif ord(data[0][230]) == 7:
+ elif scapy.compat.orb(data[0][230]) == 7:
self.verify_ipfix_nat64_ses(data,
0,
self.pg0.remote_ip6n,
self.verify_ipfix_nat64_ses(data,
0,
self.pg0.remote_ip6n,
VppIpTable, DpoProto
from vpp_papi import VppEnum
VppIpTable, DpoProto
from vpp_papi import VppEnum
from scapy.packet import Raw
from scapy.layers.l2 import Ether, ARP, Dot1Q
from scapy.layers.inet import IP, UDP
from scapy.packet import Raw
from scapy.layers.l2 import Ether, ARP, Dot1Q
from scapy.layers.inet import IP, UDP
#
# change the interface's MAC
#
#
# change the interface's MAC
#
- mac = [chr(0x00), chr(0x00), chr(0x00),
- chr(0x33), chr(0x33), chr(0x33)]
+ mac = [scapy.compat.chb(0x00), scapy.compat.chb(0x00),
+ scapy.compat.chb(0x00), scapy.compat.chb(0x33),
+ scapy.compat.chb(0x33), scapy.compat.chb(0x33)]
mac_string = ''.join(mac)
self.vapi.sw_interface_set_mac_address(self.pg1.sw_if_index,
mac_string = ''.join(mac)
self.vapi.sw_interface_set_mac_address(self.pg1.sw_if_index,
from struct import unpack, unpack_from
from util import ppp, ppc
from re import compile
from struct import unpack, unpack_from
from util import ppp, ppc
from re import compile
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP
# Format MAC Address
def get_mac_addr(bytes_addr):
# Format MAC Address
def get_mac_addr(bytes_addr):
- return ':'.join('%02x' % ord(b) for b in bytes_addr)
+ return ':'.join('%02x' % scapy.compat.orb(b) for b in bytes_addr)
# Format IP Address
def ipv4(bytes_addr):
# Format IP Address
def ipv4(bytes_addr):
- return '.'.join('%d' % ord(b) for b in bytes_addr)
+ return '.'.join('%d' % scapy.compat.orb(b) for b in bytes_addr)
from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
VppMplsLabel, VppMplsTable
from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
VppMplsLabel, VppMplsTable
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet import IP, UDP
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet import IP, UDP
# for table 1 map the n=0xff possible values of input QoS mark,
# n to 1-n
#
# for table 1 map the n=0xff possible values of input QoS mark,
# n to 1-n
#
- output = [chr(0)] * 256
+ output = [scapy.compat.chb(0)] * 256
- output[i] = chr(255 - i)
- os = ''.join(output)
+ output[i] = scapy.compat.chb(255 - i)
+ os = b''.join(output)
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
#
# For table 2 (and up) use the value n for everything
#
#
# For table 2 (and up) use the value n for everything
#
- output = [chr(2)] * 256
- os = ''.join(output)
+ output = [scapy.compat.chb(2)] * 256
+ os = b''.join(output)
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
self.vapi.qos_egress_map_update(2, rows)
self.vapi.qos_egress_map_update(2, rows)
- output = [chr(3)] * 256
- os = ''.join(output)
+ output = [scapy.compat.chb(3)] * 256
+ os = b''.join(output)
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
self.vapi.qos_egress_map_update(3, rows)
self.vapi.qos_egress_map_update(3, rows)
- output = [chr(4)] * 256
- os = ''.join(output)
+ output = [scapy.compat.chb(4)] * 256
+ os = b''.join(output)
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
UDP(sport=1234, dport=1234) /
p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * 65))
p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
tc=1) /
UDP(sport=1234, dport=1234) /
p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
tc=1) /
UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * 65))
#
# Since we have not yet enabled the recording of the input QoS
#
# Since we have not yet enabled the recording of the input QoS
from_ip = 6
from_mpls = 5
from_vlan = 4
from_ip = 6
from_mpls = 5
from_vlan = 4
- output = [chr(from_ext)] * 256
- os1 = ''.join(output)
- output = [chr(from_vlan)] * 256
- os2 = ''.join(output)
- output = [chr(from_mpls)] * 256
- os3 = ''.join(output)
- output = [chr(from_ip)] * 256
- os4 = ''.join(output)
+ output = [scapy.compat.chb(from_ext)] * 256
+ os1 = b''.join(output)
+ output = [scapy.compat.chb(from_vlan)] * 256
+ os2 = b''.join(output)
+ output = [scapy.compat.chb(from_mpls)] * 256
+ os3 = b''.join(output)
+ output = [scapy.compat.chb(from_ip)] * 256
+ os4 = b''.join(output)
rows = [{'outputs': os1},
{'outputs': os2},
{'outputs': os3},
rows = [{'outputs': os1},
{'outputs': os2},
{'outputs': os3},
p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
UDP(sport=1234, dport=1234) /
p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * 65))
p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
UDP(sport=1234, dport=1234) /
p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * 65))
rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
MPLS(label=32, cos=3, ttl=2) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
UDP(sport=1234, dport=1234) /
MPLS(label=32, cos=3, ttl=2) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * 65))
rx = self.send_and_expect(self.pg0, p_m1 * 65, self.pg1)
for p in rx:
rx = self.send_and_expect(self.pg0, p_m1 * 65, self.pg1)
for p in rx:
MPLS(label=33, ttl=2, cos=3) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
UDP(sport=1234, dport=1234) /
MPLS(label=33, ttl=2, cos=3) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * 65))
rx = self.send_and_expect(self.pg0, p_m2 * 65, self.pg1)
rx = self.send_and_expect(self.pg0, p_m2 * 65, self.pg1)
#
# QoS for all input values
#
#
# QoS for all input values
#
- output = [chr(0)] * 256
+ output = [scapy.compat.chb(0)] * 256
- output[i] = chr(255 - i)
- os = ''.join(output)
+ output[i] = scapy.compat.chb(255 - i)
+ os = b''.join(output)
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
Dot1Q(vlan=11, prio=1) /
IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
UDP(sport=1234, dport=1234) /
Dot1Q(vlan=11, prio=1) /
IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * 65))
p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
UDP(sport=1234, dport=1234) /
p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * 65))
rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
Dot1Q(vlan=11, prio=2) /
IPv6(src="2001::1", dst="2001::2", tc=1) /
UDP(sport=1234, dport=1234) /
Dot1Q(vlan=11, prio=2) /
IPv6(src="2001::1", dst="2001::2", tc=1) /
UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * 65))
p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IPv6(src="3001::1", dst="2001::1", tc=1) /
UDP(sport=1234, dport=1234) /
p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IPv6(src="3001::1", dst="2001::1", tc=1) /
UDP(sport=1234, dport=1234) /
+ Raw(scapy.compat.chb(100) * 65))
rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
import unittest
from parameterized import parameterized
import unittest
from parameterized import parameterized
from scapy.packet import Raw
from scapy.layers.l2 import Ether, GRE
from scapy.layers.inet import IP, UDP, ICMP
from scapy.packet import Raw
from scapy.layers.l2 import Ether, GRE
from scapy.layers.inet import IP, UDP, ICMP
cls.pkt_infos = []
for index, info in six.iteritems(infos):
p = info.data
cls.pkt_infos = []
for index, info in six.iteritems(infos):
p = info.data
- # cls.logger.debug(ppp("Packet:", p.__class__(str(p))))
+ # cls.logger.debug(ppp("Packet:",
+ # p.__class__(scapy.compat.raw(p))))
fragments_400 = fragment_rfc791(p, 400)
fragments_300 = fragment_rfc791(p, 300)
fragments_200 = [
fragments_400 = fragment_rfc791(p, 400)
fragments_300 = fragment_rfc791(p, 300)
fragments_200 = [
cls.pkt_infos = []
for index, info in six.iteritems(infos):
p = info.data
cls.pkt_infos = []
for index, info in six.iteritems(infos):
p = info.data
- # cls.logger.debug(ppp("Packet:", p.__class__(str(p))))
+ # cls.logger.debug(ppp("Packet:",
+ # p.__class__(scapy.compat.raw(p))))
fragments_400 = fragment_rfc8200(p, info.index, 400)
fragments_300 = fragment_rfc8200(p, info.index, 300)
cls.pkt_infos.append((index, fragments_400, fragments_300))
fragments_400 = fragment_rfc8200(p, info.index, 400)
fragments_300 = fragment_rfc8200(p, info.index, 300)
cls.pkt_infos.append((index, fragments_400, fragments_300))
Raw())
self.extend_packet(p, 1000, self.padding)
fragments = fragment_rfc8200(p, 1, 500)
Raw())
self.extend_packet(p, 1000, self.padding)
fragments = fragment_rfc8200(p, 1, 500)
- bad_fragment = p.__class__(str(fragments[1]))
+ bad_fragment = p.__class__(scapy.compat.raw(fragments[1]))
bad_fragment[IPv6ExtHdrFragment].nh = 59
bad_fragment[IPv6ExtHdrFragment].offset = 0
self.pg_enable_capture()
bad_fragment[IPv6ExtHdrFragment].nh = 59
bad_fragment[IPv6ExtHdrFragment].offset = 0
self.pg_enable_capture()
cls.pkt_infos = []
for index, info in six.iteritems(infos):
p = info.data
cls.pkt_infos = []
for index, info in six.iteritems(infos):
p = info.data
- # cls.logger.debug(ppp("Packet:", p.__class__(str(p))))
+ # cls.logger.debug(ppp("Packet:",
+ # p.__class__(scapy.compat.raw(p))))
fragments_300 = fragment_rfc791(p, 300)
cls.pkt_infos.append((index, fragments_300))
cls.fragments_300 = [x for (_, frags) in cls.pkt_infos for x in frags]
fragments_300 = fragment_rfc791(p, 300)
cls.pkt_infos.append((index, fragments_300))
cls.fragments_300 = [x for (_, frags) in cls.pkt_infos for x in frags]
from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
tx_ip.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
tx_ip.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
+ tx_ip = IP(scapy.compat.raw(tx_ip))
self.assertEqual(rx_srh.payload, tx_ip)
self.assertEqual(rx_srh.payload, tx_ip)
self.assertEqual(rx_srh.nh, 59)
# the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
self.assertEqual(rx_srh.nh, 59)
# the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
- self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
+ self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
self.logger.debug("packet verification: SUCCESS")
self.logger.debug("packet verification: SUCCESS")
tx_ip2.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
tx_ip2.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- tx_ip2 = IP(str(tx_ip2))
+ tx_ip2 = IP(scapy.compat.raw(tx_ip2))
self.assertEqual(rx_ip, tx_ip2)
self.assertEqual(rx_ip, tx_ip2)
tx_ip = tx_pkt.getlayer(IPv6)
# we can't just get the 2nd Ether layer
# get the Raw content and dissect it as Ether
tx_ip = tx_pkt.getlayer(IPv6)
# we can't just get the 2nd Ether layer
# get the Raw content and dissect it as Ether
- tx_eth1 = Ether(str(tx_pkt[Raw]))
+ tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
# verify if rx'ed packet has no SRH
self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
# verify if rx'ed packet has no SRH
self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
# read back the dumped packet (with str())
# to force computing these fields
# probably other ways are possible
# read back the dumped packet (with str())
# to force computing these fields
# probably other ways are possible
+ p = Ether(scapy.compat.raw(p))
payload_info.data = p.copy()
self.logger.debug(ppp("Created packet:", p))
pkts.append(p)
payload_info.data = p.copy()
self.logger.debug(ppp("Created packet:", p))
pkts.append(p)
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
- str(Ether(str(packet[Raw]))[Raw]))
+ Ether(scapy.compat.r(packet[Raw]))[Raw])
:param compare_func: function to compare in and out packet
"""
self.logger.info("Verifying capture on interface %s using function %s"
:param compare_func: function to compare in and out packet
"""
self.logger.info("Verifying capture on interface %s using function %s"
- % (dst_if.name, compare_func.func_name))
+ % (dst_if.name, compare_func.__name__))
last_info = dict()
for i in self.pg_interfaces:
last_info = dict()
for i in self.pg_interfaces:
from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
tx_ip2.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
tx_ip2.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- tx_ip2 = IP(str(tx_ip2))
+ tx_ip2 = IP(scapy.compat.raw(tx_ip2))
self.assertEqual(rx_ip, tx_ip2)
self.assertEqual(rx_ip, tx_ip2)
tx_ip.chksum = None
# -> read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
tx_ip.chksum = None
# -> read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- self.assertEqual(rx_srh.payload, IP(str(tx_ip)))
+ self.assertEqual(rx_srh.payload, IP(scapy.compat.raw(tx_ip)))
self.logger.debug("packet verification: SUCCESS")
self.logger.debug("packet verification: SUCCESS")
tx_ip = tx_pkt.getlayer(IPv6)
# we can't just get the 2nd Ether layer
# get the Raw content and dissect it as Ether
tx_ip = tx_pkt.getlayer(IPv6)
# we can't just get the 2nd Ether layer
# get the Raw content and dissect it as Ether
- tx_eth1 = Ether(str(tx_pkt[Raw]))
+ tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
# verify if rx'ed packet has no SRH
self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
# verify if rx'ed packet has no SRH
self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
# the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
tx_ether = tx_pkt.getlayer(Ether)
# the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
tx_ether = tx_pkt.getlayer(Ether)
- self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
+ self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
self.logger.debug("packet verification: SUCCESS")
self.logger.debug("packet verification: SUCCESS")
# read back the dumped packet (with str())
# to force computing these fields
# probably other ways are possible
# read back the dumped packet (with str())
# to force computing these fields
# probably other ways are possible
+ p = Ether(scapy.compat.raw(p))
payload_info.data = p.copy()
self.logger.debug(ppp("Created packet:", p))
pkts.append(p)
payload_info.data = p.copy()
self.logger.debug(ppp("Created packet:", p))
pkts.append(p)
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
- Ether(str(packet[Raw]))[Raw])
+ Ether(scapy.compat.raw(packet[Raw]))[Raw])
:param compare_func: function to compare in and out packet
"""
self.logger.info("Verifying capture on interface %s using function %s"
:param compare_func: function to compare in and out packet
"""
self.logger.info("Verifying capture on interface %s using function %s"
- % (dst_if.name, compare_func.func_name))
+ % (dst_if.name, compare_func.__name__))
last_info = dict()
for i in self.pg_interfaces:
last_info = dict()
for i in self.pg_interfaces:
from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
tx_ip.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
tx_ip.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
+ tx_ip = IP(scapy.compat.raw(tx_ip))
self.assertEqual(payload, tx_ip)
self.assertEqual(payload, tx_ip)
payload = rx_ip.payload
# the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
payload = rx_ip.payload
# the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
- self.assertEqual(Ether(str(payload)), tx_ether)
+ self.assertEqual(Ether(scapy.compat.raw(payload)), tx_ether)
self.logger.debug("packet verification: SUCCESS")
self.logger.debug("packet verification: SUCCESS")
tx_ip2.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
tx_ip2.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- tx_ip2 = IP(str(tx_ip2))
+ tx_ip2 = IP(scapy.compat.raw(tx_ip2))
self.assertEqual(rx_ip, tx_ip2)
self.assertEqual(rx_ip, tx_ip2)
tx_ip = tx_pkt.getlayer(IPv6)
# we can't just get the 2nd Ether layer
# get the Raw content and dissect it as Ether
tx_ip = tx_pkt.getlayer(IPv6)
# we can't just get the 2nd Ether layer
# get the Raw content and dissect it as Ether
- tx_eth1 = Ether(str(tx_pkt[Raw]))
+ tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
# verify if rx'ed packet has no SRH
self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
# verify if rx'ed packet has no SRH
self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
# read back the dumped packet (with str())
# to force computing these fields
# probably other ways are possible
# read back the dumped packet (with str())
# to force computing these fields
# probably other ways are possible
+ p = Ether(scapy.compat.raw(p))
payload_info.data = p.copy()
self.logger.debug(ppp("Created packet:", p))
pkts.append(p)
payload_info.data = p.copy()
self.logger.debug(ppp("Created packet:", p))
pkts.append(p)
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
- Ether(str(packet[Raw]))[Raw])
+ Ether(scapy.compat.raw(packet[Raw]))[Raw])
:param compare_func: function to compare in and out packet
"""
self.logger.info("Verifying capture on interface %s using function %s"
:param compare_func: function to compare in and out packet
"""
self.logger.info("Verifying capture on interface %s using function %s"
- % (dst_if.name, compare_func.func_name))
+ % (dst_if.name, compare_func.__name__))
last_info = dict()
for i in self.pg_interfaces:
last_info = dict()
for i in self.pg_interfaces:
import sys
import os.path
import sys
import os.path
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP
from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP
from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
"""
logger = LoggerWrapper(_logger)
logger.debug(ppp("Fragmenting packet:", packet))
"""
logger = LoggerWrapper(_logger)
logger.debug(ppp("Fragmenting packet:", packet))
- packet = packet.__class__(str(packet)) # recalculate all values
+ packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
if len(packet[IP].options) > 0:
raise Exception("Not implemented")
if len(packet) <= fragsize:
if len(packet[IP].options) > 0:
raise Exception("Not implemented")
if len(packet) <= fragsize:
pre_ip_len = len(packet) - len(packet[IP])
ip_header_len = packet[IP].ihl * 4
pre_ip_len = len(packet) - len(packet[IP])
ip_header_len = packet[IP].ihl * 4
- hex_packet = str(packet)
+ hex_packet = scapy.compat.raw(packet)
hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
:returns: list of fragments
"""
logger = LoggerWrapper(_logger)
:returns: list of fragments
"""
logger = LoggerWrapper(_logger)
- packet = packet.__class__(str(packet)) # recalculate all values
+ packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
if len(packet) <= fragsize:
return [packet]
logger.debug(ppp("Fragmenting packet:", packet))
if len(packet) <= fragsize:
return [packet]
logger.debug(ppp("Fragmenting packet:", packet))
logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
- hex_payload = str(ext_and_upper_layer)
+ hex_payload = scapy.compat.raw(ext_and_upper_layer)
logger.debug("Payload length is %s" % len(hex_payload))
logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
logger.debug("Payload length is %s" % len(hex_payload))
logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
p[IPv6ExtHdrFragment].id = identification
p[IPv6ExtHdrFragment].offset = 0
p[IPv6ExtHdrFragment].m = 1
p[IPv6ExtHdrFragment].id = identification
p[IPv6ExtHdrFragment].offset = 0
p[IPv6ExtHdrFragment].m = 1
- p = p.__class__(str(p))
+ p = p.__class__(scapy.compat.raw(p))
logger.debug(ppp("Fragment %s:" % len(pkts), p))
pkts.append(p)
offset = first_payload_len_nfb * 8
logger.debug(ppp("Fragment %s:" % len(pkts), p))
pkts.append(p)
offset = first_payload_len_nfb * 8
p[IPv6ExtHdrFragment].id = identification
p[IPv6ExtHdrFragment].offset = offset / 8
p[IPv6ExtHdrFragment].m = 1
p[IPv6ExtHdrFragment].id = identification
p[IPv6ExtHdrFragment].offset = offset / 8
p[IPv6ExtHdrFragment].m = 1
- p = p.__class__(str(p))
+ p = p.__class__(scapy.compat.raw(p))
logger.debug(ppp("Fragment %s:" % len(pkts), p))
pkts.append(p)
offset = offset + l_nfb * 8
logger.debug(ppp("Fragment %s:" % len(pkts), p))
pkts.append(p)
offset = offset + l_nfb * 8
import socket
import struct
from traceback import format_exc, format_stack
import socket
import struct
from traceback import format_exc, format_stack
from scapy.utils import wrpcap, rdpcap, PcapReader
from scapy.plist import PacketList
from vpp_interface import VppInterface
from scapy.utils import wrpcap, rdpcap, PcapReader
from scapy.plist import PacketList
from vpp_interface import VppInterface
# Make Dot1AD packet content recognizable to scapy
if arp_reply.type == 0x88a8:
arp_reply.type = 0x8100
# Make Dot1AD packet content recognizable to scapy
if arp_reply.type == 0x88a8:
arp_reply.type = 0x8100
- arp_reply = Ether(str(arp_reply))
+ arp_reply = Ether(scapy.compat.raw(arp_reply))
try:
if arp_reply[ARP].op == ARP.is_at:
self.test.logger.info("VPP %s MAC address is %s " %
try:
if arp_reply[ARP].op == ARP.is_at:
self.test.logger.info("VPP %s MAC address is %s " %
ndp_reply = captured_packet.copy() # keep original for exception
# Make Dot1AD packet content recognizable to scapy
if ndp_reply.type == 0x88a8:
ndp_reply = captured_packet.copy() # keep original for exception
# Make Dot1AD packet content recognizable to scapy
if ndp_reply.type == 0x88a8:
+ self._test.logger.info(
+ "Replacing EtherType: 0x88a8 with "
+ "0x8100 and regenerating Ethernet header. ")
- ndp_reply = Ether(str(ndp_reply))
+ ndp_reply = Ether(scapy.compat.raw(ndp_reply))
try:
ndp_na = ndp_reply[ICMPv6ND_NA]
opt = ndp_na[ICMPv6NDOptDstLLAddr]
try:
ndp_na = ndp_reply[ICMPv6ND_NA]
opt = ndp_na[ICMPv6NDOptDstLLAddr]