CSIT-611: HC Test: Lisp suite updates + LispGPE
[csit.git] / resources / traffic_scripts / lisp / lisp_check.py
1 #!/usr/bin/env python
2 # Copyright (c) 2017 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
16 a LISP-encapsulated packet on the other interface and verifies received packet.
17 """
18
19 import sys
20 import ipaddress
21
22 from scapy.layers.inet import ICMP, IP, UDP
23 from scapy.layers.inet6 import ICMPv6EchoRequest
24 from scapy.layers.inet6 import IPv6
25 from scapy.layers.l2 import Ether
26 from scapy.all import bind_layers, Packet
27 from scapy.fields import FlagsField, BitField, IntField
28
29 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
30 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
31
32
33 class LispHeader(Packet):
34     """Scapy header for the LISP Layer."""
35     name = "Lisp Header"
36     fields_desc = [
37         FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "", "", ""]),
38         BitField("nonce/map_version", 0, size=24),
39         IntField("instance_id/locator_status_bits", 0)]
40
41
42 class LispInnerIP(IP):
43     """Scapy inner LISP layer for IPv4-in-IPv4."""
44     name = "Lisp Inner Layer - IPv4"
45
46
47 class LispInnerIPv6(IPv6):
48     """Scapy inner LISP layer for IPv6-in-IPv6."""
49     name = "Lisp Inner Layer - IPv6"
50
51
52 def valid_ipv4(ip):
53     try:
54         ipaddress.IPv4Address(unicode(ip))
55         return True
56     except (AttributeError, ipaddress.AddressValueError):
57         return False
58
59
60 def valid_ipv6(ip):
61     try:
62         ipaddress.IPv6Address(unicode(ip))
63         return True
64     except (AttributeError, ipaddress.AddressValueError):
65         return False
66
67
68 def main():
69     """Send IP ICMP packet from one traffic generator interface to the other.
70
71     :raises RuntimeError: If the received packet is not correct."""
72
73     args = TrafficScriptArg(
74         ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
75          'dut_if2_mac', 'src_rloc', 'dst_rloc'])
76
77     tx_src_mac = args.get_arg('tg_src_mac')
78     tx_dst_mac = args.get_arg('dut_if1_mac')
79     rx_dst_mac = args.get_arg('tg_dst_mac')
80     rx_src_mac = args.get_arg('dut_if2_mac')
81     src_ip = args.get_arg('src_ip')
82     dst_ip = args.get_arg('dst_ip')
83     src_rloc = args.get_arg("src_rloc")
84     dst_rloc = args.get_arg("dst_rloc")
85     tx_if = args.get_arg('tx_if')
86     rx_if = args.get_arg('rx_if')
87
88     rxq = RxQueue(rx_if)
89     txq = TxQueue(tx_if)
90     sent_packets = []
91     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
92
93     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
94         pkt_raw /= IP(src=src_ip, dst=dst_ip)
95         pkt_raw /= ICMP()
96         ip_format = IP
97         lisp_layer = LispInnerIP
98     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
99         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
100         pkt_raw /= ICMPv6EchoRequest()
101         ip_format = IPv6
102         lisp_layer = LispInnerIPv6
103     else:
104         raise ValueError("IP not in correct format")
105
106     bind_layers(UDP, LispHeader, dport=4341)
107     bind_layers(LispHeader, lisp_layer)
108     sent_packets.append(pkt_raw)
109     txq.send(pkt_raw)
110
111     if tx_if == rx_if:
112         ether = rxq.recv(2, ignore=sent_packets)
113     else:
114         ether = rxq.recv(2)
115
116     if ether is None:
117         raise RuntimeError("ICMP echo Rx timeout")
118
119     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
120         print("MAC addresses match.")
121     else:
122         raise RuntimeError(
123             "Matching packet unsuccessful: {0}".format(ether.__repr__()))
124
125     ip = ether.payload
126
127     if not isinstance(ip, ip_format):
128         raise RuntimeError(
129             "Not an IP packet received {0}".format(ip.__repr__()))
130
131     lisp = ether.getlayer(lisp_layer)
132     if not lisp:
133         raise RuntimeError("Lisp layer not present or parsing failed.")
134
135     # Compare data from packets
136     if src_ip == lisp.src:
137         print("Source IP matches source EID.")
138     else:
139         raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
140                            .format(src_ip, lisp.src))
141
142     if dst_ip == lisp.dst:
143         print("Destination IP matches destination EID.")
144     else:
145         raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
146                            .format(dst_ip, lisp.dst))
147
148     if src_rloc == ip.src:
149         print("Source RLOC matches configuration.")
150     else:
151         raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}"
152                            .format(src_rloc, ip.src))
153
154     if dst_rloc == ip.dst:
155         print("Destination RLOC matches configuration.")
156     else:
157         raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}"
158                            .format(dst_rloc, ip.dst))
159
160     sys.exit(0)
161
162
163 if __name__ == "__main__":
164     main()