gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+executor/CompositeDataProcessor.m

    %CompositeDataProcessor
% An implementation of the DataProcessor interface that wraps around a
% graph of DataProcessor instances. All DataProcessor instances must be
% non-communicating, except for the last (i.e. the most downstream)
% DataProcessor.

%   Copyright 2015-2016 The MathWorks, Inc.

classdef CompositeDataProcessor < matlab.bigdata.internal.executor.DataProcessor
    % State exposed through the DataProcessor interface.
    properties (SetAccess = private)
        % Copied from the final node's IsFinished at the end of every
        % call to process.
        IsFinished = false;

        % Logical flags, initialised to true(1, numGlobalInputs) by the
        % constructor and replaced at the end of every call to process.
        IsMoreInputRequired;
    end

    properties (SetAccess = immutable)
        % Array of CompositeDataProcessorNode instances, each wrapping a
        % single DataProcessor. The final element supplies the output of
        % the composite as a whole.
        Nodes;
    end

    % Implementation of the DataProcessor interface.
    methods
        function [data, partitionIndices] = process(obj, isLastOfGlobalInput, varargin)
            %PROCESS Run one pass over the wrapped nodes.
            %   Visits the nodes in index order, processing those that
            %   need to run, then returns the Output and PartitionIndices
            %   of the final node. IsFinished and IsMoreInputRequired are
            %   updated before the outputs are marked as consumed.
            numNodes = numel(obj.Nodes);

            % Start with every flag cleared; each node that runs receives
            % the current flags and hands back the updated set.
            inputStillNeeded = false(size(obj.IsMoreInputRequired));

            for nodeIdx = 1:numNodes
                currentNode = obj.Nodes(nodeIdx);

                % To keep the inputs arriving at similar data rates, only
                % nodes whose output is needed for the next chunk of
                % composite output are run. A node whose upstream nodes
                % are holding output can never be skipped though: output
                % is not persisted, so anything that is not handed to the
                % dependent node during this pass would be lost.
                if ~(currentNode.IsMoreOutputRequired ...
                        || ~all(cellfun(@isempty, {currentNode.InputNodes.Output})))
                    continue;
                end

                inputStillNeeded = process(currentNode, isLastOfGlobalInput, ...
                    varargin, inputStillNeeded);
            end

            % The most downstream node determines what the composite
            % reports back to its caller.
            finalNode = obj.Nodes(end);
            obj.IsFinished = finalNode.IsFinished;
            obj.IsMoreInputRequired = inputStillNeeded;
            data = finalNode.Output;
            partitionIndices = finalNode.PartitionIndices;

            % Walk from the last node back to the first, marking each
            % node's output as consumed. Only the final node receives
            % true as the second argument.
            for nodeIdx = numNodes:-1:1
                markOutputAsConsumed(obj.Nodes(nodeIdx), nodeIdx == numNodes);
            end
        end
    end

    methods (Access = ?matlab.bigdata.internal.executor.CompositeDataProcessorBuilder)
        function obj = CompositeDataProcessor(nodes, numGlobalInputs)
            %COMPOSITEDATAPROCESSOR Constructor, callable only by the builder.
            %   nodes           - stored as the Nodes property.
            %   numGlobalInputs - number of global inputs; every one of
            %                     them is initially flagged as required.
            obj.IsMoreInputRequired = true(1, numGlobalInputs);
            obj.Nodes = nodes;
        end
    end
end