Report: Add TSA plots
[csit.git] / resources / traffic_scripts / span_check.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface
16 to the other. Source and destination IP addresses and source and destination
17 MAC addresses are checked in received packet.
18 """
19
20 import sys
21 import ipaddress
22
23 from scapy.layers.inet import IP, ICMP, ARP
24 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
25 from scapy.layers.l2 import Ether
26
27 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad
28 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
29
30
31 def valid_ipv4(address):
32     """Check if IP address has the correct IPv4 address format.
33
34     :param address: IP address.
35     :type address: str
36     :return: True in case of correct IPv4 address format,
37     otherwise return false.
38     :rtype: bool
39     """
40     try:
41         ipaddress.IPv4Address(unicode(address))
42         return True
43     except (AttributeError, ipaddress.AddressValueError):
44         return False
45
46
47 def valid_ipv6(address):
48     """Check if IP address has the correct IPv6 address format.
49
50     :param address: IP address.
51     :type address: str
52     :return: True in case of correct IPv6 address format,
53     otherwise return false.
54     :rtype: bool
55     """
56     try:
57         ipaddress.IPv6Address(unicode(address))
58         return True
59     except (AttributeError, ipaddress.AddressValueError):
60         return False
61
62
63 def main():
64     """Send a simple L2 or ICMP packet from one TG interface to DUT, then
65     receive a copy of the packet on the second TG interface, and a copy of
66     the ICMP reply."""
67     args = TrafficScriptArg(['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
68                              'ptype'])
69
70     src_mac = args.get_arg('tg_src_mac')
71     dst_mac = args.get_arg('dut_if1_mac')
72     src_ip = args.get_arg('src_ip')
73     dst_ip = args.get_arg('dst_ip')
74     tx_if = args.get_arg('tx_if')
75     rx_if = args.get_arg('rx_if')
76     ptype = args.get_arg('ptype')
77
78     rxq_mirrored = RxQueue(rx_if)
79     rxq_tx = RxQueue(tx_if)
80     txq = TxQueue(tx_if)
81
82     sent = []
83
84     if ptype == "ARP":
85         pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
86                    ARP(hwsrc=src_mac, hwdst="00:00:00:00:00:00",
87                        psrc=src_ip, pdst=dst_ip, op="who-has"))
88     elif ptype == "ICMP":
89         if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
90             pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
91                        IP(src=src_ip, dst=dst_ip) /
92                        ICMP(type="echo-request"))
93         else:
94             raise ValueError("IP addresses not in correct format")
95     elif ptype == "ICMPv6":
96         if valid_ipv6(src_ip) and valid_ipv6(dst_ip):
97             pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
98                        IPv6(src=src_ip, dst=dst_ip) /
99                        ICMPv6EchoRequest())
100         else:
101             raise ValueError("IPv6 addresses not in correct format")
102     else:
103         raise RuntimeError("Unexpected payload type.")
104
105     txq.send(pkt_raw)
106     sent.append(auto_pad(pkt_raw))
107
108     # Receive copy of Rx packet.
109     while True:
110         ether = rxq_mirrored.recv(2)
111         if ether is None:
112             raise RuntimeError("Rx timeout of mirrored Rx packet")
113
114         if ether.haslayer(ICMPv6ND_NS):
115             # read another packet in the queue if the current one is ICMPv6ND_NS
116             continue
117         else:
118             # otherwise process the current packet
119             break
120
121     pkt = auto_pad(pkt_raw)
122     if str(ether) != str(pkt):
123         print("Mirrored Rx packet doesn't match the original Rx packet.")
124         if ether.src != src_mac or ether.dst != dst_mac:
125             raise RuntimeError("MAC mismatch in mirrored Rx packet.")
126         if ptype == "ARP":
127             if not ether.haslayer(ARP):
128                 raise RuntimeError("Mirrored Rx packet is not an ARP packet.")
129             if ether['ARP'].op != 1:  # 1=who-has
130                 raise RuntimeError("Mirrored Rx packet is not an ARP request.")
131             if ether['ARP'].hwsrc != src_mac or ether['ARP'].hwdst != dst_mac:
132                 raise RuntimeError("MAC mismatch in mirrored Rx ARP packet.")
133             if ether['ARP'].psrc != src_ip or ether['ARP'].pdst != dst_ip:
134                 raise RuntimeError("IP address mismatch in mirrored "
135                                    "Rx ARP packet.")
136         elif ptype == "ICMP":
137             if not ether.haslayer(IP):
138                 raise RuntimeError("Mirrored Rx packet is not an IPv4 packet.")
139             if ether['IP'].src != src_ip or ether['IP'].dst != dst_ip:
140                 raise RuntimeError("IP address mismatch in mirrored "
141                                    "Rx IPv4 packet.")
142             if not ether.haslayer(ICMP):
143                 raise RuntimeError("Mirrored Rx packet is not an ICMP packet.")
144             if ether['ICMP'].type != 8:  # 8=echo-request
145                 raise RuntimeError("Mirrored Rx packet is not an ICMP "
146                                    "echo request.")
147         elif ptype == "ICMPv6":
148             if not ether.haslayer(IPv6):
149                 raise RuntimeError("Mirrored Rx packet is not an IPv6 packet.")
150             if ether['IPv6'].src != src_ip or ether['IPv6'].dst != dst_ip:
151                 raise RuntimeError("IP address mismatch in mirrored "
152                                    "Rx IPv6 packet.")
153             if not ether.haslayer(ICMPv6EchoRequest):
154                 raise RuntimeError("Mirrored Rx packet is not an ICMPv6 "
155                                    "echo request.")
156     print("Mirrored Rx packet check OK.\n")
157
158     # Receive reply on TG Tx port.
159     ether_repl = rxq_tx.recv(2, sent)
160
161     if ether_repl is None:
162         raise RuntimeError("Reply not received on TG Tx port.")
163
164     print("Reply received on TG Tx port.\n")
165
166     # Receive copy of Tx packet.
167     ether = rxq_mirrored.recv(2)
168     if ether is None:
169         raise RuntimeError("Rx timeout of mirrored Tx packet")
170
171     if str(ether) != str(ether_repl):
172         print("Mirrored Tx packet doesn't match the received Tx packet.")
173         if ether.src != ether_repl.src or ether.dst != ether_repl.dst:
174             raise RuntimeError("MAC mismatch in mirrored Tx packet.")
175         if ptype == "ARP":
176             if not ether.haslayer(ARP):
177                 raise RuntimeError("Mirrored Tx packet is not an ARP packet.")
178             if ether['ARP'].op != ether_repl['ARP'].op:  # 2=is_at
179                 raise RuntimeError("ARP operational code mismatch "
180                                    "in mirrored Tx packet.")
181             if ether['ARP'].hwsrc != ether_repl['ARP'].hwsrc\
182                     or ether['ARP'].hwdst != ether_repl['ARP'].hwdst:
183                 raise RuntimeError("MAC mismatch in mirrored Tx ARP packet.")
184             if ether['ARP'].psrc != ether_repl['ARP'].psrc\
185                     or ether['ARP'].pdst != ether_repl['ARP'].pdst:
186                 raise RuntimeError("IP address mismatch in mirrored "
187                                    "Tx ARP packet.")
188         elif ptype == "ICMP":
189             if not ether.haslayer(IP):
190                 raise RuntimeError("Mirrored Tx packet is not an IPv4 packet.")
191             if ether['IP'].src != ether_repl['IP'].src\
192                     or ether['IP'].dst != ether_repl['IP'].dst:
193                 raise RuntimeError("IP address mismatch in mirrored "
194                                    "Tx IPv4 packet.")
195             if not ether.haslayer(ICMP):
196                 raise RuntimeError("Mirrored Tx packet is not an ICMP packet.")
197             if ether['ICMP'].type != ether_repl['ICMP'].type:  # 0=echo-reply
198                 raise RuntimeError("ICMP packet type mismatch "
199                                    "in mirrored Tx packet.")
200         elif ptype == "ICMPv6":
201             if not ether.haslayer(IPv6):
202                 raise RuntimeError("Mirrored Tx packet is not an IPv6 packet.")
203             if ether['IPv6'].src != ether_repl['IPv6'].src\
204                     or ether['IPv6'].dst != ether_repl['IPv6'].dst:
205                 raise RuntimeError("IP address mismatch in mirrored "
206                                    "Tx IPv6 packet.")
207             if ether[2].name != ether_repl[2].name:
208                 raise RuntimeError("ICMPv6 message type mismatch "
209                                    "in mirrored Tx packet.")
210     print("Mirrored Tx packet check OK.\n")
211     sys.exit(0)
212
213
214 if __name__ == "__main__":
215     main()