# Copyright 2024 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Run mergechanges on multiple uploads."""
import re
from pathlib import Path
from typing import Any
import debusine.utils
from debusine.artifacts import Upload
from debusine.artifacts.models import ArtifactCategory, CollectionCategory
from debusine.client.models import RelationType
from debusine.tasks import BaseTaskWithExecutor, RunCommandTask
from debusine.tasks.models import MergeUploadsData, MergeUploadsDynamicData
from debusine.tasks.server import TaskDatabaseInterface
[docs]
class MergeUploads(
    RunCommandTask[MergeUploadsData, MergeUploadsDynamicData],
    BaseTaskWithExecutor[MergeUploadsData, MergeUploadsDynamicData],
):
    """
    Combines multiple debian:upload artifacts into a single one.

    This is in preparation for uploading them together.
    """

    TASK_VERSION = 1

    # Target .changes file: use the CAPTURE_OUTPUT_FILENAME mechanic.
    # We could use `mergechanges -f`, which automatically names the
    # output .changes, but it does that inside the container's
    # download_directory, which we can't easily scan and grab from after
    # the process finishes. So we use a static, predictable output filename.
    CAPTURE_OUTPUT_FILENAME = "multi.changes"

    # Matches the trailing "_<arch>.changes" field of a .changes file name.
    # The field is "anything but an underscore" because architecture names
    # may contain digits and hyphens (amd64, i386, kfreebsd-amd64, ...), not
    # only lowercase letters.
    _CHANGES_ARCH_SUFFIX_RE = re.compile(r"_[^_]+\.changes$")

    def __init__(
        self,
        task_data: dict[str, Any],
        dynamic_task_data: dict[str, Any] | None = None,
    ) -> None:
        """
        Initialize (constructor).

        :param task_data: raw task data, forwarded to the parent classes.
        :param dynamic_task_data: raw dynamic task data (optional),
          forwarded to the parent classes.
        """
        super().__init__(task_data, dynamic_task_data)

        # Paths of the input .changes files, in the order they are passed
        # to mergechanges. Starts empty; NOTE(review): presumably filled in
        # by a configuration step that is not visible in this part of the
        # file -- confirm.
        self._changes_files: list[Path] = []

    def compute_dynamic_data(
        self, task_database: TaskDatabaseInterface
    ) -> MergeUploadsDynamicData:
        """
        Resolve artifact lookups for this task.

        :param task_database: interface used to resolve the environment
          and the input upload lookups.
        :return: dynamic data holding the resolved environment id and the
          ids of the input upload artifacts.
        """
        environment_id = self.get_environment(
            task_database,
            self.data.environment,
            default_category=CollectionCategory.ENVIRONMENTS,
        )
        input_uploads_ids = task_database.lookup_multiple_artifacts(
            self.data.input.uploads
        )
        return MergeUploadsDynamicData(
            environment_id=environment_id,
            input_uploads_ids=input_uploads_ids,
        )

    def _cmdline(self) -> list[str]:
        """
        Build full mergechanges command line.

        :return: ``mergechanges`` followed by every path in
          ``self._changes_files``, in order.
        """
        return ['mergechanges', *(str(path) for path in self._changes_files)]

    def upload_artifacts(
        self, execute_directory: Path, *, execution_success: bool  # noqa: U100
    ) -> None:
        """
        Create DebianUpload artifact and relationships.

        The captured output file is moved next to the first input .changes
        file and renamed with a ``_multi.changes`` suffix, then uploaded; the
        uploaded artifact is related to every input upload with an
        ``EXTENDS`` relation.

        :param execute_directory: directory containing the captured
          ``CAPTURE_OUTPUT_FILENAME`` file.
        :param execution_success: unused.
        :raises AssertionError: if ``self.debusine`` is not set.
        """
        if not self.debusine:
            raise AssertionError("self.debusine not set")
        assert self.dynamic_data
        assert self.executor_instance

        captured_changes = execute_directory / self.CAPTURE_OUTPUT_FILENAME

        # Move to source directory so Upload can validate referenced files.
        # Rename following mergechanges' "multi" suffix convention.
        ref_path = self._changes_files[0]
        target_filename = self._CHANGES_ARCH_SUFFIX_RE.sub(
            '_multi.changes', ref_path.name
        )
        target_changes_file = captured_changes.rename(
            ref_path.with_name(target_filename)
        )

        changes_artifact = Upload.create(changes_file=target_changes_file)
        changes_uploaded = self.debusine.upload_artifact(
            changes_artifact,
            workspace=self.workspace_name,
            work_request=self.work_request_id,
        )

        # Record that the merged upload extends each of its inputs.
        for input_upload_id in self.dynamic_data.input_uploads_ids:
            self.debusine.relation_create(
                changes_uploaded.id,
                input_upload_id,
                RelationType.EXTENDS,
            )

    def get_label(self) -> str:
        """Return the task label."""
        return "merge package uploads"