
Document Classification using Apache Spark in Scala

Email spam identification, news category classification, and the organization of web pages by search engines are modern examples of document classification. It is a technique for systematically classifying a text document into one of a fixed set of categories; in other words, document classification can be described as tagging a text document. The technique is particularly helpful when the amount of data is very large, especially for organization, information filtering, and storage purposes.

In this article, we will discuss an approach to implementing an end-to-end document classification pipeline using Apache Spark, with Scala as the core programming language. Apache Spark is an ideal choice when dealing with a large volume and variety of data. Apache Spark’s machine learning library, MLlib, is scalable, easy to deploy, and can be up to a hundred times faster than MapReduce operations.

Table of Contents

1. Document Pre-Processing
2. Initializing Apache Spark
3. Dataset Preparation
4. Creating the MLlib pipeline
5. Training the model
6. Prediction on Test Data
7. Conclusion and Next Steps

1. Document Pre-Processing

The first component of the pipeline is a pre-processing block that removes noisy content from each document. This includes cleaning URLs, punctuation, digits, short words, extra whitespace, English stopwords, and so on. Below are the Scala utility functions used to remove the various regex patterns and custom word lists.

// Utility function to remove a particular regex pattern from text

  def removeRegex(txt: String, flag: String): String = {
    val regex = RegexList.get(flag)
    var cleaned = txt
    regex match {
      // Replace every match of the pattern (including runs of whitespace)
      // with a single space; cleanDocument trims the final result
      case Some(value) => cleaned = txt.replaceAll(value, " ")
      case None => println("No regex flag matched")
    }
    cleaned
  }

// Utility function to remove a custom word list (e.g. stopwords) from text

  def removeCustomWords(txt: String, flag: String): String = {
    var words = txt.split(" ")
    val stopwords = Stopwords.get(flag)
    stopwords match {
      // Keep only the words that are not in the custom word list
      case Some(value) => words = words.filter(x => !value.contains(x))
      case None => println("No stopword flag matched")
    }
    words.mkString(" ")
  }

 

To use these functions, the next step is to define the regular expressions, load the stopword list, and then clean each document step by step.

 

// Building a map of regex patterns used for pre-processing the text
 var RegexList = Map[String, String]()
  RegexList += ("punctuation" -> "[^a-zA-Z0-9]")
  RegexList += ("digits" -> "\\b\\d+\\b")
  RegexList += ("white_space" -> "\\s+")
  RegexList += ("small_words" -> "\\b[a-zA-Z0-9]{1,2}\\b")
  RegexList += ("urls" -> "(https?\\://)\\S+")

// Loading a stopword list (one word per line)
 import scala.io.Source
 var Stopwords = Map[String, List[String]]()
 Stopwords += ("english" -> Source.fromFile("stopwords.txt").getLines().toList)

// Function to perform step-by-step text preprocessing and cleaning on a document
 def cleanDocument(document_text: String): String = {
    // Converting all words to lowercase
    var text = document_text.toLowerCase
    // Removing URLs from the document
    text = removeRegex(text, "urls")
    // Removing punctuation from the document text
    text = removeRegex(text, "punctuation")
    // Removing digits from the document text
    text = removeRegex(text, "digits")
    // Removing all words with length less than or equal to 2
    text = removeRegex(text, "small_words")
    // Collapsing extra whitespace in the text
    text = removeRegex(text, "white_space")
    // Removing English stopwords
    text = removeCustomWords(text, "english")
    // Returning the preprocessed and cleaned document text
    text.trim
  }
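As a quick sanity check, the cleaning function can be applied to a sample string. This is a hypothetical example; the exact output depends on the contents of stopwords.txt.

 // Hypothetical usage of cleanDocument (output depends on stopwords.txt)
 val sample = "Check out https://spark.apache.org! Spark 3.0 is GREAT for ML in 2020."
 println(cleanDocument(sample))
 // With a typical English stopword list this prints something like: "check spark great"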

 

2. Initializing Apache Spark

To use Spark, we need to initialize it and create the contexts that will be used for training the classifier, building the pipeline, and performing the necessary transformations. The following lines of code do exactly that.

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SQLContext

 val conf = new SparkConf().setMaster("local[*]").setAppName("DC")
 val sc = new SparkContext(conf)
 val sqlContext = new SQLContext(sc)
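SparkContext and SQLContext are the older entry points; on Spark 2.x and later the same setup is usually done through a single SparkSession. A minimal sketch, assuming Spark 2.x or newer:

 // Minimal sketch using the newer SparkSession entry point (Spark 2.x+)
 import org.apache.spark.sql.SparkSession

 val spark = SparkSession.builder()
   .master("local[*]")
   .appName("DC")
   .getOrCreate()

 val sc = spark.sparkContext       // still available where an RDD is needed
 val sqlContext = spark.sqlContext // kept for compatibility with the code below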

 

3. Dataset Preparation – Loading the Documents

Now, we need to load the documents, create a data frame using the SQL context, and split it into training and test data frames. This involves reading the text file (containing the documents), cleaning each record, converting the resulting RDD into a data frame, and finally slicing the data frame into training and test sets.

   // Loading the text file using sc.textFile and creating an RDD
   // Each input line is expected to look like: "id,document_text,category"

   val input_path = "/path/to/data.txt"
   val input_RDD = sc.textFile(input_path).map(x => {
     val row = x.split(",")
     (row(0), cleanDocument(row(1)), row(2))
   })

   // Converting the RDD to a DataFrame with named columns
   val trainingDF = sqlContext.createDataFrame(input_RDD)
                              .toDF("id", "cleaned", "category")

   // Slicing the data into a 70:30 ratio of training and test data
   val Array(trainingData, testData) = trainingDF.randomSplit(Array(0.7, 0.3))

   // Print the training data
   trainingData.show()
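Before training, it can help to confirm the schema, the class distribution, and the split sizes. A small sketch using standard DataFrame operations:

   // Optional sanity checks on the prepared data
   trainingDF.printSchema()                       // id, cleaned, category
   trainingDF.groupBy("category").count().show()  // class distribution
   println(s"training rows: ${trainingData.count()}, test rows: ${testData.count()}")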

4. Creating the MLlib Pipeline

In the next step, we will prepare the processing and classification pipeline using MLlib. This pipeline consists of:

  1. StringIndexer (to convert category names into numeric label indexes)
  2. Tokenizer (to convert the cleaned text into tokens, i.e. words)
  3. HashingTF (to build a term-frequency vector for every document using MLlib's hashing-trick implementation; the term frequencies act as the features of each document)
  4. Logistic Regression (for the classification component we use logistic regression; since this is a multi-class problem, the model is wrapped in a one-vs-rest strategy)
// Processing stages
 import org.apache.spark.ml.feature.{StringIndexer, Tokenizer, HashingTF}
 import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}

 val indexer = new StringIndexer()
   .setInputCol("category")
   .setOutputCol("label")

 val tokenizer = new Tokenizer()
   .setInputCol("cleaned")
   .setOutputCol("tokens")

 // Note: 20 hashed features is tiny and will cause many collisions; for real
 // datasets a much larger value (the default is 2^18) is usually appropriate
 val hashingTF = new HashingTF()
   .setInputCol("tokens")
   .setOutputCol("features")
   .setNumFeatures(20)

// Classification stage
 val lr = new LogisticRegression().setMaxIter(100).setRegParam(0.001)
 val ovr = new OneVsRest().setClassifier(lr)
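A common refinement, not used in this pipeline, is to follow HashingTF with an IDF stage so that very frequent but uninformative terms are down-weighted. A sketch, assuming HashingTF is changed to write to a "rawFeatures" column:

 // Optional IDF stage (sketch; not part of the pipeline above)
 import org.apache.spark.ml.feature.IDF

 val idf = new IDF()
   .setInputCol("rawFeatures")   // HashingTF would write here instead of "features"
   .setOutputCol("features")
 // The stages would then become Array(indexer, tokenizer, hashingTF, idf, ovr)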

 

5. Training the Model

We now assemble the final pipeline from all the components and fit the model to the training data.

  import org.apache.spark.ml.Pipeline

  val pipeline = new Pipeline().setStages(Array(indexer, tokenizer, hashingTF, ovr))
  val model = pipeline.fit(trainingData)
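The fitted PipelineModel can also be persisted to disk and reloaded later; a sketch with a placeholder path:

  // Save and reload the fitted pipeline (sketch; the path is a placeholder)
  import org.apache.spark.ml.PipelineModel

  model.write.overwrite().save("/path/to/dc-model")
  val reloaded = PipelineModel.load("/path/to/dc-model")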

 

6. Prediction on Test Data

Once the model is trained, it can be used to make predictions on the test data. A confusion matrix or cross-validation can then be used to measure the accuracy of the pipeline.

  // Make predictions on the test data
  val prediction = model.transform(testData)
                        .select("id", "cleaned", "category", "prediction")

  // Print the predictions
  prediction.show()
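To put a number on the accuracy mentioned above, MLlib's MulticlassClassificationEvaluator can be used; a sketch that compares the indexed label column with the prediction column:

  // Measure accuracy on the test set (sketch)
  import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

  val evaluator = new MulticlassClassificationEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("accuracy")

  val accuracy = evaluator.evaluate(model.transform(testData))
  println(s"Test set accuracy = $accuracy")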

 

7. Conclusion and Next Steps

The full code of this tutorial can be found here. This tutorial explained how to create a pipeline for document classification in Spark using Scala. This end-to-end pipeline can predict the unknown classes of new text with decent accuracy. The obvious next step is to improve each component involved in the pipeline. Refer to the official MLlib documentation and the Spark programming guide for more detail. Feel free to share your thoughts in the comments.


Shivam Bansal

Shivam Bansal is a data scientist with over 3 years of experience in the analytics industry. He works on various aspects of data science, including machine learning, natural language processing, statistics, and big data architectures.
