From b021b21d88e4bf60c407776097327d085b8908ec Mon Sep 17 00:00:00 2001 From: Mikhail Podgurskiy Date: Wed, 25 Nov 2020 07:33:03 +0600 Subject: [PATCH] Opensouse forms and fsm --- tests/fsm/__init__.py | 0 tests/fsm/test_fsm__advanced.py | 107 ++++++++ tests/fsm/test_fsm__basics.py | 97 ++++++++ tests/fsm/test_fsm__conditions.py | 42 ++++ tests/fsm/test_fsm__inheritance.py | 116 +++++++++ tests/fsm/test_fsm__model.py | 63 +++++ tests/fsm/test_fsm__permissions.py | 46 ++++ tests/test_middleware.py | 50 ++++ tests/test_this_object.py | 47 ++++ tests/test_utils__viewprop.py | 22 ++ viewflow/__init__.py | 26 ++ viewflow/apps.py | 8 + viewflow/conf.py | 53 ++++ viewflow/forms/__init__.py | 5 + viewflow/forms/renderers.py | 384 ++++++++++++++++++++++++++++ viewflow/fsm/__init__.py | 9 + viewflow/fsm/base.py | 386 +++++++++++++++++++++++++++++ viewflow/fsm/typing.py | 8 + viewflow/middleware.py | 82 ++++++ viewflow/templatetags/viewflow.py | 41 +++ viewflow/this_object.py | 49 ++++ 21 files changed, 1641 insertions(+) create mode 100644 tests/fsm/__init__.py create mode 100644 tests/fsm/test_fsm__advanced.py create mode 100644 tests/fsm/test_fsm__basics.py create mode 100644 tests/fsm/test_fsm__conditions.py create mode 100644 tests/fsm/test_fsm__inheritance.py create mode 100644 tests/fsm/test_fsm__model.py create mode 100644 tests/fsm/test_fsm__permissions.py create mode 100644 tests/test_middleware.py create mode 100644 tests/test_this_object.py create mode 100644 tests/test_utils__viewprop.py create mode 100644 viewflow/apps.py create mode 100644 viewflow/conf.py create mode 100644 viewflow/forms/__init__.py create mode 100644 viewflow/forms/renderers.py create mode 100644 viewflow/fsm/__init__.py create mode 100644 viewflow/fsm/base.py create mode 100644 viewflow/fsm/typing.py create mode 100644 viewflow/middleware.py create mode 100644 viewflow/this_object.py diff --git a/tests/fsm/__init__.py b/tests/fsm/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/fsm/test_fsm__advanced.py b/tests/fsm/test_fsm__advanced.py new file mode 100644 index 0000000..1c19428 --- /dev/null +++ b/tests/fsm/test_fsm__advanced.py @@ -0,0 +1,107 @@ +"""FSM tests for advanced source/target options""" + +from django.test import TestCase +from django.utils.translation import gettext_lazy as _ +from viewflow.fsm import State +from .test_fsm__basics import ReviewState + + +class Publication(object): + stage = State( + ReviewState, + default=ReviewState.NEW + ) + + def __init__(self, text): + self.text = text + + @stage.transition(source=ReviewState.NEW) + def notify(self): + pass + + @stage.transition( + source={ReviewState.NEW, ReviewState.HIDDEN}, + target=ReviewState.PUBLISHED + ) + def publish(self): + pass + + @stage.transition(source=ReviewState.NEW, target=ReviewState.PUBLISHED) + @stage.transition(source=ReviewState.PUBLISHED, target=ReviewState.NEW, label=_('Return to new')) + def toggle(self): + pass + toggle.label = _('Toggle publication state') + + @stage.transition(source=ReviewState.PUBLISHED, target=ReviewState.REJECTED) + def trash(self): + if len(self.text) > 1000: + self.hide() + else: + self.remove() + + @stage.transition( + source=ReviewState.REJECTED, + target=ReviewState.REMOVED + ) + def remove(self): + pass + + @stage.transition( + source=ReviewState.REJECTED, + target=ReviewState.HIDDEN + ) + def hide(self): + pass + + +class Test(TestCase): # noqa: D101 + def test_no_target_transition(self): + publication = Publication(text='test') + publication.notify() + self.assertEqual(publication.stage, ReviewState.NEW) + + def test_big_publication_process(self): + publication = Publication(text='test' * 251) + self.assertEqual(publication.stage, ReviewState.NEW) + + publication.publish() + self.assertEqual(publication.stage, ReviewState.PUBLISHED) + + publication.trash() + self.assertEqual(publication.stage, ReviewState.HIDDEN) + + def test_small_publication_process(self): + publication = Publication(text='test' * 249) + self.assertEqual(publication.stage, ReviewState.NEW) + + publication.publish() + self.assertEqual(publication.stage, ReviewState.PUBLISHED) + + publication.trash() + self.assertEqual(publication.stage, ReviewState.REMOVED) + + def test_available_transitions(self): + self.assertEqual([ + (transition.target, transition.slug) + for transition in Publication.stage.get_outgoing_transitions(ReviewState.NEW) + ], [ + (None, 'notify'), + (ReviewState.PUBLISHED, 'publish'), + (ReviewState.PUBLISHED, 'toggle') + ]) + + self.assertEqual([ + (transition.target, transition.slug) + for transition in Publication.stage.get_outgoing_transitions(ReviewState.PUBLISHED) + ], [ + (ReviewState.NEW, 'toggle'), + (ReviewState.REJECTED, 'trash') + ]) + + self.assertEqual([ + (transition.target, transition.slug) + for transition in Publication.stage.get_outgoing_transitions(ReviewState.REJECTED) + ], [ + (ReviewState.HIDDEN, 'hide'), + (ReviewState.REMOVED, 'remove') + ]) diff --git a/tests/fsm/test_fsm__basics.py b/tests/fsm/test_fsm__basics.py new file mode 100644 index 0000000..3df429e --- /dev/null +++ b/tests/fsm/test_fsm__basics.py @@ -0,0 +1,97 @@ +from unittest import TestCase +from viewflow import fsm +from django.db.models import TextChoices +from django.utils.translation import gettext_lazy as _ + + +class ReviewState(TextChoices): # noqa:D100 + NEW = 'NEW', _('New') + APPROVED = 'APPROVED', _('Approved') + REJECTED = 'REJECTED', _('Rejected') + PUBLISHED = 'PUBLISHED', _('Published') + HIDDEN = 'HIDDEN', _('Hidden') + REMOVED = 'REMOVED', _('Removed') + + +class Publication(object): # noqa:D100 + stage = fsm.State(ReviewState, default=ReviewState.NEW) + + def __init__(self, text): + self.text = text + + @stage.transition( + source=ReviewState.NEW, + target=ReviewState.PUBLISHED + ) + def publish(self): + pass + + @stage.transition( + source=fsm.State.ANY, + target=ReviewState.REMOVED + ) + def remove(self): + pass + + @stage.transition( + source=ReviewState.PUBLISHED, + target=ReviewState.HIDDEN + ) + def hide(self): + pass + + +# REST +# Admin +# Viewset + + +class Test(TestCase): # noqa:D100 + def setUp(self): + self.publication = Publication(text='test publication') + self.assertEqual(self.publication.stage, ReviewState.NEW) + + def test_direct_modifications_not_allowed(self): + with self.assertRaises(AttributeError): + self.publication.stage = ReviewState.REMOVED + + def test_method_transitions_list(self): + transitions = { + (transition.source, transition.target) + for transition in self.publication.publish.get_transitions() + } + self.assertEqual({ + (ReviewState.NEW, ReviewState.PUBLISHED), + }, transitions) + + def test_field_transitions(self): + transitions = { + (transition.source, transition.target) + for transitions in Publication.stage.get_transitions().values() + for transition in transitions + } + self.assertEqual({ + (ReviewState.NEW, ReviewState.PUBLISHED), + (ReviewState.PUBLISHED, ReviewState.HIDDEN), + (fsm.State.ANY, ReviewState.REMOVED), + }, transitions) + + def test_allowed_transitions_succeed(self): + self.publication.publish() + self.assertEqual(self.publication.stage, ReviewState.PUBLISHED) + + self.publication.remove() + self.assertEqual(self.publication.stage, ReviewState.REMOVED) + + def test_prohibited_transition_failed(self): + self.assertRaises(fsm.TransitionNotAllowed, self.publication.hide) + + def test_transitions_can_proceed(self): + self.assertTrue(self.publication.publish.can_proceed()) + self.assertTrue(self.publication.remove.can_proceed()) + self.assertFalse(self.publication.hide.can_proceed()) + + self.publication.publish() + self.assertFalse(self.publication.publish.can_proceed()) + self.assertTrue(self.publication.remove.can_proceed()) + self.assertTrue(self.publication.hide.can_proceed()) diff --git a/tests/fsm/test_fsm__conditions.py b/tests/fsm/test_fsm__conditions.py new file mode 100644 index 0000000..1f05fb9 --- /dev/null +++ b/tests/fsm/test_fsm__conditions.py @@ -0,0 +1,42 @@ +"""Tests for @transition(conditions=...) parameter.""" + +from django.test import TestCase +from viewflow import this +from viewflow.fsm import State, TransitionNotAllowed +from .test_fsm__basics import ReviewState + + +class _Publication(object): + stage = State( + ReviewState, + default=ReviewState.NEW + ) + + def __init__(self, text): + self.text = text + + @stage.transition( + source=ReviewState.NEW, + target=ReviewState.PUBLISHED, + conditions=[this.is_long] + ) + def publish(self): + pass + + def is_long(self): + return State.CONDITION( + len(self.text) > 1000, + unmet="Review is too short" + ) + + +class _Test(TestCase): + def test_conditions_met(self): + publication = _Publication('test' * 251) + publication.publish() + + def test_unmet_conditions_raises_exception(self): + publication = _Publication('test' * 249) + with self.assertRaises(TransitionNotAllowed): + publication.publish() + diff --git a/tests/fsm/test_fsm__inheritance.py b/tests/fsm/test_fsm__inheritance.py new file mode 100644 index 0000000..83c7506 --- /dev/null +++ b/tests/fsm/test_fsm__inheritance.py @@ -0,0 +1,116 @@ +from django.utils.translation import gettext_lazy as _ +from unittest import TestCase +from viewflow import fsm, this +from .test_fsm__basics import Publication, ReviewState + + +class GuestPublication(Publication): + @Publication.stage.transition( + source=ReviewState.NEW, + target=ReviewState.APPROVED, + permission=this.is_approver + ) + def approve(self): + pass + + @Publication.stage.transition( + source=ReviewState.NEW, + target=ReviewState.REJECTED, + permission=this.is_approver + ) + def reject(self): + pass + + @Publication.stage.transition( + source=ReviewState.HIDDEN, + target=ReviewState.PUBLISHED, + permission=this.is_superuser + ) + @Publication.stage.transition( + source=ReviewState.APPROVED, + target=ReviewState.PUBLISHED + ) + def publish(self): + pass + + @Publication.stage.transition( + source=fsm.State.ANY, + target=ReviewState.HIDDEN, + conditions=[this.is_short] + ) + def hide(self): + pass + + @Publication.stage.super() + def remove(self): + super().remove.original() + + def is_superuser(self, user): + return user.is_superuser + + def is_approver(self, user): + return user.is_staff + is_approver.unmet_message = _("You have no staff rights") + + def is_short(self): + text_length = len(self.text) + return fsm.State.UNMET( + text_length < 1000, + _("Review is too shot, add %d symbols") % 1000 - text_length + ) + + +class Test(TestCase): + def setUp(self): + self.publication = GuestPublication(text='test publication') + self.assertEqual(self.publication.stage, ReviewState.NEW) + + def test_method_transitions_list(self): + transitions = { + (transition.source, transition.target) + for transition in self.publication.publish.get_transitions() + } + self.assertEqual({ + (ReviewState.APPROVED, ReviewState.PUBLISHED), + (ReviewState.HIDDEN, ReviewState.PUBLISHED), + }, transitions) + + def test_field_transitions(self): + transitions = { + (transition.source, transition.target) + for transitions in GuestPublication.stage.get_transitions().values() + for transition in transitions + } + self.assertEqual({ + (ReviewState.NEW, ReviewState.APPROVED), + (ReviewState.NEW, ReviewState.REJECTED), + (ReviewState.APPROVED, ReviewState.PUBLISHED), + (ReviewState.HIDDEN, ReviewState.PUBLISHED), + (fsm.State.ANY, ReviewState.HIDDEN), + (fsm.State.ANY, ReviewState.REMOVED), + }, transitions) + + def test_redefined_transition_failed(self): + self.assertRaises(fsm.TransitionNotAllowed, self.publication.publish) + + def test_approvement_workflow_succeed(self): + self.publication.approve() + self.assertEqual(self.publication.stage, ReviewState.APPROVED) + + self.publication.publish() + self.assertEqual(self.publication.stage, ReviewState.PUBLISHED) + + self.publication.remove() + self.assertEqual(self.publication.stage, ReviewState.REMOVED) + + def test_rejection_workflow_succeed(self): + self.publication.reject() + self.assertEqual(self.publication.stage, ReviewState.REJECTED) + + self.assertRaises(fsm.TransitionNotAllowed, self.publication.publish) + + def test_transition_permission(self): + pass + + def test_transition_condition(self): + pass diff --git a/tests/fsm/test_fsm__model.py b/tests/fsm/test_fsm__model.py new file mode 100644 index 0000000..1ca53ed --- /dev/null +++ b/tests/fsm/test_fsm__model.py @@ -0,0 +1,63 @@ +from django.db import models +from django.test import TestCase +from viewflow import fsm +from .test_fsm__basics import ReviewState + + +class Report(models.Model): + text = models.TextField() + stage = models.CharField(max_length=150) + + +class ReportReview(object): + stage = fsm.State(ReviewState, default=ReviewState.NEW) + + def __init__(self, report): + self.report = report + + @stage.setter() + def _set_report_stage(self, value): + self.report.stage = value + + @stage.getter() + def _get_report_stage(self): + return self.report.stage + + @stage.on_success() + def _on_transition_success(self, descriptor, source, target): + self.report.save() + + @stage.transition( + source=ReviewState.NEW, + target=ReviewState.PUBLISHED + ) + def publish(self): + pass + + @stage.transition( + source=fsm.State.ANY, + target=ReviewState.REMOVED + ) + def remove(self): + pass + + +class Test(TestCase): + def test_default_state_of_new_object(self): + report = Report(text="test") + review = ReportReview(report) + self.assertEqual(review.stage, ReviewState.NEW) + + def test_state_of_existing_object(self): + report = Report.objects.create(stage=ReviewState.PUBLISHED, text="test") + review = ReportReview(report) + self.assertEqual(review.stage, ReviewState.PUBLISHED) + + def test_state_success_called(self): + report = Report(text="test") + review = ReportReview(report) + review.publish() + + self.assertEqual(review.stage, ReviewState.PUBLISHED) + self.assertEqual(report.stage, ReviewState.PUBLISHED) + self.assertTrue(report.pk is not None) diff --git a/tests/fsm/test_fsm__permissions.py b/tests/fsm/test_fsm__permissions.py new file mode 100644 index 0000000..3c901d6 --- /dev/null +++ b/tests/fsm/test_fsm__permissions.py @@ -0,0 +1,46 @@ +"""Test for permission check.""" + +from django.contrib.auth.models import User +from django.test import TestCase +from viewflow import this +from viewflow.fsm import State +from .test_fsm__basics import ReviewState + + +class _Publication(object): + stage = State(ReviewState, default=ReviewState.NEW) + + @stage.transition( + source=ReviewState.NEW, + target=ReviewState.REMOVED, + permission=this.can_remove_review + ) + def remove(self): + pass + + def can_remove_review(self, user): + return State.CONDITION(user.is_staff, unmet="Only staff users can delete reviews") + + +class _Test(TestCase): + def setUp(self): + self.privileged_user = User.objects.create( + username='privileged', + is_staff=True + ) + + self.unprivileged_user = User.objects.create( + username='unprivileged', + is_staff=False + ) + + def test_callable_permission(self): + publication = _Publication() + + self.assertTrue( + publication.remove.has_perm(self.privileged_user) + ) + + self.assertFalse( + publication.remove.has_perm(self.unprivileged_user) + ) diff --git a/tests/test_middleware.py b/tests/test_middleware.py new file mode 100644 index 0000000..9152eb1 --- /dev/null +++ b/tests/test_middleware.py @@ -0,0 +1,50 @@ +from django.test import TestCase, override_settings +from django.urls import path +from django.views.generic import TemplateView +from viewflow.urls import AppMenuMixin, Application, Site, Viewset, route + + +class NestedViewset(Viewset): + app_name = 'nested' + page_url = path('page/', TemplateView.as_view(template_name='viewflow/base.html'), name="page") + + +class CityViewset(AppMenuMixin, Viewset): + index_url = path('', TemplateView.as_view(template_name='viewflow/base.html'), name="index") + nested_url = route('nested', NestedViewset()) + + +site = Site(title="Test site", items=[ + Application(app_name="test", items=[ + CityViewset(), + ]) +]) + +urlpatterns = [ + path('', site.urls) +] + + +@override_settings(ROOT_URLCONF=__name__) +class Test(TestCase): + def test_context_injected(self): + response = self.client.get('/test/city/') + match = response.wsgi_request.resolver_match + + self.assertTrue(hasattr(match, 'site')) + self.assertEqual(match.site, site) + + self.assertTrue(hasattr(match, 'app')) + self.assertEqual(match.app, site._children[0]) + + # App -> Site -> CityViewset + self.assertTrue(hasattr(match, 'viewset')) + self.assertEqual(match.viewset, site._children[0]._children[0]) + + def test_nested_viewset_injected(self): + response = self.client.get('/test/city/nested/page/') + match = response.wsgi_request.resolver_match + + # App -> Site -> CityViewset -> Nested + self.assertTrue(hasattr(match, 'viewset')) + self.assertEqual(match.viewset, site._children[0]._children[0].nested_url.viewset) diff --git a/tests/test_this_object.py b/tests/test_this_object.py new file mode 100644 index 0000000..d0bf5a5 --- /dev/null +++ b/tests/test_this_object.py @@ -0,0 +1,47 @@ +import functools +from django.test import TestCase +from viewflow import this + + +class Review(object): + approver = 'Will Smith' + publisher = 'John Doe' + + def approve(self): + return 'approve' + + def publish(self): + return 'publish' + + def _this_owner(self, transition): + if transition == self.approve: + return self.approver + elif transition == self.publish: + return self.publisher + else: + raise ValueError(f"Can't find owner for {transition}") + + def _this_call(self, transition): + def call_transition(self, transition): + return transition() + return functools.partial(call_transition, self, transition) + + + +class Test(TestCase): + def test_this_refs_data(self): + self.assertEqual(this.some_name.name, 'some_name') + self.assertEqual(this.another_some_name.name, 'another_some_name') + + def test_this_ref_resolve(self): + review = Review() + + approve = this.approve.resolve(review) + self.assertEqual(approve, review.approve) + self.assertEqual(Review.approver, this.approve.owner.resolve(review)) + self.assertEqual(this.approve.call.resolve(review)(), 'approve') + + publish = this.publish.resolve(review) + self.assertEqual(publish, review.publish) + self.assertEqual(Review.publisher, this.publish.owner.resolve(review)) + self.assertEqual(this.publish.call.resolve(review)(), 'publish') diff --git a/tests/test_utils__viewprop.py b/tests/test_utils__viewprop.py new file mode 100644 index 0000000..bf64ea7 --- /dev/null +++ b/tests/test_utils__viewprop.py @@ -0,0 +1,22 @@ +from django.test import TestCase +from viewflow import viewprop + + +class Viewset(object): # noqa : D101 + def __init__(self, view=None): + if view is not None: + self.view = view + + @viewprop + def view(self): + return 'default' + + +class Test(TestCase): # noqa: D101 + def test_default_viewprop_value(self): + viewset = Viewset() + self.assertEqual(viewset.view, 'default') + + def test_redefine_viewprop_succeed(self): + viewset = Viewset(view='new value') + self.assertEqual(viewset.view, 'new value') diff --git a/viewflow/__init__.py b/viewflow/__init__.py index e69de29..e641115 100644 --- a/viewflow/__init__.py +++ b/viewflow/__init__.py @@ -0,0 +1,26 @@ +"""Viewflow - dev toolkit for backoffice automation.""" +from django.conf import settings as django_settings +from .conf import settings +from .this_object import this +from .utils import viewprop, Icon, DEFAULT + +__title__ = 'Django-Viewflow' +__version__ = '2.0a1' +__author__ = 'Mikhail Podgurskiy' +__license__ = 'AGPL' +__copyright__ = 'Copyright 2018 Mikhail Podgurskiy' + +__all__ = ( + 'this', 'viewprop', 'Icon', 'DEFAULT' +) + +default_app_config = 'viewflow.apps.ViewflowConfig' + +if settings.AUTOREGISTER: + # Register site middleware + site_middleware = 'viewflow.middleware.SiteMiddleware' + if site_middleware not in django_settings.MIDDLEWARE: + django_settings.MIDDLEWARE += (site_middleware, ) + turbolinks_middleware = 'viewflow.middleware.TurbolinksMiddleware' + if turbolinks_middleware not in django_settings.MIDDLEWARE: + django_settings.MIDDLEWARE += (turbolinks_middleware, ) diff --git a/viewflow/apps.py b/viewflow/apps.py new file mode 100644 index 0000000..b03d9b8 --- /dev/null +++ b/viewflow/apps.py @@ -0,0 +1,8 @@ +from django.apps import AppConfig + + +class ViewflowConfig(AppConfig): + """Default application config.""" + + name = 'viewflow' + label = 'viewflow_base' # allow to user 'viewflow' label for 'viewflow.workflow' diff --git a/viewflow/conf.py b/viewflow/conf.py new file mode 100644 index 0000000..25a3c6d --- /dev/null +++ b/viewflow/conf.py @@ -0,0 +1,53 @@ +""" +Settings for Viewflow are all namespaced in the VIEWFLOW setting. +For example your project's `settings.py` file might look like this: + +VIEWFLOW = { + 'WIDGET_RENDERERS': { + 'django.forms.DateTimeInput': 'myapp.renderers.MyDateTimeRenderer' + } +} +""" + +from copy import deepcopy + +from django.conf import settings as django_settings +from django.test.signals import setting_changed +from django.utils.module_loading import import_string + +from viewflow.forms import renderers + + +DEFAULTS = { + 'AUTOREGISTER': True, + 'WIDGET_RENDERERS': renderers.WIDGET_RENDERERS, +} + + +class Settings(object): + def __init__(self, custom=None): + if custom is None: + custom = getattr(django_settings, 'VIEWFLOW', {}) + self.settings = deepcopy(DEFAULTS) + + for key, value in custom.get('WIDGET_RENDERERS', {}): + widget_class, renderer_class = import_string(key), import_string(value) + self.settings['WIDGET_RENDERERS'][widget_class] = renderer_class() + + def __getattr__(self, attr): + if attr not in self.settings: + raise AttributeError("Invalid viewflow setting: '%s'" % attr) + return self.settings[attr] + + +settings = Settings() + + +def reload_settings(*args, **kwargs): + global settings + setting, value = kwargs['setting'], kwargs['value'] + if setting == 'VIEWFLOW': + settings = Settings(value) + + +setting_changed.connect(reload_settings) diff --git a/viewflow/forms/__init__.py b/viewflow/forms/__init__.py new file mode 100644 index 0000000..0c80bd3 --- /dev/null +++ b/viewflow/forms/__init__.py @@ -0,0 +1,5 @@ +from .renderers import Column, Fieldset, Layout, FormLayout, LayoutNode, Row, Stacked, Span + +__all__ = ( + 'Column', 'Fieldset', 'FormLayout', 'Layout', 'LayoutNode', 'Row', 'Stacked', 'Span', +) diff --git a/viewflow/forms/renderers.py b/viewflow/forms/renderers.py new file mode 100644 index 0000000..4b63558 --- /dev/null +++ b/viewflow/forms/renderers.py @@ -0,0 +1,384 @@ +import json +import re +from typing import List, Type, Union +from functools import lru_cache +from xml.etree import ElementTree + +from django import forms +from django.core.serializers.json import DjangoJSONEncoder +from django.utils.encoding import force_str +from django.utils.formats import get_format +from django.utils.text import format_lazy +from django.utils.translation import gettext_lazy as _ + +from viewflow.utils import MARKER + + +AUTO = MARKER('AUTO') + + +class WidgetRenderer(object): + tag = 'vf-field-input' + + def __init__(self, root: ElementTree.Element, bound_field: forms.BoundField): + self.root = root + self.bound_field = bound_field + + def format_value(self, value): + if isinstance(value, (tuple, list)): + return ','.join(f"{item}" for item in value) + return f"{value}" + + def create_root(self, context): + attrs = { + key: str(value) if value is not True else key + for key, value in context['widget']['attrs'].items() + if value is not False + } + attrs['name'] = context['widget']['name'] + value = context['widget']['value'] + if value is not None: + attrs['value'] = self.format_value(value) + if self.bound_field.label: + attrs['label'] = self.bound_field.label + if self.bound_field.errors: + attrs['error'] = self.bound_field.errors[0] + if self.bound_field.help_text: + attrs['help-text'] = self.bound_field.help_text + return ElementTree.SubElement(self.root, self.tag, **attrs) + + def render(self, template_name, context, request=None): + return self.create_root(context) + + @staticmethod + @lru_cache(maxsize=None) + def get_renderer(widget: forms.Widget) -> 'Type[WidgetRenderer]': + from viewflow.conf import settings + for widget_class in type(widget).mro()[:-2]: + try: + return settings.WIDGET_RENDERERS[widget_class] + except KeyError: + continue + + return InputRenderer + + +class CheckboxRenderer(WidgetRenderer): + tag = 'vf-field-checkbox' + + +class InputRenderer(WidgetRenderer): + def create_root(self, context): + root = super().create_root(context) + root.attrib['type'] = context['widget']['type'] + return root + + +class HiddenInputRenderer(WidgetRenderer): + tag = 'input' + + def create_root(self, context): + root = super().create_root(context) + root.attrib['type'] = 'hidden' + return root + + +class PasswordInputRenderer(InputRenderer): + tag = 'vf-field-password' + + +class FormLayout: + """Default form layout.""" + def append_non_field_errors(self, form: forms.Form, root: ElementTree.Element): + errors = form.non_field_errors() + errors.extend( + form.error_class(bound_field.errors) + for bound_field in form.hidden_fields() + if bound_field.errors + ) + + if errors: + wrapper = ElementTree.SubElement(root, 'div', { + 'class': 'vf-form__errors' + }) + for error in errors: + child = ElementTree.SubElement(wrapper, 'div', { + 'class': 'vf-form__error', + }) + child.text = str(error) + + def append_hidden_fields(self, form: forms.Form, root: ElementTree.Element): + hidden_fields = form.hidden_fields() + if hidden_fields: + wrapper = ElementTree.SubElement(root, 'div', { + 'class': 'vf-form__hiddenfields' + }) + for bound_field in hidden_fields: + self.append_field(wrapper, bound_field, HiddenInputRenderer) + + def append_visible_fields(self, form: forms.Form, root: ElementTree.Element): + visible_fields = form.visible_fields() + if visible_fields: + wrapper = ElementTree.SubElement(root, 'div', { + 'class': 'vf-form__visiblefields mdc-layout-grid__inner' + }) + for bound_field in form.visible_fields(): + container = ElementTree.SubElement(wrapper, 'div', { + 'class': 'mdc-layout-grid__cell mdc-layout-grid__cell--span-12' + }) + self.append_field(container, bound_field) + + def append_field(self, root: ElementTree.Element, bound_field: forms.BoundField, renderer_class=None): + if renderer_class is None: + renderer_class = WidgetRenderer.get_renderer(bound_field.field.widget) + + widget = bound_field.field.widget + if bound_field.field.localize: + widget.is_localized = True + attrs = bound_field.build_widget_attrs({}, widget) + if bound_field.auto_id and 'id' not in widget.attrs: + attrs.setdefault('id', bound_field.auto_id) + + widget.render( + name=bound_field.html_name, + value=bound_field.value(), + attrs=attrs, + renderer=renderer_class(root, bound_field), + ) + + def render_form(self, form: forms.Form) -> ElementTree.Element: + root = ElementTree.Element('div', { + 'class': 'vf-form mdc-layout-grid' + }) + self.append_non_field_errors(form, root) + self.append_hidden_fields(form, root) + self.append_visible_fields(form, root) + return root + + def render(self, form: forms.Form, default_layout: 'FormLayout' = None): + return ElementTree.tostring( + self.render_form(form), + encoding='unicode', + method='html', + ) + + +class Layout(FormLayout): + """Customizable form layout.""" + def __init__(self, *elements, **kwargs): + self.children = _convert_to_children(elements) + super().__init__(**kwargs) + + def append_visible_fields(self, form: forms.Form, root: ElementTree.Element): + wrapper = ElementTree.SubElement(root, 'div', { + 'class': 'vf-form__visiblefields mdc-layout-grid__inner' + }) + + Column(*self.children).append(self, form, wrapper) + + +class LayoutNode(object): + """Base class for self-rendered nodes.""" + + def __init__(self, desktop=AUTO, tablet=AUTO, mobile=AUTO): + assert desktop == AUTO or 1 <= desktop <= 12 + self.desktop = desktop + + assert tablet == AUTO or 1 <= tablet <= 8 + self.tablet = tablet + + assert mobile == AUTO or 1 <= mobile <= 4 + self.mobile = mobile + + def append(self, layout: FormLayout, form: forms.Form, root: ElementTree.Element): + raise NotImplementedError("Subclass should override this") + + +class Column(LayoutNode): + """Place elements vertically stacked, one under another. + + Example: + layout = Layout( + Row( + Column('first_name', 'last_name', desktop=8, tablet=6) + 'sex_options' + ) + ) + """ + + def __init__(self, *elements, **kwargs): + self.id_ = kwargs.pop('id_', None) + self.children = _convert_to_children(elements) + super().__init__(**kwargs) + + def append(self, layout: FormLayout, form: forms.Form, root: ElementTree.Element): + if self.children: + wrapper = ElementTree.SubElement(root, 'div', { + 'class': 'vf-form-column mdc-layout-grid__cell mdc-layout-grid__cell--span-12' + }) + if self.id_: + wrapper.attrib['id'] = self.id_ + for child in self.children: + child.append(layout, form, wrapper) + + +class Row(LayoutNode): + """Spread elements over a single line. + + Example: + + layout = Layout( + Row( + 'first_name', + Row('last_name', 'sex', tablet=5) + ) + ) + """ + + def __init__(self, *elements, **kwargs): + self.id_ = kwargs.pop('id_', None) + self.children = _convert_to_children(elements) + super().__init__(**kwargs) + + def append(self, layout: FormLayout, form: forms.Form, root: ElementTree.Element): + desktop = _children_sizes( + [child.desktop for child in self.children], grid_size=12, + grid_name='desktop', keep_in_row=True + ) + tablet = _children_sizes( + [child.tablet for child in self.children], grid_size=8, + grid_name='tablet', keep_in_row=False + ) + mobile = _children_sizes( + [child.mobile for child in self.children], grid_size=4, + grid_name='mobile', keep_in_row=False + ) + + def append_child(wrapper, child, desktop: int, tablet: int, mobile: int): + classes = ' '.join([ + "mdc-layout-grid__cell", + f"mdc-layout-grid__cell--span-{desktop}-desktop", + f"mdc-layout-grid__cell--span-{tablet}-tablet", + f"mdc-layout-grid__cell--span-{mobile}-mobile" + ]) + + element = ElementTree.SubElement(wrapper, 'div', { + 'class': classes, + }) + child.append(layout, form, element) + + wrapper = ElementTree.SubElement(root, 'div', { + 'class': "vf-form-row mdc-layout-grid__inner", + }) + if self.id_: + wrapper.attrib['id'] = self.id_ + for child_data in zip(self.children, desktop, tablet, mobile): + append_child(wrapper, *child_data) + + +class Span(LayoutNode): + """Span a form field over several columns. + + Example:: + layout = Layout( + Row(Span('first_name'), Span('last_name')) + Row( + Span('email', tablet=6, mobile=3), + 'sex' + ) + ) + + By default span is auto-sized. On a desktop all auto-sized elements + would be spread equally over the free place of a row, non occupied by + elements with specific sizes. + + On mobile and tablet if all elements in a row have auto-sizes, + each element would be placed in a new line. If even one element + in a row has a specific size, all auto-sized elements would be + kept in a single line, like on a desktop. + + """ + def __init__(self, field_name, **kwargs): + self.field_name = field_name + super().__init__(**kwargs) + + def __str__(self): + return f'Field {self.field_name} <{self.desktop}, {self.tablet}, {self.mobile}>' + + def append(self, layout: FormLayout, form: forms.Form, root: ElementTree.Element): + try: + bound_field = form[self.field_name] + except KeyError as exc: + raise ValueError( + f'{self.field_name} field not found in the {type(form).__name__}' + ) from exc + + layout.append_field(root, bound_field) + + +class Fieldset(Column): + def __init__(self, title, *elements, **kwargs): + self.title = title + super().__init__(*elements, **kwargs) + + def append(self, layout: FormLayout, form: forms.Form, root: ElementTree.Element): + wrapper = ElementTree.SubElement(root, 'div', { + 'class': "vf-form__formset", + }) + title = ElementTree.SubElement(wrapper, 'h3', { + 'class': "mdc-typography--subheading2 vf-form__formset-header", + }) + title.text = force_str(self.title) + super().append(layout, form, wrapper) + + +class Stacked(LayoutNode): + """Render stacked inline.""" + def __init__(self, *elements, **kwargs): + self.id_ = kwargs.pop('id_', None) + self.children = _convert_to_children(elements) + super().__init__(**kwargs) + + def append(self, layout: FormLayout, form: forms.Form, root: ElementTree.Element): + """ TODO implement stacked render.""" + # return Div(class_="vf-form-column mdc-layout-grid__cell mdc-layout-grid__cell--span-12", id_=self.id_) / [ + # child.render(form) for child in self.children + # ] + + +def _convert_to_children(elements: List[Union[LayoutNode, str]]): + result = [] + for element in elements: + if isinstance(element, LayoutNode): + result.append(element) + elif isinstance(element, str): + result.append(Span(element)) + else: + raise ValueError(f"Unknown element {element} type {type(element)}") + return result + + +def _children_sizes(spans, grid_size=12, grid_name='desktop', keep_in_row=True): + bound = sum(span for span in spans if span != AUTO) + auto_count = sum(1 for span in spans if span == AUTO) + + if bound == 0 and not keep_in_row: + # If children AUTO-sized - put every child on the own row + return [grid_size for _ in spans] + else: + rest = grid_size - bound + if rest < 0 or (auto_count != 0 and grid_size % auto_count) != 0: + raise ValueError( + f"Can't equally spread {spans} over {grid_size} columns on a {grid_name} grid" + ) + return [ + rest // auto_count if child == AUTO else child + for child in spans + ] + + +WIDGET_RENDERERS = { + forms.CheckboxInput: CheckboxRenderer, + forms.HiddenInput: HiddenInputRenderer, + forms.PasswordInput: PasswordInputRenderer, +} diff --git a/viewflow/fsm/__init__.py b/viewflow/fsm/__init__.py new file mode 100644 index 0000000..b190835 --- /dev/null +++ b/viewflow/fsm/__init__.py @@ -0,0 +1,9 @@ +"""Finite state machine workflow.""" + +from .base import ( + TransitionNotAllowed, State +) + +__all__ = ( + 'TransitionNotAllowed', 'State' +) diff --git a/viewflow/fsm/base.py b/viewflow/fsm/base.py new file mode 100644 index 0000000..df328cb --- /dev/null +++ b/viewflow/fsm/base.py @@ -0,0 +1,386 @@ +"""Base FSM declarations.""" + +from __future__ import annotations + +import inspect +from typing import Callable, List, Type +from viewflow.this_object import ThisObject +from viewflow.utils import MARKER +from .typing import Condition, StateTransitions + + +class TransitionNotAllowed(Exception): + """Raised when a transition is not allowed.""" + + +class Transition(object): + """State transition definition.""" + + def __init__(self, func, source, target, label=None, conditions=None, permission=None): # noqa D102 + self.func = func + self.source = source + self.target = target + self._label = label + self.permission = permission + self.conditions: List[Condition] = conditions if conditions else [] + + @property + def label(self): + """Transition human-readable label.""" + if self._label: + return self._label + else: + try: + return self.func.label + except AttributeError: + return self.func.__name__.title() + + @property + def slug(self): + return self.func.__name__ + + def conditions_met(self, instance) -> bool: + """Check that all associated conditions is True.""" + conditions = [ + condition.resolve(instance.__class__) if isinstance(condition, ThisObject) else condition + for condition in self.conditions + ] + return all(map(lambda condition: condition(instance), conditions)) + + def has_perm(self, instance, user) -> bool: + """Check the permission of the transition.""" + if self.permission is None: + return True + elif callable(self.permission): + return self.permission(instance, user) + elif isinstance(self.permission, ThisObject): + permission = self.permission.resolve(instance) + return permission(user) + else: + raise ValueError(f"Unknown permission type {type(self.permission)}") + + +class TransitionMethod(object): + """Unbound transition method wrapper. + + Provides shortcut to enumerate all method transitions, ex:: + + Review.publish.get_transitions() + """ + + do_not_call_in_templates = True + + def __init__(self, state: State, func: Callable, descriptor: TransitionDescriptor, owner: Type): + self._state = state + self._func = func + self._descriptor = descriptor + self._owner = owner + + self.__doc__ = func.__doc__ + + def get_transitions(self) -> List[Transition]: + return self._descriptor.get_transitions() + + @property + def slug(self): + """Transition name.""" + return self.func.__name__ + + +class TransitionBoundMethod(object): + """Instance method wrapper that performs the transition.""" + + do_not_call_in_templates = True + + class Wrapper(object): + """Wrapper context object, to simplify __call__ method debug""" + def __init__(self, parent: 'TransitionBoundMethod', kwargs): + self.parent = parent + self.caller_kwargs = kwargs + self.initial_state = None + self.target_state = None + + def __enter__(self): + self.initial_state = self.parent._state.get(self.parent._instance) + transition = self.parent._descriptor.get_transition(self.initial_state) + + if transition is None: + raise TransitionNotAllowed(f'{self.parent.label} :: no transition from "{self.initial_state}"') + + if not transition.conditions_met(self.parent._instance): + raise TransitionNotAllowed( + f" '{transition.label}' transition conditions have not been met" + ) + + self.target_state = transition.target + if self.target_state: + self.parent._state.set(self.parent._instance, self.target_state) + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + self.parent._state.set(self.parent._instance, self.initial_state) + else: + self.parent._state.transition_succeed( + self.parent._instance, self.parent, + self.initial_state, self.target_state, + **self.caller_kwargs + ) + + def __init__(self, state, func, descriptor, instance): + self._state: State = state + self._func: Callable = func + self._descriptor: TransitionDescriptor = descriptor + self._instance = instance + + def original(self, *args, **kwargs): + """Call the unwrapped class method.""" + return self._func(self._instance, *args, **kwargs) + + def can_proceed(self, check_conditions=True): + """Check is transition available.""" + current_state = self._state.get(self._instance) + transition = self._descriptor.get_transition(current_state) + if transition and check_conditions: + return transition.conditions_met(self._instance) + return False + + def has_perm(self, user): + current_state = self._state.get(self._instance) + transition = self._descriptor.get_transition(current_state) + if transition: + return transition.has_perm(self._instance, user) + return False + + @property + def label(self): + """Transition human-readable label.""" + current_state = self._state.get(self._instance) + transition = self._descriptor.get_transition(current_state) + if transition: + return transition.label + else: + try: + return self._func.label + except AttributeError: + return self._func.__name__.title() + + def __call__(self, *args, **kwargs): + with TransitionBoundMethod.Wrapper(self, kwargs=kwargs): + return self._func(self._instance, *args, **kwargs) + + def get_transitions(self): + return self._descriptor.get_transitions() + + +class TransitionDescriptor(object): + """Base transition definition descriptor.""" + + do_not_call_in_templates = True + + def __init__(self, state, func): # noqa D102 + self._state = state + self._func = func + self._transitions = {} + + def __get__(self, instance, owner=None): + if instance: + return TransitionBoundMethod(self._state, self._func, self, instance) + else: + return TransitionMethod(self._state, self._func, self, owner) + + def add_transition(self, transition): + self._transitions[transition.source] = transition + + def get_transitions(self) -> List[Transition]: + """List of all transitions.""" + return self._transitions.values() + + def get_transition(self, source_state): + """Get a transition of a source_state. + + Returns None if there is no outgoing transitions. + """ + transition = self._transitions.get(source_state, None) + if transition is None: + transition = self._transitions.get(State.ANY, None) + return transition + + +class SuperTransitionDescriptor(object): + do_not_call_in_templates = True + + def __init__(self, state, func): # noqa D102 + self._state = state + self._func = func + + def __get__(self, instance, owner=None): + if instance: + return TransitionBoundMethod(self._state, self._func, self.get_descriptor(instance.__class__), instance) + else: + return TransitionMethod(self._state, self._func, self.get_descriptor(owner), owner) + + def get_descriptor(self, owner) -> TransitionDescriptor: + """Lookup for the transition descriptor in the base classes.""" + for cls in owner.__mro__[1:]: + if hasattr(cls, self._func.__name__): + super_method = getattr(cls, self._func.__name__) + if isinstance(super_method, TransitionMethod): + break + else: + raise ValueError('Base transition not found') + + return super_method._descriptor + + +class StateDescriptor(object): + """Class-bound value for a state descriptor. + + Provides shortcut to enumerate all class transitions, ex:: + + Review.state.get_transitions() + """ + + def __init__(self, state: 'State', owner: type): + self._state = state + self._owner = owner + + def __getattr__(self, attr): + return getattr(self._state, attr) + + def get_transitions(self) -> StateTransitions: + propname = '__fsm_{}_transitions'.format(self._state.propname) + transitions = self._owner.__dict__.get(propname, None) + if transitions is None: + transitions = {} + + methods = inspect.getmembers( + self._owner, + lambda attr: isinstance(attr, TransitionMethod) + ) + transitions = { + method: method.get_transitions() + for _, method in methods + } + setattr(self._owner, propname, transitions) + + return transitions + + def get_outgoing_transitions(self, state) -> List[Transition]: + return [ + transition + for transitions in self.get_transitions().values() + for transition in transitions + if transition.source == state or (transition.source == State.ANY and transition.target != state) + ] + + +class State(object): + """State slot field.""" + + ANY = MARKER('ANY') + + def __init__(self, states, default=None): + self._default = default + self._setter = None + self._getter = None + self._on_success = None + + def __get__(self, instance, owner=None): + if instance: + return self.get(instance) + return StateDescriptor(self, owner) + + def __set__(self, instance, value): + raise AttributeError('Direct state modification is not allowed') + + def get(self, instance): + """Get the state from the underline class instance.""" + if self._getter: + value = self._getter(instance) + if self._default: + return value if value else self._default + else: + return value + return getattr(instance, self.propname, self._default) + + def set(self, instance, value): + """Get the state of the underline class instance.""" + if self._setter: + self._setter(instance, value) + else: + setattr(instance, self.propname, value) + + def transition_succeed(self, instance, descriptor, source, target, **kwargs): + if self._on_success: + self._on_success(instance, descriptor, source, target, **kwargs) + + @property + def propname(self): + """State storage attribute.""" + return '__fsm{}'.format(id(self)) + + def transition( + self, source=None, target=None, label=None, + conditions=None, permission=None, + ): + """Transition method decorator.""" + def _wrapper(func): + if isinstance(func, TransitionDescriptor): + descriptor = func + else: + descriptor = TransitionDescriptor(self, func) + + source_list = source + if not isinstance(source, (list, tuple, set)): + source_list = [source] + + for src in source_list: + transition = Transition( + func=descriptor._func, + source=src, + target=target, + label=label, + conditions=conditions, + permission=permission + ) + descriptor.add_transition(transition) + + return descriptor + return _wrapper + + def super(self): + def _wrapper(func): + return SuperTransitionDescriptor(self, func) + return _wrapper + + def setter(self): + def _wrapper(func): + self._setter = func + return func + return _wrapper + + def getter(self): + def _wrapper(func): + self._getter = func + return func + return _wrapper + + def on_success(self): + def _wrapper(func): + self._on_success = func + return func + return _wrapper + + class CONDITION(object): + """Boolean-like object to return value accompanied with a messsage from fsm conditions.""" + + def __init__(self, is_true: bool, unmet: str = ""): + self.is_true = is_true + self.unmet = unmet + + @property + def message(self): + return self.message if self.unmet else '' + + def __bool__(self): + return self.is_true diff --git a/viewflow/fsm/typing.py b/viewflow/fsm/typing.py new file mode 100644 index 0000000..5f3ce8f --- /dev/null +++ b/viewflow/fsm/typing.py @@ -0,0 +1,8 @@ +from typing import Callable, Any, List, Mapping, TYPE_CHECKING + +if TYPE_CHECKING: + from .base import TransitionMethod, Transition # NOQA + +StateValue = Any +Condition = Callable[[Any], bool] +StateTransitions = Mapping['TransitionMethod', List['Transition']] diff --git a/viewflow/middleware.py b/viewflow/middleware.py new file mode 100644 index 0000000..6be56db --- /dev/null +++ b/viewflow/middleware.py @@ -0,0 +1,82 @@ +from django.core.exceptions import PermissionDenied + + +class SiteMiddleware(object): + """ + Set `site` and `app` attributes on request.resolver_match object. + """ + def __init__(self, get_response): + self.get_response = get_response + + def __call__(self, request): + return self.get_response(request) + + def process_view(self, request, callback, callback_args, callback_kwargs): + if not hasattr(request, 'user'): + raise ValueError( + 'No `request.user` found. `django.contrib.auth.context_processors.auth` ' + 'missing or `material.middleware.site included before it.` ' + 'You need to add auth middleware or change middlewares order.') + match = request.resolver_match + if match: + extra = getattr(match.url_name, 'extra', {}) + site, app, viewset = extra.get('site'), extra.get('app'), extra.get('viewset') + + if site: + if not site.has_perm(request.user): + raise PermissionDenied + + if app: + if not app.has_perm(request.user): + raise PermissionDenied + + if viewset: + if hasattr(viewset, 'has_perm') and not viewset.has_perm(request.user): + raise PermissionDenied + + for name, value in extra.items(): + setattr(request.resolver_match, name, value) + + return None + + def process_template_response(self, request, response): + app = getattr(request.resolver_match, 'app', None) + if app: + app_context = app.get_context_data(request) + for key, value in app_context.items(): + if key in response.context_data: + raise ValueError(f'App context key {key} clashes with view response context') + else: + response.context_data[key] = value + return response + + +class TurbolinksMiddleware(object): + """ + Send the `Turbolinks-Location` header in response to a visit that was redirected, + and Turbolinks will replace the browser’s topmost history entry . + """ + + def __init__(self, get_response): + self.get_response = get_response + + def __call__(self, request): + response = self.get_response(request) + + is_turbolinks = request.META.get('HTTP_TURBOLINKS_REFERRER') + is_response_redirect = response.has_header('Location') + + if is_turbolinks: + if is_response_redirect: + location = response['Location'] + prev_location = request.session.pop('_turbolinks_redirect_to', None) + if prev_location is not None: + # relative subsequent redirect + if location.startswith('.'): + location = prev_location.split('?')[0] + location + request.session['_turbolinks_redirect_to'] = location + else: + if request.session.get('_turbolinks_redirect_to'): + location = request.session.pop('_turbolinks_redirect_to') + response['Turbolinks-Location'] = location + return response diff --git a/viewflow/templatetags/viewflow.py b/viewflow/templatetags/viewflow.py index 5a30e9c..e2f4860 100644 --- a/viewflow/templatetags/viewflow.py +++ b/viewflow/templatetags/viewflow.py @@ -7,6 +7,7 @@ from django.utils.encoding import force_text from django.utils.html import conditional_escape from viewflow.contrib import auth +from viewflow.forms import FormLayout from viewflow.urls import Site, Viewset @@ -99,6 +100,46 @@ class ViewsetURLNode(template.Node): return url +@register.tag('render') +class FormNode(template.Node): + """ + Render a django form using google material-components-web library. + + Example: + + {% render_form form [layout] %} + """ + default_layout = FormLayout() + + def __init__(self, parser, token): + bits = token.split_contents() + + layout_expr = None + if len(bits) == 2: + tag, form_expr = bits + elif len(bits) == 3: + tag, form_expr, layout_expr = bits + else: + raise template.TemplateSyntaxError( + "Invalid syntax in material tag, expects only form and optional layout arguments.") + + self.form_expr = parser.compile_filter(form_expr) + self.layout_expr = parser.compile_filter(layout_expr) if layout_expr else None + + def render(self, context): + form = self.form_expr.resolve(context) + if not isinstance(form, forms.BaseForm): + raise template.TemplateSyntaxError("material tag first argument must be a form") + + layout = None + if self.layout_expr: + layout = self.layout_expr.resolve(context) + if layout and not isinstance(layout, FormLayout): + raise template.TemplateSyntaxError("material tag second argument must be a layout") + + return layout.render(form) if layout else self.default_layout.render(form) + + @register.tag('get_absolute_url') class AbsoluteURLNode(template.Node): """ diff --git a/viewflow/this_object.py b/viewflow/this_object.py new file mode 100644 index 0000000..7b7b250 --- /dev/null +++ b/viewflow/this_object.py @@ -0,0 +1,49 @@ +"""Forward-reference for class-body declarations""" + + +class ThisMethod(object): + """Reference to a method""" + def __init__(self, propname, methodname): + self._propname = propname + self._methodname = methodname + + def resolve(self, instance): + # TODO meaningfull exception + prop = getattr(instance, self._propname) + method = getattr(instance, f'_this_{self._methodname}') + return method(prop) + + +class ThisObject(object): + """Helper for forward references""" + + def __init__(self, name): # noqa D102 + self.name = name + + def resolve(self, instance): + # TODO meaningfull exception + return getattr(instance, self.name) + + def __getattr__(self, name): + return ThisMethod(self.name, name) + + +class This(object): + """Helper for building forward references. + + The rationale is ability to specify references to the class + attributes and methods before they are declared. + + `this` is like a `self` but for the class body. + """ + def resolve(self, instance, this_ref): + if isinstance(this_ref, (ThisObject, ThisMethod)): + return this_ref.resolve(instance) + else: + return this_ref + + def __getattr__(self, name): + return ThisObject(name) + + +this = This()