From: Andrew Yourtchenko Date: Thu, 7 Sep 2017 11:22:24 +0000 (+0200) Subject: test: factor out "L4_Conn" into a class within util.py (VPP-931) X-Git-Tag: v17.10-rc1~120 X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;ds=sidebyside;h=92dc12a01b1f5c75500bb015bd159d6bba0547b5;p=vpp.git test: factor out "L4_Conn" into a class within util.py (VPP-931) It seems a useful abstraction for the purposes of writing fine-grained tests, to be able to create a "connection" object which would be bound to two VPP interfaces, and hold some information about the state, allowing to send the packets back and forth with minimal amount of arguments. Change-Id: Idb83b6b82b38bded5b7e1756a41bb2df4cd58e3a Signed-off-by: Andrew Yourtchenko --- diff --git a/test/test_acl_plugin_conns.py b/test/test_acl_plugin_conns.py index 06f3cf7e178..0d4aa09d5eb 100644 --- a/test/test_acl_plugin_conns.py +++ b/test/test_acl_plugin_conns.py @@ -13,6 +13,7 @@ from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting from scapy.layers.inet6 import IPv6ExtHdrFragment from pprint import pprint from random import randint +from util import L4_Conn def to_acl_rule(self, is_permit, wildcard_sport=False): @@ -68,37 +69,7 @@ class IterateWithSleep(): self.testcase.sleep(self.sleep_sec) -class Conn(): - def __init__(self, testcase, if1, if2, af, l4proto, port1, port2): - self.testcase = testcase - self.ifs = [None, None] - self.ifs[0] = if1 - self.ifs[1] = if2 - self.address_family = af - self.l4proto = l4proto - self.ports = [None, None] - self.ports[0] = port1 - self.ports[1] = port2 - self - - def pkt(self, side, flags=None): - is_ip6 = 1 if self.address_family == AF_INET6 else 0 - s0 = side - s1 = 1-side - src_if = self.ifs[s0] - dst_if = self.ifs[s1] - layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4), - IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)] - payload = "x" - l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]} - if flags is not None: - l4args['flags'] = flags - p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / - layer_3[is_ip6] / - self.l4proto(**l4args) / - Raw(payload)) - return p - +class Conn(L4_Conn): def apply_acls(self, reflect_side, acl_side): pkts = [] pkts.append(self.pkt(0)) @@ -152,25 +123,6 @@ class Conn(): } return new_rule - def send(self, side, flags=None): - self.ifs[side].add_stream(self.pkt(side, flags)) - self.ifs[1-side].enable_capture() - self.testcase.pg_start() - - def recv(self, side): - p = self.ifs[side].wait_for_packet(1) - return p - - def send_through(self, side, flags=None): - self.send(side, flags) - p = self.recv(1-side) - return p - - def send_pingpong(self, side, flags1=None, flags2=None): - p1 = self.send_through(side, flags1) - p2 = self.send_through(1-side, flags2) - return [p1, p2] - @unittest.skipUnless(running_extended_tests(), "part of extended tests") class ACLPluginConnTestCase(VppTestCase): diff --git a/test/util.py b/test/util.py index d6aa9a42677..3e0267a3f4d 100644 --- a/test/util.py +++ b/test/util.py @@ -6,6 +6,13 @@ from abc import abstractmethod, ABCMeta from cStringIO import StringIO from scapy.layers.inet6 import in6_mactoifaceid +from scapy.layers.l2 import Ether +from scapy.packet import Raw +from scapy.layers.inet import IP, UDP, TCP +from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest +from scapy.packet import Packet +from socket import inet_pton, AF_INET, AF_INET6 + def ppp(headline, packet): """ Return string containing the output of scapy packet.show() call. """ @@ -163,3 +170,62 @@ class ForeignAddressFactory(object): raise Exception("Network host address exhaustion") self.count += 1 return self.net_template.format(self.count) + + +class L4_Conn(): + """ L4 'connection' tied to two VPP interfaces """ + def __init__(self, testcase, if1, if2, af, l4proto, port1, port2): + self.testcase = testcase + self.ifs = [None, None] + self.ifs[0] = if1 + self.ifs[1] = if2 + self.address_family = af + self.l4proto = l4proto + self.ports = [None, None] + self.ports[0] = port1 + self.ports[1] = port2 + self + + def pkt(self, side, l4args={}, payload="x"): + is_ip6 = 1 if self.address_family == AF_INET6 else 0 + s0 = side + s1 = 1-side + src_if = self.ifs[s0] + dst_if = self.ifs[s1] + layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4), + IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)] + merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]} + merged_l4args.update(l4args) + p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / + layer_3[is_ip6] / + self.l4proto(**merged_l4args) / + Raw(payload)) + return p + + def send(self, side, flags=None, payload=""): + l4args = {} + if flags is not None: + l4args['flags'] = flags + self.ifs[side].add_stream(self.pkt(side, + l4args=l4args, payload=payload)) + self.ifs[1-side].enable_capture() + self.testcase.pg_start() + + def recv(self, side): + p = self.ifs[side].wait_for_packet(1) + return p + + def send_through(self, side, flags=None, payload=""): + self.send(side, flags, payload) + p = self.recv(1-side) + return p + + def send_pingpong(self, side, flags1=None, flags2=None): + p1 = self.send_through(side, flags1) + p2 = self.send_through(1-side, flags2) + return [p1, p2] + + +class L4_CONN_SIDE: + L4_CONN_SIDE_ZERO = 0 + L4_CONN_SIDE_ONE = 1