至于为什么写这个教程,首先是为了自己学习做个记录,其次是因为Tensorflow的API写的很好,但是他的教程写的太乱了,不适合新手学习。tensorflow 1 和tensorflow 2 有相似之处但是不兼容,tensorflow 2将keras融合了。TensorFlow? 是一个采用 数据流图(data flow graphs),用于数值计算的开源软件库。图中得节点(Nodes)表示数学操作,图中的线(edges)则表示在节点间相互联系的多维数据数组,即张量(tensor)。它灵活的架构让你可以在多种平台上展开计算,例如台式计算机中的一个或多个CPU(或GPU),服务器,移动设备等等。
TensorFlow的主要优点:
TensorFlow的层次结构从低到高可以分成如下五层:硬件层,内核层,低阶API,中阶API,高阶API。

tf.constant(value, dtype=tf.float32) # 常数tf.range(start, limit=None, delta=1) # 生成一个范围内间隔为delta的 张量tf.linspace(start, stop, num) # 在一个间隔内生成均匀间隔的值tf.zeros() # 创建全0张量tf.ones() # 创建全1张量tf.zeros_like(input) # 创建和input一样大小的张量tf.fill(dims, value) # 创建shape为dim,全为value的张量tf.random.uniform([5], minval=0, maxval=10) # 均匀分布随机tf.random.normal([3, 3], mean=0.0, stddev=1.0) # 正态分布随机tf.Variable(initial_value) # 变量
tf.Variable:
tf.rank(a):求矩阵的秩
变量的设备位置
为了提高性能,TensorFlow 会尝试将张量和变量放在与其 dtype 兼容的最快设备上。这意味着如果有 GPU,那么大部分变量都会放置在 GPU 上,不过,我们可以重写变量的位置。
with tf.device('CPU:0'): # Create some tensors a = tf.Variable([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]) b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]) c = tf.matmul(a, b)print(c)
使用assign重新分配张量
a.assign([5, 6]) # a = [5, 6]a.assign_add([2, 3]) # a = a+[2,3]a.assign_sub([7, 9]) # a=a-[7,9]
维度变换
维度变换相关函数主要有 tf.reshape,tf.squeeze,tf.expand_dims,tf.transpose。
tf.reshape可以改变张量的形状,但是其本质上不会改变张量元素的存储顺序,所以,该操作实际上非常迅速,并且是可逆的。
合并分隔
和numpy类似,可以用tf.concat和tf.stack方法对多个张量进行合并,可以用tf.split方法把一个张量分割成多个张量。
tf.concat和tf.stack有略微的区别,tf.concat是连接,不会增加维度,而tf.stack是堆叠,会增加维度。
a = tf.constant([[1.0, 2.0], [3.0, 4.0]]) # (2,2)b = tf.constant([[5.0, 6.0], [7.0, 8.0]]) # (2,2)c = tf.concat([a, b], axis=0) # (4, 2)c = tf.stack([a, b], axis=0) # (2, 2, 2)d = tf.split(c, 2, axis=0) # [(1, 2, 2),(1, 2, 2)]
Tensor与Array的转换
c = np.array(b) # tensor 转 npc = b.numpy() # tensor 转 nptf.convert_to_tensor(c) # np 转 tensor
数学运算
tf.add(a,b) # 加法 a+btf.multiply(a,b) # 逐元素乘法a*btf.matmul(a,b) # 矩阵乘法a@b
类型转换
tensorflow支持的模型有:tf.float16、tf.float64、tf.int8、tf.int16、tf.int32...
a = tf.constant([2.2, 3.3, 4.4], dtype=tf.float64)b = tf.cast(a, dtype=tf.float16) # 类型转换
有三种计算图的构建方式:静态计算图,动态计算图,以及Autograph。在TensorFlow1.0时代,采用的是静态计算图,需要先使用TensorFlow的各种算子创建计算图,然后再开启一个会话Session,显式执行计算图。而在TensorFlow2.0时代,采用的是动态计算图,即每使用一个算子后,该算子会被动态加入到隐含的默认计算图中立即执行得到结果,而无需开启Session。使用动态计算图(Eager Excution)的好处是方便调试程序,它会让TensorFlow代码的表现和Python原生代码的表现一样,写起来就像写numpy一样,各种日志打印,控制流全部都是可以使用的。使用动态计算图的缺点是运行效率相对会低一些。因为使用动态图会有许多次Python进程和TensorFlow的C++进程之间的通信。而静态计算图构建完成之后几乎全部在TensorFlow内核上使用C++代码执行,效率更高。此外静态图会对计算步骤进行一定的优化,剪去和结果无关的计算步骤。
如果需要在TensorFlow2.0中使用静态图,可以使用@tf.function装饰器将普通Python函数转换成对应的TensorFlow计算图构建代码。运行该函数就相当于在TensorFlow1.0中用Session执行代码。使用tf.function构建静态图的方式叫做 Autograph。当然Autograph机制能够转换的代码并不是没有任何约束的,有一些编码规范需要遵循,否则可能会转换失败或者不符合预期。
计算图由节点(nodes)和线(edges)组成。节点表示操作符Operator,或者称之为算子,线表示计算间的依赖。实线表示有数据传递依赖,传递的数据即张量。虚线通常可以表示控制依赖,即执行先后顺序。

import tensorflow as tf# 使用autograph构建静态图@tf.functiondef strjoin(x,y): z = tf.strings.join([x,y],separator = " ") tf.print(z) return zresult = strjoin(tf.constant("hello"),tf.constant("world"))print(result)
您可以像这样测量静态图和动态图性能差异:


x = tf.random.uniform(shape=[10, 10], minval=-1, maxval=2, dtype=tf.dtypes.int32)def power(x, y): result = tf.eye(10, dtype=tf.dtypes.int32) for _ in range(y): result = tf.matmul(x, result) return resultprint("Eager execution:", timeit.timeit(lambda: power(x, 100), number=1000)) # 2.56378621799# 将python函数转换为图形power_as_graph = tf.function(power)print("Graph execution:", timeit.timeit(lambda: power_as_graph(x, 100), number=1000)) # 0.683253670
我们还可以再函数前使用装饰器 @tf.function 调用tf.function,同时也可以使用 tf.config.run_functions_eagerly(True) 关闭Function创建和运行图形的能力。
前面在介绍Autograph的编码规范时提到构建Autograph时应该避免在@tf.function修饰的函数内部定义tf.Variable。但是如果在函数外部定义tf.Variable的话,又会显得这个函数有外部变量依赖,封装不够完美。一种简单的思路是定义一个类,并将相关的tf.Variable创建放在类的初始化方法中。而将函数的逻辑放在其他方法中。


class DemoModule(tf.Module): def __init__(self, init_value=tf.constant(0.0), name=None): super(DemoModule, self).__init__(name=name) with self.name_scope: # 相当于with tf.name_scope("demo_module") self.x = tf.Variable(init_value, dtype=tf.float32, trainable=True) @tf.function def addprint(self, a): with self.name_scope: self.x.assign_add(a) tf.print(self.x) return self.x
自动微分用于训练神经网络的反向传播非常有用,TensorFlow 会记住在前向传递过程中哪些运算以何种顺序发生。随后,在后向传递期间,以相反的顺序遍历此运算列表来计算梯度。
Tensorflow一般使用tf.GradientTape来记录正向运算过程,然后反向传播自动计算梯度值。
$$f(x)=ax^2+bx+c$$
x = tf.Variable(0.0,name = "x",dtype = tf.float32)a = tf.constant(1.0)b = tf.constant(-2.0)c = tf.constant(1.0)with tf.GradientTape() as tape: y = a*tf.pow(x,2) + b*x + c dy_dx = tape.gradient(y,x)print(dy_dx) # tf.Tensor(-2.0, shape=(), dtype=float32)
对常量张量也可以求导,只不过需要增加watch


with tf.GradientTape() as tape: tape.watch([a,b,c]) y = a*tf.pow(x,2) + b*x + c dy_dx,dy_da,dy_db,dy_dc = tape.gradient(y,[x,a,b,c])print(dy_da) # tf.Tensor(0.0, shape=(), dtype=float32)print(dy_dc) # tf.Tensor(1.0, shape=(), dtype=float32)
可以求二阶导数


with tf.GradientTape() as tape2: with tf.GradientTape() as tape1: y = a*tf.pow(x,2) + b*x + c dy_dx = tape1.gradient(y,x) dy2_dx2 = tape2.gradient(dy_dx,x)print(dy2_dx2) # tf.Tensor(2.0, shape=(), dtype=float32)
利用梯度和优化器求最小值


# 求f(x) = a*x**2 + b*x + c的最小值# 使用optimizer.apply_gradientsx = tf.Variable(0.0,name = "x",dtype = tf.float32)a = tf.constant(1.0)b = tf.constant(-2.0)c = tf.constant(1.0)optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)for _ in range(1000): with tf.GradientTape() as tape: y = a*tf.pow(x,2) + b*x + c dy_dx = tape.gradient(y,x) # 计算梯度 optimizer.apply_gradients(grads_and_vars=[(dy_dx,x)]) # 根据梯度更新变量 tf.print("y =",y,"; x =",x)
如果不想被计算梯度:
with tf.GradientTape(watch_accessed_variables=False) as tape: pass
使用TensorFlow实现神经网络模型的一般流程包括:
tensorflow支持 Numpy 数组、Pandas DataFrame、Python 生成器、csv文件、文本文件、文件路径、TFrecords文件等方式构建数据管道。如果您的数据很小并且适合内存,我们建议您使用tf.data.Dataset.from_tensor_slices()从Numpy array构建数据管道
官方推荐使用 tf.data.Dataset.from_tensors() 或 tf.data.Dataset.from_tensor_slices() 创建数据集,Dataset支持一类特殊的操作Trainformation(打乱、生成epoch...等操作)
features = np.arange(0, 100, dtype=np.int32) # # (100,)labels = np.zeros(100, dtype=np.int32) # (100,)data = tf.data.Dataset.from_tensor_slices((features, labels)) # 创建数据集data = data.repeat() # 无限期地补充数据data = data.shuffle(buffer_size=100) # 打乱数据data = data.batch(batch_size=4) # 批量数据data = data.prefetch(buffer_size=1) # 预取批处理(预加载批处理,消耗更快)for batch_x, batch_y in data.take(5): print(batch_x.shape, batch_y.shape) # (4,) (4,) break
注意:如果你打算多次调用,你可以使用迭代器的方式:


ite_data = iter(data)for i in range(5):batch_x, batch_y = next(ite_data)print(batch_x, batch_y)for i in range(5):batch_x, batch_y = next(ite_data)print(batch_x, batch_y)
训练深度学习模型常常会非常耗时。模型训练的耗时主要来自于两个部分,一部分来自数据准备,另一部分来自参数迭代。参数迭代过程的耗时通常依赖于GPU来提升。而数据准备过程的耗时则可以通过构建高效的数据管道进行提升。
以下是一些构建高效数据管道的建议。
def generate_features(): # 函数生成一个随机字符串 def random_string(length): return ''.join(random.choice(string.ascii_letters) for m in range(length)) # 返回一个随机字符串、一个随机向量和一个随机整数 yield random_string(4), np.random.uniform(size=4), random.randint(0, 10)data = tf.data.Dataset.from_generator(generate_features, output_types=(tf.string, tf.float32, tf.int32))data = data.repeat() # 无限期地补充数据data = data.shuffle(buffer_size=100) # 打乱数据data = data.batch(batch_size=4) # 批量数据(将记录聚合在一起)data = data.prefetch(buffer_size=1) # 预取批量(预加载批量以便更快的消耗)# Display data.for batch_str, batch_vector, batch_int in data.take(5): # (4,) (4, 4) (4,) print(batch_str.shape, batch_vector.shape, batch_int.shape)
特别是,keras.utils.Sequence该类提供了一个简单的接口来构建 Python 数据生成器,该生成器可以感知多处理并且可以洗牌。
Sequence必须实现两种方法:
import tensorflow as tffrom keras.utils.data_utils import Sequenceclass SequenceDataset(Sequence): def __init__(self, batch_size): self.input_data = tf.random.normal((640, 8192, 1)) self.label_data = tf.random.normal((640, 8192, 1)) self.batch_size = batch_size def __len__(self): return int(tf.math.ceil(len(self.input_data) / float(self.batch_size))) # 每次输出一个batch def __getitem__(self, idx): batch_x = self.input_data[idx * self.batch_size:(idx + 1) * self.batch_size] batch_y = self.label_data[idx * self.batch_size:(idx + 1) * self.batch_size] return batch_x, batch_ysequence = SequenceDataset(batch_size=64)for batch_idx, (x, y) in enumerate(sequence): print(batch_idx, x.shape, y.shape) # tf.float32 # 0 (64, 8192, 1) (64, 8192, 1) break
深度学习模型一般由各种模型层组合而成,如果这些内置模型层不能够满足需求,我们也可以通过编写tf.keras.Lambda匿名模型层或继承tf.keras.layers.Layer基类构建自定义的模型层。其中tf.keras.Lambda匿名模型层只适用于构造没有学习参数的模型层。
搭建模型有以下3种方式构建模型:
model = keras.Sequential([ layers.Dense(2, activation="relu", name="layer1"), layers.Dense(3, activation="relu", name="layer2"), layers.Dense(4, name="layer3"),])
还可以通过add方法创建顺序模型
model = keras.Sequential()model.add(layers.Dense(2, activation="relu"))model.add(layers.Dense(3, activation="relu"))model.add(layers.Dense(4))
因为模型不知道输入shape,所以起初模型没有权重,因此我们需要告知模型输入shape
# 在第一层添加Inputmodel.add(keras.Input(shape=(4,)))model.add(layers.Dense(2, activation="relu"))# 或者在第一层添加input_shapemodel.add(layers.Dense(2, activation="relu", input_shape=(4,)))
顺序模型可以配合add 和 model.summary() 在模型的任何位置查看该层的输入输出。
函数式API搭建模型比Sequential更加灵活,可以处理具有非线性拓扑、共享层甚至多个输入或输出的模型。
inputs = keras.Input(shape=(784,))x = layers.Dense(64, activation="relu")(inputs)x = layers.Dense(64, activation="relu")(x)outputs = layers.Dense(10)(x)model = keras.Model(inputs=inputs, outputs=outputs, name="mnist_model")model.summary() # 查看模型摘要
还可以将模型绘制为图形
keras.utils.plot_model(model, "my_first_model_with_shape_info.png", show_shapes=True)

补充:函数式模型是可以嵌套的
在 TensorFlow 中,模型类的继承关系为:
tf.keras.Model > tf.keras.layers.Layer > tf.Module。
通常使用Layer类来定义内部计算块,并使用Model类定义外部模型(训练的对象)。
继承tf.Module栗子:


class SequentialModule(tf.Module): def __init__(self, name=None): super().__init__(name=name) self.dense_1 = Dense(in_features=3, out_features=3) self.dense_2 = Dense(in_features=3, out_features=2) def __call__(self, x): x = self.dense_1(x) return self.dense_2(x)my_model = SequentialModule(name="the_model")
继承tf.keras.layers.Layer栗子:


class ResBlock(layers.Layer): def __init__(self, kernel_size, **kwargs): super(ResBlock, self).__init__(**kwargs) self.kernel_size = kernel_size def build(self, input_shape): self.conv1 = layers.Conv1D(filters=64, kernel_size=self.kernel_size, activation="relu", padding="same") self.conv2 = layers.Conv1D(filters=32, kernel_size=self.kernel_size, activation="relu", padding="same") self.conv3 = layers.Conv1D(filters=input_shape[-1], kernel_size=self.kernel_size, activation="relu", padding="same") self.maxpool = layers.MaxPool1D(2) super(ResBlock, self).build(input_shape) # 相当于设置self.built = True def call(self, inputs): x = self.conv1(inputs) x = self.conv2(x) x = self.conv3(x) x = layers.Add()([inputs, x]) x = self.maxpool(x) return x # 如果要让自定义的Layer通过Functional API 组合成模型时可以序列化,需要自定义get_config方法。 def get_config(self): config = super(ResBlock, self).get_config() config.update({'kernel_size': self.kernel_size}) return configresblock = ResBlock(kernel_size=3)resblock.build(input_shape=(None, 200, 7))resblock.compute_output_shape(input_shape=(None, 200, 7))
继承tf.Module栗子:


class ImdbModel(models.Model): def __init__(self): super(ImdbModel, self).__init__() def build(self, input_shape): self.embedding = layers.Embedding(MAX_WORDS, 7) self.block1 = ResBlock(7) self.block2 = ResBlock(5) self.dense = layers.Dense(1, activation="sigmoid") super(ImdbModel, self).build(input_shape) def call(self, x): x = self.embedding(x) x = self.block1(x) x = self.block2(x) x = layers.Flatten()(x) x = self.dense(x) return (x)model = ImdbModel()model.build(input_shape=(None, 200))model.compile(optimizer='Nadam', loss='binary_crossentropy', metrics=['accuracy', "AUC"])
Model类具有Layer类相同的API,但有以下区别:
Layer 类对应于“层”,Model 类对应于“模型”,如果您想知道“我应该使用Layer类还是Model类?”,请问自己:我需要调用fit()它吗?我需要save() 吗?如果是这样,请与Model。如果不是,请使用Layer。
把Layer 类和Model 类用在一起,吃个栗子:


class Sampling(layers.Layer): """使用(z_mean, z_log_var)对z进行采样,z是对一个数字进行编码的向量""" def call(self, inputs): z_mean, z_log_var = inputs batch = tf.shape(z_mean)[0] dim = tf.shape(z_mean)[1] epsilon = tf.keras.backend.random_normal(shape=(batch, dim)) return z_mean + tf.exp(0.5 * z_log_var) * epsilonclass Encoder(layers.Layer): """将MNIST数字映射为一个三元组(z_mean, z_log_var, z)""" def __init__(self, latent_dim=32, intermediate_dim=64, name="encoder", **kwargs): super(Encoder, self).__init__(name=name, **kwargs) self.dense_proj = layers.Dense(intermediate_dim, activation="relu") self.dense_mean = layers.Dense(latent_dim) self.dense_log_var = layers.Dense(latent_dim) self.sampling = Sampling() def call(self, inputs): x = self.dense_proj(inputs) z_mean = self.dense_mean(x) z_log_var = self.dense_log_var(x) z = self.sampling((z_mean, z_log_var)) return z_mean, z_log_var, zclass Decoder(layers.Layer): """将已编码的数字向量z转换回可读的数字""" def __init__(self, original_dim, intermediate_dim=64, name="decoder", **kwargs): super(Decoder, self).__init__(name=name, **kwargs) self.dense_proj = layers.Dense(intermediate_dim, activation="relu") self.dense_output = layers.Dense(original_dim, activation="sigmoid") def call(self, inputs): x = self.dense_proj(inputs) return self.dense_output(x)class VariationalAutoEncoder(keras.Model): """将编码器和解码器组合成端到端的训练模型。""" def __init__(self, original_dim, intermediate_dim=64, latent_dim=32, name="autoencoder", **kwargs): super(VariationalAutoEncoder, self).__init__(name=name, **kwargs) self.original_dim = original_dim self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim) self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim) def call(self, inputs): z_mean, z_log_var, z = self.encoder(inputs) reconstructed = self.decoder(z) # Add KL divergence regularization loss. kl_loss = -0.5 * tf.reduce_mean(z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1) self.add_loss(kl_loss) return reconstructed
自定义循环训练模型


original_dim = 784vae = VariationalAutoEncoder(original_dim, 64, 32)optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)mse_loss_fn = tf.keras.losses.MeanSquaredError()loss_metric = tf.keras.metrics.Mean()(x_train, _), _ = tf.keras.datasets.mnist.load_data()x_train = x_train.reshape(60000, 784).astype("float32") / 255train_dataset = tf.data.Dataset.from_tensor_slices(x_train)train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)epochs = 2# Iterate over epochs.for epoch in range(epochs): print("Start of epoch %d" % (epoch,)) # Iterate over the batches of the dataset. for step, x_batch_train in enumerate(train_dataset): with tf.GradientTape() as tape: reconstructed = vae(x_batch_train) # Compute reconstruction loss loss = mse_loss_fn(x_batch_train, reconstructed) loss += sum(vae.losses) # Add KLD regularization loss grads = tape.gradient(loss, vae.trainable_weights) optimizer.apply_gradients(zip(grads, vae.trainable_weights)) loss_metric(loss) if step % 100 == 0: print("step %d: mean loss = %.4f" % (step, loss_metric.result()))
内置循环方法


vae = VariationalAutoEncoder(784, 64, 32)optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())vae.fit(x_train, x_train, epochs=2, batch_size=64)
函数式API和自定义Model类混搭训练


original_dim = 784intermediate_dim = 64latent_dim = 32# Define encoder model.original_inputs = tf.keras.Input(shape=(original_dim,), name="encoder_input")x = layers.Dense(intermediate_dim, activation="relu")(original_inputs)z_mean = layers.Dense(latent_dim, name="z_mean")(x)z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)z = Sampling()((z_mean, z_log_var))encoder = tf.keras.Model(inputs=original_inputs, outputs=z, name="encoder")# Define decoder model.latent_inputs = tf.keras.Input(shape=(latent_dim,), name="z_sampling")x = layers.Dense(intermediate_dim, activation="relu")(latent_inputs)outputs = layers.Dense(original_dim, activation="sigmoid")(x)decoder = tf.keras.Model(inputs=latent_inputs, outputs=outputs, name="decoder")# Define VAE model.outputs = decoder(z)vae = tf.keras.Model(inputs=original_inputs, outputs=outputs, name="vae")# Add KL divergence regularization loss.kl_loss = -0.5 * tf.reduce_mean(z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)vae.add_loss(kl_loss)# Train.optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())vae.fit(x_train, x_train, epochs=3, batch_size=64)函数式API训练模型
补充知识:自定义层
如果自定义模型层没有需要被训练的参数,一般推荐使用Lamda层实现。
mypower = layers.Lambda(lambda x:tf.math.pow(x,2))mypower(tf.range(5))
如果自定义模型层有需要被训练的参数,则可以通过继承Layer基类实现。Layer的子类化一般需要重新实现初始化方法,Build方法和Call方法。如果built = False,调用__call__时会先调用build方法, 再调用call方法。
一般来说,监督学习的目标函数由损失函数和正则化项组成(Objective = Loss + Regularization)。对于keras模型,目标函数中的正则化项一般在各层中指定,例如使用Dense的 kernel_regularizer 和 bias_regularizer等参数指定权重使用l1或者l2正则化项,此外还可以用kernel_constraint 和 bias_constraint等参数约束权重的取值范围,这也是一种正则化手段。
损失函数在模型编译时候指定。
常见的Loss可以参看Tensorflow的官网:tf.keras.losses
自定义损失函数接收两个张量y_true,y_pred作为输入参数,并输出一个标量作为损失函数值。
def focal_loss(gamma=2., alpha=.25): def focal_loss_fixed(y_true, y_pred): pt_1 = tf.where(tf.equal(y_true, 1), y_pred, tf.ones_like(y_pred)) pt_0 = tf.where(tf.equal(y_true, 0), y_pred, tf.zeros_like(y_pred)) loss = -tf.sum(alpha * tf.pow(1. - pt_1, gamma) * tf.log(1e-07+pt_1)) \ -tf.sum((1-alpha) * tf.pow( pt_0, gamma) * tf.log(1. - pt_0 + 1e-07)) return loss return focal_loss_fixed
也可以对tf.keras.losses.Loss进行子类化,重写call方法实现损失的计算逻辑,从而得到损失函数的类的实现。
class FocalLoss(losses.Loss): def __init__(self,gamma=2.0,alpha=0.25): self.gamma = gamma self.alpha = alpha def call(self,y_true,y_pred): pt_1 = tf.where(tf.equal(y_true, 1), y_pred, tf.ones_like(y_pred)) pt_0 = tf.where(tf.equal(y_true, 0), y_pred, tf.zeros_like(y_pred)) loss = -tf.sum(self.alpha * tf.pow(1. - pt_1, self.gamma) * tf.log(1e-07+pt_1)) \ -tf.sum((1-self.alpha) * tf.pow( pt_0, self.gamma) * tf.log(1. - pt_0 + 1e-07)) return loss
人们通常会通过度量函数来从另一个方面 评估模型的好坏,度量函数不要求连续可导,
Tensorflow内置的评估指标可以常见:tf.keras.metrics
如果编写函数形式的评估指标,则只能取epoch中各个batch计算的评估指标结果,这个结果通常会偏离拿整个epoch数据一次计算的结果。


@tf.functiondef ks(y_true,y_pred): y_true = tf.reshape(y_true,(-1,)) y_pred = tf.reshape(y_pred,(-1,)) length = tf.shape(y_true)[0] t = tf.math.top_k(y_pred,k = length,sorted = False) y_pred_sorted = tf.gather(y_pred,t.indices) y_true_sorted = tf.gather(y_true,t.indices) cum_positive_ratio = tf.truediv( tf.cumsum(y_true_sorted),tf.reduce_sum(y_true_sorted)) cum_negative_ratio = tf.truediv( tf.cumsum(1 - y_true_sorted),tf.reduce_sum(1 - y_true_sorted)) ks_value = tf.reduce_max(tf.abs(cum_positive_ratio - cum_negative_ratio)) return ks_value
由于训练的过程通常是分批次训练的,而评估指标要跑完一个epoch才能够得到整体的指标结果。因此,类形式的评估指标更为常见。即需要编写update_state方法在每个batch后更新相关中间变量的状态,编写result方法输出最终指标结果。


class KS(metrics.Metric): def __init__(self, name="ks", **kwargs): super(KS, self).__init__(name=name, **kwargs) self.true_positives = self.add_weight( name="tp", shape=(101,), initializer="zeros") self.false_positives = self.add_weight( name="fp", shape=(101,), initializer="zeros") @tf.function def update_state(self, y_true, y_pred): y_true = tf.cast(tf.reshape(y_true, (-1,)), tf.bool) y_pred = tf.cast(100 * tf.reshape(y_pred, (-1,)), tf.int32) for i in tf.range(0, tf.shape(y_true)[0]): if y_true[i]: self.true_positives[y_pred[i]].assign( self.true_positives[y_pred[i]] + 1.0) else: self.false_positives[y_pred[i]].assign( self.false_positives[y_pred[i]] + 1.0) return (self.true_positives, self.false_positives) @tf.function def result(self): cum_positive_ratio = tf.truediv( tf.cumsum(self.true_positives), tf.reduce_sum(self.true_positives)) cum_negative_ratio = tf.truediv( tf.cumsum(self.false_positives), tf.reduce_sum(self.false_positives)) ks_value = tf.reduce_max(tf.abs(cum_positive_ratio - cum_negative_ratio)) return ks_valuey_true = ...y_pred = ...myks = KS()myks.update_state(y_true, y_pred)tf.print(myks.result())
机器学习界有一群炼丹师,他们每天的日常是:拿来药材(数据),架起八卦炉(模型),点着六味真火(优化算法),就摇着蒲扇等着丹药出炉了。不过,当过厨子的都知道,同样的食材,同样的菜谱,但火候不一样了,这出来的口味可是千差万别。火小了夹生,火大了易糊,火不匀则半生半糊。机器学习也是一样,模型优化算法的选择直接关系到最终模型的性能。有时候效果不好,未必是特征的问题或者模型设计的问题,很可能就是优化算法的问题。
深度学习优化算法大概经历了 SGD -> SGDM -> NAG ->Adagrad -> Adadelta(RMSprop) -> Adam -> Nadam 这样的发展历程。model.compile(optimizer=optimizers.SGD(learning_rate=0.01), loss=loss)
对于一般新手炼丹师,优化器直接使用Adam,并使用其默认参数就OK了。一些爱写论文的炼丹师由于追求评估指标效果,可能会偏爱前期使用Adam优化器快速下降,后期使用SGD并精调优化器参数得到更好的结果。此外目前也有一些前沿的优化算法,据称效果比Adam更好,例如LazyAdam, Look-ahead, RAdam, Ranger等。
初始化优化器时会创建一个变量optimier.iterations用于记录迭代的次数。因此优化器和tf.Variable一样,一般在@tf.function外创建。
优化器主要使用apply_gradients方法传入变量和对应梯度从而来对给定变量进行迭代,
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)with tf.GradientTape() as tape: ...grads = tape.gradient(loss, model.trainable_weights) # 根据损失 求梯度optimizer.apply_gradients(zip(grads, model.trainable_weights)) # 根据梯度 优化模型
或者直接使用minimize方法对目标函数进行迭代优化。
@tf.functiondef train(epoch=1000): for _ in tf.range(epoch): optimizer.minimize(loss, model.trainable_weights) tf.print("epoch = ", optimizer.iterations) return loss
当然,更常见的使用是在编译时将优化器传入model.fit
model.compile(optimizer=optimizers.SGD(learning_rate=0.01), loss=loss)
tf.keras的回调函数实际上是一个类,一般是在model.fit时作为参数指定,一般收集一些日志信息,改变学习率等超参数,提前终止训练过程等等。
大部分时候,keras.callbacks子模块中定义的回调函数类已经足够使用了,如果有特定的需要,我们也可以通过对keras.callbacks.Callbacks实施子类化构造自定义的回调函数。
Tensorboard有助于追踪模型训练过程的Scalars、Graphs、Distributions等等
当使用Model.fit() 函数进行训练时, 添加 tf.keras.callback.TensorBoard 回调函数可确保创建和存储日志
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir="./event_file")model.fit(... callbacks=[tensorboard_callback])
summary_writer = tf.summary.create_file_writer("./event_file")with summary_writer.as_default(): tf.summary.scalar('train/loss', train_loss, step=epoch) tf.summary.scalar('train/accuracy', train_accuracy, step=epoch) tf.summary.scalar('val/loss', val_loss, step=epoch) tf.summary.scalar('val/accuracy', val_accuracy, step=epoch)
启动Tensorboard,在当前文件夹中,cmd运行:
tensorboard --logdir "./"
训练模型通常有3种方法,
model.fit( x=None, y=None, batch_size=None, epochs=1, verbose='auto', callbacks=None, validation_split=0.0, validation_data=None, shuffle=True, initial_epoch=0, steps_per_epoch=None)
参数:
x,它可以是 Numpy 数组或 TensorFlow 张量。它应该是一致的x(你不能有 Numpy 输入和张量目标,或者相反)。如果x是数据集、生成器或keras.utils.Sequence实例,y则不应指定(因为目标将从 获取x)。tf.keras.callbacks。返回:history,调用 history.history 可以查看训练期间损失值和度量值的记录
该内置方法相比较fit方法更加灵活,可以不通过回调函数而直接在batch层次上更加精细地控制训练的过程。
for epoch in tf.range(1, epoches + 1): for x, y in ds_train: train_result = model.train_on_batch(x, y) for x, y in ds_valid: valid_result = model.test_on_batch(x, y, reset_metrics=False)
自定义训练循环无需编译模型,直接利用优化器根据损失函数反向传播迭代参数,拥有最高的灵活性。
训练循环包括按顺序重复执行三个任务:
optimizer = keras.optimizers.SGD(learning_rate=1e-3) # 实例化一个优化器loss_fn = keras.losses.BinaryCrossentropy() # 实例化损失函数train_loss = keras.metrics.Mean(name='train_loss')valid_loss = keras.metrics.Mean(name='valid_loss')train_metric = keras.metrics.BinaryAccuracy(name='train_accuracy')valid_metric = keras.metrics.BinaryAccuracy(name='valid_accuracy')@tf.functiondef train_step(features, labels): with tf.GradientTape() as tape: logits = model(features, training=True) loss_value = loss_fn(labels, logits) # loss_value += sum(model.losses) # 添加额外的损失 grads = tape.gradient(loss_value, model.trainable_weights) optimizer.apply_gradients(zip(grads, model.trainable_weights)) train_loss.update_state(loss_value) train_metric.update_state(labels, logits)@tf.functiondef valid_step(features, labels): val_logits = model(features, training=False) loss_value = loss_fn(labels, val_logits) valid_loss.update_state(loss_value) valid_metric.update_state(labels, val_logits)epochs = 2for epoch in range(epochs): start_time = time.time() for step, (x_batch_train, y_batch_train) in enumerate(train_dataset): loss_value = train_step(x_batch_train, y_batch_train) # 在每个epoch结束时运行验证循环 for x_batch_val, y_batch_val in val_dataset: valid_step(x_batch_val, y_batch_val) if epoch % 5 == 0: print('Epoch={},Loss:{},Accuracy:{},Valid Loss:{},Valid Accuracy:{}'.format(epoch, train_loss.result(), train_metric.result(), valid_loss.result(), valid_metric.result())) train_loss.reset_states() valid_loss.reset_states() train_metric.reset_states() valid_metric.reset_states() print("运行时间: %.2fs" % (time.time() - start_time))
通过自定义训练循环训练的模型没有经过编译,无法直接使用model.evaluate(ds_valid)方法
model.evaluate(x = x_test,y = y_test)
可以使用以下方法:
model.predict(ds_test)
model(x_test)
model.call(x_test)
model.predict_on_batch(x_test)
推荐优先使用model.predict(ds_test)方法,既可以对Dataset,也可以对Tensor使用。
可以使用Keras方式保存模型,也可以使用TensorFlow原生方式保存。前者仅仅适合使用Python环境恢复模型,后者则可以跨平台进行模型部署。推荐使用后一种方式进行保存。
保存的模型包括:
# 保存模型model.save('path/to/location') del model #删除现有模型# 加载模型model = models.load_model('path/to/location')
我们可以使用两种格式将整个模型保存到磁盘:
可以通过以下方式保存 H5 格式:
我们还可以只保存模型结构
json_str = model.to_json() # 保存模型结构model_json = models.model_from_json(json_str) # 恢复模型结构
或者只保存模型权重
# 保存模型权重model.save_weights(...)# 恢复模型结构model_json = models.model_from_json(json_str)model_json.compile(...)# 加载权重model_json.load_weights(...)
保存模型结构与模型参数到文件,该方式保存的模型具有跨平台性便于部署
model.save(..., save_format="tf") # 保存模型model_loaded = tf.keras.models.load_model(...) # 加载模型
也可以仅保存权重
model.save_weights(...,save_format = "tf")


import tensorflow as tffrom tensorflow import kerasfrom tensorflow.keras import layersimport numpy as npimport time# 超参数lr = 1e-3 # 学习率batch_size = 64epochs = 2# 数据准备(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()x_train = np.reshape(x_train, (-1, 784))x_test = np.reshape(x_test, (-1, 784))# 保留1万个样品用于验证x_val = x_train[-10000:]y_val = y_train[-10000:]x_train = x_train[:-10000]y_train = y_train[:-10000]# 准备训练数据集train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)# 准备验证数据集val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))val_dataset = val_dataset.batch(batch_size)# 搭建模型inputs = keras.Input(shape=(784,), name="digits")x1 = layers.Dense(64, activation="relu")(inputs)x2 = layers.Dense(64, activation="relu")(x1)outputs = layers.Dense(10, name="predictions")(x2)model = keras.Model(inputs=inputs, outputs=outputs)optimizer = keras.optimizers.SGD(learning_rate=lr) # 实例化一个优化器loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True) # 实例化损失函数train_metric = keras.metrics.BinaryAccuracy(name='train_accuracy')valid_metric = keras.metrics.BinaryAccuracy(name='valid_accuracy')@tf.functiondef train_step(x, y): with tf.GradientTape() as tape: logits = model(x, training=True) loss_value = loss_fn(y, logits) # loss_value += sum(model.losses) # 添加额外的损失 grads = tape.gradient(loss_value, model.trainable_weights) optimizer.apply_gradients(zip(grads, model.trainable_weights)) train_metric.update_state(y, logits) return loss_value@tf.functiondef test_step(x, y): val_logits = model(x, training=False) valid_metric.update_state(y, val_logits)for epoch in range(epochs): start_time = time.time() for step, (x_batch_train, y_batch_train) in enumerate(train_dataset): loss_value = train_step(x_batch_train, y_batch_train) if step % 200 == 0: print("训练损失( %d: %.4f" % (step, float(loss_value))) train_acc = train_metric.result() print("训练精度: %.4f" % (float(train_acc),)) train_metric.reset_states() # 在每个epoch结束时重置训练指标 # 在每个epoch结束时运行验证循环 for x_batch_val, y_batch_val in val_dataset: test_step(x_batch_val, y_batch_val) val_acc = valid_metric.result() valid_metric.reset_states() print("验证精度: %.4f" % (float(val_acc),)) print("运行时间: %.2fs" % (time.time() - start_time))
深度学习的训练过程常常非常耗时,一个模型训练几个小时是家常便饭,训练几天也是常有的事情,有时候甚至要训练几十天。
训练过程的耗时主要来自于两个部分,一部分来自数据准备,另一部分来自参数更新。
数据准备过程可以使用更多进程处理数据来缩减时间。参数更新时间可以应用GPU或者Google的TPU来进行加速。
当存在可用的GPU时,如果不特意指定device,tensorflow会自动优先选择使用GPU来创建张量和执行张量计算。但如果是在公司或者学校实验室的服务器环境,存在多个GPU和多个使用者时,为了不让单个同学的任务占用全部GPU资源导致其他同学无法使用(tensorflow默认获取全部GPU的全部内存资源权限,但实际上只使用一个GPU的部分资源),我们通常会在开头增加以下几行代码以控制每个任务使用的GPU编号和显存大小,以便其他同学也能够同时训练模型。
gpus = tf.config.list_physical_devices("GPU")tf.print(gpus)# [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'),# PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU'),# PhysicalDevice(name='/physical_device:GPU:2', device_type='GPU'),# PhysicalDevice(name='/physical_device:GPU:3', device_type='GPU')]if gpus: gpu0 = gpus[0] # 如果有多个GPU,仅使用第0个GPU tf.config.experimental.set_memory_growth(gpu0, True) # 设置GPU显存用量按需使用 # 或者也可以设置GPU显存为固定使用量(例如:4G) # tf.config.experimental.set_virtual_device_configuration(gpu0, # [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=4096)]) tf.config.set_visible_devices([gpu0], "GPU")
TensorFlow 在 tf.distribute.MirroredStrategy 中为我们提供了单机多卡训练策略,使用这种策略时,我们只需实例化一个 MirroredStrategy 策略,并将模型构建的代码放入 strategy.scope() 的上下文环境中:
strategy = tf.distribute.MirroredStrategy()with strategy.scope(): # 模型构建代码
可以在参数中指定设备,如:
# 指定只使用第 0、1 号 GPU 参与分布式策略。strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
MirroredStrategy 的步骤如下:
默认情况下,TensorFlow 中的 MirroredStrategy 策略使用 NVIDIA NCCL 进行 All-reduce 操作。
tf.distribute.Strategy被集成到tf.keras,tf.keras是用于构建和训练模型的高级 API。您可以使用 Model.fit 来无缝分布式训练模型
TensorFlow 分布策略支持所有类型的 Keras 模型——Sequential、Functional和subclassed。以下是您需要在代码中更改的内容:
num_epochs = 5batch_size_per_replica = 64 # 每个显卡上的batch数learning_rate = 0.001strategy = tf.distribute.MirroredStrategy()print('Number of devices: %d' % strategy.num_replicas_in_sync) # 输出设备数量batch_size = batch_size_per_replica * strategy.num_replicas_in_sync # 总batch_sizedataset = ...with strategy.scope(): model = tf.keras.applications.MobileNetV2(weights=None, classes=2) model.compile( optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate), loss=tf.keras.losses.sparse_categorical_crossentropy, metrics=[tf.keras.metrics.sparse_categorical_accuracy] )model.fit(dataset, epochs=num_epochs)
分布式训练汇总


import osimport tensorflow as tfimport tensorflow_datasets as tfdsstrategy = tf.distribute.MirroredStrategy()print('设备数量: {}'.format(strategy.num_replicas_in_sync))epochs = 12batch_size_per_replica = 64batch_size = batch_size_per_replica * strategy.num_replicas_in_sync# 数据集def scale(image, label): image = tf.cast(image, tf.float32) image /= 255 return image, labeldatasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)mnist_train, mnist_test = datasets['train'], datasets['test']train_dataset = mnist_train.map(scale).cache().shuffle(10000).batch(batch_size)eval_dataset = mnist_test.map(scale).batch(batch_size)with strategy.scope(): model = tf.keras.Sequential([ tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)), tf.keras.layers.MaxPooling2D(), tf.keras.layers.Flatten(), tf.keras.layers.Dense(64, activation='relu'), tf.keras.layers.Dense(10) ]) model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), optimizer=tf.keras.optimizers.Adam(), metrics=['accuracy'])checkpoint_dir = './training_checkpoints' # 定义用于存储检查点的检查点目录checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}") # 定义检查点文件的名称# 定义一个函数来衰减学习速率。def decay(epoch): if epoch < 3: return 1e-3 elif epoch >= 3 and epoch < 7: return 1e-4 else: return 1e-5# 定义一个回调函数,用于在每个epoch的末尾打印学习速率class PrintLR(tf.keras.callbacks.Callback): def on_epoch_end(self, epoch, logs=None): print('\nepoch的 {} 学习率是 {}'.format(epoch + 1, model.optimizer.lr.numpy()))# 把所有的回调放在一起callbacks = [tf.keras.callbacks.TensorBoard(log_dir='./logs'), tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix, save_weights_only=True), tf.keras.callbacks.LearningRateScheduler(decay), PrintLR()]# 训练和评估model.fit(train_dataset, epochs=epochs, callbacks=callbacks)
保存为SavedModel格式模型,您可以使用或不使用Strategy.scope.


path = 'saved_model/'model.save(path, save_format='tf')# 加载模型,不Strategy.scopeunreplicated_model = tf.keras.models.load_model(path)unreplicated_model.compile( loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), optimizer=tf.keras.optimizers.Adam(), metrics=['accuracy'])eval_loss, eval_acc = unreplicated_model.evaluate(eval_dataset)print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))# 加载模型Strategy.scopewith strategy.scope(): replicated_model = tf.keras.models.load_model(path) replicated_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), optimizer=tf.keras.optimizers.Adam(), metrics=['accuracy']) eval_loss, eval_acc = replicated_model.evaluate(eval_dataset) print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
1、在mirrored_strategy.scope() 内创建模型和优化器
with mirrored_strategy.scope(): model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))]) optimizer = tf.keras.optimizers.SGD()
2、调用 tf.distribute.Strategy.experimental_distribute_dataset 创建分布式数据集
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)3、使用 tf.nn.compute_average_loss 计算损失。tf.nn.compute_average_loss对每个样本损失求和并将总和除以global_batch_size。
def compute_loss(labels, predictions): per_example_loss = loss_object(labels, predictions) return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)
4、将train_step放入 tf.distribute.Strategy.run 中,并传入之前创建的数据集
5、使用 tf.distribute.Strategy.reduce 来聚合 tf.distribute.Strategy.run。tf.distribute.Strategy.run返回每个GPU结果。您还可以tf.distribute.Strategy.experimental_local_results获取结果值列表。
def train_step(inputs): features, labels = inputs with tf.GradientTape() as tape: predictions = model(features, training=True) loss = compute_loss(labels, predictions) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) return loss@tf.functiondef distributed_train_step(dist_inputs): per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,)) return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
6、迭代dist_dataset并循环运行训练:
for dist_inputs in dist_dataset: print(distributed_train_step(dist_inputs))
分布式训练汇总


import tensorflow as tffrom tensorflow import kerasfrom tensorflow.keras import layersimport numpy as npimport osstrategy = tf.distribute.MirroredStrategy() # 实例化MirroredStrategyprint('设备数量: {}'.format(strategy.num_replicas_in_sync))epochs = 10batch_size_per_replica = 64 # 每个GPU上得batch数global_batch_size = batch_size_per_replica * strategy.num_replicas_in_sync # 总batch数# 数据集fashion_mnist = tf.keras.datasets.fashion_mnist(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()train_images = train_images[..., None]test_images = test_images[..., None]# 获取[0,1]范围内的图像train_images = train_images / np.float32(255)test_images = test_images / np.float32(255)buffer_size = len(train_images)train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(buffer_size).batch( global_batch_size)test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(global_batch_size)train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset) # 分布式数据集test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset) # 分布式数据集# 创建模型def create_model(): model = tf.keras.Sequential([ tf.keras.layers.Conv2D(32, 3, activation='relu'), tf.keras.layers.MaxPooling2D(), tf.keras.layers.Conv2D(64, 3, activation='relu'), tf.keras.layers.MaxPooling2D(), tf.keras.layers.Flatten(), tf.keras.layers.Dense(64, activation='relu'), tf.keras.layers.Dense(10) ]) return model# 创建一个检查点目录来存储检查点checkpoint_dir = './training_checkpoints'checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")with strategy.scope(): # 创建训练损失 # 将reduction设置为“none”,这样我们可以在之后进行reduction,并除以全局batch size loss_object = tf.keras.losses.SparseCategoricalCrossentropy( from_logits=True, reduction=tf.keras.losses.Reduction.NONE) def compute_loss(labels, predictions): per_example_loss = loss_object(labels, predictions) return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size) test_loss = tf.keras.metrics.Mean(name='test_loss') # 创建测试损失 train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy') # 训练精度 test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy') # 测试精度 # 模型、优化器和检查点必须创建在 'strategy.scope' 下。 model = create_model() optimizer = tf.keras.optimizers.Adam() checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)# 训练stepdef train_step(inputs): images, labels = inputs with tf.GradientTape() as tape: predictions = model(images, training=True) loss = compute_loss(labels, predictions) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) train_accuracy.update_state(labels, predictions) return loss# 测试stepdef test_step(inputs): images, labels = inputs predictions = model(images, training=False) t_loss = loss_object(labels, predictions) test_loss.update_state(t_loss) test_accuracy.update_state(labels, predictions)# 分布式训练@tf.functiondef distributed_train_step(dataset_inputs): per_replica_losses = strategy.run(train_step, args=(dataset_inputs,)) return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)# 分布式测试@tf.functiondef distributed_test_step(dataset_inputs): return strategy.run(test_step, args=(dataset_inputs,))for epoch in range(epochs): # 训练循环 total_loss = 0.0 num_batches = 0 for x in train_dist_dataset: total_loss += distributed_train_step(x) num_batches += 1 train_loss = total_loss / num_batches # 测试循环 for x in test_dist_dataset: distributed_test_step(x) if epoch % 10 == 0: checkpoint.save(checkpoint_prefix) print("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}".format(epoch + 1, train_loss, train_accuracy.result() * 100, test_loss.result(), test_accuracy.result() * 100)) test_loss.reset_states() train_accuracy.reset_states() test_accuracy.reset_states()
tf.distribute.Strategy 可以再没有strategy的情况下恢复最新的检查点并测试


test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(global_batch_size)eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='eval_accuracy')new_model = create_model()new_optimizer = tf.keras.optimizers.Adam()@tf.functiondef eval_step(images, labels): predictions = new_model(images, training=False) eval_accuracy(labels, predictions)checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))for images, labels in test_dataset: eval_step(images, labels)print('在没有strategy的情况下恢复保存的模型后的准确性: {}'.format(eval_accuracy.result() * 100))
TensorFlow Lite 是 TensorFlow 在移动和 IoT 等边缘设备端的解决方案,提供了 Java、Python 和 C++ API 库,可以运行在 Android、iOS 和 Raspberry Pi 等设备上。AI技术在边缘设备上的应用,TFLite 将会是愈发重要的角色。
目前 TFLite 只提供了推理功能,在服务器端进行训练后,经过如下简单处理即可部署到边缘设备上。
【电子书】简单粗暴 TensorFlow 2
【知乎】最全Tensorflow2.0 入门教程持续更新
【和鲸社区】30天吃掉那只TensorFlow2.0 | Github
【书籍】TensorFlow 2深度学习开源书 | PDF下载 提取码:juqs
【bilibili】tensorflow2.0入门与实战 2019年最通俗易懂的课程
【bilibili】神经网络与深度学习——TensorFlow2.0实战【中文课程】
【github】TensorFlow-Examples
【github】TensorFlow-2.x-Tutorials