Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
D
dlink
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhaowei
dlink
Commits
8fc49e14
Commit
8fc49e14
authored
Mar 17, 2022
by
wenmo
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
app 版本补全
parent
c7751365
Changes
28
Show whitespace changes
Inline
Side-by-side
Showing
28 changed files
with
2506 additions
and
122 deletions
+2506
-122
pom.xml
dlink-app/dlink-app-1.11/pom.xml
+100
-0
MainApp.java
...p/dlink-app-1.11/src/main/java/com/dlink/app/MainApp.java
+0
-0
CustomTableEnvironmentImpl.java
...n/java/com/dlink/executor/CustomTableEnvironmentImpl.java
+216
-0
CustomTableResultImpl.java
...c/main/java/com/dlink/executor/CustomTableResultImpl.java
+262
-0
TableSchemaField.java
...11/src/main/java/com/dlink/executor/TableSchemaField.java
+2
-2
pom.xml
dlink-app/dlink-app-1.12/pom.xml
+100
-0
MainApp.java
...p/dlink-app-1.12/src/main/java/com/dlink/app/MainApp.java
+27
-0
CustomTableEnvironmentImpl.java
...n/java/com/dlink/executor/CustomTableEnvironmentImpl.java
+218
-0
CustomTableResultImpl.java
...c/main/java/com/dlink/executor/CustomTableResultImpl.java
+369
-0
TableSchemaField.java
...12/src/main/java/com/dlink/executor/TableSchemaField.java
+33
-0
pom.xml
dlink-app/dlink-app-1.13/pom.xml
+100
-0
MainApp.java
...p/dlink-app-1.13/src/main/java/com/dlink/app/MainApp.java
+27
-0
CustomTableEnvironmentImpl.java
...n/java/com/dlink/executor/CustomTableEnvironmentImpl.java
+0
-0
CustomTableResultImpl.java
...c/main/java/com/dlink/executor/CustomTableResultImpl.java
+31
-14
TableSchemaField.java
...13/src/main/java/com/dlink/executor/TableSchemaField.java
+33
-0
pom.xml
dlink-app/dlink-app-1.14/pom.xml
+100
-0
MainApp.java
...p/dlink-app-1.14/src/main/java/com/dlink/app/MainApp.java
+27
-0
CustomTableEnvironmentImpl.java
...n/java/com/dlink/executor/CustomTableEnvironmentImpl.java
+329
-0
CustomTableResultImpl.java
...c/main/java/com/dlink/executor/CustomTableResultImpl.java
+395
-0
TableSchemaField.java
...14/src/main/java/com/dlink/executor/TableSchemaField.java
+33
-0
pom.xml
dlink-app/dlink-app-base/pom.xml
+41
-0
DBConfig.java
...ink-app-base/src/main/java/com/dlink/app/db/DBConfig.java
+3
-3
DBUtil.java
...dlink-app-base/src/main/java/com/dlink/app/db/DBUtil.java
+14
-19
StatementParam.java
.../src/main/java/com/dlink/app/flinksql/StatementParam.java
+0
-0
Submiter.java
...p-base/src/main/java/com/dlink/app/flinksql/Submiter.java
+5
-6
pom.xml
dlink-app/pom.xml
+13
-76
package.xml
dlink-assembly/src/main/assembly/package.xml
+23
-2
pom.xml
pom.xml
+5
-0
No files found.
dlink-app/dlink-app-1.11/pom.xml
0 → 100644
View file @
8fc49e14
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<parent>
<artifactId>
dlink-app
</artifactId>
<groupId>
com.dlink
</groupId>
<version>
0.6.0-SNAPSHOT
</version>
</parent>
<modelVersion>
4.0.0
</modelVersion>
<artifactId>
dlink-app-1.11
</artifactId>
<properties>
<mainClass>
com.dlink.app.MainApp
</mainClass>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
<maven.compiler.source>
1.8
</maven.compiler.source>
<maven.compiler.target>
1.8
</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-app-base
</artifactId>
<version>
${project.version}
</version>
</dependency>
<dependency>
<groupId>
mysql
</groupId>
<artifactId>
mysql-connector-java
</artifactId>
<!-- <scope>provided</scope>-->
<version>
8.0.21
</version>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-client-1.11
</artifactId>
<scope>
provided
</scope>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-client-base
</artifactId>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-executor
</artifactId>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>
src/main/resources
</directory>
<includes>
<include>
*.properties
</include>
</includes>
</resource>
</resources>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-compiler-plugin
</artifactId>
<version>
3.6.0
</version>
<configuration>
<source>
1.8
</source>
<target>
1.8
</target>
<encoding>
UTF-8
</encoding>
</configuration>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-assembly-plugin
</artifactId>
<version>
2.6
</version>
<configuration>
<descriptorRefs>
<descriptorRef>
jar-with-dependencies
</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass>
com.dlink.app.MainApp
</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>
make-assembly
</id>
<phase>
package
</phase>
<goals>
<goal>
single
</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
dlink-app/src/main/java/com/dlink/app/MainApp.java
→
dlink-app/
dlink-app-1.11/
src/main/java/com/dlink/app/MainApp.java
View file @
8fc49e14
File moved
dlink-app/dlink-app-1.11/src/main/java/com/dlink/executor/CustomTableEnvironmentImpl.java
0 → 100644
View file @
8fc49e14
package
com
.
dlink
.
executor
;
import
com.dlink.result.SqlExplainResult
;
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
import
org.apache.flink.api.common.typeinfo.TypeInformation
;
import
org.apache.flink.api.dag.Transformation
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.configuration.PipelineOptions
;
import
org.apache.flink.runtime.jobgraph.JobGraph
;
import
org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
;
import
org.apache.flink.runtime.rest.messages.JobPlanInfo
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.graph.JSONGenerator
;
import
org.apache.flink.streaming.api.graph.StreamGraph
;
import
org.apache.flink.table.api.EnvironmentSettings
;
import
org.apache.flink.table.api.ExplainDetail
;
import
org.apache.flink.table.api.TableConfig
;
import
org.apache.flink.table.api.TableException
;
import
org.apache.flink.table.api.internal.TableEnvironmentImpl
;
import
org.apache.flink.table.catalog.CatalogManager
;
import
org.apache.flink.table.catalog.FunctionCatalog
;
import
org.apache.flink.table.catalog.GenericInMemoryCatalog
;
import
org.apache.flink.table.delegation.Executor
;
import
org.apache.flink.table.delegation.ExecutorFactory
;
import
org.apache.flink.table.delegation.Planner
;
import
org.apache.flink.table.delegation.PlannerFactory
;
import
org.apache.flink.table.factories.ComponentFactoryService
;
import
org.apache.flink.table.functions.AggregateFunction
;
import
org.apache.flink.table.functions.TableAggregateFunction
;
import
org.apache.flink.table.functions.TableFunction
;
import
org.apache.flink.table.functions.UserDefinedFunctionHelper
;
import
org.apache.flink.table.module.ModuleManager
;
import
org.apache.flink.table.operations.ExplainOperation
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.operations.QueryOperation
;
import
org.apache.flink.table.planner.delegation.ExecutorBase
;
import
org.apache.flink.table.planner.utils.ExecutorUtils
;
import
java.lang.reflect.Method
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Map
;
/**
* 定制TableEnvironmentImpl
*
* @author wenmo
* @since 2021/6/7 22:06
**/
public
class
CustomTableEnvironmentImpl
extends
TableEnvironmentImpl
implements
CustomTableEnvironment
{
protected
CustomTableEnvironmentImpl
(
CatalogManager
catalogManager
,
ModuleManager
moduleManager
,
TableConfig
tableConfig
,
Executor
executor
,
FunctionCatalog
functionCatalog
,
Planner
planner
,
boolean
isStreamingMode
,
ClassLoader
userClassLoader
)
{
super
(
catalogManager
,
moduleManager
,
tableConfig
,
executor
,
functionCatalog
,
planner
,
isStreamingMode
,
userClassLoader
);
}
public
static
CustomTableEnvironmentImpl
create
(
StreamExecutionEnvironment
executionEnvironment
)
{
return
create
(
executionEnvironment
,
EnvironmentSettings
.
newInstance
().
build
());
}
public
static
CustomTableEnvironmentImpl
createBatch
(
StreamExecutionEnvironment
executionEnvironment
)
{
Configuration
configuration
=
new
Configuration
();
configuration
.
setString
(
"execution.runtime-mode"
,
"BATCH"
);
TableConfig
tableConfig
=
new
TableConfig
();
tableConfig
.
addConfiguration
(
configuration
);
return
create
(
executionEnvironment
,
EnvironmentSettings
.
newInstance
().
useBlinkPlanner
().
inBatchMode
().
build
(),
tableConfig
);
}
public
static
CustomTableEnvironmentImpl
create
(
StreamExecutionEnvironment
executionEnvironment
,
EnvironmentSettings
settings
)
{
return
create
(
executionEnvironment
,
settings
,
new
TableConfig
());
}
public
static
CustomTableEnvironmentImpl
create
(
StreamExecutionEnvironment
executionEnvironment
,
EnvironmentSettings
settings
,
TableConfig
tableConfig
)
{
ClassLoader
classLoader
=
Thread
.
currentThread
().
getContextClassLoader
();
ModuleManager
moduleManager
=
new
ModuleManager
();
CatalogManager
catalogManager
=
CatalogManager
.
newBuilder
().
classLoader
(
classLoader
).
config
(
tableConfig
.
getConfiguration
()).
defaultCatalog
(
settings
.
getBuiltInCatalogName
(),
new
GenericInMemoryCatalog
(
settings
.
getBuiltInCatalogName
(),
settings
.
getBuiltInDatabaseName
())).
executionConfig
(
executionEnvironment
.
getConfig
()).
build
();
FunctionCatalog
functionCatalog
=
new
FunctionCatalog
(
tableConfig
,
catalogManager
,
moduleManager
);
Map
<
String
,
String
>
executorProperties
=
settings
.
toExecutorProperties
();
Executor
executor
=
lookupExecutor
(
executorProperties
,
executionEnvironment
);
Map
<
String
,
String
>
plannerProperties
=
settings
.
toPlannerProperties
();
Planner
planner
=
(
ComponentFactoryService
.
find
(
PlannerFactory
.
class
,
plannerProperties
)).
create
(
plannerProperties
,
executor
,
tableConfig
,
functionCatalog
,
catalogManager
);
return
new
CustomTableEnvironmentImpl
(
catalogManager
,
moduleManager
,
tableConfig
,
executor
,
functionCatalog
,
planner
,
settings
.
isStreamingMode
(),
classLoader
);
}
private
static
Executor
lookupExecutor
(
Map
<
String
,
String
>
executorProperties
,
StreamExecutionEnvironment
executionEnvironment
)
{
try
{
ExecutorFactory
executorFactory
=
ComponentFactoryService
.
find
(
ExecutorFactory
.
class
,
executorProperties
);
Method
createMethod
=
executorFactory
.
getClass
().
getMethod
(
"create"
,
Map
.
class
,
StreamExecutionEnvironment
.
class
);
return
(
Executor
)
createMethod
.
invoke
(
executorFactory
,
executorProperties
,
executionEnvironment
);
}
catch
(
Exception
var4
)
{
throw
new
TableException
(
"Could not instantiate the executor. Make sure a planner module is on the classpath"
,
var4
);
}
}
public
ObjectNode
getStreamGraph
(
String
statement
)
{
List
<
Operation
>
operations
=
super
.
parser
.
parse
(
statement
);
if
(
operations
.
size
()
!=
1
)
{
throw
new
TableException
(
"Unsupported SQL query! explainSql() only accepts a single SQL query."
);
}
else
{
List
<
ModifyOperation
>
modifyOperations
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
operations
.
size
();
i
++)
{
if
(
operations
.
get
(
i
)
instanceof
ModifyOperation
)
{
modifyOperations
.
add
((
ModifyOperation
)
operations
.
get
(
i
));
}
}
List
<
Transformation
<?>>
trans
=
super
.
planner
.
translate
(
modifyOperations
);
if
(
execEnv
instanceof
ExecutorBase
)
{
StreamGraph
streamGraph
=
ExecutorUtils
.
generateStreamGraph
(((
ExecutorBase
)
execEnv
).
getExecutionEnvironment
(),
trans
);
JSONGenerator
jsonGenerator
=
new
JSONGenerator
(
streamGraph
);
String
json
=
jsonGenerator
.
getJSON
();
ObjectMapper
mapper
=
new
ObjectMapper
();
ObjectNode
objectNode
=
mapper
.
createObjectNode
();
try
{
objectNode
=
(
ObjectNode
)
mapper
.
readTree
(
json
);
}
catch
(
JsonProcessingException
e
)
{
e
.
printStackTrace
();
}
finally
{
return
objectNode
;
}
}
else
{
throw
new
TableException
(
"Unsupported SQL query! ExecEnv need a ExecutorBase."
);
}
}
}
public
JobPlanInfo
getJobPlanInfo
(
List
<
String
>
statements
)
{
return
new
JobPlanInfo
(
JsonPlanGenerator
.
generatePlan
(
getJobGraphFromInserts
(
statements
)));
}
public
StreamGraph
getStreamGraphFromInserts
(
List
<
String
>
statements
)
{
List
<
ModifyOperation
>
modifyOperations
=
new
ArrayList
();
for
(
String
statement
:
statements
)
{
List
<
Operation
>
operations
=
getParser
().
parse
(
statement
);
if
(
operations
.
size
()
!=
1
)
{
throw
new
TableException
(
"Only single statement is supported."
);
}
else
{
Operation
operation
=
operations
.
get
(
0
);
if
(
operation
instanceof
ModifyOperation
)
{
modifyOperations
.
add
((
ModifyOperation
)
operation
);
}
else
{
throw
new
TableException
(
"Only insert statement is supported now."
);
}
}
}
List
<
Transformation
<?>>
trans
=
getPlanner
().
translate
(
modifyOperations
);
if
(
execEnv
instanceof
ExecutorBase
)
{
StreamGraph
streamGraph
=
ExecutorUtils
.
generateStreamGraph
(((
ExecutorBase
)
execEnv
).
getExecutionEnvironment
(),
trans
);
if
(
tableConfig
.
getConfiguration
().
containsKey
(
PipelineOptions
.
NAME
.
key
()))
{
streamGraph
.
setJobName
(
tableConfig
.
getConfiguration
().
getString
(
PipelineOptions
.
NAME
));
}
return
streamGraph
;
}
else
{
throw
new
TableException
(
"Unsupported SQL query! ExecEnv need a ExecutorBase."
);
}
}
public
JobGraph
getJobGraphFromInserts
(
List
<
String
>
statements
)
{
return
getStreamGraphFromInserts
(
statements
).
getJobGraph
();
}
public
SqlExplainResult
explainSqlRecord
(
String
statement
,
ExplainDetail
...
extraDetails
)
{
SqlExplainResult
record
=
new
SqlExplainResult
();
List
<
Operation
>
operations
=
parser
.
parse
(
statement
);
record
.
setParseTrue
(
true
);
if
(
operations
.
size
()
!=
1
)
{
throw
new
TableException
(
"Unsupported SQL query! explainSql() only accepts a single SQL query."
);
}
List
<
Operation
>
operationlist
=
new
ArrayList
<>(
operations
);
for
(
int
i
=
0
;
i
<
operationlist
.
size
();
i
++)
{
Operation
operation
=
operationlist
.
get
(
i
);
if
(
operation
instanceof
ModifyOperation
)
{
record
.
setType
(
"Modify DML"
);
}
else
if
(
operation
instanceof
ExplainOperation
)
{
record
.
setType
(
"Explain DML"
);
}
else
if
(
operation
instanceof
QueryOperation
)
{
record
.
setType
(
"Query DML"
);
}
else
{
record
.
setExplain
(
operation
.
asSummaryString
());
operationlist
.
remove
(
i
);
record
.
setType
(
"DDL"
);
i
=
i
-
1
;
}
}
record
.
setExplainTrue
(
true
);
if
(
operationlist
.
size
()
==
0
)
{
return
record
;
}
record
.
setExplain
(
planner
.
explain
(
operationlist
,
extraDetails
));
return
record
;
}
public
<
T
>
void
registerFunction
(
String
name
,
TableFunction
<
T
>
tableFunction
)
{
TypeInformation
<
T
>
typeInfo
=
UserDefinedFunctionHelper
.
getReturnTypeOfTableFunction
(
tableFunction
);
this
.
functionCatalog
.
registerTempSystemTableFunction
(
name
,
tableFunction
,
typeInfo
);
}
public
<
T
,
ACC
>
void
registerFunction
(
String
name
,
AggregateFunction
<
T
,
ACC
>
aggregateFunction
)
{
TypeInformation
<
T
>
typeInfo
=
UserDefinedFunctionHelper
.
getReturnTypeOfAggregateFunction
(
aggregateFunction
);
TypeInformation
<
ACC
>
accTypeInfo
=
UserDefinedFunctionHelper
.
getAccumulatorTypeOfAggregateFunction
(
aggregateFunction
);
this
.
functionCatalog
.
registerTempSystemAggregateFunction
(
name
,
aggregateFunction
,
typeInfo
,
accTypeInfo
);
}
public
<
T
,
ACC
>
void
registerFunction
(
String
name
,
TableAggregateFunction
<
T
,
ACC
>
tableAggregateFunction
)
{
TypeInformation
<
T
>
typeInfo
=
UserDefinedFunctionHelper
.
getReturnTypeOfAggregateFunction
(
tableAggregateFunction
);
TypeInformation
<
ACC
>
accTypeInfo
=
UserDefinedFunctionHelper
.
getAccumulatorTypeOfAggregateFunction
(
tableAggregateFunction
);
this
.
functionCatalog
.
registerTempSystemAggregateFunction
(
name
,
tableAggregateFunction
,
typeInfo
,
accTypeInfo
);
}
public
boolean
parseAndLoadConfiguration
(
String
statement
,
StreamExecutionEnvironment
environment
,
Map
<
String
,
Object
>
setMap
)
{
return
false
;
}
}
dlink-app/dlink-app-1.11/src/main/java/com/dlink/executor/CustomTableResultImpl.java
0 → 100644
View file @
8fc49e14
package
com
.
dlink
.
executor
;
import
org.apache.flink.annotation.Internal
;
import
org.apache.flink.core.execution.JobClient
;
import
org.apache.flink.table.api.*
;
import
org.apache.flink.table.utils.PrintUtils
;
import
org.apache.flink.types.Row
;
import
org.apache.flink.util.CloseableIterator
;
import
org.apache.flink.util.Preconditions
;
import
javax.annotation.Nullable
;
import
java.io.PrintWriter
;
import
java.util.Collections
;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.Optional
;
/**
* 定制TableResultImpl
*
* @author wenmo
* @since 2021/6/7 22:06
**/
@Internal
class
CustomTableResultImpl
implements
TableResult
{
public
static
final
TableResult
TABLE_RESULT_OK
=
CustomTableResultImpl
.
builder
()
.
resultKind
(
ResultKind
.
SUCCESS
)
.
tableSchema
(
TableSchema
.
builder
().
field
(
"result"
,
DataTypes
.
STRING
()).
build
())
.
data
(
Collections
.
singletonList
(
Row
.
of
(
"OK"
)))
.
build
();
private
final
JobClient
jobClient
;
private
final
TableSchema
tableSchema
;
private
final
ResultKind
resultKind
;
private
final
CloseableIterator
<
Row
>
data
;
private
final
PrintStyle
printStyle
;
private
CustomTableResultImpl
(
@Nullable
JobClient
jobClient
,
TableSchema
tableSchema
,
ResultKind
resultKind
,
CloseableIterator
<
Row
>
data
,
PrintStyle
printStyle
)
{
this
.
jobClient
=
jobClient
;
this
.
tableSchema
=
Preconditions
.
checkNotNull
(
tableSchema
,
"tableSchema should not be null"
);
this
.
resultKind
=
Preconditions
.
checkNotNull
(
resultKind
,
"resultKind should not be null"
);
this
.
data
=
Preconditions
.
checkNotNull
(
data
,
"data should not be null"
);
this
.
printStyle
=
Preconditions
.
checkNotNull
(
printStyle
,
"printStyle should not be null"
);
}
public
static
TableResult
buildTableResult
(
List
<
TableSchemaField
>
fields
,
List
<
Row
>
rows
)
{
Builder
builder
=
builder
().
resultKind
(
ResultKind
.
SUCCESS
);
if
(
fields
.
size
()
>
0
)
{
TableSchema
.
Builder
tableSchemaBuild
=
TableSchema
.
builder
();
for
(
int
i
=
0
;
i
<
fields
.
size
();
i
++)
{
tableSchemaBuild
.
field
(
fields
.
get
(
i
).
getName
(),
fields
.
get
(
i
).
getType
());
}
builder
.
tableSchema
(
tableSchemaBuild
.
build
()).
data
(
rows
);
}
return
builder
.
build
();
}
@Override
public
Optional
<
JobClient
>
getJobClient
()
{
return
Optional
.
ofNullable
(
jobClient
);
}
@Override
public
TableSchema
getTableSchema
()
{
return
tableSchema
;
}
@Override
public
ResultKind
getResultKind
()
{
return
resultKind
;
}
@Override
public
CloseableIterator
<
Row
>
collect
()
{
return
data
;
}
@Override
public
void
print
()
{
Iterator
<
Row
>
it
=
collect
();
if
(
printStyle
instanceof
TableauStyle
)
{
int
maxColumnWidth
=
((
TableauStyle
)
printStyle
).
getMaxColumnWidth
();
String
nullColumn
=
((
TableauStyle
)
printStyle
).
getNullColumn
();
boolean
deriveColumnWidthByType
=
((
TableauStyle
)
printStyle
).
isDeriveColumnWidthByType
();
PrintUtils
.
printAsTableauForm
(
getTableSchema
(),
it
,
new
PrintWriter
(
System
.
out
),
maxColumnWidth
,
nullColumn
,
deriveColumnWidthByType
);
}
else
if
(
printStyle
instanceof
RawContentStyle
)
{
while
(
it
.
hasNext
())
{
System
.
out
.
println
(
String
.
join
(
","
,
PrintUtils
.
rowToString
(
it
.
next
())));
}
}
else
{
throw
new
TableException
(
"Unsupported print style: "
+
printStyle
);
}
}
public
static
Builder
builder
()
{
return
new
Builder
();
}
/**
* Builder for creating a {@link CustomTableResultImpl}.
*/
public
static
class
Builder
{
private
JobClient
jobClient
=
null
;
private
TableSchema
tableSchema
=
null
;
private
ResultKind
resultKind
=
null
;
private
CloseableIterator
<
Row
>
data
=
null
;
private
PrintStyle
printStyle
=
PrintStyle
.
tableau
(
Integer
.
MAX_VALUE
,
PrintUtils
.
NULL_COLUMN
,
false
);
private
Builder
()
{
}
/**
* Specifies job client which associates the submitted Flink job.
*
* @param jobClient a {@link JobClient} for the submitted Flink job.
*/
public
Builder
jobClient
(
JobClient
jobClient
)
{
this
.
jobClient
=
jobClient
;
return
this
;
}
/**
* Specifies table schema of the execution result.
*
* @param tableSchema a {@link TableSchema} for the execution result.
*/
public
Builder
tableSchema
(
TableSchema
tableSchema
)
{
Preconditions
.
checkNotNull
(
tableSchema
,
"tableSchema should not be null"
);
this
.
tableSchema
=
tableSchema
;
return
this
;
}
/**
* Specifies result kind of the execution result.
*
* @param resultKind a {@link ResultKind} for the execution result.
*/
public
Builder
resultKind
(
ResultKind
resultKind
)
{
Preconditions
.
checkNotNull
(
resultKind
,
"resultKind should not be null"
);
this
.
resultKind
=
resultKind
;
return
this
;
}
/**
* Specifies an row iterator as the execution result.
*
* @param rowIterator a row iterator as the execution result.
*/
public
Builder
data
(
CloseableIterator
<
Row
>
rowIterator
)
{
Preconditions
.
checkNotNull
(
rowIterator
,
"rowIterator should not be null"
);
this
.
data
=
rowIterator
;
return
this
;
}
/**
* Specifies an row list as the execution result.
*
* @param rowList a row list as the execution result.
*/
public
Builder
data
(
List
<
Row
>
rowList
)
{
Preconditions
.
checkNotNull
(
rowList
,
"listRows should not be null"
);
this
.
data
=
CloseableIterator
.
adapterForIterator
(
rowList
.
iterator
());
return
this
;
}
/**
* Specifies print style. Default is {@link TableauStyle} with max integer column width.
*/
public
Builder
setPrintStyle
(
PrintStyle
printStyle
)
{
Preconditions
.
checkNotNull
(
printStyle
,
"printStyle should not be null"
);
this
.
printStyle
=
printStyle
;
return
this
;
}
/**
* Returns a {@link TableResult} instance.
*/
public
TableResult
build
()
{
return
new
CustomTableResultImpl
(
jobClient
,
tableSchema
,
resultKind
,
data
,
printStyle
);
}
}
/**
* Root interface for all print styles.
*/
public
interface
PrintStyle
{
/**
* Create a tableau print style with given max column width, null column, and a flag to
* indicate whether the column width is derived from type (true) or content (false), which
* prints the result schema and content as tableau form.
*/
static
PrintStyle
tableau
(
int
maxColumnWidth
,
String
nullColumn
,
boolean
deriveColumnWidthByType
)
{
Preconditions
.
checkArgument
(
maxColumnWidth
>
0
,
"maxColumnWidth should be greater than 0"
);
Preconditions
.
checkNotNull
(
nullColumn
,
"nullColumn should not be null"
);
return
new
TableauStyle
(
maxColumnWidth
,
nullColumn
,
deriveColumnWidthByType
);
}
/**
* Create a raw content print style, which only print the result content as raw form. column
* delimiter is ",", row delimiter is "\n".
*/
static
PrintStyle
rawContent
()
{
return
new
RawContentStyle
();
}
}
/**
* print the result schema and content as tableau form.
*/
private
static
final
class
TableauStyle
implements
PrintStyle
{
/**
* A flag to indicate whether the column width is derived from type (true) or content
* (false).
*/
private
final
boolean
deriveColumnWidthByType
;
private
final
int
maxColumnWidth
;
private
final
String
nullColumn
;
private
TableauStyle
(
int
maxColumnWidth
,
String
nullColumn
,
boolean
deriveColumnWidthByType
)
{
this
.
deriveColumnWidthByType
=
deriveColumnWidthByType
;
this
.
maxColumnWidth
=
maxColumnWidth
;
this
.
nullColumn
=
nullColumn
;
}
public
boolean
isDeriveColumnWidthByType
()
{
return
deriveColumnWidthByType
;
}
int
getMaxColumnWidth
()
{
return
maxColumnWidth
;
}
String
getNullColumn
()
{
return
nullColumn
;
}
}
/**
* only print the result content as raw form. column delimiter is ",", row delimiter is "\n".
*/
private
static
final
class
RawContentStyle
implements
PrintStyle
{
}
}
dlink-app/src/main/java/com/dlink/executor/TableSchemaField.java
→
dlink-app/
dlink-app-1.11/
src/main/java/com/dlink/executor/TableSchemaField.java
View file @
8fc49e14
dlink-app/dlink-app-1.12/pom.xml
0 → 100644
View file @
8fc49e14
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<parent>
<artifactId>
dlink-app
</artifactId>
<groupId>
com.dlink
</groupId>
<version>
0.6.0-SNAPSHOT
</version>
</parent>
<modelVersion>
4.0.0
</modelVersion>
<artifactId>
dlink-app-1.12
</artifactId>
<properties>
<mainClass>
com.dlink.app.MainApp
</mainClass>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
<maven.compiler.source>
1.8
</maven.compiler.source>
<maven.compiler.target>
1.8
</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-app-base
</artifactId>
<version>
${project.version}
</version>
</dependency>
<dependency>
<groupId>
mysql
</groupId>
<artifactId>
mysql-connector-java
</artifactId>
<!-- <scope>provided</scope>-->
<version>
8.0.21
</version>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-client-1.12
</artifactId>
<scope>
provided
</scope>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-client-base
</artifactId>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-executor
</artifactId>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>
src/main/resources
</directory>
<includes>
<include>
*.properties
</include>
</includes>
</resource>
</resources>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-compiler-plugin
</artifactId>
<version>
3.6.0
</version>
<configuration>
<source>
1.8
</source>
<target>
1.8
</target>
<encoding>
UTF-8
</encoding>
</configuration>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-assembly-plugin
</artifactId>
<version>
2.6
</version>
<configuration>
<descriptorRefs>
<descriptorRef>
jar-with-dependencies
</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass>
com.dlink.app.MainApp
</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>
make-assembly
</id>
<phase>
package
</phase>
<goals>
<goal>
single
</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
dlink-app/dlink-app-1.12/src/main/java/com/dlink/app/MainApp.java
0 → 100644
View file @
8fc49e14
package
com
.
dlink
.
app
;
import
com.dlink.app.db.DBConfig
;
import
com.dlink.app.flinksql.Submiter
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.constant.FlinkParamConstant
;
import
com.dlink.utils.FlinkBaseUtil
;
import
java.io.IOException
;
import
java.util.Map
;
/**
* MainApp
*
* @author wenmo
* @since 2021/10/27
**/
public
class
MainApp
{
public
static
void
main
(
String
[]
args
)
throws
IOException
{
Map
<
String
,
String
>
params
=
FlinkBaseUtil
.
getParamsFromArgs
(
args
);
String
id
=
params
.
get
(
FlinkParamConstant
.
ID
);
Asserts
.
checkNullString
(
id
,
"请配置入参 id "
);
DBConfig
dbConfig
=
DBConfig
.
build
(
params
);
Submiter
.
submit
(
Integer
.
valueOf
(
id
),
dbConfig
);
}
}
dlink-app/dlink-app-1.12/src/main/java/com/dlink/executor/CustomTableEnvironmentImpl.java
0 → 100644
View file @
8fc49e14
package
com
.
dlink
.
executor
;
import
com.dlink.result.SqlExplainResult
;
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
import
org.apache.flink.api.common.RuntimeExecutionMode
;
import
org.apache.flink.api.common.typeinfo.TypeInformation
;
import
org.apache.flink.api.dag.Transformation
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.configuration.ExecutionOptions
;
import
org.apache.flink.configuration.PipelineOptions
;
import
org.apache.flink.runtime.jobgraph.JobGraph
;
import
org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
;
import
org.apache.flink.runtime.rest.messages.JobPlanInfo
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.graph.JSONGenerator
;
import
org.apache.flink.streaming.api.graph.StreamGraph
;
import
org.apache.flink.table.api.EnvironmentSettings
;
import
org.apache.flink.table.api.ExplainDetail
;
import
org.apache.flink.table.api.TableConfig
;
import
org.apache.flink.table.api.TableException
;
import
org.apache.flink.table.api.internal.TableEnvironmentImpl
;
import
org.apache.flink.table.catalog.CatalogManager
;
import
org.apache.flink.table.catalog.FunctionCatalog
;
import
org.apache.flink.table.catalog.GenericInMemoryCatalog
;
import
org.apache.flink.table.delegation.Executor
;
import
org.apache.flink.table.delegation.ExecutorFactory
;
import
org.apache.flink.table.delegation.Planner
;
import
org.apache.flink.table.delegation.PlannerFactory
;
import
org.apache.flink.table.factories.ComponentFactoryService
;
import
org.apache.flink.table.functions.AggregateFunction
;
import
org.apache.flink.table.functions.TableAggregateFunction
;
import
org.apache.flink.table.functions.TableFunction
;
import
org.apache.flink.table.functions.UserDefinedFunctionHelper
;
import
org.apache.flink.table.module.ModuleManager
;
import
org.apache.flink.table.operations.ExplainOperation
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.operations.QueryOperation
;
import
org.apache.flink.table.planner.delegation.ExecutorBase
;
import
org.apache.flink.table.planner.utils.ExecutorUtils
;
import
java.lang.reflect.Method
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Map
;
/**
* 定制TableEnvironmentImpl
*
* @author wenmo
* @since 2021/6/7 22:06
**/
public
class
CustomTableEnvironmentImpl
extends
TableEnvironmentImpl
implements
CustomTableEnvironment
{
protected
CustomTableEnvironmentImpl
(
CatalogManager
catalogManager
,
ModuleManager
moduleManager
,
TableConfig
tableConfig
,
Executor
executor
,
FunctionCatalog
functionCatalog
,
Planner
planner
,
boolean
isStreamingMode
,
ClassLoader
userClassLoader
)
{
super
(
catalogManager
,
moduleManager
,
tableConfig
,
executor
,
functionCatalog
,
planner
,
isStreamingMode
,
userClassLoader
);
}
public
static
CustomTableEnvironmentImpl
create
(
StreamExecutionEnvironment
executionEnvironment
)
{
return
create
(
executionEnvironment
,
EnvironmentSettings
.
newInstance
().
build
());
}
public
static
CustomTableEnvironmentImpl
createBatch
(
StreamExecutionEnvironment
executionEnvironment
)
{
Configuration
configuration
=
new
Configuration
();
configuration
.
set
(
ExecutionOptions
.
RUNTIME_MODE
,
RuntimeExecutionMode
.
BATCH
);
TableConfig
tableConfig
=
new
TableConfig
();
tableConfig
.
addConfiguration
(
configuration
);
return
create
(
executionEnvironment
,
EnvironmentSettings
.
newInstance
().
useBlinkPlanner
().
inBatchMode
().
build
(),
tableConfig
);
}
public
static
CustomTableEnvironmentImpl
create
(
StreamExecutionEnvironment
executionEnvironment
,
EnvironmentSettings
settings
)
{
return
create
(
executionEnvironment
,
settings
,
new
TableConfig
());
}
public
static
CustomTableEnvironmentImpl
create
(
StreamExecutionEnvironment
executionEnvironment
,
EnvironmentSettings
settings
,
TableConfig
tableConfig
)
{
ClassLoader
classLoader
=
Thread
.
currentThread
().
getContextClassLoader
();
ModuleManager
moduleManager
=
new
ModuleManager
();
CatalogManager
catalogManager
=
CatalogManager
.
newBuilder
().
classLoader
(
classLoader
).
config
(
tableConfig
.
getConfiguration
()).
defaultCatalog
(
settings
.
getBuiltInCatalogName
(),
new
GenericInMemoryCatalog
(
settings
.
getBuiltInCatalogName
(),
settings
.
getBuiltInDatabaseName
())).
executionConfig
(
executionEnvironment
.
getConfig
()).
build
();
FunctionCatalog
functionCatalog
=
new
FunctionCatalog
(
tableConfig
,
catalogManager
,
moduleManager
);
Map
<
String
,
String
>
executorProperties
=
settings
.
toExecutorProperties
();
Executor
executor
=
lookupExecutor
(
executorProperties
,
executionEnvironment
);
Map
<
String
,
String
>
plannerProperties
=
settings
.
toPlannerProperties
();
Planner
planner
=
(
ComponentFactoryService
.
find
(
PlannerFactory
.
class
,
plannerProperties
)).
create
(
plannerProperties
,
executor
,
tableConfig
,
functionCatalog
,
catalogManager
);
return
new
CustomTableEnvironmentImpl
(
catalogManager
,
moduleManager
,
tableConfig
,
executor
,
functionCatalog
,
planner
,
settings
.
isStreamingMode
(),
classLoader
);
}
private
static
Executor
lookupExecutor
(
Map
<
String
,
String
>
executorProperties
,
StreamExecutionEnvironment
executionEnvironment
)
{
try
{
ExecutorFactory
executorFactory
=
ComponentFactoryService
.
find
(
ExecutorFactory
.
class
,
executorProperties
);
Method
createMethod
=
executorFactory
.
getClass
().
getMethod
(
"create"
,
Map
.
class
,
StreamExecutionEnvironment
.
class
);
return
(
Executor
)
createMethod
.
invoke
(
executorFactory
,
executorProperties
,
executionEnvironment
);
}
catch
(
Exception
var4
)
{
throw
new
TableException
(
"Could not instantiate the executor. Make sure a planner module is on the classpath"
,
var4
);
}
}
public
ObjectNode
getStreamGraph
(
String
statement
)
{
List
<
Operation
>
operations
=
super
.
parser
.
parse
(
statement
);
if
(
operations
.
size
()
!=
1
)
{
throw
new
TableException
(
"Unsupported SQL query! explainSql() only accepts a single SQL query."
);
}
else
{
List
<
ModifyOperation
>
modifyOperations
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
operations
.
size
();
i
++)
{
if
(
operations
.
get
(
i
)
instanceof
ModifyOperation
)
{
modifyOperations
.
add
((
ModifyOperation
)
operations
.
get
(
i
));
}
}
List
<
Transformation
<?>>
trans
=
super
.
planner
.
translate
(
modifyOperations
);
if
(
execEnv
instanceof
ExecutorBase
)
{
StreamGraph
streamGraph
=
ExecutorUtils
.
generateStreamGraph
(((
ExecutorBase
)
execEnv
).
getExecutionEnvironment
(),
trans
);
JSONGenerator
jsonGenerator
=
new
JSONGenerator
(
streamGraph
);
String
json
=
jsonGenerator
.
getJSON
();
ObjectMapper
mapper
=
new
ObjectMapper
();
ObjectNode
objectNode
=
mapper
.
createObjectNode
();
try
{
objectNode
=
(
ObjectNode
)
mapper
.
readTree
(
json
);
}
catch
(
JsonProcessingException
e
)
{
e
.
printStackTrace
();
}
finally
{
return
objectNode
;
}
}
else
{
throw
new
TableException
(
"Unsupported SQL query! ExecEnv need a ExecutorBase."
);
}
}
}
public
JobPlanInfo
getJobPlanInfo
(
List
<
String
>
statements
)
{
return
new
JobPlanInfo
(
JsonPlanGenerator
.
generatePlan
(
getJobGraphFromInserts
(
statements
)));
}
public
StreamGraph
getStreamGraphFromInserts
(
List
<
String
>
statements
)
{
List
<
ModifyOperation
>
modifyOperations
=
new
ArrayList
();
for
(
String
statement
:
statements
)
{
List
<
Operation
>
operations
=
getParser
().
parse
(
statement
);
if
(
operations
.
size
()
!=
1
)
{
throw
new
TableException
(
"Only single statement is supported."
);
}
else
{
Operation
operation
=
operations
.
get
(
0
);
if
(
operation
instanceof
ModifyOperation
)
{
modifyOperations
.
add
((
ModifyOperation
)
operation
);
}
else
{
throw
new
TableException
(
"Only insert statement is supported now."
);
}
}
}
List
<
Transformation
<?>>
trans
=
getPlanner
().
translate
(
modifyOperations
);
if
(
execEnv
instanceof
ExecutorBase
)
{
StreamGraph
streamGraph
=
ExecutorUtils
.
generateStreamGraph
(((
ExecutorBase
)
execEnv
).
getExecutionEnvironment
(),
trans
);
if
(
tableConfig
.
getConfiguration
().
containsKey
(
PipelineOptions
.
NAME
.
key
()))
{
streamGraph
.
setJobName
(
tableConfig
.
getConfiguration
().
getString
(
PipelineOptions
.
NAME
));
}
return
streamGraph
;
}
else
{
throw
new
TableException
(
"Unsupported SQL query! ExecEnv need a ExecutorBase."
);
}
}
public
JobGraph
getJobGraphFromInserts
(
List
<
String
>
statements
)
{
return
getStreamGraphFromInserts
(
statements
).
getJobGraph
();
}
public
SqlExplainResult
explainSqlRecord
(
String
statement
,
ExplainDetail
...
extraDetails
)
{
SqlExplainResult
record
=
new
SqlExplainResult
();
List
<
Operation
>
operations
=
parser
.
parse
(
statement
);
record
.
setParseTrue
(
true
);
if
(
operations
.
size
()
!=
1
)
{
throw
new
TableException
(
"Unsupported SQL query! explainSql() only accepts a single SQL query."
);
}
List
<
Operation
>
operationlist
=
new
ArrayList
<>(
operations
);
for
(
int
i
=
0
;
i
<
operationlist
.
size
();
i
++)
{
Operation
operation
=
operationlist
.
get
(
i
);
if
(
operation
instanceof
ModifyOperation
)
{
record
.
setType
(
"Modify DML"
);
}
else
if
(
operation
instanceof
ExplainOperation
)
{
record
.
setType
(
"Explain DML"
);
}
else
if
(
operation
instanceof
QueryOperation
)
{
record
.
setType
(
"Query DML"
);
}
else
{
record
.
setExplain
(
operation
.
asSummaryString
());
operationlist
.
remove
(
i
);
record
.
setType
(
"DDL"
);
i
=
i
-
1
;
}
}
record
.
setExplainTrue
(
true
);
if
(
operationlist
.
size
()
==
0
)
{
return
record
;
}
record
.
setExplain
(
planner
.
explain
(
operationlist
,
extraDetails
));
return
record
;
}
public
<
T
>
void
registerFunction
(
String
name
,
TableFunction
<
T
>
tableFunction
)
{
TypeInformation
<
T
>
typeInfo
=
UserDefinedFunctionHelper
.
getReturnTypeOfTableFunction
(
tableFunction
);
this
.
functionCatalog
.
registerTempSystemTableFunction
(
name
,
tableFunction
,
typeInfo
);
}
public
<
T
,
ACC
>
void
registerFunction
(
String
name
,
AggregateFunction
<
T
,
ACC
>
aggregateFunction
)
{
TypeInformation
<
T
>
typeInfo
=
UserDefinedFunctionHelper
.
getReturnTypeOfAggregateFunction
(
aggregateFunction
);
TypeInformation
<
ACC
>
accTypeInfo
=
UserDefinedFunctionHelper
.
getAccumulatorTypeOfAggregateFunction
(
aggregateFunction
);
this
.
functionCatalog
.
registerTempSystemAggregateFunction
(
name
,
aggregateFunction
,
typeInfo
,
accTypeInfo
);
}
public
<
T
,
ACC
>
void
registerFunction
(
String
name
,
TableAggregateFunction
<
T
,
ACC
>
tableAggregateFunction
)
{
TypeInformation
<
T
>
typeInfo
=
UserDefinedFunctionHelper
.
getReturnTypeOfAggregateFunction
(
tableAggregateFunction
);
TypeInformation
<
ACC
>
accTypeInfo
=
UserDefinedFunctionHelper
.
getAccumulatorTypeOfAggregateFunction
(
tableAggregateFunction
);
this
.
functionCatalog
.
registerTempSystemAggregateFunction
(
name
,
tableAggregateFunction
,
typeInfo
,
accTypeInfo
);
}
public
boolean
parseAndLoadConfiguration
(
String
statement
,
StreamExecutionEnvironment
environment
,
Map
<
String
,
Object
>
setMap
)
{
return
false
;
}
}
\ No newline at end of file
dlink-app/dlink-app-1.12/src/main/java/com/dlink/executor/CustomTableResultImpl.java
0 → 100644
View file @
8fc49e14
package
com
.
dlink
.
executor
;
import
org.apache.flink.annotation.Internal
;
import
org.apache.flink.core.execution.JobClient
;
import
org.apache.flink.table.api.*
;
import
org.apache.flink.table.utils.PrintUtils
;
import
org.apache.flink.types.Row
;
import
org.apache.flink.util.CloseableIterator
;
import
org.apache.flink.util.Preconditions
;
import
javax.annotation.Nullable
;
import
java.io.PrintWriter
;
import
java.util.Collections
;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.Optional
;
import
java.util.concurrent.*
;
/**
* 定制CustomTableResultImpl
*
* @author wenmo
* @since 2021/6/7 22:06
**/
@Internal
class
CustomTableResultImpl
implements
TableResult
{
public
static
final
TableResult
TABLE_RESULT_OK
=
CustomTableResultImpl
.
builder
()
.
resultKind
(
ResultKind
.
SUCCESS
)
.
tableSchema
(
TableSchema
.
builder
().
field
(
"result"
,
DataTypes
.
STRING
()).
build
())
.
data
(
Collections
.
singletonList
(
Row
.
of
(
"OK"
)))
.
build
();
private
final
JobClient
jobClient
;
private
final
TableSchema
tableSchema
;
private
final
ResultKind
resultKind
;
private
final
CloseableRowIteratorWrapper
data
;
private
final
PrintStyle
printStyle
;
private
CustomTableResultImpl
(
@Nullable
JobClient
jobClient
,
TableSchema
tableSchema
,
ResultKind
resultKind
,
CloseableIterator
<
Row
>
data
,
PrintStyle
printStyle
)
{
this
.
jobClient
=
jobClient
;
this
.
tableSchema
=
Preconditions
.
checkNotNull
(
tableSchema
,
"tableSchema should not be null"
);
this
.
resultKind
=
Preconditions
.
checkNotNull
(
resultKind
,
"resultKind should not be null"
);
Preconditions
.
checkNotNull
(
data
,
"data should not be null"
);
this
.
data
=
new
CloseableRowIteratorWrapper
(
data
);
this
.
printStyle
=
Preconditions
.
checkNotNull
(
printStyle
,
"printStyle should not be null"
);
}
public
static
TableResult
buildTableResult
(
List
<
TableSchemaField
>
fields
,
List
<
Row
>
rows
)
{
Builder
builder
=
builder
().
resultKind
(
ResultKind
.
SUCCESS
);
if
(
fields
.
size
()
>
0
)
{
TableSchema
.
Builder
tableSchemaBuild
=
TableSchema
.
builder
();
for
(
int
i
=
0
;
i
<
fields
.
size
();
i
++)
{
tableSchemaBuild
.
field
(
fields
.
get
(
i
).
getName
(),
fields
.
get
(
i
).
getType
());
}
builder
.
tableSchema
(
tableSchemaBuild
.
build
()).
data
(
rows
);
}
return
builder
.
build
();
}
@Override
public
Optional
<
JobClient
>
getJobClient
()
{
return
Optional
.
ofNullable
(
jobClient
);
}
@Override
public
void
await
()
throws
InterruptedException
,
ExecutionException
{
try
{
awaitInternal
(-
1
,
TimeUnit
.
MILLISECONDS
);
}
catch
(
TimeoutException
e
)
{
// do nothing
}
}
@Override
public
void
await
(
long
timeout
,
TimeUnit
unit
)
throws
InterruptedException
,
ExecutionException
,
TimeoutException
{
awaitInternal
(
timeout
,
unit
);
}
private
void
awaitInternal
(
long
timeout
,
TimeUnit
unit
)
throws
InterruptedException
,
ExecutionException
,
TimeoutException
{
if
(
jobClient
==
null
)
{
return
;
}
ExecutorService
executor
=
Executors
.
newFixedThreadPool
(
1
,
r
->
new
Thread
(
r
,
"TableResult-await-thread"
));
try
{
CompletableFuture
<
Void
>
future
=
CompletableFuture
.
runAsync
(
()
->
{
while
(!
data
.
isFirstRowReady
())
{
try
{
Thread
.
sleep
(
100
);
}
catch
(
InterruptedException
e
)
{
throw
new
TableException
(
"Thread is interrupted"
);
}
}
},
executor
);
if
(
timeout
>=
0
)
{
future
.
get
(
timeout
,
unit
);
}
else
{
future
.
get
();
}
}
finally
{
executor
.
shutdown
();
}
}
@Override
public
TableSchema
getTableSchema
()
{
return
tableSchema
;
}
@Override
public
ResultKind
getResultKind
()
{
return
resultKind
;
}
@Override
public
CloseableIterator
<
Row
>
collect
()
{
return
data
;
}
@Override
public
void
print
()
{
Iterator
<
Row
>
it
=
collect
();
if
(
printStyle
instanceof
TableauStyle
)
{
int
maxColumnWidth
=
((
TableauStyle
)
printStyle
).
getMaxColumnWidth
();
String
nullColumn
=
((
TableauStyle
)
printStyle
).
getNullColumn
();
boolean
deriveColumnWidthByType
=
((
TableauStyle
)
printStyle
).
isDeriveColumnWidthByType
();
boolean
printRowKind
=
((
TableauStyle
)
printStyle
).
isPrintRowKind
();
PrintUtils
.
printAsTableauForm
(
getTableSchema
(),
it
,
new
PrintWriter
(
System
.
out
),
maxColumnWidth
,
nullColumn
,
deriveColumnWidthByType
,
printRowKind
);
}
else
if
(
printStyle
instanceof
RawContentStyle
)
{
while
(
it
.
hasNext
())
{
System
.
out
.
println
(
String
.
join
(
","
,
PrintUtils
.
rowToString
(
it
.
next
())));
}
}
else
{
throw
new
TableException
(
"Unsupported print style: "
+
printStyle
);
}
}
public
static
Builder
builder
()
{
return
new
Builder
();
}
/**
* Builder for creating a {@link CustomTableResultImpl}.
*/
public
static
class
Builder
{
private
JobClient
jobClient
=
null
;
private
TableSchema
tableSchema
=
null
;
private
ResultKind
resultKind
=
null
;
private
CloseableIterator
<
Row
>
data
=
null
;
private
PrintStyle
printStyle
=
PrintStyle
.
tableau
(
Integer
.
MAX_VALUE
,
PrintUtils
.
NULL_COLUMN
,
false
,
false
);
private
Builder
()
{
}
/**
* Specifies job client which associates the submitted Flink job.
*
* @param jobClient a {@link JobClient} for the submitted Flink job.
*/
public
Builder
jobClient
(
JobClient
jobClient
)
{
this
.
jobClient
=
jobClient
;
return
this
;
}
/**
* Specifies table schema of the execution result.
*
* @param tableSchema a {@link TableSchema} for the execution result.
*/
public
Builder
tableSchema
(
TableSchema
tableSchema
)
{
Preconditions
.
checkNotNull
(
tableSchema
,
"tableSchema should not be null"
);
this
.
tableSchema
=
tableSchema
;
return
this
;
}
/**
* Specifies result kind of the execution result.
*
* @param resultKind a {@link ResultKind} for the execution result.
*/
public
Builder
resultKind
(
ResultKind
resultKind
)
{
Preconditions
.
checkNotNull
(
resultKind
,
"resultKind should not be null"
);
this
.
resultKind
=
resultKind
;
return
this
;
}
/**
* Specifies an row iterator as the execution result.
*
* @param rowIterator a row iterator as the execution result.
*/
public
Builder
data
(
CloseableIterator
<
Row
>
rowIterator
)
{
Preconditions
.
checkNotNull
(
rowIterator
,
"rowIterator should not be null"
);
this
.
data
=
rowIterator
;
return
this
;
}
/**
* Specifies an row list as the execution result.
*
* @param rowList a row list as the execution result.
*/
public
Builder
data
(
List
<
Row
>
rowList
)
{
Preconditions
.
checkNotNull
(
rowList
,
"listRows should not be null"
);
this
.
data
=
CloseableIterator
.
adapterForIterator
(
rowList
.
iterator
());
return
this
;
}
/**
* Specifies print style. Default is {@link TableauStyle} with max integer column width.
*/
public
Builder
setPrintStyle
(
PrintStyle
printStyle
)
{
Preconditions
.
checkNotNull
(
printStyle
,
"printStyle should not be null"
);
this
.
printStyle
=
printStyle
;
return
this
;
}
/**
* Returns a {@link TableResult} instance.
*/
public
TableResult
build
()
{
return
new
CustomTableResultImpl
(
jobClient
,
tableSchema
,
resultKind
,
data
,
printStyle
);
}
}
/**
* Root interface for all print styles.
*/
public
interface
PrintStyle
{
/**
* Create a tableau print style with given max column width, null column, change mode
* indicator and a flag to indicate whether the column width is derived from type (true) or
* content (false), which prints the result schema and content as tableau form.
*/
static
PrintStyle
tableau
(
int
maxColumnWidth
,
String
nullColumn
,
boolean
deriveColumnWidthByType
,
boolean
printRowKind
)
{
Preconditions
.
checkArgument
(
maxColumnWidth
>
0
,
"maxColumnWidth should be greater than 0"
);
Preconditions
.
checkNotNull
(
nullColumn
,
"nullColumn should not be null"
);
return
new
TableauStyle
(
maxColumnWidth
,
nullColumn
,
deriveColumnWidthByType
,
printRowKind
);
}
/**
* Create a raw content print style, which only print the result content as raw form. column
* delimiter is ",", row delimiter is "\n".
*/
static
PrintStyle
rawContent
()
{
return
new
RawContentStyle
();
}
}
/**
* print the result schema and content as tableau form.
*/
private
static
final
class
TableauStyle
implements
PrintStyle
{
/**
* A flag to indicate whether the column width is derived from type (true) or content
* (false).
*/
private
final
boolean
deriveColumnWidthByType
;
private
final
int
maxColumnWidth
;
private
final
String
nullColumn
;
/**
* A flag to indicate whether print row kind info.
*/
private
final
boolean
printRowKind
;
private
TableauStyle
(
int
maxColumnWidth
,
String
nullColumn
,
boolean
deriveColumnWidthByType
,
boolean
printRowKind
)
{
this
.
deriveColumnWidthByType
=
deriveColumnWidthByType
;
this
.
maxColumnWidth
=
maxColumnWidth
;
this
.
nullColumn
=
nullColumn
;
this
.
printRowKind
=
printRowKind
;
}
public
boolean
isDeriveColumnWidthByType
()
{
return
deriveColumnWidthByType
;
}
int
getMaxColumnWidth
()
{
return
maxColumnWidth
;
}
String
getNullColumn
()
{
return
nullColumn
;
}
public
boolean
isPrintRowKind
()
{
return
printRowKind
;
}
}
/**
* only print the result content as raw form. column delimiter is ",", row delimiter is "\n".
*/
private
static
final
class
RawContentStyle
implements
PrintStyle
{
}
/**
* A {@link CloseableIterator} wrapper class that can return whether the first row is ready.
*
* <p>The first row is ready when {@link #hasNext} method returns true or {@link #next()} method
* returns a row. The execution order of {@link TableResult#collect} method and {@link
* TableResult#await()} may be arbitrary, this class will record whether the first row is ready
* (or accessed).
*/
private
static
final
class
CloseableRowIteratorWrapper
implements
CloseableIterator
<
Row
>
{
private
final
CloseableIterator
<
Row
>
iterator
;
private
boolean
isFirstRowReady
=
false
;
private
CloseableRowIteratorWrapper
(
CloseableIterator
<
Row
>
iterator
)
{
this
.
iterator
=
iterator
;
}
@Override
public
void
close
()
throws
Exception
{
iterator
.
close
();
}
@Override
public
boolean
hasNext
()
{
boolean
hasNext
=
iterator
.
hasNext
();
isFirstRowReady
=
isFirstRowReady
||
hasNext
;
return
hasNext
;
}
@Override
public
Row
next
()
{
Row
next
=
iterator
.
next
();
isFirstRowReady
=
true
;
return
next
;
}
public
boolean
isFirstRowReady
()
{
return
isFirstRowReady
||
hasNext
();
}
}
}
dlink-app/dlink-app-1.12/src/main/java/com/dlink/executor/TableSchemaField.java
0 → 100644
View file @
8fc49e14
package
com
.
dlink
.
executor
;
import
org.apache.flink.table.types.DataType
;
/**
* @author wenmo
* @since 2021/6/7 22:06
**/
public
class
TableSchemaField
{
private
String
name
;
private
DataType
type
;
public
TableSchemaField
(
String
name
,
DataType
type
)
{
this
.
name
=
name
;
this
.
type
=
type
;
}
public
String
getName
()
{
return
name
;
}
public
void
setName
(
String
name
)
{
this
.
name
=
name
;
}
public
DataType
getType
()
{
return
type
;
}
public
void
setType
(
DataType
type
)
{
this
.
type
=
type
;
}
}
dlink-app/dlink-app-1.13/pom.xml
0 → 100644
View file @
8fc49e14
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<parent>
<artifactId>
dlink-app
</artifactId>
<groupId>
com.dlink
</groupId>
<version>
0.6.0-SNAPSHOT
</version>
</parent>
<modelVersion>
4.0.0
</modelVersion>
<artifactId>
dlink-app-1.13
</artifactId>
<properties>
<mainClass>
com.dlink.app.MainApp
</mainClass>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
<maven.compiler.source>
1.8
</maven.compiler.source>
<maven.compiler.target>
1.8
</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-app-base
</artifactId>
<version>
${project.version}
</version>
</dependency>
<dependency>
<groupId>
mysql
</groupId>
<artifactId>
mysql-connector-java
</artifactId>
<!-- <scope>provided</scope>-->
<version>
8.0.21
</version>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-client-1.13
</artifactId>
<scope>
provided
</scope>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-client-base
</artifactId>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-executor
</artifactId>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>
src/main/resources
</directory>
<includes>
<include>
*.properties
</include>
</includes>
</resource>
</resources>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-compiler-plugin
</artifactId>
<version>
3.6.0
</version>
<configuration>
<source>
1.8
</source>
<target>
1.8
</target>
<encoding>
UTF-8
</encoding>
</configuration>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-assembly-plugin
</artifactId>
<version>
2.6
</version>
<configuration>
<descriptorRefs>
<descriptorRef>
jar-with-dependencies
</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass>
com.dlink.app.MainApp
</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>
make-assembly
</id>
<phase>
package
</phase>
<goals>
<goal>
single
</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
dlink-app/dlink-app-1.13/src/main/java/com/dlink/app/MainApp.java
0 → 100644
View file @
8fc49e14
package
com
.
dlink
.
app
;
import
com.dlink.app.db.DBConfig
;
import
com.dlink.app.flinksql.Submiter
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.constant.FlinkParamConstant
;
import
com.dlink.utils.FlinkBaseUtil
;
import
java.io.IOException
;
import
java.util.Map
;
/**
* MainApp
*
* @author wenmo
* @since 2021/10/27
**/
public
class
MainApp
{
public
static
void
main
(
String
[]
args
)
throws
IOException
{
Map
<
String
,
String
>
params
=
FlinkBaseUtil
.
getParamsFromArgs
(
args
);
String
id
=
params
.
get
(
FlinkParamConstant
.
ID
);
Asserts
.
checkNullString
(
id
,
"请配置入参 id "
);
DBConfig
dbConfig
=
DBConfig
.
build
(
params
);
Submiter
.
submit
(
Integer
.
valueOf
(
id
),
dbConfig
);
}
}
dlink-app/src/main/java/com/dlink/executor/CustomTableEnvironmentImpl.java
→
dlink-app/
dlink-app-1.13/
src/main/java/com/dlink/executor/CustomTableEnvironmentImpl.java
View file @
8fc49e14
File moved
dlink-app/src/main/java/com/dlink/executor/CustomTableResultImpl.java
→
dlink-app/
dlink-app-1.13/
src/main/java/com/dlink/executor/CustomTableResultImpl.java
View file @
8fc49e14
...
@@ -22,6 +22,7 @@ import java.util.concurrent.*;
...
@@ -22,6 +22,7 @@ import java.util.concurrent.*;
/**
/**
* 定制CustomTableResultImpl
* 定制CustomTableResultImpl
*
* @author wenmo
* @author wenmo
* @since 2021/6/7 22:06
* @since 2021/6/7 22:06
**/
**/
...
@@ -59,16 +60,16 @@ public class CustomTableResultImpl implements TableResult {
...
@@ -59,16 +60,16 @@ public class CustomTableResultImpl implements TableResult {
Preconditions
.
checkNotNull
(
sessionTimeZone
,
"sessionTimeZone should not be null"
);
Preconditions
.
checkNotNull
(
sessionTimeZone
,
"sessionTimeZone should not be null"
);
}
}
public
static
TableResult
buildTableResult
(
List
<
TableSchemaField
>
fields
,
List
<
Row
>
rows
)
{
public
static
TableResult
buildTableResult
(
List
<
TableSchemaField
>
fields
,
List
<
Row
>
rows
)
{
Builder
builder
=
builder
().
resultKind
(
ResultKind
.
SUCCESS
);
Builder
builder
=
builder
().
resultKind
(
ResultKind
.
SUCCESS
);
if
(
fields
.
size
()>
0
)
{
if
(
fields
.
size
()
>
0
)
{
List
<
String
>
columnNames
=
new
ArrayList
<>();
List
<
String
>
columnNames
=
new
ArrayList
<>();
List
<
DataType
>
columnTypes
=
new
ArrayList
<>();
List
<
DataType
>
columnTypes
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
fields
.
size
();
i
++)
{
for
(
int
i
=
0
;
i
<
fields
.
size
();
i
++)
{
columnNames
.
add
(
fields
.
get
(
i
).
getName
());
columnNames
.
add
(
fields
.
get
(
i
).
getName
());
columnTypes
.
add
(
fields
.
get
(
i
).
getType
());
columnTypes
.
add
(
fields
.
get
(
i
).
getType
());
}
}
builder
.
schema
(
ResolvedSchema
.
physical
(
columnNames
,
columnTypes
)).
data
(
rows
);
builder
.
schema
(
ResolvedSchema
.
physical
(
columnNames
,
columnTypes
)).
data
(
rows
);
}
}
return
builder
.
build
();
return
builder
.
build
();
}
}
...
@@ -175,7 +176,9 @@ public class CustomTableResultImpl implements TableResult {
...
@@ -175,7 +176,9 @@ public class CustomTableResultImpl implements TableResult {
return
new
Builder
();
return
new
Builder
();
}
}
/** Builder for creating a {@link CustomTableResultImpl}. */
/**
* Builder for creating a {@link CustomTableResultImpl}.
*/
public
static
class
Builder
{
public
static
class
Builder
{
private
JobClient
jobClient
=
null
;
private
JobClient
jobClient
=
null
;
private
ResolvedSchema
resolvedSchema
=
null
;
private
ResolvedSchema
resolvedSchema
=
null
;
...
@@ -185,7 +188,8 @@ public class CustomTableResultImpl implements TableResult {
...
@@ -185,7 +188,8 @@ public class CustomTableResultImpl implements TableResult {
PrintStyle
.
tableau
(
Integer
.
MAX_VALUE
,
PrintUtils
.
NULL_COLUMN
,
false
,
false
);
PrintStyle
.
tableau
(
Integer
.
MAX_VALUE
,
PrintUtils
.
NULL_COLUMN
,
false
,
false
);
private
ZoneId
sessionTimeZone
=
ZoneId
.
of
(
"UTC"
);
private
ZoneId
sessionTimeZone
=
ZoneId
.
of
(
"UTC"
);
private
Builder
()
{}
private
Builder
()
{
}
/**
/**
* Specifies job client which associates the submitted Flink job.
* Specifies job client which associates the submitted Flink job.
...
@@ -241,28 +245,36 @@ public class CustomTableResultImpl implements TableResult {
...
@@ -241,28 +245,36 @@ public class CustomTableResultImpl implements TableResult {
return
this
;
return
this
;
}
}
/** Specifies print style. Default is {@link TableauStyle} with max integer column width. */
/**
* Specifies print style. Default is {@link TableauStyle} with max integer column width.
*/
public
Builder
setPrintStyle
(
PrintStyle
printStyle
)
{
public
Builder
setPrintStyle
(
PrintStyle
printStyle
)
{
Preconditions
.
checkNotNull
(
printStyle
,
"printStyle should not be null"
);
Preconditions
.
checkNotNull
(
printStyle
,
"printStyle should not be null"
);
this
.
printStyle
=
printStyle
;
this
.
printStyle
=
printStyle
;
return
this
;
return
this
;
}
}
/** Specifies session time zone. */
/**
* Specifies session time zone.
*/
public
Builder
setSessionTimeZone
(
ZoneId
sessionTimeZone
)
{
public
Builder
setSessionTimeZone
(
ZoneId
sessionTimeZone
)
{
Preconditions
.
checkNotNull
(
sessionTimeZone
,
"sessionTimeZone should not be null"
);
Preconditions
.
checkNotNull
(
sessionTimeZone
,
"sessionTimeZone should not be null"
);
this
.
sessionTimeZone
=
sessionTimeZone
;
this
.
sessionTimeZone
=
sessionTimeZone
;
return
this
;
return
this
;
}
}
/** Returns a {@link TableResult} instance. */
/**
* Returns a {@link TableResult} instance.
*/
public
TableResult
build
()
{
public
TableResult
build
()
{
return
new
CustomTableResultImpl
(
return
new
CustomTableResultImpl
(
jobClient
,
resolvedSchema
,
resultKind
,
data
,
printStyle
,
sessionTimeZone
);
jobClient
,
resolvedSchema
,
resultKind
,
data
,
printStyle
,
sessionTimeZone
);
}
}
}
}
/** Root interface for all print styles. */
/**
* Root interface for all print styles.
*/
public
interface
PrintStyle
{
public
interface
PrintStyle
{
/**
/**
* Create a tableau print style with given max column width, null column, change mode
* Create a tableau print style with given max column width, null column, change mode
...
@@ -290,7 +302,9 @@ public class CustomTableResultImpl implements TableResult {
...
@@ -290,7 +302,9 @@ public class CustomTableResultImpl implements TableResult {
}
}
}
}
/** print the result schema and content as tableau form. */
/**
* print the result schema and content as tableau form.
*/
private
static
final
class
TableauStyle
implements
PrintStyle
{
private
static
final
class
TableauStyle
implements
PrintStyle
{
/**
/**
* A flag to indicate whether the column width is derived from type (true) or content
* A flag to indicate whether the column width is derived from type (true) or content
...
@@ -300,7 +314,9 @@ public class CustomTableResultImpl implements TableResult {
...
@@ -300,7 +314,9 @@ public class CustomTableResultImpl implements TableResult {
private
final
int
maxColumnWidth
;
private
final
int
maxColumnWidth
;
private
final
String
nullColumn
;
private
final
String
nullColumn
;
/** A flag to indicate whether print row kind info. */
/**
* A flag to indicate whether print row kind info.
*/
private
final
boolean
printRowKind
;
private
final
boolean
printRowKind
;
private
TableauStyle
(
private
TableauStyle
(
...
@@ -334,7 +350,8 @@ public class CustomTableResultImpl implements TableResult {
...
@@ -334,7 +350,8 @@ public class CustomTableResultImpl implements TableResult {
/**
/**
* only print the result content as raw form. column delimiter is ",", row delimiter is "\n".
* only print the result content as raw form. column delimiter is ",", row delimiter is "\n".
*/
*/
private
static
final
class
RawContentStyle
implements
PrintStyle
{}
private
static
final
class
RawContentStyle
implements
PrintStyle
{
}
/**
/**
* A {@link CloseableIterator} wrapper class that can return whether the first row is ready.
* A {@link CloseableIterator} wrapper class that can return whether the first row is ready.
...
...
dlink-app/dlink-app-1.13/src/main/java/com/dlink/executor/TableSchemaField.java
0 → 100644
View file @
8fc49e14
package
com
.
dlink
.
executor
;
import
org.apache.flink.table.types.DataType
;
/**
* @author wenmo
* @since 2021/6/7 22:06
**/
public
class
TableSchemaField
{
private
String
name
;
private
DataType
type
;
public
TableSchemaField
(
String
name
,
DataType
type
)
{
this
.
name
=
name
;
this
.
type
=
type
;
}
public
String
getName
()
{
return
name
;
}
public
void
setName
(
String
name
)
{
this
.
name
=
name
;
}
public
DataType
getType
()
{
return
type
;
}
public
void
setType
(
DataType
type
)
{
this
.
type
=
type
;
}
}
dlink-app/dlink-app-1.14/pom.xml
0 → 100644
View file @
8fc49e14
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<parent>
<artifactId>
dlink-app
</artifactId>
<groupId>
com.dlink
</groupId>
<version>
0.6.0-SNAPSHOT
</version>
</parent>
<modelVersion>
4.0.0
</modelVersion>
<artifactId>
dlink-app-1.14
</artifactId>
<properties>
<mainClass>
com.dlink.app.MainApp
</mainClass>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
<maven.compiler.source>
1.8
</maven.compiler.source>
<maven.compiler.target>
1.8
</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-app-base
</artifactId>
<version>
${project.version}
</version>
</dependency>
<dependency>
<groupId>
mysql
</groupId>
<artifactId>
mysql-connector-java
</artifactId>
<!-- <scope>provided</scope>-->
<version>
8.0.21
</version>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-client-1.14
</artifactId>
<scope>
provided
</scope>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-client-base
</artifactId>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-executor
</artifactId>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>
src/main/resources
</directory>
<includes>
<include>
*.properties
</include>
</includes>
</resource>
</resources>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-compiler-plugin
</artifactId>
<version>
3.6.0
</version>
<configuration>
<source>
1.8
</source>
<target>
1.8
</target>
<encoding>
UTF-8
</encoding>
</configuration>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-assembly-plugin
</artifactId>
<version>
2.6
</version>
<configuration>
<descriptorRefs>
<descriptorRef>
jar-with-dependencies
</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass>
com.dlink.app.MainApp
</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>
make-assembly
</id>
<phase>
package
</phase>
<goals>
<goal>
single
</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
dlink-app/dlink-app-1.14/src/main/java/com/dlink/app/MainApp.java
0 → 100644
View file @
8fc49e14
package
com
.
dlink
.
app
;
import
com.dlink.app.db.DBConfig
;
import
com.dlink.app.flinksql.Submiter
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.constant.FlinkParamConstant
;
import
com.dlink.utils.FlinkBaseUtil
;
import
java.io.IOException
;
import
java.util.Map
;
/**
* MainApp
*
* @author wenmo
* @since 2021/10/27
**/
public
class
MainApp
{
public
static
void
main
(
String
[]
args
)
throws
IOException
{
Map
<
String
,
String
>
params
=
FlinkBaseUtil
.
getParamsFromArgs
(
args
);
String
id
=
params
.
get
(
FlinkParamConstant
.
ID
);
Asserts
.
checkNullString
(
id
,
"请配置入参 id "
);
DBConfig
dbConfig
=
DBConfig
.
build
(
params
);
Submiter
.
submit
(
Integer
.
valueOf
(
id
),
dbConfig
);
}
}
dlink-app/dlink-app-1.14/src/main/java/com/dlink/executor/CustomTableEnvironmentImpl.java
0 → 100644
View file @
8fc49e14
package
com
.
dlink
.
executor
;
import
com.dlink.assertion.Asserts
;
import
com.dlink.result.SqlExplainResult
;
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
import
org.apache.flink.api.common.RuntimeExecutionMode
;
import
org.apache.flink.api.common.typeinfo.TypeInformation
;
import
org.apache.flink.api.dag.Transformation
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.configuration.ExecutionOptions
;
import
org.apache.flink.configuration.PipelineOptions
;
import
org.apache.flink.runtime.jobgraph.JobGraph
;
import
org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
;
import
org.apache.flink.runtime.rest.messages.JobPlanInfo
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.graph.JSONGenerator
;
import
org.apache.flink.streaming.api.graph.StreamGraph
;
import
org.apache.flink.table.api.EnvironmentSettings
;
import
org.apache.flink.table.api.ExplainDetail
;
import
org.apache.flink.table.api.TableConfig
;
import
org.apache.flink.table.api.TableException
;
import
org.apache.flink.table.api.internal.TableEnvironmentImpl
;
import
org.apache.flink.table.catalog.CatalogManager
;
import
org.apache.flink.table.catalog.FunctionCatalog
;
import
org.apache.flink.table.catalog.GenericInMemoryCatalog
;
import
org.apache.flink.table.delegation.Executor
;
import
org.apache.flink.table.delegation.ExecutorFactory
;
import
org.apache.flink.table.delegation.Planner
;
import
org.apache.flink.table.factories.FactoryUtil
;
import
org.apache.flink.table.factories.PlannerFactoryUtil
;
import
org.apache.flink.table.functions.AggregateFunction
;
import
org.apache.flink.table.functions.TableAggregateFunction
;
import
org.apache.flink.table.functions.TableFunction
;
import
org.apache.flink.table.functions.UserDefinedFunctionHelper
;
import
org.apache.flink.table.module.ModuleManager
;
import
org.apache.flink.table.operations.ExplainOperation
;
import
org.apache.flink.table.operations.ModifyOperation
;
import
org.apache.flink.table.operations.Operation
;
import
org.apache.flink.table.operations.QueryOperation
;
import
org.apache.flink.table.operations.command.ResetOperation
;
import
org.apache.flink.table.operations.command.SetOperation
;
import
org.apache.flink.table.planner.delegation.DefaultExecutor
;
import
java.lang.reflect.Method
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
/**
* 定制TableEnvironmentImpl
*
* @author wenmo
* @since 2021/10/22 10:02
**/
public
class
CustomTableEnvironmentImpl
extends
TableEnvironmentImpl
implements
CustomTableEnvironment
{
protected
CustomTableEnvironmentImpl
(
CatalogManager
catalogManager
,
ModuleManager
moduleManager
,
TableConfig
tableConfig
,
Executor
executor
,
FunctionCatalog
functionCatalog
,
Planner
planner
,
boolean
isStreamingMode
,
ClassLoader
userClassLoader
)
{
super
(
catalogManager
,
moduleManager
,
tableConfig
,
executor
,
functionCatalog
,
planner
,
isStreamingMode
,
userClassLoader
);
}
public
CustomTableEnvironmentImpl
(
CatalogManager
catalogManager
,
ModuleManager
moduleManager
,
FunctionCatalog
functionCatalog
,
TableConfig
tableConfig
,
StreamExecutionEnvironment
executionEnvironment
,
Planner
planner
,
Executor
executor
,
boolean
isStreamingMode
,
ClassLoader
userClassLoader
)
{
super
(
catalogManager
,
moduleManager
,
tableConfig
,
executor
,
functionCatalog
,
planner
,
isStreamingMode
,
userClassLoader
);
}
public
static
CustomTableEnvironmentImpl
create
(
StreamExecutionEnvironment
executionEnvironment
)
{
return
create
(
executionEnvironment
,
EnvironmentSettings
.
newInstance
().
build
(),
TableConfig
.
getDefault
());
}
public
static
CustomTableEnvironmentImpl
createBatch
(
StreamExecutionEnvironment
executionEnvironment
)
{
Configuration
configuration
=
new
Configuration
();
configuration
.
set
(
ExecutionOptions
.
RUNTIME_MODE
,
RuntimeExecutionMode
.
BATCH
);
TableConfig
tableConfig
=
new
TableConfig
();
tableConfig
.
addConfiguration
(
configuration
);
return
create
(
executionEnvironment
,
EnvironmentSettings
.
newInstance
().
useBlinkPlanner
().
inBatchMode
().
build
(),
tableConfig
);
}
public
static
CustomTableEnvironmentImpl
create
(
StreamExecutionEnvironment
executionEnvironment
,
EnvironmentSettings
settings
,
TableConfig
tableConfig
)
{
// temporary solution until FLINK-15635 is fixed
final
ClassLoader
classLoader
=
Thread
.
currentThread
().
getContextClassLoader
();
final
ModuleManager
moduleManager
=
new
ModuleManager
();
final
CatalogManager
catalogManager
=
CatalogManager
.
newBuilder
()
.
classLoader
(
classLoader
)
.
config
(
tableConfig
.
getConfiguration
())
.
defaultCatalog
(
settings
.
getBuiltInCatalogName
(),
new
GenericInMemoryCatalog
(
settings
.
getBuiltInCatalogName
(),
settings
.
getBuiltInDatabaseName
()))
.
executionConfig
(
executionEnvironment
.
getConfig
())
.
build
();
final
FunctionCatalog
functionCatalog
=
new
FunctionCatalog
(
tableConfig
,
catalogManager
,
moduleManager
);
final
Executor
executor
=
lookupExecutor
(
classLoader
,
settings
.
getExecutor
(),
executionEnvironment
);
final
Planner
planner
=
PlannerFactoryUtil
.
createPlanner
(
settings
.
getPlanner
(),
executor
,
tableConfig
,
catalogManager
,
functionCatalog
);
return
new
CustomTableEnvironmentImpl
(
catalogManager
,
moduleManager
,
functionCatalog
,
tableConfig
,
executionEnvironment
,
planner
,
executor
,
settings
.
isStreamingMode
(),
classLoader
);
}
private
static
Executor
lookupExecutor
(
ClassLoader
classLoader
,
String
executorIdentifier
,
StreamExecutionEnvironment
executionEnvironment
)
{
try
{
final
ExecutorFactory
executorFactory
=
FactoryUtil
.
discoverFactory
(
classLoader
,
ExecutorFactory
.
class
,
executorIdentifier
);
final
Method
createMethod
=
executorFactory
.
getClass
()
.
getMethod
(
"create"
,
StreamExecutionEnvironment
.
class
);
return
(
Executor
)
createMethod
.
invoke
(
executorFactory
,
executionEnvironment
);
}
catch
(
Exception
e
)
{
throw
new
TableException
(
"Could not instantiate the executor. Make sure a planner module is on the classpath"
,
e
);
}
}
public
ObjectNode
getStreamGraph
(
String
statement
)
{
List
<
Operation
>
operations
=
super
.
getParser
().
parse
(
statement
);
if
(
operations
.
size
()
!=
1
)
{
throw
new
TableException
(
"Unsupported SQL query! explainSql() only accepts a single SQL query."
);
}
else
{
List
<
ModifyOperation
>
modifyOperations
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
operations
.
size
();
i
++)
{
if
(
operations
.
get
(
i
)
instanceof
ModifyOperation
)
{
modifyOperations
.
add
((
ModifyOperation
)
operations
.
get
(
i
));
}
}
List
<
Transformation
<?>>
trans
=
super
.
planner
.
translate
(
modifyOperations
);
if
(
execEnv
instanceof
DefaultExecutor
)
{
StreamGraph
streamGraph
=
((
DefaultExecutor
)
execEnv
).
getExecutionEnvironment
().
generateStreamGraph
(
trans
);
JSONGenerator
jsonGenerator
=
new
JSONGenerator
(
streamGraph
);
String
json
=
jsonGenerator
.
getJSON
();
ObjectMapper
mapper
=
new
ObjectMapper
();
ObjectNode
objectNode
=
mapper
.
createObjectNode
();
try
{
objectNode
=
(
ObjectNode
)
mapper
.
readTree
(
json
);
}
catch
(
JsonProcessingException
e
)
{
e
.
printStackTrace
();
}
finally
{
return
objectNode
;
}
}
else
{
throw
new
TableException
(
"Unsupported SQL query! explainSql() need a single SQL to query."
);
}
}
}
@Override
public
JobPlanInfo
getJobPlanInfo
(
List
<
String
>
statements
)
{
return
new
JobPlanInfo
(
JsonPlanGenerator
.
generatePlan
(
getJobGraphFromInserts
(
statements
)));
}
public
StreamGraph
getStreamGraphFromInserts
(
List
<
String
>
statements
)
{
List
<
ModifyOperation
>
modifyOperations
=
new
ArrayList
();
for
(
String
statement
:
statements
)
{
List
<
Operation
>
operations
=
getParser
().
parse
(
statement
);
if
(
operations
.
size
()
!=
1
)
{
throw
new
TableException
(
"Only single statement is supported."
);
}
else
{
Operation
operation
=
operations
.
get
(
0
);
if
(
operation
instanceof
ModifyOperation
)
{
modifyOperations
.
add
((
ModifyOperation
)
operation
);
}
else
{
throw
new
TableException
(
"Only insert statement is supported now."
);
}
}
}
List
<
Transformation
<?>>
trans
=
getPlanner
().
translate
(
modifyOperations
);
if
(
execEnv
instanceof
DefaultExecutor
)
{
StreamGraph
streamGraph
=
((
DefaultExecutor
)
execEnv
).
getExecutionEnvironment
().
generateStreamGraph
(
trans
);
if
(
tableConfig
.
getConfiguration
().
containsKey
(
PipelineOptions
.
NAME
.
key
()))
{
streamGraph
.
setJobName
(
tableConfig
.
getConfiguration
().
getString
(
PipelineOptions
.
NAME
));
}
return
streamGraph
;
}
else
{
throw
new
TableException
(
"Unsupported SQL query! ExecEnv need a ExecutorBase."
);
}
}
public
JobGraph
getJobGraphFromInserts
(
List
<
String
>
statements
)
{
return
getStreamGraphFromInserts
(
statements
).
getJobGraph
();
}
public
SqlExplainResult
explainSqlRecord
(
String
statement
,
ExplainDetail
...
extraDetails
)
{
SqlExplainResult
record
=
new
SqlExplainResult
();
List
<
Operation
>
operations
=
getParser
().
parse
(
statement
);
record
.
setParseTrue
(
true
);
if
(
operations
.
size
()
!=
1
)
{
throw
new
TableException
(
"Unsupported SQL query! explainSql() only accepts a single SQL query."
);
}
List
<
Operation
>
operationlist
=
new
ArrayList
<>(
operations
);
for
(
int
i
=
0
;
i
<
operationlist
.
size
();
i
++)
{
Operation
operation
=
operationlist
.
get
(
i
);
if
(
operation
instanceof
ModifyOperation
)
{
record
.
setType
(
"Modify DML"
);
}
else
if
(
operation
instanceof
ExplainOperation
)
{
record
.
setType
(
"Explain DML"
);
}
else
if
(
operation
instanceof
QueryOperation
)
{
record
.
setType
(
"Query DML"
);
}
else
{
record
.
setExplain
(
operation
.
asSummaryString
());
operationlist
.
remove
(
i
);
record
.
setType
(
"DDL"
);
i
=
i
-
1
;
}
}
record
.
setExplainTrue
(
true
);
if
(
operationlist
.
size
()
==
0
)
{
//record.setExplain("DDL语句不进行解释。");
return
record
;
}
record
.
setExplain
(
planner
.
explain
(
operationlist
,
extraDetails
));
return
record
;
}
public
<
T
>
void
registerFunction
(
String
name
,
TableFunction
<
T
>
tableFunction
)
{
TypeInformation
<
T
>
typeInfo
=
UserDefinedFunctionHelper
.
getReturnTypeOfTableFunction
(
tableFunction
);
this
.
functionCatalog
.
registerTempSystemTableFunction
(
name
,
tableFunction
,
typeInfo
);
}
public
<
T
,
ACC
>
void
registerFunction
(
String
name
,
AggregateFunction
<
T
,
ACC
>
aggregateFunction
)
{
TypeInformation
<
T
>
typeInfo
=
UserDefinedFunctionHelper
.
getReturnTypeOfAggregateFunction
(
aggregateFunction
);
TypeInformation
<
ACC
>
accTypeInfo
=
UserDefinedFunctionHelper
.
getAccumulatorTypeOfAggregateFunction
(
aggregateFunction
);
this
.
functionCatalog
.
registerTempSystemAggregateFunction
(
name
,
aggregateFunction
,
typeInfo
,
accTypeInfo
);
}
public
<
T
,
ACC
>
void
registerFunction
(
String
name
,
TableAggregateFunction
<
T
,
ACC
>
tableAggregateFunction
)
{
TypeInformation
<
T
>
typeInfo
=
UserDefinedFunctionHelper
.
getReturnTypeOfAggregateFunction
(
tableAggregateFunction
);
TypeInformation
<
ACC
>
accTypeInfo
=
UserDefinedFunctionHelper
.
getAccumulatorTypeOfAggregateFunction
(
tableAggregateFunction
);
this
.
functionCatalog
.
registerTempSystemAggregateFunction
(
name
,
tableAggregateFunction
,
typeInfo
,
accTypeInfo
);
}
public
boolean
parseAndLoadConfiguration
(
String
statement
,
StreamExecutionEnvironment
environment
,
Map
<
String
,
Object
>
setMap
)
{
List
<
Operation
>
operations
=
getParser
().
parse
(
statement
);
for
(
Operation
operation
:
operations
)
{
if
(
operation
instanceof
SetOperation
)
{
callSet
((
SetOperation
)
operation
,
environment
,
setMap
);
return
true
;
}
else
if
(
operation
instanceof
ResetOperation
)
{
callReset
((
ResetOperation
)
operation
,
environment
,
setMap
);
return
true
;
}
}
return
false
;
}
private
void
callSet
(
SetOperation
setOperation
,
StreamExecutionEnvironment
environment
,
Map
<
String
,
Object
>
setMap
)
{
if
(
setOperation
.
getKey
().
isPresent
()
&&
setOperation
.
getValue
().
isPresent
())
{
String
key
=
setOperation
.
getKey
().
get
().
trim
();
String
value
=
setOperation
.
getValue
().
get
().
trim
();
if
(
Asserts
.
isNullString
(
key
)
||
Asserts
.
isNullString
(
value
))
{
return
;
}
Map
<
String
,
String
>
confMap
=
new
HashMap
<>();
confMap
.
put
(
key
,
value
);
setMap
.
put
(
key
,
value
);
Configuration
configuration
=
Configuration
.
fromMap
(
confMap
);
environment
.
getConfig
().
configure
(
configuration
,
null
);
getConfig
().
addConfiguration
(
configuration
);
}
}
private
void
callReset
(
ResetOperation
resetOperation
,
StreamExecutionEnvironment
environment
,
Map
<
String
,
Object
>
setMap
)
{
if
(
resetOperation
.
getKey
().
isPresent
())
{
String
key
=
resetOperation
.
getKey
().
get
().
trim
();
if
(
Asserts
.
isNullString
(
key
))
{
return
;
}
Map
<
String
,
String
>
confMap
=
new
HashMap
<>();
confMap
.
put
(
key
,
null
);
setMap
.
remove
(
key
);
Configuration
configuration
=
Configuration
.
fromMap
(
confMap
);
environment
.
getConfig
().
configure
(
configuration
,
null
);
getConfig
().
addConfiguration
(
configuration
);
}
else
{
setMap
.
clear
();
}
}
}
dlink-app/dlink-app-1.14/src/main/java/com/dlink/executor/CustomTableResultImpl.java
0 → 100644
View file @
8fc49e14
package
com
.
dlink
.
executor
;
import
org.apache.flink.annotation.Internal
;
import
org.apache.flink.core.execution.JobClient
;
import
org.apache.flink.table.api.DataTypes
;
import
org.apache.flink.table.api.ResultKind
;
import
org.apache.flink.table.api.TableException
;
import
org.apache.flink.table.api.TableResult
;
import
org.apache.flink.table.catalog.Column
;
import
org.apache.flink.table.catalog.ResolvedSchema
;
import
org.apache.flink.table.types.DataType
;
import
org.apache.flink.table.utils.PrintUtils
;
import
org.apache.flink.types.Row
;
import
org.apache.flink.util.CloseableIterator
;
import
org.apache.flink.util.Preconditions
;
import
javax.annotation.Nullable
;
import
java.io.PrintWriter
;
import
java.time.ZoneId
;
import
java.util.*
;
import
java.util.concurrent.*
;
/**
* 定制TableResultImpl
*
* @author wenmo
* @since 2021/10/22 10:02
**/
@Internal
public
class
CustomTableResultImpl
implements
TableResult
{
public
static
final
TableResult
TABLE_RESULT_OK
=
CustomTableResultImpl
.
builder
()
.
resultKind
(
ResultKind
.
SUCCESS
)
.
schema
(
ResolvedSchema
.
of
(
Column
.
physical
(
"result"
,
DataTypes
.
STRING
())))
.
data
(
Collections
.
singletonList
(
Row
.
of
(
"OK"
)))
.
build
();
private
final
JobClient
jobClient
;
private
final
ResolvedSchema
resolvedSchema
;
private
final
ResultKind
resultKind
;
private
final
CloseableRowIteratorWrapper
data
;
private
final
PrintStyle
printStyle
;
private
final
ZoneId
sessionTimeZone
;
private
CustomTableResultImpl
(
@Nullable
JobClient
jobClient
,
ResolvedSchema
resolvedSchema
,
ResultKind
resultKind
,
CloseableIterator
<
Row
>
data
,
PrintStyle
printStyle
,
ZoneId
sessionTimeZone
)
{
this
.
jobClient
=
jobClient
;
this
.
resolvedSchema
=
Preconditions
.
checkNotNull
(
resolvedSchema
,
"resolvedSchema should not be null"
);
this
.
resultKind
=
Preconditions
.
checkNotNull
(
resultKind
,
"resultKind should not be null"
);
Preconditions
.
checkNotNull
(
data
,
"data should not be null"
);
this
.
data
=
new
CloseableRowIteratorWrapper
(
data
);
this
.
printStyle
=
Preconditions
.
checkNotNull
(
printStyle
,
"printStyle should not be null"
);
this
.
sessionTimeZone
=
Preconditions
.
checkNotNull
(
sessionTimeZone
,
"sessionTimeZone should not be null"
);
}
public
static
TableResult
buildTableResult
(
List
<
TableSchemaField
>
fields
,
List
<
Row
>
rows
)
{
Builder
builder
=
builder
().
resultKind
(
ResultKind
.
SUCCESS
);
if
(
fields
.
size
()
>
0
)
{
List
<
String
>
columnNames
=
new
ArrayList
<>();
List
<
DataType
>
columnTypes
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
fields
.
size
();
i
++)
{
columnNames
.
add
(
fields
.
get
(
i
).
getName
());
columnTypes
.
add
(
fields
.
get
(
i
).
getType
());
}
builder
.
schema
(
ResolvedSchema
.
physical
(
columnNames
,
columnTypes
)).
data
(
rows
);
}
return
builder
.
build
();
}
@Override
public
Optional
<
JobClient
>
getJobClient
()
{
return
Optional
.
ofNullable
(
jobClient
);
}
@Override
public
void
await
()
throws
InterruptedException
,
ExecutionException
{
try
{
awaitInternal
(-
1
,
TimeUnit
.
MILLISECONDS
);
}
catch
(
TimeoutException
e
)
{
// do nothing
}
}
@Override
public
void
await
(
long
timeout
,
TimeUnit
unit
)
throws
InterruptedException
,
ExecutionException
,
TimeoutException
{
awaitInternal
(
timeout
,
unit
);
}
private
void
awaitInternal
(
long
timeout
,
TimeUnit
unit
)
throws
InterruptedException
,
ExecutionException
,
TimeoutException
{
if
(
jobClient
==
null
)
{
return
;
}
ExecutorService
executor
=
Executors
.
newFixedThreadPool
(
1
,
r
->
new
Thread
(
r
,
"TableResult-await-thread"
));
try
{
CompletableFuture
<
Void
>
future
=
CompletableFuture
.
runAsync
(
()
->
{
while
(!
data
.
isFirstRowReady
())
{
try
{
Thread
.
sleep
(
100
);
}
catch
(
InterruptedException
e
)
{
throw
new
TableException
(
"Thread is interrupted"
);
}
}
},
executor
);
if
(
timeout
>=
0
)
{
future
.
get
(
timeout
,
unit
);
}
else
{
future
.
get
();
}
}
finally
{
executor
.
shutdown
();
}
}
@Override
public
ResolvedSchema
getResolvedSchema
()
{
return
resolvedSchema
;
}
@Override
public
ResultKind
getResultKind
()
{
return
resultKind
;
}
@Override
public
CloseableIterator
<
Row
>
collect
()
{
return
data
;
}
@Override
public
void
print
()
{
Iterator
<
Row
>
it
=
collect
();
if
(
printStyle
instanceof
TableauStyle
)
{
int
maxColumnWidth
=
((
TableauStyle
)
printStyle
).
getMaxColumnWidth
();
String
nullColumn
=
((
TableauStyle
)
printStyle
).
getNullColumn
();
boolean
deriveColumnWidthByType
=
((
TableauStyle
)
printStyle
).
isDeriveColumnWidthByType
();
boolean
printRowKind
=
((
TableauStyle
)
printStyle
).
isPrintRowKind
();
PrintUtils
.
printAsTableauForm
(
getResolvedSchema
(),
it
,
new
PrintWriter
(
System
.
out
),
maxColumnWidth
,
nullColumn
,
deriveColumnWidthByType
,
printRowKind
,
sessionTimeZone
);
}
else
if
(
printStyle
instanceof
RawContentStyle
)
{
while
(
it
.
hasNext
())
{
System
.
out
.
println
(
String
.
join
(
","
,
PrintUtils
.
rowToString
(
it
.
next
(),
getResolvedSchema
(),
sessionTimeZone
)));
}
}
else
{
throw
new
TableException
(
"Unsupported print style: "
+
printStyle
);
}
}
public
static
Builder
builder
()
{
return
new
Builder
();
}
/**
* Builder for creating a {@link CustomTableResultImpl}.
*/
public
static
class
Builder
{
private
JobClient
jobClient
=
null
;
private
ResolvedSchema
resolvedSchema
=
null
;
private
ResultKind
resultKind
=
null
;
private
CloseableIterator
<
Row
>
data
=
null
;
private
PrintStyle
printStyle
=
PrintStyle
.
tableau
(
Integer
.
MAX_VALUE
,
PrintUtils
.
NULL_COLUMN
,
false
,
false
);
private
ZoneId
sessionTimeZone
=
ZoneId
.
of
(
"UTC"
);
private
Builder
()
{
}
/**
* Specifies job client which associates the submitted Flink job.
*
* @param jobClient a {@link JobClient} for the submitted Flink job.
*/
public
Builder
jobClient
(
JobClient
jobClient
)
{
this
.
jobClient
=
jobClient
;
return
this
;
}
/**
* Specifies schema of the execution result.
*
* @param resolvedSchema a {@link ResolvedSchema} for the execution result.
*/
public
Builder
schema
(
ResolvedSchema
resolvedSchema
)
{
Preconditions
.
checkNotNull
(
resolvedSchema
,
"resolvedSchema should not be null"
);
this
.
resolvedSchema
=
resolvedSchema
;
return
this
;
}
/**
* Specifies result kind of the execution result.
*
* @param resultKind a {@link ResultKind} for the execution result.
*/
public
Builder
resultKind
(
ResultKind
resultKind
)
{
Preconditions
.
checkNotNull
(
resultKind
,
"resultKind should not be null"
);
this
.
resultKind
=
resultKind
;
return
this
;
}
/**
* Specifies an row iterator as the execution result.
*
* @param rowIterator a row iterator as the execution result.
*/
public
Builder
data
(
CloseableIterator
<
Row
>
rowIterator
)
{
Preconditions
.
checkNotNull
(
rowIterator
,
"rowIterator should not be null"
);
this
.
data
=
rowIterator
;
return
this
;
}
/**
* Specifies an row list as the execution result.
*
* @param rowList a row list as the execution result.
*/
public
Builder
data
(
List
<
Row
>
rowList
)
{
Preconditions
.
checkNotNull
(
rowList
,
"listRows should not be null"
);
this
.
data
=
CloseableIterator
.
adapterForIterator
(
rowList
.
iterator
());
return
this
;
}
/**
* Specifies print style. Default is {@link TableauStyle} with max integer column width.
*/
public
Builder
setPrintStyle
(
PrintStyle
printStyle
)
{
Preconditions
.
checkNotNull
(
printStyle
,
"printStyle should not be null"
);
this
.
printStyle
=
printStyle
;
return
this
;
}
/**
* Specifies session time zone.
*/
public
Builder
setSessionTimeZone
(
ZoneId
sessionTimeZone
)
{
Preconditions
.
checkNotNull
(
sessionTimeZone
,
"sessionTimeZone should not be null"
);
this
.
sessionTimeZone
=
sessionTimeZone
;
return
this
;
}
/**
* Returns a {@link TableResult} instance.
*/
public
TableResult
build
()
{
return
new
CustomTableResultImpl
(
jobClient
,
resolvedSchema
,
resultKind
,
data
,
printStyle
,
sessionTimeZone
);
}
}
/**
* Root interface for all print styles.
*/
public
interface
PrintStyle
{
/**
* Create a tableau print style with given max column width, null column, change mode
* indicator and a flag to indicate whether the column width is derived from type (true) or
* content (false), which prints the result schema and content as tableau form.
*/
static
PrintStyle
tableau
(
int
maxColumnWidth
,
String
nullColumn
,
boolean
deriveColumnWidthByType
,
boolean
printRowKind
)
{
Preconditions
.
checkArgument
(
maxColumnWidth
>
0
,
"maxColumnWidth should be greater than 0"
);
Preconditions
.
checkNotNull
(
nullColumn
,
"nullColumn should not be null"
);
return
new
TableauStyle
(
maxColumnWidth
,
nullColumn
,
deriveColumnWidthByType
,
printRowKind
);
}
/**
* Create a raw content print style, which only print the result content as raw form. column
* delimiter is ",", row delimiter is "\n".
*/
static
PrintStyle
rawContent
()
{
return
new
RawContentStyle
();
}
}
/**
* print the result schema and content as tableau form.
*/
private
static
final
class
TableauStyle
implements
PrintStyle
{
/**
* A flag to indicate whether the column width is derived from type (true) or content
* (false).
*/
private
final
boolean
deriveColumnWidthByType
;
private
final
int
maxColumnWidth
;
private
final
String
nullColumn
;
/**
* A flag to indicate whether print row kind info.
*/
private
final
boolean
printRowKind
;
private
TableauStyle
(
int
maxColumnWidth
,
String
nullColumn
,
boolean
deriveColumnWidthByType
,
boolean
printRowKind
)
{
this
.
deriveColumnWidthByType
=
deriveColumnWidthByType
;
this
.
maxColumnWidth
=
maxColumnWidth
;
this
.
nullColumn
=
nullColumn
;
this
.
printRowKind
=
printRowKind
;
}
public
boolean
isDeriveColumnWidthByType
()
{
return
deriveColumnWidthByType
;
}
int
getMaxColumnWidth
()
{
return
maxColumnWidth
;
}
String
getNullColumn
()
{
return
nullColumn
;
}
public
boolean
isPrintRowKind
()
{
return
printRowKind
;
}
}
/**
* only print the result content as raw form. column delimiter is ",", row delimiter is "\n".
*/
private
static
final
class
RawContentStyle
implements
PrintStyle
{
}
/**
* A {@link CloseableIterator} wrapper class that can return whether the first row is ready.
*
* <p>The first row is ready when {@link #hasNext} method returns true or {@link #next()} method
* returns a row. The execution order of {@link TableResult#collect} method and {@link
* TableResult#await()} may be arbitrary, this class will record whether the first row is ready
* (or accessed).
*/
private
static
final
class
CloseableRowIteratorWrapper
implements
CloseableIterator
<
Row
>
{
private
final
CloseableIterator
<
Row
>
iterator
;
private
boolean
isFirstRowReady
=
false
;
private
CloseableRowIteratorWrapper
(
CloseableIterator
<
Row
>
iterator
)
{
this
.
iterator
=
iterator
;
}
@Override
public
void
close
()
throws
Exception
{
iterator
.
close
();
}
@Override
public
boolean
hasNext
()
{
boolean
hasNext
=
iterator
.
hasNext
();
isFirstRowReady
=
isFirstRowReady
||
hasNext
;
return
hasNext
;
}
@Override
public
Row
next
()
{
Row
next
=
iterator
.
next
();
isFirstRowReady
=
true
;
return
next
;
}
public
boolean
isFirstRowReady
()
{
return
isFirstRowReady
||
hasNext
();
}
}
}
\ No newline at end of file
dlink-app/dlink-app-1.14/src/main/java/com/dlink/executor/TableSchemaField.java
0 → 100644
View file @
8fc49e14
package
com
.
dlink
.
executor
;
import
org.apache.flink.table.types.DataType
;
/**
* @author wenmo
* @since 2021/10/22 10:02
**/
public
class
TableSchemaField
{
private
String
name
;
private
DataType
type
;
public
TableSchemaField
(
String
name
,
DataType
type
)
{
this
.
name
=
name
;
this
.
type
=
type
;
}
public
String
getName
()
{
return
name
;
}
public
void
setName
(
String
name
)
{
this
.
name
=
name
;
}
public
DataType
getType
()
{
return
type
;
}
public
void
setType
(
DataType
type
)
{
this
.
type
=
type
;
}
}
dlink-app/dlink-app-base/pom.xml
0 → 100644
View file @
8fc49e14
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<parent>
<artifactId>
dlink-app
</artifactId>
<groupId>
com.dlink
</groupId>
<version>
0.6.0-SNAPSHOT
</version>
</parent>
<modelVersion>
4.0.0
</modelVersion>
<artifactId>
dlink-app-base
</artifactId>
<properties>
<maven.compiler.source>
8
</maven.compiler.source>
<maven.compiler.target>
8
</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>
mysql
</groupId>
<artifactId>
mysql-connector-java
</artifactId>
<!-- <scope>provided</scope>-->
<version>
8.0.21
</version>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-client-1.13
</artifactId>
<scope>
provided
</scope>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-client-base
</artifactId>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-executor
</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
dlink-app/src/main/java/com/dlink/app/db/DBConfig.java
→
dlink-app/
dlink-app-base/
src/main/java/com/dlink/app/db/DBConfig.java
View file @
8fc49e14
...
@@ -24,12 +24,12 @@ public class DBConfig {
...
@@ -24,12 +24,12 @@ public class DBConfig {
this
.
password
=
password
;
this
.
password
=
password
;
}
}
public
static
DBConfig
build
(
String
driver
,
String
url
,
String
username
,
String
password
){
public
static
DBConfig
build
(
String
driver
,
String
url
,
String
username
,
String
password
)
{
return
new
DBConfig
(
driver
,
url
,
username
,
password
);
return
new
DBConfig
(
driver
,
url
,
username
,
password
);
}
}
public
static
DBConfig
build
(
Map
<
String
,
String
>
params
){
public
static
DBConfig
build
(
Map
<
String
,
String
>
params
)
{
return
new
DBConfig
(
params
.
get
(
FlinkParamConstant
.
DRIVER
),
return
new
DBConfig
(
params
.
get
(
FlinkParamConstant
.
DRIVER
),
params
.
get
(
FlinkParamConstant
.
URL
),
params
.
get
(
FlinkParamConstant
.
URL
),
params
.
get
(
FlinkParamConstant
.
USERNAME
),
params
.
get
(
FlinkParamConstant
.
USERNAME
),
...
...
dlink-app/src/main/java/com/dlink/app/db/DBUtil.java
→
dlink-app/
dlink-app-base/
src/main/java/com/dlink/app/db/DBUtil.java
View file @
8fc49e14
package
com
.
dlink
.
app
.
db
;
package
com
.
dlink
.
app
.
db
;
import
java.io.IOException
;
import
java.io.IOException
;
import
java.sql.Connection
;
import
java.sql.*
;
import
java.sql.DriverManager
;
import
java.sql.ResultSet
;
import
java.sql.SQLException
;
import
java.sql.Statement
;
import
java.util.ArrayList
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.List
;
...
@@ -19,7 +15,6 @@ import java.util.Map;
...
@@ -19,7 +15,6 @@ import java.util.Map;
**/
**/
public
class
DBUtil
{
public
class
DBUtil
{
private
static
Connection
getConnection
(
DBConfig
config
)
throws
IOException
{
private
static
Connection
getConnection
(
DBConfig
config
)
throws
IOException
{
Connection
conn
=
null
;
Connection
conn
=
null
;
try
{
try
{
...
@@ -42,7 +37,7 @@ public class DBUtil {
...
@@ -42,7 +37,7 @@ public class DBUtil {
}
}
}
}
public
static
String
getOneByID
(
String
sql
,
DBConfig
config
)
throws
SQLException
,
IOException
{
public
static
String
getOneByID
(
String
sql
,
DBConfig
config
)
throws
SQLException
,
IOException
{
Connection
conn
=
getConnection
(
config
);
Connection
conn
=
getConnection
(
config
);
String
result
=
null
;
String
result
=
null
;
try
(
Statement
stmt
=
conn
.
createStatement
();
try
(
Statement
stmt
=
conn
.
createStatement
();
...
@@ -60,18 +55,18 @@ public class DBUtil {
...
@@ -60,18 +55,18 @@ public class DBUtil {
return
result
;
return
result
;
}
}
public
static
Map
<
String
,
String
>
getMapByID
(
String
sql
,
DBConfig
config
)
throws
SQLException
,
IOException
{
public
static
Map
<
String
,
String
>
getMapByID
(
String
sql
,
DBConfig
config
)
throws
SQLException
,
IOException
{
Connection
conn
=
getConnection
(
config
);
Connection
conn
=
getConnection
(
config
);
HashMap
<
String
,
String
>
map
=
new
HashMap
();
HashMap
<
String
,
String
>
map
=
new
HashMap
();
try
(
Statement
stmt
=
conn
.
createStatement
();
try
(
Statement
stmt
=
conn
.
createStatement
();
ResultSet
rs
=
stmt
.
executeQuery
(
sql
))
{
ResultSet
rs
=
stmt
.
executeQuery
(
sql
))
{
List
<
String
>
columnList
=
new
ArrayList
<>();
List
<
String
>
columnList
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
rs
.
getMetaData
().
getColumnCount
();
i
++)
{
for
(
int
i
=
0
;
i
<
rs
.
getMetaData
().
getColumnCount
();
i
++)
{
columnList
.
add
(
rs
.
getMetaData
().
getColumnLabel
(
i
+
1
));
columnList
.
add
(
rs
.
getMetaData
().
getColumnLabel
(
i
+
1
));
}
}
if
(
rs
.
next
())
{
if
(
rs
.
next
())
{
for
(
int
i
=
0
;
i
<
columnList
.
size
();
i
++)
{
for
(
int
i
=
0
;
i
<
columnList
.
size
();
i
++)
{
map
.
put
(
columnList
.
get
(
i
),
rs
.
getString
(
i
+
1
));
map
.
put
(
columnList
.
get
(
i
),
rs
.
getString
(
i
+
1
));
}
}
}
}
}
}
...
@@ -79,19 +74,19 @@ public class DBUtil {
...
@@ -79,19 +74,19 @@ public class DBUtil {
return
map
;
return
map
;
}
}
public
static
List
<
Map
<
String
,
String
>>
getListByID
(
String
sql
,
DBConfig
config
)
throws
SQLException
,
IOException
{
public
static
List
<
Map
<
String
,
String
>>
getListByID
(
String
sql
,
DBConfig
config
)
throws
SQLException
,
IOException
{
Connection
conn
=
getConnection
(
config
);
Connection
conn
=
getConnection
(
config
);
List
<
Map
<
String
,
String
>>
list
=
new
ArrayList
<>();
List
<
Map
<
String
,
String
>>
list
=
new
ArrayList
<>();
try
(
Statement
stmt
=
conn
.
createStatement
();
try
(
Statement
stmt
=
conn
.
createStatement
();
ResultSet
rs
=
stmt
.
executeQuery
(
sql
))
{
ResultSet
rs
=
stmt
.
executeQuery
(
sql
))
{
List
<
String
>
columnList
=
new
ArrayList
<>();
List
<
String
>
columnList
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
rs
.
getMetaData
().
getColumnCount
();
i
++)
{
for
(
int
i
=
0
;
i
<
rs
.
getMetaData
().
getColumnCount
();
i
++)
{
columnList
.
add
(
rs
.
getMetaData
().
getColumnName
(
i
));
columnList
.
add
(
rs
.
getMetaData
().
getColumnName
(
i
));
}
}
while
(
rs
.
next
())
{
while
(
rs
.
next
())
{
HashMap
<
String
,
String
>
map
=
new
HashMap
();
HashMap
<
String
,
String
>
map
=
new
HashMap
();
for
(
int
i
=
0
;
i
<
columnList
.
size
();
i
++)
{
for
(
int
i
=
0
;
i
<
columnList
.
size
();
i
++)
{
map
.
put
(
columnList
.
get
(
i
),
rs
.
getString
(
i
));
map
.
put
(
columnList
.
get
(
i
),
rs
.
getString
(
i
));
}
}
list
.
add
(
map
);
list
.
add
(
map
);
}
}
...
...
dlink-app/src/main/java/com/dlink/app/flinksql/StatementParam.java
→
dlink-app/
dlink-app-base/
src/main/java/com/dlink/app/flinksql/StatementParam.java
View file @
8fc49e14
File moved
dlink-app/src/main/java/com/dlink/app/flinksql/Submiter.java
→
dlink-app/
dlink-app-base/
src/main/java/com/dlink/app/flinksql/Submiter.java
View file @
8fc49e14
...
@@ -10,7 +10,6 @@ import com.dlink.interceptor.FlinkInterceptor;
...
@@ -10,7 +10,6 @@ import com.dlink.interceptor.FlinkInterceptor;
import
com.dlink.parser.SqlType
;
import
com.dlink.parser.SqlType
;
import
com.dlink.trans.Operations
;
import
com.dlink.trans.Operations
;
import
org.apache.flink.configuration.CheckpointingOptions
;
import
org.apache.flink.configuration.CheckpointingOptions
;
import
org.apache.flink.streaming.api.environment.CheckpointConfig
;
import
org.slf4j.Logger
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
...
@@ -87,13 +86,13 @@ public class Submiter {
...
@@ -87,13 +86,13 @@ public class Submiter {
List
<
String
>
statements
=
Submiter
.
getStatements
(
sb
.
toString
());
List
<
String
>
statements
=
Submiter
.
getStatements
(
sb
.
toString
());
ExecutorSetting
executorSetting
=
ExecutorSetting
.
build
(
taskConfig
);
ExecutorSetting
executorSetting
=
ExecutorSetting
.
build
(
taskConfig
);
String
uuid
=
UUID
.
randomUUID
().
toString
().
replace
(
"-"
,
""
);
String
uuid
=
UUID
.
randomUUID
().
toString
().
replace
(
"-"
,
""
);
if
(
executorSetting
.
getConfig
().
containsKey
(
CheckpointingOptions
.
CHECKPOINTS_DIRECTORY
.
key
()))
{
if
(
executorSetting
.
getConfig
().
containsKey
(
CheckpointingOptions
.
CHECKPOINTS_DIRECTORY
.
key
()))
{
executorSetting
.
getConfig
().
put
(
CheckpointingOptions
.
CHECKPOINTS_DIRECTORY
.
key
(),
executorSetting
.
getConfig
().
put
(
CheckpointingOptions
.
CHECKPOINTS_DIRECTORY
.
key
(),
executorSetting
.
getConfig
().
get
(
CheckpointingOptions
.
CHECKPOINTS_DIRECTORY
.
key
())
+
"/"
+
uuid
);
executorSetting
.
getConfig
().
get
(
CheckpointingOptions
.
CHECKPOINTS_DIRECTORY
.
key
())
+
"/"
+
uuid
);
}
}
if
(
executorSetting
.
getConfig
().
containsKey
(
CheckpointingOptions
.
SAVEPOINT_DIRECTORY
.
key
()))
{
if
(
executorSetting
.
getConfig
().
containsKey
(
CheckpointingOptions
.
SAVEPOINT_DIRECTORY
.
key
()))
{
executorSetting
.
getConfig
().
put
(
CheckpointingOptions
.
SAVEPOINT_DIRECTORY
.
key
(),
executorSetting
.
getConfig
().
put
(
CheckpointingOptions
.
SAVEPOINT_DIRECTORY
.
key
(),
executorSetting
.
getConfig
().
get
(
CheckpointingOptions
.
SAVEPOINT_DIRECTORY
.
key
())
+
"/"
+
uuid
);
executorSetting
.
getConfig
().
get
(
CheckpointingOptions
.
SAVEPOINT_DIRECTORY
.
key
())
+
"/"
+
uuid
);
}
}
logger
.
info
(
"作业配置如下: "
+
executorSetting
.
toString
());
logger
.
info
(
"作业配置如下: "
+
executorSetting
.
toString
());
Executor
executor
=
Executor
.
buildAppStreamExecutor
(
executorSetting
);
Executor
executor
=
Executor
.
buildAppStreamExecutor
(
executorSetting
);
...
@@ -150,7 +149,7 @@ public class Submiter {
...
@@ -150,7 +149,7 @@ public class Submiter {
for
(
StatementParam
item
:
execute
)
{
for
(
StatementParam
item
:
execute
)
{
executes
.
add
(
item
.
getValue
());
executes
.
add
(
item
.
getValue
());
executor
.
executeSql
(
item
.
getValue
());
executor
.
executeSql
(
item
.
getValue
());
if
(!
executorSetting
.
isUseStatementSet
())
{
if
(!
executorSetting
.
isUseStatementSet
())
{
break
;
break
;
}
}
}
}
...
...
dlink-app/pom.xml
View file @
8fc49e14
...
@@ -10,83 +10,20 @@
...
@@ -10,83 +10,20 @@
<modelVersion>
4.0.0
</modelVersion>
<modelVersion>
4.0.0
</modelVersion>
<artifactId>
dlink-app
</artifactId>
<artifactId>
dlink-app
</artifactId>
<properties>
<mainClass>
com.dlink.app.MainApp
</mainClass>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
<maven.compiler.source>
1.8
</maven.compiler.source>
<maven.compiler.target>
1.8
</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>
mysql
</groupId>
<artifactId>
mysql-connector-java
</artifactId>
<!-- <scope>provided</scope>-->
<version>
8.0.21
</version>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-client-1.13
</artifactId>
<scope>
provided
</scope>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-client-base
</artifactId>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-executor
</artifactId>
</dependency>
</dependencies>
<
build
>
<
packaging>
pom
</packaging
>
<resources>
<resource
>
<modules
>
<directory>
src/main/resources
</directory
>
<module>
dlink-app-1.13
</module
>
<includes
>
<module>
dlink-app-base
</module
>
<include>
*.properties
</includ
e>
<module>
dlink-app-1.14
</modul
e>
</includes
>
<module>
dlink-app-1.12
</module
>
</resourc
e>
<module>
dlink-app-1.11
</modul
e>
</resourc
es>
</modul
es>
<plugins>
<properties>
<!-- 编译插件 -->
<maven.compiler.source>
8
</maven.compiler.source>
<plugin>
<maven.compiler.target>
8
</maven.compiler.target>
<groupId>
org.apache.maven.plugins
</groupId>
</properties>
<artifactId>
maven-compiler-plugin
</artifactId>
<version>
3.6.0
</version>
<configuration>
<source>
1.8
</source>
<target>
1.8
</target>
<encoding>
UTF-8
</encoding>
</configuration>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-assembly-plugin
</artifactId>
<version>
2.6
</version>
<configuration>
<descriptorRefs>
<descriptorRef>
jar-with-dependencies
</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass>
com.dlink.app.MainApp
</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>
make-assembly
</id>
<phase>
package
</phase>
<goals>
<goal>
single
</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
</project>
\ No newline at end of file
dlink-assembly/src/main/assembly/package.xml
View file @
8fc49e14
...
@@ -203,10 +203,31 @@
...
@@ -203,10 +203,31 @@
</includes>
</includes>
</fileSet>-->
</fileSet>-->
<fileSet>
<fileSet>
<directory>
${project.parent.basedir}/dlink-app/target
</directory>
<directory>
${project.parent.basedir}/dlink-app/
dlink-app-1.11/
target
</directory>
<outputDirectory>
jar
</outputDirectory>
<outputDirectory>
jar
</outputDirectory>
<includes>
<includes>
<include>
dlink-app-${project.version}-jar-with-dependencies.jar
</include>
<include>
dlink-app-1.11-${project.version}-jar-with-dependencies.jar
</include>
</includes>
</fileSet>
<fileSet>
<directory>
${project.parent.basedir}/dlink-app/dlink-app-1.12/target
</directory>
<outputDirectory>
jar
</outputDirectory>
<includes>
<include>
dlink-app-1.12-${project.version}-jar-with-dependencies.jar
</include>
</includes>
</fileSet>
<fileSet>
<directory>
${project.parent.basedir}/dlink-app/dlink-app-1.13/target
</directory>
<outputDirectory>
jar
</outputDirectory>
<includes>
<include>
dlink-app-1.13-${project.version}-jar-with-dependencies.jar
</include>
</includes>
</fileSet>
<fileSet>
<directory>
${project.parent.basedir}/dlink-app/dlink-app-1.14/target
</directory>
<outputDirectory>
jar
</outputDirectory>
<includes>
<include>
dlink-app-1.14-${project.version}-jar-with-dependencies.jar
</include>
</includes>
</includes>
</fileSet>
</fileSet>
<fileSet>
<fileSet>
...
...
pom.xml
View file @
8fc49e14
...
@@ -307,6 +307,11 @@
...
@@ -307,6 +307,11 @@
<artifactId>
dlink-daemon
</artifactId>
<artifactId>
dlink-daemon
</artifactId>
<version>
${project.version}
</version>
<version>
${project.version}
</version>
</dependency>
</dependency>
<dependency>
<groupId>
com.dlink
</groupId>
<artifactId>
dlink-app-base
</artifactId>
<version>
${project.version}
</version>
</dependency>
<dependency>
<dependency>
<groupId>
org.apache.httpcomponents
</groupId>
<groupId>
org.apache.httpcomponents
</groupId>
<artifactId>
httpclient
</artifactId>
<artifactId>
httpclient
</artifactId>
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment