Python3: resources and libraries
[csit.git] / resources / traffic_scripts / lisp / lispgpe_check.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2019 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
17 a LISPGPE-encapsulated packet on the other interface and verifies received
18 packet.
19 """
20
21 import sys
22 import ipaddress
23
24 from scapy.all import bind_layers, Packet
25 from scapy.fields import FlagsField, BitField, XBitField, IntField
26 from scapy.layers.inet import ICMP, IP, UDP
27 from scapy.layers.inet6 import ICMPv6EchoRequest
28 from scapy.layers.inet6 import IPv6
29 from scapy.layers.l2 import Ether
30 from scapy.packet import Raw
31
32 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
33 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
34
35
36 class LispGPEHeader(Packet):
37     """Scapy header for the Lisp GPE Layer."""
38
39     name = "Lisp GPE Header"
40     fields_desc = [
41         FlagsField(
42             u"flags", None, 8, [u"N", u"L", u"E", u"V", u"I", u"P", u"R", u"O"]
43         ),
44         BitField(u"version", 0, size=2),
45         BitField(u"reserved", 0, size=14),
46         XBitField(u"next_protocol", 0, size=8),
47         IntField(u"instance_id/locator_status_bits", 0)
48     ]
49
50     def guess_payload_class(self, payload):
51         protocol = {
52             0x1: LispGPEInnerIP,
53             0x2: LispGPEInnerIPv6,
54             0x3: LispGPEInnerEther,
55             0x4: LispGPEInnerNSH
56         }
57         return protocol[self.next_protocol]
58
59
60 class LispGPEInnerIP(IP):
61     """Scapy inner LISP GPE layer for IPv4-in-IPv4."""
62
63     name = u"Lisp GPE Inner Layer - IPv4"
64
65
66 class LispGPEInnerIPv6(IPv6):
67     """Scapy inner LISP GPE layer for IPv6-in-IPv6."""
68
69     name = u"Lisp GPE Inner Layer - IPv6"
70
71
72 class LispGPEInnerEther(Ether):
73     """Scapy inner LISP GPE layer for Lisp-L2."""
74
75     name = u"Lisp GPE Inner Layer - Ethernet"
76
77
78 class LispGPEInnerNSH(Packet):
79     """Scapy inner LISP GPE layer for Lisp-NSH.
80
81     Parsing not implemented.
82     """
83
84
85 def valid_ipv4(ip):
86     try:
87         ipaddress.IPv4Address(ip)
88         return True
89     except (AttributeError, ipaddress.AddressValueError):
90         return False
91
92
93 def valid_ipv6(ip):
94     try:
95         ipaddress.IPv6Address(ip)
96         return True
97     except (AttributeError, ipaddress.AddressValueError):
98         return False
99
100
101 def main():
102     """Send IP ICMP packet from one traffic generator interface to the other.
103
104     :raises RuntimeError: If the received packet is not correct."""
105
106     args = TrafficScriptArg(
107         [
108             u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
109             u"dut_if2_mac", u"src_rloc", u"dst_rloc"
110         ],
111         [u"ot_mode"]
112     )
113
114     tx_src_mac = args.get_arg(u"tg_src_mac")
115     tx_dst_mac = args.get_arg(u"dut_if1_mac")
116     rx_dst_mac = args.get_arg(u"tg_dst_mac")
117     rx_src_mac = args.get_arg(u"dut_if2_mac")
118     src_ip = args.get_arg(u"src_ip")
119     dst_ip = args.get_arg(u"dst_ip")
120     src_rloc = args.get_arg(u"src_rloc")
121     dst_rloc = args.get_arg(u"dst_rloc")
122     tx_if = args.get_arg(u"tx_if")
123     rx_if = args.get_arg(u"rx_if")
124     ot_mode = args.get_arg(u"ot_mode")
125
126     rxq = RxQueue(rx_if)
127     txq = TxQueue(tx_if)
128
129     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
130
131     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
132         pkt_raw /= IP(src=src_ip, dst=dst_ip)
133         pkt_raw /= ICMP()
134         ip_format = IP
135     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
136         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
137         pkt_raw /= ICMPv6EchoRequest()
138         ip_format = IPv6
139     else:
140         raise ValueError(u"IP not in correct format")
141
142     bind_layers(UDP, LispGPEHeader, dport=4341)
143
144     pkt_raw /= Raw()
145     sent_packets = list()
146     sent_packets.append(pkt_raw)
147     txq.send(pkt_raw)
148
149     if tx_if == rx_if:
150         ether = rxq.recv(2, ignore=sent_packets)
151     else:
152         ether = rxq.recv(2)
153
154     if ether is None:
155         raise RuntimeError(u"ICMP echo Rx timeout")
156
157     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
158         print(u"MAC addresses match.")
159     else:
160         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
161
162     ip = ether.payload
163
164     if ot_mode == u"6to4":
165         if not isinstance(ip, IP):
166             raise RuntimeError(f"Not an IP packet received {ip!r}")
167     elif ot_mode == u"4to6":
168         if not isinstance(ip, IPv6):
169             raise RuntimeError(f"Not an IP packet received {ip!r}")
170     elif not isinstance(ip, ip_format):
171             raise RuntimeError(f"Not an IP packet received {ip!r}")
172
173     lisp = ether.getlayer(LispGPEHeader).underlayer
174     if not lisp:
175         raise RuntimeError(u"Lisp layer not present or parsing failed.")
176
177     # Compare data from packets
178     if src_ip == lisp.src:
179         print(u"Source IP matches source EID.")
180     else:
181         raise RuntimeError(
182             f"Matching Src IP unsuccessful: {src_ip} != {lisp.src}"
183         )
184
185     if dst_ip == lisp.dst:
186         print(u"Destination IP matches destination EID.")
187     else:
188         raise RuntimeError(
189             f"Matching Dst IP unsuccessful: {dst_ip} != {lisp.dst}"
190         )
191
192     if src_rloc == ip.src:
193         print(u"Source RLOC matches configuration.")
194     else:
195         raise RuntimeError(
196             f"Matching Src RLOC unsuccessful: {src_rloc} != {ip.src}"
197         )
198
199     if dst_rloc == ip.dst:
200         print(u"Destination RLOC matches configuration.")
201     else:
202         raise RuntimeError(
203             f"Matching dst RLOC unsuccessful: {dst_rloc} != {ip.dst}"
204         )
205
206     sys.exit(0)
207
208
209 if __name__ == u"__main__":
210     main()