Java
nurzhannogerbek, 2018-12-21 08:48:35

How to filter data for a specific period in Spark?

Hello comrades! Please help me figure this out. I haven't worked
with Spark before and am trying to understand it with a simple example. Suppose there is a large file with the following structure (see below). It stores a date, a mobile number, and the number's status at that time.

| CREATE_DATE         | MOBILE_KEY | STATUS |
|---------------------|------------|--------|
| 2018-11-28 00:00:00 | 8792548575 | IN     |
| 2018-11-29 20:00:00 | 7052548575 | OUT    |
| 2018-11-30 07:30:00 | 7772548575 | IN     |

How to properly filter all data for a specified period for specific mobile numbers? For example, I receive the following data as input:
val dateFrom = "2018-10-01"
val dateTo = "2018-11-05"
val numbers = "7778529636,745128598,7777533575"

val arr = numbers.split(",") // Create an array of mobile numbers

spark.read.parquet("fs://path/file.parquet").filter(???)
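A minimal sketch of the filtering logic in plain Scala, with a hypothetical `Record` case class standing in for a parquet row (the sample data here is made up): ISO-8601 timestamps compare correctly as strings, and a `Set` gives fast membership tests for the numbers.

```scala
// Hypothetical stand-in for one row of the parquet file.
case class Record(createDate: String, mobileKey: String, status: String)

val dateFrom = "2018-10-01"
val dateTo = "2018-11-05"
val numbers = Set("7778529636", "745128598", "7777533575")

val records = Seq(
  Record("2018-10-15 12:00:00", "7778529636", "IN"),
  Record("2018-11-28 00:00:00", "8792548575", "IN"),
  Record("2018-10-20 09:30:00", "7777533575", "OUT")
)

// ISO-8601 timestamps sort lexicographically in chronological order,
// so plain string comparison implements the date-range check.
val filtered = records.filter { r =>
  r.createDate >= dateFrom && r.createDate < dateTo && numbers(r.mobileKey)
}
```

In Spark itself the same predicate can be written with column expressions, e.g. `df.filter(col("CREATE_DATE").between(dateFrom, dateTo) && col("MOBILE_KEY").isin(arr: _*))`; `Column.between` and `Column.isin` are part of the Spark SQL API.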


2 answer(s)
⚡ Kotobotov ⚡, 2018-12-21
@angrySCV

you can just try to filter as you write; to do that, first map the raw input into a concrete structure and data type:

dataSource
  .map(buildRecord)
  .filter(record => requiredNumbers.contains(record.phone)
    && requiredInterval.contains(record.date))

This will work, but very slowly: it will run for a long time and gobble up a pile of resources on one machine, and that is not what Spark is for; Spark is an engine for distributed computing. To start a distributed computation you first need to create "key" -> "value" pairs (where the key is the phone number and the value is all the other data). These pairs are distributed among the nodes, processed in parallel, and the results of the parallel processing are then aggregated into one overall result. For that, use not filter but reduceByKey with an aggregation, to collect the keys, and the values for those keys, in parallel.
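The key -> value pairing described above can be sketched on a plain collection; here `groupBy` stands in for the shuffle that `reduceByKey` performs across nodes in Spark, and the sample rows are illustrative:

```scala
// Sample rows: (date, phone, status), made up for illustration.
val rows = Seq(
  ("2018-11-28 00:00:00", "8792548575", "IN"),
  ("2018-11-29 20:00:00", "7052548575", "OUT"),
  ("2018-11-30 07:30:00", "8792548575", "OUT")
)

// Key on the phone number; the value is all the other data.
val pairs = rows.map { case (date, phone, status) => phone -> (date, status) }

// In Spark, grouping/reducing by key runs in parallel per key on each node;
// groupBy here is the single-machine analogue of that shuffle.
val byPhone = pairs.groupBy(_._1).map { case (phone, vs) => phone -> vs.map(_._2) }
```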

Mikhail Potanin, 2018-12-27
@potan

Dates in ISO format can be compared as strings. Put the list of telephone numbers into a set.
Something like:
val arr = numbers.split(",").toSet
spark.read.parquet("fs://path/file.parquet")
  .filter(t => t.getAs[String]("CREATE_DATE") < dateTo
    && t.getAs[String]("CREATE_DATE") > dateFrom
    && arr(t.getAs[String]("MOBILE_KEY")))
I don't know exactly how to access record fields in Spark, so this may need a bit of reworking.
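A quick check that ISO-formatted dates really do compare correctly as strings (plain Scala, no Spark needed; the sample timestamps are made up):

```scala
val dateFrom = "2018-10-01"
val dateTo = "2018-11-05"

// Lexicographic comparison on ISO-8601 strings matches chronological
// order, because the fields run from most to least significant.
val samples = Seq("2018-09-30 23:59:59", "2018-10-15 12:00:00", "2018-11-28 00:00:00")
val kept = samples.filter(d => d > dateFrom && d < dateTo)
```

One subtlety: a timestamp on the end date itself, such as "2018-11-05 10:00:00", compares greater than the bare date "2018-11-05" (equal prefix, longer string), so a strict `< dateTo` excludes the whole end day.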
