Apache Spark

OakVar is natively integrated with Apache Spark, allowing annotation on the big genome project scale.

Let's see how it is done.

First, create a Spark session.

import pyspark
from pyspark.sql import SparkSession

conf = pyspark.SparkConf().setAppName("ov")
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

Then, let's make a test set of variants as a Spark DataFrame.

data = [
        {"chrom": "chr7", "pos":140734758, "ref_base": "T", "alt_base": "C"},
        {"chrom": "chr7", "pos":140734780, "ref_base": "-", "alt_base": "G"},
        {"chrom": "chr7", "pos":140736487, "ref_base": "GTGCGA", "alt_base": "-"},
        {"chrom": "chr7", "pos":140736487, "ref_base": "GTGCGAT", "alt_base": "-"},
        {"chrom": "chr7", "pos":140742186, "ref_base": "-", "alt_base": "T"},
        {"chrom": "chr7", "pos":140753351, "ref_base": "A", "alt_base": "G"},
        {"chrom": "chr7", "pos":140800417, "ref_base": "CT", "alt_base": "-"},
        {"chrom": "chr7", "pos":140800417, "ref_base": "CTG", "alt_base": "-"},
        {"chrom": "chr7", "pos":140807936, "ref_base": "A", "alt_base": "-"},
        {"chrom": "chr7", "pos":140924703, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr7", "pos":148847298, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr7", "pos":27199497, "ref_base": "C", "alt_base": "G"},
        {"chrom": "chr7", "pos":2958506, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr7", "pos":50319062, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr7", "pos":55019278, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr7", "pos":55019338, "ref_base": "G", "alt_base": "A"},
        {"chrom": "chr7", "pos":55181319, "ref_base": "-", "alt_base": "GGGTTG"},
        {"chrom": "chr8", "pos":127738263, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr8", "pos":43018497, "ref_base": "A", "alt_base": "G"},
        {"chrom": "chr9", "pos":107489172, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr9", "pos":130714320, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr9", "pos":132928872, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr9", "pos":136545786, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr9", "pos":21968622, "ref_base": "C", "alt_base": "-"},
        {"chrom": "chr9", "pos":21974827, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr9", "pos":37034031, "ref_base": "A", "alt_base": "T"},
        {"chrom": "chr9", "pos":5021988, "ref_base": "A", "alt_base": "T"},
]
for uid in range(len(data)):
    data[uid]["uid"] = uid
df = spark.createDataFrame(data)
print("Original DataFrame")
df.show()

Output:

Original DataFrame
+--------+-----+---------+--------+---+
|alt_base|chrom|      pos|ref_base|uid|
+--------+-----+---------+--------+---+
|       C| chr7|140734758|       T|  0|
|       G| chr7|140734780|       -|  1|
|       -| chr7|140736487|  GTGCGA|  2|
|       -| chr7|140736487| GTGCGAT|  3|
|       T| chr7|140742186|       -|  4|
|       G| chr7|140753351|       A|  5|
|       -| chr7|140800417|      CT|  6|
|       -| chr7|140800417|     CTG|  7|
|       -| chr7|140807936|       A|  8|
|       T| chr7|140924703|       A|  9|
|       T| chr7|148847298|       A| 10|
|       G| chr7| 27199497|       C| 11|
|       T| chr7|  2958506|       A| 12|
|       T| chr7| 50319062|       A| 13|
|       T| chr7| 55019278|       A| 14|
|       A| chr7| 55019338|       G| 15|
|  GGGTTG| chr7| 55181319|       -| 16|
|       T| chr8|127738263|       A| 17|
|       G| chr8| 43018497|       A| 18|
|       T| chr9|107489172|       A| 19|
+--------+-----+---------+--------+---+
only showing top 20 rows

Then, we create a Spark Resilient Distributed Dataset (RDD).

rdd = sc.parallelize(data, 4)

Then, let's define a custom function which will be run in each worker node and for a partition of the RDD.

import oakvar as ov

def get_ov_annotation(iterator):
    mapper = ov.get_mapper("gencode")
    clinvar = ov.get_annotator("clinvar")
    for row in iterator:
        ret = mapper.map(row)
        ret = clinvar.append_annotation(ret)
        yield ret

This function will be run as a standalone function in worker nodes, so the worker nodes should already have OakVar installed and their Python should have access to the installed OakVar package.

This function loads a gencode mapper and a clinvar annotation module, and for variant, runs the mapper and the annotator.

Let's apply this custom function to the variant RDD.

ret = rdd.mapPartitions(get_ov_annotation).collect()

ret will be a list of dict, each dict corresponding to one annotated variant.

Let's create a new RDD with annotated variants. We'll use GENCODE mapper (gencode) and ClinVar annotator (clinvar). They should be already installed in the worker nodes.

schema = ov.lib.util.run.get_spark_schema(["gencode", "clinvar"])
rdd = spark.createDataFrame(ret, schema)
rdd.show()

Output:

+---+-----+---------+--------+--------+----+------+------+-----------------+---+--------------------+--------------------+------+--------------------+------------+--------------------+---------------------+----------------------+--------------------+-----------+-----------------+
|uid|chrom|      pos|ref_base|alt_base|note|coding|  hugo|       transcript| so|             cchange|             achange|exonno|        all_mappings|clinvar__uid|        clinvar__sig|clinvar__disease_refs|clinvar__disease_names|   clinvar__rev_stat|clinvar__id|clinvar__sig_conf|
+---+-----+---------+--------+--------+----+------+------+-----------------+---+--------------------+--------------------+------+--------------------+------------+--------------------+---------------------+----------------------+--------------------+-----------+-----------------+
|  0| chr7|140734758|       T|       C|NULL|     Y|  BRAF|ENST00000646891.2|MIS|           c.2140A>G|         p.Ile714Val|    18|{"BRAF": [["A0A2U...|        NULL|Uncertain signifi...| MONDO:MONDO:00055...|  Colorectal cancer...|criteria provided...|    1410272|             NULL|
|  1| chr7|140734780|       -|       G|NULL|      |  BRAF|ENST00000646891.2|INT|c.2128-10_2128-9insC|
|  2| chr7|140736487|  GTGCGA|       -|NULL|      |  BRAF|ENST00000646891.2|INT|c.2128-1722_2128-...|
|  3| chr7|140736487| GTGCGAT|       -|NULL|      |  BRAF|ENST00000646891.2|INT|c.2128-1723_2128-...|
|  4| chr7|140742186|       -|       T|NULL|      |  BRAF|ENST00000646891.2|INT|      c.1993-2240dup|
|  5| chr7|140753351|       A|       G|NULL|     Y|  BRAF|ENST00000646891.2|MIS|           c.1784T>C|         p.Phe595Se
|  6| chr7|140800417|      CT|       -|NULL|     Y|  BRAF|ENST00000646891.2|FSD|        c.927_928del|  p.Glu309AspfsTer4
|  7| chr7|140800417|     CTG|       -|NULL|     Y|  BRAF|ENST00000646891.2|IND|        c.923_925del|         p.Ala308de
|  8| chr7|140807936|       A|       -|NULL|      |  BRAF|ENST00000646891.2|INT|         c.711+24del|
|  9| chr7|140924703|       A|       T|NULL|     Y|  BRAF|ENST00000646891.2|SYN|              c.1T>A|             p.Met1
| 10| chr7|148847298|       A|       T|NULL|     Y|  EZH2|ENST00000320356.7|SYN|              c.1T>A|             p.Met1
| 11| chr7| 27199497|       C|       G|NULL|     Y|HOXA13|ENST00000649031.1|MIS|            c.581G>C|         p.Cys194Se
| 12| chr7|  2958506|       A|       T|NULL|     Y|CARD11|ENST00000396946.9|SYN|              c.1T>A|             p.Met1
| 13| chr7| 50319062|       A|       T|NULL|     Y| IKZF1|ENST00000331340.8|MLO|              c.1A>T|             p.Met1
| 14| chr7| 55019278|       A|       T|NULL|     Y|  EGFR|ENST00000275493.7|MLO|              c.1A>T|             p.Met1?|     1|{"EGFR": [["P0053...|        NULL|                NULL|                 NULL|                  NULL|                NULL|       NULL|             NULL|
| 15| chr7| 55019338|       G|       A|NULL|     Y|  EGFR|ENST00000275493.7|MIS|             c.61G>A|          p.Ala21Thr|     1|{"EGFR": [["P0053...|        NULL|Uncertain signifi...|      MedGen:CN130014|  EGFR-related lung...|criteria provided...|     848579|             NULL|
| 16| chr7| 55181319|       -|  GGGTTG|NULL|     Y|  EGFR|ENST00000275493.7|INI|c.2309_2310insGGGTTG|p.Asp770delinsGlu...|    20|{"EGFR": [["P0053...|        NULL|                NULL|                 NULL|                  NULL|                NULL|       NULL|             NULL|
| 17| chr8|127738263|       A|       T|NULL|     Y|   MYC|ENST00000377970.6|MLO|              c.1A>T|             p.Met1?|     2|{"CASC11": [["", ...|        NULL|                NULL|                 NULL|                  NULL|                NULL|       NULL|             NULL|
| 18| chr8| 43018497|       A|       G|NULL|     Y| HOOK3|ENST00000307602.9|STL|           c.2156A>G| p.Ter719TrpextTer84|    22|{"HOOK3": [["Q86V...|        NULL|                NULL|                 NULL|                  NULL|                NULL|       NULL|             NULL|
| 19| chr9|107489172|       A|       T|NULL|     Y|  KLF4|ENST00000374672.5|SYN|              c.1T>A|             p.Met1=|     1|{"ENSG00000289987...|        NULL|                NULL|                 NULL|                  NULL|                NULL|       NULL|             NULL|
+---+-----+---------+--------+--------+----+------+------+-----------------+---+--------------------+--------------------+------+--------------------+------------+--------------------+---------------------+----------------------+--------------------+-----------+-----------------+
only showing top 20 rows

The annotated variants can be saved as Parquet files.

df.write.parquet("annotated_variants.parquet")

We are considering adding more helper methods to OakVar to make this process more ergonomic. Please let us know what you think at our Discord server at https://discord.gg/wZfkTMKTjG.