diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ColumnInfo.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ColumnInfo.java
index 685f58a83..080d14293 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ColumnInfo.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ColumnInfo.java
@@ -9,6 +9,7 @@
*
* <p>This class holds the metadata for a column, including the namespace (schema), table name, and
* the column name within the table.
*/
+@SuppressWarnings("SameNameButDifferent")
@Value
@Builder
public class ColumnInfo {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ScanRange.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ScanRange.java
index b1ae7b02d..726ca0e7f 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ScanRange.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ScanRange.java
@@ -4,6 +4,7 @@
import lombok.Value;
/** * The scan range which is used in data export scan filtering */
+@SuppressWarnings("SameNameButDifferent")
@Value
public class ScanRange {
/** The key for scan start filter */
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java
index f66efdc9d..f228b8fc1 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java
@@ -28,6 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@SuppressWarnings({"SameNameButDifferent", "FutureReturnValueIgnored"})
@RequiredArgsConstructor
public abstract class ExportManager {
private static final Logger logger = LoggerFactory.getLogger(ExportManager.class);
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java
index da515cf3c..b7cad03fa 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java
@@ -9,8 +9,8 @@
import lombok.Builder;
import lombok.Data;
-/** Options for a ScalarDB export data operation */
-@SuppressWarnings("SameNameButDifferent")
+/** Options for a ScalarDB export data operation. */
+@SuppressWarnings({"SameNameButDifferent", "MissingSummary"})
@Builder(builderMethodName = "hiddenBuilder")
@Data
public class ExportOptions {
@@ -31,6 +31,15 @@ public class ExportOptions {
@Builder.Default private List<String> projectionColumns = Collections.emptyList();
private List sortOrders;
+ /**
+ * Generates and returns an export options builder.
+ *
+ * @param namespace namespace for export
+ * @param tableName table name for export
+ * @param scanPartitionKey scan partition key for export
+ * @param outputFileFormat output file format for export
+ * @return a configured export options builder
+ */
public static ExportOptionsBuilder builder(
String namespace, String tableName, Key scanPartitionKey, FileFormat outputFileFormat) {
return hiddenBuilder()
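
For context on the builder documented above, here is a minimal, illustrative usage sketch. The namespace, table, and column names are hypothetical, and it assumes the data loader core `FileFormat` enum exposes a `JSON` constant and that ScalarDB's `Key.ofInt` factory is used for the partition key:

```java
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.dataexport.ExportOptions;
import com.scalar.db.io.Key;

class ExportOptionsExample {
  // Builds export options for a single partition; all names here are illustrative only.
  static ExportOptions sampleOptions() {
    return ExportOptions.builder(
            "sample_ns",                  // namespace to export from
            "customers",                  // table to export
            Key.ofInt("customer_id", 1),  // scan partition key
            FileFormat.JSON)              // output file format
        .build();
  }
}
```
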
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonLineProducerTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonLineProducerTask.java
index 689084698..4e19cd464 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonLineProducerTask.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonLineProducerTask.java
@@ -15,13 +15,10 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class JsonLineProducerTask extends ProducerTask {
private final DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper();
- private static final Logger logger = LoggerFactory.getLogger(JsonLineProducerTask.class);
/**
* Class constructor
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonProducerTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonProducerTask.java
index 9fb8014c6..4ea10130f 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonProducerTask.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonProducerTask.java
@@ -16,14 +16,11 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class JsonProducerTask extends ProducerTask {
private final DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper();
private final boolean prettyPrintJson;
- private static final Logger logger = LoggerFactory.getLogger(JsonProducerTask.class);
/**
* Class constructor
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerResult.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerResult.java
deleted file mode 100644
index 9506fcd72..000000000
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerResult.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.scalar.db.dataloader.core.dataexport.producer;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import lombok.Builder;
-import lombok.Value;
-
-@Builder
-@Value
-public class ProducerResult {
- JsonNode jsonNode;
- String csvSource;
- boolean poisonPill;
-}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTaskFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTaskFactory.java
index 18adc8de6..f4577e463 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTaskFactory.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTaskFactory.java
@@ -8,6 +8,7 @@
import java.util.Map;
import lombok.RequiredArgsConstructor;
+@SuppressWarnings("SameNameButDifferent")
@RequiredArgsConstructor
public class ProducerTaskFactory {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/validation/ExportOptionsValidator.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/validation/ExportOptionsValidator.java
index 7bf7645b0..1a0407160 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/validation/ExportOptionsValidator.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/validation/ExportOptionsValidator.java
@@ -17,6 +17,7 @@
* A validator for ensuring that export options are consistent with the ScalarDB table metadata and
* follow the defined constraints.
*/
+@SuppressWarnings("SameNameButDifferent")
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ExportOptionsValidator {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java
index f1984d6c2..8b08fbb53 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java
@@ -35,6 +35,7 @@
* Notifying listeners of various import events
*
*/
+@SuppressWarnings("SameNameButDifferent")
@AllArgsConstructor
public class ImportManager implements ImportEventListener {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java
index 6d3206765..359fb1f88 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java
@@ -7,7 +7,8 @@
import lombok.Builder;
import lombok.Data;
-/** Import options to import data into one or more ScalarDB tables */
+/** Import options to import data into one or more ScalarDB tables. */
+@SuppressWarnings({"SameNameButDifferent", "MissingSummary"})
@Builder
@Data
public class ImportOptions {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFile.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFile.java
index 6a2229c18..a6888abfb 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFile.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFile.java
@@ -11,6 +11,7 @@
* Represents a control file that holds control file tables which contains the column mappings that
* maps a source file column to the actual database table column.
*/
+@SuppressWarnings("SameNameButDifferent")
@Getter
@Setter
public class ControlFile {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileTable.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileTable.java
index efcfb0bc0..c65d05887 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileTable.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileTable.java
@@ -12,6 +12,7 @@
* table name, and field mappings. This class is used to define how data from a control file maps to
* a specific table in ScalarDB.
*/
+@SuppressWarnings("SameNameButDifferent")
@Getter
@Setter
public class ControlFileTable {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileTableFieldMapping.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileTableFieldMapping.java
index 106857330..74785579e 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileTableFieldMapping.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileTableFieldMapping.java
@@ -10,6 +10,7 @@
* This class defines how data from a specific field in the input source should be mapped to the
* corresponding column in the database.
*/
+@SuppressWarnings("SameNameButDifferent")
@Getter
@Setter
public class ControlFileTableFieldMapping {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportDataChunk.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportDataChunk.java
index 69ed97421..2ab6539d6 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportDataChunk.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportDataChunk.java
@@ -4,7 +4,8 @@
import lombok.Builder;
import lombok.Data;
-/** * Import data chunk data */
+/** * Import data chunk data. */
+@SuppressWarnings({"SameNameButDifferent", "MissingSummary"})
@Data
@Builder
public class ImportDataChunk {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportDataChunkStatus.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportDataChunkStatus.java
index d6db3e1e7..0009f71cd 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportDataChunkStatus.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportDataChunkStatus.java
@@ -6,7 +6,8 @@
import lombok.Builder;
import lombok.Data;
-/** * A DTO to store import data chunk details */
+/** * A DTO to store import data chunk details. */
+@SuppressWarnings({"SameNameButDifferent", "MissingSummary"})
@Data
@Builder
@JsonDeserialize(builder = ImportDataChunkStatus.ImportDataChunkStatusBuilder.class)
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportRow.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportRow.java
index 824ca4ffa..84bcd0af3 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportRow.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportRow.java
@@ -3,7 +3,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Value;
-/** Stores data related to a single row on import file */
+/** Stores data related to a single row on import file. */
+@SuppressWarnings("SameNameButDifferent")
@Value
public class ImportRow {
int rowNumber;
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/AbstractImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/AbstractImportLogger.java
new file mode 100644
index 000000000..7addf96a5
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/AbstractImportLogger.java
@@ -0,0 +1,243 @@
+package com.scalar.db.dataloader.core.dataimport.log;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.scalar.db.dataloader.core.Constants;
+import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
+import com.scalar.db.dataloader.core.dataimport.ImportEventListener;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
+import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter;
+import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
+import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
+import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * An abstract base class for logging import events during data loading operations. This class
+ * implements the {@link ImportEventListener} interface and provides common functionality for
+ * logging transaction batch results and managing event listeners. Concrete implementations should
+ * define how to log transaction batches and handle errors.
+ */
+@SuppressWarnings("SameNameButDifferent")
+@RequiredArgsConstructor
+public abstract class AbstractImportLogger implements ImportEventListener {
+
+ /** Object mapper used for JSON serialization/deserialization. */
+ protected static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper();
+
+ /** Configuration for the import logger. */
+ protected final ImportLoggerConfig config;
+
+ /** Factory for creating log writers. */
+ protected final LogWriterFactory logWriterFactory;
+
+ /** List of event listeners to be notified of import events. */
+ protected final List<ImportEventListener> listeners = new ArrayList<>();
+
+ /**
+ * Called when a data chunk import is started. Currently, this implementation does not log the
+ * start of a data chunk.
+ *
+ * @param importDataChunkStatus the status of the data chunk being imported
+ */
+ @Override
+ public void onDataChunkStarted(ImportDataChunkStatus importDataChunkStatus) {
+ // Currently we are not logging the start of a data chunk
+ }
+
+ /**
+ * Called when a transaction batch is started. Currently, this implementation does not log the
+ * start of a transaction batch, but it notifies all registered listeners.
+ *
+ * @param batchStatus the status of the transaction batch being started
+ */
+ @Override
+ public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) {
+ // Currently we are not logging the start of a transaction batch
+ notifyTransactionBatchStarted(batchStatus);
+ }
+
+ /**
+ * Called when a transaction batch is completed. This method logs the transaction batch result if
+ * it should be logged based on the configuration, and notifies all registered listeners.
+ *
+ * @param batchResult the result of the completed transaction batch
+ */
+ @Override
+ public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
+ // skip logging success records if the configuration is set to skip
+ if (shouldSkipLoggingSuccess(batchResult)) {
+ return;
+ }
+
+ logTransactionBatch(batchResult);
+ notifyTransactionBatchCompleted(batchResult);
+ }
+
+ /**
+ * Logs a transaction batch result. This method should be implemented by concrete subclasses to
+ * define how to log transaction batch results.
+ *
+ * @param batchResult the transaction batch result to log
+ */
+ protected abstract void logTransactionBatch(ImportTransactionBatchResult batchResult);
+
+ /**
+ * Determines whether logging of a successful transaction batch should be skipped. Logging is
+ * skipped if the batch was successful and the configuration specifies not to log success records.
+ *
+ * @param batchResult the transaction batch result to check
+ * @return true if logging should be skipped, false otherwise
+ */
+ protected boolean shouldSkipLoggingSuccess(ImportTransactionBatchResult batchResult) {
+ return batchResult.isSuccess() && !config.isLogSuccessRecords();
+ }
+
+ /**
+ * Creates a filtered JSON representation of a transaction batch result. This method filters out
+ * raw record data if the configuration specifies not to log raw source records.
+ *
+ * @param batchResult the transaction batch result to convert to JSON
+ * @return a JsonNode representing the filtered transaction batch result
+ */
+ protected JsonNode createFilteredTransactionBatchLogJsonNode(
+ ImportTransactionBatchResult batchResult) {
+
+ // If the batch result does not contain any records, return the batch result as is
+ if (batchResult.getRecords() == null) {
+ return OBJECT_MAPPER.valueToTree(batchResult);
+ }
+
+ // Create a new list to store the modified import task results
+ List<ImportTaskResult> modifiedRecords = new ArrayList<>();
+
+ // Loop over the records in the batchResult
+ for (ImportTaskResult taskResult : batchResult.getRecords()) {
+ // Create a new ImportTaskResult and not add the raw record yet
+ List<ImportTargetResult> targetResults =
+ batchResult.isSuccess()
+ ? taskResult.getTargets()
+ : updateTargetStatusForAbortedTransactionBatch(taskResult.getTargets());
+ ImportTaskResult.ImportTaskResultBuilder builder =
+ ImportTaskResult.builder()
+ .rowNumber(taskResult.getRowNumber())
+ .targets(targetResults)
+ .dataChunkId(taskResult.getDataChunkId());
+
+ // Only add the raw record if the configuration is set to log raw source data
+ if (config.isLogRawSourceRecords()) {
+ builder.rawRecord(taskResult.getRawRecord());
+ }
+ ImportTaskResult modifiedTaskResult = builder.build();
+
+ // Add the modified task result to the list
+ modifiedRecords.add(modifiedTaskResult);
+ }
+
+ // Create a new transaction batch result with the modified import task results
+ ImportTransactionBatchResult modifiedBatchResult =
+ ImportTransactionBatchResult.builder()
+ .dataChunkId(batchResult.getDataChunkId())
+ .transactionBatchId(batchResult.getTransactionBatchId())
+ .transactionId(batchResult.getTransactionId())
+ .records(modifiedRecords)
+ .errors(batchResult.getErrors())
+ .success(batchResult.isSuccess())
+ .build();
+
+ // Convert the modified batch result to a JsonNode
+ return OBJECT_MAPPER.valueToTree(modifiedBatchResult);
+ }
+
+ /**
+ * Safely closes a log writer. If an IOException occurs during closing, it logs the error using
+ * the {@link #logError} method.
+ *
+ * @param logWriter the log writer to close, may be null
+ */
+ protected void closeLogWriter(LogWriter logWriter) {
+ if (logWriter != null) {
+ try {
+ logWriter.close();
+ } catch (IOException e) {
+ logError("Failed to close a log writer", e);
+ }
+ }
+ }
+
+ /**
+ * Logs an error message and exception. This method should be implemented by concrete subclasses
+ * to define how to log errors.
+ *
+ * @param errorMessage the error message to log
+ * @param e the exception that caused the error
+ */
+ protected abstract void logError(String errorMessage, Exception e);
+
+ /**
+ * Creates a log writer for the specified log file path.
+ *
+ * @param logFilePath the path to the log file
+ * @return a new log writer
+ * @throws IOException if an I/O error occurs while creating the log writer
+ */
+ protected LogWriter createLogWriter(String logFilePath) throws IOException {
+ return logWriterFactory.createLogWriter(logFilePath);
+ }
+
+ /**
+ * Notifies all registered listeners that a transaction batch has started.
+ *
+ * @param status the status of the transaction batch that has started
+ */
+ private void notifyTransactionBatchStarted(ImportTransactionBatchStatus status) {
+ for (ImportEventListener listener : listeners) {
+ listener.onTransactionBatchStarted(status);
+ }
+ }
+
+ /**
+ * Notifies all registered listeners that a transaction batch has completed.
+ *
+ * @param batchResult the result of the completed transaction batch
+ */
+ private void notifyTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
+ for (ImportEventListener listener : listeners) {
+ listener.onTransactionBatchCompleted(batchResult);
+ }
+ }
+
+ /**
+ * Updates the status of target results for an aborted transaction batch. For each target with a
+ * status of SAVED, changes the status to ABORTED and adds an error message.
+ *
+ * @param targetResults the list of target results to update
+ * @return the updated list of target results
+ */
+ private List<ImportTargetResult> updateTargetStatusForAbortedTransactionBatch(
+ List<ImportTargetResult> targetResults) {
+ for (int i = 0; i < targetResults.size(); i++) {
+ ImportTargetResult target = targetResults.get(i);
+ if (target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
+ ImportTargetResult newTarget =
+ ImportTargetResult.builder()
+ .importAction(target.getImportAction())
+ .status(ImportTargetResultStatus.ABORTED)
+ .importedRecord(target.getImportedRecord())
+ .namespace(target.getNamespace())
+ .tableName(target.getTableName())
+ .dataMapped(target.isDataMapped())
+ .errors(Collections.singletonList(Constants.ABORT_TRANSACTION_STATUS))
+ .build();
+ targetResults.set(i, newTarget);
+ }
+ }
+ return targetResults;
+ }
+}
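
To illustrate the contract described in the class Javadoc above, below is a minimal sketch of a concrete logger. It is hypothetical, assumes ImportEventListener declares only the callbacks that appear elsewhere in this diff, and a real implementation would write through the LogWriterFactory rather than to stdout:

```java
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.log.AbstractImportLogger;
import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;

// Hypothetical subclass used only to show which hooks a concrete logger must provide.
class ConsoleImportLogger extends AbstractImportLogger {

  ConsoleImportLogger(ImportLoggerConfig config, LogWriterFactory logWriterFactory) {
    super(config, logWriterFactory);
  }

  @Override
  protected void logTransactionBatch(ImportTransactionBatchResult batchResult) {
    // Reuse the base class helper so raw records are filtered according to the config.
    System.out.println(createFilteredTransactionBatchLogJsonNode(batchResult));
  }

  @Override
  protected void logError(String errorMessage, Exception e) {
    System.err.println(errorMessage + ": " + e.getMessage());
  }

  // Remaining ImportEventListener callbacks (assumed to be the only ones) are no-ops here.
  @Override
  public void onTaskComplete(ImportTaskResult taskResult) {}

  @Override
  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {}

  @Override
  public void onDataChunkCompleted(ImportDataChunkStatus dataChunkStatus) {}

  @Override
  public void onAllDataChunksCompleted() {}
}
```
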
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerConfig.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerConfig.java
new file mode 100644
index 000000000..1bdec6cd7
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerConfig.java
@@ -0,0 +1,38 @@
+package com.scalar.db.dataloader.core.dataimport.log;
+
+import lombok.Builder;
+import lombok.Value;
+
+/**
+ * Configuration class for import loggers. This class uses Lombok's {@code @Value} annotation to
+ * create an immutable class and {@code @Builder} annotation to provide a builder pattern for
+ * creating instances.
+ */
+@Value
+@Builder
+@SuppressWarnings("SameNameButDifferent")
+public class ImportLoggerConfig {
+ /**
+ * The directory path where log files will be stored. This path should end with a directory
+ * separator (e.g., "/").
+ */
+ String logDirectoryPath;
+
+ /**
+ * Whether to log records that were successfully imported. If true, successful import operations
+ * will be logged to success log files.
+ */
+ boolean isLogSuccessRecords;
+
+ /**
+ * Whether to log raw source records that failed to be imported. If true, failed import operations
+ * will be logged to failure log files.
+ */
+ boolean isLogRawSourceRecords;
+
+ /**
+ * Whether to format the logs with pretty printing. If true, the JSON logs will be formatted with
+ * indentation for better readability.
+ */
+ boolean prettyPrint;
+}
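
Since the class relies on Lombok-generated accessors and a builder, here is a short sketch of how a config might be created. The directory path is hypothetical, and the builder method names are assumed to follow the field names shown above:

```java
import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;

class ImportLoggerConfigExample {
  static ImportLoggerConfig sampleConfig() {
    return ImportLoggerConfig.builder()
        .logDirectoryPath("/tmp/import-logs/") // should end with a separator, per the field Javadoc
        .isLogSuccessRecords(true)             // also log successfully imported records
        .isLogRawSourceRecords(true)           // include raw source records in failure logs
        .prettyPrint(false)                    // keep the JSON logs compact
        .build();
  }
}
```
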
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java
new file mode 100644
index 000000000..cdd7a66d0
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java
@@ -0,0 +1,230 @@
+package com.scalar.db.dataloader.core.dataimport.log;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
+import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter;
+import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
+import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
+import java.io.IOException;
+import javax.annotation.concurrent.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of {@link AbstractImportLogger} that uses a single file for each log type.
+ * Unlike {@link SplitByDataChunkImportLogger}, this logger creates only three log files: one for
+ * successful operations, one for failed operations, and one for summary information, regardless of
+ * the number of data chunks processed.
+ *
+ * <p>The log files are named as follows:
+ *
+ * <ul>
+ *   <li>success.json - Records of successful import operations
+ *   <li>failure.json - Records of failed import operations
+ *   <li>summary.log - Summary information for all data chunks
+ * </ul>
+ */
+@ThreadSafe
+public class SingleFileImportLogger extends AbstractImportLogger {
+
+ protected static final String SUMMARY_LOG_FILE_NAME = "summary.log";
+ protected static final String SUCCESS_LOG_FILE_NAME = "success.json";
+ protected static final String FAILURE_LOG_FILE_NAME = "failure.json";
+ private static final Logger logger = LoggerFactory.getLogger(SingleFileImportLogger.class);
+ private volatile LogWriter summaryLogWriter;
+ private final LogWriter successLogWriter;
+ private final LogWriter failureLogWriter;
+
+ /**
+ * Creates a new instance of SingleFileImportLogger. Initializes the success and failure log
+ * writers immediately. The summary log writer is created on demand when the first data chunk is
+ * completed.
+ *
+ * @param config the configuration for the logger
+ * @param logWriterFactory the factory to create log writers
+ * @throws IOException if an I/O error occurs while creating the log writers
+ */
+ public SingleFileImportLogger(ImportLoggerConfig config, LogWriterFactory logWriterFactory)
+ throws IOException {
+ super(config, logWriterFactory);
+ successLogWriter = createLogWriter(config.getLogDirectoryPath() + SUCCESS_LOG_FILE_NAME);
+ failureLogWriter = createLogWriter(config.getLogDirectoryPath() + FAILURE_LOG_FILE_NAME);
+ }
+
+ /**
+ * Called when an import task is completed. Writes the task result details to the appropriate log
+ * files based on the configuration.
+ *
+ * @param taskResult the result of the completed import task
+ */
+ @Override
+ public void onTaskComplete(ImportTaskResult taskResult) {
+ if (!config.isLogSuccessRecords() && !config.isLogRawSourceRecords()) return;
+ try {
+ writeImportTaskResultDetailToLogs(taskResult);
+ } catch (Exception e) {
+ logError("Failed to write success/failure logs", e);
+ }
+ }
+
+ /**
+ * Called to add or update the status of a data chunk. This implementation does nothing as the
+ * status is only logged when the data chunk is completed.
+ *
+ * @param status the status of the data chunk
+ */
+ @Override
+ public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {}
+
+ /**
+ * Called when a data chunk is completed. Logs the summary of the data chunk to the summary log
+ * file.
+ *
+ * @param dataChunkStatus the status of the completed data chunk
+ */
+ @Override
+ public void onDataChunkCompleted(ImportDataChunkStatus dataChunkStatus) {
+ try {
+ logDataChunkSummary(dataChunkStatus);
+ } catch (IOException e) {
+ logError("Failed to log the data chunk summary", e);
+ }
+ }
+
+ /** Called when all data chunks are completed. Closes all log writers. */
+ @Override
+ public void onAllDataChunksCompleted() {
+ closeAllLogWriters();
+ }
+
+ /**
+ * Logs a transaction batch result to the appropriate log file based on its success status.
+ *
+ * @param batchResult the transaction batch result to log
+ */
+ @Override
+ protected void logTransactionBatch(ImportTransactionBatchResult batchResult) {
+ try {
+ LogWriter logWriter = getLogWriterForTransactionBatch(batchResult);
+ JsonNode jsonNode = createFilteredTransactionBatchLogJsonNode(batchResult);
+ writeToLogWriter(logWriter, jsonNode);
+ } catch (IOException e) {
+ logError("Failed to write a transaction batch record to the log file", e);
+ }
+ }
+
+ /**
+ * Logs an error message with an exception to the logger.
+ *
+ * @param errorMessage the error message to log
+ * @param exception the exception associated with the error
+ */
+ @Override
+ protected void logError(String errorMessage, Exception exception) {
+ logger.error(errorMessage, exception);
+ throw new RuntimeException(errorMessage, exception);
+ }
+
+ /**
+ * Logs the summary of a data chunk to the summary log file. Creates the summary log writer if it
+ * doesn't exist yet.
+ *
+ * @param dataChunkStatus the status of the data chunk to log
+ * @throws IOException if an I/O error occurs while writing to the log
+ */
+ private void logDataChunkSummary(ImportDataChunkStatus dataChunkStatus) throws IOException {
+ ensureSummaryLogWriterInitialized();
+ writeImportDataChunkSummary(dataChunkStatus, summaryLogWriter);
+ }
+
+ /**
+ * Ensures that the summary log writer is initialized in a thread-safe manner.
+ *
+ * @throws IOException if an error occurs while creating the log writer
+ */
+ private void ensureSummaryLogWriterInitialized() throws IOException {
+ if (summaryLogWriter == null) {
+ synchronized (this) {
+ if (summaryLogWriter == null) {
+ summaryLogWriter = createLogWriter(config.getLogDirectoryPath() + SUMMARY_LOG_FILE_NAME);
+ }
+ }
+ }
+ }
+
+ /**
+ * Writes the summary of a data chunk to the specified log writer.
+ *
+ * @param dataChunkStatus the status of the data chunk to log
+ * @param logWriter the log writer to write to
+ * @throws IOException if an I/O error occurs while writing to the log
+ */
+ private void writeImportDataChunkSummary(
+ ImportDataChunkStatus dataChunkStatus, LogWriter logWriter) throws IOException {
+ JsonNode jsonNode = OBJECT_MAPPER.valueToTree(dataChunkStatus);
+ writeToLogWriter(logWriter, jsonNode);
+ }
+
+ /**
+ * Gets the appropriate log writer for a transaction batch based on its success status. If the log
+ * writer doesn't exist yet, it will be created.
+ *
+ * @param batchResult the transaction batch result
+ * @return the log writer for the batch
+ * @throws IOException if an I/O error occurs while creating a new log writer
+ */
+ private LogWriter getLogWriterForTransactionBatch(ImportTransactionBatchResult batchResult)
+ throws IOException {
+ return batchResult.isSuccess() ? successLogWriter : failureLogWriter;
+ }
+
+ /**
+ * Writes the details of an import task result to the appropriate log files. Successful targets
+ * are written to success logs and failed targets to failure logs. Thread safety relies on the
+ * underlying log writers, whose write operations are synchronized.
+ *
+ * @param importTaskResult the result of the import task to log
+ * @throws IOException if an I/O error occurs while writing to the logs
+ */
+ private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult)
+ throws IOException {
+ for (ImportTargetResult target : importTaskResult.getTargets()) {
+ if (config.isLogSuccessRecords()
+ && target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
+
+ writeToLogWriter(successLogWriter, OBJECT_MAPPER.valueToTree(target));
+ }
+ if (config.isLogRawSourceRecords()
+ && !target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
+ writeToLogWriter(failureLogWriter, OBJECT_MAPPER.valueToTree(target));
+ }
+ }
+ }
+
+ /**
+ * Writes a JSON node to a log writer.
+ *
+ * @param logWriter the log writer to write to
+ * @param jsonNode the JSON node to write
+ * @throws IOException if an I/O error occurs while writing
+ */
+ private void writeToLogWriter(LogWriter logWriter, JsonNode jsonNode) throws IOException {
+ logWriter.write(jsonNode);
+ }
+
+ /**
+ * Closes all log writers. This method is called when all data chunks have been completed.
+ */
+ private void closeAllLogWriters() {
+ synchronized (this) {
+ closeLogWriter(summaryLogWriter);
+ closeLogWriter(successLogWriter);
+ closeLogWriter(failureLogWriter);
+ summaryLogWriter = null;
+ }
+ }
+}
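
As a rough sketch of how this logger could be wired up, using the DefaultLogWriterFactory added later in this diff (the constructor creates success.json and failure.json immediately; summary.log is created lazily on the first completed data chunk):

```java
import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;
import com.scalar.db.dataloader.core.dataimport.log.SingleFileImportLogger;
import com.scalar.db.dataloader.core.dataimport.log.writer.DefaultLogWriterFactory;
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
import java.io.IOException;

class SingleFileImportLoggerExample {
  // Creates a logger that writes success.json, failure.json, and summary.log into the
  // directory configured in the given config.
  static SingleFileImportLogger createLogger(ImportLoggerConfig config) throws IOException {
    LogWriterFactory factory = new DefaultLogWriterFactory(config);
    return new SingleFileImportLogger(config, factory);
  }
}
```
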
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java
new file mode 100644
index 000000000..99d1fe70d
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java
@@ -0,0 +1,305 @@
+package com.scalar.db.dataloader.core.dataimport.log;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
+import com.scalar.db.dataloader.core.dataimport.log.writer.LogFileType;
+import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter;
+import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
+import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of {@link AbstractImportLogger} that creates separate log files for each data
+ * chunk. This logger maintains separate log writers for success, failure, and summary logs for each
+ * data chunk, allowing for better organization and easier tracking of import operations by data
+ * chunk.
+ *
+ * <p>The log files are named using the following formats:
+ *
+ * <ul>
+ *   <li>Success logs: data_chunk_[id]_success.json
+ *   <li>Failure logs: data_chunk_[id]_failure.json
+ *   <li>Summary logs: data_chunk_[id]_summary.json
+ * </ul>
+ *
+ * <p>Log writers are created on demand and closed when their corresponding data chunk is completed.
+ */
+@ThreadSafe
+public class SplitByDataChunkImportLogger extends AbstractImportLogger {
+
+ protected static final String SUMMARY_LOG_FILE_NAME_FORMAT = "data_chunk_%s_summary.json";
+ protected static final String FAILURE_LOG_FILE_NAME_FORMAT = "data_chunk_%s_failure.json";
+ protected static final String SUCCESS_LOG_FILE_NAME_FORMAT = "data_chunk_%s_success.json";
+
+ private static final Logger logger = LoggerFactory.getLogger(SplitByDataChunkImportLogger.class);
+ private final Map<Integer, LogWriter> summaryLogWriters = new ConcurrentHashMap<>();
+ private final Map<Integer, LogWriter> successLogWriters = new ConcurrentHashMap<>();
+ private final Map<Integer, LogWriter> failureLogWriters = new ConcurrentHashMap<>();
+
+ /**
+ * Creates a new instance of SplitByDataChunkImportLogger.
+ *
+ * @param config the configuration for the logger
+ * @param logWriterFactory the factory to create log writers
+ */
+ public SplitByDataChunkImportLogger(
+ ImportLoggerConfig config, LogWriterFactory logWriterFactory) {
+ super(config, logWriterFactory);
+ }
+
+ /**
+ * Called when an import task is completed. Writes the task result details to the appropriate log
+ * files based on the configuration.
+ *
+ * @param taskResult the result of the completed import task
+ */
+ @Override
+ public void onTaskComplete(ImportTaskResult taskResult) {
+ if (!config.isLogSuccessRecords() && !config.isLogRawSourceRecords()) return;
+ try {
+ writeImportTaskResultDetailToLogs(taskResult);
+ } catch (IOException e) {
+ logError("Failed to write success/failure logs", e);
+ }
+ }
+
+ /**
+ * Writes the details of an import task result to the appropriate log files. Successful targets
+ * are written to success logs and failed targets to failure logs.
+ *
+ * @param importTaskResult the result of the import task to log
+ * @throws IOException if an I/O error occurs while writing to the logs
+ */
+ private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult)
+ throws IOException {
+ for (ImportTargetResult target : importTaskResult.getTargets()) {
+ ImportTargetResultStatus status = target.getStatus();
+ if (status.equals(ImportTargetResultStatus.SAVED) && config.isLogSuccessRecords()) {
+ writeLog(target, LogFileType.SUCCESS, importTaskResult.getDataChunkId());
+ } else if (!status.equals(ImportTargetResultStatus.SAVED) && config.isLogRawSourceRecords()) {
+ writeLog(target, LogFileType.FAILURE, importTaskResult.getDataChunkId());
+ }
+ }
+ }
+
+ /**
+ * Serializes the given {@link ImportTargetResult} to JSON and writes it to a log file
+ * corresponding to the provided {@link LogFileType} and data chunk ID.
+ *
+ * <p>Thread-safe access to the underlying {@link LogWriter} relies on the writer's own
+ * synchronized write operations. It is safe to call concurrently from multiple threads handling the same
+ * or different data chunks.
+ *
+ * @param target the result of processing a single import target to be logged
+ * @param logFileType the type of log file to write to (e.g., SUCCESS or FAILURE)
+ * @param dataChunkId the ID of the data chunk associated with the log entry
+ * @throws IOException if writing or flushing the log fails
+ */
+ private void writeLog(ImportTargetResult target, LogFileType logFileType, int dataChunkId)
+ throws IOException {
+ JsonNode jsonNode = OBJECT_MAPPER.valueToTree(target);
+ LogWriter writer = initializeLogWriterIfNeeded(logFileType, dataChunkId);
+ writer.write(jsonNode);
+ }
+
+ /**
+ * Called to add or update the status of a data chunk. This implementation does nothing as the
+ * status is only logged when the data chunk is completed.
+ *
+ * @param status the status of the data chunk
+ */
+ @Override
+ public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {}
+
+ /**
+ * Called when a data chunk is completed. Logs the summary of the data chunk and closes the log
+ * writers for that data chunk.
+ *
+ * @param dataChunkStatus the status of the completed data chunk
+ */
+ @Override
+ public void onDataChunkCompleted(ImportDataChunkStatus dataChunkStatus) {
+ try {
+ logDataChunkSummary(dataChunkStatus);
+ // Close the split log writers per data chunk if they exist for this data chunk id
+ closeLogWritersForDataChunk(dataChunkStatus.getDataChunkId());
+ } catch (IOException e) {
+ logError("Failed to log the data chunk summary", e);
+ }
+ }
+
+ /** Called when all data chunks are completed. Closes all remaining log writers. */
+ @Override
+ public void onAllDataChunksCompleted() {
+ closeAllDataChunkLogWriters();
+ }
+
+ /**
+ * Logs a transaction batch result to the appropriate log file based on its success status. The
+ * log file is determined by the data chunk ID and whether the batch was successful.
+ *
+ * @param batchResult the transaction batch result to log
+ */
+ @Override
+ protected void logTransactionBatch(ImportTransactionBatchResult batchResult) {
+ LogFileType logFileType = batchResult.isSuccess() ? LogFileType.SUCCESS : LogFileType.FAILURE;
+ try {
+ LogWriter logWriter = initializeLogWriterIfNeeded(logFileType, batchResult.getDataChunkId());
+ JsonNode jsonNode = createFilteredTransactionBatchLogJsonNode(batchResult);
+ logWriter.write(jsonNode);
+ } catch (IOException e) {
+ logError("Failed to write a transaction batch record to a split mode log file", e);
+ }
+ }
+
+ /**
+ * Logs an error message with an exception to the logger.
+ *
+ * @param errorMessage the error message to log
+ * @param exception the exception associated with the error
+ */
+ @Override
+ protected void logError(String errorMessage, Exception exception) {
+ logger.error(errorMessage, exception);
+ throw new RuntimeException(errorMessage, exception);
+ }
+
+ /**
+ * Logs the summary of a data chunk to a summary log file.
+ *
+ * @param dataChunkStatus the status of the data chunk to log
+ * @throws IOException if an I/O error occurs while writing to the log
+ */
+ private void logDataChunkSummary(ImportDataChunkStatus dataChunkStatus) throws IOException {
+ try (LogWriter logWriter =
+ initializeLogWriterIfNeeded(LogFileType.SUMMARY, dataChunkStatus.getDataChunkId())) {
+ logWriter.write(OBJECT_MAPPER.valueToTree(dataChunkStatus));
+ logWriter.flush();
+ }
+ }
+
+ /**
+ * Closes and removes the log writers for a specific data chunk.
+ *
+ * @param dataChunkId the ID of the data chunk whose log writers should be closed
+ */
+ private void closeLogWritersForDataChunk(int dataChunkId) {
+ closeLogWriter(successLogWriters.remove(dataChunkId));
+ closeLogWriter(failureLogWriters.remove(dataChunkId));
+ closeLogWriter(summaryLogWriters.remove(dataChunkId));
+ }
+
+ /**
+ * Closes all log writers for all data chunks and clears the writer maps. This method is called
+ * when all data chunks have been completed.
+ */
+ private void closeAllDataChunkLogWriters() {
+ summaryLogWriters.values().forEach(this::closeLogWriter);
+ successLogWriters.values().forEach(this::closeLogWriter);
+ failureLogWriters.values().forEach(this::closeLogWriter);
+ summaryLogWriters.clear();
+ successLogWriters.clear();
+ failureLogWriters.clear();
+ }
+
+ /**
+ * Constructs the log file path based on the batch ID and log file type.
+ *
+ * @param batchId the ID of the batch (data chunk)
+ * @param logFileType the type of log file (SUCCESS, FAILURE, or SUMMARY)
+ * @return the full path to the log file
+ */
+ private String getLogFilePath(long batchId, LogFileType logFileType) {
+ String logfilePath;
+ switch (logFileType) {
+ case SUCCESS:
+ logfilePath =
+ config.getLogDirectoryPath() + String.format(SUCCESS_LOG_FILE_NAME_FORMAT, batchId);
+ break;
+ case FAILURE:
+ logfilePath =
+ config.getLogDirectoryPath() + String.format(FAILURE_LOG_FILE_NAME_FORMAT, batchId);
+ break;
+ case SUMMARY:
+ logfilePath =
+ config.getLogDirectoryPath() + String.format(SUMMARY_LOG_FILE_NAME_FORMAT, batchId);
+ break;
+ default:
+ logfilePath = "";
+ }
+ return logfilePath;
+ }
+
+ /**
+ * Gets or creates a log writer for the specified log file type and data chunk ID. If a log writer
+ * for the specified type and data chunk doesn't exist, it will be created.
+ *
+ * @param logFileType the type of log file
+ * @param dataChunkId the ID of the data chunk
+ * @return the log writer for the specified type and data chunk
+ * @throws IOException if an I/O error occurs while creating a new log writer
+ */
+ private LogWriter initializeLogWriterIfNeeded(LogFileType logFileType, int dataChunkId)
+ throws IOException {
+ Map<Integer, LogWriter> logWriters = getLogWriters(logFileType);
+ try {
+ return logWriters.computeIfAbsent(
+ dataChunkId,
+ id -> {
+ try {
+ return createLogWriter(logFileType, id);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ } catch (UncheckedIOException e) {
+ throw e.getCause();
+ }
+ }
+
+ /**
+ * Creates a new log writer for the specified log file type and data chunk ID.
+ *
+ * @param logFileType the type of log file
+ * @param dataChunkId the ID of the data chunk
+ * @return a new log writer
+ * @throws IOException if an I/O error occurs while creating the log writer
+ */
+ private LogWriter createLogWriter(LogFileType logFileType, int dataChunkId) throws IOException {
+ String logFilePath = getLogFilePath(dataChunkId, logFileType);
+ return createLogWriter(logFilePath);
+ }
+
+ /**
+ * Gets the appropriate map of log writers for the specified log file type.
+ *
+ * @param logFileType the type of log file
+ * @return the map of log writers for the specified type
+ */
+ private Map<Integer, LogWriter> getLogWriters(LogFileType logFileType) {
+ Map<Integer, LogWriter> logWriterMap;
+ switch (logFileType) {
+ case SUCCESS:
+ logWriterMap = successLogWriters;
+ break;
+ case FAILURE:
+ logWriterMap = failureLogWriters;
+ break;
+ case SUMMARY:
+ logWriterMap = summaryLogWriters;
+ break;
+ default:
+ throw new AssertionError();
+ }
+ return logWriterMap;
+ }
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactory.java
new file mode 100644
index 000000000..c5ef96714
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactory.java
@@ -0,0 +1,32 @@
+package com.scalar.db.dataloader.core.dataimport.log.writer;
+
+import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;
+import java.io.IOException;
+import lombok.AllArgsConstructor;
+
+/**
+ * The default implementation of {@link LogWriterFactory} that creates {@link LocalFileLogWriter}
+ * instances. This factory uses the provided {@link ImportLoggerConfig} to configure the log writers
+ * it creates. It's annotated with Lombok's {@code @AllArgsConstructor} to automatically generate a
+ * constructor that initializes the configuration field.
+ */
+@SuppressWarnings("SameNameButDifferent")
+@AllArgsConstructor
+public class DefaultLogWriterFactory implements LogWriterFactory {
+
+ private final ImportLoggerConfig importLoggerConfig;
+
+ /**
+ * Creates a {@link LocalFileLogWriter} for the specified log file path. The created log writer
+ * will be configured using the {@link ImportLoggerConfig} that was provided to this factory
+ * during construction.
+ *
+ * @param logFilePath the path where the log file will be created or appended to
+ * @return a new {@link LogWriter} instance that writes to the specified file
+ * @throws IOException if an I/O error occurs while creating the log writer
+ */
+ @Override
+ public LogWriter createLogWriter(String logFilePath) throws IOException {
+ return new LocalFileLogWriter(logFilePath, importLoggerConfig);
+ }
+}
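
A small sketch of using the factory directly; the file name is hypothetical, and the JSON node is built with a plain Jackson ObjectMapper:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;
import com.scalar.db.dataloader.core.dataimport.log.writer.DefaultLogWriterFactory;
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter;
import java.io.IOException;

class LogWriterFactoryExample {
  static void writeOneEntry(ImportLoggerConfig config) throws IOException {
    DefaultLogWriterFactory factory = new DefaultLogWriterFactory(config);
    // LogWriter extends AutoCloseable, so try-with-resources closes the JSON array properly.
    try (LogWriter writer = factory.createLogWriter(config.getLogDirectoryPath() + "custom.json")) {
      writer.write(new ObjectMapper().createObjectNode().put("status", "SAVED"));
    }
  }
}
```
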
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java
new file mode 100644
index 000000000..3689bd51d
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java
@@ -0,0 +1,86 @@
+package com.scalar.db.dataloader.core.dataimport.log.writer;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
+import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import javax.annotation.Nullable;
+
+/**
+ * An implementation of {@link LogWriter} that writes log entries to a local file. This class writes
+ * JSON records to a file as a JSON array, with each record being an element in the array. It
+ * handles file creation, appending, and proper JSON formatting.
+ */
+public class LocalFileLogWriter implements LogWriter {
+ private final JsonGenerator logWriter;
+ private final DataLoaderObjectMapper objectMapper;
+
+ /**
+ * Creates an instance of LocalFileLogWriter with the specified file path and configuration.
+ *
+ * @param filePath the path where the log file will be created or appended to
+ * @param importLoggerConfig the configuration for the logger, including formatting options
+ * @throws IOException if an I/O error occurs while creating or opening the file
+ */
+ public LocalFileLogWriter(String filePath, ImportLoggerConfig importLoggerConfig)
+ throws IOException {
+ Path path = Paths.get(filePath);
+ this.objectMapper = new DataLoaderObjectMapper();
+ this.logWriter =
+ objectMapper
+ .getFactory()
+ .createGenerator(
+ Files.newBufferedWriter(
+ path, StandardOpenOption.CREATE, StandardOpenOption.APPEND));
+ // Start the JSON array
+ if (importLoggerConfig.isPrettyPrint()) this.logWriter.useDefaultPrettyPrinter();
+ this.logWriter.writeStartArray();
+ this.logWriter.flush();
+ }
+
+ /**
+ * Writes a JSON record to the log file. If the source record is null, this method does nothing.
+ * The method is synchronized to ensure thread safety when writing to the file.
+ *
+ * @param sourceRecord the JSON record to write
+ * @throws IOException if an I/O error occurs while writing the record
+ */
+ @Override
+ public synchronized void write(@Nullable JsonNode sourceRecord) throws IOException {
+ if (sourceRecord == null) {
+ return;
+ }
+ objectMapper.writeValue(logWriter, sourceRecord);
+ }
+
+ /**
+ * Flushes any buffered data to the log file.
+ *
+ * @throws IOException if an I/O error occurs while flushing
+ */
+ @Override
+ public synchronized void flush() throws IOException {
+ logWriter.flush();
+ }
+
+ /**
+ * Closes the log writer, properly ending the JSON array and releasing resources. If the writer is
+ * already closed, this method does nothing.
+ *
+ * @throws IOException if an I/O error occurs while closing the writer
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ if (logWriter.isClosed()) {
+ return;
+ }
+ logWriter.writeEndArray();
+ logWriter.flush();
+ logWriter.close();
+ }
+}
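
To make the JSON-array framing concrete, here is a sketch of direct use (the file name is hypothetical). After close(), the file contains something like [{"rowNumber":1},{"rowNumber":2}]:

```java
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;
import com.scalar.db.dataloader.core.dataimport.log.writer.LocalFileLogWriter;
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter;
import java.io.IOException;

class LocalFileLogWriterExample {
  static void writeTwoRecords(ImportLoggerConfig config) throws IOException {
    LogWriter writer = new LocalFileLogWriter(config.getLogDirectoryPath() + "example.json", config);
    try {
      writer.write(JsonNodeFactory.instance.objectNode().put("rowNumber", 1));
      writer.write(JsonNodeFactory.instance.objectNode().put("rowNumber", 2));
    } finally {
      writer.close(); // writes the closing ']' of the JSON array
    }
  }
}
```
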
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogFileType.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogFileType.java
new file mode 100644
index 000000000..e56a949da
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogFileType.java
@@ -0,0 +1,25 @@
+package com.scalar.db.dataloader.core.dataimport.log.writer;
+
+/**
+ * Represents the different types of log files used in the data import process. Each type serves a
+ * specific purpose in tracking the import operation's results.
+ */
+public enum LogFileType {
+ /**
+ * Represents a log file that records successful import operations. These logs contain records
+ * that were successfully processed and imported.
+ */
+ SUCCESS,
+
+ /**
+ * Represents a log file that records failed import operations. These logs contain records that
+ * failed to be processed or imported, along with information about the failure.
+ */
+ FAILURE,
+
+ /**
+ * Represents a log file that provides a summary of the import operation. These logs contain
+ * aggregated statistics and overall results of the import process.
+ */
+ SUMMARY
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriter.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriter.java
new file mode 100644
index 000000000..32838f321
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriter.java
@@ -0,0 +1,37 @@
+package com.scalar.db.dataloader.core.dataimport.log.writer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+
+/**
+ * An interface for writing log entries to a destination. This interface extends {@link
+ * AutoCloseable} to ensure proper resource cleanup. Implementations of this interface handle the
+ * details of writing log entries to various destinations such as files, databases, or other storage
+ * systems.
+ */
+public interface LogWriter extends AutoCloseable {
+
+ /**
+ * Writes a JSON record to the log.
+ *
+ * @param sourceRecord the JSON record to write
+ * @throws IOException if an I/O error occurs while writing the record
+ */
+ void write(JsonNode sourceRecord) throws IOException;
+
+ /**
+ * Flushes any buffered data to the underlying storage.
+ *
+ * @throws IOException if an I/O error occurs while flushing
+ */
+ void flush() throws IOException;
+
+ /**
+ * Closes this log writer and releases any system resources associated with it. This method should
+ * be called when the log writer is no longer needed to ensure proper cleanup of resources.
+ *
+ * @throws IOException if an I/O error occurs while closing the log writer
+ */
+ @Override
+ void close() throws IOException;
+}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactory.java
new file mode 100644
index 000000000..3854d728c
--- /dev/null
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactory.java
@@ -0,0 +1,20 @@
+package com.scalar.db.dataloader.core.dataimport.log.writer;
+
+import java.io.IOException;
+
+/**
+ * A factory interface for creating {@link LogWriter} instances. This interface abstracts the
+ * creation of log writers, allowing different implementations to create different types of log
+ * writers based on the application's needs.
+ */
+public interface LogWriterFactory {
+
+ /**
+ * Creates a new log writer for the specified log file path.
+ *
+ * @param logFilePath the path where the log file will be created or appended to
+ * @return a new {@link LogWriter} instance
+ * @throws IOException if an I/O error occurs while creating the log writer
+ */
+ LogWriter createLogWriter(String logFilePath) throws IOException;
+}
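
To illustrate the abstraction, a minimal alternative implementation (hypothetical, e.g. for unit tests) that collects entries in memory instead of writing files:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter;
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
import java.util.ArrayList;
import java.util.List;

// Hypothetical factory that keeps log entries in memory instead of writing them to disk.
class InMemoryLogWriterFactory implements LogWriterFactory {
  final List<JsonNode> entries = new ArrayList<>();

  @Override
  public LogWriter createLogWriter(String logFilePath) {
    return new LogWriter() {
      @Override
      public void write(JsonNode sourceRecord) {
        entries.add(sourceRecord);
      }

      @Override
      public void flush() {}

      @Override
      public void close() {}
    };
  }
}
```
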
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java
index 1a317a1a8..81514e61a 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java
@@ -37,11 +37,12 @@
* supports both transactional and non-transactional (storage) modes and provides event notification
* capabilities for monitoring the import process.
*/
+@SuppressWarnings("SameNameButDifferent")
@RequiredArgsConstructor
public abstract class ImportProcessor {
final ImportProcessorParams params;
- private static final Logger LOGGER = LoggerFactory.getLogger(ImportProcessor.class);
+ private static final Logger logger = LoggerFactory.getLogger(ImportProcessor.class);
private final List<ImportEventListener> listeners = new ArrayList<>();
/**
@@ -232,13 +233,13 @@ private ImportTransactionBatchResult processTransactionBatch(
} catch (TransactionException e) {
isSuccess = false;
- LOGGER.error(e.getMessage());
+ logger.error(e.getMessage());
try {
if (transaction != null) {
transaction.abort(); // Ensure transaction is aborted
}
} catch (TransactionException abortException) {
- LOGGER.error(
+ logger.error(
"Failed to abort transaction: {}", abortException.getMessage(), abortException);
}
error = e.getMessage();
@@ -446,7 +447,7 @@ private void waitForFuturesToComplete(List<Future<?>> futures) {
try {
future.get();
} catch (Exception e) {
- LOGGER.error(e.getMessage());
+ logger.error(e.getMessage());
}
}
}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java
index 36b96f62d..b09a27cf0 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java
@@ -17,6 +17,7 @@
* This class is immutable and uses the Builder pattern for construction. It encapsulates all
* required parameters and dependencies for processing data imports in ScalarDB.
*/
+@SuppressWarnings("SameNameButDifferent")
@Builder
@Value
public class ImportProcessorParams {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java
index 3be177a00..5187225cc 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java
@@ -38,6 +38,7 @@
* functionality to import data into single or multiple tables based on the provided import options
* and control file configurations.
*/
+@SuppressWarnings({"SameNameButDifferent"})
@RequiredArgsConstructor
public abstract class ImportTask {
@@ -148,7 +149,8 @@ private List<ImportTargetResult> startMultiTableImportProcess(
copyNode);
targetResults.add(result);
}
- return targetResults;
+ // Wrap in an unmodifiable list to fix the MixedMutabilityReturnType Error Prone warning
+ return Collections.unmodifiableList(targetResults);
}
/**
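For context on the change above: Error Prone's MixedMutabilityReturnType check fires when different return paths of a method hand back collections with different mutability, for example Collections.emptyList() on one branch and a plain ArrayList on another. Wrapping the locally built list keeps every path read-only for callers. A minimal, self-contained sketch of the pattern follows (hypothetical names, not code from this patch).

// Editorial sketch with hypothetical names.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ResultCollector {

  static List<String> collect(boolean hasResults) {
    if (!hasResults) {
      // Immutable empty list on this branch...
      return Collections.emptyList();
    }
    List<String> results = new ArrayList<>();
    results.add("result-1");
    // ...so return an unmodifiable view here as well, keeping both branches
    // consistently read-only and satisfying MixedMutabilityReturnType.
    return Collections.unmodifiableList(results);
  }
}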
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java
index eafe3a42a..f68a526b8 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java
@@ -14,6 +14,7 @@
* Parameters required for executing an import task in the data loader. This class encapsulates all
* necessary information needed to process and import a single record into ScalarDB.
*/
+@SuppressWarnings("SameNameButDifferent")
@Builder
@Value
public class ImportTaskParams {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java
index 449270d92..a5b764890 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java
@@ -3,8 +3,6 @@
import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.api.Result;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
-import com.scalar.db.exception.transaction.AbortException;
-import com.scalar.db.exception.transaction.TransactionException;
import com.scalar.db.io.Column;
import com.scalar.db.io.Key;
import java.util.List;
@@ -83,24 +81,4 @@ protected void saveRecord(
throws ScalarDBDaoException {
params.getDao().put(namespace, tableName, partitionKey, clusteringKey, columns, transaction);
}
-
- /**
- * Aborts the active ScalarDB transaction if it has not been committed.
- *
- * This method provides a safe way to abort an active transaction, handling any abort-related
- * exceptions by wrapping them in a {@link TransactionException}.
- *
- * @param tx the transaction to be aborted. If null, this method does nothing
- * @throws TransactionException if an error occurs during the abort operation or if the underlying
- * abort operation fails
- */
- private void abortActiveTransaction(DistributedTransaction tx) throws TransactionException {
- if (tx != null) {
- try {
- tx.abort();
- } catch (AbortException e) {
- throw new TransactionException(e.getMessage(), tx.getId());
- }
- }
- }
}
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/result/ImportTargetResult.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/result/ImportTargetResult.java
index 0fe4e0379..55a5e2ba9 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/result/ImportTargetResult.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/result/ImportTargetResult.java
@@ -6,6 +6,8 @@
import lombok.Builder;
import lombok.Value;
+/** Stores the result of importing a record into a single target table. */
+@SuppressWarnings({"SameNameButDifferent", "MissingSummary"})
@Builder
@Value
public class ImportTargetResult {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/result/ImportTaskResult.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/result/ImportTaskResult.java
index 3e08cc709..0d4dba0c8 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/result/ImportTaskResult.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/result/ImportTaskResult.java
@@ -7,6 +7,7 @@
import lombok.Builder;
import lombok.Value;
+@SuppressWarnings({"SameNameButDifferent", "MissingSummary"})
@Builder
@Value
@JsonDeserialize(builder = ImportTaskResult.ImportTaskResultBuilder.class)
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidationResult.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidationResult.java
index 30b878b9e..5c299f9a6 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidationResult.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidationResult.java
@@ -31,17 +31,29 @@ public void addErrorMessage(String columnName, String errorMessage) {
this.errorMessages.add(errorMessage);
}
- /** @return Immutable list of validation error messages */
+ /**
+ * Returns the validation error messages.
+ *
+ * @return an immutable list of validation error messages
+ */
public List<String> getErrorMessages() {
return Collections.unmodifiableList(this.errorMessages);
}
- /** @return Immutable set of columns that had errors */
+ /**
+ * Returns the columns that had validation errors.
+ *
+ * @return an immutable set of column names that had errors
+ */
public Set<String> getColumnsWithErrors() {
return Collections.unmodifiableSet(this.columnsWithErrors);
}
- /** @return Validation is valid or not */
+ /**
+ * Returns whether the validation passed.
+ *
+ * @return {@code true} if no validation errors were recorded, {@code false} otherwise
+ */
public boolean isValid() {
return this.errorMessages.isEmpty();
}
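A note on the getters above: Collections.unmodifiableList and unmodifiableSet return read-only views, not copies, so callers cannot mutate the result but will still observe entries added later through addErrorMessage. A minimal sketch of that behaviour (hypothetical names, not code from this patch):

// Editorial sketch, not part of the patch.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class UnmodifiableViewDemo {
  public static void main(String[] args) {
    List<String> errors = new ArrayList<>();
    List<String> view = Collections.unmodifiableList(errors);

    errors.add("column 'age': value is missing");
    System.out.println(view.size()); // 1 -- the view reflects later additions

    try {
      view.add("not allowed");
    } catch (UnsupportedOperationException e) {
      System.out.println("the view itself is read-only");
    }
  }
}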
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidator.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidator.java
index 6d773ffcc..a04683017 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidator.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidator.java
@@ -9,6 +9,7 @@
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+@SuppressWarnings("SameNameButDifferent")
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ImportSourceRecordValidator {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatch.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatch.java
index a922fd8af..6fef97c56 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatch.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatch.java
@@ -6,6 +6,7 @@
import lombok.Value;
/** Transaction batch details */
+@SuppressWarnings({"SameNameButDifferent", "MissingSummary"})
@Builder
@Value
public class ImportTransactionBatch {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatchResult.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatchResult.java
index 0e44b6695..887b9a392 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatchResult.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatchResult.java
@@ -8,6 +8,7 @@
import lombok.Value;
/** Transaction batch result */
+@SuppressWarnings({"SameNameButDifferent", "MissingSummary"})
@Builder
@Value
@JsonDeserialize(builder = ImportTransactionBatchResult.ImportTransactionBatchResultBuilder.class)
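Because these @Value classes are immutable and expose no setters, @JsonDeserialize(builder = ...) tells Jackson to deserialize through the Lombok-generated builder instead. Typically the builder also needs @JsonPOJOBuilder(withPrefix = ""), declared as in the sketch below or configured project-wide, since Lombok's builder methods are not prefixed with "with". A minimal sketch with a hypothetical class, not code from this patch:

// Editorial sketch with a hypothetical class.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import lombok.Builder;
import lombok.Value;

@Builder
@Value
@JsonDeserialize(builder = BatchSummary.BatchSummaryBuilder.class)
class BatchSummary {
  int dataChunkId;
  boolean success;

  // Lombok fills in the rest of this partially declared builder class;
  // withPrefix = "" maps JSON fields to dataChunkId(...) / success(...).
  @JsonPOJOBuilder(withPrefix = "")
  public static class BatchSummaryBuilder {}
}

class BatchSummaryDemo {
  public static void main(String[] args) throws Exception {
    BatchSummary summary =
        new ObjectMapper().readValue("{\"dataChunkId\":1,\"success\":true}", BatchSummary.class);
    System.out.println(summary); // BatchSummary(dataChunkId=1, success=true)
  }
}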
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatchStatus.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatchStatus.java
index 1b7bae34c..42d37eb64 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatchStatus.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatchStatus.java
@@ -5,7 +5,8 @@
import lombok.Builder;
import lombok.Value;
-/** Batch status details */
+/** Batch status details. */
+@SuppressWarnings({"SameNameButDifferent", "MissingSummary"})
@Builder
@Value
public class ImportTransactionBatchStatus {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataRequest.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataRequest.java
index 8e79da3d6..3b730a081 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataRequest.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataRequest.java
@@ -3,6 +3,7 @@
import lombok.Getter;
/** Represents the request for metadata for a single ScalarDB table */
+@SuppressWarnings("SameNameButDifferent")
@Getter
public class TableMetadataRequest {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataService.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataService.java
index 881694580..f91435fe5 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataService.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataService.java
@@ -14,6 +14,7 @@
* Service for retrieving {@link TableMetadata} from ScalarDB. Provides methods to fetch metadata
* for individual tables or a collection of tables.
*/
+@SuppressWarnings("SameNameButDifferent")
@RequiredArgsConstructor
public class TableMetadataService {
diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/TableMetadataUtil.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/TableMetadataUtil.java
index ddc15a1e5..7cd1834d8 100644
--- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/TableMetadataUtil.java
+++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/TableMetadataUtil.java
@@ -11,6 +11,7 @@
import lombok.NoArgsConstructor;
/** Utility class for handling ScalarDB table metadata operations. */
+@SuppressWarnings("SameNameButDifferent")
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class TableMetadataUtil {
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java
new file mode 100644
index 000000000..d03f04fbb
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java
@@ -0,0 +1,265 @@
+package com.scalar.db.dataloader.core.dataimport.log;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
+import com.scalar.db.dataloader.core.dataimport.log.writer.DefaultLogWriterFactory;
+import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
+import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SingleFileImportLoggerTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(SingleFileImportLoggerTest.class);
+ private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper();
+
+ @TempDir Path tempDir;
+
+ private LogWriterFactory logWriterFactory;
+
+ @BeforeEach
+ void setUp() {
+ ImportLoggerConfig importLoggerConfig =
+ ImportLoggerConfig.builder()
+ .prettyPrint(false)
+ .isLogSuccessRecords(false)
+ .isLogRawSourceRecords(false)
+ .logDirectoryPath("path")
+ .build();
+ logWriterFactory = new DefaultLogWriterFactory(importLoggerConfig);
+ }
+
+ @AfterEach
+ void tearDown() throws IOException {
+ cleanUpTempDir();
+ }
+
+ private void cleanUpTempDir() throws IOException {
+ try (Stream<Path> paths = Files.list(tempDir)) {
+ paths.forEach(this::deleteFile);
+ }
+ }
+
+ private void deleteFile(Path file) {
+ try {
+ Files.deleteIfExists(file);
+ } catch (IOException e) {
+ logger.error("Failed to delete file: {}", file, e);
+ }
+ }
+
+ @Test
+ void onTransactionBatchCompleted_NoErrors_ShouldWriteToSuccessLogFile() throws IOException {
+ testTransactionBatchCompleted(true, true);
+ }
+
+ @Test
+ void onTransactionBatchCompleted_HasErrors_ShouldWriteToFailureLogFile() throws IOException {
+ testTransactionBatchCompleted(false, true);
+ }
+
+ private void testTransactionBatchCompleted(boolean success, boolean logSuccessRecords)
+ throws IOException {
+ // Arrange
+ ImportLoggerConfig config =
+ ImportLoggerConfig.builder()
+ .logDirectoryPath(tempDir.toString() + "/")
+ .isLogRawSourceRecords(true)
+ .isLogSuccessRecords(logSuccessRecords)
+ .build();
+ SingleFileImportLogger importLogger = new SingleFileImportLogger(config, logWriterFactory);
+
+ List<ImportTransactionBatchResult> batchResults = createBatchResults(1, success);
+
+ // Act
+ for (ImportTransactionBatchResult batchResult : batchResults) {
+ importLogger.onTransactionBatchCompleted(batchResult);
+ importLogger.onDataChunkCompleted(
+ ImportDataChunkStatus.builder().dataChunkId(batchResult.getDataChunkId()).build());
+ }
+ importLogger.onAllDataChunksCompleted();
+
+ // Assert
+ assertTransactionBatchResults(batchResults, success, logSuccessRecords);
+ }
+
+ private List<ImportTransactionBatchResult> createBatchResults(int count, boolean success) {
+ List<ImportTransactionBatchResult> batchResults = new ArrayList<>();
+
+ for (int i = 1; i <= count; i++) {
+ List<ImportTaskResult> records =
+ Collections.singletonList(
+ ImportTaskResult.builder()
+ .rowNumber(i)
+ .rawRecord(OBJECT_MAPPER.createObjectNode())
+ .targets(Collections.emptyList())
+ .build());
+ ImportTransactionBatchResult result =
+ ImportTransactionBatchResult.builder()
+ .dataChunkId(i)
+ .transactionBatchId(1)
+ .records(records)
+ .success(success)
+ .build();
+ batchResults.add(result);
+ }
+
+ return batchResults;
+ }
+
+ private void assertTransactionBatchResults(
+ List<ImportTransactionBatchResult> batchResults, boolean success, boolean logSuccessRecords)
+ throws IOException {
+ DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper();
+
+ // Single file log mode
+ Path logFileName =
+ tempDir.resolve(
+ success
+ ? SingleFileImportLogger.SUCCESS_LOG_FILE_NAME
+ : SingleFileImportLogger.FAILURE_LOG_FILE_NAME);
+ if (logSuccessRecords || !success) {
+ assertTrue(Files.exists(logFileName), "Log file should exist");
+
+ String logContent = new String(Files.readAllBytes(logFileName), StandardCharsets.UTF_8);
+
+ List<ImportTransactionBatchResult> logEntries =
+ objectMapper.readValue(
+ logContent, new TypeReference<List<ImportTransactionBatchResult>>() {});
+
+ assertEquals(
+ batchResults.size(),
+ logEntries.size(),
+ "Number of log entries should match the number of batch results");
+
+ for (int i = 0; i < batchResults.size(); i++) {
+ assertTransactionBatchResult(batchResults.get(i), logEntries.get(i));
+ }
+ } else {
+ assertFalse(Files.exists(logFileName), "Log file should not exist");
+ }
+ }
+
+ private void assertTransactionBatchResult(
+ ImportTransactionBatchResult expected, ImportTransactionBatchResult actual) {
+ assertEquals(expected.getDataChunkId(), actual.getDataChunkId(), "Data chunk ID should match");
+ assertEquals(
+ expected.getTransactionBatchId(),
+ actual.getTransactionBatchId(),
+ "Transaction batch ID should match");
+ assertEquals(
+ expected.getTransactionId(), actual.getTransactionId(), "Transaction ID should match");
+ assertEquals(expected.isSuccess(), actual.isSuccess(), "Success status should match");
+
+ List<ImportTaskResult> expectedRecords = expected.getRecords();
+ List<ImportTaskResult> actualRecords = actual.getRecords();
+ assertEquals(expectedRecords.size(), actualRecords.size(), "Number of records should match");
+ for (int j = 0; j < expectedRecords.size(); j++) {
+ ImportTaskResult expectedRecord = expectedRecords.get(j);
+ ImportTaskResult actualRecord = actualRecords.get(j);
+ assertEquals(
+ expectedRecord.getRowNumber(), actualRecord.getRowNumber(), "Row number should match");
+ assertEquals(
+ expectedRecord.getRawRecord(), actualRecord.getRawRecord(), "Raw record should match");
+ assertEquals(expectedRecord.getTargets(), actualRecord.getTargets(), "Targets should match");
+ }
+ }
+
+ @Test
+ void onDataChunkCompleted_NoErrors_ShouldWriteToSummaryLogFile() throws IOException {
+ testDataChunkCompleted(false);
+ }
+
+ @Test
+ void onDataChunkCompleted_HasErrors_ShouldWriteToSummaryLogFile() throws IOException {
+ testDataChunkCompleted(true);
+ }
+
+ private void testDataChunkCompleted(boolean hasErrors) throws IOException {
+ ImportLoggerConfig config =
+ ImportLoggerConfig.builder()
+ .logDirectoryPath(tempDir.toString() + "/")
+ .isLogRawSourceRecords(true)
+ .isLogSuccessRecords(true)
+ .build();
+ SingleFileImportLogger importLogger = new SingleFileImportLogger(config, logWriterFactory);
+
+ List<ImportDataChunkStatus> dataChunkStatuses =
+ Stream.of(1, 2)
+ .map(id -> createDataChunkStatus(id, hasErrors))
+ .collect(Collectors.toList());
+
+ dataChunkStatuses.forEach(importLogger::onDataChunkCompleted);
+ importLogger.onAllDataChunksCompleted();
+
+ assertDataChunkStatusLog(SingleFileImportLogger.SUMMARY_LOG_FILE_NAME, dataChunkStatuses);
+ }
+
+ private ImportDataChunkStatus createDataChunkStatus(int dataChunkId, boolean hasErrors) {
+ return ImportDataChunkStatus.builder()
+ .dataChunkId(dataChunkId)
+ .startTime(Instant.now())
+ .endTime(Instant.now())
+ .totalRecords(100)
+ .successCount(hasErrors ? 90 : 100)
+ .failureCount(hasErrors ? 10 : 0)
+ .batchCount(5)
+ .totalDurationInMilliSeconds(1000)
+ .build();
+ }
+
+ private void assertDataChunkStatusLog(
+ String logFilePattern, List<ImportDataChunkStatus> dataChunkStatuses) throws IOException {
+ assertSingleFileLog(tempDir, logFilePattern, dataChunkStatuses);
+ }
+
+ private void assertSingleFileLog(
+ Path tempDir, String logFileName, List<ImportDataChunkStatus> dataChunkStatuses)
+ throws IOException {
+ Path summaryLogFile = tempDir.resolve(logFileName);
+ assertTrue(Files.exists(summaryLogFile));
+
+ String logContent = new String(Files.readAllBytes(summaryLogFile), StandardCharsets.UTF_8);
+ DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper();
+ List<ImportDataChunkStatus> logEntries =
+ objectMapper.readValue(logContent, new TypeReference<List<ImportDataChunkStatus>>() {});
+
+ assertEquals(dataChunkStatuses.size(), logEntries.size());
+ for (int i = 0; i < dataChunkStatuses.size(); i++) {
+ assertDataChunkStatusEquals(dataChunkStatuses.get(i), logEntries.get(i));
+ }
+ }
+
+ private void assertDataChunkStatusEquals(
+ ImportDataChunkStatus expected, ImportDataChunkStatus actual) {
+ assertEquals(expected.getDataChunkId(), actual.getDataChunkId());
+ assertEquals(expected.getStartTime(), actual.getStartTime());
+ assertEquals(expected.getEndTime(), actual.getEndTime());
+ assertEquals(expected.getTotalRecords(), actual.getTotalRecords());
+ assertEquals(expected.getSuccessCount(), actual.getSuccessCount());
+ assertEquals(expected.getFailureCount(), actual.getFailureCount());
+ assertEquals(expected.getBatchCount(), actual.getBatchCount());
+ assertEquals(
+ expected.getTotalDurationInMilliSeconds(), actual.getTotalDurationInMilliSeconds());
+ }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java
new file mode 100644
index 000000000..de7ee49be
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java
@@ -0,0 +1,238 @@
+package com.scalar.db.dataloader.core.dataimport.log;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
+import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
+import com.scalar.db.dataloader.core.dataimport.log.writer.DefaultLogWriterFactory;
+import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
+import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
+import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+class SplitByDataChunkImportLoggerTest {
+
+ private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper();
+
+ @TempDir Path tempDir;
+
+ private LogWriterFactory logWriterFactory;
+
+ @BeforeEach
+ void setUp() {
+ ImportLoggerConfig importLoggerConfig =
+ ImportLoggerConfig.builder()
+ .prettyPrint(false)
+ .isLogSuccessRecords(false)
+ .isLogRawSourceRecords(false)
+ .logDirectoryPath("path")
+ .build();
+ logWriterFactory = new DefaultLogWriterFactory(importLoggerConfig);
+ }
+
+ @Test
+ void onTransactionBatchCompleted_NoErrors_ShouldWriteToDataChunkSuccessFiles()
+ throws IOException {
+ testTransactionBatchCompleted(true, true);
+ }
+
+ @Test
+ void onTransactionBatchCompleted_HasErrors_ShouldWriteToDataChunkFailureFiles()
+ throws IOException {
+ testTransactionBatchCompleted(false, true);
+ }
+
+ @Test
+ void onTransactionBatchCompleted_NoErrorsAndNoSuccessFileLogging_ShouldNotWriteToSuccessFiles()
+ throws IOException {
+ testTransactionBatchCompleted(true, false);
+ }
+
+ private void testTransactionBatchCompleted(boolean success, boolean logSuccessRecords)
+ throws IOException {
+ // Arrange
+ ImportLoggerConfig config =
+ ImportLoggerConfig.builder()
+ .logDirectoryPath(tempDir.toString() + "/")
+ .isLogRawSourceRecords(true)
+ .isLogSuccessRecords(logSuccessRecords)
+ .build();
+ SplitByDataChunkImportLogger importLogger =
+ new SplitByDataChunkImportLogger(config, logWriterFactory);
+
+ List<ImportTransactionBatchResult> batchResults = new ArrayList<>();
+
+ for (int i = 1; i <= 3; i++) {
+ List<ImportTaskResult> records =
+ Collections.singletonList(
+ ImportTaskResult.builder()
+ .rowNumber(i)
+ .targets(Collections.emptyList())
+ .rawRecord(OBJECT_MAPPER.createObjectNode())
+ .build());
+ ImportTransactionBatchResult result =
+ ImportTransactionBatchResult.builder()
+ .dataChunkId(i)
+ .transactionBatchId(1)
+ .records(records)
+ .success(success)
+ .build();
+ batchResults.add(result);
+ }
+
+ // Act
+ for (ImportTransactionBatchResult batchResult : batchResults) {
+ importLogger.onTransactionBatchCompleted(batchResult);
+ importLogger.onDataChunkCompleted(
+ ImportDataChunkStatus.builder().dataChunkId(batchResult.getDataChunkId()).build());
+ }
+ importLogger.onAllDataChunksCompleted();
+
+ // Assert
+ for (int i = 0; i < batchResults.size(); i++) {
+ ImportTransactionBatchResult batchResult = batchResults.get(i);
+ String logFileNameFormat =
+ success
+ ? SplitByDataChunkImportLogger.SUCCESS_LOG_FILE_NAME_FORMAT
+ : SplitByDataChunkImportLogger.FAILURE_LOG_FILE_NAME_FORMAT;
+ Path dataChunkLogFileName = tempDir.resolve(String.format(logFileNameFormat, i + 1));
+
+ if (success && logSuccessRecords) {
+ assertTrue(Files.exists(dataChunkLogFileName), "Data chunk success log file should exist");
+ assertTransactionBatchResult(batchResult, dataChunkLogFileName);
+ } else if (!success) {
+ assertTrue(Files.exists(dataChunkLogFileName), "Data chunk failure log file should exist");
+ assertTransactionBatchResult(batchResult, dataChunkLogFileName);
+ } else {
+ assertFalse(
+ Files.exists(dataChunkLogFileName), "Data chunk success log file should not exist");
+ }
+ }
+ }
+
+ private void assertTransactionBatchResult(
+ ImportTransactionBatchResult expected, Path dataChunkLogFileName) throws IOException {
+ // Use readAllBytes instead of Files.readString (Java 11+) so the test also runs on Java 8.
+ String logContent =
+ new String(Files.readAllBytes(dataChunkLogFileName), StandardCharsets.UTF_8);
+ DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper();
+ List<ImportTransactionBatchResult> logEntries =
+ objectMapper.readValue(
+ logContent, new TypeReference<List<ImportTransactionBatchResult>>() {});
+ ImportTransactionBatchResult actual = logEntries.get(0);
+
+ assertEquals(expected.getDataChunkId(), actual.getDataChunkId(), "Data chunk ID should match");
+ assertEquals(
+ expected.getTransactionBatchId(),
+ actual.getTransactionBatchId(),
+ "Transaction batch ID should match");
+ assertEquals(
+ expected.getTransactionId(), actual.getTransactionId(), "Transaction ID should match");
+ assertEquals(expected.isSuccess(), actual.isSuccess(), "Success status should match");
+
+ List<ImportTaskResult> expectedRecords = expected.getRecords();
+ List<ImportTaskResult> actualRecords = actual.getRecords();
+ assertEquals(expectedRecords.size(), actualRecords.size(), "Number of records should match");
+ for (int j = 0; j < expectedRecords.size(); j++) {
+ ImportTaskResult expectedRecord = expectedRecords.get(j);
+ ImportTaskResult actualRecord = actualRecords.get(j);
+ assertEquals(
+ expectedRecord.getRowNumber(), actualRecord.getRowNumber(), "Row number should match");
+ assertEquals(
+ expectedRecord.getRawRecord(), actualRecord.getRawRecord(), "Raw record should match");
+ assertEquals(expectedRecord.getTargets(), actualRecord.getTargets(), "Targets should match");
+ }
+ }
+
+ @Test
+ void onDataChunkCompleted_NoErrors_ShouldWriteToSummaryLogFile() throws IOException {
+ testDataChunkCompleted(
+ String.format(SplitByDataChunkImportLogger.SUMMARY_LOG_FILE_NAME_FORMAT, "%d"), false);
+ }
+
+ @Test
+ void onDataChunkCompleted_HasErrors_ShouldWriteToSummaryLogFile() throws IOException {
+ testDataChunkCompleted(
+ String.format(SplitByDataChunkImportLogger.SUMMARY_LOG_FILE_NAME_FORMAT, "%d"), true);
+ }
+
+ private void testDataChunkCompleted(String logFilePattern, boolean hasErrors) throws IOException {
+ ImportLoggerConfig config =
+ ImportLoggerConfig.builder()
+ .logDirectoryPath(tempDir.toString() + "/")
+ .isLogRawSourceRecords(true)
+ .isLogSuccessRecords(true)
+ .build();
+ SplitByDataChunkImportLogger importLogger =
+ new SplitByDataChunkImportLogger(config, logWriterFactory);
+
+ List<ImportDataChunkStatus> dataChunkStatuses =
+ IntStream.rangeClosed(1, 2)
+ .mapToObj(id -> createDataChunkStatus(id, hasErrors))
+ .collect(Collectors.toList());
+
+ dataChunkStatuses.forEach(importLogger::onDataChunkCompleted);
+ importLogger.onAllDataChunksCompleted();
+
+ assertDataChunkStatusLog(logFilePattern, dataChunkStatuses);
+ }
+
+ private ImportDataChunkStatus createDataChunkStatus(int dataChunkId, boolean hasErrors) {
+ return ImportDataChunkStatus.builder()
+ .dataChunkId(dataChunkId)
+ .startTime(Instant.now())
+ .endTime(Instant.now())
+ .totalRecords(100)
+ .successCount(hasErrors ? 90 : 100)
+ .failureCount(hasErrors ? 10 : 0)
+ .batchCount(5)
+ .totalDurationInMilliSeconds(1000)
+ .build();
+ }
+
+ private void assertDataChunkStatusLog(
+ String logFilePattern, List<ImportDataChunkStatus> dataChunkStatuses) throws IOException {
+ for (ImportDataChunkStatus dataChunkStatus : dataChunkStatuses) {
+ String logFileName = String.format(logFilePattern, dataChunkStatus.getDataChunkId());
+ Path dataChunkLogFile = tempDir.resolve(logFileName);
+ assertTrue(Files.exists(dataChunkLogFile), "Data chunk summary log file should exist");
+
+ // Use readAllBytes instead of Files.readString (Java 11+) so the test also runs on Java 8.
+ String logContent = new String(Files.readAllBytes(dataChunkLogFile), StandardCharsets.UTF_8);
+ DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper();
+ List<ImportDataChunkStatus> logEntries =
+ objectMapper.readValue(logContent, new TypeReference<List<ImportDataChunkStatus>>() {});
+
+ assertEquals(1, logEntries.size());
+ assertDataChunkStatusEquals(dataChunkStatus, logEntries.get(0));
+ }
+ }
+
+ private void assertDataChunkStatusEquals(
+ ImportDataChunkStatus expected, ImportDataChunkStatus actual) {
+ assertEquals(expected.getDataChunkId(), actual.getDataChunkId());
+ assertEquals(expected.getStartTime(), actual.getStartTime());
+ assertEquals(expected.getEndTime(), actual.getEndTime());
+ assertEquals(expected.getTotalRecords(), actual.getTotalRecords());
+ assertEquals(expected.getSuccessCount(), actual.getSuccessCount());
+ assertEquals(expected.getFailureCount(), actual.getFailureCount());
+ assertEquals(expected.getBatchCount(), actual.getBatchCount());
+ assertEquals(
+ expected.getTotalDurationInMilliSeconds(), actual.getTotalDurationInMilliSeconds());
+ }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactoryTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactoryTest.java
new file mode 100644
index 000000000..28c31e5c0
--- /dev/null
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactoryTest.java
@@ -0,0 +1,39 @@
+package com.scalar.db.dataloader.core.dataimport.log.writer;
+
+import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class DefaultLogWriterFactoryTest {
+
+ String filePath = Paths.get("").toAbsolutePath() + "/sample.log";
+ DefaultLogWriterFactory defaultLogWriterFactory;
+
+ @AfterEach
+ void removeFileIfCreated() {
+ File file = new File(filePath);
+ if (file.exists()) {
+ file.deleteOnExit();
+ }
+ }
+
+ @Test
+ void createLogWriter_withValidLocalLogFilePath_shouldReturnLocalFileLogWriterObject()
+ throws IOException {
+ defaultLogWriterFactory =
+ new DefaultLogWriterFactory(
+ ImportLoggerConfig.builder()
+ .prettyPrint(false)
+ .isLogSuccessRecords(false)
+ .isLogRawSourceRecords(false)
+ .logDirectoryPath("path")
+ .build());
+ LogWriter logWriter = defaultLogWriterFactory.createLogWriter(filePath);
+ Assertions.assertEquals(LocalFileLogWriter.class, logWriter.getClass());
+ logWriter.close();
+ }
+}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java
index 94acd20ac..9ff35d4f1 100644
--- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java
@@ -1,5 +1,6 @@
package com.scalar.db.dataloader.core.dataimport.processor;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -95,7 +96,7 @@ void test_importProcessWithStorage() {
csvImportProcessor = new CsvImportProcessor(params);
Map<Integer, ImportDataChunkStatus> statusList =
csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader());
- assert statusList != null;
+ assertThat(statusList).isNotNull();
Assertions.assertEquals(1, statusList.size());
}
@@ -115,7 +116,7 @@ void test_importProcessWithTransaction() {
csvImportProcessor = new CsvImportProcessor(params);
Map<Integer, ImportDataChunkStatus> statusList =
csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader());
- assert statusList != null;
+ assertThat(statusList).isNotNull();
Assertions.assertEquals(1, statusList.size());
}
}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java
index aa9a106a0..4cc9db655 100644
--- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java
@@ -1,5 +1,6 @@
package com.scalar.db.dataloader.core.dataimport.processor;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -95,7 +96,7 @@ void test_importProcessWithStorage() {
jsonImportProcessor = new JsonImportProcessor(params);
Map<Integer, ImportDataChunkStatus> statusList =
jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader());
- assert statusList != null;
+ assertThat(statusList).isNotNull();
Assertions.assertEquals(1, statusList.size());
}
@@ -115,7 +116,7 @@ void test_importProcessWithTransaction() {
jsonImportProcessor = new JsonImportProcessor(params);
Map<Integer, ImportDataChunkStatus> statusList =
jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader());
- assert statusList != null;
+ assertThat(statusList).isNotNull();
Assertions.assertEquals(1, statusList.size());
}
}
diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java
index e3db39175..b2ca39007 100644
--- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java
+++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java
@@ -1,5 +1,6 @@
package com.scalar.db.dataloader.core.dataimport.processor;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -95,7 +96,7 @@ void test_importProcessWithStorage() {
jsonLinesImportProcessor = new JsonLinesImportProcessor(params);
Map<Integer, ImportDataChunkStatus> statusList =
jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader());
- assert statusList != null;
+ assertThat(statusList).isNotNull();
Assertions.assertEquals(1, statusList.size());
}
@@ -115,7 +116,7 @@ void test_importProcessWithTransaction() {
jsonLinesImportProcessor = new JsonLinesImportProcessor(params);
Map<Integer, ImportDataChunkStatus> statusList =
jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader());
- assert statusList != null;
+ assertThat(statusList).isNotNull();
Assertions.assertEquals(1, statusList.size());
}
}
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 23254eb3a..bab1669d8 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -37,7 +37,7 @@
-
+