refactor test framework
[vpp.git] / test / test_l2xc.py
index f5fc874..d448a04 100644 (file)
 #!/usr/bin/env python
-## @file test_l2xc.py
-#  Module to provide L2 cross-connect test case.
-#
-#  The module provides a set of tools for L2 cross-connect tests.
-
-import logging
-logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
 
 import unittest
 import random
-from framework import VppTestCase, VppTestRunner
-from scapy.layers.l2 import Ether, Raw
+
+from scapy.packet import Raw
+from scapy.layers.l2 import Ether
 from scapy.layers.inet import IP, UDP
+from logging import *
+
+from framework import VppTestCase, VppTestRunner
+from util import TestHost
 
 
-## Subclass of the VppTestCase class.
-#
-#  This subclass is a class for L2 cross-connect test cases. It provides methods
-#  to create interfaces, configuring L2 cross-connects, creating and verifying
-#  packet streams.
 class TestL2xc(VppTestCase):
     """ L2XC Test Case """
 
     # Test variables
-    interf_nr = 4           # Number of interfaces
     hosts_nr = 10           # Number of hosts
     pkts_per_burst = 257    # Number of packets per burst
 
-    ## Class method to start the test case.
-    #  Overrides setUpClass method in VppTestCase class.
-    #  There is used try..except statement to ensure that the tear down of
-    #  the class will be executed even if any exception is raised.
-    #  @param cls The class pointer.
     @classmethod
     def setUpClass(cls):
         super(TestL2xc, cls).setUpClass()
 
-        try:
-            ## Create interfaces
-            cls.interfaces = range(TestL2xc.interf_nr)
-            cls.create_interfaces(cls.interfaces)
+    def setUp(self):
+        super(TestL2xc, self).setUp()
 
-            ## Create bi-directional cross-connects between pg0 and pg1
-            cls.api("sw_interface_set_l2_xconnect rx pg0 tx pg1 enable")
-            cls.api("sw_interface_set_l2_xconnect rx pg1 tx pg0 enable")
+        # create 4 pg interfaces
+        self.create_pg_interfaces(range(4))
 
-            ## Create bi-directional cross-connects between pg2 and pg3
-            cls.api("sw_interface_set_l2_xconnect rx pg2 tx pg3 enable")
-            cls.api("sw_interface_set_l2_xconnect rx pg3 tx pg2 enable")
+        # packet flows mapping pg0 -> pg1, pg2 -> pg3, etc.
+        self.flows = dict()
+        self.flows[self.pg0] = [self.pg1]
+        self.flows[self.pg1] = [self.pg0]
+        self.flows[self.pg2] = [self.pg3]
+        self.flows[self.pg3] = [self.pg2]
 
-            cls.cli(0, "show l2patch")
+        # packet sizes
+        self.pg_if_packet_sizes = [64, 512, 1518, 9018]
 
-            ## Create host MAC and IPv4 lists
-            cls.create_host_lists(TestL2xc.hosts_nr)
+        self.interfaces = list(self.pg_interfaces)
 
-        except Exception as e:
-            cls.tearDownClass()
-            raise e
+        # Create bi-directional cross-connects between pg0 and pg1
+        self.vapi.sw_interface_set_l2_xconnect(
+            self.pg0.sw_if_index, self.pg1.sw_if_index, enable=1)
+        self.vapi.sw_interface_set_l2_xconnect(
+            self.pg1.sw_if_index, self.pg0.sw_if_index, enable=1)
+
+        # Create bi-directional cross-connects between pg2 and pg3
+        self.vapi.sw_interface_set_l2_xconnect(
+            self.pg2.sw_if_index, self.pg3.sw_if_index, enable=1)
+        self.vapi.sw_interface_set_l2_xconnect(
+            self.pg3.sw_if_index, self.pg2.sw_if_index, enable=1)
+
+        info(self.vapi.cli("show l2patch"))
+
+        # mapping between packet-generator index and lists of test hosts
+        self.hosts_by_pg_idx = dict()
+
+        # Create host MAC and IPv4 lists
+        # self.MY_MACS = dict()
+        # self.MY_IP4S = dict()
+        self.create_host_lists(TestL2xc.hosts_nr)
+
+        # setup all interfaces
+        for i in self.interfaces:
+            i.admin_up()
 
-    ## Method to define tear down VPP actions of the test case.
-    #  Overrides tearDown method in VppTestCase class.
-    #  @param self The object pointer.
     def tearDown(self):
-        self.cli(2, "show int")
-        self.cli(2, "show trace")
-        self.cli(2, "show hardware")
-        self.cli(2, "show l2patch")
-        self.cli(2, "show error")
-        self.cli(2, "show run")
-
-    ## Class method to create required number of MAC and IPv4 addresses.
-    #  Create required number of host MAC addresses and distribute them among
-    #  interfaces. Create host IPv4 address for every host MAC address too.
-    #  @param cls The class pointer.
-    #  @param count Integer variable to store the number of MAC addresses to be
-    #  created.
-    @classmethod
-    def create_host_lists(cls, count):
-        for i in cls.interfaces:
-            cls.MY_MACS[i] = []
-            cls.MY_IP4S[i] = []
+        super(TestL2xc, self).tearDown()
+        if not self.vpp_dead:
+            info(self.vapi.cli("show l2patch"))
+
+    def create_host_lists(self, count):
+        """ Method to create required number of MAC and IPv4 addresses.
+        Create required number of host MAC addresses and distribute them among
+        interfaces. Create host IPv4 address for every host MAC address too.
+
+        :param count: Number of hosts to create MAC and IPv4 addresses for.
+        Type: int
+        """
+        for pg_if in self.pg_interfaces:
+            # self.MY_MACS[i.sw_if_index] = []
+            # self.MY_IP4S[i.sw_if_index] = []
+            self.hosts_by_pg_idx[pg_if.sw_if_index] = []
+            hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
             for j in range(0, count):
-                cls.MY_MACS[i].append("00:00:00:ff:%02x:%02x" % (i, j))
-                cls.MY_IP4S[i].append("172.17.1%02x.%u" % (i, j))
-        ## @var MY_MACS
-        #  Dictionary variable to store list of MAC addresses per interface.
-        ## @var MY_IP4S
-        #  Dictionary variable to store list of IPv4 addresses per interface.
-
-    ## Method to create packet stream for the packet generator interface.
-    #  Create input packet stream for the given packet generator interface with
-    #  packets of different length targeted for all other created packet
-    #  generator interfaces.
-    #  @param self The object pointer.
-    #  @param pg_id Integer variable to store the index of the interface to
-    #  create the input packet stream.
-    #  @return pkts List variable to store created input stream of packets.
-    def create_stream(self, pg_id):
-        # TODO: use variables to create lists based on interface number
-        pg_targets = [None] * 4
-        pg_targets[0] = [1]
-        pg_targets[1] = [0]
-        pg_targets[2] = [3]
-        pg_targets[3] = [2]
+                host = TestHost(
+                    "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
+                    "172.17.1%02x.%u" % (pg_if.sw_if_index, j))
+                hosts.append(host)
+
+    def create_stream(self, src_if, packet_sizes):
         pkts = []
         for i in range(0, TestL2xc.pkts_per_burst):
-            target_pg_id = pg_targets[pg_id][0]
-            target_host_id = random.randrange(len(self.MY_MACS[target_pg_id]))
-            source_host_id = random.randrange(len(self.MY_MACS[pg_id]))
-            pkt_info = self.create_packet_info(pg_id, target_pg_id)
+            dst_if = self.flows[src_if][0]
+            dst_host = random.choice(self.hosts_by_pg_idx[dst_if.sw_if_index])
+            src_host = random.choice(self.hosts_by_pg_idx[src_if.sw_if_index])
+            pkt_info = self.create_packet_info(
+                src_if.sw_if_index, dst_if.sw_if_index)
             payload = self.info_to_payload(pkt_info)
-            p = (Ether(dst=self.MY_MACS[target_pg_id][target_host_id],
-                       src=self.MY_MACS[pg_id][source_host_id]) /
-                 IP(src=self.MY_IP4S[pg_id][source_host_id],
-                    dst=self.MY_IP4S[target_pg_id][target_host_id]) /
+            p = (Ether(dst=dst_host.mac, src=src_host.mac) /
+                 IP(src=src_host.ip4, dst=dst_host.ip4) /
                  UDP(sport=1234, dport=1234) /
                  Raw(payload))
             pkt_info.data = p.copy()
-            packet_sizes = [64, 512, 1518, 9018]
             size = packet_sizes[(i / 2) % len(packet_sizes)]
             self.extend_packet(p, size)
             pkts.append(p)
         return pkts
-        ## @var pg_targets
-        #  List variable to store list of indexes of target packet generator
-        #  interfaces for every source packet generator interface.
-        ## @var target_pg_id
-        #  Integer variable to store the index of the random target packet
-        #  generator interfaces.
-        ## @var target_host_id
-        #  Integer variable to store the index of the randomly chosen
-        #  destination host MAC/IPv4 address.
-        ## @var source_host_id
-        #  Integer variable to store the index of the randomly chosen source
-        #  host MAC/IPv4 address.
-        ## @var pkt_info
-        #  Object variable to store the information about the generated packet.
-        ## @var payload
-        #  String variable to store the payload of the packet to be generated.
-        ## @var p
-        #  Object variable to store the generated packet.
-        ## @var packet_sizes
-        #  List variable to store required packet sizes.
-        ## @var size
-        #  List variable to store required packet sizes.
-
-    ## Method to verify packet stream received on the packet generator interface.
-    #  Verify packet-by-packet the output stream captured on a given packet
-    #  generator (pg) interface using following packet payload data - order of
-    #  packet in the stream, index of the source and destination pg interface,
-    #  src and dst host IPv4 addresses and src port and dst port values of UDP
-    #  layer.
-    #  @param self The object pointer.
-    #  @param o Integer variable to store the index of the interface to
-    #  verify the output packet stream.
-    #  @param capture List variable to store the captured output packet stream.
-    def verify_capture(self, o, capture):
-        last_info = {}
+
+    def verify_capture(self, pg_if, capture):
+        last_info = dict()
         for i in self.interfaces:
-            last_info[i] = None
+            last_info[i.sw_if_index] = None
+        dst_sw_if_index = pg_if.sw_if_index
         for packet in capture:
             try:
                 ip = packet[IP]
                 udp = packet[UDP]
                 payload_info = self.payload_to_info(str(packet[Raw]))
-                self.assertEqual(payload_info.dst, o)
-                self.log("Got packet on port %u: src=%u (id=%u)"
-                         % (o, payload_info.src, payload_info.index), 2)
+                packet_index = payload_info.index
+                self.assertEqual(payload_info.dst, dst_sw_if_index)
+                debug("Got packet on port %s: src=%u (id=%u)" %
+                      (pg_if.name, payload_info.src, packet_index))
                 next_info = self.get_next_packet_info_for_interface2(
-                    payload_info.src, payload_info.dst,
+                    payload_info.src, dst_sw_if_index,
                     last_info[payload_info.src])
                 last_info[payload_info.src] = next_info
                 self.assertTrue(next_info is not None)
-                self.assertEqual(payload_info.index, next_info.index)
+                self.assertEqual(packet_index, next_info.index)
+                saved_packet = next_info.data
                 # Check standard fields
-                self.assertEqual(ip.src, next_info.data[IP].src)
-                self.assertEqual(ip.dst, next_info.data[IP].dst)
-                self.assertEqual(udp.sport, next_info.data[UDP].sport)
-                self.assertEqual(udp.dport, next_info.data[UDP].dport)
+                self.assertEqual(ip.src, saved_packet[IP].src)
+                self.assertEqual(ip.dst, saved_packet[IP].dst)
+                self.assertEqual(udp.sport, saved_packet[UDP].sport)
+                self.assertEqual(udp.dport, saved_packet[UDP].dport)
             except:
-                self.log("Unexpected or invalid packet:")
+                error("Unexpected or invalid packet:")
                 packet.show()
                 raise
         for i in self.interfaces:
             remaining_packet = self.get_next_packet_info_for_interface2(
-                i, o, last_info[i])
+                i, dst_sw_if_index, last_info[i.sw_if_index])
             self.assertTrue(remaining_packet is None,
                             "Port %u: Packet expected from source %u didn't"
-                            " arrive" % (o, i))
-        ## @var last_info
-        #  Dictionary variable to store verified packets per packet generator
-        #  interface.
-        ## @var ip
-        #  Object variable to store the IP layer of the packet.
-        ## @var udp
-        #  Object variable to store the UDP layer of the packet.
-        ## @var payload_info
-        #  Object variable to store required information about the packet.
-        ## @var next_info
-        #  Object variable to store information about next packet.
-        ## @var remaining_packet
-        #  Object variable to store information about remaining packet.
-
-    ## Method defining L2 cross-connect test case.
-    #  Contains steps of the test case.
-    #  @param self The object pointer.
+                            " arrive" % (dst_sw_if_index, i.sw_if_index))
+
     def test_l2xc(self):
         """ L2XC test
 
@@ -217,27 +159,21 @@ class TestL2xc(VppTestCase):
             burst of packets per interface
         """
 
-        ## Create incoming packet streams for packet-generator interfaces
+        # Create incoming packet streams for packet-generator interfaces
         for i in self.interfaces:
-            pkts = self.create_stream(i)
-            self.pg_add_stream(i, pkts)
+            pkts = self.create_stream(i, self.pg_if_packet_sizes)
+            i.add_stream(pkts)
 
-        ## Enable packet capturing and start packet sending
-        self.pg_enable_capture(self.interfaces)
+        # Enable packet capturing and start packet sending
+        self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
-        ## Verify outgoing packet streams per packet-generator interface
-        for i in self.interfaces:
-            out = self.pg_get_capture(i)
-            self.log("Verifying capture %u" % i)
-            self.verify_capture(i, out)
-        ## @var pkts
-        #  List variable to store created input stream of packets for the packet
-        #  generator interface.
-        ## @var out
-        #  List variable to store captured output stream of packets for
-        #  the packet generator interface.
+        # Verify outgoing packet streams per packet-generator interface
+        for i in self.pg_interfaces:
+            capture = i.get_capture()
+            info("Verifying capture on interface %s" % i.name)
+            self.verify_capture(i, capture)
 
 
 if __name__ == '__main__':
-    unittest.main(testRunner = VppTestRunner)
+    unittest.main(testRunner=VppTestRunner)