FIX: Pylint reduce
[csit.git] / GPL / traffic_scripts / send_ip_check_headers.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose
18 # Apache 2.
19 #
20 # Unless required by applicable law or agreed to in writing, software
21 # distributed under the License is distributed on an "AS IS" BASIS,
22 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 # See the License for the specific language governing permissions and
24 # limitations under the License.
25
26 """Traffic script that sends an IP IPv4/IPv6 packet from one interface
27 to the other. Source and destination IP addresses and source and destination
28 MAC addresses are checked in received packet.
29 """
30
31 import sys
32
33 import ipaddress
34
35 from robot.api import logger
36 from scapy.layers.inet import IP
37 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6MLReport2
38 from scapy.layers.l2 import Ether, Dot1Q
39 from scapy.packet import Raw
40
41 from .PacketVerifier import RxQueue, TxQueue
42 from .TrafficScriptArg import TrafficScriptArg
43
44
45 def valid_ipv4(ip_address):
46     """Check IPv4 address.
47
48     :param ip_address: IPv4 address to check.
49     :type ip_address: str
50     :returns: True if IP address is correct.
51     :rtype: bool
52     :raises AttributeError, AddressValueError: If IP address is not valid.
53     """
54     try:
55         ipaddress.IPv4Address(ip_address)
56         return True
57     except (AttributeError, ipaddress.AddressValueError):
58         return False
59
60
61 def valid_ipv6(ip_address):
62     """Check IPv6 address.
63
64     :param ip_address: IPv6 address to check.
65     :type ip_address: str
66     :returns: True if IP address is correct.
67     :rtype: bool
68     :raises AttributeError, AddressValueError: If IP address is not valid.
69     """
70     try:
71         ipaddress.IPv6Address(ip_address)
72         return True
73     except (AttributeError, ipaddress.AddressValueError):
74         return False
75
76
77 def main():
78     """Send IP/IPv6 packet from one traffic generator interface to the other."""
79     args = TrafficScriptArg(
80         [
81             u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
82             u"dut_if2_mac"
83         ],
84         [
85             u"encaps_tx", u"vlan_tx", u"vlan_outer_tx", u"encaps_rx",
86             u"vlan_rx", u"vlan_outer_rx"
87         ]
88     )
89
90     tx_src_mac = args.get_arg(u"tg_src_mac")
91     tx_dst_mac = args.get_arg(u"dut_if1_mac")
92     rx_dst_mac = args.get_arg(u"tg_dst_mac")
93     rx_src_mac = args.get_arg(u"dut_if2_mac")
94     src_ip = args.get_arg(u"src_ip")
95     dst_ip = args.get_arg(u"dst_ip")
96     tx_if = args.get_arg(u"tx_if")
97     rx_if = args.get_arg(u"rx_if")
98
99     encaps_tx = args.get_arg(u"encaps_tx")
100     vlan_tx = args.get_arg(u"vlan_tx")
101     vlan_outer_tx = args.get_arg(u"vlan_outer_tx")
102     encaps_rx = args.get_arg(u"encaps_rx")
103     vlan_rx = args.get_arg(u"vlan_rx")
104     vlan_outer_rx = args.get_arg(u"vlan_outer_rx")
105
106     rxq = RxQueue(rx_if)
107     txq = TxQueue(tx_if)
108
109     sent_packets = list()
110     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
111
112     if encaps_tx == u"Dot1q":
113         pkt_raw /= Dot1Q(vlan=int(vlan_tx))
114     elif encaps_tx == u"Dot1ad":
115         pkt_raw.type = 0x88a8
116         pkt_raw /= Dot1Q(vlan=vlan_outer_tx)
117         pkt_raw /= Dot1Q(vlan=vlan_tx)
118
119     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
120         pkt_raw /= IP(src=src_ip, dst=dst_ip, proto=61)
121         ip_format = IP
122     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
123         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
124         ip_format = IPv6
125     else:
126         raise ValueError(u"IP not in correct format")
127
128     pkt_raw /= Raw()
129     sent_packets.append(pkt_raw)
130     txq.send(pkt_raw)
131
132     while True:
133         if tx_if == rx_if:
134             ether = rxq.recv(2, ignore=sent_packets)
135         else:
136             ether = rxq.recv(2)
137
138         if ether is None:
139             raise RuntimeError(u"IP packet Rx timeout")
140
141         if ether.haslayer(ICMPv6ND_NS):
142             # read another packet in the queue if the current one is ICMPv6ND_NS
143             continue
144         elif ether.haslayer(ICMPv6MLReport2):
145             # read another packet in the queue if the current one is
146             # ICMPv6MLReport2
147             continue
148         else:
149             # otherwise process the current packet
150             break
151
152     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
153         logger.trace(u"MAC matched")
154     else:
155         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
156
157     if encaps_rx == u"Dot1q":
158         if ether[Dot1Q].vlan == int(vlan_rx):
159             logger.trace(u"VLAN matched")
160         else:
161             raise RuntimeError(
162                 f"Ethernet frame with wrong VLAN tag "
163                 f"({ether[Dot1Q].vlan}-received, "
164                 f"{vlan_rx}-expected):\n{ether!r}"
165             )
166         ip = ether[Dot1Q].payload
167     elif encaps_rx == u"Dot1ad":
168         raise NotImplementedError()
169     else:
170         ip = ether.payload
171
172     if not isinstance(ip, ip_format):
173         raise RuntimeError(f"Not an IP packet received {ip!r}")
174
175     # Compare data from packets
176     if src_ip == ip.src:
177         logger.trace(u"Src IP matched")
178     else:
179         raise RuntimeError(
180             f"Matching Src IP unsuccessful: {src_ip} != {ip.src}"
181         )
182
183     if dst_ip == ip.dst:
184         logger.trace(u"Dst IP matched")
185     else:
186         raise RuntimeError(
187             f"Matching Dst IP unsuccessful: {dst_ip} != {ip.dst}"
188         )
189
190     sys.exit(0)
191
192
193 if __name__ == u"__main__":
194     main()