1 """ test framework utilities """
7 from socket import AF_INET6
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP
15 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
17 from scapy.packet import Raw
18 from scapy.utils import hexdump
19 from scapy.utils6 import in6_mactoifaceid
21 from io import BytesIO
22 from vpp_papi import mac_pton
24 # Set up an empty logger for the testcase that can be overridden as necessary
# The 'VppTestCase.util' logger only gets a logging.NullHandler attached here.
# It serves as the default `logger` argument of the fragment_rfc791() and
# fragment_rfc8200() helpers further down in this file.
25 null_logger = logging.getLogger('VppTestCase.util')
26 null_logger.addHandler(logging.NullHandler())
def ppp(headline, packet):
    """ Render a single packet as text.

    The result is the headline, followed by the hexdump of the packet and
    then the output of the packet's show() call, separated by newlines.

    :param headline: printed as first line of output
    :param packet: packet to render (must provide show(dump=True))
    :returns: the formatted string
    """
    raw_dump = hexdump(packet, dump=True)
    decoded = packet.show(dump=True)
    return '%s\n%s\n\n%s\n' % (headline, raw_dump, decoded)
# ppc: build one string out of the ppp() renderings of the packets in
# `capture`.
# Visible logic: when `limit` is smaller than len(capture) a
# "Print limit reached" note is prepared as `tail`; the
# zip(range(0, limit), capture) pairing caps the rendered packets at `limit`
# and numbers them from 0; the result is headline, body and tail combined.
# NOTE(review): several lines of this definition (end of the docstring, the
# initial value of `tail`, the arguments of the limit message) are not
# visible in this view - confirm against the full file.
36 def ppc(headline, capture, limit=10):
37 """ Return string containing ppp() printout for a capture.
39 :param headline: printed as first line of output
40 :param capture: packets to print
41 :param limit: limit the print to # of packets
46 if limit < len(capture):
47 tail = "\nPrint limit reached, %s out of %s packets printed" % (
49 body = "".join([ppp("Packet #%s:" % count, p)
50 for count, p in zip(range(0, limit), capture)])
51 return "%s\n%s%s" % (headline, body, tail)
def ip4_range(ip4, s, e):
    """ Generate IPv4 address strings sharing the prefix of ip4.

    Everything after the last dot of ip4 is replaced by each number of
    range(s, e) in turn.

    :param ip4: dotted IPv4 address string providing the prefix
    :param s: first value of the last part (inclusive)
    :param e: end value of the last part (exclusive)
    :returns: generator of address strings
    """
    network_part = ip4.rsplit('.', 1)[0]
    last_parts = range(s, e)
    return ("%s.%d" % (network_part, last) for last in last_parts)
# mcast_ip_to_mac: derive a multicast MAC address string from a multicast
# IP address.
# Visible logic: `ip` is parsed with ipaddress.ip_address() and rejected with
# ValueError unless it is a multicast address.  Two MAC layouts are built
# from `ip_as_int`: "01:00:5e:" followed by low-order bits of the address
# (the first group masked with 0x7f), and "33:33:" followed by bytes taken
# from the address.
# NOTE(review): the assignment of `ip_as_int`, the test that chooses between
# the two layouts and the return statement are not visible in this view -
# presumably IPv4 vs IPv6; confirm against the full file.
59 def mcast_ip_to_mac(ip):
60 ip = ipaddress.ip_address(ip)
61 if not ip.is_multicast:
62 raise ValueError("Must be multicast address.")
65 mcast_mac = "01:00:5e:%02x:%02x:%02x" % ((ip_as_int >> 16) & 0x7f,
66 (ip_as_int >> 8) & 0xff,
69 mcast_mac = "33:33:%02x:%02x:%02x:%02x" % ((ip_as_int >> 24) & 0xff,
70 (ip_as_int >> 16) & 0xff,
71 (ip_as_int >> 8) & 0xff,
76 # wrapper around scapy library function.
# Visible body: str(mac) is turned into an interface identifier with
# in6_mactoifaceid() (imported from scapy.utils6) and prefixed with "fe80::"
# to form `addr`.
# NOTE(review): the def line and the statement that hands `addr` back are not
# visible in this view.
78 euid = in6_mactoifaceid(str(mac))
79 addr = "fe80::" + euid
def ip6_normalize(ip6):
    """ Return ip6 in the textual form produced by socket.inet_ntop().

    The address string is packed to its binary form and converted back to
    text, so different spellings of the same address yield the same string.

    :param ip6: IPv6 address - string
    :returns: IPv6 address - string, as printed by inet_ntop()
    """
    family = socket.AF_INET6
    packed = socket.inet_pton(family, ip6)
    return socket.inet_ntop(family, packed)
def get_core_path(tempdir):
    """ Return tempdir and the value of get_core_pattern() joined by '/'.

    :param tempdir: directory part of the path
    :returns: path string "<tempdir>/<core pattern>"
    """
    core_name = get_core_pattern()
    return "%s/%s" % (tempdir, core_name)
def is_core_present(tempdir):
    """ Tell whether a regular file exists at get_core_path(tempdir).

    :param tempdir: directory handed to get_core_path()
    :returns: True if the path is an existing regular file, else False
    """
    candidate = get_core_path(tempdir)
    return os.path.isfile(candidate)
# get_core_pattern: read the kernel core pattern setting.
# Visible logic: /proc/sys/kernel/core_pattern is opened for reading and its
# content, stripped of surrounding whitespace, is stored in `corefmt`.
# NOTE(review): the statement handing `corefmt` back is not visible in this
# view; get_core_path() and check_core_path() use this function's result.
96 def get_core_pattern():
97 with open("/proc/sys/kernel/core_pattern", "r") as f:
98 corefmt = f.read().strip()
# check_core_path: warn when the kernel core pattern pipes core dumps through
# a filter.
# Visible logic: the pattern is fetched with get_core_pattern(); when it
# starts with "|" a warning about possibly truncated dumps, a hint to check
# or uninstall the filter, and the current pattern are assembled as messages.
# NOTE(review): the lines calling `logger` with these messages are not
# visible in this view, and `core_path` is not used in any visible line.
102 def check_core_path(logger, core_path):
103 corefmt = get_core_pattern()
104 if corefmt.startswith("|"):
106 "WARNING: redirecting the core dump through a"
107 " filter may result in truncated dumps.")
109 " You may want to check the filter settings"
110 " or uninstall it and edit the"
111 " /proc/sys/kernel/core_pattern accordingly.")
113 " current core pattern is: %s" % corefmt)
# NumericConstant: wraps a value and maps it to a description.
# Visible logic: the constructor takes `value`; elsewhere in the class the
# stored self._value is looked up in self.desc_dict and, when present, the
# matching entry is returned.
# NOTE(review): the constructor body, the definition of desc_dict, the header
# of the method doing the lookup and its fallback are not visible in this
# view.
116 class NumericConstant(object):
120 def __init__(self, value):
130 if self._value in self.desc_dict:
131 return self.desc_dict[self._value]
# Fragment of the Host class: accessors for the addresses of a test host.
# Visible returns: the MAC converted with mac_pton(), and the IPv4, IPv6 and
# IPv6 link-local addresses packed with socket.inet_pton() so they can be
# used as API parameters.  The docstrings marked "string" belong to the
# accessors giving the textual form.
# NOTE(review): the class header and the decorator/def lines of these
# accessors are not visible in this view.
136 """ Generic test host "connected" to VPPs interface. """
146 return mac_pton(self._mac)
150 """ IPv4 address - string """
155 """ IPv4 address of remote host - raw, suitable as API parameter."""
156 return socket.inet_pton(socket.AF_INET, self._ip4)
160 """ IPv6 address - string """
165 """ IPv6 address of remote host - raw, suitable as API parameter."""
166 return socket.inet_pton(socket.AF_INET6, self._ip6)
170 """ IPv6 link-local address - string """
175 """ IPv6 link-local address of remote host -
176 raw, suitable as API parameter."""
177 return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
# __eq__: two Host objects compare equal when mac, ip4, ip6 and ip6_ll all
# match.
# NOTE(review): the result for an `other` that is not a Host is not visible
# in this view.
179 def __eq__(self, other):
180 if isinstance(other, Host):
181 return (self.mac == other.mac and
182 self.ip4 == other.ip4 and
183 self.ip6 == other.ip6 and
184 self.ip6_ll == other.ip6_ll)
def __ne__(self, other):
    """ Inequality - the logical negation of __eq__(other). """
    is_equal = self.__eq__(other)
    return not is_equal
# Fragment: the textual representation "Host { mac:... ip4:... ip6:...
# ip6_ll:... }" and a hash computed from self.__repr__(), so hosts with the
# same representation get the same hash.
# NOTE(review): the def lines and the remaining format arguments are not
# visible in this view.
192 return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
198 return hash(self.__repr__())
# __init__: all four addresses are optional and default to None.
# Visible logic: `ip6_ll` is stored in self._ip6_ll.
# NOTE(review): the assignments for mac, ip4 and ip6 are not visible in this
# view - presumably self._mac, self._ip4 and self._ip6, which the accessors
# of this class read; confirm against the full file.
200 def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
204 self._ip6_ll = ip6_ll
# ForeignAddressFactory: produces host addresses from the '10.10.10.{}'
# template.
# Visible logic: `net` is the template formatted with 0, followed by '/' and
# prefix_len; an Exception "Network host address exhaustion" is raised in some
# condition; an address is produced by formatting the template with
# self.count.
# NOTE(review): the definitions of prefix_len and count, the method header
# and the exhaustion condition are not visible in this view.
207 class ForeignAddressFactory(object):
210 net_template = '10.10.10.{}'
211 net = net_template.format(0) + '/' + str(prefix_len)
215 raise Exception("Network host address exhaustion")
217 return self.net_template.format(self.count)
# Class docstring and constructor of the L4 connection helper.
# Visible logic: the testcase, address family and L4 protocol are stored;
# `ifs` and `ports` are two-element lists, with port1 and port2 stored at
# ports[0] and ports[1].
# NOTE(review): the class header and the lines filling `ifs` are not visible
# in this view - presumably if1 and if2 go to ifs[0] and ifs[1]; confirm.
221 """ L4 'connection' tied to two VPP interfaces """
223 def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
224 self.testcase = testcase
225 self.ifs = [None, None]
228 self.address_family = af
229 self.l4proto = l4proto
230 self.ports = [None, None]
231 self.ports[0] = port1
232 self.ports[1] = port2
# pkt: build the packet for the given `side` of the connection.
# Visible logic: is_ip6 is 1 when the address family is AF_INET6, else 0;
# both an IP and an IPv6 header are prepared from the remote addresses of
# the source and destination interfaces; 'sport'/'dport' default to
# self.ports[s0]/self.ports[s1] and can be overridden or extended through
# `l4args`; the frame starts with an Ether header from src_if.remote_mac to
# src_if.local_mac and contains self.l4proto built from the merged
# arguments.
# NOTE(review): `l4args={}` is a mutable default argument; the visible code
# only reads it (merged_l4args.update(l4args)).
# NOTE(review): the lines defining s0/s1, choosing the layer-3 header, adding
# the payload and returning the packet are not visible in this view.
235 def pkt(self, side, l4args={}, payload="x"):
236 is_ip6 = 1 if self.address_family == AF_INET6 else 0
239 src_if = self.ifs[s0]
240 dst_if = self.ifs[s1]
241 layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
242 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
243 merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
244 merged_l4args.update(l4args)
245 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
247 self.l4proto(**merged_l4args) /
# send: transmit one packet from `side`.
# Visible logic: when `flags` is given it is passed on as l4args['flags'];
# the packet built by self.pkt() is added as a stream on self.ifs[side],
# capture is enabled on the opposite interface self.ifs[1 - side], and
# self.testcase.pg_start() is called.
# NOTE(review): the initialisation of `l4args` is not visible in this view.
251 def send(self, side, flags=None, payload=""):
253 if flags is not None:
254 l4args['flags'] = flags
255 self.ifs[side].add_stream(self.pkt(side,
256 l4args=l4args, payload=payload))
257 self.ifs[1 - side].enable_capture()
258 self.testcase.pg_start()
# recv: wait for a packet on self.ifs[side] via wait_for_packet(1).
# NOTE(review): the meaning of the argument 1 (presumably a timeout) and the
# statement returning `p` are not visible in this view - confirm.
260 def recv(self, side):
261 p = self.ifs[side].wait_for_packet(1)
# send_through: send a packet from `side` and receive on the opposite side
# (1 - side).
# NOTE(review): the statement returning `p` is not visible in this view;
# send_pingpong() assigns this method's result.
264 def send_through(self, side, flags=None, payload=""):
265 self.send(side, flags, payload)
266 p = self.recv(1 - side)
# send_pingpong: send one packet from `side` with flags1, then one from the
# opposite side (1 - side) with flags2, each via send_through().
# NOTE(review): what is returned from p1/p2 is not visible in this view.
269 def send_pingpong(self, side, flags1=None, flags2=None):
270 p1 = self.send_through(side, flags1)
271 p2 = self.send_through(1 - side, flags2)
# Named constant for side 0 - presumably meant for the `side` arguments of
# the L4 connection methods above, which index two-element lists; confirm.
276 L4_CONN_SIDE_ZERO = 0
# fragment_rfc791: split an IPv4 packet into fragments no longer than
# `fragsize`.
# Visible logic: the packet is logged and rebuilt from its raw bytes;
# packets carrying IP options are rejected with Exception("Not implemented");
# the raw bytes are split into headers (everything up to the end of the IP
# header) and payload; `nfb` is the number of 8-byte blocks of payload that
# fit into one fragment; a first fragment is built from the headers plus
# nfb * 8 payload bytes, a second packet from the headers plus the rest,
# and that second packet is fragmented recursively.
# NOTE(review): the docstring delimiters, the handling of packets that
# already fit, the assignments of `ihl`, `fo` and `pkts`, the flag/checksum
# updates and the return statement are not visible in this view.
280 def fragment_rfc791(packet, fragsize, logger=null_logger):
282 Fragment an IPv4 packet per RFC 791
283 :param packet: packet to fragment
284 :param fragsize: size at which to fragment
285 :note: IP options are not supported
286 :returns: list of fragments
288 logger.debug(ppp("Fragmenting packet:", packet))
289 packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
290 if len(packet[IP].options) > 0:
291 raise Exception("Not implemented")
292 if len(packet) <= fragsize:
# number of bytes in front of the IP header
295 pre_ip_len = len(packet) - len(packet[IP])
# ihl counts 4-byte words
296 ip_header_len = packet[IP].ihl * 4
297 hex_packet = scapy.compat.raw(packet)
298 hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
299 hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
# original total length of the IP part
303 otl = len(packet[IP])
# number of 8-byte payload blocks fitting into the first fragment
304 nfb = int((fragsize - pre_ip_len - ihl * 4) / 8)
307 p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
310 p[IP].len = ihl * 4 + nfb * 8
# remainder: same headers, rest of the payload, offset advanced by nfb
314 p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
315 p[IP].len = otl - nfb * 8
316 p[IP].frag = fo + nfb
319 more_fragments = fragment_rfc791(p, fragsize, logger)
320 pkts.extend(more_fragments)
# fragment_rfc8200: split an IPv6 packet into fragments no longer than
# `fragsize`, each carrying an IPv6ExtHdrFragment with the given
# `identification`.
# Visible logic: the packet is rebuilt from its raw bytes; its layers are
# walked to locate the IPv6 header, routing and hop-by-hop extension
# headers and the first upper layer (an already present fragment header
# raises "Already fragmented", a missing upper layer raises as well); the
# headers up to the last per-fragment header are copied for every fragment
# and the rest is serialised and cut into pieces whose length is a multiple
# of 8 bytes; every fragment gets nh, id, offset and m set, and the
# more-fragments flag of the last one is cleared at the end.
# NOTE(review): `identification` is not described in the docstring.
# NOTE(review): the docstring delimiters, the early return for packets that
# already fit, the initialisation of several locals (counter, seen_ipv6,
# ipv6_nr, routing_hdr, upper_layer, pkts), the loop header, some branch
# keywords, the appends to `pkts` and the return statement are not visible
# in this view.
325 def fragment_rfc8200(packet, identification, fragsize, logger=null_logger):
327 Fragment an IPv6 packet per RFC 8200
328 :param packet: packet to fragment
329 :param fragsize: size at which to fragment
330 :note: IP options are not supported
331 :returns: list of fragments
333 packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
334 if len(packet) <= fragsize:
336 logger.debug(ppp("Fragmenting packet:", packet))
340 hop_by_hop_hdr = None
344 l = packet.getlayer(counter)
346 if l.__class__ is IPv6:
348 # ignore 2nd IPv6 header and everything below..
352 elif l.__class__ is IPv6ExtHdrFragment:
353 raise Exception("Already fragmented")
354 elif l.__class__ is IPv6ExtHdrRouting:
355 routing_hdr = counter
356 elif l.__class__ is IPv6ExtHdrHopByHop:
357 hop_by_hop_hdr = counter
358 elif seen_ipv6 and not upper_layer and \
359 not l.__class__.__name__.startswith('IPv6ExtHdr'):
360 upper_layer = counter
361 counter = counter + 1
362 l = packet.getlayer(counter)
365 "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
366 (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
368 if upper_layer is None:
369 raise Exception("Upper layer header not found in IPv6 packet")
# choose the last header that is repeated in every fragment; it starts as
# the IPv6 header and is replaced by the hop-by-hop or routing header
# (the branch keyword in front of the routing assignment is not visible)
371 last_per_fragment_hdr = ipv6_nr
372 if routing_hdr is None:
373 if hop_by_hop_hdr is not None:
374 last_per_fragment_hdr = hop_by_hop_hdr
376 last_per_fragment_hdr = routing_hdr
377 logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
# copy of the packet cut off after the last per-fragment header
379 per_fragment_headers = packet.copy()
380 per_fragment_headers[last_per_fragment_hdr].remove_payload()
381 logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
# everything following that header is the data to be split
383 ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
384 hex_payload = scapy.compat.raw(ext_and_upper_layer)
385 logger.debug("Payload length is %s" % len(hex_payload))
386 logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
388 fragment_ext_hdr = IPv6ExtHdrFragment()
389 logger.debug(ppp("Fragment header:", fragment_ext_hdr))
# payload length of the upper layer, falling back to its `data` attribute
# when the payload is empty
391 len_ext_and_upper_layer_payload = len(ext_and_upper_layer.payload)
392 if not len_ext_and_upper_layer_payload and \
393 hasattr(ext_and_upper_layer, "data"):
394 len_ext_and_upper_layer_payload = len(ext_and_upper_layer.data)
396 if len(per_fragment_headers) + len(fragment_ext_hdr) +\
397 len(ext_and_upper_layer) - len_ext_and_upper_layer_payload\
399 raise Exception("Cannot fragment this packet - MTU too small "
400 "(%s, %s, %s, %s, %s)" % (
401 len(per_fragment_headers), len(fragment_ext_hdr),
402 len(ext_and_upper_layer),
403 len_ext_and_upper_layer_payload, fragsize))
# next-header value of the original IPv6 header, copied into each
# fragment header below
405 orig_nh = packet[IPv6].nh
406 p = per_fragment_headers
409 p = p / fragment_ext_hdr
410 del p[IPv6ExtHdrFragment].nh
# number of 8-byte blocks of payload fitting into the first fragment
411 first_payload_len_nfb = int((fragsize - len(p)) / 8)
412 p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
414 p[IPv6ExtHdrFragment].nh = orig_nh
415 p[IPv6ExtHdrFragment].id = identification
416 p[IPv6ExtHdrFragment].offset = 0
417 p[IPv6ExtHdrFragment].m = 1
418 p = p.__class__(scapy.compat.raw(p))
419 logger.debug(ppp("Fragment %s:" % len(pkts), p))
421 offset = first_payload_len_nfb * 8
422 logger.debug("Offset after first fragment: %s" % offset)
# remaining fragments: same construction, offset stored in 8-byte units
423 while len(hex_payload) > offset:
424 p = per_fragment_headers
427 p = p / fragment_ext_hdr
428 del p[IPv6ExtHdrFragment].nh
429 l_nfb = int((fragsize - len(p)) / 8)
430 p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
431 p[IPv6ExtHdrFragment].nh = orig_nh
432 p[IPv6ExtHdrFragment].id = identification
433 p[IPv6ExtHdrFragment].offset = int(offset / 8)
434 p[IPv6ExtHdrFragment].m = 1
435 p = p.__class__(scapy.compat.raw(p))
436 logger.debug(ppp("Fragment %s:" % len(pkts), p))
438 offset = offset + l_nfb * 8
440 pkts[-1][IPv6ExtHdrFragment].m = 0 # reset more-flags in last fragment
# reassemble4_core: rebuild one packet from a list of IPv4 fragments.
# Visible logic: the IP payload of every fragment is written into `buffer`
# at byte position frag * 8; the `len` of the first fragment is set to the
# buffer length plus 20; the result is rebuilt either from the first 20
# bytes of the first fragment's IP layer, or from the first 34 bytes of its
# Ether layer, followed by the buffer content.
# NOTE(review): the creation of `buffer` (presumably a BytesIO), the
# condition selecting between the two results (presumably `return_ip`) and
# other lines are not visible in this view.  The constants 20 and 34
# presumably assume a 20-byte IP header without options and a 14-byte
# Ethernet header - confirm.
445 def reassemble4_core(listoffragments, return_ip):
447 first = listoffragments[0]
449 for pkt in listoffragments:
450 buffer.seek(pkt[IP].frag*8)
451 buffer.write(bytes(pkt[IP].payload))
452 first.len = len(buffer.getvalue()) + 20
456 header = bytes(first[IP])[:20]
457 return first[IP].__class__(header + buffer.getvalue())
459 header = bytes(first[Ether])[:34]
460 return first[Ether].__class__(header + buffer.getvalue())
def reassemble4_ether(listoffragments):
    """ Reassemble fragments via reassemble4_core() with return_ip=False.

    :param listoffragments: list of fragments to reassemble
    :returns: whatever reassemble4_core() returns for return_ip=False
    """
    reassembled = reassemble4_core(listoffragments, return_ip=False)
    return reassembled
def reassemble4(listoffragments):
    """ Reassemble fragments via reassemble4_core() with return_ip=True.

    :param listoffragments: list of fragments to reassemble
    :returns: whatever reassemble4_core() returns for return_ip=True
    """
    reassembled = reassemble4_core(listoffragments, return_ip=True)
    return reassembled