CSIT-651 Add keywords and template for memif
[csit.git] / resources / traffic_scripts / send_lw_4o6_check_ipv4_udp.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an empty IPv4 UDP datagram encapsulated in IPv6
16 and checks if is correctly decapsulated."""
17
18 import sys
19
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet6 import IPv6
22 from scapy.layers.inet import IP, UDP
23
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
26
27
28 def _is_udp_in_ipv4(pkt):
29     """If UDP is in IPv4 packet return True,
30     else return False. False is returned also if exception occurs."""
31     ipv4_type = int('0x0800', 16)  # IPv4
32     try:
33         if pkt.type == ipv4_type:
34             if pkt.payload.proto == 17:  # UDP
35                 return True
36     except:  # pylint: disable=bare-except
37         return False
38     return False
39
40
41 def main():  # pylint: disable=too-many-statements, too-many-locals
42     """Main function of the script file."""
43     args = TrafficScriptArg(['tx_dst_mac', 'tx_src_mac',
44                              'tx_dst_ipv6', 'tx_src_ipv6',
45                              'tx_dst_ipv4', 'tx_src_ipv4', 'tx_src_udp_port',
46                              'rx_dst_mac', 'rx_src_mac'])
47     rx_if = args.get_arg('rx_if')
48     tx_if = args.get_arg('tx_if')
49     tx_src_mac = args.get_arg('tx_src_mac')
50     tx_dst_mac = args.get_arg('tx_dst_mac')
51     tx_dst_ipv6 = args.get_arg('tx_dst_ipv6')
52     tx_src_ipv6 = args.get_arg('tx_src_ipv6')
53     tx_dst_ipv4 = args.get_arg('tx_dst_ipv4')
54     tx_src_ipv4 = args.get_arg('tx_src_ipv4')
55     tx_src_udp_port = int(args.get_arg('tx_src_udp_port'))
56     tx_dst_udp_port = 20000
57     rx_dst_mac = args.get_arg('rx_dst_mac')
58     rx_src_mac = args.get_arg('rx_src_mac')
59
60     rxq = RxQueue(rx_if)
61     txq = TxQueue(tx_if)
62     sent_packets = []
63
64     # Create empty UDP datagram in IPv4 and IPv6
65     tx_pkt = (Ether(dst=tx_dst_mac, src=tx_src_mac) /
66               IPv6(src=tx_src_ipv6, dst=tx_dst_ipv6) /
67               IP(src=tx_src_ipv4, dst=tx_dst_ipv4) /
68               UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port))
69
70     txq.send(tx_pkt)
71     sent_packets.append(tx_pkt)
72
73     for _ in range(5):
74         pkt = rxq.recv(2)
75         if _is_udp_in_ipv4(pkt):
76             ether = pkt
77             break
78     else:
79         raise RuntimeError("UDP in IPv4 Rx error.")
80
81     # check ethernet
82     if ether.dst != rx_dst_mac:
83         raise RuntimeError("Destination MAC error {} != {}.".
84                            format(ether.dst, rx_dst_mac))
85     print "Destination MAC: OK."
86
87     if ether.src != rx_src_mac:
88         raise RuntimeError("Source MAC error {} != {}.".
89                            format(ether.src, rx_src_mac))
90     print "Source MAC: OK."
91
92     ipv4 = ether.payload
93
94     # check ipv4
95     if ipv4.dst != tx_dst_ipv4:
96         raise RuntimeError("Destination IPv4 error {} != {}.".
97                            format(ipv4.dst, tx_dst_ipv4))
98     print "Destination IPv4: OK."
99
100     if ipv4.src != tx_src_ipv4:
101         raise RuntimeError("Source IPv4 error {} != {}.".
102                            format(ipv4.src, tx_src_ipv4))
103     print "Source IPv4: OK."
104
105     if ipv4.proto != 17:  # UDP
106         raise RuntimeError("IPv4 protocol error {} != UDP.".
107                            format(ipv4.proto))
108     print "IPv4 protocol: OK."
109
110     udp = ipv4.payload
111
112     # check udp
113     if udp.dport != tx_dst_udp_port:
114         raise RuntimeError("UDP dport error {} != {}.".
115                            format(udp.dport, tx_dst_udp_port))
116     print "UDP dport: OK."
117
118     if udp.sport != tx_src_udp_port:
119         raise RuntimeError("UDP sport error {} != {}.".
120                            format(udp.sport, tx_src_udp_port))
121     print "UDP sport: OK."
122
123     sys.exit(0)
124
125 if __name__ == "__main__":
126     main()