X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_neighbor.py;h=9d91f031571ba866b1c648b346811cac9f561403;hb=404d88edce63a635b58ba54d21d91735ff933584;hp=1c7cc2678ff049f2687128228957c4ade17c0a96;hpb=24b170aac55d16cb3ff934d2f3d7983dc11cbe12;p=vpp.git diff --git a/test/test_neighbor.py b/test/test_neighbor.py index 1c7cc2678ff..9d91f031571 100644 --- a/test/test_neighbor.py +++ b/test/test_neighbor.py @@ -5,7 +5,8 @@ from socket import AF_INET, AF_INET6, inet_pton from framework import VppTestCase, VppTestRunner from vpp_neighbor import VppNeighbor, find_nbr -from vpp_ip_route import VppIpRoute, VppRoutePath, find_route +from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, \ + VppIpTable from scapy.packet import Raw from scapy.layers.l2 import Ether, ARP, Dot1Q @@ -39,11 +40,13 @@ class ARPTestCase(VppTestCase): self.pg1.config_ip6() # pg3 in a different VRF + self.tbl = VppIpTable(self, 1) + self.tbl.add_vpp_config() + self.pg3.set_table_ip4(1) self.pg3.config_ip4() def tearDown(self): - super(ARPTestCase, self).tearDown() self.pg0.unconfig_ip4() self.pg0.unconfig_ip6() @@ -51,10 +54,13 @@ class ARPTestCase(VppTestCase): self.pg1.unconfig_ip6() self.pg3.unconfig_ip4() + self.pg3.set_table_ip4(0) for i in self.pg_interfaces: i.admin_down() + super(ARPTestCase, self).tearDown() + def verify_arp_req(self, rx, smac, sip, dip): ether = rx[Ether] self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff") @@ -126,16 +132,6 @@ class ARPTestCase(VppTestCase): self.assertEqual(ip.src, sip) self.assertEqual(ip.dst, dip) - def send_and_assert_no_replies(self, intf, pkts, remark): - intf.add_stream(pkts) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - timeout = 1 - for i in self.pg_interfaces: - i.get_capture(0, timeout=timeout) - i.assert_nothing_captured(remark=remark) - timeout = 0.1 - def test_arp(self): """ ARP """ @@ -300,6 +296,10 @@ class ARPTestCase(VppTestCase): # self.pg2.set_unnumbered(self.pg1.sw_if_index) + unnum = self.vapi.ip_unnumbered_dump() + self.assertEqual(unnum[0].ip_sw_if_index, self.pg1.sw_if_index) + self.assertEqual(unnum[0].sw_if_index, self.pg2.sw_if_index) + # # We should respond to ARP requests for the unnumbered to address # once an attached route to the source is known @@ -635,7 +635,6 @@ class ARPTestCase(VppTestCase): # # 4 - don't respond to ARP requests that has mac source different # from ARP request HW source - # the router # p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP(op="who-has", @@ -645,6 +644,19 @@ class ARPTestCase(VppTestCase): self.send_and_assert_no_replies(self.pg0, p, "ARP req for non-local source") + # + # 5 - don't respond to ARP requests for address within the + # interface's sub-net but not the interface's address + # + self.pg0.generate_remote_hosts(2) + p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / + ARP(op="who-has", + hwsrc=self.pg0.remote_mac, + psrc=self.pg0.remote_hosts[0].ip4, + pdst=self.pg0.remote_hosts[1].ip4)) + self.send_and_assert_no_replies(self.pg0, p, + "ARP req for non-local destination") + # # cleanup # @@ -1080,6 +1092,230 @@ class ARPTestCase(VppTestCase): self.pg0.remote_ip4, self.pg1.remote_hosts[1].ip4) + def test_arp_static(self): + """ ARP Static""" + self.pg2.generate_remote_hosts(3) + + # + # Add a static ARP entry + # + static_arp = VppNeighbor(self, + self.pg2.sw_if_index, + self.pg2.remote_hosts[1].mac, + self.pg2.remote_hosts[1].ip4, + is_static=1) + static_arp.add_vpp_config() + + # + # Add the connected prefix to the interface + # + self.pg2.config_ip4() + + # + # We should now find the adj-fib + # + self.assertTrue(find_nbr(self, + self.pg2.sw_if_index, + self.pg2.remote_hosts[1].ip4, + is_static=1)) + self.assertTrue(find_route(self, + self.pg2.remote_hosts[1].ip4, + 32)) + + # + # remove the connected + # + self.pg2.unconfig_ip4() + + # + # put the interface into table 1 + # + self.pg2.set_table_ip4(1) + + # + # configure the same connected and expect to find the + # adj fib in the new table + # + self.pg2.config_ip4() + self.assertTrue(find_route(self, + self.pg2.remote_hosts[1].ip4, + 32, + table_id=1)) + + # + # clean-up + # + self.pg2.unconfig_ip4() + self.pg2.set_table_ip4(0) + + def test_arp_incomplete(self): + """ ARP Incomplete""" + self.pg1.generate_remote_hosts(3) + + p0 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_hosts[1].ip4) / + UDP(sport=1234, dport=1234) / + Raw()) + p1 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, + dst=self.pg1.remote_hosts[2].ip4) / + UDP(sport=1234, dport=1234) / + Raw()) + + # + # a packet to an unresolved destination generates an ARP request + # + rx = self.send_and_expect(self.pg0, [p0], self.pg1) + self.verify_arp_req(rx[0], + self.pg1.local_mac, + self.pg1.local_ip4, + self.pg1._remote_hosts[1].ip4) + + # + # add a neighbour for remote host 1 + # + static_arp = VppNeighbor(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[1].mac, + self.pg1.remote_hosts[1].ip4, + is_static=1) + static_arp.add_vpp_config() + + # + # change the interface's MAC + # + mac = [chr(0x00), chr(0x00), chr(0x00), + chr(0x33), chr(0x33), chr(0x33)] + mac_string = ''.join(mac) + + self.vapi.sw_interface_set_mac_address(self.pg1.sw_if_index, + mac_string) + + # + # now ARP requests come from the new source mac + # + rx = self.send_and_expect(self.pg0, [p1], self.pg1) + self.verify_arp_req(rx[0], + "00:00:00:33:33:33", + self.pg1.local_ip4, + self.pg1._remote_hosts[2].ip4) + + # + # packets to the resolved host also have the new source mac + # + rx = self.send_and_expect(self.pg0, [p0], self.pg1) + self.verify_ip(rx[0], + "00:00:00:33:33:33", + self.pg1.remote_hosts[1].mac, + self.pg0.remote_ip4, + self.pg1.remote_hosts[1].ip4) + + # + # set the mac address on the inteface that does not have a + # configured subnet and thus no glean + # + self.vapi.sw_interface_set_mac_address(self.pg2.sw_if_index, + mac_string) + + def test_garp(self): + """ GARP """ + + # + # Generate some hosts on the LAN + # + self.pg1.generate_remote_hosts(4) + + # + # And an ARP entry + # + arp = VppNeighbor(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[1].mac, + self.pg1.remote_hosts[1].ip4) + arp.add_vpp_config() + + self.assertTrue(find_nbr(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[1].ip4, + mac=self.pg1.remote_hosts[1].mac)) + + # + # Send a GARP (request) to swap the host 1's address to that of host 2 + # + p1 = (Ether(dst="ff:ff:ff:ff:ff:ff", + src=self.pg1.remote_hosts[2].mac) / + ARP(op="who-has", + hwdst=self.pg1.local_mac, + hwsrc=self.pg1.remote_hosts[2].mac, + pdst=self.pg1.remote_hosts[1].ip4, + psrc=self.pg1.remote_hosts[1].ip4)) + + self.pg1.add_stream(p1) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + self.assertTrue(find_nbr(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[1].ip4, + mac=self.pg1.remote_hosts[2].mac)) + + # + # Send a GARP (reply) to swap the host 1's address to that of host 3 + # + p1 = (Ether(dst="ff:ff:ff:ff:ff:ff", + src=self.pg1.remote_hosts[3].mac) / + ARP(op="is-at", + hwdst=self.pg1.local_mac, + hwsrc=self.pg1.remote_hosts[3].mac, + pdst=self.pg1.remote_hosts[1].ip4, + psrc=self.pg1.remote_hosts[1].ip4)) + + self.pg1.add_stream(p1) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + self.assertTrue(find_nbr(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[1].ip4, + mac=self.pg1.remote_hosts[3].mac)) + + # + # GARPs (requets nor replies) for host we don't know yet + # don't result in new neighbour entries + # + p1 = (Ether(dst="ff:ff:ff:ff:ff:ff", + src=self.pg1.remote_hosts[3].mac) / + ARP(op="who-has", + hwdst=self.pg1.local_mac, + hwsrc=self.pg1.remote_hosts[3].mac, + pdst=self.pg1.remote_hosts[2].ip4, + psrc=self.pg1.remote_hosts[2].ip4)) + + self.pg1.add_stream(p1) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + self.assertFalse(find_nbr(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[2].ip4)) + + p1 = (Ether(dst="ff:ff:ff:ff:ff:ff", + src=self.pg1.remote_hosts[3].mac) / + ARP(op="is-at", + hwdst=self.pg1.local_mac, + hwsrc=self.pg1.remote_hosts[3].mac, + pdst=self.pg1.remote_hosts[2].ip4, + psrc=self.pg1.remote_hosts[2].ip4)) + + self.pg1.add_stream(p1) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + self.assertFalse(find_nbr(self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[2].ip4)) + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)