X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Ftraffic_scripts%2Fipv6_nd_proxy_check.py;fp=resources%2Ftraffic_scripts%2Fipv6_nd_proxy_check.py;h=0000000000000000000000000000000000000000;hp=1d96050cf43a045e127166685d6a09665d9d4a09;hb=d68951ac245150eeefa6e0f4156e4c1b5c9e9325;hpb=ed0258a440cfad7023d643f717ab78ac568dc59b diff --git a/resources/traffic_scripts/ipv6_nd_proxy_check.py b/resources/traffic_scripts/ipv6_nd_proxy_check.py deleted file mode 100755 index 1d96050cf4..0000000000 --- a/resources/traffic_scripts/ipv6_nd_proxy_check.py +++ /dev/null @@ -1,207 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends DHCPv6 proxy packets.""" - -from scapy.layers.inet import Ether -from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS -from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply - - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip): - """Send ICMPv6 Neighbor Solicitation packet and expect a response - from the proxy. - - :param tx_if: Interface on TG. - :param src_mac: MAC address of TG interface. - :param dst_mac: MAC address of proxy interface. - :param src_ip: IP address of TG interface. - :param dst_ip: IP address of proxied interface. - :type tx_if: str - :type src_mac: str - :type dst_mac: str - :type src_ip: str - :type dst_ip: str - :raises RuntimeError: If the received packet is not correct. - """ - - rxq = RxQueue(tx_if) - txq = TxQueue(tx_if) - - sent_packets = [] - - icmpv6nd_solicit_pkt = (Ether(src=src_mac, dst=dst_mac) / - IPv6(src=src_ip) / - ICMPv6ND_NS(tgt=dst_ip)) - - sent_packets.append(icmpv6nd_solicit_pkt) - txq.send(icmpv6nd_solicit_pkt) - - ether = None - for _ in range(5): - ether = rxq.recv(3, ignore=sent_packets) - if not ether: - continue - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue in case of ICMPv6ND_NS packet - continue - else: - # otherwise process the current packet - break - - if ether is None: - raise RuntimeError('ICMPv6ND Proxy response timeout.') - - if ether.src != dst_mac: - raise RuntimeError("Source MAC address error: {} != {}". - format(ether.src, dst_mac)) - print "Source MAC address: OK." - - if ether.dst != src_mac: - raise RuntimeError("Destination MAC address error: {} != {}". - format(ether.dst, src_mac)) - print "Destination MAC address: OK." - - if ether[IPv6].src != dst_ip: - raise RuntimeError("Source IP address error: {} != {}". - format(ether[IPv6].src, dst_ip)) - print "Source IP address: OK." - - if ether[IPv6].dst != src_ip: - raise RuntimeError("Destination IP address error: {} != {}". - format(ether[IPv6].dst, src_ip)) - print "Destination IP address: OK." - - try: - target_addr = ether[IPv6][ICMPv6ND_NA].tgt - except (KeyError, AttributeError): - raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.") - - if target_addr != dst_ip: - raise RuntimeError("ICMPv6 field 'Target address' error: {} != {}". - format(target_addr, dst_ip)) - print "Target address field: OK." - - -def ipv6_ping(src_if, dst_if, src_mac, dst_mac, - proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip): - """Sends ICMPv6 Echo Request, receive it and send a reply. - - :param src_if: First TG interface on link to DUT. - :param dst_if: Second TG interface on link to DUT. - :param src_mac: MAC address of first interface. - :param dst_mac: MAC address of second interface. - :param proxy_to_src_mac: MAC address of first proxy interface on DUT. - :param proxy_to_dst_mac: MAC address of second proxy interface on DUT. - :param src_ip: IP address of first interface. - :param dst_ip: IP address of second interface. - :type src_if: str - :type dst_if: str - :type src_mac: str - :type dst_mac: str - :type proxy_to_src_mac: str - :type proxy_to_dst_mac: str - :type src_ip: str - :type dst_ip: str - :raises RuntimeError: If a received packet is not correct. - """ - rxq = RxQueue(dst_if) - txq = TxQueue(src_if) - - icmpv6_ping_pkt = (Ether(src=src_mac, dst=proxy_to_src_mac) / - IPv6(src=src_ip, dst=dst_ip) / - ICMPv6EchoRequest()) - - txq.send(icmpv6_ping_pkt) - - ether = None - while True: - ether = rxq.recv(3) - if not ether: - continue - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue in case of ICMPv6ND_NS packet - continue - else: - # otherwise process the current packet - break - - if ether is None: - raise RuntimeError('ICMPv6 Echo Request timeout.') - try: - ether[IPv6]["ICMPv6 Echo Request"] - except KeyError: - raise RuntimeError("Received packet is not an ICMPv6 Echo Request.") - print "ICMP Echo: OK." - - rxq = RxQueue(src_if) - txq = TxQueue(dst_if) - - icmpv6_ping_pkt = (Ether(src=dst_mac, dst=proxy_to_dst_mac) / - IPv6(src=dst_ip, dst=src_ip) / - ICMPv6EchoReply()) - - txq.send(icmpv6_ping_pkt) - - ether = None - while True: - ether = rxq.recv(3) - if not ether: - continue - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue in case of ICMPv6ND_NS packet - continue - else: - # otherwise process the current packet - break - - if ether is None: - raise RuntimeError('DHCPv6 SOLICIT timeout') - try: - ether[IPv6]["ICMPv6 Echo Reply"] - except KeyError: - raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.") - - print "ICMP Reply: OK." - - -def main(): - """Send DHCPv6 proxy messages.""" - - args = TrafficScriptArg(['src_ip', 'dst_ip', 'src_mac', 'dst_mac', - 'proxy_to_src_mac', 'proxy_to_dst_mac']) - - src_if = args.get_arg('tx_if') - dst_if = args.get_arg('rx_if') - src_ip = args.get_arg("src_ip") - dst_ip = args.get_arg("dst_ip") - src_mac = args.get_arg('src_mac') - dst_mac = args.get_arg('dst_mac') - proxy_to_src_mac = args.get_arg('proxy_to_src_mac') - proxy_to_dst_mac = args.get_arg('proxy_to_dst_mac') - - # Neighbor solicitation - imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip) - - # Verify route (ICMP echo/reply) - ipv6_ping(src_if, dst_if, src_mac, dst_mac, proxy_to_src_mac, - proxy_to_dst_mac, src_ip, dst_ip) - - -if __name__ == "__main__": - main()