#!/usr/bin/env python
# Copyright (c) 2016 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Traffic script that exercises an ICMPv6 Neighbor Discovery proxy.

It sends an ICMPv6 Neighbor Solicitation and verifies the answer, then sends
an ICMPv6 Echo Request in one direction and an ICMPv6 Echo Reply in the other
and checks that each one is received on the opposite interface.
"""

from scapy.layers.inet import Ether
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\
    ICMPv6ND_NS

from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg


def _receive_packet(rxq, timeout_msg, **recv_kwargs):
    """Poll the receive queue until a packet arrives or attempts run out.

    :param rxq: Queue to receive on.
    :param timeout_msg: Message of the error raised if nothing is received.
    :param recv_kwargs: Extra keyword arguments forwarded to rxq.recv().
    :type rxq: RxQueue
    :type timeout_msg: str
    :type recv_kwargs: dict
    :returns: First value returned by rxq.recv() that is not None.
    :raises RuntimeError: If all 5 attempts return None.
    """
    for _ in range(5):
        # NOTE(review): 3 is presumably the per-attempt timeout in seconds;
        # confirm against RxQueue.recv().
        pkt = rxq.recv(3, **recv_kwargs)
        if pkt is not None:
            return pkt
    raise RuntimeError(timeout_msg)


def _check_field(name, actual, expected):
    """Compare a received header field with its expected value.

    :param name: Human readable field name used in the messages.
    :param actual: Value found in the received packet.
    :param expected: Value the field is required to have.
    :type name: str
    :type actual: str
    :type expected: str
    :raises RuntimeError: If the two values differ.
    """
    if actual != expected:
        raise RuntimeError("{} error: {} != {}".format(
            name, actual, expected))
    print("{}: OK.".format(name))


def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip):
    """Send ICMPv6 Neighbor Solicitation packet and expect a response
     from the proxy.

    The response has to come from the proxy MAC address and from the
    proxied IP address, be addressed back to the TG interface, and carry
    the proxied IP address in the Neighbor Advertisement target field.

    :param tx_if: Interface on TG.
    :param src_mac: MAC address of TG interface.
    :param dst_mac: MAC address of proxy interface.
    :param src_ip: IP address of TG interface.
    :param dst_ip: IP address of proxied interface.
    :type tx_if: str
    :type src_mac: str
    :type dst_mac: str
    :type src_ip: str
    :type dst_ip: str
    :raises RuntimeError: If no packet is received or the received packet
        is not correct.
    """
    rxq = RxQueue(tx_if)
    txq = TxQueue(tx_if)

    icmpv6nd_solicit_pkt = (Ether(src=src_mac, dst=dst_mac) /
                            IPv6(src=src_ip) /
                            ICMPv6ND_NS(tgt=dst_ip))

    # Sending and receiving share one interface, so the sent packet is
    # passed to recv() to be ignored.
    sent_packets = [icmpv6nd_solicit_pkt]
    txq.send(icmpv6nd_solicit_pkt)

    ether = _receive_packet(rxq, 'ICMPv6ND Proxy response timeout.',
                            ignore=sent_packets)

    _check_field("Source MAC address", ether.src, dst_mac)
    _check_field("Destination MAC address", ether.dst, src_mac)
    _check_field("Source IP address", ether['IPv6'].src, dst_ip)
    _check_field("Destination IP address", ether['IPv6'].dst, src_ip)

    try:
        target_addr = ether['IPv6'][
            'ICMPv6 Neighbor Discovery - Neighbor Advertisement'].tgt
    except (KeyError, IndexError, AttributeError):
        # Scapy reports a missing layer with IndexError.
        raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.")

    if target_addr != dst_ip:
        raise RuntimeError("ICMPv6 field 'Target address' error:"
                           " {} != {}".format(target_addr, dst_ip))
    print("Target address field: OK.")


def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
              proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip):
    """Send ICMPv6 Echo Request, receive it, then send a reply and
    receive it on the first interface.

    :param src_if: First TG interface on link to DUT.
    :param dst_if: Second TG interface on link to DUT.
    :param src_mac: MAC address of first interface.
    :param dst_mac: MAC address of second interface.
    :param proxy_to_src_mac: MAC address of first proxy interface on DUT.
    :param proxy_to_dst_mac: MAC address of second proxy interface on DUT.
    :param src_ip: IP address of first interface.
    :param dst_ip: IP address of second interface.
    :type src_if: str
    :type dst_if: str
    :type src_mac: str
    :type dst_mac: str
    :type proxy_to_src_mac: str
    :type proxy_to_dst_mac: str
    :type src_ip: str
    :type dst_ip: str
    :raises RuntimeError: If no packet is received or a received packet
        is not correct.
    """
    # Echo Request: first interface -> second interface.
    rxq = RxQueue(dst_if)
    txq = TxQueue(src_if)

    echo_request_pkt = (Ether(src=src_mac, dst=proxy_to_src_mac) /
                        IPv6(src=src_ip, dst=dst_ip) /
                        ICMPv6EchoRequest())
    txq.send(echo_request_pkt)

    ether = _receive_packet(rxq, 'ICMPv6 Echo Request timeout.')
    try:
        ether["IPv6"]["ICMPv6 Echo Request"]
    except (KeyError, IndexError):
        # Scapy reports a missing layer with IndexError.
        raise RuntimeError("Received packet is not an ICMPv6 Echo Request.")
    print("ICMP Echo: OK.")

    # Echo Reply: second interface -> first interface.
    rxq = RxQueue(src_if)
    txq = TxQueue(dst_if)

    echo_reply_pkt = (Ether(src=dst_mac, dst=proxy_to_dst_mac) /
                      IPv6(src=dst_ip, dst=src_ip) /
                      ICMPv6EchoReply())
    txq.send(echo_reply_pkt)

    ether = _receive_packet(rxq, 'ICMPv6 Echo Reply timeout.')
    try:
        ether["IPv6"]["ICMPv6 Echo Reply"]
    except (KeyError, IndexError):
        raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.")
    print("ICMP Reply: OK.")


def main():
    """Read the script arguments and run the ICMPv6 ND proxy checks."""
    args = TrafficScriptArg(['src_ip', 'dst_ip', 'src_mac', 'dst_mac',
                             'proxy_to_src_mac', 'proxy_to_dst_mac'])

    src_if = args.get_arg('tx_if')
    dst_if = args.get_arg('rx_if')
    src_ip = args.get_arg("src_ip")
    dst_ip = args.get_arg("dst_ip")
    src_mac = args.get_arg('src_mac')
    dst_mac = args.get_arg('dst_mac')
    proxy_to_src_mac = args.get_arg('proxy_to_src_mac')
    proxy_to_dst_mac = args.get_arg('proxy_to_dst_mac')

    # Neighbor solicitation
    imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip)

    # Verify route (ICMP echo/reply)
    ipv6_ping(src_if, dst_if, src_mac, dst_mac,
              proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip)


if __name__ == "__main__":
    main()