从kafka读取数据并入库(mysql)
•
大数据
从kafka读取数据并入库
- 层级结构
-
- 主类
- 配置类(config)
- 监听类(listener)
- mapper类(mapper)
- service
- service的实现类
- 实体类(vo)
- application.properties
层级结构

主类
/**
 * Spring Boot entry point of the Kafka-to-MySQL ingestion application.
 *
 * <p>The {@code @MapperScan} value is a placeholder ("fill in this project's mapper package
 * path here") and must be replaced with the real mapper package before the application runs.
 */
@SpringBootApplication
@MapperScan("这里填充该项目mapper的相对路径") // scans every mapper under the mapper package
@Slf4j
public class GzBreakDownApplication {

    /**
     * Starts the Spring application and then logs a start-up message.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        Class<?> primarySource = GzBreakDownApplication.class;
        SpringApplication.run(primarySource, args);
        log.info("程序启动,开始监听kafka消息");
    }
}
配置类(config)
/**
 * Builds the Kafka listener container that delivers records to {@link MyKafkaMessageListener}.
 *
 * <p>The topic list and the concurrency level come from the custom {@code spring.kafka.topics}
 * and {@code spring.kafka.concurrency} properties; every other consumer setting is taken from
 * the auto-configured {@link KafkaProperties}.
 */
@SpringBootConfiguration
public class KafkaConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    /** Topics to subscribe to, from {@code spring.kafka.topics}. */
    @Value("${spring.kafka.topics}")
    private String[] topics;

    /** Concurrency passed to the listener container, from {@code spring.kafka.concurrency}. */
    @Value("${spring.kafka.concurrency}")
    private int concurrency;

    @Autowired
    private MyKafkaMessageListener myListener;

    /**
     * Creates the concurrent listener container for the configured topics.
     *
     * <p>The container consumes several topics and partitions on multiple threads. The key and
     * value types are {@code String} because the application configures
     * {@code StringDeserializer} for both.
     *
     * @return the container, wired to {@code myListener} and using round-robin partition
     *     assignment
     */
    @Bean
    public ConcurrentMessageListenerContainer<String, String> concurrentMessageListenerContainer() {
        ContainerProperties containerProps = new ContainerProperties(topics);
        containerProps.setMessageListener(myListener);

        // Start from the Spring Boot consumer settings and override only the assignment strategy.
        Map<String, Object> consumerProps = kafkaProperties.buildConsumerProperties();
        consumerProps.put(
                ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                RoundRobinAssignor.class.getName());

        DefaultKafkaConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerProps);
        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
        container.setConcurrency(concurrency); // maximum number of consumer threads allowed
        return container;
    }
}
监听类(listener)
/**
 * Kafka message listener invoked by the listener container for each consumed record.
 *
 * <p>Keys and values are typed as {@code String}, matching the {@code StringDeserializer}
 * settings in the application properties.
 */
@Component
@Slf4j
public class MyKafkaMessageListener implements MessageListener<String, String> {

    /**
     * Receives one record and hands its value to the message-processing logic.
     *
     * <p>May be called from several container threads at once.
     *
     * @param data the consumed record
     */
    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        // String.valueOf keeps the previous behaviour: a null value becomes the text "null".
        // The local is still named s because the elided processing code below may refer to it.
        String s = String.valueOf(data.value());
        // start processing the message (the article elides this part)
        ...
    }
}
mapper类(mapper)
/**
 * MyBatis-Plus mapper for {@link xxxVO}.
 *
 * <p>The entity type argument tells {@code BaseMapper} which entity its CRUD methods operate on.
 */
@Repository
public interface xxxMapper extends BaseMapper<xxxVO> {
}
service
/**
 * Service for {@link xxxVO} records, adding message ingestion on top of the generic
 * {@code IService} operations.
 */
public interface xxxService extends IService<xxxVO> {

    /**
     * Parses a message and stores it, updating the existing record when there is one.
     *
     * @param msg the raw message text to parse
     */
    void parseMsg(String msg);
}
service的实现类
/**
 * Default {@link xxxService} implementation backed by {@link xxxMapper}.
 */
@Service
@Slf4j
public class xxxServiceImpl extends ServiceImpl<xxxMapper, xxxVO> implements xxxService {

    /**
     * Parses a JSON message into an {@link xxxVO} and inserts or updates the matching row.
     *
     * <p>If no row with the same {@code id} exists the record is inserted. If one exists it is
     * updated only when the stored update time is less than or equal to the incoming one.
     *
     * <p>The method is {@code synchronized}, so calls on this instance run one at a time and the
     * lookup-then-write sequence is not interleaved between callers of this instance.
     *
     * @param msg the JSON text of one record
     */
    @Override
    public synchronized void parseMsg(String msg) {
        log.info("数据:{}", msg);
        xxxVO xxVO = JSON.parseObject(msg, xxxVO.class);
        if (xxVO == null) {
            // parseObject can yield null (for example for an empty payload); nothing to store.
            log.warn("消息为空或无法解析,已跳过");
            return;
        }

        // Look up whether a row with this unique id has already been stored.
        QueryWrapper<xxxVO> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("id", xxVO.getId());
        xxxVO one = this.getOne(queryWrapper);

        if (one != null) {
            // Row exists: update only if the stored update time is not newer than the incoming one.
            // compareTo: negative means stored < incoming, zero means equal, positive means stored > incoming.
            // NOTE(review): updateTime is a String, so this is a lexicographic comparison; it
            // presumably relies on a fixed-width sortable timestamp format (confirm), and it
            // throws NullPointerException if either update time is null.
            if (one.getUpdateTime().compareTo(xxVO.getUpdateTime()) <= 0) {
                log.info("更新时间:{} ,编号:{}", xxVO.getUpdateTime(), xxVO.getId());
                // Target exactly the row found above: same column and exact match as the lookup.
                UpdateWrapper<xxxVO> updateWrapper = new UpdateWrapper<>();
                updateWrapper.eq("id", xxVO.getId());
                this.update(xxVO, updateWrapper);
            }
        } else {
            // Row does not exist: insert it directly.
            log.info("数据开始入库");
            this.save(xxVO);
        }
        log.info("入库完成");
    }
}
实体类(vo)
/**
 * Entity mapped to the target database table; it is also the type that incoming JSON messages
 * are parsed into.
 *
 * <p>The {@code @TableName} value is a placeholder ("database table name") to be replaced with
 * the real table name.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
@TableName("数据库表名")
public class xxxVO {

    /** Unique record id; mapped to the {@code id} column. */
    @TableField("id")
    private String id;

    /** Update time of the record as text; mapped to the {@code update_time} column. */
    @TableField("update_time")
    private String updateTime;
}
application.properties
#------mysql连接信息------
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/数据库名?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username=用户名
spring.datasource.password=密码
#-----------mybatis---------
mybatis-plus.configuration.map-underscore-to-camel-case=false
#------Kafka配置信息-------
# kafka集群 三个自己配置的集群xxx
spring.kafka.bootstrap-servers=xxx,xxx,xxx
spring.kafka.consumer.group-id=groupId2
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.topics=自定义消费主题
# max thread size
spring.kafka.concurrency=9
logging.file.name=自定义日志位置
logging.file.max-size=300MB
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/88e5a3647a.html
