Python3: PIP requirement
[csit.git] / resources / traffic_scripts / icmpv6_echo_req_resp.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2018 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """Send ICMPv6 echo request from one TG port to DUT port or to another TG port
17    through DUT node(s) and send reply back. Also verify hop limit processing."""
18
19 import sys
20 import logging
21
22 # pylint: disable=no-name-in-module
23 # pylint: disable=import-error
24 logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
25
26 from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
27 from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr
28 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
29 from scapy.all import Ether
30
31 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
32 from resources.libraries.python.PacketVerifier import checksum_equal
33 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
34
35
36 def main():
37     args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_nh_mac', 'dst_nh_mac',
38                              'src_ip', 'dst_ip', 'h_num'], ['is_dst_tg'])
39
40     src_rxq = RxQueue(args.get_arg('tx_if'))
41     src_txq = TxQueue(args.get_arg('tx_if'))
42
43     src_mac = args.get_arg('src_mac')
44     dst_mac = args.get_arg('dst_mac')
45     src_nh_mac = args.get_arg('src_nh_mac')
46     dst_nh_mac = args.get_arg('dst_nh_mac')
47     src_ip = args.get_arg('src_ip')
48     dst_ip = args.get_arg('dst_ip')
49     hop_num = int(args.get_arg('h_num'))
50
51     is_dst_tg = True if args.get_arg('is_dst_tg') in ['True', ''] else False
52     dst_rxq = RxQueue(args.get_arg('rx_if')) if is_dst_tg else None
53     dst_txq = TxQueue(args.get_arg('rx_if')) if is_dst_tg else None
54
55     hop_limit = 64
56     echo_id = 0xa
57     echo_seq = 0x1
58
59     src_sent_packets = []
60     dst_sent_packets = []
61
62     # send ICMPv6 neighbor advertisement message
63     pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
64                 IPv6(src=src_ip, dst='ff02::1:ff00:2') /
65                 ICMPv6ND_NA(tgt=src_ip, R=0) /
66                 ICMPv6NDOptDstLLAddr(lladdr=src_mac))
67     src_sent_packets.append(pkt_send)
68     src_txq.send(pkt_send)
69
70     if is_dst_tg:
71         # send ICMPv6 neighbor advertisement message
72         pkt_send = (Ether(src=dst_mac, dst='ff:ff:ff:ff:ff:ff') /
73                     IPv6(src=dst_ip, dst='ff02::1:ff00:2') /
74                     ICMPv6ND_NA(tgt=dst_ip, R=0) /
75                     ICMPv6NDOptDstLLAddr(lladdr=dst_mac))
76         dst_sent_packets.append(pkt_send)
77         dst_txq.send(pkt_send)
78
79     # send ICMPv6 echo request from first TG interface
80     pkt_send = (Ether(src=src_mac, dst=src_nh_mac) /
81                 IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) /
82                 ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
83     src_sent_packets.append(pkt_send)
84     src_txq.send(pkt_send)
85
86     if is_dst_tg:
87         # receive ICMPv6 echo request on second TG interface
88         while True:
89             ether = dst_rxq.recv(2, dst_sent_packets)
90             if ether is None:
91                 raise RuntimeError('ICMPv6 echo reply Rx timeout')
92
93             if ether.haslayer(ICMPv6ND_NS):
94                 # read another packet in the queue if the current one is
95                 # ICMPv6ND_NS
96                 continue
97             else:
98                 # otherwise process the current packet
99                 break
100
101         if not ether.haslayer(IPv6):
102             raise RuntimeError('Unexpected packet with no IPv6 received: {0}'.
103                                format(ether.__repr__()))
104
105         ipv6 = ether[IPv6]
106
107         # verify hop limit processing
108         if ipv6.hlim != (hop_limit - hop_num):
109             raise RuntimeError('Invalid hop limit {0} should be {1}'.
110                                format(ipv6.hlim, hop_limit - hop_num))
111
112         if not ipv6.haslayer(ICMPv6EchoRequest):
113             raise RuntimeError('Unexpected packet with no IPv6 ICMP received '
114                                '{0}'.format(ipv6.__repr__()))
115
116         icmpv6 = ipv6[ICMPv6EchoRequest]
117
118         # check identifier and sequence number
119         if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
120             raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} '
121                                'seq {1} should be ID {2} seq {3}'.
122                                format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
123
124         # verify checksum
125         cksum = icmpv6.cksum
126         del icmpv6.cksum
127         tmp = ICMPv6EchoRequest(str(icmpv6))
128         if not checksum_equal(tmp.cksum, cksum):
129             raise RuntimeError('Invalid checksum {0} should be {1}'.
130                                format(cksum, tmp.cksum))
131
132         # send ICMPv6 echo reply from second TG interface
133         pkt_send = (Ether(src=dst_mac, dst=dst_nh_mac) /
134                     IPv6(src=dst_ip, dst=src_ip, hlim=(ipv6.hlim - 1)) /
135                     ICMPv6EchoReply(id=echo_id, seq=echo_seq))
136         dst_sent_packets.append(pkt_send)
137         dst_txq.send(pkt_send)
138
139     # receive ICMPv6 echo reply on first TG interface
140     while True:
141         ether = src_rxq.recv(2, src_sent_packets)
142         if ether is None:
143             raise RuntimeError('ICMPv6 echo reply Rx timeout')
144
145         if ether.haslayer(ICMPv6ND_NS):
146             # read another packet in the queue if the current one is ICMPv6ND_NS
147             continue
148         else:
149             # otherwise process the current packet
150             break
151
152     if not ether.haslayer(IPv6):
153         raise RuntimeError('Unexpected packet with no IPv6 layer received {0}'.
154                            format(ether.__repr__()))
155
156     ipv6 = ether[IPv6]
157
158     # verify hop limit processing; destination node decrements hlim by one in
159     # outgoing ICMPv6 Echo Reply
160     directions = 2 if is_dst_tg else 1
161     hop_limit_reply = hop_limit - directions * hop_num - 1
162     if ipv6.hlim != hop_limit_reply:
163         raise RuntimeError('Invalid hop limit {0} should be {1}'.
164                            format(ipv6.hlim, hop_limit_reply))
165
166     if not ipv6.haslayer(ICMPv6EchoReply):
167         raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'.
168                            format(ipv6.__repr__()))
169
170     icmpv6 = ipv6[ICMPv6EchoReply]
171
172     # check identifier and sequence number
173     if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
174         raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} '
175                            'seq {1} should be ID {2} seq {3}'.
176                            format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
177
178     # verify checksum
179     cksum = icmpv6.cksum
180     del icmpv6.cksum
181     tmp = ICMPv6EchoReply(str(icmpv6))
182     if not checksum_equal(tmp.cksum, cksum):
183         raise RuntimeError('Invalid checksum {0} should be {1}'.
184                            format(cksum, tmp.cksum))
185
186     sys.exit(0)
187
188
189 if __name__ == "__main__":
190     main()