SPAN test - improvement of packet check
[csit.git] / resources / traffic_scripts / span_check.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface
16 to the other. Source and destination IP addresses and source and destination
17 MAC addresses are checked in received packet.
18 """
19
20 import sys
21 import ipaddress
22
23 from scapy.layers.inet import IP, ICMP, ARP
24 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
25 from scapy.layers.l2 import Ether
26
27 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad
28 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
29
30
31 def valid_ipv4(address):
32     """Check if IP address has the correct IPv4 address format.
33
34     :param address: IP address.
35     :type address: str
36     :return: True in case of correct IPv4 address format,
37     otherwise return false.
38     :rtype: bool
39     """
40     try:
41         ipaddress.IPv4Address(unicode(address))
42         return True
43     except (AttributeError, ipaddress.AddressValueError):
44         return False
45
46
47 def valid_ipv6(address):
48     """Check if IP address has the correct IPv6 address format.
49
50     :param address: IP address.
51     :type address: str
52     :return: True in case of correct IPv6 address format,
53     otherwise return false.
54     :rtype: bool
55     """
56     try:
57         ipaddress.IPv6Address(unicode(address))
58         return True
59     except (AttributeError, ipaddress.AddressValueError):
60         return False
61
62
63 def main():
64     """Send a simple L2 or ICMP packet from one TG interface to DUT, then
65     receive a copy of the packet on the second TG interface, and a copy of
66     the ICMP reply."""
67     args = TrafficScriptArg(
68         ['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', 'ptype'])
69
70     src_mac = args.get_arg('tg_src_mac')
71     dst_mac = args.get_arg('dut_if1_mac')
72     src_ip = args.get_arg('src_ip')
73     dst_ip = args.get_arg('dst_ip')
74     tx_if = args.get_arg('tx_if')
75     rx_if = args.get_arg('rx_if')
76     ptype = args.get_arg('ptype')
77
78     rxq_mirrored = RxQueue(rx_if)
79     rxq_tx = RxQueue(tx_if)
80     txq = TxQueue(tx_if)
81
82     sent = []
83
84     if ptype == "ARP":
85         pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
86                    ARP(hwsrc=src_mac, hwdst="00:00:00:00:00:00",
87                        psrc=src_ip, pdst=dst_ip, op="who-has"))
88     elif ptype == "ICMP":
89         if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
90             pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
91                        IP(src=src_ip, dst=dst_ip) /
92                        ICMP(type="echo-request"))
93         else:
94             raise ValueError("IP addresses not in correct format")
95     elif ptype == "ICMPv6":
96         if valid_ipv6(src_ip) and valid_ipv6(dst_ip):
97             pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
98                        IPv6(src=src_ip, dst=dst_ip) /
99                        ICMPv6EchoRequest())
100         else:
101             raise ValueError("IPv6 addresses not in correct format")
102     else:
103         raise RuntimeError("Unexpected payload type.")
104
105     txq.send(pkt_raw)
106     sent.append(auto_pad(pkt_raw))
107     ether = rxq_mirrored.recv(2)
108
109     # Receive copy of Rx packet.
110     if ether is None:
111         raise RuntimeError("Rx timeout of mirrored Rx packet")
112     pkt = auto_pad(pkt_raw)
113     if str(ether) != str(pkt):
114         print("Mirrored Rx packet doesn't match the original Rx packet.")
115         if ether.src != src_mac or ether.dst != dst_mac:
116             raise RuntimeError("MAC mismatch in mirrored Rx packet.")
117         if ptype == "ARP":
118             if not ether.haslayer(ARP):
119                 raise RuntimeError("Mirrored Rx packet is not an ARP packet.")
120             if ether['ARP'].op != 1:  # 1=who-has
121                 raise RuntimeError("Mirrored Rx packet is not an ARP request.")
122             if ether['ARP'].hwsrc != src_mac or ether['ARP'].hwdst != dst_mac:
123                 raise RuntimeError("MAC mismatch in mirrored Rx ARP packet.")
124             if ether['ARP'].psrc != src_ip or ether['ARP'].pdst != dst_ip:
125                 raise RuntimeError("IP address mismatch in mirrored "
126                                    "Rx ARP packet.")
127         elif ptype == "ICMP":
128             if not ether.haslayer(IP):
129                 raise RuntimeError("Mirrored Rx packet is not an IPv4 packet.")
130             if ether['IP'].src != src_ip or ether['IP'].dst != dst_ip:
131                 raise RuntimeError("IP address mismatch in mirrored "
132                                    "Rx IPv4 packet.")
133             if not ether.haslayer(ICMP):
134                 raise RuntimeError("Mirrored Rx packet is not an ICMP packet.")
135             if ether['ICMP'].type != 8:  # 8=echo-request
136                 raise RuntimeError("Mirrored Rx packet is not an ICMP "
137                                    "echo request.")
138         elif ptype == "ICMPv6":
139             if not ether.haslayer(IPv6):
140                 raise RuntimeError("Mirrored Rx packet is not an IPv6 packet.")
141             if ether['IPv6'].src != src_ip or ether['IPv6'].dst != dst_ip:
142                 raise RuntimeError("IP address mismatch in mirrored "
143                                    "Rx IPv6 packet.")
144             if not ether.haslayer(ICMPv6EchoRequest):
145                 raise RuntimeError("Mirrored Rx packet is not an ICMPv6 "
146                                    "echo request.")
147     print("Mirrored Rx packet check OK.\n")
148
149     # Receive reply on TG Tx port.
150     ether_repl = rxq_tx.recv(2, sent)
151     if ether_repl is None:
152         raise RuntimeError("Reply not received on TG Tx port.")
153     else:
154         print("Reply received on TG Tx port.\n")
155
156     # Receive copy of Tx packet.
157     ether = rxq_mirrored.recv(2)
158     if ether is None:
159         raise RuntimeError("Rx timeout of mirrored Tx packet")
160     if str(ether) != str(ether_repl):
161         print("Mirrored Tx packet doesn't match the received Tx packet.")
162         if ether.src != ether_repl.src or ether.dst != ether_repl.dst:
163             raise RuntimeError("MAC mismatch in mirrored Tx packet.")
164         if ptype == "ARP":
165             if not ether.haslayer(ARP):
166                 raise RuntimeError("Mirrored Tx packet is not an ARP packet.")
167             if ether['ARP'].op != ether_repl['ARP'].op:  # 2=is_at
168                 raise RuntimeError("ARP operational code mismatch "
169                                    "in mirrored Tx packet.")
170             if ether['ARP'].hwsrc != ether_repl['ARP'].hwsrc\
171                     or ether['ARP'].hwdst != ether_repl['ARP'].hwdst:
172                 raise RuntimeError("MAC mismatch in mirrored Tx ARP packet.")
173             if ether['ARP'].psrc != ether_repl['ARP'].psrc\
174                     or ether['ARP'].pdst != ether_repl['ARP'].pdst:
175                 raise RuntimeError("IP address mismatch in mirrored "
176                                    "Tx ARP packet.")
177         elif ptype == "ICMP":
178             if not ether.haslayer(IP):
179                 raise RuntimeError("Mirrored Tx packet is not an IPv4 packet.")
180             if ether['IP'].src != ether_repl['IP'].src\
181                     or ether['IP'].dst != ether_repl['IP'].dst:
182                 raise RuntimeError("IP address mismatch in mirrored "
183                                    "Tx IPv4 packet.")
184             if not ether.haslayer(ICMP):
185                 raise RuntimeError("Mirrored Tx packet is not an ICMP packet.")
186             if ether['ICMP'].type != ether_repl['ICMP'].type:  # 0=echo-reply
187                 raise RuntimeError("ICMP packet type mismatch "
188                                    "in mirrored Tx packet.")
189         elif ptype == "ICMPv6":
190             if not ether.haslayer(IPv6):
191                 raise RuntimeError("Mirrored Tx packet is not an IPv6 packet.")
192             if ether['IPv6'].src != ether_repl['IPv6'].src\
193                     or ether['IPv6'].dst != ether_repl['IPv6'].dst:
194                 raise RuntimeError("IP address mismatch in mirrored "
195                                    "Tx IPv6 packet.")
196             if ether[2].name != ether_repl[2].name:
197                 raise RuntimeError("ICMPv6 message type mismatch "
198                                    "in mirrored Tx packet.")
199     print("Mirrored Tx packet check OK.\n")
200     sys.exit(0)
201
202
203 if __name__ == "__main__":
204     main()