1 """ test framework utilities """
6 from abc import abstractmethod, ABCMeta
7 from cStringIO import StringIO
8 from scapy.utils6 import in6_mactoifaceid
10 from scapy.layers.l2 import Ether
11 from scapy.packet import Raw
12 from scapy.layers.inet import IP
13 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
15 from scapy.utils import hexdump
16 from socket import AF_INET6
19 def ppp(headline, packet):
20 """ Return string containing the output of scapy packet.show() call. """
22 old_stdout = sys.stdout
28 sys.stdout = old_stdout
32 def ppc(headline, capture, limit=10):
33 """ Return string containing ppp() printout for a capture.
35 :param headline: printed as first line of output
36 :param capture: packets to print
37 :param limit: limit the print to # of packets
42 if limit < len(capture):
43 tail = "\nPrint limit reached, %s out of %s packets printed" % (
45 body = "".join([ppp("Packet #%s:" % count, p)
46 for count, p in zip(range(0, limit), capture)])
47 return "%s\n%s%s" % (headline, body, tail)
def ip4_range(ip4, s, e):
    """ Return a generator of IPv4 address strings sharing a common prefix.

    :param ip4: dotted address string; everything before its last '.' is
                kept as the prefix (the whole string if it has no '.')
    :param s: first value appended after the prefix (inclusive)
    :param e: end of the appended values (exclusive)
    :returns: generator yielding "<prefix>.<n>" for each n in range(s, e)
    """
    # Both the prefix and the range are computed right away, so bad
    # arguments fail at call time rather than on first iteration.
    prefix = ip4.rsplit('.', 1)[0]
    last_octets = range(s, e)
    return ("%s.%d" % (prefix, octet) for octet in last_octets)
def ip4n_range(ip4n, s, e):
    """ Return a generator of packed IPv4 addresses sharing a common prefix.

    :param ip4n: packed IPv4 address, as accepted by socket.inet_ntop()
    :param s: first value of the last address component (inclusive)
    :param e: end of the last address component values (exclusive)
    :returns: generator yielding the socket.inet_pton() form of every
              address produced by ip4_range() for the same arguments
    """
    family = socket.AF_INET
    # Convert to text once, let ip4_range() enumerate the textual
    # addresses, then pack each one back lazily.
    base_text = socket.inet_ntop(family, ip4n)
    text_addresses = ip4_range(base_text, s, e)
    return (socket.inet_pton(family, text) for text in text_addresses)
62 """ Convert the : separated format into binary packet data for the API """
63 return mac.replace(':', '').decode('hex')
67 euid = in6_mactoifaceid(mac)
68 addr = "fe80::" + euid
def ip6_normalize(ip6):
    """ Return the IPv6 address string as re-rendered by the socket module.

    :param ip6: IPv6 address in any textual form socket.inet_pton() accepts
    :returns: the text socket.inet_ntop() produces for the same address
    """
    # Round-trip through the packed form so that equivalent spellings of
    # one address come out as the same string.
    packed = socket.inet_pton(socket.AF_INET6, ip6)
    return socket.inet_ntop(socket.AF_INET6, packed)
def get_core_path(tempdir):
    """ Return tempdir joined with the current core pattern by a '/'.

    :param tempdir: directory part of the resulting path
    :returns: "<tempdir>/<value returned by get_core_pattern()>"
    """
    core_name = get_core_pattern()
    return "%s/%s" % (tempdir, core_name)
def is_core_present(tempdir):
    """ Tell whether the path given by get_core_path(tempdir) is a file.

    :param tempdir: directory passed through to get_core_path()
    :returns: result of os.path.isfile() on that path
    """
    core_file = get_core_path(tempdir)
    return os.path.isfile(core_file)
85 def get_core_pattern():
86 with open("/proc/sys/kernel/core_pattern", "r") as f:
87 corefmt = f.read().strip()
91 def check_core_path(logger, core_path):
92 corefmt = get_core_pattern()
93 if corefmt.startswith("|"):
95 "WARNING: redirecting the core dump through a"
96 " filter may result in truncated dumps.")
98 " You may want to check the filter settings"
99 " or uninstall it and edit the"
100 " /proc/sys/kernel/core_pattern accordingly.")
102 " current core pattern is: %s" % corefmt)
105 class NumericConstant(object):
106 __metaclass__ = ABCMeta
111 def __init__(self, value):
121 if self._value in self.desc_dict:
122 return self.desc_dict[self._value]
127 """ Generic test host "connected" to VPPs interface. """
137 return mactobinary(self._mac)
141 """ IPv4 address - string """
146 """ IPv4 address of remote host - raw, suitable as API parameter."""
147 return socket.inet_pton(socket.AF_INET, self._ip4)
151 """ IPv6 address - string """
156 """ IPv6 address of remote host - raw, suitable as API parameter."""
157 return socket.inet_pton(socket.AF_INET6, self._ip6)
161 """ IPv6 link-local address - string """
166 """ IPv6 link-local address of remote host -
167 raw, suitable as API parameter."""
168 return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
170 def __eq__(self, other):
171 if isinstance(other, Host):
172 return (self.mac == other.mac and
173 self.ip4 == other.ip4 and
174 self.ip6 == other.ip6 and
175 self.ip6_ll == other.ip6_ll)
def __ne__(self, other):
    """ Return the logical negation of __eq__() for the same operand. """
    is_equal = self.__eq__(other)
    return not is_equal
183 return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
189 return hash(self.__repr__())
191 def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
195 self._ip6_ll = ip6_ll
198 class ForeignAddressFactory(object):
201 net_template = '10.10.10.{}'
202 net = net_template.format(0) + '/' + str(prefix_len)
206 raise Exception("Network host address exhaustion")
208 return self.net_template.format(self.count)
212 """ L4 'connection' tied to two VPP interfaces """
214 def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
215 self.testcase = testcase
216 self.ifs = [None, None]
219 self.address_family = af
220 self.l4proto = l4proto
221 self.ports = [None, None]
222 self.ports[0] = port1
223 self.ports[1] = port2
226 def pkt(self, side, l4args={}, payload="x"):
227 is_ip6 = 1 if self.address_family == AF_INET6 else 0
230 src_if = self.ifs[s0]
231 dst_if = self.ifs[s1]
232 layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
233 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
234 merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
235 merged_l4args.update(l4args)
236 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
238 self.l4proto(**merged_l4args) /
242 def send(self, side, flags=None, payload=""):
244 if flags is not None:
245 l4args['flags'] = flags
246 self.ifs[side].add_stream(self.pkt(side,
247 l4args=l4args, payload=payload))
248 self.ifs[1 - side].enable_capture()
249 self.testcase.pg_start()
251 def recv(self, side):
252 p = self.ifs[side].wait_for_packet(1)
255 def send_through(self, side, flags=None, payload=""):
256 self.send(side, flags, payload)
257 p = self.recv(1 - side)
260 def send_pingpong(self, side, flags1=None, flags2=None):
261 p1 = self.send_through(side, flags1)
262 p2 = self.send_through(1 - side, flags2)
267 L4_CONN_SIDE_ZERO = 0
271 class LoggerWrapper(object):
272 def __init__(self, logger=None):
273 self._logger = logger
275 def debug(self, *args, **kwargs):
277 self._logger.debug(*args, **kwargs)
279 def error(self, *args, **kwargs):
281 self._logger.error(*args, **kwargs)
284 def fragment_rfc791(packet, fragsize, _logger=None):
286 Fragment an IPv4 packet per RFC 791
287 :param packet: packet to fragment
288 :param fragsize: size at which to fragment
289 :note: IP options are not supported
290 :returns: list of fragments
292 logger = LoggerWrapper(_logger)
293 logger.debug(ppp("Fragmenting packet:", packet))
294 packet = packet.__class__(str(packet)) # recalculate all values
295 if len(packet[IP].options) > 0:
296 raise Exception("Not implemented")
297 if len(packet) <= fragsize:
300 pre_ip_len = len(packet) - len(packet[IP])
301 ip_header_len = packet[IP].ihl * 4
302 hex_packet = str(packet)
303 hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
304 hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
308 otl = len(packet[IP])
309 nfb = (fragsize - pre_ip_len - ihl * 4) / 8
312 p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
315 p[IP].len = ihl * 4 + nfb * 8
319 p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
320 p[IP].len = otl - nfb * 8
321 p[IP].frag = fo + nfb
324 more_fragments = fragment_rfc791(p, fragsize, _logger)
325 pkts.extend(more_fragments)
330 def fragment_rfc8200(packet, identification, fragsize, _logger=None):
332 Fragment an IPv6 packet per RFC 8200
333 :param packet: packet to fragment
334 :param fragsize: size at which to fragment
335 :note: IP options are not supported
336 :returns: list of fragments
338 logger = LoggerWrapper(_logger)
339 packet = packet.__class__(str(packet)) # recalculate all values
340 if len(packet) <= fragsize:
342 logger.debug(ppp("Fragmenting packet:", packet))
346 hop_by_hop_hdr = None
350 l = packet.getlayer(counter)
352 if l.__class__ is IPv6:
354 # ignore 2nd IPv6 header and everything below..
358 elif l.__class__ is IPv6ExtHdrFragment:
359 raise Exception("Already fragmented")
360 elif l.__class__ is IPv6ExtHdrRouting:
361 routing_hdr = counter
362 elif l.__class__ is IPv6ExtHdrHopByHop:
363 hop_by_hop_hdr = counter
364 elif seen_ipv6 and not upper_layer and \
365 not l.__class__.__name__.startswith('IPv6ExtHdr'):
366 upper_layer = counter
367 counter = counter + 1
368 l = packet.getlayer(counter)
371 "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
372 (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
374 if upper_layer is None:
375 raise Exception("Upper layer header not found in IPv6 packet")
377 last_per_fragment_hdr = ipv6_nr
378 if routing_hdr is None:
379 if hop_by_hop_hdr is not None:
380 last_per_fragment_hdr = hop_by_hop_hdr
382 last_per_fragment_hdr = routing_hdr
383 logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
385 per_fragment_headers = packet.copy()
386 per_fragment_headers[last_per_fragment_hdr].remove_payload()
387 logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
389 ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
390 hex_payload = str(ext_and_upper_layer)
391 logger.debug("Payload length is %s" % len(hex_payload))
392 logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
394 fragment_ext_hdr = IPv6ExtHdrFragment()
395 logger.debug(ppp("Fragment header:", fragment_ext_hdr))
397 if len(per_fragment_headers) + len(fragment_ext_hdr) +\
398 len(ext_and_upper_layer) - len(ext_and_upper_layer.payload)\
400 raise Exception("Cannot fragment this packet - MTU too small "
401 "(%s, %s, %s, %s, %s)" % (
402 len(per_fragment_headers), len(fragment_ext_hdr),
403 len(ext_and_upper_layer),
404 len(ext_and_upper_layer.payload), fragsize))
406 orig_nh = packet[IPv6].nh
407 p = per_fragment_headers
410 p = p / fragment_ext_hdr
411 del p[IPv6ExtHdrFragment].nh
412 first_payload_len_nfb = (fragsize - len(p)) / 8
413 p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
415 p[IPv6ExtHdrFragment].nh = orig_nh
416 p[IPv6ExtHdrFragment].id = identification
417 p[IPv6ExtHdrFragment].offset = 0
418 p[IPv6ExtHdrFragment].m = 1
419 p = p.__class__(str(p))
420 logger.debug(ppp("Fragment %s:" % len(pkts), p))
422 offset = first_payload_len_nfb * 8
423 logger.debug("Offset after first fragment: %s" % offset)
424 while len(hex_payload) > offset:
425 p = per_fragment_headers
428 p = p / fragment_ext_hdr
429 del p[IPv6ExtHdrFragment].nh
430 l_nfb = (fragsize - len(p)) / 8
431 p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
432 p[IPv6ExtHdrFragment].nh = orig_nh
433 p[IPv6ExtHdrFragment].id = identification
434 p[IPv6ExtHdrFragment].offset = offset / 8
435 p[IPv6ExtHdrFragment].m = 1
436 p = p.__class__(str(p))
437 logger.debug(ppp("Fragment %s:" % len(pkts), p))
439 offset = offset + l_nfb * 8
441 pkts[-1][IPv6ExtHdrFragment].m = 0 # reset more-flags in last fragment