
Wednesday, August 5, 2015

Merge Spark Output to a Single File

If your results fit into a single node's memory, you can use coalesce(number_of_partitions) with a value of 1 to bring all the data into one partition on one node; when you then write it out, the output will be a single file.

Look at this example:
Here I want to parse a pipe-delimited text file, filter lines based on a criterion (the third field must equal 'SD'), take the distinct values of the first field, and finally store the list of ids in one single output file.
a.filter(lambda x: len(x.split('|')) > 3) \
 .filter(lambda x: x.split('|')[2] == 'SD') \
 .map(lambda x: x.split('|')[0]) \
 .distinct() \
 .coalesce(1) \
 .saveAsTextFile('/Downloads/spark-1.4.0-bin-hadoop2.6/ciks1')
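
For context, here is a minimal self-contained sketch of the same pipeline; the input path and the way the RDD a is created are my assumptions, since the snippet above does not show them.

from pyspark import SparkContext

sc = SparkContext(appName='MergeOutputExample')

# Load the pipe-delimited input (path is hypothetical).
a = sc.textFile('/Downloads/input.txt')

ids = (a.filter(lambda x: len(x.split('|')) > 3)    # keep only well-formed lines
        .filter(lambda x: x.split('|')[2] == 'SD')  # keep lines matching the criterion
        .map(lambda x: x.split('|')[0])             # take the first field (the id)
        .distinct()                                 # drop duplicate ids
        .coalesce(1))                               # move everything into one partition

# With a single partition, saveAsTextFile writes a single part file.
ids.saveAsTextFile('/Downloads/ciks1')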

Saturday, July 25, 2015

Simple Example of Using Accumulators in Python Spark

Here is a simple example of an accumulator variable that is updated by multiple workers and yields the accumulated value at the end.


>>> acc = sc.accumulator(5)
>>> def f(x):
...     global acc
...     acc += x
>>> rdd = sc.parallelize([1,2,4,1])
>>> rdd.foreach(f)
>>> acc.value
13


A few notes:

  • Tasks running on the workers cannot read the accumulator's value
  • Tasks see the accumulator as write-only; they can only add to it (see the sketch after this list)
  • Accumulators are mostly used for counting and debugging purposes
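
As a quick illustration of the write-only behaviour, here is a small sketch (not from the original post); counting blank lines is just one example of a typical debugging counter.

>>> blank_lines = sc.accumulator(0)
>>> def check(line):
...     global blank_lines
...     if line.strip() == '':
...         blank_lines += 1      # tasks can only add to the accumulator
...
>>> sc.parallelize(['a', '', 'b', '']).foreach(check)
>>> blank_lines.value             # only the driver program can read the value
2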