CSIT-811 HC Test: BGP functional tests
[csit.git] / resources / traffic_scripts / honeycomb / bgp_open.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that listens for incoming BGP connections and verifies
16 received GBP Open message."""
17
18 import sys
19 import socket
20
21 from scapy.main import load_contrib
22 from scapy.layers.inet import Raw
23 from scapy.contrib.bgp import BGPHeader, BGPOpen
24
25 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
26
27
28 def main():
29     """Open a TCP listener socket on the default BGP port. Accept an incoming
30     connection, receive data and verify if data is a valid BGP Open message."""
31     args = TrafficScriptArg(['rx_ip', 'src_ip', 'rx_port', 'as_number',
32                              'holdtime'])
33
34     rx_ip = args.get_arg('rx_ip')
35     src_ip = args.get_arg('src_ip')
36     rx_port = int(args.get_arg('rx_port'))
37     as_number = int(args.get_arg('as_number'))
38     holdtime = int(args.get_arg('holdtime'))
39
40     load_contrib("bgp")
41
42     soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
43     soc.bind((rx_ip, rx_port))
44     soc.listen(1)
45
46     print "Listener active, awaiting connection..."
47     soc.settimeout(8)
48     conn, addr = soc.accept()
49     print 'Connection established with peer:', addr
50
51     data = conn.recv(256)
52     conn.close()
53     soc.close()
54
55     bgp_layer = (BGPHeader(Raw(data).load))
56     bgp_layer.show()
57
58     if not bgp_layer.haslayer(BGPOpen):
59         raise RuntimeError("Received data is not a BGP OPEN message.")
60     bgp_open = bgp_layer.getlayer(BGPOpen)
61     if bgp_open.bgp_id != src_ip:
62         raise RuntimeError(
63             "BGP ID mismatch. Received {0} but should be {1}".
64             format(bgp_open.bgp_id, src_ip))
65     else:
66         print "BGP ID matched."
67
68     if bgp_open.AS != as_number:
69         raise RuntimeError(
70             "BGP AS number mismatch. Received {0} but should be {1}".
71             format(bgp_open.AS, as_number))
72     else:
73         print "BGP AS number matched."
74
75     if bgp_open.hold_time != holdtime:
76         raise RuntimeError(
77             "Hold Time parameter mismatch. Received {0} but should be {1}.".
78             format(bgp_layer.getlayer(BGPOpen).holdtime, holdtime))
79     else:
80         print "BGP Hold Time parameter matched."
81
82     sys.exit(0)
83
84 if __name__ == "__main__":
85     main()