// Stray "Edit code" UI label left over from a copy-paste; not part of the program.

package com.example.demo.client;

import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

@Slf4j
public class DefaultWebSocketClient<T> extends WebSocketClient {

    private static final int connectTimeout = 2000;
    private static final Draft protocolDraft = new Draft_6455();
    private static final Map<String, String> httpHeaders = new HashMap<>();

    private final Object lock = new Object();
    private Object result = null;
    private CountDownLatch downLatch = null;

    public DefaultWebSocketClient(String serverUri) throws URISyntaxException {
        super(new URI(serverUri), protocolDraft, httpHeaders, connectTimeout);
    }

    public DefaultWebSocketClient(URI serverUri) {
        super(serverUri, protocolDraft, httpHeaders, connectTimeout);
    }

    public DefaultWebSocketClient(URI serverUri, Draft protocolDraft) {
        super(serverUri, protocolDraft, httpHeaders, connectTimeout);
    }

    public DefaultWebSocketClient(URI serverUri, Map<String, String> httpHeaders) {
        super(serverUri, protocolDraft, httpHeaders, connectTimeout);
    }

    public DefaultWebSocketClient(URI serverUri, Draft protocolDraft, Map<String, String> httpHeaders) {
        super(serverUri, protocolDraft, httpHeaders, connectTimeout);
    }

    public DefaultWebSocketClient(URI serverUri, Draft protocolDraft, Map<String, String> httpHeaders, int connectTimeout) {
        super(serverUri, protocolDraft, httpHeaders, connectTimeout);
    }

    public T sendExt(Object obj) {
        log.debug("DefaultWebSocketClient.sendExt");
        synchronized (lock){
            try {
                downLatch = new CountDownLatch(1);
                if (obj instanceof String) {
                    send((String) obj);
                }else if (obj instanceof byte[]){
                    send((byte[]) obj);
                }else{
                    throw new RuntimeException("不支持的参数类型");
                }
                downLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                this.close();
                return (T) result;
            }
        }
    }

    @Override
    public void onOpen(ServerHandshake handshakedata) {
        log.debug("DefaultWebSocketClient.onOpen");
    }

    @Override
    public void onMessage(String message) {
        log.debug("DefaultWebSocketClient.onMessage");
        result = message;
        if(downLatch!=null){
            downLatch.countDown();
        }
    }

    @Override
    public void onMessage(ByteBuffer bytes) {
        log.debug("DefaultWebSocketClient.onMessage-bytes");
        result = bytes;
        if(downLatch!=null){
            downLatch.countDown();
        }
    }

    @Override
    public void onClose(int code, String reason, boolean remote) {
        log.debug("DefaultWebSocketClient.onClose");
    }

    @Override
    public void onError(Exception ex) {
        log.debug("DefaultWebSocketClient.onError {}", ex.getMessage());
    }

    public static void main(String[] args) {
        try {
            DefaultWebSocketClient<String> client = new DefaultWebSocketClient<>("ws://localhost:10714/test/one");
            client.connectBlocking();
            String result = client.sendExt("我来了" + new Date());
            System.out.println("result = " + result);
        } catch (URISyntaxException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}