Files
kakigoori/images/views.py

277 lines
7.4 KiB
Python

import hashlib
import random
import string
from io import BytesIO
from PIL import Image as PILImage
from PIL import JpegImagePlugin
from django.conf import settings
from django.http import (
JsonResponse,
HttpResponseBadRequest,
HttpResponseNotFound,
)
from django.shortcuts import redirect, render
from django.views.decorators.csrf import csrf_exempt
from images.decorators import (
get_image,
can_upload_variant,
can_upload_image,
)
from images.models import Image, ImageVariant
from images.utils import get_b2_resource, remove_exif_gps_data
# Replace Pillow's private JPEG ``_getmp`` hook with a stub that always
# returns None.
# NOTE(review): presumably this stops Pillow from parsing multi-picture (MPO)
# metadata so such uploads are handled as plain JPEGs — confirm against the
# installed Pillow version, since ``_getmp`` is a private API.
JpegImagePlugin._getmp = lambda x: None
def index(request):
    """Render and return the ``index.html`` template."""
    template_name = "index.html"
    return render(request, template_name)
@csrf_exempt
@can_upload_image
def upload(request):
    """Store an uploaded image and register its full-size primary variant.

    JPEG and PNG uploads keep their original bytes and file name. Any other
    format Pillow can open is re-encoded as a quality-95 JPEG under a random
    12-character file name. The stored bytes are de-duplicated by MD5 and
    passed through ``remove_exif_gps_data`` before being written to the
    bucket.

    Args:
        request: Django request carrying the image in ``request.FILES["file"]``.

    Returns:
        ``HttpResponseBadRequest`` when no ``file`` part was sent;
        ``JsonResponse({"created": False, "id": ...})`` when an image with the
        same MD5 already exists; otherwise
        ``JsonResponse({"created": True, "id": ...}, status=201)``.
    """
    file = request.FILES.get("file")
    if file is None:
        # A missing multipart field is a client error, not a server crash.
        return HttpResponseBadRequest()

    bucket = get_b2_resource()
    filename = file.name

    # Modes Pillow's JPEG writer accepts; anything else (palette, alpha, ...)
    # raises OSError on save and must be converted first.
    jpeg_writable_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")

    with PILImage.open(file) as im:
        height, width = (im.height, im.width)

        if im.format == "JPEG":
            file_extension = "jpg"
            content_type = "image/jpeg"
        elif im.format == "PNG":
            file_extension = "png"
            content_type = "image/png"
        else:
            # Re-encode every other format as JPEG under a generated name.
            new_file = BytesIO()
            filename = (
                "".join(random.choices(string.ascii_uppercase + string.digits, k=12))
                + ".jpg"
            )
            if im.mode in jpeg_writable_modes:
                jpeg_source = im
            else:
                jpeg_source = im.convert("RGB")
            jpeg_source.save(
                new_file,
                format="jpeg",
                quality=95,
                icc_profile=im.info.get("icc_profile"),
                keep_rgb=True,
            )
            file = new_file
            content_type = "image/jpeg"
            file_extension = "jpg"

    # Hash exactly the bytes that will be stored, then rewind for the upload.
    file.seek(0)
    file_md5_hash = hashlib.file_digest(file, "md5").hexdigest()
    file.seek(0)

    same_md5_image = Image.objects.filter(original_md5=file_md5_hash).first()
    if same_md5_image:
        return JsonResponse({"created": False, "id": same_md5_image.id})

    image = Image(
        original_name=filename,
        original_mime_type=content_type,
        original_md5=file_md5_hash,
        height=height,
        width=width,
        model_version=2,
        version=3,
    )
    image.save()

    variant, _ = ImageVariant.objects.get_or_create(
        image=image,
        height=height,
        width=width,
        file_type=file_extension,
        is_full_size=True,
        available=True,
        is_primary_variant=True,
    )

    image_data = remove_exif_gps_data(file.read())
    bucket.put_object(
        Body=image_data,
        Key=variant.backblaze_filepath,
        ContentType=content_type,
    )

    image.create_variant_tasks(variant)

    # Only flag the image as uploaded once the object is in the bucket.
    image.uploaded = True
    image.save()

    return JsonResponse({"created": True, "id": image.id}, status=201)
@can_upload_variant
def image_type_optimization_needed(request, image_type):
    """List up to 100 variants of ``image_type`` that are not yet available.

    Only variants whose image has ``version=3`` are considered. Each entry
    reports the variant's id and dimensions together with the id and file
    type of its ``parent_variant_for_optimized_versions``.

    Returns:
        ``JsonResponse`` of the form ``{"variants": [...]}``.
    """
    pending_variants = ImageVariant.objects.filter(
        image__version=3, file_type=image_type, available=False
    )[:100]

    entries = []
    for pending in pending_variants:
        source_variant = pending.parent_variant_for_optimized_versions
        entries.append(
            {
                "original_variant_id": source_variant.id,
                "original_variant_file_type": source_variant.file_type,
                "variant_id": pending.id,
                "height": pending.height,
                "width": pending.width,
            }
        )

    return JsonResponse({"variants": entries})
@csrf_exempt
@can_upload_variant
def upload_variant(request):
    """Receive the file for an existing variant and mark it available.

    Args:
        request: Django request with ``variant_id`` in ``request.POST`` and
            the encoded image in ``request.FILES["file"]``.

    Returns:
        ``HttpResponseBadRequest`` when ``variant_id`` or the ``file`` part is
        missing; ``HttpResponseNotFound`` when no variant has that id;
        otherwise ``JsonResponse({"status": "ok"})`` after the file has been
        uploaded to the bucket.
    """
    if "variant_id" not in request.POST:
        return HttpResponseBadRequest()

    variant = ImageVariant.objects.filter(id=request.POST["variant_id"]).first()
    if not variant:
        return HttpResponseNotFound()

    # Checked after the lookup so an unknown variant still yields 404; a
    # missing file part is a client error rather than an unhandled KeyError.
    file = request.FILES.get("file")
    if file is None:
        return HttpResponseBadRequest()

    bucket = get_b2_resource()

    content_types = {"avif": "image/avif", "webp": "image/webp"}
    content_type = content_types.get(variant.file_type, "binary/octet-stream")

    bucket.upload_fileobj(
        file, variant.backblaze_filepath, ExtraArgs={"ContentType": content_type}
    )

    variant.available = True
    variant.save()

    return JsonResponse({"status": "ok"})
def image_with_size(request, image, width, height, image_type):
    """Redirect to the stored file for ``image`` at the requested size.

    Variants are matched on exact ``width``/``height`` plus the optional
    ``gaussian_blur`` (default 0) and ``brightness`` (default 1) query
    parameters. For ``image_type="auto"`` the preferred order is avif, webp,
    jpegli, jpg, png, with avif/webp skipped unless the ``Accept`` header
    names them; ``"original"`` restricts the choice to jpg/png; any other
    value is treated as an exact file type.

    Args:
        request: Django request; its query string and ``Accept`` header are read.
        image: The ``Image`` whose variant is requested.
        width: Target width in pixels.
        height: Target height in pixels.
        image_type: ``"auto"``, ``"original"`` or a concrete file type.

    Returns:
        A redirect to the variant under ``settings.S3_PUBLIC_BASE_PATH``;
        ``HttpResponseBadRequest`` when a query parameter is not a number;
        a 404 ``JsonResponse`` when a specific file type has no variant;
        ``HttpResponseNotFound`` when no acceptable variant matches.
    """
    try:
        gaussian_blur = float(request.GET.get("gaussian_blur", 0))
        brightness = float(request.GET.get("brightness", 1))
    except ValueError:
        # Query parameters are untrusted; malformed numbers are a client error.
        return HttpResponseBadRequest()

    image_variants = ImageVariant.objects.filter(
        image=image,
        height=height,
        width=width,
        gaussian_blur=gaussian_blur,
        brightness=brightness,
    )

    if image_type != "auto":
        if image_type == "original":
            image_variants = image_variants.filter(file_type__in=["jpg", "png"])
        else:
            image_variants = image_variants.filter(file_type=image_type)

    if image.version == 3:
        image_variants = image_variants.filter(available=True)

    # Evaluate the queryset once and reuse the rows for every lookup below.
    variants = list(image_variants)

    if not variants:
        if image_type != "auto" and image_type != "original":
            return JsonResponse({"error": "Image version not available"}, status=404)
        # Nothing stored at this size yet: have the image create the variant
        # and redirect to it.
        image_variant = image.create_variant(width, height, gaussian_blur, brightness)
        return redirect(
            f"{settings.S3_PUBLIC_BASE_PATH}/{image_variant.backblaze_filepath}"
        )

    if image_type == "auto":
        variants_preferred_order = ["avif", "webp", "jpegli", "jpg", "png"]
    elif image_type == "original":
        variants_preferred_order = ["jpg", "png"]
    else:
        variants_preferred_order = [image_type]

    # First variant per file type, preserving the query's row order.
    first_variant_by_type = {}
    for candidate in variants:
        first_variant_by_type.setdefault(candidate.file_type, candidate)

    # File types that are only served in "auto" mode when the client accepts them.
    negotiated_mime_types = {"avif": "image/avif", "webp": "image/webp"}
    accept_header = request.headers.get("Accept", default="")

    for file_type in variants_preferred_order:
        required_mime = negotiated_mime_types.get(file_type)
        if (
            image_type == "auto"
            and required_mime is not None
            and required_mime not in accept_header
        ):
            continue

        variant = first_variant_by_type.get(file_type)
        if variant is None:
            continue

        if image.version == 2:
            # Version 2 images are addressed under the image's own path
            # by "<width>-<height>/<file name>".
            if file_type == "jpegli":
                file_name = "jpegli.jpg"
            else:
                file_name = "image." + file_type
            return redirect(
                f"{settings.S3_PUBLIC_BASE_PATH}/{image.backblaze_filepath}/{variant.width}-{variant.height}/{file_name}"
            )

        return redirect(f"{settings.S3_PUBLIC_BASE_PATH}/{variant.backblaze_filepath}")

    return HttpResponseNotFound()
@get_image
def get_image_with_height(request, image, height, image_type):
    """Serve ``image`` scaled to ``height``, never larger than the original.

    A requested height at or above the image's own height yields the image's
    stored dimensions; otherwise the width is scaled proportionally and
    truncated to an integer.
    """
    if height < image.height:
        scaled_width = int(height * image.width / image.height)
        return image_with_size(request, image, scaled_width, height, image_type)

    # Requests at or beyond the original height are clamped to the original size.
    return image_with_size(request, image, image.width, image.height, image_type)
@get_image
def get_image_with_width(request, image, width, image_type):
    """Serve ``image`` scaled to ``width``, never larger than the original.

    A requested width at or above the image's own width yields the image's
    stored dimensions; otherwise the height is scaled proportionally and
    truncated to an integer.
    """
    if width < image.width:
        scaled_height = int(width * image.height / image.width)
        return image_with_size(request, image, width, scaled_height, image_type)

    # Requests at or beyond the original width are clamped to the original size.
    return image_with_size(request, image, image.width, image.height, image_type)
@get_image
def get(request, image, image_type):
    """Serve ``image`` at its stored width and height."""
    full_width = image.width
    full_height = image.height
    return image_with_size(request, image, full_width, full_height, image_type)
@get_image
def get_thumbnail(request, image, image_type):
    """Serve ``image`` at the dimensions given by ``image.thumbnail_size``."""
    thumb_width, thumb_height = image.thumbnail_size
    return image_with_size(request, image, thumb_width, thumb_height, image_type)