gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+lazyeval/NonPartitionedProcessor.m
%NonPartitionedProcessor % Data Processor that applies a function handle to the vertical % concatenation of all input. % % This will buffer all input until upstream data processors have completed, % then apply a function handle to the vertical concatenation of the data. % % See LazyTaskGraph for a general description of input and outputs. % Specifically, this will emit a single 1 x NumOutputs cell array, where % each cell contains the full output of the corresponding operation output. % % Copyright 2015-2016 The MathWorks, Inc. classdef (Sealed) NonPartitionedProcessor < matlab.bigdata.internal.executor.DataProcessor % Properties overridden in the DataProcessor interface. properties (SetAccess = private) IsFinished = false; IsMoreInputRequired; end properties (SetAccess = immutable) % The slice-wise function handle. FunctionHandle; % The number of outputs from the function handle. NumOutputs; InputFutureMap % The input buffer. InputBuffer; end methods (Static) % Create a data processor factory that can be used by the execution % environment to construct instances of this class. function factory = createFactory(functionHandle, numOutputs, inputFutureMap) factory = @createNonPartitionedProcessor; function dataProcessor = createNonPartitionedProcessor(~) import matlab.bigdata.internal.lazyeval.NonPartitionedProcessor; dataProcessor = NonPartitionedProcessor(functionHandle, numOutputs, inputFutureMap); end end end % Methods overridden in the DataProcessor interface. methods function data = process(obj, isLastOfDependencies, varargin) if obj.IsFinished data = cell(0, obj.NumOutputs); return; end isLastOfInputs = obj.InputFutureMap.mapScalars(isLastOfDependencies); functionInputs = obj.InputFutureMap.mapData(varargin); inputBuffer = obj.InputBuffer; inputBuffer.add(isLastOfInputs, functionInputs{:}); obj.IsFinished = all(isLastOfInputs); if obj.IsFinished functionInputs = inputBuffer.getAll(); [data{1:obj.NumOutputs}] = matlab.bigdata.internal.lazyeval.callFunctionHandle(obj.FunctionHandle, functionInputs{:}); else data = cell(0, obj.NumOutputs); end obj.IsMoreInputRequired = ~isLastOfDependencies; end end methods (Access = private) % Private constructor for factory method. function obj = NonPartitionedProcessor(functionHandle, numOutputs, inputFutureMap) import matlab.bigdata.internal.lazyeval.InputBuffer; obj.FunctionHandle = functionHandle; obj.NumOutputs = numOutputs; obj.InputFutureMap = inputFutureMap; isInputSinglePartition = true(1, inputFutureMap.NumOperationInputs); obj.InputBuffer = InputBuffer(inputFutureMap.NumOperationInputs, isInputSinglePartition); obj.IsMoreInputRequired = false(1, inputFutureMap.NumOperationInputs); end end end