Skip to content
Commits on Source (42)
...@@ -2,6 +2,7 @@ image: $DOCKER_REGISTRY/pdsl/rdiffweb-build:$CI_PIPELINE_IID ...@@ -2,6 +2,7 @@ image: $DOCKER_REGISTRY/pdsl/rdiffweb-build:$CI_PIPELINE_IID
variables: variables:
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip" PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
PIP_INDEX_URL: https://nexus.patrikdufresne.com/repository/pypi-proxy/simple/
stages: stages:
- prebuild - prebuild
...@@ -33,6 +34,36 @@ prebuild-docker-image: ...@@ -33,6 +34,36 @@ prebuild-docker-image:
- nosetests-*.xml - nosetests-*.xml
expire_in: 1 day expire_in: 1 day
py27-jinja26:
<<: *tox
py27-jinja27:
<<: *tox
py27-jinja28:
<<: *tox
py27-jinja29:
<<: *tox
py27-jinja210:
<<: *tox
py3-jinja26:
<<: *tox
py3-jinja27:
<<: *tox
py3-jinja28:
<<: *tox
py3-jinja29:
<<: *tox
py3-jinja210:
<<: *tox
py27-cherrypy35: py27-cherrypy35:
<<: *tox <<: *tox
...@@ -185,8 +216,9 @@ rdiffweb_deploy_demo: ...@@ -185,8 +216,9 @@ rdiffweb_deploy_demo:
- mkdir -p ~/.ssh - mkdir -p ~/.ssh
- chmod 700 ~/.ssh - chmod 700 ~/.ssh
script: script:
- export RDIFFWEB_VERSION=$(python ./setup.py --version)
- python ./setup.py --version - python ./setup.py --version
- export RDIFFWEB_VERSION=$(python ./setup.py --version)
- echo RDIFFWEB_VERSION=$RDIFFWEB_VERSION
- git clone "http://${GITLAB_USR}:${GITLAB_PWD}@git.patrikdufresne.com/pdsl/ansible-config.git" - git clone "http://${GITLAB_USR}:${GITLAB_PWD}@git.patrikdufresne.com/pdsl/ansible-config.git"
- cd ansible-config - cd ansible-config
- ansible-playbook rdiffweb.yml -i pdsl --extra-vars "ansible_user=root rdiffweb_version=$RDIFFWEB_VERSION rdiffweb_default_repositories=true" --limit arbuc - ansible-playbook rdiffweb.yml -i pdsl --extra-vars "ansible_user=root rdiffweb_version=$RDIFFWEB_VERSION rdiffweb_default_repositories=true" --limit arbuc
...@@ -75,6 +75,28 @@ Professional support for Rdiffweb is available by contacting [Patrik Dufresne Se ...@@ -75,6 +75,28 @@ Professional support for Rdiffweb is available by contacting [Patrik Dufresne Se
# Changelog # Changelog
## 1.1.0 (2019-10-31)
This release focus on improving the admin area and building the fundation for repository access control list (ACL).
* Update documentation from PDSL web site
* Improve the navigation bar layout
* Update the login page headline
* Update jinja2 version to allow 2.10.x
* Show server log in admin area
* Reduce code smell
* Add System information in admin area
* Validate credential using local database before LDAP
* Reffactoring templates macros
* Enhance user's view search bar
* Change repository URL to username/repopath
* Add System information in admin area
* Improve testcases
* Clean-up obsolete code
* Fix issue with captital case encoding name
* Fix compilation of less files
* Fix google font import
## 1.0.3 (2019-10-04) ## 1.0.3 (2019-10-04)
* Removing the auto update repos * Removing the auto update repos
......
...@@ -146,4 +146,24 @@ This is out-of-scope. The following is only provided as a suggestion and is in ...@@ -146,4 +146,24 @@ This is out-of-scope. The following is only provided as a suggestion and is in
no way a complete reference. no way a complete reference.
See [/extras/nginx](../extras/nginx) folder for example of nginx configuration See [/extras/nginx](../extras/nginx) folder for example of nginx configuration
to be used with rdiffweb. to be used with rdiffweb.
\ No newline at end of file
## Other settings
| Parameter | Description | Required | Example |
| --- | --- | --- | --- |
| ServerHost | Define the IP address to listen to. Use 0.0.0.0 to listen on all interfaces. | No | 127.0.0.1 |
| ServerPort | Define the host to listen to. Default to 8080 | No | 80 |
| LogLevel | Define the log level. ERROR, WARN, INFO, DEBUG | No | DEBUG |
| Environment | Define the type of environment: development, production. This is used to limit the information shown to the user when an error occur. | No | production |
| HeaderName | Define the application name displayed in the title bar and header menu. | No | My Backup |
| DefaultTheme | Define the default theme. Either: default or orange | No | orange |
| WelcomeMsg | Replace the headling displayed in the login page | No | - |
| LogFile | Define the location of the log file | No | /var/log/rdiffweb.log |
| LogAccessFile | Define the location of the access log file | No | /var/log/rdiffweb-access.log |
| RemoveOlderTime | Time when to execute the remove older task | No | 22:00 |
| SQLiteDBFile | Location of the SQLite database | No | /etc/rdiffweb/rdw.db |
| AddMissingUser | True to create users from LDAP when the credential are valid. | No | True |
| AdminUser | Define the name of the default admin user to be created | No | admin |
| FavIcon | Define the FavIcon to be displayed in the browser title | No | /etc/rdiffweb/my-fav.ico |
| TempDir | Define an alternate temp directory to be used when restoring files. | No | /retore/ |
...@@ -61,13 +61,6 @@ class Controller(object): ...@@ -61,13 +61,6 @@ class Controller(object):
def app(self): def app(self):
return cherrypy.request.app return cherrypy.request.app
@property
def currentuser(self):
"""
Get the current user.
"""
return cherrypy.serving.request.login
def _compile_template(self, template_name, **kwargs): def _compile_template(self, template_name, **kwargs):
""" """
Used to generate a standard HTML page using the given template. Used to generate a standard HTML page using the given template.
...@@ -78,11 +71,11 @@ class Controller(object): ...@@ -78,11 +71,11 @@ class Controller(object):
parms = { parms = {
"lang": loc.language, "lang": loc.language,
} }
if self.currentuser: if self.app.currentuser:
parms.update({ parms.update({
"is_login": False, "is_login": False,
'username': self.currentuser.username, 'username': self.app.currentuser.username,
'is_admin': self.currentuser.is_admin, 'is_admin': self.app.currentuser.is_admin,
}) })
# Append custom branding # Append custom branding
......
#!/usr/bin/python
# -*- coding: utf-8 -*-
# rdiffweb, A web interface to rdiff-backup repositories
# Copyright (C) 2019 rdiffweb contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import cherrypy
'''
Created on Oct. 21, 2019
@author: Patrik Dufresne
'''
def is_admin():
# Authentication may have remove the default handle to let the user login.
if cherrypy.serving.request.handler is None:
return True
# Otherwise, validate the permissions.
if not cherrypy.serving.request.login or not cherrypy.serving.request.login.is_admin:
raise cherrypy.HTTPError("403 Forbidden")
# Make sure it's running after authentication (priority = 71)
cherrypy.tools.is_admin = cherrypy.Tool('before_handler', is_admin, priority = 72)
...@@ -20,62 +20,138 @@ from __future__ import absolute_import ...@@ -20,62 +20,138 @@ from __future__ import absolute_import
from __future__ import unicode_literals from __future__ import unicode_literals
from builtins import str from builtins import str
import grp
import logging import logging
import os import os
import platform
import pwd
import subprocess
import sys
import cherrypy import cherrypy
from rdiffweb.controller import Controller, validate_isinstance import psutil
from rdiffweb.controller import Controller
from rdiffweb.core import RdiffError, RdiffWarning from rdiffweb.core import RdiffError, RdiffWarning
from rdiffweb.core import rdw_spider_repos from rdiffweb.core import rdw_spider_repos
from rdiffweb.core.config import Option
from rdiffweb.core.i18n import ugettext as _ from rdiffweb.core.i18n import ugettext as _
import subprocess from rdiffweb.core.rdw_templating import do_format_filesize as filesize
# Define the logger # Define the logger
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def get_log_files(app): def get_pyinfo():
""" if platform.dist()[0] != '' and platform.dist()[1] != '':
Return a list of log files to be shown in admin area. yield _('OS Version'), '%s %s (%s %s)' % (platform.system(), platform.release(), platform.dist()[0].capitalize(), platform.dist()[1])
""" else:
logfiles = [app.cfg.get('logfile'), app.cfg.get('logaccessfile')] yield _('OS Version'), '%s %s' % (platform.system(), platform.release())
logfiles = [fn for fn in logfiles if fn] if hasattr(os, 'path'): yield _('OS Path'), os.environ['PATH']
return [os.path.basename(fn) for fn in logfiles] if hasattr(sys, 'version'): yield _('Python Version'), ''.join(sys.version)
if hasattr(sys, 'subversion'): yield _('Python Subversion'), ', '.join(sys.subversion)
if hasattr(sys, 'prefix'): yield _('Python Prefix'), sys.prefix
def get_log_data(app, logfile, num=2000): if hasattr(sys, 'executable'): yield _('Python Executable'), sys.executable
""" if hasattr(sys, 'path'): yield _('Python Path'), ', '.join(sys.path)
Return a list of log files to be shown in admin area.
"""
logfiles = [app.cfg.get('logfile'), app.cfg.get('logaccessfile')] def get_osinfo():
logfiles = [fn for fn in logfiles if fn]
for fn in logfiles: def gr_name(gid):
if logfile == os.path.basename(fn): try:
try: return grp.getgrgid(gid).gr_name
return subprocess.check_output(['tail', '-n', str(num), fn]).decode('utf-8') except:
except: return
logging.exception('fail to get log file content')
return "Error getting file content" def pw_name(uid):
try:
return pwd.getpwuid(os.getuid()).pw_name
except:
return
if hasattr(sys, 'getfilesystemencoding'): yield _('File System Encoding'), sys.getfilesystemencoding()
if hasattr(os, 'getcwd'):
yield _('Current Working Directory'), os.getcwd()
if hasattr(os, 'getegid'):
yield _('Effective Group'), '%s (%s)' % (os.getegid(), gr_name(os.getegid()))
if hasattr(os, 'geteuid'):
yield _('Effective User'), '%s (%s)' % (os.geteuid(), pw_name(os.geteuid))
if hasattr(os, 'getgid'):
yield _('Group'), '%s (%s)' % (os.getgid(), gr_name(os.getgid()))
if hasattr(os, 'getuid'):
yield _('User'), '%s (%s)' % (os.getuid(), gr_name(os.getuid()))
if hasattr(os, 'getgroups'):
yield _('Group Membership'), ', '.join(['%s (%s)' % (gid, gr_name(gid)) for gid in os.getgroups()])
try:
if hasattr(os, 'getpid') and hasattr(os, 'getppid'):
yield _('Process ID'), ('%s (parent: %s)' % (os.getpid(), os.getppid()))
except:
pass
def get_hwinfo():
if hasattr(os, 'getloadavg'):
yield _('Load Average'), ', '.join(map(str, map(lambda x: round(x, 2), os.getloadavg())))
yield _('CPU Count'), psutil.cpu_count()
meminfo = psutil.virtual_memory()
yield _('Memory usage'), '%s / %s' % (filesize(meminfo.used), filesize(meminfo.total))
def get_pkginfo():
import jinja2
yield _('Jinja2 Version'), getattr(jinja2, '__version__')
yield _('CherryPy Version'), getattr(cherrypy, '__version__')
from rdiffweb.core.user_sqlite import sqlite3 # @UnresolvedImport
yield _('SQLite Version'), getattr(sqlite3, 'version')
try:
import ldap
yield _('LDAP Version'), getattr(ldap, '__version__')
yield _('LDAP SASL Support (Cyrus-SASL)'), ldap.SASL_AVAIL # @UndefinedVariable
yield _('LDAP TLS Support (OpenSSL)'), ldap.TLS_AVAIL # @UndefinedVariable
except:
pass
@cherrypy.tools.is_admin()
class AdminPage(Controller): class AdminPage(Controller):
"""Administration pages. Allow to manage users database.""" """Administration pages. Allow to manage users database."""
logfile = Option('logfile')
logaccessfile = Option('logaccessfile')
def _check_user_root_dir(self, directory): def _check_user_root_dir(self, directory):
"""Raised an exception if the directory is not valid.""" """Raised an exception if the directory is not valid."""
if not os.access(directory, os.F_OK) or not os.path.isdir(directory): if not os.access(directory, os.F_OK) or not os.path.isdir(directory):
raise RdiffWarning(_("User root directory %s is not accessible!") % directory) raise RdiffWarning(_("User root directory %s is not accessible!") % directory)
def _get_log_files(self):
"""
Return a list of log files to be shown in admin area.
"""
logfiles = [self.logfile, self.logaccessfile]
logfiles = [fn for fn in logfiles if fn]
return [os.path.basename(fn) for fn in logfiles]
def _get_log_data(self, logfile, num=2000):
"""
Return a list of log files to be shown in admin area.
"""
logfiles = [self.logfile, self.logaccessfile]
logfiles = [fn for fn in logfiles if fn]
for fn in logfiles:
if logfile == os.path.basename(fn):
try:
return subprocess.check_output(['tail', '-n', str(num), fn], stderr=subprocess.STDOUT).decode('utf-8')
except:
logging.exception('fail to get log file content')
return "Error getting file content"
@cherrypy.expose @cherrypy.expose
def default(self): def default(self):
# Check if user is an administrator
if not self.app.currentuser or not self.app.currentuser.is_admin:
raise cherrypy.HTTPError(403)
user_count = 0 user_count = 0
repo_count = 0 repo_count = 0
for user in self.app.userdb.list(): for user in self.app.userdb.users():
user_count += 1 user_count += 1
repo_count += len(user.repos) repo_count += len(user.repos)
...@@ -87,13 +163,9 @@ class AdminPage(Controller): ...@@ -87,13 +163,9 @@ class AdminPage(Controller):
@cherrypy.expose @cherrypy.expose
def logs(self, filename=u""): def logs(self, filename=u""):
# Check if user is an administrator
if not self.app.currentuser or not self.app.currentuser.is_admin:
raise cherrypy.HTTPError(403)
# Check if the filename is valid. # Check if the filename is valid.
data = "" data = ""
logfiles = get_log_files(self.app) logfiles = self._get_log_files()
if logfiles: if logfiles:
if not filename: if not filename:
filename = logfiles[0] filename = logfiles[0]
...@@ -101,7 +173,7 @@ class AdminPage(Controller): ...@@ -101,7 +173,7 @@ class AdminPage(Controller):
if filename not in logfiles: if filename not in logfiles:
raise cherrypy.HTTPError(404) raise cherrypy.HTTPError(404)
data = get_log_data(self.app, filename) data = self._get_log_data(filename)
params = { params = {
"filename": filename, "filename": filename,
...@@ -111,16 +183,9 @@ class AdminPage(Controller): ...@@ -111,16 +183,9 @@ class AdminPage(Controller):
return self._compile_template("admin_logs.html", **params) return self._compile_template("admin_logs.html", **params)
@cherrypy.expose @cherrypy.expose
def users(self, userfilter=u"", usersearch=u"", action=u"", username=u"", def users(self, criteria=u"", search=u"", action=u"", username=u"",
email=u"", password=u"", user_root=u"", is_admin=u""): email=u"", password=u"", user_root=u"", is_admin=u""):
# Check if user is an administrator
if not self.app.currentuser or not self.app.currentuser.is_admin:
raise cherrypy.HTTPError(403)
validate_isinstance(userfilter, str)
validate_isinstance(usersearch, str)
# If we're just showing the initial page, just do that # If we're just showing the initial page, just do that
params = {} params = {}
if self._is_submit(): if self._is_submit():
...@@ -133,34 +198,41 @@ class AdminPage(Controller): ...@@ -133,34 +198,41 @@ class AdminPage(Controller):
except RdiffError as e: except RdiffError as e:
params['error'] = str(e) params['error'] = str(e)
# Get page parameters params.update({
params.update( "criteria": criteria,
self._users_get_params_for_page(userfilter, usersearch)) "search": search,
"users": list(self.app.userdb.users(search=search, criteria=criteria))})
# Build users page # Build users page
return self._compile_template("admin_users.html", **params) return self._compile_template("admin_users.html", **params)
def _users_get_params_for_page(self, userfilter, usersearch): @cherrypy.expose
users = [{"username": user.username, def repos(self, criteria=u"", search=u""):
"email": user.email, params = {
"is_admin": user.is_admin, "criteria": criteria,
"user_root": user.user_root, "search": search,
} for user in self.app.userdb.list()] "repos": list(self.app.userdb.repos(search=search, criteria=criteria))
}
# Apply the filters. return self._compile_template("admin_repos.html", **params)
filtered_users = users
if userfilter == "admins": @cherrypy.expose
filtered_users = [x for x in filtered_users if x["is_admin"]] def sysinfo(self):
# Apply the search.
if usersearch: params = {
filtered_users = [x for x in filtered_users "version": self.app.version,
if usersearch in x["username"] or "plugins": self.app.plugins,
usersearch in x["email"]] # Config
"cfg": {
return {"userfilter": userfilter, k: '********' if 'password' in k else v
"usersearch": usersearch, for k, v in self.app.cfg.items()},
"filtered_users": filtered_users, # System Info entries
"users": users} "pyinfo": list(get_pyinfo()),
"osinfo": list(get_osinfo()),
"hwinfo": list(get_hwinfo()),
"ldapinfo": list(get_pkginfo()),
}
return self._compile_template("admin_sysinfo.html", **params)
def _users_handle_action(self, action, username, email, password, def _users_handle_action(self, action, username, email, password,
user_root, is_admin): user_root, is_admin):
...@@ -173,7 +245,7 @@ class AdminPage(Controller): ...@@ -173,7 +245,7 @@ class AdminPage(Controller):
# Don't allow the user to changes it's "admin" state. # Don't allow the user to changes it's "admin" state.
is_admin = self.app.currentuser.is_admin is_admin = self.app.currentuser.is_admin
is_admin = str(is_admin).lower() in ['true', '1'] is_admin = str(is_admin).lower() in ['on', 'true', '1']
# Fork the behaviour according to the action. # Fork the behaviour according to the action.
if action == "edit": if action == "edit":
......
...@@ -42,22 +42,14 @@ class BrowsePage(Controller): ...@@ -42,22 +42,14 @@ class BrowsePage(Controller):
@cherrypy.expose @cherrypy.expose
def default(self, path=b"", restore="", limit='10'): def default(self, path=b"", restore="", limit='10'):
validate_isinstance(path, bytes)
validate_isinstance(restore, str) validate_isinstance(restore, str)
limit = validate_int(limit) limit = validate_int(limit)
restore = bool(restore) restore = bool(restore)
logger.debug("browsing [%r]", path)
# Check user access to the given repo & path # Check user access to the given repo & path
(repo_obj, path_obj) = self.app.currentuser.get_repo_path(path) (repo_obj, path_obj) = self.app.currentuser.get_repo_path(path)
# Build the parameters # Build the parameters
parms = self._get_parms_for_page(repo_obj, path_obj, restore, limit)
return self._compile_template("browse.html", **parms)
def _get_parms_for_page(self, repo_obj, path_obj, restore, limit):
# Build "parent directories" links # Build "parent directories" links
# TODO This Should to me elsewhere. It contains logic related to librdiff encoding. # TODO This Should to me elsewhere. It contains logic related to librdiff encoding.
parents = [] parents = []
...@@ -84,12 +76,12 @@ class BrowsePage(Controller): ...@@ -84,12 +76,12 @@ class BrowsePage(Controller):
# Get list of actual directory entries # Get list of actual directory entries
dir_entries = path_obj.dir_entries[::-1] dir_entries = path_obj.dir_entries[::-1]
return {"limit": limit, parms = {
"repo_name": repo_obj.display_name, "repo" : repo_obj,
"repo_path": repo_obj.path, "path" : path_obj,
"path": path_obj.path, "limit": limit,
"dir_entries": dir_entries, "dir_entries": dir_entries,
"isdir": path_obj.isdir, "parents": parents,
"parents": parents, "restore_dates": restore_dates,
"restore_dates": restore_dates, "warning": warning}
"warning": warning} return self._compile_template("browse.html", **parms)
...@@ -37,18 +37,7 @@ _logger = logging.getLogger(__name__) ...@@ -37,18 +37,7 @@ _logger = logging.getLogger(__name__)
@poppath('graph') @poppath('graph')
class GraphsPage(Controller): class GraphsPage(Controller):
def _data(self, path, **kwargs): def _data(self, repo_obj, **kwargs):
assert isinstance(path, bytes)
_logger.debug("repo stats [%r]", path)
# Check user permissions
try:
repo_obj = self.app.currentuser.get_repo(path)
except librdiff.FileError as e:
_logger.exception("invalid user path [%r]", path)
return self._compile_error_template(str(e))
attrs = [ attrs = [
'starttime', 'endtime', 'elapsedtime', 'sourcefiles', 'sourcefilesize', 'starttime', 'endtime', 'elapsedtime', 'sourcefiles', 'sourcefilesize',
'mirrorfiles', 'mirrorfilesize', 'newfiles', 'newfilesize', 'deletedfiles', 'mirrorfiles', 'mirrorfilesize', 'newfiles', 'newfilesize', 'deletedfiles',
...@@ -73,42 +62,31 @@ class GraphsPage(Controller): ...@@ -73,42 +62,31 @@ class GraphsPage(Controller):
return func() return func()
def _page(self, path, graph, **kwargs): def _page(self, repo_obj, graph, **kwargs):
""" """
Generic method to show graphs. Generic method to show graphs.
""" """
_logger.debug("repo graphs [%r][%r]", graph, path)
# Check user permissions
try:
repo_obj = self.app.currentuser.get_repo(path)
except librdiff.FileError as e:
_logger.exception("invalid user path [%r]", path)
return self._compile_error_template(str(e))
# Check if any action to process. # Check if any action to process.
params = { params = {
'repo_name': repo_obj.display_name, 'repo': repo_obj,
'repo_path': repo_obj.path,
'graphs': graph, 'graphs': graph,
} }
# Generate page. # Generate page.
return self._compile_template("graphs_%s.html" % graph, **params) return self._compile_template("graphs_%s.html" % graph, **params)
@cherrypy.expose @cherrypy.expose
def index(self, graph, path, **kwargs): def default(self, graph, path, **kwargs):
""" """
Called to show every graphs Called to show every graphs
""" """
validate_isinstance(path, bytes)
validate_isinstance(graph, bytes) validate_isinstance(graph, bytes)
graph = graph.decode('ascii', 'replace') graph = graph.decode('ascii', 'replace')
repo_obj = self.app.currentuser.get_repo(path)
# check if data should be shown. # check if data should be shown.
if graph == 'data': if graph == 'data':
return self._data(path, **kwargs) return self._data(repo_obj, **kwargs)
elif graph in ['activities', 'errors', 'files', 'sizes', 'times']: elif graph in ['activities', 'errors', 'files', 'sizes', 'times']:
return self._page(path, graph, **kwargs) return self._page(repo_obj, graph, **kwargs)
# Raise error. # Raise error.
raise cherrypy.NotFound() raise cherrypy.NotFound()
...@@ -23,7 +23,7 @@ import logging ...@@ -23,7 +23,7 @@ import logging
import cherrypy import cherrypy
from rdiffweb.controller import Controller, validate_isinstance, validate_int from rdiffweb.controller import Controller, validate_int
from rdiffweb.controller.dispatch import poppath from rdiffweb.controller.dispatch import poppath
from rdiffweb.core.i18n import ugettext as _ from rdiffweb.core.i18n import ugettext as _
...@@ -37,7 +37,6 @@ class HistoryPage(Controller): ...@@ -37,7 +37,6 @@ class HistoryPage(Controller):
@cherrypy.expose @cherrypy.expose
def default(self, path=b"", limit='10', **kwargs): def default(self, path=b"", limit='10', **kwargs):
validate_isinstance(path, bytes)
limit = validate_int(limit) limit = validate_int(limit)
repo_obj = self.app.currentuser.get_repo(path) repo_obj = self.app.currentuser.get_repo(path)
...@@ -50,8 +49,7 @@ class HistoryPage(Controller): ...@@ -50,8 +49,7 @@ class HistoryPage(Controller):
parms = { parms = {
"limit": limit, "limit": limit,
"repo_name": repo_obj.display_name, "repo": repo_obj,
"repo_path": repo_obj.path,
"history_entries": repo_obj.get_history_entries(numLatestEntries=limit, reverse=True), "history_entries": repo_obj.get_history_entries(numLatestEntries=limit, reverse=True),
"warning": warning, "warning": warning,
} }
......
...@@ -38,12 +38,7 @@ class LocationsPage(Controller): ...@@ -38,12 +38,7 @@ class LocationsPage(Controller):
def index(self): def index(self):
# Get page params # Get page params
params = { params = {
"repos": [{ "repos": self.app.currentuser.repo_objs,
"path": repo_obj.path,
"name_split": repo_obj.display_name.strip('/').split('/'),
"last_backup_date": repo_obj.last_backup_date,
'status': repo_obj.status,
} for repo_obj in self.app.currentuser.repo_objs],
"disk_usage": self.app.currentuser.disk_usage, "disk_usage": self.app.currentuser.disk_usage,
} }
# Render the page. # Render the page.
......
...@@ -20,11 +20,10 @@ from __future__ import absolute_import ...@@ -20,11 +20,10 @@ from __future__ import absolute_import
from __future__ import unicode_literals from __future__ import unicode_literals
import logging import logging
from rdiffweb.controller import Controller, validate_isinstance, validate from rdiffweb.controller import Controller, validate, validate_int
from rdiffweb.controller.dispatch import poppath from rdiffweb.controller.dispatch import poppath
from rdiffweb.core.i18n import ugettext as _ from rdiffweb.core.i18n import ugettext as _
from builtins import bytes
import cherrypy import cherrypy
# Define the logger # Define the logger
...@@ -35,79 +34,60 @@ _logger = logging.getLogger(__name__) ...@@ -35,79 +34,60 @@ _logger = logging.getLogger(__name__)
class SettingsPage(Controller): class SettingsPage(Controller):
@cherrypy.expose @cherrypy.expose
def index(self, path=b""): def default(self, path=b"", action=None, **kwargs):
validate_isinstance(path, bytes)
repo_obj = self.app.currentuser.get_repo(path) repo_obj = self.app.currentuser.get_repo(path)
if action == 'delete':
self._delete(repo_obj, **kwargs)
if kwargs.get('keepdays'):
return self._remove_older(repo_obj, **kwargs)
elif kwargs.get('new_encoding'):
return self._set_encoding(repo_obj, **kwargs)
elif kwargs.get('maxage'):
return self._set_maxage(repo_obj, **kwargs)
# Get page data. # Get page data.
params = { params = {
'repo_name': repo_obj.display_name, 'repo': repo_obj,
'repo_path': repo_obj.path,
'current_encoding': repo_obj.encoding,
'keepdays': repo_obj.keepdays, 'keepdays': repo_obj.keepdays,
} }
# Generate page. # Generate page.
return self._compile_template("settings.html", **params) return self._compile_template("settings.html", **params)
def _delete(self, repo_obj, confirm=None, redirect='/', **kwargs):
@poppath() """
class SetEncodingPage(Controller): Delete the repository.
"""
@cherrypy.expose() # Validate the name
def index(self, path=b'', new_encoding=None): validate(confirm)
if confirm != repo_obj.display_name:
_logger.debug("do not delete repo, bad confirmation %r != %r", confirm, repo_obj.display_name)
raise cherrypy.HTTPError(400)
# Refresh repository list
repo_obj.delete()
raise cherrypy.HTTPRedirect(redirect)
def _set_encoding(self, repo_obj, new_encoding=None, **kwargs):
""" """
Update repository encoding via Ajax. Update repository encoding via Ajax.
""" """
validate_isinstance(path, bytes)
validate(new_encoding) validate(new_encoding)
repo_obj = self.app.currentuser.get_repo(path)
try: try:
repo_obj.encoding = new_encoding repo_obj.encoding = new_encoding
except ValueError: except ValueError:
raise cherrypy.HTTPError(400, _("invalid encoding value")) raise cherrypy.HTTPError(400, _("invalid encoding value"))
return _("Updated") return _("Updated")
def _set_maxage(self, repo_obj, maxage=None, **kwargs):
@poppath()
class RemoveOlderPage(Controller):
@cherrypy.expose()
def index(self, path=b"", keepdays=None):
validate_isinstance(path, bytes)
validate(keepdays)
# Get repository object from user database.
r = self.app.currentuser.get_repo(path)
# Update the database.
try:
r.keepdays = keepdays
except ValueError:
_logger.warning("invalid keepdays value %r", keepdays)
raise cherrypy.HTTPError(400, _("Invalid value"))
return _("Updated")
@poppath()
class DeleteRepoPage(Controller):
@cherrypy.expose
def index(self, path=b"", **kwargs):
""" """
Delete the repository. Update repository maxage via Ajax.
""" """
validate_isinstance(path, bytes) validate_int(maxage)
repo_obj.maxage = maxage
# Check user permissions return _("Updated")
repo_obj = self.app.currentuser.get_repo(path)
# Validate the name
confirm_name = kwargs.get('confirm_name', None)
if confirm_name != repo_obj.display_name:
_logger.info("bad confirmation %r != %r", confirm_name, repo_obj.display_name)
raise cherrypy.HTTPError(400)
# Refresh repository list
repo_obj.delete()
raise cherrypy.HTTPRedirect("/") def _remove_older(self, repo_obj, keepdays=None, **kwargs):
validate_int(keepdays)
# Update the database.
repo_obj.keepdays = keepdays
return _("Updated")
...@@ -30,7 +30,6 @@ from rdiffweb.controller.dispatch import poppath ...@@ -30,7 +30,6 @@ from rdiffweb.controller.dispatch import poppath
from rdiffweb.core import librdiff from rdiffweb.core import librdiff
from rdiffweb.core import rdw_helpers from rdiffweb.core import rdw_helpers
# Define the logger # Define the logger
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -40,7 +39,6 @@ class StatusPage(Controller): ...@@ -40,7 +39,6 @@ class StatusPage(Controller):
@cherrypy.expose @cherrypy.expose
def default(self, path=b"", date="", failures=""): def default(self, path=b"", date="", failures=""):
validate_isinstance(path, bytes)
validate_isinstance(date, str) validate_isinstance(date, str)
# Validate date # Validate date
...@@ -63,7 +61,7 @@ class StatusPage(Controller): ...@@ -63,7 +61,7 @@ class StatusPage(Controller):
if path: if path:
user_repos = [self.app.currentuser.get_repo(path)] user_repos = [self.app.currentuser.get_repo(path)]
else: else:
user_repos = self.app.currentuser.repos user_repos = self.app.currentuser.repo_objs
failuresOnly = failures != "" failuresOnly = failures != ""
messages = self._getUserMessages(user_repos, not failuresOnly, True, startTime, endTime) messages = self._getUserMessages(user_repos, not failuresOnly, True, startTime, endTime)
...@@ -80,23 +78,14 @@ class StatusPage(Controller): ...@@ -80,23 +78,14 @@ class StatusPage(Controller):
earliest_date, earliest_date,
latest_date): latest_date):
user_root = self.app.currentuser.user_root
repoErrors = [] repoErrors = []
allBackups = [] allBackups = []
for repo in repos: for repo_obj in repos:
repo = repo.lstrip("/") backups = repo_obj.get_history_entries(-1, earliest_date, latest_date)
try: allBackups += [{"repo": repo_obj,
repo_obj = librdiff.RdiffRepo(user_root, repo) "date": backup.date,
backups = repo_obj.get_history_entries(-1, earliest_date, "size": backup.size,
latest_date) "errors": backup.errors} for backup in backups]
allBackups += [{"repo_path": repo_obj.path,
"repo_name": repo_obj.display_name,
"date": backup.date,
"size": backup.size,
"errors": backup.errors} for backup in backups]
except librdiff.FileError:
logger.exception("invalid user path %s" % repo)
allBackups.sort(key=lambda x: x["date"]) allBackups.sort(key=lambda x: x["date"])
failedBackups = [x for x in allBackups if x["errors"]] failedBackups = [x for x in allBackups if x["errors"]]
...@@ -118,9 +107,7 @@ class StatusPage(Controller): ...@@ -118,9 +107,7 @@ class StatusPage(Controller):
{"is_success": False, {"is_success": False,
"date": date, "date": date,
"repoErrors": [], "repoErrors": [],
"backups": [], "backups": []})
"repo_path": job["repo_path"],
"repo_name": job["repo_name"]})
userMessages.append(job) userMessages.append(job)
# generate success messages (publish date is most recent backup date) # generate success messages (publish date is most recent backup date)
......
...@@ -22,12 +22,9 @@ User can control the notification period. ...@@ -22,12 +22,9 @@ User can control the notification period.
from __future__ import unicode_literals from __future__ import unicode_literals
import logging import logging
from rdiffweb.controller import Controller
from rdiffweb.core import RdiffError, RdiffWarning
from rdiffweb.core.i18n import ugettext as _
from builtins import str from rdiffweb.controller import Controller, validate_int
import cherrypy from rdiffweb.core.i18n import ugettext as _
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
...@@ -44,39 +41,18 @@ class NotificationPref(Controller): ...@@ -44,39 +41,18 @@ class NotificationPref(Controller):
for repo in self.app.currentuser.repo_objs: for repo in self.app.currentuser.repo_objs:
# Get value received for the repo. # Get value received for the repo.
value = kwargs.get(repo.name, None) value = kwargs.get(repo.name, None)
if value is None: if value:
continue # Update the maxage
try: repo.maxage = validate_int(value)
value = int(value)
except:
continue
# Update the maxage
repo.maxage = value
def render_prefs_panel(self, panelid, **kwargs): # @UnusedVariable def render_prefs_panel(self, panelid, action=None, **kwargs): # @UnusedVariable
# Process the parameters. # Process the parameters.
params = dict() if action == "set_notification_info":
action = kwargs.get('action') self._handle_set_notification_info(**kwargs)
if action:
try:
if action == "set_notification_info":
self._handle_set_notification_info(**kwargs)
else:
_logger.info("unknown action: %s", action)
raise cherrypy.NotFound("Unknown action")
except RdiffWarning as e:
params['warning'] = str(e)
except RdiffError as e:
params['error'] = str(e)
except Exception as e:
_logger.warning("unknown error processing action", exc_info=True)
params['error'] = _("Unknown error")
params.update({ params = {
'email': self.app.currentuser.email, 'email': self.app.currentuser.email,
'repos': [ 'repos': self.app.currentuser.repo_objs,
{'name': r.name, 'maxage': r.maxage} }
for r in self.app.currentuser.repo_objs],
})
return "prefs_notification.html", params return "prefs_notification.html", params
...@@ -52,7 +52,7 @@ class APITest(WebCase): ...@@ -52,7 +52,7 @@ class APITest(WebCase):
self.assertEqual(repo.get('last_backup_date'), '2016-02-02T16:30:40-05:00') self.assertEqual(repo.get('last_backup_date'), '2016-02-02T16:30:40-05:00')
self.assertEqual(repo.get('status'), 'ok') self.assertEqual(repo.get('status'), 'ok')
self.assertEqual(repo.get('display_name'), 'testcases') self.assertEqual(repo.get('display_name'), 'testcases')
self.assertEqual(repo.get('encoding'), 'utf_8') self.assertEqual(repo.get('encoding'), 'utf-8')
self.assertEqual(repo.get('name'), 'testcases') self.assertEqual(repo.get('name'), 'testcases')
self.assertEqual(repo.get('maxage'), 0) self.assertEqual(repo.get('maxage'), 0)
......
...@@ -44,7 +44,7 @@ class CheckLinkTest(WebCase): ...@@ -44,7 +44,7 @@ class CheckLinkTest(WebCase):
""" """
Crawl all the pages to find broken links. Crawl all the pages to find broken links.
""" """
ignore = ['/restore/testcases/BrokenSymlink.*', '/browse/testcases/BrokenSymlink.*'] ignore = ['/restore/admin/testcases/BrokenSymlink.*', '/browse/admin/testcases/BrokenSymlink.*']
done = set(['#', '/logout/']) done = set(['#', '/logout/'])
todo = OrderedDict() todo = OrderedDict()
todo["/"] = "/" todo["/"] = "/"
......
...@@ -90,7 +90,7 @@ class AdminUsersAsAdminTest(AbstractAdminTest): ...@@ -90,7 +90,7 @@ class AdminUsersAsAdminTest(AbstractAdminTest):
self.assertNotInBody("/var/backups/") self.assertNotInBody("/var/backups/")
self.assertInBody("/tmp/") self.assertInBody("/tmp/")
# Check with filters # Check with filters
self.getPage("/admin/users/?userfilter=admins") self.getPage("/admin/users/?criteria=admins")
self.assertInBody("test2") self.assertInBody("test2")
finally: finally:
self._delete_user("test2") self._delete_user("test2")
...@@ -114,7 +114,7 @@ class AdminUsersAsAdminTest(AbstractAdminTest): ...@@ -114,7 +114,7 @@ class AdminUsersAsAdminTest(AbstractAdminTest):
self.assertNotInBody("/var/backups/") self.assertNotInBody("/var/backups/")
self.assertInBody("/tmp/") self.assertInBody("/tmp/")
# Check with filter # Check with filter
self.getPage("/admin/users/?userfilter=admins") self.getPage("/admin/users/?criteria=admins")
self.assertInBody("Éric") self.assertInBody("Éric")
finally: finally:
self._delete_user("Éric") self._delete_user("Éric")
...@@ -196,20 +196,20 @@ class AdminUsersAsAdminTest(AbstractAdminTest): ...@@ -196,20 +196,20 @@ class AdminUsersAsAdminTest(AbstractAdminTest):
self._edit_user("test4", "test1@test.com", "test1", "/var/backups/", False) self._edit_user("test4", "test1@test.com", "test1", "/var/backups/", False)
self.assertStatus(500) self.assertStatus(500)
def test_userfilter(self): def test_criteria(self):
""" """
Check if admin filter is working. Check if admin criteria is working.
""" """
self.getPage("/admin/users/?userfilter=admins") self.getPage("/admin/users/?criteria=admins")
self.assertNotInBody("test1") self.assertNotInBody("test1")
def test_usersearch(self): def test_search(self):
""" """
Check if user search is working. Check if user search is working.
""" """
self.getPage("/admin/users/?usersearch=tes") self.getPage("/admin/users?search=tes")
self.assertInBody("test1") self.assertInBody("test1")
self.getPage("/admin/users/?usersearch=coucou") self.getPage("/admin/users?search=coucou")
self.assertNotInBody("test1") self.assertNotInBody("test1")
...@@ -245,11 +245,18 @@ class AdminUsersAsUserTest(AbstractAdminTest): ...@@ -245,11 +245,18 @@ class AdminUsersAsUserTest(AbstractAdminTest):
self._edit_user("test", "test1@test.com", "test", "/var/invalid/", False) self._edit_user("test", "test1@test.com", "test", "/var/invalid/", False)
self.assertStatus(403) self.assertStatus(403)
def test_list(self): def test_users(self):
""" """
Check if listing user is forbidden. Check if listing user is forbidden.
""" """
self.getPage("/admin/users/") self.getPage("/admin/users")
self.assertStatus(403)
def test_repos(self):
"""
Check if listing user is forbidden.
"""
self.getPage("/admin/repos")
self.assertStatus(403) self.assertStatus(403)
...@@ -288,6 +295,59 @@ class AdminLogsTest(WebCase): ...@@ -288,6 +295,59 @@ class AdminLogsTest(WebCase):
self.assertStatus(200) self.assertStatus(200)
self.assertInBody("rdiffweb.log") self.assertInBody("rdiffweb.log")
self.assertInBody("Error getting file content") self.assertInBody("Error getting file content")
def test_logs_with_invalid_file(self):
self.app.cfg['logfile'] = './rdiffweb.log'
self.app.cfg['logaccessfile'] = './rdiffweb-access.log'
self.getPage("/admin/logs/invalid")
self.assertStatus(404)
class AdminReposTest(WebCase):
login = True
reset_testcases = True
def test_repos(self):
self.getPage("/admin/repos")
self.assertStatus(200)
def test_repos_with_search(self):
# Search something that exists
self.getPage("/admin/repos?search=test")
self.assertStatus(200)
self.assertInBody(self.REPO)
# Search something that doesn't exists
self.getPage("/admin/repos?search=coucou")
self.assertStatus(200)
self.assertNotInBody(self.REPO)
self.assertInBody("No repository found")
def test_repos_with_criteria(self):
# Search something that exists
self.getPage("/admin/repos?criteria=ok")
self.assertStatus(200)
self.assertInBody(self.REPO)
# Search something that exists
self.getPage("/admin/repos?criteria=failed")
self.assertStatus(200)
self.assertNotInBody(self.REPO)
self.assertInBody("No repository found")
class AdminSysinfoTest(WebCase):
login = True
def test_sysinfo(self):
self.getPage("/admin/sysinfo")
self.assertStatus(200)
self.assertInBody("Operating System Info")
self.assertInBody("Python Info")
if __name__ == "__main__": if __name__ == "__main__":
......
...@@ -73,19 +73,19 @@ class BrowsePageTest(WebCase): ...@@ -73,19 +73,19 @@ class BrowsePageTest(WebCase):
self.assertInBody("/Fichier%20%40%20%3Croot%3E?date=") self.assertInBody("/Fichier%20%40%20%3Croot%3E?date=")
# Répertoire (@vec) {càraçt#èrë} $épêcial # Répertoire (@vec) {càraçt#èrë} $épêcial
self.assertInBody("Répertoire (@vec) {càraçt#èrë} $épêcial") self.assertInBody("Répertoire (@vec) {càraçt#èrë} $épêcial")
self.assertInBody("/R%C3%A9pertoire%20%28%40vec%29%20%7Bc%C3%A0ra%C3%A7t%23%C3%A8r%C3%AB%7D%20%24%C3%A9p%C3%AAcial/") self.assertInBody("/R%C3%A9pertoire%20%28%40vec%29%20%7Bc%C3%A0ra%C3%A7t%23%C3%A8r%C3%AB%7D%20%24%C3%A9p%C3%AAcial")
# test\test # test\test
self.assertInBody("test\\test") self.assertInBody("test\\test")
self.assertInBody("/test%5Ctest/") self.assertInBody("/test%5Ctest")
# <F!chïer> (@vec) {càraçt#èrë} $épêcial # <F!chïer> (@vec) {càraçt#èrë} $épêcial
self.assertInBody("&lt;F!chïer&gt; (@vec) {càraçt#èrë} $épêcial") self.assertInBody("&lt;F!chïer&gt; (@vec) {càraçt#èrë} $épêcial")
self.assertInBody("/%3CF%21ch%C3%AFer%3E%20%28%40vec%29%20%7Bc%C3%A0ra%C3%A7t%23%C3%A8r%C3%AB%7D%20%24%C3%A9p%C3%AAcial?date=") self.assertInBody("/%3CF%21ch%C3%AFer%3E%20%28%40vec%29%20%7Bc%C3%A0ra%C3%A7t%23%C3%A8r%C3%AB%7D%20%24%C3%A9p%C3%AAcial?date=")
# Répertoire Existant # Répertoire Existant
self.assertInBody("Répertoire Existant") self.assertInBody("Répertoire Existant")
self.assertInBody("/R%C3%A9pertoire%20Existant/") self.assertInBody("/R%C3%A9pertoire%20Existant")
# Répertoire Supprimé # Répertoire Supprimé
self.assertInBody("Répertoire Supprimé") self.assertInBody("Répertoire Supprimé")
self.assertInBody("/R%C3%A9pertoire%20Supprim%C3%A9/") self.assertInBody("/R%C3%A9pertoire%20Supprim%C3%A9")
# Quoted folder # Quoted folder
self.assertInBody("Char Z to quote") self.assertInBody("Char Z to quote")
self.assertInBody("/Char%20%3B090%20to%20quote") self.assertInBody("/Char%20%3B090%20to%20quote")
...@@ -191,6 +191,10 @@ class BrowsePageTest(WebCase): ...@@ -191,6 +191,10 @@ class BrowsePageTest(WebCase):
self._browse("invalid/", "") self._browse("invalid/", "")
self.assertStatus(404) self.assertStatus(404)
self.assertInBody("Not Found") self.assertInBody("Not Found")
self._browse("admin/invalid/", "")
self.assertStatus(404)
self.assertInBody("Not Found")
def test_invalid_path(self): def test_invalid_path(self):
""" """
...@@ -213,20 +217,47 @@ class BrowsePageTest(WebCase): ...@@ -213,20 +217,47 @@ class BrowsePageTest(WebCase):
""" """
# Change the user setting to match single repo. # Change the user setting to match single repo.
user = self.app.userdb.get_user(self.USERNAME) user = self.app.userdb.get_user(self.USERNAME)
user.user_root = os.path.join(self.app.testcases, self.REPO) user.user_root = os.path.join(self.app.testcases, 'testcases')
user.repos = [''] user.repos = ['']
# Check if listing locations is working # Check if listing locations is working
self.getPage('/') self.getPage('/')
self.assertStatus('200 OK') self.assertStatus('200 OK')
self.assertInBody(self.REPO) self.assertInBody('testcases')
# Check if browsing is working. # Check if browsing is working.
self.getPage('/browse/') self.getPage('/browse/admin')
self.assertStatus('200 OK') self.assertStatus('200 OK')
self.assertInBody('Files') self.assertInBody('Files')
# Check sub directory browsing # Check sub directory browsing
self.getPage('/browse/Revisions/') self.getPage('/browse/admin/Revisions/')
self.assertStatus('200 OK') self.assertStatus('200 OK')
self.assertInBody('Files') self.assertInBody('Files')
def test_as_another_user(self):
# Create a nother user with admin right
user_obj = self.app.userdb.add_user('anotheruser', 'password')
user_obj.user_root = self.app.testcases
user_obj.repos = ['testcases']
self.getPage('/browse/admin')
self.assertStatus('404 Not Found')
# Browse admin's repos
self.getPage('/browse/anotheruser')
self.assertStatus('404 Not Found')
self.getPage('/browse/anotheruser/testcases')
self.assertStatus('200 OK')
self.getPage('/browse/anotheruser/testcases/Revisions/')
self.assertStatus('200 OK')
# Remove admin right
admin = self.app.userdb.get_user('admin')
admin.is_admin = 0
# Browse admin's repos
self.getPage('/browse/anotheruser/testcases')
self.assertStatus('403 Forbidden')
self.getPage('/browse/anotheruser/testcases/Revisions/')
self.assertStatus('403 Forbidden')
if __name__ == "__main__": if __name__ == "__main__":
# import sys;sys.argv = ['', 'Test.testName'] # import sys;sys.argv = ['', 'Test.testName']
......
...@@ -36,11 +36,28 @@ class SettingsTest(WebCase): ...@@ -36,11 +36,28 @@ class SettingsTest(WebCase):
reset_testcases = True reset_testcases = True
def _stats(self, repo): def test_activities(self):
return self.getPage("/graphs/data/" + repo + "/") self.getPage("/graphs/activities/" + self.REPO + "/")
self.assertStatus('200 OK')
def test_errors(self):
self.getPage("/graphs/errors/" + self.REPO + "/")
self.assertStatus('200 OK')
def test_files(self):
self.getPage("/graphs/files/" + self.REPO + "/")
self.assertStatus('200 OK')
def test_stats(self): def test_sizes(self):
self._stats(self.REPO) self.getPage("/graphs/sizes/" + self.REPO + "/")
self.assertStatus('200 OK')
def test_times(self):
self.getPage("/graphs/times/" + self.REPO + "/")
self.assertStatus('200 OK')
def test_data(self):
self.getPage("/graphs/data/" + self.REPO + "/")
self.assertStatus('200 OK') self.assertStatus('200 OK')
# Check header # Check header
expected = b"""date,starttime,endtime,elapsedtime,sourcefiles,sourcefilesize,mirrorfiles,mirrorfilesize,newfiles,newfilesize,deletedfiles,deletedfilesize,changedfiles,changedsourcesize,changedmirrorsize,incrementfiles,incrementfilesize,totaldestinationsizechange,errors expected = b"""date,starttime,endtime,elapsedtime,sourcefiles,sourcefilesize,mirrorfiles,mirrorfilesize,newfiles,newfilesize,deletedfiles,deletedfilesize,changedfiles,changedsourcesize,changedmirrorsize,incrementfiles,incrementfilesize,totaldestinationsizechange,errors
...@@ -69,6 +86,24 @@ class SettingsTest(WebCase): ...@@ -69,6 +86,24 @@ class SettingsTest(WebCase):
""" """
self.assertEquals(expected, self.body) self.assertEquals(expected, self.body)
def test_as_another_user(self):
# Create a nother user with admin right
user_obj = self.app.userdb.add_user('anotheruser', 'password')
user_obj.user_root = self.app.testcases
user_obj.repos = ['testcases']
self.getPage("/graphs/activities/anotheruser/testcases")
self.assertStatus('200 OK')
self.assertInBody("Activities")
# Remove admin right
admin = self.app.userdb.get_user('admin')
admin.is_admin = 0
# Browse admin's repos
self.getPage("/graphs/activities/anotheruser/testcases")
self.assertStatus('403 Forbidden')
if __name__ == "__main__": if __name__ == "__main__":
# import sys;sys.argv = ['', 'Test.testName'] # import sys;sys.argv = ['', 'Test.testName']
......
...@@ -57,6 +57,24 @@ class HistoryPageTest(WebCase): ...@@ -57,6 +57,24 @@ class HistoryPageTest(WebCase):
self._history(self.REPO, 50) self._history(self.REPO, 50)
self.assertNotInBody("Show more") self.assertNotInBody("Show more")
def test_as_another_user(self):
# Create a nother user with admin right
user_obj = self.app.userdb.add_user('anotheruser', 'password')
user_obj.user_root = self.app.testcases
user_obj.repos = ['testcases']
self.getPage("/history/anotheruser/testcases")
self.assertStatus('200 OK')
# Remove admin right
admin = self.app.userdb.get_user('admin')
admin.is_admin = 0
# Browse admin's repos
self.getPage("/history/anotheruser/testcases")
self.assertStatus('403 Forbidden')
if __name__ == "__main__": if __name__ == "__main__":
# import sys;sys.argv = ['', 'Test.testName'] # import sys;sys.argv = ['', 'Test.testName']
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
......
...@@ -38,7 +38,7 @@ class LoginPageTest(WebCase): ...@@ -38,7 +38,7 @@ class LoginPageTest(WebCase):
""" """
self.getPage('/') self.getPage('/')
self.assertStatus('200 OK') self.assertStatus('200 OK')
self.assertInBody('login') self.assertInBody('Enter your username and password to log in.')
def test_getpage_with_plaintext(self): def test_getpage_with_plaintext(self):
""" """
...@@ -46,7 +46,7 @@ class LoginPageTest(WebCase): ...@@ -46,7 +46,7 @@ class LoginPageTest(WebCase):
""" """
self.getPage('/', headers=[("Accept", "text/plain")]) self.getPage('/', headers=[("Accept", "text/plain")])
self.assertStatus('200 OK') self.assertStatus('200 OK')
self.assertInBody('login') self.assertInBody('Enter your username and password to log in.')
def test_getpage_with_redirect_get(self): def test_getpage_with_redirect_get(self):
""" """
...@@ -119,6 +119,14 @@ class LoginPageTest(WebCase): ...@@ -119,6 +119,14 @@ class LoginPageTest(WebCase):
self.assertStatus('200 OK') self.assertStatus('200 OK')
self.assertInBody('Invalid username or password.') self.assertInBody('Invalid username or password.')
def test_getpage_admin(self):
"""
Access to admin area without session should redirect to login page.
"""
self.getPage('/admin/')
self.assertStatus('200 OK')
self.assertInBody('Enter your username and password to log in.')
def test_getapi_without_authorization(self): def test_getapi_without_authorization(self):
""" """
Check if 401 is return when authorization is not provided. Check if 401 is return when authorization is not provided.
......