Comet, an online experiment-tracking platform, integrates with SparkNLP, letting NLP engineers log a single model or a complete pipeline to the Comet server. The integration supports continuous monitoring and evaluation of the parameters and metrics of the logged model. Because Comet is platform-agnostic, the model can be accessed regardless of where it was developed, and either the full pipeline or the model alone can be used to obtain predictions. This article gives a brief overview of how to integrate a SparkNLP pipeline with Comet and obtain predictions from it.
Table of Contents
- Introduction to SparkNLP
- Setting up Pyspark and SparkNLP
- Creating a Comet experiment in the server
- Extracting data from AWS server
- Defining a SparkNLP pipeline
- Logging various metrics in the Comet server
- Using the pipeline to obtain predictions
- Summary
Introduction to SparkNLP
SparkNLP is a state-of-the-art open-source NLP library from John Snow Labs that many organizations use to speed up their NLP work, as it offers easy model development, linear scalability, and the ability to build complete NLP pipelines. SparkNLP is built on top of Apache Spark and can be used from Python, Java, or Scala. Thanks to this flexibility and its support for transformer-based models such as BERT as well as named entity recognition (NER) models, the library is widely adopted in industry.

Setting up Pyspark and SparkNLP
First, PySpark and SparkNLP have to be set up in the working environment using standard pip commands. The implementation here was carried out in Google Colab, with SparkNLP installed through the official John Snow Labs setup script. The spark-nlp-display package was also installed for visualization, along with comet_ml, as shown below.
!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash

!pip install --ignore-installed spark-nlp-display
!pip install comet_ml --quiet
Creating a Comet experiment in the server
First, the required SparkNLP, PySpark, and Comet modules were imported and the Spark session was started in the working environment as shown below.
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline

## Acquiring required pyspark modules
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

## Starting the spark session in the working environment
spark = sparknlp.start()

# Importing required Comet modules
import comet_ml
from sparknlp.logging.comet import CometLogger
Next, Comet is initialized, which prompts the user to log in to their Comet account with a unique API key and associates subsequent experiments with the given project. A local directory is also defined so that all training log files are written there and can be monitored by the server.
comet_ml.init(project_name='comet-with-sparknlp')
OUTPUT_LOG_PATH = './run'
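Depending on the environment, the ./run directory may not exist yet when training starts; as a small optional safeguard (not part of the original snippet), it can be created up front:
import os

## Make sure the directory for the training logs exists before SparkNLP writes to it
os.makedirs(OUTPUT_LOG_PATH, exist_ok=True)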

Now a CometLogger is created, which starts an experiment on the Comet server; the experiment's status on the server is updated to live, as shown below.
logger = CometLogger()
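To confirm from the notebook that the experiment is live, one simple check (assuming the default CometLogger setup above, which wraps a comet_ml Experiment) is to print the experiment's URL:
## The URL points to the live run page on the Comet server
print(logger.experiment.url)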

Extracting data from AWS server
The curl shell command is used to download the training data from an AWS S3 bucket, the Spark session's read function is used to load the Parquet file into a Spark dataframe, and show() is used to display its top entries.
!curl -O 'https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/classifier-dl/toxic_comments/toxic_train.snappy.parquet'

train_df = spark.read.parquet("toxic_train.snappy.parquet").repartition(120)
train_df.show(5)

Defining a SparkNLP pipeline
Before building the pipeline, the data has to be prepared for it. A DocumentAssembler handles the first preprocessing step, converting the raw text column into documents with the cleanup mode set to "shrink" (which trims extra whitespace), and a Tokenizer then splits the documents into tokens, with its output column set accordingly. The steps are given below.
doc = (DocumentAssembler()
       .setInputCol("text")
       .setOutputCol("document")
       .setCleanupMode("shrink"))

## Creating a tokenizer instance
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")
Integrating SparkNLP with Tensorflow-Hub
Since we are working with textual data, SparkNLP's wrapper around the TensorFlow Hub Universal Sentence Encoder is used to produce sentence embeddings, which lets us reuse a pretrained TensorFlow Hub model seamlessly inside the SparkNLP pipeline. The pretrained encoder is loaded and configured for our data as shown below.
univ_sent_enc = (UniversalSentenceEncoder.pretrained()
                 .setInputCols(["document"])
                 .setOutputCol("sentence_embeddings"))

Now let us use SparkNLP's MultiClassifierDLApproach (a TensorFlow-based multi-label classifier), since each comment in the toxic-comments data downloaded above can belong to several categories, and configure it as shown below.
multiClassifier = (MultiClassifierDLApproach()
                   .setInputCols("sentence_embeddings")
                   .setOutputCol("category")
                   .setLabelColumn("labels")
                   .setBatchSize(256)
                   .setMaxEpochs(15)
                   .setLr(1e-2)
                   .setThreshold(0.5)
                   .setShufflePerEpoch(False)
                   .setEnableOutputLogs(True)
                   .setOutputLogsPath(OUTPUT_LOG_PATH)
                   .setValidationSplit(0.2))
Now let us connect the multiClassifier instance to the Comet experiment so that the logger monitors the training logs written to the output directory, as shown below.
logger.monitor(OUTPUT_LOG_PATH, multiClassifier)
Now let us build the pipeline with the multiClassifier as its final stage and fit it on the training data.
Fitting the data in the SparkNLP pipeline
Now let us fit the pipeline on the training data. The stages of the pipeline are the document assembler instance, the sentence encoder instance, and the multiClassifier instance; the Pipeline object is then fitted on the training dataframe.
spark_nlp_pipe = Pipeline(stages=[doc, univ_sent_enc, multiClassifier])
model = spark_nlp_pipe.fit(train_df)

## Let's first look at the run directory that holds the training logs
!ls ./run
Now let us log the completed training run to the server using the logger's log_completed_run() method, pointing it to the training log file produced above, as shown below.
logger = CometLogger()
logger.log_completed_run('./run/MultiClassifierDLApproach_1d7c43772adc.log')

Now the run is logged to the server, and the corresponding experiment, including charts of the logged training parameters, can be displayed right inside the working environment.
logger.experiment.display(tab='charts')
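In addition to the charts, the pipeline's own hyperparameters can be attached to the experiment. The SparkNLP Comet integration provides a log_pipeline_parameters() helper for this; a minimal sketch, assuming that helper is available in your SparkNLP version, would be:
## Log the parameters of each stage of the fitted pipeline to the Comet experiment
logger.log_pipeline_parameters(model)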

Now that the pipeline is fitted on the training data, let us see how to obtain the testing data from AWS and get predictions from the model tracked on the Comet server.
Obtaining the testing data from the AWS server
Similar to the training data, the testing data was downloaded from AWS using curl, and the Spark session was used to read the dataset as shown below.
!curl -O 'https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/classifier-dl/toxic_comments/toxic_test.snappy.parquet'

test_df = spark.read.parquet("/content/toxic_test.snappy.parquet").repartition(10)
This testing dataframe contains data the model has not seen, and it will be used to obtain predictions from the fitted pipeline. Before doing that, let us see how to log various evaluation metrics to the server to assess model performance.
Logging various metrics in the Comet server
To evaluate the model and log metrics to the server, the testing data is first passed through the fitted pipeline to obtain predictions.
pred=model.transform(test_df)
Since the result is a Spark dataframe, the predictions were converted to a pandas dataframe to make it easier to inspect the multiple labels assigned to each instance.
pred_df = pred.select('labels', 'category.result').toPandas()
pred_df.head()

Since each comment can carry multiple labels, let's binarize the actual and predicted label columns using the MultiLabelBinarizer class from scikit-learn.
from sklearn.preprocessing import MultiLabelBinarizer

mlb = MultiLabelBinarizer()  ## Creating an instance
## Fit the binarizer once on the ground-truth labels and reuse it for the predictions,
## so both arrays share the same class ordering
y_actual = mlb.fit_transform(pred_df['labels'])
y_predicted = mlb.transform(pred_df['result'])
Now the actual and predicted values for each of the class labels will be evaluated using the classification report.
from sklearn.metrics import classification_report

cr = classification_report(y_actual, y_predicted, output_dict=True)
for k, v in cr.items():
    print('{} {}'.format(k, v))

Now that we have the classification report, let's log these metrics to the Comet server so they are attached to the experiment for the model in the pipeline. The logging is done with log_metrics(), which sends each per-class dictionary of scores to the Comet logger. The logged metrics can then be visualized with the experiment's display function by setting the tab to be displayed to 'metrics'.
for k, v in cr.items():
    logger.log_metrics(v, prefix=k)

logger.experiment.display(tab='metrics')
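Once all metrics are logged, it is good practice, especially in a notebook, to close the run so that any buffered data is flushed to Comet; with the logger used above this can be done as follows:
## Finish the underlying comet_ml experiment and upload any remaining data
logger.experiment.end()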

Now that we know how to log and visualize the model's metrics, let's see how to obtain predictions from a pipeline tracked on the Comet server.
Using the pipeline to obtain predictions
SparkNLP ships with various pretrained models, such as named entity recognition (NER) models and BERT models. Here we use the pretrained NER model ner_dl, which is trained to detect entities such as names of people, places, and organizations, so it can be applied to any text corpus containing such entities.
First, the word embeddings that ner_dl expects are loaded with WordEmbeddingsModel, setting the appropriate input and output columns as shown below.
embeddings = (WordEmbeddingsModel.pretrained('glove_100d')
              .setInputCols(["document", "token"])
              .setOutputCol("embeddings"))

Next, the pretrained NER model itself is loaded, and a NerConverter is added to group the raw NER tags into entity chunks, with the appropriate input and output columns set for each, as shown below.
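For reference, here is a minimal sketch of loading the pretrained ner_dl model (the ner_model used in the pipeline below), assuming the standard NerDLModel API and the column names defined earlier:
## Load the pretrained NER model; it consumes documents, tokens and GloVe embeddings
ner_model = (NerDLModel.pretrained('ner_dl', 'en')
             .setInputCols(["document", "token", "embeddings"])
             .setOutputCol("ner"))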
ner_converter = (NerConverter()
                 .setInputCols(['document', 'token', 'ner'])
                 .setOutputCol('ner_chunk'))
Now let us add all the stages to a pipeline so that it can be used to obtain predictions.
nlp_pipeline = Pipeline(stages=[doc, tokenizer, embeddings, ner_model, ner_converter])
Once these stages are declared, an empty Spark dataframe is created and used to fit the pipeline (all stages are pretrained, so no real training data is needed here).
empty_df = spark.createDataFrame([['']]).toDF('text')
pipeline_model = nlp_pipeline.fit(empty_df)
Now any text can be placed in a dataframe and passed through the fitted pipeline to obtain predictions from the pretrained NER model.
import pandas as pd

text_list = ['Hi hello Good Morning!!. It is a beautiful day with a pleasant weather']
df = spark.createDataFrame(pd.DataFrame({'text': text_list}))
results = pipeline_model.transform(df)
results.show()

Because the output is a Spark dataframe, it is not very readable as printed; for easier interpretation we can convert it to a pandas dataframe as shown below and inspect the output of each stage of the pipeline, including the NER predictions.
res_df = results.toPandas()
res_df.head()
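To pull out just the detected entities and their labels from the pandas dataframe, a small sketch like the one below can be used; it assumes the ner_chunk column produced by the pipeline above (the sample sentence contains no named entities, so for it the loop prints nothing):
## Each 'ner_chunk' cell holds a list of annotation rows; print the chunk text
## and the entity type stored in its metadata
for chunks in res_df['ner_chunk']:
    for chunk in chunks:
        print(chunk['result'], '->', chunk['metadata']['entity'])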
Summary
This is how a complete SparkNLP pipeline is created and logged to the Comet server. The main advantage of Comet is continuous monitoring: issues with any stage of the pipeline, or with the model itself, can be spotted from the server. All parameters and the entire pipeline remain accessible on the server regardless of the platform used to access them, along with interpretable reports of the various runs and experiments and charts of the parameters and metrics logged to Comet.