# Source code for debusine.db.models.workspaces

# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Data models for db workspaces."""

import re
from collections.abc import Sequence
from datetime import datetime, timedelta
from typing import (
    Any,
    Generic,
    TYPE_CHECKING,
    TypeAlias,
    TypeVar,
    Union,
    assert_never,
    cast,
)

from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.contrib.postgres.aggregates import ArrayAgg
from django.core.exceptions import ValidationError
from django.db import models
from django.db.models import F, Max, Q, QuerySet, UniqueConstraint, Value
from django.db.models.functions import Greatest
from django.urls import reverse

from debusine.db.context import ContextConsistencyError, context
from debusine.db.models import permissions
from debusine.db.models.files import File
from debusine.db.models.permissions import (
    PartialCheckResult,
    PermissionUser,
    ROLES,
    enforce,
    permission_check,
    permission_filter,
)
from debusine.db.models.scopes import Scope, ScopeRoles
from debusine.utils.typing_utils import copy_signature_from

if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta

    from debusine.db.models import Collection, User
else:
    TypedModelMeta = object

# Type parameter carried through WorkspaceQuerySet (the queryset "row" type)
A = TypeVar("A")

#: Workspace names reserved for use in toplevel URL path components
# TODO: trim after !1274
RESERVED_WORKSPACE_NAMES: frozenset[str] = frozenset(
    {
        "accounts",
        "artifact",
        "task-status",
        "user",
        "workers",
        "work-request",
        "workspaces",
    }
)

#: Regexp matching the structure of workspace names: an ASCII letter followed
#: by any number of ASCII letters, digits, ``+``, ``.``, ``_`` or ``-``
workspace_name_regex: re.Pattern[str] = re.compile(
    r"^[A-Za-z][A-Za-z0-9+._-]*$"
)


def WORKSPACE_ROLES(
    user: "User",
    *,
    scope: Sequence[ScopeRoles] = (),
    workspace: Sequence["WorkspaceRoles"] = (),
    member: str | None = None,
) -> Q:
    """
    Build a Q expression selecting workspaces by role.

    The expression matches elements for which ``user`` holds at least one
    of the given ``scope`` roles or at least one of the given ``workspace``
    roles.

    Uniqueness is not enforced: a workspace may be selected twice if it is
    both directly owned and part of an owned scope.

    :param user: user whose roles are looked up
    :param scope: scope roles to accept
    :param workspace: workspace roles to accept
    :param member: lookup path leading to the workspace, or a false value
      when filtering workspaces directly
    :raises ValueError: if the resulting expression is empty
    """
    # The scope is reached through the workspace, so its lookup path is the
    # workspace's path with ``__scope`` appended (or plain ``scope`` when
    # filtering workspaces directly).
    scope_member = f"{member}__scope" if member else "scope"

    combined = Q()
    if scope:
        combined |= ROLES(user, *scope, member=scope_member)
    if workspace:
        if member:
            combined |= ROLES(user, *workspace, member=member)
        else:
            combined |= ROLES(user, *workspace)

    if not combined:
        raise ValueError("at least one of scope or workspace needs to be set")
    return combined


def is_valid_workspace_name(value: str) -> bool:
    """
    Check if value is a valid workspace name.

    A name is valid when it is not one of ``RESERVED_WORKSPACE_NAMES`` and
    the whole string matches ``workspace_name_regex``.

    :param value: candidate workspace name
    :returns: True if the name can be used for a workspace
    """
    if value in RESERVED_WORKSPACE_NAMES:
        return False
    # Use fullmatch() rather than match(): the pattern ends in ``$``, which
    # also matches just before a trailing newline, so match() would accept
    # names such as "name\n".
    return workspace_name_regex.fullmatch(value) is not None


def validate_workspace_name(value: str) -> None:
    """
    Django validator for workspace names.

    :param value: candidate workspace name
    :raises ValidationError: if ``is_valid_workspace_name`` rejects the name
    """
    if is_valid_workspace_name(value):
        return
    raise ValidationError(
        "%(value)r is not a valid workspace name", params={"value": value}
    )


class WorkspaceQuerySet(QuerySet["Workspace", A], Generic[A]):
    """QuerySet for Workspace with scope and permission filters."""

    def with_expiration_time(self) -> "WorkspaceQuerySet[Any]":
        """
        Annotate the queryset with expiration times.

        ``expiration_time`` is ``expiration_delay`` added to the greater of
        ``created_at`` and the latest ``completed_at`` of the workspace's
        work requests.
        """
        last_activity = Greatest(
            "created_at", Max("workrequest__completed_at")
        )
        return self.annotate(
            expiration_time=F("expiration_delay") + last_activity
        )

    def in_current_scope(self) -> "WorkspaceQuerySet[A]":
        """
        Filter to workspaces in the current scope.

        :raises ContextConsistencyError: if no scope is set in the context
        """
        if (current_scope := context.scope) is None:
            raise ContextConsistencyError("scope is not set")
        return self.filter(scope=current_scope)

    @permission_filter
    def can_display(self, user: PermissionUser) -> "WorkspaceQuerySet[A]":
        """Keep only Workspaces that can be displayed."""
        assert user is not None  # Enforced by decorator
        # Workers can currently access all workspaces
        # TODO: see #523
        if context.worker_token:
            return self
        # Public workspaces are visible to everybody; authenticated users
        # additionally see the workspaces on which they hold a role.
        # This is the same check done in Workspace.set_current, and if changed
        # they need to be kept in sync
        visible = Q(public=True)
        if user.is_authenticated:
            visible = visible | WORKSPACE_ROLES(
                user,
                scope=[Scope.Roles.OWNER],
                workspace=[Workspace.Roles.OWNER, Workspace.Roles.CONTRIBUTOR],
            )
        return self.filter(visible).distinct()

    @permission_filter
    def can_configure(self, user: PermissionUser) -> "WorkspaceQuerySet[A]":
        """Keep only Workspaces that can be configured."""
        assert user is not None  # Enforced by decorator
        # Neither workers nor anonymous users can change workspace
        # configuration
        if context.worker_token or not user.is_authenticated:
            return self.none()
        owner_only = WORKSPACE_ROLES(
            user,
            scope=[Scope.Roles.OWNER],
            workspace=[Workspace.Roles.OWNER],
        )
        return self.filter(owner_only).distinct()

    @permission_filter
    def can_create_artifacts(
        self, user: PermissionUser
    ) -> "WorkspaceQuerySet[A]":
        """Keep only Workspaces where the user can create artifacts."""
        assert user is not None  # Enforced by decorator
        # Workers can currently work on all workspaces
        # TODO: see #523
        if context.worker_token:
            return self
        if not user.is_authenticated:
            return self.none()
        has_role = WORKSPACE_ROLES(
            user,
            scope=[Scope.Roles.OWNER],
            workspace=[Workspace.Roles.OWNER, Workspace.Roles.CONTRIBUTOR],
        )
        # TODO: this allows writing to public workspaces. It reflects the
        # status quo, and it may not be what we want in the long term
        return self.filter(Q(public=True) | has_role).distinct()

    @permission_filter
    def can_create_work_requests(
        self, user: PermissionUser
    ) -> "WorkspaceQuerySet[A]":
        """Workspaces where the user can create work requests."""
        assert user is not None  # Enforced by decorator
        # Workers are not allowed
        if context.worker_token:
            return self.none()
        # Anonymous users are not allowed
        if not user.is_authenticated:
            return self.none()
        # TODO: At some point we may disallow users with only contributor
        # permissions from creating work requests directly rather than via
        # workflows, but that depends on some other things, such as exactly
        # how we implement custom workflows and making it easy for users to
        # create their own workspaces.
        has_role = WORKSPACE_ROLES(
            user,
            scope=[Scope.Roles.OWNER],
            workspace=[Workspace.Roles.OWNER, Workspace.Roles.CONTRIBUTOR],
        )
        return self.filter(has_role).distinct()

    @permission_filter
    def can_create_experiment_workspace(
        self, user: PermissionUser
    ) -> "WorkspaceQuerySet[A]":
        """Workspaces from which the user can create an experiment workspace."""
        assert user is not None  # Enforced by decorator
        # Workers are not allowed
        if context.worker_token:
            return self.none()
        # Anonymous users are not allowed
        if not user.is_authenticated:
            return self.none()
        has_role = WORKSPACE_ROLES(
            user,
            scope=[Scope.Roles.OWNER],
            workspace=[Workspace.Roles.OWNER, Workspace.Roles.CONTRIBUTOR],
        )
        return self.filter(Q(public=True) | has_role).distinct()


class WorkspaceManager(models.Manager["Workspace"]):
    """Manager for the Workspace model."""

    def get_roles_model(self) -> type["WorkspaceRole"]:
        """Return the model class that stores workspace role assignments."""
        return WorkspaceRole

    def get_queryset(self) -> WorkspaceQuerySet[Any]:
        """Return a WorkspaceQuerySet bound to this manager's model and db."""
        return WorkspaceQuerySet(self.model, using=self._db)

    def get_for_context(self, name: str) -> "Workspace":
        """
        Look up a workspace for setting into the current context.

        The workspace is searched by name in ``context.scope``. When the
        context user is authenticated, the result is annotated with
        ``user_roles``, the distinct roles that user holds on the workspace
        through their groups.

        :param name: name of the workspace to look up
        """
        # Note: user permissions are checked by Workspace.set_current
        current_scope = context.require_scope()
        current_user = context.require_user()

        candidates = self.get_queryset().select_related("scope")
        if current_user.is_authenticated:
            # Aggregate the user's roles into an array; it is empty when the
            # user has none
            roles_of_user = ArrayAgg(
                "roles__role",
                filter=Q(roles__group__users=current_user),
                distinct=True,
                default=Value([]),
            )
            candidates = candidates.annotate(user_roles=roles_of_user)

        found = candidates.get(scope=current_scope, name=name)
        assert isinstance(found, Workspace)
        return found


class DeleteWorkspaces:
    """Delete all workspaces in a queryset, with the objects they contain."""

    def __init__(self, workspaces: WorkspaceQuerySet["Workspace"]) -> None:
        """
        Store workspaces to delete and compute affected objects.

        One queryset per dependent model is stored as an attribute, each
        selecting the rows that belong to ``workspaces``.

        :param workspaces: the workspaces to delete
        """
        # Import here to prevent circular dependencies
        from debusine.db import models as db_models

        self.workspaces = workspaces

        # Most models use on_delete=PROTECT, so the model interdependency
        # graph may contain elements that are not deleted yet.
        #
        # Testing this is difficult without infrastructure to simulate a
        # fully populated database that is kept up to date as new models get
        # added, so for the moment this list is extended as the need arises.

        # Workflows and work requests
        self.workflow_templates = db_models.WorkflowTemplate.objects.filter(
            workspace__in=workspaces
        )
        self.work_requests = db_models.WorkRequest.objects.filter(
            workspace__in=workspaces
        )

        # Collections and their items
        self.collection_items = db_models.CollectionItem.objects.filter(
            parent_collection__workspace__in=workspaces
        )
        self.collections = db_models.Collection.objects.filter(
            workspace__in=workspaces
        )

        # Artifacts, their files, and relations touching them on either end
        self.file_in_artifacts = db_models.FileInArtifact.objects.filter(
            artifact__workspace__in=workspaces
        )
        relation_source = Q(artifact__workspace__in=workspaces)
        relation_target = Q(target__workspace__in=workspaces)
        self.artifact_relations = db_models.ArtifactRelation.objects.filter(
            relation_source | relation_target
        )
        self.artifacts = db_models.Artifact.objects.filter(
            workspace__in=workspaces
        )

        # Role assignments
        self.workspace_roles = WorkspaceRole.objects.filter(
            resource__in=workspaces
        )

        # Assets and their usages
        self.asset_usages = db_models.AssetUsage.objects.filter(
            workspace__in=workspaces
        )
        self.assets = db_models.Asset.objects.filter(workspace__in=workspaces)

    def perform_deletions(self) -> None:
        """Delete the workspaces and all their resources."""
        # The order of this tuple is the deletion order and must be kept:
        # the workspaces themselves go last.
        deletion_order = (
            self.workflow_templates,
            self.work_requests,
            self.collection_items,
            self.collections,
            self.file_in_artifacts,
            self.artifact_relations,
            self.artifacts,
            self.asset_usages,
            self.assets,
            self.workspace_roles,
            self.workspaces,
        )
        for queryset in deletion_order:
            queryset.delete()


def default_workspace() -> "Workspace":
    """
    Return the default Workspace.

    The workspace is looked up by the names configured in
    ``settings.DEBUSINE_DEFAULT_SCOPE`` and
    ``settings.DEBUSINE_DEFAULT_WORKSPACE``.
    """
    return Workspace.objects.get(
        scope__name=settings.DEBUSINE_DEFAULT_SCOPE,
        name=settings.DEBUSINE_DEFAULT_WORKSPACE,
    )


class WorkspaceRoles(permissions.Roles):
    """Available roles for a Workspace."""

    # Each member is a (stored value, human-readable label) pair
    OWNER = "owner", "Owner"
    CONTRIBUTOR = "contributor", "Contributor"


[docs] class Workspace(models.Model): """Workspace model.""" Roles: TypeAlias = WorkspaceRoles objects = WorkspaceManager.from_queryset(WorkspaceQuerySet)() name = models.CharField( max_length=255, validators=[validate_workspace_name] ) public = models.BooleanField(default=False) default_expiration_delay = models.DurationField( default=timedelta(0), help_text="minimal time that a new artifact is kept in the" " workspace before being expired", ) inherits = models.ManyToManyField( "db.Workspace", through="db.WorkspaceChain", through_fields=("child", "parent"), related_name="inherited_by", ) scope = models.ForeignKey( Scope, on_delete=models.PROTECT, related_name="workspaces" ) created_at = models.DateTimeField( auto_now_add=True, help_text="time the workspace was created" ) expiration_delay = models.DurationField( blank=True, null=True, help_text=( "if set, time since the last task completion time" "after which the workspace can be deleted" ), ) class Meta(TypedModelMeta): constraints = [ UniqueConstraint( fields=["scope", "name"], name="%(app_label)s_%(class)s_unique_scope_name", ), ]
[docs] def get_absolute_url(self) -> str: """Return an absolute URL to this workspace.""" return reverse("workspaces:detail", kwargs={"wname": self.name})
[docs] def get_absolute_url_configure(self) -> str: """Return an absolute URL to configure this workspace.""" return reverse("workspaces:update", kwargs={"wname": self.name})
[docs] @copy_signature_from(models.Model.save) def save(self, **kwargs: Any) -> None: """Wrap save with permission checks.""" if self._state.adding: # Create enforce(self.scope.can_create_workspace) else: # Update ... # TODO: check for update permissions return super().save(**kwargs)
[docs] def set_current(self) -> None: """ Set this as the current workspace. This needs to be called after ``context.set_scope`` and ``context.set_user``. """ if (old_workspace := context.workspace) is not None: raise ContextConsistencyError( f"Workspace was already set to {old_workspace}" ) if (scope := context.scope) is None: raise ContextConsistencyError("Cannot set workspace before scope") if self.scope != scope: raise ContextConsistencyError( f"workspace scope {self.scope.name!r}" f" does not match current scope {scope.name!r}" ) if (user := context.user) is None: if context.worker_token: user = AnonymousUser() else: raise ContextConsistencyError( "Cannot set workspace before user" ) workspace_roles: frozenset[Workspace.Roles] if not user.is_authenticated and context.worker_token: workspace_roles = frozenset() elif (user_roles := getattr(self, "user_roles", None)) is not None: # Use cached version if available workspace_roles = frozenset(user_roles) else: workspace_roles = frozenset(self.get_roles(user)) # Check workspace visibility. This is the same as the can_display # predicate, and if changed they need to be kept in sync if ( self.public or context.worker_token or Scope.Roles.OWNER in context.scope_roles or Workspace.Roles.OWNER in workspace_roles or Workspace.Roles.CONTRIBUTOR in workspace_roles ): context._workspace.set(self) context._workspace_roles.set(workspace_roles) else: raise ContextConsistencyError( f"User {user} cannot access workspace {self}" )
[docs] @permission_check("{user} cannot display workspace {resource}") def can_display(self, user: PermissionUser) -> bool: """Check if the workspace can be displayed.""" assert user is not None # enforced by decorator # Shortcuts to avoid hitting the database for common cases if self.public: return True # Workers can currently access all workspaces # TODO: see #523 if context.worker_token: return True if not user.is_authenticated: return False # Shortcut to avoid hitting the database for common cases match self.context_has_role( user, (Workspace.Roles.OWNER, Workspace.Roles.CONTRIBUTOR), scope_roles=Scope.Roles.OWNER, ): case PartialCheckResult.ALLOW: return True case PartialCheckResult.DENY: return False case PartialCheckResult.PASS: pass case _ as unreachable: assert_never(unreachable) return Workspace.objects.can_display(user).filter(pk=self.pk).exists()
[docs] @permission_check("{user} cannot configure workspace {resource}") def can_configure(self, user: PermissionUser) -> bool: """Check if the workspace can be configured.""" assert user is not None # enforced by decorator # Workers cannot change a workspace configuration if context.worker_token: return False if not user.is_authenticated: return False # Shortcut to avoid hitting the database for common cases match self.context_has_role( user, Workspace.Roles.OWNER, scope_roles=Scope.Roles.OWNER, ): case PartialCheckResult.ALLOW: return True case PartialCheckResult.DENY: return False case PartialCheckResult.PASS: pass case _ as unreachable: assert_never(unreachable) return Workspace.objects.can_configure(user).filter(pk=self.pk).exists()
[docs] @permission_check("{user} cannot create artifacts in {resource}") def can_create_artifacts(self, user: PermissionUser) -> bool: """Check if the user can create artifacts.""" assert user is not None # enforced by decorator # Workers can currently access all workspaces # TODO: see #523 if context.worker_token: return True # Anonymous users cannot create artifacts if not user.is_authenticated: return False # Shortcuts to avoid hitting the database for common cases if self.public: # TODO: this allows writing to public workspaces. It reflects the # status quo, and it may now be what we want in the long term return True match self.context_has_role( user, (Workspace.Roles.OWNER, Workspace.Roles.CONTRIBUTOR), scope_roles=Scope.Roles.OWNER, ): case PartialCheckResult.ALLOW: return True case PartialCheckResult.DENY: return False case PartialCheckResult.PASS: pass case _ as unreachable: assert_never(unreachable) return ( Workspace.objects.can_create_artifacts(user) .filter(pk=self.pk) .exists() )
[docs] @permission_check("{user} cannot create work requests in {resource}") def can_create_work_requests(self, user: PermissionUser) -> bool: """Check if the user can create work requests.""" assert user is not None # enforced by decorator # Workers and anonymous users are not allowed if context.worker_token or not user.is_authenticated: return False # Shortcut to avoid hitting the database for common cases match self.context_has_role( user, (Workspace.Roles.OWNER, Workspace.Roles.CONTRIBUTOR), scope_roles=Scope.Roles.OWNER, ): case PartialCheckResult.ALLOW: return True case PartialCheckResult.DENY: return False case PartialCheckResult.PASS: pass case _ as unreachable: assert_never(unreachable) return ( Workspace.objects.can_create_work_requests(user) .filter(pk=self.pk) .exists() )
[docs] @permission_check( "{user} cannot create an experiment workspace from {resource}" ) def can_create_experiment_workspace(self, user: PermissionUser) -> bool: """Check if the user can create an experiment workspace from this.""" assert user is not None # enforced by decorator # Workers can not create experiment workspaces if context.worker_token: return False # Anonymous cannot create experiment workspaces if not user.is_authenticated: return False # Shortcut to avoid hitting the database for common cases if self.public: return True match self.context_has_role( user, (Workspace.Roles.OWNER, Workspace.Roles.CONTRIBUTOR), scope_roles=Scope.Roles.OWNER, ): case PartialCheckResult.ALLOW: return True case PartialCheckResult.DENY: return False case PartialCheckResult.PASS: pass case _ as unreachable: assert_never(unreachable) return ( Workspace.objects.can_create_experiment_workspace(user) .filter(pk=self.pk) .exists() )
[docs] def context_has_role( self, user: "User", roles: WorkspaceRoles | Sequence[WorkspaceRoles] = (), *, scope_roles: ScopeRoles | Sequence[ScopeRoles] = (), ) -> PartialCheckResult: """ Check user roles in the current context. :returns: * ALLOW if the context has enough information to determine that the user has at least one of the given roles * DENY if the context has enough information to determine that the user does not have any of the given roles * PASS if the context does not have enough information to decide """ if not roles and not scope_roles: raise ValueError( "context_has_role needs at least one workspace or scope role" ) if not roles: return self.scope.context_has_role(user, scope_roles) if ( scope_roles and self.scope.context_has_role(user, scope_roles) == PartialCheckResult.ALLOW ): return PartialCheckResult.ALLOW if context.user != user or context.workspace != self: return PartialCheckResult.PASS if isinstance(roles, WorkspaceRoles): roles = (roles,) for role in roles: if role in context.workspace_roles: return PartialCheckResult.ALLOW return PartialCheckResult.DENY
# See https://github.com/typeddjango/django-stubs/issues/1047 for the typing
[docs] def get_roles( self, user: Union["User", "AnonymousUser"] ) -> QuerySet["WorkspaceRole", "WorkspaceRoles"]: """Get the roles of the user on this workspace.""" if not user.is_authenticated: result = WorkspaceRole.objects.none().values_list("role", flat=True) else: result = ( self.roles.filter(group__users=user) .values_list("role", flat=True) .distinct() ) return cast(QuerySet["WorkspaceRole", "WorkspaceRoles"], result)
@property def expire_at(self) -> datetime | None: """Return computed expiration date.""" if self.expiration_delay is None: return None # Try to reuse a value precomputed by # WorkspaceQuerySet.with_expiration_time() # otherwise, compute it and set it if not hasattr(self, "expiration_time"): query = Workspace.objects.with_expiration_time().filter(pk=self.pk) # expiration_time is introduced by with_expiration_time but it's # currently hard to explain to the type system (expiration_time,) = query.values_list( "expiration_time", flat=True ) # type: ignore[misc] setattr(self, "expiration_time", expiration_time) return cast( datetime, self.expiration_time # type: ignore[attr-defined] )
[docs] def file_needs_upload(self, fileobj: File) -> bool: """ Return True if fileobj needs to be uploaded to this workspace. Before an artifact can be considered complete, all its files must be part of the artifact's workspace. This requires each file to be in one of its stores, and also to be in some artifact in the workspace. Otherwise, the file must be uploaded, even if it already exists somewhere else in debusine; this prevents users obtaining unauthorized access to existing file contents. """ from debusine.db.models import FileInArtifact if not self.scope.download_file_stores(fileobj).exists(): return True if not FileInArtifact.objects.filter( artifact__workspace=self, file=fileobj, complete=True ).exists(): return True return False
[docs] def set_inheritance(self, chain: Sequence["Workspace"]) -> None: """Set the inheritance chain for this workspace.""" # Check for duplicates in the chain before altering the database seen: set[int] = set() for workspace in chain: if workspace.pk in seen: raise ValueError( f"duplicate workspace {workspace.name!r}" " in inheritance chain" ) seen.add(workspace.pk) WorkspaceChain.objects.filter(child=self).delete() for idx, workspace in enumerate(chain): WorkspaceChain.objects.create( child=self, parent=workspace, order=idx )
[docs] def get_collection( self, *, # TODO: allow user to be None to mean take it from context? user: Union["User", "AnonymousUser"], category: str, name: str, visited: set[int] | None = None, ) -> "Collection": """ Lookup a collection by category and name. If the collection is not found in this workspace, it follows the workspace inheritance chain using a depth-first search. :param user: user to use for permission checking :param category: collection category :param name: collection name :param visited: for internal use only: state used during graph traversal :raises Collection.DoesNotExist: if the collection was not found """ from debusine.db.models import Collection # Ensure that the user can access this workspace if not self.can_display(user): raise Collection.DoesNotExist # Lookup in this workspace try: return Collection.objects.get( workspace=self, category=category, name=name ) except Collection.DoesNotExist: pass if visited is None: visited = set() visited.add(self.pk) # Follow the inheritance chain for node in self.chain_parents.order_by("order").select_related( "parent" ): workspace = node.parent # Break inheritance loops if workspace.pk in visited: continue try: return workspace.get_collection( user=user, category=category, name=name, visited=visited ) except Collection.DoesNotExist: pass raise Collection.DoesNotExist
[docs] def get_singleton_collection( self, *, user: Union["User", "AnonymousUser"], category: str ) -> "Collection": """ Lookup a singleton collection by category. If the collection is not found in this workspace, it follows the workspace inheritance chain using a depth-first search. :param user: user to use for permission checking :param category: collection category :raises Collection.DoesNotExist: if the collection was not found """ return self.get_collection(user=user, category=category, name="_")
def __str__(self) -> str: """Return basic information of Workspace.""" return f"{self.scope.name}/{self.name}"
class WorkspaceChain(models.Model):
    """Workspace chaining model."""

    child = models.ForeignKey(
        Workspace,
        on_delete=models.CASCADE,
        related_name="chain_parents",
        help_text="Workspace that falls back on `parent` for lookups",
    )
    parent = models.ForeignKey(
        Workspace,
        on_delete=models.CASCADE,
        related_name="chain_children",
        help_text="Workspace to be looked up if lookup in `child` fails",
    )
    order = models.IntegerField(
        help_text="Lookup order of this element in the chain",
    )

    class Meta(TypedModelMeta):
        constraints = [
            # A parent can appear only once in a child's chain
            UniqueConstraint(
                fields=["child", "parent"],
                name="%(app_label)s_%(class)s_unique_child_parent",
            ),
            # Each position in a child's chain is used only once
            UniqueConstraint(
                fields=["child", "order"],
                name="%(app_label)s_%(class)s_unique_child_order",
            ),
        ]

    def __str__(self) -> str:
        """Return a description of this chain element."""
        # NOTE(review): child and parent names are concatenated with no
        # separator; a separator character may have been lost — confirm.
        return f"{self.order}:{self.child.name}{self.parent.name}"


class WorkspaceRole(models.Model):
    """Role assignments for workspaces."""

    Roles: TypeAlias = WorkspaceRoles

    resource = models.ForeignKey(
        Workspace,
        on_delete=models.CASCADE,
        related_name="roles",
    )
    group = models.ForeignKey(
        "Group",
        on_delete=models.CASCADE,
        related_name="workspace_roles",
    )
    role = models.CharField(max_length=16, choices=Roles.choices)

    class Meta(TypedModelMeta):
        constraints = [
            # The same role is assigned to a group on a workspace only once
            UniqueConstraint(
                fields=["resource", "group", "role"],
                name="%(app_label)s_%(class)s_unique_resource_group_role",
            ),
        ]

    def __str__(self) -> str:
        """Return a description of the role assignment."""
        # NOTE(review): group, role and resource are concatenated with no
        # separator; separator characters may have been lost — confirm.
        return f"{self.group}{self.role}{self.resource}"