9937de807739486f675f210d157dcba080a171f8
[csit.git] / resources / traffic_scripts / lisp / lisp_check.py
1 #!/usr/bin/env python
2 # Copyright (c) 2017 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
16 a LISP-encapsulated packet on the other interface and verifies received packet.
17 """
18
19 import sys
20 import ipaddress
21
22 from scapy.layers.inet import ICMP, IP, UDP
23 from scapy.layers.inet6 import ICMPv6EchoRequest
24 from scapy.layers.inet6 import IPv6
25 from scapy.layers.l2 import Ether
26 from scapy.all import bind_layers, Packet
27 from scapy.fields import FlagsField, BitField, IntField
28
29 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
30 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
31
32
33 class LispHeader(Packet):
34     """Scapy header for the LISP Layer."""
35     name = "Lisp Header"
36     fields_desc = [
37         FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "", "", ""]),
38         BitField("nonce/map_version", 0, size=24),
39         IntField("instance_id/locator_status_bits", 0)]
40
41
42 class LispInnerIP(IP):
43     """Scapy inner LISP layer for IPv4-in-IPv4."""
44     name = "Lisp Inner Layer - IPv4"
45
46
47 class LispInnerIPv6(IPv6):
48     """Scapy inner LISP layer for IPv6-in-IPv6."""
49     name = "Lisp Inner Layer - IPv6"
50
51
52 def valid_ipv4(ip):
53     try:
54         ipaddress.IPv4Address(unicode(ip))
55         return True
56     except (AttributeError, ipaddress.AddressValueError):
57         return False
58
59
60 def valid_ipv6(ip):
61     try:
62         ipaddress.IPv6Address(unicode(ip))
63         return True
64     except (AttributeError, ipaddress.AddressValueError):
65         return False
66
67
68 def main():
69     """Send IP ICMP packet from one traffic generator interface to the other.
70
71     :raises RuntimeError: If the received packet is not correct."""
72
73     args = TrafficScriptArg(
74         ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
75          'dut_if2_mac', 'src_rloc', 'dst_rloc'],
76         ['ot_mode'])
77
78     tx_src_mac = args.get_arg('tg_src_mac')
79     tx_dst_mac = args.get_arg('dut_if1_mac')
80     rx_dst_mac = args.get_arg('tg_dst_mac')
81     rx_src_mac = args.get_arg('dut_if2_mac')
82     src_ip = args.get_arg('src_ip')
83     dst_ip = args.get_arg('dst_ip')
84     src_rloc = args.get_arg("src_rloc")
85     dst_rloc = args.get_arg("dst_rloc")
86     tx_if = args.get_arg('tx_if')
87     rx_if = args.get_arg('rx_if')
88     ot_mode = args.get_arg('ot_mode')
89
90     rxq = RxQueue(rx_if)
91     txq = TxQueue(tx_if)
92     sent_packets = []
93     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
94
95     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
96         pkt_raw /= IP(src=src_ip, dst=dst_ip)
97         pkt_raw /= ICMP()
98         ip_format = IP
99         lisp_layer = LispInnerIP
100     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
101         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
102         pkt_raw /= ICMPv6EchoRequest()
103         ip_format = IPv6
104         lisp_layer = LispInnerIPv6
105     else:
106         raise ValueError("IP not in correct format")
107
108     bind_layers(UDP, LispHeader, dport=4341)
109     bind_layers(LispHeader, lisp_layer)
110     sent_packets.append(pkt_raw)
111     txq.send(pkt_raw)
112
113     if tx_if == rx_if:
114         ether = rxq.recv(2, ignore=sent_packets)
115     else:
116         ether = rxq.recv(2)
117
118     if ether is None:
119         raise RuntimeError("ICMP echo Rx timeout")
120
121     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
122         print("MAC addresses match.")
123     else:
124         raise RuntimeError(
125             "Matching packet unsuccessful: {0}".format(ether.__repr__()))
126
127     ip = ether.payload
128
129     if ot_mode == '6to4':
130         if not isinstance(ip, IP):
131             raise RuntimeError(
132                 "Not an IP packet received {0}".format(ip.__repr__()))
133     elif ot_mode == '4to6':
134         if not isinstance(ip, IP6):
135             raise RuntimeError(
136                 "Not an IP packet received {0}".format(ip.__repr__()))
137     elif not isinstance(ip, ip_format):
138             raise RuntimeError(
139                 "Not an IP packet received {0}".format(ip.__repr__()))
140
141     lisp = ether.getlayer(lisp_layer)
142     if not lisp:
143         raise RuntimeError("Lisp layer not present or parsing failed.")
144
145     # Compare data from packets
146     if src_ip == lisp.src:
147         print("Source IP matches source EID.")
148     else:
149         raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
150                            .format(src_ip, lisp.src))
151
152     if dst_ip == lisp.dst:
153         print("Destination IP matches destination EID.")
154     else:
155         raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
156                            .format(dst_ip, lisp.dst))
157
158     if src_rloc == ip.src:
159         print("Source RLOC matches configuration.")
160     else:
161         raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}"
162                            .format(src_rloc, ip.src))
163
164     if dst_rloc == ip.dst:
165         print("Destination RLOC matches configuration.")
166     else:
167         raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}"
168                            .format(dst_rloc, ip.dst))
169
170     sys.exit(0)
171
172
173 if __name__ == "__main__":
174     main()