rabbitmq-c
0.5.0
C AMQP Client library for RabbitMQ
|
Go to the source code of this file.
Data Structures | |
struct | amqp_bytes_t |
Buffer descriptor. More... | |
struct | amqp_decimal_t |
Decimal data type. More... | |
struct | amqp_table_t |
AMQP field table. More... | |
struct | amqp_array_t |
An AMQP Field Array. More... | |
struct | amqp_field_value_t |
A field table value. More... | |
struct | amqp_table_entry_t |
An entry in a field-table. More... | |
struct | amqp_pool_blocklist_t |
A list of allocation blocks. More... | |
struct | amqp_pool_t |
A memory pool. More... | |
struct | amqp_method_t |
An amqp method. More... | |
struct | amqp_frame_t |
An AMQP frame. More... | |
struct | amqp_rpc_reply_t |
Reply from a RPC method on the broker. More... | |
struct | amqp_message_t |
A message object. More... | |
struct | amqp_envelope_t |
Envelope object. More... | |
struct | amqp_connection_info |
Parameters used to connect to the RabbitMQ broker. More... | |
Macros | |
#define | AMQP_VERSION_MAJOR 0 |
Major library version number compile-time constant. More... | |
#define | AMQP_VERSION_MINOR 5 |
Minor library version number compile-time constant. More... | |
#define | AMQP_VERSION_PATCH 0 |
Patch library version number compile-time constant. More... | |
#define | AMQP_VERSION_IS_RELEASE 1 |
Version constant set to 1 for tagged release, 0 otherwise. More... | |
#define | AMQP_VERSION |
Packed version number. More... | |
#define | AMQP_VERSION_STRING AMQ_VERSION_STRING |
Version string compile-time constant. More... | |
#define | AMQP_DEFAULT_FRAME_SIZE 131072 |
Default frame size (128KB) More... | |
#define | AMQP_DEFAULT_MAX_CHANNELS 0 |
Default maximum number of channels (0, no limit) More... | |
#define | AMQP_DEFAULT_HEARTBEAT 0 |
Default heartbeat interval (0, heartbeat disabled) More... | |
#define | AMQP_EMPTY_BYTES amqp_empty_bytes |
Deprecated, use amqp_empty_bytes instead. More... | |
#define | AMQP_EMPTY_TABLE amqp_empty_table |
Deprecated, use amqp_empty_table instead. More... | |
#define | AMQP_EMPTY_ARRAY amqp_empty_array |
Deprecated, use amqp_empty_array instead. More... | |
Typedefs | |
typedef int | amqp_boolean_t |
boolean type 0 = false, true otherwise More... | |
typedef uint32_t | amqp_method_number_t |
Method number. More... | |
typedef uint32_t | amqp_flags_t |
Bitmask for flags. More... | |
typedef uint16_t | amqp_channel_t |
Channel type. More... | |
typedef struct amqp_connection_state_t_ * | amqp_connection_state_t |
connection state object More... | |
Functions | |
uint32_t | amqp_version_number (void) |
Returns the rabbitmq-c version as a packed integer. More... | |
char const * | amqp_version (void) |
Returns the rabbitmq-c version as a string. More... | |
void | init_amqp_pool (amqp_pool_t *pool, size_t pagesize) |
Initializes an amqp_pool_t memory allocation pool for use. More... | |
void | recycle_amqp_pool (amqp_pool_t *pool) |
Recycles an amqp_pool_t memory allocation pool. More... | |
void | empty_amqp_pool (amqp_pool_t *pool) |
Empties an amqp memory pool. More... | |
void * | amqp_pool_alloc (amqp_pool_t *pool, size_t amount) |
Allocates a block of memory from an amqp_pool_t memory pool. More... | |
void | amqp_pool_alloc_bytes (amqp_pool_t *pool, size_t amount, amqp_bytes_t *output) |
Allocates a block of memory from an amqp_pool_t to an amqp_bytes_t. More... | |
amqp_bytes_t | amqp_cstring_bytes (char const *cstr) |
Wraps a c string in an amqp_bytes_t. More... | |
amqp_bytes_t | amqp_bytes_malloc_dup (amqp_bytes_t src) |
Duplicates an amqp_bytes_t buffer. More... | |
amqp_bytes_t | amqp_bytes_malloc (size_t amount) |
Allocates an amqp_bytes_t buffer. More... | |
void | amqp_bytes_free (amqp_bytes_t bytes) |
Frees an amqp_bytes_t buffer. More... | |
amqp_connection_state_t | amqp_new_connection (void) |
Allocate and initialize a new amqp_connection_state_t object. More... | |
int | amqp_get_sockfd (amqp_connection_state_t state) |
Get the underlying socket descriptor for the connection. More... | |
void | amqp_set_sockfd (amqp_connection_state_t state, int sockfd) |
Deprecated, use amqp_tcp_socket_new() or amqp_ssl_socket_new() More... | |
int | amqp_tune_connection (amqp_connection_state_t state, int channel_max, int frame_max, int heartbeat) |
Tune client side parameters. More... | |
int | amqp_get_channel_max (amqp_connection_state_t state) |
Get the maximum number of channels the connection can handle. More... | |
int | amqp_destroy_connection (amqp_connection_state_t state) |
Destroys an amqp_connection_state_t object. More... | |
int | amqp_handle_input (amqp_connection_state_t state, amqp_bytes_t received_data, amqp_frame_t *decoded_frame) |
Process incoming data. More... | |
amqp_boolean_t | amqp_release_buffers_ok (amqp_connection_state_t state) |
Check to see if connection memory can be released. More... | |
void | amqp_release_buffers (amqp_connection_state_t state) |
Release amqp_connection_state_t owned memory. More... | |
void | amqp_maybe_release_buffers (amqp_connection_state_t state) |
Release amqp_connection_state_t owned memory. More... | |
void | amqp_maybe_release_buffers_on_channel (amqp_connection_state_t state, amqp_channel_t channel) |
Release amqp_connection_state_t owned memory related to a channel. More... | |
int | amqp_send_frame (amqp_connection_state_t state, amqp_frame_t const *frame) |
Send a frame to the broker. More... | |
int | amqp_table_entry_cmp (void const *entry1, void const *entry2) |
Compare two table entries. More... | |
int | amqp_open_socket (char const *hostname, int portnumber) |
Open a socket to a remote host. More... | |
int | amqp_send_header (amqp_connection_state_t state) |
Send initial AMQP header to the broker. More... | |
amqp_boolean_t | amqp_frames_enqueued (amqp_connection_state_t state) |
Checks to see if there are any incoming frames ready to be read. More... | |
int | amqp_simple_wait_frame (amqp_connection_state_t state, amqp_frame_t *decoded_frame) |
Read a single amqp_frame_t. More... | |
int | amqp_simple_wait_frame_noblock (amqp_connection_state_t state, amqp_frame_t *decoded_frame, struct timeval *tv) |
Read a single amqp_frame_t with a timeout. More... | |
int | amqp_simple_wait_method (amqp_connection_state_t state, amqp_channel_t expected_channel, amqp_method_number_t expected_method, amqp_method_t *output) |
Waits for a specific method from the broker. More... | |
int | amqp_send_method (amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t id, void *decoded) |
Sends a method to the broker. More... | |
amqp_rpc_reply_t | amqp_simple_rpc (amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t request_id, amqp_method_number_t *expected_reply_ids, void *decoded_request_method) |
Sends a method to the broker and waits for a method response. More... | |
void * | amqp_simple_rpc_decoded (amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t request_id, amqp_method_number_t reply_id, void *decoded_request_method) |
Sends a method to the broker and waits for a method response. More... | |
amqp_rpc_reply_t | amqp_get_rpc_reply (amqp_connection_state_t state) |
Get the last global amqp_rpc_reply. More... | |
amqp_rpc_reply_t | amqp_login (amqp_connection_state_t state, char const *vhost, int channel_max, int frame_max, int heartbeat, amqp_sasl_method_enum sasl_method,...) |
Login to the broker. More... | |
amqp_rpc_reply_t | amqp_login_with_properties (amqp_connection_state_t state, char const *vhost, int channel_max, int frame_max, int heartbeat, const amqp_table_t *properties, amqp_sasl_method_enum sasl_method,...) |
Login to the broker passing a properties table. More... | |
int | amqp_basic_publish (amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_boolean_t mandatory, amqp_boolean_t immediate, struct amqp_basic_properties_t_ const *properties, amqp_bytes_t body) |
Publish a message to the broker. More... | |
amqp_rpc_reply_t | amqp_channel_close (amqp_connection_state_t state, amqp_channel_t channel, int code) |
Closes a channel. More... | |
amqp_rpc_reply_t | amqp_connection_close (amqp_connection_state_t state, int code) |
Closes the entire connection. More... | |
int | amqp_basic_ack (amqp_connection_state_t state, amqp_channel_t channel, uint64_t delivery_tag, amqp_boolean_t multiple) |
Acknowledges a message. More... | |
amqp_rpc_reply_t | amqp_basic_get (amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t no_ack) |
Do a basic.get. More... | |
int | amqp_basic_reject (amqp_connection_state_t state, amqp_channel_t channel, uint64_t delivery_tag, amqp_boolean_t requeue) |
Do a basic.reject. More... | |
int | amqp_basic_nack (amqp_connection_state_t state, amqp_channel_t channel, uint64_t delivery_tag, amqp_boolean_t multiple, amqp_boolean_t requeue) |
Do a basic.nack. More... | |
amqp_boolean_t | amqp_data_in_buffer (amqp_connection_state_t state) |
Check to see if there is data left in the receive buffer. More... | |
char * | amqp_error_string (int err) |
Get the error string for the given error code. More... | |
const char * | amqp_error_string2 (int err) |
Get the error string for the given error code. More... | |
int | amqp_decode_table (amqp_bytes_t encoded, amqp_pool_t *pool, amqp_table_t *output, size_t *offset) |
Deserialize an amqp_table_t from AMQP wireformat. More... | |
int | amqp_encode_table (amqp_bytes_t encoded, amqp_table_t *input, size_t *offset) |
Serializes an amqp_table_t to the AMQP wireformat. More... | |
int | amqp_table_clone (amqp_table_t *original, amqp_table_t *clone, amqp_pool_t *pool) |
Create a deep-copy of an amqp_table_t object. More... | |
amqp_rpc_reply_t | amqp_read_message (amqp_connection_state_t state, amqp_channel_t channel, amqp_message_t *message, int flags) |
Reads the next message on a channel. More... | |
void | amqp_destroy_message (amqp_message_t *message) |
Frees memory associated with an amqp_message_t allocated in amqp_read_message. More... | |
amqp_rpc_reply_t | amqp_consume_message (amqp_connection_state_t state, amqp_envelope_t *envelope, struct timeval *timeout, int flags) |
Wait for and consume a message. More... | |
void | amqp_destroy_envelope (amqp_envelope_t *envelope) |
Frees memory associated with an amqp_envelope_t allocated in amqp_consume_message() More... | |
void | amqp_default_connection_info (struct amqp_connection_info *parsed) |
Initialize an amqp_connection_info to default values. More... | |
int | amqp_parse_url (char *url, struct amqp_connection_info *parsed) |
Parse a connection URL. More... | |
int | amqp_socket_open (amqp_socket_t *self, const char *host, int port) |
Open a socket connection. More... | |
int | amqp_socket_open_noblock (amqp_socket_t *self, const char *host, int port, struct timeval *timeout) |
Open a socket connection. More... | |
int | amqp_socket_get_sockfd (amqp_socket_t *self) |
Get the socket descriptor in use by a socket object. More... | |
amqp_socket_t * | amqp_get_socket (amqp_connection_state_t state) |
Get the socket object associated with an amqp_connection_state_t. More... | |
amqp_table_t * | amqp_get_server_properties (amqp_connection_state_t state) |
Get the broker properties table. More... | |
Variables | |
const amqp_bytes_t | amqp_empty_bytes |
Empty bytes structure. More... | |
const amqp_table_t | amqp_empty_table |
Empty table structure. More... | |
const amqp_array_t | amqp_empty_array |
Empty table array structure. More... | |
#define AMQP_DEFAULT_FRAME_SIZE 131072 |
#define AMQP_DEFAULT_HEARTBEAT 0 |
Default heartbeat interval (0, heartbeat disabled)
#define AMQP_DEFAULT_MAX_CHANNELS 0 |
Default maximum number of channels (0, no limit)
#define AMQP_EMPTY_ARRAY amqp_empty_array |
#define AMQP_EMPTY_BYTES amqp_empty_bytes |
#define AMQP_EMPTY_TABLE amqp_empty_table |
#define AMQP_VERSION |
Packed version number.
AMQP_VERSION is a 4-byte unsigned integer with the most significant byte set to AMQP_VERSION_MAJOR, the second most significant byte set to AMQP_VERSION_MINOR, third most significant byte set to AMQP_VERSION_PATCH, and the lowest byte set to AMQP_VERSION_IS_RELEASE.
For example, version 2.3.4, which is a released version, would be encoded as 0x02030401.
#define AMQP_VERSION_IS_RELEASE 1 |
Version constant set to 1 for tagged release, 0 otherwise.
NOTE: versions that are not tagged releases are not guaranteed to be API/ABI compatible with older releases, and may change commit-to-commit.
#define AMQP_VERSION_MAJOR 0 |
Major library version number compile-time constant.
The major version is incremented when backwards incompatible API changes are made.
#define AMQP_VERSION_MINOR 5 |
Minor library version number compile-time constant.
The minor version is incremented when new APIs are added. Existing APIs are left alone.
#define AMQP_VERSION_PATCH 0 |
Patch library version number compile-time constant.
The patch version is incremented when library code changes, but the API is not changed.
#define AMQP_VERSION_STRING AMQ_VERSION_STRING |
Version string compile-time constant.
Non-released versions of the library will have "-pre" appended to the version string
typedef int amqp_boolean_t |
boolean type 0 = false, true otherwise
typedef uint16_t amqp_channel_t |
Channel type.
typedef struct amqp_connection_state_t_* amqp_connection_state_t |
connection state object
typedef uint32_t amqp_flags_t |
Bitmask for flags.
typedef uint32_t amqp_method_number_t |
Method number.
Field value types.
Enumerator | |
---|---|
AMQP_FIELD_KIND_BOOLEAN |
boolean type. 0 = false, 1 = true
|
AMQP_FIELD_KIND_I8 |
8-bit signed integer, datatype: int8_t |
AMQP_FIELD_KIND_U8 |
8-bit unsigned integer, datatype: uint8_t |
AMQP_FIELD_KIND_I16 |
16-bit signed integer, datatype: int16_t |
AMQP_FIELD_KIND_U16 |
16-bit unsigned integer, datatype: uint16_t |
AMQP_FIELD_KIND_I32 |
32-bit signed integer, datatype: int32_t |
AMQP_FIELD_KIND_U32 |
32-bit unsigned integer, datatype: uint32_t |
AMQP_FIELD_KIND_I64 |
64-bit signed integer, datatype: int64_t |
AMQP_FIELD_KIND_U64 |
64-bit unsigned integer, datatype: uint64_t |
AMQP_FIELD_KIND_F32 |
single-precision floating point value, datatype: float |
AMQP_FIELD_KIND_F64 |
double-precision floating point value, datatype: double |
AMQP_FIELD_KIND_DECIMAL |
amqp-decimal value, datatype: amqp_decimal_t |
AMQP_FIELD_KIND_UTF8 |
UTF-8 null-terminated character string, datatype: amqp_bytes_t. |
AMQP_FIELD_KIND_ARRAY |
field array (repeated values of another datatype). datatype: amqp_array_t |
AMQP_FIELD_KIND_TIMESTAMP |
64-bit timestamp. datatype uint64_t |
AMQP_FIELD_KIND_TABLE |
field table. encapsulates a table inside a table entry. datatype: amqp_table_t |
AMQP_FIELD_KIND_VOID |
empty entry |
AMQP_FIELD_KIND_BYTES |
unformatted byte string, datatype: amqp_bytes_t |
Response type.
enum amqp_status_enum |
Status codes.
Enumerator | |
---|---|
AMQP_STATUS_OK |
Operation successful. |
AMQP_STATUS_NO_MEMORY |
Memory allocation failed. |
AMQP_STATUS_BAD_AMQP_DATA |
Incorrect or corrupt data was received from the broker. This is a protocol error. |
AMQP_STATUS_UNKNOWN_CLASS |
An unknown AMQP class was received. This is a protocol error. |
AMQP_STATUS_UNKNOWN_METHOD |
An unknown AMQP method was received. This is a protocol error. |
AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED |
Unable to resolve the hostname. |
AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION |
The broker advertised an incompatible AMQP version. |
AMQP_STATUS_CONNECTION_CLOSED |
The connection to the broker has been closed. |
AMQP_STATUS_BAD_URL |
malformed AMQP URL |
AMQP_STATUS_SOCKET_ERROR |
A socket error occurred. |
AMQP_STATUS_INVALID_PARAMETER |
An invalid parameter was passed into the function. |
AMQP_STATUS_TABLE_TOO_BIG |
The amqp_table_t object cannot be serialized because the output buffer is too small. |
AMQP_STATUS_WRONG_METHOD |
The wrong method was received. |
AMQP_STATUS_TIMEOUT |
Operation timed out. |
AMQP_STATUS_TIMER_FAILURE |
The underlying system timer facility failed. |
AMQP_STATUS_HEARTBEAT_TIMEOUT |
Timed out waiting for heartbeat. |
AMQP_STATUS_UNEXPECTED_STATE |
Unexpected protocol state. |
AMQP_STATUS_TCP_ERROR |
A generic TCP error occurred. |
AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR |
An error occurred trying to initialize the socket library. |
AMQP_STATUS_SSL_ERROR |
A generic SSL error occurred. |
AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED |
SSL validation of hostname against peer certificate failed. |
AMQP_STATUS_SSL_PEER_VERIFY_FAILED |
SSL validation of peer certificate failed. |
AMQP_STATUS_SSL_CONNECTION_FAILED |
SSL handshake failed. |
int amqp_basic_ack | ( | amqp_connection_state_t | state, |
amqp_channel_t | channel, | ||
uint64_t | delivery_tag, | ||
amqp_boolean_t | multiple | ||
) |
Acknowledges a message.
Does a basic.ack on a received message
[in] | state | the connection object |
[in] | channel | the channel identifier |
[in] | delivery_tag | the delivery tag of the message to be ack'd |
[in] | multiple | if true, ack all messages up to this delivery tag, if false ack only this delivery tag |
amqp_rpc_reply_t amqp_basic_get | ( | amqp_connection_state_t | state, |
amqp_channel_t | channel, | ||
amqp_bytes_t | queue, | ||
amqp_boolean_t | no_ack | ||
) |
Do a basic.get.
Synchronously polls the broker for a message in a queue, and retrieves the message if a message is in the queue.
[in] | state | the connection object |
[in] | channel | the channel identifier to use |
[in] | queue | the queue name to retrieve from |
[in] | no_ack | if true, the message is automatically ack'ed; if false, amqp_basic_ack() should be called once the retrieved message has been processed |
int amqp_basic_nack | ( | amqp_connection_state_t | state, |
amqp_channel_t | channel, | ||
uint64_t | delivery_tag, | ||
amqp_boolean_t | multiple, | ||
amqp_boolean_t | requeue | ||
) |
Do a basic.nack.
Actively reject a message. This has the same effect as amqp_basic_reject(); however, amqp_basic_nack() can negatively acknowledge multiple messages with one call, much like amqp_basic_ack() can acknowledge multiple messages with one call.
[in] | state | the connection object |
[in] | channel | the channel identifier |
[in] | delivery_tag | the delivery tag of the message to reject |
[in] | multiple | if set to 1 negatively acknowledge all unacknowledged messages on this channel. |
[in] | requeue | indicate to the broker whether it should requeue the message or dead-letter it. |
int amqp_basic_publish | ( | amqp_connection_state_t | state, |
amqp_channel_t | channel, | ||
amqp_bytes_t | exchange, | ||
amqp_bytes_t | routing_key, | ||
amqp_boolean_t | mandatory, | ||
amqp_boolean_t | immediate, | ||
struct amqp_basic_properties_t_ const * | properties, | ||
amqp_bytes_t | body | ||
) |
Publish a message to the broker.
Publish a message on an exchange with a routing key.
Note that at the AMQ protocol level basic.publish is an async method: this means error conditions that occur on the broker (such as publishing to a non-existent exchange) will not be reflected in the return value of this function.
[in] | state | the connection object |
[in] | channel | the channel identifier |
[in] | exchange | the exchange on the broker to publish to |
[in] | routing_key | the routing key to use when publishing the message |
[in] | mandatory | indicate to the broker that the message MUST be routed to a queue. If the broker cannot do this it should respond with a basic.return method. |
[in] | immediate | indicate to the broker that the message MUST be delivered to a consumer immediately. If the broker cannot do this it should respond with a basic.return method. |
[in] | properties | the properties associated with the message |
[in] | body | the message body |
Note: this function does heartbeat processing as of v0.4.0
int amqp_basic_reject | ( | amqp_connection_state_t | state, |
amqp_channel_t | channel, | ||
uint64_t | delivery_tag, | ||
amqp_boolean_t | requeue | ||
) |
Do a basic.reject.
Actively reject a message that has been delivered
[in] | state | the connection object |
[in] | channel | the channel identifier |
[in] | delivery_tag | the delivery tag of the message to reject |
[in] | requeue | indicate to the broker whether it should requeue the message or just discard it. |
void amqp_bytes_free | ( | amqp_bytes_t | bytes | ) |
Frees an amqp_bytes_t buffer.
Frees a buffer allocated with amqp_bytes_malloc() or amqp_bytes_malloc_dup()
Calling amqp_bytes_free on buffers not allocated with one of those two functions will result in undefined behavior
[in] | bytes | the buffer to free |
amqp_bytes_t amqp_bytes_malloc | ( | size_t | amount | ) |
Allocates an amqp_bytes_t buffer.
Creates an amqp_bytes_t buffer of the specified amount, the buffer should be freed using amqp_bytes_free()
[in] | amount | the size of the buffer in bytes |
amqp_bytes_t amqp_bytes_malloc_dup | ( | amqp_bytes_t | src | ) |
Duplicates an amqp_bytes_t buffer.
The buffer is cloned and the contents copied.
The memory associated with the output is allocated with amqp_bytes_malloc() and should be freed with amqp_bytes_free()
[in] | src |
amqp_rpc_reply_t amqp_channel_close | ( | amqp_connection_state_t | state, |
amqp_channel_t | channel, | ||
int | code | ||
) |
Closes a channel.
[in] | state | the connection object |
[in] | channel | the channel identifier |
[in] | code | the reason for closing the channel, AMQP_REPLY_SUCCESS is a good default |
amqp_rpc_reply_t amqp_connection_close | ( | amqp_connection_state_t | state, |
int | code | ||
) |
Closes the entire connection.
Implicitly closes all channels and informs the broker the connection is being closed. After receiving acknowledgement from the broker, it closes the socket.
[in] | state | the connection object |
[in] | code | the reason code for closing the connection. AMQP_REPLY_SUCCESS is a good default. |
amqp_rpc_reply_t amqp_consume_message | ( | amqp_connection_state_t | state, |
amqp_envelope_t * | envelope, | ||
struct timeval * | timeout, | ||
int | flags | ||
) |
Wait for and consume a message.
Waits for a basic.deliver method on any channel; upon receipt of basic.deliver it reads that message and returns. If any other method is received before basic.deliver, this function will return an amqp_rpc_reply_t with ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and ret.library_error == AMQP_STATUS_UNEXPECTED_STATE. The caller should then call amqp_simple_wait_frame() to read this frame and take appropriate action.
This function should be used after starting a consumer with the amqp_basic_consume() function
[in,out] | state | the connection object |
[in,out] | envelope | a pointer to an amqp_envelope_t object. Caller should call amqp_destroy_envelope() when it is done using the fields in the envelope object. The caller is responsible for allocating/destroying the amqp_envelope_t object itself. |
[in] | timeout | a timeout to wait for a message delivery. Passing in NULL will result in blocking behavior. |
[in] | flags | pass in 0. Currently unused. |
amqp_bytes_t amqp_cstring_bytes | ( | char const * | cstr | ) |
Wraps a c string in an amqp_bytes_t.
Takes a string, calculates its length and creates an amqp_bytes_t that points to it. The string is not duplicated.
For a given input cstr, the amqp_bytes_t output.bytes is the same as cstr, and output.len is the length of the string not including the \0 terminator.
This function uses strlen() internally so cstr must be properly terminated
[in] | cstr | the c string to wrap |
amqp_boolean_t amqp_data_in_buffer | ( | amqp_connection_state_t | state | ) |
Check to see if there is data left in the receive buffer.
Can be used to see if there is data still in the buffer, if so calling amqp_simple_wait_frame will not immediately enter a blocking read.
[in] | state | the connection object |
int amqp_decode_table | ( | amqp_bytes_t | encoded, |
amqp_pool_t * | pool, | ||
amqp_table_t * | output, | ||
size_t * | offset | ||
) |
Deserialize an amqp_table_t from AMQP wireformat.
This is an internal function and is not typically used by client applications
[in] | encoded | the buffer containing the serialized data |
[in] | pool | memory pool used to allocate the table entries from |
[in] | output | the amqp_table_t structure to fill in. Any existing entries will be erased |
[in,out] | offset | The offset into the encoded buffer to start reading the serialized table. It will be updated by this function to end of the table |
void amqp_default_connection_info | ( | struct amqp_connection_info * | parsed | ) |
Initialize an amqp_connection_info to default values.
The default values are:
[out] | parsed | the connection info to set defaults on |
int amqp_destroy_connection | ( | amqp_connection_state_t | state | ) |
Destroys an amqp_connection_state_t object.
Destroys an amqp_connection_state_t object that was created with amqp_new_connection(). If the connection with the broker is open, it will be implicitly closed with a reply code of 200 (success). Any memory that would be freed with amqp_maybe_release_buffers() or amqp_maybe_release_buffers_on_channel() will be freed, and use of that memory will cause undefined behavior.
[in] | state | the connection object |
void amqp_destroy_envelope | ( | amqp_envelope_t * | envelope | ) |
Frees memory associated with an amqp_envelope_t allocated in amqp_consume_message()
[in] | envelope |
void amqp_destroy_message | ( | amqp_message_t * | message | ) |
Frees memory associated with an amqp_message_t allocated in amqp_read_message.
[in] | message |
int amqp_encode_table | ( | amqp_bytes_t | encoded, |
amqp_table_t * | input, | ||
size_t * | offset | ||
) |
Serializes an amqp_table_t to the AMQP wireformat.
This is an internal function and is not typically used by client applications
[in] | encoded | the buffer where to serialize the table to |
[in] | input | the amqp_table_t to serialize |
[in,out] | offset | The offset into the encoded buffer to start writing the serialized table. It will be updated by this function to where writing left off |
char* amqp_error_string | ( | int | err | ) |
Get the error string for the given error code.
The returned string resides on the heap; the caller is responsible for freeing it.
[in] | err | return error code |
const char* amqp_error_string2 | ( | int | err | ) |
Get the error string for the given error code.
Get an error string associated with an error code. The string is statically allocated and does not need to be freed
[in] | err | the error code |
amqp_boolean_t amqp_frames_enqueued | ( | amqp_connection_state_t | state | ) |
Checks to see if there are any incoming frames ready to be read.
Checks to see if there are any amqp_frame_t objects buffered by the amqp_connection_state_t object. Having one or more frames buffered means that amqp_simple_wait_frame() or amqp_simple_wait_frame_noblock() will return a frame without potentially blocking on a read() call.
[in] | state | the connection object |
int amqp_get_channel_max | ( | amqp_connection_state_t | state | ) |
Get the maximum number of channels the connection can handle.
The maximum number of channels is set when connection negotiation takes place in amqp_login() or amqp_login_with_properties().
[in] | state | the connection object |
amqp_rpc_reply_t amqp_get_rpc_reply | ( | amqp_connection_state_t | state | ) |
Get the last global amqp_rpc_reply.
The API methods corresponding to most synchronous AMQP methods return a pointer to the decoded method result. Upon error, they return NULL, and we need some way of discovering what, if anything, went wrong. amqp_get_rpc_reply() returns the most recent amqp_rpc_reply_t instance corresponding to such an API operation for the given connection.
Only use it for operations that do not themselves return amqp_rpc_reply_t; operations that do return amqp_rpc_reply_t generally do NOT update this per-connection-global amqp_rpc_reply_t instance.
[in] | state | the connection object |
amqp_table_t* amqp_get_server_properties | ( | amqp_connection_state_t | state | ) |
Get the broker properties table.
[in] | state | the connection object |
amqp_socket_t* amqp_get_socket | ( | amqp_connection_state_t | state | ) |
Get the socket object associated with an amqp_connection_state_t.
[in] | state | the connection object to get the socket from |
int amqp_get_sockfd | ( | amqp_connection_state_t | state | ) |
Get the underlying socket descriptor for the connection.
[in] | state | the connection object |
int amqp_handle_input | ( | amqp_connection_state_t | state, |
amqp_bytes_t | received_data, | ||
amqp_frame_t * | decoded_frame | ||
) |
Process incoming data.
For a given buffer of data received from the broker, decode the first frame in the buffer. If more than one frame is contained in the input buffer the return value will be less than the received_data size, the caller should adjust received_data buffer descriptor to point to the beginning of the buffer + the return value.
[in] | state | the connection object |
[in] | received_data | a buffer of data received from the broker. The function will return the number of bytes of the buffer it used. The function copies these bytes to an internal buffer: this part of the buffer may be reused after this function successfully completes. |
[in,out] | decoded_frame | caller should pass in a pointer to an amqp_frame_t struct. If there is enough data in received_data for a complete frame, decoded_frame->frame_type will be set to something OTHER than 0. decoded_frame may contain members pointing to memory owned by the state object. This memory can be recycled with amqp_maybe_release_buffers() or amqp_maybe_release_buffers_on_channel() |
amqp_rpc_reply_t amqp_login | ( | amqp_connection_state_t | state, |
char const * | vhost, | ||
int | channel_max, | ||
int | frame_max, | ||
int | heartbeat, | ||
amqp_sasl_method_enum | sasl_method, | ||
... | |||
) |
Login to the broker.
After using amqp_open_socket and amqp_set_sockfd, call amqp_login to complete connecting to the broker
[in] | state | the connection object |
[in] | vhost | the virtual host to connect to on the broker. The default on most brokers is "/" |
[in] | channel_max | the limit for number of channels for the connection. 0 means no limit, and is a good default (AMQP_DEFAULT_MAX_CHANNELS) Note that the maximum number of channels the protocol supports is 65535 (2^16, with the 0-channel reserved) |
[in] | frame_max | the maximum size of an AMQP frame on the wire to request of the broker for this connection. 4096 is the minimum size, 2^31-1 is the maximum, a good default is 131072 (128KB), or AMQP_DEFAULT_FRAME_SIZE |
[in] | heartbeat | the number of seconds between heartbeat frames to request of the broker. A value of 0 disables heartbeats. Note rabbitmq-c only has partial support for heartbeats, as of v0.4.0 they are only serviced during amqp_basic_publish() and amqp_simple_wait_frame()/amqp_simple_wait_frame_noblock() |
[in] | sasl_method | the SASL method to authenticate with the broker, followed by the authentication information. For AMQP_SASL_METHOD_PLAIN, the AMQP_SASL_METHOD_PLAIN parameter should be followed by two arguments in this order: const char* username, and const char* password. |
amqp_rpc_reply_t amqp_login_with_properties | ( | amqp_connection_state_t | state, |
char const * | vhost, | ||
int | channel_max, | ||
int | frame_max, | ||
int | heartbeat, | ||
const amqp_table_t * | properties, | ||
amqp_sasl_method_enum | sasl_method, | ||
... | |||
) |
Login to the broker passing a properties table.
This function is similar to amqp_login() and differs in that it provides a way to pass client properties to the broker. This is commonly used to negotiate newer protocol features as they are supported by the broker.
[in] | state | the connection object |
[in] | vhost | the virtual host to connect to on the broker. The default on most brokers is "/" |
[in] | channel_max | the limit for the number of channels for the connection. 0 means no limit, and is a good default (AMQP_DEFAULT_MAX_CHANNELS) Note that the maximum number of channels the protocol supports is 65535 (2^16, with the 0-channel reserved) |
[in] | frame_max | the maximum size of an AMQP frame on the wire to request of the broker for this connection. 4096 is the minimum size, 2^31-1 is the maximum, a good default is 131072 (128KB), or AMQP_DEFAULT_FRAME_SIZE |
[in] | heartbeat | the number of seconds between heartbeat frames to request of the broker. A value of 0 disables heartbeats. Note rabbitmq-c only has partial support for heartbeats, as of v0.4.0 heartbeats are only serviced during amqp_basic_publish(), and amqp_simple_wait_frame()/amqp_simple_wait_frame_noblock() |
[in] | properties | a table of properties to send the broker. |
[in] | sasl_method | the SASL method to authenticate with the broker, followed by the authentication information. For AMQP_SASL_METHOD_PLAIN, the AMQP_SASL_METHOD_PLAIN parameter should be followed by two arguments in this order: const char* username, and const char* password. |
void amqp_maybe_release_buffers | ( | amqp_connection_state_t | state | ) |
Release amqp_connection_state_t owned memory.
Release memory owned by the amqp_connection_state_t object related to any channel, allowing reuse by the library. Use of any memory returned by the library before this function is called will result in undefined behavior.
[in] | state | the connection object |
void amqp_maybe_release_buffers_on_channel | ( | amqp_connection_state_t | state, |
amqp_channel_t | channel | ||
) |
Release amqp_connection_state_t owned memory related to a channel.
Release memory owned by the amqp_connection_state_t object related to the specified channel, allowing reuse by the library. Use of any memory returned by the library for a specific channel before this function is called will result in undefined behavior.
[in] | state | the connection object |
[in] | channel | the channel specifier for which memory should be released. Note that the library does not care about the state of the channel when calling this function |
amqp_connection_state_t amqp_new_connection | ( | void | ) |
Allocate and initialize a new amqp_connection_state_t object.
amqp_connection_state_t objects created with this function should be freed with amqp_destroy_connection()
int amqp_open_socket | ( | char const * | hostname, |
int | portnumber | ||
) |
Open a socket to a remote host.
Looks up the hostname, then attempts to open a socket to the host using the specified portnumber. It also sets various options on the socket to improve performance and correctness.
[in] | hostname | this can be a hostname or IP address. Both IPv4 and IPv6 are acceptable |
[in] | portnumber | the port to connect on. RabbitMQ brokers listen on port 5672, and 5671 for SSL |
int amqp_parse_url | ( | char * | url, |
struct amqp_connection_info * | parsed | ||
) |
Parse a connection URL.
An amqp connection url takes the form:
amqp://[$USERNAME[:$PASSWORD]@]$HOST[:$PORT]/[$VHOST]
Examples: amqp://guest:guest@localhost:5672// and amqp://guest:guest@localhost/myvhost
[in] | url | URI to parse, note that this parameter is modified by the function. |
[out] | parsed | the connection info gleaned from the URI. The char* members will point to parts of the url input parameter. Memory management will depend on how the url is allocated. |
void* amqp_pool_alloc | ( | amqp_pool_t * | pool, |
size_t | amount | ||
) |
Allocates a block of memory from an amqp_pool_t memory pool.
Memory will be aligned on an 8-byte boundary. If a 0-length allocation is requested, a NULL pointer will be returned.
[in] | pool | the allocation pool to allocate the memory from |
[in] | amount | the size of the allocation in bytes. |
void amqp_pool_alloc_bytes | ( | amqp_pool_t * | pool, |
size_t | amount, | ||
amqp_bytes_t * | output | ||
) |
Allocates a block of memory from an amqp_pool_t to an amqp_bytes_t.
Memory will be aligned on an 8-byte boundary. If a 0-length allocation is requested, output.bytes = NULL.
[in] | pool | the allocation pool to allocate the memory from |
[in] | amount | the size of the allocation in bytes |
[in] | output | the location to store the pointer. On success, output.bytes will be set to the beginning of the buffer and output.len will be set to amount. On error, output.bytes will be set to NULL and output.len set to 0 |
amqp_rpc_reply_t amqp_read_message | ( | amqp_connection_state_t | state, |
amqp_channel_t | channel, | ||
amqp_message_t * | message, | ||
int | flags | ||
) |
Reads the next message on a channel.
Reads a complete message (header + body) on a specified channel. This function is intended to be used with amqp_basic_get() or when an AMQP_BASIC_DELIVERY_METHOD method is received.
[in,out] | state | the connection object |
[in] | channel | the channel from which to read the message |
[in,out] | message | a pointer to an amqp_message_t object. Caller should call amqp_message_destroy() when it is done using the fields in the message object. The caller is responsible for allocating/destroying the amqp_message_t object itself. |
[in] | flags | pass in 0. Currently unused. |
void amqp_release_buffers | ( | amqp_connection_state_t | state | ) |
Release amqp_connection_state_t owned memory.
Release memory owned by the amqp_connection_state_t for reuse by the library. Use of any memory returned by the library before this function is called will result in undefined behavior.
[in] | state | the connection object |
amqp_boolean_t amqp_release_buffers_ok | ( | amqp_connection_state_t | state | ) |
Check to see if connection memory can be released.
Checks the state of an amqp_connection_state_t object to see if amqp_release_buffers() can be called successfully.
[in] | state | the connection object |
int amqp_send_frame | ( | amqp_connection_state_t | state, |
amqp_frame_t const * | frame | ||
) |
Send a frame to the broker.
[in] | state | the connection object |
[in] | frame | the frame to send to the broker |
int amqp_send_header | ( | amqp_connection_state_t | state | ) |
Send initial AMQP header to the broker.
This function sends the AMQP protocol header to the broker.
[in] | state | the connection object |
int amqp_send_method | ( | amqp_connection_state_t | state, |
amqp_channel_t | channel, | ||
amqp_method_number_t | id, | ||
void * | decoded | ||
) |
Sends a method to the broker.
This is a thin wrapper around amqp_send_frame(), providing a way to send a method to the broker on a specified channel.
[in] | state | the connection object |
[in] | channel | the channel object |
[in] | id | the method number |
[in] | decoded | the method object |
void amqp_set_sockfd | ( | amqp_connection_state_t | state, |
int | sockfd | ||
) |
Deprecated, use amqp_tcp_socket_new() or amqp_ssl_socket_new()
Sets the socket descriptor associated with the connection. The socket should be connected to a broker, and should not be read from or written to before calling this function. A socket descriptor can be created and opened using amqp_open_socket()
[in] | state | the connection object |
[in] | sockfd | the socket |
amqp_rpc_reply_t amqp_simple_rpc | ( | amqp_connection_state_t | state, |
amqp_channel_t | channel, | ||
amqp_method_number_t | request_id, | ||
amqp_method_number_t * | expected_reply_ids, | ||
void * | decoded_request_method | ||
) |
Sends a method to the broker and waits for a method response.
[in] | state | the connection object |
[in] | channel | the channel object |
[in] | request_id | the method number of the request |
[in] | expected_reply_ids | a 0 terminated array of expected response method numbers |
[in] | decoded_request_method | the method to be sent to the broker |
void* amqp_simple_rpc_decoded | ( | amqp_connection_state_t | state, |
amqp_channel_t | channel, | ||
amqp_method_number_t | request_id, | ||
amqp_method_number_t | reply_id, | ||
void * | decoded_request_method | ||
) |
Sends a method to the broker and waits for a method response.
[in] | state | the connection object |
[in] | channel | the channel object |
[in] | request_id | the method number of the request |
[in] | reply_id | the method number expected in response |
[in] | decoded_request_method | the request method |
int amqp_simple_wait_frame | ( | amqp_connection_state_t | state, |
amqp_frame_t * | decoded_frame | ||
) |
Read a single amqp_frame_t.
Waits for the next amqp_frame_t frame to be read from the broker. This function has the potential to block for a long time in the case of waiting for a basic.deliver method frame from the broker.
The library may buffer frames. When an amqp_connection_state_t object has frames buffered calling amqp_simple_wait_frame() will return an amqp_frame_t without entering a blocking read(). You can test to see if an amqp_connection_state_t object has frames buffered by calling the amqp_frames_enqueued() function.
The library has a socket read buffer. When there is data in an amqp_connection_state_t read buffer, amqp_simple_wait_frame() may return an amqp_frame_t without entering a blocking read(). You can test to see if an amqp_connection_state_t object has data in its read buffer by calling the amqp_data_in_buffer() function.
[in] | state | the connection object |
[out] | decoded_frame | the frame |
int amqp_simple_wait_frame_noblock | ( | amqp_connection_state_t | state, |
amqp_frame_t * | decoded_frame, | ||
struct timeval * | tv | ||
) |
Read a single amqp_frame_t with a timeout.
Waits for the next amqp_frame_t frame to be read from the broker, up to a timespan specified by tv. The function will return AMQP_STATUS_TIMEOUT if the timeout is reached. The tv value is not modified by the function.
If a 0 timeval is specified, the function behaves as if it is non-blocking: it will test to see if a frame can be read from the broker, and return immediately.
If NULL is passed in for tv, the function will behave like amqp_simple_wait_frame() and block until a frame is received from the broker
The library may buffer frames. When an amqp_connection_state_t object has frames buffered calling amqp_simple_wait_frame_noblock() will return an amqp_frame_t without entering a blocking read(). You can test to see if an amqp_connection_state_t object has frames buffered by calling the amqp_frames_enqueued() function.
The library has a socket read buffer. When there is data in an amqp_connection_state_t read buffer, amqp_simple_wait_frame_noblock() may return an amqp_frame_t without entering a blocking read(). You can test to see if an amqp_connection_state_t object has data in its read buffer by calling the amqp_data_in_buffer() function.
[in,out] | state | the connection object |
[out] | decoded_frame | the frame |
[in] | tv | the maximum time to wait for a frame to be read. Setting tv->tv_sec = 0 and tv->tv_usec = 0 will do a non-blocking read. Specifying NULL for tv will make the function block until a frame is read. |
int amqp_simple_wait_method | ( | amqp_connection_state_t | state, |
amqp_channel_t | expected_channel, | ||
amqp_method_number_t | expected_method, | ||
amqp_method_t * | output | ||
) |
Waits for a specific method from the broker.
Waits for a single method on a channel from the broker. If a frame is received that does not match expected_channel or expected_method, the program will abort.
[in] | state | the connection object |
[in] | expected_channel | the channel that the method should be delivered on |
[in] | expected_method | the method to wait for |
[out] | output | the method |
int amqp_socket_get_sockfd | ( | amqp_socket_t * | self | ) |
Get the socket descriptor in use by a socket object.
Retrieve the underlying socket descriptor. This function can be used to perform low-level socket operations that aren't supported by the socket interface. Use with caution!
[in,out] | self | A socket object. |
int amqp_socket_open | ( | amqp_socket_t * | self, |
const char * | host, | ||
int | port | ||
) |
Open a socket connection.
This function opens a socket connection returned from amqp_tcp_socket_new() or amqp_ssl_socket_new(). This function should be called after setting socket options and prior to assigning the socket to an AMQP connection with amqp_set_socket().
[in,out] | self | A socket object. |
[in] | host | Connect to this host. |
[in] | port | Connect on this remote port. |
int amqp_socket_open_noblock | ( | amqp_socket_t * | self, |
const char * | host, | ||
int | port, | ||
struct timeval * | timeout | ||
) |
Open a socket connection.
This function opens a socket connection returned from amqp_tcp_socket_new() or amqp_ssl_socket_new(). This function should be called after setting socket options and prior to assigning the socket to an AMQP connection with amqp_set_socket().
[in,out] | self | A socket object. |
[in] | host | Connect to this host. |
[in] | port | Connect on this remote port. |
[in] | timeout | Max allowed time to spend on opening. If NULL, run in blocking mode |
int amqp_table_clone | ( | amqp_table_t * | original, |
amqp_table_t * | clone, | ||
amqp_pool_t * | pool | ||
) |
Create a deep-copy of an amqp_table_t object.
Creates a deep-copy of an amqp_table_t object, using the provided pool object to allocate the necessary memory. This memory can be freed later by calling recycle_amqp_pool(), or empty_amqp_pool()
[in] | original | the table to copy |
[in,out] | clone | the table to copy to |
[in] | pool | the initialized memory pool to do allocations for the table from |
int amqp_table_entry_cmp | ( | void const * | entry1, |
void const * | entry2 | ||
) |
Compare two table entries.
Works just like strcmp(), comparing the two table entries' keys, then datatypes, then values
[in] | entry1 | the entry on the left |
[in] | entry2 | the entry on the right |
int amqp_tune_connection | ( | amqp_connection_state_t | state, |
int | channel_max, | ||
int | frame_max, | ||
int | heartbeat | ||
) |
Tune client side parameters.
This function changes channel_max, frame_max, and heartbeat parameters, on the client side only. It does not try to renegotiate these parameters with the broker. Using this function will lead to unexpected results.
[in] | state | the connection object |
[in] | channel_max | the maximum number of channels. The largest this can be is 65535 |
[in] | frame_max | the maximum size of a frame. The smallest this can be is 4096. The largest this can be is 2147483647. Unless you know what you're doing, the recommended size is 131072 or 128KB |
[in] | heartbeat | the number of seconds between heartbeats |
char const* amqp_version | ( | void | ) |
Returns the rabbitmq-c version as a string.
uint32_t amqp_version_number | ( | void | ) |
Returns the rabbitmq-c version as a packed integer.
See AMQP_VERSION
void empty_amqp_pool | ( | amqp_pool_t * | pool | ) |
Empties an amqp memory pool.
Releases all memory associated with an allocation pool
[in] | pool | the amqp_pool_t to empty |
void init_amqp_pool | ( | amqp_pool_t * | pool, |
size_t | pagesize | ||
) |
Initializes an amqp_pool_t memory allocation pool for use.
Readies an allocation pool for use. An amqp_pool_t must be initialized before use
[in] | pool | the amqp_pool_t structure to initialize. Calling this function on a pool that has already been initialized will result in undefined behavior |
[in] | pagesize | the unit size that the pool will allocate memory chunks in. Anything allocated against the pool with a requested size will be carved out of a block this size. Allocations larger than this will be allocated individually |
void recycle_amqp_pool | ( | amqp_pool_t * | pool | ) |
Recycles an amqp_pool_t memory allocation pool.
Recycles the space allocated by the pool
This invalidates all allocations made against the pool before this call is made, any use of any allocations made before recycle_amqp_pool() is called will result in undefined behavior.
Note: this may or may not release memory, to force memory to be released call empty_amqp_pool().
[in] | pool | the amqp_pool_t to recycle |
const amqp_array_t amqp_empty_array |
Empty table array structure.
const amqp_bytes_t amqp_empty_bytes |
Empty bytes structure.
const amqp_table_t amqp_empty_table |
Empty table structure.