NAME
    Kafka - Apache Kafka interface for Perl.

VERSION
    This documentation refers to "Kafka" package version 0.8010.

SYNOPSIS
        use 5.010;
        use strict;
        use warnings;

        use Scalar::Util qw( blessed );
        use Try::Tiny;

        use Kafka qw( $BITS64 );
        use Kafka::Connection;
        use Kafka::Producer;
        use Kafka::Consumer;

        # A simple example of Kafka usage

        # common information
        say 'This is Kafka package ', $Kafka::VERSION;
        say 'You have a ', $BITS64 ? '64' : '32', ' bit system';

        my ( $connection, $producer, $consumer );
        try {
            #-- Connect to local cluster
            $connection = Kafka::Connection->new( host => 'localhost' );

            #-- Producer
            $producer = Kafka::Producer->new( Connection => $connection );

            #-- Consumer
            $consumer = Kafka::Consumer->new( Connection => $connection );
        } catch {
            if ( blessed( $_ ) && $_->isa( 'Kafka::Exception' ) ) {
                warn 'Error: (', $_->code, ') ', $_->message, "\n";
                exit;
            } else {
                die $_;
            }
        };

        # cleaning up
        undef $consumer;
        undef $producer;
        undef $connection;

    Another brief code example of the Kafka package is provided in the "An
    Example" section.

ABSTRACT
    The Kafka package is a set of Perl modules which provides a simple and
    consistent application programming interface (API) to Apache Kafka 0.8,
    a high-throughput distributed messaging system.

DESCRIPTION
    The user modules in this package provide an object-oriented API. The IO
    agents, requests sent, and responses received from the Apache Kafka or
    mock servers are all represented by objects. This makes for a simple and
    powerful interface to these services.

    The main features of the package are:
APACHE KAFKA'S STYLE COMMUNICATION
    The Kafka package is based on Kafka's 0.8 Protocol specification
    document at
    <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol>.
The Connection Object
    Clients use the Connection object to communicate with the Apache Kafka
    cluster. The Connection object is an interface layer between your
    application code and the Apache Kafka cluster.

    A Connection object is required to create instances of the
    Kafka::Producer and Kafka::Consumer classes. The Kafka Connection API is
    implemented by the Kafka::Connection class.

        use Kafka::Connection;

        # connect to local cluster with the defaults
        my $connection = Kafka::Connection->new( host => 'localhost' );

    The main attributes of the Connection object are:
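The host-only constructor above falls back to defaults for everything else. A sketch of a more explicit constructor call follows; the port and timeout attributes and the $KAFKA_SERVER_PORT and $REQUEST_TIMEOUT constants appear elsewhere in this document, while the broker_list attribute is an assumption borrowed from later releases of Kafka::Connection and should be checked against that module's documentation:

```perl
use Kafka qw( $KAFKA_SERVER_PORT $REQUEST_TIMEOUT );
use Kafka::Connection;

# Connect with explicit values instead of the defaults.
my $connection = Kafka::Connection->new(
    host        => 'localhost',
    port        => $KAFKA_SERVER_PORT,    # default Kafka broker port
    timeout     => $REQUEST_TIMEOUT,      # seconds to wait for a response
    broker_list => [ 'otherhost:9092' ],  # extra bootstrap brokers (assumed attribute)
);
```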
The IO Object
    The Kafka::Connection object uses the internal class Kafka::IO to
    maintain communication with a particular server of the Kafka cluster.
    The IO object is an interface layer between the Kafka::Connection object
    and the network.

    The Kafka IO API is implemented by the Kafka::IO class. Note that end
    users normally should not need to use Kafka::IO directly, but should
    work with Kafka::Connection instead.

        use Kafka::IO;

        # connect to local server with the defaults
        my $io = Kafka::IO->new( host => 'localhost' );

    The main attributes of the IO object are:
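As noted under "BUGS AND LIMITATIONS" below, the constructor's timeout attribute controls the internal use of "alarm()", and passing "timeout => undef" disables it. A minimal sketch of both forms, assuming timeout accepts a value in seconds:

```perl
use Kafka::IO;

# An explicit timeout in seconds; alarm() is used internally to enforce it.
my $io = Kafka::IO->new(
    host    => 'localhost',
    timeout => 1.5,
);

# Disable the internal use of alarm() entirely
# (per "BUGS AND LIMITATIONS" in this document).
my $io_no_alarm = Kafka::IO->new(
    host    => 'localhost',
    timeout => undef,
);
```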
The Producer Object
    The Kafka producer API is implemented by the Kafka::Producer class.

        use Kafka::Producer;

        #-- Producer
        my $producer = Kafka::Producer->new( Connection => $connection );

        # Sending a single message
        $producer->send(
            'mytopic',          # topic
            0,                  # partition
            'Single message'    # message
        );

        # Sending a series of messages
        $producer->send(
            'mytopic',          # topic
            0,                  # partition
            [                   # messages
                'The first message',
                'The second message',
                'The third message',
            ]
        );

    The main methods and attributes of the producer request are:
The Consumer Object
    The Kafka consumer API is implemented by the Kafka::Consumer class.

        use Kafka::Consumer;

        $consumer = Kafka::Consumer->new( Connection => $connection );

    The request methods of the consumer object are "offsets()" and
    "fetch()". The "offsets" method returns a reference to a list of offsets
    of received messages. The "fetch" method returns a reference to a list
    of received Kafka::Message objects.

        use Kafka qw(
            $DEFAULT_MAX_BYTES
            $DEFAULT_MAX_NUMBER_OF_OFFSETS
            $RECEIVE_EARLIEST_OFFSETS
        );

        # Get a list of valid offsets up to max_number before the given time
        my $offsets = $consumer->offsets(
            'mytopic',                      # topic
            0,                              # partition
            $RECEIVE_EARLIEST_OFFSETS,      # time
            $DEFAULT_MAX_NUMBER_OF_OFFSETS  # max_number
        );
        say "Received offset: $_" foreach @$offsets;

        # Consuming messages
        my $messages = $consumer->fetch(
            'mytopic',          # topic
            0,                  # partition
            0,                  # offset
            $DEFAULT_MAX_BYTES  # Maximum size of MESSAGE(s) to receive
        );
        foreach my $message ( @$messages ) {
            if ( $message->valid ) {
                say 'payload    : ', $message->payload;
                say 'key        : ', $message->key;
                say 'offset     : ', $message->offset;
                say 'next_offset: ', $message->next_offset;
            } else {
                say 'error      : ', $message->error;
            }
        }

    See Kafka::Consumer for additional information and documentation about
    class methods and arguments.

The Message Object
    The Kafka message API is implemented by the Kafka::Message class.

        if ( $message->valid ) {
            say 'payload    : ', $message->payload;
            say 'key        : ', $message->key;
            say 'offset     : ', $message->offset;
            say 'next_offset: ', $message->next_offset;
        } else {
            say 'error      : ', $message->error;
        }

    Methods available for the Kafka::Message object:
The Exception Object
    A designated class, "Kafka::Exception", is used to provide more detailed
    and structured information when an error is detected.

    The following attributes are declared within "Kafka::Exception": code,
    message.

    Additional subclasses of "Kafka::Exception" are designed to report
    errors in the respective Kafka classes:
    "Kafka::Exception::Connection", "Kafka::Exception::Consumer",
    "Kafka::Exception::IO", "Kafka::Exception::Int64",
    "Kafka::Exception::Producer".

    The authors suggest using Try::Tiny's "try" and "catch" to handle
    exceptions while working with the Kafka module.

EXPORT
    None by default.

  Additional constants
    Additional constants are available for import. They can be used to
    define some types of parameters and to identify various error cases.
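The subclasses let a caller tell where an error originated. A minimal sketch with Try::Tiny, reusing $producer and the send() arguments from the producer example above:

```perl
use Scalar::Util qw( blessed );
use Try::Tiny;

try {
    $producer->send( 'mytopic', 0, 'Single message' );
} catch {
    my $error = $_;
    if ( blessed( $error ) && $error->isa( 'Kafka::Exception::Producer' ) ) {
        # error raised inside Kafka::Producer
        warn 'Producer error: (', $error->code, ') ', $error->message, "\n";
    } elsif ( blessed( $error ) && $error->isa( 'Kafka::Exception::Connection' ) ) {
        # error raised inside Kafka::Connection
        warn 'Connection error: (', $error->code, ') ', $error->message, "\n";
    } else {
        die $error;    # not a Kafka exception - rethrow
    }
};
```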
  Compression
    According to the Apache Kafka documentation, Kafka currently supports
    two compression codecs, with the following codec numbers:
  Error codes
    Possible error codes (matching the hash of error descriptions %ERROR):
    Contains the descriptions of possible error codes obtained via the
    ERROR_CODE box of the Apache Kafka Wire Format protocol response.
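A sketch of looking up the human-readable description for an error code; this assumes %ERROR and the error-code constant $ERROR_UNKNOWN_TOPIC_OR_PARTITION are importable from the Kafka module, which should be verified against its export list:

```perl
use 5.010;
use Kafka qw( %ERROR $ERROR_UNKNOWN_TOPIC_OR_PARTITION );

# Map an error code from a protocol response to its description.
# $ERROR_UNKNOWN_TOPIC_OR_PARTITION is assumed to be one of the
# exported error-code constants.
say $ERROR{ $ERROR_UNKNOWN_TOPIC_OR_PARTITION };
```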
An Example
        use 5.010;
        use strict;
        use warnings;

        use Scalar::Util qw( blessed );
        use Try::Tiny;

        use Kafka qw(
            $KAFKA_SERVER_PORT
            $REQUEST_TIMEOUT
            $RECEIVE_EARLIEST_OFFSETS
            $DEFAULT_MAX_NUMBER_OF_OFFSETS
            $DEFAULT_MAX_BYTES
        );
        use Kafka::Connection;
        use Kafka::Producer;
        use Kafka::Consumer;

        my ( $connection, $producer, $consumer );
        try {

            #-- Connection
            $connection = Kafka::Connection->new( host => 'localhost' );

            #-- Producer
            $producer = Kafka::Producer->new( Connection => $connection );

            # Sending a single message
            $producer->send(
                'mytopic',          # topic
                0,                  # partition
                'Single message'    # message
            );

            # Sending a series of messages
            $producer->send(
                'mytopic',          # topic
                0,                  # partition
                [                   # messages
                    'The first message',
                    'The second message',
                    'The third message',
                ]
            );

            #-- Consumer
            $consumer = Kafka::Consumer->new( Connection => $connection );

            # Get a list of valid offsets up to max_number before the given time
            my $offsets = $consumer->offsets(
                'mytopic',                      # topic
                0,                              # partition
                $RECEIVE_EARLIEST_OFFSETS,      # time
                $DEFAULT_MAX_NUMBER_OF_OFFSETS  # max_number
            );
            if ( @$offsets ) {
                say "Received offset: $_" foreach @$offsets;
            } else {
                warn "Error: Offsets are not received\n";
            }

            # Consuming messages
            my $messages = $consumer->fetch(
                'mytopic',          # topic
                0,                  # partition
                0,                  # offset
                $DEFAULT_MAX_BYTES  # Maximum size of MESSAGE(s) to receive
            );
            if ( $messages ) {
                foreach my $message ( @$messages ) {
                    if ( $message->valid ) {
                        say 'payload    : ', $message->payload;
                        say 'key        : ', $message->key;
                        say 'offset     : ', $message->offset;
                        say 'next_offset: ', $message->next_offset;
                    } else {
                        say 'error      : ', $message->error;
                    }
                }
            }
        } catch {
            if ( blessed( $_ ) && $_->isa( 'Kafka::Exception' ) ) {
                warn 'Error: (', $_->code, ') ', $_->message, "\n";
                exit;
            } else {
                die $_;
            }
        };

        # Closes and cleans up
        undef $consumer;
        undef $producer;
        undef $connection;

DEPENDENCIES
    In order to install and use this package you will need Perl version 5.10
    or later.
    Some modules within this package depend on other packages that are
    distributed separately from Perl. We recommend that you have the
    following packages installed before you install Kafka:

        Compress::Snappy
        Const::Fast
        Data::Compare
        Data::HexDump::Range
        Exception::Class
        List::MoreUtils
        Params::Util
        Scalar::Util::Numeric
        String::CRC32
        Sys::SigAction
        Try::Tiny

    The Kafka package has the following optional dependencies:

        Capture::Tiny
        Clone
        Config::IniFiles
        File::HomeDir
        Proc::Daemon
        Proc::ProcessTable
        Sub::Install
        Test::Deep
        Test::Exception
        Test::NoWarnings
        Test::TCP

    If the optional modules are missing, some "prereq" tests are skipped.

DIAGNOSTICS
    Debug output can be enabled by setting the level via one of the
    following environment variables:

        PERL_KAFKA_DEBUG=1             - debug is enabled for the whole
                                         Kafka package.
        PERL_KAFKA_DEBUG=IO:1          - enable debug for Kafka::IO only.
        PERL_KAFKA_DEBUG=Connection:1  - enable debug for Kafka::Connection
                                         only.

    It is possible to set different debug levels for different modules, as
    in the following example:

        PERL_KAFKA_DEBUG=Connection:1,IO:2

    See the documentation for a particular module for an explanation of the
    various debug levels.

BUGS AND LIMITATIONS
    Producer and Consumer methods only work with one topic and one partition
    at a time. Also, the module does not implement the Offset Commit/Fetch
    API.

    The Producer's, Consumer's, and Connection's string arguments must be
    binary strings. Using Unicode strings may cause an error or data
    corruption.

    This module does not support Kafka protocol versions earlier than 0.8.

    "Kafka::IO->new" uses Sys::SigAction and "alarm()" to limit some
    internal operations. This means that if an external "alarm()" was set,
    signal delivery may be delayed. With a non-empty timeout, we use
    "alarm()" internally in Kafka::IO and try to preserve an existing
    "alarm()" if possible. However, if Time::HiRes::ualarm() is set before
    calling Kafka modules, its behaviour is unspecified (i.e. it could be
    reset or preserved, etc.).
    For "gethostbyname" operations a non-empty timeout is rounded up to the
    nearest positive integer; any timeout of less than 1 second is rounded
    to 1 second. You can disable the use of "alarm()" by setting
    "timeout => undef" in the constructor.

    The Kafka package was written, tested, and found working on recent Linux
    distributions.

    There are no known bugs in this package. Please report problems to the
    "AUTHOR". Patches are welcome.

MORE DOCUMENTATION
    All modules contain detailed information on the interfaces they provide.

SEE ALSO
    The basic operation of the Kafka package modules:

    Kafka - constants and messages used by the Kafka package modules.
    Kafka::Connection - interface to connect to a Kafka cluster.
    Kafka::Producer - interface for the producing client.
    Kafka::Consumer - interface for the consuming client.
    Kafka::Message - interface to access Kafka message properties.
    Kafka::Int64 - functions to work with 64-bit elements of the protocol
        on 32-bit systems.
    Kafka::Protocol - functions to process messages in the Apache Kafka
        protocol.
    Kafka::IO - low-level interface for communication with the Kafka server.
    Kafka::Exceptions - module designated to handle Kafka exceptions.
    Kafka::Internals - internal constants and functions used by several
        package modules.

    A wealth of detail about Apache Kafka and the Kafka Protocol:

    Main page at <http://kafka.apache.org/>
    Kafka Protocol at
    <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol>

SOURCE CODE
    The Kafka package is hosted on GitHub:
    <https://github.com/TrackingSoft/Kafka>

AUTHOR
    Sergey Gladkov, <sgladkov@trackingsoft.com>

CONTRIBUTORS
    Alexander Solovey
    Jeremy Jordan
    Sergiy Zuban
    Vlad Marchenko

COPYRIGHT AND LICENSE
    Copyright (C) 2012-2013 by TrackingSoft LLC.

    This package is free software; you can redistribute it and/or modify it
    under the same terms as Perl itself. See perlartistic at
    <http://dev.perl.org/licenses/artistic.html>.
    This program is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.