1 """ test framework utilities """
7 from socket import AF_INET6
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet import IP
14 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
16 from scapy.packet import Raw
17 from scapy.utils import hexdump
18 from scapy.utils6 import in6_mactoifaceid
20 from io import BytesIO
21 from vpp_papi import mac_pton
23 # Set up an empty logger for the testcase that can be overridden as necessary
24 null_logger = logging.getLogger('VppTestCase.util')
25 null_logger.addHandler(logging.NullHandler())
28 def ppp(headline, packet):
29 """ Return string containing the output of scapy packet.show() call. """
30 return '%s\n%s\n\n%s\n' % (headline,
31 hexdump(packet, dump=True),
32 packet.show(dump=True))
35 def ppc(headline, capture, limit=10):
36 """ Return string containing ppp() printout for a capture.
38 :param headline: printed as first line of output
39 :param capture: packets to print
40 :param limit: limit the print to # of packets
45 if limit < len(capture):
46 tail = "\nPrint limit reached, %s out of %s packets printed" % (
48 body = "".join([ppp("Packet #%s:" % count, p)
49 for count, p in zip(range(0, limit), capture)])
50 return "%s\n%s%s" % (headline, body, tail)
53 def ip4_range(ip4, s, e):
54 tmp = ip4.rsplit('.', 1)[0]
55 return ("%s.%d" % (tmp, i) for i in range(s, e))
58 def mcast_ip_to_mac(ip):
59 ip = ipaddress.ip_address(ip)
60 if not ip.is_multicast:
61 raise ValueError("Must be multicast address.")
64 mcast_mac = "01:00:5e:%02x:%02x:%02x" % ((ip_as_int >> 16) & 0x7f,
65 (ip_as_int >> 8) & 0xff,
68 mcast_mac = "33:33:%02x:%02x:%02x:%02x" % ((ip_as_int >> 24) & 0xff,
69 (ip_as_int >> 16) & 0xff,
70 (ip_as_int >> 8) & 0xff,
75 # wrapper around scapy library function.
77 euid = in6_mactoifaceid(str(mac))
78 addr = "fe80::" + euid
82 def ip6_normalize(ip6):
83 return socket.inet_ntop(socket.AF_INET6,
84 socket.inet_pton(socket.AF_INET6, ip6))
87 def get_core_path(tempdir):
88 return "%s/%s" % (tempdir, get_core_pattern())
91 def is_core_present(tempdir):
92 return os.path.isfile(get_core_path(tempdir))
95 def get_core_pattern():
96 with open("/proc/sys/kernel/core_pattern", "r") as f:
97 corefmt = f.read().strip()
101 def check_core_path(logger, core_path):
102 corefmt = get_core_pattern()
103 if corefmt.startswith("|"):
105 "WARNING: redirecting the core dump through a"
106 " filter may result in truncated dumps.")
108 " You may want to check the filter settings"
109 " or uninstall it and edit the"
110 " /proc/sys/kernel/core_pattern accordingly.")
112 " current core pattern is: %s" % corefmt)
115 class NumericConstant(object):
119 def __init__(self, value):
129 if self._value in self.desc_dict:
130 return self.desc_dict[self._value]
135 """ Generic test host "connected" to VPPs interface. """
145 return mac_pton(self._mac)
149 """ IPv4 address - string """
154 """ IPv4 address of remote host - raw, suitable as API parameter."""
155 return socket.inet_pton(socket.AF_INET, self._ip4)
159 """ IPv6 address - string """
164 """ IPv6 address of remote host - raw, suitable as API parameter."""
165 return socket.inet_pton(socket.AF_INET6, self._ip6)
169 """ IPv6 link-local address - string """
174 """ IPv6 link-local address of remote host -
175 raw, suitable as API parameter."""
176 return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
178 def __eq__(self, other):
179 if isinstance(other, Host):
180 return (self.mac == other.mac and
181 self.ip4 == other.ip4 and
182 self.ip6 == other.ip6 and
183 self.ip6_ll == other.ip6_ll)
187 def __ne__(self, other):
188 return not self.__eq__(other)
191 return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
197 return hash(self.__repr__())
199 def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
203 self._ip6_ll = ip6_ll
206 class ForeignAddressFactory(object):
209 net_template = '10.10.10.{}'
210 net = net_template.format(0) + '/' + str(prefix_len)
214 raise Exception("Network host address exhaustion")
216 return self.net_template.format(self.count)
220 """ L4 'connection' tied to two VPP interfaces """
222 def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
223 self.testcase = testcase
224 self.ifs = [None, None]
227 self.address_family = af
228 self.l4proto = l4proto
229 self.ports = [None, None]
230 self.ports[0] = port1
231 self.ports[1] = port2
234 def pkt(self, side, l4args={}, payload="x"):
235 is_ip6 = 1 if self.address_family == AF_INET6 else 0
238 src_if = self.ifs[s0]
239 dst_if = self.ifs[s1]
240 layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
241 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
242 merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
243 merged_l4args.update(l4args)
244 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
246 self.l4proto(**merged_l4args) /
250 def send(self, side, flags=None, payload=""):
252 if flags is not None:
253 l4args['flags'] = flags
254 self.ifs[side].add_stream(self.pkt(side,
255 l4args=l4args, payload=payload))
256 self.ifs[1 - side].enable_capture()
257 self.testcase.pg_start()
259 def recv(self, side):
260 p = self.ifs[side].wait_for_packet(1)
263 def send_through(self, side, flags=None, payload=""):
264 self.send(side, flags, payload)
265 p = self.recv(1 - side)
268 def send_pingpong(self, side, flags1=None, flags2=None):
269 p1 = self.send_through(side, flags1)
270 p2 = self.send_through(1 - side, flags2)
275 L4_CONN_SIDE_ZERO = 0
279 def fragment_rfc791(packet, fragsize, logger=null_logger):
281 Fragment an IPv4 packet per RFC 791
282 :param packet: packet to fragment
283 :param fragsize: size at which to fragment
284 :note: IP options are not supported
285 :returns: list of fragments
287 logger.debug(ppp("Fragmenting packet:", packet))
288 packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
289 if len(packet[IP].options) > 0:
290 raise Exception("Not implemented")
291 if len(packet) <= fragsize:
294 pre_ip_len = len(packet) - len(packet[IP])
295 ip_header_len = packet[IP].ihl * 4
296 hex_packet = scapy.compat.raw(packet)
297 hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
298 hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
302 otl = len(packet[IP])
303 nfb = int((fragsize - pre_ip_len - ihl * 4) / 8)
306 p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
309 p[IP].len = ihl * 4 + nfb * 8
313 p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
314 p[IP].len = otl - nfb * 8
315 p[IP].frag = fo + nfb
318 more_fragments = fragment_rfc791(p, fragsize, logger)
319 pkts.extend(more_fragments)
324 def fragment_rfc8200(packet, identification, fragsize, logger=null_logger):
326 Fragment an IPv6 packet per RFC 8200
327 :param packet: packet to fragment
328 :param fragsize: size at which to fragment
329 :note: IP options are not supported
330 :returns: list of fragments
332 packet = packet.__class__(scapy.compat.raw(packet)) # recalc. all values
333 if len(packet) <= fragsize:
335 logger.debug(ppp("Fragmenting packet:", packet))
339 hop_by_hop_hdr = None
343 l = packet.getlayer(counter)
345 if l.__class__ is IPv6:
347 # ignore 2nd IPv6 header and everything below..
351 elif l.__class__ is IPv6ExtHdrFragment:
352 raise Exception("Already fragmented")
353 elif l.__class__ is IPv6ExtHdrRouting:
354 routing_hdr = counter
355 elif l.__class__ is IPv6ExtHdrHopByHop:
356 hop_by_hop_hdr = counter
357 elif seen_ipv6 and not upper_layer and \
358 not l.__class__.__name__.startswith('IPv6ExtHdr'):
359 upper_layer = counter
360 counter = counter + 1
361 l = packet.getlayer(counter)
364 "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
365 (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
367 if upper_layer is None:
368 raise Exception("Upper layer header not found in IPv6 packet")
370 last_per_fragment_hdr = ipv6_nr
371 if routing_hdr is None:
372 if hop_by_hop_hdr is not None:
373 last_per_fragment_hdr = hop_by_hop_hdr
375 last_per_fragment_hdr = routing_hdr
376 logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
378 per_fragment_headers = packet.copy()
379 per_fragment_headers[last_per_fragment_hdr].remove_payload()
380 logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
382 ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
383 hex_payload = scapy.compat.raw(ext_and_upper_layer)
384 logger.debug("Payload length is %s" % len(hex_payload))
385 logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
387 fragment_ext_hdr = IPv6ExtHdrFragment()
388 logger.debug(ppp("Fragment header:", fragment_ext_hdr))
390 len_ext_and_upper_layer_payload = len(ext_and_upper_layer.payload)
391 if not len_ext_and_upper_layer_payload and \
392 hasattr(ext_and_upper_layer, "data"):
393 len_ext_and_upper_layer_payload = len(ext_and_upper_layer.data)
395 if len(per_fragment_headers) + len(fragment_ext_hdr) +\
396 len(ext_and_upper_layer) - len_ext_and_upper_layer_payload\
398 raise Exception("Cannot fragment this packet - MTU too small "
399 "(%s, %s, %s, %s, %s)" % (
400 len(per_fragment_headers), len(fragment_ext_hdr),
401 len(ext_and_upper_layer),
402 len_ext_and_upper_layer_payload, fragsize))
404 orig_nh = packet[IPv6].nh
405 p = per_fragment_headers
408 p = p / fragment_ext_hdr
409 del p[IPv6ExtHdrFragment].nh
410 first_payload_len_nfb = int((fragsize - len(p)) / 8)
411 p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
413 p[IPv6ExtHdrFragment].nh = orig_nh
414 p[IPv6ExtHdrFragment].id = identification
415 p[IPv6ExtHdrFragment].offset = 0
416 p[IPv6ExtHdrFragment].m = 1
417 p = p.__class__(scapy.compat.raw(p))
418 logger.debug(ppp("Fragment %s:" % len(pkts), p))
420 offset = first_payload_len_nfb * 8
421 logger.debug("Offset after first fragment: %s" % offset)
422 while len(hex_payload) > offset:
423 p = per_fragment_headers
426 p = p / fragment_ext_hdr
427 del p[IPv6ExtHdrFragment].nh
428 l_nfb = int((fragsize - len(p)) / 8)
429 p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
430 p[IPv6ExtHdrFragment].nh = orig_nh
431 p[IPv6ExtHdrFragment].id = identification
432 p[IPv6ExtHdrFragment].offset = int(offset / 8)
433 p[IPv6ExtHdrFragment].m = 1
434 p = p.__class__(scapy.compat.raw(p))
435 logger.debug(ppp("Fragment %s:" % len(pkts), p))
437 offset = offset + l_nfb * 8
439 pkts[-1][IPv6ExtHdrFragment].m = 0 # reset more-flags in last fragment
444 def reassemble4_core(listoffragments, return_ip):
446 first = listoffragments[0]
448 for pkt in listoffragments:
449 buffer.seek(pkt[IP].frag*8)
450 buffer.write(bytes(pkt[IP].payload))
451 first.len = len(buffer.getvalue()) + 20
455 header = bytes(first[IP])[:20]
456 return first[IP].__class__(header + buffer.getvalue())
458 header = bytes(first[Ether])[:34]
459 return first[Ether].__class__(header + buffer.getvalue())
462 def reassemble4_ether(listoffragments):
463 return reassemble4_core(listoffragments, False)
466 def reassemble4(listoffragments):
467 return reassemble4_core(listoffragments, True)