VAPoR  0.1
WaveCodecIO.h
Go to the documentation of this file.
1 //
2 // $Id$
3 //
4 #ifndef _WaveCodeIO_h_
5 #define _WaveCodeIO_h_
6 
7 #include <vapor/VDFIOBase.h>
8 #include <vapor/SignificanceMap.h>
9 #include <vapor/Compressor.h>
10 #include <vapor/EasyThreads.h>
11 #include <vapor/NCBuf.h>
12 
13 #ifdef PARALLEL
14 #include <mpi.h>
15 #endif
16 
17 namespace VAPoR {
18 
19 //
37 //
39 public:
40 
47  //
48  WaveCodecIO(const MetadataVDC &metadata, int nthreads = 0);
49 
56  //
57  WaveCodecIO(const string &metafile, int nthreads = 0);
58 
59 #ifdef DEAD
61  const size_t dim[3], const size_t bs[3], int numTransforms,
62  const vector <size_t> cratios,
63  const string &wname, const string &filebase
64  );
65 
66  WaveCodecIO(const vector <string> &files);
67 #endif
68 
69  virtual ~WaveCodecIO();
70 
105  virtual int OpenVariableRead(
106  size_t timestep, const char *varname, int reflevel=0, int lod=0
107  );
108 
130  virtual int OpenVariableWrite(
131  size_t timestep, const char *varname,
132  int reflevel=-1 /*ignored*/, int lod=-1
133  );
134 
138  //
139  virtual int CloseVariable();
140 
162  //
163  virtual int BlockReadRegion(
164  const size_t bmin[3], const size_t bmax[3], float *region, bool unblock=true
165  );
166 
189  //
190  virtual int ReadRegion(
191  const size_t min[3], const size_t max[3], float *region
192  );
193 
194  virtual int ReadRegion(float *region);
195 
217  //
218  virtual int ReadSlice(float *slice);
219 
243  //
244  virtual int BlockWriteRegion(
245  const float *region, const size_t bmin[3], const size_t bmax[3],
246  bool block=true
247  );
248 
275  virtual int WriteRegion(
276  const float *region, const size_t min[3], const size_t max[3]
277  );
278 
279  virtual int WriteRegion(
280  const float *region
281  );
282 
283 
300  virtual int WriteSlice(const float *slice);
301 
314  virtual void SetBoundaryPadOnOff(bool pad) {
315  _pad = pad;
316  };
317 
331  //
332  const float *GetDataRange() const {return (_dataRange);}
333 
350  void GetValidRegion(
351  size_t min[3], size_t max[3], int reflevel
352  ) const;
353 
367  //
368  virtual int VariableExists(
369  size_t ts,
370  const char *varname,
371  int reflevel = 0,
372  int lod = 0
373  ) const ;
374 
392  static size_t GetMaxCRatio(const size_t bs[3], string wavename, string wmode);
393 
395  //
396  virtual int GetNumTransforms() const;
397 
399  //
400  virtual void GetBlockSize(size_t bs[3], int reflevel) const;
401 #ifdef PARALLEL
402  void SetIOComm(MPI_Comm NewIOComm) {_IO_Comm = NewIOComm;};
403 #endif
404  void SetCollectiveIO(bool newCollectiveIO) {
405  _collectiveIO = newCollectiveIO;
406  };
407  friend void *RunBlockReadRegionThread(void *object);
408  friend void *RunBlockWriteRegionThread(void *object);
409 
410 private:
411 #ifdef PARALLEL
412  MPI_Comm _IO_Comm;
413 #endif
414  bool _collectiveIO;
415  double _xformMPI;
416  double _methodTimer;
417  double _methodThreadTimer;
418  double _ioMPI;
419  //
420  // Threaded read object for parallel inverse transforms
421  // (data reconstruction)
422  //
423  class ReadWriteThreadObj {
424  public:
425  ReadWriteThreadObj(
426  WaveCodecIO *wc,
427  int id,
428  float *region,
429  const size_t bmin_p[3],
430  const size_t bmax_p[3],
431  const size_t bdim_p[3],
432  const size_t dim_p[3],
433  const size_t bs_p[3],
434  bool reblock,
435  bool pad
436  );
437  void BlockReadRegionThread();
438  void BlockWriteRegionThread();
439  const float *GetDataRange() const {return (_dataRange);}
440 
441  private:
442  WaveCodecIO *_wc;
443  int _id; // thread id
444  float *_region; // destination buffer for read
445  const size_t *_bmin_p;
446  const size_t *_bmax_p; // block coordinates of data
447  const size_t *_bdim_p;
448  const size_t *_dim_p;
449  const size_t *_bs_p; // dimensions of block
450  float _dataRange[2];
451  bool _reblock;
452  bool _pad;
453  int _FetchBlock(
454  size_t bx, size_t by, size_t bz
455  );
456  int _WriteBlock(size_t bx, size_t by, size_t bz);
457  };
458 
459 
460 public:
461  int _nthreads; // num execution threads
462  int getNumThread(){return _nthreads;}
463  void EnableBuffering(size_t count[3], size_t divisor, int rank);
464 private:
465  int _next_block;
466  int _threadStatus;
467  size_t _NC_BUF_SIZE; //buffering disabled by default
468  ReadWriteThreadObj **_rw_thread_objs;
469  vector < vector <SignificanceMap> > _sigmapsThread;// one set for each thread
470  vector <size_t> _sigmapsizes; // size of each encoded sig map
471  Compressor *_compressor3D; // 3D compressor
472  Compressor *_compressor2DXY;
473  Compressor *_compressor2DXZ;
474  Compressor *_compressor2DYZ;
475  Compressor *_compressor; // compressor for currently opened variable
476  vector <Compressor *> _compressorThread3D;
477  vector <Compressor *> _compressorThread2DXY;
478  vector <Compressor *> _compressorThread2DXZ;
479  vector <Compressor *> _compressorThread2DYZ;
480  vector <Compressor *> _compressorThread; // current compressor threads
481  vector <NCBuf *> _ncbufs;
482 
483  VarType_T _vtype; // Type (2d, or 3d) of currently opened variable
484  VarType_T _compressorType; // Type (2d, or 3d) of current _compressor
485  int _lod; // compression level of currently opened file
486  int _reflevel; // current refinement level
487  size_t _validRegMin[3]; // min region bounds of current file
488  size_t _validRegMax[3]; // max region bounds of current file
489  bool _writeMode; // true if opened for writes
490  bool _isOpen; // true if a file is opened
491  size_t _timeStep; // currently opened time step
492  string _varName; // Currently opened variable
493  vector <string> _ncpaths;
494  vector <int> _ncids;
495  vector <int> _nc_sig_vars; // ncdf ids for wave and sig vars
496  vector <int> _nc_wave_vars;
497  float *_cvector; // storage for wavelet coefficients
498  size_t _cvectorsize; // amount of space allocated to _cvector
499  vector <float *> _cvectorThread;
500  unsigned char *_svector; // storage for encoded signficance map
501  size_t _svectorsize; // amount of space allocated to _svector
502  vector <unsigned char *> _svectorThread;
503  float *_block; // storage for a block
504  vector <float *> _blockThread;
505  float *_blockReg; // more storage
506  float _dataRange[2];
507  vector <size_t> _ncoeffs; // num wave coeff. at each compression level
508  vector <size_t> _cratios3D; // 3D compression ratios
509  vector <size_t> _cratios2D; // 2D compression ratios
510  vector <size_t> _cratios; // compression ratios for currently opened file
511 
512  float *_sliceBuffer;
513  size_t _sliceBufferSize; // size of slice buffer in elements
514  int _sliceCount; // num slices written
515 
516  bool _pad; // Padding enabled?
517 
518  int _OpenVarWrite(const string &basename);
519  int _OpenVarRead(const string &basename);
520  int _WaveCodecIO(int nthreads);
521  int _SetupCompressor();
522 
523  void _UnpackCoord(
524  VarType_T vtype, const size_t src[3], size_t dst[3], size_t fill
525  ) const;
526 
527  void _PackCoord(
528  VarType_T vtype, const size_t src[3], size_t dst[3], size_t fill
529  ) const;
530 
531  void _FillPackedCoord(
532  VarType_T vtype, const size_t src[3], size_t dst[3], size_t fill
533  ) const;
534 
535 
536 };
537 };
538 
539 #endif // _WaveCodeIO_h_
virtual void SetBoundaryPadOnOff(bool pad)
Definition: WaveCodecIO.h:314
const float * GetDataRange() const
Definition: WaveCodecIO.h:332
#define VDF_API
Definition: common.h:61
void SetCollectiveIO(bool newCollectiveIO)
Definition: WaveCodecIO.h:404
A class for managing data set metadata.
Definition: MetadataVDC.h:92
A sub-region reader for VDF files.
Definition: WaveCodecIO.h:38
Abstract base class for performing data IO to a VDC.
Definition: VDFIOBase.h:27
A class for managing data set metadata.
Definition: Compressor.h:29