Note: This article was originally published on the Memgraph blog.
Data sources are everywhere, constantly producing and consuming data, and the volume keeps growing with every connected device around us. The critical question becomes: how do we process and store this data effectively? This is where the integration of Confluent’s Kafka platform with Memgraph offers a solution.
Memgraph, as an in-memory graph database, excels at rapid data ingestion and processing. That capability is amplified when it is paired with Kafka, particularly the Kafka platform provided by Confluent.
Kafka and Confluent Kafka: What’s the Difference?
Apache Kafka is an open-source stream-processing platform developed by the Apache Software Foundation, focused on high-throughput, real-time data handling. It already ships with components such as Kafka Connect and Kafka Streams. Confluent’s platform, provided by Confluent, Inc., builds on Apache Kafka and adds enterprise-ready features such as the Schema Registry, Control Center, and a catalog of pre-built connectors. While Apache Kafka lays the groundwork, Confluent extends it for enterprise-level data architecture.
Integrating Kafka Connect with Memgraph to Ingest IoT Data Streams
IoT devices continuously produce data streams. Kafka handles these efficiently, and when combined with Memgraph’s in-memory processing power, this enables rapid analysis and querying of data as it streams in. The combination opens up new horizons in IoT data management, making it possible to leverage real-time data for immediate decision-making and predictive analytics.
Setting Up the Environment
The orchestration involves key components of Confluent’s Kafka platform:
- Zookeeper: Handles configuration management and synchronization across the Kafka ecosystem
- Kafka Broker: The core message broker facilitating messaging and streaming
- Schema Registry: Manages the schemas of the data flowing through Kafka
- Kafka Connect: Bridges Kafka with external systems, including Memgraph
Setting Up Your Project Environment
First, create a folder for your project and, inside it, a docker-compose.yml file with the following content:
---
version: "3"
services:
  memgraph:
    image: "memgraph/memgraph-platform"
    hostname: memgraph
    ports:
      - "7687:7687"
      - "3000:3000"
      - "7444:7444"
    volumes:
      - mg_lib:/var/lib/memgraph
      - mg_log:/var/log/memgraph
      - mg_etc:/etc/memgraph
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-server:7.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.0-7.3.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/tmp/connect-plugins"
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:latest
        /etc/confluent/docker/run
  control-center:
    image: confluentinc/cp-enterprise-control-center:7.3.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      PORT: 9021
volumes:
  mg_lib:
  mg_log:
  mg_etc:
Starting the Environment
From the project folder, start all services in the background:
docker compose up -d
Check the status:
docker compose ps
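The connect container installs the Neo4j connector plugin on startup, which can take a minute or two. Before moving on, it is worth confirming that Kafka Connect’s REST API is up and the plugin is loaded. A minimal check using the standard Connect REST endpoint (piping through jq is optional and assumes you have it installed):

# List the connector plugins Kafka Connect has discovered.
# The Neo4j source and sink connector classes should appear in the output.
curl -s http://localhost:8083/connector-plugins | jq '.[].class'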
Configuring Memgraph as the Source for Kafka
Memgraph speaks the Bolt protocol, so the Neo4j connector for Kafka Connect can be pointed straight at it. Create a source.memgraph.json file:
{
  "name": "Neo4jSourceConnectorAVRO",
  "config": {
    "topic": "my-topic",
    "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.server.uri": "bolt://memgraph:7687",
    "neo4j.authentication.basic.username": "",
    "neo4j.authentication.basic.password": "",
    "neo4j.streaming.poll.interval.msecs": 5000,
    "neo4j.streaming.property": "timestamp",
    "neo4j.streaming.from": "LAST_COMMITTED",
    "neo4j.enforce.schema": true,
    "neo4j.source.query": "MATCH (sd:SensorData) RETURN sd.sensorId AS sensorId, sd.temperature AS temperature, sd.humidity AS humidity, sd.timestamp AS timestamp"
  }
}
Then run:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type:application/json" \
-H "Accept:application/json" \
-d @source.memgraph.json
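If the request succeeds, Kafka Connect echoes back the connector configuration. To confirm the connector and its task actually reached the RUNNING state (rather than failing on startup), query the standard Connect status endpoint with the connector name from source.memgraph.json:

# Check the state of the source connector and its task.
# Both should report "RUNNING"; a FAILED state includes a stack trace.
curl -s http://localhost:8083/connectors/Neo4jSourceConnectorAVRO/status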
Creating Sensor Data Nodes in Memgraph
Run these Cypher queries in Memgraph Lab (available at http://localhost:3000 once the stack is up):
CREATE (:SensorData {sensorId: 'S101', temperature: 22.5, humidity: 45, timestamp: localDateTime()});
CREATE (:SensorData {sensorId: 'S102', temperature: 24.1, humidity: 50, timestamp: localDateTime()});
CREATE (:SensorData {sensorId: 'S103', temperature: 23.3, humidity: 48, timestamp: localDateTime()});
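Within the connector’s five-second polling interval, these nodes should be published to my-topic as Avro records. One way to see them (a sketch that relies on the kafka-avro-console-consumer tool bundled in the schema-registry container) is:

# Consume the Avro messages produced by the source connector.
# Stop with Ctrl+C once the three sensor readings appear.
docker exec schema-registry kafka-avro-console-consumer \
  --bootstrap-server broker:29092 \
  --topic my-topic \
  --from-beginning \
  --property schema.registry.url=http://schema-registry:8081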
Setting Up the Sink Instance
Create a sink.memgraph.json file:
{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "my-topic",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://memgraph:7687",
    "neo4j.authentication.basic.username": "",
    "neo4j.authentication.basic.password": "",
    "neo4j.topic.cypher.my-topic": "MERGE (s:SensorData {sensorId: event.sensorId}) ON CREATE SET s.temperature = event.temperature, s.humidity = event.humidity, s.timestamp = event.timestamp ON MATCH SET s.temperature = event.temperature, s.humidity = event.humidity, s.timestamp = event.timestamp"
  }
}
Then run:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type:application/json" \
-H "Accept:application/json" \
-d @sink.memgraph.json
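To close the loop, check that the sink connector is running and that records are being written back into Memgraph. The sketch below assumes mgconsole is available inside the memgraph container, as it is in the memgraph-platform image:

# Verify the sink connector and its task report RUNNING.
curl -s http://localhost:8083/connectors/Neo4jSinkConnectorAVRO/status

# Query Memgraph to confirm the sink merged the SensorData nodes.
echo "MATCH (s:SensorData) RETURN s.sensorId, s.temperature, s.humidity;" \
  | docker exec -i memgraph mgconsole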
Conclusion
Combining Kafka’s data-streaming capabilities with Memgraph’s rapid in-memory processing allows not just for effective data capture and storage, but also for real-time analysis. This integration turns raw data into intelligence, enabling real-time decision-making and improving the responsiveness and performance of IoT systems.