Flink Table & SQL AppendStreamTableSink、RetractStreamTableSink、UpsertStreamTableSink
Flink Table & SQL StreamTableSink
有三类接口: AppendStreamTableSink
、UpsertStreamTableSink
、RetractStreamTableSink
。
- AppendStreamTableSink: 可将动态表转换为Append流。适用于动态表只有Insert的场景。
- RetractStreamTableSink: 可将动态表转换为Retract流。适用于动态表有Insert、Delete、Update的场景。
- UpsertStreamTableSink: 可将动态表转换为Upsert流。适用于动态表有Insert、Delete、Update的场景。
- RetractStreamTableSink中: Insert被编码成一条Add消息;
Delete被编码成一条Retract消息;
Update被编码成两条消息(先是一条Retract消息,再是一条Add消息),即先删除再增加。
- UpsertStreamTableSink: Insert和Update均被编码成一条消息(Upsert消息);
Delete被编码成一条Delete消息。
- UpsertStreamTableSink和RetractStreamTableSink最大的不同在于Update编码成一条消息,效率上比RetractStreamTableSink高。
- 上述说的编码指的是动态表转换为DataStream时,表的增删改如何体现到DataStream上。
测试数据
// 自己造的测试数据
// 某个用户在某个时刻浏览了某个商品,以及商品的价值
// eventTime: 北京时间,方便测试。如下,乱序数据:
{"userID": "user_1", "eventTime": "2016-01-01 10:02:00", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:02", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 10:02:10", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_3", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_3", "eventTime": "2016-01-01 10:02:12", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_4", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_5", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:15", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:16", "eventType": "browse", "productID": "product_5", "productPrice": 20}
AppendStreamTableSink 示例
package com.bigdata.flink.tableSqlAppendUpsertRetract.append;
import com.alibaba.fastjson.JSON;
import com.bigdata.flink.beans.table.UserBrowseLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
/**
* Summary:
*自定义 AppendStreamTableSink 查看从只有Insert的Table到DataStream的消息编码
*
*/
@Slf4j
public class Test {
public static void main(String[] args) throws Exception{args=new String[]{"--application","flink/src/main/java/com/bigdata/flink/tableSqlAppendUpsertRetract/application.properties"};
//1、解析命令行参数
ParameterTool fromArgs = ParameterTool.fromArgs(args);
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));
String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
String browseTopic = parameterTool.getRequired("browseTopic");
String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");
//2、设置运行环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
streamEnv.setParallelism(1);
//3、注册Kafka数据源
Properties browseProperties = new Properties();
browseProperties.put("bootstrap.servers",kafkaBootstrapServers);
browseProperties.put("group.id",browseTopicGroupID);
DataStream browseStream=streamEnv
.addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
.process(new BrowseKafkaProcessFunction());
tableEnv.registerDataStream("source_kafka_browse_log",browseStream,"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");
//4、注册AppendStreamTableSink
String[] sinkFieldNames={"userID","eventTime","eventType","productID","productPrice","eventTimeTimestamp"};
DataType[] sinkFieldTypes={DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.INT(),DataTypes.BIGINT()};
TableSink myAppendStreamTableSink = new MyAppendStreamTableSink(sinkFieldNames,sinkFieldTypes);
tableEnv.registerTableSink("sink_stdout",myAppendStreamTableSink);
//5、连续查询
//将userID为user_1的记录输出到外部存储
String sql="insert into sink_stdout select userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp from source_kafka_browse_log where userID='user_1'";
tableEnv.sqlUpdate(sql);
//6、开始执行
tableEnv.execute(Test.class.getSimpleName());
}/**
* 解析Kafka数据
* 将Kafka JSON String 解析成JavaBean: UserBrowseLog
* UserBrowseLog(String userID, String eventTime, String eventType, String productID, int productPrice, long eventTimeTimestamp)
*/
private static class BrowseKafkaProcessFunction extends ProcessFunction, UserBrowseLog> {
@Override
public void processElement(String value, Context ctx, Collector out) throws Exception {
try {UserBrowseLog log = JSON.parseObject(value, UserBrowseLog.class);
// 增加一个long类型的时间戳
// 指定eventTime为yyyy-MM-dd HH:mm:ss格式的北京时间
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(log.getEventTime(), format).atOffset(ZoneOffset.of("+08:00"));
// 转换成毫秒时间戳
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
log.setEventTimeTimestamp(eventTimeTimestamp);
out.collect(log);
}catch (Exception ex){
log.error("解析Kafka数据异常...",ex);
}
}
}/**
* 自定义 AppendStreamTableSink
* AppendStreamTableSink 适用于表只有Insert的场景
*/
private static class MyAppendStreamTableSink implements AppendStreamTableSink {private TableSchema tableSchema;
public MyAppendStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames,fieldTypes).build();
}@Override
public TableSchema getTableSchema() {
return tableSchema;
}@Override
public DataType getConsumedDataType() {
return tableSchema.toRowDataType();
}// 已过时
@Override
public TableSink configure(String[] fieldNames, TypeInformation>[] fieldTypes) {
return null;
}// 已过时
@Override
public void emitDataStream(DataStream dataStream) {}@Override
public DataStreamSink consumeDataStream(DataStream dataStream) {
return dataStream.addSink(new SinkFunction());
}private static class SinkFunction extends RichSinkFunction {
public SinkFunction() {
}@Override
public void invoke(Row value, Context context) throws Exception {
System.out.println(value);
}
}
}}
结果
// Table只有Insert操作,不需要特殊编码
user_1,2016-01-01 10:02:06,browse,product_5,20,1451613726000
user_1,2016-01-01 10:02:00,browse,product_5,20,1451613720000
user_1,2016-01-01 10:02:15,browse,product_5,20,1451613735000
user_1,2016-01-01 10:02:02,browse,product_5,20,1451613722000
user_1,2016-01-01 10:02:16,browse,product_5,20,1451613736000
RetractStreamTableSink 示例
package com.bigdata.flink.tableSqlAppendUpsertRetract.retract;
import com.alibaba.fastjson.JSON;
import com.bigdata.flink.beans.table.UserBrowseLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
/**
* Summary:
*自定义RetractStreamTableSink 查看从有更新的Table到DataStream的消息编码
*/
@Slf4j
public class Test {
public static void main(String[] args) throws Exception{args=new String[]{"--application","flink/src/main/java/com/bigdata/flink/tableSqlAppendUpsertRetract/application.properties"};
//1、解析命令行参数
ParameterTool fromArgs = ParameterTool.fromArgs(args);
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));
String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
String browseTopic = parameterTool.getRequired("browseTopic");
String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");
//2、设置运行环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
streamEnv.setParallelism(1);
//3、注册Kafka数据源
Properties browseProperties = new Properties();
browseProperties.put("bootstrap.servers",kafkaBootstrapServers);
browseProperties.put("group.id",browseTopicGroupID);
DataStream browseStream=streamEnv
.addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
.process(new BrowseKafkaProcessFunction());
tableEnv.registerDataStream("source_kafka_browse_log",browseStream,"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");
//4、注册RetractStreamTableSink
String[] sinkFieldNames={"userID","browseNumber"};
DataType[] sinkFieldTypes={DataTypes.STRING(),DataTypes.BIGINT()};
RetractStreamTableSink myRetractStreamTableSink = new MyRetractStreamTableSink(sinkFieldNames,sinkFieldTypes);
tableEnv.registerTableSink("sink_stdout",myRetractStreamTableSink);
//5、连续查询
//统计每个Uid的浏览次数
String sql="insert into sink_stdout select userID,count(1) as browseNumber from source_kafka_browse_log where userID in ('user_1','user_2') group by userID ";
tableEnv.sqlUpdate(sql);
//6、开始执行
tableEnv.execute(Test.class.getSimpleName());
}/**
* 解析Kafka数据
* 将Kafka JSON String 解析成JavaBean: UserBrowseLog
* UserBrowseLog(String userID, String eventTime, String eventType, String productID, int productPrice, long eventTimeTimestamp)
*/
private static class BrowseKafkaProcessFunction extends ProcessFunction, UserBrowseLog> {
@Override
public void processElement(String value, Context ctx, Collector out) throws Exception {
try {UserBrowseLog log = JSON.parseObject(value, UserBrowseLog.class);
// 增加一个long类型的时间戳
// 指定eventTime为yyyy-MM-dd HH:mm:ss格式的北京时间
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(log.getEventTime(), format).atOffset(ZoneOffset.of("+08:00"));
// 转换成毫秒时间戳
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
log.setEventTimeTimestamp(eventTimeTimestamp);
out.collect(log);
}catch (Exception ex){
log.error("解析Kafka数据异常...",ex);
}
}
}/**
* 自定义RetractStreamTableSink
*
* Table在内部被转换成具有Add(增加)和Retract(撤消/删除)的消息流,最终交由DataStream的SinkFunction处理。
* DataStream里的数据格式是Tuple2类型,如Tuple2。
* Boolean是Add(增加)或Retract(删除)的flag(标识)。Row是真正的数据类型。
* Table中的Insert被编码成一条Add消息。如Tuple2。
* Table中的Update被编码成两条消息。一条删除消息Tuple2,一条增加消息Tuple2。
*/
private static class MyRetractStreamTableSink implements RetractStreamTableSink {private TableSchema tableSchema;
public MyRetractStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames,fieldTypes).build();
}@Override
public TableSchema getTableSchema() {
return tableSchema;
}// 已过时
@Override
public TableSink> configure(String[] fieldNames, TypeInformation>[] fieldTypes) {
return null;
}// 已过时
@Override
public void emitDataStream(DataStream> dataStream) {}// 最终会转换成DataStream处理
@Override
public DataStreamSink> consumeDataStream(DataStream> dataStream) {
return dataStream.addSink(new SinkFunction());
}@Override
public TypeInformation getRecordType() {
return new RowTypeInfo(tableSchema.getFieldTypes(),tableSchema.getFieldNames());
}private static class SinkFunction extends RichSinkFunction> {
public SinkFunction() {
}@Override
public void invoke(Tuple2 value, Context context) throws Exception {
Boolean flag = value.f0;
if(flag){
System.out.println("增加... "+value);
}else {
System.out.println("删除... "+value);
}
}
}
}
}
结果
增加... (true,user_1,1)
//user_1更新时被编译成两条消息
//先是一条删除的消息
删除... (false,user_1,1)
//再是一条增加的消息
增加... (true,user_1,2)
//同理user_2
增加... (true,user_2,1)
删除... (false,user_1,2)
增加... (true,user_1,3)
删除... (false,user_1,3)
增加... (true,user_1,4)
删除... (false,user_2,1)
增加... (true,user_2,2)
删除... (false,user_1,4)
增加... (true,user_1,5)
UpsertStreamTableSink 示例
package com.bigdata.flink.tableSqlAppendUpsertRetract.upsert;
import com.alibaba.fastjson.JSON;
import com.bigdata.flink.beans.table.UserBrowseLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sinks.UpsertStreamTableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
/**
* Summary:
*自定义UpsertStreamTableSink 查看从有Insert/Update的Table到DataStream的消息编码
*/
@Slf4j
public class Test {
public static void main(String[] args) throws Exception{args=new String[]{"--application","flink/src/main/java/com/bigdata/flink/tableSqlAppendUpsertRetract/application.properties"};
//1、解析命令行参数
ParameterTool fromArgs = ParameterTool.fromArgs(args);
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));
String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
String browseTopic = parameterTool.getRequired("browseTopic");
String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");
//2、设置运行环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
streamEnv.setParallelism(1);
//3、注册Kafka数据源
Properties browseProperties = new Properties();
browseProperties.put("bootstrap.servers",kafkaBootstrapServers);
browseProperties.put("group.id",browseTopicGroupID);
DataStream browseStream=streamEnv
.addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
.process(new BrowseKafkaProcessFunction());
tableEnv.registerDataStream("source_kafka_browse_log",browseStream,"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");
//4、注册UpsertStreamTableSink
String[] sinkFieldNames={"userID","browseNumber"};
DataType[] sinkFieldTypes={DataTypes.STRING(),DataTypes.BIGINT()};
UpsertStreamTableSink myRetractStreamTableSink = new MyUpsertStreamTableSink(sinkFieldNames,sinkFieldTypes);
tableEnv.registerTableSink("sink_stdout",myRetractStreamTableSink);
//5、连续查询
//统计每个Uid的浏览次数
String sql="insert into sink_stdout select userID,count(1) as browseNumber from source_kafka_browse_log where userID in ('user_1','user_2') group by userID ";
tableEnv.sqlUpdate(sql);
//6、开始执行
tableEnv.execute(Test.class.getSimpleName());
}/**
* 解析Kafka数据
* 将Kafka JSON String 解析成JavaBean: UserBrowseLog
* UserBrowseLog(String userID, String eventTime, String eventType, String productID, int productPrice, long eventTimeTimestamp)
*/
private static class BrowseKafkaProcessFunction extends ProcessFunction, UserBrowseLog> {
@Override
public void processElement(String value, Context ctx, Collector out) throws Exception {
try {UserBrowseLog log = JSON.parseObject(value, UserBrowseLog.class);
// 增加一个long类型的时间戳
// 指定eventTime为yyyy-MM-dd HH:mm:ss格式的北京时间
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(log.getEventTime(), format).atOffset(ZoneOffset.of("+08:00"));
// 转换成毫秒时间戳
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
log.setEventTimeTimestamp(eventTimeTimestamp);
out.collect(log);
}catch (Exception ex){
log.error("解析Kafka数据异常...",ex);
}
}
}/**
* 自定义UpsertStreamTableSink
* Table在内部被转换成具有Add(增加)和Retract(撤消/删除)的消息流,最终交由DataStream的SinkFunction处理。
* Boolean是Add(增加)或Retract(删除)的flag(标识)。Row是真正的数据类型。
* Table中的Insert被编码成一条Add消息。如Tuple2。
* Table中的Update被编码成一条Add消息。如Tuple2。
* 在SortLimit(即order by ... limit ...)的场景下,被编码成两条消息。一条删除消息Tuple2,一条增加消息Tuple2。
*/
private static class MyUpsertStreamTableSink implements UpsertStreamTableSink {private TableSchema tableSchema;
public MyUpsertStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames,fieldTypes).build();
}@Override
public TableSchema getTableSchema() {
return tableSchema;
}// 设置Unique Key
// 如上SQL中有GroupBy,则这里的唯一键会自动被推导为GroupBy的字段
@Override
public void setKeyFields(String[] keys) {}// 是否只有Insert
// 如上SQL场景,需要Update,则这里被推导为isAppendOnly=false
@Override
public void setIsAppendOnly(Boolean isAppendOnly) {}@Override
public TypeInformation getRecordType() {
return new RowTypeInfo(tableSchema.getFieldTypes(),tableSchema.getFieldNames());
}// 已过时
@Override
public void emitDataStream(DataStream> dataStream) {}// 最终会转换成DataStream处理
@Override
public DataStreamSink> consumeDataStream(DataStream> dataStream) {
return dataStream.addSink(new SinkFunction());
}@Override
public TableSink> configure(String[] fieldNames, TypeInformation>[] fieldTypes) {
return null;
}private static class SinkFunction extends RichSinkFunction> {
public SinkFunction() {
}@Override
public void invoke(Tuple2 value, Context context) throws Exception {
Boolean flag = value.f0;
if(flag){
System.out.println("增加... "+value);
}else {
System.out.println("删除... "+value);
}
}
}
}
}
结果
增加... (true,user_1,1)
//user_1更新时被编译成一条消息
增加... (true,user_1,2)
//user_1更新时被编译成一条消息
增加... (true,user_1,3)
增加... (true,user_1,4)
//同理user_2更新
增加... (true,user_2,1)
增加... (true,user_2,2)
增加... (true,user_1,5)
推荐阅读
- JAVA(抽象类与接口的区别&重载与重写&内存泄漏)
- 基于|基于 antd 风格的 element-table + pagination 的二次封装
- 宋仲基&宋慧乔(我们不公布恋情,我们直接结婚。)
- 21天|21天|M&M《见识》04
- tableView|tableView 头视图下拉放大 重写
- 二叉树路径节点关键值和等于目标值(LeetCode--112&LeetCode--113)
- 前端代码|前端代码 返回顶部 backToTop
- 2021—3—8日教练实践总结&呼吸练习&觉察日记
- #12-UITableView|#12-UITableView 优化方案
- 奇迹-妖妈|奇迹-妖妈 感恩日记46/365&非暴力沟通第3天