2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends an empty UDP datagram and checks if IPv6 addresses
are correctly translated to IPv4 addresses."""
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet6 import IPv6, UDP
22 from ipaddress import ip_address
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
28 def _check_udp_checksum(pkt):
29 """Check UDP checksum in IP packet. Return True if checksum is correct
31 new = pkt.__class__(str(pkt))
33 new = new.__class__(str(new))
34 return new['UDP'].chksum == pkt['UDP'].chksum
37 def _is_udp_in_ipv4(pkt):
38 """If IPv4 protocol type in the given pkt is UDP, return True,
39 else return False. False is returned also if exception occurs."""
40 ipv4_type = int('0x0800', 16) # IPv4
42 if pkt.type == ipv4_type:
43 if pkt.payload.proto == 17: # UDP
45 except AttributeError:
def _expect_equal(label, received, expected, normalize=None):
    """Raise RuntimeError unless received matches expected, else print OK.

    :param label: Name of the checked field, used in both messages.
    :param received: Value taken from the received packet.
    :param expected: Value the test expects.
    :param normalize: Optional callable applied to both values before they
        are compared; the messages always show the values as passed in.
    :raises RuntimeError: If the (normalized) values differ.
    """
    lhs, rhs = received, expected
    if normalize is not None:
        lhs, rhs = normalize(received), normalize(expected)
    if lhs != rhs:
        raise RuntimeError("{} error {} != {}.".format(label, received,
                                                       expected))
    # Single-argument call form prints identically on Python 2 and 3.
    print("{}: OK.".format(label))


def main():  # pylint: disable=too-many-statements, too-many-locals
    """Main function of the script file.

    Sends one UDP datagram without payload inside IPv6 on the Tx interface,
    waits for a UDP-in-IPv4 frame on the Rx interface and verifies its MAC
    addresses, IPv4 addresses, UDP ports and UDP checksum.

    :raises RuntimeError: If no UDP-in-IPv4 frame is received or if any of the
        checked fields differs from the expected value.
    """
    args = TrafficScriptArg(['tx_dst_mac', 'tx_src_ipv6', 'tx_dst_ipv6',
                             'tx_src_udp_port', 'rx_dst_mac', 'rx_src_mac',
                             'rx_src_ipv4', 'rx_dst_ipv4'])
    rx_if = args.get_arg('rx_if')
    tx_if = args.get_arg('tx_if')
    tx_dst_mac = args.get_arg('tx_dst_mac')
    tx_src_ipv6 = args.get_arg('tx_src_ipv6')
    tx_dst_ipv6 = args.get_arg('tx_dst_ipv6')
    tx_src_udp_port = int(args.get_arg('tx_src_udp_port'))
    tx_dst_udp_port = 20000
    rx_dst_mac = args.get_arg('rx_dst_mac')
    rx_src_mac = args.get_arg('rx_src_mac')
    rx_src_ipv4 = args.get_arg('rx_src_ipv4')
    rx_dst_ipv4 = args.get_arg('rx_dst_ipv4')

    # NOTE(review): the queue construction and the send()/recv() calls are
    # reconstructed from the imported RxQueue/TxQueue names and the rx_if /
    # tx_if arguments - confirm against resources.libraries.python.
    # PacketVerifier before merging.
    rxq = RxQueue(rx_if)
    txq = TxQueue(tx_if)
    sent_packets = []

    # Create empty UDP datagram in IPv6 (no payload layer is appended).
    tx_pkt = (Ether(dst=tx_dst_mac) /
              IPv6(src=tx_src_ipv6, dst=tx_dst_ipv6) /
              UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port))

    txq.send(tx_pkt)
    sent_packets.append(tx_pkt)

    # Unrelated frames may arrive first, so poll a few times.
    # NOTE(review): attempt count and timeout are assumed values - confirm.
    for _ in range(5):
        pkt = rxq.recv(2)
        if _is_udp_in_ipv4(pkt):
            ether = pkt
            break
    else:
        raise RuntimeError("UDP in IPv4 Rx error.")

    # Check Ethernet header.
    _expect_equal("Destination MAC", ether.dst, rx_dst_mac)
    _expect_equal("Source MAC", ether.src, rx_src_mac)

    # Check IPv4 header; addresses are compared as address objects so that
    # differently written but equal addresses still match.
    ipv4 = ether.payload

    def as_ip(addr):
        """Convert a textual address into an ipaddress object."""
        return ip_address(unicode(addr))

    _expect_equal("Destination IPv4", ipv4.dst, rx_dst_ipv4, as_ip)
    _expect_equal("Source IPv4", ipv4.src, rx_src_ipv4, as_ip)

    # Check UDP header; ports must pass through the translation unchanged.
    rx_udp = ipv4.payload
    _expect_equal("UDP dport", rx_udp.dport, tx_dst_udp_port)
    _expect_equal("UDP sport", rx_udp.sport, tx_src_udp_port)

    if not _check_udp_checksum(ipv4):
        raise RuntimeError("UDP checksum error.")
    print("UDP checksum OK.")
132 if __name__ == "__main__":