libics.tools.database
amqp
- class libics.tools.database.amqp.AmqpApiBase(instance_id='default', _random_id=None)
Bases: object
AMQP RPC API base class.
- Parameters
- amqp_client : AmqpConnection
AMQP client.
- instance_id : str
Unique name of the API instance.
Notes
Usage:
Subclass this base class and set the class attribute API_ID to a unique API-identifying string.
Subclasses of a base API that should use the same exchange as the parent API can be distinguished by setting the class attribute API_SUB_ID.
Set an appropriate API_VERSION.
Decorate the methods that should be accessible via the RPC API with api_method().
Using AmqpRpcFactory, these API classes can be turned into an RPC server or client class.
Methods
get_api(): Gets a list of all API method names.
get_api_signature(func_id): Gets the call signature of the API method.
get_api_specifications(): Gets the API specifications.
help(func_id): Gets the docstring of the API method.
ping(): Returns True.
ping_args(*args, **kwargs): Returns any passed parameters.
register_cb_on_entry(method_name, func): Adds a callback function called before executing the API method.
register_cb_on_exit(method_name, func): Adds a callback function called after executing the API method.
get_exchange_id
get_queue_id
get_routing_key
- API_ID = 'TEST'
- API_METHODS = {'get_api', 'get_api_signature', 'get_api_specifications', 'help', 'ping', 'ping_args'}
- API_REPLIES = {'get_api', 'get_api_signature', 'get_api_specifications', 'help', 'ping', 'ping_args'}
- API_SUB_ID = 'default'
- API_TIMEOUTS = {'get_api': 1, 'get_api_signature': 1, 'get_api_specifications': 1, 'help': 1, 'ping': 1, 'ping_args': 1}
- API_VERSION = '0.0.0'
- LOGGER = <Logger libics.tools.database.amqp.AmqpApiBase (WARNING)>
- get_api() -> List[str]
Gets a list of all API method names.
- get_api_signature(func_id: str) -> dict
Gets the call signature of the API method.
- Returns
- ret : dict
Dictionary describing the signature with the following items:
- func_id : str
API method name.
- arg_names : list(str)
Names of positional arguments in order.
- kwarg_names : list(str)
Names of keyword arguments.
- return : bool
Whether a value is returned. If True, the argument name of the reply in types is "return".
- arg_var : bool
Whether variable positional arguments are valid.
- kwarg_var : bool
Whether variable keyword arguments are valid.
- types : dict(str->list(str))
Dictionary mapping each arg/kwarg/return name to a list of types. Valid types: "null", "bool", "int", "float", "str", "json". Type "json" means that the parameter has no type hint or has a more complicated structure.
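The structure of this dictionary can be illustrated with a minimal sketch built on the standard `inspect` module. This is not the libics implementation: the helper `describe_signature`, the annotation-to-name mapping, and the rule that parameters without defaults count as positional are all assumptions made for illustration.

```python
import inspect
from typing import Any, Dict, List

# Assumed mapping from Python annotations to the documented type names.
_TYPE_NAMES = {type(None): "null", None: "null", bool: "bool",
               int: "int", float: "float", str: "str"}


def describe_signature(func) -> Dict[str, Any]:
    """Builds a dictionary shaped like the documented return value (hypothetical helper)."""
    sig = inspect.signature(func)
    arg_names: List[str] = []
    kwarg_names: List[str] = []
    arg_var = kwarg_var = False
    types: Dict[str, List[str]] = {}
    for name, param in sig.parameters.items():
        if name == "self":
            continue
        if param.kind is param.VAR_POSITIONAL:
            arg_var = True
            continue
        if param.kind is param.VAR_KEYWORD:
            kwarg_var = True
            continue
        # Assumption: parameters without a default are positional arguments.
        if param.default is param.empty and param.kind is not param.KEYWORD_ONLY:
            arg_names.append(name)
        else:
            kwarg_names.append(name)
        # Anything without a simple type hint falls back to "json".
        types[name] = [_TYPE_NAMES.get(param.annotation, "json")]
    # Assumption: a return value is expected only if a non-None return hint exists.
    has_return = sig.return_annotation not in (sig.empty, None)
    if has_return:
        types["return"] = [_TYPE_NAMES.get(sig.return_annotation, "json")]
    return {
        "func_id": func.__name__,
        "arg_names": arg_names,
        "kwarg_names": kwarg_names,
        "return": has_return,
        "arg_var": arg_var,
        "kwarg_var": kwarg_var,
        "types": types,
    }


def set_value(name: str, value: float, unit: str = "V", *extra, **options) -> bool:
    return True


spec = describe_signature(set_value)
```

Here `spec["arg_names"]` lists `name` and `value`, `spec["kwarg_names"]` lists `unit`, and the reply type is stored under the key `"return"` in `spec["types"]`.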
- get_api_specifications() -> dict
Gets the API specifications.
- Returns
- specs : dict(str->Any)
Dictionary containing the class variables "API_METHODS", "API_REPLIES", "API_TIMEOUTS".
- classmethod get_exchange_id()
- get_queue_id()
- get_routing_key()
- help(func_id: str) -> str
Gets the docstring of the API method.
- ping() -> bool
Returns True.
- ping_args(*args, **kwargs) -> Any
Returns any passed parameters.
- register_cb_on_entry(method_name, func)
Adds a callback function called before executing the API method.
- Parameters
- method_name : str
API method name.
- func : callable
Callback function. Must accept the same arguments as the API method and may only raise AmqpLocalError.
- register_cb_on_exit(method_name, func)
Adds a callback function called after executing the API method.
- Parameters
- method_name : str
API method name.
- func : callable
Callback function. Must accept the return value of the API method as arguments and may only raise AmqpLocalError.
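The callback contract described above (entry callbacks receive the call arguments, exit callbacks receive the return value) can be sketched with a small stand-in class. `CallbackDemo`, its `call` method, and the `add` method are hypothetical and only mimic the documented behaviour; passing the return value as a single argument is an assumption.

```python
from collections import defaultdict


class CallbackDemo:
    """Stand-in for an API object; not the libics implementation."""

    def __init__(self):
        self._on_entry = defaultdict(list)
        self._on_exit = defaultdict(list)

    def register_cb_on_entry(self, method_name, func):
        self._on_entry[method_name].append(func)

    def register_cb_on_exit(self, method_name, func):
        self._on_exit[method_name].append(func)

    def call(self, method_name, *args, **kwargs):
        # Entry callbacks get the same arguments as the API method.
        for cb in self._on_entry[method_name]:
            cb(*args, **kwargs)
        ret = getattr(self, method_name)(*args, **kwargs)
        # Exit callbacks get the return value (assumed: as one argument).
        for cb in self._on_exit[method_name]:
            cb(ret)
        return ret

    def add(self, a, b=0):
        return a + b


log = []
demo = CallbackDemo()
demo.register_cb_on_entry("add", lambda a, b=0: log.append(("entry", a, b)))
demo.register_cb_on_exit("add", lambda ret: log.append(("exit", ret)))
result = demo.call("add", 2, b=3)
```

After the call, `log` holds the entry record followed by the exit record, which makes this pattern convenient for tracing API traffic.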
- class libics.tools.database.amqp.AmqpConnection(config=None, **kwargs)
Bases: object
RPC base class via AMQP message queues.
- Parameters
- config : str or dict
Configuration file path or dictionary specifying the parameters below. Parameters passed as keyword arguments override those in the config.
- host : str
Message broker server IP.
- port : int
Message broker server port.
- credentials : dict(str->str)
Message broker access credentials with items: "username", "password".
- blocking : bool
Whether to establish a blocking connection.
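The override rule for these parameters can be sketched as a simple merge. The helper `load_config` is hypothetical, and reading the configuration file as JSON is an assumption, since the file format is not stated here.

```python
import json


def load_config(config=None, **kwargs):
    """Merges a config (file path or dict) with keyword overrides (hypothetical helper)."""
    if config is None:
        merged = {}
    elif isinstance(config, str):
        # Assumption: the configuration file is stored as JSON.
        with open(config, "r") as f:
            merged = json.load(f)
    else:
        merged = dict(config)
    # Keyword arguments take precedence over config entries.
    merged.update(kwargs)
    return merged


config = {
    "host": "127.0.0.1",
    "port": 5672,
    "credentials": {"username": "guest", "password": "guest"},
    "blocking": True,
}
params = load_config(config, port=5673)
```

In this sketch `params["port"]` is taken from the keyword argument, while the original `config` dictionary is left unchanged.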
- Attributes
- credentials
Methods
close(): Closes the AMQP connection.
connect([callback]): Sets up an AMQP connection and channel.
discover_configs([config_path]): Returns a list of all file names within the default configuration path.
is_connected(): Returns whether the AMQP connection is established.
is_running(): Returns whether the AMQP I/O loop is running.
join(): Waits for the I/O loop to stop.
run(): Starts the AMQP I/O loop.
stop(): Stops the AMQP I/O loop.
find_configs
get_url
handle_error
- CONFIG_PATH = 'C:\\Users\\David\\.libics\\tools\\database\\amqp'
- LOGGER = <Logger libics.tools.database.amqp.AmqpBase (WARNING)>
- close()
Closes the AMQP connection.
- connect(callback=None)
Sets up an AMQP connection and channel.
- property credentials
- static discover_configs(config_path=None)
Returns a list of all file names within the default configuration path.
- static find_configs(config_path=None)
- get_url(connect_status=True)
- handle_error(*_, err_raise=40, err_msg='')
- is_connected()
Returns whether the AMQP connection is established.
- is_running()
Returns whether the AMQP I/O loop is running.
- join()
Waits for the I/O loop to stop.
- run()
Starts the AMQP I/O loop.
- exception libics.tools.database.amqp.AmqpLocalError
Bases: RuntimeError
- exception libics.tools.database.amqp.AmqpRemoteError
Bases: RuntimeError
- exception libics.tools.database.amqp.AmqpReplyTimeoutError
Bases: RuntimeError
- class libics.tools.database.amqp.AmqpRpcBase
Bases: object
AMQP RPC base object.
Notes
AMQP specifications:
Messages (RPC calls) are sent to an AMQP topic exchange whose name matches API_ID; by convention, this should be all caps.
The AMQP routing keys have the form API_ID.API_SUB_ID.instance_id.*.
The AMQP queue names have an additional random string attached, i.e., API_ID.API_SUB_ID.instance_id.random_id. The random string allows multiple code instances to subscribe to the same API instance (e.g., one code instance executes the commands while another logs the API traffic).
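The naming scheme above can be sketched in a few lines. The helper names are hypothetical, the random string is generated with `uuid` as an assumption, and `topic_matches` is a simplified version of AMQP topic matching in which `*` stands for exactly one dot-separated word (the `#` wildcard is not handled).

```python
import uuid


def make_routing_key(api_id, api_sub_id="default", instance_id="default"):
    """Routing key pattern of the form API_ID.API_SUB_ID.instance_id.*"""
    return f"{api_id}.{api_sub_id}.{instance_id}.*"


def make_queue_name(api_id, api_sub_id="default", instance_id="default",
                    random_id=None):
    """Queue name with a random suffix (assumed here: 8 hex characters)."""
    if random_id is None:
        random_id = uuid.uuid4().hex[:8]
    return f"{api_id}.{api_sub_id}.{instance_id}.{random_id}"


def topic_matches(pattern, routing_key):
    """Simplified topic match: '*' matches exactly one word."""
    p, k = pattern.split("."), routing_key.split(".")
    return len(p) == len(k) and all(a == "*" or a == b for a, b in zip(p, k))


pattern = make_routing_key("TEST")
queue_a = make_queue_name("TEST", random_id="abc123")
queue_b = make_queue_name("TEST")  # second subscriber, different suffix
```

Two subscribers created this way share the pattern `TEST.default.default.*` but own distinct queues, which is what lets an executor and a logger both receive the same traffic.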
Message specifications:
{
    "__meta__": {
        "__rpc_version": "x.y.z",
        "__api_version": "x.y.z"
    },
    "func_id": "my_unique_function_identifier",
    "func_args": [arg0, arg1, ...],
    "func_kwargs": {"kw0": kwarg0, "kw1": kwarg1, ...}
}
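A request following the message specification can be sketched with the standard `json` module. The functions `encode_request` and `decode_request` are hypothetical stand-ins, not the library's `serialize_request` and `deserialize_request`, and plain JSON text encoding is an assumption.

```python
import json

RPC_VERSION = "0.0.0"  # placeholder versions for the sketch
API_VERSION = "0.0.0"


def encode_request(func_id, *args, **kwargs):
    """Packs an RPC call into the documented message layout."""
    msg = {
        "__meta__": {
            "__rpc_version": RPC_VERSION,
            "__api_version": API_VERSION,
        },
        "func_id": func_id,
        "func_args": list(args),
        "func_kwargs": kwargs,
    }
    return json.dumps(msg)


def decode_request(raw):
    """Unpacks a message into function ID, positional and keyword arguments."""
    msg = json.loads(raw)
    return msg["func_id"], msg["func_args"], msg["func_kwargs"]


raw = encode_request("ping_args", 1, "a", flag=True)
func_id, args, kwargs = decode_request(raw)
```

A server-side dispatcher could then look up `func_id` among the registered API methods and call it with `*args` and `**kwargs`.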
- Attributes
- API
Methods
close_amqp(): Closes an AMQP connection.
connect_amqp(): Establishes an AMQP connection.
local_dispatcher(channel, method, ...): Local dispatcher used by RPC server.
remote_dispatcher(func_id, *args, **kwargs): Remote dispatcher used by RPC client.
setup_amqp([amqp_connection, blocking]): Sets up the AMQP connection object.
setup_api(*args[, api_object]): Sets up the API object.
deserialize_reply
deserialize_request
serialize_reply
serialize_request
- API = None
- RPC_VERSION = '0.0.0'
- close_amqp()
Closes an AMQP connection.
- connect_amqp()
Establishes an AMQP connection.
- classmethod deserialize_reply(_msg)
- classmethod deserialize_request(_msg)
- local_dispatcher(channel, method, properties, body)
Local dispatcher used by RPC server.
- remote_dispatcher(func_id, *args, **kwargs)
Remote dispatcher used by RPC client.
- classmethod serialize_reply(*args, err=False)
- classmethod serialize_request(func_id, *args, **kwargs)
- setup_amqp(amqp_connection=None, blocking=True, **kwargs)
Sets up the AMQP connection object.
If an amqp_connection is passed, a separate connection is created from its parameters, which are overridden by any kwargs.
- setup_api(*args, api_object=None, **kwargs)
Sets up the API object.
- class libics.tools.database.amqp.AmqpRpcFactory
Bases: object
Factory class that turns API classes into RPC servers/clients.
Methods
make_rpc_server(Api, cls_name): Creates an AMQP RPC server class.
make_rpc_client(Api, cls_name): Creates an AMQP RPC client class.
- classmethod make_dynamic_rpc_client(api_id, api_sub_id='default', instance_id='default', cls_name=None)
Creates an AMQP RPC client class.
API method calls are sent via an AMQP broker to a remote server.
- Parameters
- api_id, api_sub_id, instance_id : str
API ID, API sub ID and instance ID determining the AMQP exchange from which to obtain the API specifications.
- classmethod make_rpc_client(Api: libics.tools.database.amqp.AmqpApiBase, cls_name: str)
Creates an AMQP RPC client class.
API method calls are sent via an AMQP broker to a remote server.
- classmethod make_rpc_server(Api: libics.tools.database.amqp.AmqpApiBase, cls_name: str)
Creates an AMQP RPC server class.
Accepts API method calls via an AMQP broker from a remote client.
- libics.tools.database.amqp.api_method(reply=False, timeout=1)
Decorator registering a method as API method.
- Parameters
- reply : bool
Whether to expect a reply value.
- timeout : float
API reply timeout in seconds (s).
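How such a decorator could register methods can be sketched with stand-ins that do not import libics. `api_method_sketch`, `ApiSketchBase`, `collect`, and `Thermometer` are hypothetical names; the sketch only shows one way the `reply` and `timeout` settings could end up in collections shaped like API_METHODS, API_REPLIES, and API_TIMEOUTS.

```python
def api_method_sketch(reply=False, timeout=1):
    """Stand-in decorator that attaches API metadata to a method."""
    def decorator(func):
        func._api_meta = {"reply": reply, "timeout": timeout}
        return func
    return decorator


class ApiSketchBase:
    """Stand-in base class; not the libics AmqpApiBase."""
    API_ID = "TEST"
    API_SUB_ID = "default"
    API_VERSION = "0.0.0"

    @classmethod
    def collect(cls):
        """Gathers decorated methods from the class and its parents."""
        methods, replies, timeouts = set(), set(), {}
        for klass in cls.__mro__:
            for name, attr in vars(klass).items():
                meta = getattr(attr, "_api_meta", None)
                if meta is None:
                    continue
                methods.add(name)
                if meta["reply"]:
                    replies.add(name)
                timeouts.setdefault(name, meta["timeout"])
        return methods, replies, timeouts


class Thermometer(ApiSketchBase):
    API_ID = "THERMOMETER"
    API_VERSION = "0.1.0"

    @api_method_sketch(reply=True, timeout=2)
    def read_temperature(self):
        return 21.5

    @api_method_sketch()
    def reset(self):
        return None


methods, replies, timeouts = Thermometer.collect()
```

In this sketch every decorated method gets a timeout entry, while only methods declared with `reply=True` appear in the reply set.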
influx
- class libics.tools.database.influx.InfluxDB(config=None, **kwargs)
Bases: object
InfluxDB v2 client.
- Parameters
- config : str or dict
Configuration file path or dictionary specifying the parameters below. Parameters passed as keyword arguments override those in the config.
- host : str
Database server IP.
- port : int
Database server port.
- org : str
InfluxDB organisation.
- token : str
InfluxDB token. Needs read/write access.
- timeout : str
Query timeout in seconds (s).
- asynchr : bool
Whether to communicate asynchronously rather than synchronously.
- default_bucket, default_measurement : str
Default bucket/measurement name.
- Attributes
- buckets : list(str)
List of bucket names in the database.
- measurements : dict(str->list(str))
List of measurement values for each bucket.
- tags : dict(str->list(str))
List of tag keys for each bucket. Excludes default keys like _field, _value, etc.
- fields : dict(str->list(str))
List of field keys for each bucket.
Methods
create_bucket(bucket): Creates a bucket if it does not exist.
find_configs([config_path]): Returns a list of all file names within the default configuration path.
read_last_points(*args, **kwargs): Queries the database for the last points.
read_measurement_values([bucket, measurement]): Queries the database and gets all measurement values.
read_points([bucket, start, stop, group, ...]): Queries the database.
read_tag_values([bucket, tag, measurement]): Queries the database and gets all tag values.
update_buckets(): Queries the database to update the buckets property.
update_fields([bucket]): Queries the database to update the fields property.
update_measurements([bucket]): Queries the database to update the measurements property.
update_tags([bucket]): Queries the database to update the tags property.
write_point([bucket, measurement, tags, ...]): Writes a point to the database.
write_points([bucket, data]): Writes multiple points to the database.
- CONFIG_PATH = 'C:\\Users\\David\\.libics\\tools\\database\\influx'
- LOGGER = <Logger libics.tools.database.influx.InfluxDB (WARNING)>
- property buckets
- create_bucket(bucket)
Creates a bucket if it does not exist.
- Parameters
- bucket : str
Name of new bucket.
- property default_bucket
- property default_measurement
- property fields
- static find_configs(config_path=None)
Returns a list of all file names within the default configuration path.
- property measurements
- read_last_points(*args, **kwargs)
Queries the database for the last points.
Wrapper for read_points(), which queries the database.
- Parameters
- bucket : str
Bucket name.
- start, stop : str or datetime.datetime or pd.Timestamp
Extracted time range.
- group : list(str)
Groups by the given tags.
- window : str
Time window per extracted point. E.g.: "1m", "2h", …
- function : str
Function applied to aggregated points. E.g.: "mean", "median", "last", …
- rmv_nan : bool
Whether to remove windows containing no data.
- funcs : list(str)
List of functions applied to query tables.
- measurement : list(str)
Filter for measurement values.
- field : list(str)
Filter for field keys. Note that all extracted keys must have the same data type.
- **tags : str
Filter for tag keys and values.
- Returns
- ds : DataSequence
Data sequence with the following columns: “time”, “measurement”, “field”, “value”, <tags>.
- read_measurement_values(bucket=None, measurement=None)
Queries the database and gets all measurement values.
- read_points(bucket=None, start='-1d', stop=None, group=None, window=None, function=None, rmv_nan=True, funcs=None, measurement=None, field=None, _check_params=True, **tags)
Queries the database.
- Parameters
- bucket : str
Bucket name.
- start, stop : str or datetime.datetime or pd.Timestamp
Extracted time range.
- group : list(str)
Groups by the given tags.
- window : str
Time window per extracted point. E.g.: "1m", "2h", …
- function : str
Function applied to aggregated points. E.g.: "mean", "median", "last", …
- rmv_nan : bool
Whether to remove windows containing no data.
- funcs : list(str)
List of functions applied to query tables.
- measurement : list(str)
Filter for measurement values.
- field : list(str)
Filter for field keys. Note that all extracted keys must have the same data type.
- **tags : str
Filter for tag keys and values.
- Returns
- ds : DataSequence
Data sequence with the following columns: “time”, “measurement”, “field”, “value”, <tags>.
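To make the parameters more concrete, the following sketch shows how they could map onto a Flux query for InfluxDB v2. The function `build_flux_query` is hypothetical and the mapping is an assumption (in particular, `rmv_nan` is mapped to the `createEmpty` option of `aggregateWindow`); the query that libics actually generates may differ. Only string time ranges such as "-1d" are handled, and the `funcs` parameter is omitted.

```python
def build_flux_query(bucket, start="-1d", stop=None, group=None, window=None,
                     function=None, rmv_nan=True, measurement=None,
                     field=None, **tags):
    """Assembles a Flux query string from read_points-style parameters."""

    def any_of(column, values):
        # Accept a single string or a list of alternatives.
        if isinstance(values, str):
            values = [values]
        terms = " or ".join(f'r["{column}"] == "{v}"' for v in values)
        return f"  |> filter(fn: (r) => {terms})"

    time_range = f"start: {start}" + (f", stop: {stop}" if stop else "")
    lines = [f'from(bucket: "{bucket}")', f"  |> range({time_range})"]
    if measurement:
        lines.append(any_of("_measurement", measurement))
    if field:
        lines.append(any_of("_field", field))
    for key, value in tags.items():
        lines.append(any_of(key, value))
    if group:
        columns = ", ".join(f'"{g}"' for g in group)
        lines.append(f"  |> group(columns: [{columns}])")
    if window and function:
        # Assumption: removing empty windows corresponds to createEmpty: false.
        create_empty = "false" if rmv_nan else "true"
        lines.append(
            f"  |> aggregateWindow(every: {window}, fn: {function}, "
            f"createEmpty: {create_empty})"
        )
    return "\n".join(lines)


query = build_flux_query(
    "lab", window="1m", function="mean",
    measurement=["temperature"], field=["value"], sensor="A",
)
```

The bucket name `lab`, the measurement `temperature`, and the tag `sensor` are made-up example values.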
- read_tag_values(bucket=None, tag=None, measurement=True)
Queries the database and gets all tag values.
- Parameters
- bucket : str
Bucket name.
- tag : str
Tag key from which to obtain values.
- measurement : str or bool or None
If str, gets only tag values with the given measurement value. If True, gets all tag values. If None, uses default_measurement in str mode.
- Returns
- tag_values : list(str)
List of tag values.
- property tags
- update_buckets()
Queries the database to update the buckets property.
- update_fields(bucket=None)
Queries the database to update the fields property.
- update_measurements(bucket=None)
Queries the database to update the measurements property.
- update_tags(bucket=None)
Queries the database to update the tags property.
- property url
- write_point(bucket=None, measurement=None, tags=None, fields=None, time=None)
Writes a point to the database.
- Parameters
- bucket : str
Bucket name.
- measurement : str
Measurement value.
- tags : dict(str->str) or None
Dictionary containing tag keys and values.
- fields : dict(str->Any) or None
Dictionary containing field keys and values.
- time : int or None
Timestamp in nanoseconds (ns). If None, uses the current time.
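The roles of measurement, tags, fields, and the nanosecond timestamp can be illustrated with InfluxDB's line protocol, the text format InfluxDB accepts for writes. Whether libics builds this string itself or delegates to a client library is not stated here, so the helper `to_line_protocol` is purely a hypothetical sketch. It is also simplified: special characters in measurement names, tag keys, and tag values are not escaped.

```python
import time


def to_line_protocol(measurement, tags=None, fields=None, timestamp_ns=None):
    """Formats one point as an InfluxDB line protocol string (simplified)."""
    if not fields:
        raise ValueError("line protocol requires at least one field")
    if timestamp_ns is None:
        # Mirrors the documented default: use the current time in ns.
        timestamp_ns = time.time_ns()

    def fmt(value):
        if isinstance(value, bool):  # check bool before int
            return "true" if value else "false"
        if isinstance(value, int):
            return f"{value}i"  # integers carry an "i" suffix
        if isinstance(value, float):
            return repr(value)
        text = str(value).replace("\\", "\\\\").replace('"', '\\"')
        return f'"{text}"'

    tag_part = "".join(f",{k}={v}" for k, v in sorted((tags or {}).items()))
    field_part = ",".join(f"{k}={fmt(v)}" for k, v in fields.items())
    return f"{measurement}{tag_part} {field_part} {timestamp_ns}"


line = to_line_protocol(
    "temperature",
    tags={"sensor": "A", "room": "lab1"},
    fields={"value": 21.5, "count": 3},
    timestamp_ns=1700000000000000000,
)
```

The example values (`temperature`, `sensor`, `room`) are made up; tags are sorted by key in the output line.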
- write_points(bucket=None, data=None)
Writes multiple points to the database.
- Parameters
- bucket : str
Bucket name.
- data : list(dict)
List of dictionaries containing the keyword arguments for write_point().
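The shape of the `data` argument can be sketched with a stand-in writer that records its calls instead of contacting a database. `write_points_sketch` and `fake_write_point` are hypothetical, and letting a per-point "bucket" entry override the shared bucket is an assumption of this sketch.

```python
recorded = []


def fake_write_point(bucket=None, measurement=None, tags=None, fields=None,
                     time=None):
    """Stand-in for write_point() that only records its arguments."""
    recorded.append((bucket, measurement, tags, fields, time))


def write_points_sketch(write_point, bucket=None, data=None):
    """Calls write_point once per dictionary in data."""
    for kwargs in (data or []):
        # Assumption: a "bucket" key inside a point overrides the shared one.
        write_point(**{"bucket": bucket, **kwargs})


data = [
    {"measurement": "temperature", "tags": {"sensor": "A"},
     "fields": {"value": 21.5}},
    {"measurement": "temperature", "tags": {"sensor": "B"},
     "fields": {"value": 22.0}, "time": 1700000000000000000},
]
write_points_sketch(fake_write_point, bucket="lab", data=data)
```

Each dictionary uses the same keys as the keyword arguments of write_point(), so points with and without an explicit timestamp can be mixed in one list.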