Integration: Ray
Run and scale Haystack Pipelines with Ray in distributed manner
Table of Contents
Overview
ray-haystack
is a python package which allows running
Haystack pipelines on
Ray
in a distributed manner. The package provides the same API to build and run Haystack pipelines, but under the hood, components are being distributed to remote nodes for execution using Ray primitives.
Specifically,
Ray Actor is created for each component in a pipeline to run
its logic.
The purpose of this library is to showcase the ability to run Haystack in a distributed setup with Ray featuring its options to configure the payload, e.g:
- Control with resources how much CPU/GPU is needed for a component to run (per each component if needed)
- Manage environment dependencies for components to run on dedicated machines.
- Run pipeline on Kubernetes using KubeRay
Most of the time, you will run Haystack pipelines on your local environment; even in production, you will want to run the pipeline on a single node if the goal is to return a response quickly to the user without the overhead you would usually get with a distributed setup. However, in the case of long running and complex RAG pipelines distributed way might help:
- Not every component needs GPU, most will use some external API calls. With Ray it should be possible to assign respective resource requirements (CPU, RAM) per component execution needs.
- Some components might take longer to run, so ideally, if there is an option to parallelize component execution, it would decrease pipeline run time.
- With asynchronous execution, it should be possible to interact with different component execution stages (e.g. fire an event before and after the component starts).
ray-haystack
provides a custom implementation for pipeline execution logic with the goal to stay as compliant as possible with native Haystack implementation.
In most cases, you should expect the same results (outputs) from pipeline runs. On top of that, the package will parallelize component runs where possible.
Components with no active dependencies can be scheduled without waiting for currently running components.
Installation
ray-haystack
can be installed as any other Python library, using pip:
pip install ray-haystack
The package should work with python version 3.8 and onwards. If you plan to use ray-haystack
with an existing Ray cluster, make sure you align python and ray
versions with those running in the cluster.
Note The
ray-haystack
package will install bothhaystack-ai
andray
as transitive dependencies. The minimum supported version of haystack is2.6.0
.
If you would like to see Ray dashboard when starting Ray cluster locally, install Ray as follows:
pip install -U "ray[default]"
pip install ray-haystack
While pipeline is running locally, access the dashboard in the browser at http://localhost:8265.
Usage
Start with an example
Once ray-haystack
is installed, let’s demonstrate how it works by running a simple example.
We will build a pipeline that fetches RSS news headlines from the list of given URLs and converts each headline to a Document
with content equal to the headline title. We then ask LLM (OpenAIGenerator
) to create a news summary from the list of converted Documents, given a prompt template
.
import io
import os
from typing import List, Optional
from xml.etree.ElementTree import parse as parse_xml
import ray # Import ray
from haystack import Document, component
from haystack.components.builders import PromptBuilder
from haystack.components.fetchers import LinkContentFetcher
from haystack.components.generators import OpenAIGenerator
from haystack.components.joiners import DocumentJoiner
from haystack.dataclasses import ByteStream
from ray_haystack import RayPipeline # Import RayPipeline (instead of `from haystack import Pipeline`)
# Please introduce your OpenAI Key here
os.environ["OPENAI_API_KEY"] = "You OpenAI Key"
@component
class XmlConverter:
"""
Custom component which parses given RSS feed (from ByteStream) and extracts values by a
given XPath, e.g. ".//channel/item/title" will find "title" for each RSS feed item.
A Document is created for each extracted title. The `category` attribute can be used as
an additional metadata field.
"""
def __init__(self, xpath: str = ".//channel/item/title", category: Optional[str] = None):
self.xpath = xpath
self.category = category
@component.output_types(documents=List[Document])
def run(self, sources: List[ByteStream]):
documents: List[Document] = []
for source in sources:
xml_content = io.StringIO(source.to_string())
documents.extend(
Document(content=elem.text, meta={"category": self.category})
for elem in parse_xml(xml_content).findall(self.xpath) # noqa: S314
if elem.text
)
return {"documents": documents}
template = """
Given news headlines below provide a summary of what is happening in the world right now in a couple of sentences.
You will be given headline titles in the following format: "<headline category>: <headline title>".
When creating summary pay attention to common news headlines as those could be most insightful.
HEADLINES:
{% for document in documents %}
{{ document.meta["category"] }}: {{ document.content }}
{% endfor %}
SUMMARY:
"""
# Create instance of Ray pipeline
pipeline = RayPipeline()
pipeline.add_component("tech-news-fetcher", LinkContentFetcher())
pipeline.add_component("business-news-fetcher", LinkContentFetcher())
pipeline.add_component("politics-news-fetcher", LinkContentFetcher())
pipeline.add_component("tech-xml-converter", XmlConverter(category="tech"))
pipeline.add_component("business-xml-converter", XmlConverter(category="business"))
pipeline.add_component("politics-xml-converter", XmlConverter(category="politics"))
pipeline.add_component("document_joiner", DocumentJoiner(sort_by_score=False))
pipeline.add_component("prompt_builder", PromptBuilder(template=template))
pipeline.add_component("generator", OpenAIGenerator()) # "gpt-4o-mini" is the default model
pipeline.connect("tech-news-fetcher", "tech-xml-converter.sources")
pipeline.connect("business-news-fetcher", "business-xml-converter.sources")
pipeline.connect("politics-news-fetcher", "politics-xml-converter.sources")
pipeline.connect("tech-xml-converter", "document_joiner")
pipeline.connect("business-xml-converter", "document_joiner")
pipeline.connect("politics-xml-converter", "document_joiner")
pipeline.connect("document_joiner", "prompt_builder")
pipeline.connect("prompt_builder", "generator.prompt")
# Draw pipeline and save it to `pipe.png`
# pipeline.draw("pipe.png")
# Start local Ray cluster
ray.init()
# Prepare pipeline inputs by specifying RSS urls for each fetcher
pipeline_inputs = {
"tech-news-fetcher": {
"urls": [
"https://www.theverge.com/rss/frontpage/",
"https://techcrunch.com/feed",
"https://cnet.com/rss/news",
"https://wired.com/feed/rss",
]
},
"business-news-fetcher": {
"urls": [
"https://search.cnbc.com/rs/search/combinedcms/view.xml?partnerId=wrss01&id=10001147",
"https://www.business-standard.com/rss/home_page_top_stories.rss",
"https://feeds.a.dj.com/rss/WSJcomUSBusiness.xml",
]
},
"politics-news-fetcher": {
"urls": [
"https://search.cnbc.com/rs/search/combinedcms/view.xml?partnerId=wrss01&id=10000113",
"https://rss.nytimes.com/services/xml/rss/nyt/Politics.xml",
]
},
}
# Run pipeline with inputs
result = pipeline.run(pipeline_inputs)
# Print response from LLM
print("RESULT: ", result["generator"]["replies"][0])
Key takeways from the example:
- import
ray
module - import
RayPipeline
(fromray_haystack
) instead ofPipeline
class fromhaystack
- before running the pipeline, start
local ray cluster with explicit
ray.init()
call
Under the hood, RyaPipeline
creates actors for each component in the pipeline and runs it in a distributed manner until no components are left to run. By default, RyaPipeline
blocks until the pipeline finishes its execution.
Read pipeline events
In some cases, you would want to react asynchronously to particular pipeline execution points:
- when pipeline starts
- before component runs
- after component finishes
- after pipeline finishes
Internally, RayPipeline
creates an instance of
Ray Queue where such events are stored and can be consumed.
Except for the standard run
method, RayPipeline
provides a method called run_nowait
, which returns pipeline execution results without blocking current logic. We can use run_nowait
to iterate over pipeline events, e.g.
result = pipeline.run_nowait(pipeline_inputs)
for pipeline_event in result.pipeline_events_sync():
print(
f"\n>>> [{pipeline_event.time}] Source: {pipeline_event.source} | Type: {pipeline_event.type} | Data={pipeline_event.data}"
)
Component Serialization
If you run native Haystack pipeline locally, the components remain in the same python process, and there is no reason to care about the distributed setup. When RayPipeline runs in a distributed manner, it should be able to serialize components before they end up in a remote task or actor.
The ray-haystack
package relies on Haystack’s standard to_dict
and from_dict
methods to serialize and deserialize components, respectively.
Please refer to the main
documentation for a detailed explanation and features that handle edge cases.
DocumentStore with Ray
When you use
InMemoryDocumentStore or any DocumentStore which runs in memory with RayPipeline
, you will stumble upon an apparent issue: in a distributed environment, these document stores running in memory will fail to operate as components that reference the store will not point to a single instance but rather a copy of it.
ray-haystack
package provides a wrapper around InMemoryDocumentStore
by implementing a proxy pattern so that only a single instance of InMemoryDocumentStore
across Ray cluster is present. With that, components can share a single store. Use RayInMemoryDocumentStore
, RayInMemoryEmbeddingRetriever
or RayInMemoryBM25Retriever
in case you need in-memory document store in your Ray pipelines.
RayPipeline Settings
When an actor is created in Ray, we can control its behavior by providing certain settings.
ray-haystack
provides means to configure pipeline Actors with the help of RayPipelineSettings
dictionary:
from typing import Any, Dict
from ray_haystack import RayPipeline, RayPipelineSettings
settings: RayPipelineSettings = {
"common": {
"actor_options": {
"namespace": "haystack", # common namespace name for all actors
}
},
"components": {
"per_component": {
"generator": {
"actor_options": {
"num_cpus": 2, # component specific CPU resource requirement
}
}
}
},
}
# Option 1 - Pass settings through pipeline's metadata
pipeline = RayPipeline(metadata={"ray": settings})
pipeline_inputs: Dict[str, Any] = {}
# Option 2 - Pass settings when in the `run` method
pipeline.run(pipeline_inputs, ray_settings=settings)
Middleware
Sometimes it might be useful to let custom logic run before and after component actor runs the component.
It is possible to build custom middleware:
from typing import Any, Literal
import ray
from haystack.components.fetchers import LinkContentFetcher
from ray_haystack import RayPipeline, RayPipelineSettings
from ray_haystack.middleware import ComponentMiddleware, ComponentMiddlewareContext
from ray_haystack.serialization import worker_asset
ray.init()
@worker_asset
class TraceMiddleware(ComponentMiddleware):
def __init__(self, capture: Literal["input", "output", "input_and_output"] = "input_and_output"):
self.capture = capture
def __call__(self, component_input, ctx: ComponentMiddlewareContext) -> Any:
print(f"Tracer: Before running component '{ctx['component_name']}' with inputs: '{component_input}'")
outputs = self.next(component_input, ctx)
print(f"Tracer: After running component '{ctx['component_name']}' with outputs: '{outputs}'")
return outputs
pipeline = RayPipeline()
pipeline.add_component("cocktail_fetcher", LinkContentFetcher())
settings: RayPipelineSettings = {
"components": {
"per_component": {
# Middleware applies only to "cocktail_fetcher" component
"cocktail_fetcher": {
"middleware": {
"trace": {"type": "__main__.TraceMiddleware"},
},
},
}
},
}
response = pipeline.run(
{
"cocktail_fetcher": {"urls": ["https://www.thecocktaildb.com/api/json/v1/1/random.php"]},
},
ray_settings=settings,
)
Resources
- The full documentation is available in the repository
- Explore more advanced examples:
- Learn more about Ray
License
ray-haystack
is distributed under the terms of the
MIT license.