Data Sources

There are many locations from which you can retrieve data. In this section you will see how to read and write JSON, CSV, and parquet files using PySpark. You have already been introduced to DataFrames in some capacity. Reading and writing data can happen entirely within the context of a file, or the data can be read from a file and loaded into a DataFrame for in-memory manipulation. Here are some common JSON PySpark examples:

df = spark.read.json('path/brainwaves.json')
df = spark.read.json('path/*.json')
df = spark.read.json(['AF3/THETA.json', 'T8/ALPHA.json'])
df = spark.read.format('json').load('path/')
df = spark.read.option('multiline', 'true').json('pathToJsonFile')
df = spark.read.schema('schemaStructure').json('path/brainwaves.json')
df.write.json('path/brainwave.json')
df.write.mode('overwrite').json('path/brainwave.json')

The previous code snippet shows a variety of approaches for reading a JSON file and loading it into a DataFrame. The first example loads a single JSON file, which is identified using the path and the filename, path/brainwaves.json. The next example uses a wildcard, path/*.json, which results in all JSON files in the described location being loaded into a DataFrame. The next line illustrates that individual files, located in different directories, can be loaded into the same DataFrame. As you can see in the line after that, instead of using the json() method, you can use the format() method in combination with the load() method to load a JSON file into a DataFrame.

When you’re working with JSON files, it is common that the file is written on multiple lines, like the following code snippet. If the format of your JSON file is spread across multiple lines, you need to use the option() method and set the multiline option to true; it is false by default.

{
  "Session": {
    "Scenario": "TikTok",
    "POWReading": [{
      "ReadingDate": "2021-07-30T09:40:25.6635",
      "Counter": 0,
      "AF3": [{
        "THETA": 9.681,
        "ALPHA": 3.849,
        "BETA_L": 2.582,
        "BETA_H": 0.959,
        "GAMMA": 0.738
      }]
    }]
  }
}

The alternative to this is a single line, like the next code snippet. Single lines are a bit harder to read, which is why the previous multiline example is very common with JSON files. However, if the data producer knows that no human will ever look at these files, consider writing the JSON on a single line.

{"Session":{"Scenario":"TikTok","POWReading":[{"ReadingDate":"2021-07-30T09:40:25.6635","Counter":0,"AF3":[{"THETA":9.681,"ALPHA":3.849,"BETA_L":2.582,"BETA_H":0.959,"GAMMA":0.738}]}]}}
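
To see the multiline option in action, you can read the multiline file shown earlier. The following is a minimal sketch, assuming the hypothetical path path/session.json for that file:

# A minimal sketch of the multiline option; the path is hypothetical.
# Without multiline set to 'true', Spark expects one JSON object per line,
# and the multiline file above would load as a single _corrupt_record column.
df = spark.read.option('multiline', 'true').json('path/session.json')
df.show(truncate=False)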

You should already have an understanding of what a schema is; it is a definition of, or a container for, your data. An example schema for a JSON file might resemble the following. The arguments to the StructField() method are the column name, the data type, and the nullable option. If you do not provide a schema, the runtime will attempt to infer one itself. This is not the case with CSV, where everything is returned as a String data type unless you enable schema inference.

from pyspark.sql.types import StructType, StructField, StringType, \
     TimestampType, IntegerType, DecimalType

# TimestampType is PySpark's date-time data type
schema = StructType([
         StructField('Scenario', StringType(), True),
         StructField('ReadingDate', TimestampType(), True),
         StructField('Counter', IntegerType(), False),
         StructField('THETA', DecimalType(), True)])

Once you have defined the schema of the data within the JSON file, you can read the file into a DataFrame. Use the df.printSchema() method to see the result of your schema definition. After you have loaded JSON into a DataFrame and done some manipulation, you might want to write it back to a file. You can do so using the write capability. Notice that one of the earlier write code snippets sets mode to overwrite; this means that if a file of that name already exists, it is replaced with the one being written now. The other mode options are as follows, with a short example after the list:

  • Using append will add the new data to the end of an existing file with the same name identified in the write command.
  • The ignore option will not perform the write if a file with the same filename already exists in the given location.
  • The errorifexists option (the default) means that the file will not be written and that an error will be thrown.
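
The following sketch pulls these pieces together. It assumes the schema object from the previous snippet and hypothetical file paths:

# A short sketch of reading with a schema and of the four write modes;
# the file paths are hypothetical.
df = spark.read.schema(schema).json('path/brainwaves.json')
df.printSchema()                                 # verify the schema definition
df.write.mode('overwrite').json('path/out')      # replace any existing data
df.write.mode('append').json('path/out')         # add to existing data
df.write.mode('ignore').json('path/out')         # skip the write if data exists
df.write.mode('errorifexists').json('path/out')  # throw an error if data exists
# Running all four in sequence is only for illustration; the last line
# would raise an error because the path already exists at that point.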

Take a look at the following code snippets related to CSV files. They are very similar, but there are a few syntactical differences.

df = spark.read.csv('path/brainwaves.csv')
df = spark.read.csv('path/')
df = spark.read.csv(['AF3/THETA.csv', 'T8/ALPHA.csv'])
df = spark.read.format('csv').load('path/')
df = spark.read.option('header', True).csv('pathToCsvFile')
df = spark.read.format('csv') \
          .option('header', True) \
          .schema('schemaStructure').load('path/brainwaves.csv')
df.write.option('header', True).csv('path/brainwave.csv')
df.write.format('csv').mode('error').save('path/brainwave.csv')

The first three lines of code illustrate how to read from a CSV file: reading a specific file, reading all the files in a directory, and reading one or more specific files. There are a few more options when working with CSV files than with JSON, for example, delimiter, inferSchema, header, quote, nullValue, and dateFormat. Notice in the following two lines of code the different approaches for adding an option or options:

df = spark.read.options(inferSchema=True, delimiter=',').csv('path/wave.csv')
df = spark.read.option('inferSchema', True).option('delimiter', ',').csv('T7.csv')

Since CSV stands for comma-separated values, the default delimiter is a comma. However, a pipe (|), a tab (\t), or a space can also be used as a delimiter. If your CSV file does not use a comma, then you need to set the delimiter in the option. The inferSchema option, which is False by default, notifies the runtime that it should attempt to identify the data type of each column itself; when the option is False, all columns are typed as strings. The top or first line of a CSV file commonly contains the header names of the data included per column. The header option instructs the runtime to use that first row as column names. The header option defaults to False; when it is not set to True, the first row is treated as data and the columns receive generated names such as _c0 and _c1. The syntax for building a schema with a CSV file is shown here:

schema = StructType().add('Scenario', StringType(), True) \
                     .add('ReadingDate', TimestampType(), True) \
                     .add('Counter', IntegerType(), False) \
                     .add('THETA', DecimalType(), True)
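
Once the schema is built, you can pass the StructType object directly to the schema() method rather than the placeholder string used in the earlier snippet. A minimal sketch, assuming the hypothetical file path:

# A minimal sketch using the schema defined above; the path is hypothetical.
df = spark.read.format('csv') \
          .option('header', True) \
          .schema(schema) \
          .load('path/brainwaves.csv')
df.printSchema()   # confirms the column names and data types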

If the data within the CSV file can contain quotes, use the quote option to notify the runtime; the same goes for nullValue. If columns can contain nulls, you can instruct the runtime to write a specific value in place of a null; for example, you can use 1900-01-01 when a date column is null in the CSV file. Dates can come in many formats; if you need to specify how a date should be interpreted, use the dateFormat option. The available values for mode are the same for CSV: overwrite, append, ignore, and error have the same meaning and use case as with JSON.
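
Here is a hedged sketch of those options; the option values and file paths are illustrative assumptions, not requirements:

# quote sets the character that wraps quoted values; dateFormat sets the
# pattern used to parse date columns when the schema declares a DateType.
df = spark.read.option('header', True) \
          .option('quote', '"') \
          .option('dateFormat', 'yyyy-MM-dd') \
          .csv('path/brainwaves.csv')
# On write, nullValue supplies the string stored in place of a null.
df.write.option('header', True) \
        .option('nullValue', '1900-01-01') \
        .mode('overwrite') \
        .csv('path/brainwave_out.csv')

Finally, review the following snippets, which work with parquet files: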

df = spark.read.parquet('path/brainwaves.parquet')
df = spark.read.format('parquet').load('path/')
df.write.parquet('path/brainwave.parquet')
df.write.mode('append').parquet('path/brainwave.parquet')

Parquet, CSV, and JSON files have very similar capabilities, so there is not much more to discuss about them here. Parquet files store their schema with the data, and that schema is used when the file is processed; therefore, supplying a schema when reading parquet is not as important as it is for CSV or JSON. The options for mode are the same for all three file formats discussed here.
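
You can see the schema preservation for yourself by writing a DataFrame to parquet and reading it back. A minimal sketch, assuming the df and the hypothetical path from the previous snippets:

# Parquet stores the schema with the data, so no schema() call is needed
# on the read; the path continues the previous hypothetical examples.
df.write.mode('overwrite').parquet('path/brainwave.parquet')
df2 = spark.read.parquet('path/brainwave.parquet')
df2.printSchema()   # column names and data types survive the round trip

It is also possible to manage ORC files in the same way: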

df = spark.read.orc('path/brain*.orc')
df.write.format('orc').mode('overwrite').save('path/brainwave.orc')

However, there will not be many examples using this file format in this book. ORC has some great use cases and can add great value, but further detail in that area is outside the scope of the DP-203 exam.
