Training in PyTorch from Amazon S3
How to Maximize Data Throughput and Save Money
In previous posts (such as here and here) we discussed different options for streaming data from Amazon S3 into a TensorFlow training session. In this post we revisit the topic of training from S3, this time with an emphasis on PyTorch training.
As in our previous posts, the scenario that we address is one in which our dataset is so large that either:
- it cannot be downloaded in its entirety onto the training instance, or
- downloading it would introduce a significant delay to our training.
There are different approaches to addressing this challenge. One approach, which is out of the scope of this post, is to set up some form of persistent storage system that mirrors the S3 data and that is readily available and accessible as soon as our training instance starts up. One way to implement this is with Amazon FSx. While there are advantages to this approach (as discussed here), it might entail a considerable amount of extra maintenance and cost. The approach that we adopt in this post is to stream the data directly from S3 into our training loop.
Streaming Data from Amazon S3
While streaming the data from Amazon S3 directly into the training loop may sound simple, if not designed well, it could become a bottleneck in your training pipeline. In this undesired situation your system’s compute resources will be idle while they wait for data from S3 to arrive. Our goal is to maximize utilization of the system resources and, by extension, the speed of training. There are a number of factors that control the degree to which streaming data from S3 might impact the training step time, including the following:
- The network input bandwidth of your instance. This is a property of the training instance type you have chosen.
- The total amount of data (in bytes) required for a training step. You should strive to reduce the size of each data sample by streaming only the data required for training and by considering different compression techniques. (Keep in mind, though, the additional compute that will be required for decompression.) Together with the network bandwidth, this determines whether streaming can keep pace with training (see the back-of-envelope sketch following this list).
- The choice of file format in which the data is stored. For example, you may be better off with a sequential file format, such as WebDataset or TFRecord, rather than a format that requires downloading entire files before they can be opened and parsed.
- The sizes of the individual files in which your data is stored can also impact the performance of data streaming. For example, storing each data sample in a separate file that is a single megabyte in size will increase the overhead of the transactions with S3. We strive to store our data in multiple files, a few hundred megabytes in size, each of which contains sequences of multiple data samples. It should be noted that storing your data in this manner introduces other challenges which we have addressed in a previous post.
- The tools used to perform the data streaming, as will be discussed in this post.
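To make the first two factors concrete, here is a back-of-envelope sketch (a minimal example with made-up numbers, not measurements from our experiments) for checking whether the network input bandwidth of a candidate instance can sustain a target training throughput:
# back-of-envelope check with illustrative (made-up) numbers
samples_per_second = 100             # target training throughput
bytes_per_sample = 2 * 1024 ** 2     # average size of a single (compressed) data sample

required_gbps = samples_per_second * bytes_per_sample * 8 / 1e9
instance_bandwidth_gbps = 10         # advertised bandwidth of the chosen instance type

print(f'required: {required_gbps:.2f} Gbit/s, available: {instance_bandwidth_gbps} Gbit/s')
if required_gbps > instance_bandwidth_gbps:
    print('data streaming is likely to bottleneck training')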
In this post we will explore a number of ways to train from S3. Please do not interpret our mention, or lack thereof, of one method or another as an endorsement or a rejection. We believe that it is important to be familiar with multiple methods. Each has its own strengths and weaknesses and the best option will likely depend on the details of your project.
As the landscape of machine learning continues to evolve, so do many of the supporting frameworks and libraries. Please keep in mind that some of the APIs and tools we mention may be outdated by the time you read this post. We highly recommend that you keep abreast of the most up-to-date tools available, as these may include substantial enhancements and optimizations.
Although our focus will be on training using PyTorch (versions 1.10 and 1.11) and from Amazon S3, much of what we say will be just as relevant to other training frameworks and other object storage services.
Special thanks to Yitzhak Levi for his help in creating this post.
Measuring Throughput
In the next sections we will review different methods for streaming data from S3. We will compare the methods by calculating the training throughput as measured by the number of data samples that are being fed into the training loop per second. In order to focus on the performance of the data streaming alone, we will measure the throughput in the case of an empty training step as shown in the code block below.
import torch, time
from statistics import mean, variance

dataset = get_dataset()
dl = torch.utils.data.DataLoader(dataset, batch_size=4, num_workers=4)

stats_lst = []
t0 = time.perf_counter()
for batch_idx, batch in enumerate(dl, start=1):
    if batch_idx % 100 == 0:
        t = time.perf_counter() - t0
        print(f'Iteration {batch_idx} Time {t}')
        stats_lst.append(t)
        t0 = time.perf_counter()

# drop the first (warm-up) measurement
mean_calc = mean(stats_lst[1:])
var_calc = variance(stats_lst[1:])
print(f'mean {mean_calc} variance {var_calc}')
It is important to keep in mind that while this comparative measurement might give us a good idea of the maximum throughput that each method can support, it may not provide a good prediction of how the method you choose will impact the actual training throughput for two primary reasons:
- Your overall training step time might not be impacted by the method you choose. For example, if your training step is compute intensive it might make no difference whether pulling a file from S3 takes 1 second or 10 seconds.
- A typical training step will include many additional operations that may impact the actual training throughput. In particular, some of these operations may contend for the same resources that are used to stream the data from S3.
Another technique we often use to measure the impact of data streaming on the overall speed of training is to compare how the step time changes when running on a cached data batch rather than on the streamed data samples, as shown below.
import torch, time

dataset = get_dataset()
dl = torch.utils.data.DataLoader(dataset, batch_size=4, num_workers=4)

# grab a single batch and reuse it on every step
batch = next(iter(dl))

t0 = time.perf_counter()
for batch_idx in range(1, 1000):
    train_step(batch)
    if batch_idx % 100 == 0:
        t = time.perf_counter() - t0
        print(f'Iteration {batch_idx} Time {t}')
        t0 = time.perf_counter()
Toy Example — WebDataset
For the purpose of demonstration we will use a synthetic dataset composed of random images and image segmentations stored in the WebDataset file format, a tar-based format designed specifically for training with large datasets. Specifically, we generated multiple 400-megabyte tar files using the following code block:
import io
import numpy as np
import webdataset as wds
from PIL import Image

out_tar = 'wds.tar'
sink = wds.TarWriter(out_tar)
im_width = 1024
im_height = 1024
num_classes = 256

for i in range(100):
    image = Image.fromarray(np.random.randint(0, high=256,
                            size=(im_height, im_width, 3), dtype=np.uint8))
    label = Image.fromarray(np.random.randint(0, high=num_classes,
                            size=(im_height, im_width), dtype=np.uint8))
    image_bytes = io.BytesIO()
    label_bytes = io.BytesIO()
    image.save(image_bytes, format='PNG')
    label.save(label_bytes, format='PNG')
    sample = {"__key__": str(i),
              'image': image_bytes.getvalue(),
              'label': label_bytes.getvalue()}
    sink.write(sample)

sink.close()
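Before uploading the tar files to S3, it can be useful to verify their contents; a minimal sanity-check sketch over the locally generated file:
import webdataset as wds

ds = wds.WebDataset('wds.tar')
sample = next(iter(ds))
print(list(sample.keys()))      # expect the 'image' and 'label' fields (plus metadata keys)
print(len(sample['image']))     # size of the PNG-encoded image in bytes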
Streaming From Amazon S3
In this section we will review a number of tools and techniques for streaming data from Amazon S3. The review is by no means exhaustive; there are many additional tools that we do not cover here. We will demonstrate some of the options using the WebDataset example we showed above. We broadly divide the solutions into two types, solutions in which we explicitly pull the data from S3 into the training environment vs. solutions that expose a file-system style interface to the application.
File Object Download
A number of solutions for training from Amazon S3 involve explicitly downloading the data into the local training environment.
Object Download With AWS CLI:
One of the simplest ways to pull a file from S3 is to use the AWS Command Line Interface tool. The command below will stream an object stored in S3 to standard output:
aws s3 cp s3://<path in s3>/wds0.tar -
Replacing the hyphen with a local path will result in the file being saved to the local disk. For more details on the usage of this tool see here.
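For example, the following command downloads the same object to an assumed local path:
aws s3 cp s3://<path in s3>/wds0.tar /tmp/wds0.tar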
The WebDataset library supports piping in a bytestream from a file that is pulled using the aws s3 cp command. We demonstrate how to create a PyTorch dataset in this manner in the code block below:
import io, webdataset
def get_dataset():
urls = [f's3://<path in s3>/{i}.tar' for i in range(num_files)]
# add awscli command to urls
urls = [f'pipe:aws s3 cp {url} -' for url in urls]
dataset = (
webdataset.WebDataset(urls, shardshuffle=True)
.shuffle(10)
)
return dataset
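Note that the dataset above yields the raw PNG-encoded bytes, which is all our empty-step measurement requires. For an actual training loop you would typically add a decoding stage; a minimal sketch, assuming the 'image' and 'label' fields of the toy dataset above:
import io
from PIL import Image

def decode_sample(sample):
    # decode the PNG-encoded byte fields into PIL images
    return {'image': Image.open(io.BytesIO(sample['image'])),
            'label': Image.open(io.BytesIO(sample['label']))}

dataset = get_dataset().map(decode_sample)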
Object Download With Boto3:
Boto3 is the AWS SDK for Python, which, among other things, enables downloading object files from S3. The function below demonstrates how to pull the contents of an object into a bytestream in local memory.
import boto3, io, re, webdataset

client = boto3.client("s3")

def get_bytes_io(path):
    byte_io = io.BytesIO()
    _, bucket, key, _ = re.split("s3://(.*?)/(.*)$", path)
    client.download_fileobj(bucket, key, byte_io)
    byte_io.seek(0)
    return byte_io
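A usage sketch (with a placeholder object path) that pulls one of our toy tar files into memory:
byte_io = get_bytes_io('s3://<path in s3>/0.tar')
print(f'downloaded {byte_io.getbuffer().nbytes} bytes')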
Although WebDataset does not include native support for this usage, we can easily add it by overriding the url_opener function in webdataset.tariterators:
import webdataset
from webdataset.handlers import reraise_exception

def url_opener(data, handler=reraise_exception, **kw):
    for sample in data:
        url = sample["url"]
        try:
            stream = get_bytes_io(url)
            sample.update(stream=stream)
            yield sample
        except Exception as exn:
            exn.args = exn.args + (url,)
            if handler(exn):
                continue
            else:
                break

webdataset.tariterators.url_opener = url_opener

def get_dataset():
    urls = [f's3://<path in s3>/{i}.tar' for i in range(num_files)]
    dataset = (
        webdataset.WebDataset(urls, shardshuffle=True)
        .shuffle(10)
    )
    return dataset
SageMaker Pipe Mode:
Amazon SageMaker is a managed service provided by AWS for performing cloud-based machine learning at scale. Among the numerous utility tools it offers are dedicated APIs for interfacing with training data stored in Amazon S3. The SageMaker documentation details the different data input modes that are supported. We expanded on some of the properties of SageMaker Pipe Mode in a previous post. Pipe Mode is yet another way of explicitly pulling and streaming data from S3 into the local training environment. When using Pipe Mode, the training data is ingested through a dedicated Linux FIFO pipe.
with open(fifo_path, 'rb', buffering=0) as fifo:
    # read and parse data stream to yield samples
A disadvantage of pipe mode is that it requires parsing the incoming data stream. This means that its use is limited to file formats that support parsing in this manner. In a previous post we demonstrated the construction of a training input pipeline in this manner.
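For example, since our toy WebDataset files are plain tar archives, a data stream arriving over the FIFO pipe can be parsed with Python's tarfile module in streaming mode. The sketch below is a minimal illustration rather than SageMaker API code; the FIFO path is an assumption based on the convention for a channel named 'training' in its first epoch and should be adjusted to your setup.
import tarfile

fifo_path = '/opt/ml/input/data/training_0'  # assumed FIFO path for the 'training' channel

def sample_generator():
    with open(fifo_path, 'rb', buffering=0) as fifo:
        # mode 'r|' reads the tar sequentially from a non-seekable stream
        with tarfile.open(fileobj=fifo, mode='r|') as tar:
            sample = {}
            for member in tar:
                key, field = member.name.split('.', 1)
                sample[field] = tar.extractfile(member).read()
                if 'image' in sample and 'label' in sample:
                    yield sample
                    sample = {}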
S3 Access Solutions
While applications are typically programmed to work with file systems, Amazon S3 is an object storage service, not a file system. A number of solutions are designed to bridge this gap by exposing a file-system-like interface to Amazon S3.
S3Fs:
S3Fs is one of several Python solutions for exposing an S3 bucket through a file-system-like interface. While WebDataset does not include native support for using S3Fs, we can override the url_opener function in webdataset.tariterators to use it. Note that to use S3Fs with PyTorch, we need to set the multiprocessing start method to "spawn".
import s3fs, torch, webdataset
from webdataset.handlers import reraise_exception

# as noted above, S3Fs requires the 'spawn' multiprocessing start method
torch.multiprocessing.set_start_method('spawn')

fs = s3fs.S3FileSystem()

def url_opener(data, handler=reraise_exception, **kw):
    for sample in data:
        url = sample["url"]
        try:
            stream = fs.open(url.replace("s3://", ""), mode='rb')
            sample.update(stream=stream)
            yield sample
        except Exception as exn:
            exn.args = exn.args + (url,)
            if handler(exn):
                continue
            else:
                break

webdataset.tariterators.url_opener = url_opener

def get_dataset():
    urls = [f's3://<path in s3>/{i}.tar' for i in range(num_files)]
    dataset = (
        webdataset.WebDataset(urls, shardshuffle=True)
        .shuffle(10)
    )
    return dataset
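A usage sketch: because the start method is set to 'spawn', DataLoader worker processes re-import the main module, so the training code should be guarded in the usual way.
if __name__ == '__main__':
    dl = torch.utils.data.DataLoader(get_dataset(), batch_size=4, num_workers=4)
    for batch in dl:
        pass  # replace with the actual training step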
Check out this cool post for more on FUSE-based methods for pulling S3 data, including alternatives to S3Fs such as goofys and rclone.
Amazon S3 PyTorch Plug-in:
Last year AWS announced the release of a dedicated library for pulling data from S3 into a PyTorch training environment. Details of this plug-in, including usage instructions, can be found in this GitHub project. It should be noted that the authors recently announced the deprecation of this library as well as plans to replace it with S3 IO support in the TorchData library. (More on this below.) The code block below demonstrates the creation of an iterable PyTorch dataset from our toy WebDataset files using the S3 PyTorch plug-in.
import torch
from awsio.python.lib.io.s3.s3dataset import S3IterableDataset

class S3_Dataset(torch.utils.data.IterableDataset):
    def __init__(self, urls):
        self._s3_iter_dataset = S3IterableDataset(urls, True)

    def data_generator(self):
        try:
            while True:
                # each sample is stored as two consecutive tar entries
                image_fname, image_fobj = next(self._s3_iter)
                label_fname, label_fobj = next(self._s3_iter)
                yield {
                    'image': image_fobj,
                    'label': label_fobj
                }
        except StopIteration:
            return

    def __iter__(self):
        self._s3_iter = iter(self._s3_iter_dataset)
        return self.data_generator()

def get_dataset():
    urls = [f's3://<path in s3>/{i}.tar' for i in range(num_files)]
    dataset = S3_Dataset(urls)
    return dataset
SageMaker Fast File Mode:
Amazon SageMaker offers an additional FUSE-based solution for accessing files in S3 called Fast File Mode (FFM). When you program a SageMaker job to use Fast File input mode, an S3 path is mounted onto a predefined local file path. In a recent post we expanded on this input mode option, demonstrated its use, and discussed its pros and cons. Adapting our WebDataset to use FFM is straightforward:
import os, webdataset
def get_dataset():
ffm = os.environ['SM_CHANNEL_TRAINING']
urls = [os.path.join(ffm, f'{i}.tar') for i in range(num_files)]
dataset = (
webdataset.WebDataset(urls, shardshuffle=True)
.shuffle(10)
)
return dataset
Note that as of the time of this writing, FFM performance may be dependent on the number of files as well as on the number of partitions in the predefined S3 path.
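For completeness, here is a hedged sketch of launching such a job with the SageMaker Python SDK; the entry point, role, and instance settings are placeholders, and the channel name 'training' is what maps to the SM_CHANNEL_TRAINING environment variable used above.
from sagemaker.pytorch import PyTorch

estimator = PyTorch(
    entry_point='train.py',                 # assumed training script
    role='<sagemaker execution role>',
    instance_type='ml.c5.xlarge',
    instance_count=1,
    framework_version='1.11',
    py_version='py38',
    input_mode='FastFile',                  # mount the S3 data using FFM
)
estimator.fit({'training': 's3://<path in s3>/'})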
Results
The table below includes the average step time we received when running the empty training loop from above on different datasets on an EC2 c5.xlarge instance. These results are provided as an example of the type of comparative performance results you might get. We caution against drawing any conclusions from these results regarding your own project as the performance is likely to be highly dependent on the details of the training model and data.
Streaming Using TorchData Pipelines
TorchData is an exciting new PyTorch library for creating PyTorch datasets. It is currently released as a beta product and requires PyTorch version 1.11 (or above). The official release is expected in the coming months. The TorchData GitHub project page includes information on the library's design as well as its API documentation and examples. TorchData incorporates many building-block modules for creating data pipelines, including modules for loading a dataset stored in the WebDataset format and modules for pulling data from S3.
In the sections below we demonstrate a few solutions enabled by the TorchData library. Keep in mind that some of the APIs may undergo modification before the official release of the library.
Using IoPathFileOpener:
The IoPathFileOpener supports loading files directly from cloud storage. It depends on the iopath I/O abstraction library. The code block below demonstrates its use:
import os
from torchdata.datapipes.iter import IoPathFileOpener, IterableWrapper

def get_dataset():
    urls = [f's3://<path in s3>/{i}.tar' for i in range(num_files)]
    urls = IterableWrapper(urls).shuffle().cycle()
    tars = IoPathFileOpener(urls, mode="rb").load_from_tar()
    # group pairs of tar entries ('<key>.image', '<key>.label') by sample key
    samples = tars.groupby(lambda x:
                           os.path.basename(x[0]).split(".")[0],
                           group_size=2, guaranteed_group_size=2)
    dataset = samples.map(lambda x:
                          {'image': x[0][1].read(),
                           'label': x[1][1].read()})
    dataset = dataset.shuffle(buffer_size=10)
    return dataset
Using FSSpecFileOpener:
The FSSpecFileOpener supports similar functionality, this time based on the S3Fs library. The code block below demonstrates its use:
import os
from torchdata.datapipes.iter import FSSpecFileOpener, IterableWrapper

def get_dataset():
    urls = [f's3://<path in s3>/{i}.tar' for i in range(num_files)]
    urls = IterableWrapper(urls).shuffle().cycle()
    tars = FSSpecFileOpener(urls, mode="rb").load_from_tar()
    # group pairs of tar entries ('<key>.image', '<key>.label') by sample key
    samples = tars.groupby(lambda x:
                           os.path.basename(x[0]).split(".")[0],
                           group_size=2, guaranteed_group_size=2)
    dataset = samples.map(lambda x:
                          {'image': x[0][1].read(),
                           'label': x[1][1].read()})
    dataset = dataset.shuffle(buffer_size=10)
    return dataset
With SageMaker FFM:
When training using Amazon SageMaker, we can also use the standard FileOpener class by using FFM and pointing to the local file mount.
import os
from torchdata.datapipes.iter import FileOpener, IterableWrapper

def get_dataset():
    ffm = os.environ['SM_CHANNEL_TRAINING']
    urls = [os.path.join(ffm, f'{i}.tar') for i in range(num_files)]
    tars = FileOpener(urls, mode="rb").load_from_tar()
    # group pairs of tar entries ('<key>.image', '<key>.label') by sample key
    samples = tars.groupby(lambda x:
                           os.path.basename(x[0]).split(".")[0],
                           group_size=2, guaranteed_group_size=2)
    dataset = samples.map(lambda x:
                          {'image': x[0][1].read(),
                           'label': x[1][1].read()})
    dataset = dataset.shuffle(buffer_size=10)
    return dataset
Using S3FileLoader:
A reincarnation of the Amazon S3 PyTorch Plug-in from above, S3FileLoader is a datapipe created by AWS specifically for loading data from S3. At the time of this writing it is not included in the default torchdata package and requires some installation steps, as documented here.
import os
from torchdata.datapipes.iter import S3FileLoader, IterableWrapper

def get_dataset():
    urls = [f's3://<path in s3>/{i}.tar' for i in range(num_files)]
    urls = IterableWrapper(urls).shuffle().cycle()
    tars = S3FileLoader(urls).load_from_tar()
    # group pairs of tar entries ('<key>.image', '<key>.label') by sample key
    samples = tars.groupby(lambda x:
                           os.path.basename(x[0]).split(".")[0],
                           group_size=2, guaranteed_group_size=2)
    dataset = samples.map(lambda x:
                          {'image': x[0][1].read(),
                           'label': x[1][1].read()})
    dataset = dataset.shuffle(buffer_size=10)
    return dataset
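A usage sketch: TorchData datapipes are iterable datasets, so any of the get_dataset variants above can be passed directly to the standard DataLoader used in our throughput measurement. Note that when using multiple workers, you may need to add a sharding step (e.g., a sharding_filter) to the pipeline to avoid sample duplication.
import torch

dl = torch.utils.data.DataLoader(get_dataset(), batch_size=4, num_workers=4)
for batch in dl:
    pass  # replace with the actual training step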
Results
Once again, we share average step times when running the same empty training loop from above on different datasets created with the new TorchData library and on an EC2 c5.xlarge instance.
Although we remain cautious about drawing any meaningful conclusions from these results, it seems as though the new TorchData library offers an upgrade not only in the features it supports but also in the speed of streaming from S3.
Summary
In this post we have reviewed several options for streaming data from Amazon S3 into a training environment. This list is by no means comprehensive; many additional tools and techniques abound.
We believe that familiarizing yourself with several techniques is crucial as you may find that your top choice is either not relevant to one of your projects, or that its performance is unsatisfactory.
Please feel free to reach out to me with any comments, corrections, or questions.