|
NAMEKafka::Consumer - Perl interface for Kafka consumer client.VERSIONThis documentation refers to "Kafka::Consumer" version 0.8010 .SYNOPSISuse 5.010; use strict; use warnings; use Scalar::Util qw( blessed ); use Try::Tiny; use Kafka qw( $DEFAULT_MAX_BYTES $DEFAULT_MAX_NUMBER_OF_OFFSETS $RECEIVE_EARLIEST_OFFSETS ); use Kafka::Connection; use Kafka::Consumer; my ( $connection, $consumer ); try { #-- Connection $connection = Kafka::Connection->new( host => 'localhost' ); #-- Consumer $consumer = Kafka::Consumer->new( Connection => $connection ); # Get a list of valid offsets up max_number before the given time my $offsets = $consumer->offsets( 'mytopic', # topic 0, # partition $RECEIVE_EARLIEST_OFFSETS, # time $DEFAULT_MAX_NUMBER_OF_OFFSETS # max_number ); if( @$offsets ) { say "Received offset: $_" foreach @$offsets; } else { warn "Error: Offsets are not received\n"; } # Consuming messages my $messages = $consumer->fetch( 'mytopic', # topic 0, # partition 0, # offset $DEFAULT_MAX_BYTES # Maximum size of MESSAGE(s) to receive ); if ( $messages ) { foreach my $message ( @$messages ) { if( $message->valid ) { say 'payload : ', $message->payload; say 'key : ', $message->key; say 'offset : ', $message->offset; say 'next_offset: ', $message->next_offset; } else { say 'error : ', $message->error; } } } } catch { if ( blessed( $_ ) && $_->isa( 'Kafka::Exception' ) ) { warn 'Error: (', $_->code, ') ', $_->message, "\n"; exit; } else { die $_; } }; # Closes the consumer and cleans up undef $consumer; undef $connection; DESCRIPTIONKafka consumer API is implemented by "Kafka::Consumer" class.The main features of the "Kafka::Consumer" class are:
The Kafka consumer response returns ARRAY references for "offsets" and "fetch" methods. Array returned by "offsets" contains offset integers. Array returned by "fetch" contains objects of Kafka::Message class. CONSTRUCTOR"new"Creates a new consumer client object. Returns the created "Kafka::Consumer" object. "new()" takes arguments in key-value pairs. The following arguments are recognized:
METHODSThe following methods are defined for the "Kafka::Consumer" class:"fetch( $topic, $partition, $start_offset, $max_size )" Get a list of messages to consume one by one up to $max_size bytes. Returns the reference to array of the Kafka::Message objects. "fetch()" takes the following arguments:
"offsets( $topic, $partition, $time, $max_number )" Get a list of valid offsets up to $max_number before the given time. Returns reference to array of the offset integers (Math::BigInt integers on 32 bit system). "offsets()" takes the following arguments:
DIAGNOSTICSWhen error is detected, an exception, represented by object of "Kafka::Exception::Consumer" class, is thrown (see Kafka::Exceptions).code and a more descriptive message provide information about thrown exception. Consult documentation of the Kafka::Exceptions for the list of all available methods. Authors suggest using of Try::Tiny's "try" and "catch" to handle exceptions while working with Kafka package.
SEE ALSOThe basic operation of the Kafka package modules:Kafka - constants and messages used by the Kafka package modules. Kafka::Connection - interface to connect to a Kafka cluster. Kafka::Producer - interface for producing client. Kafka::Consumer - interface for consuming client. Kafka::Message - interface to access Kafka message properties. Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems. Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol. Kafka::IO - low-level interface for communication with Kafka server. Kafka::Exceptions - module designated to handle Kafka exceptions. Kafka::Internals - internal constants and functions used by several package modules. A wealth of detail about the Apache Kafka and the Kafka Protocol: Main page at <http://kafka.apache.org/> Kafka Protocol at <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol> SOURCE CODEKafka package is hosted on GitHub: <https://github.com/TrackingSoft/Kafka>AUTHORSergey Gladkov, <sgladkov@trackingsoft.com>CONTRIBUTORSAlexander SoloveyJeremy Jordan Sergiy Zuban Vlad Marchenko COPYRIGHT AND LICENSECopyright (C) 2012-2013 by TrackingSoft LLC.This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at <http://dev.perl.org/licenses/artistic.html>. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
Visit the GSP FreeBSD Man Page Interface. |