1 # Copyright (c) 2017 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
15 import multiprocessing
20 class Counter(object):
21 """Counter used for stats collection."""
22 def __init__(self, start=0):
24 self.lock = multiprocessing.Lock()
27 def increment(self, value=1):
28 """Increment counter and return the new value."""
39 """Timer used during test execution."""
40 def __init__(self, verbose=False):
41 self.verbose = verbose
44 """Start the timer."""
48 def __exit__(self, *args):
49 """Stop the timer and save current value."""
51 self.secs = self.end - self.start
52 self.msecs = self.secs * 1000 # millisecs
54 print("elapsed time: {0} ms".format(self.msecs))
57 class ConfigBlaster(object):
58 """Generates Netconf requests, receives replies and collects statistics."""
62 # Hello message with capabilities list for Netconf sessions.
63 hello = u"""<hello xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"
66 <capability>urn:ietf:params:netconf:base:1.0</capability>
71 # RPC to retrieve VPP version (minimal processing in VPP)
72 request_template = u"""<rpc xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"
75 <filter xmlns:ns0="urn:ietf:params:xml:ns:netconf:base:1.0"
77 <vpp-state xmlns="http://fd.io/hc2vpp/yang/vpp-management">
86 """Stores and further processes statistics collected by worker
87 threads during their execution.
92 self.ok_rqst_rate = Counter(0)
93 self.total_rqst_rate = Counter(0)
94 self.ok_rqsts = Counter(0)
95 self.total_rqsts = Counter(0)
def process_stats(self, rqst_stats, elapsed_time):
    """Calculate request/reply throughput and aggregate it across threads.

    The request counts and the computed rates are added to the
    ``ok_rqsts``, ``total_rqsts``, ``ok_rqst_rate`` and ``total_rqst_rate``
    counters of this object.

    :param rqst_stats: Request statistics dictionary. It must contain the
        key "OK"; all of its values are summed to get the total.
    :param elapsed_time: Elapsed time for the test, in seconds.
    :type rqst_stats: dict
    :type elapsed_time: int or float
    :returns: Rates (requests/sec) for successfully finished requests
        and for the total number of requests.
    :rtype: tuple
    :raises KeyError: If rqst_stats has no "OK" entry.
    :raises ZeroDivisionError: If elapsed_time is zero.
    """
    ok_rqsts = rqst_stats["OK"]
    total_rqsts = sum(rqst_stats.values())

    # Force true division: with an integer elapsed_time, Python 2 would
    # otherwise truncate both rates to whole numbers.
    seconds = float(elapsed_time)
    ok_rqst_rate = ok_rqsts / seconds
    total_rqst_rate = total_rqsts / seconds

    self.ok_rqsts.increment(ok_rqsts)
    self.total_rqsts.increment(total_rqsts)

    self.ok_rqst_rate.increment(ok_rqst_rate)
    self.total_rqst_rate.increment(total_rqst_rate)

    return ok_rqst_rate, total_rqst_rate
def get_ok_rqst_rate(self):
    """Return the value currently held by the ``ok_rqst_rate`` counter."""
    counter = self.ok_rqst_rate
    return counter.value
def get_total_rqst_rate(self):
    """Return the value currently held by the ``total_rqst_rate`` counter."""
    counter = self.total_rqst_rate
    return counter.value
def get_ok_rqsts(self):
    """Return the value currently held by the ``ok_rqsts`` counter."""
    counter = self.ok_rqsts
    return counter.value
def get_total_rqsts(self):
    """Return the value currently held by the ``total_rqsts`` counter."""
    counter = self.total_rqsts
    return counter.value
139 def __init__(self, host, port, ncycles, nthreads, nrequests):
142 :param host: Target IP address.
143 :param port: Target port.
144 :param ncycles: Number of test cycles.
145 :param nthreads: Number of threads for packet generation.
146 :param nrequests: Number of requests to send per thread.
156 self.ncycles = ncycles
157 self.nthreads = nthreads
158 self.nrequests = nrequests
160 self.stats = self.Stats()
161 self.total_ok_rqsts = 0
163 self.print_lock = multiprocessing.Lock()
164 self.cond = multiprocessing.Condition()
165 self.threads_done = 0
169 def send_request(self, sock):
170 """Send Netconf request and receive the reply.
172 :param sock: Socket object to use for transfer.
173 :type sock: socket object
174 :returns: Response to request or error message.
178 sock.send(self.request_template)
180 return sock.recv(self.recv_buf)
181 except socket.timeout:
186 def send_requests(self, tid, stats):
187 """Read entries from the Honeycomb operational data store. This function
188 is executed by a worker thread.
190 :param tid: Thread ID - used to id the Blaster thread when
191 statistics for the thread are printed out.
192 :param stats: Synchronized queue object for returning execution stats.
194 :type stats: multiprocessing.Queue
197 rqst_stats = {"OK": 0, "Error": 0, "Timeout": 0}
199 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
201 # Initiate connection
202 sock.connect((self.host, self.port))
204 sock.send(self.hello)
205 # Receive hello message
206 sock.recv(self.recv_buf)
207 # Determine length of expected responses
208 self.recv_buf = len(self.send_request(sock))
210 with self.print_lock:
211 print("\n Thread {0}:\n"
212 " Sending {1} requests".format(tid,
215 replies = [None]*self.nrequests
217 for x in range(self.nrequests):
218 sts = self.send_request(sock)
221 for reply in replies:
222 if reply == "timeout":
223 rqst_stats["Timeout"] += 1
224 elif "error" in reply:
225 rqst_stats["Error"] += 1
227 rqst_stats["OK"] += 1
229 ok_rps, total_rps = self.stats.process_stats(
232 with self.print_lock:
233 print("\n Thread {0} results (READ): ".format(tid))
234 print(" Elapsed time: {0:.2f}s,".format(t.secs))
235 print(" Requests/s: {0:.2f} OK, {1:.2f} Total".format(
237 print(" Stats ({Requests}, {entries}): "),
239 self.threads_done += 1
243 stats.put({"stats": rqst_stats, "time": t.secs})
246 self.cond.notify_all()
248 def run_cycle(self, function):
249 """Runs a test cycle. Each test consists of <cycles> test cycles, where
250 <threads> worker threads are started in each test cycle. Each thread
251 reads <requests> entries using Netconf RPCs.
253 :param function: Function to be executed in each thread.
254 :type function: function
257 self.total_ok_rqsts = 0
258 stats_queue = multiprocessing.Queue()
260 for c in range(self.ncycles):
261 self.stats = self.Stats()
262 with self.print_lock:
263 print "\nCycle {0}:".format(c)
267 for i in range(self.nthreads):
268 t = multiprocessing.Process(target=function,
269 args=(i, stats_queue))
273 # Wait for all threads to finish and measure the execution time
276 thread_stats.append(stats_queue.get())
277 for thread in threads:
280 for item in thread_stats:
281 self.stats.process_stats(item["stats"], item["time"])
283 with self.print_lock:
284 print("\n*** Test summary:")
285 print(" Elapsed time: {0:.2f}s".format(t.secs))
287 " Peak requests/s: {0:.2f} OK, {1:.2f} Total".format(
288 self.stats.get_ok_rqst_rate,
289 self.stats.get_total_rqst_rate))
291 " Avg. requests/s: {0:.2f} OK, {1:.2f} Total ({2:.2f} "
292 "of peak total)".format(
293 self.stats.get_ok_rqsts / t.secs,
294 self.stats.get_total_rqsts / t.secs,
295 (self.stats.get_total_rqsts / t.secs * 100) /
296 self.stats.get_total_rqst_rate))
298 self.total_ok_rqsts += self.stats.get_ok_rqsts
300 self.threads_done = 0
def add_blaster(self):
    """Run the test cycles with ``send_requests`` as the worker function."""
    worker = self.send_requests
    self.run_cycle(worker)
def get_ok_rqsts(self):
    """Return the running total of OK requests kept in ``total_ok_rqsts``."""
    total = self.total_ok_rqsts
    return total
311 def create_arguments_parser():
312 """Creates argument parser for test script.
313 Shorthand to arg parser on library level in order to access and
314 eventually enhance in ancestors.
316 :returns: argument parser supporting arguments and parameters
317 :rtype: argparse.ArgumentParser
319 my_parser = argparse.ArgumentParser(
320 description="entry reading performance test: Reads entries from "
321 "the config tree, as specified by optional parameters.")
323 my_parser.add_argument(
324 "--host", default="127.0.0.1",
325 help="Host where odl controller is running (default is 127.0.0.1).")
326 my_parser.add_argument(
327 "--port", default=7777,
328 help="Port on which Honeycomb's Netconf is listening"
329 " (default is 7777 for TCP)")
330 my_parser.add_argument(
331 "--cycles", type=int, default=1,
332 help="Number of entry read cycles; default 1. <THREADS> worker threads "
333 "are started in each cycle and the cycle ends when all threads "
334 "finish. Another cycle is started when the previous cycle "
336 my_parser.add_argument(
337 "--threads", type=int, default=1,
338 help="Number of request worker threads to start in each cycle; "
339 "default=1. Each thread will read <entries> entries.")
340 my_parser.add_argument(
341 "--requests", type=int, default=10,
342 help="Number of requests that will be made by each worker thread "
343 "in each cycle; default 10")
347 if __name__ == "__main__":
349 parser = create_arguments_parser()
350 in_args = parser.parse_args()
352 fct = ConfigBlaster(in_args.host, in_args.port, in_args.cycles,
353 in_args.threads, in_args.requests)
355 # Run through <cycles>, where <threads> are started in each cycle and
356 # <requests> requests are sent from each thread
359 print " Successful reads: {0}\n".format(fct.get_ok_rqsts)