Fine-tuning
Check the requirements before you start working with fine-tuning jobs.
Initiating a fine-tuning job involves launching an executor. This term is used because it enables the execution of various tasks beyond fine-tuning jobs. For instance, you can also use it to process your data and store the results in your cloud environment.
In this tutorial we will fine-tune the GPT2 model to showcase how easy it is to use xCloud.
1. Preparing your fine-tuning code
- Create a directory in your local file system where you will save your fine-tuning scripts. For this example, create the directory ~/xcloud_finetuning.
- Within this directory, create your fine-tuning script ~/xcloud_finetuning/finetuning.py.
- Include the following code in it. This code is responsible for fine-tuning the GPT2 model. Feel free to modify it.
import argparse
from cloudpathlib import CloudPath, S3Client
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    DataCollatorForLanguageModeling,
    TrainingArguments,
    Trainer
)
import torch
from datasets import load_dataset
import os

LOCAL_DST_MODEL_DIR = "/asset/model_result"
LOCAL_TRAINING_LOGS = "/assets/logs"
LOCAL_CKPTS_DIR = "/assets/ckpts"
AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]


def define_arg_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--saving_model_s3_dir",
        help="S3 directory where the finetuned model will be saved",
        type=str
    )
    parser.add_argument(
        "--num_epochs",
        help="Number of epochs that the model will be trained",
        type=int
    )
    args = parser.parse_args()
    return args


def upload_model(args):
    s3_client = S3Client(
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    )
    model_cloud_path = CloudPath(cloud_path=args.saving_model_s3_dir, client=s3_client)
    model_cloud_path.upload_from(LOCAL_DST_MODEL_DIR)


def do_training(args):
    def preprocessing_fn(examples):
        return tokenizer(examples["text"], return_tensors="pt", padding=True)

    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    # GPT2 has no padding token, so reuse the end-of-sequence token
    tokenizer.pad_token = tokenizer.eos_token

    # Dummy dataset for testing the finetuning
    dataset = load_dataset("tweet_eval", "emoji")
    # Reduce the training samples for a fast fine-tuning
    dataset['train'] = dataset['train'].select(range(1000))
    tokenized_dataset = dataset.map(
        preprocessing_fn,
        batched=True,
        num_proc=4,
        remove_columns=dataset["train"].column_names,
    )

    model = AutoModelForCausalLM.from_pretrained(
        "gpt2",
        low_cpu_mem_usage=True
    )
    model = model.cuda()

    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer,
        mlm=False,
        pad_to_multiple_of=8
    )

    # Define training args
    training_args = TrainingArguments(
        output_dir=LOCAL_CKPTS_DIR,
        num_train_epochs=args.num_epochs,
        evaluation_strategy="no",
        # logging & evaluation strategies
        logging_dir=LOCAL_TRAINING_LOGS,
        logging_strategy="steps",
        logging_steps=500,
        save_strategy="epoch",
        save_total_limit=2,
        push_to_hub=False,
        # finetuning hyper-parameters
        per_device_train_batch_size=8,
        learning_rate=2e-4,
        lr_scheduler_type="cosine"
    )

    # Create Trainer instance
    trainer = Trainer(
        model=model,
        args=training_args,
        data_collator=data_collator,
        train_dataset=tokenized_dataset["train"]
    )

    # Start training
    trainer.train()

    # Save the fine-tuned model and the tokenizer locally
    trainer.save_model(LOCAL_DST_MODEL_DIR)
    tokenizer.save_pretrained(LOCAL_DST_MODEL_DIR)


def main():
    args = define_arg_parser()
    # Preprocess the dataset and do the training
    do_training(args)
    # Upload the model to the cloud
    upload_model(args)


if __name__ == "__main__":
    main()
Note that we will need to pass some environment variables for uploading the fine-tuned model to S3.
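Since finetuning.py reads its AWS credentials from the environment as soon as it starts, a missing variable fails the job with a KeyError. A minimal sketch of a pre-flight check is shown below; the helper name missing_env_vars is hypothetical and not part of xCloud.

```python
import os

# Variables that finetuning.py reads from os.environ when it starts.
REQUIRED_ENV_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_env_vars(environ=None, required=REQUIRED_ENV_VARS):
    """Return the names of required variables that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in required if not environ.get(name)]


# Demonstration with an explicit mapping instead of the real environment.
example_env = {"AWS_ACCESS_KEY_ID": "placeholder"}
print(missing_env_vars(example_env))  # → ['AWS_SECRET_ACCESS_KEY']
```

Calling missing_env_vars() without arguments checks the real process environment, which can be useful at the top of a launch script.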
- Create a ~/xcloud_finetuning/requirements.txt file with the following content.
transformers
torch
accelerate
datasets
sentencepiece
cloudpathlib[s3]
Your ~/xcloud_finetuning directory should look like:
~/xcloud_finetuning
├── finetuning.py
└── requirements.txt
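The layout above can be verified before uploading anything. The following is a small sketch using only the standard library; the helper name missing_files is an assumption made for illustration.

```python
import tempfile
from pathlib import Path

# Files that the tutorial expects inside the project directory.
EXPECTED_FILES = ("finetuning.py", "requirements.txt")


def missing_files(project_dir, expected=EXPECTED_FILES):
    """Return the expected file names that are absent from project_dir."""
    root = Path(project_dir).expanduser()
    return [name for name in expected if not (root / name).is_file()]


# Demonstration on a temporary directory that only contains the script.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "finetuning.py").touch()
    print(missing_files(tmp))  # → ['requirements.txt']
```

For the directory used in this tutorial, the call would be missing_files("~/xcloud_finetuning"); an empty list means the layout is complete.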
2. Uploading assets to AWS
Before starting an execution job, you have to upload your code to your cloud storage, for instance S3 or Google Storage. We are working on supporting other cloud providers and on uploading code directly from your local machine.
To do that we will use cloudpathlib, a Python package that allows you to upload local files to AWS.
You can install the cloudpathlib library by running the command pip install cloudpathlib[s3].
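from pathlib import Path
from cloudpathlib import CloudPath
# A leading "~" is only expanded by the shell, so expand it explicitly here
local_dir = Path("~/xcloud_finetuning").expanduser()
CloudPath(
    "s3://<your_bucket>/xcloud_finetuning/"
).upload_from(local_dir)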
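Before running the upload, it can help to preview which local files would end up at which S3 locations. The sketch below is a dry run that touches no cloud resources; it assumes the upload is a recursive copy that preserves relative paths, and both the plan_upload helper and the example bucket name are hypothetical.

```python
import tempfile
from pathlib import Path


def plan_upload(local_dir, s3_prefix):
    """Map each local file to the S3 URI it would get under s3_prefix."""
    root = Path(local_dir).expanduser()
    prefix = s3_prefix.rstrip("/")
    return {
        str(path): f"{prefix}/{path.relative_to(root).as_posix()}"
        for path in sorted(root.rglob("*"))
        if path.is_file()
    }


# Demonstration on a temporary copy of the tutorial layout.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("finetuning.py", "requirements.txt"):
        (Path(tmp) / name).touch()
    plan = plan_upload(tmp, "s3://example-bucket/xcloud_finetuning/")
    for destination in plan.values():
        print(destination)
```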
3. Creating the job
To create a job, you will have to configure the following parameters.
- Specify where your code is stored and the credentials to access it.
from xcloud import ExecutionContainerSpecs, CodeSpecs, ExecutionJob, Credentials, MachineType, Cloud
from xcloud import ExecutionJobsClient
import os

# Configure your AWS credentials
creds = Credentials(
    cloud=Cloud.AWS,
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    aws_region=os.environ["AWS_DEFAULT_REGION"]
)

code_specs = CodeSpecs(
    code_dir="s3://<your_bucket>/xcloud_finetuning/",
    credentials=creds
)
- Specify the instance type, whether you want to use a spot or non-spot instance, the Docker image, the command, arguments, environment variables and secrets.
Replace the --saving_model_s3_dir parameter (located within args) with the S3 path where you want to save the fine-tuned GPT2 model.
The secrets won't be stored in the DB for security reasons. That's why you should put your AWS credentials here.
# Configure execution specs
container_specs = ExecutionContainerSpecs(
    machine_type=MachineType.GPU_L4_1,
    spot=True,
    image="public.ecr.aws/t8g5g2q5/x-demo:finetuning",
    command=["/bin/sh", "-c"],
    args=["pip install -r requirements.txt ; python finetuning.py --saving_model_s3_dir s3://<your_bucket>/xcloud_finetuning/finetuned_gpt2 --num_epochs 1"],
    secrets={
        "AWS_ACCESS_KEY_ID": os.environ["AWS_ACCESS_KEY_ID"],
        "AWS_SECRET_ACCESS_KEY": os.environ["AWS_SECRET_ACCESS_KEY"]
    },
    env={}  # You can add ENV variables that are not considered secrets here.
)
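Because the whole command is passed to /bin/sh -c as a single string, quoting mistakes in the S3 path are easy to make. A possible way to assemble that string is sketched below; build_finetuning_args is a hypothetical helper and the bucket name is a placeholder. Unlike the spec above, this sketch joins the two steps with && so that training is skipped when the dependency installation fails.

```python
import shlex


def build_finetuning_args(saving_model_s3_dir, num_epochs,
                          requirements="requirements.txt"):
    """Build the single-string args list expected by `/bin/sh -c`."""
    install = f"pip install -r {shlex.quote(requirements)}"
    train = " ".join([
        "python", "finetuning.py",
        # shlex.quote protects paths that contain spaces or shell metacharacters
        "--saving_model_s3_dir", shlex.quote(saving_model_s3_dir),
        "--num_epochs", str(int(num_epochs)),
    ])
    # "&&" runs the training step only if the install step succeeded
    return [f"{install} && {train}"]


args = build_finetuning_args(
    "s3://example-bucket/xcloud_finetuning/finetuned_gpt2", num_epochs=1
)
print(args[0])
```

The returned list can be passed as the args value of the container specs in place of the hand-written string.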
- Initiate the job creation process. Keep in mind that the job takes a list of container specs, which is intended for multi-node jobs. Only single-node execution is currently supported; multi-node capability is under active development.
from xcloud import Location  # needed for the location argument below

job_name = "xcloud-finetuning"

# Create the job
job = ExecutionJob(
    job_name=job_name,
    credentials=creds,
    code_specs=code_specs,
    container_specs=[container_specs],
    location=Location.GCP_US_CENTRAL_1
)
job = ExecutionJobsClient.create_job(job)
We are currently supporting the following compute regions:
- Location.GCP_US_CENTRAL_1
- Location.AZURE_EAST_US
- You can fetch the execution job information.
from datetime import datetime

job = ExecutionJobsClient.get_job_by_name(job_name=job_name)
print(f"[+] Job name: {job.job_name}")
print(f"[+] Job status: {job.status}")
print(f"[+] Job execution specs: {job.container_specs}")
print(f"[+] Job started at: {datetime.fromtimestamp(job.metadata.started_at)}")
if job.metadata.ended_at is not None:
    print(f"[+] Job ended at: {datetime.fromtimestamp(job.metadata.ended_at)}")
The job information contains the following fields:
- Job name: the name must not exceed 30 characters and may contain only letters, numbers, and hyphens.
- Workspace ID: identifies the workspace in which the job was created.
- Status: the current status of the job.
- Container specs: a list specifying the container specifications for every node. It defines the machine type, instance type (spot or non-spot), Docker image, command and arguments, environment variables and secrets.
- Location: specifies the cloud provider and region of instance creation. Currently, only GCP cloud and the us-central1 region are supported, with more regions and cloud providers under development.
- Code specifications: indicates the location of the code to be executed.
- Metadata: contains information related to the creation, cancellation, and deletion of the job.
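The job name rule described in the list above (at most 30 characters, only letters, numbers, and hyphens) can be checked locally before submitting a job. This is a minimal sketch; is_valid_job_name is a hypothetical helper, and the service may apply additional rules that are not stated here.

```python
import re

# 1 to 30 characters, each a letter, a digit, or a hyphen.
JOB_NAME_PATTERN = re.compile(r"[A-Za-z0-9-]{1,30}")


def is_valid_job_name(name):
    """Return True if name satisfies the documented job name rule."""
    return JOB_NAME_PATTERN.fullmatch(name) is not None


print(is_valid_job_name("xcloud-finetuning"))  # → True
print(is_valid_job_name("xcloud_finetuning"))  # → False (underscore)
```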
- To fetch the logs, run the following code.
from xcloud import ExecutionJobsClient

logs = ExecutionJobsClient.get_logs(job_name=job_name)
if logs.get("0") is not None:
    print(logs['0'])
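Logs are most useful once the job has made progress, so it can be convenient to wait for the job to reach a final status before fetching them. The sketch below is a generic polling loop; wait_for_status is a hypothetical helper, and the status strings in the demonstration are placeholders, since the actual xCloud status values are not listed here.

```python
import time


def wait_for_status(fetch_status, terminal_statuses, poll_seconds=30.0,
                    timeout_seconds=3600.0, sleep=time.sleep,
                    clock=time.monotonic):
    """Poll fetch_status() until it returns a terminal status or time runs out."""
    deadline = clock() + timeout_seconds
    while True:
        status = fetch_status()
        if status in terminal_statuses:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Job still in status {status!r} after timeout")
        sleep(poll_seconds)


# Demonstration with a fake status source and placeholder status names.
fake_statuses = iter(["pending", "running", "done"])
final = wait_for_status(lambda: next(fake_statuses), {"done"},
                        sleep=lambda seconds: None)
print(final)  # → done
```

With xCloud, fetch_status could be a function that returns ExecutionJobsClient.get_job_by_name(job_name=job_name).status, with terminal_statuses set to whatever final status values the service reports.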
- You can cancel the job execution. This will delete the virtual machines associated with it, but you will still be able to access the logs and the job information.
cancelled_job = ExecutionJobsClient.cancel_job(job_name=job_name)
- You can delete the job if you no longer want to access it.
deleted_job = ExecutionJobsClient.delete_job(job_name=job_name)
You can access the UI at https://xcloud-app.stochastic.ai to view the job information and the logs.