Broker-based Communication
Karabo’s distributed communication relies on message brokers for housekeeping (heartbeats, slow‑control parameters) and on point‑to‑point TCP/IP data pipelines for high‑throughput data (images, digitizer or pulse‑resolved data). Connection details are exchanged via the broker.
Data Pipelines
- Each device can define multiple output and input channels,
identified by
"<device_id>:<channel_name>", e.g.,A/DEVICE/WITH_OUTPUT:output_1. - Output → Input connections are configured by listing desired output‑channel IDs in each input channel’s configuration. Karabo auto‑establishes TCP connections when both devices are active.
- Data is sent as a Karabo Hash plus metadata (common timestamp, data source = output channel ID).
- Handlers on input channels:
- data handler:
fn(data, metadata)invoked per data item. - input handler:
fn(channel)must loop over available data items (batch size per configuration). - end-of-stream handler:
fn(channel)notified when the sender signals end‑of‑stream. - In C++/Python, device methods
writeChannel,signalEndOfStream(and underlying channelwrite,update,signalEndOfStream) are not thread‑safe and must not be called concurrently for the same channel. - Scatter/gather supported: many inputs ← one output; many outputs → one input.
Channel Configurations
Define channels in the device schema (e.g. in C++/Python expectedParameters).
You can also create channels dynamically, but for unknown future outputs it’s
recommended to use remote().registerChannelMonitor().
Input Channel Properties
- connectedOutputChannels (VECTOR_STRING)
List of output channel IDs to receive from, e.g.
["A/DEVICE/WITH_OUTPUT:output_1"]. - dataDistribution (STRING)
copy(default): each input receives all data.shared: data items are load‑distributed among all inputs in this mode.- onSlowness (STRING) (only in
copymode)
Policy if this input is too slow: drop(default since 2.10.0): drop new items.queueDrop(since 2.10.0): buffer items; if full, drop oldest.- minData (UINT32)
Minimum items per call in an input handler.
Default 1;0=all available;0xFFFFFFFF=unbounded. - respondToEndOfStream (BOOL)
Whether to invoke the end‑of‑stream handler. Defaulttrue. - delayOnInput (INT32)
Milliseconds to delay before signalling readiness for more data. Default 0.
Output Channel Properties
- distributionMode (STRING)
(applies when inputs useshareddistribution) load-balanced(default): send to any ready input.round-robin: cycle through inputs in order.- noInputShared (STRING)
(if nosharedinputs are ready) drop(default since 2.10.0): drop new items.queueDrop(since 2.10.0): buffer items; if full, drop oldest.- hostname (STRING)
Address for inputs to connect to.
Default"default"(device’s host); set to a specific interface if needed. - port (UINT32)
TCP port for inputs. Default 0 (auto‑assigned); if nonzero, must be free.
Schema Description of Channels
Although Karabo can send arbitrary hash structures, you should:
- Output channels: declare the schema of sent data (required for DAQ storage or GUI visualization).
- Input channels: declare expected data schemas.
In C++/Python, specify schemas in the device’s expectedParameters()
when defining channels.