% gusucode.com > bigdata toolbox MATLAB source code > bigdata/+matlab/+bigdata/+internal/+lazyeval/GroupedPartitionedArray.m

%GroupedPartitionedArray
% An implementation of the PartitionedArray API that represents a
% collection of partitioned groups. Every method of this class
% applies its logic per group.
%
% In the absence of reduced values, all pure chunkwise / slicewise /
% elementwise operations behave the same as for LazyPartitionedArray. The
% funfun methods that are supported but behave differently from
% LazyPartitionedArray are:
%  - reducefun is applied to each group separately
%  - aggregatefun is applied to each group separately
%  - partitionfun is applied to each group separately, with an info struct per group
%
% The following funfun methods are not supported:
%  - gather
%  - reducebykeyfun
%  - aggregatebykeyfun
%
% In the presence of reduced values, such as mean(x - mean(x)), all funfun
% methods map each group's reduced scalar back to that group. This means
% that when the per-group output of reducefun/aggregatefun is passed as an
% input to another funfun method, each group receives its own reduced value.
%

%   Copyright 2016 The MathWorks, Inc.

classdef (InferiorClasses = { ...
        ?matlab.bigdata.internal.BroadcastArray, ...
        ?matlab.bigdata.internal.FunctionHandle, ...
        ?matlab.bigdata.internal.LocalArray,...
        ?matlab.bigdata.internal.PartitionedArray}) ...
        GroupedPartitionedArray < matlab.bigdata.internal.PartitionedArray
    
    properties (SetAccess = immutable)
        % Keys - LazyPartitionedArray holding the grouping keys. Every
        % distinct slice of Keys identifies one group.
        Keys;
        
        % Values - LazyPartitionedArray holding the grouped data. Its size
        % in the tall dimension must match that of Keys.
        Values;
        
        % Session - GroupedPartitionedArraySession object that decides
        % whether this array is still usable.
        Session;
        
        % RequiresBroadcast - logical scalar. When true, the array must be
        % converted into a KeyValueMapBroadcast before it can be combined
        % with the original data.
        RequiresBroadcast;
    end
    
    properties (Dependent)
        % IsValid - logical scalar mirroring Session.IsValid.
        IsValid;
    end
    
    methods (Static)
        function varargout = create(keys, session, varargin)
            %CREATE Build one GroupedPartitionedArray per value input.
            %   All outputs share the given session and the same keys. The
            %   keys and values are sorted by key chunk-by-chunk before
            %   being wrapped. The number of outputs must equal the number
            %   of value inputs.
            assert(nargout == numel(varargin));
            
            % TODO(g1394174): the identity elementfun calls work around an
            % optimizer error raised when identical edges connect one
            % source node to one destination node.
            keys = elementfun(@iNoOpForOptimizer, keys);
            for idx = 1:numel(varargin)
                varargin{idx} = elementfun(@iNoOpForOptimizer, varargin{idx});
            end
            
            [keys, varargout{1:nargout}] = chunkfun(@iSortByKey, keys, varargin{:});
            varargout = iWrapOutput(varargout, keys, session);
        end
    end
    
    methods (Access = private)
        function obj = GroupedPartitionedArray(keys, values, session, requiresBroadcast)
            % Private constructor, invoked from the static create method
            % and from the local helpers of this file. requiresBroadcast
            % is optional and defaults to false.
            if nargin < 4
                requiresBroadcast = false;
            end
            obj.Keys = keys;
            obj.Values = values;
            obj.Session = session;
            obj.RequiresBroadcast = requiresBroadcast;
        end
    end
    
    % PartitionedArray interface implementation.
    methods
        function varargout = gather(varargin) %#ok<STOUT>
            %GATHER Not supported on grouped data; always raises an error
            %   naming the function that was being applied per group.
            [~, ~, sess] = iParseInputs(varargin{:});
            error(message('MATLAB:bigdata:array:SplitApplyOperationNotSupported', ...
                func2str(sess.FunctionHandle)));
        end
        
        function varargout = elementfun(functionHandle, varargin)
            %ELEMENTFUN Apply an element-wise function handle that preserves size in all dimensions to the underlying data.
            [dataIn, groupKeys, sess, allBroadcast] = iParseInputs(varargin{:});
            if numel(varargin) ~= 1
                % With several inputs, some may have been turned into
                % KeyValueMapBroadcast values, so call the function per key.
                keyedFcn = iCreateKeyedFunctionHandle(functionHandle);
                [~, varargout{1:nargout}] = slicefun(keyedFcn, groupKeys, dataIn{:});
            else
                % A lone input is never a KeyValueMapBroadcast and the
                % function is element-wise, so grouping does not matter.
                [varargout{1:nargout}] = elementfun(functionHandle, dataIn{:});
            end
            varargout = iWrapOutput(varargout, groupKeys, sess, allBroadcast);
        end
        
        function varargout = slicefun(functionHandle, varargin)
            %SLICEFUN Apply a given slice-wise function handle that preserves size in the tall dimension to the underlying data.
            [dataIn, groupKeys, sess, allBroadcast] = iParseInputs(varargin{:});
            keyedFcn = iCreateKeyedFunctionHandle(functionHandle);
            % The tall size is preserved, so the incoming keys still line
            % up with the output and the returned keys can be dropped.
            [~, varargout{1:nargout}] = slicefun(keyedFcn, groupKeys, dataIn{:});
            varargout = iWrapOutput(varargout, groupKeys, sess, allBroadcast);
        end
        
        function varargout = filterslices(varargin)
            %FILTERSLICES Remove slices from one or more PartitionedArray using a PartitionedArray column vector of logical values.
            [dataIn, groupKeys, sess, allBroadcast] = iParseInputs(varargin{:});
            % Filter the keys together with the data so both stay aligned.
            [varargout{1:nargout}, groupKeys] = filterslices(dataIn{:}, groupKeys);
            varargout = iWrapOutput(varargout, groupKeys, sess, allBroadcast);
        end
        
        function out = union(varargin) %#ok<STOUT>
            %UNION Not supported on grouped data.
            assert(false, 'The method PartitionedArray/union is currently not supported.');
        end
        
        function varargout = reducefun(functionHandle, varargin)
            %REDUCEFUN Per-group reduction; aggregatefun using the same
            %   function for the initial and the reduce steps.
            [varargout{1:nargout}] = aggregatefun(functionHandle, functionHandle, varargin{:});
        end
        
        function varargout = aggregatefun(initialFunctionHandle, reduceFunctionHandle, varargin)
            %AGGREGATEFUN Perform a reduction of the underlying data that includes an initial transformation step.
            %   The reduction is done per group, and the result (one entry
            %   per key) is marked as requiring a broadcast.
            [dataIn, groupKeys, sess] = iParseInputs(varargin{:});
            initFcn = iParseFunctionHandle(initialFunctionHandle);
            reduceFcn = iParseFunctionHandle(reduceFunctionHandle);
            [groupKeys, varargout{1:nargout}] = aggregatebykeyfun(initFcn, reduceFcn, groupKeys, dataIn{:});
            % Sort by key so the ordering of the results is well defined.
            [groupKeys, varargout{:}] = reducefun(@iSortByKey, groupKeys, varargout{:});
            varargout = iWrapOutput(varargout, groupKeys, sess, true);
        end
        
        function [keys, varargout] = reducebykeyfun(functionHandle, keys, varargin) %#ok<INUSL,STOUT>
            %REDUCEBYKEYFUN Not supported on grouped data; always errors.
            [~, ~, sess] = iParseInputs(keys, varargin{:});
            error(message('MATLAB:bigdata:array:SplitApplyOperationNotSupported', ...
                func2str(sess.FunctionHandle)));
        end
        
        function [keys, varargout] = aggregatebykeyfun(initialFunctionHandle, reduceFunctionHandle, keys, varargin) %#ok<INUSL,STOUT>
            %AGGREGATEBYKEYFUN Not supported on grouped data; always errors.
            [~, ~, sess] = iParseInputs(keys, varargin{:});
            error(message('MATLAB:bigdata:array:SplitApplyOperationNotSupported', ...
                func2str(sess.FunctionHandle)));
        end
        
        function [keys, varargout] = joinbykey(xKeys, x, yKeys, y) %#ok<STOUT,INUSD>
            %JOINBYKEY Not supported on grouped data.
            assert(false, 'The method PartitionedArray/joinbykey is currently not supported.');
        end
        
        function varargout = chunkfun(functionHandle, varargin)
            %CHUNKFUN Apply a given chunk-wise function handle to the underlying data.
            [dataIn, groupKeys, sess, allBroadcast] = iParseInputs(varargin{:});
            keyedFcn = iCreateKeyedFunctionHandle(functionHandle);
            % Chunk-wise functions may change the tall size, so the keys
            % returned by the keyed function replace the incoming ones.
            [groupKeys, varargout{1:nargout}] = chunkfun(keyedFcn, groupKeys, dataIn{:});
            varargout = iWrapOutput(varargout, groupKeys, sess, allBroadcast);
        end
        
        function varargout = partitionfun(functionHandle, varargin)
            %PARTITIONFUN For each partition, apply a function handle to all of the underlying data for the partition.
            %   The function is wrapped by GroupedPartitionfunFunction so it
            %   is applied to each group separately.
            [dataIn, groupKeys, sess, allBroadcast] = iParseInputs(varargin{:});
            keyedFcn = iCreateKeyedPartitionfunFunction(functionHandle);
            [groupKeys, varargout{1:nargout}] = partitionfun(keyedFcn, groupKeys, dataIn{:});
            varargout = iWrapOutput(varargout, groupKeys, sess, allBroadcast);
        end
        
        function markforreuse(varargin)
            %MARKFORREUSE Forward the reuse hint to the keys and values of
            %   every GroupedPartitionedArray input; other inputs are skipped.
            for idx = 1:numel(varargin)
                arr = varargin{idx};
                if ~isa(arr, 'matlab.bigdata.internal.lazyeval.GroupedPartitionedArray')
                    continue;
                end
                markforreuse(arr.Keys, arr.Values);
            end
        end
        
        function [ref, varargout] = repartition(ref, varargin) %#ok<STOUT>
            %REPARTITION Not supported on grouped data.
            assert(false, 'The method PartitionedArray/repartition is currently not supported.');
        end
        
        function varargout = clientfun(functionHandle, varargin)
            %CLIENTFUN Apply a given function handle to the entirety underlying data in one call.
            %   The result is always marked as requiring a broadcast.
            [dataIn, groupKeys, sess] = iParseInputs(varargin{:});
            keyedFcn = iCreateKeyedFunctionHandle(functionHandle);
            [groupKeys, varargout{1:nargout}] = clientfun(keyedFcn, groupKeys, dataIn{:});
            varargout = iWrapOutput(varargout, groupKeys, sess, true);
        end
        
        function varargout = partitionheadfun(functionHandle, varargin)
            %PARTITIONHEADFUN Partition-wise function that only needs the
            %   first few slices of each partition. Grouped data has no
            %   shortcut for this, so it simply delegates to partitionfun.
            [varargout{1:nargout}] = partitionfun(functionHandle, varargin{:});
        end
        
        function varargout = strictslicefun(functionHandle, varargin)
            %STRICTSLICEFUN slicefun variant without singleton expansion.
            [dataIn, groupKeys, sess, allBroadcast] = iParseInputs(varargin{:});
            keyedFcn = iCreateKeyedFunctionHandle(functionHandle);
            [groupKeys, varargout{1:nargout}] = strictslicefun(keyedFcn, groupKeys, dataIn{:});
            varargout = iWrapOutput(varargout, groupKeys, sess, allBroadcast);
        end
        
        function executor = getExecutor(obj)
            %GETEXECUTOR Executor of the underlying Values array.
            executor = getExecutor(obj.Values);
        end
        
        function tf = get.IsValid(obj)
            tf = obj.Session.IsValid;
        end
    end
end

% Helper function that ensures all received function handles are instances
% of the matlab.bigdata.internal.FunctionHandle class.
function functionHandle = iParseFunctionHandle(functionHandle)
%iParseFunctionHandle Normalize to a matlab.bigdata.internal.FunctionHandle.
%   Values that already are FunctionHandle objects are returned as-is; a
%   plain function_handle is wrapped; anything else fails an assertion.
if isa(functionHandle, 'matlab.bigdata.internal.FunctionHandle')
    return;
end
assert (isa(functionHandle, 'function_handle'));
functionHandle = matlab.bigdata.internal.FunctionHandle(functionHandle);
end

% Helper function that wraps function handles with the logic necessary to
% do per key function calls.
function fcn = iCreateKeyedFunctionHandle(fcn)
%iCreateKeyedFunctionHandle Wrap fcn with the per-key call logic.
%   fcn may be a function_handle or a matlab.bigdata.internal.FunctionHandle;
%   it is normalized by iParseFunctionHandle before being wrapped.
normalizedFcn = iParseFunctionHandle(fcn);
fcn = matlab.bigdata.internal.lazyeval.createKeyedFunctionHandle(normalizedFcn);
end

% Helper function that wraps function handles with the logic necessary to
% do per key function calls using the advanced partitionfun API.
function fcn = iCreateKeyedPartitionfunFunction(fcn)
%iCreateKeyedPartitionfunFunction Wrap fcn for per-key partitionfun calls.
%   fcn is normalized by iParseFunctionHandle and then wrapped by
%   GroupedPartitionfunFunction.create.
normalizedFcn = iParseFunctionHandle(fcn);
fcn = matlab.bigdata.internal.lazyeval.GroupedPartitionfunFunction.create(normalizedFcn);
end

% Parse the input GroupedPartitionedArray data inputs. This checks to ensure
% all common data is the same for all inputs.
function [inputs, keys, session, isAllBroadcast] = iParseInputs(varargin)
%iParseInputs Unwrap grouped inputs into arguments for LazyPartitionedArray.
%
%   Outputs:
%     inputs         - cell array the same size as varargin. Each
%                      GroupedPartitionedArray is replaced either by its
%                      Values (passed through a no-op elementfun) or by a
%                      KeyValueMapBroadcast built with clientfun. A
%                      BroadcastArray wrapping a GroupedPartitionedArray is
%                      always replaced by a KeyValueMapBroadcast. All other
%                      inputs are passed through unchanged.
%     keys           - keys shared by the non-broadcast grouped inputs. When
%                      more than one keys array was collected, this is a
%                      lazy elementfun of iAssertKeysEqual, which errors with
%                      MATLAB:bigdata:array:IncompatibleTallIndexing when
%                      evaluated if the keys differ.
%     session        - the session common to all grouped inputs ([] if none
%                      was seen).
%     isAllBroadcast - true unless some GroupedPartitionedArray input has
%                      RequiresBroadcast == false.
%
%   Errors with MATLAB:bigdata:array:InvalidTall (through iCheckSession) when
%   a session is invalid or when the inputs come from different sessions.
%
%   NOTE(review): if no input contributes keys, keys ends up empty and the
%   elementfun call at the end receives no arrays. This is presumably
%   unreachable because method dispatch requires a GroupedPartitionedArray
%   argument. Confirm.
import matlab.bigdata.internal.lazyeval.KeyValueMapBroadcast;
inputs = varargin;

% We need to know if all inputs are broadcast because if not, we need to
% enact special logic to combine non-partitioned results back into partitioned
% data.
% isAllBroadcast stays true only while no GroupedPartitionedArray input has
% RequiresBroadcast == false; non-grouped inputs do not affect it.
isAllBroadcast = true;
for ii = 1:numel(varargin)
    isAllBroadcast = isAllBroadcast &&...
        ~(isa(varargin{ii}, 'matlab.bigdata.internal.lazyeval.GroupedPartitionedArray') ...
        && ~varargin{ii}.RequiresBroadcast);
end

session = [];
% One slot per input; slots stay empty for inputs that contribute no keys.
keys = cell(size(varargin));
for ii = 1:nargin
    if isa(varargin{ii}, 'matlab.bigdata.internal.lazyeval.GroupedPartitionedArray')
        if isAllBroadcast || ~varargin{ii}.RequiresBroadcast
            % Whenever we do not need to broadcast, we simply pass the data
            % to the underlying LazyPartitionedArray unmodified.
            % (When every grouped input is a broadcast, they are all used
            % directly like partitioned data.)
            
            % TODO(g1394174): This is a workaround to an issue where
            % the optimizer errors if identical edges exist between one
            % source and destination node.
            keys{ii} = elementfun(@iNoOpForOptimizer, varargin{ii}.Keys);
            inputs{ii} = elementfun(@iNoOpForOptimizer, varargin{ii}.Values);
            session = iCheckSession(varargin{ii}.Session, session);
        else
            % Otherwise, convert to KeyValueMapBroadcast to deal with the
            % fact that LazyPartitionedArray requires broadcasts to be
            % singleton in the tall dimension.
            inputs{ii} = clientfun(@KeyValueMapBroadcast, ...
                varargin{ii}.Keys, varargin{ii}.Values);
            session = iCheckSession(varargin{ii}.Session, session);
        end
    elseif isa(varargin{ii}, 'matlab.bigdata.internal.BroadcastArray') ...
            && isa(varargin{ii}.Value, 'matlab.bigdata.internal.lazyeval.GroupedPartitionedArray')
        % An explicitly broadcast grouped array is always converted to a
        % KeyValueMapBroadcast and contributes no keys.
        inputs{ii} = clientfun(@KeyValueMapBroadcast, ...
            varargin{ii}.Value.Keys, varargin{ii}.Value.Values);
        session = iCheckSession(varargin{ii}.Value.Session, session);
    else
        inputs{ii} = varargin{ii};
    end
end
% Drop the slots of inputs that did not contribute keys.
keys(cellfun(@isempty, keys)) = [];
% A single distinct keys array needs no cross-check. NOTE(review): each
% keys{ii} comes from its own elementfun call above, so distinct entries are
% presumably always distinct objects. Confirm.
if numel(unique(vertcat(keys{:}))) == 1
    keys = keys{1};
else
    keys = elementfun(@iAssertKeysEqual, keys{:});
end
end

% Check that the new session is valid and matches any session already seen.
function session = iCheckSession(newSession, session)
%iCheckSession Validate newSession and return it as the current session.
%   Errors with MATLAB:bigdata:array:InvalidTall when newSession is no longer
%   valid, or when a previous (non-empty) session differs from it.
if ~newSession.IsValid
    error(message('MATLAB:bigdata:array:InvalidTall'));
end
if ~isempty(session) && newSession ~= session
    error(message('MATLAB:bigdata:array:InvalidTall'));
end
session = newSession;
end

% Assert that the keys vector is the same across all input GroupedPartitionedArray instances.
function keys = iAssertKeysEqual(keys, varargin)
%iAssertKeysEqual Check that every additional keys chunk matches the first.
%
%   keys = iAssertKeysEqual(keys, otherKeys1, otherKeys2, ...) returns keys
%   unchanged when every otherKeys* is equal to it. Otherwise it errors with
%   MATLAB:bigdata:array:IncompatibleTallIndexing.
%
%   isequaln is used instead of isequal so that missing values (NaN,
%   <undefined> categorical, <missing> string) in otherwise identical key
%   chunks count as equal rather than raising a spurious error.
for ii = 1:numel(varargin)
    if ~isequaln(keys, varargin{ii})
        error(message('MATLAB:bigdata:array:IncompatibleTallIndexing'));
    end
end
end

% Wrap the output in a collection of GroupedPartitionedArray instances.
function outputs = iWrapOutput(outputs, keys, session, requiresBroadcast)
%iWrapOutput Wrap each element of the outputs cell array in a
%   GroupedPartitionedArray that shares keys and session.
%   requiresBroadcast is optional and defaults to false.
if ~exist('requiresBroadcast', 'var')
    requiresBroadcast = false;
end
for k = 1:numel(outputs)
    outputs{k} = matlab.bigdata.internal.lazyeval.GroupedPartitionedArray( ...
        keys, outputs{k}, session, requiresBroadcast);
end
end

% Sort the given data by key. This is used by create and aggregatefun to
% guarantee a well-defined ordering of results.
function [keys, varargout] = iSortByKey(keys, varargin)
%iSortByKey Sort keys with sortrows and permute every other input to match.
%   Categorical keys first get their category list sorted. Vertical
%   concatenation can leave the categories in an arbitrary order, and the
%   sort order of a categorical follows its category order.
if iscategorical(keys)
    sortedCategories = sort(categories(keys));
    keys = setcats(keys, sortedCategories);
end
[keys, permutation] = sortrows(keys);
varargout = cellfun( ...
    @(slices) matlab.bigdata.internal.util.indexSlices(slices, permutation), ...
    varargin, 'UniformOutput', false);
end

% Wrapper for the workaround to g1394174 so that the optimizer can ignore
% this when optimizing reducebykeyfun.
function data = iNoOpForOptimizer(data)
%iNoOpForOptimizer Identity function: returns its input unchanged.
%   Applied with elementfun as the workaround for g1394174, where the
%   optimizer errors if identical edges exist between one source and
%   destination node. The body is intentionally empty. NOTE(review): the
%   optimizer presumably recognizes this function when optimizing
%   reducebykeyfun, so keep it a plain no-op.
end