41bc0964e6a44693acd47e4650248950cf719e55
[csit.git] / GPL / traffic_scripts / lisp / lispgpe_check.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose
18 # Apache 2.
19 #
20 # Unless required by applicable law or agreed to in writing, software
21 # distributed under the License is distributed on an "AS IS" BASIS,
22 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 # See the License for the specific language governing permissions and
24 # limitations under the License.
25
26 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
27 a LISPGPE-encapsulated packet on the other interface and verifies received
28 packet.
29 """
30
31 import sys
32 import ipaddress
33
34 from scapy.all import bind_layers, Packet
35 from scapy.fields import FlagsField, BitField, XBitField, IntField
36 from scapy.layers.inet import ICMP, IP, UDP
37 from scapy.layers.inet6 import ICMPv6EchoRequest
38 from scapy.layers.inet6 import IPv6
39 from scapy.layers.l2 import Ether
40 from scapy.packet import Raw
41
42 from ..PacketVerifier import RxQueue, TxQueue
43 from ..TrafficScriptArg import TrafficScriptArg
44
45
46 class LispGPEHeader(Packet):
47     """Scapy header for the Lisp GPE Layer."""
48
49     name = "Lisp GPE Header"
50     fields_desc = [
51         FlagsField(
52             u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"P", u"R", u"O"]
53         ),
54         BitField(u"version", 0, size=2),
55         BitField(u"reserved", 0, size=14),
56         XBitField(u"next_protocol", 0, size=8),
57         IntField(u"instance_id/locator_status_bits", 0)
58     ]
59
60     def guess_payload_class(self, payload):
61         protocol = {
62             0x1: LispGPEInnerIP,
63             0x2: LispGPEInnerIPv6,
64             0x3: LispGPEInnerEther,
65             0x4: LispGPEInnerNSH
66         }
67         return protocol[self.next_protocol]
68
69
70 class LispGPEInnerIP(IP):
71     """Scapy inner LISP GPE layer for IPv4-in-IPv4."""
72
73     name = u"Lisp GPE Inner Layer - IPv4"
74
75
76 class LispGPEInnerIPv6(IPv6):
77     """Scapy inner LISP GPE layer for IPv6-in-IPv6."""
78
79     name = u"Lisp GPE Inner Layer - IPv6"
80
81
82 class LispGPEInnerEther(Ether):
83     """Scapy inner LISP GPE layer for Lisp-L2."""
84
85     name = u"Lisp GPE Inner Layer - Ethernet"
86
87
88 class LispGPEInnerNSH(Packet):
89     """Scapy inner LISP GPE layer for Lisp-NSH.
90
91     Parsing not implemented.
92     """
93
94
95 def valid_ipv4(ip):
96     try:
97         ipaddress.IPv4Address(ip)
98         return True
99     except (AttributeError, ipaddress.AddressValueError):
100         return False
101
102
103 def valid_ipv6(ip):
104     try:
105         ipaddress.IPv6Address(ip)
106         return True
107     except (AttributeError, ipaddress.AddressValueError):
108         return False
109
110
111 def main():
112     """Send IP ICMP packet from one traffic generator interface to the other.
113
114     :raises RuntimeError: If the received packet is not correct."""
115
116     args = TrafficScriptArg(
117         [
118             u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
119             u"dut_if2_mac", u"src_rloc", u"dst_rloc"
120         ],
121         [u"ot_mode"]
122     )
123
124     tx_src_mac = args.get_arg(u"tg_src_mac")
125     tx_dst_mac = args.get_arg(u"dut_if1_mac")
126     rx_dst_mac = args.get_arg(u"tg_dst_mac")
127     rx_src_mac = args.get_arg(u"dut_if2_mac")
128     src_ip = args.get_arg(u"src_ip")
129     dst_ip = args.get_arg(u"dst_ip")
130     src_rloc = args.get_arg(u"src_rloc")
131     dst_rloc = args.get_arg(u"dst_rloc")
132     tx_if = args.get_arg(u"tx_if")
133     rx_if = args.get_arg(u"rx_if")
134     ot_mode = args.get_arg(u"ot_mode")
135
136     rxq = RxQueue(rx_if)
137     txq = TxQueue(tx_if)
138
139     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
140
141     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
142         pkt_raw /= IP(src=src_ip, dst=dst_ip)
143         pkt_raw /= ICMP()
144         ip_format = IP
145     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
146         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
147         pkt_raw /= ICMPv6EchoRequest()
148         ip_format = IPv6
149     else:
150         raise ValueError(u"IP not in correct format")
151
152     bind_layers(UDP, LispGPEHeader, dport=4341)
153
154     pkt_raw /= Raw()
155     sent_packets = list()
156     sent_packets.append(pkt_raw)
157     txq.send(pkt_raw)
158
159     if tx_if == rx_if:
160         ether = rxq.recv(2, ignore=sent_packets)
161     else:
162         ether = rxq.recv(2)
163
164     if ether is None:
165         raise RuntimeError(u"ICMP echo Rx timeout")
166
167     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
168         print(u"MAC addresses match.")
169     else:
170         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
171
172     ip = ether.payload
173
174     if ot_mode == u"6to4":
175         if not isinstance(ip, IP):
176             raise RuntimeError(f"Not an IP packet received {ip!r}")
177     elif ot_mode == u"4to6":
178         if not isinstance(ip, IPv6):
179             raise RuntimeError(f"Not an IP packet received {ip!r}")
180     elif not isinstance(ip, ip_format):
181             raise RuntimeError(f"Not an IP packet received {ip!r}")
182
183     lisp = ether.getlayer(LispGPEHeader).underlayer
184     if not lisp:
185         raise RuntimeError(u"Lisp layer not present or parsing failed.")
186
187     # Compare data from packets
188     if src_ip == lisp.src:
189         print(u"Source IP matches source EID.")
190     else:
191         raise RuntimeError(
192             f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
193         )
194
195     if dst_ip == lisp.dst:
196         print(u"Destination IP matches destination EID.")
197     else:
198         raise RuntimeError(
199             f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
200         )
201
202     if src_rloc == ip.src:
203         print(u"Source RLOC matches configuration.")
204     else:
205         raise RuntimeError(
206             f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
207         )
208
209     if dst_rloc == ip.dst:
210         print(u"Destination RLOC matches configuration.")
211     else:
212         raise RuntimeError(
213             f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
214         )
215
216     sys.exit(0)
217
218
219 if __name__ == u"__main__":
220     main()