X-Git-Url: https://gerrit.fd.io/r/gitweb?p=csit.git;a=blobdiff_plain;f=resources%2Ftraffic_scripts%2Fhoneycomb%2Fread_vpp_version.py;fp=resources%2Ftraffic_scripts%2Fhoneycomb%2Fread_vpp_version.py;h=0000000000000000000000000000000000000000;hp=6a275e7c0fba5c819b95629f305345a440c5bb92;hb=e7ad66f3147662973039caaac33015de7e0c6f8c;hpb=20cc67d5f23a7f4e05b08012bf3d3a63be4bcf63 diff --git a/resources/traffic_scripts/honeycomb/read_vpp_version.py b/resources/traffic_scripts/honeycomb/read_vpp_version.py deleted file mode 100755 index 6a275e7c0f..0000000000 --- a/resources/traffic_scripts/honeycomb/read_vpp_version.py +++ /dev/null @@ -1,359 +0,0 @@ -# Copyright (c) 2017 Cisco and/or its affiliates. -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import socket -import multiprocessing -import argparse -from time import time - - -class Counter(object): - """Counter used for stats collection.""" - def __init__(self, start=0): - """Initializer.""" - self.lock = multiprocessing.Lock() - self.value = start - - def increment(self, value=1): - """Increment counter and return the new value.""" - self.lock.acquire() - val = self.value - try: - self.value += value - finally: - self.lock.release() - return val - - -class timer(object): - """Timer used used during test execution.""" - def __init__(self, verbose=False): - self.verbose = verbose - - def __enter__(self): - """Start the timer.""" - self.start = time() - return self - - def __exit__(self, *args): - """Stop the timer and save current value.""" - self.end = time() - self.secs = self.end - self.start - self.msecs = self.secs * 1000 # millisecs - if self.verbose: - print("elapsed time: {0} ms".format(self.msecs)) - - -class ConfigBlaster(object): - """Generates Netconf requests, receives replies and collects statistics.""" - - TIMEOUT = 10 - - # Hello message with capabilities list for Netconf sessions. - hello = u""" - - urn:ietf:params:netconf:base:1.0 - - - ]]>]]>""" - - # RPC to retrieve VPP version (minimal processing in VPP) - request_template = u""" - - - - - - - - - ]]>]]>""" - - class Stats(object): - """Stores and further processes statistics collected by worker - threads during their execution. - """ - - def __init__(self): - """Initializer.""" - self.ok_rqst_rate = Counter(0) - self.total_rqst_rate = Counter(0) - self.ok_rqsts = Counter(0) - self.total_rqsts = Counter(0) - - def process_stats(self, rqst_stats, elapsed_time): - """Calculates the stats for request/reply throughput, and aggregates - statistics across all threads. - - :param rqst_stats: Request statistics dictionary. - :param elapsed_time: Elapsed time for the test. - :type rqst_stats: dict - :type elapsed_time: int - :returns: Rates (requests/sec) for successfully finished requests - and the total number of requests. - :rtype: tuple - """ - ok_rqsts = rqst_stats["OK"] - total_rqsts = sum(rqst_stats.values()) - - ok_rqst_rate = ok_rqsts / elapsed_time - total_rqst_rate = total_rqsts / elapsed_time - - self.ok_rqsts.increment(ok_rqsts) - self.total_rqsts.increment(total_rqsts) - - self.ok_rqst_rate.increment(ok_rqst_rate) - self.total_rqst_rate.increment(total_rqst_rate) - - return ok_rqst_rate, total_rqst_rate - - @property - def get_ok_rqst_rate(self): - return self.ok_rqst_rate.value - - @property - def get_total_rqst_rate(self): - return self.total_rqst_rate.value - - @property - def get_ok_rqsts(self): - return self.ok_rqsts.value - - @property - def get_total_rqsts(self): - return self.total_rqsts.value - - def __init__(self, host, port, ncycles, nthreads, nrequests): - """Initializer. - - :param host: Target IP address. - :param port: Target port. - :param ncycles: Number of test cycles. - :param nthreads: Number of threads for packet generation. - :param nrequests: Number of requests to send per thread. - :type host: str - :type port: int - :type ncycles: int - :type nthreads: int - :type nrequests: int - """ - - self.host = host - self.port = port - self.ncycles = ncycles - self.nthreads = nthreads - self.nrequests = nrequests - - self.stats = self.Stats() - self.total_ok_rqsts = 0 - - self.print_lock = multiprocessing.Lock() - self.cond = multiprocessing.Condition() - self.threads_done = 0 - - self.recv_buf = 8192 - - def send_request(self, sock): - """Send Netconf request and receive the reply. - - :param sock: Socket object to use for transfer. - :type sock: socket object - :returns: Response to request or error message. - :rtype: str - """ - - sock.send(self.request_template) - try: - return sock.recv(self.recv_buf) - except socket.timeout: - return "timeout" - except socket.error: - return "error" - - def send_requests(self, tid, stats): - """Read entries from the Honeycomb operational data store. This function - is executed by a worker thread. - - :param tid: Thread ID - used to id the Blaster thread when - statistics for the thread are printed out. - :param stats: Synchronized queue object for returning execution stats. - :type tid: int - :type stats: multiprocessing.Queue - """ - - rqst_stats = {"OK": 0, "Error": 0, "Timeout": 0} - - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(5) - # Initiate connection - sock.connect((self.host, self.port)) - # Send hello message - sock.send(self.hello) - # Receive hello message - sock.recv(self.recv_buf) - # Determine length of expected responses - self.recv_buf = len(self.send_request(sock)) - - with self.print_lock: - print("\n Thread {0}:\n" - " Sending {1} requests".format(tid, - self.nrequests)) - - replies = [None]*self.nrequests - with timer() as t: - for x in range(self.nrequests): - sts = self.send_request(sock) - replies[x] = sts - - for reply in replies: - if reply == "timeout": - rqst_stats["Timeout"] += 1 - elif "error" in reply: - rqst_stats["Error"] += 1 - else: - rqst_stats["OK"] += 1 - - ok_rps, total_rps = self.stats.process_stats( - rqst_stats, t.secs) - - with self.print_lock: - print("\n Thread {0} results (READ): ".format(tid)) - print(" Elapsed time: {0:.2f}s,".format(t.secs)) - print(" Requests/s: {0:.2f} OK, {1:.2f} Total".format( - ok_rps, total_rps)) - print(" Stats ({Requests}, {entries}): "), - print(rqst_stats) - self.threads_done += 1 - - sock.close() - - stats.put({"stats": rqst_stats, "time": t.secs}) - - with self.cond: - self.cond.notify_all() - - def run_cycle(self, function): - """Runs a test cycle. Each test consists of test cycles, where - worker threads are started in each test cycle. Each thread - reads entries using Netconf RPCs. - - :param function: Function to be executed in each thread. - :type function: function - """ - - self.total_ok_rqsts = 0 - stats_queue = multiprocessing.Queue() - - for c in range(self.ncycles): - self.stats = self.Stats() - with self.print_lock: - print "\nCycle {0}:".format(c) - - threads = [] - thread_stats = [] - for i in range(self.nthreads): - t = multiprocessing.Process(target=function, - args=(i, stats_queue)) - threads.append(t) - t.start() - - # Wait for all threads to finish and measure the execution time - with timer() as t: - for _ in threads: - thread_stats.append(stats_queue.get()) - for thread in threads: - thread.join() - - for item in thread_stats: - self.stats.process_stats(item["stats"], item["time"]) - - with self.print_lock: - print("\n*** Test summary:") - print(" Elapsed time: {0:.2f}s".format(t.secs)) - print( - " Peak requests/s: {0:.2f} OK, {1:.2f} Total".format( - self.stats.get_ok_rqst_rate, - self.stats.get_total_rqst_rate)) - print( - " Avg. requests/s: {0:.2f} OK, {1:.2f} Total ({2:.2f} " - "of peak total)".format( - self.stats.get_ok_rqsts / t.secs, - self.stats.get_total_rqsts / t.secs, - (self.stats.get_total_rqsts / t.secs * 100) / - self.stats.get_total_rqst_rate)) - - self.total_ok_rqsts += self.stats.get_ok_rqsts - - self.threads_done = 0 - - def add_blaster(self): - """Run the test.""" - self.run_cycle(self.send_requests) - - @property - def get_ok_rqsts(self): - return self.total_ok_rqsts - - -def create_arguments_parser(): - """Creates argument parser for test script. - Shorthand to arg parser on library level in order to access and - eventually enhance in ancestors. - - :returns: argument parser supporting arguments and parameters - :rtype: argparse.ArgumentParser - """ - my_parser = argparse.ArgumentParser( - description="entry reading performance test: Reads entries from " - "the config tree, as specified by optional parameters.") - - my_parser.add_argument( - "--host", default="127.0.0.1", - help="Host where odl controller is running (default is 127.0.0.1).") - my_parser.add_argument( - "--port", default=7777, - help="Port on which Honeycomb's Netconf is listening" - " (default is 7777 for TCP)") - my_parser.add_argument( - "--cycles", type=int, default=1, - help="Number of entry read cycles; default 1. worker threads " - "are started in each cycle and the cycle ends when all threads " - "finish. Another cycle is started when the previous cycle " - "is finished.") - my_parser.add_argument( - "--threads", type=int, default=1, - help="Number of request worker threads to start in each cycle; " - "default=1. Each thread will read entries.") - my_parser.add_argument( - "--requests", type=int, default=10, - help="Number of requests that will be made by each worker thread " - "in each cycle; default 10") - - return my_parser - -if __name__ == "__main__": - - parser = create_arguments_parser() - in_args = parser.parse_args() - - fct = ConfigBlaster(in_args.host, in_args.port, in_args.cycles, - in_args.threads, in_args.requests) - - # Run through , where are started in each cycle and - # are added from each thread - fct.add_blaster() - - print " Successful reads: {0}\n".format(fct.get_ok_rqsts)