Files
redash/redash/authentication/ldap_auth.py
2024-05-18 07:36:29 -07:00

101 lines
3.0 KiB
Python

import logging
import sys
from flask import Blueprint, flash, redirect, render_template, request, url_for
from flask_login import current_user
from redash import settings
try:
from ldap3 import Connection, Server
from ldap3.utils.conv import escape_filter_chars
except ImportError:
if settings.LDAP_LOGIN_ENABLED:
sys.exit(
"The ldap3 library was not found. This is required to use LDAP authentication. Rebuild the Docker image installing the `ldap3` poetry dependency group."
)
from redash.authentication import (
create_and_login_user,
get_next_path,
logout_and_redirect_to_index,
)
from redash.authentication.org_resolving import current_org
from redash.handlers.base import org_scoped_rule
logger = logging.getLogger("ldap_auth")
blueprint = Blueprint("ldap_auth", __name__)
@blueprint.route(org_scoped_rule("/ldap/login"), methods=["GET", "POST"])
def login(org_slug=None):
index_url = url_for("redash.index", org_slug=org_slug)
unsafe_next_path = request.args.get("next", index_url)
next_path = get_next_path(unsafe_next_path)
if not settings.LDAP_LOGIN_ENABLED:
logger.error("Cannot use LDAP for login without being enabled in settings")
return redirect(url_for("redash.index", next=next_path))
if current_user.is_authenticated:
return redirect(next_path)
if request.method == "POST":
ldap_user = auth_ldap_user(request.form["email"], request.form["password"])
if ldap_user is not None:
user = create_and_login_user(
current_org,
ldap_user[settings.LDAP_DISPLAY_NAME_KEY][0],
ldap_user[settings.LDAP_EMAIL_KEY][0],
)
if user is None:
return logout_and_redirect_to_index()
return redirect(next_path or url_for("redash.index"))
else:
flash("Incorrect credentials.")
return render_template(
"login.html",
org_slug=org_slug,
next=next_path,
email=request.form.get("email", ""),
show_password_login=True,
username_prompt=settings.LDAP_CUSTOM_USERNAME_PROMPT,
hide_forgot_password=True,
)
def auth_ldap_user(username, password):
clean_username = escape_filter_chars(username)
server = Server(settings.LDAP_HOST_URL, use_ssl=settings.LDAP_SSL)
if settings.LDAP_BIND_DN is not None:
conn = Connection(
server,
settings.LDAP_BIND_DN,
password=settings.LDAP_BIND_DN_PASSWORD,
authentication=settings.LDAP_AUTH_METHOD,
auto_bind=True,
)
else:
conn = Connection(server, auto_bind=True)
conn.search(
settings.LDAP_SEARCH_DN,
settings.LDAP_SEARCH_TEMPLATE % {"username": clean_username},
attributes=[settings.LDAP_DISPLAY_NAME_KEY, settings.LDAP_EMAIL_KEY],
)
if len(conn.entries) == 0:
return None
user = conn.entries[0]
if not conn.rebind(user=user.entry_dn, password=password):
return None
return user