Revert "fix(IPsecUtil): Delete keywords no longer used"
[csit.git] / GPL / traffic_scripts / send_ip_check_headers.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose
18 # Apache 2.
19 #
20 # Unless required by applicable law or agreed to in writing, software
21 # distributed under the License is distributed on an "AS IS" BASIS,
22 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 # See the License for the specific language governing permissions and
24 # limitations under the License.
25
26 """Traffic script that sends an IP IPv4/IPv6 packet from one interface
27 to the other. Source and destination IP addresses and source and destination
28 MAC addresses are checked in received packet.
29 """
30
31 import sys
32
33 from robot.api import logger
34 from scapy.layers.inet import IP
35 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6MLReport2, ICMPv6ND_RA
36 from scapy.layers.l2 import Ether, Dot1Q
37 from scapy.packet import Raw
38
39 from .PacketVerifier import RxQueue, TxQueue
40 from .TrafficScriptArg import TrafficScriptArg
41 from .ValidIp import valid_ipv4, valid_ipv6
42
43 def main():
44     """Send IP/IPv6 packet from one traffic generator interface to the other."""
45     args = TrafficScriptArg(
46         [
47             u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
48             u"dut_if2_mac"
49         ],
50         [
51             u"encaps_tx", u"vlan_tx", u"vlan_outer_tx", u"encaps_rx",
52             u"vlan_rx", u"vlan_outer_rx"
53         ]
54     )
55
56     tx_src_mac = args.get_arg(u"tg_src_mac")
57     tx_dst_mac = args.get_arg(u"dut_if1_mac")
58     rx_dst_mac = args.get_arg(u"tg_dst_mac")
59     rx_src_mac = args.get_arg(u"dut_if2_mac")
60     src_ip = args.get_arg(u"src_ip")
61     dst_ip = args.get_arg(u"dst_ip")
62     tx_if = args.get_arg(u"tx_if")
63     rx_if = args.get_arg(u"rx_if")
64
65     encaps_tx = args.get_arg(u"encaps_tx")
66     vlan_tx = args.get_arg(u"vlan_tx")
67     vlan_outer_tx = args.get_arg(u"vlan_outer_tx")
68     encaps_rx = args.get_arg(u"encaps_rx")
69     vlan_rx = args.get_arg(u"vlan_rx")
70     vlan_outer_rx = args.get_arg(u"vlan_outer_rx")
71
72     rxq = RxQueue(rx_if)
73     txq = TxQueue(tx_if)
74
75     sent_packets = list()
76     pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
77
78     if encaps_tx == u"Dot1q":
79         pkt_raw /= Dot1Q(vlan=int(vlan_tx))
80     elif encaps_tx == u"Dot1ad":
81         pkt_raw.type = 0x88a8
82         pkt_raw /= Dot1Q(vlan=vlan_outer_tx)
83         pkt_raw /= Dot1Q(vlan=vlan_tx)
84
85     if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
86         pkt_raw /= IP(src=src_ip, dst=dst_ip, proto=61)
87         ip_format = IP
88     elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
89         pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
90         ip_format = IPv6
91     else:
92         raise ValueError(u"IP not in correct format")
93
94     pkt_raw /= Raw()
95     sent_packets.append(pkt_raw)
96     txq.send(pkt_raw)
97
98     while True:
99         if tx_if == rx_if:
100             ether = rxq.recv(2, ignore=sent_packets)
101         else:
102             ether = rxq.recv(2)
103
104         if ether is None:
105             raise RuntimeError(u"IP packet Rx timeout")
106
107         if ether.haslayer(ICMPv6ND_NS):
108             # read another packet in the queue if the current one is ICMPv6ND_NS
109             continue
110         elif ether.haslayer(ICMPv6MLReport2):
111             # read another packet in the queue if the current one is
112             # ICMPv6MLReport2
113             continue
114         elif ether.haslayer(ICMPv6ND_RA):
115             # read another packet in the queue if the current one is
116             # ICMPv6ND_RA
117             continue
118
119         break
120
121     if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
122         logger.trace(u"MAC matched")
123     else:
124         raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
125
126     if encaps_rx == u"Dot1q":
127         if ether[Dot1Q].vlan == int(vlan_rx):
128             logger.trace(u"VLAN matched")
129         else:
130             raise RuntimeError(
131                 f"Ethernet frame with wrong VLAN tag "
132                 f"({ether[Dot1Q].vlan}-received, "
133                 f"{vlan_rx}-expected):\n{ether!r}"
134             )
135         ip = ether[Dot1Q].payload
136     elif encaps_rx == u"Dot1ad":
137         raise NotImplementedError()
138     else:
139         ip = ether.payload
140
141     if not isinstance(ip, ip_format):
142         raise RuntimeError(f"Not an IP packet received {ip!r}")
143
144     # Compare data from packets
145     if src_ip == ip.src:
146         logger.trace(u"Src IP matched")
147     else:
148         raise RuntimeError(
149             f"Matching Src IP unsuccessful: {src_ip} != {ip.src}"
150         )
151
152     if dst_ip == ip.dst:
153         logger.trace(u"Dst IP matched")
154     else:
155         raise RuntimeError(
156             f"Matching Dst IP unsuccessful: {dst_ip} != {ip.dst}"
157         )
158
159     sys.exit(0)
160
161
162 if __name__ == u"__main__":
163     main()