Article directory
-
- Environment | Environment
- Background | Background
- Reproduction steps | Reproduction steps
-
- production news
- Consume news
- Error log | Error log
- Cause Analysis | Analysis
- Solution | Solution
- References | References
Talk is cheap, show me the code.
Environment | Environment
k | version |
---|---|
OS | windows 10 |
jdk | 1.8 |
maven | 3.6.0 |
spring boot | 2.3.4.RELEASE |
redis | 5.0.10 |
Background | Background
When I test redis stream, I need to compare using xdel
to delete messages after consuming messages and not using xdel
to delete messages after consuming messages.
The purpose of this comparative experiment is to study whether not using xdel
to delete consumed messages will occupy more redis memory.
The experimental plan is to push 50,000 messages to the redis stream and compare the memory usage after consumption.
However, during the process of consuming data, when more than 44,000 pieces of data were consumed, an exception occurred: org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
Reproduction steps | Reproduction steps
First, 50,000 pieces of data were produced into the Redis stream at one time. During the data consumption process, an exception occurred when more than 44,000 pieces of data were consumed.
Production news
@RestController @RequestMapping("/redis/stream") public class RedisStreamController {<!-- --> @Autowired private RedisTemplate redisTemplate; @GetMapping("/push-data") public String pushData() {<!-- --> for (int i = 0; i < 5_0000; i + + ) {<!-- --> Map<String, String> map = new HashMap<>(); map.put("k", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS").format(LocalDateTime.now())); RecordId recordId = redisTemplate.opsForStream().add("stream_test1", map); } return "Success"; } }
Consuming messages
@Slf4j @Component public class Stream1Listener extends AbstractAutoRetryStreamListener<String, MapRecord<String, String, String>> {<!-- --> @Override public void doOnMessage(MapRecord<String, String, String> message) {<!-- --> Map<String, String> msgMap = message.getValue(); // do something double random = Math.random(); if (random > 0.5) {<!-- --> log.warn("consume failure, consumer={} message={} random={}", getFullName(), message.getValue(), random); throw new RuntimeException("Message processing failed"); } log.info("consume success, consumer={} message={}", getFullName(), message.getValue()); } @Override public String getStream() {<!-- --> return "stream_test1"; } @Override protected long maxRetries() {<!-- --> return 0L; } }
Error log | Error log
Caused by: java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379 at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ... 30 more Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379 at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:78) at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:56) at io.lettuce.core.RedisClient.lambda$transformAsyncConnectionException$20(RedisClient.java:767) at io.lettuce.core.DefaultConnectionFuture.lambda$thenCompose$1(DefaultConnectionFuture.java:253) ... 22 more Caused by: java.util.concurrent.CompletionException: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379 at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ... 20 more Caused by: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379 Caused by: java.net.BindException: Address already in use: no further information at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:834)
Cause Analysis | Analysis
- The solution ideas were obtained through the following reference articles.
Solution | Solution
Open the Widnows command line window ( Run as administrator
) and execute the following command:
netsh int ipv4 set dynamicport tcp start=20000 num=90000
This command is the Windows Netsh tool command, used to configure the network interface. Specifically,
netsh int ipv4 set dynamicport tcp start=20000 num=90000
This command sets the dynamic port range of the TCP protocol. Among them, ‘start=20000’ represents the starting value of the dynamic port, and ‘num=90000’ represents the size of the port range, that is, starting from 20000, there are a total of 90000 consecutive ports available for dynamic allocation.
References | References
CSDN – Concurrency test exception [Address already in use: no further infomation]
cnpython – java Spring Redis stream consumer stopped consuming messages (address is already in use)
Zhihu – [Netty] Problems caused by the upper limit of network connections