1 """ test framework utilities """
8 from scapy.utils6 import in6_mactoifaceid
10 from scapy.layers.l2 import Ether
11 from scapy.packet import Raw
12 from scapy.layers.inet import IP
13 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
15 from scapy.utils import hexdump
16 from socket import AF_INET6
17 from io import BytesIO
18 from vpp_papi import mac_pton
21 def ppp(headline, packet):
22 """ Return string containing the output of scapy packet.show() call. """
23 return '%s\n%s\n\n%s\n' % (headline,
24 hexdump(packet, dump=True),
25 packet.show(dump=True))
28 def ppc(headline, capture, limit=10):
29 """ Return string containing ppp() printout for a capture.
31 :param headline: printed as first line of output
32 :param capture: packets to print
33 :param limit: limit the print to # of packets
38 if limit < len(capture):
39 tail = "\nPrint limit reached, %s out of %s packets printed" % (
41 body = "".join([ppp("Packet #%s:" % count, p)
42 for count, p in zip(range(0, limit), capture)])
43 return "%s\n%s%s" % (headline, body, tail)
46 def ip4_range(ip4, s, e):
47 tmp = ip4.rsplit('.', 1)[0]
48 return ("%s.%d" % (tmp, i) for i in range(s, e))
51 def ip4n_range(ip4n, s, e):
52 ip4 = socket.inet_ntop(socket.AF_INET, ip4n)
53 return (socket.inet_pton(socket.AF_INET, ip)
54 for ip in ip4_range(ip4, s, e))
58 euid = in6_mactoifaceid(mac)
59 addr = "fe80::" + euid
63 def ip6_normalize(ip6):
64 return socket.inet_ntop(socket.AF_INET6,
65 socket.inet_pton(socket.AF_INET6, ip6))
68 def get_core_path(tempdir):
69 return "%s/%s" % (tempdir, get_core_pattern())
72 def is_core_present(tempdir):
73 return os.path.isfile(get_core_path(tempdir))
76 def get_core_pattern():
77 with open("/proc/sys/kernel/core_pattern", "r") as f:
78 corefmt = f.read().strip()
82 def check_core_path(logger, core_path):
83 corefmt = get_core_pattern()
84 if corefmt.startswith("|"):
86 "WARNING: redirecting the core dump through a"
87 " filter may result in truncated dumps.")
89 " You may want to check the filter settings"
90 " or uninstall it and edit the"
91 " /proc/sys/kernel/core_pattern accordingly.")
93 " current core pattern is: %s" % corefmt)
96 class NumericConstant(object):
100 def __init__(self, value):
110 if self._value in self.desc_dict:
111 return self.desc_dict[self._value]
116 """ Generic test host "connected" to VPPs interface. """
126 return mac_pton(self._mac)
130 """ IPv4 address - string """
135 """ IPv4 address of remote host - raw, suitable as API parameter."""
136 return socket.inet_pton(socket.AF_INET, self._ip4)
140 """ IPv6 address - string """
145 """ IPv6 address of remote host - raw, suitable as API parameter."""
146 return socket.inet_pton(socket.AF_INET6, self._ip6)
150 """ IPv6 link-local address - string """
155 """ IPv6 link-local address of remote host -
156 raw, suitable as API parameter."""
157 return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
159 def __eq__(self, other):
160 if isinstance(other, Host):
161 return (self.mac == other.mac and
162 self.ip4 == other.ip4 and
163 self.ip6 == other.ip6 and
164 self.ip6_ll == other.ip6_ll)
168 def __ne__(self, other):
169 return not self.__eq__(other)
172 return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
178 return hash(self.__repr__())
180 def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
184 self._ip6_ll = ip6_ll
187 class ForeignAddressFactory(object):
190 net_template = '10.10.10.{}'
191 net = net_template.format(0) + '/' + str(prefix_len)
195 raise Exception("Network host address exhaustion")
197 return self.net_template.format(self.count)
201 """ L4 'connection' tied to two VPP interfaces """
203 def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
204 self.testcase = testcase
205 self.ifs = [None, None]
208 self.address_family = af
209 self.l4proto = l4proto
210 self.ports = [None, None]
211 self.ports[0] = port1
212 self.ports[1] = port2
215 def pkt(self, side, l4args={}, payload="x"):
216 is_ip6 = 1 if self.address_family == AF_INET6 else 0
219 src_if = self.ifs[s0]
220 dst_if = self.ifs[s1]
221 layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
222 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
223 merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
224 merged_l4args.update(l4args)
225 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
227 self.l4proto(**merged_l4args) /
231 def send(self, side, flags=None, payload=""):
233 if flags is not None:
234 l4args['flags'] = flags
235 self.ifs[side].add_stream(self.pkt(side,
236 l4args=l4args, payload=payload))
237 self.ifs[1 - side].enable_capture()
238 self.testcase.pg_start()
240 def recv(self, side):
241 p = self.ifs[side].wait_for_packet(1)
244 def send_through(self, side, flags=None, payload=""):
245 self.send(side, flags, payload)
246 p = self.recv(1 - side)
249 def send_pingpong(self, side, flags1=None, flags2=None):
250 p1 = self.send_through(side, flags1)
251 p2 = self.send_through(1 - side, flags2)
256 L4_CONN_SIDE_ZERO = 0
260 class LoggerWrapper(object):
261 def __init__(self, logger=None):
262 self._logger = logger
264 def debug(self, *args, **kwargs):
266 self._logger.debug(*args, **kwargs)
268 def error(self, *args, **kwargs):
270 self._logger.error(*args, **kwargs)
273 def fragment_rfc791(packet, fragsize, _logger=None):
275 Fragment an IPv4 packet per RFC 791
276 :param packet: packet to fragment
277 :param fragsize: size at which to fragment
278 :note: IP options are not supported
279 :returns: list of fragments
281 logger = LoggerWrapper(_logger)
282 logger.debug(ppp("Fragmenting packet:", packet))
283 packet = packet.__class__(str(packet)) # recalculate all values
284 if len(packet[IP].options) > 0:
285 raise Exception("Not implemented")
286 if len(packet) <= fragsize:
289 pre_ip_len = len(packet) - len(packet[IP])
290 ip_header_len = packet[IP].ihl * 4
291 hex_packet = str(packet)
292 hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
293 hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
297 otl = len(packet[IP])
298 nfb = (fragsize - pre_ip_len - ihl * 4) / 8
301 p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
304 p[IP].len = ihl * 4 + nfb * 8
308 p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
309 p[IP].len = otl - nfb * 8
310 p[IP].frag = fo + nfb
313 more_fragments = fragment_rfc791(p, fragsize, _logger)
314 pkts.extend(more_fragments)
319 def fragment_rfc8200(packet, identification, fragsize, _logger=None):
321 Fragment an IPv6 packet per RFC 8200
322 :param packet: packet to fragment
323 :param fragsize: size at which to fragment
324 :note: IP options are not supported
325 :returns: list of fragments
327 logger = LoggerWrapper(_logger)
328 packet = packet.__class__(str(packet)) # recalculate all values
329 if len(packet) <= fragsize:
331 logger.debug(ppp("Fragmenting packet:", packet))
335 hop_by_hop_hdr = None
339 l = packet.getlayer(counter)
341 if l.__class__ is IPv6:
343 # ignore 2nd IPv6 header and everything below..
347 elif l.__class__ is IPv6ExtHdrFragment:
348 raise Exception("Already fragmented")
349 elif l.__class__ is IPv6ExtHdrRouting:
350 routing_hdr = counter
351 elif l.__class__ is IPv6ExtHdrHopByHop:
352 hop_by_hop_hdr = counter
353 elif seen_ipv6 and not upper_layer and \
354 not l.__class__.__name__.startswith('IPv6ExtHdr'):
355 upper_layer = counter
356 counter = counter + 1
357 l = packet.getlayer(counter)
360 "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
361 (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
363 if upper_layer is None:
364 raise Exception("Upper layer header not found in IPv6 packet")
366 last_per_fragment_hdr = ipv6_nr
367 if routing_hdr is None:
368 if hop_by_hop_hdr is not None:
369 last_per_fragment_hdr = hop_by_hop_hdr
371 last_per_fragment_hdr = routing_hdr
372 logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
374 per_fragment_headers = packet.copy()
375 per_fragment_headers[last_per_fragment_hdr].remove_payload()
376 logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
378 ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
379 hex_payload = str(ext_and_upper_layer)
380 logger.debug("Payload length is %s" % len(hex_payload))
381 logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
383 fragment_ext_hdr = IPv6ExtHdrFragment()
384 logger.debug(ppp("Fragment header:", fragment_ext_hdr))
386 if len(per_fragment_headers) + len(fragment_ext_hdr) +\
387 len(ext_and_upper_layer) - len(ext_and_upper_layer.payload)\
389 raise Exception("Cannot fragment this packet - MTU too small "
390 "(%s, %s, %s, %s, %s)" % (
391 len(per_fragment_headers), len(fragment_ext_hdr),
392 len(ext_and_upper_layer),
393 len(ext_and_upper_layer.payload), fragsize))
395 orig_nh = packet[IPv6].nh
396 p = per_fragment_headers
399 p = p / fragment_ext_hdr
400 del p[IPv6ExtHdrFragment].nh
401 first_payload_len_nfb = (fragsize - len(p)) / 8
402 p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
404 p[IPv6ExtHdrFragment].nh = orig_nh
405 p[IPv6ExtHdrFragment].id = identification
406 p[IPv6ExtHdrFragment].offset = 0
407 p[IPv6ExtHdrFragment].m = 1
408 p = p.__class__(str(p))
409 logger.debug(ppp("Fragment %s:" % len(pkts), p))
411 offset = first_payload_len_nfb * 8
412 logger.debug("Offset after first fragment: %s" % offset)
413 while len(hex_payload) > offset:
414 p = per_fragment_headers
417 p = p / fragment_ext_hdr
418 del p[IPv6ExtHdrFragment].nh
419 l_nfb = (fragsize - len(p)) / 8
420 p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
421 p[IPv6ExtHdrFragment].nh = orig_nh
422 p[IPv6ExtHdrFragment].id = identification
423 p[IPv6ExtHdrFragment].offset = offset / 8
424 p[IPv6ExtHdrFragment].m = 1
425 p = p.__class__(str(p))
426 logger.debug(ppp("Fragment %s:" % len(pkts), p))
428 offset = offset + l_nfb * 8
430 pkts[-1][IPv6ExtHdrFragment].m = 0 # reset more-flags in last fragment
435 def reassemble4_core(listoffragments, return_ip):
437 first = listoffragments[0]
439 for pkt in listoffragments:
440 buffer.seek(pkt[IP].frag*8)
441 buffer.write(bytes(pkt[IP].payload))
442 first.len = len(buffer.getvalue()) + 20
446 header = bytes(first[IP])[:20]
447 return first[IP].__class__(header + buffer.getvalue())
449 header = bytes(first[Ether])[:34]
450 return first[Ether].__class__(header + buffer.getvalue())
453 def reassemble4_ether(listoffragments):
454 return reassemble4_core(listoffragments, False)
457 def reassemble4(listoffragments):
458 return reassemble4_core(listoffragments, True)