
Commit
Merge branch '1.8_release_3.10.x' into 1.8_release-github
# Conflicts:
#	core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java
#	launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java
#	launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java
#	rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncSideInfo.java
dapeng committed May 21, 2020
2 parents 6f8c9aa + db38a64 commit 5c176c8
Showing 314 changed files with 8,106 additions and 4,020 deletions.
10 changes: 10 additions & 0 deletions .gitlab-ci.yml
@@ -0,0 +1,10 @@
build:
  stage: test
  script:
    - mvn clean org.jacoco:jacoco-maven-plugin:0.7.8:prepare-agent package -Dmaven.test.failure.ignore=true -q
    - mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
    - sh ci/sonar_notify.sh
  only:
    - v1.8.0_dev
  tags:
    - dt-insight-engine
4 changes: 2 additions & 2 deletions README.md
@@ -15,8 +15,8 @@ FlinkStreamSQL
* Custom create view syntax
* Custom create function syntax
* Joins between streams and side (dimension) tables
-* Supports all native FLinkSQL syntax
-* Exports input and output performance metrics to promethus
+* Supports all native FlinkSQL syntax
+* Exports input and output performance metrics to Task metrics

## Table of Contents

CassandraAllReqRow.java
@@ -34,10 +34,10 @@
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
-import com.dtstack.flink.sql.side.AllReqRow;
+import com.dtstack.flink.sql.side.BaseAllReqRow;
import com.dtstack.flink.sql.side.FieldInfo;
import com.dtstack.flink.sql.side.JoinInfo;
-import com.dtstack.flink.sql.side.SideTableInfo;
+import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -62,14 +62,12 @@
*
* @author xuqianjin
*/
-public class CassandraAllReqRow extends AllReqRow {
+public class CassandraAllReqRow extends BaseAllReqRow {

private static final long serialVersionUID = 54015343561288219L;

private static final Logger LOG = LoggerFactory.getLogger(CassandraAllReqRow.class);

-private static final String cassandra_DRIVER = "com.cassandra.jdbc.Driver";
-
private static final int CONN_RETRY_NUM = 3;

private static final int FETCH_SIZE = 1000;
@@ -79,7 +77,7 @@ public class CassandraAllReqRow extends AllReqRow {

private AtomicReference<Map<String, List<Map<String, Object>>>> cacheRef = new AtomicReference<>();

-public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
+public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
super(new com.dtstack.flink.sql.side.cassandra.CassandraAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
}

@@ -269,7 +267,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
String connInfo = "address:" + tableInfo.getAddress() + ";userName:" + tableInfo.getUserName()
+ ",pwd:" + tableInfo.getPassword();
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
-Thread.sleep(5 * 1000);
+Thread.sleep(LOAD_DATA_ERROR_SLEEP_TIME);
} catch (InterruptedException e1) {
LOG.error("", e1);
}
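For reference, here is a minimal, self-contained sketch of the connect-with-retry pattern that `loadData` uses — `getConn()`, the JDBC URL, and the value of `LOAD_DATA_ERROR_SLEEP_TIME` are assumptions for illustration (the real class builds a Cassandra `Cluster`/`Session`, not a JDBC connection):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class RetryConnectSketch {

    private static final int CONN_RETRY_NUM = 3;                   // same name/value as the class constant
    private static final long LOAD_DATA_ERROR_SLEEP_TIME = 5_000L; // assumed: the old literal was 5 * 1000 ms

    // Hypothetical connection factory standing in for the Cassandra cluster builder.
    private static Connection getConn() throws SQLException {
        return DriverManager.getConnection("jdbc:postgresql://localhost:5432/demo");
    }

    public static Connection connectWithRetry() throws SQLException {
        SQLException last = null;
        for (int i = 0; i < CONN_RETRY_NUM; i++) {
            try {
                return getConn();
            } catch (SQLException e) {
                last = e;
                try {
                    Thread.sleep(LOAD_DATA_ERROR_SLEEP_TIME);      // back off before the next attempt
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();            // restore the interrupt flag
                    throw new SQLException("interrupted while retrying connection", ie);
                }
            }
        }
        throw last;                                                // every attempt failed
    }
}
```

Naming the sleep interval, as this commit does, keeps the retry policy in one place instead of scattering magic numbers through the error path.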
CassandraAllSideInfo.java
@@ -20,8 +20,8 @@

import com.dtstack.flink.sql.side.FieldInfo;
import com.dtstack.flink.sql.side.JoinInfo;
-import com.dtstack.flink.sql.side.SideInfo;
-import com.dtstack.flink.sql.side.SideTableInfo;
+import com.dtstack.flink.sql.side.BaseSideInfo;
+import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
import com.dtstack.flink.sql.util.ParseUtils;
import org.apache.calcite.sql.SqlNode;
@@ -37,16 +37,16 @@
*
* @author xuqianjin
*/
-public class CassandraAllSideInfo extends SideInfo {
+public class CassandraAllSideInfo extends BaseSideInfo {

private static final long serialVersionUID = -8690814317653033557L;

-public CassandraAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
+public CassandraAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
}

@Override
-public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
+public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
CassandraSideTableInfo cassandraSideTableInfo = (CassandraSideTableInfo) sideTableInfo;

sqlCondition = "select ${selectField} from ${tableName} ";
CassandraAsyncReqRow.java
@@ -37,11 +37,11 @@
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.dtstack.flink.sql.enums.ECacheContentType;
-import com.dtstack.flink.sql.side.AsyncReqRow;
+import com.dtstack.flink.sql.side.BaseAsyncReqRow;
import com.dtstack.flink.sql.side.CacheMissVal;
import com.dtstack.flink.sql.side.FieldInfo;
import com.dtstack.flink.sql.side.JoinInfo;
-import com.dtstack.flink.sql.side.SideTableInfo;
+import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.dtstack.flink.sql.side.cache.CacheObj;
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
import com.google.common.base.Function;
@@ -67,7 +67,7 @@
*
* @author xuqianjin
*/
-public class CassandraAsyncReqRow extends AsyncReqRow {
+public class CassandraAsyncReqRow extends BaseAsyncReqRow {

private static final long serialVersionUID = 6631584128079864735L;

@@ -83,7 +83,7 @@ public class CassandraAsyncReqRow extends AsyncReqRow {
private transient ListenableFuture session;
private transient CassandraSideTableInfo cassandraSideTableInfo;

-public CassandraAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
+public CassandraAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
super(new com.dtstack.flink.sql.side.cassandra.CassandraAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
}

@@ -162,61 +162,14 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
}

@Override
-public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exception {
-CRow inputCopy = new CRow(input.row(), input.change());
-JsonArray inputParams = new JsonArray();
-StringBuffer stringBuffer = new StringBuffer();
-String sqlWhere = " where ";
-
-for (int i = 0; i < sideInfo.getEqualFieldList().size(); i++) {
-Integer conValIndex = sideInfo.getEqualValIndex().get(i);
-Object equalObj = inputCopy.row().getField(conValIndex);
-if (equalObj == null) {
-dealMissKey(inputCopy, resultFuture);
-return;
-}
-inputParams.add(equalObj);
-StringBuffer sqlTemp = stringBuffer.append(sideInfo.getEqualFieldList().get(i))
-.append(" = ");
-if (equalObj instanceof String) {
-sqlTemp.append("'" + equalObj + "'")
-.append(" and ");
-} else {
-sqlTemp.append(equalObj)
-.append(" and ");
-}
-
-}
+public void handleAsyncInvoke(Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture) throws Exception {

String key = buildCacheKey(inputParams);
-sqlWhere = sqlWhere + stringBuffer.toString().substring(0, stringBuffer.lastIndexOf(" and "));

-if (openCache()) {
-CacheObj val = getFromCache(key);
-if (val != null) {
-
-if (ECacheContentType.MissVal == val.getType()) {
-dealMissKey(inputCopy, resultFuture);
-return;
-} else if (ECacheContentType.MultiLine == val.getType()) {
-List<CRow> rowList = Lists.newArrayList();
-for (Object jsonArray : (List) val.getContent()) {
-Row row = fillData(inputCopy.row(), jsonArray);
-rowList.add(new CRow(row, inputCopy.change()));
-}
-resultFuture.complete(rowList);
-} else {
-throw new RuntimeException("not support cache obj type " + val.getType());
-}
-return;
-}
-}

//connect Cassandra
connCassandraDB(cassandraSideTableInfo);

-String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere + " ALLOW FILTERING ";
-System.out.println("sqlCondition:" + sqlCondition);
+String sqlCondition = sideInfo.getSqlCondition() + " " + buildWhereCondition(inputParams) + " ALLOW FILTERING ";
+LOG.info("sqlCondition:{}", sqlCondition);

ListenableFuture<ResultSet> resultSet = Futures.transformAsync(session,
new AsyncFunction<Session, ResultSet>() {
@@ -242,18 +195,18 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
List<CRow> rowList = Lists.newArrayList();
for (com.datastax.driver.core.Row line : rows) {
-Row row = fillData(inputCopy.row(), line);
+Row row = fillData(input.row(), line);
if (openCache()) {
cacheContent.add(line);
}
-rowList.add(new CRow(row,inputCopy.change()));
+rowList.add(new CRow(row, input.change()));
}
resultFuture.complete(rowList);
if (openCache()) {
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
}
} else {
-dealMissKey(inputCopy, resultFuture);
+dealMissKey(input, resultFuture);
if (openCache()) {
putCache(key, CacheMissVal.getMissKeyObj());
}
@@ -265,13 +218,30 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
public void onFailure(Throwable t) {
LOG.error("Failed to retrieve the data: %s%n",
t.getMessage());
-System.out.println("Failed to retrieve the data: " + t.getMessage());
cluster.closeAsync();
resultFuture.completeExceptionally(t);
}
});
}

@Override
public String buildCacheKey(Map<String, Object> inputParams) {
StringBuilder sb = new StringBuilder();
for (Object ele : inputParams.values()) {
sb.append(ele.toString()).append("_");
}
return sb.toString();
}

private String buildWhereCondition(Map<String, Object> inputParams) {
StringBuilder sb = new StringBuilder(" where ");
boolean first = true;
for (Map.Entry<String, Object> entry : inputParams.entrySet()) {
// join the equal-field predicates with " and " so multi-key lookups build valid CQL
if (!first) {
sb.append(" and ");
}
Object value = entry.getValue() instanceof String ? "'" + entry.getValue() + "'" : entry.getValue();
sb.append(String.format("%s = %s", entry.getKey(), value));
first = false;
}
return sb.toString();
}

@Override
public Row fillData(Row input, Object line) {
com.datastax.driver.core.Row rowArray = (com.datastax.driver.core.Row) line;
@@ -306,14 +276,4 @@ public void close() throws Exception {
cluster = null;
}
}

-public String buildCacheKey(JsonArray jsonArray) {
-StringBuilder sb = new StringBuilder();
-for (Object ele : jsonArray.getList()) {
-sb.append(ele.toString())
-.append("_");
-}
-
-return sb.toString();
-}
}
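To make the new helpers concrete, here is a small standalone sketch of `buildCacheKey` and `buildWhereCondition` with invented field names and values (a `LinkedHashMap` stands in for the ordered parameters the base class would pass):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SideLookupSketch {

    // Mirrors buildCacheKey: values concatenated with "_" in field order.
    static String buildCacheKey(Map<String, Object> inputParams) {
        StringBuilder sb = new StringBuilder();
        for (Object ele : inputParams.values()) {
            sb.append(ele.toString()).append("_");
        }
        return sb.toString();
    }

    // Mirrors buildWhereCondition: string values quoted, predicates joined with " and ".
    static String buildWhereCondition(Map<String, Object> inputParams) {
        StringBuilder sb = new StringBuilder(" where ");
        boolean first = true;
        for (Map.Entry<String, Object> entry : inputParams.entrySet()) {
            if (!first) {
                sb.append(" and ");
            }
            Object value = entry.getValue() instanceof String ? "'" + entry.getValue() + "'" : entry.getValue();
            sb.append(String.format("%s = %s", entry.getKey(), value));
            first = false;
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> params = new LinkedHashMap<>();   // preserves insertion order
        params.put("id", 100);
        params.put("name", "maqi");
        System.out.println(buildCacheKey(params));            // 100_maqi_
        System.out.println(buildWhereCondition(params));      //  where id = 100 and name = 'maqi'
    }
}
```

The cache key and the CQL where-clause are thus derived from the same ordered parameter map, which keeps cache entries and lookups consistent.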
CassandraAsyncSideInfo.java
@@ -20,8 +20,8 @@

import com.dtstack.flink.sql.side.FieldInfo;
import com.dtstack.flink.sql.side.JoinInfo;
-import com.dtstack.flink.sql.side.SideInfo;
-import com.dtstack.flink.sql.side.SideTableInfo;
+import com.dtstack.flink.sql.side.BaseSideInfo;
+import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
import com.dtstack.flink.sql.util.ParseUtils;
import org.apache.calcite.sql.SqlBasicCall;
@@ -30,6 +30,8 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

@@ -39,16 +41,18 @@
*
* @author xuqianjin
*/
-public class CassandraAsyncSideInfo extends SideInfo {
+public class CassandraAsyncSideInfo extends BaseSideInfo {

private static final long serialVersionUID = -4403313049809013362L;
private static final Logger LOG = LoggerFactory.getLogger(CassandraAsyncSideInfo.class.getSimpleName());

-public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {

+public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
}

@Override
-public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
+public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
CassandraSideTableInfo cassandraSideTableInfo = (CassandraSideTableInfo) sideTableInfo;

String sideTableName = joinInfo.getSideTableName();
@@ -63,9 +67,9 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
}

sqlCondition = "select ${selectField} from ${tableName}";

sqlCondition = sqlCondition.replace("${tableName}", cassandraSideTableInfo.getDatabase()+"."+cassandraSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
-System.out.println("---------side_exe_sql-----\n" + sqlCondition);

+LOG.info("---------side_exe_sql-----\n{}", sqlCondition);
}


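The `${selectField}` and `${tableName}` placeholders are resolved by plain string substitution; a short sketch with assumed database, table, and select-field values shows the query this produces:

```java
public class SqlTemplateSketch {
    public static void main(String[] args) {
        // Assumed values; the real ones come from CassandraSideTableInfo and the join's field list.
        String database = "dt_test";
        String tableName = "user_side";
        String sideSelectFields = "id,name,address";

        String sqlCondition = "select ${selectField} from ${tableName}"
                .replace("${tableName}", database + "." + tableName)
                .replace("${selectField}", sideSelectFields);

        System.out.println(sqlCondition);   // select id,name,address from dt_test.user_side
    }
}
```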
CassandraSideParser.java
@@ -19,26 +19,24 @@

package com.dtstack.flink.sql.side.cassandra.table;

-import com.dtstack.flink.sql.table.AbsSideTableParser;
-import com.dtstack.flink.sql.table.TableInfo;
+import com.dtstack.flink.sql.table.AbstractSideTableParser;
+import com.dtstack.flink.sql.table.AbstractTableInfo;
import com.dtstack.flink.sql.util.MathUtil;

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

-import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;
+import static com.dtstack.flink.sql.table.AbstractTableInfo.PARALLELISM_KEY;

/**
* Reason:
* Date: 2018/11/22
*
* @author xuqianjin
*/
-public class CassandraSideParser extends AbsSideTableParser {
+public class CassandraSideParser extends AbstractSideTableParser {

private final static String SIDE_SIGN_KEY = "sideSignKey";

@@ -73,7 +71,7 @@ public CassandraSideParser() {
}

@Override
-public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
+public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo cassandraSideTableInfo = new com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo();
cassandraSideTableInfo.setName(tableName);
parseFieldsInfo(fieldsInfo, cassandraSideTableInfo);
@@ -96,9 +94,10 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
return cassandraSideTableInfo;
}

-private void dealSideSign(Matcher matcher, TableInfo tableInfo) {
+private void dealSideSign(Matcher matcher, AbstractTableInfo tableInfo) {
}

@Override
public Class dbTypeConvertToJavaType(String fieldType) {
switch (fieldType.toLowerCase()) {
case "bigint":
@@ -121,6 +120,8 @@ public Class dbTypeConvertToJavaType(String fieldType) {
return Double.class;
case "timestamp":
return Timestamp.class;
default:
break;
}

throw new RuntimeException("Unsupported type: " + fieldType);
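A quick sketch of how `dbTypeConvertToJavaType` behaves, reproducing only the cases visible in this hunk — the `bigint` and `double` mappings are assumptions where the corresponding lines are collapsed in this view:

```java
public class TypeMappingSketch {

    static Class<?> dbTypeConvertToJavaType(String fieldType) {
        switch (fieldType.toLowerCase()) {
            case "bigint":
                return Long.class;                 // assumed mapping; collapsed in the diff
            case "double":
                return Double.class;               // assumed pairing with the visible return
            case "timestamp":
                return java.sql.Timestamp.class;
            default:
                throw new RuntimeException("Unsupported type: " + fieldType);
        }
    }

    public static void main(String[] args) {
        System.out.println(dbTypeConvertToJavaType("BIGINT"));     // class java.lang.Long
        System.out.println(dbTypeConvertToJavaType("timestamp"));  // class java.sql.Timestamp
    }
}
```

The added `default: break;` in the commit makes the fall-through explicit, so every unrecognized CQL type reaches the exception at the end of the method.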
CassandraSideTableInfo.java
@@ -19,7 +19,7 @@

package com.dtstack.flink.sql.side.cassandra.table;

-import com.dtstack.flink.sql.side.SideTableInfo;
+import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.google.common.base.Preconditions;

/**
@@ -28,7 +28,7 @@
*
* @author xuqianjin
*/
-public class CassandraSideTableInfo extends SideTableInfo {
+public class CassandraSideTableInfo extends AbstractSideTableInfo {

private static final long serialVersionUID = -5556431094535478915L;

