068f73652f45b961edfc009744103df839d59f56
[csit.git] / GPL / traffic_scripts / lisp / lispgpe_check.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2020 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose Apache 2.
18 #
19 # Unless required by applicable law or agreed to in writing, software
20 # distributed under the License is distributed on an "AS IS" BASIS,
21 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 # See the License for the specific language governing permissions and
23 # limitations under the License.
24
25 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
26 a LISPGPE-encapsulated packet on the other interface and verifies received
27 packet.
28 """
29
30 import sys
31 import ipaddress
32
33 from scapy.all import bind_layers, Packet
34 from scapy.fields import FlagsField, BitField, XBitField, IntField
35 from scapy.layers.inet import ICMP, IP, UDP
36 from scapy.layers.inet6 import ICMPv6EchoRequest
37 from scapy.layers.inet6 import IPv6
38 from scapy.layers.l2 import Ether
39 from scapy.packet import Raw
40
41 from ..PacketVerifier import RxQueue, TxQueue
42 from ..TrafficScriptArg import TrafficScriptArg
43
44
45 class LispGPEHeader(Packet):
46     """Scapy header for the Lisp GPE Layer."""
47
48     name = "Lisp GPE Header"
49     fields_desc = [
50         FlagsField(
51             u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"P", u"R", u"O"]
52         ),
53         BitField(u"version", 0, size=2),
54         BitField(u"reserved", 0, size=14),
55         XBitField(u"next_protocol", 0, size=8),
56         IntField(u"instance_id/locator_status_bits", 0)
57     ]
58
59     def guess_payload_class(self, payload):
60         protocol = {
61             0x1: LispGPEInnerIP,
62             0x2: LispGPEInnerIPv6,
63             0x3: LispGPEInnerEther,
64             0x4: LispGPEInnerNSH
65         }
66         return protocol[self.next_protocol]
67
68
69 class LispGPEInnerIP(IP):
70     """Scapy inner LISP GPE layer for IPv4-in-IPv4."""
71
72     name = u"Lisp GPE Inner Layer - IPv4"
73
74
75 class LispGPEInnerIPv6(IPv6):
76     """Scapy inner LISP GPE layer for IPv6-in-IPv6."""
77
78     name = u"Lisp GPE Inner Layer - IPv6"
79
80
81 class LispGPEInnerEther(Ether):
82     """Scapy inner LISP GPE layer for Lisp-L2."""
83
84     name = u"Lisp GPE Inner Layer - Ethernet"
85
86
87 class LispGPEInnerNSH(Packet):
88     """Scapy inner LISP GPE layer for Lisp-NSH.
89
90     Parsing not implemented.
91     """
92
93
94 def valid_ipv4(ip):
95     try:
96         ipaddress.IPv4Address(ip)
97         return True
98     except (AttributeError, ipaddress.AddressValueError):
99         return False
100
101
102 def valid_ipv6(ip):
103     try:
104         ipaddress.IPv6Address(ip)
105         return True
106     except (AttributeError, ipaddress.AddressValueError):
107         return False
108
109
110 def main():
111     """Send IP ICMP packet from one traffic generator interface to the other.
112
113     :raises RuntimeError: If the received packet is not correct."""
114
115     args = TrafficScriptArg(
116         [
117             u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
118             u"dut_if2_mac", u"src_rloc", u"dst_rloc"
119         ],
120         [u"ot_mode"]
121     )
122
123     tx_src_mac = args.get_arg(u"tg_src_mac")
124     tx_dst_mac = args.get_arg(u"dut_if1_mac")
125     rx_dst_mac = args.get_arg(u"tg_dst_mac")
126     rx_src_mac = args.get_arg(u"dut_if2_mac")
127     src_ip = args.get_arg(u"src_ip")
128     dst_ip = args.get_arg(u"dst_ip")
129     src_rloc = args.get_arg(u"src_rloc")
130     dst_rloc = args.get_arg(u"dst_rloc")
131     tx_if = args.get_arg(u"tx_if")
132     rx_if = args.get_arg(u"rx_if")
133     ot_mode = args.get_arg(u"ot_mode")
134
135     rxq = RxQueue(rx_if)
136     txq = TxQueue(tx_if)
137
138     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
139
140     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
141         pkt_raw /= IP(src=src_ip, dst=dst_ip)
142         pkt_raw /= ICMP()
143         ip_format = IP
144     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
145         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
146         pkt_raw /= ICMPv6EchoRequest()
147         ip_format = IPv6
148     else:
149         raise ValueError(u"IP not in correct format")
150
151     bind_layers(UDP, LispGPEHeader, dport=4341)
152
153     pkt_raw /= Raw()
154     sent_packets = list()
155     sent_packets.append(pkt_raw)
156     txq.send(pkt_raw)
157
158     if tx_if == rx_if:
159         ether = rxq.recv(2, ignore=sent_packets)
160     else:
161         ether = rxq.recv(2)
162
163     if ether is None:
164         raise RuntimeError(u"ICMP echo Rx timeout")
165
166     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
167         print(u"MAC addresses match.")
168     else:
169         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
170
171     ip = ether.payload
172
173     if ot_mode == u"6to4":
174         if not isinstance(ip, IP):
175             raise RuntimeError(f"Not an IP packet received {ip!r}")
176     elif ot_mode == u"4to6":
177         if not isinstance(ip, IPv6):
178             raise RuntimeError(f"Not an IP packet received {ip!r}")
179     elif not isinstance(ip, ip_format):
180             raise RuntimeError(f"Not an IP packet received {ip!r}")
181
182     lisp = ether.getlayer(LispGPEHeader).underlayer
183     if not lisp:
184         raise RuntimeError(u"Lisp layer not present or parsing failed.")
185
186     # Compare data from packets
187     if src_ip == lisp.src:
188         print(u"Source IP matches source EID.")
189     else:
190         raise RuntimeError(
191             f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
192         )
193
194     if dst_ip == lisp.dst:
195         print(u"Destination IP matches destination EID.")
196     else:
197         raise RuntimeError(
198             f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
199         )
200
201     if src_rloc == ip.src:
202         print(u"Source RLOC matches configuration.")
203     else:
204         raise RuntimeError(
205             f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
206         )
207
208     if dst_rloc == ip.dst:
209         print(u"Destination RLOC matches configuration.")
210     else:
211         raise RuntimeError(
212             f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
213         )
214
215     sys.exit(0)
216
217
218 if __name__ == u"__main__":
219     main()