1625 lines
59 KiB
Python
1625 lines
59 KiB
Python
import os
|
|
import io
|
|
import re
|
|
import uuid
|
|
import shutil
|
|
import random
|
|
import asyncio
|
|
from datetime import datetime
|
|
import requests
|
|
from PIL import Image as PILImage
|
|
from sqlalchemy.exc import IntegrityError
|
|
from werkzeug.exceptions import BadRequest
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from werkzeug.utils import secure_filename
|
|
from flask import Flask, abort, render_template, redirect, url_for, request, flash, session, jsonify
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_bcrypt import Bcrypt
|
|
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
|
|
from flask_wtf import FlaskForm, RecaptchaField, CSRFProtect
|
|
from flask_wtf.file import FileAllowed
|
|
from flask_wtf.csrf import validate_csrf
|
|
from wtforms import StringField, PasswordField, SubmitField, FileField,BooleanField, RadioField, SelectField, TextAreaField
|
|
from wtforms.validators import DataRequired, Length, EqualTo, ValidationError, Regexp
|
|
from wtforms import StringField, PasswordField, SubmitField
|
|
from wtforms.validators import DataRequired, EqualTo, Regexp
|
|
from flask_wtf import FlaskForm, RecaptchaField
|
|
from dotenv import load_dotenv, find_dotenv
|
|
import aiofiles.os
|
|
from sqlalchemy import func, or_
|
|
|
|
dotenv_path = find_dotenv()
|
|
load_dotenv(dotenv_path, override=True)
|
|
|
|
app = Flask(__name__)
|
|
csrf = CSRFProtect(app)
|
|
|
|
app.config.update(
|
|
SECRET_KEY=os.getenv('SECRET_KEY'),
|
|
WTF_CSRF_ENABLED=True,
|
|
RECAPTCHA_PUBLIC_KEY=os.getenv('RECAPTCHA_PUBLIC_KEY'),
|
|
RECAPTCHA_PRIVATE_KEY=os.getenv('RECAPTCHA_PRIVATE_KEY'),
|
|
SQLALCHEMY_DATABASE_URI=os.getenv('SQLALCHEMY_DATABASE_URI'),
|
|
UPLOAD_FOLDER={
|
|
'images': 'static/arts/',
|
|
'arts': 'static/arts/',
|
|
'videos': 'static/videos/',
|
|
'thumbnails': 'static/thumbnails/',
|
|
'avatars': 'static/avatars/',
|
|
'banners': 'static/banners/',
|
|
'comics': 'static/comics',
|
|
'comicthumbs': 'static/comicthumbs/',
|
|
'posts': 'static/posts/'
|
|
},
|
|
ALLOWED_IMAGE_EXTENSIONS={'png', 'jpg', 'jpeg', 'gif', 'webp'},
|
|
ALLOWED_VIDEO_EXTENSIONS={'mp4', 'avi', 'mov'},
|
|
MAX_IMAGE_SIZE=15 * 1024 * 1024,
|
|
MAX_VIDEO_SIZE=10 * 1024 * 1024 * 1024
|
|
)
|
|
|
|
def allowed_file(filename, allowed_extensions):
|
|
return '.' in filename and filename.rsplit('.', 1)[1].lower() in allowed_extensions
|
|
|
|
def check_file_size(file, max_size):
|
|
file.seek(0, os.SEEK_END)
|
|
file_size = file.tell()
|
|
file.seek(0)
|
|
return file_size <= max_size
|
|
|
|
db = SQLAlchemy(app)
|
|
bcrypt = Bcrypt(app)
|
|
login_manager = LoginManager(app)
|
|
login_manager.login_view = 'login'
|
|
|
|
@login_manager.user_loader
|
|
def load_user(user_id):
|
|
return User.query.get(int(user_id))
|
|
|
|
class Comments(db.Model):
|
|
__tablename__ = 'comments'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
username = db.Column(db.String, nullable=False)
|
|
image_id = db.Column(db.Integer, db.ForeignKey('image.id'), nullable=True)
|
|
video_id = db.Column(db.Integer, db.ForeignKey('video.id'), nullable=True)
|
|
comic_id = db.Column(db.Integer, db.ForeignKey('comics.id'), nullable=True)
|
|
post_id = db.Column(db.Integer, db.ForeignKey('post.id'), nullable=True)
|
|
comment_text = db.Column(db.Text, nullable=False)
|
|
comment_date = db.Column(db.DateTime, default=datetime.utcnow)
|
|
|
|
image = db.relationship('Image', back_populates='comments')
|
|
video = db.relationship('Video', back_populates='comments')
|
|
comic = db.relationship('Comic', back_populates='comments', overlaps="comic_link")
|
|
post = db.relationship('Post', backref='comments')
|
|
|
|
class User(db.Model, UserMixin):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
username = db.Column(db.String(20), unique=True, nullable=False)
|
|
encrypted_password = db.Column(db.String(60), nullable=False)
|
|
ip_address = db.Column(db.String(15), nullable=True)
|
|
avatar_file = db.Column(db.String(50), nullable=True)
|
|
banner_file = db.Column(db.String(50), nullable=True)
|
|
bio = db.Column(db.Text, nullable=True)
|
|
current_item = db.Column(db.String(30), nullable=True)
|
|
|
|
def __repr__(self):
|
|
return f'<User {self.username}>'
|
|
|
|
def check_password(self, password):
|
|
return bcrypt.check_password_hash(self.encrypted_password, password)
|
|
|
|
|
|
class Image(db.Model):
|
|
__tablename__ = 'image'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
image_file = db.Column(db.String(40), nullable=False)
|
|
username = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
|
|
publication_date = db.Column(db.DateTime, default=datetime.utcnow)
|
|
tags = db.Column(db.String(100), nullable=True)
|
|
cookie_votes = db.Column(db.Integer, default=0)
|
|
|
|
comments = db.relationship('Comments', back_populates='image', cascade='all, delete-orphan')
|
|
|
|
class Votes(db.Model):
|
|
__tablename__ = 'votes'
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
username = db.Column(db.String(100), nullable=False)
|
|
image_id = db.Column(db.Integer, db.ForeignKey('image.id'), nullable=False)
|
|
|
|
image = db.relationship('Image', backref=db.backref('votes', lazy=True))
|
|
|
|
def __repr__(self):
|
|
return f'<Vote {self.username} for image {self.image_id}>'
|
|
|
|
class VideoVotes(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
username = db.Column(db.String(120), nullable=False)
|
|
video_id = db.Column(db.Integer, db.ForeignKey('video.id'), nullable=False)
|
|
|
|
class Video(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
video_file = db.Column(db.String(100), nullable=False)
|
|
video_name = db.Column(db.String(100), nullable=False)
|
|
video_thumbnail_file = db.Column(db.String(100), nullable=False)
|
|
username = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
|
|
publication_date = db.Column(db.DateTime, default=datetime.utcnow)
|
|
tags = db.Column(db.String(100), nullable=True)
|
|
description = db.Column(db.Text, nullable=True)
|
|
cookie_votes = db.Column(db.Integer, default=0)
|
|
|
|
comments = db.relationship('Comments', back_populates='video')
|
|
|
|
class Comic(db.Model):
|
|
__tablename__ = 'comics'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
comic_folder = db.Column(db.String(100), nullable=False)
|
|
comic_thumbnail_file = db.Column(db.String(100), nullable=False)
|
|
username = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
|
|
name = db.Column(db.String(100), nullable=False, unique=True)
|
|
publication_date = db.Column(db.DateTime, default=datetime.utcnow)
|
|
tags = db.Column(db.String(100), nullable=True)
|
|
cookie_votes = db.Column(db.Integer, default=0)
|
|
|
|
comments = db.relationship('Comments', back_populates='comic', overlaps="comic_link")
|
|
pages = db.relationship('ComicPage', back_populates='comic', cascade="all, delete-orphan")
|
|
|
|
|
|
class ComicPage(db.Model):
|
|
__tablename__ = 'comic_pages'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
comic_id = db.Column(db.Integer, db.ForeignKey('comics.id', ondelete='CASCADE'), nullable=False)
|
|
page_number = db.Column(db.Integer, nullable=False)
|
|
file_path = db.Column(db.String(200), nullable=False)
|
|
|
|
comic = db.relationship('Comic', back_populates='pages')
|
|
|
|
class ComicVotes(db.Model):
|
|
__tablename__ = 'comic_votes'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
username = db.Column(db.String(100), nullable=False)
|
|
comic_id = db.Column(db.Integer, db.ForeignKey('comics.id'), nullable=False)
|
|
vote = db.Column(db.Integer)
|
|
|
|
comic = db.relationship('Comic', backref='votes', lazy=True)
|
|
|
|
class Cookies(db.Model):
|
|
username = db.Column(db.String(20), primary_key=True, nullable=False)
|
|
cookies = db.Column(db.Integer, default=0)
|
|
|
|
class Views(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
|
|
image_id = db.Column(db.Integer, db.ForeignKey('image.id'), nullable=True)
|
|
video_id = db.Column(db.Integer, db.ForeignKey('video.id'), nullable=True)
|
|
username = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
|
|
view_date = db.Column(db.DateTime, default=datetime.utcnow)
|
|
|
|
__table_args__ = (
|
|
db.UniqueConstraint('image_id', 'username', name='unique_image_view'),
|
|
db.UniqueConstraint('video_id', 'username', name='unique_video_view')
|
|
)
|
|
|
|
class Item(db.Model):
|
|
__tablename__ = 'items'
|
|
|
|
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
|
|
item_path = db.Column(db.String, nullable=False)
|
|
price = db.Column(db.Integer, nullable=False, default=0)
|
|
visible = db.Column(db.Boolean, default=True)
|
|
|
|
def __repr__(self):
|
|
return f'<Item {self.id}: {self.item_path}, Price: {self.price}, Visible: {self.visible}>'
|
|
|
|
class UserItem(db.Model):
|
|
__tablename__ = 'user_items'
|
|
|
|
username = db.Column(db.String, db.ForeignKey('user.username'), primary_key=True)
|
|
item_id = db.Column(db.Integer, db.ForeignKey('items.id'), primary_key=True)
|
|
|
|
item = db.relationship('Item', backref=db.backref('user_items', lazy=True))
|
|
|
|
def __repr__(self):
|
|
return f'<UserItem {self.username}, Item {self.item_id}>'
|
|
|
|
class Post(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
username = db.Column(db.String(20), db.ForeignKey('user.username'), nullable=False)
|
|
post_date = db.Column(db.DateTime, default=datetime.utcnow)
|
|
text = db.Column(db.Text, nullable=False)
|
|
media_file = db.Column(db.String(100), nullable=True)
|
|
|
|
user = db.relationship('User', backref=db.backref('posts', lazy=True))
|
|
|
|
class Subscription(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
|
|
author_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
|
|
|
|
user = db.relationship('User', foreign_keys=[user_id], backref=db.backref('subscriptions', lazy=True))
|
|
author = db.relationship('User', foreign_keys=[author_id], backref=db.backref('followers', lazy=True))
|
|
|
|
def __repr__(self):
|
|
return f'<Subscription user_id={self.user_id} author_id={self.author_id}>'
|
|
|
|
def get_content_query(model, subscriptions, search_query):
|
|
query = model.query
|
|
|
|
if search_query:
|
|
|
|
tags = [tag.strip().lower() for tag in search_query.replace(',', ' ').split()]
|
|
|
|
filter_condition = [
|
|
model.tags.like(f'%{tag}%') for tag in tags
|
|
]
|
|
|
|
query = query.filter(
|
|
or_(*filter_condition)
|
|
)
|
|
|
|
if subscriptions:
|
|
query = query.filter(or_(
|
|
model.username.in_(subscriptions),
|
|
model.username.notin_(subscriptions)
|
|
))
|
|
|
|
query = query.order_by(
|
|
func.coalesce(model.cookie_votes, 0).desc(),
|
|
model.publication_date.desc()
|
|
)
|
|
|
|
return query
|
|
|
|
@app.route('/')
|
|
def index():
|
|
page = request.args.get('page', 1, type=int)
|
|
search_query = request.args.get('search')
|
|
|
|
subscriptions = []
|
|
if current_user.is_authenticated:
|
|
|
|
subscriptions = [
|
|
sub.author.username
|
|
for sub in Subscription.query.filter_by(user_id=current_user.id).all()
|
|
]
|
|
|
|
query = get_content_query(Image, subscriptions, search_query)
|
|
pagination = query.paginate(page=page, per_page=25, error_out=False)
|
|
|
|
user_cookies = 0
|
|
if current_user.is_authenticated:
|
|
user_cookies_record = Cookies.query.filter_by(username=current_user.username).first()
|
|
user_cookies = user_cookies_record.cookies if user_cookies_record else 0
|
|
|
|
return render_template(
|
|
'index.html',
|
|
images=pagination.items,
|
|
pagination=pagination,
|
|
user_cookies=user_cookies,
|
|
search_query=search_query,
|
|
content_type='art'
|
|
)
|
|
|
|
@app.route('/vote_art/<int:image_id>', methods=['POST'])
|
|
@login_required
|
|
def vote_art(image_id):
|
|
image = Image.query.get_or_404(image_id)
|
|
user_cookies = Cookies.query.filter_by(username=current_user.username).first()
|
|
|
|
if image.username == current_user.username:
|
|
return redirect(url_for('view', content_type='art', id=image_id))
|
|
|
|
if user_cookies and user_cookies.cookies > 0:
|
|
existing_vote = Votes.query.filter_by(username=current_user.username, image_id=image_id).first()
|
|
if not existing_vote:
|
|
user_cookies.cookies -= 1
|
|
image.cookie_votes += 1
|
|
|
|
new_vote = Votes(username=current_user.username, image_id=image.id)
|
|
db.session.add(new_vote)
|
|
|
|
db.session.commit()
|
|
|
|
return redirect(url_for('view', content_type='art', id=image_id))
|
|
|
|
@app.route('/view/<content_type>/<int:id>', methods=['GET', 'POST'])
|
|
|
|
@app.route('/view/<content_type>/<int:id>', methods=['GET', 'POST'])
|
|
def view(content_type, id):
|
|
comments = []
|
|
avatars = {user.username: user.avatar_file for user in User.query.all()}
|
|
cu = current_user.username if current_user.is_authenticated else None
|
|
|
|
user_cookies = 0
|
|
if current_user.is_authenticated:
|
|
user_cookies_record = Cookies.query.filter_by(username=current_user.username).first()
|
|
user_cookies = user_cookies_record.cookies if user_cookies_record else 0
|
|
|
|
if content_type == 'art':
|
|
content = Image.query.get_or_404(id)
|
|
comments = Comments.query.filter_by(image_id=id).order_by(Comments.comment_date.desc()).all()
|
|
|
|
search_query = request.args.get('search')
|
|
page = request.args.get('page', 1, type=int)
|
|
subscriptions = []
|
|
|
|
if current_user.is_authenticated:
|
|
subscriptions = [
|
|
sub.author.username
|
|
for sub in Subscription.query.filter_by(user_id=current_user.id).all()
|
|
]
|
|
|
|
query = get_content_query(Image, subscriptions, search_query)
|
|
all_images = query.all()
|
|
image_ids = [image.id for image in all_images]
|
|
|
|
current_index = image_ids.index(id)
|
|
prev_index = (current_index - 1) % len(image_ids)
|
|
next_index = (current_index + 1) % len(image_ids)
|
|
|
|
prev_content = all_images[prev_index]
|
|
next_content = all_images[next_index]
|
|
random_content = random.choice(all_images)
|
|
|
|
elif content_type == 'video':
|
|
content = Video.query.get_or_404(id)
|
|
comments = Comments.query.filter_by(video_id=id).order_by(Comments.comment_date.desc()).all()
|
|
|
|
all_videos = Video.query.order_by(Video.id).all()
|
|
video_ids = [video.id for video in all_videos]
|
|
current_index = video_ids.index(id)
|
|
prev_index = (current_index - 1) % len(all_videos)
|
|
next_index = (current_index + 1) % len(all_videos)
|
|
|
|
prev_content = all_videos[prev_index]
|
|
next_content = all_videos[next_index]
|
|
|
|
random_content = random.choice(all_videos)
|
|
|
|
elif content_type == 'comic':
|
|
content = Comic.query.get_or_404(id)
|
|
comments = Comments.query.filter_by(comic_id=id).order_by(Comments.comment_date.desc()).all()
|
|
|
|
comic_pages_dir = os.path.join(app.config['UPLOAD_FOLDER']['comics'], content.comic_folder)
|
|
|
|
if not os.path.exists(comic_pages_dir):
|
|
return render_template('error.html', message="Comic pages not found")
|
|
|
|
comic_pages = os.listdir(comic_pages_dir)
|
|
comic_pages = sorted(comic_pages)
|
|
|
|
all_comics = Comic.query.order_by(Comic.id).all()
|
|
comic_ids = [comic.id for comic in all_comics]
|
|
current_index = comic_ids.index(id)
|
|
prev_index = (current_index - 1) % len(all_comics)
|
|
next_index = (current_index + 1) % len(all_comics)
|
|
|
|
prev_content = all_comics[prev_index]
|
|
next_content = all_comics[next_index]
|
|
|
|
random_content = random.choice(all_comics)
|
|
|
|
else:
|
|
abort(404)
|
|
|
|
if request.method == 'POST' and current_user.is_authenticated:
|
|
comment_text = request.form.get('comment')
|
|
max_comment_length = 44
|
|
if comment_text and len(comment_text) > max_comment_length:
|
|
return redirect(url_for('view', content_type=content_type, id=id))
|
|
|
|
if comment_text:
|
|
new_comment = None
|
|
if content_type == 'art':
|
|
new_comment = Comments(username=cu, image_id=id, comment_text=comment_text)
|
|
elif content_type == 'video':
|
|
new_comment = Comments(username=cu, video_id=id, comment_text=comment_text)
|
|
elif content_type == 'comic':
|
|
new_comment = Comments(username=cu, comic_id=id, comment_text=comment_text)
|
|
|
|
if new_comment:
|
|
db.session.add(new_comment)
|
|
db.session.commit()
|
|
return redirect(url_for('view', content_type=content_type, id=id))
|
|
|
|
return render_template(
|
|
'view.html',
|
|
content=content,
|
|
content_type=content_type,
|
|
comments=comments,
|
|
avatars=avatars,
|
|
prev_content=prev_content,
|
|
next_content=next_content,
|
|
random_content=random_content,
|
|
user_cookies=user_cookies,
|
|
comic_pages=comic_pages if content_type == 'comic' else None
|
|
)
|
|
|
|
class EditTagsForm(FlaskForm):
|
|
tags = StringField('Tags', validators=[DataRequired(), Length(max=100)], render_kw={"placeholder": "Enter tags"})
|
|
submit = SubmitField('Save')
|
|
|
|
@app.route('/image_edit/<int:id>', methods=['GET', 'POST'])
|
|
@login_required
|
|
def image_edit(id):
|
|
|
|
image = Image.query.get_or_404(id)
|
|
|
|
if image.username != current_user.username:
|
|
return redirect(url_for('index'))
|
|
|
|
form = EditTagsForm()
|
|
|
|
if form.validate_on_submit():
|
|
|
|
image.tags = form.tags.data
|
|
db.session.commit()
|
|
return redirect(url_for('view', content_type='art', id=id))
|
|
|
|
if request.method == 'GET':
|
|
form.tags.data = image.tags
|
|
|
|
image_preview_url = url_for('static', filename=f'arts/{image.image_file}')
|
|
|
|
return render_template(
|
|
'image_edit.html',
|
|
form=form,
|
|
image=image,
|
|
image_preview_url=image_preview_url
|
|
)
|
|
|
|
class EditVideoForm(FlaskForm):
|
|
video_name = StringField('Title', validators=[DataRequired(), Length(max=100)], render_kw={"placeholder": "Enter video title"})
|
|
video_thumbnail = FileField('Thumbnail', validators=[FileAllowed(['jpg', 'jpeg', 'png', 'gif'], 'Only images!')])
|
|
description = TextAreaField('Description', validators=[DataRequired(), Length(max=500)], render_kw={"placeholder": "Enter video description"})
|
|
tags = StringField('Tags', validators=[DataRequired(), Length(max=100)], render_kw={"placeholder": "Tags"})
|
|
submit = SubmitField('Save')
|
|
|
|
@app.route('/video_edit/<int:id>', methods=['GET', 'POST'])
|
|
@login_required
|
|
def video_edit(id):
|
|
|
|
video = Video.query.get_or_404(id)
|
|
|
|
if video.username != current_user.username:
|
|
return redirect(url_for('index'))
|
|
|
|
form = EditVideoForm()
|
|
|
|
if request.method == 'GET':
|
|
form.video_name.data = video.video_name
|
|
form.description.data = video.description
|
|
form.tags.data = video.tags
|
|
|
|
if form.validate_on_submit():
|
|
|
|
video.video_name = form.video_name.data
|
|
video.description = form.description.data
|
|
video.tags = form.tags.data
|
|
|
|
if form.video_thumbnail.data:
|
|
thumbnail_file = form.video_thumbnail.data
|
|
if allowed_file(thumbnail_file.filename, app.config['ALLOWED_IMAGE_EXTENSIONS']):
|
|
|
|
thumbnail_filename = generate_unique_filename(thumbnail_file.filename, app.config['UPLOAD_FOLDER']['thumbnails'])
|
|
thumbnail_path = os.path.join(app.config['UPLOAD_FOLDER']['thumbnails'], thumbnail_filename)
|
|
|
|
thumbnail_file.save(thumbnail_path)
|
|
video.video_thumbnail_file = thumbnail_filename
|
|
|
|
db.session.commit()
|
|
return redirect(url_for('view', content_type='video', id=id))
|
|
|
|
video_preview_url = url_for('static', filename=f'videos/{video.video_thumbnail_file}')
|
|
|
|
return render_template(
|
|
'video_edit.html',
|
|
form=form,
|
|
video=video,
|
|
video_preview_url=video_preview_url
|
|
)
|
|
|
|
@app.route('/navbar')
|
|
def navbar():
|
|
return render_template(
|
|
'navbar.html'
|
|
)
|
|
|
|
@app.route('/card')
|
|
def card():
|
|
return render_template(
|
|
'card.html'
|
|
)
|
|
|
|
@app.route('/autocomplete')
|
|
def autocomplete():
|
|
search_query = request.args.get('search', '', type=str)
|
|
|
|
if search_query:
|
|
|
|
suggestions = get_autocomplete_suggestions(search_query)
|
|
else:
|
|
suggestions = []
|
|
|
|
return jsonify(suggestions)
|
|
|
|
def get_autocomplete_suggestions(query):
|
|
|
|
last_tag = query.split(',')[-1].strip()
|
|
|
|
all_tags = Image.query.with_entities(Image.tags).all()
|
|
|
|
unique_tags = set(tag.strip() for tags in all_tags if tags.tags for tag in tags.tags.split(','))
|
|
|
|
filtered_tags = [tag for tag in unique_tags if last_tag.lower() in tag.lower()]
|
|
|
|
return filtered_tags[:5]
|
|
|
|
class UploadForm(FlaskForm):
|
|
image_file = FileField('Choose File', validators=[DataRequired()])
|
|
tags = StringField('Tags (comma-separated)', validators=[DataRequired()])
|
|
recaptcha = RecaptchaField()
|
|
agree_with_rules = BooleanField('I agree with the publication rules',
|
|
validators=[DataRequired(message="You must agree with the publication rules.")])
|
|
submit = SubmitField('Upload')
|
|
|
|
async def convert_to_webp(image_file):
|
|
loop = asyncio.get_event_loop()
|
|
return await loop.run_in_executor(None, _sync_convert_to_webp, image_file)
|
|
|
|
def _sync_convert_to_webp(image_file):
|
|
with PILImage.open(image_file) as img:
|
|
output = io.BytesIO()
|
|
img.convert("RGB").save(output, format="WEBP", quality=90, optimize=True)
|
|
output.seek(0)
|
|
return output
|
|
|
|
@app.route('/upload', methods=['GET', 'POST'])
|
|
@login_required
|
|
async def upload():
|
|
form = UploadForm()
|
|
if form.validate_on_submit():
|
|
image_file = form.image_file.data
|
|
tags = form.tags.data
|
|
if not (allowed_file(image_file.filename, app.config['ALLOWED_IMAGE_EXTENSIONS']) and
|
|
await check_file_size(image_file, app.config['MAX_IMAGE_SIZE'])):
|
|
return redirect(url_for('upload'))
|
|
unique_filename = f"{uuid.uuid4().hex}.webp"
|
|
filepath = os.path.join(app.config['UPLOAD_FOLDER']['images'], unique_filename)
|
|
if os.path.exists(filepath):
|
|
return redirect(url_for('upload'))
|
|
webp_image = await convert_to_webp(image_file)
|
|
async with aiofiles.open(filepath, 'wb') as f:
|
|
await f.write(webp_image.read())
|
|
img = Image(image_file=unique_filename, username=current_user.username, tags=tags, cookie_votes=0)
|
|
db.session.add(img)
|
|
user_cookie = Cookies.query.filter_by(username=current_user.username).first()
|
|
if user_cookie:
|
|
user_cookie.cookies += 1
|
|
else:
|
|
user_cookie = Cookies(username=current_user.username, cookies=1)
|
|
db.session.add(user_cookie)
|
|
db.session.commit()
|
|
return redirect(url_for('index'))
|
|
return render_template('upload.html', form=form)
|
|
|
|
def allowed_file(filename, allowed_extensions):
|
|
return '.' in filename and filename.rsplit('.', 1)[1].lower() in allowed_extensions
|
|
|
|
async def check_file_size(file, max_size):
|
|
loop = asyncio.get_event_loop()
|
|
return await loop.run_in_executor(None, _sync_check_file_size, file, max_size)
|
|
|
|
def _sync_check_file_size(file, max_size):
|
|
file.seek(0, os.SEEK_END)
|
|
file_size = file.tell()
|
|
file.seek(0)
|
|
return file_size <= max_size
|
|
|
|
async def generate_unique_filename(filename, upload_folder):
|
|
base, ext = os.path.splitext(secure_filename(filename))
|
|
while True:
|
|
unique_filename = f"{base}_{uuid.uuid4().hex}{ext}"
|
|
file_path = os.path.join(upload_folder, unique_filename)
|
|
if not await aiofiles.os.path.exists(file_path):
|
|
return unique_filename
|
|
|
|
class UploadVideoForm(FlaskForm):
|
|
video_file = FileField('Video File', validators=[DataRequired()])
|
|
thumbnail = FileField('Thumbnail', validators=[DataRequired(), FileAllowed(['jpg', 'png', 'jpeg'])])
|
|
name = StringField('Video Name', validators=[DataRequired()])
|
|
tags = StringField('Tags', validators=[DataRequired()])
|
|
description = StringField('Description', validators=[DataRequired()])
|
|
recaptcha = RecaptchaField()
|
|
submit = SubmitField('Upload')
|
|
agree_with_rules = BooleanField('I agree with the publication rules', validators=[DataRequired()])
|
|
|
|
@app.route('/upload_video', methods=['GET', 'POST'])
|
|
@login_required
|
|
async def upload_video():
|
|
form = UploadVideoForm()
|
|
if form.validate_on_submit():
|
|
video_file = form.video_file.data
|
|
video_thumbnail = form.thumbnail.data
|
|
video_name = form.name.data
|
|
tags = form.tags.data
|
|
description = form.description.data
|
|
|
|
if video_file and video_thumbnail:
|
|
if not allowed_file(video_file.filename, app.config['ALLOWED_VIDEO_EXTENSIONS']):
|
|
return redirect(url_for('upload_video'))
|
|
if not await check_file_size(video_file, app.config['MAX_VIDEO_SIZE']):
|
|
return redirect(url_for('upload_video'))
|
|
if not allowed_file(video_thumbnail.filename, app.config['ALLOWED_IMAGE_EXTENSIONS']):
|
|
return redirect(url_for('upload_video'))
|
|
|
|
video_filename = await generate_unique_filename(app.config['UPLOAD_FOLDER']['videos'], 'mp4')
|
|
thumbnail_filename = f"{uuid.uuid4().hex}.webp"
|
|
|
|
video_path = os.path.join(app.config['UPLOAD_FOLDER']['videos'], video_filename)
|
|
thumbnail_path = os.path.join(app.config['UPLOAD_FOLDER']['thumbnails'], thumbnail_filename)
|
|
|
|
async with aiofiles.open(video_path, 'wb') as f:
|
|
await f.write(video_file.read())
|
|
|
|
webp_thumbnail = await convert_to_webp(video_thumbnail)
|
|
async with aiofiles.open(thumbnail_path, 'wb') as f:
|
|
await f.write(webp_thumbnail.read())
|
|
|
|
video = Video(
|
|
video_file=video_filename,
|
|
video_name=video_name,
|
|
video_thumbnail_file=thumbnail_filename,
|
|
username=current_user.username,
|
|
tags=tags,
|
|
description=description
|
|
)
|
|
db.session.add(video)
|
|
db.session.commit()
|
|
|
|
return redirect(url_for('videos'))
|
|
|
|
return render_template('upload_video.html', form=form)
|
|
|
|
async def generate_unique_filename(upload_folder, extension):
|
|
while True:
|
|
unique_filename = f"{uuid.uuid4().hex}.{extension}"
|
|
file_path = os.path.join(upload_folder, unique_filename)
|
|
if not await aiofiles.os.path.exists(file_path):
|
|
return unique_filename
|
|
|
|
@app.route('/comic_upload', methods=['GET', 'POST'])
|
|
@login_required
|
|
async def comic_upload():
|
|
if request.method == 'POST':
|
|
ct = request.files['thumbnail']
|
|
n = request.form['title']
|
|
tags = request.form.get('tags', '')
|
|
|
|
if db.session.execute(db.select(Comic).filter_by(name=n)).scalar():
|
|
return render_template('comic_upload.html')
|
|
|
|
if ct:
|
|
tf = f"{uuid.uuid4().hex}.webp"
|
|
tp = os.path.join(app.config['UPLOAD_FOLDER']['comicthumbs'], tf)
|
|
webp_thumbnail = await convert_to_webp(ct)
|
|
async with aiofiles.open(tp, 'wb') as f:
|
|
await f.write(webp_thumbnail.read())
|
|
|
|
cf = os.path.join(app.config['UPLOAD_FOLDER']['comics'], n)
|
|
os.makedirs(cf, exist_ok=True)
|
|
|
|
new_comic = Comic(
|
|
comic_folder=n,
|
|
comic_thumbnail_file=tf,
|
|
username=current_user.username,
|
|
name=n,
|
|
tags=tags
|
|
)
|
|
db.session.add(new_comic)
|
|
db.session.flush()
|
|
|
|
async def save_pages():
|
|
pages = request.files.getlist('pages[]')
|
|
for i, p in enumerate(sorted(pages, key=lambda x: x.filename), start=1):
|
|
if p:
|
|
filename = f"{uuid.uuid4().hex}.webp"
|
|
file_path = os.path.join(cf, filename)
|
|
webp_page = await convert_to_webp(p)
|
|
async with aiofiles.open(file_path, 'wb') as f:
|
|
await f.write(webp_page.read())
|
|
|
|
new_page = ComicPage(comic_id=new_comic.id, page_number=i, file_path=file_path)
|
|
db.session.add(new_page)
|
|
db.session.commit()
|
|
|
|
await save_pages()
|
|
return redirect(url_for('comics'))
|
|
|
|
return render_template('comic_upload.html')
|
|
|
|
@app.route('/user_pubs/<pub_type>/<username>')
|
|
def user_pubs(pub_type, username):
|
|
p = request.args.get('page', 1, type=int)
|
|
search_query = request.args.get('search', '')
|
|
|
|
if pub_type == 'arts':
|
|
query = Image.query.filter_by(username=username)
|
|
if search_query:
|
|
|
|
query = query.filter(or_(
|
|
Image.image_file.ilike(f'%{search_query}%'),
|
|
Image.tags.ilike(f'%{search_query}%')
|
|
))
|
|
items = query.order_by(Image.publication_date.desc()).paginate(page=p, per_page=25, error_out=False)
|
|
|
|
elif pub_type == 'videos':
|
|
query = Video.query.filter_by(username=username)
|
|
if search_query:
|
|
|
|
query = query.filter(Video.video_name.ilike(f'%{search_query}%'))
|
|
items = query.order_by(Video.publication_date.desc()).paginate(page=p, per_page=25, error_out=False)
|
|
|
|
elif pub_type == 'comics':
|
|
query = Comic.query.filter_by(username=username)
|
|
if search_query:
|
|
|
|
query = query.filter(Comic.name.ilike(f'%{search_query}%'))
|
|
items = query.order_by(Comic.publication_date.desc()).paginate(page=p, per_page=25, error_out=False)
|
|
|
|
else:
|
|
abort(404)
|
|
|
|
user_cookies = Cookies.query.filter_by(username=username).first()
|
|
cookies_count = user_cookies.cookies if user_cookies else 0
|
|
|
|
return render_template(
|
|
'user_pubs.html',
|
|
items=items.items,
|
|
pagination=items,
|
|
pub_type=pub_type,
|
|
username=username,
|
|
cookies_count=cookies_count,
|
|
search_query=search_query
|
|
)
|
|
|
|
class LoginForm(FlaskForm):
|
|
username = StringField('Username', validators=[DataRequired()])
|
|
password = PasswordField('Password', validators=[DataRequired()])
|
|
recaptcha = RecaptchaField()
|
|
submit = SubmitField('Login')
|
|
|
|
def get_client_ip():
|
|
if 'X-Forwarded-For' in request.headers:
|
|
forwarded_for = request.headers['X-Forwarded-For']
|
|
ip_address = forwarded_for.split(',')[0]
|
|
else:
|
|
ip_address = request.remote_addr
|
|
|
|
return ip_address
|
|
|
|
class RegistrationForm(FlaskForm):
|
|
username = StringField(
|
|
'Username',
|
|
validators=[
|
|
DataRequired(),
|
|
Length(min=3, max=20),
|
|
Regexp(r'^[a-zA-Z0-9_]+$', message="Username can contain only letters, numbers, and underscores.")
|
|
]
|
|
)
|
|
|
|
password = PasswordField('Password', validators=[DataRequired(), Length(min=6)])
|
|
confirm_password = PasswordField('Confirm Password', validators=[DataRequired(), EqualTo('password')])
|
|
recaptcha = RecaptchaField()
|
|
submit = SubmitField('Register')
|
|
|
|
def validate_username(self, username):
|
|
username.data = username.data.lower()
|
|
user = User.query.filter_by(username=username.data).first()
|
|
if user:
|
|
return
|
|
|
|
if not re.match(r'^[a-z0-9]+$', username.data):
|
|
return
|
|
|
|
def validate_ip(self):
|
|
ip_address = get_client_ip()
|
|
user_with_ip = User.query.filter_by(ip_address=ip_address).first()
|
|
if user_with_ip:
|
|
return
|
|
|
|
@app.route('/register', methods=['GET', 'POST'])
|
|
def register():
|
|
form = RegistrationForm()
|
|
|
|
if form.validate_on_submit():
|
|
hashed_password = bcrypt.generate_password_hash(form.password.data).decode('utf-8')
|
|
|
|
ip_address = get_client_ip()
|
|
|
|
existing_user = User.query.filter_by(ip_address=ip_address).first()
|
|
if existing_user:
|
|
return render_template('register.html', form=form)
|
|
|
|
username = form.username.data.lower()
|
|
|
|
user = User(username=username, encrypted_password=hashed_password, ip_address=ip_address)
|
|
|
|
try:
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
return redirect(url_for('login'))
|
|
except IntegrityError:
|
|
db.session.rollback()
|
|
|
|
return render_template('register.html', form=form)
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
form = LoginForm()
|
|
|
|
if form.validate_on_submit():
|
|
user = User.query.filter_by(username=form.username.data).first()
|
|
|
|
if user and user.check_password(form.password.data):
|
|
login_user(user)
|
|
|
|
if user.ip_address is None:
|
|
ip_address = get_client_ip()
|
|
user.ip_address = ip_address
|
|
db.session.commit()
|
|
|
|
return redirect(url_for('profile', username=user.username))
|
|
|
|
return render_template('login.html', form=form)
|
|
|
|
@app.route('/logout')
|
|
def logout():
|
|
logout_user()
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
@app.route('/delete/<content_type>/<int:content_id>', methods=['POST'])
|
|
@login_required
|
|
def delete(content_type, content_id):
|
|
if content_type == 'art':
|
|
content = Image.query.get_or_404(content_id)
|
|
file_path = os.path.join(app.config['UPLOAD_FOLDER']['arts'], content.image_file)
|
|
elif content_type == 'video':
|
|
content = Video.query.get_or_404(content_id)
|
|
file_path = os.path.join(app.config['UPLOAD_FOLDER']['videos'], content.video_file)
|
|
elif content_type == 'comic':
|
|
content = Comic.query.get_or_404(content_id)
|
|
file_path = os.path.join(app.config['UPLOAD_FOLDER']['comics'], content.comic_folder)
|
|
else:
|
|
abort(404, "Invalid content type")
|
|
|
|
if content.username != current_user.username:
|
|
abort(403)
|
|
|
|
if os.path.exists(file_path):
|
|
if os.path.isdir(file_path):
|
|
shutil.rmtree(file_path)
|
|
else:
|
|
os.remove(file_path)
|
|
|
|
db.session.delete(content)
|
|
db.session.commit()
|
|
return redirect(url_for('profile', username=current_user.username))
|
|
|
|
|
|
@app.route('/delete_comment/<int:comment_id>', methods=['POST'])
|
|
@login_required
|
|
def delete_comment(comment_id):
|
|
comment = db.session.get(Comments, comment_id) or abort(404)
|
|
|
|
if comment.image_id:
|
|
content_type = 'art'
|
|
content_id = comment.image_id
|
|
elif comment.video_id:
|
|
content_type = 'video'
|
|
content_id = comment.video_id
|
|
else:
|
|
content_type = 'comic'
|
|
content_id = comment.comic_id
|
|
|
|
if comment.username == current_user.username:
|
|
try:
|
|
if hasattr(comment, 'post_id') and comment.post_id:
|
|
post_id = comment.post_id
|
|
db.session.delete(comment)
|
|
db.session.commit()
|
|
return redirect(url_for('user_posts', username=current_user.username))
|
|
|
|
db.session.delete(comment)
|
|
db.session.commit()
|
|
return redirect(url_for('view', content_type=content_type, id=content_id))
|
|
except IntegrityError:
|
|
db.session.rollback()
|
|
return redirect(url_for('view', content_type=content_type, id=content_id))
|
|
|
|
return redirect(request.referrer or url_for('index'))
|
|
|
|
@app.route('/videos')
|
|
def videos():
|
|
page = request.args.get('page', 1, type=int)
|
|
search_query = request.args.get('search')
|
|
|
|
subscriptions = []
|
|
if current_user.is_authenticated:
|
|
subscriptions = [sub.author_id for sub in Subscription.query.filter_by(user_id=current_user.id).all()]
|
|
|
|
query = get_content_query(Video, subscriptions, search_query)
|
|
pagination = query.paginate(page=page, per_page=10, error_out=False)
|
|
|
|
user_cookies = 0
|
|
if current_user.is_authenticated:
|
|
user_cookies_record = Cookies.query.filter_by(username=current_user.username).first()
|
|
user_cookies = user_cookies_record.cookies if user_cookies_record else 0
|
|
|
|
return render_template(
|
|
'videos.html',
|
|
videos=pagination.items,
|
|
pagination=pagination,
|
|
user_cookies=user_cookies,
|
|
search_query=search_query,
|
|
content_type='video'
|
|
)
|
|
|
|
@app.route('/vote_video/<int:video_id>', methods=['POST'])
|
|
@login_required
|
|
def vote_video(video_id):
|
|
video = Video.query.get_or_404(video_id)
|
|
user_cookies = Cookies.query.filter_by(username=current_user.username).first()
|
|
|
|
if video.username == current_user.username:
|
|
return redirect(url_for('view', content_type='video', id=video_id))
|
|
|
|
if user_cookies and user_cookies.cookies > 0:
|
|
existing_vote = VideoVotes.query.filter_by(username=current_user.username, video_id=video_id).first()
|
|
if not existing_vote:
|
|
user_cookies.cookies -= 1
|
|
video.cookie_votes += 1
|
|
new_vote = VideoVotes(username=current_user.username, video_id=video.id)
|
|
db.session.add(new_vote)
|
|
db.session.commit()
|
|
|
|
return redirect(url_for('view', content_type='video', id=video_id))
|
|
|
|
@app.route('/comics')
|
|
def comics():
|
|
page = request.args.get('page', 1, type=int)
|
|
search_query = request.args.get('search')
|
|
|
|
subscriptions = []
|
|
if current_user.is_authenticated:
|
|
subscriptions = [sub.author_id for sub in Subscription.query.filter_by(user_id=current_user.id).all()]
|
|
|
|
query = get_content_query(Comic, subscriptions, search_query)
|
|
pagination = query.paginate(page=page, per_page=10, error_out=False)
|
|
|
|
user_cookies = 0
|
|
if current_user.is_authenticated:
|
|
user_cookies_record = Cookies.query.filter_by(username=current_user.username).first()
|
|
user_cookies = user_cookies_record.cookies if user_cookies_record else 0
|
|
|
|
return render_template(
|
|
'comics.html',
|
|
comics=pagination.items,
|
|
pagination=pagination,
|
|
user_cookies=user_cookies,
|
|
search_query=search_query,
|
|
content_type='comic'
|
|
)
|
|
|
|
@app.route('/vote_comic/<int:comic_id>', methods=['POST'])
|
|
@login_required
|
|
def vote_comic(comic_id):
|
|
comic = Comic.query.get_or_404(comic_id)
|
|
user_cookies = Cookies.query.filter_by(username=current_user.username).first()
|
|
|
|
if comic.username == current_user.username:
|
|
return redirect(url_for('view', content_type='comic', id=comic_id))
|
|
|
|
if user_cookies and user_cookies.cookies > 0:
|
|
existing_vote = ComicVotes.query.filter_by(username=current_user.username, comic_id=comic_id).first()
|
|
if not existing_vote:
|
|
user_cookies.cookies -= 1
|
|
comic.cookie_votes += 1
|
|
new_vote = ComicVotes(username=current_user.username, comic_id=comic.id)
|
|
db.session.add(new_vote)
|
|
db.session.commit()
|
|
|
|
return redirect(url_for('view', content_type='comic', id=comic_id))
|
|
|
|
class EmptyForm(FlaskForm):
|
|
pass
|
|
|
|
@app.route('/comic_edit/<int:comic_id>', methods=['GET', 'POST'])
|
|
@login_required
|
|
def comic_edit(comic_id):
|
|
comic = Comic.query.get_or_404(comic_id)
|
|
|
|
if comic.username != current_user.username:
|
|
return redirect(url_for('index'))
|
|
|
|
cfp = os.path.join(app.config['UPLOAD_FOLDER']['comics'], comic.name)
|
|
if not os.path.exists(cfp):
|
|
os.makedirs(cfp)
|
|
|
|
form = EmptyForm()
|
|
|
|
if request.method == 'POST' and form.validate_on_submit():
|
|
action = request.form.get('action')
|
|
|
|
if action == 'delete' and (page_id := request.form.get('page')):
|
|
page = ComicPage.query.get(page_id)
|
|
if page:
|
|
os.remove(page.file_path)
|
|
db.session.delete(page)
|
|
db.session.commit()
|
|
|
|
elif action == 'update' and (page_id := request.form.get('page')) and 'new_page' in request.files:
|
|
new_page = request.files['new_page']
|
|
filename = secure_filename(new_page.filename)
|
|
file_path = os.path.join(cfp, filename)
|
|
new_page.save(file_path)
|
|
|
|
page = ComicPage.query.get(page_id)
|
|
if page:
|
|
os.remove(page.file_path)
|
|
page.file_path = file_path
|
|
db.session.commit()
|
|
|
|
elif action == 'add' and 'new_page' in request.files:
|
|
new_page = request.files['new_page']
|
|
filename = secure_filename(new_page.filename)
|
|
file_path = os.path.join(cfp, filename)
|
|
new_page.save(file_path)
|
|
|
|
page_number = (db.session.query(db.func.max(ComicPage.page_number)).filter_by(comic_id=comic.id).scalar() or 0) + 1
|
|
db.session.add(ComicPage(comic_id=comic.id, page_number=page_number, file_path=file_path))
|
|
db.session.commit()
|
|
|
|
return redirect(url_for('comic_edit', comic_id=comic.id))
|
|
|
|
return render_template('comic_edit.html', comic=comic, comic_pages=comic.pages, form=form)
|
|
|
|
def update_related_tables(old_username, new_username):
|
|
|
|
Comments.query.filter_by(username=old_username).update({"username": new_username})
|
|
|
|
Image.query.filter_by(username=old_username).update({"username": new_username})
|
|
|
|
Video.query.filter_by(username=old_username).update({"username": new_username})
|
|
|
|
Comic.query.filter_by(username=old_username).update({"username": new_username})
|
|
|
|
Cookies.query.filter_by(username=old_username).update({"username": new_username})
|
|
|
|
Post.query.filter_by(username=old_username).update({"username": new_username})
|
|
|
|
Subscription.query.filter_by(user_id=old_username).update({"user_id": new_username})
|
|
Subscription.query.filter_by(author_id=old_username).update({"author_id": new_username})
|
|
|
|
UserItem.query.filter_by(username=old_username).update({"username": new_username})
|
|
|
|
db.session.commit()
|
|
|
|
@app.route('/upload_post', methods=['GET', 'POST'])
|
|
@login_required
|
|
async def upload_post():
|
|
if request.method == 'POST':
|
|
post_text = request.form.get('post_text')
|
|
post_media = request.files.get('post_media')
|
|
|
|
if post_text:
|
|
new_post = Post(
|
|
username=current_user.username,
|
|
text=post_text
|
|
)
|
|
db.session.add(new_post)
|
|
|
|
if post_media and allowed_file(post_media.filename, app.config['ALLOWED_IMAGE_EXTENSIONS']):
|
|
if await check_file_size(post_media, app.config['MAX_IMAGE_SIZE']):
|
|
unique_filename = f"{uuid.uuid4().hex}.webp"
|
|
media_path = os.path.join(app.config['UPLOAD_FOLDER']['posts'], unique_filename)
|
|
webp_image = await convert_to_webp(post_media)
|
|
async with aiofiles.open(media_path, 'wb') as f:
|
|
await f.write(webp_image.read())
|
|
new_post.media_file = unique_filename
|
|
else:
|
|
return redirect(url_for('upload_post'))
|
|
|
|
db.session.commit()
|
|
return redirect(url_for('user_posts', username=current_user.username))
|
|
else:
|
|
return redirect(url_for('upload_post'))
|
|
|
|
return render_template('upload_post.html')
|
|
|
|
@app.route('/profile_edit', methods=['GET', 'POST'])
|
|
@login_required
|
|
def profile_edit():
|
|
if request.method == 'POST':
|
|
section = request.form.get('section')
|
|
|
|
if section == 'avatar_banner':
|
|
|
|
avatar = request.files.get('avatar')
|
|
banner = request.files.get('banner')
|
|
|
|
if avatar and allowed_file(avatar.filename, app.config['ALLOWED_IMAGE_EXTENSIONS']):
|
|
if check_file_size(avatar, app.config['MAX_IMAGE_SIZE']):
|
|
avatar_filename = secure_filename(avatar.filename)
|
|
avatar_path = os.path.join(app.config['UPLOAD_FOLDER']['avatars'], avatar_filename)
|
|
avatar.save(avatar_path)
|
|
current_user.avatar_file = avatar_filename
|
|
else:
|
|
return redirect(url_for('profile_edit'))
|
|
|
|
if banner and allowed_file(banner.filename, app.config['ALLOWED_IMAGE_EXTENSIONS']):
|
|
if check_file_size(banner, app.config['MAX_IMAGE_SIZE']):
|
|
banner_filename = secure_filename(banner.filename)
|
|
banner_path = os.path.join(app.config['UPLOAD_FOLDER']['banners'], banner_filename)
|
|
banner.save(banner_path)
|
|
current_user.banner_file = banner_filename
|
|
else:
|
|
return redirect(url_for('profile_edit'))
|
|
|
|
db.session.commit()
|
|
|
|
return redirect(url_for('profile', username=current_user.username))
|
|
|
|
elif section == 'username':
|
|
|
|
new_username = request.form.get('new_username')
|
|
if new_username and new_username != current_user.username:
|
|
if len(new_username) < 3 or len(new_username) > 20:
|
|
pass
|
|
elif User.query.filter_by(username=new_username).first():
|
|
pass
|
|
else:
|
|
old_username = current_user.username
|
|
current_user.username = new_username
|
|
update_related_tables(old_username, new_username)
|
|
return redirect(url_for('profile', username=new_username))
|
|
|
|
elif section == 'bio':
|
|
|
|
bio = request.form.get('bio')
|
|
current_user.bio = bio
|
|
db.session.commit()
|
|
|
|
elif section == 'password':
|
|
|
|
current_password = request.form.get('current_password')
|
|
new_password = request.form.get('new_password')
|
|
confirm_password = request.form.get('confirm_new_password')
|
|
|
|
if not current_user.check_password(current_password):
|
|
pass
|
|
elif new_password != confirm_password:
|
|
pass
|
|
elif len(new_password) < 6:
|
|
pass
|
|
else:
|
|
current_user.encrypted_password = bcrypt.generate_password_hash(new_password).decode('utf-8')
|
|
db.session.commit()
|
|
return redirect(url_for('profile_edit'))
|
|
|
|
elif section == 'create_post':
|
|
|
|
post_text = request.form.get('post_text')
|
|
post_media = request.files.get('post_media')
|
|
|
|
if post_text:
|
|
new_post = Post(
|
|
username=current_user.username,
|
|
text=post_text
|
|
)
|
|
db.session.add(new_post)
|
|
|
|
if post_media and allowed_file(post_media.filename, app.config['ALLOWED_IMAGE_EXTENSIONS']):
|
|
if check_file_size(post_media, app.config['MAX_IMAGE_SIZE']):
|
|
media_filename = secure_filename(post_media.filename)
|
|
media_path = os.path.join(app.config['UPLOAD_FOLDER']['posts'], media_filename)
|
|
post_media.save(media_path)
|
|
new_post.media_file = media_filename
|
|
db.session.commit()
|
|
pass
|
|
else:
|
|
return redirect(url_for('profile_edit'))
|
|
|
|
db.session.commit()
|
|
return redirect(url_for('user_posts', username=current_user.username))
|
|
|
|
elif section == 'decoration':
|
|
selected_item_id = request.form.get('selected_item')
|
|
if selected_item_id:
|
|
|
|
selected_item = UserItem.query.filter_by(username=current_user.username, item_id=selected_item_id).first()
|
|
|
|
if selected_item:
|
|
|
|
current_user.current_item = selected_item.item.id
|
|
db.session.commit()
|
|
pass
|
|
else:
|
|
pass
|
|
return redirect(url_for('profile', username=current_user.username))
|
|
|
|
db.session.commit()
|
|
return redirect(url_for('profile_edit'))
|
|
|
|
return render_template(
|
|
'profile_edit.html',
|
|
user=current_user,
|
|
user_items=UserItem.query.filter_by(username=current_user.username).all()
|
|
)
|
|
|
|
@app.route('/profile/<username>')
|
|
def profile(username):
|
|
user = User.query.filter_by(username=username).first_or_404()
|
|
|
|
if not user.avatar_file:
|
|
user.avatar_file = 'default-avatar.png'
|
|
|
|
if not user.banner_file:
|
|
user.banner_file = 'default-banner.png'
|
|
|
|
total_arts = Image.query.filter_by(username=username).count()
|
|
total_videos = Video.query.filter_by(username=username).count()
|
|
total_comics = Comic.query.filter_by(username=username).count()
|
|
|
|
arts = Image.query.filter_by(username=username).order_by(Image.cookie_votes.desc(), Image.publication_date.desc()).limit(3).all()
|
|
videos = Video.query.filter_by(username=username).order_by(Video.cookie_votes.desc(), Video.publication_date.desc()).limit(3).all()
|
|
comics = Comic.query.filter_by(username=username).order_by(Comic.cookie_votes.desc(), Comic.publication_date.desc()).limit(3).all()
|
|
|
|
subscribed = (
|
|
current_user.is_authenticated and
|
|
Subscription.query.filter_by(user_id=current_user.id, author_id=user.id).first() is not None
|
|
)
|
|
|
|
current_item = Item.query.get(user.current_item) if user.current_item else None
|
|
|
|
is_current_user = current_user.is_authenticated and current_user.username == user.username
|
|
|
|
subscriptions_count = Subscription.query.filter_by(author_id=user.id).count()
|
|
|
|
return render_template(
|
|
'profile.html',
|
|
user=user,
|
|
arts=arts,
|
|
videos=videos,
|
|
comics=comics,
|
|
total_arts=total_arts,
|
|
total_videos=total_videos,
|
|
total_comics=total_comics,
|
|
current_item=current_item,
|
|
is_current_user=is_current_user,
|
|
subscribed=subscribed,
|
|
subscriptions_count=subscriptions_count
|
|
)
|
|
|
|
@app.route('/posts/<username>')
|
|
def user_posts(username):
|
|
user = User.query.filter_by(username=username).first_or_404()
|
|
posts = Post.query.filter_by(username=username).order_by(Post.post_date.desc()).all()
|
|
avatars = {user.username: user.avatar_file for user in User.query.all()}
|
|
return render_template('post.html', username=username, posts=posts, avatars=avatars)
|
|
|
|
@app.route('/posts/<username>/add_comment/<int:post_id>', methods=['POST'])
|
|
@login_required
|
|
def add_comment(username, post_id):
|
|
comment_text = request.form.get('comment_text')
|
|
if not comment_text:
|
|
return redirect(url_for('user_posts', username=username))
|
|
|
|
comment = Comments(
|
|
username=current_user.username,
|
|
post_id=post_id,
|
|
comment_text=comment_text,
|
|
comment_date=datetime.utcnow()
|
|
)
|
|
db.session.add(comment)
|
|
db.session.commit()
|
|
return redirect(url_for('user_posts', username=username))
|
|
|
|
@app.route('/subscribe/<int:author_id>', methods=['POST'])
|
|
@login_required
|
|
def subscribe(author_id):
|
|
if author_id == current_user.id:
|
|
return redirect(url_for('profile', username=current_user.username))
|
|
|
|
existing_subscription = Subscription.query.filter_by(user_id=current_user.id, author_id=author_id).first()
|
|
if existing_subscription:
|
|
return redirect(url_for('profile', username=current_user.username))
|
|
|
|
new_subscription = Subscription(user_id=current_user.id, author_id=author_id)
|
|
db.session.add(new_subscription)
|
|
db.session.commit()
|
|
|
|
return redirect(url_for('profile', username=current_user.username))
|
|
|
|
@app.route('/unsubscribe/<int:author_id>', methods=['POST'])
|
|
@login_required
|
|
def unsubscribe(author_id):
|
|
|
|
subscription = Subscription.query.filter_by(
|
|
user_id=current_user.id, author_id=author_id).first()
|
|
if subscription:
|
|
db.session.delete(subscription)
|
|
db.session.commit()
|
|
return redirect(url_for('profile', username=User.query.get(author_id).username))
|
|
|
|
@app.route('/delete_post/<int:post_id>', methods=['POST'])
|
|
@login_required
|
|
def delete_post(post_id):
|
|
try:
|
|
validate_csrf(request.form.get('csrf_token'))
|
|
except BadRequest:
|
|
return redirect(url_for('user_posts', username=current_user.username))
|
|
|
|
post = Post.query.get_or_404(post_id)
|
|
if post.username != current_user.username:
|
|
abort(403)
|
|
|
|
if post.media_file:
|
|
media_path = os.path.join(app.config['UPLOAD_FOLDER']['posts'], post.media_file)
|
|
if os.path.exists(media_path):
|
|
os.remove(media_path)
|
|
db.session.delete(post)
|
|
db.session.commit()
|
|
return redirect(url_for('user_posts', username=current_user.username))
|
|
|
|
@app.route('/privacy_policy')
|
|
def privacy_policy():
|
|
return render_template('privacy_policy.html')
|
|
|
|
@app.route('/terms_of_use')
|
|
def terms_of_use():
|
|
return render_template('terms_of_use.html')
|
|
|
|
|
|
@app.route('/admin', methods=['GET', 'POST'])
|
|
@login_required
|
|
def admin():
|
|
if current_user.username != 'naturefie':
|
|
return redirect(url_for('index'))
|
|
|
|
form = UpdateCookiesForm()
|
|
|
|
user_cookies = {
|
|
user.id: Cookies.query.filter_by(username=user.username).first().cookies if Cookies.query.filter_by(username=user.username).first() else 0
|
|
for user in User.query.all()
|
|
}
|
|
|
|
comments = Comments.query.order_by(Comments.comment_date.desc()).all()
|
|
|
|
return render_template(
|
|
'panel.html',
|
|
arts=Image.query.all(),
|
|
comics=Comic.query.all(),
|
|
videos=Video.query.all(),
|
|
users=User.query.all(),
|
|
comments=comments,
|
|
form=form,
|
|
user_cookies=user_cookies
|
|
)
|
|
|
|
|
|
@app.route('/admin/delete/<content_type>/<int:content_id>', methods=['POST'])
|
|
@login_required
|
|
def admin_delete_content(content_type, content_id):
|
|
models = {
|
|
'art': (Image, 'arts', 'image_file', Votes, 'image_id'),
|
|
'video': (Video, 'videos', 'video_file', VideoVotes, 'video_id'),
|
|
'comic': (Comic, 'comics', 'comic_folder', ComicVotes, 'comic_id')
|
|
}
|
|
|
|
if content_type not in models:
|
|
abort(404)
|
|
|
|
model, folder, file_field, vote_model, foreign_key = models[content_type]
|
|
|
|
content = model.query.get_or_404(content_id)
|
|
|
|
vote_model.query.filter(getattr(vote_model, foreign_key) == content_id).delete()
|
|
|
|
Comments.query.filter(getattr(Comments, foreign_key) == content_id).delete()
|
|
|
|
file_path = os.path.join(app.config['UPLOAD_FOLDER'][folder], getattr(content, file_field))
|
|
if os.path.exists(file_path):
|
|
if os.path.isfile(file_path):
|
|
os.remove(file_path)
|
|
else:
|
|
shutil.rmtree(file_path)
|
|
|
|
db.session.delete(content)
|
|
db.session.commit()
|
|
return redirect(url_for('admin'))
|
|
|
|
@app.route('/admin/delete/user/<int:user_id>', methods=['POST'])
|
|
@login_required
|
|
def admin_delete_user(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
if current_user.username != 'naturefie':
|
|
return redirect(url_for('admin'))
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
return redirect(url_for('admin'))
|
|
|
|
class UpdateCookiesForm(FlaskForm):
|
|
cookies = StringField('Количество печенек', validators=[DataRequired()])
|
|
submit = SubmitField('Применить')
|
|
|
|
@app.route('/admin/update_comment/<int:comment_id>', methods=['POST'])
|
|
@login_required
|
|
def admin_update_comment(comment_id):
|
|
|
|
comment = Comments.query.get_or_404(comment_id)
|
|
if current_user.username != 'naturefie':
|
|
abort(403)
|
|
|
|
new_text = request.form.get('comment_text', '').strip()
|
|
if not new_text:
|
|
pass
|
|
return redirect(url_for('admin'))
|
|
|
|
comment.comment_text = new_text
|
|
try:
|
|
db.session.commit()
|
|
print(f"Updated comment ID {comment_id}: {comment.comment_text}")
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
print(f"Error updating comment: {e}")
|
|
|
|
return redirect(url_for('admin'))
|
|
|
|
@app.route('/admin/delete_comment/<int:comment_id>', methods=['POST'])
|
|
@login_required
|
|
def admin_delete_comment(comment_id):
|
|
comment = Comments.query.get_or_404(comment_id)
|
|
if current_user.username != 'naturefie':
|
|
abort(403)
|
|
|
|
db.session.delete(comment)
|
|
db.session.commit()
|
|
return redirect(url_for('admin'))
|
|
|
|
@app.route('/admin/update_cookies/<int:user_id>', methods=['POST'])
|
|
@login_required
|
|
def admin_update_cookies(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
if request.method == 'POST':
|
|
new_cookie_count = request.form.get('cookies', type=int)
|
|
if new_cookie_count is not None and new_cookie_count >= 0:
|
|
|
|
user_cookies = Cookies.query.filter_by(username=user.username).first()
|
|
if not user_cookies:
|
|
|
|
user_cookies = Cookies(username=user.username, cookies=new_cookie_count)
|
|
db.session.add(user_cookies)
|
|
else:
|
|
|
|
user_cookies.cookies = new_cookie_count
|
|
db.session.commit()
|
|
|
|
return redirect(url_for('admin'))
|
|
|
|
@app.route('/admin/update_video/<int:content_id>', methods=['POST'])
|
|
@login_required
|
|
def admin_update_video(content_id):
|
|
video = Video.query.get_or_404(content_id)
|
|
if current_user.username != 'naturefie':
|
|
return redirect(url_for('admin'))
|
|
|
|
new_video_name = request.form.get('video_name')
|
|
new_description = request.form.get('description')
|
|
new_tags = request.form.get('tags')
|
|
|
|
if new_video_name and new_video_name != video.video_name:
|
|
if len(new_video_name) < 3 or len(new_video_name) > 100:
|
|
return redirect(url_for('admin'))
|
|
|
|
video.video_name = new_video_name
|
|
|
|
if new_description:
|
|
video.description = new_description
|
|
|
|
if new_tags:
|
|
video.tags = new_tags
|
|
|
|
db.session.commit()
|
|
return redirect(url_for('admin'))
|
|
|
|
@app.route('/admin/update_user/<int:user_id>', methods=['POST'])
|
|
@login_required
|
|
def admin_update_user(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
if current_user.username != 'naturefie':
|
|
return redirect(url_for('admin'))
|
|
|
|
new_username = request.form.get('username')
|
|
new_password = request.form.get('password')
|
|
|
|
if new_username and new_username != user.username:
|
|
if len(new_username) < 3 or len(new_username) > 20:
|
|
return redirect(url_for('admin'))
|
|
if User.query.filter_by(username=new_username).first():
|
|
return redirect(url_for('admin'))
|
|
|
|
old_username = user.username
|
|
user.username = new_username
|
|
update_related_tables(old_username, new_username)
|
|
|
|
if new_password:
|
|
if len(new_password) < 6:
|
|
return redirect(url_for('admin'))
|
|
|
|
hashed_password = bcrypt.generate_password_hash(new_password).decode('utf-8')
|
|
user.encrypted_password = hashed_password
|
|
|
|
db.session.commit()
|
|
return redirect(url_for('admin'))
|
|
|
|
@app.route('/admin/update_tags/<content_type>/<int:content_id>', methods=['POST'])
|
|
@login_required
|
|
def admin_update_tags(content_type, content_id):
|
|
models = {
|
|
'art': Image,
|
|
'video': Video,
|
|
'comic': Comic
|
|
}
|
|
|
|
if content_type not in models:
|
|
abort(404)
|
|
|
|
model = models[content_type]
|
|
content = model.query.get_or_404(content_id)
|
|
|
|
new_tags = request.form.get('tags', '').strip()
|
|
|
|
content.tags = new_tags
|
|
db.session.commit()
|
|
|
|
return redirect(url_for('admin'))
|
|
|
|
@app.route('/publication_rules')
|
|
def publication_rules():
|
|
return render_template('publication_rules.html')
|
|
|
|
@app.route('/shop')
|
|
@login_required
|
|
def shop():
|
|
|
|
items = Item.query.filter_by(visible=True).all()
|
|
user_cookies = Cookies.query.filter_by(username=current_user.username).first().cookies if Cookies.query.filter_by(username=current_user.username).first() else 0
|
|
user_item_ids = {ui.item_id for ui in UserItem.query.filter_by(username=current_user.username).all()}
|
|
return render_template('shop.html', items=items, user=current_user, user_cookies=user_cookies, user_item_ids=user_item_ids)
|
|
|
|
@app.route('/buy_item/<int:item_id>', methods=['POST'])
|
|
@login_required
|
|
def buy_item(item_id):
|
|
username = current_user.username
|
|
user_cookies = Cookies.query.filter_by(username=username).first()
|
|
item = Item.query.get(item_id)
|
|
|
|
if not user_cookies or not item or not item.visible or user_cookies.cookies < item.price:
|
|
return redirect(url_for('shop'))
|
|
|
|
if UserItem.query.filter_by(username=username, item_id=item.id).first():
|
|
return redirect(url_for('shop'))
|
|
|
|
user_cookies.cookies -= item.price
|
|
db.session.add(UserItem(username=username, item_id=item.id))
|
|
db.session.commit()
|
|
return redirect(url_for('shop'))
|
|
|
|
if __name__ == '__main__':
|
|
with app.app_context():
|
|
db.create_all()
|
|
app.run(debug=False) |