InfluxDbClient
class InfluxDbClient : public std::enable_shared_from_this<InfluxDbClient>
This class uses HTTP protocol for communications with InfluxDB server. The protocol follows request/response pattern and before sending next request the response from the current one should be received. Only one request/response session per connection is allowed. To follow this rule the internal queue is used. Any request (functor) first is pushed into internal queue and then checked the internal state if some current request/response session is ongoing. If not, the next request is popped from front side of internal queue and executed. The internal flag (state) is raised. When response callback is called it checks if the internal queue has next entry and if so this entry is popped and executed. If not, the internal flag is lowered. For the time being the internal queue has no limits defined so it is possible that if the client cannot cope with input load rate some overflow condition can be encountered. The practice should show how we can handle these problems.
Functions
| Name | Description |
|---|---|
| startDbConnectIfDisconnected | Check if connection is lost and try to re-establish connection to InfluxDB server hook : function that will be called when connection is established |
| isConnected | Returns true if connection is established to InfluxDB server |
| influxVersion | @brief The version of the InfluxDb server the client is connected to. |
| serverUrl | @brief The url of the InfluxDb server the client is connected to (or supposed to connect to). |
| queryDb | HTTP request "GET /query ..." to InfluxDB server is registered in internal queue. Can be called with connection to InfluxDB or without. Blocking if no connection exists. Otherwise non-blocking. statement : is SELECT expression. action : callback: void(const HttpResponse&) is called when response comes from InfluxDB server |
| postQueryDb | HTTP request "POST /query ..." to InfluxDB server is registered in internal queue. statement : SELECT, SHOW, DROP and others QL commands action : callback: void(const HttpResponse&) is called when response comes from InfluxDB server |
| flushBatch | Flushes the contents of the write buffer to the InfluxDb. |
| getPingDb | HTTP request "GET /ping ..." to InfluxDB server is registered in internal queue. action : callback: void(const HttpResponse&) is called when response comes from InfluxDB server |
| generateUUID | Returns UUID used as Request-ID for HTTP requests |
| connectWait | Returns true if connection is established in "millis" time range, otherwise timeout condition comes up and returns false. |
| writeDb | Writing HTTP request message : formed in compliance of HTTP protocol Malformed requests resulting in response code 4xx requestId : unique id of the HTTP request to be sent to Influx. |
| onDbConnect | Low-level callback called when connection to InfluxDB is established |
| onDbRead | Low-level callback called when reading is done |
| onDbWrite | Low-level callback called when writing into networks interface is done |
| postWriteDb | HTTP request "POST /write ..." to InfluxDB server is registered in internal queue. batch : is a bunch of lines following InfluxDB "line protocol" separated by newline ('\n') the "line protocol" is detailed at https://influxdbcom.readthedocs.io/en/latest/content/docs/v0.9/write_protocols/write_syntax/ action : callback is called when acknowledgment (response) comes from InfluxDB server. The callback signature is void(const HttpResponse&). The success error code in HttpResponse structure is 204. |
| postQueryDbTask | Actual "POST /query ..." is accomplished. Non-blocking call. The connection to InfluxDB has to be established before this call |
| getPingDbTask | Actual "GET /ping ..." is accomplished. Non-blocking call. The connection to InfluxDB has to be established before this call |
| postWriteDbTask | Actual "POST /write ..." is accomplished. Non-blocking call, Connection should be established before this call |
| queryDbTask | Actual "GET /query ..." is accomplished. If no connection to DB, this call is blocked until the connection is established. Otherwise the call is non-blocking. |
| onResponse | Generic wrap callback is called and call in turn the user "action". |
| tryNextRequest | Try to take the next request from internal queue and execute it. |
| sendToInfluxDb | Send HTTP request to InfluxDb. |
| getRawBasicAuthHeader | Gets the raw form of the http Authorization header with values of dbUser and dbPassword separated by a colon and base64 encoded. |
| handleHttpReadError | Handle unrecoverable read and parsing errors while processing HTTP responses from Influx. |
Function Details
connectWait
bool connectWait(std::size_t millis)
Returns true if connection is established in "millis" time range, otherwise timeout condition comes up and returns false.
millis
: time in milliseconds to wait for connection to be established
Return : true if connection established, or false in case of timeout
flushBatch
void flushBatch(const InfluxResponseHandler& respHandler = InfluxResponseHandler())
Flushes the contents of the write buffer to the InfluxDb.
respHandler
: If defined, the handler function will be called with the response
sent by Influx after it accepted and processed the current batch of updates. If not
defined, the flushBatch will work in a call-and-forget mode.
generateUUID
static std::string generateUUID()
Returns UUID used as Request-ID for HTTP requests
getPingDb
void getPingDb(const InfluxResponseHandler& action)
HTTP request "GET /ping ..." to InfluxDB server is registered in internal queue.
action
: callback: void(const HttpResponse&) is called when response comes
from InfluxDB server
getPingDbTask
void getPingDbTask(const InfluxResponseHandler& action)
Actual "GET /ping ..." is accomplished. Non-blocking call. The connection to InfluxDB has to be established before this call
getRawBasicAuthHeader
std::string getRawBasicAuthHeader()
Gets the raw form of the http Authorization header with values of dbUser and dbPassword separated by a colon and base64 encoded.
Return : The raw form of the Authorization header.
handleHttpReadError
void handleHttpReadError(const std::string& errMsg, const std::string& requestId, bool logAsError = true)
Handle unrecoverable read and parsing errors while processing HTTP responses from Influx.
The recovery involves recycling the network connection, as there is no way to recover synchronism in the read operation within the current connection after those kind of errors happen. Also generates an HTTP response with status code 700 and an error message to communicate to users of the InfluxDbClient instance.
errMsg
: the error message to be put in the generated 700
coded http response.
requestId
: the unique identifier of the HTTP request whose
response could not be processed (needed to update
the internal bookeeping of the InfluxDb client).
logEsError
: if true (default), log as info, else as error
influxVersion
std::string influxVersion()
@brief The version of the InfluxDb server the client is connected to.
Return : std::string the connected InfluxDb server version (empty if no server is currently connected).
isConnected
bool isConnected()
Returns true if connection is established to InfluxDB server
onDbConnect
void onDbConnect(const karabo::net::ErrorCode& ec, const karabo::net::Channel::Pointer& channel, const InfluxConnectedHandler& hook)
Low-level callback called when connection to InfluxDB is established
onDbRead
void onDbRead(const karabo::net::ErrorCode& ec, const std::string& data)
Low-level callback called when reading is done
onDbWrite
void onDbWrite(const karabo::net::ErrorCode& ec, std::shared_ptr<std::vector<char> > p)
Low-level callback called when writing into networks interface is done
onResponse
void onResponse(const HttpResponse& o, const InfluxResponseHandler& action)
Generic wrap callback is called and call in turn the user "action".
postQueryDb
void postQueryDb(const std::string& statement, const InfluxResponseHandler& action)
HTTP request "POST /query ..." to InfluxDB server is registered in internal queue.
statement
: SELECT, SHOW, DROP and others QL commands
action
: callback: void(const HttpResponse&) is called when response comes
from InfluxDB server
postQueryDbTask
void postQueryDbTask(const std::string& statement, const InfluxResponseHandler& action)
Actual "POST /query ..." is accomplished. Non-blocking call. The connection to InfluxDB has to be established before this call
postWriteDb
void postWriteDb(const std::string& batch, const InfluxResponseHandler& action)
HTTP request "POST /write ..." to InfluxDB server is registered in internal queue.
batch
: is a bunch of lines following InfluxDB "line protocol" separated by newline ('\n')
the "line protocol" is detailed at
https://influxdbcom.readthedocs.io/en/latest/content/docs/v0.9/write_protocols/write_syntax/
action
: callback is called when acknowledgment (response) comes from InfluxDB
server. The callback signature is void(const HttpResponse&). The success
error code in HttpResponse structure is 204.
postWriteDbTask
void postWriteDbTask(const std::string& batch, const InfluxResponseHandler& action)
Actual "POST /write ..." is accomplished. Non-blocking call, Connection should be established before this call
queryDb
void queryDb(const std::string& statement, const InfluxResponseHandler& action)
HTTP request "GET /query ..." to InfluxDB server is registered in internal queue. Can be called with connection to InfluxDB or without. Blocking if no connection exists. Otherwise non-blocking.
statement
: is SELECT expression.
action
: callback: void(const HttpResponse&) is called when response comes
from InfluxDB server
queryDbTask
void queryDbTask(const std::string& statement, const InfluxResponseHandler& action)
Actual "GET /query ..." is accomplished. If no connection to DB, this call is blocked until the connection is established. Otherwise the call is non-blocking.
sendToInfluxDb
void sendToInfluxDb(const std::string& msg, const InfluxResponseHandler& action, const std::string& requestId)
Send HTTP request to InfluxDb. Helper function.
Wraps the given InfluxResponseHandler within a callback to onResponse. It will be up to onResponse to call the action InfluxResponseHandler and keep the consumption of requests submitted to the InfluxDbClient going.
serverUrl
std::string serverUrl() noexcept
@brief The url of the InfluxDb server the client is connected to (or supposed to connect to).
Return : std::string the InfluxDb server url.
startDbConnectIfDisconnected
void startDbConnectIfDisconnected(const InfluxConnectedHandler& hook = InfluxConnectedHandler())
Check if connection is lost and try to re-establish connection to InfluxDB server
hook
: function that will be called when connection is established
tryNextRequest
void tryNextRequest(std::unique_lock<std::mutex>& requestQueueLock)
Try to take the next request from internal queue and execute it. Set internal state to be "active" if it was not. Helper function.
requestQueueLock
: must be locked scoped_lock of m_requestQueueMutex, will be unlocked
afterwards
writeDb
void writeDb(const std::string& message, const std::string& requestId)
Writing HTTP request
message
: formed in compliance of HTTP protocol
Malformed requests resulting in response code 4xx
requestId
: unique id of the HTTP request to be sent to Influx.