gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+lazyeval/GroupedPartitionfunFunction.m

    %GroupedPartitionfunFunction
% An object that obeys the feval contract and performs the grouped version
% of the partitionfun call.
%
% This expects to be called within a partitionfun on a partitioned array of
% keys and one or more partitioned arrays of data. It works by wrapping the
% internal function handle in code that is called once per group and that
% maintains an info struct per group.
%

%   Copyright 2016 The MathWorks, Inc.

classdef GroupedPartitionfunFunction < handle & matlab.mixin.Copyable
    % Functor that adapts a per-group partitionfun-style function so that
    % it can be driven by a partitionfun call that only sees chunks of
    % keys and data. Per-group state (known keys, relative index and
    % finished flag) is stored on this object between calls.

    properties (SetAccess = immutable)
        % FunctionHandle object that is evaluated once per group for each
        % chunk handed to this object.
        UnderlyingFunction;
    end
    
    properties (SetAccess = private)
        % All keys seen so far, one key per row.
        Keys;
        
        % Per-group relative index into the partition. New groups start
        % at 1.
        RelativeIndices;
        
        % Per-group flag recording the most recent "is finished" value
        % returned by the underlying function for that group.
        IsGroupFinished;
    end
    
    methods (Static)
        function fcn = create(fcnHandle)
            %CREATE Wrap a FunctionHandle in a GroupedPartitionfunFunction
            % and return the result as a new FunctionHandle.
            groupedFunctor = matlab.bigdata.internal.lazyeval.GroupedPartitionfunFunction(fcnHandle);
            fcn = matlab.bigdata.internal.FunctionHandle(groupedFunctor);
        end
    end
    methods
        function [isFinished, keys, varargout] = feval(obj, info, keys, varargin)
            %FEVAL Evaluate the grouped function on one chunk.
            % Intended to be invoked by a partitionfun that is unaware of
            % groups. A last chunk with no keys is routed to fevalEmpty,
            % everything else to fevalNormal.
            numDataOutputs = nargout - 2;
            if ~info.IsLastChunk || ~isempty(keys)
                [isFinished, keys, varargout{1 : numDataOutputs}] = fevalNormal(obj, info, keys, varargin{:});
            else
                [isFinished, keys, varargout{1 : numDataOutputs}] = fevalEmpty(obj, info, keys, varargin{:});
            end
        end
    end
    
    methods (Access = private)
        function obj = GroupedPartitionfunFunction(fcnHandle)
            % Private constructor; instances are built through create.
            % The input must be a matlab.bigdata.internal.FunctionHandle.
            assert(isa(fcnHandle, 'matlab.bigdata.internal.FunctionHandle'));
            obj.UnderlyingFunction = fcnHandle;
        end
        
        function [isFinished, keys, varargout] = fevalNormal(obj, info, keys, varargin)
            % Evaluate the underlying function per group for a chunk that
            % is routed here by feval (any chunk that is not an empty
            % last chunk).

            % Cell arrays of character vectors are converted to string
            % before being merged into the list of known keys.
            if iscellstr(keys)
                keys = string(keys);
            end

            % Merge the incoming keys into the known set. The 'stable'
            % union keeps existing rows of obj.Keys in place, so indices
            % into the per-group vectors stay valid.
            if isempty(obj.Keys)
                knownKeys = unique(keys, 'rows');
            else
                knownKeys = union(obj.Keys, keys, 'rows', 'stable');
            end
            obj.Keys = knownKeys;

            % Grow the per-group state to cover any newly discovered keys.
            numGroups = size(knownKeys, 1);
            obj.IsGroupFinished(end + 1 : numGroups) = false;
            obj.RelativeIndices(end + 1 : numGroups) = 1;
            
            keyedFcn = iCreateKeyedPartitionfunHandle(obj.UnderlyingFunction);
            % The broadcast map pairs each known key with its row index
            % in obj.Keys, so the per-group wrapper is handed that index
            % as one of its inputs.
            keyIndexMap = matlab.bigdata.internal.lazyeval.KeyValueMapBroadcast(knownKeys, (1:numGroups)');
            % The object travels inside a cell because function_handle is
            % inferior to this class.
            numDataOutputs = nargout - 2;
            [keys, varargout{1 : numDataOutputs}] = feval(keyedFcn, keys, keyIndexMap, {obj}, info, varargin{:});
            
            isFinished = info.IsLastChunk && all(obj.IsGroupFinished);
        end
        
        function [isFinished, keys, varargout] = fevalEmpty(obj, outerInfo, keys, varargin)
            % Evaluate the underlying function for every group that has
            % not yet reported itself finished, passing the same inputs
            % to each, and vertically concatenate the per-group outputs.
            %
            % NOTE(review): keys is returned exactly as received, even if
            % the underlying function emits rows here - confirm that
            % callers expect this.
            underlyingFcn = obj.UnderlyingFunction;
            numGroups = size(obj.Keys, 1);
            perGroupOut = cell(numGroups, nargout - 2);

            % Snapshot of the flags taken before any group is evaluated.
            finishedBefore = obj.IsGroupFinished;
            for groupIdx = 1:numGroups
                if finishedBefore(groupIdx)
                    continue;
                end
                groupInfo = createInfoStruct(obj, groupIdx, outerInfo);
                [obj.IsGroupFinished(groupIdx), perGroupOut{groupIdx, :}] = feval(underlyingFcn, groupInfo, varargin{:});
            end
            
            % Stack the results of all groups, one output at a time.
            numColumns = size(perGroupOut, 2);
            varargout = cell(1, numColumns);
            for columnIdx = 1:numColumns
                varargout{columnIdx} = vertcat(perGroupOut{:, columnIdx});
            end
            
            isFinished = outerInfo.IsLastChunk && all(obj.IsGroupFinished);
        end
        
        function info = createInfoStruct(obj, keyIndex, outerInfo)
            % Build the info struct handed to the underlying function for
            % the group at keyIndex. PartitionId and IsLastChunk are
            % copied from outerInfo; the relative index comes from this
            % object's per-group state.
            info = struct( ...
                'PartitionId',              outerInfo.PartitionId, ...
                'RelativeIndexInPartition', obj.RelativeIndices(keyIndex), ...
                'IsLastChunk',              outerInfo.IsLastChunk);
        end
    end
end

% Internal wrapper functor that is passed to createKeyedFunctionHandle to
% generate a keyed function handle that calls a function with the
% partitionfun API.
function groupedKeyFcn = iCreateKeyedPartitionfunHandle(fcnHandle)
% Build a keyed function handle that evaluates fcnHandle once per group.
% The per-group wrapper is placed in a new FunctionHandle that carries
% over the MaxNumSlices, ErrorFree and ErrorStack settings of fcnHandle,
% and is then passed to createKeyedFunctionHandle.

rawHandle = fcnHandle.Handle;

perGroupFcn = matlab.bigdata.internal.FunctionHandle(@fcn, ...
    'MaxNumSlices', fcnHandle.MaxNumSlices, ...
    'ErrorFree', fcnHandle.ErrorFree, ...
    'ErrorStack', fcnHandle.ErrorStack);
groupedKeyFcn = matlab.bigdata.internal.lazyeval.createKeyedFunctionHandle(perGroupFcn);

% Per-group wrapper. Inputs:
%  - keyIndex: Index into obj{1}.Keys identifying the current group's key.
%  - obj: Scalar cell holding the GroupedPartitionfunFunction object.
%  - outerInfo: Info struct that was given to GroupedPartitionfunFunction
%  by LazyPartitionedArray/partitionfun.
%  - varargin: The input data belonging to the current group.
    function varargout = fcn(keyIndex, obj, outerInfo, varargin)
        groupedObj = obj{1};
        groupInfo = groupedObj.createInfoStruct(keyIndex, outerInfo);
        [groupedObj.IsGroupFinished(keyIndex), varargout{1:nargout}] = rawHandle(groupInfo, varargin{:});
        
        % The relative index advances by the height of the first input
        % whose height is not 1, or by 1 if every input has height 1.
        numRows = 1;
        argIdx = 1;
        while argIdx <= numel(varargin) && numRows == 1
            numRows = size(varargin{argIdx}, 1);
            argIdx = argIdx + 1;
        end
        groupedObj.RelativeIndices(keyIndex) = groupedObj.RelativeIndices(keyIndex) + numRows;
    end
end