refactor test framework
[vpp.git] / test / test_l2xc.py
1 #!/usr/bin/env python
2
3 import unittest
4 import random
5
6 from scapy.packet import Raw
7 from scapy.layers.l2 import Ether
8 from scapy.layers.inet import IP, UDP
9 from logging import *
10
11 from framework import VppTestCase, VppTestRunner
12 from util import TestHost
13
14
class TestL2xc(VppTestCase):
    """ L2XC Test Case

    Cross-connects pg0 <-> pg1 and pg2 <-> pg3 at L2, sends a burst of
    Ether/IP/UDP packets into every packet-generator interface and checks
    that each interface captures exactly the packets addressed to it.
    """

    # Test variables
    hosts_nr = 10           # Number of hosts
    pkts_per_burst = 257    # Number of packets per burst

    @classmethod
    def setUpClass(cls):
        """ Delegate class-level setup to the framework base class. """
        super(TestL2xc, cls).setUpClass()

    def setUp(self):
        """ Create the pg interfaces, cross-connect them and create hosts.

        Populates ``self.flows``, ``self.pg_if_packet_sizes``,
        ``self.interfaces`` and ``self.hosts_by_pg_idx``, which the other
        methods of this test case read.
        """
        super(TestL2xc, self).setUp()

        # create 4 pg interfaces
        self.create_pg_interfaces(range(4))

        # packet flows mapping: each interface sends to its cross-connect
        # peer (pg0 <-> pg1, pg2 <-> pg3)
        self.flows = {
            self.pg0: [self.pg1],
            self.pg1: [self.pg0],
            self.pg2: [self.pg3],
            self.pg3: [self.pg2],
        }

        # packet sizes
        self.pg_if_packet_sizes = [64, 512, 1518, 9018]

        self.interfaces = list(self.pg_interfaces)

        # Create bi-directional cross-connects between pg0 and pg1
        self.vapi.sw_interface_set_l2_xconnect(
            self.pg0.sw_if_index, self.pg1.sw_if_index, enable=1)
        self.vapi.sw_interface_set_l2_xconnect(
            self.pg1.sw_if_index, self.pg0.sw_if_index, enable=1)

        # Create bi-directional cross-connects between pg2 and pg3
        self.vapi.sw_interface_set_l2_xconnect(
            self.pg2.sw_if_index, self.pg3.sw_if_index, enable=1)
        self.vapi.sw_interface_set_l2_xconnect(
            self.pg3.sw_if_index, self.pg2.sw_if_index, enable=1)

        info(self.vapi.cli("show l2patch"))

        # mapping between packet-generator index and lists of test hosts
        self.hosts_by_pg_idx = dict()

        # Create host MAC and IPv4 lists
        self.create_host_lists(TestL2xc.hosts_nr)

        # setup all interfaces
        for i in self.interfaces:
            i.admin_up()

    def tearDown(self):
        """ Run the framework teardown, then log the l2patch state.

        The CLI dump is skipped when ``self.vpp_dead`` is set.
        """
        super(TestL2xc, self).tearDown()
        if not self.vpp_dead:
            info(self.vapi.cli("show l2patch"))

    def create_host_lists(self, count):
        """ Method to create required number of MAC and IPv4 addresses.
        Create required number of host MAC addresses and distribute them among
        interfaces. Create host IPv4 address for every host MAC address too.

        The interface's sw_if_index is encoded in the fifth MAC byte and in
        the third IPv4 octet; the host number in the last byte/octet.

        :param count: Number of hosts to create MAC and IPv4 addresses for.
        Type: int
        """
        for pg_if in self.pg_interfaces:
            hosts = []
            self.hosts_by_pg_idx[pg_if.sw_if_index] = hosts
            for j in range(count):
                # The IPv4 octet is formatted in decimal (%u): a hex field
                # would yield a non-numeric octet such as "10a" once
                # sw_if_index reaches 10.
                host = TestHost(
                    "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                    "172.17.1%02u.%u" % (pg_if.sw_if_index, j))
                hosts.append(host)

    def create_stream(self, src_if, packet_sizes):
        """ Build one burst of packets to be sent into src_if.

        Every packet goes from a randomly chosen host behind src_if to a
        randomly chosen host behind the first interface in
        ``self.flows[src_if]``. An un-extended copy of each packet is stored
        in its packet info for later comparison in verify_capture().

        :param src_if: Interface the stream will be added to.
        :param packet_sizes: Sizes to cycle through; each size is used for
            two consecutive packets before advancing to the next one.
        :returns: List of ``TestL2xc.pkts_per_burst`` packets.
        """
        # The destination interface does not change within a burst.
        dst_if = self.flows[src_if][0]
        pkts = []
        for i in range(TestL2xc.pkts_per_burst):
            dst_host = random.choice(self.hosts_by_pg_idx[dst_if.sw_if_index])
            src_host = random.choice(self.hosts_by_pg_idx[src_if.sw_if_index])
            pkt_info = self.create_packet_info(
                src_if.sw_if_index, dst_if.sw_if_index)
            payload = self.info_to_payload(pkt_info)
            p = (Ether(dst=dst_host.mac, src=src_host.mac) /
                 IP(src=src_host.ip4, dst=dst_host.ip4) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            pkt_info.data = p.copy()
            # Floor division keeps the index an integer on every Python
            # version; true division would produce a float list index.
            size = packet_sizes[(i // 2) % len(packet_sizes)]
            self.extend_packet(p, size)
            pkts.append(p)
        return pkts

    def verify_capture(self, pg_if, capture):
        """ Check the packets captured on pg_if against what was sent.

        For every captured packet the payload info is decoded, its
        destination is asserted to be pg_if, and the packet is matched (by
        index, IP addresses and UDP ports) against the next expected packet
        from the same source. Afterwards no expected packet destined to
        pg_if may remain for any source interface.

        :param pg_if: Interface the capture was taken on.
        :param capture: Iterable of captured packets.
        """
        # Last matched packet info per source sw_if_index.
        last_info = dict()
        for i in self.interfaces:
            last_info[i.sw_if_index] = None
        dst_sw_if_index = pg_if.sw_if_index
        for packet in capture:
            try:
                ip = packet[IP]
                udp = packet[UDP]
                payload_info = self.payload_to_info(str(packet[Raw]))
                packet_index = payload_info.index
                self.assertEqual(payload_info.dst, dst_sw_if_index)
                debug("Got packet on port %s: src=%u (id=%u)" %
                      (pg_if.name, payload_info.src, packet_index))
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index,
                    last_info[payload_info.src])
                last_info[payload_info.src] = next_info
                self.assertTrue(next_info is not None)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[IP].src)
                self.assertEqual(ip.dst, saved_packet[IP].dst)
                self.assertEqual(udp.sport, saved_packet[UDP].sport)
                self.assertEqual(udp.dport, saved_packet[UDP].dport)
            except:
                # Deliberately catches everything: the offending packet is
                # dumped for diagnosis and the exception is re-raised.
                error("Unexpected or invalid packet:")
                packet.show()
                raise
        for i in self.interfaces:
            # Pass the source as sw_if_index, exactly like the per-packet
            # lookup above; packet infos are created with integer indexes,
            # so passing the interface object here could not match them.
            remaining_packet = self.get_next_packet_info_for_interface2(
                i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertTrue(remaining_packet is None,
                            "Port %u: Packet expected from source %u didn't"
                            " arrive" % (dst_sw_if_index, i.sw_if_index))

    def test_l2xc(self):
        """ L2XC test

        Test scenario:
        1.config
            2 pairs of 2 interfaces, l2xconnected

        2.sending l2 eth packets between 4 interfaces
            64B, 512B, 1518B, 9018B (ether_size)
            burst of packets per interface
        """

        # Create incoming packet streams for packet-generator interfaces
        for i in self.interfaces:
            pkts = self.create_stream(i, self.pg_if_packet_sizes)
            i.add_stream(pkts)

        # Enable packet capturing and start packet sending
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Verify outgoing packet streams per packet-generator interface
        for i in self.pg_interfaces:
            capture = i.get_capture()
            info("Verifying capture on interface %s" % i.name)
            self.verify_capture(i, capture)
177
if __name__ == "__main__":
    # Executed as a script: hand control to unittest with the VPP runner.
    unittest.main(testRunner=VppTestRunner)