<html><head><meta name="color-scheme" content="light dark"></head><body><pre style="word-wrap: break-word; white-space: pre-wrap;">from __future__ import annotations

import abc
from asyncio import (
    FIRST_COMPLETED,
    CancelledError,
    Queue,
    Task,
    create_task,
    get_running_loop,
    wait,
)
from collections import Counter
from collections.abc import Sequence
from contextlib import AsyncExitStack
from logging import getLogger
from typing import (
    Any,
    Callable,
    Generic,
    NamedTuple,
    NewType,
    TypeVar,
    cast,
)
from uuid import uuid4
from weakref import ref as weakref

from anyio import Semaphore
from typing_extensions import TypeAlias

from reactpy.config import (
    REACTPY_ASYNC_RENDERING,
    REACTPY_CHECK_VDOM_SPEC,
    REACTPY_DEBUG_MODE,
)
from reactpy.core._life_cycle_hook import LifeCycleHook
from reactpy.core.types import (
    ComponentType,
    EventHandlerDict,
    Key,
    LayoutEventMessage,
    LayoutUpdateMessage,
    VdomChild,
    VdomDict,
    VdomJson,
)
from reactpy.core.vdom import validate_vdom_json
from reactpy.utils import Ref

logger = getLogger(__name__)


class Layout:
    """Responsible for "rendering" components. That is, turning them into VDOM."""

    __slots__: tuple[str, ...] = (
        "root",
        "_event_handlers",
        "_rendering_queue",
        "_render_tasks",
        "_render_tasks_ready",
        "_root_life_cycle_state_id",
        "_model_states_by_life_cycle_state_id",
    )

    if not hasattr(abc.ABC, "__weakref__"):  # nocov
        __slots__ += ("__weakref__",)

    def __init__(self, root: ComponentType) -&gt; None:
        super().__init__()
        if not isinstance(root, ComponentType):
            msg = f"Expected a ComponentType, not {type(root)!r}."
            raise TypeError(msg)
        self.root = root

    async def __aenter__(self) -&gt; Layout:
        # create attributes here to avoid access before entering context manager
        self._event_handlers: EventHandlerDict = {}
        self._render_tasks: set[Task[LayoutUpdateMessage]] = set()
        self._render_tasks_ready: Semaphore = Semaphore(0)

        self._rendering_queue: _ThreadSafeQueue[_LifeCycleStateId] = _ThreadSafeQueue()
        root_model_state = _new_root_model_state(self.root, self._schedule_render_task)

        self._root_life_cycle_state_id = root_id = root_model_state.life_cycle_state.id
        self._model_states_by_life_cycle_state_id = {root_id: root_model_state}
        self._schedule_render_task(root_id)

        return self

    async def __aexit__(self, *exc: object) -&gt; None:
        root_csid = self._root_life_cycle_state_id
        root_model_state = self._model_states_by_life_cycle_state_id[root_csid]

        for t in self._render_tasks:
            t.cancel()
            try:
                await t
            except CancelledError:
                pass

        await self._unmount_model_states([root_model_state])

        # delete attributes here to avoid access after exiting context manager
        del self._event_handlers
        del self._rendering_queue
        del self._root_life_cycle_state_id
        del self._model_states_by_life_cycle_state_id

    async def deliver(self, event: LayoutEventMessage) -&gt; None:
        """Dispatch an event to the targeted handler"""
        # It is possible for an element in the frontend to produce an event
        # associated with a backend model that has been deleted. We only handle
        # events if the element and the handler exist in the backend. Otherwise
        # we just ignore the event.
        handler = self._event_handlers.get(event["target"])

        if handler is not None:
            try:
                await handler.function(event["data"])
            except Exception:
                logger.exception(f"Failed to execute event handler {handler}")
        else:
            logger.info(
                f"Ignored event - handler {event['target']!r} "
                "does not exist or its component unmounted"
            )

    async def render(self) -&gt; LayoutUpdateMessage:
        if REACTPY_ASYNC_RENDERING.current:
            return await self._parallel_render()
        else:  # nocov
            return await self._serial_render()

    async def _serial_render(self) -&gt; LayoutUpdateMessage:  # nocov
        """Await the next available render. This will block until a component is updated"""
        while True:
            model_state_id = await self._rendering_queue.get()
            try:
                model_state = self._model_states_by_life_cycle_state_id[model_state_id]
            except KeyError:
                logger.debug(
                    "Did not render component with model state ID "
                    f"{model_state_id!r} - component already unmounted"
                )
            else:
                return await self._create_layout_update(model_state)

    async def _parallel_render(self) -&gt; LayoutUpdateMessage:
        """Await to fetch the first completed render within our asyncio task group.
        We use the `asyncio.tasks.wait` API in order to return the first completed task.
        """
        await self._render_tasks_ready.acquire()
        done, _ = await wait(self._render_tasks, return_when=FIRST_COMPLETED)
        update_task: Task[LayoutUpdateMessage] = done.pop()
        self._render_tasks.remove(update_task)
        return update_task.result()

    async def _create_layout_update(
        self, old_state: _ModelState
    ) -&gt; LayoutUpdateMessage:
        new_state = _copy_component_model_state(old_state)
        component = new_state.life_cycle_state.component

        async with AsyncExitStack() as exit_stack:
            await self._render_component(exit_stack, old_state, new_state, component)

        if REACTPY_CHECK_VDOM_SPEC.current:
            validate_vdom_json(new_state.model.current)

        return {
            "type": "layout-update",
            "path": new_state.patch_path,
            "model": new_state.model.current,
        }

    async def _render_component(
        self,
        exit_stack: AsyncExitStack,
        old_state: _ModelState | None,
        new_state: _ModelState,
        component: ComponentType,
    ) -&gt; None:
        life_cycle_state = new_state.life_cycle_state
        life_cycle_hook = life_cycle_state.hook

        self._model_states_by_life_cycle_state_id[life_cycle_state.id] = new_state

        await life_cycle_hook.affect_component_will_render(component)
        exit_stack.push_async_callback(life_cycle_hook.affect_layout_did_render)
        try:
            raw_model = component.render()
            # wrap the model in a fragment (i.e. tagName="") to ensure components have
            # a separate node in the model state tree. This could be removed if this
            # components are given a node in the tree some other way
            wrapper_model: VdomDict = {"tagName": "", "children": [raw_model]}
            await self._render_model(exit_stack, old_state, new_state, wrapper_model)
        except Exception as error:
            logger.exception(f"Failed to render {component}")
            new_state.model.current = {
                "tagName": "",
                "error": (
                    f"{type(error).__name__}: {error}"
                    if REACTPY_DEBUG_MODE.current
                    else ""
                ),
            }
        finally:
            await life_cycle_hook.affect_component_did_render()

        try:
            parent = new_state.parent
        except AttributeError:
            pass  # only happens for root component
        else:
            key, index = new_state.key, new_state.index
            parent.children_by_key[key] = new_state
            # need to add this model to parent's children without mutating parent model
            old_parent_model = parent.model.current
            old_parent_children = old_parent_model["children"]
            parent.model.current = {
                **old_parent_model,
                "children": [
                    *old_parent_children[:index],
                    new_state.model.current,
                    *old_parent_children[index + 1 :],
                ],
            }

    async def _render_model(
        self,
        exit_stack: AsyncExitStack,
        old_state: _ModelState | None,
        new_state: _ModelState,
        raw_model: Any,
    ) -&gt; None:
        try:
            new_state.model.current = {"tagName": raw_model["tagName"]}
        except Exception as e:  # nocov
            msg = f"Expected a VDOM element dict, not {raw_model}"
            raise ValueError(msg) from e
        if "key" in raw_model:
            new_state.key = new_state.model.current["key"] = raw_model["key"]
        if "importSource" in raw_model:
            new_state.model.current["importSource"] = raw_model["importSource"]
        self._render_model_attributes(old_state, new_state, raw_model)
        await self._render_model_children(
            exit_stack, old_state, new_state, raw_model.get("children", [])
        )

    def _render_model_attributes(
        self,
        old_state: _ModelState | None,
        new_state: _ModelState,
        raw_model: dict[str, Any],
    ) -&gt; None:
        # extract event handlers from 'eventHandlers' and 'attributes'
        handlers_by_event: EventHandlerDict = raw_model.get("eventHandlers", {})

        if "attributes" in raw_model:
            attrs = raw_model["attributes"].copy()
            new_state.model.current["attributes"] = attrs

        if old_state is None:
            self._render_model_event_handlers_without_old_state(
                new_state, handlers_by_event
            )
            return None

        for old_event in set(old_state.targets_by_event).difference(handlers_by_event):
            old_target = old_state.targets_by_event[old_event]
            del self._event_handlers[old_target]

        if not handlers_by_event:
            return None

        model_event_handlers = new_state.model.current["eventHandlers"] = {}
        for event, handler in handlers_by_event.items():
            if event in old_state.targets_by_event:
                target = old_state.targets_by_event[event]
            else:
                target = uuid4().hex if handler.target is None else handler.target
            new_state.targets_by_event[event] = target
            self._event_handlers[target] = handler
            model_event_handlers[event] = {
                "target": target,
                "preventDefault": handler.prevent_default,
                "stopPropagation": handler.stop_propagation,
            }

        return None

    def _render_model_event_handlers_without_old_state(
        self,
        new_state: _ModelState,
        handlers_by_event: EventHandlerDict,
    ) -&gt; None:
        if not handlers_by_event:
            return None

        model_event_handlers = new_state.model.current["eventHandlers"] = {}
        for event, handler in handlers_by_event.items():
            target = uuid4().hex if handler.target is None else handler.target
            new_state.targets_by_event[event] = target
            self._event_handlers[target] = handler
            model_event_handlers[event] = {
                "target": target,
                "preventDefault": handler.prevent_default,
                "stopPropagation": handler.stop_propagation,
            }

        return None

    async def _render_model_children(
        self,
        exit_stack: AsyncExitStack,
        old_state: _ModelState | None,
        new_state: _ModelState,
        raw_children: Any,
    ) -&gt; None:
        if not isinstance(raw_children, (list, tuple)):
            raw_children = [raw_children]

        if old_state is None:
            if raw_children:
                await self._render_model_children_without_old_state(
                    exit_stack, new_state, raw_children
                )
            return None
        elif not raw_children:
            await self._unmount_model_states(list(old_state.children_by_key.values()))
            return None

        children_info = _get_children_info(raw_children)

        new_keys = {k for _, _, k in children_info}
        if len(new_keys) != len(children_info):
            key_counter = Counter(item[2] for item in children_info)
            duplicate_keys = [key for key, count in key_counter.items() if count &gt; 1]
            msg = f"Duplicate keys {duplicate_keys} at {new_state.patch_path or '/'!r}"
            raise ValueError(msg)

        old_keys = set(old_state.children_by_key).difference(new_keys)
        if old_keys:
            await self._unmount_model_states(
                [old_state.children_by_key[key] for key in old_keys]
            )

        new_state.model.current["children"] = []
        for index, (child, child_type, key) in enumerate(children_info):
            old_child_state = old_state.children_by_key.get(key)
            if child_type is _DICT_TYPE:
                old_child_state = old_state.children_by_key.get(key)
                if old_child_state is None:
                    new_child_state = _make_element_model_state(
                        new_state,
                        index,
                        key,
                    )
                elif old_child_state.is_component_state:
                    await self._unmount_model_states([old_child_state])
                    new_child_state = _make_element_model_state(
                        new_state,
                        index,
                        key,
                    )
                    old_child_state = None
                else:
                    new_child_state = _update_element_model_state(
                        old_child_state,
                        new_state,
                        index,
                    )
                await self._render_model(
                    exit_stack, old_child_state, new_child_state, child
                )
                new_state.append_child(new_child_state.model.current)
                new_state.children_by_key[key] = new_child_state
            elif child_type is _COMPONENT_TYPE:
                child = cast(ComponentType, child)
                old_child_state = old_state.children_by_key.get(key)
                if old_child_state is None:
                    new_child_state = _make_component_model_state(
                        new_state,
                        index,
                        key,
                        child,
                        self._schedule_render_task,
                    )
                elif old_child_state.is_component_state and (
                    old_child_state.life_cycle_state.component.type != child.type
                ):
                    await self._unmount_model_states([old_child_state])
                    old_child_state = None
                    new_child_state = _make_component_model_state(
                        new_state,
                        index,
                        key,
                        child,
                        self._schedule_render_task,
                    )
                else:
                    new_child_state = _update_component_model_state(
                        old_child_state,
                        new_state,
                        index,
                        child,
                        self._schedule_render_task,
                    )
                await self._render_component(
                    exit_stack, old_child_state, new_child_state, child
                )
            else:
                old_child_state = old_state.children_by_key.get(key)
                if old_child_state is not None:
                    await self._unmount_model_states([old_child_state])
                new_state.append_child(child)

    async def _render_model_children_without_old_state(
        self,
        exit_stack: AsyncExitStack,
        new_state: _ModelState,
        raw_children: list[Any],
    ) -&gt; None:
        children_info = _get_children_info(raw_children)

        new_keys = {k for _, _, k in children_info}
        if len(new_keys) != len(children_info):
            key_counter = Counter(k for _, _, k in children_info)
            duplicate_keys = [key for key, count in key_counter.items() if count &gt; 1]
            msg = f"Duplicate keys {duplicate_keys} at {new_state.patch_path or '/'!r}"
            raise ValueError(msg)

        new_state.model.current["children"] = []
        for index, (child, child_type, key) in enumerate(children_info):
            if child_type is _DICT_TYPE:
                child_state = _make_element_model_state(new_state, index, key)
                await self._render_model(exit_stack, None, child_state, child)
                new_state.append_child(child_state.model.current)
                new_state.children_by_key[key] = child_state
            elif child_type is _COMPONENT_TYPE:
                child_state = _make_component_model_state(
                    new_state, index, key, child, self._schedule_render_task
                )
                await self._render_component(exit_stack, None, child_state, child)
            else:
                new_state.append_child(child)

    async def _unmount_model_states(self, old_states: list[_ModelState]) -&gt; None:
        to_unmount = old_states[::-1]  # unmount in reversed order of rendering
        while to_unmount:
            model_state = to_unmount.pop()

            for target in model_state.targets_by_event.values():
                del self._event_handlers[target]

            if model_state.is_component_state:
                life_cycle_state = model_state.life_cycle_state
                del self._model_states_by_life_cycle_state_id[life_cycle_state.id]
                await life_cycle_state.hook.affect_component_will_unmount()

            to_unmount.extend(model_state.children_by_key.values())

    def _schedule_render_task(self, lcs_id: _LifeCycleStateId) -&gt; None:
        if not REACTPY_ASYNC_RENDERING.current:
            self._rendering_queue.put(lcs_id)
            return None
        try:
            model_state = self._model_states_by_life_cycle_state_id[lcs_id]
        except KeyError:
            logger.debug(
                "Did not render component with model state ID "
                f"{lcs_id!r} - component already unmounted"
            )
        else:
            self._render_tasks.add(create_task(self._create_layout_update(model_state)))
            self._render_tasks_ready.release()

    def __repr__(self) -&gt; str:
        return f"{type(self).__name__}({self.root})"


def _new_root_model_state(
    component: ComponentType, schedule_render: Callable[[_LifeCycleStateId], None]
) -&gt; _ModelState:
    return _ModelState(
        parent=None,
        index=-1,
        key=None,
        model=Ref(),
        patch_path="",
        children_by_key={},
        targets_by_event={},
        life_cycle_state=_make_life_cycle_state(component, schedule_render),
    )


def _make_component_model_state(
    parent: _ModelState,
    index: int,
    key: Any,
    component: ComponentType,
    schedule_render: Callable[[_LifeCycleStateId], None],
) -&gt; _ModelState:
    return _ModelState(
        parent=parent,
        index=index,
        key=key,
        model=Ref(),
        patch_path=f"{parent.patch_path}/children/{index}",
        children_by_key={},
        targets_by_event={},
        life_cycle_state=_make_life_cycle_state(component, schedule_render),
    )


def _copy_component_model_state(old_model_state: _ModelState) -&gt; _ModelState:
    # use try/except here because not having a parent is rare (only the root state)
    try:
        parent: _ModelState | None = old_model_state.parent
    except AttributeError:
        parent = None

    return _ModelState(
        parent=parent,
        index=old_model_state.index,
        key=old_model_state.key,
        model=Ref(),  # does not copy the model
        patch_path=old_model_state.patch_path,
        children_by_key={},
        targets_by_event={},
        life_cycle_state=old_model_state.life_cycle_state,
    )


def _update_component_model_state(
    old_model_state: _ModelState,
    new_parent: _ModelState,
    new_index: int,
    new_component: ComponentType,
    schedule_render: Callable[[_LifeCycleStateId], None],
) -&gt; _ModelState:
    return _ModelState(
        parent=new_parent,
        index=new_index,
        key=old_model_state.key,
        model=Ref(),  # does not copy the model
        patch_path=f"{new_parent.patch_path}/children/{new_index}",
        children_by_key={},
        targets_by_event={},
        life_cycle_state=(
            _update_life_cycle_state(old_model_state.life_cycle_state, new_component)
            if old_model_state.is_component_state
            else _make_life_cycle_state(new_component, schedule_render)
        ),
    )


def _make_element_model_state(
    parent: _ModelState,
    index: int,
    key: Any,
) -&gt; _ModelState:
    return _ModelState(
        parent=parent,
        index=index,
        key=key,
        model=Ref(),
        patch_path=f"{parent.patch_path}/children/{index}",
        children_by_key={},
        targets_by_event={},
    )


def _update_element_model_state(
    old_model_state: _ModelState,
    new_parent: _ModelState,
    new_index: int,
) -&gt; _ModelState:
    return _ModelState(
        parent=new_parent,
        index=new_index,
        key=old_model_state.key,
        model=Ref(),  # does not copy the model
        patch_path=old_model_state.patch_path,
        children_by_key={},
        targets_by_event={},
    )


class _ModelState:
    """State that is bound to a particular element within the layout"""

    __slots__ = (
        "__weakref__",
        "_parent_ref",
        "_render_semaphore",
        "children_by_key",
        "index",
        "key",
        "life_cycle_state",
        "model",
        "patch_path",
        "targets_by_event",
    )

    def __init__(
        self,
        parent: _ModelState | None,
        index: int,
        key: Any,
        model: Ref[VdomJson],
        patch_path: str,
        children_by_key: dict[Key, _ModelState],
        targets_by_event: dict[str, str],
        life_cycle_state: _LifeCycleState | None = None,
    ):
        self.index = index
        """The index of the element amongst its siblings"""

        self.key = key
        """A key that uniquely identifies the element amongst its siblings"""

        self.model = model
        """The actual model of the element"""

        self.patch_path = patch_path
        """A "/" delimited path to the element within the greater layout"""

        self.children_by_key = children_by_key
        """Child model states indexed by their unique keys"""

        self.targets_by_event = targets_by_event
        """The element's event handler target strings indexed by their event name"""

        # === Conditionally Available Attributes ===
        # It's easier to conditionally assign than to force a null check on every usage

        if parent is not None:
            self._parent_ref = weakref(parent)
            """The parent model state"""

        if life_cycle_state is not None:
            self.life_cycle_state = life_cycle_state
            """The state for the element's component (if it has one)"""

    @property
    def is_component_state(self) -&gt; bool:
        return hasattr(self, "life_cycle_state")

    @property
    def parent(self) -&gt; _ModelState:
        parent = self._parent_ref()
        if parent is None:
            raise RuntimeError("detached model state")  # nocov
        return parent

    def append_child(self, child: Any) -&gt; None:
        self.model.current["children"].append(child)

    def __repr__(self) -&gt; str:  # nocov
        return f"ModelState({ {s: getattr(self, s, None) for s in self.__slots__} })"


def _make_life_cycle_state(
    component: ComponentType,
    schedule_render: Callable[[_LifeCycleStateId], None],
) -&gt; _LifeCycleState:
    life_cycle_state_id = _LifeCycleStateId(uuid4().hex)
    return _LifeCycleState(
        life_cycle_state_id,
        LifeCycleHook(lambda: schedule_render(life_cycle_state_id)),
        component,
    )


def _update_life_cycle_state(
    old_life_cycle_state: _LifeCycleState,
    new_component: ComponentType,
) -&gt; _LifeCycleState:
    return _LifeCycleState(
        old_life_cycle_state.id,
        # the hook is preserved across renders because it holds the state
        old_life_cycle_state.hook,
        new_component,
    )


_LifeCycleStateId = NewType("_LifeCycleStateId", str)


class _LifeCycleState(NamedTuple):
    """Component state for :class:`_ModelState`"""

    id: _LifeCycleStateId
    """A unique identifier used in the :class:`~reactpy.core.hooks.LifeCycleHook` callback"""

    hook: LifeCycleHook
    """The life cycle hook"""

    component: ComponentType
    """The current component instance"""


_Type = TypeVar("_Type")


class _ThreadSafeQueue(Generic[_Type]):
    def __init__(self) -&gt; None:
        self._loop = get_running_loop()
        self._queue: Queue[_Type] = Queue()
        self._pending: set[_Type] = set()

    def put(self, value: _Type) -&gt; None:
        if value not in self._pending:
            self._pending.add(value)
            self._loop.call_soon_threadsafe(self._queue.put_nowait, value)

    async def get(self) -&gt; _Type:
        value = await self._queue.get()
        self._pending.remove(value)
        return value


def _get_children_info(children: list[VdomChild]) -&gt; Sequence[_ChildInfo]:
    infos: list[_ChildInfo] = []
    for index, child in enumerate(children):
        if child is None:
            continue
        elif isinstance(child, dict):
            child_type = _DICT_TYPE
            key = child.get("key")
        elif isinstance(child, ComponentType):
            child_type = _COMPONENT_TYPE
            key = child.key
        else:
            child = f"{child}"
            child_type = _STRING_TYPE
            key = None

        if key is None:
            key = index

        infos.append((child, child_type, key))

    return infos


_ChildInfo: TypeAlias = tuple[Any, "_ElementType", Key]

# used in _process_child_type_and_key
_ElementType = NewType("_ElementType", int)
_DICT_TYPE = _ElementType(1)
_COMPONENT_TYPE = _ElementType(2)
_STRING_TYPE = _ElementType(3)
</pre></body></html>