X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Ftraffic_scripts%2Fspan_check.py;fp=resources%2Ftraffic_scripts%2Fspan_check.py;h=0000000000000000000000000000000000000000;hp=cbf65d3aaf371ad5b2f01218efb919cbee33e630;hb=25e203b678e1cfd3691a450050115c66813165c3;hpb=10e0393fde6d919cf0e5848bc5e506d981642ef8 diff --git a/resources/traffic_scripts/span_check.py b/resources/traffic_scripts/span_check.py deleted file mode 100755 index cbf65d3aaf..0000000000 --- a/resources/traffic_scripts/span_check.py +++ /dev/null @@ -1,215 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2016 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface -to the other. Source and destination IP addresses and source and destination -MAC addresses are checked in received packet. -""" - -import sys -import ipaddress - -from scapy.layers.inet import IP, ICMP, ARP -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS -from scapy.layers.l2 import Ether - -from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad -from resources.libraries.python.TrafficScriptArg import TrafficScriptArg - - -def valid_ipv4(address): - """Check if IP address has the correct IPv4 address format. - - :param address: IP address. - :type address: str - :return: True in case of correct IPv4 address format, - otherwise return false. - :rtype: bool - """ - try: - ipaddress.IPv4Address(unicode(address)) - return True - except (AttributeError, ipaddress.AddressValueError): - return False - - -def valid_ipv6(address): - """Check if IP address has the correct IPv6 address format. - - :param address: IP address. - :type address: str - :return: True in case of correct IPv6 address format, - otherwise return false. - :rtype: bool - """ - try: - ipaddress.IPv6Address(unicode(address)) - return True - except (AttributeError, ipaddress.AddressValueError): - return False - - -def main(): - """Send a simple L2 or ICMP packet from one TG interface to DUT, then - receive a copy of the packet on the second TG interface, and a copy of - the ICMP reply.""" - args = TrafficScriptArg(['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', - 'ptype']) - - src_mac = args.get_arg('tg_src_mac') - dst_mac = args.get_arg('dut_if1_mac') - src_ip = args.get_arg('src_ip') - dst_ip = args.get_arg('dst_ip') - tx_if = args.get_arg('tx_if') - rx_if = args.get_arg('rx_if') - ptype = args.get_arg('ptype') - - rxq_mirrored = RxQueue(rx_if) - rxq_tx = RxQueue(tx_if) - txq = TxQueue(tx_if) - - sent = [] - - if ptype == "ARP": - pkt_raw = (Ether(src=src_mac, dst=dst_mac) / - ARP(hwsrc=src_mac, hwdst="00:00:00:00:00:00", - psrc=src_ip, pdst=dst_ip, op="who-has")) - elif ptype == "ICMP": - if valid_ipv4(src_ip) and valid_ipv4(dst_ip): - pkt_raw = (Ether(src=src_mac, dst=dst_mac) / - IP(src=src_ip, dst=dst_ip) / - ICMP(type="echo-request")) - else: - raise ValueError("IP addresses not in correct format") - elif ptype == "ICMPv6": - if valid_ipv6(src_ip) and valid_ipv6(dst_ip): - pkt_raw = (Ether(src=src_mac, dst=dst_mac) / - IPv6(src=src_ip, dst=dst_ip) / - ICMPv6EchoRequest()) - else: - raise ValueError("IPv6 addresses not in correct format") - else: - raise RuntimeError("Unexpected payload type.") - - txq.send(pkt_raw) - sent.append(auto_pad(pkt_raw)) - - # Receive copy of Rx packet. - while True: - ether = rxq_mirrored.recv(2) - if ether is None: - raise RuntimeError("Rx timeout of mirrored Rx packet") - - if ether.haslayer(ICMPv6ND_NS): - # read another packet in the queue if the current one is ICMPv6ND_NS - continue - else: - # otherwise process the current packet - break - - pkt = auto_pad(pkt_raw) - if str(ether) != str(pkt): - print("Mirrored Rx packet doesn't match the original Rx packet.") - if ether.src != src_mac or ether.dst != dst_mac: - raise RuntimeError("MAC mismatch in mirrored Rx packet.") - if ptype == "ARP": - if not ether.haslayer(ARP): - raise RuntimeError("Mirrored Rx packet is not an ARP packet.") - if ether['ARP'].op != 1: # 1=who-has - raise RuntimeError("Mirrored Rx packet is not an ARP request.") - if ether['ARP'].hwsrc != src_mac or ether['ARP'].hwdst != dst_mac: - raise RuntimeError("MAC mismatch in mirrored Rx ARP packet.") - if ether['ARP'].psrc != src_ip or ether['ARP'].pdst != dst_ip: - raise RuntimeError("IP address mismatch in mirrored " - "Rx ARP packet.") - elif ptype == "ICMP": - if not ether.haslayer(IP): - raise RuntimeError("Mirrored Rx packet is not an IPv4 packet.") - if ether['IP'].src != src_ip or ether['IP'].dst != dst_ip: - raise RuntimeError("IP address mismatch in mirrored " - "Rx IPv4 packet.") - if not ether.haslayer(ICMP): - raise RuntimeError("Mirrored Rx packet is not an ICMP packet.") - if ether['ICMP'].type != 8: # 8=echo-request - raise RuntimeError("Mirrored Rx packet is not an ICMP " - "echo request.") - elif ptype == "ICMPv6": - if not ether.haslayer(IPv6): - raise RuntimeError("Mirrored Rx packet is not an IPv6 packet.") - if ether['IPv6'].src != src_ip or ether['IPv6'].dst != dst_ip: - raise RuntimeError("IP address mismatch in mirrored " - "Rx IPv6 packet.") - if not ether.haslayer(ICMPv6EchoRequest): - raise RuntimeError("Mirrored Rx packet is not an ICMPv6 " - "echo request.") - print("Mirrored Rx packet check OK.\n") - - # Receive reply on TG Tx port. - ether_repl = rxq_tx.recv(2, sent) - - if ether_repl is None: - raise RuntimeError("Reply not received on TG Tx port.") - - print("Reply received on TG Tx port.\n") - - # Receive copy of Tx packet. - ether = rxq_mirrored.recv(2) - if ether is None: - raise RuntimeError("Rx timeout of mirrored Tx packet") - - if str(ether) != str(ether_repl): - print("Mirrored Tx packet doesn't match the received Tx packet.") - if ether.src != ether_repl.src or ether.dst != ether_repl.dst: - raise RuntimeError("MAC mismatch in mirrored Tx packet.") - if ptype == "ARP": - if not ether.haslayer(ARP): - raise RuntimeError("Mirrored Tx packet is not an ARP packet.") - if ether['ARP'].op != ether_repl['ARP'].op: # 2=is_at - raise RuntimeError("ARP operational code mismatch " - "in mirrored Tx packet.") - if ether['ARP'].hwsrc != ether_repl['ARP'].hwsrc\ - or ether['ARP'].hwdst != ether_repl['ARP'].hwdst: - raise RuntimeError("MAC mismatch in mirrored Tx ARP packet.") - if ether['ARP'].psrc != ether_repl['ARP'].psrc\ - or ether['ARP'].pdst != ether_repl['ARP'].pdst: - raise RuntimeError("IP address mismatch in mirrored " - "Tx ARP packet.") - elif ptype == "ICMP": - if not ether.haslayer(IP): - raise RuntimeError("Mirrored Tx packet is not an IPv4 packet.") - if ether['IP'].src != ether_repl['IP'].src\ - or ether['IP'].dst != ether_repl['IP'].dst: - raise RuntimeError("IP address mismatch in mirrored " - "Tx IPv4 packet.") - if not ether.haslayer(ICMP): - raise RuntimeError("Mirrored Tx packet is not an ICMP packet.") - if ether['ICMP'].type != ether_repl['ICMP'].type: # 0=echo-reply - raise RuntimeError("ICMP packet type mismatch " - "in mirrored Tx packet.") - elif ptype == "ICMPv6": - if not ether.haslayer(IPv6): - raise RuntimeError("Mirrored Tx packet is not an IPv6 packet.") - if ether['IPv6'].src != ether_repl['IPv6'].src\ - or ether['IPv6'].dst != ether_repl['IPv6'].dst: - raise RuntimeError("IP address mismatch in mirrored " - "Tx IPv6 packet.") - if ether[2].name != ether_repl[2].name: - raise RuntimeError("ICMPv6 message type mismatch " - "in mirrored Tx packet.") - print("Mirrored Tx packet check OK.\n") - sys.exit(0) - - -if __name__ == "__main__": - main()