139 lines
4.0 KiB
Python
139 lines
4.0 KiB
Python
from flask import Flask, render_template, redirect, url_for, request, abort
|
|
from flask_wtf.csrf import CSRFProtect
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
import os
|
|
|
|
app = Flask(__name__)
|
|
csrf = CSRFProtect(app)
|
|
|
|
app.config["SECRET_KEY"] = os.environ["FLASK_SECRET_KEY"]
|
|
from urllib.parse import quote_plus
|
|
|
|
# Get database credentials from environment
|
|
db_password = os.environ['DB_PASSWORD']
|
|
# URL-encode all special characters including @ and $
|
|
encoded_password = quote_plus(db_password, safe='')
|
|
app.logger.info(f"Using database password: {'*' * len(db_password)}")
|
|
app.config["SQLALCHEMY_DATABASE_URI"] = f"postgresql://flaskuser:{db_password}@db:5432/flaskdb"
|
|
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
|
|
|
|
db = SQLAlchemy(app)
|
|
|
|
|
|
# Initialize database CLI command
|
|
@app.cli.command("init-db")
|
|
def init_db():
|
|
import os
|
|
|
|
with app.app_context():
|
|
db.create_all()
|
|
print("Database initialized")
|
|
|
|
|
|
login_manager = LoginManager(app)
|
|
login_manager.login_view = "login"
|
|
|
|
|
|
class User(UserMixin, db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
email = db.Column(db.String(100), unique=True)
|
|
password = db.Column(db.String(200)) # Increased for password hash
|
|
admin = db.Column(db.Boolean, default=False)
|
|
|
|
|
|
@login_manager.user_loader
|
|
def load_user(user_id):
|
|
return User.query.get(int(user_id))
|
|
|
|
|
|
@app.route("/")
|
|
def home():
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
|
|
def login():
|
|
if request.method == "POST":
|
|
email = request.form.get("email")
|
|
password = request.form.get("password")
|
|
user = User.query.filter_by(email=email).first()
|
|
|
|
if user and check_password_hash(user.password, password):
|
|
login_user(user)
|
|
return redirect(url_for("dashboard"))
|
|
|
|
return "Invalid credentials"
|
|
return render_template("login.html")
|
|
|
|
|
|
@app.route("/register", methods=["GET", "POST"])
|
|
def register():
|
|
if request.method == "POST":
|
|
email = request.form.get("email")
|
|
password = request.form.get("password")
|
|
confirm_password = request.form.get("confirm_password")
|
|
|
|
if password != confirm_password:
|
|
return "Passwords do not match"
|
|
|
|
if User.query.filter_by(email=email).first():
|
|
return "Email already registered"
|
|
|
|
password = generate_password_hash(password)
|
|
|
|
new_user = User(email=email, password=password)
|
|
|
|
# Automatically make first user admin
|
|
if User.query.count() == 0:
|
|
new_user.admin = True
|
|
|
|
db.session.add(new_user)
|
|
db.session.commit()
|
|
|
|
return redirect(url_for("login"))
|
|
return render_template("register.html")
|
|
|
|
|
|
@app.route("/dashboard")
|
|
@login_required
|
|
def dashboard():
|
|
return render_template("dashboard.html")
|
|
|
|
|
|
@app.route("/logout")
|
|
@login_required
|
|
def logout():
|
|
logout_user()
|
|
return redirect(url_for("login"))
|
|
|
|
@app.route("/admin", methods=["GET", "POST"])
|
|
@login_required
|
|
def admin():
|
|
if not current_user.admin:
|
|
abort(403)
|
|
|
|
if request.method == "POST":
|
|
action = request.form.get("action")
|
|
user_id = request.form.get("user_id")
|
|
user = User.query.get(user_id)
|
|
|
|
if user and user != current_user: # Prevent self-modification
|
|
if action in ['demote', 'delete'] and user.admin:
|
|
admin_count = User.query.filter_by(admin=True).count()
|
|
if admin_count == 1:
|
|
abort(400, description="Cannot remove last admin")
|
|
|
|
if action == "promote":
|
|
user.admin = True
|
|
elif action == "demote":
|
|
user.admin = False
|
|
elif action == "delete":
|
|
db.session.delete(user)
|
|
|
|
db.session.commit()
|
|
|
|
users = User.query.all()
|
|
return render_template("admin.html", users=users)
|