Compare commits

...

2 Commits

Author SHA1 Message Date
Maksym Basiuk
4728be52a8 chore: patch release notes 2020-12-23 21:24:42 +02:00
Maksym Basiuk
22b8a39c46 chore(TDI-45260): patch 711 version of library 2020-12-23 20:38:57 +02:00
16 changed files with 57 additions and 3974 deletions

53
PATCH_RELEASE_NOTE.md Normal file
View File

@@ -0,0 +1,53 @@
---
version: 7.1.1
module: https://talend.poolparty.biz/coretaxonomy/42
product:
- https://talend.poolparty.biz/coretaxonomy/23
---
# TPS-4553
| Info | Value |
| ---------------- | ---------------- |
| Patch Name | Patch\_20201224\_TPS-4553\_v1-7.1.1 |
| Release Date | 2020-12-24 |
| Target Version | 20181026\_1147-V7.1.1 |
| Product affected | Talend Studio |
## Introduction
This is a self-contained patch.
**NOTE**: For information on how to obtain this patch, reach out to your Support contact at Talend.
## Fixed issues
This patch contains the following fixes:
- TPS-4553 [7.1.1] tFileInputPositional inappropriately handles CR(\r) as a row separator (TDI-45260)
## Prerequisites
Consider the following requirements for your system:
- Talend Studio 7.1.1 must be installed.
## Installation
- From the Talend Studio 7.1.1 installation folder, make a copy of the following files somewhere safe:
- {Talend_Studio_path}/plugins/org.talend.designer.components.localprovider_7.1.1.20181026_1147/components/tFileInputPositional/tFileInputPositional_begin.javajet
- {Talend_Studio_path}/plugins/org.talend.designer.components.localprovider_7.1.1.20181026_1147/components/tFileInputPositional/tFileInputPositional_java.xml
- {Talend_Studio_path}/configuration/.m2/repository/org/talend/components/lib/talend_file_enhanced/1.1-patch/talend_file_enhanced-1.1-patch.jar
- Unzip the contents of the patch zip into your Talend Studio 7.1.1 installation folder, overwriting the existing files.
## Uninstallation
- Restore the files overwritten by the patch from the copies you made before installing it.
## Affected files for this patch
The following files are installed by this patch:
- {Talend_Studio_path}/plugins/org.talend.designer.components.localprovider_7.1.1.20181026_1147/components/tFileInputPositional/tFileInputPositional_begin.javajet
- {Talend_Studio_path}/plugins/org.talend.designer.components.localprovider_7.1.1.20181026_1147/components/tFileInputPositional/tFileInputPositional_java.xml
- {Talend_Studio_path}/configuration/.m2/repository/org/talend/components/lib/talend_file_enhanced/1.1-patch/talend_file_enhanced-1.1-patch.jar

View File

@@ -4,7 +4,7 @@
<groupId>org.talend.components.lib</groupId>
<artifactId>talend_file_enhanced</artifactId>
<name>talend_file_enhanced</name>
<version>1.1</version>
<version>1.1-patch</version>
<properties>
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>

View File

@@ -1,54 +0,0 @@
package org.talend.fileprocess;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
/**
 * Buffers the complete content of an {@link InputStream} in memory so that it can be handed out again as any number
 * of independent, re-readable streams.
 */
public class CopyInputStreamUtil {

    private final InputStream _is;

    private final ByteArrayOutputStream _copy = new ByteArrayOutputStream();

    /**
     * Drains {@code is} into an in-memory buffer.
     * <p>
     * Copying is best effort: if reading fails with an {@link IOException}, the bytes read so far remain available
     * through {@link #getCopy()} and no exception is propagated. The source stream is not closed here.
     *
     * @param is the stream to copy; ownership stays with the caller
     */
    public CopyInputStreamUtil(InputStream is) {
        _is = is;
        try {
            copy();
        } catch (IOException ignored) {
            // Deliberately best effort: the constructor has never declared IOException, so callers rely on getting
            // whatever could be read before the failure.
        }
    }

    /**
     * Copies the source stream into the internal buffer.
     *
     * @return the number of bytes actually copied
     * @throws IOException if reading the source stream fails
     */
    private int copy() throws IOException {
        int read = 0;
        int chunk;
        byte[] data = new byte[256];
        while (-1 != (chunk = _is.read(data))) {
            // count what was really read, not the capacity of the scratch array
            read += chunk;
            _copy.write(data, 0, chunk);
        }
        return read;
    }

    /**
     * @return a new stream over a snapshot of the buffered bytes; every call returns an independent stream
     */
    public InputStream getCopy() {
        return new ByteArrayInputStream(_copy.toByteArray());
    }

    /**
     * Manual smoke test: prints the first character of two independent copies.
     *
     * @param args optional; {@code args[0]} is the file to read, otherwise the historical default path is used
     * @throws IOException if the file cannot be opened or read
     */
    public static void main(String[] args) throws IOException {
        String path = args.length > 0 ? args[0] : "C:/Documents and Settings/Administrator/Test.txt";
        FileInputStream in = new FileInputStream(path);
        try {
            CopyInputStreamUtil util = new CopyInputStreamUtil(in);
            InputStream copy = util.getCopy();
            InputStream copy2 = util.getCopy();
            System.out.println((char) copy.read());
            copy.close();
            System.out.println((char) copy2.read());
            copy2.close();
        } finally {
            in.close();
        }
    }
}

View File

@@ -1,391 +0,0 @@
// ============================================================================
//
// Talend Community Edition
//
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
//
// ============================================================================
package org.talend.fileprocess;
import java.io.IOException;
import java.io.StringReader;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
/**
 * FileInputDelimited is dedicated to Talend's tFileInputDelimited component. It wraps all parameters of
 * tFileInputDelimited (header, footer, limit, random sampling) around a {@link TOSDelimitedReader}, which keeps the
 * generated code small. It is not recommended for use in other circumstances.<br/>
 *
 * @author gke
 */
public class FileInputDelimited {

    private TOSDelimitedReader delimitedDataReader = null;

    /** Ascending 0-based record indexes to emit when random sampling is active. */
    Iterator<Integer> random = null;

    /** Number of rows to emit; -1 means "until the input ends" (only used together with countNeedAdjust). */
    private long loopCount = 0;

    private boolean randomSwitch = false;

    private int current = 0;

    /** True when rows are streamed without a counting pre-pass, so the row number comes from the reader. */
    private boolean countNeedAdjust = false;

    /** Opens a fresh reader on the underlying input; invoked once, or twice when a counting pre-pass is needed. */
    private interface ReaderSource {

        TOSDelimitedReader open() throws IOException;
    }

    /**
     * This constructor is only for compatibility with the old use case (before record splitting was supported).
     */
    public FileInputDelimited(String file, String encoding, String fieldSeparator, String rowSeparator, boolean skipEmptyRow,
            int header, int footer, int limit, int random) throws IOException {
        this(file, encoding, fieldSeparator, rowSeparator, skipEmptyRow, header, footer, limit, random, false);
    }

    /**
     * This constructor is only for compatibility with the old use case (before record splitting was supported).
     */
    public FileInputDelimited(java.io.InputStream is, String encoding, String fieldSeparator, String rowSeparator,
            boolean skipEmptyRow, int header, int footer, int limit, int random) throws IOException {
        this(is, encoding, fieldSeparator, rowSeparator, skipEmptyRow, header, footer, limit, random, false);
    }

    /**
     * Reads delimited rows from a stream. Negative {@code header}/{@code footer} are treated as 0; if {@code limit} or
     * {@code random} is 0 no reader is opened and no rows are produced.
     * <p>
     * NOTE(review): when a footer or random sampling is requested, the rows are first counted with one reader, that
     * reader is closed, and a second reader is created on the same stream object. The original code comments that
     * footers are not supported for streams - presumably because the stream is already consumed; confirm before
     * relying on it.
     *
     * @param is source stream
     * @param encoding character encoding of the stream
     * @param fieldSeparator field delimiter
     * @param rowSeparator record delimiter
     * @param skipEmptyRow whether empty records are skipped
     * @param header number of leading records to skip
     * @param footer number of trailing records to ignore
     * @param limit maximum number of rows; negative for no limit, 0 for no rows
     * @param random number of randomly sampled rows; negative to disable sampling, 0 for no rows
     * @param splitRecord passed to {@link TOSDelimitedReader#setSplitRecord(boolean)}
     * @throws IOException if reading fails
     */
    public FileInputDelimited(final java.io.InputStream is, final String encoding, final String fieldSeparator,
            final String rowSeparator, final boolean skipEmptyRow, int header, int footer, int limit, int random,
            boolean splitRecord) throws IOException {
        prepare(new ReaderSource() {

            @Override
            public TOSDelimitedReader open() throws IOException {
                return new TOSDelimitedReader(is, encoding, fieldSeparator, rowSeparator, skipEmptyRow);
            }
        }, header, footer, limit, random, splitRecord);
    }

    /**
     * Reads delimited rows from a file. Negative {@code header}/{@code footer} are treated as 0; if {@code limit} or
     * {@code random} is 0 no reader is opened and no rows are produced.
     *
     * @param file path of the file to read
     * @param encoding character encoding of the file
     * @param fieldSeparator field delimiter
     * @param rowSeparator record delimiter
     * @param skipEmptyRow whether empty records are skipped
     * @param header number of leading records to skip
     * @param footer number of trailing records to ignore
     * @param limit maximum number of rows; negative for no limit, 0 for no rows
     * @param random number of randomly sampled rows; negative to disable sampling, 0 for no rows
     * @param splitRecord passed to {@link TOSDelimitedReader#setSplitRecord(boolean)}
     * @throws IOException if reading fails
     */
    public FileInputDelimited(final String file, final String encoding, final String fieldSeparator,
            final String rowSeparator, final boolean skipEmptyRow, int header, int footer, int limit, int random,
            boolean splitRecord) throws IOException {
        prepare(new ReaderSource() {

            @Override
            public TOSDelimitedReader open() throws IOException {
                return new TOSDelimitedReader(file, encoding, fieldSeparator, rowSeparator, skipEmptyRow);
            }
        }, header, footer, limit, random, splitRecord);
    }

    /**
     * Parses a String directly as the content (added to support InGest).
     *
     * @param content the delimited text itself
     * @param fieldSeparator field delimiter
     * @param rowSeparator record delimiter
     * @param skipEmptyRow whether empty records are skipped
     * @param header number of leading records to skip
     * @param footer number of trailing records to ignore
     * @param limit maximum number of rows; negative for no limit, 0 for no rows
     * @param random number of randomly sampled rows; negative to disable sampling, 0 for no rows
     * @param splitRecord passed to {@link TOSDelimitedReader#setSplitRecord(boolean)}
     * @throws IOException if reading fails
     */
    public FileInputDelimited(final String content, final String fieldSeparator, final String rowSeparator,
            final boolean skipEmptyRow, int header, int footer, int limit, int random, boolean splitRecord)
            throws IOException {
        prepare(new ReaderSource() {

            @Override
            public TOSDelimitedReader open() throws IOException {
                // a fresh StringReader per pass, because the counting pass consumes the previous one
                return new TOSDelimitedReader(new StringReader(content), fieldSeparator, rowSeparator, skipEmptyRow);
            }
        }, header, footer, limit, random, splitRecord);
    }

    /**
     * Shared constructor logic: opens the reader and derives {@code loopCount}, {@code countNeedAdjust} and the
     * random sample from header, footer, limit and random.
     */
    private void prepare(ReaderSource source, int header, int footer, int limit, int random, boolean splitRecord)
            throws IOException {
        if (header < 0) {
            header = 0;
        }
        if (footer < 0) {
            footer = 0;
        }
        if (random == 0 || limit == 0) {
            loopCount = 0;
            return;
        }
        this.delimitedDataReader = openReader(source, header, splitRecord);
        if (random < 0 && footer == 0) {
            // plain streaming: no pre-pass needed, the reader's own counter is the row number
            this.loopCount = limit > 0 ? limit : -1;
            this.countNeedAdjust = true;
            return;
        }
        // a footer or a random sample needs the total row count, so read everything once and start over
        int count = (int) this.delimitedDataReader.getAvailableRowCount(footer);
        this.delimitedDataReader.close();
        this.delimitedDataReader = openReader(source, header, splitRecord);
        if (random > 0) {
            if (limit > 0 && random >= limit) {
                random = limit;
            }
            if (random >= count) {
                this.loopCount = count;
            } else {
                setRandoms(random, count);
                this.loopCount = random;
            }
        } else if (limit > 0) {
            this.loopCount = limit < count ? limit : count;
        } else {
            this.loopCount = count;
        }
    }

    /** Opens a reader from {@code source}, applies the split mode and skips the header records. */
    private TOSDelimitedReader openReader(ReaderSource source, int header, boolean splitRecord) throws IOException {
        TOSDelimitedReader reader = source.open();
        reader.setSplitRecord(splitRecord);
        reader.skipHeaders(header);
        return reader;
    }

    /** Picks {@code random} distinct record indexes in {@code [0, count)} and stores them in ascending order. */
    private void setRandoms(int random, int count) {
        this.randomSwitch = true;
        Set<Integer> ranSet = new java.util.TreeSet<Integer>();
        Random ran = new java.util.Random();
        while (ranSet.size() < random) {
            ranSet.add(ran.nextInt(count));
        }
        this.random = ranSet.iterator();
    }

    /**
     * Skip to the next record, get ready before getting the new record's values.
     *
     * @return whether a next record is available.
     * @throws IOException if reading fails
     */
    public boolean nextRecord() throws IOException {
        if (this.countNeedAdjust) {
            if (this.delimitedDataReader.getProcessedRecordCount() == loopCount) {
                return false;
            }
            return this.delimitedDataReader.readRecord();
        }
        if (++current > loopCount) {
            return false;
        }
        if (!randomSwitch) {
            return this.delimitedDataReader.readRecord();
        }
        if (!this.random.hasNext()) {
            return false;
        }
        int index = this.random.next();
        // read forward until the record with the sampled 0-based index is the current one
        do {
            if (!this.delimitedDataReader.readRecord()) {
                return false;
            }
        } while (this.delimitedDataReader.getProcessedRecordCount() <= index);
        return true;
    }

    /**
     * See {@link TOSDelimitedReader#get(int)}.
     *
     * @param columnIndex 0-based column index
     * @return the column value of the current record
     * @throws IOException if the reader is closed
     */
    public String get(int columnIndex) throws IOException {
        return this.delimitedDataReader.get(columnIndex);
    }

    /**
     * Close the delimitedDataReader if delimitedDataReader is not null.
     */
    public void close() {
        if (this.delimitedDataReader != null) {
            this.delimitedDataReader.close();
        }
    }

    /**
     * @deprecated use getLongRowNumber instead of this
     *
     * @return number of rows read by tFileInputDelimited
     */
    @Deprecated
    public int getRowNumber() {
        return (int) getLongRowNumber();
    }

    /**
     * @return the reader's processed record count when rows are streamed without a pre-pass, otherwise the
     * precomputed loop count
     */
    public long getLongRowNumber() {
        if (this.countNeedAdjust) {
            return this.delimitedDataReader.getProcessedRecordCount();
        } else {
            return this.loopCount;
        }
    }

    public int getColumnsCountOfCurrentRow() throws IOException {
        return this.delimitedDataReader.getAvailableColumnsCount();
    }

    /**
     * Check the first {@code limit} records and return the largest column count found; this is only for the GUI
     * wizard. The reader is closed even when reading fails.
     */
    public static int getMaxColumnCount(String fileName, String encoding, String fieldDelimiter, String recordDelimiter,
            boolean needSkipEmptyRecord, boolean splitRecord, int headerRows, int limit) throws IOException {
        TOSDelimitedReader tosDelimitedReader = new TOSDelimitedReader(fileName, encoding, fieldDelimiter, recordDelimiter,
                needSkipEmptyRecord);
        try {
            tosDelimitedReader.setSplitRecord(splitRecord);
            tosDelimitedReader.skipHeaders(headerRows);
            int result = 0;
            for (int i = 0; i < limit && tosDelimitedReader.readRecord(); i++) {
                int currentColumnsCount = tosDelimitedReader.getAvailableColumnsCount();
                if (currentColumnsCount > result) {
                    result = currentColumnsCount;
                }
            }
            return result;
        } finally {
            tosDelimitedReader.close();
        }
    }
}

View File

@@ -1,815 +0,0 @@
// ============================================================================
//
// Talend Community Edition
//
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
//
// ============================================================================
package org.talend.fileprocess;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
/**
*
* @author xtan
*
*/
public class TOSDelimitedReader {
// When true, the read loops print the StreamBuffer state to System.out on every iteration.
private boolean debug = false;
/* --------------1------------------- */
// Source of characters (assigned in init) and the two working buffers built on top of it.
private BufferedReader inputStream = null;
private StreamBuffer streamBuffer = null;
private ColumnBuffer4Joiner columnBuffer = null;
/* --------------2------------------- */
// Column values of the current record; only the first columnsCount entries are valid.
private String[] values = new String[StaticSettings.INITIAL_COLUMN_COUNT];
private int columnsCount = 0;
// Number of records completed so far; skipHeaders() resets it to 0 after the header records.
private long currentRecord = 0;
private boolean skipEmptyRecord = false;
/* --------------3------------------- */
// Set by endRecord() to end the parsing loop of the current readRecord() call.
private boolean hasReadRecord = false;
// When false, exceeding the StaticSettings column/char limits closes the reader and throws an IOException.
private boolean autoReallocateForHuge = true;
private boolean initialized = false;
private boolean closed = false;
/* --------------4------------------- */
// Selects readRecord_SplitRecord() (true) or readRecord_SplitField() (false) in readRecord().
public boolean splitRecord = false;
/* --------------5------------------- */
// Header records still to be consumed; endRecord() counts it down and does not drop empty records meanwhile.
private int header = 0;
/*--------------6--------------------- */
/**
 * Creates a reader over a byte stream, decoding it through a {@code UnicodeReader} with the given encoding.
 *
 * @param is source stream
 * @param encoding character encoding handed to {@code UnicodeReader}
 * @param fieldDelimiter field delimiter, must not be null
 * @param recordDelimiter record delimiter, must not be null
 * @param needSkipEmptyRecord whether empty records are dropped
 * @throws IOException if the first buffer cannot be read
 */
public TOSDelimitedReader(java.io.InputStream is, String encoding, String fieldDelimiter, String recordDelimiter,
        boolean needSkipEmptyRecord) throws IOException {
    init(new UnicodeReader(is, encoding), fieldDelimiter, recordDelimiter, needSkipEmptyRecord);
}
public TOSDelimitedReader(String fileName, String encoding, String fieldDelimiter, String recordDelimiter,
boolean needSkipEmptyRecord) throws IOException {
if (fileName == null || fieldDelimiter == null || recordDelimiter == null) {
throw new IllegalArgumentException("Parameter can't be null.");
}
UnicodeReader inputStreamReader = new UnicodeReader(new FileInputStream(fileName), encoding);
init(inputStreamReader, fieldDelimiter, recordDelimiter, needSkipEmptyRecord);
}
/**
 * Creates a reader over already-decoded character input.
 *
 * @param reader character source, must not be null
 * @param fieldDelimiter field delimiter, must not be null
 * @param recordDelimiter record delimiter, must not be null
 * @param needSkipEmptyRecord whether empty records are dropped
 * @throws IllegalArgumentException if reader, fieldDelimiter or recordDelimiter is null
 * @throws IOException if the first buffer cannot be read
 */
public TOSDelimitedReader(Reader reader, String fieldDelimiter, String recordDelimiter, boolean needSkipEmptyRecord)
        throws IOException {
    boolean allPresent = reader != null && fieldDelimiter != null && recordDelimiter != null;
    if (!allPresent) {
        throw new IllegalArgumentException("Parameter can't be null.");
    }
    init(reader, fieldDelimiter, recordDelimiter, needSkipEmptyRecord);
}
/**
 * Wires the reader to its input: wraps {@code reader} in a {@link BufferedReader}, creates the column buffer and then
 * the stream buffer (whose constructor already reads the first chunk of input), and finally marks the instance as
 * initialized.
 *
 * @param reader character source, must not be null
 * @param fieldDelimiter field delimiter, must not be null
 * @param recordDelimiter record delimiter, must not be null
 * @param needSkipEmptyRecord whether empty records are dropped
 * @throws IllegalArgumentException if reader, fieldDelimiter or recordDelimiter is null
 * @throws IOException if the first buffer cannot be read
 */
public void init(Reader reader, String fieldDelimiter, String recordDelimiter, boolean needSkipEmptyRecord)
        throws IOException {
    boolean missingArgument = reader == null || fieldDelimiter == null || recordDelimiter == null;
    if (missingArgument) {
        throw new IllegalArgumentException("Parameter can't be null.");
    }
    // order matters: the StreamBuffer constructor reads from this.inputStream
    this.inputStream = new BufferedReader(reader);
    this.columnBuffer = new ColumnBuffer4Joiner();
    this.streamBuffer = new StreamBuffer(fieldDelimiter, recordDelimiter);
    this.skipEmptyRecord = needSkipEmptyRecord;
    this.initialized = true;
}
/**
 * Reads the next record, using the record-first or field-first strategy selected by {@code splitRecord}.
 *
 * @return true if a record was read, false when no further record is available
 * @throws IOException if the reader is closed or reading fails
 */
public boolean readRecord() throws IOException {
    return splitRecord ? readRecord_SplitRecord() : readRecord_SplitField();
}
/**
 * Reads the next record, testing for the field delimiter before the record delimiter at every position.
 *
 * @return true if a record was completed, false if the input is exhausted
 * @throws IOException if the reader is closed, reading fails, or a column exceeds
 * StaticSettings.MAX_CHARS_IN_ONE_COLUMN while autoReallocateForHuge is false
 */
private boolean readRecord_SplitField() throws IOException {
checkClosed();
// "in" records whether the loop body ran at least once, i.e. whether there was any input left for this record
boolean in = false;
hasReadRecord = false;
columnsCount = 0;
streamBuffer.columnStart = streamBuffer.currentPosition;
while (streamBuffer.hasMoreData() && !hasReadRecord) {
if (debug) {
System.out.println(streamBuffer);
}
in = true;
// refill before testing delimiters so a delimiter is never split across two buffer loads
if (streamBuffer.needJoinReadNextBuffer()) {
joinAndRead();
}
if (streamBuffer.isStartFieldDelimited()) {
endColumn();
streamBuffer.skipFieldDelimiter();
} else if (streamBuffer.isStartRecordDelimited()) {
endColumn();
endRecord();
// remember that the input stopped right after a record delimiter; handled after the loop
if (!streamBuffer.hasMoreData()) {
streamBuffer.fileEndWithRecordDelimiter = true;
}
} else {
// ordinary character: it belongs to the current column
streamBuffer.currentPosition++;
// enforce the maximum number of chars one column may hold (only when auto-reallocation is off)
if (!autoReallocateForHuge
&& streamBuffer.currentPosition - streamBuffer.columnStart + columnBuffer.position > StaticSettings.MAX_CHARS_IN_ONE_COLUMN) {
close();
throw new IOException("The maximum chars of one column is " + StaticSettings.MAX_CHARS_IN_ONE_COLUMN
+ ". There is over this limit.");
}
}
}
// The loop ended without completing a record: flush whatever is pending at the end of the input.
if (!hasReadRecord) {
if (!StaticSettings.ignoreFileEndWithOneRecordDelimiter && streamBuffer.fileEndWithRecordDelimiter) {// e.g. "aaa;bbb#111;222#"
// the input ended with a record delimiter and that is not ignored: run one more column/record pass
streamBuffer.fileEndWithRecordDelimiter = false;
endColumn();
endRecord();
} else if (in) {// e.g. "aaa;bbb#111;222": the last record has no trailing record delimiter
endColumn();
endRecord();
}
}
return hasReadRecord;
}
/**
 * Reads the next record, testing for the record delimiter before the field delimiter at every position.
 *
 * @return true if a record was completed, false if the input is exhausted
 * @throws IOException if the reader is closed, reading fails, or a column exceeds
 * StaticSettings.MAX_CHARS_IN_ONE_COLUMN while autoReallocateForHuge is false
 */
private boolean readRecord_SplitRecord() throws IOException {
checkClosed();
// "in" records whether the loop body ran at least once, i.e. whether there was any input left for this record
boolean in = false;
hasReadRecord = false;
columnsCount = 0;
streamBuffer.columnStart = streamBuffer.currentPosition;
while (streamBuffer.hasMoreData() && !hasReadRecord) {
if (debug) {
System.out.println(streamBuffer);
}
in = true;
// refill before testing delimiters so a delimiter is never split across two buffer loads
if (streamBuffer.needJoinReadNextBuffer()) {
joinAndRead();
}
if (streamBuffer.isStartRecordDelimited()) {
endColumn();
endRecord();
// remember that the input stopped right after a record delimiter; handled after the loop
if (!streamBuffer.hasMoreData()) {
streamBuffer.fileEndWithRecordDelimiter = true;
}
} else if (streamBuffer.isStartFieldDelimited()) {
endColumn();
streamBuffer.skipFieldDelimiter();
} else {
// ordinary character: it belongs to the current column
streamBuffer.currentPosition++;
// enforce the maximum number of chars one column may hold (only when auto-reallocation is off)
if (!autoReallocateForHuge
&& streamBuffer.currentPosition - streamBuffer.columnStart + columnBuffer.position > StaticSettings.MAX_CHARS_IN_ONE_COLUMN) {
close();
throw new IOException("The maximum chars of one column is " + StaticSettings.MAX_CHARS_IN_ONE_COLUMN
+ ". There is over this limit.");
}
}
}
// The loop ended without completing a record: flush whatever is pending at the end of the input.
if (!hasReadRecord) {
if (!StaticSettings.ignoreFileEndWithOneRecordDelimiter && streamBuffer.fileEndWithRecordDelimiter) {// e.g. "aaa;bbb#111;222#"
// the input ended with a record delimiter and that is not ignored: run one more column/record pass
streamBuffer.fileEndWithRecordDelimiter = false;
endColumn();
endRecord();
} else if (in) {// e.g. "aaa;bbb#111;222": the last record has no trailing record delimiter
endColumn();
endRecord();
}
}
return hasReadRecord;
}
/**
 * Stashes the part of the current column that is still in the stream buffer into the column buffer, then refills the
 * stream buffer. The order is essential: refilling moves the data the column buffer needs to copy.
 *
 * @throws IOException if reading the next chunk fails
 */
private void joinAndRead() throws IOException {
    this.columnBuffer.saveCharInJoiner();
    this.streamBuffer.joinReadNextBuffer();
}
/**
 * Finishes the current record: steps over the record delimiter and either accepts the record or, when empty records
 * are skipped and no header records are pending, silently drops an empty one.
 *
 * @throws IOException declared for symmetry with the other parsing steps
 */
private void endRecord() throws IOException {
    streamBuffer.skipRecordDelimiter();
    if (header > 0) {
        // header records are always counted, even when empty
        header--;
    } else if (skipEmptyRecord) {
        boolean noColumns = columnsCount == 0;
        if (noColumns || (columnsCount == 1 && values[0].equals(""))) {
            // drop the empty record and start the next one where we are now; resetting columnsCount is required
            columnsCount = 0;
            streamBuffer.columnStart = streamBuffer.currentPosition;
            return;
        }
    }
    // accepting the record ends the caller's parsing loop
    currentRecord++;
    hasReadRecord = true;
}
/**
 * Finishes the current column and appends its text to {@code values}, doubling the array when it is full.
 *
 * @exception IOException if {@code autoReallocateForHuge} is off and the record already holds
 * {@code StaticSettings.MAX_COLUMNS_IN_ONE_RECORD} columns; the reader is closed first.
 */
private void endColumn() throws IOException {
    final String columnText;
    if (columnBuffer.position != 0) {
        // part of the column was stashed during a buffer refill: append the rest and take the joined text
        columnBuffer.saveCharInJoiner();
        columnText = new String(columnBuffer.buffer, 0, columnBuffer.position);
    } else {
        // the whole column is still inside the stream buffer
        int length = streamBuffer.currentPosition - streamBuffer.columnStart;
        columnText = new String(streamBuffer.buffer, streamBuffer.columnStart, length);
    }
    columnBuffer.position = 0;

    // enforce the maximum number of columns one record may hold (only when auto-reallocation is off)
    if (!autoReallocateForHuge && columnsCount >= StaticSettings.MAX_COLUMNS_IN_ONE_RECORD) {
        close();
        throw new IOException("The maximum column number of one record is " + StaticSettings.MAX_COLUMNS_IN_ONE_RECORD
                + ". There is over this limit.");
    }

    if (columnsCount == values.length) {
        String[] grown = new String[values.length * 2];
        System.arraycopy(values, 0, grown, 0, values.length);
        values = grown;
    }
    values[columnsCount++] = columnText;
}
/**
 * Releases the underlying reader; does nothing if the instance is already closed.
 * <p>
 * This method can run before initialization has finished (the {@code StreamBuffer} constructor calls
 * {@code close()} when its first read fails, and {@code finalize()} may run on a partially constructed instance), so
 * every field is null-checked and the reader is closed whenever it exists rather than only once
 * {@code initialized} is set.
 *
 * @param closing true to also drop the internal character buffers
 */
private void close(boolean closing) {
    if (closed) {
        return;
    }
    if (closing) {
        if (streamBuffer != null) {
            streamBuffer.buffer = null;
        }
        if (columnBuffer != null) {
            columnBuffer.buffer = null;
        }
    }
    try {
        if (inputStream != null) {
            inputStream.close();
        }
    } catch (Exception ignored) {
        // closing is best effort; there is nothing useful to do with a failure here
    }
    inputStream = null;
    closed = true;
}
/**
 * Guard used before reading a record or a column value.
 *
 * @throws IOException if this reader has already been closed
 */
private void checkClosed() throws IOException {
    if (!closed) {
        return;
    }
    throw new IOException("This instance of the DelimitedFileReader class has already been closed.");
}
/**
 * Returns one column of the current record.
 *
 * @param columnIndex 0-based column index
 * @return the column value, or an empty string if the index is outside the current record
 * @throws IOException if this reader has already been closed
 */
public String get(int columnIndex) throws IOException {
    checkClosed();
    if (columnIndex < 0 || columnIndex >= columnsCount) {
        return "";
    }
    return values[columnIndex];
}
/**
 * Rebuilds the current record as one string by joining its columns with the field delimiter.
 *
 * @return the joined record; the single column itself when the record has exactly one column
 */
public String getRowRecord() {
    // fast path for the common "record separator only" use (fieldDelimiter = ""), where there is a single field
    if (columnsCount == 1) {
        return values[0];
    }
    StringBuilder joined = new StringBuilder();
    if (columnsCount > 0) {
        joined.append(values[0]);
    }
    for (int column = 1; column < columnsCount; column++) {
        joined.append(streamBuffer.fieldDelimiter);
        joined.append(values[column]);
    }
    return joined.toString();
}
/**
 * @return the number of records read so far (counted from 0 again after {@code skipHeaders})
 */
public long getProcessedRecordCount() {
    return this.currentRecord;
}
/**
 * Reads every remaining record and reports how many records are available once {@code footer} trailing records are
 * left out. Intended to be called after {@code skipHeaders()}; the input is consumed by this call.
 *
 * @param footer number of trailing records to exclude from the count
 * @return records read so far minus {@code footer}
 * @throws IOException if the reader is closed or reading fails
 */
public long getAvailableRowCount(int footer) throws IOException {
    checkClosed();
    while (readRecord()) {
        // keep reading until the input is exhausted
    }
    return currentRecord - footer;
}
/**
 * @return the number of columns in the current record
 * @throws IOException never thrown by this implementation; kept in the signature for callers
 */
public int getAvailableColumnsCount() throws IOException {
    return this.columnsCount;
}
/**
 * Skips the given number of records at the beginning of the input and then resets the record counter to 0.
 *
 * @param header number of records to skip; values of 0 or less skip nothing
 * @throws IOException if the reader is closed or reading fails
 */
public void skipHeaders(int header) throws IOException {
    // stored first so that endRecord() treats the records read below as header records
    this.header = header;
    checkClosed();
    if (header > 0) {
        int remaining = header;
        while (remaining-- > 0) {
            readRecord();
        }
        currentRecord = 0;
    }
}
/**
 * Switches the size limits on or off. With {@code false}, parsing closes the reader and throws an
 * {@code IOException} as soon as a column grows beyond {@code StaticSettings.MAX_CHARS_IN_ONE_COLUMN} characters or
 * a record beyond {@code StaticSettings.MAX_COLUMNS_IN_ONE_RECORD} columns. With {@code true} (the default) the
 * buffers simply grow.
 *
 * @param autoReallocateForHuge true to allow unlimited growth
 */
public void setAutoReallocateForHuge(boolean autoReallocateForHuge) {
    this.autoReallocateForHuge = autoReallocateForHuge;
}
/**
 * Chooses which delimiter is tested first while parsing.
 * <p>
 * Example input {@code 111;222;#aaa;bbb;#} with row separator {@code ;#} and field separator {@code ;}:
 * <ul>
 * <li>record separator first ({@code true}): 2 records with 2 columns each</li>
 * <li>field separator first ({@code false}, the default): 2 records with 3 columns each</li>
 * </ul>
 *
 * @param splitRecord true to test the record delimiter before the field delimiter
 */
public void setSplitRecord(boolean splitRecord) {
    this.splitRecord = splitRecord;
}
/**
 * Closes the reader and releases all related resources, including the internal buffers. Calling it again has no
 * effect.
 */
public void close() {
    if (closed) {
        return;
    }
    close(true);
    closed = true;
}
/**
 * Safety net for readers that were never closed explicitly: closes the underlying input but leaves the buffers
 * untouched.
 */
@Override
protected void finalize() {
    this.close(false);
}
/**
 * Holding area for a column that spans two loads of the stream buffer: the characters from the end of the previous
 * load are kept here and the beginning of the next load is appended to them.
 *
 * @author xtan
 */
private class ColumnBuffer4Joiner {

    char[] buffer;

    /** Number of valid characters in {@code buffer}. */
    int position;

    public ColumnBuffer4Joiner() {
        buffer = new char[StaticSettings.INITIAL_COLUMN_BUFFER_SIZE];
        position = 0;
    }

    /**
     * Appends the pending part of the current column (from {@code columnStart} to {@code currentPosition} of the
     * stream buffer) to this buffer, growing it when needed.
     */
    public void saveCharInJoiner() {
        int freeSpace = buffer.length - position;
        int pendingChars = 0;
        if (streamBuffer.count > 0) {// required: count is not positive when nothing has been read
            pendingChars = streamBuffer.currentPosition - streamBuffer.columnStart;
        }
        // grow by at least the current capacity so that repeated appends stay cheap
        if (freeSpace < pendingChars) {
            char[] grown = new char[buffer.length + Math.max(pendingChars, buffer.length)];
            System.arraycopy(buffer, 0, grown, 0, position);
            buffer = grown;
        }
        // copy the pending characters from the stream buffer behind what is already stored
        System.arraycopy(streamBuffer.buffer, streamBuffer.columnStart, buffer, position, pendingChars);
        position += pendingChars;
    }
}
/**
* <b> a buffer with funtion: join the last data and read the next buffer, for supporting the multi-separator</b>
* Notice: "count, currentPosition, currentPosition, columnStart", they are import here
*
* @author xtan
*/
private class StreamBuffer {
public boolean needJoinReadNextBuffer() {
// 5-2=3, when currentPostion=3; or 5-0=5, when currentPosition=5, and file not end, so read next buffer
return currentPosition >= lastIndexToRead && !streamEndMeet;
/* notice: here is >=, not > */
}
// when count=5, currentPosition=4 ===> hasMoreData()=true, it mean buffer[4] still need process
public boolean hasMoreData() {
return !streamEndMeet || currentPosition < count;
}
private void moveTailToHead() {
count = count - currentPosition;
for (int i = 0; i < count; i++) {
buffer[i] = buffer[currentPosition + i];
}
lastIndexToRead = count - maxLimit;
currentPosition = 0;
columnStart = 0;
}
public void joinReadNextBuffer() throws IOException {
moveTailToHead();
int readCount = 0;
int maxReadLength = buffer.length - count;
try {
readCount = inputStream.read(buffer, count, maxReadLength);
} catch (IOException ex) {
close();
throw ex;
}
if (debug) {
System.out.println("##maxReadLength=" + maxReadLength + " newReadCount=" + readCount);
System.out.println(streamBuffer);
}
setStreamEnd(readCount, maxReadLength);
if (readCount > -1) {
count += readCount;
lastIndexToRead = count - maxLimit;
}
}
private void setStreamEnd(int readCount, int maxReadLength) throws IOException {
/* @see bug:http://talendforge.org/bugs/view.php?id=4554 */
//https://jira.talendforge.org/browse/TDI-44745
if (readCount < maxReadLength) {
if (readCount == -1) {
streamEndMeet = true;
} else {
if(inputStream.markSupported()) {
inputStream.mark(1);
if(inputStream.read() == -1) {
streamEndMeet = true;
}
inputStream.reset();
}
}
}
}
/* --------------1------------------- */
private char[] buffer;
private char[] fieldDelimiter;
private char[] recordDelimiter;
// aaa;bbb#111;222#
private boolean fileEndWithRecordDelimiter = false;
/* --------------2------------------- */
private boolean lineFeedAll = false;
// if LineMode.LINEFEED_ALL, if "\r\n", variableLineFeed = 2, if "\r" or "\n", variableLineFeed = 1
// only used to adjust the skipRecordDelimiter()
private int variableLineFeed = 0;
/* --------------3------------------- */
private int maxLimit;// maxLimit = Math.max(fieldDelimiter.length, recordDelimiter.length);
private int currentPosition;
// for join read
private int lastIndexToRead;// lastIndexToRead = count - maxLimit, special: 5 = 5 - 0
private int count;
private int columnStart;
/* --------------4------------------- */
// end of the file
private boolean streamEndMeet = false;
public StreamBuffer(String fieldDelimiterPara, String recordDelimiterPara) throws IOException {
buffer = new char[StaticSettings.MAX_BUFFER_SIZE];
if (recordDelimiterPara.equals("\n")) {
if (StaticSettings.lineMode == LineMode.LINEFEED_ALL) {
// notice: here we set it "\r\n", neither "\n" nor "\r", becase there want max length
recordDelimiter = "\r\n".toCharArray();
lineFeedAll = true;
} else if (StaticSettings.lineMode == LineMode.LINEFEED_JRE) {
String lineSeparator = (String) java.security.AccessController
.doPrivileged(new sun.security.action.GetPropertyAction("line.separator"));
recordDelimiter = lineSeparator.toCharArray();
} else {
recordDelimiter = recordDelimiterPara.toCharArray();
}
} else {
recordDelimiter = recordDelimiterPara.toCharArray();
}
fieldDelimiter = fieldDelimiterPara.toCharArray();
maxLimit = Math.max(fieldDelimiter.length, recordDelimiter.length);
// read one buffer datas first when initial
try {
count = inputStream.read(buffer, 0, buffer.length);
} catch (IOException ex) {
close();
throw ex;
}
currentPosition = 0;
columnStart = 0;
lastIndexToRead = count - maxLimit;
setStreamEnd(count, buffer.length);
}
public boolean isStartFieldDelimited() {
int maxLengthCanCheck = count - currentPosition;
if (fieldDelimiter.length != 0 && fieldDelimiter.length <= maxLengthCanCheck) {// test here is a must
for (int i = 0; i < fieldDelimiter.length; i++) {
if (buffer[currentPosition + i] != fieldDelimiter[i]) {
return false;
}
}
} else {
return false;
}
return true;
}
public boolean isStartRecordDelimited() {
int maxLengthCanCheck = count - currentPosition;
if (lineFeedAll) {// maxLengthCanCheck > 0 is must
if (maxLengthCanCheck > 0) {
if (buffer[currentPosition] == '\n') {
variableLineFeed = 1;
return true;
} else if (buffer[currentPosition] == '\r') {
variableLineFeed = 1;
if (buffer[currentPosition + 1] == '\n' && maxLengthCanCheck > 1) {
variableLineFeed = 2;
}
return true;
}
}
return false;
} else {
if (recordDelimiter.length != 0 && recordDelimiter.length <= maxLengthCanCheck) {// test here is a
// must
for (int i = 0; i < recordDelimiter.length; i++) {
if (buffer[currentPosition + i] != recordDelimiter[i]) {
return false;
}
}
} else {
return false;
}
}
return true;
}
/**
 * Moves the read position past one field delimiter and marks the new position as the start of the
 * next column.
 */
public void skipFieldDelimiter() {
    final int nextColumnStart = currentPosition + fieldDelimiter.length;
    currentPosition = nextColumnStart;
    columnStart = nextColumnStart; // a new column begins right after the delimiter
}
/**
 * Moves the read position past one record delimiter. With {@code lineFeedAll} the width is the one
 * stored in {@code variableLineFeed}; otherwise it is the length of {@code recordDelimiter}.
 */
public void skipRecordDelimiter() {
    final int delimiterWidth = lineFeedAll ? variableLineFeed : recordDelimiter.length;
    currentPosition += delimiterWidth;
}
/**
 * Builds a multi-line dump of the buffer state, one {@code name=value} pair per line.
 * <p>
 * The character at {@code currentPosition} is only printed when that index lies inside the buffer
 * array; otherwise a placeholder is printed so the dump itself never throws.
 *
 * @return the state dump
 */
@Override
public String toString() {
    // StringBuilder: the builder never leaves this method, so no synchronization is needed.
    StringBuilder sb = new StringBuilder();
    sb.append("count=").append(count).append("\n");
    sb.append("maxLimit=").append(maxLimit).append("\n");
    sb.append("lastIndexToRead=").append(lastIndexToRead).append("\n");
    sb.append("currentPosition=").append(currentPosition).append("\n");
    sb.append("columnStart=").append(columnStart).append("\n");
    sb.append("streamEndMeet=").append(streamEndMeet).append("\n");
    sb.append("overMaxLimit()=").append(needJoinReadNextBuffer()).append("\n");
    sb.append("hasMoreData()=").append(hasMoreData()).append("\n");
    sb.append("char[]=").append(buffer).append("\n");
    sb.append("char[").append(currentPosition).append("]=");
    if (currentPosition >= 0 && currentPosition < buffer.length) {
        sb.append(buffer[currentPosition]);
    } else {
        // The position can sit at or beyond the end of the array once the buffer is consumed.
        sb.append("<out of range>");
    }
    sb.append("\n");
    sb.append("fieldDelimiterLength=").append(fieldDelimiter.length).append("\n");
    sb.append("recordDelimiterLength=").append(recordDelimiter.length).append("\n");
    return sb.toString();
}
}
/**
 * How a record delimiter given as "\n" is interpreted.
 */
public enum LineMode {

    /**
     * Use the line separator of the running JRE instead of "\n": e.g. "\r\n" on Windows, "\n" on
     * Linux, and (per the original note) "\r" on Mac.
     */
    LINEFEED_JRE,

    /** Treat every one of "\r", "\n" and "\r\n" as a record delimiter. */
    LINEFEED_ALL,

    /** Take "\n" literally: only "\n" is a record delimiter. */
    LINEFEED_NORMAL
}
/**
 * Which separator is applied first when splitting.
 * NOTE(review): the code that consumes this enum is outside this excerpt; meaning inferred from the
 * constant names only - confirm against the callers.
 */
private enum SplitWay {

    /** Split on the record separator first. */
    FIRSTSPLIT_RECORDSEPARATOR,

    /** Split on the field separator first. */
    FIRSTSPLIT_FIELDSEPARATOR
}
/**
 * Fixed settings for the delimited reader.
 * <p>
 * NOTE(review): the previous comment said these values "can be changed in unit test", but every field
 * is {@code static final}, so a different value requires editing this class.
 */
private static class StaticSettings {

    // NOTE(review): the five size limits below are not referenced in the visible part of the file;
    // their meaning is taken from their names - confirm against the code that uses them.
    public static final int MAX_BUFFER_SIZE = 1024;

    public static final int INITIAL_COLUMN_COUNT = 10;

    public static final int INITIAL_COLUMN_BUFFER_SIZE = 50;

    public static final int MAX_CHARS_IN_ONE_COLUMN = 100000;

    public static final int MAX_COLUMNS_IN_ONE_RECORD = 100000;

    // How a record delimiter given as "\n" is processed.
    public static final LineMode lineMode = LineMode.LINEFEED_ALL;

    // 1. Decides the case where the file ends with one record delimiter, e.g. "aaa;bbb#111;222#":
    //    is that 2 records or 3?
    // 2. Only the very last record delimiter is concerned; for "aaa;bbb#111;222####" the setting
    //    still applies to the last delimiter only.
    public static final boolean ignoreFileEndWithOneRecordDelimiter = true;

    // public static final boolean autoReallocateForHuge = true;

    /** Constants holder; not meant to be instantiated. */
    private StaticSettings() {
    }
}
/**
 * Manual smoke test: reads a hard-coded CSV file and prints every record followed by each of its
 * fields to standard output.
 *
 * @param args unused
 * @throws IOException if the reader fails while opening or reading the file
 */
public static void main(String[] args) throws IOException {
    TOSDelimitedReader fid = new TOSDelimitedReader("D:\\talend\\talendFID\\in.csv", "ISO-8859-15", "", "\n", false);
    for (int rowNum = 0; fid.readRecord(); rowNum++) {
        System.out.println("*********Row" + rowNum + "***********");
        System.out.println("------Row------\n" + fid.getRowRecord());
        final int fieldNum = fid.getAvailableColumnsCount();
        int k = 0;
        while (k < fieldNum) {
            System.out.println("------" + k + "------\n" + fid.get(k));
            k++;
        }
        System.out.println("\n\n");
    }
}
}

View File

@@ -1,74 +0,0 @@
package org.talend.fileprocess;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PushbackInputStream;
import java.io.Reader;
/**
 * A {@link Reader} that inspects the first bytes of a stream for a Unicode byte order mark (BOM).
 * When a BOM is found it is skipped and the matching encoding is used; otherwise all inspected bytes
 * are pushed back and the supplied default encoding is used.
 */
public class UnicodeReader extends Reader {

    /** Length in bytes of the longest BOM that is recognised (UTF-32). */
    private static final int BOM_SIZE = 4;

    private final InputStreamReader reader;

    /**
     * Construct UnicodeReader
     *
     * @param in Input stream.
     * @param defaultEncoding Default encoding to be used if BOM is not found,
     * or <code>null</code> to use system default encoding.
     * @throws IOException If an I/O error occurs.
     */
    public UnicodeReader(InputStream in, String defaultEncoding) throws IOException {
        byte[] bom = new byte[BOM_SIZE];
        PushbackInputStream pushbackStream = new PushbackInputStream(in, BOM_SIZE);
        // A single read() may legally deliver fewer bytes than requested, so keep reading until the
        // look-ahead window is full or the stream ends; otherwise a BOM split across reads is missed.
        int n = readFully(pushbackStream, bom);

        String encoding;
        int bomLength;
        // Every test is guarded by the number of bytes really read, so zero-filled unused slots of
        // the array can never be mistaken for the 0x00 bytes of a UTF-32 BOM.
        if (n >= 3 && (bom[0] == (byte) 0xEF) && (bom[1] == (byte) 0xBB) && (bom[2] == (byte) 0xBF)) {
            encoding = "UTF-8";
            bomLength = 3;
        } else if (n >= 2 && (bom[0] == (byte) 0xFE) && (bom[1] == (byte) 0xFF)) {
            encoding = "UTF-16BE";
            bomLength = 2;
        } else if (n >= 4 && (bom[0] == (byte) 0xFF) && (bom[1] == (byte) 0xFE) && (bom[2] == (byte) 0x00)
                && (bom[3] == (byte) 0x00)) {
            // Must be tested before UTF-16LE because both BOMs start with FF FE.
            encoding = "UTF-32LE";
            bomLength = 4;
        } else if (n >= 2 && (bom[0] == (byte) 0xFF) && (bom[1] == (byte) 0xFE)) {
            encoding = "UTF-16LE";
            bomLength = 2;
        } else if (n >= 4 && (bom[0] == (byte) 0x00) && (bom[1] == (byte) 0x00) && (bom[2] == (byte) 0xFE)
                && (bom[3] == (byte) 0xFF)) {
            encoding = "UTF-32BE";
            bomLength = 4;
        } else {
            encoding = defaultEncoding;
            bomLength = 0;
        }

        // Give back everything that was read beyond the BOM (or everything when there is no BOM).
        if (n > bomLength) {
            pushbackStream.unread(bom, bomLength, n - bomLength);
        }

        // Use given encoding.
        if (encoding == null) {
            reader = new InputStreamReader(pushbackStream);
        } else {
            reader = new InputStreamReader(pushbackStream, encoding);
        }
    }

    /**
     * Reads from {@code in} until {@code target} is full or the stream ends.
     *
     * @return the number of bytes stored in {@code target}; 0 for an empty stream
     */
    private static int readFully(InputStream in, byte[] target) throws IOException {
        int total = 0;
        while (total < target.length) {
            int read = in.read(target, total, target.length - total);
            if (read < 0) {
                break;
            }
            total += read;
        }
        return total;
    }

    /**
     * @return the encoding name reported by the underlying {@link InputStreamReader}
     */
    public String getEncoding() {
        return reader.getEncoding();
    }

    @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        return reader.read(cbuf, off, len);
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }
}

View File

@@ -1,412 +0,0 @@
// ============================================================================
//
// Talend Community Edition
//
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
//
// ============================================================================
package org.talend.fileprocess.delimited;
import java.io.BufferedReader;
import java.io.IOException;
import java.text.NumberFormat;
/**
* A stream-based parser for delimited text data read from a reader. This
* class only works when the record delimiter is a String whose length is > 1
* and the field delimiter is a single character.<br/>
*
* @author gke
*
*/
final class ComplexDelimitedDataReader1 extends DelimitedDataReader {
private StreamBuffer streamBuffer = new StreamBuffer();
private char fieldDelimiter;
private char[] recordDelimiter;
public ComplexDelimitedDataReader1(BufferedReader inputStream, char delimiter,
String recordDelimiter, boolean skipEmptyRecords)
throws IOException {
super(inputStream, skipEmptyRecords);
this.fieldDelimiter = delimiter;
this.recordDelimiter = recordDelimiter.toCharArray();
streamBuffer.recordDelimiterLength = this.recordDelimiter.length;
try {
streamBuffer.count = inputStream.read(streamBuffer.buffer, 0,
streamBuffer.buffer.length);
} catch (IOException ex) {
close();
throw ex;
}
streamBuffer.currentPosition = 0;
streamBuffer.columnStart = 0;
streamBuffer.lastIndexToRead = streamBuffer.count
- streamBuffer.recordDelimiterLength;
streamBuffer.streamEndMeet = (streamBuffer.count < streamBuffer.buffer.length);
}
/**
* Reads the next record from the stream buffer.
* <p>
* Resets the column counter, then scans the buffer character by character: a character equal to
* fieldDelimiter ends the current column, a match of the multi-character recordDelimiter ends the
* column and the record. When fewer characters than a record delimiter remain in the buffer
* (noDataRecordDelimter), checkDataLength() is called to load more data. Once the scan loop is over,
* any characters still left in the buffer are split on the field delimiter only and emitted as a
* final record.
*
* @return the value of hasReadRecord after parsing (presumably set by endRecord() in the superclass)
* @throws IOException if reading fails or a safety limit (column length or column count) is exceeded
*/
@Override
public boolean readRecord() throws IOException {
checkClosed();
columnsCount = 0;
hasReadRecord = false;
// check to see if we've already found the end of data
if (streamBuffer.hasMoreData()) {
// loop over the data stream until the end of data is found
// or the end of the record is found
// char currentLetter = '\0';
while (streamBuffer.hasMoreData() && !hasReadRecord) {
if (streamBuffer.noDataRecordDelimter()) {
// not enough characters left to test for a record delimiter: load more data
checkDataLength();
} else {
// grab the current letter as a char
char currentLetter = streamBuffer.buffer[streamBuffer.currentPosition];
if (currentLetter == fieldDelimiter) {
// we encountered a column with no data, so
// just send the end column
endColumn();
streamBuffer.currentPosition++;
} else if (isStartRecordDelimited()) {
// a record delimiter with no column started: emit the record unless it is empty
// and empty records are skipped
if (columnsCount > 0 || !skipEmptyRecord) {
endColumn();
endRecord();
}
streamBuffer.skipRecordDelimiter();
} else {
// since the letter wasn't a special letter, this
// will be the first letter of our current column
startedRow = true;
streamBuffer.columnStart = streamBuffer.currentPosition;
boolean firstLoop = true;
// consume the column; endColumn() resets startedRow, which ends this loop
while (streamBuffer.hasMoreData() && startedRow) {
if (!firstLoop
&& streamBuffer.noDataRecordDelimter()) {
checkDataLength();
} else {
if (!firstLoop) {
// grab the current letter as a char
currentLetter = streamBuffer.buffer[streamBuffer.currentPosition];
if (currentLetter == fieldDelimiter) {
endColumn();
streamBuffer.currentPosition++;
} else if (isStartRecordDelimited()) {
endColumn();
endRecord();
streamBuffer.skipRecordDelimiter();
}
} else {
// the first character was already classified as column data above
firstLoop = false;
}
if (startedRow) {
// still inside the column: advance and enforce the column length limit
streamBuffer.currentPosition++;
if (safetySwitch
&& streamBuffer.currentPosition
- streamBuffer.columnStart
+ columnBuffer.position > StaticSettings.MAX_SIZE_FOR_SAFTY) {
close();
throw new IOException(
"Maximum column length of 100,000 exceeded in column "
+ NumberFormat
.getIntegerInstance()
.format(
columnsCount)
+ " in record "
+ NumberFormat
.getIntegerInstance()
.format(
currentRecord)
+ ". Set the SafetySwitch property to false"
+ " if you're expecting column lengths greater than 100,000 characters to"
+ " avoid this error.");
}
}
} // end else
}
}
} // end else
}
}
// check to see if we hit the end of the file
// without processing the current record
if (!hasReadRecord && !streamBuffer.noData()) {
// the remaining characters are too few to hold a record delimiter, so only the
// field delimiter is honoured here
while (!streamBuffer.noData()) {
if (streamBuffer.buffer[streamBuffer.currentPosition] == fieldDelimiter) {
endColumn();
streamBuffer.currentPosition++;
startedRow = true;
streamBuffer.columnStart = streamBuffer.currentPosition;
} else {
if (!startedRow) {
startedRow = true;
streamBuffer.columnStart = streamBuffer.currentPosition;
}
streamBuffer.currentPosition++;
}
}
if (startedRow) {
endColumn();
}
endRecord();
}
return hasReadRecord;
}
/**
 * Compares the characters starting at the current buffer position with the record delimiter.
 * No bounds check is done here; the caller must make sure enough characters are loaded.
 *
 * @return {@code true} if every character of the record delimiter matches
 */
private boolean isStartRecordDelimited() {
    final int delimiterLength = recordDelimiter.length;
    int offset = 0;
    while (offset < delimiterLength) {
        if (streamBuffer.buffer[streamBuffer.currentPosition + offset] != recordDelimiter[offset]) {
            return false;
        }
        offset++;
    }
    return true;
}
/**
 * Refills the stream buffer: saves the pending column text, moves the unread tail of the buffer to
 * its head and reads more characters into the free space behind it.
 *
 * @exception IOException
 * Thrown if an error occurs while reading data from the
 * source stream.
 */
private void checkDataLength() throws IOException {
    updateCurrentValue();
    streamBuffer.moveTailToHead();
    int charsRead;
    try {
        charsRead = inputStream.read(streamBuffer.buffer, streamBuffer.count,
                streamBuffer.buffer.length - streamBuffer.count);
    } catch (IOException ex) {
        close();
        throw ex;
    }
    // read() returns -1 at end of stream. Adding that value to count (as was done before) dropped
    // the last buffered character whenever the stream ended exactly on a buffer boundary.
    if (charsRead > 0) {
        streamBuffer.count += charsRead;
    }
    streamBuffer.lastIndexToRead = streamBuffer.count
            - streamBuffer.recordDelimiterLength;
    if (charsRead < 0 || streamBuffer.count < streamBuffer.buffer.length) {
        streamBuffer.streamEndMeet = true;
    }
}
/**
 * Finishes the current column: builds its text, stores it in {@code values} (growing the array when
 * it is full) and increments the column counter.
 *
 * @exception IOException
 * Thrown if a very rare extreme exception occurs during
 * parsing, normally resulting from improper data format.
 */
private void endColumn() throws IOException {
    // Must be evaluated before startedRow is reset below.
    final String columnText;
    if (!startedRow) {
        columnText = "";
    } else if (columnBuffer.position != 0) {
        // Part of the column already sits in the column buffer: append the rest and take it from there.
        updateCurrentValue();
        columnText = new String(columnBuffer.buffer, 0, columnBuffer.position);
    } else {
        // The whole column is still inside the stream buffer.
        final int length = streamBuffer.currentPosition - streamBuffer.columnStart;
        columnText = length > 0
                ? new String(streamBuffer.buffer, streamBuffer.columnStart, length)
                : "";
    }
    columnBuffer.position = 0;
    startedRow = false;

    if (safetySwitch && columnsCount >= StaticSettings.MAX_SIZE_FOR_SAFTY) {
        close();
        throw new IOException(
                "Maximum column count of 100,000 exceeded in record "
                        + NumberFormat.getIntegerInstance().format(
                                currentRecord)
                        + ". Set the SafetySwitch property to false"
                        + " if you're expecting more than 100,000 columns per record to"
                        + " avoid this error.");
    }

    // Double the holder array when there is no room for another column.
    if (values.length == columnsCount) {
        final String[] enlarged = new String[values.length * 2];
        System.arraycopy(values, 0, enlarged, 0, values.length);
        values = enlarged;
    }
    values[columnsCount] = columnText;
    columnsCount++;
}
/**
 * Appends the characters of the current column that are still in the stream buffer
 * (from {@code columnStart} up to {@code currentPosition}) to the column buffer, enlarging the
 * column buffer first when it is too small. Does nothing when no column is in progress or no
 * characters are pending.
 */
private void updateCurrentValue() {
    final int pending = streamBuffer.currentPosition - streamBuffer.columnStart;
    if (!startedRow || pending <= 0) {
        return;
    }
    final int free = columnBuffer.buffer.length - columnBuffer.position;
    if (free < pending) {
        // Grow by the larger of the pending length and the current capacity.
        final int grownLength = columnBuffer.buffer.length
                + Math.max(pending, columnBuffer.buffer.length);
        final char[] enlarged = new char[grownLength];
        System.arraycopy(columnBuffer.buffer, 0, enlarged, 0, columnBuffer.position);
        columnBuffer.buffer = enlarged;
    }
    System.arraycopy(streamBuffer.buffer, streamBuffer.columnStart,
            columnBuffer.buffer, columnBuffer.position, pending);
    columnBuffer.position += pending;
}
/**
 * Closes the reader once; later calls do nothing.
 *
 * @param closing when {@code true} the stream buffer and column buffer arrays are released as well
 */
@Override
protected void close(boolean closing) {
    if (!closed) {
        if (closing) {
            streamBuffer.buffer = null;
            columnBuffer.buffer = null;
        }
        try {
            if (initialized) {
                inputStream.close();
            }
        } catch (Exception ignored) {
            // Deliberate best effort: a failure while closing the source must not reach the caller.
        }
        inputStream = null;
        closed = true;
    }
}
/**
 * Buffer that holds the chunk of stream data currently being parsed.
 *
 * @author gke
 */
private class StreamBuffer {

    char[] buffer = new char[StaticSettings.MAX_BUFFER_SIZE];

    int currentPosition = 0;

    // Last index in the buffer that may be processed; equals count - recordDelimiterLength.
    // Needed because for every character the following characters must be readable too, in order
    // to decide whether currentPosition is the start of a record delimiter.
    int lastIndexToRead;

    int recordDelimiterLength;

    // Amount of usable data loaded into the buffer; not always as large as buffer.length.
    int count = 0;

    // Position in the buffer where the current column started, or where data was last moved out
    // to the column buffer.
    int columnStart = 0;

    // Set when a load delivered fewer characters than the buffer can hold, which is taken to mean
    // that the stream has no more data.
    boolean streamEndMeet = false;

    public StreamBuffer() {
        // all fields are set by their initializers
    }

    /** Moves the unread tail of the buffer to index 0 and resets the positions accordingly. */
    public void moveTailToHead() {
        final int remaining = count - currentPosition;
        count = remaining;
        if (remaining > 0) {
            System.arraycopy(buffer, currentPosition, buffer, 0, remaining);
        }
        lastIndexToRead = remaining - recordDelimiterLength;
        currentPosition = 0;
        columnStart = 0;
    }

    /** Advances the position by the length of one record delimiter. */
    public void skipRecordDelimiter() {
        currentPosition = currentPosition + recordDelimiterLength;
    }

    /** @return true when too few characters are left to hold a complete record delimiter */
    public boolean noDataRecordDelimter() {
        return lastIndexToRead < currentPosition;
    }

    /** @return true while the position is within the processable range or the stream has not ended */
    public boolean hasMoreData() {
        return currentPosition <= lastIndexToRead || !streamEndMeet;
    }

    /** @return true when every loaded character has been consumed */
    public boolean noData() {
        return count <= currentPosition;
    }
}
}

View File

@@ -1,440 +0,0 @@
// ============================================================================
//
// Talend Community Edition
//
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
//
// ============================================================================
package org.talend.fileprocess.delimited;
import java.io.BufferedReader;
import java.io.IOException;
import java.text.NumberFormat;
/**
* A stream-based parser for delimited text data read from a reader. This
* class only works when the field delimiter is a String whose length is > 1
* and the record delimiter is a single character.<br/>
*
* @author gke
*
*/
final class ComplexDelimitedDataReader2 extends DelimitedDataReader {
private boolean useCustomRecordDelimiter = false;
private StreamBuffer streamBuffer = new StreamBuffer();
private char lastLetter = '\0';
private char[] fieldDelimiter;
private char recordDelimiter;
public ComplexDelimitedDataReader2(BufferedReader inputStream, String delimiter,
char recordDelimiter, boolean skipEmptyRecords) throws IOException {
super(inputStream, skipEmptyRecords);
this.fieldDelimiter = delimiter.toCharArray();
this.recordDelimiter = recordDelimiter;
if (recordDelimiter != '\n' && recordDelimiter != '\r') {
useCustomRecordDelimiter = true;
}
streamBuffer.fieldDelimiterLength = this.fieldDelimiter.length;
try {
streamBuffer.count = inputStream.read(streamBuffer.buffer, 0,
streamBuffer.buffer.length);
} catch (IOException ex) {
close();
throw ex;
}
streamBuffer.currentPosition = 0;
streamBuffer.columnStart = 0;
streamBuffer.lastIndexToRead = streamBuffer.count
- streamBuffer.fieldDelimiterLength;
streamBuffer.streamEndMeet = (streamBuffer.count < streamBuffer.buffer.length);
}
/**
* Reads the next record from the stream buffer.
* <p>
* Resets the column counter, then scans the buffer: a match of the multi-character fieldDelimiter
* ends the current column; the record ends on the custom record delimiter character or, when no
* custom delimiter is used, on '\r' or '\n'. lastLetter remembers the previously consumed character
* so that a '\n' directly following a '\r' does not produce a second record. When fewer characters
* than a field delimiter remain (noDataFieldDelimiter), checkDataLength() loads more data. After
* the main loop the characters still left in the buffer are handled with record delimiters only.
*
* @return true if a record was read, false otherwise
* @throws IOException if reading fails or a safety limit (column length or column count) is exceeded
*/
@Override
public boolean readRecord() throws IOException {
checkClosed();
columnsCount = 0;
hasReadRecord = false;
if (streamBuffer.hasMoreData()) {
while (streamBuffer.hasMoreData() && !hasReadRecord) {
if (streamBuffer.noDataFieldDelimiter()) {
// not enough characters left to test for a field delimiter: load more data
checkDataLength();
} else {
char currentLetter = streamBuffer.buffer[streamBuffer.currentPosition];
if (isStartFieldDelimited()) {
// encountered a column with no data
endColumn();
streamBuffer.skipFieldDelimiter();
lastLetter = streamBuffer.buffer[streamBuffer.currentPosition - 1];
} else if (useCustomRecordDelimiter
&& currentLetter == recordDelimiter) {
// custom record delimiter with no column started: emit the record unless it is
// empty and empty records are skipped
if (!skipEmptyRecord || columnsCount > 0) {
endColumn();
endRecord();
}
streamBuffer.currentPosition++;
lastLetter = streamBuffer.buffer[streamBuffer.currentPosition - 1];
} else if (!useCustomRecordDelimiter
&& (currentLetter == '\r' || currentLetter == '\n')) {
// line-break mode: a '\n' whose predecessor was '\r' belongs to the same line break
// and therefore does not end another record
if ((!skipEmptyRecord || columnsCount > 0)
&& ((currentLetter == '\r' || lastLetter != '\r'))) {
endColumn();
endRecord();
}
streamBuffer.currentPosition++;
lastLetter = streamBuffer.buffer[streamBuffer.currentPosition - 1];
} else {
// since the letter wasn't a special letter, this
// will be the first letter of our current column
startedRow = true;
streamBuffer.columnStart = streamBuffer.currentPosition;
boolean firstLoop = true;
// consume the column; endColumn() resets startedRow, which ends this loop
while (streamBuffer.hasMoreData() && startedRow) {
if (!firstLoop
&& streamBuffer.noDataFieldDelimiter()) {
checkDataLength();
} else {
if (!firstLoop) {
currentLetter = streamBuffer.buffer[streamBuffer.currentPosition];
if (isStartFieldDelimited()) {
endColumn();
streamBuffer.skipFieldDelimiter();
lastLetter = streamBuffer.buffer[streamBuffer.currentPosition - 1];
} else if ((!useCustomRecordDelimiter && (currentLetter == '\r' || currentLetter == '\n'))
|| (useCustomRecordDelimiter && currentLetter == recordDelimiter)) {
endColumn();
endRecord();
streamBuffer.currentPosition++;
lastLetter = streamBuffer.buffer[streamBuffer.currentPosition - 1];
}
} else {
// the first character was already classified as column data above
firstLoop = false;
}
if (startedRow) {
// still inside the column: advance and enforce the column length limit
lastLetter = currentLetter;
streamBuffer.currentPosition++;
if (safetySwitch
&& streamBuffer.currentPosition
- streamBuffer.columnStart
+ columnBuffer.position > StaticSettings.MAX_SIZE_FOR_SAFTY) {
close();
throw new IOException(
"Maximum column length of 100,000 exceeded in column "
+ NumberFormat
.getIntegerInstance()
.format(
columnsCount)
+ " in record "
+ NumberFormat
.getIntegerInstance()
.format(
currentRecord)
+ ". Set the SafetySwitch property to false"
+ " if you're expecting column lengths greater than 100,000 characters to"
+ " avoid this error.");
}
}
} // end else
}
}
} // end else
}
}
// no complete record was found by the main loop: handle the characters that are still left,
// which are too few to hold a field delimiter, so only record delimiters are recognised here
if (!hasReadRecord) {
while (!streamBuffer.noData() && !hasReadRecord) {
char currentLetter = streamBuffer.buffer[streamBuffer.currentPosition];
if (startedRow) {
// a column is in progress: it ends at the next record delimiter
if ((!useCustomRecordDelimiter && (currentLetter == '\r' || currentLetter == '\n'))
|| (useCustomRecordDelimiter && currentLetter == recordDelimiter)) {
endColumn();
endRecord();
lastLetter = currentLetter;
streamBuffer.currentPosition++;
return true;
} else {
lastLetter = currentLetter;
streamBuffer.currentPosition++;
}
} else {
if (useCustomRecordDelimiter
&& currentLetter == recordDelimiter) {
if (!skipEmptyRecord || columnsCount > 0) {
endColumn();
endRecord();
lastLetter = currentLetter;
streamBuffer.currentPosition++;
return true;
} else {
lastLetter = currentLetter;
streamBuffer.currentPosition++;
}
} else if (!useCustomRecordDelimiter
&& (currentLetter == '\r' || currentLetter == '\n')) {
// same "\r\n" rule as in the main loop
if ((!skipEmptyRecord || columnsCount > 0)
&& ((currentLetter == '\r' || lastLetter != '\r'))) {
endColumn();
endRecord();
lastLetter = currentLetter;
streamBuffer.currentPosition++;
return true;
} else {
lastLetter = currentLetter;
streamBuffer.currentPosition++;
}
} else {
// ordinary character: it starts a new column
startedRow = true;
streamBuffer.columnStart = streamBuffer.currentPosition;
lastLetter = currentLetter;
streamBuffer.currentPosition++;
}
}
}
// end of data reached: flush a record that has columns or a column in progress
if (streamBuffer.noData() && !hasReadRecord) {
if (columnsCount > 0 || startedRow) {
endColumn();
endRecord();
return true;
}
}
}
return hasReadRecord;
}
/**
 * Compares the characters starting at the current buffer position with the field delimiter.
 * No bounds check is done here; the caller must make sure enough characters are loaded.
 *
 * @return {@code true} if every character of the field delimiter matches
 */
private boolean isStartFieldDelimited() {
    final int delimiterLength = fieldDelimiter.length;
    int offset = 0;
    while (offset < delimiterLength) {
        if (streamBuffer.buffer[streamBuffer.currentPosition + offset] != fieldDelimiter[offset]) {
            return false;
        }
        offset++;
    }
    return true;
}
/**
 * Refills the stream buffer: saves the pending column text, moves the unread tail of the buffer to
 * its head and reads more characters into the free space behind it.
 *
 * @exception IOException
 * Thrown if an error occurs while reading data from the
 * source stream.
 */
private void checkDataLength() throws IOException {
    updateCurrentValue();
    streamBuffer.moveTailToHead();
    int charsRead;
    try {
        charsRead = inputStream.read(streamBuffer.buffer, streamBuffer.count,
                streamBuffer.buffer.length - streamBuffer.count);
    } catch (IOException ex) {
        close();
        throw ex;
    }
    // read() returns -1 at end of stream. Adding that value to count (as was done before) dropped
    // the last buffered character whenever the stream ended exactly on a buffer boundary.
    if (charsRead > 0) {
        streamBuffer.count += charsRead;
    }
    streamBuffer.lastIndexToRead = streamBuffer.count
            - streamBuffer.fieldDelimiterLength;
    if (charsRead < 0 || streamBuffer.count < streamBuffer.buffer.length) {
        streamBuffer.streamEndMeet = true;
    }
}
/**
 * Finishes the current column: builds its text, stores it in {@code values} (growing the array when
 * it is full) and increments the column counter.
 *
 * @exception IOException
 * Thrown if a very rare extreme exception occurs during
 * parsing, normally resulting from improper data format.
 */
private void endColumn() throws IOException {
    // Must be evaluated before startedRow is reset below.
    final String columnText;
    if (!startedRow) {
        columnText = "";
    } else if (columnBuffer.position != 0) {
        // Part of the column already sits in the column buffer: append the rest and take it from there.
        updateCurrentValue();
        columnText = new String(columnBuffer.buffer, 0, columnBuffer.position);
    } else {
        // The whole column is still inside the stream buffer.
        final int length = streamBuffer.currentPosition - streamBuffer.columnStart;
        columnText = length > 0
                ? new String(streamBuffer.buffer, streamBuffer.columnStart, length)
                : "";
    }
    columnBuffer.position = 0;
    startedRow = false;

    if (safetySwitch && columnsCount >= StaticSettings.MAX_SIZE_FOR_SAFTY) {
        close();
        throw new IOException(
                "Maximum column count of 100,000 exceeded in record "
                        + NumberFormat.getIntegerInstance().format(
                                currentRecord)
                        + ". Set the SafetySwitch property to false"
                        + " if you're expecting more than 100,000 columns per record to"
                        + " avoid this error.");
    }

    // Double the holder array when there is no room for another column.
    if (values.length == columnsCount) {
        final String[] enlarged = new String[values.length * 2];
        System.arraycopy(values, 0, enlarged, 0, values.length);
        values = enlarged;
    }
    values[columnsCount] = columnText;
    columnsCount++;
}
/**
 * Appends the characters of the current column that are still in the stream buffer
 * (from {@code columnStart} up to {@code currentPosition}) to the column buffer, enlarging the
 * column buffer first when it is too small. Does nothing when no column is in progress or no
 * characters are pending.
 */
private void updateCurrentValue() {
    final int pending = streamBuffer.currentPosition - streamBuffer.columnStart;
    if (!startedRow || pending <= 0) {
        return;
    }
    final int free = columnBuffer.buffer.length - columnBuffer.position;
    if (free < pending) {
        // Grow by the larger of the pending length and the current capacity.
        final int grownLength = columnBuffer.buffer.length
                + Math.max(pending, columnBuffer.buffer.length);
        final char[] enlarged = new char[grownLength];
        System.arraycopy(columnBuffer.buffer, 0, enlarged, 0, columnBuffer.position);
        columnBuffer.buffer = enlarged;
    }
    System.arraycopy(streamBuffer.buffer, streamBuffer.columnStart,
            columnBuffer.buffer, columnBuffer.position, pending);
    columnBuffer.position += pending;
}
/**
 * Closes the reader once; later calls do nothing.
 *
 * @param closing when {@code true} the stream buffer and column buffer arrays are released as well
 */
@Override
protected void close(boolean closing) {
    if (closed) {
        return;
    }
    if (closing) {
        streamBuffer.buffer = null;
        columnBuffer.buffer = null;
    }
    if (initialized) {
        try {
            inputStream.close();
        } catch (Exception e) {
            // just eat the exception
        }
    }
    inputStream = null;
    closed = true;
}
/**
 * Buffer that holds the chunk of stream data currently being parsed.
 *
 * @author gke
 */
private class StreamBuffer {

    char[] buffer = new char[StaticSettings.MAX_BUFFER_SIZE];

    int currentPosition = 0;

    // Last index in the buffer that may be processed; equals count - fieldDelimiterLength.
    // Needed because for every character the following characters must be readable too, in order
    // to decide whether currentPosition is the start of a field delimiter.
    int lastIndexToRead;

    int fieldDelimiterLength;

    // Amount of usable data loaded into the buffer; not always as large as buffer.length.
    int count = 0;

    // Position in the buffer where the current column started, or where data was last moved out
    // to the column buffer.
    int columnStart = 0;

    // Set when a load delivered fewer characters than the buffer can hold, which is taken to mean
    // that the stream has no more data.
    boolean streamEndMeet = false;

    public StreamBuffer() {
        // all fields are set by their initializers
    }

    /** Moves the unread tail of the buffer to index 0 and resets the positions accordingly. */
    public void moveTailToHead() {
        final int remaining = count - currentPosition;
        count = remaining;
        if (remaining > 0) {
            System.arraycopy(buffer, currentPosition, buffer, 0, remaining);
        }
        lastIndexToRead = remaining - fieldDelimiterLength;
        currentPosition = 0;
        columnStart = 0;
    }

    /** Advances the position by the length of one field delimiter. */
    public void skipFieldDelimiter() {
        currentPosition = currentPosition + fieldDelimiterLength;
    }

    /** @return true when every loaded character has been consumed */
    public boolean noData() {
        return count <= currentPosition;
    }

    /** @return true while the position is within the processable range or the stream has not ended */
    public boolean hasMoreData() {
        return currentPosition <= lastIndexToRead || !streamEndMeet;
    }

    /** @return true when too few characters are left to hold a complete field delimiter */
    public boolean noDataFieldDelimiter() {
        return lastIndexToRead < currentPosition;
    }
}
}

View File

@@ -1,401 +0,0 @@
// ============================================================================
//
// Talend Community Edition
//
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
//
// ============================================================================
package org.talend.fileprocess.delimited;
import java.io.BufferedReader;
import java.io.IOException;
import java.text.NumberFormat;
/**
* A stream-based parser for delimited text data read from a reader. This
* class works when the record delimiter and the field delimiter are both
* Strings whose length is > 1, and the record delimiter's length equals the
* field delimiter's.<br/>
*
* @author gke
*
*/
final class ComplexDelimitedDataReader3 extends DelimitedDataReader {
private StreamBuffer streamBuffer = new StreamBuffer();
private char[] fieldDelimiter;
private char[] recordDelimiter;
public ComplexDelimitedDataReader3(BufferedReader inputStream, String delimiter,
String recordDelimiter, boolean skipEmptyRecords)
throws IOException {
super(inputStream, skipEmptyRecords);
this.fieldDelimiter = delimiter.toCharArray();
this.recordDelimiter = recordDelimiter.toCharArray();
streamBuffer.delimiterLength = this.fieldDelimiter.length;
try {
streamBuffer.count = inputStream.read(streamBuffer.buffer, 0,
streamBuffer.buffer.length);
} catch (IOException ex) {
close();
throw ex;
}
streamBuffer.currentPosition = 0;
streamBuffer.columnStart = 0;
streamBuffer.lastIndexToRead = streamBuffer.count
- streamBuffer.delimiterLength;
streamBuffer.streamEndMeet = (streamBuffer.count < streamBuffer.buffer.length);
}
/**
* Reads the next record from the stream buffer.
* <p>
* Resets the column counter, then scans the buffer: a match of fieldDelimiter ends the current
* column, a match of recordDelimiter ends the column and the record. When fewer characters than a
* delimiter remain (noDataForDelimter), checkDataLength() loads more data. After the main loop,
* characters still left in the buffer become the last column of a final record; with no characters
* left, a record is still reported if columns were already collected.
*
* @return true if a record was read, false otherwise
* @throws IOException if reading fails or a safety limit (column length or column count) is exceeded
*/
@Override
public boolean readRecord() throws IOException {
checkClosed();
columnsCount = 0;
hasReadRecord = false;
if (streamBuffer.hasMoreData()) {
while (streamBuffer.hasMoreData() && !hasReadRecord) {
if (streamBuffer.noDataForDelimter()) {
// not enough characters left to test for a delimiter: load more data
checkDataLength();
} else {
if (isStartFieldDelimited()) {
// encountered a column with no data
endColumn();
streamBuffer.skipDelimiter();
} else if (isStartRecordDelimited()) {
// a record delimiter with no column started: emit the record unless it is empty
// and empty records are skipped
if (columnsCount > 0 || !skipEmptyRecord) {
endColumn();
endRecord();
}
streamBuffer.skipDelimiter();
} else {
// ordinary character: it is the first character of a new column
startedRow = true;
streamBuffer.columnStart = streamBuffer.currentPosition;
boolean firstLoop = true;
// consume the column; endColumn() resets startedRow, which ends this loop
while (streamBuffer.hasMoreData() && startedRow) {
if (!firstLoop && streamBuffer.noDataForDelimter()) {
checkDataLength();
} else {
if (!firstLoop) {
if (isStartFieldDelimited()) {
endColumn();
streamBuffer.skipDelimiter();
} else if (isStartRecordDelimited()) {
endColumn();
endRecord();
streamBuffer.skipDelimiter();
}
} else {
// the first character was already classified as column data above
firstLoop = false;
}
if (startedRow) {
// still inside the column: advance and enforce the column length limit
streamBuffer.currentPosition++;
if (safetySwitch
&& streamBuffer.currentPosition
- streamBuffer.columnStart
+ columnBuffer.position > StaticSettings.MAX_SIZE_FOR_SAFTY) {
close();
throw new IOException(
"Maximum column length of 100,000 exceeded in column "
+ NumberFormat
.getIntegerInstance()
.format(
columnsCount)
+ " in record "
+ NumberFormat
.getIntegerInstance()
.format(
currentRecord)
+ ". Set the SafetySwitch property to false"
+ " if you're expecting column lengths greater than 100,000 characters to"
+ " avoid this error.");
}
}
} // end else
}
}
} // end else
}
}
// no complete record was found by the main loop
if (!hasReadRecord) {
if (!streamBuffer.noData()) {
// the leftover characters are too few to hold a delimiter, so all of them up to count
// are taken as (the rest of) the last column
if (!startedRow) {
startedRow = true;
streamBuffer.columnStart = streamBuffer.currentPosition;
streamBuffer.currentPosition = streamBuffer.count;
endColumn();
endRecord();
return true;
}
if (startedRow) {
streamBuffer.currentPosition = streamBuffer.count;
endColumn();
endRecord();
return true;
}
} else {
// nothing left in the buffer: report a record only if columns were collected
if (columnsCount > 0) {
endRecord();
return true;
} else {
return false;
}
}
}
return hasReadRecord;
}
/**
 * Compares the characters starting at the current buffer position with the field delimiter.
 * No bounds check is done here; the caller must make sure enough characters are loaded.
 *
 * @return {@code true} if every character of the field delimiter matches
 */
private boolean isStartFieldDelimited() {
    final int delimiterLength = fieldDelimiter.length;
    int offset = 0;
    while (offset < delimiterLength) {
        if (streamBuffer.buffer[streamBuffer.currentPosition + offset] != fieldDelimiter[offset]) {
            return false;
        }
        offset++;
    }
    return true;
}
/**
 * Compares the characters starting at the current buffer position with the record delimiter.
 * No bounds check is done here; the caller must make sure enough characters are loaded.
 *
 * @return {@code true} if every character of the record delimiter matches
 */
private boolean isStartRecordDelimited() {
    final int delimiterLength = recordDelimiter.length;
    int offset = 0;
    while (offset < delimiterLength) {
        if (streamBuffer.buffer[streamBuffer.currentPosition + offset] != recordDelimiter[offset]) {
            return false;
        }
        offset++;
    }
    return true;
}
/**
 * Refills the stream buffer: saves the pending column text, moves the unread tail of the buffer to
 * its head and reads more characters into the free space behind it.
 *
 * @exception IOException
 * Thrown if an error occurs while reading data from the
 * source stream.
 */
private void checkDataLength() throws IOException {
    updateCurrentValue();
    streamBuffer.moveTailToHead();
    int charsRead;
    try {
        charsRead = inputStream.read(streamBuffer.buffer, streamBuffer.count,
                streamBuffer.buffer.length - streamBuffer.count);
    } catch (IOException ex) {
        close();
        throw ex;
    }
    // read() returns -1 at end of stream. Adding that value to count (as was done before) dropped
    // the last buffered character whenever the stream ended exactly on a buffer boundary.
    if (charsRead > 0) {
        streamBuffer.count += charsRead;
    }
    streamBuffer.lastIndexToRead = streamBuffer.count
            - streamBuffer.delimiterLength;
    if (charsRead < 0 || streamBuffer.count < streamBuffer.buffer.length) {
        streamBuffer.streamEndMeet = true;
    }
}
/**
 * Closes the column currently being read: builds its text, resets the
 * column state and appends the text to {@code values}, growing that array
 * when it is full.
 *
 * @exception IOException
 *                Thrown when the safety switch is on and the record already
 *                holds the maximum number of columns.
 */
private void endColumn() throws IOException {
    final String columnText;
    if (!startedRow) {
        // no column was open: an empty value is recorded
        columnText = "";
    } else if (columnBuffer.position != 0) {
        // part of the column was already moved to the column buffer;
        // append the remainder and take the whole column from there
        updateCurrentValue();
        columnText = new String(columnBuffer.buffer, 0, columnBuffer.position);
    } else if (streamBuffer.columnStart < streamBuffer.currentPosition) {
        // the whole column still sits in the stream buffer
        final int length = streamBuffer.currentPosition - streamBuffer.columnStart;
        columnText = new String(streamBuffer.buffer, streamBuffer.columnStart, length);
    } else {
        columnText = "";
    }

    columnBuffer.position = 0;
    startedRow = false;

    if (safetySwitch && columnsCount >= StaticSettings.MAX_SIZE_FOR_SAFTY) {
        close();
        throw new IOException(
                "Maximum column count of 100,000 exceeded in record "
                        + NumberFormat.getIntegerInstance().format(currentRecord)
                        + ". Set the SafetySwitch property to false"
                        + " if you're expecting more than 100,000 columns per record to"
                        + " avoid this error.");
    }

    // double the holder array when there is no free slot for this column
    if (columnsCount == values.length) {
        final String[] grown = new String[values.length * 2];
        System.arraycopy(values, 0, grown, 0, values.length);
        values = grown;
    }
    values[columnsCount] = columnText;
    columnsCount++;
}
/**
 * Appends the characters of the open column that are still in the stream
 * buffer (from {@code columnStart} up to {@code currentPosition}) to the
 * column buffer, enlarging the column buffer when needed. Does nothing
 * when no column is open or no characters are pending.
 */
private void updateCurrentValue() {
    if (!startedRow || streamBuffer.columnStart >= streamBuffer.currentPosition) {
        return;
    }
    final int pending = streamBuffer.currentPosition - streamBuffer.columnStart;
    final int free = columnBuffer.buffer.length - columnBuffer.position;
    if (free < pending) {
        // grow by at least the current capacity, or by the pending size if larger
        final int capacity = columnBuffer.buffer.length
                + Math.max(pending, columnBuffer.buffer.length);
        final char[] grown = new char[capacity];
        System.arraycopy(columnBuffer.buffer, 0, grown, 0, columnBuffer.position);
        columnBuffer.buffer = grown;
    }
    System.arraycopy(streamBuffer.buffer, streamBuffer.columnStart,
            columnBuffer.buffer, columnBuffer.position, pending);
    columnBuffer.position += pending;
}
/**
 * Releases the resources of this reader. Calling it again after the reader
 * has been closed has no effect.
 *
 * @param closing
 *            when {@code true} the stream and column buffers are released
 *            as well
 */
@Override
protected void close(boolean closing) {
    if (closed) {
        return;
    }
    if (closing) {
        streamBuffer.buffer = null;
        columnBuffer.buffer = null;
    }
    if (initialized) {
        try {
            inputStream.close();
        } catch (Exception ignored) {
            // closing is best effort; a failure here is deliberately ignored
        }
    }
    inputStream = null;
    closed = true;
}
/**
 * Buffer holding the characters loaded from the stream while they are
 * being parsed.
 *
 * @author gke
 */
private class StreamBuffer {
    /** Characters loaded from the stream. */
    char[] buffer;

    /** Index of the character currently being examined. */
    int currentPosition;

    /**
     * Last index at which a delimiter comparison may start; equal to
     * {@code count - delimiterLength}, because testing for a delimiter at a
     * position reads the following characters too.
     */
    int lastIndexToRead;

    /** Length of the field and record delimiters. */
    int delimiterLength;

    /** Number of usable characters in {@code buffer}; may be less than its length. */
    int count;

    /**
     * Position where the current column started, or where the buffer stood
     * the last time data was moved out to the column buffer.
     */
    int columnStart;

    /**
     * Set once a load leaves the buffer partly empty, which is taken to mean
     * that the stream has no more data.
     */
    boolean streamEndMeet = false;

    public StreamBuffer() {
        buffer = new char[StaticSettings.MAX_BUFFER_SIZE];
        currentPosition = 0;
        count = 0;
        columnStart = 0;
    }

    /** Moves the unprocessed characters to the start of the buffer and resets the positions. */
    public void moveTailToHead() {
        count -= currentPosition;
        if (count > 0) {
            System.arraycopy(buffer, currentPosition, buffer, 0, count);
        }
        lastIndexToRead = count - delimiterLength;
        currentPosition = 0;
        columnStart = 0;
    }

    /** Advances the current position past one delimiter. */
    public void skipDelimiter() {
        currentPosition += delimiterLength;
    }

    /** @return {@code true} when too few characters remain to compare a delimiter */
    public boolean noDataForDelimter() {
        return currentPosition > lastIndexToRead;
    }

    /** @return {@code true} while a delimiter comparison is possible or the stream may hold more data */
    public boolean hasMoreData() {
        return currentPosition <= lastIndexToRead || !streamEndMeet;
    }

    /** @return {@code true} when every loaded character has been consumed */
    public boolean noData() {
        return currentPosition >= count;
    }
}
}

View File

@@ -1,441 +0,0 @@
// ============================================================================
//
// Talend Community Edition
//
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
//
// ============================================================================
package org.talend.fileprocess.delimited;
import java.io.BufferedReader;
import java.io.IOException;
import java.text.NumberFormat;
/**
* A stream-based parser for delimited text data read from a reader. This
* class is used when the field delimiter and the record delimiter are both
* strings longer than one character and the field delimiter is longer than
* the record delimiter.<br/>
*
* @author gke
*
*/
final class ComplexDelimitedDataReader4 extends DelimitedDataReader {
private StreamBuffer streamBuffer = new StreamBuffer();
private char[] fieldDelimiter;
private char[] recordDelimiter;
public ComplexDelimitedDataReader4(BufferedReader inputStream, String delimiter,
String recordDelimiter, boolean skipEmptyRecords)
throws IOException {
super(inputStream, skipEmptyRecords);
this.fieldDelimiter = delimiter.toCharArray();
this.recordDelimiter = recordDelimiter.toCharArray();
streamBuffer.fieldDelimiterLength = this.fieldDelimiter.length;
streamBuffer.recordDelimiterLength = this.recordDelimiter.length;
try {
streamBuffer.count = inputStream.read(streamBuffer.buffer, 0,
streamBuffer.buffer.length);
} catch (IOException ex) {
close();
throw ex;
}
streamBuffer.currentPosition = 0;
streamBuffer.columnStart = 0;
streamBuffer.lastIndexToRead = streamBuffer.count
- streamBuffer.fieldDelimiterLength;
streamBuffer.streamEndMeet = (streamBuffer.count < streamBuffer.buffer.length);
}
/**
 * Reads the next record into {@code values} / {@code columnsCount}.
 * <p>
 * The main loop scans the buffer while a full field delimiter can still be
 * compared at the current position. Once the stream is exhausted, the
 * remaining tail (shorter than a field delimiter) is searched for record
 * delimiters only.
 *
 * @return {@code true} when a record was read
 * @throws IOException
 *             if reading from the source fails, or if the safety switch is
 *             on and a column exceeds the maximum length
 */
@Override
public boolean readRecord() throws IOException {
    checkClosed();
    columnsCount = 0;
    hasReadRecord = false;

    while (streamBuffer.hasMoreData() && !hasReadRecord) {
        if (streamBuffer.noDataFieldDelimter()) {
            checkDataLength();
        } else if (isStartFieldDelimited()) {
            // a field delimiter right at the column start: column with no data
            endColumn();
            streamBuffer.skipFieldDelimiter();
        } else if (isStartRecordDelimited()) {
            if (columnsCount > 0 || !skipEmptyRecord) {
                endColumn();
                endRecord();
            }
            streamBuffer.skipRecordDelimiter();
        } else {
            // not a delimiter, so this is the first character of a column
            startedRow = true;
            streamBuffer.columnStart = streamBuffer.currentPosition;
            boolean firstLoop = true;
            while (streamBuffer.hasMoreData() && startedRow) {
                if (!firstLoop && streamBuffer.noDataFieldDelimter()) {
                    checkDataLength();
                } else {
                    if (!firstLoop) {
                        if (isStartFieldDelimited()) {
                            endColumn();
                            streamBuffer.skipFieldDelimiter();
                        } else if (isStartRecordDelimited()) {
                            endColumn();
                            endRecord();
                            streamBuffer.skipRecordDelimiter();
                        }
                    } else {
                        // the first character was already tested by the outer loop
                        firstLoop = false;
                    }
                    if (startedRow) {
                        streamBuffer.currentPosition++;
                        if (safetySwitch
                                && streamBuffer.currentPosition
                                        - streamBuffer.columnStart
                                        + columnBuffer.position > StaticSettings.MAX_SIZE_FOR_SAFTY) {
                            close();
                            throw new IOException(
                                    "Maximum column length of 100,000 exceeded in column "
                                            + NumberFormat.getIntegerInstance().format(columnsCount)
                                            + " in record "
                                            + NumberFormat.getIntegerInstance().format(currentRecord)
                                            + ". Set the SafetySwitch property to false"
                                            + " if you're expecting column lengths greater than 100,000 characters to"
                                            + " avoid this error.");
                        }
                    }
                }
            }
        }
    }

    if (!hasReadRecord) {
        // Only the tail is left; it is too short to hold a field delimiter,
        // so only record delimiters are searched for.
        final String recordSeparator = new String(recordDelimiter);
        while (!streamBuffer.noData()) {
            int index = streamBuffer.getTail().indexOf(recordSeparator);
            if (index == -1) {
                // no record delimiter left: the remainder ends the last column
                if (!startedRow) {
                    streamBuffer.columnStart = streamBuffer.currentPosition;
                    startedRow = true;
                }
                streamBuffer.currentPosition = streamBuffer.count;
                endColumn();
                endRecord();
                streamBuffer.skipRecordDelimiter();
                return true;
            } else if (index == 0) {
                // A column that is still open (startedRow) counts as pending
                // data even when no column has been completed yet; without
                // this check its content was lost, e.g. for "abc\r\n" with a
                // three-character field delimiter.
                if (columnsCount > 0 || startedRow) {
                    endColumn();
                    endRecord();
                    streamBuffer.skipRecordDelimiter();
                    return true;
                }
                if (skipEmptyRecord) {
                    streamBuffer.currentPosition += recordDelimiter.length;
                    continue;
                }
                endRecord();
                streamBuffer.skipRecordDelimiter();
                return true;
            } else {
                if (!startedRow) {
                    streamBuffer.columnStart = streamBuffer.currentPosition;
                    startedRow = true;
                }
                streamBuffer.currentPosition += index;
                endColumn();
                endRecord();
                streamBuffer.skipRecordDelimiter();
                return true;
            }
        }
        // data ended right after a field delimiter: close the last column
        if (columnsCount > 0 && streamBuffer.noData()) {
            endColumn();
            endRecord();
        }
    }
    return hasReadRecord;
}
/**
 * Tests whether the field delimiter begins at the current position of the
 * stream buffer.
 *
 * @return {@code true} when every character of the field delimiter matches
 *         the buffer content starting at {@code streamBuffer.currentPosition}
 */
private boolean isStartFieldDelimited() {
    final char[] data = streamBuffer.buffer;
    final int base = streamBuffer.currentPosition;
    int matched = 0;
    // advance while the buffer keeps agreeing with the delimiter
    while (matched < fieldDelimiter.length && data[base + matched] == fieldDelimiter[matched]) {
        matched++;
    }
    return matched == fieldDelimiter.length;
}
/**
 * Tests whether the record delimiter begins at the current position of the
 * stream buffer.
 *
 * @return {@code true} when every character of the record delimiter matches
 *         the buffer content starting at {@code streamBuffer.currentPosition}
 */
private boolean isStartRecordDelimited() {
    final char[] data = streamBuffer.buffer;
    final int base = streamBuffer.currentPosition;
    int matched = 0;
    // advance while the buffer keeps agreeing with the delimiter
    while (matched < recordDelimiter.length && data[base + matched] == recordDelimiter[matched]) {
        matched++;
    }
    return matched == recordDelimiter.length;
}
/**
 * Refills the stream buffer: saves the characters of the column currently
 * being read into the column buffer, moves the unprocessed tail of the
 * stream buffer to its head and reads more characters behind it.
 *
 * @exception IOException
 *                Thrown if an error occurs while reading data from the
 *                source stream. The reader is closed before rethrowing.
 */
private void checkDataLength() throws IOException {
    // keep what has been scanned of the current column before it is overwritten
    updateCurrentValue();
    streamBuffer.moveTailToHead();
    int charsRead;
    try {
        charsRead = inputStream.read(streamBuffer.buffer, streamBuffer.count,
                streamBuffer.buffer.length - streamBuffer.count);
    } catch (IOException ex) {
        close();
        throw ex;
    }
    // read() signals end of stream with -1; adding that value to count would
    // silently drop the last buffered character, so only add real reads
    if (charsRead > 0) {
        streamBuffer.count += charsRead;
    }
    streamBuffer.lastIndexToRead = streamBuffer.count
            - streamBuffer.fieldDelimiterLength;
    // a buffer that could not be filled completely is treated as end of stream
    if (streamBuffer.count < streamBuffer.buffer.length) {
        streamBuffer.streamEndMeet = true;
    }
}
/**
 * Closes the column currently being read: builds its text, resets the
 * column state and appends the text to {@code values}, growing that array
 * when it is full.
 *
 * @exception IOException
 *                Thrown when the safety switch is on and the record already
 *                holds the maximum number of columns.
 */
private void endColumn() throws IOException {
    final String columnText;
    if (!startedRow) {
        // no column was open: an empty value is recorded
        columnText = "";
    } else if (columnBuffer.position != 0) {
        // part of the column was already moved to the column buffer;
        // append the remainder and take the whole column from there
        updateCurrentValue();
        columnText = new String(columnBuffer.buffer, 0, columnBuffer.position);
    } else if (streamBuffer.columnStart < streamBuffer.currentPosition) {
        // the whole column still sits in the stream buffer
        final int length = streamBuffer.currentPosition - streamBuffer.columnStart;
        columnText = new String(streamBuffer.buffer, streamBuffer.columnStart, length);
    } else {
        columnText = "";
    }

    columnBuffer.position = 0;
    startedRow = false;

    if (safetySwitch && columnsCount >= StaticSettings.MAX_SIZE_FOR_SAFTY) {
        close();
        throw new IOException(
                "Maximum column count of 100,000 exceeded in record "
                        + NumberFormat.getIntegerInstance().format(currentRecord)
                        + ". Set the SafetySwitch property to false"
                        + " if you're expecting more than 100,000 columns per record to"
                        + " avoid this error.");
    }

    // double the holder array when there is no free slot for this column
    if (columnsCount == values.length) {
        final String[] grown = new String[values.length * 2];
        System.arraycopy(values, 0, grown, 0, values.length);
        values = grown;
    }
    values[columnsCount] = columnText;
    columnsCount++;
}
/**
 * Appends the characters of the open column that are still in the stream
 * buffer (from {@code columnStart} up to {@code currentPosition}) to the
 * column buffer, enlarging the column buffer when needed. Does nothing
 * when no column is open or no characters are pending.
 */
private void updateCurrentValue() {
    if (!startedRow || streamBuffer.columnStart >= streamBuffer.currentPosition) {
        return;
    }
    final int pending = streamBuffer.currentPosition - streamBuffer.columnStart;
    final int free = columnBuffer.buffer.length - columnBuffer.position;
    if (free < pending) {
        // grow by at least the current capacity, or by the pending size if larger
        final int capacity = columnBuffer.buffer.length
                + Math.max(pending, columnBuffer.buffer.length);
        final char[] grown = new char[capacity];
        System.arraycopy(columnBuffer.buffer, 0, grown, 0, columnBuffer.position);
        columnBuffer.buffer = grown;
    }
    System.arraycopy(streamBuffer.buffer, streamBuffer.columnStart,
            columnBuffer.buffer, columnBuffer.position, pending);
    columnBuffer.position += pending;
}
/**
 * Releases the resources of this reader. Calling it again after the reader
 * has been closed has no effect.
 *
 * @param closing
 *            when {@code true} the stream and column buffers are released
 *            as well
 */
@Override
protected void close(boolean closing) {
    if (closed) {
        return;
    }
    if (closing) {
        streamBuffer.buffer = null;
        columnBuffer.buffer = null;
    }
    if (initialized) {
        try {
            inputStream.close();
        } catch (Exception ignored) {
            // closing is best effort; a failure here is deliberately ignored
        }
    }
    inputStream = null;
    closed = true;
}
/**
 * Buffer holding the characters loaded from the stream while they are
 * being parsed.
 *
 * @author gke
 */
private class StreamBuffer {
    /** Characters loaded from the stream. */
    char[] buffer;

    /** Index of the character currently being examined. */
    int currentPosition;

    /**
     * Last index at which a field delimiter comparison may start; equal to
     * {@code count - fieldDelimiterLength}, because testing for a delimiter
     * at a position reads the following characters too.
     */
    int lastIndexToRead;

    /** Length of the field delimiter. */
    int fieldDelimiterLength;

    /** Length of the record delimiter. */
    int recordDelimiterLength;

    /** Number of usable characters in {@code buffer}; may be less than its length. */
    int count;

    /**
     * Position where the current column started, or where the buffer stood
     * the last time data was moved out to the column buffer.
     */
    int columnStart;

    /**
     * Set once a load leaves the buffer partly empty, which is taken to mean
     * that the stream has no more data.
     */
    boolean streamEndMeet = false;

    public StreamBuffer() {
        buffer = new char[StaticSettings.MAX_BUFFER_SIZE];
        currentPosition = 0;
        count = 0;
        columnStart = 0;
    }

    /** Moves the unprocessed characters to the start of the buffer and resets the positions. */
    public void moveTailToHead() {
        count -= currentPosition;
        if (count > 0) {
            System.arraycopy(buffer, currentPosition, buffer, 0, count);
        }
        lastIndexToRead = count - fieldDelimiterLength;
        currentPosition = 0;
        columnStart = 0;
    }

    /** Advances the current position past one field delimiter. */
    public void skipFieldDelimiter() {
        currentPosition += fieldDelimiterLength;
    }

    /** Advances the current position past one record delimiter. */
    public void skipRecordDelimiter() {
        currentPosition += recordDelimiterLength;
    }

    /** @return {@code true} when too few characters remain to compare a field delimiter */
    public boolean noDataFieldDelimter() {
        return currentPosition > lastIndexToRead;
    }

    /** @return {@code true} while a delimiter comparison is possible or the stream may hold more data */
    public boolean hasMoreData() {
        return currentPosition <= lastIndexToRead || !streamEndMeet;
    }

    /** @return {@code true} when every loaded character has been consumed */
    public boolean noData() {
        return currentPosition >= count;
    }

    /** @return the loaded characters from the current position to the end of the data */
    public String getTail() {
        final int remaining = count - currentPosition;
        return new String(buffer, currentPosition, remaining);
    }
}
}

View File

@@ -1,428 +0,0 @@
// ============================================================================
//
// Talend Community Edition
//
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
//
// ============================================================================
package org.talend.fileprocess.delimited;
import java.io.BufferedReader;
import java.io.IOException;
import java.text.NumberFormat;
/**
* A stream-based parser for delimited text data read from a reader. This
* class is used when the field delimiter and the record delimiter are both
* strings longer than one character and the record delimiter is longer than
* the field delimiter.<br/>
*
* @author gke
*
*/
final class ComplexDelimitedDataReader5 extends DelimitedDataReader {
private StreamBuffer streamBuffer = new StreamBuffer();
private char[] fieldDelimiter;
private char[] recordDelimiter;
public ComplexDelimitedDataReader5(BufferedReader inputStream, String delimiter,
String recordDelimiter, boolean skipEmptyRecords)
throws IOException {
super(inputStream, skipEmptyRecords);
this.fieldDelimiter = delimiter.toCharArray();
this.recordDelimiter = recordDelimiter.toCharArray();
streamBuffer.fieldDelimiterLength = this.fieldDelimiter.length;
streamBuffer.recordDelimiterLength = this.recordDelimiter.length;
try {
streamBuffer.count = inputStream.read(streamBuffer.buffer, 0,
streamBuffer.buffer.length);
} catch (IOException ex) {
close();
throw ex;
}
streamBuffer.currentPosition = 0;
streamBuffer.columnStart = 0;
streamBuffer.lastIndexToRead = streamBuffer.count
- streamBuffer.recordDelimiterLength;
streamBuffer.streamEndMeet = (streamBuffer.count < streamBuffer.buffer.length);
}
/**
 * Reads the next record into {@code values} / {@code columnsCount}.
 * <p>
 * The main loop scans the buffer while a full record delimiter can still be
 * compared at the current position. Once the stream is exhausted, the
 * remaining tail is searched for field delimiters only.
 *
 * @return {@code true} when a record was read
 * @throws IOException
 *             if reading from the source fails, or if the safety switch is
 *             on and a column exceeds the maximum length
 */
@Override
public boolean readRecord() throws IOException {
checkClosed();
// start a fresh record
columnsCount = 0;
hasReadRecord = false;
if (streamBuffer.hasMoreData()) {
// NOTE(review): endRecord() is defined outside this view; presumably it
// sets hasReadRecord, which is what ends this loop - confirm.
while (streamBuffer.hasMoreData() && !hasReadRecord) {
if (streamBuffer.noDataRecordDelimter()) {
// too few characters left to compare a record delimiter: load more
checkDataLength();
} else {
if (isStartFieldDelimited()) {
// a field delimiter right at the column start: column with no data
endColumn();
streamBuffer.skipFieldDelimiter();
} else if (isStartRecordDelimited()) {
// a record delimiter with no open column: emit the record unless it
// is empty and empty records are skipped
if (columnsCount > 0 || !skipEmptyRecord) {
endColumn();
endRecord();
}
streamBuffer.skipRecordDelimiter();
} else {
// the character is not the start of a delimiter, so it is the
// first character of the current column
startedRow = true;
streamBuffer.columnStart = streamBuffer.currentPosition;
// firstLoop skips the delimiter tests for the first character,
// which the outer loop has already tested
boolean firstLoop = true;
while (streamBuffer.hasMoreData() && startedRow) {
if (!firstLoop
&& streamBuffer.noDataRecordDelimter()) {
checkDataLength();
} else {
if (!firstLoop) {
if (isStartFieldDelimited()) {
// endColumn() clears startedRow, which ends this inner loop
endColumn();
streamBuffer.skipFieldDelimiter();
} else if (isStartRecordDelimited()) {
endColumn();
endRecord();
streamBuffer.skipRecordDelimiter();
}
} else {
firstLoop = false;
}
if (startedRow) {
// still inside the column: consume one character
streamBuffer.currentPosition++;
// column length so far = pending characters in the stream buffer
// plus what was already moved to the column buffer
if (safetySwitch
&& streamBuffer.currentPosition
- streamBuffer.columnStart
+ columnBuffer.position > StaticSettings.MAX_SIZE_FOR_SAFTY) {
close();
throw new IOException(
"Maximum column length of 100,000 exceeded in column "
+ NumberFormat
.getIntegerInstance()
.format(
columnsCount)
+ " in record "
+ NumberFormat
.getIntegerInstance()
.format(
currentRecord)
+ ". Set the SafetySwitch property to false"
+ " if you're expecting column lengths greater than 100,000 characters to"
+ " avoid this error.");
}
}
} // end else
}
}
} // end else
}
}
if (!hasReadRecord) {
// The main loop stopped without completing a record. What is left in
// the buffer was not scanned for delimiters; the loop below looks for
// field delimiters in it and treats the end of the data as the end of
// the record.
while (!streamBuffer.noData()) {
String tail = streamBuffer.getTail();
int index = tail.indexOf(new String(this.fieldDelimiter));
if (index == -1) {
// no field delimiter left: everything up to the end of the data
// belongs to the last column of the record
if (startedRow) {
streamBuffer.currentPosition = streamBuffer.count;
endColumn();
endRecord();
return true;
} else {
streamBuffer.columnStart = streamBuffer.currentPosition;
startedRow = true;
streamBuffer.currentPosition = streamBuffer.count;
endColumn();
endRecord();
return true;
}
} else {
// a field delimiter was found: close the column in front of it and
// continue with the rest of the tail
if (!startedRow) {
streamBuffer.columnStart = streamBuffer.currentPosition;
startedRow = true;
}
streamBuffer.currentPosition += index;
endColumn();
streamBuffer.skipFieldDelimiter();
}
}
// the data ended after at least one column (or with a column open):
// finish the record
if (startedRow || columnsCount > 0) {
endRecord();
}
}
return hasReadRecord;
}
/**
 * Tests whether the field delimiter begins at the current position of the
 * stream buffer.
 *
 * @return {@code true} when every character of the field delimiter matches
 *         the buffer content starting at {@code streamBuffer.currentPosition}
 */
private boolean isStartFieldDelimited() {
    final char[] data = streamBuffer.buffer;
    final int base = streamBuffer.currentPosition;
    int matched = 0;
    // advance while the buffer keeps agreeing with the delimiter
    while (matched < fieldDelimiter.length && data[base + matched] == fieldDelimiter[matched]) {
        matched++;
    }
    return matched == fieldDelimiter.length;
}
/**
 * Tests whether the record delimiter begins at the current position of the
 * stream buffer.
 *
 * @return {@code true} when every character of the record delimiter matches
 *         the buffer content starting at {@code streamBuffer.currentPosition}
 */
private boolean isStartRecordDelimited() {
    final char[] data = streamBuffer.buffer;
    final int base = streamBuffer.currentPosition;
    int matched = 0;
    // advance while the buffer keeps agreeing with the delimiter
    while (matched < recordDelimiter.length && data[base + matched] == recordDelimiter[matched]) {
        matched++;
    }
    return matched == recordDelimiter.length;
}
/**
 * Refills the stream buffer: saves the characters of the column currently
 * being read into the column buffer, moves the unprocessed tail of the
 * stream buffer to its head and reads more characters behind it.
 *
 * @exception IOException
 *                Thrown if an error occurs while reading data from the
 *                source stream. The reader is closed before rethrowing.
 */
private void checkDataLength() throws IOException {
    // keep what has been scanned of the current column before it is overwritten
    updateCurrentValue();
    streamBuffer.moveTailToHead();
    int charsRead;
    try {
        charsRead = inputStream.read(streamBuffer.buffer, streamBuffer.count,
                streamBuffer.buffer.length - streamBuffer.count);
    } catch (IOException ex) {
        close();
        throw ex;
    }
    // read() signals end of stream with -1; adding that value to count would
    // silently drop the last buffered character, so only add real reads
    if (charsRead > 0) {
        streamBuffer.count += charsRead;
    }
    streamBuffer.lastIndexToRead = streamBuffer.count
            - streamBuffer.recordDelimiterLength;
    // a buffer that could not be filled completely is treated as end of stream
    if (streamBuffer.count < streamBuffer.buffer.length) {
        streamBuffer.streamEndMeet = true;
    }
}
/**
 * Closes the column currently being read: builds its text, resets the
 * column state and appends the text to {@code values}, growing that array
 * when it is full.
 *
 * @exception IOException
 *                Thrown when the safety switch is on and the record already
 *                holds the maximum number of columns.
 */
private void endColumn() throws IOException {
    final String columnText;
    if (!startedRow) {
        // no column was open: an empty value is recorded
        columnText = "";
    } else if (columnBuffer.position != 0) {
        // part of the column was already moved to the column buffer;
        // append the remainder and take the whole column from there
        updateCurrentValue();
        columnText = new String(columnBuffer.buffer, 0, columnBuffer.position);
    } else if (streamBuffer.columnStart < streamBuffer.currentPosition) {
        // the whole column still sits in the stream buffer
        final int length = streamBuffer.currentPosition - streamBuffer.columnStart;
        columnText = new String(streamBuffer.buffer, streamBuffer.columnStart, length);
    } else {
        columnText = "";
    }

    columnBuffer.position = 0;
    startedRow = false;

    if (safetySwitch && columnsCount >= StaticSettings.MAX_SIZE_FOR_SAFTY) {
        close();
        throw new IOException(
                "Maximum column count of 100,000 exceeded in record "
                        + NumberFormat.getIntegerInstance().format(currentRecord)
                        + ". Set the SafetySwitch property to false"
                        + " if you're expecting more than 100,000 columns per record to"
                        + " avoid this error.");
    }

    // double the holder array when there is no free slot for this column
    if (columnsCount == values.length) {
        final String[] grown = new String[values.length * 2];
        System.arraycopy(values, 0, grown, 0, values.length);
        values = grown;
    }
    values[columnsCount] = columnText;
    columnsCount++;
}
/**
 * Appends the characters of the open column that are still in the stream
 * buffer (from {@code columnStart} up to {@code currentPosition}) to the
 * column buffer, enlarging the column buffer when needed. Does nothing
 * when no column is open or no characters are pending.
 */
private void updateCurrentValue() {
    if (!startedRow || streamBuffer.columnStart >= streamBuffer.currentPosition) {
        return;
    }
    final int pending = streamBuffer.currentPosition - streamBuffer.columnStart;
    final int free = columnBuffer.buffer.length - columnBuffer.position;
    if (free < pending) {
        // grow by at least the current capacity, or by the pending size if larger
        final int capacity = columnBuffer.buffer.length
                + Math.max(pending, columnBuffer.buffer.length);
        final char[] grown = new char[capacity];
        System.arraycopy(columnBuffer.buffer, 0, grown, 0, columnBuffer.position);
        columnBuffer.buffer = grown;
    }
    System.arraycopy(streamBuffer.buffer, streamBuffer.columnStart,
            columnBuffer.buffer, columnBuffer.position, pending);
    columnBuffer.position += pending;
}
/**
 * Releases the resources of this reader. Calling it again after the reader
 * has been closed has no effect.
 *
 * @param closing
 *            when {@code true} the stream and column buffers are released
 *            as well
 */
@Override
protected void close(boolean closing) {
    if (closed) {
        return;
    }
    if (closing) {
        streamBuffer.buffer = null;
        columnBuffer.buffer = null;
    }
    if (initialized) {
        try {
            inputStream.close();
        } catch (Exception ignored) {
            // closing is best effort; a failure here is deliberately ignored
        }
    }
    inputStream = null;
    closed = true;
}
/**
 * Buffer holding the characters loaded from the stream while they are
 * being parsed.
 *
 * @author gke
 */
private class StreamBuffer {
    /** Characters loaded from the stream. */
    char[] buffer;

    /** Index of the character currently being examined. */
    int currentPosition;

    /**
     * Last index at which a record delimiter comparison may start; equal to
     * {@code count - recordDelimiterLength}, because testing for a delimiter
     * at a position reads the following characters too.
     */
    int lastIndexToRead;

    /** Length of the field delimiter. */
    int fieldDelimiterLength;

    /** Length of the record delimiter. */
    int recordDelimiterLength;

    /** Number of usable characters in {@code buffer}; may be less than its length. */
    int count;

    /**
     * Position where the current column started, or where the buffer stood
     * the last time data was moved out to the column buffer.
     */
    int columnStart;

    /**
     * Set once a load leaves the buffer partly empty, which is taken to mean
     * that the stream has no more data.
     */
    boolean streamEndMeet = false;

    public StreamBuffer() {
        buffer = new char[StaticSettings.MAX_BUFFER_SIZE];
        currentPosition = 0;
        count = 0;
        columnStart = 0;
    }

    /** Moves the unprocessed characters to the start of the buffer and resets the positions. */
    public void moveTailToHead() {
        count -= currentPosition;
        if (count > 0) {
            System.arraycopy(buffer, currentPosition, buffer, 0, count);
        }
        lastIndexToRead = count - recordDelimiterLength;
        currentPosition = 0;
        columnStart = 0;
    }

    /** Advances the current position past one field delimiter. */
    public void skipFieldDelimiter() {
        currentPosition += fieldDelimiterLength;
    }

    /** Advances the current position past one record delimiter. */
    public void skipRecordDelimiter() {
        currentPosition += recordDelimiterLength;
    }

    /** @return {@code true} when too few characters remain to compare a record delimiter */
    public boolean noDataRecordDelimter() {
        return currentPosition > lastIndexToRead;
    }

    /** @return {@code true} while a delimiter comparison is possible or the stream may hold more data */
    public boolean hasMoreData() {
        return currentPosition <= lastIndexToRead || !streamEndMeet;
    }

    /** @return {@code true} when every loaded character has been consumed */
    public boolean noData() {
        return currentPosition >= count;
    }

    /** @return the loaded characters from the current position to the end of the data */
    public String getTail() {
        final int remaining = count - currentPosition;
        return new String(buffer, currentPosition, remaining);
    }
}
}

View File

@@ -1,156 +0,0 @@
// ============================================================================
//
// Talend Community Edition
//
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
//
// ============================================================================
package org.talend.fileprocess.delimited;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
/**
 * Factory that creates the appropriate DelimitedDataReader according to the lengths of the field delimiter (field
 * separator) and the record delimiter (row separator). All the APIs needed by tFileInputDelimited are provided by
 * DelimitedDataReader. Most of the concrete reader classes are package-private, so instances are obtained through
 * this factory.<br/>
 *
 * @author gke
 */
public class DelimitedDataReaderFactory {

    /**
     * Creates a reader over an already opened character source, choosing the implementation from the lengths of the
     * two delimiters. The caller keeps ownership of {@code inputStream}.
     *
     * @param inputStream a java Reader source of delimited data
     * @param delimiter the field delimiter (field separator)
     * @param recordDelimiter the record delimiter (row separator)
     * @param skipEmptyRecords flag for skipping empty records
     * @return an instance of DelimitedDataReader
     * @throws IOException if the reader fails while being created
     */
    public static DelimitedDataReader createDelimitedDataReader(BufferedReader inputStream, String delimiter,
            String recordDelimiter, boolean skipEmptyRecords) throws IOException {
        int dLength = delimiter.length();
        int rLength = recordDelimiter.length();
        if (dLength == 1 && rLength == 1) {
            return new SimpleDelimitedDataReader(inputStream, delimiter.charAt(0), recordDelimiter.charAt(0),
                    skipEmptyRecords);
        } else if (dLength == 1 && rLength > 1) {
            return new ComplexDelimitedDataReader1(inputStream, delimiter.charAt(0), recordDelimiter, skipEmptyRecords);
        } else if (dLength > 1 && rLength == 1) {
            return new ComplexDelimitedDataReader2(inputStream, delimiter, recordDelimiter.charAt(0), skipEmptyRecords);
        } else if (dLength == rLength) {
            return new ComplexDelimitedDataReader3(inputStream, delimiter, recordDelimiter, skipEmptyRecords);
        } else if (dLength > rLength) {
            return new ComplexDelimitedDataReader4(inputStream, delimiter, recordDelimiter, skipEmptyRecords);
        } else {
            return new ComplexDelimitedDataReader5(inputStream, delimiter, recordDelimiter, skipEmptyRecords);
        }
    }

    /**
     * Creates a reader over a file, decoded with the platform default charset (as {@link FileReader} does).
     *
     * @param file a delimited file
     * @param delimiter the field delimiter (field separator)
     * @param recordDelimiter the record delimiter (row separator)
     * @param skipEmptyRecords flag for skipping empty records
     * @return an instance of DelimitedDataReader
     * @throws IOException if the file cannot be opened or the reader fails while being created
     */
    public static DelimitedDataReader createDelimitedDataReader(String file, String delimiter, String recordDelimiter,
            boolean skipEmptyRecords) throws IOException {
        BufferedReader inputStream = new BufferedReader(new FileReader(file));
        return createOwnedReader(inputStream, delimiter, recordDelimiter, skipEmptyRecords);
    }

    /**
     * Creates a reader over a file, decoded with the given charset.
     *
     * @param file a delimited file
     * @param encoding name of the charset used to decode the file
     * @param delimiter the field delimiter (field separator)
     * @param recordDelimiter the record delimiter (row separator)
     * @param skipEmptyRecords flag for skipping empty records
     * @return an instance of DelimitedDataReader
     * @throws IOException if the file cannot be opened, the encoding is not supported or the reader fails while
     * being created
     */
    public static DelimitedDataReader createDelimitedDataReader(String file, String encoding, String delimiter,
            String recordDelimiter, boolean skipEmptyRecords) throws IOException {
        FileInputStream fileStream = new FileInputStream(file);
        BufferedReader inputStream = null;
        try {
            inputStream = new BufferedReader(new InputStreamReader(fileStream, encoding));
        } finally {
            // an unsupported encoding would otherwise leave the file handle open
            if (inputStream == null) {
                closeQuietly(fileStream);
            }
        }
        return createOwnedReader(inputStream, delimiter, recordDelimiter, skipEmptyRecords);
    }

    /**
     * Creates a reader over a byte stream, decoded with the platform default charset. The caller keeps ownership
     * of {@code inputStream}.
     *
     * @param inputStream a java stream source of delimited data
     * @param delimiter the field delimiter (field separator)
     * @param recordDelimiter the record delimiter (row separator)
     * @param skipEmptyRecords flag for skipping empty records
     * @return an instance of DelimitedDataReader
     * @throws IOException if the reader fails while being created
     */
    public static DelimitedDataReader createDelimitedDataReader(InputStream inputStream, String delimiter,
            String recordDelimiter, boolean skipEmptyRecords) throws IOException {
        BufferedReader inputStreamReader = new BufferedReader(new InputStreamReader(inputStream));
        return createDelimitedDataReader(inputStreamReader, delimiter, recordDelimiter, skipEmptyRecords);
    }

    /**
     * Creates a reader over a byte stream, decoded with the given charset. The caller keeps ownership of
     * {@code inputStream}.
     *
     * @param inputStream a java stream source of delimited data
     * @param encoding name of the charset used to decode the stream
     * @param delimiter the field delimiter (field separator)
     * @param recordDelimiter the record delimiter (row separator)
     * @param skipEmptyRecords flag for skipping empty records
     * @return an instance of DelimitedDataReader
     * @throws IOException if the encoding is not supported or the reader fails while being created
     */
    public static DelimitedDataReader createDelimitedDataReader(InputStream inputStream, String encoding,
            String delimiter, String recordDelimiter, boolean skipEmptyRecords) throws IOException {
        BufferedReader inputStreamReader = new BufferedReader(new InputStreamReader(inputStream, encoding));
        return createDelimitedDataReader(inputStreamReader, delimiter, recordDelimiter, skipEmptyRecords);
    }

    /**
     * Creates a reader over a source that this factory opened itself and closes that source when the creation fails,
     * so that the file handle is not leaked.
     */
    private static DelimitedDataReader createOwnedReader(BufferedReader inputStream, String delimiter,
            String recordDelimiter, boolean skipEmptyRecords) throws IOException {
        boolean created = false;
        try {
            DelimitedDataReader reader = createDelimitedDataReader(inputStream, delimiter, recordDelimiter,
                    skipEmptyRecords);
            created = true;
            return reader;
        } finally {
            if (!created) {
                closeQuietly(inputStream);
            }
        }
    }

    /** Closes a resource, ignoring any failure, so that the original exception is the one that propagates. */
    private static void closeQuietly(java.io.Closeable resource) {
        try {
            resource.close();
        } catch (IOException ignored) {
            // nothing useful can be done; the primary failure is reported by the caller
        }
    }
}

View File

@@ -1,358 +0,0 @@
// ============================================================================
//
// Talend Community Edition
//
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
//
// ============================================================================
package org.talend.fileprocess.delimited;
import java.io.BufferedReader;
import java.io.IOException;
import java.text.NumberFormat;
/**
 * A stream based parser for delimited text data read from a file or a
 * stream. It is only usable when the field delimiter and the record
 * delimiter are both single characters.<br/>
 * <p>
 * When the configured record delimiter is '\n' or '\r', both characters are
 * treated as record terminators and a '\n' that directly follows a '\r' does
 * not start another record. Any other character is matched literally as a
 * custom record delimiter.
 * <p>
 * Fields such as columnsCount, values, columnBuffer, startedRow and
 * inputStream are not declared here; presumably they are inherited from
 * DelimitedDataReader (confirm against the superclass).
 *
 * @author gke
 */
public class SimpleDelimitedDataReader extends DelimitedDataReader {
// true when the record delimiter is neither '\n' nor '\r' and therefore has
// to be compared literally against recordDelimiter
private boolean useCustomRecordDelimiter = false;
// chunk of characters most recently read from the underlying reader
private StreamBuffer streamBuffer = new StreamBuffer();
// becomes false once the underlying reader reports end of stream
private boolean hasMoreData = true;
// the previously processed character; it is kept across readRecord() calls
private char lastLetter = '\0';
private char fieldDelimiter;
private char recordDelimiter;
/**
 * Creates a reader for single-character field and record delimiters.
 *
 * @param inputStream the character source to parse
 * @param delimiter the field delimiter(field separator)
 * @param recordDelimiter the record delimiter(row separator)
 * @param skipEmptyRecords flag for skipping the empty records
 */
public SimpleDelimitedDataReader(BufferedReader inputStream, char delimiter,
char recordDelimiter, boolean skipEmptyRecords) {
super(inputStream, skipEmptyRecords);
fieldDelimiter = delimiter;
this.recordDelimiter = recordDelimiter;
// '\n' and '\r' are handled by the built-in line-ending logic of
// readRecord(); only other characters need the literal comparison
if (recordDelimiter != '\n' && recordDelimiter != '\r') {
useCustomRecordDelimiter = true;
}
}
/**
 * Parses the next record from the stream. The parsed column values are
 * stored in values[0..columnsCount).
 *
 * @return the value of hasReadRecord after parsing; presumably set to true
 *         by endRecord() (defined in the superclass - confirm), false when
 *         the end of the data was reached without a record
 * @throws IOException if reading from the source fails or a safety limit
 *         is exceeded
 */
@Override
public boolean readRecord() throws IOException {
checkClosed();
columnsCount = 0;
hasReadRecord = false;
// check to see if we've already found the end of data
if (hasMoreData) {
// loop over the data stream until the end of data is found
// or the end of the record is found
while (hasMoreData && !hasReadRecord) {
if (streamBuffer.currentPosition == streamBuffer.count) {
// every buffered character has been consumed: refill the buffer
checkDataLength();
} else {
// grab the current letter as a char
char currentLetter = streamBuffer.buffer[streamBuffer.currentPosition];
if (currentLetter == fieldDelimiter) {
// we encountered a column with no data, so
// just send the end column
lastLetter = currentLetter;
endColumn();
} else if (useCustomRecordDelimiter
&& currentLetter == recordDelimiter) {
// custom record delimiter met outside of a column: emit the
// record unless it is empty and empty records are skipped
if (columnsCount > 0 || !skipEmptyRecord) {
endColumn();
endRecord();
}
lastLetter = currentLetter;
} else if (!useCustomRecordDelimiter
&& (currentLetter == '\r' || currentLetter == '\n')) {
// line ending met outside of a column: a '\n' that directly
// follows a '\r' is ignored, so that the pair terminates only
// one record
if ((!skipEmptyRecord || columnsCount > 0)
&& ((currentLetter == '\r' || lastLetter != '\r'))) {
endColumn();
endRecord();
}
lastLetter = currentLetter;
} else {
// since the letter wasn't a special letter, this
// will be the first letter of our current column
startedRow = true;
streamBuffer.columnStart = streamBuffer.currentPosition;
// the first character was already classified above, so the
// first pass of the inner loop only advances past it
boolean firstLoop = true;
while (hasMoreData && startedRow) {
if (!firstLoop
&& streamBuffer.currentPosition == streamBuffer.count) {
// buffer exhausted in the middle of a column: the text read so
// far is moved to columnBuffer before the buffer is refilled
checkDataLength();
} else {
if (!firstLoop) {
// grab the current letter as a char
currentLetter = streamBuffer.buffer[streamBuffer.currentPosition];
if (currentLetter == fieldDelimiter) {
endColumn();
} else if ((!useCustomRecordDelimiter && (currentLetter == '\r' || currentLetter == '\n'))
|| (useCustomRecordDelimiter && currentLetter == recordDelimiter)) {
endColumn();
endRecord();
}
} else {
firstLoop = false;
}
lastLetter = currentLetter;
// startedRow is still true only when the character belonged to
// the column (endColumn() resets it); advance and enforce the
// column length limit
if (startedRow) {
streamBuffer.currentPosition++;
// NOTE(review): the message hard-codes "100,000" while the check
// uses StaticSettings.MAX_SIZE_FOR_SAFTY - confirm they agree
if (safetySwitch
&& streamBuffer.currentPosition
- streamBuffer.columnStart
+ columnBuffer.position > StaticSettings.MAX_SIZE_FOR_SAFTY) {
close();
throw new IOException(
"Maximum column length of 100,000 exceeded in column "
+ NumberFormat
.getIntegerInstance()
.format(
columnsCount)
+ " in record "
+ NumberFormat
.getIntegerInstance()
.format(
currentRecord)
+ ". Set the SafetySwitch property to false"
+ " if you're expecting column lengths greater than 100,000 characters to"
+ " avoid this error.");
}
}
} // end else
}
}
// step over the character that was just handled (a delimiter, or
// the delimiter that ended the column); skipped at end of data
if (hasMoreData) {
streamBuffer.currentPosition++;
}
} // end else
}
// check to see if we hit the end of the file
// without processing the current record
if (startedRow || lastLetter == fieldDelimiter) {
endColumn();
endRecord();
}
}
return hasReadRecord;
}
/**
 * Refills the stream buffer from the source. Any text of the current
 * column still held in the stream buffer is first saved to columnBuffer,
 * then the buffer positions are reset to 0. When the source reports end of
 * stream, hasMoreData is set to false.
 *
 * @exception IOException
 *                Thrown if an error occurs while reading data from the
 *                source stream; the reader is closed before rethrowing.
 */
private void checkDataLength() throws IOException {
updateCurrentValue();
try {
streamBuffer.count = inputStream.read(streamBuffer.buffer, 0,
streamBuffer.buffer.length);
} catch (IOException ex) {
close();
throw ex;
}
// if no more data could be found, set flag stating that
// the end of the data was found
if (streamBuffer.count == -1) {
hasMoreData = false;
}
streamBuffer.currentPosition = 0;
streamBuffer.columnStart = 0;
}
/**
 * Finishes the current column: builds its value, stores it in values and
 * increments columnsCount. When no column was started, an empty string is
 * stored.
 *
 * @exception IOException
 *                Thrown when the safety switch is on and the record
 *                already holds StaticSettings.MAX_SIZE_FOR_SAFTY columns.
 */
private void endColumn() throws IOException {
String currentValue = "";
// must be called before setting startedRow = false
if (startedRow) {
if (columnBuffer.position == 0) {
// the whole column is still inside the stream buffer: take it
// from there directly
if (streamBuffer.columnStart < streamBuffer.currentPosition) {
currentValue = new String(streamBuffer.buffer,
streamBuffer.columnStart,
streamBuffer.currentPosition
- streamBuffer.columnStart);
}
} else {
// part of the column was saved to columnBuffer earlier: append the
// remainder and build the value from columnBuffer
updateCurrentValue();
currentValue = new String(columnBuffer.buffer, 0,
columnBuffer.position);
}
}
columnBuffer.position = 0;
startedRow = false;
// NOTE(review): the message hard-codes "100,000" while the check uses
// StaticSettings.MAX_SIZE_FOR_SAFTY - confirm they agree
if (columnsCount >= StaticSettings.MAX_SIZE_FOR_SAFTY && safetySwitch) {
close();
throw new IOException(
"Maximum column count of 100,000 exceeded in record "
+ NumberFormat.getIntegerInstance().format(
currentRecord)
+ ". Set the SafetySwitch property to false"
+ " if you're expecting more than 100,000 columns per record to"
+ " avoid this error.");
}
// check to see if our current holder array for
// column chunks is still big enough to handle another
// column chunk
if (columnsCount == values.length) {
// holder array needs to grow to be able to hold another column
int newLength = values.length * 2;
String[] holder = new String[newLength];
System.arraycopy(values, 0, holder, 0, values.length);
values = holder;
}
values[columnsCount] = currentValue;
currentValue = "";
columnsCount++;
}
/**
 * Appends the characters of the current column that are still in the
 * stream buffer (from columnStart up to, but excluding, currentPosition) to
 * columnBuffer, growing columnBuffer when needed. Afterwards columnStart is
 * moved to the position following currentPosition.
 */
private void updateCurrentValue() {
if (startedRow
&& streamBuffer.columnStart < streamBuffer.currentPosition) {
if (columnBuffer.buffer.length - columnBuffer.position < streamBuffer.currentPosition
- streamBuffer.columnStart) {
// not enough room left: grow by the larger of the pending chunk
// size and the current capacity
int newLength = columnBuffer.buffer.length
+ Math.max(streamBuffer.currentPosition
- streamBuffer.columnStart,
columnBuffer.buffer.length);
char[] holder = new char[newLength];
System.arraycopy(columnBuffer.buffer, 0, holder, 0,
columnBuffer.position);
columnBuffer.buffer = holder;
}
System.arraycopy(streamBuffer.buffer, streamBuffer.columnStart,
columnBuffer.buffer, columnBuffer.position,
streamBuffer.currentPosition - streamBuffer.columnStart);
columnBuffer.position += streamBuffer.currentPosition
- streamBuffer.columnStart;
}
streamBuffer.columnStart = streamBuffer.currentPosition + 1;
}
/**
 * Closes the reader; does nothing when it is already closed.
 *
 * @param closing when true, the stream buffer and the column buffer are
 *        released as well
 */
@Override
protected void close(boolean closing) {
if (!closed) {
if (closing) {
streamBuffer.buffer = null;
columnBuffer.buffer = null;
}
try {
// the underlying reader is only closed when the initialized flag is set
if (initialized) {
inputStream.close();
}
} catch (Exception e) {
// just eat the exception
}
inputStream = null;
closed = true;
}
}
/**
 * A buffer structure that used to load data from stream for processing.
 *
 * @author gke
 */
class StreamBuffer {
// characters read from the source; sized by StaticSettings.MAX_BUFFER_SIZE
char[] buffer;
// index in buffer of the character currently being processed
int currentPosition;
// count indicates how much usable data has been read into the stream,
// which will not always be as long as Buffer.Length.
int count;
// columnStart is the position in the buffer when the
// current column was started or the last time data
// was moved out to the column buffer.
int columnStart;
public StreamBuffer() {
buffer = new char[StaticSettings.MAX_BUFFER_SIZE];
currentPosition = 0;
count = 0;
columnStart = 0;
}
}
}

View File

@@ -19,7 +19,7 @@
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
//
// ============================================================================
package org.talend.fileprocess.delimited;
package org.talend.fileprocess.delimited.patched;
import java.io.BufferedReader;
import java.io.IOException;

View File

@@ -19,7 +19,7 @@
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
//
// ============================================================================
package org.talend.fileprocess.delimited;
package org.talend.fileprocess.delimited.patched;
import java.io.BufferedReader;
import java.io.IOException;