gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+lazyeval/InputBuffer.m
%InputBuffer % A helper class that acts as a multi-buffer for the input data to be % passed to operations by the data processor implementations. % % This contains the logic to cache inputs from multiple sources until they % are required by the current DataProcessor. % % This also contains the logic necessary to do balancing on the data rates % of inputs. As soon as any back-end does caching, there is the possibility % that two different inputs to a data processor will be calculated/read at % different rates. Several data processors require to operate on slices % from multiple inputs in lock-step. This class assists with this % requirement. % % Copyright 2015-2016 The MathWorks, Inc. classdef (Sealed) InputBuffer < handle properties (SetAccess = immutable) % This contains a flag for each input that describes whether that % input consists of a single partition. Effectively, will we see % all of that input. IsInputSinglePartition; end properties (SetAccess = private) % The internal buffer held by this instance. Buffer; % A list of logical values. IsBufferInitialized; % The number of slices in the buffer of each input. NumBufferedSlices; % A flag that is true if and only if this class has determined for % all inputs whether each input is guaranteed to be single slice or % guaranteed not to be. HasDeterminedSingleSliceInputs = false; % A logical per input that describes whether it is % guaranteed that the given input consists of only a single slice. IsInputSingleSlice; end properties (Dependent, SetAccess = private) % The current NumBufferedSlices for the largest non-singleton buffer. % This ignores buffers for inputs that are known to be single slice % and have finished. LargestNumBufferedSlices; end methods % The main constructor. function obj = InputBuffer(numInputs, isInputSinglePartition) assert (numInputs == numel(isInputSinglePartition)); obj.IsInputSinglePartition = isInputSinglePartition(:)'; obj.Buffer = cell(1, numInputs); obj.IsBufferInitialized = false(1, numInputs); obj.NumBufferedSlices = zeros(1, numInputs); obj.IsInputSingleSlice = false(1, numInputs); end function val = get.LargestNumBufferedSlices(obj) bufferSizes = obj.NumBufferedSlices(~obj.IsInputSingleSlice); if isempty(bufferSizes) val = 0; else val = max(bufferSizes); end end % Demux and then add a collection of multiplexed inputs to the buffer. function add(obj, isLastOfInputs, varargin) % To differentiate between no data and [], each input is a cell % array with each cell containing chunks of the data. assert (numel(varargin) == numel(isLastOfInputs)); assert (numel(varargin) == numel(obj.Buffer)); for ii = 1:numel(obj.Buffer) if obj.IsBufferInitialized(ii) obj.Buffer{ii} = vertcat(obj.Buffer{ii}, varargin{ii}{:}); elseif ~isempty(varargin{ii}) obj.Buffer{ii} = matlab.bigdata.internal.util.vertcatCellContents(varargin{ii}); obj.IsBufferInitialized(ii) = true; end obj.NumBufferedSlices(ii) = size(obj.Buffer{ii}, 1); end % This object needs to determine if a given input contains only % a single slice for the purposes of singleton expansion. It % does this by waiting until either isLastOfInputs is true or % more than a single slice of data is received. if ~obj.HasDeterminedSingleSliceInputs % Until we have determined all the single slice inputs, we % need to keep updating the IsInputSingleSlice property to % reflect the ones we do know about. isGuaranteedSingleSliceVector = obj.IsInputSinglePartition & isLastOfInputs & obj.NumBufferedSlices == 1; obj.IsInputSingleSlice = isGuaranteedSingleSliceVector; isSingleSliceDeterminedVector = isLastOfInputs | obj.NumBufferedSlices > 1 | ~obj.IsInputSinglePartition; obj.HasDeterminedSingleSliceInputs = all(obj.IsBufferInitialized & isSingleSliceDeterminedVector); end end % Get all inputs in the buffer. function inputs = getAll(obj) inputs = get(obj, inf); end % Get the first slices of the buffer for each input such that each % non-singleton input has the same number of slices. function [inputs, numSlices] = getCompleteSlices(obj, maxNumSlices) numSlices = min(obj.NumBufferedSlices(~obj.IsInputSingleSlice)); if nargin >= 2 numSlices = min(numSlices, maxNumSlices); end if isempty(numSlices) numSlices = 0; end inputs = get(obj, numSlices); end % Get the first n slices of the buffer of each input. function inputs = get(obj, n) assert (all(obj.IsBufferInitialized)); inputs = cell(size(obj.Buffer)); for ii = 1:numel(inputs) if obj.IsInputSingleSlice(ii) inputs{ii} = obj.Buffer{ii}; else [inputs{ii}, obj.Buffer{ii}] = iSplit(obj.Buffer{ii}, n); obj.NumBufferedSlices(ii) = size(obj.Buffer{ii}, 1); end end end end end % Helper function for splitting an input buffer into the first n elements. function [out, buffer] = iSplit(buffer, numSlices) isBufferComplex = isnumeric(buffer) && ~isreal(buffer); sz = size(buffer); numSlices = min(numSlices, sz(1)); if numSlices == sz(1) out = buffer; buffer = buffer([], :); else out = buffer(1:numSlices, :); buffer = buffer(numSlices + 1 : end, :); end if numel(sz) > 2 out = reshape(out, [numSlices, sz(2:end)]); buffer = reshape(buffer, [sz(1) - numSlices, sz(2:end)]); end % If the input buffer was complex, we need to ensure that both "out" and the % return "buffer" are also complex - the indexing expressions above might have % dropped the imaginary part if it is all zero. if isBufferComplex if isreal(buffer) buffer = complex(buffer); end if isreal(out) out = complex(out); end end end