一直到这一步,如果读者对 ArcGIS Pro 的操作更加熟悉,其实也是可以直接在软件中完成的。可以后续再在 python 中读取 out_label_clip,进行下一步 API 的调用。
构造请求 url 百度批量算路 API 存在一些限制,但对于规划的城市研究情景来说,不是很重要。具体可以参考它的开发文档 。
在我们的代码中,构造的请求 url 形式如:
1 2 url = f'https://api.map.baidu.com/routematrix/v2/{trans_type} ? output=json&origins={origins} &destinations={des} &tactics={tactics} &coord_type=wgs84&ak={ak} '
其中,trans_type 可选 driving、walking、riding(注:riding 可以在 url 请求参数中进一步区分 riding_type 是自行车还是电动车,但摩托车需要向百度申请权限) ;tactics 对于驾车情景,是策略选择,具体的说明为:
1 2 3 4 10:不走高速; 11:常规路线,即多数用户常走的一条经验路线,满足大多数场景需求,是较推荐的一个策略; 12:距离较短(考虑路况):即距离相对较短的一条路线,但并不一定是一条优质路线。计算耗时时,考虑路况对耗时的影响; 13:距离较短(不考虑路况):路线同以上,但计算耗时时,不考虑路况对耗时的影响,可理解为在路况完全通畅时预计耗时。
百度注明,除 13 外,其他策略的耗时计算都考虑实时路况 。也就是说,在一天的早上、中午、晚上不同时间调用 API,由于每次都是按请求时的路况来计算,做出来的等时圈结果也是不一样的。
我们这次试试推荐策略 11,我是在下午 3 点左右运行的代码,此时应当并不会因为早晚高峰改变结果的普适性。
ak 则是你的 access key,可以前往百度地图开放平台的应用管理页面,创建一个应用,得到 ak。代码中不建议明文存储这类访问密钥,我们在工作文件夹下新建一个 .env 文件,写入 baidu_ak = "xxxxxx……(你的ak)",保存。
让 python 读入这个 ak。顺便,定一下我们的交通工具和策略吧。
1 2 3 4 5 6 7 8 from dotenv import load_dotenvimport osload_dotenv(".env" ) ak = os.getenv("baidu_ak" ) trans_type = 'driving' tactics = '11'
然后,我们根据 out_label_clip 中每个点的坐标,批量构造我们待会儿要请求的 url,把它们保存到字典里:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import urllib.parseimport arcpydata_dict = {} des = f"{points[0 ][1 ]} ,{points[0 ][0 ]} " with arcpy.da.SearchCursor(out_label_clip, ["OBJECTID" , "SHAPE@XY" ]) as cursor: for oid, xy in cursor: lat, lon = xy[1 ], xy[0 ] coord_str = f"{lat} ,{lon} " data_dict[oid] = {"coord" : coord_str} base_url = f"https://api.map.baidu.com/routematrix/v2/{trans_type} " for oid, info in data_dict.items(): params = { "output" : "json" , "origins" : info["coord" ], "destinations" : des, "tactics" : tactics, "coord_type" : "wgs84" , "ak" : ak } query_string = urllib.parse.urlencode(params) info["url" ] = f"{base_url} ?{query_string} " print (f"构造了 URL 数量: {len (data_dict)} " )
批量调用 API 我们可以把调用得到的通行时间存进一张表格,这样方便在出错的时候也保留一部分数据,或者,分成好几天来请求百度给我数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 import asyncioimport aiohttpimport jsonimport csvimport osasync def perform_request (url ): """发送请求并返回文本内容;失败则返回 response.text 供错误输出""" async with aiohttp.ClientSession() as session: try : async with session.get(url) as response: text = await response.text() if response.status == 200 : return text else : return {"error" : True , "msg" : text} except Exception as e: return {"error" : True , "msg" : str (e)} async def limited_fetch (url, semaphore ): """限制并发""" async with semaphore: result = await perform_request(url) await asyncio.sleep(1 ) return result def get_time (content ): """从 JSON 内容解析 duration(分钟)""" try : item = json.loads(content) if item.get("status" ) == 0 : duration = item['result' ][0 ]['duration' ]['value' ] / 60 return round (duration, 1 ) else : return None except : return None async def main_fetch_to_csv (csv_path, data_dict ): """并发请求,写CSV,支持断点续跑""" csv_map = {} order_oids = [] if os.path.exists(csv_path): with open (csv_path, "r" , encoding="utf-8" ) as f: reader = csv.DictReader(f) for row in reader: oid_str = (row.get("oid" ) or "" ).strip() dur_str = (row.get("duration_min" ) or "" ).strip() if not oid_str: continue oid = int (oid_str) if oid not in csv_map: order_oids.append(oid) csv_map[oid] = dur_str def has_value (v ): return v not in ("" , None , "None" ) done_oids = {oid for oid, v in csv_map.items() if has_value(v)} print (f"已完成:{len (done_oids)} / {len (data_dict)} " ) if not os.path.exists(csv_path): with open (csv_path, "w" , newline='' , encoding="utf-8" ) as f: writer = csv.writer(f) writer.writerow(["oid" , "duration_min" ]) semaphore = asyncio.Semaphore(10 ) csv_lock = asyncio.Lock() def flush_csv (): with open (csv_path, "w" , newline='' , encoding="utf-8" ) as f: writer = csv.writer(f) writer.writerow(["oid" , "duration_min" ]) for oid in order_oids: writer.writerow([oid, csv_map.get(oid, "" )]) for oid in csv_map: if oid not in order_oids: writer.writerow([oid, csv_map[oid]]) async def request_task (oid, url ): if oid in done_oids: return result = await limited_fetch(url, semaphore) if isinstance (result, dict ) and result.get("error" ): print (f"[请求失败] oid={oid} → {result['msg' ]} " ) duration_min = None else : duration_min = get_time(result) async with csv_lock: if oid not in csv_map: order_oids.append(oid) csv_map[oid] = "" if duration_min is None else str (duration_min) flush_csv() print (f"[写入] oid={oid} , duration_min={duration_min} " ) tasks = [] for oid, info in data_dict.items(): if oid not in done_oids: tasks.append(asyncio.create_task(request_task(oid, info["url" ]))) if not tasks: print ("没有需要处理的 oid。" ) return await asyncio.gather(*tasks) print ("CSV 更新完成" ) csv_path = "durations.csv" await main_fetch_to_csv(csv_path, data_dict)
输出可能类似
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 已完成:0 / 4705 [写入] oid=6, duration_min=29.7 [写入] oid=5, duration_min=20.8 [写入] oid=8, duration_min=30.0 [写入] oid=10, duration_min=21.6 [写入] oid=2, duration_min=28.2 [写入] oid=1, duration_min=28.2 [写入] oid=7, duration_min=30.3 [写入] oid=9, duration_min=27.4 [写入] oid=3, duration_min=22.4 [写入] oid=4, duration_min=22.2 ......(省略) CSV 更新完成
你会注意到,oid 的更新并不是按序的。这是因为请求的返回时间并不一致,这并不影响结果。
这段代码是支持当再次调用时,继续从未得到结果的点位开始继续请求的。也就是说,如果你的研究区域比较大,突破了百度一天 5000 次请求的限制,可以分成好几天来运行……(我建议每天在同一个时刻运行)
另外百度可能会发邮件或短信提示你「使用的批量算路服务并发量已接近约定上限」,同时,有些 url 请求结果因此被限制而报错。前者不用理会,后者的话,这段代码结束运行之后,再多运行几次,会对没有得到结果的点位继续请求,最后总会得到完整的、包含所有终点的时间的结果的表格的。
使用请求结果更新 ArcGIS 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 def update_row (out_feature_class, oid, duration_min ): """更新游标,将duration_min填入time字段""" field_names = ["OBJECTID" ,"SHAPE@XY" , "time" ] where_clause= f"OBJECTID = {oid} " with arcpy.da.UpdateCursor(out_feature_class, field_names, where_clause) as cursor: for row in cursor: row[2 ] = duration_min cursor.updateRow(row) print (f"成功更新{oid} :{duration_min} 分钟" ) def main_update_from_csv (csv_path ): fcs = out_label_clip ws = arcpy.env.workspace if not arcpy.ListFields(fcs, "time" ): arcpy.management.AddField(fcs, "time" , "DOUBLE" ) with arcpy.da.Editor(ws) as edit: with open (csv_path, "r" , encoding="utf-8" ) as f: reader = csv.DictReader(f) for row in reader: oid = int (row["oid" ]) if row["oid" ] else None duration_min = float (row["duration_min" ]) if row["duration_min" ] not in (None , "" , "None" ) else None if oid is not None and duration_min is not None : update_row(fcs, oid, duration_min) main_update_from_csv(csv_path)
绘制等时圈 我们得到的是许许多多个点的时间,然后,我们要进行插值 ,得到面状的时间分布(好抽象,其实意思是,我们要为没有测量值的点来「预测」值,这个预测是使用周边测量点的已有值来预测的)。
插值的算法还蛮多的,我们就用反距离权重法吧。
1 2 3 4 5 6 7 8 Idw_raster = arcpy.sa.Idw( in_point_features=out_label_clip, z_field="time" , cell_size=7.99999999999272E-05 , power=2 , search_radius="VARIABLE 12" , in_barrier_polyline_features=None )
我们会得到一张漂亮的栅格。我们把它裁剪到研究区域。
1 2 3 4 5 6 7 8 arcpy.management.Clip( in_raster=Idw_raster, out_raster="Idw_raster_clip" , in_template_dataset="study_area" , nodata_value="3.4e+38" , clipping_geometry="NONE" , maintain_clipping_extent="NO_MAINTAIN_EXTENT" )
大功告成!现在,我们可以打开 ArcGIS Pro。找到地理数据库中剪裁后的栅格,把它拖到地图上,然后在符号系统中调整一下不同交通时间段的颜色,就可以看到漂亮的等时图了~