d4de8635d70f93127c1b7c83b2c3dde7ca37affe
[csit.git] / resources / traffic_scripts / lisp / lispgpe_check.py
1 #!/usr/bin/env python
2 # Copyright (c) 2017 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
16 a LISPGPE-encapsulated packet on the other interface and verifies received
17 packet.
18 """
19
20 import sys
21 import ipaddress
22
23 from scapy.layers.inet import ICMP, IP, UDP
24 from scapy.layers.inet6 import ICMPv6EchoRequest
25 from scapy.layers.inet6 import IPv6
26 from scapy.layers.l2 import Ether
27 from scapy.all import bind_layers, Packet
28 from scapy.fields import FlagsField, BitField, XBitField, IntField
29
30 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
31 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
32
33
34 class LispGPEHeader(Packet):
35     """Scapy header for the Lisp GPE Layer."""
36     name = "Lisp GPE Header"
37     fields_desc = [
38         FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "P", "R", "O"]),
39         BitField("version", 0, size=2),
40         BitField("reserved", 0, size=14),
41         XBitField("next_protocol", 0, size=8),
42         IntField("instance_id/locator_status_bits", 0)]
43
44     def guess_payload_class(self, payload):
45         protocol = {
46             0x1: LispGPEInnerIP,
47             0x2: LispGPEInnerIPv6,
48             0x3: LispGPEInnerEther,
49             0x4: LispGPEInnerNSH
50         }
51         return protocol[self.next_protocol]
52
53
54 class LispGPEInnerIP(IP):
55     """Scapy inner LISP GPE layer for IPv4-in-IPv4."""
56     name = "Lisp GPE Inner Layer - IPv4"
57
58
59 class LispGPEInnerIPv6(IPv6):
60     """Scapy inner LISP GPE layer for IPv6-in-IPv6."""
61     name = "Lisp GPE Inner Layer - IPv6"
62
63
64 class LispGPEInnerEther(Ether):
65     """Scapy inner LISP GPE layer for Lisp-L2."""
66     name = "Lisp GPE Inner Layer - Ethernet"
67
68
69 class LispGPEInnerNSH(Packet):
70     """Scapy inner LISP GPE layer for Lisp-NSH.
71
72     Parsing not implemented.
73     """
74
75
76 def valid_ipv4(ip):
77     try:
78         ipaddress.IPv4Address(unicode(ip))
79         return True
80     except (AttributeError, ipaddress.AddressValueError):
81         return False
82
83
84 def valid_ipv6(ip):
85     try:
86         ipaddress.IPv6Address(unicode(ip))
87         return True
88     except (AttributeError, ipaddress.AddressValueError):
89         return False
90
91
92 def main():
93     """Send IP ICMP packet from one traffic generator interface to the other.
94
95     :raises RuntimeError: If the received packet is not correct."""
96
97     args = TrafficScriptArg(
98         ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
99          'dut_if2_mac', 'src_rloc', 'dst_rloc'],
100         ['ot_mode'])
101
102     tx_src_mac = args.get_arg('tg_src_mac')
103     tx_dst_mac = args.get_arg('dut_if1_mac')
104     rx_dst_mac = args.get_arg('tg_dst_mac')
105     rx_src_mac = args.get_arg('dut_if2_mac')
106     src_ip = args.get_arg('src_ip')
107     dst_ip = args.get_arg('dst_ip')
108     src_rloc = args.get_arg("src_rloc")
109     dst_rloc = args.get_arg("dst_rloc")
110     tx_if = args.get_arg('tx_if')
111     rx_if = args.get_arg('rx_if')
112     ot_mode = args.get_arg('ot_mode')
113
114     rxq = RxQueue(rx_if)
115     txq = TxQueue(tx_if)
116     sent_packets = []
117     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
118
119     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
120         pkt_raw /= IP(src=src_ip, dst=dst_ip)
121         pkt_raw /= ICMP()
122         ip_format = IP
123     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
124         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
125         pkt_raw /= ICMPv6EchoRequest()
126         ip_format = IPv6
127     else:
128         raise ValueError("IP not in correct format")
129
130     bind_layers(UDP, LispGPEHeader, dport=4341)
131
132     sent_packets.append(pkt_raw)
133     txq.send(pkt_raw)
134
135     if tx_if == rx_if:
136         ether = rxq.recv(2, ignore=sent_packets)
137     else:
138         ether = rxq.recv(2)
139
140     if ether is None:
141         raise RuntimeError("ICMP echo Rx timeout")
142
143     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
144         print("MAC addresses match.")
145     else:
146         raise RuntimeError(
147             "Matching packet unsuccessful: {0}".format(ether.__repr__()))
148
149     ip = ether.payload
150
151     if ot_mode == '6to4':
152         if not isinstance(ip, IP):
153             raise RuntimeError(
154                 "Not an IP packet received {0}".format(ip.__repr__()))
155     elif ot_mode == '4to6':
156         if not isinstance(ip, IPv6):
157             raise RuntimeError(
158                 "Not an IP packet received {0}".format(ip.__repr__()))
159     elif not isinstance(ip, ip_format):
160             raise RuntimeError(
161                 "Not an IP packet received {0}".format(ip.__repr__()))
162
163     lisp = ether.getlayer(LispGPEHeader).underlayer
164     if not lisp:
165         raise RuntimeError("Lisp layer not present or parsing failed.")
166
167     # Compare data from packets
168     if src_ip == lisp.src:
169         print("Source IP matches source EID.")
170     else:
171         raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
172                            .format(src_ip, lisp.src))
173
174     if dst_ip == lisp.dst:
175         print("Destination IP matches destination EID.")
176     else:
177         raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
178                            .format(dst_ip, lisp.dst))
179
180     if src_rloc == ip.src:
181         print("Source RLOC matches configuration.")
182     else:
183         raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}"
184                            .format(src_rloc, ip.src))
185
186     if dst_rloc == ip.dst:
187         print("Destination RLOC matches configuration.")
188     else:
189         raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}"
190                            .format(dst_rloc, ip.dst))
191
192     sys.exit(0)
193
194
195 if __name__ == "__main__":
196     main()