2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends an empty IPv4 UDP datagram encapsulated in IPv6
16 and checks if is correctly re-encapsulated."""
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet6 import IPv6
22 from scapy.layers.inet import IP, UDP
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
28 def _is_ipv4_in_ipv6(pkt):
29 """If IPv6 next header type in the given pkt is IPv4, return True,
30 else return False. False is returned also if exception occurs."""
31 ipv6_type = int('0x86dd', 16) # IPv6
33 if pkt.type == ipv6_type:
34 if pkt.payload.nh == 4:
36 except: # pylint: disable=bare-except
41 def main(): # pylint: disable=too-many-statements, too-many-locals
42 """Main function of the script file."""
43 args = TrafficScriptArg(['tx_dst_mac',
44 'tx_dst_ipv6', 'tx_src_ipv6',
45 'tx_dst_ipv4', 'tx_src_ipv4',
46 'tx_dst_udp_port', 'tx_src_udp_port',
47 'rx_dst_mac', 'rx_src_mac',
48 'rx_dst_ipv6', 'rx_src_ipv6'])
49 rx_if = args.get_arg('rx_if')
50 tx_if = args.get_arg('tx_if')
51 tx_dst_mac = args.get_arg('tx_dst_mac')
52 tx_src_mac = '02:00:00:00:00:01'
54 tx_dst_ipv6 = args.get_arg('tx_dst_ipv6')
55 tx_src_ipv6 = args.get_arg('tx_src_ipv6')
56 tx_dst_ipv4 = args.get_arg('tx_dst_ipv4')
57 tx_src_ipv4 = args.get_arg('tx_src_ipv4')
58 tx_dst_udp_port = int(args.get_arg('tx_dst_udp_port'))
59 tx_src_udp_port = int(args.get_arg('tx_src_udp_port'))
60 rx_dst_mac = args.get_arg('rx_dst_mac')
61 rx_src_mac = args.get_arg('rx_src_mac')
62 rx_dst_ipv6 = args.get_arg('rx_dst_ipv6')
63 rx_src_ipv6 = args.get_arg('rx_src_ipv6')
69 # Create empty UDP datagram in IPv4 and IPv6
70 tx_pkt = Ether(dst=tx_dst_mac, src=tx_src_mac)
71 tx_pkt /= IPv6(src=tx_src_ipv6, dst=tx_dst_ipv6)
72 tx_pkt /= IP(src=tx_src_ipv4, dst=tx_dst_ipv4)
73 tx_pkt /= UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port)
76 sent_packets.append(tx_pkt)
79 pkt = rxq.recv(2, ignore=sent_packets)
80 if _is_ipv4_in_ipv6(pkt):
84 raise RuntimeError("IPv4 in IPv6 Rx error.")
87 if ether.dst != rx_dst_mac:
88 raise RuntimeError("Destination MAC error {} != {}.".
89 format(ether.dst, rx_dst_mac))
90 print "Destination MAC: OK."
92 if ether.src != rx_src_mac:
93 raise RuntimeError("Source MAC error {} != {}.".
94 format(ether.src, rx_src_mac))
95 print "Source MAC: OK."
100 if ipv6.dst != rx_dst_ipv6:
101 raise RuntimeError("Destination IPv6 error {} != {}.".
102 format(ipv6.dst, rx_dst_ipv6))
103 print "Destination IPv6: OK."
105 if ipv6.src != rx_src_ipv6:
106 raise RuntimeError("Source IPv6 error {} != {}.".
107 format(ipv6.src, rx_src_ipv6))
108 print "Source IPv6: OK."
113 if ipv4.dst != tx_dst_ipv4:
114 raise RuntimeError("Destination IPv4 error {} != {}.".
115 format(ipv4.dst, tx_dst_ipv4))
116 print "Destination IPv4: OK."
118 if ipv4.src != tx_src_ipv4:
119 raise RuntimeError("Source IPv4 error {} != {}.".
120 format(ipv4.src, tx_src_ipv4))
121 print "Source IPv4: OK."
123 if ipv4.proto != 17: # UDP
124 raise RuntimeError("IPv4 protocol error {} != UDP.".
126 print "IPv4 protocol: OK."
131 if udp.dport != tx_dst_udp_port:
132 raise RuntimeError("UDP dport error {} != {}.".
133 format(udp.dport, tx_dst_udp_port))
134 print "UDP dport: OK."
136 if udp.sport != tx_src_udp_port:
137 raise RuntimeError("UDP sport error {} != {}.".
138 format(udp.sport, tx_src_udp_port))
139 print "UDP sport: OK."
143 if __name__ == "__main__":