InputChannel
class InputChannel : public std::enable_shared_from_this<InputChannel>
@brief The InputChannel class is used to receive data from pipelined processing OutputChannels.
The InputChannel class is used to receive data from pipelined processing OutputChannels. It additionally supports receiving meta data associated with each data token read. Specifically, the meta information contains source information, i.e. what produced the data token, and timing information, e.g. train ids.
void onInput(const InputChannel::Pointer& input) {
for (unsigned int i = 0; i != input->size(); ++i) {
Hash h;
const InputChannel::MetaData& meta = input->read(i);
std::cout<<"Source: <<meta.getSource()<<std::endl;
std::cout<<"TrainId: <<meta.getTimestamp().getTrainId()<<std::endl;
}
}
Types
| Name | Description |
|---|---|
| Handlers | Container for InputChannel handlers that concern data handling. |
Variables
| Name | Description |
|---|---|
| m_inputHandler | Callback on available data (per InputChannel) |
| m_dataHandler | Callback on available data (per item in InputChannel) |
| m_connectionsBeingSetup | All 'outputChannelString' for that a connection attempt is currently ongoing, with their handlers and ids |
Functions
| Name | Description |
|---|---|
| expectedParameters | Necessary method as part of the factory/configuration system expected : [out] Description of expected parameters for this object (Schema) |
| InputChannel | If this object is constructed using the factory/configuration system this method is called input : Validated (@see expectedParameters) and default-filled configuration |
| reconfigure | Reconfigure InputChannel Disconnects any previous "connectedOutputChannels" if not in config["connectedOutputChannels"]. |
| registerInputHandler | Register handler to be called when new data has arrived. |
| registerDataHandler | Register handler to be called for each data item that arrives. |
| registerEndOfStreamEventHandler | Register handler to be called when connected output channels inform about end-of-stream. |
| getRegisteredHandlers | Get handlers registered for data, input and end-of-stream handling. |
| dataQuantityRead | Returns the number of bytes read since the last call of this method See karabo::util::TcpChannel::dataQuantityRead() |
| dataQuantityWritten | Returns the number of bytes written since the last call of this method See karabo::util::TcpChannel::dataQuantityWritten() |
| getConnectedOutputChannels | Returns a map of between "output channel string" and "output channel info" Hash outputChannelString (STRING) represented like "instanceId:channelName" outputChannelInfo contains connection parameters or is empty, depending on connection state. This contains all output channels that the InputChannel is configured for, irrespective whether currently connected or not. Return : map. |
| getConnectionStatus | Provide a map between the output channels that are configured and their connection status. |
| read | Read data from the InputChannel - to be called inside an InputHandler callback Kept for backward compatibility only since internally the data is copied! |
| read | Read data from the InputChannel - to be called inside an InputHandler callback idx : of the data token to read from the available data tokens. Use InputChannel::size to request number of available tokens Return : the data as a pointer |
| read | Read data and meta data from the InputChannel - to be called inside an InputHandler callback idx : of the data token to read from the available data tokens. Use InputChannel::size to request number of available tokens source : reference that will hold the meta data Return : the data as a pointer |
| size | Number of data tokens - to be called inside an InputHandler callback |
| connect | Asynchronously connect this input channel to the output channel described by the first argument outputChannelInfo : Hash with three keys - "outputChannelString": a string matching one of the configured "connectedOutputChannels" - "connectionType": a string - currently only "tcp" supported - "hostname": a string telling which host/ip address to connect to - "port": an unsigned int telling the port - "memoryLocation: string "remote" or "local" to tell whether other end is in another process or can share memory handler : indicates asynchronously (like via EventLoop::post) the success of the connection request |
| disconnect | Disconnect and clean internals connectionString : One of the "connectedOutputChannels" given at construction |
| updateOutputChannelConfiguration | Update list of output channels that can be connected outputChannelString : string that can later be used as key "outputChannelString" of Hash argument to connect config : kept for backward compatibility |
| getMetaData | Get the current meta data for input data available on this input channel. |
| sourceToIndices | Return the list of indices of the data tokens (for read(index) ) for a given source identifier. |
| trainIdToIndices | Return the list of indices of the data tokens (for read(index) ) for a given train id. |
| indexToMetaData | Return the data source identifier pertinent to a data token at a given index. |
| disconnectImpl | Disconnect internals - needs protection by m_outputChannelsMutex outputChannelString : One of the "connectedOutputChannels" given at construction |
Variable Details
m_connectionsBeingSetup
std::unordered_map<std::string, std::pair<unsigned int, std::function<void(const karabo::net::ErrorCode&)>>> m_connectionsBeingSetup
All 'outputChannelString' for that a connection attempt is currently ongoing, with their handlers and ids
m_dataHandler
DataHandler m_dataHandler
Callback on available data (per item in InputChannel)
m_inputHandler
InputHandler m_inputHandler
Callback on available data (per InputChannel)
Function Details
InputChannel
InputChannel(const karabo::data::Hash& config)
If this object is constructed using the factory/configuration system this method is called
input
: Validated (@see expectedParameters) and default-filled configuration
connect
void connect(const karabo::data::Hash& outputChannelInfo, const std::function<void(const karabo::net::ErrorCode&)>& handler = std::function<void(const karabo::net::ErrorCode&)>())
Asynchronously connect this input channel to the output channel described by the first argument
outputChannelInfo
: Hash with three keys
- "outputChannelString": a string matching one of the configured
"connectedOutputChannels"
- "connectionType": a string - currently only "tcp" supported
- "hostname": a string telling which host/ip address to connect to
- "port": an unsigned int telling the port
- "memoryLocation: string "remote" or "local" to tell whether other end is in another
process or can share memory
handler
: indicates asynchronously (like via EventLoop::post) the success of the connection request
dataQuantityRead
size_t dataQuantityRead()
Returns the number of bytes read since the last call of this method
See karabo::util::TcpChannel::dataQuantityRead()
dataQuantityWritten
size_t dataQuantityWritten()
Returns the number of bytes written since the last call of this method
See karabo::util::TcpChannel::dataQuantityWritten()
disconnect
void disconnect(const std::string& connectionString)
Disconnect and clean internals
connectionString
: One of the "connectedOutputChannels" given at construction
disconnectImpl
void disconnectImpl(const std::string& outputChannelString)
Disconnect internals - needs protection by m_outputChannelsMutex
outputChannelString
: One of the "connectedOutputChannels" given at construction
expectedParameters
static void expectedParameters(karabo::data::Schema& expected)
Necessary method as part of the factory/configuration system
expected
: [out] Description of expected parameters for this object (Schema)
getConnectedOutputChannels
std::map<std::string, karabo::data::Hash> getConnectedOutputChannels()
Returns a map of between "output channel string" and "output channel info" Hash outputChannelString (STRING) represented like "instanceId:channelName" outputChannelInfo contains connection parameters or is empty, depending on connection state. This contains all output channels that the InputChannel is configured for, irrespective whether currently connected or not.
Return : map.
getConnectionStatus
std::unordered_map<std::string, karabo::net::ConnectionStatus> getConnectionStatus()
Provide a map between the output channels that are configured and their connection status.
Return : map
getMetaData
const std::vector<MetaData>& getMetaData() const
Get the current meta data for input data available on this input channel. Validity time of the object corresponds to lifetime of the InputHandler callback. Also the InputHandler this is called in needs to have been registered using registerInputHandler.
Return :
getRegisteredHandlers
Handlers getRegisteredHandlers() const
Get handlers registered for data, input and end-of-stream handling.
Do not call concurrrently with the corresponding register[Data|Input|EndOfStreamEvent]Handler() methods.
indexToMetaData
const MetaData& indexToMetaData(unsigned int index) const
Return the data source identifier pertinent to a data token at a given index. Validity time of the object corresponds to lifetime of the InputHandler callback. Also the InputHandler this is called in needs to have been registered using registerInputHandler.
index
:
Return :
Throw : A KARABO_LOGIC_EXCEPTION of there is no meta data available for the given index.
read
const MetaData& read(karabo::data::Hash& data, size_t idx = 0)
Read data from the InputChannel - to be called inside an InputHandler callback
Kept for backward compatibility only since internally the data is copied! Use one of the other read methods instead.
data
: reference that will hold the data
idx
: of the data token to read from the available data tokens. Use InputChannel::size to request
number of available tokens
Return : meta data associated to the data token. Lifetime of the object corresponds to live time of the InputHandler callback.
karabo::data::Hash::Pointer read(size_t idx = 0)
Read data from the InputChannel - to be called inside an InputHandler callback
idx
: of the data token to read from the available data tokens. Use InputChannel::size to request
number of available tokens
Return : the data as a pointer
karabo::data::Hash::Pointer read(size_t idx, MetaData& source)
Read data and meta data from the InputChannel - to be called inside an InputHandler callback
idx
: of the data token to read from the available data tokens. Use InputChannel::size to request
number of available tokens
source
: reference that will hold the meta data
Return : the data as a pointer
reconfigure
void reconfigure(const karabo::data::Hash& config, bool allowMissing = true)
Reconfigure InputChannel Disconnects any previous "connectedOutputChannels" if not in config["connectedOutputChannels"].
config
: as needed by the constructor
allowMissing
: if true, lack of keys "dataDistribution", "minData", "onSlowness", "delayOnInput"
and "respondToEndOfStream" is OK and their respective previous configuration is kept,
if false, an exception is thrown when these keys are missing in config
registerDataHandler
void registerDataHandler(const DataHandler& ioDataHandler)
Register handler to be called for each data item that arrives.
Note: The internal variable that stores the handler is neither protected against concurrent calls to getRegisteredHandlers() nor to concurrent usage of the handler when data arrives, i.e. this registration must not be called if (being) connected to any output channel.
registerEndOfStreamEventHandler
void registerEndOfStreamEventHandler(const InputHandler& endOfStreamEventHandler)
Register handler to be called when connected output channels inform about end-of-stream. If connected to more than one output channel, the handler is called if the last of them sends the end-of-stream signal.
Note: The internal variable that stores the handler is neither protected against concurrent calls to getRegisteredHandlers() nor to concurrent usage of the handler when the end-of-stream signal arrives, i.e. this registration must not be called if (being) connected to any outpu channel.
registerInputHandler
void registerInputHandler(const InputHandler& ioInputHandler)
Register handler to be called when new data has arrived. For each index i from 0 to < size(), data and meta data can be received via read(i, dataHash) and readMetaData(i, metaData), respectively.
Note: The internal variable that stores the handler is neither protected against concurrent calls to getRegisteredHandlers() nor to concurrent usage of the handler when data arrives, i.e. this registration must not be called if (being) connected to any output channel.
size
size_t size()
Number of data tokens - to be called inside an InputHandler callback
sourceToIndices
std::vector<unsigned int> sourceToIndices(const std::string& source) const
Return the list of indices of the data tokens (for read(index) ) for a given source identifier. Multiple indices may be returned if the same source was appended more than once in one batch write. Indices increase monotonically in insertion order of the write operations. Validity time of the indices corresponds to lifetime of the InputHandler callback. Also the InputHandler this is called in needs to have been registered using registerInputHandler.
source
:
Return :
trainIdToIndices
std::vector<unsigned int> trainIdToIndices(unsigned long long trainId) const
Return the list of indices of the data tokens (for read(index) ) for a given train id. Multiple indices may be returned if the same source was appended more than once in one batch write. Indices increase monotonically in insertion order of the write operations. Validity time of the indices corresponds to lifetime of the InputHandler callback. Also the InputHandler this is called in needs to have been registered using registerInputHandler.
source
:
Return :
updateOutputChannelConfiguration
void updateOutputChannelConfiguration(const std::string& outputChannelString, const karabo::data::Hash& config = karabo::data::Hash())
Update list of output channels that can be connected
outputChannelString
: string that can later be used as key "outputChannelString" of Hash argument to
connect
config
: kept for backward compatibility