VAT-to-PAPI: VPPCounters
[csit.git] / resources / traffic_scripts / send_ipv4_icmp_check_lw_4o6.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends an IPv4 ICMP packet with ID field and checks if
16 IPv4 is correctly encapsulated into IPv6 packet. Doing for ICMP echo request
17 and ICMP echo response packet."""
18
19 import sys
20
21 from scapy.layers.l2 import Ether
22 from scapy.layers.inet import IP, ICMP, icmptypes
23
24 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
26
27
28 def _is_ipv4_in_ipv6(pkt):
29     """If IPv6 next header type in the given pkt is IPv4, return True,
30     else return False. False is returned also if exception occurs."""
31     ipv6_type = int('0x86dd', 16)  # IPv6
32     try:
33         if pkt.type == ipv6_type:
34             if pkt.payload.nh == 4:
35                 return True
36     except:  # pylint: disable=bare-except
37         return False
38     return False
39
40
41 def main():  # pylint: disable=too-many-statements, too-many-locals
42     """Main function of the script file."""
43     args = TrafficScriptArg(['tx_dst_mac', 'tx_src_ipv4', 'tx_dst_ipv4',
44                              'tx_icmp_id', 'rx_dst_mac', 'rx_src_mac',
45                              'src_ipv6', 'dst_ipv6'])
46     rx_if = args.get_arg('rx_if')
47     tx_if = args.get_arg('tx_if')
48     tx_dst_mac = args.get_arg('tx_dst_mac')
49     tx_src_ipv4 = args.get_arg('tx_src_ipv4')
50     tx_dst_ipv4 = args.get_arg('tx_dst_ipv4')
51     tx_icmp_id = int(args.get_arg('tx_icmp_id'))
52     rx_dst_mac = args.get_arg('rx_dst_mac')
53     rx_src_mac = args.get_arg('rx_src_mac')
54     rx_src_ipv6 = args.get_arg('src_ipv6')
55     rx_dst_ipv6 = args.get_arg('dst_ipv6')
56
57     rxq = RxQueue(rx_if)
58     txq = TxQueue(tx_if)
59     sent_packets = []
60
61     for icmp_type in ('echo-request', 'echo-reply'):
62         print '\nChecking ICMP type: {}'.format(icmp_type)
63
64         # Create ICMP request
65         tx_pkt = (Ether(dst=tx_dst_mac) /
66                   IP(src=tx_src_ipv4, dst=tx_dst_ipv4) /
67                   ICMP(type=icmp_type, id=tx_icmp_id))
68
69         txq.send(tx_pkt)
70         sent_packets.append(tx_pkt)
71
72         for _ in range(5):
73             pkt = rxq.recv(2)
74             if _is_ipv4_in_ipv6(pkt):
75                 ether = pkt
76                 break
77         else:
78             raise RuntimeError("IPv4 in IPv6 Rx error.")
79
80         # check ethernet
81         if ether.dst != rx_dst_mac:
82             raise RuntimeError("Destination MAC error {} != {}.".
83                                format(ether.dst, rx_dst_mac))
84         print "Destination MAC: OK."
85
86         if ether.src != rx_src_mac:
87             raise RuntimeError("Source MAC error {} != {}.".
88                                format(ether.src, rx_src_mac))
89         print "Source MAC: OK."
90
91         ipv6 = ether.payload
92
93         # check ipv6
94         if ipv6.dst != rx_dst_ipv6:
95             raise RuntimeError("Destination IP error {} != {}.".
96                                format(ipv6.dst, rx_dst_ipv6))
97         print "Destination IPv6: OK."
98
99         if ipv6.src != rx_src_ipv6:
100             raise RuntimeError("Source IP error {} != {}.".
101                                format(ipv6.src, rx_src_ipv6))
102         print "Source IPv6: OK."
103
104         ipv4 = ipv6.payload
105
106         # check ipv4
107         if ipv4.dst != tx_dst_ipv4:
108             raise RuntimeError("Destination IP error {} != {}.".
109                                format(ipv4.dst, tx_dst_ipv4))
110         print "Destination IPv4: OK."
111
112         if ipv4.src != tx_src_ipv4:
113             raise RuntimeError("Source IP error {} != {}.".
114                                format(ipv4.src, tx_src_ipv4))
115         print "Source IPv4: OK."
116
117         # check icmp echo request
118         if ipv4.proto != 1:  # ICMP
119             raise RuntimeError("IP protocol error {} != ICMP.".
120                                format(ipv4.proto))
121         print "IPv4 protocol: OK."
122
123         icmp = ipv4.payload
124
125         # check icmp
126         if icmptypes[icmp.type] != icmp_type:
127             raise RuntimeError("ICMP type error {} != echo request.".
128                                format(icmp.type))
129         print "ICMP type: OK."
130
131         if icmp.id != tx_icmp_id:
132             raise RuntimeError("ICMP ID error {} != {}.".
133                                format(icmp.id, tx_icmp_id))
134         print "ICMP ID: OK."
135
136     sys.exit(0)
137
138 if __name__ == "__main__":
139     main()