AmqpConnection
class AmqpConnection : public std::enable_shared_from_this<AmqpConnection>
AmqpConnection
Wraps the AMQP::TcpConnection and the single-threaded io context in which all calls to the amqp library must run.
Types
| Name | Description |
|---|---|
| State | Connection states |
Type Aliases
| Name | Description |
|---|---|
| ChannelCreationHandler | Handler for asyncCreateChannel. Either passes the channel or, if the channel pointer is empty, states the failure reason. |
Variables
| Name | Description |
|---|---|
| m_registeredClients | Track clients to inform about reconnections |
Functions
| Name | Description |
|---|---|
| AmqpConnection | Constructs a connection and starts the thread of the io context. urls: vector of broker URLs to try to connect to in asyncConnect (throws karabo::util::NetworkException if the vector is empty). |
| getCurrentUrl | Return currently used broker URL (either already connected to it or the currently/next tried one) |
| isConnected | Whether the connection is established |
| connectionInfo | Various info about internal connection (for debug logs) |
| asyncConnect | Asynchronously connect to any of the broker addresses passed to the constructor. |
| asyncCreateChannel | Trigger creation of an amqp channel and return it via the handler. |
| registerForReconnectInfo | Register client to be informed about re-established connection after connection loss |
| cleanReconnectRegistrations | Clean clients registered to receive reconnect info, i.e. remove all dangling weak pointers. Can e.g. be called in the destructor of a client that registered before. |
| post | Post a task to the io context. The task must not contain blocking code, since otherwise the thread running the AMQP communication is blocked. |
| dispatch | Dispatch a task to the io context, i.e. run it now or later depending on which thread we are in. The task must not contain blocking code, since otherwise the thread running the AMQP communication is blocked. |
| doAsyncConnect | Helper for asyncConnect that iterates over the URLs until one succeeds or all have been tried. |
| informReconnection | Must run in io context |
| triggerReconnection | Must run in io context |
| stateString | Convert State to a string (or rather const char*) |
Type Alias Details
ChannelCreationHandler
using ChannelCreationHandler = std::function<void(const std::shared_ptr<AMQP::Channel>&, const std::string& errMsg)>
Handler for asyncCreateChannel.
Either passes the channel or, if the channel pointer is empty, states the failure reason in errMsg.
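A minimal sketch of how a handler of this shape might tell success from failure. `Channel` is a stand-in struct so the example compiles without AMQP-CPP, and `CreationResult` and `makeHandler` are hypothetical names introduced only for illustration.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>

// Stand-in for AMQP::Channel (assumption, so the sketch needs no AMQP-CPP headers).
struct Channel {};

// Same shape as the documented alias, but using the stand-in Channel type.
using ChannelCreationHandler =
      std::function<void(const std::shared_ptr<Channel>&, const std::string& errMsg)>;

// Hypothetical record of what the handler observed.
struct CreationResult {
    bool success = false;
    std::string error;
};

// Builds a handler that stores the outcome in 'result'.
// An empty channel pointer signals failure; errMsg then carries the reason.
inline ChannelCreationHandler makeHandler(CreationResult& result) {
    return [&result](const std::shared_ptr<Channel>& channel, const std::string& errMsg) {
        if (channel) {
            result.success = true;
            result.error.clear();
        } else {
            result.success = false;
            result.error = errMsg;
        }
    };
}
```

Since the handler is documented to run in the internal io context, real handler code should return quickly and hand any long-running work to another thread.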
Variable Details
m_registeredClients
std::set<std::weak_ptr<AmqpClient>, std::owner_less<std::weak_ptr<AmqpClient>>> m_registeredClients
Track clients to inform about reconnections
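The container type matters here: `std::weak_ptr` has no `operator<`, so a `std::set` of weak pointers needs `std::owner_less`, which orders entries by their control block and stays valid after the pointee is gone. The sketch below illustrates this with a stand-in `Client` struct; `ClientSet` and `registerClient` are hypothetical names, not part of the documented class.

```cpp
#include <cassert>
#include <memory>
#include <set>

// Stand-in for AmqpClient (assumption; the real class is not shown in this section).
struct Client {};

// Same container shape as m_registeredClients, with the stand-in type.
using ClientSet = std::set<std::weak_ptr<Client>, std::owner_less<std::weak_ptr<Client>>>;

// Inserts a client; returns true if it was not registered before.
// Two weak pointers to the same object compare equivalent under owner_less,
// so registering the same client twice keeps a single entry.
inline bool registerClient(ClientSet& clients, const std::shared_ptr<Client>& client) {
    return clients.insert(std::weak_ptr<Client>(client)).second;
}
```

Entries are not removed automatically when a client is destroyed; the expired weak pointer stays in the set until it is erased explicitly.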
Function Details
AmqpConnection
AmqpConnection(std::vector<std::string> urls)
Constructs a connection and starts the thread of the io context.
urls
: vector of broker URLs to try to connect to in asyncConnect
(throws karabo::util::NetworkException if the vector is empty)
asyncConnect
void asyncConnect(AsyncHandler onComplete)
Asynchronously connect to any of the broker addresses passed to the constructor.
Addresses will be tried in the order they have been passed. Can be called from any thread.
onComplete
: AsyncHandler that reports success or failure of the connection attempt.
If all addresses failed, the error code passed is that of the last address
passed to the constructor.
The handler (if valid) will be called from within the internal io context,
but not within the scope of asyncConnect.
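The failover rule described above (try the addresses in order, stop at the first success, otherwise report the error of the last address) can be sketched as follows. This is a simplified synchronous model, not the asynchronous implementation: `ConnectAttempt`, `ConnectOutcome` and `connectToAny` are hypothetical names, and `std::error_code` stands in for whatever error type AsyncHandler actually receives.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <system_error>
#include <vector>

// Hypothetical single connection attempt: returns a default (zero) error code on success.
using ConnectAttempt = std::function<std::error_code(const std::string& url)>;

struct ConnectOutcome {
    std::error_code ec;    // success, or the error of the last URL tried
    std::size_t urlIndex;  // index of the URL that succeeded, or urls.size() if none did
};

// Tries the URLs in the order given until one succeeds.
// Prerequisite (mirroring the constructor): urls must not be empty.
inline ConnectOutcome connectToAny(const std::vector<std::string>& urls,
                                   const ConnectAttempt& attempt) {
    ConnectOutcome outcome{std::make_error_code(std::errc::invalid_argument), urls.size()};
    for (std::size_t i = 0; i < urls.size(); ++i) {
        outcome.ec = attempt(urls[i]);  // overwritten each round, so the last failure wins
        if (!outcome.ec) {
            outcome.urlIndex = i;
            break;
        }
    }
    return outcome;
}
```

Because each attempt overwrites the stored error, a caller that sees a failure only learns why the last address failed, not why the earlier ones did.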
asyncCreateChannel
void asyncCreateChannel(ChannelCreationHandler onComplete)
Trigger creation of an amqp channel and return it via the handler.
If not connected yet, tries to connect first. Note that if the connection is lost, channel creation is not retried; the failure is reported instead.
Can be called from any thread.
onComplete
: A valid (!) ChannelCreationHandler that will be called from within the internal io context,
but not within the scope of asyncCreateChannel.
cleanReconnectRegistrations
void cleanReconnectRegistrations()
Clean clients registered to receive reconnect info, i.e. remove all dangling weak pointers.
Can e.g. be called in the destructor of a client that registered before.
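A minimal sketch of what removing dangling weak pointers from such a set could look like, assuming a stand-in `ReconnectClient` type in place of AmqpClient. `RegistrationSet` and `removeExpired` are hypothetical names, and the real method may well differ (for example in locking or in which thread it runs on).

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <set>

// Stand-in for AmqpClient (assumption).
struct ReconnectClient {};

using RegistrationSet = std::set<std::weak_ptr<ReconnectClient>,
                                 std::owner_less<std::weak_ptr<ReconnectClient>>>;

// Erases every expired entry and returns how many were removed.
inline std::size_t removeExpired(RegistrationSet& clients) {
    std::size_t removed = 0;
    for (auto it = clients.begin(); it != clients.end();) {
        if (it->expired()) {
            it = clients.erase(it);  // erase returns the iterator to the next element
            ++removed;
        } else {
            ++it;
        }
    }
    return removed;
}
```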
connectionInfo
std::string connectionInfo() const
Various info about internal connection (for debug logs)
dispatch
template <typename CompletionToken> void dispatch(CompletionToken&& task) const
Dispatch a task to the io context, i.e. run it now or later depending on which thread we are in.
The task must not contain blocking code, since otherwise the thread running the AMQP communication is blocked.
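To illustrate the difference between dispatch (run now if already in the io context, otherwise later) and post (always later), here is a deliberately simplified, single-threaded model. `MiniContext` and `demoOrder` are hypothetical; this is neither the Karabo implementation nor Boost.Asio, and "inside run()" merely stands in for "on the io context thread".

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical single-threaded stand-in for an io context.
class MiniContext {
   public:
    using Task = std::function<void()>;

    // post: always queue the task; it runs later from run().
    void post(Task task) { m_queue.push_back(std::move(task)); }

    // dispatch: run immediately if already inside run(), otherwise queue like post.
    void dispatch(Task task) {
        if (m_insideRun) {
            task();
        } else {
            post(std::move(task));
        }
    }

    // Runs queued tasks in FIFO order until the queue is empty.
    void run() {
        m_insideRun = true;
        while (!m_queue.empty()) {
            Task task = std::move(m_queue.front());
            m_queue.pop_front();
            task();
        }
        m_insideRun = false;
    }

   private:
    std::deque<Task> m_queue;
    bool m_insideRun = false;
};

// Usage: records the order in which the tasks actually run.
inline std::vector<int> demoOrder() {
    MiniContext ctx;
    std::vector<int> order;
    ctx.post([&] {
        ctx.post([&] { order.push_back(3); });      // queued behind everything already waiting
        ctx.dispatch([&] { order.push_back(1); });  // inside run(): executes right away
        order.push_back(2);
    });
    ctx.dispatch([&] { order.push_back(4); });  // outside run(): queued, like post
    ctx.run();
    return order;  // 1, 2, 4, 3
}
```

In this model a task that blocks inside run() stalls every task queued behind it, which is the reason for the no-blocking rule stated above.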
doAsyncConnect
void doAsyncConnect()
Helper for asyncConnect that iterates over the URLs until one succeeds or all have been tried, then calls m_onConnectionComplete.
Prerequisite: m_urlIndex < m_urls.size()
getCurrentUrl
std::string getCurrentUrl() const
Return currently used broker URL (either already connected to it or the currently/next tried one)
informReconnection
void informReconnection(const boost::system::error_code& ec)
Must run in io context
isConnected
bool isConnected() const
Whether the connection is established
post
template <typename CompletionToken> void post(CompletionToken&& task) const
Post a task to the io context.
The task must not contain blocking code, since otherwise the thread running the AMQP communication is blocked.
registerForReconnectInfo
void registerForReconnectInfo(std::weak_ptr<AmqpClient> client)
Register client to be informed about re-established connection after connection loss
stateString
const char* stateString(AmqpConnection::State state)
Convert State to a string (or rather const char*)
triggerReconnection
void triggerReconnection()
Must run in io context