NAME

IO::Lambda - non-blocking I/O as lambda calculus

SYNOPSIS

The code below demonstrates execution of parallel HTTP requests:

    use strict;
    use IO::Lambda qw(:lambda :func);
    use IO::Socket::INET;

    # this function creates a new lambda object
    # associated with one socket, and fetches a single URL
    sub http
    {
        my $host = shift;

        # Simple HTTP functions by first sending request to the remote, and
        # then waiting for the response. This sets up a new lambda object with
        # attached one of many closures that process sequentially
        return lambda {

            # create a socket, and issue a tcp connect
            my $socket = IO::Socket::INET-> new(
                PeerAddr => $host,
                PeerPort => 80
            );

            # Wait until the socket becomes writable. Parameters to writable()
            # are passed using context(). This association is remembered
            # within the engine.
            context $socket;

            # writable sets up a possible event to monitor, when
            # $socket is writable, execute the closure.
            writable {

                # The engine discovered we can write, so send the request
                print $socket "GET /index.html HTTP/1.0\r\n\r\n";

                # This variable needs to stay shared across
                # multiple invocations of our readable closure, so
                # it needs to be outside that closure. Here, it collects
                # whatever the remote returns
                my $buf = '';

                # readable registers another event to monitor -
                # that $socket is readable. Note that we do not
                # need to set the context again because when we get
                # here, the engine knows what context this command
                # took place in, and assumes the same context.
                # Also note that socket won't be awaited for writable events
                # anymore, and this code won't be executed for this $socket.
                readable {

                    # This closure is executed when we can read.
                    # Read from the socket. sysread() returns number of
                    # bytes read. Zero means EOF, and undef means error, so
                    # we stop on these conditions.
                    # If we return without registering a follow-up
                    # handler, this return will be processed as the
                    # end of this sequence of events for whoever is
                    # waiting on us.
                    return $buf unless
                        sysread( $socket, $buf, 1024, length($buf));

                    # We're not done so we need to do this again.
                    # Note that the engine knows that it just
                    # called this closure because $socket was
                    # readable, so it can infer that it is supposed
                    # to set up a callback that will call this
                    # closure when $socket is next readable.
                    again;
                }
            }
        }
    }

    # Fire up a single lambda and wait until it completes.
    print http('www.perl.com')-> wait;

    # Fire up a lambda that waits for two http requests in parallel.
    # tails() can wait for more than one lambda
    my @hosts = ('www.perl.com', 'www.google.com');
    lambda {
        context map { http($_) } @hosts;
        # tails() asynchronously waits until all lambdas in the context
        # are finished.
        tails { print @_ }
    }-> wait;

    # crawl for all urls in parallel, but keep 10 parallel connections max
    print par(10)-> wait(map { http($_) } @hosts);

    # crawl for all urls sequentially
    print mapcar( curry { http(shift) })-> wait(@hosts);

Note: "io" and "lambda" are synonyms - I personally prefer "lambda" but some find the word slightly inappropriate, hence "io". See however "Higher-order functions" to see why it is more "lambda" than "io".

DESCRIPTION

This module is another attempt to fight the horrors of non-blocking I/O. It tries to bring back the simplicity of the declarative programming style, that is only available when one employs threads, coroutines, or co-processes. Usually coding non-blocking I/O for single process, single thread programs requires construction of state machines, often fairly complex, which doesn't help code clarity, and is the reason why asynchronous I/O programming is often considered 'messy'.
Similar to the concept of monads in functional languages, that enforce a certain order of execution over generally orderless functions, "IO::Lambda" allows writing I/O callbacks in a style that resembles the good old sequential, declarative programming.

The manual begins with code examples, then proceeds to explaining basic assumptions, then finally gets to the complex concepts, where the real fun begins. You can skip directly there ("Stream IO", "Higher-order functions"), where the functional style mixes with I/O. If, on the contrary, you are intimidated by the module's ambitions, you can skip to "Simple use" for a more gentle introduction. Those who are interested in how the module differs from the other I/O frameworks, please continue reading.

Warning: the API has slightly changed in version 1.01. See IO::Lambda::Compat for dealing with programs written using the older API.

Simple use

This section is for those who don't need all of the module's powerful machinery. Simple callback-driven programming examples show how to use the module for unsophisticated tasks, using concepts similar to the other I/O frameworks. It is possible to use the module on this level only, however one must be aware that by doing so, the real power of the higher-order abstraction is not used.

"IO::Lambda", like all I/O multiplexing libraries, provides functions for registering callbacks, that in turn are called when a timeout occurs, or when a file handle is ready for reading and/or writing. See below code examples that demonstrate how to program on this level of abstraction.
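
As a first taste of this callback level, here is a minimal sketch of a repeating timer. It uses only "lambda", "context", "timeout", "again" and "wait" as they are described in this manual; the 0.05 second interval and the $ticks counter are arbitrary choices made for illustration, not something the module prescribes.

```perl
use strict;
use warnings;
use IO::Lambda qw(:lambda);

my $ticks = 0;    # illustrative counter, not part of the module

my $timer = lambda {
    context 0.05;                     # timeout() takes its delay from the context
    timeout {
        $ticks++;
        return again if $ticks < 3;   # re-arm the same timer
        return "fired $ticks times";  # last returned value becomes the result
    }
};

my $summary = $timer-> wait;          # runs the event loop until the lambda ends
print "$summary\n";
```

Nothing runs until "wait" is called; the callback then fires three times and the value of its last return is what "wait" hands back.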

Example: reading lines from a filehandle

Given that $filehandle is non-blocking, the following code creates a lambda object (later, simply a lambda) that reads from the handle until EOF or until an error occurs. Here, "getline" (see "Stream IO" below) constructs a lambda that reads a single line from a filehandle.

    use IO::Lambda qw(:all);

    sub my_reader
    {
        my $filehandle = shift;
        lambda {
            context getline, $filehandle, \(my $buf = '');
            tail {
                my ( $string, $error) = @_;
                if ( $error) {
                    warn "error: $error\n";
                } else {
                    print $string;
                    return again;
                }
            }
        }
    }

Assume we have two socket connections, and the sockets are non-blocking - read from both of them in parallel. The following code creates a lambda that reads from two readers:

    sub my_reader_all
    {
        my @filehandles = @_;
        lambda {
            context map { my_reader($_) } @filehandles;
            tails { print "all is finished\n" };
        }
    }

    my_reader_all( $socket1, $socket2)-> wait;

Non-blocking HTTP client

Given a socket, create a lambda that implements the HTTP protocol:

    use IO::Lambda qw(:all);
    use IO::Socket;
    use HTTP::Request;

    sub talk
    {
        my $req    = shift;
        my $socket = IO::Socket::INET-> new(
            PeerAddr => 'www.perl.com',
            PeerPort => 80
        );

        lambda {
            context $socket;
            writable {
                # connected
                print $socket "GET ", $req-> uri, "\r\n\r\n";
                my $buf = '';
                readable {
                    sysread $socket, $buf, 1024, length($buf) or return $buf;
                    again; # wait for reading and re-do the block
                }
            }
        }
    }

Connect and talk to the remote:

    my $request = HTTP::Request-> new( GET => 'http://www.perl.com');

    my $q = talk( $request );
    print $q-> wait; # will print content of $buf

Connect two parallel connections: by explicitly waiting for each

    $q = lambda {
        context talk($request);
        tail { print shift };

        context talk($request2);
        tail { print shift };
    };
    $q-> wait;

Connect two parallel connections: by waiting for all

    $q = lambda {
        context talk($request1), talk($request2);
        tails { print for @_ };
    };
    $q-> wait;

Teach our simple http request to redirect by wrapping talk(). talk_redirect() will have exactly the same properties as talk() does:

    use HTTP::Response;

    sub talk_redirect
    {
        my $req = shift;
        lambda {
            context talk( $req);
            tail {
                my $res = HTTP::Response-> parse( shift );
                return $res unless $res-> code == 302;

                # follow the redirect target given in the Location header
                $req-> uri( $res-> header('Location'));
                context talk( $req);
                again;
            }
        }
    }

Full example code

    use strict;
    use IO::Lambda qw(:lambda);
    use IO::Socket::INET;

    sub get
    {
        my ( $socket, $url) = @_;
        lambda {
            context $socket;
            writable {
                print $socket "GET $url HTTP/1.0\r\n\r\n";
                my $buf = '';
                readable {
                    my $n = sysread( $socket, $buf, 1024, length($buf));
                    return "read error:$!" unless defined $n;
                    return $buf unless $n;
                    again;
                }
            }
        }
    }

    sub get_parallel
    {
        my @hosts = @_;
        lambda {
            context map { get(
                IO::Socket::INET-> new(
                    PeerAddr => $_,
                    PeerPort => 80
                ), '/index.html') } @hosts;
            tails {
                join("\n\n\n", @_ )
            }
        }
    }

    print get_parallel('www.perl.com', 'www.google.com')-> wait;

See tests and additional examples in directory "eg/" for more information.

API

Events and states

A lambda is an "IO::Lambda" object that waits for I/O and timeout events, and for events generated when other lambdas are completed. On each such event a callback is executed. The result of the execution is saved, and passed on to the next callback when the next event arrives.

The life cycle of a lambda goes through three modes: passive, waiting, and stopped. A lambda that is just created, or was later reset with the "reset" call, is in the passive state. When the lambda gets started, the only executed code will be the callback associated with the lambda:

    $q = lambda { print "hello world!\n" };
    # not printed anything yet
    $q-> wait; # <- here it will

Lambdas are usually not started explicitly. Usually, the function that can wait for a lambda starts it too. "wait", the synchronous waiter, and "tail"/"tails", the asynchronous ones, start passive lambdas when called. A lambda is finished when there are no more events to listen to. The lambda in the example above will finish right after the "print" statement.
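
The three modes can be observed from the outside. The sketch below assumes the state predicates "is_passive" and "is_stopped" from the module's object API, which this section does not spell out, so treat those two names as assumptions and check them against your installed version; everything else is taken from the text above.

```perl
use strict;
use warnings;
use IO::Lambda qw(:lambda);

my $q = lambda {
    context 0.05;
    timeout { 'done' }
};

# freshly created: nothing has been executed yet
print "passive before start\n" if $q-> is_passive;

# wait() starts the lambda and blocks until it has no more events
my $result = $q-> wait;
print "stopped, result: $result\n" if $q-> is_stopped;

# reset() puts the lambda back into the passive state
$q-> reset;
print "passive after reset\n" if $q-> is_passive;
```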

A lambda can listen to events by calling conditions, that internally subscribe the lambda object to the corresponding file handles, timers, and other lambdas. Most of the expressive power of "IO::Lambda" lies in the conditions, such as "readable", "writable", "timeout". Conditions differ from normal perl subroutines in the way they receive their parameters. The only parameter they receive in the normal way is the associated callback, while all other parameters are passed through the alternate stack, by the explicit "context" call.

In the example below, the lambda watches for file handle readability:

    $q = lambda {
        context \*SOCKET;
        readable { print "I'm readable!\n"; }
        # here is nothing printed yet
    };
    # and here is nothing printed yet

Such a lambda, when started, will switch to the waiting state, which means that it will be waiting for the socket. The lambda will finish only after the callback associated with the "readable" condition is called.

Of course, new event listeners can be created inside all callbacks, on each state. This fact constitutes another large benefit of "IO::Lambda", as it allows FSMs to be programmed dynamically.

The new event listeners can be created either by explicitly calling a condition, or by restarting the last condition with the "again" call. For example, the code

    readable {
        print 1;
        again if int rand(2)
    }

prints an indeterminate number of ones.

Contexts

All callbacks associated with a lambda object (further on, merely lambda) execute in one, private context, also associated with the lambda. The context here means that all conditions register callbacks on an implicitly given lambda object, and keep the passed parameters on the context stack.
The fact that the context is preserved between states helps building terser code with series of IO calls:

    context \*SOCKET;
    writable {
    readable {
    }}

is actually the shorter form for

    context \*SOCKET;
    writable {
    context \*SOCKET; # <-- context here is retained from one frame up
    readable {
    }}

And as the context is bound to the current closure, the current lambda object is too, in the "this" property. The code above is actually

    my $self = this;
    context \*SOCKET;
    writable {
    this $self;      # <-- object reference is retained here
    context \*SOCKET;
    readable {
    }}

"this" can be used if more than one lambda needs to be accessed. In which case,

    this $object;
    context @context;

is the same as

    this $object, @context;

which means that explicitly setting "this" will always clear the context.

Data and execution flow

A lambda is initially called with some arguments passed from the outside. These arguments can be stored using the "call" method; "wait" and "tail" also issue "call" internally, thus replacing any previous data stored by "call". Inside the lambda these arguments are available as @_.

Whatever is returned by a condition callback (including the "lambda" condition itself) will be passed further on as @_ to the next callback, or to the outside, if the lambda is finished. The result of the finished lambda is available through the "peek" method, which returns the whole list of results in list context, or the first item of the list otherwise. "wait" returns the same data as "peek" does.

When more than one lambda watches for another lambda, the latter will get its last callback results passed to all the watchers. However, when a lambda creates more than one state that derive from the current state, a forking behaviour of sorts, the latest stored results get overwritten by the first executed callback, so constructions such as

    readable { 1 + shift };
    writable { 2 + shift };
    ...
    wait(0)

will eventually return 3, but whether it will be 1+2 or 2+1 is undefined.

"wait" is not the only function that synchronises input and output data. The "wait_for_all" method waits for all lambdas, including the caller, to finish. It returns collected results of all the objects in a single list. The "wait_for_any" method waits for at least one lambda, from the list of passed lambdas (again, including the caller), to finish. It returns the list of finished objects as soon as possible.

Time

Timers and I/O timeouts can be given not only as timeout values, as is usual in event libraries, but also as deadlines in (fractional) seconds since epoch. This decision, strange at first sight, actually helps a lot when a total execution time is to be tracked. For example, the following code reads as many bytes as possible from a socket within 5 seconds:

    lambda {
        my $buf = '';
        context $socket, time + 5;
        readable {
            if ( shift ) {
                return again if sysread $socket, $buf, 1024, length($buf);
            } else {
                print "oops! a timeout\n";
            }
            $buf;
        }
    };

Rewriting the same code with "readable" semantics that accepts time as a timeout instead would be not that elegant:

    lambda {
        my $buf = '';
        my $time_left = 5;
        my $now = time;
        context $socket, $time_left;
        readable {
            if ( shift ) {
                if (sysread $socket, $buf, 1024, length($buf)) {
                    $time_left -= (time - $now);
                    $now = time;
                    context $socket, $time_left;
                    return again;
                }
            } else {
                print "oops! a timeout\n";
            }
            $buf;
        }
    };

However, the exact opposite is true for "timeout". The following two lines both sleep 5 seconds:

    lambda { context 5;        timeout {} }
    lambda { context time + 5; timeout {} }

Internally, timers use "Time::HiRes::time" that gives the fractional number of seconds. This however is not required for the caller, because when high-res timers are not used, timeouts will simply be less precise, and will jitter plus-minus half a second.

Conditions

All conditions receive their parameters from the context stack, or simply the context. The only parameter passed to them by using a perl call is the callback itself.
Conditions can also be called without a callback, in which case they will pass further the data that otherwise would be passed as @_ to the callback. Thus, a condition can be called either as

    readable { .. code ... }

or

    &readable(); # no callback
    &readable;   # DANGEROUS!! same as &readable(@_)

Conditions can either be used after explicit exporting

    use IO::Lambda qw(:lambda);
    lambda { ... }

or by using the package syntax,

    use IO::Lambda;
    IO::Lambda::lambda { ... };

Note: If you know the concept of continuation-passing style, this is exactly how conditions work, except that closures are used instead of continuations (Brock Wilcox: thanks!).
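
To make the parameter passing concrete, here is a small sketch in which "readable" receives both its handle and its deadline through "context", and only the callback through the perl call. The pipe and the 0.2 second deadline are illustrative assumptions; nothing is ever written to the pipe, so the callback is expected to be invoked with a false argument once the deadline passes, as described in the "Time" section.

```perl
use strict;
use warnings;
use Time::HiRes qw(time);
use IO::Lambda qw(:lambda);

# a pipe nobody writes to: the read end never becomes readable
pipe( my $reader, my $writer) or die "pipe: $!";

my $q = lambda {
    # parameters travel through the context: handle, then absolute deadline
    context $reader, time + 0.2;
    readable {
        my $ready = shift;            # true if readable, false on timeout
        return $ready ? 'readable' : 'timeout';
    }
};

my $status = $q-> wait;
print "$status\n";
```

Passing the deadline as an absolute time keeps the example consistent with the socket reader shown in the "Time" section.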

Stream IO

The whole point of this module is to help building protocols of arbitrary complexity in a clear, consistent programming style. Consider how perl's low-level "sysread" and "syswrite" relate to its higher-level "readline", where the latter not only does the buffering, but also recognizes $/ as input record separator. The section above described lower-level lambda I/O conditions, that are only useful for "sysread" and "syswrite". This section tells about higher-level lambdas that relate to these low-level ones, as the aforementioned "readline" relates to "sysread".

All functions in this section return the lambda that does the actual work. Just as a class constructor returns a newly created class instance, these functions return newly created lambdas. Such functions will be further referred to as lambda constructors, or simply constructors. Therefore, constructors are documented here as having two inputs and one output. For example, "sysreader" is a function that takes 0 parameters and always returns a new lambda, and this lambda, in turn, takes four parameters and returns two. This constructor will be described as

    # sysreader() :: ($fh,$$buf,$length,$deadline) -> ($result,$error)

Since all stream I/O lambdas return the same set of scalars, the return type will be further on referred to as "ioresult":

    # ioresult    :: ($result, $error)
    # sysreader() :: ($fh,$$buf,$length,$deadline) -> ioresult

The first scalar of "ioresult" is defined on success, and is not otherwise. In the latter case, the second scalar contains the error, usually either $! or 'timeout' (if $deadline was set).

Before describing the actual functions, consider the code that may benefit from using them.
Let's take a lambda that needs to implement a very simple HTTP/0.9 request:

    lambda {
        my $handle = shift;
        my $buf = '';
        context getline, $handle, \$buf;
        tail {
            my $req = shift;
            die "bad request" unless $req =~ m[GET (.*)$]i;
            do_request($handle, $1);
        }
    }

"getline" reads from $handle to $buf, and wakes up when a new line is there. However, what if we need, for example, HTTPS instead of HTTP, where reading from a socket may involve some writing, and of course some waiting? Then the first default parameter to "getline" has to be replaced. By default,

    context getline, $handle, \$buf;

is the same as

    my $reader = sysreader;
    context getline($reader), $handle, \$buf;

where "sysreader" creates a lambda $reader, that given $handle, waits until it becomes readable, and reads from it. "getline", in turn, repeatedly calls $reader, until the whole line is read.

Thus, we call

    context getline(https_reader), $handle, \$buf;

instead, where "https_reader" should conform to the "sysreader" signature:

    sub https_reader
    {
        lambda {
            my ( $fh, $buf, $length, $deadline) = @_;
            # read from SSL socket
            return $error ? (undef, $error) : $data;
        }
    }

I'm not showing the actual implementation of a HTTPS reader (if you're curious, look at IO::Lambda::HTTP::HTTPS), but the idea is that inside that reader, it is perfectly fine to do any number of read and write operations, and wait for their completion too, as long as the upper-level lambda sooner or later gets the data. "getline" (or, rather, "readbuf" that "getline" is based on) won't care about internal states of the reader.

Check out t/06_stream.t that emulates reading and writing implemented in this fashion.

These functions are imported with

    use IO::Lambda qw(:stream);
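
The following sketch shows "getline" with its default reader, draining a local pipe line by line. It follows the "my_reader" pattern from the "Simple use" section; the pipe, the sample data, and the @lines array are assumptions made for illustration. The code does not depend on the exact error value reported at end of file, it only checks whether an error was reported at all.

```perl
use strict;
use warnings;
use IO::Handle;
use IO::Lambda qw(:lambda :stream);

# illustrative data source: a pipe pre-filled with two lines
pipe( my $in, my $out) or die "pipe: $!";
print $out "first\nsecond\n";
close $out;                       # flushes the data; the reader then sees EOF
$in-> blocking(0);

my @lines;
lambda {
    context getline, $in, \( my $buf = '');
    tail {
        my ( $line, $error) = @_;
        return if defined $error; # EOF or a read error ends the loop
        push @lines, $line;
        again;                    # ask getline for the next line
    }
}-> wait;

for my $line (@lines) {
    chomp( my $text = $line);     # tolerate lines with or without a newline
    print "got: $text\n";
}
```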

Higher-order functions

Functions described in this section justify the lambda in "IO::Lambda". Named deliberately after the classic function names, they provide a similar interface.

These functions are imported with

    use IO::Lambda qw(:func);
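
As a sketch of how these functions compose, the example below reuses the "par", "mapcar" and "curry" calls from the synopsis, but replaces the HTTP requests with timers so that it runs without a network. The helper "delayed" is hypothetical and exists only for this illustration. The parallel results are sorted before printing because this text does not state the order in which "par" returns them.

```perl
use strict;
use warnings;
use IO::Lambda qw(:lambda :func);

# hypothetical helper: a lambda that yields $value after $delay seconds
sub delayed
{
    my ( $delay, $value) = @_;
    lambda {
        context $delay;
        timeout { $value }
    }
}

# run four timers, but never more than two at the same time
my @parallel = par(2)-> wait( map { delayed( 0.05, $_) } 1 .. 4 );

# run three timers strictly one after another
my @sequential = mapcar( curry { delayed( 0.01, shift) })-> wait( 1 .. 3 );

print "parallel:   @{[ sort { $a <=> $b } @parallel ]}\n";
print "sequential: @sequential\n";
```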

Object API

This section lists methods of the "IO::Lambda" class. Note that by design all lambda-style functionality is also available for object-style programming. Together with the fact that lambda syntax is not exported by default, it thus leaves a place for possible implementations of user-defined syntax, either with or without lambdas, on top of the object API, without accessing the internals.

The object API is mostly targeted to developers that need to connect third-party asynchronous event libraries with the lambda interface.

Exceptions and backtrace

In addition to the normal call stack as reported by the "caller" builtin, it can be useful also to access execution information of the thread of events, when a lambda waits for another, which in turn waits for another, etc. The following functions deal with backtrace information and exceptions, that propagate through the thread of events.

MISCELLANEOUS

Included modules

Debugging

Various sub-modules can be controlled with the single environment variable, "IO_LAMBDA_DEBUG", which is treated as a comma-separated list of modules. For example,

    env IO_LAMBDA_DEBUG=io=2,http perl script.pl

displays I/O debug messages from "IO::Lambda" (with extra verbosity) and from "IO::Lambda::HTTP". "IO::Lambda" responds to the following keys:
Keys recognized for the other modules: select, dbi, http, https, signal, message, thread, fork, poll, flock.

Online information

Project homepage: <http://iolambda.karasik.eu.org/>

Mailing list: io-lambda-general at lists.sourceforge.net, thanks to sourceforge. Subscribe by visiting <https://lists.sourceforge.net/lists/listinfo/io-lambda-general>.

Benchmarks

Apologetics

There are many async libraries readily available from CPAN. "IO::Lambda" is yet another one. How is it different from the existing tools? Why use it? To answer these questions, I need to show the evolution of async libraries, to explain how they grew from simple tools to complex frameworks.

First, all async libraries are based on OS-level syscalls, like "select", "poll", "epoll", "kqueue", and "Win32::WaitForMultipleObjects". The first layer of async libraries provides access to exactly these facilities: there are "IO::Select", "IO::Epoll", "IO::Kqueue" etc. I won't go deeper into describing the pros and cons of programming on this level, this should be obvious.

Perl modules of the next abstraction layer are often characterised by portability and event loops. While the modules of the first layer are seldom portable, and have no event loops, the second layer modules strive to be OS-independent, and use callbacks to ease the otherwise convoluted ways async I/O would be programmed. These modules mostly populate the "asynchronous input-output programming frameworks" niche in the perl world. The examples are many: "IO::Events", "EV", "AnyEvent", "IO::NonBlocking", "IO::Multiplex", to name a few.

Finally, there's the third layer of complexity, which, before "IO::Lambda", had a single representative: "POE" (now, to the best of my knowledge, "IO::Async" also partially falls in this category). Modules of the third layer are based on concepts from the second, but introduce a powerful tool to help the programming of complex protocols, something that isn't available in the second layer modules: finite state machines (FSMs). The FSMs reduce programming complexity, for example, of intricate network protocols, that are best modelled as a set of states in a logical circuit.
Also, the third layer modules are agnostic of the event loop module: the programmer is (almost) free to choose the event loop backend, such as native "select", "Gtk", "EV", "Prima", or "AnyEvent", depending on the nature of the task.

"IO::Lambda" allows the programmer to build protocols of arbitrary complexity, and is also based on event loops, callbacks, and is portable. It differs from "POE" in the way the FSMs are declared. Where "POE" requires an explicit switch from one state to another, using for example the "post" or "yield" commands, "IO::Lambda" incorporates the switching directly into the program syntax. Consider the "POE" code:

    POE::Session-> create(
        inline_states => {
            state1 => sub {
                print "state1\n";
                $_[ KERNEL]-> yield("state2");
            },
            state2 => sub {
                print "state2\n";
            },
        });

and the corresponding "IO::Lambda" code (state1 and state2 are conditions, they need to be declared separately):

    lambda {
        state1 {
            print "state1\n";
        state2 {
            print "state2\n";
        }}
    }

In "IO::Lambda", the programming style is (deliberately) not much different from the declarative

    print "state1\n";
    print "state2\n";

as much as the nature of asynchronous programming allows that.

To sum up, the intended use of "IO::Lambda" is for areas where simple callback-based libraries require lots of additional work, and where state machines are beneficial. Complex protocols like HTTP, parallel execution of several tasks, strict control of task and protocol hierarchy - this is the domain where "IO::Lambda" works best.

LICENSE AND COPYRIGHT

This work is partially sponsored by capmon ApS.

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.

AUTHOR

Dmitry Karasik, <dmitry@karasik.eu.org>.

I wish to thank those who helped me:

Ben Tilly for providing thorough comments to the code in the synopsis, bringing up various important issues, valuable discussions, for his patience and dedicated collaboration.

David A. Golden for discussions about names, and his propositions to rename some terms into more appropriate ones, such as "read" to "readable", and "predicate" to "condition".

Rocco Caputo for optimizing the POE benchmark script.

Randal L. Schwartz, Brock Wilcox, and zby@perlmonks helped me to understand how the documentation for the module could be made better.

All the good people on perlmonks.org and perl conferences, who invested their time into understanding the module.