TEST:add L2BD arp term tests
[vpp.git] / test / test_l2bd_arp_term.py
1 #!/usr/bin/env python
2 """ L2BD ARP term Test """
3
4 import unittest
5 import random
6 import copy
7
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether, ARP
10 from scapy.layers.inet import IP
11
12 from framework import VppTestCase, VppTestRunner
13 from util import Host, ppp, mactobinary
14
15
16 class TestL2bdArpTerm(VppTestCase):
17     """ L2BD arp termination Test Case """
18
19     @classmethod
20     def setUpClass(cls):
21         """
22         Perform standard class setup (defined by class method setUpClass in
23         class VppTestCase) before running the test case, set test case related
24         variables and configure VPP.
25         """
26         super(TestL2bdArpTerm, cls).setUpClass()
27
28         try:
29             # Create pg interfaces
30             n_bd = 1
31             cls.ifs_per_bd = ifs_per_bd = 3
32             n_ifs = n_bd * ifs_per_bd
33             cls.create_pg_interfaces(range(n_ifs))
34
35             # Set up all interfaces
36             for i in cls.pg_interfaces:
37                 i.admin_up()
38
39             cls.hosts = set()
40
41         except Exception:
42             super(TestL2bdArpTerm, cls).tearDownClass()
43             raise
44
45     def setUp(self):
46         """
47         Clear trace and packet infos before running each test.
48         """
49         self.reset_packet_infos()
50         super(TestL2bdArpTerm, self).setUp()
51
52     def tearDown(self):
53         """
54         Show various debug prints after each test.
55         """
56         super(TestL2bdArpTerm, self).tearDown()
57         if not self.vpp_dead:
58             self.logger.info(self.vapi.ppcli("show l2fib verbose"))
59             self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
60
61     def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1):
62         for e in entries:
63             self.vapi.bd_ip_mac_add_del(bd_id=bd_id,
64                                         mac=e.bin_mac,
65                                         ip=e.ip4n,
66                                         is_ipv6=0,
67                                         is_add=is_add)
68
69     @classmethod
70     def mac_list(cls, b6_range):
71         return ["00:00:ca:fe:00:%02x" % b6 for b6 in b6_range]
72
73     @classmethod
74     def ip4_host(cls, subnet, host, mac):
75         return Host(mac=mac,
76                     ip4="172.17.1%02u.%u" % (subnet, host))
77
78     @classmethod
79     def ip4_hosts(cls, subnet, start, mac_list):
80         return {cls.ip4_host(subnet, start + j, mac_list[j])
81                 for j in range(len(mac_list))}
82
83     @classmethod
84     def bd_swifs(cls, b):
85         n = cls.ifs_per_bd
86         start = (b - 1) * n
87         return [cls.pg_interfaces[j] for j in range(start, start + n)]
88
89     def bd_add_del(self, bd_id=1, is_add=1):
90         if is_add:
91             self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
92         for swif in self.bd_swifs(bd_id):
93             swif_idx = swif.sw_if_index
94             self.vapi.sw_interface_set_l2_bridge(
95                 swif_idx, bd_id=bd_id, enable=is_add)
96         if not is_add:
97             self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
98
99     @classmethod
100     def arp(cls, src_host, host):
101         return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
102                 ARP(op="who-has",
103                     hwsrc=src_host.bin_mac,
104                     pdst=host.ip4,
105                     psrc=src_host.ip4))
106
107     @classmethod
108     def arp_reqs(cls, src_host, entries):
109         return [cls.arp(src_host, e) for e in entries]
110
111     def response_host(self, src_host, arp_resp):
112         ether = arp_resp[Ether]
113         self.assertEqual(ether.dst, src_host.mac)
114
115         arp = arp_resp[ARP]
116         self.assertEqual(arp.hwtype, 1)
117         self.assertEqual(arp.ptype, 0x800)
118         self.assertEqual(arp.hwlen, 6)
119         self.assertEqual(arp.plen, 4)
120         arp_opts = {"who-has": 1, "is-at": 2}
121         self.assertEqual(arp.op, arp_opts["is-at"])
122         self.assertEqual(arp.hwdst, src_host.mac)
123         self.assertEqual(arp.pdst, src_host.ip4)
124         return Host(arp.hwsrc, arp.psrc)
125
126     def arp_resp_hosts(self, src_host, pkts):
127         return {self.response_host(src_host, p) for p in pkts}
128
129     def set_bd_flags(self, bd_id, **args):
130         """
131         Enable/disable defined feature(s) of the bridge domain.
132
133         :param int bd_id: Bridge domain ID.
134         :param list args: List of feature/status pairs. Allowed features: \
135         learn, forward, flood, uu_flood and arp_term. Status False means \
136         disable, status True means enable the feature.
137         :raise: ValueError in case of unknown feature in the input.
138         """
139         for flag in args:
140             if flag == "learn":
141                 feature_bitmap = 1 << 0
142             elif flag == "forward":
143                 feature_bitmap = 1 << 1
144             elif flag == "flood":
145                 feature_bitmap = 1 << 2
146             elif flag == "uu_flood":
147                 feature_bitmap = 1 << 3
148             elif flag == "arp_term":
149                 feature_bitmap = 1 << 4
150             else:
151                 raise ValueError("Unknown feature used: %s" % flag)
152             is_set = 1 if args[flag] else 0
153             self.vapi.bridge_flags(bd_id, is_set, feature_bitmap)
154         self.logger.info("Bridge domain ID %d updated" % bd_id)
155
156     def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
157         reqs = self.arp_reqs(src_host, req_hosts)
158
159         for swif in self.bd_swifs(bd_id):
160             swif.add_stream(reqs)
161
162         self.pg_enable_capture(self.pg_interfaces)
163         self.pg_start()
164
165         for swif in self.bd_swifs(bd_id):
166             resp_pkts = swif.get_capture(len(resp_hosts))
167             resps = self.arp_resp_hosts(src_host, resp_pkts)
168             self.assertEqual(len(resps ^ resp_hosts), 0)
169
170     def test_l2bd_arp_term_01(self):
171         """ L2BD arp term - add 5 hosts, verify arp responses
172         """
173         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
174         self.bd_add_del(1, is_add=1)
175         self.set_bd_flags(1, arp_term=True, flood=False,
176                           uu_flood=False, learn=False)
177         macs = self.mac_list(range(1, 5))
178         hosts = self.ip4_hosts(4, 1, macs)
179         self.add_del_arp_term_hosts(hosts, is_add=1)
180         self.verify_arp(src_host, hosts, hosts)
181         type(self).hosts = hosts
182
183     def test_l2bd_arp_term_02(self):
184         """ L2BD arp term - delete 3 hosts, verify arp responses
185         """
186         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
187         macs = self.mac_list(range(1, 3))
188         deleted = self.ip4_hosts(4, 1, macs)
189         self.add_del_arp_term_hosts(deleted, is_add=0)
190         remaining = self.hosts - deleted
191         self.verify_arp(src_host, self.hosts, remaining)
192         type(self).hosts = remaining
193         self.bd_add_del(1, is_add=0)
194
195     def test_l2bd_arp_term_03(self):
196         """ L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses
197         """
198         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
199         self.bd_add_del(1, is_add=1)
200         self.set_bd_flags(1, arp_term=True, flood=False,
201                           uu_flood=False, learn=False)
202         macs = self.mac_list(range(1, 3))
203         readded = self.ip4_hosts(4, 1, macs)
204         self.add_del_arp_term_hosts(readded, is_add=1)
205         self.verify_arp(src_host, self.hosts | readded, readded)
206         type(self).hosts = readded
207
208     def test_l2bd_arp_term_04(self):
209         """ L2BD arp term - 2 IP4 addrs per host
210         """
211         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
212         macs = self.mac_list(range(1, 3))
213         sub5_hosts = self.ip4_hosts(5, 1, macs)
214         self.add_del_arp_term_hosts(sub5_hosts, is_add=1)
215         hosts = self.hosts | sub5_hosts
216         self.verify_arp(src_host, hosts, hosts)
217         type(self).hosts = hosts
218         self.bd_add_del(1, is_add=0)
219
220     def test_l2bd_arp_term_05(self):
221         """ L2BD arp term - create and update 10 IP4-mac pairs
222         """
223         src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
224         self.bd_add_del(1, is_add=1)
225         self.set_bd_flags(1, arp_term=True, flood=False,
226                           uu_flood=False, learn=False)
227         macs1 = self.mac_list(range(10, 20))
228         hosts1 = self.ip4_hosts(5, 1, macs1)
229         self.add_del_arp_term_hosts(hosts1, is_add=1)
230         self.verify_arp(src_host, hosts1, hosts1)
231         macs2 = self.mac_list(range(20, 30))
232         hosts2 = self.ip4_hosts(5, 1, macs2)
233         self.add_del_arp_term_hosts(hosts2, is_add=1)
234         self.verify_arp(src_host, hosts1, hosts2)
235         self.bd_add_del(1, is_add=0)
236
237
238 if __name__ == '__main__':
239     unittest.main(testRunner=VppTestRunner)