Zdrojový kód pro modules.hydrIO

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#  Project: RadAgro
#  File: hydrIO.py
#
#  Author: Dr. Jakub Brom
#
#  Copyright (c) 2020-2021. Dr. Jakub Brom, University of South
#  Bohemia in České Budějovice, Faculty of Agriculture.
#
#  This program is free software: you can redistribute it and/or modify
#      it under the terms of the GNU General Public License as published by
#      the Free Software Foundation, either version 3 of the License, or
#      (at your option) any later version.
#
#      This program is distributed in the hope that it will be useful,
#      but WITHOUT ANY WARRANTY; without even the implied warranty of
#      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#      GNU General Public License for more details.
#
#      You should have received a copy of the GNU General Public License
#      along with this program.  If not, see <https://www.gnu.org/licenses/>
#
#  Last changes: 07.01.21 1:37
#
# Date: 2018/11/08
#
# Description:


# imports
import os
import tempfile
import shutil
import glob

import numpy as np

from osgeo import gdal, osr, ogr
from qgis.PyQt.QtCore import QVariant
from qgis.core import QgsVectorLayer, QgsField, QgsVectorFileWriter, \
	QgsProject


[dokumentace]def rasterToArray(layer): """Conversion of raster layer to numpy array. :param layer: Path to raster layer. :type layer: str :return: raster file converted to numpy array """ try: if layer is not None: ds = gdal.Open(layer) in_layer = gdal.Dataset.ReadAsArray(ds).astype( np.float32) ds = None else: in_layer = None return in_layer except FileNotFoundError: in_layer = None return in_layer
[dokumentace]def readGeo(rast): """Reading important geographical information from raster using GDAL. :param rast: Path to raster file in GDAL accepted format. :returns: The tuple of important geographic information about a \ raster. The tuple contains: \n * The affine transformation coefficients (tuple) * Projection information of the raster or dataset (str) * Size of pixel at X scale (float) * Size of pixel at Y scale (float) * EPSG Geodetic Parameter Set code (int) """ ds = gdal.Open(rast) gtransf = ds.GetGeoTransform() ds_s = ds.GetRasterBand(1) prj = ds.GetProjection() xSize = gtransf[1] ySize = gtransf[5] * (-1) srs = osr.SpatialReference(wkt=prj) if srs.IsProjected: EPSG = int(srs.GetAttrValue("authority", 1)) else: EPSG = None ds = None return (gtransf, prj, xSize, ySize, EPSG)
[dokumentace]def arrayToRast(arrays, names, prj, gtransf, EPSG, out_folder, out_file_name=None, driver_name="GTiff", multiband=False): """Export numpy 2D arrays to multiband or singleband raster files. Following common raster formats are accepted for export: \n * ENVI .hdr labeled raster format * Erdas Imagine (.img) raster format * Idrisi raster format (.rst) * TIFF / BigTIFF / GeoTIFF (.tif) raster format * PCI Geomatics Database File (.pix) raster format :param arrays: Numpy array or list of arrays for export to raster. :type arrays: numpy.ndarray or list of numpy.ndarray :param names: Name or list of names of the exported bands (in case of multiband raster) or particular rasters (in case of singleband rasters). :type names: str or list of str :param prj: Projection information of the exported raster (dataset). :type prj: str :param gtransf: The affine transformation coefficients. :type gtransf: tuple :param EPSG: EPSG Geodetic Parameter Set code. :type EPSG: int :param out_folder: Path to folder where the raster(s) will be created. :type out_folder: str :param driver_name: GDAL driver. 'GTiff' is default. :type driver_name: str :param out_file_name: Name of exported multiband raster. Default is None. :type out_file_name: str :param multiband: Option of multiband raster creation. Default is False. :type multiband: bool :returns: Raster singleband or multiband file(s) :rtype: raster """ # Convert arrays and names on list if type(arrays) is not list: arr_list = [] arr_list.append(arrays) arrays = arr_list if type(names) is not list: names_list = [] names_list.append(names) names = names_list if out_file_name is None: out_file_name = "" multiband = False # Drivers and suffixes driver_list = ["ENVI", "HFA", "RST", "GTiff", "PCIDSK"] # GDAL driver for output files out_suffixes = ["", ".img", ".rst", ".tif", ".pix"] # Suffixes of output names # Test driver if driver_name not in driver_list: raise ValueError("Unknown driver. Data could not be exported.") driver_index = driver_list.index(driver_name) suffix = out_suffixes[driver_index] if multiband == True and driver_name != "RST": out_file_name, ext = os.path.splitext(out_file_name) out_file = os.path.join(out_folder, out_file_name + suffix) try: driver = gdal.GetDriverByName(driver_name) ds = driver.Create(out_file, arrays[0].shape[1], arrays[0].shape[0], len(arrays), gdal.GDT_Float32) ds.SetProjection(prj) ds.SetGeoTransform(gtransf) if EPSG is not None: outRasterSRS = osr.SpatialReference() outRasterSRS.ImportFromEPSG(EPSG) ds.SetProjection(outRasterSRS.ExportToWkt()) j = 1 for i in arrays: ds.GetRasterBand(j).WriteArray(i) ds.GetRasterBand(j).SetDescription(names[j - 1]) ds.GetRasterBand(j).SetMetadataItem("Band name", names[j - 1]) ds.GetRasterBand(j).FlushCache() j = j + 1 ds = None except Exception: raise Exception("Raster file %s has not been created." % ( out_file_name + suffix)) else: for i in range(0, len(arrays)): try: out_file_name, ext = os.path.splitext(names[i]) out_file = os.path.join(out_folder, out_file_name + suffix) driver = gdal.GetDriverByName(driver_name) ds = driver.Create(out_file, arrays[i].shape[1], arrays[i].shape[0], 1, gdal.GDT_Float32) ds.SetProjection(prj) ds.SetGeoTransform(gtransf) if EPSG is not None: outRasterSRS = osr.SpatialReference() outRasterSRS.ImportFromEPSG(EPSG) ds.SetProjection(outRasterSRS.ExportToWkt()) ds.GetRasterBand(1).WriteArray(arrays[i]) ds.GetRasterBand(1).SetDescription(names[i]) ds.GetRasterBand(1).SetMetadataItem("Band name", names[i]) ds.GetRasterBand(1).FlushCache() ds = None except Exception: raise Exception("Raster file %s has not been created." % ( names[i] + suffix)) return
[dokumentace]def readLatLong(rast_path): """Automatic setting of the lyrs coordinates according to the projection of NIR band in to the form. :param rast_path: Raster path. :return: Longitude and latitude in decimal scale """ inputEPSG = None if rast_path == "" or rast_path is None: raise IOError("Path to raster has not been set.") else: try: ds = gdal.Open(rast_path) gtransf = ds.GetGeoTransform() leftCornerX = gtransf[0] # X position of left corner of the layer leftCornerY = gtransf[3] # Y position of left corner of the layer xSize = gtransf[1] # pixel X size ySize = gtransf[5] # pixel Y size cols = ds.RasterXSize # No. of columns rows = ds.RasterYSize # No. of rows pointX = leftCornerX + ( xSize * cols) / 2 # X position of the middle of the layer pointY = leftCornerY + ( ySize * rows) / 2 # Y position of the middle of the layer prj = ds.GetProjection() ds = None # Spatial Reference System srs = osr.SpatialReference(wkt=prj) if srs.IsProjected: inputEPSG = int(srs.GetAttrValue("authority", 1)) outputEPSG = int(4326) # WGS84 # create a geometry from coordinates point = ogr.Geometry(ogr.wkbPoint) point.AddPoint(pointX, pointY) # create coordinate transformation inSpatialRef = osr.SpatialReference() inSpatialRef.ImportFromEPSG(inputEPSG) outSpatialRef = osr.SpatialReference() outSpatialRef.ImportFromEPSG(outputEPSG) coordTransform = osr.CoordinateTransformation( inSpatialRef, outSpatialRef) # transform point point.Transform(coordTransform) # Coords in EPSG 4326 long_dec = point.GetX() lat_dec = point.GetY() return long_dec, lat_dec except IOError: raise IOError("Latitude and longitude of the raster has" "not been calculated. Input raster probably " "has no geographical reference.")
[dokumentace]def joinLyrWithDataFrame(in_layer_path, df_data, out_layer_path): """ Create GeoPackage layer from input vector data for crops and data of radioactive contamination stored in pandas dataframe. :param in_layer_path: Path to original input vector layer which is \ used as a template for a new layer. :type in_layer_path: str :param df_data: Joining Pandas dataframe corresponding with \ vector_layer. FID values must be equal. :type df_data: Pandas DataFrame :param out_layer_path: Path to output vector file. :type out_layer_path: str """ # Translate input layer to gpkg format and create new vector # instance # TODO......... # Copy input layer to tmp ## Create temp folder tmp_folder = tempfile.mkdtemp() ## Path to all files of input vector and copy to tmp_folder in_lyr_base = in_layer_path.split(".")[0] for filepath in glob.glob(in_lyr_base + ".*"): in_file_name = os.path.basename(filepath) new_file_path = os.path.join(tmp_folder, in_file_name) shutil.copy(filepath, new_file_path) in_lyr_name = os.path.basename(in_layer_path) new_in_layer_path = os.path.join(tmp_folder, in_lyr_name) # Import input layer in_layer = QgsVectorLayer(new_in_layer_path, "Input_layer", "ogr") in_layer.isValid() # Remove fid field from layer if occurs flist = [] try: for field in in_layer.fields(): flist.append(field.name()) ind = flist.index("fid") in_layer.dataProvider().deleteAttributes([ind]) in_layer.updateFields() except Exception: pass # # Convert input layer to GPKG save_options = QgsVectorFileWriter.SaveVectorOptions() transform_context = QgsProject.instance().transformContext() QgsVectorFileWriter.writeAsVectorFormatV2(in_layer, out_layer_path, transform_context, save_options) # # ogr2ogr.main(["", "-f", "GPKG", "-nln", "Radioactive contamination", # out_layer_path, in_layer_path]) # try: # os.system("python3 " + ogr2ogr_path + " -f GPKG -nln " # "'Radioactive contamination' " + # out_layer_path + " " + in_layer_path) # # except Exception: # # os.system("ogr2ogr -f GPKG -nln 'Radioactive contamination' " + # out_layer_path + " " + in_layer_path) vector_layer = QgsVectorLayer(out_layer_path, "New_layer", "ogr") # Remove all columns count = vector_layer.fields().count() ind_list = list(range(0, count)) vector_layer.dataProvider().deleteAttributes(ind_list) vector_layer.updateFields() # Add new fields from df no_cols = df_data.shape[1] field_names = list(df_data.columns) field_names[0] = "fid2" fields_list = [QgsField(field_names[1], QVariant.String), QgsField(field_names[2], QVariant.Double), QgsField(field_names[3], QVariant.LongLong), QgsField(field_names[4], QVariant.String)] for i in range(5,no_cols): fields_list.append(QgsField(field_names[i], QVariant.Double)) vector_layer.dataProvider().addAttributes(fields_list) vector_layer.updateFields() # Get FID data from vector layer fid = [] for feature in vector_layer.getFeatures(): fid.append(feature.id()) # Get data from Pandas df and add them to layer vector_layer.startEditing() for i in range(len(fid)): # Get data form df - index i is from 1 features_data = list(df_data.iloc[i, :])[1::] # Set data to vector layer for j in range(len(features_data)): vector_layer.changeAttributeValue( fid[i], j+1, str(features_data[j])) vector_layer.commitChanges() try: shutil.rmtree(tmp_folder) except Exception: pass return