CSIT Data driver 10/32410/13
author:    pmikus <pmikus@cisco.com>
           Fri, 21 May 2021 08:11:19 +0000 (08:11 +0000)
committer: Peter Mikus <pmikus@cisco.com>
           Tue, 25 May 2021 09:26:49 +0000 (09:26 +0000)
+ S3 storage driver implementation

Signed-off-by: pmikus <pmikus@cisco.com>
Change-Id: Ifafec2afbbeb4cd4724c17208ad7ec40b6cd7c96

resources/tools/presentation/requirements.txt
resources/tools/presentation/run_cpta.sh
resources/tools/presentation/run_report.sh
resources/tools/storage/__init__.py [new file with mode: 0644]
resources/tools/storage/__main__.py [new file with mode: 0644]
resources/tools/storage/storage.py [new file with mode: 0755]

index b96c4ab..f5fac53 100644 (file)
@@ -9,3 +9,5 @@ plotly==4.1.1
 PTable==0.9.2
 hdrhistogram==0.6.1
 urllib3==1.25.6
+boto3==1.17.78
+botocore==1.20.78
\ No newline at end of file
index 18b0ec7..8d3dd26 100755 (executable)
@@ -14,8 +14,12 @@ mkdir ${DIR[WORKING]}
 virtualenv -p $(which python3) ${DIR[WORKING]}/env
 source ${DIR[WORKING]}/env/bin/activate
 
-# FIXME: Temporary hack until all docker dns will be solved
-echo "nameserver 172.17.0.1" > /etc/resolv.conf
+# FIXME: s3 config (until migrated to vault, then account will be reset)
+mkdir -p ${HOME}/.aws
+echo "[nomad-s3]" >> ${HOME}/.aws/config
+echo "[nomad-s3]
+aws_access_key_id = csit
+aws_secret_access_key = Csit1234" >> ${HOME}/.aws/credentials
 
 # Install python dependencies:
 pip3 install -r requirements.txt
index fd78816..bc147d4 100755 (executable)
@@ -17,8 +17,12 @@ mkdir ${DIR[WORKING]}
 virtualenv -p $(which python3) ${DIR[WORKING]}/env
 source ${DIR[WORKING]}/env/bin/activate
 
-# FIXME: Temporary hack until all docker dns will be solved
-echo "nameserver 172.17.0.1" > /etc/resolv.conf
+# FIXME: s3 config (until migrated to vault, then account will be reset)
+mkdir -p ${HOME}/.aws
+echo "[nomad-s3]" >> ${HOME}/.aws/config
+echo "[nomad-s3]
+aws_access_key_id = csit
+aws_secret_access_key = Csit1234" >> ${HOME}/.aws/credentials
 
 # Install python dependencies:
 pip3 install -r requirements.txt
diff --git a/resources/tools/storage/__init__.py b/resources/tools/storage/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/resources/tools/storage/__main__.py b/resources/tools/storage/__main__.py
new file mode 100644 (file)
index 0000000..e8452dc
--- /dev/null
@@ -0,0 +1,53 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""S3 Storage Backend."""
+
+from json import dumps
+
+from argparse import ArgumentParser, RawDescriptionHelpFormatter
+
+from .storage import Storage
+
+
+def main():
+    """
+    Main entry function when called from CLI.
+    """
+    parser = ArgumentParser(
+        description=u"S3 Storage Backend Operation.",
+        formatter_class=RawDescriptionHelpFormatter
+    )
+    parser.add_argument(
+        u"-e", u"--expression", required=False, type=str,
+        default=u"select * from s3object s",
+        help=u"S3 compatible SQL query."
+    )
+
+    args = parser.parse_args()
+
+    json_iterator = Storage(
+        endpoint_url=u"http://storage.service.consul:9000",
+        bucket=u"docs",
+        profile_name=u"nomad-s3"
+    ).s3_file_processing(
+        prefix=u"", expression=args.expression
+    )
+    for item in json_iterator:
+        print(dumps(item, indent=4, sort_keys=False))
+
+
+if __name__ == u"__main__":
+    main()
diff --git a/resources/tools/storage/storage.py b/resources/tools/storage/storage.py
new file mode 100755 (executable)
index 0000000..9932bc3
--- /dev/null
@@ -0,0 +1,270 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Storage Backend Class."""
+
+from json import loads
+from struct import unpack
+from gzip import GzipFile
+
+from boto3 import Session
+from botocore import exceptions
+
+S3_API_LIMIT = 1048576
+
+
+class Storage:
+    """Class implementing storage object retrieval.
+    S3 Select API allows us to retrieve a subset of data by using simple SQL
+    expressions. By using Select API to retrieve only the data needed by the
+    application, drastic performance improvements can be achieved.
+    """
+    def __init__(self, endpoint_url, bucket, profile_name):
+        """Class init function to create S3 client object.
+
+        :param endpoint_url: S3 storage endpoint url.
+        :param bucket: S3 parent bucket.
+        :param profile_name: S3 storage configuration.
+        :type endpoint_url: str
+        :type bucket: str
+        :type profile_name: str
+        """
+        self.endpoint_url = endpoint_url
+        self.bucket = bucket
+        self.profile_name = profile_name
+
+        self.session = Session(profile_name=self.profile_name)
+        self.client = self.session.client(
+            service_name=u"s3", endpoint_url=self.endpoint_url
+        )
+        self.resource = self.session.resource(
+            service_name=u"s3", endpoint_url=self.endpoint_url
+        )
+
+    def __repr__(self):
+        """Return a string executable as Python constructor call.
+
+        :returns: Executable constructor call.
+        :rtype: str
+        """
+        return (
+            f"Storage(endpoint_url={self.endpoint_url!r}, "
+            f"bucket={self.bucket!r}, "
+            f"profile_name={self.profile_name!r})"
+        )
+
+    def _get_matching_s3_keys(
+            self, bucket, prefix=u"", suffix=u""):
+        """This function generates the keys in an S3 bucket. It acts as
+        a Python generator, yielding one object descriptor at a time.
+
+        :param bucket: Name of the S3 bucket.
+        :param prefix: Only fetch keys that start with this prefix (optional).
+        :param suffix: Only fetch keys that end with this suffix (optional).
+        :type bucket: str
+        :type prefix: str or tuple of str
+        :type suffix: str
+        :raises RuntimeError: If connection to storage fails.
+        """
+        kwargs = {
+            u"Bucket": bucket
+        }
+
+        prefixes = (prefix, ) if isinstance(prefix, str) else prefix
+
+        for key_prefix in prefixes:
+            kwargs[u"Prefix"] = key_prefix
+            try:
+                paginator = self.client.get_paginator(u"list_objects_v2")
+                for page in paginator.paginate(**kwargs):
+                    try:
+                        contents = page[u"Contents"]
+                    except KeyError:
+                        break
+
+                    for obj in contents:
+                        key = obj[u"Key"]
+                        if key.endswith(suffix):
+                            yield obj
+            except exceptions.EndpointConnectionError:
+                raise RuntimeError(
+                    u"Connection Error!"
+                )
+
+    def _get_matching_s3_content(
+            self, key, expression):
+        """This function filters the contents of an S3 object based on a simple
+        structured query language (SQL) statement. In the request, along with
+        the SQL expression, we are specifying JSON serialization of the object.
+        S3 uses this format to parse object data into records, and returns only
+        records that match the specified SQL expression. Data serialization
+        format for the response is set to JSON.
+
+        :param key: S3 Key (file path).
+        :param expression: S3 compatible SQL query.
+        :type key: str
+        :type expression: str
+        :returns: JSON content of interest.
+        :rtype: str
+        :raises RuntimeError: If connection to storage fails.
+        :raises ValueError: If JSON reading fails.
+        """
+        try:
+            content = self.client.select_object_content(
+                Bucket=self.bucket,
+                Key=key,
+                ExpressionType=u"SQL",
+                Expression=expression,
+                InputSerialization={
+                    u"JSON": {
+                        u"Type": u"Document"
+                    },
+                    u"CompressionType": u"GZIP"
+                },
+                OutputSerialization={
+                    u"JSON": {
+                        u"RecordDelimiter": u""
+                    }
+                }
+            )
+            records = u""
+            for event in content[u"Payload"]:
+                if u"Records" in event:
+                    records = event[u"Records"][u"Payload"].decode(u"utf-8")
+            return records
+        except exceptions.EndpointConnectionError:
+            raise RuntimeError(
+                u"Connection Error!"
+            )
+        except exceptions.EventStreamError:
+            raise ValueError(
+                u"Malformed JSON content!"
+            )
+
+    def _get_matching_s3_object(
+            self, key):
+        """Gets full S3 object. If the file is gzip'd it will be unpacked.
+
+        :param key: Name of the S3 key (file).
+        :type key: str
+        :returns: JSON file of interest.
+        :rtype: str
+        :raises RuntimeError: If connection to storage fails.
+        """
+        try:
+            streaming_object = self.client.get_object(
+                Bucket=self.bucket,
+                Key=key
+            )[u"Body"]
+            with GzipFile(fileobj=streaming_object) as gzipfile:
+                content = gzipfile.read()
+            return content
+        except exceptions.EndpointConnectionError:
+            raise RuntimeError(
+                u"Connection Error!"
+            )
+
+    def _get_matching_s3_length(
+            self, key):
+        """Gets the uncompressed size of a gzip'd S3 object by reading the
+        trailing 4-byte ISIZE field (RFC 1952) instead of the whole body.
+
+        :param key: Name of the S3 key (file).
+        :type key: str
+        :returns: Uncompressed size in bytes (modulo 2^32), 0 on error.
+        :rtype: int
+        """
+        try:
+            compressed_size = self.client.get_object(
+                Bucket=self.bucket,
+                Key=key
+            )[u"ContentLength"]
+            last_four_bytes = self.client.get_object(
+                Bucket=self.bucket,
+                Key=key,
+                Range=f"bytes={compressed_size-4}-{compressed_size-1}"
+            )[u"Body"]
+            # gzip ISIZE trailer is little-endian per RFC 1952, hence "<I".
+            return unpack(u"<I", last_four_bytes.read(4))[0]
+        except exceptions.EndpointConnectionError:
+            return 0
+
+    def is_large_file(
+            self, key):
+        """Returns True if the file is larger than the 1MB S3 Select limit.
+
+        :param key: S3 object descriptor; its u"Key" item is the file name.
+        :type key: dict
+        :returns: True if the file is larger than the 1MB S3 Select limit.
+        :rtype: bool
+        """
+        return bool(
+            self._get_matching_s3_length(key=key[u"Key"]) > S3_API_LIMIT
+        )
+
+    def s3_file_processing(
+            self, prefix=u"", suffix=u"json.gz",
+            expression=u"select * from s3object s"):
+        """Batch S3 key processing. Function retrieves list of files and use
+        S3 Select API to query content.
+
+        :param prefix: Only fetch keys that start with this prefix (optional).
+        :param suffix: Only fetch keys that end with this suffix (optional).
+        :param expression: S3 compatible SQL query (optional).
+        :type prefix: str
+        :type suffix: str
+        :type expression: str
+        """
+        key_iterator = self._get_matching_s3_keys(
+            bucket=self.bucket,
+            prefix=prefix,
+            suffix=suffix
+        )
+
+        for key in key_iterator:
+            try:
+                yield key[u"Key"], loads(
+                    self._get_matching_s3_content(
+                        key=key[u"Key"], expression=expression
+                    )
+                )
+            except ValueError:
+                return
+
+    def s3_dump_file_processing(
+            self, prefix=u"", suffix=u"json.gz"):
+        """Batch S3 key processing. Function retrieves list of files and use
+        S3 Get Object API to query content.
+
+        :param prefix: Only fetch keys that start with this prefix (optional).
+        :param suffix: Only fetch keys that end with this suffix (optional).
+        :type prefix: str
+        :type suffix: str
+        """
+        key_iterator = self._get_matching_s3_keys(
+            bucket=self.bucket,
+            prefix=prefix,
+            suffix=suffix
+        )
+
+        for key in key_iterator:
+            try:
+                yield loads(
+                    self._get_matching_s3_object(
+                        key=key[u"Key"]
+                    )
+                )
+            except ValueError:
+                return