% Source: gusucode.com > bigdata toolbox MATLAB source code > bigdata/+matlab/+bigdata/+internal/+optimizer/FusingOptimizer.m
%FusingOptimizer Optimizer that attempts to fuse closures. Primarily
%aggregations.

% Copyright 2016 The MathWorks, Inc.

classdef FusingOptimizer < matlab.bigdata.internal.Optimizer
    
    properties
        % Set MaxFuseAttempts to 1 since we currently expect all aggregation fusing to
        % occur in the first pass. (This avoids recalculating the graph)
        MaxFuseAttempts = 1;
        
        % Debug flag. NOTE(review): not referenced anywhere else in this file.
        Debug = false;
    end
    
    methods (Access = private)
        function tf = combineAggregations(~, closureGraph, dist, redDepths)
            % Fuse all 'AggregateOperation' nodes that share a reduction depth.
            %
            % closureGraph - graph whose Nodes table has OpType and NodeObj variables
            % dist         - all-pairs directed distance matrix of that graph
            % redDepths    - reduction depth per node (same indexing as the graph)
            %
            % Returns true if at least one group of aggregations was fused.
            tf = false;
            
            isAggregation = closureGraph.Nodes.OpType == 'AggregateOperation';
            uniqueReductionDepths = unique(redDepths(isAggregation));
            
            for depthId = 1:numel(uniqueReductionDepths)
                rows = (redDepths == uniqueReductionDepths(depthId)) & isAggregation;
                if sum(rows) > 1
                    % Assert that there are no data dependencies. fusingDistances is a distance
                    % matrix for the nodes we're trying to fuse. It should be 0
                    % on the diagonal and Inf off the diagonal.
                    fusingDistances = dist(rows, rows);
                    expectedDistances = (1 ./ eye(size(fusingDistances))) - 1;
                    assert(isequal(fusingDistances, expectedDistances), ...
                        'Attempt to fuse operations with a data dependency.');
                    
                    % Got multiple aggregations to fuse
                    aggregationsToFuseC = closureGraph.Nodes.NodeObj(rows);
                    iFuseAggregations([aggregationsToFuseC{:}]);
                    tf = true;
                end
            end
        end
        
        function tf = combineAggregationsByKey(~, closureGraph, dist, redDepths)
            % The constraints for combineAggregationsByKey are quite severe. We need to find
            % two or more AggregateByKey operations (reducebykeyfun operations end
            % up here too) where the key inputs are identical, and the reduction
            % depths are the same, and the tall sizes for all inputs should be
            % identical too.
            %
            % NOTE(review): this method receives no tall-size information, so the
            % tall-size constraint above is not checked here - confirm it is
            % guaranteed elsewhere.
            %
            % Returns true if at least one group of by-key aggregations was fused.
            tf = false;
            
            isAggregationByKey = closureGraph.Nodes.OpType == 'AggregateByKeyOperation';
            uniqueReductionDepths = unique(redDepths(isAggregationByKey));
            
            for depthId = 1:numel(uniqueReductionDepths)
                rowIsThisDepth = (redDepths == uniqueReductionDepths(depthId)) & isAggregationByKey;
                numAggByKey = sum(rowIsThisDepth);
                if numAggByKey > 1
                    % We need to group by unique key inputs. We need to walk back up the execution
                    % graph from the first input to the closure, stepping over
                    % "iAssertAdaptorInfoCorrect" nodes, looking for a common
                    % ClosurePromise.
                    aggByKeyObjsCell = closureGraph.Nodes.NodeObj(rowIsThisDepth);
                    keyInputPromiseIds = cellfun(@(closure) iFindUnderlyingPromiseId(closure.InputFutures(1)), ...
                        aggByKeyObjsCell, 'UniformOutput', false);
                    aggregationGroup = findgroups(keyInputPromiseIds);
                    
                    for groupIdx = 1:max(aggregationGroup)
                        match = aggregationGroup == groupIdx;
                        numMatches = sum(match);
                        if numMatches > 1
                            % Assert that there are no data dependencies. fusingDistances is a distance
                            % matrix for the nodes we're trying to fuse. It should be 0
                            % on the diagonal and Inf off the diagonal.
                            fusingIdx = find(rowIsThisDepth);
                            fusingIdx = fusingIdx(match);
                            fusingDistances = dist(fusingIdx, fusingIdx);
                            expectedDistances = (1 ./ eye(size(fusingDistances))) - 1;
                            assert(isequal(fusingDistances, expectedDistances), ...
                                'Attempt to fuse operations with a data dependency.');
                            
                            % Actually fuse the aggregations.
                            iFuseAggregationsByKey([aggByKeyObjsCell{match}]);
                            tf = true;
                        end
                    end
                end
            end
        end
    end
    
    methods
        function tf = optimize(obj, varargin)
            % Run the fusing passes over the closure graph built from varargin.
            %
            % Builds a ClosureGraph from the inputs, annotates each node with a tall
            % size and reduction depth, then attempts to fuse aggregations and
            % by-key aggregations. Repeats up to MaxFuseAttempts times while
            % changes are being made.
            %
            % Returns true if any fusing took place.
            done = false;
            tf = false;
            count = 0;
            closureGraph = matlab.bigdata.internal.optimizer.ClosureGraph(varargin{:});
            
            while ~done
                graphObj = closureGraph.Graph;
                
                % We need to generate a topological sort of the graph so that we can calculate
                % the tall sizes from the graph connectivity
                order = toposort(graphObj);
                
                % tallSize is +ve for real known sizes, -ve for 'symbolic' sizes. Symbolic sizes
                % are sizes that we know are identical to one another, but we
                % don't (yet) know the actual size.
                tallSize = nan(numel(order), 1);
                
                % reductionDepth is zero for nodes with no predecessors, and increases as
                % reductions are encountered.
                reductionDepth = zeros(numel(order), 1);
                
                % reductionCombinations is a map from a list of input depths to a resulting
                % output depth. It's used to ensure that we don't create a new
                % reduction depth for each time we encounter a given combination
                % of input reduction depths.
                reductionCombinations = containers.Map();
                
                % inSize is the input size Id for a given node, or NaN if multiple sizes are
                % input to a single node. We use this to be conservative about
                % fusing aggregations - we'll only fuse those aggregations where
                % we can prove they have well-known and unique input sizes.
                % NOTE(review): inSize is populated below but is not passed to
                % either fusing pass in this file.
                inSize = nan(numel(order), 1);
                
                % Extract the node list up front so that we minimise the number of times we use
                % the digraph/subsref implementation.
                nodeList = graphObj.Nodes.NodeObj;
                opTypes = graphObj.Nodes.OpType;
                
                % Topological sort guarantees that each node we iterate over has all predecessor
                % information available
                for idx = 1:numel(order)
                    nodeIdx = order(idx);
                    nodeObj = nodeList{nodeIdx};
                    opType = opTypes(nodeIdx);
                    [tallSize, reductionDepth, inSize(nodeIdx)] = ...
                        iCalcTallSizeReductionDepth(graphObj, nodeObj, nodeIdx, opType, tallSize, reductionDepth, ...
                        reductionCombinations);
                end
                
                % Calculate all (directed) distances between nodes so that we can check that we
                % don't accidentally try to fuse aggregations which have data
                % dependencies between them.
                dist = distances(graphObj);
                
                combined = combineAggregations(obj, graphObj, dist, reductionDepth);
                combined = combineAggregationsByKey(obj, graphObj, dist, reductionDepth) || combined;
                
                count = 1 + count;
                done = ~combined || count >= obj.MaxFuseAttempts;
                tf = tf || combined;
                
                if ~done && combined
                    % We made changes, recompute the closure graph.
                    recalculate(closureGraph);
                end
            end
        end
    end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Derive a real or symbolic tall output size for a given closure. Here we rely
% on the fact that all outputs for any type of closure must have the same tall
% size.
%
% Since the computation is very closely related, we also compute the updated
% "reduction depth" while we're here.
%
% For elementwise and slicewise operation types, if all the input sizes are
% identical (or known to be 1), then the output size is the same as all input
% sizes. In all other cases, a new symbolic output size is allocated.
%
% opType          - operation type of the closure node
% predecessorIdxs - graph indices of the closure's predecessors
% tszs, redDepths - per-node tall sizes / reduction depths computed so far
% redCombin       - containers.Map (handle) caching input-depth combination ->
%                   output depth; may be updated by this function
function [tsz, redDepth] = iDeriveTallSizeReductionDepthForClosure(opType, ...
    predecessorIdxs, tszs, redDepths, redCombin)

% Default: allocate a new symbolic tall size. Symbolic sizes must be strictly
% negative, so clamp at -1 (matching the no-predecessor case in
% iCalcTallSizeReductionDepth). Without the clamp, a graph whose known sizes
% are all real (non-negative) would yield a non-negative "symbolic" size that
% could be mistaken for a real size (including the special value 1).
tsz = min(-1, min(tszs) - 1);

% Default: allocate a new reduction depth
redDepth = max(redDepths) + 1;

% For elementwise and slicewise, we know that we *might* be able to propagate
% the size
isSizePreserving = opType == 'ElementwiseOperation' || ...
    opType == 'SlicewiseOperation';

if isSizePreserving
    inputSizes = tszs(predecessorIdxs);
    % Ignore inputs which are size 1 in the tall dimension
    inputSizes(inputSizes == 1) = [];
    
    if isempty(inputSizes) && ~isempty(predecessorIdxs)
        % There were some predecessors, but they were all size 1 in the tall dimension.
        tsz = 1;
    elseif numel(unique(inputSizes)) == 1
        % All sizes the same, propagate. These sizes might be real +ve sizes, or
        % symbolic -ve sizes.
        tsz = inputSizes(1);
    end
end

isDepthPreserving = opType == 'ElementwiseOperation' || ...
    opType == 'SlicewiseOperation' || ...
    opType == 'FilterOperation' || ...
    opType == 'ChunkwiseOperation';

uniqueInputDepths = unique(redDepths(predecessorIdxs));

if isDepthPreserving
    redDepth = max(uniqueInputDepths);
else
    % Flatten to a row before converting: num2str of a column vector returns a
    % multi-row char matrix, which is not a char row-vector key. For a single
    % depth the key is unchanged.
    depthKey = num2str(uniqueInputDepths(:).');
    if redCombin.isKey(depthKey)
        redDepth = redCombin(depthKey);
    else
        % Use the new value
        redCombin(depthKey) = redDepth; %#ok<NASGU> handle
    end
end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% This function is called in a loop on a topologically sorted list of nodes. It
% updates elements of 'tszs' - the tall output size array, and also calculates
% 'insz', the tall size of the inputs to the closure
function [tszs, redDepths, insz] = iCalcTallSizeReductionDepth(graphObj, node, nodeIdx, opType, ...
    tszs, redDepths, redCombin)

pred = predecessors(graphObj, nodeIdx);

if isClosure(node)
    if isempty(pred)
        % No predecessors - allocate a new symbolic size for the output of this closure.
        tszs(nodeIdx) = min(-1, min(tszs) - 1);
        % reductionDepth is initialized to zero - nothing to do here.
    else
        % Need to derive from inputs
        [tszs(nodeIdx), redDepths(nodeIdx)] = ...
            iDeriveTallSizeReductionDepthForClosure(opType, pred, tszs, redDepths, redCombin);
    end
elseif isPromise(node) && node.IsDone
    % Completed promise - we have the actual data, so use that to get a real size.
    tszs(nodeIdx) = size(node.CachedValue, 1);
    % reductionDepth can stay at zero.
elseif isPromise(node) || isFuture(node)
    % Uncompleted promise or any future - output size is simply input size.
    assert(isscalar(pred));
    assert(~isnan(tszs(pred)));
    tszs(nodeIdx) = tszs(pred);
    % Likewise, reduction depth unchanged
    redDepths(nodeIdx) = redDepths(pred);
else
    assert(false);
end

% Compute the input size for this node. If all predecessor output sizes are
% identical, that's our 'input size'. If they aren't, then we don't know
% what the input size is, so return NaN.
inszs = tszs(pred);
if ~isempty(inszs) && numel(unique(inszs)) == 1
    insz = inszs(1);
else
    insz = NaN;
end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Given a list of closures, inject upstream closures to wrap inputs in a scalar
% cell array. Return an array of futures, one per en-cellified input, in the
% order the inputs appear across closuresToFuse.
%
% numInPerClosure - number of input futures to take from each closure
% closuresToFuse  - array of closures whose inputs are to be en-cellified
function futures = iAddEncellClosures(numInPerClosure, closuresToFuse)
import matlab.bigdata.internal.lazyeval.ElementwiseOperation;
import matlab.bigdata.internal.lazyeval.EncellificationOperation;
import matlab.bigdata.internal.lazyeval.Closure;
import matlab.bigdata.internal.FunctionHandle;

% Allocate a cell array that will contain the futures prior to encellification.
preEncellClosureInputFutures = cell(1, sum(numInPerClosure));

% Iterate over the to-be-fused closures,
nextInputIdx = 1;
for cidx = 1:numel(closuresToFuse)
    closure = closuresToFuse(cidx);
    for iidx = 1:numInPerClosure(cidx)
        % If MaxNumSlices is finite, we need a dummy operation to
        % attach this constraint. This is so that the en-cellification
        % operation itself does not have to deal with this.
        fcn = FunctionHandle(@deal, 'MaxNumSlices', ...
            closure.Operation.PerChunkFunctionHandle.MaxNumSlices);
        op = ElementwiseOperation(fcn, 1, 1);
        newClosure = Closure(closure.InputFutures(iidx), op);
        preEncellClosureInputFutures{nextInputIdx} = newClosure.OutputPromises.Future;
        nextInputIdx = 1 + nextInputIdx;
    end
end
preEncellClosureInputFutures = vertcat(preEncellClosureInputFutures{:});

% A single en-cellification closure consumes every (pass-through) input.
op = EncellificationOperation(numel(preEncellClosureInputFutures));
newClosure = Closure(preEncellClosureInputFutures, op);
futures = vertcat(newClosure.OutputPromises.Future);

fcn = FunctionHandle(@deal);
op = ElementwiseOperation(fcn, 1, 1);
for iidx = 1:numel(futures)
    % TODO(g1394174): This is a workaround to an issue where
    % the optimizer errors if identical edges exist between one
    % source and destination node.
    newClosure = Closure(futures(iidx), op);
    futures(iidx) = newClosure.OutputPromises.Future;
end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Given an array of promises referring to cell data, add closures to remove
% the layer of cell-array. Returns one new promise per input promise.
function promises = iAddDecellClosures(cellifiedPromises)
import matlab.bigdata.internal.lazyeval.ChunkwiseOperation;
import matlab.bigdata.internal.lazyeval.Closure;
import matlab.bigdata.internal.FunctionHandle;

promises = cell(1, numel(cellifiedPromises));
for outIdx = 1:numel(cellifiedPromises)
    % Each de-cellification closure simply unwraps the first cell element.
    cls = Closure(cellifiedPromises(outIdx).Future, ...
        ChunkwiseOperation(FunctionHandle(@(x) x{1}), 1, 1));
    promises{outIdx} = cls.OutputPromises;
end
promises = vertcat(promises{:});
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Fusing aggregation starts as follows: we need to en-cellify all inputs, then
% build a fused aggregation closure, then de-cellify all outputs.
function iFuseAggregations(closuresToFuse)
import matlab.bigdata.internal.lazyeval.AggregateOperation;
import matlab.bigdata.internal.lazyeval.ChunkwiseOperation;
import matlab.bigdata.internal.lazyeval.Closure;
import matlab.bigdata.internal.FunctionHandle;

numInPerClosure = arrayfun(@(c) numel(c.InputFutures), closuresToFuse);
numOutPerClosure = arrayfun(@(c) numel(c.OutputPromises), closuresToFuse);

% Add calls to encell-ify the inputs
fusedClosureInputFutures = iAddEncellClosures(numInPerClosure, closuresToFuse);

originalOperations = arrayfun(@(c) c.Operation, closuresToFuse, 'UniformOutput', false);

% The aggregation phase can emit a number of outputs that is different to the
% number of inputs (and different to the overall number of outputs)
numInPerFcnAggregatePhase = numInPerClosure;
numOutPerFcnAggregatePhase = cellfun(@(op) op.NumOutputs, originalOperations);
originalAggregateFcnCell = arrayfun(@(c) c.Operation.PerChunkFunctionHandle, ...
    closuresToFuse, 'UniformOutput', false);

% For AggregateOperation, we're adding the cellification layer.
doCellification = true;
newAggregateFcn = FunctionHandle(@(varargin) iFusedFcn(doCellification, ...
    originalAggregateFcnCell, ...
    numInPerFcnAggregatePhase, ...
    numOutPerFcnAggregatePhase, ...
    varargin{:}), 'MaxNumSlices', 1);

% Each reduction function takes a number of inputs equal to the number of
% outputs of the aggregation phase. The reduction function emits a number of
% outputs equal to the overall number of outputs of the original closure.
numInPerFcnReducePhase = numOutPerFcnAggregatePhase;
numOutPerFcnReducePhase = numOutPerClosure;
originalReduceFcnCell = arrayfun(@(c) c.Operation.ReduceFunctionHandle, ...
    closuresToFuse, 'UniformOutput', false);
newReduceFcn = FunctionHandle(@(varargin) iFusedFcn(doCellification, ...
    originalReduceFcnCell, ...
    numInPerFcnReducePhase, ...
    numOutPerFcnReducePhase, ...
    varargin{:}), 'MaxNumSlices', 2);

% Build the new operation and corresponding closure.
newOperation = AggregateOperation(newAggregateFcn, newReduceFcn, ...
    sum(numInPerClosure), sum(numOutPerClosure));
newFusedClosure = Closure(fusedClosureInputFutures, newOperation);

% De-cellify the outputs
replacementPromises = iAddDecellClosures(newFusedClosure.OutputPromises);

% Finally, swap over the new promises
nextNewPromiseIdx = 1;
for cidx = 1:numel(closuresToFuse)
    closure = closuresToFuse(cidx);
    for outIdx = 1:numel(closure.OutputPromises)
        swap(replacementPromises(nextNewPromiseIdx), closure.OutputPromises(outIdx));
        nextNewPromiseIdx = 1 + nextNewPromiseIdx;
    end
end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% This function serves as the fused body of both the aggregation and reduction
% phases of an aggregation operation. When doCellification is true, it expects
% that all inputs have been wrapped in a scalar cell array, and it will wrap
% all outputs in a scalar cell array. This is to allow operations to be fused
% where the tall size does not necessarily match. When doCellification is
% false, inputs and outputs are passed through unwrapped.
%
% underlyingFunctions is a cell array of FunctionHandle objects representing the
% original functions.
%
% numInPerFcn is a numeric vector specifying how many elements of 'varargin'
% each element of 'underlyingFunctions' should consume
%
% numOutPerFcn is a numeric vector specifying how many elements of
% 'varargout' each element of 'underlyingFunctions' should provide
%
% varargin are the chunks of data (wrapped in scalar cells when
% doCellification is true).
function varargout = iFusedFcn(doCellification, underlyingFunctions, ...
    numInPerFcn, numOutPerFcn, varargin)

assert(numel(underlyingFunctions) == numel(numInPerFcn) && ...
    numel(underlyingFunctions) == numel(numOutPerFcn) && ...
    nargout == sum(numOutPerFcn) && ...
    numel(varargin) == sum(numInPerFcn));

nextInputToConsume = 1;
nextOutputToProvide = 1;
varargout = cell(1, nargout);

for fcnIdx = 1:numel(underlyingFunctions)
    % Pick off the inputs from varargin for this function
    numToConsume = numInPerFcn(fcnIdx);
    inputs = varargin(nextInputToConsume:(nextInputToConsume + numToConsume - 1));
    nextInputToConsume = numToConsume + nextInputToConsume;
    
    % De-cellify inputs and vertcat (for the reduction phase)
    if doCellification
        inputs = cellfun(@matlab.bigdata.internal.util.vertcatCellContents, inputs, 'UniformOutput', false);
    end
    
    % Allocate output cell array for this function
    numToProvide = numOutPerFcn(fcnIdx);
    outputs = cell(1, numToProvide);
    
    try
        [outputs{1:numToProvide}] = feval(underlyingFunctions{fcnIdx}, inputs{:});
    catch E
        % Here, we expect the error emanating from FunctionHandle/feval to have a cause
        % which has the correct information. To avoid this information being
        % "wrapped" twice by FunctionHandle/throwAsFunction, we peel off the
        % envelope.
        if strcmp(E.identifier, 'MATLAB:bigdata:array:FunctionHandleError') && ...
                isscalar(E.cause)
            throw(E.cause{1});
        else
            % Unexpected...
            rethrow(E);
        end
    end
    
    % Re-cellify outputs and push back into varargout.
    if doCellification
        outputs = num2cell(outputs);
    end
    varargout(nextOutputToProvide:(nextOutputToProvide + numToProvide - 1)) = outputs;
    nextOutputToProvide = numToProvide + nextOutputToProvide;
end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Look up the execution graph to find the real promise Id corresponding to this
% future. This function is necessary so that we can step over "adaptor
% information assertion" functions.
function promiseId = iFindUnderlyingPromiseId(fut)
done = false;
while ~done
    assert(isscalar(fut), 'Future for iFindUnderlyingPromiseId must be scalar.');
    promise = fut.Promise;
    promiseId = promise.Id;
    if isempty(promise.Closure) || ~iIsAdaptorAssertionOrNoOp(promise.Closure)
        % Either there is no producing closure, or it is a "real" operation:
        % this promise is the one we were looking for.
        done = true;
    else
        % Walk past the closure to the next future and go around again.
        fut = promise.Closure.InputFutures;
    end
end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Does a closure correspond to iAssertAdaptorInfoCorrect or a iNoOpForOptimizer?
% The check is made by searching the text of the underlying function handle
% for those names.
function tf = iIsAdaptorAssertionOrNoOp(closure)
tf = false;
if isa(closure.Operation, 'matlab.bigdata.internal.lazyeval.ElementwiseOperation')
    fcn = closure.Operation.FunctionHandle;
    if isa(fcn, 'matlab.bigdata.internal.FunctionHandle')
        underlyingFcnStr = func2str(fcn.Handle);
        % Both tests are plain "substring found" checks. (A stray logical
        % negation previously wrapped the second strfind call; it did not
        % change the result but obscured the intent.)
        tf = ~isempty(strfind(underlyingFcnStr, 'iAssertAdaptorInfoCorrect')) ...
            || ~isempty(strfind(underlyingFcnStr, 'iNoOpForOptimizer'));
    end
end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Fuse aggregations by key
function iFuseAggregationsByKey(closuresToFuse)
import matlab.bigdata.internal.lazyeval.AggregateByKeyOperation;
import matlab.bigdata.internal.FunctionHandle;
import matlab.bigdata.internal.lazyeval.Closure;
import matlab.bigdata.internal.lazyeval.SlicewiseOperation;

% Calculate the number of inputs and outputs per *closure* (remembering that 'by
% key' operations have the 'key' input/output which is not passed to the
% aggregation/reduction functions).
numInPerClosure = arrayfun(@(c) numel(c.InputFutures), closuresToFuse);
numOutPerClosure = arrayfun(@(c) numel(c.OutputPromises), closuresToFuse);
originalOperations = arrayfun(@(c) c.Operation, closuresToFuse, 'UniformOutput', false);

% Get the input list for the fused AggregateByKeyOperation
fusedAggByKeyInputFutures = iCalcInputsForFusedAggByKey(numInPerClosure, closuresToFuse);

% Number of inputs/outputs per *function* is one less than the number of inputs
% per *closure*
numInPerFcnAggregatePhase = numInPerClosure - 1;
numOutPerFcnAggregatePhase = cellfun(@(op) op.NumOutputs, originalOperations) - 1;
originalAggregateFcnCell = arrayfun(@(c) c.Operation.PerChunkFunctionHandle, ...
    closuresToFuse, 'UniformOutput', false);
maxNumSlicesForAggregation = min(arrayfun(@(c) c.Operation.PerChunkFunctionHandle.MaxNumSlices, ...
    closuresToFuse));

% For AggregateByKey fusings, we're much more constrained in the sizes that we
% accept - and thus we don't need en-cellification.
doCellification = false;
newAggregateFcn = FunctionHandle(@(varargin) iFusedFcn(doCellification, ...
    originalAggregateFcnCell, ...
    numInPerFcnAggregatePhase, ...
    numOutPerFcnAggregatePhase, ...
    varargin{:}), ...
    'MaxNumSlices', maxNumSlicesForAggregation);

% For AggregateByKey fusings, number of inputs & outputs per *function* in the
% reduce phase is the same as the number of *function* outputs from the
% aggregate phase.
numInPerFcnReducePhase = numOutPerFcnAggregatePhase;
numOutPerFcnReducePhase = numOutPerFcnAggregatePhase;
originalReduceFcnCell = arrayfun(@(c) c.Operation.ReduceFunctionHandle, ...
    closuresToFuse, 'UniformOutput', false);
maxNumSlicesForReduction = min(arrayfun(@(c) c.Operation.ReduceFunctionHandle.MaxNumSlices, ...
    closuresToFuse));
newReduceFcn = FunctionHandle(@(varargin) iFusedFcn(doCellification, ...
    originalReduceFcnCell, ...
    numInPerFcnReducePhase, ...
    numOutPerFcnReducePhase, ...
    varargin{:}), ...
    'MaxNumSlices', maxNumSlicesForReduction);

% Put the two new functions together into a single AggregateByKeyOperation and
% Closure.
numInFusedOperation = 1 + sum(numInPerFcnAggregatePhase);
numOutFusedOperation = 1 + sum(numOutPerFcnReducePhase);
newOperation = AggregateByKeyOperation(newAggregateFcn, newReduceFcn, ...
    numInFusedOperation, numOutFusedOperation);
newFusedClosure = Closure(fusedAggByKeyInputFutures, newOperation);

% Next, we need to deal the outputs of the newFusedClosure to replicate the
% 'key' output once for each original input closure. This is required
% because we cannot (using only "swap" on promises) hook up the single key
% output from the fused AggregateByKeyOperation to the multiple key outputs
% of the original operations.
numInDealPhase = numOutFusedOperation;
numOutDealPhase = sum(numOutPerClosure);
fusedOutputsCell = arrayfun(@(x) x.Future, newFusedClosure.OutputPromises, ...
    'UniformOutput', false);
dealFcn = FunctionHandle(@(varargin) iDealFcn(numOutPerClosure, varargin{:}));
dealOperation = SlicewiseOperation(dealFcn, numInDealPhase, numOutDealPhase);
dealClosure = Closure([fusedOutputsCell{:}], dealOperation);

% Finally, swap over the new promises
nextNewPromiseIdx = 1;
for cidx = 1:numel(closuresToFuse)
    closure = closuresToFuse(cidx);
    for outIdx = 1:numel(closure.OutputPromises)
        swap(dealClosure.OutputPromises(nextNewPromiseIdx), closure.OutputPromises(outIdx));
        nextNewPromiseIdx = 1 + nextNewPromiseIdx;
    end
end
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Compute the input futures for a fused AggregateByKey closure. We need to pick
% only a single 'key' input from the first closure, and then the remaining
% inputs from each closure. Preceding logic should ensure that the 'key' inputs
% for all fusing closures are equivalent.
function fusedInputFutures = iCalcInputsForFusedAggByKey(numInPerClosure, closuresToFuse)
numInFused = 1 + sum(numInPerClosure - 1);
fusedInCell = cell(numInFused, 1);

% The single copy of the key input
fusedInCell{1} = closuresToFuse(1).InputFutures(1);
nextFusedInIdx = 2;

% Pick up the remaining inputs.
for cIdx = 1:numel(closuresToFuse)
    closure = closuresToFuse(cIdx);
    for fIdx = 2:numel(closure.InputFutures)
        fusedInCell{nextFusedInIdx} = closure.InputFutures(fIdx);
        nextFusedInIdx = 1 + nextFusedInIdx;
    end
end
fusedInputFutures = [fusedInCell{:}];
end

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Distribute the outputs of a fused by-key aggregation back into the output
% layout of the original closures: for each original closure, emit the shared
% 'key' first, followed by that closure's (numOutPerClosure - 1) value outputs
% taken in order from varargin.
function varargout = iDealFcn(numOutPerClosure, key, varargin)
varargout = cell(1, sum(numOutPerClosure));
nextOutputIdx = 1;
nextInputIdx = 1;
for closureIdx = 1:numel(numOutPerClosure)
    % Every original closure gets its own copy of the key output.
    varargout{nextOutputIdx} = key;
    nextOutputIdx = 1 + nextOutputIdx;
    for outputIdx = 2:numOutPerClosure(closureIdx)
        varargout{nextOutputIdx} = varargin{nextInputIdx};
        nextOutputIdx = 1 + nextOutputIdx;
        nextInputIdx = 1 + nextInputIdx;
    end
end
end