0ecebbe3984c577fd16d741c51964517693ce846
[csit.git] / GPL / traffic_scripts / send_ip_check_headers.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose
18 # Apache 2.
19 #
20 # Unless required by applicable law or agreed to in writing, software
21 # distributed under the License is distributed on an "AS IS" BASIS,
22 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 # See the License for the specific language governing permissions and
24 # limitations under the License.
25
26 """Traffic script that sends an IP IPv4/IPv6 packet from one interface
27 to the other. Source and destination IP addresses and source and destination
28 MAC addresses are checked in received packet.
29 """
30
31 import sys
32
33 import ipaddress
34
35 from robot.api import logger
36 from scapy.layers.inet import IP
37 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6MLReport2, ICMPv6ND_RA
38 from scapy.layers.l2 import Ether, Dot1Q
39 from scapy.packet import Raw
40
41 from .PacketVerifier import RxQueue, TxQueue
42 from .TrafficScriptArg import TrafficScriptArg
43
44
45 def valid_ipv4(ip_address):
46     """Check IPv4 address.
47
48     :param ip_address: IPv4 address to check.
49     :type ip_address: str
50     :returns: True if IP address is correct.
51     :rtype: bool
52     :raises AttributeError, AddressValueError: If IP address is not valid.
53     """
54     try:
55         ipaddress.IPv4Address(ip_address)
56         return True
57     except (AttributeError, ipaddress.AddressValueError):
58         return False
59
60
61 def valid_ipv6(ip_address):
62     """Check IPv6 address.
63
64     :param ip_address: IPv6 address to check.
65     :type ip_address: str
66     :returns: True if IP address is correct.
67     :rtype: bool
68     :raises AttributeError, AddressValueError: If IP address is not valid.
69     """
70     try:
71         ipaddress.IPv6Address(ip_address)
72         return True
73     except (AttributeError, ipaddress.AddressValueError):
74         return False
75
76
77 def main():
78     """Send IP/IPv6 packet from one traffic generator interface to the other."""
79     args = TrafficScriptArg(
80         [
81             u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
82             u"dut_if2_mac"
83         ],
84         [
85             u"encaps_tx", u"vlan_tx", u"vlan_outer_tx", u"encaps_rx",
86             u"vlan_rx", u"vlan_outer_rx"
87         ]
88     )
89
90     tx_src_mac = args.get_arg(u"tg_src_mac")
91     tx_dst_mac = args.get_arg(u"dut_if1_mac")
92     rx_dst_mac = args.get_arg(u"tg_dst_mac")
93     rx_src_mac = args.get_arg(u"dut_if2_mac")
94     src_ip = args.get_arg(u"src_ip")
95     dst_ip = args.get_arg(u"dst_ip")
96     tx_if = args.get_arg(u"tx_if")
97     rx_if = args.get_arg(u"rx_if")
98
99     encaps_tx = args.get_arg(u"encaps_tx")
100     vlan_tx = args.get_arg(u"vlan_tx")
101     vlan_outer_tx = args.get_arg(u"vlan_outer_tx")
102     encaps_rx = args.get_arg(u"encaps_rx")
103     vlan_rx = args.get_arg(u"vlan_rx")
104     vlan_outer_rx = args.get_arg(u"vlan_outer_rx")
105
106     rxq = RxQueue(rx_if)
107     txq = TxQueue(tx_if)
108
109     sent_packets = list()
110     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
111
112     if encaps_tx == u"Dot1q":
113         pkt_raw /= Dot1Q(vlan=int(vlan_tx))
114     elif encaps_tx == u"Dot1ad":
115         pkt_raw.type = 0x88a8
116         pkt_raw /= Dot1Q(vlan=vlan_outer_tx)
117         pkt_raw /= Dot1Q(vlan=vlan_tx)
118
119     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
120         pkt_raw /= IP(src=src_ip, dst=dst_ip, proto=61)
121         ip_format = IP
122     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
123         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
124         ip_format = IPv6
125     else:
126         raise ValueError(u"IP not in correct format")
127
128     pkt_raw /= Raw()
129     sent_packets.append(pkt_raw)
130     txq.send(pkt_raw)
131
132     while True:
133         if tx_if == rx_if:
134             ether = rxq.recv(2, ignore=sent_packets)
135         else:
136             ether = rxq.recv(2)
137
138         if ether is None:
139             raise RuntimeError(u"IP packet Rx timeout")
140
141         if ether.haslayer(ICMPv6ND_NS):
142             # read another packet in the queue if the current one is ICMPv6ND_NS
143             continue
144         elif ether.haslayer(ICMPv6MLReport2):
145             # read another packet in the queue if the current one is
146             # ICMPv6MLReport2
147             continue
148         elif ether.haslayer(ICMPv6ND_RA):
149             # read another packet in the queue if the current one is
150             # ICMPv6ND_RA
151             continue
152
153         break
154
155     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
156         logger.trace(u"MAC matched")
157     else:
158         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
159
160     if encaps_rx == u"Dot1q":
161         if ether[Dot1Q].vlan == int(vlan_rx):
162             logger.trace(u"VLAN matched")
163         else:
164             raise RuntimeError(
165                 f"Ethernet frame with wrong VLAN tag "
166                 f"({ether[Dot1Q].vlan}-received, "
167                 f"{vlan_rx}-expected):\n{ether!r}"
168             )
169         ip = ether[Dot1Q].payload
170     elif encaps_rx == u"Dot1ad":
171         raise NotImplementedError()
172     else:
173         ip = ether.payload
174
175     if not isinstance(ip, ip_format):
176         raise RuntimeError(f"Not an IP packet received {ip!r}")
177
178     # Compare data from packets
179     if src_ip == ip.src:
180         logger.trace(u"Src IP matched")
181     else:
182         raise RuntimeError(
183             f"Matching Src IP unsuccessful: {src_ip} != {ip.src}"
184         )
185
186     if dst_ip == ip.dst:
187         logger.trace(u"Dst IP matched")
188     else:
189         raise RuntimeError(
190             f"Matching Dst IP unsuccessful: {dst_ip} != {ip.dst}"
191         )
192
193     sys.exit(0)
194
195
196 if __name__ == u"__main__":
197     main()