
🚀 Deployments

The xCloud inference engine supports two kinds of deployments:

  • Custom deployment: deploy any model with your favorite framework (TensorFlow, PyTorch, HuggingFace, ONNX, etc.). This deployment type is flexible enough to let you customize model loading and inference. You can take advantage of server optimizations such as dynamic batching and streaming inference. These server optimizations can potentially reduce inference costs by around 10x (if you have a high number of requests/sec) compared to a standard API server.
  • Optimized deployment: deploy certain models that are automatically optimized to minimize latency. While inference customization is limited, you can configure generation parameters such as top_k, max_tokens, temperature, etc. Server optimizations can also be applied to these deployments. Estimated cost savings can be approximately 30x compared to a standard API server.

The supported models for optimized inference are:

  • Aquila & Aquila2 (BAAI/AquilaChat2-7B, BAAI/AquilaChat2-34B, BAAI/Aquila-7B, BAAI/AquilaChat-7B, etc.)
  • ChatGLM (THUDM/chatglm2-6b, THUDM/chatglm3-6b, etc.)
  • Flan-T5 (google/flan-t5-small, google/flan-t5-base, google/flan-t5-large, google/flan-t5-xl, google/flan-t5-xxl, etc)
  • T5 (t5-small, t5-base, t5-large, t5-3b, t5-11b, etc)
  • LLaMA and LLaMA-2 (meta-llama/Llama-2-70b-hf, lmsys/vicuna-13b-v1.3, young-geng/koala, openlm-research/open_llama_13b, etc)
  • GPT-J (EleutherAI/gpt-j-6b, nomic-ai/gpt4all-j, etc)
  • GPT-NeoX (EleutherAI/gpt-neox-20b, databricks/dolly-v2-12b, stabilityai/stablelm-tuned-alpha-7b, etc)
  • GPT BigCode (bigcode/starcoder, bigcode/gpt_bigcode-santacoder, etc.)
  • Baichuan (baichuan-inc/Baichuan-7B, baichuan-inc/Baichuan-13B-Chat, etc)
  • BLOOM (bigscience/bloom, bigscience/bloomz, etc)
  • Falcon (tiiuae/falcon-7b, tiiuae/falcon-40b, tiiuae/falcon-rw-7b, etc)
  • GPT-2 (gpt2, gpt2-xl, etc)
  • MPT (mosaicml/mpt-7b, mosaicml/mpt-30b, etc)
  • OPT (facebook/opt-66b, facebook/opt-iml-max-30b, etc)
  • InternLM (internlm/internlm-7b, internlm/internlm-chat-7b, etc.)
  • Mistral (mistralai/Mistral-7B-v0.1, mistralai/Mistral-7B-Instruct-v0.1, etc.)
  • Mixtral (mistralai/Mixtral-8x7B-v0.1, mistralai/Mixtral-8x7B-Instruct-v0.1, etc.)
  • Phi-1.5 (microsoft/phi-1_5, etc.)
  • Qwen (Qwen/Qwen-7B, Qwen/Qwen-7B-Chat, etc.)
  • Yi (01-ai/Yi-6B, 01-ai/Yi-34B, etc.)

In this tutorial we will deploy the GPT-2 model and cover both types of deployment.

1. Custom deployment

1.1. Preparing your inference code

  1. Create a directory in your local file system where you will save your inference scripts. In this example we will use ~/xcloud_inference.
  2. Create another directory called ~/xcloud_inference/model where you will save your model. In this case we will save the GPT-2 model.
from pathlib import Path
from transformers import AutoTokenizer, AutoModelForCausalLM

# Expand "~" explicitly; save_pretrained will not expand it for you
save_path = Path("~/xcloud_inference/model").expanduser()

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained(
    "gpt2",
    low_cpu_mem_usage=True
)

tokenizer.save_pretrained(save_path)
model.save_pretrained(save_path)
  3. Create your inference script ~/xcloud_inference/inference.py and include the following code in it.
from typing import Dict
from pathlib import Path
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch


class ModelInference:
    def __init__(self, workdir_path: str):
        model_path = Path(workdir_path) / "model"

        self.tokenizer = AutoTokenizer.from_pretrained(str(model_path))
        self.model = AutoModelForCausalLM.from_pretrained(
            str(model_path),
            low_cpu_mem_usage=True
        ).to("cuda")
        self.model.eval()

    def predict(
        self,
        request: Dict,
        headers: Dict[str, str] = None
    ):
        instances = request['instances']

        generate_params = {
            "max_new_tokens": 512,
            "top_k": 1,
            "top_p": 0.0,
            "temperature": 1.0,
            "repetition_penalty": 1.0
        }

        inputs = self.tokenizer(instances, return_tensors="pt")
        inputs = {k: v.cuda() for k, v in inputs.items()}
        with torch.no_grad():
            output = self.model.generate(**inputs, **generate_params)

        output = self.tokenizer.batch_decode(output, skip_special_tokens=True)

        result = {
            "predictions": output
        }

        return result
caution

The inference script must be named inference.py and include a class named ModelInference with a minimum of two methods:

  • __init__(self, workdir_path: str): This method handles model loading and initialization. The workdir_path parameter is a string containing the path to your code and assets. Your current directory is set as the workdir_path, so you can alternatively load the model using from_pretrained("./model").

  • predict(self, request: Dict, headers: Dict[str, str] = None): This method processes the user's request, performing preprocessing, the model's feed-forward pass, and post-processing. The request parameter is a Python dictionary containing the JSON input. Note: when dynamic batching is enabled, you'll receive a dictionary with the key instances, containing a list of batched requests. Additionally, with dynamic batching enabled, the output of the endpoint should be a Python dictionary containing the key predictions, whose value should be a list of the same length as the instances list (see the example payloads below).
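For illustration, with dynamic batching enabled the predict method receives and returns payloads shaped like the following (a minimal sketch; the prompts and generated texts are placeholder values):

# Hypothetical batched request passed to predict() when dynamic batching is enabled
request = {
    "instances": [
        "There was a guy that",
        "Once upon a time"
    ]
}

# The returned dictionary must contain a "predictions" list with one entry per instance
result = {
    "predictions": [
        "There was a guy that ... (generated text)",
        "Once upon a time ... (generated text)"
    ]
}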

Streaming inference - Token by token generation

If you want to do streaming inference (token-by-token generation), use a code snippet similar to the one below. Note that dynamic batching is not compatible with streaming inference.

from pathlib import Path
from typing import Dict
from threading import Thread
from sse_starlette.sse import EventSourceResponse
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
import torch


class ModelInference:
    def __init__(self, workdir_path: str):
        model_path = Path(workdir_path) / "model"
        self.tokenizer = AutoTokenizer.from_pretrained(str(model_path))
        self.model = AutoModelForCausalLM.from_pretrained(
            str(model_path),
            low_cpu_mem_usage=True,
            torch_dtype=torch.float16
        ).to("cuda")
        self.model.eval()

    def predict(
        self,
        request: Dict,
        headers: Dict[str, str] = None
    ):
        instances = request['instances']

        streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True)

        input_ids = self.tokenizer(instances, return_tensors="pt")['input_ids']
        input_ids = input_ids.cuda()

        generation_kwargs = {
            "input_ids": input_ids,
            "streamer": streamer,
            "max_new_tokens": 1024,
            "top_k": 1,
            "top_p": 0.0,
            "temperature": 1.0,
            "repetition_penalty": 1.0
        }

        # Run generation in a background thread and stream tokens as server-sent events
        thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
        thread.start()

        return EventSourceResponse(streamer)
  4. Create the file ~/xcloud_inference/requirements.txt and include the following libraries.
transformers
torch
sentencepiece
accelerate
sse-starlette  # only needed if you use the streaming inference example above
info

Your ~/xcloud_inference directory should look like:

- xcloud_inference
|- requirements.txt
|
|- inference.py
|
|- model
| |- config.json
| |- generation_config.json
| |- pytorch_model.bin
| |- special_tokens_map.json
| |- vocab.json
| |- merges.txt
| |- tokenizer_config.json
| |- tokenizer.json

1.2. Uploading assets to AWS

Before starting the deployment you will have to upload your code to your cloud provider, for instance Amazon S3 or Google Cloud Storage. We are working on supporting other cloud providers and on uploading your code directly from your local machine.

To do that we will use cloudpathlib, a Python package that allows you to upload local files to S3.

info

You can install the cloudpathlib library by running pip install cloudpathlib[s3].

from pathlib import Path
from cloudpathlib import CloudPath

# Expand "~" explicitly before uploading
CloudPath(
    "s3://<your_bucket>/xcloud_inference/"
).upload_from(
    Path("~/xcloud_inference").expanduser()
)

1.3. Creating the deployment

To create a deployment, you will have to configure the following parameters.

  1. Specify where your code is stored and the credentials to access it.
from xcloud import (
    ModelSpecs,
    Deployment,
    MachineType,
    Credentials,
    DeploymentContainerSpecs,
    DeploymentSpecs,
    Batcher,
    Scaling,
    SCALE_METRIC,
    Cloud,
    Location  # used below when creating the deployment
)
import os


# Configure your AWS credentials
creds = Credentials(
    cloud=Cloud.AWS,
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    aws_region=os.environ["AWS_DEFAULT_REGION"]
)

model_specs = ModelSpecs(
    model_path="s3://<your_bucket>/xcloud_inference/",
    credentials=creds
)
  2. Specify the instance type, whether you want to use a spot or non-spot instance, the Docker image, environment variables, and secrets.
container_specs = DeploymentContainerSpecs(
    machine_type=MachineType.GPU_L4_1,
    spot=True,
    image="us-central1-docker.pkg.dev/stochastic-x/xcloud-public/inference:pytorch2_small_v0.0.1",
    env={},      # Here you can add environment variables
    secrets={}   # You can add secrets that will be shared through environment variables
)
info

For security reasons, secrets are not stored in the database. Use the secrets field for environment variables that contain passwords, API keys, and other sensitive values.
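As an illustration, here is a minimal sketch of passing a secret through the secrets field (HF_TOKEN and LOG_LEVEL are hypothetical names used only for this example; os was imported in the previous step):

container_specs = DeploymentContainerSpecs(
    machine_type=MachineType.GPU_L4_1,
    spot=True,
    image="us-central1-docker.pkg.dev/stochastic-x/xcloud-public/inference:pytorch2_small_v0.0.1",
    env={"LOG_LEVEL": "info"},  # regular, non-sensitive configuration
    secrets={"HF_TOKEN": os.environ["HF_TOKEN"]}  # exposed to the container as an env variable, never stored in the DB
)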

The Docker images that can be selected for inference are:

  • us-central1-docker.pkg.dev/stochastic-x/xcloud-public/inference:cpu_v0.0.1: for CPU instances; Python comes pre-installed.
  • us-central1-docker.pkg.dev/stochastic-x/xcloud-public/inference:pytorch2_small_v0.0.1: comes with CUDA and PyTorch (v2) pre-installed.
  • us-central1-docker.pkg.dev/stochastic-x/xcloud-public/inference:pytorch2_v0.0.1: comes with CUDA and several additional libraries pre-installed (protobuf, torch, torchaudio, torchvision, transformers[torch], numpy, torchmetrics, scikit-learn, scipy, Pillow, datasets, nltk, rouge-score, sentencepiece, seqeval, sacrebleu, pytorch-lightning, wandb, marshmallow, cloudpathlib, requests, onnxruntime-gpu).
  3. Configure the deployment specifications. You can configure dynamic batching, the auto-scaling policy, and whether you want to add authentication to your endpoint.
deployment_specs = DeploymentSpecs(
    batcher=Batcher(max_batch_size=8, max_latency=500),
    scaling=Scaling(
        min_replicas=1,
        max_replicas=3,
        scale_metric=SCALE_METRIC.CONCURRENCY,
        target_scaling_value=1
    ),
    authentication=True
)
info
  • Batcher (dynamic batching): You can define the maximum batch size and waiting time (in milliseconds). The server will wait for a maximum of max_latency to create a batch of up to max_batch_size samples.

  • Scaling: You can set the min_replicas that will always be running, and the max_replicas that will scale up based on the chosen metric. Two available metrics for autoscaling deployments are:

    • SCALE_METRIC.CONCURRENCY: represents the number of simultaneous requests that each replica can process. With a target_scaling_value of 1, if there are 3 concurrent requests, the deployment will scale up to 3 replicas.
    • SCALE_METRIC.RPS: sets a target for requests per second per replica (an RPS example is shown below).
  • Authentication: When set to True, an API key will be generated for the deployment, which must be used for making inferences.

A deployment can also be configured to scale down to zero replicas after a period of inactivity. If the deployment scales down to 0 replicas, the next request will restart the server. Please note that there will be a cold start. Example:

# This deployment will scale down to zero after 180000 milliseconds (3 minutes) of inactivity
deployment_specs = DeploymentSpecs(
    batcher=Batcher(max_batch_size=8, max_latency=500),
    scaling=Scaling(min_replicas=0, max_replicas=1, time_before_scaling_to_zero=180000),
    authentication=True
)
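Similarly, here is a sketch of a deployment that autoscales on requests per second instead of concurrency; the target of 5 RPS per replica is an illustrative value, not a recommendation:

deployment_specs = DeploymentSpecs(
    batcher=Batcher(max_batch_size=8, max_latency=500),
    scaling=Scaling(
        min_replicas=1,
        max_replicas=3,
        scale_metric=SCALE_METRIC.RPS,
        target_scaling_value=5  # scale up when a replica receives more than ~5 requests/sec
    ),
    authentication=True
)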
  4. Create the deployment.
from xcloud import DeploymentsClient

deployment_name = "xcloud-inference"
deployment = Deployment(
    deployment_name=deployment_name,
    model_specs=model_specs,
    container_specs=container_specs,
    deployment_specs=deployment_specs,
    location=Location.GCP_US_CENTRAL_1
)

deployment = DeploymentsClient.create_deployment(deployment)
info

We are currently supporting the following locations (compute regions):

  • Location.GCP_US_CENTRAL_1
  • Location.AZURE_EAST_US
We highly advise opting for the Location.AZURE_EAST_US region when using A100 (80GB) GPUs due to ongoing high demand issues in GCP.
  5. You can wait until your deployment is ready. It usually takes 5 - 10 min depending on your model and the selected Docker image.
DeploymentsClient.wait_until_deployment_is_ready(deployment_name=deployment_name)
  6. You can fetch the deployment information.
deployment = DeploymentsClient.get_deployment_by_name(deployment_name)
print(deployment)
  7. To fetch the logs you can run the following code. Note that logs is a Python dictionary where the key is the replica number and the value is a string containing the logs.
logs = DeploymentsClient.get_logs(deployment_name=deployment_name)

if logs.get("0") is not None:
print(logs["0"])
  8. Once the deployment is running, you can start running inference. Note that you will have to add the x-api-key header to the request because authentication was enabled when the deployment was created.
import requests

response = requests.post(
    url=deployment.inference.infer_endpoint,
    headers={
        "x-api-key": deployment.inference.api_key
    },
    json={
        "instances": [
            "There was a guy that"
        ]
    }
)

response.raise_for_status()

response.json()
Streaming inference - Example client

Example client for running inference against a streaming deployment. Don't forget to install the sseclient-py package by running pip install sseclient-py.

import requests
import sseclient  # pip install sseclient-py

stream_response = requests.post(
    url=deployment.inference.infer_endpoint,
    headers={
        "x-api-key": deployment.inference.api_key
    },
    json={
        "instances": [
            "There was a guy that"
        ]
    },
    stream=True
)

client = sseclient.SSEClient(stream_response)

for event in client.events():
    string_data = event.data.strip()
    if string_data != "":
        print(string_data, end=" ", flush=True)
  9. You can cancel the deployment execution. This will delete the virtual machines associated with it, but you will still be able to access the logs and the deployment information.
cancelled_deployment = DeploymentsClient.cancel_deployment(deployment_name=deployment_name)
  10. You can delete the deployment if you no longer want to access it.
deleted_deployment = DeploymentsClient.delete_deployment(deployment_name=deployment_name)
info

You can access the UI at https://xcloud-app.stochastic.ai to view the deployment information and the logs.

2. Optimized deployment

2.1. Preparing your assets

  1. Create the directory ~/xcloud_opt_inference/model in your local file system where you will save your model. In this case we will save the GPT-2 model.
from pathlib import Path
from transformers import AutoTokenizer, AutoModelForCausalLM

# Expand "~" explicitly; save_pretrained will not expand it for you
save_path = Path("~/xcloud_opt_inference/model").expanduser()

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained(
    "gpt2",
    low_cpu_mem_usage=True
)

tokenizer.save_pretrained(save_path)
model.save_pretrained(save_path)
info

Your ~/xcloud_opt_inference directory should look like:

- xcloud_opt_inference
|- model
| |- config.json
| |- generation_config.json
| |- pytorch_model.bin
| |- special_tokens_map.json
| |- vocab.json
| |- merges.txt
| |- tokenizer_config.json
| |- tokenizer.json

2.2. Uploading assets to AWS

Before starting the deployment you will have to upload your code to your cloud provider, for instance Amazon S3 or Google Cloud Storage.

To do that we will use cloudpathlib, a Python package that allows you to upload local files to S3.

info

You can install the cloudpathlib library by running pip install cloudpathlib[s3].

from pathlib import Path
from cloudpathlib import CloudPath

# Expand "~" explicitly before uploading
CloudPath(
    "s3://<your_bucket>/xcloud_opt_inference/"
).upload_from(
    Path("~/xcloud_opt_inference").expanduser()
)

2.3. Creating the optimized deployment

To create a deployment, you will have to configure the following parameters.

  1. Specify where your code is stored and the credentials to access it.
from xcloud import (
    ModelSpecs,
    Deployment,
    MachineType,
    Credentials,
    DeploymentContainerSpecs,
    DeploymentSpecs,
    Batcher,
    Scaling,
    DeploymentOptimizationSpecs,
    ModelConfig,
    ModelType,
    DTYPES,
    GenerationParams,
    Cloud
)
import os


# Configure your AWS credentials
creds = Credentials(
    cloud=Cloud.AWS,
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    aws_region=os.environ["AWS_DEFAULT_REGION"]
)

model_specs = ModelSpecs(
    model_path="s3://<your_bucket>/xcloud_opt_inference/",
    credentials=creds
)
  2. Specify the instance type, whether you want to use a spot or non-spot instance, and the optimization specifications.
container_specs = DeploymentContainerSpecs(
    machine_type=MachineType.GPU_L4_1,
    spot=True,
    optimization_specs=DeploymentOptimizationSpecs(
        model_type=ModelType.GPT2,
        model_config=ModelConfig(
            model_path="./model",
            tokenizer_path="./model",
            dtype=DTYPES.FP32,
            generation_params=GenerationParams(
                max_tokens=512
            )
        )
    )
)
info

The options for the optimization specs are the following (a combined example is shown after the list):

  • model_type (required): we need to know which model you are going to use to optimize it accordingly.
  • model_config:
    • model_path and tokenizer_path (required): we need to know the location of your model and tokenizer inside the assets you have uploaded. Your working directory is set to the root directory of your uploaded assets, which is why you can specify "./model".
    • dtype (optional): the data type of your model weights. By default it is DTYPES.FP16. You can specify the following values: DTYPES.FP32, DTYPES.FP16 and DTYPES.BF16.
    • tensor_parallel_size (optional): tensor parallelism. By default it is 1.
    • max_batch_size (optional): maximum batch size that your model is able to process before getting CUDA out of memory.
    • max_tokens (optional): maximum number of tokens your model can process before getting CUDA out of memory.
    • generation_params (optional): parameters for the generate method.
      • temperature (optional): this parameter controls the randomness of the generated text. A higher value, like 1.0, makes the output more diverse and creative, as it samples from a wider range of possibilities. A lower value, like 0.5, makes the output more focused and deterministic, as it tends to select the most probable words. Default is 1.0.
      • top_p (optional): this parameter sets a threshold for the cumulative probability of the next word candidates. It influences the diversity of the generated text. A higher value, like 1.0, means that more candidates are considered, while a lower value, like 0.8, limits the candidates to a narrower selection based on their probabilities. Default is 1.0.
      • top_k (optional): this parameter limits the number of top probability words considered at each step of generation. If set to a positive value, like 50, only the top 50 words by probability are considered. If set to -1 (the default), all words are considered.
      • beam_search (optional): when set to True, beam search is used during generation. Beam search explores multiple possible sequences of words in parallel, keeping the most likely candidates at each step. This can lead to more coherent and structured output. Default is False.
      • max_tokens (optional): this parameter restricts the maximum length of the generated text to a certain number of tokens. Default is 2048.
      • presence_penalty (optional): this penalty encourages the model to avoid repeating the same words or phrases in the generated text. A non-zero value makes the model less likely to reuse words it has already used. Default is 0.0.
      • frequency_penalty (optional): this penalty encourages the model to use less common words or phrases. A non-zero value makes the model more inclined to use a broader vocabulary and avoid overusing common words. Default is 0.0.
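Putting these options together, here is a sketch of a more fully specified DeploymentOptimizationSpecs. The concrete values (FP16, batch size 8, the sampling parameters) are illustrative, not recommendations:

optimization_specs = DeploymentOptimizationSpecs(
    model_type=ModelType.GPT2,  # required: tells xCloud which model architecture to optimize
    model_config=ModelConfig(
        model_path="./model",       # relative to the root of your uploaded assets
        tokenizer_path="./model",
        dtype=DTYPES.FP16,          # FP16 is the default precision
        tensor_parallel_size=1,     # single-GPU deployment
        max_batch_size=8,           # keep within your GPU memory budget
        generation_params=GenerationParams(
            max_tokens=512,
            temperature=0.8,
            top_p=0.95,
            top_k=50,
            beam_search=False,
            presence_penalty=0.0,
            frequency_penalty=0.0
        )
    )
)

This object is passed as the optimization_specs argument of DeploymentContainerSpecs, exactly as in the previous step.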
  3. Configure the deployment specifications. You can configure dynamic batching, the auto-scaling policy, and whether you want to add authentication to your endpoint.
deployment_specs = DeploymentSpecs(
    batcher=Batcher(max_batch_size=8, max_latency=500),
    scaling=Scaling(min_replicas=1, max_replicas=3),
    authentication=True
)
info
  • Batcher (dynamic batching): You can define the maximum batch size and waiting time (in milliseconds). The server will wait for a maximum of max_latency to create a batch of up to max_batch_size samples.

  • Scaling: You can set the min_replicas that will always be running, and the max_replicas that will scale up based on the chosen metric. Two available metrics for autoscaling deployments are:

    • SCALE_METRIC.CONCURRENCY: represents the number of simultaneous requests that each replica can process. With a target_scaling_value of 1, if there are 3 concurrent requests, the deployment will scale up to 3 replicas.
    • SCALE_METRIC.RPS: sets a target for requests per second per replica.
  • Authentication: When set to True, an API key will be generated for the deployment, which must be used for making inferences.

A deployment can also be configured to scale down to zero replicas after a period of inactivity. If the deployment scales down to 0 replicas, the next request will restart the server. Please note that there will be a cold start. Example:

# This deployment will scale down to zero after 180000 milliseconds (3 minutes) of inactivity
deployment_specs = DeploymentSpecs(
    batcher=Batcher(max_batch_size=8, max_latency=500),
    scaling=Scaling(min_replicas=0, max_replicas=1, time_before_scaling_to_zero=180000),
    authentication=True
)
  4. Create the optimized deployment.
from xcloud import DeploymentsClient

deployment_name = "opt-deployment"
# Configure deployment
deployment = Deployment(
    deployment_name=deployment_name,
    model_specs=model_specs,
    container_specs=container_specs,
    deployment_specs=deployment_specs
)

deployment = DeploymentsClient.create_deployment(deployment)
  5. You can wait until your deployment is ready. It usually takes 5 - 10 min depending on your model and the selected Docker image.
DeploymentsClient.wait_until_deployment_is_ready(deployment_name=deployment_name)
  6. You can fetch the deployment information.
deployment = DeploymentsClient.get_deployment_by_name(deployment_name)
print(deployment)
  7. To fetch the logs you can run the following code. Note that logs is a Python dictionary where the key is the replica number and the value is a string containing the logs.
logs = DeploymentsClient.get_logs(deployment_name=deployment_name)

if logs.get("0") is not None:
print(logs["0"])
  8. Once the deployment is running, you can start running inference. Note that you will have to add the x-api-key header to the request because authentication was enabled when the deployment was created.
import requests

response = requests.post(
    url=deployment.inference.infer_endpoint,
    headers={
        "x-api-key": deployment.inference.api_key
    },
    json={
        "instances": [
            "There was a guy that"
        ]
    }
)

response.raise_for_status()

response.json()
  9. You can cancel the deployment execution. This will delete the virtual machines associated with it, but you will still be able to access the logs and the deployment information.
cancelled_deployment = DeploymentsClient.cancel_deployment(deployment_name=deployment_name)
  10. You can delete the deployment if you no longer want to access it.
deleted_deployment = DeploymentsClient.delete_deployment(deployment_name=deployment_name)
info

You can access the UI at https://xcloud-app.stochastic.ai to view the deployment information and the logs.