2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends an empty UDP datagram and checks if IPv4 is
16 correctly encapsulated into IPv6 packet."""
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet import IP, UDP
22 from ipaddress import ip_address
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
28 def _is_ipv4_in_ipv6(pkt):
29 """If IPv6 next header type in the given pkt is IPv4, return True,
30 else return False. False is returned also if exception occurs."""
31 ipv6_type = int('0x86dd', 16) # IPv6
33 if pkt.type == ipv6_type:
34 if pkt.payload.nh == 4:
36 except: # pylint: disable=bare-except
41 def main(): # pylint: disable=too-many-statements, too-many-locals
42 """Main function of the script file."""
43 args = TrafficScriptArg(['tx_dst_mac', 'tx_src_ipv4', 'tx_dst_ipv4',
44 'tx_dst_udp_port', 'rx_dst_mac', 'rx_src_mac',
45 'src_ipv6', 'dst_ipv6'])
46 rx_if = args.get_arg('rx_if')
47 tx_if = args.get_arg('tx_if')
48 tx_dst_mac = args.get_arg('tx_dst_mac')
49 tx_src_ipv4 = args.get_arg('tx_src_ipv4')
50 tx_dst_ipv4 = args.get_arg('tx_dst_ipv4')
51 tx_dst_udp_port = int(args.get_arg('tx_dst_udp_port'))
52 tx_src_udp_port = 20000
53 rx_dst_mac = args.get_arg('rx_dst_mac')
54 rx_src_mac = args.get_arg('rx_src_mac')
55 rx_src_ipv6 = args.get_arg('src_ipv6')
56 rx_dst_ipv6 = args.get_arg('dst_ipv6')
62 # Create empty UDP datagram
63 udp = (Ether(dst=tx_dst_mac) /
64 IP(src=tx_src_ipv4, dst=tx_dst_ipv4) /
65 UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port))
68 sent_packets.append(udp)
72 if _is_ipv4_in_ipv6(pkt):
76 raise RuntimeError("IPv4 in IPv6 Rx error.")
79 if ether.dst != rx_dst_mac:
80 raise RuntimeError("Destination MAC error {} != {}.".
81 format(ether.dst, rx_dst_mac))
82 print "Destination MAC: OK."
84 if ether.src != rx_src_mac:
85 raise RuntimeError("Source MAC error {} != {}.".
86 format(ether.src, rx_src_mac))
87 print "Source MAC: OK."
92 if ip_address(unicode(ipv6.dst)) != ip_address(unicode(rx_dst_ipv6)):
93 raise RuntimeError("Destination IP error {} != {}.".
94 format(ipv6.dst, rx_dst_ipv6))
95 print "Destination IPv6: OK."
97 if ip_address(unicode(ipv6.src)) != ip_address(unicode(rx_src_ipv6)):
98 raise RuntimeError("Source IP error {} != {}.".
99 format(ipv6.src, rx_src_ipv6))
100 print "Source IPv6: OK."
105 if ipv4.dst != tx_dst_ipv4:
106 raise RuntimeError("Destination IP error {} != {}.".
107 format(ipv4.dst, tx_dst_ipv4))
108 print "Destination IPv4: OK."
110 if ipv4.src != tx_src_ipv4:
111 raise RuntimeError("Source IP error {} != {}.".
112 format(ipv4.src, tx_src_ipv4))
113 print "Source IPv4: OK."
115 if ipv4.proto != 17: # UDP
116 raise RuntimeError("IP protocol error {} != UDP.".
118 print "IPv4 protocol: OK."
123 if udp.dport != tx_dst_udp_port:
124 raise RuntimeError("UDP dport error {} != {}.".
125 format(udp.dport, tx_dst_udp_port))
126 print "UDP dport: OK."
128 if udp.sport != tx_src_udp_port:
129 raise RuntimeError("UDP sport error {} != {}.".
130 format(udp.sport, tx_src_udp_port))
131 print "UDP sport: OK."
135 if __name__ == "__main__":