Artberry-web/auth.py
2025-04-20 23:35:38 +03:00

141 lines
4.6 KiB
Python

from flask import Blueprint, render_template, redirect, url_for, request, session
from flask_login import login_user, logout_user, login_required, current_user
from sqlalchemy.exc import IntegrityError
from models import db, User
import uuid
from utils import get_client_ip
from models import RegistrationForm, LoginForm, PasswordField, RecaptchaField, SubmitField
from flask_bcrypt import Bcrypt
from wtforms.validators import DataRequired, Length, EqualTo
from config import Config
from authlib.integrations.flask_client import OAuth # type: ignore
# Blueprint that every route decorator in this module attaches to.
auth_bp = Blueprint('auth', __name__)
# Bcrypt helper; register() uses it to hash new passwords.
bcrypt = Bcrypt()
# Google OAuth client. Stays None until init_oauth() assigns it, so the
# Google routes below must not be hit before that call.
google = None
def init_oauth(oauth):
    """Publish the Google client of *oauth* as this module's ``google``.

    Args:
        oauth: Object exposing a ``google`` attribute; that attribute is
            stored in the module-level ``google`` variable used by the
            Google login routes.
    """
    global google
    google = oauth.google
# NOTE(review): these four field objects are bound to plain names and nothing
# in the visible code reads them (the routes use the attributes of
# RegistrationForm / LoginForm instances instead). They look like leftovers
# of a form class definition -- confirm whether they can be removed.
# The original indentation is not visible in this view, so they may
# alternatively be trailing locals of init_oauth(); verify before moving them.
password = PasswordField('Password', validators=[DataRequired(), Length(min=6)])
confirm_password = PasswordField('Confirm Password', validators=[DataRequired(), EqualTo('password')])
recaptcha = RecaptchaField()
submit = SubmitField('Register')
@auth_bp.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form and create a password account on submit.

    On a valid submission:
      * if a user with the caller's IP address already exists, the full
        registration page is rendered again and nothing is created;
      * otherwise the password is bcrypt-hashed, the username is
        lower-cased, the user is committed, and the caller is redirected
        to ``auth.login``.
    An ``IntegrityError`` on commit is rolled back and the form is shown
    again. When no redirect happens, requests carrying
    ``X-Requested-With: XMLHttpRequest`` receive the modal template and all
    others receive the full page.

    NOTE(review): nesting was reconstructed from a copy with flattened
    indentation -- confirm the AJAX check sits at function level.
    """
    form = RegistrationForm()

    if form.validate_on_submit():
        client_ip = get_client_ip()

        # One account per client IP: bail out if this address is already used.
        ip_owner = User.query.filter_by(ip_address=client_ip).first()
        if ip_owner:
            return render_template('register.html', form=form, recaptcha_key=Config.RECAPTCHA_PUBLIC_KEY)

        password_hash = bcrypt.generate_password_hash(form.password.data).decode('utf-8')
        new_user = User(
            username=form.username.data.lower(),
            encrypted_password=password_hash,
            ip_address=client_ip,
        )
        try:
            db.session.add(new_user)
            db.session.commit()
        except IntegrityError:
            # Constraint violation: undo and fall through to re-render.
            db.session.rollback()
        else:
            return redirect(url_for('auth.login'))

    is_ajax = request.headers.get('X-Requested-With') == 'XMLHttpRequest'
    if is_ajax:
        return render_template('register-modal.html', form=form)
    return render_template('register.html', form=form, recaptcha_key=Config.RECAPTCHA_PUBLIC_KEY)
@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate a password-based user.

    On a valid submission the user is looked up by the submitted username.
    Because ``register`` stores usernames lower-cased, a failed exact lookup
    is retried with the lower-cased name so that a user who typed capitals at
    registration can log in with the same spelling.

    * Accounts that have a ``google_id`` are redirected to the Google flow.
    * On a correct password the user is logged in, a missing ``ip_address``
      is filled in from the current request, and the caller is redirected to
      the ``profile`` page.
    * In every other case the form is rendered again: the modal template for
      ``X-Requested-With: XMLHttpRequest`` requests, the full page otherwise.

    NOTE(review): nesting was reconstructed from a copy with flattened
    indentation -- confirm the profile redirect belongs to the
    password-success branch.
    """
    form = LoginForm()

    if form.validate_on_submit():
        submitted_name = form.username.data
        # Exact match first so every login that worked before still works.
        user = User.query.filter_by(username=submitted_name).first()
        if user is None and submitted_name and submitted_name != submitted_name.lower():
            # register() persists username.lower(); mirror that here.
            user = User.query.filter_by(username=submitted_name.lower()).first()

        if user:
            if user.google_id:
                return redirect(url_for('auth.login_google'))
            if user.check_password(form.password.data):
                login_user(user)
                if user.ip_address is None:
                    # Backfill the IP for accounts created without one.
                    user.ip_address = get_client_ip()
                    db.session.commit()
                return redirect(url_for('profile', username=user.username))

    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
        return render_template('login-modal.html', form=form, recaptcha_key=Config.RECAPTCHA_PUBLIC_KEY)
    return render_template('login.html', form=form, recaptcha_key=Config.RECAPTCHA_PUBLIC_KEY)
@auth_bp.route('/register-modal')
def register_modal():
    """Render the registration modal template with a newly built form."""
    return render_template('register-modal.html', form=RegistrationForm())
@auth_bp.route('/login-modal')
def login_modal():
    """Render the login modal template with a newly built form."""
    return render_template('login-modal.html', form=LoginForm())
@auth_bp.route('/logout')
def logout():
    """Log the current user out via Flask-Login and redirect to ``index``."""
    logout_user()
    home_url = url_for('index')
    return redirect(home_url)
@auth_bp.route('/login/google')
def login_google():
    """Start the Google OAuth flow.

    Generates a fresh ``state`` and ``nonce`` (UUID4 strings), stores them in
    the session under ``oauth_state`` / ``oauth_nonce`` for the callback to
    verify, and redirects to Google with a callback URL built from the
    current request's host.
    """
    origin = request.host_url.rstrip('/')
    callback_url = f"{origin}/auth/google/callback"

    csrf_state = str(uuid.uuid4())
    id_token_nonce = str(uuid.uuid4())
    session['oauth_state'] = csrf_state
    session['oauth_nonce'] = id_token_nonce

    return google.authorize_redirect(callback_url, state=csrf_state, nonce=id_token_nonce)
@auth_bp.route('/auth/google/callback')
def authorize_google():
    """Handle Google's OAuth callback and log the user in.

    Steps:
      1. Pop ``oauth_state`` / ``oauth_nonce`` from the session (so they are
         single-use) and reject the request with 400 when the state is
         missing or differs from the ``state`` query parameter.
      2. Exchange the authorization response for a token and parse its ID
         token with the stored nonce; a parsing failure returns 400.
      3. Look up the user by the token's ``sub`` claim. If none exists, create
         one with a generated ``user_<8 hex chars>`` username and the
         caller's IP address.
      4. Log the user in and redirect to ``index``.

    Returns:
        A redirect to ``index`` on success, or an ``(error text, status)``
        tuple: 400 for an invalid state or ID token, 409 when the new account
        could not be stored because of a database constraint.
    """
    expected_state = session.pop('oauth_state', None)
    nonce = session.pop('oauth_nonce', None)
    if not expected_state or expected_state != request.args.get('state'):
        return "Error: Invalid state parameter", 400

    token = google.authorize_access_token()
    try:
        user_info = google.parse_id_token(token, nonce=nonce)
    except Exception:
        # Any parsing/validation failure is reported as a bad ID token.
        return "Error: Invalid ID token", 400

    google_id = user_info['sub']
    user = User.query.filter_by(google_id=google_id).first()

    if user is None:
        user = User(
            username=f"user_{uuid.uuid4().hex[:8]}",
            google_id=google_id,
            ip_address=get_client_ip(),
        )
        db.session.add(user)
        try:
            db.session.commit()
        except IntegrityError:
            # Same recovery as register(): undo the failed insert so the
            # session stays usable.
            db.session.rollback()
            # A concurrent callback may have created this account already.
            user = User.query.filter_by(google_id=google_id).first()
            if user is None:
                return "Error: Could not create account", 409

    login_user(user)
    return redirect(url_for('index'))