1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
| import json import math import os import shutil from osgeo import gdal, osr import gdal2tiles from concurrent.futures import ThreadPoolExecutor import threading import sys import uuid
def init_params():
    """Read ``arg.json`` and build the list of rasters to tile.

    The JSON object must contain ``type`` (``"file"`` or ``"folder"``),
    ``output_route`` and ``max_level`` (``"auto"`` or a non-negative int).
    ``path_list`` is only read in ``"file"`` mode and ``path_route`` only in
    ``"folder"`` mode, so the key for the unused mode may be omitted.

    Every candidate raster is passed through
    ``check_wgs84_coordinate_system``; rasters it rejects (empty string
    result) are recorded in ``error.txt`` under the module-level ``log_path``.

    :return: tuple ``(input_list, output_route, total_img, max_level)`` where
        ``input_list`` holds the usable raster paths and ``total_img`` is the
        number of candidates before filtering.
    """
    temp_input_list = []
    input_list = []

    try:
        with open('arg.json', 'r') as file:
            json_data = json.load(file)

        o_type = json_data["type"]
        output_route = json_data["output_route"]
        max_level = json_data["max_level"]

        if o_type == "file":
            path_list = json_data.get("path_list") or []
            if not path_list:
                exit_app("path数据为空,切片结束")
            temp_input_list = path_list
        elif o_type == "folder":
            path_route = json_data.get("path_route") or ""
            if not os.path.exists(path_route):
                exit_app("路径不存在,请检查路径")
            for root, _dirs, files in os.walk(path_route):
                for name in files:
                    # Compare the extension case-insensitively so mixed-case
                    # names such as "a.Tif" are picked up as well.
                    if os.path.splitext(name)[1].lower() in (".tif", ".tiff"):
                        temp_input_list.append(os.path.join(root, name))
        else:
            exit_app("type参数只支持:folder和file")

        # bool is a subclass of int; True/False are not meaningful zoom levels.
        level_is_int = isinstance(max_level, int) and not isinstance(max_level, bool)
        if max_level != "auto" and not (level_is_int and max_level >= 0):
            exit_app("max_level参数只支持:auto和int类型")

        for f in temp_input_list:
            pt = check_wgs84_coordinate_system(f)
            if pt != "":
                input_list.append(pt)
            else:
                with open(os.path.join(log_path, 'error.txt'), 'a') as log_file:
                    log_file.write(f"[切片数据]:{f} [error] 数据无坐标切片失败! \n")

        total_img = len(temp_input_list)
        return input_list, output_route, total_img, max_level
    except Exception as e:
        # exit_app raises SystemExit, which is not an Exception subclass, so
        # the deliberate exits above pass through this handler untouched.
        exit_app(f"参数初始化错误!强制退出!Error:{e}")
def check_wgs84_coordinate_system(image_path):
    """Return a path to a WGS84 (EPSG:4326) version of the raster.

    :param image_path: path of the raster to inspect
    :return: ``image_path`` itself when the raster already reports authority
        code 4326 and a ``GEOGCS`` of "WGS 84"; the path of a reprojected copy
        under ``temp/tif`` otherwise; an empty string when the raster cannot
        be opened, has no projection / geotransform, or reprojection fails.
    """
    temp_path = os.path.join("temp", "tif")
    os.makedirs(temp_path, exist_ok=True)

    dataset = gdal.Open(image_path, gdal.GA_ReadOnly)
    if dataset is None:
        print("影像打开失败!")
        return ""

    projection = dataset.GetProjection()
    geo_transform = dataset.GetGeoTransform()
    # A missing or empty projection, or a missing geotransform, means the
    # raster cannot be georeferenced.
    if not projection or geo_transform is None:
        return ""

    srs = osr.SpatialReference()
    srs.ImportFromWkt(projection)

    if srs.GetAuthorityCode(None) == '4326' and srs.GetAttrValue("GEOGCS") == "WGS 84":
        return image_path

    image_name = os.path.basename(image_path)
    print(f"影像 {image_name} 不是地理坐标wgs84(EPSG:4326), 正在转换...")
    new_file = os.path.join(temp_path, image_name)
    warped = gdal.Warp(new_file, dataset, dstSRS="EPSG:4326")
    if warped is None:
        # Without GDAL exceptions enabled a failed warp is signalled by None;
        # report it instead of handing back a path that was never written.
        print(f"影像 {image_name} 转换失败!")
        return ""
    # Drop the reference so GDAL flushes and closes the output file.
    warped = None
    print(f"影像 {image_name} 转换成功!")
    return new_file
def create_result_dir():
    """Create a fresh, uniquely named log directory under ``log/``.

    The directory name is a random UUID4. If the generated name already
    exists a new one is drawn, up to a fixed number of attempts.

    :return: the relative path of the created directory, or ``None`` when the
        directory could not be created because of an ``OSError`` (the error is
        printed).
    """
    max_attempts = 10
    try:
        for _ in range(max_attempts):
            log_path_folder = os.path.join('log', str(uuid.uuid4()))
            try:
                os.makedirs(log_path_folder)
            except FileExistsError:
                # Name collision: draw another UUID and try again.
                print("路径重复!")
                continue
            return log_path_folder
    except OSError as e:
        print(f"路径创建出错: {e}")
        return None
    # Every attempt collided; give up rather than loop forever.
    exit_app("程序循环异常!强制退出!")
def delete_folder_contents(path):
    """Remove everything inside ``path`` while keeping ``path`` itself.

    A path that does not exist is treated as already empty, so callers can
    use this for best-effort cleanup before the folder has been created.

    :param path: folder whose contents should be removed
    """
    if not os.path.exists(path):
        return
    for item in os.listdir(path):
        item_path = os.path.join(path, item)
        # Unlink symlinks instead of following them: shutil.rmtree refuses to
        # operate on a symlink, and a broken link is neither file nor dir.
        if os.path.islink(item_path) or os.path.isfile(item_path):
            os.remove(item_path)
        elif os.path.isdir(item_path):
            shutil.rmtree(item_path)
def check_and_create_path(target_path, source_path):
    """Prepare the output folder for one source raster.

    The folder is ``target_path`` joined with the source file name minus its
    extension. An existing folder is emptied; a missing one is created, and
    a creation error is printed rather than raised.

    :param target_path: base directory for all tile output
    :param source_path: path of the source raster
    :return: path of the per-raster output folder
    """
    stem = os.path.splitext(os.path.basename(source_path))[0]
    tile_dir = os.path.join(target_path, stem)

    if os.path.exists(tile_dir):
        # Reuse the folder, but drop whatever an earlier run left in it.
        delete_folder_contents(tile_dir)
        return tile_dir

    try:
        os.makedirs(tile_dir)
    except OSError as err:
        print(f"Error: {err}")
    return tile_dir
def computed_max_level(ds, tile_size=256):
    """Pick the maximum zoom level for a raster (rough estimate).

    When the module-level ``max_zoom`` is an integer it is returned as is.
    Otherwise an upper bound is derived from the raster size and pixel size,
    and levels are scanned upward from 0 until the raster fits into a single
    tile along at least one axis; that level, but never less than 7, is
    returned. If no level in the scanned range qualifies, the upper bound
    itself is returned.

    :param ds: dataset opened with gdal
    :param tile_size: tile edge length in pixels
    :return: maximum zoom level
    """
    if isinstance(max_zoom, int) and max_zoom != "auto":
        return max_zoom

    cols = ds.RasterXSize
    rows = ds.RasterYSize
    gt = ds.GetGeoTransform()
    # Combined x/y pixel size taken from the geotransform.
    diag_resolution = math.sqrt(gt[1] * gt[1] + gt[5] * gt[5])
    upper_bound = math.ceil(math.log2(max(cols, rows) / diag_resolution))

    for level in range(upper_bound + 1):
        span = tile_size * 2 ** level
        # Ceiling division: number of tiles needed along each axis.
        tiles_across = (cols + span - 1) // span
        tiles_down = (rows + span - 1) // span
        if tiles_across == 1 or tiles_down == 1:
            return max(level, 7)

    return upper_bound
def get_bbox(ds):
    """Compute the raster's extent from its geotransform and pixel size.

    Only the origin and the per-axis pixel sizes of the geotransform are
    used; the rotation terms are not taken into account.

    :param ds: dataset opened with gdal
    :return: tuple ``(minx, miny, maxx, maxy)``
    """
    gt = ds.GetGeoTransform()
    left, top = gt[0], gt[3]
    right = left + gt[1] * ds.RasterXSize
    bottom = top + gt[5] * ds.RasterYSize
    return left, bottom, right, top
def clear_files(path):
    """Delete by-products of tiling, keeping only the tiles and the index.

    Walks ``path`` recursively and removes every file except those with a
    ``.png`` extension and those whose base name is ``tilemapresource``.

    :param path: folder the tiles were written to
    """
    print("正在整理切片生成的文件,请等待...")
    for folder, _subdirs, names in os.walk(path):
        for name in names:
            stem, ext = os.path.splitext(name)
            if ext == ".png" or stem == "tilemapresource":
                continue
            os.remove(os.path.join(folder, name))
    print("数据整理完毕!")
def start_cut_img(img_path):
    """Tile a single raster and record the result in the success log.

    Prepares the output folder under the module-level ``save_base_path``,
    determines the zoom range and extent, runs ``gdal2tiles.generate_tiles``,
    removes by-product files and appends a line to ``success.txt`` under the
    module-level ``log_path``.

    :param img_path: path of one tif raster
    """
    out_dir = check_and_create_path(save_base_path, img_path)
    dataset = gdal.Open(img_path)
    top_zoom = computed_max_level(dataset)
    extent = get_bbox(dataset)

    tile_options = dict(
        zoom=(0, top_zoom),
        profile="geodetic",
        s_srs="EPSG:4326",
        tile_size=256,
        tmscompatible=True,
        resampling="near",
        np_processes=3,
        kml=False,
        srcnodata=None,
        config=['SRC_METHOD=NO_GEOTRANSFORM'],
        bbox=extent,
    )

    banner_lines = (
        '',
        '-------------------// 任务开始 //---------------------',
        f'当前处理:{img_path}',
        f'切片层级:0-{top_zoom}',
        f'保存路径:{out_dir}',
    )
    for line in banner_lines:
        print(line)

    gdal2tiles.generate_tiles(img_path, out_dir, **tile_options)
    clear_files(out_dir)

    success_log = os.path.join(log_path, 'success.txt')
    with open(success_log, 'a') as file:
        file.write(f"[切片数据]:{img_path} [保存路径]:{out_dir} \n")
    print('-------------------// 任务结束 //---------------------')
def generate_tiles_th(input_image):
    """Worker entry point: tile one raster while holding the global lock.

    The module-level ``lock`` is held for the whole call, so only one raster
    is tiled at a time even when several workers are running.

    :param input_image: path of the raster to tile
    """
    lock.acquire()
    try:
        start_cut_img(input_image)
    finally:
        lock.release()
def exit_app(msg="", code=1):
    """Print a message, clean the ``temp`` folder and terminate the program.

    Cleanup is best-effort: a missing ``temp`` folder is skipped and a
    failure while deleting is reported, so the process always ends with the
    requested exit code rather than an unrelated traceback.

    :param msg: message shown before exiting
    :param code: process exit code; 0 means normal exit, anything else an error
    :raises SystemExit: always, carrying ``code``
    """
    print()
    print(msg)
    print("按下任意键然后回车退出:")
    print("正在清理临时目录文件夹...")
    try:
        # The temp folder only exists once a raster has been inspected.
        if os.path.isdir("temp"):
            delete_folder_contents("temp")
        print("清理完毕!")
    except OSError as e:
        print(f"临时目录清理失败: {e}")
    sys.exit(code)
if __name__ == '__main__':
    # Module-level state read by the worker functions above.
    lock = threading.Lock()
    max_workers = 3
    log_path = create_result_dir()
    input_path_list, save_base_path, total, max_zoom = init_params()

    # Collect each worker's outcome explicitly: an exception raised inside a
    # pool thread is otherwise never surfaced, and the summary would count a
    # failed raster as successfully tiled.
    success_count = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        submitted = [
            (image_path, executor.submit(generate_tiles_th, image_path))
            for image_path in input_path_list
        ]
        for image_path, future in submitted:
            try:
                future.result()
            except Exception as e:
                print(f"切片失败:{image_path} Error:{e}")
                with open(os.path.join(log_path, 'error.txt'), 'a') as file:
                    file.write(f"[切片数据]:{image_path} [error] {e} \n")
            else:
                success_count += 1

    print()
    print(f"所有任务执行完毕,共处理 {total} 幅影像 ,成功切片 {success_count} 幅影像。详情日志:{os.path.join(os.getcwd(), log_path)}")
    exit_app("", 0)
|