Addon: Add worker cluster support

Worker clusters can be fetched from the Manager, and submitted jobs can
be assigned to those clusters.
This commit is contained in:
Sybren A. Stüvel 2023-04-04 12:19:44 +02:00
parent 3306c7fc8d
commit 2a6cbcf030
6 changed files with 141 additions and 4 deletions

View File

@ -19,7 +19,7 @@ from pathlib import Path
__is_first_load = "operators" not in locals() __is_first_load = "operators" not in locals()
if __is_first_load: if __is_first_load:
from . import operators, gui, job_types, comms, preferences from . import operators, gui, job_types, comms, preferences, worker_clusters
else: else:
import importlib import importlib
@ -28,6 +28,7 @@ else:
job_types = importlib.reload(job_types) job_types = importlib.reload(job_types)
comms = importlib.reload(comms) comms = importlib.reload(comms)
preferences = importlib.reload(preferences) preferences = importlib.reload(preferences)
worker_clusters = importlib.reload(worker_clusters)
import bpy import bpy
@ -145,6 +146,7 @@ def register() -> None:
) )
preferences.register() preferences.register()
worker_clusters.register()
operators.register() operators.register()
gui.register() gui.register()
job_types.register() job_types.register()
@ -162,4 +164,5 @@ def unregister() -> None:
job_types.unregister() job_types.unregister()
gui.unregister() gui.unregister()
operators.unregister() operators.unregister()
worker_clusters.unregister()
preferences.unregister() preferences.unregister()

View File

@ -43,6 +43,11 @@ class FLAMENCO_PT_job_submission(bpy.types.Panel):
col.prop(context.scene, "flamenco_job_name", text="Job Name") col.prop(context.scene, "flamenco_job_name", text="Job Name")
col.prop(context.scene, "flamenco_job_priority", text="Priority") col.prop(context.scene, "flamenco_job_priority", text="Priority")
# Worker cluster:
row = col.row(align=True)
row.prop(context.scene, "flamenco_worker_cluster", text="Cluster")
row.operator("flamenco.fetch_worker_clusters", text="", icon="FILE_REFRESH")
layout.separator() layout.separator()
col = layout.column() col = layout.column()

View File

@ -53,6 +53,11 @@ def job_for_scene(scene: bpy.types.Scene) -> Optional[_SubmittedJob]:
submitter_platform=platform.system().lower(), submitter_platform=platform.system().lower(),
type_etag=propgroup.job_type.etag, type_etag=propgroup.job_type.etag,
) )
worker_cluster: str = getattr(scene, "flamenco_worker_cluster", "")
if worker_cluster and worker_cluster != "-":
job.worker_cluster = worker_cluster
return job return job

View File

@ -10,7 +10,7 @@ from urllib3.exceptions import HTTPError, MaxRetryError
import bpy import bpy
from . import job_types, job_submission, preferences from . import job_types, job_submission, preferences, worker_clusters
from .job_types_propgroup import JobTypePropertyGroup from .job_types_propgroup import JobTypePropertyGroup
from .bat.submodules import bpathlib from .bat.submodules import bpathlib
@ -83,6 +83,37 @@ class FLAMENCO_OT_fetch_job_types(FlamencoOpMixin, bpy.types.Operator):
return {"FINISHED"} return {"FINISHED"}
class FLAMENCO_OT_fetch_worker_clusters(FlamencoOpMixin, bpy.types.Operator):
bl_idname = "flamenco.fetch_worker_clusters"
bl_label = "Fetch Worker Clusters"
bl_description = "Query Flamenco Manager to obtain the available worker clusters"
def execute(self, context: bpy.types.Context) -> set[str]:
api_client = self.get_api_client(context)
from flamenco.manager import ApiException
scene = context.scene
old_cluster = getattr(scene, "flamenco_worker_cluster", "")
try:
worker_clusters.refresh(context, api_client)
except ApiException as ex:
self.report({"ERROR"}, "Error getting job types: %s" % ex)
return {"CANCELLED"}
except MaxRetryError as ex:
# This is the common error, when for example the port number is
# incorrect and nothing is listening.
self.report({"ERROR"}, "Unable to reach Manager")
return {"CANCELLED"}
if old_cluster:
# TODO: handle cases where the old cluster no longer exists.
scene.flamenco_worker_cluster = old_cluster
return {"FINISHED"}
class FLAMENCO_OT_ping_manager(FlamencoOpMixin, bpy.types.Operator): class FLAMENCO_OT_ping_manager(FlamencoOpMixin, bpy.types.Operator):
bl_idname = "flamenco.ping_manager" bl_idname = "flamenco.ping_manager"
bl_label = "Flamenco: Ping Manager" bl_label = "Flamenco: Ping Manager"
@ -165,7 +196,9 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator):
if not context.blend_data.filepath: if not context.blend_data.filepath:
# The file path needs to be known before the file can be submitted. # The file path needs to be known before the file can be submitted.
self.report({"ERROR"}, "Please save your .blend file before submitting to Flamenco") self.report(
{"ERROR"}, "Please save your .blend file before submitting to Flamenco"
)
return {"CANCELLED"} return {"CANCELLED"}
filepath = self._save_blendfile(context) filepath = self._save_blendfile(context)
@ -633,6 +666,7 @@ class FLAMENCO3_OT_explore_file_path(bpy.types.Operator):
classes = ( classes = (
FLAMENCO_OT_fetch_job_types, FLAMENCO_OT_fetch_job_types,
FLAMENCO_OT_fetch_worker_clusters,
FLAMENCO_OT_ping_manager, FLAMENCO_OT_ping_manager,
FLAMENCO_OT_eval_setting, FLAMENCO_OT_eval_setting,
FLAMENCO_OT_submit_job, FLAMENCO_OT_submit_job,

View File

@ -34,6 +34,12 @@ def _manager_url_updated(prefs, context):
comms.ping_manager_with_report(context.window_manager, api_client, prefs) comms.ping_manager_with_report(context.window_manager, api_client, prefs)
class WorkerCluster(bpy.types.PropertyGroup):
id: bpy.props.StringProperty(name="id")
name: bpy.props.StringProperty(name="Name")
description: bpy.props.StringProperty(name="Description")
class FlamencoPreferences(bpy.types.AddonPreferences): class FlamencoPreferences(bpy.types.AddonPreferences):
bl_idname = "flamenco" bl_idname = "flamenco"
@ -71,6 +77,13 @@ class FlamencoPreferences(bpy.types.AddonPreferences):
get=lambda prefs: prefs.job_storage, get=lambda prefs: prefs.job_storage,
) )
worker_clusters: bpy.props.CollectionProperty( # type: ignore
type=WorkerCluster,
name="Worker Clusters",
description="Cache for the worker clusters available on the configured Manager",
options={"HIDDEN"},
)
def draw(self, context: bpy.types.Context) -> None: def draw(self, context: bpy.types.Context) -> None:
layout = self.layout layout = self.layout
layout.use_property_decorate = False layout.use_property_decorate = False
@ -117,7 +130,10 @@ def manager_url(context: bpy.types.Context) -> str:
return str(prefs.manager_url) return str(prefs.manager_url)
classes = (FlamencoPreferences,) classes = (
WorkerCluster,
FlamencoPreferences,
)
_register, _unregister = bpy.utils.register_classes_factory(classes) _register, _unregister = bpy.utils.register_classes_factory(classes)

View File

@ -0,0 +1,74 @@
# SPDX-License-Identifier: GPL-3.0-or-later
from typing import TYPE_CHECKING, Union
import bpy
from . import preferences
if TYPE_CHECKING:
from flamenco.manager import ApiClient as _ApiClient
else:
_ApiClient = object
_enum_items: list[Union[tuple[str, str, str], tuple[str, str, str, int, int]]] = []
def refresh(context: bpy.types.Context, api_client: _ApiClient) -> None:
"""Fetch the available worker clusters from the Manager."""
from flamenco.manager import ApiClient
from flamenco.manager.api import worker_mgt_api
from flamenco.manager.model.worker_cluster_list import WorkerClusterList
assert isinstance(api_client, ApiClient)
api = worker_mgt_api.WorkerMgtApi(api_client)
response: WorkerClusterList = api.fetch_worker_clusters()
# Store on the preferences, so a cached version persists until the next refresh.
prefs = preferences.get(context)
prefs.worker_clusters.clear()
for cluster in response.clusters:
rna_cluster = prefs.worker_clusters.add()
rna_cluster.id = cluster.id
rna_cluster.name = cluster.name
rna_cluster.description = getattr(cluster, "description", "")
def _get_enum_items(self, context):
global _enum_items
prefs = preferences.get(context)
_enum_items = [
("-", "No Cluster", "No cluster assigned, any worker can handle this job"),
]
_enum_items.extend(
(cluster.id, cluster.name, cluster.description)
for cluster in prefs.worker_clusters
)
return _enum_items
def register() -> None:
bpy.types.Scene.flamenco_worker_cluster = bpy.props.EnumProperty(
name="Worker Cluster",
items=_get_enum_items,
description="The set of Workers that can handle tasks of this job",
)
def unregister() -> None:
to_del = ((bpy.types.Scene, "flamenco_worker_cluster"),)
for ob, attr in to_del:
try:
delattr(ob, attr)
except AttributeError:
pass
if __name__ == "__main__":
import doctest
print(doctest.testmod())