Skip to content

AmqpBroker

class AmqpBroker : public Broker

Functions

Name Description
KARABO_CLASSINFO AmqpBroker operates currently with the following set of ... Signals are sent to the exchange ... ------- exchange = .signals routing_key = . <-- selector queue = <-- common queue the signals are emitted to the exchange bound via routing_key to the queue. The slotInstanceIds should subscribe to the AMQP::topic type exchange with the 'routing_key' and queue = Special case of above signals... signalHeartbeat ... ------------ ---------------- exchange = .signals routing_key = .signalHeartbeat queue = Calls, commands, requests, replies are sent to ---------------------------------- exchange = .slots routing_key = queue = <-- common queue all requests/calls/replies to the device send to this exchange The further message dispatching to slots is provided by using info in message header. Broadcast messages should be sent to ... ------------------ exchange = .global_slots routing_key = "" queue = there is a way of implementing "broadcast" messages like in JmsBroker. In JMS it was enough to use "
defaultQueueArgs Fill argument with default AMQP message queue creation arguments
startReading AMQP subscription: subscribe to the following exchanges... "m_domain.slots" with routingKey m_instanceId "m_domain.global_slots" handler : - success handler errorNotifier : - error handler
startReadingHeartbeats Heartbeat is used for tracking instances (tracking all instances or no tracking at all) AMQP subscription Subscribe to the exchange "m_domain.signals" with the routing key: "*.signalHeartbeat" heartbeats of all known connections handler : - success handler errorNotifier : - error handler
write Write message to broker, blocks until written topic : Either the "domain" as passed to the Broker base class, the "domain" with the suffix "_beats", or "karaboGuiDebug" header : of the message - must contain body : of the message

Function Details

KARABO_CLASSINFO

KARABO_CLASSINFO(AmqpBroker, "amqp", "2.0")

AmqpBroker operates currently with the following set of ...

Signals are sent to the exchange ...


exchange = .signals routing_key = . <-- selector queue = <-- common queue

the signals are emitted to the exchange bound via routing_key to the queue. The slotInstanceIds should subscribe to the AMQP::topic type exchange with the 'routing_key' and queue =

Special case of above signals... signalHeartbeat ... ------------ ---------------- exchange = .signals routing_key = .signalHeartbeat queue =

Calls, commands, requests, replies are sent to


exchange = .slots routing_key = queue = <-- common queue

all requests/calls/replies to the device send to this exchange The further message dispatching to slots is provided by using info in message header.

Broadcast messages should be sent to ...


exchange = .global_slots routing_key = "" queue =

there is a way of implementing "broadcast" messages like in JmsBroker. In JMS it was enough to use "|*|" in header's slotInstanceIds. In AMQP we have to be subscribed to such exchange (to receive broadcast messages). Known global slots: slotInstanceNew -- to announce the new device in Karabo network slotInstanceUpdated -- to announce the device info to be updated slotInstanceGone -- to announce device death, slotPing -- to trigger sending their status by all devices received such message

GUI debug


exchange = .karaboGuiDebug routing_key = "" queue =

GUI debugging channel

defaultQueueArgs

static void defaultQueueArgs(AMQP::Table& args)

Fill argument with default AMQP message queue creation arguments

startReading

void startReading(const consumer::MessageHandler& handler, const consumer::ErrorNotifier& errorNotifier = consumer::ErrorNotifier()) override

AMQP subscription: subscribe to the following exchanges... "m_domain.slots" with routingKey m_instanceId "m_domain.global_slots"

handler : - success handler

errorNotifier : - error handler

startReadingHeartbeats

void startReadingHeartbeats( const consumer::MessageHandler& handler, const consumer::ErrorNotifier& errorNotifier = consumer::ErrorNotifier()) override

Heartbeat is used for tracking instances (tracking all instances or no tracking at all)

AMQP subscription Subscribe to the exchange "m_domain.signals" with the routing key: "*.signalHeartbeat" heartbeats of all known connections

handler : - success handler

errorNotifier : - error handler

write

void write(const std::string& topic, const karabo::data::Hash::Pointer& header, const karabo::data::Hash::Pointer& body) override

Write message to broker, blocks until written

topic : Either the "domain" as passed to the Broker base class, the "domain" with the suffix "_beats", or "karaboGuiDebug"

header : of the message - must contain

body : of the message