1 """ test framework utilities """
6 from socket import AF_INET6
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet import IP
14 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
16 from scapy.packet import Raw
17 from scapy.utils import hexdump
18 from scapy.utils6 import in6_mactoifaceid
20 from io import BytesIO
21 from vpp_papi import mac_pton
def ppp(headline, packet):
    """ Render a packet as text for logging.

    :param headline: printed as first line of output
    :param packet: packet to print; must provide show(dump=True)
    :returns: string made of the headline, the hexdump of the packet and
              the output of packet.show()
    """
    # Build both views up front (hexdump first, then show) so the final
    # formatting step is a plain substitution.
    hex_view = hexdump(packet, dump=True)
    layers_view = packet.show(dump=True)
    return '%s\n%s\n\n%s\n' % (headline, hex_view, layers_view)
31 def ppc(headline, capture, limit=10):
32 """ Return string containing ppp() printout for a capture.
34 :param headline: printed as first line of output
35 :param capture: packets to print
36 :param limit: limit the print to # of packets
41 if limit < len(capture):
42 tail = "\nPrint limit reached, %s out of %s packets printed" % (
44 body = "".join([ppp("Packet #%s:" % count, p)
45 for count, p in zip(range(0, limit), capture)])
46 return "%s\n%s%s" % (headline, body, tail)
def ip4_range(ip4, s, e):
    """ Generate IPv4 address strings that differ only in the last octet.

    :param ip4: IPv4 address - string; everything before its last '.' is
                reused as the common prefix
    :param s: first value of the last octet (inclusive)
    :param e: end value of the last octet (exclusive)
    :returns: generator of address strings "<prefix>.<n>" for n in [s, e)
    """
    # Drop whatever follows the right-most dot; the rest is the prefix.
    prefix = ip4.rsplit('.', 1)[0]
    last_octets = range(s, e)
    return ("%s.%d" % (prefix, octet) for octet in last_octets)
54 def mcast_ip_to_mac(ip):
55 ip = ipaddress.ip_address(ip)
56 if not ip.is_multicast:
57 raise ValueError("Must be multicast address.")
60 mcast_mac = "01:00:5e:%02x:%02x:%02x" % ((ip_as_int >> 16) & 0x7f,
61 (ip_as_int >> 8) & 0xff,
64 mcast_mac = "33:33:%02x:%02x:%02x:%02x" % ((ip_as_int >> 24) & 0xff,
65 (ip_as_int >> 16) & 0xff,
66 (ip_as_int >> 8) & 0xff,
71 # wrapper around scapy library function.
73 euid = in6_mactoifaceid(str(mac))
74 addr = "fe80::" + euid
def ip6_normalize(ip6):
    """ Normalize the textual form of an IPv6 address.

    :param ip6: IPv6 address - string
    :returns: the address converted to its packed binary form and back to
              text, as produced by socket.inet_ntop()
    """
    packed = socket.inet_pton(socket.AF_INET6, ip6)
    return socket.inet_ntop(socket.AF_INET6, packed)
def get_core_path(tempdir):
    """ Build the path of the core file for a given directory.

    :param tempdir: directory used as the first path component
    :returns: string "<tempdir>/<core pattern>", where the core pattern is
              whatever get_core_pattern() returns
    """
    core_pattern = get_core_pattern()
    return "%s/%s" % (tempdir, core_pattern)
def is_core_present(tempdir):
    """ Tell whether a core file exists for the given directory.

    :param tempdir: directory passed on to get_core_path()
    :returns: True if the path returned by get_core_path() is an existing
              regular file, False otherwise
    """
    core_path = get_core_path(tempdir)
    return os.path.isfile(core_path)
91 def get_core_pattern():
92 with open("/proc/sys/kernel/core_pattern", "r") as f:
93 corefmt = f.read().strip()
97 def check_core_path(logger, core_path):
98 corefmt = get_core_pattern()
99 if corefmt.startswith("|"):
101 "WARNING: redirecting the core dump through a"
102 " filter may result in truncated dumps.")
104 " You may want to check the filter settings"
105 " or uninstall it and edit the"
106 " /proc/sys/kernel/core_pattern accordingly.")
108 " current core pattern is: %s" % corefmt)
111 class NumericConstant(object):
115 def __init__(self, value):
125 if self._value in self.desc_dict:
126 return self.desc_dict[self._value]
131 """ Generic test host "connected" to VPPs interface. """
141 return mac_pton(self._mac)
145 """ IPv4 address - string """
150 """ IPv4 address of remote host - raw, suitable as API parameter."""
151 return socket.inet_pton(socket.AF_INET, self._ip4)
155 """ IPv6 address - string """
160 """ IPv6 address of remote host - raw, suitable as API parameter."""
161 return socket.inet_pton(socket.AF_INET6, self._ip6)
165 """ IPv6 link-local address - string """
170 """ IPv6 link-local address of remote host -
171 raw, suitable as API parameter."""
172 return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
174 def __eq__(self, other):
175 if isinstance(other, Host):
176 return (self.mac == other.mac and
177 self.ip4 == other.ip4 and
178 self.ip6 == other.ip6 and
179 self.ip6_ll == other.ip6_ll)
183 def __ne__(self, other):
184 return not self.__eq__(other)
187 return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
193 return hash(self.__repr__())
195 def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
199 self._ip6_ll = ip6_ll
202 class ForeignAddressFactory(object):
205 net_template = '10.10.10.{}'
206 net = net_template.format(0) + '/' + str(prefix_len)
210 raise Exception("Network host address exhaustion")
212 return self.net_template.format(self.count)
216 """ L4 'connection' tied to two VPP interfaces """
218 def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
219 self.testcase = testcase
220 self.ifs = [None, None]
223 self.address_family = af
224 self.l4proto = l4proto
225 self.ports = [None, None]
226 self.ports[0] = port1
227 self.ports[1] = port2
230 def pkt(self, side, l4args={}, payload="x"):
231 is_ip6 = 1 if self.address_family == AF_INET6 else 0
234 src_if = self.ifs[s0]
235 dst_if = self.ifs[s1]
236 layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
237 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
238 merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
239 merged_l4args.update(l4args)
240 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
242 self.l4proto(**merged_l4args) /
246 def send(self, side, flags=None, payload=""):
248 if flags is not None:
249 l4args['flags'] = flags
250 self.ifs[side].add_stream(self.pkt(side,
251 l4args=l4args, payload=payload))
252 self.ifs[1 - side].enable_capture()
253 self.testcase.pg_start()
255 def recv(self, side):
256 p = self.ifs[side].wait_for_packet(1)
259 def send_through(self, side, flags=None, payload=""):
260 self.send(side, flags, payload)
261 p = self.recv(1 - side)
264 def send_pingpong(self, side, flags1=None, flags2=None):
265 p1 = self.send_through(side, flags1)
266 p2 = self.send_through(1 - side, flags2)
271 L4_CONN_SIDE_ZERO = 0
275 class LoggerWrapper(object):
276 def __init__(self, logger=None):
277 self._logger = logger
279 def debug(self, *args, **kwargs):
281 self._logger.debug(*args, **kwargs)
283 def error(self, *args, **kwargs):
285 self._logger.error(*args, **kwargs)
288 def fragment_rfc791(packet, fragsize, _logger=None):
290 Fragment an IPv4 packet per RFC 791
291 :param packet: packet to fragment
292 :param fragsize: size at which to fragment
293 :note: IP options are not supported
294 :returns: list of fragments
296 logger = LoggerWrapper(_logger)
297 logger.debug(ppp("Fragmenting packet:", packet))
298 packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
299 if len(packet[IP].options) > 0:
300 raise Exception("Not implemented")
301 if len(packet) <= fragsize:
304 pre_ip_len = len(packet) - len(packet[IP])
305 ip_header_len = packet[IP].ihl * 4
306 hex_packet = scapy.compat.raw(packet)
307 hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
308 hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
312 otl = len(packet[IP])
313 nfb = int((fragsize - pre_ip_len - ihl * 4) / 8)
316 p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
319 p[IP].len = ihl * 4 + nfb * 8
323 p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
324 p[IP].len = otl - nfb * 8
325 p[IP].frag = fo + nfb
328 more_fragments = fragment_rfc791(p, fragsize, _logger)
329 pkts.extend(more_fragments)
334 def fragment_rfc8200(packet, identification, fragsize, _logger=None):
336 Fragment an IPv6 packet per RFC 8200
337 :param packet: packet to fragment
338 :param fragsize: size at which to fragment
339 :note: IP options are not supported
340 :returns: list of fragments
342 logger = LoggerWrapper(_logger)
343 packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
344 if len(packet) <= fragsize:
346 logger.debug(ppp("Fragmenting packet:", packet))
350 hop_by_hop_hdr = None
354 l = packet.getlayer(counter)
356 if l.__class__ is IPv6:
358 # ignore 2nd IPv6 header and everything below..
362 elif l.__class__ is IPv6ExtHdrFragment:
363 raise Exception("Already fragmented")
364 elif l.__class__ is IPv6ExtHdrRouting:
365 routing_hdr = counter
366 elif l.__class__ is IPv6ExtHdrHopByHop:
367 hop_by_hop_hdr = counter
368 elif seen_ipv6 and not upper_layer and \
369 not l.__class__.__name__.startswith('IPv6ExtHdr'):
370 upper_layer = counter
371 counter = counter + 1
372 l = packet.getlayer(counter)
375 "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
376 (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
378 if upper_layer is None:
379 raise Exception("Upper layer header not found in IPv6 packet")
381 last_per_fragment_hdr = ipv6_nr
382 if routing_hdr is None:
383 if hop_by_hop_hdr is not None:
384 last_per_fragment_hdr = hop_by_hop_hdr
386 last_per_fragment_hdr = routing_hdr
387 logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
389 per_fragment_headers = packet.copy()
390 per_fragment_headers[last_per_fragment_hdr].remove_payload()
391 logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
393 ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
394 hex_payload = scapy.compat.raw(ext_and_upper_layer)
395 logger.debug("Payload length is %s" % len(hex_payload))
396 logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
398 fragment_ext_hdr = IPv6ExtHdrFragment()
399 logger.debug(ppp("Fragment header:", fragment_ext_hdr))
401 len_ext_and_upper_layer_payload = len(ext_and_upper_layer.payload)
402 if not len_ext_and_upper_layer_payload and \
403 hasattr(ext_and_upper_layer, "data"):
404 len_ext_and_upper_layer_payload = len(ext_and_upper_layer.data)
406 if len(per_fragment_headers) + len(fragment_ext_hdr) +\
407 len(ext_and_upper_layer) - len_ext_and_upper_layer_payload\
409 raise Exception("Cannot fragment this packet - MTU too small "
410 "(%s, %s, %s, %s, %s)" % (
411 len(per_fragment_headers), len(fragment_ext_hdr),
412 len(ext_and_upper_layer),
413 len_ext_and_upper_layer_payload, fragsize))
415 orig_nh = packet[IPv6].nh
416 p = per_fragment_headers
419 p = p / fragment_ext_hdr
420 del p[IPv6ExtHdrFragment].nh
421 first_payload_len_nfb = int((fragsize - len(p)) / 8)
422 p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
424 p[IPv6ExtHdrFragment].nh = orig_nh
425 p[IPv6ExtHdrFragment].id = identification
426 p[IPv6ExtHdrFragment].offset = 0
427 p[IPv6ExtHdrFragment].m = 1
428 p = p.__class__(scapy.compat.raw(p))
429 logger.debug(ppp("Fragment %s:" % len(pkts), p))
431 offset = first_payload_len_nfb * 8
432 logger.debug("Offset after first fragment: %s" % offset)
433 while len(hex_payload) > offset:
434 p = per_fragment_headers
437 p = p / fragment_ext_hdr
438 del p[IPv6ExtHdrFragment].nh
439 l_nfb = int((fragsize - len(p)) / 8)
440 p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
441 p[IPv6ExtHdrFragment].nh = orig_nh
442 p[IPv6ExtHdrFragment].id = identification
443 p[IPv6ExtHdrFragment].offset = int(offset / 8)
444 p[IPv6ExtHdrFragment].m = 1
445 p = p.__class__(scapy.compat.raw(p))
446 logger.debug(ppp("Fragment %s:" % len(pkts), p))
448 offset = offset + l_nfb * 8
450 pkts[-1][IPv6ExtHdrFragment].m = 0 # reset more-flags in last fragment
455 def reassemble4_core(listoffragments, return_ip):
457 first = listoffragments[0]
459 for pkt in listoffragments:
460 buffer.seek(pkt[IP].frag*8)
461 buffer.write(bytes(pkt[IP].payload))
462 first.len = len(buffer.getvalue()) + 20
466 header = bytes(first[IP])[:20]
467 return first[IP].__class__(header + buffer.getvalue())
469 header = bytes(first[Ether])[:34]
470 return first[Ether].__class__(header + buffer.getvalue())
def reassemble4_ether(listoffragments):
    """ Reassemble IPv4 fragments via reassemble4_core(), return_ip=False.

    :param listoffragments: fragments handed over to reassemble4_core()
    :returns: whatever reassemble4_core() returns for return_ip=False
    """
    reassembled = reassemble4_core(listoffragments, return_ip=False)
    return reassembled
def reassemble4(listoffragments):
    """ Reassemble IPv4 fragments via reassemble4_core(), return_ip=True.

    :param listoffragments: fragments handed over to reassemble4_core()
    :returns: whatever reassemble4_core() returns for return_ip=True
    """
    reassembled = reassemble4_core(listoffragments, return_ip=True)
    return reassembled