当前位置: 首页 > news >正文

南宁网站设计和开发大赛如何把网站做好

南宁网站设计和开发大赛,如何把网站做好,佛山网站seo公司,CQ网络科技网站建设我正在使用Kafka Consumer API将所有数据从Kafka主题复制到Hive表 . 为此,我使用HDFS作为中间步骤 . 我使用唯一的组ID并将偏移重置为“最早”,以便从头开始获取所有数据,并在执行后忽略提交 . 然后我遍历Kafka主题中的记录,并将每…

我正在使用Kafka Consumer API将所有数据从Kafka主题复制到Hive表 . 为此,我使用HDFS作为中间步骤 . 我使用唯一的组ID并将偏移重置为“最早”,以便从头开始获取所有数据,并在执行后忽略提交 . 然后我遍历Kafka主题中的记录,并将每条记录保存到HDFS中的临时文件中 . 然后我使用Spark从HDFS读取数据,然后使用日期作为文件名将其保存到Parquet文件中 . 然后,我在Hive表中创建一个带日期的分区,最后在Parquet中将文件作为分区加载到Hive中 .

正如您在下面的代码中看到的,我使用了几个中间步骤,这使得我的代码远非最佳 . 这是从Kafka主题复制所有数据的最佳推荐方法吗?我做了一些研究,到目前为止,这是我设法开始工作的变通方法,但是,随着记录数量每天增加,我的执行时间达到了可容忍的极限(从2分钟变为6分钟到6分钟)周) .

代码在这里:

def start( lowerDate: String, upperDate: String )={

// Configurations for kafka consumer

val conf = ConfigFactory.parseResources("properties.conf")

val brokersip = conf.getString("enrichment.brokers.value")

val topics_in = conf.getString("enrichment.topics_in.value")

// Crea la sesion de Spark

val spark = SparkSession

.builder()

.master("yarn")

.appName("ParaTiUserXY")

.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

import spark.implicits._

val properties = new Properties

properties.put("key.deserializer", classOf[StringDeserializer])

properties.put("value.deserializer", classOf[StringDeserializer])

properties.put("bootstrap.servers", brokersip)

properties.put("auto.offset.reset", "earliest")

properties.put("group.id", "ParaTiUserXYZZ12345")

//Schema para transformar los valores del topico de Kafka a JSON

val my_schema = new StructType()

.add("longitudCliente", StringType)

.add("latitudCliente", StringType)

.add("dni", StringType)

.add("alias", StringType)

.add("segmentoCliente", StringType)

.add("timestampCliente", StringType)

.add("dateCliente", StringType)

.add("timeCliente", StringType)

.add("tokenCliente", StringType)

.add("telefonoCliente", StringType)

val consumer = new KafkaConsumer[String, String](properties)

consumer.subscribe( util.Collections.singletonList("parati_rt_geoevents") )

val fs = {

val conf = new Configuration()

FileSystem.get(conf)

}

val temp_path:Path = new Path("hdfs:///tmp/s70956/tmpstgtopics")

if( fs.exists(temp_path)){

fs.delete(temp_path, true)

}

while(true)

{

val records=consumer.poll(100)

for (record

val data = record.value.toString

//println(data)

val dataos: FSDataOutputStream = fs.create(temp_path)

val bw: BufferedWriter = new BufferedWriter( new OutputStreamWriter(dataos, "UTF-8"))

bw.append(data)

bw.close

val data_schema = spark.read.schema(my_schema).json("hdfs:///tmp/s70956/tmpstgtopics")

val fechaCliente = data_schema.select("dateCliente").first.getString(0)

if( fechaCliente < upperDate && fechaCliente >= lowerDate){

data_schema.select("longitudCliente", "latitudCliente","dni", "alias",

"segmentoCliente", "timestampCliente", "dateCliente", "timeCliente",

"tokenCliente", "telefonoCliente")

.coalesce(1).write.mode(SaveMode.Append).parquet("/desa/landing/parati/xyuser/" + fechaCliente)

}

else if( fechaCliente < lowerDate){

//

}

else if( fechaCliente >= upperDate){

break;

}

}

}

consumer.close()

}

http://www.lebaoying.cn/news/329.html

相关文章: