PAPI: Add MACAddress object wrapper for vl_api_mac_address_t
[vpp.git] / test / util.py
1 """ test framework utilities """
2
3 import socket
4 import sys
5 import os.path
6 from abc import abstractmethod, ABCMeta
7 from scapy.utils6 import in6_mactoifaceid
8
9 from scapy.layers.l2 import Ether
10 from scapy.packet import Raw
11 from scapy.layers.inet import IP
12 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, IPv6ExtHdrRouting,\
13     IPv6ExtHdrHopByHop
14 from scapy.utils import hexdump
15 from socket import AF_INET6
16 from io import BytesIO
17 from vpp_papi import mac_pton
18
19
def ppp(headline, packet):
    """ Return string containing the output of scapy packet.show() call.

    The text is captured by temporarily redirecting sys.stdout into an
    in-memory buffer while the packet is dumped.

    :param headline: printed as first line of output
    :param packet: packet to dump (hexdump followed by show())
    :returns: everything that was printed while stdout was redirected
    """
    o = BytesIO()
    old_stdout = sys.stdout
    sys.stdout = o
    try:
        print(headline)
        hexdump(packet)
        print("")
        packet.show()
    finally:
        # restore stdout even if dumping the packet raises, otherwise all
        # later output of the process would vanish into the buffer
        sys.stdout = old_stdout
    return o.getvalue()
31
32
def ppc(headline, capture, limit=10):
    """ Return string containing ppp() printout for a capture.

    :param headline: printed as first line of output
    :param capture: packets to print
    :param limit: limit the print to # of packets
    :returns: headline alone for an empty capture, otherwise headline,
              the per-packet dumps and an optional "limit reached" note
    """
    if not capture:
        return headline

    total = len(capture)
    if limit < total:
        tail = "\nPrint limit reached, %s out of %s packets printed" % (
            limit, total)
    else:
        tail = ""

    dumps = []
    for index, pkt in enumerate(capture):
        if index >= limit:
            # only the first `limit` packets are dumped
            break
        dumps.append(ppp("Packet #%s:" % index, pkt))

    return "%s\n%s%s" % (headline, "".join(dumps), tail)
49
50
def ip4_range(ip4, s, e):
    """ Generate IPv4 address strings sharing the leading part of ip4.

    :param ip4: dotted IPv4 address string; everything before its last
                '.' is reused as the prefix
    :param s: first value of the last octet (inclusive)
    :param e: end value of the last octet (exclusive)
    :returns: generator of "<prefix>.<i>" strings for i in range(s, e)
    """
    prefix = ip4.rsplit('.', 1)[0]
    last_octets = range(s, e)
    return ("%s.%d" % (prefix, octet) for octet in last_octets)
54
55
def ip4n_range(ip4n, s, e):
    """ Binary counterpart of ip4_range().

    :param ip4n: packed IPv4 address (as accepted by socket.inet_ntop)
    :param s: first value of the last octet (inclusive)
    :param e: end value of the last octet (exclusive)
    :returns: generator of packed IPv4 addresses
    """
    base = socket.inet_ntop(socket.AF_INET, ip4n)
    addresses = ip4_range(base, s, e)
    return (socket.inet_pton(socket.AF_INET, text) for text in addresses)
60
61
def mk_ll_addr(mac):
    """ Build an IPv6 address string by prefixing "fe80::" to the
    interface identifier that in6_mactoifaceid() derives from mac.

    :param mac: MAC address passed to in6_mactoifaceid()
    :returns: "fe80::" followed by the interface identifier
    """
    return "fe80::" + in6_mactoifaceid(mac)
66
67
def ip6_normalize(ip6):
    """ Return the canonical text form of an IPv6 address.

    The address is round-tripped through its packed representation.

    :param ip6: IPv6 address string
    :returns: the same address as formatted by socket.inet_ntop()
    """
    packed = socket.inet_pton(socket.AF_INET6, ip6)
    return socket.inet_ntop(socket.AF_INET6, packed)
71
72
def get_core_path(tempdir):
    """ Return "<tempdir>/<core pattern>" using the current kernel
    core pattern.

    :param tempdir: directory in which the core file is expected
    """
    pattern = get_core_pattern()
    return "%s/%s" % (tempdir, pattern)
75
76
def is_core_present(tempdir):
    """ Tell whether a regular file exists at get_core_path(tempdir).

    :param tempdir: directory in which the core file is expected
    :returns: True if the file exists, False otherwise
    """
    core_path = get_core_path(tempdir)
    return os.path.isfile(core_path)
79
80
def get_core_pattern():
    """ Return the contents of /proc/sys/kernel/core_pattern with
    surrounding whitespace stripped. """
    with open("/proc/sys/kernel/core_pattern", "r") as pattern_file:
        return pattern_file.read().strip()
85
86
def check_core_path(logger, core_path):
    """ Log a warning if core dumps are piped through a filter program.

    :param logger: object whose error() method receives the messages
    :param core_path: not used by this function
    """
    pattern = get_core_pattern()
    if not pattern.startswith("|"):
        # core pattern is a plain file name, nothing to warn about
        return

    messages = (
        "WARNING: redirecting the core dump through a"
        " filter may result in truncated dumps.",
        "   You may want to check the filter settings"
        " or uninstall it and edit the"
        " /proc/sys/kernel/core_pattern accordingly.",
        "   current core pattern is: %s" % pattern,
    )
    for message in messages:
        logger.error(message)
99
100
class NumericConstant(object):
    """ Base for numeric values that have an optional text description.

    Subclasses supply desc_dict, mapping a numeric value to its
    description, and implement __init__.
    """
    __metaclass__ = ABCMeta

    # value -> human readable description
    desc_dict = {}

    @abstractmethod
    def __init__(self, value):
        self._value = value

    def __int__(self):
        return self._value

    # long() conversion yields the very same stored value
    __long__ = __int__

    def __str__(self):
        # values without a description render as an empty string
        return self.desc_dict.get(self._value, "")
120
121
class Host(object):
    """ Generic test host "connected" to VPPs interface. """

    def __init__(self, mac=None, ip4=None, ip6=None, ip6_ll=None):
        self._mac = mac
        self._ip4 = ip4
        self._ip6 = ip6
        self._ip6_ll = ip6_ll

    @property
    def mac(self):
        """ MAC address - as passed to the constructor """
        return self._mac

    @property
    def bin_mac(self):
        """ MAC address - converted with mac_pton() """
        return mac_pton(self._mac)

    @property
    def ip4(self):
        """ IPv4 address - string """
        return self._ip4

    @property
    def ip4n(self):
        """ IPv4 address of remote host - raw, suitable as API parameter."""
        return socket.inet_pton(socket.AF_INET, self._ip4)

    @property
    def ip6(self):
        """ IPv6 address - string """
        return self._ip6

    @property
    def ip6n(self):
        """ IPv6 address of remote host - raw, suitable as API parameter."""
        return socket.inet_pton(socket.AF_INET6, self._ip6)

    @property
    def ip6_ll(self):
        """ IPv6 link-local address - string """
        return self._ip6_ll

    @property
    def ip6n_ll(self):
        """ IPv6 link-local address of remote host -
        raw, suitable as API parameter."""
        return socket.inet_pton(socket.AF_INET6, self._ip6_ll)

    def __eq__(self, other):
        """ Hosts are equal when all four addresses match. """
        if not isinstance(other, Host):
            return False
        return (self.mac == other.mac and
                self.ip4 == other.ip4 and
                self.ip6 == other.ip6 and
                self.ip6_ll == other.ip6_ll)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __repr__(self):
        fields = (self.mac, self.ip4, self.ip6, self.ip6_ll)
        return "Host { mac:%s ip4:%s ip6:%s ip6_ll:%s }" % fields

    def __hash__(self):
        # the textual form carries every field compared by __eq__
        return hash(self.__repr__())
192
193
class ForeignAddressFactory(object):
    """ Hands out consecutive host addresses from the 10.10.10.0/24 net.

    Each call of get_ip4() returns the next address, starting at
    10.10.10.1.
    """
    count = 0
    prefix_len = 24
    net_template = '10.10.10.{}'
    net = net_template.format(0) + '/' + str(prefix_len)
    # highest usable host number in the /24 (.255 is the broadcast address)
    last_host = 254

    def get_ip4(self):
        """ Return the next unused host address of the network.

        :returns: IPv4 address string
        :raises Exception: when all host addresses have been handed out
        """
        # check before incrementing, so that neither the broadcast address
        # nor an out-of-range octet (e.g. 10.10.10.256) is ever returned
        if self.count >= self.last_host:
            raise Exception("Network host address exhaustion")
        self.count += 1
        return self.net_template.format(self.count)
205
206
class L4_Conn():
    """ L4 'connection' tied to two VPP interfaces """

    def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
        """
        :param testcase: test case object; its pg_start() is used by send()
        :param if1: interface of side 0
        :param if2: interface of side 1
        :param af: address family, compared against AF_INET6
        :param l4proto: callable building the L4 header from keyword args
        :param port1: L4 port of side 0
        :param port2: L4 port of side 1
        """
        self.testcase = testcase
        self.ifs = [if1, if2]
        self.address_family = af
        self.l4proto = l4proto
        self.ports = [port1, port2]

    def pkt(self, side, l4args=None, payload="x"):
        """ Build a packet sent from the given side towards the other one.

        :param side: index (0 or 1) of the sending side
        :param l4args: extra keyword arguments for the L4 header; they
                       override the default sport/dport
        :param payload: payload carried in the Raw layer
        :returns: the assembled packet
        """
        other = 1 - side
        src_if = self.ifs[side]
        dst_if = self.ifs[other]
        # build only the L3 header that is actually going to be used
        if self.address_family == AF_INET6:
            layer_3 = IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
        else:
            layer_3 = IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
        merged_l4args = {'sport': self.ports[side],
                         'dport': self.ports[other]}
        if l4args is not None:
            merged_l4args.update(l4args)
        p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
             layer_3 /
             self.l4proto(**merged_l4args) /
             Raw(payload))
        return p

    def send(self, side, flags=None, payload=""):
        """ Send one packet from the given side.

        The packet is added as a stream on the sending interface, capture
        is enabled on the opposite interface and the testcase's
        pg_start() is called.

        :param side: index (0 or 1) of the sending side
        :param flags: value for the L4 'flags' field, omitted when None
        :param payload: payload carried in the Raw layer
        """
        l4args = {}
        if flags is not None:
            l4args['flags'] = flags
        self.ifs[side].add_stream(self.pkt(side,
                                           l4args=l4args, payload=payload))
        self.ifs[1 - side].enable_capture()
        self.testcase.pg_start()

    def recv(self, side):
        """ Return the result of wait_for_packet(1) on the given side. """
        return self.ifs[side].wait_for_packet(1)

    def send_through(self, side, flags=None, payload=""):
        """ Send from one side and return what the other side receives. """
        self.send(side, flags, payload)
        return self.recv(1 - side)

    def send_pingpong(self, side, flags1=None, flags2=None):
        """ Send one packet in each direction, starting from `side`.

        :returns: list of the two received packets, in sending order
        """
        p1 = self.send_through(side, flags1)
        p2 = self.send_through(1 - side, flags2)
        return [p1, p2]
260
261
class L4_CONN_SIDE:
    """ Named values for the two side indices, 0 and 1. """
    L4_CONN_SIDE_ZERO = 0
    L4_CONN_SIDE_ONE = 1
265
266
class LoggerWrapper(object):
    """ Forwards debug() and error() calls to an optional logger.

    When no logger was supplied, the calls are silently dropped.
    """

    def __init__(self, logger=None):
        self._logger = logger

    def debug(self, *args, **kwargs):
        """ Pass the call on to the wrapped logger's debug(), if any. """
        if not self._logger:
            return
        self._logger.debug(*args, **kwargs)

    def error(self, *args, **kwargs):
        """ Pass the call on to the wrapped logger's error(), if any. """
        if not self._logger:
            return
        self._logger.error(*args, **kwargs)
278
279
def fragment_rfc791(packet, fragsize, _logger=None):
    """
    Fragment an IPv4 packet per RFC 791
    :param packet: packet to fragment
    :param fragsize: size at which to fragment
    :param _logger: optional logger receiving debug output
    :note: IP options are not supported
    :returns: list of fragments
    :raises Exception: if the packet carries IP options or if fragsize
                       leaves no room for at least 8 bytes of payload
    """
    logger = LoggerWrapper(_logger)
    logger.debug(ppp("Fragmenting packet:", packet))
    # bytes() is str() on python2 and the raw packet data on python3
    packet = packet.__class__(bytes(packet))  # recalculate all values
    if len(packet[IP].options) > 0:
        raise Exception("Not implemented")
    if len(packet) <= fragsize:
        return [packet]

    pre_ip_len = len(packet) - len(packet[IP])
    ihl = packet[IP].ihl
    ip_header_len = ihl * 4
    raw_packet = bytes(packet)
    raw_headers = raw_packet[:(pre_ip_len + ip_header_len)]
    raw_payload = raw_packet[(pre_ip_len + ip_header_len):]

    pkts = []
    otl = len(packet[IP])
    # number of 8-byte blocks of payload fitting into one fragment;
    # floor division keeps this an integer on python3 as well
    nfb = (fragsize - pre_ip_len - ip_header_len) // 8
    if nfb <= 0:
        # a fragment without payload would never shrink the remainder and
        # the recursion below would not terminate
        raise Exception("Cannot fragment this packet - MTU too small "
                        "(%s, %s, %s)" % (
                            pre_ip_len, ip_header_len, fragsize))
    fo = packet[IP].frag

    # first fragment: headers plus the first nfb blocks of payload
    p = packet.__class__(raw_headers + raw_payload[:nfb * 8])
    p[IP].flags = "MF"
    p[IP].frag = fo
    p[IP].len = ip_header_len + nfb * 8
    del p[IP].chksum
    pkts.append(p)

    # remainder: fragmented further by the recursive call if still too big
    p = packet.__class__(raw_headers + raw_payload[nfb * 8:])
    p[IP].len = otl - nfb * 8
    p[IP].frag = fo + nfb
    del p[IP].chksum

    more_fragments = fragment_rfc791(p, fragsize, _logger)
    pkts.extend(more_fragments)

    return pkts
324
325
def fragment_rfc8200(packet, identification, fragsize, _logger=None):
    """
    Fragment an IPv6 packet per RFC 8200
    :param packet: packet to fragment
    :param identification: value stored in the id field of each
                           fragment header
    :param fragsize: size at which to fragment
    :param _logger: optional logger receiving debug output
    :note: IP options are not supported
    :returns: list of fragments
    :raises Exception: if the packet is already fragmented, has no upper
                       layer header or cannot be fragmented at fragsize
    """
    logger = LoggerWrapper(_logger)
    # bytes() is str() on python2 and the raw packet data on python3
    packet = packet.__class__(bytes(packet))  # recalculate all values
    if len(packet) <= fragsize:
        return [packet]
    logger.debug(ppp("Fragmenting packet:", packet))
    pkts = []
    counter = 0
    routing_hdr = None
    hop_by_hop_hdr = None
    upper_layer = None
    seen_ipv6 = False
    ipv6_nr = -1
    # walk the layers and remember the positions of the interesting ones
    layer = packet.getlayer(counter)
    while layer is not None:
        if layer.__class__ is IPv6:
            if seen_ipv6:
                # ignore 2nd IPv6 header and everything below..
                break
            ipv6_nr = counter
            seen_ipv6 = True
        elif layer.__class__ is IPv6ExtHdrFragment:
            raise Exception("Already fragmented")
        elif layer.__class__ is IPv6ExtHdrRouting:
            routing_hdr = counter
        elif layer.__class__ is IPv6ExtHdrHopByHop:
            hop_by_hop_hdr = counter
        elif seen_ipv6 and not upper_layer and \
                not layer.__class__.__name__.startswith('IPv6ExtHdr'):
            upper_layer = counter
        counter = counter + 1
        layer = packet.getlayer(counter)

    logger.debug(
        "Layers seen: IPv6(#%s), Routing(#%s), HopByHop(#%s), upper(#%s)" %
        (ipv6_nr, routing_hdr, hop_by_hop_hdr, upper_layer))

    if upper_layer is None:
        raise Exception("Upper layer header not found in IPv6 packet")

    # headers up to and including this one are repeated in every fragment;
    # routing header wins over hop-by-hop, which wins over plain IPv6
    last_per_fragment_hdr = ipv6_nr
    if routing_hdr is None:
        if hop_by_hop_hdr is not None:
            last_per_fragment_hdr = hop_by_hop_hdr
    else:
        last_per_fragment_hdr = routing_hdr
    logger.debug("Last per-fragment hdr is #%s" % (last_per_fragment_hdr))

    per_fragment_headers = packet.copy()
    per_fragment_headers[last_per_fragment_hdr].remove_payload()
    logger.debug(ppp("Per-fragment headers:", per_fragment_headers))

    ext_and_upper_layer = packet.getlayer(last_per_fragment_hdr)[1]
    raw_payload = bytes(ext_and_upper_layer)
    logger.debug("Payload length is %s" % len(raw_payload))
    logger.debug(ppp("Ext and upper layer:", ext_and_upper_layer))

    fragment_ext_hdr = IPv6ExtHdrFragment()
    logger.debug(ppp("Fragment header:", fragment_ext_hdr))

    if len(per_fragment_headers) + len(fragment_ext_hdr) +\
            len(ext_and_upper_layer) - len(ext_and_upper_layer.payload)\
            > fragsize:
        raise Exception("Cannot fragment this packet - MTU too small "
                        "(%s, %s, %s, %s, %s)" % (
                            len(per_fragment_headers), len(fragment_ext_hdr),
                            len(ext_and_upper_layer),
                            len(ext_and_upper_layer.payload), fragsize))

    orig_nh = packet[IPv6].nh
    p = per_fragment_headers
    del p[IPv6].plen
    del p[IPv6].nh
    p = p / fragment_ext_hdr
    del p[IPv6ExtHdrFragment].nh
    # payload is cut in 8-byte blocks; floor division keeps the block
    # count an integer on python3 as well
    first_payload_len_nfb = (fragsize - len(p)) // 8
    if first_payload_len_nfb <= 0:
        # the headers of the later fragments have the same length, so no
        # fragment could carry payload and the loop below would never end
        raise Exception("Cannot fragment this packet - MTU too small "
                        "(%s, %s)" % (len(p), fragsize))
    p = p / Raw(raw_payload[:first_payload_len_nfb * 8])
    del p[IPv6].plen
    p[IPv6ExtHdrFragment].nh = orig_nh
    p[IPv6ExtHdrFragment].id = identification
    p[IPv6ExtHdrFragment].offset = 0
    p[IPv6ExtHdrFragment].m = 1
    p = p.__class__(bytes(p))
    logger.debug(ppp("Fragment %s:" % len(pkts), p))
    pkts.append(p)
    offset = first_payload_len_nfb * 8
    logger.debug("Offset after first fragment: %s" % offset)
    while len(raw_payload) > offset:
        p = per_fragment_headers
        del p[IPv6].plen
        del p[IPv6].nh
        p = p / fragment_ext_hdr
        del p[IPv6ExtHdrFragment].nh
        l_nfb = (fragsize - len(p)) // 8
        p = p / Raw(raw_payload[offset:offset + l_nfb * 8])
        p[IPv6ExtHdrFragment].nh = orig_nh
        p[IPv6ExtHdrFragment].id = identification
        # the fragment offset field counts 8-byte units
        p[IPv6ExtHdrFragment].offset = offset // 8
        p[IPv6ExtHdrFragment].m = 1
        p = p.__class__(bytes(p))
        logger.debug(ppp("Fragment %s:" % len(pkts), p))
        pkts.append(p)
        offset = offset + l_nfb * 8

    pkts[-1][IPv6ExtHdrFragment].m = 0  # reset more-flags in last fragment

    return pkts
440
441
def reassemble4_core(listoffragments, return_ip):
    """ Reassemble IPv4 fragments into a single packet.

    The payload of every fragment is written at the byte position given
    by its fragment offset. The header is taken from the first list
    entry, which is modified in place (len, flags, chksum).

    :param listoffragments: fragments to reassemble
    :param return_ip: if true return the packet starting at the IP
                      layer, otherwise starting at the Ether layer
    :note: header sizes are fixed at 20 bytes (IP) and 34 bytes
           (Ether + IP)
    :returns: packet rebuilt from the header and the joined payload
    """
    payload = BytesIO()
    first = listoffragments[0]
    for fragment in listoffragments:
        # fragment offset is expressed in 8-byte units
        payload.seek(fragment[IP].frag * 8)
        payload.write(bytes(fragment[IP].payload))
    first.len = len(payload.getvalue()) + 20
    first.flags = 0
    del first.chksum
    if return_ip:
        layer, header_len = first[IP], 20
    else:
        layer, header_len = first[Ether], 34
    header = bytes(layer)[:header_len]
    return layer.__class__(header + payload.getvalue())
458
459
def reassemble4_ether(listoffragments):
    """ Reassemble IPv4 fragments, returning the packet from the Ether
    layer on (reassemble4_core() with return_ip false). """
    return reassemble4_core(listoffragments, return_ip=False)
462
463
def reassemble4(listoffragments):
    """ Reassemble IPv4 fragments, returning the packet from the IP
    layer on (reassemble4_core() with return_ip true). """
    return reassemble4_core(listoffragments, return_ip=True)