Skip to content

InputChannel

class InputChannel : public std::enable_shared_from_this<InputChannel>

@brief The InputChannel class is used to receive data from pipelined processing OutputChannels.

The InputChannel class is used to receive data from pipelined processing OutputChannels. It additionally supports receiving meta data associated with each data token read. Specifically, the meta information contains source information, i.e. what produced the data token, and timing information, e.g. train ids.

void onInput(const InputChannel::Pointer& input) {

     for (unsigned int i = 0; i != input->size(); ++i) {
          Hash h;
          const InputChannel::MetaData& meta = input->read(i);
          std::cout<<"Source: <<meta.getSource()<<std::endl;
          std::cout<<"TrainId: <<meta.getTimestamp().getTrainId()<<std::endl;
     }
}

Types

Name Description
Handlers Container for InputChannel handlers that concern data handling.

Variables

Name Description
m_inputHandler Callback on available data (per InputChannel)
m_dataHandler Callback on available data (per item in InputChannel)
m_connectionsBeingSetup All 'outputChannelString' for that a connection attempt is currently ongoing, with their handlers and ids

Functions

Name Description
expectedParameters Necessary method as part of the factory/configuration system expected : [out] Description of expected parameters for this object (Schema)
InputChannel If this object is constructed using the factory/configuration system this method is called input : Validated (@see expectedParameters) and default-filled configuration
reconfigure Reconfigure InputChannel Disconnects any previous "connectedOutputChannels" if not in config["connectedOutputChannels"].
registerInputHandler Register handler to be called when new data has arrived.
registerDataHandler Register handler to be called for each data item that arrives.
registerEndOfStreamEventHandler Register handler to be called when connected output channels inform about end-of-stream.
getRegisteredHandlers Get handlers registered for data, input and end-of-stream handling.
dataQuantityRead Returns the number of bytes read since the last call of this method See karabo::util::TcpChannel::dataQuantityRead()
dataQuantityWritten Returns the number of bytes written since the last call of this method See karabo::util::TcpChannel::dataQuantityWritten()
getConnectedOutputChannels Returns a map of between "output channel string" and "output channel info" Hash outputChannelString (STRING) represented like "instanceId:channelName" outputChannelInfo contains connection parameters or is empty, depending on connection state. This contains all output channels that the InputChannel is configured for, irrespective whether currently connected or not. Return : map.
getConnectionStatus Provide a map between the output channels that are configured and their connection status.
read Read data from the InputChannel - to be called inside an InputHandler callback Kept for backward compatibility only since internally the data is copied!
read Read data from the InputChannel - to be called inside an InputHandler callback idx : of the data token to read from the available data tokens. Use InputChannel::size to request number of available tokens Return : the data as a pointer
read Read data and meta data from the InputChannel - to be called inside an InputHandler callback idx : of the data token to read from the available data tokens. Use InputChannel::size to request number of available tokens source : reference that will hold the meta data Return : the data as a pointer
size Number of data tokens - to be called inside an InputHandler callback
connect Asynchronously connect this input channel to the output channel described by the first argument outputChannelInfo : Hash with three keys - "outputChannelString": a string matching one of the configured "connectedOutputChannels" - "connectionType": a string - currently only "tcp" supported - "hostname": a string telling which host/ip address to connect to - "port": an unsigned int telling the port - "memoryLocation: string "remote" or "local" to tell whether other end is in another process or can share memory handler : indicates asynchronously (like via EventLoop::post) the success of the connection request
disconnect Disconnect and clean internals connectionString : One of the "connectedOutputChannels" given at construction
updateOutputChannelConfiguration Update list of output channels that can be connected outputChannelString : string that can later be used as key "outputChannelString" of Hash argument to connect config : kept for backward compatibility
getMetaData Get the current meta data for input data available on this input channel.
sourceToIndices Return the list of indices of the data tokens (for read(index) ) for a given source identifier.
trainIdToIndices Return the list of indices of the data tokens (for read(index) ) for a given train id.
indexToMetaData Return the data source identifier pertinent to a data token at a given index.
disconnectImpl Disconnect internals - needs protection by m_outputChannelsMutex outputChannelString : One of the "connectedOutputChannels" given at construction

Variable Details

m_connectionsBeingSetup

std::unordered_map<std::string, std::pair<unsigned int, std::function<void(const karabo::net::ErrorCode&)>>> m_connectionsBeingSetup

All 'outputChannelString' for that a connection attempt is currently ongoing, with their handlers and ids

m_dataHandler

DataHandler m_dataHandler

Callback on available data (per item in InputChannel)

m_inputHandler

InputHandler m_inputHandler

Callback on available data (per InputChannel)

Function Details

InputChannel

InputChannel(const karabo::data::Hash& config)

If this object is constructed using the factory/configuration system this method is called

input : Validated (@see expectedParameters) and default-filled configuration

connect

void connect(const karabo::data::Hash& outputChannelInfo, const std::function<void(const karabo::net::ErrorCode&)>& handler = std::function<void(const karabo::net::ErrorCode&)>())

Asynchronously connect this input channel to the output channel described by the first argument

outputChannelInfo : Hash with three keys - "outputChannelString": a string matching one of the configured "connectedOutputChannels" - "connectionType": a string - currently only "tcp" supported - "hostname": a string telling which host/ip address to connect to - "port": an unsigned int telling the port - "memoryLocation: string "remote" or "local" to tell whether other end is in another process or can share memory

handler : indicates asynchronously (like via EventLoop::post) the success of the connection request

dataQuantityRead

size_t dataQuantityRead()

Returns the number of bytes read since the last call of this method

See karabo::util::TcpChannel::dataQuantityRead()

dataQuantityWritten

size_t dataQuantityWritten()

Returns the number of bytes written since the last call of this method

See karabo::util::TcpChannel::dataQuantityWritten()

disconnect

void disconnect(const std::string& connectionString)

Disconnect and clean internals

connectionString : One of the "connectedOutputChannels" given at construction

disconnectImpl

void disconnectImpl(const std::string& outputChannelString)

Disconnect internals - needs protection by m_outputChannelsMutex

outputChannelString : One of the "connectedOutputChannels" given at construction

expectedParameters

static void expectedParameters(karabo::data::Schema& expected)

Necessary method as part of the factory/configuration system

expected : [out] Description of expected parameters for this object (Schema)

getConnectedOutputChannels

std::map<std::string, karabo::data::Hash> getConnectedOutputChannels()

Returns a map of between "output channel string" and "output channel info" Hash outputChannelString (STRING) represented like "instanceId:channelName" outputChannelInfo contains connection parameters or is empty, depending on connection state. This contains all output channels that the InputChannel is configured for, irrespective whether currently connected or not.

Return : map.

getConnectionStatus

std::unordered_map<std::string, karabo::net::ConnectionStatus> getConnectionStatus()

Provide a map between the output channels that are configured and their connection status.

Return : map

getMetaData

const std::vector<MetaData>& getMetaData() const

Get the current meta data for input data available on this input channel. Validity time of the object corresponds to lifetime of the InputHandler callback. Also the InputHandler this is called in needs to have been registered using registerInputHandler.

Return :

getRegisteredHandlers

Handlers getRegisteredHandlers() const

Get handlers registered for data, input and end-of-stream handling.

Do not call concurrrently with the corresponding register[Data|Input|EndOfStreamEvent]Handler() methods.

indexToMetaData

const MetaData& indexToMetaData(unsigned int index) const

Return the data source identifier pertinent to a data token at a given index. Validity time of the object corresponds to lifetime of the InputHandler callback. Also the InputHandler this is called in needs to have been registered using registerInputHandler.

index :

Return :

Throw : A KARABO_LOGIC_EXCEPTION of there is no meta data available for the given index.

read

const MetaData& read(karabo::data::Hash& data, size_t idx = 0)

Read data from the InputChannel - to be called inside an InputHandler callback

Kept for backward compatibility only since internally the data is copied! Use one of the other read methods instead.

data : reference that will hold the data

idx : of the data token to read from the available data tokens. Use InputChannel::size to request number of available tokens

Return : meta data associated to the data token. Lifetime of the object corresponds to live time of the InputHandler callback.

karabo::data::Hash::Pointer read(size_t idx = 0)

Read data from the InputChannel - to be called inside an InputHandler callback

idx : of the data token to read from the available data tokens. Use InputChannel::size to request number of available tokens

Return : the data as a pointer

karabo::data::Hash::Pointer read(size_t idx, MetaData& source)

Read data and meta data from the InputChannel - to be called inside an InputHandler callback

idx : of the data token to read from the available data tokens. Use InputChannel::size to request number of available tokens

source : reference that will hold the meta data

Return : the data as a pointer

reconfigure

void reconfigure(const karabo::data::Hash& config, bool allowMissing = true)

Reconfigure InputChannel Disconnects any previous "connectedOutputChannels" if not in config["connectedOutputChannels"].

config : as needed by the constructor

allowMissing : if true, lack of keys "dataDistribution", "minData", "onSlowness", "delayOnInput" and "respondToEndOfStream" is OK and their respective previous configuration is kept, if false, an exception is thrown when these keys are missing in config

registerDataHandler

void registerDataHandler(const DataHandler& ioDataHandler)

Register handler to be called for each data item that arrives.

Note: The internal variable that stores the handler is neither protected against concurrent calls to getRegisteredHandlers() nor to concurrent usage of the handler when data arrives, i.e. this registration must not be called if (being) connected to any output channel.

registerEndOfStreamEventHandler

void registerEndOfStreamEventHandler(const InputHandler& endOfStreamEventHandler)

Register handler to be called when connected output channels inform about end-of-stream. If connected to more than one output channel, the handler is called if the last of them sends the end-of-stream signal.

Note: The internal variable that stores the handler is neither protected against concurrent calls to getRegisteredHandlers() nor to concurrent usage of the handler when the end-of-stream signal arrives, i.e. this registration must not be called if (being) connected to any outpu channel.

registerInputHandler

void registerInputHandler(const InputHandler& ioInputHandler)

Register handler to be called when new data has arrived. For each index i from 0 to < size(), data and meta data can be received via read(i, dataHash) and readMetaData(i, metaData), respectively.

Note: The internal variable that stores the handler is neither protected against concurrent calls to getRegisteredHandlers() nor to concurrent usage of the handler when data arrives, i.e. this registration must not be called if (being) connected to any output channel.

size

size_t size()

Number of data tokens - to be called inside an InputHandler callback

sourceToIndices

std::vector<unsigned int> sourceToIndices(const std::string& source) const

Return the list of indices of the data tokens (for read(index) ) for a given source identifier. Multiple indices may be returned if the same source was appended more than once in one batch write. Indices increase monotonically in insertion order of the write operations. Validity time of the indices corresponds to lifetime of the InputHandler callback. Also the InputHandler this is called in needs to have been registered using registerInputHandler.

source :

Return :

trainIdToIndices

std::vector<unsigned int> trainIdToIndices(unsigned long long trainId) const

Return the list of indices of the data tokens (for read(index) ) for a given train id. Multiple indices may be returned if the same source was appended more than once in one batch write. Indices increase monotonically in insertion order of the write operations. Validity time of the indices corresponds to lifetime of the InputHandler callback. Also the InputHandler this is called in needs to have been registered using registerInputHandler.

source :

Return :

updateOutputChannelConfiguration

void updateOutputChannelConfiguration(const std::string& outputChannelString, const karabo::data::Hash& config = karabo::data::Hash())

Update list of output channels that can be connected

outputChannelString : string that can later be used as key "outputChannelString" of Hash argument to connect

config : kept for backward compatibility