License: Wrap GPL block to 80 characters
[csit.git] / GPL / traffic_scripts / lisp / lisp_check.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose
18 # Apache 2.
19 #
20 # Unless required by applicable law or agreed to in writing, software
21 # distributed under the License is distributed on an "AS IS" BASIS,
22 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 # See the License for the specific language governing permissions and
24 # limitations under the License.
25
26 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
27 a LISP-encapsulated packet on the other interface and verifies received packet.
28 """
29
30 import sys
31 import ipaddress
32
33 from scapy.all import bind_layers, Packet
34 from scapy.fields import FlagsField, BitField, IntField
35 from scapy.layers.inet import ICMP, IP, UDP
36 from scapy.layers.inet6 import ICMPv6EchoRequest
37 from scapy.layers.inet6 import IPv6
38 from scapy.layers.l2 import Ether
39 from scapy.packet import Raw
40
41 from ..PacketVerifier import RxQueue, TxQueue
42 from ..TrafficScriptArg import TrafficScriptArg
43
44
45 class LispHeader(Packet):
46     """Scapy header for the LISP Layer."""
47
48     name = u"Lisp Header"
49     fields_desc = [
50         FlagsField(
51             u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"", u"", u""]
52         ),
53         BitField(u"nonce/map_version", 0, size=24),
54         IntField(u"instance_id/locator_status_bits", 0)]
55
56
57 class LispInnerIP(IP):
58     """Scapy inner LISP layer for IPv4-in-IPv4."""
59
60     name = u"Lisp Inner Layer - IPv4"
61
62
63 class LispInnerIPv6(IPv6):
64     """Scapy inner LISP layer for IPv6-in-IPv6."""
65
66     name = u"Lisp Inner Layer - IPv6"
67
68
69 def valid_ipv4(ip):
70     try:
71         ipaddress.IPv4Address(ip)
72         return True
73     except (AttributeError, ipaddress.AddressValueError):
74         return False
75
76
77 def valid_ipv6(ip):
78     try:
79         ipaddress.IPv6Address(ip)
80         return True
81     except (AttributeError, ipaddress.AddressValueError):
82         return False
83
84
85 def main():
86     """Send IP ICMP packet from one traffic generator interface to the other.
87
88     :raises RuntimeError: If the received packet is not correct."""
89
90     args = TrafficScriptArg(
91         [
92             u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
93             u"dut_if2_mac", u"src_rloc", u"dst_rloc"
94         ],
95         [u"ot_mode"]
96     )
97
98     tx_src_mac = args.get_arg(u"tg_src_mac")
99     tx_dst_mac = args.get_arg(u"dut_if1_mac")
100     rx_dst_mac = args.get_arg(u"tg_dst_mac")
101     rx_src_mac = args.get_arg(u"dut_if2_mac")
102     src_ip = args.get_arg(u"src_ip")
103     dst_ip = args.get_arg(u"dst_ip")
104     src_rloc = args.get_arg(u"src_rloc")
105     dst_rloc = args.get_arg(u"dst_rloc")
106     tx_if = args.get_arg(u"tx_if")
107     rx_if = args.get_arg(u"rx_if")
108     ot_mode = args.get_arg(u"ot_mode")
109
110     rxq = RxQueue(rx_if)
111     txq = TxQueue(tx_if)
112
113     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
114
115     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
116         pkt_raw /= IP(src=src_ip, dst=dst_ip)
117         pkt_raw /= ICMP()
118         ip_format = IP
119         lisp_layer = LispInnerIP
120     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
121         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
122         pkt_raw /= ICMPv6EchoRequest()
123         ip_format = IPv6
124         lisp_layer = LispInnerIPv6
125     else:
126         raise ValueError(u"IP not in correct format")
127
128     bind_layers(UDP, LispHeader, dport=4341)
129     bind_layers(LispHeader, lisp_layer)
130
131     pkt_raw /= Raw()
132     sent_packets = list()
133     sent_packets.append(pkt_raw)
134     txq.send(pkt_raw)
135
136     if tx_if == rx_if:
137         ether = rxq.recv(2, ignore=sent_packets)
138     else:
139         ether = rxq.recv(2)
140
141     if ether is None:
142         raise RuntimeError(u"ICMP echo Rx timeout")
143
144     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
145         print(u"MAC addresses match.")
146     else:
147         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
148
149     ip = ether.payload
150
151     if ot_mode == u"6to4":
152         if not isinstance(ip, IP):
153             raise RuntimeError(f"Not an IP packet received {ip!r}")
154     elif ot_mode == u"4to6":
155         if not isinstance(ip, IPv6):
156             raise RuntimeError(f"Not an IP packet received {ip!r}")
157     elif not isinstance(ip, ip_format):
158             raise RuntimeError(f"Not an IP packet received {ip!r}")
159
160     lisp = ether.getlayer(lisp_layer)
161     if not lisp:
162         raise RuntimeError("Lisp layer not present or parsing failed.")
163
164     # Compare data from packets
165     if src_ip == lisp.src:
166         print(u"Source IP matches source EID.")
167     else:
168         raise RuntimeError(
169             f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
170         )
171
172     if dst_ip == lisp.dst:
173         print(u"Destination IP matches destination EID.")
174     else:
175         raise RuntimeError(
176             f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
177         )
178
179     if src_rloc == ip.src:
180         print(u"Source RLOC matches configuration.")
181     else:
182         raise RuntimeError(
183             f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
184         )
185
186     if dst_rloc == ip.dst:
187         print(u"Destination RLOC matches configuration.")
188     else:
189         raise RuntimeError(
190             f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
191         )
192
193     sys.exit(0)
194
195
196 if __name__ == u"__main__":
197     main()