df6bc42da5d5fe3e30d191fc45da593e213d01bc
[csit.git] / resources / traffic_scripts / send_lw_4o6_check_hairpinning_udp.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an empty IPv4 UDP datagram encapsulated in IPv6
16 and checks if is correctly re-encapsulated."""
17
18 import sys
19
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet6 import IPv6
22 from scapy.layers.inet import IP, UDP
23
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
26
27
28 def _is_ipv4_in_ipv6(pkt):
29     """If IPv6 next header type in the given pkt is IPv4, return True,
30     else return False. False is returned also if exception occurs."""
31     ipv6_type = int('0x86dd', 16)  # IPv6
32     try:
33         if pkt.type == ipv6_type:
34             if pkt.payload.nh == 4:
35                 return True
36     except:  # pylint: disable=bare-except
37         return False
38     return False
39
40
41 def main():  # pylint: disable=too-many-statements, too-many-locals
42     """Main function of the script file."""
43     args = TrafficScriptArg(['tx_dst_mac',
44                              'tx_dst_ipv6', 'tx_src_ipv6',
45                              'tx_dst_ipv4', 'tx_src_ipv4',
46                              'tx_dst_udp_port', 'tx_src_udp_port',
47                              'rx_dst_mac', 'rx_src_mac',
48                              'rx_dst_ipv6', 'rx_src_ipv6'])
49     rx_if = args.get_arg('rx_if')
50     tx_if = args.get_arg('tx_if')
51     tx_dst_mac = args.get_arg('tx_dst_mac')
52     tx_src_mac = '02:00:00:00:00:01'
53
54     tx_dst_ipv6 = args.get_arg('tx_dst_ipv6')
55     tx_src_ipv6 = args.get_arg('tx_src_ipv6')
56     tx_dst_ipv4 = args.get_arg('tx_dst_ipv4')
57     tx_src_ipv4 = args.get_arg('tx_src_ipv4')
58     tx_dst_udp_port = int(args.get_arg('tx_dst_udp_port'))
59     tx_src_udp_port = int(args.get_arg('tx_src_udp_port'))
60     rx_dst_mac = args.get_arg('rx_dst_mac')
61     rx_src_mac = args.get_arg('rx_src_mac')
62     rx_dst_ipv6 = args.get_arg('rx_dst_ipv6')
63     rx_src_ipv6 = args.get_arg('rx_src_ipv6')
64
65     rxq = RxQueue(rx_if)
66     txq = TxQueue(tx_if)
67     sent_packets = []
68
69     # Create empty UDP datagram in IPv4 and IPv6
70     tx_pkt = Ether(dst=tx_dst_mac, src=tx_src_mac)
71     tx_pkt /= IPv6(src=tx_src_ipv6, dst=tx_dst_ipv6)
72     tx_pkt /= IP(src=tx_src_ipv4, dst=tx_dst_ipv4)
73     tx_pkt /= UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port)
74
75     txq.send(tx_pkt)
76     sent_packets.append(tx_pkt)
77
78     for _ in range(5):
79         pkt = rxq.recv(2, ignore=sent_packets)
80         if _is_ipv4_in_ipv6(pkt):
81             ether = pkt
82             break
83     else:
84         raise RuntimeError("IPv4 in IPv6 Rx error.")
85
86     # check ethernet
87     if ether.dst != rx_dst_mac:
88         raise RuntimeError("Destination MAC error {} != {}.".
89                            format(ether.dst, rx_dst_mac))
90     print "Destination MAC: OK."
91
92     if ether.src != rx_src_mac:
93         raise RuntimeError("Source MAC error {} != {}.".
94                            format(ether.src, rx_src_mac))
95     print "Source MAC: OK."
96
97     ipv6 = ether.payload
98
99     # check ipv6
100     if ipv6.dst != rx_dst_ipv6:
101         raise RuntimeError("Destination IPv6 error {} != {}.".
102                            format(ipv6.dst, rx_dst_ipv6))
103     print "Destination IPv6: OK."
104
105     if ipv6.src != rx_src_ipv6:
106         raise RuntimeError("Source IPv6 error {} != {}.".
107                            format(ipv6.src, rx_src_ipv6))
108     print "Source IPv6: OK."
109
110     ipv4 = ipv6.payload
111
112     # check ipv4
113     if ipv4.dst != tx_dst_ipv4:
114         raise RuntimeError("Destination IPv4 error {} != {}.".
115                            format(ipv4.dst, tx_dst_ipv4))
116     print "Destination IPv4: OK."
117
118     if ipv4.src != tx_src_ipv4:
119         raise RuntimeError("Source IPv4 error {} != {}.".
120                            format(ipv4.src, tx_src_ipv4))
121     print "Source IPv4: OK."
122
123     if ipv4.proto != 17:  # UDP
124         raise RuntimeError("IPv4 protocol error {} != UDP.".
125                            format(ipv4.proto))
126     print "IPv4 protocol: OK."
127
128     udp = ipv4.payload
129
130     # check udp
131     if udp.dport != tx_dst_udp_port:
132         raise RuntimeError("UDP dport error {} != {}.".
133                            format(udp.dport, tx_dst_udp_port))
134     print "UDP dport: OK."
135
136     if udp.sport != tx_src_udp_port:
137         raise RuntimeError("UDP sport error {} != {}.".
138                            format(udp.sport, tx_src_udp_port))
139     print "UDP sport: OK."
140
141     sys.exit(0)
142
143 if __name__ == "__main__":
144     main()