diff --git a/tis-aliyun-fs-plugin/pom.xml b/tis-aliyun-fs-plugin/pom.xml index 7efd145bd..7c8c9e5a4 100644 --- a/tis-aliyun-fs-plugin/pom.xml +++ b/tis-aliyun-fs-plugin/pom.xml @@ -29,7 +29,7 @@ com.qlangtech.tis.plugins tis-aliyun-fs-plugin - tpi + tis-aliyun-fs-plugin diff --git a/tis-aliyun-fs-plugin/src/main/java/com/qlangtech/tis/plugin/fs/aliyun/oss/AliyunOSSFileSystem.java b/tis-aliyun-fs-plugin/src/main/java/com/qlangtech/tis/plugin/fs/aliyun/oss/AliyunOSSFileSystem.java index 39eed1be1..d2a9d3355 100644 --- a/tis-aliyun-fs-plugin/src/main/java/com/qlangtech/tis/plugin/fs/aliyun/oss/AliyunOSSFileSystem.java +++ b/tis-aliyun-fs-plugin/src/main/java/com/qlangtech/tis/plugin/fs/aliyun/oss/AliyunOSSFileSystem.java @@ -1,293 +1,296 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.qlangtech.tis.plugin.fs.aliyun.oss; - -import com.aliyun.oss.OSS; -import com.aliyun.oss.OSSClientBuilder; -import com.aliyun.oss.model.*; -import com.qlangtech.tis.config.aliyun.IHttpToken; -import com.qlangtech.tis.fs.*; -import org.apache.commons.lang.StringUtils; - -import java.io.*; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.stream.Collectors; - -/** - * @author 百岁(baisui@qlangtech.com) - * @create: 2020-04-12 20:10 - * @date 2020/04/13 - */ -public class AliyunOSSFileSystem implements ITISFileSystem { - - //private final IHttpToken aliyunToken; - private final OSS client; - private final String bucketName; - private final String rootDir; - - private static final ExecutorService ossPutExecutor = Executors.newCachedThreadPool(); - - public AliyunOSSFileSystem(IHttpToken aliyunToken, String endpoint, String buket, String rootDir) { - this.bucketName = buket; - this.rootDir = rootDir; - client = new OSSClientBuilder().build(endpoint, aliyunToken.getAccessKeyId(), aliyunToken.getAccessKeySecret()); - } - - @Override - public OSS unwrap() { - return this.client; - } - - @Override - public IPath getRootDir() { - return this.getPath(this.rootDir); - } - - @Override - public String getName() { - return "aliyun-oss"; - } - - @Override - public IPath getPath(String path) { - // ObjectMetadata metadata = client.getObjectMetadata(this.bucketName, path); - return new OSSPath(path); - } - - private class OSSPath implements IPath { - //private final ObjectMetadata metadata; - private final String path; - - public OSSPath(String path) { - // this.metadata = metadata; - this.path = path; - } - - @Override - public String getName() { - return this.path; - } - - @Override - public T unwrap(Class iface) { - //return iface.cast(this.metadata); - return iface.cast(this); - } - } - - @Override - public IPath getPath(IPath parent, String name) { - - boolean parentEndWithSlash = StringUtils.endsWith(parent.getName(), "/"); - boolean childStartWithSlash = StringUtils.startsWith(name, "/"); - String filePath = null; - if (parentEndWithSlash && childStartWithSlash) { - filePath = parent.getName() + StringUtils.substring(name, 1); - } else if (!parentEndWithSlash && !childStartWithSlash) { - filePath = parent.getName() + "/" + name; - } else { - filePath = parent.getName() + name; - } - return new OSSPath(filePath); - } - - @Override - public OutputStream getOutputStream(IPath path) { - - try { - PipedOutputStream outputStream = new PipedOutputStream(); - PipedInputStream inputStream = new PipedInputStream(outputStream); - ossPutExecutor.execute(() -> { - this.client.putObject(this.bucketName, path.getName(), inputStream); - }); - return new OSSDataOutputStream(outputStream); - } catch (IOException e) { - throw new RuntimeException(e); - } - // metaData. - - // this.client. - } - - @Override - public FSDataInputStream open(IPath path, int bufferSize) { - OSSObject oObj = this.client.getObject(new GetObjectRequest(bucketName, path.getName())); - return new OSSDataInputStream(new BufferedInputStream(oObj.getObjectContent(), bufferSize)); - } - - private static class OSSDataInputStream extends FSDataInputStream { - public OSSDataInputStream(InputStream in) { - super(in); - } - - @Override - public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void seek(long position) { - throw new UnsupportedOperationException(); - } - } - - @Override - public FSDataInputStream open(IPath path) { - OSSObject oObj = this.client.getObject(new GetObjectRequest(bucketName, path.getName())); - return new OSSDataInputStream((oObj.getObjectContent())); - } - - @Override - public TISFSDataOutputStream create(IPath f, boolean overwrite, int bufferSize) throws IOException { - - PipedOutputStream outputStream = new PipedOutputStream(); - PipedInputStream inputStream = new PipedInputStream(outputStream); - ossPutExecutor.execute(() -> { - this.client.putObject(this.bucketName, f.getName(), inputStream); - }); - return new OSSDataOutputStream(new BufferedOutputStream(outputStream, bufferSize)); - } - - - @Override - public TISFSDataOutputStream create(IPath f, boolean overwrite) throws IOException { - return create(f, overwrite, 2048); - } - - @Override - public boolean exists(IPath path) { - return false; - } - - @Override - public boolean mkdirs(IPath f) throws IOException { - return false; - } - - @Override - public void copyToLocalFile(IPath srcPath, File dstPath) { - } - - @Override - public void rename(IPath from, IPath to) { - } - - @Override - public boolean copyFromLocalFile(File localIncrPath, IPath remoteIncrPath) { - return false; - } - - @Override - public IFileSplitor getSplitor(IPath path) throws Exception { - return null; - } - - @Override - public IContentSummary getContentSummary(IPath path) { - ObjectMetadata meta = this.client.getObjectMetadata(this.bucketName, path.getName()); - return () -> { - return meta.getContentLength(); - }; - } - - @Override - public List listChildren(IPath path) { - ObjectListing objectListing = this.client.listObjects(this.bucketName, path.getName()); - return objectListing.getObjectSummaries().stream().map((summary) -> { - return new OSSPathInfo(path, summary); - }).collect(Collectors.toList()); - } - - - private static class OSSPathInfo implements IPathInfo { - private final OSSObjectSummary meta; - private final IPath path; - - public OSSPathInfo(IPath path, OSSObjectSummary meta) { - this.meta = meta; - this.path = path; - } - - @Override - public String getName() { - return meta.getKey(); - } - - @Override - public IPath getPath() { - return this.path; - } - - @Override - public boolean isDir() { - return false; - } - - @Override - public long getModificationTime() { - return meta.getLastModified().getTime(); - } - - @Override - public long getLength() { - return meta.getSize(); - } - } - - @Override - public List listChildren(IPath path, IPathFilter filter) { - return null; - } - - @Override - public IPathInfo getFileInfo(IPath path) { - return null; - } - - @Override - public boolean delete(IPath f, boolean recursive) throws IOException { - return false; - } - - @Override - public boolean delete(IPath f) throws IOException { - return false; - } - - @Override - public void close() { - } - - private static class OSSDataOutputStream extends TISFSDataOutputStream { - public OSSDataOutputStream(OutputStream out) { - super(out); - } - - @Override - public void write(int b) throws IOException { - this.out.write(b); - } - - @Override - public long getPos() throws IOException { - throw new UnsupportedOperationException(); - } - } -} +///** +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// *

+// * http://www.apache.org/licenses/LICENSE-2.0 +// *

+// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +//package com.qlangtech.tis.plugin.fs.aliyun.oss; +// +//import com.aliyun.oss.OSS; +//import com.aliyun.oss.OSSClientBuilder; +//import com.aliyun.oss.model.*; +//import com.qlangtech.tis.config.aliyun.IHttpToken; +//import com.qlangtech.tis.fs.*; +//import org.apache.commons.lang.StringUtils; +// +//import java.io.*; +//import java.util.List; +//import java.util.concurrent.ExecutorService; +//import java.util.concurrent.Executors; +//import java.util.stream.Collectors; +// +///** +// * @author 百岁(baisui@qlangtech.com) +// * @create: 2020-04-12 20:10 +// * @date 2020/04/13 +// */ +//public class AliyunOSSFileSystem implements ITISFileSystem { +// +// //private final IHttpToken aliyunToken; +// private final OSS client; +// private final String bucketName; +// private final String rootDir; +// +// private static final ExecutorService ossPutExecutor = Executors.newCachedThreadPool(); +// +// public AliyunOSSFileSystem(IHttpToken aliyunToken, String endpoint, String buket, String rootDir) { +// this.bucketName = buket; +// this.rootDir = rootDir; +// +// aliyunToken. +// +// client = new OSSClientBuilder().build(endpoint, aliyunToken.getAccessKeyId(), aliyunToken.getAccessKeySecret()); +// } +// +// @Override +// public OSS unwrap() { +// return this.client; +// } +// +// @Override +// public IPath getRootDir() { +// return this.getPath(this.rootDir); +// } +// +// @Override +// public String getName() { +// return "aliyun-oss"; +// } +// +// @Override +// public IPath getPath(String path) { +// // ObjectMetadata metadata = client.getObjectMetadata(this.bucketName, path); +// return new OSSPath(path); +// } +// +// private class OSSPath implements IPath { +// //private final ObjectMetadata metadata; +// private final String path; +// +// public OSSPath(String path) { +// // this.metadata = metadata; +// this.path = path; +// } +// +// @Override +// public String getName() { +// return this.path; +// } +// +// @Override +// public T unwrap(Class iface) { +// //return iface.cast(this.metadata); +// return iface.cast(this); +// } +// } +// +// @Override +// public IPath getPath(IPath parent, String name) { +// +// boolean parentEndWithSlash = StringUtils.endsWith(parent.getName(), "/"); +// boolean childStartWithSlash = StringUtils.startsWith(name, "/"); +// String filePath = null; +// if (parentEndWithSlash && childStartWithSlash) { +// filePath = parent.getName() + StringUtils.substring(name, 1); +// } else if (!parentEndWithSlash && !childStartWithSlash) { +// filePath = parent.getName() + "/" + name; +// } else { +// filePath = parent.getName() + name; +// } +// return new OSSPath(filePath); +// } +// +// @Override +// public OutputStream getOutputStream(IPath path) { +// +// try { +// PipedOutputStream outputStream = new PipedOutputStream(); +// PipedInputStream inputStream = new PipedInputStream(outputStream); +// ossPutExecutor.execute(() -> { +// this.client.putObject(this.bucketName, path.getName(), inputStream); +// }); +// return new OSSDataOutputStream(outputStream); +// } catch (IOException e) { +// throw new RuntimeException(e); +// } +// // metaData. +// +// // this.client. +// } +// +// @Override +// public FSDataInputStream open(IPath path, int bufferSize) { +// OSSObject oObj = this.client.getObject(new GetObjectRequest(bucketName, path.getName())); +// return new OSSDataInputStream(new BufferedInputStream(oObj.getObjectContent(), bufferSize)); +// } +// +// private static class OSSDataInputStream extends FSDataInputStream { +// public OSSDataInputStream(InputStream in) { +// super(in); +// } +// +// @Override +// public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { +// throw new UnsupportedOperationException(); +// } +// +// @Override +// public void seek(long position) { +// throw new UnsupportedOperationException(); +// } +// } +// +// @Override +// public FSDataInputStream open(IPath path) { +// OSSObject oObj = this.client.getObject(new GetObjectRequest(bucketName, path.getName())); +// return new OSSDataInputStream((oObj.getObjectContent())); +// } +// +// @Override +// public TISFSDataOutputStream create(IPath f, boolean overwrite, int bufferSize) throws IOException { +// +// PipedOutputStream outputStream = new PipedOutputStream(); +// PipedInputStream inputStream = new PipedInputStream(outputStream); +// ossPutExecutor.execute(() -> { +// this.client.putObject(this.bucketName, f.getName(), inputStream); +// }); +// return new OSSDataOutputStream(new BufferedOutputStream(outputStream, bufferSize)); +// } +// +// +// @Override +// public TISFSDataOutputStream create(IPath f, boolean overwrite) throws IOException { +// return create(f, overwrite, 2048); +// } +// +// @Override +// public boolean exists(IPath path) { +// return false; +// } +// +// @Override +// public boolean mkdirs(IPath f) throws IOException { +// return false; +// } +// +// @Override +// public void copyToLocalFile(IPath srcPath, File dstPath) { +// } +// +// @Override +// public void rename(IPath from, IPath to) { +// } +// +// @Override +// public boolean copyFromLocalFile(File localIncrPath, IPath remoteIncrPath) { +// return false; +// } +// +// @Override +// public IFileSplitor getSplitor(IPath path) throws Exception { +// return null; +// } +// +// @Override +// public IContentSummary getContentSummary(IPath path) { +// ObjectMetadata meta = this.client.getObjectMetadata(this.bucketName, path.getName()); +// return () -> { +// return meta.getContentLength(); +// }; +// } +// +// @Override +// public List listChildren(IPath path) { +// ObjectListing objectListing = this.client.listObjects(this.bucketName, path.getName()); +// return objectListing.getObjectSummaries().stream().map((summary) -> { +// return new OSSPathInfo(path, summary); +// }).collect(Collectors.toList()); +// } +// +// +// private static class OSSPathInfo implements IPathInfo { +// private final OSSObjectSummary meta; +// private final IPath path; +// +// public OSSPathInfo(IPath path, OSSObjectSummary meta) { +// this.meta = meta; +// this.path = path; +// } +// +// @Override +// public String getName() { +// return meta.getKey(); +// } +// +// @Override +// public IPath getPath() { +// return this.path; +// } +// +// @Override +// public boolean isDir() { +// return false; +// } +// +// @Override +// public long getModificationTime() { +// return meta.getLastModified().getTime(); +// } +// +// @Override +// public long getLength() { +// return meta.getSize(); +// } +// } +// +// @Override +// public List listChildren(IPath path, IPathFilter filter) { +// return null; +// } +// +// @Override +// public IPathInfo getFileInfo(IPath path) { +// return null; +// } +// +// @Override +// public boolean delete(IPath f, boolean recursive) throws IOException { +// return false; +// } +// +// @Override +// public boolean delete(IPath f) throws IOException { +// return false; +// } +// +// @Override +// public void close() { +// } +// +// private static class OSSDataOutputStream extends TISFSDataOutputStream { +// public OSSDataOutputStream(OutputStream out) { +// super(out); +// } +// +// @Override +// public void write(int b) throws IOException { +// this.out.write(b); +// } +// +// @Override +// public long getPos() throws IOException { +// throw new UnsupportedOperationException(); +// } +// } +//} diff --git a/tis-aliyun-fs-plugin/src/main/java/com/qlangtech/tis/plugin/fs/aliyun/oss/AliyunOSSFileSystemFactory.java b/tis-aliyun-fs-plugin/src/main/java/com/qlangtech/tis/plugin/fs/aliyun/oss/AliyunOSSFileSystemFactory.java index 0f8e66ad3..8dd32d741 100644 --- a/tis-aliyun-fs-plugin/src/main/java/com/qlangtech/tis/plugin/fs/aliyun/oss/AliyunOSSFileSystemFactory.java +++ b/tis-aliyun-fs-plugin/src/main/java/com/qlangtech/tis/plugin/fs/aliyun/oss/AliyunOSSFileSystemFactory.java @@ -1,98 +1,99 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.qlangtech.tis.plugin.fs.aliyun.oss; - -import com.qlangtech.tis.annotation.Public; -import com.qlangtech.tis.config.ParamsConfig; -import com.qlangtech.tis.config.aliyun.IHttpToken; -import com.qlangtech.tis.extension.Descriptor; -import com.qlangtech.tis.fs.ITISFileSystem; -import com.qlangtech.tis.offline.FileSystemFactory; -import com.qlangtech.tis.plugin.annotation.FormField; -import com.qlangtech.tis.plugin.annotation.FormFieldType; -import com.qlangtech.tis.plugin.annotation.Validator; - -import java.io.File; - -/** - * 基于阿里云OSS的 - * - * @author 百岁(baisui@qlangtech.com) - * @create: 2020-04-12 20:03 - * @date 2020/04/13 - */ -@Public -public class AliyunOSSFileSystemFactory extends FileSystemFactory { - - @FormField(identity = true, ordinal = 0, type = FormFieldType.INPUTTEXT, validate = {Validator.require, Validator.identity}) - public String name; - - @FormField(ordinal = 2, type = FormFieldType.INPUTTEXT, validate = {Validator.require}) - public String rootDir; - - @FormField(ordinal = 3, type = FormFieldType.SELECTABLE, validate = {Validator.require, Validator.identity}) - public String aliyunToken; - - //example: http://oss-cn-hangzhou.aliyuncs.com - @FormField(ordinal = 3, type = FormFieldType.INPUTTEXT, validate = {Validator.require, Validator.url}) - public String endpoint; - - @FormField(ordinal = 4, type = FormFieldType.INPUTTEXT, validate = {Validator.require}) - public String bucketName; - - @Override - public Configuration getConfiguration() { - throw new UnsupportedOperationException(); - // return null; - } - - @Override - public void setConfigFile(File cfgDir) { - throw new UnsupportedOperationException(); - } - - @Override - public String identityValue() { - return this.name; - } - - @Override - public String getFSAddress() { - return this.endpoint; - } - - private ITISFileSystem ossFs; - - @Override - public ITISFileSystem getFileSystem() { - if (ossFs == null) { - IHttpToken aliyunToken = ParamsConfig.getItem(this.aliyunToken, IHttpToken.KEY_DISPLAY_NAME); - ossFs = new AliyunOSSFileSystem(aliyunToken, this.endpoint, this.bucketName, this.rootDir); - } - return ossFs; - } - - // @TISExtension - public static class DefaultDescriptor extends Descriptor { - - public DefaultDescriptor() { - super(); - this.registerSelectOptions(IHttpToken.KEY_FIELD_ALIYUN_TOKEN, () -> ParamsConfig.getItems(IHttpToken.KEY_DISPLAY_NAME)); - } - } -} +///** +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// *

+// * http://www.apache.org/licenses/LICENSE-2.0 +// *

+// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +//package com.qlangtech.tis.plugin.fs.aliyun.oss; +// +//import com.qlangtech.tis.annotation.Public; +//import com.qlangtech.tis.config.ParamsConfig; +//import com.qlangtech.tis.config.aliyun.IHttpToken; +//import com.qlangtech.tis.extension.Descriptor; +//import com.qlangtech.tis.fs.ITISFileSystem; +//import com.qlangtech.tis.offline.FileSystemFactory; +//import com.qlangtech.tis.plugin.annotation.FormField; +//import com.qlangtech.tis.plugin.annotation.FormFieldType; +//import com.qlangtech.tis.plugin.annotation.Validator; +// +//import java.io.File; +// +///** +// * 基于阿里云OSS的 +// * +// * @author 百岁(baisui@qlangtech.com) +// * @create: 2020-04-12 20:03 +// * @date 2020/04/13 +// */ +//@Public +//public class AliyunOSSFileSystemFactory extends FileSystemFactory { +// +// @FormField(identity = true, ordinal = 0, type = FormFieldType.INPUTTEXT, validate = {Validator.require, Validator.identity}) +// public String name; +// +// @FormField(ordinal = 2, type = FormFieldType.INPUTTEXT, validate = {Validator.require}) +// public String rootDir; +// +// @FormField(ordinal = 3, type = FormFieldType.SELECTABLE, validate = {Validator.require, Validator.identity}) +// public String aliyunToken; +// +// //example: http://oss-cn-hangzhou.aliyuncs.com +// @FormField(ordinal = 3, type = FormFieldType.INPUTTEXT, validate = {Validator.require, Validator.url}) +// public String endpoint; +// +// @FormField(ordinal = 4, type = FormFieldType.INPUTTEXT, validate = {Validator.require}) +// public String bucketName; +// +// @Override +// public Configuration getConfiguration() { +// throw new UnsupportedOperationException(); +// // return null; +// } +// +// @Override +// public void setConfigFile(File cfgDir) { +// throw new UnsupportedOperationException(); +// } +// +// @Override +// public String identityValue() { +// return this.name; +// } +// +// @Override +// public String getFSAddress() { +// return this.endpoint; +// } +// +// private ITISFileSystem ossFs; +// +// @Override +// public ITISFileSystem getFileSystem() { +// if (ossFs == null) { +// +// IHttpToken aliyunToken = IHttpToken.getToken(this.aliyunToken);// ParamsConfig.getItem(this.aliyunToken, IHttpToken.KEY_DISPLAY_NAME); +// ossFs = new AliyunOSSFileSystem(aliyunToken, this.endpoint, this.bucketName, this.rootDir); +// } +// return ossFs; +// } +// +// // @TISExtension +// public static class DefaultDescriptor extends Descriptor { +// +// public DefaultDescriptor() { +// super(); +// this.registerSelectOptions(IHttpToken.KEY_FIELD_ALIYUN_TOKEN, () -> ParamsConfig.getItems(IHttpToken.KEY_DISPLAY_NAME)); +// } +// } +//} diff --git a/tis-aliyun-fs-plugin/src/main/java/com/qlangtech/tis/plugin/fs/aliyun/oss/OSSPath.java b/tis-aliyun-fs-plugin/src/main/java/com/qlangtech/tis/plugin/fs/aliyun/oss/OSSPath.java index 1ac401328..659d64f25 100644 --- a/tis-aliyun-fs-plugin/src/main/java/com/qlangtech/tis/plugin/fs/aliyun/oss/OSSPath.java +++ b/tis-aliyun-fs-plugin/src/main/java/com/qlangtech/tis/plugin/fs/aliyun/oss/OSSPath.java @@ -1,39 +1,39 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.qlangtech.tis.plugin.fs.aliyun.oss; - -import com.qlangtech.tis.fs.IPath; - -/* - * @create: 2020-04-12 23:06 - * - * @author 百岁(baisui@qlangtech.com) - * @date 2020/04/13 - */ -public class OSSPath implements IPath { - - @Override - public String getName() { - return null; - } - - @Override - public T unwrap(Class iface) { - return null; - } -} +///** +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +//package com.qlangtech.tis.plugin.fs.aliyun.oss; +// +//import com.qlangtech.tis.fs.IPath; +// +///* +// * @create: 2020-04-12 23:06 +// * +// * @author 百岁(baisui@qlangtech.com) +// * @date 2020/04/13 +// */ +//public class OSSPath implements IPath { +// +// @Override +// public String getName() { +// return null; +// } +// +// @Override +// public T unwrap(Class iface) { +// return null; +// } +//} diff --git a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/AuthToken.java b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/AuthToken.java index 003f75b85..ed676a1ea 100644 --- a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/AuthToken.java +++ b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/AuthToken.java @@ -19,10 +19,32 @@ package com.qlangtech.tis.plugin; import com.qlangtech.tis.extension.Describable; +import com.qlangtech.tis.plugin.aliyun.AccessKey; +import com.qlangtech.tis.plugin.aliyun.NoneToken; +import com.qlangtech.tis.plugin.aliyun.UsernamePassword; /** * @author: 百岁(baisui@qlangtech.com) * @create: 2022-10-31 09:51 **/ -public class AuthToken implements Describable { +public abstract class AuthToken implements Describable { + + public abstract T accept(Visitor visitor); + + + public interface Visitor { + + default public T visit(NoneToken noneToken) { + throw new UnsupportedOperationException(); + } + + default public T visit(AccessKey accessKey) { + throw new UnsupportedOperationException(); + } + + default public T visit(UsernamePassword accessKey) { + throw new UnsupportedOperationException(); + } + } + } diff --git a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/HttpEndpoint.java b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/HttpEndpoint.java index 73ec71cb6..2fc80204d 100644 --- a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/HttpEndpoint.java +++ b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/HttpEndpoint.java @@ -24,6 +24,7 @@ import com.qlangtech.tis.config.aliyun.IHttpToken; import com.qlangtech.tis.extension.Descriptor; import com.qlangtech.tis.extension.TISExtension; +import com.qlangtech.tis.plugin.aliyun.NoneToken; import com.qlangtech.tis.plugin.annotation.FormField; import com.qlangtech.tis.plugin.annotation.Validator; @@ -43,6 +44,13 @@ public class HttpEndpoint extends ParamsConfig implements IHttpToken { @FormField(ordinal = 2, validate = {}) public AuthToken authToken; + public T accept(AuthToken.Visitor visitor) { + if (authToken == null) { + return visitor.visit(new NoneToken()); + } + return authToken.accept(visitor); + } + @Override public IHttpToken createConfigInstance() { return this; diff --git a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/aliyun/AccessKey.java b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/aliyun/AccessKey.java index 0dea11909..06fae7295 100644 --- a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/aliyun/AccessKey.java +++ b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/aliyun/AccessKey.java @@ -18,7 +18,6 @@ package com.qlangtech.tis.plugin.aliyun; -import com.qlangtech.tis.config.ParamsConfig; import com.qlangtech.tis.extension.Descriptor; import com.qlangtech.tis.extension.TISExtension; import com.qlangtech.tis.plugin.AuthToken; @@ -46,6 +45,10 @@ public String getAccessKeySecret() { return this.accessKeySecret; } + @Override + public T accept(Visitor visitor) { + return visitor.visit(this); + } @TISExtension() public static class DefaultDescriptor extends Descriptor { diff --git a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/aliyun/NoneToken.java b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/aliyun/NoneToken.java new file mode 100644 index 000000000..1e46f52d1 --- /dev/null +++ b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/aliyun/NoneToken.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.plugin.aliyun; + +import com.qlangtech.tis.extension.Descriptor; +import com.qlangtech.tis.extension.TISExtension; +import com.qlangtech.tis.plugin.AuthToken; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2022-10-31 12:14 + **/ +public class NoneToken extends AuthToken { + @Override + public T accept(Visitor visitor) { + return visitor.visit(this); + } + + @TISExtension() + public static class DefaultDescriptor extends Descriptor { + @Override + public String getDisplayName() { + return "none"; + } + } +} diff --git a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/aliyun/UsernamePassword.java b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/aliyun/UsernamePassword.java index e6d28c593..e465c73e2 100644 --- a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/aliyun/UsernamePassword.java +++ b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/aliyun/UsernamePassword.java @@ -18,7 +18,6 @@ package com.qlangtech.tis.plugin.aliyun; -import com.qlangtech.tis.config.ParamsConfig; import com.qlangtech.tis.extension.Descriptor; import com.qlangtech.tis.extension.TISExtension; import com.qlangtech.tis.plugin.AuthToken; @@ -38,11 +37,16 @@ public class UsernamePassword extends AuthToken { @FormField(ordinal = 3, type = FormFieldType.PASSWORD, validate = {}) public String password; + @Override + public T accept(Visitor visitor) { + return visitor.visit(this); + } + @TISExtension() public static class DefaultDescriptor extends Descriptor { @Override public String getDisplayName() { - return "userNamePassword"; + return "user"; } } } diff --git a/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXElasticsearchWriter.java b/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXElasticsearchWriter.java index 000105c08..8d67e9ebb 100644 --- a/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXElasticsearchWriter.java +++ b/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXElasticsearchWriter.java @@ -41,6 +41,7 @@ import com.qlangtech.tis.plugin.annotation.Validator; import com.qlangtech.tis.plugin.datax.elastic.ElasticEndpoint; import com.qlangtech.tis.plugin.ds.CMeta; +import com.qlangtech.tis.plugin.ds.DataType; import com.qlangtech.tis.plugin.ds.DataXReaderColType; import com.qlangtech.tis.plugin.ds.ISelectedTab; import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler; @@ -119,7 +120,11 @@ public ElasticEndpoint getToken() { return aliyunToken; } + @Override public String getIndexName() { + if (StringUtils.isEmpty(this.index)) { + throw new IllegalArgumentException("prop index can not be empty"); + } return this.index; } @@ -134,11 +139,12 @@ public SchemaMetaContent initSchemaMetaContent(ISelectedTab tab) { field = new ESField(); field.setName(m.getName()); field.setStored(true); - field.setIndexed(true); - field.setType(this.mapSearchEngineType(m.getType().getCollapse())); -// field.setSharedKey(); -// field.setUniqueKey(); + if (m.isPk()) { + field.setUniqueKey(true); + } + m.getType().accept(new CMetaTypeVisitor(field)); + field.setType(this.mapSearchEngineType(m.getType().getCollapse())); schema.fields.add(field); } byte[] schemaContent = null; @@ -146,6 +152,57 @@ public SchemaMetaContent initSchemaMetaContent(ISelectedTab tab) { return metaContent; } + private static class CMetaTypeVisitor implements DataType.TypeVisitor { + private final ESField field; + + public CMetaTypeVisitor(ESField field) { + this.field = field; + } + + private Void typeVisit(DataType type) { + field.setIndexed(true); + return null; + } + + @Override + public Void bigInt(DataType type) { + return typeVisit(type); + } + + @Override + public Void doubleType(DataType type) { + return typeVisit(type); + } + + @Override + public Void dateType(DataType type) { + return typeVisit(type); + } + + @Override + public Void timestampType(DataType type) { + return typeVisit(type); + } + + @Override + public Void bitType(DataType type) { + return typeVisit(type); + } + + @Override + public Void blobType(DataType type) { + // return typeVisit(type); + // 字节内容 需要index=false + field.setIndexed(false); + return null; + } + + @Override + public Void varcharType(DataType type) { + return typeVisit(type); + } + } + /** * 当增量开始执行前,先需要初始化一下索引实例 * diff --git a/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/ESContext.java b/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/ESContext.java index a7350d10f..6741f4039 100644 --- a/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/ESContext.java +++ b/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/ESContext.java @@ -21,6 +21,8 @@ import com.alibaba.fastjson.JSONArray; import com.qlangtech.tis.datax.IDataxContext; import com.qlangtech.tis.datax.impl.ESTableAlias; +import com.qlangtech.tis.plugin.AuthToken; +import com.qlangtech.tis.plugin.aliyun.NoneToken; import com.qlangtech.tis.plugin.aliyun.UsernamePassword; import com.qlangtech.tis.plugin.datax.elastic.ElasticEndpoint; import org.apache.commons.lang.StringUtils; @@ -41,7 +43,16 @@ public class ESContext implements IDataxContext { public ESContext(DataXElasticsearchWriter writer, ESTableAlias mapper) { this.writer = writer; this.token = writer.getToken(); - this.auth = Optional.ofNullable((UsernamePassword) token.authToken); + this.auth = token.accept(new AuthToken.Visitor>() { + @Override + public Optional visit(NoneToken noneToken) { + return Optional.empty(); + } + @Override + public Optional visit(UsernamePassword accessKey) { + return Optional.ofNullable(accessKey); + } + });// Optional.ofNullable((UsernamePassword) token.authToken); Objects.requireNonNull(this.token, "token can not be null"); this.mapper = mapper; } @@ -66,7 +77,7 @@ public String getUserName() { public String getPassword() { return auth.isPresent() ? auth.get().password : "******"; - // return StringUtils.defaultIfBlank(token.getAccessKeySecret(), "******"); + // return StringUtils.defaultIfBlank(token.getAccessKeySecret(), "******"); } public String getIndex() { diff --git a/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/ESSchema.java b/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/ESSchema.java index 699bcf4a0..6a0423c1b 100644 --- a/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/ESSchema.java +++ b/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/ESSchema.java @@ -1,19 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package com.qlangtech.tis.plugin.datax; @@ -21,11 +21,13 @@ import com.alibaba.fastjson.JSONArray; import com.google.common.collect.Lists; import com.qlangtech.tis.runtime.module.misc.ISearchEngineTokenizerType; -import com.qlangtech.tis.runtime.module.misc.TokenizerType; import com.qlangtech.tis.runtime.module.misc.VisualType; import com.qlangtech.tis.solrdao.ISchema; import com.qlangtech.tis.solrdao.SolrFieldsParser; +import org.apache.commons.collections.CollectionUtils; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -34,6 +36,8 @@ * @create: 2021-06-11 16:22 **/ public class ESSchema implements ISchema { + + private String uniqueKey; private String sharedKey; public List fields = Lists.newArrayList(); @@ -125,5 +129,27 @@ public JSONArray serialTypes() { return types; } + public List errlist = new ArrayList(); + +// public String getErrorSummary() { +// StringBuffer summary = new StringBuffer(); +// for (String err : errlist) { +// summary.append(err); +// summary.append("\n"); +// } +// return summary.toString(); +// } + + @Override + public boolean isValid() { + if (!this.errlist.isEmpty()) { + return false; + } + return CollectionUtils.isEmpty(this.errlist = ISchema.validateSchema(this.fields)); + } + @Override + public List getErrors() { + return Collections.unmodifiableList(this.errlist); + } } diff --git a/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/elastic/ElasticEndpoint.java b/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/elastic/ElasticEndpoint.java index 6c5b21d5b..26dc4e263 100644 --- a/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/elastic/ElasticEndpoint.java +++ b/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/elastic/ElasticEndpoint.java @@ -24,7 +24,9 @@ import com.qlangtech.tis.config.ParamsConfig; import com.qlangtech.tis.extension.Descriptor; import com.qlangtech.tis.extension.TISExtension; +import com.qlangtech.tis.plugin.AuthToken; import com.qlangtech.tis.plugin.HttpEndpoint; +import com.qlangtech.tis.plugin.aliyun.NoneToken; import com.qlangtech.tis.plugin.aliyun.UsernamePassword; import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler; import io.searchbox.client.JestResult; @@ -43,8 +45,18 @@ public class ElasticEndpoint extends HttpEndpoint { public static final String KEY_DISPLAY_NAME = "elasticToken"; public final ESInitialization createESInitialization() { - UsernamePassword authToken = (UsernamePassword) this.authToken; - return (ESInitialization.create(this.getEndpoint(), authToken.userName, authToken.password, + UsernamePassword auth = this.accept(new AuthToken.Visitor() { + @Override + public UsernamePassword visit(NoneToken noneToken) { + return new UsernamePassword(); + } + + @Override + public UsernamePassword visit(UsernamePassword accessKey) { + return accessKey; + } + }); + return (ESInitialization.create(this.getEndpoint(), auth.userName, auth.password, false, 300000, false, @@ -57,7 +69,7 @@ public final ESClient createESClient() { public static List filter(List descs) { return descs.stream().filter((desc) -> { - return desc instanceof UsernamePassword.DefaultDescriptor; + return desc instanceof UsernamePassword.DefaultDescriptor || desc instanceof NoneToken.DefaultDescriptor; }).collect(Collectors.toList()); } @@ -75,6 +87,9 @@ protected boolean verify(IControlMsgHandler msgHandler, Context context, PostFor JestResult result = es.jestClient.execute(hbuild.build()); if (!result.isSucceeded()) { msgHandler.addErrorMessage(context, result.getErrorMessage()); + } else { + msgHandler.addActionMessage(context + , "cluster '" + result.getValue("cluster_name") + "' is working,status:'" + result.getValue("status") + "'"); } return result.isSucceeded(); } catch (IOException e) { diff --git a/tis-datax/tis-datax-elasticsearch-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataXElasticsearchWriter.json b/tis-datax/tis-datax-elasticsearch-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataXElasticsearchWriter.json index 48b24a6ab..e7b8f8e44 100644 --- a/tis-datax/tis-datax-elasticsearch-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataXElasticsearchWriter.json +++ b/tis-datax/tis-datax-elasticsearch-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataXElasticsearchWriter.json @@ -6,7 +6,7 @@ "plugin": [ { "hetero": "params-cfg", - "descName": "httpToken" + "descName": "elasticToken" } ] } diff --git a/tis-datax/tis-datax-elasticsearch-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/elastic/ElasticEndpoint.json b/tis-datax/tis-datax-elasticsearch-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/elastic/ElasticEndpoint.json index e0951d20d..ab217a251 100644 --- a/tis-datax/tis-datax-elasticsearch-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/elastic/ElasticEndpoint.json +++ b/tis-datax/tis-datax-elasticsearch-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/elastic/ElasticEndpoint.json @@ -1,5 +1,6 @@ { - "authToken": { - "subDescEnumFilter": "return com.qlangtech.tis.plugin.datax.elastic.ElasticEndpoint.filter(desc);" - } + "authToken": { + "subDescEnumFilter": "return com.qlangtech.tis.plugin.datax.elastic.ElasticEndpoint.filter(desc);", + "dftVal": "none" + } } diff --git a/tis-datax/tis-datax-oss-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOssReader.java b/tis-datax/tis-datax-oss-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOssReader.java index c83d1521f..6dd79cf94 100644 --- a/tis-datax/tis-datax-oss-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOssReader.java +++ b/tis-datax/tis-datax-oss-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOssReader.java @@ -31,7 +31,11 @@ import com.qlangtech.tis.extension.Descriptor; import com.qlangtech.tis.extension.TISExtension; import com.qlangtech.tis.extension.impl.IOUtils; +import com.qlangtech.tis.plugin.AuthToken; import com.qlangtech.tis.plugin.HttpEndpoint; +import com.qlangtech.tis.plugin.aliyun.AccessKey; +import com.qlangtech.tis.plugin.aliyun.NoneToken; +import com.qlangtech.tis.plugin.aliyun.UsernamePassword; import com.qlangtech.tis.plugin.annotation.FormField; import com.qlangtech.tis.plugin.annotation.FormFieldType; import com.qlangtech.tis.plugin.annotation.Validator; @@ -159,6 +163,29 @@ public boolean isRdbms() { return false; } + + public boolean validateEndpoint(IFieldErrorHandler msgHandler, Context context, String fieldName, String endpoint) { + HttpEndpoint end = (HttpEndpoint) IHttpToken.getToken(endpoint); + return end.accept(new AuthToken.Visitor() { + @Override + public Boolean visit(NoneToken noneToken) { + Validator.require.validate(msgHandler, context, fieldName, null); + return false; + } + @Override + public Boolean visit(AccessKey accessKey) { + return true; + } + + @Override + public Boolean visit(UsernamePassword accessKey) { + msgHandler.addFieldError(context, fieldName, "不支持使用用户名/密码认证方式"); + return false; + } + }); + } + + public boolean validateFieldDelimiter(IFieldErrorHandler msgHandler, Context context, String fieldName, String value) { return validateFileDelimiter(msgHandler, context, fieldName, value); } @@ -196,10 +223,15 @@ public String getDisplayName() { public static boolean verifyFormOSSRelative(IControlMsgHandler msgHandler, Context context, Descriptor.PostFormVals postFormVals) { String endpoint = postFormVals.getField(FIELD_ENDPOINT); String bucket = postFormVals.getField(FIELD_BUCKET); - HttpEndpoint end = ParamsConfig.getItem(endpoint, HttpEndpoint.KEY_DISPLAY_NAME); - + HttpEndpoint end = (HttpEndpoint) IHttpToken.getToken(endpoint); try { - OSS ossClient = new OSSClientBuilder().build(end.getEndpoint(), end.getAccessKeyId(), end.getAccessKeySecret()); + OSS ossClient = end.accept(new AuthToken.Visitor() { + @Override + public OSS visit(AccessKey accessKey) { + return new OSSClientBuilder().build(end.getEndpoint(), accessKey.getAccessKeyId(), accessKey.getAccessKeySecret()); + } + }); + List buckets = ossClient.listBuckets(); if (buckets.size() < 1) { msgHandler.addErrorMessage(context, "buckets不能为空"); diff --git a/tis-incr/tis-sink-elasticsearch7-plugin/pom.xml b/tis-incr/tis-sink-elasticsearch7-plugin/pom.xml index 5cd7ee3f7..33f895bdf 100644 --- a/tis-incr/tis-sink-elasticsearch7-plugin/pom.xml +++ b/tis-incr/tis-sink-elasticsearch7-plugin/pom.xml @@ -51,11 +51,10 @@ tis-datax-elasticsearch-plugin ${project.version} - - - - - + + com.qlangtech.tis.plugins + tis-scala-compiler + com.qlangtech.tis.plugins tis-realtime-flink @@ -70,10 +69,6 @@ com.qlangtech.tis.plugins tis-incr-test - - com.qlangtech.tis.plugins - tis-scala-compiler - diff --git a/tis-incr/tis-sink-elasticsearch7-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/elasticsearch7/ElasticSearchSinkFactory.java b/tis-incr/tis-sink-elasticsearch7-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/elasticsearch7/ElasticSearchSinkFactory.java index d63bcd39d..75e451ae0 100644 --- a/tis-incr/tis-sink-elasticsearch7-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/elasticsearch7/ElasticSearchSinkFactory.java +++ b/tis-incr/tis-sink-elasticsearch7-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/elasticsearch7/ElasticSearchSinkFactory.java @@ -19,25 +19,30 @@ package com.qlangtech.tis.plugins.incr.flink.connector.elasticsearch7; +import com.google.common.collect.Sets; import com.qlangtech.org.apache.http.HttpHost; +import com.qlangtech.plugins.incr.flink.cdc.FlinkCol; import com.qlangtech.tis.compiler.incr.ICompileAndPackage; import com.qlangtech.tis.compiler.streamcode.CompileAndPackage; -import com.qlangtech.tis.config.aliyun.IHttpToken; import com.qlangtech.tis.datax.IDataxProcessor; import com.qlangtech.tis.datax.IDataxReader; import com.qlangtech.tis.datax.TableAlias; import com.qlangtech.tis.datax.impl.ESTableAlias; import com.qlangtech.tis.extension.TISExtension; +import com.qlangtech.tis.plugin.AuthToken; import com.qlangtech.tis.plugin.IEndTypeGetter; +import com.qlangtech.tis.plugin.aliyun.NoneToken; +import com.qlangtech.tis.plugin.aliyun.UsernamePassword; import com.qlangtech.tis.plugin.annotation.FormField; import com.qlangtech.tis.plugin.annotation.FormFieldType; import com.qlangtech.tis.plugin.annotation.Validator; import com.qlangtech.tis.plugin.datax.DataXElasticsearchWriter; +import com.qlangtech.tis.plugin.datax.elastic.ElasticEndpoint; import com.qlangtech.tis.plugin.ds.CMeta; import com.qlangtech.tis.plugin.ds.ISelectedTab; +import com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper; import com.qlangtech.tis.realtime.BasicTISSinkFactory; import com.qlangtech.tis.realtime.TabSinkFunc; -import com.qlangtech.tis.realtime.transfer.DTO; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.flink.annotation.Public; @@ -46,6 +51,9 @@ import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; @@ -64,7 +72,7 @@ * @create: 2021-09-28 19:45 **/ @Public -public class ElasticSearchSinkFactory extends BasicTISSinkFactory { +public class ElasticSearchSinkFactory extends BasicTISSinkFactory { public static final String DISPLAY_NAME_FLINK_CDC_SINK = "Flink-ElasticSearch-Sink"; private static final Logger logger = LoggerFactory.getLogger(ElasticSearchSinkFactory.class); private static final int DEFAULT_PARALLELISM = 1;// parallelism @@ -80,15 +88,17 @@ public class ElasticSearchSinkFactory extends BasicTISSinkFactory { @Override - public Map> createSinkFunction(IDataxProcessor dataxProcessor) { + public Map> createSinkFunction(IDataxProcessor dataxProcessor) { DataXElasticsearchWriter dataXWriter = (DataXElasticsearchWriter) dataxProcessor.getWriter(null); + + Objects.requireNonNull(dataXWriter, "dataXWriter can not be null"); - IHttpToken token = dataXWriter.getToken(); + ElasticEndpoint token = dataXWriter.getToken(); ESTableAlias esSchema = null; Optional first = dataxProcessor.getTabAlias().findFirst(); - if(first.isPresent()){ + if (first.isPresent()) { TableAlias value = first.get(); if (!(value instanceof ESTableAlias)) { throw new IllegalStateException("value must be type of 'ESTableAlias',but now is :" + value.getClass()); @@ -120,11 +130,20 @@ public Map> createSinkFunction(IDataxProcessor data List transportAddresses = new ArrayList<>(); transportAddresses.add(HttpHost.create(token.getEndpoint())); + ISelectedTab tab = null; + IDataxReader reader = dataxProcessor.getReader(null); + for (ISelectedTab selectedTab : reader.getSelectedTabs()) { + tab = selectedTab; + break; + } + Objects.requireNonNull(tab, "tab ca not be null"); + - ElasticsearchSink.Builder sinkBuilder + ElasticsearchSink.Builder sinkBuilder = new ElasticsearchSink.Builder<>(transportAddresses , new DefaultElasticsearchSinkFunction( cols.stream().map((c) -> c.getName()).collect(Collectors.toSet()) + , AbstractRowDataMapper.getAllTabColsMeta(tab.getCols()) , firstPK.get().getName() , dataXWriter.getIndexName())); @@ -141,21 +160,30 @@ public Map> createSinkFunction(IDataxProcessor data } sinkBuilder.setFailureHandler(new DefaultActionRequestFailureHandler()); - if (StringUtils.isNotEmpty(token.getAccessKeyId()) - || StringUtils.isNotEmpty(token.getAccessKeySecret())) { - // 如果用户设置了accessKey 或者accessSecret - sinkBuilder.setRestClientFactory(new TISElasticRestClientFactory(token.getAccessKeyId(), token.getAccessKeySecret())); - } + token.accept(new AuthToken.Visitor() { + @Override + public Void visit(NoneToken noneToken) { + return null; + } - TableAlias tableMapper = new TableAlias(); - tableMapper.setTo(dataXWriter.getIndexName()); - IDataxReader reader = dataxProcessor.getReader(null); - for (ISelectedTab selectedTab : reader.getSelectedTabs()) { - tableMapper.setFrom(selectedTab.getName()); - } - return Collections.singletonMap(tableMapper - , new DTOSinkFunc(tableMapper, sinkBuilder.build(), true, DEFAULT_PARALLELISM)); + @Override + public Void visit(UsernamePassword accessKey) { + sinkBuilder.setRestClientFactory(new TISElasticRestClientFactory(accessKey.userName, accessKey.password)); + return null; + } + }); + +// if (StringUtils.isNotEmpty(token.getAccessKeyId()) +// || StringUtils.isNotEmpty(token.getAccessKeySecret())) { +// // 如果用户设置了accessKey 或者accessSecret +// sinkBuilder.setRestClientFactory(new TISElasticRestClientFactory(token.getAccessKeyId(), token.getAccessKeySecret())); +// } + + + return Collections.singletonMap(esSchema + , new RowDataSinkFunc(esSchema, sinkBuilder.build() + , Collections.emptyList(), true, DEFAULT_PARALLELISM)); } private static class DefaultActionRequestFailureHandler implements ActionRequestFailureHandler, Serializable { @@ -168,15 +196,18 @@ public void onFailure(ActionRequest actionRequest, Throwable throwable, int rest @Override public ICompileAndPackage getCompileAndPackageManager() { - return new CompileAndPackage(); + return new CompileAndPackage(Sets.newHashSet(ElasticSearchSinkFactory.class)); } - private static class DefaultElasticsearchSinkFunction implements ElasticsearchSinkFunction, Serializable { + private static class DefaultElasticsearchSinkFunction implements ElasticsearchSinkFunction, Serializable { private final Set cols; private final String pkName; private final String targetIndexName; + private final List vgetters; - public DefaultElasticsearchSinkFunction(Set cols, String pkName, String targetIndexName) { + public DefaultElasticsearchSinkFunction(Set cols + , List vgetters, String pkName, String targetIndexName) { + this.vgetters = vgetters; this.cols = cols; this.pkName = pkName; this.targetIndexName = targetIndexName; @@ -185,20 +216,23 @@ public DefaultElasticsearchSinkFunction(Set cols, String pkName, String } } - private IndexRequest createIndexRequest(DTO element) { - Map after = element.getAfter(); + private IndexRequest createIndexRequest(RowData element) { Map json = new HashMap<>(); + Object val = null; - for (String col : cols) { - val = after.get(col); - if (val == null) { + for (FlinkCol get : vgetters) { + if (!cols.contains(get.name)) { continue; } - json.put(col, val); + val = get.getRowDataVal(element); + if (val instanceof DecimalData) { + val = ((DecimalData) val).toBigDecimal(); + } else if (val instanceof StringData) { + val = ((StringData) val).toString(); + } + json.put(get.name, val); } - - Object pkVal = after.get(pkName); - + Object pkVal = json.get(this.pkName); IndexRequest request = Requests.indexRequest() .index(this.targetIndexName) //.type("my-type") @@ -210,7 +244,7 @@ private IndexRequest createIndexRequest(DTO element) { } @Override - public void process(DTO element, RuntimeContext ctx, RequestIndexer indexer) { + public void process(RowData element, RuntimeContext ctx, RequestIndexer indexer) { indexer.add(createIndexRequest(element)); } }