Asynchronous Programming with Ansar
At its core, asynchronous programming (from here on referred to simply as async) with ansar is based on a simple model: there are objects, and those objects send messages to each other. Code executes in response to those messages and only in response to messages. To kick things off, every object is presented with a start message.
In many ways async programming is about harnessing the potential of multithreading and multiprocessing. It is a unifying model for software operation that allows these underlying technologies to be applied to challenges such as concurrency. It is also about giving developers a model that copes with an innately async world, i.e. users must always be able to cancel the latest request, even where that happens to be a complex concurrent operation.
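As a toy illustration of the model (plain Python only, not ansar code), an object can be reduced to a mailbox plus a thread that executes code only when a message arrives:

```python
import queue
import threading

class Start: pass   # The message every object receives at creation.

class Toy:
    """A toy async object: a mailbox and a thread that reacts to messages."""
    def __init__(self, handler):
        self.mailbox = queue.Queue()
        self.handler = handler
        self.thread = threading.Thread(target=self.run)
        self.thread.start()

    def run(self):
        # Code executes only in response to messages.
        while True:
            message = self.mailbox.get()
            if message is None:          # Sentinel used here to terminate.
                return
            self.handler(self, message)

    def send(self, message):
        self.mailbox.put(message)

received = []
toy = Toy(lambda self, m: received.append(type(m).__name__))
toy.send(Start())   # Kick things off with a start message.
toy.send(None)
toy.thread.join()
```

After the join, `received` holds `['Start']`; nothing ran until the message arrived. Ansar's actual machinery is far richer than this sketch, but the mailbox-per-object shape is the essence of the model.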
Delivering a complete and practical framework for writing software based on the async model is a longer story. By presenting a series of issues and discussing the appropriate ansar solution, this document illustrates how useful async can be and how ansar delivers that potential to developers.
Note
Source files appearing in this section can be downloaded from here.
The repo Makefile contains the setup needed for this guide. Related background information can be
found here.
Hello World
To help dispel concerns that adopting an async framework results in completely alien code, here is the classic “hello world” program written as a pure async application;
import ansar.create as ar

def main(self):
    print('hello world')
    return 0

ar.bind(main)

if __name__ == '__main__':
    ar.create_object(main)
Running this code produces the following:
$ python3 hello-world.py
hello world
$
A single import operation gains access to the complete set of ansar features. There is the
ubiquitous arrangement around a main() function and a conditional statement checking the value
of __name__. The async-specific elements are the calls to ar.bind() (i.e. a synonym
for bind_any()) and create_object().
Note
For a complete explanation of how to create standard ansar applications - and why - look here.
Under the hood the main() function actually runs in its own dedicated platform thread. This
construction happens inside create_object(). Among other things it allows the
ansar framework to manage control-c and the related platform signals. The self argument
passed to the main() function is the gateway to all the async capabilities that the application
might choose to use.
To see evidence of the work being carried out by ansar, try this;
$ python3 hello-world.py --debug-level=DEBUG
05:03:22.270 + <00000008>start_vector - Created by <00000001>
05:03:22.270 ~ <00000008>start_vector - Executable "/home/brad/gh/ansar-create/doc/source/class-that-stores/hello-world.py" as process (1468010)
05:03:22.270 ~ <00000008>start_vector - Working folder "/home/brad/gh/ansar-create/doc/source/class-that-stores"
05:03:22.270 ~ <00000008>start_vector - Running object "__main__.main"
05:03:22.270 ~ <00000008>start_vector - Class threads (1) "retries" (1)
05:03:22.270 + <00000009>main - Created by <00000008>
05:03:22.270 X <00000009>main - Destroyed
05:03:22.270 < <00000008>start_vector - Received Completed from <00000009>
05:03:22.270 X <00000008>start_vector - Destroyed
Note
All ansar logging is placed on stderr. To recover some real-estate, details such as process
IDs and the full ISO time format have been omitted. For full information on logging within
ansar look here.
The create_object() function is responsible for the processing of flags such
as debug-level. Without some indication from the command-line, all async logging is discarded.
The function also accepts application-defined flags, environment variables and piped input; topics
covered in later sections.
Application output from print() to stdout interrupts the logging at a somewhat random point. Logging
is built-in and is thread-safe. A consequence of the latter is that mingling of logging on stderr and
application output on stdout can give the appearance that events are happening out of order.
Applications do not have to be “all async” or “all procedural”. Ansar provides several ways to integrate async capability into an application. An existing, more traditional application might insert fragments such as this;
import ansar.create as ar

def hello(self):
    print('hello world')
    return 0

ar.bind(hello)

def some_function():
    with ar.OpenChannel() as channel:
        a = channel.create(hello)
        m = channel.select(ar.Completed)
        return m.value
A standard synchronous function, some_function(), uses a context manager to create a channel. That object becomes
the gateway to async capabilities, in the same manner as the self parameter passed to main() in
the earlier hello-world.py application. Applications can also add calls to start_up()
and tear_down() at appropriate points in the codebase; full async capabilities are subsequently
available throughout the application.
A Query, A Request And A Poll
An area of interest has been identified in an existing application. There are three data items that need to be gathered and then processed into a single result. A query on a database, a request to a network API and the polling of a device all need to complete before an action can be taken. The current implementation is clear but is also sequential.
selection = db_query(query)
response = network_request(request)
status = device_poll(poll)
action = what_next(selection, response, status)
Concurrency should improve throughput.
The first step in an async implementation is a simulation. The three data functions are implemented as empty functions that simply consume time. The time periods used are taken from statistics gathered from logs and then randomized slightly. On average the query is known to take about 0.75s, the request about 1.75s and the poll about 0.5s. In total this is around 3.0s which threatens the quality of service requirement for system responsiveness. On paper at least, concurrency could bring this figure down to 1.75s, i.e. the longest of the three periods.
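The arithmetic behind that expectation is easily checked (pure Python; the figures are the averages quoted above):

```python
# Average durations, in seconds, quoted above.
QUERY, REQUEST, POLL = 0.75, 1.75, 0.5

# Sequential: each operation waits for the previous one to finish.
sequential = QUERY + REQUEST + POLL     # 3.0

# Concurrent: the total is bounded by the slowest single operation.
concurrent = max(QUERY, REQUEST, POLL)  # 1.75

print(sequential, concurrent)
```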
A DB Query
Simulation of db_query() looks like this;
import random
import ansar.create as ar

random.seed()

def adjust(period):
    s = int(period / 0.1) + 1
    b = int(s * 0.75)   # Faster
    e = int(s * 1.5)    # Slower
    a = random.randrange(b, e) * 0.1
    if a < 0.25:
        return 0.25
    return a

def db_query(self, query):
    period = adjust(0.75)
    self.start(ar.T1, period)
    self.select(ar.T1)
    return 'selection'

ar.bind(db_query)

def main(self):
    q = self.create(db_query, 'query')
    m = self.select(ar.Completed)
    selection = m.value
    return 0

ar.bind(main)

if __name__ == '__main__':
    ar.create_object(main)
The main() function is now using the self parameter to access async features. It
uses the create() method to start an instance of the db_query()
function. This builds a chain of object creations - each in their own dedicated thread - that
starts in create_object() and looks similar to a chain of function calls.
Arguments passed to the create() method become arguments passed to the
specified functions, e.g. the 'query' value is available to the db_query() function in
the query variable.
When using async objects in this way there is no blocking call-return sequence. Instead
there needs to be a separate “wait” for a signal or notification that an object has terminated.
This is received by the select() method in the form of
a Completed message. The value returned by the terminated Python function
is contained in that message, i.e. m.value. In this simulation it will always be
a str with the value ‘selection’. The create-wait style of operation obviously does not
replace the call-return style; users of ansar get to choose whichever is appropriate.
The implementation of db_query() involves;

the calculation of a wait period using the adjust() function,
starting a timer with the async start() method,
waiting for the timer signal using the select() method and
returning the ‘selection’ value.
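The behaviour of the adjust() helper can be verified in isolation; the function below is copied verbatim from the simulation, while the sampling loop is an addition for checking the bounds:

```python
import random

random.seed()

# Copied verbatim from the simulation above.
def adjust(period):
    s = int(period / 0.1) + 1
    b = int(s * 0.75)   # Faster
    e = int(s * 1.5)    # Slower
    a = random.randrange(b, e) * 0.1
    if a < 0.25:
        return 0.25
    return a

# For the 0.75s query period: s is 8, so results are drawn from
# 0.6s up to 1.1s in 0.1s steps, with a 0.25s floor applied.
samples = [adjust(0.75) for _ in range(1000)]
assert min(samples) >= 0.25
assert max(samples) < 0.75 * 1.5    # Never reaches 150% of nominal.
```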
Running the simulation looks like this;
$ python3 db-query.py --debug-level=DEBUG
05:03:17.879 + <00000008>start_vector - Created by <00000001>
05:03:17.879 ~ <00000008>start_vector - Executable "/home/brad/gh/ansar-create/doc/source/class-that-stores/db-query.py" as process (1467925)
05:03:17.879 ~ <00000008>start_vector - Working folder "/home/brad/gh/ansar-create/doc/source/class-that-stores"
05:03:17.879 ~ <00000008>start_vector - Running object "__main__.main"
05:03:17.879 ~ <00000008>start_vector - Class threads (1) "retries" (1)
05:03:17.879 + <00000009>main - Created by <00000008>
05:03:17.880 + <0000000a>db_query - Created by <00000009>
05:03:17.880 > <0000000a>db_query - Sent StartTimer to <00000003>
05:03:19.131 < <0000000a>db_query - Received T1 from <00000003>
05:03:19.131 X <0000000a>db_query - Destroyed
05:03:19.131 < <00000009>main - Received Completed from <0000000a>
05:03:19.131 X <00000009>main - Destroyed
05:03:19.132 < <00000008>start_vector - Received Completed from <00000009>
05:03:19.132 X <00000008>start_vector - Destroyed
The chain of object creations can be seen in the logs. The framework creates the instance
of main() and main() creates the instance of db_query(). The chain unwinds starting
with db_query() catching the timer ar.T1 message, main() catching the termination
of db_query() and the framework catching completion of main().
Interruption Of A Query
For the purposes of clarity, one important requirement was deferred for the first iteration.
All async objects are required to observe the interrupt protocol, which begins with the receipt
of a Stop message. This special message can be received at any time. A
proper implementation of the simulation looks like this;
import random
import ansar.create as ar

random.seed()

def adjust(period):
    s = int(period / 0.1) + 1
    b = int(s * 0.75)   # Quicker.
    e = int(s * 1.5)    # Slower.
    a = random.randrange(b, e) * 0.1
    if a < 0.25:
        return 0.25
    return a

def db_query(self, query):
    period = adjust(0.75)
    self.start(ar.T1, period)
    m = self.select(ar.T1, ar.Stop)
    if isinstance(m, ar.Stop):
        return ar.Aborted()
    return 'selection'

ar.bind(db_query)

def main(self):
    q = self.create(db_query, 'query')
    m = self.select(ar.Completed, ar.Stop)
    if isinstance(m, ar.Stop):
        self.send(m, q)
        self.select(ar.Completed)
        return ar.Aborted()
    selection = m.value
    return 0

ar.bind(main)

if __name__ == '__main__':
    ar.create_object(main)
The create_object() function maps the platform signal SIGINT to
a Stop message, forwarding it to the instance of main(). If you are
quick enough on the keyboard, hitting control-c within the 0.75 second window initiates an
interrupt. Code changes include the addition of Stop on the calls
to select() and related conditional code blocks. In the case
of the main() object this means forwarding any Stop message on to
the db_query() object and waiting for the subsequent unwinding.
The logs for an interrupt look like:
$ python3 db-query-stop.py --debug-level=DEBUG
01:48:05.876 + <00000007>start_vector - Created by <00000001>
01:48:05.876 ~ <00000007>start_vector - Executable "/home/brad/gh/ansar-create/doc/source/class-that-stores/db-query-stop.py" as process (123674)
01:48:05.876 ~ <00000007>start_vector - Working folder "/home/brad/gh/ansar-create/doc/source/class-that-stores"
01:48:05.876 ~ <00000007>start_vector - Running object "__main__.main"
01:48:05.876 ~ <00000007>start_vector - Class threads (1) "retries" (1)
01:48:05.876 + <00000008>main - Created by <00000007>
01:48:05.876 + <00000009>db_query - Created by <00000008>
01:48:05.876 > <00000009>db_query - Sent StartTimer to <00000003>
^C01:48:06.577 < <00000007>start_vector - Received Stop from <00000001>
01:48:06.577 > <00000007>start_vector - Sent Stop to <00000008>
01:48:06.577 < <00000008>main - Received Stop from <00000007>
01:48:06.577 > <00000008>main - Sent Stop to <00000009>
01:48:06.578 < <00000009>db_query - Received Stop from <00000008>
01:48:06.578 X <00000009>db_query - Destroyed
01:48:06.578 < <00000008>main - Received Completed from <00000009>
01:48:06.578 X <00000008>main - Destroyed
01:48:06.579 < <00000007>start_vector - Received Completed from <00000008>
01:48:06.579 X <00000007>start_vector - Destroyed
{
    "value": [
        "ansar.create.lifecycle.Aborted",
        {},
        []
    ]
}
The response to a control-c is that the main() object returns an Aborted
message to the framework. A JSON encoding of that message is placed on stdout as the result
of the db-query-stop.py application. Any message returned to the framework is handled in the
same way. An int is a special case and is assumed to be an “exit code”; the framework silently
exits with the presented value. Note the ^C printed by the terminal as a representation of
receiving the control-c.
Query, Request and Poll As Objects
The next step is to introduce the other two data sources to our simulation. These are simply
duplications of the db_query() object except with different time delay values;
import random
import ansar.create as ar

random.seed()

def adjust(period):
    s = int(period / 0.1) + 1
    b = int(s * 0.75)   # Quicker.
    e = int(s * 1.5)    # Slower.
    a = random.randrange(b, e) * 0.1
    if a < 0.25:
        return 0.25
    return a

def db_query(self, query):
    period = adjust(0.75)
    self.start(ar.T1, period)
    m = self.select(ar.T1, ar.Stop)
    if isinstance(m, ar.Stop):
        return ar.Aborted()
    return 'selection'

ar.bind(db_query)

def network_request(self, request):
    period = adjust(1.75)
    self.start(ar.T1, period)
    m = self.select(ar.T1, ar.Stop)
    if isinstance(m, ar.Stop):
        return ar.Aborted()
    return 'response'

ar.bind(network_request)

def device_poll(self, poll):
    period = adjust(0.5)
    self.start(ar.T1, period)
    m = self.select(ar.T1, ar.Stop)
    if isinstance(m, ar.Stop):
        return ar.Aborted()
    return 'status'

ar.bind(device_poll)

def main(self):
    q = self.create(db_query, 'query')
    m = self.select(ar.Completed, ar.Stop)
    if isinstance(m, ar.Stop):
        self.send(m, q)
        self.select(ar.Completed)
        return ar.Aborted()
    selection = m.value

    q = self.create(network_request, 'request')
    m = self.select(ar.Completed, ar.Stop)
    if isinstance(m, ar.Stop):
        self.send(m, q)
        self.select(ar.Completed)
        return ar.Aborted()
    response = m.value

    q = self.create(device_poll, 'poll')
    m = self.select(ar.Completed, ar.Stop)
    if isinstance(m, ar.Stop):
        self.send(m, q)
        self.select(ar.Completed)
        return ar.Aborted()
    status = m.value
    return 0

ar.bind(main)

if __name__ == '__main__':
    ar.create_object(main)
This establishes a reference for comparison with the next implementation. The logs for this sequential version appear below;
$ python3 query-request-poll.py --debug-level=DEBUG
05:03:30.097 + <00000008>start_vector - Created by <00000001>
05:03:30.097 ~ <00000008>start_vector - Executable "/home/brad/gh/ansar-create/doc/source/class-that-stores/query-request-poll.py" as process (1468171)
05:03:30.097 ~ <00000008>start_vector - Working folder "/home/brad/gh/ansar-create/doc/source/class-that-stores"
05:03:30.097 ~ <00000008>start_vector - Running object "__main__.main"
05:03:30.097 ~ <00000008>start_vector - Class threads (1) "retries" (1)
05:03:30.097 + <00000009>main - Created by <00000008>
05:03:30.097 + <0000000a>db_query - Created by <00000009>
05:03:30.097 > <0000000a>db_query - Sent StartTimer to <00000003>
05:03:30.848 < <0000000a>db_query - Received T1 from <00000003>
05:03:30.848 X <0000000a>db_query - Destroyed
05:03:30.849 < <00000009>main - Received Completed from <0000000a>
05:03:30.849 + <0000000b>network_request - Created by <00000009>
05:03:30.849 > <0000000b>network_request - Sent StartTimer to <00000003>
05:03:33.603 < <0000000b>network_request - Received T1 from <00000003>
05:03:33.603 X <0000000b>network_request - Destroyed
05:03:33.603 < <00000009>main - Received Completed from <0000000b>
05:03:33.603 + <0000000c>device_poll - Created by <00000009>
05:03:33.604 > <0000000c>device_poll - Sent StartTimer to <00000003>
05:03:34.354 < <0000000c>device_poll - Received T1 from <00000003>
05:03:34.354 X <0000000c>device_poll - Destroyed
05:03:34.354 < <00000009>main - Received Completed from <0000000c>
05:03:34.354 X <00000009>main - Destroyed
05:03:34.355 < <00000008>start_vector - Received Completed from <00000009>
05:03:34.355 X <00000008>start_vector - Destroyed
The timestamps show that from the moment db_query() is created through to the main() function
receiving the completion of device_poll(), over 3.5 seconds are consumed (this figure varies as the logs
are auto-generated by the documentation system). This is higher than the sum of the averages,
but there is operational overhead plus the random adjustments applied to the timeouts.
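That figure comes straight from the log timestamps; for the run shown above (your own figures will differ):

```python
from datetime import datetime

def elapsed(start, end):
    """Seconds between two HH:MM:SS.mmm log timestamps (same day)."""
    fmt = '%H:%M:%S.%f'
    return (datetime.strptime(end, fmt) - datetime.strptime(start, fmt)).total_seconds()

# From db_query creation to main() receiving the completion of device_poll.
print(elapsed('05:03:30.097', '05:03:34.354'))  # 4.257
```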
A Concurrent Approach
Finally we can implement concurrency. In this implementation the three objects are all created before
the main() object shifts to waiting for completion messages. The effect is that the three threads
associated with the instances of db_query(), network_request() and device_poll() are all
running at the same time.
import random
import ansar.create as ar

random.seed()

def adjust(period):
    s = int(period / 0.1) + 1
    b = int(s * 0.75)   # Quicker.
    e = int(s * 1.5)    # Slower.
    a = random.randrange(b, e) * 0.1
    if a < 0.25:
        return 0.25
    return a

def db_query(self, query):
    period = adjust(0.75)
    self.start(ar.T1, period)
    m = self.select(ar.T1, ar.Stop)
    if isinstance(m, ar.Stop):
        return ar.Aborted()
    return 'selection'

ar.bind(db_query)

def network_request(self, request):
    period = adjust(1.75)
    self.start(ar.T1, period)
    m = self.select(ar.T1, ar.Stop)
    if isinstance(m, ar.Stop):
        return ar.Aborted()
    return 'response'

ar.bind(network_request)

def device_poll(self, poll):
    period = adjust(0.5)
    self.start(ar.T1, period)
    m = self.select(ar.T1, ar.Stop)
    if isinstance(m, ar.Stop):
        return ar.Aborted()
    return 'status'

ar.bind(device_poll)

def main(self):
    q = self.create(db_query, 'query')
    r = self.create(network_request, 'request')
    p = self.create(device_poll, 'poll')
    a = [q, r, p]

    def abort():
        for t in a:
            self.send(ar.Stop(), t)
            self.select(ar.Completed)
        return ar.Aborted()

    for i in range(len(a)):
        m = self.select(ar.Completed, ar.Stop)
        if isinstance(m, ar.Stop):
            return abort()
        a.remove(self.return_address)
    return 0

ar.bind(main)

if __name__ == '__main__':
    ar.create_object(main)
The most difficult part is probably the support of interruptions. An interruption can arrive after 0 or
more of the 3 objects have already terminated. The main() object needs to track who is still active
so that it can propagate the Stop message to the appropriate child objects. The
solution shown here is rather “brute-force” in that it achieves the goal without actually processing the
materials returned by the 3 objects, and without using ansar features designed for just this purpose. It’s
good enough for this simulation. An execution looks like this;
$ python3 query-request-poll-concurrent.py --debug-level=DEBUG
05:03:25.961 + <00000008>start_vector - Created by <00000001>
05:03:25.961 ~ <00000008>start_vector - Executable "/home/brad/gh/ansar-create/doc/source/class-that-stores/query-request-poll-concurrent.py" as process (1468123)
05:03:25.961 ~ <00000008>start_vector - Working folder "/home/brad/gh/ansar-create/doc/source/class-that-stores"
05:03:25.961 ~ <00000008>start_vector - Running object "__main__.main"
05:03:25.961 ~ <00000008>start_vector - Class threads (1) "retries" (1)
05:03:25.961 + <00000009>main - Created by <00000008>
05:03:25.961 + <0000000a>db_query - Created by <00000009>
05:03:25.961 > <0000000a>db_query - Sent StartTimer to <00000003>
05:03:25.961 + <0000000b>network_request - Created by <00000009>
05:03:25.961 > <0000000b>network_request - Sent StartTimer to <00000003>
05:03:25.962 + <0000000c>device_poll - Created by <00000009>
05:03:25.962 > <0000000c>device_poll - Sent StartTimer to <00000003>
05:03:26.712 < <0000000c>device_poll - Received T1 from <00000003>
05:03:26.712 X <0000000c>device_poll - Destroyed
05:03:26.712 < <00000009>main - Received Completed from <0000000c>
05:03:26.962 < <0000000a>db_query - Received T1 from <00000003>
05:03:26.963 X <0000000a>db_query - Destroyed
05:03:26.963 < <00000009>main - Received Completed from <0000000a>
05:03:27.463 < <0000000b>network_request - Received T1 from <00000003>
05:03:27.463 X <0000000b>network_request - Destroyed
05:03:27.464 < <00000009>main - Received Completed from <0000000b>
05:03:27.464 X <00000009>main - Destroyed
05:03:27.464 < <00000008>start_vector - Received Completed from <00000009>
05:03:27.464 X <00000008>start_vector - Destroyed
Repeating the previous calculation gives around 1.5 seconds, substantially less than the sequential version and confirming the potential of concurrency to improve throughput. Again, the comparison is muddied by the presence of random adjustments; the reason for including that randomization will emerge in a later section.
Conclusion Of The Simulation
The potential benefits of concurrency should not be a surprise to anyone. As long as the underlying activities of the 3 objects do not involve shared resources, moving to a concurrent approach should realize improvements of the order demonstrated by the simulation, whatever the development toolset might be.
The real purpose of the simulation was to illustrate the use of async programming with ansar, to show how easy it might be to adopt an async approach to different programming challenges.
A cut-back coding style was adopted, involving the least async detail possible. The results were
hopefully clearer but they also barely scratched the surface of what ansar offers. As an example, there
is a real case for implementing objects such as db_query(), network_request() and device_poll()
as state machines, or even as messages that are sent to state machines. Ansar is also capable of treating
processes as async objects - it is technically reasonable to implement device_poll() as a standalone
application while fully preserving interrupt capability. These topics are covered in the following sections.
Converting Functions To Machines
Knowing that ansar would be making substantial use of threads meant that strong management would be
needed. Given the nature of multi-threading, it was a good match to use the async machinery itself.
Threads became “first-order” types through the support for function objects like db_query(),
alongside 2 other object types known as machines.
Machines are implemented as the classes Stateless and StateMachine.
To the parent of an object, these variations on the implementation of an object are completely
transparent. Object implementations can switch type with no impact on the objects around them,
including the parent. The function db_query() can become the class DbQuery(ar.Point,ar.Stateless)
without affecting the main() parent at all. That type switch looks like this;
import random
import ansar.create as ar

random.seed()

def adjust(period):
    s = int(period / 0.1) + 1
    b = int(s * 0.75)   # Quicker.
    e = int(s * 1.5)    # Slower.
    a = random.randrange(b, e) * 0.1
    if a < 0.25:
        return 0.25
    return a

class DbQuery(ar.Point, ar.Stateless):
    def __init__(self, query):
        ar.Point.__init__(self)
        ar.Stateless.__init__(self)
        self.query = query

def DbQuery_Start(self, message):
    period = adjust(1.75)
    self.start(ar.T1, period)

def DbQuery_T1(self, message):
    self.complete('selection')

def DbQuery_Stop(self, message):
    self.complete(ar.Aborted())

DB_QUERY_DISPATCH = (ar.Start, ar.T1, ar.Stop)

ar.bind(DbQuery, DB_QUERY_DISPATCH)

def main(self):
    q = self.create(DbQuery, 'query')
    m = self.select(ar.Completed, ar.Stop)
    if isinstance(m, ar.Stop):
        self.send(m, q)
        self.select(ar.Completed)
        return ar.Aborted()
    selection = m.value
    return 0

ar.bind(main)

if __name__ == '__main__':
    ar.create_object(main)
A class definition along with a small set of related functions has replaced the db_query() function
definition. A name change from db_query to DbQuery was all that was required in the main()
function (a note for the more pedantic - the name could have stayed the same, for zero change to main()).
$ python3 db-query-stateless.py --debug-level=DEBUG
05:03:11.372 + <00000008>start_vector - Created by <00000001>
05:03:11.372 ~ <00000008>start_vector - Executable "/home/brad/gh/ansar-create/doc/source/class-that-stores/db-query-stateless.py" as process (1467865)
05:03:11.372 ~ <00000008>start_vector - Working folder "/home/brad/gh/ansar-create/doc/source/class-that-stores"
05:03:11.372 ~ <00000008>start_vector - Running object "__main__.main"
05:03:11.372 ~ <00000008>start_vector - Class threads (1) "retries" (1)
05:03:11.372 + <00000009>main - Created by <00000008>
05:03:11.372 + <0000000a>DbQuery - Created by <00000009>
05:03:11.372 < <0000000a>DbQuery - Received Start from <00000009>
05:03:11.372 > <0000000a>DbQuery - Sent StartTimer to <00000003>
05:03:13.375 < <0000000a>DbQuery - Received T1 from <00000003>
05:03:13.375 X <0000000a>DbQuery - Destroyed
05:03:13.375 < <00000009>main - Received Completed from <0000000a>
05:03:13.375 X <00000009>main - Destroyed
05:03:13.376 < <00000008>start_vector - Received Completed from <00000009>
05:03:13.376 X <00000008>start_vector - Destroyed
An async object such as DbQuery is the combination of a standard data object and a required set
of transition functions. The transitions are called in response to the receipt of messages, or stated
another way; messages are dispatched to transitions based on the type of the received message. The
standard data object is passed on every transition call as the first parameter, i.e. self, in the
same manner as the self parameter is passed to function objects such as main().
The runtime tables required to make all this work are constructed by the call to ar.bind()
(bind_stateless()) using supplied dispatch information, e.g. DB_QUERY_DISPATCH.
Names for each implied transition - such as DbQuery_Start() - are searched for across all the
loaded modules and then saved in internal dispatch machinery.
The combination of a class definition and a set of global functions may seem odd. The reasons for this
particular approach are slightly technical but are motivated by runtime efficiency. Other popular
approaches to dispatching, such as defining the transitions as methods within the DbQuery
class, incur a significant runtime overhead.
On the flip-side, this arrangement forces a breakdown of the problem at hand. Each individual transition function is small and the perceived imposition becomes a benefit as demands on the object become more complex.
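As a generic illustration of the technique (a toy sketch only, not ansar's actual internals), transitions can be plain module-level functions gathered once into a table keyed by message type, so that runtime dispatch is a single dictionary lookup:

```python
# Toy dispatch machinery, illustrating the general approach only.
class Start: pass
class T1: pass

# Module-level transition functions, named after the class and message.
def Machine_Start(self, message):
    return 'started'

def Machine_T1(self, message):
    return 'timed out'

# Built once, at "bind" time: message type -> transition function.
DISPATCH = {Start: Machine_Start, T1: Machine_T1}

def deliver(obj, message):
    # Runtime dispatch is one dictionary lookup and a plain call,
    # avoiding per-message attribute lookup on the instance.
    return DISPATCH[type(message)](obj, message)

print(deliver(object(), Start()))   # started
print(deliver(object(), T1()))      # timed out
```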
Managing The Allocation Of Threads
Operationally, db_query() and DbQuery are substantially different. Function objects have a
fresh, new platform thread assigned to them at creation time, whereas machines do not. Instead, machines
are assigned to a thread at registration time, i.e. inside ar.bind(). The assigned thread is the
only thread to ever call the transition functions of a particular machine - a necessary multithreading
guarantee.
There are 3 mechanisms for managing the assignment of threads to machines;
do nothing and accept a default assignment or,
use a special base class in the machine inheritance list to request a dedicated thread for each instance of the machine or,
specify a named thread at registration time to be shared by one or more classes of machine.
Accepting the default assignment is fine for many applications. A special background thread is created by
the ansar runtime (i.e. inside the call to create_object()) and all the work associated with
all “default assignment” machines is performed there. As applications become larger and more complex, a
significant quantity of work can fall on the shoulders of that thread. Eventually this can produce a
performance bottleneck, though the issue is mitigated by specifics of the Python interpreter (i.e. the
GIL).
More importantly, in extreme cases it may create a context conducive to deadlocks. This is a rare but real issue for any multithreaded application. There is some protection in using one of the explicit thread assignment techniques to increase the number of threads working inside the dispatch machinery. As a side-benefit this is also likely to help with application throughput and responsiveness, particularly where I/O is involved.
To arrange for a dedicated thread for each instance of a machine, use the ar.Threaded base class rather
than the Point class (the former inherits from the latter);
class DbQuery(ar.Threaded, ar.Stateless):
    def __init__(self, query):
        ar.Threaded.__init__(self)
        ar.Stateless.__init__(self)
        self.query = query
This change guarantees the assignment of a distinct platform thread for every instance of a DbQuery machine.
If a thread per instance is overly heavy-handed, changing the registration of the non-threaded DbQuery is the other option;
ar.bind(DbQuery, DB_QUERY_DISPATCH, thread='db')
Execution of this version looks like this;
$ python3 db-query-machine-threaded.py --debug-level=DEBUG
04:54:37.867 + <00000009>start_vector - Created by <00000001>
04:54:37.867 ~ <00000009>start_vector - Executable "/home/brad/gh/ansar-create/doc/source/class-that-stores/db-query-machine-threaded.py" as process (1467101)
04:54:37.867 ~ <00000009>start_vector - Working folder "/home/brad/gh/ansar-create/doc/source/class-that-stores"
04:54:37.867 ~ <00000009>start_vector - Running object "__main__.main"
04:54:37.867 ~ <00000009>start_vector - Class threads (2) "retries" (1),"db" (1)
04:54:37.867 + <0000000a>main - Created by <00000009>
04:54:37.867 + <0000000b>DbQuery - Created by <0000000a>
04:54:37.867 < <0000000b>DbQuery - Received Start from <0000000a>
04:54:37.867 > <0000000b>DbQuery - Sent StartTimer to <00000003>
04:54:39.870 < <0000000b>DbQuery - Received T1 from <00000003>
04:54:39.870 X <0000000b>DbQuery - Destroyed
04:54:39.870 < <0000000a>main - Received Completed from <0000000b>
04:54:39.870 X <0000000a>main - Destroyed
04:54:39.870 < <00000009>start_vector - Received Completed from <0000000a>
04:54:39.870 X <00000009>start_vector - Destroyed
The start_vector (part of the create_object() machinery) now lists “db” as one of the named threads. The
log entry means that there are 2 named threads created during startup (the full complement of framework
threads is around 6), called “retries” and “db” respectively, and each has a single class
assigned to it. All instances of a single class (i.e. DbQuery) will execute on the “db” thread, and something
similar happens with the “retries” thread.
Converting Functions To Message Servers
Creating an async object to perform work and produce a result is one pattern of use. Another pattern can be useful in scenarios involving resources such as database servers and network APIs. Resources such as these typically involve some form of connection. To make requests to a network API the client must first establish that connection, and when the client is finished making requests the connection should be closed. Connections are also apt to fail at inconvenient moments, requiring an automated reconnection strategy.
A server object is needed to manage the ongoing status of the connection and to keep the wider application free of the associated concerns. At the same time that object must accept requests from the application and present them over the connection. Making a network request now involves the sending of a message to that object and receiving a response, rather than the full lifecycle of an object. Application code changes from this;
r = self.create(NetworkRequest, 'request')
m = self.select(ar.Completed, ar.Stop)
To this;
r = self.send(NetworkRequest('request'), api)
m = self.select(NetworkResponse, ar.Stop)
Where api is the address of the new server object and both NetworkRequest and NetworkResponse are class
definitions of messages. As well as managing the connection issue, this arrangement is much more efficient. A minimal
simulation of the server looks like this;
import random
import ansar.create as ar

random.seed()

def one_chance_in(possibles):
    r = random.randrange(0, possibles)
    return r == 0

def connect():
    if one_chance_in(20):
        return False
    return True

def request_response(request):
    if one_chance_in(100):
        return None
    return 'response'

def disconnect():
    pass

class NetworkRequest(object):
    def __init__(self, request='request'):
        self.request = request

class NetworkResponse(object):
    def __init__(self, response='response'):
        self.response = response

class NetworkDown(object):
    pass

ar.bind(NetworkRequest)
ar.bind(NetworkResponse)
ar.bind(NetworkDown)

class INITIAL: pass
class READY: pass
class STANDBY: pass

class NetworkServer(ar.Point, ar.StateMachine):
    def __init__(self):
        ar.Point.__init__(self)
        ar.StateMachine.__init__(self, INITIAL)

def NetworkServer_INITIAL_Start(self, message):
    if connect():
        return READY
    self.start(ar.T1, 5.0)
    return STANDBY

def NetworkServer_READY_NetworkRequest(self, message):
    response = request_response(message.request)
    if response is None:
        self.reply(NetworkDown())
        self.start(ar.T1, 5.0)
        return STANDBY
    self.reply(NetworkResponse(response))
    return READY

def NetworkServer_READY_Stop(self, message):
    disconnect()
    self.complete(ar.Aborted())

def NetworkServer_STANDBY_NetworkRequest(self, message):
    self.reply(NetworkDown())
    return STANDBY

def NetworkServer_STANDBY_T1(self, message):
    if connect():
        return READY
    self.start(ar.T1, 5.0)
    return STANDBY

def NetworkServer_STANDBY_Stop(self, message):
    self.complete(ar.Aborted())

NETWORK_SERVER_DISPATCH = {
    INITIAL: (
        (ar.Start,), ()
    ),
    READY: (
        (NetworkRequest, ar.Stop), ()
    ),
    STANDBY: (
        (NetworkRequest, ar.T1, ar.Stop), ()
    ),
}

ar.bind(NetworkServer, NETWORK_SERVER_DISPATCH)

def main(self):
    api = self.create(NetworkServer)
    r = self.send(NetworkRequest('request'), api)
    m = self.select(NetworkResponse, NetworkDown, ar.Stop)
    if isinstance(m, NetworkResponse):
        response = m.response
    elif isinstance(m, NetworkDown):
        pass
    else:
        pass
    self.send(ar.Stop(), api)
    self.select(ar.Completed)
    return 0

ar.bind(main)

if __name__ == '__main__':
    ar.create_object(main)
The NetworkServer object is a state machine or FSM. Declaration of a state machine in ansar is similar to the
declaration of a stateless machine with the addition of;
declaration of the possible states - INITIAL, READY and STANDBY,
inheriting from the proper base class - NetworkServer(.., ar.StateMachine),
assignment of the initial state - ar.StateMachine.__init__(self, INITIAL),
declaration of more complex dispatch information - NETWORK_SERVER_DISPATCH,
transition functions that return the next state - return READY.
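The way a dispatch table and the ClassName_STATE_Message naming convention combine to drive transitions can be demonstrated without the framework. The following sketch is purely illustrative (it mimics the convention, not the actual ansar machinery):

```python
# Framework-free sketch: a dispatch table maps each state to the
# messages it accepts, and transition handlers are resolved by the
# ClassName_STATE_Message naming convention.
class INITIAL: pass
class READY: pass

class Start: pass
class Ping: pass

class Machine:
    def __init__(self):
        self.state = INITIAL
        self.log = []

    def Machine_INITIAL_Start(self, message):
        self.log.append('started')
        return READY

    def Machine_READY_Ping(self, message):
        self.log.append('pinged')
        return READY

DISPATCH = {
    INITIAL: (Start,),
    READY: (Ping,),
}

def dispatch(machine, message):
    # Accept the message only if the current state expects it.
    if not isinstance(message, DISPATCH[machine.state]):
        raise RuntimeError('unexpected message')
    name = f'{type(machine).__name__}_{machine.state.__name__}_{type(message).__name__}'
    # The handler returns the next state.
    machine.state = getattr(machine, name)(message)

m = Machine()
dispatch(m, Start())
dispatch(m, Ping())
```

The table makes the machine's behaviour auditable at a glance; any message arriving in a state that does not list it is an explicit error rather than a silent drop.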
The object manages the connection, initiating retries as appropriate. It also functions as an intermediary between
clients and the underlying service, responding with messages appropriate to the current connection state. In this minimal
use of a server object, i.e. the single instance of sending a NetworkRequest, there is no real opportunity for
the server to flex its capabilities but there is sufficient code to illustrate the pattern.
The connect(), request_response() and disconnect() functions are representative placeholders. The first 2
functions include simulations of fallibility, to motivate the machine to check the results and modify its behaviour
accordingly. A successful request looks like this;
$ python3 network-request-server.py --debug-level=DEBUG
05:03:22.908 + <00000008>start_vector - Created by <00000001>
05:03:22.909 ~ <00000008>start_vector - Executable "/home/brad/gh/ansar-create/doc/source/class-that-stores/network-request-server.py" as process (1468043)
05:03:22.909 ~ <00000008>start_vector - Working folder "/home/brad/gh/ansar-create/doc/source/class-that-stores"
05:03:22.909 ~ <00000008>start_vector - Running object "__main__.main"
05:03:22.909 ~ <00000008>start_vector - Class threads (1) "retries" (1)
05:03:22.909 + <00000009>main - Created by <00000008>
05:03:22.909 + <0000000a>NetworkServer[INITIAL] - Created by <00000009>
05:03:22.909 > <00000009>main - Sent NetworkRequest to <0000000a>
05:03:22.909 < <0000000a>NetworkServer[INITIAL] - Received Start from <00000009>
05:03:22.909 < <0000000a>NetworkServer[READY] - Received NetworkRequest from <00000009>
05:03:22.909 > <0000000a>NetworkServer[READY] - Sent NetworkResponse to <00000009>
05:03:22.909 < <00000009>main - Received NetworkResponse from <0000000a>
05:03:22.909 > <00000009>main - Sent Stop to <0000000a>
05:03:22.909 < <0000000a>NetworkServer[READY] - Received Stop from <00000009>
05:03:22.909 X <0000000a>NetworkServer[READY] - Destroyed
05:03:22.909 < <00000009>main - Received Completed from <0000000a>
05:03:22.909 X <00000009>main - Destroyed
05:03:22.909 < <00000008>start_vector - Received Completed from <00000009>
05:03:22.909 X <00000008>start_vector - Destroyed
Where the connect() fails, the logs look like this;
$ python3 network-request-server.py --debug-level=DEBUG
00:29:53.189 + <00000007>start_vector - Created by <00000001>
00:29:53.189 ~ <00000007>start_vector - Executable "/home/brad/gh/ansar-create/doc/source/class-that-stores/network-request-server.py" as process (128847)
00:29:53.189 ~ <00000007>start_vector - Working folder "/home/brad/gh/ansar-create/doc/source/class-that-stores"
00:29:53.189 ~ <00000007>start_vector - Running object "__main__.main"
00:29:53.189 ~ <00000007>start_vector - Class threads (1) "retries" (1)
00:29:53.189 + <00000008>main - Created by <00000007>
00:29:53.189 + <00000009>NetworkServer[INITIAL] - Created by <00000008>
00:29:53.189 > <00000008>main - Sent NetworkRequest to <00000009>
00:29:53.190 < <00000009>NetworkServer[INITIAL] - Received Start from <00000008>
00:29:53.190 > <00000009>NetworkServer[INITIAL] - Sent StartTimer to <00000003>
00:29:53.190 < <00000009>NetworkServer[STANDBY] - Received NetworkRequest from <00000008>
00:29:53.190 > <00000009>NetworkServer[STANDBY] - Sent NetworkDown to <00000008>
00:29:53.190 < <00000008>main - Received NetworkDown from <00000009>
00:29:53.190 > <00000008>main - Sent Stop to <00000009>
00:29:53.190 < <00000009>NetworkServer[STANDBY] - Received Stop from <00000008>
00:29:53.190 X <00000009>NetworkServer[STANDBY] - Destroyed
00:29:53.190 < <00000008>main - Received Completed from <00000009>
00:29:53.190 X <00000008>main - Destroyed
00:29:53.190 < <00000007>start_vector - Received Completed from <00000008>
00:29:53.190 X <00000007>start_vector - Destroyed
This implements a strong operational model within a small quantity of maintainable code. Note also that
the NetworkServer object supports multi-threading; there can be many instances of self.send(NetworkRequest(..), api)
at many different locations in an application. Using reply() ensures that the response message is always
routed to the proper party.
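The routing guarantee behind reply() can be sketched in plain Python. In this hypothetical (non-ansar) model, each request carries the sender's own queue as a return address, so the server always answers the right party no matter how many clients are active:

```python
import queue
import threading

# Framework-free sketch of return-address routing: each request
# carries the client's own inbox, and the server "replies" by
# putting the response on that inbox.
def server(requests):
    while True:
        return_address, request = requests.get()
        if request is None:          # shutdown sentinel
            break
        # reply(): route the response back to whoever asked.
        return_address.put(f'response to {request}')

requests = queue.Queue()
t = threading.Thread(target=server, args=(requests,))
t.start()

client_a = queue.Queue()
client_b = queue.Queue()
requests.put((client_a, 'a'))
requests.put((client_b, 'b'))
requests.put((None, None))
t.join()

answer_a = client_a.get()
answer_b = client_b.get()
```

Because the response travels back along the address that came with the request, two clients can never receive each other's answers, even when their requests interleave.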
Converting Functions To Processes
Multi-processing is also part of the ansar toolbox. This section takes the device_poll() function object and pushes
it into an entirely separate platform process, while retaining its original functionality. The new process can happily
co-exist beside the DbQuery and NetworkServer objects, delivering a dataset to the main() function as
before. It also responds properly to a control-c.
An Object Inside a Process
The first task is to move the function into a separate module with the proper framework around it;
import random
import ansar.create as ar
from device_if import DeviceControl, DevicePoll
random.seed()
def adjust(period):
s = int(period / 0.1) + 1
b = int(s * 0.75) # Quicker.
e = int(s * 1.5) # Slower.
a = random.randrange(b, e) * 0.1
if a < 0.25:
return 0.25
return a
def device_poll(self, settings, control):
period = adjust(0.5)
self.start(ar.T1, period)
m = self.select(ar.T1, ar.Stop)
if isinstance(m, ar.Stop):
return ar.Aborted()
return DevicePoll('status')
ar.bind(device_poll)
default_input = DeviceControl()
if __name__ == '__main__':
ar.create_object(device_poll, factory_input=default_input)
Interfacing With A Process
Input and output messages DeviceControl and DevicePoll have been defined to carry creation and completion
information across the inter-process boundary. Passing a factory_input value causes the framework
to parse stdin and present the results as the 3rd parameter to the device_poll() instance. Ansar also manages
persistent application settings which are passed as the 2nd parameter. In this default case there are none and the
parameter can be ignored. The create-complete messages look like this;
import ansar.create as ar
class DeviceControl(object):
def __init__(self, control='control'):
self.control = control
class DevicePoll(object):
def __init__(self, poll='poll'):
self.poll = poll
ar.bind(DeviceControl)
ar.bind(DevicePoll)
Passing Input From The Command Line
The new process requires an encoding, either piped over stdin or using the --input-file=name flag;
$ cat device-control | python3 device-poll.py --debug-level=DEBUG
05:03:20.545 + <00000008>start_vector - Created by <00000001>
05:03:20.545 ~ <00000008>start_vector - Executable "/home/brad/gh/ansar-create/doc/source/class-that-stores/device-poll.py" as process (1467957)
05:03:20.545 ~ <00000008>start_vector - Working folder "/home/brad/gh/ansar-create/doc/source/class-that-stores"
05:03:20.545 ~ <00000008>start_vector - Running object "__main__.device_poll"
05:03:20.545 ~ <00000008>start_vector - Class threads (1) "retries" (1)
05:03:20.545 + <00000009>device_poll - Created by <00000008>
05:03:20.545 > <00000009>device_poll - Sent StartTimer to <00000003>
05:03:21.046 < <00000009>device_poll - Received T1 from <00000003>
05:03:21.046 X <00000009>device_poll - Destroyed
05:03:21.046 < <00000008>start_vector - Received Completed from <00000009>
05:03:21.046 X <00000008>start_vector - Destroyed
An encoding of a DeviceControl object is piped to the device-poll.py process and after the programmed delay
the process terminates. An encoding of a DevicePoll object is placed on stdout. The device-control file
looks like this, i.e. the standard layout of a JSON encoding produced by the ansar.encode library;
{
"value": {
"control": "control"
}
}
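The `{"value": ...}` layout shown above can be consumed with nothing more than the standard json module. This sketch is an approximation only; the real ansar.encode library adds type information and validation well beyond this:

```python
import json

# Minimal sketch: decode the standard '{"value": ...}' layout and
# rebuild the message object from the inner dict.
class DeviceControl:
    def __init__(self, control='control'):
        self.control = control

encoding = '''
{
    "value": {
        "control": "control"
    }
}
'''

body = json.loads(encoding)['value']
control = DeviceControl(**body)
```

The outer "value" key is the envelope; everything the receiving object needs is inside it.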
To generate a sample input for any standard application just use the --dump-input flag. This will place
an encoding of the input expected by that particular application on stdout:
$ dist/device-poll --dump-input
{
"value": {
"control": "control"
}
}
Redirecting this to a file allows for convenient editing.
Sharing Messages Across Multiple Processes
For successful exchange of the device messages, the classes must be known to all processes by the same name. In this
scenario it is enough to place the messages in a module (device_if.py - an abbreviation of device interface) that
is then imported by interested parties. All main modules and the device message module are in the same folder.
Sharing message types across multiple processes is as simple as sharing any class definitions. Message types
can be placed in namespace packages that are then imported as normal. The ansar Stop message is distinct
from another message named Stop because internally the ansar message is known as ansar.create.lifecycle.Stop.
Whatever the local technique for sharing classes around a codebase may be, it should also work for message types.
In those difficult scenarios there is always;
import os
import sys
sys.path.insert(0, os.path.abspath('*message-folder*'))
Bringing It All Together
The essential async object is created, accepts parameters, receives messages and terminates with a completion
message. It is also required to respond to a Stop message at any time. All current activities should be
interrupted (i.e. the Stop should be propagated to any child objects) before terminating itself.
An instance of the Process machine object satisfies these operational requirements. Under the hood it starts
a named executable file, parameters passed to the object are forwarded to the new process and output from the
process is included in the Completed message at termination - the object operates as an internal proxy
for an external process. Any Stop message received by the Process object results in
the sending of a signal (i.e. SIGINT) to the associated process id. As described in earlier sections, this signal will be
mapped to a Stop message by the receiving process, propagating the interrupt across the parent-child boundary.
The final simulation combines all 3 patterns of async programming into a single application. Here are the db_query()
function object with its dedicated platform thread, the NetworkServer state machine running in the default
transition thread, and the device_poll.py process, all running side-by-side and delivering the same 3 data
items in a concurrent fashion to main();
import random
import ansar.create as ar
from device_if import DeviceControl, DevicePoll
random.seed()
def adjust(period):
s = int(period / 0.1) + 1
b = int(s * 0.75) # Quicker.
e = int(s * 1.5) # Slower.
a = random.randrange(b, e) * 0.1
if a < 0.25:
return 0.25
return a
def db_query(self, query):
period = adjust(0.75)
self.start(ar.T1, period)
m = self.select(ar.T1, ar.Stop)
if isinstance(m, ar.Stop):
return ar.Aborted()
return 'selection'
ar.bind(db_query)
class NetworkRequest(object):
def __init__(self, request='request'):
self.request = request
class NetworkResponse(object):
def __init__(self, response='response'):
self.response = response
class NetworkDown(object):
pass
ar.bind(NetworkRequest)
ar.bind(NetworkResponse)
ar.bind(NetworkDown)
def one_chance_in(possibles):
r = random.randrange(0, possibles)
return r == 0
def connect():
if one_chance_in(20):
return False
return True
def request_response(request):
if one_chance_in(100):
return None
return 'response'
def disconnect():
pass
class INITIAL: pass
class READY: pass
class STANDBY: pass
class NetworkServer(ar.Point, ar.StateMachine):
def __init__(self):
ar.Point.__init__(self)
ar.StateMachine.__init__(self, INITIAL)
def NetworkServer_INITIAL_Start(self, message):
if connect():
return READY
self.start(ar.T1, 5.0)
return STANDBY
def NetworkServer_READY_NetworkRequest(self, message):
response = request_response(message.request)
if response is None:
self.reply(NetworkDown())
self.start(ar.T1, 5.0)
return STANDBY
self.reply(NetworkResponse(response))
return READY
def NetworkServer_READY_Stop(self, message):
disconnect()
self.complete(ar.Aborted())
def NetworkServer_STANDBY_NetworkRequest(self, message):
self.reply(NetworkDown())
return STANDBY
def NetworkServer_STANDBY_T1(self, message):
if connect():
return READY
self.start(ar.T1, 5.0)
return STANDBY
def NetworkServer_STANDBY_Stop(self, message):
self.complete(ar.Aborted())
NETWORK_SERVER_DISPATCH = {
INITIAL: (
(ar.Start,), ()
),
READY: (
(NetworkRequest, ar.Stop), ()
),
STANDBY: (
(NetworkRequest, ar.T1, ar.Stop), ()
),
}
ar.bind(NetworkServer, NETWORK_SERVER_DISPATCH)
def main(self):
api = self.create(NetworkServer)
q = self.create(db_query, 'query')
self.send(NetworkRequest('request'), api)
p = self.create(ar.Process, 'device-poll', input=DeviceControl('control'))
a = [q, p]
def stop():
for t in a:
self.send(ar.Stop(), t)
self.select(ar.Completed)
self.send(ar.Stop(), api)
self.select(ar.Completed)
try:
while len(a) > 0:
m = self.select(NetworkResponse, NetworkDown, ar.Completed, ar.Stop)
if isinstance(m, ar.Stop):
return ar.Aborted()
elif isinstance(m, ar.Completed):
a.remove(self.return_address)
elif isinstance(m, NetworkResponse):
pass
elif isinstance(m, NetworkDown):
pass
finally:
stop()
return 0
ar.bind(main)
if __name__ == '__main__':
ar.create_object(main)
The combination of a try-finally block and the stop() function has cleaned up the termination a little, and
the loop that collects the 3 data items has changed to accommodate the switch to network requests.
Logs relating to this application now include the logs of a child process - there are now logs from 2 different
instances of the start_vector object;
$ PATH=dist:${PATH} python3 query-request-poll-3-way.py --debug-level=DEBUG
05:03:23.547 + <00000008>start_vector - Created by <00000001>
05:03:23.547 ~ <00000008>start_vector - Executable "/home/brad/gh/ansar-create/doc/source/class-that-stores/query-request-poll-3-way.py" as process (1468071)
05:03:23.547 ~ <00000008>start_vector - Working folder "/home/brad/gh/ansar-create/doc/source/class-that-stores"
05:03:23.547 ~ <00000008>start_vector - Running object "__main__.main"
05:03:23.547 ~ <00000008>start_vector - Class threads (1) "retries" (1)
05:03:23.547 + <00000009>main - Created by <00000008>
05:03:23.547 + <0000000a>NetworkServer[INITIAL] - Created by <00000009>
05:03:23.547 + <0000000b>db_query - Created by <00000009>
05:03:23.547 < <0000000a>NetworkServer[INITIAL] - Received Start from <00000009>
05:03:23.547 > <0000000b>db_query - Sent StartTimer to <00000003>
05:03:23.547 > <00000009>main - Sent NetworkRequest to <0000000a>
05:03:23.547 + <0000000c>Process[INITIAL] - Created by <00000009>
05:03:23.547 < <0000000a>NetworkServer[READY] - Received NetworkRequest from <00000009>
05:03:23.547 > <0000000a>NetworkServer[READY] - Sent NetworkResponse to <00000009>
05:03:23.547 < <0000000c>Process[INITIAL] - Received Start from <00000009>
05:03:23.548 < <00000009>main - Received NetworkResponse from <0000000a>
05:03:23.548 ~ <0000000c>Process[INITIAL] - Execute dist/device-poll --call-signature=io --debug-level=DEBUG
05:03:23.548 ( <0000000c>Process[INITIAL] - Started process (1468081)
05:03:23.548 + <0000000d>wait - Created by <0000000c>
5:03:23.653 + <00000007>start_vector - Created by <00000001>
5:03:23.653 ~ <00000007>start_vector - Executable "/home/brad/gh/ansar-create/doc/source/class-that-stores/dist/device-poll" as process (1468083)
5:03:23.653 ~ <00000007>start_vector - Working folder "/home/brad/gh/ansar-create/doc/source/class-that-stores"
5:03:23.653 ~ <00000007>start_vector - Running object "__main__.device_poll"
5:03:23.653 ~ <00000007>start_vector - Class threads (1) "retries" (1)
5:03:23.653 + <00000008>device_poll - Created by <00000007>
5:03:23.653 > <00000008>device_poll - Sent StartTimer to <00000003>
5:03:24.153 < <00000008>device_poll - Received T1 from <00000003>
5:03:24.153 X <00000008>device_poll - Destroyed
5:03:24.154 < <00000007>start_vector - Received Completed from <00000008>
5:03:24.154 X <00000007>start_vector - Destroyed
05:03:24.298 < <0000000b>db_query - Received T1 from <00000003>
05:03:24.298 X <0000000b>db_query - Destroyed
05:03:24.299 < <00000009>main - Received Completed from <0000000b>
05:03:24.431 X <0000000d>wait - Destroyed
05:03:24.431 < <0000000c>Process[EXECUTING] - Received Completed from <0000000d>
05:03:24.431 ) <0000000c>Process[EXECUTING] - Process (1468081) ended with 0
05:03:24.432 X <0000000c>Process[EXECUTING] - Destroyed
05:03:24.432 < <00000009>main - Received Completed from <0000000c>
05:03:24.432 > <00000009>main - Sent Stop to <0000000a>
05:03:24.432 < <0000000a>NetworkServer[READY] - Received Stop from <00000009>
05:03:24.432 X <0000000a>NetworkServer[READY] - Destroyed
05:03:24.432 < <00000009>main - Received Completed from <0000000a>
05:03:24.432 X <00000009>main - Destroyed
05:03:24.432 < <00000008>start_vector - Received Completed from <00000009>
05:03:24.432 X <00000008>start_vector - Destroyed
Where Is The Device Server
It would be a natural evolution to move device-poll.py (i.e. the process) to an arrangement similar
to NetworkServer. It would become a state machine called DeviceServer and clients would send control
messages to it and receive poll messages in response (i.e. DeviceControl and DevicePoll). This arrangement
would remove the need to start a fresh process for each poll.
The obvious missing link is the network messaging needed to transport the control-poll messages between the
parent and child processes (i.e. the proposed DeviceServer and main()).
Async network messaging is the domain of the ansar.connect library, whereas this document is focused on asynchronous programming with ansar. The ansar.connect library is currently being rewritten for Python 3.x and will be released as soon as it becomes available.
Why The Time Adjustment
According to the statistics provided for the simulation the average execution times were 0.75s, 1.75s and 0.5s
for the query, request and the poll, respectively. If the 3 functions were started at the same time then the
completion order can be expected to be poll, query and then request. A sequence of 3 select() calls can then
assume the Completed messages will arrive in that same order, and process the m.value accordingly.
This is a naive approach to a standard asynchronous issue.
The adjust() function simulates the variability in actual execution times. If the poll were to execute
particularly slowly and the query were to execute quickly, the expected order would be compromised and
processing of the 3 m.value results would fail.
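The hazard is easy to demonstrate numerically. Using the same 0.75x-1.5x variability as the simulation, the sketch below counts how often the "expected" completion order (poll, query, request) fails to hold:

```python
import random

random.seed(1)

def adjust(period):
    # Same variability as the simulation: 0.75x to 1.5x the average.
    return period * random.uniform(0.75, 1.5)

# Average execution times from the text.
averages = {'query': 0.75, 'request': 1.75, 'poll': 0.5}
expected = ['poll', 'query', 'request']     # order by average time

reordered = 0
for _ in range(1000):
    # Sort by one sampled duration per worker; this is the order
    # in which the completions would actually arrive.
    actual = sorted(averages, key=lambda w: adjust(averages[w]))
    if actual != expected:
        reordered += 1
```

Because the poll's range (0.375s-0.75s) overlaps the query's (0.5625s-1.125s), a meaningful fraction of runs arrive out of the expected order; any code assuming a fixed order fails on exactly those runs.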
There are 2 general strategies to resolve this issue. The Completed messages can be processed according
to the type of their contents or according to who sent them. The former requires that all the objects involved
return values of distinct type and the latter requires a “return address” at the moment each Completed
message is received.
If each object returns a distinct type the code looks like this;
while len(a) > 0:
m = self.select(ar.Completed, ar.Stop)
if isinstance(m, ar.Stop):
return ar.Aborted()
a.remove(self.return_address)
value = m.value
if isinstance(value, DbSelection):
selection = value
elif isinstance(value, NetworkResponse):
response = value
elif isinstance(value, DevicePoll):
poll = value
Adoption of the alternate technique looks like this;
q = self.create(db_query, 'query')
r = self.create(network_request, 'request')
p = self.create(device_poll, 'poll')
a = [q, r, p]
def stop():
for t in a:
self.send(ar.Stop(), t)
self.select(ar.Completed)
try:
while len(a) > 0:
m = self.select(ar.Completed, ar.Stop)
if isinstance(m, ar.Stop):
return ar.Aborted()
a.remove(self.return_address)
if self.return_address == q:
selection = m.value
elif self.return_address == r:
response = m.value
elif self.return_address == p:
poll = m.value
finally:
stop()
action = what_next(selection, response, poll)
Both versions are impervious to variations in execution times. The former technique is the style of message processing seen in state machines. The latter technique has dedicated supporting methods in ansar, and a context manager is used to automate interruption handling;
# Create objects
q = self.create(db_query, 'query')
r = self.create(network_request, 'request')
p = self.create(device_poll, 'poll')
# Assign identity to each running object.
self.assign(q, db_query)
self.assign(r, network_request)
self.assign(p, device_poll)
with ar.AutoStop(self):
# At least one incomplete object.
while self.working():
m = self.select(ar.Completed, ar.Stop)
if isinstance(m, ar.Stop):
return ar.Aborted()
# Completed. Remove from object list and recover identity.
d = self.debrief()
if d is db_query:
selection = m.value
elif d is network_request:
response = m.value
elif d is device_poll:
poll = m.value
action = what_next(selection, response, poll)
The methods assign(), working() and debrief() reduce the overhead of working
with mixed collections of child objects, where the constant issue is that an interruption can
arrive at any time. Code clarity is improved. Note that any object can be passed to assign()
as the identity; a unique str value would be the most typical. In those scenarios it is
perfectly reasonable to use the == operator for identity checking. Where there are multiple
instances of complex objects, such as Job() objects, the is operator can be more
appropriate.
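The bookkeeping behind these methods can be modelled in a few lines. The names below deliberately mimic the ansar calls but the class is an illustrative stand-in, not the real implementation: a dict maps each child address to the identity chosen by the caller.

```python
# Framework-free sketch of assign()/working()/debrief(): a dict
# from child address to caller-chosen identity.
class Tracker:
    def __init__(self):
        self.assigned = {}
        self.return_address = None   # set on receipt of each message

    def assign(self, address, identity):
        self.assigned[address] = identity

    def working(self):
        # At least one child has not yet completed.
        return len(self.assigned) > 0

    def debrief(self):
        # Called on receipt of a Completed message; recover the
        # identity of whichever child just finished.
        return self.assigned.pop(self.return_address)

t = Tracker()
t.assign(1001, 'query')
t.assign(1002, 'poll')

t.return_address = 1002      # a Completed arrived from child 1002
d = t.debrief()
```

After the debrief() the finished child is no longer tracked, so working() naturally becomes False once the last Completed arrives.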
More About Message Processing
Consider the following code;
q = self.create(db_query, 'query')
m = self.select(ar.Completed, ar.Stop)
Or this;
q = self.send(DbQuery('query'), db)
m = self.select(DbSelection, ar.Stop)
The pattern of create() or send() and a matching select() can repeat
throughout an async application. The ask() method bundles this pattern into a single call;
m = self.ask(DbQuery('query'), (DbSelection, ar.Stop), db)
This is both convenient and a curiosity; ask() would appear to subvert the non-blocking nature of async software.
For this reason, methods such as ask() and select() are only available inside function objects like db_query(),
which run inside their own dedicated threads. Machines and their transition functions have no access to these methods.
This arrangement ensures that whatever happens inside a function object such as db_query does not affect the operational
status of other application objects. Note that it is still advisable to declare timers on all message input calls -
both ask() and select() accept a seconds parameter. The presence of that parameter enables a timer and automatically
adds the SelectTimer class to the possible result types.
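How a seconds parameter can turn into a timer message is easy to model. In this illustrative (non-ansar) sketch, a queue timeout is converted into a SelectTimer-style result instead of raising an exception:

```python
import queue

# Framework-free sketch: the seconds parameter becomes a queue
# timeout, and expiry is surfaced as a message rather than an error.
class SelectTimer: pass

def select(inbox, seconds=None):
    try:
        return inbox.get(timeout=seconds)
    except queue.Empty:
        return SelectTimer()

inbox = queue.Queue()
m = select(inbox, seconds=0.01)   # nothing arrives: the timer fires
```

Delivering expiry as a message keeps the caller's logic uniform; a timeout is handled with the same isinstance() checks as any other input.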
To assist with the more complex corners of asynchronicity, ansar implements a sophisticated message input sub-system,
with methods such as ask() at the very top. It includes capabilities such as “saving” where received messages can
be “pushed back” into the input stream for a form of deferred processing.
m = self.ask(DbQuery('query'), (DbSelection, ar.Stop), db, saving=(NetworkRequest, DevicePoll))
If encountered, messages listed in the saving tuple are deferred. An equivalent mechanism exists in the dispatch information associated with a state machine;
READY: (
(DbQuery, ar.Stop), (NetworkRequest, DevicePoll)
),
The first tuple lists the messages to be accepted in the READY state while the second tuple lists the messages to
be deferred to a more convenient moment. The machine eventually expects to process the full complement.
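The save-and-replay behaviour can be sketched without the framework. The model below is illustrative only: messages not accepted in the current state are pushed aside, then replayed ahead of fresh input once a transition occurs.

```python
# Framework-free sketch of the "save" mechanism: deferred messages
# are replayed ahead of fresh input after each state change.
def run(pending, accepted_per_state, handler, state):
    saved = []
    while pending:
        message = pending.pop(0)
        if message not in accepted_per_state[state]:
            saved.append(message)      # defer to a better moment
            continue
        state = handler(state, message)
        # Replay deferred messages ahead of remaining fresh input.
        pending, saved = saved + pending, []
    return state

trace = []

def handler(state, message):
    trace.append((state, message))
    return 'READY' if message == 'Start' else state

accepted = {'INITIAL': {'Start'}, 'READY': {'Start', 'Ping'}}

# A Ping arrives too early; it is saved and processed after Start.
state = run(['Ping', 'Start', 'Ping'], accepted, handler, 'INITIAL')
```

The early Ping is not lost and not an error; it is simply processed once the machine reaches a state that welcomes it, which is exactly the full-complement guarantee described above.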
The “save” abstraction comes directly from SDL - the Specification and Description Language. Full description of the input system is outside the scope of this document. Suffice to say that ansar message processing is based on a formal model with substantial standards, organizations, toolsets and communities associated with it.