2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends an empty UDP datagram and checks if IPv4 is
16 correctly encapsulated into IPv6 packet."""
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet import IP, UDP
23 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
24 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
27 def _is_ipv4_in_ipv6(pkt):
28 """If IPv6 next header type in the given pkt is IPv4, return True,
29 else return False. False is returned also if exception occurs."""
30 ipv6_type = int('0x86dd', 16) # IPv6
32 if pkt.type == ipv6_type:
33 if pkt.payload.nh == 4:
35 except: # pylint: disable=bare-except
40 def main(): # pylint: disable=too-many-statements, too-many-locals
41 """Main function of the script file."""
42 args = TrafficScriptArg(['tx_dst_mac', 'tx_src_ipv4', 'tx_dst_ipv4',
43 'tx_dst_udp_port', 'rx_dst_mac', 'rx_src_mac',
44 'src_ipv6', 'dst_ipv6'])
45 rx_if = args.get_arg('rx_if')
46 tx_if = args.get_arg('tx_if')
47 tx_dst_mac = args.get_arg('tx_dst_mac')
48 tx_src_ipv4 = args.get_arg('tx_src_ipv4')
49 tx_dst_ipv4 = args.get_arg('tx_dst_ipv4')
50 tx_dst_udp_port = int(args.get_arg('tx_dst_udp_port'))
51 tx_src_udp_port = 20000
52 rx_dst_mac = args.get_arg('rx_dst_mac')
53 rx_src_mac = args.get_arg('rx_src_mac')
54 rx_src_ipv6 = args.get_arg('src_ipv6')
55 rx_dst_ipv6 = args.get_arg('dst_ipv6')
61 # Create empty UDP datagram
62 udp = (Ether(dst=tx_dst_mac) /
63 IP(src=tx_src_ipv4, dst=tx_dst_ipv4) /
64 UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port))
67 sent_packets.append(udp)
71 if _is_ipv4_in_ipv6(pkt):
75 raise RuntimeError("IPv4 in IPv6 Rx error.")
78 if ether.dst != rx_dst_mac:
79 raise RuntimeError("Destination MAC error {} != {}.".
80 format(ether.dst, rx_dst_mac))
81 print "Destination MAC: OK."
83 if ether.src != rx_src_mac:
84 raise RuntimeError("Source MAC error {} != {}.".
85 format(ether.src, rx_src_mac))
86 print "Source MAC: OK."
91 if ipv6.dst != rx_dst_ipv6:
92 raise RuntimeError("Destination IP error {} != {}.".
93 format(ipv6.dst, rx_dst_ipv6))
94 print "Destination IPv6: OK."
96 if ipv6.src != rx_src_ipv6:
97 raise RuntimeError("Source IP error {} != {}.".
98 format(ipv6.src, rx_src_ipv6))
99 print "Source IPv6: OK."
104 if ipv4.dst != tx_dst_ipv4:
105 raise RuntimeError("Destination IP error {} != {}.".
106 format(ipv4.dst, tx_dst_ipv4))
107 print "Destination IPv4: OK."
109 if ipv4.src != tx_src_ipv4:
110 raise RuntimeError("Source IP error {} != {}.".
111 format(ipv4.src, tx_src_ipv4))
112 print "Source IPv4: OK."
114 if ipv4.proto != 17: # UDP
115 raise RuntimeError("IP protocol error {} != UDP.".
117 print "IPv4 protocol: OK."
122 if udp.dport != tx_dst_udp_port:
123 raise RuntimeError("UDP dport error {} != {}.".
124 format(udp.dport, tx_dst_udp_port))
125 print "UDP dport: OK."
127 if udp.sport != tx_src_udp_port:
128 raise RuntimeError("UDP sport error {} != {}.".
129 format(udp.sport, tx_src_udp_port))
130 print "UDP sport: OK."
134 if __name__ == "__main__":