FIX: Add ICMPv6MLReport2 masking
[csit.git] / GPL / traffic_scripts / send_vxlan_check_vxlan.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2021 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose
18 # Apache 2.
19 #
20 # Unless required by applicable law or agreed to in writing, software
21 # distributed under the License is distributed on an "AS IS" BASIS,
22 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 # See the License for the specific language governing permissions and
24 # limitations under the License.
25
26 """Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface to
27 the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
28 """
29
30 import sys
31
32 from scapy.layers.inet import IP, UDP
33 from scapy.layers.l2 import Ether
34 from scapy.packet import Raw
35
36 from .PacketVerifier import RxQueue, TxQueue
37 from .TrafficScriptArg import TrafficScriptArg
38 from . import vxlan
39
40
41 def main():
42     """Send IP ICMPv4/ICMPv6 packet from one traffic generator interface to
43     the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
44     """
45     args = TrafficScriptArg(
46         [
47             u"tx_src_mac", u"tx_dst_mac", u"tx_src_ip", u"tx_dst_ip", u"tx_vni",
48             u"rx_src_ip", u"rx_dst_ip", u"rx_vni"
49         ]
50     )
51
52     tx_if = args.get_arg(u"tx_if")
53     rx_if = args.get_arg(u"rx_if")
54     tx_src_mac = args.get_arg(u"tx_src_mac")
55     tx_dst_mac = args.get_arg(u"tx_dst_mac")
56     tx_src_ip = args.get_arg(u"tx_src_ip")
57     tx_dst_ip = args.get_arg(u"tx_dst_ip")
58     tx_vni = args.get_arg(u"tx_vni")
59     rx_src_ip = args.get_arg(u"rx_src_ip")
60     rx_dst_ip = args.get_arg(u"rx_dst_ip")
61     rx_vni = args.get_arg(u"rx_vni")
62
63     rxq = RxQueue(rx_if)
64     txq = TxQueue(tx_if)
65
66     sent_packets = []
67
68     tx_pkt_p = (Ether(src=u"02:00:00:00:00:01", dst=u"02:00:00:00:00:02") /
69                 IP(src=u"192.168.1.1", dst=u"192.168.1.2") /
70                 UDP(sport=12345, dport=1234) /
71                 Raw(u"raw data"))
72
73     pkt_raw = (Ether(src=tx_src_mac, dst=tx_dst_mac) /
74                IP(src=tx_src_ip, dst=tx_dst_ip) /
75                UDP(sport=23456) /
76                vxlan.VXLAN(vni=int(tx_vni)) /
77                tx_pkt_p)
78
79     pkt_raw /= Raw()
80     # Send created packet on one interface and receive on the other
81     sent_packets.append(pkt_raw)
82     txq.send(pkt_raw)
83
84     ether = rxq.recv(2, ignore=sent_packets)
85
86     # Check whether received packet contains layers Ether, IP and VXLAN
87     if ether is None:
88         raise RuntimeError(u"Packet Rx timeout")
89     ip = ether.payload
90
91     if ip.src != rx_src_ip:
92         raise RuntimeError(f"IP src mismatch {ip.src} != {rx_src_ip}")
93     if ip.dst != rx_dst_ip:
94         raise RuntimeError(f"IP dst mismatch {ip.dst} != {rx_dst_ip}")
95     if ip.payload.dport != 4789:
96         raise RuntimeError(
97             f"VXLAN UDP port mismatch {ip.payload.dport} != 4789"
98         )
99     vxlan_pkt = ip.payload.payload
100
101     if int(vxlan_pkt.vni) != int(rx_vni):
102         raise RuntimeError(u"vxlan mismatch")
103     rx_pkt_p = vxlan_pkt.payload
104
105     if rx_pkt_p.src != tx_pkt_p.src:
106         raise RuntimeError(
107             f"RX encapsulated MAC src mismatch {rx_pkt_p.src} != {tx_pkt_p.src}"
108         )
109     if rx_pkt_p.dst != tx_pkt_p.dst:
110         raise RuntimeError(
111             f"RX encapsulated MAC dst mismatch {rx_pkt_p.dst} != {tx_pkt_p.dst}"
112         )
113     if rx_pkt_p[IP].src != tx_pkt_p[IP].src:
114         raise RuntimeError(
115             f"RX encapsulated IP src mismatch {rx_pkt_p[IP].src} != "
116             f"{tx_pkt_p[IP].src}"
117         )
118     if rx_pkt_p[IP].dst != tx_pkt_p[IP].dst:
119         raise RuntimeError(
120             f"RX encapsulated IP dst mismatch {rx_pkt_p[IP].dst} != "
121             f"{tx_pkt_p[IP].dst}"
122         )
123
124     # TODO: verify inner Ether()
125
126     sys.exit(0)
127
128
129 if __name__ == u"__main__":
130     main()