2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 """Traffic script that sends an empty IPv4 UDP datagram encapsulated in IPv6
16 and checks if is correctly re-encapsulated."""
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet6 import IPv6
22 from scapy.layers.inet import IP, UDP
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
28 def _is_ipv4_in_ipv6(pkt):
29 """If IPv6 next header type in the given pkt is IPv4, return True,
30 else return False. False is returned also if exception occurs."""
31 ipv6_type = int('0x86dd', 16) # IPv6
33 if pkt.type == ipv6_type:
34 if pkt.payload.nh == 4:
36 except: # pylint: disable=bare-except
41 def main(): # pylint: disable=too-many-statements, too-many-locals
42 """Main function of the script file."""
43 args = TrafficScriptArg(['tx_dst_mac',
44 'tx_dst_ipv6', 'tx_src_ipv6',
45 'tx_dst_ipv4', 'tx_src_ipv4',
46 'tx_dst_udp_port', 'tx_src_udp_port',
47 'rx_dst_mac', 'rx_src_mac',
48 'rx_dst_ipv6', 'rx_src_ipv6'])
49 rx_if = args.get_arg('rx_if')
50 tx_if = args.get_arg('tx_if')
51 tx_dst_mac = args.get_arg('tx_dst_mac')
52 tx_src_mac = '02:00:00:00:00:01'
54 tx_dst_ipv6 = args.get_arg('tx_dst_ipv6')
55 tx_src_ipv6 = args.get_arg('tx_src_ipv6')
56 tx_dst_ipv4 = args.get_arg('tx_dst_ipv4')
57 tx_src_ipv4 = args.get_arg('tx_src_ipv4')
58 tx_dst_udp_port = int(args.get_arg('tx_dst_udp_port'))
59 tx_src_udp_port = int(args.get_arg('tx_src_udp_port'))
60 rx_dst_mac = args.get_arg('rx_dst_mac')
61 rx_src_mac = args.get_arg('rx_src_mac')
62 rx_dst_ipv6 = args.get_arg('rx_dst_ipv6')
63 rx_src_ipv6 = args.get_arg('rx_src_ipv6')
69 # Create empty UDP datagram in IPv4 and IPv6
70 tx_pkt = Ether(dst=tx_dst_mac, src=tx_src_mac)
71 tx_pkt /= IPv6(src=tx_src_ipv6, dst=tx_dst_ipv6)
72 tx_pkt /= IP(src=tx_src_ipv4, dst=tx_dst_ipv4)
73 tx_pkt /= UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port)
74 tx_pkt /= 'udp_payload'
77 sent_packets.append(tx_pkt)
80 pkt = rxq.recv(2, ignore=sent_packets)
81 if _is_ipv4_in_ipv6(pkt):
85 raise RuntimeError("IPv4 in IPv6 Rx error.")
88 if ether.dst != rx_dst_mac:
89 raise RuntimeError("Destination MAC error {} != {}.".
90 format(ether.dst, rx_dst_mac))
91 print "Destination MAC: OK."
93 if ether.src != rx_src_mac:
94 raise RuntimeError("Source MAC error {} != {}.".
95 format(ether.src, rx_src_mac))
96 print "Source MAC: OK."
101 if ipv6.dst != rx_dst_ipv6:
102 raise RuntimeError("Destination IPv6 error {} != {}.".
103 format(ipv6.dst, rx_dst_ipv6))
104 print "Destination IPv6: OK."
106 if ipv6.src != rx_src_ipv6:
107 raise RuntimeError("Source IPv6 error {} != {}.".
108 format(ipv6.src, rx_src_ipv6))
109 print "Source IPv6: OK."
114 if ipv4.dst != tx_dst_ipv4:
115 raise RuntimeError("Destination IPv4 error {} != {}.".
116 format(ipv4.dst, tx_dst_ipv4))
117 print "Destination IPv4: OK."
119 if ipv4.src != tx_src_ipv4:
120 raise RuntimeError("Source IPv4 error {} != {}.".
121 format(ipv4.src, tx_src_ipv4))
122 print "Source IPv4: OK."
124 if ipv4.proto != 17: # UDP
125 raise RuntimeError("IPv4 protocol error {} != UDP.".
127 print "IPv4 protocol: OK."
132 if udp.dport != tx_dst_udp_port:
133 raise RuntimeError("UDP dport error {} != {}.".
134 format(udp.dport, tx_dst_udp_port))
135 print "UDP dport: OK."
137 if udp.sport != tx_src_udp_port:
138 raise RuntimeError("UDP sport error {} != {}.".
139 format(udp.sport, tx_src_udp_port))
140 print "UDP sport: OK."
144 if __name__ == "__main__":