Vector Databases

Below is a comparison of popular vector search solutions:

| Tool     | Free Tier      | Pricing Model            | Docs                                           |
| -------- | -------------- | ------------------------ | ---------------------------------------------- |
| FAISS    | N/A, self-host | Open-source              | https://github.com/facebookresearch/faiss/wiki |
| Pinecone | 2GB free       | From $25/mo              | https://docs.pinecone.io                       |
| Qdrant   | 1GB free cloud | Pay-as-you-go            | https://qdrant.tech/documentation              |
| Weaviate | 14-day sandbox | From $25/mo              | https://weaviate.io/developers/weaviate        |
| Milvus   | 5GB free cloud | PAYG or $99/mo dedicated | https://milvus.io/docs                         |
| Chroma   | N/A, self-host | Free (Apache 2.0)        | https://docs.trychroma.com                     |
| Redis    | 30MB free      | From $5/mo               | https://redis.io/docs                          |


Example Code

Below are basic usage snippets for each tool.

1. FAISS

FAISS is primarily a C++ library with Python bindings, optimized for raw performance. It's typically used for in-memory indexing in self-hosted scenarios; direct usage from Node.js/TypeScript is uncommon without a dedicated server or a WASM build.

# Requires: pip install faiss-cpu # or faiss-gpu
import faiss
import numpy as np
from typing import Tuple

def create_faiss_index(dimension: int) -> faiss.Index:
    """Creates a simple FAISS index."""
    # Example: Flat L2 index
    index = faiss.IndexFlatL2(dimension)
    return index

def add_to_faiss_index(index: faiss.Index, vectors: np.ndarray):
    """Adds vectors to the FAISS index."""
    # Ensure vectors are float32
    if vectors.dtype != 'float32':
        vectors = vectors.astype('float32')
    index.add(vectors)

def search_faiss_index(index: faiss.Index, query_vector: np.ndarray, top_k: int = 5) -> Tuple[np.ndarray, np.ndarray]:
    """Searches the FAISS index."""
    if query_vector.dtype != 'float32':
        query_vector = query_vector.astype('float32')
    # Ensure query is 2D array
    if query_vector.ndim == 1:
        query_vector = np.array([query_vector])
    distances, indices = index.search(query_vector, top_k)
    return distances, indices

# Example Usage:
# d = 128 # Dimensionality of embeddings
# index = create_faiss_index(d)
# print(f"Index created. Is trained: {index.is_trained}, Total vectors: {index.ntotal}")

# data_vectors = np.random.random((1000, d)).astype('float32')
# add_to_faiss_index(index, data_vectors)
# print(f"Added {data_vectors.shape[0]} vectors. Total vectors: {index.ntotal}")

# query = np.random.random((1, d)).astype('float32')
# D, I = search_faiss_index(index, query, top_k=5)

# print("Distances:", D)
# print("Neighbors:", I)

# In production, you would also add functions to:
# - save_index(index, filename) -> faiss.write_index(index, filename)
# - load_index(filename) -> faiss.read_index(filename)
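
A minimal sketch of those persistence helpers, using FAISS's built-in write_index/read_index (the filename is illustrative):

def save_faiss_index(index: faiss.Index, filename: str) -> None:
    """Persists a FAISS index to disk."""
    faiss.write_index(index, filename)

def load_faiss_index(filename: str) -> faiss.Index:
    """Loads a FAISS index from disk."""
    return faiss.read_index(filename)

# Example:
# save_faiss_index(index, "my_index.faiss")
# restored = load_faiss_index("my_index.faiss")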

2. Pinecone

# Requires: pip install pinecone-client  (the package is named "pinecone" in newer SDK releases)
import os
from pinecone import Pinecone, PodSpec

def init_pinecone() -> Pinecone | None:
    """Initializes Pinecone client."""
    api_key = os.environ.get("PINECONE_API_KEY")
    # environment = os.environ.get("PINECONE_ENVIRONMENT") # Legacy, use API key only
    if not api_key:
        print("Error: PINECONE_API_KEY not set.")
        return None
    try:
        # pc = Pinecone(api_key=api_key, environment=environment) # Legacy init
        pc = Pinecone(api_key=api_key)
        print("Pinecone initialized.")
        return pc
    except Exception as e:
        print(f"Error initializing Pinecone: {e}")
        return None

def create_pinecone_index_if_not_exists(pc: Pinecone, index_name: str, dimension: int, metric: str = 'cosine', environment: str = 'gcp-starter'):
    """Creates a Pinecone index if it doesn't exist."""
    if index_name not in pc.list_indexes().names():
        print(f"Creating index '{index_name}'...")
        try:
            pc.create_index(
                name=index_name,
                dimension=dimension,
                metric=metric,
                spec=PodSpec(environment=environment) # Specify environment here
            )
            print(f"Index '{index_name}' created.")
        except Exception as e:
            print(f"Error creating Pinecone index: {e}")
    else:
        print(f"Index '{index_name}' already exists.")

# Example Usage:
# pc = init_pinecone()
# if pc:
#     index_name = "my-brainyflow-index"
#     dimension = 128
#     create_pinecone_index_if_not_exists(pc, index_name, dimension)

#     # Connect to the index
#     try:
#         index = pc.Index(index_name)

#         # Upsert vectors
#         vectors_to_upsert = [
#             ("vec_id1", [0.1] * dimension, {"genre": "fiction"}), # With metadata
#             ("vec_id2", [0.2] * dimension, {"year": 2023})
#         ]
#         print(f"Upserting {len(vectors_to_upsert)} vectors...")
#         index.upsert(vectors=vectors_to_upsert)
#         print("Upsert complete.")

#         # Query
#         query_vector = [0.15] * dimension
#         print("Querying index...")
#         response = index.query(vector=query_vector, top_k=3, include_metadata=True)
#         print("Query response:", response)

#     except Exception as e:
#         print(f"Error interacting with Pinecone index: {e}")

3. Qdrant

# Requires: pip install qdrant-client
import os
import qdrant_client
from qdrant_client.http.models import Distance, VectorParams, PointStruct, CollectionStatus

def init_qdrant_client() -> qdrant_client.QdrantClient | None:
    """Initializes Qdrant client."""
    qdrant_url = os.environ.get("QDRANT_URL") # e.g., "http://localhost:6333" or cloud URL
    api_key = os.environ.get("QDRANT_API_KEY") # Optional, for cloud
    if not qdrant_url:
        print("Error: QDRANT_URL not set.")
        return None
    try:
        client = qdrant_client.QdrantClient(url=qdrant_url, api_key=api_key)
        print("Qdrant client initialized.")
        return client
    except Exception as e:
        print(f"Error initializing Qdrant client: {e}")
        return None

def create_qdrant_collection_if_not_exists(client: qdrant_client.QdrantClient, collection_name: str, dimension: int, distance_metric: Distance = Distance.COSINE):
    """Creates a Qdrant collection if it doesn't exist."""
    try:
        collections = client.get_collections().collections
        if not any(c.name == collection_name for c in collections):
            print(f"Creating collection '{collection_name}'...")
            client.create_collection(  # Safe here: we already checked it doesn't exist
                collection_name=collection_name,
                vectors_config=VectorParams(size=dimension, distance=distance_metric)
            )
            print(f"Collection '{collection_name}' created.")
        else:
            print(f"Collection '{collection_name}' already exists.")
    except Exception as e:
        print(f"Error creating or checking Qdrant collection: {e}")


# Example Usage:
# client = init_qdrant_client()
# if client:
#     collection_name = "brainyflow_qdrant_demo"
#     dimension = 128
#     create_qdrant_collection_if_not_exists(client, collection_name, dimension)

#     try:
#         # Upsert points
#         points_to_upsert = [
#             # Use UUIDs or sequential integers for IDs
#             PointStruct(id=1, vector=[0.1] * dimension, payload={"type": "doc1", "source": "fileA.txt"}),
#             PointStruct(id=2, vector=[0.2] * dimension, payload={"type": "doc2", "source": "fileB.txt"}),
#         ]
#         print(f"Upserting {len(points_to_upsert)} points...")
#         # Use wait=True for confirmation, especially in scripts
#         client.upsert(collection_name=collection_name, points=points_to_upsert, wait=True)
#         print("Upsert complete.")

#         # Search
#         query_vector = [0.15] * dimension
#         print("Searching collection...")
#         search_result = client.search(
#             collection_name=collection_name,
#             query_vector=query_vector,
#             limit=2 # Number of results to return
#         )
#         print("Search results:", search_result)

#     except Exception as e:
#         print(f"Error interacting with Qdrant collection: {e}")

4. Weaviate

# Requires: pip install weaviate-client
import os
import weaviate
import weaviate.classes as wvc # New import style

def init_weaviate_client() -> weaviate.WeaviateClient | None:
    """Initializes Weaviate client."""
    weaviate_url = os.environ.get("WEAVIATE_URL") # e.g., "http://localhost:8080" or cloud URL
    api_key = os.environ.get("WEAVIATE_API_KEY") # Optional, for cloud/auth
    if not weaviate_url:
        print("Error: WEAVIATE_URL not set.")
        return None
    try:
        auth_config = wvc.init.Auth.api_key(api_key) if api_key else None
        # Parse host/port from the URL; v4's connect_to_custom also requires gRPC details
        secure = weaviate_url.startswith("https")
        host_port = weaviate_url.replace("https://", "").replace("http://", "")
        host = host_port.split(":")[0]
        port = int(host_port.split(":")[1]) if ":" in host_port else (443 if secure else 8080)
        client = weaviate.connect_to_custom(
            http_host=host,
            http_port=port,
            http_secure=secure,
            grpc_host=host,    # assumes gRPC is served from the same host
            grpc_port=50051,   # default Weaviate gRPC port
            grpc_secure=secure,
            auth_credentials=auth_config
        )
        client.is_ready() # Check connection
        print("Weaviate client initialized and ready.")
        return client
    except Exception as e:
        print(f"Error initializing Weaviate client: {e}")
        return None

def create_weaviate_collection_if_not_exists(client: weaviate.WeaviateClient, class_name: str, dimension: int):
    """Creates a Weaviate collection (class) if it doesn't exist."""
    try:
        if not client.collections.exists(class_name):
            print(f"Creating collection '{class_name}'...")
            client.collections.create(
                name=class_name,
                vectorizer_config=wvc.config.Configure.Vectorizer.none(), # Explicitly disable vectorizer
                # Define properties if needed
                properties=[
                    wvc.config.Property(name="title", data_type=wvc.config.DataType.TEXT),
                    wvc.config.Property(name="content", data_type=wvc.config.DataType.TEXT),
                ]
            )
            print(f"Collection '{class_name}' created.")
        else:
             print(f"Collection '{class_name}' already exists.")
    except Exception as e:
        print(f"Error creating or checking Weaviate collection: {e}")

# Example Usage:
# client = init_weaviate_client()
# if client:
#     collection_name = "BrainyflowArticle" # Class names must be capitalized
#     dimension = 128
#     create_weaviate_collection_if_not_exists(client, collection_name, dimension)

#     try:
#         articles = client.collections.get(collection_name)

#         # Insert data object with vector
#         properties = {
#             "title": "Hello Weaviate",
#             "content": "This is an example document."
#         }
#         vector = [0.1] * dimension
#         print("Inserting object...")
#         uuid = articles.data.insert(properties=properties, vector=vector)
#         print(f"Object inserted with UUID: {uuid}")

#         # Query using nearVector
#         query_vector = [0.15] * dimension
#         print("Querying collection...")
#         response = articles.query.near_vector(
#             near_vector=query_vector,
#             limit=3,
#             return_properties=["title", "content"], # Specify properties to return
#             return_metadata=wvc.query.MetadataQuery(distance=True) # Include distance
#         )

#         print("Query results:")
#         for o in response.objects:
#             print(f" - Title: {o.properties['title']}, Distance: {o.metadata.distance}")

#     except Exception as e:
#         print(f"Error interacting with Weaviate collection: {e}")
#     finally:
#         client.close() # Close the connection
#         print("Weaviate client closed.")

5. Milvus

# Requires: pip install pymilvus numpy
import os
from pymilvus import connections, utility, FieldSchema, CollectionSchema, DataType, Collection
import numpy as np

def connect_milvus():
    """Connects to Milvus."""
    milvus_uri = os.environ.get("MILVUS_URI", "http://localhost:19530") # Or Zilliz Cloud URI
    token = os.environ.get("MILVUS_TOKEN") # For Zilliz Cloud or authenticated Milvus
    alias = "default"
    try:
        print(f"Connecting to Milvus at {milvus_uri}...")
        connections.connect(alias=alias, uri=milvus_uri, token=token)
        print("Milvus connected.")
    except Exception as e:
        print(f"Error connecting to Milvus: {e}")

def create_milvus_collection_if_not_exists(collection_name: str, dimension: int):
    """Creates a Milvus collection if it doesn't exist."""
    alias = "default"
    if not utility.has_collection(collection_name, using=alias):
        print(f"Creating collection '{collection_name}'...")
        # Define fields: Primary key and embedding vector
        fields = [
            FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=True), # Auto-incrementing ID
            FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dimension)
        ]
        schema = CollectionSchema(fields, description="BrainyFlow Milvus Demo")
        collection = Collection(name=collection_name, schema=schema, using=alias)
        print(f"Collection '{collection_name}' created.")
        # Create an index for the embedding field for efficient search
        index_params = {
            "metric_type": "L2", # Or "IP" for inner product
            "index_type": "IVF_FLAT", # Example index type
            "params": {"nlist": 1024} # Example parameter
        }
        collection.create_index(field_name="embeddings", index_params=index_params)
        print(f"Index created on 'embeddings' field.")
    else:
        print(f"Collection '{collection_name}' already exists.")
    # Load collection into memory for searching
    collection = Collection(collection_name, using=alias)
    if collection.is_empty:
        print(f"Collection '{collection_name}' is empty, loading skipped.")
    else:
        # load() is safe to call on an already-loaded collection
        print(f"Loading collection '{collection_name}'...")
        collection.load()
        print("Collection loaded.")


# Example Usage:
# connect_milvus()
# collection_name = "brainyflow_milvus_demo"
# dimension = 128
# create_milvus_collection_if_not_exists(collection_name, dimension)

# try:
#     collection = Collection(collection_name) # Get collection handle

#     # Insert data
#     num_vectors = 100
#     vectors_to_insert = np.random.rand(num_vectors, dimension).astype('float32').tolist()
#     # Data format for pymilvus is a list of lists/tuples matching field order (excluding auto-id pk)
#     data = [vectors_to_insert]
#     print(f"Inserting {num_vectors} vectors...")
#     mr = collection.insert(data)
#     print(f"Insert result: {mr}")
#     collection.flush() # Ensure data is written
#     print(f"Data flushed. Entity count: {collection.num_entities}")


#     # Search
#     query_vector = np.random.rand(1, dimension).astype('float32').tolist()
#     search_params = {
#         "metric_type": "L2",
#         "params": {"nprobe": 10} # Search parameter for IVF_FLAT
#     }
#     print("Searching collection...")
#     results = collection.search(
#         data=query_vector,
#         anns_field="embeddings",
#         param=search_params,
#         limit=3 # Number of results
#     )
#     print("Search results:")
#     for hit in results[0]: # Results is a list of lists (one per query vector)
#         print(f" - ID: {hit.id}, Distance: {hit.distance}")

# except Exception as e:
#     print(f"Error interacting with Milvus collection: {e}")
# finally:
#     # Disconnect (optional, depends on application lifecycle)
#     # connections.disconnect("default")
#     # print("Milvus disconnected.")
#     pass
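
IVF_FLAT is only one index option; HNSW is a common alternative that trades memory for lower search latency. A minimal sketch of the equivalent index and search parameters (the values are illustrative):

# hnsw_index_params = {
#     "metric_type": "L2",
#     "index_type": "HNSW",
#     "params": {"M": 16, "efConstruction": 200}  # graph connectivity / build quality
# }
# collection.create_index(field_name="embeddings", index_params=hnsw_index_params)

# # At search time, HNSW uses "ef" instead of IVF's "nprobe":
# hnsw_search_params = {"metric_type": "L2", "params": {"ef": 64}}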

6. Chroma

# Requires: pip install chromadb
import chromadb
from chromadb.api import ClientAPI
from chromadb.api.models.Collection import Collection
# from chromadb.config import Settings  # Deprecated; pass settings directly to the client

def init_chroma_client(persist_directory: str = "./chroma_data_py") -> ClientAPI | None:
    """Initializes a persistent Chroma client."""
    try:
        # Persistent client stores data on disk
        client = chromadb.PersistentClient(path=persist_directory)
        # Or use in-memory client: client = chromadb.Client()
        print(f"Chroma client initialized (persistent path: {persist_directory}).")
        return client
    except Exception as e:
        print(f"Error initializing Chroma client: {e}")
        return None

def get_or_create_chroma_collection(client: ClientAPI, collection_name: str) -> Collection | None:
    """Gets or creates a Chroma collection."""
    try:
        collection = client.get_or_create_collection(collection_name)
        print(f"Using Chroma collection '{collection_name}'.")
        return collection
    except Exception as e:
        print(f"Error getting/creating Chroma collection: {e}")
        return None

# Example Usage:
# client = init_chroma_client()
# if client:
#     collection_name = "brainyflow_chroma_demo"
#     collection = get_or_create_chroma_collection(client, collection_name)

#     if collection:
#         try:
#             # Add data (embeddings, optional documents, optional metadata, required IDs)
#             dimension = 128
#             ids_to_add = ["item1", "item2", "item3"]
#             embeddings_to_add = [
#                 [0.1] * dimension,
#                 [0.2] * dimension,
#                 [0.9] * dimension # Different vector
#             ]
#             metadatas_to_add = [
#                 {"source": "docA", "page": 1},
#                 {"source": "docB", "page": 5},
#                 {"source": "docA", "page": 2},
#             ]
#             documents_to_add = [ # Optional, can store original text
#                 "This is the first document.",
#                 "This is the second text.",
#                 "A third piece of information."
#             ]

#             print(f"Adding/updating {len(ids_to_add)} items...")
#             collection.add( # Use add or upsert
#                 ids=ids_to_add,
#                 embeddings=embeddings_to_add,
#                 metadatas=metadatas_to_add,
#                 documents=documents_to_add
#             )
#             print("Add/update complete.")
#             print(f"Collection count: {collection.count()}")

#             # Query
#             query_embedding = [[0.15] * dimension] # Query expects list of embeddings
#             num_results = 2
#             print("Querying collection...")
#             results = collection.query(
#                 query_embeddings=query_embedding,
#                 n_results=num_results,
#                 include=['metadatas', 'documents', 'distances'] # Specify what to include
#             )
#             print("Query results:", results)

#         except Exception as e:
#             print(f"Error interacting with Chroma collection: {e}")

7. Redis

# Requires: pip install redis numpy
import os
import redis
import numpy as np
from redis.commands.search.field import VectorField, TagField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import Query

def init_redis_client() -> redis.Redis | None:
    """Initializes Redis client."""
    redis_host = os.environ.get("REDIS_HOST", "localhost")
    redis_port = int(os.environ.get("REDIS_PORT", 6379))
    redis_password = os.environ.get("REDIS_PASSWORD") # Optional
    try:
        # Ensure decoding responses for easier handling
        client = redis.Redis(host=redis_host, port=redis_port, password=redis_password, decode_responses=True)
        client.ping() # Verify connection
        print(f"Redis client connected to {redis_host}:{redis_port}.")
        return client
    except Exception as e:
        print(f"Error connecting to Redis: {e}")
        return None

def create_redis_index_if_not_exists(client: redis.Redis, index_name: str, prefix: str, dimension: int):
    """Creates a Redis Search index over HASH documents if it doesn't exist."""
    schema = (
        # Add other indexed fields (text, tags, ...) as needed
        TagField("tag", as_name="tag"),
        VectorField("embedding", "FLAT", {  # Or "HNSW" (see the sketch below)
            "TYPE": "FLOAT32",
            "DIM": dimension,
            "DISTANCE_METRIC": "L2",  # Or "COSINE", "IP"
        }, as_name="vector")
    )
    definition = IndexDefinition(prefix=[prefix], index_type=IndexType.HASH)

    try:
        # info() raises a ResponseError if the index doesn't exist yet
        client.ft(index_name).info()
        print(f"Redis index '{index_name}' already exists.")
    except redis.exceptions.ResponseError:
        print(f"Creating Redis index '{index_name}' for prefix '{prefix}'...")
        client.ft(index_name).create_index(fields=schema, definition=definition)
        print(f"Redis index '{index_name}' created.")

# Example Usage:
# client = init_redis_client()
# if client:
#     index_name = "brainyflow-redis-idx"
#     doc_prefix = "bfdoc:" # Prefix for keys to be indexed
#     dimension = 128
#     create_redis_index_if_not_exists(client, index_name, doc_prefix, dimension)

#     try:
#         # Store vectors as raw float32 bytes in HASH fields
#         doc_id1 = f"{doc_prefix}item1"
#         doc_id2 = f"{doc_prefix}item2"
#         vector1 = np.array([0.1] * dimension, dtype=np.float32).tobytes()
#         vector2 = np.array([0.2] * dimension, dtype=np.float32).tobytes()

#         print("Inserting data using HSET...")
#         client.hset(doc_id1, mapping={"embedding": vector1, "tag": "A"})
#         client.hset(doc_id2, mapping={"embedding": vector2, "tag": "B"})
#         print("Data inserted.")

#         # KNN Query
#         k = 3
#         query_vector = np.array([0.15] * dimension, dtype=np.float32).tobytes()
#         # Query syntax: "*=>[KNN $K @vector $vec AS vector_score]"
#         # $vec needs to be passed as query parameter
#         q = Query(f"*=>[KNN {k} @vector $vec AS vector_score]")\
#             .sort_by("vector_score")\
#             .return_fields("id", "vector_score", "tag")\
#             .dialect(2) # Use DIALECT 2 for parameter support

#         params = {"vec": query_vector}
#         print("Querying Redis index...")
#         results = client.ft(index_name).search(q, query_params=params)

#         print("Query results:")
#         for doc in results.docs:
#             print(f" - ID: {doc.id}, Score: {doc.vector_score}, Tag: {doc.tag}")

#     except Exception as e:
#         print(f"Error interacting with Redis Search: {e}")
