Separate files needing GPL license
[csit.git] / GPL / traffic_scripts / lisp / lisp_check.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2020 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
17 a LISP-encapsulated packet on the other interface and verifies received packet.
18 """
19
20 import sys
21 import ipaddress
22
23 from scapy.all import bind_layers, Packet
24 from scapy.fields import FlagsField, BitField, IntField
25 from scapy.layers.inet import ICMP, IP, UDP
26 from scapy.layers.inet6 import ICMPv6EchoRequest
27 from scapy.layers.inet6 import IPv6
28 from scapy.layers.l2 import Ether
29 from scapy.packet import Raw
30
31 from ..PacketVerifier import RxQueue, TxQueue
32 from ..TrafficScriptArg import TrafficScriptArg
33
34
35 class LispHeader(Packet):
36     """Scapy header for the LISP Layer."""
37
38     name = u"Lisp Header"
39     fields_desc = [
40         FlagsField(
41             u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"", u"", u""]
42         ),
43         BitField(u"nonce/map_version", 0, size=24),
44         IntField(u"instance_id/locator_status_bits", 0)]
45
46
47 class LispInnerIP(IP):
48     """Scapy inner LISP layer for IPv4-in-IPv4."""
49
50     name = u"Lisp Inner Layer - IPv4"
51
52
53 class LispInnerIPv6(IPv6):
54     """Scapy inner LISP layer for IPv6-in-IPv6."""
55
56     name = u"Lisp Inner Layer - IPv6"
57
58
59 def valid_ipv4(ip):
60     try:
61         ipaddress.IPv4Address(ip)
62         return True
63     except (AttributeError, ipaddress.AddressValueError):
64         return False
65
66
67 def valid_ipv6(ip):
68     try:
69         ipaddress.IPv6Address(ip)
70         return True
71     except (AttributeError, ipaddress.AddressValueError):
72         return False
73
74
75 def main():
76     """Send IP ICMP packet from one traffic generator interface to the other.
77
78     :raises RuntimeError: If the received packet is not correct."""
79
80     args = TrafficScriptArg(
81         [
82             u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
83             u"dut_if2_mac", u"src_rloc", u"dst_rloc"
84         ],
85         [u"ot_mode"]
86     )
87
88     tx_src_mac = args.get_arg(u"tg_src_mac")
89     tx_dst_mac = args.get_arg(u"dut_if1_mac")
90     rx_dst_mac = args.get_arg(u"tg_dst_mac")
91     rx_src_mac = args.get_arg(u"dut_if2_mac")
92     src_ip = args.get_arg(u"src_ip")
93     dst_ip = args.get_arg(u"dst_ip")
94     src_rloc = args.get_arg(u"src_rloc")
95     dst_rloc = args.get_arg(u"dst_rloc")
96     tx_if = args.get_arg(u"tx_if")
97     rx_if = args.get_arg(u"rx_if")
98     ot_mode = args.get_arg(u"ot_mode")
99
100     rxq = RxQueue(rx_if)
101     txq = TxQueue(tx_if)
102
103     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
104
105     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
106         pkt_raw /= IP(src=src_ip, dst=dst_ip)
107         pkt_raw /= ICMP()
108         ip_format = IP
109         lisp_layer = LispInnerIP
110     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
111         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
112         pkt_raw /= ICMPv6EchoRequest()
113         ip_format = IPv6
114         lisp_layer = LispInnerIPv6
115     else:
116         raise ValueError(u"IP not in correct format")
117
118     bind_layers(UDP, LispHeader, dport=4341)
119     bind_layers(LispHeader, lisp_layer)
120
121     pkt_raw /= Raw()
122     sent_packets = list()
123     sent_packets.append(pkt_raw)
124     txq.send(pkt_raw)
125
126     if tx_if == rx_if:
127         ether = rxq.recv(2, ignore=sent_packets)
128     else:
129         ether = rxq.recv(2)
130
131     if ether is None:
132         raise RuntimeError(u"ICMP echo Rx timeout")
133
134     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
135         print(u"MAC addresses match.")
136     else:
137         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
138
139     ip = ether.payload
140
141     if ot_mode == u"6to4":
142         if not isinstance(ip, IP):
143             raise RuntimeError(f"Not an IP packet received {ip!r}")
144     elif ot_mode == u"4to6":
145         if not isinstance(ip, IPv6):
146             raise RuntimeError(f"Not an IP packet received {ip!r}")
147     elif not isinstance(ip, ip_format):
148             raise RuntimeError(f"Not an IP packet received {ip!r}")
149
150     lisp = ether.getlayer(lisp_layer)
151     if not lisp:
152         raise RuntimeError("Lisp layer not present or parsing failed.")
153
154     # Compare data from packets
155     if src_ip == lisp.src:
156         print(u"Source IP matches source EID.")
157     else:
158         raise RuntimeError(
159             f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
160         )
161
162     if dst_ip == lisp.dst:
163         print(u"Destination IP matches destination EID.")
164     else:
165         raise RuntimeError(
166             f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
167         )
168
169     if src_rloc == ip.src:
170         print(u"Source RLOC matches configuration.")
171     else:
172         raise RuntimeError(
173             f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
174         )
175
176     if dst_rloc == ip.dst:
177         print(u"Destination RLOC matches configuration.")
178     else:
179         raise RuntimeError(
180             f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
181         )
182
183     sys.exit(0)
184
185
186 if __name__ == u"__main__":
187     main()