
InfluxDbClient

class InfluxDbClient : public std::enable_shared_from_this<InfluxDbClient>

This class uses the HTTP protocol to communicate with the InfluxDB server. The protocol follows a request/response pattern: the response to the current request must be received before the next request is sent, so only one request/response session per connection is allowed. An internal queue enforces this rule. Every request (a functor) is first pushed into the queue, and then the internal state is checked for an ongoing request/response session. If there is none, the next request is popped from the front of the queue and executed, and the internal flag (state) is raised. When the response callback is called, it checks whether the queue holds another entry; if so, that entry is popped and executed, otherwise the internal flag is lowered. For the time being the queue is unbounded, so it can grow without limit if the client cannot keep up with the input load rate. How to handle such overload remains to be settled by practical experience.
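The queueing rule described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the Karabo implementation: the class name SerializedRequestQueue and all of its members are hypothetical, and the network layer is replaced by an explicit onResponse() call that stands in for the response callback.

```cpp
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the client's internal bookkeeping; none of these
// names are taken from the Karabo sources.
class SerializedRequestQueue {
   public:
    using Request = std::function<void()>;

    // Register a request. It starts immediately only if no
    // request/response session is currently ongoing.
    void enqueue(Request request) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_queue.push_back(std::move(request));
        if (!m_active) {
            startNext(lock);
        }
    }

    // To be called when the response to the in-flight request has arrived.
    void onResponse() {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (m_queue.empty()) {
            m_active = false;  // nothing pending: lower the flag
        } else {
            startNext(lock);   // keep the flag raised and send the next one
        }
    }

    bool active() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_active;
    }

   private:
    // Expects a locked lock and a non-empty queue. The lock is released
    // before the request runs, so the request may call back into the queue.
    void startNext(std::unique_lock<std::mutex>& lock) {
        Request next = std::move(m_queue.front());
        m_queue.pop_front();
        m_active = true;  // raise the flag
        lock.unlock();
        next();
    }

    mutable std::mutex m_mutex;
    std::deque<Request> m_queue;  // unbounded, as in the description above
    bool m_active = false;
};
```

With two requests enqueued back to back, only the first one runs right away; the second runs when onResponse() is called for the first. Releasing the lock before executing the request is a design choice of this sketch that avoids holding the mutex during I/O.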

Functions

| Name | Description |
|---|---|
| startDbConnectIfDisconnected | Checks whether the connection is lost and tries to re-establish the connection to the InfluxDB server. hook: function called when the connection is established. |
| isConnected | Returns true if the connection to the InfluxDB server is established. |
| influxVersion | The version of the InfluxDB server the client is connected to. |
| serverUrl | The URL of the InfluxDB server the client is connected to (or supposed to connect to). |
| queryDb | Registers an HTTP request "GET /query ..." to the InfluxDB server in the internal queue. Can be called with or without a connection to InfluxDB. Blocking if no connection exists, otherwise non-blocking. statement: a SELECT expression. action: callback with signature void(const HttpResponse&), called when the response arrives from the InfluxDB server. |
| postQueryDb | Registers an HTTP request "POST /query ..." to the InfluxDB server in the internal queue. statement: SELECT, SHOW, DROP and other QL commands. action: callback with signature void(const HttpResponse&), called when the response arrives from the InfluxDB server. |
| flushBatch | Flushes the contents of the write buffer to InfluxDB. |
| getPingDb | Registers an HTTP request "GET /ping ..." to the InfluxDB server in the internal queue. action: callback with signature void(const HttpResponse&), called when the response arrives from the InfluxDB server. |
| generateUUID | Returns a UUID used as Request-ID for HTTP requests. |
| connectWait | Returns true if the connection is established within "millis" milliseconds; otherwise the call times out and returns false. |
| writeDb | Writes an HTTP request. message: formed in compliance with the HTTP protocol; malformed requests result in a 4xx response code. requestId: unique id of the HTTP request to be sent to Influx. |
| onDbConnect | Low-level callback called when the connection to InfluxDB is established. |
| onDbRead | Low-level callback called when reading is done. |
| onDbWrite | Low-level callback called when writing to the network interface is done. |
| postWriteDb | Registers an HTTP request "POST /write ..." to the InfluxDB server in the internal queue. batch: a set of lines in InfluxDB "line protocol" format, separated by newlines ('\n'); the line protocol is detailed at https://influxdbcom.readthedocs.io/en/latest/content/docs/v0.9/write_protocols/write_syntax/. action: callback with signature void(const HttpResponse&), called when the acknowledgment (response) arrives from the InfluxDB server. The status code indicating success in the HttpResponse structure is 204. |
| postQueryDbTask | Performs the actual "POST /query ..." request. Non-blocking. The connection to InfluxDB must be established before this call. |
| getPingDbTask | Performs the actual "GET /ping ..." request. Non-blocking. The connection to InfluxDB must be established before this call. |
| postWriteDbTask | Performs the actual "POST /write ..." request. Non-blocking. The connection to InfluxDB must be established before this call. |
| queryDbTask | Performs the actual "GET /query ..." request. If there is no connection to the DB, the call blocks until the connection is established; otherwise it is non-blocking. |
| onResponse | Generic wrapper callback that in turn calls the user "action". |
| tryNextRequest | Tries to take the next request from the internal queue and execute it. |
| sendToInfluxDb | Sends an HTTP request to InfluxDB. |
| getRawBasicAuthHeader | Gets the raw form of the HTTP Authorization header, with the values of dbUser and dbPassword separated by a colon and base64 encoded. |
| handleHttpReadError | Handles unrecoverable read and parsing errors while processing HTTP responses from Influx. |

Function Details

connectWait

bool connectWait(std::size_t millis)

Returns true if the connection is established within "millis" milliseconds; otherwise the call times out and returns false.

millis : time in milliseconds to wait for connection to be established

Return : true if connection established, or false in case of timeout
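A wait with timeout of this kind is commonly built on a condition variable. The sketch below shows the general technique only; the class ConnectionLatch and its methods are hypothetical and make no claim about how connectWait is actually implemented.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical helper illustrating a bounded wait for a "connected" state.
class ConnectionLatch {
   public:
    // Called (for example from a connect callback) once the connection is up.
    void markConnected() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_connected = true;
        }
        m_cv.notify_all();
    }

    // Returns true if the connection is established within `millis`
    // milliseconds, false if the timeout expires first.
    bool waitConnected(std::size_t millis) {
        std::unique_lock<std::mutex> lock(m_mutex);
        return m_cv.wait_for(lock, std::chrono::milliseconds(millis),
                             [this] { return m_connected; });
    }

   private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_connected = false;
};
```

The predicate overload of wait_for guards against spurious wakeups and returns the final value of the predicate, which maps directly onto the true/false result described above.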

flushBatch

void flushBatch(const InfluxResponseHandler& respHandler = InfluxResponseHandler())

Flushes the contents of the write buffer to the InfluxDb.

respHandler : If defined, the handler function is called with the response sent by Influx after it has accepted and processed the current batch of updates. If not defined, flushBatch works in call-and-forget mode.

generateUUID

static std::string generateUUID()

Returns UUID used as Request-ID for HTTP requests

getPingDb

void getPingDb(const InfluxResponseHandler& action)

HTTP request "GET /ping ..." to InfluxDB server is registered in internal queue.

action : callback with signature void(const HttpResponse&), called when the response arrives from the InfluxDB server

getPingDbTask

void getPingDbTask(const InfluxResponseHandler& action)

Performs the actual "GET /ping ..." request. Non-blocking call. The connection to InfluxDB must be established before this call.

getRawBasicAuthHeader

std::string getRawBasicAuthHeader()

Gets the raw form of the HTTP Authorization header, with the values of dbUser and dbPassword separated by a colon and base64 encoded.

Return : The raw form of the Authorization header.
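For illustration, HTTP Basic authentication encodes "user:password" in base64 and prefixes it with the scheme name "Basic". The sketch below shows that encoding with hypothetical helper names (base64Encode, basicAuthHeaderValue). Whether getRawBasicAuthHeader returns only this value or a complete "Authorization: ..." header line is not stated here, so the exact output shape is an assumption.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

// Standard base64 (RFC 4648 alphabet) with '=' padding.
std::string base64Encode(const std::string& in) {
    static const char kAlphabet[] =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    std::string out;
    out.reserve(((in.size() + 2) / 3) * 4);
    for (std::size_t i = 0; i < in.size(); i += 3) {
        const std::size_t remaining = in.size() - i;
        // Pack up to three bytes into a 24-bit group, zero-filling the rest.
        std::uint32_t group =
            static_cast<std::uint32_t>(static_cast<unsigned char>(in[i])) << 16;
        if (remaining > 1) {
            group |= static_cast<std::uint32_t>(static_cast<unsigned char>(in[i + 1])) << 8;
        }
        if (remaining > 2) {
            group |= static_cast<std::uint32_t>(static_cast<unsigned char>(in[i + 2]));
        }
        out += kAlphabet[(group >> 18) & 0x3F];
        out += kAlphabet[(group >> 12) & 0x3F];
        out += remaining > 1 ? kAlphabet[(group >> 6) & 0x3F] : '=';
        out += remaining > 2 ? kAlphabet[group & 0x3F] : '=';
    }
    return out;
}

// Hypothetical helper: value part of an HTTP Basic Authorization header.
std::string basicAuthHeaderValue(const std::string& user, const std::string& password) {
    return "Basic " + base64Encode(user + ":" + password);
}
```

Note that base64 is an encoding, not encryption, so such a header only protects credentials when the connection itself is encrypted.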

handleHttpReadError

void handleHttpReadError(const std::string& errMsg, const std::string& requestId, bool logAsError = true)

Handle unrecoverable read and parsing errors while processing HTTP responses from Influx.

The recovery involves recycling the network connection, as there is no way to regain synchronization of the read operation within the current connection after this kind of error. Also generates an HTTP response with status code 700 and an error message to inform users of the InfluxDbClient instance.

errMsg : the error message to be put in the generated 700 coded http response.

requestId : the unique identifier of the HTTP request whose response could not be processed (needed to update the internal bookkeeping of the InfluxDb client).

logAsError : if true (default), log as error, else as info

influxVersion

std::string influxVersion()

The version of the InfluxDb server the client is connected to.

Return : std::string the connected InfluxDb server version (empty if no server is currently connected).

isConnected

bool isConnected()

Returns true if connection is established to InfluxDB server

onDbConnect

void onDbConnect(const karabo::net::ErrorCode& ec, const karabo::net::Channel::Pointer& channel, const InfluxConnectedHandler& hook)

Low-level callback called when connection to InfluxDB is established

onDbRead

void onDbRead(const karabo::net::ErrorCode& ec, const std::string& data)

Low-level callback called when reading is done

onDbWrite

void onDbWrite(const karabo::net::ErrorCode& ec, std::shared_ptr<std::vector<char> > p)

Low-level callback called when writing to the network interface is done

onResponse

void onResponse(const HttpResponse& o, const InfluxResponseHandler& action)

Generic wrapper callback that in turn calls the user "action".

postQueryDb

void postQueryDb(const std::string& statement, const InfluxResponseHandler& action)

HTTP request "POST /query ..." to InfluxDB server is registered in internal queue.

statement : SELECT, SHOW, DROP and others QL commands

action : callback with signature void(const HttpResponse&), called when the response arrives from the InfluxDB server

postQueryDbTask

void postQueryDbTask(const std::string& statement, const InfluxResponseHandler& action)

Performs the actual "POST /query ..." request. Non-blocking call. The connection to InfluxDB must be established before this call.

postWriteDb

void postWriteDb(const std::string& batch, const InfluxResponseHandler& action)

HTTP request "POST /write ..." to InfluxDB server is registered in internal queue.

batch : a set of lines in InfluxDB "line protocol" format, separated by newlines ('\n'). The line protocol is detailed at https://influxdbcom.readthedocs.io/en/latest/content/docs/v0.9/write_protocols/write_syntax/

action : callback called when the acknowledgment (response) arrives from the InfluxDB server. The callback signature is void(const HttpResponse&). The status code indicating success in the HttpResponse structure is 204.
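As an illustration of the batch parameter, the sketch below joins individual line-protocol records with '\n' and checks a status code against 204. The helpers joinBatch and isWriteSuccess are hypothetical, as are the measurement, tag and field names in the comment. How the status code is read from HttpResponse is not shown because its field names are not given here.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: join line-protocol records into one batch,
// separated by '\n' (no trailing newline).
// Example record: "temperature,device=sensorA value=21.5 1700000000000000000"
std::string joinBatch(const std::vector<std::string>& lines) {
    std::string batch;
    for (std::size_t i = 0; i < lines.size(); ++i) {
        if (i != 0) {
            batch += '\n';
        }
        batch += lines[i];
    }
    return batch;
}

// Hypothetical helper: a write is acknowledged with HTTP status 204.
bool isWriteSuccess(int statusCode) {
    return statusCode == 204;
}
```

A batch built this way would be passed as the first argument of postWriteDb, with the check applied inside the action callback.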

postWriteDbTask

void postWriteDbTask(const std::string& batch, const InfluxResponseHandler& action)

Performs the actual "POST /write ..." request. Non-blocking call. The connection to InfluxDB must be established before this call.

queryDb

void queryDb(const std::string& statement, const InfluxResponseHandler& action)

HTTP request "GET /query ..." to InfluxDB server is registered in internal queue. Can be called with or without a connection to InfluxDB. The call blocks if no connection exists; otherwise it is non-blocking.

statement : a SELECT expression.

action : callback with signature void(const HttpResponse&), called when the response arrives from the InfluxDB server
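Because a GET request carries the statement in the URL, the statement has to be percent-encoded. The sketch below shows one way to do that, assuming the InfluxDB 1.x convention of passing the database as "db" and the statement as "q" on the /query endpoint. The helpers percentEncode and buildQueryTarget are hypothetical and are not claimed to match what queryDb does internally.

```cpp
#include <string>

// Percent-encode everything except RFC 3986 "unreserved" characters.
std::string percentEncode(const std::string& in) {
    static const char kHex[] = "0123456789ABCDEF";
    std::string out;
    for (unsigned char c : in) {
        const bool unreserved = (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') ||
                                (c >= '0' && c <= '9') || c == '-' || c == '_' ||
                                c == '.' || c == '~';
        if (unreserved) {
            out += static_cast<char>(c);
        } else {
            out += '%';
            out += kHex[c >> 4];
            out += kHex[c & 0x0F];
        }
    }
    return out;
}

// Hypothetical helper: request target for "GET /query ...".
std::string buildQueryTarget(const std::string& db, const std::string& statement) {
    return "/query?db=" + percentEncode(db) + "&q=" + percentEncode(statement);
}
```

For example, the statement SHOW DATABASES on a database named mydb yields the target /query?db=mydb&q=SHOW%20DATABASES.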

queryDbTask

void queryDbTask(const std::string& statement, const InfluxResponseHandler& action)

Performs the actual "GET /query ..." request. If there is no connection to the DB, the call blocks until the connection is established. Otherwise the call is non-blocking.

sendToInfluxDb

void sendToInfluxDb(const std::string& msg, const InfluxResponseHandler& action, const std::string& requestId)

Send HTTP request to InfluxDb. Helper function.

Wraps the given InfluxResponseHandler within a callback to onResponse. It will be up to onResponse to call the action InfluxResponseHandler and keep the consumption of requests submitted to the InfluxDbClient going.

serverUrl

std::string serverUrl() noexcept

The URL of the InfluxDb server the client is connected to (or supposed to connect to).

Return : std::string the InfluxDb server url.

startDbConnectIfDisconnected

void startDbConnectIfDisconnected(const InfluxConnectedHandler& hook = InfluxConnectedHandler())

Check if connection is lost and try to re-establish connection to InfluxDB server

hook : function that will be called when connection is established

tryNextRequest

void tryNextRequest(std::unique_lock<std::mutex>& requestQueueLock)

Try to take the next request from internal queue and execute it. Set internal state to be "active" if it was not. Helper function.

requestQueueLock : must be a locked std::unique_lock on m_requestQueueMutex; it will be unlocked afterwards

writeDb

void writeDb(const std::string& message, const std::string& requestId)

Writes an HTTP request.

message : the request, formed in compliance with the HTTP protocol. Malformed requests result in a 4xx response code.

requestId : unique id of the HTTP request to be sent to Influx.