AmqpClient
class AmqpClient : public std::enable_shared_from_this<AmqpClient>
@brief Class that exposes an AMQP client
It will create a unique queue and consume from it exclusively and with automatic acknowledgment. Its queue name will start with the given instanceId and will potentially be suffixed by some characters to ensure uniqueness.
To actually receive messages via the handlers specified in the constructor, the client has to subscribe to exchanges, potentially with routing keys to select messages on the broker side.
Note
: This client does not know about Karabo "domains" (a.k.a. "topics"), i.e. exchanges and queues created are "shared" among all clients connected to the same broker.
Types
| Name | Description |
|---|---|
| ChannelStatus | Channel status tells what should be the next step to do in channel preparation |
| ExchangeStatus | Exchange status tells about the status of a known exchange |
| SubscriptionStatus | Subscription status tells in which status a registered subscription currently is |
Variables
| Name | Description |
|---|---|
| m_postponedPubMessages | Messages postponed since channel not yet ready or exchange not yet declared |
Functions
| Name | Description |
|---|---|
| AmqpClient | Create client with raw data interface from connection connection : the connection, all internal data access will run in its io context instanceId : the client id - will usually be the name of the queue that will be subscribed queueArgs : the arguments passed to queue creation readHandler : a read handler for all received messages (if an invalid function, must call setReadHandler before the first subscription) |
| setReadHandler | (Re-)set the read handler that will be called for all received messages readHandler : A valid read function (karabo::data::ParameterException if not valid) |
| asyncSubscribe | Asynchronously subscribes client If subscription is reported to have failed, it will be tried again - at next subscription or - if reviveIfReconnected() is called. |
| asyncUnsubscribe | Asynchronously unsubscribes client Note: Success will be reported for an unsubscription from exchange/routing key that it was not subscribed before exchange : name of AMQP exchange that will be unsubscribed from routingKey : the AMQP routing key to unsubscribe from onUnsubscriptionDone : a valid handler called in AMQP io context (so please no mutex inside, please) when unsubscription succeeded or failed |
| asyncUnsubscribeAll | Asynchronously unsubscribes client from all subscriptions onUnsubscriptionDone : a valid handler called in AMQP io context (so please no mutex inside, please) when all unsubscription requests are done. If any of them failed, the error code passed is the one of the last failure |
| asyncPublish | Asynchronously publish data exchange : the exchange... routingKey : ...and the routingkey for the data data : a raw data container fo the message to be published (must be non-zero pointer) onPublishDone : handler called in AMQP io context (so please no mutex inside, please) when data published |
| reviveIfReconnected | Revice after connection was lost and re-established Means to recreate channel, redo all subscriptions and publish postponed messages To be called if AmqpConnection is connected again after connection loss Must be called within io context of AmqpConnection |
| asyncPrepareChannel | Prepare m_channel until it is ChannelStatus::READY onChannelPrepared : handler called when m_channel READY or if failure on the way Must be called in the io context of the AmqpConnection |
| moveChannelState | Helper to move the created channel through its states, asynchronously calling itself. |
| doPublish | Helper to publish, must run in io context and only when channel is READY and exchange declared |
| queueMessage | Queue message (or drop if queueu too long), must run in io context |
| publishPostponed | Helper to publish postponed messages until first found with an exchange that is not yet declared |
Variable Details
m_postponedPubMessages
std::queue<PostponedMessage> m_postponedPubMessages
Messages postponed since channel not yet ready or exchange not yet declared
Function Details
AmqpClient
AmqpClient(AmqpConnection::Pointer connection, std::string instanceId, AMQP::Table queueArgs, ReadHandler readHandler)
Create client with raw data interface from connection
connection
: the connection, all internal data access will run in its io context
instanceId
: the client id - will usually be the name of the queue that will be subscribed
queueArgs
: the arguments passed to queue creation
readHandler
: a read handler for all received messages
(if an invalid function, must call setReadHandler before the first subscription)
asyncPrepareChannel
void asyncPrepareChannel(AsyncHandler onChannelPrepared)
Prepare m_channel until it is ChannelStatus::READY
onChannelPrepared
: handler called when m_channel READY or if failure on the way
Must be called in the io context of the AmqpConnection
asyncPublish
void asyncPublish(const std::string& exchange, const std::string& routingKey, const std::shared_ptr<std::vector<char>>& data, AsyncHandler onPublishDone)
Asynchronously publish data
exchange
: the exchange...
routingKey
: ...and the routingkey for the data
data
: a raw data container fo the message to be published (must be non-zero pointer)
onPublishDone
: handler called in AMQP io context (so please no mutex inside, please)
when data published
asyncSubscribe
void asyncSubscribe(const std::string& exchange, const std::string& routingKey, AsyncHandler onSubscriptionDone)
Asynchronously subscribes client
If subscription is reported to have failed, it will be tried again - at next subscription or - if reviveIfReconnected() is called.
exchange
: name of AMQP exchange that will be created if not yet existing
routingKey
: the AMQP routing key
onSubscriptionDone
: a valid handler called in AMQP io context (so please no mutex inside, please)
when subscription established or failed
asyncUnsubscribe
void asyncUnsubscribe(const std::string& exchange, const std::string& routingKey, AsyncHandler onUnsubscriptionDone)
Asynchronously unsubscribes client
Note: Success will be reported for an unsubscription from exchange/routing key that it was not subscribed before
exchange
: name of AMQP exchange that will be unsubscribed from
routingKey
: the AMQP routing key to unsubscribe from
onUnsubscriptionDone
: a valid handler called in AMQP io context (so please no mutex inside, please)
when unsubscription succeeded or failed
asyncUnsubscribeAll
void asyncUnsubscribeAll(AsyncHandler onUnsubscriptionsDone)
Asynchronously unsubscribes client from all subscriptions
onUnsubscriptionDone
: a valid handler called in AMQP io context (so please no mutex inside, please)
when all unsubscription requests are done. If any of them failed, the error
code passed is the one of the last failure
doPublish
void doPublish(const std::string& exchange, const std::string& routingKey, const std::shared_ptr<std::vector<char>>& data, const AsyncHandler& onPublishDone)
Helper to publish, must run in io context and only when channel is READY and exchange declared
moveChannelState
void moveChannelState()
Helper to move the created channel through its states, asynchronously calling itself. If READY (or failure), call and erase the m_channelPreparationCallback
publishPostponed
void publishPostponed()
Helper to publish postponed messages until first found with an exchange that is not yet declared
queueMessage
void queueMessage(PostponedMessage&& message)
Queue message (or drop if queueu too long), must run in io context
reviveIfReconnected
void reviveIfReconnected()
Revice after connection was lost and re-established
Means to recreate channel, redo all subscriptions and publish postponed messages
To be called if AmqpConnection is connected again after connection loss Must be called within io context of AmqpConnection
setReadHandler
void setReadHandler(ReadHandler readHandler)
(Re-)set the read handler that will be called for all received messages
readHandler
: A valid read function (karabo::data::ParameterException if not valid)