%AbstractChunkwiseProcessor
% Abstract base class that contains the common processing logic shared by
% all chunk-wise operations.
%
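% Example (a minimal, hypothetical subclass sketch; the class name
% ExampleChunkwiseProcessor and the FunctionHandle property are
% illustrative assumptions, not part of this interface, and real
% subclasses in this package handle chunk boundaries and error wrapping
% more carefully):
%
%   classdef ExampleChunkwiseProcessor < matlab.bigdata.internal.lazyeval.AbstractChunkwiseProcessor
%       properties (SetAccess = immutable)
%           FunctionHandle;
%       end
%       methods
%           function obj = ExampleChunkwiseProcessor(fcn, numOutputs, inputFutureMap, isInputSinglePartition)
%               obj = obj@matlab.bigdata.internal.lazyeval.AbstractChunkwiseProcessor( ...
%                   numOutputs, inputFutureMap, true, isInputSinglePartition);
%               obj.FunctionHandle = fcn;
%           end
%       end
%       methods (Access = protected)
%           function [isFinished, data] = callFunctionHandle(obj, isLastOfAllInputs, inputs, ~)
%               % Apply the function handle to one chunk of each input and
%               % collect the requested number of outputs.
%               data = cell(1, obj.NumOutputs);
%               [data{:}] = feval(obj.FunctionHandle, inputs{:});
%               isFinished = isLastOfAllInputs;
%           end
%           function throwFromFunctionHandle(~, err)
%               throwAsCaller(err);
%           end
%       end
%   end
%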

%   Copyright 2015-2016 The MathWorks, Inc.

classdef (Abstract) AbstractChunkwiseProcessor < matlab.bigdata.internal.executor.DataProcessor
    % Properties overridden in the DataProcessor interface.
    properties (SetAccess = private)
        % A logical scalar that is true once this processor has finished
        % and will produce no further output.
        IsFinished = false;
        
        % A logical vector indicating which inputs require more data
        % before this processor can make further progress.
        IsMoreInputRequired;
    end
    
    properties (SetAccess = immutable)
        % The number of outputs from the function handle.
        NumOutputs;
        
        % An object that represents how to convert from dependency input to
        % function handle input.
        InputFutureMap;
        
        % An input buffer that will deal with the fact that inputs are
        % multiplexed.
        %
        % This can be empty in cases where no buffering is required. This
        % will be the case for anything that can be treated as a chunkwise
        % operation that only has a single (or already zipped) input.
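        %
        % For example (hypothetical chunk sizes): a two-input elementwise
        % operation might receive a single 100-slice chunk for one input
        % and two 50-slice chunks for the other; the buffer aligns them so
        % the function handle always sees matching numbers of slices.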
        InputBuffer;
        
        % The maximum number of input parameter slices to pass to the
        % function handle in any one call.
        MaxNumSlices = Inf;
        
        % A logical scalar that specifies if this processor should allow
        % singleton expansion in the tall dimension.
        AllowTallDimExpansion = true;
    end
    
    properties (SetAccess = private)
        % A logical scalar that is set to true once this object is
        % initialized and has begun processing data.
        IsInitialized = false;
    end
    
    % Methods overridden in the DataProcessor interface.
    methods
        function data = process(obj, isLastOfDependencies, varargin)
            if obj.IsFinished
                data = cell(0, obj.NumOutputs);
                return;
            end
            
            isLastOfInputsVector = obj.InputFutureMap.mapScalars(isLastOfDependencies);
            functionInputs = obj.InputFutureMap.mapData(varargin);
            
            inputBuffer = obj.InputBuffer;
            
            % The buffer property is set to empty if no buffering is
            % required for zipping or for chunk size. As such, we can just
            % pass the data directly to the underlying operation, which is
            % faster.
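            %
            % For example (hypothetical chunk shapes): a single-input
            % chunkwise operation might receive its input as a cell of two
            % chunks of sizes 3-by-2 and 4-by-2; vertcatCellContents below
            % simply concatenates them into one 7-by-2 chunk before the
            % call to the function handle.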
            if isempty(inputBuffer)
                if all(cellfun(@isempty, varargin))
                    data = cell(0, obj.NumOutputs);
                    obj.IsFinished = all(isLastOfDependencies);
                else
                    isLastOfAllInputs = all(isLastOfInputsVector);
                    functionInputs = cellfun(@matlab.bigdata.internal.util.vertcatCellContents, ...
                        functionInputs, 'UniformOutput', false);
                    [obj.IsFinished, data] = obj.callFunctionHandle(isLastOfAllInputs, functionInputs, []);
                end
                obj.IsMoreInputRequired = ~isLastOfDependencies;
                return;
            end
            
            inputBuffer.add(isLastOfInputsVector, functionInputs{:});
            isInputTooShortVector = isLastOfInputsVector & ~inputBuffer.IsInputSingleSlice ...
                & inputBuffer.NumBufferedSlices ~= inputBuffer.LargestNumBufferedSlices;
            if any(isInputTooShortVector)
                if all(inputBuffer.IsInputSinglePartition)
                    % We can be certain in this case that the two arrays
                    % have an incompatible size in the tall dimension.
                    obj.throwSizeError();
                else
                    % Otherwise this might just be a case of different
                    % partitioning.
                    obj.throwFromFunctionHandle(MException(message('MATLAB:bigdata:array:IncompatibleTallIndexing')));
                end
            end
            
            % There are some first-time checks that we want to perform once
            % the input buffer has enough data to determine the types of
            % input we are about to receive.
            if ~obj.IsInitialized
                % We need to know which inputs are single-slice before we
                % can perform the first-time checks.
                if ~inputBuffer.HasDeterminedSingleSliceInputs
                    data = cell(0, obj.NumOutputs);
                    return;
                end
                
                % This is to guard against the situation where the size of
                % one partition in a partitioned tall array just so happens
                % to match the size of a non-partitioned array. Examples
                % include the output of a reduction as well as local arrays.
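                % For example (hypothetical sizes): a local, non-partitioned
                % 5-slice array paired with a partitioned input whose
                % current partition also happens to hold 5 slices must
                % still error, because a non-partitioned input can only be
                % expanded when it is a single slice.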
                if any(~inputBuffer.IsInputSinglePartition)
                    isInputInvalidBroadcastVector = inputBuffer.IsInputSinglePartition & ~inputBuffer.IsInputSingleSlice;
                    if any(isInputInvalidBroadcastVector)
                        obj.throwSizeError();
                    end
                end
                
                if ~obj.AllowTallDimExpansion && any(~inputBuffer.IsInputSingleSlice) && any(inputBuffer.IsInputSingleSlice)
                    obj.throwSizeError();
                end
                
                obj.IsInitialized = true;
            end
            
            [functionInputs, numSlices] = inputBuffer.getCompleteSlices(obj.MaxNumSlices);
            
            isLastOfAllInputs = all(isLastOfInputsVector) && inputBuffer.LargestNumBufferedSlices == 0;
            [obj.IsFinished, data] = obj.callFunctionHandle(isLastOfAllInputs, functionInputs, numSlices);
            
            % This logic exists in order to ensure inputs arrive at similar
            % data rates.
            %
            % This object indicates that it requires more data for a given
            % input if and only if the buffer for that input contains less
            % data than would be needed to consume all of the data from all
            % buffers, or to reach MaxNumSlices if that is smaller.
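            %
            % For example (hypothetical counts): if the two input buffers
            % currently hold 7 and 0 slices and MaxNumSlices is 8, the
            % required buffer size below is min(7, 8) = 7, so more data is
            % requested only for the second input (assuming that input is
            % not yet exhausted).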
            requiredBufferSize = inputBuffer.LargestNumBufferedSlices;
            requiredBufferSize = min(requiredBufferSize, obj.MaxNumSlices);
            requiredBufferSize = max(requiredBufferSize, 1);
            
            isBufferTooShortVector = inputBuffer.NumBufferedSlices < requiredBufferSize;
            isMoreInputRequiredVector = ~isLastOfInputsVector & isBufferTooShortVector;
            
            % We have to map from operation inputs back to upstream
            % dependencies because this property is in terms of upstream
            % dependencies.
            obj.IsMoreInputRequired = ~obj.IsFinished & obj.InputFutureMap.reverseMapLogicals(isMoreInputRequiredVector);
        end
    end
    
    methods (Access = protected, Abstract)
        % Call the underlying function handle with the corresponding
        % inputs. numSlices is the number of complete slices extracted from
        % the input buffer, or empty when the inputs bypassed buffering.
        [isFinished, data] = callFunctionHandle(obj, isLastOfAllInputs, inputs, numSlices);
        
        % Throw the provided error as if it originated from the function
        % handle.
        throwFromFunctionHandle(obj, err);
    end
    
    methods (Access = protected)
        % Protected constructor for subclasses.
        function obj = AbstractChunkwiseProcessor(numOutputs, inputFutureMap, ...
                enableBuffer, isInputSinglePartition, maxNumSlices, allowTallDimExpansion)
            import matlab.bigdata.internal.lazyeval.InputBuffer;
            obj.NumOutputs = numOutputs;
            obj.InputFutureMap = inputFutureMap;
            if enableBuffer
                obj.InputBuffer = InputBuffer(numel(isInputSinglePartition), isInputSinglePartition);
            end
            
            obj.IsMoreInputRequired = true(1, obj.InputFutureMap.NumOperationInputs);
            
            if nargin >= 5
                obj.MaxNumSlices = maxNumSlices;
            end
            if nargin >= 6
                obj.AllowTallDimExpansion = allowTallDimExpansion;
            end
        end
    end
    
    methods (Access = private)
        % Helper function that ensures the right error is thrown based on
        % whether this operation supports singleton expansion in the tall
        % dimension.
        function throwSizeError(obj)
            if obj.AllowTallDimExpansion
                obj.throwFromFunctionHandle(MException(message('MATLAB:bigdata:array:IncompatibleTallSize')));
            else
                obj.throwFromFunctionHandle(MException(message('MATLAB:bigdata:array:IncompatibleTallStrictSize')));
            end
        end
    end
end