文章目录
- 前言
- 数据从Hive中导出到MySQL
- 调用百度地图的第三方API对经纬度信息进行逆编码解析
前言
应用场景:
Hive数仓中有一张近三千万条数据的地理位置参考表A,但是表A中 原来的数据字段中仅仅包含了国家信息,地理位置信息没有细化。本篇博客主要就如何快速获取这些数据的具体地理位置信息展开。
解决方案:
首先,考虑到表A的数据不算特别大,MySQL数据库可以承载这么大的数据。而且MySQL的实时数据查询性能优于Hive,南国考虑首先将数据打出到MySQL,后续在通过读取数据 并调用第三方API解析经纬度信息。
数据从Hive中导出到MySQL
这里 采取的方案是 使用Spark读取数据并进行MySQL写入:
@Slf4j
public class Hive2MySQL {
public static void main(String[] args) throws Exception {// mysql db
String dbUser = "*****";
String dbPW = "******";
String dbUrl = "jdbc:mysql://********";
String dbTable = "*******";
String sql = "SELECT * FROM ********";
SparkSession spark = SparkSession.builder().appName("hive2MySQL ").enableHiveSupport().getOrCreate();
spark.sql(sql).write().mode(SaveMode.Append)
.format("jdbc")
.option("url", dbUrl)
.option("user", dbUser)
.option("password", dbPW)
.option("dbtable", dbTable)
.save();
spark.stop();
}
调用百度地图的第三方API对经纬度信息进行逆编码解析
南国采用市场主流的地理位置API服务,在验证调用第三方API解析结果后,选择使用的是百度地图的API。
这里指的说明的是:
1.百度地图使用的坐标系为BD09,而在全球范围使用的最广的是WGS84。根据自身情况,选择合适的坐标系。
2.这里我们使用个人开发者的账户,它是每天30W数据的限额。这里 我们使用多个开发者key并行处理数据。
百度地图api的使用官方介绍:百度地图开放平台
JDBC操作MySQL数据库 过滤条件利用主键id提高查询速度, limit 实现分批分页,解决内存溢出问题
public class MainRun {
// MySQL 8.0 以上版本 - JDBC 驱动名及数据库 URL
static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
static final String DB_URL = "jdbc:mysql://********";
static final String USER = "******";
static final String PASS = "******";
public static void main(String[] args) {
Connection conn = null;
Statement stmt = null;
String key = args[0];
// 1.连接数据库 拿到经纬度
long lastUpdateId = Integer.valueOf(args[1]);
boolean flag=false;
while (true) {
try {
// 注册 JDBC 驱动
Class.forName(JDBC_DRIVER);
conn = DriverManager.getConnection(DB_URL, USER, PASS);
// 执行查询
stmt = conn.createStatement();
String sql = String.format("SELECT id,longitude,latitude FROM table where id >= %s and country is null LIMIT 0,3000", lastUpdateId);
ResultSet rs = stmt.executeQuery(sql);
ArrayList geoCityInfoList=new ArrayList<>();
// 展开结果集数据库
while (rs.next()) {
GeoCityInfo geoCityInfo =
new GeoCityInfo(rs.getLong("id"),rs.getDouble("longitude") / 1000000, rs.getDouble("latitude") / 1000000);
geoCityInfoList.add(geoCityInfo);
}//2.百度地图api 得到三级城市信息放到res中
Map id2BaiduGeo = new HashMap<>();
for (GeoCityInfo t : geoCityInfoList) {
Double p1 = t.getLongitude();
Double p2 = t.getLatitude();
String[] baiduGeo = HttpClientToInterface.doGet("http://api.map.baidu.com/reverse_geocoding/v3/?ak=" +
key + "&output=xml&coordtype=wgs84ll&location=" + String.valueOf(p1) + "," + String.valueOf(p2), "UTF-8");
//判断是否超额
if (baiduGeo.equals(null)){
flag=true;
break;
}
id2BaiduGeo.put(t.getId(), baiduGeo);
}if (flag==true) {
System.out.println("当前key: "+key+" 天配额超限,限制访问");
rs.close();
stmt.close();
conn.close();
break;
//程序终止
}
System.out.println("Prepare update data....");
//3.获取的三级城市信息和原表中的有效信息 合并到同一个新表中
// 更新数据
for (Map.Entry entry : id2BaiduGeo.entrySet()) {
String updateSql = "UPDATE table SET country= ? , province=? , city=?WHERE id= ? ";
PreparedStatement preparedStatement = conn.prepareStatement(updateSql);
preparedStatement.setString(1, entry.getValue()[0]);
//country
preparedStatement.setString(2, entry.getValue()[1]);
//province
preparedStatement.setString(3, entry.getValue()[2]);
//city
preparedStatement.setLong(4, entry.getKey());
preparedStatement.executeUpdate();
//执行更新操作
}
System.out.println("更新完成");
// 完成后关闭
rs.close();
stmt.close();
conn.close();
if(null == geoCityInfoList || geoCityInfoList.size() == 0){
break;
}lastUpdateId = geoCityInfoList.get(geoCityInfoList.size() - 1).getId();
System.out.println("last update id : " + lastUpdateId);
if(geoCityInfoList.size() < 3000){
break;
}
} catch (SQLException se) {
// 处理 JDBC 错误
se.printStackTrace();
} catch (Exception e) {
// 处理 Class.forName 错误
e.printStackTrace();
}
}
}@Data
@AllArgsConstructor
public static class GeoCityInfo{
private long id ;
private double latitude;
private double longitude;
}
}
【数据库|地理位置数据逆编码解析】HTTP 调用第三方API服务的具体实现:
public class HttpClientToInterface {
public static String[] doGet(String url, String charset){
//1.生成HttpClient对象并设置参数
HttpClient httpClient = new HttpClient();
//设置Http连接超时为5秒
httpClient.getHttpConnectionManager().getParams().setConnectionTimeout(5000);
//2.生成GetMethod对象并设置参数
GetMethod getMethod = new GetMethod(url);
//设置get请求超时为5秒
getMethod.getParams().setParameter(HttpMethodParams.SO_TIMEOUT, 5000);
//设置请求重试处理,用的是默认的重试处理:请求三次
getMethod.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler());
String response = "";
String country="";
String province="";
String city="";
//3.执行HTTP GET 请求
try {
int statusCode = httpClient.executeMethod(getMethod);
/**
* 4.判断访问的状态码
*/
if (statusCode != HttpStatus.SC_OK){
System.err.println("请求出错:" + getMethod.getStatusLine());
}/**
* 5.处理HTTP响应内容
*/
//读取HTTP响应内容,这里简单打印网页内容
//读取字节数组
byte[] responseBody = getMethod.getResponseBody();
response = new String(responseBody, charset);
//解析xml字符串
Document dom=DocumentHelper.parseText(response);
//超额判断
int status=Integer.valueOf(dom.getRootElement().element("status").getText());
if (status==302){
//先释放连接 后返回null
getMethod.releaseConnection();
return null;
}Element root=dom.getRootElement().element("result").element("addressComponent");
country=root.element("country").getText();
province=root.element("province").getText();
city=root.element("city").getText();
System.out.println("country:"+country+";
province:"+province+";
city:"+city);
//读取为InputStream,在网页内容数据量大时候推荐使用
//InputStream response = getMethod.getResponseBodyAsStream();
} catch (HttpException e) {
//发生致命的异常,可能是协议不对或者返回的内容有问题
System.out.println("请检查输入的URL!");
e.printStackTrace();
} catch (IOException e){
//发生网络异常
System.out.println("发生网络异常!");
} catch (DocumentException e) {
e.printStackTrace();
} finally {
/**
* 6.释放连接
*/
getMethod.releaseConnection();
}
returnnew String[]{country,province,city};
}public static void main(String[] args) {
doGet("http://api.map.baidu.com/reverse_geocoding/v3/?ak=6n1mLr9AjjEpGNXA1s8zwy9yRFr3GIQy&output=xml&coordtype=wgs84ll&location=40.543687,-73.939521","UTF-8");
System.out.println("-----------分割线------------");
}
}
推荐阅读
- 数据库|SQL行转列方式优化查询性能实践
- mysql|一文深入理解mysql
- 达梦数据库|DM8表空间备份恢复
- 数据技术|一文了解Gauss数据库(开发历程、OLTP&OLAP特点、行式&列式存储,及与Oracle和AWS对比)
- SqlServer|sql server的UPDLOCK、HOLDLOCK试验
- 谈灾难恢复指标(RTO与RPO是什么鬼())
- RPO与RTO
- 数据库|效率最高的Excel数据导入---(c#调用SSIS Package将数据库数据导入到Excel文件中【附源代码下载】)...