24f4faa3f477aa831319f9068009ba4089dfb4b0
[csit.git] / resources / traffic_scripts / icmpv6_echo_req_resp.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """Send ICMPv6 echo request from one TG port to another through DUT nodes and
17    send reply back. Also verify hop limit processing."""
18
19 import sys
20 import logging
21 logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
22 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
23 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
24 from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
25 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
26 from scapy.all import Ether
27
28
29 def main():
30     args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_nh_mac', 'dst_nh_mac',
31                              'src_ip', 'dst_ip', 'h_num'])
32
33     src_rxq = RxQueue(args.get_arg('rx_if'))
34     src_txq = TxQueue(args.get_arg('rx_if'))
35     dst_rxq = RxQueue(args.get_arg('tx_if'))
36     dst_txq = TxQueue(args.get_arg('tx_if'))
37
38     src_mac = args.get_arg('src_mac')
39     dst_mac = args.get_arg('dst_mac')
40     src_nh_mac = args.get_arg('src_nh_mac')
41     dst_nh_mac = args.get_arg('dst_nh_mac')
42     src_ip = args.get_arg('src_ip')
43     dst_ip = args.get_arg('dst_ip')
44     hop_num = int(args.get_arg('h_num'))
45     hop_limit = 64
46     echo_id = 0xa
47     echo_seq = 0x1
48
49     src_sent_packets = []
50     dst_sent_packets = []
51
52     # send ICMPv6 neighbor advertisement message
53     pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
54                       IPv6(src=src_ip, dst='ff02::1:ff00:2') /
55                       ICMPv6ND_NA(tgt=src_ip, R=0) /
56                       ICMPv6NDOptDstLLAddr(lladdr=src_mac))
57     src_sent_packets.append(pkt_send)
58     src_txq.send(pkt_send)
59     pkt_send = (Ether(src=dst_mac, dst='ff:ff:ff:ff:ff:ff') /
60                       IPv6(src=dst_ip, dst='ff02::1:ff00:2') /
61                       ICMPv6ND_NA(tgt=dst_ip, R=0) /
62                       ICMPv6NDOptDstLLAddr(lladdr=dst_mac))
63     dst_sent_packets.append(pkt_send)
64     dst_txq.send(pkt_send)
65
66     # send ICMPv6 echo request from first TG interface
67     pkt_send = (Ether(src=src_mac, dst=src_nh_mac) /
68                       IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) /
69                       ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
70     src_sent_packets.append(pkt_send)
71     src_txq.send(pkt_send)
72
73     # receive ICMPv6 echo request on second TG interface
74     ether = dst_rxq.recv(2, dst_sent_packets)
75     if ether is None:
76         src_rxq._proc.terminate()
77         dst_rxq._proc.terminate()
78         raise RuntimeError('ICMPv6 echo reply Rx timeout')
79
80     if not ether.haslayer(IPv6):
81         src_rxq._proc.terminate()
82         dst_rxq._proc.terminate()
83         raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
84             ether.__repr__()))
85
86     ipv6 = ether['IPv6']
87
88     # verify hop limit processing
89     if ipv6.hlim != (hop_limit - hop_num):
90         src_rxq._proc.terminate()
91         dst_rxq._proc.terminate()
92         raise RuntimeError(
93             'Invalid hop limit {0} should be {1}'.format(ipv6.hlim,
94                                                          hop_limit - hop_num))
95
96     if not ipv6.haslayer(ICMPv6EchoRequest):
97         src_rxq._proc.terminate()
98         dst_rxq._proc.terminate()
99         raise RuntimeError(
100             'Unexpected packet with no IPv6 ICMP received {0}'.format(
101                 ipv6.__repr__()))
102
103     icmpv6 = ipv6['ICMPv6 Echo Request']
104
105     # check identifier and sequence number
106     if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
107         src_rxq._proc.terminate()
108         dst_rxq._proc.terminate()
109         raise RuntimeError(
110             'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' +
111             'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
112
113     # verify checksum
114     cksum = icmpv6.cksum
115     del icmpv6.cksum
116     tmp = ICMPv6EchoRequest(str(icmpv6))
117     if tmp.cksum != cksum:
118         src_rxq._proc.terminate()
119         dst_rxq._proc.terminate()
120         raise RuntimeError(
121             'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
122
123     # send ICMPv6 echo reply from second TG interface
124     pkt_send = (Ether(src=dst_mac, dst=dst_nh_mac) /
125                       IPv6(src=dst_ip, dst=src_ip) /
126                       ICMPv6EchoReply(id=echo_id, seq=echo_seq))
127     dst_sent_packets.append(pkt_send)
128     dst_txq.send(pkt_send)
129
130     # receive ICMPv6 echo reply on first TG interface
131     ether = src_rxq.recv(2, src_sent_packets)
132     if ether is None:
133         src_rxq._proc.terminate()
134         dst_rxq._proc.terminate()
135         raise RuntimeError('ICMPv6 echo reply Rx timeout')
136
137     if not ether.haslayer(IPv6):
138         src_rxq._proc.terminate()
139         dst_rxq._proc.terminate()
140         raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
141             ether.__repr__()))
142
143     ipv6 = ether['IPv6']
144
145     # verify hop limit processing
146     if ipv6.hlim != (hop_limit - hop_num):
147         src_rxq._proc.terminate()
148         dst_rxq._proc.terminate()
149         raise RuntimeError(
150             'Invalid hop limit {0} should be {1}'.format(ipv6.hlim,
151                                                          hop_limit - hop_num))
152
153     if not ipv6.haslayer(ICMPv6EchoReply):
154         src_rxq._proc.terminate()
155         dst_rxq._proc.terminate()
156         raise RuntimeError(
157             'Unexpected packet with no IPv6 ICMP received {0}'.format(
158                 ipv6.__repr__()))
159
160     icmpv6 = ipv6['ICMPv6 Echo Reply']
161
162     # check identifier and sequence number
163     if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
164         src_rxq._proc.terminate()
165         dst_rxq._proc.terminate()
166         raise RuntimeError(
167             'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' +
168             'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
169
170     # verify checksum
171     cksum = icmpv6.cksum
172     del icmpv6.cksum
173     tmp = ICMPv6EchoReply(str(icmpv6))
174     if tmp.cksum != cksum:
175         src_rxq._proc.terminate()
176         dst_rxq._proc.terminate()
177         raise RuntimeError(
178             'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
179
180     src_rxq._proc.terminate()
181     dst_rxq._proc.terminate()
182     sys.exit(0)
183
184 if __name__ == "__main__":
185     main()