gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+executor/convertToIndependentTasks.m
function stageTasks = convertToIndependentTasks(taskGraph, varargin)
%CONVERTTOINDEPENDENTTASKS Convert the provided task graph into a
%collection of stage tasks that can be evaluated sequentially.
%
% Syntaxes:
%   stageTasks = convertToIndependentTasks(taskGraph,...
%       'CreateShuffleStorageFunction',@myCreateShuffleStorageFunction,...
%       'CreateBroadcastStorageFunction',@myCreateBroadcastStorageFunction);
%
%   [..] = convertToIndependentTasks(..,'GetCacheStoreFunction',@myGetCacheStoreFunction,..)
%
%   [..] = convertToIndependentTasks(..,'NumAnyToAnyOutputPartitions',N,..)
%
% The output, stageTasks, will be an array of StageTask objects that is
% equivalent to taskGraph. Evaluating the stage tasks sequentially in order
% is equivalent to evaluating the task graph optimally. Specifically, each
% stage task will perform one or more side-effects, writing data either to
% a shuffle-sort storage or to a broadcast variable. Once complete, the
% output associated with each of taskGraph.OutputTasks is stored as a
% broadcast variable.
%
% The syntax for the createShuffle function is:
%   [writerFactory, readerFactory] = createShuffle(task)
%
% Where each factory must be serializable, and generate an instance of
% matlab.bigdata.internal.io.Writer and matlab.bigdata.internal.io.Reader
% respectively. This will be called by the MATLAB client during scheduling.
%
% The syntax for the createBroadcast function is:
%   [setterFactory, getterFactory] = createBroadcast(task)
%
% Where the getter is an inputless function handle that emits the broadcast
% value output of the given task and the setter is a function handle of the
% form setFcn(partition, value) that adds the given partition value to
% the broadcast value.
%
% The syntax for the getCacheStore function is:
%   cacheStore = getCacheStore()
%
% Where cacheStore is an instance of matlab.bigdata.internal.io.CacheStore.
% This will be called by a MATLAB worker during execution.
%
% If the executor supports multiple output partitions for AnyToAny
% communication, it can specify how many by the
% 'NumAnyToAnyOutputPartitions' name-value pair (default 1).
%
% The implementation of this is the following graph transformations:
%  1. Replace all edges of the graph that require communication with a pair
%     of read/write to an intermediate storage task (either shuffle-sort
%     storage or a broadcast).
%  2. Inject a cache task into all edges of the graph that represent
%     intermediate state that is marked for caching.
%  3. Wrap each still-connected subgraph into a single task that has no
%     task inputs or outputs.
%  4. Combine tasks that can be scheduled to run simultaneously.
%
% Copyright 2016 The MathWorks, Inc.

import matlab.bigdata.internal.executor.BroadcastPartitionStrategy;
import matlab.bigdata.internal.executor.CacheEntryKey;
import matlab.bigdata.internal.executor.CompositeDataProcessorBuilder;
import matlab.bigdata.internal.executor.ExecutionTask;
import matlab.bigdata.internal.executor.StageTask;

p = inputParser;
p.addParameter('CreateShuffleStorageFunction', []);
p.addParameter('CreateBroadcastStorageFunction', []);
p.addParameter('GetCacheStoreFunction', []);
p.addParameter('NumAnyToAnyOutputPartitions', 1);
p.parse(varargin{:});
factories = p.Results;

% A map of task ID to an instance of CompositeDataProcessorBuilder that
% generates the same output but that can be combined forward into
% downstream tasks. A task ID can map to empty, in which case it is not
% possible to combine.
passForwardNodeMap = containers.Map('KeyType', 'char', 'ValueType', 'any');

% A map of task ID to the minimum index into the output execution graph
% that the output of the task is available for consumption.
minScheduleIndexMap = containers.Map('KeyType', 'char', 'ValueType', 'double');

% A map of task ID to a list of ExecutionTask objects that represent the
% shuffle-sort inputs if that task was converted to an independent task.
taskDependenciesMap = containers.Map('KeyType', 'char', 'ValueType', 'any');

% A map of task ID to a list of cache entry keys that would be associated
% if that task was converted to an independent task.
cacheEntryKeysMap = containers.Map('KeyType', 'char', 'ValueType', 'any');

% A cell array of StageTask object arrays that will form the output of this
% function.
%
% Each cell is allowed to be an array of 0 or more StageTask objects that
% are scheduled to run simultaneously. At the end of this function, each
% non-empty cell is combined into a single output StageTask object. Empty
% cells are ignored.
%
% Stage task 1 is reserved for any broadcasted constants from the input
% graph. This is initialized to an empty array as a placeholder.
stageTasks = {StageTask.empty(0, 1)};

% A cell array of partition strategies corresponding to stageTasks.
%
% Each cell is the partition strategy that corresponds to all StageTask
% objects in the corresponding cell of stageTasks.
%
% Stage task 1 is reserved for any broadcasted constants from the input
% graph. This reservation is enforced by having the partition strategy for
% stage task 1 initialized to BroadcastPartitionStrategy before parsing the
% input graph.
independentPartitionStrategies = {BroadcastPartitionStrategy()};

executionTasks = taskGraph.Tasks;
outputTasks = taskGraph.OutputTasks;
% Tasks are visited in the order of taskGraph.Tasks. Every input of a task
% must already have been visited, otherwise the map lookups performed by
% iGetFromMap on task.InputIds will error.
for ii = 1:numel(executionTasks)
    task = executionTasks(ii);
    isRequiredForGather = ismember(task, outputTasks);
    
    % Get the pieces of the tree upstream from this task that will be
    % combined into it as well as the minimum index into the output
    % independent tasks that all pieces of upstream tree are available for
    % use. The trailing 1 passed to iGetFromMap guarantees that a task with
    % no inputs gets a minimum schedule index of 1.
    inputNodes = iGetFromMap(passForwardNodeMap, task.InputIds, CompositeDataProcessorBuilder.empty());
    minScheduleIndex = max(iGetFromMap(minScheduleIndexMap, task.InputIds, 1));
    dependencies = unique(iGetFromMap(taskDependenciesMap, task.InputIds, ExecutionTask.empty()));
    cacheEntryKeys = unique(iGetFromMap(cacheEntryKeysMap, task.InputIds, CacheEntryKey.empty()));
    if ~isequal(task.CacheLevel, 'None')
        cacheEntryKeys = [cacheEntryKeys; task.CacheEntryKey]; %#ok<AGROW>
    end
    
    [passForwardNode, actionableNodes, requiresRead] = ...
        iWrapTask(task, inputNodes, isRequiredForGather, factories);
    passForwardNodeMap(task.Id) = passForwardNode;
    
    minDownstreamScheduleIndex = minScheduleIndex;
    if ~isempty(actionableNodes)
        % This task does generate an actionable item. As such, we want to
        % find the first output independent task that is compatible.
        scheduleIndex = iFindScheduleIndex(independentPartitionStrategies, ...
            task.ExecutionPartitionStrategy, minScheduleIndex);
        
        % If downstream tasks must read this task's output from storage,
        % that output only exists once this stage has run, so they must be
        % scheduled in a strictly later stage.
        minDownstreamScheduleIndex = scheduleIndex + requiresRead;
        
        stageTask = StageTask.createFromSingleTask(actionableNodes, ...
            task.ExecutionPartitionStrategy, task, dependencies, cacheEntryKeys, isRequiredForGather);
        if scheduleIndex > numel(independentPartitionStrategies)
            % No compatible existing stage; open a new one.
            stageTasks{scheduleIndex} = stageTask;
            independentPartitionStrategies{scheduleIndex} = task.ExecutionPartitionStrategy;
        else
            stageTasks{scheduleIndex} = [stageTasks{scheduleIndex}; stageTask];
        end
    end
    minScheduleIndexMap(task.Id) = minDownstreamScheduleIndex;
    
    if requiresRead
        % This task requires downstream tasks to read its output from
        % either shuffle-sort or broadcast.
        taskDependenciesMap(task.Id) = task;
        cacheEntryKeysMap(task.Id) = CacheEntryKey.empty();
    else
        % Downstream tasks will combine with the evaluation of this task
        % and so inherit its dependencies and cache entry keys.
        taskDependenciesMap(task.Id) = dependencies;
        cacheEntryKeysMap(task.Id) = cacheEntryKeys;
    end
end

for ii = 1:numel(stageTasks)
    % If there are multiple data processors that can run simultaneously, we
    % combine them into one task.
    if numel(stageTasks{ii}) > 1
        stageTasks{ii} = combine(stageTasks{ii}, independentPartitionStrategies{ii});
    end
end
stageTasks = vertcat(stageTasks{:}, StageTask.empty());

% Find the first index, at or after minScheduleIndex, whose partition
% strategy is equal to partitionStrategy. Returns
% numel(partitionStrategies) + 1 if there is no such index.
function scheduleIndex = iFindScheduleIndex(partitionStrategies, partitionStrategy, minScheduleIndex)
scheduleIndex = numel(partitionStrategies) + 1;
for jj = minScheduleIndex : numel(partitionStrategies)
    if isequal(partitionStrategy, partitionStrategies{jj})
        scheduleIndex = jj;
        return;
    end
end

% Wrap the provided task in CompositeDataProcessorBuilder
% instances. This will return three things:
%  - passForwardNode: A composite DataProcessor factory that is required to
%    be combined into all direct downstream tasks.
%  - actionableNodes: A (possibly empty) array of composite DataProcessor
%    factories that are required to be fully evaluated before the
%    pass-forward data processor can be used.
%  - requiresRead: A logical scalar that specifies whether the
%    passForwardNode reads the task output back from intermediate storage
%    (shuffle-sort or broadcast) rather than encapsulating the evaluation
%    itself.
function [passForwardNode, actionableNodes, requiresRead] = iWrapTask(task, inputCompositeFactories, isRequiredForGather, factories)
import matlab.bigdata.internal.executor.BroadcastProcessor;
import matlab.bigdata.internal.executor.CompositeDataProcessorBuilder;
import matlab.bigdata.internal.executor.ConstantProcessor;
import matlab.bigdata.internal.executor.OutputCommunicationType;
import matlab.bigdata.internal.io.CacheProcessor;
import matlab.bigdata.internal.io.LocalReadProcessor;
import matlab.bigdata.internal.io.LocalWriteProcessor;

isAnyToAny = task.OutputCommunicationType == OutputCommunicationType.AnyToAny;
if isAnyToAny
    % AnyToAny communication pattern requires to emit a target partition
    % index for each output chunk. This is to ensure that information
    % bubbles up through CompositeDataProcessor.
    requiresInputPartitionIndices = false;
    requiresOutputPartitionIndices = true;
    numOutputPartitions = factories.NumAnyToAnyOutputPartitions;
    taskNode = CompositeDataProcessorBuilder(inputCompositeFactories, task.DataProcessorFactory, ...
        requiresOutputPartitionIndices, requiresInputPartitionIndices, numOutputPartitions);
else
    taskNode = CompositeDataProcessorBuilder(inputCompositeFactories, task.DataProcessorFactory);
end

% Whether downstream tasks read this task's output back from a broadcast.
% The broadcast must then also be written below, so the same condition
% drives both decisions. Previously the write only covered explicit
% Broadcast communication, so a non-gather pass-boundary task with a
% broadcast output partition strategy read from an undefined getter.
isBroadcastOutput = task.OutputCommunicationType == OutputCommunicationType.Broadcast ...
    || (task.IsPassBoundary && task.OutputPartitionStrategy.IsBroadcast);

actionableNodes = CompositeDataProcessorBuilder.empty();
% Both gather and broadcast will require a write to broadcast. There is a
% lack of symmetry specifically for gather as we want to avoid losing
% partition information for non-broadcast intermediate data.
if isRequiredForGather || isBroadcastOutput
    [setterFunction, getterFunction] = factories.CreateBroadcastStorageFunction(task);
    actionableNodes = CompositeDataProcessorBuilder(taskNode, ...
        BroadcastProcessor.createFactory(setterFunction));
end

if isBroadcastOutput
    % For broadcast variables, downstream tasks can simply read from the
    % broadcast.
    passForwardNode = CompositeDataProcessorBuilder([], ...
        ConstantProcessor.createFactoryFromFunction(getterFunction));
    requiresRead = true;
elseif isAnyToAny || task.OutputCommunicationType == OutputCommunicationType.AllToOne ...
        || task.IsPassBoundary
    % For tasks that could have arbitrarily large output but require
    % communication either to other partitions or a future pass, we write
    % all data to a shuffle algorithm. Typically this means writing to
    % disk.
    [writerFactory, readerFactory] = factories.CreateShuffleStorageFunction(task);
    
    % In any-to-any communication, the LocalWriteProcessor requires the
    % output partition keys on top of the data.
    requireOutputPartitionIndices = false;
    requireInputPartitionIndices = isAnyToAny;
    actionableNodes(end + 1) = CompositeDataProcessorBuilder(taskNode, ...
        @(partition) LocalWriteProcessor(writerFactory(partition)), ...
        requireOutputPartitionIndices, requireInputPartitionIndices);
    
    passForwardNode = CompositeDataProcessorBuilder([], ...
        @(partition) LocalReadProcessor(readerFactory(partition)));
    requiresRead = true;
else
    % Otherwise no communication is needed. Here, we simply pass the task
    % node itself to be combined forward into downstream tasks, optionally
    % wrapped in a cache processor when a cache store is available, the
    % task is not executed as a broadcast and its CacheLevel is 'All'.
    passForwardNode = taskNode;
    if ~isempty(factories.GetCacheStoreFunction) && ~task.ExecutionPartitionStrategy.IsBroadcast && strcmp(task.CacheLevel, 'All')
        passForwardNode = CompositeDataProcessorBuilder(passForwardNode, ...
            CacheProcessor.createFactory(task.CacheEntryKey, factories.GetCacheStoreFunction));
    end
    requiresRead = false;
end

% Helper function that obtains and vertically concatenates a collection of
% values from a map and collection of keys. emptyValue is always appended,
% both to fix the type of an empty result and, for numeric maps, to supply
% a floor value (e.g. 1 for schedule indices). Errors if any key is missing
% from the map.
function values = iGetFromMap(map, keys, emptyValue)
values = cell(size(keys));
for ii = 1:numel(keys)
    values{ii} = map(keys{ii});
end
values = vertcat(values{:}, emptyValue);