gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+lazyeval/ReduceProcessor.m

    %ReduceProcessor
% Data Processor that performs a reduction of the current partition to a
% single chunk.
%
% This will apply a rolling reduction to all input. It will emit the final
% result of this rolling reduction once all input has been received.
%
% See LazyTaskGraph for a general description of input and outputs.
% Specifically, this will receive a N x NumVariables cell array and reduce
% it to a 1 x NumVariables cell array, where each cell contains the final
% reduced chunk of the corresponding operation output.
%

%   Copyright 2015-2016 The MathWorks, Inc.

classdef (Sealed) ReduceProcessor < matlab.bigdata.internal.executor.DataProcessor
    % Properties overridden in the DataProcessor interface.
    properties (SetAccess = private)
        IsFinished = false;
        IsMoreInputRequired = true;
    end
    
    properties (GetAccess = private, SetAccess = immutable)
        % The Reducing function handle.
        FunctionHandle;
        
        % The number of variables that will be reduced.
        %
        % If this is greater than one, this processor expects a multiplexed
        % input and returns a multiplexed output. Multiplexing here means a
        % cell array with one column representing each variable.
        NumVariables;
    end
    
    properties (Access = private)
        % A buffer for holding partially reduced data while this data
        % processor is still receiving input.
        IntermediateBuffer;
    end
    
    methods (Static)
        % Create a data processor factory that can be used by the execution
        % environment to construct instances of this class.
        function factory = createFactory(functionHandle, numVariables)
            factory = @createReduceProcessor;
            function dataProcessor = createReduceProcessor(~)
                import matlab.bigdata.internal.lazyeval.ReduceProcessor;
                dataProcessor = ReduceProcessor(functionHandle, numVariables);
            end
        end
    end
    
    % Methods overridden in the DataProcessor interface.
    methods
        function data = process(obj, isLastOfInput, in)
            if obj.IsFinished || isempty(in)
                data = cell(0, obj.NumVariables);
                return;
            end

            % This enforces pairwise reduction so that we do not get sporadic
            % differences in rounding of results if this processor so
            % happens to receive a different number of chunks in two
            % different passes of the underlying data.
            in = [obj.IntermediateBuffer; in];
            state = in(1, :);
            for ii = 2:size(in, 1)
                state = cellfun(@vertcat, state, in(ii, :), 'UniformOutput', false);
                [state{:}] = matlab.bigdata.internal.lazyeval.callFunctionHandle(obj.FunctionHandle, state{:});
            end
            obj.IntermediateBuffer = state;
            
            if isLastOfInput && ~obj.IsFinished
                data = obj.IntermediateBuffer;
                obj.IsFinished = true;
                obj.IsMoreInputRequired = false;
            else
                data = cell(0, obj.NumVariables);
            end
        end
    end
    
    % Private constructor for factory method.
    methods (Access = private)
        function obj = ReduceProcessor(functionHandle, numVariables)
            obj.FunctionHandle = functionHandle;
            obj.NumVariables = numVariables;
        end
    end
end