Before diving into the various TensorFlow distribution strategies, we first need to look at the foundation they build on: the distributed environment. Only with a solid grasp of this foundation can we clear away most obstacles in the later analysis and get twice the result with half the effort.
Some of the APIs used in this article's code are not the latest, but since our goal is to understand the design ideas, the older APIs are actually clearer (many companies in industry still run on older versions of TensorFlow, so analyzing the older APIs is still quite relevant).
Here I strongly recommend two experts:
[TensorFlow Internals](https://github.com/horance-liu/tensorflow-internals): although it does not analyze the latest code, anyone interested in TensorFlow's internal implementation mechanisms should read it; you will gain a great deal.
https://home.cnblogs.com/u/deep-learning-stacks/ (西门宇少): not limited to TensorFlow, this author's public account also covers the industry frontier in many other areas.
Other articles in this series are:
[Translation] TensorFlow Distributed, paper: "TensorFlow: Large-Scale Machine Learning on Heterogeneous Distributed Systems"
[Translation] TensorFlow Distributed, paper: "Implementation of Control Flow in TensorFlow"
We will break the distributed mode down from several different angles. The division is not absolute, and the angles are not orthogonal; they may partially overlap. We divide things this way only because the author finds these angles easier to understand.
We first break things down from the cluster and business-logic angle, with the following terminology:
Cluster: the definition of a TensorFlow cluster.
Job: a job contains a series of tasks devoted to the same goal; the tasks of one job usually run on different machines. There are generally two kinds of jobs: ps and worker.
Task: a task carries out a specific piece of work and is generally associated with the processing logic of a TensorFlow server.
The relationship among the three is as follows: a Cluster contains multiple Jobs, and a Job contains one or more Tasks:
Figure 1: Relationships among the roles
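To make the Cluster/Job/Task hierarchy concrete, here is a minimal sketch (the host addresses are placeholders) that builds a tf.train.ClusterSpec and inspects how jobs map to tasks:

```python
import tensorflow.compat.v1 as tf

# One cluster with two jobs: "ps" (one task) and "worker" (two tasks).
cluster = tf.train.ClusterSpec({
    "ps": ["10.0.0.1:2222"],
    "worker": ["10.0.0.2:2222", "10.0.0.3:2222"],
})

print(cluster.jobs)                       # ['ps', 'worker']
print(cluster.num_tasks("worker"))        # 2
print(cluster.task_address("worker", 1))  # 10.0.0.3:2222
print(cluster.as_cluster_def())           # the underlying ClusterDef proto
```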
For the two job roles, we give a classic parameter-server diagram below: the upper part of the figure is the running ps cluster, and in the middle four workers are running.
Figure 2: Parameter server
Source: "A Survey on Distributed Machine Learning"
Let us see how to implement distributed training with the low-level API.
We first create the cluster. The cluster contains two roles: the parameter-server job (ps) has two tasks, and the worker job has three tasks. Here each task is one machine; it is also possible to run multiple tasks on the same machine (for example, with each task controlling a different GPU device).
A ClusterSpec is organized by Job and specifies how the Tasks are deployed in the cluster. Since one Task corresponds to one process, the ClusterSpec also describes how the processes of the TensorFlow distributed runtime are laid out.
```python
ps_hosts = ["1.1.1.1:11", "2.2.2.2:22"]
worker_hosts = ["3.3.3.3:33", "4.4.4.4:44", "5.5.5.5:55"]
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
```
Next we launch the tasks. The user script needs to run on every machine, five times in total (2 ps tasks and 3 worker tasks). Within each task, the same tf.train.ClusterSpec is used so that the task knows about every task in the cluster, and then a tf.distribute.Server is started.
A tf.distribute.Server instance encapsulates a set of devices and a tf.compat.v1.Session target that can participate in distributed training. A server belongs to a cluster (specified by a tf.train.ClusterSpec) and corresponds to a particular task in a named job. The server can communicate with any other server in the same cluster.
```python
FLAGS = tf.app.flags.FLAGS
server = tf.train.Server(cluster,
                         job_name=FLAGS.job_name,
                         task_index=FLAGS.task_index)
```
Since the Server has been started, the concrete logic of each task (or node) now diverges. The code decides which role this Server plays according to the command-line arguments passed to the script.
```python
if FLAGS.job_name == "ps":
  server.join()
elif FLAGS.job_name == "worker":
  with tf.device(tf.train.replica_device_setter(
      worker_device="/job:worker/task:%d" % FLAGS.task_index,
      cluster=cluster)):
```
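The body of the with block (the model itself) is omitted above. For completeness, here is a sketch of how the pieces could fit together in one script, using the TF 1.x-style API via tf.compat.v1; the tiny linear model, the StopAtStepHook, and the choice of worker 0 as chief are illustrative assumptions, not part of the original text:

```python
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

def run(job_name, task_index):
    cluster = tf.train.ClusterSpec({
        "ps": ["1.1.1.1:11", "2.2.2.2:22"],
        "worker": ["3.3.3.3:33", "4.4.4.4:44", "5.5.5.5:55"],
    })
    server = tf.train.Server(cluster, job_name=job_name, task_index=task_index)

    if job_name == "ps":
        server.join()  # parameter servers only host variables and block here
        return

    # Variables are placed on the ps tasks; ops stay on this worker.
    with tf.device(tf.train.replica_device_setter(
            worker_device="/job:worker/task:%d" % task_index,
            cluster=cluster)):
        x = tf.random_uniform([32, 10])
        w = tf.get_variable("w", [10, 1])
        b = tf.get_variable("b", [1])
        loss = tf.reduce_mean(tf.square(tf.matmul(x, w) + b))
        global_step = tf.train.get_or_create_global_step()
        train_op = tf.train.GradientDescentOptimizer(0.01).minimize(
            loss, global_step=global_step)

    # Worker 0 acts as the chief and initializes the variables.
    hooks = [tf.train.StopAtStepHook(last_step=1000)]
    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(task_index == 0),
                                           hooks=hooks) as sess:
        while not sess.should_stop():
            sess.run(train_op)
```

Each of the five processes would invoke run() with its own job_name and task_index, for example parsed from command-line flags as in the snippet above.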
Next, let us analyze from the angle of distributed business logic and architecture. As is well known, Master-Worker is a very common way to organize a distributed system: GFS has Master and ChunkServer, Spanner has Zonemaster and Spanserver, Spark has driver and executor, and Flink has JobManager and TaskManager. In this architecture, the Master usually maintains cluster metadata and schedules work, while the Workers carry out the actual computation or maintain the data shards.
In fact, distributed TensorFlow also adopts the Master-Worker architecture. To explain it better, we give an official architecture diagram of distributed TensorFlow; the three roles in the figure are logical roles.
The cluster in the figure contains three nodes, and each node runs a TensorFlow Server. The Master and each of the Workers here is a TensorFlow Server.
Figure 3: Cluster (source: TensorFlow)
Next, let us dissect the concrete software implementation, which can be decomposed into the following concepts:
TensorFlow Server: a Server is a process running a tf.train.Server instance and is a member of a cluster. A Server usually includes a Master Service and a Worker Service, and it can communicate with the other Servers in the cluster.
Master Service: a gRPC service that interacts with a set of remote distributed devices and coordinates and schedules multiple worker services.
Master Session: a master session is in charge of one client's computation; it holds the registered graph and drives its execution on the workers.
Worker Session: a worker identifies an execution sequence (graph registration, command execution) through a Worker Session; a Worker Session belongs to a Master Session.
Worker Service: a gRPC service that executes parts of the dataflow graph on a set of local devices on behalf of the Master Service. A worker service keeps track of multiple subgraphs of the client's graph; these subgraphs correspond to the nodes that should run on this worker, plus any extra nodes required for inter-process communication. The Worker Service corresponds to worker_service.proto. All TensorFlow servers implement the Worker Service.
We now know that each Server runs both a MasterService and a WorkerService, which means a server may play the Master role and the Worker role at the same time. Going back to the figure above: the cluster contains three nodes, each running a TensorFlow Server, and every server carries both services (MasterService and WorkerService). In this particular system, however, the services that actually play a role are the MasterService on the Master and the WorkerServices on the two workers; these are underlined in the figure.
Figure 4: Services
Let us now look at some other possibilities.
Figure 5: Multiple clients connecting
The core of distributed execution is still the manipulation of the computation graph, but the work is now split among three roles: Client, Master, and Worker. The Client constructs the computation graph and the Workers execute the actual computation; but how does a Worker know what it should compute? TensorFlow inserts a Master role between the two that is responsible for coordination and scheduling.
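As a concrete, hypothetical illustration of the three roles (the worker address below is a placeholder): the Client builds a graph and pins the two operands to two different worker tasks; opening a session against a server's grpc target hands the graph to that server's Master, which then drives the Workers.

```python
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

# The Client: builds the graph and chooses the placement.
with tf.device("/job:worker/task:0"):
    a = tf.constant(3.0)
with tf.device("/job:worker/task:1"):
    b = tf.constant(4.0)
c = a * b

# The session target points at one server's MasterService; that Master
# partitions the graph and asks the WorkerServices to run their pieces.
with tf.Session("grpc://3.3.3.3:33") as sess:
    print(sess.run(c))  # 12.0
```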
In distributed mode, the computation graph is partitioned and then executed.
Since execution follows the partitioning, here we only illustrate the partitioning:
Figure 6: Partitioning the computation graph
Finally, we analyze the distributed mode from the communication angle. TensorFlow's message-passing component is called Rendezvous, an abstraction for passing tensors from producers to consumers; a rendezvous is a table of channels. A producer calls the Send() method to send a tensor on a named channel, and a consumer calls the Recv() method to receive a tensor from a specified channel.
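The following is a toy, in-process sketch of this idea (a table of named channels with Send/Recv); it only illustrates the abstraction and is not TensorFlow's actual Rendezvous implementation, and the key format in the example is made up:

```python
import queue
from collections import defaultdict

class ToyRendezvous:
    """A table of named channels: producers Send into a channel, consumers Recv."""

    def __init__(self):
        self._channels = defaultdict(queue.Queue)  # channel key -> FIFO of tensors

    def send(self, key, tensor):
        # Producer side: deposit the tensor under the channel key.
        self._channels[key].put(tensor)

    def recv(self, key):
        # Consumer side: block until a tensor arrives on the channel.
        return self._channels[key].get()

rendezvous = ToyRendezvous()
# A real key encodes things like sender device, receiver device, and tensor name.
key = "/job:worker/task:0;/job:worker/task:1;edge_5_MatMul"
rendezvous.send(key, [[1.0, 2.0]])
print(rendezvous.recv(key))  # [[1.0, 2.0]]
```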
In distributed mode, cross-device edges are split: a Send node is inserted at the sending end of the edge and a Recv node at the receiving end.
For example, in the figure below, the left side is the original computation graph and the right side is the graph after splitting; the five nodes are assigned to two workers.
Figure 7: Splitting the computation graph
Suppose Worker 0 has two GPUs. After the Send and Recv nodes are inserted, the effect is as follows: the arrows between Worker 1 and Worker 0 represent inter-process data exchange through GrpcRemoteRendezvous, while the dashed arrows between Worker 0's two GPUs represent intra-process data exchange through IntraProcessRendezvous.
Figure 8: The communication perspective
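To make the splitting itself concrete, here is a toy partitioner (unrelated to TensorFlow's real graph partitioner; the node names and the channel-key format are invented) that assigns nodes to devices and replaces every cross-device edge with a Send/Recv pair sharing a channel key:

```python
def partition(nodes, edges, placement):
    """nodes: node names; edges: (src, dst) pairs; placement: node -> device."""
    subgraphs = {dev: [] for dev in set(placement.values())}
    for n in nodes:
        subgraphs[placement[n]].append(n)
    for src, dst in edges:
        src_dev, dst_dev = placement[src], placement[dst]
        if src_dev != dst_dev:
            key = "%s->%s" % (src, dst)  # shared channel key for the pair
            subgraphs[src_dev].append("Send(%s, key=%s)" % (src, key))
            subgraphs[dst_dev].append("Recv(key=%s) -> %s" % (key, dst))
    return subgraphs

nodes = ["a", "b", "c", "d", "e"]
edges = [("a", "c"), ("b", "c"), ("c", "d"), ("c", "e")]
placement = {"a": "/job:worker/task:0", "b": "/job:worker/task:0",
             "c": "/job:worker/task:0", "d": "/job:worker/task:1",
             "e": "/job:worker/task:1"}
for dev, ops in sorted(partition(nodes, edges, placement).items()):
    print(dev, ops)
```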
Next, let us look at the overall picture of the Server.
The Server interface is defined in tensorflow/core/protobuf/tensorflow_server.proto, as follows:
```protobuf
// Defines the configuration of a single TensorFlow server.
message ServerDef {
  // The cluster of which this server is a member.
  ClusterDef cluster = 1;

  // The name of the job of which this server is a member.
  //
  // NOTE(mrry): The cluster field must contain a JobDef with a name field
  // that matches this name.
  string job_name = 2;

  // The task index of this server in its job.
  //
  // NOTE: The cluster field must contain a JobDef with a matching name
  // and a mapping in its tasks field for this index.
  int32 task_index = 3;

  // The default configuration for sessions that run on this server.
  ConfigProto default_session_config = 4;

  // The protocol to be used by this server.
  //
  // Acceptable values include: "grpc", "grpc+verbs".
  string protocol = 5;

  // The server port. If not set, then we identify the port from the job_name.
  int32 port = 6;

  // Device filters for remote tasks in the cluster.
  // NOTE: This is an experimental feature and only effective in TensorFlow 2.x.
  ClusterDeviceFilters cluster_device_filters = 7;
}
```
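These fields can also be filled in from Python. A small sketch follows; the host addresses are placeholders, and it assumes the ServerDef proto is exposed as tf.train.ServerDef (as in the 1.x-style API):

```python
import tensorflow.compat.v1 as tf

cluster = tf.train.ClusterSpec({
    "ps": ["1.1.1.1:11"],
    "worker": ["3.3.3.3:33", "4.4.4.4:44"],
})

server_def = tf.train.ServerDef(
    cluster=cluster.as_cluster_def(),  # ClusterDef cluster = 1
    job_name="worker",                 # string job_name = 2
    task_index=0,                      # int32 task_index = 3
    protocol="grpc")                   # string protocol = 5
print(server_def)

# A server could then be built directly from this definition, e.g.:
# server = tf.distribute.Server(server_def, start=True)
```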
A Server can be looked at from multiple angles.
Figure 9: GrpcServer structure
The concrete Python interface is defined in tensorflow/python/training/server_lib.py.
```python
@tf_export("distribute.Server", v1=["distribute.Server", "train.Server"])
@deprecation.deprecated_endpoints("train.Server")
class Server(object):
  """An in-process TensorFlow server, for use in distributed training.

  A tf.distribute.Server instance encapsulates a set of devices and a
  tf.compat.v1.Session target that can participate in distributed training.
  A server belongs to a cluster (specified by a tf.train.ClusterSpec), and
  corresponds to a particular task in a named job. The server can communicate
  with any other server in the same cluster.
  """

  def __init__(self,
               server_or_cluster_def,
               job_name=None,
               task_index=None,
               protocol=None,
               config=None,
               start=True):
    """Creates a new server with the given definition.

    The job_name, task_index, and protocol arguments are optional, and
    override any information provided in server_or_cluster_def.

    Args:
      server_or_cluster_def: A tf.train.ServerDef or tf.train.ClusterDef
        protocol buffer, or a tf.train.ClusterSpec object, describing the
        server to be created and/or the cluster of which it is a member.
      job_name: (Optional.) Specifies the name of the job of which the server
        is a member. Defaults to the value in server_or_cluster_def, if
        specified.
      task_index: (Optional.) Specifies the task index of the server in its
        job. Defaults to the value in server_or_cluster_def, if specified.
        Otherwise defaults to 0 if the server's job has only one task.
      protocol: (Optional.) Specifies the protocol to be used by the server.
        Acceptable values include "grpc", "grpc+verbs". Defaults to the value
        in server_or_cluster_def, if specified. Otherwise defaults to "grpc".
      config: (Options.) A tf.compat.v1.ConfigProto that specifies default
        configuration options for all sessions that run on this server.
      start: (Optional.) Boolean, indicating whether to start the server after
        creating it. Defaults to True.

    Raises:
      tf.errors.OpError: Or one of its subclasses if an error occurs while
        creating the TensorFlow server.
    """
    self._server_def = _make_server_def(server_or_cluster_def,
                                        job_name, task_index, protocol, config)
    self._server = c_api.TF_NewServer(self._server_def.SerializeToString())
    if start:
      self.start()
```
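A minimal usage sketch: create an in-process, single-machine server with create_local_server() and point a tf.compat.v1.Session at its target (this is essentially the single-process cluster pattern; the printed port will vary).

```python
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

# A single-process cluster that contains one server.
server = tf.distribute.Server.create_local_server()
print(server.target)  # e.g. "grpc://localhost:<port>"

c = tf.constant(42.0)
# The session connects to this server's MasterService through the grpc target.
with tf.Session(server.target) as sess:
    print(sess.run(c))  # 42.0
```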
The TF_NewServer method crosses into the C++ world, where it calls tensorflow::NewServer to build the C++ Server.
```cpp
TF_Server* TF_NewServer(const void* proto, size_t proto_len,
                        TF_Status* status) {
#if defined(IS_MOBILE_PLATFORM) || defined(IS_SLIM_BUILD)
  status->status = tensorflow::errors::Unimplemented(
      "Server functionality is not supported on mobile");
  return nullptr;
#else
  tensorflow::ServerDef server_def;
  if (!server_def.ParseFromArray(proto, static_cast<int>(proto_len))) {
    status->status = InvalidArgument(
        "Could not parse provided bytes into a ServerDef protocol buffer");
    return nullptr;
  }

  std::unique_ptr<tensorflow::ServerInterface> out_server;
  status->status = tensorflow::NewServer(server_def, &out_server);
  if (!status->status.ok()) return nullptr;

  return new TF_Server(std::move(out_server));
#endif  // defined(IS_MOBILE_PLATFORM) || defined(IS_SLIM_BUILD)
}
```
Then the following code decides which kind of Server to build.
```cpp
// Creates a server based on the given server_def, and stores it in
// *out_server. Returns OK on success, otherwise returns an error.
Status NewServer(const ServerDef& server_def,
                 std::unique_ptr<ServerInterface>* out_server) {
  ServerFactory* factory;
  TF_RETURN_IF_ERROR(ServerFactory::GetFactory(server_def, &factory));
  return factory->NewServer(server_def, ServerFactory::Options(), out_server);
}
```
GrpcServer has already been registered with the system. GrpcServerFactory is the factory class: if the protocol is "grpc", it produces a GrpcServer.
```cpp
class GrpcServerFactory : public ServerFactory {
 public:
  bool AcceptsOptions(const ServerDef& server_def) override {
    return server_def.protocol() == "grpc";
  }

  Status NewServer(const ServerDef& server_def, const Options& options,
                   std::unique_ptr<ServerInterface>* out_server) override {
    return GrpcServer::Create(server_def, Env::Default(),
                              options.local_device_mgr, out_server);
  }
};
```
So let us now look at GrpcServer.
ServerInterface is the base interface; it represents a TensorFlow server that exports the Master and Worker services. It is defined in tensorflow/core/distributed_runtime/server_lib.h. This library creates TensorFlow server objects through a registration/factory mechanism: every server implementation must have an accompanying ServerFactory and must create a static "registrar" object that calls ServerFactory::Register() with an instance of that factory class. The interface is as follows:
```cpp
class ServerInterface {
 public:
  ServerInterface() {}
  virtual ~ServerInterface() {}

  // Starts the server running asynchronously. Returns OK on success, otherwise
  // returns an error.
  virtual Status Start() = 0;

  // Stops the server asynchronously. Returns OK on success, otherwise returns
  // an error.
  //
  // After calling Stop(), the caller may call Join() to block until the
  // server has stopped.
  virtual Status Stop() = 0;

  // Blocks until the server has stopped. Returns OK on success, otherwise
  // returns an error.
  virtual Status Join() = 0;

  // Returns a target string that can be used to connect to this server using
  // tensorflow::NewSession().
  virtual const string target() const = 0;

  virtual WorkerEnv* worker_env() = 0;
  virtual MasterEnv* master_env() = 0;

  // Update the set of workers that can be reached by the server
  virtual Status UpdateServerDef(const ServerDef& server_def) = 0;

  // Functions to operate on service-specific properties.
  //
  // Add master eager context to local eager service in order to handle enqueue
  // requests from remote workers.
  virtual Status AddMasterEagerContextToEagerService(
      const tensorflow::uint64 context_id, EagerContext* context) = 0;

  // Set coordination service agent instance to coordination service RPC handler
  virtual Status SetCoordinationServiceAgentInstance(
      CoordinationServiceAgent* agent) = 0;

 private:
  TF_DISALLOW_COPY_AND_ASSIGN(ServerInterface);
};
```
The factory class is defined as follows:
```cpp
class ServerFactory {
 public:
  struct Options {
    // Local DeviceMgr to use.
    tensorflow::DeviceMgr* local_device_mgr;
  };

  // Creates a new server based on the given server_def, and stores
  // it in *out_server. Returns OK on success, otherwise returns an
  // error.
  virtual Status NewServer(const ServerDef& server_def, const Options& options,
                           std::unique_ptr<ServerInterface>* out_server) = 0;

  // Returns true if and only if this factory can create a server
  // based on the given server_def.
  virtual bool AcceptsOptions(const ServerDef& server_def) = 0;

  virtual ~ServerFactory() {}

  // For each ServerFactory subclass, an instance of that class must
  // be registered by calling this method.
  //
  // The server_type must be unique to the server factory.
  static void Register(const string& server_type, ServerFactory* factory);

  // Looks up a factory that can create a server based on the given
  // server_def, and stores it in *out_factory. Returns OK on
  // success, otherwise returns an error.
  static Status GetFactory(const ServerDef& server_def,
                           ServerFactory** out_factory);
};
```
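The register/lookup mechanism itself is straightforward. Below is a toy sketch of the same pattern in Python (not TensorFlow code; all names are invented): each factory registers an instance of itself in a static table, and get_factory returns the first factory that accepts the given server definition.

```python
class ToyServerFactory:
    _registry = {}  # server_type -> factory instance

    @classmethod
    def register(cls, server_type, factory):
        cls._registry[server_type] = factory

    @classmethod
    def get_factory(cls, server_def):
        for factory in cls._registry.values():
            if factory.accepts_options(server_def):
                return factory
        raise LookupError("no factory accepts %r" % (server_def,))

class ToyGrpcServerFactory(ToyServerFactory):
    def accepts_options(self, server_def):
        return server_def.get("protocol") == "grpc"

    def new_server(self, server_def):
        return "GrpcServer(job_name=%s)" % server_def["job_name"]

# Plays the role of the static "registrar" object in the C++ code.
ToyServerFactory.register("GRPC_SERVER", ToyGrpcServerFactory())

server_def = {"protocol": "grpc", "job_name": "worker"}
factory = ToyServerFactory.get_factory(server_def)
print(factory.new_server(server_def))  # GrpcServer(job_name=worker)
```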
GrpcServer is the structure that manages the Master and Worker services in the current process. Its Start(), Stop(), and Join() methods form the state machine described in the comment below.
```cpp
// Represents the current state of the server, which changes as follows:
//
//                 Join()            Join()
//                  ___               ___
//     Start()      \ /    Stop()     \ /
// NEW ---------> STARTED --------> STOPPED
//   \                          /
//    \________________________/
//            Stop(), Join()
```
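Restated as a toy Python sketch (not TensorFlow code), the transitions look roughly like this:

```python
import enum

class State(enum.Enum):
    NEW = 0
    STARTED = 1
    STOPPED = 2

class ToyServer:
    """Mirrors the NEW -> STARTED -> STOPPED transitions described above."""

    def __init__(self):
        self.state = State.NEW

    def start(self):
        if self.state is State.NEW:
            self.state = State.STARTED   # NEW --Start()--> STARTED
        elif self.state is State.STOPPED:
            raise RuntimeError("Server has stopped.")
        # start() on STARTED is a no-op

    def stop(self):
        if self.state in (State.NEW, State.STARTED):
            self.state = State.STOPPED   # --Stop()--> STOPPED

    def join(self):
        if self.state is State.NEW:
            self.state = State.STOPPED   # Join() before Start() prevents starting
        # on STARTED, the real server blocks here until its threads exit
```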
Its main member variables are shown in the class definition below.
Concretely, the server starts several threads that run GrpcMasterService, GrpcWorkerService, and GrpcEagerServiceImpl respectively.
```cpp
class GrpcServer : public ServerInterface {
 private:
  Env* env_;

  // The port to which this server is bound.
  int bound_port_ = 0;

  // The host name of this server
  string host_name_;

  // Guards server configuration, server, and state.
  mutex mu_;

  enum State { NEW, STARTED, STOPPED };
  State state_ TF_GUARDED_BY(mu_);

  // Implementation of a TensorFlow master, and RPC polling thread.
  MasterEnv master_env_;
  std::unique_ptr<Master> master_impl_;
  AsyncServiceInterface* master_service_ = nullptr;
  std::unique_ptr<Thread> master_thread_ TF_GUARDED_BY(mu_);

  std::map<std::string, AsyncServiceInterface*> extra_services_;
  std::vector<std::unique_ptr<Thread>> extra_service_threads_
      TF_GUARDED_BY(mu_);

  // Implementation of a TensorFlow worker, and RPC polling thread.
  WorkerEnv worker_env_;
  std::unique_ptr<const DeviceMgr> owned_device_manager_;
  std::unique_ptr<GrpcWorker> worker_impl_;
  AsyncServiceInterface* worker_service_ = nullptr;
  std::unique_ptr<Thread> worker_thread_ TF_GUARDED_BY(mu_);
  std::unique_ptr<GrpcWorkerEnv> grpc_worker_env_;

  // TensorFlow Eager implementation, and RPC polling thread.
  AsyncServiceInterface* eager_service_ = nullptr;
  std::unique_ptr<Thread> eager_thread_ TF_GUARDED_BY(mu_);
  std::shared_ptr<WorkerSession> worker_session_;

  // TensorFlow profiler service implementation.
  std::unique_ptr<grpc::ProfilerService::Service> profiler_service_ = nullptr;

  // The overall server configuration.
  ServerDef server_def_ TF_GUARDED_BY(mu_);

  std::unique_ptr<::grpc::Server> server_ TF_GUARDED_BY(mu_);
};
```
The initialization logic is roughly as follows:
obtain the various relevant configurations and initialize MasterEnv and WorkerEnv;
build the Device Manager;
build the device list;
create the RpcRendezvousMgr;
apply the necessary settings to the server;
create the Master and its GrpcMasterService. GrpcMasterService is the entity that serves external requests: when a message arrives, its handler is invoked, while the actual work is done by the Master;
create the GrpcWorker and its GrpcWorkerService. GrpcWorkerService is the entity that serves external requests: when a message arrives, its handler is invoked, while the actual work is done by the GrpcWorker;
call builder.BuildAndStart to start the gRPC communication server grpc::Server. At this point the GrpcServer is still in the NEW state and does not serve requests yet; the state machine must move to STARTED before it serves the outside world;
build the environment that gRPC needs;
create the WorkerCache;
create a SessionMgr, inside which WorkerSessions will later be created;
set the MasterSession factory, which is invoked to create a MasterSession when needed; some tasks, such as ps, do not need a MasterSession;
register the LocalMaster.
```cpp
Status GrpcServer::Init(const GrpcServerOptions& opts) {
  mutex_lock l(mu_);
  master_env_.env = env_;
  worker_env_.env = env_;

  // Check parameters before DeviceFactory::AddDevices,
  // otherwise if 'task_index=-1' the program will abort.
  int requested_port;
  TF_RETURN_IF_ERROR(GetHostAndPort(server_def_, &host_name_, &requested_port));

  SessionOptions sess_opts;
  ConfigProto config = server_def_.default_session_config();
  sess_opts.config = config;

  // Configure shared devices between master and worker.
  string name_prefix =
      strings::StrCat("/job:", server_def_.job_name(), "/replica:0",
                      "/task:", server_def_.task_index());

  // Build the Device Manager.
  if (opts.local_device_mgr == nullptr) {
    std::vector<std::unique_ptr<Device>> devices;
    TF_RETURN_IF_ERROR(
        DeviceFactory::AddDevices(sess_opts, name_prefix, &devices));
    worker_env_.device_mgr = new DynamicDeviceMgr(std::move(devices));
    owned_device_manager_.reset(worker_env_.device_mgr);
  } else {
    worker_env_.device_mgr = opts.local_device_mgr;
    owned_device_manager_.reset(nullptr);
  }

  // Build the device lists.
  worker_env_.local_devices = worker_env_.device_mgr->ListDevices();
  master_env_.local_devices = worker_env_.device_mgr->ListDevices();

  // Create the RpcRendezvousMgr.
  worker_env_.rendezvous_mgr = opts.rendezvous_mgr_func == nullptr
                                   ? new RpcRendezvousMgr(&worker_env_)
                                   : opts.rendezvous_mgr_func(&worker_env_);
  string unused;
  string default_worker_name;
  if (!DeviceNameUtils::SplitDeviceName(master_env_.local_devices[0]->name(),
                                        &default_worker_name, &unused)) {
    return errors::Internal("Could not parse worker name.");
  }

  // Apply the necessary settings to the server.
  ::grpc::ServerBuilder builder;
  builder.AddListeningPort(strings::StrCat("0.0.0.0:", requested_port),
                           GetServerCredentials(server_def_), &bound_port_);
  builder.SetMaxMessageSize(std::numeric_limits<int32>::max());

  bool reuse_port = false;
  const Status status =
      ReadBoolFromEnvVar("TF_GRPC_REUSE_PORT", false, &reuse_port);
  auto server_build_option =
      reuse_port
          ? std::unique_ptr<::grpc::ServerBuilderOption>(new ReusePortOption)
          : std::unique_ptr<::grpc::ServerBuilderOption>(new NoReusePortOption);
  builder.SetOption(std::move(server_build_option));

  // Allow subclasses to specify more args to pass to the gRPC server.
  // Create the Master and its GrpcMasterService.
  MaybeMutateBuilder(&builder, requested_port);
  master_impl_ = CreateMaster(&master_env_);
  master_service_ = NewGrpcMasterService(master_impl_.get(), config, &builder);

  // Create the GrpcWorker and its GrpcWorkerService.
  worker_impl_ = opts.worker_func ? opts.worker_func(&worker_env_, config)
                                  : NewGrpcWorker(&worker_env_, config);
  worker_service_ = NewGrpcWorkerService(worker_impl_.get(), &builder,
                                         opts.worker_service_options)
                        .release();
  eager_service_ = new eager::GrpcEagerServiceImpl(&worker_env_, &builder);

  profiler_service_ = profiler::CreateProfilerService();
  builder.RegisterService(profiler_service_.get());

  // Add any extra services to be started.
  extra_services_ = ExtraServices(&builder);

  // extra service:
  if (opts.service_func != nullptr) {
    opts.service_func(&worker_env_, &builder);
  }

  // Start the gRPC communication server.
  server_ = builder.BuildAndStart();

  // Create the execution environment for the GRPC workers cache.
  // Build the environment that gRPC needs.
  grpc_worker_env_.reset(CreateGrpcWorkerEnv());

  // Create the WorkerCache.
  WorkerCacheInterface* worker_cache;
  WorkerCacheFactoryOptions worker_cache_factory_options(server_def_);
  TF_RETURN_IF_ERROR(
      WorkerCacheFactory(worker_cache_factory_options, &worker_cache));
  CHECK_NE(nullptr, worker_cache);

  if (opts.collective_mgr_func) {
    worker_env_.collective_executor_mgr.reset(
        opts.collective_mgr_func(config, &worker_env_, worker_cache));
  } else {
    worker_env_.collective_executor_mgr = CreateProdRpcCollectiveExecutorMgr(
        config, worker_env_.device_mgr, MaybeCreateNcclCommunicator(),
        worker_cache, default_worker_name);
  }

  // Set up worker environment.
  // Create a SessionMgr, inside which WorkerSessions will later be created.
  worker_env_.session_mgr = new SessionMgr(
      &worker_env_, SessionMgr::WorkerNameFromServerDef(server_def_),
      std::unique_ptr<WorkerCacheInterface>(worker_cache),
      [this](const ServerDef& server_def, WorkerCacheInterface** worker_cache) {
        WorkerCacheFactoryOptions options(server_def);
        return WorkerCacheFactory(options, worker_cache);
      });
  worker_env_.compute_pool = ComputePool(sess_opts);

  // Finish setting up master environment.
  master_env_.ops = OpRegistry::Global();
  master_env_.worker_cache = worker_cache;
  master_env_.collective_executor_mgr =
      worker_env_.collective_executor_mgr.get();
  StatsPublisherFactory stats_factory = opts.stats_factory;

  // Set the MasterSession factory; a MasterSession is only created on demand,
  // because some tasks (e.g. ps) do not need one.
  master_env_.master_session_factory =
      [config, stats_factory](
          SessionOptions options, const MasterEnv* env,
          std::unique_ptr<std::vector<std::unique_ptr<Device>>> remote_devs,
          std::unique_ptr<WorkerCacheInterface> worker_cache,
          std::unique_ptr<DeviceSet> device_set,
          std::vector<string> filtered_worker_list) {
        options.config.MergeFrom(config);
        return new MasterSession(options, env, std::move(remote_devs),
                                 std::move(worker_cache), std::move(device_set),
                                 std::move(filtered_worker_list),
                                 stats_factory);
      };
  master_env_.worker_cache_factory =
      [this](const WorkerCacheFactoryOptions& options,
             WorkerCacheInterface** worker_cache) {
        return WorkerCacheFactory(options, worker_cache);
      };

  // Provide direct access to the master from in-process clients.
  // Register the LocalMaster.
  LocalMaster::Register(target(), master_impl_.get(),
                        config.operation_timeout_in_ms());

  return Status::OK();
}
```
Master
Master is the object that actually carries out the work. In the code above, the statements related to creating the master are:
```cpp
master_impl_ = CreateMaster(&master_env_);
LocalMaster::Register(target(), master_impl_.get(),
                      config.operation_timeout_in_ms());
```
The following code shows that GrpcServer creates a Master:
```cpp
std::unique_ptr<Master> GrpcServer::CreateMaster(MasterEnv* master_env) {
  return std::unique_ptr<Master>(new Master(master_env, 0.0));
}
```
The following code shows that at this point the Master's target has the form "grpc://host:port":
```cpp
const string GrpcServer::target() const {
  return strings::StrCat("grpc://", host_name_, ":", bound_port_);
}
```
LocalMaster registers the Master with itself:
```cpp
// Provide direct access to the master from in-process clients.
LocalMaster::Register(target(), master_impl_.get(),
                      config.operation_timeout_in_ms());
```
Worker
In the initialization code, the following statement creates the worker; by default it calls NewGrpcWorker to create a GrpcWorker (the object that actually does the work).
```cpp
worker_impl_ = opts.worker_func ? opts.worker_func(&worker_env_, config)
                                : NewGrpcWorker(&worker_env_, config);
```
WorkerEnv
WorkerEnv gathers the various relevant configurations in one place for the Worker to use; it can be regarded as the Worker's runtime context. WorkerEnv has the same lifetime as the Server and is visible throughout the Worker's execution. Its main members are:
Env* env: the cross-platform API interface.
SessionMgr* session_mgr: manages the set of WorkerSessions.
std::vector<Device*> local_devices: the set of local devices.
DeviceMgr* device_mgr: manages the local and remote device sets.
RendezvousMgrInterface* rendezvous_mgr: manages the set of Rendezvous instances.
std::unique_ptr<CollectiveExecutorMgrInterface> collective_executor_mgr: generates per-step CollectiveExecutors and supports collective operations.
thread::ThreadPool* compute_pool: the thread pool; each time an operator runs, a thread is taken from this pool.
```cpp
// The worker environment class, which holds a bag of pointers to
// per-worker singletons.
//
// WorkerEnv does not own its member pointers.
struct WorkerEnv {
  Env* env = nullptr;

  // session_mgr encapsulates state for each session.
  SessionMgr* session_mgr = nullptr;

  // The local devices of this worker. Devices are owned by the device_mgr.
  //
  // REQUIRES: !local_devices.empty().
  std::vector<Device*> local_devices;

  // device_mgr manages local devices (cpu and gpu). The WorkerService
  // is the network interface for managed devices.
  //
  // Note: Please use the device_mgr associated with your session if appropriate
  // instead of this one. Using this device_mgr does not support ClusterSpec
  // propagated sessions.
  DeviceMgr* device_mgr = nullptr;

  // A set of rendezvous keyed by step ids.
  RendezvousMgrInterface* rendezvous_mgr = nullptr;

  // Generates per-step CollectiveExecutors and has access to utilities
  // supporting collective operations.
  std::unique_ptr<CollectiveExecutorMgrInterface> collective_executor_mgr;

  // A pool of threads for scheduling compute work.
  thread::ThreadPool* compute_pool = nullptr;

  // Coordination service.
  CoordinationServiceInterface* coord_service;
};
```
Several of WorkerEnv's manager-type members are important. For example, the SessionMgr class manages sessions on behalf of the Worker, handling their creation and destruction, and it also maintains the mapping from the Worker's session handles to the sessions themselves.
```cpp
class SessionMgr {
 public:
  Status CreateSession(...);
  Status DeleteSession(...);

 private:
  const WorkerEnv* const worker_env_;
  const WorkerCacheFactory worker_cache_factory_;
  std::map<string, std::unique_ptr<WorkerSession>> sessions_ GUARDED_BY(mu_);
};
```
MasterEnv
MasterEnv gathers the various relevant configurations in one place for the Master to use; it can be regarded as the Master's runtime context and is visible throughout the Master's lifetime. Its main members are as follows:
```cpp
// The master environment class, which holds a bag of pointers to
// per-master state.
//
// MasterEnv does not own its member pointers.
struct MasterEnv {
  Env* env = nullptr;

  // Object from which WorkerInterface instances can be obtained. Not owned.
  WorkerCacheInterface* worker_cache = nullptr;

  // The operation definitions to use. Must be filled before use.
  const OpRegistryInterface* ops = nullptr;

  // Local devices co-located with this master. Devices are not owned
  // by the master service.
  //
  // REQUIRES: !local_devices.empty().
  std::vector<Device*> local_devices;

  // Factory for creating master sessions, given session options and a
  // vector of devices.
  //
  // The caller of the function takes ownership of the returned
  // MasterSession, which may not be null. Ownership of the
  // MasterEnv* is retained by the caller.
  std::function<MasterSession*(
      SessionOptions, MasterEnv*,
      std::unique_ptr<std::vector<std::unique_ptr<Device>>>,
      std::unique_ptr<WorkerCacheInterface>,
      std::unique_ptr<DeviceSet> device_set,
      std::vector<string> filtered_worker_list)>
      master_session_factory;

  std::function<Status(const WorkerCacheFactoryOptions&,
                       WorkerCacheInterface**)>
      worker_cache_factory;

  // Generates per-step CollectiveExecutors and has access to utilities
  // supporting collective operations. Not owned.
  CollectiveExecutorMgrInterface* collective_executor_mgr = nullptr;
};
```
In the Python code, the final step is the call to the start method.
```python
@tf_export("distribute.Server", v1=["distribute.Server", "train.Server"])
@deprecation.deprecated_endpoints("train.Server")
class Server(object):

  def __init__(self,
               server_or_cluster_def,
               job_name=None,
               task_index=None,
               protocol=None,
               config=None,
               start=True):
    self._server_def = _make_server_def(server_or_cluster_def,
                                        job_name, task_index, protocol, config)
    self._server = c_api.TF_NewServer(self._server_def.SerializeToString())
    if start:
      self.start()
```
Before this call, the Server is in the NEW state; after start is called, the GrpcServer moves from NEW to STARTED. Inside Start(), three independent threads are launched, serving as the message handlers for MasterService, WorkerService, and EagerService respectively. Only now does the GrpcServer provide the MasterService and WorkerService to the outside world.
```cpp
Status GrpcServer::Start() {
  mutex_lock l(mu_);
  switch (state_) {
    case NEW: {
      master_thread_.reset(
          env_->StartThread(ThreadOptions(), "TF_master_service",
                            [this] { master_service_->HandleRPCsLoop(); }));
      worker_thread_.reset(
          env_->StartThread(ThreadOptions(), "TF_worker_service",
                            [this] { worker_service_->HandleRPCsLoop(); }));
      eager_thread_.reset(
          env_->StartThread(ThreadOptions(), "TF_eager_service",
                            [this] { eager_service_->HandleRPCsLoop(); }));

      for (const auto& kv : extra_services_) {
        const std::string& service_name = kv.first;
        AsyncServiceInterface* service = kv.second;
        std::unique_ptr<Thread> extra_service_thread;
        extra_service_thread.reset(env_->StartThread(
            ThreadOptions(), service_name,
            [service = service] { service->HandleRPCsLoop(); }));
        extra_service_threads_.push_back(std::move(extra_service_thread));
      }

      state_ = STARTED;
      return Status::OK();
    }
    case STARTED:
      return Status::OK();
    case STOPPED:
      return errors::FailedPrecondition("Server has stopped.");
    default:
      LOG(FATAL);
  }
}
```
After starting, Join is called on these threads, so the main thread blocks until they terminate; in this way the process keeps serving the MasterService and WorkerService.
```cpp
Status GrpcServer::Join() {
  mutex_lock l(mu_);
  switch (state_) {
    case NEW:
      // Prevent the server from being started subsequently.
      state_ = STOPPED;
      return Status::OK();
    case STARTED:
    case STOPPED:
      master_thread_.reset();
      worker_thread_.reset();
      eager_thread_.reset();
      for (auto& thread : extra_service_threads_) {
        thread.reset();
      }
      return Status::OK();
    default:
      LOG(FATAL);
  }
}
```
This concludes the overall introduction to the TensorFlow distributed environment.
TensorFlow Internals
TensorFlow架构与设计:概述
TensorFlow内核剖析
TensorFlow架构与设计:OP本质论
[译] TensorFlow 白皮书
2017TensorFlow开发者峰会
https://jcf94.com/2018/02/28/2018-02-28-tfunpacking3/
TensorFlow 拆包(五):Distributed
TensorFlow Architecture
『深度长文』Tensorflow代码解析(五)
什么是in-graph replication和between-graph replication?
[腾讯机智] TensorFlow源码解析(1): 创建会话
05tensorflow分布式会话
第八节,配置分布式TensorFlow
TensorFlow 分布式(Distributed TensorFlow)
tensorflow源码解析之distributed_runtime
Distributed TensorFlow: A Gentle Introduction
一文说清楚Tensorflow分布式训练必备知识
TensorFlow中的Placement启发式算法模块——Placer
TensorFlow的图切割模块——Graph Partitioner
TensorFlow中的通信机制——Rendezvous(一)本地传输
TensorFlow分布式采坑记
TensorFlow技术内幕(九):模型优化之分布式执行
Tensorflow架构流程