Artberry-web/utils.py
2025-03-07 01:37:22 +02:00

134 lines
4.1 KiB
Python

import os
import io
import uuid
import aiofiles
import asyncio
import magic
import re
from PIL import Image as PILImage
from werkzeug.utils import secure_filename
from sqlalchemy import func, or_
from flask import request
from models import db, Comments, Image, Video, Comic, Post, User
def allowed_file(filename, allowed_extensions):
    """Tell whether ``filename`` ends in one of ``allowed_extensions``.

    The extension is the text after the last dot, compared in lower case.

    Args:
        filename: Name of the uploaded file. ``None`` or an empty string is
            treated as "not allowed" instead of raising.
        allowed_extensions: Container of lower-case extensions without the
            leading dot (e.g. ``{"png", "jpg"}``).

    Returns:
        bool: ``True`` if the file has an extension and it is allowed.
    """
    # An upload without a file name would otherwise fail on the ``in`` test.
    if not filename or '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in allowed_extensions
async def check_file_size(file, max_size):
    """Check a file's size against a limit without blocking the event loop.

    The seek/tell work is delegated to ``_sync_check_file_size`` on the
    loop's default executor.

    Args:
        file: Seekable file-like object; it is rewound to offset 0 by the
            synchronous helper.
        max_size: Largest accepted size (inclusive), in the units reported by
            ``file.tell()``.

    Returns:
        bool: ``True`` when the measured size is ``<= max_size``.
    """
    # Inside a coroutine, get_running_loop() is the documented way to reach
    # the loop; get_event_loop() has policy-dependent behaviour.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _sync_check_file_size, file, max_size)
def _sync_check_file_size(file, max_size):
file.seek(0, os.SEEK_END)
file_size = file.tell()
file.seek(0)
return file_size <= max_size
def check_file_content(file, allowed_mime_types):
    """Sniff the MIME type of ``file`` from its leading bytes.

    Args:
        file: Seekable binary file-like object. It is rewound to offset 0
            before returning.
        allowed_mime_types: Container of accepted MIME type strings.

    Returns:
        bool: ``True`` when the detected MIME type is in
        ``allowed_mime_types``.
    """
    # Always sniff from the start of the stream, wherever an earlier reader
    # left the cursor.
    file.seek(0)
    # python-magic's documentation recommends at least the first 2048 bytes;
    # shorter samples can be misidentified.
    header = file.read(2048)
    # Rewind before detection so the stream is usable even if libmagic fails.
    file.seek(0)
    detected_type = magic.Magic(mime=True).from_buffer(header)
    return detected_type in allowed_mime_types
async def convert_to_webp(image_file):
    """Convert an image to WebP without blocking the event loop.

    The conversion is delegated to ``_sync_convert_to_webp`` on the loop's
    default executor.

    Args:
        image_file: Whatever ``_sync_convert_to_webp`` accepts (it is passed
            straight to ``PIL.Image.open``).

    Returns:
        The value returned by ``_sync_convert_to_webp``.
    """
    # Inside a coroutine, get_running_loop() is the documented way to reach
    # the loop; get_event_loop() has policy-dependent behaviour.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _sync_convert_to_webp, image_file)
def _sync_convert_to_webp(image_file):
    """Re-encode an image as WebP into an in-memory buffer.

    Args:
        image_file: Path or file object accepted by ``PIL.Image.open``.

    Returns:
        io.BytesIO: Buffer holding the WebP data, positioned at offset 0.
    """
    webp_buffer = io.BytesIO()
    with PILImage.open(image_file) as source_image:
        # NOTE(review): converting to "RGB" discards any alpha channel —
        # confirm that losing transparency is intended.
        rgb_image = source_image.convert("RGB")
        # NOTE(review): `optimize` does not appear among Pillow's documented
        # WebP save options — confirm it has any effect here.
        rgb_image.save(webp_buffer, format="WEBP", quality=90, optimize=True)
    # Rewind so callers can read the encoded bytes from the start.
    webp_buffer.seek(0)
    return webp_buffer
async def generate_unique_filename(upload_folder, extension):
    """Return a random ``<uuid4 hex>.<extension>`` name unused in ``upload_folder``.

    NOTE(review): this file later defines ``generate_unique_filename`` again
    with the signature ``(filename, upload_folder)``; that definition replaces
    this one when the module is imported. Confirm which signature callers
    expect.

    Args:
        upload_folder: Directory in which the name must not already exist.
        extension: File extension without the leading dot.

    Returns:
        str: A file name (not a full path) for which no file exists yet.
    """
    # ``import aiofiles`` alone does not load the ``aiofiles.os`` submodule;
    # import it explicitly so the attribute access below cannot fail.
    import aiofiles.os

    while True:
        candidate = f"{uuid.uuid4().hex}.{extension}"
        candidate_path = os.path.join(upload_folder, candidate)
        if not await aiofiles.os.path.exists(candidate_path):
            return candidate
def update_related_tables(old_username, new_username):
    """Propagate a username change to every content table that stores it.

    Issues one bulk ``UPDATE`` per model (``Comments``, ``Image``, ``Video``,
    ``Comic``, ``Post``) instead of loading each row as an ORM object, then
    commits once so the rename is applied to all tables or to none.

    Args:
        old_username: Username currently stored on the rows.
        new_username: Username to write in its place.

    Raises:
        Exception: Any database error is re-raised after the session has been
            rolled back.
    """
    models_to_update = (Comments, Image, Video, Comic, Post)
    try:
        for model in models_to_update:
            model.query.filter_by(username=old_username).update(
                {"username": new_username}
            )
        # Commit even when no rows matched: pending changes made by the
        # caller on the same session are flushed here as before.
        db.session.commit()
    except Exception:
        # Leave the session usable and never persist a partial rename.
        db.session.rollback()
        raise
def get_content_query(model, subscriptions, search_query):
    """Build the listing query for ``model``, optionally filtered by tags.

    Args:
        model: Mapped class exposing ``query``, ``tags``, ``username``,
            ``cookie_votes`` and ``publication_date``.
        subscriptions: Collection of usernames (see the review note in the
            body about its current effect).
        search_query: Free-text search; commas and whitespace separate tags.
            A row matches when its ``tags`` column contains ANY of the
            lower-cased tags as a substring.

    Returns:
        The query, ordered by ``cookie_votes`` (NULL counted as 0) descending
        and then by ``publication_date`` descending.
    """
    query = model.query

    if search_query:
        # split() without arguments already drops empty pieces and whitespace.
        tags = search_query.replace(',', ' ').lower().split()
        # A search made only of separators yields no tags; skip the filter
        # instead of calling or_() with no arguments.
        if tags:
            # autoescape=True keeps user-typed '%' and '_' literal instead of
            # letting them act as LIKE wildcards.
            query = query.filter(
                or_(*(model.tags.contains(tag, autoescape=True) for tag in tags))
            )

    if subscriptions:
        # NOTE(review): "IN (...) OR NOT IN (...)" holds for every row with a
        # non-NULL username, so this does not narrow the result set.
        # Presumably prioritising subscribed authors was intended — confirm
        # before changing, as callers currently see the unrestricted list.
        query = query.filter(or_(
            model.username.in_(subscriptions),
            model.username.notin_(subscriptions),
        ))

    return query.order_by(
        func.coalesce(model.cookie_votes, 0).desc(),
        model.publication_date.desc(),
    )
def _sync_check_file_size(file, max_size):
file.seek(0, os.SEEK_END)
file_size = file.tell()
file.seek(0)
return file_size <= max_size
async def generate_unique_filename(filename, upload_folder):
    """Derive a collision-free file name from a user-supplied one.

    The name is sanitised with ``secure_filename`` and a random UUID4 hex
    suffix is inserted between its stem and extension, giving
    ``<stem>_<hex><ext>``. Generation repeats until no file of that name
    exists in ``upload_folder``.

    Args:
        filename: Original (untrusted) file name.
        upload_folder: Directory in which the name must not already exist.

    Returns:
        str: A file name (not a full path) for which no file exists yet.
    """
    # ``import aiofiles`` alone does not load the ``aiofiles.os`` submodule;
    # import it explicitly so the attribute access below cannot fail.
    import aiofiles.os

    stem, suffix = os.path.splitext(secure_filename(filename))
    while True:
        candidate = f"{stem}_{uuid.uuid4().hex}{suffix}"
        candidate_path = os.path.join(upload_folder, candidate)
        if not await aiofiles.os.path.exists(candidate_path):
            return candidate
def get_client_ip():
    """Return the client address for the current Flask request.

    Uses the first entry of the ``X-Forwarded-For`` header when present and
    non-empty, otherwise ``request.remote_addr``.

    NOTE(review): ``X-Forwarded-For`` is supplied by the client unless a
    trusted reverse proxy overwrites it, so this value can be spoofed. Do not
    rely on it for access control without confirming the deployment sets the
    header itself.

    Returns:
        The address string, or whatever ``request.remote_addr`` holds when no
        usable header value is available.
    """
    forwarded_for = request.headers.get('X-Forwarded-For', '')
    # Entries are comma-separated and may carry surrounding whitespace.
    first_hop = forwarded_for.split(',')[0].strip()
    # An empty or blank header falls back to the socket peer address.
    return first_hop or request.remote_addr
def validate_username(self, username):
    """Lower-case ``username.data`` in place and run the username checks.

    The checks are: (1) whether a ``User`` with that name already exists and
    (2) whether the name consists only of ``a-z`` and ``0-9``.

    NOTE(review): both checks simply return ``None`` on failure, exactly as
    on success, so callers cannot tell a rejected name from an accepted one.
    Presumably an error was meant to be raised — confirm.

    Args:
        self: Unused.
        username: Object with a mutable string attribute ``data``.

    Returns:
        None, in every case.
    """
    lowered = username.data.lower()
    username.data = lowered

    name_taken = User.query.filter_by(username=username.data).first()
    if name_taken:
        return None

    only_allowed_chars = re.match(r'^[a-z0-9]+$', username.data)
    if not only_allowed_chars:
        return None
    return None
def validate_ip(self):
    """Look up whether any ``User`` is stored with the current client IP.

    NOTE(review): the lookup result is not acted on — the function returns
    ``None`` whether or not a matching user exists. Presumably an error was
    meant to be raised for a duplicate IP — confirm.

    Args:
        self: Unused.

    Returns:
        None, in every case.
    """
    client_ip = get_client_ip()
    matching_user = User.query.filter_by(ip_address=client_ip).first()
    if matching_user:
        return None
    return None
def get_autocomplete_suggestions(query):
    """Suggest up to five known image tags matching the tag being typed.

    Only the text after the last comma in ``query`` is matched, as a
    case-insensitive substring, against every distinct tag found in the
    comma-separated ``Image.tags`` column.

    Args:
        query: Raw contents of the tag input. ``None`` is treated as empty.

    Returns:
        list[str]: At most five matching tags in alphabetical order, so the
        same input always yields the same suggestions.
    """
    needle = (query or '').split(',')[-1].strip().lower()

    tag_rows = Image.query.with_entities(Image.tags).all()
    known_tags = {
        tag.strip()
        for row in tag_rows
        if row.tags
        for tag in row.tags.split(',')
        # Skip blanks produced by stray or trailing commas.
        if tag.strip()
    }

    # A set has no stable order; sort before truncating so the five results
    # do not change between calls or processes.
    matches = sorted(tag for tag in known_tags if needle in tag.lower())
    return matches[:5]