2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends an empty UDP datagram and checks if IPv6 addresses
are correctly translated to IPv4 addresses."""
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet6 import IPv6, UDP
22 from ipaddress import ip_address
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
28 def _check_udp_checksum(pkt):
29 """Check UDP checksum in IP packet. Return True if checksum is correct
31 new = pkt.__class__(str(pkt))
33 new = new.__class__(str(new))
34 return new['UDP'].chksum == pkt['UDP'].chksum
37 def _is_udp_in_ipv4(pkt):
38 """If IPv4 protocol type in the given pkt is UDP, return True,
39 else return False. False is returned also if exception occurs."""
40 ipv4_type = int('0x0800', 16) # IPv4
42 if pkt.type == ipv4_type:
43 if pkt.payload.proto == 17: # UDP
45 except AttributeError:
50 def main(): # pylint: disable=too-many-statements, too-many-locals
51 """Main function of the script file."""
52 args = TrafficScriptArg(['tx_dst_mac', 'tx_src_mac',
53 'tx_src_ipv6', 'tx_dst_ipv6',
54 'tx_src_udp_port', 'rx_dst_mac', 'rx_src_mac',
55 'rx_src_ipv4', 'rx_dst_ipv4'])
56 rx_if = args.get_arg('rx_if')
57 tx_if = args.get_arg('tx_if')
58 tx_dst_mac = args.get_arg('tx_dst_mac')
59 tx_src_mac = args.get_arg('tx_src_mac')
60 tx_src_ipv6 = args.get_arg('tx_src_ipv6')
61 tx_dst_ipv6 = args.get_arg('tx_dst_ipv6')
62 tx_src_udp_port = int(args.get_arg('tx_src_udp_port'))
63 tx_dst_udp_port = 20000
64 rx_dst_mac = args.get_arg('rx_dst_mac')
65 rx_src_mac = args.get_arg('rx_src_mac')
66 rx_src_ipv4 = args.get_arg('rx_src_ipv4')
67 rx_dst_ipv4 = args.get_arg('rx_dst_ipv4')
73 # Create empty UDP datagram in IPv6
75 udp = (Ether(dst=tx_dst_mac, src=tx_src_mac) /
76 IPv6(src=tx_src_ipv6, dst=tx_dst_ipv6) /
77 UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port) /
81 sent_packets.append(udp)
85 if _is_udp_in_ipv4(pkt):
89 raise RuntimeError("UDP in IPv4 Rx error.")
92 if ether.dst != rx_dst_mac:
93 raise RuntimeError("Destination MAC error {} != {}.".
94 format(ether.dst, rx_dst_mac))
95 print "Destination MAC: OK."
97 if ether.src != rx_src_mac:
98 raise RuntimeError("Source MAC error {} != {}.".
99 format(ether.src, rx_src_mac))
100 print "Source MAC: OK."
105 if ip_address(unicode(ipv4.dst)) != ip_address(unicode(rx_dst_ipv4)):
106 raise RuntimeError("Destination IPv4 error {} != {}.".
107 format(ipv4.dst, rx_dst_ipv4))
108 print "Destination IPv4: OK."
110 if ip_address(unicode(ipv4.src)) != ip_address(unicode(rx_src_ipv4)):
111 raise RuntimeError("Source IPv4 error {} != {}.".
112 format(ipv4.src, rx_src_ipv4))
113 print "Source IPv4: OK."
118 if udp.dport != tx_dst_udp_port:
119 raise RuntimeError("UDP dport error {} != {}.".
120 format(udp.dport, tx_dst_udp_port))
121 print "UDP dport: OK."
123 if udp.sport != tx_src_udp_port:
124 raise RuntimeError("UDP sport error {} != {}.".
125 format(udp.sport, tx_src_udp_port))
126 print "UDP sport: OK."
128 if not _check_udp_checksum(ipv4):
129 raise RuntimeError("UDP checksum error.")
130 print "UDP checksum OK."
134 if __name__ == "__main__":