比较两个生产级NLP库:运行Spark-NLP和spaCy的管道
一篇详细步骤的指导,从初始化库开始,到加载数据并使用Spark-NLP和spaCy来训练一个分词器模型。

本博客系列的第一篇中我介绍了两个自然语言处理库(John Snow Labs的Apache Spark NLPExplosion AI的spaCy),并用它们训练了分词和词性标注的模型。在本篇中我将继续构建并运行一个自然语言处理(NLP)管道,来把这些训练好的模型应用于新的文本数据。

导入测试数据是一个具有挑战性的步骤,因为我的测试数据是由未格式化的、未进行句子边界界定的文本构成的,是粗糙的和异构的。我要处理一个包含.txt文件的文件夹,并且需要将结果保存为文字标签的格式以便将其与正确的答案进行比较。下面让我们来解决它:

spaCy

start = time.time()

path = “./target/testing/”

files = sorted([path + f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))])

prediction = {}

for file in files:

    fo = io.open(file, mode=’r’, encoding=’utf-8′)

    content = []

    for doc in nlp_model(re.sub(“\\s+”, ‘ ‘, fo.read())):

        content.append((doc.text, doc.tag_))

    prediction[file] = content

    fo.close()

    

print (time.time() – start)

另一种并行计算方法是使用generator和spaCy的语言管道。 像下面这样的方式也可以解决数据导入的问题。

spaCy

from spacy.language import Language

import itertools

def genf():

    path = “./target/testing/”

    files = sorted([path + f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))])

    for file in files:

        fo = io.open(file, mode=’r’, encoding=’utf-8′)

        t = re.sub(“\\s+”, ‘ ‘, fo.read())

        fo.close()

        yield (file, t)

        

gen1, gen2 = itertools.tee(genf())

files = (file for (file, text) in gen1)

texts = (text for (file, text) in gen2)

start = time.time()

prediction = {}

for file, doc in zip(files, nlp_model.pipe(texts, batch_size=10, n_threads=12)):

    content = []

    for d in doc:

        content.append((d.text, d.tag_))

    prediction[file] = content

print (time.time() – start)

Spark-NLP

var data = spark.read.textFile(“./target/testing”).as[String]

    .map(_.trim()).filter(_.nonEmpty)

    .withColumnRenamed(“value”, “text”)

    .withColumn(“filename”, regexp_replace(input_file_name(), “file://”, “”))

data = data.groupBy(“filename”).agg(concat_ws(” “, collect_list(data(“text”))).as(“text”))

    .repartition(12)

    .cache

val files = data.select(“filename”).distinct.as[String].collect

val result = model.transform(data)

val prediction = Benchmark.time(“Time to collect predictions”) {

    result

        .select(“finished_token”, “finished_pos”, “filename”).as[(Array[String], Array[String], String)]

        .map(wt => (wt._3, wt._1.zip(wt._2)))

        .collect.toMap

}

和Apache Spark一样,可以用textFile()来从文件夹读取文本文件,虽然它是逐行读取的。 我需要识别每行的文件名,从而可以再次将它们组合起来。幸运的是,input_file_name()这就是这样实现的。 在读取文件后,继续分组,并用空格连接这些行。

请注意,上面的两个代码片段都没有使用NLP库特有的代码。spaCy的代码里用的是Python的文件操作,而Spark-NLP代码则使用了Spark的原生数据加载和处理的原语。可以看到,如果正确配置了Spark的群集,则上面的Spark代码在导入10KB、10MB或10 TB的文件上的工作方式是完全相同的。此外,你对每种库的学习曲线取决于你对这个库的生态系统的熟悉程度。

测量结果

如果我们迄今为止所做的看起来很难懂,那么这一部分会非常困难懂。当我们的答案以不同方式分词时,我们如何衡量POS准确性? 我不得不在这里施展一些“魔术”,而我相信这会带来争议。没有简单的方法可以客观地计算结果,公平地比较,所以下面是我想出的方法。

spaCy

首先,我需要处理结果文件夹。该文件夹里有很多.txt文件,看起来与训练数据完全一样,有与上一步中使用的测试数据相同的文件名。

answers = {}

total_words = 0

for file in result_files:

    fo = io.open(file, mode=’r’, encoding=’utf-8′)

    file_tags = []

    for pair in re.split(“\\s+”, re.sub(“\\s+”, ‘ ‘, fo.read())):

        total_words += 1

        tag = pair.strip().split(“|”)

        file_tags.append((tag[0], tag[-1]))

    answers[file] = file_tags

    fo.close()

print(total_words)

Spark-NLP

对于Spark-NLP,我用了与解析POS注释器元组相同的函数。它属于一个名为ResourceHelper的助手对象。ResourceHelper中有许多类似的帮助功能。

var total_words = 0

val answer = files.map(_.replace(“testing”, “answer”)).map(file => {

    val content = ResourceHelper

        .parseTupleSentences(file, “TXT”, ‘|’, 500).flatMap(_.tupleWords)

        .flatMap(_.tupleWords)

    total_words += content.length

    (file, content)

}).toMap

println()

println(total_words)

下面就是“魔术”发生的地方。我有这样的答案,在spaCy和Spark-NLP中都有像(文件名,数组((word,tag))的字典。这与我们在预测步骤中使用的格式相同。

所以,我创建了一个小算法。在该算法中,我比较每一对预测和答案中的单词。 对于每对匹配的单词,我加一个匹配的标记。并对于每对匹配的POS标识,我都计一次成功。

但是,由于单词的分词方式与ANC的结果不同,因此我需要设置一个小窗口,让单词有机会与ANC中特定数量的分词相匹配,或者知道在文件的哪里继续开始搜索。

如果预测的单词是次分词的(比ANC少的分词数),那么这个单词将永远不会匹配并被忽略。例如,two-legged|JJ在ANC中是two| NN和legged| JJ,那么我将收集并拼接ANC分词直到确认它被次分词了,然后忽略它(下面代码里的construct是次分词的集合)。

然后,将索引放置在最近的匹配位置。所以如果预测的词出现较晚(由于前一个词被次分词),它最终将能在此范围内被找到,并对其进行计数。

以下是上述算法在代码中的部分:

spaCy

start = time.time()

word_matches = 0

tag_matches = 0

for file in list(prediction.keys()):

    last_match = 0

    print(“analyzing: ” + file)

    for pword, ptag in answers[file.replace(‘testing’, ‘answer’)]:

        print(“target word is: ” + pword)

        last_attempt = 0

        construct = ”

        for word, tag in prediction[file][last_match:]:

            if word.strip() == ”:

                last_match += 1

                continue

            construct += word

            print(“against: ” + word + ” or completion of construct ” + pword)

            last_attempt += 1

            if pword == word:

                print(“word found: ” + word)

                if ptag == tag:

                    print(“match found: ” + word + ” as ” + tag)

                    tag_matches += 1

                word_matches += 1

                last_match += last_attempt

                break

            elif pword == construct:

                print(pword + ” construct complete. No point in keeping the search”)

                last_match += last_attempt

                break

            elif len(pword) <= len(construct):

                print(pword + ” construct larger than target. No point in keeping the search”)

                if (pword in construct):

                    last_match += last_attempt

                break

print (time.time() – start)

运行时间

analyzing: ./target/testing/20000424_nyt-NEW.txt

target word is: IRAQ-POVERTY

against: IRAQ-POVERTY or completion of construct IRAQ-POVERTY

word found: IRAQ-POVERTY

target word is: (

against: ( or completion of construct (

word found: (

match found: ( as (

target word is: Washington

against: Washington or completion of construct Washington

word found: Washington

match found: Washington as NNP

target word is: )

against: ) or completion of construct )

word found: )

match found: ) as )

target word is: Rep.

against: Rep or completion of construct Rep.

against: . or completion of construct Rep.

Rep. construct complete. No point in keeping the search

正如代码里所示,它会给分词一个找到匹配的机会。这个算法基本上能保持分词的同步,仅此而已。

print(“Total words: ” + str(total_words))

print(“Total token matches: ” + str(word_matches))

print(“Total tag matches: ” + str(tag_matches))

print(“Simple Accuracy: ” + str(tag_matches / word_matches))

运行时间

Total words: 21381

Total token matches: 20491

Total tag matches: 17056

Simple Accuracy: 0.832365428724806

Spark-NLP

我们在Spark-NLP中看到类似的逻辑,只是代码是命令式和函数式编程的一点混合。

var misses = 0

var token_matches = 0

var tag_matches = 0

for (file <- prediction.keys) {

    var current = 0

    for ((pword, ptag) <- prediction(file)) {

        println(s”analyzing: ${pword}”)

        var found = false

        val tags = answer(file.replace(“testing”, “answer”))

        var construct = “”

        var attempt = 0

        tags.takeRight(tags.length – current).iterator.takeWhile(_ => !found && construct != pword).foreach { case (word, tag) => {

            construct += word

            println(s”against: $word or if matches construct $construct”)

            if (word == pword) {

                println(s”word match: $pword”)

                token_matches += 1

                if (tag == ptag) {

                    println(s”tag match: $tag”)

                    tag_matches += 1

                }

                found = true

            }

            else if (pword.length < construct.length) {

                if (attempt > 0) {

                    println(s”construct $construct too long for word $pword against $word”)

                    attempt -= attempt

                    misses += 1

                    println(s”failed match our $pword against their $word or $construct”)

                    found = true

                }

            }

            attempt += 1

        }}

        current += attempt

    }

}

运行时间

analyzing: NYT20020731.0371

against: NYT20020731.0371 or if matches construct NYT20020731.0371

word match: NYT20020731.0371

analyzing: 2002-07-31

against: 2002-07-31 or if matches construct 2002-07-31

word match: 2002-07-31

analyzing: 23:38

against: 23:38 or if matches construct 23:38

word match: 23:38

analyzing: A4917

against: A4917 or if matches construct A4917

word match: A4917

analyzing: &

against: & or if matches construct &

word match: &

tag match: CC

下面计算指标。

println(“Total words: ” + (total_words))

println(“Total token matches: ” + (token_matches))

println(“Total tag matches: ” + (tag_matches))

println(“Simple Accuracy: ” + (tag_matches * 1.0 / token_matches))

运行时间

Total words: 21362

Total token matches: 18201

Total tag matches: 15318

Simple Accuracy: 0.8416021097741883

Spark-NLP,但是Spark在哪里?

至此我并没有讨论太多Spark在这里的作用。但是,为了不让你进一步感到无聊,让我把所有内容都放在下面易于阅读的格式里:

  • 你猜对了!这里所做的对Spark来说是杀鸡用牛刀。你不需要用四把锤子来砸一个钉子。我的意思是,Spark在默认情况下是以分布式方式在工作,用于处理非常大的数据(例如默认的spark.sql.shuffle.partitions为200)。在本文的场景里,我试图控制我使用的数据量,而不是扩大太多。 “大数据”可能会成为敌人,内存问题、糟糕的并行性或者慢的算法可能会让你掉到坑里。
  • 您可以轻松地将HDFS用于这里的管道。增加文件的数量或增加数据量,同样的代码也能够处理。你也可以充分利用恰当的分区数和内存计算。
  • 也同样可以将此算法用于一个分布式群集,将其提交到任何节点上的driver,Spark将自动进行分布式的工作负载分配。
  • 这个特定的NLP程序并不是一个好的可扩展的解决方案。基本上,如果应用到几百MB的文本文件中,那么使用collect()操作会将所有单词和标签收集到driver里,就会导致driver的内存不足。在这种情况下,测量分词的表现就需要用MapReduce任务来计算POS匹配的数量。这就解释了为什么Spark-NLP需要比SpaCy更长的时间才能将少量的预测结果传入driver,但是对于大数据量的输入这就会有优势了。

下一步做什么?

在这篇文章中,我们比较了在两个库上运行和评估基准NLP管道的工作。 总体来说,个人偏好或经验可能会使用户更倾向于使用核心Python库和spaCy这样的命令式编程,或是选择核心Spark库和Spark-NLP这样的函数式编程。

对于我们在这里测试的小数据集,两种库的运行时间均少于1秒,并且准确度相当。在此博客系列的第三篇中,我们将用其他大小的数据和参数继续评估。

相关资料:

Saif Addin Ellafi

Saif Addin Ellafi是一名软件开发者、分析师、数据科学家,并永远是一名学生。他同时还是一名极限运动和游戏爱好者。他在银行和金融行业的数据领域拥有丰富的解决问题和测试的经验。现在他在John Snow Labs,并是Spark-NLP的主要贡献者。

Fractal piping (source: Pixabay)