VPP-1508 Python3 abstract classes
[vpp.git] / test / util.py
1 """ test framework utilities """
2
3 import abc
4 import socket
5 import six
6 import sys
7 import os.path
8 from scapy.utils6 import in6_mactoifaceid
9
10 from scapy.layers.l2 import Ether
11 from scapy.packet import Raw
12 from scapy.layers.inet import IP
13 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
14     IPv6ExtHdrHopByHop
15 from scapy.utils import hexdump
16 from socket import AF_INET6
17 from io import BytesIO
18 from vpp_papi import mac_pton
19
20
21 def ppp(headline, packet):
22     """ Return string containing the output of scapy packet.show() call. """
23     return '%s\n%s\n\n%s\n' % (headline,
24                                hexdump(packet, dump=True),
25                                packet.show(dump=True))
26
27
28 def ppc(headline, capture, limit=10):
29     """ Return string containing ppp() printout for a capture.
30
31     :param headline: printed as first line of output
32     :param capture: packets to print
33     :param limit: limit the print to # of packets
34     """
35     if not capture:
36         return headline
37     tail = ""
38     if limit < len(capture):
39         tail = "\nPrint limit reached, %s out of %s packets printed" % (
40             limit, len(capture))
41     body = "".join([ppp("Packet #%s:" % count, p)
42                     for count, p in zip(range(0, limit), capture)])
43     return "%s\n%s%s" % (headline, body, tail)
44
45
46 def ip4_range(ip4, s, e):
47     tmp = ip4.rsplit('.', 1)[0]
48     return ("%s.%d" % (tmp, i) for i in range(s, e))
49
50
51 def ip4n_range(ip4n, s, e):
52     ip4 = socket.inet_ntop(socket.AF_INET, ip4n)
53     return (socket.inet_pton(socket.AF_INET, ip)
54             for ip in ip4_range(ip4, s, e))
55
56
57 def mk_ll_addr(mac):
58     euid = in6_mactoifaceid(mac)
59     addr = "fe80::" + euid
60     return addr
61
62
63 def ip6_normalize(ip6):
64     return socket.inet_ntop(socket.AF_INET6,
65                             socket.inet_pton(socket.AF_INET6, ip6))
66
67
68 def get_core_path(tempdir):
69     return "%s/%s" % (tempdir, get_core_pattern())
70
71
72 def is_core_present(tempdir):
73     return os.path.isfile(get_core_path(tempdir))
74
75
76 def get_core_pattern():
77     with open("/proc/sys/kernel/core_pattern", "r") as f:
78         corefmt = f.read().strip()
79     return corefmt
80
81
82 def check_core_path(logger, core_path):
83     corefmt = get_core_pattern()
84     if corefmt.startswith("|"):
85         logger.error(
86             "WARNING: redirecting the core dump through a"
87             " filter may result in truncated dumps.")
88         logger.error(
89             "   You may want to check the filter settings"
90             " or uninstall it and edit the"
91             " /proc/sys/kernel/core_pattern accordingly.")
92         logger.error(
93             "   current core pattern is: %s" % corefmt)
94
95
96 class NumericConstant(object):
97
98     desc_dict = {}
99
100     def __init__(self, value):
101         self._value = value
102
103     def __int__(self):
104         return self._value
105
106     def __long__(self):
107         return self._value
108
109     def __str__(self):
110         if self._value in self.desc_dict:
111             return self.desc_dict[self._value]
112         return ""
113
114
115 class Host(object):
116     """ Generic test host "connected" to VPPs interface. """
117
118     @property
119     def mac(self):
120         """ MAC address """
121         return self._mac
122
123     @property
124     def bin_mac(self):
125         """ MAC address """
126         return mac_pton(self._mac)
127
128     @property
129     def ip4(self):
130         """ IPv4 address - string """
131         return self._ip4
132
133     @property
134     def ip4n(self):
135         """ IPv4 address of remote host - raw, suitable as API parameter."""
136         return socket.inet_pton(socket.AF_INET, self._ip4)
137
138     @property
139     def ip6(self):
140         """ IPv6 address - string """
141         return self._ip6
142
143     @property
144     def ip6n(self):
145         """ IPv6 address of remote host - raw, suitable as API parameter."""
146         return socket.inet_pton(socket.AF_INET6, self._ip6)
147
148     @property
149     def ip6_ll(self):
150         """ IPv6 link-local address - string """
151         return self._ip6_ll
152
153     @property
154     def ip6n_ll(self):
155         """ IPv6 link-local address of remote host -
156         raw, suitable as API parameter."""
157         return socket.inet_pton(socket.AF_INET6, self._ip6_ll)
158
159     def __eq__(self, other):
160         if isinstance(other, Host):
161             return (self.mac == other.mac and
162                     self.ip4 == other.ip4 and
163                     self.ip6 == other.ip6 and
164                     self.ip6_ll == other.ip6_ll)
165         else:
166             return False
167
168     def __ne__(self, other):
169         return not self.__eq__(other)
170
171     def __repr__(self):
172         return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % (self.mac,
173                                                             self.ip4,
174                                                             self.ip6,
175                                                             self.ip6_ll)
176
177     def __hash__(self):
178         return hash(self.__repr__())
179
180     def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
181         self._mac = mac
182         self._ip4 = ip4
183         self._ip6 = ip6
184         self._ip6_ll = ip6_ll
185
186
187 class ForeignAddressFactory(object):
188     count = 0
189     prefix_len = 24
190     net_template = '10.10.10.{}'
191     net = net_template.format(0) + '/' + str(prefix_len)
192
193     def get_ip4(self):
194         if self.count > 255:
195             raise Exception("Network host address exhaustion")
196         self.count += 1
197         return self.net_template.format(self.count)
198
199
200 class L4_Conn():
201     """ L4 'connection' tied to two VPP interfaces """
202
203     def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
204         self.testcase = testcase
205         self.ifs = [None, None]
206         self.ifs[0] = if1
207         self.ifs[1] = if2
208         self.address_family = af
209         self.l4proto = l4proto
210         self.ports = [None, None]
211         self.ports[0] = port1
212         self.ports[1] = port2
213         self
214
215     def pkt(self, side, l4args={}, payload="x"):
216         is_ip6 = 1 if self.address_family == AF_INET6 else 0
217         s0 = side
218         s1 = 1 - side
219         src_if = self.ifs[s0]
220         dst_if = self.ifs[s1]
221         layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
222                    IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
223         merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
224         merged_l4args.update(l4args)
225         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
226              layer_3[is_ip6] /
227              self.l4proto(**merged_l4args) /
228              Raw(payload))
229         return p
230
231     def send(self, side, flags=None, payload=""):
232         l4args = {}
233         if flags is not None:
234             l4args['flags'] = flags
235         self.ifs[side].add_stream(self.pkt(side,
236                                            l4args=l4args, payload=payload))
237         self.ifs[1 - side].enable_capture()
238         self.testcase.pg_start()
239
240     def recv(self, side):
241         p = self.ifs[side].wait_for_packet(1)
242         return p
243
244     def send_through(self, side, flags=None, payload=""):
245         self.send(side, flags, payload)
246         p = self.recv(1 - side)
247         return p
248
249     def send_pingpong(self, side, flags1=None, flags2=None):
250         p1 = self.send_through(side, flags1)
251         p2 = self.send_through(1 - side, flags2)
252         return [p1, p2]
253
254
255 class L4_CONN_SIDE:
256     L4_CONN_SIDE_ZERO = 0
257     L4_CONN_SIDE_ONE = 1
258
259
260 class LoggerWrapper(object):
261     def __init__(self, logger=None):
262         self._logger = logger
263
264     def debug(self, *args, **kwargs):
265         if self._logger:
266             self._logger.debug(*args, **kwargs)
267
268     def error(self, *args, **kwargs):
269         if self._logger:
270             self._logger.error(*args, **kwargs)
271
272
273 def fragment_rfc791(packet, fragsize, _logger=None):
274     """
275     Fragment an IPv4 packet per RFC 791
276     :param packet: packet to fragment
277     :param fragsize: size at which to fragment
278     :note: IP options are not supported
279     :returns: list of fragments
280     """
281     logger = LoggerWrapper(_logger)
282     logger.debug(ppp("Fragmenting packet:", packet))
283     packet = packet.__class__(str(packet))  # recalculate all values
284     if len(packet[IP].options) > 0:
285         raise Exception("Not implemented")
286     if len(packet) <= fragsize:
287         return [packet]
288
289     pre_ip_len = len(packet) - len(packet[IP])
290     ip_header_len = packet[IP].ihl * 4
291     hex_packet = str(packet)
292     hex_headers = hex_packet[:(pre_ip_len + ip_header_len)]
293     hex_payload = hex_packet[(pre_ip_len + ip_header_len):]
294
295     pkts = []
296     ihl = packet[IP].ihl
297     otl = len(packet[IP])
298     nfb = (fragsize - pre_ip_len - ihl * 4) / 8
299     fo = packet[IP].frag
300
301     p = packet.__class__(hex_headers + hex_payload[:nfb * 8])
302     p[IP].flags = "MF"
303     p[IP].frag = fo
304     p[IP].len = ihl * 4 + nfb * 8
305     del p[IP].chksum
306     pkts.append(p)
307
308     p = packet.__class__(hex_headers + hex_payload[nfb * 8:])
309     p[IP].len = otl - nfb * 8
310     p[IP].frag = fo + nfb
311     del p[IP].chksum
312
313     more_fragments = fragment_rfc791(p, fragsize, _logger)
314     pkts.extend(more_fragments)
315
316     return pkts
317
318
319 def fragment_rfc8200(packet, identification, fragsize, _logger=None):
320     """
321     Fragment an IPv6 packet per RFC 8200
322     :param packet: packet to fragment
323     :param fragsize: size at which to fragment
324     :note: IP options are not supported
325     :returns: list of fragments
326     """
327     logger = LoggerWrapper(_logger)
328     packet = packet.__class__(str(packet))  # recalculate all values
329     if len(packet) <= fragsize:
330         return [packet]
331     logger.debug(ppp("Fragmenting packet:", packet))
332     pkts = []
333     counter = 0
334     routing_hdr = None
335     hop_by_hop_hdr = None
336     upper_layer = None
337     seen_ipv6 = False
338     ipv6_nr = -1
339     l = packet.getlayer(counter)
340     while l is not None:
341         if l.__class__ is IPv6:
342             if seen_ipv6:
343                 # ignore 2nd IPv6 header and everything below..
344                 break
345             ipv6_nr = counter
346             seen_ipv6 = True
347         elif l.__class__ is IPv6ExtHdrFragment:
348             raise Exception("Already fragmented")
349         elif l.__class__ is IPv6ExtHdrRouting:
350             routing_hdr = counter
351         elif l.__class__ is IPv6ExtHdrHopByHop:
352             hop_by_hop_hdr = counter
353         elif seen_ipv6 and not upper_layer and \
354                 not l.__class__.__name__.startswith('IPv6ExtHdr'):
355             upper_layer = counter
356         counter = counter + 1
357         l = packet.getlayer(counter)
358
359     logger.debug(
360         "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
361         (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))
362
363     if upper_layer is None:
364         raise Exception("Upper layer header not found in IPv6 packet")
365
366     last_per_fragment_hdr = ipv6_nr
367     if routing_hdr is None:
368         if hop_by_hop_hdr is not None:
369             last_per_fragment_hdr = hop_by_hop_hdr
370     else:
371         last_per_fragment_hdr = routing_hdr
372     logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))
373
374     per_fragment_headers = packet.copy()
375     per_fragment_headers[last_per_fragment_hdr].remove_payload()
376     logger.debug(ppp("Per-fragment headers:", per_fragment_headers))
377
378     ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
379     hex_payload = str(ext_and_upper_layer)
380     logger.debug("Payload length is %s" % len(hex_payload))
381     logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))
382
383     fragment_ext_hdr = IPv6ExtHdrFragment()
384     logger.debug(ppp("Fragment header:", fragment_ext_hdr))
385
386     if len(per_fragment_headers) + len(fragment_ext_hdr) +\
387             len(ext_and_upper_layer) - len(ext_and_upper_layer.payload)\
388             > fragsize:
389         raise Exception("Cannot fragment this packet - MTU too small "
390                         "(%s, %s, %s, %s, %s)" % (
391                             len(per_fragment_headers), len(fragment_ext_hdr),
392                             len(ext_and_upper_layer),
393                             len(ext_and_upper_layer.payload), fragsize))
394
395     orig_nh = packet[IPv6].nh
396     p = per_fragment_headers
397     del p[IPv6].plen
398     del p[IPv6].nh
399     p = p / fragment_ext_hdr
400     del p[IPv6ExtHdrFragment].nh
401     first_payload_len_nfb = (fragsize - len(p)) / 8
402     p = p / Raw(hex_payload[:first_payload_len_nfb * 8])
403     del p[IPv6].plen
404     p[IPv6ExtHdrFragment].nh = orig_nh
405     p[IPv6ExtHdrFragment].id = identification
406     p[IPv6ExtHdrFragment].offset = 0
407     p[IPv6ExtHdrFragment].m = 1
408     p = p.__class__(str(p))
409     logger.debug(ppp("Fragment %s:" % len(pkts), p))
410     pkts.append(p)
411     offset = first_payload_len_nfb * 8
412     logger.debug("Offset after first fragment: %s" % offset)
413     while len(hex_payload) > offset:
414         p = per_fragment_headers
415         del p[IPv6].plen
416         del p[IPv6].nh
417         p = p / fragment_ext_hdr
418         del p[IPv6ExtHdrFragment].nh
419         l_nfb = (fragsize - len(p)) / 8
420         p = p / Raw(hex_payload[offset:offset + l_nfb * 8])
421         p[IPv6ExtHdrFragment].nh = orig_nh
422         p[IPv6ExtHdrFragment].id = identification
423         p[IPv6ExtHdrFragment].offset = offset / 8
424         p[IPv6ExtHdrFragment].m = 1
425         p = p.__class__(str(p))
426         logger.debug(ppp("Fragment %s:" % len(pkts), p))
427         pkts.append(p)
428         offset = offset + l_nfb * 8
429
430     pkts[-1][IPv6ExtHdrFragment].m = 0  # reset more-flags in last fragment
431
432     return pkts
433
434
435 def reassemble4_core(listoffragments, return_ip):
436     buffer = BytesIO()
437     first = listoffragments[0]
438     buffer.seek(20)
439     for pkt in listoffragments:
440         buffer.seek(pkt[IP].frag*8)
441         buffer.write(bytes(pkt[IP].payload))
442     first.len = len(buffer.getvalue()) + 20
443     first.flags = 0
444     del(first.chksum)
445     if return_ip:
446         header = bytes(first[IP])[:20]
447         return first[IP].__class__(header + buffer.getvalue())
448     else:
449         header = bytes(first[Ether])[:34]
450         return first[Ether].__class__(header + buffer.getvalue())
451
452
453 def reassemble4_ether(listoffragments):
454     return reassemble4_core(listoffragments, False)
455
456
457 def reassemble4(listoffragments):
458     return reassemble4_core(listoffragments, True)