🚀 Deployments
Requirements before starting to work with deployments.
The xCloud inference engine supports two kinds of deployments:
- Custom deployment: deploy any model with your favorite framework (TensorFlow, PyTorch, HuggingFace, ONNX, etc.). This deployment type is flexible enough to let you customize both model loading and inference. You can also take advantage of server optimizations such as dynamic batching and streaming inference. With a high number of requests per second, these optimizations can potentially reduce inference costs by around 10x compared to a standard API server.
- Optimized deployment: deploy certain models that are automatically optimized to minimize latency. While inference customization is limited, you can configure generation parameters such as top_k, max_tokens, temperature, etc. Server optimizations can also be applied to these deployments. Estimated cost savings can be approximately 30x compared to a standard API server.
The supported models for the optimized inference are:
- Aquila & Aquila2 (BAAI/AquilaChat2-7B, BAAI/AquilaChat2-34B, BAAI/Aquila-7B, BAAI/AquilaChat-7B, etc.)
- ChatGLM (THUDM/chatglm2-6b, THUDM/chatglm3-6b, etc.)
- Flan-T5 (google/flan-t5-small, google/flan-t5-base, google/flan-t5-large, google/flan-t5-xl, google/flan-t5-xxl, etc.)
- T5 (t5-small, t5-base, t5-large, t5-3b, t5-11b, etc.)
- LLaMA and LLaMA-2 (meta-llama/Llama-2-70b-hf, lmsys/vicuna-13b-v1.3, young-geng/koala, openlm-research/open_llama_13b, etc.)
- GPT-J (EleutherAI/gpt-j-6b, nomic-ai/gpt4all-j, etc.)
- GPT-NeoX (EleutherAI/gpt-neox-20b, databricks/dolly-v2-12b, stabilityai/stablelm-tuned-alpha-7b, etc.)
- GPT BigCode (bigcode/starcoder, bigcode/gpt_bigcode-santacoder, etc.)
- Baichuan (baichuan-inc/Baichuan-7B, baichuan-inc/Baichuan-13B-Chat, etc.)
- BLOOM (bigscience/bloom, bigscience/bloomz, etc.)
- Falcon (tiiuae/falcon-7b, tiiuae/falcon-40b, tiiuae/falcon-rw-7b, etc.)
- GPT-2 (gpt2, gpt2-xl, etc.)
- MPT (mosaicml/mpt-7b, mosaicml/mpt-30b, etc.)
- OPT (facebook/opt-66b, facebook/opt-iml-max-30b, etc.)
- InternLM (internlm/internlm-7b, internlm/internlm-chat-7b, etc.)
- Mistral (mistralai/Mistral-7B-v0.1, mistralai/Mistral-7B-Instruct-v0.1, etc.)
- Mixtral (mistralai/Mixtral-8x7B-v0.1, mistralai/Mixtral-8x7B-Instruct-v0.1, etc.)
- Phi-1.5 (microsoft/phi-1_5, etc.)
- Qwen (Qwen/Qwen-7B, Qwen/Qwen-7B-Chat, etc.)
- Yi (01-ai/Yi-6B, 01-ai/Yi-34B, etc.)
In this tutorial we will deploy the GPT-2 model and cover both types of deployments.
1. Custom deployment
1.1 Preparing your inference code
- Create a directory in your local file system where you will save your inference scripts. In this example we will use ~/xcloud_inference.
- Create another directory called ~/xcloud_inference/model where you will save your model. In this case we will save the GPT-2 model:
import os

from transformers import AutoTokenizer, AutoModelForCausalLM

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained(
    "gpt2",
    low_cpu_mem_usage=True
)

# Expand "~" explicitly: save_pretrained does not expand it for you
save_dir = os.path.expanduser("~/xcloud_inference/model")
tokenizer.save_pretrained(save_dir)
model.save_pretrained(save_dir)
- Create your inference script ~/xcloud_inference/inference.py and include the following code in it.
from typing import Dict
from pathlib import Path

from transformers import AutoTokenizer, AutoModelForCausalLM
import torch


class ModelInference:
    def __init__(self, workdir_path: str):
        model_path = Path(workdir_path) / "model"

        self.tokenizer = AutoTokenizer.from_pretrained(str(model_path))
        self.model = AutoModelForCausalLM.from_pretrained(
            str(model_path),
            low_cpu_mem_usage=True
        ).to("cuda")
        self.model.eval()

    def predict(
        self,
        request: Dict,
        headers: Dict[str, str] = None
    ):
        instances = request['instances']

        generate_params = {
            "max_new_tokens": 512,
            "top_k": 1,
            "top_p": 0.0,
            "temperature": 1.0,
            "repetition_penalty": 1.0
        }

        inputs = self.tokenizer(instances, return_tensors="pt")
        inputs = {k: v.cuda() for k, v in inputs.items()}

        with torch.no_grad():
            output = self.model.generate(**inputs, **generate_params)

        output = self.tokenizer.batch_decode(output, skip_special_tokens=True)

        result = {
            "predictions": output
        }

        return result
The inference script must be named inference.py and include a class named ModelInference with at least two methods:
- __init__(self, workdir_path: str): handles model loading and initialization. The workdir_path parameter is a string containing the path to your code and assets. Your current working directory is also set to workdir_path, so you can alternatively load the model using from_pretrained("./model").
- predict(self, request: Dict, headers: Dict[str, str] = None): processes the user's request, performing preprocessing, the model's feed-forward pass, and post-processing. The request parameter is a Python dictionary containing the JSON input. Note: when dynamic batching is enabled, you will receive a dictionary with the key instances containing a list of batched requests. Additionally, with dynamic batching enabled, the output of the endpoint must be a Python dictionary containing the key predictions, whose value is a list of the same length as the instances list. The smoke test below illustrates this contract.
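Before uploading anything, you can sanity-check this contract locally. The snippet below is a minimal sketch that instantiates ModelInference and calls predict directly; it assumes you run it from ~/xcloud_inference on a CUDA-capable machine.
# Minimal local smoke test (run from ~/xcloud_inference on a machine with a GPU)
from inference import ModelInference

model_inference = ModelInference(workdir_path=".")

# The request mirrors what the server passes to predict() when dynamic batching is enabled
request = {"instances": ["There was a guy that"]}
result = model_inference.predict(request)

# predict() must return {"predictions": [...]} with one entry per input instance
assert len(result["predictions"]) == len(request["instances"])
print(result["predictions"][0])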
Streaming inference - Token-by-token generation
If you want to do streaming inference (token by token), you should use a code snippet similar to the one below. Note that dynamic batching is not compatible with streaming inference.
from pathlib import Path
from typing import Dict
from threading import Thread

from sse_starlette.sse import EventSourceResponse
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
import torch


class ModelInference:
    def __init__(self, workdir_path: str):
        model_path = Path(workdir_path) / "model"

        self.tokenizer = AutoTokenizer.from_pretrained(str(model_path))
        self.model = AutoModelForCausalLM.from_pretrained(
            str(model_path),
            low_cpu_mem_usage=True,
            torch_dtype=torch.float16
        ).to("cuda")
        self.model.eval()

    def predict(
        self,
        request: Dict,
        headers: Dict[str, str] = None
    ):
        instances = request['instances']

        streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True)

        input_ids = self.tokenizer(instances, return_tensors="pt")['input_ids']
        input_ids = input_ids.cuda()

        generation_kwargs = {
            "input_ids": input_ids,
            "streamer": streamer,
            "max_new_tokens": 1024,
            "top_k": 1,
            "top_p": 0.0,
            "temperature": 1.0,
            "repetition_penalty": 1.0
        }

        # Run generation in a background thread so tokens can be streamed as they are produced
        thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
        thread.start()

        return EventSourceResponse(streamer)
- Create the file ~/xcloud_inference/requirements.txt and include the following libraries:
transformers
torch
sentencepiece
accelerate
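Note that the streaming inference script above also imports sse_starlette; if the Docker image you select does not already provide it, you may need to add sse_starlette to requirements.txt as well.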
Your ~/xcloud_inference directory should look like:
- xcloud_inference
|- requirements.txt
|- inference.py
|- model
| |- config.json
| |- generation_config.json
| |- merges.txt
| |- pytorch_model.bin
| |- special_tokens_map.json
| |- tokenizer_config.json
| |- tokenizer.json
| |- vocab.json
1.2. Uploading assets to AWS
Before starting the deployment you will have to upload your code to your cloud provider, for instance Amazon S3 or Google Cloud Storage. We are working on supporting other cloud providers and on uploading your code directly from your local machine.
To do that we will use cloudpathlib, a Python package that lets you upload local files to AWS. You can install it by running pip install cloudpathlib[s3].
import os
from cloudpathlib import CloudPath

CloudPath(
    "s3://<your_bucket>/xcloud_inference/"
).upload_from(os.path.expanduser("~/xcloud_inference"))
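As an optional sanity check (a small sketch, not required by xCloud), you can list what ended up in the bucket with the same cloudpathlib API:
# Optional: list the top-level entries that were uploaded
for path in CloudPath("s3://<your_bucket>/xcloud_inference/").iterdir():
    print(path)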
1.3. Creating the deployment
To create a deployment, you will have to configure the following parameters.
- Specify where your code is stored and the credentials to access it.
from xcloud import ModelSpecs, Deployment, MachineType, Credentials, DeploymentContainerSpecs, DeploymentSpecs, Batcher, Scaling, SCALE_METRIC, Cloud, Location
import os
# Configure your AWS credentials
creds = Credentials(
    cloud=Cloud.AWS,
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    aws_region=os.environ["AWS_DEFAULT_REGION"]
)

model_specs = ModelSpecs(
    model_path="s3://<your_bucket>/xcloud_inference/",
    credentials=creds
)
- Specify the instance type, whether you want to use a spot or non-spot instance, the Docker image, environment variables and secrets.
container_specs = DeploymentContainerSpecs(
    machine_type=MachineType.GPU_L4_1,
    spot=True,
    image="us-central1-docker.pkg.dev/stochastic-x/xcloud-public/inference:pytorch2_small_v0.0.1",
    env={},      # Here you can add environment variables
    secrets={}   # You can add secrets that will be exposed as environment variables
)
The secrets won't be stored in the database for security reasons. That's why environment variables containing passwords, API keys, etc. should go in secrets rather than env.
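As an illustration (the variable names below are hypothetical, not required by xCloud), plain configuration goes in env while sensitive values go in secrets:
import os

container_specs = DeploymentContainerSpecs(
    machine_type=MachineType.GPU_L4_1,
    spot=True,
    image="us-central1-docker.pkg.dev/stochastic-x/xcloud-public/inference:pytorch2_small_v0.0.1",
    env={"LOG_LEVEL": "info"},                    # hypothetical non-sensitive setting
    secrets={"HF_TOKEN": os.environ["HF_TOKEN"]}  # hypothetical secret, not persisted in the DB
)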
The Docker images that can be selected for inference are:
- us-central1-docker.pkg.dev/stochastic-x/xcloud-public/inference:cpu_v0.0.1: for CPU instances. It already has Python installed.
- us-central1-docker.pkg.dev/stochastic-x/xcloud-public/inference:pytorch2_small_v0.0.1: comes with CUDA and PyTorch (v2) already installed.
- us-central1-docker.pkg.dev/stochastic-x/xcloud-public/inference:pytorch2_v0.0.1: comes with CUDA and several other libraries already installed (protobuf, torch, torchaudio, torchvision, transformers[torch], numpy, torchmetrics, scikit-learn, scipy, Pillow, datasets, nltk, rouge-score, sentencepiece, seqeval, sacrebleu, pytorch-lightning, wandb, marshmallow, cloudpathlib, requests, onnxruntime-gpu).
- Configure the deployment specifications. You can configure dynamic batching, the auto-scaling policy and whether you want to add authentication to your endpoint.
deployment_specs = DeploymentSpecs(
    batcher=Batcher(max_batch_size=8, max_latency=500),
    scaling=Scaling(min_replicas=1, max_replicas=3, scale_metric=SCALE_METRIC.CONCURRENCY, target_scaling_value=1),
    authentication=True
)
- Batcher (dynamic batching): you can define the maximum batch size and waiting time (in milliseconds). The server will wait at most max_latency milliseconds to assemble a batch of up to max_batch_size samples.
- Scaling: you can set the min_replicas that will always be running and the max_replicas that the deployment can scale up to based on the chosen metric. The two available autoscaling metrics are:
  - SCALE_METRIC.CONCURRENCY: the number of simultaneous requests that each replica can process. With a target_scaling_value of 1, if there are 3 concurrent requests the deployment will scale up to 3 replicas.
  - SCALE_METRIC.RPS: sets a target for requests per second per replica (see the sketch after this list).
- Authentication: when set to True, an API key is generated for the deployment and must be included when making inference requests.
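For example, a minimal sketch of an RPS-based auto-scaling configuration (the target of 5 requests per second per replica is just an illustrative value):
deployment_specs = DeploymentSpecs(
    batcher=Batcher(max_batch_size=8, max_latency=500),
    scaling=Scaling(
        min_replicas=1,
        max_replicas=4,
        scale_metric=SCALE_METRIC.RPS,   # scale on requests per second per replica
        target_scaling_value=5           # illustrative target
    ),
    authentication=True
)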
A deployment can also be configured to scale down to zero replicas after a period of inactivity. If the deployment scales down to 0 replicas, the next request will restart the server. Please note that there will be a cold start. Example:
# This deployment will scale down to zero after 3 minutes = 180000 milliseconds of inactivity
deployment_specs = DeploymentSpecs(
    batcher=Batcher(max_batch_size=8, max_latency=500),
    scaling=Scaling(min_replicas=0, max_replicas=1, time_before_scaling_to_zero=180000),
    authentication=True
)
- Create the deployment.
from xcloud import DeploymentsClient

deployment_name = "xcloud-inference"

deployment = Deployment(
    deployment_name=deployment_name,
    model_specs=model_specs,
    container_specs=container_specs,
    deployment_specs=deployment_specs,
    location=Location.GCP_US_CENTRAL_1
)

deployment = DeploymentsClient.create_deployment(deployment)
We currently support the following locations (compute regions):
- Location.GCP_US_CENTRAL_1
- Location.AZURE_EAST_US
- You can wait until your deployment is ready. It usually takes 5 - 10 min depending on your model and the selected Docker image.
DeploymentsClient.wait_until_deployment_is_ready(deployment_name=deployment_name)
- You can fetch the deployment information.
deployment = DeploymentsClient.get_deployment_by_name(deployment_name)
print(deployment)
- To fetch the logs you can run the following code. Note that logs is a Python dictionary where the key is the replica number and the value is a string containing the logs.
logs = DeploymentsClient.get_logs(deployment_name=deployment_name)

if logs.get("0") is not None:
    print(logs["0"])
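Since logs maps replica numbers to log strings, you can also iterate over it to inspect every replica:
# Print the logs of every replica; keys are the replica numbers as strings
for replica, replica_logs in sorted(logs.items()):
    print(f"----- Replica {replica} -----")
    print(replica_logs)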
- Once the deployment is running, you can start running inference. Note that you will have to add the x-api-key header to the request because authentication was enabled when the deployment was created.
import requests

response = requests.post(
    url=deployment.inference.infer_endpoint,
    headers={
        "x-api-key": deployment.inference.api_key
    },
    json={
        "instances": [
            "There was a guy that"
        ]
    }
)
response.raise_for_status()
response.json()
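With the custom ModelInference class defined above, the JSON body returned by the endpoint mirrors the dictionary returned by predict, so reading the generated text looks roughly like this (assuming a single prompt was sent):
result = response.json()

# predict() returned {"predictions": [...]}, one entry per input instance
print(result["predictions"][0])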
Stream inference - Example client
Example client for running inference against a streaming deployment. Don't forget to install the sseclient-py package by running pip install sseclient-py.
import requests
import sseclient  # pip install sseclient-py

stream_response = requests.post(
    url=deployment.inference.infer_endpoint,
    headers={
        "x-api-key": deployment.inference.api_key
    },
    json={
        "instances": [
            "There was a guy that"
        ]
    },
    stream=True
)

client = sseclient.SSEClient(stream_response)

for event in client.events():
    string_data = event.data.strip()
    if string_data != "":
        print(string_data, end=" ", flush=True)
- You can cancel the deployment execution. This will delete the virtual machines associated with it, but you will still be able to access the logs and the deployment information.
cancelled_deployment = DeploymentsClient.cancel_deployment(deployment_name=deployment_name)
- You can delete the deployment if you no longer want to access it.
deleted_deployment = DeploymentsClient.delete_deployment(deployment_name=deployment_name)
You can access the UI at https://xcloud-app.stochastic.ai to visualize deployment information and logs.
2. Optimized deployment
2.1. Preparing your assets
- Create the directory ~/xcloud_opt_inference/model in your local file system where you will save your model. In this case we will save the GPT-2 model:
import os

from transformers import AutoTokenizer, AutoModelForCausalLM

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained(
    "gpt2",
    low_cpu_mem_usage=True
)

# Expand "~" explicitly: save_pretrained does not expand it for you
save_dir = os.path.expanduser("~/xcloud_opt_inference/model")
tokenizer.save_pretrained(save_dir)
model.save_pretrained(save_dir)
Your ~/xcloud_opt_inference directory should look like:
- xcloud_opt_inference
|- model
| |- config.json
| |- generation_config.json
| |- merges.txt
| |- pytorch_model.bin
| |- special_tokens_map.json
| |- tokenizer_config.json
| |- tokenizer.json
| |- vocab.json
2.2. Uploading assets to AWS
Before starting the deployment you will have to upload your assets to your cloud provider, for instance Amazon S3 or Google Cloud Storage.
To do that we will use cloudpathlib, a Python package that lets you upload local files to AWS. You can install it by running pip install cloudpathlib[s3].
import os
from cloudpathlib import CloudPath

CloudPath(
    "s3://<your_bucket>/xcloud_opt_inference/"
).upload_from(os.path.expanduser("~/xcloud_opt_inference"))
2.3. Creating the optimized deployment
To create a deployment, you will have to configure the following parameters.
- Specify where your code is stored and the credentials to access it.
from xcloud import (
    ModelSpecs,
    Deployment,
    MachineType,
    Credentials,
    DeploymentContainerSpecs,
    DeploymentSpecs,
    Batcher,
    Scaling,
    DeploymentOptimizationSpecs,
    ModelConfig,
    ModelType,
    DTYPES,
    GenerationParams,
    Cloud
)
import os
# Configure your AWS credentials
creds = Credentials(
    cloud=Cloud.AWS,
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    aws_region=os.environ["AWS_DEFAULT_REGION"]
)

model_specs = ModelSpecs(
    model_path="s3://<your_bucket>/xcloud_opt_inference/",
    credentials=creds
)
- Specify the instance type, whether you want to use a spot or non-spot instance, and the optimization specifications.
container_specs = DeploymentContainerSpecs(
    machine_type=MachineType.GPU_L4_1,
    spot=True,
    optimization_specs=DeploymentOptimizationSpecs(
        model_type=ModelType.GPT2,
        model_config=ModelConfig(
            model_path="./model",
            tokenizer_path="./model",
            dtype=DTYPES.FP32,
            generation_params=GenerationParams(
                max_tokens=512
            )
        )
    )
)
The options for the optimization specs are:
- model_type (required): we need to know which model you are going to deploy in order to optimize it accordingly.
- model_config:
  - model_path and tokenizer_path (required): the location of your model and tokenizer inside the assets you uploaded in model_specs. Your working directory is set to the root directory of your uploaded code, which is why you can specify "./model".
  - dtype (optional): the data type of your model weights. The default is DTYPES.FP16. You can specify the following values: DTYPES.FP32, DTYPES.FP16 and DTYPES.BF16.
  - tensor_parallel_size (optional): tensor parallelism degree. The default is 1.
  - max_batch_size (optional): maximum batch size that your model is able to process before running out of CUDA memory.
  - max_tokens (optional): maximum number of tokens your model can process before running out of CUDA memory.
  - generation_params (optional): parameters for the generate method (see the sketch after this list).
    - temperature (optional): controls the randomness of the generated text. A higher value, like 1.0, makes the output more diverse and creative, as it samples from a wider range of possibilities. A lower value, like 0.5, makes the output more focused and deterministic, as it tends to select the most probable words. Default is 1.0.
    - top_p (optional): sets a threshold for the cumulative probability of the next-word candidates and influences the diversity of the generated text. A higher value, like 1.0, means that more candidates are considered, while a lower value, like 0.8, limits the candidates to a narrower selection based on their probabilities. Default is 1.0.
    - top_k (optional): limits the number of top-probability words considered at each generation step. If set to a positive value, like 50, only the top 50 words by probability are considered. If set to -1 (the default), all words are considered.
    - beam_search (optional): when set to True, beam search is used during generation. Beam search explores multiple possible sequences of words in parallel, keeping the most likely candidates at each step. This can lead to more coherent and structured output. Default is False.
    - max_tokens (optional): restricts the maximum length of the generated text to a certain number of tokens. Default is 2048.
    - presence_penalty (optional): encourages the model to avoid repeating the same words or phrases in the generated text. A non-zero value makes the model less likely to reuse words it has already used. Default is 0.0.
    - frequency_penalty (optional): encourages the model to use less common words or phrases. A non-zero value makes the model more inclined to use a broader vocabulary and avoid overusing common words. Default is 0.0.
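As a reference, here is a minimal sketch that fills in several of these generation parameters. The concrete values are only illustrative, and the specs are shown as a standalone optimization_specs object that you would pass to DeploymentContainerSpecs(optimization_specs=...):
# Hypothetical values, purely illustrative
optimization_specs = DeploymentOptimizationSpecs(
    model_type=ModelType.GPT2,
    model_config=ModelConfig(
        model_path="./model",
        tokenizer_path="./model",
        dtype=DTYPES.FP16,                  # the default dtype
        generation_params=GenerationParams(
            max_tokens=512,                 # cap the generated length
            temperature=0.7,                # lower = more deterministic
            top_p=0.9,
            top_k=50,
            presence_penalty=0.2
        )
    )
)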
- Configure the deployment specifications. You can configure dynamic batching, the auto-scaling policy and whether you want to add authentication to your endpoint.
deployment_specs = DeploymentSpecs(
    batcher=Batcher(max_batch_size=8, max_latency=500),
    scaling=Scaling(min_replicas=1, max_replicas=3),
    authentication=True
)
- Batcher (dynamic batching): you can define the maximum batch size and waiting time (in milliseconds). The server will wait at most max_latency milliseconds to assemble a batch of up to max_batch_size samples.
- Scaling: you can set the min_replicas that will always be running and the max_replicas that the deployment can scale up to based on the chosen metric. The two available autoscaling metrics are:
  - SCALE_METRIC.CONCURRENCY: the number of simultaneous requests that each replica can process. With a target_scaling_value of 1, if there are 3 concurrent requests the deployment will scale up to 3 replicas.
  - SCALE_METRIC.RPS: sets a target for requests per second per replica.
- Authentication: when set to True, an API key is generated for the deployment and must be included when making inference requests.
A deployment can also be configured to scale down to zero replicas after a period of inactivity. If the deployment scales down to 0 replicas, the next request will restart the server. Please note that there will be a cold start. Example:
# This deployment will scale down to zero after 3 minutes = 180000 milliseconds of inactivity
deployment_specs = DeploymentSpecs(
    batcher=Batcher(max_batch_size=8, max_latency=500),
    scaling=Scaling(min_replicas=0, max_replicas=1, time_before_scaling_to_zero=180000),
    authentication=True
)
- Create the optimized deployment.
from xcloud import DeploymentsClient

deployment_name = "opt-deployment"

# Configure deployment
deployment = Deployment(
    deployment_name=deployment_name,
    model_specs=model_specs,
    container_specs=container_specs,
    deployment_specs=deployment_specs
)

deployment = DeploymentsClient.create_deployment(deployment)
- You can wait until your deployment is ready. It usually takes 5 - 10 min depending on your model and the selected Docker image.
DeploymentsClient.wait_until_deployment_is_ready(deployment_name=deployment_name)
- You can fetch the deployment information.
deployment = DeploymentsClient.get_deployment_by_name(deployment_name)
print(deployment)
- To fetch the logs you can run the following code. Note that logs is a Python dictionary where the key is the replica number and the value is a string containing the logs.
logs = DeploymentsClient.get_logs(deployment_name=deployment_name)

if logs.get("0") is not None:
    print(logs["0"])
- Once the deployment is running, you can start running inference. Note that you will have to add the x-api-key header to the request because authentication was enabled when the deployment was created.
import requests

response = requests.post(
    url=deployment.inference.infer_endpoint,
    headers={
        "x-api-key": deployment.inference.api_key
    },
    json={
        "instances": [
            "There was a guy that"
        ]
    }
)
response.raise_for_status()
response.json()
- You can cancel the deployment execution. This will delete the virtual machines associated with it, but you will still be able to access the logs and the deployment information.
cancelled_deployment = DeploymentsClient.cancel_deployment(deployment_name=deployment_name)
- You can delete the deployment if you no longer want to access it.
deleted_deployment = DeploymentsClient.delete_deployment(deployment_name=deployment_name)
You can access the UI at https://xcloud-app.stochastic.ai to visualize deployment information and logs.