diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ColumnInfo.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ColumnInfo.java index 685f58a83..080d14293 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ColumnInfo.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ColumnInfo.java @@ -9,6 +9,7 @@ *

This class holds the metadata for a column, including the namespace (schema), table name, and * the column name within the table. */ +@SuppressWarnings("SameNameButDifferent") @Value @Builder public class ColumnInfo { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ScanRange.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ScanRange.java index b1ae7b02d..726ca0e7f 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ScanRange.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/ScanRange.java @@ -4,6 +4,7 @@ import lombok.Value; /** * The scan range which is used in data export scan filtering */ +@SuppressWarnings("SameNameButDifferent") @Value public class ScanRange { /** The key for scan start filter */ diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java index f66efdc9d..f228b8fc1 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportManager.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@SuppressWarnings({"SameNameButDifferent", "FutureReturnValueIgnored"}) @RequiredArgsConstructor public abstract class ExportManager { private static final Logger logger = LoggerFactory.getLogger(ExportManager.class); diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java index da515cf3c..b7cad03fa 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportOptions.java @@ -9,8 +9,8 @@ import lombok.Builder; import lombok.Data; -/** Options for a ScalarDB export data operation */ -@SuppressWarnings("SameNameButDifferent") +/** Options for a ScalarDB export data operation. */ +@SuppressWarnings({"SameNameButDifferent", "MissingSummary"}) @Builder(builderMethodName = "hiddenBuilder") @Data public class ExportOptions { @@ -31,6 +31,15 @@ public class ExportOptions { @Builder.Default private List projectionColumns = Collections.emptyList(); private List sortOrders; + /** + * Generates and returns an export options builder. 
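A minimal usage sketch of this builder entry point (the namespace, table name, and partition-key value below are hypothetical, and FileFormat.JSON is an assumed constant of the data loader's FileFormat enum):

    // Hypothetical values for illustration only
    ExportOptions exportOptions =
        ExportOptions.builder("sample_ns", "sample_table", Key.ofInt("id", 1), FileFormat.JSON)
            .build();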
+ * + * @param namespace namespaces for export + * @param tableName tableName for export + * @param scanPartitionKey scan partition key for export + * @param outputFileFormat output file format for export + * @return a configured export options builder + */ public static ExportOptionsBuilder builder( String namespace, String tableName, Key scanPartitionKey, FileFormat outputFileFormat) { return hiddenBuilder() diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonLineProducerTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonLineProducerTask.java index 689084698..4e19cd464 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonLineProducerTask.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonLineProducerTask.java @@ -15,13 +15,10 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class JsonLineProducerTask extends ProducerTask { private final DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); - private static final Logger logger = LoggerFactory.getLogger(JsonLineProducerTask.class); /** * Class constructor diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonProducerTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonProducerTask.java index 9fb8014c6..4ea10130f 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonProducerTask.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/JsonProducerTask.java @@ -16,14 +16,11 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class JsonProducerTask extends ProducerTask { private final DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); private final boolean prettyPrintJson; - private static final Logger logger = LoggerFactory.getLogger(JsonProducerTask.class); /** * Class constructor diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerResult.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerResult.java deleted file mode 100644 index 9506fcd72..000000000 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerResult.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.scalar.db.dataloader.core.dataexport.producer; - -import com.fasterxml.jackson.databind.JsonNode; -import lombok.Builder; -import lombok.Value; - -@Builder -@Value -public class ProducerResult { - JsonNode jsonNode; - String csvSource; - boolean poisonPill; -} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTaskFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTaskFactory.java index 18adc8de6..f4577e463 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTaskFactory.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/producer/ProducerTaskFactory.java @@ -8,6 +8,7 @@ import java.util.Map; import lombok.RequiredArgsConstructor; +@SuppressWarnings("SameNameButDifferent") @RequiredArgsConstructor public class 
ProducerTaskFactory { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/validation/ExportOptionsValidator.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/validation/ExportOptionsValidator.java index 7bf7645b0..1a0407160 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/validation/ExportOptionsValidator.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/validation/ExportOptionsValidator.java @@ -17,6 +17,7 @@ * A validator for ensuring that export options are consistent with the ScalarDB table metadata and * follow the defined constraints. */ +@SuppressWarnings("SameNameButDifferent") @NoArgsConstructor(access = AccessLevel.PRIVATE) public class ExportOptionsValidator { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java index f1984d6c2..8b08fbb53 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java @@ -35,6 +35,7 @@ *

  • Notifying listeners of various import events * */ +@SuppressWarnings("SameNameButDifferent") @AllArgsConstructor public class ImportManager implements ImportEventListener { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java index 6d3206765..359fb1f88 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportOptions.java @@ -7,7 +7,8 @@ import lombok.Builder; import lombok.Data; -/** Import options to import data into one or more ScalarDB tables */ +/** Import options to import data into one or more ScalarDB tables. */ +@SuppressWarnings({"SameNameButDifferent", "MissingSummary"}) @Builder @Data public class ImportOptions { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFile.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFile.java index 6a2229c18..a6888abfb 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFile.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFile.java @@ -11,6 +11,7 @@ * Represents a control file that holds control file tables which contains the column mappings that * maps a source file column to the actual database table column. */ +@SuppressWarnings("SameNameButDifferent") @Getter @Setter public class ControlFile { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileTable.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileTable.java index efcfb0bc0..c65d05887 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileTable.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileTable.java @@ -12,6 +12,7 @@ * table name, and field mappings. This class is used to define how data from a control file maps to * a specific table in ScalarDB. */ +@SuppressWarnings("SameNameButDifferent") @Getter @Setter public class ControlFileTable { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileTableFieldMapping.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileTableFieldMapping.java index 106857330..74785579e 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileTableFieldMapping.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/controlfile/ControlFileTableFieldMapping.java @@ -10,6 +10,7 @@ * This class defines how data from a specific field in the input source should be mapped to the * corresponding column in the database. 
*/ +@SuppressWarnings("SameNameButDifferent") @Getter @Setter public class ControlFileTableFieldMapping { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportDataChunk.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportDataChunk.java index 69ed97421..2ab6539d6 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportDataChunk.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportDataChunk.java @@ -4,7 +4,8 @@ import lombok.Builder; import lombok.Data; -/** * Import data chunk data */ +/** * Import data chunk data. */ +@SuppressWarnings({"SameNameButDifferent", "MissingSummary"}) @Data @Builder public class ImportDataChunk { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportDataChunkStatus.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportDataChunkStatus.java index d6db3e1e7..0009f71cd 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportDataChunkStatus.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportDataChunkStatus.java @@ -6,7 +6,8 @@ import lombok.Builder; import lombok.Data; -/** * A DTO to store import data chunk details */ +/** * A DTO to store import data chunk details. */ +@SuppressWarnings({"SameNameButDifferent", "MissingSummary"}) @Data @Builder @JsonDeserialize(builder = ImportDataChunkStatus.ImportDataChunkStatusBuilder.class) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportRow.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportRow.java index 824ca4ffa..84bcd0af3 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportRow.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/datachunk/ImportRow.java @@ -3,7 +3,8 @@ import com.fasterxml.jackson.databind.JsonNode; import lombok.Value; -/** Stores data related to a single row on import file */ +/** Stores data related to a single row on import file. 
*/ +@SuppressWarnings("SameNameButDifferent") @Value public class ImportRow { int rowNumber; diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/AbstractImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/AbstractImportLogger.java new file mode 100644 index 000000000..7addf96a5 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/AbstractImportLogger.java @@ -0,0 +1,243 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.dataloader.core.Constants; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.ImportEventListener; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import lombok.RequiredArgsConstructor; + +/** + * An abstract base class for logging import events during data loading operations. This class + * implements the {@link ImportEventListener} interface and provides common functionality for + * logging transaction batch results and managing event listeners. Concrete implementations should + * define how to log transaction batches and handle errors. + */ +@SuppressWarnings("SameNameButDifferent") +@RequiredArgsConstructor +public abstract class AbstractImportLogger implements ImportEventListener { + + /** Object mapper used for JSON serialization/deserialization. */ + protected static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); + + /** Configuration for the import logger. */ + protected final ImportLoggerConfig config; + + /** Factory for creating log writers. */ + protected final LogWriterFactory logWriterFactory; + + /** List of event listeners to be notified of import events. */ + protected final List listeners = new ArrayList<>(); + + /** + * Called when a data chunk import is started. Currently, this implementation does not log the + * start of a data chunk. + * + * @param importDataChunkStatus the status of the data chunk being imported + */ + @Override + public void onDataChunkStarted(ImportDataChunkStatus importDataChunkStatus) { + // Currently we are not logging the start of a data chunk + } + + /** + * Called when a transaction batch is started. Currently, this implementation does not log the + * start of a transaction batch, but it notifies all registered listeners. + * + * @param batchStatus the status of the transaction batch being started + */ + @Override + public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) { + // Currently we are not logging the start of a transaction batch + notifyTransactionBatchStarted(batchStatus); + } + + /** + * Called when a transaction batch is completed. 
This method logs the transaction batch result if + * it should be logged based on the configuration, and notifies all registered listeners. + * + * @param batchResult the result of the completed transaction batch + */ + @Override + public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) { + // skip logging success records if the configuration is set to skip + if (shouldSkipLoggingSuccess(batchResult)) { + return; + } + + logTransactionBatch(batchResult); + notifyTransactionBatchCompleted(batchResult); + } + + /** + * Logs a transaction batch result. This method should be implemented by concrete subclasses to + * define how to log transaction batch results. + * + * @param batchResult the transaction batch result to log + */ + protected abstract void logTransactionBatch(ImportTransactionBatchResult batchResult); + + /** + * Determines whether logging of a successful transaction batch should be skipped. Logging is + * skipped if the batch was successful and the configuration specifies not to log success records. + * + * @param batchResult the transaction batch result to check + * @return true if logging should be skipped, false otherwise + */ + protected boolean shouldSkipLoggingSuccess(ImportTransactionBatchResult batchResult) { + return batchResult.isSuccess() && !config.isLogSuccessRecords(); + } + + /** + * Creates a filtered JSON representation of a transaction batch result. This method filters out + * raw record data if the configuration specifies not to log raw source records. + * + * @param batchResult the transaction batch result to convert to JSON + * @return a JsonNode representing the filtered transaction batch result + */ + protected JsonNode createFilteredTransactionBatchLogJsonNode( + ImportTransactionBatchResult batchResult) { + + // If the batch result does not contain any records, return the batch result as is + if (batchResult.getRecords() == null) { + return OBJECT_MAPPER.valueToTree(batchResult); + } + + // Create a new list to store the modified import task results + List modifiedRecords = new ArrayList<>(); + + // Loop over the records in the batchResult + for (ImportTaskResult taskResult : batchResult.getRecords()) { + // Create a new ImportTaskResult and not add the raw record yet + List targetResults = + batchResult.isSuccess() + ? taskResult.getTargets() + : updateTargetStatusForAbortedTransactionBatch(taskResult.getTargets()); + ImportTaskResult.ImportTaskResultBuilder builder = + ImportTaskResult.builder() + .rowNumber(taskResult.getRowNumber()) + .targets(targetResults) + .dataChunkId(taskResult.getDataChunkId()); + + // Only add the raw record if the configuration is set to log raw source data + if (config.isLogRawSourceRecords()) { + builder.rawRecord(taskResult.getRawRecord()); + } + ImportTaskResult modifiedTaskResult = builder.build(); + + // Add the modified task result to the list + modifiedRecords.add(modifiedTaskResult); + } + + // Create a new transaction batch result with the modified import task results + ImportTransactionBatchResult modifiedBatchResult = + ImportTransactionBatchResult.builder() + .dataChunkId(batchResult.getDataChunkId()) + .transactionBatchId(batchResult.getTransactionBatchId()) + .transactionId(batchResult.getTransactionId()) + .records(modifiedRecords) + .errors(batchResult.getErrors()) + .success(batchResult.isSuccess()) + .build(); + + // Convert the modified batch result to a JsonNode + return OBJECT_MAPPER.valueToTree(modifiedBatchResult); + } + + /** + * Safely closes a log writer. 
If an IOException occurs during closing, it logs the error using + * the {@link #logError} method. + * + * @param logWriter the log writer to close, may be null + */ + protected void closeLogWriter(LogWriter logWriter) { + if (logWriter != null) { + try { + logWriter.close(); + } catch (IOException e) { + logError("Failed to close a log writer", e); + } + } + } + + /** + * Logs an error message and exception. This method should be implemented by concrete subclasses + * to define how to log errors. + * + * @param errorMessage the error message to log + * @param e the exception that caused the error + */ + protected abstract void logError(String errorMessage, Exception e); + + /** + * Creates a log writer for the specified log file path. + * + * @param logFilePath the path to the log file + * @return a new log writer + * @throws IOException if an I/O error occurs while creating the log writer + */ + protected LogWriter createLogWriter(String logFilePath) throws IOException { + return logWriterFactory.createLogWriter(logFilePath); + } + + /** + * Notifies all registered listeners that a transaction batch has started. + * + * @param status the status of the transaction batch that has started + */ + private void notifyTransactionBatchStarted(ImportTransactionBatchStatus status) { + for (ImportEventListener listener : listeners) { + listener.onTransactionBatchStarted(status); + } + } + + /** + * Notifies all registered listeners that a transaction batch has completed. + * + * @param batchResult the result of the completed transaction batch + */ + private void notifyTransactionBatchCompleted(ImportTransactionBatchResult batchResult) { + for (ImportEventListener listener : listeners) { + listener.onTransactionBatchCompleted(batchResult); + } + } + + /** + * Updates the status of target results for an aborted transaction batch. For each target with a + * status of SAVED, changes the status to ABORTED and adds an error message. + * + * @param targetResults the list of target results to update + * @return the updated list of target results + */ + private List updateTargetStatusForAbortedTransactionBatch( + List targetResults) { + for (int i = 0; i < targetResults.size(); i++) { + ImportTargetResult target = targetResults.get(i); + if (target.getStatus().equals(ImportTargetResultStatus.SAVED)) { + ImportTargetResult newTarget = + ImportTargetResult.builder() + .importAction(target.getImportAction()) + .status(ImportTargetResultStatus.ABORTED) + .importedRecord(target.getImportedRecord()) + .namespace(target.getNamespace()) + .tableName(target.getTableName()) + .dataMapped(target.isDataMapped()) + .errors(Collections.singletonList(Constants.ABORT_TRANSACTION_STATUS)) + .build(); + targetResults.set(i, newTarget); + } + } + return targetResults; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerConfig.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerConfig.java new file mode 100644 index 000000000..1bdec6cd7 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerConfig.java @@ -0,0 +1,38 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import lombok.Builder; +import lombok.Value; + +/** + * Configuration class for import loggers. This class uses Lombok's {@code @Value} annotation to + * create an immutable class and {@code @Builder} annotation to provide a builder pattern for + * creating instances. 
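A minimal construction sketch for this configuration (the directory path and flag values are hypothetical; the setters are the ones Lombok generates for the fields declared below):

    ImportLoggerConfig config =
        ImportLoggerConfig.builder()
            .logDirectoryPath("/tmp/import-logs/") // hypothetical path; should end with a separator
            .isLogSuccessRecords(true)
            .isLogRawSourceRecords(true)
            .prettyPrint(false)
            .build();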
+ */ +@Value +@Builder +@SuppressWarnings("SameNameButDifferent") +public class ImportLoggerConfig { + /** + * The directory path where log files will be stored. This path should end with a directory + * separator (e.g., "/"). + */ + String logDirectoryPath; + + /** + * Whether to log records that were successfully imported. If true, successful import operations + * will be logged to success log files. + */ + boolean isLogSuccessRecords; + + /** + * Whether to log raw source records that failed to be imported. If true, failed import operations + * will be logged to failure log files. + */ + boolean isLogRawSourceRecords; + + /** + * Whether to format the logs with pretty printing. If true, the JSON logs will be formatted with + * indentation for better readability. + */ + boolean prettyPrint; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java new file mode 100644 index 000000000..cdd7a66d0 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLogger.java @@ -0,0 +1,230 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import java.io.IOException; +import javax.annotation.concurrent.ThreadSafe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of {@link AbstractImportLogger} that uses a single file for each log type. + * Unlike {@link SplitByDataChunkImportLogger}, this logger creates only three log files: one for + * successful operations, one for failed operations, and one for summary information, regardless of + * the number of data chunks processed. + * + *

+ * <p>The log files are named as follows:
+ *
+ * <ul>
+ *   <li>success.json - Records of successful import operations
+ *   <li>failure.json - Records of failed import operations
+ *   <li>summary.log - Summary information for all data chunks
+ * </ul>
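A minimal wiring sketch for this logger, reusing the hypothetical ImportLoggerConfig from the sketch above together with the DefaultLogWriterFactory added in this change:

    LogWriterFactory logWriterFactory = new DefaultLogWriterFactory(config);
    // The constructor opens success.json and failure.json immediately, so it may throw IOException.
    SingleFileImportLogger importLogger = new SingleFileImportLogger(config, logWriterFactory);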
    + */ +@ThreadSafe +public class SingleFileImportLogger extends AbstractImportLogger { + + protected static final String SUMMARY_LOG_FILE_NAME = "summary.log"; + protected static final String SUCCESS_LOG_FILE_NAME = "success.json"; + protected static final String FAILURE_LOG_FILE_NAME = "failure.json"; + private static final Logger logger = LoggerFactory.getLogger(SingleFileImportLogger.class); + private volatile LogWriter summaryLogWriter; + private final LogWriter successLogWriter; + private final LogWriter failureLogWriter; + + /** + * Creates a new instance of SingleFileImportLogger. Initializes the success and failure log + * writers immediately. The summary log writer is created on demand when the first data chunk is + * completed. + * + * @param config the configuration for the logger + * @param logWriterFactory the factory to create log writers + * @throws IOException if an I/O error occurs while creating the log writers + */ + public SingleFileImportLogger(ImportLoggerConfig config, LogWriterFactory logWriterFactory) + throws IOException { + super(config, logWriterFactory); + successLogWriter = createLogWriter(config.getLogDirectoryPath() + SUCCESS_LOG_FILE_NAME); + failureLogWriter = createLogWriter(config.getLogDirectoryPath() + FAILURE_LOG_FILE_NAME); + } + + /** + * Called when an import task is completed. Writes the task result details to the appropriate log + * files based on the configuration. + * + * @param taskResult the result of the completed import task + */ + @Override + public void onTaskComplete(ImportTaskResult taskResult) { + if (!config.isLogSuccessRecords() && !config.isLogRawSourceRecords()) return; + try { + writeImportTaskResultDetailToLogs(taskResult); + } catch (Exception e) { + logError("Failed to write success/failure logs", e); + } + } + + /** + * Called to add or update the status of a data chunk. This implementation does nothing as the + * status is only logged when the data chunk is completed. + * + * @param status the status of the data chunk + */ + @Override + public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {} + + /** + * Called when a data chunk is completed. Logs the summary of the data chunk to the summary log + * file. + * + * @param dataChunkStatus the status of the completed data chunk + */ + @Override + public void onDataChunkCompleted(ImportDataChunkStatus dataChunkStatus) { + try { + logDataChunkSummary(dataChunkStatus); + } catch (IOException e) { + logError("Failed to log the data chunk summary", e); + } + } + + /** Called when all data chunks are completed. Closes all log writers. */ + @Override + public void onAllDataChunksCompleted() { + closeAllLogWriters(); + } + + /** + * Logs a transaction batch result to the appropriate log file based on its success status. + * + * @param batchResult the transaction batch result to log + */ + @Override + protected void logTransactionBatch(ImportTransactionBatchResult batchResult) { + try { + LogWriter logWriter = getLogWriterForTransactionBatch(batchResult); + JsonNode jsonNode = createFilteredTransactionBatchLogJsonNode(batchResult); + writeToLogWriter(logWriter, jsonNode); + } catch (IOException e) { + logError("Failed to write a transaction batch record to the log file", e); + } + } + + /** + * Logs an error message with an exception to the logger. 
+ * + * @param errorMessage the error message to log + * @param exception the exception associated with the error + */ + @Override + protected void logError(String errorMessage, Exception exception) { + logger.error(errorMessage, exception); + throw new RuntimeException(errorMessage, exception); + } + + /** + * Logs the summary of a data chunk to the summary log file. Creates the summary log writer if it + * doesn't exist yet. + * + * @param dataChunkStatus the status of the data chunk to log + * @throws IOException if an I/O error occurs while writing to the log + */ + private void logDataChunkSummary(ImportDataChunkStatus dataChunkStatus) throws IOException { + ensureSummaryLogWriterInitialized(); + writeImportDataChunkSummary(dataChunkStatus, summaryLogWriter); + } + + /** + * Ensures that the summary log writer is initialized in a thread-safe manner. + * + * @throws IOException if an error occurs while creating the log writer + */ + private void ensureSummaryLogWriterInitialized() throws IOException { + if (summaryLogWriter == null) { + synchronized (this) { + if (summaryLogWriter == null) { + summaryLogWriter = createLogWriter(config.getLogDirectoryPath() + SUMMARY_LOG_FILE_NAME); + } + } + } + } + + /** + * Writes the summary of a data chunk to the specified log writer. + * + * @param dataChunkStatus the status of the data chunk to log + * @param logWriter the log writer to write to + * @throws IOException if an I/O error occurs while writing to the log + */ + private void writeImportDataChunkSummary( + ImportDataChunkStatus dataChunkStatus, LogWriter logWriter) throws IOException { + JsonNode jsonNode = OBJECT_MAPPER.valueToTree(dataChunkStatus); + writeToLogWriter(logWriter, jsonNode); + } + + /** + * Gets the appropriate log writer for a transaction batch based on its success status. If the log + * writer doesn't exist yet, it will be created. + * + * @param batchResult the transaction batch result + * @return the log writer for the batch + * @throws IOException if an I/O error occurs while creating a new log writer + */ + private LogWriter getLogWriterForTransactionBatch(ImportTransactionBatchResult batchResult) + throws IOException { + return batchResult.isSuccess() ? successLogWriter : failureLogWriter; + } + + /** + * Writes the details of an import task result to the appropriate log files. Successful targets + * are written to success logs and failed targets to failure logs. The method is synchronized on + * the respective log writers to ensure thread safety. + * + * @param importTaskResult the result of the import task to log + * @throws IOException if an I/O error occurs while writing to the logs + */ + private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult) + throws IOException { + for (ImportTargetResult target : importTaskResult.getTargets()) { + if (config.isLogSuccessRecords() + && target.getStatus().equals(ImportTargetResultStatus.SAVED)) { + + writeToLogWriter(successLogWriter, OBJECT_MAPPER.valueToTree(target)); + } + if (config.isLogRawSourceRecords() + && !target.getStatus().equals(ImportTargetResultStatus.SAVED)) { + writeToLogWriter(failureLogWriter, OBJECT_MAPPER.valueToTree(target)); + } + } + } + + /** + * Writes a JSON node to a log writer and flushes the writer. 
+ * + * @param logWriter the log writer to write to + * @param jsonNode the JSON node to write + * @throws IOException if an I/O error occurs while writing + */ + private void writeToLogWriter(LogWriter logWriter, JsonNode jsonNode) throws IOException { + logWriter.write(jsonNode); + } + + /** + * Closes all log writers and sets them to null. This method is called when all data chunks have + * been completed. + */ + private void closeAllLogWriters() { + synchronized (this) { + closeLogWriter(summaryLogWriter); + closeLogWriter(successLogWriter); + closeLogWriter(failureLogWriter); + summaryLogWriter = null; + } + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java new file mode 100644 index 000000000..99d1fe70d --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLogger.java @@ -0,0 +1,305 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogFileType; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.concurrent.ThreadSafe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of {@link AbstractImportLogger} that creates separate log files for each data + * chunk. This logger maintains separate log writers for success, failure, and summary logs for each + * data chunk, allowing for better organization and easier tracking of import operations by data + * chunk. + * + *

+ * <p>The log files are named using the following formats:
+ *
+ * <ul>
+ *   <li>Success logs: data_chunk_[id]_success.json
+ *   <li>Failure logs: data_chunk_[id]_failure.json
+ *   <li>Summary logs: data_chunk_[id]_summary.json
+ * </ul>
+ *
+ * <p>

    Log writers are created on demand and closed when their corresponding data chunk is completed. + */ +@ThreadSafe +public class SplitByDataChunkImportLogger extends AbstractImportLogger { + + protected static final String SUMMARY_LOG_FILE_NAME_FORMAT = "data_chunk_%s_summary.json"; + protected static final String FAILURE_LOG_FILE_NAME_FORMAT = "data_chunk_%s_failure.json"; + protected static final String SUCCESS_LOG_FILE_NAME_FORMAT = "data_chunk_%s_success.json"; + + private static final Logger logger = LoggerFactory.getLogger(SplitByDataChunkImportLogger.class); + private final Map summaryLogWriters = new ConcurrentHashMap<>(); + private final Map successLogWriters = new ConcurrentHashMap<>(); + private final Map failureLogWriters = new ConcurrentHashMap<>(); + + /** + * Creates a new instance of SplitByDataChunkImportLogger. + * + * @param config the configuration for the logger + * @param logWriterFactory the factory to create log writers + */ + public SplitByDataChunkImportLogger( + ImportLoggerConfig config, LogWriterFactory logWriterFactory) { + super(config, logWriterFactory); + } + + /** + * Called when an import task is completed. Writes the task result details to the appropriate log + * files based on the configuration. + * + * @param taskResult the result of the completed import task + */ + @Override + public void onTaskComplete(ImportTaskResult taskResult) { + if (!config.isLogSuccessRecords() && !config.isLogRawSourceRecords()) return; + try { + writeImportTaskResultDetailToLogs(taskResult); + } catch (IOException e) { + logError("Failed to write success/failure logs", e); + } + } + + /** + * Writes the details of an import task result to the appropriate log files. Successful targets + * are written to success logs and failed targets to failure logs. + * + * @param importTaskResult the result of the import task to log + * @throws IOException if an I/O error occurs while writing to the logs + */ + private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult) + throws IOException { + for (ImportTargetResult target : importTaskResult.getTargets()) { + ImportTargetResultStatus status = target.getStatus(); + if (status.equals(ImportTargetResultStatus.SAVED) && config.isLogSuccessRecords()) { + writeLog(target, LogFileType.SUCCESS, importTaskResult.getDataChunkId()); + } else if (!status.equals(ImportTargetResultStatus.SAVED) && config.isLogRawSourceRecords()) { + writeLog(target, LogFileType.FAILURE, importTaskResult.getDataChunkId()); + } + } + } + + /** + * Serializes the given {@link ImportTargetResult} to JSON and writes it to a log file + * corresponding to the provided {@link LogFileType} and data chunk ID. + * + *

    This method ensures thread-safe access to the underlying {@link LogWriter} by synchronizing + * on the writer instance. It is safe to call concurrently from multiple threads handling the same + * or different data chunks. + * + * @param target the result of processing a single import target to be logged + * @param logFileType the type of log file to write to (e.g., SUCCESS or FAILURE) + * @param dataChunkId the ID of the data chunk associated with the log entry + * @throws IOException if writing or flushing the log fails + */ + private void writeLog(ImportTargetResult target, LogFileType logFileType, int dataChunkId) + throws IOException { + JsonNode jsonNode = OBJECT_MAPPER.valueToTree(target); + LogWriter writer = initializeLogWriterIfNeeded(logFileType, dataChunkId); + writer.write(jsonNode); + } + + /** + * Called to add or update the status of a data chunk. This implementation does nothing as the + * status is only logged when the data chunk is completed. + * + * @param status the status of the data chunk + */ + @Override + public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {} + + /** + * Called when a data chunk is completed. Logs the summary of the data chunk and closes the log + * writers for that data chunk. + * + * @param dataChunkStatus the status of the completed data chunk + */ + @Override + public void onDataChunkCompleted(ImportDataChunkStatus dataChunkStatus) { + try { + logDataChunkSummary(dataChunkStatus); + // Close the split log writers per data chunk if they exist for this data chunk id + closeLogWritersForDataChunk(dataChunkStatus.getDataChunkId()); + } catch (IOException e) { + logError("Failed to log the data chunk summary", e); + } + } + + /** Called when all data chunks are completed. Closes all remaining log writers. */ + @Override + public void onAllDataChunksCompleted() { + closeAllDataChunkLogWriters(); + } + + /** + * Logs a transaction batch result to the appropriate log file based on its success status. The + * log file is determined by the data chunk ID and whether the batch was successful. + * + * @param batchResult the transaction batch result to log + */ + @Override + protected void logTransactionBatch(ImportTransactionBatchResult batchResult) { + LogFileType logFileType = batchResult.isSuccess() ? LogFileType.SUCCESS : LogFileType.FAILURE; + try { + LogWriter logWriter = initializeLogWriterIfNeeded(logFileType, batchResult.getDataChunkId()); + JsonNode jsonNode = createFilteredTransactionBatchLogJsonNode(batchResult); + logWriter.write(jsonNode); + } catch (IOException e) { + logError("Failed to write a transaction batch record to a split mode log file", e); + } + } + + /** + * Logs an error message with an exception to the logger. + * + * @param errorMessage the error message to log + * @param exception the exception associated with the error + */ + @Override + protected void logError(String errorMessage, Exception exception) { + logger.error(errorMessage, exception); + throw new RuntimeException(errorMessage, exception); + } + + /** + * Logs the summary of a data chunk to a summary log file. 
+ * + * @param dataChunkStatus the status of the data chunk to log + * @throws IOException if an I/O error occurs while writing to the log + */ + private void logDataChunkSummary(ImportDataChunkStatus dataChunkStatus) throws IOException { + try (LogWriter logWriter = + initializeLogWriterIfNeeded(LogFileType.SUMMARY, dataChunkStatus.getDataChunkId())) { + logWriter.write(OBJECT_MAPPER.valueToTree(dataChunkStatus)); + logWriter.flush(); + } + } + + /** + * Closes and removes the log writers for a specific data chunk. + * + * @param dataChunkId the ID of the data chunk whose log writers should be closed + */ + private void closeLogWritersForDataChunk(int dataChunkId) { + closeLogWriter(successLogWriters.remove(dataChunkId)); + closeLogWriter(failureLogWriters.remove(dataChunkId)); + closeLogWriter(summaryLogWriters.remove(dataChunkId)); + } + + /** + * Closes all log writers for all data chunks and clears the writer maps. This method is called + * when all data chunks have been completed. + */ + private void closeAllDataChunkLogWriters() { + summaryLogWriters.values().forEach(this::closeLogWriter); + successLogWriters.values().forEach(this::closeLogWriter); + failureLogWriters.values().forEach(this::closeLogWriter); + summaryLogWriters.clear(); + successLogWriters.clear(); + failureLogWriters.clear(); + } + + /** + * Constructs the log file path based on the batch ID and log file type. + * + * @param batchId the ID of the batch (data chunk) + * @param logFileType the type of log file (SUCCESS, FAILURE, or SUMMARY) + * @return the full path to the log file + */ + private String getLogFilePath(long batchId, LogFileType logFileType) { + String logfilePath; + switch (logFileType) { + case SUCCESS: + logfilePath = + config.getLogDirectoryPath() + String.format(SUCCESS_LOG_FILE_NAME_FORMAT, batchId); + break; + case FAILURE: + logfilePath = + config.getLogDirectoryPath() + String.format(FAILURE_LOG_FILE_NAME_FORMAT, batchId); + break; + case SUMMARY: + logfilePath = + config.getLogDirectoryPath() + String.format(SUMMARY_LOG_FILE_NAME_FORMAT, batchId); + break; + default: + logfilePath = ""; + } + return logfilePath; + } + + /** + * Gets or creates a log writer for the specified log file type and data chunk ID. If a log writer + * for the specified type and data chunk doesn't exist, it will be created. + * + * @param logFileType the type of log file + * @param dataChunkId the ID of the data chunk + * @return the log writer for the specified type and data chunk + * @throws IOException if an I/O error occurs while creating a new log writer + */ + private LogWriter initializeLogWriterIfNeeded(LogFileType logFileType, int dataChunkId) + throws IOException { + Map logWriters = getLogWriters(logFileType); + try { + return logWriters.computeIfAbsent( + dataChunkId, + id -> { + try { + return createLogWriter(logFileType, id); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } catch (UncheckedIOException e) { + throw e.getCause(); + } + } + + /** + * Creates a new log writer for the specified log file type and data chunk ID. 
+ * + * @param logFileType the type of log file + * @param dataChunkId the ID of the data chunk + * @return a new log writer + * @throws IOException if an I/O error occurs while creating the log writer + */ + private LogWriter createLogWriter(LogFileType logFileType, int dataChunkId) throws IOException { + String logFilePath = getLogFilePath(dataChunkId, logFileType); + return createLogWriter(logFilePath); + } + + /** + * Gets the appropriate map of log writers for the specified log file type. + * + * @param logFileType the type of log file + * @return the map of log writers for the specified type + */ + private Map getLogWriters(LogFileType logFileType) { + Map logWriterMap; + switch (logFileType) { + case SUCCESS: + logWriterMap = successLogWriters; + break; + case FAILURE: + logWriterMap = failureLogWriters; + break; + case SUMMARY: + logWriterMap = summaryLogWriters; + break; + default: + throw new AssertionError(); + } + return logWriterMap; + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactory.java new file mode 100644 index 000000000..c5ef96714 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactory.java @@ -0,0 +1,32 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig; +import java.io.IOException; +import lombok.AllArgsConstructor; + +/** + * The default implementation of {@link LogWriterFactory} that creates {@link LocalFileLogWriter} + * instances. This factory uses the provided {@link ImportLoggerConfig} to configure the log writers + * it creates. It's annotated with Lombok's {@code @AllArgsConstructor} to automatically generate a + * constructor that initializes the configuration field. + */ +@SuppressWarnings("SameNameButDifferent") +@AllArgsConstructor +public class DefaultLogWriterFactory implements LogWriterFactory { + + private final ImportLoggerConfig importLoggerConfig; + + /** + * Creates a {@link LocalFileLogWriter} for the specified log file path. The created log writer + * will be configured using the {@link ImportLoggerConfig} that was provided to this factory + * during construction. 
+ * + * @param logFilePath the path where the log file will be created or appended to + * @return a new {@link LogWriter} instance that writes to the specified file + * @throws IOException if an I/O error occurs while creating the log writer + */ + @Override + public LogWriter createLogWriter(String logFilePath) throws IOException { + return new LocalFileLogWriter(logFilePath, importLoggerConfig); + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java new file mode 100644 index 000000000..3689bd51d --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LocalFileLogWriter.java @@ -0,0 +1,86 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import javax.annotation.Nullable; + +/** + * An implementation of {@link LogWriter} that writes log entries to a local file. This class writes + * JSON records to a file as a JSON array, with each record being an element in the array. It + * handles file creation, appending, and proper JSON formatting. + */ +public class LocalFileLogWriter implements LogWriter { + private final JsonGenerator logWriter; + private final DataLoaderObjectMapper objectMapper; + + /** + * Creates an instance of LocalFileLogWriter with the specified file path and configuration. + * + * @param filePath the path where the log file will be created or appended to + * @param importLoggerConfig the configuration for the logger, including formatting options + * @throws IOException if an I/O error occurs while creating or opening the file + */ + public LocalFileLogWriter(String filePath, ImportLoggerConfig importLoggerConfig) + throws IOException { + Path path = Paths.get(filePath); + this.objectMapper = new DataLoaderObjectMapper(); + this.logWriter = + objectMapper + .getFactory() + .createGenerator( + Files.newBufferedWriter( + path, StandardOpenOption.CREATE, StandardOpenOption.APPEND)); + // Start the JSON array + if (importLoggerConfig.isPrettyPrint()) this.logWriter.useDefaultPrettyPrinter(); + this.logWriter.writeStartArray(); + this.logWriter.flush(); + } + + /** + * Writes a JSON record to the log file. If the source record is null, this method does nothing. + * The method is synchronized to ensure thread safety when writing to the file. + * + * @param sourceRecord the JSON record to write + * @throws IOException if an I/O error occurs while writing the record + */ + @Override + public synchronized void write(@Nullable JsonNode sourceRecord) throws IOException { + if (sourceRecord == null) { + return; + } + objectMapper.writeValue(logWriter, sourceRecord); + } + + /** + * Flushes any buffered data to the log file. + * + * @throws IOException if an I/O error occurs while flushing + */ + @Override + public synchronized void flush() throws IOException { + logWriter.flush(); + } + + /** + * Closes the log writer, properly ending the JSON array and releasing resources. If the writer is + * already closed, this method does nothing. 
+ * + * @throws IOException if an I/O error occurs while closing the writer + */ + @Override + public synchronized void close() throws IOException { + if (logWriter.isClosed()) { + return; + } + logWriter.writeEndArray(); + logWriter.flush(); + logWriter.close(); + } +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogFileType.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogFileType.java new file mode 100644 index 000000000..e56a949da --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogFileType.java @@ -0,0 +1,25 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +/** + * Represents the different types of log files used in the data import process. Each type serves a + * specific purpose in tracking the import operation's results. + */ +public enum LogFileType { + /** + * Represents a log file that records successful import operations. These logs contain records + * that were successfully processed and imported. + */ + SUCCESS, + + /** + * Represents a log file that records failed import operations. These logs contain records that + * failed to be processed or imported, along with information about the failure. + */ + FAILURE, + + /** + * Represents a log file that provides a summary of the import operation. These logs contain + * aggregated statistics and overall results of the import process. + */ + SUMMARY +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriter.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriter.java new file mode 100644 index 000000000..32838f321 --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriter.java @@ -0,0 +1,37 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; + +/** + * An interface for writing log entries to a destination. This interface extends {@link + * AutoCloseable} to ensure proper resource cleanup. Implementations of this interface handle the + * details of writing log entries to various destinations such as files, databases, or other storage + * systems. + */ +public interface LogWriter extends AutoCloseable { + + /** + * Writes a JSON record to the log. + * + * @param sourceRecord the JSON record to write + * @throws IOException if an I/O error occurs while writing the record + */ + void write(JsonNode sourceRecord) throws IOException; + + /** + * Flushes any buffered data to the underlying storage. + * + * @throws IOException if an I/O error occurs while flushing + */ + void flush() throws IOException; + + /** + * Closes this log writer and releases any system resources associated with it. This method should + * be called when the log writer is no longer needed to ensure proper cleanup of resources. 
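Because LogWriter extends AutoCloseable, a try-with-resources sketch such as the following would apply (the factory variable, file path, and JsonNode value are hypothetical, reusing the DefaultLogWriterFactory sketch above):

    try (LogWriter writer = logWriterFactory.createLogWriter("/tmp/import-logs/summary.log")) {
      writer.write(jsonNode); // a JsonNode prepared by the caller
      writer.flush();
    }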
+ * + * @throws IOException if an I/O error occurs while closing the log writer + */ + @Override + void close() throws IOException; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactory.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactory.java new file mode 100644 index 000000000..3854d728c --- /dev/null +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/writer/LogWriterFactory.java @@ -0,0 +1,20 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import java.io.IOException; + +/** + * A factory interface for creating {@link LogWriter} instances. This interface abstracts the + * creation of log writers, allowing different implementations to create different types of log + * writers based on the application's needs. + */ +public interface LogWriterFactory { + + /** + * Creates a new log writer for the specified log file path. + * + * @param logFilePath the path where the log file will be created or appended to + * @return a new {@link LogWriter} instance + * @throws IOException if an I/O error occurs while creating the log writer + */ + LogWriter createLogWriter(String logFilePath) throws IOException; +} diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java index 1a317a1a8..81514e61a 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java @@ -37,11 +37,12 @@ * supports both transactional and non-transactional (storage) modes and provides event notification * capabilities for monitoring the import process. */ +@SuppressWarnings("SameNameButDifferent") @RequiredArgsConstructor public abstract class ImportProcessor { final ImportProcessorParams params; - private static final Logger LOGGER = LoggerFactory.getLogger(ImportProcessor.class); + private static final Logger logger = LoggerFactory.getLogger(ImportProcessor.class); private final List listeners = new ArrayList<>(); /** @@ -232,13 +233,13 @@ private ImportTransactionBatchResult processTransactionBatch( } catch (TransactionException e) { isSuccess = false; - LOGGER.error(e.getMessage()); + logger.error(e.getMessage()); try { if (transaction != null) { transaction.abort(); // Ensure transaction is aborted } } catch (TransactionException abortException) { - LOGGER.error( + logger.error( "Failed to abort transaction: {}", abortException.getMessage(), abortException); } error = e.getMessage(); @@ -446,7 +447,7 @@ private void waitForFuturesToComplete(List> futures) { try { future.get(); } catch (Exception e) { - LOGGER.error(e.getMessage()); + logger.error(e.getMessage()); } } } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java index 36b96f62d..b09a27cf0 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorParams.java @@ -17,6 +17,7 @@ *

    This class is immutable and uses the Builder pattern for construction. It encapsulates all * required parameters and dependencies for processing data imports in ScalarDB. */ +@SuppressWarnings("SameNameButDifferent") @Builder @Value public class ImportProcessorParams { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java index 3be177a00..5187225cc 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTask.java @@ -38,6 +38,7 @@ * functionality to import data into single or multiple tables based on the provided import options * and control file configurations. */ +@SuppressWarnings({"SameNameButDifferent"}) @RequiredArgsConstructor public abstract class ImportTask { @@ -148,7 +149,8 @@ private List startMultiTableImportProcess( copyNode); targetResults.add(result); } - return targetResults; + // Wrapped in unmodifiable list to fix MixedMutabilityReturnType error-prone warning + return Collections.unmodifiableList(targetResults); } /** diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java index eafe3a42a..f68a526b8 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTaskParams.java @@ -14,6 +14,7 @@ * Parameters required for executing an import task in the data loader. This class encapsulates all * necessary information needed to process and import a single record into ScalarDB. */ +@SuppressWarnings("SameNameButDifferent") @Builder @Value public class ImportTaskParams { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java index 449270d92..a5b764890 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/ImportTransactionalTask.java @@ -3,8 +3,6 @@ import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.Result; import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException; -import com.scalar.db.exception.transaction.AbortException; -import com.scalar.db.exception.transaction.TransactionException; import com.scalar.db.io.Column; import com.scalar.db.io.Key; import java.util.List; @@ -83,24 +81,4 @@ protected void saveRecord( throws ScalarDBDaoException { params.getDao().put(namespace, tableName, partitionKey, clusteringKey, columns, transaction); } - - /** - * Aborts the active ScalarDB transaction if it has not been committed. - * - *

    This method provides a safe way to abort an active transaction, handling any abort-related - * exceptions by wrapping them in a {@link TransactionException}. - * - * @param tx the transaction to be aborted. If null, this method does nothing - * @throws TransactionException if an error occurs during the abort operation or if the underlying - * abort operation fails - */ - private void abortActiveTransaction(DistributedTransaction tx) throws TransactionException { - if (tx != null) { - try { - tx.abort(); - } catch (AbortException e) { - throw new TransactionException(e.getMessage(), tx.getId()); - } - } - } } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/result/ImportTargetResult.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/result/ImportTargetResult.java index 0fe4e0379..55a5e2ba9 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/result/ImportTargetResult.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/result/ImportTargetResult.java @@ -6,6 +6,8 @@ import lombok.Builder; import lombok.Value; +/** Stores the result of an import for a single target. */ +@SuppressWarnings({"SameNameButDifferent", "MissingSummary"}) @Builder @Value public class ImportTargetResult { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/result/ImportTaskResult.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/result/ImportTaskResult.java index 3e08cc709..0d4dba0c8 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/result/ImportTaskResult.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/result/ImportTaskResult.java @@ -7,6 +7,7 @@ import lombok.Builder; import lombok.Value; +@SuppressWarnings({"SameNameButDifferent", "MissingSummary"}) @Builder @Value @JsonDeserialize(builder = ImportTaskResult.ImportTaskResultBuilder.class) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidationResult.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidationResult.java index 30b878b9e..5c299f9a6 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidationResult.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidationResult.java @@ -31,17 +31,29 @@ public void addErrorMessage(String columnName, String errorMessage) { this.errorMessages.add(errorMessage); } - /** @return Immutable list of validation error messages */ + /** + * Returns the list of validation error messages. + * + * @return Immutable list of validation error messages. + */ public List getErrorMessages() { return Collections.unmodifiableList(this.errorMessages); } - /** @return Immutable set of columns that had errors */ + /** + * Returns the set of columns that had validation errors. + * + * @return Immutable set of columns that had errors. + */ public Set getColumnsWithErrors() { return Collections.unmodifiableSet(this.columnsWithErrors); } - /** @return Validation is valid or not */ + /** + * Returns whether the validation passed. + * + * @return true if there are no validation errors, false otherwise.
+ */ public boolean isValid() { return this.errorMessages.isEmpty(); } diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidator.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidator.java index 6d773ffcc..a04683017 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidator.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidator.java @@ -9,6 +9,7 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; +@SuppressWarnings("SameNameButDifferent") @NoArgsConstructor(access = AccessLevel.PRIVATE) public class ImportSourceRecordValidator { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatch.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatch.java index a922fd8af..6fef97c56 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatch.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatch.java @@ -6,6 +6,7 @@ import lombok.Value; /** Transaction batch details */ +@SuppressWarnings({"SameNameButDifferent", "MissingSummary"}) @Builder @Value public class ImportTransactionBatch { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatchResult.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatchResult.java index 0e44b6695..887b9a392 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatchResult.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatchResult.java @@ -8,6 +8,7 @@ import lombok.Value; /** Transaction batch result */ +@SuppressWarnings({"SameNameButDifferent", "MissingSummary"}) @Builder @Value @JsonDeserialize(builder = ImportTransactionBatchResult.ImportTransactionBatchResultBuilder.class) diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatchStatus.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatchStatus.java index 1b7bae34c..42d37eb64 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatchStatus.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/transactionbatch/ImportTransactionBatchStatus.java @@ -5,7 +5,8 @@ import lombok.Builder; import lombok.Value; -/** Batch status details */ +/** Batch status details. 
*/ +@SuppressWarnings({"SameNameButDifferent", "MissingSummary"}) @Builder @Value public class ImportTransactionBatchStatus { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataRequest.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataRequest.java index 8e79da3d6..3b730a081 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataRequest.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataRequest.java @@ -3,6 +3,7 @@ import lombok.Getter; /** Represents the request for metadata for a single ScalarDB table */ +@SuppressWarnings("SameNameButDifferent") @Getter public class TableMetadataRequest { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataService.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataService.java index 881694580..f91435fe5 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataService.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/tablemetadata/TableMetadataService.java @@ -14,6 +14,7 @@ * Service for retrieving {@link TableMetadata} from ScalarDB. Provides methods to fetch metadata * for individual tables or a collection of tables. */ +@SuppressWarnings("SameNameButDifferent") @RequiredArgsConstructor public class TableMetadataService { diff --git a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/TableMetadataUtil.java b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/TableMetadataUtil.java index ddc15a1e5..7cd1834d8 100644 --- a/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/TableMetadataUtil.java +++ b/data-loader/core/src/main/java/com/scalar/db/dataloader/core/util/TableMetadataUtil.java @@ -11,6 +11,7 @@ import lombok.NoArgsConstructor; /** Utility class for handling ScalarDB table metadata operations. 
*/ +@SuppressWarnings("SameNameButDifferent") @NoArgsConstructor(access = AccessLevel.PRIVATE) public class TableMetadataUtil { diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java new file mode 100644 index 000000000..d03f04fbb --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SingleFileImportLoggerTest.java @@ -0,0 +1,265 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import com.scalar.db.dataloader.core.dataimport.log.writer.DefaultLogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class SingleFileImportLoggerTest { + + private static final Logger logger = LoggerFactory.getLogger(SingleFileImportLoggerTest.class); + private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); + + @TempDir Path tempDir; + + private LogWriterFactory logWriterFactory; + + @BeforeEach + void setUp() { + ImportLoggerConfig importLoggerConfig = + ImportLoggerConfig.builder() + .prettyPrint(false) + .isLogSuccessRecords(false) + .isLogRawSourceRecords(false) + .logDirectoryPath("path") + .build(); + logWriterFactory = new DefaultLogWriterFactory(importLoggerConfig); + } + + @AfterEach + void tearDown() throws IOException { + cleanUpTempDir(); + } + + private void cleanUpTempDir() throws IOException { + try (Stream paths = Files.list(tempDir)) { + paths.forEach(this::deleteFile); + } + } + + private void deleteFile(Path file) { + try { + Files.deleteIfExists(file); + } catch (IOException e) { + logger.error("Failed to delete file: {}", file, e); + } + } + + @Test + void onTransactionBatchCompleted_NoErrors_ShouldWriteToSuccessLogFile() throws IOException { + testTransactionBatchCompleted(true, true); + } + + @Test + void onTransactionBatchCompleted_HasErrors_ShouldWriteToFailureLogFile() throws IOException { + testTransactionBatchCompleted(false, true); + } + + private void testTransactionBatchCompleted(boolean success, boolean logSuccessRecords) + throws IOException { + // Arrange + ImportLoggerConfig config = + ImportLoggerConfig.builder() + .logDirectoryPath(tempDir.toString() + "/") + .isLogRawSourceRecords(true) + .isLogSuccessRecords(logSuccessRecords) + .build(); + SingleFileImportLogger importLogger = new 
SingleFileImportLogger(config, logWriterFactory); + + List batchResults = createBatchResults(1, success); + + // Act + for (ImportTransactionBatchResult batchResult : batchResults) { + importLogger.onTransactionBatchCompleted(batchResult); + importLogger.onDataChunkCompleted( + ImportDataChunkStatus.builder().dataChunkId(batchResult.getDataChunkId()).build()); + } + importLogger.onAllDataChunksCompleted(); + + // Assert + assertTransactionBatchResults(batchResults, success, logSuccessRecords); + } + + private List createBatchResults(int count, boolean success) { + List batchResults = new ArrayList<>(); + + for (int i = 1; i <= count; i++) { + List records = + Collections.singletonList( + ImportTaskResult.builder() + .rowNumber(i) + .rawRecord(OBJECT_MAPPER.createObjectNode()) + .targets(Collections.EMPTY_LIST) + .build()); + ImportTransactionBatchResult result = + ImportTransactionBatchResult.builder() + .dataChunkId(i) + .transactionBatchId(1) + .records(records) + .success(success) + .build(); + batchResults.add(result); + } + + return batchResults; + } + + private void assertTransactionBatchResults( + List batchResults, boolean success, boolean logSuccessRecords) + throws IOException { + DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); + + // Single file log mode + Path logFileName = + tempDir.resolve( + success + ? SingleFileImportLogger.SUCCESS_LOG_FILE_NAME + : SingleFileImportLogger.FAILURE_LOG_FILE_NAME); + if (logSuccessRecords || !success) { + assertTrue(Files.exists(logFileName), "Log file should exist"); + + String logContent = new String(Files.readAllBytes(logFileName), StandardCharsets.UTF_8); + + List logEntries = + objectMapper.readValue( + logContent, new TypeReference>() {}); + + assertEquals( + batchResults.size(), + logEntries.size(), + "Number of log entries should match the number of batch results"); + + for (int i = 0; i < batchResults.size(); i++) { + assertTransactionBatchResult(batchResults.get(i), logEntries.get(i)); + } + } else { + assertFalse(Files.exists(logFileName), "Log file should not exist"); + } + } + + private void assertTransactionBatchResult( + ImportTransactionBatchResult expected, ImportTransactionBatchResult actual) { + assertEquals(expected.getDataChunkId(), actual.getDataChunkId(), "Data chunk ID should match"); + assertEquals( + expected.getTransactionBatchId(), + actual.getTransactionBatchId(), + "Transaction batch ID should match"); + assertEquals( + expected.getTransactionId(), actual.getTransactionId(), "Transaction ID should match"); + assertEquals(expected.isSuccess(), actual.isSuccess(), "Success status should match"); + + List expectedRecords = expected.getRecords(); + List actualRecords = actual.getRecords(); + assertEquals(expectedRecords.size(), actualRecords.size(), "Number of records should match"); + for (int j = 0; j < expectedRecords.size(); j++) { + ImportTaskResult expectedRecord = expectedRecords.get(j); + ImportTaskResult actualRecord = actualRecords.get(j); + assertEquals( + expectedRecord.getRowNumber(), actualRecord.getRowNumber(), "Row number should match"); + assertEquals( + expectedRecord.getRawRecord(), actualRecord.getRawRecord(), "Raw record should match"); + assertEquals(expectedRecord.getTargets(), actualRecord.getTargets(), "Targets should match"); + } + } + + @Test + void onDataChunkCompleted_NoErrors_ShouldWriteToSummaryLogFile() throws IOException { + testDataChunkCompleted(false); + } + + @Test + void onDataChunkCompleted_HasErrors_ShouldWriteToSummaryLogFile() throws IOException { + 
testDataChunkCompleted(true); + } + + private void testDataChunkCompleted(boolean hasErrors) throws IOException { + ImportLoggerConfig config = + ImportLoggerConfig.builder() + .logDirectoryPath(tempDir.toString() + "/") + .isLogRawSourceRecords(true) + .isLogSuccessRecords(true) + .build(); + SingleFileImportLogger importLogger = new SingleFileImportLogger(config, logWriterFactory); + + List dataChunkStatuses = + Stream.of(1, 2) + .map(id -> createDataChunkStatus(id, hasErrors)) + .collect(Collectors.toList()); + + dataChunkStatuses.forEach(importLogger::onDataChunkCompleted); + importLogger.onAllDataChunksCompleted(); + + assertDataChunkStatusLog(SingleFileImportLogger.SUMMARY_LOG_FILE_NAME, dataChunkStatuses); + } + + private ImportDataChunkStatus createDataChunkStatus(int dataChunkId, boolean hasErrors) { + return ImportDataChunkStatus.builder() + .dataChunkId(dataChunkId) + .startTime(Instant.now()) + .endTime(Instant.now()) + .totalRecords(100) + .successCount(hasErrors ? 90 : 100) + .failureCount(hasErrors ? 10 : 0) + .batchCount(5) + .totalDurationInMilliSeconds(1000) + .build(); + } + + private void assertDataChunkStatusLog( + String logFilePattern, List dataChunkStatuses) throws IOException { + assertSingleFileLog(tempDir, logFilePattern, dataChunkStatuses); + } + + private void assertSingleFileLog( + Path tempDir, String logFileName, List dataChunkStatuses) + throws IOException { + Path summaryLogFile = tempDir.resolve(logFileName); + assertTrue(Files.exists(summaryLogFile)); + + String logContent = new String(Files.readAllBytes(summaryLogFile), StandardCharsets.UTF_8); + DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); + List logEntries = + objectMapper.readValue(logContent, new TypeReference>() {}); + + assertEquals(dataChunkStatuses.size(), logEntries.size()); + for (int i = 0; i < dataChunkStatuses.size(); i++) { + assertDataChunkStatusEquals(dataChunkStatuses.get(i), logEntries.get(i)); + } + } + + private void assertDataChunkStatusEquals( + ImportDataChunkStatus expected, ImportDataChunkStatus actual) { + assertEquals(expected.getDataChunkId(), actual.getDataChunkId()); + assertEquals(expected.getStartTime(), actual.getStartTime()); + assertEquals(expected.getEndTime(), actual.getEndTime()); + assertEquals(expected.getTotalRecords(), actual.getTotalRecords()); + assertEquals(expected.getSuccessCount(), actual.getSuccessCount()); + assertEquals(expected.getFailureCount(), actual.getFailureCount()); + assertEquals(expected.getBatchCount(), actual.getBatchCount()); + assertEquals( + expected.getTotalDurationInMilliSeconds(), actual.getTotalDurationInMilliSeconds()); + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java new file mode 100644 index 000000000..de7ee49be --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/SplitByDataChunkImportLoggerTest.java @@ -0,0 +1,238 @@ +package com.scalar.db.dataloader.core.dataimport.log; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.scalar.db.dataloader.core.DataLoaderObjectMapper; +import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; +import 
com.scalar.db.dataloader.core.dataimport.log.writer.DefaultLogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory; +import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; +import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class SplitByDataChunkImportLoggerTest { + + private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); + + @TempDir Path tempDir; + + private LogWriterFactory logWriterFactory; + + @BeforeEach + void setUp() { + ImportLoggerConfig importLoggerConfig = + ImportLoggerConfig.builder() + .prettyPrint(false) + .isLogSuccessRecords(false) + .isLogRawSourceRecords(false) + .logDirectoryPath("path") + .build(); + logWriterFactory = new DefaultLogWriterFactory(importLoggerConfig); + } + + @Test + void onTransactionBatchCompleted_NoErrors_ShouldWriteToDataChunkSuccessFiles() + throws IOException { + testTransactionBatchCompleted(true, true); + } + + @Test + void onTransactionBatchCompleted_HasErrors_ShouldWriteToDataChunkFailureFiles() + throws IOException { + testTransactionBatchCompleted(false, true); + } + + @Test + void onTransactionBatchCompleted_NoErrorsAndNoSuccessFileLogging_ShouldNotWriteToSuccessFiles() + throws IOException { + testTransactionBatchCompleted(true, false); + } + + private void testTransactionBatchCompleted(boolean success, boolean logSuccessRecords) + throws IOException { + // Arrange + ImportLoggerConfig config = + ImportLoggerConfig.builder() + .logDirectoryPath(tempDir.toString() + "/") + .isLogRawSourceRecords(true) + .isLogSuccessRecords(logSuccessRecords) + .build(); + SplitByDataChunkImportLogger importLogger = + new SplitByDataChunkImportLogger(config, logWriterFactory); + + List batchResults = new ArrayList<>(); + + for (int i = 1; i <= 3; i++) { + List records = + Collections.singletonList( + ImportTaskResult.builder() + .rowNumber(i) + .targets(Collections.EMPTY_LIST) + .rawRecord(OBJECT_MAPPER.createObjectNode()) + .build()); + ImportTransactionBatchResult result = + ImportTransactionBatchResult.builder() + .dataChunkId(i) + .transactionBatchId(1) + .records(records) + .success(success) + .build(); + batchResults.add(result); + } + + // Act + for (ImportTransactionBatchResult batchResult : batchResults) { + importLogger.onTransactionBatchCompleted(batchResult); + importLogger.onDataChunkCompleted( + ImportDataChunkStatus.builder().dataChunkId(batchResult.getDataChunkId()).build()); + } + importLogger.onAllDataChunksCompleted(); + + // Assert + for (int i = 0; i < batchResults.size(); i++) { + ImportTransactionBatchResult batchResult = batchResults.get(i); + String logFileNameFormat = + success + ? 
SplitByDataChunkImportLogger.SUCCESS_LOG_FILE_NAME_FORMAT + : SplitByDataChunkImportLogger.FAILURE_LOG_FILE_NAME_FORMAT; + Path dataChunkLogFileName = tempDir.resolve(String.format(logFileNameFormat, i + 1)); + + if (success && logSuccessRecords) { + assertTrue(Files.exists(dataChunkLogFileName), "Data chunk success log file should exist"); + assertTransactionBatchResult(batchResult, dataChunkLogFileName); + } else if (!success) { + assertTrue(Files.exists(dataChunkLogFileName), "Data chunk failure log file should exist"); + assertTransactionBatchResult(batchResult, dataChunkLogFileName); + } else { + assertFalse( + Files.exists(dataChunkLogFileName), "Data chunk success log file should not exist"); + } + } + } + + private void assertTransactionBatchResult( + ImportTransactionBatchResult expected, Path dataChunkLogFileName) throws IOException { + // String logContent = Files.readString(dataChunkLogFileName); + String logContent = + new String(Files.readAllBytes(dataChunkLogFileName), StandardCharsets.UTF_8); + DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); + List logEntries = + objectMapper.readValue( + logContent, new TypeReference>() {}); + ImportTransactionBatchResult actual = logEntries.get(0); + + assertEquals(expected.getDataChunkId(), actual.getDataChunkId(), "Data chunk ID should match"); + assertEquals( + expected.getTransactionBatchId(), + actual.getTransactionBatchId(), + "Transaction batch ID should match"); + assertEquals( + expected.getTransactionId(), actual.getTransactionId(), "Transaction ID should match"); + assertEquals(expected.isSuccess(), actual.isSuccess(), "Success status should match"); + + List expectedRecords = expected.getRecords(); + List actualRecords = actual.getRecords(); + assertEquals(expectedRecords.size(), actualRecords.size(), "Number of records should match"); + for (int j = 0; j < expectedRecords.size(); j++) { + ImportTaskResult expectedRecord = expectedRecords.get(j); + ImportTaskResult actualRecord = actualRecords.get(j); + assertEquals( + expectedRecord.getRowNumber(), actualRecord.getRowNumber(), "Row number should match"); + assertEquals( + expectedRecord.getRawRecord(), actualRecord.getRawRecord(), "Raw record should match"); + assertEquals(expectedRecord.getTargets(), actualRecord.getTargets(), "Targets should match"); + } + } + + @Test + void onDataChunkCompleted_NoErrors_ShouldWriteToSummaryLogFile() throws IOException { + testDataChunkCompleted( + String.format(SplitByDataChunkImportLogger.SUMMARY_LOG_FILE_NAME_FORMAT, "%d"), false); + } + + @Test + void onDataChunkCompleted_HasErrors_ShouldWriteToSummaryLogFile() throws IOException { + testDataChunkCompleted( + String.format(SplitByDataChunkImportLogger.SUMMARY_LOG_FILE_NAME_FORMAT, "%d"), true); + } + + private void testDataChunkCompleted(String logFilePattern, boolean hasErrors) throws IOException { + ImportLoggerConfig config = + ImportLoggerConfig.builder() + .logDirectoryPath(tempDir.toString() + "/") + .isLogRawSourceRecords(true) + .isLogSuccessRecords(true) + .build(); + SplitByDataChunkImportLogger importLogger = + new SplitByDataChunkImportLogger(config, logWriterFactory); + + List dataChunkStatuses = + IntStream.rangeClosed(1, 2) + .mapToObj(id -> createDataChunkStatus(id, hasErrors)) + .collect(Collectors.toList()); + + dataChunkStatuses.forEach(importLogger::onDataChunkCompleted); + importLogger.onAllDataChunksCompleted(); + + assertDataChunkStatusLog(logFilePattern, dataChunkStatuses); + } + + private ImportDataChunkStatus createDataChunkStatus(int 
dataChunkId, boolean hasErrors) { + return ImportDataChunkStatus.builder() + .dataChunkId(dataChunkId) + .startTime(Instant.now()) + .endTime(Instant.now()) + .totalRecords(100) + .successCount(hasErrors ? 90 : 100) + .failureCount(hasErrors ? 10 : 0) + .batchCount(5) + .totalDurationInMilliSeconds(1000) + .build(); + } + + private void assertDataChunkStatusLog( + String logFilePattern, List dataChunkStatuses) throws IOException { + for (ImportDataChunkStatus dataChunkStatus : dataChunkStatuses) { + String logFileName = String.format(logFilePattern, dataChunkStatus.getDataChunkId()); + Path dataChunkLogFile = tempDir.resolve(logFileName); + assertTrue(Files.exists(dataChunkLogFile), "Data chunk summary log file should exist"); + + // String logContent = Files.readString(dataChunkLogFile); + String logContent = new String(Files.readAllBytes(dataChunkLogFile), StandardCharsets.UTF_8); + DataLoaderObjectMapper objectMapper = new DataLoaderObjectMapper(); + List logEntries = + objectMapper.readValue(logContent, new TypeReference>() {}); + + assertEquals(1, logEntries.size()); + assertDataChunkStatusEquals(dataChunkStatus, logEntries.get(0)); + } + } + + private void assertDataChunkStatusEquals( + ImportDataChunkStatus expected, ImportDataChunkStatus actual) { + assertEquals(expected.getDataChunkId(), actual.getDataChunkId()); + assertEquals(expected.getStartTime(), actual.getStartTime()); + assertEquals(expected.getEndTime(), actual.getEndTime()); + assertEquals(expected.getTotalRecords(), actual.getTotalRecords()); + assertEquals(expected.getSuccessCount(), actual.getSuccessCount()); + assertEquals(expected.getFailureCount(), actual.getFailureCount()); + assertEquals(expected.getBatchCount(), actual.getBatchCount()); + assertEquals( + expected.getTotalDurationInMilliSeconds(), actual.getTotalDurationInMilliSeconds()); + } +} diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactoryTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactoryTest.java new file mode 100644 index 000000000..28c31e5c0 --- /dev/null +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/log/writer/DefaultLogWriterFactoryTest.java @@ -0,0 +1,39 @@ +package com.scalar.db.dataloader.core.dataimport.log.writer; + +import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig; +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class DefaultLogWriterFactoryTest { + + String filePath = Paths.get("").toAbsolutePath() + "/sample.log"; + DefaultLogWriterFactory defaultLogWriterFactory; + + @AfterEach + void removeFileIfCreated() { + File file = new File(filePath); + if (file.exists()) { + file.deleteOnExit(); + } + } + + @Test + void createLogWriter_withValidLocalLogFilePath_shouldReturnLocalFileLogWriterObject() + throws IOException { + defaultLogWriterFactory = + new DefaultLogWriterFactory( + ImportLoggerConfig.builder() + .prettyPrint(false) + .isLogSuccessRecords(false) + .isLogRawSourceRecords(false) + .logDirectoryPath("path") + .build()); + LogWriter logWriter = defaultLogWriterFactory.createLogWriter(filePath); + Assertions.assertEquals(LocalFileLogWriter.class, logWriter.getClass()); + logWriter.close(); + } +} diff --git 
a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java index 94acd20ac..9ff35d4f1 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessorTest.java @@ -1,5 +1,6 @@ package com.scalar.db.dataloader.core.dataimport.processor; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -95,7 +96,7 @@ void test_importProcessWithStorage() { csvImportProcessor = new CsvImportProcessor(params); Map statusList = csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader()); - assert statusList != null; + assertThat(statusList).isNotNull(); Assertions.assertEquals(1, statusList.size()); } @@ -115,7 +116,7 @@ void test_importProcessWithTransaction() { csvImportProcessor = new CsvImportProcessor(params); Map statusList = csvImportProcessor.process(5, 1, UnitTestUtils.getCsvReader()); - assert statusList != null; + assertThat(statusList).isNotNull(); Assertions.assertEquals(1, statusList.size()); } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java index aa9a106a0..4cc9db655 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonImportProcessorTest.java @@ -1,5 +1,6 @@ package com.scalar.db.dataloader.core.dataimport.processor; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -95,7 +96,7 @@ void test_importProcessWithStorage() { jsonImportProcessor = new JsonImportProcessor(params); Map statusList = jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader()); - assert statusList != null; + assertThat(statusList).isNotNull(); Assertions.assertEquals(1, statusList.size()); } @@ -115,7 +116,7 @@ void test_importProcessWithTransaction() { jsonImportProcessor = new JsonImportProcessor(params); Map statusList = jsonImportProcessor.process(5, 1, UnitTestUtils.getJsonReader()); - assert statusList != null; + assertThat(statusList).isNotNull(); Assertions.assertEquals(1, statusList.size()); } } diff --git a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java index e3db39175..b2ca39007 100644 --- a/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java +++ b/data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/JsonLinesImportProcessorTest.java @@ -1,5 +1,6 @@ package com.scalar.db.dataloader.core.dataimport.processor; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -95,7 +96,7 @@ void test_importProcessWithStorage() { jsonLinesImportProcessor = new JsonLinesImportProcessor(params); Map statusList 
= jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader()); - assert statusList != null; + assertThat(statusList).isNotNull(); Assertions.assertEquals(1, statusList.size()); } @@ -115,7 +116,7 @@ void test_importProcessWithTransaction() { jsonLinesImportProcessor = new JsonLinesImportProcessor(params); Map statusList = jsonLinesImportProcessor.process(5, 1, UnitTestUtils.getJsonLinesReader()); - assert statusList != null; + assertThat(statusList).isNotNull(); Assertions.assertEquals(1, statusList.size()); } } diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 23254eb3a..bab1669d8 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -37,7 +37,7 @@ - +
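// Minimal usage sketch of the LogWriterFactory API introduced in this patch, modeled on
// DefaultLogWriterFactoryTest above. This is an illustrative assumption, not part of the patch:
// the class name LogWriterFactoryUsageSketch and the /tmp/import-logs paths are placeholders.
import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;
import com.scalar.db.dataloader.core.dataimport.log.writer.DefaultLogWriterFactory;
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter;
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
import java.io.IOException;

class LogWriterFactoryUsageSketch {
  public static void main(String[] args) throws IOException {
    // Logging configuration, using the same builder options exercised by the tests above.
    ImportLoggerConfig config =
        ImportLoggerConfig.builder()
            .prettyPrint(false)
            .isLogSuccessRecords(true)
            .isLogRawSourceRecords(true)
            .logDirectoryPath("/tmp/import-logs/")
            .build();

    // DefaultLogWriterFactory decides which LogWriter implementation to create
    // (a LocalFileLogWriter for a local file path, per DefaultLogWriterFactoryTest).
    LogWriterFactory factory = new DefaultLogWriterFactory(config);

    LogWriter logWriter = factory.createLogWriter("/tmp/import-logs/import_summary.log");
    try {
      // The import loggers (SingleFileImportLogger, SplitByDataChunkImportLogger) write
      // batch and data-chunk results through this writer; those write calls are omitted here.
    } finally {
      // LogWriter.close() may throw IOException, as declared in the interface.
      logWriter.close();
    }
  }
}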