Simplify test/util.py:ppp.
[vpp.git] / test / util.py
1 """ test framework utilities """
2
3 import socket
4 import sys
5 import os.path
6 from abc import abstractmethod, ABCMeta
7 from scapy.utils6 import in6_mactoifaceid
8
9 from scapy.layers.l2 import Ether
10 from scapy.packet import Raw
11 from scapy.layers.inet import IP
12 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
13     IPv6ExtHdrHopByHop
14 from scapy.utils import hexdump
15 from socket import AF_INET6
16 from io import BytesIO
17 from vpp_papi import mac_pton
18
19
def ppp(headline, packet):
    """ Format a packet as a headline, its hexdump and its show() output.

    :param headline: text placed on the first line of the result
    :param packet: packet to render; must support show(dump=True) and be
        accepted by scapy's hexdump()
    :returns: string with the headline, the hexdump, an empty line and
        the output of packet.show(dump=True)
    """
    hex_view = hexdump(packet, dump=True)
    layer_view = packet.show(dump=True)
    return '%s\n%s\n\n%s\n' % (headline, hex_view, layer_view)
25
26
def ppc(headline, capture, limit=10):
    """ Build a printable dump of a capture using ppp() for each packet.

    :param headline: printed as first line of output
    :param capture: packets to print
    :param limit: maximum number of packets to print
    :returns: only the headline when the capture is empty, otherwise the
        headline followed by one ppp() dump per printed packet and, if the
        capture holds more than `limit` packets, a note saying so
    """
    if not capture:
        return headline

    total = len(capture)
    if limit < total:
        note = "\nPrint limit reached, %s out of %s packets printed" % (
            limit, total)
    else:
        note = ""

    # zip() stops at whichever of `limit` or the capture runs out first
    dumps = "".join(ppp("Packet #%s:" % index, pkt)
                    for index, pkt in zip(range(0, limit), capture))
    return "%s\n%s%s" % (headline, dumps, note)
43
44
def ip4_range(ip4, s, e):
    """ Generate IPv4 address strings that differ only in the last part.

    :param ip4: address string; everything before its last '.' is kept
    :param s: first value used for the last part (inclusive)
    :param e: end value for the last part (exclusive)
    :returns: generator of "<kept part>.<n>" strings for n in range(s, e)
    """
    kept_part = ip4.rsplit('.', 1)[0]
    return ("%s.%d" % (kept_part, last) for last in range(s, e))
48
49
def ip4n_range(ip4n, s, e):
    """ Packed-address counterpart of ip4_range().

    :param ip4n: IPv4 address in packed (inet_pton) form
    :param s: first value used for the last part (inclusive)
    :param e: end value for the last part (exclusive)
    :returns: generator of packed IPv4 addresses
    """
    family = socket.AF_INET
    text_form = socket.inet_ntop(family, ip4n)
    return (socket.inet_pton(family, addr)
            for addr in ip4_range(text_form, s, e))
54
55
def mk_ll_addr(mac):
    """ Return "fe80::" followed by in6_mactoifaceid(mac).

    :param mac: MAC address handed to scapy's in6_mactoifaceid()
    :returns: address string starting with "fe80::"
    """
    return "fe80::" + in6_mactoifaceid(mac)
60
61
def ip6_normalize(ip6):
    """ Return the IPv6 address string as rendered by inet_ntop().

    The address is packed with inet_pton() and unpacked again, so
    different spellings of the same address give the same string.

    :param ip6: IPv6 address string
    :returns: IPv6 address string produced by socket.inet_ntop()
    """
    family = socket.AF_INET6
    packed = socket.inet_pton(family, ip6)
    return socket.inet_ntop(family, packed)
65
66
def get_core_path(tempdir):
    """ Return tempdir and the kernel core pattern joined by '/'.

    :param tempdir: directory part of the resulting path
    :returns: "<tempdir>/<core pattern>" string
    """
    pattern = get_core_pattern()
    return "%s/%s" % (tempdir, pattern)
69
70
def is_core_present(tempdir):
    """ Tell whether the path from get_core_path(tempdir) is a file.

    :param tempdir: directory passed on to get_core_path()
    :returns: True if that path exists and is a regular file
    """
    core_file = get_core_path(tempdir)
    return os.path.isfile(core_file)
73
74
def get_core_pattern():
    """ Read /proc/sys/kernel/core_pattern.

    :returns: file content with surrounding whitespace stripped
    """
    with open("/proc/sys/kernel/core_pattern", "r") as pattern_file:
        return pattern_file.read().strip()
79
80
def check_core_path(logger, core_path):
    """ Log errors when the kernel core pattern starts with '|'.

    :param logger: object whose error() method receives the messages
    :param core_path: not used by this function
    """
    corefmt = get_core_pattern()
    if not corefmt.startswith("|"):
        return

    messages = (
        "WARNING: redirecting the core dump through a"
        " filter may result in truncated dumps.",
        "   You may want to check the filter settings"
        " or uninstall it and edit the"
        " /proc/sys/kernel/core_pattern accordingly.",
        "   current core pattern is: %s" % corefmt,
    )
    for message in messages:
        logger.error(message)
93
94
class NumericConstant(object):
    """ Numeric value with an optional textual description.

    Subclasses provide `desc_dict`, a mapping from value to description,
    and call this __init__ to store the value.
    """
    __metaclass__ = ABCMeta

    # value -> description looked up by __str__
    desc_dict = {}

    @abstractmethod
    def __init__(self, value):
        self._value = value

    def __int__(self):
        return self._value

    def __long__(self):
        return self._value

    def __str__(self):
        # values without a description are rendered as an empty string
        return self.desc_dict.get(self._value, "")
114
115
class Host(object):
    """ Generic test host "connected" to VPPs interface. """

    def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
        self._mac = mac
        self._ip4 = ip4
        self._ip6 = ip6
        self._ip6_ll = ip6_ll

    @property
    def mac(self):
        """ MAC address as passed to the constructor """
        return self._mac

    @property
    def bin_mac(self):
        """ MAC address converted with mac_pton() """
        return mac_pton(self._mac)

    @property
    def ip4(self):
        """ IPv4 address - string """
        return self._ip4

    @property
    def ip4n(self):
        """ IPv4 address of remote host - raw, suitable as API parameter."""
        return socket.inet_pton(socket.AF_INET, self._ip4)

    @property
    def ip6(self):
        """ IPv6 address - string """
        return self._ip6

    @property
    def ip6n(self):
        """ IPv6 address of remote host - raw, suitable as API parameter."""
        return socket.inet_pton(socket.AF_INET6, self._ip6)

    @property
    def ip6_ll(self):
        """ IPv6 link-local address - string """
        return self._ip6_ll

    @property
    def ip6n_ll(self):
        """ IPv6 link-local address of remote host -
        raw, suitable as API parameter."""
        return socket.inet_pton(socket.AF_INET6, self._ip6_ll)

    def __eq__(self, other):
        # only another Host can be equal; all four addresses must match
        if not isinstance(other, Host):
            return False
        return (self.mac == other.mac and
                self.ip4 == other.ip4 and
                self.ip6 == other.ip6 and
                self.ip6_ll == other.ip6_ll)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __repr__(self):
        fields = (self.mac, self.ip4, self.ip6, self.ip6_ll)
        return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % fields

    def __hash__(self):
        # hash of the textual form, which contains every compared field
        return hash(self.__repr__())
186
187
class ForeignAddressFactory(object):
    """ Hands out successive host addresses from the 10.10.10.0/24 net.

    Each call to get_ip4() returns the next address, starting with
    10.10.10.1.
    """
    count = 0
    prefix_len = 24
    net_template = '10.10.10.{}'
    net = net_template.format(0) + '/' + str(prefix_len)
    # Last host number handed out: .255 is the broadcast address of the
    # /24 and anything above it is not a valid IPv4 address.
    _last_host = 254

    def get_ip4(self):
        """ Return the next unused host address as a string.

        :returns: address string built from net_template
        :raises Exception: when all host addresses have been handed out
        """
        if self.count >= self._last_host:
            raise Exception("Network host address exhaustion")
        self.count += 1
        return self.net_template.format(self.count)
199
200
class L4_Conn():
    """ L4 'connection' tied to two VPP interfaces """

    def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
        """
        :param testcase: object whose pg_start() is called by send()
        :param if1: interface used for side 0
        :param if2: interface used for side 1
        :param af: address family; AF_INET6 selects IPv6, anything else IPv4
        :param l4proto: callable building the L4 header from keyword args
        :param port1: port of side 0
        :param port2: port of side 1
        """
        self.testcase = testcase
        self.ifs = [if1, if2]
        self.address_family = af
        self.l4proto = l4proto
        self.ports = [port1, port2]

    def pkt(self, side, l4args=None, payload="x"):
        """ Build a packet travelling from `side` to the other side.

        :param side: index (0 or 1) of the sending side
        :param l4args: extra keyword arguments for the L4 header; they
            override the default sport/dport
        :param payload: content of the Raw layer
        :returns: Ether / IP or IPv6 / L4 header / Raw packet
        """
        s0 = side
        s1 = 1 - side
        src_if = self.ifs[s0]
        dst_if = self.ifs[s1]
        # build only the L3 header that is actually used
        if self.address_family == AF_INET6:
            layer_3 = IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
        else:
            layer_3 = IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
        merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
        if l4args:
            merged_l4args.update(l4args)
        return (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                layer_3 /
                self.l4proto(**merged_l4args) /
                Raw(payload))

    def send(self, side, flags=None, payload=""):
        """ Queue one packet on `side`, enable capture on the other side
        and start the packet generator.

        :param side: index (0 or 1) of the sending side
        :param flags: value for the L4 'flags' argument; left out if None
        :param payload: content of the Raw layer
        """
        l4args = {}
        if flags is not None:
            l4args['flags'] = flags
        self.ifs[side].add_stream(self.pkt(side,
                                           l4args=l4args, payload=payload))
        self.ifs[1 - side].enable_capture()
        self.testcase.pg_start()

    def recv(self, side):
        """ Return the result of wait_for_packet(1) on interface `side`. """
        return self.ifs[side].wait_for_packet(1)

    def send_through(self, side, flags=None, payload=""):
        """ Send from `side` and return what the other side receives. """
        self.send(side, flags, payload)
        return self.recv(1 - side)

    def send_pingpong(self, side, flags1=None, flags2=None):
        """ Send from `side`, then from the other side.

        :returns: list with the two received packets, in sending order
        """
        p1 = self.send_through(side, flags1)
        p2 = self.send_through(1 - side, flags2)
        return [p1, p2]
254
255
class L4_CONN_SIDE:
    """ Names for the two side indices, 0 and 1.

    Presumably meant as the `side` argument of the L4_Conn methods.
    """
    L4_CONN_SIDE_ZERO, L4_CONN_SIDE_ONE = 0, 1
259
260
class LoggerWrapper(object):
    """ Forwards debug()/error() to a logger, or does nothing without one.

    Lets callers log unconditionally when the logger is optional.
    """

    def __init__(self, logger=None):
        self._logger = logger

    def debug(self, *args, **kwargs):
        """ Pass the call on to the wrapped logger's debug(), if any. """
        if not self._logger:
            return
        self._logger.debug(*args, **kwargs)

    def error(self, *args, **kwargs):
        """ Pass the call on to the wrapped logger's error(), if any. """
        if not self._logger:
            return
        self._logger.error(*args, **kwargs)
272
273
def fragment_rfc791(packet, fragsize, _logger=None):
    """
    Fragment an IPv4 packet per RFC 791
    :param packet: packet to fragment
    :param fragsize: size at which to fragment
    :param _logger: optional logger receiving debug output
    :note: IP options are not supported
    :returns: list of fragments
    :raises Exception: if the packet carries IP options or if fragsize
        leaves no room for at least 8 bytes of payload per fragment
    """
    logger = LoggerWrapper(_logger)
    logger.debug(ppp("Fragmenting packet:", packet))
    # bytes() instead of str(): identical on Python 2, and on Python 3
    # only bytes() yields the wire representation
    packet = packet.__class__(bytes(packet))  # recalculate all values
    if len(packet[IP].options) > 0:
        raise Exception("Not implemented")
    if len(packet) <= fragsize:
        return [packet]

    pre_ip_len = len(packet) - len(packet[IP])
    ihl = packet[IP].ihl
    ip_header_len = ihl * 4
    raw_packet = bytes(packet)
    raw_headers = raw_packet[:(pre_ip_len + ip_header_len)]
    raw_payload = raw_packet[(pre_ip_len + ip_header_len):]

    otl = len(packet[IP])
    # number of 8-byte blocks of payload fitting into one fragment;
    # floor division keeps this an integer on Python 3 as well
    nfb = (fragsize - pre_ip_len - ip_header_len) // 8
    if nfb <= 0:
        # without this the tail below would never shrink and the
        # recursion would not terminate
        raise Exception("Cannot fragment this packet - fragsize too small "
                        "(%s, %s, %s)" % (pre_ip_len, ip_header_len,
                                          fragsize))
    fo = packet[IP].frag

    # first fragment: headers plus the first nfb blocks, more-fragments set
    head = packet.__class__(raw_headers + raw_payload[:nfb * 8])
    head[IP].flags = "MF"
    head[IP].frag = fo
    head[IP].len = ip_header_len + nfb * 8
    del head[IP].chksum

    # remainder: same headers with the rest of the payload, offset advanced
    tail = packet.__class__(raw_headers + raw_payload[nfb * 8:])
    tail[IP].len = otl - nfb * 8
    tail[IP].frag = fo + nfb
    del tail[IP].chksum

    # the remainder may still be too large, so fragment it the same way
    pkts = [head]
    pkts.extend(fragment_rfc791(tail, fragsize, _logger))
    return pkts
318
319
def fragment_rfc8200(packet, identification, fragsize, _logger=None):
    """
    Fragment an IPv6 packet per RFC 8200
    :param packet: packet to fragment
    :param identification: value stored in the id field of every
        fragment header
    :param fragsize: size at which to fragment
    :param _logger: optional logger receiving debug output
    :note: IP options are not supported
    :returns: list of fragments
    :raises Exception: if the packet is already fragmented, has no upper
        layer header, or fragsize is too small
    """
    logger = LoggerWrapper(_logger)
    # bytes() instead of str(): identical on Python 2, and on Python 3
    # only bytes() yields the wire representation
    packet = packet.__class__(bytes(packet))  # recalculate all values
    if len(packet) <= fragsize:
        return [packet]
    logger.debug(ppp("Fragmenting packet:", packet))
    pkts = []
    counter = 0
    routing_hdr = None
    hop_by_hop_hdr = None
    upper_layer = None
    seen_ipv6 = False
    ipv6_nr = -1
    # walk the layers and remember the index of each header of interest
    layer = packet.getlayer(counter)
    while layer is not None:
        if layer.__class__ is IPv6:
            if seen_ipv6:
                # ignore 2nd IPv6 header and everything below..
                break
            ipv6_nr = counter
            seen_ipv6 = True
        elif layer.__class__ is IPv6ExtHdrFragment:
            raise Exception("Already fragmented")
        elif layer.__class__ is IPv6ExtHdrRouting:
            routing_hdr = counter
        elif layer.__class__ is IPv6ExtHdrHopByHop:
            hop_by_hop_hdr = counter
        elif seen_ipv6 and not upper_layer and \
                not layer.__class__.__name__.startswith('IPv6ExtHdr'):
            upper_layer = counter
        counter = counter + 1
        layer = packet.getlayer(counter)

    logger.debug(
        "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
        (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))

    if upper_layer is None:
        raise Exception("Upper layer header not found in IPv6 packet")

    # headers up to and including this one are repeated in every fragment:
    # routing header if present, else hop-by-hop if present, else IPv6
    last_per_fragment_hdr = ipv6_nr
    if routing_hdr is None:
        if hop_by_hop_hdr is not None:
            last_per_fragment_hdr = hop_by_hop_hdr
    else:
        last_per_fragment_hdr = routing_hdr
    logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))

    per_fragment_headers = packet.copy()
    per_fragment_headers[last_per_fragment_hdr].remove_payload()
    logger.debug(ppp("Per-fragment headers:", per_fragment_headers))

    ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
    raw_payload = bytes(ext_and_upper_layer)
    logger.debug("Payload length is %s" % len(raw_payload))
    logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))

    fragment_ext_hdr = IPv6ExtHdrFragment()
    logger.debug(ppp("Fragment header:", fragment_ext_hdr))

    if len(per_fragment_headers) + len(fragment_ext_hdr) +\
            len(ext_and_upper_layer) - len(ext_and_upper_layer.payload)\
            > fragsize:
        raise Exception("Cannot fragment this packet - MTU too small "
                        "(%s, %s, %s, %s, %s)" % (
                            len(per_fragment_headers), len(fragment_ext_hdr),
                            len(ext_and_upper_layer),
                            len(ext_and_upper_layer.payload), fragsize))

    orig_nh = packet[IPv6].nh
    p = per_fragment_headers
    del p[IPv6].plen
    del p[IPv6].nh
    p = p / fragment_ext_hdr
    del p[IPv6ExtHdrFragment].nh
    # number of 8-byte blocks of payload per fragment; floor division
    # keeps it an integer on Python 3 as well
    first_payload_len_nfb = (fragsize - len(p)) // 8
    if first_payload_len_nfb <= 0:
        # every fragment has the same header length, so without room for
        # one block the loop below would never advance
        raise Exception("Cannot fragment this packet - MTU too small "
                        "(%s, %s)" % (len(p), fragsize))
    p = p / Raw(raw_payload[:first_payload_len_nfb * 8])
    del p[IPv6].plen
    p[IPv6ExtHdrFragment].nh = orig_nh
    p[IPv6ExtHdrFragment].id = identification
    p[IPv6ExtHdrFragment].offset = 0
    p[IPv6ExtHdrFragment].m = 1
    p = p.__class__(bytes(p))
    logger.debug(ppp("Fragment %s:" % len(pkts), p))
    pkts.append(p)
    offset = first_payload_len_nfb * 8
    logger.debug("Offset after first fragment: %s" % offset)
    while len(raw_payload) > offset:
        p = per_fragment_headers
        del p[IPv6].plen
        del p[IPv6].nh
        p = p / fragment_ext_hdr
        del p[IPv6ExtHdrFragment].nh
        l_nfb = (fragsize - len(p)) // 8
        p = p / Raw(raw_payload[offset:offset + l_nfb * 8])
        p[IPv6ExtHdrFragment].nh = orig_nh
        p[IPv6ExtHdrFragment].id = identification
        # the offset field counts 8-byte units and must be an integer
        p[IPv6ExtHdrFragment].offset = offset // 8
        p[IPv6ExtHdrFragment].m = 1
        p = p.__class__(bytes(p))
        logger.debug(ppp("Fragment %s:" % len(pkts), p))
        pkts.append(p)
        offset = offset + l_nfb * 8

    pkts[-1][IPv6ExtHdrFragment].m = 0  # reset more-flags in last fragment

    return pkts
434
435
def reassemble4_core(listoffragments, return_ip):
    """ Reassemble IPv4 fragments into one packet.

    Each fragment's IP payload is written at frag * 8 into a buffer. The
    first list element is then updated in place (len, flags, chksum) and
    its leading header bytes are combined with the buffer.

    :param listoffragments: fragments; the first element supplies the
        headers of the result
    :param return_ip: if true the result starts at the IP layer (first 20
        bytes kept as header), otherwise at the Ether layer (first 34
        bytes kept)
    :returns: packet rebuilt from header bytes plus reassembled payload
    """
    payload_buf = BytesIO()
    first = listoffragments[0]
    # initial position only; every fragment seeks to its own offset below
    payload_buf.seek(20)
    for fragment in listoffragments:
        ip_layer = fragment[IP]
        payload_buf.seek(ip_layer.frag*8)
        payload_buf.write(bytes(ip_layer.payload))
    payload = payload_buf.getvalue()

    first.len = len(payload) + 20
    first.flags = 0
    del first.chksum

    layer, header_len = (IP, 20) if return_ip else (Ether, 34)
    outer = first[layer]
    return outer.__class__(bytes(outer)[:header_len] + payload)
452
453
def reassemble4_ether(listoffragments):
    """ Reassemble IPv4 fragments, returning the packet from Ether up.

    :param listoffragments: fragments passed on to reassemble4_core()
    """
    return reassemble4_core(listoffragments, return_ip=False)
456
457
def reassemble4(listoffragments):
    """ Reassemble IPv4 fragments, returning the packet from IP up.

    :param listoffragments: fragments passed on to reassemble4_core()
    """
    return reassemble4_core(listoffragments, return_ip=True)