RabbitMQ(3) User Contributed Perl Documentation RabbitMQ(3)

Net::AMQP::RabbitMQ - interact with RabbitMQ over AMQP using librabbitmq

  use Net::AMQP::RabbitMQ;
  my $mq = Net::AMQP::RabbitMQ->new();
  $mq->connect("localhost", { user => "guest", password => "guest" });
  $mq->channel_open(1);
  $mq->queue_declare(1, "queuename");
  $mq->publish(1, "queuename", "Hi there!");
  my $gotten = $mq->get(1, "queuename");
  print $gotten->{body} . "\n";
  $mq->disconnect();

"Net::AMQP::RabbitMQ" provides a simple wrapper around the librabbitmq library that allows connecting, declaring exchanges and queues, binding and unbinding queues, publishing, consuming and receiving events.

Error handling in this module is primarily achieved by "Perl_croak" (die). Wrap calls to these methods in "eval" so that you catch errors appropriately.
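
As a minimal sketch of that advice, the helper below runs a code block inside a block "eval" and reports whether it died. The name "try_amqp" is hypothetical and not part of the module; the sketch relies only on the documented behaviour that methods die on failure.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::AMQP::RabbitMQ): run a code block
# and return (1, undef) on success or (0, $error) if it died.
sub try_amqp {
    my ($code) = @_;
    my $ok = eval { $code->(); 1 };
    return (1, undef) if $ok;
    my $err = $@ || 'unknown error';
    return (0, $err);
}

# Usage, assuming $mq is a connected Net::AMQP::RabbitMQ object:
#   my ($ok, $err) = try_amqp(sub { $mq->channel_open(1) });
#   warn "channel_open failed: $err" unless $ok;
```

Returning the error instead of re-throwing it lets the caller decide whether to retry, reconnect, or give up.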

"cpanm Net::AMQP::RabbitMQ" or "cpan Net::AMQP::RabbitMQ"

Note that the "Net::AMQP::RabbitMQ" module includes the associated librabbitmq C library. Thus there is no need to install this separately beforehand.

All methods, unless specifically stated, return nothing on success and die on failure.

Failure to be connected is a fatal failure for most methods.

Creates a new Net::AMQP::RabbitMQ object.

Connect to RabbitMQ server.

$hostname is the host to which a connection will be attempted.

$options is an optional hash respecting the following keys:

    {
        user            => $user,        #default 'guest'
        password        => $password,    #default 'guest'
        port            => $port,        #default 5672
        vhost           => $vhost,       #default '/'
        channel_max     => $cmax,        #default 0
        frame_max       => $fmax,        #default 131072
        heartbeat       => $heartbeat,   #default 0
        timeout         => $seconds,     #default undef (no timeout)

        ssl             => 1 | 0,        #default 0
        ssl_verify_host => 1 | 0,        #default 1
        ssl_cacert      => $cacert_path, #needed for ssl
        ssl_init        => 1 | 0,        #default 1, initialise the openssl library
        
        ssl_cert        => $cert_path,   #client cert.pem and key.pem when using ssl certificate chains 
        ssl_key         => $key_path     #(with RabbitMQ's fail_if_no_peer_cert = true)
    }

You probably don't want to touch "ssl_init", unless you know what it does.

There is currently no option to disable SSL peer checking, so "ssl_cacert" is required whenever "ssl" is enabled.
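
The sketch below shows one way to assemble the $options hashref for a plain or an SSL connection while enforcing the "ssl_cacert" requirement up front. The helper name "build_connect_options" and the host "mq.example.com" are hypothetical; only keys from the options table above are used, and 5671 is assumed as the conventional SSL port.

```perl
use strict;
use warnings;

# Hypothetical helper: build the $options hashref passed to connect().
sub build_connect_options {
    my (%args) = @_;
    my %opts = (
        user     => $args{user}     // 'guest',
        password => $args{password} // 'guest',
        vhost    => $args{vhost}    // '/',
    );
    if ($args{ssl}) {
        # Fail early: the module needs a CA certificate whenever ssl is on.
        die "ssl_cacert is required when ssl is enabled\n"
            unless defined $args{ssl_cacert};
        $opts{ssl}        = 1;
        $opts{ssl_cacert} = $args{ssl_cacert};
        $opts{port}       = $args{port} // 5671;   # assumed SSL port
    }
    else {
        $opts{port} = $args{port} // 5672;
    }
    return \%opts;
}

# Usage, assuming $mq is a Net::AMQP::RabbitMQ object:
#   $mq->connect("mq.example.com",
#       build_connect_options(ssl => 1, ssl_cacert => "/path/to/cacert.pem"));
```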

SSL NOTE

If the connection is cut while using SSL, OpenSSL will raise a "SIGPIPE". You should catch or ignore this signal, or perl will exit with error code 141:

    $SIG{PIPE} = 'IGNORE';

Disconnect from the RabbitMQ server.

Get a reference to a hash (hashref) of server properties. These may vary, so use "Data::Dumper" to inspect them. Properties are provided for the RabbitMQ server to which you are connected.

Get a reference to a hash (hashref) of client properties. These may vary, so use "Data::Dumper" to inspect them.

Returns true if a valid socket connection appears to exist, false otherwise.

Open an AMQP channel on the connection.

$channel is a positive integer describing the channel you wish to open.

Close the specified channel.

$channel is a positive integer describing the channel you wish to close.

Returns the maximum allowed channel number.

Declare an AMQP exchange on the RabbitMQ server unless it already exists. Bad things will happen if the exchange name already exists and different parameters are provided.

$channel is a channel that has been opened with "channel_open".

$exchange is the name of the exchange to be instantiated.

$options is an optional hash respecting the following keys:

     {
       exchange_type => $type,  #default 'direct'
       passive => $boolean,     #default 0
       durable => $boolean,     #default 0
       auto_delete => $boolean, #default 0
     }

Note that the default for the "auto_delete" option is different for "exchange_declare" and for "queue_declare".

$arguments is an optional hash of additional arguments to the RabbitMQ server, such as:

     {
       # exchange to try if no routes apply on this exchange
       alternate_exchange => 'alternate_exchange_name',
     }
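
A short sketch of a declaration with explicit options follows. The helper name "declare_durable_exchange" is hypothetical, and $mq is assumed to be a connected Net::AMQP::RabbitMQ object with $channel already opened. Passing the same options on every call avoids redeclaring an existing exchange with different parameters.

```perl
use strict;
use warnings;

# Hypothetical helper: declare a durable, non-auto-deleted exchange.
# $type is an AMQP exchange type such as 'direct', 'fanout' or 'topic'.
sub declare_durable_exchange {
    my ($mq, $channel, $exchange, $type) = @_;
    $mq->exchange_declare(
        $channel,
        $exchange,
        {
            exchange_type => $type // 'direct',
            durable       => 1,
            auto_delete   => 0,
        },
    );
    return;
}

# Usage:
#   declare_durable_exchange($mq, 1, 'events.x', 'topic');
```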

Delete an AMQP exchange on the RabbitMQ server.

$channel is a channel that has been opened with "channel_open".

$exchange is the name of the exchange to be deleted.

$options is an optional hash respecting the following keys:

     {
       if_unused => $boolean,   #default 1
     }

Bind a source exchange to a destination exchange with a given routing key and/or parameters.

$channel is a channel that has been opened with "channel_open".

$destination is a previously declared exchange, $source is yet another previously declared exchange, and $routing_key is the routing key that will bind the specified source exchange to the specified destination exchange.

$arguments is an optional hash which will be passed to the server. When binding to an exchange of type "headers", this can be used to only receive messages with the supplied header values.

Remove a binding between source and destination exchanges.

$channel is a channel that has been opened with "channel_open".

$destination is a previously declared exchange, $source is yet another previously declared exchange, and $routing_key is the routing key that will unbind the specified source exchange from the specified destination exchange.

$arguments is an optional hash which will be passed to the server. When binding to an exchange of type "headers", this can be used to only receive messages with the supplied header values.

Declare an AMQP queue on the RabbitMQ server.

In scalar context, this method returns the queuename declared (important for retrieving the auto-generated queuename in the event that one was requested).

In list context, this method returns three items: the queuename, the number of messages waiting on the queue, and the number of consumers bound to the queue.

$channel is a channel that has been opened with "channel_open".

$queuename is the name of the queue to be instantiated. If $queuename is undef or an empty string, an auto-generated queuename will be used.

$options is an optional hash respecting the following keys:

     {
       passive => $boolean,     #default 0
       durable => $boolean,     #default 0
       exclusive => $boolean,   #default 0
       auto_delete => $boolean, #default 1
     }

Note that the default for the "auto_delete" option is different for "exchange_declare" and for "queue_declare".

$arguments is an optional hash which will be passed to the server when the queue is created. This can be used for creating mirrored queues by using the x-ha-policy header.
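
To illustrate the list-context return value, the sketch below inspects an existing queue. The helper name "queue_stats" is hypothetical. It assumes the usual AMQP meaning of "passive" (query an existing queue without creating it, failing if the queue is missing), which this page does not spell out.

```perl
use strict;
use warnings;

# Hypothetical helper: report the depth and consumer count of a queue.
# $mq is assumed to be a connected Net::AMQP::RabbitMQ object.
sub queue_stats {
    my ($mq, $channel, $queuename) = @_;

    # List context yields (queuename, message count, consumer count).
    my ($name, $messages, $consumers) =
        $mq->queue_declare($channel, $queuename, { passive => 1 });

    return {
        name      => $name,
        messages  => $messages,
        consumers => $consumers,
    };
}

# Usage:
#   my $stats = queue_stats($mq, 1, "queuename");
#   print "$stats->{messages} message(s) waiting\n";
```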

Bind the specified queue to the specified exchange with a routing key.

$channel is a channel that has been opened with "channel_open".

$queuename is a previously declared queue, $exchange is a previously declared exchange, and $routing_key is the routing key that will bind the specified queue to the specified exchange.

$arguments is an optional hash which will be passed to the server. When binding to an exchange of type "headers", this can be used to only receive messages with the supplied header values.

Remove a binding between a queue and an exchange. If this fails, you must reopen the channel.

This is like the "queue_bind" with respect to arguments. This command unbinds the queue from the exchange. The $routing_key and $arguments must match the values supplied when the binding was created.

Delete a specified queue. If this fails, you must reopen the channel.

$options is an optional hash respecting the following keys:

     {
       if_unused    => $boolean,     #default 1
       if_empty     => $boolean,     #default 1
     }

Publish a message to an exchange.

$channel is a channel that has been opened with "channel_open".

$routing_key is the name of the routing key for this message.

$body is the payload to enqueue.

$options is an optional hash respecting the following keys:

     {
       exchange => $exchange,                     #default 'amq.direct'
       mandatory => $boolean,                     #default 0
       immediate => $boolean,                     #default 0
       force_utf8_in_header_strings => $boolean,  #default 0
     }

The "force_utf8_in_header_strings" option causes all headers which look like strings to be treated as UTF-8. To keep this a non-breaking change, the option is disabled by default. Headers beginning with "x-", however, are always treated as UTF-8 regardless of this option (per spec).

$props is an optional hash (the AMQP 'props') respecting the following keys:

     {
       content_type => $string,
       content_encoding => $string,
       correlation_id => $string,
       reply_to => $string,
       expiration => $string,
       message_id => $string,
       type => $string,
       user_id => $string,
       app_id => $string,
       delivery_mode => $integer,
       priority => $integer,
       timestamp => $integer,
       headers => $headers # This should be a hashref of keys and values.
     }
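
The sketch below combines $options and $props in a single publish call. The helper name "publish_json" is hypothetical, and $mq is assumed to be a connected object with $channel open. A "delivery_mode" of 2 is the AMQP 0.9.1 value for a persistent message, which only helps if the destination queue is durable.

```perl
use strict;
use warnings;

# Hypothetical helper: publish an already-encoded JSON string as a
# persistent message to a named exchange.
sub publish_json {
    my ($mq, $channel, $exchange, $routing_key, $json_text) = @_;
    $mq->publish(
        $channel,
        $routing_key,
        $json_text,
        { exchange => $exchange },              # $options
        {                                       # $props
            content_type  => 'application/json',
            delivery_mode => 2,                 # persistent
            timestamp     => time,
        },
    );
    return;
}

# Usage:
#   publish_json($mq, 1, 'MyExchange.x', 'foobar', '{"hello":"world"}');
```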

Put the channel into consume mode.

The "consumer_tag" is returned. This command does not return AMQP messages; use the "recv" method for that.

$channel is a channel that has been opened with "channel_open".

$queuename is the name of the queue from which we'd like to consume.

$options is an optional hash respecting the following keys:

     {
       consumer_tag => $tag,    #absent by default
       no_local => $boolean,    #default 0
       no_ack => $boolean,      #default 1
       exclusive => $boolean,   #default 0
     }

Receive AMQP messages.

This method returns a reference to a hash (hashref) containing the following information:

     {
       body => 'Magic Transient Payload', # the reconstructed body
       routing_key => 'nr_test_q',        # route the message took
       exchange => 'nr_test_x',           # exchange used
       delivery_tag => 1,                 # (used for acks)
       redelivered => $boolean,           # if message is redelivered
       consumer_tag => 'c_tag',           # tag from consume()
       props => $props,                   # hashref sent in
     }

$props is the hash sent by publish() respecting the following keys:

     {
       content_type => $string,
       content_encoding => $string,
       correlation_id => $string,
       reply_to => $string,
       expiration => $string,
       message_id => $string,
       type => $string,
       user_id => $string,
       app_id => $string,
       delivery_mode => $integer,
       priority => $integer,
       timestamp => $integer,
     }

$timeout is an integer specifying the number of milliseconds to wait for a message. If you do not provide a timeout (or set it to 0), this call blocks until it receives a message. If you set it to -1, it returns immediately (waiting 0 ms).

If you provide a timeout, then the "recv" method returns "undef" if the timeout expires before a message is received from the server.

Take the channel out of consume mode previously enabled with "consume".

This method returns true or false indicating whether we got the expected "cancel-ok" response from the server.

$channel is a channel that has been opened with "channel_open".

$consumer_tag is a tag previously passed to "consume()" or one that was generated automatically as a result of calling "consume()" without an explicit tag.
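
The consume, receive, and cancel steps can be sketched together as follows. The helper name "collect_messages" is hypothetical, the cancel method is assumed to be named "cancel", and $mq is assumed to be a connected object with $channel open. A positive $timeout_ms is expected, since 0 would block indefinitely.

```perl
use strict;
use warnings;

# Hypothetical helper: read up to $max message bodies from a queue,
# stopping early when recv() times out, then leave consume mode.
sub collect_messages {
    my ($mq, $channel, $queuename, $timeout_ms, $max) = @_;

    my $tag = $mq->consume($channel, $queuename);
    my @bodies;
    while (@bodies < $max) {
        my $message = $mq->recv($timeout_ms);
        last unless defined $message;   # timeout expired, nothing received
        push @bodies, $message->{body};
    }
    $mq->cancel($channel, $tag);
    return \@bodies;
}

# Usage:
#   my $bodies = collect_messages($mq, 1, "queuename", 500, 10);
#   print "$_\n" for @$bodies;
```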

Get a message from the specified queue (via "amqp_basic_get()").

The method returns "undef" immediately if no messages are available on the queue. If a message is available a reference to a hash (hashref) is returned with the following contents:

    {
      body => 'Magic Transient Payload', # the reconstructed body
      routing_key => 'nr_test_q',        # route the message took
      exchange => 'nr_test_x',           # exchange used
      content_type => 'foo',             # (only if specified)
      delivery_tag => 1,                 # (used for acks)
      redelivered => 0,                  # if message is redelivered
      message_count => 0,                # message count

      # Not all of these will be present. Consult the RabbitMQ reference for more details.
      props => {
        content_type => 'text/plain',
        content_encoding => 'none',
        correlation_id => '123',
        reply_to => 'somequeue',
        expiration => 1000,
        message_id => 'ABC',
        type => 'notmytype',
        user_id => 'guest',
        app_id => 'idd',
        delivery_mode => 1,
        priority => 2,
        timestamp => 1271857990,
        headers => {
          unsigned_integer => 12345,
          signed_integer   => -12345,
          double           => 3.141,
          string           => "string here",

          # The x-death header is a special header for dead-lettered messages (rejected or timed out).
          'x-death' => [
            {
              time           => 1271857954,
              exchange       => $exchange,
              queue          => $exchange,
              reason         => 'expired',
              'routing-keys' => [q{}],
            },
          ],
        },
      },
    }

$channel is a channel that has been opened with "channel_open".

$queuename is the name of the queue from which we'd like to consume.

$options is an optional hash respecting the following keys:

     {
       no_ack => $boolean,  #default 1
     }

Acknowledge a message.

$channel is a channel that has been opened with "channel_open".

$delivery_tag is the delivery tag taken from a message returned by the "recv" method.

$multiple specifies if multiple are to be acknowledged at once.

Purge all messages from the specified queue.

$channel is a channel that has been opened with "channel_open".

$queuename is the queue to be purged.

Reject a message with the specified delivery tag.

$channel is a channel that has been opened with "channel_open".

$delivery_tag is the delivery tag taken from a message returned by the "recv" method.

$requeue specifies if the message should be requeued.
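
Acknowledging and rejecting can be sketched with a single-message worker. The helper name "process_one" is hypothetical, and the method names "ack" and "reject" are assumed to take the arguments described in this section. The message is fetched with "no_ack" disabled so that the server waits for an explicit acknowledgement.

```perl
use strict;
use warnings;

# Hypothetical helper: fetch one message, run $handler on its body, then
# ack on success or reject with requeue on failure.
# Returns 0 if the queue was empty, 1 otherwise.
sub process_one {
    my ($mq, $channel, $queuename, $handler) = @_;

    my $message = $mq->get($channel, $queuename, { no_ack => 0 });
    return 0 unless defined $message;

    my $ok = eval { $handler->($message->{body}); 1 };
    if ($ok) {
        $mq->ack($channel, $message->{delivery_tag});
    }
    else {
        # $requeue = 1 puts the message back on the queue.
        $mq->reject($channel, $message->{delivery_tag}, 1);
    }
    return 1;
}

# Usage:
#   process_one($mq, 1, "queuename", sub { print "got: $_[0]\n" });
```

Requeueing a message that always fails will redeliver it indefinitely, so a real worker would probably cap retries or reject without requeue.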

Start a server-side (tx) transaction over $channel.

$channel is a channel that has been opened with "channel_open".

Commit a server-side (tx) transaction over $channel.

$channel is a channel that has been opened with "channel_open".

Rollback a server-side (tx) transaction over $channel.

$channel is a channel that has been opened with "channel_open".
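
The three transaction calls can be combined as in the sketch below. The helper name "publish_atomically" is hypothetical, and the method names "tx_select", "tx_commit" and "tx_rollback" are assumed to correspond to the start, commit and rollback operations described above.

```perl
use strict;
use warnings;

# Hypothetical helper: publish several bodies inside one tx transaction.
# Commits if every publish succeeds; otherwise attempts a rollback and
# re-throws the original error.
sub publish_atomically {
    my ($mq, $channel, $exchange, $routing_key, @bodies) = @_;

    $mq->tx_select($channel);
    my $ok = eval {
        for my $body (@bodies) {
            $mq->publish($channel, $routing_key, $body,
                { exchange => $exchange });
        }
        $mq->tx_commit($channel);
        1;
    };
    return 1 if $ok;

    my $err = $@ || 'unknown error';
    # Best effort only: the connection may already be unusable.
    eval { $mq->tx_rollback($channel) };
    die $err;
}

# Usage:
#   publish_atomically($mq, 1, 'MyExchange.x', 'foobar', 'one', 'two');
```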

Return the RPC timeout on the current connection.

The value returned will be either "undef", if the RPC timeout is unlimited, or a hashref with "tv_sec" for the number of seconds and "tv_usec" for the number of microseconds.

Set the RPC timeout for the current connection, using the seconds ("tv_sec") and microseconds ("tv_usec") provided. The arguments supplied can be either in the form of a hash or a hashref, so all of the following are valid:

    $mq->set_rpc_timeout(tv_sec => 10, tv_usec => 500000)
    $mq->set_rpc_timeout( { tv_sec => 10, tv_usec => 500000 } )
    $mq->set_rpc_timeout(tv_sec => 10)
    $mq->set_rpc_timeout(tv_usec => 500000)

In order to remove the time limit for RPC calls, simply pass "undef".

    $mq->set_rpc_timeout( undef )
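
The sketch below applies a temporary RPC timeout around a block of calls and then restores the previous value. The helper name "with_rpc_timeout" is hypothetical, and the getter is assumed to be named "get_rpc_timeout" and to return either "undef" or a hashref in the form described above.

```perl
use strict;
use warnings;

# Hypothetical helper: run $code with an RPC timeout of $seconds, then
# restore whatever timeout was in effect before.
sub with_rpc_timeout {
    my ($mq, $seconds, $code) = @_;

    my $previous = $mq->get_rpc_timeout();   # undef or { tv_sec, tv_usec }
    $mq->set_rpc_timeout({ tv_sec => $seconds, tv_usec => 0 });

    my @result = eval { $code->() };
    my $err = $@;

    # Passing undef here removes the limit again.
    $mq->set_rpc_timeout($previous);
    die $err if $err;
    return @result;
}

# Usage:
#   with_rpc_timeout($mq, 10, sub { $mq->queue_declare(1, "queuename") });
```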

Set quality of service flags on the current $channel.

$channel is a channel that has been opened with "channel_open".

$options is an optional hash respecting the following keys:

     {
       prefetch_count => $cnt,  #default 0
       prefetch_size  => $size, #default 0
       global         => $bool, #default 0
     }
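
A common use of these flags is to limit a consumer to one unacknowledged message at a time. The sketch below assumes the method is named "basic_qos" and uses the hypothetical helper name "start_fair_consumer". Under AMQP 0.9.1 the prefetch count is ignored for consumers that set "no_ack", so acknowledgements are enabled here.

```perl
use strict;
use warnings;

# Hypothetical helper: set prefetch_count to 1 and start consuming with
# explicit acknowledgements. Returns the consumer tag.
sub start_fair_consumer {
    my ($mq, $channel, $queuename) = @_;

    # At most one unacknowledged message in flight on this channel.
    $mq->basic_qos($channel, { prefetch_count => 1 });

    return $mq->consume($channel, $queuename, { no_ack => 0 });
}

# Usage:
#   my $tag = start_fair_consumer($mq, 1, "queuename");
```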

Send a heartbeat. If you connected with a "heartbeat" parameter, you must send heartbeats periodically, at an interval matching that parameter, or the server may close the connection.

Note that since "recv" blocks for up to $timeout milliseconds, it automatically handles sending heartbeats for you while active.

Returns true if the module was compiled with SSL support, false otherwise.

It should be noted that almost all errors in this library are considered fatal, insomuch as they trigger a "croak()". In these errors, if it appears that somehow the connection has been closed by the remote host, or otherwise invalidated, the socket will also be closed and should be re-opened before any additional calls are made.
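
One way to act on this is to check the socket before each unit of work and reconnect if needed. The sketch below uses the hypothetical helper name "ensure_connected" and assumes the connection check is exposed as "is_connected". Channels do not survive a reconnect, so the channel is reopened as well.

```perl
use strict;
use warnings;

# Hypothetical helper: reconnect and reopen $channel if the socket has
# gone away. Returns 1 if a reconnect happened, 0 if none was needed.
sub ensure_connected {
    my ($mq, $hostname, $options, $channel) = @_;

    return 0 if $mq->is_connected();

    $mq->connect($hostname, $options);
    $mq->channel_open($channel);
    return 1;
}

# Usage, after catching an error from any method:
#   ensure_connected($mq, "localhost",
#       { user => "guest", password => "guest" }, 1);
```

Queues, bindings and consumers created on the old connection may also need to be set up again, depending on how they were declared.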

    use Net::AMQP::RabbitMQ;

    my $channel = 1;
    my $exchange = "MyExchange.x";  # This exchange must exist already
    my $routing_key = "foobar";

    my $mq = Net::AMQP::RabbitMQ->new();
    $mq->connect("localhost", { user => "guest", password => "guest" });
    $mq->channel_open($channel);
    $mq->publish($channel, $routing_key, "Message Here", { exchange => $exchange });
    $mq->disconnect();

    use Net::AMQP::RabbitMQ;
    use Data::Dumper;

    my $channel = 1;
    my $exchange = "MyExchange.x";  # This exchange must exist already
    my $routing_key = "foobar";

    my $mq = Net::AMQP::RabbitMQ->new();
    $mq->connect("localhost", { user => "guest", password => "guest" });
    $mq->channel_open($channel);

    # Declare queue, letting the server auto-generate one and collect the name
    my $queuename = $mq->queue_declare($channel, "");

    # Bind the new queue to the exchange using the routing key
    $mq->queue_bind($channel, $queuename, $exchange, $routing_key);

    # Request that messages be sent and receive them until interrupted
    $mq->consume($channel, $queuename);

    while ( my $message = $mq->recv(0) )
      {
        print "Received message:\n";
        print Dumper($message);
      }

    $mq->disconnect();

The test suite runs live tests against a RabbitMQ server at "https://www.cloudamqp.com/".

There are separate variables for the SSL and non-SSL host/user/password/port.

If you are in an environment that won't let you connect to this host (or the test server is down), you can use these environment variables:

MQHOST
Hostname or IP address of the RabbitMQ server to connect to (defaults to "hornet.rmq.cloudamqp.com").
MQUSERNAME
Username for authentication (defaults to username for <https://www.cloudamqp.com>).
MQPASSWORD
Password for authentication (defaults to password for <https://www.cloudamqp.com>).
MQPORT
Port of the RabbitMQ server to connect to (defaults to 5672).
MQVHOST
Vhost to use (defaults to vhost for <https://www.cloudamqp.com>).
MQSSL
Whether the tests should run with SSL enabled (defaults to false, but see also "MQSKIPSSL").
MQSKIPSSL
Whether the SSL tests should be skipped entirely. This option exists because the SSL tests used to ignore "MQSSL", and to maintain backwards compatibility, still do.
MQSSLHOST
Hostname or IP address of the RabbitMQ server to connect to (defaults to "hornet.rmq.cloudamqp.com").
MQSSLUSERNAME
Username for authentication (defaults to username for <https://www.cloudamqp.com>).
MQSSLPASSWORD
Password for authentication (defaults to password for <https://www.cloudamqp.com>).
MQSSLPORT
Port of the RabbitMQ server to connect to (defaults to 5671).
MQSSLCACERT
Path to the certificate file for SSL-enabled connections, defaults to t/ssl/cloudamqp.cacert.pem.
MQSSLVERIFYHOST
Whether SSL hostname verification should be enabled (defaults to true).
MQSSLINIT
Whether the openssl library should be initialized (defaults to true).
MQSSLVHOST
Vhost to use when in SSL mode (defaults to vhost for <https://www.cloudamqp.com>).
MQADMINPROTOCOL
Protocol to use for accessing the admin interface (defaults to https).
MQADMINPORT
Port to use for accessing the admin interface (defaults to 443).

This module was forked from Net::RabbitMQ version 0.2.6 which uses an older version of librabbitmq, and doesn't work correctly with newer versions of RabbitMQ. The main change between this module and the original is this library uses a newer, unforked, version of librabbitmq.

This means this module only works with the AMQP 0.9.1 protocol, so requires RabbitMQ version 2+. Also, since the version of librabbitmq used is not a custom fork, it means this module doesn't support the basic_return callback method.

Theo Schlossnagle <jesus@omniti.com>

Mark Ellis <markellis@cpan.org>

Mike "manchicken" Stemle, Jr. <mstemle@cpan.org>

Dave Rolsky <autarch@urth.org>

Slaven Rezić

Armand Leclercq

Daniel W Burke

Dávid Kovács

Alexey Sheynuk

Karen Etheridge <ether@cpan.org>

Eric Brine <ikegami@cpan.org>

Peter Valdemar Mørch <pmorch@cpan.org>

Special thanks to <https://www.cloudamqp.com> for providing us with the RabbitMQ server the tests run against.

This software is licensed under the Mozilla Public License. See the LICENSE file in the top distribution directory for the full license text.

librabbitmq is licensed under the MIT License. See the LICENSE-MIT file in the top distribution directory for the full license text.

2021-10-23 perl v5.32.1
