3 # Copyright (c) 2020 Cisco and/or its affiliates.
5 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
7 # Licensed under the Apache License 2.0 or
8 # GNU General Public License v2.0 or later; you may not use this file
9 # except in compliance with one of these Licenses. You
10 # may obtain a copy of the Licenses at:
12 # http://www.apache.org/licenses/LICENSE-2.0
13 # https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
15 # Note: If this file is linked with Scapy, which is GPLv2+, your use of it
16 # must be under GPLv2+. If at any point in the future it is no longer linked
17 # with Scapy (or other GPLv2+ licensed software), you are free to choose Apache 2.
19 # Unless required by applicable law or agreed to in writing, software
20 # distributed under the License is distributed on an "AS IS" BASIS,
21 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 # See the License for the specific language governing permissions and
23 # limitations under the License.
25 """Traffic script that sends a VXLAN-encapsulated packet from one interface
26 to the other one and verifies the VXLAN packet received there.
31 from scapy.layers.inet import IP, UDP
32 from scapy.layers.l2 import Ether
33 from scapy.packet import Raw
35 from .PacketVerifier import RxQueue, TxQueue
36 from .TrafficScriptArg import TrafficScriptArg
41 """Send a VXLAN-encapsulated packet from one traffic generator interface to
42 the other one; check outer IP, UDP port 4789, VNI and inner MAC/IP addresses.
44 args = TrafficScriptArg(
46 u"tx_src_mac", u"tx_dst_mac", u"tx_src_ip", u"tx_dst_ip", u"tx_vni",
47 u"rx_src_ip", u"rx_dst_ip", u"rx_vni"
51 tx_if = args.get_arg(u"tx_if")
52 rx_if = args.get_arg(u"rx_if")
53 tx_src_mac = args.get_arg(u"tx_src_mac")
54 tx_dst_mac = args.get_arg(u"tx_dst_mac")
55 tx_src_ip = args.get_arg(u"tx_src_ip")
56 tx_dst_ip = args.get_arg(u"tx_dst_ip")
57 tx_vni = args.get_arg(u"tx_vni")
58 rx_src_ip = args.get_arg(u"rx_src_ip")
59 rx_dst_ip = args.get_arg(u"rx_dst_ip")
60 rx_vni = args.get_arg(u"rx_vni")
67 tx_pkt_p = (Ether(src=u"02:00:00:00:00:01", dst=u"02:00:00:00:00:02") /
68 IP(src=u"192.168.1.1", dst=u"192.168.1.2") /
69 UDP(sport=12345, dport=1234) /
72 pkt_raw = (Ether(src=tx_src_mac, dst=tx_dst_mac) /
73 IP(src=tx_src_ip, dst=tx_dst_ip) /
75 vxlan.VXLAN(vni=int(tx_vni)) /
79 # Send created packet on one interface and receive on the other
80 sent_packets.append(pkt_raw)
83 ether = rxq.recv(2, ignore=sent_packets)
85 # Check whether received packet contains layers Ether, IP and VXLAN
87 raise RuntimeError(u"Packet Rx timeout")
90 if ip.src != rx_src_ip:
91 raise RuntimeError(f"IP src mismatch {ip.src} != {rx_src_ip}")
92 if ip.dst != rx_dst_ip:
93 raise RuntimeError(f"IP dst mismatch {ip.dst} != {rx_dst_ip}")
94 if ip.payload.dport != 4789:
96 f"VXLAN UDP port mismatch {ip.payload.dport} != 4789"
98 vxlan_pkt = ip.payload.payload
100 if int(vxlan_pkt.vni) != int(rx_vni):
101 raise RuntimeError(u"vxlan mismatch")
102 rx_pkt_p = vxlan_pkt.payload
104 if rx_pkt_p.src != tx_pkt_p.src:
106 f"RX encapsulated MAC src mismatch {rx_pkt_p.src} != {tx_pkt_p.src}"
108 if rx_pkt_p.dst != tx_pkt_p.dst:
110 f"RX encapsulated MAC dst mismatch {rx_pkt_p.dst} != {tx_pkt_p.dst}"
112 if rx_pkt_p[IP].src != tx_pkt_p[IP].src:
114 f"RX encapsulated IP src mismatch {rx_pkt_p[IP].src} != "
115 f"{tx_pkt_p[IP].src}"
117 if rx_pkt_p[IP].dst != tx_pkt_p[IP].dst:
119 f"RX encapsulated IP dst mismatch {rx_pkt_p[IP].dst} != "
120 f"{tx_pkt_p[IP].dst}"
123 # TODO: verify inner Ether()
128 if __name__ == u"__main__":