1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Vector DB CDK: Fix special tokens (#33065)

This commit is contained in:
Joe Reuter
2023-12-08 11:46:46 +01:00
committed by GitHub
parent aab74b27e2
commit 21b3b2f638
3 changed files with 33 additions and 17 deletions

View File

@@ -74,6 +74,7 @@ class DocumentProcessor:
chunk_overlap=chunk_overlap,
separators=[json.loads(s) for s in splitter_config.separators],
keep_separator=splitter_config.keep_separator,
disallowed_special=(),
)
if splitter_config.mode == "markdown":
return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
@@ -82,12 +83,14 @@ class DocumentProcessor:
separators=headers_to_split_on[: splitter_config.split_level],
is_separator_regex=True,
keep_separator=True,
disallowed_special=(),
)
if splitter_config.mode == "code":
return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=RecursiveCharacterTextSplitter.get_separators_for_language(Language(splitter_config.language)),
disallowed_special=(),
)
def __init__(self, config: ProcessingConfigModel, catalog: ConfiguredAirbyteCatalog):

View File

@@ -4,7 +4,7 @@
import os
from abc import ABC, abstractmethod
from typing import List, Optional, Union
from typing import List, Optional, Union, cast
from airbyte_cdk.destinations.vector_db_based.config import (
AzureOpenAIEmbeddingConfigModel,
@@ -72,7 +72,7 @@ class BaseOpenAIEmbedder(Embedder):
return format_exception(e)
return None
def embed_chunks(self, chunks: List[Chunk]) -> List[List[float]]:
def embed_chunks(self, chunks: List[Chunk]) -> List[Optional[List[float]]]:
"""
Embed the text of each chunk and return the resulting embedding vectors.
@@ -83,7 +83,7 @@ class BaseOpenAIEmbedder(Embedder):
# Each chunk can hold at most self.chunk_size tokens, so the tokens-per-minute limit divided by the maximum tokens per chunk is the number of chunks that can be embedded at once without exhausting the limit in a single request
embedding_batch_size = OPEN_AI_TOKEN_LIMIT // self.chunk_size
batches = create_chunks(chunks, batch_size=embedding_batch_size)
embeddings = []
embeddings: List[Optional[List[float]]] = []
for batch in batches:
embeddings.extend(self.embeddings.embed_documents([chunk.page_content for chunk in batch]))
return embeddings
@@ -96,13 +96,13 @@ class BaseOpenAIEmbedder(Embedder):
class OpenAIEmbedder(BaseOpenAIEmbedder):
def __init__(self, config: OpenAIEmbeddingConfigModel, chunk_size: int):
super().__init__(OpenAIEmbeddings(openai_api_key=config.openai_key, max_retries=15), chunk_size) # type: ignore
super().__init__(OpenAIEmbeddings(openai_api_key=config.openai_key, max_retries=15, disallowed_special=()), chunk_size) # type: ignore
class AzureOpenAIEmbedder(BaseOpenAIEmbedder):
def __init__(self, config: AzureOpenAIEmbeddingConfigModel, chunk_size: int):
# Azure OpenAI API has — as of 20230927 — a limit of 16 documents per request
super().__init__(OpenAIEmbeddings(openai_api_key=config.openai_key, chunk_size=16, max_retries=15, openai_api_type="azure", openai_api_version="2023-05-15", openai_api_base=config.api_base, deployment=config.deployment), chunk_size) # type: ignore
super().__init__(OpenAIEmbeddings(openai_api_key=config.openai_key, chunk_size=16, max_retries=15, openai_api_type="azure", openai_api_version="2023-05-15", openai_api_base=config.api_base, deployment=config.deployment, disallowed_special=()), chunk_size) # type: ignore
COHERE_VECTOR_SIZE = 1024
@@ -121,8 +121,8 @@ class CohereEmbedder(Embedder):
return format_exception(e)
return None
def embed_chunks(self, chunks: List[Chunk]) -> List[List[float]]:
return self.embeddings.embed_documents([chunk.page_content for chunk in chunks])
def embed_chunks(self, chunks: List[Chunk]) -> List[Optional[List[float]]]:
return cast(List[Optional[List[float]]], self.embeddings.embed_documents([chunk.page_content or "" for chunk in chunks]))
@property
def embedding_dimensions(self) -> int:
@@ -142,8 +142,8 @@ class FakeEmbedder(Embedder):
return format_exception(e)
return None
def embed_chunks(self, chunks: List[Chunk]) -> List[List[float]]:
return self.embeddings.embed_documents([chunk.page_content for chunk in chunks])
def embed_chunks(self, chunks: List[Chunk]) -> List[Optional[List[float]]]:
return cast(List[Optional[List[float]]], self.embeddings.embed_documents([chunk.page_content or "" for chunk in chunks]))
@property
def embedding_dimensions(self) -> int:
@@ -160,7 +160,7 @@ class OpenAICompatibleEmbedder(Embedder):
self.config = config
# Client is set internally
# Always set an API key even if there is none defined in the config because the validator will fail otherwise. Embedding APIs that don't require an API key don't fail if one is provided, so this is not breaking usage.
self.embeddings = LocalAIEmbeddings(model=config.model_name, openai_api_key=config.api_key or "dummy-api-key", openai_api_base=config.base_url, max_retries=15) # type: ignore
self.embeddings = LocalAIEmbeddings(model=config.model_name, openai_api_key=config.api_key or "dummy-api-key", openai_api_base=config.base_url, max_retries=15, disallowed_special=()) # type: ignore
def check(self) -> Optional[str]:
deployment_mode = os.environ.get("DEPLOYMENT_MODE", "")
@@ -173,8 +173,8 @@ class OpenAICompatibleEmbedder(Embedder):
return format_exception(e)
return None
def embed_chunks(self, chunks: List[Chunk]) -> List[List[float]]:
return self.embeddings.embed_documents([chunk.page_content for chunk in chunks])
def embed_chunks(self, chunks: List[Chunk]) -> List[Optional[List[float]]]:
return cast(List[Optional[List[float]]], self.embeddings.embed_documents([chunk.page_content or "" for chunk in chunks]))
@property
def embedding_dimensions(self) -> int:
@@ -190,12 +190,12 @@ class FromFieldEmbedder(Embedder):
def check(self) -> Optional[str]:
return None
def embed_chunks(self, chunks: List[Chunk]) -> List[List[float]]:
def embed_chunks(self, chunks: List[Chunk]) -> List[Optional[List[float]]]:
"""
From each chunk, pull the embedding from the field specified in the config.
Check that the field exists, is a list of numbers and is the correct size. If not, raise an AirbyteTracedException explaining the problem.
"""
embeddings = []
embeddings: List[Optional[List[float]]] = []
for chunk in chunks:
data = chunk.record.data
if self.config.field_name not in data:
@@ -246,8 +246,9 @@ def create_from_config(
OpenAICompatibleEmbeddingConfigModel,
],
processing_config: ProcessingConfigModel,
):
) -> Embedder:
if embedding_config.mode == "azure_openai" or embedding_config.mode == "openai":
return embedder_map[embedding_config.mode](embedding_config, processing_config.chunk_size)
return cast(Embedder, embedder_map[embedding_config.mode](embedding_config, processing_config.chunk_size))
else:
return embedder_map[embedding_config.mode](embedding_config)
return cast(Embedder, embedder_map[embedding_config.mode](embedding_config))

View File

@@ -275,6 +275,18 @@ def test_process_multiple_chunks_with_relevant_fields():
"eight nine ten eleven twelve thirteen",
],
),
(
"Special tokens",
"Special tokens like <|endoftext|> are treated like regular text",
15,
0,
None,
[
"text: Special tokens like",
"<|endoftext|> are treated like regular",
"text",
]
),
(
"Custom separator",
"Custom \nseparatorxxxDoes not split on \n\nnewlines",