Remove intrusive useless logging
[csit.git] / resources / traffic_scripts / send_lw_4o6_check_hairpinning_udp.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an empty IPv4 UDP datagram encapsulated in IPv6
16 and checks if is correctly re-encapsulated."""
17
18 import sys
19
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet6 import IPv6
22 from scapy.layers.inet import IP, UDP
23
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
26
27
28 def _is_ipv4_in_ipv6(pkt):
29     """If IPv6 next header type in the given pkt is IPv4, return True,
30     else return False. False is returned also if exception occurs."""
31     ipv6_type = int('0x86dd', 16)  # IPv6
32     try:
33         if pkt.type == ipv6_type:
34             if pkt.payload.nh == 4:
35                 return True
36     except:  # pylint: disable=bare-except
37         return False
38     return False
39
40
41 def main():  # pylint: disable=too-many-statements, too-many-locals
42     """Main function of the script file."""
43     args = TrafficScriptArg(['tx_dst_mac',
44                              'tx_dst_ipv6', 'tx_src_ipv6',
45                              'tx_dst_ipv4', 'tx_src_ipv4',
46                              'tx_dst_udp_port', 'tx_src_udp_port',
47                              'rx_dst_mac', 'rx_src_mac',
48                              'rx_dst_ipv6', 'rx_src_ipv6'])
49     rx_if = args.get_arg('rx_if')
50     tx_if = args.get_arg('tx_if')
51     tx_dst_mac = args.get_arg('tx_dst_mac')
52     tx_src_mac = '02:00:00:00:00:01'
53
54     tx_dst_ipv6 = args.get_arg('tx_dst_ipv6')
55     tx_src_ipv6 = args.get_arg('tx_src_ipv6')
56     tx_dst_ipv4 = args.get_arg('tx_dst_ipv4')
57     tx_src_ipv4 = args.get_arg('tx_src_ipv4')
58     tx_dst_udp_port = int(args.get_arg('tx_dst_udp_port'))
59     tx_src_udp_port = int(args.get_arg('tx_src_udp_port'))
60     rx_dst_mac = args.get_arg('rx_dst_mac')
61     rx_src_mac = args.get_arg('rx_src_mac')
62     rx_dst_ipv6 = args.get_arg('rx_dst_ipv6')
63     rx_src_ipv6 = args.get_arg('rx_src_ipv6')
64
65     rxq = RxQueue(rx_if)
66     txq = TxQueue(tx_if)
67     sent_packets = []
68
69     # Create empty UDP datagram in IPv4 and IPv6
70     tx_pkt = Ether(dst=tx_dst_mac, src=tx_src_mac)
71     tx_pkt /= IPv6(src=tx_src_ipv6, dst=tx_dst_ipv6)
72     tx_pkt /= IP(src=tx_src_ipv4, dst=tx_dst_ipv4)
73     tx_pkt /= UDP(sport=tx_src_udp_port, dport=tx_dst_udp_port)
74     tx_pkt /= 'udp_payload'
75
76     txq.send(tx_pkt)
77     sent_packets.append(tx_pkt)
78
79     for _ in range(5):
80         pkt = rxq.recv(2, ignore=sent_packets)
81         if _is_ipv4_in_ipv6(pkt):
82             ether = pkt
83             break
84     else:
85         raise RuntimeError("IPv4 in IPv6 Rx error.")
86
87     # check ethernet
88     if ether.dst != rx_dst_mac:
89         raise RuntimeError("Destination MAC error {} != {}.".
90                            format(ether.dst, rx_dst_mac))
91     print "Destination MAC: OK."
92
93     if ether.src != rx_src_mac:
94         raise RuntimeError("Source MAC error {} != {}.".
95                            format(ether.src, rx_src_mac))
96     print "Source MAC: OK."
97
98     ipv6 = ether.payload
99
100     # check ipv6
101     if ipv6.dst != rx_dst_ipv6:
102         raise RuntimeError("Destination IPv6 error {} != {}.".
103                            format(ipv6.dst, rx_dst_ipv6))
104     print "Destination IPv6: OK."
105
106     if ipv6.src != rx_src_ipv6:
107         raise RuntimeError("Source IPv6 error {} != {}.".
108                            format(ipv6.src, rx_src_ipv6))
109     print "Source IPv6: OK."
110
111     ipv4 = ipv6.payload
112
113     # check ipv4
114     if ipv4.dst != tx_dst_ipv4:
115         raise RuntimeError("Destination IPv4 error {} != {}.".
116                            format(ipv4.dst, tx_dst_ipv4))
117     print "Destination IPv4: OK."
118
119     if ipv4.src != tx_src_ipv4:
120         raise RuntimeError("Source IPv4 error {} != {}.".
121                            format(ipv4.src, tx_src_ipv4))
122     print "Source IPv4: OK."
123
124     if ipv4.proto != 17:  # UDP
125         raise RuntimeError("IPv4 protocol error {} != UDP.".
126                            format(ipv4.proto))
127     print "IPv4 protocol: OK."
128
129     udp = ipv4.payload
130
131     # check udp
132     if udp.dport != tx_dst_udp_port:
133         raise RuntimeError("UDP dport error {} != {}.".
134                            format(udp.dport, tx_dst_udp_port))
135     print "UDP dport: OK."
136
137     if udp.sport != tx_src_udp_port:
138         raise RuntimeError("UDP sport error {} != {}.".
139                            format(udp.sport, tx_src_udp_port))
140     print "UDP sport: OK."
141
142     sys.exit(0)
143
144 if __name__ == "__main__":
145     main()