Opensouse forms and fsm

This commit is contained in:
Mikhail Podgurskiy
2020-11-25 07:33:03 +06:00
parent c76aba2cd9
commit b021b21d88
21 changed files with 1641 additions and 0 deletions

0
tests/fsm/__init__.py Normal file
View File

View File

@@ -0,0 +1,107 @@
"""FSM tests for advanced source/target options"""
from django.test import TestCase
from django.utils.translation import gettext_lazy as _
from viewflow.fsm import State
from .test_fsm__basics import ReviewState
class Publication(object):
stage = State(
ReviewState,
default=ReviewState.NEW
)
def __init__(self, text):
self.text = text
@stage.transition(source=ReviewState.NEW)
def notify(self):
pass
@stage.transition(
source={ReviewState.NEW, ReviewState.HIDDEN},
target=ReviewState.PUBLISHED
)
def publish(self):
pass
@stage.transition(source=ReviewState.NEW, target=ReviewState.PUBLISHED)
@stage.transition(source=ReviewState.PUBLISHED, target=ReviewState.NEW, label=_('Return to new'))
def toggle(self):
pass
toggle.label = _('Toggle publication state')
@stage.transition(source=ReviewState.PUBLISHED, target=ReviewState.REJECTED)
def trash(self):
if len(self.text) > 1000:
self.hide()
else:
self.remove()
@stage.transition(
source=ReviewState.REJECTED,
target=ReviewState.REMOVED
)
def remove(self):
pass
@stage.transition(
source=ReviewState.REJECTED,
target=ReviewState.HIDDEN
)
def hide(self):
pass
class Test(TestCase): # noqa: D101
def test_no_target_transition(self):
publication = Publication(text='test')
publication.notify()
self.assertEqual(publication.stage, ReviewState.NEW)
def test_big_publication_process(self):
publication = Publication(text='test' * 251)
self.assertEqual(publication.stage, ReviewState.NEW)
publication.publish()
self.assertEqual(publication.stage, ReviewState.PUBLISHED)
publication.trash()
self.assertEqual(publication.stage, ReviewState.HIDDEN)
def test_small_publication_process(self):
publication = Publication(text='test' * 249)
self.assertEqual(publication.stage, ReviewState.NEW)
publication.publish()
self.assertEqual(publication.stage, ReviewState.PUBLISHED)
publication.trash()
self.assertEqual(publication.stage, ReviewState.REMOVED)
def test_available_transitions(self):
self.assertEqual([
(transition.target, transition.slug)
for transition in Publication.stage.get_outgoing_transitions(ReviewState.NEW)
], [
(None, 'notify'),
(ReviewState.PUBLISHED, 'publish'),
(ReviewState.PUBLISHED, 'toggle')
])
self.assertEqual([
(transition.target, transition.slug)
for transition in Publication.stage.get_outgoing_transitions(ReviewState.PUBLISHED)
], [
(ReviewState.NEW, 'toggle'),
(ReviewState.REJECTED, 'trash')
])
self.assertEqual([
(transition.target, transition.slug)
for transition in Publication.stage.get_outgoing_transitions(ReviewState.REJECTED)
], [
(ReviewState.HIDDEN, 'hide'),
(ReviewState.REMOVED, 'remove')
])

View File

@@ -0,0 +1,97 @@
from unittest import TestCase
from viewflow import fsm
from django.db.models import TextChoices
from django.utils.translation import gettext_lazy as _
class ReviewState(TextChoices): # noqa:D100
NEW = 'NEW', _('New')
APPROVED = 'APPROVED', _('Approved')
REJECTED = 'REJECTED', _('Rejected')
PUBLISHED = 'PUBLISHED', _('Published')
HIDDEN = 'HIDDEN', _('Hidden')
REMOVED = 'REMOVED', _('Removed')
class Publication(object): # noqa:D100
stage = fsm.State(ReviewState, default=ReviewState.NEW)
def __init__(self, text):
self.text = text
@stage.transition(
source=ReviewState.NEW,
target=ReviewState.PUBLISHED
)
def publish(self):
pass
@stage.transition(
source=fsm.State.ANY,
target=ReviewState.REMOVED
)
def remove(self):
pass
@stage.transition(
source=ReviewState.PUBLISHED,
target=ReviewState.HIDDEN
)
def hide(self):
pass
# REST
# Admin
# Viewset
class Test(TestCase): # noqa:D100
def setUp(self):
self.publication = Publication(text='test publication')
self.assertEqual(self.publication.stage, ReviewState.NEW)
def test_direct_modifications_not_allowed(self):
with self.assertRaises(AttributeError):
self.publication.stage = ReviewState.REMOVED
def test_method_transitions_list(self):
transitions = {
(transition.source, transition.target)
for transition in self.publication.publish.get_transitions()
}
self.assertEqual({
(ReviewState.NEW, ReviewState.PUBLISHED),
}, transitions)
def test_field_transitions(self):
transitions = {
(transition.source, transition.target)
for transitions in Publication.stage.get_transitions().values()
for transition in transitions
}
self.assertEqual({
(ReviewState.NEW, ReviewState.PUBLISHED),
(ReviewState.PUBLISHED, ReviewState.HIDDEN),
(fsm.State.ANY, ReviewState.REMOVED),
}, transitions)
def test_allowed_transitions_succeed(self):
self.publication.publish()
self.assertEqual(self.publication.stage, ReviewState.PUBLISHED)
self.publication.remove()
self.assertEqual(self.publication.stage, ReviewState.REMOVED)
def test_prohibited_transition_failed(self):
self.assertRaises(fsm.TransitionNotAllowed, self.publication.hide)
def test_transitions_can_proceed(self):
self.assertTrue(self.publication.publish.can_proceed())
self.assertTrue(self.publication.remove.can_proceed())
self.assertFalse(self.publication.hide.can_proceed())
self.publication.publish()
self.assertFalse(self.publication.publish.can_proceed())
self.assertTrue(self.publication.remove.can_proceed())
self.assertTrue(self.publication.hide.can_proceed())

View File

@@ -0,0 +1,42 @@
"""Tests for @transition(conditions=...) parameter."""
from django.test import TestCase
from viewflow import this
from viewflow.fsm import State, TransitionNotAllowed
from .test_fsm__basics import ReviewState
class _Publication(object):
stage = State(
ReviewState,
default=ReviewState.NEW
)
def __init__(self, text):
self.text = text
@stage.transition(
source=ReviewState.NEW,
target=ReviewState.PUBLISHED,
conditions=[this.is_long]
)
def publish(self):
pass
def is_long(self):
return State.CONDITION(
len(self.text) > 1000,
unmet="Review is too short"
)
class _Test(TestCase):
def test_conditions_met(self):
publication = _Publication('test' * 251)
publication.publish()
def test_unmet_conditions_raises_exception(self):
publication = _Publication('test' * 249)
with self.assertRaises(TransitionNotAllowed):
publication.publish()

View File

@@ -0,0 +1,116 @@
from django.utils.translation import gettext_lazy as _
from unittest import TestCase
from viewflow import fsm, this
from .test_fsm__basics import Publication, ReviewState
class GuestPublication(Publication):
@Publication.stage.transition(
source=ReviewState.NEW,
target=ReviewState.APPROVED,
permission=this.is_approver
)
def approve(self):
pass
@Publication.stage.transition(
source=ReviewState.NEW,
target=ReviewState.REJECTED,
permission=this.is_approver
)
def reject(self):
pass
@Publication.stage.transition(
source=ReviewState.HIDDEN,
target=ReviewState.PUBLISHED,
permission=this.is_superuser
)
@Publication.stage.transition(
source=ReviewState.APPROVED,
target=ReviewState.PUBLISHED
)
def publish(self):
pass
@Publication.stage.transition(
source=fsm.State.ANY,
target=ReviewState.HIDDEN,
conditions=[this.is_short]
)
def hide(self):
pass
@Publication.stage.super()
def remove(self):
super().remove.original()
def is_superuser(self, user):
return user.is_superuser
def is_approver(self, user):
return user.is_staff
is_approver.unmet_message = _("You have no staff rights")
def is_short(self):
text_length = len(self.text)
return fsm.State.UNMET(
text_length < 1000,
_("Review is too shot, add %d symbols") % 1000 - text_length
)
class Test(TestCase):
def setUp(self):
self.publication = GuestPublication(text='test publication')
self.assertEqual(self.publication.stage, ReviewState.NEW)
def test_method_transitions_list(self):
transitions = {
(transition.source, transition.target)
for transition in self.publication.publish.get_transitions()
}
self.assertEqual({
(ReviewState.APPROVED, ReviewState.PUBLISHED),
(ReviewState.HIDDEN, ReviewState.PUBLISHED),
}, transitions)
def test_field_transitions(self):
transitions = {
(transition.source, transition.target)
for transitions in GuestPublication.stage.get_transitions().values()
for transition in transitions
}
self.assertEqual({
(ReviewState.NEW, ReviewState.APPROVED),
(ReviewState.NEW, ReviewState.REJECTED),
(ReviewState.APPROVED, ReviewState.PUBLISHED),
(ReviewState.HIDDEN, ReviewState.PUBLISHED),
(fsm.State.ANY, ReviewState.HIDDEN),
(fsm.State.ANY, ReviewState.REMOVED),
}, transitions)
def test_redefined_transition_failed(self):
self.assertRaises(fsm.TransitionNotAllowed, self.publication.publish)
def test_approvement_workflow_succeed(self):
self.publication.approve()
self.assertEqual(self.publication.stage, ReviewState.APPROVED)
self.publication.publish()
self.assertEqual(self.publication.stage, ReviewState.PUBLISHED)
self.publication.remove()
self.assertEqual(self.publication.stage, ReviewState.REMOVED)
def test_rejection_workflow_succeed(self):
self.publication.reject()
self.assertEqual(self.publication.stage, ReviewState.REJECTED)
self.assertRaises(fsm.TransitionNotAllowed, self.publication.publish)
def test_transition_permission(self):
pass
def test_transition_condition(self):
pass

View File

@@ -0,0 +1,63 @@
from django.db import models
from django.test import TestCase
from viewflow import fsm
from .test_fsm__basics import ReviewState
class Report(models.Model):
text = models.TextField()
stage = models.CharField(max_length=150)
class ReportReview(object):
stage = fsm.State(ReviewState, default=ReviewState.NEW)
def __init__(self, report):
self.report = report
@stage.setter()
def _set_report_stage(self, value):
self.report.stage = value
@stage.getter()
def _get_report_stage(self):
return self.report.stage
@stage.on_success()
def _on_transition_success(self, descriptor, source, target):
self.report.save()
@stage.transition(
source=ReviewState.NEW,
target=ReviewState.PUBLISHED
)
def publish(self):
pass
@stage.transition(
source=fsm.State.ANY,
target=ReviewState.REMOVED
)
def remove(self):
pass
class Test(TestCase):
def test_default_state_of_new_object(self):
report = Report(text="test")
review = ReportReview(report)
self.assertEqual(review.stage, ReviewState.NEW)
def test_state_of_existing_object(self):
report = Report.objects.create(stage=ReviewState.PUBLISHED, text="test")
review = ReportReview(report)
self.assertEqual(review.stage, ReviewState.PUBLISHED)
def test_state_success_called(self):
report = Report(text="test")
review = ReportReview(report)
review.publish()
self.assertEqual(review.stage, ReviewState.PUBLISHED)
self.assertEqual(report.stage, ReviewState.PUBLISHED)
self.assertTrue(report.pk is not None)

View File

@@ -0,0 +1,46 @@
"""Test for permission check."""
from django.contrib.auth.models import User
from django.test import TestCase
from viewflow import this
from viewflow.fsm import State
from .test_fsm__basics import ReviewState
class _Publication(object):
stage = State(ReviewState, default=ReviewState.NEW)
@stage.transition(
source=ReviewState.NEW,
target=ReviewState.REMOVED,
permission=this.can_remove_review
)
def remove(self):
pass
def can_remove_review(self, user):
return State.CONDITION(user.is_staff, unmet="Only staff users can delete reviews")
class _Test(TestCase):
def setUp(self):
self.privileged_user = User.objects.create(
username='privileged',
is_staff=True
)
self.unprivileged_user = User.objects.create(
username='unprivileged',
is_staff=False
)
def test_callable_permission(self):
publication = _Publication()
self.assertTrue(
publication.remove.has_perm(self.privileged_user)
)
self.assertFalse(
publication.remove.has_perm(self.unprivileged_user)
)

50
tests/test_middleware.py Normal file
View File

@@ -0,0 +1,50 @@
from django.test import TestCase, override_settings
from django.urls import path
from django.views.generic import TemplateView
from viewflow.urls import AppMenuMixin, Application, Site, Viewset, route
class NestedViewset(Viewset):
app_name = 'nested'
page_url = path('page/', TemplateView.as_view(template_name='viewflow/base.html'), name="page")
class CityViewset(AppMenuMixin, Viewset):
index_url = path('', TemplateView.as_view(template_name='viewflow/base.html'), name="index")
nested_url = route('nested', NestedViewset())
site = Site(title="Test site", items=[
Application(app_name="test", items=[
CityViewset(),
])
])
urlpatterns = [
path('', site.urls)
]
@override_settings(ROOT_URLCONF=__name__)
class Test(TestCase):
def test_context_injected(self):
response = self.client.get('/test/city/')
match = response.wsgi_request.resolver_match
self.assertTrue(hasattr(match, 'site'))
self.assertEqual(match.site, site)
self.assertTrue(hasattr(match, 'app'))
self.assertEqual(match.app, site._children[0])
# App -> Site -> CityViewset
self.assertTrue(hasattr(match, 'viewset'))
self.assertEqual(match.viewset, site._children[0]._children[0])
def test_nested_viewset_injected(self):
response = self.client.get('/test/city/nested/page/')
match = response.wsgi_request.resolver_match
# App -> Site -> CityViewset -> Nested
self.assertTrue(hasattr(match, 'viewset'))
self.assertEqual(match.viewset, site._children[0]._children[0].nested_url.viewset)

47
tests/test_this_object.py Normal file
View File

@@ -0,0 +1,47 @@
import functools
from django.test import TestCase
from viewflow import this
class Review(object):
approver = 'Will Smith'
publisher = 'John Doe'
def approve(self):
return 'approve'
def publish(self):
return 'publish'
def _this_owner(self, transition):
if transition == self.approve:
return self.approver
elif transition == self.publish:
return self.publisher
else:
raise ValueError(f"Can't find owner for {transition}")
def _this_call(self, transition):
def call_transition(self, transition):
return transition()
return functools.partial(call_transition, self, transition)
class Test(TestCase):
def test_this_refs_data(self):
self.assertEqual(this.some_name.name, 'some_name')
self.assertEqual(this.another_some_name.name, 'another_some_name')
def test_this_ref_resolve(self):
review = Review()
approve = this.approve.resolve(review)
self.assertEqual(approve, review.approve)
self.assertEqual(Review.approver, this.approve.owner.resolve(review))
self.assertEqual(this.approve.call.resolve(review)(), 'approve')
publish = this.publish.resolve(review)
self.assertEqual(publish, review.publish)
self.assertEqual(Review.publisher, this.publish.owner.resolve(review))
self.assertEqual(this.publish.call.resolve(review)(), 'publish')

View File

@@ -0,0 +1,22 @@
from django.test import TestCase
from viewflow import viewprop
class Viewset(object): # noqa : D101
def __init__(self, view=None):
if view is not None:
self.view = view
@viewprop
def view(self):
return 'default'
class Test(TestCase): # noqa: D101
def test_default_viewprop_value(self):
viewset = Viewset()
self.assertEqual(viewset.view, 'default')
def test_redefine_viewprop_succeed(self):
viewset = Viewset(view='new value')
self.assertEqual(viewset.view, 'new value')

View File

@@ -0,0 +1,26 @@
"""Viewflow - dev toolkit for backoffice automation."""
from django.conf import settings as django_settings
from .conf import settings
from .this_object import this
from .utils import viewprop, Icon, DEFAULT
__title__ = 'Django-Viewflow'
__version__ = '2.0a1'
__author__ = 'Mikhail Podgurskiy'
__license__ = 'AGPL'
__copyright__ = 'Copyright 2018 Mikhail Podgurskiy'
__all__ = (
'this', 'viewprop', 'Icon', 'DEFAULT'
)
default_app_config = 'viewflow.apps.ViewflowConfig'
if settings.AUTOREGISTER:
# Register site middleware
site_middleware = 'viewflow.middleware.SiteMiddleware'
if site_middleware not in django_settings.MIDDLEWARE:
django_settings.MIDDLEWARE += (site_middleware, )
turbolinks_middleware = 'viewflow.middleware.TurbolinksMiddleware'
if turbolinks_middleware not in django_settings.MIDDLEWARE:
django_settings.MIDDLEWARE += (turbolinks_middleware, )

8
viewflow/apps.py Normal file
View File

@@ -0,0 +1,8 @@
from django.apps import AppConfig
class ViewflowConfig(AppConfig):
"""Default application config."""
name = 'viewflow'
label = 'viewflow_base' # allow to user 'viewflow' label for 'viewflow.workflow'

53
viewflow/conf.py Normal file
View File

@@ -0,0 +1,53 @@
"""
Settings for Viewflow are all namespaced in the VIEWFLOW setting.
For example your project's `settings.py` file might look like this:
VIEWFLOW = {
'WIDGET_RENDERERS': {
'django.forms.DateTimeInput': 'myapp.renderers.MyDateTimeRenderer'
}
}
"""
from copy import deepcopy
from django.conf import settings as django_settings
from django.test.signals import setting_changed
from django.utils.module_loading import import_string
from viewflow.forms import renderers
DEFAULTS = {
'AUTOREGISTER': True,
'WIDGET_RENDERERS': renderers.WIDGET_RENDERERS,
}
class Settings(object):
def __init__(self, custom=None):
if custom is None:
custom = getattr(django_settings, 'VIEWFLOW', {})
self.settings = deepcopy(DEFAULTS)
for key, value in custom.get('WIDGET_RENDERERS', {}):
widget_class, renderer_class = import_string(key), import_string(value)
self.settings['WIDGET_RENDERERS'][widget_class] = renderer_class()
def __getattr__(self, attr):
if attr not in self.settings:
raise AttributeError("Invalid viewflow setting: '%s'" % attr)
return self.settings[attr]
settings = Settings()
def reload_settings(*args, **kwargs):
global settings
setting, value = kwargs['setting'], kwargs['value']
if setting == 'VIEWFLOW':
settings = Settings(value)
setting_changed.connect(reload_settings)

View File

@@ -0,0 +1,5 @@
from .renderers import Column, Fieldset, Layout, FormLayout, LayoutNode, Row, Stacked, Span
__all__ = (
'Column', 'Fieldset', 'FormLayout', 'Layout', 'LayoutNode', 'Row', 'Stacked', 'Span',
)

384
viewflow/forms/renderers.py Normal file
View File

@@ -0,0 +1,384 @@
import json
import re
from typing import List, Type, Union
from functools import lru_cache
from xml.etree import ElementTree
from django import forms
from django.core.serializers.json import DjangoJSONEncoder
from django.utils.encoding import force_str
from django.utils.formats import get_format
from django.utils.text import format_lazy
from django.utils.translation import gettext_lazy as _
from viewflow.utils import MARKER
AUTO = MARKER('AUTO')
class WidgetRenderer(object):
tag = 'vf-field-input'
def __init__(self, root: ElementTree.Element, bound_field: forms.BoundField):
self.root = root
self.bound_field = bound_field
def format_value(self, value):
if isinstance(value, (tuple, list)):
return ','.join(f"{item}" for item in value)
return f"{value}"
def create_root(self, context):
attrs = {
key: str(value) if value is not True else key
for key, value in context['widget']['attrs'].items()
if value is not False
}
attrs['name'] = context['widget']['name']
value = context['widget']['value']
if value is not None:
attrs['value'] = self.format_value(value)
if self.bound_field.label:
attrs['label'] = self.bound_field.label
if self.bound_field.errors:
attrs['error'] = self.bound_field.errors[0]
if self.bound_field.help_text:
attrs['help-text'] = self.bound_field.help_text
return ElementTree.SubElement(self.root, self.tag, **attrs)
def render(self, template_name, context, request=None):
return self.create_root(context)
@staticmethod
@lru_cache(maxsize=None)
def get_renderer(widget: forms.Widget) -> 'Type[WidgetRenderer]':
from viewflow.conf import settings
for widget_class in type(widget).mro()[:-2]:
try:
return settings.WIDGET_RENDERERS[widget_class]
except KeyError:
continue
return InputRenderer
class CheckboxRenderer(WidgetRenderer):
tag = 'vf-field-checkbox'
class InputRenderer(WidgetRenderer):
def create_root(self, context):
root = super().create_root(context)
root.attrib['type'] = context['widget']['type']
return root
class HiddenInputRenderer(WidgetRenderer):
tag = 'input'
def create_root(self, context):
root = super().create_root(context)
root.attrib['type'] = 'hidden'
return root
class PasswordInputRenderer(InputRenderer):
tag = 'vf-field-password'
class FormLayout:
"""Default form layout."""
def append_non_field_errors(self, form: forms.Form, root: ElementTree.Element):
errors = form.non_field_errors()
errors.extend(
form.error_class(bound_field.errors)
for bound_field in form.hidden_fields()
if bound_field.errors
)
if errors:
wrapper = ElementTree.SubElement(root, 'div', {
'class': 'vf-form__errors'
})
for error in errors:
child = ElementTree.SubElement(wrapper, 'div', {
'class': 'vf-form__error',
})
child.text = str(error)
def append_hidden_fields(self, form: forms.Form, root: ElementTree.Element):
hidden_fields = form.hidden_fields()
if hidden_fields:
wrapper = ElementTree.SubElement(root, 'div', {
'class': 'vf-form__hiddenfields'
})
for bound_field in hidden_fields:
self.append_field(wrapper, bound_field, HiddenInputRenderer)
def append_visible_fields(self, form: forms.Form, root: ElementTree.Element):
visible_fields = form.visible_fields()
if visible_fields:
wrapper = ElementTree.SubElement(root, 'div', {
'class': 'vf-form__visiblefields mdc-layout-grid__inner'
})
for bound_field in form.visible_fields():
container = ElementTree.SubElement(wrapper, 'div', {
'class': 'mdc-layout-grid__cell mdc-layout-grid__cell--span-12'
})
self.append_field(container, bound_field)
def append_field(self, root: ElementTree.Element, bound_field: forms.BoundField, renderer_class=None):
if renderer_class is None:
renderer_class = WidgetRenderer.get_renderer(bound_field.field.widget)
widget = bound_field.field.widget
if bound_field.field.localize:
widget.is_localized = True
attrs = bound_field.build_widget_attrs({}, widget)
if bound_field.auto_id and 'id' not in widget.attrs:
attrs.setdefault('id', bound_field.auto_id)
widget.render(
name=bound_field.html_name,
value=bound_field.value(),
attrs=attrs,
renderer=renderer_class(root, bound_field),
)
def render_form(self, form: forms.Form) -> ElementTree.Element:
root = ElementTree.Element('div', {
'class': 'vf-form mdc-layout-grid'
})
self.append_non_field_errors(form, root)
self.append_hidden_fields(form, root)
self.append_visible_fields(form, root)
return root
def render(self, form: forms.Form, default_layout: 'FormLayout' = None):
return ElementTree.tostring(
self.render_form(form),
encoding='unicode',
method='html',
)
class Layout(FormLayout):
"""Customizable form layout."""
def __init__(self, *elements, **kwargs):
self.children = _convert_to_children(elements)
super().__init__(**kwargs)
def append_visible_fields(self, form: forms.Form, root: ElementTree.Element):
wrapper = ElementTree.SubElement(root, 'div', {
'class': 'vf-form__visiblefields mdc-layout-grid__inner'
})
Column(*self.children).append(self, form, wrapper)
class LayoutNode(object):
"""Base class for self-rendered nodes."""
def __init__(self, desktop=AUTO, tablet=AUTO, mobile=AUTO):
assert desktop == AUTO or 1 <= desktop <= 12
self.desktop = desktop
assert tablet == AUTO or 1 <= tablet <= 8
self.tablet = tablet
assert mobile == AUTO or 1 <= mobile <= 4
self.mobile = mobile
def append(self, layout: FormLayout, form: forms.Form, root: ElementTree.Element):
raise NotImplementedError("Subclass should override this")
class Column(LayoutNode):
"""Place elements vertically stacked, one under another.
Example:
layout = Layout(
Row(
Column('first_name', 'last_name', desktop=8, tablet=6)
'sex_options'
)
)
"""
def __init__(self, *elements, **kwargs):
self.id_ = kwargs.pop('id_', None)
self.children = _convert_to_children(elements)
super().__init__(**kwargs)
def append(self, layout: FormLayout, form: forms.Form, root: ElementTree.Element):
if self.children:
wrapper = ElementTree.SubElement(root, 'div', {
'class': 'vf-form-column mdc-layout-grid__cell mdc-layout-grid__cell--span-12'
})
if self.id_:
wrapper.attrib['id'] = self.id_
for child in self.children:
child.append(layout, form, wrapper)
class Row(LayoutNode):
"""Spread elements over a single line.
Example:
layout = Layout(
Row(
'first_name',
Row('last_name', 'sex', tablet=5)
)
)
"""
def __init__(self, *elements, **kwargs):
self.id_ = kwargs.pop('id_', None)
self.children = _convert_to_children(elements)
super().__init__(**kwargs)
def append(self, layout: FormLayout, form: forms.Form, root: ElementTree.Element):
desktop = _children_sizes(
[child.desktop for child in self.children], grid_size=12,
grid_name='desktop', keep_in_row=True
)
tablet = _children_sizes(
[child.tablet for child in self.children], grid_size=8,
grid_name='tablet', keep_in_row=False
)
mobile = _children_sizes(
[child.mobile for child in self.children], grid_size=4,
grid_name='mobile', keep_in_row=False
)
def append_child(wrapper, child, desktop: int, tablet: int, mobile: int):
classes = ' '.join([
"mdc-layout-grid__cell",
f"mdc-layout-grid__cell--span-{desktop}-desktop",
f"mdc-layout-grid__cell--span-{tablet}-tablet",
f"mdc-layout-grid__cell--span-{mobile}-mobile"
])
element = ElementTree.SubElement(wrapper, 'div', {
'class': classes,
})
child.append(layout, form, element)
wrapper = ElementTree.SubElement(root, 'div', {
'class': "vf-form-row mdc-layout-grid__inner",
})
if self.id_:
wrapper.attrib['id'] = self.id_
for child_data in zip(self.children, desktop, tablet, mobile):
append_child(wrapper, *child_data)
class Span(LayoutNode):
"""Span a form field over several columns.
Example::
layout = Layout(
Row(Span('first_name'), Span('last_name'))
Row(
Span('email', tablet=6, mobile=3),
'sex'
)
)
By default span is auto-sized. On a desktop all auto-sized elements
would be spread equally over the free place of a row, non occupied by
elements with specific sizes.
On mobile and tablet if all elements in a row have auto-sizes,
each element would be placed in a new line. If even one element
in a row has a specific size, all auto-sized elements would be
kept in a single line, like on a desktop.
"""
def __init__(self, field_name, **kwargs):
self.field_name = field_name
super().__init__(**kwargs)
def __str__(self):
return f'Field {self.field_name} <{self.desktop}, {self.tablet}, {self.mobile}>'
def append(self, layout: FormLayout, form: forms.Form, root: ElementTree.Element):
try:
bound_field = form[self.field_name]
except KeyError as exc:
raise ValueError(
f'{self.field_name} field not found in the {type(form).__name__}'
) from exc
layout.append_field(root, bound_field)
class Fieldset(Column):
def __init__(self, title, *elements, **kwargs):
self.title = title
super().__init__(*elements, **kwargs)
def append(self, layout: FormLayout, form: forms.Form, root: ElementTree.Element):
wrapper = ElementTree.SubElement(root, 'div', {
'class': "vf-form__formset",
})
title = ElementTree.SubElement(wrapper, 'h3', {
'class': "mdc-typography--subheading2 vf-form__formset-header",
})
title.text = force_str(self.title)
super().append(layout, form, wrapper)
class Stacked(LayoutNode):
"""Render stacked inline."""
def __init__(self, *elements, **kwargs):
self.id_ = kwargs.pop('id_', None)
self.children = _convert_to_children(elements)
super().__init__(**kwargs)
def append(self, layout: FormLayout, form: forms.Form, root: ElementTree.Element):
""" TODO implement stacked render."""
# return Div(class_="vf-form-column mdc-layout-grid__cell mdc-layout-grid__cell--span-12", id_=self.id_) / [
# child.render(form) for child in self.children
# ]
def _convert_to_children(elements: List[Union[LayoutNode, str]]):
result = []
for element in elements:
if isinstance(element, LayoutNode):
result.append(element)
elif isinstance(element, str):
result.append(Span(element))
else:
raise ValueError(f"Unknown element {element} type {type(element)}")
return result
def _children_sizes(spans, grid_size=12, grid_name='desktop', keep_in_row=True):
bound = sum(span for span in spans if span != AUTO)
auto_count = sum(1 for span in spans if span == AUTO)
if bound == 0 and not keep_in_row:
# If children AUTO-sized - put every child on the own row
return [grid_size for _ in spans]
else:
rest = grid_size - bound
if rest < 0 or (auto_count != 0 and grid_size % auto_count) != 0:
raise ValueError(
f"Can't equally spread {spans} over {grid_size} columns on a {grid_name} grid"
)
return [
rest // auto_count if child == AUTO else child
for child in spans
]
WIDGET_RENDERERS = {
forms.CheckboxInput: CheckboxRenderer,
forms.HiddenInput: HiddenInputRenderer,
forms.PasswordInput: PasswordInputRenderer,
}

9
viewflow/fsm/__init__.py Normal file
View File

@@ -0,0 +1,9 @@
"""Finite state machine workflow."""
from .base import (
TransitionNotAllowed, State
)
__all__ = (
'TransitionNotAllowed', 'State'
)

386
viewflow/fsm/base.py Normal file
View File

@@ -0,0 +1,386 @@
"""Base FSM declarations."""
from __future__ import annotations
import inspect
from typing import Callable, List, Type
from viewflow.this_object import ThisObject
from viewflow.utils import MARKER
from .typing import Condition, StateTransitions
class TransitionNotAllowed(Exception):
"""Raised when a transition is not allowed."""
class Transition(object):
"""State transition definition."""
def __init__(self, func, source, target, label=None, conditions=None, permission=None): # noqa D102
self.func = func
self.source = source
self.target = target
self._label = label
self.permission = permission
self.conditions: List[Condition] = conditions if conditions else []
@property
def label(self):
"""Transition human-readable label."""
if self._label:
return self._label
else:
try:
return self.func.label
except AttributeError:
return self.func.__name__.title()
@property
def slug(self):
return self.func.__name__
def conditions_met(self, instance) -> bool:
"""Check that all associated conditions is True."""
conditions = [
condition.resolve(instance.__class__) if isinstance(condition, ThisObject) else condition
for condition in self.conditions
]
return all(map(lambda condition: condition(instance), conditions))
def has_perm(self, instance, user) -> bool:
"""Check the permission of the transition."""
if self.permission is None:
return True
elif callable(self.permission):
return self.permission(instance, user)
elif isinstance(self.permission, ThisObject):
permission = self.permission.resolve(instance)
return permission(user)
else:
raise ValueError(f"Unknown permission type {type(self.permission)}")
class TransitionMethod(object):
"""Unbound transition method wrapper.
Provides shortcut to enumerate all method transitions, ex::
Review.publish.get_transitions()
"""
do_not_call_in_templates = True
def __init__(self, state: State, func: Callable, descriptor: TransitionDescriptor, owner: Type):
self._state = state
self._func = func
self._descriptor = descriptor
self._owner = owner
self.__doc__ = func.__doc__
def get_transitions(self) -> List[Transition]:
return self._descriptor.get_transitions()
@property
def slug(self):
"""Transition name."""
return self.func.__name__
class TransitionBoundMethod(object):
"""Instance method wrapper that performs the transition."""
do_not_call_in_templates = True
class Wrapper(object):
"""Wrapper context object, to simplify __call__ method debug"""
def __init__(self, parent: 'TransitionBoundMethod', kwargs):
self.parent = parent
self.caller_kwargs = kwargs
self.initial_state = None
self.target_state = None
def __enter__(self):
self.initial_state = self.parent._state.get(self.parent._instance)
transition = self.parent._descriptor.get_transition(self.initial_state)
if transition is None:
raise TransitionNotAllowed(f'{self.parent.label} :: no transition from "{self.initial_state}"')
if not transition.conditions_met(self.parent._instance):
raise TransitionNotAllowed(
f" '{transition.label}' transition conditions have not been met"
)
self.target_state = transition.target
if self.target_state:
self.parent._state.set(self.parent._instance, self.target_state)
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
self.parent._state.set(self.parent._instance, self.initial_state)
else:
self.parent._state.transition_succeed(
self.parent._instance, self.parent,
self.initial_state, self.target_state,
**self.caller_kwargs
)
def __init__(self, state, func, descriptor, instance):
self._state: State = state
self._func: Callable = func
self._descriptor: TransitionDescriptor = descriptor
self._instance = instance
def original(self, *args, **kwargs):
"""Call the unwrapped class method."""
return self._func(self._instance, *args, **kwargs)
def can_proceed(self, check_conditions=True):
"""Check is transition available."""
current_state = self._state.get(self._instance)
transition = self._descriptor.get_transition(current_state)
if transition and check_conditions:
return transition.conditions_met(self._instance)
return False
def has_perm(self, user):
current_state = self._state.get(self._instance)
transition = self._descriptor.get_transition(current_state)
if transition:
return transition.has_perm(self._instance, user)
return False
@property
def label(self):
"""Transition human-readable label."""
current_state = self._state.get(self._instance)
transition = self._descriptor.get_transition(current_state)
if transition:
return transition.label
else:
try:
return self._func.label
except AttributeError:
return self._func.__name__.title()
def __call__(self, *args, **kwargs):
with TransitionBoundMethod.Wrapper(self, kwargs=kwargs):
return self._func(self._instance, *args, **kwargs)
def get_transitions(self):
return self._descriptor.get_transitions()
class TransitionDescriptor(object):
"""Base transition definition descriptor."""
do_not_call_in_templates = True
def __init__(self, state, func): # noqa D102
self._state = state
self._func = func
self._transitions = {}
def __get__(self, instance, owner=None):
if instance:
return TransitionBoundMethod(self._state, self._func, self, instance)
else:
return TransitionMethod(self._state, self._func, self, owner)
def add_transition(self, transition):
self._transitions[transition.source] = transition
def get_transitions(self) -> List[Transition]:
"""List of all transitions."""
return self._transitions.values()
def get_transition(self, source_state):
"""Get a transition of a source_state.
Returns None if there is no outgoing transitions.
"""
transition = self._transitions.get(source_state, None)
if transition is None:
transition = self._transitions.get(State.ANY, None)
return transition
class SuperTransitionDescriptor(object):
do_not_call_in_templates = True
def __init__(self, state, func): # noqa D102
self._state = state
self._func = func
def __get__(self, instance, owner=None):
if instance:
return TransitionBoundMethod(self._state, self._func, self.get_descriptor(instance.__class__), instance)
else:
return TransitionMethod(self._state, self._func, self.get_descriptor(owner), owner)
def get_descriptor(self, owner) -> TransitionDescriptor:
"""Lookup for the transition descriptor in the base classes."""
for cls in owner.__mro__[1:]:
if hasattr(cls, self._func.__name__):
super_method = getattr(cls, self._func.__name__)
if isinstance(super_method, TransitionMethod):
break
else:
raise ValueError('Base transition not found')
return super_method._descriptor
class StateDescriptor(object):
"""Class-bound value for a state descriptor.
Provides shortcut to enumerate all class transitions, ex::
Review.state.get_transitions()
"""
def __init__(self, state: 'State', owner: type):
self._state = state
self._owner = owner
def __getattr__(self, attr):
return getattr(self._state, attr)
def get_transitions(self) -> StateTransitions:
propname = '__fsm_{}_transitions'.format(self._state.propname)
transitions = self._owner.__dict__.get(propname, None)
if transitions is None:
transitions = {}
methods = inspect.getmembers(
self._owner,
lambda attr: isinstance(attr, TransitionMethod)
)
transitions = {
method: method.get_transitions()
for _, method in methods
}
setattr(self._owner, propname, transitions)
return transitions
def get_outgoing_transitions(self, state) -> List[Transition]:
return [
transition
for transitions in self.get_transitions().values()
for transition in transitions
if transition.source == state or (transition.source == State.ANY and transition.target != state)
]
class State(object):
"""State slot field."""
ANY = MARKER('ANY')
def __init__(self, states, default=None):
self._default = default
self._setter = None
self._getter = None
self._on_success = None
def __get__(self, instance, owner=None):
if instance:
return self.get(instance)
return StateDescriptor(self, owner)
def __set__(self, instance, value):
raise AttributeError('Direct state modification is not allowed')
def get(self, instance):
"""Get the state from the underline class instance."""
if self._getter:
value = self._getter(instance)
if self._default:
return value if value else self._default
else:
return value
return getattr(instance, self.propname, self._default)
def set(self, instance, value):
"""Get the state of the underline class instance."""
if self._setter:
self._setter(instance, value)
else:
setattr(instance, self.propname, value)
def transition_succeed(self, instance, descriptor, source, target, **kwargs):
if self._on_success:
self._on_success(instance, descriptor, source, target, **kwargs)
@property
def propname(self):
"""State storage attribute."""
return '__fsm{}'.format(id(self))
def transition(
self, source=None, target=None, label=None,
conditions=None, permission=None,
):
"""Transition method decorator."""
def _wrapper(func):
if isinstance(func, TransitionDescriptor):
descriptor = func
else:
descriptor = TransitionDescriptor(self, func)
source_list = source
if not isinstance(source, (list, tuple, set)):
source_list = [source]
for src in source_list:
transition = Transition(
func=descriptor._func,
source=src,
target=target,
label=label,
conditions=conditions,
permission=permission
)
descriptor.add_transition(transition)
return descriptor
return _wrapper
def super(self):
def _wrapper(func):
return SuperTransitionDescriptor(self, func)
return _wrapper
def setter(self):
def _wrapper(func):
self._setter = func
return func
return _wrapper
def getter(self):
def _wrapper(func):
self._getter = func
return func
return _wrapper
def on_success(self):
def _wrapper(func):
self._on_success = func
return func
return _wrapper
class CONDITION(object):
"""Boolean-like object to return value accompanied with a messsage from fsm conditions."""
def __init__(self, is_true: bool, unmet: str = ""):
self.is_true = is_true
self.unmet = unmet
@property
def message(self):
return self.message if self.unmet else ''
def __bool__(self):
return self.is_true

8
viewflow/fsm/typing.py Normal file
View File

@@ -0,0 +1,8 @@
from typing import Callable, Any, List, Mapping, TYPE_CHECKING
if TYPE_CHECKING:
from .base import TransitionMethod, Transition # NOQA
StateValue = Any
Condition = Callable[[Any], bool]
StateTransitions = Mapping['TransitionMethod', List['Transition']]

82
viewflow/middleware.py Normal file
View File

@@ -0,0 +1,82 @@
from django.core.exceptions import PermissionDenied
class SiteMiddleware(object):
"""
Set `site` and `app` attributes on request.resolver_match object.
"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
return self.get_response(request)
def process_view(self, request, callback, callback_args, callback_kwargs):
if not hasattr(request, 'user'):
raise ValueError(
'No `request.user` found. `django.contrib.auth.context_processors.auth` '
'missing or `material.middleware.site included before it.` '
'You need to add auth middleware or change middlewares order.')
match = request.resolver_match
if match:
extra = getattr(match.url_name, 'extra', {})
site, app, viewset = extra.get('site'), extra.get('app'), extra.get('viewset')
if site:
if not site.has_perm(request.user):
raise PermissionDenied
if app:
if not app.has_perm(request.user):
raise PermissionDenied
if viewset:
if hasattr(viewset, 'has_perm') and not viewset.has_perm(request.user):
raise PermissionDenied
for name, value in extra.items():
setattr(request.resolver_match, name, value)
return None
def process_template_response(self, request, response):
app = getattr(request.resolver_match, 'app', None)
if app:
app_context = app.get_context_data(request)
for key, value in app_context.items():
if key in response.context_data:
raise ValueError(f'App context key {key} clashes with view response context')
else:
response.context_data[key] = value
return response
class TurbolinksMiddleware(object):
"""
Send the `Turbolinks-Location` header in response to a visit that was redirected,
and Turbolinks will replace the browsers topmost history entry .
"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
response = self.get_response(request)
is_turbolinks = request.META.get('HTTP_TURBOLINKS_REFERRER')
is_response_redirect = response.has_header('Location')
if is_turbolinks:
if is_response_redirect:
location = response['Location']
prev_location = request.session.pop('_turbolinks_redirect_to', None)
if prev_location is not None:
# relative subsequent redirect
if location.startswith('.'):
location = prev_location.split('?')[0] + location
request.session['_turbolinks_redirect_to'] = location
else:
if request.session.get('_turbolinks_redirect_to'):
location = request.session.pop('_turbolinks_redirect_to')
response['Turbolinks-Location'] = location
return response

View File

@@ -7,6 +7,7 @@ from django.utils.encoding import force_text
from django.utils.html import conditional_escape
from viewflow.contrib import auth
from viewflow.forms import FormLayout
from viewflow.urls import Site, Viewset
@@ -99,6 +100,46 @@ class ViewsetURLNode(template.Node):
return url
@register.tag('render')
class FormNode(template.Node):
"""
Render a django form using google material-components-web library.
Example:
{% render_form form [layout] %}
"""
default_layout = FormLayout()
def __init__(self, parser, token):
bits = token.split_contents()
layout_expr = None
if len(bits) == 2:
tag, form_expr = bits
elif len(bits) == 3:
tag, form_expr, layout_expr = bits
else:
raise template.TemplateSyntaxError(
"Invalid syntax in material tag, expects only form and optional layout arguments.")
self.form_expr = parser.compile_filter(form_expr)
self.layout_expr = parser.compile_filter(layout_expr) if layout_expr else None
def render(self, context):
form = self.form_expr.resolve(context)
if not isinstance(form, forms.BaseForm):
raise template.TemplateSyntaxError("material tag first argument must be a form")
layout = None
if self.layout_expr:
layout = self.layout_expr.resolve(context)
if layout and not isinstance(layout, FormLayout):
raise template.TemplateSyntaxError("material tag second argument must be a layout")
return layout.render(form) if layout else self.default_layout.render(form)
@register.tag('get_absolute_url')
class AbsoluteURLNode(template.Node):
"""

49
viewflow/this_object.py Normal file
View File

@@ -0,0 +1,49 @@
"""Forward-reference for class-body declarations"""
class ThisMethod(object):
"""Reference to a method"""
def __init__(self, propname, methodname):
self._propname = propname
self._methodname = methodname
def resolve(self, instance):
# TODO meaningfull exception
prop = getattr(instance, self._propname)
method = getattr(instance, f'_this_{self._methodname}')
return method(prop)
class ThisObject(object):
"""Helper for forward references"""
def __init__(self, name): # noqa D102
self.name = name
def resolve(self, instance):
# TODO meaningfull exception
return getattr(instance, self.name)
def __getattr__(self, name):
return ThisMethod(self.name, name)
class This(object):
"""Helper for building forward references.
The rationale is ability to specify references to the class
attributes and methods before they are declared.
`this` is like a `self` but for the class body.
"""
def resolve(self, instance, this_ref):
if isinstance(this_ref, (ThisObject, ThisMethod)):
return this_ref.resolve(instance)
else:
return this_ref
def __getattr__(self, name):
return ThisObject(name)
this = This()