Update of VPP_STABLE_VER
[csit.git] / resources / traffic_scripts / honeycomb / read_vpp_version.py
1 # Copyright (c) 2017 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 import socket
15 import multiprocessing
16 import argparse
17 from time import time
18
19
20 class Counter(object):
21     """Counter used for stats collection."""
22     def __init__(self, start=0):
23         """Initializer."""
24         self.lock = multiprocessing.Lock()
25         self.value = start
26
27     def increment(self, value=1):
28         """Increment counter and return the new value."""
29         self.lock.acquire()
30         val = self.value
31         try:
32             self.value += value
33         finally:
34             self.lock.release()
35         return val
36
37
38 class timer(object):
39     """Timer used used during test execution."""
40     def __init__(self, verbose=False):
41         self.verbose = verbose
42
43     def __enter__(self):
44         """Start the timer."""
45         self.start = time()
46         return self
47
48     def __exit__(self, *args):
49         """Stop the timer and save current value."""
50         self.end = time()
51         self.secs = self.end - self.start
52         self.msecs = self.secs * 1000  # millisecs
53         if self.verbose:
54             print("elapsed time: {0} ms".format(self.msecs))
55
56
57 class ConfigBlaster(object):
58     """Generates Netconf requests, receives replies and collects statistics."""
59
60     TIMEOUT = 10
61
62     # Hello message with capabilities list for Netconf sessions.
63     hello = u"""<hello xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"
64     message-id="m-0">
65     <capabilities>
66     <capability>urn:ietf:params:netconf:base:1.0</capability>
67     </capabilities>
68     </hello>
69     ]]>]]>"""
70
71     # RPC to retrieve VPP version (minimal processing in VPP)
72     request_template = u"""<rpc xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"
73     message-id="m-1">
74     <get>
75     <filter xmlns:ns0="urn:ietf:params:xml:ns:netconf:base:1.0"
76     ns0:type="subtree">
77     <vpp-state xmlns="urn:opendaylight:params:xml:ns:yang:vpp:management">
78     <version/>
79     </vpp-state>
80     </filter>
81     </get>
82     </rpc>
83     ]]>]]>"""
84
85     class Stats(object):
86         """Stores and further processes statistics collected by worker
87         threads during their execution.
88         """
89
90         def __init__(self):
91             """Initializer."""
92             self.ok_rqst_rate = Counter(0)
93             self.total_rqst_rate = Counter(0)
94             self.ok_rqsts = Counter(0)
95             self.total_rqsts = Counter(0)
96
97         def process_stats(self, rqst_stats, elapsed_time):
98             """Calculates the stats for request/reply throughput, and aggregates
99             statistics across all threads.
100
101             :param rqst_stats: Request statistics dictionary.
102             :param elapsed_time: Elapsed time for the test.
103             :type rqst_stats: dict
104             :type elapsed_time: int
105             :returns: Rates (requests/sec) for successfully finished requests
106                      and the total number of requests.
107             :rtype: tuple
108             """
109             ok_rqsts = rqst_stats["OK"]
110             total_rqsts = sum(rqst_stats.values())
111
112             ok_rqst_rate = ok_rqsts / elapsed_time
113             total_rqst_rate = total_rqsts / elapsed_time
114
115             self.ok_rqsts.increment(ok_rqsts)
116             self.total_rqsts.increment(total_rqsts)
117
118             self.ok_rqst_rate.increment(ok_rqst_rate)
119             self.total_rqst_rate.increment(total_rqst_rate)
120
121             return ok_rqst_rate, total_rqst_rate
122
123         @property
124         def get_ok_rqst_rate(self):
125             return self.ok_rqst_rate.value
126
127         @property
128         def get_total_rqst_rate(self):
129             return self.total_rqst_rate.value
130
131         @property
132         def get_ok_rqsts(self):
133             return self.ok_rqsts.value
134
135         @property
136         def get_total_rqsts(self):
137             return self.total_rqsts.value
138
139     def __init__(self, host, port, ncycles, nthreads, nrequests):
140         """Initializer.
141
142         :param host: Target IP address.
143         :param port: Target port.
144         :param ncycles: Number of test cycles.
145         :param nthreads: Number of threads for packet generation.
146         :param nrequests: Number of requests to send per thread.
147         :type host: str
148         :type port: int
149         :type ncycles: int
150         :type nthreads: int
151         :type nrequests: int
152         """
153
154         self.host = host
155         self.port = port
156         self.ncycles = ncycles
157         self.nthreads = nthreads
158         self.nrequests = nrequests
159
160         self.stats = self.Stats()
161         self.total_ok_rqsts = 0
162
163         self.print_lock = multiprocessing.Lock()
164         self.cond = multiprocessing.Condition()
165         self.threads_done = 0
166
167         self.recv_buf = 8192
168
169     def send_request(self, sock):
170         """Send Netconf request and receive the reply.
171
172         :param sock: Socket object to use for transfer.
173         :type sock: socket object
174         :returns: Response to request or error message.
175         :rtype: str
176         """
177
178         sock.send(self.request_template)
179         try:
180             return sock.recv(self.recv_buf)
181         except socket.timeout:
182             return "timeout"
183         except socket.error:
184             return "error"
185
186     def send_requests(self, tid, stats):
187         """Read entries from the Honeycomb operational data store. This function
188         is executed by a worker thread.
189
190         :param tid: Thread ID - used to id the Blaster thread when
191         statistics for the thread are printed out.
192         :param stats: Synchronized queue object for returning execution stats.
193         :type tid: int
194         :type stats: multiprocessing.Queue
195         """
196
197         rqst_stats = {"OK": 0, "Error": 0, "Timeout": 0}
198
199         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
200         sock.settimeout(5)
201         # Initiate connection
202         sock.connect((self.host, self.port))
203         # Send hello message
204         sock.send(self.hello)
205         # Receive hello message
206         sock.recv(self.recv_buf)
207         # Determine length of expected responses
208         self.recv_buf = len(self.send_request(sock))
209
210         with self.print_lock:
211             print("\n    Thread {0}:\n"
212                   "        Sending {1} requests".format(tid,
213                                                         self.nrequests))
214
215         replies = [None]*self.nrequests
216         with timer() as t:
217             for x in range(self.nrequests):
218                 sts = self.send_request(sock)
219                 replies[x] = sts
220
221         for reply in replies:
222             if reply == "timeout":
223                 rqst_stats["Timeout"] += 1
224             elif "error" in reply:
225                 rqst_stats["Error"] += 1
226             else:
227                 rqst_stats["OK"] += 1
228
229         ok_rps, total_rps = self.stats.process_stats(
230             rqst_stats, t.secs)
231
232         with self.print_lock:
233             print("\n    Thread {0} results (READ): ".format(tid))
234             print("        Elapsed time: {0:.2f}s,".format(t.secs))
235             print("        Requests/s: {0:.2f} OK, {1:.2f} Total".format(
236                 ok_rps, total_rps))
237             print("        Stats ({Requests}, {entries}): "),
238             print(rqst_stats)
239             self.threads_done += 1
240
241         sock.close()
242
243         stats.put({"stats": rqst_stats, "time": t.secs})
244
245         with self.cond:
246             self.cond.notify_all()
247
248     def run_cycle(self, function):
249         """Runs a test cycle. Each test consists of <cycles> test cycles, where
250         <threads> worker threads are started in each test cycle. Each thread
251         reads <requests> entries using Netconf RPCs.
252
253         :param function: Function to be executed in each thread.
254         :type function: function
255         """
256
257         self.total_ok_rqsts = 0
258         stats_queue = multiprocessing.Queue()
259
260         for c in range(self.ncycles):
261             self.stats = self.Stats()
262             with self.print_lock:
263                 print "\nCycle {0}:".format(c)
264
265             threads = []
266             thread_stats = []
267             for i in range(self.nthreads):
268                 t = multiprocessing.Process(target=function,
269                                             args=(i, stats_queue))
270                 threads.append(t)
271                 t.start()
272
273             # Wait for all threads to finish and measure the execution time
274             with timer() as t:
275                 for _ in threads:
276                     thread_stats.append(stats_queue.get())
277                 for thread in threads:
278                     thread.join()
279
280             for item in thread_stats:
281                 self.stats.process_stats(item["stats"], item["time"])
282
283             with self.print_lock:
284                 print("\n*** Test summary:")
285                 print("    Elapsed time:    {0:.2f}s".format(t.secs))
286                 print(
287                     "    Peak requests/s: {0:.2f} OK, {1:.2f} Total".format(
288                         self.stats.get_ok_rqst_rate,
289                         self.stats.get_total_rqst_rate))
290                 print(
291                     "    Avg. requests/s: {0:.2f} OK, {1:.2f} Total ({2:.2f} "
292                     "of peak total)".format(
293                         self.stats.get_ok_rqsts / t.secs,
294                         self.stats.get_total_rqsts / t.secs,
295                         (self.stats.get_total_rqsts / t.secs * 100) /
296                         self.stats.get_total_rqst_rate))
297
298             self.total_ok_rqsts += self.stats.get_ok_rqsts
299
300             self.threads_done = 0
301
302     def add_blaster(self):
303         """Run the test."""
304         self.run_cycle(self.send_requests)
305
306     @property
307     def get_ok_rqsts(self):
308         return self.total_ok_rqsts
309
310
311 def create_arguments_parser():
312     """Creates argument parser for test script.
313     Shorthand to arg parser on library level in order to access and
314     eventually enhance in ancestors.
315
316     :returns: argument parser supporting arguments and parameters
317     :rtype: argparse.ArgumentParser
318     """
319     my_parser = argparse.ArgumentParser(
320         description="entry reading performance test: Reads entries from "
321                     "the config tree, as specified by optional parameters.")
322
323     my_parser.add_argument(
324         "--host", default="127.0.0.1",
325         help="Host where odl controller is running (default is 127.0.0.1).")
326     my_parser.add_argument(
327         "--port", default=7777,
328         help="Port on which Honeycomb's Netconf is listening"
329              " (default is 7777 for TCP)")
330     my_parser.add_argument(
331         "--cycles", type=int, default=1,
332         help="Number of entry read cycles; default 1. <THREADS> worker threads "
333              "are started in each cycle and the cycle ends when all threads "
334              "finish. Another cycle is started when the previous cycle "
335              "is finished.")
336     my_parser.add_argument(
337         "--threads", type=int, default=1,
338         help="Number of request worker threads to start in each cycle; "
339              "default=1. Each thread will read <entries> entries.")
340     my_parser.add_argument(
341         "--requests", type=int, default=10,
342         help="Number of requests that will be made by each worker thread "
343              "in each cycle; default 10")
344
345     return my_parser
346
347 if __name__ == "__main__":
348
349     parser = create_arguments_parser()
350     in_args = parser.parse_args()
351
352     fct = ConfigBlaster(in_args.host, in_args.port, in_args.cycles,
353                         in_args.threads, in_args.requests)
354
355     # Run through <cycles>, where <threads> are started in each cycle and
356     # <entries> are added from each thread
357     fct.add_blaster()
358
359     print "    Successful reads:  {0}\n".format(fct.get_ok_rqsts)