Use only Qemu Kill on the teardown of double qemu setup
[csit.git] / resources / traffic_scripts / icmpv6_echo_req_resp.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """Send ICMPv6 echo request from one TG port to another through DUT nodes and
17    send reply back. Also verify hop limit processing."""
18
19 import sys
20 import logging
21 logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
22 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\
23     checksum_equal
24 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
25 from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
26 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
27 from scapy.all import Ether
28
29
30 def main():
31     args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_nh_mac', 'dst_nh_mac',
32                              'src_ip', 'dst_ip', 'h_num'])
33
34     src_rxq = RxQueue(args.get_arg('rx_if'))
35     src_txq = TxQueue(args.get_arg('rx_if'))
36     dst_rxq = RxQueue(args.get_arg('tx_if'))
37     dst_txq = TxQueue(args.get_arg('tx_if'))
38
39     src_mac = args.get_arg('src_mac')
40     dst_mac = args.get_arg('dst_mac')
41     src_nh_mac = args.get_arg('src_nh_mac')
42     dst_nh_mac = args.get_arg('dst_nh_mac')
43     src_ip = args.get_arg('src_ip')
44     dst_ip = args.get_arg('dst_ip')
45     hop_num = int(args.get_arg('h_num'))
46     hop_limit = 64
47     echo_id = 0xa
48     echo_seq = 0x1
49
50     src_sent_packets = []
51     dst_sent_packets = []
52
53     # send ICMPv6 neighbor advertisement message
54     pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
55                       IPv6(src=src_ip, dst='ff02::1:ff00:2') /
56                       ICMPv6ND_NA(tgt=src_ip, R=0) /
57                       ICMPv6NDOptDstLLAddr(lladdr=src_mac))
58     src_sent_packets.append(pkt_send)
59     src_txq.send(pkt_send)
60     pkt_send = (Ether(src=dst_mac, dst='ff:ff:ff:ff:ff:ff') /
61                       IPv6(src=dst_ip, dst='ff02::1:ff00:2') /
62                       ICMPv6ND_NA(tgt=dst_ip, R=0) /
63                       ICMPv6NDOptDstLLAddr(lladdr=dst_mac))
64     dst_sent_packets.append(pkt_send)
65     dst_txq.send(pkt_send)
66
67     # send ICMPv6 echo request from first TG interface
68     pkt_send = (Ether(src=src_mac, dst=src_nh_mac) /
69                       IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) /
70                       ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
71     src_sent_packets.append(pkt_send)
72     src_txq.send(pkt_send)
73
74     # receive ICMPv6 echo request on second TG interface
75     ether = dst_rxq.recv(2, dst_sent_packets)
76     if ether is None:
77         raise RuntimeError('ICMPv6 echo reply Rx timeout')
78
79     if not ether.haslayer(IPv6):
80         raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
81             ether.__repr__()))
82
83     ipv6 = ether['IPv6']
84
85     # verify hop limit processing
86     if ipv6.hlim != (hop_limit - hop_num):
87         raise RuntimeError(
88             'Invalid hop limit {0} should be {1}'.format(ipv6.hlim,
89                                                          hop_limit - hop_num))
90
91     if not ipv6.haslayer(ICMPv6EchoRequest):
92         raise RuntimeError(
93             'Unexpected packet with no IPv6 ICMP received {0}'.format(
94                 ipv6.__repr__()))
95
96     icmpv6 = ipv6['ICMPv6 Echo Request']
97
98     # check identifier and sequence number
99     if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
100         raise RuntimeError(
101             'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \
102             'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
103
104     # verify checksum
105     cksum = icmpv6.cksum
106     del icmpv6.cksum
107     tmp = ICMPv6EchoRequest(str(icmpv6))
108     if not checksum_equal(tmp.cksum, cksum):
109         raise RuntimeError(
110             'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
111
112     # send ICMPv6 echo reply from second TG interface
113     pkt_send = (Ether(src=dst_mac, dst=dst_nh_mac) /
114                       IPv6(src=dst_ip, dst=src_ip) /
115                       ICMPv6EchoReply(id=echo_id, seq=echo_seq))
116     dst_sent_packets.append(pkt_send)
117     dst_txq.send(pkt_send)
118
119     # receive ICMPv6 echo reply on first TG interface
120     ether = src_rxq.recv(2, src_sent_packets)
121     if ether is None:
122         raise RuntimeError('ICMPv6 echo reply Rx timeout')
123
124     if not ether.haslayer(IPv6):
125         raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
126             ether.__repr__()))
127
128     ipv6 = ether['IPv6']
129
130     # verify hop limit processing
131     if ipv6.hlim != (hop_limit - hop_num):
132         raise RuntimeError(
133             'Invalid hop limit {0} should be {1}'.format(ipv6.hlim,
134                                                          hop_limit - hop_num))
135
136     if not ipv6.haslayer(ICMPv6EchoReply):
137         raise RuntimeError(
138             'Unexpected packet with no IPv6 ICMP received {0}'.format(
139                 ipv6.__repr__()))
140
141     icmpv6 = ipv6['ICMPv6 Echo Reply']
142
143     # check identifier and sequence number
144     if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
145         raise RuntimeError(
146             'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \
147             'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
148
149     # verify checksum
150     cksum = icmpv6.cksum
151     del icmpv6.cksum
152     tmp = ICMPv6EchoReply(str(icmpv6))
153     if not checksum_equal(tmp.cksum, cksum):
154         raise RuntimeError(
155             'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
156
157     sys.exit(0)
158
159 if __name__ == "__main__":
160     main()