Commit 527e57d6 authored by godkaikai

yarn-application and yarn per-job

parent cdc26038
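
This commit routes task submission through the new dlink-gateway module: TaskServiceImpl builds a GatewayConfig from the task's JobConfig, points it at the local Flink and Hadoop configuration, and hands the SQL statement to JobManager.submitGraph for YARN per-job execution. A minimal sketch of that flow, with placeholder paths and a placeholder statement standing in for the task-specific values used further down in this diff:

GatewayConfig gatewayConfig = new GatewayConfig();
gatewayConfig.setJobName(jobConfig.getJobName()); // jobConfig obtained from task.buildSubmitConfig()
gatewayConfig.setType(GatewayType.YARN_PER_JOB);
gatewayConfig.setFlinkConfigPath("/opt/flink/conf"); // placeholder: local Flink conf directory
gatewayConfig.setFlinkLibs("hdfs:///flink/lib"); // placeholder: Flink libs on HDFS
gatewayConfig.setYarnConfigPath("/etc/hadoop/conf/yarn-site.xml"); // placeholder: yarn-site.xml location
JobManager jobManager = JobManager.build(jobConfig);
SubmitResult result = jobManager.submitGraph("INSERT INTO sink SELECT * FROM source", gatewayConfig); // placeholder statement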
......@@ -118,8 +118,19 @@
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-gateway</artifactId>
<!--<scope>provided</scope>-->
<scope>provided</scope>
</dependency>
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_2.11</artifactId>
<version>1.12.5</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
</exclusions>
</dependency>-->
<!--<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-metadata-mysql</artifactId>
......
......@@ -105,5 +105,13 @@ public class TaskController {
Task task = taskService.getTaskInfoById(id);
return Result.succeed(task,"获取成功");
}
/**
* Submit the job
*/
@GetMapping(value = "/submitApplication")
public Result submitApplicationByTaskId(@RequestParam Integer id) {
return taskService.submitApplicationByTaskId(id);
}
}
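
A minimal sketch of calling the new endpoint from a plain Java client. The base URL and the /api/task prefix are illustrative assumptions; the controller's request mapping is defined outside this hunk.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitApplicationExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical base URL and path prefix; adjust to the actual deployment.
        String url = "http://localhost:8888/api/task/submitApplication?id=1";
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // JSON-serialized Result wrapping the submission outcome
    }
}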
package com.dlink.service;
import com.dlink.common.result.Result;
import com.dlink.db.service.ISuperService;
import com.dlink.job.JobResult;
import com.dlink.model.Task;
......@@ -16,6 +17,8 @@ public interface TaskService extends ISuperService<Task> {
JobResult submitByTaskId(Integer id);
Result submitApplicationByTaskId(Integer id);
Task getTaskInfoById(Integer id);
boolean saveOrUpdateTask(Task task);
......
......@@ -3,11 +3,14 @@ package com.dlink.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.dlink.assertion.Assert;
import com.dlink.cluster.FlinkCluster;
import com.dlink.common.result.Result;
import com.dlink.constant.FlinkConstant;
import com.dlink.db.service.impl.SuperServiceImpl;
import com.dlink.exception.BusException;
import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import com.dlink.gateway.GatewayConfig;
import com.dlink.gateway.GatewayType;
import com.dlink.job.JobConfig;
import com.dlink.job.JobManager;
import com.dlink.job.JobResult;
......@@ -51,6 +54,24 @@ public class TaskServiceImpl extends SuperServiceImpl<TaskMapper, Task> implemen
return jobManager.executeSql(statement.getStatement());
}
@Override
public Result submitApplicationByTaskId(Integer id) {
Task task = this.getById(id);
Assert.check(task);
Statement statement = statementService.getById(id);
Assert.check(statement);
JobConfig config = task.buildSubmitConfig();
GatewayConfig gatewayConfig = new GatewayConfig();
gatewayConfig.setJobName(config.getJobName());
gatewayConfig.setType(GatewayType.YARN_PER_JOB);
gatewayConfig.setFlinkConfigPath("/opt/src/flink-1.12.2_pj/conf");
gatewayConfig.setFlinkLibs("hdfs:///flink12/lib/flinklib");
gatewayConfig.setYarnConfigPath("/usr/local/hadoop/hadoop-2.7.7/etc/hadoop/yarn-site.xml");
JobManager jobManager = JobManager.build(config);
SubmitResult result = jobManager.submitGraph(statement.getStatement(), gatewayConfig);
return Result.succeed(result,"提交成功");
}
@Override
public Task getTaskInfoById(Integer id) {
Task task = this.getById(id);
......
......@@ -23,20 +23,31 @@ import java.util.concurrent.*;
**/
@Internal
class CustomTableResultImpl implements TableResult {
public static final TableResult TABLE_RESULT_OK;
public static final TableResult TABLE_RESULT_OK =
CustomTableResultImpl.builder()
.resultKind(ResultKind.SUCCESS)
.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
.data(Collections.singletonList(Row.of("OK")))
.build();
private final JobClient jobClient;
private final TableSchema tableSchema;
private final ResultKind resultKind;
private final CloseableRowIteratorWrapper data;
private final CloseableIterator<Row> data;
private final PrintStyle printStyle;
private CustomTableResultImpl(@Nullable JobClient jobClient, TableSchema tableSchema, ResultKind resultKind, CloseableIterator<Row> data, PrintStyle printStyle) {
private CustomTableResultImpl(
@Nullable JobClient jobClient,
TableSchema tableSchema,
ResultKind resultKind,
CloseableIterator<Row> data,
PrintStyle printStyle) {
this.jobClient = jobClient;
this.tableSchema = (TableSchema) Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
this.resultKind = (ResultKind)Preconditions.checkNotNull(resultKind, "resultKind should not be null");
Preconditions.checkNotNull(data, "data should not be null");
this.data = new CloseableRowIteratorWrapper(data);
this.printStyle = (PrintStyle)Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.tableSchema =
Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.data = Preconditions.checkNotNull(data, "data should not be null");
this.printStyle = Preconditions.checkNotNull(printStyle, "printStyle should not be null");
}
public static TableResult buildTableResult(List<TableSchemaField> fields,List<Row> rows){
......@@ -51,220 +62,189 @@ class CustomTableResultImpl implements TableResult {
return builder.build();
}
@Override
public Optional<JobClient> getJobClient() {
return Optional.ofNullable(this.jobClient);
}
public void await() throws InterruptedException, ExecutionException {
try {
this.awaitInternal(-1L, TimeUnit.MILLISECONDS);
} catch (TimeoutException var2) {
;
}
}
public void await(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
this.awaitInternal(timeout, unit);
}
private void awaitInternal(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (this.jobClient != null) {
ExecutorService executor = Executors.newFixedThreadPool(1, (r) -> {
return new Thread(r, "TableResult-await-thread");
});
try {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
while(!this.data.isFirstRowReady()) {
try {
Thread.sleep(100L);
} catch (InterruptedException var2) {
throw new TableException("Thread is interrupted");
}
}
}, executor);
if (timeout >= 0L) {
future.get(timeout, unit);
} else {
future.get();
}
} finally {
executor.shutdown();
}
}
return Optional.ofNullable(jobClient);
}
@Override
public TableSchema getTableSchema() {
return this.tableSchema;
return tableSchema;
}
@Override
public ResultKind getResultKind() {
return this.resultKind;
return resultKind;
}
@Override
public CloseableIterator<Row> collect() {
return this.data;
return data;
}
@Override
public void print() {
Iterator<Row> it = this.collect();
if (this.printStyle instanceof TableauStyle) {
int maxColumnWidth = ((TableauStyle)this.printStyle).getMaxColumnWidth();
String nullColumn = ((TableauStyle)this.printStyle).getNullColumn();
boolean deriveColumnWidthByType = ((TableauStyle)this.printStyle).isDeriveColumnWidthByType();
PrintUtils.printAsTableauForm(this.getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth, nullColumn, deriveColumnWidthByType);
} else {
if (!(this.printStyle instanceof RawContentStyle)) {
throw new TableException("Unsupported print style: " + this.printStyle);
}
while(it.hasNext()) {
System.out.println(String.join(",", PrintUtils.rowToString((Row)it.next())));
Iterator<Row> it = collect();
if (printStyle instanceof TableauStyle) {
int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth();
String nullColumn = ((TableauStyle) printStyle).getNullColumn();
boolean deriveColumnWidthByType =
((TableauStyle) printStyle).isDeriveColumnWidthByType();
PrintUtils.printAsTableauForm(
getTableSchema(),
it,
new PrintWriter(System.out),
maxColumnWidth,
nullColumn,
deriveColumnWidthByType);
} else if (printStyle instanceof RawContentStyle) {
while (it.hasNext()) {
System.out.println(String.join(",", PrintUtils.rowToString(it.next())));
}
} else {
throw new TableException("Unsupported print style: " + printStyle);
}
}
public static Builder builder() {
return new Builder();
}
static {
TABLE_RESULT_OK = builder().resultKind(ResultKind.SUCCESS).tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build()).data(Collections.singletonList(Row.of(new Object[]{"OK"}))).build();
}
private static final class CloseableRowIteratorWrapper implements CloseableIterator<Row> {
private final CloseableIterator<Row> iterator;
private boolean isFirstRowReady;
private CloseableRowIteratorWrapper(CloseableIterator<Row> iterator) {
this.isFirstRowReady = false;
this.iterator = iterator;
}
public void close() throws Exception {
this.iterator.close();
}
public boolean hasNext() {
boolean hasNext = this.iterator.hasNext();
this.isFirstRowReady = this.isFirstRowReady || hasNext;
return hasNext;
}
public Row next() {
Row next = (Row)this.iterator.next();
this.isFirstRowReady = true;
return next;
}
public boolean isFirstRowReady() {
return this.isFirstRowReady || this.hasNext();
}
}
private static final class RawContentStyle implements PrintStyle {
private RawContentStyle() {
}
}
private static final class TableauStyle implements PrintStyle {
private final boolean deriveColumnWidthByType;
private final int maxColumnWidth;
private final String nullColumn;
private final boolean printRowKind;
private TableauStyle(int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType, boolean printRowKind) {
this.deriveColumnWidthByType = deriveColumnWidthByType;
this.maxColumnWidth = maxColumnWidth;
this.nullColumn = nullColumn;
this.printRowKind = printRowKind;
}
public boolean isDeriveColumnWidthByType() {
return this.deriveColumnWidthByType;
}
int getMaxColumnWidth() {
return this.maxColumnWidth;
}
String getNullColumn() {
return this.nullColumn;
}
public boolean isPrintRowKind() {
return this.printRowKind;
}
}
public interface PrintStyle {
static PrintStyle tableau(int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType, boolean printRowKind) {
Preconditions.checkArgument(maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
return new TableauStyle(maxColumnWidth, nullColumn, deriveColumnWidthByType, printRowKind);
}
static PrintStyle rawContent() {
return new RawContentStyle();
}
}
/** Builder for creating a {@link CustomTableResultImpl}. */
public static class Builder {
private JobClient jobClient;
private TableSchema tableSchema;
private ResultKind resultKind;
private CloseableIterator<Row> data;
private PrintStyle printStyle;
private Builder() {
this.jobClient = null;
this.tableSchema = null;
this.resultKind = null;
this.data = null;
this.printStyle = PrintStyle.tableau(2147483647, "(NULL)", false, false);
}
private JobClient jobClient = null;
private TableSchema tableSchema = null;
private ResultKind resultKind = null;
private CloseableIterator<Row> data = null;
private PrintStyle printStyle =
PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false);
private Builder() {}
/**
* Specifies the job client associated with the submitted Flink job.
*
* @param jobClient a {@link JobClient} for the submitted Flink job.
*/
public Builder jobClient(JobClient jobClient) {
this.jobClient = jobClient;
return this;
}
/**
* Specifies table schema of the execution result.
*
* @param tableSchema a {@link TableSchema} for the execution result.
*/
public Builder tableSchema(TableSchema tableSchema) {
Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
this.tableSchema = tableSchema;
return this;
}
/**
* Specifies result kind of the execution result.
*
* @param resultKind a {@link ResultKind} for the execution result.
*/
public Builder resultKind(ResultKind resultKind) {
Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.resultKind = resultKind;
return this;
}
/**
* Specifies a row iterator as the execution result.
*
* @param rowIterator a row iterator as the execution result.
*/
public Builder data(CloseableIterator<Row> rowIterator) {
Preconditions.checkNotNull(rowIterator, "rowIterator should not be null");
this.data = rowIterator;
return this;
}
/**
* Specifies a row list as the execution result.
*
* @param rowList a row list as the execution result.
*/
public Builder data(List<Row> rowList) {
Preconditions.checkNotNull(rowList, "listRows should not be null");
this.data = CloseableIterator.adapterForIterator(rowList.iterator());
return this;
}
/** Specifies print style. Default is {@link TableauStyle} with max integer column width. */
public Builder setPrintStyle(PrintStyle printStyle) {
Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.printStyle = printStyle;
return this;
}
/** Returns a {@link TableResult} instance. */
public TableResult build() {
return new CustomTableResultImpl(this.jobClient, this.tableSchema, this.resultKind, this.data, this.printStyle);
return new CustomTableResultImpl(jobClient, tableSchema, resultKind, data, printStyle);
}
}
/** Root interface for all print styles. */
public interface PrintStyle {
/**
* Create a tableau print style with given max column width, null column, and a flag to
* indicate whether the column width is derived from type (true) or content (false), which
* prints the result schema and content as tableau form.
*/
static PrintStyle tableau(
int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType) {
Preconditions.checkArgument(
maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
return new TableauStyle(maxColumnWidth, nullColumn, deriveColumnWidthByType);
}
/**
* Create a raw content print style, which only prints the result content in raw form. The
* column delimiter is "," and the row delimiter is "\n".
*/
static PrintStyle rawContent() {
return new RawContentStyle();
}
}
/** Prints the result schema and content in tableau form. */
private static final class TableauStyle implements PrintStyle {
/**
* A flag to indicate whether the column width is derived from type (true) or content
* (false).
*/
private final boolean deriveColumnWidthByType;
private final int maxColumnWidth;
private final String nullColumn;
private TableauStyle(
int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType) {
this.deriveColumnWidthByType = deriveColumnWidthByType;
this.maxColumnWidth = maxColumnWidth;
this.nullColumn = nullColumn;
}
public boolean isDeriveColumnWidthByType() {
return deriveColumnWidthByType;
}
int getMaxColumnWidth() {
return maxColumnWidth;
}
String getNullColumn() {
return nullColumn;
}
}
/**
* Only prints the result content in raw form. The column delimiter is "," and the row delimiter is "\n".
*/
private static final class RawContentStyle implements PrintStyle {}
}
......@@ -48,7 +48,55 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_2.11</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_2.11</artifactId>
<version>${flink.version}</version>
&lt;!&ndash;<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
</exclusions>&ndash;&gt;
&lt;!&ndash;<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</exclusion>
</exclusions>&ndash;&gt;
</dependency>-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
......
......@@ -7,11 +7,13 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
......@@ -22,6 +24,7 @@ import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
......@@ -157,6 +160,39 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
}
}
public JobGraph getJobGraphFromInserts(List<String> statements) {
List<ModifyOperation> modifyOperations = new ArrayList<>();
for(String statement : statements){
if(useSqlFragment) {
statement = sqlManager.parseVariable(statement);
if (statement.length() == 0) {
throw new TableException("This is a sql fragment.");
}
}
if (checkShowFragments(statement)) {
throw new TableException("'SHOW FRAGMENTS' can't be add inserts.");
}
List<Operation> operations = getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Only single statement is supported.");
} else {
Operation operation = (Operation)operations.get(0);
if (operation instanceof ModifyOperation) {
modifyOperations.add((ModifyOperation)operation);
} else {
throw new TableException("Only insert statement is supported now.");
}
}
}
List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
if(execEnv instanceof ExecutorBase){
StreamGraph streamGraph = ExecutorUtils.generateStreamGraph(((ExecutorBase) execEnv).getExecutionEnvironment(), trans);
return streamGraph.getJobGraph();
}else{
throw new TableException("Unsupported SQL query! ExecEnv need a ExecutorBase.");
}
}
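// Usage sketch (illustrative, not part of this class): translate a batch of INSERT statements
// into a single JobGraph, e.g.
//
//   List<String> inserts = Arrays.asList(
//           "INSERT INTO sink1 SELECT * FROM source",
//           "INSERT INTO sink2 SELECT id, name FROM source");
//   JobGraph jobGraph = customTableEnvironment.getJobGraphFromInserts(inserts);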
public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
SqlExplainResult record = new SqlExplainResult();
if(useSqlFragment) {
......@@ -277,4 +313,7 @@ public class CustomTableEnvironmentImpl extends TableEnvironmentImpl {
this.functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, typeInfo, accTypeInfo);
}
public Parser getParser(){
return super.parser;
}
}
......@@ -2,7 +2,11 @@ package com.dlink.executor.custom;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
......@@ -14,29 +18,46 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Customized TableResultImpl
* Customized CustomTableResultImpl
* @author wenmo
* @since 2021/6/7 22:06
**/
@Internal
class CustomTableResultImpl implements TableResult {
public static final TableResult TABLE_RESULT_OK;
public static final TableResult TABLE_RESULT_OK =
CustomTableResultImpl.builder()
.resultKind(ResultKind.SUCCESS)
.tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
.data(Collections.singletonList(Row.of("OK")))
.build();
private final JobClient jobClient;
private final TableSchema tableSchema;
private final ResultKind resultKind;
private final CloseableRowIteratorWrapper data;
private final PrintStyle printStyle;
private CustomTableResultImpl(@Nullable JobClient jobClient, TableSchema tableSchema, ResultKind resultKind, CloseableIterator<Row> data, PrintStyle printStyle) {
private CustomTableResultImpl(
@Nullable JobClient jobClient,
TableSchema tableSchema,
ResultKind resultKind,
CloseableIterator<Row> data,
PrintStyle printStyle) {
this.jobClient = jobClient;
this.tableSchema = (TableSchema) Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
this.resultKind = (ResultKind)Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.tableSchema =
Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind should not be null");
Preconditions.checkNotNull(data, "data should not be null");
this.data = new CloseableRowIteratorWrapper(data);
this.printStyle = (PrintStyle)Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.printStyle = Preconditions.checkNotNull(printStyle, "printStyle should not be null");
}
public static TableResult buildTableResult(List<TableSchemaField> fields,List<Row> rows){
......@@ -51,134 +72,227 @@ class CustomTableResultImpl implements TableResult {
return builder.build();
}
@Override
public Optional<JobClient> getJobClient() {
return Optional.ofNullable(this.jobClient);
return Optional.ofNullable(jobClient);
}
@Override
public void await() throws InterruptedException, ExecutionException {
try {
this.awaitInternal(-1L, TimeUnit.MILLISECONDS);
} catch (TimeoutException var2) {
;
awaitInternal(-1, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// do nothing
}
}
public void await(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
this.awaitInternal(timeout, unit);
@Override
public void await(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
awaitInternal(timeout, unit);
}
private void awaitInternal(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (this.jobClient != null) {
ExecutorService executor = Executors.newFixedThreadPool(1, (r) -> {
return new Thread(r, "TableResult-await-thread");
});
try {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
while(!this.data.isFirstRowReady()) {
try {
Thread.sleep(100L);
} catch (InterruptedException var2) {
throw new TableException("Thread is interrupted");
}
}
}, executor);
if (timeout >= 0L) {
future.get(timeout, unit);
} else {
future.get();
}
} finally {
executor.shutdown();
}
private void awaitInternal(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (jobClient == null) {
return;
}
ExecutorService executor =
Executors.newFixedThreadPool(1, r -> new Thread(r, "TableResult-await-thread"));
try {
CompletableFuture<Void> future =
CompletableFuture.runAsync(
() -> {
while (!data.isFirstRowReady()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new TableException("Thread is interrupted");
}
}
},
executor);
if (timeout >= 0) {
future.get(timeout, unit);
} else {
future.get();
}
} finally {
executor.shutdown();
}
}
@Override
public TableSchema getTableSchema() {
return this.tableSchema;
return tableSchema;
}
@Override
public ResultKind getResultKind() {
return this.resultKind;
return resultKind;
}
@Override
public CloseableIterator<Row> collect() {
return this.data;
return data;
}
@Override
public void print() {
Iterator<Row> it = this.collect();
if (this.printStyle instanceof TableauStyle) {
int maxColumnWidth = ((TableauStyle)this.printStyle).getMaxColumnWidth();
String nullColumn = ((TableauStyle)this.printStyle).getNullColumn();
boolean deriveColumnWidthByType = ((TableauStyle)this.printStyle).isDeriveColumnWidthByType();
boolean printRowKind = ((TableauStyle)this.printStyle).isPrintRowKind();
PrintUtils.printAsTableauForm(this.getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth, nullColumn, deriveColumnWidthByType, printRowKind);
} else {
if (!(this.printStyle instanceof RawContentStyle)) {
throw new TableException("Unsupported print style: " + this.printStyle);
}
while(it.hasNext()) {
System.out.println(String.join(",", PrintUtils.rowToString((Row)it.next())));
Iterator<Row> it = collect();
if (printStyle instanceof TableauStyle) {
int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth();
String nullColumn = ((TableauStyle) printStyle).getNullColumn();
boolean deriveColumnWidthByType =
((TableauStyle) printStyle).isDeriveColumnWidthByType();
boolean printRowKind = ((TableauStyle) printStyle).isPrintRowKind();
PrintUtils.printAsTableauForm(
getTableSchema(),
it,
new PrintWriter(System.out),
maxColumnWidth,
nullColumn,
deriveColumnWidthByType,
printRowKind);
} else if (printStyle instanceof RawContentStyle) {
while (it.hasNext()) {
System.out.println(String.join(",", PrintUtils.rowToString(it.next())));
}
} else {
throw new TableException("Unsupported print style: " + printStyle);
}
}
public static Builder builder() {
return new Builder();
}
static {
TABLE_RESULT_OK = builder().resultKind(ResultKind.SUCCESS).tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build()).data(Collections.singletonList(Row.of(new Object[]{"OK"}))).build();
}
/** Builder for creating a {@link CustomTableResultImpl}. */
public static class Builder {
private JobClient jobClient = null;
private TableSchema tableSchema = null;
private ResultKind resultKind = null;
private CloseableIterator<Row> data = null;
private PrintStyle printStyle =
PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false, false);
private Builder() {}
/**
* Specifies the job client associated with the submitted Flink job.
*
* @param jobClient a {@link JobClient} for the submitted Flink job.
*/
public Builder jobClient(JobClient jobClient) {
this.jobClient = jobClient;
return this;
}
private static final class CloseableRowIteratorWrapper implements CloseableIterator<Row> {
private final CloseableIterator<Row> iterator;
private boolean isFirstRowReady;
/**
* Specifies table schema of the execution result.
*
* @param tableSchema a {@link TableSchema} for the execution result.
*/
public Builder tableSchema(TableSchema tableSchema) {
Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
this.tableSchema = tableSchema;
return this;
}
private CloseableRowIteratorWrapper(CloseableIterator<Row> iterator) {
this.isFirstRowReady = false;
this.iterator = iterator;
/**
* Specifies result kind of the execution result.
*
* @param resultKind a {@link ResultKind} for the execution result.
*/
public Builder resultKind(ResultKind resultKind) {
Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.resultKind = resultKind;
return this;
}
public void close() throws Exception {
this.iterator.close();
/**
* Specifies a row iterator as the execution result.
*
* @param rowIterator a row iterator as the execution result.
*/
public Builder data(CloseableIterator<Row> rowIterator) {
Preconditions.checkNotNull(rowIterator, "rowIterator should not be null");
this.data = rowIterator;
return this;
}
public boolean hasNext() {
boolean hasNext = this.iterator.hasNext();
this.isFirstRowReady = this.isFirstRowReady || hasNext;
return hasNext;
/**
* Specifies a row list as the execution result.
*
* @param rowList a row list as the execution result.
*/
public Builder data(List<Row> rowList) {
Preconditions.checkNotNull(rowList, "listRows should not be null");
this.data = CloseableIterator.adapterForIterator(rowList.iterator());
return this;
}
public Row next() {
Row next = (Row)this.iterator.next();
this.isFirstRowReady = true;
return next;
/** Specifies print style. Default is {@link TableauStyle} with max integer column width. */
public Builder setPrintStyle(PrintStyle printStyle) {
Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.printStyle = printStyle;
return this;
}
public boolean isFirstRowReady() {
return this.isFirstRowReady || this.hasNext();
/** Returns a {@link TableResult} instance. */
public TableResult build() {
return new CustomTableResultImpl(jobClient, tableSchema, resultKind, data, printStyle);
}
}
private static final class RawContentStyle implements PrintStyle {
private RawContentStyle() {
/** Root interface for all print styles. */
public interface PrintStyle {
/**
* Create a tableau print style with given max column width, null column, change mode
* indicator and a flag to indicate whether the column width is derived from type (true) or
* content (false), which prints the result schema and content as tableau form.
*/
static PrintStyle tableau(
int maxColumnWidth,
String nullColumn,
boolean deriveColumnWidthByType,
boolean printRowKind) {
Preconditions.checkArgument(
maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
return new TableauStyle(
maxColumnWidth, nullColumn, deriveColumnWidthByType, printRowKind);
}
/**
* Create a raw content print style, which only prints the result content in raw form. The
* column delimiter is "," and the row delimiter is "\n".
*/
static PrintStyle rawContent() {
return new RawContentStyle();
}
}
/** Prints the result schema and content in tableau form. */
private static final class TableauStyle implements PrintStyle {
/**
* A flag to indicate whether the column width is derived from type (true) or content
* (false).
*/
private final boolean deriveColumnWidthByType;
private final int maxColumnWidth;
private final String nullColumn;
/** A flag to indicate whether print row kind info. */
private final boolean printRowKind;
private TableauStyle(int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType, boolean printRowKind) {
private TableauStyle(
int maxColumnWidth,
String nullColumn,
boolean deriveColumnWidthByType,
boolean printRowKind) {
this.deriveColumnWidthByType = deriveColumnWidthByType;
this.maxColumnWidth = maxColumnWidth;
this.nullColumn = nullColumn;
......@@ -186,86 +300,64 @@ class CustomTableResultImpl implements TableResult {
}
public boolean isDeriveColumnWidthByType() {
return this.deriveColumnWidthByType;
return deriveColumnWidthByType;
}
int getMaxColumnWidth() {
return this.maxColumnWidth;
return maxColumnWidth;
}
String getNullColumn() {
return this.nullColumn;
return nullColumn;
}
public boolean isPrintRowKind() {
return this.printRowKind;
return printRowKind;
}
}
public interface PrintStyle {
static PrintStyle tableau(int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType, boolean printRowKind) {
Preconditions.checkArgument(maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
return new TableauStyle(maxColumnWidth, nullColumn, deriveColumnWidthByType, printRowKind);
}
static PrintStyle rawContent() {
return new RawContentStyle();
}
}
public static class Builder {
private JobClient jobClient;
private TableSchema tableSchema;
private ResultKind resultKind;
private CloseableIterator<Row> data;
private PrintStyle printStyle;
private Builder() {
this.jobClient = null;
this.tableSchema = null;
this.resultKind = null;
this.data = null;
this.printStyle = PrintStyle.tableau(2147483647, "(NULL)", false, false);
}
public Builder jobClient(JobClient jobClient) {
this.jobClient = jobClient;
return this;
}
public Builder tableSchema(TableSchema tableSchema) {
Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
this.tableSchema = tableSchema;
return this;
}
/**
* Only prints the result content in raw form. The column delimiter is "," and the row delimiter is "\n".
*/
private static final class RawContentStyle implements PrintStyle {}
/**
* A {@link CloseableIterator} wrapper class that can return whether the first row is ready.
*
* <p>The first row is ready when the {@link #hasNext} method returns true or the {@link #next()}
* method returns a row. The execution order of {@link TableResult#collect} and {@link
* TableResult#await()} may be arbitrary; this class records whether the first row is ready
* (or accessed).
*/
private static final class CloseableRowIteratorWrapper implements CloseableIterator<Row> {
private final CloseableIterator<Row> iterator;
private boolean isFirstRowReady = false;
public Builder resultKind(ResultKind resultKind) {
Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.resultKind = resultKind;
return this;
private CloseableRowIteratorWrapper(CloseableIterator<Row> iterator) {
this.iterator = iterator;
}
public Builder data(CloseableIterator<Row> rowIterator) {
Preconditions.checkNotNull(rowIterator, "rowIterator should not be null");
this.data = rowIterator;
return this;
@Override
public void close() throws Exception {
iterator.close();
}
public Builder data(List<Row> rowList) {
Preconditions.checkNotNull(rowList, "listRows should not be null");
this.data = CloseableIterator.adapterForIterator(rowList.iterator());
return this;
@Override
public boolean hasNext() {
boolean hasNext = iterator.hasNext();
isFirstRowReady = isFirstRowReady || hasNext;
return hasNext;
}
public Builder setPrintStyle(PrintStyle printStyle) {
Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.printStyle = printStyle;
return this;
@Override
public Row next() {
Row next = iterator.next();
isFirstRowReady = true;
return next;
}
public TableResult build() {
return new CustomTableResultImpl(this.jobClient, this.tableSchema, this.resultKind, this.data, this.printStyle);
public boolean isFirstRowReady() {
return isFirstRowReady || hasNext();
}
}
}
......@@ -3,9 +3,13 @@ package com.dlink.executor.custom;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExpressionParserException;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
......@@ -26,6 +30,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public final class SqlManager {
private Map<String, String> sqlFragments;
private List<ModifyOperation> operations = new ArrayList<>();
static final String SHOW_FRAGMENTS = "SHOW FRAGMENTS";
public SqlManager() {
......@@ -191,4 +197,18 @@ public final class SqlManager {
m.appendTail(sb);
return sb.toString();
}
public void addInsertSql(String statement,CustomTableEnvironmentImpl tableEnvironment) {
List<Operation> operations = tableEnvironment.getParser().parse(statement);
if (operations.size() != 1) {
throw new TableException("Only single statement is supported.");
} else {
Operation operation = (Operation)operations.get(0);
if (operation instanceof ModifyOperation) {
this.operations.add((ModifyOperation)operation);
} else {
throw new TableException("Only insert statement is supported now.");
}
}
}
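// Usage sketch (illustrative): parse a single INSERT statement through the table environment's
// parser and cache its ModifyOperation on this SqlManager, e.g.
//
//   sqlManager.addInsertSql("INSERT INTO sink SELECT * FROM source", customTableEnvironmentImpl);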
}
......@@ -2,7 +2,10 @@ package com.dlink.executor.custom;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.types.DataType;
......@@ -14,32 +17,55 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.PrintWriter;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Customized TableResultImpl
* Customized CustomTableResultImpl
* @author wenmo
* @since 2021/6/7 22:06
**/
@Internal
public class CustomTableResultImpl implements TableResult {
public static final TableResult TABLE_RESULT_OK;
public static final TableResult TABLE_RESULT_OK =
CustomTableResultImpl.builder()
.resultKind(ResultKind.SUCCESS)
.schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING())))
.data(Collections.singletonList(Row.of("OK")))
.build();
private final JobClient jobClient;
private final ResolvedSchema resolvedSchema;
private final ResultKind resultKind;
private final CustomTableResultImpl.CloseableRowIteratorWrapper data;
private final CustomTableResultImpl.PrintStyle printStyle;
private final CloseableRowIteratorWrapper data;
private final PrintStyle printStyle;
private final ZoneId sessionTimeZone;
private CustomTableResultImpl(@Nullable JobClient jobClient, ResolvedSchema resolvedSchema, ResultKind resultKind, CloseableIterator<Row> data, CustomTableResultImpl.PrintStyle printStyle, ZoneId sessionTimeZone) {
private CustomTableResultImpl(
@Nullable JobClient jobClient,
ResolvedSchema resolvedSchema,
ResultKind resultKind,
CloseableIterator<Row> data,
PrintStyle printStyle,
ZoneId sessionTimeZone) {
this.jobClient = jobClient;
this.resolvedSchema = (ResolvedSchema)Preconditions.checkNotNull(resolvedSchema, "resolvedSchema should not be null");
this.resultKind = (ResultKind)Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.resolvedSchema =
Preconditions.checkNotNull(resolvedSchema, "resolvedSchema should not be null");
this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind should not be null");
Preconditions.checkNotNull(data, "data should not be null");
this.data = new CustomTableResultImpl.CloseableRowIteratorWrapper(data);
this.printStyle = (CustomTableResultImpl.PrintStyle)Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.sessionTimeZone = (ZoneId)Preconditions.checkNotNull(sessionTimeZone, "sessionTimeZone should not be null");
this.data = new CloseableRowIteratorWrapper(data);
this.printStyle = Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.sessionTimeZone =
Preconditions.checkNotNull(sessionTimeZone, "sessionTimeZone should not be null");
}
public static TableResult buildTableResult(List<TableSchemaField> fields,List<Row> rows){
......@@ -56,134 +82,241 @@ public class CustomTableResultImpl implements TableResult {
return builder.build();
}
@Override
public Optional<JobClient> getJobClient() {
return Optional.ofNullable(this.jobClient);
return Optional.ofNullable(jobClient);
}
@Override
public void await() throws InterruptedException, ExecutionException {
try {
this.awaitInternal(-1L, TimeUnit.MILLISECONDS);
} catch (TimeoutException var2) {
;
awaitInternal(-1, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// do nothing
}
}
public void await(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
this.awaitInternal(timeout, unit);
@Override
public void await(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
awaitInternal(timeout, unit);
}
private void awaitInternal(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (this.jobClient != null) {
ExecutorService executor = Executors.newFixedThreadPool(1, (r) -> {
return new Thread(r, "TableResult-await-thread");
});
try {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
while(!this.data.isFirstRowReady()) {
try {
Thread.sleep(100L);
} catch (InterruptedException var2) {
throw new TableException("Thread is interrupted");
}
}
}, executor);
if (timeout >= 0L) {
future.get(timeout, unit);
} else {
future.get();
}
} finally {
executor.shutdown();
}
private void awaitInternal(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (jobClient == null) {
return;
}
ExecutorService executor =
Executors.newFixedThreadPool(1, r -> new Thread(r, "TableResult-await-thread"));
try {
CompletableFuture<Void> future =
CompletableFuture.runAsync(
() -> {
while (!data.isFirstRowReady()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new TableException("Thread is interrupted");
}
}
},
executor);
if (timeout >= 0) {
future.get(timeout, unit);
} else {
future.get();
}
} finally {
executor.shutdown();
}
}
@Override
public ResolvedSchema getResolvedSchema() {
return this.resolvedSchema;
return resolvedSchema;
}
@Override
public ResultKind getResultKind() {
return this.resultKind;
return resultKind;
}
@Override
public CloseableIterator<Row> collect() {
return this.data;
return data;
}
@Override
public void print() {
Iterator<Row> it = this.collect();
if (this.printStyle instanceof CustomTableResultImpl.TableauStyle) {
int maxColumnWidth = ((CustomTableResultImpl.TableauStyle)this.printStyle).getMaxColumnWidth();
String nullColumn = ((CustomTableResultImpl.TableauStyle)this.printStyle).getNullColumn();
boolean deriveColumnWidthByType = ((CustomTableResultImpl.TableauStyle)this.printStyle).isDeriveColumnWidthByType();
boolean printRowKind = ((CustomTableResultImpl.TableauStyle)this.printStyle).isPrintRowKind();
PrintUtils.printAsTableauForm(this.getResolvedSchema(), it, new PrintWriter(System.out), maxColumnWidth, nullColumn, deriveColumnWidthByType, printRowKind, this.sessionTimeZone);
} else {
if (!(this.printStyle instanceof CustomTableResultImpl.RawContentStyle)) {
throw new TableException("Unsupported print style: " + this.printStyle);
}
while(it.hasNext()) {
System.out.println(String.join(",", PrintUtils.rowToString((Row)it.next(), this.getResolvedSchema(), this.sessionTimeZone)));
Iterator<Row> it = collect();
if (printStyle instanceof TableauStyle) {
int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth();
String nullColumn = ((TableauStyle) printStyle).getNullColumn();
boolean deriveColumnWidthByType =
((TableauStyle) printStyle).isDeriveColumnWidthByType();
boolean printRowKind = ((TableauStyle) printStyle).isPrintRowKind();
PrintUtils.printAsTableauForm(
getResolvedSchema(),
it,
new PrintWriter(System.out),
maxColumnWidth,
nullColumn,
deriveColumnWidthByType,
printRowKind,
sessionTimeZone);
} else if (printStyle instanceof RawContentStyle) {
while (it.hasNext()) {
System.out.println(
String.join(
",",
PrintUtils.rowToString(
it.next(), getResolvedSchema(), sessionTimeZone)));
}
} else {
throw new TableException("Unsupported print style: " + printStyle);
}
}
public static CustomTableResultImpl.Builder builder() {
return new CustomTableResultImpl.Builder();
public static Builder builder() {
return new Builder();
}
static {
TABLE_RESULT_OK = builder().resultKind(ResultKind.SUCCESS).schema(ResolvedSchema.of(new Column[]{Column.physical("result", DataTypes.STRING())})).data(Collections.singletonList(Row.of(new Object[]{"OK"}))).build();
}
/** Builder for creating a {@link CustomTableResultImpl}. */
public static class Builder {
private JobClient jobClient = null;
private ResolvedSchema resolvedSchema = null;
private ResultKind resultKind = null;
private CloseableIterator<Row> data = null;
private PrintStyle printStyle =
PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false, false);
private ZoneId sessionTimeZone = ZoneId.of("UTC");
private Builder() {}
/**
* Specifies the job client associated with the submitted Flink job.
*
* @param jobClient a {@link JobClient} for the submitted Flink job.
*/
public Builder jobClient(JobClient jobClient) {
this.jobClient = jobClient;
return this;
}
private static final class CloseableRowIteratorWrapper implements CloseableIterator<Row> {
private final CloseableIterator<Row> iterator;
private boolean isFirstRowReady;
/**
* Specifies schema of the execution result.
*
* @param resolvedSchema a {@link ResolvedSchema} for the execution result.
*/
public Builder schema(ResolvedSchema resolvedSchema) {
Preconditions.checkNotNull(resolvedSchema, "resolvedSchema should not be null");
this.resolvedSchema = resolvedSchema;
return this;
}
private CloseableRowIteratorWrapper(CloseableIterator<Row> iterator) {
this.isFirstRowReady = false;
this.iterator = iterator;
/**
* Specifies result kind of the execution result.
*
* @param resultKind a {@link ResultKind} for the execution result.
*/
public Builder resultKind(ResultKind resultKind) {
Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.resultKind = resultKind;
return this;
}
public void close() throws Exception {
this.iterator.close();
/**
* Specifies a row iterator as the execution result.
*
* @param rowIterator a row iterator as the execution result.
*/
public Builder data(CloseableIterator<Row> rowIterator) {
Preconditions.checkNotNull(rowIterator, "rowIterator should not be null");
this.data = rowIterator;
return this;
}
public boolean hasNext() {
boolean hasNext = this.iterator.hasNext();
this.isFirstRowReady = this.isFirstRowReady || hasNext;
return hasNext;
/**
* Specifies a row list as the execution result.
*
* @param rowList a row list as the execution result.
*/
public Builder data(List<Row> rowList) {
Preconditions.checkNotNull(rowList, "listRows should not be null");
this.data = CloseableIterator.adapterForIterator(rowList.iterator());
return this;
}
public Row next() {
Row next = (Row)this.iterator.next();
this.isFirstRowReady = true;
return next;
/** Specifies print style. Default is {@link TableauStyle} with max integer column width. */
public Builder setPrintStyle(PrintStyle printStyle) {
Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.printStyle = printStyle;
return this;
}
public boolean isFirstRowReady() {
return this.isFirstRowReady || this.hasNext();
/** Specifies session time zone. */
public Builder setSessionTimeZone(ZoneId sessionTimeZone) {
Preconditions.checkNotNull(sessionTimeZone, "sessionTimeZone should not be null");
this.sessionTimeZone = sessionTimeZone;
return this;
}
/** Returns a {@link TableResult} instance. */
public TableResult build() {
return new CustomTableResultImpl(
jobClient, resolvedSchema, resultKind, data, printStyle, sessionTimeZone);
}
}
private static final class RawContentStyle implements CustomTableResultImpl.PrintStyle {
private RawContentStyle() {
/** Root interface for all print styles. */
public interface PrintStyle {
/**
* Create a tableau print style with given max column width, null column, change mode
* indicator and a flag to indicate whether the column width is derived from type (true) or
* content (false), which prints the result schema and content as tableau form.
*/
static PrintStyle tableau(
int maxColumnWidth,
String nullColumn,
boolean deriveColumnWidthByType,
boolean printRowKind) {
Preconditions.checkArgument(
maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
return new TableauStyle(
maxColumnWidth, nullColumn, deriveColumnWidthByType, printRowKind);
}
/**
* Create a raw content print style, which only prints the result content in raw form. The
* column delimiter is "," and the row delimiter is "\n".
*/
static PrintStyle rawContent() {
return new RawContentStyle();
}
}
private static final class TableauStyle implements CustomTableResultImpl.PrintStyle {
/** Prints the result schema and content in tableau form. */
private static final class TableauStyle implements PrintStyle {
/**
* A flag to indicate whether the column width is derived from type (true) or content
* (false).
*/
private final boolean deriveColumnWidthByType;
private final int maxColumnWidth;
private final String nullColumn;
/** A flag to indicate whether print row kind info. */
private final boolean printRowKind;
private TableauStyle(int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType, boolean printRowKind) {
private TableauStyle(
int maxColumnWidth,
String nullColumn,
boolean deriveColumnWidthByType,
boolean printRowKind) {
this.deriveColumnWidthByType = deriveColumnWidthByType;
this.maxColumnWidth = maxColumnWidth;
this.nullColumn = nullColumn;
......@@ -191,94 +324,64 @@ public class CustomTableResultImpl implements TableResult {
}
public boolean isDeriveColumnWidthByType() {
return this.deriveColumnWidthByType;
return deriveColumnWidthByType;
}
int getMaxColumnWidth() {
return this.maxColumnWidth;
return maxColumnWidth;
}
String getNullColumn() {
return this.nullColumn;
return nullColumn;
}
public boolean isPrintRowKind() {
return this.printRowKind;
return printRowKind;
}
}
public interface PrintStyle {
static CustomTableResultImpl.PrintStyle tableau(int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType, boolean printRowKind) {
Preconditions.checkArgument(maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
return new CustomTableResultImpl.TableauStyle(maxColumnWidth, nullColumn, deriveColumnWidthByType, printRowKind);
}
static CustomTableResultImpl.PrintStyle rawContent() {
return new CustomTableResultImpl.RawContentStyle();
}
}
public static class Builder {
private JobClient jobClient;
private ResolvedSchema resolvedSchema;
private ResultKind resultKind;
private CloseableIterator<Row> data;
private CustomTableResultImpl.PrintStyle printStyle;
private ZoneId sessionTimeZone;
private Builder() {
this.jobClient = null;
this.resolvedSchema = null;
this.resultKind = null;
this.data = null;
this.printStyle = CustomTableResultImpl.PrintStyle.tableau(2147483647, "(NULL)", false, false);
this.sessionTimeZone = ZoneId.of("UTC");
}
public CustomTableResultImpl.Builder jobClient(JobClient jobClient) {
this.jobClient = jobClient;
return this;
}
public CustomTableResultImpl.Builder schema(ResolvedSchema resolvedSchema) {
Preconditions.checkNotNull(resolvedSchema, "resolvedSchema should not be null");
this.resolvedSchema = resolvedSchema;
return this;
}
public CustomTableResultImpl.Builder resultKind(ResultKind resultKind) {
Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.resultKind = resultKind;
return this;
}
/**
* Only prints the result content in raw form. The column delimiter is "," and the row delimiter is "\n".
*/
private static final class RawContentStyle implements PrintStyle {}
/**
* A {@link CloseableIterator} wrapper class that can return whether the first row is ready.
*
* <p>The first row is ready when the {@link #hasNext} method returns true or the {@link #next()}
* method returns a row. The execution order of {@link TableResult#collect} and {@link
* TableResult#await()} may be arbitrary; this class records whether the first row is ready
* (or accessed).
*/
private static final class CloseableRowIteratorWrapper implements CloseableIterator<Row> {
private final CloseableIterator<Row> iterator;
private boolean isFirstRowReady = false;
public CustomTableResultImpl.Builder data(CloseableIterator<Row> rowIterator) {
Preconditions.checkNotNull(rowIterator, "rowIterator should not be null");
this.data = rowIterator;
return this;
private CloseableRowIteratorWrapper(CloseableIterator<Row> iterator) {
this.iterator = iterator;
}
public CustomTableResultImpl.Builder data(List<Row> rowList) {
Preconditions.checkNotNull(rowList, "listRows should not be null");
this.data = CloseableIterator.adapterForIterator(rowList.iterator());
return this;
@Override
public void close() throws Exception {
iterator.close();
}
public CustomTableResultImpl.Builder setPrintStyle(CustomTableResultImpl.PrintStyle printStyle) {
Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.printStyle = printStyle;
return this;
@Override
public boolean hasNext() {
boolean hasNext = iterator.hasNext();
isFirstRowReady = isFirstRowReady || hasNext;
return hasNext;
}
public CustomTableResultImpl.Builder setSessionTimeZone(ZoneId sessionTimeZone) {
Preconditions.checkNotNull(sessionTimeZone, "sessionTimeZone should not be null");
this.sessionTimeZone = sessionTimeZone;
return this;
@Override
public Row next() {
Row next = iterator.next();
isFirstRowReady = true;
return next;
}
public TableResult build() {
return new CustomTableResultImpl(this.jobClient, this.resolvedSchema, this.resultKind, this.data, this.printStyle, this.sessionTimeZone);
public boolean isFirstRowReady() {
return isFirstRowReady || hasNext();
}
}
}
\ No newline at end of file
......@@ -36,7 +36,13 @@ import java.util.concurrent.TimeoutException;
**/
@Internal
public class CustomTableResultImpl implements TableResult {
public static final TableResult TABLE_RESULT_OK;
public static final TableResult TABLE_RESULT_OK =
CustomTableResultImpl.builder()
.resultKind(ResultKind.SUCCESS)
.schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING())))
.data(Collections.singletonList(Row.of("OK")))
.build();
private final JobClient jobClient;
private final ResolvedSchema resolvedSchema;
private final ResultKind resultKind;
......@@ -44,14 +50,22 @@ public class CustomTableResultImpl implements TableResult {
private final PrintStyle printStyle;
private final ZoneId sessionTimeZone;
private CustomTableResultImpl(@Nullable JobClient jobClient, ResolvedSchema resolvedSchema, ResultKind resultKind, CloseableIterator<Row> data, PrintStyle printStyle, ZoneId sessionTimeZone) {
private CustomTableResultImpl(
@Nullable JobClient jobClient,
ResolvedSchema resolvedSchema,
ResultKind resultKind,
CloseableIterator<Row> data,
PrintStyle printStyle,
ZoneId sessionTimeZone) {
this.jobClient = jobClient;
this.resolvedSchema = (ResolvedSchema)Preconditions.checkNotNull(resolvedSchema, "resolvedSchema should not be null");
this.resultKind = (ResultKind)Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.resolvedSchema =
Preconditions.checkNotNull(resolvedSchema, "resolvedSchema should not be null");
this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind should not be null");
Preconditions.checkNotNull(data, "data should not be null");
this.data = new CloseableRowIteratorWrapper(data);
this.printStyle = (PrintStyle)Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.sessionTimeZone = (ZoneId)Preconditions.checkNotNull(sessionTimeZone, "sessionTimeZone should not be null");
this.printStyle = Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.sessionTimeZone =
Preconditions.checkNotNull(sessionTimeZone, "sessionTimeZone should not be null");
}
public static TableResult buildTableResult(List<TableSchemaField> fields,List<Row> rows){
......@@ -68,134 +82,241 @@ public class CustomTableResultImpl implements TableResult {
return builder.build();
}
@Override
public Optional<JobClient> getJobClient() {
return Optional.ofNullable(this.jobClient);
return Optional.ofNullable(jobClient);
}
@Override
public void await() throws InterruptedException, ExecutionException {
try {
this.awaitInternal(-1L, TimeUnit.MILLISECONDS);
} catch (TimeoutException var2) {
;
awaitInternal(-1, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// do nothing
}
}
public void await(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
this.awaitInternal(timeout, unit);
@Override
public void await(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
awaitInternal(timeout, unit);
}
private void awaitInternal(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (this.jobClient != null) {
ExecutorService executor = Executors.newFixedThreadPool(1, (r) -> {
return new Thread(r, "TableResult-await-thread");
});
try {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
while(!this.data.isFirstRowReady()) {
try {
Thread.sleep(100L);
} catch (InterruptedException var2) {
throw new TableException("Thread is interrupted");
}
}
}, executor);
if (timeout >= 0L) {
future.get(timeout, unit);
} else {
future.get();
}
} finally {
executor.shutdown();
}
private void awaitInternal(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (jobClient == null) {
return;
}
ExecutorService executor =
Executors.newFixedThreadPool(1, r -> new Thread(r, "TableResult-await-thread"));
try {
CompletableFuture<Void> future =
CompletableFuture.runAsync(
() -> {
while (!data.isFirstRowReady()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new TableException("Thread is interrupted");
}
}
},
executor);
if (timeout >= 0) {
future.get(timeout, unit);
} else {
future.get();
}
} finally {
executor.shutdown();
}
}
@Override
public ResolvedSchema getResolvedSchema() {
return this.resolvedSchema;
return resolvedSchema;
}
@Override
public ResultKind getResultKind() {
return this.resultKind;
return resultKind;
}
@Override
public CloseableIterator<Row> collect() {
return this.data;
return data;
}
@Override
public void print() {
Iterator<Row> it = this.collect();
if (this.printStyle instanceof TableauStyle) {
int maxColumnWidth = ((TableauStyle)this.printStyle).getMaxColumnWidth();
String nullColumn = ((TableauStyle)this.printStyle).getNullColumn();
boolean deriveColumnWidthByType = ((TableauStyle)this.printStyle).isDeriveColumnWidthByType();
boolean printRowKind = ((TableauStyle)this.printStyle).isPrintRowKind();
PrintUtils.printAsTableauForm(this.getResolvedSchema(), it, new PrintWriter(System.out), maxColumnWidth, nullColumn, deriveColumnWidthByType, printRowKind, this.sessionTimeZone);
} else {
if (!(this.printStyle instanceof RawContentStyle)) {
throw new TableException("Unsupported print style: " + this.printStyle);
}
while(it.hasNext()) {
System.out.println(String.join(",", PrintUtils.rowToString((Row)it.next(), this.getResolvedSchema(), this.sessionTimeZone)));
Iterator<Row> it = collect();
if (printStyle instanceof TableauStyle) {
int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth();
String nullColumn = ((TableauStyle) printStyle).getNullColumn();
boolean deriveColumnWidthByType =
((TableauStyle) printStyle).isDeriveColumnWidthByType();
boolean printRowKind = ((TableauStyle) printStyle).isPrintRowKind();
PrintUtils.printAsTableauForm(
getResolvedSchema(),
it,
new PrintWriter(System.out),
maxColumnWidth,
nullColumn,
deriveColumnWidthByType,
printRowKind,
sessionTimeZone);
} else if (printStyle instanceof RawContentStyle) {
while (it.hasNext()) {
System.out.println(
String.join(
",",
PrintUtils.rowToString(
it.next(), getResolvedSchema(), sessionTimeZone)));
}
} else {
throw new TableException("Unsupported print style: " + printStyle);
}
}
public static Builder builder() {
return new Builder();
}
static {
TABLE_RESULT_OK = builder().resultKind(ResultKind.SUCCESS).schema(ResolvedSchema.of(new Column[]{Column.physical("result", DataTypes.STRING())})).data(Collections.singletonList(Row.of(new Object[]{"OK"}))).build();
}
/** Builder for creating a {@link CustomTableResultImpl}. */
public static class Builder {
private JobClient jobClient = null;
private ResolvedSchema resolvedSchema = null;
private ResultKind resultKind = null;
private CloseableIterator<Row> data = null;
private PrintStyle printStyle =
PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false, false);
private ZoneId sessionTimeZone = ZoneId.of("UTC");
private Builder() {}
/**
* Specifies job client which associates the submitted Flink job.
*
* @param jobClient a {@link JobClient} for the submitted Flink job.
*/
public Builder jobClient(JobClient jobClient) {
this.jobClient = jobClient;
return this;
}
private static final class CloseableRowIteratorWrapper implements CloseableIterator<Row> {
private final CloseableIterator<Row> iterator;
private boolean isFirstRowReady;
/**
* Specifies schema of the execution result.
*
* @param resolvedSchema a {@link ResolvedSchema} for the execution result.
*/
public Builder schema(ResolvedSchema resolvedSchema) {
Preconditions.checkNotNull(resolvedSchema, "resolvedSchema should not be null");
this.resolvedSchema = resolvedSchema;
return this;
}
private CloseableRowIteratorWrapper(CloseableIterator<Row> iterator) {
this.isFirstRowReady = false;
this.iterator = iterator;
/**
* Specifies result kind of the execution result.
*
* @param resultKind a {@link ResultKind} for the execution result.
*/
public Builder resultKind(ResultKind resultKind) {
Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.resultKind = resultKind;
return this;
}
public void close() throws Exception {
this.iterator.close();
/**
         * Specifies a row iterator as the execution result.
*
* @param rowIterator a row iterator as the execution result.
*/
public Builder data(CloseableIterator<Row> rowIterator) {
Preconditions.checkNotNull(rowIterator, "rowIterator should not be null");
this.data = rowIterator;
return this;
}
public boolean hasNext() {
boolean hasNext = this.iterator.hasNext();
this.isFirstRowReady = this.isFirstRowReady || hasNext;
return hasNext;
/**
         * Specifies a row list as the execution result.
*
* @param rowList a row list as the execution result.
*/
public Builder data(List<Row> rowList) {
Preconditions.checkNotNull(rowList, "listRows should not be null");
this.data = CloseableIterator.adapterForIterator(rowList.iterator());
return this;
}
public Row next() {
Row next = (Row)this.iterator.next();
this.isFirstRowReady = true;
return next;
/** Specifies print style. Default is {@link TableauStyle} with max integer column width. */
public Builder setPrintStyle(PrintStyle printStyle) {
Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.printStyle = printStyle;
return this;
}
public boolean isFirstRowReady() {
return this.isFirstRowReady || this.hasNext();
/** Specifies session time zone. */
public Builder setSessionTimeZone(ZoneId sessionTimeZone) {
Preconditions.checkNotNull(sessionTimeZone, "sessionTimeZone should not be null");
this.sessionTimeZone = sessionTimeZone;
return this;
}
/** Returns a {@link TableResult} instance. */
public TableResult build() {
return new CustomTableResultImpl(
jobClient, resolvedSchema, resultKind, data, printStyle, sessionTimeZone);
}
}
private static final class RawContentStyle implements PrintStyle {
private RawContentStyle() {
/** Root interface for all print styles. */
public interface PrintStyle {
/**
* Create a tableau print style with given max column width, null column, change mode
* indicator and a flag to indicate whether the column width is derived from type (true) or
* content (false), which prints the result schema and content as tableau form.
*/
static PrintStyle tableau(
int maxColumnWidth,
String nullColumn,
boolean deriveColumnWidthByType,
boolean printRowKind) {
Preconditions.checkArgument(
maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
return new TableauStyle(
maxColumnWidth, nullColumn, deriveColumnWidthByType, printRowKind);
}
/**
         * Create a raw content print style, which only prints the result content in raw form. The
         * column delimiter is ",", and the row delimiter is "\n".
*/
static PrintStyle rawContent() {
return new RawContentStyle();
}
}
    /** Prints the result schema and content in tableau form. */
private static final class TableauStyle implements PrintStyle {
/**
* A flag to indicate whether the column width is derived from type (true) or content
* (false).
*/
private final boolean deriveColumnWidthByType;
private final int maxColumnWidth;
private final String nullColumn;
/** A flag to indicate whether print row kind info. */
private final boolean printRowKind;
private TableauStyle(int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType, boolean printRowKind) {
private TableauStyle(
int maxColumnWidth,
String nullColumn,
boolean deriveColumnWidthByType,
boolean printRowKind) {
this.deriveColumnWidthByType = deriveColumnWidthByType;
this.maxColumnWidth = maxColumnWidth;
this.nullColumn = nullColumn;
......@@ -203,94 +324,64 @@ public class CustomTableResultImpl implements TableResult {
}
public boolean isDeriveColumnWidthByType() {
return this.deriveColumnWidthByType;
return deriveColumnWidthByType;
}
int getMaxColumnWidth() {
return this.maxColumnWidth;
return maxColumnWidth;
}
String getNullColumn() {
return this.nullColumn;
return nullColumn;
}
public boolean isPrintRowKind() {
return this.printRowKind;
return printRowKind;
}
}
public interface PrintStyle {
static PrintStyle tableau(int maxColumnWidth, String nullColumn, boolean deriveColumnWidthByType, boolean printRowKind) {
Preconditions.checkArgument(maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
return new TableauStyle(maxColumnWidth, nullColumn, deriveColumnWidthByType, printRowKind);
}
static PrintStyle rawContent() {
return new RawContentStyle();
}
}
public static class Builder {
private JobClient jobClient;
private ResolvedSchema resolvedSchema;
private ResultKind resultKind;
private CloseableIterator<Row> data;
private PrintStyle printStyle;
private ZoneId sessionTimeZone;
private Builder() {
this.jobClient = null;
this.resolvedSchema = null;
this.resultKind = null;
this.data = null;
this.printStyle = PrintStyle.tableau(2147483647, "(NULL)", false, false);
this.sessionTimeZone = ZoneId.of("UTC");
}
public Builder jobClient(JobClient jobClient) {
this.jobClient = jobClient;
return this;
}
public Builder schema(ResolvedSchema resolvedSchema) {
Preconditions.checkNotNull(resolvedSchema, "resolvedSchema should not be null");
this.resolvedSchema = resolvedSchema;
return this;
}
public Builder resultKind(ResultKind resultKind) {
Preconditions.checkNotNull(resultKind, "resultKind should not be null");
this.resultKind = resultKind;
return this;
}
/**
     * Only prints the result content in raw form. The column delimiter is ",", the row delimiter is "\n".
*/
private static final class RawContentStyle implements PrintStyle {}
/**
* A {@link CloseableIterator} wrapper class that can return whether the first row is ready.
*
* <p>The first row is ready when {@link #hasNext} method returns true or {@link #next()} method
* returns a row. The execution order of {@link TableResult#collect} method and {@link
* TableResult#await()} may be arbitrary, this class will record whether the first row is ready
* (or accessed).
*/
private static final class CloseableRowIteratorWrapper implements CloseableIterator<Row> {
private final CloseableIterator<Row> iterator;
private boolean isFirstRowReady = false;
public Builder data(CloseableIterator<Row> rowIterator) {
Preconditions.checkNotNull(rowIterator, "rowIterator should not be null");
this.data = rowIterator;
return this;
private CloseableRowIteratorWrapper(CloseableIterator<Row> iterator) {
this.iterator = iterator;
}
public Builder data(List<Row> rowList) {
Preconditions.checkNotNull(rowList, "listRows should not be null");
this.data = CloseableIterator.adapterForIterator(rowList.iterator());
return this;
@Override
public void close() throws Exception {
iterator.close();
}
public Builder setPrintStyle(PrintStyle printStyle) {
Preconditions.checkNotNull(printStyle, "printStyle should not be null");
this.printStyle = printStyle;
return this;
@Override
public boolean hasNext() {
boolean hasNext = iterator.hasNext();
isFirstRowReady = isFirstRowReady || hasNext;
return hasNext;
}
public Builder setSessionTimeZone(ZoneId sessionTimeZone) {
Preconditions.checkNotNull(sessionTimeZone, "sessionTimeZone should not be null");
this.sessionTimeZone = sessionTimeZone;
return this;
@Override
public Row next() {
Row next = iterator.next();
isFirstRowReady = true;
return next;
}
public TableResult build() {
return new CustomTableResultImpl(this.jobClient, this.resolvedSchema, this.resultKind, this.data, this.printStyle, this.sessionTimeZone);
public boolean isFirstRowReady() {
return isFirstRowReady || hasNext();
}
}
}
\ No newline at end of file
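For reference, a minimal usage sketch of the ResolvedSchema-based builder introduced above (not part of the commit; column names and rows are illustrative, and the snippet assumes java.util.Arrays plus the Flink table/type classes already used by CustomTableResultImpl):

    TableResult result =
            CustomTableResultImpl.builder()
                    .resultKind(ResultKind.SUCCESS)
                    .schema(ResolvedSchema.of(
                            Column.physical("word", DataTypes.STRING()),
                            Column.physical("cnt", DataTypes.BIGINT())))
                    .data(Arrays.asList(Row.of("hello", 1L), Row.of("world", 2L)))
                    .build();
    // defaults: tableau print style with max integer column width, session time zone UTC
    result.print();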
......@@ -55,5 +55,10 @@
<artifactId>dlink-function</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dlink</groupId>
<artifactId>dlink-gateway</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -4,8 +4,10 @@ import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.result.SqlExplainResult;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogManager;
......@@ -13,6 +15,7 @@ import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
......@@ -173,4 +176,8 @@ public abstract class Executor {
public CatalogManager getCatalogManager(){
return stEnvironment.getCatalogManager();
}
public JobGraph getJobGraphFromInserts(List<String> statements){
return stEnvironment.getJobGraphFromInserts(statements);
}
}
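A sketch (not from the commit) of how the new getJobGraphFromInserts hook is meant to be driven; the INSERT statements are placeholders:

    List<String> inserts = Arrays.asList(
            "INSERT INTO sink_a SELECT * FROM source_a",
            "INSERT INTO sink_b SELECT * FROM source_b");
    // the table environment translates the collected INSERTs into a single JobGraph,
    // which JobManager.submitGraph then hands to a Gateway for YARN submission
    JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);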
......@@ -7,6 +7,9 @@ import com.dlink.executor.Executor;
import com.dlink.executor.ExecutorSetting;
import com.dlink.executor.custom.CustomTableEnvironmentImpl;
import com.dlink.explainer.Explainer;
import com.dlink.gateway.Gateway;
import com.dlink.gateway.GatewayConfig;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.interceptor.FlinkInterceptor;
import com.dlink.parser.SqlType;
import com.dlink.result.*;
......@@ -17,9 +20,12 @@ import com.dlink.session.SessionPool;
import com.dlink.trans.Operations;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
......@@ -104,7 +110,9 @@ public class JobManager extends RunTime {
}
private void initEnvironmentSetting(){
environmentSetting = EnvironmentSetting.build(config.getAddress());
if(Asserts.isNotNullString(config.getAddress())) {
environmentSetting = EnvironmentSetting.build(config.getAddress());
}
}
private void initExecutorSetting(){
......@@ -203,6 +211,68 @@ public class JobManager extends RunTime {
return result;
}
public SubmitResult submitGraph(String statement, GatewayConfig gatewayConfig) {
if (statement == null || "".equals(statement)) {
return SubmitResult.error("FlinkSql语句不存在");
}
String[] statements = statement.split(FlinkSQLConstant.SEPARATOR);
List<String> sqlList = Arrays.asList(statements);
SubmitResult result = new SubmitResult(null, sqlList, null, executorSetting.getJobName());
int currentIndex = 0;
try {
if (Asserts.isNullCollection(sqlList)) {
result.setSuccess(false);
result.setMsg(LocalDateTime.now().toString() + ":执行sql语句为空。");
return result;
}
Executor executor = createExecutor();
List<String> inserts = new ArrayList<>();
long start = System.currentTimeMillis();
for (String sqlText : sqlList) {
currentIndex++;
SqlType operationType = Operations.getOperationType(sqlText);
CustomTableEnvironmentImpl stEnvironment = executor.getCustomTableEnvironmentImpl();
if (operationType.equals(SqlType.INSERT)) {
if (!FlinkInterceptor.build(stEnvironment, sqlText)) {
inserts.add(sqlText);
}
} else if(operationType.equals(SqlType.SET)){
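                    // SET statements are recognized here but intentionally skipped: they are neither executed nor added to the job graph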
} else {
if (!FlinkInterceptor.build(stEnvironment, sqlText)) {
executor.executeSql(sqlText);
}
}
}
JobGraph jobGraph = executor.getJobGraphFromInserts(inserts);
GatewayResult gatewayResult = Gateway.build(gatewayConfig).submitJobGraph(jobGraph);
long finish = System.currentTimeMillis();
long timeElapsed = finish - start;
InsertResult insertResult = new InsertResult(gatewayResult.getAppId(), true);
result.setResult(insertResult);
result.setJobId(gatewayResult.getAppId());
result.setTime(timeElapsed);
result.setSuccess(true);
result.setFinishDate(LocalDateTime.now());
} catch (Exception e) {
e.printStackTrace();
StackTraceElement[] trace = e.getStackTrace();
StringBuilder resMsg = new StringBuilder();
for (StackTraceElement s : trace) {
resMsg.append(" \n " + s + " ");
}
result.setSuccess(false);
// result.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage());
result.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + "\n >>>堆栈信息<<<" + resMsg.toString());
// result.setError(LocalDateTime.now().toString() + ":" + "运行第" + currentIndex + "行sql时出现异常:" + e.getMessage() + "\n >>>异常原因<<< \n" + e.toString());
return result;
}
result.setSuccess(true);
result.setMsg(LocalDateTime.now().toString() + ":任务提交成功!");
return result;
}
public JobResult executeSql(String statement) {
Job job = new Job(config,environmentSetting.getAddress(),
Job.JobStatus.INITIALIZE,statement,executorSetting, LocalDateTime.now(),executor);
......
......@@ -44,7 +44,18 @@
<artifactId>dlink-client-1.12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_2.11</artifactId>
<version>1.12.5</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.dlink.gateway;
/**
* ConfigPara
*
* @author wenmo
* @since 2021/11/2
**/
public class ConfigPara {
private String key;
private String value;
public ConfigPara() {
}
public ConfigPara(String key, String value) {
this.key = key;
this.value = value;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
......@@ -8,6 +8,7 @@ import sun.misc.Service;
import java.util.Iterator;
import java.util.Optional;
import java.util.ServiceLoader;
/**
 * Submitter
......@@ -19,9 +20,10 @@ public interface Gateway {
static Optional<Gateway> get(GatewayConfig config){
Asserts.checkNotNull(config,"配置不能为空");
Iterator<Gateway> providers = Service.providers(Gateway.class);
while(providers.hasNext()) {
Gateway gateway = providers.next();
ServiceLoader<Gateway> loader = ServiceLoader.load(Gateway.class);
Iterator<Gateway> iterator = loader.iterator();
while(iterator.hasNext()) {
Gateway gateway = iterator.next();
if(gateway.canHandle(config.getType())){
gateway.setGatewayConfig(config);
return Optional.of(gateway);
......@@ -48,4 +50,6 @@ public interface Gateway {
GatewayResult submitJar();
GatewayResult savepoint();
}
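Because Gateway.get now goes through the JDK ServiceLoader, each implementation must be registered in a provider-configuration file on the classpath. A sketch of such a file (the resource path follows the standard ServiceLoader convention; the two entries are the YARN gateways added in this commit):

    # META-INF/services/com.dlink.gateway.Gateway
    com.dlink.gateway.yarn.YarnApplicationGateway
    com.dlink.gateway.yarn.YarnPerJobGateway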
package com.dlink.gateway;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * GatewayConfig
......@@ -18,23 +23,18 @@ public class GatewayConfig {
private GatewayType type;
private String jobName;
private String configDir;
private String flinkConfigPath;
private String userJarPath;
private String[] userJarParas;
private String userJarMainAppClass;
private String savePoint;
private String flinkLibs;
private String yarnConfigPath;
private List<ConfigPara> configParas;
public GatewayConfig() {
}
private static final ObjectMapper mapper = new ObjectMapper();
public GatewayConfig(GatewayType type, String jobName, String configDir, String userJarPath, String[] userJarParas, String userJarMainAppClass, String savePoint) {
this.type = type;
this.jobName = jobName;
this.configDir = configDir;
this.userJarPath = userJarPath;
this.userJarParas = userJarParas;
this.userJarMainAppClass = userJarMainAppClass;
this.savePoint = savePoint;
public GatewayConfig() {
}
public static GatewayConfig build(JsonNode para){
......@@ -43,8 +43,8 @@ public class GatewayConfig {
if(para.has("jobName")) {
config.setJobName(para.get("jobName").asText());
}
if(para.has("configDir")) {
config.setConfigDir(para.get("configDir").asText());
if(para.has("flinkConfigPath")) {
config.setFlinkConfigPath(para.get("flinkConfigPath").asText());
}
if(para.has("userJarPath")) {
config.setUserJarPath(para.get("userJarPath").asText());
......@@ -58,6 +58,25 @@ public class GatewayConfig {
if(para.has("savePoint")) {
config.setSavePoint(para.get("savePoint").asText());
}
if(para.has("flinkLibs")) {
config.setFlinkLibs(para.get("flinkLibs").asText());
}
if(para.has("yarnConfigPath")) {
config.setYarnConfigPath(para.get("yarnConfigPath").asText());
}
if(para.has("configParas")) {
try {
List<ConfigPara> configParas = new ArrayList<>();
JsonNode paras = mapper.readTree(para.get("configParas").asText());
paras.forEach((JsonNode node)-> {
configParas.add(new ConfigPara(node.get("key").asText(),node.get("value").asText()));
}
);
config.setConfigParas(configParas);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
return config;
}
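For reference, a sketch (not part of the commit) of a payload that GatewayConfig.build(JsonNode) above can parse; all paths and values are placeholders, and only fields visible in this hunk are shown. Note that configParas is itself a JSON string holding an array of key/value objects, since the code re-parses it with mapper.readTree:

    ObjectMapper mapper = new ObjectMapper();
    JsonNode para = mapper.readTree(
            "{\"jobName\":\"demo\","
          + "\"flinkConfigPath\":\"/opt/flink/conf\","
          + "\"flinkLibs\":\"hdfs:///flink/lib\","
          + "\"yarnConfigPath\":\"/etc/hadoop/yarn-site.xml\","
          + "\"configParas\":\"[{\\\"key\\\":\\\"taskmanager.numberOfTaskSlots\\\",\\\"value\\\":\\\"2\\\"}]\"}");
    GatewayConfig gatewayConfig = GatewayConfig.build(para);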
......@@ -66,11 +85,14 @@ public class GatewayConfig {
return "GatewayConfig{" +
"type=" + type +
", jobName='" + jobName + '\'' +
", configDir='" + configDir + '\'' +
", flinkConfigPath='" + flinkConfigPath + '\'' +
", userJarPath='" + userJarPath + '\'' +
", userJarParas=" + Arrays.toString(userJarParas) +
", userJarMainAppClass='" + userJarMainAppClass + '\'' +
", savePoint='" + savePoint + '\'' +
", flinkLibs='" + flinkLibs + '\'' +
", yarnConfigPath='" + yarnConfigPath + '\'' +
", configParas='" + configParas.toString() + '\'' +
'}';
}
}
......@@ -10,7 +10,7 @@ import com.dlink.assertion.Asserts;
**/
public enum GatewayType {
YARN_APPLICATION("ya","yarn-application");
YARN_APPLICATION("ya","yarn-application"),YARN_PER_JOB("ypj","yarn-per-job");
private String value;
private String longValue;
......
......@@ -8,5 +8,5 @@ package com.dlink.gateway.result;
**/
public interface GatewayResult {
String getAppId();
}
......@@ -28,6 +28,13 @@ public class YarnResult extends AbstractGatewayResult {
this.appId = appId;
}
public String getAppId() {
return appId;
}
public String getWebURL() {
return webURL;
}
public static YarnResult build(GatewayType type){
return new YarnResult(type,LocalDateTime.now());
......
......@@ -3,22 +3,17 @@ package com.dlink.gateway.yarn;
import com.dlink.assertion.Asserts;
import com.dlink.gateway.GatewayConfig;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.YarnResult;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import java.util.Collections;
......@@ -43,58 +38,24 @@ public class YarnApplicationGateway extends YarnGateway {
return GatewayType.YARN_APPLICATION;
}
@Override
public void init() {
configuration = GlobalConfiguration.loadConfiguration(config.getConfigDir());
configuration.set(DeploymentOptions.TARGET, getType().getLongValue());
if(Asserts.isNotNullString(config.getSavePoint())) {
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, config.getSavePoint());
}
clientServiceLoader = new DefaultClusterClientServiceLoader();
}
@Override
public GatewayResult submitJobGraph(JobGraph jobGraph) {
init();
YarnResult result = YarnResult.build(getType());
final ClusterClientFactory clientFactory = clientServiceLoader.getClusterClientFactory(configuration);
try (final YarnClusterDescriptor clusterDescriptor =
(YarnClusterDescriptor) clientFactory.createClusterDescriptor(configuration)) {
final ClusterSpecification clusterSpecification =
clientFactory.getClusterSpecification(configuration);
ClusterClientProvider<ApplicationId> clusterClientProvider = clusterDescriptor.deployInternal(
clusterSpecification,
config.getJobName(),
YarnApplicationClusterEntryPoint.class.getName(),
jobGraph,
false);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
ApplicationId applicationId = clusterClient.getClusterId();
result.setAppId(applicationId.toString());
result.setWebURL(clusterClient.getWebInterfaceURL());
result.success();
}catch (Exception e){
e.printStackTrace();
logger.error(e.getMessage());
result.fail(e.getMessage());
}
return result;
throw new GatewayException("Couldn't deploy Yarn Application Cluster with job graph.");
}
@Override
public GatewayResult submitJar() {
init();
if(Asserts.isNull(yarnClient)){
init();
}
YarnResult result = YarnResult.build(getType());
logger.warn(config.toString());
configuration.set(PipelineOptions.JARS, Collections.singletonList(config.getUserJarPath()));
configuration.set(YarnConfigOptions.APPLICATION_NAME, config.getJobName());
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
ApplicationConfiguration appConfig = new ApplicationConfiguration(config.getUserJarParas(), config.getUserJarMainAppClass());
final ClusterClientFactory clientFactory = clientServiceLoader.getClusterClientFactory(configuration);
try (final YarnClusterDescriptor clusterDescriptor =
(YarnClusterDescriptor) clientFactory.createClusterDescriptor(configuration)) {
final ClusterSpecification clusterSpecification =
clientFactory.getClusterSpecification(configuration);
ClusterClientProvider<ApplicationId> clusterClientProvider = clusterDescriptor.deployApplicationCluster(
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
try {
ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
clusterSpecification,
appConfig);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
......
package com.dlink.gateway.yarn;
import com.dlink.assertion.Asserts;
import com.dlink.gateway.AbstractGateway;
import com.dlink.gateway.ConfigPara;
import com.dlink.gateway.GatewayConfig;
import com.dlink.gateway.result.GatewayResult;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.util.Collections;
import java.util.List;
/**
 * YarnSubmitter
......@@ -13,7 +27,8 @@ import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
**/
public abstract class YarnGateway extends AbstractGateway {
protected DefaultClusterClientServiceLoader clientServiceLoader;
protected YarnConfiguration yarnConfiguration;
protected YarnClient yarnClient;
public YarnGateway() {
}
......@@ -22,5 +37,40 @@ public abstract class YarnGateway extends AbstractGateway {
super(config);
}
public void init(){}
public void init(){
initConfig();
initYarnClient();
}
private void initConfig(){
configuration = GlobalConfiguration.loadConfiguration(config.getFlinkConfigPath());
addConfigParas(config.getConfigParas());
configuration.set(DeploymentOptions.TARGET, getType().getLongValue());
if(Asserts.isNotNullString(config.getSavePoint())) {
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, config.getSavePoint());
}
configuration.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(config.getFlinkLibs()));
configuration.set(YarnConfigOptions.APPLICATION_NAME, config.getJobName());
YarnLogConfigUtil.setLogConfigFileInConfig(configuration, config.getFlinkConfigPath());
}
private void initYarnClient(){
yarnConfiguration = new YarnConfiguration();
        yarnConfiguration.addResource(new Path(config.getYarnConfigPath()));
yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConfiguration);
yarnClient.start();
}
private void addConfigParas(List<ConfigPara> configParas){
if(Asserts.isNotNull(configParas)) {
for (ConfigPara configPara : configParas) {
configuration.setString(configPara.getKey(), configPara.getValue());
}
}
}
    public GatewayResult savepoint(){
        // not implemented for the YARN gateways yet; returning null keeps the method compilable (assumption: callers guard against it)
        return null;
    }
}
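The configParas hook added above lets callers push arbitrary Flink configuration entries through the gateway. A sketch of how it might be populated (keys and values are illustrative):

    List<ConfigPara> paras = Arrays.asList(
            new ConfigPara("taskmanager.numberOfTaskSlots", "2"),
            new ConfigPara("parallelism.default", "1"));
    gatewayConfig.setConfigParas(paras);
    // picked up by addConfigParas(...) when init() loads the Flink configuration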
package com.dlink.gateway.yarn;
import com.dlink.assertion.Asserts;
import com.dlink.gateway.GatewayConfig;
import com.dlink.gateway.GatewayType;
import com.dlink.gateway.exception.GatewayException;
import com.dlink.gateway.result.GatewayResult;
import com.dlink.gateway.result.YarnResult;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
/**
 * YarnPerJobGateway
*
* @author wenmo
* @since 2021/10/29
**/
public class YarnPerJobGateway extends YarnGateway {
public YarnPerJobGateway(GatewayConfig config) {
super(config);
}
public YarnPerJobGateway() {
}
@Override
public GatewayType getType() {
return GatewayType.YARN_PER_JOB;
}
@Override
public GatewayResult submitJobGraph(JobGraph jobGraph) {
if(Asserts.isNull(yarnClient)){
init();
}
YarnResult result = YarnResult.build(getType());
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), true);
try {
ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployJobCluster(clusterSpecification,jobGraph,false);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
ApplicationId applicationId = clusterClient.getClusterId();
result.setAppId(applicationId.toString());
result.setWebURL(clusterClient.getWebInterfaceURL());
result.success();
}catch (Exception e){
e.printStackTrace();
logger.error(e.getMessage());
result.fail(e.getMessage());
}
return result;
}
@Override
public GatewayResult submitJar() {
throw new GatewayException("Couldn't deploy Yarn Per-Job Cluster with User Application Jar.");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.yarn;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.plugin.PluginConfig;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.util.StringUtils;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint;
import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
import org.apache.commons.collections.ListUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.flink.configuration.ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.yarn.YarnConfigKeys.LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR;
/** The descriptor with deployment information for deploying a Flink cluster on Yarn. */
public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class);
private final YarnConfiguration yarnConfiguration;
private final YarnClient yarnClient;
private final YarnClusterInformationRetriever yarnClusterInformationRetriever;
/** True if the descriptor must not shut down the YarnClient. */
private final boolean sharedYarnClient;
/** Lazily initialized list of files to ship. */
private final List<File> shipFiles = new LinkedList<>();
private final List<File> shipArchives = new LinkedList<>();
private final String yarnQueue;
private Path flinkJarPath;
private final Configuration flinkConfiguration;
private final String customName;
private final String nodeLabel;
private final String applicationType;
private YarnConfigOptions.UserJarInclusion userJarInclusion;
public YarnClusterDescriptor(
Configuration flinkConfiguration,
YarnConfiguration yarnConfiguration,
YarnClient yarnClient,
YarnClusterInformationRetriever yarnClusterInformationRetriever,
boolean sharedYarnClient) {
this.yarnConfiguration = Preconditions.checkNotNull(yarnConfiguration);
this.yarnClient = Preconditions.checkNotNull(yarnClient);
this.yarnClusterInformationRetriever =
Preconditions.checkNotNull(yarnClusterInformationRetriever);
this.sharedYarnClient = sharedYarnClient;
this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);
this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
getLocalFlinkDistPath(flinkConfiguration).ifPresent(this::setLocalJarPath);
decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_FILES)
.ifPresent(this::addShipFiles);
decodeFilesToShipToCluster(flinkConfiguration, YarnConfigOptions.SHIP_ARCHIVES)
.ifPresent(this::addShipArchives);
this.yarnQueue = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_QUEUE);
this.customName = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_NAME);
this.applicationType = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TYPE);
this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
}
private Optional<List<File>> decodeFilesToShipToCluster(
final Configuration configuration, final ConfigOption<List<String>> configOption) {
checkNotNull(configuration);
checkNotNull(configOption);
final List<File> files =
ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
return files.isEmpty() ? Optional.empty() : Optional.of(files);
}
private Optional<Path> getLocalFlinkDistPath(final Configuration configuration) {
final String localJarPath = configuration.getString(YarnConfigOptions.FLINK_DIST_JAR);
if (localJarPath != null) {
return Optional.of(new Path(localJarPath));
}
LOG.info(
"No path for the flink jar passed. Using the location of "
+ getClass()
+ " to locate the jar");
// check whether it's actually a jar file --> when testing we execute this class without a
// flink-dist jar
final String decodedPath = getDecodedJarPath();
return decodedPath.endsWith(".jar")
? Optional.of(new Path(new File(decodedPath).toURI()))
: Optional.empty();
}
private String getDecodedJarPath() {
final String encodedJarPath =
getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
try {
return URLDecoder.decode(encodedJarPath, Charset.defaultCharset().name());
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(
"Couldn't decode the encoded Flink dist jar path: "
+ encodedJarPath
+ " You can supply a path manually via the command line.");
}
}
@VisibleForTesting
List<File> getShipFiles() {
return shipFiles;
}
public YarnClient getYarnClient() {
return yarnClient;
}
/**
* The class to start the application master with. This class runs the main method in case of
* session cluster.
*/
protected String getYarnSessionClusterEntrypoint() {
return YarnSessionClusterEntrypoint.class.getName();
}
/**
* The class to start the application master with. This class runs the main method in case of
* the job cluster.
*/
protected String getYarnJobClusterEntrypoint() {
return YarnJobClusterEntrypoint.class.getName();
}
public Configuration getFlinkConfiguration() {
return flinkConfiguration;
}
public void setLocalJarPath(Path localJarPath) {
if (!localJarPath.toString().endsWith("jar")) {
throw new IllegalArgumentException(
"The passed jar path ('"
+ localJarPath
+ "') does not end with the 'jar' extension");
}
this.flinkJarPath = localJarPath;
}
/**
* Adds the given files to the list of files to ship.
*
* <p>Note that any file matching "<tt>flink-dist*.jar</tt>" will be excluded from the upload by
* {@link YarnApplicationFileUploader#registerMultipleLocalResources(Collection, String,
* LocalResourceType)} since we upload the Flink uber jar ourselves and do not need to deploy it
* multiple times.
*
* @param shipFiles files to ship
*/
public void addShipFiles(List<File> shipFiles) {
checkArgument(
userJarInclusion != YarnConfigOptions.UserJarInclusion.DISABLED
|| isUsrLibDirIncludedInShipFiles(shipFiles),
"This is an illegal ship directory : %s. When setting the %s to %s the name of ship directory can not be %s.",
ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR,
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(),
YarnConfigOptions.UserJarInclusion.DISABLED,
ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR);
this.shipFiles.addAll(shipFiles);
}
private void addShipArchives(List<File> shipArchives) {
checkArgument(
isArchiveOnlyIncludedInShipArchiveFiles(shipArchives),
"Non-archive files are included.");
this.shipArchives.addAll(shipArchives);
}
private static boolean isArchiveOnlyIncludedInShipArchiveFiles(List<File> shipFiles) {
return shipFiles.stream()
.filter(File::isFile)
.map(File::getName)
.map(String::toLowerCase)
.allMatch(
name ->
name.endsWith(".tar.gz")
|| name.endsWith(".tar")
|| name.endsWith(".tgz")
|| name.endsWith(".dst")
|| name.endsWith(".jar")
|| name.endsWith(".zip"));
}
private void isReadyForDeployment(ClusterSpecification clusterSpecification) throws Exception {
if (this.flinkJarPath == null) {
throw new YarnDeploymentException("The Flink jar path is null");
}
if (this.flinkConfiguration == null) {
throw new YarnDeploymentException("Flink configuration object has not been set");
}
// Check if we don't exceed YARN's maximum virtual cores.
final int numYarnMaxVcores = yarnClusterInformationRetriever.getMaxVcores();
int configuredAmVcores = flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES);
if (configuredAmVcores > numYarnMaxVcores) {
throw new IllegalConfigurationException(
String.format(
"The number of requested virtual cores for application master %d"
+ " exceeds the maximum number of virtual cores %d available in the Yarn Cluster.",
configuredAmVcores, numYarnMaxVcores));
}
int configuredVcores =
flinkConfiguration.getInteger(
YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
// don't configure more than the maximum configured number of vcores
if (configuredVcores > numYarnMaxVcores) {
throw new IllegalConfigurationException(
String.format(
"The number of requested virtual cores per node %d"
+ " exceeds the maximum number of virtual cores %d available in the Yarn Cluster."
+ " Please note that the number of virtual cores is set to the number of task slots by default"
+ " unless configured in the Flink config with '%s.'",
configuredVcores, numYarnMaxVcores, YarnConfigOptions.VCORES.key()));
}
// check if required Hadoop environment variables are set. If not, warn user
if (System.getenv("HADOOP_CONF_DIR") == null && System.getenv("YARN_CONF_DIR") == null) {
LOG.warn(
"Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. "
+ "The Flink YARN Client needs one of these to be set to properly load the Hadoop "
+ "configuration for accessing YARN.");
}
}
public String getNodeLabel() {
return nodeLabel;
}
// -------------------------------------------------------------
// Lifecycle management
// -------------------------------------------------------------
@Override
public void close() {
if (!sharedYarnClient) {
yarnClient.stop();
}
}
// -------------------------------------------------------------
// ClusterClient overrides
// -------------------------------------------------------------
@Override
public ClusterClientProvider<ApplicationId> retrieve(ApplicationId applicationId)
throws ClusterRetrieveException {
try {
// check if required Hadoop environment variables are set. If not, warn user
if (System.getenv("HADOOP_CONF_DIR") == null
&& System.getenv("YARN_CONF_DIR") == null) {
LOG.warn(
"Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set."
+ "The Flink YARN Client needs one of these to be set to properly load the Hadoop "
+ "configuration for accessing YARN.");
}
final ApplicationReport report = yarnClient.getApplicationReport(applicationId);
if (report.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
// Flink cluster is not running anymore
LOG.error(
"The application {} doesn't run anymore. It has previously completed with final status: {}",
applicationId,
report.getFinalApplicationStatus());
throw new RuntimeException(
"The Yarn application " + applicationId + " doesn't run anymore.");
}
setClusterEntrypointInfoToConfig(report);
return () -> {
try {
return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
} catch (Exception e) {
throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
}
};
} catch (Exception e) {
throw new ClusterRetrieveException("Couldn't retrieve Yarn cluster", e);
}
}
@Override
public ClusterClientProvider<ApplicationId> deploySessionCluster(
ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
try {
return deployInternal(
clusterSpecification,
"Flink session cluster",
getYarnSessionClusterEntrypoint(),
null,
false);
} catch (Exception e) {
throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
}
}
@Override
public ClusterClientProvider<ApplicationId> deployApplicationCluster(
final ClusterSpecification clusterSpecification,
final ApplicationConfiguration applicationConfiguration)
throws ClusterDeploymentException {
checkNotNull(clusterSpecification);
checkNotNull(applicationConfiguration);
final YarnDeploymentTarget deploymentTarget =
YarnDeploymentTarget.fromConfig(flinkConfiguration);
if (YarnDeploymentTarget.APPLICATION != deploymentTarget) {
throw new ClusterDeploymentException(
"Couldn't deploy Yarn Application Cluster."
+ " Expected deployment.target="
+ YarnDeploymentTarget.APPLICATION.getName()
+ " but actual one was \""
+ deploymentTarget.getName()
+ "\"");
}
applicationConfiguration.applyToConfiguration(flinkConfiguration);
final List<String> pipelineJars =
flinkConfiguration
.getOptional(PipelineOptions.JARS)
.orElse(Collections.emptyList());
Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
try {
return deployInternal(
clusterSpecification,
"Flink Application Cluster",
YarnApplicationClusterEntryPoint.class.getName(),
null,
false);
} catch (Exception e) {
throw new ClusterDeploymentException("Couldn't deploy Yarn Application Cluster", e);
}
}
@Override
public ClusterClientProvider<ApplicationId> deployJobCluster(
ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
throws ClusterDeploymentException {
try {
return deployInternal(
clusterSpecification,
"Flink per-job cluster",
getYarnJobClusterEntrypoint(),
jobGraph,
detached);
} catch (Exception e) {
throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
}
}
@Override
public void killCluster(ApplicationId applicationId) throws FlinkException {
try {
yarnClient.killApplication(applicationId);
try (final FileSystem fs = FileSystem.get(yarnConfiguration)) {
final Path applicationDir =
YarnApplicationFileUploader.getApplicationDirPath(
getStagingDir(fs), applicationId);
Utils.deleteApplicationFiles(applicationDir.toUri().toString());
}
} catch (YarnException | IOException e) {
throw new FlinkException(
"Could not kill the Yarn Flink cluster with id " + applicationId + '.', e);
}
}
/**
* This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
*
* @param clusterSpecification Initial cluster specification for the Flink cluster to be
* deployed
* @param applicationName name of the Yarn application to start
* @param yarnClusterEntrypoint Class name of the Yarn cluster entry point.
* @param jobGraph A job graph which is deployed with the Flink cluster, {@code null} if none
* @param detached True if the cluster should be started in detached mode
*/
public ClusterClientProvider<ApplicationId> deployInternal(
ClusterSpecification clusterSpecification,
String applicationName,
String yarnClusterEntrypoint,
@Nullable JobGraph jobGraph,
boolean detached)
throws Exception {
final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
boolean useTicketCache =
flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
throw new RuntimeException(
"Hadoop security with Kerberos is enabled but the login user "
+ "does not have Kerberos credentials or delegation tokens!");
}
}
isReadyForDeployment(clusterSpecification);
// ------------------ Check if the specified queue exists --------------------
checkYarnQueues(yarnClient);
// ------------------ Check if the YARN ClusterClient has the requested resources
// --------------
// Create application via yarnClient
final YarnClientApplication yarnApplication = yarnClient.createApplication();
final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
Resource maxRes = appResponse.getMaximumResourceCapability();
final ClusterResourceDescription freeClusterMem;
try {
freeClusterMem = getCurrentFreeClusterResources(yarnClient);
} catch (YarnException | IOException e) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw new YarnDeploymentException(
"Could not retrieve information about free cluster resources.", e);
}
final int yarnMinAllocationMB =
yarnConfiguration.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
if (yarnMinAllocationMB <= 0) {
throw new YarnDeploymentException(
"The minimum allocation memory "
+ "("
+ yarnMinAllocationMB
+ " MB) configured via '"
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
+ "' should be greater than 0.");
}
final ClusterSpecification validClusterSpecification;
try {
validClusterSpecification =
validateClusterResources(
clusterSpecification, yarnMinAllocationMB, maxRes, freeClusterMem);
} catch (YarnDeploymentException yde) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw yde;
}
LOG.info("Cluster specification: {}", validClusterSpecification);
final ClusterEntrypoint.ExecutionMode executionMode =
detached
? ClusterEntrypoint.ExecutionMode.DETACHED
: ClusterEntrypoint.ExecutionMode.NORMAL;
flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());
ApplicationReport report =
startAppMaster(
flinkConfiguration,
applicationName,
yarnClusterEntrypoint,
jobGraph,
yarnClient,
yarnApplication,
validClusterSpecification);
// print the application id for user to cancel themselves.
if (detached) {
final ApplicationId yarnApplicationId = report.getApplicationId();
logDetachedClusterInformation(yarnApplicationId, LOG);
}
setClusterEntrypointInfoToConfig(report);
return () -> {
try {
return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
} catch (Exception e) {
throw new RuntimeException("Error while creating RestClusterClient.", e);
}
};
}
private ClusterSpecification validateClusterResources(
ClusterSpecification clusterSpecification,
int yarnMinAllocationMB,
Resource maximumResourceCapability,
ClusterResourceDescription freeClusterResources)
throws YarnDeploymentException {
int jobManagerMemoryMb = clusterSpecification.getMasterMemoryMB();
final int taskManagerMemoryMb = clusterSpecification.getTaskManagerMemoryMB();
logIfComponentMemNotIntegerMultipleOfYarnMinAllocation(
"JobManager", jobManagerMemoryMb, yarnMinAllocationMB);
logIfComponentMemNotIntegerMultipleOfYarnMinAllocation(
"TaskManager", taskManagerMemoryMb, yarnMinAllocationMB);
// set the memory to minAllocationMB to do the next checks correctly
if (jobManagerMemoryMb < yarnMinAllocationMB) {
jobManagerMemoryMb = yarnMinAllocationMB;
}
final String note =
"Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) {
throw new YarnDeploymentException(
"The cluster does not have the requested resources for the JobManager available!\n"
+ "Maximum Memory: "
+ maximumResourceCapability.getMemory()
+ "MB Requested: "
+ jobManagerMemoryMb
+ "MB. "
+ note);
}
if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) {
throw new YarnDeploymentException(
"The cluster does not have the requested resources for the TaskManagers available!\n"
+ "Maximum Memory: "
+ maximumResourceCapability.getMemory()
+ " Requested: "
+ taskManagerMemoryMb
+ "MB. "
+ note);
}
final String noteRsc =
"\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are "
+ "connecting from the beginning because the resources are currently not available in the cluster. "
+ "The allocation might take more time than usual because the Flink YARN client needs to wait until "
+ "the resources become available.";
if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn(
"The requested amount of memory for the TaskManagers ("
+ taskManagerMemoryMb
+ "MB) is more than "
+ "the largest possible YARN container: "
+ freeClusterResources.containerLimit
+ noteRsc);
}
if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn(
"The requested amount of memory for the JobManager ("
+ jobManagerMemoryMb
+ "MB) is more than "
+ "the largest possible YARN container: "
+ freeClusterResources.containerLimit
+ noteRsc);
}
return new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(jobManagerMemoryMb)
.setTaskManagerMemoryMB(taskManagerMemoryMb)
.setSlotsPerTaskManager(clusterSpecification.getSlotsPerTaskManager())
.createClusterSpecification();
}
private void logIfComponentMemNotIntegerMultipleOfYarnMinAllocation(
String componentName, int componentMemoryMB, int yarnMinAllocationMB) {
int normalizedMemMB =
(componentMemoryMB + (yarnMinAllocationMB - 1))
/ yarnMinAllocationMB
* yarnMinAllocationMB;
if (normalizedMemMB <= 0) {
normalizedMemMB = yarnMinAllocationMB;
}
if (componentMemoryMB != normalizedMemMB) {
LOG.info(
"The configured {} memory is {} MB. YARN will allocate {} MB to make up an integer multiple of its "
+ "minimum allocation memory ({} MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra {} MB "
+ "may not be used by Flink.",
componentName,
componentMemoryMB,
normalizedMemMB,
yarnMinAllocationMB,
normalizedMemMB - componentMemoryMB);
}
}
private void checkYarnQueues(YarnClient yarnClient) {
try {
List<QueueInfo> queues = yarnClient.getAllQueues();
            // Check only if there are queues configured in YARN and a queue was set for this session.
            if (queues.size() > 0 && this.yarnQueue != null) {
boolean queueFound = false;
for (QueueInfo queue : queues) {
if (queue.getQueueName().equals(this.yarnQueue)
|| queue.getQueueName().equals("root." + this.yarnQueue)) {
queueFound = true;
break;
}
}
if (!queueFound) {
String queueNames = "";
for (QueueInfo queue : queues) {
queueNames += queue.getQueueName() + ", ";
}
LOG.warn(
"The specified queue '"
+ this.yarnQueue
+ "' does not exist. "
+ "Available queues: "
+ queueNames);
}
} else {
LOG.debug("The YARN cluster does not have any queues configured");
}
} catch (Throwable e) {
LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Error details", e);
}
}
}
private ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
JobGraph jobGraph,
YarnClient yarnClient,
YarnClientApplication yarnApplication,
ClusterSpecification clusterSpecification)
throws Exception {
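        // Overall flow: initialize the file systems, upload ship files, user jars, the job graph
        // and flink-conf.yaml to the staging directory, assemble the AM container launch context
        // and environment, then submit the application and poll until it is RUNNING.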
// ------------------ Initialize the file systems -------------------------
org.apache.flink.core.fs.FileSystem.initialize(
configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
final FileSystem fs = FileSystem.get(yarnConfiguration);
        // hard-coded check for the GoogleHDFS client because it's not overriding the getScheme()
        // method.
if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem")
&& fs.getScheme().startsWith("file")) {
LOG.warn(
"The file system scheme is '"
+ fs.getScheme()
+ "'. This indicates that the "
+ "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
+ "The Flink YARN client needs to store its files in a distributed file system");
}
ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
final List<Path> providedLibDirs =
Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration);
final YarnApplicationFileUploader fileUploader =
YarnApplicationFileUploader.from(
fs,
getStagingDir(fs),
providedLibDirs,
appContext.getApplicationId(),
getFileReplication());
// The files need to be shipped and added to classpath.
Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
for (File file : shipFiles) {
systemShipFiles.add(file.getAbsoluteFile());
}
final String logConfigFilePath =
configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
if (logConfigFilePath != null) {
systemShipFiles.add(new File(logConfigFilePath));
}
// Set-up ApplicationSubmissionContext for the application
final ApplicationId appId = appContext.getApplicationId();
// ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
setHAClusterIdIfNotSet(configuration, appId);
if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
// activate re-execution of failed applications
appContext.setMaxAppAttempts(
configuration.getInteger(
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
activateHighAvailabilitySupport(appContext);
} else {
// set number of application retries to 1 in the default case
appContext.setMaxAppAttempts(
configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1));
}
final Set<Path> userJarFiles = new HashSet<>();
if (jobGraph != null) {
userJarFiles.addAll(
jobGraph.getUserJars().stream()
.map(f -> f.toUri())
.map(Path::new)
.collect(Collectors.toSet()));
}
final List<URI> jarUrls =
ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
if (jarUrls != null
&& YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
}
// only for per job mode
if (jobGraph != null) {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
jobGraph.getUserArtifacts().entrySet()) {
// only upload local files
if (!Utils.isRemotePath(entry.getValue().filePath)) {
Path localPath = new Path(entry.getValue().filePath);
Tuple2<Path, Long> remoteFileInfo =
fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
jobGraph.setUserArtifactRemotePath(
entry.getKey(), remoteFileInfo.f0.toString());
}
}
jobGraph.writeUserArtifactEntriesToConfiguration();
}
if (providedLibDirs == null || providedLibDirs.isEmpty()) {
addLibFoldersToShipFiles(systemShipFiles);
}
// Register all files in provided lib dirs as local resources with public visibility
// and upload the remaining dependencies as local resources with APPLICATION visibility.
final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
final List<String> uploadedDependencies =
fileUploader.registerMultipleLocalResources(
systemShipFiles.stream()
.map(e -> new Path(e.toURI()))
.collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.FILE);
systemClassPaths.addAll(uploadedDependencies);
// upload and register ship-only files
// Plugin files only need to be shipped and should not be added to classpath.
if (providedLibDirs == null || providedLibDirs.isEmpty()) {
Set<File> shipOnlyFiles = new HashSet<>();
addPluginsFoldersToShipFiles(shipOnlyFiles);
fileUploader.registerMultipleLocalResources(
shipOnlyFiles.stream()
.map(e -> new Path(e.toURI()))
.collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.FILE);
}
if (!shipArchives.isEmpty()) {
fileUploader.registerMultipleLocalResources(
shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
Path.CUR_DIR,
LocalResourceType.ARCHIVE);
}
// Upload and register user jars
final List<String> userClassPaths =
fileUploader.registerMultipleLocalResources(
userJarFiles,
userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
: Path.CUR_DIR,
LocalResourceType.FILE);
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
systemClassPaths.addAll(userClassPaths);
}
// normalize classpath by sorting
Collections.sort(systemClassPaths);
Collections.sort(userClassPaths);
// classpath assembler
StringBuilder classPathBuilder = new StringBuilder();
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
for (String classPath : systemClassPaths) {
classPathBuilder.append(classPath).append(File.pathSeparator);
}
// Setup jar for ApplicationMaster
final YarnLocalResourceDescriptor localResourceDescFlinkJar =
fileUploader.uploadFlinkDist(flinkJarPath);
classPathBuilder
.append(localResourceDescFlinkJar.getResourceKey())
.append(File.pathSeparator);
// write job graph to tmp file and add it to local resource
// TODO: server use user main method to generate job graph
if (jobGraph != null) {
File tmpJobGraphFile = null;
try {
tmpJobGraphFile = File.createTempFile(appId.toString(), null);
try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
obOutput.writeObject(jobGraph);
}
final String jobGraphFilename = "job.graph";
configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);
fileUploader.registerSingleLocalResource(
jobGraphFilename,
new Path(tmpJobGraphFile.toURI()),
"",
LocalResourceType.FILE,
true,
false);
classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
} catch (Exception e) {
LOG.warn("Add job graph to local resource fail.");
throw e;
} finally {
if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
}
}
}
// Upload the flink configuration
// write out configuration file
File tmpConfigurationFile = null;
try {
tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);
String flinkConfigKey = "flink-conf.yaml";
fileUploader.registerSingleLocalResource(
flinkConfigKey,
new Path(tmpConfigurationFile.getAbsolutePath()),
"",
LocalResourceType.FILE,
true,
true);
classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
} finally {
if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
}
}
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
        // To support the YARN secure integration test scenario:
        // in the integration test setup, the YARN containers created by YarnMiniCluster do not have
        // the yarn-site.xml and KRB5 configuration files, so we add these files as container local
        // resources for the container applications (JM/TMs) to have a proper secure cluster setup.
Path remoteYarnSiteXmlPath = null;
if (System.getenv("IN_TESTS") != null) {
File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
LOG.info(
"Adding Yarn configuration {} to the AM container local resource bucket",
f.getAbsolutePath());
Path yarnSitePath = new Path(f.getAbsolutePath());
remoteYarnSiteXmlPath =
fileUploader
.registerSingleLocalResource(
Utils.YARN_SITE_FILE_NAME,
yarnSitePath,
"",
LocalResourceType.FILE,
false,
false)
.getPath();
if (System.getProperty("java.security.krb5.conf") != null) {
configuration.set(
SecurityOptions.KERBEROS_KRB5_PATH,
System.getProperty("java.security.krb5.conf"));
}
}
Path remoteKrb5Path = null;
boolean hasKrb5 = false;
String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
final File krb5 = new File(krb5Config);
LOG.info(
"Adding KRB5 configuration {} to the AM container local resource bucket",
krb5.getAbsolutePath());
final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
remoteKrb5Path =
fileUploader
.registerSingleLocalResource(
Utils.KRB5_FILE_NAME,
krb5ConfPath,
"",
LocalResourceType.FILE,
false,
false)
.getPath();
hasKrb5 = true;
}
Path remotePathKeytab = null;
String localizedKeytabPath = null;
String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if (keytab != null) {
boolean localizeKeytab =
flinkConfiguration.getBoolean(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
localizedKeytabPath =
flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
if (localizeKeytab) {
// Localize the keytab to YARN containers via local resource.
LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
remotePathKeytab =
fileUploader
.registerSingleLocalResource(
localizedKeytabPath,
new Path(keytab),
"",
LocalResourceType.FILE,
false,
false)
.getPath();
} else {
                // Assume Keytab is pre-installed in the container.
localizedKeytabPath =
flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
}
}
final JobManagerProcessSpec processSpec =
JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY);
final ContainerLaunchContext amContainer =
setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec);
// setup security tokens
if (UserGroupInformation.isSecurityEnabled()) {
// set HDFS delegation tokens when security is enabled
LOG.info("Adding delegation token to the AM container.");
List<Path> yarnAccessList =
ConfigUtils.decodeListFromConfig(
configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
Utils.setTokensFor(
amContainer,
ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
yarnConfiguration);
}
amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
fileUploader.close();
// Setup CLASSPATH and environment variables for ApplicationMaster
final Map<String, String> appMasterEnv = new HashMap<>();
// set user specified app master environment variables
appMasterEnv.putAll(
ConfigurationUtils.getPrefixedKeyValuePairs(
ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
// set Flink app class path
appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());
// set Flink on YARN internal configuration values
appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR, localResourceDescFlinkJar.toString());
appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileUploader.getHomeDir().toString());
appMasterEnv.put(
YarnConfigKeys.ENV_CLIENT_SHIP_FILES,
encodeYarnLocalResourceDescriptorListToString(
fileUploader.getEnvShipResourceList()));
appMasterEnv.put(
YarnConfigKeys.FLINK_YARN_FILES,
fileUploader.getApplicationDir().toUri().toString());
// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
appMasterEnv.put(
YarnConfigKeys.ENV_HADOOP_USER_NAME,
UserGroupInformation.getCurrentUser().getUserName());
if (localizedKeytabPath != null) {
appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
if (remotePathKeytab != null) {
appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
}
}
// To support Yarn Secure Integration Test Scenario
if (remoteYarnSiteXmlPath != null) {
appMasterEnv.put(
YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
}
if (remoteKrb5Path != null) {
appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
}
// set classpath from YARN configuration
Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);
amContainer.setEnvironment(appMasterEnv);
// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(clusterSpecification.getMasterMemoryMB());
capability.setVirtualCores(
flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));
final String customApplicationName = customName != null ? customName : applicationName;
appContext.setApplicationName(customApplicationName);
appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
// Set priority for application
int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);
if (priorityNum >= 0) {
Priority priority = Priority.newInstance(priorityNum);
appContext.setPriority(priority);
}
if (yarnQueue != null) {
appContext.setQueue(yarnQueue);
}
setApplicationNodeLabel(appContext);
setApplicationTags(appContext);
// add a hook to clean up in case deployment fails
Thread deploymentFailureHook =
new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
LOG.info("Submitting application master " + appId);
yarnClient.submitApplication(appContext);
LOG.info("Waiting for the cluster to be allocated");
final long startTime = System.currentTimeMillis();
ApplicationReport report;
YarnApplicationState lastAppState = YarnApplicationState.NEW;
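        // Poll the application report every 250 ms: FAILED/KILLED aborts the deployment,
        // RUNNING/FINISHED exits the loop, any other state keeps waiting (with a hint after 60 s).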
loop:
while (true) {
try {
report = yarnClient.getApplicationReport(appId);
} catch (IOException e) {
throw new YarnDeploymentException("Failed to deploy the cluster.", e);
}
YarnApplicationState appState = report.getYarnApplicationState();
LOG.debug("Application State: {}", appState);
switch (appState) {
case FAILED:
case KILLED:
throw new YarnDeploymentException(
"The YARN application unexpectedly switched to state "
+ appState
+ " during deployment. \n"
+ "Diagnostics from YARN: "
+ report.getDiagnostics()
+ "\n"
+ "If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n"
+ "yarn logs -applicationId "
+ appId);
// break ..
case RUNNING:
LOG.info("YARN application has been deployed successfully.");
break loop;
case FINISHED:
LOG.info("YARN application has been finished successfully.");
break loop;
default:
if (appState != lastAppState) {
LOG.info("Deploying cluster, current state " + appState);
}
if (System.currentTimeMillis() - startTime > 60000) {
LOG.info(
"Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
}
}
lastAppState = appState;
Thread.sleep(250);
}
// since deployment was successful, remove the hook
ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
return report;
}
/**
* Returns the configured remote target home directory if set, otherwise returns the default
* home directory.
*
* @param fileSystem file system used
* @return the remote target home directory
*/
private Path getStagingDir(FileSystem fileSystem) {
final String configuredStagingDir =
flinkConfiguration.getString(YarnConfigOptions.STAGING_DIRECTORY);
return configuredStagingDir != null
? fileSystem.makeQualified(new Path(configuredStagingDir))
: fileSystem.getHomeDirectory();
}
private int getFileReplication() {
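        // Prefer the Flink-side replication setting; fall back to the HDFS default
        // (dfs.replication) when it is not set to a positive value.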
final int yarnFileReplication =
yarnConfiguration.getInt(
DFSConfigKeys.DFS_REPLICATION_KEY, DFSConfigKeys.DFS_REPLICATION_DEFAULT);
final int fileReplication =
flinkConfiguration.getInteger(YarnConfigOptions.FILE_REPLICATION);
return fileReplication > 0 ? fileReplication : yarnFileReplication;
}
private static String encodeYarnLocalResourceDescriptorListToString(
List<YarnLocalResourceDescriptor> resources) {
return String.join(
LOCAL_RESOURCE_DESCRIPTOR_SEPARATOR,
resources.stream()
.map(YarnLocalResourceDescriptor::toString)
.collect(Collectors.toList()));
}
/**
* Kills YARN application and stops YARN client.
*
     * <p>Use this method to kill the application before it has been properly deployed.
*/
private void failSessionDuringDeployment(
YarnClient yarnClient, YarnClientApplication yarnApplication) {
LOG.info("Killing YARN application");
try {
yarnClient.killApplication(
yarnApplication.getNewApplicationResponse().getApplicationId());
} catch (Exception e) {
// we only log a debug message here because the "killApplication" call is a best-effort
// call (we don't know if the application has been deployed when the error occurred).
LOG.debug("Error while killing YARN application", e);
}
}
private static class ClusterResourceDescription {
public final int totalFreeMemory;
public final int containerLimit;
public final int[] nodeManagersFree;
public ClusterResourceDescription(
int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
this.totalFreeMemory = totalFreeMemory;
this.containerLimit = containerLimit;
this.nodeManagersFree = nodeManagersFree;
}
}
private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient)
throws YarnException, IOException {
List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
int totalFreeMemory = 0;
int containerLimit = 0;
int[] nodeManagersFree = new int[nodes.size()];
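        // free = node capability minus currently used memory; containerLimit tracks the largest
        // free amount available on any single NodeManager.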
for (int i = 0; i < nodes.size(); i++) {
NodeReport rep = nodes.get(i);
int free =
rep.getCapability().getMemory()
- (rep.getUsed() != null ? rep.getUsed().getMemory() : 0);
nodeManagersFree[i] = free;
totalFreeMemory += free;
if (free > containerLimit) {
containerLimit = free;
}
}
return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
}
@Override
public String getClusterDescription() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
ps.append("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers());
List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
final String format = "|%-16s |%-16s %n";
ps.printf("|Property |Value %n");
ps.println("+---------------------------------------+");
int totalMemory = 0;
int totalCores = 0;
for (NodeReport rep : nodes) {
final Resource res = rep.getCapability();
totalMemory += res.getMemory();
totalCores += res.getVirtualCores();
ps.format(format, "NodeID", rep.getNodeId());
ps.format(format, "Memory", res.getMemory() + " MB");
ps.format(format, "vCores", res.getVirtualCores());
ps.format(format, "HealthReport", rep.getHealthReport());
ps.format(format, "Containers", rep.getNumContainers());
ps.println("+---------------------------------------+");
}
ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
List<QueueInfo> qInfo = yarnClient.getAllQueues();
for (QueueInfo q : qInfo) {
ps.println(
"Queue: "
+ q.getQueueName()
+ ", Current Capacity: "
+ q.getCurrentCapacity()
+ " Max Capacity: "
+ q.getMaximumCapacity()
+ " Applications: "
+ q.getApplications().size());
}
return baos.toString();
} catch (Exception e) {
throw new RuntimeException("Couldn't get cluster description", e);
}
}
private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext)
throws InvocationTargetException, IllegalAccessException {
ApplicationSubmissionContextReflector reflector =
ApplicationSubmissionContextReflector.getInstance();
reflector.setKeepContainersAcrossApplicationAttempts(appContext, true);
reflector.setAttemptFailuresValidityInterval(
appContext,
flinkConfiguration.getLong(
YarnConfigOptions.APPLICATION_ATTEMPT_FAILURE_VALIDITY_INTERVAL));
}
private void setApplicationTags(final ApplicationSubmissionContext appContext)
throws InvocationTargetException, IllegalAccessException {
final ApplicationSubmissionContextReflector reflector =
ApplicationSubmissionContextReflector.getInstance();
final String tagsString = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS);
final Set<String> applicationTags = new HashSet<>();
// Trim whitespace and cull empty tags
for (final String tag : tagsString.split(",")) {
final String trimmedTag = tag.trim();
if (!trimmedTag.isEmpty()) {
applicationTags.add(trimmedTag);
}
}
reflector.setApplicationTags(appContext, applicationTags);
}
private void setApplicationNodeLabel(final ApplicationSubmissionContext appContext)
throws InvocationTargetException, IllegalAccessException {
if (nodeLabel != null) {
final ApplicationSubmissionContextReflector reflector =
ApplicationSubmissionContextReflector.getInstance();
reflector.setApplicationNodeLabel(appContext, nodeLabel);
}
}
/**
* Singleton object which uses reflection to determine whether the {@link
* ApplicationSubmissionContext} supports various methods which, depending on the Hadoop
* version, may or may not be supported.
*
* <p>If an unsupported method is invoked, nothing happens.
*
     * <p>Currently four methods are proxied: - setApplicationTags (>= 2.4.0) -
     * setAttemptFailuresValidityInterval (>= 2.6.0) - setKeepContainersAcrossApplicationAttempts
     * (>= 2.4.0) - setNodeLabelExpression (>= 2.6.0)
*/
private static class ApplicationSubmissionContextReflector {
private static final Logger LOG =
LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
private static final ApplicationSubmissionContextReflector instance =
new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
public static ApplicationSubmissionContextReflector getInstance() {
return instance;
}
private static final String APPLICATION_TAGS_METHOD_NAME = "setApplicationTags";
private static final String ATTEMPT_FAILURES_METHOD_NAME =
"setAttemptFailuresValidityInterval";
private static final String KEEP_CONTAINERS_METHOD_NAME =
"setKeepContainersAcrossApplicationAttempts";
private static final String NODE_LABEL_EXPRESSION_NAME = "setNodeLabelExpression";
private final Method applicationTagsMethod;
private final Method attemptFailuresValidityIntervalMethod;
private final Method keepContainersMethod;
@Nullable private final Method nodeLabelExpressionMethod;
private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> clazz) {
Method applicationTagsMethod;
Method attemptFailuresValidityIntervalMethod;
Method keepContainersMethod;
Method nodeLabelExpressionMethod;
try {
// this method is only supported by Hadoop 2.4.0 onwards
applicationTagsMethod = clazz.getMethod(APPLICATION_TAGS_METHOD_NAME, Set.class);
LOG.debug(
"{} supports method {}.",
clazz.getCanonicalName(),
APPLICATION_TAGS_METHOD_NAME);
} catch (NoSuchMethodException e) {
LOG.debug(
"{} does not support method {}.",
clazz.getCanonicalName(),
APPLICATION_TAGS_METHOD_NAME);
// assign null because the Hadoop version apparently does not support this call.
applicationTagsMethod = null;
}
this.applicationTagsMethod = applicationTagsMethod;
try {
// this method is only supported by Hadoop 2.6.0 onwards
attemptFailuresValidityIntervalMethod =
clazz.getMethod(ATTEMPT_FAILURES_METHOD_NAME, long.class);
LOG.debug(
"{} supports method {}.",
clazz.getCanonicalName(),
ATTEMPT_FAILURES_METHOD_NAME);
} catch (NoSuchMethodException e) {
LOG.debug(
"{} does not support method {}.",
clazz.getCanonicalName(),
ATTEMPT_FAILURES_METHOD_NAME);
// assign null because the Hadoop version apparently does not support this call.
attemptFailuresValidityIntervalMethod = null;
}
this.attemptFailuresValidityIntervalMethod = attemptFailuresValidityIntervalMethod;
try {
// this method is only supported by Hadoop 2.4.0 onwards
keepContainersMethod = clazz.getMethod(KEEP_CONTAINERS_METHOD_NAME, boolean.class);
LOG.debug(
"{} supports method {}.",
clazz.getCanonicalName(),
KEEP_CONTAINERS_METHOD_NAME);
} catch (NoSuchMethodException e) {
LOG.debug(
"{} does not support method {}.",
clazz.getCanonicalName(),
KEEP_CONTAINERS_METHOD_NAME);
// assign null because the Hadoop version apparently does not support this call.
keepContainersMethod = null;
}
this.keepContainersMethod = keepContainersMethod;
try {
nodeLabelExpressionMethod =
clazz.getMethod(NODE_LABEL_EXPRESSION_NAME, String.class);
LOG.debug(
"{} supports method {}.",
clazz.getCanonicalName(),
NODE_LABEL_EXPRESSION_NAME);
} catch (NoSuchMethodException e) {
LOG.debug(
"{} does not support method {}.",
clazz.getCanonicalName(),
NODE_LABEL_EXPRESSION_NAME);
nodeLabelExpressionMethod = null;
}
this.nodeLabelExpressionMethod = nodeLabelExpressionMethod;
}
public void setApplicationTags(
ApplicationSubmissionContext appContext, Set<String> applicationTags)
throws InvocationTargetException, IllegalAccessException {
if (applicationTagsMethod != null) {
LOG.debug(
"Calling method {} of {}.",
applicationTagsMethod.getName(),
appContext.getClass().getCanonicalName());
applicationTagsMethod.invoke(appContext, applicationTags);
} else {
LOG.debug(
"{} does not support method {}. Doing nothing.",
appContext.getClass().getCanonicalName(),
APPLICATION_TAGS_METHOD_NAME);
}
}
public void setApplicationNodeLabel(
ApplicationSubmissionContext appContext, String nodeLabel)
throws InvocationTargetException, IllegalAccessException {
if (nodeLabelExpressionMethod != null) {
LOG.debug(
"Calling method {} of {}.",
nodeLabelExpressionMethod.getName(),
appContext.getClass().getCanonicalName());
nodeLabelExpressionMethod.invoke(appContext, nodeLabel);
} else {
LOG.debug(
"{} does not support method {}. Doing nothing.",
appContext.getClass().getCanonicalName(),
NODE_LABEL_EXPRESSION_NAME);
}
}
public void setAttemptFailuresValidityInterval(
ApplicationSubmissionContext appContext, long validityInterval)
throws InvocationTargetException, IllegalAccessException {
if (attemptFailuresValidityIntervalMethod != null) {
LOG.debug(
"Calling method {} of {}.",
attemptFailuresValidityIntervalMethod.getName(),
appContext.getClass().getCanonicalName());
attemptFailuresValidityIntervalMethod.invoke(appContext, validityInterval);
} else {
LOG.debug(
"{} does not support method {}. Doing nothing.",
appContext.getClass().getCanonicalName(),
ATTEMPT_FAILURES_METHOD_NAME);
}
}
public void setKeepContainersAcrossApplicationAttempts(
ApplicationSubmissionContext appContext, boolean keepContainers)
throws InvocationTargetException, IllegalAccessException {
if (keepContainersMethod != null) {
LOG.debug(
"Calling method {} of {}.",
keepContainersMethod.getName(),
appContext.getClass().getCanonicalName());
keepContainersMethod.invoke(appContext, keepContainers);
} else {
LOG.debug(
"{} does not support method {}. Doing nothing.",
appContext.getClass().getCanonicalName(),
KEEP_CONTAINERS_METHOD_NAME);
}
}
}
private static class YarnDeploymentException extends RuntimeException {
private static final long serialVersionUID = -812040641215388943L;
public YarnDeploymentException(String message) {
super(message);
}
public YarnDeploymentException(String message, Throwable cause) {
super(message, cause);
}
}
private class DeploymentFailureHook extends Thread {
private final YarnClient yarnClient;
private final YarnClientApplication yarnApplication;
private final Path yarnFilesDir;
DeploymentFailureHook(YarnClientApplication yarnApplication, Path yarnFilesDir) {
this.yarnApplication = Preconditions.checkNotNull(yarnApplication);
this.yarnFilesDir = Preconditions.checkNotNull(yarnFilesDir);
            // A new YARN client needs to be created in the shutdown hook to avoid using a
            // YARN client that has already been closed by the YarnClusterDescriptor.
this.yarnClient = YarnClient.createYarnClient();
this.yarnClient.init(yarnConfiguration);
}
@Override
public void run() {
LOG.info("Cancelling deployment from Deployment Failure Hook");
yarnClient.start();
failSessionDuringDeployment(yarnClient, yarnApplication);
yarnClient.stop();
LOG.info("Deleting files in {}.", yarnFilesDir);
try {
FileSystem fs = FileSystem.get(yarnConfiguration);
if (!fs.delete(yarnFilesDir, true)) {
throw new IOException(
"Deleting files in " + yarnFilesDir + " was unsuccessful");
}
fs.close();
} catch (IOException e) {
LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e);
}
}
}
@VisibleForTesting
void addLibFoldersToShipFiles(Collection<File> effectiveShipFiles) {
// Add lib folder to the ship files if the environment variable is set.
// This is for convenience when running from the command-line.
// (for other files users explicitly set the ship files)
String libDir = System.getenv().get(ENV_FLINK_LIB_DIR);
if (libDir != null) {
File directoryFile = new File(libDir);
if (directoryFile.isDirectory()) {
effectiveShipFiles.add(directoryFile);
} else {
throw new YarnDeploymentException(
"The environment variable '"
+ ENV_FLINK_LIB_DIR
+ "' is set to '"
+ libDir
+ "' but the directory doesn't exist.");
}
} else if (shipFiles.isEmpty()) {
LOG.warn(
"Environment variable '{}' not set and ship files have not been provided manually. "
+ "Not shipping any library files.",
ENV_FLINK_LIB_DIR);
}
}
@VisibleForTesting
void addPluginsFoldersToShipFiles(Collection<File> effectiveShipFiles) {
final Optional<File> pluginsDir = PluginConfig.getPluginsDir();
pluginsDir.ifPresent(effectiveShipFiles::add);
}
ContainerLaunchContext setupApplicationMasterContainer(
String yarnClusterEntrypoint, boolean hasKrb5, JobManagerProcessSpec processSpec) {
// ------------------ Prepare Application Master Container ------------------------------
// respect custom JVM options in the YAML file
String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
}
// krb5.conf file will be available as local resource in JM/TM container
if (hasKrb5) {
javaOpts += " -Djava.security.krb5.conf=krb5.conf";
}
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
final Map<String, String> startCommandValues = new HashMap<>();
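        // Each entry below replaces the matching placeholder in the configured container start
        // command template when the final AM launch command is rendered.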
startCommandValues.put("java", "$JAVA_HOME/bin/java");
String jvmHeapMem =
JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);
startCommandValues.put("jvmmem", jvmHeapMem);
startCommandValues.put("jvmopts", javaOpts);
startCommandValues.put(
"logging", YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));
startCommandValues.put("class", yarnClusterEntrypoint);
startCommandValues.put(
"redirects",
"1> "
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ "/jobmanager.out "
+ "2> "
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ "/jobmanager.err");
String dynamicParameterListStr =
JobManagerProcessUtils.generateDynamicConfigsStr(processSpec);
startCommandValues.put("args", dynamicParameterListStr);
final String commandTemplate =
flinkConfiguration.getString(
ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
final String amCommand =
BootstrapTools.getStartCommand(commandTemplate, startCommandValues);
amContainer.setCommands(Collections.singletonList(amCommand));
LOG.debug("Application Master start command: " + amCommand);
return amContainer;
}
private static YarnConfigOptions.UserJarInclusion getUserJarInclusionMode(
org.apache.flink.configuration.Configuration config) {
return config.getEnum(
YarnConfigOptions.UserJarInclusion.class,
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
}
private static boolean isUsrLibDirIncludedInShipFiles(List<File> shipFiles) {
return shipFiles.stream()
.filter(File::isDirectory)
.map(File::getName)
.noneMatch(name -> name.equals(DEFAULT_FLINK_USR_LIB_DIR));
}
private void setClusterEntrypointInfoToConfig(final ApplicationReport report) {
checkNotNull(report);
final ApplicationId appId = report.getApplicationId();
final String host = report.getHost();
final int port = report.getRpcPort();
LOG.info("Found Web Interface {}:{} of application '{}'.", host, port, appId);
flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
flinkConfiguration.setString(RestOptions.ADDRESS, host);
flinkConfiguration.setInteger(RestOptions.PORT, port);
flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(appId));
setHAClusterIdIfNotSet(flinkConfiguration, appId);
}
private void setHAClusterIdIfNotSet(Configuration configuration, ApplicationId appId) {
// set cluster-id to app id if not specified
if (!configuration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
configuration.set(
HighAvailabilityOptions.HA_CLUSTER_ID, ConverterUtils.toString(appId));
}
}
public static void logDetachedClusterInformation(
ApplicationId yarnApplicationId, Logger logger) {
logger.info(
"The Flink YARN session cluster has been started in detached mode. In order to "
+ "stop Flink gracefully, use the following command:\n"
+ "$ echo \"stop\" | ./bin/yarn-session.sh -id {}\n"
+ "If this should not be possible, then you can also kill Flink via YARN's web interface or via:\n"
+ "$ yarn application -kill {}\n"
+ "Note that killing Flink might not clean up all job artifacts and temporary files.",
yarnApplicationId,
yarnApplicationId);
}
}
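
For orientation before the service registrations below: the following is a minimal, illustrative sketch of how a per-job gateway could drive the descriptor above end to end, written against the stock Flink 1.12 and Hadoop YARN client APIs rather than this commit's actual YarnPerJobGateway. The class name PerJobSubmitSketch, the memory and slot numbers, and the assumption that a JobGraph has already been built from the user's SQL elsewhere are all placeholders; the config and lib paths simply mirror the ones used in the test change further down. Error handling and environment details (for example, making the flink-dist jar resolvable) are omitted.

import java.util.Collections;

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class PerJobSubmitSketch {

    public static ApplicationId submit(JobGraph jobGraph) throws Exception {
        // Load flink-conf.yaml from the same directory the test below points at (illustrative path).
        Configuration flinkConfig =
                GlobalConfiguration.loadConfiguration("/opt/src/flink-1.12.2_pj/conf");
        // Reuse a pre-staged lib directory on HDFS instead of uploading flink/lib on every submit.
        // Note: the flink-dist jar must still be resolvable (e.g. via FLINK_HOME/lib).
        flinkConfig.set(
                YarnConfigOptions.PROVIDED_LIB_DIRS,
                Collections.singletonList("hdfs:///flink12/lib/flinklib"));

        // yarn-site.xml is picked up from the Hadoop configuration on the classpath.
        YarnConfiguration yarnConfig = new YarnConfiguration();
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(yarnConfig);
        yarnClient.start();

        try (YarnClusterDescriptor descriptor =
                new YarnClusterDescriptor(
                        flinkConfig,
                        yarnConfig,
                        yarnClient,
                        YarnClientYarnClusterInformationRetriever.create(yarnClient),
                        false)) { // false: the descriptor owns and stops the YarnClient on close()
            ClusterSpecification spec =
                    new ClusterSpecification.ClusterSpecificationBuilder()
                            .setMasterMemoryMB(1024)
                            .setTaskManagerMemoryMB(1024)
                            .setSlotsPerTaskManager(1)
                            .createClusterSpecification();
            // deployJobCluster() drives startAppMaster() above: it uploads the job graph,
            // flink-conf.yaml and ship files, then submits the ApplicationMaster to YARN.
            ClusterClientProvider<ApplicationId> provider =
                    descriptor.deployJobCluster(spec, jobGraph, true);
            return provider.getClusterClient().getClusterId();
        }
    }
}

Passing detached = true to deployJobCluster() returns once the ApplicationMaster is RUNNING; the ClusterClientProvider can then be used to fetch the YARN ApplicationId or a RestClusterClient for further monitoring.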
com.dlink.gateway.yarn.YarnApplicationGateway
\ No newline at end of file
com.dlink.gateway.yarn.YarnApplicationGateway
com.dlink.gateway.yarn.YarnPerJobGateway
\ No newline at end of file
......@@ -15,7 +15,7 @@ public class GatewayTest {
GatewayConfig config = new GatewayConfig();
config.setJobName("apptest");
config.setType(GatewayType.get("yarn-application"));
config.setConfigDir("/opt/src/flink-1.12.2_pj/conf");
config.setFlinkConfigPath("/opt/src/flink-1.12.2_pj/conf");
config.setUserJarPath("hdfs:///flink12/jar/currencyAppJar.jar");
config.setUserJarParas("--id 2410,2412,2411".split("\\s+"));
config.setUserJarMainAppClass("com.app.MainApp");
......