Addon: fix MyPy errors

This commit is contained in:
Sybren A. Stüvel 2022-03-14 16:38:06 +01:00
parent 7b090bdbd6
commit 33b5faff2b
4 changed files with 32 additions and 28 deletions

View File

@ -2,7 +2,7 @@
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from typing import Optional, Any
import logging
import queue
import threading
@ -19,10 +19,10 @@ shaman = wheels.load_wheel("blender_asset_tracer.pack.shaman")
log = logging.getLogger(__name__)
# For using in other parts of the add-on, so only this file imports BAT.
Aborted = pack.Aborted
FileTransferError = transfer.FileTransferError
parse_shaman_endpoint = shaman.parse_endpoint
# # For using in other parts of the add-on, so only this file imports BAT.
# Aborted = pack.Aborted
# FileTransferError = transfer.FileTransferError
# parse_shaman_endpoint = shaman.parse_endpoint
class Message:
@ -57,26 +57,27 @@ class MsgDone(Message):
missing_files: list[Path]
class BatProgress(progress.Callback):
# MyPy doesn't understand the way BAT subpackages are imported.
class BatProgress(progress.Callback): # type: ignore
"""Report progress of BAT Packing to the given queue."""
def __init__(self, queue: queue.Queue[Message]) -> None:
def __init__(self, queue: queue.SimpleQueue[Message]) -> None:
super().__init__()
self.queue = queue
def _set_attr(self, attr: str, value):
def _set_attr(self, attr: str, value: Any) -> None:
msg = MsgSetWMAttribute(attr, value)
self.queue.put(msg)
def _txt(self, msg: str):
def _txt(self, msg: str) -> None:
"""Set a text in a thread-safe way."""
self._set_attr("flamenco_bat_status_txt", msg)
def _status(self, status: str):
def _status(self, status: str) -> None:
"""Set the flamenco_bat_status property in a thread-safe way."""
self._set_attr("flamenco_bat_status", status)
def _progress(self, progress: int):
def _progress(self, progress: int) -> None:
"""Set the flamenco_bat_progress property in a thread-safe way."""
self._set_attr("flamenco_bat_progress", progress)
msg = MsgProgress(percentage=progress)
@ -93,7 +94,7 @@ class BatProgress(progress.Callback):
else:
self._txt("Pack of %s done" % output_blendfile.name)
def pack_aborted(self, reason: str):
def pack_aborted(self, reason: str) -> None:
self._txt("Aborted: %s" % reason)
self._status("ABORTED")
@ -157,9 +158,10 @@ class BatProgress(progress.Callback):
class PackThread(threading.Thread):
queue: queue.Queue[Message]
queue: queue.SimpleQueue[Message]
def __init__(self, packer: pack.Packer) -> None:
# MyPy doesn't understand the way BAT subpackages are imported.
def __init__(self, packer: pack.Packer) -> None: # type: ignore
# Quitting Blender should abort the transfer (instead of hanging until
# the transfer is done), hence daemon=True.
super().__init__(daemon=True, name="PackThread")
@ -221,15 +223,15 @@ _running_packthread: typing.Optional[PackThread] = None
_packer_lock = threading.RLock()
def copy(
def copy( # type: ignore
base_blendfile: Path,
project: Path,
target: str,
exclusion_filter: str,
*,
relative_only: bool,
packer_class=pack.Packer,
**packer_args
packer_class=pack.Packer, # type: ignore
**packer_args: dict[Any, Any],
) -> PackThread:
"""Use BAT to copy the given file and dependencies to the target location.
@ -248,7 +250,7 @@ def copy(
target,
compress=True,
relative_only=relative_only,
**packer_args
**packer_args,
)
if exclusion_filter:
filter_parts = exclusion_filter.strip().split(" ")

View File

@ -31,10 +31,12 @@ _job_type_enum_items: list[
Union[tuple[str, str, str], tuple[str, str, str, int, int]]
] = []
_selected_job_type_propgroup: Optional[job_types_propgroup.JobTypePropertyGroup] = None
_selected_job_type_propgroup: Optional[
type[job_types_propgroup.JobTypePropertyGroup]
] = None
def fetch_available_job_types(api_client, scene: bpy.types.Scene):
def fetch_available_job_types(api_client: _ApiClient, scene: bpy.types.Scene) -> None:
from flamenco.manager import ApiClient
from flamenco.manager.api import jobs_api
from flamenco.manager.model.available_job_types import AvailableJobTypes
@ -155,7 +157,7 @@ def job_for_scene(scene: bpy.types.Scene) -> Optional[_SubmittedJob]:
settings = JobSettings()
metadata = JobMetadata()
job = SubmittedJob(
job: SubmittedJob = SubmittedJob(
name=scene.flamenco_job_name,
type=settings_propgroup.job_type.name,
priority=50,
@ -166,7 +168,7 @@ def job_for_scene(scene: bpy.types.Scene) -> Optional[_SubmittedJob]:
return job
def _clear_available_job_types(scene: bpy.types.Scene):
def _clear_available_job_types(scene: bpy.types.Scene) -> None:
global _available_job_types
global _job_type_enum_items
@ -177,7 +179,7 @@ def _clear_available_job_types(scene: bpy.types.Scene):
scene.flamenco_available_job_types_json = ""
def _clear_job_type_propgroup():
def _clear_job_type_propgroup() -> None:
global _selected_job_type_propgroup
try:

View File

@ -178,7 +178,7 @@ def _job_setting_key_to_label(setting_key: str) -> str:
def _set_if_available(
some_dict: dict[object, object],
some_dict: dict[Any, Any],
setting: object,
key: str,
transform: Optional[Callable[[object], object]] = None,

View File

@ -109,18 +109,18 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator):
bl_description = "Pack the current blend file and send it to Flamenco"
bl_options = {"REGISTER"} # No UNDO.
job_name: bpy.props.StringProperty(name="Job Name")
job_name: bpy.props.StringProperty(name="Job Name") # type: ignore
timer: Optional[bpy.types.Timer] = None
packthread: Optional[_PackThread] = None
log = _log.getChild(bl_idname)
def invoke(self, context: bpy.types.Context, event) -> set[str]:
def invoke(self, context: bpy.types.Context, event: bpy.types.Event) -> set[str]:
filepath = self._save_blendfile(context)
return self._bat_pack(context, filepath)
def modal(self, context: bpy.types.Context, event) -> set[str]:
def modal(self, context: bpy.types.Context, event: bpy.types.Event) -> set[str]:
# This function is called for TIMER events to poll the BAT pack thread.
if event.type != "TIMER":
return {"PASS_THROUGH"}
@ -177,7 +177,7 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator):
if bat_interface.is_packing():
self.report({"ERROR"}, "Another packing operation is running")
self._quit()
self._quit(context)
return {"CANCELLED"}
# TODO: get project path from addon preferences / project definition on Manager.