CSIT-611: HC Test: Lisp suite updates + LispGPE
[csit.git] / resources / traffic_scripts / lisp / lispgpe_check.py
1 #!/usr/bin/env python
2 # Copyright (c) 2017 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an ICMP/ICMPv6 packet out one interface, receives
16 a LISPGPE-encapsulated packet on the other interface and verifies received
17 packet.
18 """
19
20 import sys
21 import ipaddress
22
23 from scapy.layers.inet import ICMP, IP, UDP
24 from scapy.layers.inet6 import ICMPv6EchoRequest
25 from scapy.layers.inet6 import IPv6
26 from scapy.layers.l2 import Ether
27 from scapy.all import bind_layers, Packet
28 from scapy.fields import FlagsField, BitField, XBitField, IntField
29
30 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
31 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
32
33
34 class LispGPEHeader(Packet):
35     """Scapy header for the Lisp GPE Layer."""
36     name = "Lisp GPE Header"
37     fields_desc = [
38         FlagsField("flags", None, 8, ["N", "L", "E", "V", "I", "P", "R", "O"]),
39         BitField("version", 0, size=2),
40         BitField("reserved", 0, size=14),
41         XBitField("next_protocol", 0, size=8),
42         IntField("instance_id/locator_status_bits", 0)]
43
44     def guess_payload_class(self, payload):
45         protocol = {
46             0x1: LispGPEInnerIP,
47             0x2: LispGPEInnerIPv6,
48             0x3: LispGPEInnerEther,
49             0x4: LispGPEInnerNSH
50         }
51         return protocol[self.next_protocol]
52
53
54 class LispGPEInnerIP(IP):
55     """Scapy inner LISP GPE layer for IPv4-in-IPv4."""
56     name = "Lisp GPE Inner Layer - IPv4"
57
58
59 class LispGPEInnerIPv6(IPv6):
60     """Scapy inner LISP GPE layer for IPv6-in-IPv6."""
61     name = "Lisp GPE Inner Layer - IPv6"
62
63
64 class LispGPEInnerEther(Ether):
65     """Scapy inner LISP GPE layer for Lisp-L2."""
66     name = "Lisp GPE Inner Layer - Ethernet"
67
68
69 class LispGPEInnerNSH(Packet):
70     """Scapy inner LISP GPE layer for Lisp-NSH.
71
72     Parsing not implemented.
73     """
74
75
76 def valid_ipv4(ip):
77     try:
78         ipaddress.IPv4Address(unicode(ip))
79         return True
80     except (AttributeError, ipaddress.AddressValueError):
81         return False
82
83
84 def valid_ipv6(ip):
85     try:
86         ipaddress.IPv6Address(unicode(ip))
87         return True
88     except (AttributeError, ipaddress.AddressValueError):
89         return False
90
91
92 def main():
93     """Send IP ICMP packet from one traffic generator interface to the other.
94
95     :raises RuntimeError: If the received packet is not correct."""
96
97     args = TrafficScriptArg(
98         ['tg_src_mac', 'tg_dst_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
99          'dut_if2_mac', 'src_rloc', 'dst_rloc'])
100
101     tx_src_mac = args.get_arg('tg_src_mac')
102     tx_dst_mac = args.get_arg('dut_if1_mac')
103     rx_dst_mac = args.get_arg('tg_dst_mac')
104     rx_src_mac = args.get_arg('dut_if2_mac')
105     src_ip = args.get_arg('src_ip')
106     dst_ip = args.get_arg('dst_ip')
107     src_rloc = args.get_arg("src_rloc")
108     dst_rloc = args.get_arg("dst_rloc")
109     tx_if = args.get_arg('tx_if')
110     rx_if = args.get_arg('rx_if')
111
112     rxq = RxQueue(rx_if)
113     txq = TxQueue(tx_if)
114     sent_packets = []
115     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
116
117     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
118         pkt_raw /= IP(src=src_ip, dst=dst_ip)
119         pkt_raw /= ICMP()
120         ip_format = IP
121     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
122         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
123         pkt_raw /= ICMPv6EchoRequest()
124         ip_format = IPv6
125     else:
126         raise ValueError("IP not in correct format")
127
128     bind_layers(UDP, LispGPEHeader, dport=4341)
129
130     sent_packets.append(pkt_raw)
131     txq.send(pkt_raw)
132
133     if tx_if == rx_if:
134         ether = rxq.recv(2, ignore=sent_packets)
135     else:
136         ether = rxq.recv(2)
137
138     if ether is None:
139         raise RuntimeError("ICMP echo Rx timeout")
140
141     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
142         print("MAC addresses match.")
143     else:
144         raise RuntimeError(
145             "Matching packet unsuccessful: {0}".format(ether.__repr__()))
146
147     ip = ether.payload
148
149     if not isinstance(ip, ip_format):
150         raise RuntimeError(
151             "Not an IP packet received {0}".format(ip.__repr__()))
152
153     lisp = ether.getlayer(LispGPEHeader).underlayer
154     if not lisp:
155         raise RuntimeError("Lisp layer not present or parsing failed.")
156
157     # Compare data from packets
158     if src_ip == lisp.src:
159         print("Source IP matches source EID.")
160     else:
161         raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
162                            .format(src_ip, lisp.src))
163
164     if dst_ip == lisp.dst:
165         print("Destination IP matches destination EID.")
166     else:
167         raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
168                            .format(dst_ip, lisp.dst))
169
170     if src_rloc == ip.src:
171         print("Source RLOC matches configuration.")
172     else:
173         raise RuntimeError("Matching Src RLOC unsuccessful: {} != {}"
174                            .format(src_rloc, ip.src))
175
176     if dst_rloc == ip.dst:
177         print("Destination RLOC matches configuration.")
178     else:
179         raise RuntimeError("Matching dst RLOC unsuccessful: {} != {}"
180                            .format(dst_rloc, ip.dst))
181
182     sys.exit(0)
183
184
185 if __name__ == "__main__":
186     main()