from framework import VppTestCase, VppTestRunner
from vpp_neighbor import VppNeighbor, find_nbr
+from vpp_ip_route import VppIpRoute, VppRoutePath, find_route
from scapy.packet import Raw
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet import IP, UDP
+from scapy.contrib.mpls import MPLS
# not exported by scapy, so redefined here
arp_opts = {"who-has": 1, "is-at": 2}
self.pg3.set_table_ip4(1)
self.pg3.config_ip4()
+ def tearDown(self):
+ super(ARPTestCase, self).tearDown()
+ self.pg0.unconfig_ip4()
+ self.pg0.unconfig_ip6()
+
+ self.pg1.unconfig_ip4()
+ self.pg1.unconfig_ip6()
+
+ self.pg3.unconfig_ip4()
+
+ for i in self.pg_interfaces:
+ i.admin_down()
+
def verify_arp_req(self, rx, smac, sip, dip):
ether = rx[Ether]
self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
self.assertEqual(ip.src, sip)
self.assertEqual(ip.dst, dip)
+ def verify_ip_o_mpls(self, rx, smac, dmac, label, sip, dip):
+ ether = rx[Ether]
+ self.assertEqual(ether.dst, dmac)
+ self.assertEqual(ether.src, smac)
+
+ mpls = rx[MPLS]
+ self.assertTrue(mpls.label, label)
+
+ ip = rx[IP]
+ self.assertEqual(ip.src, sip)
+ self.assertEqual(ip.dst, dip)
+
def send_and_assert_no_replies(self, intf, pkts, remark):
intf.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
#
# Generate some hosts on the LAN
#
- self.pg1.generate_remote_hosts(4)
+ self.pg1.generate_remote_hosts(9)
#
# Send IP traffic to one of these unresolved hosts.
self.assertTrue(find_nbr(self,
self.pg1.sw_if_index,
self.pg1._remote_hosts[3].ip4))
+ #
+ # Fire in an ARP request before the interface becomes IP enabled
+ #
+ self.pg2.generate_remote_hosts(4)
+
+ p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) /
+ ARP(op="who-has",
+ hwsrc=self.pg2.remote_mac,
+ pdst=self.pg1.local_ip4,
+ psrc=self.pg2.remote_hosts[3].ip4))
+ self.send_and_assert_no_replies(self.pg2, p,
+ "interface not IP enabled")
+
+ #
+ # Make pg2 un-numbered to pg1
+ #
+ self.pg2.set_unnumbered(self.pg1.sw_if_index)
+
+ #
+ # We should respond to ARP requests for the unnumbered to address
+ # once an attached route to the source is known
+ #
+ self.send_and_assert_no_replies(
+ self.pg2, p,
+ "ARP req for unnumbered address - no source")
+
+ attached_host = VppIpRoute(self, self.pg2.remote_hosts[3].ip4, 32,
+ [VppRoutePath("0.0.0.0",
+ self.pg2.sw_if_index)])
+ attached_host.add_vpp_config()
+
+ self.pg2.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg2.get_capture(1)
+ self.verify_arp_resp(rx[0],
+ self.pg2.local_mac,
+ self.pg2.remote_mac,
+ self.pg1.local_ip4,
+ self.pg2.remote_hosts[3].ip4)
+
+ #
+ # A neighbor entry that has no associated FIB-entry
+ #
+ arp_no_fib = VppNeighbor(self,
+ self.pg1.sw_if_index,
+ self.pg1.remote_hosts[4].mac,
+ self.pg1.remote_hosts[4].ip4,
+ is_no_fib_entry=1)
+ arp_no_fib.add_vpp_config()
+
+ #
+ # check we have the neighbor, but no route
+ #
+ self.assertTrue(find_nbr(self,
+ self.pg1.sw_if_index,
+ self.pg1._remote_hosts[4].ip4))
+ self.assertFalse(find_route(self,
+ self.pg1._remote_hosts[4].ip4,
+ 32))
+ #
+ # pg2 is unnumbered to pg1, so we can form adjacencies out of pg2
+ # from within pg1's subnet
+ #
+ arp_unnum = VppNeighbor(self,
+ self.pg2.sw_if_index,
+ self.pg1.remote_hosts[5].mac,
+ self.pg1.remote_hosts[5].ip4)
+ arp_unnum.add_vpp_config()
+
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg1._remote_hosts[5].ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg2.get_capture(1)
+
+ self.verify_ip(rx[0],
+ self.pg2.local_mac,
+ self.pg1.remote_hosts[5].mac,
+ self.pg0.remote_ip4,
+ self.pg1._remote_hosts[5].ip4)
+
+ #
+ # ARP requests from hosts in pg1's subnet sent on pg2 are replied to
+ # with the unnumbered interface's address as the source
+ #
+ p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) /
+ ARP(op="who-has",
+ hwsrc=self.pg2.remote_mac,
+ pdst=self.pg1.local_ip4,
+ psrc=self.pg1.remote_hosts[6].ip4))
+
+ self.pg2.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg2.get_capture(1)
+ self.verify_arp_resp(rx[0],
+ self.pg2.local_mac,
+ self.pg2.remote_mac,
+ self.pg1.local_ip4,
+ self.pg1.remote_hosts[6].ip4)
+
+ #
+ # An attached host route out of pg2 for an undiscovered hosts generates
+ # an ARP request with the unnumbered address as the source
+ #
+ att_unnum = VppIpRoute(self, self.pg1.remote_hosts[7].ip4, 32,
+ [VppRoutePath("0.0.0.0",
+ self.pg2.sw_if_index)])
+ att_unnum.add_vpp_config()
+
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4,
+ dst=self.pg1._remote_hosts[7].ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg2.get_capture(1)
+
+ self.verify_arp_req(rx[0],
+ self.pg2.local_mac,
+ self.pg1.local_ip4,
+ self.pg1._remote_hosts[7].ip4)
+
+ p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) /
+ ARP(op="who-has",
+ hwsrc=self.pg2.remote_mac,
+ pdst=self.pg1.local_ip4,
+ psrc=self.pg1.remote_hosts[7].ip4))
+
+ self.pg2.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg2.get_capture(1)
+ self.verify_arp_resp(rx[0],
+ self.pg2.local_mac,
+ self.pg2.remote_mac,
+ self.pg1.local_ip4,
+ self.pg1.remote_hosts[7].ip4)
+
+ #
+ # An attached host route as yet unresolved out of pg2 for an
+ # undiscovered host, an ARP requests begets a response.
+ #
+ att_unnum1 = VppIpRoute(self, self.pg1.remote_hosts[8].ip4, 32,
+ [VppRoutePath("0.0.0.0",
+ self.pg2.sw_if_index)])
+ att_unnum1.add_vpp_config()
+
+ p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) /
+ ARP(op="who-has",
+ hwsrc=self.pg2.remote_mac,
+ pdst=self.pg1.local_ip4,
+ psrc=self.pg1.remote_hosts[8].ip4))
+
+ self.pg2.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg2.get_capture(1)
+ self.verify_arp_resp(rx[0],
+ self.pg2.local_mac,
+ self.pg2.remote_mac,
+ self.pg1.local_ip4,
+ self.pg1.remote_hosts[8].ip4)
#
# ERROR Cases
# 1 - don't respond to ARP request for address not within the
# interface's sub-net
- #
+ # 1a - nor within the unnumbered subnet
p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) /
ARP(op="who-has",
hwsrc=self.pg0.remote_mac,
psrc=self.pg0.remote_ip4))
self.send_and_assert_no_replies(self.pg0, p,
"ARP req for non-local destination")
+ p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) /
+ ARP(op="who-has",
+ hwsrc=self.pg2.remote_mac,
+ pdst="10.10.10.3",
+ psrc=self.pg1.remote_hosts[7].ip4))
+ self.send_and_assert_no_replies(
+ self.pg0, p,
+ "ARP req for non-local destination - unnum")
#
# 2 - don't respond to ARP request from an address not within the
pdst=self.pg0.local_ip4))
self.send_and_assert_no_replies(self.pg0, p,
"ARP req for non-local source")
+ p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) /
+ ARP(op="who-has",
+ hwsrc=self.pg2.remote_mac,
+ psrc="10.10.10.3",
+ pdst=self.pg0.local_ip4))
+ self.send_and_assert_no_replies(
+ self.pg0, p,
+ "ARP req for non-local source - unnum")
#
# 3 - don't respond to ARP request from an address that belongs to
#
dyn_arp.remove_vpp_config()
static_arp.remove_vpp_config()
+ self.pg2.unset_unnumbered(self.pg1.sw_if_index)
+
+ # need this to flush the adj-fibs
+ self.pg2.unset_unnumbered(self.pg1.sw_if_index)
+ self.pg2.admin_down()
def test_proxy_arp(self):
""" Proxy ARP """
"ARP req from disable")
self.send_and_assert_no_replies(self.pg2, arp_req_pg2,
"ARP req from disable")
+
+ #
+ # clean up on interface 2
+ #
+ self.pg2.unset_unnumbered(self.pg1.sw_if_index)
+
+ def test_mpls(self):
+ """ MPLS """
+
+ #
+ # Interface 2 does not yet have ip4 config
+ #
+ self.pg2.config_ip4()
+ self.pg2.generate_remote_hosts(2)
+
+ #
+ # Add a reoute with out going label via an ARP unresolved next-hop
+ #
+ ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
+ [VppRoutePath(self.pg2.remote_hosts[1].ip4,
+ self.pg2.sw_if_index,
+ labels=[55])])
+ ip_10_0_0_1.add_vpp_config()
+
+ #
+ # packets should generate an ARP request
+ #
+ p = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst="10.0.0.1") /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg2.get_capture(1)
+ self.verify_arp_req(rx[0],
+ self.pg2.local_mac,
+ self.pg2.local_ip4,
+ self.pg2._remote_hosts[1].ip4)
+
+ #
+ # now resolve the neighbours
+ #
+ self.pg2.configure_ipv4_neighbors()
+
+ #
+ # Now packet should be properly MPLS encapped.
+ # This verifies that MPLS link-type adjacencies are completed
+ # when the ARP entry resolves
+ #
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg2.get_capture(1)
+ self.verify_ip_o_mpls(rx[0],
+ self.pg2.local_mac,
+ self.pg2.remote_hosts[1].mac,
+ 55,
+ self.pg0.remote_ip4,
+ "10.0.0.1")
+ self.pg2.unconfig_ip4()
+
+if __name__ == '__main__':
+ unittest.main(testRunner=VppTestRunner)