OutputChannel
class OutputChannel : public std::enable_shared_from_this<OutputChannel>
@brief An OutputChannel for passing data to pipelined processing
The OutputChannel class is used for writing data to pipelined processing inputs. It supports tracking of meta data for each data token written to it. Specifically, it e.g. allows for keeping track of data producers, here called sources, and timing and train information. Meta data information enables aggregation of multiple data source into one output channel interaction with a remote host, as well as aggregation of multiple train-related data of the same source. A mixture of both scenarios is possible.
An example of these use cases
OutputChannel::Pointer output = ... //
Hash data1;
....
OutputChannel::MetaData meta1("THIS/IS/SOURCE/A/channel1", karabo::data::Timestamp());
output->write(data1, meta1)
Hash data2_10;
....
OutputChannel::MetaData meta2_10("THIS/IS/SOURCE/B/channel2", timestampForTrain10);
output->write(data2_10, meta2)
OutputChannel::MetaData meta2_11("THIS/IS/SOURCE/B/channel2", timestampForTrain11);
output->write(data2_11, meta2_11)
Hash data_this_source;
...
// not passing any meta data to write will default the source to [deviceId]/[channelName]
// and the timestamp to the current timestamp
output->write(data_this_source);
// now actually send over the network
output->update();
Functions
| Name | Description |
|---|---|
| expectedParameters | Necessary method as part of the factory/configuration system expected : [out] Description of expected parameters for this object (Schema) |
| OutputChannel | If this object is constructed using the factory/configuration system this method is called. |
| OutputChannel | Recommended constructor, allowing guaranteed-to-work initialization. |
| initialize | "Second constructor", to be called after construction with second argument autoInit == 0. |
| getInitialConfiguration | returns the initial readonly configuration parameters Returns a Hash containing the initial information that should not be updated via ShowConnectionHandler and ShowStatisticsHandler. |
| getInstanceIdName | Concatenation of instance id and name |
| hasRegisteredCopyInputChannel | Check whether an InputChannel with given id is registered to receive all data i.e. an InputChannel with "dataDistribution == copy" instanceId : of InputChannel Return : bool whether InputChannel of specified type is connected |
| hasRegisteredSharedInputChannel | Check whether an InputChannel with given id is registered to receive a share of the data i.e. an InputChannel with "dataDistribution == shared" instanceId : of InputChannel Return : bool whether InputChannel of specified type is connected |
| write | Writes a Hash containing data to the output channel. |
| write | Writes a Hash containing data to the output channel. |
| write | Writes a Hash containing data to the output channel. |
| write | Writes a Hash containing data to the output channel. |
| update | Update the output channel, i.e. send all data over the wire that was previously written by calling write(...). This is a synchronous method, i.e. blocks until all data is actually sent (or dropped or queued). safeNDArray : boolean to indicate whether all NDArrays inside the Hash passed to write(..) before are 'safe', i.e. their memory will not be referred to elsewhere after update is finished. Default is 'false', 'true' can avoid safety copies of NDArray content when data is queued or sent locally. Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently. |
| asyncUpdate | Semi-asynchronously update the output channel, i.e. start asynchronous sending of data over the wire that was previously written to the output channel's' buffer by calling write(...), but block as long as required to really start sending. The start of sending data is delayed - for any connected input channel that is currently not ready to receive more data, but is configured with "dataDistribution" as "copy" and with "onSlowness" as "wait", - or if none of the connected input channels that are configured with "dataDistribution" as "shared" are currently ready to receive data and if this output channel is configured with "noInputShared" as "wait". safeNDArray : boolean to indicate whether all NDArrays inside the Hash passed to write(..) before are 'safe', i.e. their memory will not be referred to elsewhere before 'readyHandler' is called. Default is 'false', 'true' can avoid safety copies of NDArray content when data is queued or sent locally. writeDoneHandler : callback when data (that is not queued) has been sent and thus even NDArray data inside it can be re-used again (except if safeNDArray was set to 'true' in which case its memory may still be used in a queue). Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently. |
| asyncUpdateNoWait | Expert method Asynchronously update the output channel, i.e. asynchronously send all data over the wire that was previously written by calling write(...) without any blocking. This method must not be called again before either 'readyForNextHandler' or 'writeDoneHandler' have been called. If next data should be sent, but neither handler has been called yet, one has to block or skip the data. In the latter case, the wish of a connected input channel that is configured to make the output "wait" if not ready, is ignored (policy violation). Both handlers have to be valid function pointers. readyForNextHandler : callback when asyncUpdateNoWait may be called again (this can only be delayed if any blocking input channel ["wait"] is connected) writeDoneHandler : callback when sending is finished (as confirmed by Tcp) or stopped due to disconnection, or data is internally queued. So now all NDArray inside the Hash passed to write(..) before can be re-used again (except if safeNDArray was set to 'true' in which case its memory may still be used in a queue) safeNDArray : boolean to indicate whether all NDArrays inside the Hash passed to write(..) before are 'safe', i.e. their memory will not be referred to elsewhere after update is finished. False triggers a data copy if data needs to be queued. TODO: Provide a handler called when sending data is completed, including any queued data, and thus NDArray data can be re-used again even if safeNDArray=false (i.e. buffers could be re-used). Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently. |
| signalEndOfStream | Synchronously send end-of-stream (EOS) notification to all connected input channels to indicate a logical break in the data stream. |
| asyncSignalEndOfStream | Asynchonously send end-of-stream (EOS) notification to all connected input channels to indicate a logical break in the data stream. |
| registerSharedInputSelector | Register handler that selects which of the connected input channels that have dataDistribution = "shared" is to be served. |
| disable | Shut down all underlying connections, object will not be usable afterwards. |
| eraseOldChannel | Erase instance with 'instanceId' from 'channelContainer' if existing - if same as 'newChannel', do not close |
| pushShareNext | Helper to indicate that given shared input is ready to receive more data Requires m_registeredInputsMutex to be locked |
| popShareNext | Helper to provide id of shared input that is ready to receive more data Requires m_registeredInputsMutex to be locked |
| isShareNextEmpty | Helper to tell whether none of the shared inputs is ready to receive more data Requires m_registeredInputsMutex to be locked |
| hasSharedInput | Helper to query whether given shared input is ready to receive more data Requires m_registeredInputsMutex to be locked |
| eraseSharedInput | Helper to indicate that given shared input is currently not ready to receive more data Requires m_registeredInputsMutex to be locked |
| pushCopyNext | Helper to indicate that given copy input is ready to receive more data Requires m_registeredInputsMutex to be locked |
| eraseCopyInput | Erase instance from container of copy channels that are ready to receive data Requires m_registeredInputsMutex to be locked instanceId : Return : whether instanceId could be removed (i.e. was actually ready to receive) |
| updateChunkId | helper to set new m_chunkId Return : true if new m_chunkId is valid (i.e. not equal m_invalidChunkId) |
| ensureValidChunkId | helper for asyncUpdate() to ensure that at the end m_chunkId is valid - may block a while lockOfRegisteredInputsMutex : a scoped_lock locking m_registeredInputsMutex (may be unlocked and locked again during execution) |
| asyncPrepareCopy | Figure out how to treat copy inputs, return via appending to reference arguments Requires m_registeredInputsMutex to be locked |
| asyncPrepareDistribute | Figure out how to treat shared inputs, return via (appending to) reference arguments Requires m_registeredInputsMutex to be locked |
| asyncPrepareDistributeEos | Figure out how to send EndOfStream for shared outputs, return via reference arguments Requires m_registeredInputsMutex to be locked Return : whether to queue for shared queue |
| asyncPrepareDistributeSelected | Figure out how to treat shared inputs if sharedInputSelector is registered Requires m_registeredInputsMutex to be locked |
| asyncPrepareDistributeLoadBal | Figure out how to treat shared inputs when load-balancing Requires m_registeredInputsMutex to be locked |
| resetSendOngoing | Helper that sets the sendOngoing flag to false for given instanceId |
| asyncSendOne | Helper to asynchronously send chunk data to channel in given channelInfo chunkId : The chunk to send channelInfo : Container with info about channel to send to doneHandler : Callback when sending done or failed |
| awaitUpdateFuture | Helper for waiting for future that in case of long delay adds a thread to unblock Throws TimeoutException if not unblocked after two minutes. |
| debugId | Provide a string identifying this output channel (useful in DEBUG logging) |
Function Details
OutputChannel
explicit OutputChannel(const karabo::data::Hash& config)
If this object is constructed using the factory/configuration system this method is called.
The initialize() method must not be called if constructed this way.
Deprecated: Tcp server initialization is triggered, but there is no control when and whether it succeeded. So better use the constructor with additional int argument (and set it to zero).
config
: Validated (@see expectedParameters) and default-filled configuration
OutputChannel(const karabo::data::Hash& config, int autoInit)
Recommended constructor, allowing guaranteed-to-work initialization.
The recommended way to call it is via the Configurator and with autoInit == 0, followed by calling initialize():
Hash config(
Caveat: Make sure you do not pass a 'bool' instead of an 'int' as argument to create(..) since then the other constructor is chosen and the value of the 'bool' determines whether to validate cfg or not.
config
: Validated (@see expectedParameters) and default-filled configuration
autoInit
: If set to 0 (strongly recommended), the constructor does not yet try to initiate the
TCP server initialization and the initialize() method has to be called as
"second constructor". The advantage is that the initialization cannot fail on busy
systems and one has control when the server is available for remote connections.
If autoInit != 0, this constructor behaves as the other constructor and initialize()
must not be called.
asyncPrepareCopy
void asyncPrepareCopy(unsigned int chunkId, std::vector<karabo::data::Hash*>& toSendImmediately, std::vector<karabo::data::Hash*>& toQueue, std::vector<karabo::data::Hash*>& toBlock)
Figure out how to treat copy inputs, return via appending to reference arguments
Requires m_registeredInputsMutex to be locked
asyncPrepareDistribute
void asyncPrepareDistribute(unsigned int chunkId, std::vector<karabo::data::Hash*>& toSendImmediately, std::vector<karabo::data::Hash*>& toQueue, std::vector<karabo::data::Hash*>& toBlock, bool& queue, bool& block)
Figure out how to treat shared inputs, return via (appending to) reference arguments
Requires m_registeredInputsMutex to be locked
asyncPrepareDistributeEos
bool asyncPrepareDistributeEos(unsigned int chunkId, std::vector<karabo::data::Hash*>& toSendImmediately, std::vector<karabo::data::Hash*>& toQueue, std::vector<karabo::data::Hash*>& toBlock)
Figure out how to send EndOfStream for shared outputs, return via reference arguments
Requires m_registeredInputsMutex to be locked
Return : whether to queue for shared queue
asyncPrepareDistributeLoadBal
void asyncPrepareDistributeLoadBal(unsigned int chunkId, std::vector<karabo::data::Hash*>& toSendImmediately, std::vector<karabo::data::Hash*>& toQueue, std::vector<karabo::data::Hash*>& toBlock, bool& queue, bool& block)
Figure out how to treat shared inputs when load-balancing
Requires m_registeredInputsMutex to be locked
asyncPrepareDistributeSelected
void asyncPrepareDistributeSelected(unsigned int chunkId, std::vector<karabo::data::Hash*>& toSendImmediately, std::vector<karabo::data::Hash*>& toQueue, std::vector<karabo::data::Hash*>& toBlock)
Figure out how to treat shared inputs if sharedInputSelector is registered
Requires m_registeredInputsMutex to be locked
asyncSendOne
void asyncSendOne(unsigned int chunkId, InputChannelInfo& channelInfo, std::function<void()>&& doneHandler)
Helper to asynchronously send chunk data to channel in given channelInfo
chunkId
: The chunk to send
channelInfo
: Container with info about channel to send to
doneHandler
: Callback when sending done or failed
asyncSignalEndOfStream
void asyncSignalEndOfStream(std::function<void()>&& readyHandler)
Asynchonously send end-of-stream (EOS) notification to all connected input channels to indicate a logical break in the data stream.
readyHandler
: callback when notification has been sent or queued
Thread safety: All the 'write(..)' methods, 'update()'/'asyncUpdate(cb)' and 'signalEndOfStream()'/'asyncSignalEndOfStream(cb)' must not be called concurrently.
asyncUpdate
void asyncUpdate(bool safeNDArray = false, std::function<void()>&& writeDoneHandler = {})
Semi-asynchronously update the output channel, i.e. start asynchronous sending of data over the wire that was previously written to the output channel's' buffer by calling write(...), but block as long as required to really start sending. The start of sending data is delayed - for any connected input channel that is currently not ready to receive more data, but is configured with "dataDistribution" as "copy" and with "onSlowness" as "wait", - or if none of the connected input channels that are configured with "dataDistribution" as "shared" are currently ready to receive data and if this output channel is configured with "noInputShared" as "wait".
safeNDArray
: boolean to indicate whether all NDArrays inside the Hash passed to write(..) before
are 'safe', i.e. their memory will not be referred to elsewhere before 'readyHandler'
is called. Default is 'false', 'true' can avoid safety copies of NDArray content when
data is queued or sent locally.
writeDoneHandler
: callback when data (that is not queued) has been sent and thus even NDArray data
inside it can be re-used again (except if safeNDArray was set to 'true' in which
case its memory may still be used in a queue).
Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.
asyncUpdateNoWait
void asyncUpdateNoWait(std::function<void()>&& readyForNextHandler, std::function<void()>&& writeDoneHandler, bool safeNDArray)
Expert method
Asynchronously update the output channel, i.e. asynchronously send all data over the wire that was previously written by calling write(...) without any blocking.
This method must not be called again before either 'readyForNextHandler' or 'writeDoneHandler' have been called. If next data should be sent, but neither handler has been called yet, one has to block or skip the data. In the latter case, the wish of a connected input channel that is configured to make the output "wait" if not ready, is ignored (policy violation).
Both handlers have to be valid function pointers.
readyForNextHandler
: callback when asyncUpdateNoWait may be called again (this can only be
delayed if any blocking input channel ["wait"] is connected)
writeDoneHandler
: callback when sending is finished (as confirmed by Tcp) or stopped due to
disconnection, or data is internally queued. So now all NDArray inside the Hash
passed to write(..) before can be re-used again (except if safeNDArray was set
to 'true' in which case its memory may still be used in a queue)
safeNDArray
: boolean to indicate whether all NDArrays inside the Hash passed to write(..) before
are 'safe', i.e. their memory will not be referred to elsewhere after update is
finished. False triggers a data copy if data needs to be queued.
TODO: Provide a handler called when sending data is completed, including any queued data, and thus NDArray data can be re-used again even if safeNDArray=false (i.e. buffers could be re-used).
Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.
awaitUpdateFuture
void awaitUpdateFuture(std::future<void>& fut, const char* which)
Helper for waiting for future that in case of long delay adds a thread to unblock
Throws TimeoutException if not unblocked after two minutes.
debugId
std::string debugId() const
Provide a string identifying this output channel (useful in DEBUG logging)
disable
void disable()
Shut down all underlying connections, object will not be usable afterwards.
Needed if stray shared pointers may be kept somewhere.
ensureValidChunkId
void ensureValidChunkId(std::unique_lock<std::mutex>& lockOfRegisteredInputsMutex)
helper for asyncUpdate() to ensure that at the end m_chunkId is valid - may block a while
lockOfRegisteredInputsMutex
: a scoped_lock locking m_registeredInputsMutex
(may be unlocked and locked again during execution)
eraseCopyInput
bool eraseCopyInput(const std::string& instanceId)
Erase instance from container of copy channels that are ready to receive data
Requires m_registeredInputsMutex to be locked
instanceId
:
Return : whether instanceId could be removed (i.e. was actually ready to receive)
eraseOldChannel
void eraseOldChannel(InputChannels& channelContainer, const std::string& instanceId, const karabo::net::Channel::Pointer& newChannel) const
Erase instance with 'instanceId' from 'channelContainer' if existing - if same as 'newChannel', do not close
eraseSharedInput
void eraseSharedInput(const std::string& instanceId)
Helper to indicate that given shared input is currently not ready to receive more data
Requires m_registeredInputsMutex to be locked
expectedParameters
static void expectedParameters(karabo::data::Schema& expected)
Necessary method as part of the factory/configuration system
expected
: [out] Description of expected parameters for this object (Schema)
getInitialConfiguration
karabo::data::Hash getInitialConfiguration() const
returns the initial readonly configuration parameters
Returns a Hash containing the initial information that should
not be updated via ShowConnectionHandler and ShowStatisticsHandler.
Currently only the address key is included.
getInstanceIdName
std::string getInstanceIdName() const
Concatenation of instance id and name
hasRegisteredCopyInputChannel
bool hasRegisteredCopyInputChannel(const std::string& instanceId) const
Check whether an InputChannel with given id is registered to receive all data
i.e. an InputChannel with "dataDistribution == copy"
instanceId
: of InputChannel
Return : bool whether InputChannel of specified type is connected
hasRegisteredSharedInputChannel
bool hasRegisteredSharedInputChannel(const std::string& instanceId) const
Check whether an InputChannel with given id is registered to receive a share of the data
i.e. an InputChannel with "dataDistribution == shared"
instanceId
: of InputChannel
Return : bool whether InputChannel of specified type is connected
hasSharedInput
bool hasSharedInput(const std::string& instanceId)
Helper to query whether given shared input is ready to receive more data
Requires m_registeredInputsMutex to be locked
initialize
void initialize()
"Second constructor", to be called after construction with second argument autoInit == 0.
Initializes the underlying Tcp server connection and makes it available for others.
May throw a karabo::util::NetworkException, e.g. if a non-zero port was defined in the input configuration and that is not available since used by something else.
isShareNextEmpty
bool isShareNextEmpty() const
Helper to tell whether none of the shared inputs is ready to receive more data
Requires m_registeredInputsMutex to be locked
popShareNext
std::string popShareNext()
Helper to provide id of shared input that is ready to receive more data
Requires m_registeredInputsMutex to be locked
pushCopyNext
void pushCopyNext(const std::string& instanceId)
Helper to indicate that given copy input is ready to receive more data
Requires m_registeredInputsMutex to be locked
pushShareNext
void pushShareNext(const std::string& instanceId)
Helper to indicate that given shared input is ready to receive more data
Requires m_registeredInputsMutex to be locked
registerSharedInputSelector
void registerSharedInputSelector(SharedInputSelector&& selector)
Register handler that selects which of the connected input channels that have dataDistribution = "shared" is to be served.
The handler will be called during update(..)/asyncUpdateNoWait with the ids of the connected "shared" input channels (e.g. "deviceId:input") as argument. The returned channel id will receive the data. If an empty string or an unknown id is returned, the data will be dropped.
selector
: takes vector
resetSendOngoing
void resetSendOngoing(const std::string& instanceId)
Helper that sets the sendOngoing flag to false for given instanceId
signalEndOfStream
void signalEndOfStream()
Synchronously send end-of-stream (EOS) notification to all connected input channels to indicate a logical break in the data stream.
Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.
update
void update(bool safeNDArray = false)
Update the output channel, i.e. send all data over the wire that was previously written by calling write(...). This is a synchronous method, i.e. blocks until all data is actually sent (or dropped or queued).
safeNDArray
: boolean to indicate whether all NDArrays inside the Hash passed to write(..) before
are 'safe', i.e. their memory will not be referred to elsewhere after update is
finished. Default is 'false', 'true' can avoid safety copies of NDArray content when
data is queued or sent locally.
Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.
updateChunkId
bool updateChunkId()
helper to set new m_chunkId
Return : true if new m_chunkId is valid (i.e. not equal m_invalidChunkId)
write
void write(const karabo::data::Hash& data, const Memory::MetaData& metaData, bool /*unused*/ = false)
Writes a Hash containing data to the output channel. Sending to the network happens when update() is called.
data
: input Hash object
metaData
: a MetaData object containing meta data for this data token.
Note: Any NDArray/ImageData inside data must stay untouched at least until update() or the callback of asyncUpdate(cb) has been called. See also the documentation of the safeNDArray flag of the update()/asyncUpdate() methods.
Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.
void write(const karabo::data::Hash& data, bool /*unused*/ = false)
Writes a Hash containing data to the output channel. Sending to the network happens when update() is called. Metadata is initialized to default values. Namely the sending devices device id and the output channel's name are used as data source.
data
: input Hash object
Note: Any NDArray/ImageData inside data must stay untouched at least until update() or the callback of asyncUpdate(cb) has been called. See also the documentation of the safeNDArray flag of the update()/asyncUpdate() methods.
Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.
KARABO_DEPRECATED void write(const karabo::data::Hash::Pointer& data, const Memory::MetaData& metaData)
Writes a Hash containing data to the output channel. Sending to the network happens when update() is called.
data
: shared pointer to input Hash object
metaData
: a MetaData object containing meta data for this data token.
Note: Any NDArray/ImageData inside data must stay untouched at least until update() or the callback of asyncUpdate(cb) has been called. See also the documentation of the safeNDArray flag of the update()/asyncUpdate() methods.
Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.
KARABO_DEPRECATED void write(const karabo::data::Hash::Pointer& data)
Writes a Hash containing data to the output channel. Sending to the network happens asynchronously. Metadata is initialized to default values. Namely the sending devices device id and the output channel's name are used as data source.
data
: shared pointer to input Hash object
Note: Any NDArray/ImageData inside data must stay untouched at least until update() or the callback of asyncUpdate(cb) has been called. See also the documentation of the safeNDArray flag of the update()/asyncUpdate() methods.
Thread safety: All the 'write(..)' methods, '[async]UpdateNoWait' and '[async]SignalEndOfStream(..)' must not be called concurrently.