af117c19beeb253e0c5c9a7740174f3119aad469
[csit.git] / resources / traffic_scripts / send_ipv6_udp_check_map_t.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an empty UDP datagram and checks if IPv6 addresses
16 are correctly translate to IPv4 addresses."""
17
18 import sys
19
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet6 import IPv6, UDP
22 from ipaddress import ip_address
23
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
26
27
28 def _check_udp_checksum(pkt):
29     """Check UDP checksum in IP packet. Return True if checksum is correct
30     else False."""
31     new = pkt.__class__(str(pkt))
32     del new['UDP'].chksum
33     new = new.__class__(str(new))
34     return new['UDP'].chksum == pkt['UDP'].chksum
35
36
37 def _is_udp_in_ipv4(pkt):
38     """If IPv4 protocol type in the given pkt is UDP, return True,
39     else return False. False is returned also if exception occurs."""
40     ipv4_type = int('0x0800', 16)  # IPv4
41     try:
42         if pkt.type == ipv4_type:
43             if pkt.payload.proto == 17:  # UDP
44                 return True
45     except AttributeError:
46         return False
47     return False
48
49
50 def main():  # pylint: disable=too-many-statements, too-many-locals
51     """Main function of the script file."""
52     args = TrafficScriptArg(['tx_dst_mac', 'tx_src_ipv6', 'tx_dst_ipv6',
53                              'tx_src_udp_port', 'rx_dst_mac', 'rx_src_mac',
54                              'rx_src_ipv4', 'rx_dst_ipv4'])
55     rx_if = args.get_arg('rx_if')
56     tx_if = args.get_arg('tx_if')
57     tx_dst_mac = args.get_arg('tx_dst_mac')
58     tx_src_ipv6 = args.get_arg('tx_src_ipv6')
59     tx_dst_ipv6 = args.get_arg('tx_dst_ipv6')
60     tx_src_udp_port = int(args.get_arg('tx_src_udp_port'))
61     tx_dst_udp_port = 20000
62     rx_dst_mac = args.get_arg('rx_dst_mac')
63     rx_src_mac = args.get_arg('rx_src_mac')
64     rx_src_ipv4 = args.get_arg('rx_src_ipv4')
65     rx_dst_ipv4 = args.get_arg('rx_dst_ipv4')
66
67     rxq = RxQueue(rx_if)
68     txq = TxQueue(tx_if)
69     sent_packets = []
70
71     # Create empty UDP datagram in IPv6
72
73     udp = (Ether(dst=tx_dst_mac) /
74            IPv6(src=tx_src_ipv6, dst=tx_dst_ipv6) /
75            UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port) /
76            'udp_payload')
77
78     txq.send(udp)
79     sent_packets.append(udp)
80
81     for _ in range(5):
82         pkt = rxq.recv(2)
83         if _is_udp_in_ipv4(pkt):
84             ether = pkt
85             break
86     else:
87         raise RuntimeError("UDP in IPv4 Rx error.")
88
89     # check ethernet
90     if ether.dst != rx_dst_mac:
91         raise RuntimeError("Destination MAC error {} != {}.".
92                            format(ether.dst, rx_dst_mac))
93     print "Destination MAC: OK."
94
95     if ether.src != rx_src_mac:
96         raise RuntimeError("Source MAC error {} != {}.".
97                            format(ether.src, rx_src_mac))
98     print "Source MAC: OK."
99
100     ipv4 = ether.payload
101
102     # check ipv4
103     if ip_address(unicode(ipv4.dst)) != ip_address(unicode(rx_dst_ipv4)):
104         raise RuntimeError("Destination IPv4 error {} != {}.".
105                            format(ipv4.dst, rx_dst_ipv4))
106     print "Destination IPv4: OK."
107
108     if ip_address(unicode(ipv4.src)) != ip_address(unicode(rx_src_ipv4)):
109         raise RuntimeError("Source IPv4 error {} != {}.".
110                            format(ipv4.src, rx_src_ipv4))
111     print "Source IPv4: OK."
112
113     udp = ipv4.payload
114
115     # check udp
116     if udp.dport != tx_dst_udp_port:
117         raise RuntimeError("UDP dport error {} != {}.".
118                            format(udp.dport, tx_dst_udp_port))
119     print "UDP dport: OK."
120
121     if udp.sport != tx_src_udp_port:
122         raise RuntimeError("UDP sport error {} != {}.".
123                            format(udp.sport, tx_src_udp_port))
124     print "UDP sport: OK."
125
126     if not _check_udp_checksum(ipv4):
127         raise RuntimeError("UDP checksum error.")
128     print "UDP checksum OK."
129
130     sys.exit(0)
131
132 if __name__ == "__main__":
133     main()