""" test framework utilities """
+import abc
import socket
+from socket import AF_INET6
+import six
import sys
import os.path
-from abc import abstractmethod, ABCMeta
-from scapy.utils6 import in6_mactoifaceid
+import scapy.compat
from scapy.layers.l2 import Ether
-from scapy.packet import Raw
from scapy.layers.inet import IP
from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
IPv6ExtHdrHopByHop
+from scapy.packet import Raw
from scapy.utils import hexdump
-from socket import AF_INET6
+from scapy.utils6 import in6_mactoifaceid
+
from io import BytesIO
from vpp_papi import mac_pton
for ip in ip4_range(ip4, s, e))
+# wrapper around scapy library function.
def mk_ll_addr(mac):
euid = in6_mactoifaceid(mac)
addr = "fe80::" + euid
class NumericConstant(object):
- __metaclass__ = ABCMeta
desc_dict = {}
- @abstractmethod
def __init__(self, value):
self._value = value
"""
logger = LoggerWrapper(_logger)
logger.debug(ppp("Fragmenting packet:", packet))
- packet = packet.__class__(str(packet)) # recalculate all values
+ packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
if len(packet[IP].options) > 0:
raise Exception("Not implemented")
if len(packet) <= fragsize:
pre_ip_len = len(packet) - len(packet[IP])
ip_header_len = packet[IP].ihl * 4
- hex_packet = str(packet)
+ hex_packet = scapy.compat.raw(packet)
hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
:returns: list of fragments
"""
logger = LoggerWrapper(_logger)
- packet = packet.__class__(str(packet)) # recalculate all values
+ packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
if len(packet) <= fragsize:
return [packet]
logger.debug(ppp("Fragmenting packet:", packet))
logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
- hex_payload = str(ext_and_upper_layer)
+ hex_payload = scapy.compat.raw(ext_and_upper_layer)
logger.debug("Payload length is %s" % len(hex_payload))
logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
fragment_ext_hdr = IPv6ExtHdrFragment()
logger.debug(ppp("Fragment header:", fragment_ext_hdr))
+ len_ext_and_upper_layer_payload = len(ext_and_upper_layer.payload)
+ if not len_ext_and_upper_layer_payload and \
+ hasattr(ext_and_upper_layer, "data"):
+ len_ext_and_upper_layer_payload = len(ext_and_upper_layer.data)
+
if len(per_fragment_headers) + len(fragment_ext_hdr) +\
- len(ext_and_upper_layer) - len(ext_and_upper_layer.payload)\
+ len(ext_and_upper_layer) - len_ext_and_upper_layer_payload\
> fragsize:
raise Exception("Cannot fragment this packet - MTU too small "
"(%s, %s, %s, %s, %s)" % (
len(per_fragment_headers), len(fragment_ext_hdr),
len(ext_and_upper_layer),
- len(ext_and_upper_layer.payload), fragsize))
+ len_ext_and_upper_layer_payload, fragsize))
orig_nh = packet[IPv6].nh
p = per_fragment_headers
p[IPv6ExtHdrFragment].id = identification
p[IPv6ExtHdrFragment].offset = 0
p[IPv6ExtHdrFragment].m = 1
- p = p.__class__(str(p))
+ p = p.__class__(scapy.compat.raw(p))
logger.debug(ppp("Fragment %s:" % len(pkts), p))
pkts.append(p)
offset = first_payload_len_nfb * 8
p[IPv6ExtHdrFragment].id = identification
p[IPv6ExtHdrFragment].offset = offset / 8
p[IPv6ExtHdrFragment].m = 1
- p = p.__class__(str(p))
+ p = p.__class__(scapy.compat.raw(p))
logger.debug(ppp("Fragment %s:" % len(pkts), p))
pkts.append(p)
offset = offset + l_nfb * 8