2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends an IP ICMPv4/ICMPv6 packet from one interface to
16 the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
22 from scapy.layers.l2 import Ether, Dot1Q
23 from scapy.layers.inet import ICMP, IP
24 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
26 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
27 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
31 """Check if IP address has the correct IPv4 address format.
33 :param ip: IP address.
35 :return: True in case of correct IPv4 address format,
36 otherwise return false.
40 ipaddress.IPv4Address(unicode(ip))
42 except (AttributeError, ipaddress.AddressValueError):
47 """Check if IP address has the correct IPv6 address format.
49 :param ip: IP address.
51 :return: True in case of correct IPv6 address format,
52 otherwise return false.
56 ipaddress.IPv6Address(unicode(ip))
58 except (AttributeError, ipaddress.AddressValueError):
63 """Send IP ICMPv4/ICMPv6 packet from one traffic generator interface to
64 the other one. Dot1q or Dot1ad tagging of the ethernet frame can be set.
66 args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip'],
67 ['encaps', 'vlan1', 'vlan2', 'encaps_rx',
68 'vlan1_rx', 'vlan2_rx'])
70 src_mac = args.get_arg('src_mac')
71 dst_mac = args.get_arg('dst_mac')
72 src_ip = args.get_arg('src_ip')
73 dst_ip = args.get_arg('dst_ip')
75 encaps = args.get_arg('encaps')
76 vlan1 = args.get_arg('vlan1')
77 vlan2 = args.get_arg('vlan2')
78 encaps_rx = args.get_arg('encaps_rx')
79 vlan1_rx = args.get_arg('vlan1_rx')
80 vlan2_rx = args.get_arg('vlan2_rx')
82 tx_if = args.get_arg('tx_if')
83 rx_if = args.get_arg('rx_if')
91 # Create empty ip ICMP packet and add padding before sending
92 if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
94 pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
95 Dot1Q(vlan=int(vlan1)) /
96 IP(src=src_ip, dst=dst_ip) /
98 elif encaps == 'Dot1ad':
99 pkt_raw = (Ether(src=src_mac, dst=dst_mac, type=0x88A8) /
100 Dot1Q(vlan=int(vlan1), type=0x8100) /
101 Dot1Q(vlan=int(vlan2)) /
102 IP(src=src_ip, dst=dst_ip) /
105 pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
106 IP(src=src_ip, dst=dst_ip) /
110 elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
111 if encaps == 'Dot1q':
112 pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
113 Dot1Q(vlan=int(vlan1)) /
114 IPv6(src=src_ip, dst=dst_ip) /
116 elif encaps == 'Dot1ad':
117 pkt_raw = (Ether(src=src_mac, dst=dst_mac, type=0x88A8) /
118 Dot1Q(vlan=int(vlan1), type=0x8100) /
119 Dot1Q(vlan=int(vlan2)) /
120 IPv6(src=src_ip, dst=dst_ip) /
123 pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
124 IPv6(src=src_ip, dst=dst_ip) /
127 icmp_format = ICMPv6EchoRequest
129 raise ValueError("IP(s) not in correct format")
131 # Send created packet on one interface and receive on the other
132 sent_packets.append(pkt_raw)
135 # Receive ICMP / ICMPv6 echo reply
139 raise RuntimeError('ICMP echo Rx timeout')
141 if ether.haslayer(ICMPv6ND_NS):
142 # read another packet in the queue if the current one is ICMPv6ND_NS
145 # otherwise process the current packet
148 # Check whether received packet contains layers IP/IPv6 and
149 # ICMP/ICMPv6EchoRequest
151 if encaps_rx == 'Dot1q':
154 if not ether.haslayer(Dot1Q):
155 raise RuntimeError('Not VLAN tagged Eth frame received:\n{0}'.
156 format(ether.__repr__()))
157 elif ether[Dot1Q].vlan != int(vlan1_rx):
158 raise RuntimeError('Ethernet frame with wrong VLAN tag ({}) '
159 'received ({} expected):\n{}'.
160 format(ether[Dot1Q].vlan, vlan1_rx,
162 elif encaps_rx == 'Dot1ad':
168 raise RuntimeError('Encapsulation {0} not implemented yet.'.
171 raise RuntimeError('Unsupported encapsulation expected: {0}'.
174 if not ether.haslayer(ip_format):
175 raise RuntimeError('Not an IP/IPv6 packet received:\n{0}'.
176 format(ether.__repr__()))
178 if not ether.haslayer(icmp_format):
179 raise RuntimeError('Not an ICMP/ICMPv6EchoRequest packet received:\n'
180 '{0}'.format(ether.__repr__()))
185 if __name__ == "__main__":