Compare commits
2 Commits
master
...
mbasiuk/TP
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4728be52a8 | ||
|
|
22b8a39c46 |
53
PATCH_RELEASE_NOTE.md
Normal file
53
PATCH_RELEASE_NOTE.md
Normal file
@@ -0,0 +1,53 @@
|
||||
---
|
||||
version: 7.1.1
|
||||
module: https://talend.poolparty.biz/coretaxonomy/42
|
||||
product:
|
||||
- https://talend.poolparty.biz/coretaxonomy/23
|
||||
---
|
||||
|
||||
# TPS-4553
|
||||
|
||||
| Info | Value |
|
||||
| ---------------- | ---------------- |
|
||||
| Patch Name | Patch\_20201224\_TPS-4553\_v1-7.1.1 |
|
||||
| Release Date | 2020-12-24 |
|
||||
| Target Version | 20181026\_1147-V7.1.1 |
|
||||
| Product affected | Talend Studio |
|
||||
|
||||
## Introduction
|
||||
|
||||
This is a self-contained patch.
|
||||
|
||||
**NOTE**: For information on how to obtain this patch, reach out to your Support contact at Talend.
|
||||
|
||||
## Fixed issues
|
||||
|
||||
This patch contains the following fixes:
|
||||
|
||||
- TPS-4553 [7.1.1] tFileInputPositional inappropriately handles CR(\r) as a row separator (TDI-45260)
|
||||
|
||||
## Prerequisites
|
||||
|
||||
Consider the following requirements for your system:
|
||||
|
||||
- Talend Studio 7.1.1 must be installed.
|
||||
|
||||
## Installation
|
||||
- From the Talend Studio 7.1.1 installation folder, make a copy of the following files somewhere safe:
|
||||
- {Talend_Studio_path}/plugins/org.talend.designer.components.localprovider_7.1.1.20181026_1147/components/tFileInputPositional/tFileInputPositional_begin.javajet
|
||||
- {Talend_Studio_path}/plugins/org.talend.designer.components.localprovider_7.1.1.20181026_1147/components/tFileInputPositional/tFileInputPositional_java.xml
|
||||
- {Talend_Studio_path}/configuration/.m2/repository/org/talend/components/lib/talend_file_enhanced/1.1-patch/talend_file_enhanced-1.1-patch.jar
|
||||
|
||||
- Unzip content of the patch zip onto your Talend Studio 7.1.1 folder.
|
||||
|
||||
## Uninstallation
|
||||
|
||||
- Replace the files overridden by the patch by the copy you made before unzipping.
|
||||
|
||||
## Affected files for this patch
|
||||
|
||||
The following files are installed by this patch:
|
||||
|
||||
- {Talend_Studio_path}/plugins/org.talend.designer.components.localprovider_7.1.1.20181026_1147/components/tFileInputPositional/tFileInputPositional_begin.javajet
|
||||
- {Talend_Studio_path}/plugins/org.talend.designer.components.localprovider_7.1.1.20181026_1147/components/tFileInputPositional/tFileInputPositional_java.xml
|
||||
- {Talend_Studio_path}/configuration/.m2/repository/org/talend/components/lib/talend_file_enhanced/1.1-patch/talend_file_enhanced-1.1-patch.jar
|
||||
@@ -4,7 +4,7 @@
|
||||
<groupId>org.talend.components.lib</groupId>
|
||||
<artifactId>talend_file_enhanced</artifactId>
|
||||
<name>talend_file_enhanced</name>
|
||||
<version>1.1</version>
|
||||
<version>1.1-patch</version>
|
||||
|
||||
<properties>
|
||||
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>
|
||||
|
||||
@@ -1,54 +0,0 @@
|
||||
package org.talend.fileprocess;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
public class CopyInputStreamUtil {
|
||||
|
||||
private InputStream _is;
|
||||
|
||||
private ByteArrayOutputStream _copy = new ByteArrayOutputStream();
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public CopyInputStreamUtil(InputStream is) {
|
||||
_is = is;
|
||||
|
||||
try {
|
||||
copy();
|
||||
} catch (IOException ex) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
private int copy() throws IOException {
|
||||
int read = 0;
|
||||
int chunk = 0;
|
||||
byte[] data = new byte[256];
|
||||
|
||||
while (-1 != (chunk = _is.read(data))) {
|
||||
read += data.length;
|
||||
_copy.write(data, 0, chunk);
|
||||
}
|
||||
|
||||
return read;
|
||||
}
|
||||
|
||||
public InputStream getCopy() {
|
||||
return (InputStream) new ByteArrayInputStream(_copy.toByteArray());
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
FileInputStream in = new FileInputStream("C:/Documents and Settings/Administrator/Test.txt");
|
||||
CopyInputStreamUtil util = new CopyInputStreamUtil(in);
|
||||
InputStream copy = util.getCopy();
|
||||
InputStream copy2 = util.getCopy();
|
||||
System.out.println((char) copy.read());
|
||||
copy.close();
|
||||
System.out.println((char) copy2.read());
|
||||
}
|
||||
}
|
||||
@@ -1,391 +0,0 @@
|
||||
// ============================================================================
|
||||
//
|
||||
// Talend Community Edition
|
||||
//
|
||||
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
|
||||
//
|
||||
// This library is free software; you can redistribute it and/or
|
||||
// modify it under the terms of the GNU Lesser General Public
|
||||
// License as published by the Free Software Foundation; either
|
||||
// version 2.1 of the License.
|
||||
//
|
||||
// This library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
// Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program; if not, write to the Free Software
|
||||
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
|
||||
//
|
||||
// ============================================================================
|
||||
package org.talend.fileprocess;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.util.Iterator;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* FileInputDelimited is dedicated to Talend's tFileInputDelimited component. It wraps all parameters in
|
||||
* tFileInputDelimted, so it makes the generated code much easier and cleaner. This class is not recommended to use in
|
||||
* other circumstance.<br/>
|
||||
*
|
||||
* @author gke
|
||||
*/
|
||||
public class FileInputDelimited {
|
||||
|
||||
private TOSDelimitedReader delimitedDataReader = null;
|
||||
|
||||
Iterator<Integer> random = null;
|
||||
|
||||
private long loopCount = 0;
|
||||
|
||||
private boolean randomSwitch = false;
|
||||
|
||||
private int current = 0;
|
||||
|
||||
private boolean countNeedAdjust = false;
|
||||
|
||||
/**
|
||||
* This constructor is only for compatibility with the old usecase.(Before add the function support split Record.)
|
||||
*
|
||||
*/
|
||||
public FileInputDelimited(String file, String encoding, String fieldSeparator, String rowSeparator, boolean skipEmptyRow,
|
||||
int header, int footer, int limit, int random) throws IOException {
|
||||
this(file, encoding, fieldSeparator, rowSeparator, skipEmptyRow, header, footer, limit, random, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* This constructor is only for compatibility with the old usecase.(Before add the function support split Record.)
|
||||
*
|
||||
*/
|
||||
public FileInputDelimited(java.io.InputStream is, String encoding, String fieldSeparator, String rowSeparator,
|
||||
boolean skipEmptyRow, int header, int footer, int limit, int random) throws IOException {
|
||||
this(is, encoding, fieldSeparator, rowSeparator, skipEmptyRow, header, footer, limit, random, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* The constructor's parameter wraps all parameters' value, and a pretreatment was made according the value of
|
||||
* header, footer, limit and random.
|
||||
*
|
||||
* @param is
|
||||
* @param encoding
|
||||
* @param fieldSeparator
|
||||
* @param rowSeparator
|
||||
* @param skipEmptyRow
|
||||
* @param header
|
||||
* @param footer
|
||||
* @param limit
|
||||
* @param random
|
||||
* @throws IOException
|
||||
*/
|
||||
public FileInputDelimited(java.io.InputStream is, String encoding, String fieldSeparator, String rowSeparator,
|
||||
boolean skipEmptyRow, int header, int footer, int limit, int random, boolean splitRecord) throws IOException {
|
||||
if (header < 0) {
|
||||
header = 0;
|
||||
}
|
||||
if (footer < 0) {
|
||||
footer = 0;
|
||||
}
|
||||
if (random != 0 && limit != 0) {
|
||||
this.delimitedDataReader = new TOSDelimitedReader(is, encoding, fieldSeparator, rowSeparator, skipEmptyRow);
|
||||
this.delimitedDataReader.setSplitRecord(splitRecord);
|
||||
|
||||
this.delimitedDataReader.skipHeaders(header);
|
||||
if (random < 0 && footer == 0) {
|
||||
if (limit > 0) {
|
||||
this.loopCount = limit;
|
||||
} else {
|
||||
this.loopCount = -1;
|
||||
}
|
||||
this.countNeedAdjust = true;
|
||||
} else {
|
||||
// for stream,not support the footer
|
||||
int count = (int) this.delimitedDataReader.getAvailableRowCount(footer);
|
||||
this.delimitedDataReader.close();
|
||||
this.delimitedDataReader = new TOSDelimitedReader(is, encoding, fieldSeparator, rowSeparator, skipEmptyRow);
|
||||
this.delimitedDataReader.setSplitRecord(splitRecord);
|
||||
|
||||
this.delimitedDataReader.skipHeaders(header);
|
||||
if (limit > 0 && random < 0) {
|
||||
this.loopCount = limit < count ? limit : count;
|
||||
} else if (limit < 0 && random > 0) {
|
||||
if (random >= count) {
|
||||
this.loopCount = count;
|
||||
} else {
|
||||
setRandoms(random, count);
|
||||
this.loopCount = random;
|
||||
}
|
||||
} else if (limit > 0 && random > 0) {
|
||||
if (random >= limit) {
|
||||
random = limit;
|
||||
}
|
||||
if (random >= count) {
|
||||
this.loopCount = count;
|
||||
} else {
|
||||
setRandoms(random, count);
|
||||
this.loopCount = random;
|
||||
}
|
||||
} else {
|
||||
this.loopCount = count;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
loopCount = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The constructor's parameter wraps all parameters' value, and a pretreatment was made according the value of
|
||||
* header, footer, limit and random.
|
||||
*
|
||||
* @param file
|
||||
* @param encoding
|
||||
* @param fieldSeparator
|
||||
* @param rowSeparator
|
||||
* @param skipEmptyRow
|
||||
* @param header
|
||||
* @param footer
|
||||
* @param limit
|
||||
* @param random
|
||||
* @throws IOException
|
||||
*/
|
||||
public FileInputDelimited(String file, String encoding, String fieldSeparator, String rowSeparator, boolean skipEmptyRow,
|
||||
int header, int footer, int limit, int random, boolean splitRecord) throws IOException {
|
||||
if (header < 0) {
|
||||
header = 0;
|
||||
}
|
||||
if (footer < 0) {
|
||||
footer = 0;
|
||||
}
|
||||
if (random != 0 && limit != 0) {
|
||||
this.delimitedDataReader = new TOSDelimitedReader(file, encoding, fieldSeparator, rowSeparator, skipEmptyRow);
|
||||
this.delimitedDataReader.setSplitRecord(splitRecord);
|
||||
|
||||
this.delimitedDataReader.skipHeaders(header);
|
||||
if (random < 0 && footer == 0) {
|
||||
if (limit > 0) {
|
||||
this.loopCount = limit;
|
||||
} else {
|
||||
this.loopCount = -1;
|
||||
}
|
||||
this.countNeedAdjust = true;
|
||||
} else {
|
||||
int count = (int) this.delimitedDataReader.getAvailableRowCount(footer);
|
||||
this.delimitedDataReader.close();
|
||||
this.delimitedDataReader = new TOSDelimitedReader(file, encoding, fieldSeparator, rowSeparator, skipEmptyRow);
|
||||
this.delimitedDataReader.setSplitRecord(splitRecord);
|
||||
|
||||
this.delimitedDataReader.skipHeaders(header);
|
||||
if (limit > 0 && random < 0) {
|
||||
this.loopCount = limit < count ? limit : count;
|
||||
} else if (limit < 0 && random > 0) {
|
||||
if (random >= count) {
|
||||
this.loopCount = count;
|
||||
} else {
|
||||
setRandoms(random, count);
|
||||
this.loopCount = random;
|
||||
}
|
||||
} else if (limit > 0 && random > 0) {
|
||||
if (random >= limit) {
|
||||
random = limit;
|
||||
}
|
||||
if (random >= count) {
|
||||
this.loopCount = count;
|
||||
} else {
|
||||
setRandoms(random, count);
|
||||
this.loopCount = random;
|
||||
}
|
||||
} else {
|
||||
this.loopCount = count;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
loopCount = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* In order to support InGest to parse String as content directly
|
||||
*
|
||||
* @param content
|
||||
* @param fieldSeparator
|
||||
* @param rowSeparator
|
||||
* @param skipEmptyRow
|
||||
* @param header
|
||||
* @param footer
|
||||
* @param limit
|
||||
* @param random
|
||||
* @throws IOException
|
||||
*/
|
||||
public FileInputDelimited(String content, String fieldSeparator, String rowSeparator, boolean skipEmptyRow, int header,
|
||||
int footer, int limit, int random, boolean splitRecord) throws IOException {
|
||||
if (header < 0) {
|
||||
header = 0;
|
||||
}
|
||||
if (footer < 0) {
|
||||
footer = 0;
|
||||
}
|
||||
if (random != 0 && limit != 0) {
|
||||
StringReader stringReaderOne = new StringReader(content);
|
||||
this.delimitedDataReader = new TOSDelimitedReader(stringReaderOne, fieldSeparator, rowSeparator, skipEmptyRow);
|
||||
this.delimitedDataReader.setSplitRecord(splitRecord);
|
||||
|
||||
this.delimitedDataReader.skipHeaders(header);
|
||||
if (random < 0 && footer == 0) {
|
||||
if (limit > 0) {
|
||||
this.loopCount = limit;
|
||||
} else {
|
||||
this.loopCount = -1;
|
||||
}
|
||||
this.countNeedAdjust = true;
|
||||
} else {
|
||||
int count = (int) this.delimitedDataReader.getAvailableRowCount(footer);
|
||||
this.delimitedDataReader.close();
|
||||
StringReader stringReaderTwo = new StringReader(content);
|
||||
this.delimitedDataReader = new TOSDelimitedReader(stringReaderTwo, fieldSeparator, rowSeparator, skipEmptyRow);
|
||||
this.delimitedDataReader.setSplitRecord(splitRecord);
|
||||
|
||||
this.delimitedDataReader.skipHeaders(header);
|
||||
if (limit > 0 && random < 0) {
|
||||
this.loopCount = limit < count ? limit : count;
|
||||
} else if (limit < 0 && random > 0) {
|
||||
if (random >= count) {
|
||||
this.loopCount = count;
|
||||
} else {
|
||||
setRandoms(random, count);
|
||||
this.loopCount = random;
|
||||
}
|
||||
} else if (limit > 0 && random > 0) {
|
||||
if (random >= limit) {
|
||||
random = limit;
|
||||
}
|
||||
if (random >= count) {
|
||||
this.loopCount = count;
|
||||
} else {
|
||||
setRandoms(random, count);
|
||||
this.loopCount = random;
|
||||
}
|
||||
} else {
|
||||
this.loopCount = count;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
loopCount = 0;
|
||||
}
|
||||
}
|
||||
|
||||
private void setRandoms(int random, int count) {
|
||||
this.randomSwitch = true;
|
||||
Set<Integer> ranSet = new java.util.TreeSet<Integer>();
|
||||
Random ran = new java.util.Random();
|
||||
while (ranSet.size() < random) {
|
||||
ranSet.add(ran.nextInt(count));
|
||||
}
|
||||
this.random = ranSet.iterator();
|
||||
}
|
||||
|
||||
/**
|
||||
* Skip to the next record, get ready before get new record's value.
|
||||
*
|
||||
* @return whether a next record is available.
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean nextRecord() throws IOException {
|
||||
if (this.countNeedAdjust) {
|
||||
if (this.delimitedDataReader.getProcessedRecordCount() == loopCount) {
|
||||
return false;
|
||||
}
|
||||
return this.delimitedDataReader.readRecord();
|
||||
} else {
|
||||
if (++current > loopCount) {
|
||||
return false;
|
||||
}
|
||||
if (randomSwitch) {
|
||||
if (!this.random.hasNext()) {
|
||||
return false;
|
||||
}
|
||||
int index = this.random.next();
|
||||
do {
|
||||
if (!this.delimitedDataReader.readRecord()) {
|
||||
return false;
|
||||
}
|
||||
} while (this.delimitedDataReader.getProcessedRecordCount() <= index);
|
||||
return true;
|
||||
} else {
|
||||
return this.delimitedDataReader.readRecord();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* See DelimitedDataReader.get(columnIndex)
|
||||
*
|
||||
* @param columnIndex
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public String get(int columnIndex) throws IOException {
|
||||
return this.delimitedDataReader.get(columnIndex);
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the delimitedDataReader if delimitedDataReader is not null.
|
||||
*/
|
||||
public void close() {
|
||||
if (this.delimitedDataReader != null) {
|
||||
this.delimitedDataReader.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use getLongRowNumber instead of this
|
||||
*
|
||||
* @return number of rows get by tFileInputDelimited
|
||||
*/
|
||||
@Deprecated
|
||||
public int getRowNumber() {
|
||||
if (this.countNeedAdjust) {
|
||||
return (int) this.delimitedDataReader.getProcessedRecordCount();
|
||||
} else {
|
||||
return (int) this.loopCount;
|
||||
}
|
||||
}
|
||||
|
||||
public long getLongRowNumber() {
|
||||
if (this.countNeedAdjust) {
|
||||
return this.delimitedDataReader.getProcessedRecordCount();
|
||||
} else {
|
||||
return this.loopCount;
|
||||
}
|
||||
}
|
||||
|
||||
public int getColumnsCountOfCurrentRow() throws IOException {
|
||||
return this.delimitedDataReader.getAvailableColumnsCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* check the first limit number of records, and fetch the max columns, this is only for GUI wizard.
|
||||
*
|
||||
*/
|
||||
public static int getMaxColumnCount(String fileName, String encoding, String fieldDelimiter, String recordDelimiter,
|
||||
boolean needSkipEmptyRecord, boolean splitRecord, int headerRows, int limit) throws IOException {
|
||||
TOSDelimitedReader tosDelimitedReader = new TOSDelimitedReader(fileName, encoding, fieldDelimiter, recordDelimiter,
|
||||
needSkipEmptyRecord);
|
||||
tosDelimitedReader.setSplitRecord(splitRecord);
|
||||
tosDelimitedReader.skipHeaders(headerRows);
|
||||
int result = 0;
|
||||
for (int i = 0; i < limit && tosDelimitedReader.readRecord(); i++) {
|
||||
int currentColumnsCount = tosDelimitedReader.getAvailableColumnsCount();
|
||||
if (currentColumnsCount > result) {
|
||||
result = currentColumnsCount;
|
||||
}
|
||||
}
|
||||
|
||||
tosDelimitedReader.close();
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@@ -1,815 +0,0 @@
|
||||
// ============================================================================
|
||||
//
|
||||
// Talend Community Edition
|
||||
//
|
||||
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
|
||||
//
|
||||
// This library is free software; you can redistribute it and/or
|
||||
// modify it under the terms of the GNU Lesser General Public
|
||||
// License as published by the Free Software Foundation; either
|
||||
// version 2.1 of the License.
|
||||
//
|
||||
// This library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
// Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program; if not, write to the Free Software
|
||||
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
|
||||
//
|
||||
// ============================================================================
|
||||
package org.talend.fileprocess;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.Reader;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author xtan
|
||||
*
|
||||
*/
|
||||
public class TOSDelimitedReader {
|
||||
|
||||
private boolean debug = false;
|
||||
|
||||
/* --------------1------------------- */
|
||||
private BufferedReader inputStream = null;
|
||||
|
||||
private StreamBuffer streamBuffer = null;
|
||||
|
||||
private ColumnBuffer4Joiner columnBuffer = null;
|
||||
|
||||
/* --------------2------------------- */
|
||||
|
||||
private String[] values = new String[StaticSettings.INITIAL_COLUMN_COUNT];
|
||||
|
||||
private int columnsCount = 0;
|
||||
|
||||
private long currentRecord = 0;
|
||||
|
||||
private boolean skipEmptyRecord = false;
|
||||
|
||||
/* --------------3------------------- */
|
||||
private boolean hasReadRecord = false;
|
||||
|
||||
private boolean autoReallocateForHuge = true;
|
||||
|
||||
private boolean initialized = false;
|
||||
|
||||
private boolean closed = false;
|
||||
|
||||
/* --------------4------------------- */
|
||||
|
||||
public boolean splitRecord = false;
|
||||
|
||||
/* --------------5------------------- */
|
||||
private int header = 0;
|
||||
|
||||
/*--------------6--------------------- */
|
||||
|
||||
public TOSDelimitedReader(java.io.InputStream is, String encoding, String fieldDelimiter, String recordDelimiter,
|
||||
boolean needSkipEmptyRecord) throws IOException {
|
||||
|
||||
UnicodeReader inputStreamReader = new UnicodeReader(is, encoding);
|
||||
|
||||
init(inputStreamReader, fieldDelimiter, recordDelimiter, needSkipEmptyRecord);
|
||||
}
|
||||
|
||||
public TOSDelimitedReader(String fileName, String encoding, String fieldDelimiter, String recordDelimiter,
|
||||
boolean needSkipEmptyRecord) throws IOException {
|
||||
if (fileName == null || fieldDelimiter == null || recordDelimiter == null) {
|
||||
throw new IllegalArgumentException("Parameter can't be null.");
|
||||
}
|
||||
|
||||
UnicodeReader inputStreamReader = new UnicodeReader(new FileInputStream(fileName), encoding);
|
||||
|
||||
init(inputStreamReader, fieldDelimiter, recordDelimiter, needSkipEmptyRecord);
|
||||
|
||||
}
|
||||
|
||||
public TOSDelimitedReader(Reader reader, String fieldDelimiter, String recordDelimiter, boolean needSkipEmptyRecord)
|
||||
throws IOException {
|
||||
if (reader == null || fieldDelimiter == null || recordDelimiter == null) {
|
||||
throw new IllegalArgumentException("Parameter can't be null.");
|
||||
}
|
||||
|
||||
init(reader, fieldDelimiter, recordDelimiter, needSkipEmptyRecord);
|
||||
}
|
||||
|
||||
public void init(Reader reader, String fieldDelimiter, String recordDelimiter, boolean needSkipEmptyRecord)
|
||||
throws IOException {
|
||||
if (reader == null || fieldDelimiter == null || recordDelimiter == null) {
|
||||
throw new IllegalArgumentException("Parameter can't be null.");
|
||||
}
|
||||
|
||||
inputStream = new BufferedReader(reader);
|
||||
|
||||
columnBuffer = new ColumnBuffer4Joiner();
|
||||
streamBuffer = new StreamBuffer(fieldDelimiter, recordDelimiter);
|
||||
|
||||
skipEmptyRecord = needSkipEmptyRecord;
|
||||
|
||||
initialized = true;
|
||||
|
||||
}
|
||||
|
||||
public boolean readRecord() throws IOException {
|
||||
if (splitRecord) {
|
||||
return readRecord_SplitRecord();
|
||||
|
||||
} else {
|
||||
return readRecord_SplitField();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean readRecord_SplitField() throws IOException {
|
||||
|
||||
checkClosed();
|
||||
|
||||
boolean in = false;
|
||||
|
||||
hasReadRecord = false;
|
||||
|
||||
columnsCount = 0;
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
|
||||
while (streamBuffer.hasMoreData() && !hasReadRecord) {
|
||||
|
||||
if (debug) {
|
||||
System.out.println(streamBuffer);
|
||||
}
|
||||
in = true;
|
||||
|
||||
if (streamBuffer.needJoinReadNextBuffer()) {
|
||||
joinAndRead();
|
||||
}
|
||||
|
||||
if (streamBuffer.isStartFieldDelimited()) {
|
||||
endColumn();
|
||||
streamBuffer.skipFieldDelimiter();
|
||||
} else if (streamBuffer.isStartRecordDelimited()) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
if (!streamBuffer.hasMoreData()) {
|
||||
streamBuffer.fileEndWithRecordDelimiter = true;
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
streamBuffer.currentPosition++;
|
||||
|
||||
// for checking one column can support the max number of chars
|
||||
if (!autoReallocateForHuge
|
||||
&& streamBuffer.currentPosition - streamBuffer.columnStart + columnBuffer.position > StaticSettings.MAX_CHARS_IN_ONE_COLUMN) {
|
||||
close();
|
||||
throw new IOException("The maximum chars of one column is " + StaticSettings.MAX_CHARS_IN_ONE_COLUMN
|
||||
+ ". There is over this limit.");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (!hasReadRecord) {
|
||||
if (!StaticSettings.ignoreFileEndWithOneRecordDelimiter && streamBuffer.fileEndWithRecordDelimiter) {// aaa;
|
||||
// bbb
|
||||
// #111
|
||||
// ;
|
||||
// 222#
|
||||
streamBuffer.fileEndWithRecordDelimiter = false;
|
||||
endColumn();
|
||||
endRecord();
|
||||
} else if (in) {// aaa;bbb#111;222
|
||||
endColumn();
|
||||
endRecord();
|
||||
}
|
||||
}
|
||||
|
||||
return hasReadRecord;
|
||||
|
||||
}
|
||||
|
||||
private boolean readRecord_SplitRecord() throws IOException {
|
||||
|
||||
checkClosed();
|
||||
|
||||
boolean in = false;
|
||||
|
||||
hasReadRecord = false;
|
||||
|
||||
columnsCount = 0;
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
|
||||
while (streamBuffer.hasMoreData() && !hasReadRecord) {
|
||||
|
||||
if (debug) {
|
||||
System.out.println(streamBuffer);
|
||||
}
|
||||
in = true;
|
||||
|
||||
if (streamBuffer.needJoinReadNextBuffer()) {
|
||||
joinAndRead();
|
||||
}
|
||||
|
||||
if (streamBuffer.isStartRecordDelimited()) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
if (!streamBuffer.hasMoreData()) {
|
||||
streamBuffer.fileEndWithRecordDelimiter = true;
|
||||
}
|
||||
|
||||
} else if (streamBuffer.isStartFieldDelimited()) {
|
||||
endColumn();
|
||||
streamBuffer.skipFieldDelimiter();
|
||||
} else {
|
||||
|
||||
streamBuffer.currentPosition++;
|
||||
|
||||
// for checking one column can support the max number of chars
|
||||
if (!autoReallocateForHuge
|
||||
&& streamBuffer.currentPosition - streamBuffer.columnStart + columnBuffer.position > StaticSettings.MAX_CHARS_IN_ONE_COLUMN) {
|
||||
close();
|
||||
throw new IOException("The maximum chars of one column is " + StaticSettings.MAX_CHARS_IN_ONE_COLUMN
|
||||
+ ". There is over this limit.");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (!hasReadRecord) {
|
||||
if (!StaticSettings.ignoreFileEndWithOneRecordDelimiter && streamBuffer.fileEndWithRecordDelimiter) {// aaa;
|
||||
// bbb
|
||||
// #111
|
||||
// ;
|
||||
// 222#
|
||||
streamBuffer.fileEndWithRecordDelimiter = false;
|
||||
endColumn();
|
||||
endRecord();
|
||||
} else if (in) {// aaa;bbb#111;222
|
||||
endColumn();
|
||||
endRecord();
|
||||
}
|
||||
}
|
||||
|
||||
return hasReadRecord;
|
||||
}
|
||||
|
||||
private void joinAndRead() throws IOException {
|
||||
columnBuffer.saveCharInJoiner();
|
||||
streamBuffer.joinReadNextBuffer();
|
||||
}
|
||||
|
||||
private void endRecord() throws IOException {
|
||||
streamBuffer.skipRecordDelimiter();
|
||||
if(this.header>0){
|
||||
this.header--;
|
||||
}else if (skipEmptyRecord) {
|
||||
if (columnsCount == 0 || (columnsCount == 1 && values[0].equals(""))) {
|
||||
columnsCount = 0;// reset the columnsCount = 0 is a must
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// this flag is used as a loop exit condition during parsing
|
||||
hasReadRecord = true;
|
||||
|
||||
currentRecord++;
|
||||
}
|
||||
|
||||
/**
|
||||
* @exception IOException Thrown if a very rare extreme exception occurs during parsing, normally resulting from
|
||||
* improper data format.
|
||||
*/
|
||||
private void endColumn() throws IOException {
|
||||
String currentValue = "";
|
||||
|
||||
if (columnBuffer.position == 0) {
|
||||
currentValue = new String(streamBuffer.buffer, streamBuffer.columnStart, streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart);
|
||||
} else {
|
||||
// add the areadly datas in buffer
|
||||
columnBuffer.saveCharInJoiner();
|
||||
currentValue = new String(columnBuffer.buffer, 0, columnBuffer.position);
|
||||
}
|
||||
|
||||
columnBuffer.position = 0;
|
||||
|
||||
// for checking one record can support the max number of columns
|
||||
if (!autoReallocateForHuge && columnsCount >= StaticSettings.MAX_COLUMNS_IN_ONE_RECORD) {
|
||||
close();
|
||||
throw new IOException("The maximum column number of one record is " + StaticSettings.MAX_COLUMNS_IN_ONE_RECORD
|
||||
+ ". There is over this limit.");
|
||||
}
|
||||
|
||||
if (columnsCount == values.length) {
|
||||
|
||||
int newLength = values.length * 2;
|
||||
|
||||
String[] holder = new String[newLength];
|
||||
|
||||
System.arraycopy(values, 0, holder, 0, values.length);
|
||||
|
||||
values = holder;
|
||||
}
|
||||
|
||||
values[columnsCount] = currentValue;
|
||||
|
||||
columnsCount++;
|
||||
}
|
||||
|
||||
private void close(boolean closing) {
|
||||
if (!closed) {
|
||||
if (closing) {
|
||||
streamBuffer.buffer = null;
|
||||
columnBuffer.buffer = null;
|
||||
}
|
||||
|
||||
try {
|
||||
if (initialized) {
|
||||
inputStream.close();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// just ignore the exception
|
||||
}
|
||||
|
||||
inputStream = null;
|
||||
|
||||
closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* when read a new record or get content of the column, there should check the stream first.
|
||||
*/
|
||||
private void checkClosed() throws IOException {
|
||||
if (closed) {
|
||||
throw new IOException("This instance of the DelimitedFileReader class has already been closed.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* get one column value with a given column index for the current record.
|
||||
*/
|
||||
public String get(int columnIndex) throws IOException {
|
||||
checkClosed();
|
||||
|
||||
if (columnIndex > -1 && columnIndex < columnsCount) {
|
||||
return values[columnIndex];
|
||||
} else {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
public String getRowRecord() {
|
||||
// here can fast for the common record seperator function with fieldDelimiter=""
|
||||
// becase there is only one field.
|
||||
if (columnsCount == 1) {
|
||||
return values[0];
|
||||
}
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (int i = 0; i < columnsCount; i++) {
|
||||
if (i > 0) {
|
||||
sb.append(streamBuffer.fieldDelimiter);// add the fieldDelimiter
|
||||
}
|
||||
sb.append(values[i]);
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* get the result how many records have been read.
|
||||
*/
|
||||
public long getProcessedRecordCount() {
|
||||
return currentRecord;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* when skip the some records of the beginning and some records of the end in the file, then compute how many
|
||||
* records are there.
|
||||
* </p>
|
||||
* <p>
|
||||
* it can use after the skipHeaders();
|
||||
* </p>
|
||||
*/
|
||||
public long getAvailableRowCount(int footer) throws IOException {
|
||||
checkClosed();
|
||||
boolean flag = true;
|
||||
do {
|
||||
flag = readRecord();
|
||||
} while (flag);
|
||||
return currentRecord - footer;
|
||||
}
|
||||
|
||||
public int getAvailableColumnsCount() throws IOException {
|
||||
return columnsCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* skip the some records of the beginning in the file, and set the "currentRecord = 0"
|
||||
*
|
||||
*
|
||||
*/
|
||||
public void skipHeaders(int header) throws IOException {
|
||||
this.header = header;
|
||||
checkClosed();
|
||||
if (header <= 0) {
|
||||
return;
|
||||
}
|
||||
for (int i = 0; i < header; i++) {
|
||||
readRecord();
|
||||
}
|
||||
currentRecord = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* the default limit is:
|
||||
* </p>
|
||||
* <li>public static final int MAX_CHARS_IN_ONE_COLUMN = 100000;</li> <li>public static final int
|
||||
* MAX_COLUMNS_IN_ONE_RECORD = 100000;</li>
|
||||
*
|
||||
*/
|
||||
public void setAutoReallocateForHuge(boolean autoReallocateForHuge) {
|
||||
this.autoReallocateForHuge = autoReallocateForHuge;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* if the input datas like this: 111;222;#aaa;bbb;# (row separator: ;# field separator: ; )
|
||||
* </p>
|
||||
* <li>if FirstSplit_RecordSeparator, there will get (2 records, 2 columns)</li> <li>if FirstSplit_FieldSeparator,
|
||||
* there will get (2 records, 3 columns)</li>
|
||||
* <p>
|
||||
* The default value is false, it means split the field first.
|
||||
* </p>
|
||||
*/
|
||||
public void setSplitRecord(boolean splitRecord) {
|
||||
this.splitRecord = splitRecord;
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes and releases all related resources.
|
||||
*/
|
||||
public void close() {
|
||||
if (!closed) {
|
||||
close(true);
|
||||
closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finalize() {
|
||||
close(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* a buffer: save the end chars of the last buffer and begin chars of the current buffer in the memory
|
||||
*
|
||||
* @author xtan
|
||||
*
|
||||
*/
|
||||
private class ColumnBuffer4Joiner {
|
||||
|
||||
char[] buffer;
|
||||
|
||||
int position;
|
||||
|
||||
public ColumnBuffer4Joiner() {
|
||||
buffer = new char[StaticSettings.INITIAL_COLUMN_BUFFER_SIZE];
|
||||
position = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* join TWO buffer together, save the end chars of the last buffer and begin chars of the current buffer in one
|
||||
* place
|
||||
*/
|
||||
public void saveCharInJoiner() {
|
||||
|
||||
int columnBufferBlankSpaceNum = columnBuffer.buffer.length - columnBuffer.position;
|
||||
int streamBufferFieldCharNum = 0;
|
||||
if (streamBuffer.count > 0) {// a must
|
||||
streamBufferFieldCharNum = streamBuffer.currentPosition - streamBuffer.columnStart;
|
||||
}
|
||||
// check and expand more memory for buffer
|
||||
if (columnBufferBlankSpaceNum < streamBufferFieldCharNum) {
|
||||
int newLength = columnBuffer.buffer.length + Math.max(streamBufferFieldCharNum, columnBuffer.buffer.length);
|
||||
|
||||
char[] holder = new char[newLength];
|
||||
|
||||
System.arraycopy(columnBuffer.buffer, 0, holder, 0, columnBuffer.position);
|
||||
|
||||
columnBuffer.buffer = holder;
|
||||
}
|
||||
|
||||
// copy datas from streamBuffer to columnBuffer for save it temporarily
|
||||
System.arraycopy(streamBuffer.buffer, streamBuffer.columnStart, columnBuffer.buffer, columnBuffer.position,
|
||||
streamBufferFieldCharNum);
|
||||
|
||||
columnBuffer.position += streamBufferFieldCharNum;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <b> a buffer with funtion: join the last data and read the next buffer, for supporting the multi-separator</b>
|
||||
* Notice: "count, currentPosition, currentPosition, columnStart", they are import here
|
||||
*
|
||||
* @author xtan
|
||||
*/
|
||||
private class StreamBuffer {
|
||||
|
||||
public boolean needJoinReadNextBuffer() {
|
||||
// 5-2=3, when currentPostion=3; or 5-0=5, when currentPosition=5, and file not end, so read next buffer
|
||||
return currentPosition >= lastIndexToRead && !streamEndMeet;
|
||||
/* notice: here is >=, not > */
|
||||
}
|
||||
|
||||
// when count=5, currentPosition=4 ===> hasMoreData()=true, it mean buffer[4] still need process
|
||||
public boolean hasMoreData() {
|
||||
return !streamEndMeet || currentPosition < count;
|
||||
}
|
||||
|
||||
private void moveTailToHead() {
|
||||
count = count - currentPosition;
|
||||
for (int i = 0; i < count; i++) {
|
||||
buffer[i] = buffer[currentPosition + i];
|
||||
}
|
||||
|
||||
lastIndexToRead = count - maxLimit;
|
||||
currentPosition = 0;
|
||||
columnStart = 0;
|
||||
}
|
||||
|
||||
public void joinReadNextBuffer() throws IOException {
|
||||
moveTailToHead();
|
||||
int readCount = 0;
|
||||
int maxReadLength = buffer.length - count;
|
||||
try {
|
||||
readCount = inputStream.read(buffer, count, maxReadLength);
|
||||
} catch (IOException ex) {
|
||||
close();
|
||||
throw ex;
|
||||
}
|
||||
|
||||
if (debug) {
|
||||
System.out.println("##maxReadLength=" + maxReadLength + " newReadCount=" + readCount);
|
||||
System.out.println(streamBuffer);
|
||||
}
|
||||
|
||||
setStreamEnd(readCount, maxReadLength);
|
||||
|
||||
if (readCount > -1) {
|
||||
count += readCount;
|
||||
lastIndexToRead = count - maxLimit;
|
||||
}
|
||||
}
|
||||
|
||||
private void setStreamEnd(int readCount, int maxReadLength) throws IOException {
|
||||
/* @see bug:http://talendforge.org/bugs/view.php?id=4554 */
|
||||
//https://jira.talendforge.org/browse/TDI-44745
|
||||
if (readCount < maxReadLength) {
|
||||
if (readCount == -1) {
|
||||
streamEndMeet = true;
|
||||
} else {
|
||||
if(inputStream.markSupported()) {
|
||||
inputStream.mark(1);
|
||||
if(inputStream.read() == -1) {
|
||||
streamEndMeet = true;
|
||||
}
|
||||
inputStream.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* --------------1------------------- */
|
||||
private char[] buffer;
|
||||
|
||||
private char[] fieldDelimiter;
|
||||
|
||||
private char[] recordDelimiter;
|
||||
|
||||
// aaa;bbb#111;222#
|
||||
private boolean fileEndWithRecordDelimiter = false;
|
||||
|
||||
/* --------------2------------------- */
|
||||
|
||||
private boolean lineFeedAll = false;
|
||||
|
||||
// if LineMode.LINEFEED_ALL, if "\r\n", variableLineFeed = 2, if "\r" or "\n", variableLineFeed = 1
|
||||
// only used to adjust the skipRecordDelimiter()
|
||||
private int variableLineFeed = 0;
|
||||
|
||||
/* --------------3------------------- */
|
||||
|
||||
private int maxLimit;// maxLimit = Math.max(fieldDelimiter.length, recordDelimiter.length);
|
||||
|
||||
private int currentPosition;
|
||||
|
||||
// for join read
|
||||
private int lastIndexToRead;// lastIndexToRead = count - maxLimit, special: 5 = 5 - 0
|
||||
|
||||
private int count;
|
||||
|
||||
private int columnStart;
|
||||
|
||||
/* --------------4------------------- */
|
||||
// end of the file
|
||||
private boolean streamEndMeet = false;
|
||||
|
||||
public StreamBuffer(String fieldDelimiterPara, String recordDelimiterPara) throws IOException {
|
||||
buffer = new char[StaticSettings.MAX_BUFFER_SIZE];
|
||||
|
||||
if (recordDelimiterPara.equals("\n")) {
|
||||
if (StaticSettings.lineMode == LineMode.LINEFEED_ALL) {
|
||||
// notice: here we set it "\r\n", neither "\n" nor "\r", becase there want max length
|
||||
recordDelimiter = "\r\n".toCharArray();
|
||||
lineFeedAll = true;
|
||||
} else if (StaticSettings.lineMode == LineMode.LINEFEED_JRE) {
|
||||
String lineSeparator = (String) java.security.AccessController
|
||||
.doPrivileged(new sun.security.action.GetPropertyAction("line.separator"));
|
||||
recordDelimiter = lineSeparator.toCharArray();
|
||||
} else {
|
||||
recordDelimiter = recordDelimiterPara.toCharArray();
|
||||
}
|
||||
|
||||
} else {
|
||||
recordDelimiter = recordDelimiterPara.toCharArray();
|
||||
}
|
||||
|
||||
fieldDelimiter = fieldDelimiterPara.toCharArray();
|
||||
|
||||
maxLimit = Math.max(fieldDelimiter.length, recordDelimiter.length);
|
||||
|
||||
// read one buffer datas first when initial
|
||||
try {
|
||||
count = inputStream.read(buffer, 0, buffer.length);
|
||||
} catch (IOException ex) {
|
||||
close();
|
||||
throw ex;
|
||||
}
|
||||
|
||||
currentPosition = 0;
|
||||
columnStart = 0;
|
||||
lastIndexToRead = count - maxLimit;
|
||||
setStreamEnd(count, buffer.length);
|
||||
}
|
||||
|
||||
public boolean isStartFieldDelimited() {
|
||||
int maxLengthCanCheck = count - currentPosition;
|
||||
|
||||
if (fieldDelimiter.length != 0 && fieldDelimiter.length <= maxLengthCanCheck) {// test here is a must
|
||||
for (int i = 0; i < fieldDelimiter.length; i++) {
|
||||
if (buffer[currentPosition + i] != fieldDelimiter[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean isStartRecordDelimited() {
|
||||
|
||||
int maxLengthCanCheck = count - currentPosition;
|
||||
if (lineFeedAll) {// maxLengthCanCheck > 0 is must
|
||||
if (maxLengthCanCheck > 0) {
|
||||
if (buffer[currentPosition] == '\n') {
|
||||
variableLineFeed = 1;
|
||||
return true;
|
||||
} else if (buffer[currentPosition] == '\r') {
|
||||
variableLineFeed = 1;
|
||||
if (buffer[currentPosition + 1] == '\n' && maxLengthCanCheck > 1) {
|
||||
variableLineFeed = 2;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
|
||||
} else {
|
||||
|
||||
if (recordDelimiter.length != 0 && recordDelimiter.length <= maxLengthCanCheck) {// test here is a
|
||||
// must
|
||||
for (int i = 0; i < recordDelimiter.length; i++) {
|
||||
if (buffer[currentPosition + i] != recordDelimiter[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public void skipFieldDelimiter() {
|
||||
currentPosition += fieldDelimiter.length;
|
||||
columnStart = currentPosition;// means: start a new Column
|
||||
}
|
||||
|
||||
public void skipRecordDelimiter() {
|
||||
if (lineFeedAll) {
|
||||
currentPosition += variableLineFeed;
|
||||
} else {
|
||||
currentPosition += recordDelimiter.length;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuffer sb = new StringBuffer();
|
||||
sb.append("count=").append(count).append("\n");
|
||||
sb.append("maxLimit=").append(maxLimit).append("\n");
|
||||
sb.append("lastIndexToRead=").append(lastIndexToRead).append("\n");
|
||||
sb.append("currentPosition=").append(currentPosition).append("\n");
|
||||
sb.append("columnStart=").append(columnStart).append("\n");
|
||||
sb.append("streamEndMeet=").append(streamEndMeet).append("\n");
|
||||
|
||||
sb.append("overMaxLimit()=").append(needJoinReadNextBuffer()).append("\n");
|
||||
sb.append("hasMoreData()=").append(hasMoreData()).append("\n");
|
||||
|
||||
sb.append("char[]=").append(buffer).append("\n");
|
||||
sb.append("char[").append(currentPosition).append("]=").append(buffer[currentPosition]).append("\n");
|
||||
|
||||
sb.append("fieldDelimiterLength=").append(fieldDelimiter.length).append("\n");
|
||||
sb.append("recordDelimiterLength=").append(recordDelimiter.length).append("\n");
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
public enum LineMode {
|
||||
// it means, if input recordDelimiter is "\n",
|
||||
// if JRE on windows, it will be "\r\n",
|
||||
// if JRE on linux, it will be "\n",
|
||||
// if JRE on Mac, it will be "\r"
|
||||
|
||||
LINEFEED_JRE,
|
||||
|
||||
// it means, if input recordDelimiter is "\n", it will treat all the "\r", "\n", "\r\n" as recordDelimiter
|
||||
LINEFEED_ALL,
|
||||
|
||||
// it means, if input recordDelimiter is "\n", just treat it as "\n"
|
||||
LINEFEED_NORMAL;
|
||||
}
|
||||
|
||||
private enum SplitWay {
|
||||
|
||||
FIRSTSPLIT_RECORDSEPARATOR,
|
||||
|
||||
FIRSTSPLIT_FIELDSEPARATOR;
|
||||
}
|
||||
|
||||
/**
|
||||
* StaticSettings for the DelimitedDataReader. they can be changed in unit test.
|
||||
*/
|
||||
private static class StaticSettings {
|
||||
|
||||
public static final int MAX_BUFFER_SIZE = 1024;
|
||||
|
||||
public static final int INITIAL_COLUMN_COUNT = 10;
|
||||
|
||||
public static final int INITIAL_COLUMN_BUFFER_SIZE = 50;
|
||||
|
||||
public static final int MAX_CHARS_IN_ONE_COLUMN = 100000;
|
||||
|
||||
public static final int MAX_COLUMNS_IN_ONE_RECORD = 100000;
|
||||
|
||||
// how do we process the "\n" as recordDelimiter
|
||||
public static final LineMode lineMode = LineMode.LINEFEED_ALL;
|
||||
|
||||
// 1. how do we process the case, file end with one recordDelimiter, like the following case, 2 records or 3
|
||||
// records?aaa;bbb#111;222#
|
||||
// 2. notice: there only process the last one recordDelimiter, if this case: aaa;bbb#111;222####, we still focus
|
||||
// on the last recordDelimiter
|
||||
public static final boolean ignoreFileEndWithOneRecordDelimiter = true;
|
||||
|
||||
// public static final boolean autoReallocateForHuge = true;
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
TOSDelimitedReader fid = new TOSDelimitedReader("D:\\talend\\talendFID\\in.csv", "ISO-8859-15", "", "\n", false);
|
||||
int rowNum = 0;
|
||||
while (fid.readRecord()) {
|
||||
System.out.println("*********Row" + rowNum + "***********");
|
||||
System.out.println("------Row------\n" + fid.getRowRecord());
|
||||
int fieldNum = fid.getAvailableColumnsCount();
|
||||
for (int k = 0; k < fieldNum; k++) {
|
||||
System.out.println("------" + k + "------\n" + fid.get(k));
|
||||
}
|
||||
rowNum++;
|
||||
System.out.println("\n\n");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,74 +0,0 @@
|
||||
package org.talend.fileprocess;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.PushbackInputStream;
|
||||
import java.io.Reader;
|
||||
|
||||
public class UnicodeReader extends Reader {
|
||||
private static final int BOM_SIZE = 4;
|
||||
private final InputStreamReader reader;
|
||||
|
||||
/**
|
||||
* Construct UnicodeReader
|
||||
* @param in Input stream.
|
||||
* @param defaultEncoding Default encoding to be used if BOM is not found,
|
||||
* or <code>null</code> to use system default encoding.
|
||||
* @throws IOException If an I/O error occurs.
|
||||
*/
|
||||
public UnicodeReader(InputStream in, String defaultEncoding) throws IOException {
|
||||
byte bom[] = new byte[BOM_SIZE];
|
||||
String encoding;
|
||||
int unread;
|
||||
PushbackInputStream pushbackStream = new PushbackInputStream(in, BOM_SIZE);
|
||||
int n = pushbackStream.read(bom, 0, bom.length);
|
||||
|
||||
// Read ahead four bytes and check for BOM marks.
|
||||
if ((bom[0] == (byte) 0xEF) && (bom[1] == (byte) 0xBB) && (bom[2] == (byte) 0xBF)) {
|
||||
encoding = "UTF-8";
|
||||
unread = n - 3;
|
||||
} else if ((bom[0] == (byte) 0xFE) && (bom[1] == (byte) 0xFF)) {
|
||||
encoding = "UTF-16BE";
|
||||
unread = n - 2;
|
||||
} else if ((bom[0] == (byte) 0xFF) && (bom[1] == (byte) 0xFE) && (bom[2] == (byte) 0x00) && (bom[3] == (byte) 0x00)) {
|
||||
encoding = "UTF-32LE";
|
||||
unread = n - 4;
|
||||
} else if ((bom[0] == (byte) 0xFF) && (bom[1] == (byte) 0xFE)) {
|
||||
encoding = "UTF-16LE";
|
||||
unread = n - 2;
|
||||
} else if ((bom[0] == (byte) 0x00) && (bom[1] == (byte) 0x00) && (bom[2] == (byte) 0xFE) && (bom[3] == (byte) 0xFF)) {
|
||||
encoding = "UTF-32BE";
|
||||
unread = n - 4;
|
||||
} else {
|
||||
encoding = defaultEncoding;
|
||||
unread = n;
|
||||
}
|
||||
|
||||
// Unread bytes if necessary and skip BOM marks.
|
||||
if (unread > 0) {
|
||||
pushbackStream.unread(bom, (n - unread), unread);
|
||||
} else if (unread < -1) {
|
||||
pushbackStream.unread(bom, 0, 0);
|
||||
}
|
||||
|
||||
// Use given encoding.
|
||||
if (encoding == null) {
|
||||
reader = new InputStreamReader(pushbackStream);
|
||||
} else {
|
||||
reader = new InputStreamReader(pushbackStream, encoding);
|
||||
}
|
||||
}
|
||||
|
||||
public String getEncoding() {
|
||||
return reader.getEncoding();
|
||||
}
|
||||
|
||||
public int read(char[] cbuf, int off, int len) throws IOException {
|
||||
return reader.read(cbuf, off, len);
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
@@ -1,412 +0,0 @@
|
||||
// ============================================================================
|
||||
//
|
||||
// Talend Community Edition
|
||||
//
|
||||
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
|
||||
//
|
||||
// This library is free software; you can redistribute it and/or
|
||||
// modify it under the terms of the GNU Lesser General Public
|
||||
// License as published by the Free Software Foundation; either
|
||||
// version 2.1 of the License.
|
||||
//
|
||||
// This library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
// Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program; if not, write to the Free Software
|
||||
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
|
||||
//
|
||||
// ============================================================================
|
||||
package org.talend.fileprocess.delimited;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.text.NumberFormat;
|
||||
|
||||
/**
|
||||
* A stream based parser for parsing delimited text data from a reader. this
|
||||
* class only works when the record delimiter is a String which length is > 1,
|
||||
* and the field delimiter is a single character.<br/>
|
||||
*
|
||||
* @author gke
|
||||
*
|
||||
*/
|
||||
final class ComplexDelimitedDataReader1 extends DelimitedDataReader {
|
||||
|
||||
private StreamBuffer streamBuffer = new StreamBuffer();
|
||||
|
||||
private char fieldDelimiter;
|
||||
|
||||
private char[] recordDelimiter;
|
||||
|
||||
public ComplexDelimitedDataReader1(BufferedReader inputStream, char delimiter,
|
||||
String recordDelimiter, boolean skipEmptyRecords)
|
||||
throws IOException {
|
||||
super(inputStream, skipEmptyRecords);
|
||||
|
||||
this.fieldDelimiter = delimiter;
|
||||
|
||||
this.recordDelimiter = recordDelimiter.toCharArray();
|
||||
|
||||
streamBuffer.recordDelimiterLength = this.recordDelimiter.length;
|
||||
try {
|
||||
streamBuffer.count = inputStream.read(streamBuffer.buffer, 0,
|
||||
streamBuffer.buffer.length);
|
||||
} catch (IOException ex) {
|
||||
close();
|
||||
|
||||
throw ex;
|
||||
}
|
||||
streamBuffer.currentPosition = 0;
|
||||
streamBuffer.columnStart = 0;
|
||||
streamBuffer.lastIndexToRead = streamBuffer.count
|
||||
- streamBuffer.recordDelimiterLength;
|
||||
streamBuffer.streamEndMeet = (streamBuffer.count < streamBuffer.buffer.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readRecord() throws IOException {
|
||||
checkClosed();
|
||||
|
||||
columnsCount = 0;
|
||||
|
||||
hasReadRecord = false;
|
||||
|
||||
// check to see if we've already found the end of data
|
||||
|
||||
if (streamBuffer.hasMoreData()) {
|
||||
// loop over the data stream until the end of data is found
|
||||
// or the end of the record is found
|
||||
|
||||
// char currentLetter = '\0';
|
||||
|
||||
while (streamBuffer.hasMoreData() && !hasReadRecord) {
|
||||
if (streamBuffer.noDataRecordDelimter()) {
|
||||
checkDataLength();
|
||||
} else {
|
||||
// grab the current letter as a char
|
||||
|
||||
char currentLetter = streamBuffer.buffer[streamBuffer.currentPosition];
|
||||
|
||||
if (currentLetter == fieldDelimiter) {
|
||||
// we encountered a column with no data, so
|
||||
// just send the end column
|
||||
|
||||
endColumn();
|
||||
streamBuffer.currentPosition++;
|
||||
} else if (isStartRecordDelimited()) {
|
||||
if (columnsCount > 0 || !skipEmptyRecord) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
}
|
||||
streamBuffer.skipRecordDelimiter();
|
||||
|
||||
} else {
|
||||
// since the letter wasn't a special letter, this
|
||||
// will be the first letter of our current column
|
||||
|
||||
startedRow = true;
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
|
||||
boolean firstLoop = true;
|
||||
|
||||
while (streamBuffer.hasMoreData() && startedRow) {
|
||||
if (!firstLoop
|
||||
&& streamBuffer.noDataRecordDelimter()) {
|
||||
checkDataLength();
|
||||
} else {
|
||||
if (!firstLoop) {
|
||||
// grab the current letter as a char
|
||||
currentLetter = streamBuffer.buffer[streamBuffer.currentPosition];
|
||||
if (currentLetter == fieldDelimiter) {
|
||||
endColumn();
|
||||
streamBuffer.currentPosition++;
|
||||
} else if (isStartRecordDelimited()) {
|
||||
endColumn();
|
||||
|
||||
endRecord();
|
||||
|
||||
streamBuffer.skipRecordDelimiter();
|
||||
}
|
||||
} else {
|
||||
firstLoop = false;
|
||||
}
|
||||
|
||||
if (startedRow) {
|
||||
streamBuffer.currentPosition++;
|
||||
|
||||
if (safetySwitch
|
||||
&& streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart
|
||||
+ columnBuffer.position > StaticSettings.MAX_SIZE_FOR_SAFTY) {
|
||||
close();
|
||||
|
||||
throw new IOException(
|
||||
"Maximum column length of 100,000 exceeded in column "
|
||||
+ NumberFormat
|
||||
.getIntegerInstance()
|
||||
.format(
|
||||
columnsCount)
|
||||
+ " in record "
|
||||
+ NumberFormat
|
||||
.getIntegerInstance()
|
||||
.format(
|
||||
currentRecord)
|
||||
+ ". Set the SafetySwitch property to false"
|
||||
+ " if you're expecting column lengths greater than 100,000 characters to"
|
||||
+ " avoid this error.");
|
||||
}
|
||||
}
|
||||
} // end else
|
||||
}
|
||||
}
|
||||
} // end else
|
||||
}
|
||||
}
|
||||
|
||||
// check to see if we hit the end of the file
|
||||
// without processing the current record
|
||||
if (!hasReadRecord && !streamBuffer.noData()) {
|
||||
while (!streamBuffer.noData()) {
|
||||
if (streamBuffer.buffer[streamBuffer.currentPosition] == fieldDelimiter) {
|
||||
endColumn();
|
||||
streamBuffer.currentPosition++;
|
||||
startedRow = true;
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
} else {
|
||||
if (!startedRow) {
|
||||
startedRow = true;
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
}
|
||||
streamBuffer.currentPosition++;
|
||||
}
|
||||
}
|
||||
if (startedRow) {
|
||||
endColumn();
|
||||
}
|
||||
endRecord();
|
||||
}
|
||||
|
||||
return hasReadRecord;
|
||||
}
|
||||
|
||||
private boolean isStartRecordDelimited() {
|
||||
for (int i = 0; i < recordDelimiter.length; i++) {
|
||||
if (streamBuffer.buffer[streamBuffer.currentPosition + i] != recordDelimiter[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @exception IOException
|
||||
* Thrown if an error occurs while reading data from the
|
||||
* source stream.
|
||||
*/
|
||||
private void checkDataLength() throws IOException {
|
||||
updateCurrentValue();
|
||||
streamBuffer.moveTailToHead();
|
||||
int count = 0;
|
||||
try {
|
||||
count = inputStream.read(streamBuffer.buffer, streamBuffer.count,
|
||||
streamBuffer.buffer.length - streamBuffer.count);
|
||||
} catch (IOException ex) {
|
||||
close();
|
||||
|
||||
throw ex;
|
||||
}
|
||||
streamBuffer.count += count;
|
||||
streamBuffer.lastIndexToRead = streamBuffer.count
|
||||
- streamBuffer.recordDelimiterLength;
|
||||
if (streamBuffer.count < streamBuffer.buffer.length) {
|
||||
streamBuffer.streamEndMeet = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @exception IOException
|
||||
* Thrown if a very rare extreme exception occurs during
|
||||
* parsing, normally resulting from improper data format.
|
||||
*/
|
||||
private void endColumn() throws IOException {
|
||||
String currentValue = "";
|
||||
|
||||
// must be called before setting startedColumn = false
|
||||
if (startedRow) {
|
||||
if (columnBuffer.position == 0) {
|
||||
if (streamBuffer.columnStart < streamBuffer.currentPosition) {
|
||||
currentValue = new String(streamBuffer.buffer,
|
||||
streamBuffer.columnStart,
|
||||
streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart);
|
||||
}
|
||||
} else {
|
||||
updateCurrentValue();
|
||||
currentValue = new String(columnBuffer.buffer, 0,
|
||||
columnBuffer.position);
|
||||
}
|
||||
}
|
||||
|
||||
columnBuffer.position = 0;
|
||||
|
||||
startedRow = false;
|
||||
|
||||
if (columnsCount >= StaticSettings.MAX_SIZE_FOR_SAFTY && safetySwitch) {
|
||||
close();
|
||||
|
||||
throw new IOException(
|
||||
"Maximum column count of 100,000 exceeded in record "
|
||||
+ NumberFormat.getIntegerInstance().format(
|
||||
currentRecord)
|
||||
+ ". Set the SafetySwitch property to false"
|
||||
+ " if you're expecting more than 100,000 columns per record to"
|
||||
+ " avoid this error.");
|
||||
}
|
||||
|
||||
// check to see if our current holder array for
|
||||
// column chunks is still big enough to handle another
|
||||
// column chunk
|
||||
|
||||
if (columnsCount == values.length) {
|
||||
// holder array needs to grow to be able to hold another column
|
||||
int newLength = values.length * 2;
|
||||
|
||||
String[] holder = new String[newLength];
|
||||
|
||||
System.arraycopy(values, 0, holder, 0, values.length);
|
||||
|
||||
values = holder;
|
||||
}
|
||||
|
||||
values[columnsCount] = currentValue;
|
||||
|
||||
currentValue = "";
|
||||
|
||||
columnsCount++;
|
||||
}
|
||||
|
||||
private void updateCurrentValue() {
|
||||
if (startedRow
|
||||
&& streamBuffer.columnStart < streamBuffer.currentPosition) {
|
||||
if (columnBuffer.buffer.length - columnBuffer.position < streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart) {
|
||||
int newLength = columnBuffer.buffer.length
|
||||
+ Math.max(streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart,
|
||||
columnBuffer.buffer.length);
|
||||
|
||||
char[] holder = new char[newLength];
|
||||
|
||||
System.arraycopy(columnBuffer.buffer, 0, holder, 0,
|
||||
columnBuffer.position);
|
||||
|
||||
columnBuffer.buffer = holder;
|
||||
}
|
||||
|
||||
System.arraycopy(streamBuffer.buffer, streamBuffer.columnStart,
|
||||
columnBuffer.buffer, columnBuffer.position,
|
||||
streamBuffer.currentPosition - streamBuffer.columnStart);
|
||||
|
||||
columnBuffer.position += streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
protected void close(boolean closing) {
|
||||
if (!closed) {
|
||||
if (closing) {
|
||||
streamBuffer.buffer = null;
|
||||
columnBuffer.buffer = null;
|
||||
}
|
||||
|
||||
try {
|
||||
if (initialized) {
|
||||
inputStream.close();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// just eat the exception
|
||||
}
|
||||
|
||||
inputStream = null;
|
||||
|
||||
closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A buffer structure that used to load data from stream for processing.
|
||||
*
|
||||
* @author gke
|
||||
*/
|
||||
private class StreamBuffer {
|
||||
|
||||
char[] buffer;
|
||||
|
||||
int currentPosition;
|
||||
|
||||
// lastIndexToRead is the last index of letter in the buffer for
|
||||
// processing, this will be equal to count - recordDelimiterLegnth. This
|
||||
// is needed because evertime we process a letter, when we need to read
|
||||
// the proceeding letters in order to make sure if currentPosition is
|
||||
// the begin of record delmiter
|
||||
int lastIndexToRead;
|
||||
|
||||
int recordDelimiterLength;
|
||||
|
||||
// count indicates how much usable data has been read into the stream,
|
||||
// which will not always be as long as buffer.Length.
|
||||
int count;
|
||||
|
||||
// columnStart is the position in the buffer when the
|
||||
// current column was started or the last time data
|
||||
// was moved out to the column buffer.
|
||||
int columnStart;
|
||||
|
||||
// streamEndMeet is also very important, when we loag data from stream,
|
||||
// if the count is smaller than buffer.length, it indcates that there
|
||||
// must be no more data in the stream, the stream end is meet.
|
||||
boolean streamEndMeet = false;
|
||||
|
||||
public StreamBuffer() {
|
||||
buffer = new char[StaticSettings.MAX_BUFFER_SIZE];
|
||||
currentPosition = 0;
|
||||
count = 0;
|
||||
columnStart = 0;
|
||||
}
|
||||
|
||||
public void moveTailToHead() {
|
||||
count = count - currentPosition;
|
||||
for (int i = 0; i < count; i++) {
|
||||
buffer[i] = buffer[currentPosition + i];
|
||||
}
|
||||
lastIndexToRead = count - recordDelimiterLength;
|
||||
currentPosition = 0;
|
||||
columnStart = 0;
|
||||
}
|
||||
|
||||
public void skipRecordDelimiter() {
|
||||
currentPosition += recordDelimiterLength;
|
||||
}
|
||||
|
||||
public boolean noDataRecordDelimter() {
|
||||
return currentPosition > lastIndexToRead;
|
||||
}
|
||||
|
||||
public boolean hasMoreData() {
|
||||
return (currentPosition <= lastIndexToRead)
|
||||
|| (currentPosition > lastIndexToRead && !streamEndMeet);
|
||||
}
|
||||
|
||||
public boolean noData() {
|
||||
return currentPosition >= count;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,440 +0,0 @@
|
||||
// ============================================================================
|
||||
//
|
||||
// Talend Community Edition
|
||||
//
|
||||
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
|
||||
//
|
||||
// This library is free software; you can redistribute it and/or
|
||||
// modify it under the terms of the GNU Lesser General Public
|
||||
// License as published by the Free Software Foundation; either
|
||||
// version 2.1 of the License.
|
||||
//
|
||||
// This library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
// Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program; if not, write to the Free Software
|
||||
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
|
||||
//
|
||||
// ============================================================================
|
||||
package org.talend.fileprocess.delimited;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.text.NumberFormat;
|
||||
|
||||
/**
|
||||
* A stream based parser for parsing delimited text data from a reader. this
|
||||
* class only works when the field delimiter is a String which length is > 1,
|
||||
* and the record delimiter is a single character.<br/>
|
||||
*
|
||||
* @author gke
|
||||
*
|
||||
*/
|
||||
final class ComplexDelimitedDataReader2 extends DelimitedDataReader {
|
||||
|
||||
private boolean useCustomRecordDelimiter = false;
|
||||
|
||||
private StreamBuffer streamBuffer = new StreamBuffer();
|
||||
|
||||
private char lastLetter = '\0';
|
||||
|
||||
private char[] fieldDelimiter;
|
||||
|
||||
private char recordDelimiter;
|
||||
|
||||
public ComplexDelimitedDataReader2(BufferedReader inputStream, String delimiter,
|
||||
char recordDelimiter, boolean skipEmptyRecords) throws IOException {
|
||||
|
||||
super(inputStream, skipEmptyRecords);
|
||||
|
||||
this.fieldDelimiter = delimiter.toCharArray();
|
||||
|
||||
this.recordDelimiter = recordDelimiter;
|
||||
if (recordDelimiter != '\n' && recordDelimiter != '\r') {
|
||||
useCustomRecordDelimiter = true;
|
||||
}
|
||||
streamBuffer.fieldDelimiterLength = this.fieldDelimiter.length;
|
||||
try {
|
||||
streamBuffer.count = inputStream.read(streamBuffer.buffer, 0,
|
||||
streamBuffer.buffer.length);
|
||||
} catch (IOException ex) {
|
||||
close();
|
||||
|
||||
throw ex;
|
||||
}
|
||||
streamBuffer.currentPosition = 0;
|
||||
streamBuffer.columnStart = 0;
|
||||
streamBuffer.lastIndexToRead = streamBuffer.count
|
||||
- streamBuffer.fieldDelimiterLength;
|
||||
streamBuffer.streamEndMeet = (streamBuffer.count < streamBuffer.buffer.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readRecord() throws IOException {
|
||||
checkClosed();
|
||||
columnsCount = 0;
|
||||
hasReadRecord = false;
|
||||
if (streamBuffer.hasMoreData()) {
|
||||
while (streamBuffer.hasMoreData() && !hasReadRecord) {
|
||||
if (streamBuffer.noDataFieldDelimiter()) {
|
||||
checkDataLength();
|
||||
} else {
|
||||
char currentLetter = streamBuffer.buffer[streamBuffer.currentPosition];
|
||||
if (isStartFieldDelimited()) {
|
||||
// encountered a column with no data
|
||||
endColumn();
|
||||
streamBuffer.skipFieldDelimiter();
|
||||
lastLetter = streamBuffer.buffer[streamBuffer.currentPosition - 1];
|
||||
} else if (useCustomRecordDelimiter
|
||||
&& currentLetter == recordDelimiter) {
|
||||
if (!skipEmptyRecord || columnsCount > 0) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
}
|
||||
streamBuffer.currentPosition++;
|
||||
lastLetter = streamBuffer.buffer[streamBuffer.currentPosition - 1];
|
||||
} else if (!useCustomRecordDelimiter
|
||||
&& (currentLetter == '\r' || currentLetter == '\n')) {
|
||||
if ((!skipEmptyRecord || columnsCount > 0)
|
||||
&& ((currentLetter == '\r' || lastLetter != '\r'))) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
}
|
||||
streamBuffer.currentPosition++;
|
||||
lastLetter = streamBuffer.buffer[streamBuffer.currentPosition - 1];
|
||||
} else {
|
||||
// since the letter wasn't a special letter, this
|
||||
// will be the first letter of our current column
|
||||
startedRow = true;
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
boolean firstLoop = true;
|
||||
while (streamBuffer.hasMoreData() && startedRow) {
|
||||
if (!firstLoop
|
||||
&& streamBuffer.noDataFieldDelimiter()) {
|
||||
checkDataLength();
|
||||
} else {
|
||||
if (!firstLoop) {
|
||||
currentLetter = streamBuffer.buffer[streamBuffer.currentPosition];
|
||||
if (isStartFieldDelimited()) {
|
||||
endColumn();
|
||||
streamBuffer.skipFieldDelimiter();
|
||||
lastLetter = streamBuffer.buffer[streamBuffer.currentPosition - 1];
|
||||
} else if ((!useCustomRecordDelimiter && (currentLetter == '\r' || currentLetter == '\n'))
|
||||
|| (useCustomRecordDelimiter && currentLetter == recordDelimiter)) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
streamBuffer.currentPosition++;
|
||||
lastLetter = streamBuffer.buffer[streamBuffer.currentPosition - 1];
|
||||
}
|
||||
} else {
|
||||
firstLoop = false;
|
||||
}
|
||||
if (startedRow) {
|
||||
lastLetter = currentLetter;
|
||||
streamBuffer.currentPosition++;
|
||||
if (safetySwitch
|
||||
&& streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart
|
||||
+ columnBuffer.position > StaticSettings.MAX_SIZE_FOR_SAFTY) {
|
||||
close();
|
||||
throw new IOException(
|
||||
"Maximum column length of 100,000 exceeded in column "
|
||||
+ NumberFormat
|
||||
.getIntegerInstance()
|
||||
.format(
|
||||
columnsCount)
|
||||
+ " in record "
|
||||
+ NumberFormat
|
||||
.getIntegerInstance()
|
||||
.format(
|
||||
currentRecord)
|
||||
+ ". Set the SafetySwitch property to false"
|
||||
+ " if you're expecting column lengths greater than 100,000 characters to"
|
||||
+ " avoid this error.");
|
||||
}
|
||||
}
|
||||
} // end else
|
||||
}
|
||||
}
|
||||
} // end else
|
||||
}
|
||||
}
|
||||
if (!hasReadRecord) {
|
||||
while (!streamBuffer.noData() && !hasReadRecord) {
|
||||
char currentLetter = streamBuffer.buffer[streamBuffer.currentPosition];
|
||||
if (startedRow) {
|
||||
if ((!useCustomRecordDelimiter && (currentLetter == '\r' || currentLetter == '\n'))
|
||||
|| (useCustomRecordDelimiter && currentLetter == recordDelimiter)) {
|
||||
endColumn();
|
||||
|
||||
endRecord();
|
||||
lastLetter = currentLetter;
|
||||
streamBuffer.currentPosition++;
|
||||
return true;
|
||||
} else {
|
||||
lastLetter = currentLetter;
|
||||
streamBuffer.currentPosition++;
|
||||
}
|
||||
} else {
|
||||
if (useCustomRecordDelimiter
|
||||
&& currentLetter == recordDelimiter) {
|
||||
if (!skipEmptyRecord || columnsCount > 0) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
lastLetter = currentLetter;
|
||||
streamBuffer.currentPosition++;
|
||||
return true;
|
||||
} else {
|
||||
lastLetter = currentLetter;
|
||||
streamBuffer.currentPosition++;
|
||||
}
|
||||
|
||||
} else if (!useCustomRecordDelimiter
|
||||
&& (currentLetter == '\r' || currentLetter == '\n')) {
|
||||
if ((!skipEmptyRecord || columnsCount > 0)
|
||||
&& ((currentLetter == '\r' || lastLetter != '\r'))) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
lastLetter = currentLetter;
|
||||
streamBuffer.currentPosition++;
|
||||
return true;
|
||||
} else {
|
||||
lastLetter = currentLetter;
|
||||
streamBuffer.currentPosition++;
|
||||
}
|
||||
} else {
|
||||
startedRow = true;
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
lastLetter = currentLetter;
|
||||
streamBuffer.currentPosition++;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (streamBuffer.noData() && !hasReadRecord) {
|
||||
if (columnsCount > 0 || startedRow) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return hasReadRecord;
|
||||
}
|
||||
|
||||
private boolean isStartFieldDelimited() {
|
||||
for (int i = 0; i < fieldDelimiter.length; i++) {
|
||||
if (streamBuffer.buffer[streamBuffer.currentPosition + i] != fieldDelimiter[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @exception IOException
|
||||
* Thrown if an error occurs while reading data from the
|
||||
* source stream.
|
||||
*/
|
||||
private void checkDataLength() throws IOException {
|
||||
updateCurrentValue();
|
||||
streamBuffer.moveTailToHead();
|
||||
int count = 0;
|
||||
try {
|
||||
count = inputStream.read(streamBuffer.buffer, streamBuffer.count,
|
||||
streamBuffer.buffer.length - streamBuffer.count);
|
||||
} catch (IOException ex) {
|
||||
close();
|
||||
|
||||
throw ex;
|
||||
}
|
||||
streamBuffer.count += count;
|
||||
streamBuffer.lastIndexToRead = streamBuffer.count
|
||||
- streamBuffer.fieldDelimiterLength;
|
||||
if (streamBuffer.count < streamBuffer.buffer.length) {
|
||||
streamBuffer.streamEndMeet = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @exception IOException
|
||||
* Thrown if a very rare extreme exception occurs during
|
||||
* parsing, normally resulting from improper data format.
|
||||
*/
|
||||
private void endColumn() throws IOException {
|
||||
String currentValue = "";
|
||||
|
||||
// must be called before setting startedColumn = false
|
||||
if (startedRow) {
|
||||
if (columnBuffer.position == 0) {
|
||||
if (streamBuffer.columnStart < streamBuffer.currentPosition) {
|
||||
currentValue = new String(streamBuffer.buffer,
|
||||
streamBuffer.columnStart,
|
||||
streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart);
|
||||
}
|
||||
} else {
|
||||
updateCurrentValue();
|
||||
currentValue = new String(columnBuffer.buffer, 0,
|
||||
columnBuffer.position);
|
||||
}
|
||||
}
|
||||
|
||||
columnBuffer.position = 0;
|
||||
|
||||
startedRow = false;
|
||||
|
||||
if (columnsCount >= StaticSettings.MAX_SIZE_FOR_SAFTY && safetySwitch) {
|
||||
close();
|
||||
|
||||
throw new IOException(
|
||||
"Maximum column count of 100,000 exceeded in record "
|
||||
+ NumberFormat.getIntegerInstance().format(
|
||||
currentRecord)
|
||||
+ ". Set the SafetySwitch property to false"
|
||||
+ " if you're expecting more than 100,000 columns per record to"
|
||||
+ " avoid this error.");
|
||||
}
|
||||
|
||||
// check to see if our current holder array for
|
||||
// column chunks is still big enough to handle another
|
||||
// column chunk
|
||||
|
||||
if (columnsCount == values.length) {
|
||||
// holder array needs to grow to be able to hold another column
|
||||
int newLength = values.length * 2;
|
||||
|
||||
String[] holder = new String[newLength];
|
||||
|
||||
System.arraycopy(values, 0, holder, 0, values.length);
|
||||
|
||||
values = holder;
|
||||
}
|
||||
|
||||
values[columnsCount] = currentValue;
|
||||
|
||||
currentValue = "";
|
||||
|
||||
columnsCount++;
|
||||
}
|
||||
|
||||
private void updateCurrentValue() {
|
||||
if (startedRow
|
||||
&& streamBuffer.columnStart < streamBuffer.currentPosition) {
|
||||
if (columnBuffer.buffer.length - columnBuffer.position < streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart) {
|
||||
int newLength = columnBuffer.buffer.length
|
||||
+ Math.max(streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart,
|
||||
columnBuffer.buffer.length);
|
||||
|
||||
char[] holder = new char[newLength];
|
||||
|
||||
System.arraycopy(columnBuffer.buffer, 0, holder, 0,
|
||||
columnBuffer.position);
|
||||
|
||||
columnBuffer.buffer = holder;
|
||||
}
|
||||
|
||||
System.arraycopy(streamBuffer.buffer, streamBuffer.columnStart,
|
||||
columnBuffer.buffer, columnBuffer.position,
|
||||
streamBuffer.currentPosition - streamBuffer.columnStart);
|
||||
|
||||
columnBuffer.position += streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void close(boolean closing) {
|
||||
if (!closed) {
|
||||
if (closing) {
|
||||
streamBuffer.buffer = null;
|
||||
columnBuffer.buffer = null;
|
||||
}
|
||||
|
||||
try {
|
||||
if (initialized) {
|
||||
inputStream.close();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// just eat the exception
|
||||
}
|
||||
|
||||
inputStream = null;
|
||||
|
||||
closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A buffer structure that used to load data from stream for processing.
|
||||
*
|
||||
* @author gke
|
||||
*/
|
||||
private class StreamBuffer {
|
||||
|
||||
char[] buffer;
|
||||
|
||||
int currentPosition;
|
||||
|
||||
// lastIndexToRead is the last index of letter in the buffer for
|
||||
// processing, this will be equal to count - fieldDelimiterLegnth. This
|
||||
// is needed because evertime we process a letter, when we need to read
|
||||
// the proceeding letters in order to make sure if currentPosition is
|
||||
// the begin of field delmiter
|
||||
int lastIndexToRead;
|
||||
|
||||
int fieldDelimiterLength;
|
||||
|
||||
// count indicates how much usable data has been read into the stream,
|
||||
// which will not always be as long as Buffer.Length.
|
||||
int count;
|
||||
|
||||
// columnStart is the position in the buffer when the
|
||||
// current column was started or the last time data
|
||||
// was moved out to the column buffer.
|
||||
int columnStart;
|
||||
|
||||
// streamEndMeet is also very important, when we loag data from stream,
|
||||
// if the count is smaller than buffer.length, it indcates that there
|
||||
// must be no more data in the stream, the stream end is meet.
|
||||
boolean streamEndMeet = false;
|
||||
|
||||
public StreamBuffer() {
|
||||
buffer = new char[StaticSettings.MAX_BUFFER_SIZE];
|
||||
currentPosition = 0;
|
||||
count = 0;
|
||||
columnStart = 0;
|
||||
}
|
||||
|
||||
public void moveTailToHead() {
|
||||
count = count - currentPosition;
|
||||
for (int i = 0; i < count; i++) {
|
||||
buffer[i] = buffer[currentPosition + i];
|
||||
}
|
||||
lastIndexToRead = count - fieldDelimiterLength;
|
||||
currentPosition = 0;
|
||||
columnStart = 0;
|
||||
}
|
||||
|
||||
public void skipFieldDelimiter() {
|
||||
currentPosition += fieldDelimiterLength;
|
||||
}
|
||||
|
||||
public boolean noData() {
|
||||
return currentPosition >= count;
|
||||
}
|
||||
|
||||
public boolean hasMoreData() {
|
||||
return (currentPosition <= lastIndexToRead)
|
||||
|| (currentPosition > lastIndexToRead && !streamEndMeet);
|
||||
}
|
||||
|
||||
public boolean noDataFieldDelimiter() {
|
||||
return currentPosition > lastIndexToRead;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,401 +0,0 @@
|
||||
// ============================================================================
|
||||
//
|
||||
// Talend Community Edition
|
||||
//
|
||||
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
|
||||
//
|
||||
// This library is free software; you can redistribute it and/or
|
||||
// modify it under the terms of the GNU Lesser General Public
|
||||
// License as published by the Free Software Foundation; either
|
||||
// version 2.1 of the License.
|
||||
//
|
||||
// This library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
// Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program; if not, write to the Free Software
|
||||
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
|
||||
//
|
||||
// ============================================================================
|
||||
package org.talend.fileprocess.delimited;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.text.NumberFormat;
|
||||
|
||||
/**
|
||||
* A stream based parser for parsing delimited text data from a reader. this
|
||||
* class works when the record delimiter and field delimiter are both a String
|
||||
* which length is > 1, and the record delimiter's length equals to the field
|
||||
* delimiter's.<br/>
|
||||
*
|
||||
* @author gke
|
||||
*
|
||||
*/
|
||||
final class ComplexDelimitedDataReader3 extends DelimitedDataReader {
|
||||
|
||||
private StreamBuffer streamBuffer = new StreamBuffer();
|
||||
|
||||
private char[] fieldDelimiter;
|
||||
|
||||
private char[] recordDelimiter;
|
||||
|
||||
public ComplexDelimitedDataReader3(BufferedReader inputStream, String delimiter,
|
||||
String recordDelimiter, boolean skipEmptyRecords)
|
||||
throws IOException {
|
||||
|
||||
super(inputStream, skipEmptyRecords);
|
||||
|
||||
this.fieldDelimiter = delimiter.toCharArray();
|
||||
|
||||
this.recordDelimiter = recordDelimiter.toCharArray();
|
||||
|
||||
streamBuffer.delimiterLength = this.fieldDelimiter.length;
|
||||
try {
|
||||
streamBuffer.count = inputStream.read(streamBuffer.buffer, 0,
|
||||
streamBuffer.buffer.length);
|
||||
} catch (IOException ex) {
|
||||
close();
|
||||
|
||||
throw ex;
|
||||
}
|
||||
streamBuffer.currentPosition = 0;
|
||||
streamBuffer.columnStart = 0;
|
||||
streamBuffer.lastIndexToRead = streamBuffer.count
|
||||
- streamBuffer.delimiterLength;
|
||||
streamBuffer.streamEndMeet = (streamBuffer.count < streamBuffer.buffer.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readRecord() throws IOException {
|
||||
checkClosed();
|
||||
columnsCount = 0;
|
||||
hasReadRecord = false;
|
||||
|
||||
if (streamBuffer.hasMoreData()) {
|
||||
while (streamBuffer.hasMoreData() && !hasReadRecord) {
|
||||
if (streamBuffer.noDataForDelimter()) {
|
||||
checkDataLength();
|
||||
} else {
|
||||
if (isStartFieldDelimited()) {
|
||||
// encountered a column with no data
|
||||
endColumn();
|
||||
streamBuffer.skipDelimiter();
|
||||
} else if (isStartRecordDelimited()) {
|
||||
if (columnsCount > 0 || !skipEmptyRecord) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
}
|
||||
streamBuffer.skipDelimiter();
|
||||
|
||||
} else {
|
||||
startedRow = true;
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
|
||||
boolean firstLoop = true;
|
||||
|
||||
while (streamBuffer.hasMoreData() && startedRow) {
|
||||
if (!firstLoop && streamBuffer.noDataForDelimter()) {
|
||||
checkDataLength();
|
||||
} else {
|
||||
if (!firstLoop) {
|
||||
if (isStartFieldDelimited()) {
|
||||
endColumn();
|
||||
streamBuffer.skipDelimiter();
|
||||
} else if (isStartRecordDelimited()) {
|
||||
endColumn();
|
||||
|
||||
endRecord();
|
||||
|
||||
streamBuffer.skipDelimiter();
|
||||
}
|
||||
} else {
|
||||
firstLoop = false;
|
||||
}
|
||||
|
||||
if (startedRow) {
|
||||
streamBuffer.currentPosition++;
|
||||
|
||||
if (safetySwitch
|
||||
&& streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart
|
||||
+ columnBuffer.position > StaticSettings.MAX_SIZE_FOR_SAFTY) {
|
||||
close();
|
||||
|
||||
throw new IOException(
|
||||
"Maximum column length of 100,000 exceeded in column "
|
||||
+ NumberFormat
|
||||
.getIntegerInstance()
|
||||
.format(
|
||||
columnsCount)
|
||||
+ " in record "
|
||||
+ NumberFormat
|
||||
.getIntegerInstance()
|
||||
.format(
|
||||
currentRecord)
|
||||
+ ". Set the SafetySwitch property to false"
|
||||
+ " if you're expecting column lengths greater than 100,000 characters to"
|
||||
+ " avoid this error.");
|
||||
}
|
||||
}
|
||||
} // end else
|
||||
}
|
||||
}
|
||||
} // end else
|
||||
}
|
||||
}
|
||||
if (!hasReadRecord) {
|
||||
if (!streamBuffer.noData()) {
|
||||
if (!startedRow) {
|
||||
startedRow = true;
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
streamBuffer.currentPosition = streamBuffer.count;
|
||||
endColumn();
|
||||
endRecord();
|
||||
return true;
|
||||
}
|
||||
if (startedRow) {
|
||||
streamBuffer.currentPosition = streamBuffer.count;
|
||||
endColumn();
|
||||
endRecord();
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
if (columnsCount > 0) {
|
||||
endRecord();
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return hasReadRecord;
|
||||
}
|
||||
|
||||
private boolean isStartFieldDelimited() {
|
||||
for (int i = 0; i < fieldDelimiter.length; i++) {
|
||||
if (streamBuffer.buffer[streamBuffer.currentPosition + i] != fieldDelimiter[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean isStartRecordDelimited() {
|
||||
for (int i = 0; i < recordDelimiter.length; i++) {
|
||||
if (streamBuffer.buffer[streamBuffer.currentPosition + i] != recordDelimiter[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @exception IOException
|
||||
* Thrown if an error occurs while reading data from the
|
||||
* source stream.
|
||||
*/
|
||||
private void checkDataLength() throws IOException {
|
||||
updateCurrentValue();
|
||||
streamBuffer.moveTailToHead();
|
||||
int count = 0;
|
||||
try {
|
||||
count = inputStream.read(streamBuffer.buffer, streamBuffer.count,
|
||||
streamBuffer.buffer.length - streamBuffer.count);
|
||||
} catch (IOException ex) {
|
||||
close();
|
||||
|
||||
throw ex;
|
||||
}
|
||||
streamBuffer.count += count;
|
||||
streamBuffer.lastIndexToRead = streamBuffer.count
|
||||
- streamBuffer.delimiterLength;
|
||||
if (streamBuffer.count < streamBuffer.buffer.length) {
|
||||
streamBuffer.streamEndMeet = true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @exception IOException
|
||||
* Thrown if a very rare extreme exception occurs during
|
||||
* parsing, normally resulting from improper data format.
|
||||
*/
|
||||
private void endColumn() throws IOException {
|
||||
String currentValue = "";
|
||||
|
||||
// must be called before setting startedColumn = false
|
||||
if (startedRow) {
|
||||
if (columnBuffer.position == 0) {
|
||||
if (streamBuffer.columnStart < streamBuffer.currentPosition) {
|
||||
currentValue = new String(streamBuffer.buffer,
|
||||
streamBuffer.columnStart,
|
||||
streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart);
|
||||
}
|
||||
} else {
|
||||
updateCurrentValue();
|
||||
currentValue = new String(columnBuffer.buffer, 0,
|
||||
columnBuffer.position);
|
||||
}
|
||||
}
|
||||
|
||||
columnBuffer.position = 0;
|
||||
|
||||
startedRow = false;
|
||||
|
||||
if (columnsCount >= StaticSettings.MAX_SIZE_FOR_SAFTY && safetySwitch) {
|
||||
close();
|
||||
|
||||
throw new IOException(
|
||||
"Maximum column count of 100,000 exceeded in record "
|
||||
+ NumberFormat.getIntegerInstance().format(
|
||||
currentRecord)
|
||||
+ ". Set the SafetySwitch property to false"
|
||||
+ " if you're expecting more than 100,000 columns per record to"
|
||||
+ " avoid this error.");
|
||||
}
|
||||
|
||||
// check to see if our current holder array for
|
||||
// column chunks is still big enough to handle another
|
||||
// column chunk
|
||||
|
||||
if (columnsCount == values.length) {
|
||||
// holder array needs to grow to be able to hold another column
|
||||
int newLength = values.length * 2;
|
||||
|
||||
String[] holder = new String[newLength];
|
||||
|
||||
System.arraycopy(values, 0, holder, 0, values.length);
|
||||
|
||||
values = holder;
|
||||
}
|
||||
|
||||
values[columnsCount] = currentValue;
|
||||
|
||||
currentValue = "";
|
||||
|
||||
columnsCount++;
|
||||
}
|
||||
|
||||
private void updateCurrentValue() {
|
||||
if (startedRow
|
||||
&& streamBuffer.columnStart < streamBuffer.currentPosition) {
|
||||
if (columnBuffer.buffer.length - columnBuffer.position < streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart) {
|
||||
int newLength = columnBuffer.buffer.length
|
||||
+ Math.max(streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart,
|
||||
columnBuffer.buffer.length);
|
||||
|
||||
char[] holder = new char[newLength];
|
||||
|
||||
System.arraycopy(columnBuffer.buffer, 0, holder, 0,
|
||||
columnBuffer.position);
|
||||
|
||||
columnBuffer.buffer = holder;
|
||||
}
|
||||
|
||||
System.arraycopy(streamBuffer.buffer, streamBuffer.columnStart,
|
||||
columnBuffer.buffer, columnBuffer.position,
|
||||
streamBuffer.currentPosition - streamBuffer.columnStart);
|
||||
|
||||
columnBuffer.position += streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void close(boolean closing) {
|
||||
if (!closed) {
|
||||
if (closing) {
|
||||
streamBuffer.buffer = null;
|
||||
columnBuffer.buffer = null;
|
||||
}
|
||||
|
||||
try {
|
||||
if (initialized) {
|
||||
inputStream.close();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// just eat the exception
|
||||
}
|
||||
|
||||
inputStream = null;
|
||||
|
||||
closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A buffer structure that used to load data from stream for processing.
|
||||
*
|
||||
* @author gke
|
||||
*/
|
||||
private class StreamBuffer {
|
||||
|
||||
char[] buffer;
|
||||
|
||||
int currentPosition;
|
||||
|
||||
// lastIndexToRead is the last index of letter in the buffer for
|
||||
// processing, this will be equal to count - delimiterLegnth. This
|
||||
// is needed because evertime we process a letter, when we need to read
|
||||
// the proceeding letters in order to make sure if currentPosition is
|
||||
// the begin of field or record delmiter
|
||||
int lastIndexToRead;
|
||||
|
||||
int delimiterLength;
|
||||
|
||||
// count indicates how much usable data has been read into the stream,
|
||||
// which will not always be as long as Buffer.Length.
|
||||
int count;
|
||||
|
||||
// columnStart is the position in the buffer when the
|
||||
// current column was started or the last time data
|
||||
// was moved out to the column buffer.
|
||||
int columnStart;
|
||||
|
||||
// streamEndMeet is also very important, when we loag data from stream,
|
||||
// if the count is smaller than buffer.length, it indcates that there
|
||||
// must be no more data in the stream, the stream end is meet.
|
||||
boolean streamEndMeet = false;
|
||||
|
||||
public StreamBuffer() {
|
||||
buffer = new char[StaticSettings.MAX_BUFFER_SIZE];
|
||||
currentPosition = 0;
|
||||
count = 0;
|
||||
columnStart = 0;
|
||||
}
|
||||
|
||||
public void moveTailToHead() {
|
||||
count = count - currentPosition;
|
||||
for (int i = 0; i < count; i++) {
|
||||
buffer[i] = buffer[currentPosition + i];
|
||||
}
|
||||
lastIndexToRead = count - delimiterLength;
|
||||
currentPosition = 0;
|
||||
columnStart = 0;
|
||||
}
|
||||
|
||||
public void skipDelimiter() {
|
||||
currentPosition += delimiterLength;
|
||||
}
|
||||
|
||||
public boolean noDataForDelimter() {
|
||||
return currentPosition > lastIndexToRead;
|
||||
}
|
||||
|
||||
public boolean hasMoreData() {
|
||||
return (currentPosition <= lastIndexToRead)
|
||||
|| (currentPosition > lastIndexToRead && !streamEndMeet);
|
||||
}
|
||||
|
||||
public boolean noData() {
|
||||
return currentPosition >= count;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,441 +0,0 @@
|
||||
// ============================================================================
|
||||
//
|
||||
// Talend Community Edition
|
||||
//
|
||||
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
|
||||
//
|
||||
// This library is free software; you can redistribute it and/or
|
||||
// modify it under the terms of the GNU Lesser General Public
|
||||
// License as published by the Free Software Foundation; either
|
||||
// version 2.1 of the License.
|
||||
//
|
||||
// This library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
// Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program; if not, write to the Free Software
|
||||
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
|
||||
//
|
||||
// ============================================================================
|
||||
package org.talend.fileprocess.delimited;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.text.NumberFormat;
|
||||
|
||||
/**
|
||||
* A stream based parser for parsing delimited text data from a reader. this
|
||||
* class works when the record delimiter and field delimiter are both a String
|
||||
* which length is > 1, and the field delimiter's length is longer than the
|
||||
* record delimiter's.<br/>
|
||||
*
|
||||
* @author gke
|
||||
*
|
||||
*/
|
||||
final class ComplexDelimitedDataReader4 extends DelimitedDataReader {
|
||||
|
||||
private StreamBuffer streamBuffer = new StreamBuffer();
|
||||
|
||||
private char[] fieldDelimiter;
|
||||
|
||||
private char[] recordDelimiter;
|
||||
|
||||
public ComplexDelimitedDataReader4(BufferedReader inputStream, String delimiter,
|
||||
String recordDelimiter, boolean skipEmptyRecords)
|
||||
throws IOException {
|
||||
super(inputStream, skipEmptyRecords);
|
||||
|
||||
this.fieldDelimiter = delimiter.toCharArray();
|
||||
|
||||
this.recordDelimiter = recordDelimiter.toCharArray();
|
||||
|
||||
streamBuffer.fieldDelimiterLength = this.fieldDelimiter.length;
|
||||
|
||||
streamBuffer.recordDelimiterLength = this.recordDelimiter.length;
|
||||
try {
|
||||
streamBuffer.count = inputStream.read(streamBuffer.buffer, 0,
|
||||
streamBuffer.buffer.length);
|
||||
} catch (IOException ex) {
|
||||
close();
|
||||
|
||||
throw ex;
|
||||
}
|
||||
streamBuffer.currentPosition = 0;
|
||||
streamBuffer.columnStart = 0;
|
||||
streamBuffer.lastIndexToRead = streamBuffer.count
|
||||
- streamBuffer.fieldDelimiterLength;
|
||||
streamBuffer.streamEndMeet = (streamBuffer.count < streamBuffer.buffer.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readRecord() throws IOException {
|
||||
checkClosed();
|
||||
columnsCount = 0;
|
||||
hasReadRecord = false;
|
||||
if (streamBuffer.hasMoreData()) {
|
||||
while (streamBuffer.hasMoreData() && !hasReadRecord) {
|
||||
if (streamBuffer.noDataFieldDelimter()) {
|
||||
checkDataLength();
|
||||
} else {
|
||||
if (isStartFieldDelimited()) {
|
||||
// we encountered a column with no data
|
||||
endColumn();
|
||||
streamBuffer.skipFieldDelimiter();
|
||||
} else if (isStartRecordDelimited()) {
|
||||
if (columnsCount > 0 || !skipEmptyRecord) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
}
|
||||
streamBuffer.skipRecordDelimiter();
|
||||
} else {
|
||||
// since the letter wasn't a special letter, this
|
||||
// will be the first letter of our current column
|
||||
startedRow = true;
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
boolean firstLoop = true;
|
||||
while (streamBuffer.hasMoreData() && startedRow) {
|
||||
if (!firstLoop && streamBuffer.noDataFieldDelimter()) {
|
||||
checkDataLength();
|
||||
} else {
|
||||
if (!firstLoop) {
|
||||
if (isStartFieldDelimited()) {
|
||||
endColumn();
|
||||
streamBuffer.skipFieldDelimiter();
|
||||
} else if (isStartRecordDelimited()) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
streamBuffer.skipRecordDelimiter();
|
||||
}
|
||||
} else {
|
||||
firstLoop = false;
|
||||
}
|
||||
if (startedRow) {
|
||||
streamBuffer.currentPosition++;
|
||||
if (safetySwitch
|
||||
&& streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart
|
||||
+ columnBuffer.position > StaticSettings.MAX_SIZE_FOR_SAFTY) {
|
||||
close();
|
||||
throw new IOException(
|
||||
"Maximum column length of 100,000 exceeded in column "
|
||||
+ NumberFormat
|
||||
.getIntegerInstance()
|
||||
.format(
|
||||
columnsCount)
|
||||
+ " in record "
|
||||
+ NumberFormat
|
||||
.getIntegerInstance()
|
||||
.format(
|
||||
currentRecord)
|
||||
+ ". Set the SafetySwitch property to false"
|
||||
+ " if you're expecting column lengths greater than 100,000 characters to"
|
||||
+ " avoid this error.");
|
||||
}
|
||||
}
|
||||
} // end else
|
||||
}
|
||||
}
|
||||
} // end else
|
||||
}
|
||||
}
|
||||
if (!hasReadRecord) {
|
||||
while (!streamBuffer.noData()) {
|
||||
String tail = streamBuffer.getTail();
|
||||
int index = tail.indexOf(new String(recordDelimiter));
|
||||
if (index == -1) {
|
||||
if (startedRow) {
|
||||
streamBuffer.currentPosition = streamBuffer.count;
|
||||
endColumn();
|
||||
endRecord();
|
||||
streamBuffer.skipRecordDelimiter();
|
||||
return true;
|
||||
} else {
|
||||
if (!startedRow) {
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
startedRow = true;
|
||||
}
|
||||
streamBuffer.currentPosition = streamBuffer.count;
|
||||
endColumn();
|
||||
endRecord();
|
||||
streamBuffer.skipRecordDelimiter();
|
||||
return true;
|
||||
}
|
||||
} else if (index == 0) {
|
||||
if (columnsCount > 0) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
streamBuffer.skipRecordDelimiter();
|
||||
return true;
|
||||
}
|
||||
if (skipEmptyRecord) {
|
||||
streamBuffer.currentPosition += recordDelimiter.length;
|
||||
continue;
|
||||
} else {
|
||||
endRecord();
|
||||
streamBuffer.skipRecordDelimiter();
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
if (!startedRow) {
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
startedRow = true;
|
||||
}
|
||||
streamBuffer.currentPosition += index;
|
||||
endColumn();
|
||||
endRecord();
|
||||
streamBuffer.skipRecordDelimiter();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
if (columnsCount > 0 && streamBuffer.noData()) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
}
|
||||
}
|
||||
return hasReadRecord;
|
||||
}
|
||||
|
||||
private boolean isStartFieldDelimited() {
|
||||
for (int i = 0; i < this.fieldDelimiter.length; i++) {
|
||||
if (streamBuffer.buffer[streamBuffer.currentPosition + i] != this.fieldDelimiter[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean isStartRecordDelimited() {
|
||||
for (int i = 0; i < this.recordDelimiter.length; i++) {
|
||||
if (streamBuffer.buffer[streamBuffer.currentPosition + i] != this.recordDelimiter[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @exception IOException
|
||||
* Thrown if an error occurs while reading data from the
|
||||
* source stream.
|
||||
*/
|
||||
private void checkDataLength() throws IOException {
|
||||
updateCurrentValue();
|
||||
streamBuffer.moveTailToHead();
|
||||
int count = 0;
|
||||
try {
|
||||
count = inputStream.read(streamBuffer.buffer, streamBuffer.count,
|
||||
streamBuffer.buffer.length - streamBuffer.count);
|
||||
} catch (IOException ex) {
|
||||
close();
|
||||
|
||||
throw ex;
|
||||
}
|
||||
streamBuffer.count += count;
|
||||
streamBuffer.lastIndexToRead = streamBuffer.count
|
||||
- streamBuffer.fieldDelimiterLength;
|
||||
if (streamBuffer.count < streamBuffer.buffer.length) {
|
||||
streamBuffer.streamEndMeet = true;
|
||||
}
|
||||
// if no more data could be found, set flag stating that
|
||||
// the end of the data was found
|
||||
|
||||
// if (dataBuffer.Count == -1) {
|
||||
// hasMoreData = false;
|
||||
// }
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @exception IOException
|
||||
* Thrown if a very rare extreme exception occurs during
|
||||
* parsing, normally resulting from improper data format.
|
||||
*/
|
||||
private void endColumn() throws IOException {
|
||||
String currentValue = "";
|
||||
|
||||
// must be called before setting startedColumn = false
|
||||
if (startedRow) {
|
||||
if (columnBuffer.position == 0) {
|
||||
if (streamBuffer.columnStart < streamBuffer.currentPosition) {
|
||||
// if(c)
|
||||
currentValue = new String(streamBuffer.buffer,
|
||||
streamBuffer.columnStart, streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart);
|
||||
}
|
||||
} else {
|
||||
updateCurrentValue();
|
||||
currentValue = new String(columnBuffer.buffer, 0,
|
||||
columnBuffer.position);
|
||||
}
|
||||
}
|
||||
|
||||
columnBuffer.position = 0;
|
||||
|
||||
startedRow = false;
|
||||
|
||||
if (columnsCount >= StaticSettings.MAX_SIZE_FOR_SAFTY && safetySwitch) {
|
||||
close();
|
||||
|
||||
throw new IOException(
|
||||
"Maximum column count of 100,000 exceeded in record "
|
||||
+ NumberFormat.getIntegerInstance().format(
|
||||
currentRecord)
|
||||
+ ". Set the SafetySwitch property to false"
|
||||
+ " if you're expecting more than 100,000 columns per record to"
|
||||
+ " avoid this error.");
|
||||
}
|
||||
|
||||
// check to see if our current holder array for
|
||||
// column chunks is still big enough to handle another
|
||||
// column chunk
|
||||
|
||||
if (columnsCount == values.length) {
|
||||
// holder array needs to grow to be able to hold another column
|
||||
int newLength = values.length * 2;
|
||||
|
||||
String[] holder = new String[newLength];
|
||||
|
||||
System.arraycopy(values, 0, holder, 0, values.length);
|
||||
|
||||
values = holder;
|
||||
}
|
||||
|
||||
values[columnsCount] = currentValue;
|
||||
|
||||
currentValue = "";
|
||||
|
||||
columnsCount++;
|
||||
}
|
||||
|
||||
private void updateCurrentValue() {
|
||||
if (startedRow
|
||||
&& streamBuffer.columnStart < streamBuffer.currentPosition) {
|
||||
if (columnBuffer.buffer.length - columnBuffer.position < streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart) {
|
||||
int newLength = columnBuffer.buffer.length
|
||||
+ Math.max(streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart,
|
||||
columnBuffer.buffer.length);
|
||||
|
||||
char[] holder = new char[newLength];
|
||||
|
||||
System.arraycopy(columnBuffer.buffer, 0, holder, 0,
|
||||
columnBuffer.position);
|
||||
|
||||
columnBuffer.buffer = holder;
|
||||
}
|
||||
|
||||
System.arraycopy(streamBuffer.buffer, streamBuffer.columnStart,
|
||||
columnBuffer.buffer, columnBuffer.position,
|
||||
streamBuffer.currentPosition - streamBuffer.columnStart);
|
||||
|
||||
columnBuffer.position += streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart;
|
||||
}
|
||||
|
||||
// dataBuffer.ColumnStart = dataBuffer.Position + 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void close(boolean closing) {
|
||||
if (!closed) {
|
||||
if (closing) {
|
||||
streamBuffer.buffer = null;
|
||||
columnBuffer.buffer = null;
|
||||
}
|
||||
|
||||
try {
|
||||
if (initialized) {
|
||||
inputStream.close();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// just eat the exception
|
||||
}
|
||||
|
||||
inputStream = null;
|
||||
|
||||
closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A buffer structure that used to load data from stream for processing.
|
||||
*
|
||||
* @author gke
|
||||
*/
|
||||
private class StreamBuffer {
|
||||
|
||||
char[] buffer;
|
||||
|
||||
int currentPosition;
|
||||
|
||||
// lastIndexToRead is the last index of letter in the buffer for
|
||||
// processing, this will be equal to count - fieldDelimiterLegnth. This
|
||||
// is needed because evertime we process a letter, when we need to read
|
||||
// the proceeding letters in order to make sure if currentPosition is
|
||||
// the begin of field delmiter
|
||||
int lastIndexToRead;
|
||||
|
||||
int fieldDelimiterLength;
|
||||
|
||||
int recordDelimiterLength;
|
||||
|
||||
// count indicates how much usable data has been read into the stream,
|
||||
// which will not always be as long as buffer.Length.
|
||||
int count;
|
||||
|
||||
// columnStart is the position in the buffer when the
|
||||
// current column was started or the last time data
|
||||
// was moved out to the column buffer.
|
||||
int columnStart;
|
||||
|
||||
// streamEndMeet is also very important, when we loag data from stream,
|
||||
// if the count is smaller than buffer.length, it indcates that there
|
||||
// must be no more data in the stream, the stream end is meet.
|
||||
boolean streamEndMeet = false;
|
||||
|
||||
public StreamBuffer() {
|
||||
buffer = new char[StaticSettings.MAX_BUFFER_SIZE];
|
||||
currentPosition = 0;
|
||||
count = 0;
|
||||
columnStart = 0;
|
||||
}
|
||||
|
||||
public void moveTailToHead() {
|
||||
count = count - currentPosition;
|
||||
for (int i = 0; i < count; i++) {
|
||||
buffer[i] = buffer[currentPosition + i];
|
||||
}
|
||||
lastIndexToRead = count - fieldDelimiterLength;
|
||||
currentPosition = 0;
|
||||
columnStart = 0;
|
||||
}
|
||||
|
||||
public void skipFieldDelimiter() {
|
||||
currentPosition += fieldDelimiterLength;
|
||||
}
|
||||
|
||||
public void skipRecordDelimiter() {
|
||||
currentPosition += recordDelimiterLength;
|
||||
}
|
||||
|
||||
public boolean noDataFieldDelimter() {
|
||||
return currentPosition > lastIndexToRead;
|
||||
}
|
||||
|
||||
public boolean hasMoreData() {
|
||||
return (currentPosition <= lastIndexToRead)
|
||||
|| (currentPosition > lastIndexToRead && !streamEndMeet);
|
||||
}
|
||||
|
||||
public boolean noData() {
|
||||
return currentPosition >= count;
|
||||
}
|
||||
|
||||
public String getTail() {
|
||||
return new String(buffer, currentPosition, count - currentPosition);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,428 +0,0 @@
|
||||
// ============================================================================
|
||||
//
|
||||
// Talend Community Edition
|
||||
//
|
||||
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
|
||||
//
|
||||
// This library is free software; you can redistribute it and/or
|
||||
// modify it under the terms of the GNU Lesser General Public
|
||||
// License as published by the Free Software Foundation; either
|
||||
// version 2.1 of the License.
|
||||
//
|
||||
// This library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
// Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program; if not, write to the Free Software
|
||||
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
|
||||
//
|
||||
// ============================================================================
|
||||
package org.talend.fileprocess.delimited;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.text.NumberFormat;
|
||||
|
||||
/**
|
||||
* A stream based parser for parsing delimited text data from a reader. this
|
||||
* class works when the record delimiter and field delimiter are both a String
|
||||
* which length is > 1, and the record delimiter's length is longer than the
|
||||
* field delimiter's.<br/>
|
||||
*
|
||||
* @author gke
|
||||
*
|
||||
*/
|
||||
final class ComplexDelimitedDataReader5 extends DelimitedDataReader {
|
||||
|
||||
private StreamBuffer streamBuffer = new StreamBuffer();
|
||||
|
||||
private char[] fieldDelimiter;
|
||||
|
||||
private char[] recordDelimiter;
|
||||
|
||||
public ComplexDelimitedDataReader5(BufferedReader inputStream, String delimiter,
|
||||
String recordDelimiter, boolean skipEmptyRecords)
|
||||
throws IOException {
|
||||
|
||||
super(inputStream, skipEmptyRecords);
|
||||
|
||||
this.fieldDelimiter = delimiter.toCharArray();
|
||||
|
||||
this.recordDelimiter = recordDelimiter.toCharArray();
|
||||
|
||||
streamBuffer.fieldDelimiterLength = this.fieldDelimiter.length;
|
||||
|
||||
streamBuffer.recordDelimiterLength = this.recordDelimiter.length;
|
||||
try {
|
||||
streamBuffer.count = inputStream.read(streamBuffer.buffer, 0,
|
||||
streamBuffer.buffer.length);
|
||||
} catch (IOException ex) {
|
||||
close();
|
||||
|
||||
throw ex;
|
||||
}
|
||||
streamBuffer.currentPosition = 0;
|
||||
streamBuffer.columnStart = 0;
|
||||
streamBuffer.lastIndexToRead = streamBuffer.count
|
||||
- streamBuffer.recordDelimiterLength;
|
||||
streamBuffer.streamEndMeet = (streamBuffer.count < streamBuffer.buffer.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readRecord() throws IOException {
|
||||
checkClosed();
|
||||
|
||||
columnsCount = 0;
|
||||
hasReadRecord = false;
|
||||
|
||||
if (streamBuffer.hasMoreData()) {
|
||||
while (streamBuffer.hasMoreData() && !hasReadRecord) {
|
||||
if (streamBuffer.noDataRecordDelimter()) {
|
||||
checkDataLength();
|
||||
} else {
|
||||
if (isStartFieldDelimited()) {
|
||||
// we encountered a column with no data
|
||||
endColumn();
|
||||
streamBuffer.skipFieldDelimiter();
|
||||
} else if (isStartRecordDelimited()) {
|
||||
if (columnsCount > 0 || !skipEmptyRecord) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
}
|
||||
streamBuffer.skipRecordDelimiter();
|
||||
|
||||
} else {
|
||||
// since the letter wasn't a special letter, this
|
||||
// will be the first letter of our current column
|
||||
|
||||
startedRow = true;
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
|
||||
boolean firstLoop = true;
|
||||
|
||||
while (streamBuffer.hasMoreData() && startedRow) {
|
||||
if (!firstLoop
|
||||
&& streamBuffer.noDataRecordDelimter()) {
|
||||
checkDataLength();
|
||||
} else {
|
||||
if (!firstLoop) {
|
||||
if (isStartFieldDelimited()) {
|
||||
endColumn();
|
||||
streamBuffer.skipFieldDelimiter();
|
||||
} else if (isStartRecordDelimited()) {
|
||||
endColumn();
|
||||
|
||||
endRecord();
|
||||
|
||||
streamBuffer.skipRecordDelimiter();
|
||||
}
|
||||
} else {
|
||||
firstLoop = false;
|
||||
}
|
||||
|
||||
if (startedRow) {
|
||||
streamBuffer.currentPosition++;
|
||||
|
||||
if (safetySwitch
|
||||
&& streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart
|
||||
+ columnBuffer.position > StaticSettings.MAX_SIZE_FOR_SAFTY) {
|
||||
close();
|
||||
|
||||
throw new IOException(
|
||||
"Maximum column length of 100,000 exceeded in column "
|
||||
+ NumberFormat
|
||||
.getIntegerInstance()
|
||||
.format(
|
||||
columnsCount)
|
||||
+ " in record "
|
||||
+ NumberFormat
|
||||
.getIntegerInstance()
|
||||
.format(
|
||||
currentRecord)
|
||||
+ ". Set the SafetySwitch property to false"
|
||||
+ " if you're expecting column lengths greater than 100,000 characters to"
|
||||
+ " avoid this error.");
|
||||
}
|
||||
}
|
||||
} // end else
|
||||
}
|
||||
}
|
||||
} // end else
|
||||
}
|
||||
|
||||
}
|
||||
if (!hasReadRecord) {
|
||||
while (!streamBuffer.noData()) {
|
||||
String tail = streamBuffer.getTail();
|
||||
int index = tail.indexOf(new String(this.fieldDelimiter));
|
||||
if (index == -1) {
|
||||
if (startedRow) {
|
||||
streamBuffer.currentPosition = streamBuffer.count;
|
||||
endColumn();
|
||||
endRecord();
|
||||
return true;
|
||||
} else {
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
startedRow = true;
|
||||
streamBuffer.currentPosition = streamBuffer.count;
|
||||
endColumn();
|
||||
endRecord();
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
if (!startedRow) {
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
startedRow = true;
|
||||
}
|
||||
streamBuffer.currentPosition += index;
|
||||
endColumn();
|
||||
streamBuffer.skipFieldDelimiter();
|
||||
}
|
||||
|
||||
}
|
||||
if (startedRow || columnsCount > 0) {
|
||||
endRecord();
|
||||
}
|
||||
}
|
||||
|
||||
return hasReadRecord;
|
||||
}
|
||||
|
||||
private boolean isStartFieldDelimited() {
|
||||
for (int i = 0; i < this.fieldDelimiter.length; i++) {
|
||||
if (streamBuffer.buffer[streamBuffer.currentPosition + i] != this.fieldDelimiter[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean isStartRecordDelimited() {
|
||||
for (int i = 0; i < this.recordDelimiter.length; i++) {
|
||||
if (streamBuffer.buffer[streamBuffer.currentPosition + i] != this.recordDelimiter[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @exception IOException
|
||||
* Thrown if an error occurs while reading data from the
|
||||
* source stream.
|
||||
*/
|
||||
private void checkDataLength() throws IOException {
|
||||
updateCurrentValue();
|
||||
streamBuffer.moveTailToHead();
|
||||
int count = 0;
|
||||
try {
|
||||
count = inputStream.read(streamBuffer.buffer, streamBuffer.count,
|
||||
streamBuffer.buffer.length - streamBuffer.count);
|
||||
} catch (IOException ex) {
|
||||
close();
|
||||
|
||||
throw ex;
|
||||
}
|
||||
streamBuffer.count += count;
|
||||
streamBuffer.lastIndexToRead = streamBuffer.count
|
||||
- streamBuffer.recordDelimiterLength;
|
||||
if (streamBuffer.count < streamBuffer.buffer.length) {
|
||||
streamBuffer.streamEndMeet = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @exception IOException
|
||||
* Thrown if a very rare extreme exception occurs during
|
||||
* parsing, normally resulting from improper data format.
|
||||
*/
|
||||
private void endColumn() throws IOException {
|
||||
String currentValue = "";
|
||||
|
||||
// must be called before setting startedColumn = false
|
||||
if (startedRow) {
|
||||
if (columnBuffer.position == 0) {
|
||||
if (streamBuffer.columnStart < streamBuffer.currentPosition) {
|
||||
currentValue = new String(streamBuffer.buffer,
|
||||
streamBuffer.columnStart,
|
||||
streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart);
|
||||
}
|
||||
} else {
|
||||
updateCurrentValue();
|
||||
currentValue = new String(columnBuffer.buffer, 0,
|
||||
columnBuffer.position);
|
||||
}
|
||||
}
|
||||
|
||||
columnBuffer.position = 0;
|
||||
|
||||
startedRow = false;
|
||||
|
||||
if (columnsCount >= StaticSettings.MAX_SIZE_FOR_SAFTY && safetySwitch) {
|
||||
close();
|
||||
|
||||
throw new IOException(
|
||||
"Maximum column count of 100,000 exceeded in record "
|
||||
+ NumberFormat.getIntegerInstance().format(
|
||||
currentRecord)
|
||||
+ ". Set the SafetySwitch property to false"
|
||||
+ " if you're expecting more than 100,000 columns per record to"
|
||||
+ " avoid this error.");
|
||||
}
|
||||
|
||||
// check to see if our current holder array for
|
||||
// column chunks is still big enough to handle another
|
||||
// column chunk
|
||||
|
||||
if (columnsCount == values.length) {
|
||||
// holder array needs to grow to be able to hold another column
|
||||
int newLength = values.length * 2;
|
||||
|
||||
String[] holder = new String[newLength];
|
||||
|
||||
System.arraycopy(values, 0, holder, 0, values.length);
|
||||
|
||||
values = holder;
|
||||
}
|
||||
|
||||
values[columnsCount] = currentValue;
|
||||
|
||||
currentValue = "";
|
||||
|
||||
columnsCount++;
|
||||
}
|
||||
|
||||
private void updateCurrentValue() {
|
||||
if (startedRow
|
||||
&& streamBuffer.columnStart < streamBuffer.currentPosition) {
|
||||
if (columnBuffer.buffer.length - columnBuffer.position < streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart) {
|
||||
int newLength = columnBuffer.buffer.length
|
||||
+ Math.max(streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart,
|
||||
columnBuffer.buffer.length);
|
||||
|
||||
char[] holder = new char[newLength];
|
||||
|
||||
System.arraycopy(columnBuffer.buffer, 0, holder, 0,
|
||||
columnBuffer.position);
|
||||
|
||||
columnBuffer.buffer = holder;
|
||||
}
|
||||
|
||||
System.arraycopy(streamBuffer.buffer, streamBuffer.columnStart,
|
||||
columnBuffer.buffer, columnBuffer.position,
|
||||
streamBuffer.currentPosition - streamBuffer.columnStart);
|
||||
|
||||
columnBuffer.position += streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart;
|
||||
}
|
||||
|
||||
// dataBuffer.ColumnStart = dataBuffer.Position + 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void close(boolean closing) {
|
||||
if (!closed) {
|
||||
if (closing) {
|
||||
streamBuffer.buffer = null;
|
||||
columnBuffer.buffer = null;
|
||||
}
|
||||
|
||||
try {
|
||||
if (initialized) {
|
||||
inputStream.close();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// just eat the exception
|
||||
}
|
||||
|
||||
inputStream = null;
|
||||
|
||||
closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A buffer structure that used to load data from stream for processing.
|
||||
*
|
||||
* @author gke
|
||||
*/
|
||||
private class StreamBuffer {
|
||||
|
||||
char[] buffer;
|
||||
|
||||
int currentPosition;
|
||||
|
||||
// lastIndexToRead is the last index of letter in the buffer for
|
||||
// processing, this will be equal to count - recordDelimiterLegnth. This
|
||||
// is needed because evertime we process a letter, when we need to read
|
||||
// the proceeding letters in order to make sure if currentPosition is
|
||||
// the begin of record delmiter
|
||||
int lastIndexToRead;
|
||||
|
||||
int fieldDelimiterLength;
|
||||
|
||||
int recordDelimiterLength;
|
||||
|
||||
// count indicates how much usable data has been read into the stream,
|
||||
// which will not always be as long as buffer.Length.
|
||||
int count;
|
||||
|
||||
// columnStart is the position in the buffer when the
|
||||
// current column was started or the last time data
|
||||
// was moved out to the column buffer.
|
||||
int columnStart;
|
||||
|
||||
// streamEndMeet is also very important, when we loag data from stream,
|
||||
// if the count is smaller than buffer.length, it indcates that there
|
||||
// must be no more data in the stream, the stream end is meet.
|
||||
boolean streamEndMeet = false;
|
||||
|
||||
public StreamBuffer() {
|
||||
buffer = new char[StaticSettings.MAX_BUFFER_SIZE];
|
||||
currentPosition = 0;
|
||||
count = 0;
|
||||
columnStart = 0;
|
||||
}
|
||||
|
||||
public void moveTailToHead() {
|
||||
count = count - currentPosition;
|
||||
for (int i = 0; i < count; i++) {
|
||||
buffer[i] = buffer[currentPosition + i];
|
||||
}
|
||||
lastIndexToRead = count - recordDelimiterLength;
|
||||
currentPosition = 0;
|
||||
columnStart = 0;
|
||||
}
|
||||
|
||||
public void skipFieldDelimiter() {
|
||||
currentPosition += fieldDelimiterLength;
|
||||
}
|
||||
|
||||
public void skipRecordDelimiter() {
|
||||
currentPosition += recordDelimiterLength;
|
||||
}
|
||||
|
||||
public boolean noDataRecordDelimter() {
|
||||
return currentPosition > lastIndexToRead;
|
||||
}
|
||||
|
||||
public boolean hasMoreData() {
|
||||
return (currentPosition <= lastIndexToRead)
|
||||
|| (currentPosition > lastIndexToRead && !streamEndMeet);
|
||||
}
|
||||
|
||||
public boolean noData() {
|
||||
return currentPosition >= count;
|
||||
}
|
||||
|
||||
public String getTail() {
|
||||
return new String(buffer, currentPosition, count - currentPosition);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,156 +0,0 @@
|
||||
// ============================================================================
|
||||
//
|
||||
// Talend Community Edition
|
||||
//
|
||||
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
|
||||
//
|
||||
// This library is free software; you can redistribute it and/or
|
||||
// modify it under the terms of the GNU Lesser General Public
|
||||
// License as published by the Free Software Foundation; either
|
||||
// version 2.1 of the License.
|
||||
//
|
||||
// This library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
// Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program; if not, write to the Free Software
|
||||
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
|
||||
//
|
||||
// ============================================================================
|
||||
package org.talend.fileprocess.delimited;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
|
||||
/**
|
||||
* This factory is used to create a properiate DelimitedDataReader accordiong to the field delimiter(or field separator)
|
||||
* and record delimiter(or row separator). All the APIs that needed by tFileInputDelimited are provided by
|
||||
* DelimitedDataReader. And we can not directly creat the instance of subclasses of DelimitedDataReader, in fact, all
|
||||
* the subclasses are invisible out of this package.<br/>
|
||||
*
|
||||
* @author gke
|
||||
*/
|
||||
public class DelimitedDataReaderFactory {
|
||||
|
||||
/**
|
||||
* DOC ke Comment method "createDelimitedDataReader".
|
||||
*
|
||||
* @param inputStream A java Reader source of delimited data
|
||||
* @param delimiter the field delimiter(field separator)
|
||||
* @param recordDelimiter the record delimiter(row separator)
|
||||
* @param skipEmptyRecords flag for skip the empty records
|
||||
* @return an instance of DelmitedDataReader
|
||||
* @throws IOException
|
||||
*/
|
||||
public static DelimitedDataReader createDelimitedDataReader(BufferedReader inputStream, String delimiter,
|
||||
String recordDelimiter, boolean skipEmptyRecords) throws IOException {
|
||||
int dLength = delimiter.length();
|
||||
int rLength = recordDelimiter.length();
|
||||
if (dLength == 1 && rLength == 1) {
|
||||
|
||||
return new SimpleDelimitedDataReader(inputStream, delimiter.charAt(0), recordDelimiter.charAt(0),
|
||||
skipEmptyRecords);
|
||||
|
||||
} else if (dLength == 1 && rLength > 1) {
|
||||
|
||||
return new ComplexDelimitedDataReader1(inputStream, delimiter.charAt(0), recordDelimiter, skipEmptyRecords);
|
||||
|
||||
} else if (dLength > 1 && rLength == 1) {
|
||||
|
||||
return new ComplexDelimitedDataReader2(inputStream, delimiter, recordDelimiter.charAt(0), skipEmptyRecords);
|
||||
|
||||
} else if (dLength == rLength) {
|
||||
|
||||
return new ComplexDelimitedDataReader3(inputStream, delimiter, recordDelimiter, skipEmptyRecords);
|
||||
|
||||
} else if (dLength > rLength) {
|
||||
|
||||
return new ComplexDelimitedDataReader4(inputStream, delimiter, recordDelimiter, skipEmptyRecords);
|
||||
|
||||
} else {
|
||||
|
||||
return new ComplexDelimitedDataReader5(inputStream, delimiter, recordDelimiter, skipEmptyRecords);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* DOC ke Comment method "createDelimitedDataReader".
|
||||
*
|
||||
* @param file a delimited file
|
||||
* @param delimiter the field delimiter(field separator)
|
||||
* @param recordDelimiter the record delimiter(row separator)
|
||||
* @param skipEmptyRecords flag for skip the empty records
|
||||
* @return an instance of DelmitedDataReader
|
||||
* @throws IOException
|
||||
*/
|
||||
public static DelimitedDataReader createDelimitedDataReader(String file, String delimiter, String recordDelimiter,
|
||||
boolean skipEmptyRecords) throws IOException {
|
||||
|
||||
BufferedReader inputStream = new BufferedReader(new FileReader(file));
|
||||
return createDelimitedDataReader(inputStream, delimiter, recordDelimiter, skipEmptyRecords);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* DOC ke Comment method "createDelimitedDataReader".
|
||||
*
|
||||
* @param file a delimited file
|
||||
* @param encoding
|
||||
* @param delimiter the field delimiter(field separator)
|
||||
* @param recordDelimiter the record delimiter(row separator)
|
||||
* @param skipEmptyRecords flag for skip the empty records
|
||||
* @return an instance of DelmitedDataReader
|
||||
* @throws IOException
|
||||
*/
|
||||
public static DelimitedDataReader createDelimitedDataReader(String file, String encoding, String delimiter,
|
||||
String recordDelimiter, boolean skipEmptyRecords) throws IOException {
|
||||
|
||||
BufferedReader inputStream = new BufferedReader(new InputStreamReader(new FileInputStream(file), encoding));
|
||||
return createDelimitedDataReader(inputStream, delimiter, recordDelimiter, skipEmptyRecords);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* DOC ke Comment method "createDelimitedDataReader".
|
||||
*
|
||||
* @param inputStream A java stream source of delimited data
|
||||
* @param delimiter the field delimiter(field separator)
|
||||
* @param recordDelimiter the record delimiter(row separator)
|
||||
* @param skipEmptyRecords flag for skip the empty records
|
||||
* @return an instance of DelmitedDataReader
|
||||
* @throws IOException
|
||||
*/
|
||||
public static DelimitedDataReader createDelimitedDataReader(InputStream inputStream, String delimiter,
|
||||
String recordDelimiter, boolean skipEmptyRecords) throws IOException {
|
||||
|
||||
BufferedReader inputStreamReader = new BufferedReader(new InputStreamReader(inputStream));
|
||||
return createDelimitedDataReader(inputStreamReader, delimiter, recordDelimiter, skipEmptyRecords);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* DOC ke Comment method "createDelimitedDataReader".
|
||||
*
|
||||
* @param inputStream A java stream source of delimited data
|
||||
* @param encoding
|
||||
* @param delimiter the field delimiter(field separator)
|
||||
* @param recordDelimiter the record delimiter(row separator)
|
||||
* @param skipEmptyRecords flag for skip the empty records
|
||||
* @return an instance of DelmitedDataReader
|
||||
* @throws IOException
|
||||
*/
|
||||
public static DelimitedDataReader createDelimitedDataReader(InputStream inputStream, String encoding,
|
||||
String delimiter, String recordDelimiter, boolean skipEmptyRecords) throws IOException {
|
||||
|
||||
BufferedReader inputStreamReader = new BufferedReader(new InputStreamReader(inputStream, encoding));
|
||||
return createDelimitedDataReader(inputStreamReader, delimiter, recordDelimiter, skipEmptyRecords);
|
||||
|
||||
}
|
||||
}
|
||||
@@ -1,358 +0,0 @@
|
||||
// ============================================================================
|
||||
//
|
||||
// Talend Community Edition
|
||||
//
|
||||
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
|
||||
//
|
||||
// This library is free software; you can redistribute it and/or
|
||||
// modify it under the terms of the GNU Lesser General Public
|
||||
// License as published by the Free Software Foundation; either
|
||||
// version 2.1 of the License.
|
||||
//
|
||||
// This library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
// Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program; if not, write to the Free Software
|
||||
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
|
||||
//
|
||||
// ============================================================================
|
||||
package org.talend.fileprocess.delimited;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.text.NumberFormat;
|
||||
|
||||
/**
|
||||
* A stream based parser for parsing delimited text data from a file or a
|
||||
* stream. This works only when the field and record delimiter are both single
|
||||
* character<br/>
|
||||
*
|
||||
* @author gke
|
||||
*/
|
||||
public class SimpleDelimitedDataReader extends DelimitedDataReader {
|
||||
|
||||
private boolean useCustomRecordDelimiter = false;
|
||||
|
||||
private StreamBuffer streamBuffer = new StreamBuffer();
|
||||
|
||||
private boolean hasMoreData = true;
|
||||
|
||||
private char lastLetter = '\0';
|
||||
|
||||
private char fieldDelimiter;
|
||||
|
||||
private char recordDelimiter;
|
||||
|
||||
public SimpleDelimitedDataReader(BufferedReader inputStream, char delimiter,
|
||||
char recordDelimiter, boolean skipEmptyRecords) {
|
||||
|
||||
super(inputStream, skipEmptyRecords);
|
||||
fieldDelimiter = delimiter;
|
||||
this.recordDelimiter = recordDelimiter;
|
||||
if (recordDelimiter != '\n' && recordDelimiter != '\r') {
|
||||
useCustomRecordDelimiter = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readRecord() throws IOException {
|
||||
checkClosed();
|
||||
|
||||
columnsCount = 0;
|
||||
|
||||
hasReadRecord = false;
|
||||
|
||||
// check to see if we've already found the end of data
|
||||
|
||||
if (hasMoreData) {
|
||||
// loop over the data stream until the end of data is found
|
||||
// or the end of the record is found
|
||||
|
||||
while (hasMoreData && !hasReadRecord) {
|
||||
if (streamBuffer.currentPosition == streamBuffer.count) {
|
||||
checkDataLength();
|
||||
} else {
|
||||
// grab the current letter as a char
|
||||
|
||||
char currentLetter = streamBuffer.buffer[streamBuffer.currentPosition];
|
||||
|
||||
if (currentLetter == fieldDelimiter) {
|
||||
// we encountered a column with no data, so
|
||||
// just send the end column
|
||||
|
||||
lastLetter = currentLetter;
|
||||
|
||||
endColumn();
|
||||
} else if (useCustomRecordDelimiter
|
||||
&& currentLetter == recordDelimiter) {
|
||||
if (columnsCount > 0 || !skipEmptyRecord) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
}
|
||||
|
||||
lastLetter = currentLetter;
|
||||
|
||||
} else if (!useCustomRecordDelimiter
|
||||
&& (currentLetter == '\r' || currentLetter == '\n')) {
|
||||
if ((!skipEmptyRecord || columnsCount > 0)
|
||||
&& ((currentLetter == '\r' || lastLetter != '\r'))) {
|
||||
endColumn();
|
||||
endRecord();
|
||||
}
|
||||
|
||||
lastLetter = currentLetter;
|
||||
} else {
|
||||
// since the letter wasn't a special letter, this
|
||||
// will be the first letter of our current column
|
||||
|
||||
startedRow = true;
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition;
|
||||
|
||||
boolean firstLoop = true;
|
||||
|
||||
while (hasMoreData && startedRow) {
|
||||
if (!firstLoop
|
||||
&& streamBuffer.currentPosition == streamBuffer.count) {
|
||||
checkDataLength();
|
||||
} else {
|
||||
if (!firstLoop) {
|
||||
// grab the current letter as a char
|
||||
currentLetter = streamBuffer.buffer[streamBuffer.currentPosition];
|
||||
|
||||
if (currentLetter == fieldDelimiter) {
|
||||
endColumn();
|
||||
} else if ((!useCustomRecordDelimiter && (currentLetter == '\r' || currentLetter == '\n'))
|
||||
|| (useCustomRecordDelimiter && currentLetter == recordDelimiter)) {
|
||||
endColumn();
|
||||
|
||||
endRecord();
|
||||
}
|
||||
} else {
|
||||
|
||||
firstLoop = false;
|
||||
|
||||
}
|
||||
|
||||
lastLetter = currentLetter;
|
||||
|
||||
if (startedRow) {
|
||||
streamBuffer.currentPosition++;
|
||||
|
||||
if (safetySwitch
|
||||
&& streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart
|
||||
+ columnBuffer.position > StaticSettings.MAX_SIZE_FOR_SAFTY) {
|
||||
close();
|
||||
|
||||
throw new IOException(
|
||||
"Maximum column length of 100,000 exceeded in column "
|
||||
+ NumberFormat
|
||||
.getIntegerInstance()
|
||||
.format(
|
||||
columnsCount)
|
||||
+ " in record "
|
||||
+ NumberFormat
|
||||
.getIntegerInstance()
|
||||
.format(
|
||||
currentRecord)
|
||||
+ ". Set the SafetySwitch property to false"
|
||||
+ " if you're expecting column lengths greater than 100,000 characters to"
|
||||
+ " avoid this error.");
|
||||
}
|
||||
}
|
||||
} // end else
|
||||
}
|
||||
}
|
||||
|
||||
if (hasMoreData) {
|
||||
streamBuffer.currentPosition++;
|
||||
}
|
||||
} // end else
|
||||
}
|
||||
|
||||
// check to see if we hit the end of the file
|
||||
// without processing the current record
|
||||
|
||||
if (startedRow || lastLetter == fieldDelimiter) {
|
||||
endColumn();
|
||||
|
||||
endRecord();
|
||||
}
|
||||
}
|
||||
|
||||
return hasReadRecord;
|
||||
}
|
||||
|
||||
/**
|
||||
* @exception IOException
|
||||
* Thrown if an error occurs while reading data from the
|
||||
* source stream.
|
||||
*/
|
||||
private void checkDataLength() throws IOException {
|
||||
updateCurrentValue();
|
||||
|
||||
try {
|
||||
streamBuffer.count = inputStream.read(streamBuffer.buffer, 0,
|
||||
streamBuffer.buffer.length);
|
||||
} catch (IOException ex) {
|
||||
close();
|
||||
|
||||
throw ex;
|
||||
}
|
||||
|
||||
// if no more data could be found, set flag stating that
|
||||
// the end of the data was found
|
||||
|
||||
if (streamBuffer.count == -1) {
|
||||
hasMoreData = false;
|
||||
}
|
||||
|
||||
streamBuffer.currentPosition = 0;
|
||||
streamBuffer.columnStart = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* @exception IOException
|
||||
* Thrown if a very rare extreme exception occurs during
|
||||
* parsing, normally resulting from improper data format.
|
||||
*/
|
||||
private void endColumn() throws IOException {
|
||||
String currentValue = "";
|
||||
|
||||
// must be called before setting startedColumn = false
|
||||
if (startedRow) {
|
||||
if (columnBuffer.position == 0) {
|
||||
if (streamBuffer.columnStart < streamBuffer.currentPosition) {
|
||||
currentValue = new String(streamBuffer.buffer,
|
||||
streamBuffer.columnStart,
|
||||
streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart);
|
||||
}
|
||||
} else {
|
||||
updateCurrentValue();
|
||||
currentValue = new String(columnBuffer.buffer, 0,
|
||||
columnBuffer.position);
|
||||
}
|
||||
}
|
||||
|
||||
columnBuffer.position = 0;
|
||||
|
||||
startedRow = false;
|
||||
|
||||
if (columnsCount >= StaticSettings.MAX_SIZE_FOR_SAFTY && safetySwitch) {
|
||||
close();
|
||||
|
||||
throw new IOException(
|
||||
"Maximum column count of 100,000 exceeded in record "
|
||||
+ NumberFormat.getIntegerInstance().format(
|
||||
currentRecord)
|
||||
+ ". Set the SafetySwitch property to false"
|
||||
+ " if you're expecting more than 100,000 columns per record to"
|
||||
+ " avoid this error.");
|
||||
}
|
||||
|
||||
// check to see if our current holder array for
|
||||
// column chunks is still big enough to handle another
|
||||
// column chunk
|
||||
|
||||
if (columnsCount == values.length) {
|
||||
// holder array needs to grow to be able to hold another column
|
||||
int newLength = values.length * 2;
|
||||
|
||||
String[] holder = new String[newLength];
|
||||
|
||||
System.arraycopy(values, 0, holder, 0, values.length);
|
||||
|
||||
values = holder;
|
||||
}
|
||||
|
||||
values[columnsCount] = currentValue;
|
||||
|
||||
currentValue = "";
|
||||
|
||||
columnsCount++;
|
||||
}
|
||||
|
||||
private void updateCurrentValue() {
|
||||
if (startedRow
|
||||
&& streamBuffer.columnStart < streamBuffer.currentPosition) {
|
||||
if (columnBuffer.buffer.length - columnBuffer.position < streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart) {
|
||||
int newLength = columnBuffer.buffer.length
|
||||
+ Math.max(streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart,
|
||||
columnBuffer.buffer.length);
|
||||
|
||||
char[] holder = new char[newLength];
|
||||
|
||||
System.arraycopy(columnBuffer.buffer, 0, holder, 0,
|
||||
columnBuffer.position);
|
||||
|
||||
columnBuffer.buffer = holder;
|
||||
}
|
||||
|
||||
System.arraycopy(streamBuffer.buffer, streamBuffer.columnStart,
|
||||
columnBuffer.buffer, columnBuffer.position,
|
||||
streamBuffer.currentPosition - streamBuffer.columnStart);
|
||||
|
||||
columnBuffer.position += streamBuffer.currentPosition
|
||||
- streamBuffer.columnStart;
|
||||
}
|
||||
|
||||
streamBuffer.columnStart = streamBuffer.currentPosition + 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void close(boolean closing) {
|
||||
if (!closed) {
|
||||
if (closing) {
|
||||
streamBuffer.buffer = null;
|
||||
columnBuffer.buffer = null;
|
||||
}
|
||||
|
||||
try {
|
||||
if (initialized) {
|
||||
inputStream.close();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// just eat the exception
|
||||
}
|
||||
|
||||
inputStream = null;
|
||||
|
||||
closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A buffer structure that used to load data from stream for processing.
|
||||
*
|
||||
* @author gke
|
||||
*/
|
||||
class StreamBuffer {
|
||||
|
||||
char[] buffer;
|
||||
|
||||
int currentPosition;
|
||||
|
||||
// count indicates how much usable data has been read into the stream,
|
||||
// which will not always be as long as Buffer.Length.
|
||||
int count;
|
||||
|
||||
// columnStart is the position in the buffer when the
|
||||
// current column was started or the last time data
|
||||
// was moved out to the column buffer.
|
||||
int columnStart;
|
||||
|
||||
public StreamBuffer() {
|
||||
buffer = new char[StaticSettings.MAX_BUFFER_SIZE];
|
||||
currentPosition = 0;
|
||||
count = 0;
|
||||
columnStart = 0;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -19,7 +19,7 @@
|
||||
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
|
||||
//
|
||||
// ============================================================================
|
||||
package org.talend.fileprocess.delimited;
|
||||
package org.talend.fileprocess.delimited.patched;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
@@ -19,7 +19,7 @@
|
||||
// Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
|
||||
//
|
||||
// ============================================================================
|
||||
package org.talend.fileprocess.delimited;
|
||||
package org.talend.fileprocess.delimited.patched;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
@@ -1,4 +1,4 @@
|
||||
package org.talend.fileprocess.delimited;
|
||||
package org.talend.fileprocess.delimited.patched;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.File;
|
||||
Reference in New Issue
Block a user