
OutputChannel

class OutputChannel : public std::enable_shared_from_this<OutputChannel>

@brief An OutputChannel for passing data to pipelined processing

The OutputChannel class is used for writing data to pipelined processing inputs. It supports tracking of meta data for each data token written to it. Specifically, it allows for keeping track of data producers, here called sources, and of timing and train information. This meta data enables aggregation of multiple data sources into one output channel interaction with a remote host, as well as aggregation of multiple train-related data of the same source. A mixture of both scenarios is possible.

An example of these use cases:

OutputChannel::Pointer output = ... //

Hash data1;
...
OutputChannel::MetaData meta1("THIS/IS/SOURCE/A/channel1", karabo::data::Timestamp());
output->write(data1, meta1);

Hash data2_10;
...
OutputChannel::MetaData meta2_10("THIS/IS/SOURCE/B/channel2", timestampForTrain10);
output->write(data2_10, meta2_10);
Hash data2_11;
...
OutputChannel::MetaData meta2_11("THIS/IS/SOURCE/B/channel2", timestampForTrain11);
output->write(data2_11, meta2_11);

Hash data_this_source;
...
// not passing any meta data to write will default the source to [deviceId]/[channelName]
// and the timestamp to the current timestamp
output->write(data_this_source);

// now actually send over the network
output->update();
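
The buffering behaviour of the example above (tokens from several sources and trains are collected by write(..) and only leave the channel on update()) can be sketched without Karabo. ToyOutputChannel, its Token struct and the use of a plain train number instead of a Timestamp are hypothetical stand-ins for illustration only, not part of the real API:

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for OutputChannel; not part of Karabo.
class ToyOutputChannel {
   public:
    // One buffered data token together with its meta data.
    struct Token {
        std::string source;
        unsigned long long trainId;
        std::string payload;
    };

    ToyOutputChannel(std::string deviceId, std::string channelName)
        : m_defaultSource(std::move(deviceId) + "/" + std::move(channelName)) {}

    // Buffer a token with explicit source and train number.
    void write(const std::string& payload, const std::string& source, unsigned long long trainId) {
        m_buffer.push_back({source, trainId, payload});
    }

    // Buffer a token with the default source [deviceId]/[channelName];
    // train number 0 stands in for "current timestamp" in this toy.
    void write(const std::string& payload) {
        write(payload, m_defaultSource, 0ull);
    }

    // "Send" everything buffered so far; returns the number of tokens sent.
    std::size_t update() {
        const std::size_t n = m_buffer.size();
        m_sent.insert(m_sent.end(), m_buffer.begin(), m_buffer.end());
        m_buffer.clear();
        return n;
    }

    const std::vector<Token>& sent() const { return m_sent; }

   private:
    std::string m_defaultSource;
    std::vector<Token> m_buffer;
    std::vector<Token> m_sent;
};
```

In this toy, four calls to write(..) followed by one update() hand over four tokens at once, and a token written without meta data carries the source "dev/output" if the channel was created with ("dev", "output").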

Functions

Name Description
expectedParameters Necessary method as part of the factory/configuration system expected : [out] Description of expected parameters for this object (Schema)
OutputChannel If this object is constructed using the factory/configuration system this method is called.
OutputChannel Recommended constructor, allowing guaranteed-to-work initialization.
initialize "Second constructor", to be called after construction with second argument autoInit == 0.
getInitialConfiguration returns the initial readonly configuration parameters Returns a Hash containing the initial information that should not be updated via ShowConnectionHandler and ShowStatisticsHandler.
getInstanceIdName Concatenation of instance id and name
hasRegisteredCopyInputChannel Check whether an InputChannel with given id is registered to receive all data i.e. an InputChannel with "dataDistribution == copy" instanceId : of InputChannel Return : bool whether InputChannel of specified type is connected
hasRegisteredSharedInputChannel Check whether an InputChannel with given id is registered to receive a share of the data i.e. an InputChannel with "dataDistribution == shared" instanceId : of InputChannel Return : bool whether InputChannel of specified type is connected
write Writes a Hash containing data to the output channel.
write Writes a Hash containing data to the output channel.
write Writes a Hash containing data to the output channel.
write Writes a Hash containing data to the output channel.
update Update the output channel, i.e. send all data over the wire that was previously written by calling write(...). This is a synchronous method, i.e. blocks until all data is actually sent (or dropped or queued). safeNDArray : boolean to indicate whether all NDArrays inside the Hash passed to write(..) before are 'safe', i.e. their memory will not be referred to elsewhere after update is finished. Default is 'false', 'true' can avoid safety copies of NDArray content when data is queued or sent locally. Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.
asyncUpdate Semi-asynchronously update the output channel, i.e. start asynchronous sending of data over the wire that was previously written to the output channel's buffer by calling write(...), but block as long as required to really start sending. The start of sending data is delayed - for any connected input channel that is currently not ready to receive more data, but is configured with "dataDistribution" as "copy" and with "onSlowness" as "wait", - or if none of the connected input channels that are configured with "dataDistribution" as "shared" are currently ready to receive data and if this output channel is configured with "noInputShared" as "wait". safeNDArray : boolean to indicate whether all NDArrays inside the Hash passed to write(..) before are 'safe', i.e. their memory will not be referred to elsewhere before 'writeDoneHandler' is called. Default is 'false', 'true' can avoid safety copies of NDArray content when data is queued or sent locally. writeDoneHandler : callback when data (that is not queued) has been sent and thus even NDArray data inside it can be re-used again (except if safeNDArray was set to 'true' in which case its memory may still be used in a queue). Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.
asyncUpdateNoWait Expert method Asynchronously update the output channel, i.e. asynchronously send all data over the wire that was previously written by calling write(...) without any blocking. This method must not be called again before either 'readyForNextHandler' or 'writeDoneHandler' have been called. If next data should be sent, but neither handler has been called yet, one has to block or skip the data. In the latter case, the wish of a connected input channel that is configured to make the output "wait" if not ready, is ignored (policy violation). Both handlers have to be valid function pointers. readyForNextHandler : callback when asyncUpdateNoWait may be called again (this can only be delayed if any blocking input channel ["wait"] is connected) writeDoneHandler : callback when sending is finished (as confirmed by Tcp) or stopped due to disconnection, or data is internally queued. So now all NDArray inside the Hash passed to write(..) before can be re-used again (except if safeNDArray was set to 'true' in which case its memory may still be used in a queue) safeNDArray : boolean to indicate whether all NDArrays inside the Hash passed to write(..) before are 'safe', i.e. their memory will not be referred to elsewhere after update is finished. False triggers a data copy if data needs to be queued. TODO: Provide a handler called when sending data is completed, including any queued data, and thus NDArray data can be re-used again even if safeNDArray=false (i.e. buffers could be re-used). Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.
signalEndOfStream Synchronously send end-of-stream (EOS) notification to all connected input channels to indicate a logical break in the data stream.
asyncSignalEndOfStream Asynchronously send end-of-stream (EOS) notification to all connected input channels to indicate a logical break in the data stream.
registerSharedInputSelector Register handler that selects which of the connected input channels that have dataDistribution = "shared" is to be served.
disable Shut down all underlying connections, object will not be usable afterwards.
eraseOldChannel Erase instance with 'instanceId' from 'channelContainer' if existing - if same as 'newChannel', do not close
pushShareNext Helper to indicate that given shared input is ready to receive more data Requires m_registeredInputsMutex to be locked
popShareNext Helper to provide id of shared input that is ready to receive more data Requires m_registeredInputsMutex to be locked
isShareNextEmpty Helper to tell whether none of the shared inputs is ready to receive more data Requires m_registeredInputsMutex to be locked
hasSharedInput Helper to query whether given shared input is ready to receive more data Requires m_registeredInputsMutex to be locked
eraseSharedInput Helper to indicate that given shared input is currently not ready to receive more data Requires m_registeredInputsMutex to be locked
pushCopyNext Helper to indicate that given copy input is ready to receive more data Requires m_registeredInputsMutex to be locked
eraseCopyInput Erase instance from container of copy channels that are ready to receive data Requires m_registeredInputsMutex to be locked instanceId : Return : whether instanceId could be removed (i.e. was actually ready to receive)
updateChunkId helper to set new m_chunkId Return : true if new m_chunkId is valid (i.e. not equal m_invalidChunkId)
ensureValidChunkId helper for asyncUpdate() to ensure that at the end m_chunkId is valid - may block a while lockOfRegisteredInputsMutex : a scoped_lock locking m_registeredInputsMutex (may be unlocked and locked again during execution)
asyncPrepareCopy Figure out how to treat copy inputs, return via appending to reference arguments Requires m_registeredInputsMutex to be locked
asyncPrepareDistribute Figure out how to treat shared inputs, return via (appending to) reference arguments Requires m_registeredInputsMutex to be locked
asyncPrepareDistributeEos Figure out how to send EndOfStream for shared outputs, return via reference arguments Requires m_registeredInputsMutex to be locked Return : whether to queue for shared queue
asyncPrepareDistributeSelected Figure out how to treat shared inputs if sharedInputSelector is registered Requires m_registeredInputsMutex to be locked
asyncPrepareDistributeLoadBal Figure out how to treat shared inputs when load-balancing Requires m_registeredInputsMutex to be locked
resetSendOngoing Helper that sets the sendOngoing flag to false for given instanceId
asyncSendOne Helper to asynchronously send chunk data to channel in given channelInfo chunkId : The chunk to send channelInfo : Container with info about channel to send to doneHandler : Callback when sending done or failed
awaitUpdateFuture Helper for waiting for future that in case of long delay adds a thread to unblock Throws TimeoutException if not unblocked after two minutes.
debugId Provide a string identifying this output channel (useful in DEBUG logging)

Function Details

OutputChannel

explicit OutputChannel(const karabo::data::Hash& config)

If this object is constructed using the factory/configuration system this method is called.

The initialize() method must not be called if constructed this way.

Deprecated: Tcp server initialization is triggered, but there is no control when and whether it succeeded. So better use the constructor with additional int argument (and set it to zero).

config : Validated (@see expectedParameters) and default-filled configuration

OutputChannel(const karabo::data::Hash& config, int autoInit)

Recommended constructor, allowing guaranteed-to-work initialization.

The recommended way to call it is via the Configurator and with autoInit == 0, followed by calling initialize():

Hash cfg;
OutputChannel::Pointer output = Configurator<OutputChannel>::create("OutputChannel", cfg, 0);
output->initialize();

Caveat: Make sure you do not pass a 'bool' instead of an 'int' as argument to create(..) since then the other constructor is chosen and the value of the 'bool' determines whether to validate cfg or not.

config : Validated (@see expectedParameters) and default-filled configuration

autoInit : If set to 0 (strongly recommended), the constructor does not yet try to initiate the TCP server initialization and the initialize() method has to be called as "second constructor". The advantage is that the initialization cannot fail on busy systems and one has control when the server is available for remote connections. If autoInit != 0, this constructor behaves as the other constructor and initialize() must not be called.
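
The caveat about passing a 'bool' instead of an 'int' follows from ordinary C++ overload resolution. The sketch below reproduces it with two free functions whose parameter lists are assumed to mirror the factory's create(..) overloads; ToyConfig, the function bodies and the returned strings are hypothetical and only serve to show which overload is picked:

```cpp
#include <cassert>
#include <string>

struct ToyConfig {};  // hypothetical stand-in for karabo::data::Hash

// Stand-in for the overload without extra constructor argument:
// the third parameter decides whether the configuration is validated.
inline std::string create(const std::string& classId, const ToyConfig& cfg, const bool validate = true) {
    (void)classId;
    (void)cfg;
    return validate ? "one-arg constructor, validated" : "one-arg constructor, not validated";
}

// Stand-in for the overload that forwards one extra argument (here: autoInit).
template <typename A1>
std::string create(const std::string& classId, const ToyConfig& cfg, const A1& a1,
                   const bool validate = true) {
    (void)classId;
    (void)cfg;
    (void)a1;
    (void)validate;
    return "two-arg constructor";
}
```

With these definitions, create("OutputChannel", ToyConfig{}, 0) selects the template, because binding an int to const A1& is an exact match whereas int to bool needs a conversion. create("OutputChannel", ToyConfig{}, false) selects the non-template overload, since both candidates match exactly and the non-template one is preferred, so 'false' is silently interpreted as "do not validate".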

asyncPrepareCopy

void asyncPrepareCopy(unsigned int chunkId, std::vector<karabo::data::Hash*>& toSendImmediately, std::vector<karabo::data::Hash*>& toQueue, std::vector<karabo::data::Hash*>& toBlock)

Figure out how to treat copy inputs, return via appending to reference arguments

Requires m_registeredInputsMutex to be locked

asyncPrepareDistribute

void asyncPrepareDistribute(unsigned int chunkId, std::vector<karabo::data::Hash*>& toSendImmediately, std::vector<karabo::data::Hash*>& toQueue, std::vector<karabo::data::Hash*>& toBlock, bool& queue, bool& block)

Figure out how to treat shared inputs, return via (appending to) reference arguments

Requires m_registeredInputsMutex to be locked

asyncPrepareDistributeEos

bool asyncPrepareDistributeEos(unsigned int chunkId, std::vector<karabo::data::Hash*>& toSendImmediately, std::vector<karabo::data::Hash*>& toQueue, std::vector<karabo::data::Hash*>& toBlock)

Figure out how to send EndOfStream for shared outputs, return via reference arguments

Requires m_registeredInputsMutex to be locked

Return : whether to queue for shared queue

asyncPrepareDistributeLoadBal

void asyncPrepareDistributeLoadBal(unsigned int chunkId, std::vector<karabo::data::Hash*>& toSendImmediately, std::vector<karabo::data::Hash*>& toQueue, std::vector<karabo::data::Hash*>& toBlock, bool& queue, bool& block)

Figure out how to treat shared inputs when load-balancing

Requires m_registeredInputsMutex to be locked

asyncPrepareDistributeSelected

void asyncPrepareDistributeSelected(unsigned int chunkId, std::vector<karabo::data::Hash*>& toSendImmediately, std::vector<karabo::data::Hash*>& toQueue, std::vector<karabo::data::Hash*>& toBlock)

Figure out how to treat shared inputs if sharedInputSelector is registered

Requires m_registeredInputsMutex to be locked

asyncSendOne

void asyncSendOne(unsigned int chunkId, InputChannelInfo& channelInfo, std::function<void()>&& doneHandler)

Helper to asynchronously send chunk data to channel in given channelInfo

chunkId : The chunk to send

channelInfo : Container with info about channel to send to

doneHandler : Callback when sending done or failed

asyncSignalEndOfStream

void asyncSignalEndOfStream(std::function<void()>&& readyHandler)

Asynchronously send end-of-stream (EOS) notification to all connected input channels to indicate a logical break in the data stream.

readyHandler : callback when notification has been sent or queued

Thread safety: All the 'write(..)' methods, 'update()'/'asyncUpdate(cb)' and 'signalEndOfStream()'/'asyncSignalEndOfStream(cb)' must not be called concurrently.

asyncUpdate

void asyncUpdate(bool safeNDArray = false, std::function<void()>&& writeDoneHandler = {})

Semi-asynchronously update the output channel, i.e. start asynchronous sending of data over the wire that was previously written to the output channel's buffer by calling write(...), but block as long as required to really start sending. The start of sending data is delayed

- for any connected input channel that is currently not ready to receive more data, but is configured with "dataDistribution" as "copy" and with "onSlowness" as "wait",
- or if none of the connected input channels that are configured with "dataDistribution" as "shared" are currently ready to receive data and if this output channel is configured with "noInputShared" as "wait".

safeNDArray : boolean to indicate whether all NDArrays inside the Hash passed to write(..) before are 'safe', i.e. their memory will not be referred to elsewhere before 'writeDoneHandler' is called. Default is 'false', 'true' can avoid safety copies of NDArray content when data is queued or sent locally.

writeDoneHandler : callback when data (that is not queued) has been sent and thus even NDArray data inside it can be re-used again (except if safeNDArray was set to 'true' in which case its memory may still be used in a queue).

Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.

asyncUpdateNoWait

void asyncUpdateNoWait(std::function<void()>&& readyForNextHandler, std::function<void()>&& writeDoneHandler, bool safeNDArray)

Expert method

Asynchronously update the output channel, i.e. asynchronously send all data over the wire that was previously written by calling write(...) without any blocking.

This method must not be called again before either 'readyForNextHandler' or 'writeDoneHandler' have been called. If next data should be sent, but neither handler has been called yet, one has to block or skip the data. In the latter case, the wish of a connected input channel that is configured to make the output "wait" if not ready, is ignored (policy violation).

Both handlers have to be valid function pointers.

readyForNextHandler : callback when asyncUpdateNoWait may be called again (this can only be delayed if any blocking input channel ["wait"] is connected)

writeDoneHandler : callback when sending is finished (as confirmed by Tcp) or stopped due to disconnection, or data is internally queued. So now all NDArray inside the Hash passed to write(..) before can be re-used again (except if safeNDArray was set to 'true' in which case its memory may still be used in a queue)

safeNDArray : boolean to indicate whether all NDArrays inside the Hash passed to write(..) before are 'safe', i.e. their memory will not be referred to elsewhere after update is finished. False triggers a data copy if data needs to be queued.

TODO: Provide a handler called when sending data is completed, including any queued data, and thus NDArray data can be re-used again even if safeNDArray=false (i.e. buffers could be re-used).

Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.
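
The rule that asyncUpdateNoWait must not be called again before one of its handlers has fired can be enforced on the caller side with a flag. The following is a minimal sketch under stated assumptions: ToyAsyncChannel is a hypothetical stand-in that merely stores the handlers so they can be invoked by hand, and GuardedSender is an illustrative helper, not a Karabo class. It implements the "skip the data" option described above:

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical stand-in for the channel: it only stores the handlers so that
// the caller of this toy decides when "the network" calls them back.
struct ToyAsyncChannel {
    std::function<void()> pendingReady;
    std::function<void()> pendingDone;
    int sendsStarted = 0;

    void asyncUpdateNoWait(std::function<void()>&& readyForNextHandler,
                           std::function<void()>&& writeDoneHandler, bool /*safeNDArray*/) {
        ++sendsStarted;
        pendingReady = std::move(readyForNextHandler);
        pendingDone = std::move(writeDoneHandler);
    }
};

// Caller-side guard: a new send is only started once a handler has fired.
class GuardedSender {
   public:
    explicit GuardedSender(ToyAsyncChannel& channel) : m_channel(channel) {}

    // Returns true if a send was started, false if the data was skipped.
    bool trySend() {
        bool expected = true;
        if (!m_mayCall.compare_exchange_strong(expected, false)) {
            ++m_skipped;  // skipping ignores a "wait" policy of connected inputs
            return false;
        }
        // Either handler re-opens the gate for the next call.
        m_channel.asyncUpdateNoWait([this]() { m_mayCall = true; },
                                    [this]() { m_mayCall = true; }, false);
        return true;
    }

    int skipped() const { return m_skipped; }

   private:
    ToyAsyncChannel& m_channel;
    std::atomic<bool> m_mayCall{true};
    int m_skipped = 0;
};
```

The GuardedSender must outlive any pending handler, since the lambdas capture 'this'. A variant that blocks instead of skipping would wait on a condition variable where this sketch returns false.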

awaitUpdateFuture

void awaitUpdateFuture(std::future<void>& fut, const char* which)

Helper for waiting for future that in case of long delay adds a thread to unblock

Throws TimeoutException if not unblocked after two minutes.

debugId

std::string debugId() const

Provide a string identifying this output channel (useful in DEBUG logging)

disable

void disable()

Shut down all underlying connections, object will not be usable afterwards.

Needed if stray shared pointers may be kept somewhere.

ensureValidChunkId

void ensureValidChunkId(std::unique_lock<std::mutex>& lockOfRegisteredInputsMutex)

helper for asyncUpdate() to ensure that at the end m_chunkId is valid - may block a while

lockOfRegisteredInputsMutex : a unique_lock locking m_registeredInputsMutex (may be unlocked and locked again during execution)

eraseCopyInput

bool eraseCopyInput(const std::string& instanceId)

Erase instance from container of copy channels that are ready to receive data

Requires m_registeredInputsMutex to be locked

instanceId :

Return : whether instanceId could be removed (i.e. was actually ready to receive)

eraseOldChannel

void eraseOldChannel(InputChannels& channelContainer, const std::string& instanceId, const karabo::net::Channel::Pointer& newChannel) const

Erase instance with 'instanceId' from 'channelContainer' if existing - if same as 'newChannel', do not close

eraseSharedInput

void eraseSharedInput(const std::string& instanceId)

Helper to indicate that given shared input is currently not ready to receive more data

Requires m_registeredInputsMutex to be locked

expectedParameters

static void expectedParameters(karabo::data::Schema& expected)

Necessary method as part of the factory/configuration system

expected : [out] Description of expected parameters for this object (Schema)

getInitialConfiguration

karabo::data::Hash getInitialConfiguration() const

Returns the initial read-only configuration parameters

Returns a Hash containing the initial information that should not be updated via ShowConnectionHandler and ShowStatisticsHandler. Currently only the address key is included.

getInstanceIdName

std::string getInstanceIdName() const

Concatenation of instance id and name

hasRegisteredCopyInputChannel

bool hasRegisteredCopyInputChannel(const std::string& instanceId) const

Check whether an InputChannel with given id is registered to receive all data

i.e. an InputChannel with "dataDistribution == copy"

instanceId : of InputChannel

Return : bool whether InputChannel of specified type is connected

hasRegisteredSharedInputChannel

bool hasRegisteredSharedInputChannel(const std::string& instanceId) const

Check whether an InputChannel with given id is registered to receive a share of the data

i.e. an InputChannel with "dataDistribution == shared"

instanceId : of InputChannel

Return : bool whether InputChannel of specified type is connected

hasSharedInput

bool hasSharedInput(const std::string& instanceId)

Helper to query whether given shared input is ready to receive more data

Requires m_registeredInputsMutex to be locked

initialize

void initialize()

"Second constructor", to be called after construction with second argument autoInit == 0.

Initializes the underlying Tcp server connection and makes it available for others.

May throw a karabo::util::NetworkException, e.g. if a non-zero port was defined in the input configuration and that is not available since used by something else.

isShareNextEmpty

bool isShareNextEmpty() const

Helper to tell whether none of the shared inputs is ready to receive more data

Requires m_registeredInputsMutex to be locked

popShareNext

std::string popShareNext()

Helper to provide id of shared input that is ready to receive more data

Requires m_registeredInputsMutex to be locked

pushCopyNext

void pushCopyNext(const std::string& instanceId)

Helper to indicate that given copy input is ready to receive more data

Requires m_registeredInputsMutex to be locked

pushShareNext

void pushShareNext(const std::string& instanceId)

Helper to indicate that given shared input is ready to receive more data

Requires m_registeredInputsMutex to be locked

registerSharedInputSelector

void registerSharedInputSelector(SharedInputSelector&& selector)

Register handler that selects which of the connected input channels that have dataDistribution = "shared" is to be served.

The handler will be called during update(..)/asyncUpdateNoWait with the ids of the connected "shared" input channels (e.g. "deviceId:input") as argument. The returned channel id will receive the data. If an empty string or an unknown id is returned, the data will be dropped.

selector : takes vector as argument and returns string
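
A selector of the shape described above (ids of the connected "shared" inputs in, chosen id out) could look like the following round-robin sketch. The alias ToySharedInputSelector is an assumption about the handler's signature based on the description "takes vector as argument and returns string", and makeRoundRobinSelector is a hypothetical helper name:

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <vector>

// Assumed shape of the selector: ids of connected "shared" inputs in, chosen id out.
using ToySharedInputSelector = std::function<std::string(const std::vector<std::string>&)>;

// Builds a selector that cycles through the offered ids, one per call.
inline ToySharedInputSelector makeRoundRobinSelector() {
    // The counter lives in shared state so that copies of the selector stay in step.
    auto counter = std::make_shared<std::size_t>(0);
    return [counter](const std::vector<std::string>& ids) -> std::string {
        if (ids.empty()) return std::string();  // empty id: the data will be dropped
        const std::string& chosen = ids[*counter % ids.size()];
        ++(*counter);
        return chosen;
    };
}
```

With the real class, such a callable would presumably be handed to registerSharedInputSelector(..). Because the set of connected inputs can change between calls, the modulo keeps the index valid, but the rotation is only approximately fair when inputs connect or disconnect.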

resetSendOngoing

void resetSendOngoing(const std::string& instanceId)

Helper that sets the sendOngoing flag to false for given instanceId

signalEndOfStream

void signalEndOfStream()

Synchronously send end-of-stream (EOS) notification to all connected input channels to indicate a logical break in the data stream.

Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.

update

void update(bool safeNDArray = false)

Update the output channel, i.e. send all data over the wire that was previously written by calling write(...). This is a synchronous method, i.e. blocks until all data is actually sent (or dropped or queued).

safeNDArray : boolean to indicate whether all NDArrays inside the Hash passed to write(..) before are 'safe', i.e. their memory will not be referred to elsewhere after update is finished. Default is 'false', 'true' can avoid safety copies of NDArray content when data is queued or sent locally.

Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.
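
Since write(..) and update() must not run concurrently, code that feeds one channel from several threads needs its own serialization. One possible approach, sketched here with a hypothetical ToyChannel stand-in and an illustrative SerializedWriter wrapper (neither is part of Karabo), is a single mutex held across each write/update pair:

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in with the same call shape as write(..)/update().
struct ToyChannel {
    std::vector<std::string> buffer;
    std::vector<std::string> sent;

    void write(const std::string& data) { buffer.push_back(data); }

    void update() {
        sent.insert(sent.end(), buffer.begin(), buffer.end());
        buffer.clear();
    }
};

// Serializes write+update pairs coming from different threads with one mutex.
template <typename Channel>
class SerializedWriter {
   public:
    explicit SerializedWriter(Channel& channel) : m_channel(channel) {}

    // Write one item and send it. The lock spans both calls so that no other
    // thread can interleave its own write(..) or update().
    void writeAndUpdate(const std::string& data) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_channel.write(data);
        m_channel.update();
    }

   private:
    Channel& m_channel;
    std::mutex m_mutex;
};
```

Because update() is synchronous, the mutex stays locked for as long as sending blocks, so other writer threads are held back for that time as well.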

updateChunkId

bool updateChunkId()

helper to set new m_chunkId

Return : true if new m_chunkId is valid (i.e. not equal m_invalidChunkId)

write

void write(const karabo::data::Hash& data, const Memory::MetaData& metaData, bool /*unused*/ = false)

Writes a Hash containing data to the output channel. Sending to the network happens when update() is called.

data : input Hash object

metaData : a MetaData object containing meta data for this data token.

Note: Any NDArray/ImageData inside data must stay untouched at least until update() or the callback of asyncUpdate(cb) has been called. See also the documentation of the safeNDArray flag of the update()/asyncUpdate() methods.

Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.

void write(const karabo::data::Hash& data, bool /*unused*/ = false)

Writes a Hash containing data to the output channel. Sending to the network happens when update() is called. Metadata is initialized to default values: the sending device's device id and the output channel's name are used as data source.

data : input Hash object

Note: Any NDArray/ImageData inside data must stay untouched at least until update() or the callback of asyncUpdate(cb) has been called. See also the documentation of the safeNDArray flag of the update()/asyncUpdate() methods.

Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.

KARABO_DEPRECATED void write(const karabo::data::Hash::Pointer& data, const Memory::MetaData& metaData)

Writes a Hash containing data to the output channel. Sending to the network happens when update() is called.

data : shared pointer to input Hash object

metaData : a MetaData object containing meta data for this data token.

Note: Any NDArray/ImageData inside data must stay untouched at least until update() or the callback of asyncUpdate(cb) has been called. See also the documentation of the safeNDArray flag of the update()/asyncUpdate() methods.

Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.

KARABO_DEPRECATED void write(const karabo::data::Hash::Pointer& data)

Writes a Hash containing data to the output channel. Sending to the network happens asynchronously. Metadata is initialized to default values: the sending device's device id and the output channel's name are used as data source.

data : shared pointer to input Hash object

Note: Any NDArray/ImageData inside data must stay untouched at least until update() or the callback of asyncUpdate(cb) has been called. See also the documentation of the safeNDArray flag of the update()/asyncUpdate() methods.

Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.