diff --git a/README.md b/README.md index 997d145..fa4f1b3 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ # **The AMUN Code** ## Copyright (C) 2008-2023 Grzegorz Kowal -[![Build Status](https://ampere.clarokowal.com/api/badges/gkowal/amun-code/status.svg)](https://ampere.clarokowal.com/gkowal/amun-code) +[![Build Status](https://drone.amuncode.org/api/badges/gkowal/amun-code/status.svg)](https://drone.amuncode.org/gkowal/amun-code) AMUN is a parallel code to perform numerical simulations in fluid approximation on uniform or non-uniform (adaptive) meshes. The goal in developing this code is diff --git a/python/amunpy/setup.py b/python/amunpy/setup.py index 7aab18d..1b58407 100644 --- a/python/amunpy/setup.py +++ b/python/amunpy/setup.py @@ -5,7 +5,7 @@ with open("README.md", "r", encoding="utf-8") as fh: setuptools.setup( name="amunpy", - version="0.9.9", + version="0.9.10", author="Grzegorz Kowal", author_email="grzegorz@amuncode.org", description="Python Interface for the AMUN code's snapshots", diff --git a/python/amunpy/src/amunpy/__init__.py b/python/amunpy/src/amunpy/__init__.py index 0ed875e..d4533a4 100644 --- a/python/amunpy/src/amunpy/__init__.py +++ b/python/amunpy/src/amunpy/__init__.py @@ -21,6 +21,6 @@ __all__ = [ 'AmunXML', 'AmunH5', 'WriteVTK', \ __author__ = "Grzegorz Kowal" __copyright__ = "Copyright 2018-2023 Grzegorz Kowal " -__version__ = "0.9.9" +__version__ = "0.9.10" __maintainer__ = "Grzegorz Kowal" __email__ = "grzegorz@amuncode.org" diff --git a/python/amunpy/src/amunpy/amunxml.py b/python/amunpy/src/amunpy/amunxml.py index 2c3599b..14c48d3 100644 --- a/python/amunpy/src/amunpy/amunxml.py +++ b/python/amunpy/src/amunpy/amunxml.py @@ -224,6 +224,24 @@ class AmunXML(Amun): print("File '{}' seems to be corrupted! Proceeding anyway...".format(filename)) + def __shuffle_decode(self, a, dtype='int64'): + import numpy + + s = numpy.dtype(dtype).itemsize + d = [s, len(a) // s] + + return numpy.frombuffer(a, dtype="int8").reshape(d).T.tobytes() + + + def __bytedelta_decode(self, a, dtype='int64'): + import numpy + + s = numpy.dtype(dtype).itemsize + d = [s, len(a) // s] + + return numpy.cumsum(numpy.frombuffer(a, dtype="int8").reshape(d), axis=-1, dtype='int8').T.tobytes() + + def __read_binary_meta(self, dataset, dtype='int32'): ''' Reads binary data of metadata. @@ -254,6 +272,15 @@ class AmunXML(Amun): else: raise Exception("Binary file '{}' compressed in unsupported format {}!".format(fname, comp)) + if 'data_filter' in self.binaries[dataset]: + data_filter = self.binaries[dataset]['data_filter'] + if data_filter == 'bytedelta': + data = self.__bytedelta_decode(data, dtype=dtype) + elif data_filter == 'shuffle': + data = self.__shuffle_decode(data, dtype=dtype) + else: + raise Exception("Binary file '{}' processed using unsupported filter {}!".format(fname, data_filter)) + if 'digest' in self.binaries[dataset]: htype = self.binaries[dataset]['digest_type'] dhash = self.binaries[dataset]['digest'] @@ -302,6 +329,15 @@ class AmunXML(Amun): else: raise Exception("Binary file '{}' compressed in unsupported format {}!".format(fname, comp)) + if 'data_filter' in self.chunks[chunk_number][dataset_name]: + data_filter = self.chunks[chunk_number][dataset_name]['data_filter'] + if data_filter == 'bytedelta': + data = self.__bytedelta_decode(data, dtype=dtype) + elif data_filter == 'shuffle': + data = self.__shuffle_decode(data, dtype=dtype) + else: + raise Exception("Binary file '{}' processed using unsupported filter {}!".format(fname, data_filter)) + if 'digest' in self.chunks[chunk_number][dataset_name]: htype = self.chunks[chunk_number][dataset_name]['digest_type'] dhash = self.chunks[chunk_number][dataset_name]['digest'] diff --git a/sources/compression.F90 b/sources/compression.F90 index 60466b8..7a2bc5f 100644 --- a/sources/compression.F90 +++ b/sources/compression.F90 @@ -123,8 +123,9 @@ module compression private - public :: set_compression, get_compression, compression_bound, compress - public :: compression_suffix + public :: set_compression, get_compression, compression_bound + public :: compression_none, compression_suffix + public :: encode, compress !- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - ! @@ -270,7 +271,7 @@ module compression ! subroutine COMPRESS: ! ------------------- ! -! Subroutine compressed input buffer using ZSTD compression. +! Subroutine compresses the input buffer. ! ! Arguments: ! @@ -338,6 +339,80 @@ module compression !------------------------------------------------------------------------------- ! end subroutine compress +! +!=============================================================================== +! +! subroutine ENCODE: +! ----------------- +! +! Subroutine encodes the input buffer for better data compression. +! +! Arguments: +! +! filter_type - the filter type to apply, 1 - shuffle, 2 - bytedelta; +! item_size - the size of each item in bytes; +! bytes - the length of the input data; +! input_ptr - the pointer to the input data; +! output - the output where the encoded data are written; +! +!=============================================================================== +! + subroutine encode(filter_type, item_size, bytes, input_ptr, output) + + use iso_c_binding, only : c_ptr, c_f_pointer + + implicit none + + integer(kind=1) , intent(in ) :: filter_type + integer(kind=4) , intent(in ) :: item_size + integer(kind=8) , intent(in ) :: bytes + type(c_ptr) , intent(in ) :: input_ptr + integer(kind=1), dimension(bytes), intent(inout) :: output + + integer(kind=1), dimension(:), pointer :: input + + integer(kind=8) :: i, j, m, n, item_count + +!------------------------------------------------------------------------------- +! + call c_f_pointer(input_ptr, input, [ bytes ]) + + select case(filter_type) + case(1) + item_count = bytes / item_size + + i = 1 + j = item_count + do m = 1, item_size + output(i:j) = input(m:bytes:item_size) + + i = j + 1 + j = j + item_count + end do + + case(2) + item_count = bytes / item_size + + i = 1 + j = item_count + do m = 1, item_size + output(i:j) = input(m:bytes:item_size) + + do n = j, i + 1, -1 + output(n) = output(n) - output(n-1) + end do + + i = j + 1 + j = j + item_count + end do + + case default + output(:) = input(:) + end select + +!------------------------------------------------------------------------------- +! + end subroutine encode !=============================================================================== ! diff --git a/sources/io.F90 b/sources/io.F90 index 72bfbaa..0ee82c4 100644 --- a/sources/io.F90 +++ b/sources/io.F90 @@ -118,6 +118,11 @@ module io character(len=255), save :: cformat = "none" ! compression format integer , save :: clevel = 3 ! compression level +! data filter applied before the compression: +! (0 - no filter, 1 - shuffle, 2 - bytedelta) +! + integer(kind=1) :: data_filter = 0 + ! the type of digest to use and its length ! integer, save :: hash_type = 0 @@ -185,7 +190,7 @@ module io ! subroutine initialize_io(verbose, status) - use compression, only : set_compression, get_compression + use compression, only : set_compression, get_compression, compression_none use hash , only : hash_info use helpers , only : print_message #ifdef HDF5 @@ -199,7 +204,7 @@ module io integer, intent(out) :: status logical :: test - character(len=255) :: dname + character(len=255) :: string, dname character(len=255) :: sformat = "xml" character(len=255) :: precise = "off" character(len=255) :: ghosts = "on" @@ -285,6 +290,18 @@ module io call get_parameter("compression_level" , clevel) call set_compression(cformat, clevel) + string = "none" + call get_parameter("data_filter", string) + select case(string) + case('shuffle', 'SHUFFLE') + data_filter = 1 + case('bytedelta', 'BYTEDELTA') + data_filter = 2 + case default + data_filter = 0 + end select + if (get_compression() == compression_none) data_filter = 0 + call get_parameter("digest_type", dtype) call hash_info(dtype, hash_type, hash_length) @@ -493,6 +510,14 @@ module io call print_parameter(verbose, "digest type", hash_name(hash_type)) call print_parameter(verbose, "compression format", cformat) call print_parameter(verbose, "compression level", clevel) + select case(data_filter) + case(1) + call print_parameter(verbose, "data filter", "shuffle") + case(2) + call print_parameter(verbose, "data filter", "bytedelta") + case default + call print_parameter(verbose, "data filter", "none") + end select end select call print_parameter(verbose, "precise snapshot intervals", & precise_snapshots, "on") @@ -2189,49 +2214,53 @@ module io write(fname,"(a,'.bin')") "metablock_fields" bytes = size(fields, kind=8) * kind(fields) - call write_binary_xml(dname, fname, c_loc(fields), bytes, dtype, digest) + call write_binary_xml(dname, fname, c_loc(fields), bytes, & + kind(fields), dtype, digest) call write_attribute_xml(lun, "fields", fname, 'int32', & shape(fields), bytes, dtype, digest) write(fname,"(a,'.bin')") "metablock_children" bytes = size(children, kind=8) * kind(children) - call write_binary_xml(dname, fname, c_loc(children), & - bytes, dtype, digest) + call write_binary_xml(dname, fname, c_loc(children), bytes, & + kind(children), dtype, digest) call write_attribute_xml(lun, "children", fname, 'int32', & shape(children), bytes, dtype, digest) #if NDIMS == 3 write(fname,"(a,'.bin')") "metablock_faces" bytes = size(faces, kind=8) * kind(faces) - call write_binary_xml(dname, fname, c_loc(faces), bytes, dtype, digest) + call write_binary_xml(dname, fname, c_loc(faces), bytes, & + kind(faces), dtype, digest) call write_attribute_xml(lun, "faces", fname, 'int32', & shape(faces), bytes, dtype, digest) #endif /* NDIMS == 3 */ write(fname,"(a,'.bin')") "metablock_edges" bytes = size(edges, kind=8) * kind(edges) - call write_binary_xml(dname, fname, c_loc(edges), bytes, dtype, digest) + call write_binary_xml(dname, fname, c_loc(edges), bytes, & + kind(edges), dtype, digest) call write_attribute_xml(lun, "edges", fname, 'int32', & shape(edges), bytes, dtype, digest) write(fname,"(a,'.bin')") "metablock_corners" bytes = size(corners, kind=8) * kind(corners) - call write_binary_xml(dname, fname, c_loc(corners), & - bytes, dtype, digest) + call write_binary_xml(dname, fname, c_loc(corners), bytes, & + kind(corners), dtype, digest) call write_attribute_xml(lun, "corners", fname, 'int32', & - shape(corners), bytes, dtype, digest) + shape(corners), bytes, dtype, digest) write(fname,"(a,'.bin')") "metablock_bounds" bytes = size(bounds, kind=8) * kind(bounds) - call write_binary_xml(dname, fname, c_loc(bounds), bytes, dtype, digest) + call write_binary_xml(dname, fname, c_loc(bounds), bytes, & + kind(bounds), dtype, digest) call write_attribute_xml(lun, "bounds", fname, 'float64', & - shape(bounds), bytes, dtype, digest) + shape(bounds), bytes, dtype, digest) if (nmodes > 0) then write(fname,"(a,'.bin')") "forcing_coefficients" bytes = size(fcoefs, kind=8) * kind(fcoefs) * 2 - call write_binary_xml(dname, fname, c_loc(fcoefs), & - bytes, dtype, digest) + call write_binary_xml(dname, fname, c_loc(fcoefs), bytes, & + kind(fcoefs), dtype, digest) call write_attribute_xml(lun, "forcing", fname, 'complex64', & shape(fcoefs), bytes, dtype, digest) end if @@ -2295,15 +2324,15 @@ module io write(aname,"('_',i6.6)") l write(fname,"('datablock_prim_',i6.6,a,'.bin')") nproc, trim(aname) bytes = size(pdata%q, kind=8) * kind(pdata%q) - call write_binary_xml(dname, fname, c_loc(pdata%q), & - bytes, dtype, digest) + call write_binary_xml(dname, fname, c_loc(pdata%q), bytes, & + kind(pdata%q), dtype, digest) call write_attribute_xml(lun, "prim" // trim(aname), fname, & 'float64', shape(pdata%q), bytes, dtype, digest) write(fname,"('datablock_cons_',i6.6,a,'.bin')") nproc, trim(aname) bytes = size(pdata%uu, kind=8) * kind(pdata%uu) - call write_binary_xml(dname, fname, c_loc(pdata%uu), & - bytes, dtype, digest) + call write_binary_xml(dname, fname, c_loc(pdata%uu), bytes, & + kind(pdata%uu), dtype, digest) call write_attribute_xml(lun, "cons" // trim(aname), fname, & 'float64', shape(pdata%uu), bytes, dtype, digest) @@ -2313,7 +2342,8 @@ module io write(fname,"(a,'_',a,'_',i6.6,'.bin')") "datablock", "ids", nproc bytes = size(ids, kind=8) * kind(ids) - call write_binary_xml(dname, fname, c_loc(ids), bytes, dtype, digest) + call write_binary_xml(dname, fname, c_loc(ids), bytes, & + kind(ids), dtype, digest) call write_attribute_xml(lun, "ids", fname, 'int32', & shape(ids), bytes, dtype, digest) @@ -2335,7 +2365,8 @@ module io write(fname,"(a,'_',a,'_',i6.6,'.bin')") "random", "seeds", nproc bytes = size(seeds, kind=8) * kind(seeds) - call write_binary_xml(dname, fname, c_loc(seeds), bytes, dtype, digest) + call write_binary_xml(dname, fname, c_loc(seeds), bytes, & + kind(seeds), dtype, digest) call write_attribute_xml(lun, "seeds", fname, 'int64', & shape(seeds), bytes, dtype, digest) @@ -2543,15 +2574,15 @@ module io write(fname,"(a,'.bin')") "metablock_fields" dbytes = size(fields, kind=8) * kind(fields) - call write_binary_xml(dname, fname, c_loc(fields), & - dbytes, hash_type, ddigest, cbytes, cdigest) + call write_binary_xml(dname, fname, c_loc(fields), dbytes, & + kind(fields), hash_type, ddigest, cbytes, cdigest) call write_attribute_xml(lun, "fields", fname, 'int32', & shape(fields), dbytes, hash_type, ddigest, cbytes, cdigest) write(fname,"(a,'.bin')") "metablock_bounds" dbytes = size(bounds, kind=8) * kind(bounds) - call write_binary_xml(dname, fname, c_loc(bounds), & - dbytes, hash_type, ddigest, cbytes, cdigest) + call write_binary_xml(dname, fname, c_loc(bounds), dbytes, & + kind(bounds), hash_type, ddigest, cbytes, cdigest) call write_attribute_xml(lun, "bounds", fname, 'float64', & shape(bounds), dbytes, hash_type, ddigest, cbytes, cdigest) @@ -2610,8 +2641,8 @@ module io write(fname,"(a,'_',a,'_',i6.6,'.bin')") "datablock", "ids", nproc dbytes = size(ids, kind=8) * kind(ids) - call write_binary_xml(dname, fname, c_loc(ids), & - dbytes, hash_type, ddigest, cbytes, cdigest) + call write_binary_xml(dname, fname, c_loc(ids), dbytes, & + kind(ids), hash_type, ddigest, cbytes, cdigest) call write_attribute_xml(lun, "ids", fname, 'int32', shape(ids), & dbytes, hash_type, ddigest, cbytes, cdigest) @@ -2630,8 +2661,8 @@ module io write(fname,"(a,'_',a,'_',i6.6,'.bin')") "datablock", & trim(pvars(p)), nproc - call write_binary_xml(dname, fname, c_loc(array), & - dbytes, hash_type, ddigest, cbytes, cdigest) + call write_binary_xml(dname, fname, c_loc(array), dbytes, & + kind(array), hash_type, ddigest, cbytes, cdigest) call write_attribute_xml(lun, pvars(p), fname, 'float64', & shape(array), dbytes, hash_type, ddigest, cbytes, cdigest) end do @@ -2860,6 +2891,12 @@ module io ' compressed_digest="' // trim(adjustl(str)) // '"' end if end if + select case(data_filter) + case(1) + string = trim(string) // ' data_filter="shuffle"' + case(2) + string = trim(string) // ' data_filter="bytedelta"' + end select end if end if string = trim(string) // '>' // trim(adjustl(fname)) // '' @@ -2882,6 +2919,7 @@ module io ! path, name - the path and name where the array should be written to; ! array_ptr - the pointer to the array to store; ! array_bytes - the size of the array in bytes; +! item_size - the size of element; ! digest_type - the type of digest to hash the data; ! array_digest - the digest of the original array; ! compressed_bytes - the size of the compressed array in bytes; @@ -2889,11 +2927,12 @@ module io ! !=============================================================================== ! - subroutine write_binary_xml(path, name, array_ptr, array_bytes, digest_type, & - array_digest, compressed_bytes, compressed_digest) + subroutine write_binary_xml(path, name, array_ptr, array_bytes, item_size, & + digest_type, array_digest, & + compressed_bytes, compressed_digest) use compression , only : get_compression, compression_bound, compress - use compression , only : compression_suffix + use compression , only : compression_suffix, encode use hash , only : digest use iso_c_binding, only : c_ptr, c_loc, c_f_pointer @@ -2901,6 +2940,7 @@ module io character(len=*), intent(in) :: path, name type(c_ptr) , intent(in) :: array_ptr + integer(kind=4) , intent(in) :: item_size integer(kind=8) , intent(in) :: array_bytes integer , intent(in) :: digest_type @@ -2915,8 +2955,8 @@ module io integer(kind=1), dimension(:), pointer :: array - integer(kind=1), dimension(:), allocatable, target :: buffer - type(c_ptr) :: buffer_ptr + integer(kind=1), dimension(:), allocatable, target :: input, buffer + type(c_ptr) :: input_ptr, buffer_ptr !------------------------------------------------------------------------------- ! @@ -2929,10 +2969,12 @@ module io ! if (present(compressed_bytes) .and. get_compression() > 0) then compressed_bytes = compression_bound(array_bytes) - allocate(buffer(compressed_bytes), stat = status) - buffer_ptr = c_loc(buffer) + allocate(input(array_bytes), buffer(compressed_bytes), stat = status) if (status == 0) then - call compress(array_ptr, array_bytes, buffer_ptr, compressed_bytes) + input_ptr = c_loc(input) + buffer_ptr = c_loc(buffer) + call encode(data_filter, item_size, array_bytes, array_ptr, input) + call compress(input_ptr, array_bytes, buffer_ptr, compressed_bytes) if (compressed_bytes > 0 .and. compressed_bytes < array_bytes) then write(fname,"(a,'/',a,a)") trim(path), trim(name), & trim(compression_suffix) @@ -2948,7 +2990,7 @@ module io compressed_bytes = 0 compressed_digest = 0 end if - deallocate(buffer) + deallocate(input, buffer) end if end if