Performance: Fix l3fwd in 3node
[csit.git] / resources / traffic_scripts / send_ip_check_headers.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2019 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """Traffic script that sends an IP IPv4/IPv6 packet from one interface
17 to the other. Source and destination IP addresses and source and destination
18 MAC addresses are checked in received packet.
19 """
20
21 import sys
22
23 import ipaddress
24
25 from robot.api import logger
26 from scapy.layers.inet import IP
27 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
28 from scapy.layers.l2 import Ether, Dot1Q
29 from scapy.packet import Raw
30
31 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
32 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
33
34
35 def valid_ipv4(ip):
36     try:
37         ipaddress.IPv4Address(ip)
38         return True
39     except (AttributeError, ipaddress.AddressValueError):
40         return False
41
42
43 def valid_ipv6(ip):
44     try:
45         ipaddress.IPv6Address(ip)
46         return True
47     except (AttributeError, ipaddress.AddressValueError):
48         return False
49
50
51 def main():
52     """Send IP/IPv6 packet from one traffic generator interface to the other."""
53     args = TrafficScriptArg(
54         [
55             u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
56             u"dut_if2_mac"
57         ],
58         [
59             u"encaps_tx", u"vlan_tx", u"vlan_outer_tx", u"encaps_rx",
60             u"vlan_rx", u"vlan_outer_rx"
61         ]
62     )
63
64     tx_src_mac = args.get_arg(u"tg_src_mac")
65     tx_dst_mac = args.get_arg(u"dut_if1_mac")
66     rx_dst_mac = args.get_arg(u"tg_dst_mac")
67     rx_src_mac = args.get_arg(u"dut_if2_mac")
68     src_ip = args.get_arg(u"src_ip")
69     dst_ip = args.get_arg(u"dst_ip")
70     tx_if = args.get_arg(u"tx_if")
71     rx_if = args.get_arg(u"rx_if")
72
73     encaps_tx = args.get_arg(u"encaps_tx")
74     vlan_tx = args.get_arg(u"vlan_tx")
75     vlan_outer_tx = args.get_arg(u"vlan_outer_tx")
76     encaps_rx = args.get_arg(u"encaps_rx")
77     vlan_rx = args.get_arg(u"vlan_rx")
78     vlan_outer_rx = args.get_arg(u"vlan_outer_rx")
79
80     rxq = RxQueue(rx_if)
81     txq = TxQueue(tx_if)
82
83     sent_packets =list()
84     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
85
86     if encaps_tx == u"Dot1q":
87         pkt_raw /= Dot1Q(vlan=int(vlan_tx))
88     elif encaps_tx == u"Dot1ad":
89         pkt_raw.type = 0x88a8
90         pkt_raw /= Dot1Q(vlan=vlan_outer_tx)
91         pkt_raw /= Dot1Q(vlan=vlan_tx)
92
93     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
94         pkt_raw /= IP(src=src_ip, dst=dst_ip, proto=61)
95         ip_format = IP
96     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
97         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
98         ip_format = IPv6
99     else:
100         raise ValueError(u"IP not in correct format")
101
102     pkt_raw /= Raw()
103     sent_packets.append(pkt_raw)
104     txq.send(pkt_raw)
105
106     while True:
107         if tx_if == rx_if:
108             ether = rxq.recv(2, ignore=sent_packets)
109         else:
110             ether = rxq.recv(2)
111
112         if ether is None:
113             raise RuntimeError(u"IP packet Rx timeout")
114
115         if ether.haslayer(ICMPv6ND_NS):
116             # read another packet in the queue if the current one is ICMPv6ND_NS
117             continue
118         else:
119             # otherwise process the current packet
120             break
121
122     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
123         logger.trace(u"MAC matched")
124     else:
125         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
126
127     if encaps_rx == u"Dot1q":
128         if ether[Dot1Q].vlan == int(vlan_rx):
129             logger.trace(u"VLAN matched")
130         else:
131             raise RuntimeError(
132                 f"Ethernet frame with wrong VLAN tag "
133                 f"({ether[Dot1Q].vlan}-received, "
134                 f"{vlan_rx}-expected):\n{ether!r}"
135             )
136         ip = ether[Dot1Q].payload
137     elif encaps_rx == u"Dot1ad":
138         raise NotImplementedError()
139     else:
140         ip = ether.payload
141
142     if not isinstance(ip, ip_format):
143         raise RuntimeError(f"Not an IP packet received {ip!r}")
144
145     # Compare data from packets
146     if src_ip == ip.src:
147         logger.trace(u"Src IP matched")
148     else:
149         raise RuntimeError(
150             f"Matching Src IP unsuccessful: {src_ip} != {ip.src}"
151         )
152
153     if dst_ip == ip.dst:
154         logger.trace(u"Dst IP matched")
155     else:
156         raise RuntimeError(
157             f"Matching Dst IP unsuccessful: {dst_ip} != {ip.dst}"
158         )
159
160     sys.exit(0)
161
162
163 if __name__ == u"__main__":
164     main()