Skip to content

feat: add Explicit bucket schemas API #528

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.35.0 [unreleased]

### Features
1. [#528](https://github.com/influxdata/influxdb-client-python/pull/528): Add `BucketSchemasService` to manage explicit bucket schemas to enforce column names, tags, fields, and data types for your data

### Bug Fixes
1. [#526](https://github.com/influxdata/influxdb-client-python/pull/526): Creating client instance from static configuration

Expand Down
11 changes: 8 additions & 3 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,23 @@
- [query_response_to_json.py](query_response_to_json.py) - How to serialize Query response to JSON
- [query_with_profilers.py](query_with_profilers.py) - How to process profilers output by callback


## Management API
- [buckets_management.py](buckets_management.py) - How to create, list and delete Buckets
- [monitoring_and_alerting.py](monitoring_and_alerting.py) - How to create the Check with Slack notification.
- [task_example.py](task_example.py) - How to create a Task by API
- [templates_management.py](templates_management.py) - How to use Templates and Stack API

## Others
## InfluxDB Cloud

:warning: The following examples are related to [InfluxDB Cloud](https://docs.influxdata.com/influxdb/cloud/) and not available on a local InfluxDB OSS instance.

- [influx_cloud.py](influx_cloud.py) - How to connect to InfluxDB 2 Cloud
- [invokable_scripts.py](invokable_scripts.py) - How to use Invokable scripts Cloud API to create custom endpoints that query data
- [bucket_schemas.py](bucket_schemas.py) - How to manage explicit bucket schemas to enforce column names, tags, fields, and data types for your data

## Others
- [influxdb_18_example.py](influxdb_18_example.py) - How to connect to InfluxDB 1.8
- [nanosecond_precision.py](nanosecond_precision.py) - How to use nanoseconds precision
- [invokable_scripts.py](invokable_scripts.py) - How to use Invokable scripts Cloud API to create custom endpoints that query data
- [connection_check.py](connection_check.py) - How to check connection configuration

## Asynchronous
Expand Down
95 changes: 95 additions & 0 deletions examples/bucket_schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""
This example is related to `InfluxDB Cloud <https://docs.influxdata.com/influxdb/cloud/>`_ and not available
on a local InfluxDB OSS instance.

How to manage explicit bucket schemas to enforce column names, tags, fields, and data types for your data.
"""
import datetime

from influxdb_client import InfluxDBClient, BucketSchemasService, PostBucketRequest, SchemaType, \
MeasurementSchemaCreateRequest, MeasurementSchemaColumn, ColumnSemanticType, ColumnDataType, \
MeasurementSchemaUpdateRequest

"""
Define credentials
"""
influx_cloud_url = 'https://us-west-2-1.aws.cloud2.influxdata.com'
influx_cloud_token = '...'
org_name = '...'

with InfluxDBClient(url=influx_cloud_url, token=influx_cloud_token, org=org_name, debug=False) as client:
uniqueId = str(datetime.datetime.now())
org_id = client.organizations_api().find_organizations(org=org_name)[0].id
bucket_schemas_api = BucketSchemasService(api_client=client.api_client)

"""
Create a bucket with the schema_type flag set to explicit
"""
print("------- Create Bucket -------\n")
created_bucket = client \
.buckets_api() \
.create_bucket(bucket=PostBucketRequest(name=f"my_schema_bucket_{uniqueId}",
org_id=org_id,
retention_rules=[],
schema_type=SchemaType.EXPLICIT))
print(created_bucket)

"""
Sets the schema for a measurement: Usage CPU

[
{"name": "time", "type": "timestamp"},
{"name": "service", "type": "tag"},
{"name": "host", "type": "tag"},
{"name": "usage_user", "type": "field", "dataType": "float"},
{"name": "usage_system", "type": "field", "dataType": "float"}
]
"""
print("------- Create Schema -------\n")
columns = [
MeasurementSchemaColumn(name="time",
type=ColumnSemanticType.TIMESTAMP),
MeasurementSchemaColumn(name="service",
type=ColumnSemanticType.TAG),
MeasurementSchemaColumn(name="host",
type=ColumnSemanticType.TAG),
MeasurementSchemaColumn(name="usage_user",
type=ColumnSemanticType.FIELD,
data_type=ColumnDataType.FLOAT),
MeasurementSchemaColumn(name="usage_system",
type=ColumnSemanticType.FIELD,
data_type=ColumnDataType.FLOAT)
]
create_request = MeasurementSchemaCreateRequest(name="usage_cpu", columns=columns)
created_schema = bucket_schemas_api.create_measurement_schema(bucket_id=created_bucket.id,
org_id=org_id,
measurement_schema_create_request=create_request)
print(created_bucket)

"""
Lists the Schemas
"""
print("\n------- Lists the Schemas -------\n")
measurement_schemas = bucket_schemas_api.get_measurement_schemas(bucket_id=created_bucket.id).measurement_schemas
print("\n".join([f"---\n ID: {ms.id}\n Name: {ms.name}\n Columns: {ms.columns}" for ms in measurement_schemas]))
print("---")

"""
Update a bucket schema
"""
print("------- Update a bucket schema -------\n")
columns.append(MeasurementSchemaColumn(name="usage_total",
type=ColumnSemanticType.FIELD,
data_type=ColumnDataType.FLOAT))
update_request = MeasurementSchemaUpdateRequest(columns=columns)
updated_schema = bucket_schemas_api.update_measurement_schema(bucket_id=created_bucket.id,
measurement_id=created_schema.id,
measurement_schema_update_request=update_request)
print(updated_schema)

"""
Delete previously created bucket
"""
print("------- Delete Bucket -------\n")
client.buckets_api().delete_bucket(created_bucket)
print(f" successfully deleted bucket: {created_bucket.name}")
3 changes: 3 additions & 0 deletions examples/invokable_scripts.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
"""
This example is related to `InfluxDB Cloud <https://docs.influxdata.com/influxdb/cloud/>`_ and not available
on a local InfluxDB OSS instance.

How to use Invokable scripts Cloud API to create custom endpoints that query data
"""
import datetime
Expand Down
8 changes: 8 additions & 0 deletions influxdb_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# import apis into sdk package
from influxdb_client.service.authorizations_service import AuthorizationsService
from influxdb_client.service.backup_service import BackupService
from influxdb_client.service.bucket_schemas_service import BucketSchemasService
from influxdb_client.service.buckets_service import BucketsService
from influxdb_client.service.cells_service import CellsService
from influxdb_client.service.checks_service import ChecksService
Expand Down Expand Up @@ -100,6 +101,8 @@
from influxdb_client.domain.check_status_level import CheckStatusLevel
from influxdb_client.domain.check_view_properties import CheckViewProperties
from influxdb_client.domain.checks import Checks
from influxdb_client.domain.column_data_type import ColumnDataType
from influxdb_client.domain.column_semantic_type import ColumnSemanticType
from influxdb_client.domain.conditional_expression import ConditionalExpression
from influxdb_client.domain.config import Config
from influxdb_client.domain.constant_variable_properties import ConstantVariableProperties
Expand Down Expand Up @@ -167,6 +170,11 @@
from influxdb_client.domain.logs import Logs
from influxdb_client.domain.map_variable_properties import MapVariableProperties
from influxdb_client.domain.markdown_view_properties import MarkdownViewProperties
from influxdb_client.domain.measurement_schema import MeasurementSchema
from influxdb_client.domain.measurement_schema_column import MeasurementSchemaColumn
from influxdb_client.domain.measurement_schema_create_request import MeasurementSchemaCreateRequest
from influxdb_client.domain.measurement_schema_list import MeasurementSchemaList
from influxdb_client.domain.measurement_schema_update_request import MeasurementSchemaUpdateRequest
from influxdb_client.domain.member_assignment import MemberAssignment
from influxdb_client.domain.member_expression import MemberExpression
from influxdb_client.domain.metadata_backup import MetadataBackup
Expand Down
1 change: 1 addition & 0 deletions influxdb_client/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# import apis into api package
from influxdb_client.service.authorizations_service import AuthorizationsService
from influxdb_client.service.backup_service import BackupService
from influxdb_client.service.bucket_schemas_service import BucketSchemasService
from influxdb_client.service.buckets_service import BucketsService
from influxdb_client.service.cells_service import CellsService
from influxdb_client.service.checks_service import ChecksService
Expand Down
7 changes: 0 additions & 7 deletions influxdb_client/client/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,6 @@ def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, or
self.profilers = kwargs.get('profilers', None)
pass

def _version(self, response) -> str:
if response is not None and len(response) >= 3:
if 'X-Influxdb-Version' in response[2]:
return response[2]['X-Influxdb-Version']

return "unknown"

@classmethod
def _from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gzip=False, **kwargs):
config = configparser.ConfigParser()
Expand Down
2 changes: 1 addition & 1 deletion influxdb_client/client/bucket_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def create_bucket(self, bucket=None, bucket_name=None, org_id=None, retention_ru
description=None, org=None) -> Bucket:
"""Create a bucket.

:param Bucket bucket: bucket to create
:param Bucket|PostBucketRequest bucket: bucket to create
:param bucket_name: bucket name
:param description: bucket description
:param org_id: org_id
Expand Down
12 changes: 11 additions & 1 deletion influxdb_client/client/influxdb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,17 @@ def version(self) -> str:

response = ping_service.get_ping_with_http_info(_return_http_data_only=False)

return self._version(response)
return ping_service.response_header(response)

def build(self) -> str:
"""
Return the build type of the connected InfluxDB Server.

:return: The type of InfluxDB build.
"""
ping_service = PingService(self.api_client)

return ping_service.build_type()

def ready(self) -> Ready:
"""
Expand Down
12 changes: 11 additions & 1 deletion influxdb_client/client/influxdb_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,17 @@ async def version(self) -> str:
ping_service = PingService(self.api_client)

response = await ping_service.get_ping_async(_return_http_data_only=False)
return self._version(response)
return ping_service.response_header(response)

async def build(self) -> str:
"""
Return the build type of the connected InfluxDB Server.

:return: The type of InfluxDB build.
"""
ping_service = PingService(self.api_client)

return await ping_service.build_type_async()

def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApiAsync:
"""
Expand Down
21 changes: 21 additions & 0 deletions influxdb_client/client/warnings.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,24 @@ def print_warning(query: str):
- https://docs.influxdata.com/flux/latest/stdlib/influxdata/influxdb/schema/fieldsascols/
"""
warnings.warn(message, MissingPivotFunction)


class CloudOnlyWarning(UserWarning):
"""User warning about availability only on the InfluxDB Cloud."""

@staticmethod
def print_warning(api_name: str, doc_url: str):
"""Print warning about availability only on the InfluxDB Cloud."""
message = f"""The '{api_name}' is available only on the InfluxDB Cloud.

For more info see:
- {doc_url}
- https://docs.influxdata.com/influxdb/cloud/

You can disable this warning by:
import warnings
from influxdb_client.client.warnings import CloudOnlyWarning

warnings.simplefilter("ignore", CloudOnlyWarning)
"""
warnings.warn(message, CloudOnlyWarning)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means users who are using this with cloud suddenly will get additional text for every usage correct? That seems less than ideal to make a user who is using this correctly to need to ignore a warning.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I will add a checking to the type of instance before warning.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation of warning is changed. The warnings are displayed on individual actions and only if the InfluxD instance is not Cloud. For more info see:

1 change: 1 addition & 0 deletions influxdb_client/client/write/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# import apis into api package
from influxdb_client.service.authorizations_service import AuthorizationsService
from influxdb_client.service.backup_service import BackupService
from influxdb_client.service.bucket_schemas_service import BucketSchemasService
from influxdb_client.service.buckets_service import BucketsService
from influxdb_client.service.cells_service import CellsService
from influxdb_client.service.checks_service import ChecksService
Expand Down
7 changes: 7 additions & 0 deletions influxdb_client/domain/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
from influxdb_client.domain.check_status_level import CheckStatusLevel
from influxdb_client.domain.check_view_properties import CheckViewProperties
from influxdb_client.domain.checks import Checks
from influxdb_client.domain.column_data_type import ColumnDataType
from influxdb_client.domain.column_semantic_type import ColumnSemanticType
from influxdb_client.domain.conditional_expression import ConditionalExpression
from influxdb_client.domain.config import Config
from influxdb_client.domain.constant_variable_properties import ConstantVariableProperties
Expand Down Expand Up @@ -123,6 +125,11 @@
from influxdb_client.domain.logs import Logs
from influxdb_client.domain.map_variable_properties import MapVariableProperties
from influxdb_client.domain.markdown_view_properties import MarkdownViewProperties
from influxdb_client.domain.measurement_schema import MeasurementSchema
from influxdb_client.domain.measurement_schema_column import MeasurementSchemaColumn
from influxdb_client.domain.measurement_schema_create_request import MeasurementSchemaCreateRequest
from influxdb_client.domain.measurement_schema_list import MeasurementSchemaList
from influxdb_client.domain.measurement_schema_update_request import MeasurementSchemaUpdateRequest
from influxdb_client.domain.member_assignment import MemberAssignment
from influxdb_client.domain.member_expression import MemberExpression
from influxdb_client.domain.metadata_backup import MetadataBackup
Expand Down
91 changes: 91 additions & 0 deletions influxdb_client/domain/column_data_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# coding: utf-8

"""
InfluxDB OSS API Service.

The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint. # noqa: E501

OpenAPI spec version: 2.0.0
Generated by: https://openapi-generator.tech
"""


import pprint
import re # noqa: F401


class ColumnDataType(object):
"""NOTE: This class is auto generated by OpenAPI Generator.

Ref: https://openapi-generator.tech

Do not edit the class manually.
"""

"""
allowed enum values
"""
INTEGER = "integer"
FLOAT = "float"
BOOLEAN = "boolean"
STRING = "string"
UNSIGNED = "unsigned"

"""
Attributes:
openapi_types (dict): The key is attribute name
and the value is attribute type.
attribute_map (dict): The key is attribute name
and the value is json key in definition.
"""
openapi_types = {
}

attribute_map = {
}

def __init__(self): # noqa: E501,D401,D403
"""ColumnDataType - a model defined in OpenAPI.""" # noqa: E501 self.discriminator = None

def to_dict(self):
"""Return the model properties as a dict."""
result = {}

for attr, _ in self.openapi_types.items():
value = getattr(self, attr)
if isinstance(value, list):
result[attr] = list(map(
lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
value
))
elif hasattr(value, "to_dict"):
result[attr] = value.to_dict()
elif isinstance(value, dict):
result[attr] = dict(map(
lambda item: (item[0], item[1].to_dict())
if hasattr(item[1], "to_dict") else item,
value.items()
))
else:
result[attr] = value

return result

def to_str(self):
"""Return the string representation of the model."""
return pprint.pformat(self.to_dict())

def __repr__(self):
"""For `print` and `pprint`."""
return self.to_str()

def __eq__(self, other):
"""Return true if both objects are equal."""
if not isinstance(other, ColumnDataType):
return False

return self.__dict__ == other.__dict__

def __ne__(self, other):
"""Return true if both objects are not equal."""
return not self == other
Loading