CSIT Data driver
diff --git a/resources/tools/storage/storage.py b/resources/tools/storage/storage.py
new file mode 100755 (executable)
index 0000000..9932bc3
--- /dev/null
@@ -0,0 +1,308 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2021 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Storage Backend Class."""
+
+from json import loads
+from struct import unpack
+from gzip import GzipFile
+
+from boto3 import Session
+from botocore import exceptions
+
+S3_API_LIMIT = 1048576  # 1 MiB, the size limit referenced by is_large_file().
+
+
+class Storage:
+    """Class implementing storage object retrieval.
+    S3 Select API allows us to retrieve a subset of data by using simple SQL
+    expressions. By using Select API to retrieve only the data needed by the
+    application, drastic performance improvements can be achieved.
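+
+    Example (a minimal sketch; the endpoint, bucket and profile names are
+    placeholders, not values shipped with this module)::
+
+        storage = Storage(
+            endpoint_url=u"http://s3.example.org:9000",
+            bucket=u"logs",
+            profile_name=u"default",
+        )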
+    """
+    def __init__(self, endpoint_url, bucket, profile_name):
+        """Class init function to create S3 client object.
+
+        :param endpoint_url: S3 storage endpoint URL.
+        :param bucket: S3 parent bucket.
+        :param profile_name: S3 storage configuration.
+        :type endpoint_url: str
+        :type bucket: str
+        :type profile_name: str
+        """
+        self.endpoint_url = endpoint_url
+        self.bucket = bucket
+        self.profile_name = profile_name
+
+        self.session = Session(profile_name=self.profile_name)
+        self.client = self.session.client(
+            service_name=u"s3", endpoint_url=self.endpoint_url
+        )
+        self.resource = self.session.resource(
+            service_name=u"s3", endpoint_url=self.endpoint_url
+        )
+
+    def __repr__(self):
+        """Return a string executable as Python constructor call.
+
+        :returns: Executable constructor call.
+        :rtype: str
+        """
+        return (
+            f"Storage(endpoint_url={self.endpoint_url!r}, "
+            f"bucket={self.bucket!r}, "
+            f"profile_name={self.profile_name!r})"
+        )
+
+    def _get_matching_s3_keys(
+            self, bucket, prefix=u"", suffix=u""):
+        """This function generates the keys in an S3 bucket. Function act as
+        a Python generator object.
+
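+        Example (illustrative)::
+
+            for obj in self._get_matching_s3_keys(bucket, suffix=u".gz"):
+                print(obj[u"Key"])
+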
+        :param bucket: Name of the S3 bucket.
+        :param prefix: Only fetch keys that start with this prefix (optional).
+        :param suffix: Only fetch keys that end with this suffix (optional).
+        :type bucket: str
+        :type prefix: str
+        :type suffix: str
+        :raises RuntimeError: If connection to storage fails.
+        """
+        kwargs = {
+            u"Bucket": bucket
+        }
+
+        prefixes = (prefix, ) if isinstance(prefix, str) else prefix
+
+        for key_prefix in prefixes:
+            kwargs[u"Prefix"] = key_prefix
+            try:
+                paginator = self.client.get_paginator(u"list_objects_v2")
+                for page in paginator.paginate(**kwargs):
+                    try:
+                        contents = page[u"Contents"]
+                    except KeyError:
+                        break
+
+                    for obj in contents:
+                        key = obj[u"Key"]
+                        if key.endswith(suffix):
+                            yield obj
+            except exceptions.EndpointConnectionError:
+                raise RuntimeError(
+                    u"Connection Error!"
+                )
+
+    def _get_matching_s3_content(
+            self, key, expression):
+        """This function filters the contents of an S3 object based on a simple
+        structured query language (SQL) statement. In the request, along with
+        the SQL expression, we are specifying JSON serialization of the object.
+        S3 uses this format to parse object data into records, and returns only
+        records that match the specified SQL expression. Data serialization
+        format for the response is set to JSON.
+
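+        Example expression (illustrative; the field name is hypothetical)::
+
+            select s.version from s3object s
+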
+        :param key: S3 Key (file path).
+        :param expression: S3 compatible SQL query.
+        :type key: str
+        :type expression: str
+        :returns: JSON content of interest.
+        :rtype: str
+        :raises RuntimeError: If connection to storage fails.
+        :raises ValueError: If JSON reading fails.
+        """
+        try:
+            content = self.client.select_object_content(
+                Bucket=self.bucket,
+                Key=key,
+                ExpressionType=u"SQL",
+                Expression=expression,
+                InputSerialization={
+                    u"JSON": {
+                        u"Type": u"Document"
+                    },
+                    u"CompressionType": u"GZIP"
+                },
+                OutputSerialization={
+                    u"JSON": {
+                        u"RecordDelimiter": u""
+                    }
+                }
+            )
+            records = u""
+            for event in content[u"Payload"]:
+                if u"Records" in event:
+                    records = event[u"Records"][u"Payload"].decode(u"utf-8")
+            return records
+        except exceptions.EndpointConnectionError:
+            raise RuntimeError(
+                u"Connection Error!"
+            )
+        except exceptions.EventStreamError:
+            raise ValueError(
+                u"Malformed JSON content!"
+            )
+
+    def _get_matching_s3_object(
+            self, key):
+        """Gets full S3 object. If the file is gzip'd it will be unpacked.
+
+        :param key: Name of the S3 key (file).
+        :type key: str
+        :returns: JSON file of interest.
+        :rtype: str
+        :raises RuntimeError: If connection to storage fails.
+        """
+        try:
+            streaming_object = self.client.get_object(
+                Bucket=self.bucket,
+                Key=key
+            )[u"Body"]
+            with GzipFile(fileobj=streaming_object) as gzipfile:
+                content = gzipfile.read()
+            return content
+        except exceptions.EndpointConnectionError:
+            raise RuntimeError(
+                u"Connection Error!"
+            )
+
+    def _get_matching_s3_length(
+            self, key):
+        """Gets the file size of S3 object. If the file is gzip'd the packed
+        size is reported.
+
+        :param key: Name of the S3 key (file).
+        :type key: str
+        :returns: File size in bytes. Defaults to 0 if any error.
+        :rtype: int
+        :raises RuntimeError: If connection to storage fails.
+        """
+        try:
+            # Packed (on-the-wire) size, needed to address the gzip trailer.
+            compressed_size = self.client.get_object(
+                Bucket=self.bucket,
+                Key=key
+            )[u"ContentLength"]
+            # Fetch only the four-byte gzip ISIZE trailer. HTTP byte ranges
+            # are inclusive, so the last byte is at compressed_size - 1.
+            last_four_bytes = self.client.get_object(
+                Bucket=self.bucket,
+                Key=key,
+                Range=f"bytes={compressed_size-4}-{compressed_size-1}"
+            )[u"Body"]
+            # ISIZE is little-endian per RFC 1952, hence the "<I" format.
+            return unpack(u"<I", last_four_bytes.read(4))[0]
+        except exceptions.EndpointConnectionError:
+            return 0
+
+    def is_large_file(
+            self, key):
+        """Returns True if file is larger then 1MB that S3 select allows.
+
+        :param key: Name of the S3 key (file).
+        :type key: str
+        :returns: Returns True if file is large then 1MB that S3 select allows.
+        :rtype: bool
+        """
+        return bool(
+            self._get_matching_s3_length(key=key[u"Key"]) > S3_API_LIMIT
+        )
+
+    def s3_file_processing(
+            self, prefix=u"", suffix=u"json.gz",
+            expression=u"select * from s3object s"):
+        """Batch S3 key processing. Function retrieves list of files and use
+        S3 Select API to query content.
+
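+        Example (illustrative; the prefix is a placeholder)::
+
+            for key, data in storage.s3_file_processing(prefix=u"report"):
+                print(key, data)
+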
+        :param prefix: Only fetch keys that start with this prefix (optional).
+        :param suffix: Only fetch keys that end with this suffix (optional).
+        :param expression: S3 compatible SQL query (optional).
+        :type prefix: str
+        :type suffix: str
+        :type expression: str
+        """
+        key_iterator = self._get_matching_s3_keys(
+            bucket=self.bucket,
+            prefix=prefix,
+            suffix=suffix
+        )
+
+        for key in key_iterator:
+            try:
+                yield key[u"Key"], loads(
+                    self._get_matching_s3_content(
+                        key=key[u"Key"], expression=expression
+                    )
+                )
+            except ValueError:
+                return
+
+    def s3_dump_file_processing(
+            self, prefix=u"", suffix=u"json.gz"):
+        """Batch S3 key processing. Function retrieves list of files and use
+        S3 Get Object API to query content.
+
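+        Example (illustrative; the prefix is a placeholder)::
+
+            for data in storage.s3_dump_file_processing(prefix=u"report"):
+                print(data)
+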
+        :param prefix: Only fetch keys that start with this prefix (optional).
+        :param suffix: Only fetch keys that end with this suffix (optional).
+        :type prefix: str
+        :type suffix: str
+        """
+        key_iterator = self._get_matching_s3_keys(
+            bucket=self.bucket,
+            prefix=prefix,
+            suffix=suffix
+        )
+
+        for key in key_iterator:
+            try:
+                yield loads(
+                    self._get_matching_s3_object(
+                        key=key[u"Key"]
+                    )
+                )
+            except ValueError:
+                return