Integration: Neo4j

Use the Neo4j database with Haystack

Authors
Sergey Bondarenco

Overview

An integration of the Neo4j graph database with Haystack v2.0 by deepset. Neo4j's vector search index is used to store document embeddings and to perform dense retrieval.

The library lets you use Neo4j as a DocumentStore and implements the required Protocol methods. To get started, import the implementation from the neo4j_haystack package:

from neo4j_haystack import Neo4jDocumentStore

In addition to Neo4jDocumentStore, the library includes the following Haystack components, which can be used in a pipeline:

  • Neo4jEmbeddingRetriever is a typical retriever component that queries the vector index to find related Documents. It uses Neo4jDocumentStore to query embeddings.
  • Neo4jDynamicDocumentRetriever is also a retriever component, in the sense that it queries Documents in Neo4j. However, it is decoupled from Neo4jDocumentStore and can run an arbitrary Cypher query to extract documents. In practice, it can query Neo4j the same way Neo4jDocumentStore does, including vector search.

The neo4j-haystack library uses the Neo4j Python Driver and Cypher queries to interact with the Neo4j database, hiding the underlying complexity.

Neo4jDocumentStore stores Documents as graph nodes in Neo4j. Embeddings are stored as part of the node, but indexing and querying of vector embeddings using approximate nearest neighbor (ANN) search is managed by a dedicated Vector Index.

                                   +-----------------------------+
                                   |       Neo4j Database        |
                                   +-----------------------------+
                                   |                             |
                                   |      +----------------+     |
                                   |      |    Document    |     |
                write_documents    |      +----------------+     |
          +------------------------+----->|   properties   |     |
          |                        |      |                |     |
+---------+----------+             |      |   embedding    |     |
|                    |             |      +--------+-------+     |
| Neo4jDocumentStore |             |               |             |
|                    |             |               |index/query  |
+---------+----------+             |               |             |
          |                        |      +--------+--------+    |
          |                        |      |  Vector Index   |    |
          +----------------------->|      |                 |    |
               query_embeddings    |      | (for embedding) |    |
                                   |      +-----------------+    |
                                   |                             |
                                   +-----------------------------+

In the above diagram:

  • Document is a Neo4j node (with “Document” label)
  • properties are Document attributes stored as part of the node.
  • embedding is also a property of the Document node (shown separately in the diagram for clarity). It is a vector of type LIST[FLOAT].
  • Vector Index is where Neo4j indexes embeddings as soon as they are updated in Document nodes.
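
To make the index/query arrow in the diagram more concrete, here is a minimal pure-Python sketch of what a vector query conceptually returns: nodes ranked by cosine similarity to a query embedding. It is an illustration only. The names `cosine_similarity` and `query_nodes` are hypothetical, the search is brute force rather than ANN, and Neo4j may normalize scores differently.

```python
import math
from typing import Dict, List, Tuple


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two non-zero vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def query_nodes(nodes: List[Dict], query_embedding: List[float], top_k: int) -> List[Tuple[Dict, float]]:
    """Brute-force stand-in for a vector index query: score every node, keep the best top_k."""
    scored = [(node, cosine_similarity(node["embedding"], query_embedding)) for node in nodes]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_k]


# Toy "Document" nodes: plain dicts with properties and an embedding property
nodes = [
    {"id": "a", "content": "Paris", "embedding": [1.0, 0.0]},
    {"id": "b", "content": "Berlin", "embedding": [0.0, 1.0]},
    {"id": "c", "content": "Lyon", "embedding": [0.8, 0.6]},
]

top = query_nodes(nodes, query_embedding=[1.0, 0.0], top_k=2)
print([node["id"] for node, _score in top])  # ids of the two closest nodes
```

A real Vector Index avoids scoring every node by using an approximate search structure, which is why it is maintained separately from the node properties.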

Installation

neo4j-haystack can be installed like any other Python library, using pip:

pip install --upgrade pip # optional
pip install sentence-transformers # required in order to run pipeline examples given below
pip install neo4j-haystack

Usage

Once installed, you can use Neo4jDocumentStore like any other document store that supports embeddings.

from neo4j_haystack import Neo4jDocumentStore

document_store = Neo4jDocumentStore(
    url="bolt://localhost:7687",
    username="neo4j",
    password="passw0rd",
    database="neo4j",
    embedding_dim=384,
    embedding_field="embedding",
    index="document-embeddings", # The name of the Vector Index in Neo4j
    node_label="Document", # Providing a label to Neo4j nodes which store Documents
)

Assuming a list of documents is available, you can write (index) them to Neo4j, for example:

documents: List[Document] = ...
document_store.write_documents(documents)

The full list of parameters accepted by Neo4jDocumentStore can be found in the API documentation.
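
The embedding_dim value (384 above) has to match the length of the vectors produced by the embedding model. As a hedged sketch, the check below flags embeddings whose length differs from the configured dimension before anything is written. The helper `find_dimension_mismatches` is hypothetical and not part of neo4j-haystack, and plain lists stand in for real document embeddings.

```python
from typing import Dict, List, Optional, Sequence

EXPECTED_DIM = 384  # assumed to equal the embedding_dim passed to Neo4jDocumentStore


def find_dimension_mismatches(
    embeddings: Dict[str, Optional[Sequence[float]]], expected_dim: int
) -> List[str]:
    """Return the ids whose embedding is missing or has an unexpected length."""
    return [
        doc_id
        for doc_id, vector in embeddings.items()
        if vector is None or len(vector) != expected_dim
    ]


# Stand-in data: document id -> embedding
embeddings_by_id = {
    "doc-1": [0.0] * 384,
    "doc-2": [0.0] * 768,  # e.g. produced by a different model
    "doc-3": None,         # embedding was never computed
}

bad_ids = find_dimension_mismatches(embeddings_by_id, EXPECTED_DIM)
print(bad_ids)
```

With real Haystack documents, the input mapping could be built as `{doc.id: doc.embedding for doc in documents}`.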

Note that you need a running instance of the Neo4j database (the in-memory version of Neo4j is not supported). Several options are available:

  • Docker (other installation options are described in the Neo4j Operations Manual)
  • AuraDB - a fully managed Cloud Instance of Neo4j
  • Neo4j Desktop client application

The simplest way to start the database locally is with a Docker container:

docker run \
    --restart always \
    --publish=7474:7474 --publish=7687:7687 \
    --env NEO4J_AUTH=neo4j/passw0rd \
    neo4j:5.15.0
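
The container can take a few seconds to start accepting connections. The following sketch, using only the Python standard library, polls the Bolt port published above (7687) until a TCP connection succeeds. The helper name `wait_for_port` is hypothetical, and an open port is only a rough signal: it does not guarantee that the database has finished initializing.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False if timeout elapses first."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: wait a bit and retry
            time.sleep(interval)
    return False
```

For example, calling `wait_for_port("localhost", 7687)` before creating the Neo4jDocumentStore gives the container up to 30 seconds to come up.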

Retrieving documents

The Neo4jEmbeddingRetriever component retrieves documents from Neo4j by querying the vector index with an embedded query. Below is a pipeline that finds documents using a query embedding together with metadata filtering:

from typing import List

from haystack import Document, Pipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from neo4j_haystack import Neo4jEmbeddingRetriever, Neo4jDocumentStore

document_store = Neo4jDocumentStore(
    url="bolt://localhost:7687",
    username="neo4j",
    password="passw0rd",
    database="neo4j",
    embedding_dim=384,
    index="document-embeddings",
)
documents = [
    Document(content="My name is Morgan and I live in Paris.", meta={"release_date": "2018-12-09"}),
]

# Compute embeddings for the documents before writing them to Neo4j
document_embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
document_embedder.warm_up()
documents_with_embeddings = document_embedder.run(documents)

document_store.write_documents(documents_with_embeddings.get("documents"))

print(document_store.count_documents())

# Query pipeline: embed the question, then search the vector index
pipeline = Pipeline()
pipeline.add_component("text_embedder", SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"))
pipeline.add_component("retriever", Neo4jEmbeddingRetriever(document_store=document_store))
pipeline.connect("text_embedder.embedding", "retriever.query_embedding")

result = pipeline.run(
    data={
        "text_embedder": {"text": "What cities do people live in?"},
        "retriever": {
            "top_k": 5,
            "filters": {"field": "release_date", "operator": "==", "value": "2018-12-09"},
        },
    }
)

documents: List[Document] = result["retriever"]["documents"]
print(documents)
# Output:
# [Document(id=e765764ab700b231db1eeae208d6a59047b4b93712d1a9e379ae9599128ffdbd, content: 'My name is Morgan and I live in Paris.', meta: {'release_date': '2018-12-09'}, score: 0.8416308164596558)]
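
The filters value above is a single comparison. Haystack 2.x also defines logical filters that wrap several comparisons in a `conditions` list. The sketch below builds such a dictionary with two hypothetical helpers, `comparison` and `all_of`. It assumes the document store accepts logical filters, which is worth verifying in the neo4j-haystack documentation, and the `lang` field is invented for illustration.

```python
from typing import Any, Dict


def comparison(field: str, operator: str, value: Any) -> Dict[str, Any]:
    """Build a single comparison filter, e.g. release_date == '2018-12-09'."""
    return {"field": field, "operator": operator, "value": value}


def all_of(*conditions: Dict[str, Any]) -> Dict[str, Any]:
    """Combine filters so that every condition must match (logical AND)."""
    return {"operator": "AND", "conditions": list(conditions)}


filters = all_of(
    comparison("release_date", "==", "2018-12-09"),
    comparison("lang", "in", ["en", "fr"]),  # "lang" is a hypothetical meta field
)
print(filters)
```

The resulting dictionary could then be passed as the `filters` value of the retriever in `pipeline.run`.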

Retrieving documents using Cypher

Neo4jDynamicDocumentRetriever is a flexible retriever component that runs a Cypher query to obtain documents. The Neo4jEmbeddingRetriever example above can be rewritten without using Neo4jDocumentStore:

from typing import List

from haystack import Document, Pipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder

from neo4j_haystack import Neo4jClientConfig, Neo4jDynamicDocumentRetriever

client_config = Neo4jClientConfig(
    url="bolt://localhost:7687",
    username="neo4j",
    password="passw0rd",
    database="neo4j",
)

cypher_query = """
            CALL db.index.vector.queryNodes($index, $top_k, $query_embedding)
            YIELD node as doc, score
            MATCH (doc) WHERE doc.release_date = $release_date
            RETURN doc{.*, score}, score
            ORDER BY score DESC LIMIT $top_k
        """

embedder = SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
retriever = Neo4jDynamicDocumentRetriever(
    client_config=client_config, runtime_parameters=["query_embedding"], doc_node_name="doc"
)

pipeline = Pipeline()
pipeline.add_component("text_embedder", embedder)
pipeline.add_component("retriever", retriever)
pipeline.connect("text_embedder.embedding", "retriever.query_embedding")

result = pipeline.run(
    data={
        "text_embedder": {"text": "What cities do people live in?"},
        "retriever": {
            "query": cypher_query,
            "parameters": {"index": "document-embeddings", "top_k": 5, "release_date": "2018-12-09"},
        },
    }
)

documents: List[Document] = result["retriever"]["documents"]

Note how query parameters are used in the cypher_query:

  • runtime_parameters is a list of parameter names that become input slots when connecting components in a pipeline. In this case, the query_embedding input is connected to the text_embedder.embedding output.
  • pipeline.run passes additional parameters to the retriever component, which can be referenced in the cypher_query, e.g. top_k.
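
The two kinds of parameters described above end up in one set of values that the Cypher query can reference with `$name`. The following sketch models that merge and checks that every placeholder in the query has a value. It is a conceptual illustration, not the library's implementation, and `referenced_parameters` and `resolve_parameters` are hypothetical helpers.

```python
import re
from typing import Any, Dict, List, Optional, Set


def referenced_parameters(cypher_query: str) -> Set[str]:
    """Collect the names of all $placeholders used in a Cypher query."""
    return set(re.findall(r"\$(\w+)", cypher_query))


def resolve_parameters(
    parameters: Optional[Dict[str, Any]], runtime_parameters: List[str], **runtime_inputs: Any
) -> Dict[str, Any]:
    """Merge parameters given at run time with values arriving through pipeline input slots."""
    missing = [name for name in runtime_parameters if name not in runtime_inputs]
    if missing:
        raise ValueError(f"Missing runtime parameters: {missing}")
    resolved = dict(parameters or {})
    resolved.update({name: runtime_inputs[name] for name in runtime_parameters})
    return resolved


query = """
CALL db.index.vector.queryNodes($index, $top_k, $query_embedding)
YIELD node AS doc, score
MATCH (doc) WHERE doc.release_date = $release_date
RETURN doc{.*, score}, score
ORDER BY score DESC LIMIT $top_k
"""

resolved = resolve_parameters(
    {"index": "document-embeddings", "top_k": 5, "release_date": "2018-12-09"},
    ["query_embedding"],
    query_embedding=[0.1, 0.2, 0.3],  # would come from text_embedder.embedding
)

# Placeholders in the query that have no value yet (expected to be empty here)
unresolved = referenced_parameters(query) - set(resolved)
print(sorted(resolved), unresolved)
```

A check like this can catch a misspelled parameter name before the query is sent to the database.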

More examples

You can find more examples in the implementation repository.

More technical details are available in the Code Reference documentation. For example, real-world scenarios may require tuning connection settings for the Neo4j database (e.g. request timeout). Neo4jDocumentStore accepts an extended client configuration through the Neo4jClientConfig class.

License

neo4j-haystack is distributed under the terms of the MIT license.