Skip to content

AmqpClient

class AmqpClient : public std::enable_shared_from_this<AmqpClient>

@brief Class that exposes an AMQP client

It will create a unique queue and consume from it exclusively and with automatic acknowledgment. Its queue name will start with the given instanceId and will potentially be suffixed by some characters to ensure uniqueness.

To actually receive messages via the handlers specified in the constructor, the client has to subscribe to exchanges, potentially with routing keys to select messages on the broker side.

Note

: This client does not know about Karabo "domains" (a.k.a. "topics"), i.e. exchanges and queues created are "shared" among all clients connected to the same broker.

Types

Name Description
ChannelStatus Channel status tells what should be the next step to do in channel preparation
ExchangeStatus Exchange status tells about the status of a known exchange
SubscriptionStatus Subscription status tells in which status a registered subscription currently is

Variables

Name Description
m_postponedPubMessages Messages postponed since channel not yet ready or exchange not yet declared

Functions

Name Description
AmqpClient Create client with raw data interface from connection connection : the connection, all internal data access will run in its io context instanceId : the client id - will usually be the name of the queue that will be subscribed queueArgs : the arguments passed to queue creation readHandler : a read handler for all received messages (if an invalid function, must call setReadHandler before the first subscription)
setReadHandler (Re-)set the read handler that will be called for all received messages readHandler : A valid read function (karabo::data::ParameterException if not valid)
asyncSubscribe Asynchronously subscribes client If subscription is reported to have failed, it will be tried again - at next subscription or - if reviveIfReconnected() is called.
asyncUnsubscribe Asynchronously unsubscribes client Note: Success will be reported for an unsubscription from exchange/routing key that it was not subscribed before exchange : name of AMQP exchange that will be unsubscribed from routingKey : the AMQP routing key to unsubscribe from onUnsubscriptionDone : a valid handler called in AMQP io context (so please no mutex inside, please) when unsubscription succeeded or failed
asyncUnsubscribeAll Asynchronously unsubscribes client from all subscriptions onUnsubscriptionDone : a valid handler called in AMQP io context (so please no mutex inside, please) when all unsubscription requests are done. If any of them failed, the error code passed is the one of the last failure
asyncPublish Asynchronously publish data exchange : the exchange... routingKey : ...and the routingkey for the data data : a raw data container fo the message to be published (must be non-zero pointer) onPublishDone : handler called in AMQP io context (so please no mutex inside, please) when data published
reviveIfReconnected Revice after connection was lost and re-established Means to recreate channel, redo all subscriptions and publish postponed messages To be called if AmqpConnection is connected again after connection loss Must be called within io context of AmqpConnection
asyncPrepareChannel Prepare m_channel until it is ChannelStatus::READY onChannelPrepared : handler called when m_channel READY or if failure on the way Must be called in the io context of the AmqpConnection
moveChannelState Helper to move the created channel through its states, asynchronously calling itself.
doPublish Helper to publish, must run in io context and only when channel is READY and exchange declared
queueMessage Queue message (or drop if queueu too long), must run in io context
publishPostponed Helper to publish postponed messages until first found with an exchange that is not yet declared

Variable Details

m_postponedPubMessages

std::queue<PostponedMessage> m_postponedPubMessages

Messages postponed since channel not yet ready or exchange not yet declared

Function Details

AmqpClient

AmqpClient(AmqpConnection::Pointer connection, std::string instanceId, AMQP::Table queueArgs, ReadHandler readHandler)

Create client with raw data interface from connection

connection : the connection, all internal data access will run in its io context

instanceId : the client id - will usually be the name of the queue that will be subscribed

queueArgs : the arguments passed to queue creation

readHandler : a read handler for all received messages (if an invalid function, must call setReadHandler before the first subscription)

asyncPrepareChannel

void asyncPrepareChannel(AsyncHandler onChannelPrepared)

Prepare m_channel until it is ChannelStatus::READY

onChannelPrepared : handler called when m_channel READY or if failure on the way

Must be called in the io context of the AmqpConnection

asyncPublish

void asyncPublish(const std::string& exchange, const std::string& routingKey, const std::shared_ptr<std::vector<char>>& data, AsyncHandler onPublishDone)

Asynchronously publish data

exchange : the exchange...

routingKey : ...and the routingkey for the data

data : a raw data container fo the message to be published (must be non-zero pointer)

onPublishDone : handler called in AMQP io context (so please no mutex inside, please) when data published

asyncSubscribe

void asyncSubscribe(const std::string& exchange, const std::string& routingKey, AsyncHandler onSubscriptionDone)

Asynchronously subscribes client

If subscription is reported to have failed, it will be tried again - at next subscription or - if reviveIfReconnected() is called.

exchange : name of AMQP exchange that will be created if not yet existing

routingKey : the AMQP routing key

onSubscriptionDone : a valid handler called in AMQP io context (so please no mutex inside, please) when subscription established or failed

asyncUnsubscribe

void asyncUnsubscribe(const std::string& exchange, const std::string& routingKey, AsyncHandler onUnsubscriptionDone)

Asynchronously unsubscribes client

Note: Success will be reported for an unsubscription from exchange/routing key that it was not subscribed before

exchange : name of AMQP exchange that will be unsubscribed from

routingKey : the AMQP routing key to unsubscribe from

onUnsubscriptionDone : a valid handler called in AMQP io context (so please no mutex inside, please) when unsubscription succeeded or failed

asyncUnsubscribeAll

void asyncUnsubscribeAll(AsyncHandler onUnsubscriptionsDone)

Asynchronously unsubscribes client from all subscriptions

onUnsubscriptionDone : a valid handler called in AMQP io context (so please no mutex inside, please) when all unsubscription requests are done. If any of them failed, the error code passed is the one of the last failure

doPublish

void doPublish(const std::string& exchange, const std::string& routingKey, const std::shared_ptr<std::vector<char>>& data, const AsyncHandler& onPublishDone)

Helper to publish, must run in io context and only when channel is READY and exchange declared

moveChannelState

void moveChannelState()

Helper to move the created channel through its states, asynchronously calling itself. If READY (or failure), call and erase the m_channelPreparationCallback

publishPostponed

void publishPostponed()

Helper to publish postponed messages until first found with an exchange that is not yet declared

queueMessage

void queueMessage(PostponedMessage&& message)

Queue message (or drop if queueu too long), must run in io context

reviveIfReconnected

void reviveIfReconnected()

Revice after connection was lost and re-established

Means to recreate channel, redo all subscriptions and publish postponed messages

To be called if AmqpConnection is connected again after connection loss Must be called within io context of AmqpConnection

setReadHandler

void setReadHandler(ReadHandler readHandler)

(Re-)set the read handler that will be called for all received messages

readHandler : A valid read function (karabo::data::ParameterException if not valid)