import json
from packaging import version
from tokenizers import Tokenizer # type: ignore
from types import TracebackType
from typing import (
Any,
AsyncGenerator,
List,
Mapping,
Optional,
Dict,
Sequence,
Type,
Union,
)
import aiohttp
import asyncio
from aiohttp_retry import RetryClient, ExponentialRetry
import requests
from requests import Response
from requests.adapters import HTTPAdapter
from requests.structures import CaseInsensitiveDict
from urllib3.util.retry import Retry
from tqdm.asyncio import tqdm
from aleph_alpha_client.explanation import (
ExplanationRequest,
ExplanationResponse,
)
from aleph_alpha_client.completion import (
CompletionRequest,
CompletionResponse,
CompletionResponseStreamItem,
stream_item_from_json,
)
from aleph_alpha_client.chat import (
ChatRequest,
ChatResponse,
ChatStreamChunk,
Usage,
FinishReason,
process_chat_stream,
ToolCall,
)
from aleph_alpha_client.evaluation import EvaluationRequest, EvaluationResponse
from aleph_alpha_client.tokenization import TokenizationRequest, TokenizationResponse
from aleph_alpha_client.detokenization import (
DetokenizationRequest,
DetokenizationResponse,
)
from aleph_alpha_client.embedding import (
BatchSemanticEmbeddingRequest,
BatchSemanticEmbeddingResponse,
EmbeddingRequest,
EmbeddingResponse,
EmbeddingVector,
EmbeddingV2Request,
EmbeddingV2Response,
InstructableEmbeddingRequest,
InstructableEmbeddingResponse,
SemanticEmbeddingRequest,
SemanticEmbeddingResponse,
)
from aleph_alpha_client.steering import (
SteeringConceptCreationRequest,
SteeringConceptCreationResponse,
)
from aleph_alpha_client.version import MIN_API_VERSION, user_agent_headers
from aleph_alpha_client.translation import TranslationRequest, TranslationResponse
from aleph_alpha_client.reranking import RerankRequest, RerankResponse
# NOTE(review): presumably the pooling modes accepted by embedding requests;
# not referenced by the code visible in this part of the module — confirm.
POOLING_OPTIONS = ["mean", "max", "last_token", "abs_max"]
# HTTP status codes for which both clients automatically retry a request.
RETRY_STATUS_CODES = frozenset({408, 429, 500, 502, 503, 504})
# Default client-side request timeout in seconds. The client docstrings state
# that the server closes requests after 300 seconds.
DEFAULT_REQUEST_TIMEOUT = 305
[docs]
class QuotaError(Exception):
    """Signals that the API answered a request with HTTP status 402.

    Instances are created with ``(status_code, text)`` as positional
    arguments, which are available through ``args``.
    """
[docs]
class BusyError(Exception):
    """Signals that the API answered a request with HTTP status 503.

    Instances are created with ``(status_code, text)`` as positional
    arguments, which are available through ``args``.
    """
def _raise_for_status(status_code: int, text: str):
if status_code >= 400:
if status_code == 400:
raise ValueError(status_code, text)
elif status_code == 401:
raise PermissionError(status_code, text)
elif status_code == 402:
raise QuotaError(status_code, text)
elif status_code == 408:
raise TimeoutError(status_code, text)
elif status_code == 503:
raise BusyError(status_code, text)
else:
raise RuntimeError(status_code, text)
def _check_api_version(version_str: str):
    """Verify that the API version is usable by this client.

    The version is accepted when it has the same major version as
    ``MIN_API_VERSION`` and is not older than it.

    Parameters:
        version_str (str, required):
            Version string as reported by the API.

    Raises:
        RuntimeError: if the reported version is not accepted.
    """
    required = version.parse(MIN_API_VERSION)
    found = version.parse(version_str)
    if required.major != found.major or not required <= found:
        raise RuntimeError(
            f"The aleph alpha client requires at least api version {required}, found version {found}"
        )
# Union of every request type that the clients' ``_post_request`` helpers
# accept. Each member is serialized through its ``to_json()`` method.
AnyRequest = Union[
    CompletionRequest,
    ChatRequest,
    EmbeddingRequest,
    EmbeddingV2Request,
    EvaluationRequest,
    TokenizationRequest,
    DetokenizationRequest,
    SemanticEmbeddingRequest,
    InstructableEmbeddingRequest,
    BatchSemanticEmbeddingRequest,
    ExplanationRequest,
    SteeringConceptCreationRequest,
    TranslationRequest,
    RerankRequest,
]
[docs]
class Client:
    """
    Construct a client for synchronous requests given a user token.

    Parameters:
        token (string, required):
            The API token that will be used for authentication.

        host (string, required):
            The hostname of the API host. A trailing slash is appended if it is missing.

        hosting (string, optional, default None):
            Determines in which datacenters the request may be processed.
            You can either set the parameter to "aleph-alpha" or omit it (defaulting to None).

            Not setting this value, or setting it to None, gives us maximal flexibility in processing your request in our
            own datacenters and on servers hosted with other providers. Choose this option for maximal availability.

            Setting it to "aleph-alpha" allows us to only process the request in our own datacenters.
            Choose this option for maximal data privacy.

        request_timeout_seconds (int, optional, default 305):
            Client timeout that will be set for HTTP requests in the `requests` library's API calls.
            Server will close all requests after 300 seconds with an internal server error.

        total_retries (int, optional, default 8):
            The number of retries made in case requests fail with certain retryable status codes. If the last
            retry fails a corresponding exception is raised. Note, that between retries an exponential backoff
            is applied, starting with 0.5 s after the first retry and doubling for each retry made. So with the
            default setting of 8 retries a total wait time of 63.5 s is added between the retries.

        nice (bool, optional, default False):
            Setting this to True, will signal to the API that you intend to be nice to other users
            by de-prioritizing your request below concurrent ones.

        verify_ssl (bool, optional, default True):
            Setting this to False will disable checking for SSL when doing requests.

        tags (Optional[Sequence[str]], optional, default None):
            Internal feature.

        pool_size (int, optional, default 10):
            Used as both ``pool_connections`` and ``pool_maxsize`` of the HTTP adapter
            mounted on the underlying `requests` session.

    Example usage:
        >>> request = CompletionRequest(prompt=Prompt.from_text(f"Request"), maximum_tokens=64)
        >>> client = Client(
        ...     token=os.environ["TEST_TOKEN"],
        ...     host=os.environ["TEST_API_URL"],
        ... )
        >>> response: CompletionResponse = client.complete(request, "pharia-1-llm-7b-control")
    """

    def __init__(
        self,
        token: str,
        host: str,
        hosting: Optional[str] = None,
        request_timeout_seconds: int = DEFAULT_REQUEST_TIMEOUT,
        total_retries: int = 8,
        nice: bool = False,
        verify_ssl: bool = True,
        tags: Optional[Sequence[str]] = None,
        pool_size: int = 10,
    ) -> None:
        # ``endswith`` also copes with an empty string, where ``host[-1]``
        # would raise an IndexError.
        if not host.endswith("/"):
            host += "/"
        self.host = host
        self.hosting = hosting
        self.request_timeout_seconds = request_timeout_seconds
        self.token = token
        self.nice = nice
        self.tags = tags

        retry_strategy = Retry(
            total=total_retries,
            backoff_factor=0.25,
            status_forcelist=RETRY_STATUS_CODES,
            allowed_methods=["POST", "GET"],
            # Hand the final response back instead of raising inside urllib3,
            # so that ``_raise_for_status`` decides which exception to raise.
            raise_on_status=False,
        )
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=pool_size,
            pool_maxsize=pool_size,
        )
        self.session = requests.Session()
        self.session.verify = verify_ssl
        self.session.headers = CaseInsensitiveDict(
            {
                "Authorization": "Bearer " + self.token,
                **user_agent_headers(),
            }
        )
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)

    def validate_version(self) -> None:
        """Checks that the version of the AlephAlpha HTTP API is supported by this client.

        Raises:
            RuntimeError: if the version reported by the API is not supported.
        """
        _check_api_version(self.get_version())

    def get_version(self) -> str:
        """Gets version of the AlephAlpha HTTP API."""
        return self._get_request("version").text

    def _get_request(self, endpoint: str) -> Response:
        """Sends a GET request to ``endpoint`` (relative to the host) and returns the response.

        Error statuses are converted into exceptions by ``_raise_for_status``.
        """
        # Use the same client-side timeout as POST requests; without it a
        # stalled connection would block the caller indefinitely.
        response = self.session.get(
            self.host + endpoint,
            timeout=self.request_timeout_seconds,
        )
        if not response.ok:
            _raise_for_status(response.status_code, response.text)
        return response

    def _post_request(
        self,
        endpoint: str,
        request: AnyRequest,
        model: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Sends ``request`` as JSON via POST to ``endpoint`` and returns the decoded JSON response.

        Error statuses are converted into exceptions by ``_raise_for_status``.
        """
        json_body = self._build_json_body(request, model)
        query_params = self._build_query_parameters()

        response = self.session.post(
            self.host + endpoint,
            json=json_body,
            params=query_params,
            timeout=self.request_timeout_seconds,
        )
        if not response.ok:
            _raise_for_status(response.status_code, response.text)
        return response.json()

    def _build_query_parameters(self) -> Mapping[str, str]:
        """Returns the query parameters that are sent with every POST request."""
        # The value is the literal lowercase "true" (str(True) would give
        # "True"). The flag is left out entirely when ``nice`` is false.
        if self.nice:
            return {"nice": "true"}
        return {}

    def _build_json_body(
        self, request: AnyRequest, model: Optional[str]
    ) -> Mapping[str, Any]:
        """Serializes ``request`` and adds the model, hosting and tags settings that are not None."""
        json_body = dict(request.to_json())

        if model is not None:
            json_body["model"] = model
        if self.hosting is not None:
            json_body["hosting"] = self.hosting
        if self.tags is not None:
            json_body["tags"] = self.tags
        return json_body

    def models(self) -> List[Mapping[str, Any]]:
        """
        Queries all models which are currently available.

        For documentation of the response, see
        https://docs.aleph-alpha.com/products/apis/pharia-inference/available-models/
        """
        response = self._get_request("models_available")
        return response.json()

    def complete(
        self,
        request: CompletionRequest,
        model: str,
    ) -> CompletionResponse:
        """Generates completions given a prompt.

        Parameters:
            request (CompletionRequest, required):
                Parameters for the requested completion.

            model (string, required):
                Name of model to use. A model name refers to a model architecture (number of parameters among others).
                Always the latest version of model is used.

        Examples:
            >>> # create a prompt
            >>> prompt = Prompt.from_text("An apple a day, ")
            >>>
            >>> # create a completion request
            >>> request = CompletionRequest(
            ...     prompt=prompt,
            ...     maximum_tokens=32,
            ...     stop_sequences=["###","\\n"],
            ...     temperature=0.12
            ... )
            >>>
            >>> # complete the prompt
            >>> result = client.complete(request, model=model_name)
        """
        response = self._post_request("complete", request, model)
        return CompletionResponse.from_json(response)

    def chat(
        self,
        request: ChatRequest,
        model: str,
    ) -> ChatResponse:
        """Chat with a model.

        Parameters:
            request (ChatRequest, required):
                Parameters for the requested chat.

            model (string, required):
                Name of model to use. A model name refers to a model architecture (number of parameters among others).
                Always the latest version of model is used.

        Examples:
            >>> import os
            >>> from aleph_alpha_client import Client, ChatRequest, Message
            >>>
            >>> client = Client(token=os.environ["TOKEN"], host="https://inference-api.your.domain")
            >>> model = "llama-3.1-8b-instruct"
            >>> # create a chat request
            >>> request = ChatRequest(
            ...     messages=[Message(role="user", content="Hello, how are you?")],
            ...     model=model,
            ... )
            >>>
            >>> result = client.chat(request, model=model)
            >>> print(result.message)
        """
        response = self._post_request("chat/completions", request, model)
        return ChatResponse.from_json(response)

    def tokenize(
        self,
        request: TokenizationRequest,
        model: str,
    ) -> TokenizationResponse:
        """Tokenizes the given prompt for the given model.

        Parameters:
            request (TokenizationRequest, required):
                Parameters for the requested tokenization.

            model (string, required):
                Name of model to use. A model name refers to a model architecture (number of parameters among others).
                Always the latest version of model is used.

        Examples:
            >>> request = TokenizationRequest(
            ...     prompt="hello", token_ids=True, tokens=True
            ... )
            >>> response = client.tokenize(request, model=model_name)
        """
        response = self._post_request(
            "tokenize",
            request,
            model,
        )
        return TokenizationResponse.from_json(response)

    def detokenize(
        self,
        request: DetokenizationRequest,
        model: str,
    ) -> DetokenizationResponse:
        """Detokenizes the given prompt for the given model.

        Parameters:
            request (DetokenizationRequest, required):
                Parameters for the requested detokenization.

            model (string, required):
                Name of model to use. A model name refers to a model architecture (number of parameters among others).
                Always the latest version of model is used.

        Examples:
            >>> request = DetokenizationRequest(token_ids=[2, 3, 4])
            >>> response = client.detokenize(request, model=model_name)
        """
        response = self._post_request(
            "detokenize",
            request,
            model,
        )
        return DetokenizationResponse.from_json(response)

    def embed(
        self,
        request: EmbeddingRequest,
        model: str,
    ) -> EmbeddingResponse:
        """Embeds a text and returns vectors that can be used for downstream tasks (e.g. semantic similarity) and models (e.g. classifiers).

        Parameters:
            request (EmbeddingRequest, required):
                Parameters for the requested embedding.

            model (string, required):
                Name of model to use. A model name refers to a model architecture (number of parameters among others).
                Always the latest version of model is used.

        Examples:
            >>> request = EmbeddingRequest(prompt=Prompt.from_text(
            ...     "This is an example."), layers=[-1], pooling=["mean"]
            ... )
            >>> result = client.embed(request, model=model_name)
        """
        response = self._post_request(
            "embed",
            request,
            model,
        )
        return EmbeddingResponse.from_json(response)

    def embeddings(
        self,
        request: EmbeddingV2Request,
        model: str,
    ) -> EmbeddingV2Response:
        """Embeds a text and returns vectors that can be used for downstream tasks.

        This interface is compatible to the OpenAI /embeddings endpoint.

        Parameters:
            request (EmbeddingV2Request, required):
                Parameters for the requested embedding.

            model (string, required):
                Name of model to use.

        Examples:
            >>> request = EmbeddingV2Request(input="This is an example", dimensions=20)
            >>> result = client.embeddings(request, model=model_name)
        """
        response = self._post_request(
            "embeddings",
            request,
            model,
        )
        return EmbeddingV2Response.from_json(response)

    def semantic_embed(
        self,
        request: SemanticEmbeddingRequest,
        model: str,
    ) -> SemanticEmbeddingResponse:
        """Embeds a text and returns vectors that can be used for downstream tasks
        (e.g. semantic similarity) and models (e.g. classifiers).

        Parameters:
            request (SemanticEmbeddingRequest, required):
                Parameters for the requested semantic embedding.

            model (string, required):
                Name of model to use. A model name refers to a model architecture (number of parameters among others).
                Always the latest version of model is used.

        Examples:
            >>> # function for symmetric embedding
            >>> def embed_symmetric(text: str):
            ...     # Create an embeddingrequest with the type set to symmetric
            ...     request = SemanticEmbeddingRequest(prompt=Prompt.from_text(
            ...         text), representation=SemanticRepresentation.Symmetric)
            ...     # create the embedding
            ...     result = client.semantic_embed(request, model=model_name)
            ...     return result.embedding
            >>>
            >>> # function to calculate similarity
            >>> def cosine_similarity(v1: Sequence[float], v2: Sequence[float]) -> float:
            ...     "compute cosine similarity of v1 to v2: (v1 dot v2)/(||v1||*||v2||)"
            ...     sumxx, sumxy, sumyy = 0, 0, 0
            ...     for i in range(len(v1)):
            ...         x = v1[i]; y = v2[i]
            ...         sumxx += x*x
            ...         sumyy += y*y
            ...         sumxy += x*y
            ...     return sumxy/math.sqrt(sumxx*sumyy)
            >>>
            >>> # define the texts
            >>> text_a = "The sun is shining"
            >>> text_b = "Il sole splende"
            >>>
            >>> # show the similarity
            >>> print(cosine_similarity(embed_symmetric(text_a), embed_symmetric(text_b)))
        """
        response = self._post_request(
            "semantic_embed",
            request,
            model,
        )
        return SemanticEmbeddingResponse.from_json(response)

    def batch_semantic_embed(
        self,
        request: BatchSemanticEmbeddingRequest,
        model: Optional[str] = None,
    ) -> BatchSemanticEmbeddingResponse:
        """Embeds a sequence of texts or images and returns vectors in the same order as they
        were provided. If more than 100 prompts are provided then this method will chunk them
        into batches of 100 prompts that will be sent to the API.

        Parameters:
            request (BatchSemanticEmbeddingRequest, required):
                Parameters for the requested semantic embeddings.

            model (string, optional, default None):
                Name of model to use. A model name refers to a model architecture (number of parameters among others).
                Always the latest version of model is used.

        Examples:
            >>> # function for symmetric embedding
            >>> def embed_symmetric(texts: Sequence[str]):
            ...     # Create an embeddingrequest with the type set to symmetric
            ...     request = BatchSemanticEmbeddingRequest(
            ...         prompts=[Prompt.from_text(text) for text in texts],
            ...         representation=SemanticRepresentation.Symmetric
            ...     )
            ...     # create the embedding
            ...     result = client.batch_semantic_embed(request, model=model_name)
            ...     return result.embedding
        """
        responses: List[EmbeddingVector] = []
        # Stays empty if no batch is generated; otherwise holds the model
        # version reported for the last batch.
        model_version = ""
        num_tokens_prompt_total = 0
        # The API currently only supports batch semantic embedding requests with up to 100
        # prompts per batch. As a convenience for users, this function chunks larger requests.
        for batch_request in _generate_semantic_embedding_batches(request):
            raw_response = self._post_request(
                "batch_semantic_embed",
                batch_request,
                model,
            )
            response = BatchSemanticEmbeddingResponse.from_json(raw_response)
            model_version = response.model_version
            responses.extend(response.embeddings)
            num_tokens_prompt_total += response.num_tokens_prompt_total

        return BatchSemanticEmbeddingResponse(
            model_version=model_version,
            embeddings=responses,
            num_tokens_prompt_total=num_tokens_prompt_total,
        )

    def instructable_embed(
        self,
        request: InstructableEmbeddingRequest,
        model: str,
    ) -> InstructableEmbeddingResponse:
        """Embeds a text and returns vectors that can be used for classification according to a given instruction.

        Parameters:
            request (InstructableEmbeddingRequest, required):
                Parameters for the requested instructable embedding.

            model (string, required):
                Name of model to use. A model name refers to a model architecture (number of parameters among others).
                Always the latest version of model is used.

        Examples:
            >>> # function for salutation embedding
            >>> def embed_salutation(text: str):
            ...     # Create an embeddingrequest with a given instruction
            ...     request = InstructableEmbeddingRequest(
            ...         input=Prompt.from_text(text),
            ...         instruction="Represent the text to query a database of salutations"
            ...     )
            ...     # create the embedding
            ...     result = client.instructable_embed(request, model=model_name)
            ...     return result.embedding
            >>>
            >>> # function to calculate similarity
            >>> def cosine_similarity(v1: Sequence[float], v2: Sequence[float]) -> float:
            ...     "compute cosine similarity of v1 to v2: (v1 dot v2)/(||v1||*||v2||)"
            ...     sumxx, sumxy, sumyy = 0, 0, 0
            ...     for i in range(len(v1)):
            ...         x = v1[i]; y = v2[i]
            ...         sumxx += x*x
            ...         sumyy += y*y
            ...         sumxy += x*y
            ...     return sumxy/math.sqrt(sumxx*sumyy)
            >>>
            >>> # define the texts
            >>> text_a = "Hello"
            >>> text_b = "Good morning"
            >>>
            >>> # show the similarity
            >>> print(cosine_similarity(embed_salutation(text_a), embed_salutation(text_b)))
        """
        response = self._post_request(
            "instructable_embed",
            request,
            model,
        )
        return InstructableEmbeddingResponse.from_json(response)

    def evaluate(
        self,
        request: EvaluationRequest,
        model: str,
    ) -> EvaluationResponse:
        """Evaluates the model's likelihood to produce a completion given a prompt.

        Parameters:
            request (EvaluationRequest, required):
                Parameters for the requested evaluation.

            model (string, required):
                Name of model to use. A model name refers to a model architecture (number of parameters among others).
                Always the latest version of model is used.

        Examples:
            >>> request = EvaluationRequest(
            ...     prompt=Prompt.from_text("hello"), completion_expected=" world"
            ... )
            >>> response = client.evaluate(request, model=model_name)
        """
        response = self._post_request(
            "evaluate",
            request,
            model,
        )
        return EvaluationResponse.from_json(response)

    def explain(
        self,
        request: ExplanationRequest,
        model: str,
    ) -> ExplanationResponse:
        """Better understand the source of a completion, specifically on how much each section of a
        prompt impacts each token of the completion.

        Parameters:
            request (ExplanationRequest, required):
                Parameters for the requested explanation.

            model (string, required):
                Name of model to use. A model name refers to a model architecture (number of parameters among others).
                Always the latest version of model is used.

        Examples:
            >>> request = ExplanationRequest(
            ...     prompt=Prompt.from_text("Andreas likes"),
            ...     target=" pizza."
            ... )
            >>> response = client.explain(request, model="luminous-base")
        """
        response = self._post_request(
            "explain",
            request,
            model,
        )
        return ExplanationResponse.from_json(response)

    def create_steering_concept(
        self, request: SteeringConceptCreationRequest
    ) -> SteeringConceptCreationResponse:
        """Creates a steering concept.

        A steering concept consists of a list of steering examples. A steering
        example is a pair of a "negative" and a "positive" string, describing
        how you want to alter the model's output.

        This request will return a unique ID for the newly created steering
        concept that you can then use in completion and chat requests.

        Parameters:
            request (SteeringConceptCreationRequest, required):
                Parameters for the steering concepts to create.

        Examples:
            >>> request = SteeringConceptCreationRequest(
            ...     examples=[
            ...         SteeringPairedExample(
            ...             negative="I appreciate your valuable feedback on this matter.",
            ...             positive="Thanks for the real talk, fam.",
            ...         ),
            ...         SteeringPairedExample(
            ...             negative="The financial projections indicate significant growth potential.",
            ...             positive="Yo, these numbers are looking mad stacked!",
            ...         ),
            ...     ]
            ... )
            >>> response = client.create_steering_concept(request)
        """
        response = self._post_request(
            "steering_concepts",
            request,
        )
        return SteeringConceptCreationResponse.from_json(response)

    def tokenizer(self, model: str) -> Tokenizer:
        """Returns a Tokenizer instance with the settings that were used to train the model.

        Examples:
            >>> tokenizer = client.tokenizer(model="luminous-base")
            >>> tokenized_prompt = tokenizer.encode("Hello world")
        """
        return Tokenizer.from_str(self._get_request(f"models/{model}/tokenizer").text)

    def translate(
        self,
        request: TranslationRequest,
    ) -> TranslationResponse:
        """Translates text from one language to another.

        Parameters:
            request (TranslationRequest, required):
                Parameters for the requested translation.

        Examples:
            >>> request = TranslationRequest(
            ...     model="pharia-1-mt-translation",
            ...     source="Hello, how are you?",
            ...     target_language="de"
            ... )
            >>> response = client.translate(request)
            >>> print(response.translation)
        """
        response = self._post_request("translate", request)
        return TranslationResponse.from_json(response)

    def rerank(
        self,
        request: RerankRequest,
        model: str,
    ) -> RerankResponse:
        """Reranks documents against a query.

        This endpoint takes in a query and a list of documents and produces an array
        with each document assigned a relevance score.

        Parameters:
            request (RerankRequest, required):
                Parameters for the requested reranking.

            model (string, required):
                Name of the model to use for reranking.

        Examples:
            >>> request = RerankRequest(
            ...     query="What is the capital of France?",
            ...     documents=[
            ...         "The capital of Brazil is Brasilia.",
            ...         "The capital of France is Paris.",
            ...         "Horses and cows are both animals.",
            ...     ],
            ...     top_n=2,
            ... )
            >>> response = client.rerank(request, model="your-reranker-model")
            >>> for result in response.results:
            ...     print(f"Document {result.index}: {result.relevance_score}")
        """
        response = self._post_request("rerank", request, model)
        return RerankResponse.from_json(response)
[docs]
class AsyncClient:
"""
Construct a context object for asynchronous requests given a user token
Parameters:
token (string, required):
The API token that will be used for authentication.
host (string, required):
The hostname of the API host.
hosting(string, optional, default None):
Determines in which datacenters the request may be processed.
You can either set the parameter to "aleph-alpha" or omit it (defaulting to None).
Not setting this value, or setting it to None, gives us maximal flexibility in processing your request in our
own datacenters and on servers hosted with other providers. Choose this option for maximal availability.
Setting it to "aleph-alpha" allows us to only process the request in our own datacenters.
Choose this option for maximal data privacy.
request_timeout_seconds (int, optional, default 305):
Client timeout that will be set for HTTP requests in the `aiohttp` library's API calls.
Server will close all requests after 300 seconds with an internal server error.
total_retries(int, optional, default 8)
The number of retries made in case requests fail with certain retryable status codes. If the last
retry fails a corresponding exception is raised. Note, that between retries an exponential backoff
is applied, starting with 0.25 s after the first request and doubling for each retry made. So with the
default setting of 8 retries a total wait time of 63.75 s is added between the retries.
nice(bool, required, default False):
Setting this to True, will signal to the API that you intend to be nice to other users
by de-prioritizing your request below concurrent ones.
verify_ssl(bool, optional, default True)
Setting this to False will disable checking for SSL when doing requests.
tags(Optional[Sequence[str]], optional, default None)
Internal feature.
Example usage:
>>> request = CompletionRequest(prompt=Prompt.from_text(f"Request"), maximum_tokens=64)
>>> async with AsyncClient(
token=os.environ["TEST_TOKEN"],
host=os.environ["TEST_API_URL"],
) as client:
response: CompletionResponse = await client.complete(request, "pharia-1-llm-7b-control")
"""
def __init__(
    self,
    token: str,
    host: str,
    limit: int = 100,
    hosting: Optional[str] = None,
    request_timeout_seconds: int = DEFAULT_REQUEST_TIMEOUT,
    total_retries: int = 8,
    nice: bool = False,
    verify_ssl: bool = True,
    tags: Optional[Sequence[str]] = None,
) -> None:
    """Stores the client settings and creates the retrying HTTP session.

    Parameters:
        token (string, required):
            The API token, sent as a bearer token in the ``Authorization`` header.

        host (string, required):
            The hostname of the API host. A trailing slash is appended if it is missing.

        limit (int, optional, default 100):
            Passed as ``limit`` to the ``aiohttp.TCPConnector`` of the session.

        hosting (string, optional, default None):
            If not None, added as ``hosting`` to every JSON request body.

        request_timeout_seconds (int, optional, default 305):
            Timeout used for the ``aiohttp.ClientTimeout`` of the session.

        total_retries (int, optional, default 8):
            Number of retries for connection errors and retryable status codes.

        nice (bool, optional, default False):
            If True, ``nice=true`` is sent as a query parameter with POST requests.

        verify_ssl (bool, optional, default True):
            Setting this to False will disable checking for SSL when doing requests.

        tags (Optional[Sequence[str]], optional, default None):
            If not None, added as ``tags`` to every JSON request body.
    """
    # ``endswith`` also copes with an empty string, where ``host[-1]``
    # would raise an IndexError.
    if not host.endswith("/"):
        host += "/"
    self.host = host
    self.hosting = hosting
    self.request_timeout_seconds = request_timeout_seconds
    self.token = token
    self.nice = nice
    self.tags = tags

    retry_options = ExponentialRetry(
        # NOTE(review): presumably ``attempts`` counts the initial request as
        # well, hence the ``+ 1`` — confirm against aiohttp_retry.
        attempts=total_retries + 1,
        exceptions={aiohttp.ClientConnectionError},
        start_timeout=0.25,
        statuses=set(RETRY_STATUS_CODES),
    )

    # The ``verify_ssl`` argument of TCPConnector is deprecated in favour of
    # ``ssl=False``. Verification is only switched off for an explicit falsy
    # value; otherwise ``ssl`` is left unset so aiohttp's default applies.
    connector_options: Dict[str, Any] = {}
    if verify_ssl is not None and not verify_ssl:
        connector_options["ssl"] = False
    connector = aiohttp.TCPConnector(limit=limit, **connector_options)

    self.session = RetryClient(
        trust_env=True,  # same behaviour as requests/(Sync)Client wrt. http_proxy
        # Status handling is done by ``_raise_for_status`` instead.
        raise_for_status=False,
        retry_options=retry_options,
        timeout=aiohttp.ClientTimeout(self.request_timeout_seconds),
        headers={
            "Authorization": "Bearer " + self.token,
            **user_agent_headers(),
        },
        connector=connector,
    )
[docs]
async def close(self):
    """Closes the underlying HTTP session.

    Needs to be called at end of lifetime if the AsyncClient object is not used as a context manager.
    """
    session = self.session
    await session.close()
def __enter__(self) -> None:
raise TypeError("Use async with instead")
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
# __exit__ should exist in pair with __enter__ but never executed
pass # pragma: no cover
async def __aenter__(self) -> "AsyncClient":
    """Enters the context of the underlying session and returns this client."""
    session = self.session
    await session.__aenter__()
    return self
async def __aexit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
):
    """Leaves the context of the underlying session, forwarding the exception details."""
    session = self.session
    await session.__aexit__(
        exc_type=exc_type,
        exc_val=exc_val,
        exc_tb=exc_tb,
    )
[docs]
async def validate_version(self) -> None:
    """Checks that the version of the AlephAlpha HTTP API is supported by this client.

    Raises:
        RuntimeError: if the version reported by the API is not supported.
    """
    reported_version = await self.get_version()
    _check_api_version(reported_version)
[docs]
async def get_version(self) -> str:
    """Gets version of the AlephAlpha HTTP API.

    Returns the body of the ``version`` endpoint as text.
    """
    version_text = await self._get_request_text("version")
    return version_text
async def _get_request_text(self, endpoint: str) -> str:
    """Sends a GET request to ``endpoint`` (relative to the host) and returns the body as text.

    Error statuses are converted into exceptions by ``_raise_for_status``.
    """
    url = self.host + endpoint
    async with self.session.get(url) as reply:
        # The body is needed on both paths: as the error text or as the result.
        body = await reply.text()
        if not reply.ok:
            _raise_for_status(reply.status, body)
        return body
async def _get_request_json(
    self, endpoint: str
) -> Union[List[Mapping[str, Any]], Mapping[str, Any]]:
    """Sends a GET request to ``endpoint`` (relative to the host) and returns the decoded JSON body.

    Error statuses are converted into exceptions by ``_raise_for_status``.
    """
    url = self.host + endpoint
    async with self.session.get(url) as reply:
        if not reply.ok:
            error_text = await reply.text()
            _raise_for_status(reply.status, error_text)
        decoded = await reply.json()
        return decoded
async def _post_request(
    self,
    endpoint: str,
    request: AnyRequest,
    model: Optional[str] = None,
) -> Dict[str, Any]:
    """Sends ``request`` as JSON via POST to ``endpoint`` and returns the decoded JSON response.

    Error statuses are converted into exceptions by ``_raise_for_status``.
    """
    payload = self._build_json_body(request, model)
    params = self._build_query_parameters()
    url = self.host + endpoint

    async with self.session.post(url, json=payload, params=params) as reply:
        if not reply.ok:
            error_text = await reply.text()
            _raise_for_status(reply.status, error_text)
        decoded = await reply.json()
        return decoded
# Prefix that every non-empty line of a streamed response must start with.
SSE_DATA_PREFIX = "data: "

async def _post_request_with_streaming(
    self,
    endpoint: str,
    request: AnyRequest,
    model: Optional[str] = None,
) -> AsyncGenerator[Dict[str, Any], None]:
    """Sends ``request`` via POST with ``stream`` enabled and yields the decoded stream items.

    Every non-empty line of the response must start with ``SSE_DATA_PREFIX``.
    The rest of the line is parsed as JSON and yielded, except for the
    ``[DONE]`` marker, which is skipped.

    Raises:
        ValueError: if a non-empty line does not start with ``SSE_DATA_PREFIX``.
    """
    payload = self._build_json_body(request, model)
    payload["stream"] = True
    params = self._build_query_parameters()
    url = self.host + endpoint

    async with self.session.post(url, json=payload, params=params) as reply:
        if not reply.ok:
            error_text = await reply.text()
            _raise_for_status(reply.status, error_text)

        async for raw_line in reply.content:
            line = raw_line.decode().strip()
            if line == "":
                # Blank lines carry no data.
                continue

            if not line.startswith(self.SSE_DATA_PREFIX):
                raise ValueError(
                    f"Stream item did not start with `{self.SSE_DATA_PREFIX}`. Was `{line}`"
                )

            data = line[len(self.SSE_DATA_PREFIX) :]
            if data != "[DONE]":
                yield json.loads(data)
def _build_query_parameters(self) -> Mapping[str, str]:
return {
# cannot use str() here because we want lowercase true/false in query string
# Also do not want to send the nice flag with every request if it is false
**({"nice": "true"} if self.nice else {}),
}
def _build_json_body(
self, request: AnyRequest, model: Optional[str]
) -> Dict[str, Any]:
json_body = dict(request.to_json())
if model is not None:
json_body["model"] = model
if self.hosting is not None:
json_body["hosting"] = self.hosting
if self.tags is not None:
json_body["tags"] = self.tags
return json_body
[docs]
async def models(self) -> List[Mapping[str, Any]]:
    """
    Queries all models which are currently available.

    For documentation of the response, see
    https://docs.aleph-alpha.com/products/apis/pharia-inference/available-models/
    """
    available = await self._get_request_json("models_available")
    return available  # type: ignore
[docs]
async def complete(
    self,
    request: CompletionRequest,
    model: str,
) -> CompletionResponse:
    """Generates completions given a prompt.

    Parameters:
        request (CompletionRequest, required):
            Parameters for the requested completion.

        model (string, required):
            Name of model to use. A model name refers to a model architecture (number of parameters among others).
            Always the latest version of model is used.

    Examples:
        >>> # create a prompt
        >>> prompt = Prompt.from_text("An apple a day, ")
        >>>
        >>> # create a completion request
        >>> request = CompletionRequest(
        ...     prompt=prompt,
        ...     maximum_tokens=32,
        ...     stop_sequences=["###","\\n"],
        ...     temperature=0.12
        ... )
        >>>
        >>> # complete the prompt
        >>> result = await client.complete(request, model=model_name)
    """
    raw = await self._post_request("complete", request, model)
    return CompletionResponse.from_json(raw)
[docs]
async def chat(
    self,
    request: ChatRequest,
    model: str,
) -> ChatResponse:
    """Chat with a model.

    Parameters:
        request (ChatRequest, required):
            Parameters for the requested chat.

        model (string, required):
            Name of model to use. A model name refers to a model architecture (number of parameters among others).
            Always the latest version of model is used.

    Examples:
        >>> import os
        >>> from aleph_alpha_client import AsyncClient, ChatRequest, Message
        >>>
        >>> client = AsyncClient(token=os.environ["TOKEN"], host="https://inference-api.your.domain")
        >>> model = "llama-3.1-8b-instruct"
        >>> # create a chat request
        >>> request = ChatRequest(
        ...     messages=[Message(role="user", content="Hello, how are you?")],
        ...     model=model,
        ... )
        >>>
        >>> # chat with the model
        >>> result = await client.chat(request, model=model)
        >>> print(result.message)
    """
    raw = await self._post_request("chat/completions", request, model)
    return ChatResponse.from_json(raw)
[docs]
async def complete_with_streaming(
    self,
    request: CompletionRequest,
    model: str,
) -> AsyncGenerator[CompletionResponseStreamItem, None]:
    """Generates streamed completions given a prompt.

    This is an async generator: iterate over the returned object with
    ``async for`` instead of awaiting the call.

    Parameters:
        request (CompletionRequest, required):
            Parameters for the requested completion.

        model (string, required):
            Name of model to use. A model name refers to a model architecture (number of parameters among others).
            Always the latest version of model is used.

    Examples:
        >>> # create a prompt
        >>> prompt = Prompt.from_text("An apple a day, ")
        >>>
        >>> # create a completion request
        >>> request = CompletionRequest(
        ...     prompt=prompt,
        ...     maximum_tokens=32,
        ...     stop_sequences=["###","\\n"],
        ...     temperature=0.12
        ... )
        >>>
        >>> # start the completion stream (no ``await``: the call returns an async generator)
        >>> result = client.complete_with_streaming(request, model=model_name)
        >>>
        >>> # consume the completion stream
        >>> async for stream_item in result:
        ...     do_something_with(stream_item)
    """
    async for stream_item_json in self._post_request_with_streaming(
        "complete",
        request,
        model,
    ):
        yield stream_item_from_json(stream_item_json)
[docs]
async def chat_with_streaming(
    self,
    request: ChatRequest,
    model: str,
) -> AsyncGenerator[Union[ChatStreamChunk, Usage, ToolCall, FinishReason], None]:
    """Stream chat completion items as they arrive.

    The first yielded chunk contains the role, while subsequent chunks only contain the content delta.

    This is an async generator: the raw stream obtained from
    ``self._post_request_with_streaming`` for the ``chat/completions`` endpoint
    is passed through ``process_chat_stream`` and every item it produces is
    re-yielded unchanged.

    Parameters:
        request (ChatRequest, required):
            Parameters for the requested chat.

        model (string, required):
            Name of the model to use. A model name refers to a model architecture
            (number of parameters among others). The latest version of the model is always used.

    Yields:
        ChatStreamChunk, Usage, ToolCall or FinishReason: items produced by ``process_chat_stream``.

    Examples:
        >>> import os
        >>> from aleph_alpha_client import AsyncClient, ChatRequest, Message
        >>>
        >>> client = AsyncClient(token=os.environ["TOKEN"], host="https://inference-api.your.domain")
        >>> model = "llama-3.1-8b-instruct"
        >>> # create a chat request
        >>> request = ChatRequest(
        ...     messages=[Message(role="user", content="Hello, how are you?")],
        ...     model=model,
        ... )
        >>>
        >>> # chat with the model
        >>> result = client.chat_with_streaming(request, model=model)
        >>>
        >>> # consume the chat stream
        >>> async for stream_item in result:
        ...     print(stream_item)
    """
    raw_stream = self._post_request_with_streaming("chat/completions", request, model)
    async for item in process_chat_stream(raw_stream):
        yield item
[docs]
async def tokenize(
    self,
    request: TokenizationRequest,
    model: str,
) -> TokenizationResponse:
    """Tokenize the given prompt for the given model.

    Posts the request to the ``tokenize`` endpoint and parses the answer with
    ``TokenizationResponse.from_json``.

    Parameters:
        request (TokenizationRequest, required):
            Parameters for the requested tokenization.

        model (string, required):
            Name of the model to use. A model name refers to a model architecture
            (number of parameters among others). The latest version of the model is always used.

    Returns:
        TokenizationResponse: the response parsed from the endpoint's JSON answer.

    Examples:
        >>> request = TokenizationRequest(prompt="hello", token_ids=True, tokens=True)
        >>> response = await client.tokenize(request, model=model_name)
    """
    payload = await self._post_request("tokenize", request, model)
    return TokenizationResponse.from_json(payload)
[docs]
async def detokenize(
    self,
    request: DetokenizationRequest,
    model: str,
) -> DetokenizationResponse:
    """Detokenize the given token ids for the given model.

    Posts the request to the ``detokenize`` endpoint and parses the answer with
    ``DetokenizationResponse.from_json``.

    Parameters:
        request (DetokenizationRequest, required):
            Parameters for the requested detokenization.

        model (string, required):
            Name of the model to use. A model name refers to a model architecture
            (number of parameters among others). The latest version of the model is always used.

    Returns:
        DetokenizationResponse: the response parsed from the endpoint's JSON answer.

    Examples:
        >>> request = DetokenizationRequest(token_ids=[2, 3, 4])
        >>> response = await client.detokenize(request, model=model_name)
    """
    payload = await self._post_request("detokenize", request, model)
    return DetokenizationResponse.from_json(payload)
[docs]
async def embed(
    self,
    request: EmbeddingRequest,
    model: str,
) -> EmbeddingResponse:
    """Embed a text and return vectors usable for downstream tasks (e.g. semantic similarity) and models (e.g. classifiers).

    Posts the request to the ``embed`` endpoint and parses the answer with
    ``EmbeddingResponse.from_json``.

    Parameters:
        request (EmbeddingRequest, required):
            Parameters for the requested embedding.

        model (string, required):
            Name of the model to use. A model name refers to a model architecture
            (number of parameters among others). The latest version of the model is always used.

    Returns:
        EmbeddingResponse: the response parsed from the endpoint's JSON answer.

    Examples:
        >>> request = EmbeddingRequest(prompt=Prompt.from_text("This is an example."), layers=[-1], pooling=["mean"])
        >>> result = await client.embed(request, model=model_name)
    """
    payload = await self._post_request("embed", request, model)
    return EmbeddingResponse.from_json(payload)
[docs]
async def semantic_embed(
    self,
    request: SemanticEmbeddingRequest,
    model: str,
) -> SemanticEmbeddingResponse:
    """Embed a text and return a vector usable for downstream tasks
    (e.g. semantic similarity) and models (e.g. classifiers).

    Posts the request to the ``semantic_embed`` endpoint and parses the answer
    with ``SemanticEmbeddingResponse.from_json``.

    Parameters:
        request (SemanticEmbeddingRequest, required):
            Parameters for the requested semantic embedding.

        model (string, required):
            Name of the model to use. A model name refers to a model architecture
            (number of parameters among others). The latest version of the model is always used.

    Returns:
        SemanticEmbeddingResponse: the response parsed from the endpoint's JSON answer.

    Examples:
        >>> import math
        >>>
        >>> # function for symmetric embedding
        >>> async def embed_symmetric(text: str):
        ...     # Create an embedding request with the type set to symmetric
        ...     request = SemanticEmbeddingRequest(
        ...         prompt=Prompt.from_text(text),
        ...         representation=SemanticRepresentation.Symmetric,
        ...     )
        ...     # create the embedding
        ...     result = await client.semantic_embed(request, model=model_name)
        ...     return result.embedding
        >>>
        >>> # function to calculate similarity
        >>> def cosine_similarity(v1: Sequence[float], v2: Sequence[float]) -> float:
        ...     "compute cosine similarity of v1 to v2: (v1 dot v2)/(||v1||*||v2||)"
        ...     sumxx, sumxy, sumyy = 0, 0, 0
        ...     for x, y in zip(v1, v2):
        ...         sumxx += x*x
        ...         sumyy += y*y
        ...         sumxy += x*y
        ...     return sumxy/math.sqrt(sumxx*sumyy)
        >>>
        >>> # define the texts
        >>> text_a = "The sun is shining"
        >>> text_b = "Il sole splende"
        >>>
        >>> # show the similarity
        >>> print(cosine_similarity(await embed_symmetric(text_a), await embed_symmetric(text_b)))
    """
    payload = await self._post_request("semantic_embed", request, model)
    return SemanticEmbeddingResponse.from_json(payload)
[docs]
async def batch_semantic_embed(
    self,
    request: BatchSemanticEmbeddingRequest,
    model: Optional[str] = None,
    num_concurrent_requests: int = 1,
    batch_size: int = 100,
    progress_bar: bool = False,
) -> BatchSemanticEmbeddingResponse:
    """Embeds a sequence of texts or images and returns vectors in the same order as they
    were provided. If more than `batch_size` prompts are provided then this method will chunk them
    into batches of up to `batch_size` prompts that will be sent to the API.

    The per-batch responses are merged into a single response: embeddings are
    concatenated in batch order, prompt token counts are summed, and
    `model_version` is taken from the last batch response (it stays an empty
    string when there are no batches).

    Parameters:
        request (BatchSemanticEmbeddingRequest, required):
            Parameters for the requested semantic embeddings.

        model (string, optional, default None):
            Name of the model to use. A model name refers to a model architecture
            (number of parameters among others). The latest version of the model is always used.

        num_concurrent_requests (int, optional, default 1):
            Maximum number of concurrent requests to send to the API. Must be at least 1.

        batch_size (int, optional, default 100):
            Number of prompts per batch sent to the API. This value must be between 1 and 100 (inclusive).

        progress_bar (bool, optional, default False):
            Whether to show a progress bar using tqdm.

    Returns:
        BatchSemanticEmbeddingResponse: the merged response over all batches.

    Raises:
        ValueError: if `batch_size` is outside 1..100 or `num_concurrent_requests` is below 1.

    Examples:
        >>> # function for symmetric embedding
        >>> async def embed_symmetric(texts: Sequence[str]):
        ...     # Create an embedding request with the type set to symmetric
        ...     request = BatchSemanticEmbeddingRequest(
        ...         prompts=[Prompt.from_text(text) for text in texts],
        ...         representation=SemanticRepresentation.Symmetric
        ...     )
        ...     # create the embeddings
        ...     result = await client.batch_semantic_embed(request, model=model_name)
        ...     return result.embeddings
    """
    if batch_size < 1 or batch_size > 100:
        raise ValueError(
            "`batch_semantic_embed` must be called with a `batch_size` between 1 and 100 (inclusive)"
        )
    # A limit of 0 would create a semaphore that can never be acquired, so every
    # batch would wait forever; reject it up front instead of hanging.
    if num_concurrent_requests < 1:
        raise ValueError(
            "`batch_semantic_embed` must be called with `num_concurrent_requests` of at least 1"
        )

    # The API currently only supports batch semantic embedding requests with up to 100
    # prompts per batch. As a convenience for users, this function chunks larger requests.
    results = await self._gather_with_concurrency(
        "batch_semantic_embed",
        model,
        num_concurrent_requests,
        _generate_semantic_embedding_batches(request, batch_size),
        progress_bar,
    )

    embeddings: List[EmbeddingVector] = []
    model_version = ""
    num_tokens_prompt_total = 0
    for result in results:
        resp = BatchSemanticEmbeddingResponse.from_json(result)
        # Overwritten on every iteration: the last batch's version is reported.
        model_version = resp.model_version
        embeddings.extend(resp.embeddings)
        num_tokens_prompt_total += resp.num_tokens_prompt_total

    return BatchSemanticEmbeddingResponse(
        model_version=model_version,
        embeddings=embeddings,
        num_tokens_prompt_total=num_tokens_prompt_total,
    )
[docs]
async def instructable_embed(
    self,
    request: InstructableEmbeddingRequest,
    model: str,
) -> InstructableEmbeddingResponse:
    """Embed a text and return a vector that can be used for classification according to a given instruction.

    Posts the request to the ``instructable_embed`` endpoint and parses the
    answer with ``InstructableEmbeddingResponse.from_json``.

    Parameters:
        request (InstructableEmbeddingRequest, required):
            Parameters for the requested instructable embedding.

        model (string, required):
            Name of the model to use. A model name refers to a model architecture
            (number of parameters among others). The latest version of the model is always used.

    Returns:
        InstructableEmbeddingResponse: the response parsed from the endpoint's JSON answer.

    Examples:
        >>> import math
        >>>
        >>> # function for salutation embedding
        >>> async def embed_salutation(text: str):
        ...     # Create an embedding request with a given instruction
        ...     request = InstructableEmbeddingRequest(
        ...         input=Prompt.from_text(text),
        ...         instruction="Represent the text to query a database of salutations"
        ...     )
        ...     # create the embedding
        ...     result = await client.instructable_embed(request, model=model_name)
        ...     return result.embedding
        >>>
        >>> # function to calculate similarity
        >>> def cosine_similarity(v1: Sequence[float], v2: Sequence[float]) -> float:
        ...     "compute cosine similarity of v1 to v2: (v1 dot v2)/(||v1||*||v2||)"
        ...     sumxx, sumxy, sumyy = 0, 0, 0
        ...     for x, y in zip(v1, v2):
        ...         sumxx += x*x
        ...         sumyy += y*y
        ...         sumxy += x*y
        ...     return sumxy/math.sqrt(sumxx*sumyy)
        >>>
        >>> # define the texts
        >>> text_a = "Hello"
        >>> text_b = "Good morning"
        >>>
        >>> # show the similarity
        >>> print(cosine_similarity(await embed_salutation(text_a), await embed_salutation(text_b)))
    """
    payload = await self._post_request("instructable_embed", request, model)
    return InstructableEmbeddingResponse.from_json(payload)
[docs]
async def evaluate(
    self,
    request: EvaluationRequest,
    model: str,
) -> EvaluationResponse:
    """Evaluate the model's likelihood to produce a completion given a prompt.

    Posts the request to the ``evaluate`` endpoint and parses the answer with
    ``EvaluationResponse.from_json``.

    Parameters:
        request (EvaluationRequest, required):
            Parameters for the requested evaluation.

        model (string, required):
            Name of the model to use. A model name refers to a model architecture
            (number of parameters among others). The latest version of the model is always used.

    Returns:
        EvaluationResponse: the response parsed from the endpoint's JSON answer.

    Examples:
        >>> request = EvaluationRequest(
        ...     prompt=Prompt.from_text("hello"), completion_expected=" world"
        ... )
        >>> response = await client.evaluate(request, model=model_name)
    """
    payload = await self._post_request("evaluate", request, model)
    return EvaluationResponse.from_json(payload)
[docs]
async def explain(
    self,
    request: ExplanationRequest,
    model: str,
) -> ExplanationResponse:
    """Better understand the source of a completion, specifically how much each section of a
    prompt impacts each token of the completion.

    Posts the request to the ``explain`` endpoint and parses the answer with
    ``ExplanationResponse.from_json``.

    Parameters:
        request (ExplanationRequest, required):
            Parameters for the requested explanation.

        model (string, required):
            Name of the model to use. A model name refers to a model architecture
            (number of parameters among others). The latest version of the model is always used.

    Returns:
        ExplanationResponse: the response parsed from the endpoint's JSON answer.

    Examples:
        >>> request = ExplanationRequest(
        ...     prompt=Prompt.from_text("Andreas likes"),
        ...     target=" pizza."
        ... )
        >>> response = await client.explain(request, model="luminous-base")
    """
    payload = await self._post_request("explain", request, model)
    return ExplanationResponse.from_json(payload)
[docs]
async def create_steering_concept(
    self, request: SteeringConceptCreationRequest
) -> SteeringConceptCreationResponse:
    """Create a steering concept.

    A steering concept consists of a list of steering examples. A steering
    example is a pair of a "negative" and a "positive" string, describing
    how you want to alter the model's output.

    This request will return a unique ID for the newly created steering
    concept that you can then use in completion and chat requests.

    The request is posted to the ``steering_concepts`` endpoint (no model name
    is passed) and the answer is parsed with
    ``SteeringConceptCreationResponse.from_json``.

    Parameters:
        request (SteeringConceptCreationRequest, required):
            Parameters for the steering concepts to create.

    Returns:
        SteeringConceptCreationResponse: the response parsed from the endpoint's JSON answer.

    Examples:
        >>> request = SteeringConceptCreationRequest(
        ...     examples=[
        ...         SteeringPairedExample(
        ...             negative="I appreciate your valuable feedback on this matter.",
        ...             positive="Thanks for the real talk, fam.",
        ...         ),
        ...         SteeringPairedExample(
        ...             negative="The financial projections indicate significant growth potential.",
        ...             positive="Yo, these numbers are looking mad stacked!",
        ...         ),
        ...     ]
        ... )
        >>> response = await client.create_steering_concept(request)
    """
    payload = await self._post_request("steering_concepts", request)
    return SteeringConceptCreationResponse.from_json(payload)
[docs]
async def tokenizer(self, model: str) -> Tokenizer:
    """Return a Tokenizer instance with the settings that were used to train the model.

    The tokenizer definition is fetched as text from ``models/{model}/tokenizer``
    and handed to ``Tokenizer.from_str``.

    Parameters:
        model (string, required):
            Name of the model whose tokenizer should be fetched.

    Returns:
        Tokenizer: the tokenizer built from the fetched text.

    Examples:
        >>> tokenizer = await client.tokenizer(model="luminous-base")
        >>> tokenized_prompt = tokenizer.encode("Hello world")
    """
    serialized = await self._get_request_text(f"models/{model}/tokenizer")
    return Tokenizer.from_str(serialized)
# Based on: https://docs.aleph-alpha.com/changelog/2022/11/14/async-python-client/
async def _gather_with_concurrency(
    self,
    endpoint: str,
    model: Optional[str],
    n: int,
    requests: Sequence[BatchSemanticEmbeddingRequest],
    progress_bar: bool,
) -> List[Dict[str, Any]]:
    """Post every request to ``endpoint`` while holding a semaphore of size ``n``.

    Each request is sent through ``self._post_request`` from inside an
    ``asyncio.Semaphore(n)`` context, so at most ``n`` posts run at a time.

    Parameters:
        endpoint (string, required):
            Endpoint passed on to ``self._post_request``.

        model (string or None, required):
            Model name passed on to ``self._post_request``.

        n (int, required):
            Initial value of the semaphore guarding the posts.

        requests (Sequence[BatchSemanticEmbeddingRequest], required):
            Requests to send, one post per element.

        progress_bar (bool, required):
            When true the posts are gathered with ``tqdm.gather`` instead of ``asyncio.gather``.

    Returns:
        The gathered results of all posts.
    """
    limiter = asyncio.Semaphore(n)

    async def post_when_slot_free(batch: BatchSemanticEmbeddingRequest):
        # Hold one semaphore slot for the whole duration of the post.
        async with limiter:
            return await self._post_request(endpoint, batch, model)

    pending = [post_when_slot_free(batch) for batch in requests]
    # asyncio.gather preserves order of awaitables in result list
    gather = tqdm.gather if progress_bar else asyncio.gather
    return await gather(*pending)
[docs]
async def translate(
    self,
    request: TranslationRequest,
) -> TranslationResponse:
    """Translate text from one language to another.

    Posts the request to the ``translate`` endpoint (no separate model
    argument is passed) and parses the answer with ``TranslationResponse.from_json``.

    Parameters:
        request (TranslationRequest, required):
            Parameters for the requested translation.

    Returns:
        TranslationResponse: the response parsed from the endpoint's JSON answer.

    Examples:
        >>> request = TranslationRequest(
        ...     model="pharia-1-mt-translation",
        ...     source="Hello, how are you?",
        ...     target_language="de"
        ... )
        >>> response = await client.translate(request)
        >>> print(response.translation)
    """
    payload = await self._post_request("translate", request)
    return TranslationResponse.from_json(payload)
[docs]
async def rerank(
    self,
    request: RerankRequest,
    model: str,
) -> RerankResponse:
    """Rerank documents against a query.

    This endpoint takes in a query and a list of documents and produces an array
    with each document assigned a relevance score.

    The request is posted to the ``rerank`` endpoint and the answer is parsed
    with ``RerankResponse.from_json``.

    Parameters:
        request (RerankRequest, required):
            Parameters for the requested reranking.

        model (string, required):
            Name of the model to use for reranking.

    Returns:
        RerankResponse: the response parsed from the endpoint's JSON answer.

    Examples:
        >>> request = RerankRequest(
        ...     query="What is the capital of France?",
        ...     documents=[
        ...         "The capital of Brazil is Brasilia.",
        ...         "The capital of France is Paris.",
        ...         "Horses and cows are both animals.",
        ...     ],
        ...     top_n=2,
        ... )
        >>> response = await client.rerank(request, model="your-reranker-model")
        >>> for result in response.results:
        ...     print(f"Document {result.index}: {result.relevance_score}")
    """
    payload = await self._post_request("rerank", request, model)
    return RerankResponse.from_json(payload)
def _generate_semantic_embedding_batches(
    request: BatchSemanticEmbeddingRequest, batch_size: int = 100
) -> List[BatchSemanticEmbeddingRequest]:
    """Split ``request`` into requests carrying at most ``batch_size`` prompts each.

    The prompts are sliced in order into consecutive chunks; every chunk becomes
    a new ``BatchSemanticEmbeddingRequest`` that copies ``representation``,
    ``compress_to_size``, ``normalize``, ``contextual_control_threshold`` and
    ``control_log_additive`` from the original request. A request without
    prompts yields an empty list.

    Parameters:
        request (BatchSemanticEmbeddingRequest, required):
            The request whose prompts should be chunked.

        batch_size (int, optional, default 100):
            Maximum number of prompts per generated request.

    Returns:
        List[BatchSemanticEmbeddingRequest]: the chunked requests, in prompt order.
    """
    all_prompts = request.prompts
    return [
        BatchSemanticEmbeddingRequest(
            prompts=all_prompts[start : start + batch_size],
            representation=request.representation,
            compress_to_size=request.compress_to_size,
            normalize=request.normalize,
            contextual_control_threshold=request.contextual_control_threshold,
            control_log_additive=request.control_log_additive,
        )
        for start in range(0, len(all_prompts), batch_size)
    ]