%StageTask
% A class that represents a collection of execution tasks that can be run
% simultaneously.
%
% Specifically, a collection of execution tasks can be run simultaneously
% if they can run from start to finish without any communication between
% partitions and without having to buffer the entirety of a potentially
% large intermediate partition.
%
% Typically, the boundary of stages will be the points where data is
% written into either a shuffle-sort storage on disk or to a broadcast map.
%
% Copyright 2016 The MathWorks, Inc.

classdef StageTask < handle
    properties (SetAccess = immutable)
        % A factory for DataProcessor instances. This is a function handle
        % or object that obeys the feval contract, that will be called with
        % the syntax:
        %   function dataProcessor = myFactory(partition)
        %
        % This will typically be the combination of several
        % DataProcessor factories wrapped in a CompositeDataProcessorFactory.
        DataProcessorFactory

        % The strategy for partitioning the execution of this task.
        %
        % A data processor will be instantiated for each execution
        % partition.
        %
        % Note, this must be compatible with the partitioning of the inputs
        % to this task. This must match the output partition strategy for
        % every non-broadcast input task.
        ExecutionPartitionStrategy;

        % An array of ExecutionTask instances that correspond to intermediate
        % data to be received by this stage task by shuffle-sort.
        InputShuffles;

        % An array of ExecutionTask instances that correspond to intermediate
        % data that will be sent by this stage task by shuffle-sort.
        OutputShuffles;

        % An array of ExecutionTask instances that correspond to broadcast
        % data to be received by this stage task.
        InputBroadcasts;

        % An array of ExecutionTask instances that correspond to broadcast
        % data that is generated by this stage task.
        OutputBroadcasts;

        % An array of CacheEntryKey instances that correspond to the cache
        % entries that this stage task will generate or consume.
        CacheEntryKeys;
    end

    methods (Static)
        % Create a StageTask that represents the accumulation of a single
        % ExecutionTask with a collection of its upstream dependencies.
        %
        %  - stageProcessorFactory: The DataProcessor factory that will
        %  construct the processors that perform the underlying evaluation.
        %  - executionPartitionStrategy: The partition strategy for how this
        %  stage task should be scheduled for execution.
        %  - originalTask: The original ExecutionTask that represents the
        %  output.
        %  - dependencies: An array of ExecutionTasks whose outputs are
        %  required for this stage task.
        %  - cacheEntryKeys: An array of CacheEntryKey that correspond with
        %  the underlying ExecutionTask instances.
        %  - requiresGather: A logical scalar. When true, originalTask is
        %  also recorded in OutputBroadcasts.
        %
        function obj = createFromSingleTask(stageProcessorFactory, executionPartitionStrategy, ...
                originalTask, dependencies, cacheEntryKeys, requiresGather)
            import matlab.bigdata.internal.executor.OutputCommunicationType;
            import matlab.bigdata.internal.executor.StageTask;

            if numel(stageProcessorFactory) > 1
                stageProcessorFactory = iCombineFactories(stageProcessorFactory);
            end

            isDependencyBroadcast = arrayfun(@(x)x.OutputPartitionStrategy.IsBroadcast, dependencies);
            % This invariant holds because a dependency constitutes a
            % read from some intermediate storage, which currently can
            % only be either shuffle or broadcast.
            isDependencyShuffle = ~isDependencyBroadcast;

            isOutputBroadcast = originalTask.OutputCommunicationType == OutputCommunicationType.Broadcast ...
                || (originalTask.IsPassBoundary && originalTask.OutputPartitionStrategy.IsBroadcast);
            isOutputShuffle = (~isOutputBroadcast && originalTask.IsPassBoundary) ...
                || any(originalTask.OutputCommunicationType == [OutputCommunicationType.AllToOne, OutputCommunicationType.AnyToAny]);

            obj = StageTask(stageProcessorFactory, executionPartitionStrategy, ...
                dependencies(isDependencyShuffle), originalTask(isOutputShuffle), ...
                dependencies(isDependencyBroadcast), originalTask(isOutputBroadcast || requiresGather), ...
                cacheEntryKeys);
        end
    end

    methods
        % Combine an array of StageTask instances into a single StageTask
        % that represents the collection.
        function obj = combine(objs, newExecutionPartitionStrategy)
            import matlab.bigdata.internal.executor.CacheEntryKey;
            import matlab.bigdata.internal.executor.CompositeDataProcessorBuilder;
            import matlab.bigdata.internal.executor.ExecutionTask;
            import matlab.bigdata.internal.executor.StageTask;

            nodes = vertcat(objs.DataProcessorFactory, CompositeDataProcessorBuilder.empty());
            dataProcessorFactory = iCombineFactories(nodes);

            % The trailing empty() argument keeps each result correctly
            % typed even when no input contributes any elements.
            inputShuffles = unique(vertcat(objs.InputShuffles, ExecutionTask.empty()));
            outputShuffles = unique(vertcat(objs.OutputShuffles, ExecutionTask.empty()));
            inputBroadcasts = unique(vertcat(objs.InputBroadcasts, ExecutionTask.empty()));
            outputBroadcasts = unique(vertcat(objs.OutputBroadcasts, ExecutionTask.empty()));
            cacheEntryKeys = unique(vertcat(objs.CacheEntryKeys, CacheEntryKey.empty()));

            obj = StageTask(dataProcessorFactory, newExecutionPartitionStrategy, ...
                inputShuffles, outputShuffles, inputBroadcasts, outputBroadcasts, cacheEntryKeys);
        end
    end

    methods
        % Constructor intended for internal use by the createFromSingleTask
        % and combine methods.
        function obj = StageTask(dataProcessorFactory, executionPartitionStrategy, ...
                inputShuffles, outputShuffles, inputBroadcasts, outputBroadcasts, cacheEntryKeys)
            obj.DataProcessorFactory = dataProcessorFactory;
            obj.ExecutionPartitionStrategy = executionPartitionStrategy;
            obj.InputShuffles = inputShuffles;
            obj.OutputShuffles = outputShuffles;
            obj.InputBroadcasts = inputBroadcasts;
            obj.OutputBroadcasts = outputBroadcasts;
            obj.CacheEntryKeys = cacheEntryKeys;
        end
    end
end

% Helper function that combines an array of CompositeDataProcessorBuilder
% instances into a single factory.
function dataProcessorFactory = iCombineFactories(nodes)
import matlab.bigdata.internal.executor.CompositeDataProcessorBuilder;
import matlab.bigdata.internal.executor.TerminalProcessor;
dataProcessorFactory = CompositeDataProcessorBuilder(...
    nodes, TerminalProcessor.createFactory(numel(nodes)));
end
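
% -------------------------------------------------------------------------
% Illustrative sketch, not part of the original source. It restates, with
% plain logical scalars, how createFromSingleTask appears to decide whether
% the original task is recorded under OutputShuffles or OutputBroadcasts.
% The name iClassifyOutput and all of its arguments are hypothetical:
% isBroadcastComm stands for OutputCommunicationType == Broadcast, and
% isAllToOneOrAnyToAny stands for a match against AllToOne or AnyToAny.
%
%   function [isShuffle, inBroadcasts] = iClassifyOutput(isBroadcastComm, ...
%           isAllToOneOrAnyToAny, isPassBoundary, isBroadcastPartition, requiresGather)
%       % Broadcast output: explicit broadcast communication, or a pass
%       % boundary whose output partition strategy is broadcast.
%       isBroadcast = isBroadcastComm || (isPassBoundary && isBroadcastPartition);
%       % Shuffle output: a non-broadcast pass boundary, or an
%       % AllToOne / AnyToAny communication type.
%       isShuffle = (~isBroadcast && isPassBoundary) || isAllToOneOrAnyToAny;
%       % A gather request also places the task in OutputBroadcasts.
%       inBroadcasts = isBroadcast || requiresGather;
%   end
%
% For a pass-boundary task with a non-broadcast output partition strategy
% and no gather, iClassifyOutput(false, false, true, false, false) yields
% isShuffle = true and inBroadcasts = false.
% -------------------------------------------------------------------------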