from inspect import getdoc, isclass
from traceback import format_exception
from logging import FileHandler, DEBUG, Formatter
+
+import scapy.compat
from scapy.packet import Raw
from hook import StepHook, PollHook, VppDiedError
from vpp_pg_interface import VppPGInterface
if hasattr(cls, 'vpp'):
if hasattr(cls, 'vapi'):
+ cls.logger.debug("Disconnecting class vapi client on %s",
+ cls.__name__)
cls.vapi.disconnect()
+ cls.logger.debug("Deleting class vapi attribute on %s",
+ cls.__name__)
del cls.vapi
cls.vpp.poll()
if cls.vpp.returncode is None:
cls.vpp.kill()
cls.logger.debug("Waiting for vpp to die")
cls.vpp.communicate()
+ cls.logger.debug("Deleting class vpp attribute on %s",
+ cls.__name__)
del cls.vpp
if cls.vpp_startup_failed:
def tearDown(self):
""" Show various debug prints after each test """
- super(VppTestCase, self).tearDown()
self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
(self.__class__.__name__, self._testMethodName,
self._testMethodDoc))
def assert_packet_checksums_valid(self, packet,
ignore_zero_udp_checksums=True):
- received = packet.__class__(str(packet))
+ received = packet.__class__(scapy.compat.raw(packet))
self.logger.debug(
ppp("Verifying packet checksums for packet:", received))
udp_layers = ['UDP', 'UDPerror']
checksum_fields = ['cksum', 'chksum']
checksums = []
counter = 0
- temp = received.__class__(str(received))
+ temp = received.__class__(scapy.compat.raw(received))
while True:
layer = temp.getlayer(counter)
if layer:
counter = counter + 1
if 0 == len(checksums):
return
- temp = temp.__class__(str(temp))
+ temp = temp.__class__(scapy.compat.raw(temp))
for layer, cf in checksums:
calc_sum = getattr(temp[layer], cf)
self.assert_equal(
received_packet_checksum = getattr(received_packet[layer], field_name)
if ignore_zero_checksum and 0 == received_packet_checksum:
return
- recalculated = received_packet.__class__(str(received_packet))
+ recalculated = received_packet.__class__(
+ scapy.compat.raw(received_packet))
delattr(recalculated[layer], field_name)
- recalculated = recalculated.__class__(str(recalculated))
+ recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
self.assert_equal(received_packet_checksum,
getattr(recalculated[layer], field_name),
"packet checksum on layer: %s" % layer)
from log import RED, single_line_delim, double_line_delim
import ipaddress
from subprocess import check_output, CalledProcessError
+
+import scapy.compat
+
from util import check_core_path, get_core_path
return val
if len(val) == 6:
return '{!s} ({!s})'.format(val, ':'.join(['{:02x}'.format(
- ord(x)) for x in val]))
+ scapy.compat.orb(x)) for x in val]))
try:
# we don't call test_type(val) because it is a packed value.
return '{!s} ({!s})'.format(val, str(
from random import choice, shuffle
from pprint import pprint
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP, TCP
# Scapy IPv6 stuff is too smart for its own good.
# So we do this and coerce the ICMP into unknown type
if packet.haslayer(UDP):
- data = str(packet[UDP][Raw])
+ data = scapy.compat.raw(packet[UDP][Raw])
else:
if l3 == IP:
- data = str(ICMP(str(packet[l3].payload))[Raw])
+ data = scapy.compat.raw(ICMP(
+ scapy.compat.raw(packet[l3].payload))[Raw])
else:
- data = str(ICMPv6Unknown(str(packet[l3].payload)).msgbody)
+ data = scapy.compat.raw(ICMPv6Unknown(
+ scapy.compat.raw(packet[l3].payload)).msgbody)
udp_or_icmp = packet[l3].payload
payload_info = self.payload_to_info(data)
packet_index = payload_info.index
"""ACL plugin - MACIP tests
"""
import binascii
+import ipaddress
import random
from socket import inet_ntop, inet_pton, AF_INET, AF_INET6
from struct import pack, unpack
import re
import unittest
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
return acls
def create_rules(self, mac_type=EXACT_MAC, ip_type=EXACT_IP,
- acl_count=1, rules_count=[1]):
+ acl_count=1, rules_count=None):
acls = []
+ if rules_count is None:
+ rules_count = [1]
src_mac = int("220000dead00", 16)
for acl in range(2, (acl_count+1) * 2):
rules = []
ip4[3] = 0
ip6[8] = random.randint(100, 200)
ip6[15] = 0
- ip_pack = ''
+ ip_pack = b''
for j in range(0, len(ip)):
ip_pack += pack('<B', int(ip[j]))
'is_ipv6': is_ip6,
'src_ip_addr': ip_pack,
'src_ip_prefix_len': ip_len,
- 'src_mac': mac.replace(':', '').decode('hex'),
- 'src_mac_mask': mask.replace(':', '').decode('hex')})
+ 'src_mac': binascii.unhexlify(mac.replace(':', '')),
+ 'src_mac_mask': binascii.unhexlify(
+ mask.replace(':', ''))})
rules.append(rule)
if ip_type == self.WILD_IP:
break
def verify_macip_acls(self, acl_count, rules_count, expected_count=2):
reply = self.macip_acl_dump_debug()
for acl in range(2, (acl_count+1) * 2):
- self.assertEqual(reply[acl - 2].count, rules_count[acl/2-1])
+ self.assertEqual(reply[acl - 2].count, rules_count[acl//2-1])
self.vapi.macip_acl_interface_get()
sub_ip[15] = random.randint(200, 255)
elif ip_type == self.SUBNET_IP:
if denyIP:
- sub_ip[2] = str(int(sub_ip[2]) + 1)
+ sub_ip[2] = int(sub_ip[2]) + 1
sub_ip[14] = random.randint(100, 199)
sub_ip[15] = random.randint(200, 255)
- src_ip6 = inet_ntop(AF_INET6, str(bytearray(sub_ip)))
+ packed_src_ip6 = b''.join(
+ [scapy.compat.chb(x) for x in sub_ip])
+ src_ip6 = inet_ntop(AF_INET6, packed_src_ip6)
packet /= IPv6(src=src_ip6, dst=dst_ip6)
else:
if ip_type != self.EXACT_IP:
sub_ip = ip_rule.split('.')
if ip_type == self.WILD_IP:
- sub_ip[0] = str(random.randint(1, 49))
- sub_ip[1] = str(random.randint(50, 99))
- sub_ip[2] = str(random.randint(100, 199))
- sub_ip[3] = str(random.randint(200, 255))
+ sub_ip[0] = random.randint(1, 49)
+ sub_ip[1] = random.randint(50, 99)
+ sub_ip[2] = random.randint(100, 199)
+ sub_ip[3] = random.randint(200, 255)
elif ip_type == self.SUBNET_IP:
if denyIP:
- sub_ip[1] = str(int(sub_ip[1])+1)
- sub_ip[2] = str(random.randint(100, 199))
- sub_ip[3] = str(random.randint(200, 255))
- src_ip4 = ".".join(sub_ip)
+ sub_ip[1] = int(sub_ip[1])+1
+ sub_ip[2] = random.randint(100, 199)
+ sub_ip[3] = random.randint(200, 255)
+ src_ip4 = '.'.join(['{!s}'.format(x) for x in sub_ip])
packet /= IP(src=src_ip4, dst=dst_ip4, frag=0, flags=0)
packet /= UDP(sport=src_port, dport=dst_port)/Raw(payload)
- packet[Raw].load += " mac:"+src_mac
+ packet[Raw].load += b" mac:%s" % scapy.compat.raw(src_mac)
size = self.pg_if_packet_sizes[p % len(self.pg_if_packet_sizes)]
if isinstance(src_if, VppSubInterface):
sub_ip = list(unpack('<16B', inet_pton(AF_INET6, ip)))
for i in range(8, 16):
sub_ip[i] = 0
- ip = inet_ntop(AF_INET6, str(bytearray(sub_ip)))
+ packed_ip = b''.join(
+ [scapy.compat.chb(x) for x in sub_ip])
+ ip = inet_ntop(AF_INET6, packed_ip)
else:
if ip_type == self.WILD_IP:
ip = "0.0.0.0"
'is_ipv6': is_ip6,
'src_ip_addr': ip_rule,
'src_ip_prefix_len': prefix_len,
- 'src_mac': mac_rule.replace(':', '').decode('hex'),
- 'src_mac_mask': mac_mask.replace(':', '').decode('hex')})
+ 'src_mac': binascii.unhexlify(mac_rule.replace(':', '')),
+ 'src_mac_mask': binascii.unhexlify(
+ mac_mask.replace(':', ''))})
macip_rules.append(macip_rule)
# deny all other packets
from struct import pack, unpack
from six import moves
+import scapy.compat
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
while conf_key_id in self._conf_key_ids:
conf_key_id = randint(0, 0xFFFFFFFF)
self._conf_key_ids[conf_key_id] = 1
- key = str(bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
+ key = scapy.compat.raw(
+ bytearray([randint(0, 255) for _ in range(randint(1, 20))]))
return VppBFDAuthKey(test=test, auth_type=auth_type,
conf_key_id=conf_key_id, key=key)
self.test.logger.debug("BFD: Creating packet")
self.fill_packet_fields(packet)
if self.sha1_key:
- hash_material = str(packet[BFD])[:32] + self.sha1_key.key + \
+ hash_material = scapy.compat.raw(
+ packet[BFD])[:32] + self.sha1_key.key + \
"\0" * (20 - len(self.sha1_key.key))
self.test.logger.debug("BFD: Calculated SHA1 hash: %s" %
hashlib.sha1(hash_material).hexdigest())
# last 20 bytes represent the hash - so replace them with the key,
# pad the result with zeros and hash the result
hash_material = bfd.original[:-20] + self.sha1_key.key + \
- "\0" * (20 - len(self.sha1_key.key))
+ b"\0" * (20 - len(self.sha1_key.key))
expected_hash = hashlib.sha1(hash_material).hexdigest()
self.test.assert_equal(binascii.hexlify(bfd.auth_key_hash),
expected_hash, "Auth key hash")
# halve required min rx
old_required_min_rx = self.vpp_session.required_min_rx
self.vpp_session.modify_parameters(
- required_min_rx=0.5 * self.vpp_session.required_min_rx)
+ required_min_rx=self.vpp_session.required_min_rx // 2)
# now we wait 0.8*3*old-req-min-rx and the session should still be up
self.sleep(0.8 * self.vpp_session.detect_mult *
old_required_min_rx / USEC_IN_SEC,
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
- self.assertEqual(str(p[UDP].payload),
- str(echo_packet[UDP].payload),
+ self.assertEqual(scapy.compat.raw(p[UDP].payload),
+ scapy.compat.raw(echo_packet[UDP].payload),
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
udp_sport_rx += 1
# need to compare the hex payload here, otherwise BFD_vpp_echo
# gets in way
- self.assertEqual(str(p[UDP].payload),
- str(echo_packet[UDP].payload),
+ self.assertEqual(scapy.compat.raw(p[UDP].payload),
+ scapy.compat.raw(echo_packet[UDP].payload),
"Received packet is not the echo packet sent")
self.assert_equal(udp_sport_tx, udp_sport_rx, "UDP source port (== "
"ECHO packet identifier for test purposes)")
def cli_verify_no_response(self, cli):
""" execute a CLI, asserting that the response is empty """
self.assert_equal(self.vapi.cli(cli),
- "",
+ b"",
"CLI command response")
def cli_verify_response(self, cli, expected):
self.cli_verify_no_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(ord(c)) for c in k.key)))
+ "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(
self, self.pg0, self.pg0.remote_ip4, sha1_key=k)
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(ord(c)) for c in k2.key)),
+ "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work
self.cli_verify_no_response(
"bfd key set conf-key-id %s type meticulous-keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(ord(c)) for c in k.key)))
+ "".join("{:02x}".format(scapy.compat.orb(c)) for c in k.key)))
self.assertTrue(k.query_vpp_config())
self.vpp_session = VppBFDUDPSession(self, self.pg0,
self.pg0.remote_ip6, af=AF_INET6,
self.cli_verify_response(
"bfd key set conf-key-id %s type keyed-sha1 secret %s" %
(k.conf_key_id,
- "".join("{:02x}".format(ord(c)) for c in k2.key)),
+ "".join("{:02x}".format(scapy.compat.orb(c)) for c in k2.key)),
"bfd key set: `bfd_auth_set_key' API call failed, "
"rv=-103:BFD object in use")
# manipulating the session using old secret should still work
VppBierDispTable, VppBierTable, VppBierTableID, VppBierRoute
from vpp_udp_encap import VppUdpEncap
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
for pkt_size in pkt_sizes:
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
MPLS(label=77, ttl=255) /
- BIER(length=hdr_len_id, BitString=chr(255)*n_bytes) /
+ BIER(length=hdr_len_id,
+ BitString=scapy.compat.chb(255)*n_bytes) /
IPv6(src=self.pg0.remote_ip6, dst=self.pg0.remote_ip6) /
UDP(sport=1234, dport=1234) /
- Raw(chr(5) * pkt_size))
+ Raw(scapy.compat.chb(5) * pkt_size))
pkts = p
self.pg0.add_stream(pkts)
self.assertEqual(bier_hdr.Proto, 5)
# The bit-string should consist only of the BP given by i.
- byte_array = ['\0'] * (n_bytes)
- byte_val = chr(1 << (bp - 1) % 8)
- byte_pos = n_bytes - (((bp - 1) / 8) + 1)
+ byte_array = [b'\0'] * (n_bytes)
+ byte_val = scapy.compat.chb(1 << (bp - 1) % 8)
+ byte_pos = n_bytes - (((bp - 1) // 8) + 1)
byte_array[byte_pos] = byte_val
- bitstring = ''.join(byte_array)
+ bitstring = ''.join([scapy.compat.chb(x) for x in byte_array])
self.assertEqual(len(bitstring), len(bier_hdr.BitString))
self.assertEqual(bitstring, bier_hdr.BitString)
#
# cleanup. not strictly necessary, but it's much quicker this way
- # becuase the bier_fib_dump and ip_fib_dump will be empty when the
+ # because the bier_fib_dump and ip_fib_dump will be empty when the
# auto-cleanup kicks in
#
for br in bier_routes:
MPLS(label=77, ttl=255) /
BIER(length=BIERLength.BIER_LEN_64,
entropy=ii,
- BitString=chr(255)*16) /
+ BitString=scapy.compat.chb(255)*16) /
IPv6(src=self.pg0.remote_ip6,
dst=self.pg0.remote_ip6) /
UDP(sport=1234, dport=1234) /
#
# An imposition object with both bit-positions set
#
- bi = VppBierImp(self, bti, 333, chr(0x3) * 32)
+ bi = VppBierImp(self, bti, 333, scapy.compat.chb(0x3) * 32)
bi.add_vpp_config()
#
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
MPLS(label=77, ttl=255) /
BIER(length=BIERLength.BIER_LEN_256,
- BitString=chr(255)*32,
+ BitString=scapy.compat.chb(255)*32,
BFRID=99) /
IP(src="1.1.1.1", dst="232.1.1.1") /
UDP(sport=1234, dport=1234) /
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
MPLS(label=77, ttl=255) /
BIER(length=BIERLength.BIER_LEN_256,
- BitString=chr(255)*32,
+ BitString=scapy.compat.chb(255)*32,
BFRID=77) /
IP(src="1.1.1.1", dst="232.1.1.1") /
UDP(sport=1234, dport=1234) /
# A multicast route to forward post BIER disposition that needs
# a check against sending back into the BIER core
#
- bi = VppBierImp(self, bti, 333, chr(0x3) * 32)
+ bi = VppBierImp(self, bti, 333, scapy.compat.chb(0x3) * 32)
bi.add_vpp_config()
route_eg_232_1_1_2 = VppIpMRoute(
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
MPLS(label=77, ttl=255) /
BIER(length=BIERLength.BIER_LEN_256,
- BitString=chr(255)*32,
+ BitString=scapy.compat.chb(255)*32,
BFRID=77) /
IP(src="1.1.1.1", dst="232.1.1.2") /
UDP(sport=1234, dport=1234) /
bt = VppBierTable(self, bti, 77)
bt.add_vpp_config()
- lowest = ['\0'] * (n_bytes)
- lowest[-1] = chr(1)
- highest = ['\0'] * (n_bytes)
- highest[0] = chr(128)
+ lowest = [b'\0'] * (n_bytes)
+ lowest[-1] = scapy.compat.chb(1)
+ highest = [b'\0'] * (n_bytes)
+ highest[0] = scapy.compat.chb(128)
#
# Impostion Sets bit strings
src=self.pg0.remote_mac) /
IP(src="1.1.1.1", dst="232.1.1.1") /
UDP(sport=1234, dport=1234) /
- Raw(chr(5) * 32))
+ Raw(scapy.compat.chb(5) * 32))
rx = self.send_and_expect(self.pg0, p*65, self.pg1)
src=self.pg0.remote_mac) /
IP(src="1.1.1.1", dst="232.1.1.2") /
UDP(sport=1234, dport=1234) /
- Raw(chr(5) * 512))
+ Raw(scapy.compat.chb(5) * 512))
rx = self.send_and_expect(self.pg0, p*65, self.pg1)
self.assertEqual(rx[0][IP].src, "1.1.1.1")
# only use the second, but creating 2 tests with a non-zero
# value index in the route add
#
- bi = VppBierImp(self, bti, 333, chr(0xff) * 32)
+ bi = VppBierImp(self, bti, 333, scapy.compat.chb(0xff) * 32)
bi.add_vpp_config()
- bi2 = VppBierImp(self, bti, 334, chr(0xff) * 32)
+ bi2 = VppBierImp(self, bti, 334, scapy.compat.chb(0xff) * 32)
bi2.add_vpp_config()
#
UDP(sport=333, dport=8138) /
BIFT(sd=1, set=0, bsl=2, ttl=255) /
BIER(length=BIERLength.BIER_LEN_256,
- BitString=chr(255)*32,
+ BitString=scapy.compat.chb(255)*32,
BFRID=99) /
IP(src="1.1.1.1", dst="232.1.1.1") /
UDP(sport=1234, dport=1234) /
super(TestCDP, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestCDP, cls).tearDownClass()
+
def test_enable_cdp(self):
self.logger.info(self.vapi.cli("cdp enable"))
ret = self.vapi.cli("show cdp")
from vpp_neighbor import VppNeighbor
from vpp_ip_route import find_route, VppIpTable
from util import mk_ll_addr
+import scapy.compat
from scapy.layers.l2 import Ether, getmacbyip, ARP
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6, in6_getnsmac
#
# 1. not our IP address = not checked by VPP? so offer is replayed
# to client
- bad_ip = option_82[0:8] + chr(33) + option_82[9:]
+ bad_ip = option_82[0:8] + scapy.compat.chb(33) + option_82[9:]
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
"DHCP offer option 82 bad address")
# 2. Not a sw_if_index VPP knows
- bad_if_index = option_82[0:2] + chr(33) + option_82[3:]
+ bad_if_index = option_82[0:2] + scapy.compat.chb(33) + option_82[3:]
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
vx_tun_l3.add_vpp_config()
#
- # packets destined to unkown addresses in the BVI's subnet
+ # packets destined to unknown addresses in the BVI's subnet
# are ARP'd for
#
p4 = (Ether(src=self.pg0.remote_mac, dst=str(self.router_mac)) /
import unittest
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q, GRE
from scapy.layers.inet import IP, UDP
GRE() /
Ether(dst=RandMAC('*:*:*:*:*:*'),
src=RandMAC('*:*:*:*:*:*')) /
- IP(src=str(RandIP()), dst=str(RandIP())) /
+ IP(src=scapy.compat.raw(RandIP()),
+ dst=scapy.compat.raw(RandIP())) /
UDP(sport=1234, dport=1234) /
Raw(payload))
info.data = p.copy()
Ether(dst=RandMAC('*:*:*:*:*:*'),
src=RandMAC('*:*:*:*:*:*')) /
Dot1Q(vlan=vlan) /
- IP(src=str(RandIP()), dst=str(RandIP())) /
+ IP(src=scapy.compat.raw(RandIP()),
+ dst=scapy.compat.raw(RandIP())) /
UDP(sport=1234, dport=1234) /
Raw(payload))
info.data = p.copy()
self.assertEqual(rx_ip.src, tunnel_src)
self.assertEqual(rx_ip.dst, tunnel_dst)
- rx_gre = GRE(str(rx_ip[IPv6].payload))
+ rx_gre = GRE(scapy.compat.raw(rx_ip[IPv6].payload))
rx_ip = rx_gre[IPv6]
self.assertEqual(rx_ip.src, tx_ip.src)
self.assertEqual(rx_ip.src, tunnel_src)
self.assertEqual(rx_ip.dst, tunnel_dst)
- rx_gre = GRE(str(rx_ip[IPv6].payload))
+ rx_gre = GRE(scapy.compat.raw(rx_ip[IPv6].payload))
tx_ip = tx[IP]
rx_ip = rx_gre[IP]
self.assertEqual(rx_ip.src, tunnel_src)
self.assertEqual(rx_ip.dst, tunnel_dst)
- rx_gre = GRE(str(rx_ip[IP].payload))
+ rx_gre = GRE(scapy.compat.raw(rx_ip[IP].payload))
rx_ip = rx_gre[IPv6]
tx_ip = tx[IPv6]
import socket
import unittest
+import scapy.compat
from scapy.contrib.mpls import MPLS
from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
from scapy.layers.l2 import Ether, Dot1Q, ARP
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
- n_dest_addr = '{:08x}'.format(dest_addr).decode('hex')
+ n_dest_addr = binascii.unhexlify('{:08x}'.format(dest_addr))
self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len,
n_next_hop_addr)
added_ips.append(socket.inet_ntoa(n_dest_addr))
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
- n_dest_addr = '{:08x}'.format(dest_addr).decode('hex')
+ n_dest_addr = binascii.unhexlify('{:08x}'.format(dest_addr))
self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len,
n_next_hop_addr, is_add=0)
removed_ips.append(socket.inet_ntoa(n_dest_addr))
import random
import socket
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ARP
if found:
break
for host in pg_if.remote_hosts:
- if str(addr) == str(host.ip4):
+ if scapy.compat.raw(addr) == \
+ scapy.compat.raw(host.ip4):
vrf_count += 1
found = True
break
import unittest
from parameterized import parameterized
+import scapy.compat
import scapy.layers.inet6 as inet6
from scapy.contrib.mpls import MPLS
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
import socket
+import scapy.compat
from scapy.layers.l2 import Ether
from scapy.layers.inet import ICMP, IP, TCP, UDP
from scapy.layers.ipsec import SecurityAssociation, ESP
+
from util import ppp, ppc
from template_ipsec import TemplateIpsec
from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
def verify_capture_encrypted(self, capture, sa):
for packet in capture:
try:
- copy = packet.__class__(str(packet))
+ copy = packet.__class__(scapy.compat.raw(packet))
del copy[UDP].len
- copy = packet.__class__(str(copy))
+ copy = packet.__class__(scapy.compat.raw(copy))
self.assert_equal(packet[UDP].len, copy[UDP].len,
"UDP header length")
self.assert_packet_checksums_valid(packet)
import socket
+import scapy.compat
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether, GRE
self.assertEqual(gre.proto, 0x0800 if isv4 else 0x86DD)
self.assertEqual(gre.flags, 0)
self.assertEqual(gre.version, 0)
- inner = IPver(str(gre.payload))
+ inner = IPver(scapy.compat.raw(gre.payload))
payload_info = self.payload_to_info(inner[Raw])
self.info = self.packet_infos[payload_info.index]
self.assertEqual(payload_info.src, self.pg0.sw_if_index)
- self.assertEqual(str(inner), str(self.info.data[IPver]))
+ self.assertEqual(scapy.compat.raw(inner),
+ scapy.compat.raw(self.info.data[IPver]))
def checkCapture(self, encap, isv4):
self.pg0.assert_nothing_captured()
)
self.assertEqual(ip.nh, 47)
# self.assertEqual(len(ip.options), 0)
- gre = GRE(str(p[IPv6].payload))
+ gre = GRE(scapy.compat.raw(p[IPv6].payload))
self.checkInner(gre, isv4)
elif (encap == 'l3dsr'):
ip = p[IP]
)
self.assertEqual(ip.nh, 17)
self.assertGreaterEqual(ip.hlim, 63)
- udp = UDP(str(p[IPv6].payload))
+ udp = UDP(scapy.compat.raw(p[IPv6].payload))
self.assertEqual(udp.dport, 3307)
load[asid] += 1
except:
from framework import VppTestCase, VppTestRunner
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath
+
+import scapy.compat
from scapy.layers.l2 import Ether, Raw
from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
def validate(self, rx, expected):
- self.assertEqual(rx, expected.__class__(str(expected)))
+ self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))
def payload(self, len):
return 'x' * len
p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
dst="2001:db8:1f0::c0a8:1:f") / payload)
p6_translated.hlim -= 1
- p6_translated['TCP'].options = [('MSS', 1300)]
+ p6_translated[TCP].options = [('MSS', 1300)]
rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
for p in rx:
self.validate(p[1], p6_translated)
dst=self.pg0.remote_ip4) / payload)
p4_translated.id = 0
p4_translated.ttl -= 1
- p4_translated['TCP'].options = [('MSS', 1300)]
+ p4_translated[TCP].options = [('MSS', 1300)]
rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
for p in rx:
self.validate(p[1], p4_translated)
VppMplsLabel, MplsLspMode, find_mpls_route
from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP
verify_mpls_stack(self, rx, mpls_labels)
tx_eth = tx[Ether]
- rx_eth = Ether(str(rx[MPLS].payload))
+ rx_eth = Ether(scapy.compat.raw(rx[MPLS].payload))
self.assertEqual(rx_eth.src, tx_eth.src)
self.assertEqual(rx_eth.dst, tx_eth.dst)
import random
from framework import VppTestCase, VppTestRunner, running_extended_tests
+
+import scapy.compat
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
pref_n[13] = ip4_n[1]
pref_n[14] = ip4_n[2]
pref_n[15] = ip4_n[3]
- return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
+ packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
+ return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
def extract_ip4(self, ip6, plen):
"""
p = (IP(src=src_if.remote_ip4, dst=dst) /
TCP(sport=sport, dport=dport) /
Raw(data))
- p = p.__class__(str(p))
- chksum = p['TCP'].chksum
+ p = p.__class__(scapy.compat.raw(p))
+ chksum = p[TCP].chksum
proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
elif proto == IP_PROTOS.udp:
proto_header = UDP(sport=sport, dport=dport)
self.assertEqual(6, len(data))
for record in data:
# natEvent
- self.assertIn(ord(record[230]), [4, 5])
- if ord(record[230]) == 4:
+ self.assertIn(scapy.compat.orb(record[230]), [4, 5])
+ if scapy.compat.orb(record[230]) == 4:
nat44_ses_create_num += 1
else:
nat44_ses_delete_num += 1
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
- if IP_PROTOS.icmp == ord(record[4]):
+ if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
self.assertEqual(struct.pack("!H", self.icmp_id_out),
record[227])
- elif IP_PROTOS.tcp == ord(record[4]):
+ elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.tcp_port_in),
record[7])
self.assertEqual(struct.pack("!H", self.tcp_port_out),
record[227])
- elif IP_PROTOS.udp == ord(record[4]):
+ elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
self.assertEqual(struct.pack("!H", self.udp_port_in),
record[7])
self.assertEqual(struct.pack("!H", self.udp_port_out),
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 3)
+ self.assertEqual(scapy.compat.orb(record[230]), 3)
# natPoolID
self.assertEqual(struct.pack("!I", 0), record[283])
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 1), record[466])
# maxSessionEntries
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 2), record[466])
# maxBIBEntries
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 5), record[466])
# maxFragmentsPendingReassembly
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 5), record[466])
# maxFragmentsPendingReassembly
record = data[0]
# natEvent
if is_create:
- self.assertEqual(ord(record[230]), 10)
+ self.assertEqual(scapy.compat.orb(record[230]), 10)
else:
- self.assertEqual(ord(record[230]), 11)
+ self.assertEqual(scapy.compat.orb(record[230]), 11)
# sourceIPv6Address
self.assertEqual(src_addr, record[27])
# postNATSourceIPv4Address
self.assertEqual(self.nat_addr_n, record[225])
# protocolIdentifier
- self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
+ self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# sourceTransportPort
record = data[0]
# natEvent
if is_create:
- self.assertEqual(ord(record[230]), 6)
+ self.assertEqual(scapy.compat.orb(record[230]), 6)
else:
- self.assertEqual(ord(record[230]), 7)
+ self.assertEqual(scapy.compat.orb(record[230]), 7)
# sourceIPv6Address
self.assertEqual(src_addr, record[27])
# destinationIPv6Address
self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
record[226])
# protocolIdentifier
- self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
+ self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4]))
# ingressVRFID
self.assertEqual(struct.pack("!I", 0), record[234])
# sourceTransportPort
self.assertEqual(1, len(data))
record = data[0]
# natEvent
- self.assertEqual(ord(record[230]), 13)
+ self.assertEqual(scapy.compat.orb(record[230]), 13)
# natQuotaExceededEvent
self.assertEqual(struct.pack("I", 3), record[466])
# maxEntriesPerUser
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
reass = self.vapi.nat_reass_dump()
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
for i in range(2):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
# send packet from host to server
pkts = self.create_stream_frag(self.pg0,
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
for i in range(2):
layer = self.proto2layer(proto)
if proto == IP_PROTOS.tcp:
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
else:
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
self.port_in = random.randint(1025, 65535)
for i in range(2):
is_inside=0)
self.vapi.nat44_forwarding_enable_disable(1)
- data = "A" * 16 + "B" * 16 + "C" * 3
+ data = b"A" * 16 + b"B" * 16 + b"C" * 3
pkts = self.create_stream_frag(self.pg1,
self.pg0.remote_ip4,
4789,
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
src_port=self.ipfix_src_port)
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
self.tcp_port_in = random.randint(1025, 65535)
pkts = self.create_stream_frag(self.pg0,
self.pg1.remote_ip4,
reass_n_start = len(reass)
# in2out
- data = 'a' * 200
+ data = b'a' * 200
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
self.pg0.add_stream(pkts)
self.assertEqual(data, p[Raw].load)
# out2in
- data = "A" * 4 + "b" * 16 + "C" * 3
+ data = b"A" * 4 + b"b" * 16 + b"C" * 3
pkts = self.create_stream_frag(self.pg1,
self.nat_addr,
20,
self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# in2out
- data = 'a' * 200
+ data = b'a' * 200
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
pkts.reverse()
self.assertEqual(data, p[Raw].load)
# out2in
- data = "A" * 4 + "B" * 16 + "C" * 3
+ data = b"A" * 4 + b"B" * 16 + b"C" * 3
pkts = self.create_stream_frag(self.pg1,
self.nat_addr,
20,
self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
src_port=self.ipfix_src_port)
- data = 'a' * 200
+ data = b'a' * 200
pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
self.tcp_port_in, 20, data)
pkts.reverse()
for p in capture:
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
- if ord(data[0][230]) == 10:
+ if scapy.compat.orb(data[0][230]) == 10:
self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
- elif ord(data[0][230]) == 6:
+ elif scapy.compat.orb(data[0][230]) == 6:
self.verify_ipfix_nat64_ses(data,
1,
self.pg0.remote_ip6n,
self.ipfix_domain_id)
if p.haslayer(Data):
data = ipfix.decode_data_set(p.getlayer(Set))
- if ord(data[0][230]) == 11:
+ if scapy.compat.orb(data[0][230]) == 11:
self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
- elif ord(data[0][230]) == 7:
+ elif scapy.compat.orb(data[0][230]) == 7:
self.verify_ipfix_nat64_ses(data,
0,
self.pg0.remote_ip6n,
VppIpTable, DpoProto
from vpp_papi import VppEnum
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, ARP, Dot1Q
from scapy.layers.inet import IP, UDP
#
# change the interface's MAC
#
- mac = [chr(0x00), chr(0x00), chr(0x00),
- chr(0x33), chr(0x33), chr(0x33)]
+ mac = [scapy.compat.chb(0x00), scapy.compat.chb(0x00),
+ scapy.compat.chb(0x00), scapy.compat.chb(0x33),
+ scapy.compat.chb(0x33), scapy.compat.chb(0x33)]
mac_string = ''.join(mac)
self.vapi.sw_interface_set_mac_address(self.pg1.sw_if_index,
from struct import unpack, unpack_from
from util import ppp, ppc
from re import compile
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP
# Format MAC Address
def get_mac_addr(bytes_addr):
- return ':'.join('%02x' % ord(b) for b in bytes_addr)
+ return ':'.join('%02x' % scapy.compat.orb(b) for b in bytes_addr)
# Format IP Address
def ipv4(bytes_addr):
- return '.'.join('%d' % ord(b) for b in bytes_addr)
+ return '.'.join('%d' % scapy.compat.orb(b) for b in bytes_addr)
# Unpack Ethernet Frame
from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
VppMplsLabel, VppMplsTable
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet import IP, UDP
# for table 1 map the n=0xff possible values of input QoS mark,
# n to 1-n
#
- output = [chr(0)] * 256
+ output = [scapy.compat.chb(0)] * 256
for i in range(0, 255):
- output[i] = chr(255 - i)
- os = ''.join(output)
+ output[i] = scapy.compat.chb(255 - i)
+ os = b''.join(output)
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
#
# For table 2 (and up) use the value n for everything
#
- output = [chr(2)] * 256
- os = ''.join(output)
+ output = [scapy.compat.chb(2)] * 256
+ os = b''.join(output)
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
self.vapi.qos_egress_map_update(2, rows)
- output = [chr(3)] * 256
- os = ''.join(output)
+ output = [scapy.compat.chb(3)] * 256
+ os = b''.join(output)
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
self.vapi.qos_egress_map_update(3, rows)
- output = [chr(4)] * 256
- os = ''.join(output)
+ output = [scapy.compat.chb(4)] * 256
+ os = b''.join(output)
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
tc=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
#
# Since we have not yet enabled the recording of the input QoS
from_ip = 6
from_mpls = 5
from_vlan = 4
- output = [chr(from_ext)] * 256
- os1 = ''.join(output)
- output = [chr(from_vlan)] * 256
- os2 = ''.join(output)
- output = [chr(from_mpls)] * 256
- os3 = ''.join(output)
- output = [chr(from_ip)] * 256
- os4 = ''.join(output)
+ output = [scapy.compat.chb(from_ext)] * 256
+ os1 = b''.join(output)
+ output = [scapy.compat.chb(from_vlan)] * 256
+ os2 = b''.join(output)
+ output = [scapy.compat.chb(from_mpls)] * 256
+ os3 = b''.join(output)
+ output = [scapy.compat.chb(from_ip)] * 256
+ os4 = b''.join(output)
rows = [{'outputs': os1},
{'outputs': os2},
{'outputs': os3},
p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
MPLS(label=32, cos=3, ttl=2) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
rx = self.send_and_expect(self.pg0, p_m1 * 65, self.pg1)
for p in rx:
MPLS(label=33, ttl=2, cos=3) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.4", tos=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
rx = self.send_and_expect(self.pg0, p_m2 * 65, self.pg1)
#
# QoS for all input values
#
- output = [chr(0)] * 256
+ output = [scapy.compat.chb(0)] * 256
for i in range(0, 255):
- output[i] = chr(255 - i)
- os = ''.join(output)
+ output[i] = scapy.compat.chb(255 - i)
+ os = b''.join(output)
rows = [{'outputs': os},
{'outputs': os},
{'outputs': os},
Dot1Q(vlan=11, prio=1) /
IP(src="1.1.1.1", dst="10.0.0.2", tos=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src="1.1.1.1", dst="10.0.0.1", tos=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
Dot1Q(vlan=11, prio=2) /
IPv6(src="2001::1", dst="2001::2", tc=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
p_v2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IPv6(src="3001::1", dst="2001::1", tc=1) /
UDP(sport=1234, dport=1234) /
- Raw(chr(100) * 65))
+ Raw(scapy.compat.chb(100) * 65))
rx = self.send_and_expect(self.pg1, p_v2 * 65, self.pg0)
import unittest
from parameterized import parameterized
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, GRE
from scapy.layers.inet import IP, UDP, ICMP
cls.pkt_infos = []
for index, info in six.iteritems(infos):
p = info.data
- # cls.logger.debug(ppp("Packet:", p.__class__(str(p))))
+ # cls.logger.debug(ppp("Packet:",
+ # p.__class__(scapy.compat.raw(p))))
fragments_400 = fragment_rfc791(p, 400)
fragments_300 = fragment_rfc791(p, 300)
fragments_200 = [
cls.pkt_infos = []
for index, info in six.iteritems(infos):
p = info.data
- # cls.logger.debug(ppp("Packet:", p.__class__(str(p))))
+ # cls.logger.debug(ppp("Packet:",
+ # p.__class__(scapy.compat.raw(p))))
fragments_400 = fragment_rfc8200(p, info.index, 400)
fragments_300 = fragment_rfc8200(p, info.index, 300)
cls.pkt_infos.append((index, fragments_400, fragments_300))
Raw())
self.extend_packet(p, 1000, self.padding)
fragments = fragment_rfc8200(p, 1, 500)
- bad_fragment = p.__class__(str(fragments[1]))
+ bad_fragment = p.__class__(scapy.compat.raw(fragments[1]))
bad_fragment[IPv6ExtHdrFragment].nh = 59
bad_fragment[IPv6ExtHdrFragment].offset = 0
self.pg_enable_capture()
cls.pkt_infos = []
for index, info in six.iteritems(infos):
p = info.data
- # cls.logger.debug(ppp("Packet:", p.__class__(str(p))))
+ # cls.logger.debug(ppp("Packet:",
+ # p.__class__(scapy.compat.raw(p))))
fragments_300 = fragment_rfc791(p, 300)
cls.pkt_infos.append((index, fragments_300))
cls.fragments_300 = [x for (_, frags) in cls.pkt_infos for x in frags]
from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
tx_ip.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- tx_ip = IP(str(tx_ip))
+ tx_ip = IP(scapy.compat.raw(tx_ip))
self.assertEqual(rx_srh.payload, tx_ip)
self.assertEqual(rx_srh.nh, 59)
# the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
- self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
+ self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
self.logger.debug("packet verification: SUCCESS")
tx_ip2.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- tx_ip2 = IP(str(tx_ip2))
+ tx_ip2 = IP(scapy.compat.raw(tx_ip2))
self.assertEqual(rx_ip, tx_ip2)
tx_ip = tx_pkt.getlayer(IPv6)
# we can't just get the 2nd Ether layer
# get the Raw content and dissect it as Ether
- tx_eth1 = Ether(str(tx_pkt[Raw]))
+ tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
# verify if rx'ed packet has no SRH
self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
# read back the dumped packet (with str())
# to force computing these fields
# probably other ways are possible
- p = Ether(str(p))
+ p = Ether(scapy.compat.raw(p))
payload_info.data = p.copy()
self.logger.debug(ppp("Created packet:", p))
pkts.append(p)
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
- str(Ether(str(packet[Raw]))[Raw]))
+ Ether(scapy.compat.r(packet[Raw]))[Raw])
return payload_info
:param compare_func: function to compare in and out packet
"""
self.logger.info("Verifying capture on interface %s using function %s"
- % (dst_if.name, compare_func.func_name))
+ % (dst_if.name, compare_func.__name__))
last_info = dict()
for i in self.pg_interfaces:
from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
tx_ip2.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- tx_ip2 = IP(str(tx_ip2))
+ tx_ip2 = IP(scapy.compat.raw(tx_ip2))
self.assertEqual(rx_ip, tx_ip2)
tx_ip.chksum = None
# -> read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- self.assertEqual(rx_srh.payload, IP(str(tx_ip)))
+ self.assertEqual(rx_srh.payload, IP(scapy.compat.raw(tx_ip)))
self.logger.debug("packet verification: SUCCESS")
tx_ip = tx_pkt.getlayer(IPv6)
# we can't just get the 2nd Ether layer
# get the Raw content and dissect it as Ether
- tx_eth1 = Ether(str(tx_pkt[Raw]))
+ tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
# verify if rx'ed packet has no SRH
self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
# the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
tx_ether = tx_pkt.getlayer(Ether)
- self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
+ self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
self.logger.debug("packet verification: SUCCESS")
# read back the dumped packet (with str())
# to force computing these fields
# probably other ways are possible
- p = Ether(str(p))
+ p = Ether(scapy.compat.raw(p))
payload_info.data = p.copy()
self.logger.debug(ppp("Created packet:", p))
pkts.append(p)
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
- Ether(str(packet[Raw]))[Raw])
+ Ether(scapy.compat.raw(packet[Raw]))[Raw])
return payload_info
:param compare_func: function to compare in and out packet
"""
self.logger.info("Verifying capture on interface %s using function %s"
- % (dst_if.name, compare_func.func_name))
+ % (dst_if.name, compare_func.__name__))
last_info = dict()
for i in self.pg_interfaces:
from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
tx_ip.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- tx_ip = IP(str(tx_ip))
+ tx_ip = IP(scapy.compat.raw(tx_ip))
self.assertEqual(payload, tx_ip)
payload = rx_ip.payload
# the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
- self.assertEqual(Ether(str(payload)), tx_ether)
+ self.assertEqual(Ether(scapy.compat.raw(payload)), tx_ether)
self.logger.debug("packet verification: SUCCESS")
tx_ip2.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- tx_ip2 = IP(str(tx_ip2))
+ tx_ip2 = IP(scapy.compat.raw(tx_ip2))
self.assertEqual(rx_ip, tx_ip2)
tx_ip = tx_pkt.getlayer(IPv6)
# we can't just get the 2nd Ether layer
# get the Raw content and dissect it as Ether
- tx_eth1 = Ether(str(tx_pkt[Raw]))
+ tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
# verify if rx'ed packet has no SRH
self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
# read back the dumped packet (with str())
# to force computing these fields
# probably other ways are possible
- p = Ether(str(p))
+ p = Ether(scapy.compat.raw(p))
payload_info.data = p.copy()
self.logger.debug(ppp("Created packet:", p))
pkts.append(p)
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
- Ether(str(packet[Raw]))[Raw])
+ Ether(scapy.compat.raw(packet[Raw]))[Raw])
return payload_info
:param compare_func: function to compare in and out packet
"""
self.logger.info("Verifying capture on interface %s using function %s"
- % (dst_if.name, compare_func.func_name))
+ % (dst_if.name, compare_func.__name__))
last_info = dict()
for i in self.pg_interfaces:
import sys
import os.path
+import scapy.compat
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP
from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
"""
logger = LoggerWrapper(_logger)
logger.debug(ppp("Fragmenting packet:", packet))
- packet = packet.__class__(str(packet)) # recalculate all values
+ packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
if len(packet[IP].options) > 0:
raise Exception("Not implemented")
if len(packet) <= fragsize:
pre_ip_len = len(packet) - len(packet[IP])
ip_header_len = packet[IP].ihl * 4
- hex_packet = str(packet)
+ hex_packet = scapy.compat.raw(packet)
hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
:returns: list of fragments
"""
logger = LoggerWrapper(_logger)
- packet = packet.__class__(str(packet)) # recalculate all values
+ packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
if len(packet) <= fragsize:
return [packet]
logger.debug(ppp("Fragmenting packet:", packet))
logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
- hex_payload = str(ext_and_upper_layer)
+ hex_payload = scapy.compat.raw(ext_and_upper_layer)
logger.debug("Payload length is %s" % len(hex_payload))
logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
p[IPv6ExtHdrFragment].id = identification
p[IPv6ExtHdrFragment].offset = 0
p[IPv6ExtHdrFragment].m = 1
- p = p.__class__(str(p))
+ p = p.__class__(scapy.compat.raw(p))
logger.debug(ppp("Fragment %s:" % len(pkts), p))
pkts.append(p)
offset = first_payload_len_nfb * 8
p[IPv6ExtHdrFragment].id = identification
p[IPv6ExtHdrFragment].offset = offset / 8
p[IPv6ExtHdrFragment].m = 1
- p = p.__class__(str(p))
+ p = p.__class__(scapy.compat.raw(p))
logger.debug(ppp("Fragment %s:" % len(pkts), p))
pkts.append(p)
offset = offset + l_nfb * 8
import socket
import struct
from traceback import format_exc, format_stack
+
+import scapy.compat
from scapy.utils import wrpcap, rdpcap, PcapReader
from scapy.plist import PacketList
from vpp_interface import VppInterface
# Make Dot1AD packet content recognizable to scapy
if arp_reply.type == 0x88a8:
arp_reply.type = 0x8100
- arp_reply = Ether(str(arp_reply))
+ arp_reply = Ether(scapy.compat.raw(arp_reply))
try:
if arp_reply[ARP].op == ARP.is_at:
self.test.logger.info("VPP %s MAC address is %s " %
ndp_reply = captured_packet.copy() # keep original for exception
# Make Dot1AD packet content recognizable to scapy
if ndp_reply.type == 0x88a8:
+ self._test.logger.info(
+ "Replacing EtherType: 0x88a8 with "
+ "0x8100 and regenerating Ethernet header. ")
ndp_reply.type = 0x8100
- ndp_reply = Ether(str(ndp_reply))
+ ndp_reply = Ether(scapy.compat.raw(ndp_reply))
try:
ndp_na = ndp_reply[ICMPv6ND_NA]
opt = ndp_na[ICMPv6NDOptDstLLAddr]