2 """ L2BD ARP term Test """
8 from scapy.packet import Raw
9 from scapy.layers.l2 import Ether, ARP
10 from scapy.layers.inet import IP
12 from framework import VppTestCase, VppTestRunner
13 from util import Host, ppp, mactobinary
16 class TestL2bdArpTerm(VppTestCase):
17 """ L2BD arp termination Test Case """
22 Perform standard class setup (defined by class method setUpClass in
23 class VppTestCase) before running the test case, set test case related
24 variables and configure VPP.
26 super(TestL2bdArpTerm, cls).setUpClass()
29 # Create pg interfaces
31 cls.ifs_per_bd = ifs_per_bd = 3
32 n_ifs = n_bd * ifs_per_bd
33 cls.create_pg_interfaces(range(n_ifs))
35 # Set up all interfaces
36 for i in cls.pg_interfaces:
42 super(TestL2bdArpTerm, cls).tearDownClass()
47 Clear trace and packet infos before running each test.
49 self.reset_packet_infos()
50 super(TestL2bdArpTerm, self).setUp()
54 Show various debug prints after each test.
56 super(TestL2bdArpTerm, self).tearDown()
58 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
59 self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
61 def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1):
63 self.vapi.bd_ip_mac_add_del(bd_id=bd_id,
70 def mac_list(cls, b6_range):
71 return ["00:00:ca:fe:00:%02x" % b6 for b6 in b6_range]
74 def ip4_host(cls, subnet, host, mac):
76 ip4="172.17.1%02u.%u" % (subnet, host))
79 def ip4_hosts(cls, subnet, start, mac_list):
80 return {cls.ip4_host(subnet, start + j, mac_list[j])
81 for j in range(len(mac_list))}
87 return [cls.pg_interfaces[j] for j in range(start, start + n)]
89 def bd_add_del(self, bd_id=1, is_add=1):
91 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
92 for swif in self.bd_swifs(bd_id):
93 swif_idx = swif.sw_if_index
94 self.vapi.sw_interface_set_l2_bridge(
95 swif_idx, bd_id=bd_id, enable=is_add)
97 self.vapi.bridge_domain_add_del(bd_id=bd_id, is_add=is_add)
100 def arp(cls, src_host, host):
101 return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
103 hwsrc=src_host.bin_mac,
108 def arp_reqs(cls, src_host, entries):
109 return [cls.arp(src_host, e) for e in entries]
111 def response_host(self, src_host, arp_resp):
112 ether = arp_resp[Ether]
113 self.assertEqual(ether.dst, src_host.mac)
116 self.assertEqual(arp.hwtype, 1)
117 self.assertEqual(arp.ptype, 0x800)
118 self.assertEqual(arp.hwlen, 6)
119 self.assertEqual(arp.plen, 4)
120 arp_opts = {"who-has": 1, "is-at": 2}
121 self.assertEqual(arp.op, arp_opts["is-at"])
122 self.assertEqual(arp.hwdst, src_host.mac)
123 self.assertEqual(arp.pdst, src_host.ip4)
124 return Host(arp.hwsrc, arp.psrc)
126 def arp_resp_hosts(self, src_host, pkts):
127 return {self.response_host(src_host, p) for p in pkts}
129 def set_bd_flags(self, bd_id, **args):
131 Enable/disable defined feature(s) of the bridge domain.
133 :param int bd_id: Bridge domain ID.
134 :param list args: List of feature/status pairs. Allowed features: \
135 learn, forward, flood, uu_flood and arp_term. Status False means \
136 disable, status True means enable the feature.
137 :raise: ValueError in case of unknown feature in the input.
141 feature_bitmap = 1 << 0
142 elif flag == "forward":
143 feature_bitmap = 1 << 1
144 elif flag == "flood":
145 feature_bitmap = 1 << 2
146 elif flag == "uu_flood":
147 feature_bitmap = 1 << 3
148 elif flag == "arp_term":
149 feature_bitmap = 1 << 4
151 raise ValueError("Unknown feature used: %s" % flag)
152 is_set = 1 if args[flag] else 0
153 self.vapi.bridge_flags(bd_id, is_set, feature_bitmap)
154 self.logger.info("Bridge domain ID %d updated" % bd_id)
156 def verify_arp(self, src_host, req_hosts, resp_hosts, bd_id=1):
157 reqs = self.arp_reqs(src_host, req_hosts)
159 for swif in self.bd_swifs(bd_id):
160 swif.add_stream(reqs)
162 self.pg_enable_capture(self.pg_interfaces)
165 for swif in self.bd_swifs(bd_id):
166 resp_pkts = swif.get_capture(len(resp_hosts))
167 resps = self.arp_resp_hosts(src_host, resp_pkts)
168 self.assertEqual(len(resps ^ resp_hosts), 0)
170 def test_l2bd_arp_term_01(self):
171 """ L2BD arp term - add 5 hosts, verify arp responses
173 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
174 self.bd_add_del(1, is_add=1)
175 self.set_bd_flags(1, arp_term=True, flood=False,
176 uu_flood=False, learn=False)
177 macs = self.mac_list(range(1, 5))
178 hosts = self.ip4_hosts(4, 1, macs)
179 self.add_del_arp_term_hosts(hosts, is_add=1)
180 self.verify_arp(src_host, hosts, hosts)
181 type(self).hosts = hosts
183 def test_l2bd_arp_term_02(self):
184 """ L2BD arp term - delete 3 hosts, verify arp responses
186 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
187 macs = self.mac_list(range(1, 3))
188 deleted = self.ip4_hosts(4, 1, macs)
189 self.add_del_arp_term_hosts(deleted, is_add=0)
190 remaining = self.hosts - deleted
191 self.verify_arp(src_host, self.hosts, remaining)
192 type(self).hosts = remaining
193 self.bd_add_del(1, is_add=0)
195 def test_l2bd_arp_term_03(self):
196 """ L2BD arp term - recreate BD1, readd 3 hosts, verify arp responses
198 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
199 self.bd_add_del(1, is_add=1)
200 self.set_bd_flags(1, arp_term=True, flood=False,
201 uu_flood=False, learn=False)
202 macs = self.mac_list(range(1, 3))
203 readded = self.ip4_hosts(4, 1, macs)
204 self.add_del_arp_term_hosts(readded, is_add=1)
205 self.verify_arp(src_host, self.hosts | readded, readded)
206 type(self).hosts = readded
208 def test_l2bd_arp_term_04(self):
209 """ L2BD arp term - 2 IP4 addrs per host
211 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
212 macs = self.mac_list(range(1, 3))
213 sub5_hosts = self.ip4_hosts(5, 1, macs)
214 self.add_del_arp_term_hosts(sub5_hosts, is_add=1)
215 hosts = self.hosts | sub5_hosts
216 self.verify_arp(src_host, hosts, hosts)
217 type(self).hosts = hosts
218 self.bd_add_del(1, is_add=0)
220 def test_l2bd_arp_term_05(self):
221 """ L2BD arp term - create and update 10 IP4-mac pairs
223 src_host = self.ip4_host(50, 50, "00:00:11:22:33:44")
224 self.bd_add_del(1, is_add=1)
225 self.set_bd_flags(1, arp_term=True, flood=False,
226 uu_flood=False, learn=False)
227 macs1 = self.mac_list(range(10, 20))
228 hosts1 = self.ip4_hosts(5, 1, macs1)
229 self.add_del_arp_term_hosts(hosts1, is_add=1)
230 self.verify_arp(src_host, hosts1, hosts1)
231 macs2 = self.mac_list(range(20, 30))
232 hosts2 = self.ip4_hosts(5, 1, macs2)
233 self.add_del_arp_term_hosts(hosts2, is_add=1)
234 self.verify_arp(src_host, hosts1, hosts2)
235 self.bd_add_del(1, is_add=0)
238 if __name__ == '__main__':
239 unittest.main(testRunner=VppTestRunner)