将 DTensor 与 Keras 一起使用

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

概述

在本教程中,您将学习如何将 DTensor 与 Keras 一起使用。

通过将 DTensor 与 Keras 集成,您可以重用现有的 Keras 层和模型来构建和训练分布式机器学习模型。

您将使用 MNIST 数据训练多层分类模型。本文将演示如何设置子类化模型、序贯模型和函数式模型的布局。

本教程假设您已经阅读了 DTensor 编程指南,并且熟悉基本的 DTensor 概念,例如 MeshLayout

本教程基于 https://tensorflow.google.cn/datasets/keras_example

安装

DTensor 是 TensorFlow 2.9.0 版本的一部分。

pip install --quiet --upgrade --pre tensorflow tensorflow-datasets

接下来,导入 tensorflowtensorflow.experimental.dtensor,并将 TensorFlow 配置为使用 8 个虚拟 CPU。

尽管本示例使用了 CPU,但 DTensor 在 CPU、GPU 或 TPU 设备上的工作方式相同。

import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.experimental import dtensor
2023-11-07 23:29:54.584316: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2023-11-07 23:29:54.584359: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2023-11-07 23:29:54.585900: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
def configure_virtual_cpus(ncpu):
  phy_devices = tf.config.list_physical_devices('CPU')
  tf.config.set_logical_device_configuration(
        phy_devices[0], 
        [tf.config.LogicalDeviceConfiguration()] * ncpu)

configure_virtual_cpus(8)
tf.config.list_logical_devices('CPU')

devices = [f'CPU:{i}' for i in range(8)]

确定性伪随机数生成器

您应当注意的一件事是 DTensor API 要求每个正在运行的客户端具有相同的随机种子,以便它可以具有用于初始化权重的确定性行为。可以通过 tf.keras.utils.set_random_seed() 在 Keras 中设置全局种子来实现此目的。

tf.keras.backend.experimental.enable_tf_random_generator()
tf.keras.utils.set_random_seed(1337)

创建数据并行网格

本教程演示数据并行训练。适应模型并行训练和空间并行训练可以像切换到一组不同的 Layout 对象一样简单。有关数据并行之外的分布式训练的更多信息,请参阅 DTensor 深入机器学习教程

数据并行训练是一种常用的并行训练方案,也被诸如 tf.distribute.MirroredStrategy 等使用。

使用 DTensor,数据并行训练循环使用由单个“批次”维度组成的 Mesh,其中每个设备都会运行模型的副本,从全局批次接收分片。

mesh = dtensor.create_mesh([("batch", 8)], devices=devices)

由于每个设备都运行模型的完整副本,模型变量应在网格中完全复制(不分片)。例如,此 Mesh 上 2 秩权重的完全复制布局如下:

example_weight_layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh)  # or
example_weight_layout = dtensor.Layout.replicated(mesh, rank=2)

Mesh 上 2 秩数据张量的布局将沿第一个维度进行分片(有时称为 batch_sharded),

example_data_layout = dtensor.Layout(['batch', dtensor.UNSHARDED], mesh)  # or
example_data_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)

使用布局创建 Keras 层

在数据并行方案中,您通常使用完全复制的布局创建模型权重,以便模型的每个副本都可以使用分片输入数据进行计算。

为了为您的层权重配置布局信息,Keras 在层构造函数中为大多数内置层公开了一个额外的参数。

以下示例使用完全复制的权重布局构建了一个小型图像分类模型。您可以通过参数 kernel_layoutbias_layouttf.keras.layers.Dense 中指定布局信息 kernelbias。大多数内置 Keras 层都可以显式地指定层权重的 Layout

unsharded_layout_2d = dtensor.Layout.replicated(mesh, 2)
unsharded_layout_1d = dtensor.Layout.replicated(mesh, 1)
model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(input_shape=(28, 28)),
  tf.keras.layers.Dense(128, 
                        activation='relu',
                        name='d1',
                        kernel_layout=unsharded_layout_2d, 
                        bias_layout=unsharded_layout_1d),
  tf.keras.layers.Dense(10,
                        name='d2',
                        kernel_layout=unsharded_layout_2d, 
                        bias_layout=unsharded_layout_1d)
])

您可以通过检查权重的 layout 属性来查看布局信息。

for weight in model.weights:
  print(f'Weight name: {weight.name} with layout: {weight.layout}')
  break
Weight name: d1/kernel:0 with layout: Layout.from_string(sharding_specs:unsharded,unsharded, mesh:|batch=8|0,1,2,3,4,5,6,7|0,1,2,3,4,5,6,7|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5,/job:localhost/replica:0/task:0/device:CPU:6,/job:localhost/replica:0/task:0/device:CPU:7)

加载数据集并构建输入流水线

加载一个 MNIST 数据集并为其配置一些预处理输入流水线。数据集本身与任何 DTensor 布局信息不关联。我们计划在未来的 TensorFlow 版本中改进 DTensor Keras 与 tf.data 的集成。

(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)
def normalize_img(image, label):
  """Normalizes images: `uint8` -> `float32`."""
  return tf.cast(image, tf.float32) / 255., label
batch_size = 128

ds_train = ds_train.map(
    normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(batch_size)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)
ds_test = ds_test.map(
    normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(batch_size)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)

定义模型的训练逻辑

接下来,定义模型的训练和评估逻辑。

从 TensorFlow 2.9 开始,您必须为启用 DTensor 的 Keras 模型编写自定义训练循环。这是为了用适当的布局信息打包输入数据,这些信息未与 Keras 中的标准 tf.keras.Model.fit()tf.keras.Model.eval() 函数集成。您将在即将发布的版本中获得更多 tf.data 支持。

@tf.function
def train_step(model, x, y, optimizer, metrics):
  with tf.GradientTape() as tape:
    logits = model(x, training=True)
    # tf.reduce_sum sums the batch sharded per-example loss to a replicated
    # global loss (scalar).
    loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
        y, logits, from_logits=True))

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  for metric in metrics.values():
    metric.update_state(y_true=y, y_pred=logits)

  loss_per_sample = loss / len(x)
  results = {'loss': loss_per_sample}
  return results
@tf.function
def eval_step(model, x, y, metrics):
  logits = model(x, training=False)
  loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
        y, logits, from_logits=True))

  for metric in metrics.values():
    metric.update_state(y_true=y, y_pred=logits)

  loss_per_sample = loss / len(x)
  results = {'eval_loss': loss_per_sample}
  return results
def pack_dtensor_inputs(images, labels, image_layout, label_layout):
  num_local_devices = image_layout.mesh.num_local_devices()
  images = tf.split(images, num_local_devices)
  labels = tf.split(labels, num_local_devices)
  images = dtensor.pack(images, image_layout)
  labels = dtensor.pack(labels, label_layout)
  return  images, labels

指标和优化器

将 DTensor API 与 Keras MetricOptimizer 一起使用时,您需要提供额外的网格信息,以便任何内部状态变量和张量都可以使用模型中的变量。

  • 对于优化器,DTensor 引入了一个新的实验性命名空间 keras.dtensor.experimental.optimizers,其中扩展了许多现有的 Keras 优化器以接收额外的 mesh 参数。在未来的版本中,它可能会与 Keras 核心优化器合并。

  • 对于指标,可以直接将 mesh 作为参数指定给构造函数,使其成为兼容 DTensor 的 Metric

optimizer = tf.keras.dtensor.experimental.optimizers.Adam(0.01, mesh=mesh)
metrics = {'accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}
eval_metrics = {'eval_accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}

训练模型

以下示例在批次维度上对来自输入流水线的数据进行分片,并使用具有完全复制权重的模型进行训练。

经过 3 个周期后,模型应当达到大约 97% 的准确率。

num_epochs = 3

image_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=4)
label_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)

for epoch in range(num_epochs):
  print("============================") 
  print("Epoch: ", epoch)
  for metric in metrics.values():
    metric.reset_state()
  step = 0
  results = {}
  pbar = tf.keras.utils.Progbar(target=None, stateful_metrics=[])
  for input in ds_train:
    images, labels = input[0], input[1]
    images, labels = pack_dtensor_inputs(
        images, labels, image_layout, label_layout)

    results.update(train_step(model, images, labels, optimizer, metrics))
    for metric_name, metric in metrics.items():
      results[metric_name] = metric.result()

    pbar.update(step, values=results.items(), finalize=False)
    step += 1
  pbar.update(step, values=results.items(), finalize=True)

  for metric in eval_metrics.values():
    metric.reset_state()
  for input in ds_test:
    images, labels = input[0], input[1]
    images, labels = pack_dtensor_inputs(
        images, labels, image_layout, label_layout)
    results.update(eval_step(model, images, labels, eval_metrics))

  for metric_name, metric in eval_metrics.items():
    results[metric_name] = metric.result()

  for metric_name, metric in results.items():
    print(f"{metric_name}: {metric.numpy()}")
============================
Epoch:  0
    469/Unknown - 7s 16ms/step - loss: 0.2907 - accuracy: 0.8308
    469/Unknown - 4s 9ms/step - loss: 0.1285 - accuracy: 0.9595
    469/Unknown - 4s 9ms/step - loss: 0.1010 - accuracy: 0.9682
loss: 0.044021397829055786
accuracy: 0.9682833552360535
eval_loss: 0.05413995310664177
eval_accuracy: 0.9656000137329102

为现有模型代码指定布局

通常,您的模型非常适合您的用例。为模型中的每个单独层指定 Layout 信息将是一项需要大量编辑的工作。

为了帮助您轻松地将现有 Keras 模型转换为使用 DTensor API,可以使用新的 dtensor.LayoutMap API,它允许您从全局角度指定 Layout

首先,您需要创建一个 LayoutMap 实例,它是一个类似字典的对象,其中包含您要为模型权重指定的所有 Layout

LayoutMap 在初始化时需要一个 Mesh 实例,该实例可用于为任何未配置布局的权重提供默认的复制 Layout。如果您希望完全复制所有模型权重,则可以提供空的 LayoutMap,默认网格将用于创建复制的 Layout

LayoutMap 使用字符串作为键,使用 Layout 作为值。普通的 Python 字典与此类之间存在行为差异。检索值时,字符串键将被视为正则表达式

子类化模型

考虑使用 Keras 子类化模型语法定义的以下模型。

class SubclassedModel(tf.keras.Model):

  def __init__(self, name=None):
    super().__init__(name=name)
    self.feature = tf.keras.layers.Dense(16)
    self.feature_2 = tf.keras.layers.Dense(24)
    self.dropout = tf.keras.layers.Dropout(0.1)

  def call(self, inputs, training=None):
    x = self.feature(inputs)
    x = self.dropout(x, training=training)
    return self.feature_2(x)

此模型中有 4 个权重,分别是两个 Dense 层的 kernelbias。它们中的每一个都基于对象路径进行映射:

  • model.feature.kernel
  • model.feature.bias
  • model.feature_2.kernel
  • model.feature_2.bias

注:对于子类化模型,特性名称而不是层的 .name 特性用作从映射中检索布局的键。这与 tf.Module 检查点遵循的约定一致。对于具有多个层的复杂模型,您可以手动检查检查点来查看特性映射。

现在,定义以下 LayoutMap 并将其应用于模型。

layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)

layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)

with layout_map.scope():
  subclassed_model = SubclassedModel()

模型权重是在第一次调用时创建的,因此使用 DTensor 输入调用模型并确认权重具有预期的布局。

dtensor_input = dtensor.copy_to_mesh(tf.zeros((16, 16)), layout=unsharded_layout_2d)
# Trigger the weights creation for subclass model
subclassed_model(dtensor_input)

print(subclassed_model.feature.kernel.layout)
Layout.from_string(sharding_specs:batch,unsharded, mesh:|batch=8|0,1,2,3,4,5,6,7|0,1,2,3,4,5,6,7|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5,/job:localhost/replica:0/task:0/device:CPU:6,/job:localhost/replica:0/task:0/device:CPU:7)

这样一来,您就可以快速将 Layout 映射到您的模型,而无需更新任何现有代码。

序贯模型和函数式模型

对于 Keras 序贯和函数式模型,您也可以使用 LayoutMap

注:对于序贯模型和函数式模型,映射略有不同。模型中的层没有附加到模型的公共特性(尽管可以通过 model.layers 作为列表访问它们)。在这种情况下,使用字符串名称作为键。字符串名称保证在模型中是唯一的。

layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)

layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
with layout_map.scope():
  inputs = tf.keras.Input((16,), batch_size=16)
  x = tf.keras.layers.Dense(16, name='feature')(inputs)
  x = tf.keras.layers.Dropout(0.1)(x)
  output = tf.keras.layers.Dense(32, name='feature_2')(x)
  model = tf.keras.Model(inputs, output)

print(model.layers[1].kernel.layout)
Layout.from_string(sharding_specs:batch,unsharded, mesh:|batch=8|0,1,2,3,4,5,6,7|0,1,2,3,4,5,6,7|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5,/job:localhost/replica:0/task:0/device:CPU:6,/job:localhost/replica:0/task:0/device:CPU:7)
with layout_map.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Dense(16, name='feature', input_shape=(16,)),
      tf.keras.layers.Dropout(0.1),
      tf.keras.layers.Dense(32, name='feature_2')
  ])

print(model.layers[2].kernel.layout)
Layout.from_string(sharding_specs:batch,unsharded, mesh:|batch=8|0,1,2,3,4,5,6,7|0,1,2,3,4,5,6,7|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5,/job:localhost/replica:0/task:0/device:CPU:6,/job:localhost/replica:0/task:0/device:CPU:7)