Source code for projectroles.views

"""UI views for the projectroles app"""

import json
import logging
import re

from ipaddress import ip_address, ip_network
from typing import Any, Optional, Union
from urllib.parse import unquote_plus, urlparse

from django.apps import apps
from django.conf import settings
from django.contrib import auth, messages
from django.contrib.auth.mixins import AccessMixin
from django.core.exceptions import ImproperlyConfigured
from django.db import transaction
from django.db.models import QuerySet
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect, Http404
from django.shortcuts import redirect
from django.urls import resolve, reverse, reverse_lazy
from django.utils import timezone
from django.utils.safestring import mark_safe
from django.views.generic import (
    TemplateView,
    DetailView,
    UpdateView,
    CreateView,
    DeleteView,
    View,
)
from django.views.generic.edit import ModelFormMixin, FormView
from django.views.generic.detail import ContextMixin

from rules.contrib.views import PermissionRequiredMixin, redirect_to_login

from projectroles import email
from projectroles.app_settings import AppSettingAPI
from projectroles.forms import (
    ProjectForm,
    RoleAssignmentForm,
    ProjectInviteForm,
    SiteAppSettingsForm,
    RemoteSiteForm,
    RoleAssignmentOwnerTransferForm,
    LocalUserForm,
)
from projectroles.models import (
    Project,
    Role,
    RoleAssignment,
    ProjectInvite,
    RemoteSite,
    RemoteProject,
    SODAR_CONSTANTS,
    ROLE_RANKING,
)
from projectroles.plugins import PluginAPI
from projectroles.remote_projects import RemoteProjectAPI
from projectroles.utils import get_display_name


app_settings = AppSettingAPI()
logger = logging.getLogger(__name__)
plugin_api = PluginAPI()
User = auth.get_user_model()


# SODAR constants
PROJECT_TYPE_PROJECT = SODAR_CONSTANTS['PROJECT_TYPE_PROJECT']
PROJECT_TYPE_CATEGORY = SODAR_CONSTANTS['PROJECT_TYPE_CATEGORY']
PROJECT_ROLE_OWNER = SODAR_CONSTANTS['PROJECT_ROLE_OWNER']
PROJECT_ROLE_DELEGATE = SODAR_CONSTANTS['PROJECT_ROLE_DELEGATE']
PROJECT_ROLE_CONTRIBUTOR = SODAR_CONSTANTS['PROJECT_ROLE_CONTRIBUTOR']
PROJECT_ROLE_GUEST = SODAR_CONSTANTS['PROJECT_ROLE_GUEST']
PROJECT_ROLE_VIEWER = SODAR_CONSTANTS['PROJECT_ROLE_VIEWER']
PROJECT_ROLE_FINDER = SODAR_CONSTANTS['PROJECT_ROLE_FINDER']
SITE_MODE_TARGET = SODAR_CONSTANTS['SITE_MODE_TARGET']
SITE_MODE_SOURCE = SODAR_CONSTANTS['SITE_MODE_SOURCE']
SITE_MODE_PEER = SODAR_CONSTANTS['SITE_MODE_PEER']
REMOTE_LEVEL_NONE = SODAR_CONSTANTS['REMOTE_LEVEL_NONE']
REMOTE_LEVEL_VIEW_AVAIL = SODAR_CONSTANTS['REMOTE_LEVEL_VIEW_AVAIL']
REMOTE_LEVEL_READ_INFO = SODAR_CONSTANTS['REMOTE_LEVEL_READ_INFO']
REMOTE_LEVEL_READ_ROLES = SODAR_CONSTANTS['REMOTE_LEVEL_READ_ROLES']
REMOTE_LEVEL_REVOKED = SODAR_CONSTANTS['REMOTE_LEVEL_REVOKED']
APP_SETTING_SCOPE_PROJECT = SODAR_CONSTANTS['APP_SETTING_SCOPE_PROJECT']
APP_SETTING_SCOPE_PROJECT_USER = SODAR_CONSTANTS[
    'APP_SETTING_SCOPE_PROJECT_USER'
]
APP_SETTING_TYPE_JSON = SODAR_CONSTANTS['APP_SETTING_TYPE_JSON']
PROJECT_ACTION_CREATE = SODAR_CONSTANTS['PROJECT_ACTION_CREATE']
PROJECT_ACTION_UPDATE = SODAR_CONSTANTS['PROJECT_ACTION_UPDATE']

# Local constants
APP_NAME = 'projectroles'
SEND_EMAIL = settings.PROJECTROLES_SEND_EMAIL
LOGIN_MSG = 'Please log in.'
NO_AUTH_MSG = 'User not authorized for requested action.'
NO_AUTH_LOGIN_MSG = 'Authentication required, please log in.'
FORM_INVALID_MSG = 'Form submission failed, see the form for details.'
PROJECT_BLOCK_MSG = (
    'Access to {project_type} temporarily blocked by an administrator, please '
    'try again later'
)
PROJECT_WELCOME_MSG = (
    'Welcome to {project_type} "{project_title}". You have been assigned the '
    'role of {role}.'
)
CAT_ARCHIVE_ERR_MSG = 'Setting archival is not allowed for {}.'.format(
    get_display_name(PROJECT_TYPE_CATEGORY, plural=True)
)
USER_PROFILE_UPDATE_MSG = 'User profile updated, please log in again.'
INVITE_NOT_FOUND_MSG = 'Invite not found.'
INVITE_LDAP_LOCAL_VIEW_MSG = (
    'Invite was issued for LDAP user, but local invite view was requested.'
)
INVITE_LOCAL_NOT_ALLOWED_MSG = 'Local users are not allowed.'

INVITE_LOGGED_IN_ACCEPT_MSG = (
    'Logged in user is not allowed to accept invites for other users.'
)
INVITE_USER_NOT_EQUAL_MSG = (
    'Invited user exists, but logged in user is not invited user.'
)
INVITE_USER_EXISTS_MSG = (
    'User with that email already exists. Please login to accept the invite.'
)
ROLE_CREATE_MSG = 'Membership granted with the role of "{role}".'
ROLE_UPDATE_MSG = 'Member role changed to "{role}".'
ROLE_DELETE_MSG = 'Your membership in this {project_type} has been removed.'
ROLE_LEAVE_MSG = 'Member {user_name} left the {project_type}.'
ROLE_LEAVE_INHERIT_MSG = 'Role inherited from parent {category_type}'
ROLE_LEAVE_OWNER_MSG = 'Owner role must be transferred to another user'
ROLE_LEAVE_REMOTE_MSG = (
    '{project_type} is remote, role must be changed on source site'
)
ROLE_FINDER_INFO = (
    'User can see nested {categories} and {projects}, but can not access them '
    'without having a role explicitly assigned.'
)
PROJECT_DELETE_MSG = (
    '{project_type} "{project_title}" deleted by user {user_name}.'
)
PROJECT_DELETE_CAT_ERR_MSG = (
    'Deletion not allowed for {project_type} with children. Delete the '
    'children before attempting deletion.'
)
PROJECT_DELETE_TARGET_ERR_MSG = (
    'Deletion not allowed for remote {project_type} with non-revoked access '
    'level. Revoke remote access on source site to enable deletion.'
)
PROJECT_DELETE_SOURCE_ERR_MSG = (
    'Non-revoked remotes of {project_type} found on target sites. Revoke '
    'access to the remotes to enable deletion.'
)
TARGET_CREATE_DISABLED_MSG = (
    'PROJECTROLES_TARGET_CREATE=False, creation not allowed.'
)
SITE_SETTING_UPDATE_MSG = 'Site app settings updated.'


# General UI view mixins -------------------------------------------------------


[docs] class LoginRequiredMixin(AccessMixin): """ Override of Django LoginRequiredMixin to handle anonymous access and kiosk mode. """ def is_login_required(self) -> bool: if getattr(settings, 'PROJECTROLES_KIOSK_MODE', False) or getattr( settings, 'PROJECTROLES_ALLOW_ANONYMOUS', False ): return False return True def dispatch(self, request, *args, **kwargs): if self.is_login_required() and not request.user.is_authenticated: return self.handle_no_permission() return super().dispatch(request, *args, **kwargs)
[docs] class LoggedInPermissionMixin(PermissionRequiredMixin): """ Mixin for handling redirection for both unlogged users and authenticated users without permissions. """ #: No permission message custom override no_perm_message = None #: No permission message Django messages level no_perm_message_level = 'error'
[docs] def has_permission(self): """ Override for this mixin also to work with admin users without a permission object. """ if getattr(settings, 'PROJECTROLES_KIOSK_MODE', False): return True try: return super().has_permission() except AttributeError: if self.request.user.is_superuser: return True return False
[docs] def add_no_perm_message(self): """ Add Django in the UI if handle_no_permission() fails. This can be overridden to implement specific logic in a view if e.g. a different message should be displayed depending on the referring view. """ level = self.no_perm_message_level.lower() msg_method = getattr(messages, level, None) if not msg_method: raise ValueError(f'Unknown message level "{level}"') if self.no_perm_message: msg = self.no_perm_message elif self.request.user.is_authenticated: msg = NO_AUTH_MSG else: msg = NO_AUTH_LOGIN_MSG msg_method(self.request, msg)
[docs] def handle_no_permission(self): """ Handle no permission and redirect user. If custom message is specified using self.login_message, it will be displayed. """ self.add_no_perm_message() if self.request.user.is_authenticated: return redirect(reverse('home')) return redirect_to_login(self.request.get_full_path())
[docs] class ProjectAccessMixin: """Mixin for providing access to a Project object from request kwargs""" #: Model class to use for projects. Can be overridden by e.g. a proxy model project_class = Project
[docs] def get_project( self, request: Optional[HttpRequest] = None, kwargs: Any = None ) -> Optional[Project]: """ Return SODAR Project object based or None if not found, based on the current request and view kwargs. If arguments are not provided, uses self.request and/or self.kwargs. :param request: Request object (optional) :param kwargs: View kwargs (optional) :return: Object of project_class or None if not found """ request = request or getattr(self, 'request') kwargs = kwargs or getattr(self, 'kwargs') if kwargs is None: raise ImproperlyConfigured('View kwargs are not accessible') # Project class object if 'project' in kwargs: return self.project_class.objects.filter( sodar_uuid=kwargs['project'] ).first() # Other object types if not request: raise ImproperlyConfigured('Current HTTP request is not accessible') model = None uuid_val = None for k, v in kwargs.items(): if not re.match(r'^[0-9a-f-]+$', str(v)): continue try: if '__' in k: # Model from another app ks = k.split('__') model = apps.get_model(ks[0], ks[1]) else: # Model from the same app app_name = resolve(request.path).app_name if app_name.find('.') != -1: app_name = app_name.split('.')[0] model = apps.get_model(app_name, k) uuid_val = k break except LookupError: pass if not model: return None try: obj = model.objects.get(sodar_uuid=kwargs[uuid_val]) if hasattr(obj, 'project'): return obj.project # Some objects may have a get_project() func instead of foreignkey elif hasattr(obj, 'get_project') and callable( getattr(obj, 'get_project', None) ): return obj.get_project() except model.DoesNotExist: pass return None
[docs] class ProjectPermissionMixin(PermissionRequiredMixin, ProjectAccessMixin): """ Mixin for providing a Project object and queryset for permission checking. """
[docs] def get_permission_object(self): return self.get_project()
[docs] def has_permission(self): """Overrides for project permission access""" project = self.get_project() if not project: raise Http404 # Prohibit access if project_access_block is set if ( not self.request.user.is_superuser and project.is_project() and app_settings.get( APP_NAME, 'project_access_block', project=project ) ): messages.error( self.request, PROJECT_BLOCK_MSG.format( project_type=get_display_name(project.type) ) + '.', ) return False # Override permissions for superuser, owner or delegate perm_override = ( self.request.user.is_superuser or project.is_owner_or_delegate(self.request.user) ) if not perm_override and app_settings.get( APP_NAME, 'ip_restrict', project ): for k in ( 'HTTP_X_FORWARDED_FOR', 'X_FORWARDED_FOR', 'FORWARDED', 'REMOTE_ADDR', ): v = self.request.META.get(k) if v: client_address = ip_address(v.split(',')[0]) break else: # Can't fetch client IP address return False ips = app_settings.get(APP_NAME, 'ip_allow_list', project) if not ips: return False for ip in [s.strip() for s in ips.split(',')]: if '/' in ip: if client_address in ip_network(ip): break elif client_address == ip_address(ip): break else: return False # Disable project app access for categories unless specifically enabled if project.is_category(): request_url = resolve(self.request.get_full_path()) if request_url.app_name != APP_NAME: app_plugin = plugin_api.get_app_plugin(request_url.app_name) if app_plugin and app_plugin.category_enable: return True return False # Disable access for non-owner/delegate if remote project is revoked if project.is_revoked() and not perm_override: return False return super().has_permission()
[docs] def get_queryset(self, *args, **kwargs): """ Override get_queryset() to filter down to the currently selected object. """ qs = super().get_queryset(*args, **kwargs) if qs.model == ProjectAccessMixin.project_class: return qs elif hasattr(qs.model, 'get_project_filter_key'): return qs.filter( **{qs.model.get_project_filter_key(): self.get_project()} ) elif hasattr(qs.model, 'project') or hasattr(qs.model, 'get_project'): return qs.filter(project=self.get_project()) raise AttributeError( 'Model does not have "project" member, get_project() method or ' 'get_project_filter_key() method' )
class HTTPRefererMixin: """ Mixin for updating a correct referer url in session cookie regardless of page reload. """ def get(self, request, *args, **kwargs): if 'HTTP_REFERER' in request.META: referer = request.META['HTTP_REFERER'] if ( 'real_referer' not in request.session or referer != request.build_absolute_uri() ): request.session['real_referer'] = referer return super().get(request, *args, **kwargs)
[docs] class PluginContextMixin(ContextMixin): """Mixin for adding plugin list to context data""" def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) context['app_plugins'] = plugin_api.get_active_plugins( plugin_type='project_app', custom_order=True ) return context
[docs] class ProjectContextMixin( HTTPRefererMixin, PluginContextMixin, ProjectAccessMixin ): """ Mixin for adding context data to Project base view and other views extending it. Includes HTTPRefererMixin for correct referer URL. """ def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) # Project if hasattr(self, 'object') and isinstance(self.object, Project): context['project'] = self.get_object() elif hasattr(self, 'object') and hasattr(self.object, 'project'): context['project'] = self.object.project else: context['project'] = self.get_project() # Project tagging/starring if 'project' in context and not getattr( settings, 'PROJECTROLES_KIOSK_MODE', False ): context['project_starred'] = app_settings.get( APP_NAME, 'project_star', context['project'], self.request.user, ) return context
class CurrentUserFormMixin: """Mixin for passing current user to form as current_user""" def get_form_kwargs(self): kwargs = super().get_form_kwargs() kwargs.update({'current_user': self.request.user}) return kwargs
[docs] class InvalidFormMixin: """ Mixin for UI improvements in invalid form failure. Recommended to be used with long forms spanning multiple screen heights. """
[docs] def form_invalid(self, form, **kwargs): """Override form_invalid() to add Django message on form failure""" messages.error(self.request, FORM_INVALID_MSG) return super().form_invalid(form, **kwargs)
# Projectroles Internal UI view mixins ----------------------------------------- class ProjectModifyPermissionMixin( LoggedInPermissionMixin, ProjectPermissionMixin ): """ Mixin for handling access to project modifying views, denying access even for local superusers if the project is remote and thus immutable. """ def has_permission(self): """Override has_permission() to check remote project status""" perm = super().has_permission() project = self.get_project() return ( False if project.is_remote() and not self._get_allow_remote_edit() else perm ) def _get_allow_remote_edit(self) -> bool: return getattr(self, 'allow_remote_edit', False) def handle_no_permission(self): """Override handle_no_permission to redirect user""" project = self.get_project() if project and project.is_remote(): messages.error( self.request, 'Modifications are not allowed for remote {}.'.format( get_display_name(PROJECT_TYPE_PROJECT, plural=True) ), ) return redirect(reverse('home')) elif self.request.user.is_authenticated: messages.error(self.request, NO_AUTH_MSG) return redirect(reverse('home')) else: messages.error(self.request, NO_AUTH_LOGIN_MSG) return redirect_to_login(self.request.get_full_path()) class RolePermissionMixin(ProjectModifyPermissionMixin): """ Mixin to ensure permissions for RoleAssignment according to user role in project. """ def has_permission(self): """Override has_permission to check perms depending on role""" if not super().has_permission(): return False try: obj = RoleAssignment.objects.get( sodar_uuid=self.kwargs['roleassignment'] ) if obj.role.name == PROJECT_ROLE_OWNER: return False elif obj.role.name == PROJECT_ROLE_DELEGATE: return self.request.user.has_perm( 'projectroles.update_project_delegate', self.get_permission_object(), ) else: return self.request.user.has_perm( 'projectroles.update_project_members', self.get_permission_object(), ) except RoleAssignment.DoesNotExist: return False def get_permission_object(self): """Override get_permission_object for checking Project permission""" return self.get_project() class ProjectListContextMixin: """Mixin for adding context data for displaying the project list.""" def _get_custom_cols(self, user: User) -> list[dict]: """ Return list of custom columns for projects including project data. :param user: User object :return: List of dicts """ i = 0 ret = [] for app_plugin in [ ap for ap in plugin_api.get_active_plugins(plugin_type='project_app') if ap.project_list_columns ]: # HACK for filesfolders columns (see issues #737 and #738) if app_plugin.name == 'filesfolders' and not getattr( settings, 'FILESFOLDERS_SHOW_LIST_COLUMNS', False ): continue for k, v in app_plugin.project_list_columns.items(): if not v['active']: continue v['app_plugin'] = app_plugin v['column_id'] = k v['ordering'] = v.get('ordering') or i ret.append(v) i += 1 return sorted(ret, key=lambda x: x['ordering']) def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) context['project_custom_cols'] = self._get_custom_cols( self.request.user ) base_col_count = 1 if self.request.user.is_superuser else 2 context['project_col_count'] = base_col_count + len( context['project_custom_cols'] ) context['page_options_default'] = app_settings.get( APP_NAME, 'project_list_pagination', user=self.request.user ) if not self.kwargs.get('project'): starred_default = app_settings.get( APP_NAME, 'project_list_home_starred', user=self.request.user ) else: starred_default = False context['project_list_starred_default'] = starred_default return context # General Views ---------------------------------------------------------------- class HomeView( LoginRequiredMixin, PluginContextMixin, ProjectListContextMixin, TemplateView, ): """Home view""" template_name = 'projectroles/home.html' # General Project Views -------------------------------------------------------- class ProjectDetailView( LoginRequiredMixin, LoggedInPermissionMixin, ProjectPermissionMixin, ProjectListContextMixin, ProjectContextMixin, DetailView, ): """Project details view""" permission_required = 'projectroles.view_project' model = Project slug_url_kwarg = 'project' slug_field = 'sodar_uuid' def add_no_perm_message(self): """ Override add_login_message() to display a different message when redirected from invite accept view as a new user. """ referer_url = self.request.META.get('HTTP_REFERER') if not referer_url: super().add_no_perm_message() return referer_path = urlparse(referer_url).path resolved_path = resolve(referer_path) if resolved_path.url_name.startswith('invite_process_'): messages.info(self.request, LOGIN_MSG) return super().add_no_perm_message() def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) project = self.object user = self.request.user # User role in project if user.is_superuser: context['role'] = None else: if user.is_authenticated: role_as = project.get_role(self.request.user) context['role'] = role_as.role if role_as else None if not context.get('role'): context['role'] = project.public_access # Visibility settings has_child_role = False if user.is_superuser: context['show_limited_alert'] = False context['show_project_list'] = project.is_category() has_child_role = True elif user.is_authenticated: has_child_role = project.has_role_in_children(user) context['show_limited_alert'] = ( not context['role'] and not has_child_role ) or ( context['role'] is not None and context['role'].rank >= ROLE_RANKING[PROJECT_ROLE_VIEWER] ) context['show_project_list'] = project.is_category() and ( context['role'] is not None or project.public_access is not None or project.has_public_children or has_child_role ) else: # Anonymous user context['show_limited_alert'] = ( not project.public_access or project.public_access.rank >= ROLE_RANKING[PROJECT_ROLE_VIEWER] ) context['show_project_list'] = project.has_public_children # Remote projects q_kwargs = { 'project_uuid': project.sodar_uuid, 'level': REMOTE_LEVEL_READ_ROLES, } if not self.request.user.has_perm( 'projectroles.view_hidden_projects', project ): q_kwargs['site__user_display'] = True if settings.PROJECTROLES_SITE_MODE == SITE_MODE_SOURCE: context['target_projects'] = RemoteProject.objects.filter( site__mode=SITE_MODE_TARGET, **q_kwargs ).order_by('site__name') elif settings.PROJECTROLES_SITE_MODE == SITE_MODE_TARGET: context['peer_projects'] = RemoteProject.objects.filter( site__mode=SITE_MODE_PEER, **q_kwargs ).order_by('site__name') return context # Search Views ----------------------------------------------------------------- class ProjectSearchMixin: """Common functionalities for search views""" def _get_app_results( self, user: User, search_terms: list[str], search_type: Optional[str], search_keywords: Optional[list[str]], ) -> list: """ Return app plugin search results. :param search_terms: Search terms (list of strings) :param search_type: Optional type keyword for search (string or None) :param search_keywords: Optional keywords (list of strings or None) :return: List """ plugins = plugin_api.get_active_plugins(plugin_type='project_app') ret = [] omit_apps_list = getattr(settings, 'PROJECTROLES_SEARCH_OMIT_APPS', []) search_apps = sorted( [ p for p in plugins if (p.search_enable and p.name not in omit_apps_list) ], key=lambda x: x.plugin_ordering, ) if search_type: search_apps = [ p for p in search_apps if search_type in p.search_types ] for plugin in search_apps: search_kwargs = { 'user': user, 'search_type': search_type, 'search_terms': search_terms, 'keywords': search_keywords, } search_res = { 'plugin': plugin, 'results': None, 'error': None, 'has_results': False, } try: search_res['results'] = plugin.search(**search_kwargs) for r in search_res['results']: if r.items and ( (isinstance(r.items, QuerySet) and r.items.count() > 0) or (isinstance(r.items, list) and len(r.items) > 0) ): search_res['has_results'] = True break # Build results into dict for easier use in templates search_res['results'] = { r.category: r for r in search_res['results'] } except Exception as ex: if settings.DEBUG: raise ex search_res['error'] = str(ex) logger.error( 'Exception raised by search() in {}: "{}" ({})'.format( plugin.name, ex, '; '.join( [f'{k}={v}' for k, v in search_kwargs.items()] ), ) ) ret.append(search_res) return ret def _get_not_found( self, search_type: Optional[str], project_results: list, app_results: list, ) -> list: """ Return list of apps for which objects were search for but not returned. :param search_type: Type keyword for search or None :param project_results: Results for projectroles search :param app_results: Results for app plugin search :return: List """ ret = [] if len(project_results) == 0 and ( not search_type or search_type == 'project' ): ret.append('Projects') for results in [a['results'] for a in app_results]: if not results: continue for k, r in results.items(): type_match = True if search_type else False if not type_match and search_type in r.search_types: type_match = True if (type_match or not search_type) and (not r.items): ret.append(r.title) return ret def dispatch(self, request, *args, **kwargs): if not getattr(settings, 'PROJECTROLES_ENABLE_SEARCH', False): messages.error(request, 'Search is not enabled.') return redirect('home') return super().dispatch(request, *args, **kwargs) class ProjectSearchResultsView( LoginRequiredMixin, ProjectSearchMixin, TemplateView ): """View for displaying results of search within projects""" template_name = 'projectroles/search_results.html' def _handle_context( self, request: HttpRequest, *args, **kwargs ) -> HttpResponse: """Handle context and render to response in GET/POST requests""" context = self.get_context_data(*args, **kwargs) if not context['search_terms']: messages.error(request, 'No search terms provided.') return redirect(reverse('home')) return super().render_to_response(context) def get_context_data(self, *args, **kwargs): context = super().get_context_data(**kwargs) search_input = '' search_terms = [] search_type = None keyword_input = [] search_keywords = {} if self.request.POST.get('m'): # Multi search search_terms = [ t.strip() for t in self.request.POST['m'].strip().split('\r\n') if len(t.strip()) >= 3 ] if self.request.POST.get('k'): keyword_input = self.request.POST['k'].strip().split(' ') elif self.request.GET.get('s'): # Single search search_input = self.request.GET.get('s').strip() search_split = search_input.split(' ') search_term = search_split[0].strip() for i in range(1, len(search_split)): s = search_split[i].strip() if ':' in s: keyword_input.append(s) elif s != '': search_term += ' ' + s.lower() if search_term: search_terms = [search_term] search_terms = list(dict.fromkeys(search_terms)) # Remove dupes for s in keyword_input: kw = s.split(':')[0].lower().strip() val = s.split(':')[1].lower().strip() if kw == 'type': search_type = val else: search_keywords[kw] = val context['search_input'] = search_input context['search_terms'] = search_terms context['search_type'] = search_type context['search_keywords'] = search_keywords # Get project results if not search_type or search_type == 'project': context['project_results'] = [] for p in Project.objects.find(search_terms, project_type='PROJECT'): if p.public_access or self.request.user.has_perm( 'projectroles.view_project', p ): context['project_results'].append(p) elif self.request.user.is_authenticated and p.parent: parent_as = p.parent.get_role(self.request.user) if ( parent_as and parent_as.role.rank >= ROLE_RANKING[PROJECT_ROLE_FINDER] ): context['project_results'].append(p) # Get app results context['app_results'] = self._get_app_results( self.request.user, search_terms, search_type, search_keywords ) # List apps for which no results were found context['not_found'] = self._get_not_found( search_type, context.get('project_results') or [], context['app_results'], ) return context def get(self, request, *args, **kwargs): return self._handle_context(request, *args, *kwargs) def post(self, request, *args, **kwargs): return self._handle_context(request, *args, *kwargs) class ProjectAdvancedSearchView( LoginRequiredMixin, ProjectSearchMixin, TemplateView ): """View for displaying advanced search form""" template_name = 'projectroles/search_advanced.html' def post(self, request, *args, **kwargs): return ProjectSearchResultsView.as_view()(request) # Project Modifying Views ------------------------------------------------------ class ProjectModifyPluginViewMixin: """Helpers for project modify API""" @classmethod def call_project_modify_api( cls, method_name: str, revert_name: Optional[str], method_args: list ): """ Call project modify API for a specific method and parameters. This method Will run reversion methods for all plugins if execution for one fails. :param method_name: Name of execution method in plugin (string) :param revert_name: Name of revert method in plugin (string or None) :param method_args: Arguments to be passed for the methods (list) :raise: Exception if execution for a plugin fails. """ modify_api_apps = getattr(settings, 'PROJECTROLES_MODIFY_API_APPS', []) app_plugins = [] if modify_api_apps: for a in modify_api_apps: plugin = plugin_api.get_app_plugin(a) if not plugin: msg = f'Unable to find active plugin "{a}"' logger.error(msg) raise ImproperlyConfigured(msg) app_plugins.append(plugin) else: app_plugins = plugin_api.get_active_plugins( 'backend' ) + plugin_api.get_active_plugins('project_app') called_plugins = [] for p in app_plugins: if not hasattr(p, method_name): continue # Only there if using ProjectModifyPluginAPIMixin logger.debug(f'Calling {method_name}() in plugin "{p.name}"') try: getattr(p, method_name)(*method_args) called_plugins.append(p) except Exception as ex: logger.error( f'Exception in {method_name}() for plugin "{p.name}": {ex}' ) if revert_name: for cp in called_plugins: try: cp.getattr(revert_name)(*method_args) except Exception as ex_revert: logger.error( f'Exception in {method_name}() for plugin ' f'"{cp.name}": {ex_revert}' ) raise ex class ProjectModifyMixin(ProjectModifyPluginViewMixin): """Mixin for Project creation/updating in UI and API views""" #: Remote site fields site_fields = {} @staticmethod def _get_old_project_data(project: Project) -> dict: """Get existing data from project fields""" return { 'title': project.title, 'parent': project.parent, 'description': project.description, 'readme': project.readme.raw, 'owner': project.get_owner().user, 'public_access': project.public_access, } @classmethod def _get_remote_project_data(cls, project: Project) -> dict: """Return existing remote project data""" ret = {} existing_sites = [] for rp in RemoteProject.objects.filter(project=project): ret[str(rp.site.sodar_uuid)] = rp.level == REMOTE_LEVEL_READ_ROLES existing_sites.append(rp.site.sodar_uuid) # Sites not yet added for rs in RemoteSite.objects.filter( mode=SITE_MODE_TARGET, user_display=True, owner_modifiable=True ).exclude(sodar_uuid__in=existing_sites): ret[str(rs.sodar_uuid)] = False return ret @staticmethod def _get_app_settings( data, instance: Optional[Project], user: User ) -> dict: """ Return a dictionary of project app settings and their values. :param data: Cleaned form data :param instance: Existing Project object or None :param user: User initiating the project update :return: Dict """ app_plugins = [ p for p in plugin_api.get_active_plugins() if p.app_settings ] project_settings = {} p_kwargs = {} # Show unmodifiable settings to superusers if user and not user.is_superuser: p_kwargs['user_modifiable'] = True for plugin in app_plugins + [None]: if plugin: name = plugin.name p_settings = app_settings.get_definitions( APP_SETTING_SCOPE_PROJECT, plugin=plugin, **p_kwargs ) else: name = APP_NAME p_settings = app_settings.get_definitions( APP_SETTING_SCOPE_PROJECT, plugin_name=name, **p_kwargs ) for s_def in p_settings.values(): s_name = f'settings.{name}.{s_def.name}' s_data = data.get(s_name) if instance.type not in s_def.project_types: continue if s_data is None and not instance: s_data = app_settings.get_default(name, s_def.name) if s_def.type == APP_SETTING_TYPE_JSON: if s_data is None: s_data = {} project_settings[s_name] = s_data elif s_data is not None: project_settings[s_name] = s_data return project_settings def _get_project_update_data( self, old_data: dict, project: Project, old_sites: list[RemoteSite], sites: list[RemoteSite], project_settings: dict, ) -> tuple[dict, list]: """ Return update data for project. Lists changed fields in upd_fields. :param old_data: Dict :param project: Project object :param old_sites: Old remote sites (list) :param sites: Current remote sites (list) :param project_settings: Project settings (dict) :return: extra_data (dict), upd_fields (list) """ extra_data = {} upd_fields = [] if old_data['title'] != project.title: extra_data['title'] = project.title upd_fields.append('title') if old_data['parent'] != project.parent: extra_data['parent'] = project.parent upd_fields.append('parent') if old_data['description'] != project.description: extra_data['description'] = project.description upd_fields.append('description') if old_data['readme'] != project.readme.raw: extra_data['readme'] = project.readme.raw upd_fields.append('readme') if old_data['public_access'] != project.public_access: extra_data['public_access'] = project.get_public_access_name() upd_fields.append('public_access') # Remote projects if ( settings.PROJECTROLES_SITE_MODE == SITE_MODE_SOURCE and project.is_project() ): for s in [f.split('.')[1] for f in self.site_fields]: if 'remote_sites' not in upd_fields and ( s not in old_sites or bool(old_sites[s]) != bool(sites[s]) ): upd_fields.append('remote_sites') if 'remote_sites' in upd_fields: extra_data['remote_sites'] = { k: bool(v) for k, v in sites.items() } # Settings for k, v in project_settings.items(): p_name = k.split('.')[1] s_name = k.split('.')[2] old_v = app_settings.get(p_name, s_name, project) if old_v != v: extra_data[k] = v upd_fields.append(k) return extra_data, upd_fields @staticmethod def _get_timeline_ok_status() -> str: timeline = plugin_api.get_backend_api('timeline_backend') if not timeline: raise ImproperlyConfigured('Timeline backend not found') else: return timeline.TL_STATUS_OK def _update_remote_sites(self, project: Project, data: dict) -> dict: """ Update project remote sites. :param project: Project object :param data: Dict :return: Dict """ ret = {} for f in self.site_fields: site_uuid = f.split('.')[1] site = RemoteSite.objects.filter(sodar_uuid=site_uuid).first() # TODO: Validate site here value = data[f] rp = RemoteProject.objects.filter( site=site, project=project ).first() modify = None if rp and ( (value and rp.level != REMOTE_LEVEL_READ_ROLES) or (not value and rp.level == REMOTE_LEVEL_READ_ROLES) ): rp.level = ( REMOTE_LEVEL_READ_ROLES if value else REMOTE_LEVEL_REVOKED ) rp.save() modify = 'Updated' elif not rp and value: # Only create if value is True rp = RemoteProject.objects.create( project_uuid=project.sodar_uuid, project=project, site=site, level=REMOTE_LEVEL_READ_ROLES, ) modify = 'Created' if modify: logger.debug( f'{modify} RemoteProject for site "{site.name}" ' f'({site.sodar_uuid}): {rp.level}' ) ret[site_uuid] = rp and rp.level == REMOTE_LEVEL_READ_ROLES return ret @classmethod def _update_settings(cls, project: Project, project_settings: dict): """Update project settings""" is_remote = project.is_remote() for k, v in project_settings.items(): _, plugin_name, setting_name = k.split('.', 3) # Skip updating global settings on target site if is_remote: # TODO: Optimize (this can require a lot of queries) s_def = app_settings.get_definition( setting_name, plugin_name=plugin_name ) if s_def.global_edit: continue app_settings.set( plugin_name=k.split('.')[1], setting_name=k.split('.')[2], value=v, project=project, validate=True, ) def _create_timeline_event( self, project: Project, action: str, owner: User, old_data: dict, old_sites: list, sites: list, project_settings: dict, request: HttpRequest, ) -> Any: # Actually TimelineEvent but not declared here """Create timeline event for action""" timeline = plugin_api.get_backend_api('timeline_backend') if not timeline: return None type_str = project.type.capitalize() if action == PROJECT_ACTION_CREATE: tl_desc = 'create ' + type_str.lower() + ' with {owner} as owner' extra_data = { 'title': project.title, 'owner': owner.username, 'description': project.description, 'readme': project.readme.raw, } # Add settings to extra data for k, v in project_settings.items(): extra_data[k] = v else: # Update tl_desc = 'update ' + type_str.lower() extra_data, upd_fields = self._get_project_update_data( old_data, project, old_sites, sites, project_settings ) if extra_data.get('parent'): # Convert parent object into UUID extra_data['parent'] = str(extra_data['parent'].sodar_uuid) if len(upd_fields) > 0: tl_desc += ' (' + ', '.join(x for x in upd_fields) + ')' tl_event = timeline.add_event( project=project, app_name=APP_NAME, user=request.user, event_name=f'project_{action.lower()}', description=tl_desc, extra_data=extra_data, ) if action == PROJECT_ACTION_CREATE: tl_event.add_object(owner, 'owner', owner.username) return tl_event @classmethod def _get_notify_recipients( cls, project: Project, request: HttpRequest ) -> list[User]: """ Return list of owner and delegate users to send notification alerts or emails to. Omits request user. This list can be further filtered down to check for user app settings. :param project: Project object :param request: Request object :return: List of User objects """ return [ a.user for a in project.get_roles( max_rank=ROLE_RANKING[PROJECT_ROLE_DELEGATE] ) if a.user != request.user and a.user.is_active ] @classmethod def _notify_users( cls, project: Project, action: str, owner: User, old_parent: Optional[Project], request: HttpRequest, ): """ Notify users about project creation and update. Displays app alerts and/or sends emails depending on the site configuration. """ app_alerts = plugin_api.get_backend_api('appalerts_backend') # Create alerts and send emails owner_as = RoleAssignment.objects.filter( project=project, user=owner ).first() # Owner change notification if ( request.user != owner and action == PROJECT_ACTION_CREATE and owner.is_active ): if app_alerts and app_settings.get( APP_NAME, 'notify_alert_role', user=owner ): app_alerts.add_alert( app_name=APP_NAME, alert_name='role_create', user=owner, message=ROLE_CREATE_MSG.format( project=project.title, role=owner_as.role.name ), url=reverse( 'projectroles:detail', kwargs={'project': project.sodar_uuid}, ), project=project, ) if SEND_EMAIL and app_settings.get( APP_NAME, 'notify_email_role', user=owner ): email.send_role_change_mail( action.lower(), project, owner, owner_as.role, request, ) # Project creation/moving for parent category owners and delegates if not project.parent: return recipients = cls._get_notify_recipients(project, request) # TODO: Could optimize this by pre-retrieving notify settings if action == PROJECT_ACTION_CREATE and recipients: if app_alerts: for r in recipients: if not app_settings.get( APP_NAME, 'notify_alert_project', user=r ): continue app_alerts.add_alert( app_name=APP_NAME, alert_name='project_create_parent', user=r, message=f'New {project.type.lower()} created under ' f'category "{project.parent.title}": ' f'"{project.title}".', url=reverse( 'projectroles:detail', kwargs={'project': project.sodar_uuid}, ), project=project, ) if SEND_EMAIL: email.send_project_create_mail(project, recipients, request) elif old_parent and recipients: for r in recipients: if not app_settings.get( APP_NAME, 'notify_alert_project', user=r ): continue app_alerts.add_alert( app_name=APP_NAME, alert_name='project_move_parent', user=r, message=f'{project.type.capitalize()} moved under category ' f'"{project.parent.title}": {project.title}".', url=reverse( 'projectroles:detail', kwargs={'project': project.sodar_uuid}, ), project=project, ) if SEND_EMAIL: email.send_project_move_mail(project, recipients, request) @transaction.atomic def modify_project( self, data: dict, request: HttpRequest, instance: Optional[Project] = None, ) -> Project: """ Create or update a Project. This method should be called either in form_valid() in a Django form view or save() in a DRF serializer. :param data: Cleaned data from a form or serializer :param request: Request initiating the action :param instance: Existing Project object or None :return: Created or updated Project object """ action = PROJECT_ACTION_UPDATE if instance else PROJECT_ACTION_CREATE old_data = {} old_project = None if instance: project = instance # In case of a PATCH request, get existing obj to fill out fields old_project = Project.objects.get(sodar_uuid=instance.sodar_uuid) old_data = self._get_old_project_data(old_project) # Store old data project.title = data.get('title') or old_project.title project.description = ( data.get('description') or old_project.description ) project.type = data.get('type') or old_project.type project.readme = data.get('readme') or old_project.readme # NOTE: Must do this as parent can exist but be None project.parent = ( data['parent'] if 'parent' in data else old_project.parent ) project.public_access = data.get('public_access') else: project = Project( title=data.get('title'), description=data.get('description'), type=data.get('type'), readme=data.get('readme'), parent=data.get('parent'), public_access=data.get('public_access'), ) project.save() owner = data.get('owner') if not owner and old_project: # In case of a PATCH request owner = old_project.get_owner().user # Create/update RemoteProject objects old_sites = {} sites = {} if ( settings.PROJECTROLES_SITE_MODE == SITE_MODE_SOURCE and project.is_project() ): self.site_fields = [f for f in data if f.startswith('remote_site.')] old_sites = self._get_remote_project_data(project) sites = self._update_remote_sites(project, data) # Get app settings, store old settings project_settings = self._get_app_settings(data, project, request.user) old_settings = None if action == PROJECT_ACTION_UPDATE: old_settings = json.loads(json.dumps(project_settings)) # Copy # Create timeline event tl_event = self._create_timeline_event( project, action, owner, old_data, old_sites, sites, project_settings, request, ) # Get old parent for project moving old_parent = ( old_project.parent if old_project and old_project.parent and old_project.parent != project.parent else None ) # Update owner and settings if action == PROJECT_ACTION_CREATE: RoleAssignment.objects.create( project=project, user=owner, role=Role.objects.get(name=PROJECT_ROLE_OWNER), ) self._update_settings(project, project_settings) project.save() # TODO: Is this required anymore? # Call for additional actions for project creation/update in plugins if getattr(settings, 'PROJECTROLES_ENABLE_MODIFY_API', False): args = [project, action, project_settings] if action == PROJECT_ACTION_UPDATE: args += [old_data, old_settings] else: args += [None, None] args.append(request) self.call_project_modify_api( 'perform_project_modify', 'revert_project_modify', args ) # If public access was updated, update has_public_children for parents if ( old_project and project.parent and old_project.public_access != project.public_access ): try: project._update_public_children() except Exception as ex: logger.error( f'Exception in updating has_public_children(): {ex}' ) # Once all is done, update timeline event, create alerts and emails if tl_event: tl_event.set_status(self._get_timeline_ok_status()) self._notify_users(project, action, owner, old_parent, request) return project class ProjectModifyFormMixin(ProjectModifyMixin): """Mixin for Project creation/updating in Django form views""" def form_valid(self, form): """Handle project updating if form is valid""" instance = form.instance if form.instance.pk else None action = PROJECT_ACTION_UPDATE if instance else PROJECT_ACTION_CREATE if instance and instance.parent: redirect_url = reverse( 'projectroles:detail', kwargs={'project': instance.parent.sodar_uuid}, ) else: redirect_url = reverse('home') try: project = self.modify_project( data=form.cleaned_data, request=self.request, instance=form.instance if instance else None, ) messages.success( self.request, '{} {}d.'.format( get_display_name(project.type, title=True), action.lower() ), ) redirect_url = reverse( 'projectroles:detail', kwargs={'project': project.sodar_uuid} ) except Exception as ex: messages.error( self.request, 'Unable to {} {}: {}'.format( action.lower(), form.cleaned_data['type'].lower(), ex ), ) if settings.DEBUG: raise ex return redirect(redirect_url) class ProjectDeleteAccessMixin: """ Mixin to check special access permissions for project deletion. Also used for ProjectUpdateView to control access to view link, hence a separate mixin. We want to provide an informative message to the end user and also prevent superuser access if needed, hence we're not implementing this in rules. """ def check_delete_permission( self, project: Project ) -> tuple[bool, Optional[str]]: """ Check delete permission. Also applies to superusers. :param project: Project object :return: Boolean, string (in case of error) or None """ # Prohibit access if project_access_block is set if ( not self.request.user.is_superuser and project.is_project() and app_settings.get( APP_NAME, 'project_access_block', project=project ) ): return ( False, PROJECT_BLOCK_MSG.format( project_type=get_display_name(project.type) ) + '.', ) p_type = get_display_name(project.type) if project.is_category() and project.get_children().count() > 0: return False, PROJECT_DELETE_CAT_ERR_MSG.format(project_type=p_type) # Disallow for remote projects which haven't been revoked if project.is_remote(): rp = RemoteProject.objects.filter( project_uuid=project.sodar_uuid, site__mode=SITE_MODE_SOURCE, ).first() if rp.level != REMOTE_LEVEL_REVOKED: return ( False, PROJECT_DELETE_TARGET_ERR_MSG.format(project_type=p_type), ) # Disallow for source projects with non-revoked remote projects # NOTE: Categories can be deleted elif project.is_project(): rps = RemoteProject.objects.filter( project_uuid=project.sodar_uuid, site__mode=SITE_MODE_TARGET ).exclude(level__in=[REMOTE_LEVEL_NONE, REMOTE_LEVEL_REVOKED]) if rps.count() > 0: return ( False, PROJECT_DELETE_SOURCE_ERR_MSG.format(project_type=p_type), ) return True, None class ProjectDeleteMixin(ProjectModifyPluginViewMixin): """Mixin for Project deletion in UI and API views""" @classmethod def _create_timeline_event(cls, project: Project, request: HttpRequest): """ Create timeline summary event for project deletion. Created as a classified site-wide event only viewable by superusers. :param project: Project object :param request: HttpRequest object """ timeline = plugin_api.get_backend_api('timeline_backend') if not timeline: return local_users = { a.user.username: a.role.name for a in project.local_roles.order_by( 'role__rank', 'user__username' ) } parent = str(project.parent.sodar_uuid) if project.parent else None extra_data = { 'title': project.title, 'type': project.type, 'parent': parent, 'description': project.description, 'readme': project.readme.raw, 'public_access': project.get_public_access_name(), 'archive': project.archive, 'full_title': project.full_title, 'sodar_uuid': str(project.sodar_uuid), 'local_roles': local_users, } timeline.add_event( project=None, # No project as it has been deleted app_name=APP_NAME, user=request.user, event_name='project_delete', description=f'delete {project.type.lower()} ' f'{project.get_log_title()}', extra_data=extra_data, classified=True, status_type=timeline.TL_STATUS_OK, ) @classmethod def get_redirect_url(cls, project: Project) -> str: if project.parent: return reverse( 'projectroles:detail', kwargs={'project': project.parent.sodar_uuid}, ) else: return reverse('home') def handle_delete(self, project: Project, request: HttpRequest): """ Handle project deletion. Deletes the object, creates a summary timeline event and sends out alerts and emails to project members. :param project: Project object of project to be deleted :param request: HttpRequest object """ app_alerts = plugin_api.get_backend_api('appalerts_backend') # Call project modify API plugin if getattr(settings, 'PROJECTROLES_ENABLE_MODIFY_API', False): self.call_project_modify_api( 'perform_project_delete', None, [project] ) # Create timeline event self._create_timeline_event(project, request) recipients = users = [ a.user for a in project.get_roles() if a.user.is_active and a.user != request.user ] # Create app alerts if app_alerts and recipients: alert_users = [ u for u in users if app_settings.get(APP_NAME, 'notify_alert_project', user=u) ] app_alerts.add_alerts( app_name=APP_NAME, alert_name='project_delete', users=alert_users, message=PROJECT_DELETE_MSG.format( project_type=get_display_name(project.type, title=True), project_title=project.title, user_name=request.user.username, ), ) # Send email if SEND_EMAIL and recipients: email.send_project_delete_mail(project, recipients, request) # Actually delete project object project.delete() class ProjectCreateView( LoginRequiredMixin, LoggedInPermissionMixin, ProjectPermissionMixin, ProjectModifyFormMixin, ProjectContextMixin, HTTPRefererMixin, CurrentUserFormMixin, InvalidFormMixin, CreateView, ): """Project creation view""" permission_required = 'projectroles.create_project' model = Project form_class = ProjectForm def has_permission(self): """ Override has_permission() to ensure even superuser can't create project under a remote category as target """ if not self.kwargs.get('project'): if self.request.user.is_superuser: return True # Allow top level project creation for superuser return False # Disallow for other users elif ( settings.PROJECTROLES_SITE_MODE == SITE_MODE_TARGET and self.kwargs.get('project') ): parent = Project.objects.filter( sodar_uuid=self.kwargs['project'] ).first() if parent and parent.is_remote(): return False return super().has_permission() def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) if 'project' in self.kwargs: context['parent'] = Project.objects.get( sodar_uuid=self.kwargs['project'] ) return context def get_form_kwargs(self): """Pass URL arguments to form""" kwargs = super().get_form_kwargs() kwargs.update(self.kwargs) return kwargs def dispatch(self, request, *args, **kwargs): """Override dispatch() for target site check""" # If site is in target mode and target creation is not allowed, redirect if ( settings.PROJECTROLES_SITE_MODE == SITE_MODE_TARGET and not settings.PROJECTROLES_TARGET_CREATE ): messages.error(request, TARGET_CREATE_DISABLED_MSG) return redirect(reverse('home')) return super().dispatch(request, *args, **kwargs) def get(self, request, *args, **kwargs): """Override get() to limit project creation under other projects""" if 'project' in self.kwargs: project = Project.objects.get(sodar_uuid=self.kwargs['project']) if project.is_project(): messages.error( self.request, 'Creating nested {} is not allowed.'.format( get_display_name(PROJECT_TYPE_PROJECT, plural=True) ), ) return redirect( reverse( 'projectroles:detail', kwargs={'project': project.sodar_uuid}, ) ) return super().get(request, *args, **kwargs) class ProjectUpdateView( LoginRequiredMixin, ProjectModifyPermissionMixin, ProjectContextMixin, ProjectModifyFormMixin, ProjectDeleteAccessMixin, CurrentUserFormMixin, InvalidFormMixin, UpdateView, ): """Project updating view""" permission_required = 'projectroles.update_project' model = Project form_class = ProjectForm slug_url_kwarg = 'project' slug_field = 'sodar_uuid' allow_remote_edit = True def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) access, msg = self.check_delete_permission(self.get_object()) context['project_delete_access'] = access context['project_delete_msg'] = msg or '' return context class ProjectArchiveView( LoginRequiredMixin, ProjectModifyPermissionMixin, ProjectContextMixin, ProjectModifyPluginViewMixin, TemplateView, ): """Project archiving/unarchiving view""" template_name = 'projectroles/project_archive_confirm.html' permission_required = 'projectroles.update_project' def _alert_users(self, project: Project, action: str, user: User): """ Alert users on project archiving/unarchiving. :param project: Project object :param action: String ("archive" or "unarchive") :param user: User initiating project archiving/unarchiving """ app_alerts = plugin_api.get_backend_api('appalerts_backend') if not app_alerts: return alert_p = get_display_name(PROJECT_TYPE_PROJECT, title=True) if action == 'archive': alert_msg = f'{alert_p} data is now read-only.' else: alert_msg = f'{alert_p} data can be modified.' users = [ a.user for a in project.get_roles() if a.user.is_active and a.user != user and app_settings.get(APP_NAME, 'notify_alert_project', user=a.user) ] if not users: return else: app_alerts.add_alerts( app_name=APP_NAME, alert_name=f'project_{action}', users=users, message=f'{alert_p} {action}d by {user.get_full_name()}. ' f'{alert_msg}', url=reverse( 'projectroles:detail', kwargs={'project': project.sodar_uuid}, ), project=project, ) def get(self, request, *args, **kwargs): """Override get() to check project type""" project = self.get_project() if project.is_category(): messages.error(request, CAT_ARCHIVE_ERR_MSG) return redirect( reverse( 'projectroles:detail', kwargs={'project': project.sodar_uuid}, ) ) return super().render_to_response( self.get_context_data(*args, **kwargs) ) def post(self, request, **kwargs): """Override post() to handle POST from confirmation template""" timeline = plugin_api.get_backend_api('timeline_backend') project = self.get_project() redirect_url = reverse( 'projectroles:detail', kwargs={'project': project.sodar_uuid} ) if project.is_category(): messages.error(request, CAT_ARCHIVE_ERR_MSG) return redirect(redirect_url) status = request.POST.get('status') if status is None: messages.error(request, 'Status not set, unable to set archival.') return redirect(redirect_url) status = True if status.lower() in ['1', 'true'] else False action = 'unarchive' if not status else 'archive' if project.archive == status: messages.warning( request, '{} is already {}d.'.format( get_display_name(project.type, title=True), action ), ) return redirect(redirect_url) try: project.set_archive(status) # Call for additional actions for archive/unarchive in plugins if getattr(settings, 'PROJECTROLES_ENABLE_MODIFY_API', False): self.call_project_modify_api( 'perform_project_archive', 'revert_project_archive', [project], ) messages.success( request, '{} {}d.'.format( get_display_name(project.type, title=True), action ), ) except Exception as ex: messages.error( request, 'Failed to {} {}: {}'.format( action, get_display_name(project.type), ex ), ) return redirect(redirect_url) try: if timeline: timeline.add_event( project=project, app_name=APP_NAME, user=request.user, event_name=f'project_{action}', description=f'{action} project', status_type=timeline.TL_STATUS_OK, ) # Alert users and send email self._alert_users(project, action, request.user) if SEND_EMAIL: # NOTE: Opt-out settings checked in email method email.send_project_archive_mail(project, action, request) except Exception as ex: messages.error(request, f'Failed to alert users: {ex}') return redirect(redirect_url) class ProjectDeleteView( LoginRequiredMixin, LoggedInPermissionMixin, ProjectContextMixin, ProjectDeleteMixin, ProjectDeleteAccessMixin, ProjectModifyPluginViewMixin, DeleteView, ): """Project deletion view""" model = Project permission_required = 'projectroles.delete_project' slug_field = 'sodar_uuid' slug_url_kwarg = 'project' def dispatch(self, request, *args, **kwargs): if not request.user.is_authenticated: return self.handle_no_permission() project = self.get_object() # Prevent access in certain conditions, even for superusers access, msg = self.check_delete_permission(project) if not access: messages.error(self.request, msg) return redirect('home') return super().dispatch(request, *args, **kwargs) def post(self, *args, **kwargs): project = self.get_object() redirect_url = reverse( 'projectroles:update', kwargs={'project': project.sodar_uuid} ) # Don't allow deletion unless user has input the host name host_confirm = self.request.POST.get('delete_host_confirm') actual_host = self.request.get_host().split(':')[0] if not host_confirm or host_confirm != actual_host: msg = ( f'Incorrect host name for confirming sheet deletion: ' f'"{host_confirm}"' ) logger.error(msg + f' (correct={actual_host})') messages.error( self.request, 'Host name input incorrect, deletion cancelled.' ) return redirect(redirect_url) # Proceed with deletion try: with transaction.atomic(): self.handle_delete(project, self.request) p_type = get_display_name(project.type, title=True) messages.success(self.request, f'{p_type} deleted.') redirect_url = self.get_redirect_url(project) except Exception as ex: if settings.DEBUG: raise ex p_type = get_display_name(project.type, title=False) messages.error(self.request, f'Failed to delete {p_type}: {ex}') return redirect(redirect_url) # RoleAssignment Views --------------------------------------------------------- class ProjectRoleView( LoginRequiredMixin, LoggedInPermissionMixin, ProjectPermissionMixin, ProjectContextMixin, TemplateView, ): """View for displaying project roles""" permission_required = 'projectroles.view_project_roles' template_name = 'projectroles/project_roles.html' model = Project def get_context_data(self, *args, **kwargs): project = self.get_project() project_remote = project.is_remote() context = super().get_context_data(*args, **kwargs) context['roles'] = sorted( project.get_roles(), key=lambda x: [x.role.rank, x.user.username] ) context['role_pagination'] = settings.PROJECTROLES_ROLE_PAGINATION owner_rank = ROLE_RANKING[PROJECT_ROLE_OWNER] context['enable_owner_transfer'] = any( [r.role.rank >= owner_rank for r in context['roles']] ) if project.is_remote(): context[ 'remote_roles_url' ] = project.get_source_site().url + reverse( 'projectroles:roles', kwargs={'project': project.sodar_uuid} ) context['finder_info'] = ROLE_FINDER_INFO.format( categories=get_display_name(PROJECT_TYPE_CATEGORY, plural=True), projects=get_display_name(PROJECT_TYPE_PROJECT, plural=True), ) context['site_read_only'] = app_settings.get(APP_NAME, 'site_read_only') if self.request.user.is_authenticated: # In case of superuser or public access project context['user_has_role'] = project.has_role( self.request.user, public=False ) own_local_as = RoleAssignment.objects.filter( project=project, user=self.request.user ).first() context['own_local_as'] = own_local_as # If site read-only mode is set, we can skip the rest as the button # is hidden if context['site_read_only']: return context context['project_leave_access'] = ( own_local_as is not None and own_local_as.role.rank > ROLE_RANKING[PROJECT_ROLE_OWNER] and not project_remote ) leave_msg = '' if not own_local_as: leave_msg = ROLE_LEAVE_INHERIT_MSG.format( category_type=get_display_name(PROJECT_TYPE_CATEGORY) ) elif own_local_as.role.rank == ROLE_RANKING[PROJECT_ROLE_OWNER]: leave_msg = ROLE_LEAVE_OWNER_MSG elif project_remote: leave_msg = ROLE_LEAVE_REMOTE_MSG.format( project_type=get_display_name(project.type, title=True) ) context['project_leave_msg'] = leave_msg return context class RoleAssignmentModifyMixin(ProjectModifyPluginViewMixin): """Mixin for RoleAssignment creation/updating in UI and API views""" @transaction.atomic def modify_assignment( self, data: dict, request: HttpRequest, project: Project, instance: Optional[RoleAssignment] = None, promote: bool = False, ) -> RoleAssignment: """ Create or update a RoleAssignment. This method should be called either in form_valid() in a Django form view or save() in a DRF serializer. The method calls related ProjectModifyPluginAPIMixin methods if enabled in your plugin. :param data: Cleaned data from a form or serializer :param request: Request initiating the action :param project: Project object :param instance: Existing RoleAssignment object or None :param promote: Promoting an inherited user (boolean, default=False) :return: Created or updated RoleAssignment object """ app_alerts = plugin_api.get_backend_api('appalerts_backend') timeline = plugin_api.get_backend_api('timeline_backend') tl_event = None action = PROJECT_ACTION_UPDATE if instance else PROJECT_ACTION_CREATE user = data.get('user') role = data.get('role') # Init Timeline event if timeline: tl_desc = '{} role {}"{}" for {{{}}}'.format( action.lower(), 'to ' if action == PROJECT_ACTION_UPDATE else '', role.name, 'user', ) tl_event = timeline.add_event( project=project, app_name=APP_NAME, user=request.user, event_name=f'role_{action.lower()}', description=tl_desc, ) tl_event.add_object(user, 'user', user.username) if action == PROJECT_ACTION_CREATE: role_as = RoleAssignment(project=project, user=user, role=role) old_role = None else: role_as = RoleAssignment.objects.get(project=project, user=user) old_role = role_as.role role_as.role = role role_as.save() # Call for additional actions for role creation/update in plugins if getattr(settings, 'PROJECTROLES_ENABLE_MODIFY_API', False): args = [role_as, action, old_role, request] self.call_project_modify_api( 'perform_role_modify', 'revert_role_modify', args ) if tl_event: tl_event.set_status('OK') if request.user != user: if app_alerts and app_settings.get( APP_NAME, 'notify_alert_role', user=user ): if action == PROJECT_ACTION_CREATE: alert_msg = ROLE_CREATE_MSG else: # Update alert_msg = ROLE_UPDATE_MSG app_alerts.add_alert( app_name=APP_NAME, alert_name='role_' + action.lower(), user=user, message=alert_msg.format( project=project.title, role=role.name ), url=reverse( 'projectroles:detail', kwargs={'project': project.sodar_uuid}, ), project=project, ) if SEND_EMAIL and app_settings.get( APP_NAME, 'notify_email_role', user=user ): email.send_role_change_mail( 'update' if promote else action.lower(), project, user, role, request, ) return role_as class RoleAssignmentModifyFormMixin(RoleAssignmentModifyMixin, ModelFormMixin): """Mixin for RoleAssignment creation and updating in Django form views""" def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) if self.kwargs.get('promote_as'): change_type = 'update' # If promoting inherited role else: change_type = self.request.resolver_match.url_name.split('_')[1] project = self.get_project() if change_type != 'delete': context['preview_subject'] = email.get_role_change_subject( change_type, project ) context['preview_body'] = email.get_role_change_body( change_type=change_type, project=project, user_name='{user_name}', issuer=self.request.user, role_name='{role_name}', request=self.request, ).replace('\n', '\\n') return context def form_valid(self, form): """Handle RoleAssignment updating if form is valid""" instance = form.instance if form.instance.pk else None action = 'update' if instance else 'create' project = self.get_project() try: self.object = self.modify_assignment( data=form.cleaned_data, request=self.request, project=project, instance=form.instance if instance else None, promote=True if form.cleaned_data.get('promote') else False, ) messages.success( self.request, 'Membership {} for {} with the role of {}.'.format( 'added' if action == 'create' else 'updated', self.object.user.username, self.object.role.name, ), ) except Exception as ex: messages.error(self.request, f'Membership updating failed: {ex}') return redirect( reverse( 'projectroles:roles', kwargs={'project': project.sodar_uuid}, ) ) class AppSettingCleanupMixin: """Mixin for helping with app setting cleanup in RoleAssignment removal""" @classmethod def cleanup_app_settings(cls, project: Project, user: User): """ Delete PROJECT_USER scope app settings for user in the project from which a role assignment was removed. Deletes app settings from inherited roles in category children. Leaves settings for categories if child roles exist. NOTE: Assumes the RoleAssignment object has already been deleted. :param project: Project object :param user: User object """ # If user still has an inherited role in project, skip if project.has_role(user): return # Delete settings for current project if project or no children if not project.is_category() or not project.has_role_in_children(user): app_settings.delete_by_scope( APP_SETTING_SCOPE_PROJECT_USER, project, user ) # Delete for children in case of a category children = ( project.get_children(flat=True) if project.is_category() else [] ) for c in children: if not c.has_role(user) and ( not c.is_category() or not c.has_role_in_children(user) ): app_settings.delete_by_scope( APP_SETTING_SCOPE_PROJECT_USER, c, user ) class RoleAssignmentDeleteMixin( AppSettingCleanupMixin, ProjectModifyPluginViewMixin ): """Mixin for RoleAssignment deletion/destroying in UI and API views""" @classmethod def _add_user_alert( cls, app_alerts: Any, project: Project, user: User, inh_as: Optional[RoleAssignment] = None, ): """ Create app alert for user on role assignment deletion. Creates a new alert as appropriate and dismisses alerts in projects the user can no longer access. :param app_alerts: AppAlertAPI object :param project: Project object :param user: User object :param inh_as: RoleAssignment object for inherited assignment or None """ if inh_as: message = ROLE_UPDATE_MSG.format( project=project.title, role=inh_as.role.name ) else: message = ROLE_DELETE_MSG.format( project_type=get_display_name(project.type) ) app_alerts.add_alert( app_name=APP_NAME, alert_name='role_{}'.format('update' if inh_as else 'delete'), user=user, message=message, project=project, ) @classmethod def _add_leave_alerts(cls, app_alerts: Any, project: Project, user: User): """ Send alerts to project owners and delegates about user leaving. :param app_alerts: AppAlertAPI object :param project: Project object :param user: User object """ recipients = [ a.user for a in project.get_roles( max_rank=ROLE_RANKING[PROJECT_ROLE_DELEGATE] ) if a.user.is_active and a.user != user and app_settings.get(APP_NAME, 'notify_alert_role', user=user) ] for r in recipients: app_alerts.add_alert( app_name=APP_NAME, alert_name='role_delete_own', user=r, message=ROLE_LEAVE_MSG.format( user_name=user.username, project_type=get_display_name(project.type), ), project=project, ) @classmethod @transaction.atomic def _dismiss_user_alerts( cls, app_alerts: Any, project: Project, user: User ): """ Dismiss user alerts in project and children without local role. :param app_alerts: AppAlertAPI object :param project: Project object :param user: User object """ AppAlert = app_alerts.get_model() dis_projects = [project] if project.is_category(): for c in project.get_children(flat=True): if not c.has_role(user): dis_projects.append(c) for a in AppAlert.objects.filter( user=user, project__in=dis_projects, active=True ): a.active = False a.save() def delete_assignment( self, role_as: RoleAssignment, request: Optional[HttpRequest] = None, notify: bool = True, ): """ Delete RoleAssignment. Calls the modify API for additional actions, raises app alerts and sends email notifications about the deletion. :param role_as: RoleAssingment object :param request: HttpRequest object or None :param notify: Add app alerts and send email if True """ app_alerts = plugin_api.get_backend_api('appalerts_backend') timeline = plugin_api.get_backend_api('timeline_backend') tl_event = None project = role_as.project user = role_as.user role = role_as.role # Init Timeline event if timeline: tl_event = timeline.add_event( project=project, app_name=APP_NAME, user=request.user if request else None, event_name='role_delete', description='delete role "{}" from {{{}}}'.format( role.name, 'user' ), ) tl_event.add_object(user, 'user', user.username) # Call the project plugin modify API for additional actions if getattr(settings, 'PROJECTROLES_ENABLE_MODIFY_API', False): self.call_project_modify_api( 'perform_role_delete', 'revert_role_delete', [role_as, request] ) # Delete object itself role_as.delete() # Delete corresponding PROJECT_USER settings self.cleanup_app_settings(project, user) if tl_event: tl_event.set_status(timeline.TL_STATUS_OK) if not notify: return role_as inh_as = project.get_role(user, inherited_only=True) if app_alerts: if request and request.user == user: self._add_leave_alerts(app_alerts, project, user) elif app_settings.get(APP_NAME, 'notify_alert_role', user=user): self._add_user_alert(app_alerts, project, user, inh_as) if not inh_as: self._dismiss_user_alerts(app_alerts, project, user) if SEND_EMAIL and request: if request and request.user == user: email.send_project_leave_mail(project, user, request) elif app_settings.get(APP_NAME, 'notify_email_role', user=user): if inh_as: email.send_role_change_mail( 'update', project, user, inh_as.role, request ) else: email.send_role_change_mail( 'delete', project, user, None, request ) return role_as class RoleAssignmentCreateView( LoginRequiredMixin, ProjectModifyPermissionMixin, ProjectContextMixin, CurrentUserFormMixin, RoleAssignmentModifyFormMixin, InvalidFormMixin, CreateView, ): """RoleAssignment creation view""" permission_required = 'projectroles.update_project_members' model = RoleAssignment form_class = RoleAssignmentForm #: Promote assignment promote_as = None def get_form_kwargs(self): """Pass URL arguments and current user to form""" kwargs = super().get_form_kwargs() kwargs.update(self.kwargs) return kwargs def get_context_data(self, **kwargs): context = super().get_context_data() context['promote_as'] = self.promote_as # Queried in get() return context def get(self, request, *args, **kwargs): # Validate inherited role promotion if set if self.kwargs.get('promote_as'): project = self.get_project() redirect_url = reverse( 'projectroles:roles', kwargs={'project': project.sodar_uuid} ) self.promote_as = RoleAssignment.objects.filter( sodar_uuid=self.kwargs['promote_as'] ).first() # Check for reached delegate limit del_count = RoleAssignment.objects.filter( project=project, role__name=PROJECT_ROLE_DELEGATE ).count() del_limit = settings.PROJECTROLES_DELEGATE_LIMIT if ( self.promote_as and self.promote_as.role.rank == ROLE_RANKING[PROJECT_ROLE_CONTRIBUTOR] and del_count >= del_limit ): messages.warning( request, f'Local delegate limit ({del_limit}) reached, no available ' f'roles for promotion.', ) return redirect(redirect_url) # Check for invalid roles if ( not self.promote_as or self.promote_as.role.rank <= ROLE_RANKING[PROJECT_ROLE_DELEGATE] or self.promote_as.project == project or self.promote_as.project not in project.get_parents() ): messages.error( request, 'Invalid role assignment for promotion.' ) return redirect(redirect_url) return super().get(request, *args, **kwargs) class RoleAssignmentUpdateView( LoginRequiredMixin, RolePermissionMixin, ProjectContextMixin, RoleAssignmentModifyFormMixin, CurrentUserFormMixin, InvalidFormMixin, UpdateView, ): """RoleAssignment updating view""" permission_required = 'projectroles.update_project_members' model = RoleAssignment form_class = RoleAssignmentForm slug_url_kwarg = 'roleassignment' slug_field = 'sodar_uuid' class RoleAssignmentDeleteView( LoginRequiredMixin, RolePermissionMixin, ProjectModifyPermissionMixin, ProjectContextMixin, RoleAssignmentDeleteMixin, DeleteView, ): """RoleAssignment deletion view""" permission_required = 'projectroles.update_project_members' model = RoleAssignment slug_url_kwarg = 'roleassignment' slug_field = 'sodar_uuid' def _get_inherited_children( self, project: Project, user: User, ret: list ) -> list: for child in project.get_children(): if not RoleAssignment.objects.filter(project=child, user=user): ret.append(child) else: ret = self._get_inherited_children(child, user, ret) return ret def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) project = self.get_project() user = self.object.user context['inherited_as'] = project.get_role(user, inherited_only=True) context['inherited_children'] = None if ( not context['inherited_as'] and self.object.role.rank < ROLE_RANKING[PROJECT_ROLE_FINDER] ): context['inherited_children'] = sorted( self._get_inherited_children(project, user, []), key=lambda x: x.full_title, ) return context def post(self, *args, **kwargs): self.object = self.get_object() user = self.object.user project = self.object.project # Override perms for owner/delegate if self.object.role.name == PROJECT_ROLE_OWNER or ( self.object.role.name == PROJECT_ROLE_DELEGATE and not self.request.user.has_perm( 'projectroles.update_project_delegate', project ) ): messages.error( self.request, f'You do not have permission to remove the membership of ' f'{self.object.role.name}.', ) else: try: self.object = self.delete_assignment( role_as=self.object, request=self.request ) messages.success( self.request, f'Membership of {user.username} removed.', ) except Exception as ex: messages.error( self.request, f'Failed to remove membership of {user.username}: {ex}', ) return redirect( reverse( 'projectroles:roles', kwargs={'project': project.sodar_uuid} ) ) class RoleAssignmentOwnDeleteView( LoginRequiredMixin, LoggedInPermissionMixin, ProjectContextMixin, RoleAssignmentDeleteMixin, DeleteView, ): """RoleAssignment deletion view for leaving project""" model = RoleAssignment # Perm overridden in has_permission() permission_required = 'projectroles.view_project' slug_url_kwarg = 'roleassignment' slug_field = 'sodar_uuid' template_name = 'projectroles/roleassignment_confirm_delete_own.html' def has_permission(self): """ Override has_permission() for one time check for view perms. NOTE: Single use case so we're not writing a common rules perm """ role_as = self.get_object() if role_as.project.is_project() and app_settings.get( APP_NAME, 'project_access_block', project=role_as.project ): return False if app_settings.get(APP_NAME, 'site_read_only'): return False user = self.request.user if ( role_as.user != user or role_as.role.rank < ROLE_RANKING[PROJECT_ROLE_DELEGATE] ): return False return True def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) role_as = self.get_object() user = role_as.user children = role_as.project.get_children(flat=True) local_child_projects = [ a.project for a in RoleAssignment.objects.filter( user=user, project__in=children ) ] context['inh_child_projects'] = [ p for p in children if p not in local_child_projects ] return context def post(self, *args, **kwargs): role_as = self.get_object() project = role_as.project try: self.object = self.delete_assignment( role_as=role_as, request=self.request ) messages.success( self.request, f'Successfully left {project.title}.' ) return redirect(reverse('home')) except Exception as ex: messages.error( self.request, f'Failed to leave {project.title}: {ex}' ) return redirect( reverse( 'projectroles:roles', kwargs={'project': project.sodar_uuid} ) ) class RoleAssignmentOwnerTransferMixin( AppSettingCleanupMixin, ProjectModifyPluginViewMixin ): """Mixin for owner RoleAssignment transfer in UI and API views""" #: Owner role object role_owner = None def _get_timeline_ok_status(self) -> Optional[str]: timeline = plugin_api.get_backend_api('timeline_backend') return timeline.TL_STATUS_OK if timeline else None def _get_timeline_failed_status(self) -> Optional[str]: timeline = plugin_api.get_backend_api('timeline_backend') return timeline.TL_STATUS_FAILED if timeline else None def _create_timeline_event( self, project: Project, old_owner: User, new_owner: User, old_owner_role: Optional[Role] = None, issuer: Optional[User] = None, ): """ Create timeline event for ownership transfer. :param project: Project object :param old_owner: User object for old owner :param new_owner: User object for new_owner :param old_owner_role: Role object for old owner's new role or None :param issuer: User who initiated ownership transfer or None """ timeline = plugin_api.get_backend_api('timeline_backend') if not timeline: return None tl_desc = 'transfer ownership from {prev_owner} to {new_owner}, ' if old_owner_role: tl_desc += f'set old owner as "{old_owner_role.name}"' else: tl_desc += 'remove old owner role' tl_event = timeline.add_event( project=project, app_name=APP_NAME, user=issuer, event_name='role_owner_transfer', description=tl_desc, extra_data={ 'prev_owner': old_owner.username, 'new_owner': new_owner.username, 'old_owner_role': ( old_owner_role.name if old_owner_role else None ), }, ) tl_event.add_object(old_owner, 'prev_owner', old_owner.username) tl_event.add_object(new_owner, 'new_owner', new_owner.username) return tl_event @transaction.atomic def _handle_transfer( self, project: Project, old_owner_as: RoleAssignment, new_owner: User, old_inh_owner: bool, old_owner_role: Optional[RoleAssignment] = None, request: Optional[HttpRequest] = None, ) -> bool: """ Handle ownership transfer with atomic rollback. :param project: Project object :param old_owner_as: RoleAssignment object for old owner :param new_owner: User object for new user :param old_inh_owner: Whether old owner is inherited owner (boolean) :param old_owner_role: Role object to set for old owner or None :param request: HttpRequest object or None """ # Inherited owner or no new role for old owner: delete local role if old_inh_owner or not old_owner_role: old_owner_as.delete() if not old_inh_owner: # Cleanup PROJECT_USER app settings self.cleanup_app_settings(project, old_owner_as.user) # Update old owner role else: old_owner_as.role = old_owner_role old_owner_as.save() # Update or create new owner role new_role_as = RoleAssignment.objects.filter( project=project, user=new_owner ).first() if new_role_as: new_role_as.role = self.role_owner new_role_as.save() else: RoleAssignment.objects.create( project=project, user=new_owner, role=self.role_owner ) # Call for additional actions for role creation/update in plugins if getattr(settings, 'PROJECTROLES_ENABLE_MODIFY_API', False): args = [ project, new_owner, old_owner_as.user, old_owner_role, request, ] self.call_project_modify_api( 'perform_owner_transfer', 'revert_owner_transfer', args ) return True def transfer_owner( self, project: Project, new_owner: User, old_owner_as: RoleAssignment, old_owner_role: Optional[Role] = None, request: Optional[HttpRequest] = None, notify_old: bool = True, ): """ Transfer project ownership to a new user and assign a new role to the previous owner. :param project: Project object :param new_owner: User object :param old_owner_as: RoleAssignment object :param old_owner_role: Role object for old owner's new role or None :param request: HttpRequest object or None :param notify_old: Notify old owner (boolean, default=True) """ app_alerts = plugin_api.get_backend_api('appalerts_backend') self.role_owner = Role.objects.get(name=PROJECT_ROLE_OWNER) old_owner = old_owner_as.user # Old owner inheritance old_inh_as = project.get_role(old_owner, inherited_only=True) old_inh_owner = ( True if old_inh_as and old_inh_as.role == self.role_owner else False ) # New owner inheritance new_inh_as = project.get_role(new_owner, inherited_only=True) new_inh_owner = ( True if new_inh_as and new_inh_as.role == self.role_owner else False ) issuer = request.user if request else None tl_event = self._create_timeline_event( project, old_owner, new_owner, old_owner_role, issuer ) try: self._handle_transfer( project, old_owner_as, new_owner, old_inh_owner, old_owner_role, request, ) except Exception as ex: if tl_event: tl_event.set_status(self._get_timeline_failed_status(), str(ex)) raise ex if tl_event: tl_event.set_status(self._get_timeline_ok_status()) # Notify old owner if ( notify_old and not old_inh_owner and issuer != old_owner and old_owner.is_active ): if app_alerts and app_settings.get( APP_NAME, 'notify_alert_role', user=old_owner ): if old_owner_role: alert_name = 'role_update' message = ROLE_UPDATE_MSG.format( project=project.title, role=old_owner_role.name ) alert_url = reverse( 'projectroles:detail', kwargs={'project': project.sodar_uuid}, ) else: alert_name = 'role_delete' message = ROLE_DELETE_MSG.format( project_type=get_display_name(project.type) ) alert_url = None app_alerts.add_alert( app_name=APP_NAME, alert_name=alert_name, user=old_owner, message=message, url=alert_url, project=project, ) if ( SEND_EMAIL and request and app_settings.get( APP_NAME, 'notify_email_role', user=old_owner ) ): # NOTE: Request is needed here email.send_role_change_mail( 'update' if old_owner_role else 'delete', project, old_owner, old_owner_role, request, ) # Notify new owner if not new_inh_owner and issuer != new_owner: if app_alerts and app_settings.get( APP_NAME, 'notify_alert_role', user=new_owner ): app_alerts.add_alert( app_name=APP_NAME, alert_name='role_update', user=new_owner, message=ROLE_UPDATE_MSG.format( project=project.title, role=self.role_owner.name ), url=reverse( 'projectroles:detail', kwargs={'project': project.sodar_uuid}, ), project=project, ) if ( SEND_EMAIL and request and app_settings.get( APP_NAME, 'notify_email_role', user=new_owner ) ): # NOTE: Request is needed here email.send_role_change_mail( 'update', project, new_owner, self.role_owner, request, ) class RoleAssignmentOwnerTransferView( LoginRequiredMixin, ProjectModifyPermissionMixin, CurrentUserFormMixin, ProjectContextMixin, RoleAssignmentOwnerTransferMixin, InvalidFormMixin, FormView, ): """Project owner RoleAssignment transfer view""" permission_required = 'projectroles.update_project_owner' template_name = 'projectroles/roleassignment_owner_transfer.html' form_class = RoleAssignmentOwnerTransferForm def get_form_kwargs(self): kwargs = super().get_form_kwargs() project = self.get_project() owner_as = RoleAssignment.objects.filter( project=project, role__name=PROJECT_ROLE_OWNER )[0] kwargs.update({'project': project, 'current_owner': owner_as.user}) return kwargs def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) owner_as = RoleAssignment.objects.filter( project=self.get_project(), role__name=PROJECT_ROLE_OWNER )[0] context.update({'current_owner': owner_as.user}) return context def form_valid(self, form): project = self.get_project() old_owner = form.current_owner old_owner_as = project.get_owner() new_owner = form.cleaned_data['new_owner'] old_owner_role = form.cleaned_data['old_owner_role'] redirect_url = reverse( 'projectroles:roles', kwargs={'project': project.sodar_uuid} ) try: self.transfer_owner( project, new_owner, old_owner_as, old_owner_role, request=self.request, ) except Exception as ex: # TODO: Add logging messages.error(self.request, f'Unable to transfer ownership: {ex}') if settings.DEBUG: raise ex return redirect(redirect_url) if old_owner.username != new_owner.username: success_msg = ( f'Successfully transferred ownership from {old_owner.username} ' f'to {new_owner.username}.' ) old_owner_email = app_settings.get( APP_NAME, 'notify_email_role', user=old_owner ) new_owner_email = app_settings.get( APP_NAME, 'notify_email_role', user=new_owner ) if SEND_EMAIL and (old_owner_email or new_owner_email): success_msg += ' Notification(s) have been sent by email.' messages.success(self.request, success_msg) return redirect(redirect_url) # ProjectInvite Views ---------------------------------------------------------- class ProjectInviteMixin: """Mixin for ProjectInvite helpers""" @classmethod def handle_invite( cls, invite: ProjectInvite, request: HttpRequest, resend: bool = False, add_message: bool = True, ): """ Handle invite creation, email sending/resending and logging to timeline. :param invite: ProjectInvite object :param request: HttpRequest object :param resend: Send or resend (bool) :param add_message: Add Django message on success/failure (bool) """ timeline = plugin_api.get_backend_api('timeline_backend') send_str = 'resend' if resend else 'send' status_type = timeline.TL_STATUS_OK status_desc = None if SEND_EMAIL: sent_mail = email.send_invite_mail(invite, request) if sent_mail == 0: status_type = timeline.TL_STATUS_FAILED status_desc = 'Email sending failed' else: status_type = timeline.TL_STATUS_FAILED status_desc = 'PROJECTROLES_SEND_EMAIL not True' if status_type != timeline.TL_STATUS_OK and not resend: status_desc += ', invite not created' # Add event in Timeline if timeline: timeline.add_event( project=invite.project, app_name=APP_NAME, user=request.user, event_name=f'invite_{send_str}', description=f'{send_str} project invite with role ' f'"{invite.role.name}" to {invite.email}', status_type=status_type, status_desc=status_desc, ) if add_message and status_type == timeline.TL_STATUS_OK: messages.success( request, 'Invite for "{}" role in {} sent to {}, expires on {}.'.format( invite.role.name, invite.project.title, invite.email, timezone.localtime(invite.date_expire).strftime( '%Y-%m-%d %H:%M' ), ), ) elif add_message and not resend: # NOTE: Delete invite if send fails invite.delete() messages.error(request, status_desc) @classmethod def revoke_invite( cls, invite: ProjectInvite, project: Project, request: HttpRequest ) -> ProjectInvite: """ Revoke invite. :param invite: ProjectInvite object :param project: Project object :param request: HttpRequest object :return: ProjectInvite object """ timeline = plugin_api.get_backend_api('timeline_backend') if invite: invite.active = False invite.save() # Add event in Timeline if timeline: timeline.add_event( project=project, app_name=APP_NAME, user=request.user, event_name='invite_revoke', description='revoke invite sent to "{}"'.format( invite.email if invite else 'N/A' ), status_type=( timeline.TL_STATUS_OK if invite else timeline.TL_STATUS_FAILED ), ) return invite class ProjectInviteView( LoginRequiredMixin, ProjectContextMixin, ProjectModifyPermissionMixin, TemplateView, ): """View for displaying and modifying project invites""" permission_required = 'projectroles.invite_users' template_name = 'projectroles/project_invites.html' model = ProjectInvite def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) context['invites'] = ProjectInvite.objects.filter( project=context['project'], active=True, date_expire__gt=timezone.now(), ) return context class ProjectInviteCreateView( LoginRequiredMixin, ProjectContextMixin, ProjectModifyPermissionMixin, ProjectInviteMixin, CurrentUserFormMixin, InvalidFormMixin, CreateView, ): """ProjectInvite creation view""" model = ProjectInvite form_class = ProjectInviteForm permission_required = 'projectroles.invite_users' def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) project = self.get_permission_object() context['preview_subject'] = email.get_invite_subject(project) context['preview_body'] = email.get_invite_body( project=project, issuer=self.request.user, role_name='{role_name}', invite_url='http://XXXXXXXXXXXXXXXXXXXXXXX', date_expire_str='YYYY-MM-DD HH:MM', ).replace('\n', '\\n') context['preview_message'] = email.get_invite_message( '{message}' ).replace('\n', '\\n') context['preview_footer'] = email.get_email_footer( self.request ).replace('\n', '\\n') return context def get_form_kwargs(self): """Pass current user and optional email/role to form""" kwargs = super().get_form_kwargs() kwargs.update({'project': self.get_permission_object().sodar_uuid}) email = self.request.GET.get('e', None) kwargs.update({'mail': unquote_plus(email) if email else None}) kwargs.update({'role': self.request.GET.get('r', None)}) return kwargs def form_valid(self, form): self.object = form.save() # Send mail and add to timeline self.handle_invite(invite=self.object, request=self.request) return redirect( reverse( 'projectroles:invites', kwargs={'project': self.object.project.sodar_uuid}, ) ) class ProjectInviteProcessMixin(ProjectModifyPluginViewMixin): """Mixin for accepting and processing project invites""" @classmethod def revoke_invite( cls, invite: ProjectInvite, user: Optional[User] = None, failed: bool = True, fail_desc: str = '', timeline: Any = None, ): """Set invite.active to False and save the invite""" invite.active = False invite.save() if failed and timeline and user: # Add event in Timeline timeline.add_event( project=invite.project, app_name=APP_NAME, user=user, event_name='invite_accept', description='accept project invite', status_type=timeline.TL_STATUS_FAILED, status_desc=fail_desc, ) def get_invite(self, secret: str) -> Optional[ProjectInvite]: """Get invite, display message if not found""" try: return ProjectInvite.objects.get(secret=secret) except ProjectInvite.DoesNotExist: messages.error(self.request, 'Invite does not exist.') return None def user_role_exists(self, invite: ProjectInvite, user: User) -> bool: """ Display message if user already has roles in project. Also revoke the invite if necessary. """ if invite.project.has_role(user): messages.warning( self.request, mark_safe( 'You are already a member of this {project}. ' '<a href="{url}">Please use this URL to access the ' '{project}</a>.'.format( project=get_display_name(invite.project.type), url=reverse( 'projectroles:detail', kwargs={'project': invite.project.sodar_uuid}, ), ) ), ) if invite.active: # Only revoke if active self.revoke_invite(invite, user) return True return False def is_invite_expired( self, invite: ProjectInvite, user: Optional[User] = None ) -> bool: """Display message and send email to issuer if invite is expired""" if invite.date_expire < timezone.now(): messages.error( self.request, f'Your invite has expired. Please contact the inviter: ' f'{invite.issuer.name} ({invite.issuer.email})', ) # Send notification of expiry to issuer if SEND_EMAIL and app_settings.get( APP_NAME, 'notify_email_role', user=invite.issuer ): email.send_invite_expiry_mail( invite, self.request, user_name=user.get_full_name() if user else invite.email, ) self.revoke_invite( invite, user, failed=True, fail_desc='Invite expired' ) return True return False # TODO: Combine with RoleAssignmentModifyMixin.modify_assignment? @transaction.atomic def create_assignment( self, invite: ProjectInvite, user: User, timeline: Any = None, ): """Create role assignment for invited user""" app_alerts = plugin_api.get_backend_api('appalerts_backend') tl_event = None if timeline: tl_event = timeline.add_event( project=invite.project, app_name=APP_NAME, user=user, event_name='invite_accept', description=f'accept project invite with role of ' f'"{invite.role.name}"', ) # Create the assignment role_as = RoleAssignment( user=user, project=invite.project, role=invite.role ) # Call for additional actions for role creation/update in plugins if getattr(settings, 'PROJECTROLES_ENABLE_MODIFY_API', False): args = [role_as, PROJECT_ACTION_CREATE, None, self.request] self.call_project_modify_api( 'perform_role_modify', 'revert_role_modify', args ) role_as.save() if tl_event: tl_event.set_status(timeline.TL_STATUS_OK) # Notify the issuer by alert and email if ( app_alerts and invite.issuer.is_active and app_settings.get( APP_NAME, 'notify_alert_role', user=invite.issuer ) ): app_alerts.add_alert( app_name=APP_NAME, alert_name='invite_accept', user=invite.issuer, message=f'Invitation sent to "{user.email}" for project ' f'"{invite.project.title}" with the role "{invite.role.name}" ' f'was accepted.', url=reverse( 'projectroles:detail', kwargs={'project': invite.project.sodar_uuid}, ), project=invite.project, ) if ( SEND_EMAIL and invite.issuer.is_active and app_settings.get( APP_NAME, 'notify_email_role', user=invite.issuer ) ): email.send_invite_accept_mail(invite, self.request, user) # Deactivate the invite and add success message self.revoke_invite(invite, user, failed=False, timeline=timeline) messages.success( self.request, PROJECT_WELCOME_MSG.format( project_type=get_display_name(invite.project.type), project_title=invite.project.title, role=invite.role.name, ), ) def redirect_error( self, msg: Union[str, Exception, None] = None ) -> HttpResponseRedirect: if msg: messages.error(self.request, msg) return redirect(reverse('home')) class ProjectInviteAcceptView(ProjectInviteProcessMixin, View): """View to handle accepting a project invite""" def _redirect_process(self, login: bool = True) -> HttpResponseRedirect: """Redirect to the proper process view""" url = 'projectroles:invite_process_{}'.format( 'login' if login else 'new_user' ) kw = {'secret': self.kwargs.get('secret')} return redirect(reverse(url, kwargs=kw)) def get(self, *args, **kwargs): invite = self.get_invite(secret=kwargs['secret']) if not invite: return self.redirect_error(INVITE_NOT_FOUND_MSG) user = self.request.user if ( not user.is_anonymous and user.is_authenticated and user.email == invite.email and self.user_role_exists(invite, user) ): return redirect( reverse( 'projectroles:detail', kwargs={'project': invite.project.sodar_uuid}, ) ) if (settings.ENABLE_LDAP and invite.is_ldap()) or ( settings.ENABLE_OIDC and not settings.PROJECTROLES_ALLOW_LOCAL_USERS ): return self._redirect_process() elif settings.PROJECTROLES_ALLOW_LOCAL_USERS: return self._redirect_process(False) return self.redirect_error( 'Local or OIDC users are not enabled on this site.' ) class ProjectInviteProcessLoginView( LoginRequiredMixin, ProjectInviteProcessMixin, View ): """ View for processing project invite with a logged in LDAP/OIDC user. Requires login and then creates project assignment for the invited user. """ def get(self, *args, **kwargs): invite = self.get_invite(kwargs['secret']) if not invite: return self.redirect_error(INVITE_NOT_FOUND_MSG) timeline = plugin_api.get_backend_api('timeline_backend') # Check if user has already accepted the invite if self.user_role_exists(invite, self.request.user): # NOTE: No message, simply redirect return redirect( reverse( 'projectroles:detail', kwargs={'project': invite.project.sodar_uuid}, ) ) # Check if invite expired if self.is_invite_expired(invite, self.request.user): return self.redirect_error() # Create RoleAssignment try: self.create_assignment(invite, self.request.user, timeline=timeline) except Exception as ex: return self.redirect_error(ex) return redirect( reverse( 'projectroles:detail', kwargs={'project': invite.project.sodar_uuid}, ) ) class ProjectInviteProcessNewUserView(ProjectInviteProcessMixin, FormView): """ View for processing a local/OIDC project invite as a newly created user. Also provides an OIDC login element to login instead of creating a local user account. """ form_class = LocalUserForm template_name = 'projectroles/user_form.html' def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context['invite'] = self.get_invite(self.kwargs['secret']) return context def get(self, *args, **kwargs): invite = self.get_invite(self.kwargs['secret']) if not invite: return self.redirect_error(INVITE_NOT_FOUND_MSG) timeline = plugin_api.get_backend_api('timeline_backend') # Redirect to login process view if OIDC is enabled but local isn't if settings.ENABLE_OIDC and not settings.PROJECTROLES_ALLOW_LOCAL_USERS: return redirect( reverse( 'projectroles:invite_process_login', kwargs={'secret': invite.secret}, ) ) # If local users are not allowed but OIDC is, redirect to home elif not settings.PROJECTROLES_ALLOW_LOCAL_USERS: return self.redirect_error(INVITE_LOCAL_NOT_ALLOWED_MSG) # Check invite for correct type if invite.is_ldap(): return self.redirect_error(INVITE_LDAP_LOCAL_VIEW_MSG) # Check if invited user exists user = User.objects.filter(email=invite.email).first() # Check if invite has expired if self.is_invite_expired(invite, user): return self.redirect_error() # Error message already added # A user is not logged in if self.request.user.is_anonymous: # Redirect to login if user exists if user: messages.info(self.request, INVITE_USER_EXISTS_MSG) return redirect( reverse('login') + '?next=' + reverse( 'projectroles:invite_process_new_user', kwargs={'secret': invite.secret}, ) ) # Show form if user doesn't exist and no user is logged in return super().get(*args, **kwargs) # Logged in but the invited user does not exist yet if not user: return self.redirect_error(INVITE_LOGGED_IN_ACCEPT_MSG) # Logged in user is not invited user if self.request.user != user: return self.redirect_error(INVITE_USER_NOT_EQUAL_MSG) # User exists but is not local if not user.is_local(): return self.redirect_error('User exists, but is not local.') # Create RoleAssignment try: self.create_assignment(invite, self.request.user, timeline=timeline) except Exception as ex: return self.redirect_error(ex) return redirect( reverse( 'projectroles:detail', kwargs={'project': invite.project.sodar_uuid}, ) ) def get_initial(self): """Returns the initial data for the form.""" initial = super().get_initial() try: invite = ProjectInvite.objects.get(secret=self.kwargs['secret']) except ProjectInvite.DoesNotExist: return initial username_base = invite.email.split('@')[0] i = 0 while True: username = username_base + str(i if i else '') try: User.objects.get(username=username) i += 1 except User.DoesNotExist: break initial.update({'email': invite.email, 'username': username}) return initial def form_valid(self, form): invite = self.get_invite(self.kwargs['secret']) if not invite: return redirect(reverse('home')) timeline = plugin_api.get_backend_api('timeline_backend') # Check if local users are allowed if not settings.PROJECTROLES_ALLOW_LOCAL_USERS: return self.redirect_error( 'Invite was issued to non-LDAP user, but local users are not ' 'allowed.' ) # Check invite for correct type if invite.is_ldap(): return self.redirect_error( 'Invite was issued for LDAP user, but local invite view was ' 'requested.' ) # Check if invite is expired if self.is_invite_expired(invite): return self.redirect_error() # Create user and RoleAssignment user = User.objects.create_user( form.cleaned_data['username'], email=form.cleaned_data['email'], password=form.cleaned_data['password'], first_name=form.cleaned_data['first_name'], last_name=form.cleaned_data['last_name'], ) if self.user_role_exists(invite, user): return redirect( reverse( 'projectroles:detail', kwargs={'project': invite.project.sodar_uuid}, ) ) try: self.create_assignment(invite, user, timeline=timeline) except Exception as ex: user.delete() return self.redirect_error(ex) return redirect( reverse( 'projectroles:detail', kwargs={'project': invite.project.sodar_uuid}, ) ) class ProjectInviteResendView( LoginRequiredMixin, ProjectModifyPermissionMixin, ProjectInviteMixin, View ): """View to handle resending a project invite""" permission_required = 'projectroles.invite_users' def get(self, *args, **kwargs): try: invite = ProjectInvite.objects.get( sodar_uuid=self.kwargs['projectinvite'], active=True ) except ProjectInvite.DoesNotExist: messages.error(self.request, INVITE_NOT_FOUND_MSG) return redirect( reverse( 'projectroles:invites', kwargs={'project': self.get_project()}, ) ) # Reset invite expiration date invite.reset_date_expire() # Resend mail and add to timeline self.handle_invite(invite=invite, request=self.request, resend=True) return redirect( reverse( 'projectroles:invites', kwargs={'project': invite.project.sodar_uuid}, ) ) class ProjectInviteRevokeView( LoginRequiredMixin, ProjectModifyPermissionMixin, ProjectContextMixin, ProjectInviteMixin, TemplateView, ): """Batch delete/move confirm view""" template_name = 'projectroles/invite_revoke_confirm.html' permission_required = 'projectroles.invite_users' def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) context['project'] = self.get_project() if 'projectinvite' in self.kwargs: try: context['invite'] = ProjectInvite.objects.get( sodar_uuid=self.kwargs['projectinvite'] ) except ProjectInvite.DoesNotExist: pass return context def post(self, request, **kwargs): """Override post() to handle POST from confirmation template""" project = self.get_project() invite = ProjectInvite.objects.filter( sodar_uuid=kwargs['projectinvite'] ).first() if ( invite and invite.role.name == PROJECT_ROLE_DELEGATE and not request.user.has_perm( 'projectroles.update_project_delegate', project ) ): # Causes revoke_failed_invite() to add failed timeline event invite = None messages.error( self.request, 'No permissions for updating delegate invite.' ) elif invite: messages.success(self.request, 'Invite revoked.') else: messages.error(self.request, 'Invite not found.') self.revoke_invite(invite, project, request) return redirect( reverse( 'projectroles:invites', kwargs={'project': project.sodar_uuid} ) ) # User management views -------------------------------------------------------- class LocalUserUpdateView( LoginRequiredMixin, PermissionRequiredMixin, HTTPRefererMixin, InvalidFormMixin, UpdateView, ): """Display and process the user update view""" form_class = LocalUserForm permission_required = 'projectroles.update_local_user' template_name = 'projectroles/user_form.html' success_url = reverse_lazy('home') def get_object(self, **kwargs): return self.request.user def get_success_url(self): messages.success(self.request, USER_PROFILE_UPDATE_MSG) return reverse('home') # Site App Setting management -------------------------------------------------- class SiteAppSettingsView( LoginRequiredMixin, LoggedInPermissionMixin, InvalidFormMixin, FormView ): """Site app settings form view""" form_class = SiteAppSettingsForm permission_required = 'projectroles.update_site_settings' success_url = reverse_lazy('projectroles:site_app_settings') template_name = 'projectroles/siteappsettings_form.html' def form_valid(self, form): result = super().form_valid(form) for k, v in form.cleaned_data.items(): if k.startswith('settings.'): _, plugin_name, setting_name = k.split('.', 3) app_settings.set(plugin_name, setting_name, v) messages.success(self.request, SITE_SETTING_UPDATE_MSG) return result # Remote site and project views ------------------------------------------------ class RemoteSiteListView( LoginRequiredMixin, LoggedInPermissionMixin, TemplateView ): """Main view for displaying remote site list""" permission_required = 'projectroles.update_remote' template_name = 'projectroles/remote_sites.html' def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) site_mode = ( SITE_MODE_TARGET if settings.PROJECTROLES_SITE_MODE == SITE_MODE_SOURCE else SITE_MODE_SOURCE ) sites = RemoteSite.objects.filter(mode=site_mode).order_by('name') if ( sites.count() > 0 and settings.PROJECTROLES_SITE_MODE == SITE_MODE_TARGET ): sites = sites[:1] context['sites'] = sites return context # TODO: Remove this once implementing #76 def get(self, request, *args, **kwargs): if getattr(settings, 'PROJECTROLES_DISABLE_CATEGORIES', False): messages.warning( request, '{} {} and nesting disabled, ' 'remote {} sync disabled.'.format( get_display_name(PROJECT_TYPE_PROJECT, title=True), get_display_name(PROJECT_TYPE_CATEGORY, plural=True), get_display_name(PROJECT_TYPE_PROJECT), ), ) return redirect('home') return super().get(request, *args, **kwargs) class RemoteSiteModifyMixin(ModelFormMixin): """Helpers for remote site modification""" def _create_timeline_event( self, remote_site: RemoteSite, user: User, form_action: str ): """ Create timeline event for remote site creation/update. :param remote_site: RemoteSite object :param user: User object :param form_action: String :param form_action: """ timeline = plugin_api.get_backend_api('timeline_backend') if not timeline: return status = form_action if form_action == 'set' else form_action[0:-1] event_name = f'{remote_site.mode.lower()}_site_{status}' tl_desc = '{} remote {} site {{{}}}'.format( status, remote_site.mode.lower(), 'remote_site', ) tl_event = timeline.add_event( project=None, app_name=APP_NAME, user=user, event_name=event_name, description=tl_desc, classified=True, extra_data={ 'name': remote_site.name, 'url': remote_site.url, 'description': remote_site.description, 'mode': remote_site.mode, 'user display': remote_site.user_display, 'secret': remote_site.secret, }, status_type=timeline.TL_STATUS_OK, ) tl_event.add_object( obj=remote_site, label='remote_site', name=remote_site.name ) def form_valid(self, form): """Override form_valid() to save timeline event and handle UI""" if self.object: form_action = 'updated' elif settings.PROJECTROLES_SITE_MODE == 'TARGET': form_action = 'set' else: form_action = 'created' self.object = form.save() # Create timeline event self._create_timeline_event(self.object, self.request.user, form_action) messages.success( self.request, f'{self.object.mode.capitalize()} site "{self.object.name}" ' f'{form_action}.', ) return redirect(reverse('projectroles:remote_sites')) class RemoteSiteCreateView( LoginRequiredMixin, LoggedInPermissionMixin, RemoteSiteModifyMixin, HTTPRefererMixin, CurrentUserFormMixin, InvalidFormMixin, CreateView, ): """RemoteSite creation view""" model = RemoteSite form_class = RemoteSiteForm permission_required = 'projectroles.update_remote' def get(self, request, *args, **kwargs): """ Override get() to disallow rendering this view if current site is in TARGET mode and a source site already exists. """ if ( settings.PROJECTROLES_SITE_MODE == SITE_MODE_TARGET and RemoteSite.objects.filter(mode=SITE_MODE_SOURCE).count() > 0 ): messages.error(request, 'Source site has already been set') return redirect(reverse('projectroles:remote_sites')) return super().get(request, args, kwargs) class RemoteSiteUpdateView( LoginRequiredMixin, LoggedInPermissionMixin, RemoteSiteModifyMixin, HTTPRefererMixin, CurrentUserFormMixin, InvalidFormMixin, UpdateView, ): """RemoteSite updating view""" model = RemoteSite form_class = RemoteSiteForm permission_required = 'projectroles.update_remote' slug_url_kwarg = 'remotesite' slug_field = 'sodar_uuid' class RemoteSiteDeleteView( LoginRequiredMixin, LoggedInPermissionMixin, HTTPRefererMixin, DeleteView, ): """RemoteSite deletion view""" model = RemoteSite permission_required = 'projectroles.update_remote' slug_url_kwarg = 'remotesite' slug_field = 'sodar_uuid' def get_success_url(self): timeline = plugin_api.get_backend_api('timeline_backend') if timeline: event_name = '{}_site_delete'.format( 'source' if self.object.mode == SITE_MODE_SOURCE else 'target' ) tl_desc = 'delete remote site {remote_site}' tl_event = timeline.add_event( project=None, app_name=APP_NAME, user=self.request.user, event_name=event_name, description=tl_desc, classified=True, status_type=timeline.TL_STATUS_OK, ) tl_event.add_object( obj=self.object, label='remote_site', name=self.object.name ) messages.success( self.request, f'{self.object.mode.capitalize()} site "{self.object.name}" ' f'deleted', ) return reverse('projectroles:remote_sites') class RemoteProjectListView( LoginRequiredMixin, LoggedInPermissionMixin, TemplateView ): """View for displaying the project lsit for remote site sync""" permission_required = 'projectroles.update_remote' template_name = 'projectroles/remote_projects.html' def get_context_data(self, *args, **kwargs): context = super().get_context_data(**kwargs) site = RemoteSite.objects.get(sodar_uuid=self.kwargs['remotesite']) context['site'] = site # Projects in SOURCE mode: all local projects of type PROJECT if settings.PROJECTROLES_SITE_MODE == SITE_MODE_SOURCE: projects = Project.objects.filter(type=PROJECT_TYPE_PROJECT) # Projects in TARGET mode: retrieve from source else: # SITE_MODE_TARGET remote_uuids = [p.project_uuid for p in site.projects.all()] projects = Project.objects.filter( type=PROJECT_TYPE_PROJECT, sodar_uuid__in=remote_uuids ) if projects: context['projects'] = projects.order_by('full_title') return context class RemoteProjectBatchUpdateView( LoginRequiredMixin, LoggedInPermissionMixin, TemplateView ): """ Manually created form view for updating project access in batch for a remote target site. """ permission_required = 'projectroles.update_remote' template_name = 'projectroles/remoteproject_update.html' def get_context_data(self, *args, **kwargs): context = super().get_context_data(**kwargs) # Current site context['site'] = RemoteSite.objects.filter( sodar_uuid=kwargs['remotesite'] ).first() return context def post(self, request, *args, **kwargs): timeline = plugin_api.get_backend_api('timeline_backend') post_data = request.POST context = self.get_context_data(*args, **kwargs) site = context['site'] confirmed = True if 'update-confirmed' in post_data else False redirect_url = reverse( 'projectroles:remote_projects', kwargs={'remotesite': site.sodar_uuid}, ) # Ensure site is in SOURCE mode if settings.PROJECTROLES_SITE_MODE != SITE_MODE_SOURCE: messages.error( request, 'Site in TARGET mode, cannot update {} access'.format( get_display_name(PROJECT_TYPE_PROJECT) ), ) return redirect(redirect_url) access_fields = { k: v for k, v in post_data.items() if k.startswith('remote_access') } # Confirmation needed if not confirmed: # Pass on (only) changed projects to confirmation form modifying_access = [] for k, v in access_fields.items(): project_uuid = k.split('_')[2] remote_obj = RemoteProject.objects.filter( site=site, project_uuid=project_uuid ).first() if (not remote_obj and v != REMOTE_LEVEL_NONE) or ( remote_obj and remote_obj.level != v ): modifying_access.append( { 'project': Project.objects.get( sodar_uuid=project_uuid ), 'old_level': ( REMOTE_LEVEL_NONE if not remote_obj else remote_obj.level ), 'new_level': v, } ) if not modifying_access: messages.warning( request, 'No changes to {} access detected'.format( get_display_name(PROJECT_TYPE_PROJECT) ), ) return redirect(redirect_url) context['modifying_access'] = modifying_access return super().render_to_response(context) # Confirmed modifying_access = [] old_level = REMOTE_LEVEL_NONE for k, v in access_fields.items(): project_uuid = k.split('_')[2] project = Project.objects.filter(sodar_uuid=project_uuid).first() # Update or create a RemoteProject object try: rp = RemoteProject.objects.get( site=site, project_uuid=project_uuid ) old_level = rp.level rp.level = v except RemoteProject.DoesNotExist: rp = RemoteProject( site=site, project_uuid=project_uuid, project=project, level=v, ) rp.save() modifying_access.append( { 'project': project.get_log_title(), 'old_level': ( REMOTE_LEVEL_NONE if not old_level else old_level ), 'new_level': v, } ) if timeline and project: tl_desc = 'update remote access on site {{{}}} to "{}"'.format( 'remote_site', SODAR_CONSTANTS['REMOTE_ACCESS_LEVELS'][v].lower(), ) tl_event = timeline.add_event( project=project, app_name=APP_NAME, user=request.user, event_name='remote_project_update', description=tl_desc, classified=True, status_type=timeline.TL_STATUS_OK, ) tl_event.add_object(site, 'remote_site', site.name) if timeline: tl_desc = 'update remote projects for {remote_site}' tl_event = timeline.add_event( project=None, app_name=APP_NAME, user=request.user, event_name='remote_batch_update', description=tl_desc, extra_data={'modifying_access': modifying_access}, classified=True, status_type=timeline.TL_STATUS_OK, ) tl_event.add_object(obj=site, label='remote_site', name=site.name) # All OK messages.success( request, 'Access level updated for {} {} in site "{}"'.format( len(access_fields.items()), get_display_name( PROJECT_TYPE_PROJECT, count=len(access_fields.items()) ), context['site'].name, ), ) return redirect(redirect_url) class RemoteProjectSyncView( LoginRequiredMixin, LoggedInPermissionMixin, TemplateView ): """Target site view for synchronizing remote projects from a source site""" permission_required = 'projectroles.update_remote' template_name = 'projectroles/remoteproject_sync.html' def get_context_data(self, *args, **kwargs): context = super().get_context_data(**kwargs) # Current site context['site'] = RemoteSite.objects.filter( sodar_uuid=kwargs['remotesite'] ).first() return context def get(self, request, *args, **kwargs): """Override get() for a confirmation view""" redirect_url = reverse('projectroles:remote_sites') if settings.PROJECTROLES_SITE_MODE == SITE_MODE_SOURCE: messages.error( request, 'Site in SOURCE mode, remote sync not allowed' ) return redirect(redirect_url) timeline = plugin_api.get_backend_api('timeline_backend') remote_api = RemoteProjectAPI() context = self.get_context_data(*args, **kwargs) source_site = context['site'] try: remote_data = remote_api.get_remote_data(source_site) except Exception as ex: messages.error( request, 'Unable to synchronize {}: {}'.format( get_display_name(PROJECT_TYPE_PROJECT, plural=True), ex ), ) return redirect(redirect_url) # Sync data try: update_data = remote_api.sync_remote_data( source_site, remote_data, request ) except Exception as ex: messages.error( request, f'Remote sync cancelled with exception: {ex}' ) if settings.DEBUG: raise ex return redirect(redirect_url) # Check for updates user_count = len( [v for v in update_data['users'].values() if 'status' in v] ) project_count = len( [v for v in update_data['projects'].values() if 'status' in v] ) app_settings_count = len( [ v for v in update_data['app_settings'].values() if 'status' in v and v['status'] != 'skipped' ] ) role_count = 0 for p in [p for p in update_data['projects'].values() if 'roles' in p]: for _ in [r for r in p['roles'].values() if 'status' in r]: role_count += 1 # Redirect if no changes were detected if ( user_count == 0 and project_count == 0 and role_count == 0 and app_settings_count == 0 ): messages.warning( request, 'No changes in remote site detected, nothing to synchronize.', ) return redirect(redirect_url) context['update_data'] = update_data context['user_count'] = user_count context['project_count'] = project_count context['role_count'] = role_count context['app_settings_count'] = app_settings_count # Create timeline events for projects if timeline: tl_desc = 'synchronize remote site {remote_site}' tl_event = timeline.add_event( project=None, app_name=APP_NAME, user=request.user, event_name='remote_project_sync', description=tl_desc, classified=True, status_type=timeline.TL_STATUS_OK, ) tl_event.add_object( obj=source_site, label='remote_site', name=source_site.name ) messages.success( request, '{} data updated according to source site.'.format( get_display_name(PROJECT_TYPE_PROJECT, title=True) ), ) return super().render_to_response(context)