DC

Documentation: https://minipada.github.io/ros2_data_collection

Source code: https://github.com/minipada/ros2_data_collection

For detailed instructions, read the documentation linked above.

Introduction

The DC (Data Collection) project integrates data collection pipelines into ROS 2. The goal is to feed existing APIs for data analytics rather than live monitoring, which already has excellent tools available. As companies increasingly turn to autonomous robots, the ability to understand and improve operations for any type of machine in any environment has become crucial; this mostly involves pick-and-drop and inspection operations. This framework helps collect the data, validate it (through JSON schemas) and send it reliably, so that such APIs and dashboards can be built.

DC uses a modular approach based on pluginlib, with an architecture greatly inspired by Nav2. Pluginlib is used to configure which measurements are collected and where the data goes: measurements and destinations are pluginlib plugins. In addition to pluginlib, most plugins use Fluent Bit in the backend. Fluent Bit is a super fast, lightweight, and highly scalable logging and metrics processor and forwarder, and the preferred choice for cloud and containerized environments. Developed and interfaced in C, it already provides many features we can use directly, especially high performance, reliability and data integrity (backpressure handling and data buffering in memory and on the filesystem).

Why collect data from robots?

  1. Performance Monitoring: Collecting data from a robot allows you to monitor its performance and identify areas for improvement. For example, you can use data to analyze the robot's motion and identify areas where it may be experiencing issues or inefficiencies.
  2. Fault Diagnosis: Data collection can also be used to diagnose faults and troubleshoot issues with the robot. By collecting data on various aspects of the robot's behavior, you can identify patterns or anomalies that may indicate problems with the system.
  3. Machine Learning: Data collected from robots can be used to train machine learning models, which can be used to improve the robot's performance and behavior. For example, you can use data collected from sensors to train models for object detection or path planning.
  4. Research and Development: Data collection is important for research and development in robotics. By collecting data on the behavior of robots in different scenarios, researchers can gain insights into how robots can be designed and optimized for different applications.
  5. Inventory Management: Data collection can be used to monitor inventory levels and track the movement of goods within a warehouse. This can help managers identify which products are in high demand and optimize the placement of products to improve order fulfillment times.
  6. Resource Allocation: Data collection can also help managers allocate resources more efficiently. For example, by monitoring the movement of people and goods within a warehouse, managers can identify bottlenecks and areas of congestion and adjust staffing and equipment allocation to address these issues.
  7. Process Improvement: Data collection can be used to monitor and analyze the performance of various processes within a warehouse. By identifying areas of inefficiency or errors, managers can develop strategies for improving these processes and increasing productivity.
  8. Predictive Maintenance: Data collection can be used to monitor the performance of equipment and identify potential maintenance issues before they occur. This can help managers schedule maintenance more effectively and avoid costly downtime due to equipment failure.

Main features

  • Open source: none of the comparable tools on the market are open source. This project is released under the MPL-2.0 license; in short, you can use it without asking permission and without paying
  • Modular approach: based on pluginlib and greatly inspired by Nav2 for its architecture
  • Reliable data collection: validate and send data to create APIs and dashboards
  • Flexible data collection: set polling interval for each measurement or collect every measurement with StringStamped messages
  • Customizable validation: validate data using existing or customized JSON schemas
  • Easy to extend: add new measurements or destinations by simply adding a plugin
  • Flexible data collection conditions: collect data based on conditions such as whether the robot is moving or if a field is equal to a value
  • Trigger-based data collection: collect data when a defined combination of conditions (all, any, or none) is met
  • Customizable record collection: configure the number of records to collect at the start and when a condition is activated.
  • Data inspection: inspect data from camera input including barcode and QR codes
  • Fast and efficient: high performance, using Fluent Bit for backend processing, and designed to minimize code duplication and reduce human errors
  • Grouped measurements: measurements can be grouped using the group node based on the ApproximateTimeSynchronizer
  • File saving: files can be saved, including map_server maps, camera images, and any file produced by a measurement
  • Easy to use: designed to be easy to learn and use
  • No C++ 3rd party library required: all 3rd party libraries have a vendor package in the repository

And inherited from Fluent Bit: high performance, reliability and data integrity (backpressure handling and data buffering in memory and on the filesystem).

Here is an example of a pipeline:

flowchart LR
    pl_camera1["Camera bottom"]
    pl_camera2["Camera middle"]
    pl_camera3["Camera top"]
    pl_condition_moving["Moving"]
    pl_cpu["CPU"]
    pl_cmd_vel["Command velocity"]
    pl_memory["Memory"]
    pl_position["Position"]
    pl_speed["Speed"]
    pl_storage["Storage"]
    pl_uptime["Uptime"]
    pl_network["Network"]
    pl_network_boot["Network"]
    pl_os_boot["OS"]

    subgraph m_n["Measurement node"]
        subgraph cond["Condition plugins"]
            pl_condition_moving
        end
        subgraph measurements["Measurement plugins"]
            pl_camera1
            pl_camera2
            pl_camera3
            pl_cpu
            pl_cmd_vel
            pl_memory
            pl_position
            pl_speed
            pl_storage
            pl_uptime
            pl_network
            pl_network_boot
            pl_os_boot
        end
    end

    subgraph g_n["Group node"]
        gr_boot_system["System (boot)"]
        gr_system["System"]
        gr_robot["Robot"]
        gr_inspection["Inspection"]
    end

    subgraph d_n["Destination node"]
        pl_pgsql["PostgreSQL"]
        pl_minio["Minio"]
        pl_s3["S3"]
    end

    pl_camera1 -- if not --> pl_condition_moving --> gr_inspection
    pl_camera2 -- if not --> pl_condition_moving --> gr_inspection
    pl_camera3 -- if not --> pl_condition_moving --> gr_inspection
    pl_cpu --> gr_system
    pl_memory --> gr_system
    pl_uptime --> gr_boot_system
    pl_network_boot --> gr_boot_system
    pl_os_boot --> gr_boot_system
    pl_storage --> gr_system
    pl_cmd_vel --> gr_robot
    pl_position --> gr_robot
    pl_speed --> gr_robot

    pl_network -- Network ping and online status --> pl_pgsql
    gr_boot_system -- os, network interfaces\n, permissions and uptime --> pl_pgsql
    gr_robot -- Robot cmd_vel, position, speed --> pl_pgsql
    gr_system -- Available space,\n memory used and cpu usage --> pl_pgsql
    gr_inspection -- Image paths on s3 and minio --> pl_pgsql
    gr_inspection -- Raw, rotated and/or inspected images --> pl_minio
    gr_inspection -- Raw, rotated and/or inspected images --> pl_s3

License

This program is under the terms of the Mozilla Public License Version 2.0.

About and Contact

For any inquiry, please contact David (d.bensoussan@proton.me). If your inquiry relates to bugs or open-source feature requests, consider posting a ticket on our GitHub project. If your inquiry relates to configuration support or private feature development, reach out and we will be able to support you in your projects.

Setup

Use docker

Available images

Docker images with latest code are available on the Docker public registry:

| Image | Description |
| --- | --- |
| minipada/ros2_data_collection:humble-ci | ROS base image augmented with all DC dependencies, used for CI |
| minipada/ros2_data_collection:humble-ci-testing | CI image using the ROS testing repository |
| minipada/ros2_data_collection:humble-source | DC compiled from source |
| minipada/ros2_data_collection:humble-source-sim | DC compiled from source, with all simulation packages |
| minipada/ros2_data_collection:humble-doc | Documentation |

Get any by running:

docker pull minipada/ros2_data_collection:<TAG>

Docker workflow

When developing, use the source-sim image with a docker compose file. The latest one is available on the repository:

version: "3.9"

services:
  ros2-data-collection:
    image: minipada/ros2_data_collection:humble-source-sim
    privileged: true
    container_name: ros2-data-collection
    environment:
      - QT_X11_NO_MITSHM=1
      - NVIDIA_VISIBLE_DEVICES=all
      - NVIDIA_DRIVER_CAPABILITIES=compute,utility,display
      - DISPLAY=unix$DISPLAY
      - XAUTHORITY=$XAUTHORITY
    volumes:
      - ${HOME}/.Xauthority:/root/.Xauthority
      - /tmp/.X11-unix:/tmp/.X11-unix
      - /var/run/docker.sock:/var/run/docker.sock
      - ${PWD}:/root/ws/src/ros2_data_collection
    network_mode: "host"
    restart: "unless-stopped"
    stop_grace_period: "3s"
    runtime: nvidia
    command: tail -F anything
    working_dir: /root/ws

Then, to allow GUI (RViz, Gazebo) to work in the container, execute:

xhost +

Then start the container:

docker compose up -d

And get in the container with:

docker exec -it ros2-data-collection /ros_entrypoint.sh bash

Build from source

Download the repository

Given that no apt packages are available, you will need to build from source.

Download the repository in your workspace

git clone https://github.com/minipada/ros2_data_collection.git

Install dependencies

Install system and C/C++ dependencies

rosdep install --from-paths src --ignore-src -r -y

Some packages are external C++ packages, but vendor packages are included in the repository, so colcon will handle them. In addition, since this set of packages has no external Python dependencies, you won't need anything else.

Install python dependencies

This project uses poetry to manage Python dependencies. It is easy to use, lets you pin each package version as in a requirements.txt, and can manage multiple Python environments. If you have poetry on your machine, you can execute:

poetry install

If not, you can install python dependencies from the provided requirements.txt:

pip3 install -r requirements.txt

Build

colcon build

Run

source install/setup.bash
ros2 launch dc_bringup bringup.launch.py

Issues

If you run into any issues when building ROS 2 Data Collection, you can use the search tool in the issues tab on GitHub and always feel free to open a ticket.

Demos

We will go through some demos together to get started with DC. You can find them in the dc_demos package.

Warning

Make sure you built and sourced the workspace.

Info

Have you checked the configuration examples before running the demos? They will help you understand how the demo configurations work.

| Title | Description |
| --- | --- |
| Uptime | Collect how long the system has been running and print it on stdout. Minimal example |
| Group memory and uptime | Collect both memory and uptime and group them in a dictionary |
| Turtlebot3 Stdout | Collect command velocity, map, position and speed and print it to stdout |
| Turtlebot3 AWS Warehouse MinIO PostgreSQL | Collect system, robot, environment and infrastructure data and send it to MinIO and PostgreSQL |
| Turtlebot3 AWS Warehouse InfluxDB | Collect system, robot, environment and infrastructure data and send it to InfluxDB |
| Turtlebot3 QR codes | Collect QR codes and images |
| Custom plugin | Create an external plugin |

Note that each demo builds on concepts explained in the previous demos.

Uptime to stdout

This is the most minimal example to run DC: it collects the system uptime every 5 seconds and sends it to stdout.

Let's run it:

ros2 launch dc_demos uptime_stdout.launch.py

At the end, the data is displayed:

[component_container_isolated-1] [{"date":1677668906.745817,"time":92395,"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"218"}]
[component_container_isolated-1] [{"date":1677668911.700309,"time":92400,"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"218"}]
[component_container_isolated-1] [{"date":1677668916.70031,"time":92405,"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"218"}]
[component_container_isolated-1] [{"date":1677668921.700388,"time":92410,"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"218"}]
[component_container_isolated-1] [{"date":1677668926.700422,"time":92415,"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"218"}]
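The `time` field in each record is the system uptime in seconds. On Linux, a common source for this value is `/proc/uptime`; here is a minimal illustrative sketch (an assumption for illustration, not necessarily how the `dc_measurements/Uptime` plugin is implemented). The demo reads a fake file so the sketch runs anywhere:

```python
import tempfile

def read_uptime(path="/proc/uptime"):
    """Return the system uptime in whole seconds from a /proc/uptime-style file."""
    with open(path) as f:
        # the file holds two floats: uptime and idle time, both in seconds
        return int(float(f.read().split()[0]))

# demo against a fake /proc/uptime so the sketch runs anywhere
with tempfile.NamedTemporaryFile("w", suffix="-uptime", delete=False) as fake:
    fake.write("92395.74 700123.20\n")
print(read_uptime(fake.name))  # 92395
```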

This launchfile is a wrapper around dc_bringup/launch/bringup.launch.py, which loads a custom YAML configuration.

Configuration

Measurement

measurement_server:
  ros__parameters:
    measurement_plugins: ["uptime"]
    uptime:
      plugin: "dc_measurements/Uptime"
      topic_output: "/dc/measurement/uptime"
      polling_interval: 5000
      enable_validator: true
      debug: true
      tags: ["flb_stdout"]
      init_collect: true
    custom_str_params_list: ["robot_name", "id"]
    custom_str_params:
      robot_name:
        name: robot_name
        value: C3PO
      id:
        name: id
        value_from_file: /etc/machine-id
    run_id:
      enabled: true
      counter: true
      counter_path: "$HOME/run_id"
      uuid: false

measurement_plugins (Mandatory): Lists all the plugins to enable. Each entry is a custom string matching a measurement plugin dictionary at the same level. Plugins that are not listed will not be loaded.

uptime.plugin (Mandatory): Name of the plugin. If you are not sure which plugins are available, use the CLI tool to list them

uptime.tags (Mandatory): This is used by Fluent Bit, to match inputs and outputs. More here

uptime.polling_interval (Optional): Interval at which data is collected, in milliseconds

uptime.enable_validator (Optional): Validates the data against a JSON schema. The default schema file is located in the dc_measurements package. You can provide your own with the json_schema_path parameter, which we will explore later on
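For illustration, here is what validating an uptime record against such a schema amounts to. This is a hand-rolled check mirroring the two constraints of the uptime schema shown later in this walkthrough (`time` must be an integer ≥ 0); the node itself uses a real JSON schema validator:

```python
# constraints mirrored from the uptime schema shipped with dc_measurements
uptime_schema = {"properties": {"time": {"type": "integer", "minimum": 0}}}

def validate_uptime(record):
    """Minimal, illustrative check of the two constraints in the uptime schema."""
    spec = uptime_schema["properties"]["time"]
    value = record.get("time")
    if not isinstance(value, int) or isinstance(value, bool):
        return False  # wrong type
    return value >= spec["minimum"]

print(validate_uptime({"time": 92395}))  # True
print(validate_uptime({"time": -1}))     # False
```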

uptime.debug (Optional): More verbose output

uptime.init_collect (Optional): Collect when the node starts instead of waiting for the polling_interval time to pass

run_id.enabled (Optional): Identifies which run the robot is on. A new ID is generated at every start of the node, using either a counter that increments at each restart of the node or a UUID

run_id.counter (Optional): Enable counter for the run_id

run_id.counter_path (Optional): Path where the last run ID is stored. Environment variables in the path are expanded

run_id.uuid (Optional): Generate a new run ID by using a random UUID

This will collect the uptime every 5 seconds (including when the node starts) and forward it to the flb_stdout destination.

Inject custom data for each record

Here, we want to append some content in every record: the robot name and its ID. While the robot name comes from a fixed variable in the parameter file, the id comes from the machine-id file.

custom_str_params_list (Optional): Keys to look up in this configuration; each is added as a key and value in every record.

custom_str_params.robot_name (Optional): This parameter is loaded since it is mentioned in custom_str_params_list

custom_str_params.robot_name.name (Optional): Key in the dictionary to add

custom_str_params.robot_name.value (Optional): Value associated with the key in the dictionary to add

custom_str_params.id.name (Optional): Key in the dictionary to add

custom_str_params.id.value_from_file (Optional): Value associated with the key in the dictionary to add, taken from the content of a file
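A sketch of how these custom parameters end up merged into each record. The file read mimics value_from_file; the temporary file stands in for /etc/machine-id, and the merging logic is an assumption for illustration, not the server's actual code:

```python
import json
import tempfile

# stand-in for /etc/machine-id
with tempfile.NamedTemporaryFile("w", delete=False) as f:
    f.write("be781e5ffb1e7ee4f817fe7b63e92c32\n")
    machine_id_path = f.name

custom_str_params = {
    "robot_name": {"name": "robot_name", "value": "C3PO"},
    "id": {"name": "id", "value_from_file": machine_id_path},
}

def enrich(record):
    """Append each configured key/value to a measurement record."""
    for param in custom_str_params.values():
        if "value" in param:
            record[param["name"]] = param["value"]
        elif "value_from_file" in param:
            with open(param["value_from_file"]) as fh:
                record[param["name"]] = fh.read().strip()
    return record

print(json.dumps(enrich({"time": 92395})))
```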

Info

Note that this configuration alone, sent to Fluent Bit, will not display the JSON on stdout, since that requires the destination_server configuration

Find the complete measurements documentation here

Destination

destination_server:
  ros__parameters:
    flb:
      flush: 1
      flb_grace: 1
      log_level: "info"
      storage_path: "/var/log/flb-storage/"
      storage_sync: "full"
      storage_checksum: "off"
      storage_backlog_mem_limit: "1M"
      scheduler_cap: 200
      scheduler_base: 5
      http_server: true
      http_listen: "0.0.0.0"
      http_port: 2020
      in_storage_type: "filesystem"
      in_storage_pause_on_chunks_overlimit: "off"
    destination_plugins: ["flb_stdout"]
    flb_stdout:
      plugin: "dc_destinations/FlbStdout"
      inputs: ["/dc/measurement/uptime"]
      time_format: "double"
      time_key: "date"
      debug: false

Destinations

Let's analyze it piece by piece. First, we select the plugin to enable and create its section containing the plugin name and the topics to subscribe to. The topic list is needed because, in the background, the ROS 2 Fluent Bit plugin subscribes to each topic.

destination_plugins (Mandatory): Lists all the plugins to enable. Each entry is a custom string matching a destination plugin dictionary at the same level. Plugins that are not listed will not be loaded.

flb_stdout.plugin (Mandatory): Plugin to load

flb_stdout.inputs (Mandatory): Topics to listen to for the data

flb_stdout.time_format (Optional): Format in which the time will be printed

flb_stdout.time_key (Optional): Dictionary key from which the date will be taken

flb_stdout.debug (Optional): Verbose output
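With time_format: "double" and time_key: "date", each record carries its timestamp as a single floating-point epoch value under the "date" key, as seen in the console output. A sketch of that conversion (the seconds/nanoseconds split is an assumption for illustration):

```python
def to_double(sec, nanosec):
    """Combine a seconds/nanoseconds timestamp into one floating-point value."""
    return sec + nanosec / 1e9

record = {"time": 92395}
record["date"] = to_double(1677668906, 745817000)
print(record)
```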

Fluent Bit

Then, we configure Fluent Bit. This is not necessary, but it is easy to do using the ROS parameters provided by the node.

flb.flush (Optional): Interval to flush output (seconds)

flb.flb_grace (Optional): Wait time (seconds) on exit

flb.log_level (Optional): Diagnostic level (error/warning/info/debug/trace)

flb.storage_path (Optional): Set an optional location in the file system to store streams and chunks of data. If this parameter is not set, Input plugins can only use in-memory buffering.

flb.storage_sync (Optional): Configure the synchronization mode used to store the data into the file system. It can take the values normal or full.

flb.storage_checksum (Optional): Enable the data integrity check when writing and reading data from the filesystem. The storage layer uses the CRC32 algorithm.

flb.storage_backlog_mem_limit (Optional): If storage.path is set, Fluent Bit will look for data chunks that were not delivered and are still in the storage layer; these are called backlog data. This option configures a hint of the maximum amount of memory to use when processing these records.

flb.scheduler_cap (Optional): Set a maximum retry time in seconds. The property is supported from v1.8.7.

flb.scheduler_base (Optional): Set a base of exponential backoff. The property is supported from v1.8.7.
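scheduler_base and scheduler_cap together drive Fluent Bit's exponential backoff for retries. As a rough sketch of the relationship (illustrative only; Fluent Bit actually picks a random wait inside the growing window, this just shows how the cap bounds it):

```python
def max_retry_wait(attempt, base=5, cap=200):
    """Upper bound of the retry window: base * 2**attempt, capped at `cap` seconds.
    (Illustrative; Fluent Bit picks a random wait inside the window.)"""
    return min(base * 2 ** attempt, cap)

print([max_retry_wait(n) for n in range(7)])  # [5, 10, 20, 40, 80, 160, 200]
```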

flb.http_server (Optional): If true, enable the statistics HTTP server

flb.http_listen (Optional): Address to listen (e.g. 0.0.0.0)

flb.http_port (Optional): Port to listen (e.g. 8888)

flb.in_storage_type (Optional): Specifies the buffering mechanism to use. It can be memory or filesystem.

flb.in_storage_pause_on_chunks_overlimit (Optional): Specifies whether file storage should be paused when reaching the chunk limit.

Inject run id at each record

Finally, we set the run ID. It is used later when fetching the data for a run. It can come from a counter incremented at each start of the node, or from a randomly generated UUID. The counter mechanism writes and reads a file on the system (take care not to delete it); you can set its path as a parameter.
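A minimal sketch of such a counter mechanism (illustrative; the file format and the `next_run_id` helper are assumptions, not the node's actual code):

```python
import os
import tempfile

def next_run_id(counter_path):
    """Read the last run ID from the file (0 if missing), increment it, persist it."""
    run_id = 0
    if os.path.exists(counter_path):
        with open(counter_path) as f:
            run_id = int(f.read().strip() or 0)
    run_id += 1
    with open(counter_path, "w") as f:
        f.write(str(run_id))
    return run_id

path = os.path.join(tempfile.mkdtemp(), "run_id")
print(next_run_id(path), next_run_id(path), next_run_id(path))  # 1 2 3
```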

Find the complete destinations documentation here

Console output

Now that the node has started, let's see what's displayed in the console:

[component_container_isolated-1] [INFO] [1677668906.621492168] [dc_container]: Load Library: /root/ws/install/dc_measurements/lib/libmeasurement_server_core.so
[component_container_isolated-1] [INFO] [1677668906.634130562] [dc_container]: Found class: rclcpp_components::NodeFactoryTemplate<measurement_server::MeasurementServer>
[component_container_isolated-1] [INFO] [1677668906.634246977] [dc_container]: Instantiate class: rclcpp_components::NodeFactoryTemplate<measurement_server::MeasurementServer>
[component_container_isolated-1] [INFO] [1677668906.655803692] [measurement_server]:
	measurement_server lifecycle node launched.
	Waiting on external lifecycle transitions to activate
	See https://design.ros2.org/articles/node_lifecycle.html for more information.
[component_container_isolated-1] [INFO] [1677668906.670233932] [dc_container]: Load Library: /root/ws/install/dc_destinations/lib/libdestination_server_core.so
[component_container_isolated-1] [INFO] [1677668906.675826543] [dc_container]: Found class: rclcpp_components::NodeFactoryTemplate<destination_server::DestinationServer>
[component_container_isolated-1] [INFO] [1677668906.675864319] [dc_container]: Instantiate class: rclcpp_components::NodeFactoryTemplate<destination_server::DestinationServer>
[component_container_isolated-1] [INFO] [1677668906.680499515] [destination_server]:
[component_container_isolated-1] 	destination_server lifecycle node launched.
[component_container_isolated-1] 	Waiting on external lifecycle transitions to activate
[component_container_isolated-1] 	See https://design.ros2.org/articles/node_lifecycle.html for more information.
[INFO] [launch_ros.actions.load_composable_nodes]: Loaded node '/destination_server' in container 'dc_container'
[component_container_isolated-1] [INFO] [1677668906.684802172] [dc_container]: Load Library: /opt/ros/humble/lib/libnav2_lifecycle_manager_core.so
[component_container_isolated-1] [INFO] [1677668906.685487672] [dc_container]: Found class: rclcpp_components::NodeFactoryTemplate<nav2_lifecycle_manager::LifecycleManager>
[component_container_isolated-1] [INFO] [1677668906.685510016] [dc_container]: Instantiate class: rclcpp_components::NodeFactoryTemplate<nav2_lifecycle_manager::LifecycleManager>
[component_container_isolated-1] [INFO] [1677668906.691075881] [lifecycle_manager_navigation]: Creating
[INFO] [launch_ros.actions.load_composable_nodes]: Loaded node '/lifecycle_manager_navigation' in container 'dc_container'
[component_container_isolated-1] [INFO] [1677668906.692954854] [lifecycle_manager_navigation]: Creating and initializing lifecycle service clients
[component_container_isolated-1] [INFO] [1677668906.695025050] [lifecycle_manager_navigation]: Starting managed nodes bringup...

The measurement server and destination server start as lifecycle nodes; you can read more about it here

Afterward, the measurement_server starts:

[component_container_isolated-1] [INFO] [1677668906.695056624] [lifecycle_manager_navigation]: Configuring measurement_server
[component_container_isolated-1] [INFO] [1677668906.695181755] [measurement_server]: Configuring
[component_container_isolated-1] [INFO] [1677668906.698019687] [measurement_server]: Creating measurement plugin uptime: Type dc_measurements/Uptime, Group key: uptime, Polling interval: 5000, Debug: 1, Validator enabled: 1, Schema path: , Tags: [flb_stdout], Init collect: 1, Init Max measurement: 0, Include measurement name: 0, Include measurement plugin name: 0, Remote keys: , Remote prefixes: , Include measurement plugin name: 0, Max measurement on condition: 0, If all condition: , If any condition: , If none condition:
[component_container_isolated-1] [INFO] [1677668906.698932685] [measurement_server]: Configuring uptime
[component_container_isolated-1] [INFO] [1677668906.699937310] [measurement_server]: Done configuring uptime
[component_container_isolated-1] [INFO] [1677668906.700162355] [measurement_server]: Looking for schema at /root/ws/install/dc_measurements/share/dc_measurements/plugins/measurements/json/uptime.json
[component_container_isolated-1] [INFO] [1677668906.700192161] [measurement_server]: schema: {"$schema":"http://json-schema.org/draft-07/schema#","description":"Time the system has been up","properties":{"time":{"description":"Time the system has been up","minimum":0,"type":"integer"}},"title":"Uptime","type":"object"}

Plugins are loaded one by one (here only the uptime plugin) and configured. The configuration of each is displayed, and the validation schema is loaded and its path printed.

Then, the destination_server starts:

[component_container_isolated-1] [INFO] [1677668906.700528260] [lifecycle_manager_navigation]: Configuring destination_server
[component_container_isolated-1] [INFO] [1677668906.700620104] [destination_server]: Configuring
[component_container_isolated-1] [INFO] [1677668906.701246047] [destination_server]: Fluent Bit service initialized
[component_container_isolated-1] [INFO] [1677668906.701470896] [destination_server]: Creating destination plugin flb_stdout: Type dc_destinations/FlbStdout, Debug: 0, Time format: double. Time key: date
[component_container_isolated-1] [INFO] [1677668906.702670279] [destination_server]: Configuring Flb plugin flb_stdout
[component_container_isolated-1] [INFO] [1677668906.702871127] [destination_server]: Loaded lua filter. Match=ros2, code=function concatenate(tag, timestamp, record) if (type(record["tags"]) == "table") then record["tags"] = table.concat(record["tags"], ",") end  return 2, timestamp, record end
[component_container_isolated-1] [INFO] [1677668906.702912686] [destination_server]: Loaded rewrite_tag filter. Match=ros2, Rule=$tags .*(flb_stdout).* flb_stdout true
[component_container_isolated-1] [INFO] [1677668906.703021628] [destination_server]: Done configuring Flb plugin flb_stdout
[component_container_isolated-1] [INFO] [1677668906.703049312] [destination_server]: Loading input ros2 shared library /root/ws/install/fluent_bit_plugins/lib/flb-in_ros2.so...
[component_container_isolated-1] [INFO] [1677668906.703818806] [destination_server]: Loaded input ros2 shared library /root/ws/install/fluent_bit_plugins/lib/flb-in_ros2.so
[component_container_isolated-1] [INFO] [1677668906.703899346] [destination_server]: Flb ros2 plugin initialized. ret=0
[component_container_isolated-1] [INFO] [1677668906.703908977] [destination_server]: Starting Flb engine...
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] [fluent bit] version=2.0.7, commit=1ab360f79c, pid=31126
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] [storage] ver=1.3.0, type=memory+filesystem, sync=full, checksum=off, max_chunks_up=128
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] [storage] backlog input plugin: storage_backlog.1
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] [cmetrics] version=0.5.7
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] [ctraces ] version=0.2.5
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] [input:ros2:ros2.0] initializing
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] [input:ros2:ros2.0] storage_strategy='filesystem' (memory + filesystem)
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] Started node fluentbit_rclc
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] Created subscriber /dc/measurement/uptime
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] [input:storage_backlog:storage_backlog.1] initializing
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] [input:storage_backlog:storage_backlog.1] storage_strategy='memory' (memory only)
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] [input:storage_backlog:storage_backlog.1] queue memory limit: 976.6K
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] [input:emitter:emitter_for_rewrite_tag.2] initializing
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] [input:emitter:emitter_for_rewrite_tag.2] storage_strategy='filesystem' (memory + filesystem)
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] [output:stdout:stdout.0] worker #0 started
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] [http_server] listen iface=0.0.0.0 tcp_port=2020
[component_container_isolated-1] [2023/03/01 11:08:26] [ info] [sp] stream processor started
[component_container_isolated-1] [INFO] [1677668906.744577163] [destination_server]: Started Flb engine

It starts the FlbStdout destination plugin and the filters Fluent Bit uses internally to edit timestamps and tags. It then loads the ROS 2 Fluent Bit shared library and initializes it with the topics we provided as parameters.

Afterward, Fluent Bit starts: it prints its version, storage strategy and buffer configuration, and lists which plugins are loaded.

Then, the measurement and destination server nodes are activated:

[component_container_isolated-1] [INFO] [1677668906.745180268] [lifecycle_manager_navigation]: Activating measurement_server
[component_container_isolated-1] [INFO] [1677668906.745462265] [measurement_server]: Activating
[component_container_isolated-1] [INFO] [1677668906.745558665] [measurement_server]: Activating uptime
[component_container_isolated-1] [INFO] [1677668906.746014639] [measurement_server]: Creating bond (measurement_server) to lifecycle manager.
[component_container_isolated-1] [INFO] [1677668906.853199899] [lifecycle_manager_navigation]: Server measurement_server connected with bond.
[component_container_isolated-1] [INFO] [1677668906.853316226] [lifecycle_manager_navigation]: Activating destination_server
[component_container_isolated-1] [INFO] [1677668906.853620571] [destination_server]: Activating
[component_container_isolated-1] [INFO] [1677668906.853722659] [destination_server]: Activating flb_stdout
[component_container_isolated-1] [INFO] [1677668906.853796078] [destination_server]: Creating bond (destination_server) to lifecycle manager.
[component_container_isolated-1] [INFO] [1677668906.961666207] [lifecycle_manager_navigation]: Server destination_server connected with bond.
[component_container_isolated-1] [INFO] [1677668906.961777773] [lifecycle_manager_navigation]: Managed nodes are active
[component_container_isolated-1] [INFO] [1677668906.961820078] [lifecycle_manager_navigation]: Creating bond timer...

Finally, we see the data:

[component_container_isolated-1] [{"date":1677668906.745817,"time":92395,"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"218"}]
[component_container_isolated-1] [{"date":1677668911.700309,"time":92400,"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"218"}]
[component_container_isolated-1] [{"date":1677668916.70031,"time":92405,"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"218"}]
[component_container_isolated-1] [{"date":1677668921.700388,"time":92410,"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"218"}]
[component_container_isolated-1] [{"date":1677668926.700422,"time":92415,"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"218"}]

So...what happened?

  1. The measurement plugin starts publishing data to /dc/measurement/uptime; each message contains the JSON, the tags and the timestamp
  2. The run ID and robot_name are appended to the JSON
  3. The ROS 2 Fluent Bit plugin, which subscribes to this topic, receives the data and passes it on to Fluent Bit
  4. Fluent Bit applies the timestamp filter (which converts the timestamp to the desired format)
  5. Fluent Bit changes the tag with another filter; this tag is used afterward to match records to destinations
  6. Data is flushed
  7. The Fluent Bit stdout output plugin receives the data because the tags match (the flb_stdout tag is used) and forwards it to stdout
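The tag matching in the last steps can be sketched with a regular expression, mirroring the rewrite_tag rule visible in the logs above (`$tags .*(flb_stdout).* flb_stdout true`); the `route` helper is illustrative, not Fluent Bit's implementation:

```python
import re

REWRITE_RULE = re.compile(r".*(flb_stdout).*")  # from: $tags .*(flb_stdout).* flb_stdout true

def route(record):
    """Rewrite the record's tag when its `tags` field matches the rule, so the
    output plugin matching that tag (here stdout) picks it up."""
    if REWRITE_RULE.match(record.get("tags", "")):
        return "flb_stdout"
    return "ros2"  # unmatched records keep the input tag

print(route({"tags": "flb_stdout"}))           # flb_stdout
print(route({"tags": "flb_pgsql,flb_minio"}))  # ros2
```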

Group memory and uptime

This demo introduces the group node. It subscribes to multiple topics, groups their data, and republishes the result on a new topic.

Let's run it:

ros2 launch dc_demos group_memory_uptime_stdout.launch.py
[component_container_isolated-1] [{"memory":{"used":71.67569732666016},"date":1677673405.921659,"uptime":{"time":96894},"run_id":"221"}]
[component_container_isolated-1] [{"memory":{"used":71.70790100097656},"date":1677673408.868847,"uptime":{"time":96897},"run_id":"221"}]
[component_container_isolated-1] [{"memory":{"used":71.72582244873047},"date":1677673411.869299,"uptime":{"time":96900},"run_id":"221"}]
[component_container_isolated-1] [{"memory":{"used":71.86643218994141},"date":1677673414.869292,"uptime":{"time":96903},"run_id":"221"}]
[component_container_isolated-1] [{"memory":{"used":71.82553863525391},"date":1677673417.869207,"uptime":{"time":96906},"run_id":"221"}]

This launchfile is a wrapper around dc_bringup/launch/bringup.launch.py that loads a custom YAML configuration.

Info

Note that the group node is started here; a single launchfile parameter enables it. In the uptime demo, it is disabled by default because it is not used.

Configuration

Measurement

We collect data from 2 plugins: memory and uptime, the former every second and the latter every 3 seconds.

measurement_server:
  ros__parameters:
    measurement_plugins: ["memory", "uptime"]
    memory:
      plugin: "dc_measurements/Memory"
      group_key: "memory"
      topic_output: "/dc/measurement/memory"
      polling_interval: 1000
    uptime:
      plugin: "dc_measurements/Uptime"
      group_key: "uptime"
      topic_output: "/dc/measurement/uptime"
      polling_interval: 3000

Data is now published on 2 ROS topics: /dc/measurement/uptime and /dc/measurement/memory.

The group_key mentioned here will be used by the group node to assign each measurement a key in the new dictionary.

Group

This creates a memory_uptime group, subscribes to the /dc/measurement/memory and /dc/measurement/uptime topics and republishes the combined result on /dc/group/memory_uptime. The sync_delay defines a 5 second window in which data from each topic must arrive; if one topic does not publish within that window, the data is thrown away.

group_server:
  ros__parameters:
    groups: ["memory_uptime"]
    memory_uptime:
      inputs: ["/dc/measurement/memory", "/dc/measurement/uptime"]
      output: "/dc/group/memory_uptime"
      sync_delay: 5.0
      group_key: "memory_uptime"
      tags: ["flb_stdout"]
      include_group_name: false

include_group_name (Optional): Includes the name of the group in the JSON, which makes it easier later on to fetch the data from your API. In this case, we can skip it since we only have one group to show.

You may also notice that the group itself has a group_key: a group can be part of another group.
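The sync_delay behavior described above can be sketched as follows. This is a hypothetical simplification of the group node's windowing logic, not the actual implementation; `group_window` and its message tuples are invented for illustration:

```python
# Hypothetical sketch of sync_delay windowing: messages from each input
# topic are buffered, and a group record is emitted only if every
# expected topic produced a message within the window.
SYNC_DELAY = 5.0  # seconds, as in the configuration above

def group_window(messages: list, inputs: list, window: float = SYNC_DELAY):
    """messages: list of (topic, timestamp, payload) tuples."""
    if not messages:
        return None
    start = min(t for _, t, _ in messages)
    in_window = {topic: payload for topic, t, payload in messages
                 if t - start <= window}
    # Drop the whole window if any expected topic is missing
    if set(in_window) != set(inputs):
        return None
    return in_window

inputs = ["/dc/measurement/memory", "/dc/measurement/uptime"]
msgs = [("/dc/measurement/memory", 0.0, {"used": 71.7}),
        ("/dc/measurement/uptime", 2.9, {"time": 96894})]
print(group_window(msgs, inputs))  # both arrived within 5 s → merged dict
```

If uptime had arrived 6 seconds after memory, the whole window would be dropped and nothing republished.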

Destination

Here, we only subscribe to the /dc/group/memory_uptime topic:

    flb_stdout:
      plugin: "dc_destinations/FlbStdout"
      inputs: ["/dc/group/memory_uptime"]

Console output

In the terminal, you can see the result, published every 3 seconds (see the date field), which is the timeframe defined by sync_delay and the maximum polling_interval of the measurements.

Finally, note that the new dictionary uses the keys defined by group_key in the measurement_server plugin configuration. They are transferred through the ROS message.

[component_container_isolated-1] [{"memory":{"used":71.67569732666016},"date":1677673405.921659,"uptime":{"time":96894},"run_id":"221"}]
[component_container_isolated-1] [{"memory":{"used":71.70790100097656},"date":1677673408.868847,"uptime":{"time":96897},"run_id":"221"}]
[component_container_isolated-1] [{"memory":{"used":71.72582244873047},"date":1677673411.869299,"uptime":{"time":96900},"run_id":"221"}]
[component_container_isolated-1] [{"memory":{"used":71.86643218994141},"date":1677673414.869292,"uptime":{"time":96903},"run_id":"221"}]
[component_container_isolated-1] [{"memory":{"used":71.82553863525391},"date":1677673417.869207,"uptime":{"time":96906},"run_id":"221"}]

Turtlebot3

In this example, we add a robot and start collecting robot data to Stdout.

You will also need 2 terminal windows, to:

  1. Run the Nav2 turtlebot3 launchfile: it starts localization, navigation and RViz
  2. Run DC

Since RViz is pretty verbose, using 2 terminal windows makes it easier to read the JSON printed in the terminal.

Setup the environment

In each terminal, source your environment and set up the turtlebot configuration:

source /opt/ros/humble/setup.bash
source install/setup.bash
export GAZEBO_MODEL_PATH=$GAZEBO_MODEL_PATH:/opt/ros/humble/share/turtlebot3_gazebo/models
export TURTLEBOT3_MODEL=waffle

Start Navigation

Then, start the Turtlebot launchfile:

ros2 launch nav2_bringup tb3_simulation_launch.py headless:=False

RViz and Gazebo will start: you should now see the robot in Gazebo, and the map on RViz.

Set the robot position using the "2D Pose Estimate" button.

Info

If any problem occurs, take a look at the official Nav2 documentation, which covers this setup.

Start DC

Execute

ros2 launch dc_demos tb3_simulation_stdout.launch.py

At the end, the data is displayed:

[component_container_isolated-1] [{"date":1677690777.96911,"width":384,"height":384,"remote_paths":{"minio":{"pgm":"C3PO/2023/03/01/17/map/2023-03-01T17:12:57.pgm","yaml":"C3PO/2023/03/01/17/map/2023-03-01T17:12:57.yaml"}},"resolution":0.05000000074505806,"origin":{"x":-10,"y":-10},"local_paths":{"pgm":"/root/dc_data/C3PO/2023/03/01/17/map/2023-03-01T17:12:57.pgm","yaml":"/root/dc_data/C3PO/2023/03/01/17/map/2023-03-01T17:12:57.yaml"},"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"234"}]
[component_container_isolated-1] [{"date":1677690782.883036,"width":384,"height":384,"remote_paths":{"minio":{"pgm":"C3PO/2023/03/01/17/map/2023-03-01T17:13:02.pgm","yaml":"C3PO/2023/03/01/17/map/2023-03-01T17:13:02.yaml"}},"resolution":0.05000000074505806,"origin":{"x":-10,"y":-10},"local_paths":{"pgm":"/root/dc_data/C3PO/2023/03/01/17/map/2023-03-01T17:13:02.pgm","yaml":"/root/dc_data/C3PO/2023/03/01/17/map/2023-03-01T17:13:02.yaml"},"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"234"}]
[component_container_isolated-1] [{"position":{"x":-0.7254206029057992,"yaw":0.134098723062189,"tags":["flb_stdout"],"y":-0.5116378019627142},"cmd_vel":{"tags":["flb_stdout"],"linear":{"x":0.26,"z":0,"y":0},"angular":{"x":0,"z":-0.157895,"y":0}},"speed":{"tags":["flb_stdout"],"linear":{"x":8.720555295508514e-05,"z":0,"y":4.219923511090644e-06},"computed":8.730759543499989e-05,"angular":{"x":-0.0002958005596118955}},"date":1677690787.878526,"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"234"}]
[component_container_isolated-1] [{"date":1677690787.883072,"width":384,"height":384,"remote_paths":{"minio":{"pgm":"C3PO/2023/03/01/17/map/2023-03-01T17:13:07.pgm","yaml":"C3PO/2023/03/01/17/map/2023-03-01T17:13:07.yaml"}},"resolution":0.05000000074505806,"origin":{"x":-10,"y":-10},"local_paths":{"pgm":"/root/dc_data/C3PO/2023/03/01/17/map/2023-03-01T17:13:07.pgm","yaml":"/root/dc_data/C3PO/2023/03/01/17/map/2023-03-01T17:13:07.yaml"},"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"234"}]
[component_container_isolated-1] [{"position":{"x":-0.1931822640325796,"yaw":-0.08553498481283986,"tags":["flb_stdout"],"y":-0.5267276846092974},"cmd_vel":{"tags":["flb_stdout"],"linear":{"x":0.232632,"z":0,"y":0},"angular":{"x":0,"z":-0.0526316,"y":0}},"speed":{"tags":["flb_stdout"],"linear":{"x":0.2466804589428482,"z":0,"y":4.147463233418404e-05},"computed":0.2466804624294339,"angular":{"x":-0.1575310923819759}},"date":1677690789.878385,"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"234"}]
[component_container_isolated-1] [{"position":{"x":-0.06732902212346159,"yaw":-0.06199357549188581,"tags":["flb_stdout"],"y":-0.5350783704643411},"cmd_vel":{"tags":["flb_stdout"],"linear":{"x":0.191579,"z":0,"y":0},"angular":{"x":0,"z":-0.0526316,"y":0}},"speed":{"tags":["flb_stdout"],"linear":{"x":0.2464715530337069,"z":0,"y":1.735682541426087e-05},"computed":0.2464715536448513,"angular":{"x":-0.05272644093984532}},"date":1677690790.878357,"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"234"}]
[component_container_isolated-1] [{"position":{"x":0.1426837169393564,"yaw":-0.1292500346335131,"tags":["flb_stdout"],"y":-0.5883037844577782},"cmd_vel":{"tags":["flb_stdout"],"linear":{"x":0,"z":0,"y":0},"angular":{"x":0,"z":0,"y":0}},"speed":{"tags":["flb_stdout"],"linear":{"x":0.1917414358977791,"z":0,"y":1.44170547699829e-05},"computed":0.1917414364397889,"angular":{"x":-0.05280628215810287}},"date":1677690791.878987,"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"234"}]
[component_container_isolated-1] [{"position":{"x":0.2658444927274801,"yaw":-0.1669572018597313,"tags":["flb_stdout"],"y":-0.6121168728850158},"cmd_vel":{"tags":["flb_stdout"],"linear":{"x":0,"z":0,"y":0},"angular":{"x":0,"z":0,"y":0}},"speed":{"tags":["flb_stdout"],"linear":{"x":3.645119603853573e-05,"z":0,"y":3.14134397110158e-06},"computed":3.658630528742332e-05,"angular":{"x":-0.0004012554003809027}},"date":1677690792.879082,"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"234"}]
[component_container_isolated-1] [{"date":1677690792.883126,"width":384,"height":384,"remote_paths":{"minio":{"pgm":"C3PO/2023/03/01/17/map/2023-03-01T17:13:12.pgm","yaml":"C3PO/2023/03/01/17/map/2023-03-01T17:13:12.yaml"}},"resolution":0.05000000074505806,"origin":{"x":-10,"y":-10},"local_paths":{"pgm":"/root/dc_data/C3PO/2023/03/01/17/map/2023-03-01T17:13:12.pgm","yaml":"/root/dc_data/C3PO/2023/03/01/17/map/2023-03-01T17:13:12.yaml"},"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"234"}

Since the JSON records are quite large, let's analyze 2 different ones.

The first is the data published by the robot group:

[
  {
    "cmd_vel": {
      "linear": {
        "x": 0.246316,
        "y": 0,
        "z": 0
      },
      "angular": {
        "x": 0,
        "y": 0,
        "z": 0.263158
      }
    },
    "date": 1677694799.685468,
    "position": {
      "x": -0.6033772358727438,
      "yaw": -0.7146495585355921,
      "y": -1.633862970534114
    },
    "speed": {
      "angular": {
        "x": -0.0002542699063698451
      },
      "computed": 3.279051750818494e-05,
      "linear": {
        "x": 3.244397182637543e-05,
        "y": 4.754653571390878e-06,
        "z": 0
      }
    },
    "name": "robot",
    "id": "be781e5ffb1e7ee4f817fe7b63e92c32",
    "robot_name": "C3PO",
    "run_id": "240"
  }
]

This record contains the speed, cmd_vel and position from the group "robot". Note that the tags of each measurement have been dropped. This is because they are not used in this particular group (they still apply when a measurement is sent individually).
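The merge that produces this record can be approximated as follows. This is an illustrative sketch, not the group node's actual code; `merge_group` is invented, but the field names follow the output above:

```python
# Illustrative sketch: merge individual measurements into one group
# record, keyed by each measurement's group_key, dropping the
# per-measurement "tags" and optionally adding the group name.
def merge_group(measurements: dict, group_name: str,
                include_group_name: bool = True) -> dict:
    record = {}
    for group_key, payload in measurements.items():
        payload = dict(payload)
        payload.pop("tags", None)  # tags are unused inside a group
        record[group_key] = payload
    if include_group_name:
        record["name"] = group_name  # matches "name": "robot" above
    return record

measurements = {
    "position": {"x": -0.6, "y": -1.6, "yaw": -0.71, "tags": ["flb_stdout"]},
    "speed": {"computed": 3.2e-05, "tags": ["flb_stdout"]},
}
print(merge_group(measurements, "robot"))
```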

[
  {
    "width": 384,
    "remote_paths": {
      "minio": {
        "yaml": "C3PO/2023/03/01/18/map/2023-03-01T18:20:16.yaml",
        "pgm": "C3PO/2023/03/01/18/map/2023-03-01T18:20:16.pgm"
      }
    },
    "name": "map",
    "resolution": 0.05000000074505806,
    "origin": {
      "x": -10,
      "y": -10
    },
    "local_paths": {
      "yaml": "/root/dc_data/C3PO/2023/03/01/18/map/2023-03-01T18:20:16.yaml",
      "pgm": "/root/dc_data/C3PO/2023/03/01/18/map/2023-03-01T18:20:16.pgm"
    },
    "date": 1677694816.690489,
    "height": 384,
    "id": "be781e5ffb1e7ee4f817fe7b63e92c32",
    "robot_name": "C3PO",
    "run_id": "240"
  }
]

This record contains the data from the map measurement.

Configuration

Measurement

measurement_server:
  ros__parameters:
    custom_str_params: ["robot_name"]
    robot_name: "C3PO"
    measurement_plugins: ["cmd_vel", "map", "position", "speed"]
    run_id:
      enabled: true
      counter: true
      counter_path: "$HOME/run_id"
      uuid: false
    save_local_base_path: "$HOME/dc_data/"
    all_base_path: "=robot_name/%Y/%m/%d/%H"
    cmd_vel:
      plugin: "dc_measurements/CmdVel"
      tags: ["flb_stdout"]
      group_key: "cmd_vel"
      enable_validator: true
      topic_output: "/dc/measurement/cmd_vel"
    position:
      plugin: "dc_measurements/Position"
      tags: ["flb_stdout"]
      group_key: "position"
      topic_output: "/dc/measurement/position"
      polling_interval: 1000
      enable_validator: true
      init_collect: true
      global_frame: "map"
      robot_base_frame: "base_link"
      transform_timeout: 0.1
    speed:
      plugin: "dc_measurements/Speed"
      tags: ["flb_stdout"]
      group_key: "speed"
      odom_topic: "/odom"
      topic_output: "/dc/measurement/speed"
    map:
      plugin: "dc_measurements/Map"
      tags: ["flb_stdout"]
      group_key: "map"
      polling_interval: 5000
      save_path: "map/%Y-%m-%dT%H:%M:%S"
      topic_output: "/dc/measurement/map"
      save_map_timeout: 4.0
      remote_prefixes: [""]
      remote_keys: ["minio"]

save_local_base_path (Optional): Used as a common base path for all files saved by measurement plugins. all_base_path is concatenated to it to define the path where files are saved.

all_base_path (Optional): A common sub-path used by some measurements to save files; it is concatenated to save_local_base_path. Note the =robot_name, which is later replaced by C3PO (the variable defined in custom_str_params).
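Based on the log output shown later ("Base save path expanded to /root/dc_data/", "All Base path expanded to C3PO/%Y/%m/%d/%H"), the expansion can be mimicked with a short sketch. The `expand_path` helper is hypothetical; the `=name` substitution and strftime placeholders are the assumed behavior:

```python
import os
import time

# Sketch of how the save path might be expanded: environment variables,
# "=name" custom string parameters, then strftime date placeholders.
def expand_path(base: str, all_base: str, custom: dict) -> str:
    base = os.path.expandvars(base)            # $HOME -> /root, etc.
    for name, value in custom.items():
        all_base = all_base.replace("=" + name, value)
    all_base = time.strftime(all_base)         # %Y/%m/%d/%H -> date parts
    return os.path.join(base, all_base)

path = expand_path("$HOME/dc_data/", "=robot_name/%Y/%m/%d/%H",
                   {"robot_name": "C3PO"})
print(path)  # e.g. /root/dc_data/C3PO/2023/03/01/18
```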

map.remote_keys: creates, inside remote_paths, one dictionary per string in this field; each contains the fields your API uses to fetch the remote URLs of the files.
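The resulting remote_paths structure can be sketched like this, reconstructed from the JSON output above; `build_remote_paths` is a hypothetical helper, not part of DC:

```python
# Hypothetical sketch: build the remote_paths dictionary from
# remote_keys, mirroring the local save path relative to the base.
def build_remote_paths(remote_keys: list, relative_files: dict) -> dict:
    """relative_files: mapping like {"pgm": "...", "yaml": "..."}."""
    return {key: dict(relative_files) for key in remote_keys}

files = {"pgm": "C3PO/2023/03/01/17/map/2023-03-01T17:12:57.pgm",
         "yaml": "C3PO/2023/03/01/17/map/2023-03-01T17:12:57.yaml"}
print(build_remote_paths(["minio"], files))
```

With remote_keys: ["minio"], this yields the {"minio": {"pgm": ..., "yaml": ...}} shape seen in the records above.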

Group

group_server:
  ros__parameters:
    groups: ["robot"]
    robot:
      inputs:
        [
          "/dc/measurement/cmd_vel",
          "/dc/measurement/position",
          "/dc/measurement/speed",
        ]
      output: "/dc/group/robot"
      sync_delay: 5.0
      group_key: "robot"
      tags: ["flb_stdout"]
      include_group_name: true

Create a group with data from cmd_vel, position and speed. Although nothing appears new here, one important point deserves attention. In the previous demo, we mentioned that if the group does not receive all messages, it drops the data. That matters here because cmd_vel is not published all the time (only when a controller sends a command), which means the group data is collected only while the robot moves.

If you wish to collect the position and the speed constantly, take cmd_vel out of this group and add it directly to the destination's inputs.

Destinations

destination_server:
  ros__parameters:
    flb:
      flush: 1
      flb_grace: 1
      log_level: "info"
      storage_path: "/var/log/flb-storage/"
      storage_sync: "full"
      storage_checksum: "off"
      storage_backlog_mem_limit: "1M"
      scheduler_cap: 200
      scheduler_base: 5
      http_server: true
      http_listen: "0.0.0.0"
      http_port: 2020
      in_storage_type: "filesystem"
      in_storage_pause_on_chunks_overlimit: "off"
    destination_plugins: ["flb_stdout"]
    custom_str_params_list: ["robot_name", "id"]
    custom_str_params:
      robot_name:
        name: robot_name
        value: "C3PO"
      # Requires systemd package
      id:
        name: id
        value_from_file: /etc/machine-id
    flb_stdout:
      plugin: "dc_destinations/FlbStdout"
      inputs: ["/dc/group/robot", "/dc/measurement/map"]
      time_format: "double"
      time_key: "date"
      debug: false

Nothing new here; we simply edited the inputs of flb_stdout to ["/dc/group/robot", "/dc/measurement/map"] to get the data from the robot group and the map measurement.

Console output

Now that the node has started, let us see what's displayed in the console:

[component_container_isolated-1] [INFO] [1677696067.792012776] [dc_container]: Load Library: /root/ws/install/dc_measurements/lib/libmeasurement_server_core.so
[component_container_isolated-1] [INFO] [1677696067.795921257] [dc_container]: Found class: rclcpp_components::NodeFactoryTemplate<measurement_server::MeasurementServer>
[component_container_isolated-1] [INFO] [1677696067.795956318] [dc_container]: Instantiate class: rclcpp_components::NodeFactoryTemplate<measurement_server::MeasurementServer>
[component_container_isolated-1] [INFO] [1677696067.811804437] [measurement_server]:
[component_container_isolated-1] 	measurement_server lifecycle node launched.
[component_container_isolated-1] 	Waiting on external lifecycle transitions to activate
[component_container_isolated-1] 	See https://design.ros2.org/articles/node_lifecycle.html for more information.
[component_container_isolated-1] [INFO] [1677696067.818755098] [measurement_server]: Base save path expanded to /root/dc_data/
[component_container_isolated-1] [INFO] [1677696067.818799565] [measurement_server]: All Base path expanded to C3PO/%Y/%m/%d/%H
[INFO] [launch_ros.actions.load_composable_nodes]: Loaded node '/measurement_server' in container 'dc_container'
[component_container_isolated-1] [INFO] [1677696067.821344112] [dc_container]: Load Library: /root/ws/install/dc_destinations/lib/libdestination_server_core.so
[component_container_isolated-1] [INFO] [1677696067.826089144] [dc_container]: Found class: rclcpp_components::NodeFactoryTemplate<destination_server::DestinationServer>
[component_container_isolated-1] [INFO] [1677696067.826138848] [dc_container]: Instantiate class: rclcpp_components::NodeFactoryTemplate<destination_server::DestinationServer>
[component_container_isolated-1] [INFO] [1677696067.845160790] [destination_server]:
[component_container_isolated-1] 	destination_server lifecycle node launched.
[component_container_isolated-1] 	Waiting on external lifecycle transitions to activate
[component_container_isolated-1] 	See https://design.ros2.org/articles/node_lifecycle.html for more information.
[INFO] [launch_ros.actions.load_composable_nodes]: Loaded node '/destination_server' in container 'dc_container'
[component_container_isolated-1] [INFO] [1677696067.856784495] [dc_container]: Load Library: /opt/ros/humble/lib/libnav2_lifecycle_manager_core.so
[component_container_isolated-1] [INFO] [1677696067.857836219] [dc_container]: Found class: rclcpp_components::NodeFactoryTemplate<nav2_lifecycle_manager::LifecycleManager>
[component_container_isolated-1] [INFO] [1677696067.857877118] [dc_container]: Instantiate class: rclcpp_components::NodeFactoryTemplate<nav2_lifecycle_manager::LifecycleManager>
[component_container_isolated-1] [INFO] [1677696067.875491851] [lifecycle_manager_navigation]: Creating
[INFO] [launch_ros.actions.load_composable_nodes]: Loaded node '/lifecycle_manager_navigation' in container 'dc_container'
[component_container_isolated-1] [INFO] [1677696067.881290824] [lifecycle_manager_navigation]: Creating and initializing lifecycle service clients
[component_container_isolated-1] [INFO] [1677696067.889491127] [lifecycle_manager_navigation]: Starting managed nodes bringup...

The measurement server and destination server start as lifecycle nodes; you can read more about it here.

"Base save path" and "All Base path" are also saved and expanded. Note "=robot_name" has been replaced by C3PO:

[component_container_isolated-1] [INFO] [1677696067.818755098] [measurement_server]: Base save path expanded to /root/dc_data/
[component_container_isolated-1] [INFO] [1677696067.818799565] [measurement_server]: All Base path expanded to C3PO/%Y/%m/%d/%H

Afterward, the measurement_server starts:

[component_container_isolated-1] [INFO] [1677694796.667666978] [lifecycle_manager_navigation]: Configuring measurement_server
[component_container_isolated-1] [INFO] [1677694796.667887325] [measurement_server]: Configuring
[component_container_isolated-1] [INFO] [1677694796.680348817] [measurement_server]: Creating measurement plugin cmd_vel: Type dc_measurements/CmdVel, Group key: cmd_vel, Polling interval: 1000, Debug: 0, Validator enabled: 1, Schema path: , Tags: [flb_stdout], Init collect: 1, Init Max measurement: 0, Include measurement name: 0, Include measurement plugin name: 0, Remote keys: , Remote prefixes: , Include measurement plugin name: 0, Max measurement on condition: 0, If all condition: , If any condition: , If none condition:
[component_container_isolated-1] [INFO] [1677694796.682517594] [measurement_server]: Configuring cmd_vel
[component_container_isolated-1] [INFO] [1677694796.684392643] [measurement_server]: Done configuring cmd_vel
[component_container_isolated-1] [INFO] [1677694796.684643767] [measurement_server]: Looking for schema at /root/ws/install/dc_measurements/share/dc_measurements/plugins/measurements/json/cmd_vel.json
[component_container_isolated-1] [INFO] [1677694796.684685350] [measurement_server]: schema: {"$defs":{"vector3":{"properties":{"x":{"description":"X speed","type":"number"},"y":{"description":"Y speed","type":"number"},"z":{"description":"Z speed","type":"number"}},"type":"object"}},"$schema":"http://json-schema.org/draft-07/schema#","description":"Command velocity sent to the robot","properties":{"angular":{"description":"Angular velocity as a vector","items":{"$ref":"#/$defs/vector3"},"type":"object"},"linear":{"description":"Linear velocity as a vector","items":{"$ref":"#/$defs/vector3"},"type":"object"}},"title":"Cmd_vel","type":"object"}
[component_container_isolated-1] [INFO] [1677694796.688143164] [measurement_server]: Creating measurement plugin map: Type dc_measurements/Map, Group key: map, Polling interval: 5000, Debug: 0, Validator enabled: 1, Schema path: , Tags: [flb_stdout], Init collect: 1, Init Max measurement: 0, Include measurement name: 1, Include measurement plugin name: 0, Remote keys: minio, Remote prefixes: , Include measurement plugin name: 0, Max measurement on condition: 0, If all condition: , If any condition: , If none condition:
[component_container_isolated-1] [INFO] [1677694796.689090088] [measurement_server]: Configuring map
[component_container_isolated-1] [INFO] [1677694796.690180661] [measurement_server]: Done configuring map
[component_container_isolated-1] [INFO] [1677694796.690522923] [measurement_server]: Looking for schema at /root/ws/install/dc_measurements/share/dc_measurements/plugins/measurements/json/map.json
[component_container_isolated-1] [INFO] [1677694796.690587274] [measurement_server]: schema: {"$defs":{"origin":{"description":"The 2-D pose of the lower-left pixel in the map, as (x, y, yaw), with yaw as counterclockwise rotation (yaw=0 means no rotation). Many parts of the system currently ignore yaw.","properties":{"x":{"description":"X origin of the robot","type":"number"},"y":{"description":"Y origin of the robot","type":"number"}},"type":"object"},"paths":{"properties":{"pgm":{"description":"Path to the map PGM file containing the gray-scale image","type":"string"},"yaml":{"description":"Path to the map YAML file containing map metadata","type":"string"}},"type":"object"}},"$schema":"http://json-schema.org/draft-07/schema#","description":"Map saved metadata and paths","properties":{"height":{"description":"Height of the PGM","minimum":0,"type":"integer"},"local_paths":{"description":"Paths where metadata and image are stored","items":{"$ref":"#/$defs/paths"},"type":"object"},"origin":{"description":"Robot origin position in meters","items":{"$ref":"#/$defs/origin"},"type":"object"},"remote_paths":{"additionalProperties":{"items":{"$ref":"#/$defs/paths"},"type":"object"},"description":"Dictionary of paths where metadata and image will be remotely stored","type":"object"},"resolution":{"description":"Resolution of the map, meters/pixel","minimum":0,"type":"number"},"width":{"description":"Width of the PGM","minimum":0,"type":"integer"}},"title":"Map","type":"object"}
[component_container_isolated-1] [INFO] [1677694796.693890952] [measurement_server]: Creating measurement plugin position: Type dc_measurements/Position, Group key: position, Polling interval: 1000, Debug: 0, Validator enabled: 1, Schema path: , Tags: [flb_stdout], Init collect: 1, Init Max measurement: 0, Include measurement name: 0, Include measurement plugin name: 0, Remote keys: , Remote prefixes: , Include measurement plugin name: 0, Max measurement on condition: 0, If all condition: , If any condition: , If none condition:
[component_container_isolated-1] [INFO] [1677694796.694797887] [measurement_server]: Configuring position
[component_container_isolated-1] [INFO] [1677694796.695799962] [measurement_server]: Done configuring position
[component_container_isolated-1] [INFO] [1677694796.696060904] [measurement_server]: Looking for schema at /root/ws/install/dc_measurements/share/dc_measurements/plugins/measurements/json/position.json
[component_container_isolated-1] [INFO] [1677694796.696114257] [measurement_server]: schema: {"$schema":"http://json-schema.org/draft-07/schema#","description":"Position and orientation of the robot","properties":{"x":{"description":"X position of the robot","type":"number"},"y":{"description":"Y position of the robot","type":"number"},"yaw":{"description":"Yaw angle of the robot","type":"number"}},"title":"Position","type":"object"}
[component_container_isolated-1] [INFO] [1677694796.699956883] [measurement_server]: Creating measurement plugin speed: Type dc_measurements/Speed, Group key: speed, Polling interval: 1000, Debug: 0, Validator enabled: 1, Schema path: , Tags: [flb_stdout], Init collect: 1, Init Max measurement: 0, Include measurement name: 0, Include measurement plugin name: 0, Remote keys: , Remote prefixes: , Include measurement plugin name: 0, Max measurement on condition: 0, If all condition: , If any condition: , If none condition:
[component_container_isolated-1] [INFO] [1677694796.701722429] [measurement_server]: Configuring speed
[component_container_isolated-1] [INFO] [1677694796.703057966] [measurement_server]: Done configuring speed
[component_container_isolated-1] [INFO] [1677694796.703357744] [measurement_server]: Looking for schema at /root/ws/install/dc_measurements/share/dc_measurements/plugins/measurements/json/speed.json
[component_container_isolated-1] [INFO] [1677694796.703413430] [measurement_server]: schema: {"$defs":{"vector3":{"properties":{"x":{"description":"X speed","type":"number"},"y":{"description":"Y speed","type":"number"},"z":{"description":"Z speed","type":"number"}},"type":"object"}},"$schema":"http://json-schema.org/draft-07/schema#","description":"Computed, linear and angular speed of the robot","properties":{"angular":{"description":"Angular velocity as a vector","items":{"$ref":"#/$defs/vector3"},"type":"object"},"computed":{"description":"Computed speed in meter/s","type":"number"},"linear":{"description":"Linear velocity as a vector","items":{"$ref":"#/$defs/vector3"},"type":"object"}},"title":"Speed","type":"object"}

Plugins are loaded one by one (here cmd_vel, map, position and speed) and configured. The configuration for each is displayed, and the validation schema is loaded and its path printed.
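To get a feel for what the validator checks, here is a minimal hand-rolled check of the position schema printed above (x, y and yaw must be numbers). This is only an illustration; DC validates against the full JSON Schema, and `validate_position` is an invented helper:

```python
# Minimal sketch of validating a position record against the schema
# above: x, y and yaw must all be numbers. DC itself runs a full
# JSON Schema (draft-07) validation.
def validate_position(record: dict) -> bool:
    return all(isinstance(record.get(field), (int, float))
               and not isinstance(record.get(field), bool)
               for field in ("x", "y", "yaw"))

print(validate_position({"x": -0.6, "y": -1.6, "yaw": -0.71}))  # → True
print(validate_position({"x": "oops", "y": 0.0, "yaw": 0.0}))   # → False
```

A record that fails validation would not be forwarded, which is what enable_validator: true enforces in the measurement configuration.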

Then, the destination_server starts:

[component_container_isolated-1] [INFO] [1677694796.706019709] [lifecycle_manager_navigation]: Configuring destination_server
[component_container_isolated-1] [INFO] [1677694796.706162231] [destination_server]: Configuring
[component_container_isolated-1] [INFO] [1677694796.706907595] [destination_server]: Fluent Bit service initialized
[component_container_isolated-1] [INFO] [1677694796.707868171] [destination_server]: Creating destination plugin flb_stdout: Type dc_destinations/FlbStdout, Debug: 0, Time format: double. Time key: date
[component_container_isolated-1] [INFO] [1677694796.709554983] [destination_server]: Configuring Flb plugin flb_stdout
[component_container_isolated-1] [INFO] [1677694796.710263626] [destination_server]: Loaded lua filter. Match=ros2, code=function concatenate(tag, timestamp, record) if (type(record["tags"]) == "table") then record["tags"] = table.concat(record["tags"], ",") end  return 2, timestamp, record end
[component_container_isolated-1] [INFO] [1677694796.710309154] [destination_server]: Loaded rewrite_tag filter. Match=ros2, Rule=$tags .*(flb_stdout).* flb_stdout true
[component_container_isolated-1] [INFO] [1677694796.710382723] [destination_server]: Done configuring Flb plugin flb_stdout
[component_container_isolated-1] [INFO] [1677694796.710401937] [destination_server]: Loading input ros2 shared library /root/ws/install/fluent_bit_plugins/lib/flb-in_ros2.so...
[component_container_isolated-1] [INFO] [1677694796.711494520] [destination_server]: Loaded input ros2 shared library /root/ws/install/fluent_bit_plugins/lib/flb-in_ros2.so
[component_container_isolated-1] [INFO] [1677694796.711617719] [destination_server]: Flb ros2 plugin initialized. ret=0
[component_container_isolated-1] [INFO] [1677694796.711638189] [destination_server]: Starting Flb engine...
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] [fluent bit] version=2.0.7, commit=1ab360f79c, pid=79925
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] [storage] ver=1.3.0, type=memory+filesystem, sync=full, checksum=off, max_chunks_up=128
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] [storage] backlog input plugin: storage_backlog.1
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] [cmetrics] version=0.5.7
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] [ctraces ] version=0.2.5
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] [input:ros2:ros2.0] initializing
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] [input:ros2:ros2.0] storage_strategy='filesystem' (memory + filesystem)
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] Started node fluentbit_rclc
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] Created subscriber /dc/group/robot
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] Created subscriber /dc/measurement/map
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] [input:storage_backlog:storage_backlog.1] initializing
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] [input:storage_backlog:storage_backlog.1] storage_strategy='memory' (memory only)
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] [input:storage_backlog:storage_backlog.1] queue memory limit: 976.6K
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] [input:emitter:emitter_for_rewrite_tag.2] initializing
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] [input:emitter:emitter_for_rewrite_tag.2] storage_strategy='filesystem' (memory + filesystem)
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] [output:stdout:stdout.0] worker #0 started
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] [http_server] listen iface=0.0.0.0 tcp_port=2020
[component_container_isolated-1] [2023/03/01 18:19:56] [ info] [sp] stream processor started
[component_container_isolated-1] [INFO] [1677694796.763937083] [destination_server]: Started Flb engine

All the same as with previous demos:

It starts the FlbStdout destination plugin and the filters used internally by Fluent Bit to edit timestamps and tags. It then loads the ROS 2 Fluent Bit shared library and initializes it with the topics we provided as parameters.

Afterward, Fluent Bit starts: it prints its version, storage strategy and buffer configuration, and lists which plugins are loaded.
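The rewrite_tag rule shown in the logs (`$tags .*(flb_stdout).* flb_stdout true`) is a regular-expression match on the record's tags field. Roughly, and as a sketch only (`rewrite_tag` is an invented stand-in for the Fluent Bit filter):

```python
import re

# Sketch of the rewrite_tag filter from the logs: if the record's "tags"
# string (already concatenated by the lua filter) matches
# .*(flb_stdout).*, the record is re-emitted under the tag "flb_stdout".
RULE = re.compile(r".*(flb_stdout).*")

def rewrite_tag(record: dict, new_tag: str = "flb_stdout"):
    if RULE.match(record.get("tags", "")):
        return new_tag
    return None  # no match: the record keeps its original tag

print(rewrite_tag({"tags": "flb_stdout"}))           # → flb_stdout
print(rewrite_tag({"tags": "flb_minio,flb_pgsql"}))  # → None
```

That new tag is what the stdout output plugin matches on, which closes the loop from measurement topic to console output.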

Then, the measurement and destination server nodes are activated:

[component_container_isolated-1] [INFO] [1677668906.745180268] [lifecycle_manager_navigation]: Activating measurement_server
[component_container_isolated-1] [INFO] [1677668906.745462265] [measurement_server]: Activating
[component_container_isolated-1] [INFO] [1677668906.745558665] [measurement_server]: Activating uptime
[component_container_isolated-1] [INFO] [1677668906.746014639] [measurement_server]: Creating bond (measurement_server) to lifecycle manager.
[component_container_isolated-1] [INFO] [1677668906.853199899] [lifecycle_manager_navigation]: Server measurement_server connected with bond.
[component_container_isolated-1] [INFO] [1677668906.853316226] [lifecycle_manager_navigation]: Activating destination_server
[component_container_isolated-1] [INFO] [1677668906.853620571] [destination_server]: Activating
[component_container_isolated-1] [INFO] [1677668906.853722659] [destination_server]: Activating flb_stdout
[component_container_isolated-1] [INFO] [1677668906.853796078] [destination_server]: Creating bond (destination_server) to lifecycle manager.
[component_container_isolated-1] [INFO] [1677668906.961666207] [lifecycle_manager_navigation]: Server destination_server connected with bond.
[component_container_isolated-1] [INFO] [1677668906.961777773] [lifecycle_manager_navigation]: Managed nodes are active
[component_container_isolated-1] [INFO] [1677668906.961820078] [lifecycle_manager_navigation]: Creating bond timer...

Finally, we see the data:

[component_container_isolated-1] [{"remote_paths":{"minio":{"pgm":"C3PO/2023/03/01/18/map/2023-03-01T18:19:56.pgm","yaml":"C3PO/2023/03/01/18/map/2023-03-01T18:19:56.yaml"}},"date":1677694796.765904,"height":384,"name":"map","origin":{"x":-10,"y":-10},"local_paths":{"pgm":"/root/dc_data/C3PO/2023/03/01/18/map/2023-03-01T18:19:56.pgm","yaml":"/root/dc_data/C3PO/2023/03/01/18/map/2023-03-01T18:19:56.yaml"},"resolution":0.05000000074505806,"width":384,"id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"240"}]
[component_container_isolated-1] [{"cmd_vel":{"linear":{"x":0.246316,"y":0,"z":0},"angular":{"x":0,"y":0,"z":0.263158}},"date":1677694799.685468,"position":{"x":-0.6033772358727438,"yaw":-0.7146495585355921,"y":-1.633862970534114},"speed":{"angular":{"x":-0.0002542699063698451},"computed":3.279051750818494e-05,"linear":{"x":3.244397182637543e-05,"y":4.754653571390878e-06,"z":0}},"name":"robot","id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"240"}]
[component_container_isolated-1] [{"cmd_vel":{"linear":{"x":0.246316,"y":0,"z":0},"angular":{"x":0,"y":0,"z":0.263158}},"date":1677694800.685094,"position":{"x":-0.5307920077356227,"yaw":-0.645872441707418,"y":-1.693502983783379},"speed":{"angular":{"x":0.2127361022319145},"computed":0.2459393937023935,"linear":{"x":0.2459393844826374,"y":-6.734242603700924e-05,"z":0}},"name":"robot","id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"240"}]
[component_container_isolated-1] [{"cmd_vel":{"linear":{"x":0.26,"y":0,"z":0},"angular":{"x":0,"y":0,"z":0.368421}},"date":1677694801.685017,"position":{"x":-0.3195900881047479,"yaw":-0.3944485529461182,"y":-1.813946939716471},"speed":{"angular":{"x":0.2623361481938653},"computed":0.24584231041888,"linear":{"x":0.2458423037516516,"y":-5.725533732485466e-05,"z":0}},"name":"robot","id":"be781e5ffb1e7ee4f817fe7b63e92c32","robot_name":"C3PO","run_id":"240"}]

So...what happened?

  1. The Nav2 turtlebot3 simulation starts; the robot is able to localize and move (once you use the 2D Pose Estimate button in RViz)
  2. The measurement plugins start publishing data to /dc/measurement/map, /dc/measurement/cmd_vel, /dc/measurement/position and /dc/measurement/speed, which contain the JSONs, tags and timestamp of each message
  3. In parallel, each time the map plugin sends a ROS message, it also saves the files on the filesystem. Open a file browser at the path you set in the configuration, or at a path mentioned in the map JSON
  4. The "robot" group node subscribes to /dc/measurement/cmd_vel, /dc/measurement/position and /dc/measurement/speed and publishes on /dc/group/robot once it has collected data from all 3 topics
  5. The run ID and robot_name are appended to the JSON of each measurement
  6. The ROS 2 Fluent Bit plugin, which subscribes to the topics, receives the data and passes it on to Fluent Bit
  7. Fluent Bit applies the timestamp filter (which modifies the timestamp to the desired format)
  8. Fluent Bit changes the tag with another filter. This tag is used afterward to match records to destinations
  9. Data is flushed
  10. The Fluent Bit stdout output plugin receives the data because the tags match (the flb_stdout tag is used) and forwards it to stdout
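The tag matching described in steps 8 and 10 can be sketched as a small router: every record carries a tag, and every output declares a glob-style match pattern. This is an illustrative sketch only, not Fluent Bit's actual API:

```python
# Minimal sketch of tag-based routing, loosely modeled on how Fluent Bit
# matches record tags to outputs. All names here are illustrative.
import fnmatch

def route(records, outputs):
    """Deliver each (tag, payload) record to every output whose
    glob-style match pattern matches the record's tag."""
    delivered = {name: [] for name in outputs}
    for tag, payload in records:
        for name, pattern in outputs.items():
            if fnmatch.fnmatch(tag, pattern):
                delivered[name].append(payload)
    return delivered

records = [
    ("flb_stdout", {"uptime": 42}),
    ("flb_pgsql", {"name": "robot"}),
]
outputs = {"stdout": "flb_stdout", "pgsql": "flb_pgsql"}
print(route(records, outputs))
```

A pattern like `flb_*` would match both tags, which is how one output can receive several inputs.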

Turtlebot3

In this example, we add a robot and start collecting robot data to PostgreSQL and MinIO.

You will also need 3 terminal windows, to:

  1. Run the Nav2 turtlebot3 launchfile: it starts localization, navigation and RViz
  2. Run navigation inspection demo
  3. Run DC

Using a separate terminal window for DC makes its output easier to read.

Packages in the workspace

In addition to the ros2_data_collection repo in your workspace, you will need to download the AWS warehouse package:

cd src
git clone https://github.com/aws-robotics/aws-robomaker-small-warehouse-world.git -b ros2
cd ..
colcon build

Setup the environment

Python dependencies

For this tutorial, we will need to install all dependencies:

pip3 install -r requirements.txt -r requirements-dev.txt

Setup Infrastructure

MinIO

MinIO will be used as storage for images and other files. To start it, follow the steps.

Once you have your keys, edit the yaml configuration file at dc_demos/params/tb3_simulation_pgsql_minio.yaml by editing all instances of:

  1. access_key_id
  2. secret_access_key

PostgreSQL

PostgreSQL will be used as database storage for our JSON documents. Later on, backend engineers can query those documents by measurement and time range. To start it, follow the steps.

The default yaml configuration file does not need changes, as it also uses the default values.

Setup simulation environment

In terminal 1, source your environment and set up the turtlebot configuration:

source /opt/ros/humble/setup.bash
source install/setup.bash
export GAZEBO_MODEL_PATH=$GAZEBO_MODEL_PATH:/opt/ros/humble/share/turtlebot3_gazebo/models
export GAZEBO_RESOURCE_PATH=${PWD}/src/aws-robomaker-small-warehouse-world/
export TURTLEBOT3_MODEL=waffle
source /usr/share/gazebo/setup.bash

Verify that the Gazebo world loads properly:

gazebo /opt/ros/humble/share/aws_robomaker_small_warehouse_world/worlds/small_warehouse/small_warehouse.world

Gazebo will start with the warehouse environment. You can close it now.

Info

These source commands and exports appear to be required because of this issue.

Terminal 1: Start Navigation

Then, in the same terminal (1), start the Turtlebot launchfile:

ros2 launch nav2_bringup tb3_simulation_launch.py \
    world:=/opt/ros/humble/share/aws_robomaker_small_warehouse_world/worlds/no_roof_small_warehouse/no_roof_small_warehouse.world \
    map:=/opt/ros/humble/share/aws_robomaker_small_warehouse_world/maps/005/map.yaml \
    headless:=False \
    x_pose:=3.45 \
    y_pose:=2.15 \
    yaw:=3.14

RViz and Gazebo will start: now you see the robot in Gazebo, and the map on RViz.

RViz Gazebo

Terminal 2: Start DC

Run colcon build to compile the workspace:

colcon build

Now, start the demo:

ros2 launch dc_demos tb3_simulation_pgsql_minio.launch.py

The robot will start collecting data.

Terminal 3: Start autonomous navigation

Execute

ros2 run nav2_simple_commander demo_security

The robot will start moving and you will be able to see all visualizations activated in RViz:

RViz-moving

Visualize the data

In the database

Navigate to localhost:8080

  1. Select dc database
  2. Select dc table
  3. Click on Select data

You will see rows filling the database. You can click on one to see its content:

Adminer-gif

In MinIO (file storage)

Navigate to localhost:9001

  1. Select mybucket bucket
  2. Click until you reach camera or map
  3. Continue until you reached a file, and click on preview (pgm can't be previewed)

You will see the files stored in the bucket. You can click on one to preview its content:

Adminer-gif

With Streamlit dashboard

A dashboard has been developed to help visualize the data collected.

Warning

Make sure you have installed the Python dependencies with pip!

Execute in a new terminal:

cd src/ros2_data_collection/dc_demos/dc_demos/streamlit_dashboard/
# Replace with the access key created at the start of this demo
export MINIO_ACCESS_KEY=<YOUR_ACCESS_KEY>
# Replace with the secret key created at the start of this demo
export MINIO_SECRET_KEY=<YOUR_SECRET_KEY>
streamlit run Streamlit_ROS_2.py

# This will be printed:

  You can now view your Streamlit app in your browser.

  Local URL: http://localhost:8502
  Network URL: http://192.168.2.105:8502

Open the address in your browser: localhost:8502. Here is a video of it:

That's it! Now you can collect your data!

Understanding the configuration

Info

The full configuration file can be found here.

Measurement server

Measurements

measurement_plugins sets which plugins to load. We collect:

System measurements:

  1. CPU
  2. OS
  3. Memory
  4. Uptime

Robot measurements:

  1. Camera images
  2. Command velocities
  3. Distance traveled
  4. Positions
  5. Speed

Environment measurements:

  1. Map

Infrastructure measurements:

  1. MinIO API health
  2. MinIO Dashboard health
  3. PostgreSQL health

Each has its own configuration: polling interval, source topic, destination paths, input topics, etc. Going through each of them would be too long here, but you can check each measurement's documentation as well as the general measurements documentation.

Conditions

We also initialize conditions:

  1. min_distance_traveled
  2. max_distance_traveled

They are used in the distance traveled measurement to only accept values within a certain range.
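The way such range conditions could gate a measurement can be sketched as follows (hypothetical helper, not the plugin's actual code):

```python
def within_range(value, minimum=None, maximum=None):
    """Return True when value satisfies the configured bounds
    (None means the bound is not set), mirroring the min/max
    distance-traveled conditions."""
    if minimum is not None and value < minimum:
        return False
    if maximum is not None and value > maximum:
        return False
    return True

# Only distances between 0.1 m and 100 m would be collected.
samples = [0.05, 1.2, 250.0]
collected = [d for d in samples if within_range(d, minimum=0.1, maximum=100.0)]
print(collected)  # [1.2]
```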

Destination server

Here we enable the PostgreSQL and MinIO plugins, since those are where we send the data.

PostgreSQL destination

We pass all topics generated by the measurements. The node subscribes to them and transfers the data to PostgreSQL. Note that not all data needs to be sent to PostgreSQL; some could be sent elsewhere.

Along with the inputs, we pass the server configuration: username, password, database name etc.

MinIO destination

We pass the topics we want to subscribe to: here only map and camera, since those are the ones generating files.

src_fields and upload_fields are the fields in the JSON sent by the measurement where local and remote paths are stored, so the plugin knows which files to upload and where to put them.
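Values such as local_paths.pgm are dotted paths into the measurement JSON; resolving one can be sketched like this (hypothetical helper, not DC's code):

```python
def get_field(data, dotted_path):
    """Walk a nested dict following a dotted path such as
    'local_paths.pgm' and return the value found there."""
    node = data
    for key in dotted_path.split("."):
        node = node[key]
    return node

# Shape loosely based on the map measurement JSON shown earlier.
measurement = {
    "local_paths": {"pgm": "/root/dc_data/map.pgm"},
    "remote_paths": {"minio": {"pgm": "C3PO/map.pgm"}},
}
print(get_field(measurement, "local_paths.pgm"))
print(get_field(measurement, "remote_paths.minio.pgm"))
```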

Turtlebot3

In this example, we add a robot and start collecting robot data to InfluxDB.

You will also need 3 terminal windows, to:

  1. Run the Nav2 turtlebot3 launchfile: it starts localization, navigation and RViz
  2. Run navigation inspection demo
  3. Run DC

Using a separate terminal window for DC makes its output easier to read.

Packages in the workspace

In addition to the ros2_data_collection repo in your workspace, you will need to download the AWS warehouse package:

cd src
git clone https://github.com/aws-robotics/aws-robomaker-small-warehouse-world.git -b ros2
cd ..
colcon build

Setup the environment

Python dependencies

For this tutorial, we will need to install all dependencies:

pip3 install -r requirements.txt -r requirements-dev.txt

Setup the infrastructure

InfluxDB

InfluxDB will be used to store our data and timestamps. Later on, backend engineers can query the data by measurement and time range. To start it, follow the steps.

Grafana

Grafana will be used to display the data as timeseries or statistics.

Setup simulation environment

In terminal 1, source your environment and set up the turtlebot configuration:

source /opt/ros/humble/setup.bash
source install/setup.bash
export GAZEBO_MODEL_PATH=$GAZEBO_MODEL_PATH:/opt/ros/humble/share/turtlebot3_gazebo/models
export GAZEBO_RESOURCE_PATH=${PWD}/src/aws-robomaker-small-warehouse-world/
export TURTLEBOT3_MODEL=waffle
source /usr/share/gazebo/setup.bash

Verify that the Gazebo world loads properly:

gazebo /opt/ros/humble/share/aws_robomaker_small_warehouse_world/worlds/no_roof_small_warehouse/no_roof_small_warehouse.world

Gazebo will start with the warehouse environment. You can close it now.

Info

These source commands and exports appear to be required because of this issue.

Terminal 1: Start Navigation

Then, in the same terminal (1), start the Turtlebot launchfile:

ros2 launch nav2_bringup tb3_simulation_launch.py \
    world:=/opt/ros/humble/share/aws_robomaker_small_warehouse_world/worlds/no_roof_small_warehouse/no_roof_small_warehouse.world \
    map:=/opt/ros/humble/share/aws_robomaker_small_warehouse_world/maps/005/map.yaml \
    headless:=False \
    x_pose:=3.45 \
    y_pose:=2.15 \
    yaw:=3.14

RViz and Gazebo will start: now you see the robot in Gazebo, and the map on RViz.

RViz Gazebo

Terminal 2: Start DC

Run colcon build to compile the workspace:

colcon build

Now, start the demo:

ros2 launch dc_demos tb3_simulation_influxdb.launch.py

The robot will start collecting data.

Terminal 3: Start autonomous navigation

Execute

ros2 run nav2_simple_commander demo_security

The robot will start moving and you will be able to see all visualizations activated in RViz:

RViz-moving

Visualize the data

With Grafana Dashboard

Open your browser at http://localhost:3000 and log in with admin/admin.

Grafana-1

Understanding the configuration

Info

The full configuration file can be found here.

Measurement server

Measurements

measurement_plugins sets which plugins to load. We collect:

System measurements:

  1. CPU
  2. OS
  3. Memory
  4. Uptime

Robot measurements:

  1. Camera images
  2. Command velocities
  3. Distance traveled
  4. Positions
  5. Speed

Environment measurements:

  1. Map

Infrastructure measurements:

  1. InfluxDB health

Each has its own configuration: polling interval, source topic, destination paths, input topics, etc. Going through each of them would be too long here, but you can check each measurement's documentation as well as the general measurements documentation.

Note that all measurements have the nested and flattened parameters set to True; this transforms each measurement to conform to how InfluxDB stores data. See nested and flattened on the measurements page.
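The kind of flattening this implies can be sketched as turning nested keys into dotted top-level keys (illustrative only, not DC's exact output format):

```python
def flatten(data, prefix=""):
    """Flatten nested dicts into a single level with dotted keys,
    the shape that time-series stores such as InfluxDB prefer."""
    flat = {}
    for key, value in data.items():
        full_key = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=full_key + "."))
        else:
            flat[full_key] = value
    return flat

print(flatten({"speed": {"linear": {"x": 0.24}, "computed": 0.25}}))
# {'speed.linear.x': 0.24, 'speed.computed': 0.25}
```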

Note also that all images (map and camera images) are stored as base64 strings, because Grafana, the frontend here, can only access strings from the database.
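Storing an image as a base64 string can be sketched with the standard library (illustrative; DC's plugins handle this internally):

```python
import base64

def image_to_base64(path):
    """Read raw image bytes and return an ASCII base64 string that a
    database text column (and hence Grafana) can hold."""
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode("ascii")

# Round-trip on a tiny fake payload instead of a real PNG file:
payload = b"\x89PNG fake bytes"
encoded = base64.b64encode(payload).decode("ascii")
assert base64.b64decode(encoded) == payload
print(encoded)
```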

Conditions

We also initialize conditions:

  1. min_distance_traveled
  2. max_distance_traveled

They are used in the distance traveled measurement to only accept values within a certain range.

Destination server

Here we only enable the InfluxDB plugin, since it is where we send the data.

At the end, we want to display the data on Grafana. Grafana can only display content stored on the connected database. This also includes images, which is why they are stored as base64 strings.

InfluxDB destination

We pass all topics generated by the measurements. The node subscribes to them and transfers the data to InfluxDB.

Along with the inputs, we pass the server credentials.

QRCodes

In this example, we add a robot and start collecting robot data to PostgreSQL and maps and scanned QRcodes to MinIO as image files.

You will also need 3 terminal windows, to:

  1. Run RViz
  2. Run the Nav2 turtlebot3 launchfile: it starts localization and navigation
  3. Run DC

Since RViz is pretty verbose, using separate terminal windows makes the JSON printed by DC easier to read.

Terminal | Description
RViz     | Start it independently because of its verbosity
Nav2     | Localization and navigation
DC       | Data collection

Setup MinIO and PostgreSQL

MinIO

MinIO will be used as storage for images and other files. To start it, follow the steps.

PostgreSQL

PostgreSQL will be used as database storage for our JSON documents. Later on, backend engineers can query those documents by measurement and time range. To start it, follow the steps.

Setup the ROS environment

In each terminal, source your environment and set up the turtlebot configuration:

source /opt/ros/humble/setup.bash
source install/setup.bash
export GAZEBO_MODEL_PATH=$GAZEBO_MODEL_PATH:/opt/ros/humble/share/turtlebot3_gazebo/models
export GAZEBO_MODEL_PATH=$GAZEBO_MODEL_PATH:/opt/ros/humble/share/aws_robomaker_small_warehouse_world/models/
export TURTLEBOT3_MODEL=waffle

Start RViz

ros2 run rviz2 rviz2 -d ${PWD}/install/dc_simulation/share/dc_simulation/rviz/qrcodes.rviz

Start Navigation

Then, start the Turtlebot launchfile with custom parameters to start the QRcode world:

ros2 launch nav2_bringup tb3_simulation_launch.py \
    headless:=False \
    x_pose:="-16.679400" \
    y_pose:="-15.300200" \
    z_pose:="0.01" \
    roll:="0.00" \
    pitch:="0.00" \
    yaw:="1.570796" \
    robot_sdf:="${PWD}/install/dc_simulation/share/dc_simulation/worlds/waffle.model" \
    map:="${PWD}/install/dc_simulation/share/dc_simulation/maps/qrcodes.yaml" \
    world:="${PWD}/install/dc_simulation/share/dc_simulation/worlds/qrcodes.world" \
    nav2_params_file:="${PWD}/install/dc_simulation/share/dc_demos/params/qrcodes_nav.yaml" \
    dc_params_file:="${PWD}/install/dc_simulation/share/dc_demos/params/qrcodes_minio_pgsql.yaml" \
    rviz_config_file:="${PWD}/install/dc_simulation/share/dc_simulation/rviz/qrcodes.rviz" \
    use_rviz:=False

RViz and Gazebo will start: you should now see the robot in Gazebo, and the map on RViz.

Set the robot position using the "2D Pose Estimate" button.

Start DC

Execute

ros2 launch dc_demos tb3_qrcodes_minio_pgsql.launch.py

With this, all data will be transmitted.

Understanding the configuration

Info

The full configuration file can be found here.

For this demo, we will reconstruct the yaml configuration element by element, given how large it is. Go through the explanation to understand how it works.

Collect command velocity, position and speed to PostgreSQL as a group

Similarly to the previous tutorial:

destination_server:
  ros__parameters:
    flb:
      flush: 1
      flb_grace: 1
      log_level: "info"
      storage_path: "/var/log/flb-storage/"
      storage_sync: "full"
      storage_checksum: "off"
      storage_backlog_mem_limit: "1M"
      scheduler_cap: 200
      scheduler_base: 5
      http_server: true
      http_listen: "0.0.0.0"
      http_port: 2020
      in_storage_type: "filesystem"
      in_storage_pause_on_chunks_overlimit: "off"
    destination_plugins: ["flb_pgsql"]
    flb_pgsql:
      plugin: "dc_destinations/FlbPgSQL"
      inputs: ["/dc/group/robot"]
      host: "127.0.0.1"
      port: 5432
      user: fluentbit
      password: password
      database: "fluentbit"
      table: "dc"
      timestamp_key: "date"
      async: false
      time_format: "double"
      time_key: "date"

group_server:
  ros__parameters:
    groups: ["robot"]
    robot:
      inputs:
        [
          "/dc/measurement/cmd_vel",
          "/dc/measurement/position",
          "/dc/measurement/speed",
        ]
      output: "/dc/group/robot"
      sync_delay: 5.0
      group_key: "robot"
      tags: ["flb_pgsql"]

measurement_server:
  ros__parameters:
    measurement_plugins: ["cmd_vel", "position", "speed"]
    custom_str_params_list: ["robot_name", "id"]
    custom_str_params:
      robot_name:
        name: robot_name
        value: "C3PO"
      # Requires systemd package
      id:
        name: id
        value_from_file: /etc/machine-id
    run_id:
      enabled: true
      counter: true
      counter_path: "$HOME/run_id"
      uuid: false
    moving:
      plugin: "dc_conditions/Moving"
    cmd_vel:
      plugin: "dc_measurements/CmdVel"
      group_key: "cmd_vel"
      enable_validator: true
      topic_output: "/dc/measurement/cmd_vel"
      include_measurement_name: true
    position:
      plugin: "dc_measurements/Position"
      group_key: "position"
      topic_output: "/dc/measurement/position"
      enable_validator: true
      global_frame: "map"
      robot_base_frame: "base_link"
      transform_timeout: 0.1
      include_measurement_name: true
    speed:
      plugin: "dc_measurements/Speed"
      group_key: "speed"
      odom_topic: "/odom"
      topic_output: "/dc/measurement/speed"
      include_measurement_name: true

In the measurement server, we configure 3 measurements: cmd_vel, position and speed. Each is collected once per second, validated against its JSON schema and published on its own topic.

Note include_measurement_name, which includes the measurement name in the JSON and is used when grouping. The group collects the data from those 3 measurements and republishes it on the group topic /dc/group/robot.
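Conceptually, the group node merges the latest JSON from each input under its group_key once all inputs have reported. A simplified sketch (ignoring sync_delay and not the group server's actual code):

```python
def merge_group(latest, expected_keys):
    """Return one merged record when every expected group_key has
    reported a measurement; otherwise None (still waiting)."""
    if set(latest) != set(expected_keys):
        return None
    return {key: latest[key] for key in expected_keys}

# Latest measurement per group_key, shapes based on the demo output.
latest = {
    "cmd_vel": {"linear": {"x": 0.24}},
    "position": {"x": -0.6, "y": -1.6},
    "speed": {"computed": 0.25},
}
print(merge_group(latest, ["cmd_vel", "position", "speed"]))
```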

In the destination server, we enable the PostgreSQL plugin and configure its credentials.

Warning

Be sure to change the login and password to match your infrastructure configuration, especially in a production setup!

You can find more about the PostgreSQL plugin here

To take a look at the records, go to Adminer, a database GUI started by default at http://localhost:8080:

Adminer

You can then click on a record, to take a look, edit or delete it:

Adminer

Send the map image and YAML from nav2_map_server to MinIO

First, we add the map measurement. We use 3 plugins here: flb_pgsql to send the data to the dc table in PostgreSQL, flb_minio to send files to MinIO, and flb_files_metrics to autodelete the files once they reach their destination.

measurement_server:
  ros__parameters:
  ...
  measurement_plugins: ["map"]
  map:
    plugin: "dc_measurements/Map"
    group_key: "map"
    polling_interval: 5000
    save_path: "map/%Y-%m-%dT%H:%M:%S"
    topic_output: "/dc/measurement/map"
    save_map_timeout: 4.0
    remote_prefixes: [""]
    remote_keys: ["minio"]
    enable_validator: true
    tags: ["flb_pgsql", "flb_minio", "flb_files_metrics"]
    include_measurement_name: true
  ...
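The save_path value above uses strftime tokens; how such a pattern could expand into a timestamped path can be sketched as follows (illustrative helper, not DC's code):

```python
from datetime import datetime

def expand_save_path(pattern, when):
    """Expand strftime tokens in a save_path pattern such as
    'map/%Y-%m-%dT%H:%M:%S' into a concrete relative path."""
    return when.strftime(pattern)

stamp = datetime(2023, 3, 1, 18, 19, 56)
print(expand_save_path("map/%Y-%m-%dT%H:%M:%S", stamp))
# map/2023-03-01T18:19:56
```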

Then we add the destinations:

destination_server:
  ros__parameters:
    ...
    destination_plugins: ["flb_files_metrics", "flb_pgsql", "flb_minio"]
    flb_files_metrics:
      plugin: "dc_destinations/FlbFilesMetrics"
      inputs: ["/dc/measurement/map"]
      file_storage: ["minio"]
      db_type: "pgsql"
      delete_when_sent: true
      minio:
        endpoint: 127.0.0.1:9000
        access_key_id: rQXPf1f730Yuu2yW # Change it
        secret_access_key: TYYkjN5L4gqDgCGLzQahHDcvqL4WNTcb # Change it
        use_ssl: false
        bucket: "mybucket"
        src_fields: ["local_paths.pgm", "local_paths.yaml"]
        upload_fields: ["remote_paths.minio.pgm", "remote_paths.minio.yaml"]
        input_names: ["map", "map"]
      pgsql:
        host: "127.0.0.1"
        port: "5432"
        user: fluentbit # Change it
        password: password # Change it
        database: "fluentbit"
        table: "files_metrics"
        timestamp_key: "date"
        time_format: "double"
        time_key: "date"
        ssl: false
    flb_minio:
      verbose_plugin: false
      time_format: "iso8601"
      plugin: "dc_destinations/FlbMinIO"
      inputs: ["/dc/measurement/map"]
      endpoint: 127.0.0.1:9000
      access_key_id: rQXPf1f730Yuu2yW # Change it
      secret_access_key: TYYkjN5L4gqDgCGLzQahHDcvqL4WNTcb # Change it
      use_ssl: false
      create_bucket: true
      bucket: "mybucket" # Change it
      src_fields: ["local_paths.pgm", "local_paths.yaml"]
      upload_fields: ["remote_paths.minio.pgm", "remote_paths.minio.yaml"]
    flb_pgsql:
      plugin: "dc_destinations/FlbPgSQL"
      inputs: ["/dc/group/map"]
      host: "127.0.0.1"
      port: 5432
      user: fluentbit
      password: password
      database: "fluentbit"
      table: "dc"
      timestamp_key: "date"
      async: false
      time_format: "double"
      time_key: "date"

This will save the map pgm and yaml every 5 seconds to MinIO. Head to the MinIO GUI, which is by default located at http://localhost:9001. Default user/password are minioadmin/minioadmin.

MapMinIO

It will also save the map metadata on PostgreSQL, which you can later use to know, for example, the map location on MinIO and its dimensions:

MapPostgreSQL

We also transmit the metadata (map dimensions and path), to be fetched later on by the backend. It is configured in the flb_pgsql destination plugin.

The flb_files_metrics plugin is also enabled. We use it to track whether the files are on the filesystem, and to delete them once they reach their destination, here MinIO. The plugin deletes files when they are sent if the delete_when_sent parameter is enabled. More about the plugin here

Then, similarly, on Adminer, you can check that the files are uploaded and deleted:

MapAdminerFilesMetrics

Send QR code images to MinIO

We want to collect the pictures taken by the cameras:

measurement_server:
  ros__parameters:
  ...
  measurement_plugins: ["right_camera", "left_camera"]
  condition_plugins: ["moving", "inspected_exists"]
  custom_str_params_list: ["robot_name", "id"]
  custom_str_params:
    robot_name:
      name: robot_name
      value: "C3PO"
    # Requires systemd package
    id:
      name: id
      value_from_file: /etc/machine-id
  run_id:
    enabled: true
    counter: true
    counter_path: "$HOME/run_id"
    uuid: false
  destinations:
    minio:
      bucket: mybucket
  moving:
    plugin: "dc_conditions/Moving"
  inspected_exists:
    plugin: "dc_conditions/Exist"
    key: "inspected"
  right_camera:
    plugin: "dc_measurements/Camera"
    group_key: "right_camera"
    if_none_conditions: ["moving"]
    if_all_conditions: ["inspected_exists"]
    topic_output: "/dc/measurement/right_camera"
    init_collect: false
    init_max_measurements: -1
    condition_max_measurements: 1
    node_name: "dc_measurement_camera"
    cam_topic: "/right_intel_realsense_r200_depth/image_raw"
    cam_name: right_camera
    enable_validator: true
    draw_det_barcodes: true
    save_raw_img: false
    save_rotated_img: false
    save_detections_img: true
    save_inspected_path: "right_camera/inspected/%Y-%m-%dT%H-%M-%S"
    rotation_angle: 0
    detection_modules: ["barcode"]
    remote_prefixes: [""]
    remote_keys: ["minio"]
    tags: ["flb_pgsql", "flb_minio", "flb_files_metrics"]
    include_measurement_name: true
  left_camera:
    plugin: "dc_measurements/Camera"
    group_key: "left_camera"
    if_none_conditions: ["moving"]
    if_all_conditions: ["inspected_exists"]
    topic_output: "/dc/measurement/left_camera"
    init_collect: true
    init_max_measurements: -1
    condition_max_measurements: 1
    node_name: "dc_measurement_camera"
    cam_topic: "/left_intel_realsense_r200_depth/image_raw"
    cam_name: left_camera
    enable_validator: true
    draw_det_barcodes: true
    save_raw_img: false
    save_rotated_img: false
    save_detections_img: true
    save_inspected_path: "left_camera/inspected/%Y-%m-%dT%H-%M-%S"
    rotation_angle: 0
    detection_modules: ["barcode"]
    remote_prefixes: [""]
    remote_keys: ["minio"]
    tags: ["flb_pgsql", "flb_minio", "flb_files_metrics"]
    include_measurement_name: true
  ...

Taking a look at the camera configuration, we can see that:

  1. Data is only collected when the robot is not moving: if_none_conditions: ["moving"]
  2. Data is only collected when there is inspected data, so only when a QR code is detected: if_all_conditions: ["inspected_exists"]
  3. Data is not collected constantly: init_max_measurements: -1
  4. Only one record is collected when conditions are triggered: condition_max_measurements: 1
  5. Only images with inspected data are collected:
    1. save_raw_img: false
    2. save_rotated_img: false
    3. save_detections_img: true
  6. Barcodes are scanned in each image: detection_modules: ["barcode"]
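The combination of if_none_conditions and if_all_conditions can be sketched as a boolean gate (hypothetical helper, not the measurement server's code):

```python
def should_collect(states, none_of, all_of):
    """Collect only when no condition in none_of is active and every
    condition in all_of is active, mirroring if_none_conditions /
    if_all_conditions."""
    if any(states.get(c, False) for c in none_of):
        return False
    return all(states.get(c, False) for c in all_of)

# Robot stopped in front of a QR code: collect.
print(should_collect({"moving": False, "inspected_exists": True},
                     none_of=["moving"], all_of=["inspected_exists"]))  # True
# Robot moving: skip.
print(should_collect({"moving": True, "inspected_exists": True},
                     none_of=["moving"], all_of=["inspected_exists"]))  # False
```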

For the files metrics, it is very important to enable include_measurement_name, since the plugin relies on it to know in which fields the paths are.

Then we add the destination:

destination_server:
  ros__parameters:
  ...
    destination_plugins: ["flb_minio", "flb_pgsql", "flb_files_metrics"]
    flb_files_metrics:
      plugin: "dc_destinations/FlbFilesMetrics"
      inputs:
        [
          "/dc/measurement/right_camera",
          "/dc/measurement/left_camera",
        ]
      file_storage: ["minio"]
      db_type: "pgsql"
      delete_when_sent: true
      minio:
        endpoint: 127.0.0.1:9000
        access_key_id: rQXPf1f730Yuu2yW
        secret_access_key: TYYkjN5L4gqDgCGLzQahHDcvqL4WNTcb
        use_ssl: false
        bucket: "mybucket"
        src_fields:
          [
            "local_paths.inspected",
            "local_paths.inspected"
          ]
        upload_fields:
          [
            "remote_paths.minio.inspected",
            "remote_paths.minio.inspected"
          ]
      pgsql:
        host: "127.0.0.1"
        port: "5432"
        user: fluentbit
        password: password
        database: "fluentbit"
        table: "files_metrics"
        timestamp_key: "date"
        time_format: "double"
        time_key: "date"
        ssl: false
    flb_minio:
      verbose_plugin: false
      time_format: "iso8601"
      plugin: "dc_destinations/FlbMinIO"
      inputs: ["/dc/measurement/right_camera", "/dc/measurement/left_camera"]
      endpoint: 127.0.0.1:9000
      access_key_id: rQXPf1f730Yuu2yW
      secret_access_key: TYYkjN5L4gqDgCGLzQahHDcvqL4WNTcb
      use_ssl: false
      bucket: "mybucket"
      src_fields:
        [
          "local_paths.inspected",
          "local_paths.inspected"
        ]
      upload_fields:
        [
          "remote_paths.minio.inspected",
          "remote_paths.minio.inspected"
        ]
    flb_pgsql:
      plugin: "dc_destinations/FlbPgSQL"
      inputs:
        [
          "/dc/measurement/right_camera",
          "/dc/measurement/left_camera",
        ]
      host: "127.0.0.1"
      port: 5432
      user: fluentbit
      password: password
      database: "fluentbit"
      table: "dc"
      timestamp_key: "date"
      async: false
      time_format: "double"
      time_key: "date"

Here, we collect images with the MinIO plugin and also send metadata to PostgreSQL.

In addition, flb_files_metrics tracks when the files are sent to MinIO and deletes them when it is done. Note that multiple destinations can be configured for each field. Check the plugin documentation to see which remote destinations are supported.

Custom uptime to Stdout

In this demo, we will go through a new use case: you want to create your own measurement, or reuse an existing measurement provided by dc_measurements with changes, such as adding a field or modifying the JSON schema. That is what we will do here: create a new plugin inside another package, dc_demos, and load it.

We are going to take the uptime measurement, change it slightly and collect the data.

To test it, run:

ros2 launch dc_demos uptime_custom_stdout.launch.py

JSON schema

Located in dc_demos/plugins/measurements/json/uptime_custom.json:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Uptime Custom",
  "description": "Time the system has been up. Intentionally failing to demonstrate customization and callback",
  "properties": {
    "time": {
      "description": "Time the system has been up",
      "type": "integer",
      "maximum": 0
    }
  },
  "type": "object"
}

It is almost the same as the standard uptime, but for the sake of the example, we will set the maximum value to 0, which will certainly make the validation fail!
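The effect of that schema change can be sketched with a minimal check (illustrative only; DC uses a full JSON-schema validator):

```python
def validate_uptime(record, maximum=0):
    """Emulate the 'maximum' keyword of the custom schema: with
    maximum=0, any positive uptime fails validation, which is what
    triggers the failure callback in this demo."""
    time = record.get("time")
    return isinstance(time, int) and time <= maximum

print(validate_uptime({"time": 0}))    # True
print(validate_uptime({"time": 1234})) # False, as any real uptime would be
```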

CPP code

First, we create a header file in dc_demos/include/dc_demos/plugins/measurements:

#ifndef DC_DEMOS__PLUGINS__MEASUREMENTS__UPTIME_CUSTOM_HPP_
#define DC_DEMOS__PLUGINS__MEASUREMENTS__UPTIME_CUSTOM_HPP_

#include <nlohmann/json.hpp>

#include "dc_measurements/measurement.hpp"
#include "dc_measurements/plugins/measurements/uptime.hpp"

namespace dc_demos
{
using json = nlohmann::json;

class UptimeCustom : public dc_measurements::Uptime
{
protected:
  void onFailedValidation(json data_json) override;
  void setValidationSchema() override;
};

}  // namespace dc_demos

#endif  // DC_DEMOS__PLUGINS__MEASUREMENTS__UPTIME_CUSTOM_HPP_

We create a new class UptimeCustom, which inherits from dc_measurements::Uptime.

Info

If we wanted to start a new measurement from scratch, it would inherit from dc_core::Measurement.

The method onFailedValidation is not mandatory, but it is here to show that it is possible to trigger a custom function when validation fails.

Then the cpp code, currently located in dc_demos/plugins/measurements/uptime_custom.cpp:

#include "dc_demos/plugins/measurements/uptime_custom.hpp"

namespace dc_demos
{

void UptimeCustom::onFailedValidation(json data_json)
{
  (void)data_json;
  RCLCPP_INFO(logger_, "Callback! Validation failed for uptime custom");
}

void UptimeCustom::setValidationSchema()
{
  if (enable_validator_)
  {
    validateSchema("dc_demos", "uptime_custom.json");
  }
}

}  // namespace dc_demos

#include "pluginlib/class_list_macros.hpp"
PLUGINLIB_EXPORT_CLASS(dc_demos::UptimeCustom, dc_core::Measurement)

We include the uptime_custom header file. Then, we define the onFailedValidation function (triggered when validation fails) and the setValidationSchema function, which loads the new JSON schema.

Info

Do not forget to include the pluginlib statements at the end to export the plugin class.

Plugin file

Create the XML file, measurement_plugin.xml in our case, at the root of the package. It defines the plugins of the package.

<class_libraries>
    <library path="dc_uptime_custom_measurement">
        <class name="dc_demos/UptimeCustom" type="dc_demos::UptimeCustom" base_class_type="dc_core::Measurement">
            <description>
                dc_measurement_uptime_custom
            </description>
        </class>
    </library>
</class_libraries>

CMakeLists.txt

Now that you have all files set up, you can add the build process to the CMakeLists.txt:

# Measurement plugins
add_library(dc_uptime_custom_measurement SHARED
  plugins/measurements/uptime_custom.cpp
)
list(APPEND dc_measurement_plugin_libs dc_uptime_custom_measurement)

foreach(measurement_plugin ${dc_measurement_plugin_libs})
  ament_target_dependencies(${measurement_plugin} ${dependencies})
  target_link_libraries(
    ${measurement_plugin}
  )
  target_compile_definitions(${measurement_plugin} PRIVATE BT_PLUGIN_EXPORT)
endforeach()

pluginlib_export_plugin_description_file(dc_core measurement_plugin.xml)

install(FILES measurement_plugin.xml
  DESTINATION share/${PROJECT_NAME}
)

install(DIRECTORY plugins/measurements/
  DESTINATION share/${PROJECT_NAME}/plugins/measurements/
)

It creates the library, installs and exports it.

Console output

In the measurement server log, the plugin is detected properly:

[component_container_isolated-1] [INFO] [1677715629.547448938] [measurement_server]: Creating measurement plugin uptime_custom: Type dc_demos/UptimeCustom, Group key: uptime, Polling interval: 5000, Debug: 0, Validator enabled: 1, Schema path: , Tags: [flb_stdout], Init collect: 1, Init Max measurement: 0, Include measurement name: 0, Include measurement plugin name: 0, Remote keys: , Remote prefixes: , Include measurement plugin name: 0, Max measurement on condition: 0, If all condition: , If any condition: , If none condition:
...
[component_container_isolated-1] [INFO] [1677715629.550523128] [measurement_server]: schema: {"$schema":"http://json-schema.org/draft-07/schema#","description":"Time the system has been up. Intentionally failing to demonstrate customization and callback","properties":{"time":{"description":"Time the system has been up","maximum":0,"type":"integer"}},"title":"Uptime Custom","type":"object"}

Then, it fails as expected:

[component_container_isolated-1] [ERROR] [1677715634.550778659] [measurement_server]: Validation failed: At /time of 139123 - instance exceeds maximum of 0
[component_container_isolated-1] data={"tags":["flb_stdout"],"time":139123}
[component_container_isolated-1] [INFO] [1677715634.550911115] [measurement_server]: Callback! Validation failed for uptime custom
[component_container_isolated-1] [ERROR] [1677715639.550754853] [measurement_server]: Validation failed: At /time of 139128 - instance exceeds maximum of 0
[component_container_isolated-1] data={"tags":["flb_stdout"],"time":139128}
[component_container_isolated-1] [INFO] [1677715639.550867662] [measurement_server]: Callback! Validation failed for uptime custom

Concepts

There are a few key concepts that are important for understanding how DC operates.

ROS 2

ROS 2 is the core middleware used for DC. If you are unfamiliar with it, please visit the ROS 2 documentation before continuing.

Fluent Bit

Fluent Bit is used in the backend for most plugins in DC. If you are unfamiliar with it, please visit the Fluent Bit documentation.

The DC destination ROS node starts it using the Fluent Bit C API, and DC thus directly gets all of its benefits.

Fluent Bit configuration has been wrapped in this node and all its configuration parameters can be passed from the YAML configuration file.

Lifecycle Nodes and Bond

(Source: Nav2 documentation)

Lifecycle (or Managed, more correctly) nodes are unique to ROS 2. More information can be found here. They are nodes that contain state machine transitions for bringup and teardown of ROS 2 servers. This helps in deterministic behavior of ROS systems in startup and shutdown. It also helps users structure their programs in reasonable ways for commercial uses and debugging.

When a node is started, it is in the unconfigured state, only processing the node’s constructor which should not contain any ROS networking setup or parameter reading. By the launch system, or the supplied lifecycle manager, the nodes need to be transitioned to inactive by configuring. After, it is possible to activate the node by transitioning through the activating stage.

This state will allow the node to process information and be fully set up to run. The configuration stage, triggering the on_configure() method, will set up all parameters, ROS networking interfaces, and for safety systems, all dynamically allocated memory. The activation stage, triggering the on_activate() method, will activate the ROS networking interfaces and set any states in the program to start processing information.

To shut down, we transition through deactivating, cleaning up, and shutting down, and end in the finalized state. In those stages, respectively, the networking interfaces are deactivated and stop processing, memory is deallocated, and the node exits cleanly.

The lifecycle node framework is used extensively throughout this project and all servers utilize it. It is a best practice for all ROS systems to use lifecycle nodes when possible.

Within DC, we use a wrapper of LifecycleNodes: the nav2_util LifecycleNode from Nav2. This wrapper hides much of the complexity of LifecycleNodes for typical applications. It also includes a bond connection for the lifecycle manager to ensure that after a server transitions up, it also remains active. If a server crashes, it lets the lifecycle manager know and transitions down the system to prevent a critical failure. See Eloquent to Foxy for details.

Measurements

A measurement is a single data unit presented in JSON format that can contain different fields. For example, the Memory measurement:

{
    "date": "2022-12-04T14:16:06.810999008",
    "memory": {
        "used": 76.007431
    }
}

Every incoming piece of data that belongs to a log or a metric that is retrieved by DC is considered an Event or Record.

Internally, when using Fluent Bit based plugins, a record contains two components: its timestamp and its message. For us, in DC, it will always be a JSON string sent in a ROS message of StringStamped type. This ROS message contains:

  1. header: ROS timestamp as std_msgs/Header
  2. data: JSON message as string
  3. group_key: a string used as a key for the new message when grouping multiple messages together
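As a rough picture of those three fields, here is a hypothetical Python dict mirroring the StringStamped layout (plain stdlib, not the actual ROS message class):

```python
import json
import time

def make_string_stamped(data: dict, group_key: str) -> dict:
    """Build a dict shaped like the StringStamped message described above."""
    now = time.time()
    return {
        # ROS timestamp as std_msgs/Header (only the stamp is sketched here)
        "header": {"stamp": {"sec": int(now), "nanosec": int((now % 1) * 1e9)}},
        # JSON message serialized as a string
        "data": json.dumps(data),
        # key used for the new message when grouping multiple messages
        "group_key": group_key,
    }

msg = make_string_stamped({"time": 42}, "uptime")
print(msg["data"])  # '{"time": 42}'
```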

Tag(s)

Every measurement requires at least one tag configured (via the tags parameter) so that it is sent to its destination(s). This tag corresponds to the name of the plugin you defined in the same configuration. It is then used at a later stage by the Router to decide which Filter or Output phase it must go through.

Example:

destination_server:
  ros__parameters:
    destination_plugins: ["flb_stdout"]
    flb_stdout: # Custom name for the plugin
      plugin: "dc_destinations/FlbStdout"
      inputs: ["/dc/measurement/string_stamped"]
measurement_server:
  ros__parameters:
    measurement_plugins: ["uptime"]
    uptime:
      plugin: "dc_measurements/Uptime"
      topic_output: "/dc/measurement/uptime"
      tags: ["flb_stdout"] # Match the plugin set in the destination_server

To manage the tags in DC, we pass the tags as a parameter to each measurement and automatically use 2 Fluent Bit filters to assign the ROS message to a certain Fluent Bit output:

  1. rewrite_tag filter: Modifies the message tag to match the configured destination(s)
  2. lua filter: Takes the tags received as a string containing a list and sets the tags internally in Fluent Bit.
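These two filters effectively re-tag each record so the Router can deliver it to matching outputs. A rough stdlib-only sketch of the routing idea (simplified; not Fluent Bit's actual implementation):

```python
def route(record: dict, outputs: dict) -> list:
    """Deliver a record to every output whose name appears in its tags."""
    delivered = []
    for tag in record.get("tags", []):
        if tag in outputs:
            outputs[tag].append(record)
            delivered.append(tag)
    return delivered

# One queue per configured destination plugin name.
outputs = {"flb_stdout": [], "flb_minio": []}
record = {"tags": ["flb_stdout"], "time": 139123}
print(route(record, outputs))  # ['flb_stdout']
```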

You can find the code in flb_destination.hpp.

Match

Fluent Bit allows delivering your collected and processed Events to one or multiple destinations; this is done through a routing phase. A Match represents a simple rule to select Events whose tag matches a defined rule.

Destinations

A destination is where the data will be sent: AWS S3, stdout, AWS Kinesis, etc. It can use outputs from Fluent Bit.

Conditions

A condition enables or disables the publication, and thus the collection, of one or multiple measurements. For example, we could collect camera images only when the robot is stopped.

Data collection for a measurement can be gated on all of a set of conditions being activated, on any of them being activated, or on none of them being activated.
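This gating can be sketched as a small predicate; if_all, if_any and if_none mirror the measurement parameters of the same name (a simplified stand-in, not DC's implementation):

```python
def collect_allowed(active: set, if_all=(), if_any=(), if_none=()) -> bool:
    """Return True when the active conditions satisfy all three gates."""
    if if_all and not all(c in active for c in if_all):
        return False  # at least one required condition is inactive
    if if_any and not any(c in active for c in if_any):
        return False  # none of the "any" conditions is active
    if if_none and any(c in active for c in if_none):
        return False  # a forbidden condition is active
    return True

# Collect camera images only when the robot is stopped:
print(collect_allowed({"robot_stopped"}, if_all=["robot_stopped"]))  # True
print(collect_allowed({"robot_moving"}, if_none=["robot_moving"]))   # False
```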

Timestamp

The Timestamp represents the time when an Event was created. All events convert the current ROS time to UTC timestamps.
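The conversion can be sketched as follows; nanosecond precision is formatted manually since Python's datetime only goes down to microseconds (an illustrative helper, not DC's code):

```python
from datetime import datetime, timezone

def ros_stamp_to_utc(sec: int, nanosec: int) -> str:
    """Format a ROS time (sec + nanosec) as a UTC ISO-8601 string
    with nanosecond precision, like the dates seen in DC records."""
    base = datetime.fromtimestamp(sec, tz=timezone.utc)
    return base.strftime("%Y-%m-%dT%H:%M:%S") + f".{nanosec:09d}"

print(ros_stamp_to_utc(1670163366, 810999008))
# 2022-12-04T14:16:06.810999008
```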

JSON Messages

In DC, all messages sent by measurements are a ROS message and the data string must be a JSON message.

"Robot_X:1.5, Robot_Y:1.8" # Not valid data string
"{'x': 1.5, 'y': 1.8}"     # Valid data string

JSON validation

By default, each record is validated against a JSON schema, following the JSON Schema Validation specification.

Each measurement has its own JSON schema, which can be overwritten in a custom package or disabled.

Buffering and data persistence

Fluent Bit has its own buffering management, explained in its documentation. Data can be stored in memory and/or on the filesystem.

By default, DC uses memory buffering for a small amount of data (5 MB) and then switches to filesystem buffering.

The configuration is printed when starting the destination server:

[destination_server-2] [2023/02/24 10:36:15] [ info] [storage] ver=1.3.0, type=memory+filesystem, sync=full, checksum=off, max_chunks_up=128
[destination_server-2] [2023/02/24 10:36:15] [ info] [input:ros2:ros2.0] storage_strategy='filesystem' (memory + filesystem)
[destination_server-2] [2023/02/24 10:36:15] [ info] [input:storage_backlog:storage_backlog.1] queue memory limit: 976.6K

This buffering ensures data will persist across reboots.

Scheduling and Retries

DC inherits Fluent Bit's scheduling and retry features. They can be configured in the destination node. More about them can be read in the Fluent Bit documentation.

Data Pipeline

flowchart LR
    dc_bringup_launch["DC Bringup Launch"]
    yaml_param["YAML Parameter file"]
    ros_middleware["ROS middleware"]

    subgraph m_s["Measurement Server"]
        direction LR
        meas_node["Measurement Node"]
        measurement_plugins["Measurement plugins"]
        meas_topics_out["ROS Topics"]
    end

    subgraph d_s["Destination Server"]
        direction LR
        dest_node["Destination Node"]
        dest_plugins["Destination plugins"]
    end

    subgraph g_s["Group Server"]
        direction LR
        group_node["Group Node"]
        group_grouped_data["Grouped data"]
        group_topics_out["ROS Topics"]
    end

    subgraph flb["Fluent Bit"]
        direction LR
        flb_server["Fluent Bit Server"]
        flb_mem_buff["Memory buffer"]
        flb_storage_buff["Storage buffer"]
        flb_record["Record"]
        flb_ros2_plugin["ROS2 plugin"]
        flb_output["Outputs"]
    end

    dc_bringup_launch--2.starts-->meas_node
    dc_bringup_launch--3.starts-->dest_node
    dc_bringup_launch--4.starts-->group_node
    dc_bringup_launch--1.loads-->yaml_param

    meas_node--2.1.initializes-->measurement_plugins
    measurement_plugins--"2.2.publish to"-->meas_topics_out

    dest_node--3.1.starts-->flb_server
    dest_node--3.2.initializes-->dest_plugins
    dest_node--3.3.loads-->flb_ros2_plugin

    flb_ros2_plugin--"3.3.subscribes to"-->meas_topics_out
    flb_ros2_plugin--"3.3.subscribes to"-->group_topics_out
    group_node--"4.1.subscribes to"-->meas_topics_out
    group_node--4.2.generates-->group_grouped_data
    group_grouped_data--"4.3.publishes to"-->group_topics_out

    flb_ros2_plugin--3.4.generates-->flb_record
    flb_record--"3.5.stores in"-->flb_mem_buff
    flb_record--"3.7.stores in"-->flb_storage_buff
    flb_mem_buff--3.6.flushes-->flb_output
    flb_storage_buff--3.8.flushes-->flb_output

    group_topics_out--"4.4.sends to"-->ros_middleware
    meas_topics_out--"2.3.sends to"-->ros_middleware
    ros_middleware--"when data received, sends to"-->flb_ros2_plugin

The data flow depicted in this flowchart summarizes how data moves:

  1. DC Bringup loads the yaml configuration file
  2. DC Bringup starts the measurement node
    1. Measurement plugins are loaded and data starts to be collected
    2. Each plugin publishes what it collected on a ROS topic
  3. DC Bringup starts the destination node
    1. Fluent Bit server is started: as soon as it receives records, they go through filters and are then flushed to the desired destinations
    2. Destination plugins are loaded
    3. ROS2 Fluent Bit plugin is loaded: it subscribes to measurement and group output topics
    4. Fluent Bit filters are initialized: they edit the received data, modify the tags to match the desired output(s) and edit the timestamp field if required
  4. DC Bringup starts the group node
    1. Subscribes to measurement and group output topics
    2. When all measurements of a group are received, it groups the JSONs into a new one
    3. It publishes the grouped data on another topic
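The grouping in step 4 can be pictured with plain dictionaries; a minimal sketch (hypothetical helper, the actual work happens in the group server):

```python
import json

def group_measurements(messages: list) -> dict:
    """Merge the JSON payloads of a complete group into one object,
    keyed by each message's group_key."""
    grouped = {}
    for msg in messages:
        grouped[msg["group_key"]] = json.loads(msg["data"])
    return grouped

# Two StringStamped-like messages belonging to the same group:
messages = [
    {"group_key": "uptime", "data": '{"time": 139123}'},
    {"group_key": "memory", "data": '{"used": 76.0}'},
]
print(group_measurements(messages))
# {'uptime': {'time': 139123}, 'memory': {'used': 76.0}}
```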

Measurements

Description

A measurement is a single data unit presented in JSON format that can contain different fields. For example, the Memory measurement:

{
    "date": "2022-12-04T14:16:06.810999008",
    "memory": {
        "used": 76.007431
    },
    "id": "3c70afdcb6f248f28f4c3980734064c5",
    "robot_name": "C3PO",
    "run_id": 358
}

Node parameters

The node starts the Fluent Bit engine and its ros2 plugin, which enables data collection from ROS 2 topics. This plugin subscribes to the configured ROS 2 topics, and the data is forwarded by Fluent Bit to the destinations enabled by the destination node. Each topic is configured in a measurement, which is loaded in this node with pluginlib. In addition, conditions are pluginlib plugins, also loaded dynamically. They are optional plugins that allow collecting only under certain conditions, e.g. the robot is moving.

| Parameter name | Description | Type(s) | Default |
| --- | --- | --- | --- |
| measurement_plugins | Name of the measurement plugins to load | list[str] | N/A (mandatory) |
| condition_plugins | Name of the condition plugins to load | list[str] | N/A (mandatory) |
| save_local_base_path | Path where files will be saved locally (e.g camera images). Expands $X to environment variables and =Y to custom string parameters | str | "$HOME/ros2/data/%Y/%M/%D/%H" |
| all_base_path | Path where files will be saved at their destination (S3, minio...). Expands $X to environment variables and =Y to custom string parameters | str | "" |
| custom_str_params_list | Custom strings to use in other parameters. They are also appended in the json sent to the destination | list[str] | N/A |
| custom_str_params.force_override | Override values if the keys are already present in the measurement. Applies to all and can be overridden by custom_str_params.<param_name>.force_override | bool | false |
| custom_str_params.<param_name>.name | Key to add in the serialized data | str | N/A (optional) |
| custom_str_params.<param_name>.value | Value to set for the key as a fixed string | str | N/A (optional) |
| custom_str_params.<param_name>.value_from_file | Path to a file containing the value to set | str | N/A (optional) |
| custom_str_params.<param_name>.force_override | Override value if the key is already present in the measurement | bool | false |
| run_id.enabled | Identify which run the robot is in. A new one is generated at every start of the node. Uses either a counter or UUID | bool | true |
| run_id.counter | Enable counter for the run_id | bool | true |
| run_id.counter_path | Path to store the last run id. It is expanded with environment variables | str | "$HOME/run_id" |
| run_id.uuid | Generate a new run ID by using a random UUID | bool | false |
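The $X and datetime expansions used by the path parameters above can be approximated with the standard library (the =Y custom-string expansion is omitted; this is a sketch, not DC's implementation):

```python
import os
from datetime import datetime

def expand_path(template: str, when: datetime) -> str:
    """Expand $VARS through the environment, then %-codes through strftime."""
    return when.strftime(os.path.expandvars(template))

os.environ["DC_DATA"] = "/tmp/dc"  # hypothetical variable for the demo
print(expand_path("$DC_DATA/data/%Y/%m/%d", datetime(2022, 12, 4)))
# /tmp/dc/data/2022/12/04
```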

Plugin parameters

Each measurement is collected through a node and has these configuration parameters:

| Parameter name | Description | Type(s) | Default |
| --- | --- | --- | --- |
| plugin | Name of the plugin to load | str | N/A (mandatory) |
| topic_output | Topic where the result will be published | str | "/dc/measurement/<measurement_name>" |
| group_key | Value of the key used when grouped | str | N/A (mandatory) |
| debug | More verbose output | bool | false |
| polling_interval | Interval at which data is collected, in milliseconds | int (>=100) | 1000 |
| init_collect | Collect when the node starts instead of waiting for the first tick | bool | true |
| init_max_measurements | Collect a maximum of n measurements when starting the node (-1 = never, 0 = infinite) | int | 0 |
| condition_max_measurements | Collect a maximum of n measurements when conditions are activated (-1 = never, 0 = infinite) | int | 0 |
| enable_validator | Will validate the data against a JSON schema | bool | true |
| json_schema_path | Path to the JSON schema, ignored if empty string | str | N/A (optional) |
| tags | Tags used by Fluent Bit to do the matching to destinations | list[str] | N/A (mandatory) |
| remote_prefixes | Prefixes to apply to the paths when sending files to a destination | str | N/A (optional) |
| remote_keys | Used by some plugins to generate remote paths | list[str] | N/A (optional) |
| if_all_conditions | Collect only if all conditions are activated | list[str] | N/A (optional) |
| if_any_conditions | Collect if any condition is activated | list[str] | N/A (optional) |
| if_none_conditions | Collect only if no condition is activated | list[str] | N/A (optional) |
| include_measurement_name | Include measurement name in the JSON data | bool | false |
| include_measurement_plugin | Include measurement plugin name in the JSON data | bool | false |

Available plugins:

| Name | Description |
| --- | --- |
| Camera | Camera images; images can be rotated and inspected to detect content. They are saved as files |
| Command velocity | Command velocity: navigation commands |
| CPU | CPU statistics |
| Distance traveled | Total distance traveled by the robot |
| Dummy | Dummy event, for testing and debugging |
| IP Camera | IP camera videos as files |
| Map | ROS map files (yaml and pgm) and metadata used by the robot to localize and navigate |
| Memory | System memory usage |
| Network | Network interfaces, availability |
| OS | Operating System information |
| Permissions | Permissions of a file or directory |
| Position | Robot position |
| Speed | Robot speed |
| Storage | Available and used space in a directory |
| String stamped | Republish a string stamped message; can be used for external data |
| TCP Health | Health status of a TCP server |
| Uptime | How long the machine has been turned on |

Camera

Description

Save camera image files: raw, rotated and/or inspected. Images can be inspected using different services (e.g. barcode detection).

Parameters

| Parameter | Description | Type | Default |
| --- | --- | --- | --- |
| cam_name | Name to give to the camera | str | N/A (mandatory) |
| cam_topic | Topic from where camera data needs to be fetched | str | N/A (mandatory) |
| detection_modules | Detection modules to use | list[str] (barcode) | N/A (optional) |
| draw_det_barcodes | Draw barcode detection on images | bool | true |
| rotation_angle | Rotate the image before inspecting it by this angle | int (90, 180, 270) | 0 |
| save_detections_img | Whether to save the inspected image captured by the camera with detection shapes | bool | true |
| save_inspected_base64 | Whether to save the inspected image captured by the camera with detection shapes as a base64 string | bool | false |
| save_inspected_path | Path to save the inspected camera image. Environment variables and datetime format are expanded | str | "camera/inspected/%Y-%m-%dT%H:%M:%S" |
| save_raw_base64 | Whether to save the raw image captured by the camera as a base64 string | bool | false |
| save_raw_img | Whether to save the raw image captured by the camera | bool | false |
| save_raw_path | Path to save the raw camera image. Environment variables and datetime format are expanded | str | "camera/raw/%Y-%m-%dT%H:%M:%S" |
| save_rotated_base64 | Whether to save the rotated image captured by the camera as a base64 string | bool | false |
| save_rotated_img | Whether to save the rotated image captured by the camera | bool | false |
| save_rotated_path | Path to save the rotated camera image. Environment variables and datetime format are expanded | str | "camera/rotated/%Y-%m-%dT%H:%M:%S" |

Measurement node configuration

The remote paths are also saved in the JSON under <measurement_name>.<remote_key>_img_paths.(raw|rotated|inspected). If images should be sent to Minio, add "minio" to remote_keys. This will add a remote path that can later be used in your API.

Note that this remote key is not included in the JSON schema, which only contains the local paths. If you want to enforce the schema with your custom remote key, you will need to write it and load it manually.

...
camera:
  plugin: "dc_measurements/Camera"
  group_key: "camera_with_codes"
  topic_output: "/dc/measurement/camera_with_codes"
  polling_interval: 10000
  init_collect: true
  node_name: "dc_measurement_camera"
  cam_topic: "/camera_with_codes"
  cam_name: my_camera_with_codes
  enable_validator: false
  draw_det_barcodes: true
  save_raw_img: true
  save_rotated_img: false
  save_detections_img: true
  save_raw_path: "camera_with_codes/raw/%Y-%m-%dT%H-%M-%S"
  save_rotated_path: "camera_with_codes/rotated/%Y-%m-%dT%H-%M-%S"
  save_inspected_path: "camera_with_codes/inspected/%Y-%m-%dT%H-%M-%S"
  rotation_angle: 0
  detection_modules: ["barcode"]
  remote_prefixes: [""]
  remote_keys: ["minio"] # Will create paths for Minio, does not send the file

Destination node configuration

Now that the path is set, it can be used to know where to send the image:

...
flb_minio:
  verbose_plugin: false
  time_format: "iso8601"
  plugin: "dc_destinations/FlbMinIO"
  inputs: ["/dc/group/cameras"]
  endpoint: 127.0.0.1:9000
  access_key_id: XEYqG4ZcPY5jiq5i
  secret_access_key: ji011KCtI82ZeQS6UwsQAg8x9VR4lSaQ
  use_ssl: false
  create_bucket: true
  bucket: "mybucket"
  src_fields:
    [
      "camera.local_img_paths.raw",
      "camera.local_img_paths.inspected"
    ]
  upload_fields: # Remote paths created by the measurement node configuration
    [
      "camera.minio_img_paths.raw",
      "camera.minio_img_paths.inspected"
    ]
...

Schema

{
  "$schema":"http://json-schema.org/draft-07/schema#",
  "title":"Camera",
  "description":"Camera images with detected objects",
  "properties":{
    "camera_name": {
      "description": "Name of the camera",
      "type": "string"
    },
    "local_img_paths":{
      "description":"Paths of saved images",
      "type":"object",
      "items":{
        "$ref":"#/$defs/local_img_paths"
      }
    },
    "inspected":{
      "description":"Inspected content of an image",
      "type":"object",
      "items":{
        "$ref":"#/$defs/inspected"
      }
    }
  },
  "$defs":{
    "local_img_paths":{
      "type":"object",
      "properties":{
        "raw":{
          "description":"Raw image",
          "type":"string"
        },
        "rotated":{
          "description":"Rotated image",
          "type":"string"
        },
        "inspected":{
          "description":"Inspected image",
          "type":"string"
        }
      }
    },
    "inspected":{
      "type":"object",
      "properties":{
        "barcode":{
          "description":"Barcode inspected data",
          "type":"array",
          "items":{
            "$ref":"#/$defs/barcode"
          }
        }
      }
    },
    "barcode":{
      "type":"object",
      "properties":{
        "data":{
          "description":"Barcode data",
          "type":"array",
          "items":{
            "type":"integer"
          }
        },
        "height":{
          "description":"Barcode height",
          "type":"integer"
        },
        "width":{
          "description":"Barcode width",
          "type":"integer"
        },
        "top":{
          "description":"Barcode top position",
          "type":"integer"
        },
        "left":{
          "description":"Barcode left position",
          "type":"integer"
        },
        "type":{
          "description":"Barcode type",
          "type":"string"
        }
      }
    }
  },
  "type":"object"
}

Cmd_vel

Description

Collect the command velocity sent to the robot by subscribing to the cmd_vel topic.

Parameters

| Parameter | Description | Type | Default |
| --- | --- | --- | --- |
| topic | Topic to subscribe to to get the cmd_vel (geometry_msgs/Twist) | str | "/cmd_vel" |
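The schema for this plugin includes a computed scalar speed in meter/s. Assuming it is the Euclidean norm of the linear velocity vector (an assumption; the source does not state the formula), it could be derived like this:

```python
import math

def computed_velocity(linear: dict) -> float:
    """Hypothetical scalar speed: Euclidean norm of the linear vector."""
    return math.sqrt(linear["x"] ** 2 + linear["y"] ** 2 + linear["z"] ** 2)

print(computed_velocity({"x": 3.0, "y": 4.0, "z": 0.0}))  # 5.0
```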

Schema

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Cmd_vel",
    "description": "Command velocity sent to the robot",
    "properties": {
        "computed": {
            "description": "Computed command velocity in meter/s",
            "type": "number"
        },
        "linear": {
            "description": "Linear velocity as a vector",
            "type": "object",
            "items": {
                "$ref": "#/$defs/vector3"
            }
        },
        "angular": {
            "description": "Angular velocity as a vector",
            "type": "object",
            "items": {
                "$ref": "#/$defs/vector3"
            }
        }
    },
    "$defs": {
        "vector3": {
            "type": "object",
            "properties": {
                "x": {
                    "description": "X speed",
                    "type": "number"
                },
                "y": {
                    "description": "Y speed",
                    "type": "number"
                },
                "z": {
                    "description": "Z speed",
                    "type": "number"
                }
            }
        }
    },
    "type": "object"
}

Measurement configuration

...
cmd_vel:
  plugin: "dc_measurements/CmdVel"
  group_key: "cmd_vel"
  topic_output: "/dc/measurement/cmd_vel"

CPU

Description

Collect CPU usage: average CPU, number of processes running, and processes sorted by CPU usage.

Parameters

| Parameter | Description | Type | Default |
| --- | --- | --- | --- |
| cpu_min | Filters out processes using less than this CPU usage. -1 to disable | float | 5.0 |
| max_processes | Max number of processes to collect in the sorted field. -1 for all | int | 5 |
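The effect of cpu_min and max_processes can be pictured with a small helper (a simplified stand-in for the plugin's filtering, using field names from the process schema):

```python
def top_processes(processes: list, cpu_min: float = 5.0, max_processes: int = 5):
    """Filter out low-CPU processes, sort by CPU usage, keep the top N."""
    if cpu_min >= 0:  # -1 disables the filter
        processes = [p for p in processes if p["cpu"] >= cpu_min]
    processes = sorted(processes, key=lambda p: p["cpu"], reverse=True)
    return processes if max_processes < 0 else processes[:max_processes]

procs = [{"pid": 1, "cpu": 2.0}, {"pid": 2, "cpu": 42.0}, {"pid": 3, "cpu": 7.5}]
print(top_processes(procs, cpu_min=5.0, max_processes=2))  # pids 2 and 3
```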

Schema

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Cpu",
    "description": "CPU statistics",
    "properties": {
        "average": {
            "description": "Average CPU",
            "type": "number",
            "minimum": 0
        },
        "processes": {
            "description": "Number of processes running",
            "type": "integer",
            "minimum": 0
        },
        "sorted": {
            "description": "Processes sorted by CPU usage",
            "type": "array",
            "items": {
                "$ref": "#/$defs/process"
            }
        }
    },
    "$defs": {
        "process": {
            "type": "object",
            "description": "Process information",
            "properties": {
                "pid": {
                    "description": "Process ID of the process",
                    "type": "integer"
                },
                "user": {
                    "description": "User who started the process",
                    "type": "string"
                },
                "cmd": {
                    "description": "Command that launched the process",
                    "type": "string"
                },
                "cpu": {
                    "description": "Process' current utilization as a percentage of total CPU time",
                    "type": "number"
                },
                "ram": {
                    "description": "Memory in use by this process in kb",
                    "type": "integer"
                },
                "uptime": {
                    "description": "Age of the process in seconds",
                    "type": "integer"
                }
            }
        }
    },
    "type": "object"
}

Measurement configuration

...
cpu:
  plugin: "dc_measurements/Cpu"
  topic_output: "/dc/measurement/cpu"
  tags: ["flb_stdout"]
  max_processes: 10
  cpu_min: 10.0

Dummy

Description

The dummy measurement generates dummy events. It is useful for testing, debugging, benchmarking, and getting started with ROS 2 Data Collection.

Parameters

| Parameter | Description | Type | Default |
| --- | --- | --- | --- |
| record | Dummy JSON record | str | "{"message":"Hello from ROS 2 DC"}" |

Schema

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Dummy",
    "description": "Dummy JSON",
    "properties": {
        "message": {
            "description": "Dummy message",
            "type": "string"
        }
    },
    "type": "object"
}

Measurement configuration

...
dummy:
  plugin: "dc_measurements/Dummy"
  topic_output: "/dc/measurement/dummy"
  tags: ["flb_stdout"]

Distance traveled

Description

Collect the total distance traveled by the robot since it was powered on.

Parameters

| Parameter | Description | Type | Default |
| --- | --- | --- | --- |
| global_frame | Global frame | str | "map" |
| robot_base_frame | Robot base frame | str | "base_link" |
| transform_timeout | TF timeout to use for the transformation | float | 0.1 |
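How the total is accumulated is not spelled out here; a plausible sketch, assuming the plugin sums the Euclidean distance between successive poses of robot_base_frame in global_frame:

```python
import math

def total_distance(positions: list) -> float:
    """Sum the straight-line distance between successive (x, y) positions."""
    distance = 0.0
    for (x0, y0), (x1, y1) in zip(positions, positions[1:]):
        distance += math.hypot(x1 - x0, y1 - y0)
    return distance

print(total_distance([(0.0, 0.0), (3.0, 4.0), (3.0, 10.0)]))  # 11.0
```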

Schema

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Distance traveled",
    "description": "Total distance traveled in meters by the robot",
    "properties": {
        "distance_traveled": {
            "description": "Total distance traveled in meters",
            "type": "number"
        }
    },
    "type": "object"
}

Measurement configuration

...
distance_traveled:
  plugin: "dc_measurements/DistanceTraveled"
  topic_output: "/dc/measurement/distance_traveled"
  tags: ["flb_stdout"]
  global_frame: "map"
  robot_base_frame: "base_link"
  transform_timeout: 0.1

Ip Camera

Description

Records video in small segments (in case of a cut) and stores them locally. Segments are first stored in a temporary folder. Once a record (of e.g. 10 seconds) is done, it is moved to another directory.

Compared to other plugins, the collect function only moves the files from the temporary location; it does not start the recording. Establishing the connection takes some time, so we avoid doing it at every tick. Recording is started at initialization by an ffmpeg process and saved in HLS format.

Parameters

| Parameter | Description | Type | Default |
| --- | --- | --- | --- |
| input | Input URL | str | N/A (mandatory) |
| video | Enable video recording | bool | true |
| audio | Enable audio recording | bool | false |
| bitrate_video | Video bitrate | str ([0-9]+[kmKM]) | "2M" |
| bitrate_audio | Audio bitrate | str ([0-9]+[kmKM]) | "192k" |
| segment | Record in small segments, managed by ffmpeg | bool | true |
| segment_time | Duration of a segment | int (>0) | 10 |
| ffmpeg_log_level | Ffmpeg log level | str (see doc) | "info" |
| ffmpeg_banner | Show ffmpeg banner in console | bool | true |
| save_path | Path used to save files with ffmpeg; the UTC date is used. There can't be ":" in this string, ffmpeg does not parse it | str | "ffmpeg_%Y-%m-%dT%H:%M:%S" |

Schema

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Ip Camera",
    "description": "Local and remote path where the remote camera video is recorded",
    "properties": {
        "local_path": {
            "description": "Local video path",
            "type": "string"
        },
        "remote_path": {
            "description": "Remote video path",
            "type": "string"
        }
    },
    "type": "object"
}

Map

Description

Save the map using nav2_map_server and collect the paths of the locally saved map. The measurement also includes metadata: width, height, and x and y origin.

Parameters

| Parameter | Description | Type | Default |
| --- | --- | --- | --- |
| quiet | Disable stdout for nav2 map saver | bool | true |
| save_path | Path to save the map to. Environment variables and datetime format are expanded | str | "map/%Y-%m-%dT%H:%M:%S" |
| save_map_timeout | Time to wait to save the map | float | 3.0 |
| topic | Topic to subscribe to to get the map | str | "/map" |

Schema

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Map",
    "description": "Map saved metadata and paths",
    "properties": {
        "resolution": {
            "description": "Resolution of the map, meters/pixel",
            "type": "number",
            "minimum": 0
        },
        "local_paths": {
            "description": "Paths where metadata and image are stored",
            "type": "object",
            "items": {
                "$ref": "#/$defs/paths"
            }
        },
        "origin": {
            "description": "Robot origin position in meters",
            "type": "object",
            "items": {
                "$ref": "#/$defs/origin"
            }
        },
        "width": {
            "description": "Width of the PGM",
            "type": "integer",
            "minimum": 0
        },
        "height": {
            "description": "Height of the PGM",
            "type": "integer",
            "minimum": 0
        }
    },
    "$defs": {
        "origin": {
            "type": "object",
            "description": "The 2-D pose of the lower-left pixel in the map, as (x, y, yaw), with yaw as counterclockwise rotation (yaw=0 means no rotation). Many parts of the system currently ignore yaw.",
            "properties": {
                "x": {
                    "description": "X origin of the robot",
                    "type": "number"
                },
                "y": {
                    "description": "Y origin of the robot",
                    "type": "number"
                }
            }
        },
        "paths": {
            "type": "object",
            "properties": {
                "png": {
                    "description": "Path to the map PNG file containing the image",
                    "type": "string"
                },
                "yaml": {
                    "description": "Path to the map YAML file containing map metadata",
                    "type": "string"
                },
                "pgm": {
                    "description": "Path to the map PGM file containing the gray-scale image",
                    "type": "string"
                }
            }
        }
    },
    "type": "object"
}

Measurement configuration

...
map:
  plugin: "dc_measurements/Map"
  topic_output: "/dc/measurement/map"
  tags: ["flb_stdout"]
  topic: "/map"
  save_path: "map/%Y-%m-%dT%H:%M:%S"
  save_map_timeout: 0.2
  quiet: true
  remote_keys: ["s3"]

Memory

Description

Collects the percentage of memory used.

Schema

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Memory",
    "description": "Memory used",
    "properties": {
        "used": {
            "description": "Memory used in percent",
            "type": "number",
            "minimum": 0
        }
    },
    "type": "object"
}

Measurement configuration

...
memory:
  plugin: "dc_measurements/Memory"
  topic_output: "/dc/measurement/memory"
  tags: ["flb_stdout"]

Network

Description

Collects the ping latency, whether the PC is online, and the available network interfaces.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| ping_address | IP address to ping | str | "8.8.8.8" |
| ping_timeout | Time in ms before the ping times out; the host is considered offline above it | int | 5000 |

Schema

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Network",
    "description": "Network accessibility and information",
    "properties": {
        "ping": {
            "description": "Time to ping the host in ms",
            "type": "integer",
            "minimum": -1
        },
        "online": {
            "description": "If the pc is online",
            "type": "boolean"
        },
        "interfaces": {
            "description": "List of network interfaces",
            "type": "array",
            "items": {
                "type": "string"
            }
        }
    },
    "type": "object"
}

Measurement configuration

...
network:
  plugin: "dc_measurements/Network"
  topic_output: "/dc/measurement/network"
  tags: ["flb_stdout"]
  ping_address: "192.168.0.1"
  ping_timeout: 500

OS

Description

Collects operating system information: number of CPUs, operating system name and kernel information.

Schema

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "OS",
  "description": "OS, kernel and CPUs information",
  "properties": {
    "os": {
      "description": "Host distribution name",
      "type": "string"
    },
    "kernel": {
      "description": "Kernel version",
      "type": "string"
    },
    "cpu": {
      "description": "Number of CPUs",
      "type": "integer",
      "minimum": 0
    },
    "memory": {
      "description": "System memory",
      "type": "number",
      "minimum": 0
    }
  },
  "type": "object"
}

Measurement configuration

...
os:
  plugin: "dc_measurements/OS"
  topic_output: "/dc/measurement/os"
  tags: ["flb_stdout"]

Permissions

Description

Collects the UID, the GID, whether a file or directory exists, and its permissions (in rwx or integer format).

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| format | Format in which permissions are collected | str ("rwx" or "int") | "int" |
| path | Path to the file or directory to collect data from; supports environment variables | str | N/A (mandatory) |
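The two output formats can be mimicked with the standard library; the sketch below (octal integer vs. rwx string) illustrates the idea and is not the plugin's exact code:

```python
import os
import stat

def collect_permissions(path: str, fmt: str = "int") -> dict:
    """Return existence, uid, gid and permissions of a path (illustrative)."""
    if not os.path.exists(path):
        return {"exists": False}
    st = os.stat(path)
    if fmt == "rwx":
        # e.g. "-rw-r--r--" for a regular file
        perms = stat.filemode(st.st_mode)
    else:
        # e.g. "644" as an octal triplet
        perms = oct(st.st_mode & 0o777)[2:]
    return {"exists": True, "uid": st.st_uid, "gid": st.st_gid,
            "permissions": perms}

print(collect_permissions("/tmp", fmt="rwx"))
```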

Schemas

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Permissions",
    "description": "Permissions of a file/directory",
    "properties": {
        "uid": {
            "description": "File/directory User IDentifier",
            "type": "integer"
        },
        "gid": {
            "description": "File/directory Group IDentifier",
            "type": "integer"
        },
        "exists": {
            "description": "File/directory exists",
            "type": "boolean"
        },
        "permissions": {
            "description": "Permissions as rwx or integer",
            "type": "string"
        }
    },
    "type": "object"
}

Measurement configuration

...
permission_home_dc:
  plugin: "dc_measurements/Permissions"
  topic_output: "/dc/measurement/permissions_home_dc"
  tags: ["flb_stdout"]
  path: "$HOME/dc"
  format: "rwx"

Position

Description

Collect x, y and yaw of the robot.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| global_frame | Global frame | str | "map" |
| robot_base_frame | Robot base frame | str | "base_link" |
| transform_timeout | TF timeout used for the transformation | float | 0.1 |

Schema

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Position",
    "description": "Position and orientation of the robot",
    "properties": {
        "x": {
            "description": "X position of the robot",
            "type": "number"
        },
        "y": {
            "description": "Y position of the robot",
            "type": "number"
        },
        "yaw": {
            "description": "Yaw angle of the robot",
            "type": "number"
        }
    },
    "type": "object"
}
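The yaw is extracted from the global_frame → robot_base_frame transform's quaternion; assuming the standard conversion (rotation around Z), the math is:

```python
import math

def yaw_from_quaternion(x: float, y: float, z: float, w: float) -> float:
    """Extract the yaw (rotation around Z) from a quaternion, in radians."""
    siny_cosp = 2.0 * (w * z + x * y)
    cosy_cosp = 1.0 - 2.0 * (y * y + z * z)
    return math.atan2(siny_cosp, cosy_cosp)

# identity quaternion -> no rotation
print(yaw_from_quaternion(0.0, 0.0, 0.0, 1.0))  # 0.0
```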

Measurement configuration

...
position:
  plugin: "dc_measurements/Position"
  topic_output: "/dc/measurement/position"
  tags: ["flb_stdout"]

Speed

Description

Collects the robot speed from the odometry topic.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| odom_topic | Topic to subscribe to for the odometry (nav_msgs/Odometry) | str | "/odom" |

Schema

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Speed",
    "description": "Computed, linear and angular speed of the robot",
    "properties": {
        "computed": {
            "description": "Computed speed in meter/s",
            "type": "number"
        },
        "linear": {
            "description": "Linear velocity as a vector",
            "type": "object",
            "items": {
                "$ref": "#/$defs/vector3"
            }
        },
        "angular": {
            "description": "Angular velocity as a vector",
            "type": "object",
            "items": {
                "$ref": "#/$defs/vector3"
            }
        }
    },
    "$defs": {
        "vector3": {
            "type": "object",
            "properties": {
                "x": {
                    "description": "X speed",
                    "type": "number"
                },
                "y": {
                    "description": "Y speed",
                    "type": "number"
                },
                "z": {
                    "description": "Z speed",
                    "type": "number"
                }
            }
        }
    },
    "type": "object"
}
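The computed field is presumably the magnitude of the linear velocity vector reported by the odometry; this derivation is an assumption for illustration, not taken from the plugin source:

```python
import math

def computed_speed(linear: dict) -> float:
    """Magnitude of the linear velocity vector, in m/s (illustrative)."""
    return math.sqrt(linear["x"] ** 2 + linear["y"] ** 2 + linear["z"] ** 2)

print(computed_speed({"x": 3.0, "y": 4.0, "z": 0.0}))  # 5.0
```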

Measurement configuration

...
speed:
  plugin: "dc_measurements/Speed"
  topic_output: "/dc/measurement/speed"
  tags: ["flb_stdout"]
  odom_topic: "/odom"

Storage

Description

Collect storage information on a directory.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| path | Absolute path to the directory to inspect; supports environment variables | str | N/A (mandatory) |

Schema

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Storage",
    "description": "Storage information of a directory",
    "properties": {
        "free_percent": {
            "description": "Free space on the filesystem, in percent",
            "type": "number",
            "minimum": 0
        },
        "free": {
            "description": "Free space on the filesystem, in bytes",
            "type": "integer",
            "minimum": 0
        },
        "capacity": {
            "description": "Total size of the filesystem, in bytes",
            "type": "integer",
            "minimum": 0
        }
    },
    "type": "object"
}
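The three fields map directly onto what standard filesystem APIs report; the Python equivalent of this measurement is:

```python
import shutil

def storage_info(path: str) -> dict:
    """Capacity, free bytes and free percentage of the filesystem at path."""
    usage = shutil.disk_usage(path)
    return {
        "capacity": usage.total,
        "free": usage.free,
        "free_percent": 100.0 * usage.free / usage.total,
    }

print(storage_info("/"))
```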

Measurement configuration

...
storage_home:
  plugin: "dc_measurements/Storage"
  topic_output: "/dc/measurement/storage_home"
  tags: ["flb_stdout"]
  path: "$HOME"

String stamped

Description

Collects generic data from a topic publishing a StringStamped message and republishes it. This makes it possible to fetch data from rclc, rclpy and your custom ROS 2 nodes that don't have a dedicated plugin.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| timer_based | If true, collect data at the polling interval; if false, collect every record and ignore the polling interval | bool | true |
| topic | Topic to get data from | str | N/A (mandatory) |

Schema

Given that the data is customized here, there is no default schema.

Measurement configuration

...
my_data:
  plugin: "dc_measurements/StringStamped"
  topic_output: "/dc/measurement/my_data"
  tags: ["flb_stdout"]
  topic: "/hello-world"
  timer_based: true

TCP Health

Description

Collects the status of a TCP server.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| name | Alias to give to the TCP server | str | N/A (mandatory) |
| host | Hostname or IP address of the target to check | str | "127.0.0.1" |
| port | TCP port on which to perform the connection check | int (>0, <65536) | 80 |

Schema

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "TCP Health",
    "description": "Status of a TCP server",
    "properties": {
        "host": {
            "description": "Server hostname",
            "type": "string"
        },
        "port": {
            "description": "Port number",
            "type": "integer",
            "minimum": 1,
            "maximum": 65535
        },
        "server_name": {
            "description": "Alias given to the TCP server",
            "type": "string"
        }
    },
    "type": "object"
}

Measurement configuration

...
tcp_health:
  plugin: "dc_measurements/TCPHealth"
  topic_output: "/dc/measurement/minio_health"
  group_key: "minio_health"
  tags: ["flb_stdout"]
  host: "127.0.0.1"
  port: 9000
  name: "minio_api"

Uptime

Description

Time since the robot PC was powered on.

Schema

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Uptime",
    "description": "Time the system has been up",
    "properties": {
        "time": {
            "description": "Time the system has been up",
            "type": "integer",
            "minimum": 0
        }
    },
    "type": "object"
}

Measurement configuration

...
uptime:
  plugin: "dc_measurements/Uptime"
  topic_output: "/dc/measurement/uptime"
  tags: ["flb_stdout"]

Overview

Description

A condition enables or disables the publication, and thus the collection, of one or multiple measurements. We could for example collect camera images only when the robot is stopped.

Each condition is enabled or disabled through a pluginlib plugin and has its own configuration parameters.

Available plugins:

| Name | Description |
|---|---|
| Robot moving | Robot is moving |
| Bool equal | Value of a boolean key is equal to the given value |
| Double equal | Value of a double key is equal to the given value |
| Double inferior | Value of a double key is inferior to the given value |
| Double superior | Value of a double key is superior to the given value |
| Exist | Key exists |
| Integer equal | Value of an integer key is equal to the given value |
| Integer inferior | Value of an integer key is inferior to the given value |
| Integer superior | Value of an integer key is superior to the given value |
| List bool equal | Value of a list-of-booleans key is equal to the given list |
| List double equal | Value of a list-of-doubles key is equal to the given list |
| List integer equal | Value of a list-of-integers key is equal to the given list |
| List string equal | Value of a list-of-strings key is equal to the given list |
| Same as previous | Value of the key is the same as the previous one |
| String match | Value of a string key matches the given regex |
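All of these conditions address the incoming JSON with slash-separated keys. A sketch of that lookup plus an equality check (the helper names are hypothetical, not the plugin API) could look like:

```python
def get_by_path(data: dict, key: str):
    """Resolve a slash-separated key like 'robot/pose/x' in nested dicts."""
    node = data
    for part in key.split("/"):
        if not isinstance(node, dict) or part not in node:
            return None  # key missing
        node = node[part]
    return node

def bool_equal(data: dict, key: str, value: bool) -> bool:
    """Condition check: JSON value at key equals the configured value."""
    return get_by_path(data, key) == value

msg = {"robot": {"moving": True, "pose": {"x": 1.5}}}
print(bool_equal(msg, "robot/moving", True))  # True
print(get_by_path(msg, "robot/pose/x"))       # 1.5
```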

Bool equal

Description

Compares the value of the JSON key to the value passed as parameter and returns true if they are equal.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| key | JSON key where the value is located; separate nested dictionaries with "/" | str | N/A (mandatory) |
| value | Value to compare the JSON value to | bool | N/A (mandatory) |

Double equal

Description

Compares the value of the JSON key to the value passed as parameter and returns true if they are equal.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| key | JSON key where the value is located; separate nested dictionaries with "/" | str | N/A (mandatory) |
| value | Value to compare the JSON value to | float | N/A (mandatory) |

Double inferior

Description

Compares the value of the JSON key to the value passed as parameter and returns true if the JSON value is inferior to it.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| key | JSON key where the value is located; separate nested dictionaries with "/" | str | N/A (mandatory) |
| value | Value to compare the JSON value to | float | N/A (mandatory) |

Double superior

Description

Compares the value of the JSON key to the value passed as parameter and returns true if the JSON value is superior to it.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| key | JSON key where the value is located; separate nested dictionaries with "/" | str | N/A (mandatory) |
| value | Value to compare the JSON value to | float | N/A (mandatory) |

Exist

Description

Returns true if the given JSON key exists.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| key | JSON key where the value is located; separate nested dictionaries with "/" | str | N/A (mandatory) |

Integer equal

Description

Compares the value of the JSON key to the value passed as parameter and returns true if they are equal.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| key | JSON key where the value is located; separate nested dictionaries with "/" | str | N/A (mandatory) |
| value | Value to compare the JSON value to | int | N/A (mandatory) |

Integer inferior

Description

Compares the value of the JSON key to the value passed as parameter and returns true if the JSON value is inferior to it.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| key | JSON key where the value is located; separate nested dictionaries with "/" | str | N/A (mandatory) |
| value | Value to compare the JSON value to | int | N/A (mandatory) |

Integer superior

Description

Compares the value of the JSON key to the value passed as parameter and returns true if the JSON value is superior to it.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| key | JSON key where the value is located; separate nested dictionaries with "/" | str | N/A (mandatory) |
| value | Value to compare the JSON value to | int | N/A (mandatory) |

List bool equal

Description

Compares the value of the JSON key to the list passed as parameter and returns true if they are equal.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| key | JSON key where the value is located; separate nested dictionaries with "/" | str | N/A (mandatory) |
| value | List to compare the JSON value to | list[bool] | N/A (mandatory) |
| order_matters | If true, the comparison takes element order into account | bool | true |

List double equal

Description

Compares the value of the JSON key to the list passed as parameter and returns true if they are equal.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| key | JSON key where the value is located; separate nested dictionaries with "/" | str | N/A (mandatory) |
| value | List to compare the JSON value to | list[float] | N/A (mandatory) |
| order_matters | If true, the comparison takes element order into account | bool | true |

List integer equal

Description

Compares the value of the JSON key to the list passed as parameter and returns true if they are equal.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| key | JSON key where the value is located; separate nested dictionaries with "/" | str | N/A (mandatory) |
| value | List to compare the JSON value to | list[int] | N/A (mandatory) |
| order_matters | If true, the comparison takes element order into account | bool | true |

List string equal

Description

Compares the value of the JSON key to the list passed as parameter and returns true if they are equal.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| key | JSON key where the value is located; separate nested dictionaries with "/" | str | N/A (mandatory) |
| value | List to compare the JSON value to | list[str] | N/A (mandatory) |
| order_matters | If true, the comparison takes element order into account | bool | true |

Moving

Description

Uses a hysteresis on the robot speed, taken from the odometry, to determine whether the robot is moving.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| odom_topic | Topic whose odometry is used to know if the robot is moving | str | "/odom" |
| speed_threshold | Speed threshold used in the hysteresis | float | 0.2 |
| count_limit | Counter value above which the robot is considered moving | int | 8 |
| count_hysteresis | Hysteresis counter | int | 5 |
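The exact counter semantics are not documented here, so the following is a hedged sketch of one plausible hysteresis: the state flips to "moving" once the counter reaches count_limit and flips back once it falls to count_hysteresis:

```python
class MovingCondition:
    """Hysteresis on speed samples (illustrative; the plugin's exact
    counter logic may differ)."""

    def __init__(self, speed_threshold=0.2, count_limit=8, count_hysteresis=5):
        self.speed_threshold = speed_threshold
        self.count_limit = count_limit
        self.count_hysteresis = count_hysteresis
        self.count = 0
        self.moving = False

    def update(self, speed: float) -> bool:
        # count up on fast samples, down on slow ones, clamped to [0, limit]
        if abs(speed) > self.speed_threshold:
            self.count = min(self.count + 1, self.count_limit)
        else:
            self.count = max(self.count - 1, 0)
        # flip state only when the counter crosses the relevant bound
        if not self.moving and self.count >= self.count_limit:
            self.moving = True
        elif self.moving and self.count <= self.count_hysteresis:
            self.moving = False
        return self.moving

cond = MovingCondition()
for _ in range(8):
    cond.update(0.5)
print(cond.moving)  # True after 8 fast samples
```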

Same as previous

Description

Compares the values of the JSON keys to those of the previously received record and returns true if they match.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| keys | JSON keys where the values are located; separate nested dictionaries with "/" | list[str] | N/A (mandatory) |
| exclude | JSON keys to exclude from the comparison | list[str] | N/A (mandatory) |

String match

Description

Compares the value of the JSON key to a regular expression and returns true if it matches.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| key | JSON key where the value is located; separate nested dictionaries with "/" | str | N/A (mandatory) |
| regex | Regex to compare the JSON value against | str | N/A (mandatory) |

Data validation

Model

Each measurement can validate its data against a model.

We use the JSON schema validator for JSON for Modern C++.

Schemas follow the JSON Schema draft-07 model:

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Uptime",
    "description": "Time the system has been up",
    "properties": {
        "time": {
            "description": "Time the system has been up",
            "type": "integer",
            "minimum": 0
        }
    },
    "type": "object"
}
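The actual validation is done in C++ with the JSON schema validator library. Purely to illustrate what the draft-07 `type` and `minimum` constraints check, here is a tiny hand-rolled equivalent supporting only those two keywords:

```python
def validate(instance: dict, schema: dict) -> list:
    """Check 'type' and 'minimum' of top-level properties (draft-07 subset)."""
    type_map = {"integer": int, "number": (int, float), "string": str,
                "boolean": bool, "object": dict, "array": list}
    errors = []
    for name, subschema in schema.get("properties", {}).items():
        if name not in instance:
            continue  # properties are optional unless listed in "required"
        value = instance[name]
        # bool is a subclass of int in Python, so reject it explicitly
        if (isinstance(value, bool) and subschema["type"] != "boolean") \
                or not isinstance(value, type_map[subschema["type"]]):
            errors.append(f"{name}: expected {subschema['type']}")
        elif "minimum" in subschema and value < subschema["minimum"]:
            errors.append(f"{name}: below minimum {subschema['minimum']}")
    return errors

uptime_schema = {
    "properties": {"time": {"type": "integer", "minimum": 0}},
    "type": "object",
}
print(validate({"time": 120}, uptime_schema))  # []
print(validate({"time": -3}, uptime_schema))   # ['time: below minimum 0']
```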

It is possible to enforce validation. By default, it is disabled, since some fields can be filtered out and some are added dynamically (e.g. remote paths).

A different schema can also be used by passing a custom path (see "Use a different Schema" below).

Failed validation callback

You might want to trigger some actions when a validation fails, e.g. send the data to another database to debug it later.

In this case, you will need to write your own plugin (inheriting from an existing one or starting from scratch) and define the onFailedValidation function in the class.

This case is covered by the custom plugin demo.

Use a different Schema

For each plugin, a default schema path is provided, but it can be changed by setting the json_schema_path parameter of the measurement plugin to the absolute path of your schema.

Groups

Description

Each measurement is part of a group, which allows measurements to be grouped together later on. For example, if the cpu and position measurements belong to the same group my_group, their data is published together on /dc/group/my_group:

{
    "cpu": ...,
    "position": ...
}

Currently, this node is written in Python. The main reason is that we don't know how to allocate and pass a variable number of inputs to the TimeSynchronizer: in Python it is quite simple, but not so in C++.
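The grouping behaviour, wait until every input has a fresh message within sync_delay and then publish one merged dictionary, can be sketched without ROS. Names and logic here approximate what TimeSynchronizer provides and are not the node's actual code:

```python
class Grouper:
    """Merge the latest message of each input once all arrived in a window."""

    def __init__(self, inputs, group_key, sync_delay=5.0):
        self.inputs = set(inputs)
        self.group_key = group_key
        self.sync_delay = sync_delay
        self.buffer = {}  # topic -> (stamp, payload)

    def on_message(self, topic, stamp, payload):
        self.buffer[topic] = (stamp, payload)
        stamps = [s for s, _ in self.buffer.values()]
        # publish only when every input has a message and stamps are close
        if set(self.buffer) == self.inputs \
                and max(stamps) - min(stamps) <= self.sync_delay:
            merged = {t.rsplit("/", 1)[-1]: p for t, (_, p) in self.buffer.items()}
            merged["name"] = self.group_key
            self.buffer.clear()
            return merged
        return None

g = Grouper(["/dc/measurement/memory", "/dc/measurement/uptime"], "memory_uptime")
g.on_message("/dc/measurement/memory", 0.0, {"used": 42.0})  # incomplete: None
print(g.on_message("/dc/measurement/uptime", 1.0, {"time": 120}))
```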

Node parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| groups | Groups to enable | list[str] | N/A |

Group parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| inputs | Names of the input topics to group | list[str] | N/A |
| output | Output topic to send the data to | str | "/dc/group/{group}" |
| sync_delay | Time window within which all subscribed inputs must receive data for the group to be published | float | 5.0 |
| group_key | Dictionary key under which the data is grouped | str | {group_name} |
| exclude_keys | Keys to exclude from the published data; nesting levels are separated by a dot | list[str] | N/A |
| tags | Destination names, used to know where the data will be forwarded | list[str] | N/A |
| nested_data | Whether measurements are nested dictionaries or flat | bool | false |
| include_group_name | Include the group name in the JSON as key "name" and value <group_key> | bool | true |
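exclude_keys strips dot-separated paths from the merged dictionary before publishing. A sketch of that filtering (the helper name is hypothetical):

```python
import copy

def exclude_keys(data: dict, keys: list) -> dict:
    """Remove dot-separated nested keys, e.g. 'camera.raw' (illustrative)."""
    out = copy.deepcopy(data)
    for path in keys:
        *parents, leaf = path.split(".")
        node = out
        for part in parents:
            node = node.get(part, {}) if isinstance(node, dict) else {}
        if isinstance(node, dict):
            node.pop(leaf, None)  # missing keys are ignored
    return out

data = {"camera": {"raw": "...", "rotated": "..."}, "cpu": {"used": 12.0}}
print(exclude_keys(data, ["camera.raw"]))
```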

Example

group_server:
  ros__parameters:
    groups: ["map", "memory_uptime"]
    memory_cpu:
      inputs: ["/dc/measurement/memory", "/dc/measurement/cpu"]
      output: "/dc/group/memory_cpu"
      sync_delay: 5.0
      group_key: "memory_cpu"
      tags: ["flb_pgsql"]
    memory_uptime:
      inputs: ["/dc/measurement/memory", "/dc/measurement/uptime"]
      output: "/dc/group/memory_uptime"
      sync_delay: 5.0
      group_key: "memory_uptime"
      tags: ["flb_stdout"]
    cameras:
      inputs: ["/dc/measurement/camera"]
      output: "/dc/group/cameras"
      sync_delay: 5.0
      group_key: "cameras"
      tags: ["flb_minio", "flb_stdout"]
    map:
      inputs: ["/dc/measurement/map"]
      output: "/dc/group/map"
      sync_delay: 5.0
      group_key: "map"
      tags: ["flb_minio", "flb_stdout"]

Overview

Description

A destination is where the data is sent: AWS S3, stdout, AWS Kinesis, etc. It can use Fluent Bit outputs.

The destination node is similar to the measurement one. It either:

  • subscribes to data in the main node, which is forwarded with ROS based destination plugins, or
  • subscribes to data from the ros2 Fluent Bit plugin.

The Fluent Bit plugin is the preferred one: with it, we get all the benefits of Fluent Bit (especially data integrity). The ros2 Fluent Bit plugin uses rclc and is at the moment a fork of Fluent Bit. It could also be rewritten as a Fluent Bit plugin based on the Go interface, but the pros and cons of moving to it are not clear yet.

Node parameters

Destination parameters are loaded dynamically. Here are the static ones:

| Parameter name | Description | Type(s) | Default |
|---|---|---|---|
| flb.flush | Interval, in seconds, at which the output is flushed | str | "1" |
| flb.grace | Wait time, in seconds, on exit | str | "1" |
| flb.log_level | Diagnostic level (error/warning/info/debug/trace) | str | "info" |
| flb.storage_path | Optional location in the filesystem to store streams and chunks of data. If not set, input plugins can only use in-memory buffering | str | "/var/log/flb-storage/" |
| flb.storage_sync | Synchronization mode used to store the data in the filesystem; either "normal" or "full" | str | "normal" |
| flb.storage_checksum | Enable data integrity checks when writing and reading data from the filesystem. The storage layer uses the CRC32 algorithm | str | "off" |
| flb.storage_backlog_mem_limit | If storage_path is set, Fluent Bit looks for undelivered data chunks still in the storage layer, called backlog data. This option gives a hint of the maximum memory to use when processing these records | str | "5M" |
| flb.scheduler_cap | Maximum retry time in seconds (supported since Fluent Bit v1.8.7) | str | "2000" |
| flb.scheduler_base | Base of the exponential backoff (supported since Fluent Bit v1.8.7) | str | "5" |
| flb.http_server | If true, enable the statistics HTTP server | bool | false |
| flb.in_storage_type | Buffering mechanism to use; either "memory" or "filesystem" | str | "filesystem" |
| flb.in_storage_pause_on_chunks_overlimit | Whether file storage is paused when reaching the chunk limit | str | "off" |

Plugin parameters

| Parameter name | Description | Type(s) | Default |
|---|---|---|---|
| plugin | Plugin to load | list[str] | N/A |
| inputs | Topics to listen to for the data | list[str] | N/A |
| debug | Enable debug print | bool | false |
| time_format | Format in which the date will be printed | str ("double" or "iso8601") | "double" |
| time_key | Dictionary key from which the date is taken | str | "date" |

Available plugins:

| Name | Description | Source |
|---|---|---|
| flb_file | Write the data received through the input plugin to a file | Fluent Bit File |
| flb_files_metrics | Collect metrics on files sent to destinations | This project, in the fluent_bit_plugins package |
| flb_http | JSON to HTTP request via Fluent Bit | Fluent Bit HTTP |
| flb_kinesis_streams | JSON to AWS Kinesis Data Streams via Fluent Bit | Fluent Bit Amazon Kinesis Data Streams |
| flb_minio | JSON to MinIO via Fluent Bit | This project, in the fluent_bit_plugins package |
| flb_pgsql | JSON to PostgreSQL via Fluent Bit | Fluent Bit PostgreSQL |
| flb_stdout | JSON to stdout via Fluent Bit | Fluent Bit Stdout |
| flb_s3 | JSON to AWS S3 via Fluent Bit | Fluent Bit S3 |
| flb_tcp | JSON to TCP via Fluent Bit | Fluent Bit TCP & TLS |
| rcl | JSON to RCL | ROS 2 logging |

Example configuration

destination_server:
  ros__parameters:
    flb:
      flush: "1"
      grace: "1"
      log_level: "info"
      storage_path: "/var/log/flb-storage/"
      storage_sync: "full"
      storage_checksum: "off"
      storage_backlog_mem_limit: "1M"
      scheduler_cap: "2000"
      scheduler_base: "5"
      http_server: true
      in_storage_type: "filesystem"
      in_storage_pause_on_chunks_overlimit: "off"
    destination_plugins: ["flb_stdout"]
    flb_minio:
      plugin: "dc_destinations/FlbMinIO"
      inputs: ["/dc/group/map"]
      plugin_path: "/root/ws/src/ros2_data_collection/dc_destinations/flb_plugins/lib/out_minio.so"
      endpoint: 127.0.0.1:9000
      access_key_id: HgJdDWeDQBiBWCwm
      secret_access_key: plCMROO2VMZIKiqEwDd80dLJUCRvJ9iu
      use_ssl: false
      bucket: "mybucket"
      src_fields: ["camera.local_img_paths.raw","camera.local_img_paths.rotated", "map.local_map_paths.yaml", "map.local_map_paths.pgm"]
      upload_fields: ["camera.minio_img_paths.raw","camera.minio_img_paths.rotated", "map.minio_map_paths.yaml", "map.minio_map_paths.pgm"]
    flb_pgsql:
      plugin: "dc_destinations/FlbPgSQL"
      inputs: ["/dc/group/memory_uptime"]
      host: "127.0.0.1"
      port: "5432"
      user: fluentbit
      password: password
      database: "fluentbit"
      table: "dc"
      timestamp_key: "date"
      async: false
      time_format: "double"
      time_key: "date"
    rcl:
      plugin: "dc_destinations/Rcl"
      inputs: ["/dc/group/memory_uptime"]
    flb_stdout:
      plugin: "dc_destinations/FlbStdout"
      inputs: ["/dc/group/cameras"]
      time_format: "iso8601"
      time_key: "date"
    flb_http:
      plugin: "dc_destinations/FlbHTTP"
      inputs: ["/dc/group/memory_uptime"]
      debug: true
    flb_file:
      plugin: "dc_destinations/FlbFile"
      inputs: ["/dc/group/memory_uptime"]
      path: "$HOME/data"
      file: uptime
      debug: false
      throttle:
        enable: false
        rate: "1"
        window: "5"
        interval: "10s"
        print_status: true

AWS Kinesis Data Streams - Fluent Bit

Description

The Amazon Kinesis Data Streams output plugin lets you ingest your records into the Kinesis service. See the Fluent Bit documentation page for more information.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| region | The AWS region | str | N/A |
| stream | The name of the Kinesis Data Streams stream to send log records to | str | N/A |
| role_arn | ARN of an IAM role to assume (for cross-account access) | str | N/A |
| time_key | Add the timestamp to the record under this key. By default, the timestamp from Fluent Bit is not added to records sent to Kinesis | str | N/A |
| time_key_format | strftime-compliant format string for the timestamp. Supports millisecond precision with '%3N' and nanosecond precision with '%9N' and '%L'; for example, '%Y-%m-%dT%H:%M:%S.%3N' adds milliseconds. Used with time_key | str | "%Y-%m-%dT%H:%M:%S" |
| log_key | By default, the whole log record is sent to Kinesis. If you specify a key name with this option, only the value of that key is sent. For example, with the Fluentd Docker log driver, you can specify log_key log and only the log message is sent to Kinesis | str | N/A |
| endpoint | Custom endpoint for the Kinesis API | str | N/A |
| auto_retry_requests | Immediately retry failed requests to AWS services once. This does not affect the normal Fluent Bit retry mechanism with backoff; instead, it enables an immediate retry with no delay for networking errors, which may improve throughput during transient network issues | bool | true |

Node configuration

...
flb_kinesis_streams:
  plugin: "dc_destinations/FlbKinesisStreams"
  inputs: ["/dc/measurement/data"]
  region: "us-east-1"
  stream: my_stream
  time_key: "date"
  time_key_format: "%Y-%m-%dT%H:%M:%S"
...

AWS S3 - Fluent Bit

Description

The Amazon S3 output plugin lets you ingest your records into the S3 service. See the Fluent Bit documentation page for more information.

Parameters

| Parameter | Description | Type | Default |
|---|---|---|---|
| region | The AWS region of your S3 bucket | str | "us-east-1" |
| bucket | S3 bucket name | str | N/A (mandatory) |
| json_date_key | Name of the time key in the output record. Set the value to false to disable the time key | str | "date" |
| json_date_format | Format of the date. Supported formats are double, epoch, iso8601 (e.g. 2018-05-30T09:39:52.000681Z) and java_sql_timestamp (e.g. 2018-05-30 09:39:52.000681) | str | "iso8601" |
| total_file_size | Size of files in S3. Maximum size is 50G, minimum is 1M | str | "100M" |
| upload_chunk_size | Size of each 'part' for multipart uploads. Max: 50M | str | "50M" |
| upload_timeout | Whenever this amount of time has elapsed, Fluent Bit completes an upload and creates a new file in S3. For example, set this value to 60m to get a new file every hour | str | "10m" |
| store_dir | Directory to locally buffer data before sending. With multipart uploads, data is only buffered until the upload_chunk_size is reached. S3 also stores metadata about in-progress multipart uploads in this directory, which allows pending uploads to be completed even if Fluent Bit stops and restarts. It also stores the current $INDEX value, if enabled in the S3 key format, so that $INDEX keeps incrementing from its previous value after a restart | str | "/tmp/fluent-bit/s3" |
| store_dir_limit_size | Limit on disk usage in store_dir, to bound the amount of buffered S3 data. Note: use store_dir_limit_size instead of storage.total_limit_size, which applies to other plugins, because S3 has its own buffering system | int | 0 |
| s3_key_format | Format string for keys in S3. Supports $UUID, strftime time formatters and a syntax for selecting parts of the Fluent log tag, inspired by the rewrite_tag filter. Add $UUID to insert a random string. Add $INDEX to insert an integer that increments on each upload; its value is saved in store_dir so it keeps incrementing from the previous run after a restart. Add $TAG to insert the full log tag, or $TAG[0] to insert the first part of the tag; the tag is split into parts using the characters specified with s3_key_format_tag_delimiters. Add an extension directly after the last piece of the format string to set a key suffix; in use_put_object mode, a key suffix requires $UUID as well. See the in-depth examples and tutorial in the Fluent Bit documentation. The time in the S3 key is the timestamp of the first record in the S3 file | str | "/fluent-bit-logs/$TAG/%Y/%m/%d/%H/%M/%S" |
| static_file_path | Disables the behavior where a UUID string is automatically appended to the S3 key name when $UUID is not provided in s3_key_format. $UUID, time formatters, $TAG and other dynamic key formatters all work as expected while this feature is set to true | bool | false |
| s3_key_format_tag_delimiters | Characters used to split the tag into 'parts' for use with the s3_key_format option | str | N/A |
| use_put_object | Use the S3 PutObject API instead of the multipart upload API. When this option is on, a key extension is only available when $UUID is specified in s3_key_format; if $UUID is not included, a random string is appended to the key and the extension cannot be customized | bool | false |
| role_arn | ARN of an IAM role to assume (e.g. for cross-account access) | str | N/A |
| endpoint | Custom endpoint for the S3 API; may contain a scheme and port | str | N/A |
| sts_endpoint | Custom endpoint for the STS API | str | N/A |
| canned_acl | Predefined canned ACL policy | str | N/A |
| compression | Compression type for S3 objects. 'gzip' is the only value supported by default; if Apache Arrow support was enabled at compile time, 'arrow' is also available. For gzip, the Content-Encoding HTTP header is set to 'gzip'. Gzip compression works whether use_put_object is on or off (PutObject and multipart); Arrow compression requires use_put_object on | str | N/A |
| content_type | A standard MIME type for the S3 object, set as the Content-Type HTTP header | str | N/A |
| send_content_md5 | Send the Content-MD5 header with PutObject and UploadPart requests, as required when Object Lock is enabled | bool | false |
| auto_retry_requests | Immediately retry failed requests to AWS services once. This does not affect the normal Fluent Bit retry mechanism with backoff; instead, it enables an immediate retry with no delay for networking errors, which may improve throughput during transient network issues | bool | true |
| log_key | By default, the whole log record is sent to S3. If you specify a key name with this option, only the value of that key is sent. For example, with Docker, you can specify log_key log and only the log message is sent to S3 | str | N/A |
| preserve_data_ordering | Normally, when an upload request fails, there is a high chance for the last received chunk to be swapped with a later chunk, resulting in data shuffling. This feature prevents shuffling by using a queue logic for uploads | bool | true |
storage_classSpecify the storage class for S3 objects. If this option is not specified, objects will be stored with the default 'STANDARD' storage class.strN/A
retry_limitInteger value to set the maximum number of retries allowed. For previous version, the number of retries is 5 and is not configurable.int1
external_idSpecify an external ID for the STS API, can be used with the role_arn parameter if your role requires an external ID.strN/A

Node configuration

...
flb_s3:
  plugin: "dc_destinations/FlbS3"
  inputs: ["/dc/group/data"]
  bucket: my-bucket
  region: us-west-2
  total_file_size: "50M"
  use_put_object: false
  compression: "gzip"
  s3_key_format: "/$TAG/%Y/%m/%d/%H_%M_%S.gz"
...
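
As a sketch of how such a key format expands, the following Python simplifies the substitution (the real Fluent Bit logic also handles $INDEX, $TAG[n] parts and key suffixes; the tag value is hypothetical):

```python
import time
import uuid


# Illustrative rendering of an s3_key_format string: substitute $TAG and
# $UUID, then apply strftime time formatters to the result.
def render_key(fmt: str, tag: str, ts: float) -> str:
    key = fmt.replace("$TAG", tag).replace("$UUID", uuid.uuid4().hex)
    return time.strftime(key, time.gmtime(ts))


print(render_key("/$TAG/%Y/%m/%d/%H_%M_%S.gz", "dc.group.data", 0))
# -> /dc.group.data/1970/01/01/00_00_00.gz
```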

File - Fluent Bit

Description

The file output plugin writes the data received through the input plugin to a file. See the Fluent Bit page for more information.

Parameters

| Parameter | Description | Type | Default |
| --- | --- | --- | --- |
| file | File name in which to store the records. If not set, the file name is the tag associated with the records. | str | N/A (Optional) |
| format | The format of the file content. See also the Format section. | str (out_file, plain, csv, ltsv) | "out_file" |
| mkdir | Recursively create the output directory if it does not exist. Permissions are set to 0755. | bool | true |
| path | Directory path in which to store files. If not set, Fluent Bit writes the files in its own working directory. | str | "$HOME/data/" |
| delimiter | The character used to separate each value. Defaults to ',' if format is csv, '\t' (TAB) if format is ltsv, '' otherwise. | str | '' or ',' or '\t' |
| label_delimiter | The character used to separate a label from its value. Used for ltsv. | str | '.' |

Node configuration

...
flb_file:
  plugin: "dc_destinations/FlbFile"
  inputs: ["/dc/group/cameras"]
  file: "data"
  format: "out_file"
  mkdir: true
  path: "$HOME/data/"
...
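
To illustrate, the default "out_file" format writes one line per record as `tag: [timestamp, record]`. The sketch below builds such a line in Python (tag and record values are hypothetical):

```python
import json

# One line in Fluent Bit's default "out_file" format: "tag: [timestamp, record]".
tag, ts, record = "dc.group.cameras", 1700000000.0, {"camera": "front"}
line = f"{tag}: [{ts}, {json.dumps(record)}]"
print(line)  # -> dc.group.cameras: [1700000000.0, {"camera": "front"}]
```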

Files metrics - Fluent Bit

Description

Tracks files (YAML files, JPG pictures etc.) sent to their destinations (S3, MinIO etc.). Files can then be deleted locally once they have been sent to all their storage destinations.

It loads a custom Fluent Bit plugin written for this task, located in the fluent_bit_plugins package.

It creates a table in the database (currently PostgreSQL only) to store each file's status.

erDiagram
    files_metrics {
        integer id
        timestamp timestamp
        text robot_name
        text robot_id
        text group_name
        double duration
        text local_path
        text remote_path
        bool uploaded
        bool on_filesystem
        bool deleted
        bool ignored
        text storage_type
        text content_type
        integer size
        timestamp created_at
        timestamp updated_at
    }

Parameters

| Parameter | Description | Type | Default |
| --- | --- | --- | --- |
| db_type | Database used. | str | "pgsql" |
| file_storage | Where data will be stored. | list[str] (s3, minio) | N/A (Mandatory) |
| delete_when_sent | Delete the file once it has been sent. | bool | true |
| minio.access_key_id | MinIO access key ID. | str | N/A (Mandatory) |
| minio.bucket | MinIO bucket name. | str | "dc_bucket" |
| minio.endpoint | MinIO endpoint. | str | "127.0.0.1:9000" |
| minio.secret_access_key | MinIO secret access key. | str | N/A (Mandatory) |
| minio.src_fields | JSON fields containing local paths for MinIO. | str | N/A (Mandatory) |
| minio.upload_fields | JSON fields containing remote paths for MinIO. | str | N/A (Mandatory) |
| minio.use_ssl | Use SSL for MinIO. | bool | true |
| pgsql.database | Database name to connect to. | str | <system_username> |
| pgsql.host | Hostname/IP address of the PostgreSQL instance. | str | "127.0.0.1" |
| pgsql.password | Password of the PostgreSQL user. | str | "" |
| pgsql.port | PostgreSQL port. | str | "5432" |
| pgsql.ssl | Use SSL for the PostgreSQL connection. | bool | true |
| pgsql.table | Table name where data is stored. | str | "pg_table" |
| pgsql.user | PostgreSQL username. | str | <system_username> |
| plugin_path | Shared library path of the plugin, compiled with Go. | str | "{package_share_directory}/flb_plugins/lib/out_files_metrics.so" |
| s3.access_key_id | S3 access key ID. | str | N/A (Mandatory) |
| s3.bucket | S3 bucket name. | str | N/A (Mandatory) |
| s3.endpoint | S3 endpoint. | str | N/A (Mandatory) |
| s3.secret_access_key | S3 secret access key. | str | N/A (Mandatory) |
| s3.src_fields | JSON fields containing local paths for S3. | str | N/A (Mandatory) |
| s3.upload_fields | JSON fields containing remote paths for S3. | str | N/A (Mandatory) |

Example

flb_files_metrics:
    plugin: "dc_destinations/FlbFilesMetrics"
    inputs: ["/dc/group/map"]
    file_storage: ["minio", "s3"]
    db_type: "pgsql"
    debug: true
    delete_when_sent: true
    minio:
        endpoint: 127.0.0.1:9000
        access_key_id: XEYqG4ZcPY5jiq5i
        secret_access_key: ji011KCtI82ZeQS6UwsQAg8x9VR4lSaQ
        use_ssl: false
        bucket: "mybucket"
        src_fields: ["map.local_paths.pgm", "map.local_paths.yaml"]
        upload_fields: ["map.minio_paths.pgm", "map.minio_paths.yaml"]
    s3:
        endpoint: 127.0.0.1:9000
        access_key_id: XEYqG4ZcPY5jiq5i
        secret_access_key: ji011KCtI82ZeQS6UwsQAg8x9VR4lSaQ
        bucket: "mybucket"
        src_fields: ["map.local_paths.yaml"]
        upload_fields: ["map.s3_paths.yaml"]
    pgsql:
        host: "127.0.0.1"
        port: "5432"
        user: fluentbit
        password: password
        database: "fluentbit"
        table: "files_metrics"
        timestamp_key: "date"
        time_format: "double"
        time_key: "date"
        ssl: false
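
The delete_when_sent behavior above can be sketched as follows: a local file becomes safe to delete only once every configured storage destination reports a successful upload (destination names and statuses below are hypothetical):

```python
# A file may be removed locally only when all destinations have received it.
def safe_to_delete(upload_status: dict, destinations: list) -> bool:
    return all(upload_status.get(dest, False) for dest in destinations)


print(safe_to_delete({"minio": True, "s3": False}, ["minio", "s3"]))  # -> False
print(safe_to_delete({"minio": True, "s3": True}, ["minio", "s3"]))   # -> True
```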

HTTP - Fluent Bit

Description

The http output plugin flushes your records to an HTTP endpoint. For now the functionality is basic: it issues a POST request with the data records in MessagePack (or JSON) format. See the Fluent Bit page for more information.

Parameters

| Parameter | Description | Type | Default |
| --- | --- | --- | --- |
| allow_duplicated_headers | Specify whether duplicated headers are allowed. If a duplicated header is found, the latest key/value set is preserved. | bool | true |
| format | Data format used in the HTTP request body. Fluent Bit defaults to msgpack; other supported formats are json, json_stream, json_lines and gelf. | str | "json" |
| header | Add an HTTP header key/value pair. Multiple headers can be set. | str | N/A |
| headers_key | Key to use as the headers of the request (must be prefixed with "$"). The key must contain a map, whose contents are merged into the request headers. Useful, for example, to specify the content type of the data contained in body_key. | str | N/A |
| header_tag | Optional HTTP header field for the original message tag. | str | N/A |
| http_user | Basic Auth username. | str | N/A |
| http_passwd | Basic Auth password. Requires http_user to be set. | str | N/A |
| json_date_format | Date format. Supported formats are double, epoch, iso8601 (e.g. 2018-05-30T09:39:52.000681Z) and java_sql_timestamp (e.g. 2018-05-30 09:39:52.000681). | str | "double" |
| json_date_key | Name of the time key in the output record. Set the value to false to disable the time key. | str | "date" |
| host | IP address or hostname of the target HTTP server. | str | "127.0.0.1" |
| log_response_payload | Specify whether the response payload should be logged. | bool | true |
| port | TCP port of the target HTTP server. | str | "80" |
| uri | Optional HTTP URI for the target web server, e.g. /something. | str | "/" |

Node configuration

...
flb_http:
  plugin: "dc_destinations/FlbHTTP"
  inputs: ["/dc/measurement/data"]
  host: "127.0.0.1"
  port: 80
  uri: "/"
  format: "json"
...

InfluxDB - Fluent Bit

Description

The influxdb output plugin flushes your records into an InfluxDB time series database. The following instructions assume that you have a fully operational InfluxDB service running on your system. See the Fluent Bit page for more information.

Parameters

| Parameter | Description | Type | Default |
| --- | --- | --- | --- |
| host | IP address or hostname of the target InfluxDB service. | str | "127.0.0.1" |
| port | TCP port of the target InfluxDB service. | int | 8086 |
| database | InfluxDB database name where records will be inserted. | str | "fluentbit" |
| bucket | InfluxDB bucket name where records will be inserted; if specified, database is ignored and v2 of the API is used. | str | N/A (Optional) |
| org | InfluxDB organization name where the bucket is (v2 only). | str | "fluent" |
| sequence_tag | Name of the tag whose value is incremented for consecutive simultaneous events. | str | "_seq" |
| http_user | Optional username for HTTP Basic Authentication. | str | N/A (Optional) |
| http_passwd | Password for the user defined in http_user. | str | N/A (Optional) |
| http_token | Authentication token used with InfluxDB v2; if specified, both http_user and http_passwd are ignored. | str | N/A (Optional) |
| tag_keys | List of keys that need to be tagged. | list[str] | N/A (Optional) |
| auto_tags | Automatically tag keys whose value is a string. | bool | false |

Node configuration

...
flb_influxdb:
  plugin: "dc_destinations/FlbInfluxDB"
  inputs: ["/dc/measurement/uptime"]
  host: "127.0.0.1"
  port: 8086
  database: "ros"
...

Minio - Fluent Bit

Description

The MinIO plugin sends files to MinIO. It uses the Go interface provided by Fluent Bit.

Difference with Fluent Bit S3 Plugin

The S3 plugin sends data as JSON to S3 or MinIO but does not send files (e.g. YAML files, images).

Parameters

| Parameter | Description | Type | Default |
| --- | --- | --- | --- |
| plugin_path | Shared library path of the plugin, compiled with Go. | str | "{package_share_directory}/flb_plugins/lib/out_minio.so" |
| endpoint | Endpoint for the S3 API. May include scheme and port. | str | "127.0.0.1:9000" |
| access_key_id | Access keys are long-term credentials for an IAM user or the AWS account root user. | str | N/A (Mandatory) |
| secret_access_key | Secret password used along with the access key ID. | str | N/A (Mandatory) |
| use_ssl | If set to true, https is used instead of http. | str | "true" |
| create_bucket | Whether to create the bucket if it does not exist. | str | "true" |
| bucket | Bucket name. | str | "dc_bucket" |
| upload_fields | Fields containing remote paths, separated by dots. | list[str] | N/A (Mandatory) |
| src_fields | Fields containing local paths, separated by dots. | list[str] | N/A (Mandatory) |

Node configuration

...
flb_minio:
  verbose_plugin: false
  time_format: "iso8601"
  plugin: "dc_destinations/FlbMinIO"
  inputs: ["/dc/measurement/camera", "/dc/measurement/map"]
  endpoint: 127.0.0.1:9000
  access_key_id: XEYqG4ZcPY5jiq5i
  secret_access_key: ji011KCtI82ZeQS6UwsQAg8x9VR4lSaQ
  use_ssl: false
  create_bucket: true
  bucket: "mybucket"
  src_fields:
    [
      "local_paths.inspected",
      "local_paths.minio.pgm",
      "local_paths.minio.yaml"
    ]
  upload_fields: # Remote paths created by the measurement node configuration
    [
      "remote_paths.minio.inspected",
      "remote_paths.minio.pgm",
      "remote_paths.minio.yaml"
    ]
...

Parameter handling

flowchart TB

  subgraph ros2_plugin["Measurement plugin"]
    direction LR
    mp_src_fields["['local_paths.inspected', 'local_paths.minio.pgm', 'local_paths.minio.yaml']"]
    mp_upload_fields["['remote_paths.inspected', 'remote_paths.minio.pgm', 'remote_paths.minio.yaml']"]
  end

  subgraph flb_plugin["Fluent Bit plugin"]
    direction LR
    flb_src_fields["'local_paths.inspected, local_paths.minio.pgm, local_paths.minio.yaml'"]
    flb_upload_fields["'remote_paths.inspected, remote_paths.minio.pgm, remote_paths.minio.yaml'"]

    flb_split_src_fields["['local_paths.inspected', 'local_paths.minio.pgm', 'local_paths.minio.yaml']"]
    flb_split_upload_fields["['remote_paths.inspected', 'remote_paths.minio.pgm', 'remote_paths.minio.yaml']"]
  end

  mp_src_fields--"to string"-->flb_src_fields
  mp_upload_fields--"to string"-->flb_upload_fields
  flb_src_fields--"to go array"-->flb_split_src_fields
  flb_upload_fields--"to go array"-->flb_split_upload_fields
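
The round trip above can be sketched in a few lines: the measurement plugin's field lists are joined into comma-separated strings for Fluent Bit, split back into arrays by the Go plugin, and each dotted path is resolved inside the JSON record (the record content below is hypothetical):

```python
# Resolve a dotted path such as "local_paths.minio.yaml" inside a nested record.
def resolve(record: dict, dotted: str):
    for key in dotted.split("."):
        record = record[key]
    return record


src_fields = ["local_paths.inspected", "local_paths.minio.yaml"]
as_string = ",".join(src_fields)      # what the Fluent Bit plugin receives
split_back = as_string.split(",")     # what the Go code reconstructs
record = {"local_paths": {"inspected": "/tmp/i.jpg", "minio": {"yaml": "/tmp/m.yaml"}}}
print([resolve(record, f) for f in split_back])  # -> ['/tmp/i.jpg', '/tmp/m.yaml']
```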

NULL - Fluent Bit

Description

The null output plugin simply discards events. See the Fluent Bit page for more information.

Node configuration

...
flb_null:
  plugin: "dc_destinations/FlbNull"
  inputs: ["/dc/measurement/data"]
...

PostgreSQL - Fluent Bit

Description

PostgreSQL is a very popular and versatile open source database management system that supports the SQL language and is capable of storing both structured and unstructured data, such as JSON objects. See the Fluent Bit page for more information.

Parameters

| Parameter | Description | Type | Default |
| --- | --- | --- | --- |
| host | Hostname/IP address of the PostgreSQL instance. | str | "127.0.0.1" |
| port | PostgreSQL port. | str | "5432" |
| user | PostgreSQL username. | str | "{username}" |
| password | Password of the PostgreSQL user. | str | N/A |
| database | Database name to connect to. | str | "{username}" |
| table | Table name where data is stored. | str | N/A |
| timestamp_key | Key in the JSON object containing the record timestamp. | str | "date" |
| async | Whether to use asynchronous or synchronous connections. | bool | false |
| min_pool_size | Minimum number of connections in async mode. | str | "1" |
| max_pool_size | Maximum number of connections in async mode. | str | "4" |
| cockroachdb | Set to true when connecting the plugin to a CockroachDB instance. | bool | false |

Node configuration

...
flb_pgsql:
  plugin: "dc_destinations/FlbPgSQL"
  inputs: ["/dc/group/data"]
  host: "127.0.0.1"
  port: 5432
  user: fluentbit
  password: password
  database: "fluentbit"
  table: "dc"
  timestamp_key: "date"
  async: false
  time_format: "double"
  time_key: "date"
...
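
As a rough sketch of the mapping, a record flushed to PostgreSQL ends up as a row whose timestamp comes from timestamp_key ("date" here) and whose payload is stored as JSON. The table and column names below are illustrative, not the plugin's exact schema, and the record values are hypothetical:

```python
import json

# One record, with the timestamp under the configured timestamp_key.
record = {"date": 1700000000.0, "cpu": {"average": 12.5}}

# Illustrative parameterized INSERT: timestamp column + JSONB payload.
sql = "INSERT INTO dc (time, data) VALUES (to_timestamp(%s), %s);"
params = (record["date"], json.dumps(record))
```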

Slack - Fluent Bit

Description

The Slack output plugin delivers records or messages to your preferred Slack channel. See the Fluent Bit page for more information.

Parameters

| Parameter | Description | Type | Default |
| --- | --- | --- | --- |
| webhook | Absolute address of the webhook provided by Slack. | str | N/A (Mandatory) |

Node configuration

...
flb_slack:
  plugin: "dc_destinations/FlbSlack"
  inputs: ["/dc/group/data"]
  webhook: https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX
...

Stdout - Fluent Bit

Description

The stdout output plugin prints the data received through the input plugin to standard output. See the Fluent Bit page for more information.

Parameters

| Parameter | Description | Type | Default |
| --- | --- | --- | --- |
| format | Data format to be printed. Supported formats are msgpack, json, json_lines and json_stream. | str | "json" |
| json_date_key | Name of the time key in the output record. Set the value to false to disable the time key. | str | "date" |
| json_date_format | Date format. Supported formats are double, epoch, iso8601 (e.g. 2018-05-30T09:39:52.000681Z) and java_sql_timestamp (e.g. 2018-05-30 09:39:52.000681). | str | "double" |

Node configuration

...
flb_stdout:
  plugin: "dc_destinations/FlbStdout"
  inputs: ["/dc/group/data"]
  format: "json"
  json_date_key: "date"
  json_date_format: "iso8601"
...

TCP & TLS - Fluent Bit

Description

The tcp output plugin sends records to a remote TCP server. The payload can be formatted in different ways as required. See the Fluent Bit page for more information.

Parameters

| Parameter | Description | Type | Default |
| --- | --- | --- | --- |
| host | Target host where Fluent Bit or Fluentd are listening for Forward messages. | str | "127.0.0.1" |
| port | TCP port of the target service. | int (>0, <65536) | 5170 |
| format | Data format to be sent. Supported formats are msgpack, json, json_lines and json_stream. | str | "msgpack" |
| json_date_key | Name of the time key in the output record. Set the value to false to disable the time key. | str | "date" |
| json_date_format | Date format. Supported formats are double, epoch, iso8601 (e.g. 2018-05-30T09:39:52.000681Z) and java_sql_timestamp (e.g. 2018-05-30 09:39:52.000681). | str | "double" |
| workers | Enables dedicated thread(s) for this output. The default of 2 applies since version 1.8.13; previous versions default to 0. | int | 2 |
| tls.active | Enable or disable TLS support. | bool | false |
| tls.verify | Force certificate validation. | bool | true |
| tls.debug | TLS debug verbosity level. Accepted values: 0 (No debug), 1 (Error), 2 (State change), 3 (Informational) and 4 (Verbose). | int (>=0, <=4) | 1 |
| tls.ca_file | Absolute path to the CA certificate file. | str | N/A (Optional) |
| tls.crt_file | Absolute path to the certificate file. | str | N/A (Optional) |
| tls.key_file | Absolute path to the private key file. | str | N/A (Optional) |
| tls.key_passwd | Optional password for the tls.key_file file. | str | N/A (Optional) |

Node configuration

...
flb_tcp:
  plugin: "dc_destinations/FlbTCP"
  inputs: ["/dc/measurement/uptime"]
  host: "127.0.0.1"
  port: 5170
...
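
To see what arrives on the other end, here is a minimal sketch of a TCP sink for records sent in "json_lines" format (one JSON object per line). A real deployment would listen on the configured port (5170 above); the demo uses an ephemeral port and a hypothetical record to stay self-contained:

```python
import json
import socket
import threading

srv = socket.socket()
srv.bind(("127.0.0.1", 0))  # ephemeral port keeps the demo self-contained
srv.listen(1)
results: list = []


def accept_one(server: socket.socket, out: list) -> None:
    """Accept one connection and parse each line as a JSON record."""
    conn, _ = server.accept()
    with conn, conn.makefile() as stream:
        for line in stream:
            out.append(json.loads(line))


t = threading.Thread(target=accept_one, args=(srv, results))
t.start()
client = socket.create_connection(srv.getsockname())
client.sendall(b'{"uptime": 42}\n')  # one json_lines record
client.close()
t.join()
srv.close()
print(results)  # -> [{'uptime': 42}]
```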

RCL - RCLCPP

Description

The RCL output collects the data with ROS and displays it with an RCLCPP_INFO call.

Warning

Note that plugins that do not use Fluent Bit, like this one, provide no data integrity or persistence guarantees. Do not use them in production!

Configuration examples

These minimal code examples show how to collect and send data with DC, progressively introducing all features.

Info

By this point, you should have built the workspace by following the setup guide.

Copy the configuration, save it as a YAML file, and then run:

ros2 launch dc_bringup params_file:="my_file.yaml"

Running the examples

Example 1: Uptime to Fluent Bit Stdout every second and flush data every second

destination_server:                           # Destination node configuration
  ros__parameters:
    destination_plugins: ["flb_stdout"]       # List of destination plugins names to enable
    flb_stdout:                               # Plugin name, you choose
      plugin: "dc_destinations/FlbStdout"     # Plugin class name, fixed
      inputs: ["/dc/measurement/uptime"]      # Same as topic_output in the uptime measurement in measurement_server

measurement_server:                           # Measurement node configuration
  ros__parameters:
    measurement_plugins: ["uptime"]           # List of measurement plugins names to enable
    uptime:                                   # Plugin name, you choose
      plugin: "dc_measurements/Uptime"        # Plugin class name, fixed
      topic_output: "/dc/measurement/uptime"  # Topic where data will be published
      tags: ["flb_stdout"]                    # Fluent Bit will match this in the destination server

Example 2: Uptime to Fluent Bit Stdout every second and flush data every 3 seconds

destination_server:
  ros__parameters:
    flb:
      flush: 3                               # Interval to flush output (seconds)
    destination_plugins: ["flb_stdout"]
    flb_stdout:
      plugin: "dc_destinations/FlbStdout"
      inputs: ["/dc/measurement/uptime"]

measurement_server:
  ros__parameters:
    measurement_plugins: ["uptime"]
    uptime:
      plugin: "dc_measurements/Uptime"
      topic_output: "/dc/measurement/uptime"
      tags: ["flb_stdout"]

Example 3: Uptime to Fluent Bit Stdout only at start and 3 times

destination_server:
  ros__parameters:
    destination_plugins: ["flb_stdout"]
    flb_stdout:
      plugin: "dc_destinations/FlbStdout"
      inputs: ["/dc/measurement/uptime"]

measurement_server:
  ros__parameters:
    measurement_plugins: ["uptime"]
    uptime:
      plugin: "dc_measurements/Uptime"
      topic_output: "/dc/measurement/uptime"
      tags: ["flb_stdout"]
      init_max_measurements: 3               # Maximum records to collect

Example 4: CPU and Memory to Fluent Bit Stdout every 5 seconds forever

destination_server:
  ros__parameters:
    destination_plugins: ["flb_stdout"]
    flb_stdout:
      plugin: "dc_destinations/FlbStdout"
      inputs: ["/dc/measurement/cpu", "/dc/measurement/memory"]

measurement_server:
  ros__parameters:
    measurement_plugins: ["memory", "cpu"]
    memory:
      plugin: "dc_measurements/Memory"
      topic_output: "/dc/measurement/memory"
      polling_interval: 5000                  # Interval to which data is collected in milliseconds
      tags: ["flb_stdout"]
    cpu:
      plugin: "dc_measurements/Cpu"
      topic_output: "/dc/measurement/cpu"
      polling_interval: 5000                  # Interval to which data is collected in milliseconds
      tags: ["flb_stdout"]

Example 5: CPU and Memory as a group to Fluent Bit Stdout every 5 seconds forever

destination_server:
  ros__parameters:
    destination_plugins: ["flb_stdout"]
    flb_stdout:
      plugin: "dc_destinations/FlbStdout"
      inputs: ["/dc/group/cpu_memory"]        # Group to create

group_server:                                 # Group server configuration
  ros__parameters:
    groups: ["cpu_memory"]
    cpu_memory:
      inputs: ["/dc/measurement/cpu", "/dc/measurement/memory"] # Topics which are subscribed
      output: "/dc/group/cpu_memory"          # Topic where result will be published
      sync_delay: 5.0                         # How long to queue up messages before passing them through.
      group_key: "cpu_memory"
      tags: ["flb_stdout"]

measurement_server:
  ros__parameters:
    measurement_plugins: ["memory", "cpu"]
    memory:
      plugin: "dc_measurements/Memory"
      topic_output: "/dc/measurement/memory"
      polling_interval: 5000
      tags: ["flb_stdout"]
    cpu:
      plugin: "dc_measurements/Cpu"
      topic_output: "/dc/measurement/cpu"
      polling_interval: 5000
      tags: ["flb_stdout"]
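
Conceptually, the group server merges the messages arriving on the input topics within sync_delay into a single record on the group output. A minimal sketch, with hypothetical measurement values:

```python
# Merge several measurement payloads into one grouped record.
def merge_group(messages: list) -> dict:
    merged = {}
    for message in messages:
        merged.update(message)
    return merged


batch = [{"cpu": {"average": 12.5}}, {"memory": {"used": 40.2}}]
print(merge_group(batch))  # -> {'cpu': {'average': 12.5}, 'memory': {'used': 40.2}}
```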

Example 6: Custom ROS message to Stdout every 2 seconds forever

destination_server:
  ros__parameters:
    destination_plugins: ["flb_stdout"]
    flb_stdout:
      plugin: "dc_destinations/FlbStdout"
      inputs: ["/dc/measurement/my_string_stamped"]

measurement_server:
  ros__parameters:
    measurement_plugins: ["my_string_stamped"]
    my_string_stamped:
      plugin: "dc_measurements/StringStamped"           # Plugin that allows publishing from your own nodes
      topic_output: "/dc/measurement/my_string_stamped" # Topic where data is republished with tags
      topic: "/hello-world"                             # Input topic where you are publishing
      tags: ["flb_stdout"]
      polling_interval: 2000
      enable_validator: false                           # By default, StringStamped message does not have a JSON schema since it uses custom input data

You will then need to publish data on the input topic (/hello-world) from another terminal:

ros2 topic pub -r 1 /hello-world dc_interfaces/msg/StringStamped  "{data: '{\"hello\":\"world\"}'}"
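
The command above escapes quotes because the StringStamped "data" field carries a JSON-serialized payload inside a YAML argument. A sketch of how that argument is built (payload values are hypothetical):

```python
import json

payload = {"hello": "world"}
data_field = json.dumps(payload)  # the JSON string placed in the "data" field
# Escape inner double quotes for use inside the YAML/shell argument.
yaml_arg = "{data: '" + data_field.replace('"', '\\"') + "'}"
print(yaml_arg)
# A subscriber recovers the original dict by parsing the "data" field:
print(json.loads(data_field))  # -> {'hello': 'world'}
```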

Example 7: Custom ROS message to Stdout every time it is published

destination_server:
  ros__parameters:
    destination_plugins: ["flb_stdout"]
    flb_stdout:
      plugin: "dc_destinations/FlbStdout"
      inputs: ["/dc/measurement/my_string_stamped"]

measurement_server:
  ros__parameters:
    measurement_plugins: ["my_string_stamped"]
    my_string_stamped:
      plugin: "dc_measurements/StringStamped"
      topic_output: "/dc/measurement/my_string_stamped"
      topic: "/hello-world"
      tags: ["flb_stdout"]
      enable_validator: false
      timer_based: false                                 # Get all data published on the input topic. Ignores polling_interval

Now that you know how it works, you can set your own measurements and destinations.

Infrastructure setup

InfluxDB

Description

InfluxDB is an open source time series database developed by InfluxData. It is optimized for fast, high-availability storage and retrieval of time series data, for use cases such as operations monitoring, application metrics and sensor data.

Start with docker compose

Execute:

./tools/infrastructure/scripts/install_infrastructure.bash \
  --tool=influxdb \
  --install-type=docker

Start natively

./tools/infrastructure/scripts/install_infrastructure.bash \
  --tool=influxdb \
  --install-type=native

Credentials

| User | Password | Database | Port |
| --- | --- | --- | --- |
| admin | admin | dc | 8086 |

Ip cameras

The IP camera example saves video feeds from RTSP cameras. If you don't have an IP camera, you can optionally create a virtual video feed for testing purposes:

Start RTSP Server

You need an RTSP server; this guide uses mediamtx, available here. Download it from the release page and start it:

./mediamtx

If the port is already in use, download the configuration, edit the hlsAddress parameter and start it this way:

./mediamtx conf.yml

Create a virtual camera

This is optional, and only useful if you do not want to use a real camera.

You will first need to install some packages:

sudo apt-get install v4l-utils v4l2loopback-dkms

And start a virtual camera device:

sudo modprobe v4l2loopback

Now check its path:

v4l2-ctl --list-devices

Dummy video device (0x0000) (platform:v4l2loopback-000):
	/dev/video2

In my case, it is /dev/video2

To send your camera feed, run:

ffmpeg \
  -f v4l2 \
  -video_size 1920x1080 \
  -i /dev/video2 \
  -f rtsp \
  -rtsp_transport tcp rtsp://127.0.0.1:8554/mystream

Send video stream

To stream an image of a ticking clock, create a blank image named bg-white.png (with GIMP, for example) and run:

ffmpeg \
  -re -loop 1 \
  -i bg-white.png \
  -vf drawtext="fontfile=monofonto.ttf: fontsize=96: box=1: boxcolor=black@0.75: boxborderw=5: fontcolor=white: x=(w-text_w)/2: y=((h-text_h)/2)+((h-text_h)/4): text='%{gmtime\:%H\\\\\:%M\\\\\:%S}'" \
  -r 25 \
  -vcodec libx264 \
  -f rtsp -rtsp_transport tcp rtsp://127.0.0.1:8554/mystream

Now you can use the ip_camera plugin with the url from the command: rtsp://127.0.0.1:8554/mystream

MinIO

Requirements

  1. Docker installed
  2. Docker compose installed

Description

MinIO is an object storage solution that provides an Amazon Web Services S3-compatible API and supports all core S3 features. MinIO is built to deploy anywhere - public or private cloud, baremetal infrastructure, orchestrated environments, and edge infrastructure.

Start with Docker compose

Execute:

./tools/infrastructure/scripts/install_infrastructure.bash \
  --tool=minio \
  --install-type=docker

Start natively

./tools/infrastructure/scripts/install_infrastructure.bash \
  --tool=minio \
  --install-type=native

How to use

This will start the MinIO API and dashboard (a GUI to see and interact with the data).

By accessing localhost:9001, you should be able to see this page:

MinIO

Username: minioadmin Password: minioadmin

Create an access key

Go to the Access Keys page and click on "Create access key"

MinIO Access Keys

Take note of the access key and secret key and add them to your launch file configuration later on.

Credentials

| User | Password | Port |
| --- | --- | --- |
| minioadmin | minioadmin | 9000, 9001 |

PostgreSQL

Description

PostgreSQL is a powerful, open source object-relational database system that uses and extends the SQL language combined with many features that safely store and scale the most complicated data workloads. The origins of PostgreSQL date back to 1986 as part of the POSTGRES project at the University of California at Berkeley and has more than 35 years of active development on the core platform.

Start with docker compose

Execute:

./tools/infrastructure/scripts/install_infrastructure.bash \
  --tool=postgresql \
  --install-type=docker

Start natively

Execute:

./tools/infrastructure/scripts/install_infrastructure.bash \
  --tool=postgresql \
  --install-type=native

Credentials

| User | Password | Database | Port |
| --- | --- | --- | --- |
| dc | password | dc | 5432 |

CLI tools

List plugins

You can list available plugins by running the CLI tool:

ros2 run dc_cli list_plugins --help


 Usage: list_plugins [OPTIONS] COMMAND [ARGS]...

╭─ Options ─────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ --install-completion          Install completion for the current shell.                                                   │
│ --show-completion             Show completion for the current shell, to copy it or customize the installation.            │
│ --help                        Show this message and exit.                                                                 │
╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Commands ────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ by-package       List plugins of a pluginlib file with their descriptions, by package name and filename.                  │
│ by-path          List plugins of a pluginlib file with their descriptions, by their path.                                 │
│ conditions       List condition plugins with their descriptions.                                                          │
│ destinations     List destination plugins with their descriptions.                                                        │
│ measurements     List measurement plugins with their descriptions.                                                        │
╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯

Requirements

Requirements are tracked using Strictdoc.

The goal is to list tasks and derive test-based requirements.

Future work and Roadmap

DC is still in early development and requires:

  1. High test coverage
  2. Automation: docker image build, PR and commit builds
  3. More measurement plugins
  4. More destination plugins
  5. Guide for the backend developer: I would like DC to be simple enough that a web developer can run it, get the data and work on their backend application
  6. Guide for the ROS developer: I would like DC to be simple enough that a ROS developer can collect data without needing database-management expertise
  7. Handle python destination plugins
  8. More use cases showcased in demos

Participating

TLDR

| Event | What to do |
| --- | --- |
| Want to contribute | Open a PR |
| Found a bug | File a ticket on GitHub Issues |
| Feature request | Describe what you want on GitHub Discussions |
| Want to start a discussion | Start one on GitHub Discussions |
| Want to follow ongoing development | Take a look at the GitHub Project and what is being worked on |

Contributing

Feature requests

Since I want DC to be community driven, go to GitHub Discussions and start a discussion about a feature you want to see; users will be able to vote for it. The most requested features will receive more attention than others.

Found a bug?

If you find a problem, first search whether an issue already exists. If no related issue exists, you can open a new issue using the issue form.

General guidelines

You can contribute to the source code with Pull Requests, for example:

  • To fix a typo you found in the documentation.
  • To propose new documentation sections.
  • To fix an existing issue/bug.
    • Make sure to add tests.
  • To add a new feature.
    • Make sure to add tests.
    • Make sure to add documentation if it's relevant.

Setup environment

ROS

Follow the steps in the setup section to build your workspace and install dependencies.

Then, install the pre-commit hook:

pre-commit install

You are now ready to write code; the pre-commit hook will check each commit against the project standards.
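The hook runs the checks listed in the repository's .pre-commit-config.yaml. For readers unfamiliar with pre-commit, a minimal sketch of such a file looks like this (the hooks shown are illustrative; the repository's actual configuration may differ):

```yaml
# Illustrative .pre-commit-config.yaml; the project's real hook list may differ.
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.4.0
    hooks:
      - id: trailing-whitespace   # strip trailing whitespace
      - id: end-of-file-fixer     # ensure files end with a newline
```

You can also run all hooks manually on the whole tree with `pre-commit run --all-files`.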

Docs

To build the docs, install cargo:

sudo apt-get install cargo

Then install mdbook (the command line tool to create books with Markdown) and its plugins:

cargo install mdbook mdbook-admonish mdbook-linkcheck mdbook-mermaid

Start the doc locally with auto-refresh on edit:

export PATH=$PATH:$HOME/.cargo/bin
mdbook serve -p 8000 -n 0.0.0.0

And open localhost:8000

Now that you can see the documentation locally, open the doc folder of the repository and edit the Markdown files you need. More about mdbook can be found here

Tests

TODO...

FAQ

I can't find a measurement and/or destination I need

Even though measurements and destinations will keep being added, the current focus is on getting feedback, fixing bugs, improving documentation and reaching minimum test coverage.

Create a feature request in Github Discussions... or better, write your own plugin and open a Pull Request.

Can I use DC without Fluent Bit backend?

It is possible to use DC without Fluent Bit as a backend. There is an example with the RCL destination, which subscribes to a ROS topic and prints it to the console.

Still, a lot of work would be required to replace Fluent Bit while keeping similar features (backpressure handling, data persistence, and the available measurements and destinations).

How can I add a Fluent Bit plugin?

Currently, to my knowledge, it is only possible to load a plugin through the C API, which loads a shared library. Because of that, I expect more languages can be used, such as Rust; the only requirement is to use the bindings. For measurements, this is not a problem: we can write a node in any language that publishes a StringStamped message and use the measurement plugin with the same name.

Destination plugins can be written in C or Go. You can find examples in the fluent_bit_plugins package.

How can I use a Python API with Fluent Bit?

Adding a destination plugin that uses a Python library is not yet solved. We would welcome a solution, since many API libraries are available in this language.

Note for the courageous ones: I once tried to create Python-C bindings of Fluent Bit by creating .h files like in fluent-bit-go and using ctypesgen (https://github.com/ctypesgen/ctypesgen), like this:

ctypesgen \
  --allow-gnu-c \
  --output-language=py32 \
  ~/ws/fluent-bit/include/fluent-bit.h \
  -I./lib/msgpack-c/include/ \
  -I./lib/monkey/include \
  -I./build/lib/monkey/include/monkey \
  -I./lib/cfl/include \
  -I./lib/cfl/lib/xxhash \
  -I./include \
  -I./lib/flb_libco \
  -I./lib/c-ares-1.19.0/include \
  -I./lib/cmetrics/include \
  -I./lib/ctraces/include \
  -I./lib/ctraces/lib/mpack/src \
  -o fluent_bit.py

It creates a base for the bindings. While I suppose it is possible to generate them fully, I'm puzzled as to how the result would be loaded by Fluent Bit afterwards.
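For readers unfamiliar with the mechanism ctypesgen builds on, this is a minimal sketch of calling into a shared library with plain ctypes. It uses libc as a stand-in (the generated bindings would load libfluent-bit.so and declare its functions the same way):

```python
import ctypes
import ctypes.util

# Locate and load a shared library by name; libc stands in here for
# libfluent-bit.so, which the generated bindings would load instead.
libc = ctypes.CDLL(ctypes.util.find_library("c"))

# Declare the signature of an exported symbol before calling it,
# just as ctypesgen does for every function declared in fluent-bit.h.
libc.abs.argtypes = [ctypes.c_int]
libc.abs.restype = ctypes.c_int

print(libc.abs(-42))  # → 42
```

Generating such declarations is the easy half; the open question above is how Fluent Bit itself would discover and load a plugin written this way.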

Whatever you write, feel free to make a Pull Request and share your work!

My group data is not published on the group topic

This may happen for different reasons:

  1. The group node is not started. Make sure it is (ros2 node list); you will need to enable it in your launch file or pass group_node:=true when launching the bringup.
  2. Data is not being published on all topics it subscribes to (use ros2 topic echo on each to verify). Currently, if not all topics publish, there is no way to ignore the missing ones and still send a partially filled message (see here). Help is welcome here :)

About and Contact

About

Our team currently includes:

| Name             | GitHub ID | Current Role |
|------------------|-----------|--------------|
| David Bensoussan | Minipada  |              |

Contact

For any inquiry, please contact David (d.bensoussan@proton.me). If your inquiry relates to bugs or open-source feature requests, consider posting a ticket on our GitHub project. If your inquiry relates to configuration support or private feature development, reach out and we will be able to support you in your projects.