Zdrojový kód pro modules.overlap_clip

#!/usr/bin/env python
# -*- coding: cp1250 -*-

#  Project: RadAgro
#  File: overlap_clip.py
#
#  Author: Dr. Jakub Brom
#
#  Copyright (c) 2020. Dr. Jakub Brom, University of South Bohemia in České
#  Budějovice, Faculty of Agriculture.
#
#  This program is free software: you can redistribute it and/or modify
#      it under the terms of the GNU General Public License as published by
#      the Free Software Foundation, either version 3 of the License, or
#      (at your option) any later version.
#
#      This program is distributed in the hope that it will be useful,
#      but WITHOUT ANY WARRANTY; without even the implied warranty of
#      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#      GNU General Public License for more details.
#
#      You should have received a copy of the GNU General Public License
#      along with this program.  If not, see <https://www.gnu.org/licenses/>
#
#  Last changes: 28.11.20 23:48
#
#  Begin: 2020/9/21
#
#  Description: Module for manipulation with rasters with different EPSG and
#  shape. The methods provide possibility to unify EPSGs of rasters and their
#  extent and resolution. It is possible to create an overlapping area for all
#  the rasters used.

import shutil
import tempfile
import os
import warnings
import copy
import numpy as np
from osgeo import gdal, osr


[dokumentace]def readGeo(rast): """ Reading geographical information from raster using GDAL. :param rast: Path to raster file in GDAL accepted format. :type rast: str :returns: List of geotransformation parameters and features of an \ input raster: \n * 0: The affine transformation coefficients; tuple * 1: Projection information of the raster (dataset); str * 2: Pixel width (m) on X axis; float * 3: Pixel height (m) on Y axis; float * 4: EPSG Geodetic Parameter Set code; str * 5: Coordinate system; str * 6: Number of columns in the raster; int * 7: Number of rows in the raster; int * 8: SRS - OSR spatial reference; object :rtype: list """ try: ds = gdal.Open(rast) gtransf = ds.GetGeoTransform() prj = ds.GetProjection() x_size = gtransf[1] y_size = gtransf[5] * (-1) cols = ds.RasterXSize rows = ds.RasterYSize srs = osr.SpatialReference(wkt=prj) if srs.IsProjected: EPSG = srs.GetAttrValue("authority", 1) geogcs = srs.GetAttrValue("geogcs", 0) loccs = srs.GetAttrValue("local_cs", 0) else: EPSG = None geogcs = None loccs = None del ds except IOError: raise IOError("Geographical information have not been readed.") return gtransf, prj, x_size, y_size, EPSG, geogcs, loccs, cols, \ rows, srs
[dokumentace]def fixSRS(rasters_in, tmp_out=True): """ Checking and fixing the SRSs of input rasters. Function looks for possible definition of SRS in group of rasters --> local SRS is possibly replaced by global SRS if it is available in the set of rasters. The empty SRS is replaced by most frequent SRS in the set of rasters. :param rasters_in: List of input rasters paths. :type rasters_in: list :param tmp_out: Selection if the output layers will be created as temporal scratch layers or the original raster layers will be replaced by newly defined rasters. :type tmp_out: bool :return: List of output layers paths :rtype: list """ # Manage output list rasters_out = copy.copy(rasters_in) # 1. Get rasters info EPSG_list = [] LLC_X_list = [] geoGCS_list = [] localCS_list = [] for i in rasters_in: ginfo = readGeo(i) cols = ginfo[7] EPSG_list.append(ginfo[4]) geoGCS_list.append(ginfo[5]) localCS_list.append(ginfo[6]) LLC_X_list.append(ginfo[0][0] + ginfo[2] * cols) # 2. Check EPSGs # Convert EPSG to int for i in range(len(EPSG_list)): if EPSG_list[i] is None: EPSG_list[i] = 0 EPSGs_int = np.array(EPSG_list).astype(int) # Check differences in the EPSGs if np.min(EPSGs_int) is not np.max(EPSGs_int): # Make temporary folder try: tmp_folder = tempfile.mkdtemp() except OSError as err: raise err # 3. Test of the layers with local CS - replace localCS with # geoCS localCS_index = [i for i, x in enumerate(localCS_list) if x is not None] for i in localCS_index: # Name of local CS which can cover geo CS localCS_name = localCS_list[i][0:5] try: # Search for the projection in geoCS which # corresponds with localCS of the tested image geoCS_notNone = [(i, x) for i, x in enumerate(geoGCS_list) if x is not None] geo_index = [i for i, x in geoCS_notNone if x[0:5] == localCS_name][0] # Fill EPSG for a layer EPSG_list[i] = EPSG_list[geo_index] # create temporary output file tmp_file = tempfile.NamedTemporaryFile( suffix=".tif", dir=tmp_folder, delete=False) tmp_file_name = tmp_file.name # convert SRS from local to global --> create # temporary file gdal.Warp(tmp_file_name, rasters_out[i], srcSRS=str( "EPSG:" + EPSG_list[i]), warpOptions=[ "NUM_THREADS=ALL_CPUS"], multithread=True) # replace path of file with local projection by the # temporary file with correct projection print("SRS for layer " + rasters_out[i] + " has been " "fixed.") rasters_out[i] = tmp_file_name geoGCS_list[i] = geoGCS_list[geo_index] localCS_list[i] = None except Warning: warnings.warn("The SRS for layer " + rasters_out[i] + " has not been found. We will try to " "find out another way how to fix SRS.", stacklevel=3) # 4. Testing layers with missing EPSG on basis of coordinates noEPSG_index = [i for i, x in enumerate(geoGCS_list) if x is None] # Searching the same coordinates and replacing SRS for i in noEPSG_index: # search for the same coordinates coord = LLC_X_list[i] try: geoCS_notNone = [i for i, x in enumerate(geoGCS_list) if x is not None] # Testing coordinates with layers with existing CS for j in geoCS_notNone: if LLC_X_list[j] == coord: EPSG_list[i] = EPSG_list[j] tmp_file = tempfile.NamedTemporaryFile( suffix=".tif", dir=tmp_folder, delete=False) tmp_file_name = tmp_file.name # convert SRS --> create temporary file gdal.Warp(tmp_file_name, rasters_out[i], srcSRS=str("EPSG:" + EPSG_list[i]), warpOptions=["NUM_THREADS=ALL_CPUS"], multithread=True) # replace path of file with local projection # by the temporary file with correct projection print("SRS for layer " + rasters_out[i] + " has been fixed.") rasters_out[i] = tmp_file_name geoGCS_list[i] = geoGCS_list[j] break except RuntimeError: raise RuntimeError("Ooops...! Something wrong " "happened.") # 5. Converting remaining files to prevailing EPSG noEPSG_index = [i for i, x in enumerate(geoGCS_list) if x is None] EPSG_maxfreq = max(set(EPSG_list), key=EPSG_list.count) for i in noEPSG_index: warnings.warn("Layer " + rasters_out[i] + " has still no " "SRS. The EPSG: " + EPSG_maxfreq + " will " "be used.", stacklevel=3) try: tmp_file = tempfile.NamedTemporaryFile( suffix=".tif", dir=tmp_folder, delete=False) tmp_file_name = tmp_file.name # convert SRS --> create temporary file gdal.Warp(tmp_file_name, rasters_out[i], srcSRS=str("EPSG:" + EPSG_maxfreq), warpOptions=["NUM_THREADS=ALL_CPUS"], multithread=True) # replace path of file with local projection by the # temporary file with correct projection print("SRS for layer " + rasters_out[i] + " has " "been fixed.") rasters_out[i] = tmp_file_name EPSG_list[i] = EPSG_maxfreq except RuntimeError: raise RuntimeError("Ooops...! Something wrong " "happened.") if tmp_out is False: warnings.warn("All the files with incorrect SRS have been " "replaced by files with fixed SRS. Some " "errors may occur!", stacklevel=3) for i in range(len(rasters_out)): if rasters_out[i] is not rasters_in[i]: shutil.copy(rasters_out[i], rasters_in[i]) rasters_out = rasters_in # remove temporary files shutil.rmtree(tmp_folder) else: print("No SRS has been changed.") return rasters_out
[dokumentace]def uniformSRS(rasters_in, epsg=None, tmp_out=True): """ The function unifies SRSs of a group of input raster layers. If output EPSG is not defined, the most frequent EPSG is defined as default. If SRSs are not included in the raster datasets either set EPSG or WGS 84 (EPSG:4326; i.e. if epsg=None) is used for transformation of SRS. :param rasters_in: List of input rasters paths. :type rasters_in: list :param epsg: EPSG definition for output rasters. If epsg=None the most frequent EPSG in the layers group will be set. :type epsg: int :param tmp_out: Selection if the output layers will be created as temporal scratch layers or the original raster layers will be replaced by newly defined rasters. :type tmp_out: bool :return: List of output layers paths :rtype: list """ # TODO: poresit vyjimky - melo by smysl, aby bylo namisto vyjimky # jen varovani, s tim, ze by z procedury vypadly puvodni # neupraveny data? # TODO: pokud GDAL neumi pracovat s nekterymi epsg (treba # S-JTSK), tak by mozna bylo dobry zkusit QGIS QgsGeometryEngine # (https://qgis.org/api/classQgsGeometryEngine.html) # TODO: poresit mazani dat z tmp - # GDAL exceptions gdal_ver = gdal.__version__ gdal.UseExceptions() # Manage output list rasters_orig = copy.copy(rasters_in) rasters_out = copy.copy(rasters_in) # 1. Get rasters info EPSG_list = [] geoGCS_list = [] for i in rasters_in: ginfo = readGeo(i) EPSG_list.append(ginfo[4]) geoGCS_list.append(ginfo[5]) # Make temporary folder try: tmp_folder = tempfile.mkdtemp() except OSError as err: raise err # 2. Check SRSs # Convert EPSG to int for i in range(len(EPSG_list)): if EPSG_list[i] is None: EPSG_list[i] = 0 EPSGs_int = np.array(EPSG_list).astype(int) # list of layers without SRS noEPSG_index = [i for i, x in enumerate(geoGCS_list) if x is None] # 3. Check differences in the EPSGs if np.min(EPSGs_int) is not np.max(EPSGs_int): # --> different SRS # 4. testing if there are no EPSGs in the raster list if len(noEPSG_index) > 0: # Fix SRS rasters_in = fixSRS(rasters_in, tmp_out=True) for i in rasters_in: ginfo = readGeo(i) EPSG_list[rasters_in.index(i)] = ginfo[4] geoGCS_list[rasters_in.index(i)] = ginfo[5] else: pass # 5. Testing if no projection is set up else: if len(noEPSG_index) > 0: warnings.warn("The projection is not defined in any layer " "or only local projection is defined in all" "cases. The continuing process may cause " "an error in projection settings!", stacklevel=3) else: pass # 6. Is the input EPSG set up? # EPSG is not set if epsg is None: # list of layers without SRS noEPSG_index = [i for i, x in enumerate(geoGCS_list) if x is None] # Empty SRS for all the rasters --> WGS 84: if len(noEPSG_index) == len(EPSG_list): warnings.warn("The WGS 84 projection (EPSG:4326) will be " "set for all layers.", stacklevel=3) try: for i in range(len(EPSG_list)): tmp_file = tempfile.NamedTemporaryFile( suffix=".tif", dir=tmp_folder, delete=False) tmp_file_name = tmp_file.name # convert SRS --> create temporary file gdal.Warp(tmp_file_name, rasters_in[i], srcSRS=str("EPSG:4326"), warpOptions=["NUM_THREADS=ALL_CPUS"], multithread=True) # Write new files to the output layers path list rasters_out[i] = tmp_file_name except RuntimeError: raise RuntimeError("The input SRSs of rasters are" "probably not supported by GDAL " "version {gdv}".format(gdv=gdal_ver)) # EPSG with the highest frequency else: EPSG_maxfreq = max(set(EPSG_list), key=EPSG_list.count) warnings.warn("The projection EPSG:" + str(EPSG_maxfreq) + " will be set for all layers.", stacklevel=3) for i in range(len(EPSG_list)): try: tmp_file = tempfile.NamedTemporaryFile( suffix=".tif", dir=tmp_folder, delete=False) tmp_file_name = tmp_file.name # convert SRS --> create temporary file gdal.Warp(tmp_file_name, rasters_in[i], dstSRS=str("EPSG:" + str(EPSG_maxfreq)), warpOptions=["NUM_THREADS=ALL_CPUS"], multithread=True) # Write new files to the output layers path list rasters_out[i] = tmp_file_name except RuntimeError: raise RuntimeError("The input SRSs of rasters are" "probably not supported by GDAL " "version {gdv}".format( gdv=gdal_ver)) # EPSG is set else: # All layers have the same EPSG corresponding to input EPSG EPSG_cor = [x for x in EPSG_list if x is epsg] if len(EPSG_cor) is len(EPSG_list): rasters_out = rasters_in # Different or various SRS else: for i in range(len(EPSG_list)): try: tmp_file = tempfile.NamedTemporaryFile( suffix=".tif", dir=tmp_folder, delete=False) tmp_file_name = tmp_file.name # convert SRS --> create temporary file gdal.Warp(tmp_file_name, rasters_in[i], srcSRS=str("EPSG:" + str(EPSG_list[i])), dstSRS=str("EPSG:" + str(epsg)), warpOptions=["NUM_THREADS=ALL_CPUS"], multithread=True) # Write new files to the output layers path list rasters_out[i] = tmp_file_name except RuntimeError: raise RuntimeError("The input SRSs of rasters are" "probably not supported by GDAL " "version {gdv}".format( gdv=gdal_ver)) # If temporary model is not used, replace original files by # results and remove temporary files if tmp_out is False: for i in range(len(rasters_out)): if rasters_out[i] is not rasters_orig[i]: shutil.copy(rasters_out[i], rasters_orig[i]) rasters_out = rasters_orig # remove temporary files shutil.rmtree(tmp_folder) return rasters_out
# noinspection PyUnboundLocalVariable
[dokumentace]def clipOverlappingArea(rasters_in, output_folder=None, suffix='_clipped', epsg=None, tmp_out=False): """ Function for clipping rasters by their overlapping area. :param rasters_in: List of input rasters paths. :type rasters_in: list :param output_folder: Path to output folder defined by user :type output_folder: str :param suffix: Suffix of the output rasters names. The name of \ new raster is constructed from original name of raster and suffix :type suffix: str :param epsg: EPSG definition for output rasters. If epsg=None \ the most frequent EPSG in the layers group will be set. :type epsg: int :param tmp_out: If True the output layers will be created as \ temporal scratch layers. :type tmp_out: bool :return: Paths of rasters clipped by overlapping area of all the \ rasters :rtype: list """ # 1. Unify EPSG of data rasters_unif = uniformSRS(rasters_in, epsg, tmp_out=True) # Make temporary folder for outputs if tmp_out is True: try: tmp_folder = tempfile.mkdtemp() except OSError as err: raise err # 2. Looking for coordinates of overlapping area # Lists of coordinates: EPSG_list = [] ULC_X_list = [] # upper left corner Xmin ULC_Y_list = [] # upper left corner Ymax LRC_X_list = [] # lower right corner Xmax LRC_Y_list = [] # lower right corner Ymin X_size_list = [] Y_size_list = [] # extract geoinformation for i in rasters_unif: ginfo = readGeo(i) EPSG_list.append(ginfo[4]) cols = ginfo[7] rows = ginfo[8] ULC_X_list.append(ginfo[0][0]) LRC_X_list.append(ginfo[0][0] + ginfo[2] * cols) ULC_Y_list.append(ginfo[0][3]) LRC_Y_list.append(ginfo[0][3] - ginfo[3] * rows) X_size_list.append(ginfo[2]) Y_size_list.append(ginfo[3]) # extract the coordinates and pixel size for overlapping area ULC_X_ovlp = np.max(ULC_X_list) LRC_X_ovlp = np.min(LRC_X_list) ULC_Y_ovlp = np.min(ULC_Y_list) LRC_Y_ovlp = np.max(LRC_Y_list) X_size_ovlp = np.min(X_size_list) Y_size_ovlp = np.min(Y_size_list) # 3. Rescale all the rasters to the same spatial extent (the same # pixel size) and clip all the layers by the overlapping area out_files_list = [] for i in range(0, len(rasters_unif)): # Define paths of the output files if tmp_out is False: rast_in_name = os.path.splitext(rasters_in[i])[0] rast_name = rast_in_name + suffix + ".tif" out_file = os.path.join(output_folder, rast_name) out_files_list.append(out_file) else: tmp_file = tempfile.NamedTemporaryFile( suffix=".tif", dir=tmp_folder, delete=False) out_file = tmp_file.name out_files_list.append(out_file) # Clip files and change resolution gdal.Warp(out_file, rasters_unif[i], outputBounds=(ULC_X_ovlp, LRC_Y_ovlp, LRC_X_ovlp, ULC_Y_ovlp), xRes=X_size_ovlp, yRes=Y_size_ovlp, warpOptions=[ "NUM_THREADS=ALL_CPUS"], multithread=True) # TODO: je potreba otestovat, jestli nějakej raster není mimo # rozsah ostatnich rastru # TODO: poresit nedokonale prekryvy, tzn, ze je prekryvna plocha # napr. prazdna nebo tak podobne # TODO: zkontrolovat, jestli jsou souradnice stejne pro vsechny # vrstvy. Pokud ano, tak uz dal neprovadet vypocet --> # out_files_list = rasters_in # TODO: pridat vyjimky return out_files_list