import os import io import uuid import aiofiles import asyncio import magic import re from PIL import Image as PILImage from werkzeug.utils import secure_filename from sqlalchemy import func, or_ from flask import request from models import db, Comments, Image, Video, Comic, Post, User def allowed_file(filename, allowed_extensions): return '.' in filename and filename.rsplit('.', 1)[1].lower() in allowed_extensions async def check_file_size(file, max_size): loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _sync_check_file_size, file, max_size) def _sync_check_file_size(file, max_size): file.seek(0, os.SEEK_END) file_size = file.tell() file.seek(0) return file_size <= max_size def check_file_content(file, allowed_mime_types): mime = magic.Magic(mime=True) file_mime_type = mime.from_buffer(file.read(1024)) file.seek(0) return file_mime_type in allowed_mime_types async def convert_to_webp(image_file): loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _sync_convert_to_webp, image_file) def _sync_convert_to_webp(image_file): with PILImage.open(image_file) as img: output = io.BytesIO() img.convert("RGB").save(output, format="WEBP", quality=90, optimize=True) output.seek(0) return output async def generate_unique_filename(upload_folder, extension): while True: unique_filename = f"{uuid.uuid4().hex}.{extension}" file_path = os.path.join(upload_folder, unique_filename) if not await aiofiles.os.path.exists(file_path): return unique_filename def update_related_tables(old_username, new_username): models_to_update = [Comments, Image, Video, Comic, Post] for model in models_to_update: for record in model.query.filter_by(username=old_username).all(): record.username = new_username db.session.commit() def get_content_query(model, subscriptions, search_query): query = model.query if search_query: tags = [tag.strip().lower() for tag in search_query.replace(',', ' ').split()] filter_condition = [ model.tags.like(f'%{tag}%') for tag in tags ] query = query.filter( or_(*filter_condition) ) if subscriptions: query = query.filter(or_( model.username.in_(subscriptions), model.username.notin_(subscriptions) )) query = query.order_by( func.coalesce(model.cookie_votes, 0).desc(), model.publication_date.desc() ) return query def _sync_check_file_size(file, max_size): file.seek(0, os.SEEK_END) file_size = file.tell() file.seek(0) return file_size <= max_size async def generate_unique_filename(filename, upload_folder): base, ext = os.path.splitext(secure_filename(filename)) while True: unique_filename = f"{base}_{uuid.uuid4().hex}{ext}" file_path = os.path.join(upload_folder, unique_filename) if not await aiofiles.os.path.exists(file_path): return unique_filename def get_client_ip(): if 'X-Forwarded-For' in request.headers: forwarded_for = request.headers['X-Forwarded-For'] ip_address = forwarded_for.split(',')[0] else: ip_address = request.remote_addr return ip_address def validate_username(self, username): username.data = username.data.lower() user = User.query.filter_by(username=username.data).first() if user: return if not re.match(r'^[a-z0-9]+$', username.data): return def validate_ip(self): ip_address = get_client_ip() user_with_ip = User.query.filter_by(ip_address=ip_address).first() if user_with_ip: return def get_autocomplete_suggestions(query): last_tag = query.split(',')[-1].strip() all_tags = Image.query.with_entities(Image.tags).all() unique_tags = set(tag.strip() for tags in all_tags if tags.tags for tag in tags.tags.split(',')) filtered_tags = [tag for tag in unique_tags if last_tag.lower() in tag.lower()] return filtered_tags[:5]