Use only Qemu Kill on the teardown of double qemu setup
[csit.git] / resources / traffic_scripts / send_ipv6_udp_check_map_t.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an empty UDP datagram and checks if IPv6 addresses
16 are correctly translate to IPv4 addresses."""
17
18 import sys
19
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet6 import IPv6, UDP
22 from ipaddress import ip_address
23
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
26
27
28 def _check_udp_checksum(pkt):
29     """Check UDP checksum in IP packet. Return True if checksum is correct
30     else False."""
31     new = pkt.__class__(str(pkt))
32     del new['UDP'].chksum
33     new = new.__class__(str(new))
34     return new['UDP'].chksum == pkt['UDP'].chksum
35
36
37 def _is_udp_in_ipv4(pkt):
38     """If IPv4 protocol type in the given pkt is UDP, return True,
39     else return False. False is returned also if exception occurs."""
40     ipv4_type = int('0x0800', 16)  # IPv4
41     try:
42         if pkt.type == ipv4_type:
43             if pkt.payload.proto == 17:  # UDP
44                 return True
45     except AttributeError:
46         return False
47     return False
48
49
50 def main():  # pylint: disable=too-many-statements, too-many-locals
51     """Main function of the script file."""
52     args = TrafficScriptArg(['tx_dst_mac', 'tx_src_mac',
53                              'tx_src_ipv6', 'tx_dst_ipv6',
54                              'tx_src_udp_port', 'rx_dst_mac', 'rx_src_mac',
55                              'rx_src_ipv4', 'rx_dst_ipv4'])
56     rx_if = args.get_arg('rx_if')
57     tx_if = args.get_arg('tx_if')
58     tx_dst_mac = args.get_arg('tx_dst_mac')
59     tx_src_mac = args.get_arg('tx_src_mac')
60     tx_src_ipv6 = args.get_arg('tx_src_ipv6')
61     tx_dst_ipv6 = args.get_arg('tx_dst_ipv6')
62     tx_src_udp_port = int(args.get_arg('tx_src_udp_port'))
63     tx_dst_udp_port = 20000
64     rx_dst_mac = args.get_arg('rx_dst_mac')
65     rx_src_mac = args.get_arg('rx_src_mac')
66     rx_src_ipv4 = args.get_arg('rx_src_ipv4')
67     rx_dst_ipv4 = args.get_arg('rx_dst_ipv4')
68
69     rxq = RxQueue(rx_if)
70     txq = TxQueue(tx_if)
71     sent_packets = []
72
73     # Create empty UDP datagram in IPv6
74
75     udp = (Ether(dst=tx_dst_mac, src=tx_src_mac) /
76            IPv6(src=tx_src_ipv6, dst=tx_dst_ipv6) /
77            UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port) /
78            'udp_payload')
79
80     txq.send(udp)
81     sent_packets.append(udp)
82
83     for _ in range(5):
84         pkt = rxq.recv(2)
85         if _is_udp_in_ipv4(pkt):
86             ether = pkt
87             break
88     else:
89         raise RuntimeError("UDP in IPv4 Rx error.")
90
91     # check ethernet
92     if ether.dst != rx_dst_mac:
93         raise RuntimeError("Destination MAC error {} != {}.".
94                            format(ether.dst, rx_dst_mac))
95     print "Destination MAC: OK."
96
97     if ether.src != rx_src_mac:
98         raise RuntimeError("Source MAC error {} != {}.".
99                            format(ether.src, rx_src_mac))
100     print "Source MAC: OK."
101
102     ipv4 = ether.payload
103
104     # check ipv4
105     if ip_address(unicode(ipv4.dst)) != ip_address(unicode(rx_dst_ipv4)):
106         raise RuntimeError("Destination IPv4 error {} != {}.".
107                            format(ipv4.dst, rx_dst_ipv4))
108     print "Destination IPv4: OK."
109
110     if ip_address(unicode(ipv4.src)) != ip_address(unicode(rx_src_ipv4)):
111         raise RuntimeError("Source IPv4 error {} != {}.".
112                            format(ipv4.src, rx_src_ipv4))
113     print "Source IPv4: OK."
114
115     udp = ipv4.payload
116
117     # check udp
118     if udp.dport != tx_dst_udp_port:
119         raise RuntimeError("UDP dport error {} != {}.".
120                            format(udp.dport, tx_dst_udp_port))
121     print "UDP dport: OK."
122
123     if udp.sport != tx_src_udp_port:
124         raise RuntimeError("UDP sport error {} != {}.".
125                            format(udp.sport, tx_src_udp_port))
126     print "UDP sport: OK."
127
128     if not _check_udp_checksum(ipv4):
129         raise RuntimeError("UDP checksum error.")
130     print "UDP checksum OK."
131
132     sys.exit(0)
133
134 if __name__ == "__main__":
135     main()