libics.tools.database

amqp

class libics.tools.database.amqp.AmqpApiBase(instance_id='default', _random_id=None)

Bases: object

AMQP RPC API base class.

Parameters
amqp_clientAmqpConnection

AMQP client.

instance_idstr

Unique name of the API instance.

Notes

Usage:

  • Subclass this base class and set the class attribute API_ID to a unique API-identifying string.

  • Subclasses to a base API, which should use the same exchange as the parent API can be distinguished by setting the class attribute API_SUB_ID.

  • Set an appropriate API_VERSION.

  • The methods that should be accessible via the RPC API should be decorated with api_method().

  • Using AmqpRpcFactory, these API classes can be turned into an RPC server or client class.

Methods

get_api()

Gets a list of all API method names.

get_api_signature(func_id)

Gets the call signature of the API method.

get_api_specifications()

Gets the API specifications.

help(func_id)

Gets the docstring of the API method.

ping()

Returns True.

ping_args(*args, **kwargs)

Returns any passed parameters.

register_cb_on_entry(method_name, func)

Adds a callback function called before executing the API method.

register_cb_on_exit(method_name, func)

Adds a callback function called after executing the API method.

get_exchange_id

get_queue_id

get_routing_key

API_ID = 'TEST'
API_METHODS = {'get_api', 'get_api_signature', 'get_api_specifications', 'help', 'ping', 'ping_args'}
API_REPLIES = {'get_api', 'get_api_signature', 'get_api_specifications', 'help', 'ping', 'ping_args'}
API_SUB_ID = 'default'
API_TIMEOUTS = {'get_api': 1, 'get_api_signature': 1, 'get_api_specifications': 1, 'help': 1, 'ping': 1, 'ping_args': 1}
API_VERSION = '0.0.0'
LOGGER = <Logger libics.tools.database.amqp.AmqpApiBase (WARNING)>
get_api() List[str]

Gets a list of all API method names.

get_api_signature(func_id: str) dict

Gets the call signature of the API method.

Returns
retdict

Dictionary describing the signature with the following items:

func_idstr

API method name.

arg_nameslist(str)

Names of positional arguments in order.

kwarg_nameslist(str)

Names of keyword arguments.

returnbool

Whether a value is returned. If True, the argument name of the reply in types is “return”.

arg_varbool

Whether variable positional arguments are valid.

kwarg_varbool

Whether variable keyword arguments are valid.

typesdict(str->list(str))

Dictionary mapping arg/kwarg/return name to a list of types. Valid types: “null”, “bool”, “int”, “float”, “str”, “json”. Type “json” means that the parameter has no type hint or has a more complicated structure.

get_api_specifications() dict

Gets the API specifications.

Returns
specsdict(str->Any)

Dictionary containing the class variables “API_METHODS”, “API_REPLIES”, “API_TIMEOUTS”.

classmethod get_exchange_id()
get_queue_id()
get_routing_key()
help(func_id: str) str

Gets the docstring of the API method.

ping() bool

Returns True.

ping_args(*args, **kwargs) Any

Returns any passed parameters.

register_cb_on_entry(method_name, func)

Adds a callback function called before executing the API method.

Parameters
method_namestr

API method name.

funccallable

Callback function. Must accept the same arguments as the API method and may only raise AmqpLocalError.

register_cb_on_exit(method_name, func)

Adds a callback function called after executing the API method.

Parameters
method_namestr

API method name.

funccallable

Callback function. Must accept the return value of the API method as arguments and may only raise AmqpLocalError.

class libics.tools.database.amqp.AmqpConnection(config=None, **kwargs)

Bases: object

RPC base class via AMQP message queues.

Parameters
configstr or dict

Configuration file path or dictionary specifying below parameters. Duplicate parameters overwrite the config parameters.

hoststr

Message broker server IP.

portint

Message broker server port.

credentialsdict(str->str)

Message broker access credentials with items: “username”, “password”.

blockingbool

Whether to establish a blocking connection.

Attributes
credentials

Methods

close()

Closes the AMQP connection.

connect([callback])

Sets up an AMQP connection and channel.

discover_configs([config_path])

Returns a list of all file names within the default configuration path.

is_connected()

Returns whether the AMQP connection is established.

is_running()

Returns whether the AMQP I/O loop is running.

join()

Waits for the I/O loop to stop.

run()

Starts the AMQP I/O loop.

stop()

Stops the AMQP I/O loop.

find_configs

get_url

handle_error

CONFIG_PATH = 'C:\\Users\\David\\.libics\\tools\\database\\amqp'
LOGGER = <Logger libics.tools.database.amqp.AmqpBase (WARNING)>
close()

Closes the AMQP connection.

connect(callback=None)

Sets up an AMQP connection and channel.

property credentials
static discover_configs(config_path=None)

Returns a list of all file names within the default configuration path.

static find_configs(config_path=None)
get_url(connect_status=True)
handle_error(*_, err_raise=40, err_msg='')
is_connected()

Returns whether the AMQP connection is established.

is_running()

Returns whether the AMQP I/O loop is running.

join()

Waits for the I/O loop to stop.

run()

Starts the AMQP I/O loop.

stop()

Stops the AMQP I/O loop.

Note that this function is non-blocking; if applicable, call join() afterwards.

exception libics.tools.database.amqp.AmqpLocalError

Bases: RuntimeError

exception libics.tools.database.amqp.AmqpRemoteError

Bases: RuntimeError

exception libics.tools.database.amqp.AmqpReplyTimeoutError

Bases: RuntimeError

class libics.tools.database.amqp.AmqpRpcBase

Bases: object

AMQP RPC base object.

Notes

AMQP specifications:

  • Messages (RPC calls) are sent to an AMQP topic exchange with the exchange name matching API_ID, by convention this should be all caps.

  • The AMQP routing keys have the form API_ID.API_SUB_ID.instance_id.*.

  • The AMQP queue instances have an additional random string attached, i.e., API_ID.SUBAPI_ID.instance_id.random_id. The random string is used to allow for multiple code instances to subscribe to the same API instance (e.g. one code instance executes the commands, another code instance logs the API traffic).

Message specifications:

{
    "__meta__": {
        "__rpc_version": "x.y.z",
        "__api_version": "x.y.z"
    },
    "func_id": "my_unique_function_identifier",
    "func_args": [
        arg0, arg1, ...
    ],
    "func_kwargs": {
        "kw0": kwarg0, "kw1": kwarg1, ...
    }
}
Attributes
API

Methods

close_amqp()

Closes an AMQP connection.

connect_amqp()

Establishes an AMQP connection.

local_dispatcher(channel, method, ...)

Local dispatcher used by RPC server.

remote_dispatcher(func_id, *args, **kwargs)

Remote dispatcher used by RPC client.

setup_amqp([amqp_connection, blocking])

Sets up the AMQP connection object.

setup_api(*args[, api_object])

Sets up the API object.

deserialize_reply

deserialize_request

serialize_reply

serialize_request

API = None
RPC_VERSION = '0.0.0'
close_amqp()

Closes an AMQP connection.

connect_amqp()

Establishes an AMQP connection.

classmethod deserialize_reply(_msg)
classmethod deserialize_request(_msg)
local_dispatcher(channel, method, properties, body)

Local dispatcher used by RPC server.

remote_dispatcher(func_id, *args, **kwargs)

Remote dispatcher used by RPC client.

classmethod serialize_reply(*args, err=False)
classmethod serialize_request(func_id, *args, **kwargs)
setup_amqp(amqp_connection=None, blocking=True, **kwargs)

Sets up the AMQP connection object.

If passing an amqp_connection, a separate connection with its parameters is created and is overwritten by kwargs.

setup_api(*args, api_object=None, **kwargs)

Sets up the API object.

class libics.tools.database.amqp.AmqpRpcFactory

Bases: object

Factory class that turns API classes into RPC servers/clients.

Methods

make_rpc_server(Api, cls_name)

Creates an AMQP RPC server class.

make_rpc_client(Api, cls_name)

Creates an AMQP RPC client class.

classmethod make_dynamic_rpc_client(api_id, api_sub_id='default', instance_id='default', cls_name=None)

Creates an AMQP RPC client class.

API method calls are sent via an AMQP broker to a remote server.

Parameters
api_id, api_sub_id, instance_idstr

API ID, API sub ID and instance ID determining the AMQP exchange from which to obtain the API specifications.

classmethod make_rpc_client(Api: libics.tools.database.amqp.AmqpApiBase, cls_name: str)

Creates an AMQP RPC client class.

API method calls are sent via an AMQP broker to a remote server.

classmethod make_rpc_server(Api: libics.tools.database.amqp.AmqpApiBase, cls_name: str)

Creates an AMQP RPC server class.

Accepts API method calls via an AMQP broker from a remote client.

libics.tools.database.amqp.api_method(reply=False, timeout=1)

Decorator registering a method as API method.

Parameters
retbool

Whether to expect a reply value.

timeoutfloat

API reply timeout in seconds (s).

influx

class libics.tools.database.influx.InfluxDB(config=None, **kwargs)

Bases: object

InfluxDB v2 client.

Parameters
configstr or dict

Configuration file path or dictionary specifying below parameters. Duplicate parameters overwrite the config parameters.

hoststr

Database server IP.

portint

Database server port.

orgstr

InfluxDB organisation.

tokenstr

InfluxDB token. Needs read/write access.

timeoutstr

Query timeout in seconds (s).

asynchrbool

Whether to communicate synchronous or asynchronous.

default_bucket, default_measurementstr

Default bucket/measurement name.

Attributes
bucketslist(str)

List of bucket names in the database.

measurementsdict(str->list(str))

List of measurement values for each bucket.

tagsdict(str->list(str))

List of tag keys for each bucket. Excludes default keys like _field, _value, etc.

fieldsdict(str->list(str))

List of field keys for each bucket.

Methods

create_bucket(bucket)

Creates a bucket if it does not exist.

find_configs([config_path])

Returns a list of all file names within the default configuration path.

read_last_points(*args, **kwargs)

Queries the database for the last points.

read_measurement_values([bucket, measurement])

Queries the database and gets all measurement values.

read_points([bucket, start, stop, group, ...])

Queries the database.

read_tag_values([bucket, tag, measurement])

Queries the database and gets all tag values.

update_buckets()

Queries the database to update the buckets property.

update_fields([bucket])

Queries the database to update the fields property.

update_measurements([bucket])

Queries the database to update the measurements property.

update_tags([bucket])

Queries the database to update the tags property.

write_point([bucket, measurement, tags, ...])

Writes a point to the database.

write_points([bucket, data])

Writes multiple points to the database.

CONFIG_PATH = 'C:\\Users\\David\\.libics\\tools\\database\\influx'
LOGGER = <Logger libics.tools.database.influx.InfluxDB (WARNING)>
property buckets
create_bucket(bucket)

Creates a bucket if it does not exist.

Parameters
bucketstr

Name of new bucket.

property default_bucket
property default_measurement
property fields
static find_configs(config_path=None)

Returns a list of all file names within the default configuration path.

property measurements
read_last_points(*args, **kwargs)

Queries the database for the last points.

Wrapper for read_points().

Queries the database.

Parameters
bucketstr

Bucket name.

start, stopstr or datetime.datetime or pd.Timestamp

Extracted time range.

grouplist(str)

Groups by the given tags.

windowstr

Time window per extracted point. E.g.: “1m”, “2h”, …

functionstr

Function applied to aggregated points. E.g.: “mean”, “median”, “last”, ….

rmv_nanbool

Whether to remove windows containing no data.

funcslist(str)

List of functions applied to query tables.

measurementlist(str)

Filter for measurement values.

fieldlist(str)

Filter for field keys. Note that all extracted keys must have the same data type.

**tagsstr

Filter for tag keys and values.

Returns
dsDataSequence

Data sequence with the following columns: “time”, “measurement”, “field”, “value”, <tags>.

read_measurement_values(bucket=None, measurement=None)

Queries the database and gets all measurement values.

read_points(bucket=None, start='-1d', stop=None, group=None, window=None, function=None, rmv_nan=True, funcs=None, measurement=None, field=None, _check_params=True, **tags)

Queries the database.

Parameters
bucketstr

Bucket name.

start, stopstr or datetime.datetime or pd.Timestamp

Extracted time range.

grouplist(str)

Groups by the given tags.

windowstr

Time window per extracted point. E.g.: “1m”, “2h”, …

functionstr

Function applied to aggregated points. E.g.: “mean”, “median”, “last”, ….

rmv_nanbool

Whether to remove windows containing no data.

funcslist(str)

List of functions applied to query tables.

measurementlist(str)

Filter for measurement values.

fieldlist(str)

Filter for field keys. Note that all extracted keys must have the same data type.

**tagsstr

Filter for tag keys and values.

Returns
dsDataSequence

Data sequence with the following columns: “time”, “measurement”, “field”, “value”, <tags>.

read_tag_values(bucket=None, tag=None, measurement=True)

Queries the database and gets all tag values.

Parameters
bucketstr

Bucket name.

tagstr

Tag key from which to obtain values.

measurementstr or bool or None

If str, gets only tag values with the given measurement value. If True, gets all tag values. If None, uses default_measurement in str mode.

Returns
tag_valueslist(str)

List of tag values.

property tags
update_buckets()

Queries the database to update the buckets property.

update_fields(bucket=None)

Queries the database to update the fields property.

update_measurements(bucket=None)

Queries the database to update the measurements property.

update_tags(bucket=None)

Queries the database to update the tags property.

property url
write_point(bucket=None, measurement=None, tags=None, fields=None, time=None)

Writes a point to the database.

Parameters
bucketstr

Bucket name.

measurementstr

Measurement value.

tagsdict(str->str) or None

Dictionary containing tag keys and values.

fieldsdict(str->Any) or None

Dictionary containing field keys and values.

timeint or None

Timestamp in nanoseconds (ns). If None, uses the current time.

write_points(bucket=None, data=None)

Writes multiple points to the database.

Parameters
bucketstr

Bucket name.

datalist(dict)

List of dictionaries containing the keyword arguments for write_point().