计算道路的平均速度

我参加了一个数据工程师的工作面试。面试官问了我一个问题。他给了我一些情况,并要求我为那个系统设计数据流。我解决了这个问题,但他不喜欢我的解决方案,我失败了。我想知道你是否有更好的想法来解决这个挑战。 问题是: 我们的系统接收四个数据流。该数据包含车辆 id 、速度和地理位置坐标。每个 vihicle 每分钟发送一次数据。特定的河流与特定的道路或 vihicle 或其他任何东西之间没有联系。有一个函数接受坐标并返回路段名称。我们需要知道每 5 分钟每个路段的平均速度。最后,我们想把结果写给卡夫卡。 enter image description here [https://i.stack.imgur.com/KzDqe.png] [https://i.stack.imgur.com/KzDqe.png] 所以我的解决方案是: 首先将所有数据写入卡夫卡集群,写入一个主题,由连接经度的 5-6 个纬度的第一个数字与 5-6 个经度的第一个数字分开。然后通过结构化流读取数据,通过坐标为每一行添加路段名称 (有一个预定义的 udf),然后通过路段名称收集数据。 因为我将 Kafka 中的数据划分为坐标的 5 ……

共6个回答,已解决, 标签: apache-spark apache-kafka bigdata stream-processing
火花读取分割数据在 S3 部分在冰川

我在 S3 中的实木复合地板中有一个按日期 (dt) 划分的实木复合地板数据集, 最古老的日期存储在 AWS 冰川中, 以节省一些资金。例如, 我们有..。 s3://my-bucket/my-dataset/dt=2017-07-01/ [in glacier] ... s3://my-bucket/my-dataset/dt=2017-07-09/ [in glacier] s3://my-bucket/my-dataset/dt=2017-07-10/ [not in glacier] ... s3://my-bucket/my-dataset/dt=2017-07-24/ [not in glacier] 我想阅读此数据集, 但只阅读尚未在冰川中的日期子集, 例如: val from = "2017-07-15" val to = "2017-08-24" val path = "s3://my-bucket/my-dataset/" val X = spark.read.parquet(path).where(col("dt").between(from, to)) ……

共4个回答, 标签: apache-spark amazon-s3 partitioning amazon-glacier
使用 Spark 将 CSV 内容读取为 null

我试图读取 CSV 文件,以便使用 Spark SQL 查询它。CSV 如下所示: 16; 10; 9/6/2018 CSV 文件不包含标题,但是我们知道第一列是部门代码,第二列是构建代码,第三列是 m/d/YYYY 格式的日期。 我编写了以下代码来使用自定义模式加载 CSV 文件: Sch StructType = 数据类型.createStructType (新 StructField [] { 数据类型。 create structfield (“department”,数据类型。整数类型,true), 数据类型。 create structfield (“构建”,数据类型。整数类型,false), 数据类型。 create structfield (“date”,数据类型。 date type,true), }); 数据集 csvLoad = sparkSession.read ().format (“csv”) 。选项 (“分隔符”,“;”) 。模式 (sch) 。选项 (“标题” 、 “false”) 。Load (filfilepath); 显示 (2); 当我使用csvLoa ……

共2个回答, 标签: java apache-spark
将 Python 函数应用于熊猫分组数据帧-什么是最有效的方法来加快计算?

的赏金 [https://stackoverflow.com/help/bounty]8 小时后到期。这个问题的答案有资格获得 50 的声誉奖金。库巴 _ [/users/7383971/kuba]想要引起更多关注对于这个问题。 我正在处理相当大的熊猫数据框-我的数据集类似于以下内容df设置: 将熊猫作为 pd 导入 将 numpy 导入为 np # SIZING 大小参数: R1 = 20 #。重复 (重复 = R1) R2 = 10 #。重复 (重复 = R2) R3 = 541680 #。重复 (重复 = [R3,R4]) R4 = 576720 #。重复 (重复 = [R3,R4]) T = 55920 #。平铺 (,T) A1 = np.arange (0,2708400,100) # ~ 20x 重复使用 A2 = np.arange (0,2883600,100) # ~ 20x 重复使用 # --------------------------------------------- DataFrame 代: Df = pd.DataFrame.From _ dict ( {'Measu ……

共2个回答, 标签: python pandas apache-spark parallel-processing dask
Spark: 为什么 Python 在我的用例中明显优于 Scala?

这个赏金 [https://stackoverflow.com/help/bounty]已经结束。这个问题的答案有资格获得 + 100 的声誉奖励。赏金宽限期在 6 小时后结束。User10938362 [/users/10938362/user10938362]想要引起更多关注对于这个问题。 为了比较 Spark 在使用 Python 和 Scala 时的性能,我用两种语言创建了相同的作业,并比较了运行时。我预计这两个工作需要大致相同的时间,但 Python 工作只需要27min, while Scala job took 37min (almost 40% longer!). I implemented the same job in Java as well and it took 37minutes也是。这怎么可能是 Python 这么快? 最小可验证示例: Python 作业: # 配置 Conf = pyspark.SparkConf () Conf.set ("spark.hadoop.fs.s3a.aws.Creditions.provider","org.apache.hadoop. ……

共2个回答,已解决, 标签: python scala apache-spark pyspark
火花是否优化了在垃圾公园中相同但独立的 Dag?

请考虑以下垃圾公园代码 def transformed_data(spark): df = spark.read.json('data.json') df = expensive_transformation(df) # (A) return df df1 = transformed_data(spark) df = transformed_data(spark) df1 = foo_transform(df1) df = bar_transform(df) return df.join(df1) 我的问题是: 在中优化的操作是否定义为 (A), transformed_data final_view 因此只执行一次? 请注意, 此代码不等效于 df1 = transformed_data(spark) df = df1 df1 = foo_transform(df1) df = bar_transform(df) df.join(df1) (至少从 Python 的角度来看, id(df1) = id(df) 在这种情况下。 更广泛的问题是: 在优化两个 ……

共1个回答, 标签: apache-spark pyspark
如何在 spark 2.4 中使用 jdbc 运行 “从表中删除”?

我正在使用这样的代码: Spark.read.format (“jdbc”).options (Map (“url”-> “jdbc: url”) 我需要用一个删除自. ……

共0个回答, 标签: scala apache-spark
如何在 Spark 中有效地更新 RDD 中的一些数据值?

我的 RDD 包含数字和他们的属性。例如(1,(4,5)),(2,(8,6))等等。 我想根据某些条件更新一些行的值。如果我总是让新的 RDD 只是更新行值,这将是如此耗时。 是否有任何其他方式来更新值?(考虑 rdd 是图的顶点。每次更新都要制作新的图表,这将是非常昂贵的。) ……

共0个回答, 标签: apache-spark