本文展示了一些提高 DALI 资源使用率以及创建一个完全基于 CPU 的管道的技术。这些技术长期稳定内存使用率,将 CPU & GPU 管道的 batch 大小提高 50%。用特斯拉 V100 加速器显示 PyTorch+DALI 可以达到接近 4000 个图像/秒的处理速度,比原生 PyTorch 快了大约 4 倍。
简介
过去几年见证了深度学习硬件的长足进步。英伟达的最新产品,Tesla V100 & Geforce RTX 系列,包含特定的张量核,以加速常用的神经网络操作。特别是,V100 已经具备足够的性能。能够以每秒数千幅图像的速度训练神经网络。这使得在 ImageNet 数据集上的单一 GPU 训练时间减少到几个小时。而在 202 年,在 ImageNet 上训练 AlexNet 模型花了 5 天时间!
如此强大的 gpu 使数据预处理管道变得紧张。为了解决这个问题,Tensorflow 发布了一个新的数据加载器:tf.data.Dataset。管道是用 C++ 编写的,使用基于图的方法,预处理操作被链接在一起形成一个管道。另一方面,PyTorch 使用在 PIL 库上用 Python 编写的数据加载器,它具备良好的易于用和灵活性,诞生在速度方面不是那么出色。尽管 PIL-SIMD 库确实改善了这种情况。
NVIDIA 数据加载库(DALI)旨在解决数据预处理瓶颈,让数据在训练时全速运行。DALI 主要用于在 GPU 上进行预处理,但是其大多数操作也有一个快速的 CPU 实现。本文主要关注 PyTorch,但 DALI 也支持 Tensorflow、MXNet 和 TensorRT,尤其是 TensorRT 的支持非常好。它允许训练和推理使用完全相同的预处理代码。Tensorflow 和 PyTorch 这样的框架在数据加载器之间通常具有一定的差异,这可能会影响准确性。
以下是开始使用 DALI 的一些重要资源:
DALI Home:https://developer.nvidia.com/DALI
Fast AI Data Preprocessing with NVIDIA DALI:https://devblogs.nvidia.com/fast-ai-data-preprocessing-with-nvidia-dali/
DALI Developer Guide:https://docs.nvidia.com/deeplearning/sdk/dali-developer-guide/docs/index.html
Getting Started:https://docs.nvidia.com/deeplearning/sdk/dali-developer-guide/docs/examples/getting%20started.html
在本文的其余部分中,我将假设你对 ImageNet 预处理及 DALI ImageNet 实例有一定的理解。我来谈谈在使用 DALI 的时候遇到的问题,以及我是如何解决的。我们将研究 CPU 和 GPU 管道。
DALI 长期内存使用
我在 DALI 中遇到的第一个问题是,随着训练阶段的推移,RAM 的使用率增加,这都会导致 OOM 错误(即使在内存为 78GB 的虚拟机上也是如此)。它已经被标记位(278,344,486),但是还没有被修复。
我唯一能找到的解决办法并不美好:重新导入 DALI,重新训练和验证管道:
del self.train_loader, self.val_loader, self.train_pipe,
self.val_pipe
torch.cuda.synchronize()
torch.cuda.empty_cache()
gc.collect()
importlib.reload(dali)
from dali import HybridTrainPipe, HybridValPipe, DaliIteratorCPU,
DaliIteratorGPU
<rebuild DALI pipeline>
注意,有了这个解决方案,DALI 仍然需要大量 RAM 来获得最好的结果。考虑到现在的 RAM 有多便宜,这不是什么大问题;相反,GPU 内存才是问题所在。从下表可以看出,使用 DALI 时的最大批的大小可能比 TorchVision 低 50%:
在下面的部分中,我将介绍一些减少 GPU 内存使用的方法。
构建完全基于 CPU 的管道
当不需要峰值吞吐量时(例如,当使用 ResNet50 等中大型模型时),基于 CPU 的管道非常有用。CPU 训练管道只在 CPU 上执行解码和大小调整操作,而 Cropmirnormalize 操作在 GPU 上运行。这点很重要。我发现,即使是用 DALI 将输出传输到 GPU,也会占用大量的 GPU 内存。为了避免这种情况,我修改了示例 CPU 管道,使其完全在 CPU 上运行:
class HybridTrainPipe(Pipeline):
def __init__(self, batch_size, num_threads, device_id, data_dir,
crop,
mean, std, local_rank=0, world_size=1,
dali_cpu=False, shuffle=True, fp16=False,
min_crop_size=0.08):
# As we're recreating the Pipeline at every epoch, the seed
must be -1 (random seed)
super(HybridTrainPipe, self).__init__(batch_size,
num_threads, device_id, seed=-1)
# Enabling read_ahead slowed down processing ~40%
self.input = ops.FileReader(file_root=data_dir,
shard_id=local_rank, num_shards=world_size,
random_shuffle=shuffle)
# Let user decide which pipeline works best with the chosen
model
if dali_cpu:
decode_device = "cpu"
self.dali_device = "cpu"
self.flip = ops.Flip(device=self.dali_device)
else:
decode_device = "mixed"
self.dali_device = "gpu"
output_dtype = types.FLOAT
if self.dali_device == "gpu" and fp16:
output_dtype = types.FLOAT16
self.cmn = ops.CropMirrorNormalize(device="gpu",
output_dtype=output_dtype,
output_layout=types.NCHW,
crop=(crop, crop),
image_type=types.RGB,
mean=mean,
std=std,)
# To be able to handle all images from full-sized ImageNet,
this padding sets the size of the internal nvJPEG buffers without
additional reallocations
device_memory_padding = 211025920 if decode_device == 'mixed'
else 0
host_memory_padding = 140544512 if decode_device == 'mixed'
else 0
self.decode =
ops.ImageDecoderRandomCrop(device=decode_device,
output_type=types.RGB,
device_memory_padding=device_memory_padding,
host_memory_padding=host_memory_padding,
random_aspect_ratio=
[0.8, 1.25],
random_area=
[min_crop_size, 1.0],
num_attempts=100)
# Resize as desired. To match torchvision data loader, use
triangular interpolation.
self.res = ops.Resize(device=self.dali_device, resize_x=crop,
resize_y=crop,
interp_type=types.INTERP_TRIANGULAR)
self.coin = ops.CoinFlip(probability=0.5)
print('DALI "{0}" variant'.format(self.dali_device))
def define_graph(self):
rng = self.coin()
self.jpegs, self.labels = self.input(name="Reader")
# Combined decode & random crop
images = self.decode(self.jpegs)
# Resize as desired
images = self.res(images)
if self.dali_device == "gpu":
output = self.cmn(images, mirror=rng)
else:
# CPU backend uses torch to apply mean & std
output = self.flip(images, horizontal=rng)
self.labels = self.labels.gpu()
return [output, self.labels]
DALI 管道现在在 CPU 上输出一个 8 位张量。我们需要使用 PyTorch 来完成 CPU->GPU 传输、浮点数转换和规范化。最后两个操作是在 GPU 上完成的,因为在实践中,它们非常快,并且减少了 CPU->GPU 内存带宽需求。在转到 GPU 之前,我试着固定张力,但没有从中获得任何性能提升。
将它与预取器组合在一起:
def _preproc_worker(dali_iterator, cuda_stream, fp16, mean, std,
output_queue, proc_next_input, done_event, pin_memory):
"""
Worker function to parse DALI output & apply final preprocessing
steps
"""
while not done_event.is_set():
# Wait until main thread signals to proc_next_input --
normally once it has taken the last processed input
proc_next_input.wait()
proc_next_input.clear()
if done_event.is_set():
print('Shutting down preproc thread')
break
try:
data = next(dali_iterator)
# Decode the data output
input_orig = data[0]['data']
target = data[0]['label'].squeeze().long() # DALI should
already output target on device
# Copy to GPU and apply final processing in separate CUDA
stream
with torch.cuda.stream(cuda_stream):
input = input_orig
if pin_memory:
input = input.pin_memory()
del input_orig # Save memory
input = input.cuda(non_blocking=True)
input = input.permute(0, 3, 1, 2)
# Input tensor is kept as 8-bit integer for transfer
to GPU, to save bandwidth
if fp16:
input = input.half()
else:
input = input.float()
input = input.sub_(mean).div_(std)
# Put the result on the queue
output_queue.put((input, target))
except StopIteration:
print('Resetting DALI loader')
dali_iterator.reset()
output_queue.put(None)
class DaliIteratorCPU(DaliIterator):
"""
Wrapper class to decode the DALI iterator output & provide
iterator that functions in the same way as TorchVision.
Note that permutation to channels first, converting from 8-bit
integer to float & normalization are all performed on GPU
pipelines (Pipeline): DALI pipelines
size (int): Number of examples in set
fp16 (bool): Use fp16 as output format, f32 otherwise
mean (tuple): Image mean value for each channel
std (tuple): Image standard deviation value for each channel
pin_memory (bool): Transfer input tensor to pinned memory, before
moving to GPU
"""
def __init__(self, fp16=False, mean=(0., 0., 0.), std=(1., 1.,
1.), pin_memory=True, **kwargs):
super().__init__(**kwargs)
print('Using DALI CPU iterator')
self.stream = torch.cuda.Stream()
self.fp16 = fp16
self.mean = torch.tensor(mean).cuda().view(1, 3, 1, 1)
self.std = torch.tensor(std).cuda().view(1, 3, 1, 1)
self.pin_memory = pin_memory
if self.fp16:
self.mean = self.mean.half()
self.std = self.std.half()
self.proc_next_input = Event()
self.done_event = Event()
self.output_queue = queue.Queue(maxsize=5)
self.preproc_thread = threading.Thread(
target=_preproc_worker,
kwargs={'dali_iterator': self._dali_iterator,
'cuda_stream': self.stream, 'fp16': self.fp16, 'mean': self.mean,
'std': self.std, 'proc_next_input': self.proc_next_input,
'done_event': self.done_event, 'output_queue': self.output_queue,
'pin_memory': self.pin_memory})
self.preproc_thread.daemon = True
self.preproc_thread.start()
self.proc_next_input.set()
def __next__(self):
torch.cuda.current_stream().wait_stream(self.stream)
data = self.output_queue.get()
self.proc_next_input.set()
if data is None:
raise StopIteration
return data
def __del__(self):
self.done_event.set()
self.proc_next_input.set()
torch.cuda.current_stream().wait_stream(self.stream)
self.preproc_thread.join()
基于 GPU 的管道
在我的测试中,上面详述的新的完整 CPU 管道的速度大约是 TooVIEW 数据加载程序的两倍,同时达到了几乎相同的最大批大小。CPU 管道在 ResNet50 这样的大型模型中工作得很好,但是,当使用 AlexNet 或 ResNet18 这样的小型模型时,CPU 管道仍然无法跟上 GPU。对于这些情况,示例 GPU 管道表现最好。问题是,GPU 管道将最大可能的批大小减少了 50%,限制了吞吐量。
显著减少 GPU 内存使用的一种方法是,在一个阶段结束时,将验证管道保留在 GPU 之外,直到它真正需要被使用为止。这很容易做到,因为我们已经重新导入 DALI 库并在每个阶段重新创建数据加载程序。
更多提示
使用 DALI 的更多提示:
对于验证,均匀划分数据集大小的批大小最有效,例如当验证集大小为 50000 时,最好的批大小是 500 而不是 512,这避免了验证数据集会剩余一部分。
与 Tensorflow 和 PyTorch 数据加载程序类似,TorchVision 和 DALI 管道不会产生完全相同的输出,你将看到验证精度略有不同。我发现这是由于不同的 JPEG 图像解码器造成的。以前在大小调整上有问题,但现在是管道固定。另一方面,DALI 支持 TensorRT,允许在训练和推理中使用完全相同的预处理。
对于峰值吞吐量,请尝试将数据加载程序工作线程数设置为虚拟 CPU 内核数。2 提供最佳性能(2 个虚拟内核=1 个物理内核)。
如果你想要绝对的最佳性能,并且不介意输出类似于 TorchVision,请尝试关闭 DALI 图像调整器上的三角形插值。
别忘了磁盘 IO。确保有足够的内存来缓存数据集以及一个非常快的 SSD。DALI 的磁盘传输速度可以达到 400Mb/s!
集成在一起
为了方便地集成这些修改,我创建了一个数据加载器类,其中包含了这里描述的所有修改,包括 DALI 和 TorchVision 后端。用法很简单。实例化数据加载器:
dataset = Dataset(data_dir,
batch_size,
val_batch_size
workers,
use_dali,
dali_cpu,
fp16)
然后获取训练和验证数据集加载程序:
train_loader = dataset.get_train_loader()
val_loader = dataset.get_val_loader()
在每个训练周期结束时重置数据加载器:
dataset.reset()
或者,可以在模型验证之前在 GPU 上重新创建验证管道:
dataset.prep_for_val()
基准
以下是我可以用 ResNet18 使用的最大批处理大小:
因此,通过应用这些修改,在 CPU 和 GPU 模式下 DALI 可以使用的最大批处理大小增加了约 50%!
以下是 Shufflenet V2 0.5 和批大小 512 的吞吐量数据:
下面是使用 DALI GPU 管道训练 TorchVision 中包含的各种网络的一些结果:
所有测试都在一个 Google Cloud V100 实例上运行,该实例有 12 个 vCPUs(6 个物理核)、78GB RAM,使用 Apex FP16 进行训练。要重现这些结果,请使用以下参数:
— fp16 — batch-size 512 — workers 10 — arch “shufflenet_v2_x0_5 or resnet18” — prof — use-dali
所以,有了DALI,一台 Tesla V100 的处理速度可以达到每秒处理近 4000 张图像!但这仅仅是 Nvidia 超昂贵的 DGX-1 8 V100 GPU 的一半多一点。对我来说,能够在几个小时内在一个 GPU 上进行 ImageNet 训练完全改变了生产力,希望对你来说也是如此!
本文提供的代码可以在如下网址找到:https://github.com/yaysummeriscoming/DALI_pytorch_demo
雷锋网雷锋网雷锋网