Refactor old work

This commit is contained in:
Jérome Eertmans
2022-07-12 17:11:32 +02:00
parent ddeb20646d
commit f3d2c4e731
10 changed files with 232 additions and 114 deletions

3
manim_slides/__init__.py Normal file
View File

@ -0,0 +1,3 @@
from .slide import Slide, ThreeDSlide
__version__ = "3.0.0"

27
manim_slides/commons.py Normal file
View File

@ -0,0 +1,27 @@
import click
from .defaults import CONFIG_PATH
def config_path_option(function):
return click.option(
"-c",
"--config",
"config_path",
default=CONFIG_PATH,
type=click.Path(dir_okay=False),
help="Set path to configuration file.",
)(function)
def config_options(function):
function = config_path_option(function)
function = click.option(
"-f", "--force", is_flag=True, help="Overwrite any existing configuration file."
)(function)
function = click.option(
"-m",
"--merge",
is_flag=True,
help="Merge any existing configuration file with the new configuration.",
)(function)
return function

48
manim_slides/config.py Normal file
View File

@ -0,0 +1,48 @@
from typing import Optional, Set
from pydantic import BaseModel, root_validator, validator
from .defaults import LEFT_ARROW_KEY_CODE, RIGHT_ARROW_KEY_CODE
class Key(BaseModel):
ids: Set[int]
name: Optional[str] = None
@validator("ids", each_item=True)
def id_is_posint(cls, v: int):
if v < 0:
raise ValueError("Key ids cannot be negative integers")
return v
def match(self, key_id: int):
return key_id in self.ids
class Config(BaseModel):
QUIT: Key = Key(ids=[ord("q")], name="QUIT")
CONTINUE: Key = Key(ids=[RIGHT_ARROW_KEY_CODE], name="CONTINUE / NEXT")
BACK: Key = Key(ids=[LEFT_ARROW_KEY_CODE], name="BACK")
REWIND: Key = Key(ids=[ord("r")], name="REWIND")
PLAY_PAUSE: Key = Key(ids=[32], name="PLAY / PAUSE")
@root_validator
def ids_are_unique_across_keys(cls, values):
ids = set()
for key in values.values():
if len(ids.intersection(key.ids)) != 0:
raise ValueError(
f"Two or more keys share a common key code: please make sure each key has distinc key codes"
)
ids.update(key.ids)
return values
def merge_with(self, other: "Config") -> "Config":
for key_name, key in self:
other_key = getattr(other, key_name)
key.ids.update(other_key.ids)
key.name = other_key.name or key.name
return self

11
manim_slides/defaults.py Normal file
View File

@ -0,0 +1,11 @@
import platform
FOLDER_PATH: str = "./slides"
CONFIG_PATH: str = ".manim-slides.json"
if platform.system() == "Windows":
RIGHT_ARROW_KEY_CODE = 2555904
LEFT_ARROW_KEY_CODE = 2424832
else:
RIGHT_ARROW_KEY_CODE = 65363
LEFT_ARROW_KEY_CODE = 65361

20
manim_slides/main.py Normal file
View File

@ -0,0 +1,20 @@
import click
from click_default_group import DefaultGroup
from . import __version__
from .present import present
from .wizard import wizard, init
@click.group(cls=DefaultGroup, default="present", default_if_no_args=True)
@click.version_option(__version__, "-v", "--version")
@click.help_option("-h", "--help")
def cli():
pass
cli.add_command(present)
cli.add_command(wizard)
cli.add_command(init)
if __name__ == "__main__":
cli()

292
manim_slides/present.py Normal file
View File

@ -0,0 +1,292 @@
import json
import math
import os
import sys
import time
from enum import Enum
import click
import cv2
import numpy as np
from .config import Config
from .defaults import CONFIG_PATH, FOLDER_PATH
from .commons import config_path_option
class State(Enum):
PLAYING = 0
PAUSED = 1
WAIT = 2
END = 3
def __str__(self):
if self.value == 0: return "Playing"
if self.value == 1: return "Paused"
if self.value == 2: return "Wait"
if self.value == 3: return "End"
return "..."
def now():
return round(time.time() * 1000)
def fix_time(x):
return x if x > 0 else 1
class Presentation:
def __init__(self, config, last_frame_next: bool = False):
self.last_frame_next = last_frame_next
self.slides = config["slides"]
self.files = config["files"]
self.lastframe = []
self.caps = [None for _ in self.files]
self.reset()
self.add_last_slide()
def add_last_slide(self):
last_slide_end = self.slides[-1]["end_animation"]
last_animation = len(self.files)
self.slides.append(dict(
start_animation = last_slide_end,
end_animation = last_animation,
type = "last",
number = len(self.slides) + 1,
terminated = False
))
def reset(self):
self.current_animation = 0
self.load_this_cap(0)
self.current_slide_i = 0
self.slides[-1]["terminated"] = False
def next(self):
if self.current_slide["type"] == "last":
self.current_slide["terminated"] = True
else:
self.current_slide_i = min(len(self.slides) - 1, self.current_slide_i + 1)
self.rewind_slide()
def prev(self):
self.current_slide_i = max(0, self.current_slide_i - 1)
self.rewind_slide()
def rewind_slide(self):
self.current_animation = self.current_slide["start_animation"]
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
def load_this_cap(self,cap_number):
if self.caps[cap_number] == None:
# unload other caps
for i in range(len(self.caps)):
if self.caps[i] != None:
self.caps[i].release()
self.caps[i] = None
# load this cap
self.caps[cap_number] = cv2.VideoCapture(self.files[cap_number])
@property
def current_slide(self):
return self.slides[self.current_slide_i]
@property
def current_cap(self):
self.load_this_cap(self.current_animation)
return self.caps[self.current_animation]
@property
def fps(self):
return self.current_cap.get(cv2.CAP_PROP_FPS)
# This function updates the state given the previous state.
# It does this by reading the video information and checking if the state is still correct.
# It returns the frame to show (lastframe) and the new state.
def update_state(self, state):
if state == State.PAUSED:
if len(self.lastframe) == 0:
_, self.lastframe = self.current_cap.read()
return self.lastframe, state
still_playing, frame = self.current_cap.read()
if still_playing:
self.lastframe = frame
elif state in [state.WAIT, state.PAUSED]:
return self.lastframe, state
elif self.current_slide["type"] == "last" and self.current_slide["terminated"]:
return self.lastframe, State.END
if not still_playing:
if self.current_slide["end_animation"] == self.current_animation + 1:
if self.current_slide["type"] == "slide":
# To fix "it always ends one frame before the animation", uncomment this.
# But then clears on the next slide will clear the stationary after this slide.
if self.last_frame_next:
self.load_this_cap(self.next_cap)
self.next_cap = self.caps[self.current_animation + 1]
self.next_cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
_, self.lastframe = self.next_cap.read()
state = State.WAIT
elif self.current_slide["type"] == "loop":
self.current_animation = self.current_slide["start_animation"]
state = State.PLAYING
self.rewind_slide()
elif self.current_slide["type"] == "last":
self.current_slide["terminated"] = True
elif self.current_slide["type"] == "last" and self.current_slide["end_animation"] == self.current_animation:
state = State.WAIT
else:
# Play next video!
self.current_animation += 1
self.load_this_cap(self.current_animation)
# Reset video to position zero if it has been played before
self.current_cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
return self.lastframe, state
class Display:
def __init__(self, presentations, config, start_paused=False, fullscreen=False):
self.presentations = presentations
self.start_paused = start_paused
self.config = config
self.state = State.PLAYING
self.lastframe = None
self.current_presentation_i = 0
self.lag = 0
self.last_time = now()
if fullscreen:
cv2.namedWindow("Video", cv2.WND_PROP_FULLSCREEN)
cv2.setWindowProperty("Video", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
@property
def current_presentation(self):
return self.presentations[self.current_presentation_i]
def run(self):
while True:
self.lastframe, self.state = self.current_presentation.update_state(self.state)
if self.state == State.PLAYING or self.state == State.PAUSED:
if self.start_paused:
self.state = State.PAUSED
self.start_paused = False
if self.state == State.END:
if self.current_presentation_i == len(self.presentations) - 1:
self.quit()
else:
self.current_presentation_i += 1
self.state = State.PLAYING
self.handle_key()
self.show_video()
self.show_info()
def show_video(self):
self.lag = now() - self.last_time
self.last_time = now()
cv2.imshow("Video", self.lastframe)
def show_info(self):
info = np.zeros((130, 420), np.uint8)
font_args = (cv2.FONT_HERSHEY_SIMPLEX, 0.7, 255)
grid_x = [30, 230]
grid_y = [30, 70, 110]
cv2.putText(
info,
f"Animation: {self.current_presentation.current_animation}",
(grid_x[0], grid_y[0]),
*font_args
)
cv2.putText(
info,
f"State: {self.state}",
(grid_x[1], grid_y[0]),
*font_args
)
cv2.putText(
info,
f"Slide {self.current_presentation.current_slide['number']}/{len(self.current_presentation.slides)}",
(grid_x[0], grid_y[1]),
*font_args
)
cv2.putText(
info,
f"Slide Type: {self.current_presentation.current_slide['type']}",
(grid_x[1], grid_y[1]),
*font_args
)
cv2.putText(
info,
f"Scene {self.current_presentation_i + 1}/{len(self.presentations)}",
((grid_x[0]+grid_x[1])//2, grid_y[2]),
*font_args
)
cv2.imshow("Info", info)
def handle_key(self):
sleep_time = math.ceil(1000/self.current_presentation.fps)
key = cv2.waitKeyEx(fix_time(sleep_time - self.lag))
if self.config.QUIT.match(key):
self.quit()
elif self.state == State.PLAYING and self.config.PLAY_PAUSE.match(key):
self.state = State.PAUSED
elif self.state == State.PAUSED and self.config.PLAY_PAUSE.match(key):
self.state = State.PLAYING
elif self.state == State.WAIT and (self.config.CONTINUE.match(key) or self.config.PLAY_PAUSE.match(key)):
self.current_presentation.next()
self.state = State.PLAYING
elif self.state == State.PLAYING and self.config.CONTINUE.match(key):
self.current_presentation.next()
elif self.config.BACK.match(key):
if self.current_presentation.current_slide_i == 0:
self.current_presentation_i = max(0, self.current_presentation_i - 1)
self.current_presentation.reset()
self.state = State.PLAYING
else:
self.current_presentation.prev()
self.state = State.PLAYING
elif self.config.REWIND.match(key):
self.current_presentation.rewind_slide()
self.state = State.PLAYING
def quit(self):
cv2.destroyAllWindows()
sys.exit()
@click.command()
@click.argument("scenes", nargs=-1)
@config_path_option
@click.option("--folder", default=FOLDER_PATH, type=click.Path(exists=True, file_okay=False), help="Set slides folder.")
@click.option("--start-paused", is_flag=True, help="Start paused.")
@click.option("--fullscreen", is_flag=True, help="Fullscreen mode.")
@click.option("--last-frame-next", is_flag=True, help="Show the next animation first frame as last frame (hack).")
@click.help_option("-h", "--help")
def present(scenes, config_path, folder, start_paused, fullscreen, last_frame_next):
"""Present the different scenes"""
presentations = list()
for scene in scenes:
config_file = os.path.join(folder, f"{scene}.json")
if not os.path.exists(config_file):
raise Exception(f"File {config_file} does not exist, check the scene name and make sure to use Slide as your scene base class")
config = json.load(open(config_file))
presentations.append(Presentation(config, last_frame_next=last_frame_next))
if os.path.exists(config_path):
config = Config.parse_file(config_path)
else:
config = Config()
display = Display(presentations, config=config, start_paused=start_paused, fullscreen=fullscreen)
display.run()

89
manim_slides/slide.py Normal file
View File

@ -0,0 +1,89 @@
import json
import os
import shutil
from manim import Scene, ThreeDScene, config
from .defaults import FOLDER_PATH
class Slide(Scene):
def __init__(self, *args, output_folder=FOLDER_PATH, **kwargs):
super(Slide, self).__init__(*args, **kwargs)
self.output_folder = output_folder
self.slides = list()
self.current_slide = 1
self.current_animation = 0
self.loop_start_animation = None
self.pause_start_animation = 0
def play(self, *args, **kwargs):
super(Slide, self).play(*args, **kwargs)
self.current_animation += 1
def pause(self):
self.slides.append(dict(
type="slide",
start_animation=self.pause_start_animation,
end_animation=self.current_animation,
number=self.current_slide
))
self.current_slide += 1
self.pause_start_animation = self.current_animation
def start_loop(self):
assert self.loop_start_animation is None, "You cannot nest loops"
self.loop_start_animation = self.current_animation
def end_loop(self):
assert self.loop_start_animation is not None, "You have to start a loop before ending it"
self.slides.append(dict(
type="loop",
start_animation=self.loop_start_animation,
end_animation=self.current_animation,
number=self.current_slide
))
self.current_slide += 1
self.loop_start_animation = None
self.pause_start_animation = self.current_animation
def render(self, *args, **kwargs):
# We need to disable the caching limit since we rely on intermidiate files
max_files_cached = config["max_files_cached"]
config["max_files_cached"] = float("inf")
super(Slide, self).render(*args, **kwargs)
config["max_files_cached"] = max_files_cached
if not os.path.exists(self.output_folder):
os.mkdir(self.output_folder)
files_folder = os.path.join(self.output_folder, "files")
if not os.path.exists(files_folder):
os.mkdir(files_folder)
scene_name = type(self).__name__
scene_files_folder = os.path.join(files_folder, scene_name)
if os.path.exists(scene_files_folder):
shutil.rmtree(scene_files_folder)
if not os.path.exists(scene_files_folder):
os.mkdir(scene_files_folder)
files = list()
for src_file in self.renderer.file_writer.partial_movie_files:
dst_file = os.path.join(scene_files_folder, os.path.basename(src_file))
shutil.copyfile(src_file, dst_file)
files.append(dst_file)
f = open(os.path.join(self.output_folder, "%s.json" % (scene_name, )), "w")
json.dump(dict(
slides=self.slides,
files=files
), f)
f.close()
class ThreeDSlide(ThreeDScene, Slide):
pass

71
manim_slides/wizard.py Normal file
View File

@ -0,0 +1,71 @@
import os
import sys
import click
import cv2
import numpy as np
from .config import Config
from .commons import config_options
from .defaults import CONFIG_PATH
def prompt(question: str) -> int:
font_args = (cv2.FONT_HERSHEY_SIMPLEX, 0.7, 255)
display = np.zeros((130, 420), np.uint8)
cv2.putText(display, "* Manim Slides Wizard *", (70, 33), *font_args)
cv2.putText(display, question, (30, 85), *font_args)
cv2.imshow("Manim Slides Configuration Wizard", display)
return cv2.waitKeyEx(-1)
@click.command()
@config_options
def wizard(config_path, force, merge):
"""Launch configuration wizard."""
return _init(config_path, force, merge, skip_interactive=False)
@click.command()
@config_options
def init(config_path, force, merge, skip_interactive=False):
"""Initialize a new default configuration file."""
return _init(config_path, force, merge, skip_interactive=True)
def _init(config_path, force, merge, skip_interactive=False):
if os.path.exists(config_path):
click.secho(f"The `{CONFIG_PATH}` configuration file exists")
if not force and not merge:
choice = click.prompt("Do you want to continue and (o)verwrite / (m)erge it, or (q)uit?", type=click.Choice(["o", "m", "q"], case_sensitive=False))
force = choice == "o"
merge = choice == "m"
if force:
click.secho("Overwriting.")
elif merge:
click.secho("Merging.")
else:
click.secho("Exiting.")
sys.exit(0)
config = Config()
if not skip_interactive:
prompt("Press any key to continue")
for _, key in config:
key.ids = [prompt(f"Press the {key.name} key")]
if merge:
config = Config.parse_file(config_path).merge_with(config)
with open(config_path, "w") as config_file:
config_file.write(config.json(indent=4))
click.echo(f"Configuration file successfully save to `{config_path}`")