# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Utility functions for workflows."""
import functools
from collections.abc import Collection as AbcCollection
from collections.abc import Iterable, Sequence
from typing import Any, TYPE_CHECKING
from debusine.artifacts import SourcePackage
from debusine.artifacts.models import (
ArtifactCategory,
CollectionCategory,
DebianBinaryPackage,
DebianBinaryPackages,
DebianPackageBuildLog,
DebianSourcePackage,
DebianUpload,
get_source_package_name,
)
from debusine.client.models import LookupChildType
from debusine.db.models import Artifact, ArtifactRelation, CollectionItem
from debusine.server.collections.lookup import (
LookupResult,
lookup_multiple,
lookup_single,
reconstruct_lookup,
)
from debusine.server.workflows.base import ArtifactHasNoArchitecture
from debusine.tasks import BaseTask, TaskConfigError
from debusine.tasks.models import LookupMultiple, LookupSingle
if TYPE_CHECKING:
from debusine.server.workflows import Workflow
[docs]
@functools.lru_cache(maxsize=100)
def source_package(workflow: "Workflow[Any, Any]") -> Artifact:
    """
    Look up the source package artifact that a workflow operates on.

    The lookup is taken from ``workflow.data.input.source_artifact`` when
    ``workflow.data`` has an ``input`` attribute, and from
    ``workflow.data.source_artifact`` otherwise.

    The resolved artifact is passed through
    :py:func:`locate_debian_source_package`, so a
    :ref:`debian:upload <artifact-upload>` is replaced by its
    :ref:`debian:source-package <artifact-source-package>`.

    Results are memoized per ``workflow`` argument (up to 100 entries).
    """
    if not hasattr(workflow.data, "input"):
        # Flat task data: the lookup lives directly on ``workflow.data``.
        assert hasattr(workflow.data, "source_artifact")
        configuration_key = "source_artifact"
        lookup = workflow.data.source_artifact
    else:
        # Nested task data: the lookup lives under ``workflow.data.input``.
        configuration_key = "input.source_artifact"
        lookup = workflow.data.input.source_artifact

    work_request = workflow.work_request
    found = lookup_single(
        lookup,
        workflow.workspace,
        user=work_request.created_by,
        workflow_root=work_request.get_workflow_root(),
        expect_type=LookupChildType.ARTIFACT,
    )
    return locate_debian_source_package(configuration_key, found.artifact)
[docs]
@functools.lru_cache(maxsize=100)
def source_package_data(workflow: "Workflow[Any, Any]") -> DebianSourcePackage:
    """
    Return the data of the workflow's source package artifact.

    The artifact is obtained with :py:func:`source_package` and its ``data``
    is converted with ``SourcePackage.create_data``. Results are memoized
    per ``workflow`` argument (up to 100 entries).
    """
    artifact = source_package(workflow)
    return SourcePackage.create_data(artifact.data)
[docs]
def lookup_result_architecture(result: LookupResult) -> str:
    """
    Return the architecture described by a lookup result.

    When the result carries an artifact, the architecture is read from the
    artifact data: the ``architecture`` attribute for
    ``DebianBinaryPackages``, the ``Architecture`` entry of ``deb_fields``
    for ``DebianBinaryPackage`` and the ``Architecture`` entry of
    ``changes_fields`` for ``DebianUpload``. Otherwise the ``architecture``
    key of the collection item data is used.

    :param result: lookup result to inspect.
    :return: the architecture string.
    :raises ArtifactHasNoArchitecture: the artifact data is of another type.
    :raises ValueError: the result has neither artifact nor collection
      item, or the collection item has no string ``architecture``.
    """
    architecture: str | None
    artifact = result.artifact

    if artifact is None:
        item = result.collection_item
        if item is None:
            raise ValueError(
                "Unexpected result: must have collection_item or artifact"
            )
        architecture = item.data.get("architecture")
        if not isinstance(architecture, str):
            raise ValueError(
                f"Cannot determine architecture for lookup result: {result}"
            )
        return architecture

    artifact_data = artifact.create_data()
    if isinstance(artifact_data, DebianBinaryPackages):
        architecture = artifact_data.architecture
    elif isinstance(artifact_data, DebianBinaryPackage):
        architecture = artifact_data.deb_fields.get("Architecture")
    elif isinstance(artifact_data, DebianUpload):
        architecture = artifact_data.changes_fields.get("Architecture")
    else:
        raise ArtifactHasNoArchitecture(f"{type(artifact_data)}")
    assert isinstance(architecture, str)
    return architecture
[docs]
def filter_artifact_lookup_by_arch(
    workflow: "Workflow[Any, Any]",
    lookup: LookupMultiple,
    architectures: Iterable[str],
) -> LookupMultiple:
    """
    Filter an artifact lookup by architecture.

    The lookup is resolved with :py:func:`lookup_multiple` (artifacts or
    promises), and only the results whose architecture, as reported by
    :py:func:`lookup_result_architecture`, is in ``architectures`` are
    kept.

    :param workflow: workflow supplying the workspace, the user and the
      workflow root used to resolve ``lookup``.
    :param lookup: lookup to resolve and filter.
    :param architectures: architectures to keep. Any iterable is accepted,
      including one-shot iterators and generators.
    :return: a lookup parsed from the sorted reconstructed lookups of the
      matching results.
    :raises ArtifactHasNoArchitecture: propagated from
      :py:func:`lookup_result_architecture`.
    :raises ValueError: propagated from
      :py:func:`lookup_result_architecture`.
    """
    # ``architectures`` is only promised to be iterable. A one-shot iterator
    # would be consumed by the first ``in`` test and silently reject every
    # later result, so materialize it once. Real collections are used as-is
    # to keep their own membership semantics.
    wanted: AbcCollection[str]
    if isinstance(architectures, AbcCollection):
        wanted = architectures
    else:
        wanted = frozenset(architectures)

    workflow_root = workflow.work_request.get_workflow_root()
    results = lookup_multiple(
        lookup,
        workflow.workspace,
        user=workflow.work_request.created_by,
        workflow_root=workflow_root,
        expect_type=LookupChildType.ARTIFACT_OR_PROMISE,
    )
    relevant: list[LookupSingle] = [
        reconstruct_lookup(result, workflow_root=workflow_root)
        for result in results
        if lookup_result_architecture(result) in wanted
    ]
    return LookupMultiple.parse_obj(sorted(relevant))
[docs]
def get_architectures(
    workflow: "Workflow[Any, Any]", lookup: LookupMultiple
) -> set[str]:
    """
    Collect the architectures of everything matched by the lookup.

    The lookup is resolved with :py:func:`lookup_multiple` (artifacts or
    promises) and each result is mapped to its architecture with
    :py:func:`lookup_result_architecture`.

    :param workflow: workflow supplying the workspace, the user and the
      workflow root used to resolve ``lookup``.
    :param lookup: lookup to resolve.
    :return: the set of architectures found.
    """
    work_request = workflow.work_request
    found: set[str] = set()
    for result in lookup_multiple(
        lookup,
        workflow.workspace,
        user=work_request.created_by,
        workflow_root=work_request.get_workflow_root(),
        expect_type=LookupChildType.ARTIFACT_OR_PROMISE,
    ):
        found.add(lookup_result_architecture(result))
    return found
[docs]
def locate_debian_source_package(
    configuration_key: str, artifact: Artifact
) -> Artifact:
    """
    Resolve a workflow input artifact to a debian:source-package.

    Only :ref:`debian:source-package <artifact-source-package>` and
    :ref:`debian:upload <artifact-upload>` artifacts are accepted; the
    category is validated with ``BaseTask.ensure_artifact_categories``.

    :param configuration_key: key reported by the category check when the
      artifact has an unexpected category.
    :param artifact: artifact to resolve.
    :return: ``artifact`` itself for a source package; for an upload, the
      source package that it extends.
    """
    category = artifact.category
    BaseTask.ensure_artifact_categories(
        configuration_key=configuration_key,
        category=category,
        expected=[ArtifactCategory.SOURCE_PACKAGE, ArtifactCategory.UPLOAD],
    )

    if category == ArtifactCategory.SOURCE_PACKAGE:
        return artifact
    if category == ArtifactCategory.UPLOAD:
        return follow_artifact_relation(
            artifact,
            ArtifactRelation.Relations.EXTENDS,
            ArtifactCategory.SOURCE_PACKAGE,
        )
    # The category check above should make this unreachable.
    raise AssertionError(  # pragma: no cover
        f"Unexpected artifact category: {category}"
    )
[docs]
def follow_artifact_relation(
    artifact: Artifact,
    relation_type: ArtifactRelation.Relations,
    category: ArtifactCategory,
) -> Artifact:
    """
    Return the single related artifact of the given category.

    :param artifact: artifact whose relations are searched.
    :param relation_type: type that the relation must have.
    :param category: category that the relation target must have.
    :return: the target of the only matching relation.
    :raises TaskConfigError: no relation matches, or several do.
    """
    try:
        match = artifact.relations.get(
            type=relation_type, target__category=category
        )
    except ArtifactRelation.DoesNotExist:
        not_found = (
            f"Unable to find an artifact of category {category.value} with "
            f'a relationship of type {relation_type} from "{artifact}"'
        )
        raise TaskConfigError(not_found)
    except ArtifactRelation.MultipleObjectsReturned:
        ambiguous = (
            f"Multiple artifacts of category {category.value} with "
            f'a relationship of type {relation_type} from "{artifact}" '
            f"found"
        )
        raise TaskConfigError(ambiguous)
    else:
        return match.target
[docs]
def locate_debian_source_package_lookup(
    workflow: "Workflow[Any, Any]", configuration_key: str, lookup: LookupSingle
) -> LookupSingle:
    """
    Turn a lookup into one that points at a debian:source-package.

    The lookup is resolved to an artifact. For a
    :ref:`debian:upload <artifact-upload>`, the related
    :ref:`debian:source-package <artifact-source-package>` is located and
    a ``<id>@artifacts`` lookup to it is returned. For anything else the
    artifact must be a source package, and the original lookup is returned
    unchanged.

    :param workflow: workflow supplying the workspace, the user and the
      workflow root used to resolve ``lookup``.
    :param configuration_key: key reported by the category check when the
      artifact has an unexpected category.
    :param lookup: lookup to resolve.
    :return: a lookup to a source package artifact.
    """
    work_request = workflow.work_request
    artifact = lookup_single(
        lookup,
        workflow.workspace,
        user=work_request.created_by,
        workflow_root=work_request.get_workflow_root(),
        expect_type=LookupChildType.ARTIFACT,
    ).artifact

    if artifact.category != ArtifactCategory.UPLOAD:
        # Not an upload: it has to be a source package already.
        BaseTask.ensure_artifact_categories(
            configuration_key=configuration_key,
            category=artifact.category,
            expected=[ArtifactCategory.SOURCE_PACKAGE],
        )
        return lookup

    resolved = locate_debian_source_package(configuration_key, artifact)
    return f"{resolved.id}@artifacts"
[docs]
def get_source_package_names(
    results: Sequence[LookupResult],
    *,
    configuration_key: str,
    artifact_expected_categories: AbcCollection[ArtifactCategory],
) -> list[str]:
    """
    Extract the sorted, de-duplicated source package names from results.

    Each result is handled according to its ``result_type``:

    - Artifact: its category must be one of
      ``artifact_expected_categories``; the name is computed from the
      artifact data with ``get_source_package_name``.
    - Anything else is treated as a promise: its ``promise_category`` must
      be one of ``artifact_expected_categories``, and the name is taken
      from ``source_package_name`` in the promise data when it is present.

    :param results: lookup results to process; each one is expected to be
      either an artifact or a promise.
    :param configuration_key: key passed to
      :py:meth:`BaseTask.ensure_artifact_categories` for its error
      message.
    :param artifact_expected_categories: categories that artifacts and
      promises are allowed to have.
    :return: sorted list of the source package names found.
    """
    names = set()
    for result in results:
        # lookup_multiple expect_type: only artifacts or promises
        if result.result_type == CollectionItem.Types.ARTIFACT:
            artifact = result.artifact
            assert artifact is not None
            BaseTask.ensure_artifact_categories(
                configuration_key=configuration_key,
                category=artifact.category,
                expected=artifact_expected_categories,
            )
            artifact_data = artifact.create_data()
            assert isinstance(
                artifact_data,
                (
                    DebianSourcePackage,
                    DebianUpload,
                    DebianBinaryPackage,
                    DebianBinaryPackages,
                    DebianPackageBuildLog,
                ),
            )
            names.add(get_source_package_name(artifact_data))
            continue

        # Not an artifact, so it has to be a promise (a bare item).
        assert result.result_type == CollectionItem.Types.BARE
        item = result.collection_item
        assert item is not None
        BaseTask.ensure_artifact_categories(
            configuration_key=configuration_key,
            category=item.data["promise_category"],
            expected=artifact_expected_categories,
        )
        package_name = item.data.get("source_package_name")
        if package_name is not None:
            names.add(package_name)
    return sorted(names)
[docs]
def get_available_architectures(
    workflow: "Workflow[Any, Any]", *, vendor: str, codename: str
) -> set[str]:
    """
    Return the architectures usable with the given vendor and codename.

    Artifacts are looked up in the ``vendor`` collection (environments
    category by default) with ``data__codename`` set to ``codename``; the
    ``architecture`` entry of each artifact's data is collected, and
    ``"all"`` is added to the result.

    :param workflow: workflow supplying the workspace and the user used
      for the lookup.
    :param vendor: name of the collection to search.
    :param codename: codename that matching items must have.
    :return: the set of architectures found, plus ``"all"``.
    """
    query = LookupMultiple.parse_obj(
        {"collection": vendor, "data__codename": codename}
    )
    environments = lookup_multiple(
        query,
        workflow.workspace,
        user=workflow.work_request.created_by,
        default_category=CollectionCategory.ENVIRONMENTS,
        expect_type=LookupChildType.ARTIFACT,
    )
    architectures = {
        environment.artifact.data.get("architecture")
        for environment in environments
    }
    architectures.add("all")
    return architectures