Skip to content
Kruno Golubić
Go back

Integrating Confluent's Kafka Platform with Memgraph for Efficient Data Management

Kruno Golubić

Note: This article was originally published on the Memgraph blog.

In today’s world, data sources are everywhere, constantly producing and demanding data. The flow of data is increasing exponentially, a trend clearly visible in the number of connected devices around us. The critical question then becomes: how do we effectively process and store this data? Here, the integration of Confluent’s Kafka platform with Memgraph offers a solution.

Memgraph, as an in-memory graph database, is exceptional in rapid data ingestion and processing. This capability is significantly enhanced when integrated with Kafka, particularly the Kafka platform provided by Confluent.

Kafka and Confluent Kafka: What’s the Difference?

Apache Kafka is an open-source stream-processing platform developed by the Apache Software Foundation, primarily focused on high-throughput, real-time data handling. Confluent Kafka, provided by Confluent, Inc., extends the basic Kafka platform with enterprise-ready features like Kafka Connect and Kafka Streams. While Apache Kafka lays the groundwork, Confluent Kafka builds upon it for enterprise-level data architecture.

Integrating Kafka Connect with Memgraph to Ingest IoT Data Streams

IoT devices continuously produce data streams. Kafka handles these efficiently, and when combined with Memgraph’s in-memory processing power, this enables rapid analysis and querying of data as it streams in. The combination opens up new horizons in IoT data management, making it possible to leverage real-time data for immediate decision-making and predictive analytics.

Setting Up the Environment

The orchestration involves key components of Confluent’s Kafka platform:

Setting Up Your Project Environment

First, create a folder for your project and create a docker-compose.yml file with the following content:

---
version: "3"
services:
  memgraph:
    image: "memgraph/memgraph-platform"
    hostname: memgraph
    ports:
      - "7687:7687"
      - "3000:3000"
      - "7444:7444"
    volumes:
      - mg_lib:/var/lib/memgraph
      - mg_log:/var/log/memgraph
      - mg_etc:/etc/memgraph

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:7.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.0-7.3.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/tmp/connect-plugins"
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:latest
        /etc/confluent/docker/run

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.3.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      PORT: 9021

volumes:
  mg_lib:
  mg_log:
  mg_etc:

Starting the Environment

docker compose up -d

Check the status:

docker compose ps

Configuring Memgraph as the Source for Kafka

Create a source.memgraph.json file:

{
  "name": "Neo4jSourceConnectorAVRO",
  "config": {
    "topic": "my-topic",
    "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.server.uri": "bolt://memgraph:7687",
    "neo4j.authentication.basic.username": "",
    "neo4j.authentication.basic.password": "",
    "neo4j.streaming.poll.interval.msecs": 5000,
    "neo4j.streaming.property": "timestamp",
    "neo4j.streaming.from": "LAST_COMMITTED",
    "neo4j.enforce.schema": true,
    "neo4j.source.query": "MATCH (sd:SensorData) RETURN sd.sensorId AS sensorId, sd.temperature AS temperature, sd.humidity AS humidity, sd.timestamp AS timestamp"
  }
}

Then run:

curl -X POST http://localhost:8083/connectors \
 -H "Content-Type:application/json" \
 -H "Accept:application/json" \
 -d @source.memgraph.json

Creating Sensor Data Nodes in Memgraph

Run these Cypher queries in Memgraph Lab:

CREATE (:SensorData {sensorId: 'S101', temperature: 22.5, humidity: 45, timestamp: localDateTime()});
CREATE (:SensorData {sensorId: 'S102', temperature: 24.1, humidity: 50, timestamp: localDateTime()});
CREATE (:SensorData {sensorId: 'S103', temperature: 23.3, humidity: 48, timestamp: localDateTime()});

Setting Up the Sink Instance

Create a sink.memgraph.json file:

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "my-topic",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://memgraph:7687",
    "neo4j.authentication.basic.username": "",
    "neo4j.authentication.basic.password": "",
    "neo4j.topic.cypher.my-topic": "MERGE (s:SensorData {sensorId: event.sensorId}) ON CREATE SET s.temperature = event.temperature, s.humidity = event.humidity, s.timestamp = event.timestamp ON MATCH SET s.temperature = event.temperature, s.humidity = event.humidity, s.timestamp = event.timestamp"
  }
}

Then run:

curl -X POST http://localhost:8083/connectors \
 -H "Content-Type:application/json" \
 -H "Accept:application/json" \
 -d @sink.memgraph.json

Conclusion

The combination of Kafka’s data streaming capabilities with Memgraph’s rapid in-memory processing allows for not just effective data capture and storage, but also for real-time data analysis. This integration transforms raw data into intelligence, enabling real-time decision-making and enhancing the overall responsiveness and performance of IoT systems.


Share this post on:

Previous Post
Memgraph vs. Amazon Neptune: A Graph Database Comparison
Next Post
Optimizing Graph Databases through Denormalization