% gusucode.com > bigdata toolbox MATLAB source code > bigdata/+matlab/+bigdata/+internal/+executor/StageTask.m

    %StageTask
% A class that represents a collection of execution tasks that can be run
% simultaneously.
%
% Specifically, a collection of execution tasks can be run simultaneously
% if they can run from start to finish without any communication between
% partitions and without needing to buffer the entirety of a potentially
% large intermediate partition.
%
% Typically, the boundary of stages will be the points where data is
% written into either a shuffle-sort storage on disk or to a broadcast map.
%

%   Copyright 2016 The MathWorks, Inc.

classdef StageTask < handle
    % Handle class that bundles a DataProcessor factory, an execution
    % partition strategy, and the shuffle / broadcast / cache bookkeeping
    % for one stage of execution.
    properties (SetAccess = immutable)
        % A factory for DataProcessor instances. This is a function handle
        % or object that obeys the feval contract, that will be called with
        % the syntax:
        %   function dataProcessor = myFactory(partition)
        %
        % This will typically be the combination of several
        % DataProcessor factories wrapped in a CompositeDataProcessorFactory.
        DataProcessorFactory
        
        % The strategy for partitioning the execution of this task.
        %
        % A data processor will be instantiated for each execution
        % partition.
        %
        % Note, this must be compatible with the partitioning of the inputs
        % to this task. This must match the output partition strategy for
        % every non-broadcast input task.
        ExecutionPartitionStrategy;
        
        % An array of ExecutionTask instances that correspond to intermediate
        % data to be received by this stage task by shuffle-sort.
        InputShuffles;
        
        % An array of ExecutionTask instances that correspond to intermediate
        % data that will be sent by this stage task by shuffle-sort.
        OutputShuffles;
        
        % An array of ExecutionTask instances that correspond to broadcast
        % data to be received by this stage task.
        InputBroadcasts;
        
        % An array of ExecutionTask instances that correspond to broadcast
        % data that is generated by this stage task.
        OutputBroadcasts;
        
        % An array of CacheEntryKey instances that correspond to the cache
        % entries that this stage task will generate or consume.
        CacheEntryKeys;
    end
    
    methods (Static)
        % Create a Stage Task that represents the accumulation of a single
        % ExecutionTask with a collection of its upstream dependencies.
        %
        % - stageProcessorFactory: The DataProcessor factory that will
        % construct the processors that perform the underlying evaluation.
        % If more than one factory is given, they are first combined into
        % a single factory.
        % - executionPartitionStrategy: The partition strategy for how this
        % stage task should be scheduled for execution.
        % - originalTask: The original ExecutionTask that represents the
        % output.
        % - dependencies: An array of ExecutionTasks whose outputs are
        % required for this stage task.
        % - cacheEntryKeys: An array of CacheEntryKey that correspond with
        % the underlying ExecutionTask instances.
        % - requiresGather: (optional, default false) Logical scalar. When
        % true, originalTask is recorded in OutputBroadcasts even if it is
        % not otherwise classified as a broadcast output.
        %
        function obj = createFromSingleTask(stageProcessorFactory, executionPartitionStrategy, ...
                originalTask, dependencies, cacheEntryKeys, requiresGather)
            import matlab.bigdata.internal.executor.OutputCommunicationType;
            import matlab.bigdata.internal.executor.StageTask;
            
            % Allow the five-argument call form: without an explicit
            % request, no gather is forced.
            if nargin < 6
                requiresGather = false;
            end
            
            if numel(stageProcessorFactory) > 1
                stageProcessorFactory = iCombineFactories(stageProcessorFactory);
            end
            
            isDependencyBroadcast = arrayfun(@(x)x.OutputPartitionStrategy.IsBroadcast, dependencies);
            % This invariant is the case because a dependency constitutes a
            % read from some intermediate storage, currently this can be
            % only either shuffle or broadcast.
            isDependencyShuffle = ~isDependencyBroadcast;
            
            % The output is a broadcast if the task communicates by
            % broadcast, or if it is a pass boundary whose output partition
            % strategy is broadcast.
            isOutputBroadcast = originalTask.OutputCommunicationType == OutputCommunicationType.Broadcast ...
                || (originalTask.IsPassBoundary && originalTask.OutputPartitionStrategy.IsBroadcast);

            % The output is a shuffle if it is a non-broadcast pass
            % boundary, or if the task communicates AllToOne or AnyToAny.
            isOutputShuffle = (~isOutputBroadcast && originalTask.IsPassBoundary) ...
                || any(originalTask.OutputCommunicationType == [OutputCommunicationType.AllToOne, OutputCommunicationType.AnyToAny]);
            
            % Indexing originalTask with a scalar logical yields either the
            % task itself (true) or an empty array (false).
            obj = StageTask(stageProcessorFactory, executionPartitionStrategy, ...
                dependencies(isDependencyShuffle), originalTask(isOutputShuffle), ...
                dependencies(isDependencyBroadcast), originalTask(isOutputBroadcast || requiresGather), ...
                cacheEntryKeys);
        end
    end
    
    methods
        % Combine an array of StageTask instances into a single StageTask
        % that represents the collection.
        %
        % - objs: The array of StageTask instances to merge.
        % - newExecutionPartitionStrategy: The partition strategy assigned
        % to the combined stage task.
        %
        % The factories of all inputs are wrapped into one combined
        % factory; the shuffle, broadcast and cache key arrays are
        % concatenated and de-duplicated with unique.
        function obj = combine(objs, newExecutionPartitionStrategy)
            import matlab.bigdata.internal.executor.CacheEntryKey;
            import matlab.bigdata.internal.executor.CompositeDataProcessorBuilder;
            import matlab.bigdata.internal.executor.ExecutionTask;
            import matlab.bigdata.internal.executor.StageTask;
            
            % A typed empty array is appended to each concatenation,
            % presumably so the result keeps the expected class even when
            % every contribution is empty.
            nodes = vertcat(objs.DataProcessorFactory, CompositeDataProcessorBuilder.empty());
            dataProcessorFactory = iCombineFactories(nodes);
            
            inputShuffles = unique(vertcat(objs.InputShuffles, ExecutionTask.empty()));
            outputShuffles = unique(vertcat(objs.OutputShuffles, ExecutionTask.empty()));
            inputBroadcasts = unique(vertcat(objs.InputBroadcasts, ExecutionTask.empty()));
            outputBroadcasts = unique(vertcat(objs.OutputBroadcasts, ExecutionTask.empty()));
            cacheEntryKeys =  unique(vertcat(objs.CacheEntryKeys, CacheEntryKey.empty()));
            
            obj = StageTask(dataProcessorFactory, newExecutionPartitionStrategy, ...
                inputShuffles, outputShuffles, inputBroadcasts, outputBroadcasts, cacheEntryKeys);
        end
    end
    
    methods
        % Constructor used by the createFromSingleTask and combine methods.
        % Each argument is stored verbatim in the immutable property of
        % the corresponding name.
        %
        % NOTE(review): this constructor is meant to be private, but the
        % enclosing methods block declares no Access attribute, so it is
        % publicly callable. Confirm whether it should be restricted.
        function obj = StageTask(dataProcessorFactory, executionPartitionStrategy, ...
                inputShuffles, outputShuffles, inputBroadcasts, outputBroadcasts, cacheEntryKeys)
            obj.DataProcessorFactory = dataProcessorFactory;
            obj.ExecutionPartitionStrategy = executionPartitionStrategy;
            obj.InputShuffles = inputShuffles;
            obj.OutputShuffles = outputShuffles;
            obj.InputBroadcasts = inputBroadcasts;
            obj.OutputBroadcasts = outputBroadcasts;
            obj.CacheEntryKeys = cacheEntryKeys;
        end
    end
end

% Helper that wraps an array of CompositeDataProcessorBuilder nodes into a
% single CompositeDataProcessorBuilder. The wrapper is built from the nodes
% themselves plus a factory obtained from TerminalProcessor.createFactory,
% which is given the number of nodes.
function combinedFactory = iCombineFactories(nodes)
import matlab.bigdata.internal.executor.CompositeDataProcessorBuilder;
import matlab.bigdata.internal.executor.TerminalProcessor;

numNodes = numel(nodes);
terminalFactory = TerminalProcessor.createFactory(numNodes);
combinedFactory = CompositeDataProcessorBuilder(nodes, terminalFactory);
end