1 """ test framework utilities """
7 from socket import AF_INET6
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet import IP
14 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
16 from scapy.packet import Raw
17 from scapy.utils import hexdump
18 from scapy.utils6 import in6_mactoifaceid
20 from io import BytesIO
21 from vpp_papi import mac_pton
23 # Set up an empty logger for the testcase that can be overridden as necessary
24 null_logger = logging.getLogger('VppTestCase.util')
25 null_logger.addHandler(logging.NullHandler())
def ppp(headline, packet):
    """ Build a printable dump of a single packet.

    :param headline: text placed on the first line of the result
    :param packet: packet object; it is passed to hexdump() and its
        show() method is called, both with dump=True
    :returns: string made of the headline, the hexdump output, an empty
        line and the show() output
    """
    # Keep the original call order: hexdump first, then show().
    hex_view = hexdump(packet, dump=True)
    layer_view = packet.show(dump=True)
    return '%s\n%s\n\n%s\n' % (headline, hex_view, layer_view)
35 def ppc(headline, capture, limit=10):
36 """ Return string containing ppp() printout for a capture.
38 :param headline: printed as first line of output
39 :param capture: packets to print
40 :param limit: limit the print to # of packets
45 if limit < len(capture):
46 tail = "\nPrint limit reached, %s out of %s packets printed" % (
48 body = "".join([ppp("Packet #%s:" % count, p)
49 for count, p in zip(range(0, limit), capture)])
50 return "%s\n%s%s" % (headline, body, tail)
def ip4_range(ip4, s, e):
    """ Produce addresses that differ from ip4 only after the last dot.

    :param ip4: dotted address string; everything before its last '.'
        is reused as the prefix
    :param s: first value appended after the prefix (inclusive)
    :param e: end of the appended values (exclusive, as in range())
    :returns: generator of "<prefix>.<n>" strings for n in range(s, e)
    """
    prefix = ip4.rsplit('.', 1)[0]
    last_parts = range(s, e)
    return ("%s.%d" % (prefix, part) for part in last_parts)
58 def mcast_ip_to_mac(ip):
59 ip = ipaddress.ip_address(ip)
60 if not ip.is_multicast:
61 raise ValueError("Must be multicast address.")
64 mcast_mac = "01:00:5e:%02x:%02x:%02x" % ((ip_as_int >> 16) & 0x7f,
65 (ip_as_int >> 8) & 0xff,
68 mcast_mac = "33:33:%02x:%02x:%02x:%02x" % ((ip_as_int >> 24) & 0xff,
69 (ip_as_int >> 16) & 0xff,
70 (ip_as_int >> 8) & 0xff,
75 # wrapper around scapy library function.
77 euid = in6_mactoifaceid(str(mac))
78 addr = "fe80::" + euid
def ip6_normalize(ip6):
    """ Round-trip an IPv6 address through its packed binary form.

    :param ip6: IPv6 address - string
    :returns: the address text as rendered by socket.inet_ntop(), so
        different spellings of the same address give the same string
    """
    packed = socket.inet_pton(socket.AF_INET6, ip6)
    return socket.inet_ntop(socket.AF_INET6, packed)
def get_core_path(tempdir):
    """ Compose the path under which a core file is looked for.

    :param tempdir: directory used as the first path component
    :returns: "<tempdir>/<pattern>", where <pattern> is whatever
        get_core_pattern() returns
    """
    pattern = get_core_pattern()
    return "%s/%s" % (tempdir, pattern)
def is_core_present(tempdir):
    """ Tell whether the core file path for tempdir exists.

    :param tempdir: directory handed to get_core_path()
    :returns: True if the path from get_core_path() is an existing
        regular file, False otherwise
    """
    core_file = get_core_path(tempdir)
    return os.path.isfile(core_file)
95 def get_core_pattern():
96 with open("/proc/sys/kernel/core_pattern", "r") as f:
97 corefmt = f.read().strip()
101 def check_core_path(logger, core_path):
102 corefmt = get_core_pattern()
103 if corefmt.startswith("|"):
105 "WARNING: redirecting the core dump through a"
106 " filter may result in truncated dumps.")
108 " You may want to check the filter settings"
109 " or uninstall it and edit the"
110 " /proc/sys/kernel/core_pattern accordingly.")
112 " current core pattern is: %s" % corefmt)
115 class NumericConstant:
119 def __init__(self, value):
129 if self._value in self.desc_dict:
130 return self.desc_dict[self._value]
135 """ Generic test host "connected" to VPPs interface. """
145 return mac_pton(self._mac)
149 """ IPv4 address - string """
154 """ IPv4 address of remote host - raw, suitable as API parameter."""
155 return socket.inet_pton(socket.AF_INET, self._ip4)
159 """ IPv6 address - string """
164 """ IPv6 address of remote host - raw, suitable as API parameter."""
165 return socket.inet_pton(socket.AF_INET6, self._ip6)
169 """ IPv6 link-local address - string """
174 """ IPv6 link-local address of remote host -
175 raw, suitable as API parameter."""
176 return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
178 def __eq__(self, other):
179 if isinstance(other, Host):
180 return (self.mac == other.mac and
181 self.ip4 == other.ip4 and
182 self.ip6 == other.ip6 and
183 self.ip6_ll == other.ip6_ll)
def __ne__(self, other):
    """ Inequality is defined as the negation of __eq__(). """
    is_equal = self.__eq__(other)
    return not is_equal
191 return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
197 return hash(self.__repr__())
199 def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
203 self._ip6_ll = ip6_ll
207 """ L4 'connection' tied to two VPP interfaces """
209 def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
210 self.testcase = testcase
211 self.ifs = [None, None]
214 self.address_family = af
215 self.l4proto = l4proto
216 self.ports = [None, None]
217 self.ports[0] = port1
218 self.ports[1] = port2
221 def pkt(self, side, l4args={}, payload="x"):
222 is_ip6 = 1 if self.address_family == AF_INET6 else 0
225 src_if = self.ifs[s0]
226 dst_if = self.ifs[s1]
227 layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
228 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
229 merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
230 merged_l4args.update(l4args)
231 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
233 self.l4proto(**merged_l4args) /
237 def send(self, side, flags=None, payload=""):
239 if flags is not None:
240 l4args['flags'] = flags
241 self.ifs[side].add_stream(self.pkt(side,
242 l4args=l4args, payload=payload))
243 self.ifs[1 - side].enable_capture()
244 self.testcase.pg_start()
246 def recv(self, side):
247 p = self.ifs[side].wait_for_packet(1)
250 def send_through(self, side, flags=None, payload=""):
251 self.send(side, flags, payload)
252 p = self.recv(1 - side)
255 def send_pingpong(self, side, flags1=None, flags2=None):
256 p1 = self.send_through(side, flags1)
257 p2 = self.send_through(1 - side, flags2)
262 L4_CONN_SIDE_ZERO = 0
266 def fragment_rfc791(packet, fragsize, logger=null_logger):
268 Fragment an IPv4 packet per RFC 791
269 :param packet: packet to fragment
270 :param fragsize: size at which to fragment
271 :note: IP options are not supported
272 :returns: list of fragments
274 logger.debug(ppp("Fragmenting packet:", packet))
275 packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
276 if len(packet[IP].options) > 0:
277 raise Exception("Not implemented")
278 if len(packet) <= fragsize:
281 pre_ip_len = len(packet) - len(packet[IP])
282 ip_header_len = packet[IP].ihl * 4
283 hex_packet = scapy.compat.raw(packet)
284 hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
285 hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
289 otl = len(packet[IP])
290 nfb = int((fragsize - pre_ip_len - ihl * 4) / 8)
293 p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
296 p[IP].len = ihl * 4 + nfb * 8
300 p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
301 p[IP].len = otl - nfb * 8
302 p[IP].frag = fo + nfb
305 more_fragments = fragment_rfc791(p, fragsize, logger)
306 pkts.extend(more_fragments)
311 def fragment_rfc8200(packet, identification, fragsize, logger=null_logger):
313 Fragment an IPv6 packet per RFC 8200
314 :param packet: packet to fragment
315 :param fragsize: size at which to fragment
316 :note: IP options are not supported
317 :returns: list of fragments
319 packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
320 if len(packet) <= fragsize:
322 logger.debug(ppp("Fragmenting packet:", packet))
326 hop_by_hop_hdr = None
330 l = packet.getlayer(counter)
332 if l.__class__ is IPv6:
334 # ignore 2nd IPv6 header and everything below..
338 elif l.__class__ is IPv6ExtHdrFragment:
339 raise Exception("Already fragmented")
340 elif l.__class__ is IPv6ExtHdrRouting:
341 routing_hdr = counter
342 elif l.__class__ is IPv6ExtHdrHopByHop:
343 hop_by_hop_hdr = counter
344 elif seen_ipv6 and not upper_layer and \
345 not l.__class__.__name__.startswith('IPv6ExtHdr'):
346 upper_layer = counter
347 counter = counter + 1
348 l = packet.getlayer(counter)
351 "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
352 (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
354 if upper_layer is None:
355 raise Exception("Upper layer header not found in IPv6 packet")
357 last_per_fragment_hdr = ipv6_nr
358 if routing_hdr is None:
359 if hop_by_hop_hdr is not None:
360 last_per_fragment_hdr = hop_by_hop_hdr
362 last_per_fragment_hdr = routing_hdr
363 logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
365 per_fragment_headers = packet.copy()
366 per_fragment_headers[last_per_fragment_hdr].remove_payload()
367 logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
369 ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
370 hex_payload = scapy.compat.raw(ext_and_upper_layer)
371 logger.debug("Payload length is %s" % len(hex_payload))
372 logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
374 fragment_ext_hdr = IPv6ExtHdrFragment()
375 logger.debug(ppp("Fragment header:", fragment_ext_hdr))
377 len_ext_and_upper_layer_payload = len(ext_and_upper_layer.payload)
378 if not len_ext_and_upper_layer_payload and \
379 hasattr(ext_and_upper_layer, "data"):
380 len_ext_and_upper_layer_payload = len(ext_and_upper_layer.data)
382 if len(per_fragment_headers) + len(fragment_ext_hdr) +\
383 len(ext_and_upper_layer) - len_ext_and_upper_layer_payload\
385 raise Exception("Cannot fragment this packet - MTU too small "
386 "(%s, %s, %s, %s, %s)" % (
387 len(per_fragment_headers), len(fragment_ext_hdr),
388 len(ext_and_upper_layer),
389 len_ext_and_upper_layer_payload, fragsize))
391 orig_nh = packet[IPv6].nh
392 p = per_fragment_headers
395 p = p / fragment_ext_hdr
396 del p[IPv6ExtHdrFragment].nh
397 first_payload_len_nfb = int((fragsize - len(p)) / 8)
398 p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
400 p[IPv6ExtHdrFragment].nh = orig_nh
401 p[IPv6ExtHdrFragment].id = identification
402 p[IPv6ExtHdrFragment].offset = 0
403 p[IPv6ExtHdrFragment].m = 1
404 p = p.__class__(scapy.compat.raw(p))
405 logger.debug(ppp("Fragment %s:" % len(pkts), p))
407 offset = first_payload_len_nfb * 8
408 logger.debug("Offset after first fragment: %s" % offset)
409 while len(hex_payload) > offset:
410 p = per_fragment_headers
413 p = p / fragment_ext_hdr
414 del p[IPv6ExtHdrFragment].nh
415 l_nfb = int((fragsize - len(p)) / 8)
416 p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
417 p[IPv6ExtHdrFragment].nh = orig_nh
418 p[IPv6ExtHdrFragment].id = identification
419 p[IPv6ExtHdrFragment].offset = int(offset / 8)
420 p[IPv6ExtHdrFragment].m = 1
421 p = p.__class__(scapy.compat.raw(p))
422 logger.debug(ppp("Fragment %s:" % len(pkts), p))
424 offset = offset + l_nfb * 8
426 pkts[-1][IPv6ExtHdrFragment].m = 0 # reset more-flags in last fragment
431 def reassemble4_core(listoffragments, return_ip):
433 first = listoffragments[0]
435 for pkt in listoffragments:
436 buffer.seek(pkt[IP].frag*8)
437 buffer.write(bytes(pkt[IP].payload))
438 first.len = len(buffer.getvalue()) + 20
442 header = bytes(first[IP])[:20]
443 return first[IP].__class__(header + buffer.getvalue())
445 header = bytes(first[Ether])[:34]
446 return first[Ether].__class__(header + buffer.getvalue())
def reassemble4_ether(listoffragments):
    """ Reassemble IPv4 fragments via reassemble4_core(return_ip=False).

    :param listoffragments: fragments handed unchanged to
        reassemble4_core()
    :returns: result of reassemble4_core(); presumably the packet
        rebuilt from the Ether layer - TODO confirm against
        reassemble4_core()
    """
    return reassemble4_core(listoffragments, return_ip=False)
def reassemble4(listoffragments):
    """ Reassemble IPv4 fragments via reassemble4_core(return_ip=True).

    :param listoffragments: fragments handed unchanged to
        reassemble4_core()
    :returns: result of reassemble4_core(); presumably the packet
        rebuilt from the IP layer - TODO confirm against
        reassemble4_core()
    """
    return reassemble4_core(listoffragments, return_ip=True)