2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends an empty IPv4 UDP datagram encapsulated in IPv6
and checks if it is correctly decapsulated."""
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet6 import IPv6
22 from scapy.layers.inet import IP, UDP
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
28 def _is_udp_in_ipv4(pkt):
29 """If UDP is in IPv4 packet return True,
30 else return False. False is returned also if exception occurs."""
31 ipv4_type = int('0x0800', 16) # IPv4
33 if pkt.type == ipv4_type:
34 if pkt.payload.proto == 17: # UDP
36 except: # pylint: disable=bare-except
def main():  # pylint: disable=too-many-statements, too-many-locals
    """Send one UDP-in-IPv4-in-IPv6 frame and verify the frame received back.

    An Ether / IPv6 / IPv4 / UDP packet with no UDP payload is built from
    the script arguments and transmitted. The received frame must be UDP
    in IPv4; its MAC addresses are compared with the ``rx_*_mac``
    arguments, and its IPv4 addresses and UDP ports with the transmitted
    values.

    :raises RuntimeError: If no UDP-in-IPv4 frame is received or any of the
        checked header fields differs from the expected value.
    """
    args = TrafficScriptArg(['tx_dst_mac', 'tx_src_mac',
                             'tx_dst_ipv6', 'tx_src_ipv6',
                             'tx_dst_ipv4', 'tx_src_ipv4', 'tx_src_udp_port',
                             'rx_dst_mac', 'rx_src_mac'])
    # 'rx_if' and 'tx_if' are read although they are not listed above;
    # presumably TrafficScriptArg always provides them - TODO confirm.
    rx_if = args.get_arg('rx_if')
    tx_if = args.get_arg('tx_if')
    tx_src_mac = args.get_arg('tx_src_mac')
    tx_dst_mac = args.get_arg('tx_dst_mac')
    tx_dst_ipv6 = args.get_arg('tx_dst_ipv6')
    tx_src_ipv6 = args.get_arg('tx_src_ipv6')
    tx_dst_ipv4 = args.get_arg('tx_dst_ipv4')
    tx_src_ipv4 = args.get_arg('tx_src_ipv4')
    tx_src_udp_port = int(args.get_arg('tx_src_udp_port'))
    tx_dst_udp_port = 20000
    rx_dst_mac = args.get_arg('rx_dst_mac')
    rx_src_mac = args.get_arg('rx_src_mac')

    udp_proto = 17  # IP protocol number identifying UDP

    # NOTE(review): the queue construction was not present in the reviewed
    # text and is reconstructed from the imported RxQueue/TxQueue names and
    # the otherwise unused rx_if/tx_if values - confirm the constructor
    # signatures against PacketVerifier.
    rxq = RxQueue(rx_if)
    txq = TxQueue(tx_if)
    sent_packets = []

    # Create empty UDP datagram in IPv4 and IPv6
    tx_pkt = (Ether(dst=tx_dst_mac, src=tx_src_mac) /
              IPv6(src=tx_src_ipv6, dst=tx_dst_ipv6) /
              IP(src=tx_src_ipv4, dst=tx_dst_ipv4) /
              UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port))

    # NOTE(review): the send call is reconstructed - confirm TxQueue.send().
    txq.send(tx_pkt)
    sent_packets.append(tx_pkt)

    # NOTE(review): the receive loop is reconstructed; the attempt count and
    # the per-attempt timeout passed to recv() are assumptions - confirm.
    rx_attempts = 5
    rx_timeout = 2
    for _ in range(rx_attempts):
        pkt = rxq.recv(rx_timeout)
        if _is_udp_in_ipv4(pkt):
            ether = pkt
            break
    else:
        # No attempt produced a UDP-in-IPv4 frame.
        raise RuntimeError("UDP in IPv4 Rx error.")

    # Check Ethernet header against the expected Rx MAC addresses.
    if ether.dst != rx_dst_mac:
        raise RuntimeError("Destination MAC error {} != {}.".
                           format(ether.dst, rx_dst_mac))
    print("Destination MAC: OK.")

    if ether.src != rx_src_mac:
        raise RuntimeError("Source MAC error {} != {}.".
                           format(ether.src, rx_src_mac))
    print("Source MAC: OK.")

    # Check IPv4 header: the inner addresses must be the transmitted ones.
    # The layer directly above Ethernet is IPv4 (_is_udp_in_ipv4 read
    # ``proto`` from this same payload).
    ipv4 = ether.payload

    if ipv4.dst != tx_dst_ipv4:
        raise RuntimeError("Destination IPv4 error {} != {}.".
                           format(ipv4.dst, tx_dst_ipv4))
    print("Destination IPv4: OK.")

    if ipv4.src != tx_src_ipv4:
        raise RuntimeError("Source IPv4 error {} != {}.".
                           format(ipv4.src, tx_src_ipv4))
    print("Source IPv4: OK.")

    if ipv4.proto != udp_proto:
        raise RuntimeError("IPv4 protocol error {} != UDP.".
                           format(ipv4.proto))
    print("IPv4 protocol: OK.")

    # Check UDP header: the ports must be the transmitted ones.
    udp = ipv4.payload

    if udp.dport != tx_dst_udp_port:
        raise RuntimeError("UDP dport error {} != {}.".
                           format(udp.dport, tx_dst_udp_port))
    print("UDP dport: OK.")

    if udp.sport != tx_src_udp_port:
        raise RuntimeError("UDP sport error {} != {}.".
                           format(udp.sport, tx_src_udp_port))
    print("UDP sport: OK.")
125 if __name__ == "__main__":