CSIT-563: HC Test: improved Lisp test coverage
[csit.git] / resources / traffic_scripts / send_ipv4_udp_check_lw_4o6.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an empty UDP datagram and checks if IPv4 is
16 correctly encapsulated into IPv6 packet."""
17
18 import sys
19
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet import IP, UDP
22 from ipaddress import ip_address
23
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
26
27
28 def _is_ipv4_in_ipv6(pkt):
29     """If IPv6 next header type in the given pkt is IPv4, return True,
30     else return False. False is returned also if exception occurs."""
31     ipv6_type = int('0x86dd', 16)  # IPv6
32     try:
33         if pkt.type == ipv6_type:
34             if pkt.payload.nh == 4:
35                 return True
36     except:  # pylint: disable=bare-except
37         return False
38     return False
39
40
41 def main():  # pylint: disable=too-many-statements, too-many-locals
42     """Main function of the script file."""
43     args = TrafficScriptArg(['tx_dst_mac', 'tx_src_ipv4', 'tx_dst_ipv4',
44                              'tx_dst_udp_port', 'rx_dst_mac', 'rx_src_mac',
45                              'src_ipv6', 'dst_ipv6'])
46     rx_if = args.get_arg('rx_if')
47     tx_if = args.get_arg('tx_if')
48     tx_dst_mac = args.get_arg('tx_dst_mac')
49     tx_src_ipv4 = args.get_arg('tx_src_ipv4')
50     tx_dst_ipv4 = args.get_arg('tx_dst_ipv4')
51     tx_dst_udp_port = int(args.get_arg('tx_dst_udp_port'))
52     tx_src_udp_port = 20000
53     rx_dst_mac = args.get_arg('rx_dst_mac')
54     rx_src_mac = args.get_arg('rx_src_mac')
55     rx_src_ipv6 = args.get_arg('src_ipv6')
56     rx_dst_ipv6 = args.get_arg('dst_ipv6')
57
58     rxq = RxQueue(rx_if)
59     txq = TxQueue(tx_if)
60     sent_packets = []
61
62     # Create empty UDP datagram
63     udp = (Ether(dst=tx_dst_mac) /
64            IP(src=tx_src_ipv4, dst=tx_dst_ipv4) /
65            UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port))
66
67     txq.send(udp)
68     sent_packets.append(udp)
69
70     for _ in range(5):
71         pkt = rxq.recv(2)
72         if _is_ipv4_in_ipv6(pkt):
73             ether = pkt
74             break
75     else:
76         raise RuntimeError("IPv4 in IPv6 Rx error.")
77
78     # check ethernet
79     if ether.dst != rx_dst_mac:
80         raise RuntimeError("Destination MAC error {} != {}.".
81                            format(ether.dst, rx_dst_mac))
82     print "Destination MAC: OK."
83
84     if ether.src != rx_src_mac:
85         raise RuntimeError("Source MAC error {} != {}.".
86                            format(ether.src, rx_src_mac))
87     print "Source MAC: OK."
88
89     ipv6 = ether.payload
90
91     # check ipv6
92     if ip_address(unicode(ipv6.dst)) != ip_address(unicode(rx_dst_ipv6)):
93         raise RuntimeError("Destination IP error {} != {}.".
94                            format(ipv6.dst, rx_dst_ipv6))
95     print "Destination IPv6: OK."
96
97     if ip_address(unicode(ipv6.src)) != ip_address(unicode(rx_src_ipv6)):
98         raise RuntimeError("Source IP error {} != {}.".
99                            format(ipv6.src, rx_src_ipv6))
100     print "Source IPv6: OK."
101
102     ipv4 = ipv6.payload
103
104     # check ipv4
105     if ipv4.dst != tx_dst_ipv4:
106         raise RuntimeError("Destination IP error {} != {}.".
107                            format(ipv4.dst, tx_dst_ipv4))
108     print "Destination IPv4: OK."
109
110     if ipv4.src != tx_src_ipv4:
111         raise RuntimeError("Source IP error {} != {}.".
112                            format(ipv4.src, tx_src_ipv4))
113     print "Source IPv4: OK."
114
115     if ipv4.proto != 17:  # UDP
116         raise RuntimeError("IP protocol error {} != UDP.".
117                            format(ipv4.proto))
118     print "IPv4 protocol: OK."
119
120     udp = ipv4.payload
121
122     # check udp
123     if udp.dport != tx_dst_udp_port:
124         raise RuntimeError("UDP dport error {} != {}.".
125                            format(udp.dport, tx_dst_udp_port))
126     print "UDP dport: OK."
127
128     if udp.sport != tx_src_udp_port:
129         raise RuntimeError("UDP sport error {} != {}.".
130                            format(udp.sport, tx_src_udp_port))
131     print "UDP sport: OK."
132
133     sys.exit(0)
134
135 if __name__ == "__main__":
136     main()