Spark Solr(2) Persist Data to XML

Differences between RDD and DataFrame
RDD[Person] - Person, Person, Person
DataFrame - Person[Name, Age, Height], Person[Name, Age, Height]

An RDD[Person] is a collection of Person objects; a DataFrame is a collection of Rows with a known schema (Name, Age, Height).
Dataset vs DataFrame: df.as[ElementType] turns a DataFrame into a typed Dataset, and ds.toDF() turns a Dataset back into a DataFrame.
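
To make the round trip concrete, here is a minimal runnable Java sketch; the Person bean and the local master are my own scaffolding, only the field names and the as/toDF conversion come from the note above.

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DatasetVsDataFrameApp
{

  // minimal bean so Spark can infer a schema; field names follow the note above
  public static class Person implements Serializable
  {
    private String name;
    private Integer age;
    private Integer height;

    public Person()
    {
    }

    public Person( String name, Integer age, Integer height )
    {
      this.name = name;
      this.age = age;
      this.height = height;
    }

    public String getName() { return name; }
    public void setName( String name ) { this.name = name; }
    public Integer getAge() { return age; }
    public void setAge( Integer age ) { this.age = age; }
    public Integer getHeight() { return height; }
    public void setHeight( Integer height ) { this.height = height; }
  }

  public static void main( String[] args )
  {
    // local master only for this standalone demo
    SparkSession spark = SparkSession.builder().master( "local[*]" ).appName( "ds-vs-df" ).getOrCreate();

    // DataFrame: untyped Rows with a schema inferred from the bean
    Dataset<Row> df = spark.createDataFrame( Arrays.asList( new Person( "Carl", 30, 180 ) ), Person.class );

    // df.as[ElementType]: a typed Dataset over the same data
    Dataset<Person> ds = df.as( Encoders.bean( Person.class ) );

    // ds.toDF(): back to untyped Rows
    Dataset<Row> back = ds.toDF();
    back.show();

    spark.close();
  }

}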

I tried spark-xml first, but it did not seem to work.
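
For the record, the attempt looked roughly like the sketch below; this assumes the com.databricks:spark-xml package is on the classpath and a DataFrame like the jobDF built later in this post, and the output path is made up.

// hypothetical spark-xml write, the approach that did not work for me;
// rootTag/rowTag are spark-xml options, /tmp/jobs-xml is an arbitrary path
jobDF.write()
  .format( "com.databricks.spark.xml" )
  .option( "rootTag", "jobs" )
  .option( "rowTag", "job" )
  .save( "/tmp/jobs-xml" );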

Then I tried a plain XMLStreamWriter instead; the util class is as follows, XMLUtil.java:
package com.sillycat.sparkjava.app;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;

import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

import com.sun.xml.txw2.output.IndentingXMLStreamWriter;

public class XMLUtil
{

  public static void xmlStreamWriter( String filePath, List<Job> items )
  {

    XMLStreamWriter writer = null;
    // CREATE alone does not truncate an existing file; add TRUNCATE_EXISTING so
    // an older, longer file cannot leave stale bytes after the new document ends
    try ( OutputStream os = Files.newOutputStream( Paths.get( filePath ),
      StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING ) )
    {
      XMLOutputFactory outputFactory = XMLOutputFactory.newInstance();
      // IndentingXMLStreamWriter ships with the JAXB RI (com.sun.xml.txw2) and
      // pretty-prints the otherwise single-line StAX output
      writer = new IndentingXMLStreamWriter(
        outputFactory.createXMLStreamWriter( os, "utf-8" ) );
      writer.writeStartDocument( "utf-8", "1.0" );
      writer.writeStartElement( "jobs" );
      for ( Job item : items )
      {
        writer.writeStartElement( "job" );

        writer.writeStartElement( "id" );
        writer.writeCharacters( item.getId() );
        writer.writeEndElement();

        writer.writeStartElement( "title" );
        writer.writeCData( item.getTitle() );
        writer.writeEndElement();

        writer.writeStartElement( "price" );
        writer.writeCharacters( item.getPrice().toBigInteger().toString() );
        writer.writeEndElement();

        writer.writeEndElement();

      }
      writer.writeEndElement();
      writer.writeEndDocument();
    }
    catch ( IOException | XMLStreamException e )
    {
      e.printStackTrace();
    }
    finally
    {
      if ( writer != null )
      {
        try
        {
          writer.close();
        }
        catch ( XMLStreamException e )
        {
          e.printStackTrace();
        }
      }
    }
  }

}


Some more implementation and refactoring may be needed there, but right now my focus is on making the processing work first.
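
The Job bean itself is not shown in this post. Both XMLUtil above and Encoders.bean( Job.class ) below expect a serializable POJO with a no-arg constructor and getters/setters; a minimal sketch, where only the three field names are taken from the code and everything else is assumed:

package com.sillycat.sparkjava.app;

import java.io.Serializable;
import java.math.BigDecimal;

public class Job implements Serializable
{

  private static final long serialVersionUID = 1L;

  private String id;
  private String title;
  private BigDecimal price;

  public String getId() { return id; }
  public void setId( String id ) { this.id = id; }
  public String getTitle() { return title; }
  public void setTitle( String title ) { this.title = title; }
  public BigDecimal getPrice() { return price; }
  public void setPrice( BigDecimal price ) { this.price = price; }

}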

The whole process will be: Spark loads jobs from SolrCloud -> 1000 jobs a batch -> filter in Java -> filter in Spark SQL -> collect the results -> write to XML.

package com.sillycat.sparkjava.app;

import java.math.BigDecimal;
import java.util.List;

import org.apache.solr.common.SolrDocument;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.lucidworks.spark.rdd.SolrJavaRDD;
import com.sillycat.sparkjava.base.SparkBaseApp;

public class SeniorJavaFeedToXMLApp extends SparkBaseApp
{

  private static final long serialVersionUID = 8364133452168714109L;

  @Override
  protected String getAppName()
  {
    return "SeniorJavaFeedToXMLApp";
  }

  @Override
  public void executeTask( List<String> params )
  {
    SparkConf conf = this.getSparkConf();
    SparkContext sc = new SparkContext( conf );
    SparkSession sqlSession = new SparkSession( sc );

    String zkHost =
      "zookeeper1.us-east-1.elasticbeanstalk.com,zookeeper2.us-east-1.elasticbeanstalk.com,zookeeper3.us-east-1.elasticbeanstalk.com/solr/allJobs";
    String collection = "allJobs";
    //String solrQuery = "expired: false AND title: Java* AND source_id: 4675"; //82  18k jobs java
    String solrQuery = "expired: false AND title: Java*";
    String keyword = "Developer";

    logger.info( "Prepare the resource from " + solrQuery );
    JavaRDD<SolrDocument> rdd = this.generateRdd( sc, zkHost, collection, solrQuery );
    logger.info( "System get sources job count:" + rdd.count() );

    logger.info( "Executing the calculation based on keyword " + keyword );
    JavaRDD<SolrDocument> solrDocs = processRowFilters( rdd, keyword ); //java code filter java developer 10k
    JavaRDD<Job> jobs = solrDocs.map( new Function<SolrDocument, Job>()
    {

      private static final long serialVersionUID = -4456732708499340880L;

      @Override
      public Job call( SolrDocument solr ) throws Exception
      {
        Job job = new Job();
        job.setId( solr.getFieldValue( "id" ).toString() );
        job.setTitle( solr.getFieldValue( "title" ).toString() );
        job.setPrice( new BigDecimal( solr.getFieldValue( "cpc" ).toString() ) );
        return job;
      }

    } );

    Dataset<Row> jobDF = sqlSession.createDataFrame( jobs, Job.class );
    jobDF.createOrReplaceTempView( "job" );
    Dataset<Row> jobHighDF = sqlSession.sql( "SELECT id, title, price FROM job WHERE price > 16 " ); // price > 16 1k

    logger.info( "Find some jobs for you:" + jobHighDF.count() );
    logger.info( "Job Content is:" + jobHighDF.collectAsList().get( 0 ) );

    List<Job> jobsHigh = jobHighDF.as( Encoders.bean( Job.class ) ).collectAsList();

    logger.info( "Persist some jobs to XML:" + jobsHigh.size() );
    logger.info( "Persist some jobs to XML:" + jobsHigh.get( 0 ) );

    XMLUtil.xmlStreamWriter( "/tmp/jobs.xml", jobsHigh );

    sqlSession.close();
    sc.stop();
  }

  private JavaRDD<SolrDocument> generateRdd( SparkContext sc, String zkHost, String collection, String solrQuery )
  {
    SolrJavaRDD solrRDD = SolrJavaRDD.get( zkHost, collection, sc );
    JavaRDD<SolrDocument> resultsRDD = solrRDD.queryShards( solrQuery );
    return resultsRDD;
  }

  private JavaRDD<SolrDocument> processRowFilters( JavaRDD<SolrDocument> rows, String keyword )
  {
    JavaRDD<SolrDocument> lines = rows.filter( new Function<SolrDocument, Boolean>()
    {
      private static final long serialVersionUID = 1L;

      @Override
      public Boolean call( SolrDocument s ) throws Exception
      {
        Object titleObj = s.getFieldValue( "title" );
        if ( titleObj != null )
        {
          String title = titleObj.toString();
          if ( title.toLowerCase().contains( keyword.toLowerCase() ) )
          {
            return true;
          }
        }
        return false;
      }
    } );
    return lines;
  }

}

Exception:
The upload to HDFS looks fine:
18/01/23 23:55:11 INFO Client: Uploading resource file:/opt/spark/jars/metrics-core-3.1.2.jar -> hdfs://fr-stage-api:9000/user/ec2-user/.sparkStaging/application_1515130308141_0017/metrics-core-3.1.2.jar

but then the application fails on YARN:
java.io.FileNotFoundException: File does not exist: hdfs://fr-stage-api:9000/user/ec2-user/.sparkStaging/application_1515130308141_0017/metrics-core-3.1.2.jar

Solution:
Comment out this line in the base class; the hard-coded local master conflicts with the YARN submit, which is what makes the staged jar go missing:
conf.set( "spark.master", "local[4]" );
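
A sketch of what getSparkConf in the base class might look like after the change; only the commented-out line is from this post, the rest is assumed. The master then comes from the spark-submit command (for example --master yarn) instead of being fixed in code.

protected SparkConf getSparkConf()
{
  SparkConf conf = new SparkConf().setAppName( getAppName() );
  // conf.set( "spark.master", "local[4]" ); // hard-coding the master breaks the YARN submit
  return conf;
}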

References:
http://sillycat.iteye.com/blog/2407961
https://www.jianshu.com/p/c0181667daa0

XML
https://www.ibm.com/developerworks/library/x-tipbigdoc/index.html
https://softwarecave.org/2014/02/15/write-xml-documents-using-streaming-api-for-xml-stax/
https://www.ibm.com/developerworks/cn/xml/x-tipstx4/index.html
https://stackoverflow.com/questions/4616383/xmlstreamwriter-indentation/6723007

Submit task
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/misc/how-to-submit-spark-job-to-yarn-from-java-code.md
https://stackoverflow.com/questions/44444215/submitting-spark-application-via-yarn-client
http://massapi.com/method/org/apache/hadoop/conf/Configuration.addResource.html
https://stackoverflow.com/questions/17265002/hadoop-no-filesystem-for-scheme-file
