% gusucode.com > bigdata toolbox MATLAB source code > bigdata/+matlab/+bigdata/+internal/+lazyeval/ReadProcessor.m

%ReadProcessor
% Data processor that reads a chunk from the datastore on each iteration.
%
% See LazyTaskGraph for a general description of inputs and outputs.
% Specifically, each iteration that reads data emits a 1 x 1 cell array
% containing a chunk of data read from the internally held datastore. The
% very first call to process, and any call made after the processor has
% finished, emit an empty 0 x 1 cell array instead.
%

%   Copyright 2015-2016 The MathWorks, Inc.

classdef (Sealed) ReadProcessor < matlab.bigdata.internal.executor.DataProcessor
    % State required by the DataProcessor interface.
    properties (SetAccess = private)
        % Becomes true once process has determined that nothing further
        % will be emitted.
        IsFinished = false;

        % Always an empty (0 x 1) logical in this class; process ignores
        % its input argument.
        IsMoreInputRequired = false(0, 1);
    end

    properties (SetAccess = immutable)
        % Datastore holding all of the data that belongs to the partition
        % this processor is responsible for.
        Datastore;

        % True exactly when the result of every datastore read has to be
        % wrapped in a cell before it is used.
        RequiresCells;

        % Datastore output with size zero in the tall dimension. When a
        % partition holds no data, this is what gets passed forward so
        % that downstream still receives a correctly sized empty.
        EmptyChunk;

        % Number of slices to emit per chunk. NaN means "emit whatever a
        % raw datastore read returns" without re-chunking.
        ChunkSize = NaN;

        % Buffer in which output chunks are assembled. Data is only added
        % to it when ChunkSize is not NaN.
        OutputBuffer;
    end

    properties (SetAccess = private)
        % Logical scalar that is false until process has been called for
        % the first time, and true from then on.
        HasEmittedFirstChunk = false;
    end

    methods (Static)
        function factory = createFactory(originalDatastore)
            %CREATEFACTORY Build a factory of ReadProcessor instances
            %
            % The returned function handle accepts a partition object and
            % returns a ReadProcessor reading from the datastore created
            % by that partition. It is intended to be handed to the
            % execution environment.
            %
            % Inputs:
            %  - originalDatastore is the datastore the processors will
            %  read from. Here it is only previewed, in order to derive
            %  the empty chunk shared by all created processors.
            sample = preview(originalDatastore);

            % TallDatastore and tabular datastores get an empty with the
            % same type and trailing dimensions as the preview; anything
            % else gets an empty cell column.
            keepsPreviewShape = isa(originalDatastore, 'matlab.io.datastore.TallDatastore') ...
                || matlab.bigdata.internal.util.isTabularDatastore(originalDatastore);
            if keepsPreviewShape
                % Equivalent to sample([], :, :, ...) with one colon per
                % non-tall dimension; this form of indexing works for a
                % table, an array or a cell alike.
                trailingColons = repmat({':'}, 1, ndims(sample) - 1);
                zeroSliceIndex = substruct('()', [{[]}, trailingColons]);
                emptyChunk = subsref(sample, zeroSliceIndex);
            else
                emptyChunk = cell(0, 1);
            end

            factory = @createReadProcessor;

            % Nested so that the handle carries emptyChunk along with it.
            function dataProcessor = createReadProcessor(partition)
                dataProcessor = matlab.bigdata.internal.lazyeval.ReadProcessor( ...
                    partition.createDatastore(), emptyChunk);
            end
        end
    end

    % Implementation of the DataProcessor interface.
    methods
        function data = process(obj, ~)
            %PROCESS Perform the next iteration of processing
            %
            % Returns a cell array holding the next chunk, or an empty
            % 0 x 1 cell when there is nothing to emit on this call.

            % Both of the guards below hand back an empty cell.
            data = cell(0, 1);

            % Nothing more to do once the processor has finished.
            if obj.IsFinished
                return;
            end

            % The very first call deliberately touches no data. A
            % CompositeDataProcessor must call process on each processor
            % it owns at least once, yet with caching in play it may
            % never need anything from the datastore. Deferring the read
            % to the second call means the datastore is only read once
            % the data is known to be used.
            if ~obj.HasEmittedFirstChunk
                obj.HasEmittedFirstChunk = true;
                return;
            end

            % Reaching this point with nothing to read means the
            % partition was empty from the start (otherwise the previous
            % call would already have set IsFinished). At least one chunk
            % has to be emitted before finishing, so emit the empty one.
            if iIsDrained(obj)
                data = {obj.EmptyChunk};
                obj.IsFinished = true;
                return;
            end

            if isnan(obj.ChunkSize)
                % No re-chunking: forward a single raw read.
                data = {iRead(obj.Datastore, obj.RequiresCells)};
            else
                % Keep reading until the buffer holds a full chunk or the
                % datastore runs dry. A read that already has exactly
                % ChunkSize slices is not special-cased, because the cost
                % of going through the buffer is negligible next to the
                % IO itself.
                while obj.OutputBuffer.NumBufferedSlices < obj.ChunkSize && hasdata(obj.Datastore)
                    obj.OutputBuffer.add(false, {iRead(obj.Datastore, obj.RequiresCells)});
                end
                data = obj.OutputBuffer.get(obj.ChunkSize);
            end

            % Finished as soon as neither the datastore nor the buffer
            % has anything left.
            obj.IsFinished = iIsDrained(obj);
        end
    end

    methods (Access = private)
        function obj = ReadProcessor(datastore, emptyChunk)
            % Private constructor; instances are created through the
            % handle returned by createFactory.
            obj.Datastore = datastore;
            obj.EmptyChunk = emptyChunk;

            % Reads need wrapping only for datastores that are neither a
            % TallDatastore nor tabular ...
            wrapReads = ~(isa(datastore, 'matlab.io.datastore.TallDatastore') ...
                || matlab.bigdata.internal.util.isTabularDatastore(datastore));

            if isprop(datastore, 'ReadSize') && isnumeric(datastore.ReadSize)
                readSize = datastore.ReadSize;
                obj.ChunkSize = readSize;

                % ... and, when the datastore has a numeric ReadSize,
                % only if that ReadSize is 1.
                wrapReads = wrapReads && readSize == 1;
            end
            obj.RequiresCells = wrapReads;

            obj.OutputBuffer = matlab.bigdata.internal.lazyeval.InputBuffer(1, false);
        end
    end
end

% True when the datastore of the given processor has no more data and its
% output buffer holds no slices, i.e. there is nothing left to emit.
function tf = iIsDrained(processor)
tf = ~hasdata(processor.Datastore) && processor.OutputBuffer.NumBufferedSlices == 0;
end

% Perform one read from the datastore DS and return the result, placed
% inside a 1 x 1 cell when REQUIRESCELL is true.
%
% For a datastore that is not tabular, read returns the contents of a
% single element of a cell array. Wrapping that element back up in a cell
% makes the result conform with what readall returns.
function data = iRead(ds, requiresCell)
payload = read(ds);
if requiresCell
    data = {payload};
else
    data = payload;
end
end