/*
 * Copyright (c) 2020 OCLC, Inc. All Rights Reserved.
 *
 * OCLC proprietary information: the enclosed materials contain
 * proprietary information of OCLC Online Computer Library Center,
 * Inc. and shall not be disclosed in whole or in any part to any
 * third party or used by any person for any purpose, without written
 * consent of OCLC, Inc. Duplication of any portion of these
 * materials shall include this notice.
 */
package org.oclc.googlelinks.spark.job;

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.oclc.googlelinks.conf.AppProperties;
import org.oclc.googlelinks.model.BookData;
import org.oclc.googlelinks.model.InstitutionStats;
import org.oclc.googlelinks.model.SummaryStats;
import org.oclc.googlelinks.spark.BookDataFiles;
import org.oclc.googlelinks.spark.SparkJob;
import org.oclc.googlelinks.spark.export.ExportBookMapper;
import org.oclc.googlelinks.spark.export.ExportUtils;
import org.oclc.googlelinks.spark.export.model.ExplodedBookData;
import org.oclc.googlelinks.spark.stats.StatsUtils;
import org.oclc.googlelinks.spark.stats.model.StatsData;
import org.oclc.googlelinks.spark.utils.JsonFileWriter;
import org.oclc.googlelinks.spark.utils.TextFileWriter;
import org.oclc.googlelinks.spark.validation.ExtractValidBookData;
import org.oclc.googlelinks.validator.model.RegIdSuccessRatio;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.ParametersAreNonnullByDefault;
import java.util.concurrent.CompletableFuture;

import static org.apache.spark.sql.functions.col;

/**
 * Spark driver class for {@link org.oclc.googlelinks.job.ExportJob}.
 *
 * <p>Reads the book data and the RegID list, keeps the RegIDs whose success ratio
 * reaches the configured threshold, builds statistics, and then writes three outputs
 * concurrently: the book feed, the summary statistics and the institution statistics.
 *
 * <p>NOTE(review): {@code Dataset} is used as a raw type throughout this class. The
 * element types (presumably the imported model classes) should be confirmed and
 * restored so the compiler can check them.
 */
@Component("export")
@Slf4j
@ParametersAreNonnullByDefault
public class ExportSpark implements SparkJob {

    private final BookDataFiles bookDataFiles;
    private final TextFileWriter bookFeedWriter;
    private final ExportBookMapper exportBookMapper;
    private final ExportUtils exportUtils;
    private final JsonFileWriter jsonFileWriter;
    private final AppProperties props;
    private final StatsUtils statsUtils;
    private final ExtractValidBookData validBookData;

    /**
     * Creates the job with all of its collaborators injected.
     *
     * @param bookDataFiles    reader for the book data and the RegID list
     * @param bookFeedWriter   writer used for the book feed output
     * @param exportBookMapper partition mapper that turns book records into output strings
     * @param exportUtils      helper used to explode book links
     * @param jsonFileWriter   writer used for both statistics outputs
     * @param props            application properties (file locations, success threshold)
     * @param statsUtils       helper that builds the statistics
     * @param validBookData    helper that extracts the valid book data for the feed
     */
    @Autowired
    public ExportSpark(
            final BookDataFiles bookDataFiles,
            final TextFileWriter bookFeedWriter,
            final ExportBookMapper exportBookMapper,
            final ExportUtils exportUtils,
            final JsonFileWriter jsonFileWriter,
            final AppProperties props,
            final StatsUtils statsUtils,
            final ExtractValidBookData validBookData
    ) {
        this.bookDataFiles = bookDataFiles;
        this.bookFeedWriter = bookFeedWriter;
        this.exportBookMapper = exportBookMapper;
        this.exportUtils = exportUtils;
        this.jsonFileWriter = jsonFileWriter;
        this.props = props;
        this.statsUtils = statsUtils;
        this.validBookData = validBookData;
    }

    /**
     * Runs the export: loads the inputs, builds the statistics and writes all outputs.
     *
     * <p>The three writes are started with {@link CompletableFuture#runAsync(Runnable)}
     * and this method blocks until all of them have completed. A failure in any of them
     * surfaces from {@code join()} as a {@link java.util.concurrent.CompletionException}.
     *
     * @param args command-line arguments; not used by this job
     */
    @Override
    public void run(final String... args) {
        LOG.info("Running ExportSpark...");

        final Dataset fullBooks = bookDataFiles.read(props.getFs().getBookData());
        // Cached because the exploded data is handed to the stats builder below.
        final Dataset books = exportUtils.explodeBookLinks(fullBooks)
                .cache();

        final Dataset linkStats = loadRegID();
        // Keep only the RegIDs whose success ratio is at or above the configured threshold.
        final Dataset validRegId = linkStats
                .where(col(RegIdSuccessRatio.Fields.successRatio).geq(props.getSuccessThreshold()))
                .select(linkStats.col(RegIdSuccessRatio.Fields.regId).as("validRegid"))
                .as(Encoders.INT());

        final StatsData statsData = statsUtils.createStats(books, validRegId, linkStats);

        // No executor is passed, so the tasks run on CompletableFuture's default async executor.
        CompletableFuture.allOf(
                CompletableFuture.runAsync(() -> writeBookFeed(fullBooks, validRegId)),
                CompletableFuture.runAsync(() -> writeSummaryStats(statsData.getSummaryStats())),
                CompletableFuture.runAsync(() -> writeInstitutionStats(statsData.getInstitutionStats()))
        ).join();

        LOG.info("ExportSpark finished.");
    }

    /**
     * Reads the RegID list from the location configured in the application properties.
     *
     * @return the dataset returned by {@code BookDataFiles.readRegIdList}
     */
    private Dataset loadRegID() {
        return bookDataFiles.readRegIdList(props.getFs().getRegidList());
    }

    /**
     * Builds the book feed and writes it to the configured book feed location.
     *
     * <p>Records with an OCLC number of 0 are added to the feed as they are. All other
     * records are exploded by book link and passed through the valid-book-data extraction
     * together with {@code validRegId}. Both parts are unioned by column name and mapped
     * to strings by the export mapper.
     *
     * @param fullBookData the unexploded book data
     * @param validRegId   the RegIDs that met the success threshold
     */
    private void writeBookFeed(
            final Dataset fullBookData,
            final Dataset validRegId
    ) {
        // oclcNum == 0: bypasses link explosion and the valid-book-data extraction.
        final Dataset wiseData = fullBookData.where(col(BookData.Fields.oclcNum).equalTo(0))
                .as(Encoders.bean(BookData.class));

        // oclcNum != 0: exploded by book link before validation.
        final Dataset nonWiseData = exportUtils.explodeBookLinks(
                        fullBookData.where(col(BookData.Fields.oclcNum).notEqual(0))
                                .as(Encoders.bean(BookData.class)))
                .cache();

        final Dataset outputJson = validBookData.getValidBookData(nonWiseData, validRegId)
                .unionByName(wiseData)
                .mapPartitions(exportBookMapper, Encoders.STRING());

        bookFeedWriter.write(outputJson, props.getFs().getGoogleBookFeed());
    }

    /**
     * Writes the institution statistics to the configured institution stats location.
     *
     * @param stats the institution statistics to write
     */
    private void writeInstitutionStats(final Dataset stats) {
        // Combine to single file and sort - defines order in stats email
        final Dataset output = stats.repartition(1)
                .sortWithinPartitions(InstitutionStats.Fields.symbol, InstitutionStats.Fields.regId);
        jsonFileWriter.write(output, props.getFs().getStatsInstitution());
    }

    /**
     * Writes the summary statistics to the configured summary stats location.
     *
     * @param stats the summary statistics to write
     */
    private void writeSummaryStats(final Dataset stats) {
        // Expecting only a single item in dataset
        jsonFileWriter.write(stats, props.getFs().getStatsSummary());
    }

}