%ExecutionTask
% A description of one piece of execution across multiple partitions that
% generates a single partitioned output. This execution can take as input
% zero or more partitioned arrays generated by previous ExecutionTask
% instances.
%
% Each task includes the following pieces of information:
%  - A list of tasks that act as the input to this task.
%  - A factory to construct one data processor for each execution partition.
%  - How the execution of the task should be partitioned.
%  - How the output of the task will be partitioned.
%  - What kind of communication is needed to send the output to the
%    correct location.
%  - What form of caching is desired for the output.
%
% To construct an ExecutionTask, use one of the following factory methods:
%  createSimpleTask    - Create a task with non-communicating output.
%  createAllToOneTask  - Create a task where all output is sent to a single partition.
%  createAnyToAnyTask  - Create a task where any output could be sent to any partition.
%  createBroadcastTask - Create a task where all output will be sent to all partitions.
%
% Copyright 2015-2016 The MathWorks, Inc.

classdef (Sealed) ExecutionTask < handle
    properties (SetAccess = immutable)
        % A numeric ID for this task.
        %
        % This is not guaranteed to be unique across multiple MATLAB
        % sessions.
        Id
        
        % An ordered list of IDs representing the inputs to this
        % execution task.
        %
        % For the Nth input ID in this list, all output from the
        % corresponding ExecutionTask that is intended for partition P will
        % be passed to the data processor of execution partition P. It will
        % be passed in chunks as the Nth varargin input of the
        % DataProcessor/process method.
        InputIds
        
        % A string representing the type of output with respect to
        % communication. This can be one of:
        %  'Simple'    - All output of each DataProcessor will be sent to
        %                the output partition with the same index as the
        %                execution partition.
        %  'AllToOne'  - All output of all DataProcessors will be sent to
        %                output partition 1.
        %  'AnyToAny'  - Any output of each DataProcessor can go to any
        %                output partition. This allows any form of
        %                communication pattern.
        %  'Broadcast' - All output of each DataProcessor will be sent to
        %                all output partitions.
        OutputCommunicationType
        
        % The level of caching requested by the algorithm that generated
        % this execution task. This can be one of:
        %  'None' - No caching is requested.
        %  'All'  - Caching is requested to all forms of available storage.
        CacheLevel;
        
        % A key that represents any cached entries resulting from this
        % execution task. These entries can be reused by holding onto the
        % key and providing it to future instances of ExecutionTask.
        CacheEntryKey;
        
        % A logical scalar that indicates whether this task requires a full
        % pass of the underlying data before the output can be used by
        % downstream tasks.
        %
        % This can be false only if it is safe to evaluate this task in the
        % same pass of the underlying data as all downstream tasks that
        % depend on the output of this task.
        IsPassBoundary;
        
        % The strategy for partitioning the execution of this task.
        %
        % A data processor will be instantiated for each execution
        % partition.
        %
        % Note: this must be compatible with the partitioning of the inputs
        % to this task. It must match the output partition strategy of
        % every non-broadcast input task.
        ExecutionPartitionStrategy
        
        % The strategy for partitioning the output of this task.
        %
        % This specifies how many partitions exist in the partitioned array
        % output of this task. In most cases, this is fixed by the
        % combination of the output communication type and execution
        % partition strategy.
        OutputPartitionStrategy
        
        % A factory for DataProcessor instances. This is a function handle,
        % or an object that obeys the feval contract, that will be called
        % with the syntax:
        %
        %  function dataProcessor = myFactory(partition)
        %
        % where partition is an instance of DatastorePartition if the
        % execution partition strategy is based on a datastore, or of
        % SimplePartition otherwise.
        DataProcessorFactory
    end
    
    properties (Access = private, Constant)
        % The means by which this class receives unique IDs.
        IdFactory = matlab.bigdata.internal.util.UniqueIdFactory('ExecutionTask');
    end
    
    methods (Static)
        %CREATESIMPLETASK Create a task with non-communicating output.
        %
        % All output from each data processor associated with this task
        % will be sent to the output partition with the same index as the
        % execution partition.
        %
        % Syntax:
        %  obj = ExecutionTask.createSimpleTask(inputTasks,dataProcessorFactory,name1,value1,...);
        %
        % Required Inputs:
        %  - inputTasks: An ordered list of ExecutionTask instances that
        %    represent the inputs to this task. This is allowed to be
        %    empty.
        %  - dataProcessorFactory: A data processor factory that will be
        %    used to create the underlying data processors for this task.
        %
        % Parameter Inputs:
        %  - 'ExecutionPartitionStrategy': The strategy for how to
        %    partition execution. If this is an integer, a fixed
        %    partitioning will be used. If this is a datastore,
        %    partitioning will be based on the datastore. If this is not
        %    set, the execution environment has full control over the
        %    partitioning.
        %  - 'CacheLevel': The allowed ways to do caching. This is either
        %    'None' for no caching, or 'All' for caching to disk and
        %    memory.
        %  - 'IsPassBoundary': A logical scalar that indicates whether this
        %    task requires a full pass of the underlying data before the
        %    output can be used by downstream tasks. This can be false only
        %    if it is safe to evaluate this task in the same pass of the
        %    underlying data as all downstream tasks that depend on the
        %    output of this task.
        %
        % Outputs:
        %  - obj: The constructed ExecutionTask instance.
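        %
        % Example (illustrative sketch only; MyChunkProcessor stands in for
        % a hypothetical DataProcessor implementation and is not part of
        % this class):
        %
        %  import matlab.bigdata.internal.executor.ExecutionTask;
        %  factory = @(partition) MyChunkProcessor(partition);
        %  task = ExecutionTask.createSimpleTask([], factory, 'CacheLevel', 'All');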
        %
        function obj = createSimpleTask(inputTasks, dataProcessorFactory, varargin)
            import matlab.bigdata.internal.executor.ExecutionTask;
            import matlab.bigdata.internal.executor.OutputCommunicationType;
            
            inputStruct = iParseFactoryInputs(inputTasks, dataProcessorFactory, varargin{:});
            if isempty(inputStruct.OutputPartitionStrategy)
                inputStruct.OutputPartitionStrategy = inputStruct.ExecutionPartitionStrategy;
            else
                assert(isequal(inputStruct.OutputPartitionStrategy, inputStruct.ExecutionPartitionStrategy));
            end
            
            obj = ExecutionTask([], OutputCommunicationType.Simple, inputStruct);
        end
        
        %CREATEALLTOONETASK Create a task where all output is sent to a single partition.
        %
        % All output from each data processor associated with this task
        % will be sent to output partition 1.
        %
        % Syntax:
        %  obj = ExecutionTask.createAllToOneTask(inputTasks,dataProcessorFactory,name1,value1,...);
        %
        % Required Inputs:
        %  - inputTasks: An ordered list of ExecutionTask instances that
        %    represent the inputs to this task. This is allowed to be
        %    empty.
        %  - dataProcessorFactory: A data processor factory that will be
        %    used to create the underlying data processors for this task.
        %
        % Parameter Inputs:
        %  - 'ExecutionPartitionStrategy': The strategy for how to
        %    partition execution. If this is an integer, a fixed
        %    partitioning will be used. If this is a datastore,
        %    partitioning will be based on the datastore. If this is not
        %    set, the execution environment has full control over the
        %    partitioning.
        %  - 'IsPassBoundary': A logical scalar that indicates whether this
        %    task requires a full pass of the underlying data before the
        %    output can be used by downstream tasks. This can be false only
        %    if it is safe to evaluate this task in the same pass of the
        %    underlying data as all downstream tasks that depend on the
        %    output of this task.
        %
        % Outputs:
        %  - obj: The constructed ExecutionTask instance.
        %
        function obj = createAllToOneTask(inputTasks, dataProcessorFactory, varargin)
            import matlab.bigdata.internal.executor.ExecutionTask;
            import matlab.bigdata.internal.executor.FixedNumPartitionStrategy;
            import matlab.bigdata.internal.executor.OutputCommunicationType;
            
            inputStruct = iParseFactoryInputs(inputTasks, dataProcessorFactory, varargin{:});
            if isempty(inputStruct.OutputPartitionStrategy)
                inputStruct.OutputPartitionStrategy = FixedNumPartitionStrategy(1);
            else
                assert(isequal(inputStruct.OutputPartitionStrategy, FixedNumPartitionStrategy(1)));
            end
            
            obj = ExecutionTask([], OutputCommunicationType.AllToOne, inputStruct);
        end
        
        %CREATEANYTOANYTASK Create a task where any output could be sent to any partition.
        %
        % Data processors associated with this task are expected to return
        % partition indices alongside the data output. These partition
        % indices will be used to send the corresponding data output to the
        % target partitions.
        %
        % Syntax:
        %  obj = ExecutionTask.createAnyToAnyTask(inputTasks,dataProcessorFactory,name1,value1,...);
        %
        % Required Inputs:
        %  - inputTasks: An ordered list of ExecutionTask instances that
        %    represent the inputs to this task. This is allowed to be
        %    empty.
        %  - dataProcessorFactory: A data processor factory that will be
        %    used to create the underlying data processors for this task.
        %
        % Parameter Inputs:
        %  - 'ExecutionPartitionStrategy': The strategy for how to
        %    partition execution. If this is an integer, a fixed
        %    partitioning will be used. If this is a datastore,
        %    partitioning will be based on the datastore. If this is not
        %    set, the execution environment has full control over the
        %    partitioning.
        %  - 'OutputPartitionStrategy': The strategy for how the output
        %    will be partitioned. If this is an integer, a fixed
        %    partitioning will be used. If this is a datastore,
        %    partitioning will be based on the datastore. If this is not
        %    set, the execution environment has full control over the
        %    partitioning.
        %  - 'IsPassBoundary': A logical scalar that indicates whether this
        %    task requires a full pass of the underlying data before the
        %    output can be used by downstream tasks. This can be false only
        %    if it is safe to evaluate this task in the same pass of the
        %    underlying data as all downstream tasks that depend on the
        %    output of this task.
        %
        % Outputs:
        %  - obj: The constructed ExecutionTask instance.
        %
        function obj = createAnyToAnyTask(inputTasks, dataProcessorFactory, varargin)
            import matlab.bigdata.internal.executor.ArbitraryPartitionStrategy;
            import matlab.bigdata.internal.executor.ExecutionTask;
            import matlab.bigdata.internal.executor.OutputCommunicationType;
            
            inputStruct = iParseFactoryInputs(inputTasks, dataProcessorFactory, varargin{:});
            if isempty(inputStruct.OutputPartitionStrategy)
                inputStruct.OutputPartitionStrategy = ArbitraryPartitionStrategy();
            end
            
            obj = ExecutionTask([], OutputCommunicationType.AnyToAny, inputStruct);
        end
        
        %CREATEBROADCASTTASK Create a task where all output will be sent to all partitions.
        %
        % All output from each data processor associated with this task
        % will be sent to all partitions.
        %
        % Syntax:
        %  obj = ExecutionTask.createBroadcastTask(inputTasks,dataProcessorFactory,name1,value1,...);
        %
        % Required Inputs:
        %  - inputTasks: An ordered list of ExecutionTask instances that
        %    represent the inputs to this task. This is allowed to be
        %    empty.
        %  - dataProcessorFactory: A data processor factory that will be
        %    used to create the underlying data processors for this task.
        %
        % Parameter Inputs:
        %  - 'ExecutionPartitionStrategy': The strategy for how to
        %    partition execution. If this is an integer, a fixed
        %    partitioning will be used. If this is a datastore,
        %    partitioning will be based on the datastore. If this is not
        %    set, the execution environment has full control over the
        %    partitioning.
        %  - 'IsPassBoundary': A logical scalar that indicates whether this
        %    task requires a full pass of the underlying data before the
        %    output can be used by downstream tasks. This can be false only
        %    if it is safe to evaluate this task in the same pass of the
        %    underlying data as all downstream tasks that depend on the
        %    output of this task.
        %
        % Outputs:
        %  - obj: The constructed ExecutionTask instance.
        %
        function obj = createBroadcastTask(inputTasks, dataProcessorFactory, varargin)
            import matlab.bigdata.internal.executor.BroadcastPartitionStrategy;
            import matlab.bigdata.internal.executor.ExecutionTask;
            import matlab.bigdata.internal.executor.OutputCommunicationType;
            
            inputStruct = iParseFactoryInputs(inputTasks, dataProcessorFactory, varargin{:});
            if isempty(inputStruct.OutputPartitionStrategy)
                inputStruct.OutputPartitionStrategy = BroadcastPartitionStrategy();
            else
                assert(isequal(inputStruct.OutputPartitionStrategy, BroadcastPartitionStrategy()));
            end
            
            obj = ExecutionTask([], OutputCommunicationType.Broadcast, inputStruct);
        end
    end
    
    methods
        %CREATEDATAPROCESSOR Create a data processor that will perform the
        %underlying execution this task represents for the given partition
        %index.
        %
        % Syntax:
        %  dataProcessor = obj.createDataProcessor(partitionIndex) creates
        %  a data processor based on the execution partitioning provided
        %  by default by the ExecutionPartitionStrategy.
        %
        %  dataProcessor = obj.createDataProcessor(partitionIndex, numPartitions)
        %  creates a data processor based on numPartitions partitions. If
        %  IsNumPartitionsFixed is true on the ExecutionPartitionStrategy,
        %  then numPartitions must equal DesiredNumPartitions from the
        %  ExecutionPartitionStrategy.
        %
        %  dataProcessor = obj.createDataProcessor(partitionIndex, numPartitions, hadoopSplit)
        %  creates a data processor based on the provided datastore Hadoop
        %  split. The partitionIndex must match the corresponding partition
        %  index of the Hadoop split. This argument is only supported if
        %  IsDatastorePartitioning is true on the ExecutionPartitionStrategy.
        %
        % Inputs:
        %  - obj: The ExecutionTask instance object.
        %  - partitionIndex: The partition index to use for the data
        %    processor. This must be a scalar integer in the range
        %    1:NumPartitions.
        %  - numPartitions: The total number of partitions. This is an
        %    override for the datastore based strategy.
        %  - hadoopSplit: A Hadoop split generated by HDFS. This precisely
        %    defines a partition for HDFS based data.
        %
        % Outputs:
        %  - dataProcessor: The constructed DataProcessor instance.
        %
        function dataProcessor = createDataProcessor(obj, partitionIndex, varargin)
            partition = obj.ExecutionPartitionStrategy.createPartition(partitionIndex, varargin{:});
            dataProcessor = feval(obj.DataProcessorFactory, partition);
        end
        
        %COPYWITHREPLACEDINPUTS Return a copy of an ExecutionTask instance
        % for which the input tasks have been replaced by new ones.
        %
        % Syntax:
        %  newTask = copyWithReplacedInputs(task, newInputTasks, newDataProcessorFactory)
        %  returns a copy of task, but with the input tasks and data
        %  processor factory replaced by newInputTasks and
        %  newDataProcessorFactory respectively.
        %
        function out = copyWithReplacedInputs(obj, newInputTasks, newDataProcessorFactory)
            import matlab.bigdata.internal.executor.ExecutionTask;
            
            if isempty(newInputTasks)
                newInputTasks = ExecutionTask.empty(0, 1);
            else
                assert(isa(newInputTasks, 'matlab.bigdata.internal.executor.ExecutionTask'));
            end
            
            inputStruct = struct(...
                'InputTasks', newInputTasks, ...
                'CacheLevel', {obj.CacheLevel}, ...
                'IsPassBoundary', {obj.IsPassBoundary}, ...
                'ExecutionPartitionStrategy', {obj.ExecutionPartitionStrategy}, ...
                'OutputPartitionStrategy', {obj.OutputPartitionStrategy}, ...
                'DataProcessorFactory', {newDataProcessorFactory}, ...
                'CacheEntryKey', {obj.CacheEntryKey});
            out = ExecutionTask(obj.Id, obj.OutputCommunicationType, inputStruct);
        end
    end
    
    methods (Access = private)
        % Private constructor for factory methods.
        function obj = ExecutionTask(id, outputCommunicationType, inputStruct)
            if isempty(id)
                id = obj.IdFactory.nextId();
            end
            obj.Id = id;
            obj.InputIds = {inputStruct.InputTasks.Id};
            obj.OutputCommunicationType = outputCommunicationType;
            obj.CacheLevel = inputStruct.CacheLevel;
            obj.IsPassBoundary = inputStruct.IsPassBoundary;
            obj.ExecutionPartitionStrategy = inputStruct.ExecutionPartitionStrategy;
            obj.OutputPartitionStrategy = inputStruct.OutputPartitionStrategy;
            obj.DataProcessorFactory = inputStruct.DataProcessorFactory;
            obj.CacheEntryKey = inputStruct.CacheEntryKey;
        end
    end
end

% Helper function that deals with the common input parsing shared by all
% factory methods.
function inputStruct = iParseFactoryInputs(varargin)
import matlab.bigdata.internal.executor.CacheEntryKey;
import matlab.bigdata.internal.executor.ExecutionTask;

p = inputParser;
p.addRequired('InputTasks');
p.addRequired('DataProcessorFactory');
p.addParameter('CacheLevel', 'None');
p.addParameter('CacheEntryKey', []);
p.addParameter('IsPassBoundary', false);
p.addParameter('ExecutionPartitionStrategy', []);
p.addParameter('OutputPartitionStrategy', []);
p.parse(varargin{:});
inputStruct = p.Results;

% InputTasks
if isempty(inputStruct.InputTasks)
    inputStruct.InputTasks = ExecutionTask.empty(1, 0);
end
assert(isa(inputStruct.InputTasks, 'matlab.bigdata.internal.executor.ExecutionTask'), ...
    'InputTasks must be a list of ExecutionTask instances.');

% DataProcessorFactory
assert(~isempty(inputStruct.DataProcessorFactory), 'DataProcessorFactory cannot be empty.');

% CacheLevel
assert(any(strcmp(inputStruct.CacheLevel, {'None', 'All'})), ...
    'CacheLevel must be either ''None'' or ''All''.');

% CacheEntryKey
if isempty(inputStruct.CacheEntryKey)
    inputStruct.CacheEntryKey = CacheEntryKey();
else
    assert(isscalar(inputStruct.CacheEntryKey) ...
        && isa(inputStruct.CacheEntryKey, 'matlab.bigdata.internal.executor.CacheEntryKey'));
end

% IsPassBoundary
assert(islogical(inputStruct.IsPassBoundary) && isscalar(inputStruct.IsPassBoundary));

% ExecutionPartitionStrategy
if isempty(inputStruct.ExecutionPartitionStrategy)
    inputStruct.ExecutionPartitionStrategy = iDiscoverInputPartitionStrategy(inputStruct.InputTasks);
else
    assert(isa(inputStruct.ExecutionPartitionStrategy, 'matlab.bigdata.internal.executor.PartitionStrategy'));
end

% OutputPartitionStrategy
if ~isempty(inputStruct.OutputPartitionStrategy)
    assert(isa(inputStruct.OutputPartitionStrategy, 'matlab.bigdata.internal.executor.PartitionStrategy'));
end
end

% Use the input tasks to determine the input partition strategy.
function strategy = iDiscoverInputPartitionStrategy(inputTasks)
import matlab.bigdata.internal.executor.BroadcastPartitionStrategy;

strategy = [];
for ii = 1:numel(inputTasks)
    if isa(inputTasks(ii).OutputPartitionStrategy, 'matlab.bigdata.internal.executor.BroadcastPartitionStrategy')
        continue;
    end
    if isempty(strategy)
        strategy = inputTasks(ii).OutputPartitionStrategy;
    elseif ~isempty(inputTasks(ii).OutputPartitionStrategy)
        assert(isequal(strategy, inputTasks(ii).OutputPartitionStrategy), ...
            'Mixing non-broadcast partition strategies is not supported.');
    end
end
if isempty(strategy)
    strategy = BroadcastPartitionStrategy();
end
end
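% Example pipeline sketch (illustrative only; MyMapProcessor and
% MyReduceProcessor stand in for hypothetical DataProcessor
% implementations): a per-partition map task feeding an all-to-one
% reduce task.
%
%   import matlab.bigdata.internal.executor.ExecutionTask;
%   mapTask = ExecutionTask.createSimpleTask([], @(p) MyMapProcessor(p));
%   reduceTask = ExecutionTask.createAllToOneTask(mapTask, ...
%       @(p) MyReduceProcessor(p), 'IsPassBoundary', true);
%   % All chunks emitted by mapTask are routed to output partition 1,
%   % where reduceTask's single data processor consumes them.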