% Source: gusucode.com > bigdata toolbox MATLAB source code > bigdata/+matlab/+bigdata/+internal/+lazyeval/LazyPartitionedArray.m
%LazyPartitionedArray
% An implementation of the PartitionedArray interface that sits on top of
% the lazy evaluation architecture.
%
% Each array holds a ClosureFuture (ValueFuture) into the lazy evaluation
% graph. Operations build new closures via iDoOperation; nothing executes
% until gather/evaluate is called.

% Copyright 2015-2016 The MathWorks, Inc.

classdef (Sealed, InferiorClasses = { ...
        ?matlab.bigdata.internal.BroadcastArray, ...
        ?matlab.bigdata.internal.FunctionHandle, ...
        ?matlab.bigdata.internal.LocalArray}) ...
        LazyPartitionedArray < matlab.bigdata.internal.PartitionedArray

    properties (SetAccess = private)
        % A future to the actual evaluated data.
        ValueFuture;

        % The datastore that is backing this partitioned array. This exists
        % to allow checking of compatibility between two datastore based
        % tall arrays. This will be empty for gathered arrays.
        Datastore;
    end

    properties (SetAccess = private, Transient)
        % The underlying execution environment backing this partitioned
        % array. Transient: rebound to the current executor on load
        % (see loadobj).
        Executor;
    end

    properties (Dependent)
        % A logical scalar that is true if and only if this Partitioned
        % array still has a valid PartitionedArrayExecutor.
        IsValid
    end

    properties (SetAccess = private, Transient)
        % Properties to support cheap preview / display / workspace browser info.
        HasPreviewData = false
        IsPreviewTruncated = false
        PreviewData = []
    end

    methods (Static)
        % Construct a LazyPartitionedArray instance from a datastore
        % instance.
        function obj = createFromDatastore(ds)
            import matlab.bigdata.internal.executor.PartitionedArrayExecutor;
            import matlab.bigdata.internal.lazyeval.ReadOperation;

            % Copy so later mutation of the caller's datastore cannot
            % affect this array.
            ds = copy(ds);
            PartitionedArrayExecutor.checkDatastoreSupportForDefault(ds);

            numOutputs = 1;
            obj = iDoOperation(ReadOperation(ds, numOutputs));
            obj = iSetDatastore(ds, obj);
        end

        % Construct a LazyPartitionedArray instance around a constant.
        function obj = createFromConstant(constant, executor)
            import matlab.bigdata.internal.lazyeval.LazyPartitionedArray;
            valueFuture = iParseDataInputs(constant);
            if nargin < 2
                executor = iGetCurrentExecutor();
            end
            % We pass in an empty into the datastore input so that this
            % array can be used with any other LazyPartitionedArray object.
            ds = [];
            obj = LazyPartitionedArray(valueFuture, executor, ds);
        end
    end

    methods
        function varargout = gather(varargin)
            % We combine all variables to be gathered in order to allow the
            % Spark integration to perform the gather in a single execution.
            if nargin ~= 1
                [varargin{:}] = clientfun(@deal, varargin{:});
            end

            % Before gathering, call the optimizer.
            op = matlab.bigdata.internal.Optimizer.default();
            op.optimize(varargin{:});

            evaluate(varargin{:});
            varargout = cell(size(varargin));
            for ii = 1:numel(varargin)
                % After evaluate, every requested future must be resolved.
                assert (varargin{ii}.ValueFuture.IsDone);
                varargout{ii} = varargin{ii}.ValueFuture.Value;
            end
        end

        % Apply an element-wise function handle to the inputs.
        function varargout = elementfun(functionHandle, varargin)
            import matlab.bigdata.internal.lazyeval.ElementwiseOperation;
            functionHandle = iParseFunctionHandle(functionHandle);
            [varargout{1:nargout}] = iDoOperation(ElementwiseOperation(functionHandle, numel(varargin), nargout), varargin{:});
        end

        % Apply a slice-wise function handle to the inputs.
        function varargout = slicefun(functionHandle, varargin)
            import matlab.bigdata.internal.lazyeval.SlicewiseOperation;
            functionHandle = iParseFunctionHandle(functionHandle);
            [varargout{1:nargout}] = iDoOperation(SlicewiseOperation(functionHandle, numel(varargin), nargout), varargin{:});
        end

        % Keep only the slices selected by the logical subscript subs.
        function varargout = filterslices(subs, varargin)
            import matlab.bigdata.internal.lazyeval.FilterOperation;
            [varargout{1:nargout}] = iDoOperation(FilterOperation(numel(varargin)), subs, varargin{:});
        end

        function out = union(varargin) %#ok<STOUT>
            assert(false, 'The method PartitionedArray/union is currently not supported.');
        end

        % Reduce using the same function for both the initial and the
        % combine steps.
        function varargout = reducefun(functionHandle, varargin)
            [varargout{1:nargout}] = aggregatefun(functionHandle, functionHandle, varargin{:});
        end

        function varargout = aggregatefun(initialFunctionHandle, reduceFunctionHandle, varargin)
            import matlab.bigdata.internal.lazyeval.AggregateOperation;
            initialFunctionHandle = iParseFunctionHandle(initialFunctionHandle);
            reduceFunctionHandle = iParseFunctionHandle(reduceFunctionHandle);
            operation = AggregateOperation(initialFunctionHandle, reduceFunctionHandle, numel(varargin), nargout);
            [varargout{1:nargout}] = iDoOperation(operation, varargin{:});
            % Aggregated results are no longer backed by a datastore.
            [varargout{:}] = iSetDatastore([], varargout{:});
        end

        function [keys, varargout] = reducebykeyfun(functionHandle, keys, varargin)
            [keys, varargout{1:nargout - 1}] = aggregatebykeyfun(functionHandle, functionHandle, keys, varargin{:});
        end

        function [keys, varargout] = aggregatebykeyfun(initialFunctionHandle, reduceFunctionHandle, keys, varargin)
            import matlab.bigdata.internal.lazyeval.AggregateByKeyOperation;
            initialFunctionHandle = iParseFunctionHandle(initialFunctionHandle);
            reduceFunctionHandle = iParseFunctionHandle(reduceFunctionHandle);
            % "+ 1" accounts for the keys input alongside varargin.
            operation = AggregateByKeyOperation(initialFunctionHandle, reduceFunctionHandle, numel(varargin) + 1, nargout);
            [keys, varargout{1:nargout - 1}] = iDoOperation(operation, keys, varargin{:});
            % Aggregated results are no longer backed by a datastore.
            [varargout{:}] = iSetDatastore([], varargout{:});
        end

        function [keys, varargout] = joinbykey(xKeys, x, yKeys, y) %#ok<STOUT,INUSD>
            assert(false, 'The method PartitionedArray/joinbykey is currently not supported.');
        end

        % Apply a chunk-wise function handle to the inputs.
        function varargout = chunkfun(functionHandle, varargin)
            import matlab.bigdata.internal.lazyeval.ChunkwiseOperation;
            functionHandle = iParseFunctionHandle(functionHandle);
            [varargout{1:nargout}] = iDoOperation(ChunkwiseOperation(functionHandle, numel(varargin), nargout), varargin{:});
        end

        % Apply a partition-wise function handle to the inputs.
        function varargout = partitionfun(functionHandle, varargin)
            import matlab.bigdata.internal.lazyeval.PartitionwiseOperation;
            functionHandle = iParseFunctionHandle(functionHandle);
            [varargout{1:nargout}] = iDoOperation(PartitionwiseOperation(functionHandle, numel(varargin), nargout), varargin{:});
        end

        % Insert a cache operation in front of each input so that its
        % result is reused across later evaluations.
        function markforreuse(varargin)
            import matlab.bigdata.internal.lazyeval.CacheOperation;
            for ii = 1:numel(varargin)
                newArray = iDoOperation(CacheOperation(), varargin{ii});
                % NOTE(review): this in-place swap of ValueFuture only
                % affects the caller if LazyPartitionedArray is a handle
                % class — TODO confirm against PartitionedArray.
                varargin{ii}.ValueFuture = newArray.ValueFuture;
            end
        end

        function [ref, varargout] = repartition(ref, varargin) %#ok<STOUT>
            assert(false, 'The method PartitionedArray/repartition is currently not supported.');
        end

        % Get the executor underlying this partitioned array or error if it
        % no longer is valid.
        function executor = getExecutor(obj)
            executor = obj.Executor;
        end
    end

    methods (Hidden)
        % TODO: This is currently not in the specification.
        function varargout = clientfun(functionHandle, varargin)
            import matlab.bigdata.internal.lazyeval.NonPartitionedOperation;
            functionHandle = iParseFunctionHandle(functionHandle);
            [varargout{1:nargout}] = iDoOperation(NonPartitionedOperation(functionHandle, numel(varargin), nargout), varargin{:});
        end

        % PARTITIONHEADFUN Apply a partition-wise function handle that only
        % requires the first few slices of each partition to generate the
        % complete output.
        %
        % The function handle must obey the same rules as for partitionfun.
        % On top of this, the framework will assume that full evaluation of
        % this operation is fast enough for preview.
        function varargout = partitionheadfun(functionHandle, varargin)
            import matlab.bigdata.internal.lazyeval.PartitionwiseOperation;
            functionHandle = iParseFunctionHandle(functionHandle);
            dependsOnlyOnHead = true;
            [varargout{1:nargout}] = iDoOperation(PartitionwiseOperation(functionHandle, numel(varargin), nargout, dependsOnlyOnHead), varargin{:});
        end

        % STRICTSLICEFUN Perform a slicefun operation that does not support
        % singleton expansion.
        function varargout = strictslicefun(functionHandle, varargin)
            import matlab.bigdata.internal.lazyeval.SlicewiseOperation;
            functionHandle = iParseFunctionHandle(functionHandle);
            allowsTallDimExpansion = false;
            op = SlicewiseOperation(functionHandle, numel(varargin), nargout, allowsTallDimExpansion);
            [varargout{1:nargout}] = iDoOperation(op, varargin{:});
        end
    end

    methods (Hidden)
        % Everything to do with the cached preview data. Don't even ask about this stuff
        % if the array is gathered.
        function tf = hasCachedPreviewData(obj)
            assert(~obj.ValueFuture.IsDone);
            tf = obj.HasPreviewData;
        end
        function [data, isTruncated] = getCachedPreviewData(obj)
            assert(~obj.ValueFuture.IsDone);
            data = obj.PreviewData;
            isTruncated = obj.IsPreviewTruncated;
        end
        function setCachedPreviewData(obj, previewData, isTruncated)
            assert(~obj.ValueFuture.IsDone);
            % Preview data may only be set once per array.
            assert(~obj.HasPreviewData);
            obj.PreviewData = previewData;
            obj.IsPreviewTruncated = isTruncated;
            obj.HasPreviewData = true;
        end
    end

    methods
        function tf = get.IsValid(obj)
            tf = obj.Executor.checkIsValid();
        end
    end

    methods (Static)
        % Deserialization to a LazyPartitionedArray object.
        function obj = loadobj(obj)
            % On load, we bind the lazy partitioned array with the current
            % executor.
            obj.Executor = iGetCurrentExecutor();
        end
    end

    methods (Access = private)
        % Private constructor for factory purposes.
        function obj = LazyPartitionedArray(valueFuture, executor, ds)
            import matlab.bigdata.internal.executor.PartitionedArrayExecutor;
            assert(isa(valueFuture, 'matlab.bigdata.internal.lazyeval.ClosureFuture') && isscalar(valueFuture));
            obj.ValueFuture = valueFuture;
            obj.Executor = executor;
            obj.Datastore = ds;
            obj.HasPreviewData = false;
            % This is to ensure execution environments with idle timeouts
            % have the chance to reset the timeout when new arrays are
            % created.
            if executor.checkIsValidNow()
                executor.keepAlive();
            end
        end

        % The common error handling code for Partitioned Array evaluation.
        function wrapEvaluationError(obj, err) %#ok<INUSL>
            rethrow(err);
        end

        % Evaluate the provided array of LazyPartitionedArray instances.
        function evaluate(varargin)
            import matlab.bigdata.internal.executor.PartitionedArrayExecutor;
            import matlab.bigdata.internal.lazyeval.LazyTaskGraph;
            import matlab.bigdata.internal.serial.SerialExecutor;
            import matlab.bigdata.internal.util.isPreviewCheap;

            % Collect the closure of every not-yet-done input; take the
            % executor from the override if set, otherwise from the first
            % unfinished input.
            closures = cell(size(varargin));
            executor = matlab.bigdata.internal.executor.PartitionedArrayExecutor.override();
            for ii = 1:numel(varargin)
                if ~varargin{ii}.ValueFuture.IsDone
                    closures{ii} = varargin{ii}.ValueFuture.Promise.Closure;
                    if isempty(executor)
                        executor = getExecutor(varargin{ii});
                    end
                end
            end

            % NOTE(review): if every input is already done and no override
            % executor exists, executor is empty here — presumably that
            % cannot happen for callers of evaluate; TODO confirm.
            if executor.supportsSinglePartition()
                % If every requested gather is cheap, run on a serial,
                % single-partition executor instead.
                isGatherCheap = true(size(varargin));
                ii=1;
                while all(isGatherCheap) && ii<=numel(varargin)
                    [~, isGatherCheap(ii)] = isPreviewCheap(varargin{ii});
                    ii = ii + 1;
                end
                if all(isGatherCheap)
                    executor = SerialExecutor('UseSinglePartition', true);
                end
            end

            closures = vertcat(closures{:});
            if ~isempty(closures)
                taskGraph = LazyTaskGraph(closures);
                [outputs{1:numel(taskGraph.OutputTasks)}] = executor.execute(taskGraph);
                % Fulfill each output promise with the vertically
                % concatenated per-partition results.
                for ii = 1:numel(outputs)
                    task = taskGraph.OutputTasks(ii);
                    outputClosure = taskGraph.TaskToClosureMap(task.Id);
                    for jj = 1:numel(outputClosure.OutputPromises)
                        outputClosure.OutputPromises(jj).setValue(vertcat(outputs{ii}{:, jj}));
                    end
                end
            end
        end
    end
end

% Generate a closure for the operation and return a list of
% PartitionedArray instances representing each output.
function varargout = iDoOperation(operation, varargin)
import matlab.bigdata.internal.lazyeval.LazyPartitionedArray;

[valueFutures, executor, datastore] = iParseDataInputs(varargin{:});
if isempty(executor)
    executor = iGetCurrentExecutor();
end
closure = matlab.bigdata.internal.lazyeval.Closure(valueFutures, operation);

varargout = cell(numel(closure.OutputPromises), 1);
for ii = 1:numel(varargout)
    varargout{ii} = LazyPartitionedArray(closure.OutputPromises(ii).Future, executor, datastore);
end
end

% Helper function that parses the data inputs to PartitionedArray methods.
% This will return a list of futures to each input.
%
% Also returns the (single) executor and datastore shared by the
% LazyPartitionedArray inputs, erroring if inputs disagree.
function [valueFutures, executor, datastore] = iParseDataInputs(varargin)
import matlab.bigdata.internal.BroadcastArray;
import matlab.bigdata.internal.lazyeval.ClosureFuture;
import matlab.bigdata.internal.lazyeval.ClosurePromise;

valueFutures = cell(size(varargin));
executor = [];
datastore = [];
for ii = 1:nargin
    % Parse dispatch based on type of input.
    if isa(varargin{ii}, 'matlab.bigdata.internal.lazyeval.LazyPartitionedArray')
        valueFutures{ii} = varargin{ii}.ValueFuture;
        executor = iCheckSameExecutor(getExecutor(varargin{ii}), executor);
        datastore = iCheckSameDatastore(varargin{ii}.Datastore, datastore);
    elseif isa(varargin{ii}, 'matlab.bigdata.internal.BroadcastArray')
        % A BroadcastArray will either contain a local array that should
        % be passed to all function calls or a PartitionedArray instance
        % that represents an array that should be passed to all function
        % calls. To support this, we simply convert the underlying array to
        % a 1 x 1 BroadcastArray instance. This instance will be passed
        % to all function calls as per the singleton expansion rules.

        % Parse dispatch based on type of underlying value.
        value = varargin{ii}.Value;
        if isa(value, 'matlab.bigdata.internal.lazyeval.LazyPartitionedArray')
            % Re-wrap the lazily evaluated value into a BroadcastArray on
            % the client so it broadcasts to every function call.
            value = clientfun(@BroadcastArray, value);
            valueFutures{ii} = value.ValueFuture;
            executor = iCheckSameExecutor(getExecutor(value), executor);
            datastore = iCheckSameDatastore(value.Datastore, datastore);
        else
            % An unwrapped local array

            % Ensure that this else branch does not accept other unsupported
            % implementation of PartitionedArray that has not been dealt
            % with explicitly by their own elseif branch.
            assert(~isa(varargin{ii}, 'matlab.bigdata.internal.PartitionedArray'));
            % Local value: wrap into an already-fulfilled promise.
            promise = ClosurePromise(BroadcastArray(value));
            valueFutures{ii} = promise.Future;
        end
    elseif isa(varargin{ii}, 'matlab.bigdata.internal.LocalArray')
        promise = ClosurePromise(varargin{ii}.Value);
        valueFutures{ii} = promise.Future;
    else
        % An unwrapped local array

        % Ensure that this else branch does not accept other unsupported
        % implementation of PartitionedArray that has not been dealt
        % with explicitly by their own elseif branch.
        assert(~isa(varargin{ii}, 'matlab.bigdata.internal.PartitionedArray'));
        promise = ClosurePromise(varargin{ii});
        valueFutures{ii} = promise.Future;
    end
end
valueFutures = vertcat(valueFutures{:});
if isempty(valueFutures)
    % Normalize to a typed 0 x 1 empty for downstream consumers.
    valueFutures = ClosureFuture.empty(0, 1);
end
end

% Get the current executor by looking at the current MapReducer.
function executor = iGetCurrentExecutor()
import matlab.bigdata.internal.executor.PartitionedArrayExecutor;
executor = PartitionedArrayExecutor.default();
end

% Check if the two executors are the same or simply return the new executor
% if a previous one does not exist.
function executor = iCheckSameExecutor(newExecutor, executor)
if isempty(executor)
    executor = newExecutor;
elseif ~checkSameExecutor(newExecutor, executor)
    error(message('MATLAB:bigdata:array:IncompatibleTallExecutor'));
end
end

% Helper function that ensures all received function handles are instances
% of the matlab.bigdata.internal.FunctionHandle class.
function functionHandle = iParseFunctionHandle(functionHandle)
import matlab.bigdata.internal.FunctionHandle;
if ~isa(functionHandle, 'matlab.bigdata.internal.FunctionHandle')
    assert (isa(functionHandle, 'function_handle'));
    functionHandle = FunctionHandle(functionHandle);
end
end

% Helper function that sets the datastore field of all the provided
% LazyPartitionedArray instances.
function varargout = iSetDatastore(ds, varargin)
% Rebuild every input LazyPartitionedArray with its Datastore property
% replaced by ds; the value future and executor are carried over as-is.
import matlab.bigdata.internal.lazyeval.LazyPartitionedArray;
varargout = cell(size(varargin));
for idx = 1:numel(varargin)
    inputArray = varargin{idx};
    varargout{idx} = LazyPartitionedArray(inputArray.ValueFuture, inputArray.Executor, ds);
end
end

% Check if the two datastores are the same or simply return the new
% datastore if a previous one does not exist.
function datastore = iCheckSameDatastore(newDatastore, datastore)
if isempty(datastore)
    % First datastore encountered becomes the reference.
    datastore = newDatastore;
else
    % Handle inequality signals two different backing datastores.
    if newDatastore ~= datastore
        error(message('MATLAB:bigdata:array:IncompatibleTallDatastore'));
    end
end
end