"""UI views for the projectroles app"""
import json
import logging
import re
from ipaddress import ip_address, ip_network
from urllib.parse import unquote_plus, urlparse
from django.apps import apps
from django.conf import settings
from django.contrib import auth, messages
from django.contrib.auth.mixins import AccessMixin
from django.core.exceptions import ImproperlyConfigured
from django.db import transaction
from django.db.models import QuerySet
from django.http import Http404
from django.shortcuts import redirect
from django.urls import resolve, reverse, reverse_lazy
from django.utils import timezone
from django.utils.safestring import mark_safe
from django.views.generic import (
TemplateView,
DetailView,
UpdateView,
CreateView,
DeleteView,
View,
)
from django.views.generic.edit import ModelFormMixin, FormView
from django.views.generic.detail import ContextMixin
from rules.contrib.views import PermissionRequiredMixin, redirect_to_login
from projectroles import email
from projectroles.app_settings import AppSettingAPI
from projectroles.forms import (
ProjectForm,
RoleAssignmentForm,
ProjectInviteForm,
RemoteSiteForm,
RoleAssignmentOwnerTransferForm,
LocalUserForm,
)
from projectroles.models import (
Project,
Role,
RoleAssignment,
ProjectInvite,
RemoteSite,
RemoteProject,
SODAR_CONSTANTS,
ROLE_RANKING,
)
from projectroles.plugins import (
get_active_plugins,
get_app_plugin,
get_backend_api,
)
from projectroles.remote_projects import RemoteProjectAPI
from projectroles.utils import get_display_name
app_settings = AppSettingAPI()
logger = logging.getLogger(__name__)
User = auth.get_user_model()
# SODAR constants
PROJECT_TYPE_PROJECT = SODAR_CONSTANTS['PROJECT_TYPE_PROJECT']
PROJECT_TYPE_CATEGORY = SODAR_CONSTANTS['PROJECT_TYPE_CATEGORY']
PROJECT_ROLE_OWNER = SODAR_CONSTANTS['PROJECT_ROLE_OWNER']
PROJECT_ROLE_DELEGATE = SODAR_CONSTANTS['PROJECT_ROLE_DELEGATE']
PROJECT_ROLE_GUEST = SODAR_CONSTANTS['PROJECT_ROLE_GUEST']
PROJECT_ROLE_FINDER = SODAR_CONSTANTS['PROJECT_ROLE_FINDER']
SITE_MODE_TARGET = SODAR_CONSTANTS['SITE_MODE_TARGET']
SITE_MODE_SOURCE = SODAR_CONSTANTS['SITE_MODE_SOURCE']
SITE_MODE_PEER = SODAR_CONSTANTS['SITE_MODE_PEER']
REMOTE_LEVEL_NONE = SODAR_CONSTANTS['REMOTE_LEVEL_NONE']
REMOTE_LEVEL_VIEW_AVAIL = SODAR_CONSTANTS['REMOTE_LEVEL_VIEW_AVAIL']
REMOTE_LEVEL_READ_INFO = SODAR_CONSTANTS['REMOTE_LEVEL_READ_INFO']
REMOTE_LEVEL_READ_ROLES = SODAR_CONSTANTS['REMOTE_LEVEL_READ_ROLES']
REMOTE_LEVEL_REVOKED = SODAR_CONSTANTS['REMOTE_LEVEL_REVOKED']
APP_SETTING_SCOPE_PROJECT = SODAR_CONSTANTS['APP_SETTING_SCOPE_PROJECT']
APP_SETTING_SCOPE_PROJECT_USER = SODAR_CONSTANTS[
'APP_SETTING_SCOPE_PROJECT_USER'
]
PROJECT_ACTION_CREATE = SODAR_CONSTANTS['PROJECT_ACTION_CREATE']
PROJECT_ACTION_UPDATE = SODAR_CONSTANTS['PROJECT_ACTION_UPDATE']
# Local constants
APP_NAME = 'projectroles'
SEND_EMAIL = settings.PROJECTROLES_SEND_EMAIL
PROJECT_COLUMN_COUNT = 2 # Default columns
LOGIN_MSG = 'Please log in.'
NO_AUTH_MSG = 'User not authorized for requested action.'
NO_AUTH_LOGIN_MSG = 'Authentication required, please log in.'
FORM_INVALID_MSG = 'Form submission failed, see the form for details.'
PROJECT_WELCOME_MSG = (
'Welcome to {project_type} "{project_title}". You have been assigned the '
'role of {role}.'
)
CAT_ARCHIVE_ERR_MSG = 'Setting archival is not allowed for {}.'.format(
get_display_name(PROJECT_TYPE_CATEGORY, plural=True)
)
USER_PROFILE_UPDATE_MSG = 'User profile updated, please log in again.'
USER_PROFILE_LDAP_MSG = 'Profile editing not allowed for LDAP users.'
INVITE_NOT_FOUND_MSG = 'Invite not found.'
INVITE_LDAP_LOCAL_VIEW_MSG = (
'Invite was issued for LDAP user, but local invite view was requested.'
)
INVITE_LOCAL_NOT_ALLOWED_MSG = 'Local users are not allowed.'
INVITE_LOGGED_IN_ACCEPT_MSG = (
'Logged in user is not allowed to accept invites for other users.'
)
INVITE_USER_NOT_EQUAL_MSG = (
'Invited user exists, but logged in user is not invited user.'
)
INVITE_USER_EXISTS_MSG = (
'User with that email already exists. Please login to accept the invite.'
)
ROLE_CREATE_MSG = 'Membership granted with the role of "{role}".'
ROLE_UPDATE_MSG = 'Member role changed to "{role}".'
SEARCH_DICT_DEPRECATE_MSG = (
'Results from search() as a dict have been deprecated and support will be '
'removed in v1.1. Provide results as a list of PluginSearchResult objects '
'instead.'
)
TARGET_CREATE_DISABLED_MSG = (
'PROJECTROLES_TARGET_CREATE=False, creation not allowed.'
)
# General UI view mixins -------------------------------------------------------
[docs]
class LoginRequiredMixin(AccessMixin):
"""
Override of Django LoginRequiredMixin to handle anonymous access and kiosk
mode.
"""
def is_login_required(self):
if getattr(settings, 'PROJECTROLES_KIOSK_MODE', False) or getattr(
settings, 'PROJECTROLES_ALLOW_ANONYMOUS', False
):
return False
return True
def dispatch(self, request, *args, **kwargs):
if self.is_login_required() and not request.user.is_authenticated:
return self.handle_no_permission()
return super().dispatch(request, *args, **kwargs)
[docs]
class LoggedInPermissionMixin(PermissionRequiredMixin):
"""
Mixin for handling redirection for both unlogged users and authenticated
users without permissions.
"""
#: No permission message custom override
no_perm_message = None
#: No permission message Django messages level
no_perm_message_level = 'error'
[docs]
def has_permission(self):
"""
Override for this mixin also to work with admin users without a
permission object.
"""
if getattr(settings, 'PROJECTROLES_KIOSK_MODE', False):
return True
try:
return super().has_permission()
except AttributeError:
if self.request.user.is_superuser:
return True
return False
[docs]
def add_no_perm_message(self):
"""
Add Django in the UI if handle_no_permission() fails. This can be
overridden to implement specific logic in a view if e.g. a different
message should be displayed depending on the referring view.
"""
level = self.no_perm_message_level.lower()
msg_method = getattr(messages, level, None)
if not msg_method:
raise ValueError('Unknown message level "{}"'.format(level))
if self.no_perm_message:
msg = self.no_perm_message
elif self.request.user.is_authenticated:
msg = NO_AUTH_MSG
else:
msg = NO_AUTH_LOGIN_MSG
msg_method(self.request, msg)
[docs]
def handle_no_permission(self):
"""
Handle no permission and redirect user. If custom message is specified
using self.login_message, it will be displayed.
"""
self.add_no_perm_message()
if self.request.user.is_authenticated:
return redirect(reverse('home'))
return redirect_to_login(self.request.get_full_path())
[docs]
class ProjectAccessMixin:
"""Mixin for providing access to a Project object from request kwargs"""
#: Model class to use for projects. Can be overridden by e.g. a proxy model
project_class = Project
[docs]
def get_project(self, request=None, kwargs=None):
"""
Return SODAR Project object based or None if not found, based on
the current request and view kwargs. If arguments are not provided,
uses self.request and/or self.kwargs.
:param request: Request object (optional)
:param kwargs: View kwargs (optional)
:return: Object of project_class or None if not found
"""
request = request or getattr(self, 'request')
kwargs = kwargs or getattr(self, 'kwargs')
if kwargs is None:
raise ImproperlyConfigured('View kwargs are not accessible')
# Project class object
if 'project' in kwargs:
return self.project_class.objects.filter(
sodar_uuid=kwargs['project']
).first()
# Other object types
if not request:
raise ImproperlyConfigured('Current HTTP request is not accessible')
model = None
uuid_val = None
for k, v in kwargs.items():
if not re.match(r'^[0-9a-f-]+$', str(v)):
continue
try:
if '__' in k: # Model from another app
ks = k.split('__')
model = apps.get_model(ks[0], ks[1])
else: # Model from the same app
app_name = resolve(request.path).app_name
if app_name.find('.') != -1:
app_name = app_name.split('.')[0]
model = apps.get_model(app_name, k)
uuid_val = k
break
except LookupError:
pass
if not model:
return None
try:
obj = model.objects.get(sodar_uuid=kwargs[uuid_val])
if hasattr(obj, 'project'):
return obj.project
# Some objects may have a get_project() func instead of foreignkey
elif hasattr(obj, 'get_project') and callable(
getattr(obj, 'get_project', None)
):
return obj.get_project()
except model.DoesNotExist:
return None
[docs]
class ProjectPermissionMixin(PermissionRequiredMixin, ProjectAccessMixin):
"""
Mixin for providing a Project object and queryset for permission
checking.
"""
[docs]
def get_permission_object(self):
return self.get_project()
[docs]
def has_permission(self):
"""Overrides for project permission access"""
project = self.get_project()
if not project:
raise Http404
# Override permissions for superuser, owner or delegate
perm_override = (
self.request.user.is_superuser
or project.is_owner_or_delegate(self.request.user)
)
if not perm_override and app_settings.get(
'projectroles', 'ip_restrict', project
):
for k in (
'HTTP_X_FORWARDED_FOR',
'X_FORWARDED_FOR',
'FORWARDED',
'REMOTE_ADDR',
):
v = self.request.META.get(k)
if v:
client_address = ip_address(v.split(',')[0])
break
else: # Can't fetch client ip address
return False
for record in app_settings.get(
'projectroles', 'ip_allowlist', project
):
if '/' in record:
if client_address in ip_network(record):
break
elif client_address == ip_address(record):
break
else:
return False
# Disable project app access for categories unless specifically enabled
if project.type == PROJECT_TYPE_CATEGORY:
request_url = resolve(self.request.get_full_path())
if request_url.app_name != APP_NAME:
app_plugin = get_app_plugin(request_url.app_name)
if app_plugin and app_plugin.category_enable:
return True
return False
# Disable access for non-owner/delegate if remote project is revoked
if project.is_revoked() and not perm_override:
return False
return super().has_permission()
[docs]
def get_queryset(self, *args, **kwargs):
"""
Override get_queryset() to filter down to the currently selected object.
"""
qs = super().get_queryset(*args, **kwargs)
if qs.model == ProjectAccessMixin.project_class:
return qs
elif hasattr(qs.model, 'get_project_filter_key'):
return qs.filter(
**{qs.model.get_project_filter_key(): self.get_project()}
)
elif hasattr(qs.model, 'project') or hasattr(qs.model, 'get_project'):
return qs.filter(project=self.get_project())
raise AttributeError(
'Model does not have "project" member, get_project() method or '
'get_project_filter_key() method'
)
class HTTPRefererMixin:
"""
Mixin for updating a correct referer url in session cookie regardless of
page reload.
"""
def get(self, request, *args, **kwargs):
if 'HTTP_REFERER' in request.META:
referer = request.META['HTTP_REFERER']
if (
'real_referer' not in request.session
or referer != request.build_absolute_uri()
):
request.session['real_referer'] = referer
return super().get(request, *args, **kwargs)
[docs]
class PluginContextMixin(ContextMixin):
"""Mixin for adding plugin list to context data"""
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
context['app_plugins'] = get_active_plugins(
plugin_type='project_app', custom_order=True
)
return context
[docs]
class ProjectContextMixin(
HTTPRefererMixin, PluginContextMixin, ProjectAccessMixin
):
"""
Mixin for adding context data to Project base view and other views
extending it. Includes HTTPRefererMixin for correct referer URL.
"""
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
# Project
if hasattr(self, 'object') and isinstance(self.object, Project):
context['project'] = self.get_object()
elif hasattr(self, 'object') and hasattr(self.object, 'project'):
context['project'] = self.object.project
else:
context['project'] = self.get_project()
# Project tagging/starring
if 'project' in context and not getattr(
settings, 'PROJECTROLES_KIOSK_MODE', False
):
context['project_starred'] = app_settings.get(
'projectroles',
'project_star',
context['project'],
self.request.user,
)
return context
class CurrentUserFormMixin:
"""Mixin for passing current user to form as current_user"""
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs.update({'current_user': self.request.user})
return kwargs
# Projectroles Internal UI view mixins -----------------------------------------
class ProjectModifyPermissionMixin(
LoggedInPermissionMixin, ProjectPermissionMixin
):
"""
Mixin for handling access to project modifying views, denying access even
for local superusers if the project is remote and thus immutable.
"""
def has_permission(self):
"""Override has_permission() to check remote project status"""
perm = super().has_permission()
project = self.get_project()
return (
False
if project.is_remote() and not self._get_allow_remote_edit()
else perm
)
def _get_allow_remote_edit(self):
return getattr(self, 'allow_remote_edit', False)
def handle_no_permission(self):
"""Override handle_no_permission to redirect user"""
project = self.get_project()
if project and project.is_remote():
messages.error(
self.request,
'Modifications are not allowed for remote {}.'.format(
get_display_name(PROJECT_TYPE_PROJECT, plural=True)
),
)
return redirect(reverse('home'))
elif self.request.user.is_authenticated:
messages.error(self.request, NO_AUTH_MSG)
return redirect(reverse('home'))
else:
messages.error(self.request, NO_AUTH_LOGIN_MSG)
return redirect_to_login(self.request.get_full_path())
class RolePermissionMixin(ProjectModifyPermissionMixin):
"""
Mixin to ensure permissions for RoleAssignment according to user role in
project.
"""
def has_permission(self):
"""Override has_permission to check perms depending on role"""
if not super().has_permission():
return False
try:
obj = RoleAssignment.objects.get(
sodar_uuid=self.kwargs['roleassignment']
)
if obj.role.name == PROJECT_ROLE_OWNER:
return False
elif obj.role.name == PROJECT_ROLE_DELEGATE:
return self.request.user.has_perm(
'projectroles.update_project_delegate',
self.get_permission_object(),
)
else:
return self.request.user.has_perm(
'projectroles.update_project_members',
self.get_permission_object(),
)
except RoleAssignment.DoesNotExist:
return False
def get_permission_object(self):
"""Override get_permission_object for checking Project permission"""
return self.get_project()
class ProjectListContextMixin:
"""Mixin for adding context data for displaying the project list."""
def _get_custom_cols(self, user):
"""
Return list of custom columns for projects including project data.
:param user: User object
"""
i = 0
cols = []
for app_plugin in [
ap
for ap in get_active_plugins(plugin_type='project_app')
if ap.project_list_columns
]:
# HACK for filesfolders columns (see issues #737 and #738)
if app_plugin.name == 'filesfolders' and not getattr(
settings, 'FILESFOLDERS_SHOW_LIST_COLUMNS', False
):
continue
for k, v in app_plugin.project_list_columns.items():
if not v['active']:
continue
v['app_plugin'] = app_plugin
v['column_id'] = k
v['ordering'] = v.get('ordering') or i
cols.append(v)
i += 1
return sorted(cols, key=lambda x: x['ordering'])
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
context['project_custom_cols'] = self._get_custom_cols(
self.request.user
)
context['project_col_count'] = PROJECT_COLUMN_COUNT + len(
context['project_custom_cols']
)
return context
# General Views ----------------------------------------------------------------
class HomeView(
LoginRequiredMixin,
PluginContextMixin,
ProjectListContextMixin,
TemplateView,
):
"""Home view"""
template_name = 'projectroles/home.html'
# General Project Views --------------------------------------------------------
class ProjectDetailView(
LoginRequiredMixin,
LoggedInPermissionMixin,
ProjectPermissionMixin,
ProjectListContextMixin,
ProjectContextMixin,
DetailView,
):
"""Project details view"""
permission_required = 'projectroles.view_project'
model = Project
slug_url_kwarg = 'project'
slug_field = 'sodar_uuid'
def add_no_perm_message(self):
"""
Override add_login_message() to display a different message when
redirected from invite accept view as a new user.
"""
referer_url = self.request.META.get('HTTP_REFERER')
if not referer_url:
super().add_no_perm_message()
return
referer_path = urlparse(referer_url).path
resolved_path = resolve(referer_path)
if resolved_path.url_name.startswith('invite_process_'):
messages.info(self.request, LOGIN_MSG)
return
super().add_no_perm_message()
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
if self.request.user.is_superuser:
context['role'] = None
elif self.request.user.is_authenticated:
try:
role_as = RoleAssignment.objects.get(
user=self.request.user, project=self.object
)
context['role'] = role_as.role
except RoleAssignment.DoesNotExist:
context['role'] = None
elif self.object.public_guest_access:
context['role'] = Role.objects.filter(
name=PROJECT_ROLE_GUEST
).first()
else:
context['role'] = None
# Remote projects
q_kwargs = {
'project_uuid': self.object.sodar_uuid,
'level': REMOTE_LEVEL_READ_ROLES,
}
if not self.request.user.has_perm(
'projectroles.view_hidden_projects', self.object
):
q_kwargs['site__user_display'] = True
if settings.PROJECTROLES_SITE_MODE == SITE_MODE_SOURCE:
context['target_projects'] = RemoteProject.objects.filter(
site__mode=SITE_MODE_TARGET, **q_kwargs
).order_by('site__name')
elif settings.PROJECTROLES_SITE_MODE == SITE_MODE_TARGET:
context['peer_projects'] = RemoteProject.objects.filter(
site__mode=SITE_MODE_PEER, **q_kwargs
).order_by('site__name')
return context
# Search Views -----------------------------------------------------------------
class ProjectSearchMixin:
"""Common functionalities for search views"""
def _get_app_results(
self, user, search_terms, search_type, search_keywords
):
"""
Return app plugin search results.
:param search_terms: Search terms (list of strings)
:param search_type: Optional type keyword for search (string or None)
:param search_keywords: Optional keywords (list of strings or None)
:return: List
"""
plugins = get_active_plugins(plugin_type='project_app')
ret = []
omit_apps_list = getattr(settings, 'PROJECTROLES_SEARCH_OMIT_APPS', [])
search_apps = sorted(
[
p
for p in plugins
if (p.search_enable and p.name not in omit_apps_list)
],
key=lambda x: x.plugin_ordering,
)
if search_type:
search_apps = [
p for p in search_apps if search_type in p.search_types
]
for plugin in search_apps:
search_kwargs = {
'user': user,
'search_type': search_type,
'search_terms': search_terms,
'keywords': search_keywords,
}
search_res = {
'plugin': plugin,
'results': None,
'error': None,
'has_results': False,
}
try:
search_res['results'] = plugin.search(**search_kwargs)
if isinstance(search_res['results'], dict):
# TODO: Remove in v1.1 (see #1400)
logger.warning(SEARCH_DICT_DEPRECATE_MSG)
for v in search_res['results'].values():
items = v.get('items')
if items and (
(isinstance(items, QuerySet) and items.count() > 0)
or (isinstance(items, list) and len(items) > 0)
):
search_res['has_results'] = True
break
else:
for r in search_res['results']:
if r.items and (
(
isinstance(r.items, QuerySet)
and r.items.count() > 0
)
or (isinstance(r.items, list) and len(r.items) > 0)
):
search_res['has_results'] = True
break
# Build results into dict for easier use in templates
search_res['results'] = {
r.category: r for r in search_res['results']
}
except Exception as ex:
if settings.DEBUG:
raise ex
search_res['error'] = str(ex)
logger.error(
'Exception raised by search() in {}: "{}" ({})'.format(
plugin.name,
ex,
'; '.join(
[
'{}={}'.format(k, v)
for k, v in search_kwargs.items()
]
),
)
)
ret.append(search_res)
return ret
def _get_not_found(self, search_type, project_results, app_results):
"""
Return list of apps for which objects were search for but not returned.
:param search_type: Type keyword for search if set
:param project_results: Results for projectroles search
:param app_results: Results for app plugin search
:return: List
"""
ret = []
if len(project_results) == 0 and (
not search_type or search_type == 'project'
):
ret.append('Projects')
for results in [a['results'] for a in app_results]:
if not results:
continue
for k, r in results.items():
if isinstance(r, dict):
# TODO: Remove in v1.1 (see #1400)
type_match = True if search_type else False
if (
not type_match
and 'search_types' in r
and search_type in r['search_types']
):
type_match = True
if (type_match or not search_type) and (not r['items']):
ret.append(r['title'])
else:
type_match = True if search_type else False
if not type_match and search_type in r.search_types:
type_match = True
if (type_match or not search_type) and (not r.items):
ret.append(r.title)
return ret
def dispatch(self, request, *args, **kwargs):
if not getattr(settings, 'PROJECTROLES_ENABLE_SEARCH', False):
messages.error(request, 'Search is not enabled.')
return redirect('home')
return super().dispatch(request, *args, **kwargs)
class ProjectSearchResultsView(
LoginRequiredMixin, ProjectSearchMixin, TemplateView
):
"""View for displaying results of search within projects"""
template_name = 'projectroles/search_results.html'
def _handle_context(self, request, *args, **kwargs):
"""Handle context and render to response in GET/POST requests"""
context = self.get_context_data(*args, **kwargs)
if not context['search_terms']:
messages.error(request, 'No search terms provided.')
return redirect(reverse('home'))
return super().render_to_response(context)
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(**kwargs)
search_input = ''
search_terms = []
search_type = None
keyword_input = []
search_keywords = {}
if self.request.POST.get('m'): # Multi search
search_terms = [
t.strip()
for t in self.request.POST['m'].strip().split('\r\n')
if len(t.strip()) >= 3
]
if self.request.POST.get('k'):
keyword_input = self.request.POST['k'].strip().split(' ')
elif self.request.GET.get('s'): # Single search
search_input = self.request.GET.get('s').strip()
search_split = search_input.split(' ')
search_term = search_split[0].strip()
for i in range(1, len(search_split)):
s = search_split[i].strip()
if ':' in s:
keyword_input.append(s)
elif s != '':
search_term += ' ' + s.lower()
if search_term:
search_terms = [search_term]
search_terms = list(dict.fromkeys(search_terms)) # Remove dupes
for s in keyword_input:
kw = s.split(':')[0].lower().strip()
val = s.split(':')[1].lower().strip()
if kw == 'type':
search_type = val
else:
search_keywords[kw] = val
context['search_input'] = search_input
context['search_terms'] = search_terms
context['search_type'] = search_type
context['search_keywords'] = search_keywords
# Get project results
if not search_type or search_type == 'project':
context['project_results'] = []
for p in Project.objects.find(search_terms, project_type='PROJECT'):
if p.public_guest_access or self.request.user.has_perm(
'projectroles.view_project', p
):
context['project_results'].append(p)
elif self.request.user.is_authenticated and p.parent:
parent_as = p.parent.get_role(self.request.user)
if (
parent_as
and parent_as.role.rank
>= ROLE_RANKING[PROJECT_ROLE_FINDER]
):
context['project_results'].append(p)
# Get app results
context['app_results'] = self._get_app_results(
self.request.user, search_terms, search_type, search_keywords
)
# List apps for which no results were found
context['not_found'] = self._get_not_found(
search_type,
context.get('project_results') or [],
context['app_results'],
)
return context
def get(self, request, *args, **kwargs):
return self._handle_context(request, *args, *kwargs)
def post(self, request, *args, **kwargs):
return self._handle_context(request, *args, *kwargs)
class ProjectAdvancedSearchView(
LoginRequiredMixin, ProjectSearchMixin, TemplateView
):
"""View for displaying advanced search form"""
template_name = 'projectroles/search_advanced.html'
def post(self, request, *args, **kwargs):
return ProjectSearchResultsView.as_view()(request)
# Project Editing Views --------------------------------------------------------
class ProjectModifyPluginViewMixin:
"""Helpers for project modify API"""
@classmethod
def call_project_modify_api(cls, method_name, revert_name, method_args):
"""
Call project modify API for a specific method and parameters. This
method Will run reversion methods for all plugins if execution for one
fails.
:param method_name: Name of execution method in plugin (string)
:param revert_name: Name of revert method in plugin (string or None)
:param method_args: Arguments to be passed for the methods (list)
:raise: Exception if execution for a plugin fails.
"""
modify_api_apps = getattr(settings, 'PROJECTROLES_MODIFY_API_APPS', [])
app_plugins = []
if modify_api_apps:
for a in modify_api_apps:
plugin = get_app_plugin(a)
if not plugin:
msg = 'Unable to find active plugin "{}"'.format(a)
logger.error(msg)
raise ImproperlyConfigured(msg)
app_plugins.append(plugin)
else:
app_plugins = get_active_plugins('backend') + get_active_plugins(
'project_app'
)
called_plugins = []
for p in app_plugins:
if not hasattr(p, method_name):
continue # Only there if using ProjectModifyPluginAPIMixin
logger.debug(
'Calling {}() in plugin "{}"'.format(method_name, p.name)
)
try:
getattr(p, method_name)(*method_args)
called_plugins.append(p)
except Exception as ex:
logger.error(
'Exception in {}() for plugin "{}": {}'.format(
method_name, p.name, ex
)
)
if revert_name:
for cp in called_plugins:
try:
cp.getattr(revert_name)(*method_args)
except Exception as ex_revert:
logger.error(
'Exception in {}() for plugin "{}": {}'.format(
method_name, cp.name, ex_revert
)
)
raise ex
class ProjectModifyMixin(ProjectModifyPluginViewMixin):
"""Mixin for Project creation/updating in UI and API views"""
#: Remote site fields
site_fields = {}
@staticmethod
def _get_old_project_data(project):
"""Get existing data from project fields"""
return {
'title': project.title,
'parent': project.parent,
'description': project.description,
'readme': project.readme.raw,
'owner': project.get_owner().user,
'public_guest_access': project.public_guest_access,
}
@classmethod
def _get_remote_project_data(cls, project):
"""Return existing remote project data"""
ret = {}
existing_sites = []
for rp in RemoteProject.objects.filter(project=project):
ret[str(rp.site.sodar_uuid)] = rp.level == REMOTE_LEVEL_READ_ROLES
existing_sites.append(rp.site.sodar_uuid)
# Sites not yet added
for rs in RemoteSite.objects.filter(
mode=SITE_MODE_TARGET, user_display=True, owner_modifiable=True
).exclude(sodar_uuid__in=existing_sites):
ret[str(rs.sodar_uuid)] = False
return ret
@staticmethod
def _get_app_settings(data, instance, user):
"""
Return a dictionary of project app settings and their values.
:param data: Cleaned form data
:param instance: Existing Project object or None
:param user: User initiating the project update
:return: Dict
"""
app_plugins = [p for p in get_active_plugins() if p.app_settings]
project_settings = {}
p_kwargs = {}
# Show unmodifiable settings to superusers
if user and not user.is_superuser:
p_kwargs['user_modifiable'] = True
for plugin in app_plugins + [None]:
if plugin:
name = plugin.name
p_settings = app_settings.get_definitions(
APP_SETTING_SCOPE_PROJECT, plugin=plugin, **p_kwargs
)
else:
name = 'projectroles'
p_settings = app_settings.get_definitions(
APP_SETTING_SCOPE_PROJECT, plugin_name=name, **p_kwargs
)
for s_key, s_val in p_settings.items():
s_name = 'settings.{}.{}'.format(name, s_key)
s_data = data.get(s_name)
if (
not s_val.get('project_types', None)
and not instance.type == PROJECT_TYPE_PROJECT
or s_val.get('project_types', None)
and instance.type not in s_val['project_types']
):
continue
if s_data is None and not instance:
s_data = app_settings.get_default(name, s_key)
if s_val['type'] == 'JSON':
if s_data is None:
s_data = {}
project_settings[s_name] = json.dumps(s_data)
elif s_data is not None:
project_settings[s_name] = s_data
return project_settings
def _get_project_update_data(
self, old_data, project, old_sites, sites, project_settings
):
extra_data = {}
upd_fields = []
if old_data['title'] != project.title:
extra_data['title'] = project.title
upd_fields.append('title')
if old_data['parent'] != project.parent:
extra_data['parent'] = project.parent
upd_fields.append('parent')
if old_data['description'] != project.description:
extra_data['description'] = project.description
upd_fields.append('description')
if old_data['readme'] != project.readme.raw:
extra_data['readme'] = project.readme.raw
upd_fields.append('readme')
if old_data['public_guest_access'] != project.public_guest_access:
extra_data['public_guest_access'] = project.public_guest_access
upd_fields.append('public_guest_access')
# Remote projects
if (
settings.PROJECTROLES_SITE_MODE == SITE_MODE_SOURCE
and project.type == PROJECT_TYPE_PROJECT
):
for s in [f.split('.')[1] for f in self.site_fields]:
if 'remote_sites' not in upd_fields and (
s not in old_sites or bool(old_sites[s]) != bool(sites[s])
):
upd_fields.append('remote_sites')
if 'remote_sites' in upd_fields:
extra_data['remote_sites'] = {
k: bool(v) for k, v in sites.items()
}
# Settings
for k, v in project_settings.items():
p_name = k.split('.')[1]
s_name = k.split('.')[2]
s_def = app_settings.get_definition(s_name, plugin_name=p_name)
old_v = app_settings.get(p_name, s_name, project)
if s_def['type'] == 'JSON':
v = json.loads(v)
if old_v != v:
extra_data[k] = v
upd_fields.append(k)
return extra_data, upd_fields
@staticmethod
def _get_timeline_ok_status():
timeline = get_backend_api('timeline_backend')
if not timeline:
raise ImproperlyConfigured('Timeline backend not found')
else:
return timeline.TL_STATUS_OK
def _update_remote_sites(self, project, data):
"""Update project for remote sites"""
ret = {}
for f in self.site_fields:
site_uuid = f.split('.')[1]
site = RemoteSite.objects.filter(sodar_uuid=site_uuid).first()
# TODO: Validate site here
value = data[f]
rp = RemoteProject.objects.filter(
site=site, project=project
).first()
modify = None
if rp and (
(value and rp.level != REMOTE_LEVEL_READ_ROLES)
or (not value and rp.level == REMOTE_LEVEL_READ_ROLES)
):
rp.level = (
REMOTE_LEVEL_READ_ROLES if value else REMOTE_LEVEL_REVOKED
)
rp.save()
modify = 'Updated'
elif not rp and value: # Only create if value is True
rp = RemoteProject.objects.create(
project_uuid=project.sodar_uuid,
project=project,
site=site,
level=REMOTE_LEVEL_READ_ROLES,
)
modify = 'Created'
if modify:
logger.debug(
'{} RemoteProject for site "{}" ({}): {}'.format(
modify, site.name, site.sodar_uuid, rp.level
)
)
ret[site_uuid] = rp and rp.level == REMOTE_LEVEL_READ_ROLES
return ret
@classmethod
def _update_settings(cls, project, project_settings):
"""Update project settings"""
is_remote = project.is_remote()
for k, v in project_settings.items():
_, plugin_name, setting_name = k.split('.', 3)
# Skip updating global settings on target site
if is_remote:
# TODO: Optimize (this can require a lot of queries)
s_def = app_settings.get_definition(
setting_name, plugin_name=plugin_name
)
if app_settings.get_global_value(s_def):
continue
app_settings.set(
plugin_name=k.split('.')[1],
setting_name=k.split('.')[2],
value=v,
project=project,
validate=True,
)
def _create_timeline_event(
self,
project,
action,
owner,
old_data,
old_sites,
sites,
project_settings,
request,
):
timeline = get_backend_api('timeline_backend')
if not timeline:
return None
type_str = project.type.capitalize()
if action == PROJECT_ACTION_CREATE:
tl_desc = 'create ' + type_str.lower() + ' with {owner} as owner'
extra_data = {
'title': project.title,
'owner': owner.username,
'description': project.description,
'readme': project.readme.raw,
}
# Add settings to extra data
for k, v in project_settings.items():
p_name = k.split('.')[1]
s_name = k.split('.')[2]
s_def = app_settings.get_definition(s_name, plugin_name=p_name)
if s_def['type'] == 'JSON':
v = json.loads(v)
extra_data[k] = v
else: # Update
tl_desc = 'update ' + type_str.lower()
extra_data, upd_fields = self._get_project_update_data(
old_data, project, old_sites, sites, project_settings
)
if extra_data.get('parent'): # Convert parent object into UUID
extra_data['parent'] = str(extra_data['parent'].sodar_uuid)
if len(upd_fields) > 0:
tl_desc += ' (' + ', '.join(x for x in upd_fields) + ')'
tl_event = timeline.add_event(
project=project,
app_name=APP_NAME,
user=request.user,
event_name='project_{}'.format(action.lower()),
description=tl_desc,
extra_data=extra_data,
)
if action == PROJECT_ACTION_CREATE:
tl_event.add_object(owner, 'owner', owner.username)
return tl_event
@classmethod
def _notify_users(cls, project, action, owner, old_parent, request):
"""
Notify users about project creation and update. Displays app alerts
and/or sends emails depending on the site configuration.
"""
app_alerts = get_backend_api('appalerts_backend')
# Create alerts and send emails
owner_as = RoleAssignment.objects.filter(
project=project, user=owner
).first()
# Owner change notification
if request.user != owner and action == PROJECT_ACTION_CREATE:
if app_alerts:
app_alerts.add_alert(
app_name=APP_NAME,
alert_name='role_create',
user=owner,
message=ROLE_CREATE_MSG.format(
project=project.title, role=owner_as.role.name
),
url=reverse(
'projectroles:detail',
kwargs={'project': project.sodar_uuid},
),
project=project,
)
if SEND_EMAIL and app_settings.get(
APP_NAME, 'notify_email_role', user=owner
):
email.send_role_change_mail(
action.lower(),
project,
owner,
owner_as.role,
request,
)
# Project creation/moving for parent category owner
if (
project.parent
and project.parent.get_owner().user != owner_as.user
and project.parent.get_owner().user != request.user
):
parent_owner = project.parent.get_owner().user
parent_owner_email = app_settings.get(
APP_NAME, 'notify_email_project', user=parent_owner
)
if action == PROJECT_ACTION_CREATE and request.user != parent_owner:
if app_alerts:
app_alerts.add_alert(
app_name=APP_NAME,
alert_name='project_create_parent',
user=parent_owner,
message='New {} created under category '
'"{}": "{}".'.format(
project.type.lower(),
project.parent.title,
project.title,
),
url=reverse(
'projectroles:detail',
kwargs={'project': project.sodar_uuid},
),
project=project,
)
if SEND_EMAIL and parent_owner_email:
email.send_project_create_mail(project, request)
elif old_parent and request.user != parent_owner:
app_alerts.add_alert(
app_name=APP_NAME,
alert_name='project_move_parent',
user=parent_owner,
message='{} moved under category '
'"{}": "{}".'.format(
project.type.capitalize(),
project.parent.title,
project.title,
),
url=reverse(
'projectroles:detail',
kwargs={'project': project.sodar_uuid},
),
project=project,
)
if SEND_EMAIL and parent_owner_email:
email.send_project_move_mail(project, request)
@transaction.atomic
def modify_project(self, data, request, instance=None):
"""
Create or update a Project. This method should be called either in
form_valid() in a Django form view or save() in a DRF serializer.
:param data: Cleaned data from a form or serializer
:param request: Request initiating the action
:param instance: Existing Project object or None
:return: Created or updated Project object
"""
action = PROJECT_ACTION_UPDATE if instance else PROJECT_ACTION_CREATE
old_data = {}
old_project = None
if instance:
project = instance
# In case of a PATCH request, get existing obj to fill out fields
old_project = Project.objects.get(sodar_uuid=instance.sodar_uuid)
old_data = self._get_old_project_data(old_project) # Store old data
project.title = data.get('title') or old_project.title
project.description = (
data.get('description') or old_project.description
)
project.type = data.get('type') or old_project.type
project.readme = data.get('readme') or old_project.readme
# NOTE: Must do this as parent can exist but be None
project.parent = (
data['parent'] if 'parent' in data else old_project.parent
)
project.public_guest_access = (
data.get('public_guest_access') or False
)
else:
project = Project(
title=data.get('title'),
description=data.get('description'),
type=data.get('type'),
readme=data.get('readme'),
parent=data.get('parent'),
public_guest_access=data.get('public_guest_access') or False,
)
project.save()
owner = data.get('owner')
if not owner and old_project: # In case of a PATCH request
owner = old_project.get_owner().user
# Create/update RemoteProject objects
old_sites = {}
sites = {}
if (
settings.PROJECTROLES_SITE_MODE == SITE_MODE_SOURCE
and project.type == PROJECT_TYPE_PROJECT
):
self.site_fields = [f for f in data if f.startswith('remote_site.')]
old_sites = self._get_remote_project_data(project)
sites = self._update_remote_sites(project, data)
# Get app settings, store old settings
project_settings = self._get_app_settings(data, project, request.user)
old_settings = None
if action == PROJECT_ACTION_UPDATE:
old_settings = json.loads(json.dumps(project_settings)) # Copy
# Create timeline event
tl_event = self._create_timeline_event(
project,
action,
owner,
old_data,
old_sites,
sites,
project_settings,
request,
)
# Get old parent for project moving
old_parent = (
old_project.parent
if old_project
and old_project.parent
and old_project.parent != project.parent
else None
)
# Update owner and settings
if action == PROJECT_ACTION_CREATE:
RoleAssignment.objects.create(
project=project,
user=owner,
role=Role.objects.get(name=PROJECT_ROLE_OWNER),
)
self._update_settings(project, project_settings)
project.save() # TODO: Is this required anymore?
# Call for additional actions for project creation/update in plugins
if getattr(settings, 'PROJECTROLES_ENABLE_MODIFY_API', False):
args = [project, action, project_settings]
if action == PROJECT_ACTION_UPDATE:
args += [old_data, old_settings]
else:
args += [None, None]
args.append(request)
self.call_project_modify_api(
'perform_project_modify', 'revert_project_modify', args
)
# If public access was updated, update has_public_children for parents
if (
old_project
and project.parent
and old_project.public_guest_access != project.public_guest_access
):
try:
project._update_public_children()
except Exception as ex:
logger.error(
'Exception in updating has_public_children(): {}'.format(ex)
)
# Once all is done, update timeline event, create alerts and emails
if tl_event:
tl_event.set_status(self._get_timeline_ok_status())
self._notify_users(project, action, owner, old_parent, request)
return project
class ProjectModifyFormMixin(ProjectModifyMixin):
"""Mixin for Project creation/updating in Django form views"""
def form_valid(self, form):
"""Handle project updating if form is valid"""
instance = form.instance if form.instance.pk else None
action = PROJECT_ACTION_UPDATE if instance else PROJECT_ACTION_CREATE
if instance and instance.parent:
redirect_url = reverse(
'projectroles:detail',
kwargs={'project': instance.parent.sodar_uuid},
)
else:
redirect_url = reverse('home')
try:
project = self.modify_project(
data=form.cleaned_data,
request=self.request,
instance=form.instance if instance else None,
)
messages.success(
self.request,
'{} {}d.'.format(
get_display_name(project.type, title=True), action.lower()
),
)
redirect_url = reverse(
'projectroles:detail', kwargs={'project': project.sodar_uuid}
)
except Exception as ex:
messages.error(
self.request,
'Unable to {} {}: {}'.format(
action.lower(), form.cleaned_data['type'].lower(), ex
),
)
if settings.DEBUG:
raise ex
return redirect(redirect_url)
class ProjectCreateView(
LoginRequiredMixin,
LoggedInPermissionMixin,
ProjectPermissionMixin,
ProjectModifyFormMixin,
ProjectContextMixin,
HTTPRefererMixin,
CurrentUserFormMixin,
InvalidFormMixin,
CreateView,
):
"""Project creation view"""
permission_required = 'projectroles.create_project'
model = Project
form_class = ProjectForm
def has_permission(self):
"""
Override has_permission() to ensure even superuser can't create
project under a remote category as target
"""
if not self.kwargs.get('project'):
if self.request.user.is_superuser:
return True # Allow top level project creation for superuser
return False # Disallow for other users
elif (
settings.PROJECTROLES_SITE_MODE == SITE_MODE_TARGET
and self.kwargs.get('project')
):
parent = Project.objects.filter(
sodar_uuid=self.kwargs['project']
).first()
if parent and parent.is_remote():
return False
return super().has_permission()
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
if 'project' in self.kwargs:
context['parent'] = Project.objects.get(
sodar_uuid=self.kwargs['project']
)
return context
def get_form_kwargs(self):
"""Pass URL arguments to form"""
kwargs = super().get_form_kwargs()
kwargs.update(self.kwargs)
return kwargs
def dispatch(self, request, *args, **kwargs):
"""Override dispatch() for target site check"""
# If site is in target mode and target creation is not allowed, redirect
if (
settings.PROJECTROLES_SITE_MODE == SITE_MODE_TARGET
and not settings.PROJECTROLES_TARGET_CREATE
):
messages.error(request, TARGET_CREATE_DISABLED_MSG)
return redirect(reverse('home'))
return super().dispatch(request, *args, **kwargs)
def get(self, request, *args, **kwargs):
"""Override get() to limit project creation under other projects"""
if 'project' in self.kwargs:
project = Project.objects.get(sodar_uuid=self.kwargs['project'])
if project.type != PROJECT_TYPE_CATEGORY:
messages.error(
self.request,
'Creating nested {} is not allowed.'.format(
get_display_name(PROJECT_TYPE_PROJECT, plural=True)
),
)
return redirect(
reverse(
'projectroles:detail',
kwargs={'project': project.sodar_uuid},
)
)
return super().get(request, *args, **kwargs)
class ProjectUpdateView(
LoginRequiredMixin,
ProjectModifyPermissionMixin,
ProjectContextMixin,
ProjectModifyFormMixin,
CurrentUserFormMixin,
InvalidFormMixin,
UpdateView,
):
"""Project updating view"""
permission_required = 'projectroles.update_project'
model = Project
form_class = ProjectForm
slug_url_kwarg = 'project'
slug_field = 'sodar_uuid'
allow_remote_edit = True
class ProjectArchiveView(
LoginRequiredMixin,
ProjectModifyPermissionMixin,
ProjectContextMixin,
ProjectModifyPluginViewMixin,
TemplateView,
):
"""Project archiving/unarchiving view"""
template_name = 'projectroles/project_archive_confirm.html'
permission_required = 'projectroles.update_project'
def _alert_users(self, project, action, user):
"""
Alert users on project archiving/unarchiving.
:param project: Project object
:param action: String ("archive" or "unarchive")
:param user: User initiating project archiving/unarchiving
"""
app_alerts = get_backend_api('appalerts_backend')
if not app_alerts:
return
alert_p = get_display_name(PROJECT_TYPE_PROJECT, title=True)
if action == 'archive':
alert_msg = '{} data is now read-only.'.format(alert_p)
else:
alert_msg = '{} data can be modified.'.format(alert_p)
users = [a.user for a in project.get_roles() if a.user != user]
users = list(set(users)) # Remove possible dupes (see issue #710)
if not users:
return
else:
app_alerts.add_alerts(
app_name=APP_NAME,
alert_name='project_{}'.format(action),
users=users,
message='{} {}d by {}. {}'.format(
alert_p, action, user.get_full_name(), alert_msg
),
url=reverse(
'projectroles:detail',
kwargs={'project': project.sodar_uuid},
),
project=project,
)
def get(self, request, *args, **kwargs):
"""Override get() to check project type"""
project = self.get_project()
if project.type != PROJECT_TYPE_PROJECT:
messages.error(request, CAT_ARCHIVE_ERR_MSG)
return redirect(
reverse(
'projectroles:detail',
kwargs={'project': project.sodar_uuid},
)
)
return super().render_to_response(
self.get_context_data(*args, **kwargs)
)
def post(self, request, **kwargs):
"""Override post() to handle POST from confirmation template"""
timeline = get_backend_api('timeline_backend')
project = self.get_project()
redirect_url = reverse(
'projectroles:detail', kwargs={'project': project.sodar_uuid}
)
if project.type == PROJECT_TYPE_CATEGORY:
messages.error(request, CAT_ARCHIVE_ERR_MSG)
return redirect(redirect_url)
status = request.POST.get('status')
if status is None:
messages.error(request, 'Status not set, unable to set archival.')
return redirect(redirect_url)
status = True if status.lower() in ['1', 'true'] else False
action = 'unarchive' if not status else 'archive'
if project.archive == status:
messages.warning(
request,
'{} is already {}d.'.format(
get_display_name(project.type, title=True), action
),
)
return redirect(redirect_url)
try:
project.set_archive(status)
# Call for additional actions for archive/unarchive in plugins
if getattr(settings, 'PROJECTROLES_ENABLE_MODIFY_API', False):
self.call_project_modify_api(
'perform_project_archive',
'revert_project_archive',
[project],
)
messages.success(
request,
'{} {}d.'.format(
get_display_name(project.type, title=True), action
),
)
except Exception as ex:
messages.error(
request,
'Failed to {} {}: {}'.format(
action, get_display_name(project.type), ex
),
)
return redirect(redirect_url)
try:
if timeline:
timeline.add_event(
project=project,
app_name=APP_NAME,
user=request.user,
event_name='project_{}'.format(action),
description='{} project'.format(action),
status_type=timeline.TL_STATUS_OK,
)
# Alert users and send email
self._alert_users(project, action, request.user)
if SEND_EMAIL: # NOTE: Opt-out settings checked in email method
email.send_project_archive_mail(project, action, request)
except Exception as ex:
messages.error(request, 'Failed to alert users: {}'.format(ex))
return redirect(redirect_url)
# RoleAssignment Views ---------------------------------------------------------
class ProjectRoleView(
LoginRequiredMixin,
LoggedInPermissionMixin,
ProjectPermissionMixin,
ProjectContextMixin,
TemplateView,
):
"""View for displaying project roles"""
permission_required = 'projectroles.view_project_roles'
template_name = 'projectroles/project_roles.html'
model = Project
def get_context_data(self, *args, **kwargs):
project = self.get_project()
context = super().get_context_data(*args, **kwargs)
context['roles'] = sorted(
project.get_roles(), key=lambda x: [x.role.rank, x.user.username]
)
owner_rank = ROLE_RANKING[PROJECT_ROLE_OWNER]
context['enable_owner_transfer'] = any(
[r.role.rank >= owner_rank for r in context['roles']]
)
if project.is_remote():
context[
'remote_roles_url'
] = project.get_source_site().url + reverse(
'projectroles:roles', kwargs={'project': project.sodar_uuid}
)
return context
class RoleAssignmentModifyMixin(ProjectModifyPluginViewMixin):
"""Mixin for RoleAssignment creation/updating in UI and API views"""
@transaction.atomic
def modify_assignment(
self, data, request, project, instance=None, promote=False
):
"""
Create or update a RoleAssignment. This method should be called either
in form_valid() in a Django form view or save() in a DRF serializer.
The method calls related ProjectModifyPluginAPIMixin methods if enabled
in your plugin.
:param data: Cleaned data from a form or serializer
:param request: Request initiating the action
:param project: Project object
:param instance: Existing RoleAssignment object or None
:param promote: Promoting an inherited user (boolean, default=False)
:return: Created or updated RoleAssignment object
"""
app_alerts = get_backend_api('appalerts_backend')
timeline = get_backend_api('timeline_backend')
tl_event = None
action = PROJECT_ACTION_UPDATE if instance else PROJECT_ACTION_CREATE
user = data.get('user')
role = data.get('role')
# Init Timeline event
if timeline:
tl_desc = '{} role {}"{}" for {{{}}}'.format(
action.lower(),
'to ' if action == PROJECT_ACTION_UPDATE else '',
role.name,
'user',
)
tl_event = timeline.add_event(
project=project,
app_name=APP_NAME,
user=request.user,
event_name='role_{}'.format(action.lower()),
description=tl_desc,
)
tl_event.add_object(user, 'user', user.username)
if action == PROJECT_ACTION_CREATE:
role_as = RoleAssignment(project=project, user=user, role=role)
old_role = None
else:
role_as = RoleAssignment.objects.get(project=project, user=user)
old_role = role_as.role
role_as.role = role
role_as.save()
# Call for additional actions for role creation/update in plugins
if getattr(settings, 'PROJECTROLES_ENABLE_MODIFY_API', False):
args = [role_as, action, old_role, request]
self.call_project_modify_api(
'perform_role_modify', 'revert_role_modify', args
)
if tl_event:
tl_event.set_status('OK')
if request.user != user:
if app_alerts:
if action == PROJECT_ACTION_CREATE:
alert_msg = ROLE_CREATE_MSG
else: # Update
alert_msg = ROLE_UPDATE_MSG
app_alerts.add_alert(
app_name=APP_NAME,
alert_name='role_' + action.lower(),
user=user,
message=alert_msg.format(
project=project.title, role=role.name
),
url=reverse(
'projectroles:detail',
kwargs={'project': project.sodar_uuid},
),
project=project,
)
if SEND_EMAIL and app_settings.get(
APP_NAME, 'notify_email_role', user=user
):
email.send_role_change_mail(
'update' if promote else action.lower(),
project,
user,
role,
request,
)
return role_as
class RoleAssignmentModifyFormMixin(RoleAssignmentModifyMixin, ModelFormMixin):
"""Mixin for RoleAssignment creation and updating in Django form views"""
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
if self.kwargs.get('promote_as'):
change_type = 'update' # If promoting inherited role
else:
change_type = self.request.resolver_match.url_name.split('_')[1]
project = self.get_project()
if change_type != 'delete':
context['preview_subject'] = email.get_role_change_subject(
change_type, project
)
context['preview_body'] = email.get_role_change_body(
change_type=change_type,
project=project,
user_name='{user_name}',
issuer=self.request.user,
role_name='{role_name}',
request=self.request,
).replace('\n', '\\n')
return context
def form_valid(self, form):
"""Handle RoleAssignment updating if form is valid"""
instance = form.instance if form.instance.pk else None
action = 'update' if instance else 'create'
project = self.get_project()
try:
self.object = self.modify_assignment(
data=form.cleaned_data,
request=self.request,
project=project,
instance=form.instance if instance else None,
promote=True if form.cleaned_data.get('promote') else False,
)
messages.success(
self.request,
'Membership {} for {} with the role of {}.'.format(
'added' if action == 'create' else 'updated',
self.object.user.username,
self.object.role.name,
),
)
except Exception as ex:
messages.error(
self.request, 'Membership updating failed: {}'.format(ex)
)
return redirect(
reverse(
'projectroles:roles',
kwargs={'project': project.sodar_uuid},
)
)
class RoleAssignmentDeleteMixin(ProjectModifyPluginViewMixin):
"""Mixin for RoleAssignment deletion/destroying in UI and API views"""
@transaction.atomic
def _update_app_alerts(self, app_alerts, project, user, inh_as):
"""
Update app alerts for user on role assignment deletion. Creates a new
alert as appropriate and dismisses alerts in projects the user can no
longer access.
:param app_alerts: AppAlertAPI object
:param project: Project object
:param user: SODARUser object
:param inh_as: RoleAssignment object for inherited assignment or None
"""
if inh_as:
message = ROLE_UPDATE_MSG.format(
project=project.title, role=inh_as.role.name
)
else:
message = 'Your membership in this {} has been removed.'.format(
get_display_name(project.type)
)
# Dismiss existing alerts
AppAlert = app_alerts.get_model()
dis_projects = [project]
if project.type == PROJECT_TYPE_CATEGORY:
for c in project.get_children(flat=True):
c_role_as = RoleAssignment.objects.filter(
project=c, user=user
).first()
if not c.public_guest_access and not c_role_as:
dis_projects.append(c)
for a in AppAlert.objects.filter(
user=user, project__in=dis_projects, active=True
):
a.active = False
a.save()
app_alerts.add_alert(
app_name=APP_NAME,
alert_name='role_{}'.format('update' if inh_as else 'delete'),
user=user,
message=message,
project=project,
)
def delete_assignment(self, request, instance):
app_alerts = get_backend_api('appalerts_backend')
timeline = get_backend_api('timeline_backend')
tl_event = None
project = instance.project
user = instance.user
role = instance.role
# Init Timeline event
if timeline:
tl_event = timeline.add_event(
project=project,
app_name=APP_NAME,
user=request.user,
event_name='role_delete',
description='delete role "{}" from {{{}}}'.format(
role.name, 'user'
),
)
tl_event.add_object(user, 'user', user.username)
# Call the project plugin modify API for additional actions
if getattr(settings, 'PROJECTROLES_ENABLE_MODIFY_API', False):
self.call_project_modify_api(
'perform_role_delete', 'revert_role_delete', [instance, request]
)
# Delete object itself
instance.delete()
# Delete corresponding PROJECT_USER settings
if (
not project.get_role(user)
and project.type == PROJECT_TYPE_CATEGORY
and not RoleAssignment.objects.filter(
project__in=project.get_children(flat=True), user=user
).exists()
):
app_settings.delete_by_scope(
APP_SETTING_SCOPE_PROJECT_USER, project, user
)
inh_as = project.get_role(user, inherited_only=True)
if tl_event:
tl_event.set_status(timeline.TL_STATUS_OK)
if app_alerts:
self._update_app_alerts(app_alerts, project, user, inh_as)
if SEND_EMAIL and app_settings.get(
APP_NAME, 'notify_email_role', user=user
):
if inh_as:
email.send_role_change_mail(
'update', project, user, inh_as.role, request
)
else:
email.send_role_change_mail(
'delete', project, user, None, request
)
return instance
class RoleAssignmentCreateView(
LoginRequiredMixin,
ProjectModifyPermissionMixin,
ProjectContextMixin,
CurrentUserFormMixin,
RoleAssignmentModifyFormMixin,
InvalidFormMixin,
CreateView,
):
"""RoleAssignment creation view"""
permission_required = 'projectroles.update_project_members'
model = RoleAssignment
form_class = RoleAssignmentForm
#: Promote assignment
promote_as = None
def get_form_kwargs(self):
"""Pass URL arguments and current user to form"""
kwargs = super().get_form_kwargs()
kwargs.update(self.kwargs)
return kwargs
def get_context_data(self, **kwargs):
context = super().get_context_data()
context['promote_as'] = self.promote_as # Queried in get()
return context
def get(self, request, *args, **kwargs):
# Validate inherited role promotion if set
if self.kwargs.get('promote_as'):
project = self.get_project()
self.promote_as = RoleAssignment.objects.filter(
sodar_uuid=self.kwargs['promote_as']
).first()
if (
not self.promote_as
or self.promote_as.role.rank
<= ROLE_RANKING[PROJECT_ROLE_DELEGATE]
or self.promote_as.project == project
or self.promote_as.project not in project.get_parents()
):
messages.error(
request, 'Invalid role assignment for promotion.'
)
return redirect(
reverse(
'projectroles:roles',
kwargs={'project': project.sodar_uuid},
)
)
return super().get(request, *args, **kwargs)
class RoleAssignmentUpdateView(
LoginRequiredMixin,
RolePermissionMixin,
ProjectContextMixin,
RoleAssignmentModifyFormMixin,
CurrentUserFormMixin,
InvalidFormMixin,
UpdateView,
):
"""RoleAssignment updating view"""
permission_required = 'projectroles.update_project_members'
model = RoleAssignment
form_class = RoleAssignmentForm
slug_url_kwarg = 'roleassignment'
slug_field = 'sodar_uuid'
class RoleAssignmentDeleteView(
LoginRequiredMixin,
RolePermissionMixin,
ProjectModifyPermissionMixin,
ProjectContextMixin,
RoleAssignmentDeleteMixin,
DeleteView,
):
"""RoleAssignment deletion view"""
permission_required = 'projectroles.update_project_members'
model = RoleAssignment
slug_url_kwarg = 'roleassignment'
slug_field = 'sodar_uuid'
def _get_inherited_children(self, project, user, ret):
for child in project.get_children():
if not RoleAssignment.objects.filter(project=child, user=user):
ret.append(child)
else:
ret = self._get_inherited_children(child, user, ret)
return ret
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
project = self.get_project()
user = self.object.user
context['inherited_as'] = project.get_role(user, inherited_only=True)
context['inherited_children'] = None
if (
not context['inherited_as']
and self.object.role.rank < ROLE_RANKING[PROJECT_ROLE_FINDER]
):
context['inherited_children'] = sorted(
self._get_inherited_children(project, user, []),
key=lambda x: x.full_title,
)
return context
def post(self, *args, **kwargs):
self.object = self.get_object()
user = self.object.user
project = self.object.project
# Override perms for owner/delegate
if self.object.role.name == PROJECT_ROLE_OWNER or (
self.object.role.name == PROJECT_ROLE_DELEGATE
and not self.request.user.has_perm(
'projectroles.update_project_delegate', project
)
):
messages.error(
self.request,
'You do not have permission to remove the '
'membership of {}.'.format(self.object.role.name),
)
else:
try:
self.object = self.delete_assignment(
request=self.request, instance=self.object
)
messages.success(
self.request,
'Membership of {} removed.'.format(user.username),
)
except Exception as ex:
messages.error(
self.request,
'Failed to remove membership of {}: {}'.format(
user.username, ex
),
)
return redirect(
reverse(
'projectroles:roles', kwargs={'project': project.sodar_uuid}
)
)
class RoleAssignmentOwnerTransferMixin(ProjectModifyPluginViewMixin):
"""Mixin for owner RoleAssignment transfer in UI and API views"""
#: Owner role object
role_owner = None
def _get_timeline_ok_status(self):
timeline = get_backend_api('timeline_backend')
if not timeline:
return None
else:
return timeline.TL_STATUS_OK
def _get_timeline_failed_status(self):
timeline = get_backend_api('timeline_backend')
if not timeline:
return None
else:
return timeline.TL_STATUS_FAILED
def _create_timeline_event(self, old_owner, new_owner, project):
timeline = get_backend_api('timeline_backend')
# Init Timeline event
if not timeline:
return None
tl_desc = 'transfer ownership from {{{}}} to {{{}}}'.format(
'prev_owner', 'new_owner'
)
tl_event = timeline.add_event(
project=project,
app_name=APP_NAME,
user=self.request.user,
event_name='role_owner_transfer',
description=tl_desc,
extra_data={
'prev_owner': old_owner.username,
'new_owner': new_owner.username,
},
)
tl_event.add_object(old_owner, 'prev_owner', old_owner.username)
tl_event.add_object(new_owner, 'new_owner', new_owner.username)
return tl_event
@transaction.atomic
def _handle_transfer(
self,
project,
old_owner_as,
new_owner,
old_owner_role,
old_inh_owner,
):
"""
Handle ownership transfer with atomic rollback.
:param project: Project object
:param old_owner_as: RoleAssignment object for old owner
:param new_owner: User object for new user
:param old_owner_role: Role object to set for old owner
:param old_inh_owner: Whether old owner is inherited owner (boolean)
"""
# Inherited owner, delete local role
if old_inh_owner:
old_owner_as.delete()
# Update old owner role
else:
old_owner_as.role = old_owner_role
old_owner_as.save()
# Update or create new owner role
new_role_as = RoleAssignment.objects.filter(
project=project, user=new_owner
).first()
if new_role_as:
new_role_as.role = self.role_owner
new_role_as.save()
else:
RoleAssignment.objects.create(
project=project, user=new_owner, role=self.role_owner
)
# Call for additional actions for role creation/update in plugins
if getattr(settings, 'PROJECTROLES_ENABLE_MODIFY_API', False):
args = [
project,
new_owner,
old_owner_as.user,
old_owner_role,
self.request,
]
self.call_project_modify_api(
'perform_owner_transfer', 'revert_owner_transfer', args
)
return True
def transfer_owner(self, project, new_owner, old_owner_as, old_owner_role):
"""
Transfer project ownership to a new user and assign a new role to the
previous owner.
:param project: Project object
:param new_owner: User object
:param old_owner_as: RoleAssignment object
:param old_owner_role: Role object for the previous owner's new role
:return:
"""
app_alerts = get_backend_api('appalerts_backend')
self.role_owner = Role.objects.get(name=PROJECT_ROLE_OWNER)
old_owner = old_owner_as.user
# Old owner inheritance
old_inh_as = project.get_role(old_owner, inherited_only=True)
old_inh_owner = (
True if old_inh_as and old_inh_as.role == self.role_owner else False
)
# New owner inheritance
new_inh_as = project.get_role(new_owner, inherited_only=True)
new_inh_owner = (
True if new_inh_as and new_inh_as.role == self.role_owner else False
)
tl_event = self._create_timeline_event(old_owner, new_owner, project)
try:
self._handle_transfer(
project, old_owner_as, new_owner, old_owner_role, old_inh_owner
)
except Exception as ex:
if tl_event:
tl_event.set_status(self._get_timeline_failed_status(), str(ex))
raise ex
if tl_event:
tl_event.set_status(self._get_timeline_ok_status())
if not old_inh_owner and self.request.user != old_owner:
if app_alerts:
app_alerts.add_alert(
app_name=APP_NAME,
alert_name='role_update',
user=old_owner,
message=ROLE_UPDATE_MSG.format(
project=project.title, role=old_owner_role.name
),
url=reverse(
'projectroles:detail',
kwargs={'project': project.sodar_uuid},
),
project=project,
)
if SEND_EMAIL and app_settings.get(
APP_NAME, 'notify_email_role', user=old_owner
):
email.send_role_change_mail(
'update', project, old_owner, old_owner_role, self.request
)
if not new_inh_owner and self.request.user != new_owner:
if app_alerts:
app_alerts.add_alert(
app_name=APP_NAME,
alert_name='role_update',
user=new_owner,
message=ROLE_UPDATE_MSG.format(
project=project.title, role=self.role_owner.name
),
url=reverse(
'projectroles:detail',
kwargs={'project': project.sodar_uuid},
),
project=project,
)
if SEND_EMAIL and app_settings.get(
APP_NAME, 'notify_email_role', user=new_owner
):
email.send_role_change_mail(
'update',
project,
new_owner,
self.role_owner,
self.request,
)
class RoleAssignmentOwnerTransferView(
LoginRequiredMixin,
ProjectModifyPermissionMixin,
CurrentUserFormMixin,
ProjectContextMixin,
RoleAssignmentOwnerTransferMixin,
InvalidFormMixin,
FormView,
):
"""Project owner RoleAssignment transfer view"""
permission_required = 'projectroles.update_project_owner'
template_name = 'projectroles/roleassignment_owner_transfer.html'
form_class = RoleAssignmentOwnerTransferForm
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
project = self.get_project()
owner_as = RoleAssignment.objects.filter(
project=project, role__name=PROJECT_ROLE_OWNER
)[0]
kwargs.update({'project': project, 'current_owner': owner_as.user})
return kwargs
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
owner_as = RoleAssignment.objects.filter(
project=self.get_project(), role__name=PROJECT_ROLE_OWNER
)[0]
context.update({'current_owner': owner_as.user})
return context
def form_valid(self, form):
project = form.project
old_owner = form.current_owner
old_owner_as = project.get_owner()
new_owner = form.cleaned_data['new_owner']
old_owner_role = form.cleaned_data['old_owner_role']
redirect_url = reverse(
'projectroles:roles', kwargs={'project': project.sodar_uuid}
)
try:
self.transfer_owner(
project, new_owner, old_owner_as, old_owner_role
)
except Exception as ex:
# TODO: Add logging
messages.error(
self.request, 'Unable to transfer ownership: {}'.format(ex)
)
if settings.DEBUG:
raise ex
return redirect(redirect_url)
if old_owner.username != new_owner.username:
success_msg = (
'Successfully transferred ownership from '
'{} to {}.'.format(old_owner.username, new_owner.username)
)
old_owner_email = app_settings.get(
APP_NAME, 'notify_email_role', user=old_owner
)
new_owner_email = app_settings.get(
APP_NAME, 'notify_email_role', user=new_owner
)
if SEND_EMAIL and (old_owner_email or new_owner_email):
success_msg += ' Notification(s) have been sent by email.'
messages.success(self.request, success_msg)
return redirect(redirect_url)
# ProjectInvite Views ----------------------------------------------------------
class ProjectInviteMixin:
"""Mixin for ProjectInvite helpers"""
@classmethod
def handle_invite(cls, invite, request, resend=False, add_message=True):
"""
Handle invite creation, email sending/resending and logging to timeline.
:param invite: ProjectInvite object
:param request: Django request object
:param resend: Send or resend (bool)
:param add_message: Add Django message on success/failure (bool)
"""
timeline = get_backend_api('timeline_backend')
send_str = 'resend' if resend else 'send'
status_type = timeline.TL_STATUS_OK
status_desc = None
if SEND_EMAIL:
sent_mail = email.send_invite_mail(invite, request)
if sent_mail == 0:
status_type = timeline.TL_STATUS_FAILED
status_desc = 'Email sending failed'
else:
status_type = timeline.TL_STATUS_FAILED
status_desc = 'PROJECTROLES_SEND_EMAIL not True'
if status_type != timeline.TL_STATUS_OK and not resend:
status_desc += ', invite not created'
# Add event in Timeline
if timeline:
timeline.add_event(
project=invite.project,
app_name=APP_NAME,
user=request.user,
event_name='invite_{}'.format(send_str),
description='{} project invite with role "{}" to {}'.format(
send_str, invite.role.name, invite.email
),
status_type=status_type,
status_desc=status_desc,
)
if add_message and status_type == timeline.TL_STATUS_OK:
messages.success(
request,
'Invite for "{}" role in {} sent to {}, expires on {}.'.format(
invite.role.name,
invite.project.title,
invite.email,
timezone.localtime(invite.date_expire).strftime(
'%Y-%m-%d %H:%M'
),
),
)
elif add_message and not resend: # NOTE: Delete invite if send fails
invite.delete()
messages.error(request, status_desc)
@classmethod
def revoke_invite(cls, invite, project, request):
timeline = get_backend_api('timeline_backend')
if invite:
invite.active = False
invite.save()
# Add event in Timeline
if timeline:
timeline.add_event(
project=project,
app_name=APP_NAME,
user=request.user,
event_name='invite_revoke',
description='revoke invite sent to "{}"'.format(
invite.email if invite else 'N/A'
),
status_type=(
timeline.TL_STATUS_OK
if invite
else timeline.TL_STATUS_FAILED
),
)
return invite
class ProjectInviteView(
LoginRequiredMixin,
ProjectContextMixin,
ProjectModifyPermissionMixin,
TemplateView,
):
"""View for displaying and modifying project invites"""
permission_required = 'projectroles.invite_users'
template_name = 'projectroles/project_invites.html'
model = ProjectInvite
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
context['invites'] = ProjectInvite.objects.filter(
project=context['project'],
active=True,
date_expire__gt=timezone.now(),
)
return context
class ProjectInviteCreateView(
LoginRequiredMixin,
ProjectContextMixin,
ProjectModifyPermissionMixin,
ProjectInviteMixin,
CurrentUserFormMixin,
InvalidFormMixin,
CreateView,
):
"""ProjectInvite creation view"""
model = ProjectInvite
form_class = ProjectInviteForm
permission_required = 'projectroles.invite_users'
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
project = self.get_permission_object()
context['preview_subject'] = email.get_invite_subject(project)
context['preview_body'] = email.get_invite_body(
project=project,
issuer=self.request.user,
role_name='{role_name}',
invite_url='http://XXXXXXXXXXXXXXXXXXXXXXX',
date_expire_str='YYYY-MM-DD HH:MM',
).replace('\n', '\\n')
context['preview_message'] = email.get_invite_message(
'{message}'
).replace('\n', '\\n')
context['preview_footer'] = email.get_email_footer(
self.request
).replace('\n', '\\n')
return context
def get_form_kwargs(self):
"""Pass current user and optional email/role to form"""
kwargs = super().get_form_kwargs()
kwargs.update({'project': self.get_permission_object().sodar_uuid})
email = self.request.GET.get('e', None)
kwargs.update({'mail': unquote_plus(email) if email else None})
kwargs.update({'role': self.request.GET.get('r', None)})
return kwargs
def form_valid(self, form):
self.object = form.save()
# Send mail and add to timeline
self.handle_invite(invite=self.object, request=self.request)
return redirect(
reverse(
'projectroles:invites',
kwargs={'project': self.object.project.sodar_uuid},
)
)
class ProjectInviteProcessMixin(ProjectModifyPluginViewMixin):
"""Mixin for accepting and processing project invites"""
@classmethod
def revoke_invite(
cls, invite, user=None, failed=True, fail_desc='', timeline=None
):
"""Set invite.active to False and save the invite"""
invite.active = False
invite.save()
if failed and timeline and user:
# Add event in Timeline
timeline.add_event(
project=invite.project,
app_name=APP_NAME,
user=user,
event_name='invite_accept',
description='accept project invite',
status_type=timeline.TL_STATUS_FAILED,
status_desc=fail_desc,
)
def get_invite(self, secret):
"""Get invite, display message if not found"""
try:
return ProjectInvite.objects.get(secret=secret)
except ProjectInvite.DoesNotExist:
messages.error(self.request, 'Invite does not exist.')
def user_role_exists(self, invite, user):
"""
Display message if user already has roles in project. Also revoke the
invite if necessary.
"""
if invite.project.has_role(user):
messages.warning(
self.request,
mark_safe(
'You are already a member of this {project}. '
'<a href="{url}">Please use this URL to access the '
'{project}</a>.'.format(
project=get_display_name(invite.project.type),
url=reverse(
'projectroles:detail',
kwargs={'project': invite.project.sodar_uuid},
),
)
),
)
if invite.active: # Only revoke if active
self.revoke_invite(invite, user)
return True
return False
def is_invite_expired(self, invite, user=None):
"""Display message and send email to issuer if invite is expired"""
if invite.date_expire < timezone.now():
messages.error(
self.request,
'Your invite has expired. Please contact the inviter: '
'{} ({})'.format(invite.issuer.name, invite.issuer.email),
)
# Send notification of expiry to issuer
if SEND_EMAIL and app_settings.get(
APP_NAME, 'notify_email_role', user=invite.issuer
):
email.send_expiry_note(
invite,
self.request,
user_name=user.get_full_name() if user else invite.email,
)
self.revoke_invite(
invite, user, failed=True, fail_desc='Invite expired'
)
return True
return False
# TODO: Combine with RoleAssignmentModifyMixin.modify_assignment?
@transaction.atomic
def create_assignment(self, invite, user, timeline=None):
"""Create role assignment for invited user"""
app_alerts = get_backend_api('appalerts_backend')
tl_event = None
if timeline:
tl_event = timeline.add_event(
project=invite.project,
app_name=APP_NAME,
user=user,
event_name='invite_accept',
description='accept project invite with role of "{}"'.format(
invite.role.name
),
)
# Create the assignment
role_as = RoleAssignment(
user=user, project=invite.project, role=invite.role
)
# Call for additional actions for role creation/update in plugins
if getattr(settings, 'PROJECTROLES_ENABLE_MODIFY_API', False):
args = [role_as, PROJECT_ACTION_CREATE, None, self.request]
self.call_project_modify_api(
'perform_role_modify', 'revert_role_modify', args
)
role_as.save()
if tl_event:
tl_event.set_status(timeline.TL_STATUS_OK)
# Notify the issuer by alert and email
if app_alerts:
app_alerts.add_alert(
app_name=APP_NAME,
alert_name='invite_accept',
user=invite.issuer,
message='Invitation sent to "{}" for project "{}" with the '
'role "{}" was accepted.'.format(
user.email, invite.project.title, invite.role.name
),
url=reverse(
'projectroles:detail',
kwargs={'project': invite.project.sodar_uuid},
),
project=invite.project,
)
if SEND_EMAIL and app_settings.get(
APP_NAME, 'notify_email_role', user=invite.issuer
):
email.send_accept_note(invite, self.request, user)
# Deactivate the invite
self.revoke_invite(invite, user, failed=False, timeline=timeline)
# Finally, redirect user to the project front page
messages.success(
self.request,
PROJECT_WELCOME_MSG.format(
project_type=get_display_name(invite.project.type),
project_title=invite.project.title,
role=invite.role.name,
),
)
def redirect_error(self, msg=None):
if msg:
messages.error(self.request, msg)
return redirect(reverse('home'))
class ProjectInviteAcceptView(ProjectInviteProcessMixin, View):
"""View to handle accepting a project invite"""
def _redirect_process(self, login=True):
"""Redirect to the proper process view"""
url = 'projectroles:invite_process_{}'.format(
'login' if login else 'new_user'
)
kw = {'secret': self.kwargs.get('secret')}
return redirect(reverse(url, kwargs=kw))
def get(self, *args, **kwargs):
invite = self.get_invite(secret=kwargs['secret'])
if not invite:
return self.redirect_error(INVITE_NOT_FOUND_MSG)
user = self.request.user
if (
not user.is_anonymous
and user.is_authenticated
and user.email == invite.email
and self.user_role_exists(invite, user)
):
return redirect(
reverse(
'projectroles:detail',
kwargs={'project': invite.project.sodar_uuid},
)
)
if (settings.ENABLE_LDAP and invite.is_ldap()) or (
settings.ENABLE_OIDC and not settings.PROJECTROLES_ALLOW_LOCAL_USERS
):
return self._redirect_process()
elif settings.PROJECTROLES_ALLOW_LOCAL_USERS:
return self._redirect_process(False)
return self.redirect_error(
'Local or OIDC users are not enabled on this site.'
)
class ProjectInviteProcessLoginView(
LoginRequiredMixin, ProjectInviteProcessMixin, View
):
"""
View for processing project invite with a logged in LDAP/OIDC user. Requires
login and then creates project assignment for the invited user.
"""
def get(self, *args, **kwargs):
invite = self.get_invite(kwargs['secret'])
if not invite:
return self.redirect_error(INVITE_NOT_FOUND_MSG)
timeline = get_backend_api('timeline_backend')
# Check if user has already accepted the invite
if self.user_role_exists(invite, self.request.user):
# NOTE: No message, simply redirect
return redirect(
reverse(
'projectroles:detail',
kwargs={'project': invite.project.sodar_uuid},
)
)
# Check if invite expired
if self.is_invite_expired(invite, self.request.user):
return self.redirect_error()
# Create RoleAssignment
try:
self.create_assignment(invite, self.request.user, timeline=timeline)
except Exception as ex:
return self.redirect_error(ex)
return redirect(
reverse(
'projectroles:detail',
kwargs={'project': invite.project.sodar_uuid},
)
)
class ProjectInviteProcessNewUserView(ProjectInviteProcessMixin, FormView):
"""
View for processing a local/OIDC project invite as a newly created user.
Also provides an OIDC login element to login instead of creating a local
user account.
"""
form_class = LocalUserForm
template_name = 'projectroles/user_form.html'
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['invite'] = self.get_invite(self.kwargs['secret'])
return context
def get(self, *args, **kwargs):
invite = self.get_invite(self.kwargs['secret'])
if not invite:
return self.redirect_error(INVITE_NOT_FOUND_MSG)
timeline = get_backend_api('timeline_backend')
# Redirect to login process view if OIDC is enabled but local isn't
if settings.ENABLE_OIDC and not settings.PROJECTROLES_ALLOW_LOCAL_USERS:
return redirect(
reverse(
'projectroles:invite_process_login',
kwargs={'secret': invite.secret},
)
)
# If local users are not allowed but OIDC is, redirect to home
elif not settings.PROJECTROLES_ALLOW_LOCAL_USERS:
return self.redirect_error(INVITE_LOCAL_NOT_ALLOWED_MSG)
# Check invite for correct type
if invite.is_ldap():
return self.redirect_error(INVITE_LDAP_LOCAL_VIEW_MSG)
# Check if invited user exists
user = User.objects.filter(email=invite.email).first()
# Check if invite has expired
if self.is_invite_expired(invite, user):
return self.redirect_error() # Error message already added
# A user is not logged in
if self.request.user.is_anonymous:
# Redirect to login if user exists
if user:
messages.info(self.request, INVITE_USER_EXISTS_MSG)
return redirect(
reverse('login')
+ '?next='
+ reverse(
'projectroles:invite_process_new_user',
kwargs={'secret': invite.secret},
)
)
# Show form if user doesn't exist and no user is logged in
return super().get(*args, **kwargs)
# Logged in but the invited user does not exist yet
if not user:
return self.redirect_error(INVITE_LOGGED_IN_ACCEPT_MSG)
# Logged in user is not invited user
if self.request.user != user:
return self.redirect_error(INVITE_USER_NOT_EQUAL_MSG)
# User exists but is not local
if not user.is_local():
return self.redirect_error('User exists, but is not local.')
# Create RoleAssignment
try:
self.create_assignment(invite, self.request.user, timeline=timeline)
except Exception as ex:
return self.redirect_error(ex)
return redirect(
reverse(
'projectroles:detail',
kwargs={'project': invite.project.sodar_uuid},
)
)
def get_initial(self):
"""Returns the initial data for the form."""
initial = super().get_initial()
try:
invite = ProjectInvite.objects.get(secret=self.kwargs['secret'])
except ProjectInvite.DoesNotExist:
return initial
username_base = invite.email.split('@')[0]
i = 0
while True:
username = username_base + str(i if i else '')
try:
User.objects.get(username=username)
i += 1
except User.DoesNotExist:
break
initial.update({'email': invite.email, 'username': username})
return initial
def form_valid(self, form):
invite = self.get_invite(self.kwargs['secret'])
if not invite:
return redirect(reverse('home'))
timeline = get_backend_api('timeline_backend')
# Check if local users are allowed
if not settings.PROJECTROLES_ALLOW_LOCAL_USERS:
return self.redirect_error(
'Invite was issued to non-LDAP user, but local users are not '
'allowed.'
)
# Check invite for correct type
if invite.is_ldap():
return self.redirect_error(
'Invite was issued for LDAP user, but local invite view was '
'requested.'
)
# Check if invite is expired
if self.is_invite_expired(invite):
return self.redirect_error()
# Create user and RoleAssignment
user = User.objects.create_user(
form.cleaned_data['username'],
email=form.cleaned_data['email'],
password=form.cleaned_data['password'],
first_name=form.cleaned_data['first_name'],
last_name=form.cleaned_data['last_name'],
)
if self.user_role_exists(invite, user):
return redirect(
reverse(
'projectroles:detail',
kwargs={'project': invite.project.sodar_uuid},
)
)
try:
self.create_assignment(invite, user, timeline=timeline)
except Exception as ex:
user.delete()
return self.redirect_error(ex)
return redirect(
reverse(
'projectroles:detail',
kwargs={'project': invite.project.sodar_uuid},
)
)
class ProjectInviteResendView(
LoginRequiredMixin, ProjectModifyPermissionMixin, ProjectInviteMixin, View
):
"""View to handle resending a project invite"""
permission_required = 'projectroles.invite_users'
def get(self, *args, **kwargs):
try:
invite = ProjectInvite.objects.get(
sodar_uuid=self.kwargs['projectinvite'], active=True
)
except ProjectInvite.DoesNotExist:
messages.error(self.request, INVITE_NOT_FOUND_MSG)
return redirect(
reverse(
'projectroles:invites',
kwargs={'project': self.get_project()},
)
)
# Reset invite expiration date
invite.reset_date_expire()
# Resend mail and add to timeline
self.handle_invite(invite=invite, request=self.request, resend=True)
return redirect(
reverse(
'projectroles:invites',
kwargs={'project': invite.project.sodar_uuid},
)
)
class ProjectInviteRevokeView(
LoginRequiredMixin,
ProjectModifyPermissionMixin,
ProjectContextMixin,
ProjectInviteMixin,
TemplateView,
):
"""Batch delete/move confirm view"""
template_name = 'projectroles/invite_revoke_confirm.html'
permission_required = 'projectroles.invite_users'
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
context['project'] = self.get_project()
if 'projectinvite' in self.kwargs:
try:
context['invite'] = ProjectInvite.objects.get(
sodar_uuid=self.kwargs['projectinvite']
)
except ProjectInvite.DoesNotExist:
pass
return context
def post(self, request, **kwargs):
"""Override post() to handle POST from confirmation template"""
project = self.get_project()
invite = ProjectInvite.objects.filter(
sodar_uuid=kwargs['projectinvite']
).first()
if (
invite
and invite.role.name == PROJECT_ROLE_DELEGATE
and not request.user.has_perm(
'projectroles.update_project_delegate', project
)
):
# Causes revoke_failed_invite() to add failed timeline event
invite = None
messages.error(
self.request, 'No permissions for updating delegate invite.'
)
elif invite:
messages.success(self.request, 'Invite revoked.')
else:
messages.error(self.request, 'Invite not found.')
self.revoke_invite(invite, project, request)
return redirect(
reverse(
'projectroles:invites', kwargs={'project': project.sodar_uuid}
)
)
# User management views --------------------------------------------------------
class UserUpdateView(
LoginRequiredMixin, HTTPRefererMixin, InvalidFormMixin, UpdateView
):
"""Display and process the user update view"""
form_class = LocalUserForm
template_name = 'projectroles/user_form.html'
success_url = reverse_lazy('home')
def get_object(self, **kwargs):
return self.request.user
def get(self, *args, **kwargs):
if not self.request.user.is_local():
messages.error(self.request, USER_PROFILE_LDAP_MSG)
return redirect(reverse('home'))
return super().get(*args, **kwargs)
def get_success_url(self):
messages.success(self.request, USER_PROFILE_UPDATE_MSG)
return reverse('home')
# Remote site and project views ------------------------------------------------
class RemoteSiteListView(
LoginRequiredMixin, LoggedInPermissionMixin, TemplateView
):
"""Main view for displaying remote site list"""
permission_required = 'projectroles.update_remote'
template_name = 'projectroles/remote_sites.html'
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
site_mode = (
SITE_MODE_TARGET
if settings.PROJECTROLES_SITE_MODE == SITE_MODE_SOURCE
else SITE_MODE_SOURCE
)
sites = RemoteSite.objects.filter(mode=site_mode).order_by('name')
if (
sites.count() > 0
and settings.PROJECTROLES_SITE_MODE == SITE_MODE_TARGET
):
sites = sites[:1]
context['sites'] = sites
return context
# TODO: Remove this once implementing #76
def get(self, request, *args, **kwargs):
if getattr(settings, 'PROJECTROLES_DISABLE_CATEGORIES', False):
messages.warning(
request,
'{} {} and nesting disabled, '
'remote {} sync disabled.'.format(
get_display_name(PROJECT_TYPE_PROJECT, title=True),
get_display_name(PROJECT_TYPE_CATEGORY, plural=True),
get_display_name(PROJECT_TYPE_PROJECT),
),
)
return redirect('home')
return super().get(request, *args, **kwargs)
class RemoteSiteModifyMixin(ModelFormMixin):
"""Helpers for remote site modification"""
def _create_timeline_event(self, remote_site, user, form_action):
"""
Create timeline event for remote site creation/update.
:param remote_site: RemoteSite object
:param user: SODARUser object
:param form_action: String
:param form_action:
"""
timeline = get_backend_api('timeline_backend')
if not timeline:
return
status = form_action if form_action == 'set' else form_action[0:-1]
if remote_site.mode == SITE_MODE_SOURCE:
event_name = 'source_site_{}'.format(status)
else:
event_name = 'target_site_{}'.format(status)
tl_desc = '{} remote {} site {{{}}}'.format(
status,
remote_site.mode.lower(),
'remote_site',
)
tl_event = timeline.add_event(
project=None,
app_name=APP_NAME,
user=user,
event_name=event_name,
description=tl_desc,
classified=True,
extra_data={
'name': remote_site.name,
'url': remote_site.url,
'description': remote_site.description,
'mode': remote_site.mode,
'user display': remote_site.user_display,
'secret': remote_site.secret,
},
status_type=timeline.TL_STATUS_OK,
)
tl_event.add_object(
obj=remote_site, label='remote_site', name=remote_site.name
)
def form_valid(self, form):
"""Override form_valid() to save timeline event and handle UI"""
if self.object:
form_action = 'updated'
elif settings.PROJECTROLES_SITE_MODE == 'TARGET':
form_action = 'set'
else:
form_action = 'created'
self.object = form.save()
# Create timeline event
self._create_timeline_event(self.object, self.request.user, form_action)
messages.success(
self.request,
'{} site "{}" {}.'.format(
self.object.mode.capitalize(), self.object.name, form_action
),
)
return redirect(reverse('projectroles:remote_sites'))
class RemoteSiteCreateView(
LoginRequiredMixin,
LoggedInPermissionMixin,
RemoteSiteModifyMixin,
HTTPRefererMixin,
CurrentUserFormMixin,
InvalidFormMixin,
CreateView,
):
"""RemoteSite creation view"""
model = RemoteSite
form_class = RemoteSiteForm
permission_required = 'projectroles.update_remote'
def get(self, request, *args, **kwargs):
"""
Override get() to disallow rendering this view if current site is
in TARGET mode and a source site already exists.
"""
if (
settings.PROJECTROLES_SITE_MODE == SITE_MODE_TARGET
and RemoteSite.objects.filter(mode=SITE_MODE_SOURCE).count() > 0
):
messages.error(request, 'Source site has already been set')
return redirect(reverse('projectroles:remote_sites'))
return super().get(request, args, kwargs)
class RemoteSiteUpdateView(
LoginRequiredMixin,
LoggedInPermissionMixin,
RemoteSiteModifyMixin,
HTTPRefererMixin,
CurrentUserFormMixin,
InvalidFormMixin,
UpdateView,
):
"""RemoteSite updating view"""
model = RemoteSite
form_class = RemoteSiteForm
permission_required = 'projectroles.update_remote'
slug_url_kwarg = 'remotesite'
slug_field = 'sodar_uuid'
class RemoteSiteDeleteView(
LoginRequiredMixin,
LoggedInPermissionMixin,
HTTPRefererMixin,
DeleteView,
):
"""RemoteSite deletion view"""
model = RemoteSite
permission_required = 'projectroles.update_remote'
slug_url_kwarg = 'remotesite'
slug_field = 'sodar_uuid'
def get_success_url(self):
timeline = get_backend_api('timeline_backend')
if timeline:
event_name = '{}_site_delete'.format(
'source' if self.object.mode == SITE_MODE_SOURCE else 'target'
)
tl_desc = 'delete remote site {remote_site}'
tl_event = timeline.add_event(
project=None,
app_name=APP_NAME,
user=self.request.user,
event_name=event_name,
description=tl_desc,
classified=True,
status_type=timeline.TL_STATUS_OK,
)
tl_event.add_object(
obj=self.object, label='remote_site', name=self.object.name
)
messages.success(
self.request,
'{} site "{}" deleted'.format(
self.object.mode.capitalize(), self.object.name
),
)
return reverse('projectroles:remote_sites')
class RemoteProjectListView(
LoginRequiredMixin, LoggedInPermissionMixin, TemplateView
):
"""View for displaying the project lsit for remote site sync"""
permission_required = 'projectroles.update_remote'
template_name = 'projectroles/remote_projects.html'
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
site = RemoteSite.objects.get(sodar_uuid=self.kwargs['remotesite'])
context['site'] = site
# Projects in SOURCE mode: all local projects of type PROJECT
if settings.PROJECTROLES_SITE_MODE == SITE_MODE_SOURCE:
projects = Project.objects.filter(type=PROJECT_TYPE_PROJECT)
# Projects in TARGET mode: retrieve from source
else: # SITE_MODE_TARGET
remote_uuids = [p.project_uuid for p in site.projects.all()]
projects = Project.objects.filter(
type=PROJECT_TYPE_PROJECT, sodar_uuid__in=remote_uuids
)
if projects:
context['projects'] = projects.order_by('full_title')
return context
class RemoteProjectBatchUpdateView(
LoginRequiredMixin, LoggedInPermissionMixin, TemplateView
):
"""
Manually created form view for updating project access in batch for a
remote target site.
"""
permission_required = 'projectroles.update_remote'
template_name = 'projectroles/remoteproject_update.html'
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
# Current site
context['site'] = RemoteSite.objects.filter(
sodar_uuid=kwargs['remotesite']
).first()
return context
def post(self, request, *args, **kwargs):
timeline = get_backend_api('timeline_backend')
post_data = request.POST
context = self.get_context_data(*args, **kwargs)
site = context['site']
confirmed = True if 'update-confirmed' in post_data else False
redirect_url = reverse(
'projectroles:remote_projects',
kwargs={'remotesite': site.sodar_uuid},
)
# Ensure site is in SOURCE mode
if settings.PROJECTROLES_SITE_MODE != SITE_MODE_SOURCE:
messages.error(
request,
'Site in TARGET mode, cannot update {} access'.format(
get_display_name(PROJECT_TYPE_PROJECT)
),
)
return redirect(redirect_url)
access_fields = {
k: v for k, v in post_data.items() if k.startswith('remote_access')
}
# Confirmation needed
if not confirmed:
# Pass on (only) changed projects to confirmation form
modifying_access = []
for k, v in access_fields.items():
project_uuid = k.split('_')[2]
remote_obj = RemoteProject.objects.filter(
site=site, project_uuid=project_uuid
).first()
if (not remote_obj and v != REMOTE_LEVEL_NONE) or (
remote_obj and remote_obj.level != v
):
modifying_access.append(
{
'project': Project.objects.get(
sodar_uuid=project_uuid
),
'old_level': (
REMOTE_LEVEL_NONE
if not remote_obj
else remote_obj.level
),
'new_level': v,
}
)
if not modifying_access:
messages.warning(
request,
'No changes to {} access detected'.format(
get_display_name(PROJECT_TYPE_PROJECT)
),
)
return redirect(redirect_url)
context['modifying_access'] = modifying_access
return super().render_to_response(context)
# Confirmed
modifying_access = []
old_level = REMOTE_LEVEL_NONE
for k, v in access_fields.items():
project_uuid = k.split('_')[2]
project = Project.objects.filter(sodar_uuid=project_uuid).first()
# Update or create a RemoteProject object
try:
rp = RemoteProject.objects.get(
site=site, project_uuid=project_uuid
)
old_level = rp.level
rp.level = v
except RemoteProject.DoesNotExist:
rp = RemoteProject(
site=site,
project_uuid=project_uuid,
project=project,
level=v,
)
rp.save()
modifying_access.append(
{
'project': project.get_log_title(),
'old_level': (
REMOTE_LEVEL_NONE if not old_level else old_level
),
'new_level': v,
}
)
if timeline and project:
tl_desc = 'update remote access on site {{{}}} to "{}"'.format(
'remote_site',
SODAR_CONSTANTS['REMOTE_ACCESS_LEVELS'][v].lower(),
)
tl_event = timeline.add_event(
project=project,
app_name=APP_NAME,
user=request.user,
event_name='remote_project_update',
description=tl_desc,
classified=True,
status_type=timeline.TL_STATUS_OK,
)
tl_event.add_object(site, 'remote_site', site.name)
if timeline:
tl_desc = 'update remote projects for {remote_site}'
tl_event = timeline.add_event(
project=None,
app_name=APP_NAME,
user=request.user,
event_name='remote_batch_update',
description=tl_desc,
extra_data={'modifying_access': modifying_access},
classified=True,
status_type=timeline.TL_STATUS_OK,
)
tl_event.add_object(obj=site, label='remote_site', name=site.name)
# All OK
messages.success(
request,
'Access level updated for {} {} in site "{}"'.format(
len(access_fields.items()),
get_display_name(
PROJECT_TYPE_PROJECT, count=len(access_fields.items())
),
context['site'].name,
),
)
return redirect(redirect_url)
class RemoteProjectSyncView(
LoginRequiredMixin, LoggedInPermissionMixin, TemplateView
):
"""Target site view for synchronizing remote projects from a source site"""
permission_required = 'projectroles.update_remote'
template_name = 'projectroles/remoteproject_sync.html'
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
# Current site
context['site'] = RemoteSite.objects.filter(
sodar_uuid=kwargs['remotesite']
).first()
return context
def get(self, request, *args, **kwargs):
"""Override get() for a confirmation view"""
redirect_url = reverse('projectroles:remote_sites')
if settings.PROJECTROLES_SITE_MODE == SITE_MODE_SOURCE:
messages.error(
request, 'Site in SOURCE mode, remote sync not allowed'
)
return redirect(redirect_url)
timeline = get_backend_api('timeline_backend')
remote_api = RemoteProjectAPI()
context = self.get_context_data(*args, **kwargs)
source_site = context['site']
try:
remote_data = remote_api.get_remote_data(source_site)
except Exception as ex:
messages.error(
request,
'Unable to synchronize {}: {}'.format(
get_display_name(PROJECT_TYPE_PROJECT, plural=True), ex
),
)
return redirect(redirect_url)
# Sync data
try:
update_data = remote_api.sync_remote_data(
source_site, remote_data, request
)
except Exception as ex:
messages.error(
request, 'Remote sync cancelled with exception: {}'.format(ex)
)
if settings.DEBUG:
raise ex
return redirect(redirect_url)
# Check for updates
user_count = len(
[v for v in update_data['users'].values() if 'status' in v]
)
project_count = len(
[v for v in update_data['projects'].values() if 'status' in v]
)
app_settings_count = len(
[
v
for v in update_data['app_settings'].values()
if 'status' in v and v['status'] != 'skipped'
]
)
role_count = 0
for p in [p for p in update_data['projects'].values() if 'roles' in p]:
for _ in [r for r in p['roles'].values() if 'status' in r]:
role_count += 1
# Redirect if no changes were detected
if (
user_count == 0
and project_count == 0
and role_count == 0
and app_settings_count == 0
):
messages.warning(
request,
'No changes in remote site detected, nothing to synchronize.',
)
return redirect(redirect_url)
context['update_data'] = update_data
context['user_count'] = user_count
context['project_count'] = project_count
context['role_count'] = role_count
context['app_settings_count'] = app_settings_count
# Create timeline events for projects
if timeline:
tl_desc = 'synchronize remote site {remote_site}'
tl_event = timeline.add_event(
project=None,
app_name=APP_NAME,
user=request.user,
event_name='remote_project_sync',
description=tl_desc,
classified=True,
status_type=timeline.TL_STATUS_OK,
)
tl_event.add_object(
obj=source_site, label='remote_site', name=source_site.name
)
messages.success(
request,
'{} data updated according to source site.'.format(
get_display_name(PROJECT_TYPE_PROJECT, title=True)
),
)
return super().render_to_response(context)