885 lines
39 KiB
Python
885 lines
39 KiB
Python
from flask import Flask, render_template, redirect, url_for, request, abort, flash, jsonify, send_from_directory
|
|
import os
|
|
from flask_wtf.csrf import CSRFProtect
|
|
from functools import wraps
|
|
from datetime import datetime, timedelta
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from sqlalchemy.sql.sqltypes import Text
|
|
from flask_login import (
|
|
LoginManager, UserMixin, login_user, login_required, logout_user, current_user,
|
|
)
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
import json
|
|
|
|
# Booking configuration constants
# Latest start date a booking may have, counted in days from "now" (~6 months).
MAX_BOOKING_DAYS_AHEAD = 182

# The Flask application object, with CSRF protection attached to it.
app = Flask(__name__)
csrf = CSRFProtect(app)
|
|
|
|
@app.template_filter("datetimeformat")
def datetimeformat(value, format="%Y-%m-%d %H:%M"):
    """Jinja filter: format ``value`` via ``strftime``; ``None`` renders as ``""``."""
    return "" if value is None else value.strftime(format)
|
|
|
|
@app.template_filter("json_parse")
def json_parse(value):
    """Jinja filter: decode a JSON string.

    Falsy values and non-string values are returned untouched, and so is a
    string that cannot be decoded.
    """
    if not (value and isinstance(value, str)):
        return value
    try:
        decoded = json.loads(value)
    except (json.JSONDecodeError, TypeError):
        return value
    return decoded
|
|
|
|
# Secrets come from the environment; a missing variable raises KeyError at
# import time so a misconfigured deployment fails immediately.
app.config["SECRET_KEY"] = os.environ["FLASK_SECRET_KEY"]
db_password = os.environ["DB_PASSWORD"]
# Log nothing derived from the secret: the previous masked output revealed
# the password length.
app.logger.info("Database password loaded from environment")

from urllib.parse import quote_plus  # only needed here, to build the URI

# Percent-encode the password so characters such as "@", ":" or "/" cannot
# corrupt the URI; SQLAlchemy decodes it again when it parses the URL.
app.config["SQLALCHEMY_DATABASE_URI"] = (
    f"postgresql://flaskuser:{quote_plus(db_password)}@db:5432/flaskdb"
)
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
# MAX_BOOKING_DAYS_AHEAD is already defined once near the top of the module;
# the duplicate definition that used to sit here was removed.
db = SQLAlchemy(app)
|
|
|
|
@app.cli.command("init-db")
def init_db():
    """CLI command ``init-db``: create every table declared on ``db``."""
    context = app.app_context()
    with context:
        db.create_all()
        print("Database initialized")
|
|
|
|
# Flask-Login wiring; "login" is the endpoint name of the login view below.
login_manager = LoginManager(app)
login_manager.login_view = "login"
|
|
|
|
class Company(db.Model):
    """A tenant organisation that users and resources are attached to."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), unique=True)
    # File name (inside static/floor_maps) of the uploaded floor map, if any.
    floor_map_filename = db.Column(db.String(255), nullable=True)

    def __repr__(self):
        return "<Company {}>".format(self.name)
|
|
|
|
class User(UserMixin, db.Model):
    """Login account, optionally attached to a company.

    ``has_profile`` and ``complete_profile`` are predicates and return real
    booleans (previously they leaked the underlying string or ``None``).
    """

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(100), unique=True)
    password = db.Column(db.String(200))  # password hash, never the plain text
    first_name = db.Column(db.String(50))
    last_name = db.Column(db.String(50))
    avatar_url = db.Column(db.String(255), nullable=True)

    # Password-reset bookkeeping.
    reset_token = db.Column(db.String(100), unique=True, nullable=True)
    reset_token_expires = db.Column(db.DateTime, nullable=True)

    # Role flags.
    is_system_admin = db.Column(db.Boolean, default=False)
    is_company_admin = db.Column(db.Boolean, default=False)

    company_id = db.Column(db.Integer, db.ForeignKey("company.id"))
    company = db.relationship("Company", backref="users", lazy=True)

    # Self-referential link to the user who sent the invitation.
    invited_by = db.Column(db.Integer, db.ForeignKey("user.id"))
    inviter = db.relationship("User", remote_side=[id], backref="invited_users")

    bookings = db.relationship("Booking", backref="user", lazy=True)

    def __repr__(self):
        return f"<User {self.email}>"

    def has_profile(self):
        """Return True when both first and last name are set (non-empty)."""
        return bool(self.first_name and self.last_name)

    def complete_profile(self):
        """Return True when the profile has names and an avatar."""
        return bool(self.has_profile() and self.avatar_url)
|
|
|
|
|
|
class Group(db.Model):
    """A named set of users inside one company.

    Membership is stored as a JSON-encoded list in ``user_ids``.
    """

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), unique=True)
    description = db.Column(Text, nullable=True)
    company_id = db.Column(db.Integer, db.ForeignKey("company.id"), nullable=False)
    user_ids = db.Column(db.Text, nullable=True)

    def __repr__(self):
        # This model declares no ``company`` relationship, so reading
        # ``self.company.name`` unconditionally raised AttributeError.
        # Use the relationship only if one is provided elsewhere; otherwise
        # fall back to the foreign key value.
        company = getattr(self, "company", None)
        label = company.name if company is not None else self.company_id
        return f"<Group {self.name} (Company {label})>"
|
|
|
|
class ResourcePermission(db.Model):
    """Booking permission attached to one resource (``resource_id`` is unique).

    ``permission_type`` is written by the views as "everyone", "group" or
    "users"; ``group_id`` / ``user_ids`` carry the matching detail.
    """

    id = db.Column(db.Integer, primary_key=True)
    resource_id = db.Column(db.Integer, db.ForeignKey("resource.id"), unique=True)
    permission_type = db.Column(db.String(20))
    group_id = db.Column(db.Integer, db.ForeignKey("group.id"), nullable=True)
    user_ids = db.Column(db.Text, nullable=True)  # JSON text

    def __repr__(self):
        return "<ResourcePermission resource_id={}, type={}>".format(
            self.resource_id, self.permission_type
        )
|
|
|
|
class Resource(db.Model):
    """A bookable item owned by a company, optionally placed on the floor map."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100))
    description = db.Column(Text)
    image_url = db.Column(db.String(255), nullable=True)
    company_id = db.Column(db.Integer, db.ForeignKey("company.id"))

    # Floor-map coordinates; both are NULL until the resource is placed.
    position_x = db.Column(db.Integer, nullable=True)
    position_y = db.Column(db.Integer, nullable=True)

    resource_type = db.Column(db.String(50), default="workoffice")  # e.g., "workoffice", "meeting_room"
    permissions = db.relationship("ResourcePermission", backref="resource", lazy=True)

    def __repr__(self):
        return "<Resource {} (type: {})>".format(self.name, self.resource_type)
|
|
|
|
class Booking(db.Model):
    """A reservation of one resource by one user for a time interval."""

    id = db.Column(db.Integer, primary_key=True)
    resource_id = db.Column(db.Integer, db.ForeignKey("resource.id"))
    # Copy of the resource's type, stored on the booking row itself.
    resource_type = db.Column(db.String(50))
    user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
    start_time = db.Column(db.DateTime)
    end_time = db.Column(db.DateTime)
    purpose = db.Column(db.Text)
    resource = db.relationship("Resource", backref="bookings", lazy=True)

    def __repr__(self):
        return "<Booking {}>".format(self.id)
|
|
|
|
class Invitation(db.Model):
    """Pending invitation to join a company, identified by a unique token."""

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(100))
    token = db.Column(db.String(100), unique=True)
    company_id = db.Column(db.Integer, db.ForeignKey("company.id"))
    invited_by = db.Column(db.Integer, db.ForeignKey("user.id"))
    expires_at = db.Column(db.DateTime)

    def __repr__(self):
        return "<Invitation {}>".format(self.email)
|
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login loader: return the ``User`` for ``user_id`` or ``None``.

    The loader must not raise for an unusable id, so a value that cannot be
    converted to ``int`` is treated as "no such user".
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(primary_key)
|
|
|
|
@app.route("/")
def home():
    """Render the landing page."""
    landing_page = render_template("index.html")
    return landing_page
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or authenticate the posted credentials (POST).

    Returns a redirect to the dashboard on success, the plain text
    "Invalid credentials" on failure.
    """
    if request.method != "POST":
        return render_template("login.html")

    email = request.form.get("email")
    # Fall back to "" so a request without a password field fails
    # verification instead of raising inside check_password_hash.
    password = request.form.get("password") or ""
    user = User.query.filter_by(email=email).first()
    # The password column is nullable; a row without a hash can never match.
    if user and user.password and check_password_hash(user.password, password):
        login_user(user)
        return redirect(url_for("dashboard"))
    return "Invalid credentials"
|
|
|
|
def _save_registration_avatar(avatar_image):
    """Store an uploaded avatar under static/user_avatars.

    Returns the path relative to ``static`` ("user_avatars/<file>"), or
    ``None`` when nothing was uploaded or the extension is not allowed.
    """
    if not avatar_image or not avatar_image.filename:
        return None

    import uuid
    from werkzeug.utils import secure_filename

    allowed_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp')
    if not secure_filename(avatar_image.filename).lower().endswith(allowed_extensions):
        return None

    # A random prefix keeps two uploads with the same name from colliding.
    filename = secure_filename(f"user_avatar_{uuid.uuid4().hex}_{avatar_image.filename}")
    upload_folder = os.path.join(app.root_path, "static", "user_avatars")
    os.makedirs(upload_folder, exist_ok=True)
    avatar_image.save(os.path.join(upload_folder, filename))
    return f"user_avatars/{filename}"


@app.route("/register", methods=["GET", "POST"])
def register():
    """Register a new account.

    With a ``token`` query parameter the user joins the inviting company and
    the invitation is consumed. Without one, a new company is created and the
    user becomes its company admin (and system admin if no user exists yet).

    All validation happens before the avatar is written to disk, so a
    rejected registration no longer leaves an orphaned file behind.
    Validation failures are returned as plain-text messages.
    """
    token = request.args.get("token")
    if request.method != "POST":
        return render_template("register.html", token=token)

    email = request.form.get("email")
    password = request.form.get("password")
    confirm_password = request.form.get("confirm_password")
    first_name = request.form.get("first_name", "")
    last_name = request.form.get("last_name", "")

    # Missing credentials previously crashed while hashing the password or
    # created an account without an email address.
    if not email or not password:
        return "Email and password are required"
    if password != confirm_password:
        return "Passwords do not match"
    if User.query.filter_by(email=email).first():
        return "Email already registered"

    invitation = None
    company = None
    company_name = None
    if token:
        invitation = Invitation.query.filter_by(token=token).first()
        if not invitation or invitation.expires_at < datetime.now():
            return "Invalid or expired invitation"
        company = Company.query.get(invitation.company_id)
        if not company:
            return "Invalid company"
    else:
        company_name = request.form.get("company_name")
        if not company_name:
            return "Company name is required"
        if Company.query.filter_by(name=company_name).first():
            return "Company name already exists"

    # Only now, with every check passed, touch the file system.
    avatar_url = _save_registration_avatar(request.files.get("avatar"))
    password_hash = generate_password_hash(password)

    if invitation is not None:
        new_user = User(
            email=email,
            password=password_hash,
            company_id=company.id,
            invited_by=invitation.invited_by,
            first_name=first_name,
            last_name=last_name,
            avatar_url=avatar_url,
        )
        db.session.add(new_user)
        db.session.delete(invitation)  # invitations are single-use
    else:
        new_company = Company(name=company_name)
        db.session.add(new_company)
        db.session.flush()  # assigns new_company.id
        new_user = User(
            email=email,
            password=password_hash,
            company_id=new_company.id,
            is_company_admin=True,
            first_name=first_name or "User",  # Default to "User" if not provided
            last_name=last_name or "Account",  # Default to "Account" if not provided
            avatar_url=avatar_url,
        )
        # The very first account in the system also becomes system admin.
        if User.query.count() == 0:
            new_user.is_system_admin = True
        db.session.add(new_user)

    db.session.commit()
    return redirect(url_for("login"))
|
|
|
|
@app.route("/dashboard")
@login_required
def dashboard():
    """List the resources of the current user's company.

    Users without a company are redirected to the login page.
    """
    if not current_user.company_id:
        return redirect(url_for("login"))
    company_resources = Resource.query.filter_by(company_id=current_user.company_id).all()
    return render_template("resources.html", resources=company_resources)
|
|
|
|
def has_permission_check(f):
    """Decorator: abort with 403 unless the user is a system admin or has a company."""

    @wraps(f)
    def guarded(*args, **kwargs):
        allowed = current_user.is_system_admin or current_user.company_id
        if not allowed:
            abort(403)
        return f(*args, **kwargs)

    return guarded
|
|
|
|
def system_admin_required(f):
    """Decorator: abort with 403 unless the current user is a system admin."""

    @wraps(f)
    def guarded(*args, **kwargs):
        if current_user.is_system_admin:
            return f(*args, **kwargs)
        abort(403)
        # Reached only if abort() returns, matching the original fall-through.
        return f(*args, **kwargs)

    return guarded
|
|
|
|
def company_admin_required(f):
    """Decorator: abort with 403 unless the current user is a company admin."""

    @wraps(f)
    def guarded(*args, **kwargs):
        is_admin = current_user.is_company_admin
        if not is_admin:
            abort(403)
        return f(*args, **kwargs)

    return guarded
|
|
|
|
@app.route("/resource/add", methods=["GET", "POST"])
@login_required
@company_admin_required
def add_resource():
    """Create a resource plus its permission row for the admin's company.

    GET renders the (shared) edit form with the company's groups.
    POST validates the permission settings first, then stores the optional
    image and finally writes the resource and permission in one transaction.
    """
    if request.method != "POST":
        groups = Group.query.filter_by(company_id=current_user.company_id).all()
        return render_template("edit_resource.html", groups=groups)

    name = request.form.get("name")
    description = request.form.get("description")
    image = request.files.get("image")
    resource_type = request.form.get("resource_type", "workoffice")
    permission_type = request.form.get("permission_type", "everyone")
    group_id = request.form.get("group_id")
    user_ids_json = request.form.get("user_ids")

    # Resolve the permission settings before anything is written to disk or
    # to the database, so invalid input leaves nothing behind.
    permission_fields = {"permission_type": "everyone"}
    if permission_type == "group" and group_id:
        # The group must belong to the admin's own company; an id from the
        # form is otherwise enough to reference another tenant's group.
        group = None
        if group_id.isdigit():
            group = Group.query.filter_by(
                id=int(group_id), company_id=current_user.company_id
            ).first()
        if group is None:
            flash("Invalid group", "danger")
            return redirect(url_for("add_resource"))
        permission_fields = {"permission_type": "group", "group_id": group.id}
    elif permission_type == "users" and user_ids_json:
        try:
            user_ids = json.loads(user_ids_json)
        except json.JSONDecodeError:
            flash("Invalid user IDs format", "danger")
            return redirect(url_for("add_resource"))
        permission_fields = {"permission_type": "users", "user_ids": json.dumps(user_ids)}

    # Handle image upload
    image_url = None
    image_path = None
    if image and image.filename:
        from werkzeug.utils import secure_filename

        if secure_filename(image.filename).lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
            import uuid

            filename = secure_filename(f"resource_{uuid.uuid4().hex}_{image.filename}")
            upload_folder = os.path.join(app.root_path, "static", "resource_images")
            os.makedirs(upload_folder, exist_ok=True)
            image_path = os.path.join(upload_folder, filename)
            image.save(image_path)
            image_url = f"resource_images/{filename}"

    try:
        new_resource = Resource(
            name=name,
            description=description,
            image_url=image_url,
            company_id=current_user.company_id,
            resource_type=resource_type,
        )
        db.session.add(new_resource)
        db.session.flush()  # assigns new_resource.id for the permission row
        db.session.add(ResourcePermission(resource_id=new_resource.id, **permission_fields))
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        app.logger.exception("Error creating resource")
        # The row was not stored, so the uploaded image would be orphaned.
        if image_path and os.path.exists(image_path):
            os.remove(image_path)
        flash(f"Error creating resource: {str(e)}", "danger")
        return redirect(url_for("add_resource"))

    flash("Resource created successfully", "success")
    return redirect(url_for("company_admin"))
|
|
|
|
@app.route("/resource/edit/<int:resource_id>", methods=["GET", "POST"])
@login_required
@company_admin_required
def edit_resource(resource_id):
    """Edit a resource of the admin's own company and replace its permission row.

    Aborts with 404 for an unknown id and 403 when the resource belongs to a
    different company. All form input is validated before the resource is
    modified; the previous image file is removed only after the new state
    has been committed.
    """
    resource = Resource.query.get_or_404(resource_id)
    # Tenant isolation: being a company admin is not enough, the resource
    # must belong to that admin's company.
    if resource.company_id != current_user.company_id:
        abort(403)

    if request.method != "POST":
        groups = Group.query.filter_by(company_id=current_user.company_id).all()
        permission = ResourcePermission.query.filter_by(resource_id=resource_id).first()
        return render_template(
            "edit_resource.html",
            resource=resource,
            groups=groups,
            permission=permission,
            resource_type=resource.resource_type,
        )

    edit_url = url_for("edit_resource", resource_id=resource_id)

    # --- validate permission settings ------------------------------------
    permission_type = request.form.get("permission_type", "everyone")
    group_id = request.form.get("group_id")
    user_ids_json = request.form.get("user_ids")
    permission_fields = {"permission_type": "everyone"}
    if permission_type == "group" and group_id:
        group = None
        if group_id.isdigit():
            group = Group.query.filter_by(
                id=int(group_id), company_id=current_user.company_id
            ).first()
        if group is None:
            flash("Invalid group", "danger")
            return redirect(edit_url)
        permission_fields = {"permission_type": "group", "group_id": group.id}
    elif permission_type == "users" and user_ids_json:
        try:
            user_ids = json.loads(user_ids_json)
        except json.JSONDecodeError:
            flash("Invalid user IDs format", "danger")
            return redirect(edit_url)
        permission_fields = {"permission_type": "users", "user_ids": json.dumps(user_ids)}

    # --- validate positions ----------------------------------------------
    # An absent/empty field keeps the stored value, a whitespace-only field
    # clears it, anything else must be an integer.
    new_positions = {}
    for column in ("position_x", "position_y"):
        raw = request.form.get(column)
        if not raw:
            continue
        try:
            new_positions[column] = int(raw) if raw.strip() else None
        except ValueError:
            flash("Position must be a whole number", "danger")
            return redirect(edit_url)

    # --- apply -------------------------------------------------------------
    resource.name = request.form.get("name")
    resource.description = request.form.get("description")
    resource.resource_type = request.form.get("resource_type", resource.resource_type)
    for column, value in new_positions.items():
        setattr(resource, column, value)

    # Handle image upload
    replaced_image_url = None
    image = request.files.get("image")
    if image and image.filename:
        from werkzeug.utils import secure_filename

        if secure_filename(image.filename).lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
            import uuid

            filename = secure_filename(f"resource_{uuid.uuid4().hex}_{image.filename}")
            upload_folder = os.path.join(app.root_path, "static", "resource_images")
            os.makedirs(upload_folder, exist_ok=True)
            image.save(os.path.join(upload_folder, filename))
            replaced_image_url = resource.image_url
            resource.image_url = f"resource_images/{filename}"

    existing_permission = ResourcePermission.query.filter_by(resource_id=resource_id).first()
    if existing_permission:
        # resource_id is unique on the permission table, so the old row has
        # to be gone before the replacement is inserted.
        db.session.delete(existing_permission)
        db.session.flush()
    db.session.add(ResourcePermission(resource_id=resource_id, **permission_fields))
    db.session.commit()

    # Delete the previous image only after a successful commit. image_url is
    # stored relative to "static" (e.g. "resource_images/x.png"); the old
    # code stripped the directory and therefore never found the file.
    if replaced_image_url and replaced_image_url.startswith("resource_images/"):
        old_path = os.path.join(app.root_path, "static", replaced_image_url)
        if os.path.exists(old_path):
            os.remove(old_path)

    flash("Resource updated successfully", "success")
    return redirect(url_for("company_admin"))
|
|
|
|
@app.route("/logout")
@login_required
def logout():
    """End the current session and send the user to the login page."""
    logout_user()
    login_url = url_for("login")
    return redirect(login_url)
|
|
|
|
@app.route("/book/<int:resource_id>", methods=["GET", "POST"])
@login_required
def book_resource(resource_id):
    """Show the booking form for a resource (GET) or create a booking (POST).

    A booking is rejected when the times cannot be parsed, when the end is
    not after the start, when the start lies more than
    ``MAX_BOOKING_DAYS_AHEAD`` days ahead, or when the user already holds an
    overlapping booking of the same resource type.
    """
    resource = Resource.query.get_or_404(resource_id)
    # check_permission is not defined in the part of the module visible here.
    can_book = check_permission(resource_id, resource.company_id, current_user.id)
    if not can_book:
        flash("You don't have permission to book this resource", "danger")
        return redirect(url_for("dashboard"))

    if request.method != "POST":
        return render_template("book_resource.html", resource=resource)

    booking_url = url_for("book_resource", resource_id=resource_id)

    try:
        start_time = datetime.fromisoformat(request.form.get("start_time"))
        end_time = datetime.fromisoformat(request.form.get("end_time"))
        # Everything below compares against naive datetimes; an offset-aware
        # value would raise TypeError in those comparisons.
        if start_time.tzinfo is not None or end_time.tzinfo is not None:
            raise ValueError("timezone-aware datetimes are not supported")
    except (TypeError, ValueError):
        # TypeError: field missing (None); ValueError: not an ISO timestamp.
        flash("Invalid start or end time", "danger")
        return redirect(booking_url)

    if end_time <= start_time:
        flash("End time must be after start time", "danger")
        return redirect(booking_url)

    # Check if start time is beyond max advance booking period
    now = datetime.now()
    max_bookable_time = now + timedelta(days=MAX_BOOKING_DAYS_AHEAD)
    if start_time > max_bookable_time:
        # The constant is a number of days; the message used to say "months".
        flash(f"Bookings can only be made up to {MAX_BOOKING_DAYS_AHEAD} days in advance", "danger")
        return redirect(booking_url)

    # Check if user already has an overlapping booking on the SAME resource type
    # (max 1 active booking per type at a time).
    # NOTE(review): overlapping bookings of this resource by OTHER users are
    # not rejected here - confirm whether that is enforced elsewhere.
    conflicting_booking = Booking.query.filter(
        Booking.user_id == current_user.id,
        Booking.resource_type == resource.resource_type,  # Only check same resource type
        Booking.start_time < end_time,
        Booking.end_time > start_time,
    ).first()
    if conflicting_booking:
        flash(f"You already have a {resource.resource_type.replace('_', ' ')} booking during this time slot", "danger")
        return redirect(booking_url)

    new_booking = Booking(
        resource_id=resource_id,
        resource_type=resource.resource_type,
        user_id=current_user.id,
        start_time=start_time,
        end_time=end_time,
        purpose=request.form.get("purpose"),
    )
    db.session.add(new_booking)
    db.session.commit()
    return redirect(url_for("dashboard"))
|
|
|
|
@app.route("/my-bookings")
@login_required
def my_bookings():
    """Render the current user's bookings."""
    own_bookings = Booking.query.filter_by(user_id=current_user.id).all()
    return render_template("my_bookings.html", bookings=own_bookings)
|
|
|
|
@app.route("/api/my-bookings")
@login_required
def api_my_bookings():
    """Return the current user's bookings as a JSON list of title/start/end events."""
    events = []
    for booking in Booking.query.filter_by(user_id=current_user.id).all():
        events.append(
            {
                'title': f"{booking.resource.name} - {booking.purpose}",
                'start': booking.start_time.isoformat(),
                'end': booking.end_time.isoformat(),
            }
        )
    return jsonify(events)
|
|
|
|
@app.route("/api/resource-bookings/<int:resource_id>")
@login_required
def api_resource_bookings(resource_id):
    """Return all bookings of one resource as JSON events.

    Aborts with 403 when the resource belongs to another company. The user's
    own bookings are labelled and coloured differently from other users'.
    """
    resource = Resource.query.get_or_404(resource_id)
    if resource.company_id != current_user.company_id:
        abort(403)

    events = []
    for booking in Booking.query.filter_by(resource_id=resource_id).all():
        if booking.user_id != current_user.id:
            # Only the part of the email before "@" is exposed.
            title = f"Booked by {booking.user.email.split('@')[0]}"
        else:
            title = "Your booking"
        if booking.user_id == current_user.id:
            color = '#28a745'
        else:
            color = '#dc3545'
        events.append(
            {
                'title': title,
                'start': booking.start_time.isoformat(),
                'end': booking.end_time.isoformat(),
                'color': color,
            }
        )
    return jsonify(events)
|
|
|
|
@app.route("/api/resource-availability")
@login_required
def api_resource_availability():
    """Return map position and current availability for the company's resources.

    Only resources with both coordinates set are included. Coordinates are
    converted to percentages of a 5712x4284 reference size. "Now" is the
    Europe/Stockholm wall-clock time with the timezone information removed.

    Availability is determined with a single query for all resources instead
    of one query per resource.
    """
    if not current_user.company_id:
        abort(403)

    import pytz

    resources = Resource.query.filter_by(company_id=current_user.company_id).all()
    stockholm_tz = pytz.timezone('Europe/Stockholm')
    now = datetime.now(stockholm_tz).replace(tzinfo=None)
    original_width = 5712
    original_height = 4284

    placed = [
        resource
        for resource in resources
        if resource.position_x is not None and resource.position_y is not None
    ]

    # Ids of the placed resources that have a booking covering "now".
    busy_ids = set()
    if placed:
        rows = (
            db.session.query(Booking.resource_id)
            .filter(
                Booking.resource_id.in_([resource.id for resource in placed]),
                Booking.start_time <= now,
                Booking.end_time > now,
            )
            .distinct()
            .all()
        )
        busy_ids = {row[0] for row in rows}

    resource_data = [
        {
            'id': resource.id,
            'name': resource.name,
            'x': (resource.position_x / original_width) * 100,
            'y': (resource.position_y / original_height) * 100,
            'available': resource.id not in busy_ids,
            'image_url': resource.image_url,
        }
        for resource in placed
    ]
    return jsonify(resource_data)
|
|
|
|
@app.route("/api/update-resource-position/<int:resource_id>", methods=["POST"])
@login_required
def api_update_resource_position(resource_id):
    """Store a resource's floor-map position given as percentages.

    Expects a JSON object with ``x_percent`` and ``y_percent`` (default 0),
    which are scaled to a 5712x4284 reference size. Only a company admin of
    the resource's own company may call this (403 otherwise).

    Returns JSON ``{'success': ...}``; client errors (missing, malformed or
    non-numeric input) yield 400, a failed commit yields 500.
    """
    if not current_user.company_id:
        abort(403)
    resource = Resource.query.get_or_404(resource_id)
    if resource.company_id != current_user.company_id:
        abort(403)
    if not current_user.is_company_admin:
        abort(403)

    import math

    # silent=True: a malformed body becomes None instead of an exception
    # that the old blanket handler turned into a 500.
    data = request.get_json(silent=True)
    if not data:
        return jsonify({'success': False, 'error': 'No data provided'}), 400
    if not isinstance(data, dict):
        return jsonify({'success': False, 'error': 'Expected a JSON object'}), 400

    try:
        x_percent = float(data.get('x_percent', 0))
        y_percent = float(data.get('y_percent', 0))
    except (TypeError, ValueError):
        return jsonify({'success': False, 'error': 'x_percent and y_percent must be numbers'}), 400
    # NaN / infinity cannot be converted to an integer position.
    if not (math.isfinite(x_percent) and math.isfinite(y_percent)):
        return jsonify({'success': False, 'error': 'x_percent and y_percent must be numbers'}), 400

    original_width = 5712
    original_height = 4284
    resource.position_x = int((x_percent / 100) * original_width)
    resource.position_y = int((y_percent / 100) * original_height)

    try:
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        return jsonify({'success': False, 'error': str(e)}), 500
    return jsonify({'success': True, 'message': 'Resource position updated successfully'})
|
|
|
|
@app.route("/delete-booking/<int:booking_id>", methods=["POST"])
@login_required
def delete_booking(booking_id):
    """Delete a booking.

    Allowed for the booking's owner, and for a company admin when the booked
    resource belongs to that admin's company. Everyone else gets 403.
    """
    booking = Booking.query.get_or_404(booking_id)

    is_owner = booking.user_id == current_user.id
    # The admin flag alone used to be sufficient, which let an admin of one
    # company delete bookings of any other company.
    manages_resource = bool(
        current_user.is_company_admin
        and booking.resource is not None
        and booking.resource.company_id == current_user.company_id
    )
    if not (is_owner or manages_resource):
        abort(403)

    db.session.delete(booking)
    db.session.commit()
    flash("Booking deleted successfully", "success")
    return redirect(url_for("my_bookings"))
|
|
|
|
@app.route("/admin/system", methods=["GET", "POST"])
@login_required
@system_admin_required
def system_admin():
    """System administration page.

    POST applies one action (promote/demote system or company admin, delete)
    to the user given by ``user_id``; the acting admin can never be the
    target. Demotions are skipped when they would remove the last admin.
    The page with all users, companies and groups is rendered afterwards.
    """
    if request.method == "POST":
        requested_action = request.form.get("action")
        target_id = request.form.get("user_id")
        target = User.query.get(target_id)
        if target and target != current_user:
            if requested_action == "promote_system":
                target.is_system_admin = True
            elif requested_action == "demote_system":
                system_admin_count = User.query.filter_by(is_system_admin=True).count()
                if system_admin_count > 1:
                    target.is_system_admin = False
            elif requested_action == "promote_company":
                target.is_company_admin = True
            elif requested_action == "demote_company":
                company_admins = User.query.filter_by(
                    company_id=target.company_id, is_company_admin=True
                ).count()
                if company_admins > 1:
                    target.is_company_admin = False
            elif requested_action == "delete":
                db.session.delete(target)
            db.session.commit()

    return render_template(
        "admin_system.html",
        users=User.query.all(),
        companies=Company.query.all(),
        groups=Group.query.all(),
    )
|
|
|
|
@app.route("/admin/company", methods=["GET", "POST"])
@login_required
@company_admin_required
def company_admin():
    """Company administration page.

    POST handles two things: uploading a floor map
    (``action=upload_floor_map``) and creating an invitation for ``email``.
    Otherwise the page with the company's users, invitations, resources and
    groups is rendered.
    """
    company = Company.query.get(current_user.company_id)

    if request.method == "POST":
        action = request.form.get("action")
        if action == "upload_floor_map" and "floor_map" in request.files:
            from werkzeug.utils import secure_filename

            file = request.files["floor_map"]
            if file.filename:
                # Same whitelist as the other image uploads in this module;
                # without it any file type could be placed under /static.
                allowed_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.webp')
                if not secure_filename(file.filename).lower().endswith(allowed_extensions):
                    flash("Floor map must be an image (png, jpg, jpeg, gif or webp)", "danger")
                    return redirect(url_for("company_admin"))

                filename = secure_filename(f"company_{current_user.company_id}_{file.filename}")
                upload_folder = os.path.join(app.root_path, "static", "floor_maps")
                os.makedirs(upload_folder, exist_ok=True)
                file.save(os.path.join(upload_folder, filename))
                company.floor_map_filename = filename
                db.session.commit()
                flash("Floor map uploaded successfully", "success")
                return redirect(url_for("company_admin"))

        email = request.form.get("email")
        if email:
            existing_user = User.query.filter_by(email=email).first()
            if existing_user:
                return "User already exists"
            token = os.urandom(24).hex()
            new_invite = Invitation(
                email=email,
                token=token,
                company_id=current_user.company_id,
                invited_by=current_user.id,
                expires_at=datetime.now() + timedelta(days=3),
            )
            db.session.add(new_invite)
            db.session.commit()
            invite_link = url_for("register", token=token, _external=True)
            return f"Invitation sent! Share this link: {invite_link}"

    # invitations and company_users are passed as query objects, not lists.
    invitations = Invitation.query.filter_by(company_id=current_user.company_id)
    company_users = User.query.filter_by(company_id=current_user.company_id).options(
        db.joinedload(User.company), db.joinedload(User.inviter)
    )
    resources = Resource.query.filter_by(company_id=current_user.company_id).all()
    groups = Group.query.filter_by(company_id=current_user.company_id).all()
    return render_template(
        "admin_company.html",
        users=company_users,
        invitations=invitations,
        company=company,
        resources=resources,
        groups=groups,
    )
|
|
|
|
@app.route("/api/groups", methods=["GET"])
@login_required
@has_permission_check
def api_groups():
    """Return id and name of every group in the current user's company as JSON."""
    if not current_user.company_id:
        abort(403)
    group_data = []
    for group in Group.query.filter_by(company_id=current_user.company_id).all():
        group_data.append({'id': group.id, 'name': group.name})
    return jsonify(group_data)
|
|
|
|
@app.route("/api/get-users", methods=["GET"])
@login_required
@has_permission_check
def api_get_users():
    """Return the ids of all users in the current user's company as a JSON list."""
    if not current_user.company_id:
        abort(403)
    company_members = User.query.filter_by(company_id=current_user.company_id).all()
    member_ids = []
    for member in company_members:
        member_ids.append(member.id)
    return jsonify(member_ids)
|
|
|
|
@app.route("/create-group/<int:company_id>", methods=["GET", "POST"])
@login_required
@company_admin_required
def create_group_page(company_id):
    """Form page for creating a group in the admin's own company.

    Aborts with 403 for any other company. On POST the group is created with
    the submitted members; ids that do not belong to users of this company
    are dropped.
    """
    if company_id != current_user.company_id:
        abort(403)

    if request.method != "POST":
        users = User.query.filter_by(company_id=company_id).all()
        return render_template("create_group.html", company=company_id, users=users)

    from sqlalchemy.exc import IntegrityError

    form_url = url_for("create_group_page", company_id=company_id)
    name = request.form.get("name")
    description = request.form.get("description")
    user_ids = request.form.getlist("user_ids[]")

    if not name:
        flash("Group name is required", "danger")
        return redirect(form_url)

    existing_group = Group.query.filter_by(name=name, company_id=company_id).first()
    if existing_group:
        flash(f"Group '{name}' already exists", "danger")
        return redirect(form_url)

    # Keep only ids of users of this company; the ids stay in the string
    # form in which the form submitted them.
    company_user_ids = {str(user.id) for user in User.query.filter_by(company_id=company_id).all()}
    user_ids = [user_id for user_id in user_ids if user_id in company_user_ids]

    new_group = Group(name=name, description=description, company_id=company_id)
    if user_ids:
        new_group.user_ids = json.dumps(user_ids)
    db.session.add(new_group)
    try:
        db.session.commit()
    except IntegrityError:
        # Group.name is unique across ALL companies, so the per-company
        # check above cannot rule out a clash.
        db.session.rollback()
        flash(f"Group '{name}' already exists", "danger")
        return redirect(form_url)

    flash(f"Group '{name}' created successfully", "success")
    return redirect(url_for("company_admin", company_id=company_id))
|
|
|
|
@app.route("/api/create-group/<int:company_id>", methods=["GET", "POST"])
@login_required
@company_admin_required
def create_group_api(company_id):
    """JSON endpoint: list the company's groups (GET) or create one (POST).

    Aborts with 403 for a foreign company, 400 when ``name`` is missing and
    409 when the name is already taken.
    """
    if company_id != current_user.company_id:
        abort(403)

    if request.method == "GET":
        groups = Group.query.filter_by(company_id=company_id).all()
        return jsonify([{'id': g.id, 'name': g.name} for g in groups])

    from sqlalchemy.exc import IntegrityError

    name = request.form.get("name")
    if not name:
        abort(400)
    existing_group = Group.query.filter_by(name=name, company_id=company_id).first()
    if existing_group:
        abort(409)

    new_group = Group(name=name, company_id=company_id)
    db.session.add(new_group)
    try:
        db.session.commit()
    except IntegrityError:
        # Group.name is unique across all companies; a clash with another
        # company's group is reported as a conflict instead of a 500.
        db.session.rollback()
        abort(409)

    flash(f"Group '{name}' created successfully", "success")
    return jsonify({'success': True, 'id': new_group.id, 'name': name})
|
|
|
|
@app.route("/group-members/<int:group_id>", methods=["GET", "POST"])
@login_required
@company_admin_required
def group_members_page(group_id):
    """Page for managing group members.

    POST with ``action`` "add" or "remove" and a ``user_id`` changes the
    JSON member list stored on the group, then redirects back to this page.
    GET renders the company's users together with the current member ids.
    """
    group = Group.query.get_or_404(group_id)
    if group.company_id != current_user.company_id:
        abort(403)

    def load_member_ids():
        """Decode group.user_ids; anything but a JSON list counts as empty."""
        try:
            decoded = json.loads(group.user_ids)
        except (json.JSONDecodeError, TypeError):
            return []
        return decoded if isinstance(decoded, list) else []

    if request.method == "POST":
        members_url = url_for("group_members_page", group_id=group_id)
        action = request.form.get("action")
        user_id = request.form.get("user_id")

        if action in ("add", "remove") and user_id:
            try:
                user_id_int = int(user_id)
            except ValueError:
                flash("Invalid user id", "danger")
                return redirect(members_url)

            user_ids = load_member_ids()
            # The stored list may mix string and integer ids, so membership
            # is tested against both representations.
            is_member = user_id in user_ids or user_id_int in user_ids

            if action == "add":
                user = User.query.get(user_id_int)
                if user is None or user.company_id != group.company_id:
                    # ``user`` may be None here; the old message
                    # dereferenced user.email and crashed.
                    flash("User not found in this company", "warning")
                elif is_member:
                    flash(f"User {user.email} already a member", "info")
                else:
                    user_ids.append(user_id_int)
                    group.user_ids = json.dumps(user_ids)
                    db.session.commit()
                    flash(f"User {user.email} added to group '{group.name}'", "success")
            elif is_member:
                # Drop every representation of the id, not just the first
                # match, so no stale duplicate keeps the user in the group.
                user_ids = [
                    member for member in user_ids if member not in (user_id, user_id_int)
                ]
                group.user_ids = json.dumps(user_ids)
                db.session.commit()
                flash(f"User removed from group '{group.name}'", "success")
            else:
                flash("User not a member of this group", "warning")

        return redirect(members_url)

    company_users = (
        User.query.filter_by(company_id=group.company_id)
        .options(db.joinedload(User.inviter))
        .all()
    )
    member_ids_list = load_member_ids()
    return render_template(
        "group_members.html", group=group, users=company_users, member_ids=member_ids_list
    )
|
|
|
|
@app.route("/api/group-members/<int:group_id>", methods=["GET", "POST"])
@login_required
@company_admin_required
def group_members_api(group_id):
    """API endpoint for group members - kept for backwards compatibility.

    GET returns id and email of every user in the group's company under the
    key ``members``; any other method returns a success payload with an
    empty member list.
    """
    group = Group.query.get_or_404(group_id)
    if group.company_id != current_user.company_id:
        abort(403)

    if request.method != "GET":
        return jsonify({'success': True, 'members': []})

    members = []
    for company_user in User.query.filter_by(company_id=group.company_id).all():
        members.append({'id': company_user.id, 'email': company_user.email})
    return jsonify({'members': members})
|
|
|
|
@app.route("/api/delete-group/<int:group_id>", methods=["POST"])
@login_required
@company_admin_required
def delete_group(group_id):
    """Delete a group of the admin's company and the permission rows that reference it."""
    group = Group.query.get_or_404(group_id)
    same_company = group.company_id == current_user.company_id
    if not same_company:
        abort(403)

    # Permission rows pointing at the group go first, then the group itself.
    ResourcePermission.query.filter_by(group_id=group_id).delete()
    db.session.delete(group)
    db.session.commit()

    flash("Group deleted successfully", "success")
    return jsonify({'success': True})
|
|
|
|
@app.route("/profile", methods=["GET", "POST"])
@login_required
def profile():
    """Profile management page.

    POST with ``action=change_password`` changes the password after
    verifying the current one. Any other POST updates first/last name and,
    when a valid image is uploaded, the avatar. An uploaded file is ignored
    during a password change, so that path no longer leaves orphan files.
    """
    if request.method != "POST":
        return render_template("profile.html", user=current_user)

    action = request.form.get("action")

    # Handle password change
    if action == "change_password":
        new_password = request.form.get("new_password", "")
        confirm_password = request.form.get("confirm_password", "")
        current_password = request.form.get("current_password", "")

        if not current_password or not new_password:
            flash("Please fill in all password fields", "danger")
            return redirect(url_for("profile"))
        if new_password != confirm_password:
            flash("New passwords do not match", "danger")
            return redirect(url_for("profile"))
        # Verify current password before changing
        if not check_password_hash(current_user.password, current_password):
            flash("Current password is incorrect", "danger")
            return redirect(url_for("profile"))

        current_user.password = generate_password_hash(new_password)
        db.session.commit()
        flash("Password changed successfully", "success")
        return redirect(url_for("profile"))

    # Handle profile update
    first_name = request.form.get("first_name", "")
    last_name = request.form.get("last_name", "")
    avatar_image = request.files.get("avatar")

    # Handle avatar upload
    previous_avatar_url = current_user.avatar_url
    avatar_url = previous_avatar_url
    if avatar_image and avatar_image.filename:
        from werkzeug.utils import secure_filename

        if secure_filename(avatar_image.filename).lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
            import uuid

            filename = secure_filename(f"user_avatar_{uuid.uuid4().hex}_{avatar_image.filename}")
            upload_folder = os.path.join(app.root_path, "static", "user_avatars")
            os.makedirs(upload_folder, exist_ok=True)
            avatar_image.save(os.path.join(upload_folder, filename))
            avatar_url = f"user_avatars/{filename}"

    # Empty name fields keep the stored values.
    current_user.first_name = first_name or current_user.first_name
    current_user.last_name = last_name or current_user.last_name
    current_user.avatar_url = avatar_url
    db.session.commit()

    # Delete the replaced avatar only after the new one is committed, and
    # only if it uses the user_avatars prefix. avatar_url is stored relative
    # to "static"; the old code stripped the directory and therefore never
    # found the file.
    if (
        previous_avatar_url
        and previous_avatar_url != avatar_url
        and previous_avatar_url.startswith('user_avatars/')
    ):
        old_path = os.path.join(app.root_path, "static", previous_avatar_url)
        if os.path.exists(old_path):
            os.remove(old_path)

    flash("Profile updated successfully", "success")
    return redirect(url_for("profile"))
|
|
|
|
@app.route("/forgot-password", methods=["GET", "POST"])
def forgot_password():
    """Forgot-password page.

    GET renders the form. POST looks the submitted email up: for a known
    account a reset token valid for 24 hours is stored and the browser is
    redirected to the set-password page for that token; for an unknown
    email an error is flashed and the form is rendered again.
    """
    if request.method == "POST":
        account = User.query.filter_by(email=request.form.get("email")).first()
        if not account:
            flash("Email not found. Please check your email address and try again.", "danger")
        else:
            import secrets

            # NOTE(review): the token is returned to the requester via the
            # redirect below rather than sent out of band, so knowing an email
            # address is enough to reach the reset form. Confirm this is intended.
            reset_token = secrets.token_urlsafe(16)
            account.reset_token = reset_token
            account.reset_token_expires = datetime.now() + timedelta(hours=24)
            db.session.commit()
            flash("Password reset link generated. Please enter your new password.", "success")
            return redirect(url_for("set_password", token=reset_token))

    return render_template("forgot_password.html")
|
|
|
|
@app.route("/set-password/<token>", methods=["GET", "POST"])
def set_password(token):
    """Set-new-password page for a reset token.

    GET renders the form. POST validates the submitted password pair, looks
    up the user holding ``token`` and, if the token has not expired, stores
    the new password hash, clears the token and redirects to the login page.
    An unknown or expired token flashes an error and re-renders the form.

    A token whose expiry is unset is treated as non-expiring, which matches
    the behaviour before the expiry check existed.
    """
    if request.method == "POST":
        new_password = request.form.get("new_password") or ""
        confirm_password = request.form.get("confirm_password") or ""

        # Reject a missing/empty password instead of hashing it.
        if not new_password:
            flash("Please enter a new password", "danger")
            return redirect(url_for("set_password", token=token))
        if new_password != confirm_password:
            flash("Passwords do not match", "danger")
            return redirect(url_for("set_password", token=token))

        # Let the database find the token instead of loading every user.
        user = User.query.filter_by(reset_token=token).first()

        token_expired = False
        if user is not None and user.reset_token_expires is not None:
            expires_at = user.reset_token_expires
            # Match naive/aware-ness of the stored value so the comparison
            # cannot raise TypeError.
            token_expired = expires_at < datetime.now(expires_at.tzinfo)

        if user is not None and not token_expired:
            user.password = generate_password_hash(new_password)
            user.reset_token = None
            user.reset_token_expires = None
            db.session.commit()
            flash("Password updated successfully. Please log in again.", "success")
            return redirect(url_for("login"))

        flash("Invalid or expired reset token", "danger")

    return render_template("set_password.html", token=token)
|
|
|
|
@app.route("/at-the-office")
@login_required
def at_the_office():
    """Show who has a booking starting today on the company's resources.

    Collects bookings whose start time falls within the current calendar
    day (server local time) for resources of the current user's company,
    groups them by the booking user's email and renders the overview.
    """
    day_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    day_end = day_start + timedelta(days=1)

    company_resources = Resource.query.filter_by(company_id=current_user.company_id).all()
    resource_ids = [resource.id for resource in company_resources]

    todays_bookings = Booking.query.filter(
        Booking.start_time >= day_start,
        Booking.start_time < day_end,
        Booking.resource_id.in_(resource_ids),
    ).all()

    # Group per email; dict insertion order keeps first-seen users first.
    grouped = {}
    for entry in todays_bookings:
        grouped.setdefault(entry.user.email, []).append(entry)

    office_booking_list = [
        {'email': email, 'bookings': entries} for email, entries in grouped.items()
    ]
    return render_template("at_the_office.html", bookings=office_booking_list)
|
|
|
|
def _json_id_list_contains(raw_ids, user_id):
    """Return True when ``user_id`` appears in the JSON list ``raw_ids``.

    Blank, missing, malformed or non-list values count as "not contained".
    IDs are compared as strings so a list stored as ``["5"]`` and one stored
    as ``[5]`` both match user 5.
    """
    try:
        ids = json.loads(raw_ids)
    except (TypeError, ValueError):
        # None, empty/blank text or invalid JSON: deny rather than crash.
        return False
    if not isinstance(ids, list):
        return False
    return str(user_id) in {str(item) for item in ids}


def check_permission(resource_id, company_id, user_id):
    """Decide whether a user may use a resource.

    Returns False when the user does not exist or is not in ``company_id``,
    or when the resource does not exist or is not in ``company_id``.
    Otherwise the first ``ResourcePermission`` row for the resource decides:

    * no row, or type ``"everyone"``: allowed;
    * type ``"group"`` with an existing group: allowed only if the user is
      listed in the group's ``user_ids``;
    * type ``"users"`` with a non-empty ``user_ids``: allowed only if the
      user is listed there;
    * anything else (unknown type, missing group, empty user list): allowed.
    """
    user = User.query.get(user_id)
    if not user or user.company_id != company_id:
        return False

    # The resource must exist and belong to the same company as the user.
    resource = Resource.query.get(resource_id)
    if not resource or resource.company_id != company_id:
        return False

    permission = ResourcePermission.query.filter_by(resource_id=resource_id).first()
    if not permission or permission.permission_type == "everyone":
        return True

    if permission.permission_type == "group" and permission.group_id:
        group = Group.query.get(permission.group_id)
        if group:
            return _json_id_list_contains(group.user_ids, user_id)
        # A permission pointing at a missing group falls through to the default.

    if permission.permission_type == "users" and permission.user_ids:
        return _json_id_list_contains(permission.user_ids, user_id)

    return True