Skip to content
filter_authentication.py 8.75 KiB
Newer Older
Patrik Dufresne's avatar
Patrik Dufresne committed
#!/usr/bin/python
# -*- coding: utf-8 -*-
# rdiffweb, A web interface to rdiff-backup repositories
# Copyright (C) 2014 rdiffweb contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
from __future__ import unicode_literals
Patrik Dufresne's avatar
Patrik Dufresne committed

from builtins import str
Patrik Dufresne's avatar
Patrik Dufresne committed
import cherrypy
from cherrypy._cpcompat import base64_decode
from cherrypy._cptools import HandlerTool
from future.utils import native_str
import logging
from rdiffweb.core import RdiffError, RdiffWarning
from rdiffweb.i18n import ugettext as _
from rdiffweb.page_main import MainPage
from rdiffweb.rdw_helpers import quote_url
from cherrypy.lib import httpauth
Patrik Dufresne's avatar
Patrik Dufresne committed

# Define the logger
logger = logging.getLogger(__name__)
class BaseAuth(HandlerTool):

    session_key = 'user'

    def check_username_and_password(self, username, password):
        """Validate user credentials."""
        logger.debug("check credentials for [%s]", username)
        try:
            userobj = cherrypy.request.app.userdb.login(username, password)  # @UndefinedVariable
        except:
            logger.exception("fail to validate user credential")
            raise RdiffWarning(_("Fail to validate user credential."))
        if not userobj:
            logger.warning("invalid username [%s] or password", username)
            raise RdiffWarning(_("Invalid username or password."))
        return userobj

    def do_login(self, login, password, **kwargs):
        """Login. May raise redirect, or return True if request handled."""
        # Validate username password. Raise an exception if invalid.
        userobj = self.check_username_and_password(login, password)
        # User successfully login.
        logger.debug('setting request.login to %s', userobj)
        cherrypy.serving.request.login = userobj
        cherrypy.session[self.session_key] = userobj.username  # @UndefinedVariable
        self.on_login(userobj.username)
        return True

    def do_logout(self, **kwargs):
        """Logout. Return True if request handled."""
        sess = cherrypy.session  # @UndefinedVariable
        username = sess.get(self.session_key)
        sess[self.session_key] = None
        cherrypy.serving.request.login = None
        if username:
            self.on_logout(username)
        return True

    def is_login(self):
        """Validate if the current user session is login."""
        username = cherrypy.session.get(self.session_key)  # @UndefinedVariable
        if not username:
            return False
        userobj = cherrypy.request.app.userdb.get_user(username)  # @UndefinedVariable
        if not userobj:
            return False
        logger.debug('setting request.login to %s', userobj)
        cherrypy.serving.request.login = userobj
        return userobj

    def on_login(self, username):
        """Called when user is login."""
        pass

    def on_logout(self, username):
        """Called when user is logout."""
        pass


class AuthFormTool(BaseAuth):
    """
    Tool used to control authentication to various ressources.
    """

    def __init__(self):
        BaseAuth.__init__(self, self.run, name='authform')
        # Make sure to run after session tool (priority 50)
        # Make sure to run after i18n tool (priority 60)

    def do_check(self):
        """Assert username. Raise redirect, or return True if request handled."""
        request = cherrypy.serving.request
        response = cherrypy.serving.response

        if not self.is_login():
            url = cherrypy.url(qs=request.query_string)
            logger.debug('no username, routing to login_screen with from_page %(url)r', locals())
            response.body = self.login_screen(url)
            if "Content-Length" in response.headers:
                # Delete Content-Length header so finalize() recalcs it.
                del response.headers["Content-Length"]
            return True

    def do_login(self, login, password, redirect=b'/', **kwargs):
        """Login. May raise redirect, or return True if request handled."""
        response = cherrypy.serving.response
        try:
            super(AuthFormTool, self).do_login(login, password, **kwargs)
        except RdiffError as e:
            body = self.login_screen(redirect, login, str(e))
            response.body = body
            if "Content-Length" in response.headers:
                # Delete Content-Length header so finalize() recalcs it.
                del response.headers["Content-Length"]
            return True
        logger.debug('redirect user to %r', redirect or b"/")
        raise cherrypy.HTTPRedirect(redirect or b"/")

    def do_logout(self, redirect=b'/', **kwargs):
        """Logout. May raise redirect, or return True if request handled."""
        super(AuthFormTool, self).do_logout(**kwargs)
        raise cherrypy.HTTPRedirect(redirect)

    def login_screen(self, redirect=b'/', username='', error_msg='', **kwargs):
        app = cherrypy.request.app
        main_page = MainPage(app)

        # Re-encode the redirect for display in HTML
        redirect = quote_url(redirect, safe=";/?:@&=+$,%")

        params = {
            'redirect': redirect,
            'login': username,
            'warning': error_msg
        }

        # Add welcome message to params. Try to load translated message.
        params["welcome_msg"] = app.cfg.get_config("WelcomeMsg")
        if hasattr(cherrypy.response, 'i18n'):
            lang = cherrypy.response.i18n.locale.language
            params["welcome_msg"] = app.cfg.get_config("WelcomeMsg[%s]" % (lang), params["welcome_msg"])

        return main_page._compile_template("login.html", **params).encode("utf-8")

    def run(self):
        """Called to execute this tool."""
        request = cherrypy.serving.request
        response = cherrypy.serving.response

        path = request.path_info
        if path.startswith(native_str('/login')):
            if request.method != 'POST':
                response.headers['Allow'] = "POST"
                logger.warn('do_login requires POST, redirect to /')
                # Redirect to / instead of showing error.
                raise cherrypy.HTTPRedirect(b'/')
            logger.info('routing %(path)r to do_login', locals())
            return self.do_login(**request.params)

        elif path.startswith(native_str('/logout')):
            logger.info('routing %(path)r to do_logout', locals())
            return self.do_logout(**request.params)

        # No special path, validate session.
        logger.info('no special path, running do_check')
        return self.do_check()


cherrypy.tools.authform = AuthFormTool()
class BasicAuth(BaseAuth):
    """
    Tool used to control authentication to various ressources.
    """
    def __init__(self):
        BaseAuth.__init__(self, self.run, name='authbasic')
        # Make sure to run before authform (priority 71)
        self._priority = 70

    def run(self):
        """Filter used to restrict access to resource via HTTP basic auth."""

        # Proceed with basic authentication.
        request = cherrypy.serving.request
        path = request.path_info
        ah = request.headers.get('authorization')
        if ah:
            try:
                scheme, params = ah.split(' ', 1)
                if scheme.lower() == 'basic':
                    # Validate user credential.
                    login, password = base64_decode(params).split(':', 1)
                    logger.info('routing %(path)r to do_login', locals())
                    try:
                        return self.do_login(login, password)
                    except RdiffError as e:
                        logger.info('basic auth fail for user: %s', login, exc_info=1)
                        raise cherrypy.HTTPError(403)

            except (ValueError, binascii.Error):
                raise cherrypy.HTTPError(400, 'Bad Request')

        logger.info('no authorization header, running is_login')
        if not self.is_login():
            # Inform the user-agent this path is protected.
            cherrypy.serving.response.headers['www-authenticate'] = httpauth.basicAuth('rdiffweb')
            raise cherrypy.HTTPError(401, "You are not authorized to access that resource")
cherrypy.tools.authbasic = BasicAuth()