gusucode.com > bigdata 工具箱 matlab源码程序 > bigdata/+matlab/+bigdata/+internal/+io/CacheWriter.m
%CacheWriter % An abstract implementation of the Writer interface that commits cache % entries to a CacheStore object. This base class is extended by % DiskCacheWriter and MemoryCacheWriter. This will be used by % non-spark back-ends. % % The public methods supports usage as an array of CacheWriter objects. In % such cases, the data is added to every CacheWriter individually. % % Copyright 2016 The MathWorks, Inc. classdef (Abstract) CacheWriter < matlab.bigdata.internal.io.Writer & matlab.mixin.Heterogeneous properties (SetAccess = immutable) % The unique ID associated with the entire partitioned array % being cached. CacheEntryId; % The partition index that is associated with this writer. PartitionIndex; % The underlying CacheStore that will own the resulting cache % entries. CacheStore; end properties (SetAccess = private) % A logical scalar that is true if this object is still writing % into a cache entry. HasEntryOpenForWrite = true; % The size in bytes of all data written to the cache entry. CacheEntrySize = 0; end methods (Access = protected) % The main constructor. % % The inputs are as follows: % - cacheEntryId: The value for the CacheEntryId property. % - partitionIndex: The value for the PartitionIndex property. % - cacheStore: The value for the CacheStore property. function obj = CacheWriter(cacheEntryId, partitionIndex, cacheStore) obj.CacheEntryId = string(cacheEntryId); obj.PartitionIndex = partitionIndex; obj.CacheStore = cacheStore; end end methods (Sealed) %ADD Add a collection of<key, value> pairs to the intermediate storage function add(objs, ~, value) p = whos('value'); numBytes = p.bytes; for obj = objs(:)' obj.CacheEntrySize = obj.CacheEntrySize + numBytes; if obj.HasEntryOpenForWrite keep = obj.CacheStore.checkCacheSize(obj.CacheEntrySize); if keep obj.doAdd(value); else obj.CacheStore.closeEntry(obj.CacheEntryId, obj.PartitionIndex); obj.HasEntryOpenForWrite = false; obj.doDiscard(); end end end end %COMMIT Commit all output to the intermediate storage function commit(objs) for obj = objs(:)' if obj.HasEntryOpenForWrite [data, dataSize] = obj.doCommit(obj.CacheEntrySize); obj.CacheStore.commitEntry(obj.CacheEntryId, obj.PartitionIndex, data, dataSize); obj.CacheStore.closeEntry(obj.CacheEntryId, obj.PartitionIndex); obj.HasEntryOpenForWrite = false; end end end end methods % Ensure that the cache entry is always closed. function delete(obj) if obj.HasEntryOpenForWrite obj.CacheStore.closeEntry(obj.CacheEntryId, obj.PartitionIndex); obj.HasEntryOpenForWrite = false; end end end methods (Abstract, Access = protected) % The underlying implementation of the add method for a single % CacheWriter object. doAdd(obj, value); % The underlying implementation of the commit method for a single % CacheWriter object. This receives the sum of the sizes of all % values added. It returns both data to be stored for the cache % entry and the size of that cache entry. [data, dataSize] = doCommit(obj, dataSize); % The underlying implementation of discard method for a single % CacheWriter object. This is called whenever the cache entry is to % be discarded. doDiscard(obj); end end