Newer
Older
# rdiffweb, A web interface to rdiff-backup repositories
# Copyright (C) 2014 rdiffweb contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from builtins import str
from cherrypy._cpcompat import base64_decode
from cherrypy._cptools import HandlerTool
from future.utils import native_str
import logging
from rdiffweb.core import RdiffError, RdiffWarning
from rdiffweb.i18n import ugettext as _
from rdiffweb.page_main import MainPage
from cherrypy.lib import httpauth
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
class BaseAuth(HandlerTool):
    """
    Common session-based authentication logic shared by the auth tools.

    The username of the authenticated user is kept in the cherrypy session
    under `session_key`, and the matching user object is exposed on the
    current request as `request.login`.
    """

    # Session key under which the logged-in username is stored.
    session_key = 'user'

    def check_username_and_password(self, username, password):
        """
        Validate user credentials with the application's user database.

        Return the user object on success. Raise RdiffWarning if the
        validation itself fails or if the credentials are rejected.
        """
        logger.debug("check credentials for [%s]", username)
        try:
            userobj = cherrypy.request.app.userdb.login(username, password)  # @UndefinedVariable
        except Exception:
            # Trap real errors only: a bare `except:` would also swallow
            # KeyboardInterrupt and SystemExit.
            logger.exception("fail to validate user credential")
            raise RdiffWarning(_("Fail to validate user credential."))
        if not userobj:
            logger.warning("invalid username [%s] or password", username)
            raise RdiffWarning(_("Invalid username or password."))
        return userobj

    def do_login(self, login, password, **kwargs):
        """
        Log the user in.

        Validate the credentials (raising RdiffWarning when invalid), record
        the user on the request and in the session, then call `on_login`.
        Return True to signal the request was handled.
        """
        # Validate username password. Raise an exception if invalid.
        userobj = self.check_username_and_password(login, password)
        # User successfully login.
        logger.debug('setting request.login to %s', userobj)
        cherrypy.serving.request.login = userobj
        cherrypy.session[self.session_key] = userobj.username  # @UndefinedVariable
        self.on_login(userobj.username)
        return True

    def do_logout(self, **kwargs):
        """
        Log the user out.

        Clear the username from the session and from the request, then call
        `on_logout` if a user was logged in. Return True to signal the
        request was handled.
        """
        sess = cherrypy.session  # @UndefinedVariable
        username = sess.get(self.session_key)
        sess[self.session_key] = None
        cherrypy.serving.request.login = None
        if username:
            self.on_logout(username)
        return True

    def is_login(self):
        """
        Check whether the current session belongs to a known user.

        Return the user object (and set it as `request.login`) when the
        session holds a username known to the user database, False otherwise.
        """
        username = cherrypy.session.get(self.session_key)  # @UndefinedVariable
        if not username:
            return False
        userobj = cherrypy.request.app.userdb.get_user(username)  # @UndefinedVariable
        if not userobj:
            return False
        logger.debug('setting request.login to %s', userobj)
        cherrypy.serving.request.login = userobj
        return userobj

    def on_login(self, username):
        """Hook called after a user logged in. Does nothing by default."""
        pass

    def on_logout(self, username):
        """Hook called after a user logged out. Does nothing by default."""
        pass
class AuthFormTool(BaseAuth):
    """
    Tool used to control authentication to various resources using an
    HTML login form.
    """

    def __init__(self):
        BaseAuth.__init__(self, self.run, name='authform')
        # Make sure to run after session tool (priority 50)
        # Make sure to run after i18n tool (priority 60)
        self._priority = 71

    def do_check(self):
        """
        Verify the current session is logged in.

        When it is not, replace the response body with the login screen and
        return True (request handled). Otherwise return None so the regular
        handler runs.
        """
        request = cherrypy.serving.request
        response = cherrypy.serving.response
        if not self.is_login():
            url = cherrypy.url(qs=request.query_string)
            logger.debug('no username, routing to login_screen with from_page %r', url)
            response.body = self.login_screen(url)
            if "Content-Length" in response.headers:
                # Delete Content-Length header so finalize() recalcs it.
                del response.headers["Content-Length"]
            return True

    def do_login(self, login, password, redirect=b'/', **kwargs):
        """
        Log the user in from the login form.

        On failure (RdiffError), render the login screen with the error
        message and return True. On success, raise an HTTPRedirect to
        `redirect` (or "/" when it is empty).
        """
        response = cherrypy.serving.response
        try:
            super(AuthFormTool, self).do_login(login, password, **kwargs)
        except RdiffError as e:
            body = self.login_screen(redirect, login, str(e))
            response.body = body
            if "Content-Length" in response.headers:
                # Delete Content-Length header so finalize() recalcs it.
                del response.headers["Content-Length"]
            return True
        target = redirect or b"/"
        logger.debug('redirect user to %r', target)
        raise cherrypy.HTTPRedirect(target)

    def do_logout(self, redirect=b'/', **kwargs):
        """
        Log the user out, then raise an HTTPRedirect to `redirect`.

        Falls back to "/" when `redirect` is empty, like `do_login` does.
        """
        super(AuthFormTool, self).do_logout(**kwargs)
        raise cherrypy.HTTPRedirect(redirect or b"/")

    def login_screen(self, redirect=b'/', username='', error_msg='', **kwargs):
        """
        Render the "login.html" template and return it as UTF-8 bytes.

        `redirect` is the URL to return to after login, `username` pre-fills
        the login field and `error_msg` is shown as a warning.
        """
        app = cherrypy.request.app
        main_page = MainPage(app)

        # Re-encode the redirect for display in HTML
        redirect = quote_url(redirect, safe=";/?:@&=+$,%")

        params = {
            'redirect': redirect,
            'login': username,
            'warning': error_msg
        }

        # Add welcome message to params. Try to load translated message.
        params["welcome_msg"] = app.cfg.get_config("WelcomeMsg")
        if hasattr(cherrypy.response, 'i18n'):
            lang = cherrypy.response.i18n.locale.language
            # The second argument is presumably the fallback value when no
            # translated message is configured -- confirm against get_config.
            params["welcome_msg"] = app.cfg.get_config("WelcomeMsg[%s]" % (lang), params["welcome_msg"])

        return main_page._compile_template("login.html", **params).encode("utf-8")

    def run(self):
        """
        Called to execute this tool.

        Route paths starting with "/login" to `do_login` (POST only) and
        paths starting with "/logout" to `do_logout`; validate the session
        with `do_check` for every other path.
        """
        request = cherrypy.serving.request
        response = cherrypy.serving.response

        path = request.path_info
        if path.startswith(native_str('/login')):
            if request.method != 'POST':
                response.headers['Allow'] = "POST"
                logger.warning('do_login requires POST, redirect to /')
                # Redirect to / instead of showing error.
                raise cherrypy.HTTPRedirect(b'/')
            logger.info('routing %r to do_login', path)
            return self.do_login(**request.params)
        elif path.startswith(native_str('/logout')):
            logger.info('routing %r to do_logout', path)
            return self.do_logout(**request.params)

        # No special path, validate session.
        logger.info('no special path, running do_check')
        return self.do_check()
# Register the form-based authentication tool as `cherrypy.tools.authform`.
cherrypy.tools.authform = AuthFormTool()
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
class BasicAuth(BaseAuth):
    """
    Tool used to control authentication to various resources using HTTP
    basic authentication.
    """

    def __init__(self):
        BaseAuth.__init__(self, self.run, name='authbasic')
        # Make sure to run before authform (priority 71)
        self._priority = 70

    def run(self):
        """
        Filter used to restrict access to resource via HTTP basic auth.

        When a "Basic" Authorization header is present, log the user in with
        the supplied credentials. Otherwise fall back on the session; if the
        session is not logged in, answer 401 with a WWW-Authenticate header.

        Raises cherrypy.HTTPError 400 on a malformed Authorization header,
        403 when the credentials are refused and 401 when no valid
        credentials or session are available.
        """
        # Proceed with basic authentication.
        request = cherrypy.serving.request
        path = request.path_info
        ah = request.headers.get('authorization')
        if ah:
            credentials = None
            # Only the header parsing is guarded: errors raised later by the
            # login itself must not be reported as a malformed request.
            try:
                scheme, params = ah.split(' ', 1)
                if scheme.lower() == 'basic':
                    login, password = base64_decode(params).split(':', 1)
                    credentials = (login, password)
            except (ValueError, binascii.Error):
                raise cherrypy.HTTPError(400, 'Bad Request')

            if credentials is not None:
                # Validate user credential.
                login, password = credentials
                # Log the path explicitly instead of passing locals(), which
                # would hand the clear-text password to the logging system.
                logger.info('routing %r to do_login', path)
                try:
                    return self.do_login(login, password)
                except RdiffError:
                    logger.info('basic auth fail for user: %s', login, exc_info=1)
                    raise cherrypy.HTTPError(403)

        # Reached without an Authorization header, or with a scheme other
        # than "basic": rely on the session.
        logger.info('no authorization header, running is_login')
        if not self.is_login():
            # Inform the user-agent this path is protected.
            cherrypy.serving.response.headers['www-authenticate'] = httpauth.basicAuth('rdiffweb')
            raise cherrypy.HTTPError(401, "You are not authorized to access that resource")
# Register the HTTP basic authentication tool as `cherrypy.tools.authbasic`.
cherrypy.tools.authbasic = BasicAuth()