3 # Copyright (c) 2021 Cisco and/or its affiliates.
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later; you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
12 # http://www.apache.org/licenses/LICENSE-2.0
13 # https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+. If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose
20 # Unless required by applicable law or agreed to in writing, software
21 # distributed under the License is distributed on an "AS IS" BASIS,
22 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 # See the License for the specific language governing permissions and
24 # limitations under the License.
26 """Traffic script that sends a VXLAN-encapsulated IPv4 packet from one
27 interface to the other one and verifies the VXLAN packet that is received.
32 from scapy.layers.inet import IP, UDP
33 from scapy.layers.l2 import Ether
34 from scapy.packet import Raw
36 from .PacketVerifier import RxQueue, TxQueue
37 from .TrafficScriptArg import TrafficScriptArg
42 """Send a VXLAN-encapsulated IPv4 packet from one traffic generator
43 interface to the other one and verify the VXLAN packet that is received.
45 args = TrafficScriptArg(
47 u"tx_src_mac", u"tx_dst_mac", u"tx_src_ip", u"tx_dst_ip", u"tx_vni",
48 u"rx_src_ip", u"rx_dst_ip", u"rx_vni"
52 tx_if = args.get_arg(u"tx_if")
53 rx_if = args.get_arg(u"rx_if")
54 tx_src_mac = args.get_arg(u"tx_src_mac")
55 tx_dst_mac = args.get_arg(u"tx_dst_mac")
56 tx_src_ip = args.get_arg(u"tx_src_ip")
57 tx_dst_ip = args.get_arg(u"tx_dst_ip")
58 tx_vni = args.get_arg(u"tx_vni")
59 rx_src_ip = args.get_arg(u"rx_src_ip")
60 rx_dst_ip = args.get_arg(u"rx_dst_ip")
61 rx_vni = args.get_arg(u"rx_vni")
68 tx_pkt_p = (Ether(src=u"02:00:00:00:00:01", dst=u"02:00:00:00:00:02") /
69 IP(src=u"192.168.1.1", dst=u"192.168.1.2") /
70 UDP(sport=12345, dport=1234) /
73 pkt_raw = (Ether(src=tx_src_mac, dst=tx_dst_mac) /
74 IP(src=tx_src_ip, dst=tx_dst_ip) /
76 vxlan.VXLAN(vni=int(tx_vni)) /
80 # Send created packet on one interface and receive on the other
81 sent_packets.append(pkt_raw)
84 ether = rxq.recv(2, ignore=sent_packets)
86 # Check whether received packet contains layers Ether, IP and VXLAN
88 raise RuntimeError(u"Packet Rx timeout")
91 if ip.src != rx_src_ip:
92 raise RuntimeError(f"IP src mismatch {ip.src} != {rx_src_ip}")
93 if ip.dst != rx_dst_ip:
94 raise RuntimeError(f"IP dst mismatch {ip.dst} != {rx_dst_ip}")
95 if ip.payload.dport != 4789:
97 f"VXLAN UDP port mismatch {ip.payload.dport} != 4789"
99 vxlan_pkt = ip.payload.payload
101 if int(vxlan_pkt.vni) != int(rx_vni):
102 raise RuntimeError(u"vxlan mismatch")
103 rx_pkt_p = vxlan_pkt.payload
105 if rx_pkt_p.src != tx_pkt_p.src:
107 f"RX encapsulated MAC src mismatch {rx_pkt_p.src} != {tx_pkt_p.src}"
109 if rx_pkt_p.dst != tx_pkt_p.dst:
111 f"RX encapsulated MAC dst mismatch {rx_pkt_p.dst} != {tx_pkt_p.dst}"
113 if rx_pkt_p[IP].src != tx_pkt_p[IP].src:
115 f"RX encapsulated IP src mismatch {rx_pkt_p[IP].src} != "
116 f"{tx_pkt_p[IP].src}"
118 if rx_pkt_p[IP].dst != tx_pkt_p[IP].dst:
120 f"RX encapsulated IP dst mismatch {rx_pkt_p[IP].dst} != "
121 f"{tx_pkt_p[IP].dst}"
124 # TODO: verify inner Ether()
129 if __name__ == u"__main__":