前置

本人软件工程专业Web前端方向,非地理信息专业,未系统学过相关专业知识,开发此功能纯属工作遇到,现学现卖。文章中若用词不当或理解错误欢迎指正~~~

实现一个切片功能,进行批量切片后能通过cesium进行加载。

实现情况

文章基于python中的gdal库进行开发,对目标数据进行切片处理,处理过程中将采用经纬度模式进行切片,所以对于非wgs84地理坐标系下的影像将自动进行转换后切片。gdal官方api查看

完成:

  1. 可以指定文件或文件夹进行切片
  2. 可以指定文件夹保存切片数据
  3. 可批量进行切片操作
  4. wgs84地理坐标(EPSG:4326)的自动转换
  5. 切片日志简单输出(目前错误日志只捕获了无投影的错误)

存在问题:

  1. 目前无法对影像本身坐标投影缺失的数据进行切片(例如:高分原始数据的tif本身不带空间参考系,需要进行正射校正处理)。

  2. 对于非彩色的影像数据可能存在切出来失真严重。

详细实现

最终呈现

代码最终将进行打包成exe类型的可执行文件

image-20240531151408355

运行效果

image-20240531151538924

image-20240531151549850

切片保存情况

image-20240531151729431

image-20240531151804504

日志输出

image-20240531160138036

image-20240531160159836

代码模块

关键点记录

  • 数据输入

对于影像输入,采取两种形式

  1. 指定多个具体的影像。
  2. 给定一个路径,自动切片路径下所有GEOTIFF数据。

所以,为了打包后方便传参配置,将参数设定在一个json文件中,后面直接修改json文件中的参数。

如下json格式:

1
2
3
4
5
6
7
8
9
10
11
12
{
"type": "folder",
"max_level": 9,
"path_list":[
"D:\\Code\\work\\cut\\data_demo\\MULTI_FUSION_20240302_SDP_SDL_SSS_binary.tif",
"D:\\Code\\work\\cut\\data_demo\\MULTI_FUSION_20240303_SDP_SDL_SSS_binary.tif",
"D:\\Code\\work\\cut\\data_demo\\MULTI_FUSION_20240515_SDP_SDL_SST_binary.tif",
"D:\\Code\\work\\cut\\data_demo\\MULTI_FUSION_20240515_SDP_SDL_SST_binary - 2.tif"
],
"path_route":"D:\\Code\\work\\cut\\dist\\demo",
"output_route":"D:\\Code\\work\\cut\\data_demo_cut"
}

参数解释:

type:输入的参数形式,只接受两个值,一个folder和一个file。当输入file时,path_list将作为程序的输入数据源。当输入folder时,path_route路径下的所有GEOTIFF影像将作为输入源。

max_level:最大切片层级,填写auto可自动计算(仅作了简单计算,尽量自定义,避免计算值不准确)

path_list:列表的形式输入需要切片数据的路径

path_route:指定一个文件夹路径即可,程序会自动读取路径下的所有GEOTIFF影像

output_route:切片完成后保存的根路径,切片所生成的数据都将放在里面。切片输出时,默认每幅影像的文件名将作为保存该幅影像切片数据的文件夹名字

代码读取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def init_params():
"""
初始化参数
"""
temp_input_list = []
input_list = []

try:
with open('arg.json', 'r') as file:
json_data = json.load(file)

# 参数提取
o_type = json_data["type"]
path_list = json_data["path_list"]
path_route = json_data["path_route"]
output_route = json_data["output_route"]
max_level = json_data["max_level"]

# 类型校验
if o_type == "file": # 手动选择tif影像
if len(path_list) == 0:
exit_app("path数据为空,切片结束")
temp_input_list = path_list
elif o_type == "folder": # 手动选择路径,所有待切数据(tif)都在指定文件夹下一级(tif外无单独包裹的文件夹)
if os.path.exists(path_route) is False:
exit_app("路径不存在,请检查路径")
# 递归列出文件夹下的所有文件
for root, dirs, files in os.walk(path_route):
for file in files:
file_extension = os.path.splitext(os.path.basename(file))[1]
if file_extension in [".tiff", ".tif", ".TIF", ".TIFF"]:
temp_input_list.append(os.path.join(root, file))
else:
exit_app("type参数只支持:folder和file")

# max_level验证
if (max_level != "auto" and not isinstance(max_level, int)) or (isinstance(max_level, int) and max_level < 0):
exit_app(f"max_level参数支支持:auto和int类型")

# 数据检查是否地理坐标系4326
for f in temp_input_list:
pt = check_wgs84_coordinate_system(f)
if pt != "":
input_list.append(pt)
else:
with open(os.path.join(log_path, 'error.txt'), 'a') as file:
file.write(f"[切片数据]:{f} [error] 数据无坐标切片失败! \n")

# 输入影像
total_img = len(temp_input_list)
return input_list, output_route, total_img, max_level
except Exception as e:
exit_app(f"参数初始化错误!强制退出!Error:{e}")
  • 数据转换

对于有些墨卡托投影数据,gdal也有对应的切片方式,文本只研究了经纬度的,所以遇到非地理坐标系wgs84的需要进行投影转换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def check_wgs84_coordinate_system(image_path):
# 创建零时文件存放转换数据
temp_path = os.path.join("temp", "tif")
if not os.path.exists(temp_path):
os.makedirs(temp_path)

dataset = gdal.Open(image_path, gdal.GA_ReadOnly)
if dataset is None:
print("影像打开失败!")
return ""

# 影像是否含有坐标系
projection = dataset.GetProjection()
geotransform = dataset.GetGeoTransform()
if projection is not None and geotransform is not None:
if projection == '':
return ''
else:
return ''

srs = osr.SpatialReference()
srs.ImportFromWkt(projection)

# 检查坐标系是否为WGS 84 (EPSG:4326)
if srs.GetAuthorityCode(None) != '4326' or srs.GetAttrValue("GEOGCS") != "WGS 84":
print(f"影像 {os.path.basename(image_path)} 不是地理坐标wgs84(EPSG:4326), 正在转换...")
new_file = os.path.join(temp_path,os.path.basename(image_path))
gdal.Warp(new_file, dataset, dstSRS="EPSG:4326")
print(f"影像 {os.path.basename(image_path)} 转换成功!")
return new_file
else:
return image_path

开发过程中遇到问题:对于高分的一些数据单独拿一个tif文件来说会出现没有坐标系信息,此类数据可能需要进行正射校正处理后方能切片。文章中直接规避了没有找到坐标系的数据。

  • 切片层级计算

切片层级按理来说需要自己指定,此处我仅作简单计算,适当推算符合我当前项目的一个层级。若不需要后面参数自行修改就行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def computed_max_level(ds, tile_size=256):
"""
计算最大切片范围
:param tile_size:
:param ds: gdal打开的数据
:return: 级别
"""
# 打开影像文件获取影像信息
width = ds.RasterXSize
height = ds.RasterYSize

# 计算影像的对角线分辨率
geotransform = ds.GetGeoTransform()

pixel_size_x = geotransform[1]
pixel_size_y = geotransform[5]
resolution = math.sqrt(pixel_size_x * pixel_size_x + pixel_size_y * pixel_size_y)

maxLevel = math.ceil(math.log2(max(width, height) / resolution))

# 计算每个瓦片的大小
for zoom_level in range(0, maxLevel + 1):
tiles_x = (width + tile_size * 2 ** zoom_level - 1) // (tile_size * 2 ** zoom_level)
tiles_y = (height + tile_size * 2 ** zoom_level - 1) // (tile_size * 2 ** zoom_level)
# print(f"Zoom Level {zoom_level}: Each tile size is {tiles_x}x{tiles_y}")
if tiles_x == 1 or tiles_y == 1:
maxLevel = zoom_level
if maxLevel < 7:
maxLevel = 7
break

# 计算最大缩放级别
return maxLevel

  • 切片参数设置

下面maxZoom即是我计算的最大层级,也可手动指定。参数中带*d的参数有待考证适用性,参数来源于网络。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
options = {
"zoom": (0, maxZoom), # 设置切片级别范围
"profile": "geodetic", # 使用地理坐标系
"s_srs": "EPSG:4326", # 设置输入数据的投影坐标系为经纬度坐标系
"tile_size": 256, # 设置瓦片大小为 256
"tmscompatible": True,
"resampling": "near", # 设置重采样方法
'np_processes': 2,
'kml': False, # ***
'srcnodata': None, # ***
'config': ['SRC_METHOD=NO_GEOTRANSFORM'], # ***
"bbox": bbox, # ***指定影像的地理范围
}

# 开始切片
gdal2tiles.generate_tiles(img_path, save_path, **options)

若不做异常和交互性处理,上面代码修改切片参数后可单独直接执行,若没有gdal2tiles块,可以手动安装

pip install gdal2tiles

完整代码

运行过程中需要注意在同级目录下创建一个arg.json文件,且文件中参数需要配置好。参数说明见上面参数输入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
import json
import math
import os
import shutil
from osgeo import gdal, osr
import gdal2tiles
from concurrent.futures import ThreadPoolExecutor
import threading
import sys
import uuid


def init_params():
"""
初始化参数
"""
temp_input_list = []
input_list = []

try:
with open('arg.json', 'r') as file:
json_data = json.load(file)

# 参数提取
o_type = json_data["type"]
path_list = json_data["path_list"]
path_route = json_data["path_route"]
output_route = json_data["output_route"]
max_level = json_data["max_level"]

# 类型校验
if o_type == "file": # 手动选择tif影像
if len(path_list) == 0:
exit_app("path数据为空,切片结束")
temp_input_list = path_list
elif o_type == "folder": # 手动选择路径,所有待切数据(tif)都在指定文件夹下一级(tif外无单独包裹的文件夹)
if os.path.exists(path_route) is False:
exit_app("路径不存在,请检查路径")
# 递归列出文件夹下的所有文件
for root, dirs, files in os.walk(path_route):
for file in files:
file_extension = os.path.splitext(os.path.basename(file))[1]
if file_extension in [".tiff", ".tif", ".TIF", ".TIFF"]:
temp_input_list.append(os.path.join(root, file))
else:
exit_app("type参数只支持:folder和file")

# max_level验证
if (max_level != "auto" and not isinstance(max_level, int)) or (isinstance(max_level, int) and max_level < 0):
exit_app(f"max_level参数支支持:auto和int类型")

# 数据检查是否地理坐标系4326
for f in temp_input_list:
pt = check_wgs84_coordinate_system(f)
if pt != "":
input_list.append(pt)
else:
with open(os.path.join(log_path, 'error.txt'), 'a') as file:
file.write(f"[切片数据]:{f} [error] 数据无坐标切片失败! \n")

# 输入影像
total_img = len(temp_input_list)
return input_list, output_route, total_img, max_level
except Exception as e:
exit_app(f"参数初始化错误!强制退出!Error:{e}")


def check_wgs84_coordinate_system(image_path):
# 创建零时文件存放转换数据
temp_path = os.path.join("temp", "tif")
if not os.path.exists(temp_path):
os.makedirs(temp_path)

dataset = gdal.Open(image_path, gdal.GA_ReadOnly)
if dataset is None:
print("影像打开失败!")
return ""

# 影像是否含有坐标系
projection = dataset.GetProjection()
geo_transform = dataset.GetGeoTransform()
if projection is not None and geo_transform is not None:
if projection == '':
return ''
else:
return ''

srs = osr.SpatialReference()
srs.ImportFromWkt(projection)

# 检查坐标系是否为WGS 84 (EPSG:4326)
if srs.GetAuthorityCode(None) != '4326' or srs.GetAttrValue("GEOGCS") != "WGS 84":
print(f"影像 {os.path.basename(image_path)} 不是地理坐标wgs84(EPSG:4326), 正在转换...")
new_file = os.path.join(temp_path, os.path.basename(image_path))
gdal.Warp(new_file, dataset, dstSRS="EPSG:4326")
print(f"影像 {os.path.basename(image_path)} 转换成功!")
return new_file
else:
return image_path


def create_result_dir():
"""
创建日志目录文件
:return:
"""
try:
random_uuid = str(uuid.uuid4())
log_path_folder = os.path.join('log', random_uuid)
# 检查路径是否存在
if not os.path.exists(log_path_folder):
# 如果路径不存在,则创建
os.makedirs(log_path_folder)
return log_path_folder
else:
# 如果路径存在,重新创建
print("路径重复!")
create_result_dir()
except OSError as e:
print(f"路径创建出错: {e}")
except RecursionError:
print(f"")
exit_app("程序循环异常!强制退出!")


def delete_folder_contents(path):
"""
清空目录
:param path: 路径
"""
for item in os.listdir(path):
item_path = os.path.join(path, item)
if os.path.isfile(item_path):
os.remove(item_path)
elif os.path.isdir(item_path):
shutil.rmtree(item_path)


def check_and_create_path(target_path, source_path):
"""
保存路径检查,清空目录和创建目录(根据源文件名创建切片文件夹)
:param target_path: 保存的基础路径
:param source_path: 源文件路径
"""
filename = os.path.splitext(os.path.basename(source_path))[0]
# 瓦片数据保存路径
path = os.path.join(target_path, filename)
# 检查路径是否存在
if not os.path.exists(path):
# 如果路径不存在,则创建
try:
os.makedirs(path)
except OSError as e:
# 如果创建过程中出现错误(例如目录已存在),则打印错误信息并跳过
print(f"Error: {e}")
else:
# 如果路径存在,则清空原数据
delete_folder_contents(path)
# print("目录清空成功")
return path


def computed_max_level(ds, tile_size=256):
"""
计算最大切片范围(简单推算)
:param tile_size:
:param ds: gdal打开的数据
:return: 级别
"""

if max_zoom != "auto" and isinstance(max_zoom, int):
return max_zoom
# 打开影像文件获取影像信息
width = ds.RasterXSize
height = ds.RasterYSize
# 计算影像的对角线分辨率
geo_transform = ds.GetGeoTransform()
pixel_size_x = geo_transform[1]
pixel_size_y = geo_transform[5]
resolution = math.sqrt(pixel_size_x * pixel_size_x + pixel_size_y * pixel_size_y)
maxLevel = math.ceil(math.log2(max(width, height) / resolution))
# 计算每个瓦片的大小
for zoom_level in range(0, maxLevel + 1):
tiles_x = (width + tile_size * 2 ** zoom_level - 1) // (tile_size * 2 ** zoom_level)
tiles_y = (height + tile_size * 2 ** zoom_level - 1) // (tile_size * 2 ** zoom_level)
if tiles_x == 1 or tiles_y == 1:
maxLevel = zoom_level
if maxLevel < 7:
maxLevel = 7
break

# 计算最大缩放级别
return maxLevel


def get_bbox(ds):
"""
获取影像的地理范围
:param ds: gdal打开的数据
:return: 地理范围 tuple元组 (minx, miny, maxx, maxy)
"""
geo_transform = ds.GetGeoTransform()
minx = geo_transform[0]
maxy = geo_transform[3]
maxx = minx + geo_transform[1] * ds.RasterXSize
miny = maxy + geo_transform[5] * ds.RasterYSize

return minx, miny, maxx, maxy


def clear_files(path):
"""
清理切片生成的无关文件
:param path: 切片保存路径
:return:
"""
print("正在整理切片生成的文件,请等待...")
for root, dirs, files in os.walk(path):
for file in files:
filename, file_extension = os.path.splitext(file)
# 保留所有png和一个tilemapresource.xml文件
if file_extension != ".png" and filename != "tilemapresource":
file_path = os.path.join(root, file)
os.remove(file_path)
print("数据整理完毕!")

def start_cut_img(img_path):
"""
切片启动函数
:param img_path: 单张tif路径
:return:
"""
# 检查路径
save_path = check_and_create_path(save_base_path, img_path)
# 打开影像文件
img = gdal.Open(img_path)
# 最大的放缩层级
maxZoom = computed_max_level(img)
# 地理范围
bbox = get_bbox(img)

# 设置生成切片的参数
options = {
"zoom": (0, maxZoom), # 设置切片级别范围
"profile": "geodetic", # 使用地理坐标系
"s_srs": "EPSG:4326", # 设置输入数据的投影坐标系为经纬度坐标系
"tile_size": 256, # 设置瓦片大小为 256
"tmscompatible": True, # 使用TMS格式
"resampling": "near", # 设置重采样方法
'np_processes': 3,
'kml': False, # ***
'srcnodata': None, # ***
'config': ['SRC_METHOD=NO_GEOTRANSFORM'], # ***
"bbox": bbox, # ***指定影像的地理范围
}

print()
print('-------------------// 任务开始 //---------------------')
print(f'当前处理:{img_path}')
print(f'切片层级:0-{maxZoom}')
print(f'保存路径:{save_path}')

# 生成切片
gdal2tiles.generate_tiles(img_path, save_path, **options)
# 清理切片生成的无关文件
clear_files(save_path)
# 写入日志
with open(os.path.join(log_path, 'success.txt'), 'a') as file:
file.write(f"[切片数据]:{img_path} [保存路径]:{save_path} \n")
print('-------------------// 任务结束 //---------------------')


def generate_tiles_th(input_image):
"""
线程函数管理
:param input_image:
:return:
"""
with lock:
start_cut_img(input_image)


def exit_app(msg="", code=1):
"""
退出程序
:param code: 0 代表正常退出,其它值代码异常退出
:param msg: 退出前的提示
:return:q
"""
print()
print(msg)
print("按下任意键然后回车退出:")
# input()
print("正在清理临时目录文件夹...")
delete_folder_contents("temp")
print("清理完毕!")
sys.exit(code)


if __name__ == '__main__':
# 创建锁
lock = threading.Lock()
# 限制线程池的大小
max_workers = 3
# 创建日志目录文件
log_path = create_result_dir()
# 初始化必要参数(输入路径集合,保存切片的基础路径)
input_path_list, save_base_path, total, max_zoom = init_params()
# 使用线程池来同时执行多个generate_tiles函数
with ThreadPoolExecutor(max_workers=max_workers) as executor:
executor.map(generate_tiles_th, input_path_list)

print()
print(f"所有任务执行完毕,共处理 {total} 幅影像 ,成功切片 {len(input_path_list)} 幅影像。详情日志:{os.path.join(os.getcwd(), log_path)}")
exit_app("", 0)

Cesium加载

假设你已经通过上面操作切片并保存

假设你已经通过nginx等软件发布了切片数据

假设你会cesium加载影像

假设我本地将切片发布在http://localhost:8901/cut/地址下

1
2
3
4
5
6
7
const tmsProvider = new Cesium.UrlTemplateImageryProvider({
url: "http://localhost:8901/cut/{z}/{x}/{reverseY}.png",
maximumLevel: 12,
minimumLevel: 0,
tilingScheme: new Cesium.GeographicTilingScheme()
});
map.imageryLayers.addImageryProvider(tmsProvider);

补充

Cesium加载地址转换

正常情况上面已经完成切片到加载业务,但是若有强制要求最终给前端的地址为xxx/{z}/{x}/{y}.png的格式时,需要额外处理。

原因

在用上面代码切片完成后,其切片方案中的切片 Y 坐标, 0 是最南端的切片,所以不作处理在cesium加载需要使用{reverseY},若是使用{y}则表示在切片方案中的切片 Y 坐标, 0 是最北端的切片。故,若需要使用后者需通过下面代码对y坐标进行重新计算和命名。

计算函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def recalculate(path):
"""
对y坐标从新计算(由于切片中 Y 坐标,0 是最南端的切片,与Cesium默认加载相悖),目的为了契合cesium的/{z}/{x}/{y}.png加载模式
:param path: 切片保存路径
:return:
"""
print("...开始y坐标重计算...")
n = 0
for item in os.listdir(path):
fir_path = os.path.join(path, item) # 构建完整的路径
if os.path.isdir(fir_path):
print(f'共{len(os.listdir(path)) - 2}级,当计算第{n}级,请等待!!!')
n += 1
for j in os.listdir(fir_path):
sec_path = os.path.join(fir_path, j) # 构建完整的路径
for k in os.listdir(sec_path):
file_name, file_extend = os.path.splitext(k)
if file_extend == ".png":
y = (1 << int(item)) - 1 - int(file_name)
os.rename(os.path.join(sec_path, k), os.path.join(sec_path, str(y)+'.png'))

print("...结束y坐标重计算...")

若需要加到上面完整代码中去,添加位置如下(记得函数也得导进去)

1
2
3
4
5
6
7
8
9
10
def start_cut_img(img_path):
...其余代码...
# 生成切片
gdal2tiles.generate_tiles(img_path, save_path, **options)
# 清理切片生成的无关文件
clear_files(save_path)
# 重新计算y坐标
recalculate(save_path)
# 写入日志
...其余代码...

去除切片黑边

逻辑为对每张图片进行计算,重置rgba值。针对png格式切片有效。

思路

理论上对影像本身进行黑色区的去除也可达到效果,本文暂不研究此法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def remove_black_from_png(input_path):
"""
去除切片黑边
:param input_path: 图片路径
"""
# 打开图片
image = Image.open(input_path)

# 转换为 RGBA 模式(如果不是的话)
if image.mode != 'RGBA':
image = image.convert('RGBA')

# 获取图片的像素数据
data = image.getdata()

# 创建一个新的像素数据列表,去除黑色像素
new_data = []
for item in data:
# 如果像素不是完全黑色,就添加到新的像素数据列表中
if item[0] != 0 or item[1] != 0 or item[2] != 0:
new_data.append(item)
else:
# 如果像素是完全黑色,就将其设置为透明
new_data.append((255, 255, 255, 0))

# 更新图片的像素数据
image.putdata(new_data)

# 保存处理后的图片
image.save(input_path, "PNG")