Pylint fixes
[csit.git] / resources / traffic_scripts / send_ipv4_udp_check_map_t.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends a UDP datagram and checks if IPv4 addresses
16 are correctly translate to IPv6 addresses."""
17
18 import sys
19
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet import IP, UDP
22 from ipaddress import ip_address
23
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
26
27
28 def _check_udp_checksum(pkt):
29     """Check UDP checksum in IP packet. Return True if checksum is correct
30     else False."""
31     new = pkt.__class__(str(pkt))
32     del new['UDP'].chksum
33     new = new.__class__(str(new))
34     return new['UDP'].chksum == pkt['UDP'].chksum
35
36
37 def _is_udp_in_ipv6(pkt):
38     """If IPv6 next header type in the given pkt is UDP, return True,
39     else return False. False is returned also if exception occurs."""
40     ipv6_type = int('0x86dd', 16)  # IPv6
41     try:
42         if pkt.type == ipv6_type:
43             if pkt.payload.nh == 17:  # UDP
44                 return True
45     except AttributeError:
46         return False
47     return False
48
49
50 def main():  # pylint: disable=too-many-statements, too-many-locals
51     """Main function of the script file."""
52     args = TrafficScriptArg(['tx_dst_mac', 'tx_src_ipv4', 'tx_dst_ipv4',
53                              'tx_dst_udp_port', 'rx_dst_mac', 'rx_src_mac',
54                              'rx_src_ipv6', 'rx_dst_ipv6'])
55     rx_if = args.get_arg('rx_if')
56     tx_if = args.get_arg('tx_if')
57     tx_dst_mac = args.get_arg('tx_dst_mac')
58     tx_src_ipv4 = args.get_arg('tx_src_ipv4')
59     tx_dst_ipv4 = args.get_arg('tx_dst_ipv4')
60     tx_dst_udp_port = int(args.get_arg('tx_dst_udp_port'))
61     tx_src_udp_port = 20000
62     rx_dst_mac = args.get_arg('rx_dst_mac')
63     rx_src_mac = args.get_arg('rx_src_mac')
64     rx_src_ipv6 = args.get_arg('rx_src_ipv6')
65     rx_dst_ipv6 = args.get_arg('rx_dst_ipv6')
66
67     rxq = RxQueue(rx_if)
68     txq = TxQueue(tx_if)
69     sent_packets = []
70
71     # Create empty UDP datagram
72     udp = (Ether(dst=tx_dst_mac) /
73            IP(src=tx_src_ipv4, dst=tx_dst_ipv4) /
74            UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port) /
75            'udp_payload')
76
77     txq.send(udp)
78     sent_packets.append(udp)
79
80     for _ in range(5):
81         pkt = rxq.recv(2)
82         if _is_udp_in_ipv6(pkt):
83             ether = pkt
84             break
85     else:
86         raise RuntimeError("UDP in IPv6 Rx error.")
87
88     # check ethernet
89     if ether.dst != rx_dst_mac:
90         raise RuntimeError("Destination MAC error {} != {}.".
91                            format(ether.dst, rx_dst_mac))
92     print "Destination MAC: OK."
93
94     if ether.src != rx_src_mac:
95         raise RuntimeError("Source MAC error {} != {}.".
96                            format(ether.src, rx_src_mac))
97     print "Source MAC: OK."
98
99     ipv6 = ether.payload
100
101     # check ipv6
102     if ip_address(unicode(ipv6.dst)) != ip_address(unicode(rx_dst_ipv6)):
103         raise RuntimeError("Destination IP error {} != {}.".
104                            format(ipv6.dst, rx_dst_ipv6))
105     print "Destination IPv6: OK."
106
107     if ip_address(unicode(ipv6.src)) != ip_address(unicode(rx_src_ipv6)):
108         raise RuntimeError("Source IP error {} != {}.".
109                            format(ipv6.src, rx_src_ipv6))
110     print "Source IPv6: OK."
111
112     udp = ipv6.payload
113
114     # check udp
115     if udp.dport != tx_dst_udp_port:
116         raise RuntimeError("UDP dport error {} != {}.".
117                            format(udp.dport, tx_dst_udp_port))
118     print "UDP dport: OK."
119
120     if udp.sport != tx_src_udp_port:
121         raise RuntimeError("UDP sport error {} != {}.".
122                            format(udp.sport, tx_src_udp_port))
123     print "UDP sport: OK."
124
125     if not _check_udp_checksum(ipv6):
126         raise RuntimeError("UDP checksum error.")
127     print "UDP checksum OK."
128
129     sys.exit(0)
130
131 if __name__ == "__main__":
132     main()