diff --git a/rdiffweb/filter_authentication.py b/rdiffweb/filter_authentication.py index e6f3139db056320139d4f9f26b31d6af60eacf2a..8064f6b3db0f742dd754ad53a0ac0e157ef41d2a 100644 --- a/rdiffweb/filter_authentication.py +++ b/rdiffweb/filter_authentication.py @@ -38,31 +38,75 @@ from cherrypy.lib import httpauth logger = logging.getLogger(__name__) -def check_username_and_password(username, password): - """Validate user credentials.""" - logger.debug("check credentials for [%s]", username) - try: - userobj = cherrypy.request.app.userdb.login(username, password) # @UndefinedVariable - except: - logger.exception("fail to validate user credential") - raise RdiffWarning(_("Fail to validate user credential.")) - if not userobj: - logger.warning("invalid username [%s] or password", username) - raise RdiffWarning(_("Invalid username or password.")) - return userobj - - -class AuthFormTool(HandlerTool): +class BaseAuth(HandlerTool): + + session_key = 'user' + + def check_username_and_password(self, username, password): + """Validate user credentials.""" + logger.debug("check credentials for [%s]", username) + try: + userobj = cherrypy.request.app.userdb.login(username, password) # @UndefinedVariable + except: + logger.exception("fail to validate user credential") + raise RdiffWarning(_("Fail to validate user credential.")) + if not userobj: + logger.warning("invalid username [%s] or password", username) + raise RdiffWarning(_("Invalid username or password.")) + return userobj + + def do_login(self, login, password, **kwargs): + """Login. May raise redirect, or return True if request handled.""" + # Validate username password. Raise an exception if invalid. + userobj = self.check_username_and_password(login, password) + # User successfully login. + logger.debug('setting request.login to %s', userobj) + cherrypy.serving.request.login = userobj + cherrypy.session[self.session_key] = userobj.username # @UndefinedVariable + self.on_login(userobj.username) + return True + + def do_logout(self, **kwargs): + """Logout. Return True if request handled.""" + sess = cherrypy.session # @UndefinedVariable + username = sess.get(self.session_key) + sess[self.session_key] = None + cherrypy.serving.request.login = None + if username: + self.on_logout(username) + return True + + def is_login(self): + """Validate if the current user session is login.""" + username = cherrypy.session.get(self.session_key) # @UndefinedVariable + if not username: + return False + userobj = cherrypy.request.app.userdb.get_user(username) # @UndefinedVariable + if not userobj: + return False + logger.debug('setting request.login to %s', userobj) + cherrypy.serving.request.login = userobj + return userobj + + def on_login(self, username): + """Called when user is login.""" + pass + + def on_logout(self, username): + """Called when user is logout.""" + pass + + +class AuthFormTool(BaseAuth): """ Tool used to control authentication to various ressources. """ - session_key = 'user' def __init__(self): - HandlerTool.__init__(self, self.run, name='authform') + BaseAuth.__init__(self, self.run, name='authform') # Make sure to run after session tool (priority 50) # Make sure to run after i18n tool (priority 60) - self._priority = 70 + self._priority = 71 def do_check(self): """Assert username. Raise redirect, or return True if request handled.""" @@ -71,14 +115,6 @@ class AuthFormTool(HandlerTool): if not self.is_login(): url = cherrypy.url(qs=request.query_string) - - # If browser requesting text/plain. It's probably an Ajax call, don't - # redirect and raise an exception. - mtype = cherrypy.tools.accept.callable(['text/html', 'text/plain']) # @UndefinedVariable - if mtype == 'text/plain': - logger.debug('no username, requesting plain text, routing to 403 error from_page %(url)r', locals()) - raise cherrypy.HTTPError(403, _("Not logged in")) - logger.debug('no username, routing to login_screen with from_page %(url)r', locals()) response.body = self.login_screen(url) if "Content-Length" in response.headers: @@ -86,19 +122,11 @@ class AuthFormTool(HandlerTool): del response.headers["Content-Length"] return True - # Define the value of request.login to later in code we can reuse it. - username = cherrypy.session[self.session_key] # @UndefinedVariable - userobj = cherrypy.request.app.userdb.get_user(username) # @UndefinedVariable - if not userobj: - raise cherrypy.HTTPError(403) - logger.debug('setting request.login to %s', userobj) - cherrypy.serving.request.login = userobj - def do_login(self, login, password, redirect=b'/', **kwargs): """Login. May raise redirect, or return True if request handled.""" response = cherrypy.serving.response try: - userobj = check_username_and_password(login, password) + super(AuthFormTool, self).do_login(login, password, **kwargs) except RdiffError as e: body = self.login_screen(redirect, login, str(e)) response.body = body @@ -106,29 +134,15 @@ class AuthFormTool(HandlerTool): # Delete Content-Length header so finalize() recalcs it. del response.headers["Content-Length"] return True - # User successfully login. - logger.debug('setting request.login to %s', userobj) - cherrypy.serving.request.login = userobj - cherrypy.session[self.session_key] = userobj.username # @UndefinedVariable - self.on_login(userobj.username) + # Redirect user. logger.debug('redirect user to %r', redirect or b"/") raise cherrypy.HTTPRedirect(redirect or b"/") def do_logout(self, redirect=b'/', **kwargs): """Logout. May raise redirect, or return True if request handled.""" - sess = cherrypy.session # @UndefinedVariable - username = sess.get(self.session_key) - sess[self.session_key] = None - cherrypy.serving.request.login = None - if username: - self.on_logout(username) + super(AuthFormTool, self).do_logout(**kwargs) raise cherrypy.HTTPRedirect(redirect) - def is_login(self): - """Validate if the current user session is login.""" - username = cherrypy.session.get(self.session_key) # @UndefinedVariable - return username is not None - def login_screen(self, redirect=b'/', username='', error_msg='', **kwargs): app = cherrypy.request.app main_page = MainPage(app) @@ -150,14 +164,6 @@ class AuthFormTool(HandlerTool): return main_page._compile_template("login.html", **params).encode("utf-8") - def on_login(self, username): - """Called when user is login.""" - pass - - def on_logout(self, username): - """Called when user is logout.""" - pass - def run(self): """Called to execute this tool.""" request = cherrypy.serving.request @@ -185,37 +191,43 @@ class AuthFormTool(HandlerTool): cherrypy.tools.authform = AuthFormTool() -def authbasic(): +class BasicAuth(BaseAuth): + """ + Tool used to control authentication to various ressources. + """ + def __init__(self): + BaseAuth.__init__(self, self.run, name='authbasic') + # Make sure to run before authform (priority 71) + self._priority = 70 + + def run(self): + """Filter used to restrict access to resource via HTTP basic auth.""" + + # Proceed with basic authentication. + request = cherrypy.serving.request + path = request.path_info + ah = request.headers.get('authorization') + if ah: + try: + scheme, params = ah.split(' ', 1) + if scheme.lower() == 'basic': + # Validate user credential. + login, password = base64_decode(params).split(':', 1) + logger.info('routing %(path)r to do_login', locals()) + try: + return self.do_login(login, password) + except RdiffError as e: + logger.info('basic auth fail for user: %s', login, exc_info=1) + raise cherrypy.HTTPError(403) + + except (ValueError, binascii.Error): + raise cherrypy.HTTPError(400, 'Bad Request') + + logger.info('no authorization header, running is_login') + if not self.is_login(): + # Inform the user-agent this path is protected. + cherrypy.serving.response.headers['www-authenticate'] = httpauth.basicAuth('rdiffweb') + raise cherrypy.HTTPError(401, "You are not authorized to access that resource") - """Filter used to restrict access to resource via HTTP basic auth.""" - # Proceed with basic authentication. - request = cherrypy.serving.request - ah = request.headers.get('authorization') - if ah is not None: - try: - scheme, params = ah.split(' ', 1) - if scheme.lower() == 'basic': - # Validate user credential. - username, password = base64_decode(params).split(':', 1) - try: - userobj = check_username_and_password(username, password) - except RdiffError as e: - logger.info('basic auth fail for %s', username, e) - raise cherrypy.HTTPError(403) - - # User successfully login. - logger.debug('setting request.login to %s', userobj) - cherrypy.serving.request.login = userobj - return - - except (ValueError, binascii.Error): - raise cherrypy.HTTPError(400, 'Bad Request') - - # Inform the user-agent this path is protected. - cherrypy.serving.response.headers[ - 'www-authenticate'] = httpauth.basicAuth('rdiffweb') - raise cherrypy.HTTPError(401, "You are not authorized to access that resource") - - -cherrypy.tools.authbasic = cherrypy._cptools.HandlerTool(authbasic) +cherrypy.tools.authbasic = BasicAuth() diff --git a/rdiffweb/tests/test_page_login.py b/rdiffweb/tests/test_page_login.py index 979782bec272b108900cfe77ebb6061dcd99fbf0..f6df204f0fede41cc4d4afd1f352beb1dc56e121 100644 --- a/rdiffweb/tests/test_page_login.py +++ b/rdiffweb/tests/test_page_login.py @@ -23,6 +23,7 @@ Created on Dec 26, 2015 from __future__ import unicode_literals +from base64 import b64encode import logging import unittest @@ -41,11 +42,11 @@ class LoginPageTest(WebCase): def test_getpage_with_plaintext(self): """ - Requesting plain text without being authenticated should return a 403 - error instead of login form. + Requesting plain text without being authenticated should show the login form. """ self.getPage('/', headers=[("Accept", "text/plain")]) - self.assertStatus('403 Forbidden') + self.assertStatus('200 OK') + self.assertInBody('login') def test_getpage_with_redirect_get(self): """ @@ -102,7 +103,7 @@ class LoginPageTest(WebCase): def test_getpage_without_username(self): """ - Check if error 405 is raised when requesting /login without a username. + Check if error is raised when requesting /login without a username. """ self.getPage('/login/', method='GET') self.assertStatus('303 See Other') @@ -118,6 +119,75 @@ class LoginPageTest(WebCase): self.assertStatus('200 OK') self.assertInBody('Invalid username or password.') + def test_getapi_without_authorization(self): + """ + Check if 401 is return when authorization is not provided. + """ + self.getPage('/api/') + self.assertStatus('401 Unauthorized') + + def test_getapi_without_username(self): + """ + Check if error 403 is raised when requesting /login without a username. + """ + self.getPage('/api/', headers=[("Authorization", "Basic " + b64encode(b":admin123").decode('ascii'))]) + self.assertStatus('403 Forbidden') + + def test_getapi_with_empty_password(self): + """ + Check if 401 is return when authorization is not provided. + """ + self.getPage('/api/', headers=[("Authorization", "Basic " + b64encode(b"admin:").decode('ascii'))]) + self.assertStatus('403 Forbidden') + + def test_getapi_with_authorization(self): + """ + Check if 200 is return when authorization is not provided. + """ + self.getPage('/api/', headers=[("Authorization", "Basic " + b64encode(b"admin:admin123").decode('ascii'))]) + self.assertStatus('200 OK') + + def test_getapi_with_session(self): + """ + Check if 200 is return when authorization is not provided. + """ + b = {'login': 'admin', + 'password': 'admin123'} + self.getPage('/login/', method='POST', body=b) + self.assertStatus('303 See Other') + self.getPage('/') + self.assertStatus('200 OK') + # Get api using the same session. + self.getPage('/api/') + self.assertStatus('200 OK') + + +class LogoutPageTest(WebCase): + + def test_getpage_without_login(self): + # Accessing logout page directly will redirect to "/". + self.getPage('/logout/') + self.assertStatus('303 See Other') + self.assertHeaderItemValue('Location', self.baseurl + '/') + + def test_getpage_with_login(self): + # Login + b = {'login': 'admin', 'password': 'admin123'} + self.getPage('/login/', method='POST', body=b) + self.assertStatus('303 See Other') + # Get content of a page. + self.getPage("/prefs/") + self.assertStatus('200 OK') + # Then logout + self.getPage('/logout/') + self.assertStatus('303 See Other') + self.assertHeaderItemValue('Location', self.baseurl + '/') + # Get content of a page. + self.getPage("/prefs/") + self.assertStatus('200 OK') + self.assertInBody('login') + + if __name__ == "__main__": # import sys;sys.argv = ['', 'Test.testName'] logging.basicConfig(level=logging.DEBUG)