%ReadProcessor
% Data processor that reads a chunk from the datastore on each iteration.
%
% See LazyTaskGraph for a general description of inputs and outputs.
% Specifically, each iteration emits a 1 x 1 cell array containing a
% chunk of data read from the internally held datastore.
%
% Copyright 2015-2016 The MathWorks, Inc.

classdef (Sealed) ReadProcessor < matlab.bigdata.internal.executor.DataProcessor
    % Properties overridden in the DataProcessor interface.
    properties (SetAccess = private)
        IsFinished = false;
        IsMoreInputRequired = false(0, 1);
    end
    
    properties (SetAccess = immutable)
        % The underlying datastore consisting of all the data represented
        % by the current partition.
        Datastore;
        
        % A flag that is true if and only if each read must be wrapped in a
        % cell.
        RequiresCells;
        
        % A chunk of output from the datastore that has size zero in the
        % tall dimension. This exists to handle the case where a partition
        % contains no data: the framework guarantees that it will pass
        % forward a correctly sized empty chunk.
        EmptyChunk;
        
        % The chunk size to emit from this processor. This can be NaN,
        % which indicates that the raw Datastore reads are used as the
        % chunk size.
        ChunkSize = NaN;
        
        % A buffer for building up the output chunk. It is always
        % constructed, but it only accumulates data if ChunkSize is not NaN.
        OutputBuffer;
    end
    
    properties (SetAccess = private)
        % A logical scalar that is true if and only if there has been at
        % least one call to the process method.
        HasEmittedFirstChunk = false;
    end
    
    methods (Static)
        % Create a data processor factory that can be used by the execution
        % environment to construct instances of this class.
        %
        % Inputs:
        %  - originalDatastore is the corresponding datastore instance that
        %    this processor will read. It is used here to generate an empty
        %    chunk.
        function factory = createFactory(originalDatastore)
            previewChunk = preview(originalDatastore);
            % The subsref/substruct call below selects zero rows from the
            % preview chunk, which gives a generic empty chunk for a table,
            % an array or a cell array. It works for TallDatastore as well
            % as for tabular datastores.
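            %
            % Illustration only (hypothetical values, not executed here):
            % the same zero-row indexing applied to an 8-by-3 table and to
            % an 8-by-4-by-2 numeric array. The variables T, A and s are
            % made up for this example.
            %   T = table((1:8)', (1:8)', (1:8)');  % 8-by-3 table
            %   s = substruct('()', [{[]}, repmat({':'}, 1, ndims(T) - 1)]);
            %   size(subsref(T, s))                 % [0 3], same variables as T
            %   A = zeros(8, 4, 2);
            %   s = substruct('()', [{[]}, repmat({':'}, 1, ndims(A) - 1)]);
            %   size(subsref(A, s))                 % [0 4 2]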
            if isa(originalDatastore, 'matlab.io.datastore.TallDatastore') ...
                    || matlab.bigdata.internal.util.isTabularDatastore(originalDatastore)
                emptyChunk = subsref(previewChunk, substruct('()', [{[]}, repmat({':'}, 1, ndims(previewChunk) - 1)]));
            else
                emptyChunk = cell(0, 1);
            end
            
            factory = @createReadProcessor;
            function dataProcessor = createReadProcessor(partition)
                import matlab.bigdata.internal.lazyeval.ReadProcessor;
                partitionedDatastore = partition.createDatastore();
                dataProcessor = ReadProcessor(partitionedDatastore, emptyChunk);
            end
        end
    end
    
    % Methods overridden in the DataProcessor interface.
    methods
        function data = process(obj, ~)
            %PROCESS Perform the next iteration of processing
            
            % If we already know we're done, just return nothing.
            if obj.IsFinished
                data = cell(0, 1);
                return;
            end
            
            % This object emits an empty cell on the very first call
            % to process because of caching and CompositeDataProcessor.
            % There are cases where a CompositeDataProcessor owns a
            % ReadProcessor but, due to caching, no data is required
            % from the datastore. However, CompositeDataProcessor
            % requires the process method of every underlying processor
            % to be called at least once. For this reason, we avoid
            % calling the datastore read method until the second call to
            % process, when we know the data will be used.
            if ~obj.HasEmittedFirstChunk
                data = cell(0, 1);
                obj.HasEmittedFirstChunk = true;
                return;
            end
            
            % If the datastore starts with no data, we emit an empty chunk
            % because we are required to emit at least one chunk before
            % IsFinished can be set to true. We should only hit this if the
            % partition was empty; otherwise IsFinished will have been set
            % on the previous call to process.
            if ~hasdata(obj.Datastore) && obj.OutputBuffer.NumBufferedSlices == 0
                data = {obj.EmptyChunk};
                obj.IsFinished = true;
                return;
            end
            
            % Read some data and see if we are done.
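            %
            % Illustration only (hypothetical sizes; assumes that
            % OutputBuffer.get(n) returns at most n buffered slices and
            % removes them from the buffer): suppose successive reads
            % return 2, 2 and 1 rows.
            %  - ChunkSize = NaN: each read is emitted as-is, so three
            %    calls emit chunks of 2, 2 and 1 rows, and IsFinished
            %    becomes true after the third.
            %  - ChunkSize = 3: one call reads twice (4 rows buffered) and
            %    emits 3 rows; the next call reads the last row (2 rows
            %    buffered), emits those 2 rows and sets IsFinished to true.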
            if isnan(obj.ChunkSize)
                data = {iRead(obj.Datastore, obj.RequiresCells)};
            else
                % We do not special-case a datastore/read that returns a
                % chunk of exactly ChunkSize because the performance cost
                % of going through the buffer is negligible compared to
                % the I/O.
                while obj.OutputBuffer.NumBufferedSlices < obj.ChunkSize && hasdata(obj.Datastore)
                    obj.OutputBuffer.add(false, {iRead(obj.Datastore, obj.RequiresCells)});
                end
                data = obj.OutputBuffer.get(obj.ChunkSize);
            end
            obj.IsFinished = ~hasdata(obj.Datastore) && obj.OutputBuffer.NumBufferedSlices == 0;
        end
    end
    
    methods (Access = private)
        % Private constructor for factory method.
        function obj = ReadProcessor(datastore, emptyChunk)
            obj.Datastore = datastore;
            obj.RequiresCells = ~isa(datastore, 'matlab.io.datastore.TallDatastore') ...
                && ~matlab.bigdata.internal.util.isTabularDatastore(datastore);
            obj.EmptyChunk = emptyChunk;
            if isprop(datastore, 'ReadSize') && isnumeric(datastore.ReadSize)
                obj.ChunkSize = datastore.ReadSize;
                % The output of datastore/read only needs to be wrapped in
                % a cell if ReadSize is 1.
                obj.RequiresCells = obj.RequiresCells && datastore.ReadSize == 1;
            end
            obj.OutputBuffer = matlab.bigdata.internal.lazyeval.InputBuffer(1, false);
        end
    end
end

% Helper function that performs a read and optionally places the data in a
% cell.
function data = iRead(ds, requiresCell)
    data = read(ds);
    % If the datastore is not tabular, the read method will
    % return the contents of a single element of a cell array.
    % We need to wrap this single element back up in a cell to
    % conform with readall.
    if requiresCell
        data = {data};
    end
end