别再用过时的地区数据了!闸北区都消失了,教你一次性搞定省市区同步更新!(附实战源码)
最近在工作中要做一个功能:需要把地区表更新为最新的行政划分。 我们的地区表中现在都还保留着闸北区,然而,现在上海市早就没有使用闸北区这个名称了。用户也不止一次两次吐槽我们的地址选项,需要的地区没得选,早就没用的地区依旧还留着,感觉我们的 APP 仿佛还停留在上个世纪一样。 于是,近期就得赶紧把地区表中的省市区更新一下。然而我发现,做这个功能还是有很多细节需要仔细考虑一下的。这篇文章记录了我的思考过程,也附上了我的代码,供各位参考。当然,如果有思考不周全的地方,也欢迎各位提建议进行指导。 首先,就是最新的省市区行政规划数据从何而来?可能很多同学立马就想到了可以从 GitHub 上获取,是的,GitHub 上有很多作者都有维护省市区数据,那么,你有没有想过他们的这些数据又从何而来呢?数据的真实性又怎么样呢? 其实,我们还是需要一份权威的数据,我建议可以直接从政府官方数据上获取。 现在,数据真实性已经搞定了,但是我们发现上面的数据还需要我们进一步爬取才可以使用,那么,有没有直接帮我们爬取好的数据,我们可以拿过来就直接使用的呢?当然有! 我是直接采用的这个开源库,这个仓库直接提供了 SQL 文件,数据源也是来源于政府官方,完全符合我的要求。 于是,我就直接下载了 地区表并非热点数据,数据更新频次并不会很高,因此,我的思路是直接将最新的省市区数据在我本地更新好了之后,再将更新好的地区表同步到生产环境中,这样对生产环境影响是最小的,也是最安全的一种方式。 从 GitHub 上获得的数据和我使用的地区表中的结构有一些差异。对比一下 DDL。 这是从 GitHub 上获得的数据结构: 这个是我们公司目前正在用的地区表数据结构: 父子级数据在每张表中都可以找出来,但是我们地区表 现在数据已经有了,那么应该如何进行对比呢? 我们先来一起看看数据是怎样的, 我们的目标是:需要将最新的省市区数据 area_latest_2024 更新到陈旧的 area 表中去,并且还需要保证 area 表层级结构不能变。 但是我们可以发现, 那么,现在摆在我们面前的,就有两个问题: 我们一个问题一个问题的来解决。 首先,第一个问题,我们可以发现,虽然两张表各自的层级标识字段无法进行关联,但是我们完全可以通过省份的名称进行关联,当省份可以关联上的时候,省份下面的区县层级数据也可以做出来,这样自然而然的第二个问题也可以有办法解决了。 于是,我就写了一点儿代码,先做了一个省份关联的结构: 为了方便我检查数据的准确性,我将省份关联的数据以 json 的形式做了一下保存 当省份关联的结构有了之后,我们就可以根据每一个省的层级标识来获取省份下面的城市数据,得到城市数据之后,我们进行数据对比,就会有 3 种可能性: 对数据的操作,无非也就是 增删改查(CRUD),在当前这个应用场景下,我们只需要对数据进行“增删”即可。 由于我们的数据是有层级关系的,因此,当我们找到了需要新增的数据时,我们插入到数据库中的新数据也是需要保证层级关系是存在的,所以,当我们将需要新增的城市数据插入 area 表时,我们还需要同时将“省份层级标识”做好拼接一并写入进去,否则,新增的这部分城市数据就毫无意义。 那么,我们又该如何更加直观的知道哪些数据是这一次增加的呢? 我的做法是:新增了一个 经过上面的数据对比之后,或多或少总会有部分城市数据是已经废弃的,那么,我们对比结束之后,可以直接删除掉这部分废弃的城市数据吗? 如果在我们的系统中并没有对 area_id 做很强的关联,那么,直接删除掉也无伤大雅。但是,如果在我们的系统中,有任何地方直接用到了 area_id 并需要关联数据,那么,就不能直接将数据删除掉! 比如,在我们的 APP 中,我发现之前的同事存用户地址时,是直接存入的 area_id。 比如有一个用户设定的地址是闵行区,之前的同事在数据库中直接存入的是 1004 (通过 area_id 做了一层数据关联)。如果此时我们冒然删除掉了 area_id 为 1004 的这一条数据,那么就会导致之前所有和“闵行区”相关的历史地址数据都出现错误,这是不能忍受的。 于是,在处理“删数据”时,我直接新增一个 理论说的再多,都没有直接看代码进行实践操作来的强,下面我将代码贴出来,以供各位参考: 看着代码量貌似还挺多的,其实很多都是一些常规操作,并没有什么很特别的技巧。 在做这个需求时,我并没有一上来就考虑什么高并发啊、代码优雅啊之类的,毕竟这个需求只是一个临时操作,地区表也不会变动得特别频繁,具体情况具体分析就好。 有一点需要提一下的是,虽然我们只处理了省市区这 3 个层级的数据,省市区所有的数据总条数也不会超过 5000 条,但是两组数据要进行对比的话,就会产生笛卡尔乘积,这么一算下来,其实还是挺占内存的,并且要是脚本出现任何一个错误,所有的操作都又要重新开始,这未免也有点儿傻了。 我的处理方式就是将 31 个省级的数据对比好了之后,分别用文件将结果做一个保存,然后再一个一个的读取文件内容,再去操作数据库更新数据。 这样处理,有以下几点好处: 以上就是我做这个需求的思路和解决方案了,如果你刚好也有类似的需求,希望可以对你有所帮助。 如果你觉得我写的有纰漏,欢迎进行指正。如果你有更好的方案,欢迎评论区一起来聊聊,共同进步!最新的省市区行政规划数据


area_code_2024.sql.gz 文件,然后在我本地数据库中导入了这份 SQL 。更新思路
CREATE TABLE `area_latest_2024` (
`code` bigint(20) unsigned NOT NULL COMMENT '区划代码',
`name` varchar(128) NOT NULL DEFAULT '' COMMENT '名称',
`level` tinyint(1) NOT NULL COMMENT '级别1-5,省市县镇村',
`pcode` bigint(20) DEFAULT NULL COMMENT '父级区划代码',
`category` int(11) DEFAULT NULL COMMENT '城乡分类',
PRIMARY KEY (`code`),
KEY `name` (`name`),
KEY `level` (`level`),
KEY `pcode` (`pcode`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;CREATE TABLE `area` (
`area_id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '地区ID',
`name` varchar(32) NOT NULL DEFAULT '' COMMENT '地区名',
`group` varchar(10) NOT NULL DEFAULT '' COMMENT '分组&排序',
`parent_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '父地区ID',
`type` tinyint(1) unsigned NOT NULL DEFAULT '1' COMMENT '国内/国外',
`invalid` tinyint(1) unsigned NOT NULL COMMENT '是否无效',
`zipcode` varchar(20) NOT NULL DEFAULT '' COMMENT '邮编',
`area_code` varchar(20) NOT NULL DEFAULT '' COMMENT '区号',
PRIMARY KEY (`area_id`) USING BTREE,
KEY `parent_id` (`parent_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5593 DEFAULT CHARSET=utf8 COMMENT='地区列表';area 只需要省市区 3 级,area_latest_2024 表提供了省市区镇村 5 个级别,我不需要用到这么细的数据,好在,area_latest_2024 表提供了 level 字段,因此,我直接通过 level 字段删除了大于 3 级别的数据,这样两张表都只有省市区的数据了,在数据对比上大大减少了无效的对比体量。新旧数据对比
area_latest_2024 表中:
area 表中的数据:
area_latest_2024 表中是通过 code 和 pcode 这两个字段进行层级关联的, area 表中是通过 area_id 和 parent_id 这两个字段进行层级关联的。但是,area_latest_2024 表中的 code 字段和 area 表中的 area_id 字段是没有任何关系的。当然,这也很正常,毕竟 area 表是我们系统自身的地区表,可能我们还有其他特定的需求改造,而 area_latest_2024 可以理解成一个通用表。

对比后的数据如何处理
增
version 字段,用于标记当前数据是第几次新增的。之所以加 version 字段,也是为了方便后期检查数据。删

deleted_at 字段,用于软删除。在选择省市区的层级接口中做了一个过滤,让用户填写新地址时,不能再选择已经被软删除的地址就好了,但是用户的地址列表就不做调整,这样就可以将影响降到最小。源码
from sqlalchemy import create_engine,Column, Integer, String, BigInteger
from sqlalchemy.orm import declarative_base,sessionmaker
import json
from icecream import ic
import os
import time
from datetime import datetime
# 创建基类
Base = declarative_base()
# 最新的省市区县数据(这里用的是 2024 年最新的省市区数据)
# 数据来源于 https://github.com/adyliu/china_area
class AreaCode(Base):
__tablename__ = "area_latest_2024"
code = Column(String(128), nullable=False, primary_key=True)
name = Column(String(128), nullable=False)
level = Column(Integer, nullable=False)
pcode = Column(BigInteger, nullable=False)
category = Column(Integer, nullable=False)
# 旧的省市区县数据
class Area(Base):
__tablename__ = "area"
area_id = Column(Integer, primary_key=True)
name = Column(String(32), nullable=False)
group = Column(String(10), nullable=False)
parent_id = Column(Integer, nullable=False)
type = Column(Integer, nullable=False)
invalid = Column(Integer, nullable=False)
zipcode = Column(String(20), nullable=False)
area_code = Column(String(20), nullable=False)
deleted_at = Column(Integer, nullable=False)
version = Column(Integer, nullable=False)
class Helper(object):
def json_echo(data):
print(json.dumps(data, ensure_ascii=False))
# 将数据转成 json 写入文件中
def write_to_file(self, data, file_name):
with open(file_name, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False)
def get_file_cache(self, file_name):
if os.path.exists(file_name):
with open(file_name, 'r', encoding='utf-8') as f:
return json.load(f)
return None
# 列出指定目录下,指定后缀的文件
def list_files_by_suffix(self, dir_name, suffix):
file_names = []
files = os.listdir(dir_name)
for file in files:
if file.endswith(suffix):
file_names.append(file)
return file_names
class AreaCodeHandler(object):
def get_mysql_session(self):
# 创建引擎
engine = create_engine('mysql+pymysql://root:123456@10.87.1.241:3306/local_test?charset=utf8mb4')
# 创建会话
ssm = sessionmaker(bind=engine)
# 创建会话
session = ssm()
return session
# 得到原始表的 31 个省份数据
def get_ori_31_provinces(self):
session = self.get_mysql_session()
# 查询原始表的 31 个省份数据
provinces = session.query(Area).filter(Area.parent_id == 5000).all()
# 将查询结果转换为字典列表
provinces_dict = [
{
"area_id": province.area_id,
"name": province.name,
"group": province.group,
"parent_id": province.parent_id,
"type": province.type,
"invalid": province.invalid,
"zipcode": province.zipcode,
"area_code": province.area_code
} for province in provinces]
return provinces_dict
def get_target_31_provinces(self):
session = self.get_mysql_session()
# 查询目标表的 31 个省份数据
provinces = session.query(AreaCode).filter(AreaCode.level == 1).all()
provinces_dict = [
{
"code": province.code,
"name": province.name,
"level": province.level,
"pcode": province.pcode,
"category": province.category
} for province in provinces]
return provinces_dict
def provinces_mixed(self, cache_file='provinces.json'):
# 先检查缓存文件中是否存在数据
provinces = Helper().get_file_cache(cache_file)
if provinces:
return provinces
print('缓存文件不存在,开始混合数据')
# 原始表的 31 个省份数据
ori_provinces = self.get_ori_31_provinces()
# 目标表的 31 个省份数据
target_provinces = self.get_target_31_provinces()
# 将原始表的 31 个省份数据与目标表的 31 个省份数据进行混合
for ori_province in ori_provinces:
for target_province in target_provinces:
# 取出原始表中省份的前两个汉字
ori_province_name = ori_province['name'][:2]
# 取出目标表中省份的前两个汉字
target_province_name = target_province['name'][:2]
# 如果每个省份的前 2 个汉字相同,则可以将数据进行匹配上
if ori_province_name == target_province_name:
# 将目标表中的数据赋值给原始表中的数据
ori_province['target_province'] = target_province
# 将混合后的数据写入文件中
Helper().write_to_file(ori_provinces, cache_file)
return ori_provinces
def get_city_by_province(self, province):
"""通过省份获取城市数据"""
session = self.get_mysql_session()
# 获取原始表中的城市数据
ori_cities = session.query(Area).filter(Area.parent_id == province['area_id']).all()
# 将查询结果转换为字典列表
ori_cities_dict = [
{
"area_id": city.area_id,
"name": city.name,
"group": city.group,
"parent_id": city.parent_id,
"type": city.type,
"invalid": city.invalid,
"zipcode": city.zipcode,
"area_code": city.area_code
} for city in ori_cities]
target_cities = session.query(AreaCode).filter(AreaCode.pcode == province['target_province']['code']).all()
target_cities_dict = [
{
"code": city.code,
"name": city.name,
"level": city.level,
"pcode": city.pcode,
"category": city.category
} for city in target_cities]
# 取出原始表中所有的城市名称组成 set
ori_cities_name_set = set([city['name'][:2] for city in ori_cities_dict])
# 取出目标表中所有的城市名称组成 set
target_cities_name_set = set([city['name'][:2] for city in target_cities_dict])
# 1. 二者共同存在的城市数据(则表示城市数据没有变,但是可能城市下面的区县数据有变化,因此还是需要进一步对比)
both_cities = []
for ori_city in ori_cities_dict:
for target_city in target_cities_dict:
# 如果每个城市的前 2 个汉字相同,则认为是同一个城市
if ori_city['name'][:2] == target_city['name'][:2]:
ori_city['target_city'] = target_city # 将目标城市数据赋值给原始城市数据,方便后续查找城市下面的区县数据
both_cities.append(ori_city) # 记录没有变化的城市数据
# 2. 原始表中不存在,但是目标表中却存在的城市数据
# (则表示这些城市数据是需要新增的。城市数据需要新增,那么城市下面的区县数据则可以直接新增)
new_cities = []
# 取原始表中不存在,但是目标表中却存在的城市数据
only_target_cities = target_cities_name_set - ori_cities_name_set
for target_city in target_cities_dict:
if target_city['name'][:2] in only_target_cities:
new_cities.append(target_city)
# 3. 原始表中存在,但是目标表中不存在的城市数据
# (则表示这些城市数据是需要删除的。城市数据需要删除,那么城市下面的区县数据则可以直接删除)
deleted_cities = []
only_ori_cities = ori_cities_name_set - target_cities_name_set
for ori_city in ori_cities_dict:
if ori_city['name'][:2] in only_ori_cities:
deleted_cities.append(ori_city)
return both_cities, new_cities, deleted_cities
# 通过城市获取区县数据
def get_county_by_city(self, city):
session = self.get_mysql_session()
# 获取原始表中的区县数据
ori_counties = session.query(Area).filter(Area.parent_id == city['area_id']).all()
# 将查询结果转换为字典列表
ori_counties_dict = [
{
"area_id": county.area_id,
"name": county.name,
"group": county.group,
"parent_id": county.parent_id,
"type": county.type,
"invalid": county.invalid,
"zipcode": county.zipcode,
"area_code": county.area_code
} for county in ori_counties]
target_counties = session.query(AreaCode).filter(AreaCode.pcode == city['target_city']['code']).all()
target_counties_dict = [
{
"code": county.code,
"name": county.name,
"level": county.level,
"pcode": county.pcode,
"category": county.category
} for county in target_counties]
# 取出原始表中所有的区县名称组成 set
ori_counties_name_set = set([county['name'][:2] for county in ori_counties_dict])
# 取出目标表中所有的区县名称组成 set
target_counties_name_set = set([county['name'][:2] for county in target_counties_dict])
# 1. 二者共同存在的区县数据
both_counties = []
for ori_county in ori_counties_dict:
for target_county in target_counties_dict:
if ori_county['name'][:2] == target_county['name'][:2]:
ori_county['target_county'] = target_county
both_counties.append(ori_county)
# 2. 原始表中不存在,但是目标表中却存在的区县数据
# 则表示这些区县数据是需要新增的
only_target_counties = target_counties_name_set - ori_counties_name_set
new_counties = []
for target_county in target_counties_dict:
target_county_name = target_county['name'][:2]
if target_county_name in only_target_counties and target_county_name != '市辖':
new_counties.append(target_county)
# 3. 原始表中存在,但是目标表中不存在的区县数据
# 则表示这些区县数据是需要删除的
only_ori_counties = ori_counties_name_set - target_counties_name_set
deleted_counties_area_ids = []
for ori_county in ori_counties_dict:
ori_county_name = ori_county['name'][:2]
if ori_county_name in only_ori_counties and ori_county_name != '市辖':
deleted_counties_area_ids.append(ori_county['area_id']) # 这里可以直接记录被删除的区县的 area_id
return both_counties, new_counties, deleted_counties_area_ids
def get_new_country_by_new_city(self, new_city):
session = self.get_mysql_session()
# 获取目标表中的区县数据
target_counties = session.query(AreaCode).filter(AreaCode.pcode == new_city['code']).filter(
AreaCode.name != '市辖区').all()
target_counties_dict = [
{
"code": county.code,
"name": county.name,
"level": county.level,
"pcode": county.pcode,
"category": county.category
} for county in target_counties]
return target_counties_dict
def has_deleted_area_data(self, deleted_cities):
"""得到被删除的城市及区县的 area_id 列表"""
deleted_area_ids = []
if not deleted_cities:
return deleted_area_ids
# 城市的 area_id
for deleted_city in deleted_cities:
deleted_area_ids.append(deleted_city['area_id'])
# 通过城市的 area_id 获取区县的 area_id
session = self.get_mysql_session()
counties = session.query(Area).filter(Area.parent_id.in_(deleted_area_ids)).all()
for county in counties:
deleted_area_ids.append(county.area_id)
return deleted_area_ids
def city_with_county_by_province(self, province):
# 1. 通过省份数据获取该省下的所有城市数据
cities_always, cities_new, deleted_cities = self.get_city_by_province(province)
# 此时,就有三种情况
# 第一种是两张表中都存在的城市,这一部分需要进一步对比区县数据
for city_always in cities_always:
counties_always, counties_new, deleted_counties_area_ids = self.get_county_by_city(city_always)
city_always['about_counties'] = {
'counties_always': counties_always,
'counties_new': counties_new,
'deleted_counties_area_ids': deleted_counties_area_ids, # 需要直接删除的区县 area_id
}
# 第二种就是对于原始表来说是新的城市数据
for city_new in cities_new:
new_counties = self.get_new_country_by_new_city(city_new)
city_new['target_children'] = new_counties
# 第三种就是对于原始表来说是需要被删除的城市数据
# 此时重新整理一下省份数据
province['cities_always'] = cities_always
province['cities_new'] = cities_new
province['deleted_cities'] = deleted_cities
return province
def generate_all_provinces_data(self):
provinces = self.provinces_mixed()
# 将 31 个省份数据写入文件中
for province in provinces:
data = handler.city_with_county_by_province(province)
Helper().write_to_file(data, province['name'] + '_city.json')
time.sleep(3)
ic(province['name'])
def parse_province_data(self, province):
# 这里的数据就需要分为 3 部分
session = self.get_mysql_session()
current_timestamp = int(datetime.now().timestamp()) # 获取当前时间戳
try:
# 开始事务
session.begin()
# 第一部分,添加了新的城市数据
if 'cities_new' in province:
for city_new in province['cities_new']:
city_new_insert = Area(
name=city_new['name'],
group='',
parent_id=province['area_id'],
type=1,
invalid=0,
zipcode='',
area_code='',
deleted_at=0,
version=1,
)
session.add(city_new_insert)
session.flush()
if 'target_children' in city_new:
for target_child in city_new['target_children']:
target_child_insert = Area(
name=target_child['name'],
group='',
parent_id=city_new_insert.area_id, # 这里是自增主键 ID
type=1,
invalid=0,
zipcode='',
area_code='',
deleted_at=0,
version=1,
)
session.add(target_child_insert)
session.flush()
# 第二部分,没有添加新的城市,但是添加了新的区县
if 'cities_always' in province:
for city_always_item in province['cities_always']:
if 'about_counties' in city_always_item:
# 新增了区县
if 'counties_new' in city_always_item['about_counties']:
for county_new in city_always_item['about_counties']['counties_new']:
county_new_insert = Area(
name=county_new['name'],
group='',
parent_id=city_always_item['area_id'],
type=1,
invalid=0,
zipcode='',
area_code='',
deleted_at=0,
version=1,
)
session.add(county_new_insert)
# 删除了区县
if 'deleted_counties_area_ids' in city_always_item['about_counties']:
deleted_counties_area_ids = city_always_item['about_counties']['deleted_counties_area_ids']
if deleted_counties_area_ids:
ic(f'删除区县数据时 需要删除的 area_id 列表:{deleted_counties_area_ids}')
session.query(Area).filter(Area.area_id.in_(deleted_counties_area_ids)).update(
{Area.deleted_at: current_timestamp}, synchronize_session=False
)
# 第三部分,删除了城市
if 'deleted_cities' in province:
deleted_area_ids = self.has_deleted_area_data(province['deleted_cities'])
if deleted_area_ids:
ic(f'{province["name"]} 需要删除的 area_id 列表:{deleted_area_ids}')
# 这里是软删除,将 deleted_at 字段更新为当前时间戳
session.query(Area).filter(Area.area_id.in_(deleted_area_ids)).update(
{Area.deleted_at: current_timestamp}, synchronize_session=False
)
# 提交事务
session.commit()
ic(f'{province["name"]} 数据插入成功')
except Exception as e:
# 回滚事务
session.rollback()
ic(f'{province["name"]} 数据插入失败,错误信息:{e}')
finally:
# 关闭会话
session.close()
if __name__ == "__main__":
handler = AreaCodeHandler()
# 第一步:生成所有省份的城市及区县数据
handler.generate_all_provinces_data()
# 第二步:将生成的省份数据更新到原始表中
# 调试插入某个省份的数据
# data = Helper().get_file_cache('上海_city.json')
# handler.parse_province_data(data)
# 调试了没有什么问题后,就可以批量处理所有省份的数据了
# 列出指定目录下,指定后缀的文件
# file_names = Helper().list_files_by_suffix('./', '_city.json')
#
# for file_name in file_names:
# province = Helper().get_file_cache(file_name)
# if province is not None:
# handler.parse_province_data(province)
# time.sleep(3)
# ic(f'正在处理 {file_name}')



完结