tcp: add builtin server/client transfer test
[vpp.git] / test / util.py
index d6aa9a4..3e0267a 100644 (file)
@@ -6,6 +6,13 @@ from abc import abstractmethod, ABCMeta
 from cStringIO import StringIO
 from scapy.layers.inet6 import in6_mactoifaceid
 
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+from scapy.layers.inet import IP, UDP, TCP
+from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
+from scapy.packet import Packet
+from socket import inet_pton, AF_INET, AF_INET6
+
 
 def ppp(headline, packet):
     """ Return string containing the output of scapy packet.show() call. """
@@ -163,3 +170,62 @@ class ForeignAddressFactory(object):
             raise Exception("Network host address exhaustion")
         self.count += 1
         return self.net_template.format(self.count)
+
+
+class L4_Conn():
+    """ L4 'connection' tied to two VPP interfaces """
+    def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
+        self.testcase = testcase
+        self.ifs = [None, None]
+        self.ifs[0] = if1
+        self.ifs[1] = if2
+        self.address_family = af
+        self.l4proto = l4proto
+        self.ports = [None, None]
+        self.ports[0] = port1
+        self.ports[1] = port2
+        self
+
+    def pkt(self, side, l4args={}, payload="x"):
+        is_ip6 = 1 if self.address_family == AF_INET6 else 0
+        s0 = side
+        s1 = 1-side
+        src_if = self.ifs[s0]
+        dst_if = self.ifs[s1]
+        layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
+                   IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
+        merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
+        merged_l4args.update(l4args)
+        p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
+             layer_3[is_ip6] /
+             self.l4proto(**merged_l4args) /
+             Raw(payload))
+        return p
+
+    def send(self, side, flags=None, payload=""):
+        l4args = {}
+        if flags is not None:
+            l4args['flags'] = flags
+        self.ifs[side].add_stream(self.pkt(side,
+                                           l4args=l4args, payload=payload))
+        self.ifs[1-side].enable_capture()
+        self.testcase.pg_start()
+
+    def recv(self, side):
+        p = self.ifs[side].wait_for_packet(1)
+        return p
+
+    def send_through(self, side, flags=None, payload=""):
+        self.send(side, flags, payload)
+        p = self.recv(1-side)
+        return p
+
+    def send_pingpong(self, side, flags1=None, flags2=None):
+        p1 = self.send_through(side, flags1)
+        p2 = self.send_through(1-side, flags2)
+        return [p1, p2]
+
+
+class L4_CONN_SIDE:
+    L4_CONN_SIDE_ZERO = 0
+    L4_CONN_SIDE_ONE = 1