Use only Qemu Kill on the teardown of double qemu setup
[csit.git] / resources / traffic_scripts / send_rs_check_ra.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Router solicitation check script."""
16
17 import sys
18 import ipaddress
19
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet6 import IPv6, ICMPv6ND_RS
22
23 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
24 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
25
26
27 def mac_to_ipv6_linklocal(mac):
28     """Transfer MAC address into specific link-local IPv6 address.
29
30     :param mac: MAC address to be transferred.
31     :type mac: str
32     :return: IPv6 link-local address.
33     :rtype: str
34     """
35     # Remove the most common delimiters: dots, dashes, etc.
36     mac_value = int(mac.translate(None, ' .:-'), 16)
37
38     # Split out the bytes that slot into the IPv6 address
39     # XOR the most significant byte with 0x02, inverting the
40     # Universal / Local bit
41     high2 = mac_value >> 32 & 0xffff ^ 0x0200
42     high1 = mac_value >> 24 & 0xff
43     low1 = mac_value >> 16 & 0xff
44     low2 = mac_value & 0xffff
45
46     return 'fe80::{:04x}:{:02x}ff:fe{:02x}:{:04x}'.format(
47         high2, high1, low1, low2)
48
49
50 def main():
51     """Send Router Solicitation packet, check if the received response\
52      is a Router Advertisement packet and verify."""
53
54     args = TrafficScriptArg(
55         ['src_mac', 'dst_mac', 'src_ip']
56     )
57
58     router_mac = args.get_arg('dst_mac')
59     src_mac = args.get_arg('src_mac')
60     src_ip = args.get_arg('src_ip')
61     if not src_ip:
62         src_ip = mac_to_ipv6_linklocal(src_mac)
63     tx_if = args.get_arg('tx_if')
64
65     txq = TxQueue(tx_if)
66     rxq = RxQueue(tx_if)
67
68     pkt_raw = (Ether(src=src_mac, dst='33:33:00:00:00:02') /
69                IPv6(src=src_ip, dst='ff02::2') /
70                ICMPv6ND_RS())
71
72     sent_packets = [pkt_raw]
73     txq.send(pkt_raw)
74
75     ether = rxq.recv(8, ignore=sent_packets)
76
77     # Check whether received packet contains layer RA and check other values
78     if ether is None:
79         raise RuntimeError('ICMP echo Rx timeout')
80
81     if ether.src != router_mac:
82         raise RuntimeError(
83             'Packet source MAC ({0}) does not match '
84             'router MAC ({1}).'.format(ether.src, router_mac))
85     if ether.dst != src_mac:
86         raise RuntimeError(
87             'Packet destination MAC ({0}) does not match '
88             'RS source MAC ({1}).'.format(ether.dst, src_mac))
89
90     if not ether.haslayer('ICMPv6ND_RA'):
91         raise RuntimeError('Not an RA packet received {0}'
92                            .format(ether.__repr__()))
93
94     src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src))
95     dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst))
96     router_link_local = ipaddress.IPv6Address(unicode(
97         mac_to_ipv6_linklocal(router_mac)))
98     rs_src_address = ipaddress.IPv6Address(unicode(src_ip))
99
100     if src_address != router_link_local:
101         raise RuntimeError(
102             'Packet source address ({0}) does not match '
103             'link local address({1})'.format(src_address, router_link_local))
104
105     if dst_address != rs_src_address:
106         raise RuntimeError(
107             'Packet destination address ({0}) does not match '
108             'RS source address ({1}).'.format(dst_address, rs_src_address))
109
110     if ether['IPv6'].hlim != 255:
111         raise RuntimeError('Hop limit not correct: {0}!=255'.format(
112             ether['IPv6'].hlim))
113
114     ra_code = ether['ICMPv6 Neighbor Discovery - Router Advertisement'].code
115     if ra_code != 0:
116         raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code))
117
118     sys.exit(0)
119
120 if __name__ == "__main__":
121     main()