Python3: PIP requirement
[csit.git] / resources / traffic_scripts / dhcp / send_dhcp_v6_messages.py
index 21357ad..0974256 100755 (executable)
@@ -30,6 +30,32 @@ def _check_udp_checksum(pkt):
     return new['UDP'].chksum == pkt['UDP'].chksum
 
 
+def _get_dhcpv6_msgtype(msg_index):
+    """Return DHCPv6 message type string.
+
+    :param msg_index: Index of message type.
+    :return: Message type.
+    :type msg_index: int
+    :rtype msg_str: str
+    """
+    dhcp6_messages = {
+        1: "SOLICIT",
+        2: "ADVERTISE",
+        3: "REQUEST",
+        4: "CONFIRM",
+        5: "RENEW",
+        6: "REBIND",
+        7: "REPLY",
+        8: "RELEASE",
+        9: "DECLINE",
+        10: "RECONFIGURE",
+        11: "INFORMATION-REQUEST",
+        12: "RELAY-FORW",
+        13: "RELAY-REPL"
+    }
+    return dhcp6_messages[msg_index]
+
+
 def dhcpv6_solicit(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip,
                    server_ip, server_mac, client_duid, client_mac):
     """Send and check DHCPv6 SOLICIT proxy packet.
@@ -77,27 +103,32 @@ def dhcpv6_solicit(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip,
         raise RuntimeError('DHCPv6 SOLICIT timeout')
 
     if ether.dst != server_mac:
-        raise RuntimeError("Destination MAC address error!")
+        raise RuntimeError("Destination MAC address error: {} != {}".format(
+            ether.dst, server_mac))
     print "Destination MAC address: OK."
 
     if ether['IPv6'].src != proxy_ip:
-        raise RuntimeError("Source IP address error!")
+        raise RuntimeError("Source IP address error: {} != {}".format(
+            ether['IPv6'].src, proxy_ip))
     print "Source IP address: OK."
 
     if ether['IPv6'].dst != server_ip:
-        raise RuntimeError("Destination IP address error!")
+        raise RuntimeError("Destination IP address error: {} != {}".format(
+            ether['IPv6'].dst, server_ip))
     print "Destination IP address: OK."
 
-    if ether['IPv6']['UDP']\
-        ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)']:
-        print "Relay Agent/Server Message: OK."
-    else:
-        raise RuntimeError("Relay Agent/Server Message error.")
+    msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP']
+        ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].msgtype)
+    if msgtype != 'RELAY-FORW':
+        raise RuntimeError("Message type error: {} != RELAY-FORW".format(
+            msgtype))
+    print "Message type: OK."
 
-    if ether['IPv6']['UDP']\
-        ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)']\
-        .linkaddr != proxy_ip:
-        raise RuntimeError("Proxy IP address error!")
+    linkaddr = ether['IPv6']['UDP']\
+        ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].linkaddr
+    if linkaddr != proxy_ip:
+        raise RuntimeError("Proxy IP address error: {} != {}".format(
+           linkaddr, proxy_ip))
     print "Proxy IP address: OK."
 
     try:
@@ -156,15 +187,19 @@ def dhcpv6_advertise(rx_if, tx_if, link_local_ip, proxy_ip,
         raise RuntimeError('DHCPv6 ADVERTISE timeout')
 
     if ether['IPv6'].src != proxy_ip:
-        raise RuntimeError("Source IP address error!")
+        raise RuntimeError("Source IP address error: {} != {}".format(
+            ether['IPv6'].src, proxy_ip))
     print "Source IP address: OK."
 
     if not _check_udp_checksum(ether['IPv6']):
         raise RuntimeError("Checksum error!")
     print "Checksum: OK."
 
-    if ether['IPv6']['UDP']['DHCPv6 Advertise Message'].msgtype != 'ADVERTISE':
-        raise RuntimeError("Message type error!")
+    msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP']
+                                  ['DHCPv6 Advertise Message'].msgtype)
+    if msgtype != 'ADVERTISE':
+        raise RuntimeError("Message type error: {} != ADVERTISE".format(
+            msgtype))
     print "Message type: OK."
 
 
@@ -211,23 +246,27 @@ def dhcpv6_request(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip,
         raise RuntimeError('DHCPv6 REQUEST timeout')
 
     if ether['IPv6'].src != proxy_ip:
-        raise RuntimeError("Source IP address error!")
+        raise RuntimeError("Source IP address error: {} != {}".format(
+            ether['IPv6'].src, proxy_ip))
     print "Source IP address: OK."
 
     if ether['IPv6'].dst != server_ip:
-        raise RuntimeError("Destination IP address error!")
+        raise RuntimeError("Destination IP address error: {} != {}".format(
+            ether['IPv6'].dst, server_ip))
     print "Destination IP address: OK."
 
-    if ether['IPv6']['UDP']\
-        ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)']:
-        print "Relay Agent/Server Message: OK."
-    else:
-        raise RuntimeError("Relay Agent/Server Message error.")
+    msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP']
+        ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].msgtype)
+    if msgtype != 'RELAY-FORW':
+        raise RuntimeError("Message type error: {} != RELAY-FORW".format(
+            msgtype))
+    print "Message type: OK."
 
-    if ether['IPv6']['UDP']\
-            ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)']\
-            .linkaddr != proxy_ip:
-        raise RuntimeError("Proxy IP address error!")
+    linkaddr = ether['IPv6']['UDP']\
+        ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].linkaddr
+    if linkaddr != proxy_ip:
+        raise RuntimeError("Proxy IP address error: {} != {}".format(
+           linkaddr, proxy_ip))
     print "Proxy IP address: OK."
 
 
@@ -275,15 +314,18 @@ def dhcpv6_reply(rx_if, tx_if, link_local_ip, proxy_ip, server_ip, server_mac,
         raise RuntimeError('DHCPv6 REPLY timeout')
 
     if ether['IPv6'].src != proxy_ip:
-        raise RuntimeError("Source IP address error!")
+        raise RuntimeError("Source IP address error: {} != {}".format(
+            ether['IPv6'].src, proxy_ip))
     print "Source IP address: OK."
 
     if not _check_udp_checksum(ether['IPv6']):
         raise RuntimeError("Checksum error!")
     print "Checksum: OK."
 
-    if ether['IPv6']['UDP']['DHCPv6 Reply Message'].msgtype != 'REPLY':
-        raise RuntimeError("Message type error!")
+    msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP']
+                                  ['DHCPv6 Reply Message'].msgtype)
+    if msgtype != 'REPLY':
+        raise RuntimeError("Message type error: {} != REPLY".format(msgtype))
     print "Message type: OK."