Files
pre-commit-ci[bot] 3c8384a908 chore(fmt): auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2025-01-29 18:29:09 +00:00

433 lines
14 KiB
Python

"""Manim Slides' configuration tools."""
import json
import shutil
from functools import wraps
from inspect import Parameter, signature
from pathlib import Path
from textwrap import dedent
from typing import Any, Callable, Optional
import rtoml
from pydantic import (
BaseModel,
Field,
FilePath,
PositiveInt,
PrivateAttr,
ValidationError,
conset,
field_serializer,
field_validator,
model_validator,
)
from pydantic_extra_types.color import Color
from .logger import logger
# Type alias for a signal receiver: any callable. ``Signal.emit`` calls each
# connected receiver with the positional arguments it was given.
Receiver = Callable[..., Any]
class Signal(BaseModel):  # type: ignore[misc]
    """Hold a set of receivers and call every one of them upon emission."""

    # Private (non-field) storage; a set, so each receiver is kept at most once.
    __receivers: set[Receiver] = PrivateAttr(default_factory=set)

    def connect(self, receiver: Receiver) -> None:
        """
        Register a receiver on this signal.

        Connecting a receiver that is already registered has no effect.

        :param receiver: The callable to register.
        """
        self.__receivers.add(receiver)

    def disconnect(self, receiver: Receiver) -> None:
        """
        Unregister a receiver from this signal.

        Disconnecting a receiver that was never registered has no effect.

        :param receiver: The callable to unregister.
        """
        self.__receivers.discard(receiver)

    def emit(self, *args: Any) -> None:
        """
        Call every registered receiver with the given arguments.

        Receivers are stored in a set, so the call order is unspecified.

        :param args: Positional arguments forwarded to each receiver.
        """
        for callback in self.__receivers:
            callback(*args)
def key_id(name: str) -> PositiveInt:
    """
    Look up the Qt key code matching a key name.

    The code is read from the ``Key_<name>`` attribute of ``qtpy.QtCore.Qt``.

    :param name: The name of the key, e.g., 'Q'.
    :return: The corresponding id.
    """
    # Imported here, not at module level, to avoid importing Qt too early.
    from qtpy.QtCore import Qt

    attribute = f"Key_{name}"
    return getattr(Qt, attribute)
class Key(BaseModel):  # type: ignore[misc]
    """A non-empty set of key codes, optionally named, with its own signal."""

    # Key codes that trigger this key; at least one is required.
    ids: conset(PositiveInt, min_length=1)  # type: ignore[valid-type]
    # Optional human-readable name, used in the debug log of ``match``.
    name: Optional[str] = None
    # Private signal, exposed through ``signal`` and ``connect``.
    __signal: Signal = PrivateAttr(default_factory=Signal)

    def set_ids(self, *ids: int) -> None:
        """
        Replace the key codes of this key.

        :param ids: The new key codes.
        """
        self.ids = set(ids)

    def match(self, key_id: int) -> bool:
        """
        Tell whether a key id is one of this key's codes.

        A debug message is logged when the id matches.

        :param key_id: The key id to look for.
        :return: Whether the id belongs to this key.
        """
        if key_id not in self.ids:
            return False

        logger.debug(f"Pressed key: {self.name}")
        return True

    @property
    def signal(self) -> Signal:
        """The signal attached to this key."""
        return self.__signal

    def connect(self, function: Receiver) -> None:
        """
        Connect a receiver to this key's signal.

        :param function: The receiver to connect.
        """
        self.__signal.connect(function)

    @field_serializer("ids")
    def serialize_dt(self, ids: set[int]) -> list[int]:
        """Serialize the key codes as a list."""
        return [*self.ids]
class Keys(BaseModel):  # type: ignore[misc]
    """Mapping between presentation actions and the keys that trigger them."""

    QUIT: Key = Field(default_factory=lambda: Key(ids=[key_id("Q")], name="QUIT"))
    PLAY_PAUSE: Key = Field(
        default_factory=lambda: Key(ids=[key_id("Space")], name="PLAY / PAUSE")
    )
    NEXT: Key = Field(default_factory=lambda: Key(ids=[key_id("Right")], name="NEXT"))
    PREVIOUS: Key = Field(
        default_factory=lambda: Key(ids=[key_id("Left")], name="PREVIOUS")
    )
    REVERSE: Key = Field(default_factory=lambda: Key(ids=[key_id("V")], name="REVERSE"))
    REPLAY: Key = Field(default_factory=lambda: Key(ids=[key_id("R")], name="REPLAY"))
    FULL_SCREEN: Key = Field(
        default_factory=lambda: Key(ids=[key_id("F")], name="TOGGLE FULL SCREEN")
    )
    HIDE_MOUSE: Key = Field(
        default_factory=lambda: Key(ids=[key_id("H")], name="HIDE / SHOW MOUSE")
    )

    @model_validator(mode="before")
    @classmethod
    def ids_are_unique_across_keys(cls, values: Any) -> Any:
        """
        Validate that no key code is shared by two of the provided keys.

        This runs before field validation, so it only sees the raw input:
        keys that are not explicitly provided (and thus take their default
        value) are not part of the check.

        :param values: The raw input, usually a mapping from field names to
            either raw key data (a mapping with an ``"ids"`` entry) or
            :class:`Key` instances.
        :return: The input, unchanged.
        :raises ValueError: If two or more keys share a key code.
        """
        if not isinstance(values, dict):
            # E.g., an already-built instance: nothing to inspect here.
            return values

        seen: set[int] = set()

        for key in values.values():
            if isinstance(key, Key):
                key_ids = key.ids
            elif isinstance(key, dict) and "ids" in key:
                key_ids = key["ids"]
            else:
                # Malformed entry: skip it and let field validation report
                # a proper error instead of failing here.
                continue

            if not seen.isdisjoint(key_ids):
                raise ValueError(
                    "Two or more keys share a common key code: please make sure each key has distinct key codes"
                )

            seen.update(key_ids)

        return values

    def merge_with(self, other: "Keys") -> "Keys":
        """
        Merge the keys of another mapping into this one, in place.

        For each key, the key codes become the union of both sets of codes,
        and the other key's name is used unless it is empty or ``None``.

        :param other: The other keys to be merged with.
        :return: These keys, updated.
        """
        for key_name, key in self:
            other_key = getattr(other, key_name)
            # Keep ``ids`` a set, as declared on ``Key``.
            key.ids = set(key.ids) | set(other_key.ids)
            key.name = other_key.name or key.name

        return self

    def dispatch_key_function(self) -> Callable[[PositiveInt], None]:
        """
        Build a function that emits the signal of the key matching a key code.

        The lookup table is built once, when this method is called. If a key
        code belongs to several keys, the last key in field order wins.

        :return: A function taking a key code, that emits the signal of the
            corresponding key, or does nothing if no key has this code.
        """
        _dispatch: dict[int, Signal] = {}

        for _, key in self:
            for _id in key.ids:
                _dispatch[_id] = key.signal

        def dispatch(key: PositiveInt) -> None:
            if signal := _dispatch.get(key, None):
                signal.emit()

        return dispatch
class Config(BaseModel):  # type: ignore[misc]
    """General Manim Slides config."""

    keys: Keys = Field(default_factory=Keys)
    """The key mapping."""

    @classmethod
    def from_file(cls, path: Path) -> "Config":
        """
        Load and validate a configuration from a TOML file.

        :param path: The path where the config is read from.
        :return: The validated config.
        """
        data = rtoml.load(path)
        return cls.model_validate(data)  # type: ignore

    def to_file(self, path: Path) -> None:
        """
        Write this configuration to a TOML file.

        :param path: The path to save this config.
        """
        data = self.model_dump()
        rtoml.dump(data, path, pretty=True)

    def merge_with(self, other: "Config") -> "Config":
        """
        Merge another config into this one, in place.

        :param other: The other config to be merged with.
        :return: This config, updated.
        """
        merged_keys = self.keys.merge_with(other.keys)
        self.keys = merged_keys
        return self
class BaseSlideConfig(BaseModel):  # type: ignore
    """Options shared by every kind of slide config."""

    loop: bool = False
    """Whether this slide should loop."""
    auto_next: bool = False
    """Whether this slide is skipped upon completion."""
    playback_rate: float = 1.0
    """The speed at which the animation is played (1.0 is normal)."""
    reversed_playback_rate: float = 1.0
    """The speed at which the reversed animation is played."""
    notes: str = ""
    """The notes attached to this slide."""
    dedent_notes: bool = True
    """Whether to automatically remove any leading indentation in the notes."""
    # NOTE(review): not read anywhere in this module; presumably tells the
    # renderer to skip this slide's animations -- confirm against callers.
    skip_animations: bool = False
    # Optional path to an existing file; ``PreSlideConfig`` forbids combining
    # it with animations.
    src: Optional[FilePath] = None

    @classmethod
    def wrapper(cls, arg_name: str) -> Callable[..., Any]:
        """
        Create a decorator that gathers keyword arguments into an instance of this class.

        When the decorated function is called, an instance of this class is
        built from the keyword arguments and passed as ``<arg_name>``; the
        keyword arguments that are not fields of this class are forwarded
        untouched. The reported signature is updated to list the fields of
        this class as keyword-only parameters.

        The decorated function must follow two criteria:

        - its last parameter must be ``**kwargs`` (or equivalent);
        - and its second last parameter must be ``<arg_name>``.

        :param arg_name: The name of the argument.
        :return: The decorator.
        """

        # TODO: improve docs and (maybe) type-hints too
        def decorate(fun: Callable[..., Any]) -> Callable[..., Any]:
            @wraps(fun)
            def wrapped(*args: Any, **kwargs: Any) -> Any:
                # Forward only the keyword arguments this class does not own...
                forwarded = {
                    name: value
                    for name, value in kwargs.items()
                    if name not in cls.model_fields
                }
                # ...and hand the rest over as a single config instance.
                forwarded[arg_name] = cls(**kwargs)
                return fun(*args, **forwarded)

            original_signature = signature(fun)
            params = list(original_signature.parameters.values())

            # One keyword-only parameter per field of this class.
            field_params = [
                Parameter(
                    field_name,
                    Parameter.KEYWORD_ONLY,
                    default=field_info.default,
                    annotation=field_info.annotation,
                )
                for field_name, field_info in cls.model_fields.items()
            ]

            # Replace the second last parameter (``<arg_name>``) with them.
            params[-2:-1] = field_params

            wrapped.__signature__ = original_signature.replace(  # type: ignore[attr-defined]
                parameters=params
            )

            return wrapped

        return decorate

    @model_validator(mode="after")
    def apply_dedent_notes(
        self,
    ) -> "BaseSlideConfig":
        """
        Strip the common leading indentation from the notes, when enabled.

        :return: The config, optionally modified.
        """
        if not self.dedent_notes:
            return self

        self.notes = dedent(self.notes)
        return self
class PreSlideConfig(BaseSlideConfig):
    """Slide config to be used prior to rendering."""

    start_animation: int
    """The index of the first animation."""
    end_animation: int
    """The index after the last animation."""

    @classmethod
    def from_base_slide_config_and_animation_indices(
        cls,
        base_slide_config: BaseSlideConfig,
        start_animation: int,
        end_animation: int,
    ) -> "PreSlideConfig":
        """
        Create a config from a base config and animation indices.

        :param base_slide_config: The base config.
        :param start_animation: The index of the first animation.
        :param end_animation: The index after the last animation.
        :return: The new config, with all fields of the base config copied.
        """
        return cls(
            start_animation=start_animation,
            end_animation=end_animation,
            **base_slide_config.model_dump(),
        )

    @field_validator("start_animation", "end_animation")
    @classmethod
    def index_is_posint(cls, v: int) -> int:
        """
        Validate that animation indices are non-negative integers.

        :param v: An animation index.
        :return: The animation index, if valid.
        :raises ValueError: If the index is negative.
        """
        if v < 0:
            raise ValueError("Animation index (start or end) cannot be negative")
        return v

    @model_validator(mode="after")
    def start_animation_is_before_end(
        self,
    ) -> "PreSlideConfig":
        """
        Validate that the start animation index is not after the end index.

        Equal indices (zero animations) are accepted here, because this is a
        valid state for slides that have ``src`` set; that case is checked by
        :meth:`has_src_or_more_than_zero_animations`.

        :return: The config, if indices are valid.
        :raises ValueError: If the start index is greater than the end index.
        """
        if self.start_animation > self.end_animation:
            raise ValueError(
                "Start animation index must be strictly lower than end animation index"
            )
        return self

    @model_validator(mode="after")
    def has_src_or_more_than_zero_animations(
        self,
    ) -> "PreSlideConfig":
        """
        Validate that the slide has either ``src`` or at least one animation.

        :return: The config, if valid.
        :raises ValueError: If ``src`` is set together with one or more
            animations, or if neither ``src`` nor any animation is provided.
        """
        if self.src is not None and self.start_animation != self.end_animation:
            raise ValueError(
                "A slide cannot have 'src=...' and more than zero animations at the same time."
            )
        elif self.src is None and self.start_animation == self.end_animation:
            raise ValueError(
                "You have to play at least one animation (e.g., 'self.wait()') "
                "before pausing. If you want to start paused, use the appropriate "
                "command-line option when presenting. "
                "IMPORTANT: when using ManimGL, 'self.wait()' is not considered "
                "to be an animation, so prefer to directly use 'self.play(...)'."
            )
        return self

    @property
    def slides_slice(self) -> slice:
        """The slice going from the start (included) to the end (excluded) animation index."""
        return slice(self.start_animation, self.end_animation)
class SlideConfig(BaseSlideConfig):
    """Slide config to be used after rendering."""

    file: FilePath
    """The file containing the animation."""
    rev_file: FilePath
    """The file containing the reversed animation."""

    @classmethod
    def from_pre_slide_config_and_files(
        cls, pre_slide_config: PreSlideConfig, file: Path, rev_file: Path
    ) -> "SlideConfig":
        """
        Create a config from a pre-rendering config and the animation files.

        :param pre_slide_config: The config used prior to rendering.
        :param file: The file containing the animation.
        :param rev_file: The file containing the reversed animation.
        :return: The new config.
        """
        pre_slide_fields = pre_slide_config.model_dump()
        return cls(file=file, rev_file=rev_file, **pre_slide_fields)
class PresentationConfig(BaseModel):  # type: ignore[misc]
    """Presentation config that contains all necessary information for a presentation."""

    slides: list[SlideConfig] = Field(min_length=1)
    """The non-empty list of slide configs."""
    resolution: tuple[PositiveInt, PositiveInt] = (1920, 1080)
    """The resolution of the animation files."""
    background_color: Color = "black"
    """The background color of the animation files."""

    @classmethod
    def from_file(cls, path: Path) -> "PresentationConfig":
        """
        Read a presentation configuration from a JSON file.

        The ``file`` and ``rev_file`` entries of each slide are resolved
        relative to the grandparent folder of ``path``.

        :param path: The path where the config is read from.
        :return: The validated presentation config.
        """
        # JSON is UTF-8: do not depend on the platform's default encoding.
        with open(path, encoding="utf-8") as f:
            obj = json.load(f)

        slides = obj.setdefault("slides", [])
        parent = path.parent.parent  # Never fails, but parents[1] can fail

        for slide in slides:
            if file := slide.get("file", None):
                slide["file"] = parent / file

            if rev_file := slide.get("rev_file", None):
                slide["rev_file"] = parent / rev_file

        return cls.model_validate(obj)  # type: ignore

    def to_file(self, path: Path) -> None:
        """
        Dump the presentation configuration to a JSON file.

        :param path: The path to save this config.
        """
        # The JSON dump may contain non-ASCII characters (e.g., in notes), so
        # the encoding is set explicitly to match what ``from_file`` reads.
        with open(path, "w", encoding="utf-8") as f:
            f.write(self.model_dump_json(indent=2))

    def copy_to(
        self,
        folder: Path,
        use_cached: bool = True,
        include_reversed: bool = True,
        prefix: str = "",
    ) -> None:
        """
        Copy the animation files of every slide to a given directory.

        This config is not modified: slides keep pointing to the original files.

        :param folder: The folder that will contain the animation files.
        :param use_cached: Whether caching should be used to avoid copies when possible.
        :param include_reversed: Whether to also copy reversed animation to the folder.
        :param prefix: Optional prefix added to each file name.
        """
        for slide_config in self.slides:
            file = slide_config.file
            rev_file = slide_config.rev_file

            dest = folder / f"{prefix}{file.name}"
            rev_dest = folder / f"{prefix}{rev_file.name}"

            if not use_cached or not dest.exists():
                shutil.copy(file, dest)

            if include_reversed and (not use_cached or not rev_dest.exists()):
                # TODO: if include_reversed is False, then rev_dest will likely not exist
                # and this will cause an issue when decoding.
                shutil.copy(rev_file, rev_dest)