3 # Copyright (c) 2020 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 """Traffic script that sends an IP IPv4/IPv6 packet from one interface
17 to the other. Source and destination IP addresses and source and destination
18 MAC addresses are checked in received packet.
25 from robot.api import logger
26 from scapy.layers.inet import IP
27 from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
28 from scapy.layers.l2 import Ether, Dot1Q
29 from scapy.packet import Raw
31 from .PacketVerifier import RxQueue, TxQueue
32 from .TrafficScriptArg import TrafficScriptArg
37 ipaddress.IPv4Address(ip)
39 except (AttributeError, ipaddress.AddressValueError):
45 ipaddress.IPv6Address(ip)
47 except (AttributeError, ipaddress.AddressValueError):
52 """Send IP/IPv6 packet from one traffic generator interface to the other."""
53 args = TrafficScriptArg(
55 u"tg_src_mac", u"tg_dst_mac", u"src_ip", u"dst_ip", u"dut_if1_mac",
59 u"encaps_tx", u"vlan_tx", u"vlan_outer_tx", u"encaps_rx",
60 u"vlan_rx", u"vlan_outer_rx"
64 tx_src_mac = args.get_arg(u"tg_src_mac")
65 tx_dst_mac = args.get_arg(u"dut_if1_mac")
66 rx_dst_mac = args.get_arg(u"tg_dst_mac")
67 rx_src_mac = args.get_arg(u"dut_if2_mac")
68 src_ip = args.get_arg(u"src_ip")
69 dst_ip = args.get_arg(u"dst_ip")
70 tx_if = args.get_arg(u"tx_if")
71 rx_if = args.get_arg(u"rx_if")
73 encaps_tx = args.get_arg(u"encaps_tx")
74 vlan_tx = args.get_arg(u"vlan_tx")
75 vlan_outer_tx = args.get_arg(u"vlan_outer_tx")
76 encaps_rx = args.get_arg(u"encaps_rx")
77 vlan_rx = args.get_arg(u"vlan_rx")
78 vlan_outer_rx = args.get_arg(u"vlan_outer_rx")
84 pkt_raw = Ether(src=tx_src_mac, dst=tx_dst_mac)
86 if encaps_tx == u"Dot1q":
87 pkt_raw /= Dot1Q(vlan=int(vlan_tx))
88 elif encaps_tx == u"Dot1ad":
90 pkt_raw /= Dot1Q(vlan=vlan_outer_tx)
91 pkt_raw /= Dot1Q(vlan=vlan_tx)
93 if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
94 pkt_raw /= IP(src=src_ip, dst=dst_ip, proto=61)
96 elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
97 pkt_raw /= IPv6(src=src_ip, dst=dst_ip)
100 raise ValueError(u"IP not in correct format")
103 sent_packets.append(pkt_raw)
108 ether = rxq.recv(2, ignore=sent_packets)
113 raise RuntimeError(u"IP packet Rx timeout")
115 if ether.haslayer(ICMPv6ND_NS):
116 # read another packet in the queue if the current one is ICMPv6ND_NS
119 # otherwise process the current packet
122 if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
123 logger.trace(u"MAC matched")
125 raise RuntimeError(f"Matching packet unsuccessful: {ether!r}")
127 if encaps_rx == u"Dot1q":
128 if ether[Dot1Q].vlan == int(vlan_rx):
129 logger.trace(u"VLAN matched")
132 f"Ethernet frame with wrong VLAN tag "
133 f"({ether[Dot1Q].vlan}-received, "
134 f"{vlan_rx}-expected):\n{ether!r}"
136 ip = ether[Dot1Q].payload
137 elif encaps_rx == u"Dot1ad":
138 raise NotImplementedError()
142 if not isinstance(ip, ip_format):
143 raise RuntimeError(f"Not an IP packet received {ip!r}")
145 # Compare data from packets
147 logger.trace(u"Src IP matched")
150 f"Matching Src IP unsuccessful: {src_ip} != {ip.src}"
154 logger.trace(u"Dst IP matched")
157 f"Matching Dst IP unsuccessful: {dst_ip} != {ip.dst}"
163 if __name__ == u"__main__":