add vpp debugging support to test framework
[vpp.git] / test / test_l2bd.py
1 #!/usr/bin/env python
2
3 import unittest
4 from logging import *
5 import random
6
7 from scapy.packet import Raw
8 from scapy.layers.l2 import Ether, Dot1Q
9 from scapy.layers.inet import IP, UDP
10
11 from framework import VppTestCase, VppTestRunner
12 from vpp_sub_interface import VppDot1QSubint
13 from util import TestHost
14
15
16 class TestL2bd(VppTestCase):
17     """ L2BD Test Case """
18
19     # Test variables
20     bd_id = 1                 # Bridge domain ID
21     mac_entries_count = 100   # Number of MAC entries for bridge-domain to learn
22     dot1q_sub_id = 100        # SubID of dot1q sub-interface
23     dot1q_tag = 100           # VLAN tag for dot1q sub-interface
24     dot1ad_sub_id = 200       # SubID of dot1ad sub-interface
25     dot1ad_outer_tag = 200    # VLAN S-tag for dot1ad sub-interface
26     dot1ad_inner_tag = 300    # VLAN C-tag for dot1ad sub-interface
27     pkts_per_burst = 257      # Number of packets per burst
28
29     @classmethod
30     def setUpClass(cls):
31         super(TestL2bd, cls).setUpClass()
32
33     def setUp(self):
34         super(TestL2bd, self).setUp()
35
36         # create 3 pg interfaces
37         self.create_pg_interfaces(range(3))
38
39         # create 2 sub-interfaces for pg1 and pg2
40         self.sub_interfaces = [
41             VppDot1QSubint(self, self.pg1, TestL2bd.dot1q_sub_id),
42             VppDot1QSubint(self, self.pg2, TestL2bd.dot1ad_sub_id)]
43
44         # packet flows mapping pg0 -> pg1, pg2, etc.
45         self.flows = dict()
46         self.flows[self.pg0] = [self.pg1, self.pg2]
47         self.flows[self.pg1] = [self.pg0, self.pg2]
48         self.flows[self.pg2] = [self.pg0, self.pg1]
49
50         # packet sizes
51         self.pg_if_packet_sizes = [64, 512, 1518, 9018]
52         self.sub_if_packet_sizes = [64, 512, 1518 + 4, 9018 + 4]
53
54         self.interfaces = list(self.pg_interfaces)
55         self.interfaces.extend(self.sub_interfaces)
56
57         # Create BD with MAC learning enabled and put interfaces and
58         #  sub-interfaces to this BD
59         for pg_if in self.pg_interfaces:
60             sw_if_index = pg_if.sub_if.sw_if_index if hasattr(pg_if, 'sub_if') \
61                 else pg_if.sw_if_index
62             self.vapi.sw_interface_set_l2_bridge(sw_if_index,
63                                                  bd_id=TestL2bd.bd_id)
64
65         # setup all interfaces
66         for i in self.interfaces:
67             i.admin_up()
68
69         # mapping between packet-generator index and lists of test hosts
70         self.hosts_by_pg_idx = dict()
71
72         # create test host entries and inject packets to learn MAC entries in
73         # the bridge-domain
74         self.create_hosts_and_learn(TestL2bd.mac_entries_count)
75         info(self.vapi.cli("show l2fib"))
76
77     def tearDown(self):
78         super(TestL2bd, self).tearDown()
79         if not self.vpp_dead:
80             info(self.vapi.cli("show l2fib verbose"))
81             info(self.vapi.cli("show bridge-domain %s detail" % self.bd_id))
82
83     def create_hosts_and_learn(self, count):
84         """
85         Create required number of host MAC addresses and distribute them among
86         interfaces. Create host IPv4 address for every host MAC address. Create
87         L2 MAC packet stream with host MAC addresses per interface to let
88         the bridge domain learn these MAC addresses.
89
90         :param count: Integer number of hosts to create MAC/IPv4 addresses for.
91         """
92         n_int = len(self.pg_interfaces)
93         macs_per_if = count / n_int
94         i = -1
95         for pg_if in self.pg_interfaces:
96             i += 1
97             start_nr = macs_per_if * i
98             end_nr = count if i == (n_int - 1) else macs_per_if * (i + 1)
99             self.hosts_by_pg_idx[pg_if.sw_if_index] = []
100             hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
101             packets = []
102             for j in range(start_nr, end_nr):
103                 host = TestHost(
104                     "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
105                     "172.17.1%02x.%u" % (pg_if.sw_if_index, j))
106                 packet = (Ether(dst="ff:ff:ff:ff:ff:ff", src=host.mac))
107                 hosts.append(host)
108                 if hasattr(pg_if, 'sub_if'):
109                     packet = pg_if.sub_if.add_dot1_layer(packet)
110                 packets.append(packet)
111             pg_if.add_stream(packets)
112         info("Sending broadcast eth frames for MAC learning")
113         self.pg_start()
114
115     def create_stream(self, src_if, packet_sizes):
116         pkts = []
117         for i in range(0, TestL2bd.pkts_per_burst):
118             dst_if = self.flows[src_if][i % 2]
119             dst_host = random.choice(self.hosts_by_pg_idx[dst_if.sw_if_index])
120             src_host = random.choice(self.hosts_by_pg_idx[src_if.sw_if_index])
121             pkt_info = self.create_packet_info(
122                 src_if.sw_if_index, dst_if.sw_if_index)
123             payload = self.info_to_payload(pkt_info)
124             p = (Ether(dst=dst_host.mac, src=src_host.mac) /
125                  IP(src=src_host.ip4, dst=dst_host.ip4) /
126                  UDP(sport=1234, dport=1234) /
127                  Raw(payload))
128             pkt_info.data = p.copy()
129             if hasattr(src_if, 'sub_if'):
130                 p = src_if.sub_if.add_dot1_layer(p)
131             size = packet_sizes[(i / 2) % len(packet_sizes)]
132             self.extend_packet(p, size)
133             pkts.append(p)
134         return pkts
135
136     def verify_capture(self, pg_if, capture):
137         last_info = dict()
138         for i in self.pg_interfaces:
139             last_info[i.sw_if_index] = None
140         dst_sw_if_index = pg_if.sw_if_index
141         for packet in capture:
142             payload_info = self.payload_to_info(str(packet[Raw]))
143             src_sw_if_index = payload_info.src
144             src_if = None
145             for ifc in self.pg_interfaces:
146                 if ifc != pg_if:
147                     if ifc.sw_if_index == src_sw_if_index:
148                         src_if = ifc
149                         break
150             if hasattr(src_if, 'sub_if'):
151                 # Check VLAN tags and Ethernet header
152                 packet = src_if.sub_if.remove_dot1_layer(packet)
153             self.assertTrue(Dot1Q not in packet)
154             try:
155                 ip = packet[IP]
156                 udp = packet[UDP]
157                 packet_index = payload_info.index
158                 self.assertEqual(payload_info.dst, dst_sw_if_index)
159                 debug("Got packet on port %s: src=%u (id=%u)" %
160                       (pg_if.name, payload_info.src, packet_index))
161                 next_info = self.get_next_packet_info_for_interface2(
162                     payload_info.src, dst_sw_if_index,
163                     last_info[payload_info.src])
164                 last_info[payload_info.src] = next_info
165                 self.assertTrue(next_info is not None)
166                 self.assertEqual(packet_index, next_info.index)
167                 saved_packet = next_info.data
168                 # Check standard fields
169                 self.assertEqual(ip.src, saved_packet[IP].src)
170                 self.assertEqual(ip.dst, saved_packet[IP].dst)
171                 self.assertEqual(udp.sport, saved_packet[UDP].sport)
172                 self.assertEqual(udp.dport, saved_packet[UDP].dport)
173             except:
174                 error("Unexpected or invalid packet:")
175                 error(packet.show())
176                 raise
177         for i in self.pg_interfaces:
178             remaining_packet = self.get_next_packet_info_for_interface2(
179                 i, dst_sw_if_index, last_info[i.sw_if_index])
180             self.assertTrue(
181                 remaining_packet is None,
182                 "Port %u: Packet expected from source %u didn't arrive" %
183                 (dst_sw_if_index, i.sw_if_index))
184
185     def test_l2bd(self):
186         """ L2BD MAC learning test
187
188         1.config
189             MAC learning enabled
190             learn 100 MAC enries
191             3 interfaces: untagged, dot1q, dot1ad (dot1q used instead of dot1ad
192             in the first version)
193
194         2.sending l2 eth pkts between 3 interface
195             64B, 512B, 1518B, 9200B (ether_size)
196             burst of 257 pkts per interface
197         """
198
199         # Create incoming packet streams for packet-generator interfaces
200         for i in self.pg_interfaces:
201             packet_sizes = self.sub_if_packet_sizes if hasattr(i, 'sub_if') \
202                 else self.pg_if_packet_sizes
203             pkts = self.create_stream(i, packet_sizes)
204             i.add_stream(pkts)
205
206         # Enable packet capture and start packet sending
207         self.pg_enable_capture(self.pg_interfaces)
208         self.pg_start()
209
210         # Verify outgoing packet streams per packet-generator interface
211         for i in self.pg_interfaces:
212             capture = i.get_capture()
213             info("Verifying capture on interface %s" % i.name)
214             self.verify_capture(i, capture)
215
216
217 if __name__ == '__main__':
218     unittest.main(testRunner=VppTestRunner)