gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+lazyeval/NonPartitionedOperation.m

    %NonPartitionedOperation
% An operation that requires non-partitioned inputs.

% Copyright 2015-2016 The MathWorks, Inc.

classdef (Sealed) NonPartitionedOperation < matlab.bigdata.internal.lazyeval.Operation
    %NonPartitionedOperation An operation that requires non-partitioned inputs.
    %   Wraps a function handle and, when asked for execution tasks,
    %   funnels every input that is not flagged as replicated through an
    %   all-to-one task before handing all inputs to a single broadcast
    %   task built around a NonPartitionedProcessor.

    properties (SetAccess = immutable)
        % The function handle for the operation. Set once by the
        % constructor and forwarded to NonPartitionedProcessor.createFactory.
        FunctionHandle;
    end
    
    methods
        function obj = NonPartitionedOperation(functionHandle, numInputs, numOutputs)
            %NonPartitionedOperation The main constructor.
            %   functionHandle - stored unchanged in the FunctionHandle
            %                    property.
            %   numInputs      - forwarded to the Operation superclass
            %                    constructor.
            %   numOutputs     - forwarded to the Operation superclass
            %                    constructor.
            obj = obj@matlab.bigdata.internal.lazyeval.Operation(numInputs, numOutputs);
            obj.FunctionHandle = functionHandle;
        end
    end
    
    % Methods overridden in the Operation interface.
    methods
        function tasks = createExecutionTasks(obj, taskDependencies, inputFutureMap, isInputReplicated)
            %createExecutionTasks Build the execution tasks for this operation.
            %   taskDependencies  - array of upstream tasks, one per
            %                       dependency; indexed in parallel with
            %                       isInputReplicated.
            %   inputFutureMap    - passed through untouched to
            %                       NonPartitionedProcessor.createFactory.
            %   isInputReplicated - per-dependency flags; a false entry
            %                       causes an all-to-one task to be
            %                       inserted for that dependency.
            %
            %   Returns an array of ExecutionTask objects: zero or more
            %   all-to-one tasks (in dependency order) followed by exactly
            %   one broadcast task that is marked as a pass boundary.

            % Only the names actually used below are imported.
            import matlab.bigdata.internal.executor.ExecutionTask;
            import matlab.bigdata.internal.lazyeval.NonPartitionedProcessor;
            import matlab.bigdata.internal.lazyeval.PassthroughProcessor;
            
            % If some of the inputs are not guaranteed a single
            % partition, we inject an all-to-one task to ensure only a
            % single partition. This also attaches the partition index
            % so that the collected partition can be sorted before
            % calling the function handle.
            tasks = ExecutionTask.empty();
            for ii = 1:numel(taskDependencies)
                if ~isInputReplicated(ii)
                    newTask = ExecutionTask.createAllToOneTask(taskDependencies(ii), PassthroughProcessor.createFactory());
                    % The final broadcast task must depend on the
                    % collected output rather than the original
                    % dependency, so swap it in place.
                    taskDependencies(ii) = newTask;
                    tasks(end + 1) = newTask; %#ok<AGROW>
                end
            end
            
            processorFactory = NonPartitionedProcessor.createFactory(...
                obj.FunctionHandle, obj.NumOutputs, inputFutureMap);
            
            % The broadcast task is always the last element of the output
            % and consumes the (possibly rewritten) dependency list.
            tasks(end + 1) = ExecutionTask.createBroadcastTask(taskDependencies, processorFactory, 'IsPassBoundary', true);
        end
    end
end