9f95c4952035bc728e9e2beea36978722491dd68
[csit.git] / GPL / traffic_scripts / send_vxlan_check_vxlan.py
1 #!/usr/bin/env python3
2
3 # Copyright (c) 2020 Cisco and/or its affiliates.
4 #
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6 #
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later;  you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
11 #
12 #     http://www.apache.org/licenses/LICENSE-2.0
13 #     https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
14 #
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+.  If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose Apache 2.
18 #
19 # Unless required by applicable law or agreed to in writing, software
20 # distributed under the License is distributed on an "AS IS" BASIS,
21 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 # See the License for the specific language governing permissions and
23 # limitations under the License.
24
25 """Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface to
26 the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
27 """
28
29 import sys
30
31 from scapy.layers.inet import IP, UDP
32 from scapy.layers.l2 import Ether
33 from scapy.packet import Raw
34
35 from .PacketVerifier import RxQueue, TxQueue
36 from .TrafficScriptArg import TrafficScriptArg
37 from . import vxlan
38
39
40 def main():
41     """Send IP ICMPv4/ICMPv6 packet from one traffic generator interface to
42     the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
43     """
44     args = TrafficScriptArg(
45         [
46             u"tx_src_mac", u"tx_dst_mac", u"tx_src_ip", u"tx_dst_ip", u"tx_vni",
47             u"rx_src_ip", u"rx_dst_ip", u"rx_vni"
48         ]
49     )
50
51     tx_if = args.get_arg(u"tx_if")
52     rx_if = args.get_arg(u"rx_if")
53     tx_src_mac = args.get_arg(u"tx_src_mac")
54     tx_dst_mac = args.get_arg(u"tx_dst_mac")
55     tx_src_ip = args.get_arg(u"tx_src_ip")
56     tx_dst_ip = args.get_arg(u"tx_dst_ip")
57     tx_vni = args.get_arg(u"tx_vni")
58     rx_src_ip = args.get_arg(u"rx_src_ip")
59     rx_dst_ip = args.get_arg(u"rx_dst_ip")
60     rx_vni = args.get_arg(u"rx_vni")
61
62     rxq = RxQueue(rx_if)
63     txq = TxQueue(tx_if)
64
65     sent_packets = []
66
67     tx_pkt_p = (Ether(src=u"02:00:00:00:00:01", dst=u"02:00:00:00:00:02") /
68                 IP(src=u"192.168.1.1", dst=u"192.168.1.2") /
69                 UDP(sport=12345, dport=1234) /
70                 Raw(u"raw data"))
71
72     pkt_raw = (Ether(src=tx_src_mac, dst=tx_dst_mac) /
73                IP(src=tx_src_ip, dst=tx_dst_ip) /
74                UDP(sport=23456) /
75                vxlan.VXLAN(vni=int(tx_vni)) /
76                tx_pkt_p)
77
78     pkt_raw /= Raw()
79     # Send created packet on one interface and receive on the other
80     sent_packets.append(pkt_raw)
81     txq.send(pkt_raw)
82
83     ether = rxq.recv(2, ignore=sent_packets)
84
85     # Check whether received packet contains layers Ether, IP and VXLAN
86     if ether is None:
87         raise RuntimeError(u"Packet Rx timeout")
88     ip = ether.payload
89
90     if ip.src != rx_src_ip:
91         raise RuntimeError(f"IP src mismatch {ip.src} != {rx_src_ip}")
92     if ip.dst != rx_dst_ip:
93         raise RuntimeError(f"IP dst mismatch {ip.dst} != {rx_dst_ip}")
94     if ip.payload.dport != 4789:
95         raise RuntimeError(
96             f"VXLAN UDP port mismatch {ip.payload.dport} != 4789"
97         )
98     vxlan_pkt = ip.payload.payload
99
100     if int(vxlan_pkt.vni) != int(rx_vni):
101         raise RuntimeError(u"vxlan mismatch")
102     rx_pkt_p = vxlan_pkt.payload
103
104     if rx_pkt_p.src != tx_pkt_p.src:
105         raise RuntimeError(
106             f"RX encapsulated MAC src mismatch {rx_pkt_p.src} != {tx_pkt_p.src}"
107         )
108     if rx_pkt_p.dst != tx_pkt_p.dst:
109         raise RuntimeError(
110             f"RX encapsulated MAC dst mismatch {rx_pkt_p.dst} != {tx_pkt_p.dst}"
111         )
112     if rx_pkt_p[IP].src != tx_pkt_p[IP].src:
113         raise RuntimeError(
114             f"RX encapsulated IP src mismatch {rx_pkt_p[IP].src} != "
115             f"{tx_pkt_p[IP].src}"
116         )
117     if rx_pkt_p[IP].dst != tx_pkt_p[IP].dst:
118         raise RuntimeError(
119             f"RX encapsulated IP dst mismatch {rx_pkt_p[IP].dst} != "
120             f"{tx_pkt_p[IP].dst}"
121         )
122
123     # TODO: verify inner Ether()
124
125     sys.exit(0)
126
127
128 if __name__ == u"__main__":
129     main()