refactoring

This commit is contained in:
aneuhmanh 2025-03-07 01:37:22 +02:00
parent 2e119ab7df
commit 3fac6070ca
17 changed files with 919 additions and 838 deletions

207
admin.py Normal file
View File

@ -0,0 +1,207 @@
from flask import render_template, redirect, url_for, request, abort
from flask_login import login_required, current_user
from wtforms import StringField, SubmitField
from wtforms.validators import DataRequired
from flask_wtf import FlaskForm
from models import db, User, Comments, Image, Votes, VideoVotes, Video, Comic, ComicVotes, Cookies, UpdateCookiesForm
import os
import shutil
import uuid
import aiofiles
import bcrypt
from utils import update_related_tables
def register_admin_routes(app):
@app.route('/admin', methods=['GET', 'POST'])
@login_required
def admin():
if current_user.username != 'naturefie':
return redirect(url_for('index'))
form = UpdateCookiesForm()
user_cookies = {
user.id: Cookies.query.filter_by(username=user.username).first().cookies if Cookies.query.filter_by(username=user.username).first() else 0
for user in User.query.all()
}
comments = Comments.query.order_by(Comments.comment_date.desc()).all()
return render_template(
'panel.html',
arts=Image.query.all(),
comics=Comic.query.all(),
videos=Video.query.all(),
users=User.query.all(),
comments=comments,
form=form,
user_cookies=user_cookies
)
@app.route('/admin/delete/<content_type>/<int:content_id>', methods=['POST'])
@login_required
def admin_delete_content(content_type, content_id):
models = {
'art': (Image, 'arts', 'image_file', Votes, 'image_id'),
'video': (Video, 'videos', 'video_file', VideoVotes, 'video_id'),
'comic': (Comic, 'comics', 'comic_folder', ComicVotes, 'comic_id')
}
if content_type not in models:
abort(404)
model, folder, file_field, vote_model, foreign_key = models[content_type]
content = model.query.get_or_404(content_id)
vote_model.query.filter(getattr(vote_model, foreign_key) == content_id).delete()
Comments.query.filter(getattr(Comments, foreign_key) == content_id).delete()
file_path = os.path.join(app.config['UPLOAD_FOLDER'][folder], getattr(content, file_field))
if os.path.exists(file_path):
if os.path.isfile(file_path):
os.remove(file_path)
else:
shutil.rmtree(file_path)
db.session.delete(content)
db.session.commit()
return redirect(url_for('admin'))
@app.route('/admin/delete/user/<int:user_id>', methods=['POST'])
@login_required
def admin_delete_user(user_id):
user = User.query.get_or_404(user_id)
if current_user.username != 'naturefie':
return redirect(url_for('admin'))
db.session.delete(user)
db.session.commit()
return redirect(url_for('admin'))
@app.route('/admin/update_comment/<int:comment_id>', methods=['POST'])
@login_required
def admin_update_comment(comment_id):
comment = Comments.query.get_or_404(comment_id)
if current_user.username != 'naturefie':
abort(403)
new_text = request.form.get('comment_text', '').strip()
if not new_text:
return redirect(url_for('admin'))
comment.comment_text = new_text
try:
db.session.commit()
print(f"Updated comment ID {comment_id}: {comment.comment_text}")
except Exception as e:
db.session.rollback()
print(f"Error updating comment: {e}")
return redirect(url_for('admin'))
@app.route('/admin/delete_comment/<int:comment_id>', methods=['POST'])
@login_required
def admin_delete_comment(comment_id):
comment = Comments.query.get_or_404(comment_id)
if current_user.username != 'naturefie':
abort(403)
db.session.delete(comment)
db.session.commit()
return redirect(url_for('admin'))
@app.route('/admin/update_cookies/<int:user_id>', methods=['POST'])
@login_required
def admin_update_cookies(user_id):
user = User.query.get_or_404(user_id)
if request.method == 'POST':
new_cookie_count = request.form.get('cookies', type=int)
if new_cookie_count is not None and new_cookie_count >= 0:
user_cookies = Cookies.query.filter_by(username=user.username).first()
if not user_cookies:
user_cookies = Cookies(username=user.username, cookies=new_cookie_count)
db.session.add(user_cookies)
else:
user_cookies.cookies = new_cookie_count
db.session.commit()
return redirect(url_for('admin'))
@app.route('/admin/update_video/<int:content_id>', methods=['POST'])
@login_required
def admin_update_video(content_id):
video = Video.query.get_or_404(content_id)
if current_user.username != 'naturefie':
return redirect(url_for('admin'))
new_video_name = request.form.get('video_name')
new_description = request.form.get('description')
new_tags = request.form.get('tags')
if new_video_name and new_video_name != video.video_name:
if len(new_video_name) < 3 or len(new_video_name) > 100:
return redirect(url_for('admin'))
video.video_name = new_video_name
if new_description:
video.description = new_description
if new_tags:
video.tags = new_tags
db.session.commit()
return redirect(url_for('admin'))
@app.route('/admin/update_user/<int:user_id>', methods=['POST'])
@login_required
def admin_update_user(user_id):
user = User.query.get_or_404(user_id)
if current_user.username != 'naturefie':
return redirect(url_for('admin'))
new_username = request.form.get('username')
new_password = request.form.get('password')
if new_username and new_username != user.username:
if len(new_username) < 3 or len(new_username) > 20:
return redirect(url_for('admin'))
if User.query.filter_by(username=new_username).first():
return redirect(url_for('admin'))
old_username = user.username
user.username = new_username
update_related_tables(old_username, new_username)
if new_password:
if len(new_password) < 6:
return redirect(url_for('admin'))
hashed_password = bcrypt.generate_password_hash(new_password).decode('utf-8')
user.encrypted_password = hashed_password
db.session.commit()
return redirect(url_for('admin'))
@app.route('/admin/update_tags/<content_type>/<int:content_id>', methods=['POST'])
@login_required
def admin_update_tags(content_type, content_id):
models = {
'art': Image,
'video': Video,
'comic': Comic
}
if content_type not in models:
abort(404)
model = models[content_type]
content = model.query.get_or_404(content_id)
new_tags = request.form.get('tags', '').strip()
content.tags = new_tags
db.session.commit()
return redirect(url_for('admin'))

821
app.py
View File

@ -21,60 +21,24 @@ from flask_wtf.file import FileAllowed
from flask_wtf.csrf import validate_csrf
from wtforms import StringField, PasswordField, SubmitField, FileField, BooleanField, RadioField, SelectField, TextAreaField
from wtforms.validators import DataRequired, Length, EqualTo, ValidationError, Regexp
from wtforms import StringField, PasswordField, SubmitField
from wtforms.validators import DataRequired, EqualTo, Regexp
from flask_wtf import FlaskForm, RecaptchaField
from dotenv import load_dotenv, find_dotenv
import aiofiles.os
from sqlalchemy import func, or_
import magic
dotenv_path = find_dotenv()
load_dotenv(dotenv_path, override=True)
from config import Config
from models import db, bcrypt, User, Comments, Image, Votes, VideoVotes, Video, Comic, ComicPage, ComicVotes, Cookies, Views, Item, UserItem, Post, Subscription, EditTagsForm, EditVideoForm, RegistrationForm, LoginForm, EmptyForm
from admin import register_admin_routes
from upload import upload_bp
from utils import allowed_file, check_file_content, check_file_size, convert_to_webp, generate_unique_filename, get_content_query, get_client_ip, get_autocomplete_suggestions
from auth import auth_bp
app = Flask(__name__)
csrf = CSRFProtect(app)
app.config.update(
SECRET_KEY=os.getenv('SECRET_KEY'),
WTF_CSRF_ENABLED=True,
RECAPTCHA_PUBLIC_KEY=os.getenv('RECAPTCHA_PUBLIC_KEY'),
RECAPTCHA_PRIVATE_KEY=os.getenv('RECAPTCHA_PRIVATE_KEY'),
SQLALCHEMY_DATABASE_URI=os.getenv('SQLALCHEMY_DATABASE_URI'),
UPLOAD_FOLDER={
'images': 'static/arts/',
'arts': 'static/arts/',
'videos': 'static/videos/',
'thumbnails': 'static/thumbnails/',
'avatars': 'static/avatars/',
'banners': 'static/banners/',
'comics': 'static/comics',
'comicthumbs': 'static/comicthumbs/',
'posts': 'static/posts/'
},
ALLOWED_IMAGE_EXTENSIONS={'png', 'jpg', 'jpeg', 'gif', 'webp'},
ALLOWED_VIDEO_EXTENSIONS={'mp4', 'avi', 'mov'},
MAX_IMAGE_SIZE=15 * 1024 * 1024,
MAX_VIDEO_SIZE=10 * 1024 * 1024 * 1024
)
app.config.from_object(Config)
def allowed_file(filename, allowed_extensions):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in allowed_extensions
def check_file_size(file, max_size):
file.seek(0, os.SEEK_END)
file_size = file.tell()
file.seek(0)
return file_size <= max_size
def check_file_content(file, allowed_mime_types):
mime = magic.Magic(mime=True)
file_mime_type = mime.from_buffer(file.read(1024))
file.seek(0)
return file_mime_type in allowed_mime_types
db = SQLAlchemy(app)
bcrypt = Bcrypt(app)
db.init_app(app)
bcrypt.init_app(app)
login_manager = LoginManager(app)
login_manager.login_view = 'login'
@ -82,202 +46,9 @@ login_manager.login_view = 'login'
def load_user(user_id):
return User.query.get(int(user_id))
class Comments(db.Model):
__tablename__ = 'comments'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String, nullable=False)
image_id = db.Column(db.Integer, db.ForeignKey('image.id'), nullable=True)
video_id = db.Column(db.Integer, db.ForeignKey('video.id'), nullable=True)
comic_id = db.Column(db.Integer, db.ForeignKey('comics.id'), nullable=True)
post_id = db.Column(db.Integer, db.ForeignKey('post.id'), nullable=True)
comment_text = db.Column(db.Text, nullable=False)
comment_date = db.Column(db.DateTime, default=datetime.utcnow)
image = db.relationship('Image', back_populates='comments')
video = db.relationship('Video', back_populates='comments')
comic = db.relationship('Comic', back_populates='comments', overlaps="comic_link")
post = db.relationship('Post', backref='comments')
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(20), unique=True, nullable=False)
encrypted_password = db.Column(db.String(60), nullable=False)
ip_address = db.Column(db.String(15), nullable=True)
avatar_file = db.Column(db.String(50), nullable=True)
banner_file = db.Column(db.String(50), nullable=True)
bio = db.Column(db.Text, nullable=True)
current_item = db.Column(db.String(30), nullable=True)
def __repr__(self):
return f'<User {self.username}>'
def check_password(self, password):
return bcrypt.check_password_hash(self.encrypted_password, password)
class Image(db.Model):
__tablename__ = 'image'
id = db.Column(db.Integer, primary_key=True)
image_file = db.Column(db.String(40), nullable=False)
username = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
publication_date = db.Column(db.DateTime, default=datetime.utcnow)
tags = db.Column(db.String(100), nullable=True)
cookie_votes = db.Column(db.Integer, default=0)
comments = db.relationship('Comments', back_populates='image', cascade='all, delete-orphan')
class Votes(db.Model):
__tablename__ = 'votes'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(100), nullable=False)
image_id = db.Column(db.Integer, db.ForeignKey('image.id'), nullable=False)
image = db.relationship('Image', backref=db.backref('votes', lazy=True))
def __repr__(self):
return f'<Vote {self.username} for image {self.image_id}>'
class VideoVotes(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(120), nullable=False)
video_id = db.Column(db.Integer, db.ForeignKey('video.id'), nullable=False)
class Video(db.Model):
id = db.Column(db.Integer, primary_key=True)
video_file = db.Column(db.String(100), nullable=False)
video_name = db.Column(db.String(100), nullable=False)
video_thumbnail_file = db.Column(db.String(100), nullable=False)
username = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
publication_date = db.Column(db.DateTime, default=datetime.utcnow)
tags = db.Column(db.String(100), nullable=True)
description = db.Column(db.Text, nullable=True)
cookie_votes = db.Column(db.Integer, default=0)
comments = db.relationship('Comments', back_populates='video')
class Comic(db.Model):
__tablename__ = 'comics'
id = db.Column(db.Integer, primary_key=True)
comic_folder = db.Column(db.String(100), nullable=False)
comic_thumbnail_file = db.Column(db.String(100), nullable=False)
username = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
name = db.Column(db.String(100), nullable=False, unique=True)
publication_date = db.Column(db.DateTime, default=datetime.utcnow)
tags = db.Column(db.String(100), nullable=True)
cookie_votes = db.Column(db.Integer, default=0)
comments = db.relationship('Comments', back_populates='comic', overlaps="comic_link")
pages = db.relationship('ComicPage', back_populates='comic', cascade="all, delete-orphan")
class ComicPage(db.Model):
__tablename__ = 'comic_pages'
id = db.Column(db.Integer, primary_key=True)
comic_id = db.Column(db.Integer, db.ForeignKey('comics.id', ondelete='CASCADE'), nullable=False)
page_number = db.Column(db.Integer, nullable=False)
file_path = db.Column(db.String(200), nullable=False)
comic = db.relationship('Comic', back_populates='pages')
class ComicVotes(db.Model):
__tablename__ = 'comic_votes'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(100), nullable=False)
comic_id = db.Column(db.Integer, db.ForeignKey('comics.id'), nullable=False)
vote = db.Column(db.Integer)
comic = db.relationship('Comic', backref='votes', lazy=True)
class Cookies(db.Model):
username = db.Column(db.String(20), primary_key=True, nullable=False)
cookies = db.Column(db.Integer, default=0)
class Views(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
image_id = db.Column(db.Integer, db.ForeignKey('image.id'), nullable=True)
video_id = db.Column(db.Integer, db.ForeignKey('video.id'), nullable=True)
username = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
view_date = db.Column(db.DateTime, default=datetime.utcnow)
__table_args__ = (
db.UniqueConstraint('image_id', 'username', name='unique_image_view'),
db.UniqueConstraint('video_id', 'username', name='unique_video_view')
)
class Item(db.Model):
__tablename__ = 'items'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
item_path = db.Column(db.String, nullable=False)
price = db.Column(db.Integer, nullable=False, default=0)
visible = db.Column(db.Boolean, default=True)
def __repr__(self):
return f'<Item {self.id}: {self.item_path}, Price: {self.price}, Visible: {self.visible}>'
class UserItem(db.Model):
__tablename__ = 'user_items'
username = db.Column(db.String, db.ForeignKey('user.username'), primary_key=True)
item_id = db.Column(db.Integer, db.ForeignKey('items.id'), primary_key=True)
item = db.relationship('Item', backref=db.backref('user_items', lazy=True))
def __repr__(self):
return f'<UserItem {self.username}, Item {self.item_id}>'
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
post_date = db.Column(db.DateTime, default=datetime.utcnow)
text = db.Column(db.Text, nullable=False)
media_file = db.Column(db.String(100), nullable=True)
user = db.relationship('User', backref=db.backref('posts', lazy=True))
class Subscription(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
author_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
user = db.relationship('User', foreign_keys=[user_id], backref=db.backref('subscriptions', lazy=True))
author = db.relationship('User', foreign_keys=[author_id], backref=db.backref('followers', lazy=True))
def __repr__(self):
return f'<Subscription user_id={self.user_id} author_id={self.author_id}>'
def get_content_query(model, subscriptions, search_query):
query = model.query
if search_query:
tags = [tag.strip().lower() for tag in search_query.replace(',', ' ').split()]
filter_condition = [
model.tags.like(f'%{tag}%') for tag in tags
]
query = query.filter(
or_(*filter_condition)
)
if subscriptions:
query = query.filter(or_(
model.username.in_(subscriptions),
model.username.notin_(subscriptions)
))
query = query.order_by(
func.coalesce(model.cookie_votes, 0).desc(),
model.publication_date.desc()
)
return query
register_admin_routes(app)
app.register_blueprint(upload_bp)
app.register_blueprint(auth_bp)
@app.route('/')
def index():
@ -444,10 +215,6 @@ def view(content_type, id):
comic_pages=comic_pages if content_type == 'comic' else None
)
class EditTagsForm(FlaskForm):
tags = StringField('Tags', validators=[DataRequired(), Length(max=100)], render_kw={"placeholder": "Enter tags"})
submit = SubmitField('Save')
@app.route('/image_edit/<int:id>', methods=['GET', 'POST'])
@login_required
def image_edit(id):
@ -477,13 +244,6 @@ def image_edit(id):
image_preview_url=image_preview_url
)
class EditVideoForm(FlaskForm):
video_name = StringField('Title', validators=[DataRequired(), Length(max=100)], render_kw={"placeholder": "Enter video title"})
video_thumbnail = FileField('Thumbnail', validators=[FileAllowed(['jpg', 'jpeg', 'png', 'gif'], 'Only images!')])
description = TextAreaField('Description', validators=[DataRequired(), Length(max=500)], render_kw={"placeholder": "Enter video description"})
tags = StringField('Tags', validators=[DataRequired(), Length(max=100)], render_kw={"placeholder": "Tags"})
submit = SubmitField('Save')
@app.route('/video_edit/<int:id>', methods=['GET', 'POST'])
@login_required
def video_edit(id):
@ -552,225 +312,6 @@ def autocomplete():
return jsonify(suggestions)
def get_autocomplete_suggestions(query):
last_tag = query.split(',')[-1].strip()
all_tags = Image.query.with_entities(Image.tags).all()
unique_tags = set(tag.strip() for tags in all_tags if tags.tags for tag in tags.tags.split(','))
filtered_tags = [tag for tag in unique_tags if last_tag.lower() in tag.lower()]
return filtered_tags[:5]
class UploadForm(FlaskForm):
image_file = FileField('Choose File', validators=[DataRequired()])
tags = StringField('Tags (comma-separated)', validators=[DataRequired()])
recaptcha = RecaptchaField()
agree_with_rules = BooleanField('I agree with the publication rules',
validators=[DataRequired(message="You must agree with the publication rules.")])
submit = SubmitField('Upload')
async def convert_to_webp(image_file):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _sync_convert_to_webp, image_file)
def _sync_convert_to_webp(image_file):
with PILImage.open(image_file) as img:
output = io.BytesIO()
img.convert("RGB").save(output, format="WEBP", quality=90, optimize=True)
output.seek(0)
return output
@app.route('/upload', methods=['GET', 'POST'])
@login_required
async def upload():
form = UploadForm()
if form.validate_on_submit():
image_file = form.image_file.data
tags = form.tags.data
allowed_mime_types = {'image/png', 'image/jpeg', 'image/gif', 'image/webp'}
if not (allowed_file(image_file.filename, app.config['ALLOWED_IMAGE_EXTENSIONS']) and
check_file_content(image_file, allowed_mime_types) and
await check_file_size(image_file, app.config['MAX_IMAGE_SIZE'])):
return redirect(url_for('upload'))
unique_filename = f"{uuid.uuid4().hex}.webp"
filepath = os.path.join(app.config['UPLOAD_FOLDER']['images'], unique_filename)
if os.path.exists(filepath):
return redirect(url_for('upload'))
webp_image = await convert_to_webp(image_file)
async with aiofiles.open(filepath, 'wb') as f:
await f.write(webp_image.read())
img = Image(image_file=unique_filename, username=current_user.username, tags=tags, cookie_votes=0)
db.session.add(img)
user_cookie = Cookies.query.filter_by(username=current_user.username).first()
if user_cookie:
user_cookie.cookies += 1
else:
user_cookie = Cookies(username=current_user.username, cookies=1)
db.session.add(user_cookie)
db.session.commit()
return redirect(url_for('index'))
return render_template('upload.html', form=form)
def allowed_file(filename, allowed_extensions):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in allowed_extensions
async def check_file_size(file, max_size):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _sync_check_file_size, file, max_size)
def _sync_check_file_size(file, max_size):
file.seek(0, os.SEEK_END)
file_size = file.tell()
file.seek(0)
return file_size <= max_size
async def generate_unique_filename(filename, upload_folder):
base, ext = os.path.splitext(secure_filename(filename))
while True:
unique_filename = f"{base}_{uuid.uuid4().hex}{ext}"
file_path = os.path.join(upload_folder, unique_filename)
if not await aiofiles.os.path.exists(file_path):
return unique_filename
class UploadVideoForm(FlaskForm):
video_file = FileField('Video File', validators=[DataRequired()])
thumbnail = FileField('Thumbnail', validators=[DataRequired(), FileAllowed(['jpg', 'png', 'jpeg'])])
name = StringField('Video Name', validators=[DataRequired()])
tags = StringField('Tags', validators=[DataRequired()])
description = StringField('Description', validators=[DataRequired()])
recaptcha = RecaptchaField()
submit = SubmitField('Upload')
agree_with_rules = BooleanField('I agree with the publication rules', validators=[DataRequired()])
@app.route('/upload_video', methods=['GET', 'POST'])
@login_required
async def upload_video():
form = UploadVideoForm()
if form.validate_on_submit():
video_file = form.video_file.data
video_thumbnail = form.thumbnail.data
video_name = form.name.data
tags = form.tags.data
description = form.description.data
allowed_video_mime_types = {'video/mp4', 'video/x-msvideo', 'video/quicktime'}
allowed_image_mime_types = {'image/png', 'image/jpeg', 'image/gif', 'image/webp'}
if video_file and video_thumbnail:
if not (allowed_file(video_file.filename, app.config['ALLOWED_VIDEO_EXTENSIONS']) and
check_file_content(video_file, allowed_video_mime_types)):
return redirect(url_for('upload_video'))
if not await check_file_size(video_file, app.config['MAX_VIDEO_SIZE']):
return redirect(url_for('upload_video'))
if not (allowed_file(video_thumbnail.filename, app.config['ALLOWED_IMAGE_EXTENSIONS']) and
check_file_content(video_thumbnail, allowed_image_mime_types)):
return redirect(url_for('upload_video'))
video_filename = await generate_unique_filename(app.config['UPLOAD_FOLDER']['videos'], 'mp4')
thumbnail_filename = f"{uuid.uuid4().hex}.webp"
video_path = os.path.join(app.config['UPLOAD_FOLDER']['videos'], video_filename)
thumbnail_path = os.path.join(app.config['UPLOAD_FOLDER']['thumbnails'], thumbnail_filename)
async with aiofiles.open(video_path, 'wb') as f:
await f.write(video_file.read())
webp_thumbnail = await convert_to_webp(video_thumbnail)
async with aiofiles.open(thumbnail_path, 'wb') as f:
await f.write(webp_thumbnail.read())
video = Video(
video_file=video_filename,
video_name=video_name,
video_thumbnail_file=thumbnail_filename,
username=current_user.username,
tags=tags,
description=description
)
db.session.add(video)
db.session.commit()
return redirect(url_for('videos'))
return render_template('upload_video.html', form=form)
async def generate_unique_filename(upload_folder, extension):
while True:
unique_filename = f"{uuid.uuid4().hex}.{extension}"
file_path = os.path.join(upload_folder, unique_filename)
if not await aiofiles.os.path.exists(file_path):
return unique_filename
@app.route('/comic_upload', methods=['GET', 'POST'])
@login_required
async def comic_upload():
if request.method == 'POST':
ct = request.files['thumbnail']
n = request.form['title']
tags = request.form.get('tags', '')
allowed_image_mime_types = {'image/png', 'image/jpeg', 'image/gif', 'image/webp'}
if db.session.execute(db.select(Comic).filter_by(name=n)).scalar():
return render_template('comic_upload.html')
if ct:
if not (allowed_file(ct.filename, app.config['ALLOWED_IMAGE_EXTENSIONS']) and
check_file_content(ct, allowed_image_mime_types)):
return redirect(url_for('comic_upload'))
tf = f"{uuid.uuid4().hex}.webp"
tp = os.path.join(app.config['UPLOAD_FOLDER']['comicthumbs'], tf)
webp_thumbnail = await convert_to_webp(ct)
async with aiofiles.open(tp, 'wb') as f:
await f.write(webp_thumbnail.read())
cf = os.path.join(app.config['UPLOAD_FOLDER']['comics'], n)
os.makedirs(cf, exist_ok=True)
new_comic = Comic(
comic_folder=n,
comic_thumbnail_file=tf,
username=current_user.username,
name=n,
tags=tags
)
db.session.add(new_comic)
db.session.flush()
async def save_pages():
pages = request.files.getlist('pages[]')
for i, p in enumerate(sorted(pages, key=lambda x: x.filename), start=1):
if p:
if not (allowed_file(p.filename, app.config['ALLOWED_IMAGE_EXTENSIONS']) and
check_file_content(p, allowed_image_mime_types)):
return redirect(url_for('comic_upload'))
filename = f"{uuid.uuid4().hex}.webp"
file_path = os.path.join(cf, filename)
webp_page = await convert_to_webp(p)
async with aiofiles.open(file_path, 'wb') as f:
await f.write(webp_page.read())
new_page = ComicPage(comic_id=new_comic.id, page_number=i, file_path=file_path)
db.session.add(new_page)
db.session.commit()
await save_pages()
return redirect(url_for('comics'))
return render_template('comic_upload.html')
@app.route('/user_pubs/<pub_type>/<username>')
def user_pubs(pub_type, username):
p = request.args.get('page', 1, type=int)
@ -816,102 +357,6 @@ def user_pubs(pub_type, username):
search_query=search_query
)
class LoginForm(FlaskForm):
username = StringField('Username', validators=[DataRequired()])
password = PasswordField('Password', validators=[DataRequired()])
recaptcha = RecaptchaField()
submit = SubmitField('Login')
def get_client_ip():
if 'X-Forwarded-For' in request.headers:
forwarded_for = request.headers['X-Forwarded-For']
ip_address = forwarded_for.split(',')[0]
else:
ip_address = request.remote_addr
return ip_address
class RegistrationForm(FlaskForm):
username = StringField(
'Username',
validators=[
DataRequired(),
Length(min=3, max=20),
Regexp(r'^[a-zA-Z0-9_]+$', message="Username can contain only letters, numbers, and underscores.")
]
)
password = PasswordField('Password', validators=[DataRequired(), Length(min=6)])
confirm_password = PasswordField('Confirm Password', validators=[DataRequired(), EqualTo('password')])
recaptcha = RecaptchaField()
submit = SubmitField('Register')
def validate_username(self, username):
username.data = username.data.lower()
user = User.query.filter_by(username=username.data).first()
if user:
return
if not re.match(r'^[a-z0-9]+$', username.data):
return
def validate_ip(self):
ip_address = get_client_ip()
user_with_ip = User.query.filter_by(ip_address=ip_address).first()
if user_with_ip:
return
@app.route('/register', methods=['GET', 'POST'])
def register():
form = RegistrationForm()
if form.validate_on_submit():
hashed_password = bcrypt.generate_password_hash(form.password.data).decode('utf-8')
ip_address = get_client_ip()
existing_user = User.query.filter_by(ip_address=ip_address).first()
if existing_user:
return render_template('register.html', form=form)
username = form.username.data.lower()
user = User(username=username, encrypted_password=hashed_password, ip_address=ip_address)
try:
db.session.add(user)
db.session.commit()
return redirect(url_for('login'))
except IntegrityError:
db.session.rollback()
return render_template('register.html', form=form)
@app.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm()
if form.validate_on_submit():
user = User.query.filter_by(username=form.username.data).first()
if user and user.check_password(form.password.data):
login_user(user)
if user.ip_address is None:
ip_address = get_client_ip()
user.ip_address = ip_address
db.session.commit()
return redirect(url_for('profile', username=user.username))
return render_template('login.html', form=form)
@app.route('/logout')
def logout():
logout_user()
return redirect(url_for('index'))
@app.route('/delete/<content_type>/<int:content_id>', methods=['POST'])
@login_required
def delete(content_type, content_id):
@ -1065,9 +510,6 @@ def vote_comic(comic_id):
return redirect(url_for('view', content_type='comic', id=comic_id))
class EmptyForm(FlaskForm):
pass
@app.route('/comic_edit/<int:comic_id>', methods=['GET', 'POST'])
@login_required
async def comic_edit(comic_id):
@ -1143,38 +585,6 @@ def update_related_tables(old_username, new_username):
db.session.commit()
@app.route('/upload_post', methods=['GET', 'POST'])
@login_required
async def upload_post():
if request.method == 'POST':
post_text = request.form.get('post_text')
post_media = request.files.get('post_media')
if post_text:
new_post = Post(
username=current_user.username,
text=post_text
)
db.session.add(new_post)
if post_media and allowed_file(post_media.filename, app.config['ALLOWED_IMAGE_EXTENSIONS']):
if await check_file_size(post_media, app.config['MAX_IMAGE_SIZE']):
unique_filename = f"{uuid.uuid4().hex}.webp"
media_path = os.path.join(app.config['UPLOAD_FOLDER']['posts'], unique_filename)
webp_image = await convert_to_webp(post_media)
async with aiofiles.open(media_path, 'wb') as f:
await f.write(webp_image.read())
new_post.media_file = unique_filename
else:
return redirect(url_for('upload_post'))
db.session.commit()
return redirect(url_for('user_posts', username=current_user.username))
else:
return redirect(url_for('upload_post'))
return render_template('upload_post.html')
@app.route('/profile_edit', methods=['GET', 'POST'])
@login_required
def profile_edit():
@ -1418,211 +828,6 @@ def privacy_policy():
def terms_of_use():
return render_template('terms_of_use.html')
@app.route('/admin', methods=['GET', 'POST'])
@login_required
def admin():
if current_user.username != 'naturefie':
return redirect(url_for('index'))
form = UpdateCookiesForm()
user_cookies = {
user.id: Cookies.query.filter_by(username=user.username).first().cookies if Cookies.query.filter_by(username=user.username).first() else 0
for user in User.query.all()
}
comments = Comments.query.order_by(Comments.comment_date.desc()).all()
return render_template(
'panel.html',
arts=Image.query.all(),
comics=Comic.query.all(),
videos=Video.query.all(),
users=User.query.all(),
comments=comments,
form=form,
user_cookies=user_cookies
)
@app.route('/admin/delete/<content_type>/<int:content_id>', methods=['POST'])
@login_required
def admin_delete_content(content_type, content_id):
models = {
'art': (Image, 'arts', 'image_file', Votes, 'image_id'),
'video': (Video, 'videos', 'video_file', VideoVotes, 'video_id'),
'comic': (Comic, 'comics', 'comic_folder', ComicVotes, 'comic_id')
}
if content_type not in models:
abort(404)
model, folder, file_field, vote_model, foreign_key = models[content_type]
content = model.query.get_or_404(content_id)
vote_model.query.filter(getattr(vote_model, foreign_key) == content_id).delete()
Comments.query.filter(getattr(Comments, foreign_key) == content_id).delete()
file_path = os.path.join(app.config['UPLOAD_FOLDER'][folder], getattr(content, file_field))
if os.path.exists(file_path):
if os.path.isfile(file_path):
os.remove(file_path)
else:
shutil.rmtree(file_path)
db.session.delete(content)
db.session.commit()
return redirect(url_for('admin'))
@app.route('/admin/delete/user/<int:user_id>', methods=['POST'])
@login_required
def admin_delete_user(user_id):
user = User.query.get_or_404(user_id)
if current_user.username != 'naturefie':
return redirect(url_for('admin'))
db.session.delete(user)
db.session.commit()
return redirect(url_for('admin'))
class UpdateCookiesForm(FlaskForm):
cookies = StringField('Количество печенек', validators=[DataRequired()])
submit = SubmitField('Применить')
@app.route('/admin/update_comment/<int:comment_id>', methods=['POST'])
@login_required
def admin_update_comment(comment_id):
comment = Comments.query.get_or_404(comment_id)
if current_user.username != 'naturefie':
abort(403)
new_text = request.form.get('comment_text', '').strip()
if not new_text:
pass
return redirect(url_for('admin'))
comment.comment_text = new_text
try:
db.session.commit()
print(f"Updated comment ID {comment_id}: {comment.comment_text}")
except Exception as e:
db.session.rollback()
print(f"Error updating comment: {e}")
return redirect(url_for('admin'))
@app.route('/admin/delete_comment/<int:comment_id>', methods=['POST'])
@login_required
def admin_delete_comment(comment_id):
comment = Comments.query.get_or_404(comment_id)
if current_user.username != 'naturefie':
abort(403)
db.session.delete(comment)
db.session.commit()
return redirect(url_for('admin'))
@app.route('/admin/update_cookies/<int:user_id>', methods=['POST'])
@login_required
def admin_update_cookies(user_id):
user = User.query.get_or_404(user_id)
if request.method == 'POST':
new_cookie_count = request.form.get('cookies', type=int)
if new_cookie_count is not None and new_cookie_count >= 0:
user_cookies = Cookies.query.filter_by(username=user.username).first()
if not user_cookies:
user_cookies = Cookies(username=user.username, cookies=new_cookie_count)
db.session.add(user_cookies)
else:
user_cookies.cookies = new_cookie_count
db.session.commit()
return redirect(url_for('admin'))
@app.route('/admin/update_video/<int:content_id>', methods=['POST'])
@login_required
def admin_update_video(content_id):
video = Video.query.get_or_404(content_id)
if current_user.username != 'naturefie':
return redirect(url_for('admin'))
new_video_name = request.form.get('video_name')
new_description = request.form.get('description')
new_tags = request.form.get('tags')
if new_video_name and new_video_name != video.video_name:
if len(new_video_name) < 3 or len(new_video_name) > 100:
return redirect(url_for('admin'))
video.video_name = new_video_name
if new_description:
video.description = new_description
if new_tags:
video.tags = new_tags
db.session.commit()
return redirect(url_for('admin'))
@app.route('/admin/update_user/<int:user_id>', methods=['POST'])
@login_required
def admin_update_user(user_id):
user = User.query.get_or_404(user_id)
if current_user.username != 'naturefie':
return redirect(url_for('admin'))
new_username = request.form.get('username')
new_password = request.form.get('password')
if new_username and new_username != user.username:
if len(new_username) < 3 or len(new_username) > 20:
return redirect(url_for('admin'))
if User.query.filter_by(username=new_username).first():
return redirect(url_for('admin'))
old_username = user.username
user.username = new_username
update_related_tables(old_username, new_username)
if new_password:
if len(new_password) < 6:
return redirect(url_for('admin'))
hashed_password = bcrypt.generate_password_hash(new_password).decode('utf-8')
user.encrypted_password = hashed_password
db.session.commit()
return redirect(url_for('admin'))
@app.route('/admin/update_tags/<content_type>/<int:content_id>', methods=['POST'])
@login_required
def admin_update_tags(content_type, content_id):
models = {
'art': Image,
'video': Video,
'comic': Comic
}
if content_type not in models:
abort(404)
model = models[content_type]
content = model.query.get_or_404(content_id)
new_tags = request.form.get('tags', '').strip()
content.tags = new_tags
db.session.commit()
return redirect(url_for('admin'))
@app.route('/publication_rules')
def publication_rules():
return render_template('publication_rules.html')
@ -1657,4 +862,4 @@ def buy_item(item_id):
if __name__ == '__main__':
with app.app_context():
db.create_all()
app.run(debug=False)
app.run(debug=True)

66
auth.py Normal file
View File

@ -0,0 +1,66 @@
from flask import Blueprint, render_template, redirect, url_for, request
from flask_login import login_user, logout_user, login_required, current_user
from sqlalchemy.exc import IntegrityError
from models import db, User
from utils import get_client_ip
from models import RegistrationForm, LoginForm, PasswordField, RecaptchaField, SubmitField
from flask_bcrypt import Bcrypt
from wtforms.validators import DataRequired, Length, EqualTo
auth_bp = Blueprint('auth', __name__)
bcrypt = Bcrypt()
password = PasswordField('Password', validators=[DataRequired(), Length(min=6)])
confirm_password = PasswordField('Confirm Password', validators=[DataRequired(), EqualTo('password')])
recaptcha = RecaptchaField()
submit = SubmitField('Register')
@auth_bp.route('/register', methods=['GET', 'POST'])
def register():
form = RegistrationForm()
if form.validate_on_submit():
hashed_password = bcrypt.generate_password_hash(form.password.data).decode('utf-8')
ip_address = get_client_ip()
existing_user = User.query.filter_by(ip_address=ip_address).first()
if existing_user:
return render_template('register.html', form=form)
username = form.username.data.lower()
user = User(username=username, encrypted_password=hashed_password, ip_address=ip_address)
try:
db.session.add(user)
db.session.commit()
return redirect(url_for('auth.login'))
except IntegrityError:
db.session.rollback()
return render_template('register.html', form=form)
@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm()
if form.validate_on_submit():
user = User.query.filter_by(username=form.username.data).first()
if user and user.check_password(form.password.data):
login_user(user)
if user.ip_address is None:
ip_address = get_client_ip()
user.ip_address = ip_address
db.session.commit()
return redirect(url_for('profile', username=user.username))
return render_template('login.html', form=form)
@auth_bp.route('/logout')
def logout():
logout_user()
return redirect(url_for('index'))

27
config.py Normal file
View File

@ -0,0 +1,27 @@
import os
from dotenv import load_dotenv, find_dotenv
dotenv_path = find_dotenv()
load_dotenv(dotenv_path, override=True)
class Config:
SECRET_KEY = os.getenv('SECRET_KEY')
WTF_CSRF_ENABLED = True
RECAPTCHA_PUBLIC_KEY = os.getenv('RECAPTCHA_PUBLIC_KEY')
RECAPTCHA_PRIVATE_KEY = os.getenv('RECAPTCHA_PRIVATE_KEY')
SQLALCHEMY_DATABASE_URI = os.getenv('SQLALCHEMY_DATABASE_URI')
UPLOAD_FOLDER = {
'images': 'static/arts/',
'arts': 'static/arts/',
'videos': 'static/videos/',
'thumbnails': 'static/thumbnails/',
'avatars': 'static/avatars/',
'banners': 'static/banners/',
'comics': 'static/comics',
'comicthumbs': 'static/comicthumbs/',
'posts': 'static/posts/'
}
ALLOWED_IMAGE_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
ALLOWED_VIDEO_EXTENSIONS = {'mp4', 'avi', 'mov'}
MAX_IMAGE_SIZE = 15 * 1024 * 1024
MAX_VIDEO_SIZE = 10 * 1024 * 1024 * 1024

259
models.py Normal file
View File

@ -0,0 +1,259 @@
import os
import io
import re
import uuid
import shutil
import random
import asyncio
from datetime import datetime
import requests
from PIL import Image as PILImage
from sqlalchemy.exc import IntegrityError
from werkzeug.exceptions import BadRequest
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
from flask import Flask, abort, render_template, redirect, url_for, request, flash, session, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_bcrypt import Bcrypt
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from flask_wtf import FlaskForm, RecaptchaField, CSRFProtect
from flask_wtf.file import FileAllowed
from flask_wtf.csrf import validate_csrf
from wtforms import StringField, PasswordField, SubmitField, FileField, BooleanField, RadioField, SelectField, TextAreaField
from wtforms.validators import DataRequired, Length, EqualTo, ValidationError, Regexp
from dotenv import load_dotenv, find_dotenv
import aiofiles.os
from sqlalchemy import func, or_
import magic
from config import Config
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, SubmitField, FileField, BooleanField
from wtforms.validators import DataRequired
from flask_wtf import RecaptchaField
from flask_wtf.file import FileAllowed
db = SQLAlchemy()
bcrypt = Bcrypt()
class Comments(db.Model):
__tablename__ = 'comments'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String, nullable=False)
image_id = db.Column(db.Integer, db.ForeignKey('image.id'), nullable=True)
video_id = db.Column(db.Integer, db.ForeignKey('video.id'), nullable=True)
comic_id = db.Column(db.Integer, db.ForeignKey('comics.id'), nullable=True)
post_id = db.Column(db.Integer, db.ForeignKey('post.id'), nullable=True)
comment_text = db.Column(db.Text, nullable=False)
comment_date = db.Column(db.DateTime, default=datetime.utcnow)
image = db.relationship('Image', back_populates='comments')
video = db.relationship('Video', back_populates='comments')
comic = db.relationship('Comic', back_populates='comments', overlaps="comic_link")
post = db.relationship('Post', backref='comments')
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(20), unique=True, nullable=False)
encrypted_password = db.Column(db.String(60), nullable=False)
ip_address = db.Column(db.String(15), nullable=True)
avatar_file = db.Column(db.String(50), nullable=True)
banner_file = db.Column(db.String(50), nullable=True)
bio = db.Column(db.Text, nullable=True)
current_item = db.Column(db.String(30), nullable=True)
def __repr__(self):
return f'<User {self.username}>'
def check_password(self, password):
return bcrypt.check_password_hash(self.encrypted_password, password)
class Image(db.Model):
__tablename__ = 'image'
id = db.Column(db.Integer, primary_key=True)
image_file = db.Column(db.String(40), nullable=False)
username = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
publication_date = db.Column(db.DateTime, default=datetime.utcnow)
tags = db.Column(db.String(100), nullable=True)
cookie_votes = db.Column(db.Integer, default=0)
comments = db.relationship('Comments', back_populates='image', cascade='all, delete-orphan')
class Votes(db.Model):
__tablename__ = 'votes'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(100), nullable=False)
image_id = db.Column(db.Integer, db.ForeignKey('image.id'), nullable=False)
image = db.relationship('Image', backref=db.backref('votes', lazy=True))
def __repr__(self):
return f'<Vote {self.username} for image {self.image_id}>'
class VideoVotes(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(120), nullable=False)
video_id = db.Column(db.Integer, db.ForeignKey('video.id'), nullable=False)
class Video(db.Model):
id = db.Column(db.Integer, primary_key=True)
video_file = db.Column(db.String(100), nullable=False)
video_name = db.Column(db.String(100), nullable=False)
video_thumbnail_file = db.Column(db.String(100), nullable=False)
username = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
publication_date = db.Column(db.DateTime, default=datetime.utcnow)
tags = db.Column(db.String(100), nullable=True)
description = db.Column(db.Text, nullable=True)
cookie_votes = db.Column(db.Integer, default=0)
comments = db.relationship('Comments', back_populates='video')
class Comic(db.Model):
__tablename__ = 'comics'
id = db.Column(db.Integer, primary_key=True)
comic_folder = db.Column(db.String(100), nullable=False)
comic_thumbnail_file = db.Column(db.String(100), nullable=False)
username = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
name = db.Column(db.String(100), nullable=False, unique=True)
publication_date = db.Column(db.DateTime, default=datetime.utcnow)
tags = db.Column(db.String(100), nullable=True)
cookie_votes = db.Column(db.Integer, default=0)
comments = db.relationship('Comments', back_populates='comic', overlaps="comic_link")
pages = db.relationship('ComicPage', back_populates='comic', cascade="all, delete-orphan")
class ComicPage(db.Model):
__tablename__ = 'comic_pages'
id = db.Column(db.Integer, primary_key=True)
comic_id = db.Column(db.Integer, db.ForeignKey('comics.id', ondelete='CASCADE'), nullable=False)
page_number = db.Column(db.Integer, nullable=False)
file_path = db.Column(db.String(200), nullable=False)
comic = db.relationship('Comic', back_populates='pages')
class ComicVotes(db.Model):
__tablename__ = 'comic_votes'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(100), nullable=False)
comic_id = db.Column(db.Integer, db.ForeignKey('comics.id'), nullable=False)
vote = db.Column(db.Integer)
comic = db.relationship('Comic', backref='votes', lazy=True)
class Cookies(db.Model):
username = db.Column(db.String(20), primary_key=True, nullable=False)
cookies = db.Column(db.Integer, default=0)
class Views(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
image_id = db.Column(db.Integer, db.ForeignKey('image.id'), nullable=True)
video_id = db.Column(db.Integer, db.ForeignKey('video.id'), nullable=True)
username = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
view_date = db.Column(db.DateTime, default=datetime.utcnow)
__table_args__ = (
db.UniqueConstraint('image_id', 'username', name='unique_image_view'),
db.UniqueConstraint('video_id', 'username', name='unique_video_view')
)
class Item(db.Model):
__tablename__ = 'items'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
item_path = db.Column(db.String, nullable=False)
price = db.Column(db.Integer, nullable=False, default=0)
visible = db.Column(db.Boolean, default=True)
def __repr__(self):
return f'<Item {self.id}: {self.item_path}, Price: {self.price}, Visible: {self.visible}>'
class UserItem(db.Model):
__tablename__ = 'user_items'
username = db.Column(db.String, db.ForeignKey('user.username'), primary_key=True)
item_id = db.Column(db.Integer, db.ForeignKey('items.id'), primary_key=True)
item = db.relationship('Item', backref=db.backref('user_items', lazy=True))
def __repr__(self):
return f'<UserItem {self.username}, Item {self.item_id}>'
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
post_date = db.Column(db.DateTime, default=datetime.utcnow)
text = db.Column(db.Text, nullable=False)
media_file = db.Column(db.String(100), nullable=True)
user = db.relationship('User', backref=db.backref('posts', lazy=True))
class Subscription(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
author_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
user = db.relationship('User', foreign_keys=[user_id], backref=db.backref('subscriptions', lazy=True))
author = db.relationship('User', foreign_keys=[author_id], backref=db.backref('followers', lazy=True))
def __repr__(self):
return f'<Subscription user_id={self.user_id} author_id={self.author_id}>'
class UploadForm(FlaskForm):
image_file = FileField('Choose File', validators=[DataRequired()])
tags = StringField('Tags (comma-separated)', validators=[DataRequired()])
recaptcha = RecaptchaField()
agree_with_rules = BooleanField('I agree with the publication rules',
validators=[DataRequired(message="You must agree with the publication rules.")])
submit = SubmitField('Upload')
class UploadVideoForm(FlaskForm):
video_file = FileField('Video File', validators=[DataRequired()])
thumbnail = FileField('Thumbnail', validators=[DataRequired(), FileAllowed(['jpg', 'png', 'jpeg'])])
name = StringField('Video Name', validators=[DataRequired()])
tags = StringField('Tags', validators=[DataRequired()])
description = StringField('Description', validators=[DataRequired()])
recaptcha = RecaptchaField()
submit = SubmitField('Upload')
agree_with_rules = BooleanField('I agree with the publication rules', validators=[DataRequired()])
class EditTagsForm(FlaskForm):
tags = StringField('Tags', validators=[DataRequired(), Length(max=100)], render_kw={"placeholder": "Enter tags"})
submit = SubmitField('Save')
class EditVideoForm(FlaskForm):
video_name = StringField('Title', validators=[DataRequired(), Length(max=100)], render_kw={"placeholder": "Enter video title"})
video_thumbnail = FileField('Thumbnail', validators=[FileAllowed(['jpg', 'jpeg', 'png', 'gif'], 'Only images!')])
description = TextAreaField('Description', validators=[DataRequired(), Length(max=500)], render_kw={"placeholder": "Enter video description"})
tags = StringField('Tags', validators=[DataRequired(), Length(max=100)], render_kw={"placeholder": "Tags"})
submit = SubmitField('Save')
class LoginForm(FlaskForm):
username = StringField('Username', validators=[DataRequired()])
password = PasswordField('Password', validators=[DataRequired()])
recaptcha = RecaptchaField()
submit = SubmitField('Login')
class RegistrationForm(FlaskForm):
username = StringField(
'Username',
validators=[
DataRequired(),
Length(min=3, max=20),
Regexp(r'^[a-zA-Z0-9_]+$', message="Username can contain only letters, numbers, and underscores.")
]
)
password = PasswordField('Password', validators=[DataRequired(), Length(min=6)])
confirm_password = PasswordField('Confirm Password', validators=[DataRequired(), EqualTo('password')])
recaptcha = RecaptchaField()
submit = SubmitField('Register')
class EmptyForm(FlaskForm):
pass
class UpdateCookiesForm(FlaskForm):
cookies = StringField('Количество печенек', validators=[DataRequired()])
submit = SubmitField('Применить')

Binary file not shown.

View File

@ -18,10 +18,10 @@
<div class="link-container">
{% if title == 'Register' %}
<span class="link-text">Already have an account?</span>
<a href="{{ url_for('login') }}" class="button">Login</a>
<a href="{{ url_for('auth.login') }}" class="button">Login</a>
{% elif title == 'Login' %}
<span class="link-text">Don't have an account?</span>
<a href="{{ url_for('register') }}" class="button">Register</a>
<a href="{{ url_for('auth.register') }}" class="button">Register</a>
{% endif %}
</div>
</div>

View File

@ -12,7 +12,7 @@
</head>
<body>
<h1>Upload Comic</h1>
<form class="upload-comic-form" method="POST" action="{{ url_for('comic_upload') }}" enctype="multipart/form-data">
<form class="upload-comic-form" method="POST" action="{{ url_for('upload.comic_upload') }}" enctype="multipart/form-data">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<div class="form-group">

View File

@ -1,6 +1,6 @@
{% extends "auth.html" %}
{% block title %}🫐login - artberry🫐{% endblock %}
{% set action = 'login' %}
{% set action = 'auth.login' %}
{% block content %}
<form method="POST" action="{{ url_for(action) }}">
@ -30,6 +30,6 @@
<div class="link-container">
<span class="link-text">Don't have an account?</span>
<a href="{{ url_for('register') }}" class="button">Register</a>
<a href="{{ url_for('auth.register') }}" class="button">Register</a>
</div>
{% endblock %}

View File

@ -1,6 +1,5 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0, user-scalable=no">
<meta charset="UTF-8">
@ -14,7 +13,6 @@
}
</style>
</head>
<body>
<div class="banner">
<img src="{{ url_for('static', filename='banners/' + (user.banner_file if user.banner_file else 'default-banner.png')) }}"
@ -32,7 +30,6 @@
{% endif %}
</div>
</div>
<div class="bio">
<p class="biotext">{{ user.username }}</p>
<p class="biotext">{{ user.bio }}</p>
@ -41,7 +38,7 @@
<a href="{{ url_for('user_posts', username=user.username) }}" class="button">Posts</a>
{% if is_current_user %}
<a href="{{ url_for('profile_edit') }}" class="button">Edit Profile</a>
<a href="{{ url_for('upload_post') }}" class="button">Upload Post</a>
<a href="{{ url_for('upload.upload_post') }}" class="button">Upload Post</a>
{% endif %}
{% if current_user.is_authenticated and current_user.username != user.username %}
@ -125,8 +122,5 @@
<p>No published comics.</p>
{% endif %}
</section>
</body>
</html>

View File

@ -1,8 +1,6 @@
<link rel="icon" href="{{ url_for('static', filename='artberry.ico') }}" type="image/x-icon">
{% extends "auth.html" %}
{% block title %}🫐register - artberry🫐{% endblock %}
{% set action = 'register' %}
{% set action = 'auth.register' %}
{% block content %}
<form method="POST" action="{{ url_for(action) }}">
{{ form.hidden_tag() }}
@ -23,9 +21,8 @@
{{ form.submit(class="login-button button", value="Register") }}
</form>
<div class="link-container">
<span class="link-text">Already have an account?</span>
<a href="{{ url_for('login') }}" class="button">Login</a>
<a href="{{ url_for('auth.login') }}" class="button">Login</a>
</div>
{% endblock %}

View File

@ -14,7 +14,7 @@
<h1>Upload Art</h1>
</header>
<main>
<form method="POST" action="{{ url_for('upload') }}" enctype="multipart/form-data" class="upload-form">
<form method="POST" action="{{ url_for('upload.upload') }}" enctype="multipart/form-data" class="upload-form">
{{ form.hidden_tag() }}
<div class="form-group">
<label for="image_file">{{ form.image_file.label }}</label>

View File

@ -10,10 +10,10 @@
</head>
<body>
<h2>Create New Post</h2>
<form method="POST" action="{{ url_for('upload_post') }}" enctype="multipart/form-data">
<form method="POST" action="{{ url_for('upload.upload_post') }}" enctype="multipart/form-data">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<label for="post_text">Post Text:</label><br>
<label for="post_text">Post Text</label><br>
<textarea id="post_text" name="post_text" class="input-field" placeholder="Write your post..." maxlength="128" required></textarea><br>
<label for="post_media">Upload Media (Image/GIF):</label><br>

View File

@ -15,7 +15,7 @@
<h1>Upload Video</h1>
</header>
<main>
<form method="POST" action="{{ url_for('upload_video') }}" enctype="multipart/form-data" class="upload-video-form">
<form method="POST" action="{{ url_for('upload.upload_video') }}" enctype="multipart/form-data" class="upload-video-form">
{{ form.hidden_tag() }}
<div class="form-group">

View File

@ -14,12 +14,12 @@
{{ form.hidden_tag() }}
<div class="form-group">
<label for="video_name">Название видео</label>
<label for="video_name">Video Name</label>
{{ form.video_name(class="input-field") }}
</div>
<div class="form-group">
<label for="video_thumbnail">Обложка видео</label>
<label for="video_thumbnail">Video Thumbnail</label>
{{ form.video_thumbnail(class="file-input") }}
{% if video.video_thumbnail_file %}
<img src="{{ url_for('static', filename='thumbnails/' + video.video_thumbnail_file) }}" alt="Thumbnail preview" style="display: flex;margin: 0 auto;">

192
upload.py Normal file
View File

@ -0,0 +1,192 @@
import os
import uuid
import aiofiles
from flask import Blueprint, render_template, redirect, url_for, request, current_app
from flask_login import login_required, current_user
from werkzeug.utils import secure_filename
from models import db, Image, Video, Comic, ComicPage, Post, Cookies, UploadForm, UploadVideoForm
from utils import allowed_file, check_file_content, check_file_size, convert_to_webp, generate_unique_filename
upload_bp = Blueprint('upload', __name__)
@upload_bp.route('/upload', methods=['GET', 'POST'])
@login_required
async def upload():
form = UploadForm()
if form.validate_on_submit():
image_file = form.image_file.data
tags = form.tags.data
allowed_mime_types = {'image/png', 'image/jpeg', 'image/gif', 'image/webp'}
if not (allowed_file(image_file.filename, current_app.config['ALLOWED_IMAGE_EXTENSIONS']) and
check_file_content(image_file, allowed_mime_types) and
await check_file_size(image_file, current_app.config['MAX_IMAGE_SIZE'])):
return redirect(url_for('upload.upload'))
unique_filename = f"{uuid.uuid4().hex}.webp"
filepath = os.path.join(current_app.config['UPLOAD_FOLDER']['images'], unique_filename)
if os.path.exists(filepath):
return redirect(url_for('upload.upload'))
webp_image = await convert_to_webp(image_file)
async with aiofiles.open(filepath, 'wb') as f:
await f.write(webp_image.read())
img = Image(image_file=unique_filename, username=current_user.username, tags=tags, cookie_votes=0)
db.session.add(img)
user_cookie = Cookies.query.filter_by(username=current_user.username).first()
if user_cookie:
user_cookie.cookies += 1
else:
user_cookie = Cookies(username=current_user.username, cookies=1)
db.session.add(user_cookie)
db.session.commit()
return redirect(url_for('index'))
return render_template('upload.html', form=form)
@upload_bp.route('/upload_video', methods=['GET', 'POST'])
@login_required
async def upload_video():
form = UploadVideoForm()
if form.validate_on_submit():
video_file = form.video_file.data
video_thumbnail = form.thumbnail.data
video_name = form.name.data
tags = form.tags.data
description = form.description.data
allowed_video_mime_types = {'video/mp4', 'video/x-msvideo', 'video/quicktime'}
allowed_image_mime_types = {'image/png', 'image/jpeg', 'image/gif', 'image/webp'}
if video_file and video_thumbnail:
if not (allowed_file(video_file.filename, current_app.config['ALLOWED_VIDEO_EXTENSIONS']) and
check_file_content(video_file, allowed_video_mime_types)):
return redirect(url_for('upload.upload_video'))
if not await check_file_size(video_file, current_app.config['MAX_VIDEO_SIZE']):
return redirect(url_for('upload.upload_video'))
if not (allowed_file(video_thumbnail.filename, current_app.config['ALLOWED_IMAGE_EXTENSIONS']) and
check_file_content(video_thumbnail, allowed_image_mime_types)):
return redirect(url_for('upload.upload_video'))
video_filename = await generate_unique_filename(current_app.config['UPLOAD_FOLDER']['videos'], 'mp4')
thumbnail_filename = f"{uuid.uuid4().hex}.webp"
video_path = os.path.join(current_app.config['UPLOAD_FOLDER']['videos'], video_filename)
thumbnail_path = os.path.join(current_app.config['UPLOAD_FOLDER']['thumbnails'], thumbnail_filename)
async with aiofiles.open(video_path, 'wb') as f:
await f.write(video_file.read())
webp_thumbnail = await convert_to_webp(video_thumbnail)
async with aiofiles.open(thumbnail_path, 'wb') as f:
await f.write(webp_thumbnail.read())
video = Video(
video_file=video_filename,
video_name=video_name,
video_thumbnail_file=thumbnail_filename,
username=current_user.username,
tags=tags,
description=description
)
db.session.add(video)
db.session.commit()
return redirect(url_for('videos'))
return render_template('upload_video.html', form=form)
@upload_bp.route('/comic_upload', methods=['GET', 'POST'])
@login_required
async def comic_upload():
if request.method == 'POST':
ct = request.files['thumbnail']
n = request.form['title']
tags = request.form.get('tags', '')
allowed_image_mime_types = {'image/png', 'image/jpeg', 'image/gif', 'image/webp'}
if db.session.execute(db.select(Comic).filter_by(name=n)).scalar():
return render_template('comic_upload.html')
if ct:
if not (allowed_file(ct.filename, current_app.config['ALLOWED_IMAGE_EXTENSIONS']) and
check_file_content(ct, allowed_image_mime_types)):
return redirect(url_for('upload.comic_upload'))
tf = f"{uuid.uuid4().hex}.webp"
tp = os.path.join(current_app.config['UPLOAD_FOLDER']['comicthumbs'], tf)
webp_thumbnail = await convert_to_webp(ct)
async with aiofiles.open(tp, 'wb') as f:
await f.write(webp_thumbnail.read())
cf = os.path.join(current_app.config['UPLOAD_FOLDER']['comics'], n)
os.makedirs(cf, exist_ok=True)
new_comic = Comic(
comic_folder=n,
comic_thumbnail_file=tf,
username=current_user.username,
name=n,
tags=tags
)
db.session.add(new_comic)
db.session.flush()
async def save_pages():
pages = request.files.getlist('pages[]')
for i, p in enumerate(sorted(pages, key=lambda x: x.filename), start=1):
if p:
if not (allowed_file(p.filename, current_app.config['ALLOWED_IMAGE_EXTENSIONS']) and
check_file_content(p, allowed_image_mime_types)):
return redirect(url_for('upload.comic_upload'))
filename = f"{uuid.uuid4().hex}.webp"
file_path = os.path.join(cf, filename)
webp_page = await convert_to_webp(p)
async with aiofiles.open(file_path, 'wb') as f:
await f.write(webp_page.read())
new_page = ComicPage(comic_id=new_comic.id, page_number=i, file_path=file_path)
db.session.add(new_page)
db.session.commit()
await save_pages()
return redirect(url_for('comics'))
return render_template('comic_upload.html')
@upload_bp.route('/upload_post', methods=['GET', 'POST'])
@login_required
async def upload_post():
if request.method == 'POST':
post_text = request.form.get('post_text')
post_media = request.files.get('post_media')
if post_text:
new_post = Post(
username=current_user.username,
text=post_text
)
db.session.add(new_post)
if post_media and allowed_file(post_media.filename, current_app.config['ALLOWED_IMAGE_EXTENSIONS']):
if await check_file_size(post_media, current_app.config['MAX_IMAGE_SIZE']):
unique_filename = f"{uuid.uuid4().hex}.webp"
media_path = os.path.join(current_app.config['UPLOAD_FOLDER']['posts'], unique_filename)
webp_image = await convert_to_webp(post_media)
async with aiofiles.open(media_path, 'wb') as f:
await f.write(webp_image.read())
new_post.media_file = unique_filename
else:
return redirect(url_for('upload.upload_post'))
db.session.commit()
return redirect(url_for('user_posts', username=current_user.username))
else:
return redirect(url_for('upload.upload_post'))
return render_template('upload_post.html')

134
utils.py Normal file
View File

@ -0,0 +1,134 @@
import os
import io
import uuid
import aiofiles
import asyncio
import magic
import re
from PIL import Image as PILImage
from werkzeug.utils import secure_filename
from sqlalchemy import func, or_
from flask import request
from models import db, Comments, Image, Video, Comic, Post, User
def allowed_file(filename, allowed_extensions):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in allowed_extensions
async def check_file_size(file, max_size):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _sync_check_file_size, file, max_size)
def _sync_check_file_size(file, max_size):
file.seek(0, os.SEEK_END)
file_size = file.tell()
file.seek(0)
return file_size <= max_size
def check_file_content(file, allowed_mime_types):
mime = magic.Magic(mime=True)
file_mime_type = mime.from_buffer(file.read(1024))
file.seek(0)
return file_mime_type in allowed_mime_types
async def convert_to_webp(image_file):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _sync_convert_to_webp, image_file)
def _sync_convert_to_webp(image_file):
with PILImage.open(image_file) as img:
output = io.BytesIO()
img.convert("RGB").save(output, format="WEBP", quality=90, optimize=True)
output.seek(0)
return output
async def generate_unique_filename(upload_folder, extension):
while True:
unique_filename = f"{uuid.uuid4().hex}.{extension}"
file_path = os.path.join(upload_folder, unique_filename)
if not await aiofiles.os.path.exists(file_path):
return unique_filename
def update_related_tables(old_username, new_username):
models_to_update = [Comments, Image, Video, Comic, Post]
for model in models_to_update:
for record in model.query.filter_by(username=old_username).all():
record.username = new_username
db.session.commit()
def get_content_query(model, subscriptions, search_query):
query = model.query
if search_query:
tags = [tag.strip().lower() for tag in search_query.replace(',', ' ').split()]
filter_condition = [
model.tags.like(f'%{tag}%') for tag in tags
]
query = query.filter(
or_(*filter_condition)
)
if subscriptions:
query = query.filter(or_(
model.username.in_(subscriptions),
model.username.notin_(subscriptions)
))
query = query.order_by(
func.coalesce(model.cookie_votes, 0).desc(),
model.publication_date.desc()
)
return query
def _sync_check_file_size(file, max_size):
file.seek(0, os.SEEK_END)
file_size = file.tell()
file.seek(0)
return file_size <= max_size
async def generate_unique_filename(filename, upload_folder):
base, ext = os.path.splitext(secure_filename(filename))
while True:
unique_filename = f"{base}_{uuid.uuid4().hex}{ext}"
file_path = os.path.join(upload_folder, unique_filename)
if not await aiofiles.os.path.exists(file_path):
return unique_filename
def get_client_ip():
if 'X-Forwarded-For' in request.headers:
forwarded_for = request.headers['X-Forwarded-For']
ip_address = forwarded_for.split(',')[0]
else:
ip_address = request.remote_addr
return ip_address
def validate_username(self, username):
username.data = username.data.lower()
user = User.query.filter_by(username=username.data).first()
if user:
return
if not re.match(r'^[a-z0-9]+$', username.data):
return
def validate_ip(self):
ip_address = get_client_ip()
user_with_ip = User.query.filter_by(ip_address=ip_address).first()
if user_with_ip:
return
def get_autocomplete_suggestions(query):
last_tag = query.split(',')[-1].strip()
all_tags = Image.query.with_entities(Image.tags).all()
unique_tags = set(tag.strip() for tags in all_tags if tags.tags for tag in tags.tags.split(','))
filtered_tags = [tag for tag in unique_tags if last_tag.lower() in tag.lower()]
return filtered_tags[:5]