Refactor IP input checks for re-use at MPLS disposition
[vpp.git] / test / test_dhcp.py
index 4e8ed4c..42b80af 100644 (file)
@@ -6,7 +6,7 @@ import struct
 
 from framework import VppTestCase, VppTestRunner
 from vpp_neighbor import VppNeighbor
-from vpp_ip_route import find_route
+from vpp_ip_route import find_route, VppIpTable
 from util import mk_ll_addr
 
 from scapy.layers.l2 import Ether, getmacbyip, ARP
@@ -19,6 +19,7 @@ from scapy.layers.dhcp6 import DHCP6, DHCP6_Solicit, DHCP6_RelayForward, \
 from socket import AF_INET, AF_INET6
 from scapy.utils import inet_pton, inet_ntop
 from scapy.utils6 import in6_ptop
+from util import mactobinary
 
 DHCP4_CLIENT_PORT = 68
 DHCP4_SERVER_PORT = 67
@@ -34,9 +35,19 @@ class TestDHCP(VppTestCase):
 
         # create 3 pg interfaces
         self.create_pg_interfaces(range(4))
+        self.tables = []
 
         # pg0 and 1 are IP configured in VRF 0 and 1.
         # pg2 and 3 are non IP-configured in VRF 0 and 1
+        table_id = 0
+        for table_id in range(1, 4):
+            tbl4 = VppIpTable(self, table_id)
+            tbl4.add_vpp_config()
+            self.tables.append(tbl4)
+            tbl6 = VppIpTable(self, table_id, is_ip6=1)
+            tbl6.add_vpp_config()
+            self.tables.append(tbl6)
+
         table_id = 0
         for i in self.pg_interfaces[:2]:
             i.admin_up()
@@ -56,11 +67,15 @@ class TestDHCP(VppTestCase):
             table_id += 1
 
     def tearDown(self):
-        super(TestDHCP, self).tearDown()
-        for i in self.pg_interfaces:
+        for i in self.pg_interfaces[:2]:
             i.unconfig_ip4()
             i.unconfig_ip6()
+
+        for i in self.pg_interfaces:
+            i.set_table_ip4(0)
+            i.set_table_ip6(0)
             i.admin_down()
+        super(TestDHCP, self).tearDown()
 
     def send_and_assert_no_replies(self, intf, pkts, remark):
         intf.add_stream(pkts)
@@ -196,6 +211,10 @@ class TestDHCP(VppTestCase):
         self.verify_dhcp_has_option(pkt, "hostname", hostname)
         if client_id:
             self.verify_dhcp_has_option(pkt, "client_id", client_id)
+        bootp = pkt[BOOTP]
+        self.assertEqual(bootp.ciaddr, "0.0.0.0")
+        self.assertEqual(bootp.giaddr, "0.0.0.0")
+        self.assertEqual(bootp.flags, 0x8000)
 
     def verify_orig_dhcp_request(self, pkt, intf, hostname, ip):
         self.verify_orig_dhcp_pkt(pkt, intf)
@@ -203,6 +222,10 @@ class TestDHCP(VppTestCase):
         self.verify_dhcp_msg_type(pkt, "request")
         self.verify_dhcp_has_option(pkt, "hostname", hostname)
         self.verify_dhcp_has_option(pkt, "requested_addr", ip)
+        bootp = pkt[BOOTP]
+        self.assertEqual(bootp.ciaddr, "0.0.0.0")
+        self.assertEqual(bootp.giaddr, "0.0.0.0")
+        self.assertEqual(bootp.flags, 0x8000)
 
     def verify_relayed_dhcp_discover(self, pkt, intf, src_intf=None,
                                      fib_id=0, oui=0,
@@ -659,6 +682,8 @@ class TestDHCP(VppTestCase):
                                         "DHCP cleanup VRF 0")
         self.send_and_assert_no_replies(self.pg3, pkts_disc_vrf1,
                                         "DHCP cleanup VRF 1")
+        self.pg2.unconfig_ip4()
+        self.pg3.unconfig_ip4()
 
     def test_dhcp6_proxy(self):
         """ DHCPv6 Proxy"""
@@ -1037,6 +1062,8 @@ class TestDHCP(VppTestCase):
                                     server_table_id=0,
                                     is_ipv6=1,
                                     is_add=0)
+        self.pg2.unconfig_ip6()
+        self.pg3.unconfig_ip6()
 
     def test_dhcp_client(self):
         """ DHCP Client"""
@@ -1057,14 +1084,15 @@ class TestDHCP(VppTestCase):
         #
         # Sned back on offer, expect the request
         #
-        p = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
-             IP(src=self.pg2.remote_ip4, dst="255.255.255.255") /
-             UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT) /
-             BOOTP(op=1,
-                   yiaddr=self.pg2.local_ip4) /
-             DHCP(options=[('message-type', 'offer'), ('end')]))
+        p_offer = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
+                   IP(src=self.pg2.remote_ip4, dst="255.255.255.255") /
+                   UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT) /
+                   BOOTP(op=1, yiaddr=self.pg2.local_ip4) /
+                   DHCP(options=[('message-type', 'offer'),
+                                 ('server_id', self.pg2.remote_ip4),
+                                 ('end')]))
 
-        self.pg2.add_stream(p)
+        self.pg2.add_stream(p_offer)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
@@ -1075,19 +1103,18 @@ class TestDHCP(VppTestCase):
         #
         # Send an acknowloedgement
         #
-        p = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
-             IP(src=self.pg2.remote_ip4, dst="255.255.255.255") /
-             UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT) /
-             BOOTP(op=1,
-                   yiaddr=self.pg2.local_ip4) /
-             DHCP(options=[('message-type', 'ack'),
-                           ('subnet_mask', "255.255.255.0"),
-                           ('router', self.pg2.remote_ip4),
-                           ('server_id', self.pg2.remote_ip4),
-                           ('lease_time', 43200),
-                           ('end')]))
-
-        self.pg2.add_stream(p)
+        p_ack = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
+                 IP(src=self.pg2.remote_ip4, dst="255.255.255.255") /
+                 UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT) /
+                 BOOTP(op=1, yiaddr=self.pg2.local_ip4) /
+                 DHCP(options=[('message-type', 'ack'),
+                               ('subnet_mask', "255.255.255.0"),
+                               ('router', self.pg2.remote_ip4),
+                               ('server_id', self.pg2.remote_ip4),
+                               ('lease_time', 43200),
+                               ('end')]))
+
+        self.pg2.add_stream(p_ack)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
 
@@ -1103,11 +1130,12 @@ class TestDHCP(VppTestCase):
         # At the end of this procedure there should be a connected route
         # in the FIB
         #
+        self.assertTrue(find_route(self, self.pg2.local_ip4, 24))
         self.assertTrue(find_route(self, self.pg2.local_ip4, 32))
 
         # remove the left over ARP entry
         self.vapi.ip_neighbor_add_del(self.pg2.sw_if_index,
-                                      self.pg2.remote_mac,
+                                      mactobinary(self.pg2.remote_mac),
                                       self.pg2.remote_ip4,
                                       is_add=0)
         #
@@ -1119,10 +1147,14 @@ class TestDHCP(VppTestCase):
         # and now the route should be gone
         #
         self.assertFalse(find_route(self, self.pg2.local_ip4, 32))
+        self.assertFalse(find_route(self, self.pg2.local_ip4, 24))
 
         #
-        # Start the procedure again. this time have VPP send the clientiid
+        # Start the procedure again. this time have VPP send the client-ID
         #
+        self.pg2.admin_down()
+        self.sleep(1)
+        self.pg2.admin_up()
         self.vapi.dhcp_client(self.pg2.sw_if_index, hostname,
                               client_id=self.pg2.local_mac)
 
@@ -1131,10 +1163,47 @@ class TestDHCP(VppTestCase):
         self.verify_orig_dhcp_discover(rx[0], self.pg2, hostname,
                                        self.pg2.local_mac)
 
+        self.pg2.add_stream(p_offer)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        rx = self.pg2.get_capture(1)
+        self.verify_orig_dhcp_request(rx[0], self.pg2, hostname,
+                                      self.pg2.local_ip4)
+
+        #
+        # unicast the ack to the offered address
+        #
+        p_ack = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
+                 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
+                 UDP(sport=DHCP4_SERVER_PORT, dport=DHCP4_CLIENT_PORT) /
+                 BOOTP(op=1, yiaddr=self.pg2.local_ip4) /
+                 DHCP(options=[('message-type', 'ack'),
+                               ('subnet_mask', "255.255.255.0"),
+                               ('router', self.pg2.remote_ip4),
+                               ('server_id', self.pg2.remote_ip4),
+                               ('lease_time', 43200),
+                               ('end')]))
+
+        self.pg2.add_stream(p_ack)
+        self.pg_enable_capture(self.pg_interfaces)
+        self.pg_start()
+
+        #
+        # At the end of this procedure there should be a connected route
+        # in the FIB
+        #
+        self.assertTrue(find_route(self, self.pg2.local_ip4, 32))
+        self.assertTrue(find_route(self, self.pg2.local_ip4, 24))
+
         #
         # remove the DHCP config
         #
         self.vapi.dhcp_client(self.pg2.sw_if_index, hostname, is_add=0)
 
+        self.assertFalse(find_route(self, self.pg2.local_ip4, 32))
+        self.assertFalse(find_route(self, self.pg2.local_ip4, 24))
+
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)