All callers of payload_to_info were required to wrap payload with str().
Refactor to call scapy's payload.load for raw payloads or specify the
specific fieldname.
Change-Id: I1c80599d4df8dc129dbb8274733afaad406d5bcf
Signed-off-by: Paul Vinciguerra <pvinci@vinciconsulting.com>
ip = packet[IP]
udp = packet[UDP]
# convert the payload to packet info object
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
# make sure the indexes match
self.assert_equal(payload_info.src, src_if.sw_if_index,
"source sw_if_index")
info.ip, info.proto)
@staticmethod
- def payload_to_info(payload):
+ def payload_to_info(payload, payload_field='load'):
"""
Convert packet payload to _PacketInfo object
:param payload: packet payload
-
+ :type: <class 'scapy.packet.Raw'>
+ :param: payload_field: packet fieldname of payload "load" for
+ <class 'scapy.packet.Raw'>
:returns: _PacketInfo object containing de-serialized data from payload
"""
- numbers = payload.split()
+ numbers = getattr(payload, payload_field).split()
info = _PacketInfo()
info.index = int(numbers[0])
info.src = int(numbers[1])
# Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
if traffic_type == self.ICMP and ip_type == self.IPV6:
payload_info = self.payload_to_info(
- packet[ICMPv6EchoRequest].data)
+ packet[ICMPv6EchoRequest], 'data')
payload = packet[ICMPv6EchoRequest]
else:
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
payload = packet[self.proto_map[payload_info.proto]]
except:
self.logger.error(ppp("Unexpected or invalid packet "
try:
ip_received = packet[IP]
proto_received = packet[proto_l]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug(
try:
ip6_received = packet[IPv6]
proto_received = packet[proto_l]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug(
packet[ICMPv6EchoRequest].data)
payload = packet[ICMPv6EchoRequest]
else:
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
payload = packet[self.proto_map[payload_info.proto]]
except:
self.logger.error(ppp("Unexpected or invalid packet "
try:
ip = packet[IP]
udp = packet[UDP]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug(
def _find_ip_match(self, find_in, pkt):
for p in find_in:
- if self.payload_to_info(str(p[Raw])) == \
- self.payload_to_info(str(pkt[Raw])):
+ if self.payload_to_info(p[Raw]) == \
+ self.payload_to_info(pkt[Raw]):
if p[IP].src != pkt[IP].src:
break
if p[IP].dst != pkt[IP].dst:
for packet in capture:
ip = packet[IP]
udp = packet[IP][UDP]
- payload_info = self.payload_to_info(str(packet[IP][UDP][Raw]))
+ payload_info = self.payload_to_info(packet[IP][UDP][Raw])
self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
for packet in capture:
ip = packet[IP]
udp = packet[IP][UDP]
- payload_info = self.payload_to_info(str(packet[IP][UDP][Raw]))
+ payload_info = self.payload_to_info(packet[IP][UDP][Raw])
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
try:
ip = packet[IP]
udp = packet[UDP]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
try:
ip = packet[IPv6]
udp = packet[inet6.UDP]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug(
try:
ip = packet[IPv6]
udp = packet[UDP]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
for packet in capture:
try:
ip_received = packet[ip_l]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
ip_sent = self._packet_infos[packet_index].data[ip_l]
self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
last_info[i.sw_if_index] = None
dst_sw_if_index = pg_if.sw_if_index
for packet in capture:
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
try:
ip = packet[IP]
udp = packet[UDP]
last_info[i.sw_if_index] = None
dst_sw_if_index = pg_if.sw_if_index
for packet in capture:
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
src_sw_if_index = payload_info.src
src_if = None
for ifc in self.pg_interfaces:
try:
ip = packet[IP]
udp = packet[UDP]
- info = self.payload_to_info(str(packet[Raw]))
+ info = self.payload_to_info(packet[Raw])
self.assertEqual(info.dst, dst)
self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
(dst_if.name, info.src, info.index))
try:
ip = packet[IP]
udp = packet[UDP]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
last_info[i.sw_if_index] = None
dst_sw_if_index = pg_if.sw_if_index
for packet in capture:
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
try:
ip = packet[IP]
udp = packet[UDP]
self.assertEqual(gre.flags, 0)
self.assertEqual(gre.version, 0)
inner = IPver(str(gre.payload))
- payload_info = self.payload_to_info(str(inner[Raw]))
+ payload_info = self.payload_to_info(inner[Raw])
self.info = self.packet_infos[payload_info.index]
self.assertEqual(payload_info.src, self.pg0.sw_if_index)
self.assertEqual(str(inner), str(self.info.data[IPver]))
self.logger.debug(ppp("Got packet:", packet))
ip = packet[scapy_ip_family]
udp = packet[UDP]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertTrue(
packet_index not in dropped_packet_indexes,
self.logger.debug(ppp("Got packet:", packet))
ip = packet[IP]
icmp = packet[ICMP]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
if packet_index in seen:
raise Exception(ppp("Duplicate packet received", packet))
self.logger.debug(ppp("Got packet:", packet))
ip = packet[ip_class]
udp = packet[UDP]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertTrue(
packet_index not in dropped_packet_indexes,
# but packet[Raw] gives the complete payload
# (incl L2 header) for the T.Encaps L2 case
try:
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
except:
# remote L2 header from packet[Raw]:
# but packet[Raw] gives the complete payload
# (incl L2 header) for the T.Encaps L2 case
try:
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
except:
# remote L2 header from packet[Raw]:
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
- str(Ether(str(packet[Raw]))[Raw]))
+ Ether(str(packet[Raw]))[Raw])
return payload_info
# but packet[Raw] gives the complete payload
# (incl L2 header) for the T.Encaps L2 case
try:
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
except:
# remote L2 header from packet[Raw]:
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
- str(Ether(str(packet[Raw]))[Raw]))
+ Ether(str(packet[Raw]))[Raw])
return payload_info