1 """ test framework utilities """
5 from socket import AF_INET6
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP
12 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
14 from scapy.packet import Raw
15 from scapy.utils import hexdump
16 from scapy.utils6 import in6_mactoifaceid
18 from io import BytesIO
19 from vpp_papi import mac_pton
def ppp(headline, packet):
    """ Format a scapy packet for printing or logging.

    :param headline: printed as first line of output
    :param packet: scapy packet to print
    :returns: string made of the headline, the hexdump of the packet and
              the output of packet.show()
    """
    hex_view = hexdump(packet, dump=True)
    field_view = packet.show(dump=True)
    return '%s\n%s\n\n%s\n' % (headline, hex_view, field_view)
def ppc(headline, capture, limit=10):
    """ Return string containing ppp() printout for a capture.

    :param headline: printed as first line of output
    :param capture: packets to print
    :param limit: limit the print to # of packets
    :returns: headline followed by the printout of at most `limit` packets
              and, when the capture is longer than that, a closing note
    """
    # the note is only appended when some packets are left out
    tail = ""
    if limit < len(capture):
        tail = "\nPrint limit reached, %s out of %s packets printed" % (
            limit, len(capture))
    # zip() stops at the shorter of the two, so at most `limit` packets
    # are formatted
    body = "".join([ppp("Packet #%s:" % count, p)
                    for count, p in zip(range(0, limit), capture)])
    return "%s\n%s%s" % (headline, body, tail)
def ip4_range(ip4, s, e):
    """ Generate IPv4 address strings which differ in the last octet only.

    :param ip4: IPv4 address string, everything before its last dot is kept
    :param s: first value of the last octet
    :param e: end of the range - this value itself is not generated
    :returns: generator of address strings
    """
    prefix = ip4.rsplit('.', 1)[0]
    return ("%s.%d" % (prefix, last_octet) for last_octet in range(s, e))
def ip4n_range(ip4n, s, e):
    """ Same as ip4_range(), but for addresses in packed binary form.

    :param ip4n: packed IPv4 address, as accepted by socket.inet_ntop
    :param s: first value of the last octet
    :param e: end of the range - this value itself is not generated
    :returns: generator of packed IPv4 addresses
    """
    text_form = socket.inet_ntop(socket.AF_INET, ip4n)
    addresses = ip4_range(text_form, s, e)
    return (socket.inet_pton(socket.AF_INET, address)
            for address in addresses)
58 # wrapper around scapy library function.
60 euid = in6_mactoifaceid(mac)
61 addr = "fe80::" + euid
def ip6_normalize(ip6):
    """ Bring an IPv6 address string into the form produced by inet_ntop.

    :param ip6: IPv6 address - string
    :returns: the same address after a round trip through its packed form
    """
    packed = socket.inet_pton(socket.AF_INET6, ip6)
    return socket.inet_ntop(socket.AF_INET6, packed)
def get_core_path(tempdir):
    """ Build the path of a core file inside of a directory.

    :param tempdir: directory in which the core file is looked for
    :returns: tempdir and the value of get_core_pattern() joined by a slash
    """
    core_name = get_core_pattern()
    return "%s/%s" % (tempdir, core_name)
def is_core_present(tempdir):
    """ Tell whether a core file exists in a directory.

    :param tempdir: directory in which the core file is looked for
    :returns: True if the path given by get_core_path() is an existing file
    """
    core_file = get_core_path(tempdir)
    return os.path.isfile(core_file)
def get_core_pattern():
    """ Read the core pattern currently configured in the kernel.

    :returns: contents of /proc/sys/kernel/core_pattern with the leading
              and trailing whitespace removed
    """
    with open("/proc/sys/kernel/core_pattern", "r") as f:
        corefmt = f.read().strip()
    # hand the pattern back - callers format it into a path and inspect it
    return corefmt
84 def check_core_path(logger, core_path):
85 corefmt = get_core_pattern()
86 if corefmt.startswith("|"):
88 "WARNING: redirecting the core dump through a"
89 " filter may result in truncated dumps.")
91 " You may want to check the filter settings"
92 " or uninstall it and edit the"
93 " /proc/sys/kernel/core_pattern accordingly.")
95 " current core pattern is: %s" % corefmt)
98 class NumericConstant(object):
102 def __init__(self, value):
112 if self._value in self.desc_dict:
113 return self.desc_dict[self._value]
118 """ Generic test host "connected" to VPPs interface. """
128 return mac_pton(self._mac)
132 """ IPv4 address - string """
137 """ IPv4 address of remote host - raw, suitable as API parameter."""
138 return socket.inet_pton(socket.AF_INET, self._ip4)
142 """ IPv6 address - string """
147 """ IPv6 address of remote host - raw, suitable as API parameter."""
148 return socket.inet_pton(socket.AF_INET6, self._ip6)
152 """ IPv6 link-local address - string """
157 """ IPv6 link-local address of remote host -
158 raw, suitable as API parameter."""
159 return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
def __eq__(self, other):
    """ Compare two hosts by their addresses.

    :param other: object to compare with
    :returns: True if other is a Host with the same mac, ip4, ip6 and
              ip6_ll, False otherwise
    """
    if isinstance(other, Host):
        return (self.mac == other.mac and
                self.ip4 == other.ip4 and
                self.ip6 == other.ip6 and
                self.ip6_ll == other.ip6_ll)
    # an object which is not a Host never compares equal; be explicit
    # instead of falling through to an implicit None
    return False
def __ne__(self, other):
    """ Negation of __eq__().

    :param other: object to compare with
    :returns: True when __eq__() does not report the objects as equal
    """
    is_equal = self.__eq__(other)
    return not is_equal
174 return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
180 return hash(self.__repr__())
def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
    """ Remember the addresses of the host.

    :param mac: MAC address
    :param ip4: IPv4 address - string
    :param ip6: IPv6 address - string
    :param ip6_ll: IPv6 link-local address - string
    """
    # every one of these is read back by the accessors of this class,
    # so all four have to be stored
    self._mac = mac
    self._ip4 = ip4
    self._ip6 = ip6
    self._ip6_ll = ip6_ll
189 class ForeignAddressFactory(object):
192 net_template = '10.10.10.{}'
193 net = net_template.format(0) + '/' + str(prefix_len)
197 raise Exception("Network host address exhaustion")
199 return self.net_template.format(self.count)
203 """ L4 'connection' tied to two VPP interfaces """
def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
    """ Set up the connection between two interfaces.

    :param testcase: object whose pg_start() is called to send packets
    :param if1: interface of side 0
    :param if2: interface of side 1
    :param af: address family, compared against AF_INET6 when a packet
               is built
    :param l4proto: callable creating the L4 layer from keyword arguments
    :param port1: L4 port of side 0
    :param port2: L4 port of side 1
    """
    self.testcase = testcase
    # index into ifs/ports is the side of the connection (0 or 1)
    self.ifs = [if1, if2]
    self.address_family = af
    self.l4proto = l4proto
    self.ports = [port1, port2]
def pkt(self, side, l4args=None, payload="x"):
    """ Build a packet travelling from one side of the connection.

    :param side: side which sends the packet (0 or 1)
    :param l4args: extra keyword arguments for the L4 layer, these take
                   precedence over the default sport/dport
    :param payload: payload put behind the L4 layer
    :returns: scapy packet Ether / IP or IPv6 / L4 / Raw
    """
    # s0 is the sending side, s1 the opposite one
    s0 = side
    s1 = 1 - side
    src_if = self.ifs[s0]
    dst_if = self.ifs[s1]
    # only the L3 header of the configured address family is created
    if self.address_family == AF_INET6:
        layer_3 = IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
    else:
        layer_3 = IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
    merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
    if l4args:
        merged_l4args.update(l4args)
    p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
         layer_3 /
         self.l4proto(**merged_l4args) /
         Raw(payload))
    return p
def send(self, side, flags=None, payload=""):
    """ Send one packet from the given side of the connection.

    :param side: side which sends the packet (0 or 1)
    :param flags: value of the L4 'flags' field, left out when None
    :param payload: payload of the packet
    """
    # start from an empty set of L4 arguments so that the name is bound
    # even when no flags are requested
    l4args = {}
    if flags is not None:
        l4args['flags'] = flags
    self.ifs[side].add_stream(self.pkt(side,
                                       l4args=l4args, payload=payload))
    # the packet is expected on the opposite side
    self.ifs[1 - side].enable_capture()
    self.testcase.pg_start()
def recv(self, side):
    """ Wait for a packet on the given side of the connection.

    :param side: side on which the packet is expected (0 or 1)
    :returns: whatever wait_for_packet(1) of that interface returns
    """
    p = self.ifs[side].wait_for_packet(1)
    # give the packet to the caller instead of dropping it
    return p
def send_through(self, side, flags=None, payload=""):
    """ Send a packet from one side and receive it on the other one.

    :param side: side which sends the packet (0 or 1)
    :param flags: value of the L4 'flags' field, left out when None
    :param payload: payload of the packet
    :returns: packet received on the opposite side
    """
    self.send(side, flags, payload)
    p = self.recv(1 - side)
    # give the packet to the caller instead of dropping it
    return p
def send_pingpong(self, side, flags1=None, flags2=None):
    """ Send a packet through the connection and another one back.

    :param side: side which sends the first packet (0 or 1)
    :param flags1: L4 flags of the first packet
    :param flags2: L4 flags of the packet sent in the opposite direction
    :returns: list of the two received packets, in the order of sending
    """
    p1 = self.send_through(side, flags1)
    p2 = self.send_through(1 - side, flags2)
    # give both packets to the caller instead of dropping them
    return [p1, p2]
258 L4_CONN_SIDE_ZERO = 0
class LoggerWrapper(object):
    """ Logger front-end which tolerates a missing logger.

    Calls are passed to the wrapped logger; without a logger (the
    default) they do nothing.
    """

    def __init__(self, logger=None):
        """
        :param logger: object providing debug() and error(), or None
        """
        self._logger = logger

    def debug(self, *args, **kwargs):
        """ Pass the call to debug() of the wrapped logger, if any. """
        # None is the default - calling a method on it would raise
        if self._logger is not None:
            self._logger.debug(*args, **kwargs)

    def error(self, *args, **kwargs):
        """ Pass the call to error() of the wrapped logger, if any. """
        if self._logger is not None:
            self._logger.error(*args, **kwargs)
275 def fragment_rfc791(packet, fragsize, _logger=None):
277 Fragment an IPv4 packet per RFC 791
278 :param packet: packet to fragment
279 :param fragsize: size at which to fragment
280 :note: IP options are not supported
281 :returns: list of fragments
283 logger = LoggerWrapper(_logger)
284 logger.debug(ppp("Fragmenting packet:", packet))
285 packet = packet.__class__(str(packet)) # recalculate all values
286 if len(packet[IP].options) > 0:
287 raise Exception("Not implemented")
288 if len(packet) <= fragsize:
291 pre_ip_len = len(packet) - len(packet[IP])
292 ip_header_len = packet[IP].ihl * 4
293 hex_packet = str(packet)
294 hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
295 hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
299 otl = len(packet[IP])
300 nfb = (fragsize - pre_ip_len - ihl * 4) / 8
303 p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
306 p[IP].len = ihl * 4 + nfb * 8
310 p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
311 p[IP].len = otl - nfb * 8
312 p[IP].frag = fo + nfb
315 more_fragments = fragment_rfc791(p, fragsize, _logger)
316 pkts.extend(more_fragments)
321 def fragment_rfc8200(packet, identification, fragsize, _logger=None):
323 Fragment an IPv6 packet per RFC 8200
324 :param packet: packet to fragment
325 :param fragsize: size at which to fragment
326 :note: IP options are not supported
327 :returns: list of fragments
329 logger = LoggerWrapper(_logger)
330 packet = packet.__class__(str(packet)) # recalculate all values
331 if len(packet) <= fragsize:
333 logger.debug(ppp("Fragmenting packet:", packet))
337 hop_by_hop_hdr = None
341 l = packet.getlayer(counter)
343 if l.__class__ is IPv6:
345 # ignore 2nd IPv6 header and everything below..
349 elif l.__class__ is IPv6ExtHdrFragment:
350 raise Exception("Already fragmented")
351 elif l.__class__ is IPv6ExtHdrRouting:
352 routing_hdr = counter
353 elif l.__class__ is IPv6ExtHdrHopByHop:
354 hop_by_hop_hdr = counter
355 elif seen_ipv6 and not upper_layer and \
356 not l.__class__.__name__.startswith('IPv6ExtHdr'):
357 upper_layer = counter
358 counter = counter + 1
359 l = packet.getlayer(counter)
362 "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
363 (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
365 if upper_layer is None:
366 raise Exception("Upper layer header not found in IPv6 packet")
368 last_per_fragment_hdr = ipv6_nr
369 if routing_hdr is None:
370 if hop_by_hop_hdr is not None:
371 last_per_fragment_hdr = hop_by_hop_hdr
373 last_per_fragment_hdr = routing_hdr
374 logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
376 per_fragment_headers = packet.copy()
377 per_fragment_headers[last_per_fragment_hdr].remove_payload()
378 logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
380 ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
381 hex_payload = str(ext_and_upper_layer)
382 logger.debug("Payload length is %s" % len(hex_payload))
383 logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
385 fragment_ext_hdr = IPv6ExtHdrFragment()
386 logger.debug(ppp("Fragment header:", fragment_ext_hdr))
388 if len(per_fragment_headers) + len(fragment_ext_hdr) +\
389 len(ext_and_upper_layer) - len(ext_and_upper_layer.payload)\
391 raise Exception("Cannot fragment this packet - MTU too small "
392 "(%s, %s, %s, %s, %s)" % (
393 len(per_fragment_headers), len(fragment_ext_hdr),
394 len(ext_and_upper_layer),
395 len(ext_and_upper_layer.payload), fragsize))
397 orig_nh = packet[IPv6].nh
398 p = per_fragment_headers
401 p = p / fragment_ext_hdr
402 del p[IPv6ExtHdrFragment].nh
403 first_payload_len_nfb = (fragsize - len(p)) / 8
404 p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
406 p[IPv6ExtHdrFragment].nh = orig_nh
407 p[IPv6ExtHdrFragment].id = identification
408 p[IPv6ExtHdrFragment].offset = 0
409 p[IPv6ExtHdrFragment].m = 1
410 p = p.__class__(str(p))
411 logger.debug(ppp("Fragment %s:" % len(pkts), p))
413 offset = first_payload_len_nfb * 8
414 logger.debug("Offset after first fragment: %s" % offset)
415 while len(hex_payload) > offset:
416 p = per_fragment_headers
419 p = p / fragment_ext_hdr
420 del p[IPv6ExtHdrFragment].nh
421 l_nfb = (fragsize - len(p)) / 8
422 p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
423 p[IPv6ExtHdrFragment].nh = orig_nh
424 p[IPv6ExtHdrFragment].id = identification
425 p[IPv6ExtHdrFragment].offset = offset / 8
426 p[IPv6ExtHdrFragment].m = 1
427 p = p.__class__(str(p))
428 logger.debug(ppp("Fragment %s:" % len(pkts), p))
430 offset = offset + l_nfb * 8
432 pkts[-1][IPv6ExtHdrFragment].m = 0 # reset more-flags in last fragment
437 def reassemble4_core(listoffragments, return_ip):
439 first = listoffragments[0]
441 for pkt in listoffragments:
442 buffer.seek(pkt[IP].frag*8)
443 buffer.write(bytes(pkt[IP].payload))
444 first.len = len(buffer.getvalue()) + 20
448 header = bytes(first[IP])[:20]
449 return first[IP].__class__(header + buffer.getvalue())
451 header = bytes(first[Ether])[:34]
452 return first[Ether].__class__(header + buffer.getvalue())
def reassemble4_ether(listoffragments):
    """ Reassemble IPv4 fragments via reassemble4_core(), return_ip off.

    :param listoffragments: fragments to put together
    :returns: result of reassemble4_core() - presumably the packet
              including its Ether header (to be confirmed against
              reassemble4_core)
    """
    return reassemble4_core(listoffragments, return_ip=False)
def reassemble4(listoffragments):
    """ Reassemble IPv4 fragments via reassemble4_core(), return_ip on.

    :param listoffragments: fragments to put together
    :returns: result of reassemble4_core() - presumably the packet
              starting at its IP header (to be confirmed against
              reassemble4_core)
    """
    return reassemble4_core(listoffragments, return_ip=True)