X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=resources%2Ftraffic_scripts%2Fdhcp%2Fsend_dhcp_v6_messages.py;h=09742565ee1add3c8934aaf961a25b21e49ba1ce;hb=df5ed0d9317e2a1d4a7248459cc6aec39f80a052;hp=d570a8c8aa413ed018d9f125980d5c6e529db2b7;hpb=53ce7d17816de6516f3c21bd01835b8c1c474c79;p=csit.git diff --git a/resources/traffic_scripts/dhcp/send_dhcp_v6_messages.py b/resources/traffic_scripts/dhcp/send_dhcp_v6_messages.py index d570a8c8aa..09742565ee 100755 --- a/resources/traffic_scripts/dhcp/send_dhcp_v6_messages.py +++ b/resources/traffic_scripts/dhcp/send_dhcp_v6_messages.py @@ -30,6 +30,32 @@ def _check_udp_checksum(pkt): return new['UDP'].chksum == pkt['UDP'].chksum +def _get_dhcpv6_msgtype(msg_index): + """Return DHCPv6 message type string. + + :param msg_index: Index of message type. + :return: Message type. + :type msg_index: int + :rtype msg_str: str + """ + dhcp6_messages = { + 1: "SOLICIT", + 2: "ADVERTISE", + 3: "REQUEST", + 4: "CONFIRM", + 5: "RENEW", + 6: "REBIND", + 7: "REPLY", + 8: "RELEASE", + 9: "DECLINE", + 10: "RECONFIGURE", + 11: "INFORMATION-REQUEST", + 12: "RELAY-FORW", + 13: "RELAY-REPL" + } + return dhcp6_messages[msg_index] + + def dhcpv6_solicit(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip, server_ip, server_mac, client_duid, client_mac): """Send and check DHCPv6 SOLICIT proxy packet. @@ -77,27 +103,32 @@ def dhcpv6_solicit(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip, raise RuntimeError('DHCPv6 SOLICIT timeout') if ether.dst != server_mac: - raise RuntimeError("Destination MAC address error!") + raise RuntimeError("Destination MAC address error: {} != {}".format( + ether.dst, server_mac)) print "Destination MAC address: OK." if ether['IPv6'].src != proxy_ip: - raise RuntimeError("Source IP address error!") + raise RuntimeError("Source IP address error: {} != {}".format( + ether['IPv6'].src, proxy_ip)) print "Source IP address: OK." if ether['IPv6'].dst != server_ip: - raise RuntimeError("Destination IP address error!") + raise RuntimeError("Destination IP address error: {} != {}".format( + ether['IPv6'].dst, server_ip)) print "Destination IP address: OK." - if ether['IPv6']['UDP']\ - ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)']: - print "Relay Agent/Server Message: OK." - else: - raise RuntimeError("Relay Agent/Server Message error.") - - if ether['IPv6']['UDP']\ - ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)']\ - .linkaddr != proxy_ip: - raise RuntimeError("Proxy IP address error!") + msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP'] + ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].msgtype) + if msgtype != 'RELAY-FORW': + raise RuntimeError("Message type error: {} != RELAY-FORW".format( + msgtype)) + print "Message type: OK." + + linkaddr = ether['IPv6']['UDP']\ + ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].linkaddr + if linkaddr != proxy_ip: + raise RuntimeError("Proxy IP address error: {} != {}".format( + linkaddr, proxy_ip)) print "Proxy IP address: OK." try: @@ -156,16 +187,20 @@ def dhcpv6_advertise(rx_if, tx_if, link_local_ip, proxy_ip, raise RuntimeError('DHCPv6 ADVERTISE timeout') if ether['IPv6'].src != proxy_ip: - raise RuntimeError("Source IP address error!") + raise RuntimeError("Source IP address error: {} != {}".format( + ether['IPv6'].src, proxy_ip)) print "Source IP address: OK." if not _check_udp_checksum(ether['IPv6']): raise RuntimeError("Checksum error!") print "Checksum: OK." - if ether['IPv6']['UDP']['Raw'].load != interface_id: - raise RuntimeError("Interface ID error!") - print "Interface ID: OK." + msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP'] + ['DHCPv6 Advertise Message'].msgtype) + if msgtype != 'ADVERTISE': + raise RuntimeError("Message type error: {} != ADVERTISE".format( + msgtype)) + print "Message type: OK." def dhcpv6_request(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip, @@ -211,23 +246,27 @@ def dhcpv6_request(tx_if, rx_if, dhcp_multicast_ip, link_local_ip, proxy_ip, raise RuntimeError('DHCPv6 REQUEST timeout') if ether['IPv6'].src != proxy_ip: - raise RuntimeError("Source IP address error!") + raise RuntimeError("Source IP address error: {} != {}".format( + ether['IPv6'].src, proxy_ip)) print "Source IP address: OK." if ether['IPv6'].dst != server_ip: - raise RuntimeError("Destination IP address error!") + raise RuntimeError("Destination IP address error: {} != {}".format( + ether['IPv6'].dst, server_ip)) print "Destination IP address: OK." - if ether['IPv6']['UDP']\ - ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)']: - print "Relay Agent/Server Message: OK." - else: - raise RuntimeError("Relay Agent/Server Message error.") - - if ether['IPv6']['UDP']\ - ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)']\ - .linkaddr != proxy_ip: - raise RuntimeError("Proxy IP address error!") + msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP'] + ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].msgtype) + if msgtype != 'RELAY-FORW': + raise RuntimeError("Message type error: {} != RELAY-FORW".format( + msgtype)) + print "Message type: OK." + + linkaddr = ether['IPv6']['UDP']\ + ['DHCPv6 Relay Forward Message (Relay Agent/Server Message)'].linkaddr + if linkaddr != proxy_ip: + raise RuntimeError("Proxy IP address error: {} != {}".format( + linkaddr, proxy_ip)) print "Proxy IP address: OK." @@ -275,16 +314,19 @@ def dhcpv6_reply(rx_if, tx_if, link_local_ip, proxy_ip, server_ip, server_mac, raise RuntimeError('DHCPv6 REPLY timeout') if ether['IPv6'].src != proxy_ip: - raise RuntimeError("Source IP address error!") + raise RuntimeError("Source IP address error: {} != {}".format( + ether['IPv6'].src, proxy_ip)) print "Source IP address: OK." if not _check_udp_checksum(ether['IPv6']): raise RuntimeError("Checksum error!") print "Checksum: OK." - if ether['IPv6']['UDP']['Raw'].load != interface_id: - raise RuntimeError("Interface ID error!") - print "Interface ID: OK." + msgtype = _get_dhcpv6_msgtype(ether['IPv6']['UDP'] + ['DHCPv6 Reply Message'].msgtype) + if msgtype != 'REPLY': + raise RuntimeError("Message type error: {} != REPLY".format(msgtype)) + print "Message type: OK." def main():