使用TensorFlow加载和预处理数据

| |

The tf.data API

python

>>> import tensorflow as tf
>>> X = tf.range(10)  # any data tensor
>>> dataset = tf.data.Dataset.from_tensor_slices(X)
>>> dataset
<TensorSliceDataset shapes: (), types: tf.int32>


#历遍
>>> for item in dataset:
...     print(item)
...
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
[...]
tf.Tensor(9, shape=(), dtype=int32)


# 数据集也可能包含张量的元组,或名称/张量对的字典,甚至嵌套的元组和张量字典。
# 当对元组、字典或嵌套结构进行切片时,数据集将只对其包含的张量进行切片,同时保留元组/字典结构。例如:
>>> X_nested = {"a": ([1, 2, 3], [4, 5, 6]), "b": [7, 8, 9]}
>>> dataset = tf.data.Dataset.from_tensor_slices(X_nested)
>>> for item in dataset:
...     print(item)
...
{'a': (<tf.Tensor: [...]=1>, <tf.Tensor: [...]=4>), 'b': <tf.Tensor: [...]=7>}
{'a': (<tf.Tensor: [...]=2>, <tf.Tensor: [...]=5>), 'b': <tf.Tensor: [...]=8>}
{'a': (<tf.Tensor: [...]=3>, <tf.Tensor: [...]=6>), 'b': <tf.Tensor: [...]=9>}

链式转换

python

>>> dataset = tf.data.Dataset.from_tensor_slices(tf.range(10))
>>> dataset = dataset.repeat(3).batch(7)
>>> for item in dataset:
...     print(item)
...
tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)

# 我们首先在原始数据集上调用repeat()方法,它返
# 回一个新数据集,该数据集将重复原始数据集的元素三次。当然这不会
# 将内存中的所有数据复制三遍(你如果不带任何参数调用此方法,则新
# 数据集将永远重复源数据集,因此遍历该数据集的代码必须决定何时停
# 止)。然后我们在此新数据集上调用batch()方法,并再次创建一个
# 新的数据集。这把先前数据集的元素以7个元素为一个批次分组。最后
# 我们遍历此最终数据集的元素。如你所见,batch()方法最后输出一
# 个大小为2而不是7的最终批次,但是如果你希望它删除最终批次,可以
# 使用drop_remainder=True调用它,使所有批次具有完全相同的大小。
# 数据集方法不会修改数据集,而是创建新数据集,因此请保留对这些新数据集的引用,使用dataset=...


# 你还可以通过调用map()方法来变换元素。例如,这将创建一个
# 新数据集,其中所有元素均是原来的两倍:
>>> dataset = dataset.map(lambda x: x * 2)  # x is a batch
>>> for item in dataset:
...     print(item)
...
tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int32)
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
[...]
# 你可以调用此函数把你所需的任何预处理应用于你的数据。
# 请注意传递给map()方法的函数必须可转换为TF函数

# It is also possible to simply filter the dataset using the filter() method. 简单地过滤数据集
# For example, this code creates a dataset that only contains the batchs whose sum is greater than 50:
>>> dataset = dataset.filter(lambda x: tf.reduce_sum(x) > 50)
>>> for item in dataset:
...     print(item)
...
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int32)

# 你通常会希望查看数据集中的一些元素,可以使用take()方法:
>>> for item in dataset.take(2):
...     print(item)
...
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32)

乱序数据

当训练集中的实例相互独立且分布均匀时,梯度下降效果最好。

python

# 必须要指定buffer size使其足够大,否则乱序不会很有效,但是不能超过RAM数量,并且不要超过数据集大小
>>> dataset = tf.data.Dataset.range(10).repeat(2)
>>> dataset = dataset.shuffle(buffer_size=4, seed=42).batch(7)
>>> for item in dataset:
...     print(item)
...
tf.Tensor([3 0 1 6 2 5 7], shape=(7,), dtype=int64)
tf.Tensor([8 4 1 9 4 2 3], shape=(7,), dtype=int64)
tf.Tensor([7 5 0 8 9 6], shape=(6,), dtype=int64)

如果你在经过乱序的数据集上调用repeat(),则默认情况下,它在每次迭代时生成一个新次序。通常这是个好主意,但如果你希望在每次迭代中重用相同的顺序(例如用于测试或调试),则可以设置reshuffle_each_iteration=False。
其他方法:

交织来自多个文件的样本数据

python

# 默认情况下,list_files()函数返回一个乱序的文件路径的数据集。
# 通常这是一件好事,但是如果出于某种原因不希望这样做,则可以设置 `shuffle=False` 。
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)

# 一次读取5个文件并交织它们的行
n_readers = 5
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1),         # 跳过每个文件的第一行,即标题行
    cycle_length=n_readers)

interleave() 创建一个数据集该数据集将从filepath_dataset中拉出5个文件路径,对于每个路径,它将调用你为其提供的函数(在此示例中为lambda)来创建新的数据集(在此示例中为TextLineDataset)。为了清楚起见,在此阶段总共有7个数据集:文件路径数据集、交织数据集和由交织数据集在内部创建的5个TextLineDataset。当我们遍历交织数据集时,它将循环遍历这5个TextLineDatasets,每次读取一行,直到所有数据集都读出为止。然后它将从filepath_dataset获取剩下的5个文件路径,并以相同的方式对它们进行交织,以此类推,直到读完文件路径。为了交织效果更好,最好使用具有相同长度的文件。否则最长文件的结尾将不会交织。

By default, interleave() does not use parallelism; it just reads one line at a time from each file, sequentially. If you want it to actually read files in parallel, you can set the interleave() method’s num_parallel_calls argument to the number of threads you want (recall that the map() method also has this argument). You can even set it to tf.data.AUTOTUNE to make TensorFlow choose the right number of threads dynamically based on the available CPU. Let’s look at what the dataset contains now:

python

>>> for line in dataset.take(5):
...     print(line)
...
tf.Tensor(b'4.5909,16.0,[...],33.63,-117.71,2.418', shape=(), dtype=string)
tf.Tensor(b'2.4792,24.0,[...],34.18,-118.38,2.0', shape=(), dtype=string)
tf.Tensor(b'4.2708,45.0,[...],37.48,-122.19,2.67', shape=(), dtype=string)
tf.Tensor(b'2.1856,41.0,[...],32.76,-117.12,1.205', shape=(), dtype=string)
tf.Tensor(b'4.1812,52.0,[...],33.73,-118.31,3.215', shape=(), dtype=string)

# 这些是随机选择的5个CSV文件的第一行(忽略标题行)

预处理数据

python

# X_mean和X_std是一维张量(或NumPy数组)
# This can be done using a Scikit-Learn StandardScaler on a large enough random sample of the dataset. 
X_mean, X_std = [...]  # mean and scale of each feature in the training set
n_inputs = 8

def parse_csv_line(line):
    defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=defs)
    return tf.stack(fields[:-1]), tf.stack(fields[-1:])

def preprocess(line):
    x, y = parse_csv_line(line)
    return (x - X_mean) / X_std, y

The parse_csv_line() function takes one CSV line and parses it. To help with that, it uses the tf.io.decode_csv() function, which takes two arguments: the first is the line to parse, and the second is an array containing the default value for each column in the CSV file. This array (defs) tells TensorFlow not only the default value for each column, but also the number of columns and their types. In this example, we tell it that all feature columns are floats and that missing values should default to zero, but we provide an empty array of type tf.float32 as the default value for the last column (the target): the array tells TensorFlow that this column contains floats, but that there is no default value, so it will raise an exception if it encounters a missing value.
The tf.io.decode_csv() function returns a list of scalar tensors (one per column), but we need to return a 1D tensor array. So we call tf.stack() on all tensors except for the last one (the target): this will stack these tensors into a 1D array. We then do the same for the target value: this makes it a 1D tensor array with a single value, rather than a scalar tensor. The tf.io.decode_csv() function is done, so it returns the input features and the target.
Finally, the custom preprocess() function just calls the parse_csv_line() function, scales the input features by subtracting the feature means and then dividing by the feature standard deviations, and returns a tuple containing the scaled features and the target.
让我们测试一下预处理函数:

python

>>> preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')
(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ 0.16579159,  1.216324  , -0.05204564, -0.39215982, -0.5277444 ,
        -0.2633488 ,  0.8543046 , -1.3072058 ], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([2.782], dtype=float32)>)

The preprocess() function can convert an instance from a byte string to a nice scaled tensor, with its corresponding label. We can now use the dataset’s map() method to apply the preprocess() function to each sample in the dataset.

汇总

为了使代码可重用,我们将到目前为止讨论的所有内容放到一个小的辅助函数中:它将创建并返回一个数据集,该数据集有效地从多个CSV文件中加载加州住房数据,对其进行预处理、随机乱序,可以选择重复,并进行批处理。

python

def csv_reader_dataset(filepaths, n_readers=5, n_read_threads=None,
                       n_parse_threads=5, shuffle_buffer_size=10_000, seed=42,
                       batch_size=32):
    dataset = tf.data.Dataset.list_files(filepaths, seed=seed)
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
        cycle_length=n_readers, num_parallel_calls=n_read_threads)
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
    dataset = dataset.shuffle(shuffle_buffer_size, seed=seed)
    return dataset.batch(batch_size).prefetch(1)

Note that we use the prefetch() method on the very last line. This is important for performance, as you will see now.

mls3_1302.pngmls3_1302.png

预取

预取,Prefetching,训练算法正处理一个批次时,数据集已经并行工作以准备下一批次了,如果在CPU上利用多个内核,希望准备一个批次数据的时间比在GPU上执行一个训练步骤的时间要短一些:这样,GPU将达到几乎100%的利用率(从CPU到GPU的数据传输时间除外)。
如果你打算购买GPU卡,那么它的处理能力和内存大小非常重要(特别是大量的RAM对于计算机视觉至关重要)。要获得良好性能同样很重要的是其内存带宽,它代表每秒可以进出其RAM的GB数据大小。
如果数据集足够小,可以放到内存里,则可以使用数据集的cache()方法将其内容缓存到RAM中,从而显著加快训练速度。通常应该在加载和预处理数据之后,但在乱序、重复、批处理和预取之前执行此操作。这样,每个实例仅被读取和预处理一次(而不是每个轮次一次),但数据仍会在每个轮次进行不同的乱序,并且仍会提前准备下一批次。

You have now learned how to build efficient input pipelines to load and preprocess data from multiple text files. We have discussed the most common dataset methods, but there are a few more you may want to look at, such as concatenate(), zip(), window(), reduce(), shard(), flat_map(), apply(), unbatch(), and padded_batch(). There are also a few more class methods, such as from_generator() and from_​ten⁠sors(), which create a new dataset from a Python generator or a list of tensors, respectively. Please check the API documentation for more details. Also note that there are experimental features available in tf.data.experimental, many of which will likely make it to the core API in future releases (e.g., check out the CsvDataset class, as well as the make_csv_dataset() method, which takes care of inferring the type of each column).

Using the Dataset with Keras

Now we can use the custom csv_reader_dataset() function we wrote earlier to create a dataset for the training set, and for the validation set and the test set. The training set will be shuffled at each epoch (note that the validation set and the test set will also be shuffled, even though we don’t really need that):

python

train_set = csv_reader_dataset(train_filepaths)
valid_set = csv_reader_dataset(valid_filepaths)
test_set = csv_reader_dataset(test_filepaths)

# 下面可以直接将数据集直接传递给fit(),不需要其他操作
# fit() 方法会自行处理epoch和随机种子等
model = tf.keras.Sequential([...])
model.compile(loss="mse", optimizer="sgd")
model.fit(train_set, validation_data=valid_set, epochs=5)

# 类似地,传给 evaluate() 和predict()
test_mse = model.evaluate(test_set)
new_set = test_set.take(3)  # pretend we have 3 new samples,new_set通常不包含标签
y_pred = model.predict(new_set)  # or you could just pass a NumPy array


# 自定义训练循环
n_epochs = 5
for epoch in range(n_epochs):
    for X_batch, y_batch in train_set:
        [...]  # perform one gradient descent step


# 创建执行整个训练循环的TF函数,可以加速
@tf.function
def train_one_epoch(model, optimizer, loss_fn, train_set):
    for X_batch, y_batch in train_set:
        with tf.GradientTape() as tape:
            y_pred = model(X_batch)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
loss_fn = tf.keras.losses.mean_squared_error
for epoch in range(n_epochs):
    print("\rEpoch {}/{}".format(epoch + 1, n_epochs), end="")
    train_one_epoch(model, optimizer, loss_fn, train_set)

In Keras, the steps_per_execution argument of the compile() method lets you define the number of batches that the fit() method will process during each call to the tf.function it uses for training. The default is just 1, so if you set it to 50 you will often see a significant performance improvement. However, the on_batch_*() methods of Keras callbacks will only be called every 50 batches.

TFRecord 格式

这是一种非常简单的二进制格式,只包含大小不同的二进制记录序列(每个记录由一个长度、一个用于检查长度是否损坏的CRC校验和、实际数据以及最后一个CRC校验和组成)。

python

# 创建TFRecord文件
with tf.io.TFRecordWriter("my_data.tfrecord") as f:
    f.write(b"This is the first record")
    f.write(b"And this is the second record")

# 读取
filepaths = ["my_data.tfrecord"]
dataset = tf.data.TFRecordDataset(filepaths)
for item in dataset:
    print(item)
# 输出内容
# tf.Tensor(b'This is the first record', shape=(), dtype=string)
# tf.Tensor(b'And this is the second record', shape=(), dtype=string)

默认情况下,TFRecordDataset将一个接一个地读取文件,但是你可以通过设置num_parallel_reads使其并行读取多个文件并交织记录。另外,你可以使用list_files()和interleave()得到与前面读取多个CSV文件相同的结果。

压缩的TFRecord文件

有时压缩TFRecord文件可能很有用,尤其是在需要通过网络连接加载它们的情况下。你可以通过设置options参数来创建压缩的TFRecord文件:

python

options = tf.io.TFRecordOptions(compression_type="GZIP")
with tf.io.TFRecordWriter("my_compressed.tfrecord", options) as f:
    f.write(b"Compress, compress, compress!")

# 读取压缩的TFRecord文件时需要指定压缩类型
dataset = tf.data.TFRecordDataset(["my_compressed.tfrecord"], compression_type="GZIP")

协议缓冲区简介

Even though each record can use any binary format you want, TFRecord files usually contain serialized protocol buffers (also called protobufs). This is a portable, extensible, and efficient binary format developed at Google back in 2001 and made open source in 2008; protobufs are now widely used, in particular in gRPC, Google’s remote procedure call system. They are defined using a simple language that looks like this:

python

syntax = "proto3";
message Person {
    string name = 1;
    int32 id = 2;
    repeated string email = 3;
}

此定义表示我们使用的是protobuf格式的版本3,它指定每个Person对象具有string类型的name,类型int32的id和零个或多个email字段(每个都是string类型),数字1、2和3是字段标识符:它们用于每个记录的二进制表示形式。

python

>>> from person_pb2 import Person  # import the generated access class
>>> person = Person(name="Al", id=123, email=["a@b.com"])  # create a Person
>>> print(person)  # display the Person
name: "Al"
id: 123
email: "a@b.com"
>>> person.name  # read a field
'Al'
>>> person.name = "Alice"  # modify a field
>>> person.email[0]  # repeated fields can be accessed like arrays
'a@b.com'
>>> person.email.append("c@d.com")  # add an email address
>>> serialized = person.SerializeToString()  # serialize person to a byte string
>>> serialized
b'\n\x05Alice\x10{\x1a\x07a@b.com\x1a\x07c@d.com'
>>> person2 = Person()  # create a new Person
>>> person2.ParseFromString(serialized)  # parse the byte string (27 bytes long)
27
>>> person == person2  # now they are equal
True

SerializeToString()和ParseFromString()不是TensorFlow操作。
这是准备通过网络保存或传输的二进制数据。在读取或接收此二进制数据时,我们可以使用ParseFromString()方法对其进行解析,然后得到被序列化的对象的副本。

TensorFlow协议

python

syntax = "proto3";
message BytesList { repeated bytes value = 1; }
message FloatList { repeated float value = 1 [packed = true]; }
message Int64List { repeated int64 value = 1 [packed = true]; }
message Feature {
    oneof kind {
        BytesList bytes_list = 1;
        FloatList float_list = 2;
        Int64List int64_list = 3;
    }
};
message Features { map<string, Feature> feature = 1; };
message Example { Features features = 1; };

[packed=true]用于重复的数字字段以实现更有效的编码。

Here is how you could create a tf.train.Example representing the same person as earlier:

python

from tensorflow.train import BytesList, FloatList, Int64List
from tensorflow.train import Feature, Features, Example

person_example = Example(
    features=Features(
        feature={
            "name": Feature(bytes_list=BytesList(value=[b"Alice"])),
            "id": Feature(int64_list=Int64List(value=[123])),
            "emails": Feature(bytes_list=BytesList(value=[b"a@b.com",
                                                          b"c@d.com"]))
        }))

# 有了Example protobuf,
# 我们可以通过调用其SerializeToString()方法对其进行序列化,
# 然后将结果数据写入TFRecord文件:
with tf.io.TFRecordWriter("my_contacts.tfrecord") as f:
    for _ in range(5):
        f.write(person_example.SerializeToString())

通常你会编写不止一个Example!你会创建一个转换脚本,该脚本从你当前的格式(例如CSV文件)中读取,为每个实例创建一个Example protobuf,对其进行序列化,然后将其保存到多个TFRecord文件中,最好在处理过程中对其进行乱序。这需要一些工作量,因此请再次确保确实有必要(也许你的流水线可以使用CSV文件正常工作)。现在我们有了一个不错的TFRecord文件,其中包含序列化的Example,让我们尝试加载它。

加载和解析Example

python

feature_description = {
    "name": tf.io.FixedLenFeature([], tf.string, default_value=""),
    "id": tf.io.FixedLenFeature([], tf.int64, default_value=0),
    "emails": tf.io.VarLenFeature(tf.string),
}

def parse(serialized_example):
    return tf.io.parse_single_example(serialized_example, feature_description)

dataset = tf.data.TFRecordDataset(["my_contacts.tfrecord"]).map(parse)
for parsed_example in dataset:
    print(parsed_example)


# The fixed-length features are parsed as regular tensors, but the variable-length features are parsed as sparse tensors.
#  You can convert a sparse tensor to a dense tensor using tf.sparse.to_dense(), 
# but in this case it is simpler to just access its values:

>>> tf.sparse.to_dense(parsed_example["emails"], default_value=b"")
<tf.Tensor: [...] dtype=string, numpy=array([b'a@b.com', b'c@d.com'], [...])>
>>> parsed_example["emails"].values
<tf.Tensor: [...] dtype=string, numpy=array([b'a@b.com', b'c@d.com'], [...])>


# Instead of parsing examples one by one using tf.io.parse_single_example(),
#  you may want to parse them batch by batch using tf.io.parse_example():

def parse(serialized_examples):
    return tf.io.parse_example(serialized_examples, feature_description)

dataset = tf.data.TFRecordDataset(["my_contacts.tfrecord"]).batch(2).map(parse)
for parsed_examples in dataset:
    print(parsed_examples)  # two examples at a time


最后,BytesList可以包含您想要的任何二进制数据,包括任何序列化的对象。例如,您可以使用tf.io.encode_jpeg()使用jpeg格式对图像进行编码,并将此二进制数据放入BytesList中。稍后,当您的代码读取TFRecord时,它将从解析Example开始,然后它需要调用tf.io.decode_jpeg()来解析数据并获得原始图像(或者您可以使用tf.io.decode_image(),它可以解码任何BMP、GIF、jpeg或PNG图像)。您还可以将任何需要的张量存储在ByteList中,方法是使用tf.io.serialize_sensor()序列化张量,然后将生成的字节字符串放入ByteList功能中。稍后,当您解析TFRecord时,您可以使用tf.io.parse_tensor()解析这些数据https://homl.info/colab3例如在TFRecord文件中存储图像和张量。
正如您所看到的,Example protobuf非常灵活,因此它可能足以用于大多数用例。然而,在处理列表列表时,使用它可能会有点麻烦。例如,假设您要对文本文档进行分类。每个文档可以表示为句子列表,其中每个句子表示为单词列表。也许每个文档也有一个注释列表,其中每个注释都表示为一个单词列表。可能还有一些上下文数据,例如文档的作者、标题和发布日期。TensorFlow的SequenceExample protobuf就是为此类用例而设计的。

使用SequenceExample Protobuf处理列表的列表

以下是SequenceExample protobuf的定义:

plaintext

message FeatureList { repeated Feature feature = 1; };
message FeatureLists { map<string, FeatureList> feature_list = 1; };
message SequenceExample {
    Features context = 1;
    FeatureLists feature_lists = 2;
};

SequenceExample包含上下文数据的Features对象和包含一个或多个命名FeatureList对象的FeatureLists对象(例如,名为“content”的FeatureList和另一个名为“comments”的FeatureList)。每个FeatureList都包含一个Feature对象列表,每个Feature对象可以是一个字节字符串列表、一个64位整数列表或一个浮点列表(在本例中,每个Features将代表一个句子或注释,可能是单词标识符列表的形式)。构建SequenceExample、序列化并解析它类似于构建、序列化和解析Example,但必须使用tf.io.parse_single_sequence_Example()解析单个SequenceExample,或使用tf.io.parse_sequence_sample()来解析批。这两个函数都返回一个包含上下文特征(作为字典)和特征列表(也作为字典)的元组。

既然你知道如何有效地存储、加载和解析数据,下一步就是准备数据,以便可以将其馈送到神经网络。

Keras预处理层

方法有很多,这里介绍在模型中包含预处理层的做法。

The Normalization Layer

正如我们在第10章中看到的,Keras提供了一个标准化层,我们可以使用它来标准化输入特性。我们可以在创建层时指定每个特征的均值和方差,或者更简单地说,在拟合模型之前,将训练集传递给层的adapt()方法,这样层就可以在训练之前自行测量特征均值和方差:

python

norm_layer = tf.keras.layers.Normalization()
model = tf.keras.models.Sequential([
    norm_layer,
    tf.keras.layers.Dense(1)
])
model.compile(loss="mse", optimizer=tf.keras.optimizers.SGD(learning_rate=2e-3))
norm_layer.adapt(X_train)  # computes the mean and variance of every feature
model.fit(X_train, y_train, validation_data=(X_valid, y_valid), epochs=5)

传递给adapt()方法的数据样本必须足够大,才能代表您的数据集,但它不一定是完整的训练集:对于归一化层,从训练集中随机采样的几百个实例通常足以很好地估计特征均值和方差。
由于有了这个正则化层,我们就不用担心在部署的时候需要再次正则化了。

mls3_1304mls3_1304
Including the preprocessing layer directly in the model is nice and straightforward, but it will slow down training (only very slightly in the case of the Normalization layer): indeed, since preprocessing is performed on the fly during training, it happens once per epoch. We can do better by normalizing the whole training set just once before training. To do this, we can use the Normalization layer in a standalone fashion (much like a Scikit-Learn StandardScaler):

python

norm_layer = tf.keras.layers.Normalization()
norm_layer.adapt(X_train)
X_train_scaled = norm_layer(X_train)
X_valid_scaled = norm_layer(X_valid)

Now we can train a model on the scaled data, this time without a Normalization layer:

python

model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
model.compile(loss="mse", optimizer=tf.keras.optimizers.SGD(learning_rate=2e-3))
model.fit(X_train_scaled, y_train, epochs=5, validation_data=(X_valid_scaled, y_valid))

好的这应该会加快训练速度。但现在,当我们将模型部署到生产中时,它不会对其输入进行预处理。要解决这个问题,我们只需要创建一个新的模型,该模型同时封装经过调整的归一化层和我们刚刚训练的模型。然后,我们可以将这个最终模型部署到生产中,它将负责预处理输入和进行预测

python

final_model = tf.keras.Sequential([norm_layer, model])
X_new = X_test[:3]  # pretend we have a few new instances (unscaled)
y_pred = final_model(X_new)  # preprocesses the data and makes predictions

mls3_1305mls3_1305
现在我们两全其美:训练很快,因为我们在训练开始前只对数据进行一次预处理,最终模型可以动态预处理其输入,而不会有任何预处理不匹配的风险。 此外,Keras预处理层与tf.data API配合得很好。例如,可以将tf.data.Dataset传递给预处理层的adapt()方法。也可以使用数据集的map()方法将Keras预处理层应用于tf.data.Dataset。例如,以下是如何将调整后的归一化层应用于数据集中每个批次的输入特征:

python

dataset = dataset.map(lambda X, y: (norm_layer(X), y))

最后,如果您需要比Keras预处理层更多的功能,您可以随时编写自己的Keras层。例如,如果规范化层不存在,则可以使用以下自定义层获得类似的结果:

python

import numpy as np

class MyNormalization(tf.keras.layers.Layer):
    def adapt(self, X):
        self.mean_ = np.mean(X, axis=0, keepdims=True)
        self.std_ = np.std(X, axis=0, keepdims=True)

    def call(self, inputs):
        eps = tf.keras.backend.epsilon()  # a small smoothing term
        return (inputs - self.mean_) / (self.std_ + eps)

离散化层

离散化层的目标是通过将值范围(称为bin)映射到类别,将数字特征转换为分类特征。这有时对具有多模式分布的特征或与目标具有高度非线性关系的特征有用。例如,以下代码将数字年龄特征映射到三个类别,即小于18、18到50(不包括)和50或以上:

python

>>> age = tf.constant([[10.], [93.], [57.], [18.], [37.], [5.]])
>>> discretize_layer = tf.keras.layers.Discretization(bin_boundaries=[18., 50.])
>>> age_categories = discretize_layer(age)
>>> age_categories
<tf.Tensor: shape=(6, 1), dtype=int64, numpy=array([[0],[2],[2],[1],[1],[0]])>

在本例中,我们提供了所需的bin边界。如果你愿意,你可以提供你想要的bin数量,然后调用层的adapt()方法,让它根据值的百分位数找到合适的bin边界。例如,如果我们将num_bins设置为3,则bin边界将位于第33个和第66个百分位数以下的值处(在本例中,位于值10和37处):

python

>>> discretize_layer = tf.keras.layers.Discretization(num_bins=3)
>>> discretize_layer.adapt(age)
>>> age_categories = discretize_layer(age)
>>> age_categories
<tf.Tensor: shape=(6, 1), dtype=int64, numpy=array([[1],[2],[2],[1],[2],[0]])>

此类类别标识符通常不应直接传递给神经网络,因为它们的值无法进行有意义的比较。相反,应该对它们进行编码,例如使用独热编码,one-hot encoding。

The CategoryEncoding Layer

当类别数量比较小(less than a dozen or two)时,独热编码效果会比较好。
To do this, Keras provides the CategoryEncoding layer. For example, let’s one-hot encode the age_​cate⁠gories feature we just created:

python

>>> onehot_layer = tf.keras.layers.CategoryEncoding(num_tokens=3)
>>> onehot_layer(age_categories)
<tf.Tensor: shape=(6, 3), dtype=float32, numpy=
array([[0., 1., 0.],
       [0., 0., 1.],
       [0., 0., 1.],
       [0., 1., 0.],
       [0., 0., 1.],
       [1., 0., 0.]], dtype=float32)>

If you try to encode more than one categorical feature at a time (which only makes sense if they all use the same categories), the CategoryEncoding class will perform multi-hot encoding by default: the output tensor will contain a 1 for each category present in any input feature. For example:

plaintext

>>> two_age_categories = np.array([[1, 0], [2, 2], [2, 0]])
>>> onehot_layer(two_age_categories)
<tf.Tensor: shape=(3, 3), dtype=float32, numpy=
array([[1., 1., 0.],
       [0., 0., 1.],
       [1., 0., 1.]], dtype=float32)>

If you believe it’s useful to know how many times each category occurred, you can set output_mode="count" when creating the CategoryEncoding layer, in which case the output tensor will contain the number of occurrences of each category. In the preceding example, the output would be the same except for the second row, which would become [0., 0., 2.].

Note that both multi-hot encoding and count encoding lose information, since it’s not possible to know which feature each active category came from. For example, both [0, 1] and [1, 0] are encoded as [1., 1., 0.]. If you want to avoid this, then you need to one-hot encode each feature separately and concatenate the outputs. This way, [0, 1] would get encoded as [1., 0., 0., 0., 1., 0.] and [1, 0] would get encoded as [0., 1., 0., 1., 0., 0.]. You can get the same result by tweaking the category identifiers so they don’t overlap. For example:

python

>>> onehot_layer = tf.keras.layers.CategoryEncoding(num_tokens=3 + 3)
>>> onehot_layer(two_age_categories + [0, 3])  # adds 3 to the second feature
<tf.Tensor: shape=(3, 6), dtype=float32, numpy=
array([[0., 1., 0., 1., 0., 0.],
       [0., 0., 1., 0., 0., 1.],
       [0., 0., 1., 1., 0., 0.]], dtype=float32)>

在该输出中,前三列对应于第一个特征,后三列对应第二个特征。这使模型能够区分这两个特征。然而,它也增加了提供给模型的特征的数量,从而需要更多的模型参数。很难提前知道单个multi-hot encoding还是每个功能分别one-hot encoding效果最好:这取决于任务,您可能需要测试这两种选项。

分类文本特征呢?可以使用StringLookup层。

The StringLookup Layer

python

>>> cities = ["Auckland", "Paris", "Paris", "San Francisco"]
>>> str_lookup_layer = tf.keras.layers.StringLookup()
>>> str_lookup_layer.adapt(cities)
>>> str_lookup_layer([["Paris"], ["Auckland"], ["Auckland"], ["Montreal"]])
<tf.Tensor: shape=(4, 1), dtype=int64, numpy=array([[1], [3], [3], [0]])>

Conveniently, if you set output_mode="one_hot" when creating the StringLookup layer, it will output a one-hot vector for each category, instead of an integer:

python

>>> str_lookup_layer = tf.keras.layers.StringLookup(output_mode="one_hot")
>>> str_lookup_layer.adapt(cities)
>>> str_lookup_layer([["Paris"], ["Auckland"], ["Auckland"], ["Montreal"]])
<tf.Tensor: shape=(4, 4), dtype=float32, numpy=
array([[0., 1., 0., 0.],
       [0., 0., 0., 1.],
       [0., 0., 0., 1.],
       [1., 0., 0., 0.]], dtype=float32)>

note: Keras also includes an IntegerLookup layer that acts much like the StringLookup layer but takes integers as input, rather than strings.
如果训练集非常大,则可以方便地将层适配为训练集的随机子集。在这种情况下,层的adapt()方法可能会错过一些罕见的类别。默认情况下,它会将它们全部映射到类别0,使它们无法被模型区分。为了降低这种风险(同时仍然仅在训练集的子集上调整层),可以将num_oov_indices设置为大于1的整数。这是要使用的词汇表外(OOV)桶的数量:每个未知类别将使用哈希函数模化OOV桶的数量,伪随机映射到其中一个OOV桶。这将使模型能够区分至少一些罕见的类别。例如:

python

>>> str_lookup_layer = tf.keras.layers.StringLookup(num_oov_indices=5)
>>> str_lookup_layer.adapt(cities)
>>> str_lookup_layer([["Paris"], ["Auckland"], ["Foo"], ["Bar"], ["Baz"]])
<tf.Tensor: shape=(4, 1), dtype=int64, numpy=array([[5], [7], [4], [3], [4]])>

由于有五个OOV桶,第一个已知类别的ID现在是5(“Paris”)。但是“Foo”、“Bar”和“Baz”是未知的,所以它们各自被映射到OOV桶中的一个。“Bar”有自己的专用bucket(ID为3),但遗憾的是,“Foo”和“Baz”恰好映射到同一个bucket(ID4),因此它们在模型中仍然无法区分。这被称为哈希冲突。减少碰撞风险的唯一方法是增加OOV存储桶的数量。然而,这也将增加类别的总数,一旦对类别进行了热编码,就需要更多的RAM和额外的模型参数。所以,不要把这个数字增加太多。 这种将类别伪随机映射到桶的想法被称为哈希技巧。Keras提供了一个专门的层:Hashing层。

The Hashing Layer

For each category, the Keras Hashing layer computes a hash, modulo the number of buckets (or “bins”).
映射完全是伪随机的,但在运行和平台之间是稳定的(即,只要bin的数量不变,相同的类别将始终映射到相同的整数)。例如,让我们使用哈希层对几个城市进行编码:

python

>>> hashing_layer = tf.keras.layers.Hashing(num_bins=10)
>>> hashing_layer([["Paris"], ["Tokyo"], ["Auckland"], ["Montreal"]])
<tf.Tensor: shape=(4, 1), dtype=int64, numpy=array([[0], [1], [9], [1]])>

这个层的好处是它根本不需要调整,这有时可能很有用,尤其是在核心之外的环境中(当数据集太大而无法放入内存时)。然而,我们再次遇到了哈希冲突:“东京”和“蒙特利尔”被映射到同一个ID,使它们无法通过模型进行区分。因此,通常最好坚持使用StringLookup层。

现在让我们看看另一种编码类别的方法:嵌入编码。

使用嵌入编码分类特征

Encoding Categorical Features Using Embeddings,使用嵌入编码分类特征。
嵌入是一些高维数据的密集表示,例如词汇表中的类别或单词。如果有50000个可能的类别,那么独热编码将产生50000维稀疏向量(即,大部分包含零)。相反,嵌入将是一个相对较小的密集向量;例如只有100个维度。例如,"NEAR BAY"类别最初可以由诸如[0.131,0.890]的随机向量表示,而"NEAR OCEAN"类别可以由[0.631,0.791]表示。在此示例中,我们使用2D嵌入,但是维度是可以调整的超参数。在深度学习中,嵌入通常是随机初始化的,然后通过梯度下降以及其他模型参数对其进行训练。由于这些嵌入是可训练的,因此它们在训练过程中会逐步改善。由于它们代表的类别相当相似,“梯度下降”肯定最终会把它们推到接近的位置,而把它们推离"INLAND"类别的嵌入。实际上,表征越好,神经网络就越容易做出准确的预测,因此训练使嵌入成为类别的有用表征。这称为表征学习

关于文本处理的部分,请移步此篇文档

Using Pretrained Language Model Components

TensorFlow Hub库使您可以轻松地在自己的模型中重用经过预训练的模型组件,用于文本、图像、音频等。这些模型组件称为模块。只需浏览TFHub存储库,找到您需要的存储库,并将代码示例复制到您的项目中,模块就会自动下载并绑定到Keras层中,您可以直接将其包含在模型中。模块通常包含预处理代码和预训练的权重,它们通常不需要额外的训练(当然,模型的其余部分肯定需要训练)。
例如,一些强大的预训练语言模型是可用的。最强大的是相当大的(几GB),所以作为一个快速的例子,让我们使用nnlm-en-dim50模块版本2,这是一个相当基本的模块,它以原始文本作为输入并输出50维句子嵌入。我们将导入TensorFlow Hub并使用它来加载模块,然后使用该模块将两句编码为向量

>>> import tensorflow_hub as hub
>>> hub_layer = hub.KerasLayer("https://tfhub.dev/google/nnlm-en-dim50/2")
>>> sentence_embeddings = hub_layer(tf.constant(["To be", "Not to be"]))
>>> sentence_embeddings.numpy().round(2)
array([[-0.25,  0.28,  0.01,  0.1 ,  [...] ,  0.05,  0.31],
    [-0.2 ,  0.2 , -0.08,  0.02,  [...] , -0.04,  0.15]], dtype=float32)

这个特定的模块是一个句子编码器:它将字符串作为输入,并将每个字符串编码为单个向量(在本例中,为50维向量)。在内部,它解析字符串(在空间上拆分单词),并使用在一个巨大的语料库上预训练的嵌入矩阵嵌入每个单词:谷歌新闻7B语料库(70亿单词长!)。然后它计算所有单词嵌入的平均值,结果就是句子嵌入

注意:
You just need to include this hub_layer in your model, and you’re ready to go. Note that this particular language model was trained on the English language, but many other languages are available, as well as multilingual models.

Last but not least, the excellent open source Transformers library by Hugging Face also makes it easy to include powerful language model components inside your own models. You can browse the Hugging Face Hub, choose the model you want, and use the provided code examples to get started. It used to contain only language models, but it has now expanded to include image models and more.

Image Preprocessing Layers

Keras预处理API包括三个图像预处理层:
tf.keras.layers.Resizing 将输入图像的大小调整为所需的大小。例如,“调整大小”(高度=100,宽度=200)会将每个图像的大小调整为100×200,这可能会使图像失真。如果将crop_to_aspect_ratio设置为True,则图像将裁剪为目标图像比例,以避免失真。
tf.keras.layers.Rescaling 会重新缩放像素值。例如,Rescal⁠ing​(比例=2/255,偏移量=-1)从0开始缩放值→ 255到-1→ 1.
tf.keras.layers.CenterCrop 裁剪图像,只保留所需高度和宽度的中心面片。

例如,让我们加载几个示例图像并对它们进行中心裁剪。为此,我们将使用Scikit Learn的load_sample_images()函数;这加载了两个彩色图像,一个是中国寺庙,另一个是花朵(这需要Pillow库,如果您正在使用Colab或按照安装说明安装,该库应该已经安装好了):

python

from sklearn.datasets import load_sample_images

images = load_sample_images()["images"]
crop_image_layer = tf.keras.layers.CenterCrop(height=100, width=100)
cropped_images = crop_image_layer(images)

Keras还包括用于数据增强的几个层,如RandomCrop、RandomFlip、RandomTranslation、Random Rotation、RandomZoom、RandomHeight、RandomWidth和RandomContrast。这些层仅在训练期间是活动的,并且它们随机地对输入图像应用一些变换(它们的名称不言自明)。数据增强会人为地增加训练集的大小,这通常会提高性能,只要变换后的图像看起来像真实的(未增强的)图像。

TensorFlow数据集项目

TensorFlow数据集(TFDS)项目 使加载常见数据集变得非常容易,从MNIST或Fashion MNIST等小型数据集到ImageNet等大型数据集(您将需要相当大的磁盘空间!)。该列表包括图像数据集、文本数据集(包括翻译数据集)、音频和视频数据集、时间序列等等。您可以访问https://homl.info/tfds查看完整列表,以及每个数据集的描述。您还可以查看Know Your Data ,这是一个探索和理解TFDS提供的许多数据集的工具。
TFDS未与TensorFlow捆绑在一起,但如果您在Colab上运行,或者按照https://homl.info/install,则它已安装。然后,您可以导入tensorflow_dataset,通常为tfds,然后调用tfds.load()函数,该函数将下载您想要的数据(除非之前已经下载),并将数据作为数据集词典返回(通常一个用于训练,一个用于测试,但这取决于您选择的数据集)。例如,让我们下载MNIST:

python

import tensorflow_datasets as tfds

datasets = tfds.load(name="mnist")
mnist_train, mnist_test = datasets["train"], datasets["test"]

# 然后,你可以应用所需的任何转换(通常是乱序、批处理和预
# 取),并且准备好训练你的模型。以下是一个简单的示例:
for batch in mnist_train.shuffle(10_000, seed=42).batch(32).prefetch(1):
    images = batch["image"]
    labels = batch["label"]
    # [...] do something with the images and labels

load()函数可以打乱它下载的文件:只需 shuffle_files=True 。然而,这可能还不够,所以最好对训练数据进行更多的洗牌。

请注意,数据集中的每个项都是一个包含特征和标签的字典。但Keras希望每个项都是一个包含两个元素(同样是特性和标签)的元组。您可以使用map()方法转换数据集,如下所示:

python

mnist_train = mnist_train.shuffle(buffer_size=10_000, seed=42).batch(32)
mnist_train = mnist_train.map(lambda items: (items["image"], items["label"]))
mnist_train = mnist_train.prefetch(1)
# 但是通过设置as_supervised=True来使load()函数执行此操作
# 会更简单(显然这仅适用于带标签的数据集)

最后,TFDS提供了一种使用split参数拆分数据的方便方法。例如,如果要将训练集的前90%用于训练,其余10%用于验证,并将整个测试集用于测试,则可以

split=["train[:90%]", "train[90%:]", "test"]

load()函数将返回所有三个集合。以下是一个完整的示例,使用TFDS加载和拆分MNIST数据集,然后使用这些集来训练和评估一个简单的Keras模型

python

train_set, valid_set, test_set = tfds.load(
    name="mnist",
    split=["train[:90%]", "train[90%:]", "test"],
    as_supervised=True
)
train_set = train_set.shuffle(buffer_size=10_000, seed=42).batch(32).prefetch(1)
valid_set = valid_set.batch(32).cache()
test_set = test_set.batch(32).cache()
tf.random.set_seed(42)
model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(10, activation="softmax")
])
model.compile(loss="sparse_categorical_crossentropy", optimizer="nadam",
              metrics=["accuracy"])
history = model.fit(train_set, validation_data=valid_set, epochs=5)
test_loss, test_accuracy = model.evaluate(test_set)

OK!