Processing big datasets requires special tools, and there are sometimes gaps between the transformations you can apply sequentially in a single Python process and those you can apply at scale within a distributed framework. For example, several Natural Language Processing (NLP) libraries, such as Stanford Core NLP and Language Tool, must connect to a server in order to fetch their results. Applying this sort of transformation in Apache Spark, a powerful framework for distributed computation that generalizes MapReduce to more arbitrary transformations of data, requires a great deal of engineering effort.
The Challenge
PySpark is the Python interface for Apache Spark. The standard way to represent data is with the pyspark.sql.DataFrame class. DataFrames are built for resilient, distributed execution and come with an extensive library of standard tabular operations such as grouping, aggregation, mapping, and reducing, as well as support for custom functions called user-defined functions (UDFs). The Apache Spark documentation is a great resource for further information.
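As a point of reference, a self-contained UDF that needs no external connection is straightforward to define and apply. The word-count function and column names below are only an illustration, not part of the pipeline built in this post:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([['The ball is red.']], ['sentence'])

# A plain UDF: no external client is captured, so Spark can serialize it freely.
word_count = udf(lambda s: len(s.split()), IntegerType())
df.withColumn('n_words', word_count(df['sentence'])).show()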
Sometimes, there are limitations in such frameworks that warrant innovation on the part of the developer. One specific case is applying an operation that queries a Java HTTP server and returns a result. The naive approach is to create a UDF that instantiates a connection. This won’t work, because the client cannot be serialized by the PySpark driver and sent to the workers, resulting in an error such as the following:
TypeError: cannot pickle '_thread.lock' object
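A minimal way to reproduce the failure, without any real NLP library, is to capture an object holding a threading.Lock (exactly the kind of state such HTTP clients carry) inside a UDF. FakeClient below is a stand-in for illustration, not one of the libraries mentioned above:

import threading
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([['The ball is red.']], ['sentence'])

# Stand-in for an HTTP client: like many real clients, it holds a lock internally.
class FakeClient:
    def __init__(self):
        self._lock = threading.Lock()

    def annotate(self, text):
        return text.upper()

client = FakeClient()
shout = udf(lambda s: client.annotate(s), StringType())

# Raises TypeError: cannot pickle '_thread.lock' object when Spark serializes the UDF's closure.
df.withColumn('shouted', shout(df['sentence'])).collect()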
At this point, our first major design decision is in order. Practically speaking, you can either migrate your work to Java, which natively supports both Spark and the HTTP server, or use the pyspark.sql.DataFrame.foreachPartition method. If you’re a Java whiz and don’t mind translating your code, then the former might be the right solution for you. Otherwise, let us walk through the steps of building a tool that extends PySpark operations to support querying the HTTP server and returning a result. The seemingly trivial step of “returning a result” bears emphasis because foreachPartition in fact always returns None, which makes things not so trivial after all.
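To make that concrete, here is a minimal sketch of the call. The partition handler and its print statement are throwaway, there only to show the shape of the API:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([['The ball is red.'], ['I went to the store.']], ['sentence'])

def handle_partition(rows):
    # rows is an iterator of pyspark.sql.Row objects for a single partition.
    for row in rows:
        print(row['sentence'])  # side effect only; anything returned here is discarded

# foreachPartition is executed for its side effects and always returns None.
assert df.foreachPartition(handle_partition) is None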
The Solution
We need to produce an output DataFrame that consists of the querying function applied to each row of the input DataFrame. The solution is to wrap our querying function such that the result is written to a database. We can then load the resulting output table into PySpark as a DataFrame. This leads to the next fork in the road, which is to pick a database. The primary categorization of databases is relational versus non-relational, or SQL versus NoSQL. There is plenty of info on the web about the trade-offs between the two. I settled on PostgreSQL for several reasons:
- SQL is fast.
- PostgreSQL supports JSON objects and dynamically-sized fields.
- PostgreSQL can be set up locally to avoid network latency and costs.
Setting Up PostgreSQL
The greatest challenge of using PostgreSQL is figuring out how to submit SQL statements non-interactively from Python, i.e., without logging into the psql shell. There is no Python package that simply ships a prebuilt PostgreSQL server, so the database itself must be installed separately. There is, however, a handy package called psycopg2 that provides an ergonomic PostgreSQL client. Run the commands below to install all dependencies on Ubuntu (excluding PySpark, which is downloaded as a tarball from the official site):
sudo python3 -m pip install psycopg2-binary
sudo apt install postgresql
PostgreSQL has the user postgres, the database postgres, and the cluster main as defaults. For our purposes, these will suffice, but it is possible to add more. The command:
sudo pg_ctlcluster 13 main start|stop
can be used to either start or stop the main database server (13 is the installed PostgreSQL major version; substitute yours). If the server has not been started, it is not possible to query or modify the main cluster. Once it is running, switch to the postgres user via:
sudo -i -u postgres
and then open the psql interactive shell simply with the command psql. From this shell, we can submit SQL statements and inspect the database.
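For example, \l lists the databases in the cluster, \dt lists the tables in the current database, and ordinary SQL works as you would expect:

postgres=# \l
postgres=# \dt
postgres=# SELECT 1 + 1 AS sanity_check;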
Python Integration with Psycopg2
As mentioned earlier, we would like to control the database non-interactively with Python. The psycopg2 package allows you to create a database connection as well as a cursor to submit SQL statements:
import psycopg2

# Connect to the default database as the default user.
connection = psycopg2.connect('dbname=postgres user=postgres')
cursor = connection.cursor()
cursor.execute('CREATE TABLE "my_table" (foo VARCHAR(128), bar INT);')
cursor.execute('''INSERT INTO "my_table" (foo, bar) VALUES ('hello world', 2718);''')
connection.commit()  # psycopg2 does not autocommit by default
Thus, our strategy is to create a psycopg2 connection within the foreachPartition call, write the output of the function that invokes the Java HTTP server into a table, export the table to CSV, and load the CSV back into PySpark.
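Below is a condensed sketch of that pipeline, not the socketmap implementation itself. It assumes the local, single-machine setup described in this post; the table name outputs, the path /tmp/outputs.csv, and the query_server placeholder are all illustrative, so swap in the real server call as needed:

import psycopg2
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
input_dataframe = spark.createDataFrame(
    [['The ball is red.'], ['I went to the store.']], ['sentence'])

def query_server(text):
    # Placeholder for the real call to the Java HTTP server (e.g. nlp.annotate).
    return text.upper()

def process_partition(rows):
    # Each worker opens its own database connection and HTTP client, so nothing
    # unpicklable ever has to travel from the driver to the workers.
    connection = psycopg2.connect('dbname=postgres user=postgres')
    cursor = connection.cursor()
    for row in rows:
        cursor.execute(
            'INSERT INTO "outputs" (sentence, result) VALUES (%s, %s);',
            (row['sentence'], query_server(row['sentence'])),
        )
    connection.commit()
    connection.close()

# 1. Create the output table.
setup = psycopg2.connect('dbname=postgres user=postgres')
cursor = setup.cursor()
cursor.execute('DROP TABLE IF EXISTS "outputs";')
cursor.execute('CREATE TABLE "outputs" (sentence TEXT, result TEXT);')
setup.commit()

# 2. Fill it from the workers as a side effect of foreachPartition.
input_dataframe.foreachPartition(process_partition)

# 3. Export the table to CSV ...
with open('/tmp/outputs.csv', 'w') as handle:
    cursor.copy_expert('COPY "outputs" TO STDOUT WITH CSV HEADER;', handle)
setup.close()

# 4. ... and read it back as the output DataFrame.
output_dataframe = spark.read.csv('/tmp/outputs.csv', header=True)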
Putting It All Together
The last caveat is that the Python process must run as the postgres user, since the local Unix-socket connection relies on peer authentication. Assuming your PySpark application is executed with spark-submit, the complete command is:
sudo runuser -l postgres -c "$SPARK_HOME/bin/spark-submit [options] script.py"
Please check out the Python package I wrote called socketmap, which implements the steps above. Here is a simple example with Stanford Core NLP, which runs as a Java HTTP server:
from pyspark.sql import SparkSession
from pycorenlp import StanfordCoreNLP
from socketmap import socketmap
def parse_sentence(sentence):
    # Each call opens its own client to the Core NLP server on port 9000,
    # so nothing has to be serialized by the driver.
    nlp = StanfordCoreNLP('http://localhost:9000')
    response = nlp.annotate(
        sentence,
        properties={'annotators': 'parse', 'outputFormat': 'json'},
    )
    return response['sentences'][0]['parse']

spark = SparkSession.builder.getOrCreate()
sentences = [
    ['The ball is red.'],
    ['I went to the store.'],
    ['There is a wisdom that is a woe.'],
]
input_dataframe = spark.createDataFrame(sentences, ['sentence'])
wrapper = lambda row: {'tree': parse_sentence(row['sentence'])}
output_dataframe = socketmap(spark, input_dataframe, wrapper)
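Assuming socketmap returns a DataFrame whose columns come from the keys of the dictionaries produced by the wrapper (a single tree column in this case), the result can be inspected like any other DataFrame:

output_dataframe.show(truncate=False)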
This approach enables you to leverage the power of distributed computing while still utilizing server-dependent libraries that require HTTP connections. The socketmap package abstracts away the complexity of managing PostgreSQL connections within Spark partitions, giving you a clean interface to work with.