diff --git a/dss-apps/dss-datapipe-server/pom.xml b/dss-apps/dss-datapipe-server/pom.xml
new file mode 100644
index 0000000000..30f2dd9e5c
--- /dev/null
+++ b/dss-apps/dss-datapipe-server/pom.xml
@@ -0,0 +1,285 @@
+
+
+
+
+ 4.0.0
+
+ dss
+ com.webank.wedatasphere.dss
+ 1.0.0
+ ../../pom.xml
+
+
+ dss-datapipe-server
+
+
+ UTF-8
+ 2.16
+
+
+
+
+
+ com.webank.wedatasphere.dss
+ dss-common
+ ${dss.version}
+ provided
+
+
+ com.webank.wedatasphere.linkis
+ linkis-module
+ ${linkis.version}
+ provided
+
+
+ asm
+ org.ow2.asm
+
+
+ hk2-api
+ org.glassfish.hk2
+
+
+ jersey-common
+ org.glassfish.jersey.core
+
+
+ linkis-common
+ com.webank.wedatasphere.linkis
+
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.70
+
+
+
+ org.glassfish.jersey.ext
+ jersey-bean-validation
+ ${jersey.version}
+ provided
+
+
+ javax.ws.rs-api
+ javax.ws.rs
+
+
+ hk2-locator
+ org.glassfish.hk2
+
+
+ hk2-api
+ org.glassfish.hk2
+
+
+ hibernate-validator
+ org.hibernate
+
+
+ jersey-server
+ org.glassfish.jersey.core
+
+
+
+
+
+ com.webank.wedatasphere.linkis
+ linkis-bml-client
+ ${linkis.version}
+
+
+ commons-beanutils
+ commons-beanutils
+
+
+ linkis-common
+ com.webank.wedatasphere.linkis
+
+
+ json4s-jackson_2.11
+ org.json4s
+
+
+
+
+ com.webank.wedatasphere.linkis
+ linkis-computation-client
+ ${linkis.version}
+
+
+ commons-beanutils
+ commons-beanutils
+
+
+ linkis-common
+ com.webank.wedatasphere.linkis
+
+
+
+
+ org.postgresql
+ postgresql
+ 42.2.12
+
+
+ com.webank.wedatasphere.linkis
+ linkis-mybatis
+ ${linkis.version}
+ provided
+
+
+ com.webank.wedatasphere.linkis
+ linkis-storage
+ ${linkis.version}
+ provided
+
+
+ linkis-common
+ com.webank.wedatasphere.linkis
+
+
+
+
+ com.webank.wedatasphere.linkis
+ linkis-common
+ ${linkis.version}
+ provided
+
+
+ com.webank.wedatasphere.linkis
+ linkis-rpc
+ ${linkis.version}
+ provided
+
+
+ archaius-core
+ com.netflix.archaius
+
+
+ slf4j-api
+ org.slf4j
+
+
+ spring-cloud-starter
+ org.springframework.cloud
+
+
+ spring-web
+ org.springframework
+
+
+
+
+ hk2-api
+ org.glassfish.hk2
+ 2.4.0-b34
+
+
+ org.modelmapper
+ modelmapper
+ 0.7.5
+
+
+
+ io.jsonwebtoken
+ jjwt
+ 0.6.0
+
+
+ jackson-databind
+ com.fasterxml.jackson.core
+
+
+
+
+
+ junit
+ junit
+ 4.12
+ test
+
+
+ org.junit.platform
+ junit-platform-launcher
+ 1.5.2
+ test
+
+
+ xstream
+ com.thoughtworks.xstream
+ 1.4.11.1
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ 2.3
+ false
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+ src/main/assembly/distribution.xml
+
+
+
+
+
+ false
+ out
+ false
+ false
+
+ src/main/assembly/distribution.xml
+
+
+
+
+
+
+ ${basedir}/src/main/resources
+
+
+ ${project.artifactId}-${project.version}
+
+
\ No newline at end of file
diff --git a/dss-apps/dss-datapipe-server/src/main/assembly/distribution.xml b/dss-apps/dss-datapipe-server/src/main/assembly/distribution.xml
new file mode 100644
index 0000000000..d6c12b0fb3
--- /dev/null
+++ b/dss-apps/dss-datapipe-server/src/main/assembly/distribution.xml
@@ -0,0 +1,44 @@
+
+
+
+ dss-apiService-server
+
+ dir
+
+ true
+ dss-datapipe-server
+
+
+
+
+
+ lib
+ true
+ true
+ false
+ true
+ true
+
+
+
+
+
+
+
diff --git a/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/config/DataPipeServiceConfiguration.java b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/config/DataPipeServiceConfiguration.java
new file mode 100644
index 0000000000..16185ebf48
--- /dev/null
+++ b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/config/DataPipeServiceConfiguration.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2019 WeBank
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.webank.wedatasphere.dss.datapipe.config;
+
+import com.webank.wedatasphere.linkis.common.conf.CommonVars;
+
+
+public class DataPipeServiceConfiguration {
+ public final static CommonVars LINKIS_AUTHOR_USER_TOKEN = CommonVars.apply("wds.linkis.client.api.service.author.user.token","WS-AUTH");
+ public final static CommonVars LINKIS_ADMIN_USER = CommonVars.apply("wds.linkis.client.api.service.adminuser","ws");
+
+ public final static CommonVars LINKIS_CONNECTION_TIMEOUT = CommonVars.apply("wds.linkis.flow.connection.timeout",30000);
+ public final static CommonVars LINKIS_API_VERSION = CommonVars.apply("wds.linkis.server.version","v1");
+
+
+}
diff --git a/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/dao/DataPipeDao.java b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/dao/DataPipeDao.java
new file mode 100644
index 0000000000..f1c065e996
--- /dev/null
+++ b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/dao/DataPipeDao.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2019 WeBank
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.webank.wedatasphere.dss.datapipe.dao;
+
+
+
+public interface DataPipeDao {
+
+
+}
diff --git a/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/dao/mapper/DataPipeMapper.xml b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/dao/mapper/DataPipeMapper.xml
new file mode 100644
index 0000000000..a2c2837e2b
--- /dev/null
+++ b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/dao/mapper/DataPipeMapper.xml
@@ -0,0 +1,20 @@
+
+
+
+
+
+
diff --git a/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/exception/DataPipeExecuteException.java b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/exception/DataPipeExecuteException.java
new file mode 100644
index 0000000000..ea8445393c
--- /dev/null
+++ b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/exception/DataPipeExecuteException.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2019 WeBank
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.webank.wedatasphere.dss.datapipe.exception;
+
+import com.webank.wedatasphere.linkis.common.exception.ErrorException;
+
+
+public class DataPipeExecuteException extends ErrorException {
+
+ public DataPipeExecuteException(int errCode, String desc) {
+ super(errCode, desc);
+ }
+
+ public DataPipeExecuteException(int errCode, String desc, String ip, int port, String serviceKind) {
+ super(errCode, desc, ip, port, serviceKind);
+ }
+ }
diff --git a/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/execute/LinkisJobSubmit.java b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/execute/LinkisJobSubmit.java
new file mode 100644
index 0000000000..eb3dbb5d2d
--- /dev/null
+++ b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/execute/LinkisJobSubmit.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2019 WeBank
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.webank.wedatasphere.dss.datapipe.execute;
+
+import com.webank.wedatasphere.dss.datapipe.config.DataPipeServiceConfiguration;
+import com.webank.wedatasphere.linkis.common.conf.Configuration;
+import com.webank.wedatasphere.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy;
+import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfig;
+import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder;
+import com.webank.wedatasphere.linkis.server.socket.controller.ServerEvent;
+import com.webank.wedatasphere.linkis.ujes.client.UJESClient;
+import com.webank.wedatasphere.linkis.ujes.client.UJESClientImpl;
+import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction;
+import com.webank.wedatasphere.linkis.ujes.client.response.JobExecuteResult;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+public class LinkisJobSubmit {
+
+ public static UJESClient getClient(Map props) {
+ UJESClient client = getUJESClient(
+ Configuration.GATEWAY_URL().getValue(props),
+ DataPipeServiceConfiguration.LINKIS_ADMIN_USER.getValue(props),
+ DataPipeServiceConfiguration.LINKIS_AUTHOR_USER_TOKEN.getValue(props),
+ props);
+
+ return client;
+ }
+
+ public static UJESClient getUJESClient(String url, String user, String token, Map jobProps){
+ return new UJESClientImpl(getClientConfig(url,user,token, jobProps));
+ }
+
+ public static DWSClientConfig getClientConfig(String url, String user, String token, Map jobProps){
+ DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder()
+ .addServerUrl(url)
+ .connectionTimeout(DataPipeServiceConfiguration.LINKIS_CONNECTION_TIMEOUT.getValue(jobProps))
+ .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES)
+ .loadbalancerEnabled(true)
+ .maxConnectionSize(5)
+ .retryEnabled(false).readTimeout(DataPipeServiceConfiguration.LINKIS_CONNECTION_TIMEOUT.getValue(jobProps))
+ .setAuthenticationStrategy(new TokenAuthenticationStrategy())
+ .setAuthTokenKey(user).setAuthTokenValue(token)))
+ .setDWSVersion(DataPipeServiceConfiguration.LINKIS_API_VERSION.getValue(jobProps)).build();
+ return clientConfig;
+ }
+
+ /**
+ * @param operation
+ * @param client
+ * @return
+ */
+ public static JobExecuteResult execute(ServerEvent operation, UJESClient client) {
+ Map dataMap = operation.getData();
+ String runType = (String)dataMap.get("runType");
+ String background = (String)dataMap.get("background");
+ String executionCode = (String)dataMap.get("executionCode");
+ String scriptPath = "default.scala";
+ Map sourceMap = (Map)dataMap.get("source");
+ scriptPath = sourceMap!=null && sourceMap.containsKey("scriptPath") ? sourceMap.get("scriptPath") : scriptPath;
+
+ String executeApplicationName = (String)dataMap.get("executeApplicationName");
+ String umUser = (String)dataMap.get("umUser");
+ Map paramsMap = (Map)dataMap.get("params");
+
+ Map source = new HashMap<>();
+ source.put("DSS-DataPipe",scriptPath);
+ JobExecuteAction.Builder builder = JobExecuteAction.builder().setCreator("IDE")
+ .addExecuteCode(executionCode)
+ .setEngineTypeStr(executeApplicationName)
+ .setRunTypeStr(runType)
+ .setUser(umUser)
+ .setParams(paramsMap)
+ .setVariableMap(new HashMap<>())
+ .setSource(source);
+ JobExecuteAction jobAction = builder.build();
+ JobExecuteResult res = client.execute(jobAction);
+ return res;
+ }
+
+}
diff --git a/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/restful/DSSEntranceRestfulApi.java b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/restful/DSSEntranceRestfulApi.java
new file mode 100644
index 0000000000..bf5cb026b0
--- /dev/null
+++ b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/restful/DSSEntranceRestfulApi.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2019 WeBank
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.webank.wedatasphere.dss.datapipe.restful;
+
+import com.google.gson.Gson;
+import com.google.gson.internal.LinkedTreeMap;
+import com.webank.wedatasphere.dss.datapipe.entrance.background.BackGroundService;
+import com.webank.wedatasphere.dss.datapipe.entrance.background.ExportBackGroundService;
+import com.webank.wedatasphere.dss.datapipe.entrance.background.LoadBackGroundService;
+import com.webank.wedatasphere.dss.datapipe.execute.LinkisJobSubmit;
+import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant;
+import com.webank.wedatasphere.linkis.server.Message;
+import com.webank.wedatasphere.linkis.server.security.SecurityFilter;
+import com.webank.wedatasphere.linkis.server.socket.controller.ServerEvent;
+import com.webank.wedatasphere.linkis.ujes.client.UJESClient;
+import com.webank.wedatasphere.linkis.ujes.client.response.JobExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Path("/dss/datapipe")
+@Component
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class DSSEntranceRestfulApi {
+ private static final Logger logger = LoggerFactory.getLogger(DSSEntranceRestfulApi.class);
+
+ @POST
+ @Path("/backgroundservice")
+ public Response backgroundservice(@Context HttpServletRequest req, Map json) {
+ Message message = null;
+ logger.info("Begin to get an execID");
+ String backgroundType = (String) json.get("background");
+ BackGroundService bgService = null;
+ if("export".equals(backgroundType)){
+ bgService = new ExportBackGroundService();
+ }else if("load".equals(backgroundType)){
+ bgService = new LoadBackGroundService();
+ }else{
+ message = Message.error("export type is not exist:"+backgroundType);
+ return Message.messageToResponse(message);
+ }
+ Gson gson = new Gson();
+ Map executionCode = (Map) json.get("executionCode");
+ executionCode = gson.fromJson(gson.toJson(executionCode),LinkedTreeMap.class);
+ json.put("executionCode",executionCode);
+ json.put(TaskConstant.UMUSER, SecurityFilter.getLoginUsername(req));
+ ServerEvent serverEvent = new ServerEvent();
+ serverEvent.setData(json);
+ serverEvent.setUser(SecurityFilter.getLoginUsername(req));
+ ServerEvent operation = bgService.operation(serverEvent);
+
+
+ JobExecuteResult jobExecuteResult = toLinkisEntrance(operation);
+ message = Message.ok();
+ message.setMethod("/api/dss/datapipe/backgroundservice");
+ message.data("operation",operation);
+ message.data("execID", jobExecuteResult.getExecID());
+ message.data("taskID", jobExecuteResult.getTaskID());
+ logger.info("End to get an an execID: {}, taskID: {}", jobExecuteResult.getExecID(), jobExecuteResult.getTaskID());
+ return Message.messageToResponse(message);
+
+ }
+
+ public JobExecuteResult toLinkisEntrance(ServerEvent operation){
+ JobExecuteResult jobExecuteResult = null;
+ try{
+ Map props = new HashMap<>();
+ UJESClient client = LinkisJobSubmit.getClient(props);
+ jobExecuteResult = LinkisJobSubmit.execute(operation,client);
+ }catch (Exception e){
+ logger.error("toLinkisEntranceError-",e);
+ throw e;
+ }
+ return jobExecuteResult;
+
+ }
+}
diff --git a/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/DSSDatapipeServerApplication.scala b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/DSSDatapipeServerApplication.scala
new file mode 100644
index 0000000000..56b0553edc
--- /dev/null
+++ b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/DSSDatapipeServerApplication.scala
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2019 WeBank
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.webank.wedatasphere.dss.datapipe
+
+import com.webank.wedatasphere.dss.common.utils.DSSMainHelper
+import com.webank.wedatasphere.linkis.DataWorkCloudApplication
+import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
+
+
+object DSSDatapipeServerApplication extends Logging {
+
+ val userName: String = System.getProperty("user.name")
+ val hostName: String = Utils.getComputerName
+
+ def main(args: Array[String]): Unit = {
+ val serviceName = System.getProperty("serviceName")//ProjectConf.SERVICE_NAME.getValue
+ DSSMainHelper.formatPropertyFiles(serviceName)
+ val allArgs = args ++ DSSMainHelper.getExtraSpringOptions
+ System.setProperty("hostName", hostName)
+ System.setProperty("userName", userName)
+ info(s"Ready to start $serviceName with args: ${allArgs.toList}.")
+ println(s"Test Ready to start $serviceName with args: ${allArgs.toList}.")
+ DataWorkCloudApplication.main(allArgs)
+ }
+}
\ No newline at end of file
diff --git a/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/AbstractBackGroundService.scala b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/AbstractBackGroundService.scala
new file mode 100644
index 0000000000..3e0b97dac1
--- /dev/null
+++ b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/AbstractBackGroundService.scala
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2019 WeBank
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.webank.wedatasphere.dss.datapipe.entrance.background
+
+
+abstract class AbstractBackGroundService extends BackGroundService{
+
+}
diff --git a/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/BackGroundService.scala b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/BackGroundService.scala
new file mode 100644
index 0000000000..01af25e8b7
--- /dev/null
+++ b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/BackGroundService.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2019 WeBank
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.webank.wedatasphere.dss.datapipe.entrance.background
+
+import com.webank.wedatasphere.linkis.server.socket.controller.ServerEvent
+
+
+trait BackGroundService {
+ val serviceType:String
+ def operation(serverEvent: ServerEvent):ServerEvent
+}
diff --git a/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/BackGroundServiceUtils.scala b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/BackGroundServiceUtils.scala
new file mode 100644
index 0000000000..6fa27fd9cf
--- /dev/null
+++ b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/BackGroundServiceUtils.scala
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2019 WeBank
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.webank.wedatasphere.dss.datapipe.entrance.background
+
+import java.io.{InputStream, OutputStream}
+import java.lang
+import java.lang.reflect.Type
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.gson.{GsonBuilder, JsonElement, JsonPrimitive, JsonSerializationContext, JsonSerializer}
+import com.webank.wedatasphere.linkis.common.conf.CommonVars
+import com.webank.wedatasphere.linkis.common.io.FsPath
+import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
+import com.webank.wedatasphere.linkis.storage.FSFactory
+import com.webank.wedatasphere.linkis.storage.fs.FileSystem
+import com.webank.wedatasphere.linkis.storage.utils.FileSystemUtils
+import org.apache.commons.io.IOUtils
+import org.apache.commons.lang.time.DateFormatUtils
+
+object BackGroundServiceUtils extends Logging {
+
+ private val CODE_STORE_PREFIX = CommonVars("bdp.dataworkcloud.bgservice.store.prefix", "hdfs:///tmp/bdp-ide/")
+ private val CODE_STORE_SUFFIX = CommonVars("bdp.dataworkcloud.bgservice.store.suffix", "")
+ private val CHARSET = "utf-8"
+ private val CODE_SPLIT = ";"
+ private val LENGTH_SPLIT = "#"
+
+ def storeExecutionCode(destination: String,user:String): String = {
+ if (destination.length < 60000) return null
+ val path: String = getCodeStorePath(user)
+ val fsPath: FsPath = new FsPath(path)
+ val fileSystem = FSFactory.getFsByProxyUser(fsPath, user).asInstanceOf[FileSystem]
+ fileSystem.init(null)
+ var os: OutputStream = null
+ var position = 0L
+ val codeBytes = destination.getBytes(CHARSET)
+ Utils.tryFinally {
+ path.intern() synchronized {
+ if (!fileSystem.exists(fsPath)) FileSystemUtils.createNewFile(fsPath, user, true)
+ os = fileSystem.write(fsPath, false)
+ position = fileSystem.get(path).getLength
+ IOUtils.write(codeBytes, os)
+ }
+ } {
+ if (fileSystem != null) fileSystem.close()
+ IOUtils.closeQuietly(os)
+ }
+ val length = codeBytes.length
+ path + CODE_SPLIT + position + LENGTH_SPLIT + length
+ }
+
+ def exchangeExecutionCode(codePath: String): Unit = {
+ import scala.util.control.Breaks._
+ val path = codePath.substring(0, codePath.lastIndexOf(CODE_SPLIT))
+ val codeInfo = codePath.substring(codePath.lastIndexOf(CODE_SPLIT) + 1)
+ val infos: Array[String] = codeInfo.split(LENGTH_SPLIT)
+ val position = infos(0).toInt
+ var lengthLeft = infos(1).toInt
+ val tub = new Array[Byte](1024)
+ val executionCode: StringBuilder = new StringBuilder
+ val fsPath: FsPath = new FsPath(path)
+ val fileSystem = FSFactory.getFsByProxyUser(fsPath, System.getProperty("user.name")).asInstanceOf[FileSystem]
+ fileSystem.init(null)
+ var is: InputStream = null
+ if(!fileSystem.exists(fsPath)) return
+ Utils.tryFinally {
+ is = fileSystem.read(fsPath)
+ if (position > 0) is.skip(position)
+ breakable {
+ while (lengthLeft > 0) {
+ val readed = is.read(tub)
+ val useful = Math.min(readed, lengthLeft)
+ if (useful < 0) break()
+ lengthLeft -= useful
+ executionCode.append(new String(tub, 0, useful, CHARSET))
+ }
+ }
+ } {
+ if (fileSystem != null) fileSystem.close()
+ IOUtils.closeQuietly(is)
+ }
+ executionCode.toString()
+ }
+
+ private def getCodeStorePath(user: String): String = {
+ val date: String = DateFormatUtils.format(new Date, "yyyyMMdd")
+ s"${CODE_STORE_PREFIX.getValue}${user}${CODE_STORE_SUFFIX.getValue}/executionCode/${date}/_bgservice"
+ }
+
+ implicit val gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").serializeNulls
+ .registerTypeAdapter(classOf[java.lang.Double], new JsonSerializer[java.lang.Double] {
+ override def serialize(t: lang.Double, `type`: Type, jsonSerializationContext: JsonSerializationContext): JsonElement =
+ if(t == t.longValue()) new JsonPrimitive(t.longValue()) else new JsonPrimitive(t)
+ }).create
+
+ implicit val jacksonJson = new ObjectMapper().setDateFormat(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ"))
+}
diff --git a/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/ExportBackGroundService.scala b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/ExportBackGroundService.scala
new file mode 100644
index 0000000000..a51350067d
--- /dev/null
+++ b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/ExportBackGroundService.scala
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2019 WeBank
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.webank.wedatasphere.dss.datapipe.entrance.background
+
+import java.util
+
+import com.google.gson.internal.LinkedTreeMap
+import com.google.gson.{JsonObject, JsonParser}
+import com.webank.wedatasphere.linkis.common.utils.Logging
+import com.webank.wedatasphere.linkis.server._
+import com.webank.wedatasphere.linkis.server.socket.controller.ServerEvent
+
+
+class ExportBackGroundService extends AbstractBackGroundService with Logging{
+ override val serviceType: String = "export"
+
+ override def operation(serverEvent: ServerEvent): ServerEvent = {
+ val params = serverEvent.getData.map { case (k, v) => k -> v.asInstanceOf[Any] }
+ //val executionCode = params.get("executionCode").get
+ val ec = params.get("executionCode").get.asInstanceOf[LinkedTreeMap[String,LinkedTreeMap[String,String]]]
+ if(ec.get("destination")!=null && ec.get("destination").get("fieldDelimiter") != null){
+ info(s"---${ec.get("destination").get("fieldDelimiter")}---")
+ ec.get("destination").get("fieldDelimiter") match {
+ case "\\t" =>ec.get("destination").put("fieldDelimiter","\t")
+ case _ =>info("---other fieldDelimiter---")
+ }
+ }
+ val executionCode = BackGroundServiceUtils.gson.toJson(params.get("executionCode").get)
+ // TODO: Head may be removed
+ var newExecutionCode = ""
+ val jsonParser = new JsonParser()
+ val jsonCode = jsonParser.parse(executionCode.asInstanceOf[String]).asInstanceOf[JsonObject]
+ val destination = "val destination = \"\"\"" + jsonCode.get("destination").toString + "\"\"\"\n"
+ val dataInfo = jsonCode.get("dataInfo").toString
+ var newDataInfo = "val dataInfo = \"\"\""
+ val storePath = BackGroundServiceUtils.storeExecutionCode(dataInfo,serverEvent.getUser)
+ if(storePath == null) newDataInfo += dataInfo + "\"\"\"\n" else newDataInfo += storePath + "\"\"\"\n"
+ newExecutionCode += destination
+ newExecutionCode += newDataInfo
+ if(storePath == null)
+ newExecutionCode += "com.webank.wedatasphere.linkis.engineplugin.spark.imexport.ExportData.exportData(spark,dataInfo,destination)"
+ else
+ newExecutionCode += "com.webank.wedatasphere.linkis.engineplugin.spark.imexport.ExportData.exportDataByFile(spark,dataInfo,destination)"
+ params.put("executionCode", newExecutionCode)
+ print(newExecutionCode)
+ val map = new util.HashMap[String, Object]()
+ params.foreach(f => map.put(f._1, f._2.asInstanceOf[Object]))
+ serverEvent.setData(map)
+ serverEvent
+ }
+
+ def splitDataInfo(dataInfo:String):util.ArrayList[String] = {
+ val length = 6000
+ val list = new util.ArrayList[String]()
+ var size = dataInfo.length /length
+ if(dataInfo.length % length != 0) size += 1
+ for(i <- 0 to size-1){
+ list.add(subString(dataInfo,i * length,(i +1) * length))
+ }
+ list
+ }
+
+ private def subString(str:String,begin:Int,end:Int):String = {
+ if(begin > str.length) return null
+ if(end > str.length) return str.substring(begin,str.length)
+ str.substring(begin,end)
+ }
+
+
+}
+
+object A{
+ def main(args: Array[String]): Unit = {
+ val builder: StringBuilder = new StringBuilder
+ for(i <- 1 to 200){
+ if(i == 1) builder.append(2) else builder.append(1)
+
+ }
+ var newDataInfo = "val dataInfo = \"\"\""
+ val service: ExportBackGroundService = new ExportBackGroundService()
+ val splitDataInfos: util.ArrayList[String] =service.splitDataInfo(builder.toString())
+ val splitString = "\"\"\"" + "+" + "\"\"\""
+ val compaction = splitDataInfos.foldLeft("")((l,r) => l + splitString + r )
+ newDataInfo += compaction.substring(splitString.length,compaction.length) + "\"\"\"\n"
+ print(newDataInfo)
+ }
+}
diff --git a/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/LoadBackGroundService.scala b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/LoadBackGroundService.scala
new file mode 100644
index 0000000000..fa21113557
--- /dev/null
+++ b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/LoadBackGroundService.scala
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2019 WeBank
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.webank.wedatasphere.dss.datapipe.entrance.background
+
+import java.util
+
+import com.google.gson.{JsonObject, JsonParser}
+import com.webank.wedatasphere.linkis.server._
+import com.webank.wedatasphere.linkis.server.socket.controller.ServerEvent
+
+
+class LoadBackGroundService extends AbstractBackGroundService {
+
+ override val serviceType: String = "load"
+
+ override def operation(serverEvent: ServerEvent): ServerEvent = {
+ val params = serverEvent.getData.map { case (k, v) => k -> v.asInstanceOf[Any] }
+ val executionCode = BackGroundServiceUtils.gson.toJson(params.get("executionCode").get)
+ // TODO: Head may be removed
+ var newExecutionCode = ""
+ val jsonParser = new JsonParser()
+ val jsonCode = jsonParser.parse(executionCode.asInstanceOf[String]).asInstanceOf[JsonObject]
+ val source = "val source = \"\"\"" + jsonCode.get("source").toString + "\"\"\"\n"
+ val destination = jsonCode.get("destination").toString
+ var newDestination = "val destination = \"\"\""
+ val storePath = BackGroundServiceUtils.storeExecutionCode(destination,serverEvent.getUser)
+ if(storePath == null) newDestination +=destination +"\"\"\"\n" else newDestination +=storePath + "\"\"\"\n"
+ newExecutionCode += source
+ newExecutionCode += newDestination
+ if(storePath == null){
+ newExecutionCode += "com.webank.wedatasphere.linkis.engineplugin.spark.imexport.LoadData.loadDataToTable(spark,source,destination)"
+ }else{
+ newExecutionCode += "com.webank.wedatasphere.linkis.engineplugin.spark.imexport.LoadData.loadDataToTableByFile(spark,destination,source)"
+ }
+
+ params.put("executionCode", newExecutionCode)
+ print(newExecutionCode)
+ val map = new util.HashMap[String, Object]()
+ params.foreach(f => map.put(f._1, f._2.asInstanceOf[Object]))
+ serverEvent.setData(map)
+ serverEvent
+ }
+
+ def splitDestination(destination:String):util.ArrayList[String] = {
+ val length = 6000
+ val list = new util.ArrayList[String]()
+ var size = destination.length /length
+ if(destination.length % length != 0) size += 1
+ for(i <- 0 to size-1){
+ list.add(subString(destination,i * length,(i +1) * length))
+ }
+ list
+ }
+
+ private def subString(str:String,begin:Int,end:Int):String = {
+ if(begin > str.length) return null
+ if(end > str.length) return str.substring(begin,str.length)
+ str.substring(begin,end)
+ }
+}