Skip to content

AmqpConnection

class AmqpConnection : public std::enable_shared_from_this<AmqpConnection>

AmqpConnection

@brief Wraps the AMQP::TcpConnection and the single threaded io context where all calls to the amqp library must run

Types

Name Description
State Connection states

Type Aliases

Name Description
ChannelCreationHandler Handler for asyncCreateChannel Either returns the channel or (if returned channel pointer is empty) state the failure reason.

Variables

Name Description
m_registeredClients Track clients to inform about reconnections

Functions

Name Description
AmqpConnection Constructing a connection and starting the thread of the io context urls : vector of broker urls to try to connect to in asyncConnect (throws karabo::util::NetworkException if vector is empty)
getCurrentUrl Return currently used broker URL (either already connected to it or the currently/next tried one)
isConnected Whether connection established
connectionInfo Various info about internal connection (for debug logs)
asyncConnect Asynchronously connect to any of the broker addresses passed to the constructor.
asyncCreateChannel Trigger creation of an amqp channel and return it via the handler.
registerForReconnectInfo Register client to be informed about re-established connection after connection loss
cleanReconnectRegistrations Clean clients registered to receive reconnect info, i.e. remove all dangling weak pointers Can e.g. be called in the destructor of a client that registered before.
post Post a task to the io context The task must not contain blocking code since otherwise the thread running the AMQP communication is blocked.
dispatch Detach a task to the io context, i.e. run it now or later depending on which thread we are in The task must not contain blocking code since otherwise the thread running the AMQP communication is blocked.
doAsyncConnect Helper to asyncConnect iterating over urls until success or all urls tried.
informReconnection Must run in io context
triggerReconnection Must run in io context
stateString Convert State to a string (or rather const char*)

Type Alias Details

ChannelCreationHandler

using ChannelCreationHandler = std::function<void(const std::shared_ptr<AMQP::Channel>&, const std::string& errMsg)>

Handler for asyncCreateChannel

Either returns the channel or (if returned channel pointer is empty) state the failure reason.

Variable Details

m_registeredClients

std::set<std::weak_ptr<AmqpClient>, std::owner_less<std::weak_ptr<AmqpClient>>> m_registeredClients

Track clients to inform about reconnections

Function Details

AmqpConnection

AmqpConnection(std::vector<std::string> urls)

Constructing a connection and starting the thread of the io context

urls : vector of broker urls to try to connect to in asyncConnect (throws karabo::util::NetworkException if vector is empty)

asyncConnect

void asyncConnect(AsyncHandler onComplete)

Asynchronously connect to any of the broker addresses passed to the constructor.

Addresses will be tried in the order they have been passed. Can be called from any thread.

onComplete : AsyncHAndler called (not from within asyncConnect) about success or failure of connection attempt. If all addresses failed, the error code passed is the one of the last address passed to the constructor. The handler (if valid) will be called from within the internal io context, but not within the scope of asyncConnect.

asyncCreateChannel

void asyncCreateChannel(ChannelCreationHandler onComplete)

Trigger creation of an amqp channel and return it via the handler.

If not connected yet, try to connect first. Note that if connection lost, channel creation will not be tried again, but failure is reported.

Can be called from any thread.

onComplete : A valid (!) ChannelCreationHandler that will be called from within the internal io context, but not within the scope of asyncCreateChannel.

cleanReconnectRegistrations

void cleanReconnectRegistrations()

Clean clients registered to receive reconnect info, i.e. remove all dangling weak pointers

Can e.g. be called in the destructor of a client that registered before.

connectionInfo

std::string connectionInfo() const

Various info about internal connection (for debug logs)

dispatch

template <typename CompletionToken> void dispatch(CompletionToken&& task) const

Detach a task to the io context, i.e. run it now or later depending on which thread we are in

The task must not contain blocking code since otherwise the thread running the AMQP communication is blocked.

doAsyncConnect

void doAsyncConnect()

Helper to asyncConnect iterating over urls until success or all urls tried. Then calls m_onConnectionComplete.

Prerequisite: m_urlIndex < m_urls.size()

getCurrentUrl

std::string getCurrentUrl() const

Return currently used broker URL (either already connected to it or the currently/next tried one)

informReconnection

void informReconnection(const boost::system::error_code& ec)

Must run in io context

isConnected

bool isConnected() const

Whether connection established

post

template <typename CompletionToken> void post(CompletionToken&& task) const

Post a task to the io context

The task must not contain blocking code since otherwise the thread running the AMQP communication is blocked.

registerForReconnectInfo

void registerForReconnectInfo(std::weak_ptr<AmqpClient> client)

Register client to be informed about re-established connection after connection loss

stateString

const char* stateString(AmqpConnection::State state)

Convert State to a string (or rather const char*)

triggerReconnection

void triggerReconnection()

Must run in io context