from flask import Flask, render_template, redirect, url_for, request, abort from flask_wtf.csrf import CSRFProtect from flask_sqlalchemy import SQLAlchemy from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user from werkzeug.security import generate_password_hash, check_password_hash import os app = Flask(__name__) csrf = CSRFProtect(app) app.config["SECRET_KEY"] = os.environ["FLASK_SECRET_KEY"] from urllib.parse import quote_plus # Get database credentials from environment db_password = os.environ['DB_PASSWORD'] # URL-encode all special characters including @ and $ encoded_password = quote_plus(db_password, safe='') app.logger.info(f"Using database password: {'*' * len(db_password)}") app.config["SQLALCHEMY_DATABASE_URI"] = f"postgresql://flaskuser:{db_password}@db:5432/flaskdb" app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False db = SQLAlchemy(app) # Initialize database CLI command @app.cli.command("init-db") def init_db(): import os with app.app_context(): db.create_all() print("Database initialized") login_manager = LoginManager(app) login_manager.login_view = "login" class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) email = db.Column(db.String(100), unique=True) password = db.Column(db.String(200)) # Increased for password hash admin = db.Column(db.Boolean, default=False) @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) @app.route("/") def home(): return redirect(url_for("login")) @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": email = request.form.get("email") password = request.form.get("password") user = User.query.filter_by(email=email).first() if user and check_password_hash(user.password, password): login_user(user) return redirect(url_for("dashboard")) return "Invalid credentials" return render_template("login.html") @app.route("/register", methods=["GET", "POST"]) def register(): if request.method == "POST": email = request.form.get("email") password = request.form.get("password") confirm_password = request.form.get("confirm_password") if password != confirm_password: return "Passwords do not match" if User.query.filter_by(email=email).first(): return "Email already registered" password = generate_password_hash(password) new_user = User(email=email, password=password) # Automatically make first user admin if User.query.count() == 0: new_user.admin = True db.session.add(new_user) db.session.commit() return redirect(url_for("login")) return render_template("register.html") @app.route("/dashboard") @login_required def dashboard(): return render_template("dashboard.html") @app.route("/logout") @login_required def logout(): logout_user() return redirect(url_for("login")) @app.route("/admin", methods=["GET", "POST"]) @login_required def admin(): if not current_user.admin: abort(403) if request.method == "POST": action = request.form.get("action") user_id = request.form.get("user_id") user = User.query.get(user_id) if user and user != current_user: # Prevent self-modification if action in ['demote', 'delete'] and user.admin: admin_count = User.query.filter_by(admin=True).count() if admin_count == 1: abort(400, description="Cannot remove last admin") if action == "promote": user.admin = True elif action == "demote": user.admin = False elif action == "delete": db.session.delete(user) db.session.commit() users = User.query.all() return render_template("admin.html", users=users)