How do I process logs with Spark Streaming?

I have a data stream: logs flow from one Kafka topic into Spark Streaming, I process the stream, and then write it out to another Kafka topic. The stream is a log stream with a timestamp, a log level (INFO, WARNING), and so on, and every two lines form one processing unit. How do I handle this in Spark Streaming? For example, if I want to reformat all the timestamps, how do I do that on the RDD?

 

Here is the log:

May 11, 2015 2:39:51 PM org.apache.cxf.jaxrs.utils.JAXRSUtils findTargetMethod
WARNING: No operation matching request path "/restServer/recentlyTask" is found, Relative Path: /, HTTP Method: GET, ContentType: */*, Accept: */*,. Please enable FINE/TRACE log level for more details.
May 11, 2015 2:43:58 PM org.apache.cxf.jaxrs.utils.JAXRSUtils findTargetMethod
WARNING: No operation matching request path "/restServer/myTask/1" is found, Relative Path: /1, HTTP Method: POST, ContentType: */*, Accept: */*,. Please enable FINE/TRACE log level for more details.
May 11, 2015 3:04:32 PM org.apache.cxf.jaxrs.utils.JAXRSUtils findTargetMethod
WARNING: No operation matching request path "/restServer/recentlyTask" is found, Relative Path: /, HTTP Method: GET, ContentType: */*, Accept: */*,. Please enable FINE/TRACE log level for more details.
May 11, 2015 5:08:10 PM org.apache.catalina.core.AprLifecycleListener init
INFO: The APR based Apache Tomcat Native library which allows optimal performance in production environments was not found on the java.library.path: /usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
May 11, 2015 5:08:10 PM org.apache.tomcat.util.digester.SetPropertiesRule begin
WARNING: [SetPropertiesRule]{Server/Service/Engine/Host/Context} Setting property 'debug' to '0' did not find a matching property.
May 11, 2015 5:08:11 PM org.apache.coyote.http11.Http11Protocol init
INFO: Initializing Coyote HTTP/1.1 on http-80
May 11, 2015 5:08:11 PM org.apache.catalina.startup.Catalina load
INFO: Initialization processed in 1889 ms
May 11, 2015 5:08:11 PM org.apache.catalina.core.StandardService start
INFO: Starting service Catalina
May 11, 2015 5:08:11 PM org.apache.catalina.core.StandardEngine start
INFO: Starting Servlet Engine: Apache Tomcat/6.0.41
May 11, 2015 5:08:11 PM org.apache.catalina.loader.WebappClassLoader validateJarFile
INFO: validateJarFile(/tasksys/tasksys/webapps/tasksys/WEB-INF/lib/servlet-api-2.5-6.1.9.jar) - jar not loaded. See Servlet Spec 2.3, section 9.7.2. Offending class: javax/servlet/Servlet.class
May 11, 2015 5:08:17 PM org.apache.catalina.startup.HostConfig deployDirectory
INFO: Deploying web application directory restServer
May 11, 2015 5:08:17 PM org.apache.catalina.loader.WebappClassLoader validateJarFile
INFO: validateJarFile(/tasksys/tasksys/webapps/restServer/WEB-INF/lib/servlet-api-2.5-6.1.9.jar) - jar not loaded. See Servlet Spec 2.3, section 9.7.2. Offending class: javax/servlet/Servlet.class
May 11, 2015 5:08:19 PM org.apache.catalina.startup.HostConfig deployDirectory
INFO: Deploying web application directory paas
May 11, 2015 5:08:19 PM org.apache.catalina.loader.WebappClassLoader validateJarFile
INFO: validateJarFile(/tasksys/tasksys/webapps/paas/WEB-INF/lib/servlet-api-2.5-6.1.9.jar) - jar not loaded. See Servlet Spec 2.3, section 9.7.2. Offending class: javax/servlet/Servlet.class
May 11, 2015 5:08:22 PM org.apache.catalina.startup.HostConfig deployDirectory
INFO: Deploying web application directory tasksys
May 11, 2015 5:08:22 PM org.apache.catalina.loader.WebappClassLoader validateJarFile
INFO: validateJarFile(/tasksys/tasksys/webapps/tasksys/WEB-INF/lib/servlet-api-2.5-6.1.9.jar) - jar not loaded. See Servlet Spec 2.3, section 9.7.2. Offending class: javax/servlet/Servlet.class
May 11, 2015 5:08:25 PM org.apache.coyote.http11.Http11Protocol start
INFO: Starting Coyote HTTP/1.1 on http-80

封尘 - e.g.: a post-80s IT guy..


I've gone through all the example code in the Spark Streaming source package; it's all variations of word count, and none of it actually parses a log stream. Could you please advise how to parse such a data stream? Thanks!

封尘 - e.g.: a post-80s IT guy..


1. Log enrichment (join): in Spark Streaming, join and merge two data sources. The dictionary tables usually live in MySQL; load them into Spark memory and join them against the log data. If a record can be joined, append the dictionary fields to the log line; if it cannot, leave the record unprocessed (see the sketch after this list).
   Log-level dictionary: INFO = normal, WARNING = warning.
   Log-category dictionary: *jar not loaded* = missing jar; the first field is matched with a regex.
2. Log tokenization: call IKAnalyzer directly inside Spark Streaming to tokenize the log message field, and append the tokens to the log.
The results of both steps are likewise written back to Kafka.
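For what it's worth, here is a minimal Java sketch of the dictionary-join idea. The MySQL load is replaced by a hard-coded Map, the ZooKeeper address, consumer group, and class/variable names are assumptions (only the broker 192.168.101.122 and the topic "eskafka" appear elsewhere in this thread):

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class LogDictionaryJoin {

    // Extracts "INFO"/"WARNING" from lines like "WARNING: No operation matching ...";
    // timestamp lines match no dictionary key and are dropped by the filter below.
    private static String levelOf(String line) {
        return line.split(":", 2)[0].trim();
    }

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("LogDictionaryJoin").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Log-level dictionary; in the setup described above it would be loaded from MySQL via JDBC.
        Map<String, String> levelDict = new HashMap<String, String>();
        levelDict.put("INFO", "normal");
        levelDict.put("WARNING", "warning");
        final Broadcast<Map<String, String>> dict = jssc.sparkContext().broadcast(levelDict);

        // Receiver-based Kafka source (spark-streaming-kafka, Spark 1.x era API).
        // The ZooKeeper address and consumer group are assumptions, not from the original post.
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put("eskafka", 1);
        JavaDStream<String> lines =
                KafkaUtils.createStream(jssc, "192.168.101.122:2181", "log-group", topics)
                          .map(tuple -> tuple._2());

        // Join: keep only records whose level exists in the dictionary and append its meaning.
        JavaDStream<String> enriched = lines
                .filter(line -> dict.value().containsKey(levelOf(line)))
                .map(line -> line + " | " + dict.value().get(levelOf(line)));

        enriched.print();  // in the real job this result would be written to the output Kafka topic
        jssc.start();
        jssc.awaitTermination();
    }
}

The broadcast keeps one read-only copy of the dictionary on each executor, so the "join" is just a map-side lookup rather than a shuffle.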

zp0824 - Study hard and make progress every day


1. If two lines form one processing unit, do you write both lines to Kafka at once? If not, this becomes very awkward to handle: on the read side you would have to fetch two adjacent records at a time, which is quite inefficient.
2. To format the time: DStream has a foreachRDD method, and inside it you can call the RDD's foreach to process each record; that is where you can reformat your time field (a sketch follows below).
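As an illustration of point 2, here is a sketch of a timestamp-normalizing step. It uses map rather than foreachRDD, since the goal is to rewrite each record rather than produce side effects; the regex, the target format yyyy-MM-dd HH:mm:ss, and the class name are all illustrative:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.spark.streaming.api.java.JavaDStream;

public class TimestampFormatter {

    // JUL-style prefix of the sample log, e.g. "May 11, 2015 2:39:51 PM"
    private static final Pattern TS =
            Pattern.compile("^(\\w+ \\d{1,2}, \\d{4} \\d{1,2}:\\d{2}:\\d{2} [AP]M)");

    public static String normalize(String line) {
        Matcher m = TS.matcher(line);
        if (!m.find()) {
            return line;  // INFO/WARNING message lines pass through unchanged
        }
        SimpleDateFormat in = new SimpleDateFormat("MMM d, yyyy h:mm:ss a", Locale.ENGLISH);
        SimpleDateFormat out = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            return out.format(in.parse(m.group(1))) + line.substring(m.end());
        } catch (ParseException e) {
            return line;  // leave unparseable lines untouched
        }
    }

    public static JavaDStream<String> formatTimestamps(JavaDStream<String> lines) {
        // map runs on the executors for every record of every batch
        return lines.map(TimestampFormatter::normalize);
    }
}

It could be wired in right after the Kafka stream is created, e.g. lines = TimestampFormatter.formatTimestamps(lines), before the result is written to the output topic.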

封尘 - e.g.: a post-80s IT guy..


First of all, thank you! Below is my code for writing JSON into Kafka:

package com.ultrapower.kafka.test;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class InsertKafka {
    public static void main(String[] args) {
        // Producer configuration: broker list, string serializer, wait for the leader's ack
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "192.168.101.122:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        String json = "{\"name\": \"John222\",\"age\": 22,\"aa\": 122}";
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("eskafka", json);
        try {
            // Send the same JSON message once per second
            while (true) {
                producer.send(data);
                System.out.println(json);
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

How can I write the log above into Kafka two lines at a time? Please advise.

封尘 - e.g.: a post-80s IT guy..


How do I change this so that, instead of the JSON, each send pushes two lines of the log, as a stream?
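One possible adaptation of the producer loop from the earlier post, assuming the log is read from a local file and that each timestamp line is immediately followed by its INFO/WARNING line; the class name, file path, pairing logic, and one-second sleep are illustrative:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class InsertLogPairs {
    public static void main(String[] args) throws Exception {
        // Same producer configuration as in the JSON example above
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "192.168.101.122:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // Illustrative path to the log file shown at the top of the thread
        BufferedReader reader = new BufferedReader(new FileReader("/tmp/catalina.out"));
        try {
            String first;
            while ((first = reader.readLine()) != null) {
                String second = reader.readLine();              // the INFO/WARNING line of the pair
                String unit = second == null ? first : first + "\n" + second;
                producer.send(new KeyedMessage<String, String>("eskafka", unit));
                System.out.println(unit);
                Thread.sleep(1000);                             // simulate a continuous stream
            }
        } finally {
            reader.close();
            producer.close();
        }
    }
}

On the Spark Streaming side each Kafka message then arrives as a single record containing both lines, which can be split back apart with a simple split("\n").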
