diff --git a/dss-apps/dss-datapipe-server/pom.xml b/dss-apps/dss-datapipe-server/pom.xml new file mode 100644 index 0000000000..30f2dd9e5c --- /dev/null +++ b/dss-apps/dss-datapipe-server/pom.xml @@ -0,0 +1,285 @@ + + + + + 4.0.0 + + dss + com.webank.wedatasphere.dss + 1.0.0 + ../../pom.xml + + + dss-datapipe-server + + + UTF-8 + 2.16 + + + + + + com.webank.wedatasphere.dss + dss-common + ${dss.version} + provided + + + com.webank.wedatasphere.linkis + linkis-module + ${linkis.version} + provided + + + asm + org.ow2.asm + + + hk2-api + org.glassfish.hk2 + + + jersey-common + org.glassfish.jersey.core + + + linkis-common + com.webank.wedatasphere.linkis + + + + + com.alibaba + fastjson + 1.2.70 + + + + org.glassfish.jersey.ext + jersey-bean-validation + ${jersey.version} + provided + + + javax.ws.rs-api + javax.ws.rs + + + hk2-locator + org.glassfish.hk2 + + + hk2-api + org.glassfish.hk2 + + + hibernate-validator + org.hibernate + + + jersey-server + org.glassfish.jersey.core + + + + + + com.webank.wedatasphere.linkis + linkis-bml-client + ${linkis.version} + + + commons-beanutils + commons-beanutils + + + linkis-common + com.webank.wedatasphere.linkis + + + json4s-jackson_2.11 + org.json4s + + + + + com.webank.wedatasphere.linkis + linkis-computation-client + ${linkis.version} + + + commons-beanutils + commons-beanutils + + + linkis-common + com.webank.wedatasphere.linkis + + + + + org.postgresql + postgresql + 42.2.12 + + + com.webank.wedatasphere.linkis + linkis-mybatis + ${linkis.version} + provided + + + com.webank.wedatasphere.linkis + linkis-storage + ${linkis.version} + provided + + + linkis-common + com.webank.wedatasphere.linkis + + + + + com.webank.wedatasphere.linkis + linkis-common + ${linkis.version} + provided + + + com.webank.wedatasphere.linkis + linkis-rpc + ${linkis.version} + provided + + + archaius-core + com.netflix.archaius + + + slf4j-api + org.slf4j + + + spring-cloud-starter + org.springframework.cloud + + + spring-web + org.springframework + + + + + hk2-api + org.glassfish.hk2 + 2.4.0-b34 + + + org.modelmapper + modelmapper + 0.7.5 + + + + io.jsonwebtoken + jjwt + 0.6.0 + + + jackson-databind + com.fasterxml.jackson.core + + + + + + junit + junit + 4.12 + test + + + org.junit.platform + junit-platform-launcher + 1.5.2 + test + + + xstream + com.thoughtworks.xstream + 1.4.11.1 + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + + + net.alchim31.maven + scala-maven-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + org.apache.maven.plugins + maven-assembly-plugin + 2.3 + false + + + make-assembly + package + + single + + + + src/main/assembly/distribution.xml + + + + + + false + out + false + false + + src/main/assembly/distribution.xml + + + + + + + ${basedir}/src/main/resources + + + ${project.artifactId}-${project.version} + + \ No newline at end of file diff --git a/dss-apps/dss-datapipe-server/src/main/assembly/distribution.xml b/dss-apps/dss-datapipe-server/src/main/assembly/distribution.xml new file mode 100644 index 0000000000..d6c12b0fb3 --- /dev/null +++ b/dss-apps/dss-datapipe-server/src/main/assembly/distribution.xml @@ -0,0 +1,44 @@ + + + + dss-apiService-server + + dir + + true + dss-datapipe-server + + + + + + lib + true + true + false + true + true + + + + + + + diff --git a/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/config/DataPipeServiceConfiguration.java b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/config/DataPipeServiceConfiguration.java new file mode 100644 index 0000000000..16185ebf48 --- /dev/null +++ b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/config/DataPipeServiceConfiguration.java @@ -0,0 +1,30 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.datapipe.config; + +import com.webank.wedatasphere.linkis.common.conf.CommonVars; + + +public class DataPipeServiceConfiguration { + public final static CommonVars LINKIS_AUTHOR_USER_TOKEN = CommonVars.apply("wds.linkis.client.api.service.author.user.token","WS-AUTH"); + public final static CommonVars LINKIS_ADMIN_USER = CommonVars.apply("wds.linkis.client.api.service.adminuser","ws"); + + public final static CommonVars LINKIS_CONNECTION_TIMEOUT = CommonVars.apply("wds.linkis.flow.connection.timeout",30000); + public final static CommonVars LINKIS_API_VERSION = CommonVars.apply("wds.linkis.server.version","v1"); + + +} diff --git a/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/dao/DataPipeDao.java b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/dao/DataPipeDao.java new file mode 100644 index 0000000000..f1c065e996 --- /dev/null +++ b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/dao/DataPipeDao.java @@ -0,0 +1,24 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.datapipe.dao; + + + +public interface DataPipeDao { + + +} diff --git a/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/dao/mapper/DataPipeMapper.xml b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/dao/mapper/DataPipeMapper.xml new file mode 100644 index 0000000000..a2c2837e2b --- /dev/null +++ b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/dao/mapper/DataPipeMapper.xml @@ -0,0 +1,20 @@ + + + + + + diff --git a/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/exception/DataPipeExecuteException.java b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/exception/DataPipeExecuteException.java new file mode 100644 index 0000000000..ea8445393c --- /dev/null +++ b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/exception/DataPipeExecuteException.java @@ -0,0 +1,31 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.datapipe.exception; + +import com.webank.wedatasphere.linkis.common.exception.ErrorException; + + +public class DataPipeExecuteException extends ErrorException { + + public DataPipeExecuteException(int errCode, String desc) { + super(errCode, desc); + } + + public DataPipeExecuteException(int errCode, String desc, String ip, int port, String serviceKind) { + super(errCode, desc, ip, port, serviceKind); + } + } diff --git a/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/execute/LinkisJobSubmit.java b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/execute/LinkisJobSubmit.java new file mode 100644 index 0000000000..eb3dbb5d2d --- /dev/null +++ b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/execute/LinkisJobSubmit.java @@ -0,0 +1,98 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.datapipe.execute; + +import com.webank.wedatasphere.dss.datapipe.config.DataPipeServiceConfiguration; +import com.webank.wedatasphere.linkis.common.conf.Configuration; +import com.webank.wedatasphere.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy; +import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfig; +import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder; +import com.webank.wedatasphere.linkis.server.socket.controller.ServerEvent; +import com.webank.wedatasphere.linkis.ujes.client.UJESClient; +import com.webank.wedatasphere.linkis.ujes.client.UJESClientImpl; +import com.webank.wedatasphere.linkis.ujes.client.request.JobExecuteAction; +import com.webank.wedatasphere.linkis.ujes.client.response.JobExecuteResult; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + + +public class LinkisJobSubmit { + + public static UJESClient getClient(Map props) { + UJESClient client = getUJESClient( + Configuration.GATEWAY_URL().getValue(props), + DataPipeServiceConfiguration.LINKIS_ADMIN_USER.getValue(props), + DataPipeServiceConfiguration.LINKIS_AUTHOR_USER_TOKEN.getValue(props), + props); + + return client; + } + + public static UJESClient getUJESClient(String url, String user, String token, Map jobProps){ + return new UJESClientImpl(getClientConfig(url,user,token, jobProps)); + } + + public static DWSClientConfig getClientConfig(String url, String user, String token, Map jobProps){ + DWSClientConfig clientConfig = ((DWSClientConfigBuilder) (DWSClientConfigBuilder.newBuilder() + .addServerUrl(url) + .connectionTimeout(DataPipeServiceConfiguration.LINKIS_CONNECTION_TIMEOUT.getValue(jobProps)) + .discoveryEnabled(false).discoveryFrequency(1, TimeUnit.MINUTES) + .loadbalancerEnabled(true) + .maxConnectionSize(5) + .retryEnabled(false).readTimeout(DataPipeServiceConfiguration.LINKIS_CONNECTION_TIMEOUT.getValue(jobProps)) + .setAuthenticationStrategy(new TokenAuthenticationStrategy()) + .setAuthTokenKey(user).setAuthTokenValue(token))) + .setDWSVersion(DataPipeServiceConfiguration.LINKIS_API_VERSION.getValue(jobProps)).build(); + return clientConfig; + } + + /** + * @param operation + * @param client + * @return + */ + public static JobExecuteResult execute(ServerEvent operation, UJESClient client) { + Map dataMap = operation.getData(); + String runType = (String)dataMap.get("runType"); + String background = (String)dataMap.get("background"); + String executionCode = (String)dataMap.get("executionCode"); + String scriptPath = "default.scala"; + Map sourceMap = (Map)dataMap.get("source"); + scriptPath = sourceMap!=null && sourceMap.containsKey("scriptPath") ? sourceMap.get("scriptPath") : scriptPath; + + String executeApplicationName = (String)dataMap.get("executeApplicationName"); + String umUser = (String)dataMap.get("umUser"); + Map paramsMap = (Map)dataMap.get("params"); + + Map source = new HashMap<>(); + source.put("DSS-DataPipe",scriptPath); + JobExecuteAction.Builder builder = JobExecuteAction.builder().setCreator("IDE") + .addExecuteCode(executionCode) + .setEngineTypeStr(executeApplicationName) + .setRunTypeStr(runType) + .setUser(umUser) + .setParams(paramsMap) + .setVariableMap(new HashMap<>()) + .setSource(source); + JobExecuteAction jobAction = builder.build(); + JobExecuteResult res = client.execute(jobAction); + return res; + } + +} diff --git a/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/restful/DSSEntranceRestfulApi.java b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/restful/DSSEntranceRestfulApi.java new file mode 100644 index 0000000000..bf5cb026b0 --- /dev/null +++ b/dss-apps/dss-datapipe-server/src/main/java/com/webank/wedatasphere/dss/datapipe/restful/DSSEntranceRestfulApi.java @@ -0,0 +1,104 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.datapipe.restful; + +import com.google.gson.Gson; +import com.google.gson.internal.LinkedTreeMap; +import com.webank.wedatasphere.dss.datapipe.entrance.background.BackGroundService; +import com.webank.wedatasphere.dss.datapipe.entrance.background.ExportBackGroundService; +import com.webank.wedatasphere.dss.datapipe.entrance.background.LoadBackGroundService; +import com.webank.wedatasphere.dss.datapipe.execute.LinkisJobSubmit; +import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant; +import com.webank.wedatasphere.linkis.server.Message; +import com.webank.wedatasphere.linkis.server.security.SecurityFilter; +import com.webank.wedatasphere.linkis.server.socket.controller.ServerEvent; +import com.webank.wedatasphere.linkis.ujes.client.UJESClient; +import com.webank.wedatasphere.linkis.ujes.client.response.JobExecuteResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.util.HashMap; +import java.util.Map; + + +@Path("/dss/datapipe") +@Component +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +public class DSSEntranceRestfulApi { + private static final Logger logger = LoggerFactory.getLogger(DSSEntranceRestfulApi.class); + + @POST + @Path("/backgroundservice") + public Response backgroundservice(@Context HttpServletRequest req, Map json) { + Message message = null; + logger.info("Begin to get an execID"); + String backgroundType = (String) json.get("background"); + BackGroundService bgService = null; + if("export".equals(backgroundType)){ + bgService = new ExportBackGroundService(); + }else if("load".equals(backgroundType)){ + bgService = new LoadBackGroundService(); + }else{ + message = Message.error("export type is not exist:"+backgroundType); + return Message.messageToResponse(message); + } + Gson gson = new Gson(); + Map executionCode = (Map) json.get("executionCode"); + executionCode = gson.fromJson(gson.toJson(executionCode),LinkedTreeMap.class); + json.put("executionCode",executionCode); + json.put(TaskConstant.UMUSER, SecurityFilter.getLoginUsername(req)); + ServerEvent serverEvent = new ServerEvent(); + serverEvent.setData(json); + serverEvent.setUser(SecurityFilter.getLoginUsername(req)); + ServerEvent operation = bgService.operation(serverEvent); + + + JobExecuteResult jobExecuteResult = toLinkisEntrance(operation); + message = Message.ok(); + message.setMethod("/api/dss/datapipe/backgroundservice"); + message.data("operation",operation); + message.data("execID", jobExecuteResult.getExecID()); + message.data("taskID", jobExecuteResult.getTaskID()); + logger.info("End to get an an execID: {}, taskID: {}", jobExecuteResult.getExecID(), jobExecuteResult.getTaskID()); + return Message.messageToResponse(message); + + } + + public JobExecuteResult toLinkisEntrance(ServerEvent operation){ + JobExecuteResult jobExecuteResult = null; + try{ + Map props = new HashMap<>(); + UJESClient client = LinkisJobSubmit.getClient(props); + jobExecuteResult = LinkisJobSubmit.execute(operation,client); + }catch (Exception e){ + logger.error("toLinkisEntranceError-",e); + throw e; + } + return jobExecuteResult; + + } +} diff --git a/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/DSSDatapipeServerApplication.scala b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/DSSDatapipeServerApplication.scala new file mode 100644 index 0000000000..56b0553edc --- /dev/null +++ b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/DSSDatapipeServerApplication.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.datapipe + +import com.webank.wedatasphere.dss.common.utils.DSSMainHelper +import com.webank.wedatasphere.linkis.DataWorkCloudApplication +import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils} + + +object DSSDatapipeServerApplication extends Logging { + + val userName: String = System.getProperty("user.name") + val hostName: String = Utils.getComputerName + + def main(args: Array[String]): Unit = { + val serviceName = System.getProperty("serviceName")//ProjectConf.SERVICE_NAME.getValue + DSSMainHelper.formatPropertyFiles(serviceName) + val allArgs = args ++ DSSMainHelper.getExtraSpringOptions + System.setProperty("hostName", hostName) + System.setProperty("userName", userName) + info(s"Ready to start $serviceName with args: ${allArgs.toList}.") + println(s"Test Ready to start $serviceName with args: ${allArgs.toList}.") + DataWorkCloudApplication.main(allArgs) + } +} \ No newline at end of file diff --git a/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/AbstractBackGroundService.scala b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/AbstractBackGroundService.scala new file mode 100644 index 0000000000..3e0b97dac1 --- /dev/null +++ b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/AbstractBackGroundService.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.datapipe.entrance.background + + +abstract class AbstractBackGroundService extends BackGroundService{ + +} diff --git a/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/BackGroundService.scala b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/BackGroundService.scala new file mode 100644 index 0000000000..01af25e8b7 --- /dev/null +++ b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/BackGroundService.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.datapipe.entrance.background + +import com.webank.wedatasphere.linkis.server.socket.controller.ServerEvent + + +trait BackGroundService { + val serviceType:String + def operation(serverEvent: ServerEvent):ServerEvent +} diff --git a/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/BackGroundServiceUtils.scala b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/BackGroundServiceUtils.scala new file mode 100644 index 0000000000..6fa27fd9cf --- /dev/null +++ b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/BackGroundServiceUtils.scala @@ -0,0 +1,113 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.datapipe.entrance.background + +import java.io.{InputStream, OutputStream} +import java.lang +import java.lang.reflect.Type +import java.text.SimpleDateFormat +import java.util.Date + +import com.fasterxml.jackson.databind.ObjectMapper +import com.google.gson.{GsonBuilder, JsonElement, JsonPrimitive, JsonSerializationContext, JsonSerializer} +import com.webank.wedatasphere.linkis.common.conf.CommonVars +import com.webank.wedatasphere.linkis.common.io.FsPath +import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils} +import com.webank.wedatasphere.linkis.storage.FSFactory +import com.webank.wedatasphere.linkis.storage.fs.FileSystem +import com.webank.wedatasphere.linkis.storage.utils.FileSystemUtils +import org.apache.commons.io.IOUtils +import org.apache.commons.lang.time.DateFormatUtils + +object BackGroundServiceUtils extends Logging { + + private val CODE_STORE_PREFIX = CommonVars("bdp.dataworkcloud.bgservice.store.prefix", "hdfs:///tmp/bdp-ide/") + private val CODE_STORE_SUFFIX = CommonVars("bdp.dataworkcloud.bgservice.store.suffix", "") + private val CHARSET = "utf-8" + private val CODE_SPLIT = ";" + private val LENGTH_SPLIT = "#" + + def storeExecutionCode(destination: String,user:String): String = { + if (destination.length < 60000) return null + val path: String = getCodeStorePath(user) + val fsPath: FsPath = new FsPath(path) + val fileSystem = FSFactory.getFsByProxyUser(fsPath, user).asInstanceOf[FileSystem] + fileSystem.init(null) + var os: OutputStream = null + var position = 0L + val codeBytes = destination.getBytes(CHARSET) + Utils.tryFinally { + path.intern() synchronized { + if (!fileSystem.exists(fsPath)) FileSystemUtils.createNewFile(fsPath, user, true) + os = fileSystem.write(fsPath, false) + position = fileSystem.get(path).getLength + IOUtils.write(codeBytes, os) + } + } { + if (fileSystem != null) fileSystem.close() + IOUtils.closeQuietly(os) + } + val length = codeBytes.length + path + CODE_SPLIT + position + LENGTH_SPLIT + length + } + + def exchangeExecutionCode(codePath: String): Unit = { + import scala.util.control.Breaks._ + val path = codePath.substring(0, codePath.lastIndexOf(CODE_SPLIT)) + val codeInfo = codePath.substring(codePath.lastIndexOf(CODE_SPLIT) + 1) + val infos: Array[String] = codeInfo.split(LENGTH_SPLIT) + val position = infos(0).toInt + var lengthLeft = infos(1).toInt + val tub = new Array[Byte](1024) + val executionCode: StringBuilder = new StringBuilder + val fsPath: FsPath = new FsPath(path) + val fileSystem = FSFactory.getFsByProxyUser(fsPath, System.getProperty("user.name")).asInstanceOf[FileSystem] + fileSystem.init(null) + var is: InputStream = null + if(!fileSystem.exists(fsPath)) return + Utils.tryFinally { + is = fileSystem.read(fsPath) + if (position > 0) is.skip(position) + breakable { + while (lengthLeft > 0) { + val readed = is.read(tub) + val useful = Math.min(readed, lengthLeft) + if (useful < 0) break() + lengthLeft -= useful + executionCode.append(new String(tub, 0, useful, CHARSET)) + } + } + } { + if (fileSystem != null) fileSystem.close() + IOUtils.closeQuietly(is) + } + executionCode.toString() + } + + private def getCodeStorePath(user: String): String = { + val date: String = DateFormatUtils.format(new Date, "yyyyMMdd") + s"${CODE_STORE_PREFIX.getValue}${user}${CODE_STORE_SUFFIX.getValue}/executionCode/${date}/_bgservice" + } + + implicit val gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").serializeNulls + .registerTypeAdapter(classOf[java.lang.Double], new JsonSerializer[java.lang.Double] { + override def serialize(t: lang.Double, `type`: Type, jsonSerializationContext: JsonSerializationContext): JsonElement = + if(t == t.longValue()) new JsonPrimitive(t.longValue()) else new JsonPrimitive(t) + }).create + + implicit val jacksonJson = new ObjectMapper().setDateFormat(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")) +} diff --git a/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/ExportBackGroundService.scala b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/ExportBackGroundService.scala new file mode 100644 index 0000000000..a51350067d --- /dev/null +++ b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/ExportBackGroundService.scala @@ -0,0 +1,101 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.datapipe.entrance.background + +import java.util + +import com.google.gson.internal.LinkedTreeMap +import com.google.gson.{JsonObject, JsonParser} +import com.webank.wedatasphere.linkis.common.utils.Logging +import com.webank.wedatasphere.linkis.server._ +import com.webank.wedatasphere.linkis.server.socket.controller.ServerEvent + + +class ExportBackGroundService extends AbstractBackGroundService with Logging{ + override val serviceType: String = "export" + + override def operation(serverEvent: ServerEvent): ServerEvent = { + val params = serverEvent.getData.map { case (k, v) => k -> v.asInstanceOf[Any] } + //val executionCode = params.get("executionCode").get + val ec = params.get("executionCode").get.asInstanceOf[LinkedTreeMap[String,LinkedTreeMap[String,String]]] + if(ec.get("destination")!=null && ec.get("destination").get("fieldDelimiter") != null){ + info(s"---${ec.get("destination").get("fieldDelimiter")}---") + ec.get("destination").get("fieldDelimiter") match { + case "\\t" =>ec.get("destination").put("fieldDelimiter","\t") + case _ =>info("---other fieldDelimiter---") + } + } + val executionCode = BackGroundServiceUtils.gson.toJson(params.get("executionCode").get) + // TODO: Head may be removed + var newExecutionCode = "" + val jsonParser = new JsonParser() + val jsonCode = jsonParser.parse(executionCode.asInstanceOf[String]).asInstanceOf[JsonObject] + val destination = "val destination = \"\"\"" + jsonCode.get("destination").toString + "\"\"\"\n" + val dataInfo = jsonCode.get("dataInfo").toString + var newDataInfo = "val dataInfo = \"\"\"" + val storePath = BackGroundServiceUtils.storeExecutionCode(dataInfo,serverEvent.getUser) + if(storePath == null) newDataInfo += dataInfo + "\"\"\"\n" else newDataInfo += storePath + "\"\"\"\n" + newExecutionCode += destination + newExecutionCode += newDataInfo + if(storePath == null) + newExecutionCode += "com.webank.wedatasphere.linkis.engineplugin.spark.imexport.ExportData.exportData(spark,dataInfo,destination)" + else + newExecutionCode += "com.webank.wedatasphere.linkis.engineplugin.spark.imexport.ExportData.exportDataByFile(spark,dataInfo,destination)" + params.put("executionCode", newExecutionCode) + print(newExecutionCode) + val map = new util.HashMap[String, Object]() + params.foreach(f => map.put(f._1, f._2.asInstanceOf[Object])) + serverEvent.setData(map) + serverEvent + } + + def splitDataInfo(dataInfo:String):util.ArrayList[String] = { + val length = 6000 + val list = new util.ArrayList[String]() + var size = dataInfo.length /length + if(dataInfo.length % length != 0) size += 1 + for(i <- 0 to size-1){ + list.add(subString(dataInfo,i * length,(i +1) * length)) + } + list + } + + private def subString(str:String,begin:Int,end:Int):String = { + if(begin > str.length) return null + if(end > str.length) return str.substring(begin,str.length) + str.substring(begin,end) + } + + +} + +object A{ + def main(args: Array[String]): Unit = { + val builder: StringBuilder = new StringBuilder + for(i <- 1 to 200){ + if(i == 1) builder.append(2) else builder.append(1) + + } + var newDataInfo = "val dataInfo = \"\"\"" + val service: ExportBackGroundService = new ExportBackGroundService() + val splitDataInfos: util.ArrayList[String] =service.splitDataInfo(builder.toString()) + val splitString = "\"\"\"" + "+" + "\"\"\"" + val compaction = splitDataInfos.foldLeft("")((l,r) => l + splitString + r ) + newDataInfo += compaction.substring(splitString.length,compaction.length) + "\"\"\"\n" + print(newDataInfo) + } +} diff --git a/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/LoadBackGroundService.scala b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/LoadBackGroundService.scala new file mode 100644 index 0000000000..fa21113557 --- /dev/null +++ b/dss-apps/dss-datapipe-server/src/main/scala/com/webank/wedatasphere/dss/datapipe/entrance/background/LoadBackGroundService.scala @@ -0,0 +1,74 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.datapipe.entrance.background + +import java.util + +import com.google.gson.{JsonObject, JsonParser} +import com.webank.wedatasphere.linkis.server._ +import com.webank.wedatasphere.linkis.server.socket.controller.ServerEvent + + +class LoadBackGroundService extends AbstractBackGroundService { + + override val serviceType: String = "load" + + override def operation(serverEvent: ServerEvent): ServerEvent = { + val params = serverEvent.getData.map { case (k, v) => k -> v.asInstanceOf[Any] } + val executionCode = BackGroundServiceUtils.gson.toJson(params.get("executionCode").get) + // TODO: Head may be removed + var newExecutionCode = "" + val jsonParser = new JsonParser() + val jsonCode = jsonParser.parse(executionCode.asInstanceOf[String]).asInstanceOf[JsonObject] + val source = "val source = \"\"\"" + jsonCode.get("source").toString + "\"\"\"\n" + val destination = jsonCode.get("destination").toString + var newDestination = "val destination = \"\"\"" + val storePath = BackGroundServiceUtils.storeExecutionCode(destination,serverEvent.getUser) + if(storePath == null) newDestination +=destination +"\"\"\"\n" else newDestination +=storePath + "\"\"\"\n" + newExecutionCode += source + newExecutionCode += newDestination + if(storePath == null){ + newExecutionCode += "com.webank.wedatasphere.linkis.engineplugin.spark.imexport.LoadData.loadDataToTable(spark,source,destination)" + }else{ + newExecutionCode += "com.webank.wedatasphere.linkis.engineplugin.spark.imexport.LoadData.loadDataToTableByFile(spark,destination,source)" + } + + params.put("executionCode", newExecutionCode) + print(newExecutionCode) + val map = new util.HashMap[String, Object]() + params.foreach(f => map.put(f._1, f._2.asInstanceOf[Object])) + serverEvent.setData(map) + serverEvent + } + + def splitDestination(destination:String):util.ArrayList[String] = { + val length = 6000 + val list = new util.ArrayList[String]() + var size = destination.length /length + if(destination.length % length != 0) size += 1 + for(i <- 0 to size-1){ + list.add(subString(destination,i * length,(i +1) * length)) + } + list + } + + private def subString(str:String,begin:Int,end:Int):String = { + if(begin > str.length) return null + if(end > str.length) return str.substring(begin,str.length) + str.substring(begin,end) + } +}