Data Product Manager

1. Data Product Definition

The aerOS project proposes a definition for data product as the combination of the following metadata and artifacts:

1.1 Batch Data Products:

1.1.1 Batch Data Products – Data Files

Generated from periodic collections of raw data stored in local or remote files. When defining a Batch Data Product with files as a data source, these are the details that must be provided:

  • Name (Optional): Name of the Data Product.

  • Description (Optional): Descriptive text of the Data Product.

  • Tags (Optional): List of tags that identify and categorize the Data Product.

  • Data Source Type: String that identifies the type of the Data Product within the Data Product Manager. For these Batch Data Products it must always be BATCH_FILE.

  • Freshness (Optional): The freshness of data sources is a crucial factor and is only applicable to Batch-type data sources. It determines how frequently the aerOS Data Fabric collects raw data from the target data source. This property is optional and can be tailored based on the user’s specific requirements.

  • Path to Data File: The location of the data file is specified using the file_path property. Users must specify this property to indicate where the raw data is stored, facilitating seamless data processing.

1.1.2 Batch Data Products – Relational Databases

Generated from periodic collections of raw data stored in relational databases. When creating a Batch Data Product with relational databases as a data source, these are the details that must be provided:

  • Name (Optional): Name of the Data Product.

  • Description (Optional): Descriptive text of the Data Product.

  • Tags (Optional): List of tags that identify and categorize the Data Product.

  • Data Source Type: String that identifies the type of the Data Product within the Data Product Manager. For these Batch Data Products it must always be BATCH_RELATIONAL_DATABASE.

  • Freshness (Optional): The freshness of data sources remains a critical aspect and is specifically relevant to Batch-type data sources. It establishes the frequency at which the aerOS Data Fabric retrieves raw data from the designated relational database. Users have the flexibility to define the freshness, determining the frequency of data collection. This property is optional, allowing users to align data retrieval with their specific operational needs.

  • Database URL: The database URL is specified using the db_url property. This parameter indicates the location and configuration details of the relational database from which raw data is collected.

1.2 Streaming Data Products

Streaming Data Products are generated in real-time as data flows continuously from streaming sources such as Kafka or MQTT. When configuring a Streaming Data Product, these are the details that must be provided:

1.2.1 Streaming Data Products – Kafka Source

  • Name (Optional): Name of the Data Product.

  • Description (Optional): Descriptive text of the Data Product.

  • Tags (Optional): List of tags that identify and categorize the Data Product.

  • Input format: Specifies the format for the input data. Valid values are XML, JSON or CSV.

  • Input Topic: The topic property is utilized to specify the streaming broker topic. This parameter defines the channel or subject from which real-time data is sourced.

  • Data Source Type: String that identifies the type of the Data Product within the Data Product Manager. For these Streaming Data Products it must always be STREAMING_KAFKA.

  • Host: IP address or FQDN where the Kafka broker is reachable.

  • Port: Port number where the Kafka broker is reachable.

1.2.2 Streaming Data Products – MQTT Source

  • Name (Optional): Name of the Data Product.

  • Description (Optional): Descriptive text of the Data Product.

  • Tags (Optional): List of tags that identify and categorize the Data Product.

  • Input format: Specifies the format for the input data. Valid values are XML, JSON or CSV.

  • Input Topic: The topic property is utilized to specify the streaming broker topic. This parameter defines the channel or subject from which real-time data is sourced.

  • Data Source Type: String that identifies the type of the Data Product within the Data Product Manager. For these Streaming Data Products it must always be STREAMING_MQTT.

  • Host: IP address or FQDN where the MQTT broker is reachable.

  • Port: Port number where the MQTT broker is reachable.

  • Protocol: Protocol that must be used for the communication with the MQTT broker. Expected values are tcp or udp.

2. Data Product Manager

The Data Product Manager plays a pivotal role in the aerOS Data Fabric, serving as the orchestrator for seamless data product onboarding. This component efficiently manages the integration of new data products, ensuring a coherent and standardized process.

For batch data sources/products, upon successful submission of metadata and artifacts, the Data Product Manager then proceeds to generate two ConfigMaps within the Kubernetes environment. The first ConfigMap encapsulates a config.ini file, created by the application, and the second captures the RML/YARRRML mappings file. These ConfigMaps serve as vital components, enabling seamless integration and transformation of data within the Data Fabric, with Morph-KGC leveraging both the config.ini and RML ConfigMaps in subsequent phases. For streaming data sources, the Data Product Manager sends HTTP POST requests to the Semantic Annotator to create the corresponding channels.

When creating batch data products, the Data Product Manager will orchestrate the deployment of Morph-KGC in a Kubernetes cluster using Helm Controller.

The Data Product Manager efficiently manages the integration process and orchestrates the deployment of the data product pipeline. This deployment, inclusive of coordinating computing resources like Kubernetes, is seamlessly achieved using Helm Charts and HTTP requests.

2.1 Installation and deployment

2.1.1 Building the Docker image

docker build -t data-product-manager .

2.1.2 Running with Helm

helm install data-product-manager ./helm

Check the assigned port with:

kubectl get services

The REST API (Swagger UI) will be available at: http://localhost:<port>/docs

Replace <port> with the actual port number assigned to the Data Product Manager. This also applies to any other ocurrence of <port> described in this documentation.

2.1.3 Installing FluxCD Helm Controller

kubectl apply -f https://github.com/fluxcd/flux2/releases/latest/download/install.yaml

2.1.4 Roles and Role Binding Configuration

These roles are necessary for performing specific operations on cluster resources, such as creating ConfigMaps, HelmRepositories, and HelmReleases.

Apply the ClusterRole by running the following command:

kubectl apply -f k8s/clusterRole.yaml

2.1.5 Environmental variables

Variable

Description

Default value (String)

KUBERNETES_NAMESPACE

Namespace where Morph-KGC Helm Releases will be deployed.

default

HELM_REPO_NAME

Name of the Helm Repository that stores Morph-KGC releases.

aeros-common

HELM_REPO_URL

URL where the Helm Repository is reachable.

Not included here (see aeros-common-deployments GitLab repository)

HELM_REPO_AUTH_SECRET

Name of the secret that will store authentication credentials for accessing the Helm Repository.

Not included here (see aeros-common-deployments GitLab repository)

HELM_REPO_AUTH_USERNAME

Username for accessing the Helm Repository.

Not included here (see aeros-common-deployments GitLab repository)

HELM_REPO_AUTH_TOKEN

Token for accessing the Helm Repository.

Not included here (see aeros-common-deployments GitLab repository)

KAFKA_BROKER

Endpoint <IP_or_FQDN:port_number> where the output Kafka broker is reachable.

kafka.default.svc.cluster.local:9092

KAFKA_TOPIC

Name of the topic where RDF triples will be written.

knowledge-graphs

KAFKA_GROUP_ID

Group ID for the output Kafka broker.

Commented, not used

MORPH_RELEASE_NAME

Name of the Morph-KGC Helm Release.

morph-kgc

MORPH_CHART_NAME

Name of the Morph-KGC Helm Chart.

morph-kgc

MORPH_CHART_VERSION

Version of the Morph-KGC Helm Chart.

1.2.0

SEMANTIC_ANNOTATOR_URI

HTTP(S) URI where the Semantic Annotator is reachable.

http://semantic-annotator-streamer.default.svc.cluster.local:8080/

SEMANTIC_ANNOTATOR_ERROR_TOPIC_ENABLED

Whether or not the error topic for the Semantic Annotator is used (for logging and debugging purposes).

\"False\" (notice the escape characters)

SEMANTIC_ANNOTATOR_INPUT_MONITOR_TOPIC_ENABLED

Whether or not the input monitor topic for the Semantic Annotator is used (for logging and debugging purposes).

\"False\" (notice the escape characters)

SEMANTIC_ANNOTATOR_OUTPUT_MONITOR_TOPIC_ENABLED

Whether or not the output monitor topic for the Semantic Annotator is used (for logging and debugging purposes).

\"False\" (notice the escape characters)

SEMANTIC_ANNOTATOR_ERROR_TOPIC

Name of the topic where Semantic Annotator error messages will be written (if enabled).

seaman-error

SEMANTIC_ANNOTATOR_INPUT_MONITOR_TOPIC

Name of the topic where Semantic Annotator input monitor messages will be written (if enabled).

seaman-input-monitor

SEMANTIC_ANNOTATOR_OUTPUT_MONITOR_TOPIC

Name of the topic where Semantic Annotator output monitor messages will be written (if enabled).

seaman-output-monitor

SEMANTIC_ANNOTATOR_OUTPUT_FORMAT

Output format for the RDF triples generated by the Semantic Annotator. Multiple valid values are available (see Semantic Annotator documentation). NQUADS is recommended for homogeneization.

NQUADS

MONGO_DB_URI

MongoDB URI <mongodb://IP_or_FQDN:port_number/> where the MongoDB server is reachable.

mongodb://mongo-db.default.svc.cluster.local:27017/

3. Interacting with the Data Product Manager

3.1 Onboarding Data Products

The onboarding process can be done either using the Swagger UI or by sending HTTP POST requests.

3.1.1 Batch Data Products - Data Files

Define a JSON dictionary that contains the following details:

{
  "details": {
    "name": "Data Product Name",
    "description": "Data Product Description",
    "tags": [
      "Tag 1", "Tag 2", "Tag 3"
    ],
    "freshness": "Freshness (in crontab/cronjob format)",
    "data_source_type": "BATCH_FILE",
    "file_path": "URI of the data file"
  }
}

When using Swagger UI, paste that JSON in the data_source block and attach the mappings file using the dialog. Once done, click on Execute to onboard the Data Product.

When sending an HTTP POST request, use the following command as template:

curl -X 'POST' 'http://localhost:<port>/dataProduct' \
  -H 'accept: application/json' \
  -H 'Content-Type: multipart/form-data' \
  -F 'data_source={
        "details": {
          "name": "Data Product Name",
          "description": "Data Product Description",
          "tags": [
            "Tag 1", "Tag 2", "Tag 3"
          ],
          "freshness": "Freshness (in crontab/cronjob format)",
          "data_source_type": "BATCH_FILE",
          "file_path": "URI of the data file"
        }
      }' \
  -F 'mappings_file=@path_to_mappings_file'

Once the onboarding has completed, a JSON object with details about the Data Product will be returned in response.

3.1.2 Batch Data Products - Relational Databases

Define a JSON dictionary that contains the following details:

{
  "details": {
    "name": "Data Product Name",
    "description": "Data Product Description",
    "tags": [
      "Tag 1", "Tag 2", "Tag 3"
    ],
    "freshness": "Freshness (in crontab/cronjob format)",
    "data_source_type": "BATCH_RELATIONAL_DATABASE",
    "db_url": "URL of the database"
  }
}

When using Swagger UI, paste that JSON in the data_source block and attach the mappings file using the dialog. Once done, click on Execute to onboard the Data Product.

When sending an HTTP POST request, use the following command as template:

curl -X 'POST' 'http://localhost:<port>/dataProduct' \
  -H 'accept: application/json' \
  -H 'Content-Type: multipart/form-data' \
  -F 'data_source={
        "details": {
          "name": "Data Product Name",
          "description": "Data Product Description",
          "tags": [
            "Tag 1", "Tag 2", "Tag 3"
          ],
          "freshness": "Freshness (in crontab/cronjob format)",
          "data_source_type": "BATCH_RELATIONAL_DATABASE",
          "db_url": "URL of the database"
        }
      }' \
  -F 'mappings_file=@path_to_mappings_file'

Once the onboarding has completed, a JSON object with details about the Data Product will be returned in response.

3.1.3 Streaming Data Products - Kafka Sources

Define a JSON dictionary that contains the following details:

{
  "details": {
    "name": "Data Product Name",
    "description": "Data Product Description",
    "tags": [
      "Tag 1", "Tag 2", "Tag 3"
    ],
    "input_format": "Expected valid values are XML, JSON or CSV",
    "input_topic": "input-topic",
    "data_source_type": "STREAMING_KAFKA",
    "host": "IP or FQDN where the Kafka broker is reachable",
    "port": Port number where the Kafka broker is reachable (integer, without double quotes)
  }
}

When using Swagger UI, paste that JSON in the data_source block and attach the mappings file using the dialog. Beware that Streaming Data Products require RML/CARML mapping files. Once done, click on Execute to onboard the Data Product.

When sending an HTTP POST request, use the following command as template:

curl -X 'POST' 'http://localhost:<port>/dataProduct' \
  -H 'accept: application/json' \
  -H 'Content-Type: multipart/form-data' \
  -F 'data_source={
        "details": {
          "name": "Data Product Name",
          "description": "Data Product Description",
          "tags": [
            "Tag 1", "Tag 2", "Tag 3"
          ],
          "input_format": "Expected valid values are XML, JSON or CSV",
          "input_topic": "input-topic",
          "data_source_type": "STREAMING_KAFKA",
          "host": "IP or FQDN where the Kafka broker is reachable",
          "port": Port number where the Kafka broker is reachable (integer, without double quotes)
        }
      }' \
  -F 'mappings_file=@path_to_mappings_file'

Once the onboarding has completed, a JSON object with details about the Data Product will be returned in response.

3.1.4 Streaming Data Products - MQTT Sources

Define a JSON dictionary that contains the following details:

{
  "details": {
    "name": "Data Product Name",
    "description": "Data Product Description",
    "tags": [
      "Tag 1", "Tag 2", "Tag 3"
    ],
    "input_format": "Expected valid values are XML, JSON or CSV",
    "input_topic": "input/topic",
    "data_source_type": "STREAMING_MQTT",
    "host": "IP or FQDN where the MQTT broker is reachable",
    "port": Port number where the MQTT broker is reachable (integer, without double quotes),
    "protocol": "Protocol that must be used for the communication with the MQTT broker. Expected values are tcp or udp"
  }
}

When using Swagger UI, paste that JSON in the data_source block and attach the mappings file using the dialog. Beware that Streaming Data Products require RML/CARML mapping files. Once done, click on Execute to onboard the Data Product.

When sending an HTTP POST request, use the following command as template:

curl -X 'POST' 'http://localhost:<port>/dataProduct' \
  -H 'accept: application/json' \
  -H 'Content-Type: multipart/form-data' \
  -F 'data_source={
        "details": {
          "name": "Data Product Name",
          "description": "Data Product Description",
          "tags": [
            "Tag 1", "Tag 2", "Tag 3"
          ],
          "input_format": "Expected valid values are XML, JSON or CSV",
          "input_topic": "input/topic",
          "data_source_type": "STREAMING_MQTT",
          "host": "IP or FQDN where the MQTT broker is reachable",
          "port": Port number where the MQTT broker is reachable (integer, without double quotes),
          "protocol": "Protocol that must be used for the communication with the MQTT broker. Expected values are tcp or udp"
        }
      }' \
  -F 'mappings_file=@path_to_mappings_file'

Once the onboarding has completed, a JSON object with details about the Data Product will be returned in response.

3.2 Reading existing Data Products

There are two HTTP GET methods available for getting information about the existing Data Products:

3.2.1 Read all existing Data Products

This method returns a list with details of all existing Data Products. It can be executed using Swagger UI or by sending the following HTTP GET request:

curl -X 'GET' 'http://localhost:<port>/dataProducts' \
  -H 'accept: application/json'

3.2.2 Read an existing Data Product

This method returns the details of an existing Data Product which ID is passed as parameter. It can be executed using Swagger UI or by sending the following HTTP GET request:

curl -X 'GET' 'http://localhost:<port>/dataProducts/{data_product_id}' \
  -H 'accept: application/json'

Replace {data_product_id} with the Data Product ID that was returned during the onboarding process.

3.3 Deleting Data Products

There are two HTTP DELETE methods available for deleting Data Products:

3.3.1 Delete all existing Data Products

This method deletes all existing Data Products. It can be executed using Swagger UI or by sending the following HTTP DELETE request:

curl -X 'DELETE' 'http://localhost:<port>/dataProducts'

3.3.2 Delete an existing Data Product

This method deletes only the Data Product which ID is passed as parameter. It can be executed using Swagger UI or by sending the following HTTP DELETE request:

curl -X 'DELETE' 'http://localhost:<port>/dataProducts/{data_product_id}'

Replace {data_product_id} with the Data Product ID that was returned during the onboarding process.

Acknowledgements and authors

  • Telefónica I+D (TID): Ignacio Domínguez Martínez-Casanueva and Lucía Cabanillas Rodríguez.

  • Universidad Politécnica de Madrid (UPM): Luis Bellido Triana and David Martínez García.