Switch licenses in GPL directory
[csit.git] / GPL / traffic_scripts / lisp / lisp_check.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2020 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose Apache 2.
18 #
19 # Unless required by applicable law or agreed to in writing, software
20 # distributed under the License is distributed on an "AS IS" BASIS,
21 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 # See the License for the specific language governing permissions and
23 # limitations under the License.
24
25 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
26 a LISP-encapsulated packet on the other interface and verifies received packet.
27 """
28
29 import sys
30 import ipaddress
31
32 from scapy.all import bind_layers, Packet
33 from scapy.fields import FlagsField, BitField, IntField
34 from scapy.layers.inet import ICMP, IP, UDP
35 from scapy.layers.inet6 import ICMPv6EchoRequest
36 from scapy.layers.inet6 import IPv6
37 from scapy.layers.l2 import Ether
38 from scapy.packet import Raw
39
40 from ..PacketVerifier import RxQueue, TxQueue
41 from ..TrafficScriptArg import TrafficScriptArg
42
43
44 class LispHeader(Packet):
45     """Scapy header for the LISP Layer."""
46
47     name = u"Lisp Header"
48     fields_desc = [
49         FlagsField(
50             u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"", u"", u""]
51         ),
52         BitField(u"nonce/map_version", 0, size=24),
53         IntField(u"instance_id/locator_status_bits", 0)]
54
55
56 class LispInnerIP(IP):
57     """Scapy inner LISP layer for IPv4-in-IPv4."""
58
59     name = u"Lisp Inner Layer - IPv4"
60
61
62 class LispInnerIPv6(IPv6):
63     """Scapy inner LISP layer for IPv6-in-IPv6."""
64
65     name = u"Lisp Inner Layer - IPv6"
66
67
68 def valid_ipv4(ip):
69     try:
70         ipaddress.IPv4Address(ip)
71         return True
72     except (AttributeError, ipaddress.AddressValueError):
73         return False
74
75
76 def valid_ipv6(ip):
77     try:
78         ipaddress.IPv6Address(ip)
79         return True
80     except (AttributeError, ipaddress.AddressValueError):
81         return False
82
83
84 def main():
85     """Send IP ICMP packet from one traffic generator interface to the other.
86
87     :raises RuntimeError: If the received packet is not correct."""
88
89     args = TrafficScriptArg(
90         [
91             u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
92             u"dut_if2_mac", u"src_rloc", u"dst_rloc"
93         ],
94         [u"ot_mode"]
95     )
96
97     tx_src_mac = args.get_arg(u"tg_src_mac")
98     tx_dst_mac = args.get_arg(u"dut_if1_mac")
99     rx_dst_mac = args.get_arg(u"tg_dst_mac")
100     rx_src_mac = args.get_arg(u"dut_if2_mac")
101     src_ip = args.get_arg(u"src_ip")
102     dst_ip = args.get_arg(u"dst_ip")
103     src_rloc = args.get_arg(u"src_rloc")
104     dst_rloc = args.get_arg(u"dst_rloc")
105     tx_if = args.get_arg(u"tx_if")
106     rx_if = args.get_arg(u"rx_if")
107     ot_mode = args.get_arg(u"ot_mode")
108
109     rxq = RxQueue(rx_if)
110     txq = TxQueue(tx_if)
111
112     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
113
114     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
115         pkt_raw /= IP(src=src_ip, dst=dst_ip)
116         pkt_raw /= ICMP()
117         ip_format = IP
118         lisp_layer = LispInnerIP
119     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
120         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
121         pkt_raw /= ICMPv6EchoRequest()
122         ip_format = IPv6
123         lisp_layer = LispInnerIPv6
124     else:
125         raise ValueError(u"IP not in correct format")
126
127     bind_layers(UDP, LispHeader, dport=4341)
128     bind_layers(LispHeader, lisp_layer)
129
130     pkt_raw /= Raw()
131     sent_packets = list()
132     sent_packets.append(pkt_raw)
133     txq.send(pkt_raw)
134
135     if tx_if == rx_if:
136         ether = rxq.recv(2, ignore=sent_packets)
137     else:
138         ether = rxq.recv(2)
139
140     if ether is None:
141         raise RuntimeError(u"ICMP echo Rx timeout")
142
143     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
144         print(u"MAC addresses match.")
145     else:
146         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
147
148     ip = ether.payload
149
150     if ot_mode == u"6to4":
151         if not isinstance(ip, IP):
152             raise RuntimeError(f"Not an IP packet received {ip!r}")
153     elif ot_mode == u"4to6":
154         if not isinstance(ip, IPv6):
155             raise RuntimeError(f"Not an IP packet received {ip!r}")
156     elif not isinstance(ip, ip_format):
157             raise RuntimeError(f"Not an IP packet received {ip!r}")
158
159     lisp = ether.getlayer(lisp_layer)
160     if not lisp:
161         raise RuntimeError("Lisp layer not present or parsing failed.")
162
163     # Compare data from packets
164     if src_ip == lisp.src:
165         print(u"Source IP matches source EID.")
166     else:
167         raise RuntimeError(
168             f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
169         )
170
171     if dst_ip == lisp.dst:
172         print(u"Destination IP matches destination EID.")
173     else:
174         raise RuntimeError(
175             f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
176         )
177
178     if src_rloc == ip.src:
179         print(u"Source RLOC matches configuration.")
180     else:
181         raise RuntimeError(
182             f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
183         )
184
185     if dst_rloc == ip.dst:
186         print(u"Destination RLOC matches configuration.")
187     else:
188         raise RuntimeError(
189             f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
190         )
191
192     sys.exit(0)
193
194
195 if __name__ == u"__main__":
196     main()