test: factor out "L4_Conn" into a class within util.py (VPP-931) 37/8337/2
authorAndrew Yourtchenko <ayourtch@gmail.com>
Thu, 7 Sep 2017 11:22:24 +0000 (13:22 +0200)
committerDamjan Marion <dmarion.lists@gmail.com>
Thu, 7 Sep 2017 13:38:56 +0000 (13:38 +0000)
It seems a useful abstraction for the purposes of writing
fine-grained tests, to be able to create a "connection" object
which would be bound to two VPP interfaces, and hold some
information about the state, allowing to send the packets
back and forth with minimal amount of arguments.

Change-Id: Idb83b6b82b38bded5b7e1756a41bb2df4cd58e3a
Signed-off-by: Andrew Yourtchenko <ayourtch@gmail.com>
test/test_acl_plugin_conns.py
test/util.py

index 06f3cf7..0d4aa09 100644 (file)
@@ -13,6 +13,7 @@ from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
 from scapy.layers.inet6 import IPv6ExtHdrFragment
 from pprint import pprint
 from random import randint
+from util import L4_Conn
 
 
 def to_acl_rule(self, is_permit, wildcard_sport=False):
@@ -68,37 +69,7 @@ class IterateWithSleep():
             self.testcase.sleep(self.sleep_sec)
 
 
-class Conn():
-    def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
-        self.testcase = testcase
-        self.ifs = [None, None]
-        self.ifs[0] = if1
-        self.ifs[1] = if2
-        self.address_family = af
-        self.l4proto = l4proto
-        self.ports = [None, None]
-        self.ports[0] = port1
-        self.ports[1] = port2
-        self
-
-    def pkt(self, side, flags=None):
-        is_ip6 = 1 if self.address_family == AF_INET6 else 0
-        s0 = side
-        s1 = 1-side
-        src_if = self.ifs[s0]
-        dst_if = self.ifs[s1]
-        layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
-                   IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
-        payload = "x"
-        l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
-        if flags is not None:
-            l4args['flags'] = flags
-        p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
-             layer_3[is_ip6] /
-             self.l4proto(**l4args) /
-             Raw(payload))
-        return p
-
+class Conn(L4_Conn):
     def apply_acls(self, reflect_side, acl_side):
         pkts = []
         pkts.append(self.pkt(0))
@@ -152,25 +123,6 @@ class Conn():
              }
         return new_rule
 
-    def send(self, side, flags=None):
-        self.ifs[side].add_stream(self.pkt(side, flags))
-        self.ifs[1-side].enable_capture()
-        self.testcase.pg_start()
-
-    def recv(self, side):
-        p = self.ifs[side].wait_for_packet(1)
-        return p
-
-    def send_through(self, side, flags=None):
-        self.send(side, flags)
-        p = self.recv(1-side)
-        return p
-
-    def send_pingpong(self, side, flags1=None, flags2=None):
-        p1 = self.send_through(side, flags1)
-        p2 = self.send_through(1-side, flags2)
-        return [p1, p2]
-
 
 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
 class ACLPluginConnTestCase(VppTestCase):
index d6aa9a4..3e0267a 100644 (file)
@@ -6,6 +6,13 @@ from abc import abstractmethod, ABCMeta
 from cStringIO import StringIO
 from scapy.layers.inet6 import in6_mactoifaceid
 
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+from scapy.layers.inet import IP, UDP, TCP
+from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
+from scapy.packet import Packet
+from socket import inet_pton, AF_INET, AF_INET6
+
 
 def ppp(headline, packet):
     """ Return string containing the output of scapy packet.show() call. """
@@ -163,3 +170,62 @@ class ForeignAddressFactory(object):
             raise Exception("Network host address exhaustion")
         self.count += 1
         return self.net_template.format(self.count)
+
+
+class L4_Conn():
+    """ L4 'connection' tied to two VPP interfaces """
+    def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
+        self.testcase = testcase
+        self.ifs = [None, None]
+        self.ifs[0] = if1
+        self.ifs[1] = if2
+        self.address_family = af
+        self.l4proto = l4proto
+        self.ports = [None, None]
+        self.ports[0] = port1
+        self.ports[1] = port2
+        self
+
+    def pkt(self, side, l4args={}, payload="x"):
+        is_ip6 = 1 if self.address_family == AF_INET6 else 0
+        s0 = side
+        s1 = 1-side
+        src_if = self.ifs[s0]
+        dst_if = self.ifs[s1]
+        layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
+                   IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
+        merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
+        merged_l4args.update(l4args)
+        p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
+             layer_3[is_ip6] /
+             self.l4proto(**merged_l4args) /
+             Raw(payload))
+        return p
+
+    def send(self, side, flags=None, payload=""):
+        l4args = {}
+        if flags is not None:
+            l4args['flags'] = flags
+        self.ifs[side].add_stream(self.pkt(side,
+                                           l4args=l4args, payload=payload))
+        self.ifs[1-side].enable_capture()
+        self.testcase.pg_start()
+
+    def recv(self, side):
+        p = self.ifs[side].wait_for_packet(1)
+        return p
+
+    def send_through(self, side, flags=None, payload=""):
+        self.send(side, flags, payload)
+        p = self.recv(1-side)
+        return p
+
+    def send_pingpong(self, side, flags1=None, flags2=None):
+        p1 = self.send_through(side, flags1)
+        p2 = self.send_through(1-side, flags2)
+        return [p1, p2]
+
+
+class L4_CONN_SIDE:
+    L4_CONN_SIDE_ZERO = 0
+    L4_CONN_SIDE_ONE = 1