import subprocess
import sys
import tempfile
-from mimetypes import MimeTypes
import boto3
from botocore.exceptions import ClientError
logging.getLogger(u"botocore").setLevel(logging.INFO)
+FILE_TYPE = {
+ u"xml": u"application/xml",
+ u"html": u"text/html",
+ u"txt": u"text/plain",
+ u"log": u"text/plain",
+ u"css": u"text/css",
+ u"md": u"text/markdown",
+ u"rst": u"text/x-rst",
+ u"csv": u"text/csv",
+ u"svg": u"image/svg+xml",
+ u"jpg": u"image/jpeg",
+ u"png": u"image/png",
+ u"gif": u"image/gif",
+ u"js": u"application/javascript",
+ u"pdf": u"application/pdf",
+ u"json": u"application/json",
+ u"otf": u"font/otf",
+ u"ttf": u"font/ttf",
+ u"woff": u"font/woff",
+ u"woff2": u"font/woff2"
+}
+
+
def compress_text(src_dpath):
"""Compress all text files in directory.
:type src_fpath: str
:type s3_path: str
"""
- mime = MimeTypes().guess_type(src_fpath)[0]
- encoding = MimeTypes().guess_type(src_fpath)[1]
-
- extra_args = {
- u"ContentType": u"text/plain"
- }
- text_html_extra_args = {
- u"ContentType": u"text/html"
- }
- text_plain_extra_args = {
- u"ContentType": u"text/plain"
- }
- app_xml_extra_args = {
- u"ContentType": u"application/xml"
- }
-
- if mime is None and encoding is None:
- extra_args = extra_args
- elif mime is None or mime in u"text/plain":
- extra_args = text_plain_extra_args
- elif mime in u"text/html":
- extra_args = text_html_extra_args
- elif mime in u"application/xml":
- extra_args = app_xml_extra_args
- else:
- extra_args = extra_args
+ def is_gzip_file(filepath):
+ with open(filepath, u"rb") as test_f:
+ return test_f.read(2) == b"\x1f\x8b"
+
+ if os.path.isdir(src_fpath):
+ return
+ if os.path.isfile(src_fpath):
+ file_name, file_extension = os.path.splitext(src_fpath)
+ content_encoding = u""
+ content_type = u"application/octet-stream"
+ if is_gzip_file(src_fpath):
+ file_name, file_extension = os.path.splitext(file_name)
+ content_encoding = "gzip"
+ content_type = FILE_TYPE.get(
+ file_extension.strip("."),
+ u"application/octet-stream"
+ )
- if encoding:
- extra_args[u"ContentEncoding"] = encoding
+ extra_args = dict()
+ extra_args[u"ContentType"] = content_type
+ if content_encoding:
+ extra_args[u"ContentEncoding"] = content_encoding
try:
s3_resource.Bucket(s3_bucket).upload_file(