| import json import math import os import shutil from osgeo import gdal, osr import gdal2tiles from concurrent.futures import ThreadPoolExecutor import threading import sys import uuid
def init_params(): """ 初始化参数 """ temp_input_list = [] input_list = []
try: with open('arg.json', 'r') as file: json_data = json.load(file)
o_type = json_data["type"] path_list = json_data["path_list"] path_route = json_data["path_route"] output_route = json_data["output_route"] max_level = json_data["max_level"]
if o_type == "file": if len(path_list) == 0: exit_app("path数据为空,切片结束") temp_input_list = path_list elif o_type == "folder": if os.path.exists(path_route) is False: exit_app("路径不存在,请检查路径") for root, dirs, files in os.walk(path_route): for file in files: file_extension = os.path.splitext(os.path.basename(file))[1] if file_extension in [".tiff", ".tif", ".TIF", ".TIFF"]: temp_input_list.append(os.path.join(root, file)) else: exit_app("type参数只支持:folder和file")
if (max_level != "auto" and not isinstance(max_level, int)) or (isinstance(max_level, int) and max_level < 0): exit_app(f"max_level参数支支持:auto和int类型")
for f in temp_input_list: pt = check_wgs84_coordinate_system(f) if pt != "": input_list.append(pt) else: with open(os.path.join(log_path, 'error.txt'), 'a') as file: file.write(f"[切片数据]:{f} [error] 数据无坐标切片失败! \n")
total_img = len(temp_input_list) return input_list, output_route, total_img, max_level except Exception as e: exit_app(f"参数初始化错误!强制退出!Error:{e}")
def check_wgs84_coordinate_system(image_path): temp_path = os.path.join("temp", "tif") if not os.path.exists(temp_path): os.makedirs(temp_path)
dataset = gdal.Open(image_path, gdal.GA_ReadOnly) if dataset is None: print("影像打开失败!") return ""
projection = dataset.GetProjection() geo_transform = dataset.GetGeoTransform() if projection is not None and geo_transform is not None: if projection == '': return '' else: return ''
srs = osr.SpatialReference() srs.ImportFromWkt(projection)
if srs.GetAuthorityCode(None) != '4326' or srs.GetAttrValue("GEOGCS") != "WGS 84": print(f"影像 {os.path.basename(image_path)} 不是地理坐标wgs84(EPSG:4326), 正在转换...") new_file = os.path.join(temp_path, os.path.basename(image_path)) gdal.Warp(new_file, dataset, dstSRS="EPSG:4326") print(f"影像 {os.path.basename(image_path)} 转换成功!") return new_file else: return image_path
def create_result_dir(): """ 创建日志目录文件 :return: """ try: random_uuid = str(uuid.uuid4()) log_path_folder = os.path.join('log', random_uuid) if not os.path.exists(log_path_folder): os.makedirs(log_path_folder) return log_path_folder else: print("路径重复!") create_result_dir() except OSError as e: print(f"路径创建出错: {e}") except RecursionError: print(f"") exit_app("程序循环异常!强制退出!")
def delete_folder_contents(path): """ 清空目录 :param path: 路径 """ for item in os.listdir(path): item_path = os.path.join(path, item) if os.path.isfile(item_path): os.remove(item_path) elif os.path.isdir(item_path): shutil.rmtree(item_path)
def check_and_create_path(target_path, source_path): """ 保存路径检查,清空目录和创建目录(根据源文件名创建切片文件夹) :param target_path: 保存的基础路径 :param source_path: 源文件路径 """ filename = os.path.splitext(os.path.basename(source_path))[0] path = os.path.join(target_path, filename) if not os.path.exists(path): try: os.makedirs(path) except OSError as e: print(f"Error: {e}") else: delete_folder_contents(path) return path
def computed_max_level(ds, tile_size=256): """ 计算最大切片范围(简单推算) :param tile_size: :param ds: gdal打开的数据 :return: 级别 """
if max_zoom != "auto" and isinstance(max_zoom, int): return max_zoom width = ds.RasterXSize height = ds.RasterYSize geo_transform = ds.GetGeoTransform() pixel_size_x = geo_transform[1] pixel_size_y = geo_transform[5] resolution = math.sqrt(pixel_size_x * pixel_size_x + pixel_size_y * pixel_size_y) maxLevel = math.ceil(math.log2(max(width, height) / resolution)) for zoom_level in range(0, maxLevel + 1): tiles_x = (width + tile_size * 2 ** zoom_level - 1) // (tile_size * 2 ** zoom_level) tiles_y = (height + tile_size * 2 ** zoom_level - 1) // (tile_size * 2 ** zoom_level) if tiles_x == 1 or tiles_y == 1: maxLevel = zoom_level if maxLevel < 7: maxLevel = 7 break
return maxLevel
def get_bbox(ds): """ 获取影像的地理范围 :param ds: gdal打开的数据 :return: 地理范围 tuple元组 (minx, miny, maxx, maxy) """ geo_transform = ds.GetGeoTransform() minx = geo_transform[0] maxy = geo_transform[3] maxx = minx + geo_transform[1] * ds.RasterXSize miny = maxy + geo_transform[5] * ds.RasterYSize
return minx, miny, maxx, maxy
def clear_files(path): """ 清理切片生成的无关文件 :param path: 切片保存路径 :return: """ print("正在整理切片生成的文件,请等待...") for root, dirs, files in os.walk(path): for file in files: filename, file_extension = os.path.splitext(file) if file_extension != ".png" and filename != "tilemapresource": file_path = os.path.join(root, file) os.remove(file_path) print("数据整理完毕!")
def start_cut_img(img_path): """ 切片启动函数 :param img_path: 单张tif路径 :return: """ save_path = check_and_create_path(save_base_path, img_path) img = gdal.Open(img_path) maxZoom = computed_max_level(img) bbox = get_bbox(img)
options = { "zoom": (0, maxZoom), "profile": "geodetic", "s_srs": "EPSG:4326", "tile_size": 256, "tmscompatible": True, "resampling": "near", 'np_processes': 3, 'kml': False, 'srcnodata': None, 'config': ['SRC_METHOD=NO_GEOTRANSFORM'], "bbox": bbox, }
print() print('-------------------// 任务开始 //---------------------') print(f'当前处理:{img_path}') print(f'切片层级:0-{maxZoom}') print(f'保存路径:{save_path}')
gdal2tiles.generate_tiles(img_path, save_path, **options) clear_files(save_path) with open(os.path.join(log_path, 'success.txt'), 'a') as file: file.write(f"[切片数据]:{img_path} [保存路径]:{save_path} \n") print('-------------------// 任务结束 //---------------------')
def generate_tiles_th(input_image): """ 线程函数管理 :param input_image: :return: """ with lock: start_cut_img(input_image)
def exit_app(msg="", code=1): """ 退出程序 :param code: 0 代表正常退出,其它值代码异常退出 :param msg: 退出前的提示 :return:q """ print() print(msg) print("按下任意键然后回车退出:") print("正在清理临时目录文件夹...") delete_folder_contents("temp") print("清理完毕!") sys.exit(code)
if __name__ == '__main__': lock = threading.Lock() max_workers = 3 log_path = create_result_dir() input_path_list, save_base_path, total, max_zoom = init_params() with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map(generate_tiles_th, input_path_list)
print() print(f"所有任务执行完毕,共处理 {total} 幅影像 ,成功切片 {len(input_path_list)} 幅影像。详情日志:{os.path.join(os.getcwd(), log_path)}") exit_app("", 0)