gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+executor/ExecutionTask.m

    %ExecutionTask
% A description of one piece of execution across multiple partitions that
% generates a single partitioned output. This execution can take as input,
% zero or more partitioned arrays generated by previous ExecutionTask
% instances.
%
% Each task includes the following pieces of information:
%  - A list of tasks that act as the input to this task.
%  - A factory to construct one data processor for each execution partition.
%  - How the execution of the task should be partitioned.
%  - How the output of the task will be partitioned.
%  - What kind of communication is needed to send the output to the correct
%  location.
%  - What form of caching is desired for the output.
%
% To construct an Execution task, use one of the following factory methods:
%  createSimpleTask - Create a task with non-communicating output.
%  createAllToOneTask - Create a task where all output is sent to a single partition.
%  createAnyToAneTask is not a method; the factory methods are:
%

%   Copyright 2015-2016 The MathWorks, Inc.

classdef (Sealed) ExecutionTask < handle
    properties (SetAccess = immutable)
        % A numeric ID for this task.
        %
        % This is not guaranteed to be unique across multiple MATLAB
        % sessions.
        Id
        
        % An ordered list of IDs representing the inputs to this
        % execution task.
        %
        % For the Nth input ID in this list, all output from the
        % corresponding ExecutionTask that is intended for partition P will
        % be passed to the data processor of execution partition P. It will
        % be passed in chunks as the Nth varargin input of the
        % DataProcessor/process method.
        InputIds
        
        % A string representing the type of output with respect to
        % communication. This can be one of:
        % 'Simple' - All output of each DataProcessor will be sent to the
        % same partition index as execution partition index.
        % 'AllToOne' - All output of all DataProcessors will be sent to
        % output partition 1.
        % 'AnyToAny' - Any output of each DataProcessor can go to any
        % output partition. This allows any form of communication pattern.
        % 'Broadcast' - All output of each DataProcessor will be sent to
        % all output partitions.
        OutputCommunicationType
        
        % The level of caching requested by the algorithm that generated
        % this execution task. This can be one of:
        %  'None' - No caching is requested.
        %  'All'  - Cache requested to all forms of available storage.
        CacheLevel;
        
        % A key that represents any cached entries resulting from this
        % execution task. These entries can be reused by holding onto the
        % key and providing it to future instances of ExecutionTask.
        CacheEntryKey;
        
        % A logical scalar that indicates whether this task requires a full
        % pass of the underlying data before the output can be used by
        % downstream tasks.
        %
        % This can be false only if it is safe to evaluate this task in the
        % same pass of the underlying data as all downstream tasks who
        % depend on the output of this task.
        IsPassBoundary;
        
        % The strategy for partitioning the execution of this task.
        %
        % A data processor will be instantiated for each execution
        % partition.
        %
        % Note, this must be compatible with the partitioning of the inputs
        % to this task. This must match the output partition strategy for
        % every non-broadcast input task.
        ExecutionPartitionStrategy
        
        % The strategy for partitioning the output of this task.
        %
        % This specifies how many partitions exist in the partitioned array
        % output of this task. In most cases, this is fixed by the
        % combination of the output communication type and execution
        % partition strategy.
        OutputPartitionStrategy
        
        % A factory for DataProcessor instances. This is a function handle
        % or object that obeys the feval contract, that will be called with
        % the syntax:
        %   function dataProcessor = myFactory(partition)
        %
        % Where partition is an instance of DatastorePartition if the
        % execution partition strategy is based on a datastore or
        % SimplePartition otherwise.
        DataProcessorFactory
    end
    
    properties (Access = private, Constant)
        % The means by which this class receives unique IDs.
        IdFactory = matlab.bigdata.internal.util.UniqueIdFactory('ExecutionTask');
    end
    
    methods (Static)
        %CREATESIMPLETASK Create a task with non-communicating output.
        %
        % All output from each data processor associated with this task
        % will be sent to the same output partition index as execution
        % partition index.
        %
        % Syntax:
        %  obj = ExecutionTask.createSimpleTask(inputTasks,dataProcessorFactory,name1,value1,...);
        %
        % Required Inputs:
        %  - inputTasks: An ordered list of ExecutionTask instances that
        %  represent the inputs to this task. This is allowed to be empty.
        %  - dataProcessorFactory: A data processor factory that will be
        %  used to create the underlying data processors for this task.
        %
        % Parameter Inputs:
        %  - 'ExecutionPartitionStrategy': The strategy for how to
        %  partition execution. If this is an integer, a fixed partitioning
        %  will be used. If this is a datastore, partitioning will be based
        %  on the datastore. If this is not set, the execution environment
        %  has full control over the partitioning.
        %  - 'CacheLevel': The allowed ways to do caching. This is either
        %  'None' for no caching, or 'All' for caching to disk and memory.
        %  - 'IsPassBoundary': A logical scalar that indicates whether this
        %  task requires a full pass of the underlying data before the
        %  output can be used by downstream tasks. This can be false only
        %  if it is safe to evaluate this task in the same pass of the
        %  underlying data as all downstream tasks who depend on the output
        %  of this task.
        %
        % Outputs:
        %  - obj: The constructed ExecutionTask instance.
        %
        function obj = createSimpleTask(inputTasks, dataProcessorFactory, varargin)
            import matlab.bigdata.internal.executor.ExecutionTask;
            import matlab.bigdata.internal.executor.OutputCommunicationType;
            inputStruct = iParseFactoryInputs(inputTasks, dataProcessorFactory, varargin{:});
            
            % With Simple communication, output partitioning is forced to
            % equal the execution partitioning: either default it, or check
            % the caller-supplied value agrees.
            if isempty(inputStruct.OutputPartitionStrategy)
                inputStruct.OutputPartitionStrategy = inputStruct.ExecutionPartitionStrategy;
            else
                assert (isequal(inputStruct.OutputPartitionStrategy, inputStruct.ExecutionPartitionStrategy));
            end
            obj = ExecutionTask([], OutputCommunicationType.Simple, inputStruct);
        end
        
        %CREATEALLTOONETASK Create a task where all output is sent to a single partition.
        %
        % All output from each data processor associated with this task
        % will be sent to the output partition 1.
        %
        % Syntax:
        %  obj = ExecutionTask.createAllToOneTask(inputTasks,dataProcessorFactory,name1,value1,...);
        %
        % Required Inputs:
        %  - inputTasks: An ordered list of ExecutionTask instances that
        %  represent the inputs to this task. This is allowed to be empty.
        %  - dataProcessorFactory: A data processor factory that will be
        %  used to create the underlying data processors for this task.
        %
        % Parameter Inputs:
        %  - 'ExecutionPartitionStrategy': The strategy for how to
        %  partition execution. If this is an integer, a fixed partitioning
        %  will be used. If this is a datastore, partitioning will be based
        %  on the datastore. If this is not set, the execution environment
        %  has full control over the partitioning.
        %  - 'IsPassBoundary': A logical scalar that indicates whether this
        %  task requires a full pass of the underlying data before the
        %  output can be used by downstream tasks. This can be false only
        %  if it is safe to evaluate this task in the same pass of the
        %  underlying data as all downstream tasks who depend on the output
        %  of this task.
        %
        % Outputs:
        %  - obj: The constructed ExecutionTask instance.
        %
        function obj = createAllToOneTask(inputTasks, dataProcessorFactory, varargin)
            import matlab.bigdata.internal.executor.ExecutionTask;
            import matlab.bigdata.internal.executor.FixedNumPartitionStrategy;
            import matlab.bigdata.internal.executor.OutputCommunicationType;
            inputStruct = iParseFactoryInputs(inputTasks, dataProcessorFactory, varargin{:});
            
            % AllToOne output always lives in exactly one partition: either
            % default the strategy, or check the caller-supplied one agrees.
            if isempty(inputStruct.OutputPartitionStrategy)
                inputStruct.OutputPartitionStrategy = FixedNumPartitionStrategy(1);
            else
                assert (isequal(inputStruct.OutputPartitionStrategy, FixedNumPartitionStrategy(1)));
            end
            obj = ExecutionTask([], OutputCommunicationType.AllToOne, inputStruct);
        end
        
        %CREATEANYTOANYTASK Create a task where any output could be sent to any partition.
        %
        % Data processor associated with this task are expected to return
        % partition indices alongside the data output. These partition
        % indices will be used to send the corresponding data output to the
        % target partitions.
        %
        % Syntax:
        %  obj = ExecutionTask.createAnyToAnyTask(inputTasks,dataProcessorFactory,name1,value1,...);
        %
        % Required Inputs:
        %  - inputTasks: An ordered list of ExecutionTask instances that
        %  represent the inputs to this task. This is allowed to be empty.
        %  - dataProcessorFactory: A data processor factory that will be
        %  used to create the underlying data processors for this task.
        %
        % Parameter Inputs:
        %  - 'ExecutionPartitionStrategy': The strategy for how to
        %  partition execution. If this is an integer, a fixed partitioning
        %  will be used. If this is a datastore, partitioning will be based
        %  on the datastore. If this is not set, the execution environment
        %  has full control over the partitioning.
        %  - 'OutputPartitionStrategy': The strategy for how the output
        %  will be partitioned. If this is an integer, a fixed partitioning
        %  will be used. If this is a datastore, partitioning will be based
        %  on the datastore. If this is not set, the execution environment
        %  has full control over the partitioning.
        %  - 'IsPassBoundary': A logical scalar that indicates whether this
        %  task requires a full pass of the underlying data before the
        %  output can be used by downstream tasks. This can be false only
        %  if it is safe to evaluate this task in the same pass of the
        %  underlying data as all downstream tasks who depend on the output
        %  of this task.
        %
        % Outputs:
        %  - obj: The constructed ExecutionTask instance.
        %
        function obj = createAnyToAnyTask(inputTasks, dataProcessorFactory, varargin)
            import matlab.bigdata.internal.executor.ArbitraryPartitionStrategy;
            import matlab.bigdata.internal.executor.ExecutionTask;
            import matlab.bigdata.internal.executor.OutputCommunicationType;
            inputStruct = iParseFactoryInputs(inputTasks, dataProcessorFactory, varargin{:});
            
            % Unlike the other factory methods, AnyToAny allows any output
            % partitioning, so a caller-supplied strategy is used verbatim.
            if isempty(inputStruct.OutputPartitionStrategy)
                inputStruct.OutputPartitionStrategy = ArbitraryPartitionStrategy();
            end
            obj = ExecutionTask([], OutputCommunicationType.AnyToAny, inputStruct);
        end
        
        %CREATEBROADCASTTASK Create a task where all output will be sent to all partitions.
        %
        % All output from each data processor associated with this task
        % will be sent to all partitions.
        %
        % Syntax:
        %  obj = ExecutionTask.createBroadcastTask(inputTasks,dataProcessorFactory,name1,value1,...);
        %
        % Required Inputs:
        %  - inputTasks: An ordered list of ExecutionTask instances that
        %  represent the inputs to this task. This is allowed to be empty.
        %  - dataProcessorFactory: A data processor factory that will be
        %  used to create the underlying data processors for this task.
        %
        % Parameter Inputs:
        %  - 'ExecutionPartitionStrategy': The strategy for how to
        %  partition execution. If this is an integer, a fixed partitioning
        %  will be used. If this is a datastore, partitioning will be based
        %  on the datastore. If this is not set, the execution environment
        %  has full control over the partitioning.
        %  - 'IsPassBoundary': A logical scalar that indicates whether this
        %  task requires a full pass of the underlying data before the
        %  output can be used by downstream tasks. This can be false only
        %  if it is safe to evaluate this task in the same pass of the
        %  underlying data as all downstream tasks who depend on the output
        %  of this task.
        %
        % Outputs:
        %  - obj: The constructed ExecutionTask instance.
        %
        function obj = createBroadcastTask(inputTasks, dataProcessorFactory, varargin)
            import matlab.bigdata.internal.executor.BroadcastPartitionStrategy;
            import matlab.bigdata.internal.executor.ExecutionTask;
            import matlab.bigdata.internal.executor.OutputCommunicationType;
            inputStruct = iParseFactoryInputs(inputTasks, dataProcessorFactory, varargin{:});
            
            % Broadcast output always uses the broadcast strategy: either
            % default it, or check the caller-supplied value agrees.
            if isempty(inputStruct.OutputPartitionStrategy)
                inputStruct.OutputPartitionStrategy = BroadcastPartitionStrategy();
            else
                assert (isequal(inputStruct.OutputPartitionStrategy, BroadcastPartitionStrategy()));
            end
            obj = ExecutionTask([], OutputCommunicationType.Broadcast, inputStruct);
        end
    end
    
    methods
        %CREATEDATAPROCESSOR Create a data processor that will perform the
        %underlying execution this task represents for the given partition
        %index.
        %
        % Syntax:
        %  dataProcessor = obj.createDataProcessor(partitionIndex) creates a
        %  data processor based on the execution partitioning provided by
        %  default by the ExecutionPartitionStrategy.
        %
        %  dataProcessor = obj.createDataProcessor(partitionIndex, numPartitions) creates a
        %  data processor based on numPartitions. If IsNumPartitionsFixed is
        %  true on the ExecutionPartitionStrategy, then numPartitions
        %  must equal DesiredNumPartitions from the ExecutionPartitionStrategy.
        %
        %  dataProcessor = obj.createDataProcessor(partitionIndex, numPartitions, hadoopSplit)
        %  creates a data processor based on the provided datastore Hadoop split.
        %  The partitionIndex must match corresponding partition index of
        %  the Hadoop split. This argument is only supported if
        %  IsDatastorePartitioning is true on the ExecutionPartitionStrategy.
        %
        % Inputs:
        %  - obj: The ExecutionTask instance object.
        %  - partitionIndex: The partition index to use for the data
        %  processor. This must be a scalar integer in the range
        %  1:NumPartitions.
        %  - numPartitions: The total number of partitions. This is an
        %  override for the datastore based strategy.
        %  - hadoopSplit: A Hadoop split generated by HDFS. This precisely
        %  defines a partition for HDFS based data.
        %
        % Outputs:
        %  - dataProcessor: The constructed DataProcessor instance.
        %
        function dataProcessor = createDataProcessor(obj, partitionIndex, varargin)
            % Build the partition descriptor first, then hand it to the
            % factory (which obeys the feval contract) to build the processor.
            partition = obj.ExecutionPartitionStrategy.createPartition(partitionIndex, varargin{:});
            dataProcessor = feval(obj.DataProcessorFactory, partition);
        end
        
        %COPYWITHREPLACEDINPUTS Return a copy of an ExecutionTask instance
        % for which the input tasks have been replaced by new ones.
        %
        % Syntax:
        %  newTask = copyWithReplacedInputs(task, newInputTasks, newDataProcessorFactory)
        %  returns a copy of task but with input tasks and data processor
        %  factory set to newInputTasks and newDataProcessorFactory
        %  respectively. All other state, including the task Id, is
        %  retained from the original task.
        %
        function out = copyWithReplacedInputs(obj, newInputTasks, newDataProcessorFactory)
            import matlab.bigdata.internal.executor.ExecutionTask;
            
            if isempty(newInputTasks)
                newInputTasks = ExecutionTask.empty(0, 1);
            else
                assert (isa(newInputTasks, 'matlab.bigdata.internal.executor.ExecutionTask'));
            end
            
            % Fields are wrapped in scalar cells so that struct() builds a
            % single scalar struct rather than a struct array.
            inputStruct = struct(...
                'InputTasks', newInputTasks, ...
                'CacheLevel', {obj.CacheLevel}, ...
                'IsPassBoundary', {obj.IsPassBoundary}, ...
                'ExecutionPartitionStrategy', {obj.ExecutionPartitionStrategy}, ...
                'OutputPartitionStrategy', {obj.OutputPartitionStrategy}, ...
                'DataProcessorFactory', {newDataProcessorFactory}, ...
                'CacheEntryKey', {obj.CacheEntryKey});
            
            % Passing obj.Id keeps the copy's identity equal to the original.
            out = ExecutionTask(obj.Id, obj.OutputCommunicationType, inputStruct);
        end
    end
    
    methods (Access = private)
        % Private constructor for factory methods.
        %
        % An empty id requests a fresh unique id from the shared IdFactory;
        % a non-empty id (used by copyWithReplacedInputs) is kept as-is.
        function obj = ExecutionTask(id, outputCommunicationType, inputStruct)
            if isempty(id)
                id = obj.IdFactory.nextId();
            end
            obj.Id = id;
            % Store only the input task IDs, not handles to the tasks
            % themselves.
            obj.InputIds = {inputStruct.InputTasks.Id};
            obj.OutputCommunicationType = outputCommunicationType;
            obj.CacheLevel = inputStruct.CacheLevel;
            obj.IsPassBoundary = inputStruct.IsPassBoundary;
            obj.ExecutionPartitionStrategy = inputStruct.ExecutionPartitionStrategy;
            obj.OutputPartitionStrategy = inputStruct.OutputPartitionStrategy;
            obj.DataProcessorFactory = inputStruct.DataProcessorFactory;
            obj.CacheEntryKey = inputStruct.CacheEntryKey;
        end
    end
end

% Helper function that deals with the common input parsing shared by all
% factory methods.
%
% Inputs (positional then name/value):
%  - InputTasks: array of ExecutionTask instances, may be empty.
%  - DataProcessorFactory: non-empty factory obeying the feval contract.
%  - 'CacheLevel': 'None' (default) or 'All'.
%  - 'CacheEntryKey': scalar CacheEntryKey; a fresh key is created if empty.
%  - 'IsPassBoundary': logical scalar, default false.
%  - 'ExecutionPartitionStrategy': PartitionStrategy; derived from the
%    input tasks if empty.
%  - 'OutputPartitionStrategy': PartitionStrategy; left empty here so the
%    calling factory method can apply its own default.
%
% Output:
%  - inputStruct: struct of the validated, defaulted values above.
function inputStruct = iParseFactoryInputs(varargin)
import matlab.bigdata.internal.executor.CacheEntryKey;
import matlab.bigdata.internal.executor.ExecutionTask;
p = inputParser;
p.addRequired('InputTasks');
p.addRequired('DataProcessorFactory');
p.addParameter('CacheLevel', 'None');
p.addParameter('CacheEntryKey', []);
p.addParameter('IsPassBoundary', false);
p.addParameter('ExecutionPartitionStrategy', []);
p.addParameter('OutputPartitionStrategy', []);
p.parse(varargin{:});
inputStruct = p.Results;

% InputTasks: normalize empty to a typed empty array so that the
% constructor's {inputStruct.InputTasks.Id} expansion is well defined.
if isempty(inputStruct.InputTasks)
    inputStruct.InputTasks = ExecutionTask.empty(1, 0);
end
assert (isa(inputStruct.InputTasks,'matlab.bigdata.internal.executor.ExecutionTask'), ...
    'InputTasks must be a list of ExecutionTask instances.');

% DataProcessorFactory
assert (~isempty(inputStruct.DataProcessorFactory), 'DataProcessorFactory cannot be empty.');

% CacheLevel
assert (any(strcmp(inputStruct.CacheLevel, {'None', 'All'})), 'CacheLevel must be either ''None'' or ''All''.');

% CacheEntryKey: a fresh key per task unless the caller reuses one.
if isempty(inputStruct.CacheEntryKey)
    inputStruct.CacheEntryKey = CacheEntryKey();
else
    assert (isscalar(inputStruct.CacheEntryKey) && isa(inputStruct.CacheEntryKey, 'matlab.bigdata.internal.executor.CacheEntryKey'));
end

% IsPassBoundary
assert (islogical(inputStruct.IsPassBoundary) && isscalar(inputStruct.IsPassBoundary));

% ExecutionPartitionStrategy: when unset, derive from the partitioning of
% the non-broadcast input tasks.
if isempty(inputStruct.ExecutionPartitionStrategy)
    inputStruct.ExecutionPartitionStrategy = iDiscoverInputPartitionStrategy(inputStruct.InputTasks);
else
    assert(isa(inputStruct.ExecutionPartitionStrategy, 'matlab.bigdata.internal.executor.PartitionStrategy'));
end

% OutputPartitionStrategy
% Bug fix: this previously asserted on ExecutionPartitionStrategy (a
% copy-paste of the block above), so an invalid OutputPartitionStrategy
% value was never actually validated.
if ~isempty(inputStruct.OutputPartitionStrategy)
    assert(isa(inputStruct.OutputPartitionStrategy, 'matlab.bigdata.internal.executor.PartitionStrategy'));
end

end

% Use the input tasks to determine the input partition strategy.
%
% Broadcast-partitioned inputs place no constraint on execution
% partitioning and are ignored. All remaining inputs must share one
% output partition strategy, which becomes the result. If every input
% is broadcast (or there are no inputs), a BroadcastPartitionStrategy
% is returned.
function strategy = iDiscoverInputPartitionStrategy(inputTasks)
import matlab.bigdata.internal.executor.BroadcastPartitionStrategy;
strategy = [];
for task = reshape(inputTasks, 1, [])
    candidate = task.OutputPartitionStrategy;
    if isa(candidate, 'matlab.bigdata.internal.executor.BroadcastPartitionStrategy')
        % Broadcast inputs are available everywhere; they do not
        % constrain the choice of strategy.
        continue;
    end
    
    if isempty(strategy)
        strategy = candidate;
    elseif ~isempty(candidate)
        assert(isequal(strategy, candidate), ...
            'Mixing non-broadcast partition strategies is not supported');
    end
end
if isempty(strategy)
    strategy = BroadcastPartitionStrategy();
end
end