.. _async-with-ansar:

Asynchronous Programming with Ansar
***********************************

At its core, asynchronous programming (from here on referred to as just
async) with ansar is based on a simple model; there are objects and those
objects send messages to each other. Code executes in response to those
messages and only in response to messages. To kick things off, all objects
are presented with a start message.

In many ways async programming is about harnessing the potential of
multithreading and multiprocessing. It is a unifying model for software
operation that allows these underlying technologies to be applied to
challenges such as concurrency. It is also about giving developers a model
that copes with an innately async world, i.e. users must always be able to
cancel the latest request, even where that happens to be a complex
concurrent operation.

Delivering a complete and practical framework for writing software based on
the async model is a longer story. By presenting a series of issues and
discussing the appropriate ansar solution, this document illustrates how
useful async can be and how ansar delivers that potential to developers.

.. note::

   Source files appearing in this section can be downloaded from `here `_.
   The repo ``Makefile`` contains the setup needed for this guide. Related
   background information can be found :ref:`here`.

Hello World
===========

To help dispel concerns that adopting an async framework results in
completely alien code, here is the classical "hello world" application
written as a pure async application;

.. literalinclude:: class-that-stores/hello-world.py
   :language: python

Running this code produces the following::

    $ python3 hello-world.py
    hello world
    $

A single import operation gains access to the complete set of ansar
features. There is the ubiquitous arrangement around a ``main()`` function
and a conditional statement checking the value of ``__name__``. The
async-specific elements are the calls to ``ar.bind()`` (i.e. a synonym for
:func:`~.bind.bind_any`) and :func:`~.framework.create_object`.

.. note::

   For a complete explanation of how to create standard ansar applications -
   and why - look :ref:`here`.

Under the hood the ``main()`` function actually runs in its own dedicated
platform thread. This construction happens inside
:func:`~.framework.create_object`. Among other things it allows the ansar
framework to manage control-c and the related platform signals. The ``self``
argument passed to the ``main()`` function is the gateway to all the async
capabilities that the application might choose to use.

To see evidence of the work being carried out by ansar, try this;

.. literalinclude:: class-that-stores/hello-world-debug-brad

.. note::

   All ansar logging is placed on ``stderr``. To recover some real estate,
   details such as process IDs and the full ISO time format have been
   omitted. For full information on logging within ansar look :ref:`here`.

The :func:`~.framework.create_object` function is responsible for the
processing of flags such as ``--debug-level``. Without some indication from
the command line, all async logging is discarded. The function also accepts
application-defined flags, environment variables and piped input; topics
covered in later sections.

Application output from ``print()`` to ``stdout`` interrupts the logging at
a somewhat random point. Logging is built-in and is thread-safe. A
consequence of the latter is that mingling of logging on ``stderr`` and
application output on ``stdout`` can give the appearance that events are
happening out of order.

Applications do not have to be "all async" or "all procedural". Ansar
provides several ways to integrate async capability into an application. An
existing, more traditional application might insert fragments such as this;

.. code-block:: python

    import ansar.create as ar

    def hello(self):
        print('hello world')
        return 0

    ar.bind(hello)

    def some_function():
        with ar.OpenChannel() as channel:
            a = channel.create(hello)
            m = channel.select(ar.Completed)
            return m.value

A standard synchronous call to ``some_function()`` uses a context to create
a ``channel``. That object becomes the gateway to async capabilities, in the
same manner as the ``self`` parameter passed to ``main()`` in the earlier
``hello-world.py`` application.

Applications can also add calls to :func:`~.root.start_up` and
:func:`~.root.tear_down` at appropriate points in the codebase. Full async
capabilities are subsequently available throughout the application.

A Query, A Request And A Poll
=============================

An area of interest has been identified in an existing application. There
are three data items that need to be gathered and then processed into a
single result. A query on a database, a request to a network API and the
polling of a device all need to complete before an action can be taken. The
current implementation is clear but is also sequential.

.. code-block:: python

    selection = db_query(query)
    response = network_request(request)
    status = device_poll(poll)
    action = what_next(selection, response, status)

Concurrency should improve throughput.

The first step in an async implementation is a simulation. The three data
functions are implemented as empty functions that simply consume time. The
time periods used are taken from statistics gathered from logs and then
randomized slightly. On average the query is known to take about 0.75s, the
request about 1.75s and the poll about 0.5s. In total this is around 3.0s,
which threatens the quality-of-service requirement for system
responsiveness. On paper at least, concurrency could bring this figure down
to 1.75s, i.e. the longest of the three periods.

A DB Query
----------

Simulation of ``db_query()`` looks like this;

.. literalinclude:: class-that-stores/db-query.py
   :language: python

The ``main()`` function is now using the ``self`` parameter to access async
features. It uses the :meth:`~.point.Point.create()` method to start an
instance of the ``db_query()`` function. This builds a chain of object
creations - each in their own dedicated thread - that starts in
:func:`~.framework.create_object` and looks similar to a chain of function
calls. Arguments passed to the :meth:`~.point.Point.create()` method become
arguments passed to the specified functions, e.g. the ``'query'`` value is
available to the ``db_query()`` function in the ``query`` variable.

When using async objects in this way there is no blocking call-return
sequence. Instead there needs to be a separate "wait" for a signal or
notification that an object has terminated. This is received by the
:meth:`~.pending.Buffering.select()` method in the form of a
:class:`~.lifecycle.Completed` message. The value returned by the terminated
Python function is contained in that message, i.e. ``m.value``. In this
simulation this will always be a ``str`` with the value ``'selection'``. The
create-wait style of operation obviously does not replace the call-return
style. Users of ansar get to choose whichever is appropriate.

The implementation of ``db_query()`` involves;

* the calculation of a wait period using the ``adjust()`` function,
* starting a timer with the async :meth:`~.point.Point.start()` method,
* waiting for the timer signal using the :meth:`~.pending.Buffering.select()` method and
* returning the ``'selection'`` value.

Running the simulation looks like this;

.. literalinclude:: class-that-stores/db-query-debug-brad

The chain of object creations can be seen in the logs. The framework creates
the instance of ``main()`` and ``main()`` creates the instance of
``db_query()``.
The chain unwinds starting with ``db_query()`` catching the timer ``ar.T1``
message, ``main()`` catching the termination of ``db_query()`` and the
framework catching the completion of ``main()``.

Interruption Of A Query
-----------------------

For the purposes of clarity, one important requirement was deferred for the
first iteration. All async objects are required to observe the interrupt
protocol, which begins with the receipt of a :class:`~.lifecycle.Stop`
message. This special message can be received at any time. A proper
implementation of the simulation looks like this;

.. literalinclude:: class-that-stores/db-query-stop.py
   :language: python

The :func:`~.framework.create_object` function maps the platform signal
SIGINT to a :class:`~.lifecycle.Stop` message, forwarding it to the instance
of ``main()``. If you are quick enough on the keyboard, hitting control-c
within the 0.75 second window initiates an interrupt.

Code changes include the addition of :class:`~.lifecycle.Stop` on the calls
to :meth:`~.pending.Buffering.select()` and related conditional code blocks.
In the case of the ``main()`` object this means forwarding any
:class:`~.lifecycle.Stop` message on to the ``db_query()`` object and
waiting for the subsequent unwinding.
The logs for an interrupt look like::

    $ python3 db-query-stop.py --debug-level=DEBUG
    01:48:05.876 + <00000007>start_vector - Created by <00000001>
    01:48:05.876 ~ <00000007>start_vector - Executable "/home/brad/gh/ansar-create/doc/source/class-that-stores/db-query-stop.py" as process (123674)
    01:48:05.876 ~ <00000007>start_vector - Working folder "/home/brad/gh/ansar-create/doc/source/class-that-stores"
    01:48:05.876 ~ <00000007>start_vector - Running object "__main__.main"
    01:48:05.876 ~ <00000007>start_vector - Class threads (1) "retries" (1)
    01:48:05.876 + <00000008>main - Created by <00000007>
    01:48:05.876 + <00000009>db_query - Created by <00000008>
    01:48:05.876 > <00000009>db_query - Sent StartTimer to <00000003>
    ^C01:48:06.577 < <00000007>start_vector - Received Stop from <00000001>
    01:48:06.577 > <00000007>start_vector - Sent Stop to <00000008>
    01:48:06.577 < <00000008>main - Received Stop from <00000007>
    01:48:06.577 > <00000008>main - Sent Stop to <00000009>
    01:48:06.578 < <00000009>db_query - Received Stop from <00000008>
    01:48:06.578 X <00000009>db_query - Destroyed
    01:48:06.578 < <00000008>main - Received Completed from <00000009>
    01:48:06.578 X <00000008>main - Destroyed
    01:48:06.579 < <00000007>start_vector - Received Completed from <00000008>
    01:48:06.579 X <00000007>start_vector - Destroyed
    {
        "value": [
            "ansar.create.lifecycle.Aborted",
            {},
            []
        ]
    }

The response to a control-c is that the ``main()`` object returns an
:class:`~.lifecycle.Aborted` message to the framework. A JSON encoding of
that message is placed on ``stdout`` as the result of the
``db-query-stop.py`` application. Any message returned to the framework is
handled in the same way. An ``int`` is a special case and is assumed to be
an "exit code". The framework silently exits with the presented value. Note
the ``^C`` printed by the terminal as a representation of receiving the
control-c.
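The shape of the interrupt protocol - a stop request that can arrive at any
time, beating the timer - can be sketched with the standard library. This is
only a hedged analogy, not the ansar implementation; ``Aborted`` below is a
local stand-in for the ansar message of the same name, and the
``threading.Event`` stands in for the arrival of a control-c;

```python
import threading

class Aborted:
    """Local stand-in for the ansar Aborted message."""

def db_query(stop, seconds):
    # Wait on the timer, but wake early if a stop request arrives -
    # the stdlib analogue of selecting on both the T1 and Stop messages.
    interrupted = stop.wait(timeout=seconds)
    if interrupted:
        return Aborted()
    return 'selection'

stop = threading.Event()
stop.set()                  # simulate control-c arriving immediately
result = db_query(stop, 0.75)
print(type(result).__name__)
```

When the event is never set, the wait runs its full course and the normal
``'selection'`` value is returned instead.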
Query, Request and Poll As Objects
----------------------------------

The next step is to introduce the other two data sources to our simulation.
These are simply duplications of the ``db_query()`` object except with
different time delay values;

.. literalinclude:: class-that-stores/query-request-poll.py
   :language: python

This establishes a reference for comparison with the next implementation.
The logs for this sequential version appear below;

.. literalinclude:: class-that-stores/query-request-poll-debug-brad

The timestamps show that from the moment ``db_query()`` is created through
to the ``main()`` function receiving the completion of ``device_poll()``
consumes over 3.5 seconds (this figure varies as the logs are auto-generated
by the documentation system). This is a little higher than the sum of the
averages, but there is operational overhead and the adjustments added to the
timeouts.

A Concurrent Approach
---------------------

Finally we can implement concurrency. In this implementation the three
objects are all created before the ``main()`` object shifts to waiting for
completion messages. The effect is that the three threads associated with
the instances of ``db_query()``, ``network_request()`` and ``device_poll()``
are all running at the same time.

.. literalinclude:: class-that-stores/query-request-poll-concurrent.py
   :language: python

The most difficult part is probably the support of interruptions. An
interruption can arrive after 0 or more of the 3 objects have already
terminated. The ``main()`` object needs to track who is still active so that
it can propagate the :class:`~.lifecycle.Stop` message to the appropriate
child objects. The solution shown here is rather "brute-force" in that it
achieves the goal without actually processing the materials returned by the
3 objects, and without using ansar features designed for just this purpose.
It's good enough for this simulation.

An execution looks like this;

.. literalinclude:: class-that-stores/query-request-poll-concurrent-debug-brad

Repeating the previous calculation we get around 2.0 seconds, substantially
less than the sequential version and verifying the potential that
concurrency has to improve throughput. Again, that comparison is muddied by
the presence of random adjustments. The reason for including that
randomization will emerge in a later section.

Conclusion Of The Simulation
----------------------------

The potential benefits of concurrency should not be a surprise to anyone. As
long as the underlying activities of the 3 objects do not involve shared
resources, moving to a concurrent approach should realize improvements of
the order demonstrated by the simulation, whatever the development toolset
might be.

The real purpose of the simulation was to illustrate the use of async
programming with ansar, to show how easy it might be to adopt an async
approach to different programming challenges. A cut-back coding style was
adopted that involved the least async detail possible. The results were
hopefully clearer but they also barely scratched the surface of what ansar
offers.

As an example, there is a real case for implementing objects such as
``db_query()``, ``network_request()`` and ``device_poll()`` as state
machines, or even as messages that are sent to state machines. Ansar is also
capable of treating *processes* as async objects - it is technically
reasonable to implement ``device_poll()`` as a standalone application while
also fully preserving interrupt capability. Refer to the following sections.

Converting Functions To Machines
================================

Knowing that ansar would be making substantial use of threads meant that
strong management would be needed. Given the nature of multithreading, it
was a good match to use the async machinery itself. Threads became
"first-order" types through the support for function objects like
``db_query()``, alongside 2 other object types known as machines.
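As a rough standard-library analogy - not the ansar implementation - a
function object is a function given its own thread, with termination
reported as a message to the creator's mailbox. The sketch below is a hedged
illustration only; all names and the scaled-down delays are invented for the
demo. It also reproduces the simulation's arithmetic: the concurrent elapsed
time tracks the longest task rather than the sum;

```python
import threading
import queue
import time

class Completed:
    """Local stand-in for the ansar Completed message."""
    def __init__(self, value):
        self.value = value

def create(mailbox, fn, *args):
    # A fresh thread per object; the return value comes back as a message.
    threading.Thread(target=lambda: mailbox.put(Completed(fn(*args)))).start()

# Scaled-down stand-ins for the three data sources.
def db_query(query):          time.sleep(0.015); return 'selection'
def network_request(request): time.sleep(0.035); return 'response'
def device_poll(poll):        time.sleep(0.010); return 'status'

mailbox = queue.Queue()
started = time.monotonic()
create(mailbox, db_query, 'query')
create(mailbox, network_request, 'request')
create(mailbox, device_poll, 'poll')
values = {mailbox.get().value for _ in range(3)}    # three "selects"
elapsed = time.monotonic() - started

# Concurrent elapsed time is close to the longest delay, not the sum.
print(sorted(values), elapsed < 0.015 + 0.035 + 0.010)
```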
Machines are implemented as the classes :class:`~.create.machine.Stateless`
and :class:`~.create.machine.StateMachine`. To the parent of an object,
these variations on the implementation of an object are completely
transparent. Object implementations can switch type with no impact on the
objects around them, including the parent. The function ``db_query()`` can
become the ``class DbQuery(ar.Point, ar.Stateless)`` without affecting the
``main()`` parent at all. That type switch looks like this;

.. literalinclude:: class-that-stores/db-query-stateless.py
   :language: python

A class definition along with a small set of related functions has replaced
the ``db_query()`` function definition. A name change from ``db_query`` to
``DbQuery`` was all that was required in the ``main()`` function (a note for
the more pedantic - the name could have stayed the same for zero change to
``main()``).

.. literalinclude:: class-that-stores/db-query-stateless-debug-brad

An async object such as ``DbQuery`` is the combination of a standard data
object and a required set of *transition functions*. The transitions are
called in response to the receipt of messages, or stated another way;
messages are dispatched to transitions based on the type of the received
message. The standard data object is passed on every transition call as the
first parameter, i.e. ``self``, in the same manner as the ``self`` parameter
is passed to function objects such as ``main()``.

The runtime tables required to make all this work are constructed by the
call to ``ar.bind()`` (:func:`~.machine.bind_stateless`) using supplied
dispatch information, e.g. ``DB_QUERY_DISPATCH``. Names for each implied
transition - such as ``DbQuery_Start()`` - are searched for across all the
loaded modules and then saved in internal dispatch machinery.

The combination of a class definition and a set of global functions may seem
odd. The reasons for this particular approach are slightly technical but are
motivated by runtime efficiency.
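The message-to-transition dispatch idea can be sketched in plain Python. A
hedged illustration only - the names echo the document but the table-building
and lookup below are invented, not the ansar machinery;

```python
# Messages, used purely as types.
class Start: pass
class T1: pass

class DbQuery:
    """The standard data object; transitions receive it as 'self'."""
    def __init__(self, query):
        self.query = query

# Transitions are module-level functions; the data object arrives as the
# first parameter, mirroring the ansar convention.
def DbQuery_Start(self, message):
    return 'started ' + self.query

def DbQuery_T1(self, message):
    return 'timed out'

# A bind step would discover these by name; here the table is written out.
DB_QUERY_DISPATCH = {Start: DbQuery_Start, T1: DbQuery_T1}

def dispatch(machine, message):
    # Look up the transition by the type of the received message.
    transition = DB_QUERY_DISPATCH[type(message)]
    return transition(machine, message)

q = DbQuery('query')
print(dispatch(q, Start()))
print(dispatch(q, T1()))
```

A single dictionary lookup per message is the efficiency motivation
mentioned above; no per-instance method resolution is involved.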
Other popular approaches to dispatching, such as defining the transitions as
method functions within the ``DbQuery`` class, incur a significant runtime
overhead. On the flip side, this arrangement forces a breakdown of the
problem at hand. Each individual transition function is small, and the
perceived imposition becomes a benefit as demands on the object become more
complex.

Managing The Allocation Of Threads
----------------------------------

Operationally, ``db_query()`` and ``DbQuery`` are substantially different.
Function objects have a fresh, new platform thread assigned to them at
creation time, whereas machines do not. Instead, machines are assigned to a
thread at registration time, i.e. inside ``ar.bind()``. The assigned thread
is the only thread to ever call the transition functions of a particular
machine - a necessary multithreading guarantee.

There are 3 mechanisms for managing the assignment of threads to machines;

* do nothing and accept a default assignment or,
* use a special base class in the machine inheritance list to request a
  dedicated thread for each instance of the machine or,
* specify a named thread at registration time to be shared by one or more
  classes of machine.

Accepting the default assignment is fine for many applications. A special
background thread is created by the ansar runtime (i.e. inside the call to
:func:`~.framework.create_object`) and all the work associated with all
"default assignment" machines is performed there. As applications become
larger and more complex, a significant quantity of work can fall on the
shoulders of that thread. Eventually this can produce a performance
bottleneck, though this is an issue mitigated by specifics of the Python
interpreter (i.e. the GIL). More importantly, in extreme cases it may create
a context conducive to deadlocks. This is a rare but real issue for any
multithreaded application.
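The default arrangement - one background thread performing the work of every
"default assignment" machine - can be pictured as a single consumer thread
draining a shared queue. A hedged sketch with invented names, not the real
dispatch machinery;

```python
import threading
import queue

work = queue.Queue()
handled = []

def background():
    # The one default thread: every queued (machine, message) pair is
    # processed here, so no machine is ever entered by two threads.
    while True:
        item = work.get()
        if item is None:            # shutdown sentinel
            break
        machine, message = item
        handled.append((machine, message))

t = threading.Thread(target=background)
t.start()

# Two different "machines" share the same thread.
work.put(('DbQuery-1', 'Start'))
work.put(('NetworkServer-1', 'Start'))
work.put(None)
t.join()
print(handled)
```

The sketch also shows the bottleneck: if one transition blocks, everything
queued behind it waits.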
There is some protection in using one of the explicit thread assignment
techniques to increase the number of threads working inside the dispatch
machinery. As a side benefit this is also likely to help with application
throughput and responsiveness, particularly where I/O is involved.

To arrange for a dedicated thread for each instance of a machine, use the
``ar.Threaded`` base class rather than the :class:`~.point.Point` class (the
former inherits from the latter);

.. code-block:: python

    class DbQuery(ar.Threaded, ar.Stateless):
        def __init__(self, query):
            ar.Threaded.__init__(self)
            ar.Stateless.__init__(self)
            self.query = query

This change guarantees the assignment of a distinct platform thread for
every instance of a ``DbQuery`` machine. If a thread-per-instance is overly
heavy-handed, this change to the registration of the non-threaded
``DbQuery`` is the other option;

.. code-block:: python

    ar.bind(DbQuery, DB_QUERY_DISPATCH, thread='db')

Execution of this version looks like this;

.. literalinclude:: class-that-stores/db-query-machine-threaded-debug-brad

The ``start_vector`` (part of the :func:`~.framework.create_object`
machinery) is now listing "db" as one of the named threads. The log entry
means that there are now 2 named threads created during startup (the full
complement of framework threads is around 6), called "retries" and "db"
respectively, and both of these threads have a single class assigned to
them. All instances of a single class (i.e. ``DbQuery``) will execute on the
"db" thread and something similar will happen with the "retries" thread.

Converting Functions To Message Servers
=======================================

Creating an async object to perform work and produce a result is one
pattern of use. Another pattern can be useful in scenarios involving
resources such as database servers and network APIs. Resources such as these
typically involve some form of connection.
To make requests to a network API the client must first establish that
connection, and when the client is finished making requests the connection
should be closed. Connections are also apt to fail at inconvenient moments,
requiring an automated reconnection strategy.

A server object is needed to manage the ongoing status of the connection and
to keep the wider application free of the associated concerns. At the same
time that object must accept requests from the application and present them
over the connection. Making a network request now involves the sending of a
message to that object and receiving a response, rather than the full
lifecycle of an object. Application code changes from this;

.. code-block:: python

    r = self.create(NetworkRequest, 'request')
    m = self.select(ar.Completed, ar.Stop)

To this;

.. code-block:: python

    r = self.send(NetworkRequest('request'), api)
    m = self.select(NetworkResponse, ar.Stop)

Where ``api`` is the address of the new server object and both
``NetworkRequest`` and ``NetworkResponse`` are class definitions of
messages. As well as managing the connection issue, this arrangement is much
more efficient.

A minimal simulation of the server looks like this;

.. literalinclude:: class-that-stores/network-request-server.py
   :language: python

The ``NetworkServer`` object is a state machine, or FSM. Declaration of a
state machine in ansar is similar to the declaration of a stateless machine
with the addition of;

* declaration of the possible states - ``INITIAL``, ``READY`` and
  ``STANDBY``,
* inheriting from the proper base class - ``NetworkServer(.., ar.StateMachine)``,
* assignment of the initial state - ``ar.StateMachine.__init__(self, INITIAL)``,
* declaration of more complex dispatch information - ``NETWORK_SERVER_DISPATCH``,
* transition functions that return the next state - ``return READY``.

The object manages the connection, initiating retries as appropriate.
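The declaration steps listed above can be sketched in plain Python. The
names echo the simulation but everything here is hypothetical - dispatch is
keyed on the pair of current state and message type, and each transition
returns the next state;

```python
# Possible states, used as plain markers.
class INITIAL: pass
class READY: pass
class STANDBY: pass

# Messages.
class Start: pass

class NetworkServer:
    def __init__(self, connects):
        self.state = INITIAL        # assignment of the initial state
        self.connects = connects

    def receive(self, message):
        # Dispatch on (current state, message type); the transition
        # returns the next state.
        transition = NETWORK_SERVER_DISPATCH[(self.state, type(message))]
        self.state = transition(self, message)

def NetworkServer_INITIAL_Start(self, message):
    # A successful connect moves to READY; a failure falls back to
    # STANDBY, from where retries would be scheduled.
    return READY if self.connects else STANDBY

NETWORK_SERVER_DISPATCH = {
    (INITIAL, Start): NetworkServer_INITIAL_Start,
}

server = NetworkServer(connects=True)
server.receive(Start())
print(server.state.__name__)
```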
The object also functions as an intermediary between clients and the
underlying service, responding with messages appropriate to the current
connection state.

In this minimal use of a server object, i.e. the single instance of sending
a ``NetworkRequest``, there is no real opportunity for the server to flex
its capabilities, but there is sufficient code to illustrate the pattern.
The ``connect()``, ``request_response()`` and ``disconnect()`` functions are
representative placeholders. The first 2 functions include simulations of
fallibility, to motivate the machine to check the results and modify its
behaviour accordingly.

A successful request looks like this;

.. literalinclude:: class-that-stores/network-request-server-debug-brad

Where the ``connect()`` fails, the logs look like this;

.. code-block::
   :emphasize-lines: 12,13

    $ python3 network-request-server.py --debug-level=DEBUG
    00:29:53.189 + <00000007>start_vector - Created by <00000001>
    00:29:53.189 ~ <00000007>start_vector - Executable "/home/brad/gh/ansar-create/doc/source/class-that-stores/network-request-server.py" as process (128847)
    00:29:53.189 ~ <00000007>start_vector - Working folder "/home/brad/gh/ansar-create/doc/source/class-that-stores"
    00:29:53.189 ~ <00000007>start_vector - Running object "__main__.main"
    00:29:53.189 ~ <00000007>start_vector - Class threads (1) "retries" (1)
    00:29:53.189 + <00000008>main - Created by <00000007>
    00:29:53.189 + <00000009>NetworkServer[INITIAL] - Created by <00000008>
    00:29:53.189 > <00000008>main - Sent NetworkRequest to <00000009>
    00:29:53.190 < <00000009>NetworkServer[INITIAL] - Received Start from <00000008>
    00:29:53.190 > <00000009>NetworkServer[INITIAL] - Sent StartTimer to <00000003>
    00:29:53.190 < <00000009>NetworkServer[STANDBY] - Received NetworkRequest from <00000008>
    00:29:53.190 > <00000009>NetworkServer[STANDBY] - Sent NetworkDown to <00000008>
    00:29:53.190 < <00000008>main - Received NetworkDown from <00000009>
    00:29:53.190 > <00000008>main - Sent Stop to <00000009>
    00:29:53.190 < <00000009>NetworkServer[STANDBY] - Received Stop from <00000008>
    00:29:53.190 X <00000009>NetworkServer[STANDBY] - Destroyed
    00:29:53.190 < <00000008>main - Received Completed from <00000009>
    00:29:53.190 X <00000008>main - Destroyed
    00:29:53.190 < <00000007>start_vector - Received Completed from <00000008>
    00:29:53.190 X <00000007>start_vector - Destroyed

This implements a strong operational model within a small quantity of
maintainable code. Note also that the ``NetworkServer`` object supports
multithreading; there can be many instances of
``self.send(NetworkRequest(..), api)`` at many different locations in an
application. Using :meth:`~.point.Point.reply` ensures that the response
message is always routed to the proper party.

Converting Functions To Processes
=================================

Multiprocessing is also part of the ansar toolbox. This section takes the
``device_poll()`` function object and pushes it into an entirely separate
platform process, while retaining its original functionality. The new
process can happily co-exist beside the ``DbQuery`` and ``NetworkServer``
objects, delivering a dataset to the ``main()`` function as before. It also
responds properly to a control-c.

An Object Inside A Process
--------------------------

The first task is to move the function into a separate module with the
proper framework around it;

.. literalinclude:: class-that-stores/device-poll.py
   :language: python

Interfacing With A Process
--------------------------

Input and output messages ``DeviceControl`` and ``DevicePoll`` have been
defined to carry creation and completion information across the
inter-process boundary. Passing a ``factory_input`` value causes the
framework to parse ``stdin`` and present the results as the 3rd parameter to
the ``device_poll()`` instance. Ansar also manages persistent application
settings, which are passed as the 2nd parameter. In this default case there
are none and the parameter can be ignored.
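The inter-process exchange can be pictured as plain JSON. The real encodings
are produced by the ``ansar.encode`` library and carry type information, so
this is only a simplified sketch using hypothetical dataclass versions of
the two messages;

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class DeviceControl:
    control: str

@dataclass
class DevicePoll:
    status: str

# Parent side: encode the control message, as if piping it over stdin.
wire = json.dumps({"value": asdict(DeviceControl(control='control'))})

# Child side: parse the encoding and rebuild the message.
decoded = DeviceControl(**json.loads(wire)["value"])
print(decoded)
```

The child's reply travels the other way: a ``DevicePoll`` encoded onto
``stdout`` and decoded by the parent.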
The create-complete messages look like this;

.. literalinclude:: class-that-stores/device_if.py
   :language: python

Passing Input From The Command Line
-----------------------------------

The new process requires an encoding, either piped over ``stdin`` or using
the ``--input-file=name`` flag;

.. literalinclude:: class-that-stores/device-poll-debug-brad

An encoding of a ``DeviceControl`` object is piped to the ``device-poll.py``
process and after the programmed delay the process terminates. An encoding
of a ``DevicePoll`` object is placed on ``stdout``. The ``device-control``
file looks like this, i.e. the standard layout of a JSON encoding produced
by the ``ansar.encode`` library;

.. literalinclude:: device-control

To generate a sample input for any standard application just use the
``--dump-input`` flag. This will place an encoding of the input expected by
that particular application on ``stdout``::

    $ dist/device-poll --dump-input
    {
        "value": {
            "control": "control"
        }
    }

Redirecting this to a file allows for convenient editing.

Sharing Messages Across Multiple Processes
------------------------------------------

For successful exchange of the device messages, the classes must be known to
all processes by the same name. In this scenario it is enough to place the
messages in a module (``device_if.py`` - an abbreviation of device
interface) that is then imported by interested parties. All main modules and
the device message module are in the same folder.

Sharing message types across multiple processes is as simple as sharing any
class definitions. Message types can be placed in namespace packages that
are then imported as normal. The ansar :class:`~.lifecycle.Stop` message is
distinct from another message named ``Stop`` because internally the ansar
message is known as ``ansar.create.lifecycle.Stop``. Whatever the local
technique for sharing classes around a codebase may be, it should also work
for message types. In those difficult scenarios there is always;

.. code-block:: python

    import os
    import sys

    sys.path.insert(0, os.path.abspath('*message-folder*'))

Bringing It All Together
========================

The essential async object is created, accepts parameters, receives messages
and terminates with a completion message. It is also required to respond to
a :class:`~.lifecycle.Stop` message at any time. All current activities
should be interrupted (i.e. the :class:`~.lifecycle.Stop` should be
propagated to any child objects) before terminating itself.

An instance of the :class:`~.processing.Process` machine object satisfies
these operational requirements. Under the hood it starts a named executable
file, parameters passed to the object are forwarded to the new process and
output from the process is included in the :class:`~.lifecycle.Completed`
message at termination - the object operates as an internal proxy for an
external process.

Any :class:`~.lifecycle.Stop` message received by the
:class:`~.processing.Process` object results in the sending of a signal
(i.e. SIGINT) to the associated process id. As described in earlier
sections, this signal will be mapped to a :class:`~.lifecycle.Stop` message
by the receiving process, propagating the interrupt across the parent-child
boundary.

The final simulation combines all 3 patterns of async programming into a
single application. Here are the ``db_query()`` function object with its
dedicated platform thread, the ``NetworkServer`` state machine running in
the default transition thread, and the ``device-poll.py`` process, all
running side-by-side and delivering the same 3 data items in a concurrent
fashion to ``main()``;

.. literalinclude:: class-that-stores/query-request-poll-3-way.py
   :language: python

The combination of a try-finally block and the ``stop()`` function has
cleaned up the termination a little, and the loop to collect the 3 data
items has changed in support of the change to network requests.
Logs relating to this application now include the logs of a child process -
there are now logs from 2 different instances of the ``start_vector``
object;

.. literalinclude:: class-that-stores/query-request-poll-3-way-debug-brad

Where Is The Device Server
==========================

It would be a natural evolution to move ``device-poll.py`` (i.e. the
process) to an arrangement similar to ``NetworkServer``. It would become a
state machine called ``DeviceServer`` and clients would send control
messages to it and receive poll messages in response (i.e. ``DeviceControl``
and ``DevicePoll``). This arrangement would remove the need to start a fresh
process for each poll.

The obvious missing link is the network messaging needed to transport the
control-poll messages between the parent and child processes (i.e. the
proposed ``DeviceServer`` and ``main()``). Async network messaging is the
domain of the ``ansar.connect`` library, whereas this document is focused on
asynchronous programming with ansar. The ``ansar.connect`` library is
currently being rewritten for Python 3.x and will be released as soon as it
becomes available.

Why The Time Adjustment
=======================

According to the statistics provided for the simulation, the average
execution times were 0.75s, 1.75s and 0.5s for the query, request and the
poll, respectively. If the 3 functions were started at the same time then
the completion order can be expected to be poll, query and then request. A
sequence of 3 :meth:`~.pending.Buffering.select()` calls can then assume the
:class:`~.lifecycle.Completed` messages will arrive in that same order, and
process the ``m.value`` accordingly.

This is a naive approach to a standard asynchronous issue. The ``adjust()``
function simulates the variability in actual execution times. Consider if
the poll were to execute particularly slowly and the query were to execute
quickly; the order may be compromised. Processing of the 3 ``m.value``
results would fail.
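The hazard can be demonstrated with ordinary threads: completion messages
arrive in finishing order, not creation order, so any code that assumes the
two match is fragile. A small standard-library sketch, with delays invented
for the demo;

```python
import threading
import queue
import time

mailbox = queue.Queue()

def worker(name, delay):
    time.sleep(delay)
    mailbox.put(name)       # the "Completed" message

# Created in one order...
for name, delay in (('query', 0.03), ('request', 0.05), ('poll', 0.01)):
    threading.Thread(target=worker, args=(name, delay)).start()

# ...collected in completion order, whatever that turns out to be.
completed = [mailbox.get() for _ in range(3)]
print(completed)
```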
There are 2 general strategies to resolve this issue. The :class:`~.lifecycle.Completed` messages can be processed according to the type of their contents or according to who sent them. The former requires that all the objects involved return values of distinct types and the latter requires a “return address” at the moment each :class:`~.lifecycle.Completed` message is received.

If each object returns a distinct type the code looks like this;

.. code-block:: python

   while len(a) > 0:
       m = self.select(ar.Completed, ar.Stop)
       if isinstance(m, ar.Stop):
           return ar.Aborted()
       a.remove(self.return_address)
       value = m.value
       if isinstance(value, DbSelection):
           selection = value
       elif isinstance(value, NetworkResponse):
           response = value
       elif isinstance(value, DevicePoll):
           poll = value

Adoption of the alternate technique looks like this;

.. code-block:: python

   q = self.create(db_query, 'query')
   r = self.create(network_request, 'request')
   p = self.create(device_poll, 'poll')
   a = [q, r, p]

   def stop():
       for t in a:
           self.send(ar.Stop(), t)
           self.select(ar.Completed)

   try:
       while len(a) > 0:
           m = self.select(ar.Completed, ar.Stop)
           if isinstance(m, ar.Stop):
               return ar.Aborted()
           a.remove(self.return_address)
           if self.return_address == q:
               selection = m.value
           elif self.return_address == r:
               response = m.value
           elif self.return_address == p:
               poll = m.value
   finally:
       stop()

   action = what_next(selection, response, poll)

Both versions are impervious to variations in execution times. The former technique is the style of message processing seen in state machines. The latter technique has dedicated supporting methods in ansar. A context is used to automate interruption handling;

.. code-block:: python

   # Create the objects.
   q = self.create(db_query, 'query')
   r = self.create(network_request, 'request')
   p = self.create(device_poll, 'poll')

   # Assign an identity to each running object.
   self.assign(q, db_query)
   self.assign(r, network_request)
   self.assign(p, device_poll)

   with ar.AutoStop(self):
       # At least one incomplete object.
       while self.working():
           m = self.select(ar.Completed, ar.Stop)
           if isinstance(m, ar.Stop):
               return ar.Aborted()

           # Completed. Remove from the object list and recover the identity.
           d = self.debrief()
           if d is db_query:
               selection = m.value
           elif d is network_request:
               response = m.value
           elif d is device_poll:
               poll = m.value

   action = what_next(selection, response, poll)

The methods ``assign()``, ``working()`` and ``debrief()`` reduce the overhead of working with mixed collections of child objects, with the constant issue that an interruption can arrive at any time. Code clarity is improved.

Note that any object can be passed to ``assign()`` as the identity. A unique ``str`` value would be the most typical. In those scenarios it is perfectly reasonable to use the ``==`` operator for identity checking. Where there are multiple instances of complex objects, such as ``Job()`` objects, the ``is`` operator can be more appropriate.

More About Message Processing
=============================

Consider the following code;

.. code-block:: python

   q = self.create(db_query, 'query')
   m = self.select(ar.Completed, ar.Stop)

Or this;

.. code-block:: python

   self.send(DbQuery('query'), db)
   m = self.select(DbSelection, ar.Stop)

The pattern of :meth:`~.Point.create()` or :meth:`~.Point.send()` and a matching :meth:`~.pending.Buffering.select()` can repeat throughout an async application. The :meth:`~.pending.Buffering.ask()` method bundles this pattern into a single call;

.. code-block:: python

   m = self.ask(DbQuery('query'), (DbSelection, ar.Stop), db)

This is both convenient and a curiosity; :meth:`~.pending.Buffering.ask()` would appear to subvert the non-blocking nature of async software. For this reason, methods such as :meth:`~.pending.Buffering.ask()` and :meth:`~.pending.Buffering.select()` are only available inside function objects like ``db_query()``, which run inside their own dedicated threads. Machines and their transition functions have no access to these methods.
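The bundling performed by ``ask()`` can be illustrated with an ordinary queue-based sketch. Everything here - ``Point``, the ``send()``, ``select()`` and ``ask()`` signatures, the ``'SelectTimer'`` stand-in - is hypothetical plumbing for illustration, not the ansar API;

.. code-block:: python

   import queue
   import threading

   # Toy point-to-point messaging; a sketch of the ask() idea, not ansar.
   class Point:
       def __init__(self):
           self.inbox = queue.Queue()

       def send(self, message, to):
           to.inbox.put((message, self))

       def select(self, *types, seconds=None):
           try:
               m, sender = self.inbox.get(timeout=seconds)
           except queue.Empty:
               return 'SelectTimer'       # stand-in for the timer result
           assert isinstance(m, types)
           return m

       def ask(self, message, types, to, seconds=None):
           # A send() and the matching select(), bundled into one call.
           self.send(message, to)
           return self.select(*types, seconds=seconds)

   client, db = Point(), Point()

   def serve():
       # A toy "database" that answers one query with an uppercased reply.
       m, sender = db.inbox.get()
       db.send(m.upper(), sender)

   threading.Thread(target=serve).start()
   m = client.ask('query', (str,), db, seconds=5)
   print(m)                               # prints "QUERY"

The call blocks the asking thread until the reply (or a timeout) arrives, which is exactly why, in ansar proper, such calls are confined to function objects running in their own threads.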
This arrangement ensures that whatever happens inside a function object such as ``db_query()`` does not affect the operational status of other application objects. Note that it is still advisable to declare timers on all message input calls - both :meth:`~.pending.Buffering.ask()` and :meth:`~.pending.Buffering.select()` accept a ``seconds`` parameter. The presence of that parameter enables a timer and automatically adds the ``SelectTimer`` class to the possible result types.

To assist with the more complex corners of asynchronicity, ansar implements a sophisticated message input sub-system, with methods such as :meth:`~.pending.Buffering.ask()` at the very top. It includes capabilities such as “saving”, where received messages can be “pushed back” into the input stream for a form of deferred processing.

.. code-block:: python

   m = self.ask(DbQuery('query'), (DbSelection, ar.Stop), db,
       saving=(NetworkRequest, DevicePoll))

If encountered, messages listed in the ``saving`` tuple are deferred. An equivalent mechanism exists in the dispatch information associated with a state machine;

.. code-block:: python

   READY: (
       (DbQuery, ar.Stop),
       (NetworkRequest, DevicePoll)
   ),

The first tuple lists the messages to be accepted in the ``READY`` state while the second tuple lists the messages to be deferred to a more convenient moment. The machine eventually expects to process the full complement.

The “save” abstraction comes directly from SDL - the Specification and Description Language. Full description of the input system is outside the scope of this document. Suffice to say that ansar message processing is based on a formal model with substantial standards, organizations, toolsets and communities associated with it.
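The essence of "saving" can be sketched with a deque. This is an illustrative model only - ``SavingInbox``, ``select()`` and ``unsave()`` are made-up names, and the real sub-system does considerably more - but it shows the two halves of the abstraction: deferring messages that are inconvenient now, and replaying them ahead of fresh input later;

.. code-block:: python

   from collections import deque

   # Illustrative sketch of the "save" mechanism, not the ansar implementation.
   class SavingInbox:
       def __init__(self, *messages):
           self.pending = deque(messages)
           self.saved = deque()

       def select(self, accepted, saving=()):
           # Deliver the first acceptable message; defer any "saving" matches.
           while self.pending:
               m = self.pending.popleft()
               if isinstance(m, accepted):
                   return m
               if isinstance(m, saving):
                   self.saved.append(m)
           return None

       def unsave(self):
           # Push saved messages back in front of any fresh input.
           self.pending.extendleft(reversed(self.saved))
           self.saved.clear()

   inbox = SavingInbox(3.14, 'request', 42)
   first = inbox.select(accepted=(int,), saving=(str, float))
   print(first)          # prints 42 - the str and the float were deferred
   inbox.unsave()
   second = inbox.select(accepted=(float,), saving=(str,))
   print(second)         # prints 3.14 - replayed from the saved queue

As in SDL, the deferred messages are not lost; the machine eventually changes state, the saved messages are replayed, and the full complement is processed.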