% gusucode.com > bigdata toolbox MATLAB source code > bigdata/+matlab/+bigdata/+internal/+lazyeval/LazyPartitionedArray.m

    %LazyPartitionedArray
% An implementation of the PartitionedArray interface that sits on top of
% the lazy evaluation architecture.

%   Copyright 2015-2016 The MathWorks, Inc.

classdef (Sealed, InferiorClasses = { ...
        ?matlab.bigdata.internal.BroadcastArray, ...
        ?matlab.bigdata.internal.FunctionHandle, ...
        ?matlab.bigdata.internal.LocalArray}) ...
        LazyPartitionedArray < matlab.bigdata.internal.PartitionedArray
    % Each instance pairs a future for its data (ValueFuture) with the
    % executor that will be used to evaluate it and, optionally, the
    % datastore it originated from. Operations do not run immediately:
    % they build a closure via iDoOperation and return new instances whose
    % futures are fulfilled when gather/evaluate is called.
    
    properties (SetAccess = private)
        % A future to the actual evaluated data.
        ValueFuture;
        
        % The datastore that is backing this partitioned array. This exists
        % to allow checking of compatibility between two datastore based
        % tall arrays. This will be empty for gathered arrays.
        Datastore;
    end
    
    properties (SetAccess = private, Transient)
        % The underlying execution environment backing this partitioned
        % array. Transient: it is not saved, and loadobj rebinds it to the
        % current executor.
        Executor;
    end
    
    properties (Dependent)
        % A logical scalar that is true if and only if this Partitioned
        % array still has a valid PartitionedArrayExecutor.
        IsValid
    end
    
    properties (SetAccess = private, Transient)
        % Properties to support cheap preview / display / workspace browser info.
        HasPreviewData = false
        IsPreviewTruncated = false
        PreviewData = []
    end
    
    methods (Static)
        % Construct a LazyPartitionedArray instance from a datastore
        % instance. The datastore is copied first, so the returned array
        % is bound to the copy and not to the caller's datastore object.
        function obj = createFromDatastore(ds)
            import matlab.bigdata.internal.executor.PartitionedArrayExecutor;
            import matlab.bigdata.internal.lazyeval.ReadOperation;
            
            ds = copy(ds);
            PartitionedArrayExecutor.checkDatastoreSupportForDefault(ds);
            numOutputs = 1;
            obj = iDoOperation(ReadOperation(ds, numOutputs));
            obj = iSetDatastore(ds, obj);
        end
        
        % Construct a LazyPartitionedArray instance around a constant.
        % If no executor is supplied, the current default executor is used.
        function obj = createFromConstant(constant, executor)
            import matlab.bigdata.internal.lazyeval.LazyPartitionedArray;
            valueFuture = iParseDataInputs(constant);
            if nargin < 2
                executor = iGetCurrentExecutor();
            end
            % We pass in an empty into the datastore input so that this
            % array can be used with any other LazyPartitionedArray object.
            ds = [];
            obj = LazyPartitionedArray(valueFuture, executor, ds);
        end
    end
    
    methods
        % Evaluate every input array and return the evaluated value of
        % each, one output per input.
        function varargout = gather(varargin)
            
            % We combine all variables to be gathered in order to allow the
            % Spark integration to perform the gather in a single execution.
            if nargin ~= 1
                [varargin{:}] = clientfun(@deal, varargin{:});
            end
            
            % Before gathering, call the optimizer.
            op = matlab.bigdata.internal.Optimizer.default();
            op.optimize(varargin{:});

            evaluate(varargin{:});
            
            varargout = cell(size(varargin));
            for ii = 1:numel(varargin)
                % evaluate is expected to have fulfilled every future.
                assert (varargin{ii}.ValueFuture.IsDone);
                varargout{ii} = varargin{ii}.ValueFuture.Value;
            end
        end
        
        % Queue an ElementwiseOperation applying functionHandle to the
        % inputs; returns nargout lazy outputs.
        function varargout = elementfun(functionHandle, varargin)
            import matlab.bigdata.internal.lazyeval.ElementwiseOperation;
            functionHandle = iParseFunctionHandle(functionHandle);
            [varargout{1:nargout}] = iDoOperation(ElementwiseOperation(functionHandle, numel(varargin), nargout), varargin{:});
        end
        
        % Queue a SlicewiseOperation applying functionHandle to the
        % inputs; returns nargout lazy outputs.
        function varargout = slicefun(functionHandle, varargin)
            import matlab.bigdata.internal.lazyeval.SlicewiseOperation;
            functionHandle = iParseFunctionHandle(functionHandle);
            [varargout{1:nargout}] = iDoOperation(SlicewiseOperation(functionHandle, numel(varargin), nargout), varargin{:});
        end
        
        % Queue a FilterOperation over the inputs, with subs passed as the
        % first data input of the operation.
        function varargout = filterslices(subs, varargin)
            import matlab.bigdata.internal.lazyeval.FilterOperation;
            [varargout{1:nargout}] = iDoOperation(FilterOperation(numel(varargin)), subs, varargin{:});
        end
        
        % Not supported: always fails an assertion.
        function out = union(varargin) %#ok<STOUT>
            assert(false, 'The method PartitionedArray/union is currently not supported.');
        end
        
        % Reduction using the same function handle for both the initial
        % and the reduce stage of aggregatefun.
        function varargout = reducefun(functionHandle, varargin)
            [varargout{1:nargout}] = aggregatefun(functionHandle, functionHandle, varargin{:});
        end
        
        % Queue an AggregateOperation built from the two function handles.
        % The outputs have their Datastore cleared.
        function varargout = aggregatefun(initialFunctionHandle, reduceFunctionHandle, varargin)
            import matlab.bigdata.internal.lazyeval.AggregateOperation;
            initialFunctionHandle = iParseFunctionHandle(initialFunctionHandle);
            reduceFunctionHandle = iParseFunctionHandle(reduceFunctionHandle);
            operation = AggregateOperation(initialFunctionHandle, reduceFunctionHandle, numel(varargin), nargout);
            [varargout{1:nargout}] = iDoOperation(operation, varargin{:});
            [varargout{:}] = iSetDatastore([], varargout{:});
        end
        
        % By-key reduction using the same function handle for both the
        % initial and the reduce stage of aggregatebykeyfun.
        function [keys, varargout] = reducebykeyfun(functionHandle, keys, varargin)
            [keys, varargout{1:nargout - 1}] = aggregatebykeyfun(functionHandle, functionHandle, keys, varargin{:});
        end
        
        % Queue an AggregateByKeyOperation built from the two function
        % handles. keys counts as one of the operation's inputs and as its
        % first output. All outputs, including keys, have their Datastore
        % cleared, consistent with aggregatefun.
        function [keys, varargout] = aggregatebykeyfun(initialFunctionHandle, reduceFunctionHandle, keys, varargin)
            import matlab.bigdata.internal.lazyeval.AggregateByKeyOperation;
            initialFunctionHandle = iParseFunctionHandle(initialFunctionHandle);
            reduceFunctionHandle = iParseFunctionHandle(reduceFunctionHandle);
            operation = AggregateByKeyOperation(initialFunctionHandle, reduceFunctionHandle, numel(varargin) + 1, nargout);
            [keys, varargout{1:nargout - 1}] = iDoOperation(operation, keys, varargin{:});
            % The keys output is an aggregation result just like the other
            % outputs, so it must not stay tied to the input datastore.
            [keys, varargout{:}] = iSetDatastore([], keys, varargout{:});
        end
        
        % Not supported: always fails an assertion.
        function [keys, varargout] = joinbykey(xKeys, x, yKeys, y) %#ok<STOUT,INUSD>
            assert(false, 'The method PartitionedArray/joinbykey is currently not supported.');
        end
        
        % Queue a ChunkwiseOperation applying functionHandle to the
        % inputs; returns nargout lazy outputs.
        function varargout = chunkfun(functionHandle, varargin)
            import matlab.bigdata.internal.lazyeval.ChunkwiseOperation;
            functionHandle = iParseFunctionHandle(functionHandle);
            [varargout{1:nargout}] = iDoOperation(ChunkwiseOperation(functionHandle, numel(varargin), nargout), varargin{:});
        end
        
        % Queue a PartitionwiseOperation applying functionHandle to the
        % inputs; returns nargout lazy outputs.
        function varargout = partitionfun(functionHandle, varargin)
            import matlab.bigdata.internal.lazyeval.PartitionwiseOperation;
            functionHandle = iParseFunctionHandle(functionHandle);
            [varargout{1:nargout}] = iDoOperation(PartitionwiseOperation(functionHandle, numel(varargin), nargout), varargin{:});
        end
        
        % Route each input through a CacheOperation and store the resulting
        % future back on that input. There are no outputs; this relies on
        % the property assignment being visible to the caller.
        % NOTE(review): presumably PartitionedArray is a handle class -
        % confirm against the superclass definition.
        function markforreuse(varargin)
            import matlab.bigdata.internal.lazyeval.CacheOperation;
            for ii = 1:numel(varargin)
                newArray = iDoOperation(CacheOperation(), varargin{ii});
                varargin{ii}.ValueFuture = newArray.ValueFuture;
            end
        end
        
        % Not supported: always fails an assertion.
        function [ref, varargout] = repartition(ref, varargin) %#ok<STOUT>
            assert(false, 'The method PartitionedArray/repartition is currently not supported.');
        end
        
        % Get the executor underlying this partitioned array.
        % NOTE(review): this was previously documented as erroring when the
        % executor is no longer valid, but no validity check is performed
        % here; the stored executor is returned as-is. Confirm whether a
        % check against IsValid is intended.
        function executor = getExecutor(obj)
            executor = obj.Executor;
        end
    end
    
    methods (Hidden)
        % TODO: This is currently not in the specification.
        % Queue a NonPartitionedOperation applying functionHandle to the
        % inputs; returns nargout lazy outputs.
        function varargout = clientfun(functionHandle, varargin)
            import matlab.bigdata.internal.lazyeval.NonPartitionedOperation;
            functionHandle = iParseFunctionHandle(functionHandle);
            [varargout{1:nargout}] = iDoOperation(NonPartitionedOperation(functionHandle, numel(varargin), nargout), varargin{:});
        end
        
        % PARTITIONHEADFUN Apply a partition-wise function handle that only
        % requires the first few slices of each partition to generate the
        % complete output.
        %
        % The function handle must obey the same rules as for partitionfun.
        % On top of this, the framework will assume that full evaluation of
        % this operation is fast enough for preview.
        function varargout = partitionheadfun(functionHandle, varargin)
            import matlab.bigdata.internal.lazyeval.PartitionwiseOperation;
            functionHandle = iParseFunctionHandle(functionHandle);
            dependsOnlyOnHead = true;
            [varargout{1:nargout}] = iDoOperation(PartitionwiseOperation(functionHandle, numel(varargin), nargout, dependsOnlyOnHead), varargin{:});
        end
        
        % STRICTSLICEFUN Perform a slicefun operation that does not support
        % singleton expansion.
        function varargout = strictslicefun(functionHandle, varargin)
            import matlab.bigdata.internal.lazyeval.SlicewiseOperation;
            functionHandle = iParseFunctionHandle(functionHandle);
            allowsTallDimExpansion = false;
            op = SlicewiseOperation(functionHandle, numel(varargin), nargout, allowsTallDimExpansion);
            [varargout{1:nargout}] = iDoOperation(op, varargin{:});
        end
    end
    
    methods (Hidden)
        % Everything to do with the cached preview data. Don't even ask about this stuff
        % if the array is gathered.
        
        % True if preview data has been cached on this array. Asserts that
        % the array has not been evaluated yet.
        function tf = hasCachedPreviewData(obj)
            assert(~obj.ValueFuture.IsDone);
            tf = obj.HasPreviewData;
        end
        
        % Return the cached preview data and its truncation flag. Asserts
        % that the array has not been evaluated yet.
        function [data, isTruncated] = getCachedPreviewData(obj)
            assert(~obj.ValueFuture.IsDone);
            data = obj.PreviewData;
            isTruncated = obj.IsPreviewTruncated;
        end
        
        % Store preview data on this array. Asserts that the array has not
        % been evaluated yet and that no preview data was stored before.
        function setCachedPreviewData(obj, previewData, isTruncated)
            assert(~obj.ValueFuture.IsDone);
            assert(~obj.HasPreviewData);
            obj.PreviewData = previewData;
            obj.IsPreviewTruncated = isTruncated;
            obj.HasPreviewData = true;
        end
    end
    
    methods
        % Getter for the dependent IsValid property; delegates to the
        % executor's checkIsValid.
        function tf = get.IsValid(obj)
            tf = obj.Executor.checkIsValid();
        end
    end
    
    methods (Static)
        % Deserialization to a LazyPartitionedArray object.
        function obj = loadobj(obj)
            % On load, we bind the lazy partitioned array with the current
            % executor.
            obj.Executor = iGetCurrentExecutor();
        end
    end
    
    methods (Access = private)
        % Private constructor for factory purposes.
        % valueFuture must be a scalar ClosureFuture; executor and ds are
        % stored as given.
        function obj = LazyPartitionedArray(valueFuture, executor, ds)
            import matlab.bigdata.internal.executor.PartitionedArrayExecutor;
            
            assert(isa(valueFuture, 'matlab.bigdata.internal.lazyeval.ClosureFuture') && isscalar(valueFuture));
            obj.ValueFuture = valueFuture;
            obj.Executor = executor;
            obj.Datastore = ds;
            obj.HasPreviewData = false;
            
            % This is to ensure execution environments with idle timeouts
            % have the chance to reset the timeout when new arrays are
            % created.
            if  executor.checkIsValidNow()
                executor.keepAlive();
            end
        end
        
        % The common error handling code for Partitioned Array evaluation.
        % Currently this simply rethrows the error unchanged.
        function wrapEvaluationError(obj, err) %#ok<INUSL>
            rethrow(err);
        end
        
        % Evaluate the provided array of LazyPartitionedArray instances.
        % Inputs whose future is already done are skipped; the closures of
        % the remaining inputs are executed as a single task graph and the
        % results are written into the closures' output promises.
        function evaluate(varargin)
            import matlab.bigdata.internal.executor.PartitionedArrayExecutor;
            import matlab.bigdata.internal.lazyeval.LazyTaskGraph;
            import matlab.bigdata.internal.serial.SerialExecutor;
            import matlab.bigdata.internal.util.isPreviewCheap;
            
            closures = cell(size(varargin));
            % An override executor, when present, takes precedence over the
            % executor stored on the arrays.
            executor = matlab.bigdata.internal.executor.PartitionedArrayExecutor.override();
            for ii = 1:numel(varargin)
                if ~varargin{ii}.ValueFuture.IsDone
                    closures{ii} = varargin{ii}.ValueFuture.Promise.Closure;
                    if isempty(executor)
                        executor = getExecutor(varargin{ii});
                    end
                end
            end
            
            % The executor is still empty when there is no override and
            % every input has already been evaluated. There is nothing to
            % execute in that case, so do not dereference it.
            if ~isempty(executor) && executor.supportsSinglePartition()
                isGatherCheap = true(size(varargin));
                ii=1;
                % Stop at the first input that is not cheap to gather.
                while all(isGatherCheap) && ii<=numel(varargin)
                    [~, isGatherCheap(ii)] = isPreviewCheap(varargin{ii});
                    ii = ii + 1;
                end
                if all(isGatherCheap)
                    executor = SerialExecutor('UseSinglePartition', true);
                end
            end

            closures = vertcat(closures{:});
            if ~isempty(closures)
                taskGraph = LazyTaskGraph(closures);
                
                [outputs{1:numel(taskGraph.OutputTasks)}] = executor.execute(taskGraph);
                
                for ii = 1:numel(outputs)
                    task = taskGraph.OutputTasks(ii);
                    outputClosure = taskGraph.TaskToClosureMap(task.Id);
                    % Column jj of the task output holds the pieces of the
                    % jj-th closure output; concatenate them vertically.
                    for jj = 1:numel(outputClosure.OutputPromises)
                        outputClosure.OutputPromises(jj).setValue(vertcat(outputs{ii}{:, jj}));
                    end
                end
            end
        end
    end
end

% Wrap the given operation and its data inputs in a Closure and return one
% LazyPartitionedArray per output promise of that closure. The executor and
% datastore of the outputs are derived from the inputs; when no input
% carries an executor, the current default executor is used.
function varargout = iDoOperation(operation, varargin)
import matlab.bigdata.internal.lazyeval.LazyPartitionedArray;

[inputFutures, executor, ds] = iParseDataInputs(varargin{:});
if isempty(executor)
    executor = iGetCurrentExecutor();
end

closure = matlab.bigdata.internal.lazyeval.Closure(inputFutures, operation);

numOutputs = numel(closure.OutputPromises);
varargout = cell(numOutputs, 1);
for outIdx = 1:numOutputs
    outputFuture = closure.OutputPromises(outIdx).Future;
    varargout{outIdx} = LazyPartitionedArray(outputFuture, executor, ds);
end
end

% Helper function that parses the data inputs to PartitionedArray methods.
%
% Returns:
%   valueFutures - column of futures, one per input (an empty 0x1
%                  ClosureFuture array when there are no inputs).
%   executor     - the executor shared by all LazyPartitionedArray inputs,
%                  or [] if no input carries one. Mismatching executors
%                  raise an error via iCheckSameExecutor.
%   datastore    - the datastore shared by all LazyPartitionedArray inputs,
%                  or [] if none. Mismatching datastores raise an error via
%                  iCheckSameDatastore.
function [valueFutures, executor, datastore] = iParseDataInputs(varargin)
import matlab.bigdata.internal.BroadcastArray;
import matlab.bigdata.internal.lazyeval.ClosureFuture;
import matlab.bigdata.internal.lazyeval.ClosurePromise;
valueFutures = cell(size(varargin));
executor = [];
datastore = [];
for ii = 1:nargin
    % Parse dispatch based on type of input.
    if isa(varargin{ii}, 'matlab.bigdata.internal.lazyeval.LazyPartitionedArray')
        valueFutures{ii} = varargin{ii}.ValueFuture;
        executor = iCheckSameExecutor(getExecutor(varargin{ii}), executor);
        datastore = iCheckSameDatastore(varargin{ii}.Datastore, datastore);
    elseif isa(varargin{ii}, 'matlab.bigdata.internal.BroadcastArray')
        % A BroadcastArray will either contain a local array that should
        % be passed to all function calls or a PartitionedArray instance
        % that represents an array that should be passed to all function
        % calls. To support this, we simply convert the underlying array to
        % a 1 x 1 BroadcastArray instance. This instance will be passed
        % to all function calls as per the singleton expansion rules.
        
        % Parse dispatch based on type of underlying value.
        value = varargin{ii}.Value;
        if isa(value, 'matlab.bigdata.internal.lazyeval.LazyPartitionedArray')
            value = clientfun(@BroadcastArray, value);
            valueFutures{ii} = value.ValueFuture;
            executor = iCheckSameExecutor(getExecutor(value), executor);
            datastore = iCheckSameDatastore(value.Datastore, datastore);
        else % An unwrapped local array
            % Ensure that this else branch does not accept other unsupported
            % implementation of PartitionedArray that has not been dealt
            % with explicitly by their own elseif branch. The check must be
            % on the unwrapped value: the wrapper itself is already known
            % to be a BroadcastArray at this point.
            assert(~isa(value, 'matlab.bigdata.internal.PartitionedArray'));
            
            promise = ClosurePromise(BroadcastArray(value));
            valueFutures{ii} = promise.Future;
        end
        
    elseif isa(varargin{ii}, 'matlab.bigdata.internal.LocalArray')
        % Unwrap the LocalArray and pass its value through as a constant.
        promise = ClosurePromise(varargin{ii}.Value);
        valueFutures{ii} = promise.Future;
        
    else % An unwrapped local array
        % Ensure that this else branch does not accept other unsupported
        % implementation of PartitionedArray that has not been dealt
        % with explicitly by their own elseif branch.
        assert(~isa(varargin{ii}, 'matlab.bigdata.internal.PartitionedArray'));
        
        promise = ClosurePromise(varargin{ii});
        valueFutures{ii} = promise.Future;
    end
end
valueFutures = vertcat(valueFutures{:});
if isempty(valueFutures)
    % Keep the return type consistent when there are no inputs.
    valueFutures = ClosureFuture.empty(0, 1);
end
end

% Return the default PartitionedArrayExecutor, as reported by
% PartitionedArrayExecutor.default().
function executor = iGetCurrentExecutor()
executor = matlab.bigdata.internal.executor.PartitionedArrayExecutor.default();
end

% Merge a newly seen executor with the one accumulated so far. When nothing
% has been accumulated yet the new executor is adopted; otherwise the two
% must agree according to checkSameExecutor, or an error is raised.
function executor = iCheckSameExecutor(newExecutor, executor)
if isempty(executor)
    % First executor encountered: adopt it.
    executor = newExecutor;
    return;
end

if ~checkSameExecutor(newExecutor, executor)
    error(message('MATLAB:bigdata:array:IncompatibleTallExecutor'));
end
end

% Normalize a function handle input to a
% matlab.bigdata.internal.FunctionHandle instance. Inputs that already are
% of that class are returned untouched; anything else must be a MATLAB
% function_handle and is wrapped.
function functionHandle = iParseFunctionHandle(functionHandle)
import matlab.bigdata.internal.FunctionHandle;
if isa(functionHandle, 'matlab.bigdata.internal.FunctionHandle')
    % Already wrapped, nothing to do.
    return;
end

assert (isa(functionHandle, 'function_handle'));
functionHandle = FunctionHandle(functionHandle);
end

% Return, for every LazyPartitionedArray passed in, a new instance that
% shares the same ValueFuture and Executor but has ds as its Datastore.
function varargout = iSetDatastore(ds, varargin)
import matlab.bigdata.internal.lazyeval.LazyPartitionedArray;
varargout = cell(size(varargin));
for idx = 1:numel(varargin)
    source = varargin{idx};
    varargout{idx} = LazyPartitionedArray(source.ValueFuture, source.Executor, ds);
end
end

% Merge a newly seen datastore with the one accumulated so far. When
% nothing has been accumulated yet the new datastore is adopted; otherwise
% an error is raised if the two compare as different using ~=.
function datastore = iCheckSameDatastore(newDatastore, datastore)
if isempty(datastore)
    % First datastore encountered (possibly empty itself): adopt it.
    datastore = newDatastore;
    return;
end

if newDatastore ~= datastore
    error(message('MATLAB:bigdata:array:IncompatibleTallDatastore'));
end
end