% Source file: bigdata/+matlab/+bigdata/+internal/+io/CacheStore.m
%CacheStore
% Helper base class that manages a collection of cache entries for one MATLAB
% Context. There will exist two implementations, DiskCacheStore and
% MemoryCacheStore. This will be used by non-Spark back-ends.
%
% The public methods support usage as an array of CacheStore objects. In
% such cases, the array is expected to be in decreasing order of
% performance of the underlying storage.

% Copyright 2016 The MathWorks, Inc.

classdef (Abstract) CacheStore < handle & matlab.mixin.Heterogeneous
    properties (SetAccess = immutable)
        % The amount of local disk space to allow for this cache store,
        % in bytes.
        MaxSize;
    end

    properties (Dependent, SetAccess = private)
        % A table of cache entries that are currently stored on disk. This is
        % ordered from most to least recently used. Backed by CacheEntriesImpl,
        % which exists solely to avoid holding onto a "table" when empty.
        CacheEntries;
    end

    methods
        function out = get.CacheEntries(obj)
            if isempty(obj.CacheEntriesImpl)
                out = table(string.empty(0, 1), zeros(0, 1), zeros(0, 1), zeros(0, 1), cell(0, 1), ...
                    'VariableNames', {'Id', 'PartitionIndex', 'StageIndex', 'Size', 'Data'});
            else
                out = obj.CacheEntriesImpl;
            end
        end

        function set.CacheEntries(obj, newValue)
            if isempty(newValue)
                % Ensure we don't hold a table if we don't need to, so that
                % 'table' can be cleared.
                obj.CacheEntriesImpl = [];
            else
                obj.CacheEntriesImpl = newValue;
            end
        end
    end

    properties (SetAccess = private)
        % Backing data structure for CacheEntries.
        CacheEntriesImpl = [];

        % A list of all CacheEntryKey Ids for which a Writer object
        % currently exists.
        %
        % Note, this is independent of partition index, as a single worker
        % is not expected to write to multiple partitions of the cache
        % simultaneously.
        OpenForWriteIds = string.empty(0, 1);

        % An index into the current stage of execution.
        %
        % This will monotonically increase. It exists so that cache entries
        % generated by the same execution stage do not override those
        % written by the same stage.
        % This is to prevent cache cycling, where writing partition N would
        % push partition 1 of the same array out of the cache.
        StageIndex = 0;

        % The current size of all cache entries associated with the current stage.
        StageSize = 0;
    end

    methods (Access = protected)
        % The main constructor.
        function obj = CacheStore(maxSizeInBytes)
            obj.MaxSize = maxSizeInBytes;
        end
    end

    methods (Sealed)
        % Open or create a cache entry for the given cache entry key and
        % partition index.
        %
        % Both the reader and writer outputs can be empty or non-empty.
        %
        % Outputs:
        %  - reader: Either an implementation of the Reader interface or
        %  empty. This is non-empty if the corresponding cache entry exists
        %  in any of the CacheStore objects.
        %
        %  - writer: Either an implementation of the Writer interface or
        %  empty. This is non-empty if the corresponding cache entry does
        %  not exist in any of the CacheStore objects, and will write to all
        %  CacheStore objects. This is also non-empty if the cache entry
        %  exists, but only in one of the slower CacheStore objects. In such
        %  cases, the writer will write to all of the faster CacheStore
        %  objects as a way of upgrading the cache entry.
        %
        function [reader, writer] = openOrCreateEntry(objs, cacheEntryKey, partitionIndex)
            writer = [];
            for obj = objs(:)'
                reader = obj.openEntryForRead(cacheEntryKey.Id, partitionIndex);
                if isempty(reader)
                    writer = [writer; obj.openEntryForWrite(cacheEntryKey.Id, partitionIndex)]; %#ok<AGROW>
                else
                    break;
                end
            end
        end

        % Increment the stage index. This allows subsequent cache entry
        % creation to override existing cache entries.
        function nextStage(objs)
            for obj = objs(:)'
                obj.StageIndex = obj.StageIndex + 1;
                obj.StageSize = 0;
            end
        end

        % Copy all entries from one CacheStore to another.
        function copyEntries(source, destination)
            assert(isscalar(source));
            assert(isscalar(destination));
            for ii = 1:height(source.CacheEntries)
                id = source.CacheEntries.Id(ii);
                partitionIndex = source.CacheEntries.PartitionIndex(ii);
                if any(destination.CacheEntries.Id == id ...
                        & destination.CacheEntries.PartitionIndex == partitionIndex)
                    continue;
                end
                try
                    reader = source.openEntryForRead(id, partitionIndex);
                    writer = destination.openEntryForWrite(id, partitionIndex);
                    while hasdata(reader) && writer.HasEntry
                        writer.add([], read(reader));
                    end
                    writer.commit();
                catch
                    % Nothing we can do. If this occurs, the delete method
                    % of writer will delete any created files. We still
                    % have to delete the memory cache entry, so we are left
                    % with no cache entry.
                end
            end
        end

        % Remove all cache entries corresponding with the provided cache
        % entry key.
        function removeEntry(objs, cacheEntryKey)
            for obj = objs(:)'
                obj.doRemoveAll(cacheEntryKey.Id);
                obj.CacheEntries(obj.CacheEntries.Id == cacheEntryKey.Id, :) = [];
            end
        end

        % Remove all cache entries.
        function removeAllEntries(objs)
            for obj = objs(:)'
                cacheEntryIds = unique(obj.CacheEntries.Id);
                for cacheEntryId = cacheEntryIds(:)'
                    obj.doRemoveAll(cacheEntryId);
                end
                obj.CacheEntries = obj.CacheEntries([], :);
            end
        end
    end

    methods (Abstract, Access = protected)
        % The underlying implementation-specific pieces of openEntryForRead.
        % This is called with the data from a previously committed cache entry.
        reader = doOpenForRead(obj, data);

        % The underlying implementation-specific pieces of openEntryForWrite.
        writer = doOpenForWrite(obj, cacheEntryId, partitionIndex);

        % Do cleanup for the data of a specific cache entry. This is
        % a hook for the disk cache cleanup.
        doRemove(obj, data);

        % Do cleanup of all data associated with a cache entry ID. This is
        % a hook for the disk cache cleanup.
        doRemoveAll(obj, cacheEntryId);
    end

    methods (Access = ?matlab.bigdata.internal.io.CacheWriter)
        % Commit a disk cache entry to the store.
        %
        % This will be called by CacheEntryWriter and expects to receive a
        % collection of files under BasePath that can be read using a
        % TallDatastore.
        function commitEntry(obj, cacheEntryId, partitionIndex, data, dataSize)
            obj.CacheEntries(end + 1, :) = {string(cacheEntryId), partitionIndex, obj.StageIndex, dataSize, {data}};
            % Move the new entry to the top of the list as it is the most
            % recently used.
            obj.CacheEntries = obj.CacheEntries([end, 1:end - 1], :);
            obj.StageSize = obj.StageSize + dataSize;

            % Evict least recently used entries once the cumulative size
            % exceeds the budget.
            cumulativeSize = cumsum(obj.CacheEntries.Size);
            for ii = find(cumulativeSize > obj.MaxSize)'
                obj.doRemove(obj.CacheEntries.Data{ii});
            end
            obj.CacheEntries(cumulativeSize > obj.MaxSize, :) = [];
        end

        % Commit a memory cache entry to the store.
        %
        % This will be called by CacheEntryWriter and expects to receive an
        % N x 1 cell array of chunks that represent the memory cache entry.

        % Close an entry.
        %
        % This will be called by CacheEntryWriter when finished, regardless
        % of whether a cache entry was committed or not.
        function closeEntry(obj, cacheEntryId, partitionIndex) %#ok<INUSD>
            obj.OpenForWriteIds(obj.OpenForWriteIds == cacheEntryId, :) = [];
        end

        % Check whether the provided size in bytes will fit in the disk
        % cache store.
        function tf = checkCacheSize(obj, sizeInBytes)
            tf = sizeInBytes < obj.MaxSize - obj.StageSize;
        end
    end

    methods (Access = private)
        % Open a cache entry for reading.
        %
        % This can return empty if this is not possible.
        function reader = openEntryForRead(obj, cacheEntryId, partitionIndex)
            entryIndex = find(obj.CacheEntries.Id == cacheEntryId ...
                & obj.CacheEntries.PartitionIndex == partitionIndex, 1);
            if isempty(entryIndex)
                reader = [];
                return;
            end
            reader = obj.doOpenForRead(obj.CacheEntries.Data{entryIndex});

            % If the entry hasn't yet been used in the current stage of
            % execution, move it into the current stage.
            if obj.CacheEntries.StageIndex(entryIndex) ~= obj.StageIndex
                obj.CacheEntries.StageIndex(entryIndex) = obj.StageIndex;
                obj.StageSize = obj.StageSize + obj.CacheEntries.Size(entryIndex);
            end

            % Move the updated entry to the top of the list as it is now
            % the most recently used.
            obj.CacheEntries = obj.CacheEntries([entryIndex, 1:entryIndex - 1, entryIndex + 1:end], :);
        end

        % Open a cache entry for writing.
        %
        % This can return empty if this is not possible.
        function writer = openEntryForWrite(obj, cacheEntryId, partitionIndex)
            if any(cacheEntryId == obj.OpenForWriteIds)
                writer = [];
            else
                writer = obj.doOpenForWrite(cacheEntryId, partitionIndex);
                % Record the Id so that no second writer is opened for the
                % same cache entry while this one exists. It is removed
                % again by closeEntry.
                obj.OpenForWriteIds(end + 1, 1) = string(cacheEntryId);
            end
        end
    end
end
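
The class interleaves three pieces of bookkeeping: CacheEntries is kept in most-recently-used-first order, a read re-pins an entry into the current stage (so later partitions of the same stage cannot cycle it out), and a commit evicts from the LRU tail once the cumulative size exceeds MaxSize. The sketch below restates that policy as a small Python model for illustration only; `ToyCacheStore`, `Entry`, and their members are hypothetical names, not MathWorks code, and partition indices, tiered stores, and writer tracking are deliberately left out.

```python
# Toy model of the CacheStore bookkeeping (illustrative, not toolbox code).
from dataclasses import dataclass

@dataclass
class Entry:
    key: str
    stage: int
    size: int

class ToyCacheStore:
    def __init__(self, max_size):
        self.max_size = max_size      # mirrors MaxSize
        self.stage = 0                # mirrors StageIndex
        self.stage_size = 0           # mirrors StageSize
        self.entries = []             # mirrors CacheEntries, MRU first

    def next_stage(self):
        # nextStage: entries written in later stages may evict earlier ones.
        self.stage += 1
        self.stage_size = 0

    def fits(self, size):
        # checkCacheSize: room left after the current stage's entries.
        return size < self.max_size - self.stage_size

    def commit(self, key, size):
        # commitEntry: the new entry becomes the most recently used...
        self.entries.insert(0, Entry(key, self.stage, size))
        self.stage_size += size
        # ...then evict from the tail once cumulative size is over budget.
        total, kept = 0, []
        for e in self.entries:
            total += e.size
            if total <= self.max_size:
                kept.append(e)
        self.entries = kept

    def read(self, key):
        # openEntryForRead: re-pin into the current stage, move to MRU.
        for i, e in enumerate(self.entries):
            if e.key != key:
                continue
            if e.stage != self.stage:
                e.stage = self.stage
                self.stage_size += e.size
            self.entries.insert(0, self.entries.pop(i))
            return e
        return None
```

A read followed by a commit therefore protects the read entry: committing a third entry into a full store pushes out the untouched one, not the one just read, which is exactly the cycling-prevention behaviour the StageIndex comments describe.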