function stageTasks = convertToIndependentTasks(taskGraph, varargin)
%CONVERTTOINDEPENDENTTASKS Convert the provided task graph into a
%collection of stage tasks that can be evaluated sequentially.
%
% Syntaxes:
%  stageTasks = convertToIndependentTasks(taskGraph,...
%       'CreateShuffleStorageFunction',@myCreateShuffleStorageFunction,...
%       'CreateBroadcastStorageFunction',@myCreateBroadcastStorageFunction);
%
%  [..] = convertToIndependentTasks(..,'GetCacheStoreFunction',@myGetCacheStoreFunction,..)
%
%  [..] = convertToIndependentTasks(..,'NumAnyToAnyOutputPartitions',N,..)
%
% The output, stageTasks, will be an array of StageTask objects that is
% equivalent to taskGraph. Evaluating the stage tasks sequentially in order
% is equivalent to evaluating the task graph optimally. Specifically, each
% stage task will perform one or more side-effects, writing data either to
% a shuffle-sort storage or to a broadcast variable. Once complete, the
% output associated with each of taskGraph.OutputTasks is stored as a
% broadcast variable.
%
% The syntax for the createShuffle function is:
%   [writerFactory, readerFactory] = createShuffle(task)
%
% Where each factory must be serializable and must generate an instance of
% matlab.bigdata.internal.io.Writer and matlab.bigdata.internal.io.Reader
% respectively. This will be called by the MATLAB client during scheduling.
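%
% As an illustration only, a minimal sketch of such a function might look
% like the following, where MyShuffleWriter and MyShuffleReader stand in
% for hypothetical subclasses of the Writer and Reader interfaces above,
% backed by a shared folder:
%
%   function [writerFactory, readerFactory] = myCreateShuffleStorageFunction(task)
%       folder = tempname();
%       mkdir(folder);
%       % Both factories capture only the folder path, so they remain
%       % serializable.
%       writerFactory = @(partition) MyShuffleWriter(folder, partition);
%       readerFactory = @(partition) MyShuffleReader(folder, partition);
%   end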
%
% The syntax for the createBroadcast function is:
%   [setterFactory, getterFactory] = createBroadcast(task)
%
% Where the getter is an inputless function handle that emits the broadcast
% value output of the given task, and the setter is a function handle of
% the form setFcn(partition, value) that adds the given partition's value
% to the broadcast value.
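%
% As an illustration only, a minimal single-process sketch using a
% containers.Map as a stand-in for real broadcast storage (iSet and
% iGetAll are hypothetical helpers; a real implementation would need
% storage reachable from both client and workers):
%
%   function [setterFactory, getterFactory] = myCreateBroadcastStorageFunction(task)
%       % containers.Map is a handle class, so both handles share one store.
%       store = containers.Map('KeyType', 'double', 'ValueType', 'any');
%       setterFactory = @(partition, value) iSet(store, partition, value);
%       getterFactory = @() iGetAll(store);
%   end
%   function iSet(store, partition, value)
%       store(partition) = value;
%   end
%   function value = iGetAll(store)
%       partitionValues = values(store);
%       value = vertcat(partitionValues{:});
%   end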
%
% The syntax for the getCacheStore function is:
%   cacheStore = getCacheStore()
%
% Where cacheStore is an instance of matlab.bigdata.internal.io.CacheStore.
% This will be called by a MATLAB worker during execution.
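%
% As an illustration only, a minimal sketch that returns one store per
% worker process (createMyCacheStore is a hypothetical constructor):
%
%   function cacheStore = myGetCacheStoreFunction()
%       persistent store
%       if isempty(store)
%           store = createMyCacheStore();
%       end
%       cacheStore = store;
%   end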
%
% If the executor supports multiple output partitions for AnyToAny
% communication, it can specify how many using the
% 'NumAnyToAnyOutputPartitions' name-value pair.
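%
% Putting these together, a full call supplying every option might look
% like the following, with the my* function handles defined as sketched
% above:
%
%   stageTasks = convertToIndependentTasks(taskGraph, ...
%       'CreateShuffleStorageFunction', @myCreateShuffleStorageFunction, ...
%       'CreateBroadcastStorageFunction', @myCreateBroadcastStorageFunction, ...
%       'GetCacheStoreFunction', @myGetCacheStoreFunction, ...
%       'NumAnyToAnyOutputPartitions', 4);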
%
% The implementation applies the following graph transformations:
%  1. Replace all edges of the graph that require communication with a
%  write/read pair against intermediate storage (either shuffle-sort
%  storage or a broadcast).
%  2. Inject a cache task into all edges of the graph that represent
%  intermediate state that is marked for caching.
%  3. Wrap each still-connected subgraph into a single task that has no
%  task inputs or outputs.
%  4. Combine tasks that can be scheduled to run simultaneously.
%

%   Copyright 2016 The MathWorks, Inc.

import matlab.bigdata.internal.executor.BroadcastPartitionStrategy;
import matlab.bigdata.internal.executor.CacheEntryKey;
import matlab.bigdata.internal.executor.CompositeDataProcessorBuilder;
import matlab.bigdata.internal.executor.ExecutionTask;
import matlab.bigdata.internal.executor.StageTask;

p = inputParser;
p.addParameter('CreateShuffleStorageFunction', []);
p.addParameter('CreateBroadcastStorageFunction', []);
p.addParameter('GetCacheStoreFunction', []);
p.addParameter('NumAnyToAnyOutputPartitions', 1);
p.parse(varargin{:});
factories = p.Results;

% A map from task ID to an instance of CompositeDataProcessorBuilder that
% generates the same output but that can be combined forward into
% downstream tasks. A task ID can map to empty, in which case combining is
% not possible.
passForwardNodeMap = containers.Map('KeyType', 'char', 'ValueType', 'any');

% A map of task ID to the minimum index into the output execution graph at
% which the output of the task is available for consumption.
minScheduleIndexMap = containers.Map('KeyType', 'char', 'ValueType', 'double');

% A map of task ID to a list of ExecutionTask objects that represent the
% shuffle-sort inputs if that task were converted to an independent task.
taskDependenciesMap = containers.Map('KeyType', 'char', 'ValueType', 'any');

% A map of task ID to a list of cache entry keys that would be associated
% with that task if it were converted to an independent task.
cacheEntryKeysMap = containers.Map('KeyType', 'char', 'ValueType', 'any');

% A cell array of StageTask object arrays that will form the output of this
% function.
%
% Each cell is allowed to be an array of 0 or more StageTask objects that
% are scheduled to run simultaneously. At the end of this function, each
% non-empty cell is combined into a single output StageTask object. Empty
% cells are ignored.
%
% Stage task 1 is reserved for any broadcasted constants from the input
% graph. This is initialized to an empty array as a placeholder.
%
stageTasks = {StageTask.empty(0, 1)};

% A cell array of partition strategies corresponding to stageTasks.
%
% Each cell is the partition strategy that corresponds to all StageTask
% objects in the corresponding cell of stageTasks.
%
% Stage task 1 is reserved for any broadcasted constants from the input
% graph. This reservation is enforced by having the partition strategy for
% stage task 1 initialized to BroadcastPartitionStrategy before parsing the
% input graph.
independentPartitionStrategies = {BroadcastPartitionStrategy()};

executionTasks = taskGraph.Tasks;
outputTasks = taskGraph.OutputTasks;
for ii = 1:numel(executionTasks)
    task = executionTasks(ii);
    isRequiredForGather = ismember(task, outputTasks);
    
    % Get the pieces of the tree upstream from this task that will be
    % combined into it, as well as the minimum index into the output
    % independent tasks at which all pieces of the upstream tree are
    % available for use.
    inputNodes = iGetFromMap(passForwardNodeMap, task.InputIds, CompositeDataProcessorBuilder.empty());
    minScheduleIndex = max(iGetFromMap(minScheduleIndexMap, task.InputIds, 1));
    dependencies = unique(iGetFromMap(taskDependenciesMap, task.InputIds, ExecutionTask.empty()));
    cacheEntryKeys = unique(iGetFromMap(cacheEntryKeysMap, task.InputIds, CacheEntryKey.empty()));
    
    if ~isequal(task.CacheLevel, 'None')
        cacheEntryKeys = [cacheEntryKeys; task.CacheEntryKey]; %#ok<AGROW>
    end
    
    [passForwardNodeMap(task.Id), actionableNodes, requiresRead] = ...
        iWrapTask(task, inputNodes, isRequiredForGather, factories);
    
    minDownstreamScheduleIndex = minScheduleIndex;
    if ~isempty(actionableNodes)
        % This task generates at least one actionable item, so find the
        % first output stage whose partition strategy is compatible.
        scheduleIndex = numel(independentPartitionStrategies) + 1;
        for jj = minScheduleIndex : numel(independentPartitionStrategies)
            if isequal(task.ExecutionPartitionStrategy, independentPartitionStrategies{jj})
                scheduleIndex = jj;
                break;
            end
        end
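        % requiresRead is a logical scalar, so adding it pushes the minimum
        % schedule index for downstream consumers one stage later whenever
        % this task's output must first be written to storage.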
        minDownstreamScheduleIndex = scheduleIndex + requiresRead;
        
        stageTask = StageTask.createFromSingleTask(actionableNodes, ...
            task.ExecutionPartitionStrategy, task, dependencies, cacheEntryKeys, isRequiredForGather);
        
        if scheduleIndex > numel(independentPartitionStrategies)
            stageTasks{scheduleIndex} = stageTask;
            independentPartitionStrategies{scheduleIndex} = task.ExecutionPartitionStrategy;
        else
            stageTasks{scheduleIndex} = [stageTasks{scheduleIndex}; stageTask];
        end
    end
    
    minScheduleIndexMap(task.Id) = minDownstreamScheduleIndex;
    if requiresRead
        % This task requires downstream tasks to read its output from
        % either shuffle-sort or broadcast.
        taskDependenciesMap(task.Id) = task;
        cacheEntryKeysMap(task.Id) = CacheEntryKey.empty();
    else
        % If not, downstream tasks will combine with the evaluation of this
        % task and so inherit its dependencies and cache entry keys. The
        % minimum schedule index recorded above is then the earliest stage
        % in which that combined evaluation can run.
        taskDependenciesMap(task.Id) = dependencies;
        cacheEntryKeysMap(task.Id) = cacheEntryKeys;
    end
end

for ii = 1:numel(stageTasks)
    % If a cell contains multiple stage tasks that can run simultaneously,
    % combine them into a single task.
    if numel(stageTasks{ii}) > 1
        stageTasks{ii} = combine(stageTasks{ii}, independentPartitionStrategies{ii});
    end
end
stageTasks = vertcat(stageTasks{:}, StageTask.empty());


% Wrap the provided task in CompositeDataProcessorBuilder instances. This
% returns three things:
%  - passForwardNode: A composite DataProcessor factory that is required to
%  be combined into all direct downstream tasks.
%  - actionableNodes: An array of zero or more composite DataProcessor
%  factories that must be fully evaluated before the pass-forward data
%  processor can be used.
%  - requiresRead: A logical scalar that specifies whether the
%  passForwardNode reads from intermediate storage rather than
%  encapsulating the evaluation itself.
function [passForwardNode, actionableNodes, requiresRead] = iWrapTask(task, inputCompositeFactories, isRequiredForGather, factories)
import matlab.bigdata.internal.executor.BroadcastProcessor;
import matlab.bigdata.internal.executor.CompositeDataProcessorBuilder;
import matlab.bigdata.internal.executor.ConstantProcessor;
import matlab.bigdata.internal.executor.OutputCommunicationType;
import matlab.bigdata.internal.io.CacheProcessor;
import matlab.bigdata.internal.io.LocalReadProcessor;
import matlab.bigdata.internal.io.LocalWriteProcessor;

if task.OutputCommunicationType == OutputCommunicationType.AnyToAny
    % The AnyToAny communication pattern requires the task to emit a target
    % partition index for each output chunk. This ensures that the
    % information bubbles up through CompositeDataProcessor.
    requiresInputPartitionIndices = false;
    requiresOutputPartitionIndices = true;
    numOutputPartitions = factories.NumAnyToAnyOutputPartitions;
    taskNode = CompositeDataProcessorBuilder(inputCompositeFactories, task.DataProcessorFactory,...
        requiresOutputPartitionIndices, requiresInputPartitionIndices, numOutputPartitions);
else
    taskNode = CompositeDataProcessorBuilder(inputCompositeFactories, task.DataProcessorFactory);
end

actionableNodes = CompositeDataProcessorBuilder.empty();

% Both gather and broadcast require a write to broadcast storage. The
% asymmetry specifically for gather exists because we want to avoid losing
% partition information for non-broadcast intermediate data.
if isRequiredForGather || task.OutputCommunicationType == OutputCommunicationType.Broadcast
    [setterFunction, getterFunction] = factories.CreateBroadcastStorageFunction(task);
    
    actionableNodes = CompositeDataProcessorBuilder(taskNode, ...
        BroadcastProcessor.createFactory(setterFunction));
end

if task.OutputCommunicationType == OutputCommunicationType.Broadcast ...
        || task.IsPassBoundary && task.OutputPartitionStrategy.IsBroadcast
    
    % For broadcast variables, downstream tasks can simply read from the
    % broadcast.
    passForwardNode = CompositeDataProcessorBuilder([], ...
        ConstantProcessor.createFactoryFromFunction(getterFunction));
    requiresRead = true;
elseif any(task.OutputCommunicationType == [OutputCommunicationType.AnyToAny, OutputCommunicationType.AllToOne]) ...
        || task.IsPassBoundary
    % For tasks that could have arbitrarily large output but require
    % communication either to other partitions or a future pass, we write
    % all data to a shuffle algorithm. Typically this means writing to
    % disk.
    
    [writerFactory, readerFactory] = factories.CreateShuffleStorageFunction(task);
    
    % In any-to-any communication, the LocalWriteProcessor requires the
    % target output partition indices alongside the data.
    requireOutputPartitionIndices = false;
    requireInputPartitionIndices = task.OutputCommunicationType == OutputCommunicationType.AnyToAny;
    actionableNodes(end + 1) = CompositeDataProcessorBuilder(taskNode, ...
        @(partition) LocalWriteProcessor(writerFactory(partition)), ...
        requireOutputPartitionIndices, requireInputPartitionIndices);
    
    passForwardNode = CompositeDataProcessorBuilder([], ...
        @(partition) LocalReadProcessor(readerFactory(partition)));
    requiresRead = true;
else
    % Otherwise no communication is needed. Here, we simply pass the task
    % node itself to be combined forward into downstream tasks.
    passForwardNode = taskNode;
    if ~isempty(factories.GetCacheStoreFunction) && ~task.ExecutionPartitionStrategy.IsBroadcast && strcmp(task.CacheLevel, 'All')
        passForwardNode = CompositeDataProcessorBuilder(passForwardNode, ...
            CacheProcessor.createFactory(task.CacheEntryKey, factories.GetCacheStoreFunction));
    end
    requiresRead = false;
end

% Helper function that obtains and vertically concatenates a collection of
% values from a map given a collection of keys.
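%
% For example, with hypothetical map contents:
%   m = containers.Map({'a', 'b'}, {[1; 2], 3});
%   iGetFromMap(m, {'a', 'b'}, zeros(0, 1))  % returns [1; 2; 3]
% The emptyValue argument pins down the class and shape of the result when
% keys is empty.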
function values = iGetFromMap(map, keys, emptyValue)
values = cell(size(keys));
for ii = 1:numel(keys)
    values{ii} = map(keys{ii});
end
values = vertcat(values{:}, emptyValue);