gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+lazyeval/GroupedPartitionedArray.m
%GroupedPartitionedArray
% An implementation of the PartitionedArray API that represents a
% collection of partitioned groups. Every method of this class applies its
% logic per group.
%
% In the absence of reduced values, all pure chunkwise / slicewise /
% elementwise operations are the same. The funfun methods that are
% supported but differ from LazyPartitionedArray:
%  - reducefun is applied to each group separately
%  - aggregatefun is applied to each group separately
%  - partitionfun is applied to each group separately, with an info struct
%    per group
%
% The following funfun methods are not supported:
%  - gather
%  - reducebykeyfun
%  - aggregatebykeyfun
%
% In the presence of reduced values, such as mean(x - mean(x)), all funfun
% methods map the reduced scalar back to the group. This means that the
% per-group output of reducefun/aggregatefun is handed over group by group
% when it is passed as an input to other funfun methods.
%
%   Copyright 2016 The MathWorks, Inc.

classdef (InferiorClasses = { ...
        ?matlab.bigdata.internal.BroadcastArray, ...
        ?matlab.bigdata.internal.FunctionHandle, ...
        ?matlab.bigdata.internal.LocalArray, ...
        ?matlab.bigdata.internal.PartitionedArray}) ...
        GroupedPartitionedArray < matlab.bigdata.internal.PartitionedArray

    properties (SetAccess = immutable)
        % LazyPartitionedArray of keys that defines the grouping: each
        % unique slice of Keys corresponds to one group.
        Keys

        % LazyPartitionedArray of elements. Must have the same size as
        % Keys in the tall dimension.
        Values

        % The GroupedPartitionedArraySession object that determines when
        % this array is valid.
        Session

        % Logical scalar: true when a KeyValueMapBroadcast is required in
        % order to combine this array back with the original data.
        RequiresBroadcast
    end

    properties (Dependent)
        % Logical scalar: true while this array is still valid. Forwarded
        % from the session object.
        IsValid
    end

    methods (Static)
        function varargout = create(keys, session, varargin)
            %CREATE Build GroupedPartitionedArray instances that share the
            % same session and keys. One output is produced per trailing
            % input, so the caller must request exactly numel(varargin)
            % outputs.
            import matlab.bigdata.internal.lazyeval.GroupedPartitionedArray;
            assert(nargout == numel(varargin));

            % TODO(g1394174): This is a workaround to an issue where the
            % optimizer errors if identical edges exist between one source
            % and destination node.
            passthroughKeys = elementfun(@iNoOpForOptimizer, keys);
            values = varargin;
            for idx = 1:numel(values)
                values{idx} = elementfun(@iNoOpForOptimizer, values{idx});
            end

            % Sort every chunk by key before wrapping.
            [sortedKeys, varargout{1:nargout}] = chunkfun(@iSortByKey, passthroughKeys, values{:});
            for idx = 1:numel(varargout)
                varargout{idx} = GroupedPartitionedArray(sortedKeys, varargout{idx}, session);
            end
        end
    end

    methods (Access = private)
        function obj = GroupedPartitionedArray(keys, values, session, requiresBroadcast)
            % Private constructor; instances are built via the static
            % create method and the file-local iWrapOutput helper.
            % requiresBroadcast is optional and defaults to false.
            if nargin < 4
                requiresBroadcast = false;
            end
            obj.RequiresBroadcast = requiresBroadcast;
            obj.Session = session;
            obj.Values = values;
            obj.Keys = keys;
        end
    end

    % Overrides of the PartitionedArray interface.
    methods
        function varargout = gather(varargin) %#ok<STOUT>
            %GATHER Not supported for grouped arrays; always errors with
            % MATLAB:bigdata:array:SplitApplyOperationNotSupported, naming
            % the function handle held by the session.
            [~, ~, session] = iParseInputs(varargin{:});
            error(message('MATLAB:bigdata:array:SplitApplyOperationNotSupported', ...
                func2str(session.FunctionHandle)));
        end

        function varargout = elementfun(functionHandle, varargin)
            %ELEMENTFUN Apply an element-wise function handle that
            % preserves size in all dimensions to the underlying data.
            [args, groupKeys, session, allBroadcast] = iParseInputs(varargin{:});
            if isscalar(varargin)
                % A lone input is guaranteed not to be a
                % KeyValueMapBroadcast, and because the function is
                % elementwise there is no need to invoke it per group.
                [varargout{1:nargout}] = elementfun(functionHandle, args{:});
            else
                keyedFcn = iCreateKeyedFunctionHandle(functionHandle);
                % The first output of the keyed call is the keys; the keys
                % returned by iParseInputs are the ones used for wrapping.
                [~, varargout{1:nargout}] = slicefun(keyedFcn, groupKeys, args{:});
            end
            varargout = iWrapOutput(varargout, groupKeys, session, allBroadcast);
        end

        function varargout = slicefun(functionHandle, varargin)
            %SLICEFUN Apply a slice-wise function handle that preserves
            % size in the tall dimension to the underlying data.
            [args, groupKeys, session, allBroadcast] = iParseInputs(varargin{:});
            keyedFcn = iCreateKeyedFunctionHandle(functionHandle);
            [~, varargout{1:nargout}] = slicefun(keyedFcn, groupKeys, args{:});
            varargout = iWrapOutput(varargout, groupKeys, session, allBroadcast);
        end

        function varargout = filterslices(varargin)
            %FILTERSLICES Remove slices from one or more PartitionedArray
            % using a PartitionedArray column vector of logical values.
            [args, groupKeys, session, allBroadcast] = iParseInputs(varargin{:});
            % The keys are filtered alongside the data so they stay aligned.
            [varargout{1:nargout}, groupKeys] = filterslices(args{:}, groupKeys);
            varargout = iWrapOutput(varargout, groupKeys, session, allBroadcast);
        end

        function out = union(varargin) %#ok<STOUT>
            %UNION Concatenate two or more PartitionedArray instances in
            % the tall dimension. Not supported here; always asserts.
            assert(false, 'The method PartitionedArray/union is currently not supported.');
        end

        function varargout = reducefun(functionHandle, varargin)
            %REDUCEFUN Perform a reduction of the underlying data.
            % Implemented as aggregatefun with the same handle used for
            % both the initial and the reduce step.
            [varargout{1:nargout}] = aggregatefun(functionHandle, functionHandle, varargin{:});
        end

        function varargout = aggregatefun(initialFunctionHandle, reduceFunctionHandle, varargin)
            %AGGREGATEFUN Perform a reduction of the underlying data that
            % includes an initial transformation step.
            [args, groupKeys, session] = iParseInputs(varargin{:});
            initialFcn = iParseFunctionHandle(initialFunctionHandle);
            reduceFcn = iParseFunctionHandle(reduceFunctionHandle);

            % Reduce by key on the unwrapped arrays, then reduce once more
            % with iSortByKey to sort the result by key.
            [groupKeys, varargout{1:nargout}] = aggregatebykeyfun(initialFcn, reduceFcn, groupKeys, args{:});
            [groupKeys, varargout{:}] = reducefun(@iSortByKey, groupKeys, varargout{:});

            % Outputs are always wrapped with RequiresBroadcast set.
            varargout = iWrapOutput(varargout, groupKeys, session, true);
        end

        function [keys, varargout] = reducebykeyfun(functionHandle, keys, varargin) %#ok<INUSL,STOUT>
            %REDUCEBYKEYFUN For each unique key, perform a reducefun
            % reduction of all of the data associated with that key.
            % Not supported for grouped arrays; always errors.
            [~, ~, session] = iParseInputs(keys, varargin{:});
            error(message('MATLAB:bigdata:array:SplitApplyOperationNotSupported', ...
                func2str(session.FunctionHandle)));
        end

        function [keys, varargout] = aggregatebykeyfun(initialFunctionHandle, reduceFunctionHandle, keys, varargin) %#ok<INUSL,STOUT>
            %AGGREGATEBYKEYFUN For each unique key, perform an aggregatefun
            % reduction of all of the data associated with that key.
            % Not supported for grouped arrays; always errors.
            [~, ~, session] = iParseInputs(keys, varargin{:});
            error(message('MATLAB:bigdata:array:SplitApplyOperationNotSupported', ...
                func2str(session.FunctionHandle)));
        end

        function [keys, varargout] = joinbykey(xKeys, x, yKeys, y) %#ok<STOUT,INUSD>
            %JOINBYKEY Perform an inner join of two sets of
            % PartitionedArray instances using keys common to both.
            % Not supported here; always asserts.
            assert(false, 'The method PartitionedArray/joinbykey is currently not supported.');
        end

        function varargout = chunkfun(functionHandle, varargin)
            %CHUNKFUN Apply a chunk-wise function handle to the underlying
            % data.
            [args, groupKeys, session, allBroadcast] = iParseInputs(varargin{:});
            keyedFcn = iCreateKeyedFunctionHandle(functionHandle);
            [groupKeys, varargout{1:nargout}] = chunkfun(keyedFcn, groupKeys, args{:});
            varargout = iWrapOutput(varargout, groupKeys, session, allBroadcast);
        end

        function varargout = partitionfun(functionHandle, varargin)
            %PARTITIONFUN For each partition, apply a function handle to
            % all of the underlying data for the partition.
            [args, groupKeys, session, allBroadcast] = iParseInputs(varargin{:});
            keyedFcn = iCreateKeyedPartitionfunFunction(functionHandle);
            [groupKeys, varargout{1:nargout}] = partitionfun(keyedFcn, groupKeys, args{:});
            varargout = iWrapOutput(varargout, groupKeys, session, allBroadcast);
        end

        function markforreuse(varargin)
            %MARKFORREUSE Inform the Lazy Evaluation Framework that the
            % given PartitionedArray will be reused multiple times.
            % Inputs that are not GroupedPartitionedArray are ignored.
            for idx = 1:numel(varargin)
                candidate = varargin{idx};
                if ~isa(candidate, 'matlab.bigdata.internal.lazyeval.GroupedPartitionedArray')
                    continue;
                end
                markforreuse(candidate.Keys, candidate.Values);
            end
        end

        function [ref, varargout] = repartition(ref, varargin) %#ok<STOUT>
            %REPARTITION Repartition one or more PartitionedArray
            % instances. Not supported here; always asserts.
            assert(false, 'The method PartitionedArray/repartition is currently not supported.');
        end

        function varargout = clientfun(functionHandle, varargin)
            %CLIENTFUN Apply a function handle to the entirety of the
            % underlying data in one call.
            [args, groupKeys, session] = iParseInputs(varargin{:});
            keyedFcn = iCreateKeyedFunctionHandle(functionHandle);
            [groupKeys, varargout{1:nargout}] = clientfun(keyedFcn, groupKeys, args{:});

            % Outputs are always wrapped with RequiresBroadcast set.
            varargout = iWrapOutput(varargout, groupKeys, session, true);
        end

        function varargout = partitionheadfun(functionHandle, varargin)
            %PARTITIONHEADFUN Apply a partition-wise function handle that
            % only requires the first few slices of each partition to
            % generate the complete output.
            %
            % The function handle must obey the same rules as for
            % partitionfun. On top of this, the framework will assume that
            % full evaluation of this operation is fast enough for preview.
            %
            % This class simply forwards to partitionfun.
            [varargout{1:nargout}] = partitionfun(functionHandle, varargin{:});
        end

        function varargout = strictslicefun(functionHandle, varargin)
            %STRICTSLICEFUN Perform a slicefun operation that does not
            % support singleton expansion.
            [args, groupKeys, session, allBroadcast] = iParseInputs(varargin{:});
            keyedFcn = iCreateKeyedFunctionHandle(functionHandle);
            [groupKeys, varargout{1:nargout}] = strictslicefun(keyedFcn, groupKeys, args{:});
            varargout = iWrapOutput(varargout, groupKeys, session, allBroadcast);
        end

        function executor = getExecutor(obj)
            %GETEXECUTOR Get the underlying executor, taken from Values.
            executor = getExecutor(obj.Values);
        end

        function tf = get.IsValid(obj)
            % Validity is owned by the session object.
            tf = obj.Session.IsValid;
        end
    end
end

% Helper function that guarantees the returned function handle is an
% instance of the matlab.bigdata.internal.FunctionHandle class. Anything
% that is not already such an instance must be a plain function_handle.
function functionHandle = iParseFunctionHandle(functionHandle)
    if isa(functionHandle, 'matlab.bigdata.internal.FunctionHandle')
        return;
    end
    assert (isa(functionHandle, 'function_handle'));
    functionHandle = matlab.bigdata.internal.FunctionHandle(functionHandle);
end

% Helper function that wraps a function handle with the logic necessary to
% do per key function calls.
function fcn = iCreateKeyedFunctionHandle(fcn)
    fcn = matlab.bigdata.internal.lazyeval.createKeyedFunctionHandle( ...
        iParseFunctionHandle(fcn));
end

% Helper function that wraps a function handle with the logic necessary to
% do per key function calls using the advanced partitionfun API.
function fcn = iCreateKeyedPartitionfunFunction(fcn)
    fcn = matlab.bigdata.internal.lazyeval.GroupedPartitionfunFunction.create( ...
        iParseFunctionHandle(fcn));
end

% Parse the input GroupedPartitionedArray data inputs. This checks to ensure
% all common data is the same for all inputs.
function [inputs, keys, session, isAllBroadcast] = iParseInputs(varargin)
    % Unwrap every GroupedPartitionedArray input into the arrays it holds.
    %
    % Outputs:
    %   inputs         - cell array the same size as varargin with each
    %                    grouped input replaced by its Values (or by a
    %                    KeyValueMapBroadcast built via clientfun); all
    %                    other inputs are passed through untouched.
    %   keys           - the common keys array of the non-broadcast inputs.
    %   session        - the session shared by all grouped inputs.
    %   isAllBroadcast - false as soon as one grouped input has
    %                    RequiresBroadcast == false, true otherwise.
    import matlab.bigdata.internal.lazyeval.KeyValueMapBroadcast;

    groupedClass = 'matlab.bigdata.internal.lazyeval.GroupedPartitionedArray';
    broadcastClass = 'matlab.bigdata.internal.BroadcastArray';

    % We need to know if all inputs are broadcast because if not, special
    % logic is required to combine non-partitioned results back into
    % partitioned data.
    isAllBroadcast = true;
    for idx = 1:numel(varargin)
        if isa(varargin{idx}, groupedClass) && ~varargin{idx}.RequiresBroadcast
            isAllBroadcast = false;
            break;
        end
    end

    inputs = varargin;
    session = [];
    keyList = cell(size(varargin));
    for idx = 1:nargin
        arg = varargin{idx};
        if isa(arg, groupedClass)
            if isAllBroadcast || ~arg.RequiresBroadcast
                % No broadcast needed: hand the data to the underlying
                % LazyPartitionedArray as is.
                % TODO(g1394174): The no-op elementfun is a workaround to an
                % issue where the optimizer errors if identical edges exist
                % between one source and destination node.
                keyList{idx} = elementfun(@iNoOpForOptimizer, arg.Keys);
                inputs{idx} = elementfun(@iNoOpForOptimizer, arg.Values);
                session = iCheckSession(arg.Session, session);
            else
                % Convert to KeyValueMapBroadcast, because
                % LazyPartitionedArray requires broadcasts to be singleton
                % in the tall dimension.
                inputs{idx} = clientfun(@KeyValueMapBroadcast, ...
                    arg.Keys, arg.Values);
                session = iCheckSession(arg.Session, session);
            end
        elseif isa(arg, broadcastClass) && isa(arg.Value, groupedClass)
            % An explicit BroadcastArray around a grouped array is always
            % converted to a KeyValueMapBroadcast.
            wrapped = arg.Value;
            inputs{idx} = clientfun(@KeyValueMapBroadcast, ...
                wrapped.Keys, wrapped.Values);
            session = iCheckSession(wrapped.Session, session);
        end
        % Any other input stays exactly as it was supplied.
    end

    % Drop the slots of inputs that contributed no keys, then collapse the
    % remainder to a single keys array, asserting equality if necessary.
    keyList(cellfun(@isempty, keyList)) = [];
    if numel(unique(vertcat(keyList{:}))) == 1
        keys = keyList{1};
    else
        keys = elementfun(@iAssertKeysEqual, keyList{:});
    end
end

% Check whether the underlying session is valid. Errors with
% MATLAB:bigdata:array:InvalidTall if newSession is no longer valid or if
% it differs from a session that has already been seen; otherwise
% newSession becomes the session in force.
function session = iCheckSession(newSession, session)
    if ~newSession.IsValid || (~isempty(session) && newSession ~= session)
        error(message('MATLAB:bigdata:array:InvalidTall'));
    end
    session = newSession;
end

% Assert that the keys vector is the same across all input
% GroupedPartitionedArray instances. Returns the first keys input
% unchanged, or errors with MATLAB:bigdata:array:IncompatibleTallIndexing.
function keys = iAssertKeysEqual(keys, varargin)
    matchesFirst = cellfun(@(other) isequal(keys, other), varargin);
    if ~all(matchesFirst)
        error(message('MATLAB:bigdata:array:IncompatibleTallIndexing'));
    end
end

% Wrap each output in a GroupedPartitionedArray that shares the given keys
% and session. requiresBroadcast is optional and defaults to false.
function outputs = iWrapOutput(outputs, keys, session, requiresBroadcast)
    import matlab.bigdata.internal.lazyeval.GroupedPartitionedArray
    if nargin < 4
        requiresBroadcast = false;
    end
    for outIdx = 1:numel(outputs)
        outputs{outIdx} = GroupedPartitionedArray(keys, outputs{outIdx}, session, requiresBroadcast);
    end
end

% Sort the given data by key. This is used by aggregatefun to guarantee a
% well defined ordering of results.
function [keys, varargout] = iSortByKey(keys, varargin)
    if iscategorical(keys)
        % For categorical, sort depends on the order of categories. As
        % vertical concatenation means the categories might be in an
        % arbitrary order, the categories are sorted as well.
        sortedCats = sort(categories(keys));
        keys = setcats(keys, sortedCats);
    end

    [keys, sortOrder] = sortrows(keys);

    % Apply the same permutation to every accompanying data input.
    varargout = cell(size(varargin));
    for k = 1:numel(varargin)
        varargout{k} = matlab.bigdata.internal.util.indexSlices(varargin{k}, sortOrder);
    end
end

% Wrapper for the workaround to g1394174 so that the optimizer can ignore
% this when optimizing reducebykeyfun.
function data = iNoOpForOptimizer(data)
    % Intentionally empty: the input is returned unchanged.
end